mistral.engine package
Submodules
mistral.engine.action_handler module
mistral.engine.actions module
-
class mistral.engine.actions.Action(action_def, action_ex=None, task_ex=None)
Bases: object
Action.
Represents a workflow action and defines the interface that the
Mistral engine and its components use to manipulate actions.
-
complete(result)
Complete action and process its result.
Parameters: | result – Action result. |
-
fail(msg)
-
is_sync(input_dict)
Determines if action is synchronous.
Parameters: | input_dict – Dictionary with input parameters. |
-
run(input_dict, target, index=0, desc='', save=True, safe_rerun=False)
Immediately run action.
This method runs the action without scheduling it for a later time.
From the engine's perspective, the action is processed in synchronous
mode.
Parameters: |
- input_dict – Action input.
- target – Target (group of action executors).
- index – Action execution index. Makes sense for some types.
- desc – Action execution description.
- save – True if action execution object needs to be saved.
- safe_rerun – If True, the action is re-run if the executor dies
during execution.
|
Returns: | Action output.
|
-
schedule(input_dict, target, index=0, desc='', safe_rerun=False)
Schedule action run.
This method schedules an action run so that its result can
be received later by the engine. In this sense the action runs in
asynchronous mode from the engine's perspective (not to be confused with
executor asynchrony, where the executor doesn’t immediately send a
result).
Parameters: |
- input_dict – Action input.
- target – Target (group of action executors).
- index – Action execution index. Makes sense for some types.
- desc – Action execution description.
- safe_rerun – If True, the action is re-run if the executor dies
during execution.
|
-
validate_input(input_dict)
Validates action input parameters.
Parameters: | input_dict – Dictionary with input parameters. |
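The difference between run and schedule described above can be illustrated with a small stand-in class. This is only a sketch, not the real mistral.engine.actions.Action: the class name SketchAction, the wrapped callable, and the in-memory pending list are hypothetical, and only the method names and parameters follow the signatures documented here.

```python
class SketchAction:
    """Hypothetical stand-in that mirrors the documented Action interface."""

    def __init__(self, name, func):
        self.name = name
        self._func = func      # callable doing the actual work (assumption)
        self.pending = []      # scheduled runs waiting for an executor
        self.result = None

    def validate_input(self, input_dict):
        if not isinstance(input_dict, dict):
            raise ValueError("action input must be a dictionary")

    def run(self, input_dict, target, index=0, desc='', save=True,
            safe_rerun=False):
        # Synchronous from the engine's perspective: output is returned now.
        self.validate_input(input_dict)
        return self._func(**input_dict)

    def schedule(self, input_dict, target, index=0, desc='',
                 safe_rerun=False):
        # Asynchronous from the engine's perspective: only record the request.
        self.validate_input(input_dict)
        self.pending.append((input_dict, target, index))

    def complete(self, result):
        # Called later, once an executor has reported the result.
        self.result = result


action = SketchAction("add", lambda a, b: a + b)

# Immediate run: the caller gets the output directly.
sync_output = action.run({"a": 1, "b": 2}, target=None)

# Scheduled run: something else picks up the request and completes it later.
action.schedule({"a": 3, "b": 4}, target="executor-group-1")
queued_input, _, _ = action.pending.pop(0)
action.complete(action._func(**queued_input))
```

In the sketch, run returns a value to the caller, while schedule returns nothing and the result only becomes available through complete.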
-
class mistral.engine.actions.AdHocAction(action_def, action_ex=None, task_ex=None)
Bases: mistral.engine.actions.PythonAction
Ad-hoc action.
-
validate_input(input_dict)
-
class mistral.engine.actions.PythonAction(action_def, action_ex=None, task_ex=None)
Bases: mistral.engine.actions.Action
Regular Python action.
-
complete(*args, **kwargs)
-
is_sync(input_dict)
-
run(*args, **kwargs)
-
schedule(*args, **kwargs)
-
validate_input(input_dict)
-
class mistral.engine.actions.WorkflowAction(action_def, action_ex=None, task_ex=None)
Bases: mistral.engine.actions.Action
Workflow action.
-
complete(*args, **kwargs)
-
is_sync(input_dict)
-
run(*args, **kwargs)
-
schedule(*args, **kwargs)
-
validate_input(input_dict)
mistral.engine.base module
-
class mistral.engine.base.Engine
Bases: object
Engine interface.
-
on_action_complete(action_ex_id, result, wf_action=False, async=False)
Accepts action result and continues the workflow.
Action execution result here is a result which comes from an
action/workflow associated with the task.
Parameters: |
- action_ex_id – Action execution id.
- result – Action/workflow result. Instance of
mistral.workflow.base.Result.
- wf_action – If True, the given id points to
a workflow execution rather than an action execution. This happens
when a nested workflow execution sends its result to a parent
workflow.
- async – If True, run action in asynchronous mode (w/o waiting
for completion).
|
Returns: | Action (or workflow, if wf_action=True) execution object.
|
-
pause_workflow(wf_ex_id)
Pauses workflow.
Parameters: | wf_ex_id – Execution id. |
Returns: | Workflow execution object. |
-
rerun_workflow(task_ex_id, reset=True, env=None)
Rerun workflow from the specified task.
Parameters: |
- task_ex_id – Task execution id.
- reset – If True, reset task state including deleting its action
executions.
- env – Workflow environment.
|
Returns: | Workflow execution object.
|
-
resume_workflow(wf_ex_id, env=None)
Resumes workflow.
Parameters: |
- wf_ex_id – Execution id.
- env – Workflow environment.
|
Returns: | Workflow execution object.
|
-
rollback_workflow(wf_ex_id)
Rolls back workflow execution.
Parameters: | wf_ex_id – Execution id. |
Returns: | Workflow execution object. |
-
start_action(action_name, action_input, description=None, **params)
Starts the specific action.
Parameters: |
- action_name – Action name.
- action_input – Action input data as a dictionary.
- description – Execution description.
- params – Additional options for action running.
|
Returns: | Action execution object.
|
-
start_workflow(wf_identifier, wf_input, description='', **params)
Starts the specified workflow.
Parameters: |
- wf_identifier – Workflow ID or name. Workflow ID is recommended;
use of the workflow name is slated for deprecation as of Mitaka.
- wf_input – Workflow input data as a dictionary.
- description – Execution description.
- params – Additional workflow type specific parameters.
|
Returns: | Workflow execution object.
|
-
stop_workflow(wf_ex_id, state, message)
Stops workflow.
Parameters: |
- wf_ex_id – Workflow execution id.
- state – State assigned to the workflow. Permitted states are
SUCCESS or ERROR.
- message – Optional information string.
|
Returns: | Workflow execution.
|
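As a rough sketch of how a caller might drive the workflow lifecycle methods listed above, the following in-memory stand-in follows the documented method names and parameters. It is not the Mistral implementation: the class InMemoryEngine, the dictionary-based execution objects, and the RUNNING and PAUSED state names are assumptions made for illustration. The on_action_complete method is omitted because its documented parameter name async is a reserved keyword from Python 3.7 onward.

```python
import uuid


class InMemoryEngine:
    """Hypothetical stand-in following the documented Engine method names."""

    def __init__(self):
        self.executions = {}

    def start_workflow(self, wf_identifier, wf_input, description='',
                       **params):
        wf_ex = {
            "id": str(uuid.uuid4()),
            "workflow": wf_identifier,
            "input": wf_input,
            "description": description,
            "params": params,
            "state": "RUNNING",       # state name is an assumption
            "state_info": None,
        }
        self.executions[wf_ex["id"]] = wf_ex
        return wf_ex

    def pause_workflow(self, wf_ex_id):
        wf_ex = self.executions[wf_ex_id]
        wf_ex["state"] = "PAUSED"     # state name is an assumption
        return wf_ex

    def resume_workflow(self, wf_ex_id, env=None):
        wf_ex = self.executions[wf_ex_id]
        if env is not None:
            wf_ex["params"]["env"] = env
        wf_ex["state"] = "RUNNING"
        return wf_ex

    def stop_workflow(self, wf_ex_id, state, message=None):
        # The reference above permits only SUCCESS or ERROR here.
        if state not in ("SUCCESS", "ERROR"):
            raise ValueError("permitted states are SUCCESS or ERROR")
        wf_ex = self.executions[wf_ex_id]
        wf_ex["state"] = state
        wf_ex["state_info"] = message
        return wf_ex


engine = InMemoryEngine()
wf_ex = engine.start_workflow("my_wf", {"x": 1}, description="demo run")
paused_state = engine.pause_workflow(wf_ex["id"])["state"]
resumed = engine.resume_workflow(wf_ex["id"], env={"region": "test"})
stopped = engine.stop_workflow(wf_ex["id"], "ERROR", "stopped by operator")
```

Each method returns the execution object, matching the "Returns" rows above, so callers can inspect the state after every call.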
-
class mistral.engine.base.EventEngine
Bases: object
Action event trigger interface.
-
create_event_trigger(trigger, events)
-
delete_event_trigger(trigger, events)
-
class mistral.engine.base.Executor
Bases: object
Action executor interface.
-
run_action(action_ex_id, action_class_str, attributes, action_params, safe_rerun, redelivered=False)
Runs action.
Parameters: |
- action_ex_id – Corresponding action execution id.
- action_class_str – Path to action class in dot notation.
- attributes – Attributes that will be set on the action class instance.
- action_params – Action parameters.
- safe_rerun – Tells if given action can be safely rerun.
- redelivered – Tells if given action was run before on another
executor.
|
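The parameters above suggest that an executor resolves action_class_str to a class, instantiates it with action_params, and applies attributes to the instance. A minimal sketch of that idea, using the standard importlib module, is shown below. The module demo_actions and the class EchoAction are hypothetical and are registered in sys.modules only so the example is self-contained; the order of instantiation and attribute assignment is an assumption, not a statement about the Mistral executor.

```python
import importlib
import sys
import types


class EchoAction:
    """Hypothetical action class with a run() method."""

    def __init__(self, **params):
        self.params = params

    def run(self):
        return {"echo": self.params, "tag": getattr(self, "tag", None)}


# Register a throwaway module so the dotted path below can be imported.
demo_module = types.ModuleType("demo_actions")
demo_module.EchoAction = EchoAction
sys.modules["demo_actions"] = demo_module


def run_action(action_ex_id, action_class_str, attributes, action_params,
               safe_rerun=False, redelivered=False):
    # A redelivered action is only re-run when that is declared safe
    # (assumed handling, for illustration only).
    if redelivered and not safe_rerun:
        raise RuntimeError("action %s cannot be safely rerun" % action_ex_id)

    # Split "package.module.ClassName" into module path and class name.
    module_name, _, class_name = action_class_str.rpartition(".")
    action_cls = getattr(importlib.import_module(module_name), class_name)

    action = action_cls(**action_params)
    for name, value in attributes.items():
        setattr(action, name, value)

    return action.run()


result = run_action(
    "ex-1", "demo_actions.EchoAction", {"tag": "demo"}, {"message": "hi"}
)
```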
-
class mistral.engine.base.TaskPolicy
Bases: object
Task policy.
Provides an interface for performing work before a task starts and
after it has completed. An example of a task policy is the ‘retry’
policy, which makes the engine run a task repeatedly if it finishes
with a failure.
-
after_task_complete(task_ex, task_spec)
Called right after task completes.
Parameters: |
- task_ex – Completed task DB model.
- task_spec – Completed task specification.
|
-
before_task_start(task_ex, task_spec)
Called right before the task starts.
Parameters: |
- task_ex – DB model for task that is about to start.
- task_spec – Task specification.
|
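To illustrate how the two hooks might be used together, here is a small sketch of a custom policy and a loop that invokes it around a task. The classes below are stand-ins rather than the Mistral classes: TimingPolicy and run_task are hypothetical, and task_ex is modelled as a plain dictionary instead of a DB model.

```python
import time


class TaskPolicy:
    """Stand-in with the two hooks documented above."""

    def before_task_start(self, task_ex, task_spec):
        pass

    def after_task_complete(self, task_ex, task_spec):
        pass


class TimingPolicy(TaskPolicy):
    """Hypothetical policy that records how long a task took."""

    def before_task_start(self, task_ex, task_spec):
        task_ex["started_at"] = time.monotonic()

    def after_task_complete(self, task_ex, task_spec):
        task_ex["duration"] = time.monotonic() - task_ex["started_at"]


def run_task(task_ex, task_spec, policies, body):
    # Call every policy before the task body runs...
    for policy in policies:
        policy.before_task_start(task_ex, task_spec)

    task_ex["result"] = body()
    task_ex["state"] = "SUCCESS"

    # ...and again once the task has completed.
    for policy in policies:
        policy.after_task_complete(task_ex, task_spec)

    return task_ex


task_ex = run_task({"name": "t1"}, {}, [TimingPolicy()], lambda: 42)
```

A policy only overrides the hook it needs; the base class supplies no-op defaults for the other one.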
mistral.engine.default_engine module
-
class mistral.engine.default_engine.DefaultEngine(engine_client)
Bases: mistral.engine.base.Engine, mistral.coordination.Service
-
on_action_complete(*args, **kw)
-
pause_workflow(*args, **kw)
-
rerun_workflow(*args, **kw)
-
resume_workflow(*args, **kw)
-
rollback_workflow(*args, **kw)
-
start_action(*args, **kw)
-
start_workflow(*args, **kw)
-
stop_workflow(*args, **kw)
mistral.engine.default_executor module
-
class mistral.engine.default_executor.DefaultExecutor(engine_client)
Bases: mistral.engine.base.Executor, mistral.coordination.Service
-
run_action(*args, **kwargs)
Runs action.
Parameters: |
- action_ex_id – Action execution id.
- action_class_str – Path to action class in dot notation.
- attributes – Attributes that will be set on the action class instance.
- action_params – Action parameters.
- safe_rerun – Tells if given action can be safely rerun.
- redelivered – Tells if given action was run before on another
executor.
|
mistral.engine.dispatcher module
mistral.engine.policies module
-
class mistral.engine.policies.ConcurrencyPolicy(concurrency)
Bases: mistral.engine.base.TaskPolicy
-
before_task_start(task_ex, task_spec)
-
class mistral.engine.policies.PauseBeforePolicy(expression)
Bases: mistral.engine.base.TaskPolicy
-
before_task_start(task_ex, task_spec)
-
class mistral.engine.policies.RetryPolicy(count, delay, break_on, continue_on)
Bases: mistral.engine.base.TaskPolicy
-
after_task_complete(task_ex, task_spec)
Possible cases:
1. state = SUCCESS
If continue_on is not specified, there is
no need to move to the next iteration.
If current:count has reached retry:count, the policy
breaks the loop (regardless of the continue_on condition).
Otherwise, the continue_on condition is checked: if
it is True, the next iteration is scheduled;
otherwise the policy breaks the loop.
2. retry:count = 5, current:count = 2, state = ERROR
Then state = IDLE/DELAYED, current:count = 3.
3. retry:count = 5, current:count = 4, state = ERROR
Iterations are complete, therefore state = #{state}, current:count = 4.
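The cases listed above can be sketched as a small pure function. This is an illustration under stated assumptions, not the RetryPolicy code: the function name next_retry_step is hypothetical, continue_on is reduced to an already-evaluated boolean (or None when not specified), break_on and delay are ignored, DELAYED is used for the "IDLE/DELAYED" state, and the exact boundary for "retries remain" is inferred from the two ERROR examples.

```python
def next_retry_step(state, current_count, retry_count, continue_on=None):
    """Return (new_state, new_count) after one iteration has finished."""
    # Boundary inferred from the ERROR examples above (assumption):
    # with retry_count = 5, count 2 still allows a retry, count 4 does not.
    retries_remain = current_count + 1 < retry_count

    if state == "SUCCESS":
        # No continue_on condition: nothing more to do.
        if continue_on is None:
            return state, current_count
        # Count exhausted: break the loop regardless of continue_on.
        if not retries_remain:
            return state, current_count
        # Otherwise continue_on decides whether to iterate again.
        if continue_on:
            return "DELAYED", current_count + 1
        return state, current_count

    if state == "ERROR":
        if retries_remain:
            return "DELAYED", current_count + 1
        return state, current_count

    # Any other state is left untouched in this sketch.
    return state, current_count


case_2 = next_retry_step("ERROR", current_count=2, retry_count=5)
case_3 = next_retry_step("ERROR", current_count=4, retry_count=5)
```

Here case_2 schedules another iteration and increments the counter, while case_3 leaves both the state and the counter unchanged.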
-
class mistral.engine.policies.TimeoutPolicy(timeout_sec)
Bases: mistral.engine.base.TaskPolicy
-
before_task_start(task_ex, task_spec)
-
class mistral.engine.policies.WaitAfterPolicy(delay)
Bases: mistral.engine.base.TaskPolicy
-
after_task_complete(task_ex, task_spec)
-
class mistral.engine.policies.WaitBeforePolicy(delay)
Bases: mistral.engine.base.TaskPolicy
-
before_task_start(task_ex, task_spec)
mistral.engine.task_handler module
mistral.engine.tasks module
-
class mistral.engine.tasks.RegularTask(*args, **kwargs)
Bases: mistral.engine.tasks.Task
Regular task.
Takes care of processing regular tasks with one action.
-
on_action_complete(*args, **kwargs)
-
run(*args, **kwargs)
-
class mistral.engine.tasks.Task(*args, **kwargs)
Bases: object
Task.
Represents a workflow task and defines the interface that the
Mistral engine and its components use to manipulate tasks.
-
complete(*args, **kwargs)
Complete task and set specified state.
Method sets specified task state and runs all necessary post
completion logic such as publishing workflow variables and
scheduling new workflow commands.
Parameters: |
- state – New task state.
- state_info – New state information (e.g. an error message).
|
-
defer(*args, **kwargs)
Defers task.
This method puts the task into a waiting state.
-
is_completed()
-
is_created()
-
is_state_changed()
-
is_waiting()
-
on_action_complete(action_ex)
Handle action completion.
Parameters: | action_ex – Action execution. |
-
reset()
-
run()
Runs task.
-
set_state(*args, **kwargs)
Sets task state without executing post completion logic.
Parameters: |
- state – New task state.
- state_info – New state information (e.g. an error message).
- processed – New “processed” flag value.
|
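The distinction between complete and set_state described above can be sketched with a stand-in class. SketchTask is hypothetical and not the Mistral Task: the published dictionary stands in for "publishing workflow variables", and treating SUCCESS and ERROR as the completed states is an assumption.

```python
class SketchTask:
    """Hypothetical stand-in mirroring the documented Task method names."""

    def __init__(self, name):
        self.name = name
        self.state = "IDLE"
        self.state_info = None
        self.processed = False
        self.published = {}    # stands in for published workflow variables

    def set_state(self, state, state_info=None, processed=None):
        # Only updates the state fields; no post-completion logic runs.
        self.state = state
        self.state_info = state_info
        if processed is not None:
            self.processed = processed

    def complete(self, state, state_info=None):
        # Sets the state, then runs post-completion logic.
        self.set_state(state, state_info)
        self.published["%s_state" % self.name] = state
        self.processed = True

    def is_completed(self):
        # Assumption: SUCCESS and ERROR are the completed states.
        return self.state in ("SUCCESS", "ERROR")


t1 = SketchTask("t1")
t1.set_state("ERROR", "boom")      # state changes, nothing is published

t2 = SketchTask("t2")
t2.complete("SUCCESS")             # state changes and variables are published
```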
-
class mistral.engine.tasks.WithItemsTask(*args, **kwargs)
Bases: mistral.engine.tasks.RegularTask
With-items task.
Takes care of processing “with-items” tasks.
-
on_action_complete(*args, **kwargs)
mistral.engine.utils module
mistral.engine.workflow_handler module
mistral.engine.workflows module
-
class mistral.engine.workflows.Workflow(wf_def, wf_ex=None)
Bases: object
Workflow.
Represents a workflow and defines the interface that the
Mistral engine and its components use to manipulate workflows.
-
check_and_complete(*args, **kwargs)
Completes the workflow if it needs to be completed.
The method simply checks if there are any tasks that are not
in a terminal state. If there aren’t any then it performs all
necessary logic to finalize the workflow (calculate output etc.).
Returns: | Number of incomplete tasks. |
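The check described above (count the tasks that are not in a terminal state and finalize only when there are none) can be sketched as follows. The dictionary-based workflow and task objects, the set of terminal states, and the rule for deriving the final workflow state are all assumptions made for illustration.

```python
TERMINAL_STATES = {"SUCCESS", "ERROR"}   # assumed set of terminal states


def check_and_complete(wf_ex):
    """Finalize wf_ex if every task is terminal; return incomplete count."""
    incomplete = [
        task for task in wf_ex["tasks"]
        if task["state"] not in TERMINAL_STATES
    ]

    if not incomplete:
        # Finalize: derive the workflow state and calculate the output.
        failed = any(task["state"] == "ERROR" for task in wf_ex["tasks"])
        wf_ex["state"] = "ERROR" if failed else "SUCCESS"
        wf_ex["output"] = {
            task["name"]: task.get("result") for task in wf_ex["tasks"]
        }

    return len(incomplete)


running_wf = {
    "state": "RUNNING",
    "tasks": [
        {"name": "t1", "state": "SUCCESS", "result": 1},
        {"name": "t2", "state": "RUNNING"},
    ],
}
finished_wf = {
    "state": "RUNNING",
    "tasks": [
        {"name": "t1", "state": "SUCCESS", "result": 1},
        {"name": "t2", "state": "SUCCESS", "result": 2},
    ],
}

still_running = check_and_complete(running_wf)
remaining = check_and_complete(finished_wf)
```

A non-zero return value tells the caller that the workflow was left untouched and should be checked again later.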
-
lock(*args, **kwargs)
-
rerun(task_ex, reset=True, env=None)
Rerun workflow from the given task.
Parameters: |
- task_ex – Task execution that the workflow needs to rerun from.
- reset – If True, reset task state including deleting its action
executions.
- env – Environment.
|
-
resume(env=None)
Resume workflow.
Parameters: | env – Environment. |
-
set_state(*args, **kwargs)
-
start(*args, **kwargs)
Start workflow.
Parameters: |
- input_dict – Workflow input.
- desc – Workflow execution description.
- params – Workflow type specific parameters.
|
-
stop(state, msg=None)
Stop workflow.
Parameters: |
- state – New workflow state.
- msg – Additional explanatory message.
|
Module contents