This engine schedules tasks to workers: separate processes dedicated to executing certain atoms, possibly running on other machines, connected via AMQP (or other supported kombu transports).
Note
This engine is under active development. It is usable and works, but it is still missing some features that would make it more production ready (please check the blueprint page for known issues and plans).
There are two communicating sides, the executor (and its associated engine derivative) and the worker, which communicate using a proxy component. The proxy is designed to accept messages from, and publish messages into, a named exchange.
Let’s consider how communication between an executor and a worker happens. First, the engine resolves all atom dependencies and schedules the atoms that can be performed at the moment, using the same scheduling and dependency resolution logic as every other engine type. The atoms that can be executed immediately (atoms that depend on the outputs of other tasks are executed once those outputs are ready) are then executed by the worker-based engine executor in the following manner:
Proxy publishes the task request (the format is described below) into a named exchange, using a routing key that delivers the request to a particular worker's topic. The executor then waits for the task request to be accepted and confirmed by a worker. If the executor doesn’t get a task confirmation from workers within the given timeout, the task is considered timed out and a timeout exception is raised.
If a worker cannot process a request, it responds with a failure object that contains what has failed (and why).
Once a worker confirms the request, the task request moves from the PENDING to the RUNNING state. Once a task request is in the RUNNING state it can’t be timed out (considering that the task execution process may take an unpredictable amount of time).
Note
Failure objects are not directly JSON-serializable (they contain references to tracebacks, which are not serializable), so they are converted to dicts before sending and converted from dicts after receiving, on both the executor and worker sides. This translation is lossy since the traceback can’t be fully retained, due to its contents containing internal interpreter references and details.
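A minimal sketch of why this translation is lossy, using only the standard library. The helper name `exception_to_dict` is hypothetical and is not TaskFlow's API; the dictionary keys simply mirror the ones that appear in the message examples further below.

```python
import json
import traceback


def exception_to_dict(exc):
    """Flatten an exception into JSON-friendly data (hypothetical helper)."""
    # Record the class hierarchy by name, since classes are not serializable.
    names = [cls.__name__ for cls in type(exc).__mro__
             if issubclass(cls, Exception)]
    return {
        "exc_type_names": names,
        "exception_str": str(exc),
        # Only the formatted text of the traceback survives the trip.
        "traceback_str": "".join(traceback.format_tb(exc.__traceback__)),
        "version": 1,
    }


try:
    raise RuntimeError("Woot!")
except RuntimeError as caught:
    payload = json.dumps(exception_to_dict(caught))

# The receiver gets plain data back, not a live exception or traceback.
restored = json.loads(payload)
```

After the round trip `restored` is an ordinary dictionary: the exception type names and message are available as strings, but the original traceback object and its frames are gone.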
taskflow.engines.worker_based.protocol.make_an_event(new_state)
Turns a new/target state into an event name.
taskflow.engines.worker_based.protocol.build_a_machine(freeze=True)
Builds a state machine that requests are allowed to go through.
taskflow.engines.worker_based.protocol.failure_to_dict(failure)
Attempts to convert a failure object into a jsonifyable dictionary.
taskflow.engines.worker_based.protocol.Message
Bases: object
Base class for all message types.
taskflow.engines.worker_based.protocol.Notify(**data)
Bases: taskflow.engines.worker_based.protocol.Message
Represents notify message type.
TYPE = 'NOTIFY'
String constant representing this message type.
RESPONSE_SCHEMA = {'additionalProperties': False, 'properties': {'topic': {'type': 'string'}, 'tasks': {'items': {'type': 'string'}, 'type': 'array'}}, 'required': ['topic', 'tasks'], 'type': 'object'}
Expected notify response message schema (in json schema format).
SENDER_SCHEMA = {'additionalProperties': False, 'type': 'object'}
Expected sender request message schema (in json schema format).
taskflow.engines.worker_based.protocol.Request(task, uuid, action, arguments, timeout=60, result=<object object>, failures=None)
Bases: taskflow.engines.worker_based.protocol.Message
Represents request with execution results.
Every request is created in the WAITING state and is expired within the given timeout if it does not transition out of the (WAITING, PENDING) states.
State machine a request goes through as it progresses (or expires):
+------------+------------+---------+----------+---------+
| Start | Event | End | On Enter | On Exit |
+------------+------------+---------+----------+---------+
| FAILURE[$] | . | . | . | . |
| PENDING | on_failure | FAILURE | . | . |
| PENDING | on_running | RUNNING | . | . |
| RUNNING | on_failure | FAILURE | . | . |
| RUNNING | on_success | SUCCESS | . | . |
| SUCCESS[$] | . | . | . | . |
| WAITING[^] | on_failure | FAILURE | . | . |
| WAITING[^] | on_pending | PENDING | . | . |
+------------+------------+---------+----------+---------+
TYPE = 'REQUEST'
String constant representing this message type.
SCHEMA = {'properties': {'task_name': {'type': 'string'}, 'result': {}, 'task_cls': {'type': 'string'}, 'task_version': {'oneOf': [{'type': 'string'}, {'type': 'array'}]}, 'action': {'enum': ['execute', 'revert'], 'type': 'string'}, 'arguments': {'type': 'object'}, 'failures': {'type': 'object'}}, 'required': ['task_cls', 'task_name', 'task_version', 'action'], 'type': 'object'}
Expected message schema (in json schema format).
current_state
Current state the request is in.
expired
Check if request has expired.
When a new request is created its state is set to WAITING, its creation time is stored, and its timeout is given via constructor arguments.
A request is considered expired when it has been in the WAITING/PENDING state for more than the given timeout (it is not considered expired in any other state).
to_dict()
Return json-serializable request.
To convert requests that have failed due to some exception, this converts all failure.Failure objects into dictionaries (which are then reconstituted by the receiver).
transition_and_log_error(new_state, logger=None)
Transitions and logs an error if that transitioning raises.
This overlays the transition function and performs nearly the same functionality, but instead of raising if the transition was not valid it logs a warning to the provided logger and returns False to indicate that the transition was not performed (note that this is different from the transition function, where False means ignored).
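The state table and the raising versus logging behaviour described for Request can be pictured with a small stand-alone sketch. `RequestSketch` and `TRANSITIONS` are hypothetical names and this is not the TaskFlow implementation; in particular, deriving the event name as `on_` plus the lowercased state is an assumption based on the event names in the table, and the sketch raises `ValueError` for every invalid transition rather than distinguishing ignored ones.

```python
import logging

# Allowed (start state, event) -> end state pairs, copied from the state table.
TRANSITIONS = {
    ("WAITING", "on_pending"): "PENDING",
    ("WAITING", "on_failure"): "FAILURE",
    ("PENDING", "on_running"): "RUNNING",
    ("PENDING", "on_failure"): "FAILURE",
    ("RUNNING", "on_success"): "SUCCESS",
    ("RUNNING", "on_failure"): "FAILURE",
}

LOG = logging.getLogger(__name__)


class RequestSketch:
    """Hypothetical stand-in for a request; not the TaskFlow class."""

    def __init__(self):
        self.current_state = "WAITING"  # every request starts here

    def transition(self, new_state):
        """Move to new_state, raising ValueError if the table forbids it."""
        event = "on_" + new_state.lower()
        key = (self.current_state, event)
        if key not in TRANSITIONS:
            raise ValueError("%s -> %s is not allowed"
                             % (self.current_state, new_state))
        self.current_state = TRANSITIONS[key]
        return True

    def transition_and_log_error(self, new_state, logger=LOG):
        """Like transition(), but log a warning and return False on error."""
        try:
            return self.transition(new_state)
        except ValueError:
            logger.warning("Ignoring invalid transition to %s", new_state)
            return False
```

Keeping the table as plain data makes the terminal states (SUCCESS and FAILURE) fall out naturally: no key starts from them, so every later transition is refused.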
taskflow.engines.worker_based.protocol.Response(state, **data)
Bases: taskflow.engines.worker_based.protocol.Message
Represents response message type.
TYPE = 'RESPONSE'
String constant representing this message type.
SCHEMA = {'additionalProperties': False, 'definitions': {'completion': {'additionalProperties': False, 'properties': {'result': {}}, 'type': 'object', 'required': ['result']}, 'event': {'additionalProperties': False, 'properties': {'event_type': {'type': 'string'}, 'details': {'type': 'object'}}, 'type': 'object', 'required': ['event_type', 'details']}, 'empty': {'additionalProperties': False, 'type': 'object'}}, 'properties': {'state': {'enum': ['WAITING', 'PENDING', 'RUNNING', 'SUCCESS', 'FAILURE', 'EVENT'], 'type': 'string'}, 'data': {'anyOf': [{'$ref': '#/definitions/event'}, {'$ref': '#/definitions/completion'}, {'$ref': '#/definitions/empty'}]}}, 'required': ['state', 'data'], 'type': 'object'}
Expected message schema (in json schema format).
result - task execution result (result or Failure) [passed to revert only]
Additionally, the following parameters are added to the request message:
Example:
{
    "action": "execute",
    "arguments": {
        "x": 111
    },
    "task_cls": "taskflow.tests.utils.TaskOneArgOneReturn",
    "task_name": "taskflow.tests.utils.TaskOneArgOneReturn",
    "task_version": [
        1,
        0
    ]
}
When reverting:
{
    "action": "revert",
    "arguments": {},
    "failures": {
        "taskflow.tests.utils.TaskWithFailure": {
            "exc_type_names": [
                "RuntimeError",
                "StandardError",
                "Exception"
            ],
            "exception_str": "Woot!",
            "traceback_str": " File \"/homes/harlowja/dev/os/taskflow/taskflow/engines/action_engine/executor.py\", line 56, in _execute_task\n result = task.execute(**arguments)\n File \"/homes/harlowja/dev/os/taskflow/taskflow/tests/utils.py\", line 165, in execute\n raise RuntimeError('Woot!')\n",
            "version": 1
        }
    },
    "result": [
        "failure",
        {
            "exc_type_names": [
                "RuntimeError",
                "StandardError",
                "Exception"
            ],
            "exception_str": "Woot!",
            "traceback_str": " File \"/homes/harlowja/dev/os/taskflow/taskflow/engines/action_engine/executor.py\", line 56, in _execute_task\n result = task.execute(**arguments)\n File \"/homes/harlowja/dev/os/taskflow/taskflow/tests/utils.py\", line 165, in execute\n raise RuntimeError('Woot!')\n",
            "version": 1
        }
    ],
    "task_cls": "taskflow.tests.utils.TaskWithFailure",
    "task_name": "taskflow.tests.utils.TaskWithFailure",
    "task_version": [
        1,
        0
    ]
}
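As a rough illustration of how the request examples relate to the Request SCHEMA, the following sketch assembles an execute request and checks it for the schema's required keys. `build_request` and `missing_keys` are hypothetical helpers written for this example, not functions provided by TaskFlow.

```python
import json

# Required keys and allowed actions, taken from the Request SCHEMA.
REQUIRED_KEYS = ("task_cls", "task_name", "task_version", "action")
ALLOWED_ACTIONS = ("execute", "revert")


def build_request(task_cls, task_name, task_version, action, arguments=None):
    """Assemble a request payload (hypothetical helper, not TaskFlow API)."""
    if action not in ALLOWED_ACTIONS:
        raise ValueError("unsupported action: %r" % (action,))
    return {
        "task_cls": task_cls,
        "task_name": task_name,
        "task_version": list(task_version),
        "action": action,
        "arguments": dict(arguments or {}),
    }


def missing_keys(message):
    """Return the required keys that are absent from a decoded message."""
    return [key for key in REQUIRED_KEYS if key not in message]


request = build_request(
    "taskflow.tests.utils.TaskOneArgOneReturn",
    "taskflow.tests.utils.TaskOneArgOneReturn",
    (1, 0), "execute", {"x": 111})
# Serialize the payload the way it would travel over the message bus.
wire = json.dumps(request, sort_keys=True)
```

A receiver could call `missing_keys(json.loads(wire))` before doing any work and reject the message when the returned list is not empty.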
When running:
{
    "data": {},
    "state": "RUNNING"
}
When progressing:
{
    "details": {
        "progress": 0.5
    },
    "event_type": "update_progress",
    "state": "EVENT"
}
When succeeded:
{
    "data": {
        "result": 666
    },
    "state": "SUCCESS"
}
When failed:
{
    "data": {
        "result": {
            "exc_type_names": [
                "RuntimeError",
                "StandardError",
                "Exception"
            ],
            "exception_str": "Woot!",
            "traceback_str": " File \"/homes/harlowja/dev/os/taskflow/taskflow/engines/action_engine/executor.py\", line 56, in _execute_task\n result = task.execute(**arguments)\n File \"/homes/harlowja/dev/os/taskflow/taskflow/tests/utils.py\", line 165, in execute\n raise RuntimeError('Woot!')\n",
            "version": 1
        }
    },
    "state": "FAILURE"
}
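The four response shapes above can be told apart by their `state` field. The sketch below shows one way a receiver might branch on it; `summarize_response` is a hypothetical helper, and it reads the event fields at the top level of the message only because that is how the progress example above is laid out.

```python
import json


def summarize_response(raw):
    """Describe a response message (hypothetical helper, not TaskFlow API)."""
    message = json.loads(raw)
    state = message["state"]
    if state == "EVENT":
        # Progress notifications carry an event type plus free-form details.
        return "event %s: %s" % (message["event_type"], message["details"])
    if state == "SUCCESS":
        return "finished with result %r" % (message["data"]["result"],)
    if state == "FAILURE":
        failure = message["data"]["result"]
        return "failed with %s: %s" % (failure["exc_type_names"][0],
                                       failure["exception_str"])
    # RUNNING (and any other state) carries no payload worth reporting.
    return "now %s" % state
```

For instance, passing the "When running" example as a JSON string yields a short status line, while the failure example is reduced to the first exception type name and its message.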
WAITING - Request placed on queue (or other kombu message bus/transport) but not yet consumed.
PENDING - Worker accepted the request and it is pending to run using the worker's executor (threads, processes, or other).
FAILURE - Worker failed after running the request (due to a task exception), or no worker started executing it (by placing the request into the RUNNING state) within the specified time span (this defaults to 60 seconds unless overridden).
RUNNING - The worker's executor (using threads, processes…) has started to run the requested task. Once this state is reached the request timeout no longer applies, since at this point it cannot be determined whether a task is just taking a long time or has failed.
SUCCESS - Worker finished running the task without exception.
Note
During the WAITING and PENDING stages the engine keeps track of how long the request has been alive, and if a timeout is reached the request will automatically transition to FAILURE and any further transitions from a worker will be disallowed (for example, if a worker accepts the request in the future and sets the task to PENDING, this transition will be logged and ignored). This timeout can be adjusted and/or removed by setting the engine transition_timeout option to a higher/lower value or by setting it to None (to remove the timeout completely). In the future this will be improved to be more dynamic by implementing the blueprints associated with failover and info/resilience.
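The timeout rule in this note can be sketched as a small expiry check. `ExpiryTracker` is a hypothetical class written for illustration; the injectable `clock` argument is an assumption added so the behaviour can be exercised without waiting, and is not something the engine is documented to accept.

```python
import time


class ExpiryTracker:
    """Hypothetical helper tracking whether a request has timed out."""

    # Only requests that no worker has started running can expire.
    EXPIRABLE_STATES = ("WAITING", "PENDING")

    def __init__(self, timeout=60, clock=time.monotonic):
        self.timeout = timeout  # None disables expiry entirely
        self.state = "WAITING"
        self._clock = clock
        self._created_on = clock()

    @property
    def expired(self):
        """True when the request outlived its timeout before running."""
        if self.timeout is None or self.state not in self.EXPIRABLE_STATES:
            return False
        return (self._clock() - self._created_on) > self.timeout
```

Setting `timeout=None` mirrors removing the timeout completely, and moving `state` to RUNNING makes `expired` permanently false regardless of how much time passes.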
To use the worker based engine, a set of workers must first be established on remote machines. These workers must be provided a list of task objects, task names, or module names (or entrypoints that can be examined for valid tasks) that they can respond to (this is done so that arbitrary code execution is not possible).
For complete parameters and object usage please visit Worker.
Example:
from taskflow.engines.worker_based import worker as w

# Connection details and the tasks this worker is allowed to run.
config = {
    'url': 'amqp://guest:guest@localhost:5672//',
    'exchange': 'test-exchange',
    'topic': 'test-tasks',
    'tasks': ['tasks:TestTask1', 'tasks:TestTask2'],
}
worker = w.Worker(**config)
worker.run()
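The reason workers are given an explicit list of tasks can be pictured as a registry lookup: a request can only name something the worker was told about. This is a stand-alone sketch with hypothetical names (`REGISTRY`, `run_requested`, and the stub `TestTask1` class); it does not show how the worker itself resolves tasks.

```python
class TestTask1:
    """Stub task standing in for a real task class."""

    def execute(self, **arguments):
        return "ran TestTask1"


# Only classes registered here can ever be instantiated by a request.
REGISTRY = {"tasks:TestTask1": TestTask1}


def run_requested(task_name, arguments):
    """Run a task by name, refusing anything outside the registry."""
    try:
        task_cls = REGISTRY[task_name]
    except KeyError:
        raise LookupError("no endpoint for %r" % (task_name,)) from None
    return task_cls().execute(**arguments)
```

Because the lookup never imports or evaluates the requested name, a message asking for an unregistered callable is rejected instead of executed.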
To use the worker based engine a flow must be constructed (which contains tasks that are visible on remote machines) and the specific worker based engine entrypoint must be selected. Certain configuration options must also be provided so that the transport backend can be configured and initialized correctly. Otherwise the usage should be mostly transparent (and is nearly identical to using any other engine type).
For complete parameters and object usage please see WorkerBasedActionEngine.
Example with amqp transport:
import taskflow.engines
from taskflow.patterns import linear_flow as lf

flow = lf.Flow('simple-linear').add(...)
eng = taskflow.engines.load(flow, engine='worker-based',
                            url='amqp://guest:guest@localhost:5672//',
                            exchange='test-exchange',
                            topics=['topic1', 'topic2'])
eng.run()
Example with filesystem transport:
import taskflow.engines
from taskflow.patterns import linear_flow as lf

flow = lf.Flow('simple-linear').add(...)
eng = taskflow.engines.load(flow, engine='worker-based',
                            exchange='test-exchange',
                            topics=['topic1', 'topic2'],
                            transport='filesystem',
                            transport_options={
                                'data_folder_in': '/tmp/in',
                                'data_folder_out': '/tmp/out',
                            })
eng.run()
Additional supported keyword arguments:
executor: a class that provides a WorkerTaskExecutor interface; it will be used for executing, reverting and waiting for remote tasks.
taskflow.engines.worker_based.engine.WorkerBasedActionEngine(flow, flow_detail, backend, options)
Bases: taskflow.engines.action_engine.engine.ActionEngine
Worker based action engine.
Specific backend options are extracted from the provided engine options.
Warning
External usage of internal engine functions, components and modules should be kept to a minimum as they may be altered, refactored or moved to other locations without notice (and without the typical deprecation cycle).
taskflow.engines.worker_based.dispatcher.Handler(process_message, validator=None)
Bases: object
Component(s) that will be called on reception of messages.
process_message
Main callback that is called to process a received message.
This is only called after the format has been validated (using the validator callback if applicable) and only after the message has been acknowledged.
validator
Optional callback that will be activated before processing.
This callback, if present, is expected to validate the message and raise InvalidFormat if the message is not valid.
taskflow.engines.worker_based.dispatcher.TypeDispatcher(type_handlers=None, requeue_filters=None)
Bases: object
Receives messages and dispatches to type specific handlers.
type_handlers
Dictionary of message type -> callback to handle that message.
The callback(s) will be activated by looking for a message property ‘type’ and locating a callback in this dictionary that maps to that type. If one is found, it is expected to be a callback that accepts two positional parameters: the first being the message data and the second being the message object. If a callback is not found then the message is rejected, and it is up to the underlying message transport to determine what this means/implies…
requeue_filters
List of filters (callbacks) to request a message to be requeued.
The callback(s) will be activated before the message has been acked and can be used to instruct the dispatcher to requeue the message instead of processing it. Each callback, when called, will be provided two positional parameters: the first being the message data and the second being the message object. Using these parameters the filter should return a truthy object if the message should be requeued and a falsey object if it should not.
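The ordering described for TypeDispatcher (requeue filters first, then a lookup by message type, then rejection when nothing matches) can be sketched in memory. `DispatcherSketch` is hypothetical, and it models the message object as a plain dictionary with a `type` key, which is a simplification of a real transport message.

```python
class DispatcherSketch:
    """Hypothetical, in-memory illustration of type-based dispatch."""

    def __init__(self, type_handlers=None, requeue_filters=None):
        self.type_handlers = dict(type_handlers or {})
        self.requeue_filters = list(requeue_filters or [])

    def on_message(self, data, message):
        """Return what happened: 'requeued', 'rejected' or 'processed'."""
        # Filters run first, before the message would be acknowledged.
        if any(check(data, message) for check in self.requeue_filters):
            return "requeued"
        handler = self.type_handlers.get(message.get("type"))
        if handler is None:
            # No handler for this type, so the message is rejected.
            return "rejected"
        handler(data, message)
        return "processed"
```

Both the filters and the handlers receive the same two positional arguments, the message data followed by the message object, which matches the calling convention described above.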
taskflow.engines.worker_based.endpoint.Endpoint(task_cls)
Bases: object
Represents a single task with execute/revert methods.
taskflow.engines.worker_based.executor.WorkerTaskExecutor(uuid, exchange, topics, transition_timeout=60, url=None, transport=None, transport_options=None, retry_options=None, worker_expiry=60)
Bases: taskflow.engines.action_engine.executor.TaskExecutor
Executes tasks on remote workers.
revert_task(task, task_uuid, arguments, result, failures, progress_callback=None)
Schedules task reversion.
wait_for_workers(workers=1, timeout=None)
Waits for at least the given number of workers to notify that they are ready to do work.
NOTE(harlowja): if a timeout is provided, this function waits until that timeout expires. If the number of workers does not reach the desired number before the timeout expires, it returns how many workers are still needed; otherwise it returns zero.
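The return convention of wait_for_workers (zero on success, otherwise the number of workers still missing) can be sketched with a condition variable. `WorkerCounter` and its `worker_seen` method are hypothetical and only illustrate the contract; they say nothing about how the executor actually tracks workers.

```python
import threading
import time


class WorkerCounter:
    """Hypothetical counter of workers that have announced themselves."""

    def __init__(self):
        self._cond = threading.Condition()
        self._known = 0

    def worker_seen(self):
        """Record one more ready worker and wake up any waiters."""
        with self._cond:
            self._known += 1
            self._cond.notify_all()

    def wait_for_workers(self, workers=1, timeout=None):
        """Block until enough workers exist; return how many are missing."""
        deadline = None if timeout is None else time.monotonic() + timeout
        with self._cond:
            while self._known < workers:
                remaining = None
                if deadline is not None:
                    remaining = deadline - time.monotonic()
                    if remaining <= 0:
                        # Timed out: report the shortfall instead of raising.
                        return workers - self._known
                self._cond.wait(remaining)
            return 0
```

A caller can treat any non-zero return value as "not enough workers yet" and decide whether to wait again or give up.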
taskflow.engines.worker_based.proxy.Proxy(topic, exchange, type_handlers=None, on_wait=None, url=None, transport=None, transport_options=None, retry_options=None)
Bases: object
A proxy processes messages from/to the named exchange.
For internal usage only (not for public consumption).
DEFAULT_RETRY_OPTIONS = {'interval_max': 1, 'interval_start': 1, 'interval_step': 1, 'max_retries': 3}
Settings used (by default) to reconnect under transient failures.
See: http://kombu.readthedocs.org/ (and connection ensure_options) for what these values imply/mean…
dispatcher
Dispatcher internally used to dispatch message(s) that match.
connection_details
Details about the connection (read-only).
is_running
Return whether the proxy is running.
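To get a feel for what the DEFAULT_RETRY_OPTIONS values mean, the sketch below computes a sleep schedule under a simplified reading of the four options: start at `interval_start`, add `interval_step` per retry, never exceed `interval_max`, and stop after `max_retries`. `retry_intervals` is a hypothetical helper, and kombu's exact schedule may differ from this approximation, so the kombu documentation remains the reference.

```python
def retry_intervals(interval_start, interval_step, interval_max, max_retries):
    """List the sleep (in seconds) before each retry, capped at interval_max."""
    return [min(interval_start + attempt * interval_step, interval_max)
            for attempt in range(max_retries)]


# Values copied from Proxy.DEFAULT_RETRY_OPTIONS.
DEFAULT_RETRY_OPTIONS = {
    "interval_max": 1, "interval_start": 1,
    "interval_step": 1, "max_retries": 3,
}

default_schedule = retry_intervals(**DEFAULT_RETRY_OPTIONS)
```

Under this reading the defaults amount to three retries spaced about one second apart, since the cap equals the starting interval.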
taskflow.engines.worker_based.worker.Worker(exchange, topic, tasks, executor=None, threads_count=None, url=None, transport=None, transport_options=None, retry_options=None)
Bases: object
Worker that can be started on a remote host for handling task requests.
A banner that can be useful to display before running.
taskflow.engines.worker_based.types.TopicWorker(topic, tasks, identity=<object object>)
Bases: object
A (read-only) worker and its relevant information + useful methods.
taskflow.engines.worker_based.types.ProxyWorkerFinder(uuid, proxy, topics, beat_periodicity=5, worker_expiry=60)
Bases: object
Requests and receives responses about workers topic+task details.
total_workers
Number of workers currently known.
wait_for_workers(workers=1, timeout=None)
Waits for at least the given number of workers to notify that they are ready to do work.
NOTE(harlowja): if a timeout is provided, this function waits until that timeout expires. If the number of workers does not reach the desired number before the timeout expires, it returns how many workers are still needed; otherwise it returns zero.
messages_processed
How many notify response messages have been processed.
maybe_publish()
Periodically called to publish notify message to each topic.
These messages (especially the responses) are how this finder learns about workers and what tasks they can perform (so that we can then match workers to tasks to run).
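How notify responses could be turned into a worker-to-task mapping is sketched below. `FinderSketch` is hypothetical: it keys workers by topic (so it assumes one worker per topic), and its treatment of `worker_expiry` as "ignore workers not heard from within that many seconds" is an assumption based on the parameter name, not documented behaviour.

```python
import time


class FinderSketch:
    """Hypothetical registry built from notify responses."""

    def __init__(self, worker_expiry=60, clock=time.monotonic):
        self._worker_expiry = worker_expiry
        self._clock = clock
        self._workers = {}  # topic -> (set of task names, last seen time)
        self.messages_processed = 0

    def on_notify_response(self, response):
        """Record the topic and tasks a worker advertised."""
        self._workers[response["topic"]] = (set(response["tasks"]),
                                            self._clock())
        self.messages_processed += 1

    @property
    def total_workers(self):
        return len(self._workers)

    def topics_for(self, task_name):
        """Return topics of non-expired workers able to run task_name."""
        now = self._clock()
        return sorted(
            topic for topic, (tasks, seen) in self._workers.items()
            if task_name in tasks and now - seen <= self._worker_expiry)
```

The response dictionaries passed to `on_notify_response` use the `topic` and `tasks` keys required by the Notify RESPONSE_SCHEMA shown earlier.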
Except where otherwise noted, this document is licensed under the Creative Commons Attribution 3.0 License.