To receive inputs and produce outputs from atoms (or other engine processes) in a fault-tolerant way, atom outputs must be placed in some location where other atoms can reuse them (or where they can be used for other purposes). To accommodate this usage, TaskFlow provides an abstraction (backed by pluggable stevedore backends) that is similar in concept to a running program's memory.
This abstraction serves the following major purposes:
On engine construction, a backend that satisfies the Backend abstraction will typically be provided (it is optional). Along with the backend object, a FlowDetail object will also be created and provided to the engine constructor (or associated load() helper functions); this object contains the details about the flow to be run. Typically a FlowDetail object is created from a LogBook object (the book acts as a container for FlowDetail and AtomDetail objects).
Preparation: Once an engine starts to run it will create a Storage object which will act as the engine's interface to the underlying backend storage objects (it provides helper functions that are commonly used by the engine, avoiding repeated code when interacting with the provided FlowDetail and Backend objects). As an engine initializes it will extract (or create) AtomDetail objects for each atom in the workflow the engine will be executing.
Execution: When an engine begins to execute (see engine for more details about how an engine goes about this process) it will examine any previously existing AtomDetail objects to see if they can be used for resuming; see resumption for more details on this subject. Atoms which have not finished (or did not finish correctly in a previous run) will begin executing only after any dependent inputs are ready. This is done by analyzing the execution graph and looking at predecessor AtomDetail outputs and states (which may have been persisted in a past run). The engine then either reuses the predecessors' previous information or runs those predecessors and saves their output to the FlowDetail and Backend objects. This execution, analysis and interaction with the storage objects continues (what is described here is a simplification of what really happens, which is quite a bit more complex) until the engine has finished running (at which point the engine will have succeeded or failed in its attempt to run the workflow).
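The resume decision described above can be pictured with a toy model. The sketch below is not TaskFlow's engine logic; it assumes a hypothetical `saved` mapping of atom name to `(state, result)` standing in for persisted AtomDetail objects, and a flow whose atoms are already listed in dependency order.

```python
SUCCESS = "SUCCESS"  # state name used here purely for illustration


def plan_resume(ordered_atoms, saved):
    """Split atoms into reusable results and atoms that still must run.

    ordered_atoms: atom names, predecessors before successors (assumed).
    saved: hypothetical mapping of name -> (state, result) from a past run.
    """
    reusable, to_run = {}, []
    for name in ordered_atoms:
        state, result = saved.get(name, (None, None))
        if state == SUCCESS:
            reusable[name] = result  # reuse the persisted output
        else:
            to_run.append(name)  # never ran, or did not finish correctly
    return reusable, to_run


saved_details = {"fetch": ("SUCCESS", 42), "transform": ("FAILURE", None)}
reusable, to_run = plan_resume(["fetch", "transform", "store"], saved_details)
```

Here only `fetch` is reused; `transform` and `store` would be scheduled again once their inputs are available.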
Post-execution: Typically when an engine is done running the logbook would be discarded (to avoid creating a stockpile of useless data) and the backend storage would be told to delete any contents for a given execution. For certain use-cases though it may be advantageous to retain logbooks and their contents.
A few scenarios come to mind:
Note
It should be emphasized that the logbook is the authoritative and, preferably, the only (see inputs and outputs) source of run-time state information (breaking this principle makes it hard or impossible to restart or resume in any automated fashion). When an atom returns a result, it should be written directly to a logbook. When an atom or flow state changes in any way, the logbook is the first to know (see notifications for how a user may also get notified of those same state changes). The logbook, a backend, and the associated storage helper class are responsible for storing the actual data. Used together, these components specify the persistence mechanism (how and where data is saved: memory, database, whatever...) and the persistence policy (when data is saved: every time it changes, at some particular moments, or simply never).
To select which persistence backend to use you should use the fetch() function which uses entrypoints (internally using stevedore) to fetch and configure your backend. This makes it simpler than accessing the backend data types directly and provides a common function from which a backend can be fetched.
Using this function to fetch a backend might look like:
from taskflow.persistence import backends
...
persistence = backends.fetch(conf={
    "connection": "mysql",
    "user": ...,
    "password": ...,
})
book = make_and_save_logbook(persistence)
...
As can be seen from above the conf parameter acts as a dictionary that is used to fetch and configure your backend. The restrictions on it are the following:
Connection: 'memory'
Retains all data in local memory (not persisted to reliable storage). Useful for scenarios where persistence is not required (and also in unit tests).
Note
See MemoryBackend for implementation details.
Connection: 'dir' or 'file'
Retains all data in a directory & file based structure on local disk. Will be persisted locally in the case of system failure (allowing for resumption from the same local machine only). Useful for cases where a more reliable persistence is desired along with the simplicity of files and directories (a concept everyone is familiar with).
Note
See DirBackend for implementation details.
Connection: 'mysql' or 'postgres' or 'sqlite'
Retains all data in an ACID compliant database using the sqlalchemy library for schemas, connections, and database interaction functionality. Useful when you need a higher level of durability than offered by the previous solutions. When using these connection types it is possible to resume an engine from a peer machine (this does not apply when using sqlite).
Logbooks
Name | Type | Primary Key |
---|---|---|
created_at | DATETIME | False |
updated_at | DATETIME | False |
uuid | VARCHAR | True |
name | VARCHAR | False |
meta | TEXT | False |
Flow details
Name | Type | Primary Key |
---|---|---|
created_at | DATETIME | False |
updated_at | DATETIME | False |
uuid | VARCHAR | True |
name | VARCHAR | False |
meta | TEXT | False |
state | VARCHAR | False |
parent_uuid | VARCHAR | False |
Atom details
Name | Type | Primary Key |
---|---|---|
created_at | DATETIME | False |
updated_at | DATETIME | False |
uuid | VARCHAR | True |
name | VARCHAR | False |
meta | TEXT | False |
atom_type | VARCHAR | False |
state | VARCHAR | False |
intention | VARCHAR | False |
results | TEXT | False |
failure | TEXT | False |
version | TEXT | False |
parent_uuid | VARCHAR | False |
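To make the three layouts above concrete, the following sketch recreates them in an in-memory sqlite database using only the standard library. The column names and types come from the tables; the table names (`logbooks`, `flowdetails`, `atomdetails`) are assumptions made for illustration, and no constraints beyond the listed primary keys are modeled.

```python
import sqlite3

# Columns mirror the tables above; the table names are assumptions.
SCHEMA = """
CREATE TABLE logbooks (
    created_at DATETIME, updated_at DATETIME,
    uuid VARCHAR PRIMARY KEY, name VARCHAR, meta TEXT
);
CREATE TABLE flowdetails (
    created_at DATETIME, updated_at DATETIME,
    uuid VARCHAR PRIMARY KEY, name VARCHAR, meta TEXT,
    state VARCHAR, parent_uuid VARCHAR
);
CREATE TABLE atomdetails (
    created_at DATETIME, updated_at DATETIME,
    uuid VARCHAR PRIMARY KEY, name VARCHAR, meta TEXT,
    atom_type VARCHAR, state VARCHAR, intention VARCHAR,
    results TEXT, failure TEXT, version TEXT, parent_uuid VARCHAR
);
"""

conn = sqlite3.connect(":memory:")
conn.executescript(SCHEMA)


def primary_keys(table):
    # PRAGMA table_info rows are (cid, name, type, notnull, default, pk).
    rows = conn.execute("PRAGMA table_info(%s)" % table).fetchall()
    return [row[1] for row in rows if row[5]]


tables = ("logbooks", "flowdetails", "atomdetails")
pk_by_table = {table: primary_keys(table) for table in tables}
```

In each table `uuid` is the only primary key column, matching the Primary Key column above.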
Note
See SQLAlchemyBackend for implementation details.
Warning
Currently there is a size limit (not applicable for sqlite) on what the results column can contain. This limit restricts how many prior failures a retry atom can hold. More information and a future fix will be posted to bug 1416088 (in the meantime, try to ensure that your retry unit's history does not grow beyond ~80 prior results). This truncation can also be avoided by providing mysql_sql_mode as traditional when selecting your mysql + sqlalchemy based backend (see the mysql modes documentation for what this implies).
Connection: 'zookeeper'
Retains all data in a zookeeper backend (zookeeper exposes operations on files and directories, similar to the above 'dir' or 'file' connection types). Internally the kazoo library is used to interact with zookeeper to perform reliable, distributed and atomic operations on the contents of a logbook represented as znodes. Since zookeeper is also distributed, it is also possible to resume an engine from a peer machine (similar to the database connection types listed previously).
Note
See ZkBackend for implementation details.
Fetch a persistence backend with the given configuration.
This fetch method will look for the entrypoint name in the entrypoint namespace, and then attempt to instantiate that entrypoint using the provided configuration and any persistence backend specific kwargs.
NOTE(harlowja): to make it easy to specify configuration and options to a backend, the configuration (which is typically just a dictionary) can also be a URI string that identifies the entrypoint name and any configuration specific to that backend.
For example, given the following configuration URI:
mysql://<not-used>/?a=b&c=d
This will look for the entrypoint named ‘mysql’ and will provide a configuration object composed of the URI’s components, in this case that is {'a': 'b', 'c': 'd'} to the constructor of that persistence backend instance.
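The mapping from URI to entrypoint name and configuration can be reproduced with the standard library. This is a sketch of the idea only, not the parsing code TaskFlow uses; `parse_backend_uri` is a hypothetical helper name.

```python
from urllib.parse import parse_qsl, urlsplit


def parse_backend_uri(uri):
    """Return (entrypoint_name, conf) for a backend configuration URI."""
    parts = urlsplit(uri)
    # The scheme selects the entrypoint; the query string becomes the conf.
    return parts.scheme, dict(parse_qsl(parts.query))


name, conf = parse_backend_uri("mysql://<not-used>/?a=b&c=d")
```

For the URI above, `name` is `'mysql'` and `conf` is `{'a': 'b', 'c': 'd'}`.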
Fetches a backend, connects, upgrades, then closes it on completion.
This allows a backend instance to be fetched, connected to, have its schema upgraded (if the schema is already up to date this is a no-op) and then used in a context manager statement with the backend being closed upon context manager exit.
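The fetch, connect, upgrade, use, close sequence can be sketched with `contextlib`. The `StubBackend` and `StubConnection` classes below are hypothetical stand-ins that only record the order of calls, and `managed_backend` is an illustrative name; a real backend would do actual work in each step.

```python
import contextlib


class StubConnection:
    """Hypothetical stand-in for a backend connection."""

    def __init__(self, log):
        self.log = log

    def upgrade(self):
        self.log.append("upgrade")

    def close(self):
        self.log.append("connection closed")


class StubBackend:
    """Hypothetical stand-in for a fetched backend."""

    def __init__(self):
        self.log = ["fetched"]

    def get_connection(self):
        self.log.append("connected")
        return StubConnection(self.log)

    def close(self):
        self.log.append("backend closed")


@contextlib.contextmanager
def managed_backend():
    backend = StubBackend()  # stands in for the fetch step
    with contextlib.closing(backend):
        with contextlib.closing(backend.get_connection()) as conn:
            conn.upgrade()  # a no-op when the schema is already current
        yield backend  # the caller uses the backend here


with managed_backend() as be:
    be.log.append("in use")
events = be.log
```

The recorded order shows that the upgrade connection is closed before the caller's block runs, and the backend itself is closed when the block exits.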
Bases: object
Base class for persistence backends.
Bases: object
Base class for backend connections.
Validates that a backend is still ok to be used.
The semantics of this may vary depending on the backend. On failure a backend specific exception should be raised that will indicate why the failure occurred.
Updates a given atom detail and returns the updated version.
NOTE(harlowja): the detail to be updated must already have been created by saving a flow detail with the given atom detail inside of it.
Updates a given flow detail and returns the updated version.
NOTE(harlowja): the detail to be updated must already have been created by saving a logbook with the given flow detail inside of it.
Bases: taskflow.persistence.base.Backend
Base class for persistence backends that address data by path
Subclasses of this backend write logbooks, flow details, and atom details to a provided base path in some filesystem-like storage. They will create and store those objects in three key directories (one for logbooks, one for flow details and one for atom details). They create those associated directories and then create files inside those directories that represent the contents of those objects for later reading and writing.
Default path used when none is provided.
Bases: taskflow.persistence.base.Connection
Base class for path based backend connections.
Bases: object
A collection of flow details and associated metadata.
Typically this class contains a collection of flow detail entries for a given engine (or job) so that those entities can track what ‘work’ has been completed for resumption, reverting and miscellaneous tracking purposes.
The data contained within this class need not be persisted to the backend storage in real time. The data in this class will only be guaranteed to be persisted when a save occurs via some backend connection.
NOTE(harlowja): the naming of this class is analogous to a ship’s log or a similar type of record used in detailing work that has been completed (or work that has not been completed).
Pretty formats this logbook into a string.
>>> from taskflow.persistence import models
>>> tmp = models.LogBook("example")
>>> print(tmp.pformat())
LogBook: 'example'
- uuid = ...
- created_at = ...
Adds a new flow detail into this logbook.
NOTE(harlowja): if an existing flow detail exists with the same uuid the existing one will be overwritten with the newly provided one.
Does not guarantee that the details will be immediately saved.
Locate the flow detail corresponding to the given uuid.
Returns: | the flow detail with that uuid |
---|---|
Return type: | FlowDetail (or None if not found) |
Merges the current object state with the given one's state.
If deep_copy is provided as truthy then the local object will use copy.deepcopy to replace this object's local attributes with the provided object's attributes (only if there is a difference between this object's attributes and the provided attributes). If deep_copy is falsey (the default) then a reference copy will occur instead when a difference is detected.
NOTE(harlowja): If the provided object is this object itself then no merging is done. Also note that this does not merge the flow details contained in either.
Returns: | this logbook (freshly merged with the incoming object) |
---|---|
Return type: | LogBook |
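The difference between the reference copy and the deep copy during a merge can be shown with a small stand-in class. `Record` is hypothetical and far simpler than the real models; it only demonstrates which object ends up attached to the merged instance.

```python
import copy


class Record:
    """Hypothetical object with the merge semantics described above."""

    def __init__(self, name, meta=None):
        self.name = name
        self.meta = meta if meta is not None else {}

    def merge(self, other, deep_copy=False):
        if other is self:
            return self  # merging with itself is a no-op
        copy_fn = copy.deepcopy if deep_copy else (lambda value: value)
        for attr in ("name", "meta"):
            # Only attributes that differ are replaced.
            if getattr(self, attr) != getattr(other, attr):
                setattr(self, attr, copy_fn(getattr(other, attr)))
        return self


incoming = Record("book", meta={"tags": ["x"]})
by_reference = Record("book").merge(incoming)
by_deep_copy = Record("book").merge(incoming, deep_copy=True)
```

After the merge, `by_reference.meta` is the very same dictionary as `incoming.meta`, while `by_deep_copy.meta` is an equal but independent copy.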
Translates the internal state of this object to a dict.
NOTE(harlowja): The returned dict does not include any contained flow details.
Returns: | this logbook in dict form |
---|
Translates the given dict into an instance of this class.
NOTE(harlowja): the dict provided should come from a prior call to to_dict().
Returns: | a new logbook |
---|---|
Return type: | LogBook |
Copies this logbook.
Creates a shallow copy of this logbook. If this logbook contains flow details and retain_contents is truthy (the default) then the flow details container will be shallow copied (the flow details contained therein will not be copied). If retain_contents is falsey then the copied logbook will have no contained flow details (but it will have the rest of the local object's attributes copied).
Returns: | a new logbook |
---|---|
Return type: | LogBook |
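A minimal sketch of these copy semantics, using a hypothetical `Book` class whose `details` dictionary plays the role of the flow details container:

```python
import copy


class Book:
    """Hypothetical container mimicking the copy semantics above."""

    def __init__(self, name):
        self.name = name
        self.details = {}  # uuid -> detail object

    def copy(self, retain_contents=True):
        clone = copy.copy(self)  # shallow: attributes are shared
        if retain_contents:
            clone.details = self.details.copy()  # new dict, same values
        else:
            clone.details = {}
        return clone


book = Book("example")
book.details["fd-1"] = object()
kept = book.copy()
emptied = book.copy(retain_contents=False)
```

`kept` has its own container holding the same detail objects, while `emptied` keeps the name but holds no details.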
Bases: object
A collection of atom details and associated metadata.
Typically this class contains a collection of atom detail entries that represent the atoms in a given flow structure (along with any other needed metadata relevant to that flow).
The data contained within this class need not be persisted to the backend storage in real time. The data in this class will only be guaranteed to be persisted when a save (or update) occurs via some backend connection.
Updates the object's state to be the same as the given one.
This will assign the private and public attributes of the given flow detail directly to this object (replacing any existing attributes in this object; even if they are the same).
NOTE(harlowja): If the provided object is this object itself then no update is done.
Returns: | this flow detail |
---|---|
Return type: | FlowDetail |
Pretty formats this flow detail into a string.
>>> from oslo_utils import uuidutils
>>> from taskflow.persistence import models
>>> flow_detail = models.FlowDetail("example",
... uuid=uuidutils.generate_uuid())
>>> print(flow_detail.pformat())
FlowDetail: 'example'
- uuid = ...
- state = ...
Merges the current object state with the given one’s state.
If deep_copy is provided as truthy then the local object will use copy.deepcopy to replace this object's local attributes with the provided object's attributes (only if there is a difference between this object's attributes and the provided attributes). If deep_copy is falsey (the default) then a reference copy will occur instead when a difference is detected.
NOTE(harlowja): If the provided object is this object itself then no merging is done. Also this does not merge the atom details contained in either.
Returns: | this flow detail (freshly merged with the incoming object) |
---|---|
Return type: | FlowDetail |
Copies this flow detail.
Creates a shallow copy of this flow detail. If this detail contains atom details and retain_contents is truthy (the default) then the atom details container will be shallow copied (the atom details contained therein will not be copied). If retain_contents is falsey then the copied flow detail will have no contained atom details (but it will have the rest of the local object's attributes copied).
Returns: | a new flow detail |
---|---|
Return type: | FlowDetail |
Translates the internal state of this object to a dict.
NOTE(harlowja): The returned dict does not include any contained atom details.
Returns: | this flow detail in dict form |
---|
Translates the given dict into an instance of this class.
NOTE(harlowja): the dict provided should come from a prior call to to_dict().
Returns: | a new flow detail |
---|---|
Return type: | FlowDetail |
Adds a new atom detail into this flow detail.
NOTE(harlowja): if an existing atom detail exists with the same uuid the existing one will be overwritten with the newly provided one.
Does not guarantee that the details will be immediately saved.
Locate the atom detail corresponding to the given uuid.
Returns: | the atom detail with that uuid |
---|---|
Return type: | AtomDetail (or None if not found) |
Bases: object
A collection of atom specific runtime information and metadata.
This is a base abstract class that contains attributes that are used to connect an atom to the persistence layer before, during, or after it is running. It includes any results it may have produced, any state that it may be in (for example FAILURE), any exception that occurred when running, and any associated stacktrace that may have occurred while an exception was being thrown. It may also contain any other metadata that should be stored alongside the details about the connected atom.
The data contained within this class need not be persisted to the backend storage in real time. The data in this class will only be guaranteed to be persisted when a save (or update) occurs via some backend connection.
Gets the atom's last result.
If the atom has produced many results (for example if it has been retried, reverted, executed and ...) this returns the last one of many results.
Updates the object’s state to be the same as the given one.
This will assign the private and public attributes of the given atom detail directly to this object (replacing any existing attributes in this object; even if they are the same).
NOTE(harlowja): If the provided object is this object itself then no update is done.
Returns: | this atom detail |
---|---|
Return type: | AtomDetail |
Merges the current object state with the given one's state.
If deep_copy is provided as truthy then the local object will use copy.deepcopy to replace this object's local attributes with the provided object's attributes (only if there is a difference between this object's attributes and the provided attributes). If deep_copy is falsey (the default) then a reference copy will occur instead when a difference is detected.
NOTE(harlowja): If the provided object is this object itself then no merging is done. Do note that no results are merged in this method. It is the responsibility of subclasses to override this abstract method and provide that merging themselves as they see fit.
Returns: | this atom detail (freshly merged with the incoming object) |
---|---|
Return type: | AtomDetail |
Translates the internal state of this object to a dict.
Returns: | this atom detail in dict form |
---|
Translates the given dict into an instance of this class.
NOTE(harlowja): the dict provided should come from a prior call to to_dict().
Returns: | a new atom detail |
---|---|
Return type: | AtomDetail |
Bases: taskflow.persistence.models.AtomDetail
A task detail (an atom detail typically associated with a Task atom).
Resets this task detail and sets state attribute value.
This sets any previously set results, failure, and revert_results attributes back to None and sets the state to the provided one, as well as setting this task detail's intention attribute to EXECUTE.
Puts a result (acquired in the given state) into this detail.
Returns whether this object was modified (or whether it was not).
Merges the current task detail with the given one.
NOTE(harlowja): This merge does not copy and replace the results or revert_results if they differ. Instead the current object's results and revert_results attributes directly become (via assignment) the other object's attributes. Also note that if the provided object is this object itself then no merging is done.
See: https://bugs.launchpad.net/taskflow/+bug/1452978 for what happens if this is copied at a deeper level (for example by using copy.deepcopy or by using copy.copy).
Returns: | this task detail (freshly merged with the incoming object) |
---|---|
Return type: | TaskDetail |
Copies this task detail.
Creates a shallow copy of this task detail (any meta-data and version information that this object maintains is shallow copied via copy.copy).
NOTE(harlowja): This copy does not copy and replace the results or revert_results attributes if they differ. Instead the current object's results and revert_results attributes directly become (via assignment) the cloned object's attributes.
See: https://bugs.launchpad.net/taskflow/+bug/1452978 for what happens if this is copied at a deeper level (for example by using copy.deepcopy or by using copy.copy).
Returns: | a new task detail |
---|---|
Return type: | TaskDetail |
Bases: taskflow.persistence.models.AtomDetail
A retry detail (an atom detail typically associated with a Retry atom).
Resets this retry detail and sets state attribute value.
This sets any previously added results back to an empty list, resets the failure, revert_failure and revert_results attributes back to None, and sets the state to the provided one, as well as setting this retry detail's intention attribute to EXECUTE.
Copies this retry detail.
Creates a shallow copy of this retry detail (any meta-data and version information that this object maintains is shallow copied via copy.copy).
NOTE(harlowja): This copy does not copy the incoming object's results or revert_results attributes. Instead this object's results attribute list is iterated over and a new list is constructed with each (data, failures) element in that list having its failures (a dictionary of each named Failure object that occurred) copied but its data left untouched. After this is done that new list becomes (via assignment) the cloned object's results attribute. The revert_results is directly assigned to the cloned object's revert_results attribute.
See: https://bugs.launchpad.net/taskflow/+bug/1452978 for what happens if the data in results is copied at a deeper level (for example by using copy.deepcopy or by using copy.copy).
Returns: | a new retry detail |
---|---|
Return type: | RetryDetail |
The last failure dictionary that was produced.
NOTE(harlowja): This is not the same as the local failure attribute as the obtained failure dictionary in the results attribute (which is what this returns) is from associated atom failures (which is different from the directly related failure of the retry unit associated with this atom detail).
Puts a result (acquired in the given state) into this detail.
Returns whether this object was modified (or whether it was not).
Merges the current retry detail with the given one.
NOTE(harlowja): This merge does not deep copy the incoming object's results attribute (if it differs). Instead the incoming object's results attribute list is always iterated over and a new list is constructed with each (data, failures) element in that list having its failures (a dictionary of each named Failure object that occurred) copied but its data left untouched. After this is done that new list becomes (via assignment) this object's results attribute. Also note that if the provided object is this object itself then no merging is done.
See: https://bugs.launchpad.net/taskflow/+bug/1452978 for what happens if the data in results is copied at a deeper level (for example by using copy.deepcopy or by using copy.copy).
Returns: | this retry detail (freshly merged with the incoming object) |
---|---|
Return type: | RetryDetail |
Bases: taskflow.types.tree.Node
An in-memory filesystem inode-like object.
Bases: object
An in-memory filesystem-like structure.
This filesystem uses posix style paths only, so users must be careful to use the posixpath module instead of os.path, whose behavior varies with the operating system the active python is running on (posixpath was chosen to avoid path variations that are not relevant to an in-memory fake filesystem).
Not thread-safe when a single filesystem is mutated at the same time by multiple threads. For example having multiple threads call into clear() at the same time could potentially end badly. It is thread-safe when only get() or other read-only actions (like calling into ls()) are occurring at the same time.
Example usage:
>>> from taskflow.persistence.backends import impl_memory
>>> fs = impl_memory.FakeFilesystem()
>>> fs.ensure_path('/a/b/c')
>>> fs['/a/b/c'] = 'd'
>>> print(fs['/a/b/c'])
d
>>> del fs['/a/b/c']
>>> fs.ls("/a/b")
[]
>>> fs.get("/a/b/c", 'blob')
'blob'
Root path of the in-memory filesystem.
Split a pathname into a tuple of (head, tail).
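The reason for preferring `posixpath` over `os.path` can be seen by comparing it with `ntpath` (the implementation `os.path` resolves to on Windows). This sketch uses only the standard library and does not touch the fake filesystem itself.

```python
import ntpath
import posixpath

# posixpath behaves identically on every platform.
path = posixpath.join("/a", "b", "c")
head, tail = posixpath.split(path)

# ntpath shows what os.path would produce on Windows.
windows_style = ntpath.join("a", "b")
```

Here `path` is `'/a/b/c'`, `posixpath.split` yields the `(head, tail)` pair `('/a/b', 'c')`, and the Windows-style join uses a backslash separator instead.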
Bases: taskflow.persistence.path_based.PathBasedBackend
An in-memory (non-persistent) backend.
This backend writes logbooks, flow details, and atom details to an in-memory filesystem-like structure (rooted by the memory instance variable).
This backend does not provide true transactional semantics. It does guarantee that there will be no inter-thread race conditions when writing and reading by using read/write locks.
Default path used when none is provided.
Bases: taskflow.persistence.path_based.PathBasedBackend
A directory and file based backend.
This backend does not provide true transactional semantics. It does guarantee that there will be no interprocess race conditions when writing and reading by using a consistent hierarchy of file based locks.
Example configuration:
conf = {
"path": "/tmp/taskflow", # save data to this root directory
"max_cache_size": 1024, # keep up-to 1024 entries in memory
}
Default encoding used when decoding or encoding files into or from text/unicode into binary or binary into text/unicode.
Bases: taskflow.persistence.base.Backend
A sqlalchemy backend.
Example configuration:
conf = {
"connection": "sqlite:////tmp/test.db",
}
Bases: taskflow.persistence.path_based.PathBasedBackend
A zookeeper-backed backend.
Example configuration:
conf = {
"hosts": "192.168.0.1:2181,192.168.0.2:2181,192.168.0.3:2181",
"path": "/taskflow",
}
Do note that the creation of a kazoo client is achieved by make_client(), and the transfer of this backend configuration to that function may happen at __init__ time. This implies that certain parameters from this backend configuration may be provided to make_client() such that, if a client was not provided by the caller, one will be created according to make_client()'s specification.
Default path used when none is provided.
Bases: object
Interface between engines and logbook and its backend (if any).
This class provides a simple interface to save atoms of a given flow and associated activity and results to the persistence layer (logbook, atom_details, flow_details) for use by engines. This makes it easier to interact with the underlying storage & backend mechanism through this interface rather than accessing those objects directly.
NOTE(harlowja): if no backend is provided then an in-memory backend will be automatically used and the provided flow detail object will be placed into it for the duration of this object's existence.
Injector task detail name.
This task detail is a special detail that will be automatically created and saved to store persistent injected values (name conflicts with it must be avoided) that are global to the flow being executed.
Ensure there is an atomdetail for each of the given atoms.
Returns list of atomdetail uuids for each atom processed.
Ensure there is an atomdetail for the given atom.
Returns the uuid for the atomdetail that corresponds to the given atom.
Gets a dict of atom name => (state, intention) given atom names.
Updates an atom's associated metadata.
This update will take a provided dictionary or a list of (key, value) pairs to include in the updated metadata (newer keys will overwrite older keys) and after merging saves the updated data into the underlying persistence layer.
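The overwrite rule for metadata updates matches what `dict.update` does, which accepts either a mapping or an iterable of (key, value) pairs. The helper below is a hypothetical illustration of the merge step only; it does not save anything to a persistence layer.

```python
def merge_metadata(existing, update_with):
    """Return existing metadata with newer keys overwriting older ones."""
    merged = dict(existing)
    merged.update(update_with)  # accepts a dict or (key, value) pairs
    return merged


from_dict = merge_metadata({"owner": "a", "tries": 1}, {"tries": 2})
from_pairs = merge_metadata({"owner": "a"}, [("tries", 3), ("owner", "b")])
```

In both cases the incoming value wins for any key present on both sides.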
Set a task's progress.
Get the progress of a task given a task's name.
Parameters: | task_name – task's name |
---|---|
Returns: | current task progress value |
Get the progress details of a task given a task's name.
Parameters: | task_name – task name |
---|---|
Returns: | None if progress_details not defined, else progress_details dict |
Gets the execute results for an atom from storage.
Get all execute failures that happened with this flow.
Add values into storage for a specific atom only.
Parameters: | transient – save the data in-memory only instead of persisting the data to backend storage (useful for resource-like objects or similar objects which can not be persisted) |
---|
This method injects a dictionary/pairs of arguments for an atom so that when that atom is scheduled for execution it will have immediate access to these arguments.
Note
Injected atom arguments take precedence over arguments provided by predecessor atoms or arguments provided by injecting into the flow scope (using the inject() method).
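The precedence rule in this note can be modeled as a layered lookup. The sketch below uses `collections.ChainMap` with made-up argument names; the relative order of the predecessor and flow-scope layers is an assumption chosen for the example, since the note only states that atom-scoped injections win over both.

```python
from collections import ChainMap

atom_injected = {"timeout": 5}  # injected for this atom only
from_predecessors = {"timeout": 30, "payload": "data"}
flow_injected = {"region": "eu"}  # injected into the flow scope

# Earlier maps shadow later ones, so atom-scoped values are found first.
arguments = ChainMap(atom_injected, from_predecessors, flow_injected)
resolved = {key: arguments[key] for key in ("timeout", "payload", "region")}
```

The atom sees `timeout` as 5 (its own injected value) while still receiving `payload` and `region` from the other sources.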
Warning
It should be noted that injected atom arguments (that are scoped to the atom with the given name) should be serializable whenever possible. This is a requirement for the worker based engine which must serialize (typically using json) all atom execute() and revert() arguments to be able to transmit those arguments to the target worker(s). If the worker based engine may be used later, it is highly recommended to ensure all injected arguments (even transient ones) are serializable to avoid issues that may appear later (when an object turns out not to be serializable).
Add values into storage.
This method should be used to put flow parameters (requirements that are not satisfied by any atom in the flow) into storage.
Parameters: | transient – save the data in-memory only instead of persisting the data to backend storage (useful for resource-like objects or similar objects which can not be persisted) |
---|
Warning
It should be noted that injected flow arguments (that are scoped to all atoms in this flow) should be serializable whenever possible. This is a requirement for the worker based engine which must serialize (typically using json) all atom execute() and revert() arguments to be able to transmit those arguments to the target worker(s). If the worker based engine may be used later, it is highly recommended to ensure all injected arguments (even transient ones) are serializable to avoid issues that may appear later (when an object turns out not to be serializable).
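One way to catch non-serializable arguments early is to try encoding them before injecting. This is a sketch under the assumption that json is the serialization format in use; `is_json_serializable` is a hypothetical helper, not part of TaskFlow.

```python
import json


def is_json_serializable(value):
    """Return True if json.dumps can encode the given value."""
    try:
        json.dumps(value)
    except (TypeError, ValueError):
        return False
    return True


checks = {
    "plain": is_json_serializable({"retries": 3, "hosts": ["a", "b"]}),
    "resource": is_json_serializable({"lock": object()}),
}
```

Plain dictionaries, lists, strings and numbers pass, while resource-like objects (represented here by a bare `object()`) do not.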
Fetch unsatisfied execute arguments using an atom's args mapping.
NOTE(harlowja): this takes into account the atoms provided by the scope walker that should produce the required value at runtime, as well as the transient/persistent flow and atom specific injected arguments. It does not check if the providers actually have produced the needed values; it just checks that they are registered to produce them in the future.
Fetch execute arguments for an atom using its args mapping.
Transition flow from old state to new state.
Returns (True, old_state) if the transition was performed, or (False, old_state) if it was ignored, or raises an InvalidState exception if the transition is invalid.
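The three possible outcomes can be illustrated with a toy state machine. The transition table, the rule that a same-state change is ignored, and the local `InvalidState` class are all assumptions made for this sketch; the real set of states and transitions is defined elsewhere in TaskFlow.

```python
class InvalidState(Exception):
    """Local stand-in for the exception named above."""


# Hypothetical transition table, not the real set of flow transitions.
ALLOWED = {
    ("PENDING", "RUNNING"),
    ("RUNNING", "SUCCESS"),
    ("RUNNING", "FAILURE"),
}


def change_state(current, new):
    if new == current:
        return (False, current)  # assumption: a same-state change is ignored
    if (current, new) not in ALLOWED:
        raise InvalidState("%s -> %s is not allowed" % (current, new))
    return (True, current)  # performed; the old state is reported back


performed = change_state("PENDING", "RUNNING")
ignored = change_state("RUNNING", "RUNNING")
try:
    change_state("SUCCESS", "PENDING")
    rejected = False
except InvalidState:
    rejected = True
```

Callers can branch on the first tuple element to tell a performed transition from an ignored one, and handle the exception for transitions that are not permitted.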