Notifications and listeners¶
Overview¶
Engines provide a way to receive notification on task and flow state transitions (see states), which is useful for monitoring, logging, metrics, debugging and plenty of other tasks.
To receive these notifications you should register a callback with
an instance of the Notifier
class that is attached to Engine
attributes atom_notifier
and notifier
.
TaskFlow also comes with a set of predefined listeners, and provides means to write your own listeners, which can be more convenient than using raw callbacks.
Receiving notifications with callbacks¶
Flow notifications¶
To receive notification on flow state changes use the
Notifier
instance available as the
notifier
property of an engine.
A basic example is:
>>> class CatTalk(task.Task):
... def execute(self, meow):
... print(meow)
... return "cat"
...
>>> class DogTalk(task.Task):
... def execute(self, woof):
... print(woof)
... return 'dog'
...
>>> def flow_transition(state, details):
... print("Flow '%s' transition to state %s" % (details['flow_name'], state))
...
>>>
>>> flo = linear_flow.Flow("cat-dog").add(
... CatTalk(), DogTalk(provides="dog"))
>>> eng = engines.load(flo, store={'meow': 'meow', 'woof': 'woof'})
>>> eng.notifier.register(ANY, flow_transition)
>>> eng.run()
Flow 'cat-dog' transition to state RUNNING
meow
woof
Flow 'cat-dog' transition to state SUCCESS
Task notifications¶
To receive notification on task state changes use the
Notifier
instance available as the
atom_notifier
property of an engine.
A basic example is:
>>> class CatTalk(task.Task):
... def execute(self, meow):
... print(meow)
... return "cat"
...
>>> class DogTalk(task.Task):
... def execute(self, woof):
... print(woof)
... return 'dog'
...
>>> def task_transition(state, details):
... print("Task '%s' transition to state %s" % (details['task_name'], state))
...
>>>
>>> flo = linear_flow.Flow("cat-dog")
>>> flo.add(CatTalk(), DogTalk(provides="dog"))
<taskflow.patterns.linear_flow.Flow object at 0x...>
>>> eng = engines.load(flo, store={'meow': 'meow', 'woof': 'woof'})
>>> eng.atom_notifier.register(ANY, task_transition)
>>> eng.run()
Task 'CatTalk' transition to state RUNNING
meow
Task 'CatTalk' transition to state SUCCESS
Task 'DogTalk' transition to state RUNNING
woof
Task 'DogTalk' transition to state SUCCESS
Listeners¶
TaskFlow comes with a set of predefined listeners – helper classes that can be used to do various actions on flow and/or tasks transitions. You can also create your own listeners easily, which may be more convenient than using raw callbacks for some use cases.
For example, this is how you can use
PrintingListener
:
>>> from taskflow.listeners import printing
>>> class CatTalk(task.Task):
... def execute(self, meow):
... print(meow)
... return "cat"
...
>>> class DogTalk(task.Task):
... def execute(self, woof):
... print(woof)
... return 'dog'
...
>>>
>>> flo = linear_flow.Flow("cat-dog").add(
... CatTalk(), DogTalk(provides="dog"))
>>> eng = engines.load(flo, store={'meow': 'meow', 'woof': 'woof'})
>>> with printing.PrintingListener(eng):
... eng.run()
...
<taskflow.engines.action_engine.engine.SerialActionEngine object at ...> has moved flow 'cat-dog' (...) into state 'RUNNING' from state 'PENDING'
<taskflow.engines.action_engine.engine.SerialActionEngine object at ...> has moved task 'CatTalk' (...) into state 'RUNNING' from state 'PENDING'
meow
<taskflow.engines.action_engine.engine.SerialActionEngine object at ...> has moved task 'CatTalk' (...) into state 'SUCCESS' from state 'RUNNING' with result 'cat' (failure=False)
<taskflow.engines.action_engine.engine.SerialActionEngine object at ...> has moved task 'DogTalk' (...) into state 'RUNNING' from state 'PENDING'
woof
<taskflow.engines.action_engine.engine.SerialActionEngine object at ...> has moved task 'DogTalk' (...) into state 'SUCCESS' from state 'RUNNING' with result 'dog' (failure=False)
<taskflow.engines.action_engine.engine.SerialActionEngine object at ...> has moved flow 'cat-dog' (...) into state 'SUCCESS' from state 'RUNNING'
Interfaces¶
- taskflow.listeners.base.FINISH_STATES = ('FAILURE', 'SUCCESS', 'REVERTED', 'REVERT_FAILURE')¶
These states will results be usable, other states do not produce results.
- taskflow.listeners.base.DEFAULT_LISTEN_FOR = ('*',)¶
What is listened for by default…
- class taskflow.listeners.base.Listener(engine, task_listen_for=('*',), flow_listen_for=('*',), retry_listen_for=('*',))[source]¶
Bases:
object
Base class for listeners.
A listener can be attached to an engine to do various actions on flow and atom state transitions. It implements the context manager protocol to be able to register and unregister with a given engine automatically when a context is entered and when it is exited.
To implement a listener, derive from this class and override
_flow_receiver
and/or_task_receiver
and/or_retry_receiver
methods (in this class, they do nothing).
- class taskflow.listeners.base.DumpingListener(engine, task_listen_for=('*',), flow_listen_for=('*',), retry_listen_for=('*',))[source]¶
Bases:
Listener
Abstract base class for dumping listeners.
This provides a simple listener that can be attached to an engine which can be derived from to dump task and/or flow state transitions to some target backend.
To implement your own dumping listener derive from this class and override the
_dump
method.
Implementations¶
Printing and logging listeners¶
- class taskflow.listeners.logging.LoggingListener(engine, task_listen_for=('*',), flow_listen_for=('*',), retry_listen_for=('*',), log=None, level=10)[source]¶
Bases:
DumpingListener
Listener that logs notifications it receives.
It listens for task and flow notifications and writes those notifications to a provided logger, or logger of its module (
taskflow.listeners.logging
) if none is provided (and no class attribute is overridden). The log level can also be configured,logging.DEBUG
is used by default when none is provided.
- class taskflow.listeners.logging.DynamicLoggingListener(engine, task_listen_for=('*',), flow_listen_for=('*',), retry_listen_for=('*',), log=None, failure_level=30, level=10, hide_inputs_outputs_of=(), fail_formatter=None)[source]¶
Bases:
Listener
Listener that logs notifications it receives.
It listens for task and flow notifications and writes those notifications to a provided logger, or logger of its module (
taskflow.listeners.logging
) if none is provided (and no class attribute is overridden). The log level can slightly be configured andlogging.DEBUG
orlogging.WARNING
(unless overridden via a constructor parameter) will be selected automatically based on the execution state and results produced.The following flow states cause
logging.WARNING
(or provided level) to be used:states.FAILURE
states.REVERTED
The following task states cause
logging.WARNING
(or provided level) to be used:states.FAILURE
states.RETRYING
states.REVERTING
states.REVERT_FAILURE
When a task produces a
Failure
object as its result (typically this happens when a task raises an exception) this will always switch the logger to uselogging.WARNING
(if the failure object contains aexc_info
tuple this will also be logged to provide a meaningful traceback).
- class taskflow.listeners.printing.PrintingListener(engine, task_listen_for=('*',), flow_listen_for=('*',), retry_listen_for=('*',), stderr=False)[source]¶
Bases:
DumpingListener
Writes the task and flow notifications messages to stdout or stderr.
Timing listeners¶
- class taskflow.listeners.timing.DurationListener(engine)[source]¶
Bases:
Listener
Listener that captures task duration.
It records how long a task took to execute (or fail) to storage. It saves the duration in seconds as float value to task metadata with key
'duration'
.
- class taskflow.listeners.timing.PrintingDurationListener(engine, printer=None)[source]¶
Bases:
DurationListener
Listener that prints the duration as well as recording it.
- class taskflow.listeners.timing.EventTimeListener(engine, task_listen_for=('*',), flow_listen_for=('*',), retry_listen_for=('*',))[source]¶
Bases:
Listener
Listener that captures task, flow, and retry event timestamps.
It records how when an event is received (using unix time) to storage. It saves the timestamps under keys (in atom or flow details metadata) of the format
{event}-timestamp
whereevent
is the state/event name that has been received.This information can be later extracted/examined to derive durations…
Claim listener¶
- class taskflow.listeners.claims.CheckingClaimListener(engine, job, board, owner, on_job_loss=None)[source]¶
Bases:
Listener
Listener that interacts [engine, job, jobboard]; ensures claim is valid.
This listener (or a derivative) can be associated with an engines notification system after the job has been claimed (so that the jobs work can be worked on by that engine). This listener (after associated) will check that the job is still claimed whenever the engine notifies of a task or flow state change. If the job is not claimed when a state change occurs, a associated handler (or the default) will be activated to determine how to react to this hopefully exceptional case.
NOTE(harlowja): this may create more traffic than desired to the jobboard backend (zookeeper or other), since the amount of state change per task and flow is non-zero (and checking during each state change will result in quite a few calls to that management system to check the jobs claim status); this could be later optimized to check less (or only check on a smaller set of states)
NOTE(harlowja): if a custom
on_job_loss
callback is provided it must accept three positional arguments, the first being the current engine being ran, the second being the ‘task/flow’ state and the third being the details that were sent from the engine to listeners for inspection.
Capturing listener¶
- class taskflow.listeners.capturing.CaptureListener(engine, task_listen_for=('*',), flow_listen_for=('*',), retry_listen_for=('*',), capture_flow=True, capture_task=True, capture_retry=True, skip_tasks=None, skip_retries=None, skip_flows=None, values=None)[source]¶
Bases:
Listener
A listener that captures transitions and saves them locally.
NOTE(harlowja): this listener is mainly useful for testing (where it is useful to test the appropriate/expected transitions, produced results… occurred after engine running) but it could have other usages as well.
- Variables:
values – Captured transitions + details (the result of the
_format_capture()
method) are stored into this list (a previous list to append to may be provided using the constructor keyword argument of the same name); by default this stores tuples of the format(kind, state, details)
.
- FLOW = 'flow'¶
Kind that denotes a ‘flow’ capture.
- TASK = 'task'¶
Kind that denotes a ‘task’ capture.
- RETRY = 'retry'¶
Kind that denotes a ‘retry’ capture.
Formatters¶
- class taskflow.formatters.FailureFormatter(engine, hide_inputs_outputs_of=())[source]¶
Bases:
object
Formats a failure and connects it to associated atoms & engine.
- format(fail, atom_matcher)[source]¶
Returns a (exc_info, details) tuple about the failure.
The
exc_info
tuple should be a standard three element (exctype, value, traceback) tuple that will be used for further logging. A non-empty string is typically returned fordetails
; it should contain any string info about the failure (with any specific details theexc_info
may not have/contain).