Notification Listener
A notification listener is used to process notification messages sent by a notifier that uses the messaging driver.
A notification listener subscribes to the topic - and optionally exchange - in the supplied target. Notification messages sent by notifier clients to the target’s topic/exchange are received by the listener.
If multiple listeners subscribe to the same target, the notification will be received by only one of the listeners. The receiving listener is selected from the group using a best-effort round-robin algorithm.
This delivery pattern can be altered somewhat by specifying a pool name for the listener. Listeners with the same pool name behave like a subgroup within the group of listeners subscribed to the same topic/exchange. Each subgroup of listeners will receive a copy of the notification to be consumed by one member of the subgroup. Therefore, multiple copies of the notification will be delivered - one to the group of listeners that have no pool name (if they exist), and one to each subgroup of listeners that share the same pool name.
Note that not all transport drivers have implemented support for listener pools. Those drivers that do not support pools will raise a NotImplementedError if a pool name is specified to get_notification_listener().
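The delivery pattern described above can be illustrated with a small, self-contained model. This is only a sketch: the TopicModel class and the listener and pool names are hypothetical, nothing here calls oslo.messaging, and it uses a strict round-robin where a real driver is only best-effort.

```python
from collections import defaultdict
from itertools import cycle


class TopicModel:
    """Toy model of one topic: one copy per group, round-robin inside it."""

    def __init__(self, listeners):
        # listeners: iterable of (listener_name, pool_name or None).
        # Listeners with pool None form the default group.
        groups = defaultdict(list)
        for name, pool in listeners:
            groups[pool].append(name)
        # One independent round-robin iterator per group.
        self._next = {pool: cycle(names) for pool, names in groups.items()}

    def publish(self):
        """Return the set of listeners that receive the next notification."""
        return {next(chooser) for chooser in self._next.values()}


topic = TopicModel([
    ("a1", None), ("a2", None),        # no pool name
    ("b1", "audit"), ("b2", "audit"),  # hypothetical pool "audit"
    ("c1", "billing"),                 # hypothetical pool "billing"
])
first = topic.publish()
second = topic.publish()
```

In this model each notification is delivered three times, once per group, and within a group the receiving member alternates from one notification to the next.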
A notification listener exposes a number of endpoints, each of which contains a set of methods. Each method’s name corresponds to a notification priority. When a notification is received, it is dispatched to the method named after its priority; for example, info notifications are dispatched to the info() method.
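A minimal sketch of priority-based dispatch, written in plain Python rather than against the library. The dispatch() helper and AuditEndpoint class are hypothetical, and the sketch assumes that an endpoint lacking a method for a given priority is simply skipped.

```python
class AuditEndpoint:
    """Hypothetical endpoint that records what it was asked to handle."""

    def __init__(self):
        self.seen = []

    def info(self, ctxt, publisher_id, event_type, payload, metadata):
        self.seen.append(("info", event_type))

    def error(self, ctxt, publisher_id, event_type, payload, metadata):
        self.seen.append(("error", event_type))


def dispatch(endpoints, priority, ctxt, publisher_id, event_type,
             payload, metadata):
    """Call the method named after `priority` on each endpoint that has one.

    Returns the number of endpoints that were invoked.
    """
    called = 0
    for endpoint in endpoints:
        method = getattr(endpoint, priority, None)
        if callable(method):
            method(ctxt, publisher_id, event_type, payload, metadata)
            called += 1
    return called


endpoint = AuditEndpoint()
metadata = {"message_id": "m-1", "timestamp": "2024-01-01 00:00:00"}
handled_by = dispatch([endpoint, object()], "info", {}, "compute.host1",
                      "instance.create.end", {"id": 1}, metadata)
ignored_by = dispatch([endpoint], "warn", {}, "compute.host1",
                      "instance.create.end", {"id": 1}, metadata)
```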
Optionally a notification endpoint can define a NotificationFilter. Notification messages that do not match the filter’s rules will not be passed to the endpoint’s methods.
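To make the filtering behaviour concrete, here is a simplified stand-in written with the standard re module. SimpleFilter is not the library's NotificationFilter; it only assumes, based on the patterns used elsewhere in this document, that each rule is a regular expression matched from the start of the corresponding field.

```python
import re


class SimpleFilter:
    """Simplified stand-in for a notification filter (not the library class)."""

    def __init__(self, publisher_id=None, event_type=None, context=None):
        self._publisher_id = re.compile(publisher_id) if publisher_id else None
        self._event_type = re.compile(event_type) if event_type else None
        # context maps a context key to a regular expression for its value.
        self._context = {key: re.compile(pattern)
                         for key, pattern in (context or {}).items()}

    def match(self, ctxt, publisher_id, event_type):
        """Return True only if every configured rule matches."""
        if self._publisher_id and not self._publisher_id.match(publisher_id):
            return False
        if self._event_type and not self._event_type.match(event_type):
            return False
        for key, pattern in self._context.items():
            value = ctxt.get(key)
            if value is None or not pattern.match(str(value)):
                return False
        return True


rule = SimpleFilter(publisher_id=r'^compute.*',
                    event_type=r'^instance\..*\.start$')
accepted = rule.match({}, 'compute.host1', 'instance.create.start')
wrong_publisher = rule.match({}, 'network.host1', 'instance.create.start')
wrong_event = rule.match({}, 'compute.host1', 'instance.create.end')
```

A notification is passed to the endpoint only when all configured rules match; a single failing rule is enough to drop it for that endpoint.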
Parameters to endpoint methods are: the request context supplied by the client, the publisher_id of the notification message, the event_type, the payload and metadata. The metadata parameter is a mapping containing a unique message_id and a timestamp.
An endpoint method can explicitly return oslo_messaging.NotificationResult.HANDLED to acknowledge a message or oslo_messaging.NotificationResult.REQUEUE to requeue the message. Note that not all transport drivers implement support for requeueing. In order to use this feature, applications should assert that the feature is available by passing allow_requeue=True to get_notification_listener(). If the driver does not support requeueing, it will raise NotImplementedError at this point.
The message is acknowledged only if every endpoint returns either oslo_messaging.NotificationResult.HANDLED or None.
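The acknowledgement rule can be expressed as a small predicate. The Result enum below is a hypothetical stand-in for oslo_messaging.NotificationResult, used so the sketch runs without the library installed.

```python
import enum


class Result(enum.Enum):
    """Stand-in for the library's result constants (assumption)."""
    HANDLED = "handled"
    REQUEUE = "requeue"


def should_acknowledge(endpoint_results):
    """Acknowledge only if every endpoint returned HANDLED or None."""
    return all(result is None or result is Result.HANDLED
               for result in endpoint_results)


all_handled = should_acknowledge([None, Result.HANDLED])
one_requeue = should_acknowledge([Result.HANDLED, Result.REQUEUE])
```

In this model a single REQUEUE from any endpoint prevents the acknowledgement, even when the other endpoints handled the message.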
Each notification listener is associated with an executor which controls how incoming notification messages will be received and dispatched. Refer to the Executor documentation for descriptions of the available executor types.
Note: If the “eventlet” executor is used, the threading and time libraries need to be monkeypatched.
Notification listeners have start(), stop() and wait() methods to begin handling requests, stop handling requests, and wait for all in-process requests to complete after the listener has been stopped.
To create a notification listener, you supply a transport, a list of targets and a list of endpoints.
A transport can be obtained simply by calling the get_notification_transport() method:
transport = oslo_messaging.get_notification_transport(conf)
which will load the appropriate transport driver according to the user’s messaging configuration. See get_notification_transport() for more details.
A simple example of a notification listener with multiple endpoints might be:
from oslo_config import cfg
import oslo_messaging


class NotificationEndpoint(object):
    # Only notifications whose publisher_id matches the rule reach warn().
    filter_rule = oslo_messaging.NotificationFilter(
        publisher_id='^compute.*')

    def warn(self, ctxt, publisher_id, event_type, payload, metadata):
        # do_something() is a placeholder for application code.
        do_something(payload)


class ErrorEndpoint(object):
    filter_rule = oslo_messaging.NotificationFilter(
        event_type=r'^instance\..*\.start$',
        context={'ctxt_key': 'regexp'})

    def error(self, ctxt, publisher_id, event_type, payload, metadata):
        do_something(payload)


transport = oslo_messaging.get_notification_transport(cfg.CONF)
targets = [
    oslo_messaging.Target(topic='notifications'),
    oslo_messaging.Target(topic='notifications_bis')
]
endpoints = [
    NotificationEndpoint(),
    ErrorEndpoint(),
]
pool = "listener-workers"
server = oslo_messaging.get_notification_listener(transport, targets,
                                                  endpoints, pool=pool)
server.start()
server.wait()
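One possible way to combine start(), stop() and wait() into a clean shutdown is sketched below. The run_until_interrupted() helper and its poll_interval and sleep parameters are hypothetical; the helper only assumes that the object passed in provides the three lifecycle methods described earlier.

```python
import time


def run_until_interrupted(server, poll_interval=1.0, sleep=time.sleep):
    """Start `server`, idle until Ctrl-C, then shut it down cleanly."""
    server.start()
    try:
        while True:
            sleep(poll_interval)
    except KeyboardInterrupt:
        pass
    finally:
        server.stop()  # stop handling new notifications
        server.wait()  # block until in-process notifications complete
```

The sleep argument exists only so the loop can be exercised without real delays; in an application, run_until_interrupted(server) would be enough.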
By supplying a serializer object, a listener can deserialize a request context and arguments from primitive types.
- oslo_messaging.get_notification_listener(transport, targets, endpoints, executor='blocking', serializer=None, allow_requeue=False, pool=None)
Construct a notification listener
The executor parameter controls how incoming messages will be received and dispatched.
If the eventlet executor is used, the threading and time libraries need to be monkeypatched.
- Parameters:
transport (Transport) – the messaging transport
targets (list of Target) – the exchanges and topics to listen on
endpoints (list) – a list of endpoint objects
executor (str) – name of message executor - available values are ‘eventlet’ and ‘threading’
serializer (Serializer) – an optional entity serializer
allow_requeue (bool) – whether NotificationResult.REQUEUE support is needed
pool (str) – the pool name
- Raises:
NotImplementedError
- oslo_messaging.get_batch_notification_listener(transport, targets, endpoints, executor='blocking', serializer=None, allow_requeue=False, pool=None, batch_size=None, batch_timeout=None)
Construct a batch notification listener
The executor parameter controls how incoming messages will be received and dispatched.
If the eventlet executor is used, the threading and time libraries need to be monkeypatched.
- Parameters:
transport (Transport) – the messaging transport
targets (list of Target) – the exchanges and topics to listen on
endpoints (list) – a list of endpoint objects
executor (str) – name of message executor - available values are ‘eventlet’ and ‘threading’
serializer (Serializer) – an optional entity serializer
allow_requeue (bool) – whether NotificationResult.REQUEUE support is needed
pool (str) – the pool name
batch_size (int) – number of messages to wait for before calling endpoint callbacks
batch_timeout (int) – number of seconds to wait before calling endpoint callbacks
- Raises:
NotImplementedError
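The interaction of batch_size and batch_timeout can be modelled with a standard-library queue. This is a sketch, not the library's implementation: collect_batch() is a hypothetical helper, and it assumes that a batch is handed over as soon as either limit is reached, whichever comes first.

```python
import queue
import time


def collect_batch(source, batch_size, batch_timeout):
    """Take up to `batch_size` items, waiting at most `batch_timeout` seconds."""
    batch = []
    deadline = time.monotonic() + batch_timeout
    while len(batch) < batch_size:
        remaining = deadline - time.monotonic()
        if remaining <= 0:
            break
        try:
            batch.append(source.get(timeout=remaining))
        except queue.Empty:
            break
    return batch


incoming = queue.Queue()
for number in range(5):
    incoming.put(number)

full_batch = collect_batch(incoming, batch_size=3, batch_timeout=0.2)
partial_batch = collect_batch(incoming, batch_size=3, batch_timeout=0.2)
```

In this model the first call returns as soon as three messages are available, while the second returns the two remaining messages once the timeout expires.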