To receive inputs and create outputs from atoms (or other engine processes) in a fault-tolerant way, what atoms output needs to be placed in some kind of location where it can be re-used by other atoms (or used for other purposes). To accommodate this type of usage TaskFlow provides an abstraction (backed by pluggable stevedore backends) that is similar in concept to a running program's memory.
This abstraction serves several major purposes, described in the stages below.
On engine construction a backend will typically be provided (it can be optional) which satisfies the Backend abstraction. Along with the backend object, a FlowDetail object will also be created and provided to the engine constructor (or associated load() helper functions); this object contains the details about the flow to be run. Typically a FlowDetail object is created from a LogBook object (the book object acts as a type of container for FlowDetail and AtomDetail objects).
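The containment relationship described above (a LogBook holds FlowDetail objects, which in turn hold AtomDetail objects) can be sketched with plain-Python stand-ins. These are illustrative classes only, not the real taskflow.persistence.models implementations:

```python
import uuid

# Minimal stand-ins illustrating the containment relationship only;
# the real classes live in taskflow.persistence.models.
class AtomDetail:
    def __init__(self, name):
        self.name = name
        self.uuid = str(uuid.uuid4())
        self.state = None
        self.results = None

class FlowDetail:
    def __init__(self, name):
        self.name = name
        self.uuid = str(uuid.uuid4())
        self._atoms = {}

    def add(self, ad):
        self._atoms[ad.uuid] = ad  # same-uuid adds overwrite

class LogBook:
    def __init__(self, name):
        self.name = name
        self.uuid = str(uuid.uuid4())
        self._flows = {}

    def add(self, fd):
        self._flows[fd.uuid] = fd

book = LogBook("example")
flow = FlowDetail("my-flow")
flow.add(AtomDetail("task-1"))
book.add(flow)
```

The real model classes add merging, copying, and dict translation on top of this basic containment (see the model reference later in this document).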
Preparation: Once an engine starts to run it will create a Storage object which acts as the engine's interface to the underlying backend storage objects (it provides helper functions that are commonly used by the engine, avoiding repeated code when interacting with the provided FlowDetail and Backend objects). As an engine initializes it will extract (or create) AtomDetail objects for each atom in the workflow the engine will be executing.
Execution: When an engine begins to execute (see engine for more details about how an engine goes about this process) it will examine any previously existing AtomDetail objects to see if they can be used for resuming; see resumption for more details on this subject. Atoms which have not finished (or did not finish correctly in a previous run) will begin executing only after their dependent inputs are ready. This is done by analyzing the execution graph and looking at predecessor AtomDetail outputs and states (which may have been persisted in a past run); either their previous information is used, or those predecessors are run and their output is saved to the FlowDetail and Backend objects. This execution, analysis, and interaction with the storage objects continues (what is described here is a simplification of what really happens, which is quite a bit more complex) until the engine has finished running (at which point the engine will have succeeded or failed in its attempt to run the workflow).
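The resume-or-execute decision described above can be sketched as a simplified loop. This is a conceptual illustration only, not the engine's actual implementation; the atom names, state strings, and execute callback are all hypothetical:

```python
# Conceptual sketch: for each atom, reuse a previously persisted
# result when its state shows success, otherwise (re)run the atom
# and persist its output for future resumptions.
def run_with_resume(atoms, saved_states, saved_results, execute):
    """atoms: ordered atom names; saved_*: state persisted by a prior run."""
    results = {}
    for name in atoms:
        if saved_states.get(name) == 'SUCCESS':
            results[name] = saved_results[name]  # reuse persisted output
        else:
            results[name] = execute(name)        # (re)run the atom
            saved_states[name] = 'SUCCESS'       # persist the new outcome
            saved_results[name] = results[name]
    return results

ran = []
out = run_with_resume(
    atoms=['a', 'b', 'c'],
    saved_states={'a': 'SUCCESS'},   # 'a' finished in a prior run
    saved_results={'a': 1},
    execute=lambda name: ran.append(name) or len(ran),
)
# Only 'b' and 'c' actually execute; 'a' is resumed from storage.
```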
Post-execution: Typically when an engine is done running the logbook is discarded (to avoid creating a stockpile of useless data) and the backend storage is told to delete any contents for a given execution. For certain use-cases, though, it may be advantageous to retain logbooks and their contents (for example, for debugging or auditing past runs).
Note
It should be emphasized that the logbook is the authoritative, and preferably the only (see inputs and outputs), source of run-time state information (breaking this principle makes it hard or impossible to restart or resume in any type of automated fashion). When an atom returns a result, it should be written directly to a logbook. When atom or flow state changes in any way, the logbook is first to know (see notifications for how a user may also be notified of those same state changes). The logbook, a backend, and the associated storage helper class are responsible for storing the actual data. Used together, these components specify the persistence mechanism (how data is saved and where: memory, database, whatever…) and the persistence policy (when data is saved: every time it changes, at particular moments, or simply never).
To select which persistence backend to use you should use the fetch()
function which uses entrypoints
(internally using stevedore) to fetch and configure your backend. This makes
it simpler than accessing the backend data types directly and provides a common
function from which a backend can be fetched.
Using this function to fetch a backend might look like:
from taskflow.persistence import backends

...

persistence = backends.fetch(conf={
    "connection": "mysql",
    "user": ...,
    "password": ...,
})
book = make_and_save_logbook(persistence)

...
As can be seen above, the conf parameter acts as a dictionary that is used to fetch and configure your backend. The restrictions on it are the following: it must contain a 'connection' key, and possibly type-specific backend parameters as other keys.

Connection: 'memory'
Retains all data in local memory (not persisted to reliable storage). Useful for scenarios where persistence is not required (and also in unit tests).
Note
See MemoryBackend for implementation details.
Connection: 'dir' or 'file'
Retains all data in a directory & file based structure on local disk. Data will survive a system failure (allowing for resumption from the same local machine only). Useful for cases where more reliable persistence is desired along with the simplicity of files and directories (a concept everyone is familiar with).
Note
See DirBackend for implementation details.
Connection: 'mysql' or 'postgres' or 'sqlite'
Retains all data in an ACID-compliant database, using the sqlalchemy library for schemas, connections, and database interaction functionality. Useful when you need a higher level of durability than offered by the previous solutions. When using these connection types it is possible to resume an engine from a peer machine (this does not apply when using sqlite).
Logbooks
Name | Type | Primary Key |
---|---|---|
created_at | DATETIME | False |
updated_at | DATETIME | False |
uuid | VARCHAR | True |
name | VARCHAR | False |
meta | TEXT | False |
Flow details
Name | Type | Primary Key |
---|---|---|
created_at | DATETIME | False |
updated_at | DATETIME | False |
uuid | VARCHAR | True |
name | VARCHAR | False |
meta | TEXT | False |
state | VARCHAR | False |
parent_uuid | VARCHAR | False |
Atom details
Name | Type | Primary Key |
---|---|---|
created_at | DATETIME | False |
updated_at | DATETIME | False |
uuid | VARCHAR | True |
name | VARCHAR | False |
meta | TEXT | False |
atom_type | VARCHAR | False |
state | VARCHAR | False |
intention | VARCHAR | False |
results | TEXT | False |
failure | TEXT | False |
version | TEXT | False |
parent_uuid | VARCHAR | False |
Note
See SQLAlchemyBackend for implementation details.
Warning
Currently there is a size limit (not applicable for sqlite) on what the results column can contain. This size limit restricts how many prior failures a retry atom can contain. More information and a future fix will be posted to bug 1416088 (in the meantime try to ensure that your retry unit's history does not grow beyond ~80 prior results). This truncation can also be avoided by providing mysql_sql_mode as traditional when selecting your mysql + sqlalchemy based backend (see the mysql modes documentation for what this implies).
Connection: 'zookeeper'
Retains all data in a zookeeper backend (zookeeper exposes operations on files and directories, similar to the above 'dir' or 'file' connection types). Internally the kazoo library is used to interact with zookeeper to perform reliable, distributed, and atomic operations on the contents of a logbook represented as znodes. Since zookeeper is distributed it is also able to resume an engine from a peer machine (providing similar functionality to the database connection types listed previously).
Note
See ZkBackend for implementation details.
taskflow.persistence.backends.fetch(conf, namespace='taskflow.persistence', **kwargs)
Fetch a persistence backend with the given configuration.
This fetch method will look for the entrypoint name in the entrypoint namespace, and then attempt to instantiate that entrypoint using the provided configuration and any persistence backend specific kwargs.
NOTE(harlowja): to aid in making it easy to specify configuration and options to a backend, the configuration (which is typically just a dictionary) can also be a URI string that identifies the entrypoint name and any configuration specific to that backend.
For example, given the following configuration URI:

mysql://<not-used>/?a=b&c=d

This will look for the entrypoint named 'mysql' and will provide a configuration object composed of the URI's components, in this case {'a': 'b', 'c': 'd'}, to the constructor of that persistence backend instance.
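The URI decomposition that fetch() performs can be approximated with the standard library. This is a sketch of the idea only; the real implementation lives in taskflow and stevedore:

```python
from urllib.parse import urlparse, parse_qsl

# A configuration URI decomposes into an entrypoint name (the scheme)
# plus a configuration dict (the query string), mirroring the
# fetch() description above.
uri = "mysql://<not-used>/?a=b&c=d"
parsed = urlparse(uri)
entrypoint_name = parsed.scheme            # 'mysql'
conf = dict(parse_qsl(parsed.query))       # {'a': 'b', 'c': 'd'}
```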
taskflow.persistence.backends.backend(conf, namespace='taskflow.persistence', **kwargs)
Fetches a backend, connects, upgrades, then closes it on completion.
This allows a backend instance to be fetched, connected to, have its schema upgraded (if the schema is already up to date this is a no-op) and then used in a context manager statement, with the backend being closed upon context manager exit.
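The fetch/connect/upgrade/close lifecycle just described can be sketched as a context manager. FakeBackend here is a hypothetical stand-in used purely to show the ordering of operations, not a taskflow class:

```python
import contextlib

# Hypothetical stand-in for a persistence backend; it just records
# which lifecycle operations ran and in what order.
class FakeBackend:
    def __init__(self):
        self.events = []

    def get_connection(self):
        self.events.append('connect')
        return self

    def upgrade(self):
        self.events.append('upgrade')

    def close(self):
        self.events.append('close')

@contextlib.contextmanager
def backend():
    be = FakeBackend()      # stands in for fetching via backends.fetch(conf)
    conn = be.get_connection()
    conn.upgrade()          # no-op if the schema is already current
    try:
        yield be
    finally:
        conn.close()        # always closed on context-manager exit

with backend() as be:
    be.events.append('used')
```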
taskflow.persistence.base.Backend(conf)
Bases: object
Base class for persistence backends.

taskflow.persistence.base.Connection
Bases: object
Base class for backend connections.

backend
Returns the backend this connection is associated with.

validate()
Validates that a backend is still ok to be used.
The semantics of this may vary depending on the backend. On failure a backend specific exception should be raised that will indicate why the failure occurred.

update_atom_details(atom_detail)
Updates a given atom details and returns the updated version.
NOTE(harlowja): the details that are to be updated must already have been created by saving a flow details with the given atom detail inside of it.

update_flow_details(flow_detail)
Updates a given flow details and returns the updated version.
NOTE(harlowja): the details that are to be updated must already have been created by saving a logbook with the given flow detail inside of it.
taskflow.persistence.path_based.PathBasedBackend(conf)
Bases: taskflow.persistence.base.Backend
Base class for persistence backends that address data by path.
Subclasses of this backend write logbooks, flow details, and atom details to a provided base path in some filesystem-like storage. They will create and store those objects in three key directories (one for logbooks, one for flow details and one for atom details). They create those associated directories and then create files inside those directories that represent the contents of those objects for later reading and writing.

DEFAULT_PATH = None
Default path used when none is provided.

taskflow.persistence.path_based.PathBasedConnection(backend)
Bases: taskflow.persistence.base.Connection
Base class for path based backend connections.

backend
Returns the backend this connection is associated with.

get_flows_for_book(book_uuid, lazy=False)
Return an iterable of flowdetails for a given logbook uuid.

get_flow_details(flow_uuid, lazy=False)
Fetches a flowdetails object matching the given uuid.

update_flow_details(flow_detail, ignore_missing=False)
Updates a given flow details and returns the updated version.
NOTE(harlowja): the details that are to be updated must already have been created by saving a logbook with the given flow detail inside of it.

get_atoms_for_flow(flow_uuid)
Return an iterable of atomdetails for a given flowdetails uuid.
taskflow.persistence.models.LogBook(name, uuid=None)
Bases: object
A collection of flow details and associated metadata.
Typically this class contains a collection of flow detail entries for a given engine (or job) so that those entities can track what 'work' has been completed for resumption, reverting and miscellaneous tracking purposes.
The data contained within this class need not be persisted to the backend storage in real time. The data in this class will only be guaranteed to be persisted when a save occurs via some backend connection.
NOTE(harlowja): the naming of this class is analogous to a ship's log or a similar type of record used in detailing work that has been completed (or work that has not been completed).

pformat(indent=0, linesep='\n')
Pretty formats this logbook into a string.

>>> from taskflow.persistence import models
>>> tmp = models.LogBook("example")
>>> print(tmp.pformat())
LogBook: 'example'
- uuid = ...
- created_at = ...

add(fd)
Adds a new flow detail into this logbook.
NOTE(harlowja): if an existing flow detail exists with the same uuid the existing one will be overwritten with the newly provided one.
Does not guarantee that the details will be immediately saved.

find(flow_uuid)
Locate the flow detail corresponding to the given uuid.
Returns: the flow detail with that uuid
Return type: FlowDetail (or None if not found)

merge(lb, deep_copy=False)
Merges the current object state with the given one's state.
If deep_copy is provided as truthy then the local object will use copy.deepcopy to replace this object's local attributes with the provided object's attributes (only if there is a difference between this object's attributes and the provided attributes). If deep_copy is falsey (the default) then a reference copy will occur instead when a difference is detected.
NOTE(harlowja): If the provided object is this object itself then no merging is done. Also note that this does not merge the flow details contained in either.
Returns: this logbook (freshly merged with the incoming object)
Return type: LogBook
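The deep_copy semantics shared by the various merge() methods (on logbooks, flow details, and atom details alike) can be sketched as follows. merge_attr and Obj are illustrative helpers, not taskflow code:

```python
import copy

# Sketch of the merge() semantics: when an attribute differs, either
# deep-copy the other object's value (deep_copy=True) or re-reference
# it directly (deep_copy=False, a "reference copy").
def merge_attr(local, other, name, deep_copy=False):
    mine, theirs = getattr(local, name), getattr(other, name)
    if mine != theirs:  # only act when a difference is detected
        setattr(local, name, copy.deepcopy(theirs) if deep_copy else theirs)
    return local

class Obj:
    def __init__(self, meta):
        self.meta = meta

a = Obj({'k': 1})
b = Obj({'k': 2})
merge_attr(a, b, 'meta')                   # reference copy
shared = a.meta is b.meta                  # both now point at the same dict

c = Obj({'k': 1})
merge_attr(c, b, 'meta', deep_copy=True)   # independent deep copy
independent = c.meta is not b.meta and c.meta == b.meta
```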
to_dict(marshal_time=False)
Translates the internal state of this object to a dict.
NOTE(harlowja): The returned dict does not include any contained flow details.
Returns: this logbook in dict form

from_dict(data, unmarshal_time=False)
Translates the given dict into an instance of this class.
NOTE(harlowja): the dict provided should come from a prior call to to_dict().
Returns: a new logbook
Return type: LogBook

uuid
The unique identifier of this logbook.

name
The name of this logbook.

copy(retain_contents=True)
Copies this logbook.
Creates a shallow copy of this logbook. If this logbook contains flow details and retain_contents is truthy (the default) then the flow details container will be shallow copied (the flow details contained therein will not be copied). If retain_contents is falsey then the copied logbook will have no contained flow details (but it will have the rest of the local object's attributes copied).
Returns: a new logbook
Return type: LogBook
taskflow.persistence.models.FlowDetail(name, uuid)
Bases: object
A collection of atom details and associated metadata.
Typically this class contains a collection of atom detail entries that represent the atoms in a given flow structure (along with any other needed metadata relevant to that flow).
The data contained within this class need not be persisted to the backend storage in real time. The data in this class will only be guaranteed to be persisted when a save (or update) occurs via some backend connection.
Variables: meta – A dictionary of meta-data associated with this flow detail.

state = None
The state of the flow associated with this flow detail.

update(fd)
Updates the object's state to be the same as the given one.
This will assign the private and public attributes of the given flow detail directly to this object (replacing any existing attributes in this object; even if they are the same).
NOTE(harlowja): If the provided object is this object itself then no update is done.
Returns: this flow detail
Return type: FlowDetail

pformat(indent=0, linesep='\n')
Pretty formats this flow detail into a string.

>>> from oslo_utils import uuidutils
>>> from taskflow.persistence import models
>>> flow_detail = models.FlowDetail("example",
...     uuid=uuidutils.generate_uuid())
>>> print(flow_detail.pformat())
FlowDetail: 'example'
- uuid = ...
- state = ...

merge(fd, deep_copy=False)
Merges the current object state with the given one's state.
If deep_copy is provided as truthy then the local object will use copy.deepcopy to replace this object's local attributes with the provided object's attributes (only if there is a difference between this object's attributes and the provided attributes). If deep_copy is falsey (the default) then a reference copy will occur instead when a difference is detected.
NOTE(harlowja): If the provided object is this object itself then no merging is done. Also this does not merge the atom details contained in either.
Returns: this flow detail (freshly merged with the incoming object)
Return type: FlowDetail

copy(retain_contents=True)
Copies this flow detail.
Creates a shallow copy of this flow detail. If this detail contains atom details and retain_contents is truthy (the default) then the atom details container will be shallow copied (the atom details contained therein will not be copied). If retain_contents is falsey then the copied flow detail will have no contained atom details (but it will have the rest of the local object's attributes copied).
Returns: a new flow detail
Return type: FlowDetail

to_dict()
Translates the internal state of this object to a dict.
NOTE(harlowja): The returned dict does not include any contained atom details.
Returns: this flow detail in dict form

from_dict(data)
Translates the given dict into an instance of this class.
NOTE(harlowja): the dict provided should come from a prior call to to_dict().
Returns: a new flow detail
Return type: FlowDetail

add(ad)
Adds a new atom detail into this flow detail.
NOTE(harlowja): if an existing atom detail exists with the same uuid the existing one will be overwritten with the newly provided one.
Does not guarantee that the details will be immediately saved.

find(ad_uuid)
Locate the atom detail corresponding to the given uuid.
Returns: the atom detail with that uuid
Return type: AtomDetail (or None if not found)

uuid
The unique identifier of this flow detail.

name
The name of this flow detail.
taskflow.persistence.models.AtomDetail(name, uuid)
Bases: object
A collection of atom specific runtime information and metadata.
This is a base abstract class that contains attributes that are used to connect an atom to the persistence layer before, during, or after it is running. It includes any results it may have produced, any state that it may be in (for example FAILURE), any exception that occurred when running, and any associated stacktrace that may have occurred during an exception being thrown. It may also contain any other metadata that should be stored along-side the details about the connected atom.
The data contained within this class need not be persisted to the backend storage in real time. The data in this class will only be guaranteed to be persisted when a save (or update) occurs via some backend connection.

state = None
The state of the atom associated with this atom detail.

last_results
Gets the atom's last result.
If the atom has produced many results (for example if it has been retried, reverted, executed and …) this returns the last one of many results.

update(ad)
Updates the object's state to be the same as the given one.
This will assign the private and public attributes of the given atom detail directly to this object (replacing any existing attributes in this object; even if they are the same).
NOTE(harlowja): If the provided object is this object itself then no update is done.
Returns: this atom detail
Return type: AtomDetail

merge(other, deep_copy=False)
Merges the current object state with the given one's state.
If deep_copy is provided as truthy then the local object will use copy.deepcopy to replace this object's local attributes with the provided object's attributes (only if there is a difference between this object's attributes and the provided attributes). If deep_copy is falsey (the default) then a reference copy will occur instead when a difference is detected.
NOTE(harlowja): If the provided object is this object itself then no merging is done. Do note that no results are merged in this method; that operation is the responsibility of subclasses, which override this abstract method and provide that merging themselves as they see fit.
Returns: this atom detail (freshly merged with the incoming object)
Return type: AtomDetail

to_dict()
Translates the internal state of this object to a dict.
Returns: this atom detail in dict form

from_dict(data)
Translates the given dict into an instance of this class.
NOTE(harlowja): the dict provided should come from a prior call to to_dict().
Returns: a new atom detail
Return type: AtomDetail

uuid
The unique identifier of this atom detail.

name
The name of this atom detail.
taskflow.persistence.models.TaskDetail(name, uuid)
Bases: taskflow.persistence.models.AtomDetail
A task detail (an atom detail typically associated with a Task atom).

reset(state)
Resets this task detail and sets the state attribute value.
This sets any previously set results, failure, and revert_results attributes back to None and sets the state to the provided one, as well as setting this task detail's intention attribute to EXECUTE.

put(state, result)
Puts a result (acquired in the given state) into this detail.
Returns whether this object was modified (or whether it was not).

merge(other, deep_copy=False)
Merges the current task detail with the given one.
NOTE(harlowja): This merge does not copy and replace the results or revert_results if they differ. Instead the current object's results and revert_results attributes directly become (via assignment) the other object's attributes. Also note that if the provided object is this object itself then no merging is done.
See: https://bugs.launchpad.net/taskflow/+bug/1452978 for what happens if this is copied at a deeper level (for example by using copy.deepcopy or by using copy.copy).
Returns: this task detail (freshly merged with the incoming object)
Return type: TaskDetail

copy()
Copies this task detail.
Creates a shallow copy of this task detail (any meta-data and version information that this object maintains is shallow copied via copy.copy).
NOTE(harlowja): This copy does not copy and replace the results or revert_results attribute if they differ. Instead the current object's results and revert_results attributes directly become (via assignment) the cloned object's attributes.
See: https://bugs.launchpad.net/taskflow/+bug/1452978 for what happens if this is copied at a deeper level (for example by using copy.deepcopy or by using copy.copy).
Returns: a new task detail
Return type: TaskDetail
taskflow.persistence.models.RetryDetail(name, uuid)
Bases: taskflow.persistence.models.AtomDetail
A retry detail (an atom detail typically associated with a Retry atom).

reset(state)
Resets this retry detail and sets the state attribute value.
This sets any previously added results back to an empty list, resets the failure, revert_failure, and revert_results attributes back to None, and sets the state to the provided one, as well as setting this retry detail's intention attribute to EXECUTE.

copy()
Copies this retry detail.
Creates a shallow copy of this retry detail (any meta-data and version information that this object maintains is shallow copied via copy.copy).
NOTE(harlowja): This copy does not copy the incoming object's results or revert_results attributes. Instead this object's results attribute list is iterated over and a new list is constructed, with each (data, failures) element in that list having its failures (a dictionary of each named Failure object that occurred) copied but its data left untouched. After this is done that new list becomes (via assignment) the cloned object's results attribute. The revert_results is directly assigned to the cloned object's revert_results attribute.
See: https://bugs.launchpad.net/taskflow/+bug/1452978 for what happens if the data in results is copied at a deeper level (for example by using copy.deepcopy or by using copy.copy).
Returns: a new retry detail
Return type: RetryDetail

last_results
The last result that was produced.

last_failures
The last failure dictionary that was produced.
NOTE(harlowja): This is not the same as the local failure attribute, as the obtained failure dictionary in the results attribute (which is what this returns) is from associated atom failures (which is different from the directly related failure of the retry unit associated with this atom detail).

put(state, result)
Puts a result (acquired in the given state) into this detail.
Returns whether this object was modified (or whether it was not).

merge(other, deep_copy=False)
Merges the current retry detail with the given one.
NOTE(harlowja): This merge does not deep copy the incoming object's results attribute (if it differs). Instead the incoming object's results attribute list is always iterated over, and a new list is constructed with each (data, failures) element in that list having its failures (a dictionary of each named Failure object that occurred) copied but its data left untouched. After this is done that new list becomes (via assignment) this object's results attribute. Also note that if the provided object is this object itself then no merging is done.
See: https://bugs.launchpad.net/taskflow/+bug/1452978 for what happens if the data in results is copied at a deeper level (for example by using copy.deepcopy or by using copy.copy).
Returns: this retry detail (freshly merged with the incoming object)
Return type: RetryDetail
taskflow.persistence.backends.impl_memory.FakeInode(item, path, value=None)
Bases: taskflow.types.tree.Node
An in-memory filesystem inode-like object.

taskflow.persistence.backends.impl_memory.FakeFilesystem(deep_copy=True)
Bases: object
An in-memory filesystem-like structure.
This filesystem uses posix-style paths only, so users must be careful to use the posixpath module instead of the os.path one, which varies depending on the operating system the active python is running on (the decision to use posixpath was to avoid the path variations which are not relevant in an implementation of an in-memory fake filesystem).
Not thread-safe when a single filesystem is mutated at the same time by multiple threads. For example, having multiple threads call into clear() at the same time could potentially end badly. It is thread-safe when only get() or other read-only actions (like calling into ls()) are occurring at the same time.
Example usage:

>>> from taskflow.persistence.backends import impl_memory
>>> fs = impl_memory.FakeFilesystem()
>>> fs.ensure_path('/a/b/c')
>>> fs['/a/b/c'] = 'd'
>>> print(fs['/a/b/c'])
d
>>> del fs['/a/b/c']
>>> fs.ls("/a/b")
[]
>>> fs.get("/a/b/c", 'blob')
'blob'

root_path = '/'
Root path of the in-memory filesystem.

split(p)
Split a pathname into a tuple of (head, tail).
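Since this class mirrors posix path handling, the standard posixpath module shows the expected split behavior:

```python
import posixpath

# posixpath gives consistent posix-style results on every operating
# system, unlike os.path which varies by platform.
head, tail = posixpath.split('/a/b/c')
# head == '/a/b', tail == 'c'
```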
taskflow.persistence.backends.impl_memory.MemoryBackend(conf=None)
Bases: taskflow.persistence.path_based.PathBasedBackend
An in-memory (non-persistent) backend.
This backend writes logbooks, flow details, and atom details to an in-memory filesystem-like structure (rooted by the memory instance variable).
This backend does not provide true transactional semantics. It does guarantee that there will be no inter-thread race conditions when writing and reading, by using read/write locks.

DEFAULT_PATH = '/'
Default path used when none is provided.
taskflow.persistence.backends.impl_dir.DirBackend(conf)
Bases: taskflow.persistence.path_based.PathBasedBackend
A directory and file based backend.
This backend does not provide true transactional semantics. It does guarantee that there will be no interprocess race conditions when writing and reading, by using a consistent hierarchy of file based locks.
Example configuration:

conf = {
    "path": "/tmp/taskflow",  # save data to this root directory
    "max_cache_size": 1024,   # keep up-to 1024 entries in memory
}

DEFAULT_FILE_ENCODING = 'utf-8'
Default encoding used when decoding or encoding files into or from text/unicode into binary or binary into text/unicode.
taskflow.persistence.backends.impl_sqlalchemy.SQLAlchemyBackend(conf, engine=None)
Bases: taskflow.persistence.base.Backend
A sqlalchemy backend.
Example configuration:

conf = {
    "connection": "sqlite:////tmp/test.db",
}

taskflow.persistence.backends.impl_sqlalchemy.Connection(backend, upgrade_lock)
Bases: taskflow.persistence.base.Connection

backend
Returns the backend this connection is associated with.

update_atom_details(atom_detail)
Updates a given atom details and returns the updated version.
NOTE(harlowja): the details that are to be updated must already have been created by saving a flow details with the given atom detail inside of it.

update_flow_details(flow_detail)
Updates a given flow details and returns the updated version.
NOTE(harlowja): the details that are to be updated must already have been created by saving a logbook with the given flow detail inside of it.

get_flows_for_book(book_uuid, lazy=False)
Return an iterable of flowdetails for a given logbook uuid.

get_flow_details(fd_uuid, lazy=False)
Fetches a flowdetails object matching the given uuid.
taskflow.persistence.backends.impl_zookeeper.ZkBackend(conf, client=None)
Bases: taskflow.persistence.path_based.PathBasedBackend
A zookeeper-backed backend.
Example configuration:

conf = {
    "hosts": "192.168.0.1:2181,192.168.0.2:2181,192.168.0.3:2181",
    "path": "/taskflow",
}

Do note that the creation of a kazoo client is achieved by make_client(), and the transfer of this backend configuration to that function to make a client may happen at __init__ time. This implies that certain parameters from this backend configuration may be provided to make_client() such that if a client was not provided by the caller, one will be created according to make_client()'s specification.

DEFAULT_PATH = '/taskflow'
Default path used when none is provided.
taskflow.storage.Storage(flow_detail, backend=None, scope_fetcher=None)
Bases: object
Interface between engines and logbook and its backend (if any).
This class provides a simple interface to save atoms of a given flow and associated activity and results to a persistence layer (logbook, atom_details, flow_details) for use by engines. This makes it easier to interact with the underlying storage & backend mechanism through this interface rather than accessing those objects directly.
NOTE(harlowja): if no backend is provided then an in-memory backend will be automatically used, and the provided flow detail object will be placed into it for the duration of this object's existence.

injector_name = '_TaskFlow_INJECTOR'
Injector task detail name.
This task detail is a special detail that will be automatically created and saved to store persistent injected values (name conflicts with it must be avoided) that are global to the flow being executed.

ensure_atoms(atoms)
Ensure there is an atomdetail for each of the given atoms.
Returns a list of atomdetail uuids for each atom processed.

lock
Reader/writer lock used to ensure multi-thread safety.
This does not protect against the same storage objects being used by multiple engines/users across multiple processes (or different machines); certain backends handle that situation better than others (for example by using sequence identifiers) and it's an ongoing work in progress to make that better.

ensure_atom(atom)
Ensure there is an atomdetail for the given atom.
Returns the uuid for the atomdetail that corresponds to the given atom.

flow_name
The flow detail name this storage unit is associated with.

flow_uuid
The flow detail uuid this storage unit is associated with.

flow_meta
The flow detail metadata this storage unit is associated with.

backend
The backend this storage unit is associated with.
set_atom_intention(atom_name, intention)[source]
Sets the intention of an atom given the atom's name.
get_atoms_states(atom_names)[source]
Gets a dict of atom name => (state, intention) given atom names.
update_atom_metadata(atom_name, update_with)[source]
Updates an atom's associated metadata.
This update will take a provided dictionary or a list of (key, value) pairs to include in the updated metadata (newer keys will overwrite older keys) and, after merging, saves the updated data into the underlying persistence layer.
set_task_progress(task_name, progress, details=None)[source]
Set a task's progress.
Parameters:
- task_name – task name
- progress – progress value to record
- details – optional task-specific progress details
get_task_progress(task_name)[source]
Get the progress of a task given the task's name.
Parameters: task_name – task name
Returns: current task progress value
get_task_progress_details(task_name)[source]
Get the progress details of a task given the task's name.
Parameters: task_name – task name
Returns: None if progress_details is not defined, else the progress_details dict
save(atom_name, result, state='SUCCESS')[source]
Put the result for the atom with the provided name into storage.
save_retry_failure(retry_name, failed_atom_name, failure)[source]
Save a subflow failure to the retry controller's history.
get(atom_name)
Gets the execute results for an atom from storage.
get_failures()
Get all execute failures that happened with this flow.
reset(atom_name, state='PENDING')[source]
Reset the atom with the given name (if the atom is not in the given state).
inject_atom_args(atom_name, pairs, transient=True)[source]
Add values into storage for a specific atom only.
Parameters: transient – save the data in-memory only instead of persisting the data to backend storage (useful for resource-like objects or similar objects which cannot be persisted)
This method injects a dictionary/pairs of arguments for an atom so that when that atom is scheduled for execution it will have immediate access to these arguments.
Note
Injected atom arguments take precedence over arguments provided by predecessor atoms or arguments provided by injecting into the flow scope (using the inject() method).
Warning
It should be noted that injected atom arguments (that are scoped to the atom with the given name) should be serializable whenever possible. This is a requirement for the worker based engine, which must serialize (typically using json) all atom execute() and revert() arguments in order to transmit those arguments to the target worker(s). If the use-case being applied/desired is to later use the worker based engine then it is highly recommended to ensure all injected atom arguments (even transient ones) are serializable, to avoid issues that may appear later (when an object turns out to not actually be serializable).
inject(pairs, transient=False)[source]
Add values into storage.
This method should be used to put flow parameters (requirements that are not satisfied by any atom in the flow) into storage.
Parameters: transient – save the data in-memory only instead of persisting the data to backend storage (useful for resource-like objects or similar objects which cannot be persisted)
Warning
It should be noted that injected flow arguments (that are scoped to all atoms in this flow) should be serializable whenever possible. This is a requirement for the worker based engine, which must serialize (typically using json) all atom execute() and revert() arguments in order to transmit those arguments to the target worker(s). If the use-case being applied/desired is to later use the worker based engine then it is highly recommended to ensure all injected arguments (even transient ones) are serializable, to avoid issues that may appear later (when an object turns out to not actually be serializable).
fetch_unsatisfied_args(atom_name, args_mapping, scope_walker=None, optional_args=None)[source]
Fetch unsatisfied execute arguments using an atom's args mapping.
NOTE(harlowja): this takes into account the provided scope walker atoms that should produce the required value at runtime, as well as the transient/persistent flow-specific and atom-specific injected arguments. It does not check if the providers have actually produced the needed values; it just checks that they are registered to produce them in the future.
fetch_mapped_args(args_mapping, atom_name=None, scope_walker=None, optional_args=None)[source]
Fetch execute arguments for an atom using its args mapping.
change_flow_state(state)[source]
Transition the flow from its old state to the new state.
Returns (True, old_state) if the transition was performed, (False, old_state) if it was ignored, or raises an InvalidState exception if the transition is invalid.
Except where otherwise noted, this document is licensed under Creative Commons Attribution 3.0 License. See all OpenStack Legal Documents.