Examples

While developing TaskFlow, the team has worked hard to make sure the various concepts are explained by relevant examples. Here are a few selected examples to get you started (ordered by perceived complexity).

To explore more of these examples, please check out the examples directory in the TaskFlow source tree.

Note

If the examples provided are not satisfactory (or not up to your standards), contributions that improve them are welcome and very much appreciated. The higher the quality and the clearer the examples are, the more useful they are for everyone.

Hello world

Note

Full source located at hello_world.

 1import logging
 2import os
 3import sys
 4logging.basicConfig(level=logging.ERROR)
 5
 6top_dir = os.path.abspath(os.path.join(os.path.dirname(__file__),
 7                                       os.pardir,
 8                                       os.pardir))
 9sys.path.insert(0, top_dir)
10
11from taskflow import engines
12from taskflow.patterns import linear_flow as lf
13from taskflow.patterns import unordered_flow as uf
14from taskflow import task
15
16
17# INTRO: This is the de facto hello world equivalent for taskflow; it shows
18# how an overly simplistic workflow can be created and then run using
19# different engines and different styles of execution (all of which can run
20# in parallel if the workflow provided is parallelizable).
21
22class PrinterTask(task.Task):
23    def __init__(self, name, show_name=True, inject=None):
24        super().__init__(name, inject=inject)
25        self._show_name = show_name
26
27    def execute(self, output):
28        if self._show_name:
29            print("{}: {}".format(self.name, output))
30        else:
31            print(output)
32
33
34# This will be the work that we want done, which for this example is just to
35# print 'hello world' (like a song) using different tasks and different
36# execution models.
37song = lf.Flow("beats")
38
39# Unordered flows, when run, can be run in parallel; and a chorus is
40# everyone singing at once, of course!
41hi_chorus = uf.Flow('hello')
42world_chorus = uf.Flow('world')
43for (name, hello, world) in [('bob', 'hello', 'world'),
44                             ('joe', 'hellooo', 'worllllld'),
45                             ('sue', "helloooooo!", 'wooorllld!')]:
46    hi_chorus.add(PrinterTask("%s@hello" % name,
47                              # This will show up to the execute() method of
48                              # the task as the argument named 'output' (which
49                              # will allow us to print the character we want).
50                              inject={'output': hello}))
51    world_chorus.add(PrinterTask("%s@world" % name,
52                                 inject={'output': world}))
53
54# The composition starts with the conductor and then runs in sequence with
55# the chorus running in parallel, but no matter what the 'hello' chorus must
56# always run before the 'world' chorus (otherwise the world will fall apart).
57song.add(PrinterTask("conductor@begin",
58                     show_name=False, inject={'output': "*ding*"}),
59         hi_chorus,
60         world_chorus,
61         PrinterTask("conductor@end",
62                     show_name=False, inject={'output': "*dong*"}))
63
64# Run in parallel using eventlet green threads...
65try:
66    import eventlet as _eventlet  # noqa
67except ImportError:
68    # No eventlet currently active, skip running with it...
69    pass
70else:
71    print("-- Running in parallel using eventlet --")
72    e = engines.load(song, executor='greenthreaded', engine='parallel',
73                     max_workers=1)
74    e.run()
75
76
77# Run in parallel using real threads...
78print("-- Running in parallel using threads --")
79e = engines.load(song, executor='threaded', engine='parallel',
80                 max_workers=1)
81e.run()
82
83
84# Run in parallel using external processes...
85print("-- Running in parallel using processes --")
86e = engines.load(song, executor='processes', engine='parallel',
87                 max_workers=1)
88e.run()
89
90
91# Run serially (aka, if the workflow could have been run in parallel, it
92# will not be when run in this mode)...
93print("-- Running serially --")
94e = engines.load(song, engine='serial')
95e.run()
96print("-- Statistics gathered --")
97print(e.statistics)
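
All of the engines above are loaded with max_workers=1, so even the 'parallel' engines effectively run one task at a time. A minimal variation (reusing the song flow and the engines import from the listing above; the worker count of 3 is just an illustrative choice) that lets the tasks inside each unordered chorus actually overlap:

# Hypothetical variation: same flow, but allow up to three of the chorus
# tasks to execute concurrently (the linear outer flow still enforces that
# 'hello' finishes before 'world' starts).
e = engines.load(song, executor='threaded', engine='parallel',
                 max_workers=3)
e.run()
print(e.statistics)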

Passing values from and to tasks

Note

Full source located at simple_linear_pass.

 1import logging
 2import os
 3import sys
 4logging.basicConfig(level=logging.ERROR)
 5
 6self_dir = os.path.abspath(os.path.dirname(__file__))
 7top_dir = os.path.abspath(os.path.join(os.path.dirname(__file__),
 8                                       os.pardir,
 9                                       os.pardir))
10sys.path.insert(0, top_dir)
11sys.path.insert(0, self_dir)
12
13from taskflow import engines
14from taskflow.patterns import linear_flow
15from taskflow import task
16
17# INTRO: This example shows how a task (in a linear/serial workflow) can
18# produce an output that can be then consumed/used by a downstream task.
19
20
21class TaskA(task.Task):
22    default_provides = 'a'
23
24    def execute(self):
25        print("Executing '%s'" % (self.name))
26        return 'a'
27
28
29class TaskB(task.Task):
30    def execute(self, a):
31        print("Executing '%s'" % (self.name))
32        print("Got input '%s'" % (a))
33
34
35print("Constructing...")
36wf = linear_flow.Flow("pass-from-to")
37wf.add(TaskA('a'), TaskB('b'))
38
39print("Loading...")
40e = engines.load(wf)
41
42print("Compiling...")
43e.compile()
44
45print("Preparing...")
46e.prepare()
47
48print("Running...")
49e.run()
50
51print("Done...")

Using listeners

Note

Full source located at echo_listener.

 1import logging
 2import os
 3import sys
 4logging.basicConfig(level=logging.DEBUG)
 5
 6top_dir = os.path.abspath(os.path.join(os.path.dirname(__file__),
 7                                       os.pardir,
 8                                       os.pardir))
 9sys.path.insert(0, top_dir)
10
11from taskflow import engines
12from taskflow.listeners import logging as logging_listener
13from taskflow.patterns import linear_flow as lf
14from taskflow import task
15
16# INTRO: This example walks through a miniature workflow which will do a
17# simple echo operation; during this execution a listener is associated with
18# the engine to receive all notifications about what the flow has performed.
19# This example dumps that output to stdout for viewing (at debug level, to
20# show all the information which is possible).
21
22
23class Echo(task.Task):
24    def execute(self):
25        print(self.name)
26
27
28# Generate the work to be done (but don't do it yet).
29wf = lf.Flow('abc')
30wf.add(Echo('a'))
31wf.add(Echo('b'))
32wf.add(Echo('c'))
33
34# This will associate the listener with the engine (the listener
35# will automatically register for notifications with the engine and deregister
36# when the context is exited).
37e = engines.load(wf)
38with logging_listener.DynamicLoggingListener(e):
39    e.run()

Using listeners (to watch a phone call)

Note

Full source located at simple_linear_listening.

 1import logging
 2import os
 3import sys
 4logging.basicConfig(level=logging.ERROR)
 5
 6top_dir = os.path.abspath(os.path.join(os.path.dirname(__file__),
 7                                       os.pardir,
 8                                       os.pardir))
 9sys.path.insert(0, top_dir)
10
11import taskflow.engines
12from taskflow.patterns import linear_flow as lf
13from taskflow import task
14from taskflow.types import notifier
15
16ANY = notifier.Notifier.ANY
17
18# INTRO: In this example we create two tasks (this time as functions instead
19# of task subclasses as in the simple_linear.py example), each of which ~calls~
20# a given ~phone~ number (provided as a function input) in a linear fashion
21# (one after the other).
22#
23# For a workflow which is serial this shows an extremely simple way
24# of structuring your tasks (the code that does the work) into a linear
25# sequence (the flow) and then passing the work off to an engine, with some
26# initial data, to be run in a reliable manner.
27#
28# This example shows a basic usage of the taskflow structures without involving
29# the complexity of persistence. Using the structures that taskflow provides
30# via tasks and flows makes it possible for you to easily hook in a
31# persistence layer at a later time (and then gain the functionality it
32# offers) when you decide the complexity of adding that layer is 'worth it'
33# for your application's usage pattern (which some applications may not need).
34#
35# It **also** adds on to the simple_linear.py example by adding a set of
36# callback functions which the engine will call when a flow state transition
37# or task state transition occurs. These types of functions are useful for
38# updating task or flow progress, or for debugging, sending notifications to
39# external systems, or for other yet unknown future usage that you may create!
40
41
42def call_jim(context):
43    print("Calling jim.")
44    print("Context = %s" % (sorted(context.items(), key=lambda x: x[0])))
45
46
47def call_joe(context):
48    print("Calling joe.")
49    print("Context = %s" % (sorted(context.items(), key=lambda x: x[0])))
50
51
52def flow_watch(state, details):
53    print('Flow => %s' % state)
54
55
56def task_watch(state, details):
57    print('Task {} => {}'.format(details.get('task_name'), state))
58
59
60# Wrap your functions into a task type that knows how to treat your functions
61# as tasks. There was previous work done to just allow a function to be
62# directly passed, but in python 3.0 there is no easy way to capture an
63# instance method, so this wrapping approach was decided upon instead which
64# can attach to instance methods (if that's desired).
65flow = lf.Flow("Call-them")
66flow.add(task.FunctorTask(execute=call_jim))
67flow.add(task.FunctorTask(execute=call_joe))
68
69# Now load (but do not run) the flow using the provided initial data.
70engine = taskflow.engines.load(flow, store={
71    'context': {
72        "joe_number": 444,
73        "jim_number": 555,
74    }
75})
76
77# This is where we attach our callback functions to the 2 different
78# notification objects that an engine exposes. The usage of ANY (kleene star)
79# here means that we want to be notified on all state changes, if you want to
80# restrict to a specific state change, just register that instead.
81engine.notifier.register(ANY, flow_watch)
82engine.atom_notifier.register(ANY, task_watch)
83
84# And now run!
85engine.run()
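
The comment above notes that registering with ANY is optional; if only one transition is interesting you can register for that state alone. A hedged sketch, assuming the constants in taskflow.states match the state strings the notifiers emit:

from taskflow import states

# Only fire when the whole flow reaches SUCCESS, and only when an
# individual task starts RUNNING, instead of on every transition.
engine.notifier.register(states.SUCCESS, flow_watch)
engine.atom_notifier.register(states.RUNNING, task_watch)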

Dumping an in-memory backend

Note

Full source located at dump_memory_backend.

 1import logging
 2import os
 3import sys
 4logging.basicConfig(level=logging.ERROR)
 5
 6self_dir = os.path.abspath(os.path.dirname(__file__))
 7top_dir = os.path.abspath(os.path.join(os.path.dirname(__file__),
 8                                       os.pardir,
 9                                       os.pardir))
10sys.path.insert(0, top_dir)
11sys.path.insert(0, self_dir)
12
13from taskflow import engines
14from taskflow.patterns import linear_flow as lf
15from taskflow import task
16
17# INTRO: in this example we create a dummy flow with a dummy task, and run
18# it using an in-memory backend; pre and post run we dump out the contents
19# of the in-memory backend's tree structure (which can be quite useful to
20# look at for debugging or other analysis).
21
22
23class PrintTask(task.Task):
24    def execute(self):
25        print("Running '%s'" % self.name)
26
27# Make a little flow and run it...
28f = lf.Flow('root')
29for alpha in ['a', 'b', 'c']:
30    f.add(PrintTask(alpha))
31
32e = engines.load(f)
33e.compile()
34e.prepare()
35
36# After prepare the storage layer + backend can now be accessed safely...
37backend = e.storage.backend
38
39print("----------")
40print("Before run")
41print("----------")
42print(backend.memory.pformat())
43print("----------")
44
45e.run()
46
47print("---------")
48print("After run")
49print("---------")
50for path in backend.memory.ls_r(backend.memory.root_path, absolute=True):
51    value = backend.memory[path]
52    if value:
53        print("{} -> {}".format(path, value))
54    else:
55        print("%s" % (path))

Making phone calls

Note

Full source located at simple_linear.

 1import logging
 2import os
 3import sys
 4logging.basicConfig(level=logging.ERROR)
 5
 6top_dir = os.path.abspath(os.path.join(os.path.dirname(__file__),
 7                                       os.pardir,
 8                                       os.pardir))
 9sys.path.insert(0, top_dir)
10
11import taskflow.engines
12from taskflow.patterns import linear_flow as lf
13from taskflow import task
14
15# INTRO: In this example we create two tasks, each of which ~calls~ a given
16# ~phone~ number (provided as a function input) in a linear fashion (one after
17# the other). For a workflow which is serial this shows an extremely simple
18# way of structuring your tasks (the code that does the work) into a linear
19# sequence (the flow) and then passing the work off to an engine, with some
20# initial data, to be run in a reliable manner.
21#
22# NOTE(harlowja): This example shows a basic usage of the taskflow structures
23# without involving the complexity of persistence. Using the structures that
24# taskflow provides via tasks and flows makes it possible for you to easily at
25# a later time hook in a persistence layer (and then gain the functionality
26# that offers) when you decide the complexity of adding that layer in
27# is 'worth it' for your application's usage pattern (which certain
28# applications may not need).
29
30
31class CallJim(task.Task):
32    def execute(self, jim_number, *args, **kwargs):
33        print("Calling jim %s." % jim_number)
34
35
36class CallJoe(task.Task):
37    def execute(self, joe_number, *args, **kwargs):
38        print("Calling joe %s." % joe_number)
39
40
41# Create your flow and associated tasks (the work to be done).
42flow = lf.Flow('simple-linear').add(
43    CallJim(),
44    CallJoe()
45)
46
47# Now run that flow using the provided initial data (store below).
48taskflow.engines.run(flow, store=dict(joe_number=444,
49                                      jim_number=555))

Making phone calls (automatically reverting)

Note

Full source located at reverting_linear.

 1import logging
 2import os
 3import sys
 4logging.basicConfig(level=logging.ERROR)
 5
 6top_dir = os.path.abspath(os.path.join(os.path.dirname(__file__),
 7                                       os.pardir,
 8                                       os.pardir))
 9sys.path.insert(0, top_dir)
10
11import taskflow.engines
12from taskflow.patterns import linear_flow as lf
13from taskflow import task
14
15# INTRO: In this example we create three tasks, each of which ~calls~ a given
16# number (provided as a function input). One of those tasks *fails* when
17# calling its number (the call to suzzie); this causes the workflow to enter the
18# reverting process, which activates the revert methods of the previous two
19# phone ~calls~.
20#
21# This simulated calling makes it appear like all three calls occur or all
22# three don't occur (transaction-like capabilities). No persistence layer is
23# used here so reverting and executing will *not* be tolerant of process
24# failure.
25
26
27class CallJim(task.Task):
28    def execute(self, jim_number, *args, **kwargs):
29        print("Calling jim %s." % jim_number)
30
31    def revert(self, jim_number, *args, **kwargs):
32        print("Calling %s and apologizing." % jim_number)
33
34
35class CallJoe(task.Task):
36    def execute(self, joe_number, *args, **kwargs):
37        print("Calling joe %s." % joe_number)
38
39    def revert(self, joe_number, *args, **kwargs):
40        print("Calling %s and apologizing." % joe_number)
41
42
43class CallSuzzie(task.Task):
44    def execute(self, suzzie_number, *args, **kwargs):
45        raise OSError("Suzzie not home right now.")
46
47
48# Create your flow and associated tasks (the work to be done).
49flow = lf.Flow('simple-linear').add(
50    CallJim(),
51    CallJoe(),
52    CallSuzzie()
53)
54
55try:
56    # Now run that flow using the provided initial data (store below).
57    taskflow.engines.run(flow, store=dict(joe_number=444,
58                                          jim_number=555,
59                                          suzzie_number=666))
60except Exception as e:
61    # NOTE(harlowja): This exception will be the exception that came out of the
62    # 'CallSuzzie' task instead of a different exception, this is useful since
63    # typically surrounding code wants to handle the original exception and not
64    # a wrapped or altered one.
65    #
66    # *WARNING* If this flow was multi-threaded and multiple active tasks threw
67    # exceptions then the above exception would be wrapped into a combined
68    # exception (the object has methods to iterate over the contained
69    # exceptions). See: exceptions.py and the class 'WrappedFailure' to look at
70    # how to deal with multiple tasks failing while running.
71    #
72    # You will also note that this is not a problem in this case since no
73    # parallelism is involved; this is ensured by the usage of a linear flow
74    # and the default engine type which is 'serial' vs being 'parallel'.
75    print("Flow failed: %s" % e)

Building a car

Note

Full source located at build_a_car.

  1import logging
  2import os
  3import sys
  4
  5logging.basicConfig(level=logging.ERROR)
  6
  7top_dir = os.path.abspath(os.path.join(os.path.dirname(__file__),
  8                                       os.pardir,
  9                                       os.pardir))
 10sys.path.insert(0, top_dir)
 11
 12
 13import taskflow.engines
 14from taskflow.patterns import graph_flow as gf
 15from taskflow.patterns import linear_flow as lf
 16from taskflow import task
 17from taskflow.types import notifier
 18
 19ANY = notifier.Notifier.ANY
 20
 21import example_utils as eu  # noqa
 22
 23
 24# INTRO: This example shows how a graph flow and linear flow can be used
 25# together to execute dependent & non-dependent tasks by going through the
 26# steps required to build a simplistic car (an assembly line if you will). It
 27# also shows how raw functions can be wrapped into a task object instead of
 28# being forced to use the more *heavy* task base class. This is useful in
 29# scenarios where pre-existing code has functions that you want to easily
 30# plug in to taskflow, without requiring a large amount of code changes.
 31
 32
 33def build_frame():
 34    return 'steel'
 35
 36
 37def build_engine():
 38    return 'honda'
 39
 40
 41def build_doors():
 42    return '2'
 43
 44
 45def build_wheels():
 46    return '4'
 47
 48
 49# These just return true to indicate success; they would in the real world
 50# do more than just that.
 51
 52def install_engine(frame, engine):
 53    return True
 54
 55
 56def install_doors(frame, windows_installed, doors):
 57    return True
 58
 59
 60def install_windows(frame, doors):
 61    return True
 62
 63
 64def install_wheels(frame, engine, engine_installed, wheels):
 65    return True
 66
 67
 68def trash(**kwargs):
 69    eu.print_wrapped("Throwing away pieces of car!")
 70
 71
 72def startup(**kwargs):
 73    # If you want to see the rollback function being activated try uncommenting
 74    # the following line.
 75    #
 76    # raise ValueError("Car not verified")
 77    return True
 78
 79
 80def verify(spec, **kwargs):
 81    # If the car is not what we ordered throw away the car (trigger reversion).
 82    for key, value in kwargs.items():
 83        if spec[key] != value:
 84            raise Exception("Car doesn't match spec!")
 85    return True
 86
 87
 88# These two functions connect into the state transition notification emission
 89# points that the engine outputs, they can be used to log state transitions
 90# that are occurring, or they can be used to suspend the engine (or perform
 91# other useful activities).
 92def flow_watch(state, details):
 93    print('Flow => %s' % state)
 94
 95
 96def task_watch(state, details):
 97    print('Task {} => {}'.format(details.get('task_name'), state))
 98
 99
100flow = lf.Flow("make-auto").add(
101    task.FunctorTask(startup, revert=trash, provides='ran'),
102    # A graph flow allows automatic dependency based ordering, the ordering
103    # is determined by analyzing the symbols required and provided and ordering
104    # execution based on a functioning order (if one exists).
105    gf.Flow("install-parts").add(
106        task.FunctorTask(build_frame, provides='frame'),
107        task.FunctorTask(build_engine, provides='engine'),
108        task.FunctorTask(build_doors, provides='doors'),
109        task.FunctorTask(build_wheels, provides='wheels'),
110        # These *_installed outputs allow for other tasks to depend on certain
111        # actions being performed (aka the components were installed), another
112        # way to do this is to link() the tasks manually instead of creating
113        # an 'artificial' data dependency that accomplishes the same goal the
114        # manual linking would result in.
115        task.FunctorTask(install_engine, provides='engine_installed'),
116        task.FunctorTask(install_doors, provides='doors_installed'),
117        task.FunctorTask(install_windows, provides='windows_installed'),
118        task.FunctorTask(install_wheels, provides='wheels_installed')),
119    task.FunctorTask(verify, requires=['frame',
120                                       'engine',
121                                       'doors',
122                                       'wheels',
123                                       'engine_installed',
124                                       'doors_installed',
125                                       'windows_installed',
126                                       'wheels_installed']))
127
128# This dictionary will be provided to the tasks as a specification for what
129# the tasks should produce, in this example this specification will influence
130# what those tasks do and what output they create. Different tasks depend on
131# different information from this specification, all of which will be provided
132# automatically by the engine to those tasks.
133spec = {
134    "frame": 'steel',
135    "engine": 'honda',
136    "doors": '2',
137    "wheels": '4',
138    # These are used to compare the result product, a car without the pieces
139    # installed is not a car after all.
140    "engine_installed": True,
141    "doors_installed": True,
142    "windows_installed": True,
143    "wheels_installed": True,
144}
145
146
147engine = taskflow.engines.load(flow, store={'spec': spec.copy()})
148
149# This registers all (ANY) state transitions to trigger a call to the
150# flow_watch function for flow state transitions, and registers the
151# same all (ANY) state transitions for task state transitions.
152engine.notifier.register(ANY, flow_watch)
153engine.atom_notifier.register(ANY, task_watch)
154
155eu.print_wrapped("Building a car")
156engine.run()
157
158# Alter the specification and ensure that the reverting logic gets triggered,
159# since the car that will actually be built (by the build_doors function) only
160# has 2 doors (not the 5 now required); this will cause the verification
161# task to mark the car that is produced as not matching the desired spec.
162spec['doors'] = 5
163
164engine = taskflow.engines.load(flow, store={'spec': spec.copy()})
165engine.notifier.register(ANY, flow_watch)
166engine.atom_notifier.register(ANY, task_watch)
167
168eu.print_wrapped("Building a wrong car that doesn't match specification")
169try:
170    engine.run()
171except Exception as e:
172    eu.print_wrapped("Flow failed: %s" % e)
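
The comment inside the 'install-parts' graph flow mentions that link() could be used instead of the artificial '*_installed' data dependencies. A small, hypothetical sketch of that alternative (the simplified functions below are not the ones from the example; it only assumes graph_flow's link() method, which orders one node after another):

def fit_engine(frame, engine):
    return True


def fit_wheels(frame, wheels):
    return True


engine_step = task.FunctorTask(fit_engine)
wheels_step = task.FunctorTask(fit_wheels)

parts = gf.Flow("install-parts-linked")
parts.add(engine_step, wheels_step)
# Explicitly order the two steps instead of threading an
# 'engine_installed' value between them.
parts.link(engine_step, wheels_step)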

Iterating over the alphabet (using processes)

Note

Full source located at alphabet_soup.

 1import fractions
 2import functools
 3import logging
 4import os
 5import string
 6import sys
 7import time
 8logging.basicConfig(level=logging.ERROR)
 9
10self_dir = os.path.abspath(os.path.dirname(__file__))
11top_dir = os.path.abspath(os.path.join(os.path.dirname(__file__),
12                                       os.pardir,
13                                       os.pardir))
14sys.path.insert(0, top_dir)
15sys.path.insert(0, self_dir)
16
17from taskflow import engines
18from taskflow import exceptions
19from taskflow.patterns import linear_flow
20from taskflow import task
21
22
23# In this example we show how a simple linear set of tasks can be executed
24# using local processes (and not threads or remote workers) with minimal (if
25# any) modification to those tasks to make them safe to run in this mode.
26#
27# This is useful since it allows further scaling up your workflows when thread
28# execution starts to become a bottleneck (which it can start to be due to the
29# GIL in python). It also offers an intermediary scalable runner that can be
30# used when the scale and/or setup of remote workers is not desirable.
31
32
33def progress_printer(task, event_type, details):
34    # This callback, attached to each task will be called in the local
35    # process (not the child processes)...
36    progress = details.pop('progress')
37    progress = int(progress * 100.0)
38    print("Task '%s' reached %d%% completion" % (task.name, progress))
39
40
41class AlphabetTask(task.Task):
42    # Seconds of delay between each progress part.
43    _DELAY = 0.1
44
45    # This task will run in X main stages (each with a different progress
46    # report that will be delivered back to the running process...). The
47    # initial 0% and 100% are triggered automatically by the engine when
48    # a task is started and finished (so that's why those are not emitted
49    # here).
50    _PROGRESS_PARTS = [fractions.Fraction("%s/5" % x) for x in range(1, 5)]
51
52    def execute(self):
53        for p in self._PROGRESS_PARTS:
54            self.update_progress(p)
55            time.sleep(self._DELAY)
56
57
58print("Constructing...")
59soup = linear_flow.Flow("alphabet-soup")
60for letter in string.ascii_lowercase:
61    abc = AlphabetTask(letter)
62    abc.notifier.register(task.EVENT_UPDATE_PROGRESS,
63                          functools.partial(progress_printer, abc))
64    soup.add(abc)
65try:
66    print("Loading...")
67    e = engines.load(soup, engine='parallel', executor='processes')
68    print("Compiling...")
69    e.compile()
70    print("Preparing...")
71    e.prepare()
72    print("Running...")
73    e.run()
74    print("Done: %s" % e.statistics)
75except exceptions.NotImplementedError as e:
76    print(e)

Watching execution timing

Note

Full source located at timing_listener.

 1import logging
 2import os
 3import random
 4import sys
 5import time
 6logging.basicConfig(level=logging.ERROR)
 7
 8top_dir = os.path.abspath(os.path.join(os.path.dirname(__file__),
 9                                       os.pardir,
10                                       os.pardir))
11sys.path.insert(0, top_dir)
12
13from taskflow import engines
14from taskflow.listeners import timing
15from taskflow.patterns import linear_flow as lf
16from taskflow import task
17
18# INTRO: in this example we will attach a listener to an engine, run a few
19# tasks with variable run times, and show how the listener will print out
20# how long those tasks took (when they started and when they finished).
21#
22# This shows how timing metrics can be gathered (or attached onto an engine)
23# after a workflow has been constructed, making it easy to gather metrics
24# dynamically for situations where this kind of information is applicable (or
25# even adding this information on at a later point in the future when your
26# application starts to slow down).
27
28
29class VariableTask(task.Task):
30    def __init__(self, name):
31        super().__init__(name)
32        self._sleepy_time = random.random()
33
34    def execute(self):
35        time.sleep(self._sleepy_time)
36
37
38f = lf.Flow('root')
39f.add(VariableTask('a'), VariableTask('b'), VariableTask('c'))
40e = engines.load(f)
41with timing.PrintingDurationListener(e):
42    e.run()

Distance calculator

Note

Full source located at distance_calculator

 1import collections
 2import math
 3import os
 4import sys
 5top_dir = os.path.abspath(os.path.join(os.path.dirname(__file__),
 6                                       os.pardir,
 7                                       os.pardir))
 8sys.path.insert(0, top_dir)
 9
10from taskflow import engines
11from taskflow.patterns import linear_flow
12from taskflow import task
13
14# INTRO: This shows how to use a task's/atom's ability to take requirements
15# from its execute function's default parameters, and how to provide those
16# via different methods when needed, to influence those parameters and (in
17# this case) calculate the distance between two points in 2D space.
18
19# A 2D point.
20Point = collections.namedtuple("Point", "x,y")
21
22
23def is_near(val, expected, tolerance=0.001):
24    # Floats don't really provide equality...
25    if val > (expected + tolerance):
26        return False
27    if val < (expected - tolerance):
28        return False
29    return True
30
31
32class DistanceTask(task.Task):
33    # See: http://en.wikipedia.org/wiki/Distance#Distance_in_Euclidean_space
34
35    default_provides = 'distance'
36
37    def execute(self, a=Point(0, 0), b=Point(0, 0)):
38        return math.sqrt(math.pow(b.x - a.x, 2) + math.pow(b.y - a.y, 2))
39
40
41if __name__ == '__main__':
42    # For these we rely on the execute() method's points by default being
43    # at the origin (and we override them with store values when we want) at
44    # execution time (which then influences what is calculated).
45    any_distance = linear_flow.Flow("origin").add(DistanceTask())
46    results = engines.run(any_distance)
47    print(results)
48    print("{} is near-enough to {}: {}".format(
49        results['distance'], 0.0, is_near(results['distance'], 0.0)))
50
51    results = engines.run(any_distance, store={'a': Point(1, 1)})
52    print(results)
53    print("{} is near-enough to {}: {}".format(
54        results['distance'], 1.4142, is_near(results['distance'], 1.4142)))
55
56    results = engines.run(any_distance, store={'a': Point(10, 10)})
57    print(results)
58    print("{} is near-enough to {}: {}".format(
59        results['distance'], 14.14199, is_near(results['distance'], 14.14199)))
60
61    results = engines.run(any_distance,
62                          store={'a': Point(5, 5), 'b': Point(10, 10)})
63    print(results)
64    print("{} is near-enough to {}: {}".format(
65        results['distance'], 7.07106, is_near(results['distance'], 7.07106)))
66
67    # For this we use the ability to override at task creation time the
68    # optional arguments so that we don't need to continue to send them
69    # in via the 'store' argument like in the above (and we fix the new
70    # starting point 'a' at (10, 10) instead of (0, 0))...
71
72    ten_distance = linear_flow.Flow("ten")
73    ten_distance.add(DistanceTask(inject={'a': Point(10, 10)}))
74    results = engines.run(ten_distance, store={'b': Point(10, 10)})
75    print(results)
76    print("{} is near-enough to {}: {}".format(
77        results['distance'], 0.0, is_near(results['distance'], 0.0)))
78
79    results = engines.run(ten_distance)
80    print(results)
81    print("{} is near-enough to {}: {}".format(
82        results['distance'], 14.14199, is_near(results['distance'], 14.14199)))

Table multiplier (in parallel)

Note

Full source located at parallel_table_multiply

  1import csv
  2import logging
  3import os
  4import random
  5import sys
  6logging.basicConfig(level=logging.ERROR)
  7
  8top_dir = os.path.abspath(os.path.join(os.path.dirname(__file__),
  9                                       os.pardir,
 10                                       os.pardir))
 11sys.path.insert(0, top_dir)
 12
 13import futurist
 14
 15from taskflow import engines
 16from taskflow.patterns import unordered_flow as uf
 17from taskflow import task
 18
 19# INTRO: This example walks through a miniature workflow which does a parallel
 20# table modification where each row in the table gets adjusted by a thread, or
 21# green thread (if eventlet is available) in parallel and then the result
 22# is reformed into a new table and some verifications are performed on it
 23# to ensure everything went as expected.
 24
 25
 26MULTIPLER = 10
 27
 28
 29class RowMultiplier(task.Task):
 30    """Performs a modification of an input row, creating a output row."""
 31
 32    def __init__(self, name, index, row, multiplier):
 33        super().__init__(name=name)
 34        self.index = index
 35        self.multiplier = multiplier
 36        self.row = row
 37
 38    def execute(self):
 39        return [r * self.multiplier for r in self.row]
 40
 41
 42def make_flow(table):
 43    # This creation will allow for parallel computation (since the flow here
 44    # is specifically unordered; and when things are unordered they have
 45    # no dependencies and when things have no dependencies they can just be
 46    # run at the same time, limited in concurrency by the executor or max
 47    # workers of that executor...)
 48    f = uf.Flow("root")
 49    for i, row in enumerate(table):
 50        f.add(RowMultiplier("m-%s" % i, i, row, MULTIPLER))
 51    # NOTE(harlowja): at this point nothing has run; the above is just
 52    # defining what should be done (but not actually doing it) and associating
 53    # the ordering dependencies that should be enforced (the flow pattern used
 54    # forces this); the engine in the later main() function will actually
 55    # perform this work...
 56    return f
 57
 58
 59def main():
 60    if len(sys.argv) == 2:
 61        tbl = []
 62        with open(sys.argv[1], newline='') as fh:
 63            reader = csv.reader(fh)
 64            for row in reader:
 65                tbl.append([float(r) if r else 0.0 for r in row])
 66    else:
 67        # Make some random table out of thin air...
 68        tbl = []
 69        cols = random.randint(1, 100)
 70        rows = random.randint(1, 100)
 71        for _i in range(0, rows):
 72            row = []
 73            for _j in range(0, cols):
 74                row.append(random.random())
 75            tbl.append(row)
 76
 77    # Generate the work to be done.
 78    f = make_flow(tbl)
 79
 80    # Now run it (using the specified executor)...
 81    try:
 82        executor = futurist.GreenThreadPoolExecutor(max_workers=5)
 83    except RuntimeError:
 84        # No eventlet currently active, use real threads instead.
 85        executor = futurist.ThreadPoolExecutor(max_workers=5)
 86    try:
 87        e = engines.load(f, engine='parallel', executor=executor)
 88        for st in e.run_iter():
 89            print(st)
 90    finally:
 91        executor.shutdown()
 92
 93    # Find the old rows and put them into place...
 94    #
 95    # TODO(harlowja): probably easier just to sort instead of search...
 96    computed_tbl = []
 97    for i in range(0, len(tbl)):
 98        for t in f:
 99            if t.index == i:
100                computed_tbl.append(e.storage.get(t.name))
101
102    # Do some basic validation (which causes the return code of this process
103    # to be different if things were not as expected...)
104    if len(computed_tbl) != len(tbl):
105        return 1
106    else:
107        return 0
108
109
110if __name__ == "__main__":
111    sys.exit(main())

Linear equation solver (explicit dependencies)

Note

Full source located at calculate_linear.

 1import logging
 2import os
 3import sys
 4logging.basicConfig(level=logging.ERROR)
 5
 6top_dir = os.path.abspath(os.path.join(os.path.dirname(__file__),
 7                                       os.pardir,
 8                                       os.pardir))
 9sys.path.insert(0, top_dir)
10
11import taskflow.engines
12from taskflow.patterns import linear_flow as lf
13from taskflow import task
14
15
16# INTRO: In this example a linear flow is used to group four tasks to calculate
17# a value. A single task class is added twice, showing how this can be done
18# (with each instance taking in different bound values). In the first case
19# it uses default parameters ('x' and 'y') and in the second case arguments
20# are bound to the ('z', 'd') keys from the engine's internal storage.
21#
22# A multiplier task uses a binding that another task also provides, but this
23# example explicitly shows that the 'z' parameter is bound to the 'a' key.
24# This shows that if a task depends on a key named the same as a key provided
25# from another task the name can be remapped to take the desired key from a
26# different origin.
27
28
29# This task provides some values as a result of execution; this can be
30# useful when you want to provide values from a static set to other tasks that
31# depend on those values existing before those tasks can run.
32#
33# NOTE(harlowja): this usage is *deprecated* in favor of a simpler mechanism
34# that just provides those values on engine running by prepopulating the
35# storage backend before your tasks are run (which accomplishes a similar goal
36# in a more uniform manner).
37class Provider(task.Task):
38
39    def __init__(self, name, *args, **kwargs):
40        super().__init__(name=name, **kwargs)
41        self._provide = args
42
43    def execute(self):
44        return self._provide
45
46
47# This task adds two input variables and returns the result.
48#
49# Note that since this task does not have a revert() function (since addition
50# is a stateless operation) there are no side-effects that this function needs
51# to undo if some later operation fails.
52class Adder(task.Task):
53    def execute(self, x, y):
54        return x + y
55
56
57# This task multiplies an input variable by a multiplier and returns the
58# result.
59#
60# Note that since this task does not have a revert() function (since
61# multiplication is a stateless operation) there are no side-effects that
62# this function needs to undo if some later operation fails.
63class Multiplier(task.Task):
64    def __init__(self, name, multiplier, provides=None, rebind=None):
65        super().__init__(name=name, provides=provides,
66                         rebind=rebind)
67        self._multiplier = multiplier
68
69    def execute(self, z):
70        return z * self._multiplier
71
72
73# Note here that the ordering is established so that the correct sequences
74# of operations occurs where the adding and multiplying is done according
75# to the expected and typical mathematical model. A graph flow could also be
76# used here to automatically infer & ensure the correct ordering.
77flow = lf.Flow('root').add(
78    # Provide the initial values for other tasks to depend on.
79    #
80    # x = 2, y = 3, d = 5
81    Provider("provide-adder", 2, 3, 5, provides=('x', 'y', 'd')),
82    # z = x+y = 5
83    Adder("add-1", provides='z'),
84    # a = z+d = 10
85    Adder("add-2", provides='a', rebind=['z', 'd']),
86    # Calculate 'r = a*3 = 30'
87    #
88    # Note here that the 'z' argument of the execute() function will not be
89    # bound to the 'z' variable provided from the above 'provider' object but
90    # instead the 'z' argument will be taken from the 'a' variable provided
91    # by the second add-2 listed above.
92    Multiplier("multi", 3, provides='r', rebind={'z': 'a'})
93)
94
95# The result here will be all results (from all tasks) which is stored in an
96# in-memory storage location that backs this engine since it is not configured
97# with persistence storage.
98results = taskflow.engines.run(flow)
99print(results)
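
The note above marks the Provider pattern as deprecated in favor of prepopulating storage. A sketch of the same calculation with the initial values supplied through the store argument instead (reusing the Adder and Multiplier classes defined above):

# Same chain of operations, minus the Provider task; 'x', 'y' and 'd' now
# come from the store handed to the engine rather than from a task's output.
flow2 = lf.Flow('root-no-provider').add(
    Adder("add-1", provides='z'),
    Adder("add-2", provides='a', rebind=['z', 'd']),
    Multiplier("multi", 3, provides='r', rebind={'z': 'a'})
)
print(taskflow.engines.run(flow2, store={'x': 2, 'y': 3, 'd': 5}))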

Linear equation solver (inferred dependencies)

Source: graph_flow.py

 1import logging
 2import os
 3import sys
 4logging.basicConfig(level=logging.ERROR)
 5
 6top_dir = os.path.abspath(os.path.join(os.path.dirname(__file__),
 7                                       os.pardir,
 8                                       os.pardir))
 9sys.path.insert(0, top_dir)
10
11import taskflow.engines
12from taskflow.patterns import graph_flow as gf
13from taskflow.patterns import linear_flow as lf
14from taskflow import task
15
16
17# In this example there are complex *inferred* dependencies between tasks that
18# are used to perform a simple set of linear equations.
19#
20# As you will see below the tasks just define what they require as input
21# and produce as output (named values). Then the user doesn't care about
22# ordering the tasks (in this case the tasks calculate pieces of the overall
23# equation).
24#
25# As you will notice a graph flow resolves dependencies automatically using
26# the tasks' symbol requirements and provided symbol values, so no ordering
27# dependency has to be manually created.
28#
29# Also notice that flows of any types can be nested into a graph flow; showing
30# that subflow dependencies (and associated ordering) will be inferred too.
31
32
33class Adder(task.Task):
34
35    def execute(self, x, y):
36        return x + y
37
38
39flow = gf.Flow('root').add(
40    lf.Flow('nested_linear').add(
41        # x2 = y3+y4 = 12
42        Adder("add2", provides='x2', rebind=['y3', 'y4']),
43        # x1 = y1+y2 = 4
44        Adder("add1", provides='x1', rebind=['y1', 'y2'])
45    ),
46    # x5 = x1+x3 = 20
47    Adder("add5", provides='x5', rebind=['x1', 'x3']),
48    # x3 = x1+x2 = 16
49    Adder("add3", provides='x3', rebind=['x1', 'x2']),
50    # x4 = x2+y5 = 21
51    Adder("add4", provides='x4', rebind=['x2', 'y5']),
52    # x6 = x5+x4 = 41
53    Adder("add6", provides='x6', rebind=['x5', 'x4']),
54    # x7 = x6+x6 = 82
55    Adder("add7", provides='x7', rebind=['x6', 'x6']))
56
57# Provide the initial variable inputs using a storage dictionary.
58store = {
59    "y1": 1,
60    "y2": 3,
61    "y3": 5,
62    "y4": 7,
63    "y5": 9,
64}
65
66# These are the expected values that should be created.
67unexpected = 0
68expected = [
69    ('x1', 4),
70    ('x2', 12),
71    ('x3', 16),
72    ('x4', 21),
73    ('x5', 20),
74    ('x6', 41),
75    ('x7', 82),
76]
77
78result = taskflow.engines.run(
79    flow, engine='serial', store=store)
80
81print("Single threaded engine result %s" % result)
82for (name, value) in expected:
83    actual = result.get(name)
84    if actual != value:
85        sys.stderr.write("{} != {}\n".format(actual, value))
86        unexpected += 1
87
88result = taskflow.engines.run(
89    flow, engine='parallel', store=store)
90
91print("Multi threaded engine result %s" % result)
92for (name, value) in expected:
93    actual = result.get(name)
94    if actual != value:
95        sys.stderr.write("{} != {}\n".format(actual, value))
96        unexpected += 1
97
98if unexpected:
99    sys.exit(1)

Linear equation solver (in parallel)

Note

Full source located at calculate_in_parallel

 1import logging
 2import os
 3import sys
 4logging.basicConfig(level=logging.ERROR)
 5
 6top_dir = os.path.abspath(os.path.join(os.path.dirname(__file__),
 7                                       os.pardir,
 8                                       os.pardir))
 9sys.path.insert(0, top_dir)
10
11import taskflow.engines
12from taskflow.patterns import linear_flow as lf
13from taskflow.patterns import unordered_flow as uf
14from taskflow import task
15
16# INTRO: These examples show how a linear flow and an unordered flow can be
17# used together to execute calculations in parallel and then use the
18# result for the next task/s. The adder task is used for all calculations
19# and argument bindings are used to set correct parameters for each task.
20
21
22# This task provides some values as a result of execution; this can be
23# useful when you want to provide values from a static set to other tasks that
24# depend on those values existing before those tasks can run.
25#
26# NOTE(harlowja): this usage is *deprecated* in favor of a simpler mechanism
27# that provides those values on engine running by prepopulating the storage
28# backend before your tasks are run (which accomplishes a similar goal in a
29# more uniform manner).
30class Provider(task.Task):
31    def __init__(self, name, *args, **kwargs):
32        super().__init__(name=name, **kwargs)
33        self._provide = args
34
35    def execute(self):
36        return self._provide
37
38
39# This task adds two input variables and returns the result of that addition.
40#
41# Note that since this task does not have a revert() function (since addition
42# is a stateless operation) there are no side-effects that this function needs
43# to undo if some later operation fails.
44class Adder(task.Task):
45    def execute(self, x, y):
46        return x + y
47
48
49flow = lf.Flow('root').add(
50    # Provide the initial values for other tasks to depend on.
51    #
52    # x1 = 2, y1 = 3, x2 = 5, y2 = 8
53    Provider("provide-adder", 2, 3, 5, 8,
54             provides=('x1', 'y1', 'x2', 'y2')),
55    # Note here that we define the flow that contains the 2 adders to be an
56    # unordered flow since the order in which these execute does not matter,
57    # another way to solve this would be to use a graph_flow pattern, which
58    # also can run in parallel (since they have no ordering dependencies).
59    uf.Flow('adders').add(
60        # Calculate 'z1 = x1+y1 = 5'
61        #
62        # Rebind here means that the execute() function x argument will be
63        # satisfied from a previous output named 'x1', and the y argument
64        # of execute() will be populated from the previous output named 'y1'
65        #
66        # The output (result of adding) will be mapped into a variable named
67        # 'z1' which can then be referred to and depended on by other tasks.
68        Adder(name="add", provides='z1', rebind=['x1', 'y1']),
69        # z2 = x2+y2 = 13
70        Adder(name="add-2", provides='z2', rebind=['x2', 'y2']),
71    ),
72    # r = z1+z2 = 18
73    Adder(name="sum-1", provides='r', rebind=['z1', 'z2']))
74
75
76# The result here will be all results (from all tasks) which is stored in an
77# in-memory storage location that backs this engine since it is not configured
78# with persistence storage.
79result = taskflow.engines.run(flow, engine='parallel')
80print(result)
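
The comment above points out that a graph_flow could be used instead of the explicit unordered subflow. A sketch of that variant (reusing the Provider and Adder classes defined above), where the engine infers that the two inner adders do not depend on each other and may therefore run them concurrently under a parallel engine:

from taskflow.patterns import graph_flow as gf

graph_version = gf.Flow('root-graph').add(
    Provider("provide-adder", 2, 3, 5, 8,
             provides=('x1', 'y1', 'x2', 'y2')),
    # No manual unordered grouping is needed; both adders only depend on
    # the provider's outputs, so nothing orders them relative to each other.
    Adder(name="add", provides='z1', rebind=['x1', 'y1']),
    Adder(name="add-2", provides='z2', rebind=['x2', 'y2']),
    Adder(name="sum-1", provides='r', rebind=['z1', 'z2']))

print(taskflow.engines.run(graph_version, engine='parallel'))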

Creating a volume (in parallel)

Note

Full source located at create_parallel_volume

 1import contextlib
 2import logging
 3import os
 4import random
 5import sys
 6import time
 7logging.basicConfig(level=logging.ERROR)
 8
 9top_dir = os.path.abspath(os.path.join(os.path.dirname(__file__),
10                                       os.pardir,
11                                       os.pardir))
12sys.path.insert(0, top_dir)
13
14from oslo_utils import reflection
15
16from taskflow import engines
17from taskflow.listeners import printing
18from taskflow.patterns import unordered_flow as uf
19from taskflow import task
20
21# INTRO: These examples show how unordered_flow can be used to create a large
22# number of fake volumes in parallel (or serially, depending on a constant that
23# can be easily changed).
24
25
26@contextlib.contextmanager
27def show_time(name):
28    start = time.time()
29    yield
30    end = time.time()
31    print(" -- {} took {:0.3f} seconds".format(name, end - start))
32
33
34# This affects how many volumes to create and how much time to *simulate*
35# passing for that volume to be created.
36MAX_CREATE_TIME = 3
37VOLUME_COUNT = 5
38
39# This will be used to determine if all the volumes are created in parallel
40# or whether the volumes are created serially (in an undefined order since
41# an unordered flow is used). Note that there is a disconnection between the
42# ordering and the concept of parallelism (since unordered items can still be
43# run in a serial ordering). A typical use-case for offering both is to allow
44# for debugging using a serial approach, while when running at a larger scale
45# one would likely want to use the parallel approach.
46#
47# If you switch this flag from serial to parallel you can see the overall
48# time difference that this causes.
49SERIAL = False
50if SERIAL:
51    engine = 'serial'
52else:
53    engine = 'parallel'
54
55
56class VolumeCreator(task.Task):
57    def __init__(self, volume_id):
58        # Note here that the volume name is composed of the name of the class
59        # along with the volume id that is being created, since a name of a
60        # task uniquely identifies that task in storage it is important that
61        # the name be relevant and identifiable if the task is recreated for
62        # subsequent resumption (if applicable).
63        #
64        # UUIDs are *not* used as they cannot be tied back to a previous task's
65        # state on resumption (since they are unique and will vary for each
66        # task that is created). A name based off the volume id that is to be
67        # created is more easily tied back to the original task so that the
68        # volume create can be resumed/reverted, and is much easier to use for
69        # audit and tracking purposes.
70        base_name = reflection.get_callable_name(self)
71        super().__init__(name="{}-{}".format(base_name, volume_id))
72        self._volume_id = volume_id
73
74    def execute(self):
75        print("Making volume %s" % (self._volume_id))
76        time.sleep(random.random() * MAX_CREATE_TIME)
77        print("Finished making volume %s" % (self._volume_id))
78
79
80# Assume there is no ordering dependency between volumes.
81flow = uf.Flow("volume-maker")
82for i in range(0, VOLUME_COUNT):
83    flow.add(VolumeCreator(volume_id="vol-%s" % (i)))
84
85
86# Show how much time the overall engine loading and running takes.
87with show_time(name=flow.name.title()):
88    eng = engines.load(flow, engine=engine)
89    # This context manager automatically adds (and automatically removes) a
90    # helpful set of state transition notification printing helper utilities
91    # that show you exactly what transitions the engine is going through
92    # while running the various volume create tasks.
93    with printing.PrintingListener(eng):
94        eng.run()

Summation mapper(s) and reducer (in parallel)

Note

Full source located at simple_map_reduce

 1import logging
 2import os
 3import sys
 4logging.basicConfig(level=logging.ERROR)
 5
 6self_dir = os.path.abspath(os.path.dirname(__file__))
 7top_dir = os.path.abspath(os.path.join(os.path.dirname(__file__),
 8                                       os.pardir,
 9                                       os.pardir))
10sys.path.insert(0, top_dir)
11sys.path.insert(0, self_dir)
12
13# INTRO: These examples show a simplistic map/reduce implementation where
14# a set of mapper(s) will sum a series of input numbers (in parallel) and
15# return their individual summed result. A reducer will then use those
16# produced values and perform a final summation and this result will then be
17# printed (and verified to ensure the calculation was as expected).
18
19from taskflow import engines
20from taskflow.patterns import linear_flow
21from taskflow.patterns import unordered_flow
22from taskflow import task
23
24
25class SumMapper(task.Task):
26    def execute(self, inputs):
27        # Sums some set of provided inputs.
28        return sum(inputs)
29
30
31class TotalReducer(task.Task):
32    def execute(self, *args, **kwargs):
33        # Reduces all mapped summed outputs into a single value.
34        total = 0
35        for (k, v) in kwargs.items():
36            # If any other kwargs were passed in, we don't want to use those
37            # in the calculation of the total...
38            if k.startswith('reduction_'):
39                total += v
40        return total
41
42
43def chunk_iter(chunk_size, upperbound):
44    """Yields back chunk size pieces from zero to upperbound - 1."""
45    chunk = []
46    for i in range(0, upperbound):
47        chunk.append(i)
48        if len(chunk) == chunk_size:
49            yield chunk
50            chunk = []
51
52
53# Upper bound of numbers to sum for example purposes...
54UPPER_BOUND = 10000
55
56# How many mappers we want to have.
57SPLIT = 10
58
59# How big of a chunk we want to give each mapper.
60CHUNK_SIZE = UPPER_BOUND // SPLIT
61
62# This will be the workflow we will compose and run.
63w = linear_flow.Flow("root")
64
65# The mappers will run in parallel.
66store = {}
67provided = []
68mappers = unordered_flow.Flow('map')
69for i, chunk in enumerate(chunk_iter(CHUNK_SIZE, UPPER_BOUND)):
70    mapper_name = 'mapper_%s' % i
71    # Give that mapper some information to compute.
72    store[mapper_name] = chunk
73    # The reducer uses all of the outputs of the mappers, so it needs
74    # to be recorded that it needs access to them (under a specific name).
75    provided.append("reduction_%s" % i)
76    mappers.add(SumMapper(name=mapper_name,
77                          rebind={'inputs': mapper_name},
78                          provides=provided[-1]))
79w.add(mappers)
80
81# The reducer will run last (after all the mappers).
82w.add(TotalReducer('reducer', requires=provided))
83
84# Now go!
85e = engines.load(w, engine='parallel', store=store, max_workers=4)
86print("Running a parallel engine with options: %s" % e.options)
87e.run()
88
89# Now get the result the reducer created.
90total = e.storage.get('reducer')
91print("Calculated result = %s" % total)
92
93# Calculate it manually to verify that it worked...
94calc_total = sum(range(0, UPPER_BOUND))
95if calc_total != total:
96    sys.exit(1)

Sharing a thread pool executor (in parallel)

Note

Full source located at share_engine_thread

 1import logging
 2import os
 3import random
 4import sys
 5import time
 6logging.basicConfig(level=logging.ERROR)
 7
 8top_dir = os.path.abspath(os.path.join(os.path.dirname(__file__),
 9                                       os.pardir,
10                                       os.pardir))
11sys.path.insert(0, top_dir)
12
13import futurist
14
15from taskflow import engines
16from taskflow.patterns import unordered_flow as uf
17from taskflow import task
18from taskflow.utils import threading_utils as tu
19
20# INTRO: in this example we create 2 dummy flow(s), each with 2 dummy task(s),
21# and run them using a shared thread pool executor to show how a single
22# executor can be used with more than one engine (sharing the execution thread
23# pool between them); this allows for saving resources and reusing threads in
24# situations where this is beneficial.
25
26
27class DelayedTask(task.Task):
28    def __init__(self, name):
29        super().__init__(name=name)
30        self._wait_for = random.random()
31
32    def execute(self):
33        print("Running '{}' in thread '{}'".format(self.name, tu.get_ident()))
34        time.sleep(self._wait_for)
35
36
37f1 = uf.Flow("f1")
38f1.add(DelayedTask("f1-1"))
39f1.add(DelayedTask("f1-2"))
40
41f2 = uf.Flow("f2")
42f2.add(DelayedTask("f2-1"))
43f2.add(DelayedTask("f2-2"))
44
45# Run them all using the same futures (thread-pool based) executor...
46with futurist.ThreadPoolExecutor() as ex:
47    e1 = engines.load(f1, engine='parallel', executor=ex)
48    e2 = engines.load(f2, engine='parallel', executor=ex)
49    iters = [e1.run_iter(), e2.run_iter()]
50    # Iterate over a copy (so we can remove from the source list).
51    cloned_iters = list(iters)
52    while iters:
53        # Run a single 'step' of each iterator, forcing each engine to perform
54        # some work, then yield, and repeat until each iterator is consumed
55        # and there is no more engine work to be done.
56        for it in cloned_iters:
57            try:
58                next(it)
59            except StopIteration:
60                try:
61                    iters.remove(it)
62                except ValueError:
63                    pass
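
If lockstep progress between the two engines is not required, the same shared executor also works with plain run() calls, one engine after the other; a minimal sketch:

# Simpler (non-interleaved) variant: both engines still share the thread
# pool, they just run to completion one after the other.
with futurist.ThreadPoolExecutor() as ex:
    for flow in (f1, f2):
        engines.load(flow, engine='parallel', executor=ex).run()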

Storing & emitting a bill

Note

Full source located at fake_billing

  1import json
  2import logging
  3import os
  4import sys
  5import time
  6logging.basicConfig(level=logging.ERROR)
  7
  8top_dir = os.path.abspath(os.path.join(os.path.dirname(__file__),
  9                                       os.pardir,
 10                                       os.pardir))
 11sys.path.insert(0, top_dir)
 12
 13from oslo_utils import uuidutils
 14
 15from taskflow import engines
 16from taskflow.listeners import printing
 17from taskflow.patterns import graph_flow as gf
 18from taskflow.patterns import linear_flow as lf
 19from taskflow import task
 20from taskflow.utils import misc
 21
 22# INTRO: This example walks through a miniature workflow which simulates
 23# the reception of an API request, creation of a database entry, driver
 24# activation (which invokes a 'fake' webservice) and final completion.
 25#
 26# This example also shows how a function/object (in this case the url sending)
 27# that occurs during driver activation can update the progress of a task
 28# without being aware of the internals of how to do this by associating a
 29# callback that the url sending can update as the sending progresses from 0.0%
 30# complete to 100% complete.
 31
 32
 33class DB:
 34    def query(self, sql):
 35        print("Querying with: %s" % (sql))
 36
 37
 38class UrlCaller:
 39    def __init__(self):
 40        self._send_time = 0.5
 41        self._chunks = 25
 42
 43    def send(self, url, data, status_cb=None):
 44        sleep_time = float(self._send_time) / self._chunks
 45        for i in range(0, len(data)):
 46            time.sleep(sleep_time)
 47            # As we send the data, each chunk we 'fake' send will progress
 48            # the sending progress that much further to 100%.
 49            if status_cb:
 50                status_cb(float(i) / len(data))
 51
 52
 53# Since engines save the output of tasks to an optional persistent storage
 54# backend, resources have to be handled in a slightly different manner since
 55# resources are transient and can *not* be persisted (or serialized). For
 56# tasks that require access to a set of resources it is a common pattern to
 57# provide an object (in this case an instance of this class) to those tasks
 58# via the task constructor.
 59class ResourceFetcher:
 60    def __init__(self):
 61        self._db_handle = None
 62        self._url_handle = None
 63
 64    @property
 65    def db_handle(self):
 66        if self._db_handle is None:
 67            self._db_handle = DB()
 68        return self._db_handle
 69
 70    @property
 71    def url_handle(self):
 72        if self._url_handle is None:
 73            self._url_handle = UrlCaller()
 74        return self._url_handle
 75
 76
 77class ExtractInputRequest(task.Task):
 78    def __init__(self, resources):
 79        super().__init__(provides="parsed_request")
 80        self._resources = resources
 81
 82    def execute(self, request):
 83        return {
 84            'user': request.user,
 85            'user_id': misc.as_int(request.id),
 86            'request_id': uuidutils.generate_uuid(),
 87        }
 88
 89
 90class MakeDBEntry(task.Task):
 91    def __init__(self, resources):
 92        super().__init__()
 93        self._resources = resources
 94
 95    def execute(self, parsed_request):
 96        db_handle = self._resources.db_handle
 97        db_handle.query("INSERT %s INTO mydb" % (parsed_request))
 98
 99    def revert(self, result, parsed_request):
100        db_handle = self._resources.db_handle
101        db_handle.query("DELETE %s FROM mydb IF EXISTS" % (parsed_request))
102
103
104class ActivateDriver(task.Task):
105    def __init__(self, resources):
106        super().__init__(provides='sent_to')
107        self._resources = resources
108        self._url = "http://blahblah.com"
109
110    def execute(self, parsed_request):
111        print("Sending billing data to %s" % (self._url))
112        url_sender = self._resources.url_handle
113        # Note that here we attach our update_progress function (which is a
114        # function that the engine also 'binds' to) to the progress function
115        # that the url sending helper class uses. This allows the task progress
116        # to be tied to the url sending progress, which is very useful for
117        # downstream systems to be aware of what a task is doing at any time.
118        url_sender.send(self._url, json.dumps(parsed_request),
119                        status_cb=self.update_progress)
120        return self._url
121
122    def update_progress(self, progress, **kwargs):
123        # Override the parent method to also print out the status.
124        super().update_progress(progress, **kwargs)
125        print("{} is {:0.2f}% done".format(self.name, progress * 100))
126
127
128class DeclareSuccess(task.Task):
129    def execute(self, sent_to):
130        print("Done!")
131        print("All data processed and sent to %s" % (sent_to))
132
133
134class DummyUser:
135    def __init__(self, user, id_):
136        self.user = user
137        self.id = id_
138
139
140# Resources (db handles and similar) of course can *not* be persisted so we
141# need to make sure that we pass this resource fetcher to each task's
142# constructor so that the tasks have access to any needed resources (the
143# resources are lazily loaded so that they are only created when used).
144resources = ResourceFetcher()
145flow = lf.Flow("initialize-me")
146
147# 1. First we extract the api request into a usable format.
148# 2. Then we go ahead and make a database entry for our request.
149flow.add(ExtractInputRequest(resources), MakeDBEntry(resources))
150
151# 3. Then we activate our payment method and finally declare success.
152sub_flow = gf.Flow("after-initialize")
153sub_flow.add(ActivateDriver(resources), DeclareSuccess())
154flow.add(sub_flow)
155
156# Initially populate the storage with the following request object;
157# prepopulating this allows the tasks that depend on the 'request' variable
158# to start processing (in this case this is the ExtractInputRequest task).
159store = {
160    'request': DummyUser(user="bob", id_="1.35"),
161}
162eng = engines.load(flow, engine='serial', store=store)
163
164# This context manager automatically adds (and automatically removes) a
165# helpful set of state transition notification printing helper utilities
166# that show you exactly what transitions the engine is going through
167# while running the various billing related tasks.
168with printing.PrintingListener(eng):
169    eng.run()

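The glue holding this example together is name-based wiring: ExtractInputRequest declares provides="parsed_request", so its return value is stored under that name and handed to any later execute() method that accepts a parsed_request argument (the same mechanism passes 'sent_to' from ActivateDriver to DeclareSuccess). A stripped-down sketch of just that wiring, independent of the billing classes above:

from taskflow import engines
from taskflow.patterns import linear_flow as lf
from taskflow import task


class Producer(task.Task):
    # Whatever execute() returns is saved under this name.
    default_provides = 'payload'

    def execute(self):
        return {'hello': 'world'}


class Consumer(task.Task):
    # The 'payload' argument is looked up by name from the engine's storage.
    def execute(self, payload):
        print("Got %s" % (payload,))


engines.run(lf.Flow('wiring').add(Producer(), Consumer()))
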
Suspending a workflow & resuming

Note

Full source located at resume_from_backend

  1import contextlib
  2import logging
  3import os
  4import sys
  5logging.basicConfig(level=logging.ERROR)
  6
  7self_dir = os.path.abspath(os.path.dirname(__file__))
  8top_dir = os.path.abspath(os.path.join(os.path.dirname(__file__),
  9                                       os.pardir,
 10                                       os.pardir))
 11sys.path.insert(0, top_dir)
 12sys.path.insert(0, self_dir)
 13
 14from oslo_utils import uuidutils
 15
 16import taskflow.engines
 17from taskflow.patterns import linear_flow as lf
 18from taskflow.persistence import models
 19from taskflow import task
 20
 21import example_utils as eu  # noqa
 22
 23# INTRO: In this example linear_flow is used to group three tasks, one of
 24# which will suspend any future work the engine may do. This suspended
 25# engine is then discarded and the workflow is reloaded from the persisted
 26# data and resumed from where it was suspended. This allows you to see how
 27# to start an engine, have a task stop the engine from doing future work (if
 28# a multi-threaded engine is being used, the currently active work is not
 29# preempted) and then resume the work later.
 30#
 31# Usage:
 32#
 33#   With a filesystem directory as backend
 34#
 35#     python taskflow/examples/resume_from_backend.py
 36#
 37#   With ZooKeeper as backend
 38#
 39#     python taskflow/examples/resume_from_backend.py \
 40#       zookeeper://127.0.0.1:2181/taskflow/resume_from_backend/
 41
 42
 43# UTILITY FUNCTIONS #########################################
 44
 45
 46def print_task_states(flowdetail, msg):
 47    eu.print_wrapped(msg)
 48    print("Flow '{}' state: {}".format(flowdetail.name, flowdetail.state))
 49    # Sort by these so that our test validation doesn't get confused by the
 50    # order in which the items in the flow detail may appear.
 51    items = sorted((td.name, td.version, td.state, td.results)
 52                   for td in flowdetail)
 53    for item in items:
 54        print(" %s==%s: %s, result=%s" % item)
 55
 56
 57def find_flow_detail(backend, lb_id, fd_id):
 58    conn = backend.get_connection()
 59    lb = conn.get_logbook(lb_id)
 60    return lb.find(fd_id)
 61
 62
 63# CREATE FLOW ###############################################
 64
 65
 66class InterruptTask(task.Task):
 67    def execute(self):
 68        # DO NOT TRY THIS AT HOME
 69        engine.suspend()
 70
 71
 72class TestTask(task.Task):
 73    def execute(self):
 74        print('executing %s' % self)
 75        return 'ok'
 76
 77
 78def flow_factory():
 79    return lf.Flow('resume from backend example').add(
 80        TestTask(name='first'),
 81        InterruptTask(name='boom'),
 82        TestTask(name='second'))
 83
 84
 85# INITIALIZE PERSISTENCE ####################################
 86
 87with eu.get_backend() as backend:
 88
 89    # Create a place where the persistence information will be stored.
 90    book = models.LogBook("example")
 91    flow_detail = models.FlowDetail("resume from backend example",
 92                                    uuid=uuidutils.generate_uuid())
 93    book.add(flow_detail)
 94    with contextlib.closing(backend.get_connection()) as conn:
 95        conn.save_logbook(book)
 96
 97    # CREATE AND RUN THE FLOW: FIRST ATTEMPT ####################
 98
 99    flow = flow_factory()
100    engine = taskflow.engines.load(flow, flow_detail=flow_detail,
101                                   book=book, backend=backend)
102
103    print_task_states(flow_detail, "At the beginning, there is no state")
104    eu.print_wrapped("Running")
105    engine.run()
106    print_task_states(flow_detail, "After running")
107
108    # RE-CREATE, RESUME, RUN ####################################
109
110    eu.print_wrapped("Resuming and running again")
111
112    # NOTE(harlowja): reload the flow detail from the backend; this will
113    # allow us to resume the flow from its suspended state, but first we
114    # need to search for the right flow detail in the correct logbook where
115    # things are stored.
116    #
117    # We could avoid re-loading the engine and just call engine.run() again,
118    # but this example shows how another process may unsuspend a given flow
119    # and start it again for situations where this is useful to do (say the
120    # process running the above flow crashes).
121    flow2 = flow_factory()
122    flow_detail_2 = find_flow_detail(backend, book.uuid, flow_detail.uuid)
123    engine2 = taskflow.engines.load(flow2,
124                                    flow_detail=flow_detail_2,
125                                    backend=backend, book=book)
126    engine2.run()
127    print_task_states(flow_detail_2, "At the end")

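This example leans on the example_utils helper to pick a persistence backend (a temporary directory or ZooKeeper, depending on the arguments). Without that helper, a backend can be fetched directly from taskflow's persistence layer; a minimal sketch using the in-memory backend, which lives only inside the current process (handy for experimenting with suspend/resume in a single run, but it will not survive a crash):

import contextlib

from taskflow.persistence import backends as persistence_backends

# Fetch a backend from a connection string; 'memory://' keeps all logbooks
# and flow details in process memory.
backend = persistence_backends.fetch({'connection': 'memory://'})
with contextlib.closing(backend.get_connection()) as conn:
    # Ensure any backing schema/structures exist before first use.
    conn.upgrade()
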
Creating a virtual machine (resumable)

Note

Full source located at resume_vm_boot

  1import contextlib
  2import hashlib
  3import logging
  4import os
  5import random
  6import sys
  7import time
  8logging.basicConfig(level=logging.ERROR)
  9
 10self_dir = os.path.abspath(os.path.dirname(__file__))
 11top_dir = os.path.abspath(os.path.join(os.path.dirname(__file__),
 12                                       os.pardir,
 13                                       os.pardir))
 14sys.path.insert(0, top_dir)
 15sys.path.insert(0, self_dir)
 16
 17import futurist
 18from oslo_utils import uuidutils
 19
 20from taskflow import engines
 21from taskflow import exceptions as exc
 22from taskflow.patterns import graph_flow as gf
 23from taskflow.patterns import linear_flow as lf
 24from taskflow.persistence import models
 25from taskflow import task
 26
 27import example_utils as eu  # noqa
 28
 29# INTRO: This example shows how a hierarchy of flows can be used to create a
 30# vm in a reliable & resumable manner using taskflow + a miniature version of
 31# what nova does while booting a vm.
 32
 33
 34@contextlib.contextmanager
 35def slow_down(how_long=0.5):
 36    try:
 37        yield how_long
 38    finally:
 39        if len(sys.argv) > 1:
 40            # Only bother to do this if user input was provided.
 41            print("** Ctrl-c me please!!! **")
 42            time.sleep(how_long)
 43
 44
 45class PrintText(task.Task):
 46    """Just inserts some text print outs in a workflow."""
 47    def __init__(self, print_what, no_slow=False):
 48        content_hash = hashlib.md5(print_what.encode('utf-8')).hexdigest()[0:8]
 49        super().__init__(name="Print: %s" % (content_hash))
 50        self._text = print_what
 51        self._no_slow = no_slow
 52
 53    def execute(self):
 54        if self._no_slow:
 55            eu.print_wrapped(self._text)
 56        else:
 57            with slow_down():
 58                eu.print_wrapped(self._text)
 59
 60
 61class DefineVMSpec(task.Task):
 62    """Defines a vm specification to be."""
 63    def __init__(self, name):
 64        super().__init__(provides='vm_spec', name=name)
 65
 66    def execute(self):
 67        return {
 68            'type': 'kvm',
 69            'disks': 2,
 70            'vcpu': 1,
 71            'ips': 1,
 72            'volumes': 3,
 73        }
 74
 75
 76class LocateImages(task.Task):
 77    """Locates where the vm images are."""
 78    def __init__(self, name):
 79        super().__init__(provides='image_locations', name=name)
 80
 81    def execute(self, vm_spec):
 82        image_locations = {}
 83        for i in range(0, vm_spec['disks']):
 84            url = "http://www.yahoo.com/images/%s" % (i)
 85            image_locations[url] = "/tmp/%s.img" % (i)
 86        return image_locations
 87
 88
 89class DownloadImages(task.Task):
 90    """Downloads all the vm images."""
 91    def __init__(self, name):
 92        super().__init__(provides='download_paths',
 93                         name=name)
 94
 95    def execute(self, image_locations):
 96        for src, loc in image_locations.items():
 97            with slow_down(1):
 98                print("Downloading from {} => {}".format(src, loc))
 99        return sorted(image_locations.values())
100
101
102class CreateNetworkTpl(task.Task):
103    """Generates the network settings file to be placed in the images."""
104    SYSCONFIG_CONTENTS = """DEVICE=eth%s
105BOOTPROTO=static
106IPADDR=%s
107ONBOOT=yes"""
108
109    def __init__(self, name):
110        super().__init__(provides='network_settings',
111                         name=name)
112
113    def execute(self, ips):
114        settings = []
115        for i, ip in enumerate(ips):
116            settings.append(self.SYSCONFIG_CONTENTS % (i, ip))
117        return settings
118
119
120class AllocateIP(task.Task):
121    """Allocates the ips for the given vm."""
122    def __init__(self, name):
123        super().__init__(provides='ips', name=name)
124
125    def execute(self, vm_spec):
126        ips = []
127        for _i in range(0, vm_spec.get('ips', 0)):
128            ips.append("192.168.0.%s" % (random.randint(1, 254)))
129        return ips
130
131
132class WriteNetworkSettings(task.Task):
133    """Writes all the network settings into the downloaded images."""
134    def execute(self, download_paths, network_settings):
135        for j, path in enumerate(download_paths):
136            with slow_down(1):
137                print("Mounting {} to /tmp/{}".format(path, j))
138            for i, setting in enumerate(network_settings):
139                filename = ("/tmp/etc/sysconfig/network-scripts/"
140                            "ifcfg-eth%s" % (i))
141                with slow_down(1):
142                    print("Writing to %s" % (filename))
143                    print(setting)
144
145
146class BootVM(task.Task):
147    """Fires off the vm boot operation."""
148    def execute(self, vm_spec):
149        print("Starting vm!")
150        with slow_down(1):
151            print("Created: %s" % (vm_spec))
152
153
154class AllocateVolumes(task.Task):
155    """Allocates the volumes for the vm."""
156    def execute(self, vm_spec):
157        volumes = []
158        for i in range(0, vm_spec['volumes']):
159            with slow_down(1):
160                volumes.append("/dev/vda%s" % (i + 1))
161                print("Allocated volume %s" % volumes[-1])
162        return volumes
163
164
165class FormatVolumes(task.Task):
166    """Formats the volumes for the vm."""
167    def execute(self, volumes):
168        for v in volumes:
169            print("Formatting volume %s" % v)
170            with slow_down(1):
171                pass
172            print("Formatted volume %s" % v)
173
174
175def create_flow():
176    # Setup the set of things to do (mini-nova).
177    flow = lf.Flow("root").add(
178        PrintText("Starting vm creation.", no_slow=True),
179        lf.Flow('vm-maker').add(
180            # First create a specification for the final vm to-be.
181            DefineVMSpec("define_spec"),
182            # This does all the image stuff.
183            gf.Flow("img-maker").add(
184                LocateImages("locate_images"),
185                DownloadImages("download_images"),
186            ),
187            # This does all the network stuff.
188            gf.Flow("net-maker").add(
189                AllocateIP("get_my_ips"),
190                CreateNetworkTpl("fetch_net_settings"),
191                WriteNetworkSettings("write_net_settings"),
192            ),
193            # This does all the volume stuff.
194            gf.Flow("volume-maker").add(
195                AllocateVolumes("allocate_my_volumes", provides='volumes'),
196                FormatVolumes("volume_formatter"),
197            ),
198            # Finally boot it all.
199            BootVM("boot-it"),
200        ),
201        # Ya it worked!
202        PrintText("Finished vm create.", no_slow=True),
203        PrintText("Instance is running!", no_slow=True))
204    return flow
205
206eu.print_wrapped("Initializing")
207
208# Setup the persistence & resumption layer.
209with eu.get_backend() as backend:
210
211    # Try to find a previously passed in tracking id...
212    try:
213        book_id, flow_id = sys.argv[2].split("+", 1)
214        if not uuidutils.is_uuid_like(book_id):
215            book_id = None
216        if not uuidutils.is_uuid_like(flow_id):
217            flow_id = None
218    except (IndexError, ValueError):
219        book_id = None
220        flow_id = None
221
222    # Set up how we want our engine to run, serial, parallel...
223    try:
224        executor = futurist.GreenThreadPoolExecutor(max_workers=5)
225    except RuntimeError:
226        # No eventlet installed, just let the default be used instead.
227        executor = None
228
229    # Create/fetch a logbook that will track the workflow's work.
230    book = None
231    flow_detail = None
232    if all([book_id, flow_id]):
233        # Try to find in a prior logbook and flow detail...
234        with contextlib.closing(backend.get_connection()) as conn:
235            try:
236                book = conn.get_logbook(book_id)
237                flow_detail = book.find(flow_id)
238            except exc.NotFound:
239                pass
240    if book is None and flow_detail is None:
241        book = models.LogBook("vm-boot")
242        with contextlib.closing(backend.get_connection()) as conn:
243            conn.save_logbook(book)
244        engine = engines.load_from_factory(create_flow,
245                                           backend=backend, book=book,
246                                           engine='parallel',
247                                           executor=executor)
248        print("!! Your tracking id is: '{}+{}'".format(
249            book.uuid, engine.storage.flow_uuid))
250        print("!! Please submit this on later runs for tracking purposes")
251    else:
252        # Attempt to load from a previously partially completed flow.
253        engine = engines.load_from_detail(flow_detail, backend=backend,
254                                          engine='parallel', executor=executor)
255
256    # Make me my vm please!
257    eu.print_wrapped('Running')
258    engine.run()
259
260# How to use.
261#
262# 1. $ python me.py "sqlite:////tmp/nova.db"
263# 2. ctrl-c before this finishes
264# 3. Find the tracking id (search for 'Your tracking id is')
265# 4. $ python me.py "sqlite:////tmp/nova.db" "$tracking_id"
266# 5. Watch it pick up where it left off.
267# 6. Profit!

Creating a volume (resumable)

Note

Full source located at resume_volume_create

  1import contextlib
  2import hashlib
  3import logging
  4import os
  5import random
  6import sys
  7import time
  8logging.basicConfig(level=logging.ERROR)
  9
 10self_dir = os.path.abspath(os.path.dirname(__file__))
 11top_dir = os.path.abspath(os.path.join(os.path.dirname(__file__),
 12                                       os.pardir,
 13                                       os.pardir))
 14sys.path.insert(0, top_dir)
 15sys.path.insert(0, self_dir)
 16
 17from oslo_utils import uuidutils
 18
 19from taskflow import engines
 20from taskflow.patterns import graph_flow as gf
 21from taskflow.patterns import linear_flow as lf
 22from taskflow.persistence import models
 23from taskflow import task
 24
 25import example_utils  # noqa
 26
 27# INTRO: This example shows how a hierarchy of flows can be used to create a
 28# pseudo-volume in a reliable & resumable manner using taskflow + a miniature
 29# version of what cinder does while creating a volume (very miniature).
 30
 31
 32@contextlib.contextmanager
 33def slow_down(how_long=0.5):
 34    try:
 35        yield how_long
 36    finally:
 37        print("** Ctrl-c me please!!! **")
 38        time.sleep(how_long)
 39
 40
 41def find_flow_detail(backend, book_id, flow_id):
 42    # NOTE(harlowja): this is used to attempt to find a given logbook with
 43    # a given id and a given flow detail inside that logbook; we need this
 44    # reference so that we can resume the correct flow (as a logbook tracks
 45    # flows and a flow detail tracks an individual flow).
 46    #
 47    # Without a reference to the logbook and the flow detail in that logbook
 48    # we will not know exactly what we should resume and that would mean we
 49    # can't resume what we don't know about.
 50    with contextlib.closing(backend.get_connection()) as conn:
 51        lb = conn.get_logbook(book_id)
 52        return lb.find(flow_id)
 53
 54
 55class PrintText(task.Task):
 56    def __init__(self, print_what, no_slow=False):
 57        content_hash = hashlib.md5(print_what.encode('utf-8')).hexdigest()[0:8]
 58        super().__init__(name="Print: %s" % (content_hash))
 59        self._text = print_what
 60        self._no_slow = no_slow
 61
 62    def execute(self):
 63        if self._no_slow:
 64            print("-" * (len(self._text)))
 65            print(self._text)
 66            print("-" * (len(self._text)))
 67        else:
 68            with slow_down():
 69                print("-" * (len(self._text)))
 70                print(self._text)
 71                print("-" * (len(self._text)))
 72
 73
 74class CreateSpecForVolumes(task.Task):
 75    def execute(self):
 76        volumes = []
 77        for i in range(0, random.randint(1, 10)):
 78            volumes.append({
 79                'type': 'disk',
 80                'location': "/dev/vda%s" % (i + 1),
 81            })
 82        return volumes
 83
 84
 85class PrepareVolumes(task.Task):
 86    def execute(self, volume_specs):
 87        for v in volume_specs:
 88            with slow_down():
 89                print("Dusting off your hard drive %s" % (v))
 90            with slow_down():
 91                print("Taking a well deserved break.")
 92            print("Your drive %s has been certified." % (v))
 93
 94
 95# Setup the set of things to do (mini-cinder).
 96flow = lf.Flow("root").add(
 97    PrintText("Starting volume create", no_slow=True),
 98    gf.Flow('maker').add(
 99        CreateSpecForVolumes("volume_specs", provides='volume_specs'),
100        PrintText("I need a nap, it took me a while to build those specs."),
101        PrepareVolumes(),
102    ),
103    PrintText("Finished volume create", no_slow=True))
104
105# Setup the persistence & resumption layer.
106with example_utils.get_backend() as backend:
107    try:
108        book_id, flow_id = sys.argv[2].split("+", 1)
109    except (IndexError, ValueError):
110        book_id = None
111        flow_id = None
112
113    if not all([book_id, flow_id]):
114        # If no 'tracking id' (think a fedex or ups tracking id) is provided
115        # then we create one by creating a logbook (where flow details are
116        # stored) and creating a flow detail (where flow and task state is
117        # stored). The combination of these two objects' unique ids (uuids)
118        # allows the users of taskflow to reassociate workflows that were
119        # potentially running (and which may have partially completed) back
120        # with taskflow so that those workflows can be resumed (or reverted)
121        # after a process/thread/engine has failed in some way.
122        book = models.LogBook('resume-volume-create')
123        flow_detail = models.FlowDetail("root", uuid=uuidutils.generate_uuid())
124        book.add(flow_detail)
125        with contextlib.closing(backend.get_connection()) as conn:
126            conn.save_logbook(book)
127        print("!! Your tracking id is: '{}+{}'".format(book.uuid,
128                                                       flow_detail.uuid))
129        print("!! Please submit this on later runs for tracking purposes")
130    else:
131        flow_detail = find_flow_detail(backend, book_id, flow_id)
132
133    # Load and run.
134    engine = engines.load(flow,
135                          flow_detail=flow_detail,
136                          backend=backend, engine='serial')
137    engine.run()
138
139# How to use.
140#
141# 1. $ python me.py "sqlite:////tmp/cinder.db"
142# 2. ctrl-c before this finishes
143# 3. Find the tracking id (search for 'Your tracking id is')
144# 4. $ python me.py "sqlite:////tmp/cinder.db" "$tracking_id"
145# 5. Profit!

Running engines via iteration

Note

Full source located at run_by_iter

 1import logging
 2import os
 3import sys
 4logging.basicConfig(level=logging.ERROR)
 5
 6self_dir = os.path.abspath(os.path.dirname(__file__))
 7top_dir = os.path.abspath(os.path.join(os.path.dirname(__file__),
 8                                       os.pardir,
 9                                       os.pardir))
10sys.path.insert(0, top_dir)
11sys.path.insert(0, self_dir)
12
13
14from taskflow import engines
15from taskflow.patterns import linear_flow as lf
16from taskflow import task
17
18
19# INTRO: This example shows how to run a set of engines at the same time,
20# each executing a different flow, using a single thread of control to
21# iterate over each engine (which causes that engine to advance to its next
22# state during each iteration).
23
24
25class EchoTask(task.Task):
26    def execute(self, value):
27        print(value)
28        return chr(ord(value) + 1)
29
30
31def make_alphabet_flow(i):
32    f = lf.Flow("alphabet_%s" % (i))
33    start_value = 'A'
34    end_value = 'Z'
35    curr_value = start_value
36    while ord(curr_value) <= ord(end_value):
37        next_value = chr(ord(curr_value) + 1)
38        if curr_value != end_value:
39            f.add(EchoTask(name="echoer_%s" % curr_value,
40                           rebind={'value': curr_value},
41                           provides=next_value))
42        else:
43            f.add(EchoTask(name="echoer_%s" % curr_value,
44                           rebind={'value': curr_value}))
45        curr_value = next_value
46    return f
47
48
49# Adjust this number to change how many engines/flows run at once.
50flow_count = 1
51flows = []
52for i in range(0, flow_count):
53    f = make_alphabet_flow(i + 1)
54    flows.append(f)
55engine_iters = []
56for f in flows:
57    e = engines.load(f)
58    e.compile()
59    e.storage.inject({'A': 'A'})
60    e.prepare()
61    engine_iters.append(e.run_iter())
62while engine_iters:
63    for it in list(engine_iters):
64        try:
65            print(next(it))
66        except StopIteration:
67            engine_iters.remove(it)

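Each next() call above advances one engine by a single step and yields the name of the state that engine has transitioned into, so the loop is effectively cooperative multitasking over plain strings. A minimal sketch, using a trivial one-task flow, that just collects those state names:

from taskflow import engines
from taskflow.patterns import linear_flow as lf
from taskflow import task


class Noop(task.Task):
    def execute(self):
        pass


engine = engines.load(lf.Flow('tiny').add(Noop()))
# run_iter() is a generator of state names ('SCHEDULING', 'WAITING', ...).
print(list(engine.run_iter()))
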
Controlling retries using a retry controller

Note

Full source located at retry_flow

 1import logging
 2import os
 3import sys
 4logging.basicConfig(level=logging.ERROR)
 5
 6top_dir = os.path.abspath(os.path.join(os.path.dirname(__file__),
 7                                       os.pardir,
 8                                       os.pardir))
 9sys.path.insert(0, top_dir)
10
11import taskflow.engines
12from taskflow.patterns import linear_flow as lf
13from taskflow import retry
14from taskflow import task
15
16# INTRO: In this example we create a retry controller that receives a phone
17# directory and tries different phone numbers. The next task tries to call
18# Jim using the given number. If it is not Jim's number, the task raises an
19# exception and the retry controller takes the next number from the phone
20# directory and retries the call.
21#
22# This example shows a basic usage of retry controllers in a flow.
23# Retry controllers allow a failed subflow to be reverted and retried with
24# new parameters.
25
26
27class CallJim(task.Task):
28    def execute(self, jim_number):
29        print("Calling jim %s." % jim_number)
30        if jim_number != 555:
31            raise Exception("Wrong number!")
32        else:
33            print("Hello Jim!")
34
35    def revert(self, jim_number, **kwargs):
36        print("Wrong number, apologizing.")
37
38
39# Create your flow and associated tasks (the work to be done).
40flow = lf.Flow('retrying-linear',
41               retry=retry.ParameterizedForEach(
42                   rebind=['phone_directory'],
43                   provides='jim_number')).add(CallJim())
44
45# Now run that flow using the provided initial data (store below).
46taskflow.engines.run(flow, store={'phone_directory': [333, 444, 555, 666]})

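ParameterizedForEach feeds a fresh value into the subflow on every attempt. When no new parameters are needed and you simply want a bounded number of re-runs, a fixed-count strategy such as retry.Times can be used instead; a hedged sketch (the FlakyTask class is invented purely for illustration):

import random

import taskflow.engines
from taskflow.patterns import linear_flow as lf
from taskflow import retry
from taskflow import task


class FlakyTask(task.Task):
    def execute(self):
        # Fail randomly so the retry controller has something to do.
        if random.random() < 0.5:
            raise IOError("Transient failure, try again...")
        print("Succeeded!")


# Revert and re-run the subflow up to 5 times before giving up.
flow = lf.Flow('retrying-linear-times',
               retry=retry.Times(attempts=5)).add(FlakyTask())
taskflow.engines.run(flow)
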
Distributed execution (simple)

Note

Full source located at wbe_simple_linear

  1import json
  2import logging
  3import os
  4import sys
  5import tempfile
  6top_dir = os.path.abspath(os.path.join(os.path.dirname(__file__),
  7                                       os.pardir,
  8                                       os.pardir))
  9sys.path.insert(0, top_dir)
 10
 11from taskflow import engines
 12from taskflow.engines.worker_based import worker
 13from taskflow.patterns import linear_flow as lf
 14from taskflow.tests import utils
 15from taskflow.utils import threading_utils
 16
 17import example_utils  # noqa
 18
 19# INTRO: This example walks through a miniature workflow which shows how to
 20# start up a number of workers (these workers will process task execution and
 21# reversion requests using any provided input data), build a flow out of
 22# tasks that those workers are *capable* of running (the engine can not use
 23# tasks that the workers are unable to run; that would end in failure), and
 24# then execute that workflow seamlessly, using the workers to perform the
 25# actual execution.
 26#
 27# NOTE(harlowja): this example simulates the expected larger number of workers
 28# by using a set of threads (which in this example simulate the remote workers
 29# that would typically be running on other external machines).
 30
 31# A filesystem can also be used as the queue transport (useful as a simple
 32# transport type that does not involve setting up a larger mq system). If
 33# this is false then the memory transport is used instead; both work in
 34# standalone setups.
 35USE_FILESYSTEM = False
 36BASE_SHARED_CONF = {
 37    'exchange': 'taskflow',
 38}
 39
 40# Until https://github.com/celery/kombu/issues/398 is resolved it is not
 41# recommended to run many worker threads in this example due to the types
 42# of errors mentioned in that issue.
 43MEMORY_WORKERS = 2
 44FILE_WORKERS = 1
 45WORKER_CONF = {
 46    # These are the tasks the worker can execute; they *must* be importable.
 47    # Typically this list is used to restrict what workers may execute to
 48    # a smaller set of *allowed* tasks that are known to be safe (one would
 49    # not want to allow all python code to be executed).
 50    'tasks': [
 51        'taskflow.tests.utils:TaskOneArgOneReturn',
 52        'taskflow.tests.utils:TaskMultiArgOneReturn'
 53    ],
 54}
 55
 56
 57def run(engine_options):
 58    flow = lf.Flow('simple-linear').add(
 59        utils.TaskOneArgOneReturn(provides='result1'),
 60        utils.TaskMultiArgOneReturn(provides='result2')
 61    )
 62    eng = engines.load(flow,
 63                       store=dict(x=111, y=222, z=333),
 64                       engine='worker-based', **engine_options)
 65    eng.run()
 66    return eng.storage.fetch_all()
 67
 68
 69if __name__ == "__main__":
 70    logging.basicConfig(level=logging.ERROR)
 71
 72    # Setup our transport configuration and merge it into the worker and
 73    # engine configuration so that both of those use it correctly.
 74    shared_conf = dict(BASE_SHARED_CONF)
 75
 76    tmp_path = None
 77    if USE_FILESYSTEM:
 78        worker_count = FILE_WORKERS
 79        tmp_path = tempfile.mkdtemp(prefix='wbe-example-')
 80        shared_conf.update({
 81            'transport': 'filesystem',
 82            'transport_options': {
 83                'data_folder_in': tmp_path,
 84                'data_folder_out': tmp_path,
 85                'polling_interval': 0.1,
 86            },
 87        })
 88    else:
 89        worker_count = MEMORY_WORKERS
 90        shared_conf.update({
 91            'transport': 'memory',
 92            'transport_options': {
 93                'polling_interval': 0.1,
 94            },
 95        })
 96    worker_conf = dict(WORKER_CONF)
 97    worker_conf.update(shared_conf)
 98    engine_options = dict(shared_conf)
 99    workers = []
100    worker_topics = []
101
102    try:
103        # Create a set of workers to simulate actual remote workers.
104        print('Running %s workers.' % (worker_count))
105        for i in range(0, worker_count):
106            worker_conf['topic'] = 'worker-%s' % (i + 1)
107            worker_topics.append(worker_conf['topic'])
108            w = worker.Worker(**worker_conf)
109            runner = threading_utils.daemon_thread(w.run)
110            runner.start()
111            w.wait()
112            workers.append((runner, w.stop))
113
114        # Now use those workers to do something.
115        print('Executing some work.')
116        engine_options['topics'] = worker_topics
117        result = run(engine_options)
118        print('Execution finished.')
119        # This is done so that the test examples can work correctly
120        # even when the keys change order (which will happen in various
121        # python versions).
122        print("Result = %s" % json.dumps(result, sort_keys=True))
123    finally:
124        # And cleanup.
125        print('Stopping workers.')
126        while workers:
127            r, stopper = workers.pop()
128            stopper()
129            r.join()
130        if tmp_path:
131            example_utils.rm_path(tmp_path)

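The 'tasks' list in WORKER_CONF is effectively a whitelist of importable task classes that a worker is willing to execute. To have workers run one of your own tasks, the class must live at an importable module path and be listed there; a hedged sketch where 'mymodule:MyTask' is a placeholder for such a path:

WORKER_CONF = {
    'tasks': [
        # Existing, known-safe test tasks...
        'taskflow.tests.utils:TaskOneArgOneReturn',
        'taskflow.tests.utils:TaskMultiArgOneReturn',
        # A hypothetical task of your own; the 'module:Class' reference must
        # be importable on every worker host.
        'mymodule:MyTask',
    ],
}
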
Distributed notification (simple)

Note

Full source located at wbe_event_sender

  1import logging
  2import os
  3import string
  4import sys
  5import time
  6top_dir = os.path.abspath(os.path.join(os.path.dirname(__file__),
  7                                       os.pardir,
  8                                       os.pardir))
  9sys.path.insert(0, top_dir)
 10
 11from taskflow import engines
 12from taskflow.engines.worker_based import worker
 13from taskflow.patterns import linear_flow as lf
 14from taskflow import task
 15from taskflow.types import notifier
 16from taskflow.utils import threading_utils
 17
 18ANY = notifier.Notifier.ANY
 19
 20# INTRO: This example shows how to use a remote worker's event notification
 21# attribute to proxy back task event notifications to the controlling process.
 22#
 23# In this case a simple set of events is triggered by a worker running a
 24# task (simulated to be remote by using a kombu memory transport and threads).
 25# Those events that the 'remote worker' produces will then be proxied back to
 26# the task that the engine is running 'remotely', and then they will be
 27# emitted back to the original callbacks that exist in the originating engine
 28# process/thread. This creates a one-way *notification* channel that can be
 29# used transparently in-process or out-of-process (using remote workers) and
 30# allows a task to signal to its controlling process some sort of action
 31# that has occurred that the task may need to tell others about (for
 32# example to trigger some type of response when the task reaches 50% done...).
 33
 34
 35def event_receiver(event_type, details):
 36    """This is the callback that (in this example) doesn't do much..."""
 37    print("Received event '%s'" % event_type)
 38    print("Details = %s" % details)
 39
 40
 41class EventReporter(task.Task):
 42    """This is the task that will be running 'remotely' (not really remote)."""
 43
 44    EVENTS = tuple(string.ascii_uppercase)
 45    EVENT_DELAY = 0.1
 46
 47    def execute(self):
 48        for i, e in enumerate(self.EVENTS):
 49            details = {
 50                'leftover': self.EVENTS[i:],
 51            }
 52            self.notifier.notify(e, details)
 53            time.sleep(self.EVENT_DELAY)
 54
 55
 56BASE_SHARED_CONF = {
 57    'exchange': 'taskflow',
 58    'transport': 'memory',
 59    'transport_options': {
 60        'polling_interval': 0.1,
 61    },
 62}
 63
 64# Until https://github.com/celery/kombu/issues/398 is resolved it is not
 65# recommended to run many worker threads in this example due to the types
 66# of errors mentioned in that issue.
 67MEMORY_WORKERS = 1
 68WORKER_CONF = {
 69    'tasks': [
 70        # Used to locate which tasks we can run (we don't want to allow
 71        # arbitrary code/tasks to be run by any worker since that would
 72        # open up a variety of vulnerabilities).
 73        '%s:EventReporter' % (__name__),
 74    ],
 75}
 76
 77
 78def run(engine_options):
 79    reporter = EventReporter()
 80    reporter.notifier.register(ANY, event_receiver)
 81    flow = lf.Flow('event-reporter').add(reporter)
 82    eng = engines.load(flow, engine='worker-based', **engine_options)
 83    eng.run()
 84
 85
 86if __name__ == "__main__":
 87    logging.basicConfig(level=logging.ERROR)
 88
 89    # Setup our transport configuration and merge it into the worker and
 90    # engine configuration so that both of those objects use it correctly.
 91    worker_conf = dict(WORKER_CONF)
 92    worker_conf.update(BASE_SHARED_CONF)
 93    engine_options = dict(BASE_SHARED_CONF)
 94    workers = []
 95
 96    # These topics will be used to request worker information on; those
 97    # workers will respond with their capabilities, which the executing
 98    # engine will use to match pending tasks to a capable worker. This will
 99    # cause the task to be sent for execution, and the engine will wait until
100    # it is finished (a response is received); then the engine will either
101    # continue with other tasks, do some retry/failure resolution logic or
102    # stop (and potentially re-raise the remote worker's failure)...
103    worker_topics = []
104
105    try:
106        # Create a set of worker threads to simulate actual remote workers...
107        print('Running %s workers.' % (MEMORY_WORKERS))
108        for i in range(0, MEMORY_WORKERS):
109            # Give each one its own unique topic name so that they can
110            # correctly communicate with the engine (they will all share the
111            # same exchange).
112            worker_conf['topic'] = 'worker-%s' % (i + 1)
113            worker_topics.append(worker_conf['topic'])
114            w = worker.Worker(**worker_conf)
115            runner = threading_utils.daemon_thread(w.run)
116            runner.start()
117            w.wait()
118            workers.append((runner, w.stop))
119
120        # Now use those workers to do something.
121        print('Executing some work.')
122        engine_options['topics'] = worker_topics
123        run(engine_options)
124        print('Execution finished.')
125    finally:
126        # And cleanup.
127        print('Stopping workers.')
128        while workers:
129            r, stopper = workers.pop()
130            stopper()
131            r.join()

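The callback above is registered for ANY, so it fires for every event the remote task emits. Registering for a single event type works the same way; a small sketch reusing the EventReporter task and event_receiver callback defined in the listing above:

reporter = EventReporter()
# Only the 'A' event will reach this callback; every other notification
# emitted by the task is ignored by it.
reporter.notifier.register('A', event_receiver)
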
Distributed mandelbrot (complex)

Note

Full source located at wbe_mandelbrot

Output

Generated mandelbrot fractal

Code

  1import logging
  2import math
  3import os
  4import sys
  5top_dir = os.path.abspath(os.path.join(os.path.dirname(__file__),
  6                                       os.pardir,
  7                                       os.pardir))
  8sys.path.insert(0, top_dir)
  9
 10from taskflow import engines
 11from taskflow.engines.worker_based import worker
 12from taskflow.patterns import unordered_flow as uf
 13from taskflow import task
 14from taskflow.utils import threading_utils
 15
 16# INTRO: This example walks through a workflow that will in parallel compute
 17# a mandelbrot result set (using X 'remote' workers) and then combine their
 18# results together to form a final mandelbrot fractal image. It shows one use
 19# of taskflow to solve a well-known embarrassingly parallel problem that has
 20# the added benefit of also being an elegant visualization.
 21#
 22# NOTE(harlowja): this example simulates the expected larger number of workers
 23# by using a set of threads (which in this example simulate the remote workers
 24# that would typically be running on other external machines).
 25#
 26# NOTE(harlowja): to have it produce an image run (after installing pillow):
 27#
 28# $ python taskflow/examples/wbe_mandelbrot.py output.png
 29
 30BASE_SHARED_CONF = {
 31    'exchange': 'taskflow',
 32}
 33WORKERS = 2
 34WORKER_CONF = {
 35    # These are the tasks the worker can execute; they *must* be importable.
 36    # Typically this list is used to restrict what workers may execute to
 37    # a smaller set of *allowed* tasks that are known to be safe (one would
 38    # not want to allow all python code to be executed).
 39    'tasks': [
 40        '%s:MandelCalculator' % (__name__),
 41    ],
 42}
 43ENGINE_CONF = {
 44    'engine': 'worker-based',
 45}
 46
 47# Mandelbrot & image settings...
 48IMAGE_SIZE = (512, 512)
 49CHUNK_COUNT = 8
 50MAX_ITERATIONS = 25
 51
 52
 53class MandelCalculator(task.Task):
 54    def execute(self, image_config, mandelbrot_config, chunk):
 55        """Returns the number of iterations before the computation "escapes".
 56
 57        Given the real and imaginary parts of a complex number, determine if it
 58        is a candidate for membership in the mandelbrot set given a fixed
 59        number of iterations.
 60        """
 61
 62        # Parts borrowed from (credit to mark harris and benoît mandelbrot).
 63        #
 64        # http://nbviewer.ipython.org/gist/harrism/f5707335f40af9463c43
 65        def mandelbrot(x, y, max_iters):
 66            c = complex(x, y)
 67            z = 0.0j
 68            for i in range(max_iters):
 69                z = z * z + c
 70                if (z.real * z.real + z.imag * z.imag) >= 4:
 71                    return i
 72            return max_iters
 73
 74        min_x, max_x, min_y, max_y, max_iters = mandelbrot_config
 75        height, width = image_config['size']
 76        pixel_size_x = (max_x - min_x) / width
 77        pixel_size_y = (max_y - min_y) / height
 78        block = []
 79        for y in range(chunk[0], chunk[1]):
 80            row = []
 81            imag = min_y + y * pixel_size_y
 82            for x in range(0, width):
 83                real = min_x + x * pixel_size_x
 84                row.append(mandelbrot(real, imag, max_iters))
 85            block.append(row)
 86        return block
 87
 88
 89def calculate(engine_conf):
 90    # Subdivide the work into X pieces, then request each worker to calculate
 91    # one of those chunks and then later we will write these chunks out to
 92    # an image bitmap file.
 93
 94    # An unordered flow is used here since the mandelbrot calculation is an
 95    # example of an embarrassingly parallel computation that we can scatter
 96    # across as many workers as possible.
 97    flow = uf.Flow("mandelbrot")
 98
 99    # These symbols will be automatically given to tasks as input to their
100    # execute method, in this case these are constants used in the mandelbrot
101    # calculation.
102    store = {
103        'mandelbrot_config': [-2.0, 1.0, -1.0, 1.0, MAX_ITERATIONS],
104        'image_config': {
105            'size': IMAGE_SIZE,
106        }
107    }
108
109    # We need the task names to be in the right order so that we can extract
110    # the final results in the right order (we don't care about the order when
111    # executing).
112    task_names = []
113
114    # Compose our workflow.
115    height, _width = IMAGE_SIZE
116    chunk_size = int(math.ceil(height / float(CHUNK_COUNT)))
117    for i in range(0, CHUNK_COUNT):
118        chunk_name = 'chunk_%s' % i
119        task_name = "calculation_%s" % i
120        # Break the calculation up into chunk size pieces.
121        rows = [i * chunk_size, i * chunk_size + chunk_size]
122        flow.add(
123            MandelCalculator(task_name,
124                             # This ensures the storage symbol with name
125                             # 'chunk_name' is sent into the task's local
126                             # symbol 'chunk'. This is how we give each
127                             # calculator its own correct sequence of rows
128                             # to work on.
129                             rebind={'chunk': chunk_name}))
130        store[chunk_name] = rows
131        task_names.append(task_name)
132
133    # Now execute it.
134    eng = engines.load(flow, store=store, engine_conf=engine_conf)
135    eng.run()
136
137    # Gather all the results and order them for further processing.
138    gather = []
139    for name in task_names:
140        gather.extend(eng.storage.get(name))
141    points = []
142    for y, row in enumerate(gather):
143        for x, color in enumerate(row):
144            points.append(((x, y), color))
145    return points
146
147
148def write_image(results, output_filename=None):
149    print("Gathered %s results that represents a mandelbrot"
150          " image (using %s chunks that are computed jointly"
151          " by %s workers)." % (len(results), CHUNK_COUNT, WORKERS))
152    if not output_filename:
153        return
154
155    # Pillow (the PIL fork) saves us from writing our own image writer...
156    try:
157        from PIL import Image
158    except ImportError as e:
159        # To currently get this (may change in the future),
160        # $ pip install Pillow
161        raise RuntimeError("Pillow is required to write image files: %s" % e)
162
163    # Limit to 255, find the max and normalize to that...
164    color_max = 0
165    for _point, color in results:
166        color_max = max(color, color_max)
167
168    # Use gray scale since we don't really have other colors.
169    img = Image.new('L', IMAGE_SIZE, "black")
170    pixels = img.load()
171    for (x, y), color in results:
172        if color_max == 0:
173            color = 0
174        else:
175            color = int((float(color) / color_max) * 255.0)
176        pixels[x, y] = color
177    img.save(output_filename)
178
179
180def create_fractal():
181    logging.basicConfig(level=logging.ERROR)
182
183    # Setup our transport configuration and merge it into the worker and
184    # engine configuration so that both of those use it correctly.
185    shared_conf = dict(BASE_SHARED_CONF)
186    shared_conf.update({
187        'transport': 'memory',
188        'transport_options': {
189            'polling_interval': 0.1,
190        },
191    })
192
193    if len(sys.argv) >= 2:
194        output_filename = sys.argv[1]
195    else:
196        output_filename = None
197
198    worker_conf = dict(WORKER_CONF)
199    worker_conf.update(shared_conf)
200    engine_conf = dict(ENGINE_CONF)
201    engine_conf.update(shared_conf)
202    workers = []
203    worker_topics = []
204
205    print('Calculating your mandelbrot fractal of size %sx%s.' % IMAGE_SIZE)
206    try:
207        # Create a set of workers to simulate actual remote workers.
208        print('Running %s workers.' % (WORKERS))
209        for i in range(0, WORKERS):
210            worker_conf['topic'] = 'calculator_%s' % (i + 1)
211            worker_topics.append(worker_conf['topic'])
212            w = worker.Worker(**worker_conf)
213            runner = threading_utils.daemon_thread(w.run)
214            runner.start()
215            w.wait()
216            workers.append((runner, w.stop))
217
218        # Now use those workers to do something.
219        engine_conf['topics'] = worker_topics
220        results = calculate(engine_conf)
221        print('Execution finished.')
222    finally:
223        # And cleanup.
224        print('Stopping workers.')
225        while workers:
226            r, stopper = workers.pop()
227            stopper()
228            r.join()
229    print("Writing image...")
230    write_image(results, output_filename=output_filename)
231
232
233if __name__ == "__main__":
234    create_fractal()

Jobboard producer/consumer (simple)

Note

Full source located at jobboard_produce_consume_colors

  1import collections
  2import contextlib
  3import logging
  4import os
  5import random
  6import sys
  7import threading
  8import time
  9logging.basicConfig(level=logging.ERROR)
 10
 11top_dir = os.path.abspath(os.path.join(os.path.dirname(__file__),
 12                                       os.pardir,
 13                                       os.pardir))
 14sys.path.insert(0, top_dir)
 15
 16from zake import fake_client
 17
 18from taskflow import exceptions as excp
 19from taskflow.jobs import backends
 20from taskflow.utils import threading_utils
 21
 22# In this example we show how a jobboard can be used to post work for other
 23# entities to work on. This example creates a set of jobs using a few
 24# producer threads (typically these would be spread across many machines)
 25# and then has other worker threads, each with its own jobboard, select work
 26# using a given filter [red/blue] and then perform that work (consuming or
 27# abandoning the job after it has completed or failed).
 28
 29# Things to note:
 30# - No persistence layer (or logbook) is used; just the job details are used
 31#   to determine if a job should be selected by a worker or not.
 32# - This example runs in a single process (this is expected to be atypical
 33#   but this example shows that it can be done if needed, for testing...)
 34# - The iterjobs(), claim(), consume()/abandon() worker workflow.
 35# - The post() producer workflow.
 36
 37SHARED_CONF = {
 38    'path': "/taskflow/jobs",
 39    'board': 'zookeeper',
 40}
 41
 42# How many workers and producers of work will be created (as threads).
 43PRODUCERS = 3
 44WORKERS = 5
 45
 46# How many units of work each producer will create.
 47PRODUCER_UNITS = 10
 48
 49# How many units of work are expected to be produced (used so workers can
 50# know when to stop running and shutdown; typically this would not be a
 51# fixed value but we have to limit this example's execution time to be less
 52# than infinity).
 53EXPECTED_UNITS = PRODUCER_UNITS * PRODUCERS
 54
 55# Delay between producing/consuming more work.
 56WORKER_DELAY, PRODUCER_DELAY = (0.5, 0.5)
 57
 58# To ensure threads don't trample other threads output.
 59STDOUT_LOCK = threading.Lock()
 60
 61
 62def dispatch_work(job):
 63    # This is where the job's contained work *would* be done
 64    time.sleep(1.0)
 65
 66
 67def safe_print(name, message, prefix=""):
 68    with STDOUT_LOCK:
 69        if prefix:
 70            print("{} {}: {}".format(prefix, name, message))
 71        else:
 72            print("{}: {}".format(name, message))
 73
 74
 75def worker(ident, client, consumed):
 76    # Create a personal board (using the same client so that it works in
 77    # the same process) and start looking for jobs on the board that we want
 78    # to perform.
 79    name = "W-%s" % (ident)
 80    safe_print(name, "started")
 81    claimed_jobs = 0
 82    consumed_jobs = 0
 83    abandoned_jobs = 0
 84    with backends.backend(name, SHARED_CONF.copy(), client=client) as board:
 85        while len(consumed) != EXPECTED_UNITS:
 86            favorite_color = random.choice(['blue', 'red'])
 87            for job in board.iterjobs(ensure_fresh=True, only_unclaimed=True):
 88                # See if we should even bother with it...
 89                if job.details.get('color') != favorite_color:
 90                    continue
 91                safe_print(name, "'%s' [attempting claim]" % (job))
 92                try:
 93                    board.claim(job, name)
 94                    claimed_jobs += 1
 95                    safe_print(name, "'%s' [claimed]" % (job))
 96                except (excp.NotFound, excp.UnclaimableJob):
 97                    safe_print(name, "'%s' [claim unsuccessful]" % (job))
 98                else:
 99                    try:
100                        dispatch_work(job)
101                        board.consume(job, name)
102                        safe_print(name, "'%s' [consumed]" % (job))
103                        consumed_jobs += 1
104                        consumed.append(job)
105                    except Exception:
106                        board.abandon(job, name)
107                        abandoned_jobs += 1
108                        safe_print(name, "'%s' [abandoned]" % (job))
109            time.sleep(WORKER_DELAY)
110    safe_print(name,
111               "finished (claimed %s jobs, consumed %s jobs,"
112               " abandoned %s jobs)" % (claimed_jobs, consumed_jobs,
113                                        abandoned_jobs), prefix=">>>")
114
115
116def producer(ident, client):
117    # Create a personal board (using the same client so that it works in
118    # the same process) and start posting jobs on the board that we want
119    # some entity to perform.
120    name = "P-%s" % (ident)
121    safe_print(name, "started")
122    with backends.backend(name, SHARED_CONF.copy(), client=client) as board:
123        for i in range(0, PRODUCER_UNITS):
124            job_name = "{}-{}".format(name, i)
125            details = {
126                'color': random.choice(['red', 'blue']),
127            }
128            job = board.post(job_name, book=None, details=details)
129            safe_print(name, "'%s' [posted]" % (job))
130            time.sleep(PRODUCER_DELAY)
131    safe_print(name, "finished", prefix=">>>")
132
133
134def main():
135    # TODO(harlowja): Hack to make eventlet work right, remove when the
136    # following is fixed: https://github.com/eventlet/eventlet/issues/230
137    from taskflow.utils import eventlet_utils as _eu  # noqa
138    try:
139        import eventlet as _eventlet  # noqa
140    except ImportError:
141        pass
142
143    with contextlib.closing(fake_client.FakeClient()) as c:
144        created = []
145        for i in range(0, PRODUCERS):
146            p = threading_utils.daemon_thread(producer, i + 1, c)
147            created.append(p)
148            p.start()
149        consumed = collections.deque()
150        for i in range(0, WORKERS):
151            w = threading_utils.daemon_thread(worker, i + 1, c, consumed)
152            created.append(w)
153            w.start()
154        while created:
155            t = created.pop()
156            t.join()
157        # At the end there should be nothing leftover, let's verify that.
158        board = backends.fetch('verifier', SHARED_CONF.copy(), client=c)
159        board.connect()
160        with contextlib.closing(board):
161            if board.job_count != 0 or len(consumed) != EXPECTED_UNITS:
162                return 1
163            return 0
164
165
166if __name__ == "__main__":
167    sys.exit(main())

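This example keeps everything in-process by handing each board the same zake FakeClient. Pointing the same producer/worker code at a real ZooKeeper ensemble is mostly a configuration change: drop the client argument and put the server addresses into the board configuration. A hedged sketch (the host/port value is a placeholder for your actual ensemble):

import contextlib

from taskflow.jobs import backends

ZK_CONF = {
    'board': 'zookeeper',
    'path': '/taskflow/jobs',
    # Placeholder address; point this at a real ZooKeeper ensemble.
    'hosts': ['127.0.0.1:2181'],
}

with contextlib.closing(backends.fetch('poster', ZK_CONF)) as board:
    board.connect()
    board.post('example-job', book=None, details={'color': 'red'})
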
Conductor simulating a CI pipeline

Note

Full source located at tox_conductor

  1import itertools
  2import logging
  3import os
  4import shutil
  5import socket
  6import sys
  7import tempfile
  8import threading
  9import time
 10
 11logging.basicConfig(level=logging.ERROR)
 12
 13top_dir = os.path.abspath(os.path.join(os.path.dirname(__file__),
 14                                       os.pardir,
 15                                       os.pardir))
 16sys.path.insert(0, top_dir)
 17
 18from oslo_utils import timeutils
 19from oslo_utils import uuidutils
 20from zake import fake_client
 21
 22from taskflow.conductors import backends as conductors
 23from taskflow import engines
 24from taskflow.jobs import backends as boards
 25from taskflow.patterns import linear_flow
 26from taskflow.persistence import backends as persistence
 27from taskflow.persistence import models
 28from taskflow import task
 29from taskflow.utils import threading_utils
 30
 31# INTRO: This example shows how a worker/producer can post desired work (jobs)
 32# to a jobboard and a conductor can consume that work (jobs) from that
 33# jobboard and execute those jobs in a reliable & async manner (for example,
 34# if the conductor were to crash then the job will be released back onto the
 35# jobboard and another conductor can attempt to finish it, from wherever that
 36# job last left off).
 37#
 38# In this example an in-memory jobboard (and in-memory storage) is created
 39# and used that simulates how this would be done at a larger scale (it is an
 40# example after all).
 41
 42# Restrict how long this example runs for...
 43RUN_TIME = 5
 44REVIEW_CREATION_DELAY = 0.5
 45SCAN_DELAY = 0.1
 46NAME = "{}_{}".format(socket.getfqdn(), os.getpid())
 47
 48# This won't really use zookeeper; instead it will use a local stand-in
 49# provided by the zake library, which mimics an actual zookeeper cluster
 50# with threads and an in-memory data structure.
 51JOBBOARD_CONF = {
 52    'board': 'zookeeper://localhost?path=/taskflow/tox/jobs',
 53}
 54
 55
 56class RunReview(task.Task):
 57    # A dummy task that clones the review and runs tox...
 58
 59    def _clone_review(self, review, temp_dir):
 60        print("Cloning review '{}' into {}".format(review['id'], temp_dir))
 61
 62    def _run_tox(self, temp_dir):
 63        print("Running tox in %s" % temp_dir)
 64
 65    def execute(self, review, temp_dir):
 66        self._clone_review(review, temp_dir)
 67        self._run_tox(temp_dir)
 68
 69
 70class MakeTempDir(task.Task):
 71    # A task that creates a temporary dir (and removes it on failure).
 72    #
 73    # It provides the location of the temporary dir for other tasks to use
 74    # as they see fit.
 75
 76    default_provides = 'temp_dir'
 77
 78    def execute(self):
 79        return tempfile.mkdtemp()
 80
 81    def revert(self, *args, **kwargs):
 82        temp_dir = kwargs.get(task.REVERT_RESULT)
 83        if temp_dir:
 84            shutil.rmtree(temp_dir)
 85
 86
 87class CleanResources(task.Task):
 88    # A task that cleans up any workflow resources.
 89
 90    def execute(self, temp_dir):
 91        print("Removing %s" % temp_dir)
 92        shutil.rmtree(temp_dir)
 93
 94
 95def review_iter():
 96    """Makes reviews (never-ending iterator/generator)."""
 97    review_id_gen = itertools.count(0)
 98    while True:
 99        review_id = next(review_id_gen)
100        review = {
101            'id': review_id,
102        }
103        yield review
104
105
106# It is important that this factory lives at the module namespace level, since
107# it must be importable by a conductor dispatching an engine; if it were a
108# lambda function, for example, it would not be re-importable and the conductor
109# would be unable to reference it when creating the workflow to run.
110def create_review_workflow():
111    """Factory method used to create a review workflow to run."""
112    f = linear_flow.Flow("tester")
113    f.add(
114        MakeTempDir(name="maker"),
115        RunReview(name="runner"),
116        CleanResources(name="cleaner")
117    )
118    return f
119
120
121def generate_reviewer(client, saver, name=NAME):
122    """Creates a review producer thread with the given name prefix."""
123    real_name = "%s_reviewer" % name
124    no_more = threading.Event()
125    jb = boards.fetch(real_name, JOBBOARD_CONF,
126                      client=client, persistence=saver)
127
128    def make_save_book(saver, review_id):
129        # Record what we want to happen (sometime in the future).
130        book = models.LogBook("book_%s" % review_id)
131        detail = models.FlowDetail("flow_%s" % review_id,
132                                   uuidutils.generate_uuid())
133        book.add(detail)
134        # Associate the factory method we want to be called (in the future)
135        # with the book, so that the conductor will be able to call into
136        # that factory to retrieve the workflow objects that represent the
137        # work.
138        #
139        # These args and kwargs *can* be used to pass any specific parameters
140        # to the factory when it is called to create the workflow
141        # objects (typically used to tell a factory how to create a unique
142        # workflow that represents this review).
143        factory_args = ()
144        factory_kwargs = {}
145        engines.save_factory_details(detail, create_review_workflow,
146                                     factory_args, factory_kwargs)
147        with contextlib.closing(saver.get_connection()) as conn:
148            conn.save_logbook(book)
149            return book
150
151    def run():
152        """Periodically publishes 'fake' reviews to analyze."""
153        jb.connect()
154        review_generator = review_iter()
155        with contextlib.closing(jb):
156            while not no_more.is_set():
157                review = next(review_generator)
158                details = {
159                    'store': {
160                        'review': review,
161                    },
162                }
163                job_name = "{}_{}".format(real_name, review['id'])
164                print("Posting review '%s'" % review['id'])
165                jb.post(job_name,
166                        book=make_save_book(saver, review['id']),
167                        details=details)
168                time.sleep(REVIEW_CREATION_DELAY)
169
170    # Return the unstarted thread, and a callback that can be used to
171    # shut down that thread (to avoid running forever).
172    return (threading_utils.daemon_thread(target=run), no_more.set)
173
174
175def generate_conductor(client, saver, name=NAME):
176    """Creates a conductor thread with the given name prefix."""
177    real_name = "%s_conductor" % name
178    jb = boards.fetch(name, JOBBOARD_CONF,
179                      client=client, persistence=saver)
180    conductor = conductors.fetch("blocking", real_name, jb,
181                                 engine='parallel', wait_timeout=SCAN_DELAY)
182
183    def run():
184        jb.connect()
185        with contextlib.closing(jb):
186            conductor.run()
187
188    # Return the unstarted thread, and a callback that can be used to
189    # shut down that thread (to avoid running forever).
190    return (threading_utils.daemon_thread(target=run), conductor.stop)
191
192
193def main():
194    # Need to share the same backend, so that data can be shared...
195    persistence_conf = {
196        'connection': 'memory',
197    }
198    saver = persistence.fetch(persistence_conf)
199    with contextlib.closing(saver.get_connection()) as conn:
200        # This ensures that the needed backend setup/data directories/schema
201        # upgrades and so on... exist before anything tries to use them...
202        conn.upgrade()
203    fc1 = fake_client.FakeClient()
204    # Done like this to share the same client storage location so the correct
205    # zookeeper features work across clients...
206    fc2 = fake_client.FakeClient(storage=fc1.storage)
207    entities = [
208        generate_reviewer(fc1, saver),
209        generate_conductor(fc2, saver),
210    ]
211    for t, stopper in entities:
212        t.start()
213    try:
214        watch = timeutils.StopWatch(duration=RUN_TIME)
215        watch.start()
216        while not watch.expired():
217            time.sleep(0.1)
218    finally:
219        for t, stopper in reversed(entities):
220            stopper()
221            t.join()
222
223
224if __name__ == '__main__':
225    main()
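
Because the workflow factory is just an importable function that returns a
flow, it can also be exercised directly, without any jobboard or conductor,
which is handy when iterating on the tasks themselves. A minimal sketch
(assuming the example above is saved and importable as a module named
tox_conductor; adjust the import to match how the file is actually named):

    from taskflow import engines

    # Hypothetical import of the factory defined in the example above.
    from tox_conductor import create_review_workflow

    # 'temp_dir' is provided by the MakeTempDir task, so only 'review'
    # needs to be injected via the engine's storage.
    engines.run(create_review_workflow(), store={'review': {'id': 0}})

This mirrors what the conductor does when it claims a job: it rebuilds the
flow from the saved factory details and runs it with the inputs stored in the
job's details.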

Conductor running 99 bottles of beer song requests

Note

Full source located at 99_bottles.

  1import contextlib
  2import functools
  3import logging
  4import os
  5import sys
  6import time
  7import traceback
  8from kazoo import client
  9
 10top_dir = os.path.abspath(os.path.join(os.path.dirname(__file__),
 11                                       os.pardir,
 12                                       os.pardir))
 13sys.path.insert(0, top_dir)
 14
 15from taskflow.conductors import backends as conductor_backends
 16from taskflow import engines
 17from taskflow.jobs import backends as job_backends
 18from taskflow import logging as taskflow_logging
 19from taskflow.patterns import linear_flow as lf
 20from taskflow.persistence import backends as persistence_backends
 21from taskflow.persistence import models
 22from taskflow import task
 23
 24from oslo_utils import timeutils
 25from oslo_utils import uuidutils
 26
 27# Instructions!
 28#
 29# 1. Install zookeeper (or change host listed below)
 30# 2. Download this example, place in file '99_bottles.py'
 31# 3. Run `python 99_bottles.py p` to place a song request onto the jobboard
 32# 4. Run `python 99_bottles.py c` a few times (in different shells)
 33# 5. At any point kill one of the processes created in (4) and watch
 34#    the work resume on another process (and repeat)
 35# 6. Keep enough workers alive to eventually finish the song (if desired).
 36
 37ME = os.getpid()
 38ZK_HOST = "localhost:2181"
 39JB_CONF = {
 40    'hosts': ZK_HOST,
 41    'board': 'zookeeper',
 42    'path': '/taskflow/99-bottles-demo',
 43}
 44PERSISTENCE_URI = r"sqlite:////tmp/bottles.db"
 45TAKE_DOWN_DELAY = 1.0
 46PASS_AROUND_DELAY = 3.0
 47HOW_MANY_BOTTLES = 99
 48
 49
 50class TakeABottleDown(task.Task):
 51    def execute(self, bottles_left):
 52        sys.stdout.write('Take one down, ')
 53        sys.stdout.flush()
 54        time.sleep(TAKE_DOWN_DELAY)
 55        return bottles_left - 1
 56
 57
 58class PassItAround(task.Task):
 59    def execute(self):
 60        sys.stdout.write('pass it around, ')
 61        sys.stdout.flush()
 62        time.sleep(PASS_AROUND_DELAY)
 63
 64
 65class Conclusion(task.Task):
 66    def execute(self, bottles_left):
 67        sys.stdout.write('%s bottles of beer on the wall...\n' % bottles_left)
 68        sys.stdout.flush()
 69
 70
 71def make_bottles(count):
 72    # This is the function that will be called to generate the workflow
 73    # and will also be called to regenerate it on resumption so that work
 74    # can continue from where it last left off...
 75
 76    s = lf.Flow("bottle-song")
 77
 78    take_bottle = TakeABottleDown("take-bottle-%s" % count,
 79                                  inject={'bottles_left': count},
 80                                  provides='bottles_left')
 81    pass_it = PassItAround("pass-%s-around" % count)
 82    next_bottles = Conclusion("next-bottles-%s" % (count - 1))
 83    s.add(take_bottle, pass_it, next_bottles)
 84
 85    for bottle in reversed(list(range(1, count))):
 86        take_bottle = TakeABottleDown("take-bottle-%s" % bottle,
 87                                      provides='bottles_left')
 88        pass_it = PassItAround("pass-%s-around" % bottle)
 89        next_bottles = Conclusion("next-bottles-%s" % (bottle - 1))
 90        s.add(take_bottle, pass_it, next_bottles)
 91
 92    return s
 93
 94
 95def run_conductor(only_run_once=False):
 96    # This continuously runs consumers until it's stopped via ctrl-c or
 97    # another kill signal...
 98    event_watches = {}
 99
100    # This will be triggered by the conductor doing various activities
101    # with engines, and makes it possible to see the various timing
102    # segments (which is useful for debugging, monitoring, or figuring
103    # out where to optimize).
104    def on_conductor_event(cond, event, details):
105        print("Event '%s' has been received..." % event)
106        print("Details = %s" % details)
107        if event.endswith("_start"):
108            w = timeutils.StopWatch()
109            w.start()
110            base_event = event[0:-len("_start")]
111            event_watches[base_event] = w
112        if event.endswith("_end"):
113            base_event = event[0:-len("_end")]
114            try:
115                w = event_watches.pop(base_event)
116                w.stop()
117                print("It took %0.3f seconds for event '%s' to finish"
118                      % (w.elapsed(), base_event))
119            except KeyError:
120                pass
121        if event == 'running_end' and only_run_once:
122            cond.stop()
123
124    print("Starting conductor with pid: %s" % ME)
125    my_name = "conductor-%s" % ME
126    persist_backend = persistence_backends.fetch(PERSISTENCE_URI)
127    with contextlib.closing(persist_backend):
128        with contextlib.closing(persist_backend.get_connection()) as conn:
129            conn.upgrade()
130        job_backend = job_backends.fetch(my_name, JB_CONF,
131                                         persistence=persist_backend)
132        job_backend.connect()
133        with contextlib.closing(job_backend):
134            cond = conductor_backends.fetch('blocking', my_name, job_backend,
135                                            persistence=persist_backend)
136            on_conductor_event = functools.partial(on_conductor_event, cond)
137            cond.notifier.register(cond.notifier.ANY, on_conductor_event)
138            # Run forever, and kill -9 or ctrl-c me...
139            try:
140                cond.run()
141            finally:
142                cond.stop()
143                cond.wait()
144
145
146def run_poster():
147    # This just posts a single job and then ends...
148    print("Starting poster with pid: %s" % ME)
149    my_name = "poster-%s" % ME
150    persist_backend = persistence_backends.fetch(PERSISTENCE_URI)
151    with contextlib.closing(persist_backend):
152        with contextlib.closing(persist_backend.get_connection()) as conn:
153            conn.upgrade()
154        job_backend = job_backends.fetch(my_name, JB_CONF,
155                                         persistence=persist_backend)
156        job_backend.connect()
157        with contextlib.closing(job_backend):
158            # Create information in the persistence backend about the
159            # unit of work we want to complete and about the factory that
160            # can be called to create the tasks needed to complete that
161            # unit of work.
162            lb = models.LogBook("post-from-%s" % my_name)
163            fd = models.FlowDetail("song-from-%s" % my_name,
164                                   uuidutils.generate_uuid())
165            lb.add(fd)
166            with contextlib.closing(persist_backend.get_connection()) as conn:
167                conn.save_logbook(lb)
168            engines.save_factory_details(fd, make_bottles,
169                                         [HOW_MANY_BOTTLES], {},
170                                         backend=persist_backend)
171            # Post, and be done with it!
172            jb = job_backend.post("song-from-%s" % my_name, book=lb)
173            print("Posted: %s" % jb)
174            print("Goodbye...")
175
176
177def main_local():
178    # Run locally; typically this is activated during unit testing when all
179    # the examples are checked to make sure they still function correctly...
180    global TAKE_DOWN_DELAY
181    global PASS_AROUND_DELAY
182    global JB_CONF
183    # Make everything go much faster (so that this finishes quickly).
184    PASS_AROUND_DELAY = 0.01
185    TAKE_DOWN_DELAY = 0.01
186    JB_CONF['path'] = JB_CONF['path'] + "-" + uuidutils.generate_uuid()
187    run_poster()
188    run_conductor(only_run_once=True)
189
190
191def check_for_zookeeper(timeout=1):
192    sys.stderr.write("Testing for the existence of a zookeeper server...\n")
193    sys.stderr.write("Please wait....\n")
194    with contextlib.closing(client.KazooClient()) as test_client:
195        try:
196            test_client.start(timeout=timeout)
197        except test_client.handler.timeout_exception:
198            sys.stderr.write("Zookeeper is needed for running this example!\n")
199            traceback.print_exc()
200            return False
201        else:
202            test_client.stop()
203            return True
204
205
206def main():
207    if not check_for_zookeeper():
208        return
209    if len(sys.argv) == 1:
210        main_local()
211    elif sys.argv[1] in ('p', 'c'):
212        if sys.argv[-1] == "v":
213            logging.basicConfig(level=taskflow_logging.TRACE)
214        else:
215            logging.basicConfig(level=logging.ERROR)
216        if sys.argv[1] == 'p':
217            run_poster()
218        else:
219            run_conductor()
220    else:
221        sys.stderr.write("%s p|c (v?)\n" % os.path.basename(sys.argv[0]))
222
223
224if __name__ == '__main__':
225    main()
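
The song flow itself does not depend on zookeeper, the conductor or the
sqlite persistence backend; only the resume-after-crash behavior does. To
just run the song in-process, the factory can be handed straight to a local
engine. A minimal sketch (it assumes the task classes and make_bottles() from
the example above are available, e.g. pasted into or imported by the current
module; the bottle count is reduced so it finishes quickly):

    from taskflow import engines

    # Run the flow with the default in-process engine (no jobboard involved).
    engines.run(make_bottles(3))

With the delays defined above this takes roughly TAKE_DOWN_DELAY +
PASS_AROUND_DELAY seconds per bottle, which is why main_local() shrinks both
delays before running the poster and a run-once conductor in a single
process.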