Examples¶
While developing TaskFlow, the team has worked hard to make sure the various concepts are explained by relevant examples. Here are a few selected examples to get you started (ordered by perceived complexity):
To explore more of these examples, please check out the examples directory in the TaskFlow source tree.
Note
If the examples provided are not satisfactory (or not up to your standards), contributions to improve them are welcome and very much appreciated. The higher the quality and the clearer the examples are, the better and more useful they are for everyone.
Hello world¶
Note
Full source located at hello_world.
1import logging
2import os
3import sys
4
5logging.basicConfig(level=logging.ERROR)
5
6top_dir = os.path.abspath(os.path.join(os.path.dirname(__file__),
7 os.pardir,
8 os.pardir))
9sys.path.insert(0, top_dir)
10
11from taskflow import engines
12from taskflow.patterns import linear_flow as lf
13from taskflow.patterns import unordered_flow as uf
14from taskflow import task
15
16
17# INTRO: This is the de facto hello world equivalent for taskflow; it shows
18# how an intentionally simplistic workflow can be created and then run using
19# different engines with different styles of execution (all of which can run
20# in parallel if the workflow provided is parallelizable).
21
22class PrinterTask(task.Task):
23 def __init__(self, name, show_name=True, inject=None):
24 super().__init__(name, inject=inject)
25 self._show_name = show_name
26
27 def execute(self, output):
28 if self._show_name:
29 print("{}: {}".format(self.name, output))
30 else:
31 print(output)
32
33
34# This will be the work that we want done, which for this example is just to
35# print 'hello world' (like a song) using different tasks and different
36# execution models.
37song = lf.Flow("beats")
38
39# Unordered flows, when run, can be run in parallel; and a chorus is everyone
40# singing at once, of course!
41hi_chorus = uf.Flow('hello')
42world_chorus = uf.Flow('world')
43for (name, hello, world) in [('bob', 'hello', 'world'),
44 ('joe', 'hellooo', 'worllllld'),
45 ('sue', "helloooooo!", 'wooorllld!')]:
46 hi_chorus.add(PrinterTask("%s@hello" % name,
47 # This will show up to the execute() method of
48 # the task as the argument named 'output' (which
49 # will allow us to print the character we want).
50 inject={'output': hello}))
51 world_chorus.add(PrinterTask("%s@world" % name,
52 inject={'output': world}))
53
54# The composition starts with the conductor and then runs in sequence with
55# the chorus running in parallel, but no matter what the 'hello' chorus must
56# always run before the 'world' chorus (otherwise the world will fall apart).
57song.add(PrinterTask("conductor@begin",
58 show_name=False, inject={'output': "*ding*"}),
59 hi_chorus,
60 world_chorus,
61 PrinterTask("conductor@end",
62 show_name=False, inject={'output': "*dong*"}))
63
64# Run in parallel using eventlet green threads...
65try:
66 import eventlet as _eventlet # noqa
67except ImportError:
68 # No eventlet currently active, skip running with it...
69 pass
70else:
71 print("-- Running in parallel using eventlet --")
72 e = engines.load(song, executor='greenthreaded', engine='parallel',
73 max_workers=1)
74 e.run()
75
76
77# Run in parallel using real threads...
78print("-- Running in parallel using threads --")
79e = engines.load(song, executor='threaded', engine='parallel',
80 max_workers=1)
81e.run()
82
83
84# Run in parallel using external processes...
85print("-- Running in parallel using processes --")
86e = engines.load(song, executor='processes', engine='parallel',
87 max_workers=1)
88e.run()
89
90
91# Run serially (aka, even if the workflow could have been run in parallel, it
92# will not be when run in this mode)...
93print("-- Running serially --")
94e = engines.load(song, engine='serial')
95e.run()
96print("-- Statistics gathered --")
97print(e.statistics)
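Stripped of the song-building above, the core pattern this example exercises is quite small. Here is a minimal sketch (assuming only taskflow is installed; the engine and executor names are the same ones used in the listing, and the Hello task is made up for illustration):

from taskflow import engines
from taskflow.patterns import linear_flow as lf
from taskflow import task


class Hello(task.Task):
    def execute(self):
        print("hello world")


flow = lf.Flow("hi").add(Hello("hi-task"))

# The same flow object can be handed to different engine configurations.
engines.load(flow, engine='serial').run()
engines.load(flow, engine='parallel', executor='threaded', max_workers=2).run()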
Passing values from and to tasks¶
Note
Full source located at simple_linear_pass.
1import logging
2import os
3import sys
4
5logging.basicConfig(level=logging.ERROR)
5
6self_dir = os.path.abspath(os.path.dirname(__file__))
7top_dir = os.path.abspath(os.path.join(os.path.dirname(__file__),
8 os.pardir,
9 os.pardir))
10sys.path.insert(0, top_dir)
11sys.path.insert(0, self_dir)
12
13from taskflow import engines
14from taskflow.patterns import linear_flow
15from taskflow import task
16
17# INTRO: This example shows how a task (in a linear/serial workflow) can
18# produce an output that can then be consumed/used by a downstream task.
19
20
21class TaskA(task.Task):
22 default_provides = 'a'
23
24 def execute(self):
25 print("Executing '%s'" % (self.name))
26 return 'a'
27
28
29class TaskB(task.Task):
30 def execute(self, a):
31 print("Executing '%s'" % (self.name))
32 print("Got input '%s'" % (a))
33
34
35print("Constructing...")
36wf = linear_flow.Flow("pass-from-to")
37wf.add(TaskA('a'), TaskB('b'))
38
39print("Loading...")
40e = engines.load(wf)
41
42print("Compiling...")
43e.compile()
44
45print("Preparing...")
46e.prepare()
47
48print("Running...")
49e.run()
50
51print("Done...")
Using listeners¶
Note
Full source located at echo_listener.
1import logging
2import os
3import sys
4
5logging.basicConfig(level=logging.DEBUG)
5
6top_dir = os.path.abspath(os.path.join(os.path.dirname(__file__),
7 os.pardir,
8 os.pardir))
9sys.path.insert(0, top_dir)
10
11from taskflow import engines
12from taskflow.listeners import logging as logging_listener
13from taskflow.patterns import linear_flow as lf
14from taskflow import task
15
16# INTRO: This example walks through a miniature workflow which will do a
17# simple echo operation; during this execution a listener is associated with
18# the engine to receive all notifications about what the flow has performed;
19# this example dumps that output to stdout for viewing (at debug level
20# to show all the information that is available).
21
22
23class Echo(task.Task):
24 def execute(self):
25 print(self.name)
26
27
28# Generate the work to be done (but don't do it yet).
29wf = lf.Flow('abc')
30wf.add(Echo('a'))
31wf.add(Echo('b'))
32wf.add(Echo('c'))
33
34# This will associate the listener with the engine (the listener
35# will automatically register for notifications with the engine and deregister
36# when the context is exited).
37e = engines.load(wf)
38with logging_listener.DynamicLoggingListener(e):
39 e.run()
Using listeners (to watch a phone call)¶
Note
Full source located at simple_linear_listening.
1import logging
2import os
3import sys
4
5logging.basicConfig(level=logging.ERROR)
5
6top_dir = os.path.abspath(os.path.join(os.path.dirname(__file__),
7 os.pardir,
8 os.pardir))
9sys.path.insert(0, top_dir)
10
11import taskflow.engines
12from taskflow.patterns import linear_flow as lf
13from taskflow import task
14from taskflow.types import notifier
15
16ANY = notifier.Notifier.ANY
17
18# INTRO: In this example we create two tasks (this time as functions instead
19# of task subclasses as in the simple_linear.py example), each of which ~calls~
20# a given ~phone~ number (provided as a function input) in a linear fashion
21# (one after the other).
22#
23# For a workflow which is serial this shows an extremely simple way
24# of structuring your tasks (the code that does the work) into a linear
25# sequence (the flow) and then passing the work off to an engine, with some
26# initial data, to be run in a reliable manner.
27#
28# This example shows a basic usage of the taskflow structures without involving
29# the complexity of persistence. Using the structures that taskflow provides
30# via tasks and flows makes it possible for you to easily hook in a
31# persistence layer at a later time (and then gain the functionality it offers)
32# when you decide the complexity of adding that layer is 'worth it' for your
33# application's usage pattern (which some applications may not need).
34#
35# It **also** builds on the simple_linear.py example by adding a set of
36# callback functions which the engine will call when a flow state transition
37# or task state transition occurs. These types of functions are useful for
38# updating task or flow progress, or for debugging, sending notifications to
39# external systems, or for other yet unknown future usage that you may create!
40
41
42def call_jim(context):
43 print("Calling jim.")
44 print("Context = %s" % (sorted(context.items(), key=lambda x: x[0])))
45
46
47def call_joe(context):
48 print("Calling joe.")
49 print("Context = %s" % (sorted(context.items(), key=lambda x: x[0])))
50
51
52def flow_watch(state, details):
53 print('Flow => %s' % state)
54
55
56def task_watch(state, details):
57 print('Task {} => {}'.format(details.get('task_name'), state))
58
59
60# Wrap your functions into a task type that knows how to treat your functions
61# as tasks. There was previous work done to just allow a function to be
62# directly passed, but in python 3.0 there is no easy way to capture an
63# instance method, so this wrapping approach was decided upon instead which
64# can attach to instance methods (if that's desired).
65flow = lf.Flow("Call-them")
66flow.add(task.FunctorTask(execute=call_jim))
67flow.add(task.FunctorTask(execute=call_joe))
68
69# Now load (but do not run) the flow using the provided initial data.
70engine = taskflow.engines.load(flow, store={
71 'context': {
72 "joe_number": 444,
73 "jim_number": 555,
74 }
75})
76
77# This is where we attach our callback functions to the 2 different
78# notification objects that an engine exposes. The usage of ANY (kleene star)
79# here means that we want to be notified on all state changes; if you want to
80# restrict to a specific state change, just register for that state instead
81# (see the sketch after this listing).
81engine.notifier.register(ANY, flow_watch)
82engine.atom_notifier.register(ANY, task_watch)
83
84# And now run!
85engine.run()
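As the comment above notes, ANY can be swapped for a specific state if you only care about one kind of transition. A minimal sketch, reusing the engine and callbacks from the listing (the state constants live in taskflow.states; verify the exact constant against your taskflow version):

from taskflow import states

# Only be told when the flow as a whole reaches the SUCCESS state; register
# this before calling engine.run().
engine.notifier.register(states.SUCCESS, flow_watch)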
Dumping an in-memory backend¶
Note
Full source located at dump_memory_backend.
1import logging
2import os
3import sys
4
5logging.basicConfig(level=logging.ERROR)
5
6self_dir = os.path.abspath(os.path.dirname(__file__))
7top_dir = os.path.abspath(os.path.join(os.path.dirname(__file__),
8 os.pardir,
9 os.pardir))
10sys.path.insert(0, top_dir)
11sys.path.insert(0, self_dir)
12
13from taskflow import engines
14from taskflow.patterns import linear_flow as lf
15from taskflow import task
16
17# INTRO: in this example we create a dummy flow with a few dummy tasks, and run
18# it using an in-memory backend; pre- and post-run we dump out the contents
19# of the in-memory backend's tree structure (which can be quite useful to
20# look at for debugging or other analysis).
21
22
23class PrintTask(task.Task):
24 def execute(self):
25 print("Running '%s'" % self.name)
26
27# Make a little flow and run it...
28f = lf.Flow('root')
29for alpha in ['a', 'b', 'c']:
30 f.add(PrintTask(alpha))
31
32e = engines.load(f)
33e.compile()
34e.prepare()
35
36# After prepare the storage layer + backend can now be accessed safely...
37backend = e.storage.backend
38
39print("----------")
40print("Before run")
41print("----------")
42print(backend.memory.pformat())
43print("----------")
44
45e.run()
46
47print("---------")
48print("After run")
49print("---------")
50for path in backend.memory.ls_r(backend.memory.root_path, absolute=True):
51 value = backend.memory[path]
52 if value:
53 print("{} -> {}".format(path, value))
54 else:
55 print("%s" % (path))
Making phone calls¶
Note
Full source located at simple_linear.
1import logging
2import os
3import sys
4
5logging.basicConfig(level=logging.ERROR)
5
6top_dir = os.path.abspath(os.path.join(os.path.dirname(__file__),
7 os.pardir,
8 os.pardir))
9sys.path.insert(0, top_dir)
10
11import taskflow.engines
12from taskflow.patterns import linear_flow as lf
13from taskflow import task
14
15# INTRO: In this example we create two tasks, each of which ~calls~ a given
16# ~phone~ number (provided as a function input) in a linear fashion (one after
17# the other). For a workflow which is serial this shows an extremely simple way
18# of structuring your tasks (the code that does the work) into a linear
19# sequence (the flow) and then passing the work off to an engine, with some
20# initial data, to be run in a reliable manner.
21#
22# NOTE(harlowja): This example shows a basic usage of the taskflow structures
23# without involving the complexity of persistence. Using the structures that
24# taskflow provides via tasks and flows makes it possible for you to easily
25# hook in a persistence layer at a later time (and then gain the functionality
26# that offers) when you decide the complexity of adding that layer in
27# is 'worth it' for your application's usage pattern (which certain
28# applications may not need).
29
30
31class CallJim(task.Task):
32 def execute(self, jim_number, *args, **kwargs):
33 print("Calling jim %s." % jim_number)
34
35
36class CallJoe(task.Task):
37 def execute(self, joe_number, *args, **kwargs):
38 print("Calling joe %s." % joe_number)
39
40
41# Create your flow and associated tasks (the work to be done).
42flow = lf.Flow('simple-linear').add(
43 CallJim(),
44 CallJoe()
45)
46
47# Now run that flow using the provided initial data (store below).
48taskflow.engines.run(flow, store=dict(joe_number=444,
49 jim_number=555))
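The jim_number/joe_number arguments are matched to the store keys purely by name. If your stored data used different key names, the rebind mechanism (used heavily in the linear equation examples below) can do the renaming; a small sketch reusing the CallJim task above, where 'jims_phone' is a hypothetical store key invented for illustration:

flow = lf.Flow('renamed').add(
    CallJim(rebind={'jim_number': 'jims_phone'}))
taskflow.engines.run(flow, store={'jims_phone': 555})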
Making phone calls (automatically reverting)¶
Note
Full source located at reverting_linear.
1import logging
2import os
3import sys
4
5logging.basicConfig(level=logging.ERROR)
5
6top_dir = os.path.abspath(os.path.join(os.path.dirname(__file__),
7 os.pardir,
8 os.pardir))
9sys.path.insert(0, top_dir)
10
11import taskflow.engines
12from taskflow.patterns import linear_flow as lf
13from taskflow import task
14
15# INTRO: In this example we create three tasks, each of which ~calls~ a given
16# ~phone~ number (provided as a function input); one of those tasks *fails* when
17# calling its number (the suzzie call), which causes the workflow to enter the
18# reverting process and activates the revert methods of the previous two
19# phone ~calls~.
20#
21# This simulated calling makes it appear like all three calls occur or all
22# three don't occur (transaction-like capabilities). No persistence layer is
23# used here so reverting and executing will *not* be tolerant of process
24# failure.
25
26
27class CallJim(task.Task):
28 def execute(self, jim_number, *args, **kwargs):
29 print("Calling jim %s." % jim_number)
30
31 def revert(self, jim_number, *args, **kwargs):
32 print("Calling %s and apologizing." % jim_number)
33
34
35class CallJoe(task.Task):
36 def execute(self, joe_number, *args, **kwargs):
37 print("Calling joe %s." % joe_number)
38
39 def revert(self, joe_number, *args, **kwargs):
40 print("Calling %s and apologizing." % joe_number)
41
42
43class CallSuzzie(task.Task):
44 def execute(self, suzzie_number, *args, **kwargs):
45 raise OSError("Suzzie not home right now.")
46
47
48# Create your flow and associated tasks (the work to be done).
49flow = lf.Flow('simple-linear').add(
50 CallJim(),
51 CallJoe(),
52 CallSuzzie()
53)
54
55try:
56 # Now run that flow using the provided initial data (store below).
57 taskflow.engines.run(flow, store=dict(joe_number=444,
58 jim_number=555,
59 suzzie_number=666))
60except Exception as e:
61 # NOTE(harlowja): This exception will be the exception that came out of the
62# 'CallSuzzie' task instead of a different exception; this is useful since
63 # typically surrounding code wants to handle the original exception and not
64 # a wrapped or altered one.
65 #
66 # *WARNING* If this flow was multi-threaded and multiple active tasks threw
67 # exceptions then the above exception would be wrapped into a combined
68 # exception (the object has methods to iterate over the contained
69 # exceptions). See: exceptions.py and the class 'WrappedFailure' to look at
70# how to deal with multiple tasks failing while running (a sketch of that
71# handling follows this listing).
71 #
72 # You will also note that this is not a problem in this case since no
73 # parallelism is involved; this is ensured by the usage of a linear flow
74 # and the default engine type which is 'serial' vs being 'parallel'.
75 print("Flow failed: %s" % e)
Building a car¶
Note
Full source located at build_a_car.
1import logging
2import os
3import sys
4
5
6logging.basicConfig(level=logging.ERROR)
6
7top_dir = os.path.abspath(os.path.join(os.path.dirname(__file__),
8 os.pardir,
9 os.pardir))
10sys.path.insert(0, top_dir)
11
12
13import taskflow.engines
14from taskflow.patterns import graph_flow as gf
15from taskflow.patterns import linear_flow as lf
16from taskflow import task
17from taskflow.types import notifier
18
19ANY = notifier.Notifier.ANY
20
21import example_utils as eu # noqa
22
23
24# INTRO: This example shows how a graph flow and linear flow can be used
25# together to execute dependent & non-dependent tasks by going through the
26# steps required to build a simplistic car (an assembly line if you will). It
27# also shows how raw functions can be wrapped into a task object instead of
28# being forced to use the more *heavy* task base class. This is useful in
29# scenarios where pre-existing code has functions that you want to easily
30# plug in to taskflow, without requiring a large amount of code changes.
31
32
33def build_frame():
34 return 'steel'
35
36
37def build_engine():
38 return 'honda'
39
40
41def build_doors():
42 return '2'
43
44
45def build_wheels():
46 return '4'
47
48
49# These just return true to indicate success; in the real world they would
50# do more than just that.
51
52def install_engine(frame, engine):
53 return True
54
55
56def install_doors(frame, windows_installed, doors):
57 return True
58
59
60def install_windows(frame, doors):
61 return True
62
63
64def install_wheels(frame, engine, engine_installed, wheels):
65 return True
66
67
68def trash(**kwargs):
69 eu.print_wrapped("Throwing away pieces of car!")
70
71
72def startup(**kwargs):
73 # If you want to see the rollback function being activated try uncommenting
74 # the following line.
75 #
76 # raise ValueError("Car not verified")
77 return True
78
79
80def verify(spec, **kwargs):
81 # If the car is not what we ordered throw away the car (trigger reversion).
82 for key, value in kwargs.items():
83 if spec[key] != value:
84 raise Exception("Car doesn't match spec!")
85 return True
86
87
88# These two functions connect into the state transition notification emission
89# points that the engine outputs, they can be used to log state transitions
90# that are occurring, or they can be used to suspend the engine (or perform
91# other useful activities).
92def flow_watch(state, details):
93 print('Flow => %s' % state)
94
95
96def task_watch(state, details):
97 print('Task {} => {}'.format(details.get('task_name'), state))
98
99
100flow = lf.Flow("make-auto").add(
101 task.FunctorTask(startup, revert=trash, provides='ran'),
102 # A graph flow allows automatic dependency-based ordering; the ordering
103 # is determined by analyzing the symbols each task requires and provides and
104 # executing in an order that satisfies those dependencies (if one exists).
105 gf.Flow("install-parts").add(
106 task.FunctorTask(build_frame, provides='frame'),
107 task.FunctorTask(build_engine, provides='engine'),
108 task.FunctorTask(build_doors, provides='doors'),
109 task.FunctorTask(build_wheels, provides='wheels'),
110 # These *_installed outputs allow for other tasks to depend on certain
111 # actions being performed (aka the components were installed); another
112 # way to do this is to link() the tasks manually (see the sketch after
113 # this listing) instead of creating an 'artificial' data dependency
114 # that accomplishes the same goal the manual linking would result in.
115 task.FunctorTask(install_engine, provides='engine_installed'),
116 task.FunctorTask(install_doors, provides='doors_installed'),
117 task.FunctorTask(install_windows, provides='windows_installed'),
118 task.FunctorTask(install_wheels, provides='wheels_installed')),
119 task.FunctorTask(verify, requires=['frame',
120 'engine',
121 'doors',
122 'wheels',
123 'engine_installed',
124 'doors_installed',
125 'windows_installed',
126 'wheels_installed']))
127
128# This dictionary will be provided to the tasks as a specification for what
129# the tasks should produce, in this example this specification will influence
130# what those tasks do and what output they create. Different tasks depend on
131# different information from this specification, all of which will be provided
132# automatically by the engine to those tasks.
133spec = {
134 "frame": 'steel',
135 "engine": 'honda',
136 "doors": '2',
137 "wheels": '4',
138 # These are used to compare the result product, a car without the pieces
139 # installed is not a car after all.
140 "engine_installed": True,
141 "doors_installed": True,
142 "windows_installed": True,
143 "wheels_installed": True,
144}
145
146
147engine = taskflow.engines.load(flow, store={'spec': spec.copy()})
148
149# This registers all (ANY) state transitions to trigger a call to the
150# flow_watch function for flow state transitions, and registers the
151# same all (ANY) state transitions for task state transitions.
152engine.notifier.register(ANY, flow_watch)
153engine.atom_notifier.register(ANY, task_watch)
154
155eu.print_wrapped("Building a car")
156engine.run()
157
158# Alter the specification and ensure that the reverting logic gets triggered,
159# since the car that gets built will only have 2 doors (from the build_doors
160# function), not the 5 doors the altered spec now requires; this will cause the
161# verification task to mark the produced car as not matching the desired spec.
162spec['doors'] = 5
163
164engine = taskflow.engines.load(flow, store={'spec': spec.copy()})
165engine.notifier.register(ANY, flow_watch)
166engine.atom_notifier.register(ANY, task_watch)
167
168eu.print_wrapped("Building a wrong car that doesn't match specification")
169try:
170 engine.run()
171except Exception as e:
172 eu.print_wrapped("Flow failed: %s" % e)
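The comment about link() in the listing refers to creating the ordering edge directly on the graph flow instead of threading an artificial *_installed value through the tasks. A minimal, self-contained sketch of that alternative (the prepare/finish functions are made up for illustration):

from taskflow import engines
from taskflow.patterns import graph_flow as gf
from taskflow import task


def prepare():
    print("preparing")


def finish():
    print("finishing")


prep = task.FunctorTask(prepare)
done = task.FunctorTask(finish)

f = gf.Flow("ordered-by-link").add(prep, done)
# Explicitly order prepare() before finish() without any shared data.
f.link(prep, done)
engines.run(f)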
Iterating over the alphabet (using processes)¶
Note
Full source located at alphabet_soup.
1import fractions
2import functools
3import logging
4import os
5import string
6import sys
7import time
7
8logging.basicConfig(level=logging.ERROR)
9
10self_dir = os.path.abspath(os.path.dirname(__file__))
11top_dir = os.path.abspath(os.path.join(os.path.dirname(__file__),
12 os.pardir,
13 os.pardir))
14sys.path.insert(0, top_dir)
15sys.path.insert(0, self_dir)
16
17from taskflow import engines
18from taskflow import exceptions
19from taskflow.patterns import linear_flow
20from taskflow import task
21
22
23# In this example we show how a simple linear set of tasks can be executed
24# using local processes (and not threads or remote workers) with minimal (if
25# any) modification to those tasks to make them safe to run in this mode.
26#
27# This is useful since it allows further scaling up your workflows when thread
28# execution starts to become a bottleneck (which it can start to be due to the
29# GIL in python). It also offers an intermediary scalable runner that can be
30# used when the scale and/or setup of remote workers is not desirable.
31
32
33def progress_printer(task, event_type, details):
34 # This callback, attached to each task will be called in the local
35 # process (not the child processes)...
36 progress = details.pop('progress')
37 progress = int(progress * 100.0)
38 print("Task '%s' reached %d%% completion" % (task.name, progress))
39
40
41class AlphabetTask(task.Task):
42 # Seconds to delay between each progress part.
43 _DELAY = 0.1
44
45 # This task will run in X main stages (each with a different progress
46 # report that will be delivered back to the running process...). The
47 # initial 0% and 100% are triggered automatically by the engine when
48 # a task is started and finished (so that's why those are not emitted
49 # here).
50 _PROGRESS_PARTS = [fractions.Fraction("%s/5" % x) for x in range(1, 5)]
51
52 def execute(self):
53 for p in self._PROGRESS_PARTS:
54 self.update_progress(p)
55 time.sleep(self._DELAY)
56
57
58print("Constructing...")
59soup = linear_flow.Flow("alphabet-soup")
60for letter in string.ascii_lowercase:
61 abc = AlphabetTask(letter)
62 abc.notifier.register(task.EVENT_UPDATE_PROGRESS,
63 functools.partial(progress_printer, abc))
64 soup.add(abc)
65try:
66 print("Loading...")
67 e = engines.load(soup, engine='parallel', executor='processes')
68 print("Compiling...")
69 e.compile()
70 print("Preparing...")
71 e.prepare()
72 print("Running...")
73 e.run()
74 print("Done: %s" % e.statistics)
75except exceptions.NotImplementedError as e:
76 print(e)
Watching execution timing¶
Note
Full source located at timing_listener.
1import logging
2import os
3import random
4import sys
5import time
6
7logging.basicConfig(level=logging.ERROR)
7
8top_dir = os.path.abspath(os.path.join(os.path.dirname(__file__),
9 os.pardir,
10 os.pardir))
11sys.path.insert(0, top_dir)
12
13from taskflow import engines
14from taskflow.listeners import timing
15from taskflow.patterns import linear_flow as lf
16from taskflow import task
17
18# INTRO: in this example we will attach a listener to an engine,
19# have tasks of variable run time run, and show how the listener will print
20# out how long those tasks took (and when they started and finished).
21#
22# This shows how timing metrics can be gathered (or attached onto an engine)
23# after a workflow has been constructed, making it easy to gather metrics
24# dynamically for situations where this kind of information is applicable (or
25# even adding this information on at a later point in the future when your
26# application starts to slow down).
27
28
29class VariableTask(task.Task):
30 def __init__(self, name):
31 super().__init__(name)
32 self._sleepy_time = random.random()
33
34 def execute(self):
35 time.sleep(self._sleepy_time)
36
37
38f = lf.Flow('root')
39f.add(VariableTask('a'), VariableTask('b'), VariableTask('c'))
40e = engines.load(f)
41with timing.PrintingDurationListener(e):
42 e.run()
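Listeners are ordinary context managers, so more than one can be attached around a single run; a sketch stacking the duration listener above with the logging listener from the earlier example (standard-library contextlib.ExitStack is used to nest them, and e is the engine from the listing):

import contextlib

from taskflow.listeners import logging as logging_listener

with contextlib.ExitStack() as stack:
    stack.enter_context(timing.PrintingDurationListener(e))
    stack.enter_context(logging_listener.DynamicLoggingListener(e))
    e.run()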
Distance calculator¶
Note
Full source located at distance_calculator
1import collections
2import math
3import os
4import sys
4
5top_dir = os.path.abspath(os.path.join(os.path.dirname(__file__),
6 os.pardir,
7 os.pardir))
8sys.path.insert(0, top_dir)
9
10from taskflow import engines
11from taskflow.patterns import linear_flow
12from taskflow import task
13
14# INTRO: This shows how to use a task's/atom's ability to take requirements from
15# its execute function's default parameters, and shows how to provide those
16# values via different methods when needed, in this case to influence the
17# calculation of the distance between two points in 2D space.
18
19# A 2D point.
20Point = collections.namedtuple("Point", "x,y")
21
22
23def is_near(val, expected, tolerance=0.001):
24 # Floats don't really provide equality...
25 if val > (expected + tolerance):
26 return False
27 if val < (expected - tolerance):
28 return False
29 return True
30
31
32class DistanceTask(task.Task):
33 # See: http://en.wikipedia.org/wiki/Distance#Distance_in_Euclidean_space
34
35 default_provides = 'distance'
36
37 def execute(self, a=Point(0, 0), b=Point(0, 0)):
38 return math.sqrt(math.pow(b.x - a.x, 2) + math.pow(b.y - a.y, 2))
39
40
41if __name__ == '__main__':
42 # For these we rely on the execute() method's points defaulting to
43 # the origin (and we override them with store values when we want) at
44 # execution time (which then influences what is calculated).
45 any_distance = linear_flow.Flow("origin").add(DistanceTask())
46 results = engines.run(any_distance)
47 print(results)
48 print("{} is near-enough to {}: {}".format(
49 results['distance'], 0.0, is_near(results['distance'], 0.0)))
50
51 results = engines.run(any_distance, store={'a': Point(1, 1)})
52 print(results)
53 print("{} is near-enough to {}: {}".format(
54 results['distance'], 1.4142, is_near(results['distance'], 1.4142)))
55
56 results = engines.run(any_distance, store={'a': Point(10, 10)})
57 print(results)
58 print("{} is near-enough to {}: {}".format(
59 results['distance'], 14.14199, is_near(results['distance'], 14.14199)))
60
61 results = engines.run(any_distance,
62 store={'a': Point(5, 5), 'b': Point(10, 10)})
63 print(results)
64 print("{} is near-enough to {}: {}".format(
65 results['distance'], 7.07106, is_near(results['distance'], 7.07106)))
66
67 # For this we use the ability to override at task creation time the
68 # optional arguments so that we don't need to continue to send them
69 # in via the 'store' argument like in the above (and we fix the new
70 # starting point 'a' at (10, 10) instead of (0, 0))...
71
72 ten_distance = linear_flow.Flow("ten")
73 ten_distance.add(DistanceTask(inject={'a': Point(10, 10)}))
74 results = engines.run(ten_distance, store={'b': Point(10, 10)})
75 print(results)
76 print("{} is near-enough to {}: {}".format(
77 results['distance'], 0.0, is_near(results['distance'], 0.0)))
78
79 results = engines.run(ten_distance)
80 print(results)
81 print("{} is near-enough to {}: {}".format(
82 results['distance'], 14.14199, is_near(results['distance'], 14.14199)))
Table multiplier (in parallel)¶
Note
Full source located at parallel_table_multiply
1import csv
2import logging
3import os
4import random
5import sys
5
6logging.basicConfig(level=logging.ERROR)
7
8top_dir = os.path.abspath(os.path.join(os.path.dirname(__file__),
9 os.pardir,
10 os.pardir))
11sys.path.insert(0, top_dir)
12
13import futurist
14
15from taskflow import engines
16from taskflow.patterns import unordered_flow as uf
17from taskflow import task
18
19# INTRO: This example walks through a miniature workflow which does a parallel
20# table modification where each row in the table gets adjusted by a thread, or
21# green thread (if eventlet is available) in parallel and then the result
22# is reformed into a new table and some verifications are performed on it
23# to ensure everything went as expected.
24
25
26MULTIPLIER = 10
27
28
29class RowMultiplier(task.Task):
30 """Performs a modification of an input row, creating a output row."""
31
32 def __init__(self, name, index, row, multiplier):
33 super().__init__(name=name)
34 self.index = index
35 self.multiplier = multiplier
36 self.row = row
37
38 def execute(self):
39 return [r * self.multiplier for r in self.row]
40
41
42def make_flow(table):
43 # This creation will allow for parallel computation (since the flow here
44 # is specifically unordered; and when things are unordered they have
45 # no dependencies and when things have no dependencies they can just be
46 # run at the same time, limited in concurrency by the executor or max
47 # workers of that executor...)
48 f = uf.Flow("root")
49 for i, row in enumerate(table):
50 f.add(RowMultiplier("m-%s" % i, i, row, MULTIPLIER))
51 # NOTE(harlowja): at this point nothing has run, the above is just
52 # defining what should be done (but not actually doing it) and associating
53 # the ordering dependencies that should be enforced (the flow pattern used
54 # forces this); the engine in the later main() function will actually
55 # perform this work...
56 return f
57
58
59def main():
60 if len(sys.argv) == 2:
61 tbl = []
62 with open(sys.argv[1], 'rb') as fh:
63 reader = csv.reader(fh)
64 for row in reader:
65 tbl.append([float(r) if r else 0.0 for r in row])
66 else:
67 # Make some random table out of thin air...
68 tbl = []
69 cols = random.randint(1, 100)
70 rows = random.randint(1, 100)
71 for _i in range(0, rows):
72 row = []
73 for _j in range(0, cols):
74 row.append(random.random())
75 tbl.append(row)
76
77 # Generate the work to be done.
78 f = make_flow(tbl)
79
80 # Now run it (using the specified executor)...
81 try:
82 executor = futurist.GreenThreadPoolExecutor(max_workers=5)
83 except RuntimeError:
84 # No eventlet currently active, use real threads instead.
85 executor = futurist.ThreadPoolExecutor(max_workers=5)
86 try:
87 e = engines.load(f, engine='parallel', executor=executor)
88 for st in e.run_iter():
89 print(st)
90 finally:
91 executor.shutdown()
92
93 # Find the old rows and put them into place...
94 #
95 # TODO(harlowja): probably easier just to sort instead of search (see the
96 # sketch after this listing)...
96 computed_tbl = []
97 for i in range(0, len(tbl)):
98 for t in f:
99 if t.index == i:
100 computed_tbl.append(e.storage.get(t.name))
101
102 # Do some basic validation (which causes the return code of this process
103 # to be different if things were not as expected...)
104 if len(computed_tbl) != len(tbl):
105 return 1
106 else:
107 return 0
108
109
110if __name__ == "__main__":
111 sys.exit(main())
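The TODO about sorting instead of searching could look like the following drop-in for the nested loops inside main(), relying only on the .index attribute each RowMultiplier already carries and the same storage lookup used above:

# Rebuild the table by sorting the tasks on their stored row index.
computed_tbl = [e.storage.get(t.name)
                for t in sorted(f, key=lambda t: t.index)]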
Linear equation solver (explicit dependencies)¶
Note
Full source located at calculate_linear.
1import logging
2import os
3import sys
4
5logging.basicConfig(level=logging.ERROR)
5
6top_dir = os.path.abspath(os.path.join(os.path.dirname(__file__),
7 os.pardir,
8 os.pardir))
9sys.path.insert(0, top_dir)
10
11import taskflow.engines
12from taskflow.patterns import linear_flow as lf
13from taskflow import task
14
15
16# INTRO: In this example a linear flow is used to group four tasks to calculate
17# a value. A single task class is added twice, showing how this can be done;
18# each of the two added instances takes in different bound values. In the first
19# case it uses default parameters ('x' and 'y') and in the second case the
20# arguments are bound to the ('z', 'd') keys from the engine's internal storage.
21#
22# A multiplier task uses a binding that another task also provides, but this
23# example explicitly shows that the 'z' parameter is bound to the 'a' key.
24# This shows that if a task depends on a key named the same as a key provided
25# by another task the name can be remapped to take the desired key from a
26# different origin.
27
28
29# This task provides some values as a result of execution; this can be
30# useful when you want to provide values from a static set to other tasks that
31# depend on those values existing before those tasks can run.
32#
33# NOTE(harlowja): this usage is *deprecated* in favor of a simpler mechanism
34# that just provides those values when the engine runs by prepopulating the
35# storage backend before your tasks are run (which accomplishes a similar goal
36# in a more uniform manner); see the sketch after this listing.
37class Provider(task.Task):
38
39 def __init__(self, name, *args, **kwargs):
40 super().__init__(name=name, **kwargs)
41 self._provide = args
42
43 def execute(self):
44 return self._provide
45
46
47# This task adds two input variables and returns the result.
48#
49# Note that since this task does not have a revert() function (since addition
50# is a stateless operation) there are no side-effects that this function needs
51# to undo if some later operation fails.
52class Adder(task.Task):
53 def execute(self, x, y):
54 return x + y
55
56
57# This task multiplies an input variable by a multiplier and returns the
58# result.
59#
60# Note that since this task does not have a revert() function (since
61# multiplication is a stateless operation) there are no side-effects that
62# this function needs to undo if some later operation fails.
63class Multiplier(task.Task):
64 def __init__(self, name, multiplier, provides=None, rebind=None):
65 super().__init__(name=name, provides=provides,
66 rebind=rebind)
67 self._multiplier = multiplier
68
69 def execute(self, z):
70 return z * self._multiplier
71
72
73# Note here that the ordering is established so that the correct sequence
74# of operations occurs where the adding and multiplying is done according
75# to the expected and typical mathematical model. A graph flow could also be
76# used here to automatically infer & ensure the correct ordering.
77flow = lf.Flow('root').add(
78 # Provide the initial values for other tasks to depend on.
79 #
80 # x = 2, y = 3, d = 5
81 Provider("provide-adder", 2, 3, 5, provides=('x', 'y', 'd')),
82 # z = x+y = 5
83 Adder("add-1", provides='z'),
84 # a = z+d = 10
85 Adder("add-2", provides='a', rebind=['z', 'd']),
86 # Calculate 'r = a*3 = 30'
87 #
88 # Note here that the 'z' argument of the execute() function will not be
89 # bound to the 'z' value provided by the 'add-1' task above but
90 # instead the 'z' argument will be taken from the 'a' value provided
91 # by the 'add-2' task listed above.
92 Multiplier("multi", 3, provides='r', rebind={'z': 'a'})
93)
94
95# The result here will be all results (from all tasks) which is stored in an
96# in-memory storage location that backs this engine since it is not configured
97# with persistence storage.
98results = taskflow.engines.run(flow)
99print(results)
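The NOTE about the deprecated Provider pattern boils down to handing the constants to the engine through store instead of a task. A sketch of the same calculation without the Provider, reusing the Adder and Multiplier classes from the listing:

flow = lf.Flow('root').add(
    # z = x+y = 5
    Adder("add-1", provides='z'),
    # a = z+d = 10
    Adder("add-2", provides='a', rebind=['z', 'd']),
    # r = a*3 = 30
    Multiplier("multi", 3, provides='r', rebind={'z': 'a'}))

print(taskflow.engines.run(flow, store={'x': 2, 'y': 3, 'd': 5}))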
Linear equation solver (inferred dependencies)¶
Note
Full source located at graph_flow.
1import logging
2import os
3import sys
4
5logging.basicConfig(level=logging.ERROR)
5
6top_dir = os.path.abspath(os.path.join(os.path.dirname(__file__),
7 os.pardir,
8 os.pardir))
9sys.path.insert(0, top_dir)
10
11import taskflow.engines
12from taskflow.patterns import graph_flow as gf
13from taskflow.patterns import linear_flow as lf
14from taskflow import task
15
16
17# In this example there are complex *inferred* dependencies between tasks that
18# are used to perform a simple set of linear equations.
19#
20# As you will see below the tasks just define what they require as input
21# and produce as output (named values). Then the user doesn't care about
22# ordering the tasks (in this case the tasks calculate pieces of the overall
23# equation).
24#
25# As you will notice a graph flow resolves dependencies automatically using
26# each task's symbol requirements and provided symbol values, so no ordering
27# dependency has to be manually created.
28#
29# Also notice that flows of any types can be nested into a graph flow; showing
30# that subflow dependencies (and associated ordering) will be inferred too.
31
32
33class Adder(task.Task):
34
35 def execute(self, x, y):
36 return x + y
37
38
39flow = gf.Flow('root').add(
40 lf.Flow('nested_linear').add(
41 # x2 = y3+y4 = 12
42 Adder("add2", provides='x2', rebind=['y3', 'y4']),
43 # x1 = y1+y2 = 4
44 Adder("add1", provides='x1', rebind=['y1', 'y2'])
45 ),
46 # x5 = x1+x3 = 20
47 Adder("add5", provides='x5', rebind=['x1', 'x3']),
48 # x3 = x1+x2 = 16
49 Adder("add3", provides='x3', rebind=['x1', 'x2']),
50 # x4 = x2+y5 = 21
51 Adder("add4", provides='x4', rebind=['x2', 'y5']),
52 # x6 = x5+x4 = 41
53 Adder("add6", provides='x6', rebind=['x5', 'x4']),
54 # x7 = x6+x6 = 82
55 Adder("add7", provides='x7', rebind=['x6', 'x6']))
56
57# Provide the initial variable inputs using a storage dictionary.
58store = {
59 "y1": 1,
60 "y2": 3,
61 "y3": 5,
62 "y4": 7,
63 "y5": 9,
64}
65
66# These are the expected values that should be created.
67unexpected = 0
68expected = [
69 ('x1', 4),
70 ('x2', 12),
71 ('x3', 16),
72 ('x4', 21),
73 ('x5', 20),
74 ('x6', 41),
75 ('x7', 82),
76]
77
78result = taskflow.engines.run(
79 flow, engine='serial', store=store)
80
81print("Single threaded engine result %s" % result)
82for (name, value) in expected:
83 actual = result.get(name)
84 if actual != value:
85 sys.stderr.write("{} != {}\n".format(actual, value))
86 unexpected += 1
87
88result = taskflow.engines.run(
89 flow, engine='parallel', store=store)
90
91print("Multi threaded engine result %s" % result)
92for (name, value) in expected:
93 actual = result.get(name)
94 if actual != value:
95 sys.stderr.write("{} != {}\n".format(actual, value))
96 unexpected += 1
97
98if unexpected:
99 sys.exit(1)
Linear equation solver (in parallel)¶
Note
Full source located at calculate_in_parallel
1import logging
2import os
3import sys
4
5logging.basicConfig(level=logging.ERROR)
5
6top_dir = os.path.abspath(os.path.join(os.path.dirname(__file__),
7 os.pardir,
8 os.pardir))
9sys.path.insert(0, top_dir)
10
11import taskflow.engines
12from taskflow.patterns import linear_flow as lf
13from taskflow.patterns import unordered_flow as uf
14from taskflow import task
15
16# INTRO: These examples show how a linear flow and an unordered flow can be
17# used together to execute calculations in parallel and then use the
18# result for the next task/s. The adder task is used for all calculations
19# and argument bindings are used to set correct parameters for each task.
20
21
22# This task provides some values as a result of execution; this can be
23# useful when you want to provide values from a static set to other tasks that
24# depend on those values existing before those tasks can run.
25#
26# NOTE(harlowja): this usage is *deprecated* in favor of a simpler mechanism
27# that provides those values when the engine runs by prepopulating the storage
28# backend before your tasks are run (which accomplishes a similar goal in a
29# more uniform manner).
30class Provider(task.Task):
31 def __init__(self, name, *args, **kwargs):
32 super().__init__(name=name, **kwargs)
33 self._provide = args
34
35 def execute(self):
36 return self._provide
37
38
39# This task adds two input variables and returns the result of that addition.
40#
41# Note that since this task does not have a revert() function (since addition
42# is a stateless operation) there are no side-effects that this function needs
43# to undo if some later operation fails.
44class Adder(task.Task):
45 def execute(self, x, y):
46 return x + y
47
48
49flow = lf.Flow('root').add(
50 # Provide the initial values for other tasks to depend on.
51 #
52 # x1 = 2, y1 = 3, x2 = 5, y2 = 8
53 Provider("provide-adder", 2, 3, 5, 8,
54 provides=('x1', 'y1', 'x2', 'y2')),
55 # Note here that we define the flow that contains the 2 adders to be an
56 # unordered flow since the order in which these execute does not matter,
57 # another way to solve this would be to use a graph_flow pattern, which
58 # also can run in parallel (since they have no ordering dependencies).
59 uf.Flow('adders').add(
60 # Calculate 'z1 = x1+y1 = 5'
61 #
62 # Rebind here means that the execute() function x argument will be
63 # satisfied from a previous output named 'x1', and the y argument
64 # of execute() will be populated from the previous output named 'y1'
65 #
66 # The output (result of adding) will be mapped into a variable named
67 # 'z1' which can then be referred to and depended on by other tasks.
68 Adder(name="add", provides='z1', rebind=['x1', 'y1']),
69 # z2 = x2+y2 = 13
70 Adder(name="add-2", provides='z2', rebind=['x2', 'y2']),
71 ),
72 # r = z1+z2 = 18
73 Adder(name="sum-1", provides='r', rebind=['z1', 'z2']))
74
75
76# The result here will be all results (from all tasks) which is stored in an
77# in-memory storage location that backs this engine since it is not configured
78# with persistence storage.
79result = taskflow.engines.run(flow, engine='parallel')
80print(result)
Creating a volume (in parallel)¶
Note
Full source located at create_parallel_volume
1import contextlib
2import logging
3import os
4import random
5import sys
6import time
6
7logging.basicConfig(level=logging.ERROR)
8
9top_dir = os.path.abspath(os.path.join(os.path.dirname(__file__),
10 os.pardir,
11 os.pardir))
12sys.path.insert(0, top_dir)
13
14from oslo_utils import reflection
15
16from taskflow import engines
17from taskflow.listeners import printing
18from taskflow.patterns import unordered_flow as uf
19from taskflow import task
20
21# INTRO: These examples show how unordered_flow can be used to create a large
22# number of fake volumes in parallel (or serially, depending on a constant that
23# can be easily changed).
24
25
26@contextlib.contextmanager
27def show_time(name):
28 start = time.time()
29 yield
30 end = time.time()
31 print(" -- {} took {:0.3f} seconds".format(name, end - start))
32
33
34# This affects how many volumes to create and how much time to *simulate*
35# passing for that volume to be created.
36MAX_CREATE_TIME = 3
37VOLUME_COUNT = 5
38
39# This will be used to determine if all the volumes are created in parallel
40# or whether the volumes are created serially (in an undefined order since
41# an unordered flow is used). Note that there is a disconnection between the
42# ordering and the concept of parallelism (since unordered items can still be
43# ran in a serial ordering). A typical use-case for offering both is to allow
44# for debugging using a serial approach, while when running at a larger scale
45# one would likely want to use the parallel approach.
46#
47# If you switch this flag from serial to parallel you can see the overall
48# time difference that this causes.
49SERIAL = False
50if SERIAL:
51 engine = 'serial'
52else:
53 engine = 'parallel'
54
55
56class VolumeCreator(task.Task):
57 def __init__(self, volume_id):
58 # Note here that the volume name is composed of the name of the class
59 # along with the volume id that is being created, since a name of a
60 # task uniquely identifies that task in storage it is important that
61 # the name be relevant and identifiable if the task is recreated for
62 # subsequent resumption (if applicable).
63 #
64 # UUIDs are *not* used as they can not be tied back to a previous task's
65 # state on resumption (since they are unique and will vary for each
66 # task that is created). A name based off the volume id that is to be
67 # created is more easily tied back to the original task so that the
68 # volume create can be resumed/reverted, and is much easier to use for
69 # audit and tracking purposes.
70 base_name = reflection.get_callable_name(self)
71 super().__init__(name="{}-{}".format(base_name, volume_id))
72 self._volume_id = volume_id
73
74 def execute(self):
75 print("Making volume %s" % (self._volume_id))
76 time.sleep(random.random() * MAX_CREATE_TIME)
77 print("Finished making volume %s" % (self._volume_id))
78
79
80# Assume there is no ordering dependency between volumes.
81flow = uf.Flow("volume-maker")
82for i in range(0, VOLUME_COUNT):
83 flow.add(VolumeCreator(volume_id="vol-%s" % (i)))
84
85
86# Show how much time the overall engine loading and running takes.
87with show_time(name=flow.name.title()):
88 eng = engines.load(flow, engine=engine)
89 # This context manager automatically adds (and automatically removes) a
90 # helpful set of state transition notification printing helper utilities
91 # that show you exactly what transitions the engine is going through
92 # while running the various volume create tasks.
93 with printing.PrintingListener(eng):
94 eng.run()
Summation mapper(s) and reducer (in parallel)¶
Note
Full source located at simple_map_reduce
1import logging
2import os
3import sys
4
5logging.basicConfig(level=logging.ERROR)
5
6self_dir = os.path.abspath(os.path.dirname(__file__))
7top_dir = os.path.abspath(os.path.join(os.path.dirname(__file__),
8 os.pardir,
9 os.pardir))
10sys.path.insert(0, top_dir)
11sys.path.insert(0, self_dir)
12
13# INTRO: These examples show a simplistic map/reduce implementation where
14# a set of mapper(s) will sum a series of input numbers (in parallel) and
15# return their individual summed result. A reducer will then use those
16# produced values and perform a final summation and this result will then be
17# printed (and verified to ensure the calculation was as expected).
18
19from taskflow import engines
20from taskflow.patterns import linear_flow
21from taskflow.patterns import unordered_flow
22from taskflow import task
23
24
25class SumMapper(task.Task):
26 def execute(self, inputs):
27 # Sums some set of provided inputs.
28 return sum(inputs)
29
30
31class TotalReducer(task.Task):
32 def execute(self, *args, **kwargs):
33 # Reduces all mapped summed outputs into a single value.
34 total = 0
35 for (k, v) in kwargs.items():
36 # If any other kwargs were passed in, we don't want to use those
37 # in the calculation of the total...
38 if k.startswith('reduction_'):
39 total += v
40 return total
41
42
43def chunk_iter(chunk_size, upperbound):
44 """Yields back chunk size pieces from zero to upperbound - 1."""
45 chunk = []
46 for i in range(0, upperbound):
47 chunk.append(i)
48 if len(chunk) == chunk_size:
49 yield chunk
50 chunk = []
51
52
53# Upper bound of numbers to sum for example purposes...
54UPPER_BOUND = 10000
55
56# How many mappers we want to have.
57SPLIT = 10
58
59# How big of a chunk we want to give each mapper.
60CHUNK_SIZE = UPPER_BOUND // SPLIT
61
62# This will be the workflow we will compose and run.
63w = linear_flow.Flow("root")
64
65# The mappers will run in parallel.
66store = {}
67provided = []
68mappers = unordered_flow.Flow('map')
69for i, chunk in enumerate(chunk_iter(CHUNK_SIZE, UPPER_BOUND)):
70 mapper_name = 'mapper_%s' % i
71 # Give that mapper some information to compute.
72 store[mapper_name] = chunk
73 # The reducer uses all of the outputs of the mappers, so it needs
74 # to be recorded that it needs access to them (under a specific name).
75 provided.append("reduction_%s" % i)
76 mappers.add(SumMapper(name=mapper_name,
77 rebind={'inputs': mapper_name},
78 provides=provided[-1]))
79w.add(mappers)
80
81# The reducer will run last (after all the mappers).
82w.add(TotalReducer('reducer', requires=provided))
83
84# Now go!
85e = engines.load(w, engine='parallel', store=store, max_workers=4)
86print("Running a parallel engine with options: %s" % e.options)
87e.run()
88
89# Now get the result the reducer created.
90total = e.storage.get('reducer')
91print("Calculated result = %s" % total)
92
93# Calculate it manually to verify that it worked...
94calc_total = sum(range(0, UPPER_BOUND))
95if calc_total != total:
96 sys.exit(1)
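The final verification compares against sum(range(0, UPPER_BOUND)), which by the arithmetic-series formula is UPPER_BOUND * (UPPER_BOUND - 1) / 2 = 10000 * 9999 / 2 = 49,995,000; a quick standalone check:

assert sum(range(0, 10000)) == 10000 * 9999 // 2 == 49995000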
Storing & emitting a bill¶
Note
Full source located at fake_billing
1import json
2import logging
3import os
4import sys
5import time
5
6logging.basicConfig(level=logging.ERROR)
7
8top_dir = os.path.abspath(os.path.join(os.path.dirname(__file__),
9 os.pardir,
10 os.pardir))
11sys.path.insert(0, top_dir)
12
13from oslo_utils import uuidutils
14
15from taskflow import engines
16from taskflow.listeners import printing
17from taskflow.patterns import graph_flow as gf
18from taskflow.patterns import linear_flow as lf
19from taskflow import task
20from taskflow.utils import misc
21
22# INTRO: This example walks through a miniature workflow which simulates
23# the reception of an API request, creation of a database entry, driver
24# activation (which invokes a 'fake' webservice) and final completion.
25#
26# This example also shows how a function/object (in this case the url sending)
27# that occurs during driver activation can update the progress of a task
28# without being aware of the internals of how to do this by associating a
29# callback that the url sending can update as the sending progresses from 0.0%
30# complete to 100% complete.
31
32
33class DB:
34 def query(self, sql):
35 print("Querying with: %s" % (sql))
36
37
38class UrlCaller:
39 def __init__(self):
40 self._send_time = 0.5
41 self._chunks = 25
42
43 def send(self, url, data, status_cb=None):
44 sleep_time = float(self._send_time) / self._chunks
45 for i in range(0, len(data)):
46 time.sleep(sleep_time)
47 # As we send the data, each chunk we 'fake' send will progress
48 # the sending progress that much further to 100%.
49 if status_cb:
50 status_cb(float(i) / len(data))
51
52
53# Since engines save the output of tasks to an optional persistent storage
54# backend, resources have to be dealt with in a slightly different manner since
55# resources are transient and can *not* be persisted (or serialized). For tasks
56# that require access to a set of resources it is a common pattern to provide
57# an object (in this case this object) on construction of those tasks via the
58# task constructor.
59class ResourceFetcher:
60 def __init__(self):
61 self._db_handle = None
62 self._url_handle = None
63
64 @property
65 def db_handle(self):
66 if self._db_handle is None:
67 self._db_handle = DB()
68 return self._db_handle
69
70 @property
71 def url_handle(self):
72 if self._url_handle is None:
73 self._url_handle = UrlCaller()
74 return self._url_handle
75
76
77class ExtractInputRequest(task.Task):
78 def __init__(self, resources):
79 super().__init__(provides="parsed_request")
80 self._resources = resources
81
82 def execute(self, request):
83 return {
84 'user': request.user,
85 'user_id': misc.as_int(request.id),
86 'request_id': uuidutils.generate_uuid(),
87 }
88
89
90class MakeDBEntry(task.Task):
91 def __init__(self, resources):
92 super().__init__()
93 self._resources = resources
94
95 def execute(self, parsed_request):
96 db_handle = self._resources.db_handle
97 db_handle.query("INSERT %s INTO mydb" % (parsed_request))
98
99 def revert(self, result, parsed_request):
100 db_handle = self._resources.db_handle
101 db_handle.query("DELETE %s FROM mydb IF EXISTS" % (parsed_request))
102
103
104class ActivateDriver(task.Task):
105 def __init__(self, resources):
106 super().__init__(provides='sent_to')
107 self._resources = resources
108 self._url = "http://blahblah.com"
109
110 def execute(self, parsed_request):
111 print("Sending billing data to %s" % (self._url))
112 url_sender = self._resources.url_handle
113 # Note that here we attach our update_progress function (which is a
114 # function that the engine also 'binds' to) to the progress function
115 # that the url sending helper class uses. This allows the task progress
116 # to be tied to the url sending progress, which is very useful for
117 # downstream systems to be aware of what a task is doing at any time.
118 url_sender.send(self._url, json.dumps(parsed_request),
119 status_cb=self.update_progress)
120 return self._url
121
122 def update_progress(self, progress, **kwargs):
123 # Override the parent method to also print out the status.
124 super().update_progress(progress, **kwargs)
125 print("{} is {:0.2f}% done".format(self.name, progress * 100))
126
127
128class DeclareSuccess(task.Task):
129 def execute(self, sent_to):
130 print("Done!")
131 print("All data processed and sent to %s" % (sent_to))
132
133
134class DummyUser:
135 def __init__(self, user, id_):
136 self.user = user
137 self.id = id_
138
139
140# Resources (db handles and similar) of course can *not* be persisted so we
141# need to make sure that we pass this resource fetcher to the tasks constructor
142# so that the tasks have access to any needed resources (the resources are
143# lazily loaded so that they are only created when they are used).
144resources = ResourceFetcher()
145flow = lf.Flow("initialize-me")
146
147# 1. First we extract the api request into a usable format.
148# 2. Then we go ahead and make a database entry for our request.
149flow.add(ExtractInputRequest(resources), MakeDBEntry(resources))
150
151# 3. Then we activate our payment method and finally declare success.
152sub_flow = gf.Flow("after-initialize")
153sub_flow.add(ActivateDriver(resources), DeclareSuccess())
154flow.add(sub_flow)
155
156# Initially populate the storage with the following request object;
157# prepopulating this allows the tasks that depend on the 'request' variable
158# to start processing (in this case this is the ExtractInputRequest task).
159store = {
160 'request': DummyUser(user="bob", id_="1.35"),
161}
162eng = engines.load(flow, engine='serial', store=store)
163
164# This context manager automatically adds (and automatically removes) a
165# helpful set of state transition notification printing helper utilities
166# that show you exactly what transitions the engine is going through
167# while running the various billing related tasks.
168with printing.PrintingListener(eng):
169 eng.run()
Suspending a workflow & resuming¶
Note
Full source located at resume_from_backend
1import contextlib
2import logging
3import os
4import sys
4
5logging.basicConfig(level=logging.ERROR)
6
7self_dir = os.path.abspath(os.path.dirname(__file__))
8top_dir = os.path.abspath(os.path.join(os.path.dirname(__file__),
9 os.pardir,
10 os.pardir))
11sys.path.insert(0, top_dir)
12sys.path.insert(0, self_dir)
13
14from oslo_utils import uuidutils
15
16import taskflow.engines
17from taskflow.patterns import linear_flow as lf
18from taskflow.persistence import models
19from taskflow import task
20
21import example_utils as eu # noqa
22
23# INTRO: In this example linear_flow is used to group three tasks, one of which
24# will suspend the future work the engine may do. This suspended engine is then
25# discarded and the workflow is reloaded from the persisted data and then the
26# workflow is resumed from where it was suspended. This allows you to see how
27# to start an engine, have a task stop the engine from doing future work (if
28# a multi-threaded engine is being used, then the currently active work is not
29# preempted) and then resume the work later.
30#
31# Usage:
32#
33# With a filesystem directory as backend
34#
35# python taskflow/examples/resume_from_backend.py
36#
37# With ZooKeeper as backend
38#
39# python taskflow/examples/resume_from_backend.py \
40# zookeeper://127.0.0.1:2181/taskflow/resume_from_backend/
41
42
43# UTILITY FUNCTIONS #########################################
44
45
46def print_task_states(flowdetail, msg):
47 eu.print_wrapped(msg)
48 print("Flow '{}' state: {}".format(flowdetail.name, flowdetail.state))
49 # Sort by these so that our test validation doesn't get confused by the
50 # order in which the items in the flow detail can be in.
51 items = sorted((td.name, td.version, td.state, td.results)
52 for td in flowdetail)
53 for item in items:
54 print(" %s==%s: %s, result=%s" % item)
55
56
57def find_flow_detail(backend, lb_id, fd_id):
58 conn = backend.get_connection()
59 lb = conn.get_logbook(lb_id)
60 return lb.find(fd_id)
61
62
63# CREATE FLOW ###############################################
64
65
66class InterruptTask(task.Task):
67 def execute(self):
68 # DO NOT TRY THIS AT HOME
69 engine.suspend()
70
71
72class TestTask(task.Task):
73 def execute(self):
74 print('executing %s' % self)
75 return 'ok'
76
77
78def flow_factory():
79 return lf.Flow('resume from backend example').add(
80 TestTask(name='first'),
81 InterruptTask(name='boom'),
82 TestTask(name='second'))
83
84
85# INITIALIZE PERSISTENCE ####################################
86
87with eu.get_backend() as backend:
88
89 # Create a place where the persistence information will be stored.
90 book = models.LogBook("example")
91 flow_detail = models.FlowDetail("resume from backend example",
92 uuid=uuidutils.generate_uuid())
93 book.add(flow_detail)
94 with contextlib.closing(backend.get_connection()) as conn:
95 conn.save_logbook(book)
96
97 # CREATE AND RUN THE FLOW: FIRST ATTEMPT ####################
98
99 flow = flow_factory()
100 engine = taskflow.engines.load(flow, flow_detail=flow_detail,
101 book=book, backend=backend)
102
103 print_task_states(flow_detail, "At the beginning, there is no state")
104 eu.print_wrapped("Running")
105 engine.run()
106 print_task_states(flow_detail, "After running")
107
108 # RE-CREATE, RESUME, RUN ####################################
109
110 eu.print_wrapped("Resuming and running again")
111
112 # NOTE(harlowja): reload the flow detail from backend, this will allow us
113 # to resume the flow from its suspended state, but first we need to search
114 # for the right flow details in the correct logbook where things are
115 # stored.
116 #
117 # We could avoid re-loading the engine and just do engine.run() again, but
118 # this example shows how another process may unsuspend a given flow and
119 # start it again for situations where this is useful to do (say the process
120 # running the above flow crashes).
121 flow2 = flow_factory()
122 flow_detail_2 = find_flow_detail(backend, book.uuid, flow_detail.uuid)
123 engine2 = taskflow.engines.load(flow2,
124 flow_detail=flow_detail_2,
125 backend=backend, book=book)
126 engine2.run()
127 print_task_states(flow_detail_2, "At the end")
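The example above leaves backend selection to example_utils.get_backend() (a filesystem directory or ZooKeeper). The same suspend/resume cycle can be sketched against the in-memory persistence backend, which needs no external services but loses all state when the process exits, so it is only useful for experimentation. A condensed sketch, assuming the flow_factory(), InterruptTask and TestTask definitions above live in the same module:

import contextlib

from oslo_utils import uuidutils

import taskflow.engines
from taskflow.persistence import backends as persistence_backends
from taskflow.persistence import models

backend = persistence_backends.fetch({'connection': 'memory'})
with contextlib.closing(backend.get_connection()) as conn:
    conn.upgrade()

book = models.LogBook("example")
flow_detail = models.FlowDetail("resume-sketch",
                                uuid=uuidutils.generate_uuid())
book.add(flow_detail)
with contextlib.closing(backend.get_connection()) as conn:
    conn.save_logbook(book)

# First run: InterruptTask suspends the engine part-way through.
engine = taskflow.engines.load(flow_factory(), flow_detail=flow_detail,
                               book=book, backend=backend)
engine.run()

# Second run: a fresh engine loads the persisted (suspended) state and
# only runs what was not yet completed.
engine2 = taskflow.engines.load(flow_factory(), flow_detail=flow_detail,
                                book=book, backend=backend)
engine2.run()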
Creating a virtual machine (resumable)¶
Note
Full source located at resume_vm_boot
1import contextlib, hashlib
2import logging
3import os
4import random
5import sys
6import time
7
8logging.basicConfig(level=logging.ERROR)
9
10self_dir = os.path.abspath(os.path.dirname(__file__))
11top_dir = os.path.abspath(os.path.join(os.path.dirname(__file__),
12 os.pardir,
13 os.pardir))
14sys.path.insert(0, top_dir)
15sys.path.insert(0, self_dir)
16
17import futurist
18from oslo_utils import uuidutils
19
20from taskflow import engines
21from taskflow import exceptions as exc
22from taskflow.patterns import graph_flow as gf
23from taskflow.patterns import linear_flow as lf
24from taskflow.persistence import models
25from taskflow import task
26
27import example_utils as eu # noqa
28
29# INTRO: These examples show how a hierarchy of flows can be used to create a
30# vm in a reliable & resumable manner using taskflow + a miniature version of
31# what nova does while booting a vm.
32
33
34@contextlib.contextmanager
35def slow_down(how_long=0.5):
36 try:
37 yield how_long
38 finally:
39 if len(sys.argv) > 1:
40 # Only bother to do this if user input was provided.
41 print("** Ctrl-c me please!!! **")
42 time.sleep(how_long)
43
44
45class PrintText(task.Task):
46 """Just inserts some text print outs in a workflow."""
47 def __init__(self, print_what, no_slow=False):
48 content_hash = hashlib.md5(print_what.encode('utf-8')).hexdigest()[0:8]
49 super().__init__(name="Print: %s" % (content_hash))
50 self._text = print_what
51 self._no_slow = no_slow
52
53 def execute(self):
54 if self._no_slow:
55 eu.print_wrapped(self._text)
56 else:
57 with slow_down():
58 eu.print_wrapped(self._text)
59
60
61class DefineVMSpec(task.Task):
62 """Defines a vm specification to be."""
63 def __init__(self, name):
64 super().__init__(provides='vm_spec', name=name)
65
66 def execute(self):
67 return {
68 'type': 'kvm',
69 'disks': 2,
70 'vcpu': 1,
71 'ips': 1,
72 'volumes': 3,
73 }
74
75
76class LocateImages(task.Task):
77 """Locates where the vm images are."""
78 def __init__(self, name):
79 super().__init__(provides='image_locations', name=name)
80
81 def execute(self, vm_spec):
82 image_locations = {}
83 for i in range(0, vm_spec['disks']):
84 url = "http://www.yahoo.com/images/%s" % (i)
85 image_locations[url] = "/tmp/%s.img" % (i)
86 return image_locations
87
88
89class DownloadImages(task.Task):
90 """Downloads all the vm images."""
91 def __init__(self, name):
92 super().__init__(provides='download_paths',
93 name=name)
94
95 def execute(self, image_locations):
96 for src, loc in image_locations.items():
97 with slow_down(1):
98 print("Downloading from {} => {}".format(src, loc))
99 return sorted(image_locations.values())
100
101
102class CreateNetworkTpl(task.Task):
103 """Generates the network settings file to be placed in the images."""
104 SYSCONFIG_CONTENTS = """DEVICE=eth%s
105BOOTPROTO=static
106IPADDR=%s
107ONBOOT=yes"""
108
109 def __init__(self, name):
110 super().__init__(provides='network_settings',
111 name=name)
112
113 def execute(self, ips):
114 settings = []
115 for i, ip in enumerate(ips):
116 settings.append(self.SYSCONFIG_CONTENTS % (i, ip))
117 return settings
118
119
120class AllocateIP(task.Task):
121 """Allocates the ips for the given vm."""
122 def __init__(self, name):
123 super().__init__(provides='ips', name=name)
124
125 def execute(self, vm_spec):
126 ips = []
127 for _i in range(0, vm_spec.get('ips', 0)):
128 ips.append("192.168.0.%s" % (random.randint(1, 254)))
129 return ips
130
131
132class WriteNetworkSettings(task.Task):
133 """Writes all the network settings into the downloaded images."""
134 def execute(self, download_paths, network_settings):
135 for j, path in enumerate(download_paths):
136 with slow_down(1):
137 print("Mounting {} to /tmp/{}".format(path, j))
138 for i, setting in enumerate(network_settings):
139 filename = ("/tmp/etc/sysconfig/network-scripts/"
140 "ifcfg-eth%s" % (i))
141 with slow_down(1):
142 print("Writing to %s" % (filename))
143 print(setting)
144
145
146class BootVM(task.Task):
147 """Fires off the vm boot operation."""
148 def execute(self, vm_spec):
149 print("Starting vm!")
150 with slow_down(1):
151 print("Created: %s" % (vm_spec))
152
153
154class AllocateVolumes(task.Task):
155 """Allocates the volumes for the vm."""
156 def execute(self, vm_spec):
157 volumes = []
158 for i in range(0, vm_spec['volumes']):
159 with slow_down(1):
160 volumes.append("/dev/vda%s" % (i + 1))
161 print("Allocated volume %s" % volumes[-1])
162 return volumes
163
164
165class FormatVolumes(task.Task):
166 """Formats the volumes for the vm."""
167 def execute(self, volumes):
168 for v in volumes:
169 print("Formatting volume %s" % v)
170 with slow_down(1):
171 pass
172 print("Formatted volume %s" % v)
173
174
175def create_flow():
176 # Setup the set of things to do (mini-nova).
177 flow = lf.Flow("root").add(
178 PrintText("Starting vm creation.", no_slow=True),
179 lf.Flow('vm-maker').add(
180 # First create a specification for the final vm to-be.
181 DefineVMSpec("define_spec"),
182 # This does all the image stuff.
183 gf.Flow("img-maker").add(
184 LocateImages("locate_images"),
185 DownloadImages("download_images"),
186 ),
187 # This does all the network stuff.
188 gf.Flow("net-maker").add(
189 AllocateIP("get_my_ips"),
190 CreateNetworkTpl("fetch_net_settings"),
191 WriteNetworkSettings("write_net_settings"),
192 ),
193 # This does all the volume stuff.
194 gf.Flow("volume-maker").add(
195 AllocateVolumes("allocate_my_volumes", provides='volumes'),
196 FormatVolumes("volume_formatter"),
197 ),
198 # Finally boot it all.
199 BootVM("boot-it"),
200 ),
201 # Ya it worked!
202 PrintText("Finished vm create.", no_slow=True),
203 PrintText("Instance is running!", no_slow=True))
204 return flow
205
206eu.print_wrapped("Initializing")
207
208# Setup the persistence & resumption layer.
209with eu.get_backend() as backend:
210
211 # Try to find a previously passed in tracking id...
212 try:
213 book_id, flow_id = sys.argv[2].split("+", 1)
214 if not uuidutils.is_uuid_like(book_id):
215 book_id = None
216 if not uuidutils.is_uuid_like(flow_id):
217 flow_id = None
218 except (IndexError, ValueError):
219 book_id = None
220 flow_id = None
221
222 # Set up how we want our engine to run, serial, parallel...
223 try:
224 executor = futurist.GreenThreadPoolExecutor(max_workers=5)
225 except RuntimeError:
226 # No eventlet installed, just let the default be used instead.
227 executor = None
228
229 # Create/fetch a logbook that will track the workflow's work.
230 book = None
231 flow_detail = None
232 if all([book_id, flow_id]):
233 # Try to find in a prior logbook and flow detail...
234 with contextlib.closing(backend.get_connection()) as conn:
235 try:
236 book = conn.get_logbook(book_id)
237 flow_detail = book.find(flow_id)
238 except exc.NotFound:
239 pass
240 if book is None and flow_detail is None:
241 book = models.LogBook("vm-boot")
242 with contextlib.closing(backend.get_connection()) as conn:
243 conn.save_logbook(book)
244 engine = engines.load_from_factory(create_flow,
245 backend=backend, book=book,
246 engine='parallel',
247 executor=executor)
248 print("!! Your tracking id is: '{}+{}'".format(
249 book.uuid, engine.storage.flow_uuid))
250 print("!! Please submit this on later runs for tracking purposes")
251 else:
252 # Attempt to load from a previously partially completed flow.
253 engine = engines.load_from_detail(flow_detail, backend=backend,
254 engine='parallel', executor=executor)
255
256 # Make me my vm please!
257 eu.print_wrapped('Running')
258 engine.run()
259
260# How to use.
261#
262# 1. $ python me.py "sqlite:////tmp/nova.db"
263# 2. ctrl-c before this finishes
264# 3. Find the tracking id (search for 'Your tracking id is')
265# 4. $ python me.py "sqlite:////tmp/nova.db" "$tracking_id"
266# 5. Watch it pick up where it left off.
267# 6. Profit!
Creating a volume (resumable)¶
Note
Full source located at resume_volume_create
1import contextlib, hashlib
2import logging
3import os
4import random
5import sys
6import time
7
8logging.basicConfig(level=logging.ERROR)
9
10self_dir = os.path.abspath(os.path.dirname(__file__))
11top_dir = os.path.abspath(os.path.join(os.path.dirname(__file__),
12 os.pardir,
13 os.pardir))
14sys.path.insert(0, top_dir)
15sys.path.insert(0, self_dir)
16
17from oslo_utils import uuidutils
18
19from taskflow import engines
20from taskflow.patterns import graph_flow as gf
21from taskflow.patterns import linear_flow as lf
22from taskflow.persistence import models
23from taskflow import task
24
25import example_utils # noqa
26
27# INTRO: These examples show how a hierarchy of flows can be used to create a
28# pseudo-volume in a reliable & resumable manner using taskflow + a miniature
29# version of what cinder does while creating a volume (very miniature).
30
31
32@contextlib.contextmanager
33def slow_down(how_long=0.5):
34 try:
35 yield how_long
36 finally:
37 print("** Ctrl-c me please!!! **")
38 time.sleep(how_long)
39
40
41def find_flow_detail(backend, book_id, flow_id):
42 # NOTE(harlowja): this is used to attempt to find a given logbook with
43 # a given id and a given flow detail inside that logbook; we need this
44 # reference so that we can resume the correct flow (as a logbook tracks
45 # flows and a flow detail tracks an individual flow).
46 #
47 # Without a reference to the logbook and the flow detail in that logbook
48 # we will not know exactly what we should resume and that would mean we
49 # can't resume what we don't know.
50 with contextlib.closing(backend.get_connection()) as conn:
51 lb = conn.get_logbook(book_id)
52 return lb.find(flow_id)
53
54
55class PrintText(task.Task):
56 def __init__(self, print_what, no_slow=False):
57 content_hash = hashlib.md5(print_what.encode('utf-8')).hexdigest()[0:8]
58 super().__init__(name="Print: %s" % (content_hash))
59 self._text = print_what
60 self._no_slow = no_slow
61
62 def execute(self):
63 if self._no_slow:
64 print("-" * (len(self._text)))
65 print(self._text)
66 print("-" * (len(self._text)))
67 else:
68 with slow_down():
69 print("-" * (len(self._text)))
70 print(self._text)
71 print("-" * (len(self._text)))
72
73
74class CreateSpecForVolumes(task.Task):
75 def execute(self):
76 volumes = []
77 for i in range(0, random.randint(1, 10)):
78 volumes.append({
79 'type': 'disk',
80 'location': "/dev/vda%s" % (i + 1),
81 })
82 return volumes
83
84
85class PrepareVolumes(task.Task):
86 def execute(self, volume_specs):
87 for v in volume_specs:
88 with slow_down():
89 print("Dusting off your hard drive %s" % (v))
90 with slow_down():
91 print("Taking a well deserved break.")
92 print("Your drive %s has been certified." % (v))
93
94
95# Setup the set of things to do (mini-cinder).
96flow = lf.Flow("root").add(
97 PrintText("Starting volume create", no_slow=True),
98 gf.Flow('maker').add(
99 CreateSpecForVolumes("volume_specs", provides='volume_specs'),
100 PrintText("I need a nap, it took me a while to build those specs."),
101 PrepareVolumes(),
102 ),
103 PrintText("Finished volume create", no_slow=True))
104
105# Setup the persistence & resumption layer.
106with example_utils.get_backend() as backend:
107 try:
108 book_id, flow_id = sys.argv[2].split("+", 1)
109 except (IndexError, ValueError):
110 book_id = None
111 flow_id = None
112
113 if not all([book_id, flow_id]):
114 # If no 'tracking id' (think a fedex or ups tracking id) is provided
115 # then we create one by creating a logbook (where flow details are
116 # stored) and creating a flow detail (where flow and task state is
117 # stored). The combination of these two objects' unique ids (uuids) allows
118 # the users of taskflow to reassociate the workflows that were
119 # potentially running (and which may have partially completed) back
120 # with taskflow so that those workflows can be resumed (or reverted)
121 # after a process/thread/engine has failed in some way.
122 book = models.LogBook('resume-volume-create')
123 flow_detail = models.FlowDetail("root", uuid=uuidutils.generate_uuid())
124 book.add(flow_detail)
125 with contextlib.closing(backend.get_connection()) as conn:
126 conn.save_logbook(book)
127 print("!! Your tracking id is: '{}+{}'".format(book.uuid,
128 flow_detail.uuid))
129 print("!! Please submit this on later runs for tracking purposes")
130 else:
131 flow_detail = find_flow_detail(backend, book_id, flow_id)
132
133 # Load and run.
134 engine = engines.load(flow,
135 flow_detail=flow_detail,
136 backend=backend, engine='serial')
137 engine.run()
138
139# How to use.
140#
141# 1. $ python me.py "sqlite:////tmp/cinder.db"
142# 2. ctrl-c before this finishes
143# 3. Find the tracking id (search for 'Your tracking id is')
144# 4. $ python me.py "sqlite:////tmp/cinder.db" "$tracking_id"
145# 5. Profit!
Running engines via iteration¶
Note
Full source located at run_by_iter
1import logging, os
2import sys
3
4logging.basicConfig(level=logging.ERROR)
5
6self_dir = os.path.abspath(os.path.dirname(__file__))
7top_dir = os.path.abspath(os.path.join(os.path.dirname(__file__),
8 os.pardir,
9 os.pardir))
10sys.path.insert(0, top_dir)
11sys.path.insert(0, self_dir)
12
13
14from taskflow import engines
15from taskflow.patterns import linear_flow as lf
16from taskflow import task
17
18
19# INTRO: This example shows how to run a set of engines at the same time,
20# using a single thread of control to iterate over each engine (which
21# causes that engine to advance to its next state during each
22# iteration).
23
24
25class EchoTask(task.Task):
26 def execute(self, value):
27 print(value)
28 return chr(ord(value) + 1)
29
30
31def make_alphabet_flow(i):
32 f = lf.Flow("alphabet_%s" % (i))
33 start_value = 'A'
34 end_value = 'Z'
35 curr_value = start_value
36 while ord(curr_value) <= ord(end_value):
37 next_value = chr(ord(curr_value) + 1)
38 if curr_value != end_value:
39 f.add(EchoTask(name="echoer_%s" % curr_value,
40 rebind={'value': curr_value},
41 provides=next_value))
42 else:
43 f.add(EchoTask(name="echoer_%s" % curr_value,
44 rebind={'value': curr_value}))
45 curr_value = next_value
46 return f
47
48
49# Adjust this number to change how many engines/flows run at once.
50flow_count = 1
51flows = []
52for i in range(0, flow_count):
53 f = make_alphabet_flow(i + 1)
54 flows.append(f)
55engine_iters = []
56for f in flows:
57 e = engines.load(f)
58 e.compile()
59 e.storage.inject({'A': 'A'})
60 e.prepare()
61 engine_iters.append(e.run_iter())
62while engine_iters:
63 for it in list(engine_iters):
64 try:
65 print(next(it))
66 except StopIteration:
67 engine_iters.remove(it)
Controlling retries using a retry controller¶
Note
Full source located at retry_flow
1import logging, os
2import sys
3
4logging.basicConfig(level=logging.ERROR)
5
6top_dir = os.path.abspath(os.path.join(os.path.dirname(__file__),
7 os.pardir,
8 os.pardir))
9sys.path.insert(0, top_dir)
10
11import taskflow.engines
12from taskflow.patterns import linear_flow as lf
13from taskflow import retry
14from taskflow import task
15
16# INTRO: In this example we create a retry controller that receives a phone
17# directory and tries different phone numbers. The next task tries to call Jim
18# using the given number. If it is not Jim's number, the task raises an
19# exception and the retry controller takes the next number from the phone
20# directory and retries the call.
21#
22# This example shows a basic usage of retry controllers in a flow.
23# Retry controllers allow reverting and retrying a failed subflow with new
24# parameters.
25
26
27class CallJim(task.Task):
28 def execute(self, jim_number):
29 print("Calling jim %s." % jim_number)
30 if jim_number != 555:
31 raise Exception("Wrong number!")
32 else:
33 print("Hello Jim!")
34
35 def revert(self, jim_number, **kwargs):
36 print("Wrong number, apologizing.")
37
38
39# Create your flow and associated tasks (the work to be done).
40flow = lf.Flow('retrying-linear',
41 retry=retry.ParameterizedForEach(
42 rebind=['phone_directory'],
43 provides='jim_number')).add(CallJim())
44
45# Now run that flow using the provided initial data (store below).
46taskflow.engines.run(flow, store={'phone_directory': [333, 444, 555, 666]})
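ParameterizedForEach is only one of the bundled retry controllers. When a failing subflow should simply be reverted and re-run a bounded number of times (without feeding it new values), a retry such as retry.Times can be used instead. A small sketch, where FlakyTask is a hypothetical task invented here just to give the controller something to retry:

import random

import taskflow.engines
from taskflow.patterns import linear_flow as lf
from taskflow import retry
from taskflow import task


class FlakyTask(task.Task):
    # Hypothetical task that fails part of the time so the retry
    # controller has something to do.
    def execute(self):
        if random.random() < 0.5:
            raise IOError("Transient failure, please retry")
        print("Worked this time!")


# Revert and re-run the subflow up to three times before giving up (if
# every attempt fails the engine re-raises the last failure).
flow = lf.Flow('retrying-a-few-times',
               retry=retry.Times(attempts=3)).add(FlakyTask())
taskflow.engines.run(flow)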
Distributed execution (simple)¶
Note
Full source located at wbe_simple_linear
1import json, logging
2import os
3import sys
4import tempfile
5
6top_dir = os.path.abspath(os.path.join(os.path.dirname(__file__),
7 os.pardir,
8 os.pardir))
9sys.path.insert(0, top_dir)
10
11from taskflow import engines
12from taskflow.engines.worker_based import worker
13from taskflow.patterns import linear_flow as lf
14from taskflow.tests import utils
15from taskflow.utils import threading_utils
16
17import example_utils # noqa
18
19# INTRO: This example walks through a miniature workflow which shows how to
20# start up a number of workers (these workers will process task execution and
21# reversion requests using any provided input data) and then use an engine
22# built from tasks the workers are *capable* of running (the engine can not
23# use tasks that the workers are not able to run; doing so will end in
24# failure) and have that workflow executed seamlessly, using the workers to
25# perform the actual execution.
26#
27# NOTE(harlowja): this example simulates the expected larger number of workers
28# by using a set of threads (which in this example simulate the remote workers
29# that would typically be running on other external machines).
30
31# A filesystem can also be used as the queue transport (useful as a simple
32# transport type that does not involve setting up a larger mq system). If this
33# is false then the memory transport is used instead; both work in standalone
34# setups.
35USE_FILESYSTEM = False
36BASE_SHARED_CONF = {
37 'exchange': 'taskflow',
38}
39
40# Until https://github.com/celery/kombu/issues/398 is resolved it is not
41# recommended to run many worker threads in this example due to the types
42# of errors mentioned in that issue.
43MEMORY_WORKERS = 2
44FILE_WORKERS = 1
45WORKER_CONF = {
46 # These are the tasks the worker can execute, they *must* be importable,
47 # typically this list is used to restrict what workers may execute to
48 # a smaller set of *allowed* tasks that are known to be safe (one would
49 # not want to allow all python code to be executed).
50 'tasks': [
51 'taskflow.tests.utils:TaskOneArgOneReturn',
52 'taskflow.tests.utils:TaskMultiArgOneReturn'
53 ],
54}
55
56
57def run(engine_options):
58 flow = lf.Flow('simple-linear').add(
59 utils.TaskOneArgOneReturn(provides='result1'),
60 utils.TaskMultiArgOneReturn(provides='result2')
61 )
62 eng = engines.load(flow,
63 store=dict(x=111, y=222, z=333),
64 engine='worker-based', **engine_options)
65 eng.run()
66 return eng.storage.fetch_all()
67
68
69if __name__ == "__main__":
70 logging.basicConfig(level=logging.ERROR)
71
72 # Setup our transport configuration and merge it into the worker and
73 # engine configuration so that both of those use it correctly.
74 shared_conf = dict(BASE_SHARED_CONF)
75
76 tmp_path = None
77 if USE_FILESYSTEM:
78 worker_count = FILE_WORKERS
79 tmp_path = tempfile.mkdtemp(prefix='wbe-example-')
80 shared_conf.update({
81 'transport': 'filesystem',
82 'transport_options': {
83 'data_folder_in': tmp_path,
84 'data_folder_out': tmp_path,
85 'polling_interval': 0.1,
86 },
87 })
88 else:
89 worker_count = MEMORY_WORKERS
90 shared_conf.update({
91 'transport': 'memory',
92 'transport_options': {
93 'polling_interval': 0.1,
94 },
95 })
96 worker_conf = dict(WORKER_CONF)
97 worker_conf.update(shared_conf)
98 engine_options = dict(shared_conf)
99 workers = []
100 worker_topics = []
101
102 try:
103 # Create a set of workers to simulate actual remote workers.
104 print('Running %s workers.' % (worker_count))
105 for i in range(0, worker_count):
106 worker_conf['topic'] = 'worker-%s' % (i + 1)
107 worker_topics.append(worker_conf['topic'])
108 w = worker.Worker(**worker_conf)
109 runner = threading_utils.daemon_thread(w.run)
110 runner.start()
111 w.wait()
112 workers.append((runner, w.stop))
113
114 # Now use those workers to do something.
115 print('Executing some work.')
116 engine_options['topics'] = worker_topics
117 result = run(engine_options)
118 print('Execution finished.')
119 # This is done so that the test examples can work correctly
120 # even when the keys change order (which will happen in various
121 # python versions).
122 print("Result = %s" % json.dumps(result, sort_keys=True))
123 finally:
124 # And cleanup.
125 print('Stopping workers.')
126 while workers:
127 r, stopper = workers.pop()
128 stopper()
129 r.join()
130 if tmp_path:
131 example_utils.rm_path(tmp_path)
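The 'tasks' whitelist above is simply a list of importable 'module:ClassName' strings; letting these workers run a task of your own only requires adding its import path in the same format. A tiny sketch (mypackage.tasks:CreateThing is a hypothetical path used purely for illustration):

WORKER_CONF = {
    'tasks': [
        # Tasks bundled with taskflow's test utilities (as used above).
        'taskflow.tests.utils:TaskOneArgOneReturn',
        'taskflow.tests.utils:TaskMultiArgOneReturn',
        # Your own task, referenced by its importable module path.
        'mypackage.tasks:CreateThing',  # hypothetical
    ],
}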
Distributed notification (simple)¶
Note
Full source located at wbe_event_sender
1import logging, os
2import string
3import sys
4import time
5
6top_dir = os.path.abspath(os.path.join(os.path.dirname(__file__),
7 os.pardir,
8 os.pardir))
9sys.path.insert(0, top_dir)
10
11from taskflow import engines
12from taskflow.engines.worker_based import worker
13from taskflow.patterns import linear_flow as lf
14from taskflow import task
15from taskflow.types import notifier
16from taskflow.utils import threading_utils
17
18ANY = notifier.Notifier.ANY
19
20# INTRO: These examples show how to use a remote worker's event notification
21# attribute to proxy back task event notifications to the controlling process.
22#
23# In this case a simple set of events is triggered by a worker running a
24# task (simulated to be remote by using a kombu memory transport and threads).
25# Those events that the 'remote worker' produces will then be proxied back to
26# the task that the engine is running 'remotely', and then they will be emitted
27# back to the original callbacks that exist in the originating engine
28# process/thread. This creates a one-way *notification* channel that can
29# transparently be used in-process, or out-of-process using remote workers,
30# and so on, allowing a task to signal to its controlling process some sort
31# of action that has occurred that the task may need to tell others about (for
32# example to trigger some type of response when the task reaches 50% done...).
33
34
35def event_receiver(event_type, details):
36 """This is the callback that (in this example) doesn't do much..."""
37 print("Recieved event '%s'" % event_type)
38 print("Details = %s" % details)
39
40
41class EventReporter(task.Task):
42 """This is the task that will be running 'remotely' (not really remote)."""
43
44 EVENTS = tuple(string.ascii_uppercase)
45 EVENT_DELAY = 0.1
46
47 def execute(self):
48 for i, e in enumerate(self.EVENTS):
49 details = {
50 'leftover': self.EVENTS[i:],
51 }
52 self.notifier.notify(e, details)
53 time.sleep(self.EVENT_DELAY)
54
55
56BASE_SHARED_CONF = {
57 'exchange': 'taskflow',
58 'transport': 'memory',
59 'transport_options': {
60 'polling_interval': 0.1,
61 },
62}
63
64# Until https://github.com/celery/kombu/issues/398 is resolved it is not
65# recommended to run many worker threads in this example due to the types
66# of errors mentioned in that issue.
67MEMORY_WORKERS = 1
68WORKER_CONF = {
69 'tasks': [
70 # Used to locate which tasks we can run (we don't want to allow
71 # arbitrary code/tasks to be run by any worker since that would
72 # open up a variety of vulnerabilities).
73 '%s:EventReporter' % (__name__),
74 ],
75}
76
77
78def run(engine_options):
79 reporter = EventReporter()
80 reporter.notifier.register(ANY, event_receiver)
81 flow = lf.Flow('event-reporter').add(reporter)
82 eng = engines.load(flow, engine='worker-based', **engine_options)
83 eng.run()
84
85
86if __name__ == "__main__":
87 logging.basicConfig(level=logging.ERROR)
88
89 # Setup our transport configuration and merge it into the worker and
90 # engine configuration so that both of those objects use it correctly.
91 worker_conf = dict(WORKER_CONF)
92 worker_conf.update(BASE_SHARED_CONF)
93 engine_options = dict(BASE_SHARED_CONF)
94 workers = []
95
96 # These topics will be used to request worker information on; those
97 # workers will respond with their capabilities, which the executing engine
98 # will use to match pending tasks to a capable worker. This will cause
99 # the task to be sent for execution, and the engine will wait until it
100 # is finished (a response is received); then the engine will either
101 # continue with other tasks, do some retry/failure resolution logic or
102 # stop (and potentially re-raise the remote worker's failure)...
103 worker_topics = []
104
105 try:
106 # Create a set of worker threads to simulate actual remote workers...
107 print('Running %s workers.' % (MEMORY_WORKERS))
108 for i in range(0, MEMORY_WORKERS):
109 # Give each one its own unique topic name so that they can
110 # correctly communicate with the engine (they will all share the
111 # same exchange).
112 worker_conf['topic'] = 'worker-%s' % (i + 1)
113 worker_topics.append(worker_conf['topic'])
114 w = worker.Worker(**worker_conf)
115 runner = threading_utils.daemon_thread(w.run)
116 runner.start()
117 w.wait()
118 workers.append((runner, w.stop))
119
120 # Now use those workers to do something.
121 print('Executing some work.')
122 engine_options['topics'] = worker_topics
123 result = run(engine_options)
124 print('Execution finished.')
125 finally:
126 # And cleanup.
127 print('Stopping workers.')
128 while workers:
129 r, stopper = workers.pop()
130 stopper()
131 r.join()
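Registering for ANY (as done in run() above) delivers every proxied event to event_receiver; the same notifier can instead be registered for specific event types when only a subset is interesting. A small sketch reusing the EventReporter and event_receiver defined above:

# Only listen for two of the emitted events; everything else the task
# notifies about is simply dropped by the notifier.
reporter = EventReporter()
reporter.notifier.register('A', event_receiver)
reporter.notifier.register('M', event_receiver)
flow = lf.Flow('selective-event-reporter').add(reporter)
# The flow would then be loaded and run exactly as in run() above.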
Distributed mandelbrot (complex)¶
Note
Full source located at wbe_mandelbrot
Output¶
(The rendered mandelbrot fractal image appears here in the original documentation.)
Code¶
1import logging, math
2import os
3import sys
4
5top_dir = os.path.abspath(os.path.join(os.path.dirname(__file__),
6 os.pardir,
7 os.pardir))
8sys.path.insert(0, top_dir)
9
10from taskflow import engines
11from taskflow.engines.worker_based import worker
12from taskflow.patterns import unordered_flow as uf
13from taskflow import task
14from taskflow.utils import threading_utils
15
16# INTRO: This example walks through a workflow that will in parallel compute
17# a mandelbrot result set (using X 'remote' workers) and then combine their
18# results together to form a final mandelbrot fractal image. It shows a usage
19# of taskflow to solve a well-known embarrassingly parallel problem that has
20# the added benefit of also being an elegant visualization.
21#
22# NOTE(harlowja): this example simulates the expected larger number of workers
23# by using a set of threads (which in this example simulate the remote workers
24# that would typically be running on other external machines).
25#
26# NOTE(harlowja): to have it produce an image run (after installing pillow):
27#
28# $ python taskflow/examples/wbe_mandelbrot.py output.png
29
30BASE_SHARED_CONF = {
31 'exchange': 'taskflow',
32}
33WORKERS = 2
34WORKER_CONF = {
35 # These are the tasks the worker can execute, they *must* be importable,
36 # typically this list is used to restrict what workers may execute to
37 # a smaller set of *allowed* tasks that are known to be safe (one would
38 # not want to allow all python code to be executed).
39 'tasks': [
40 '%s:MandelCalculator' % (__name__),
41 ],
42}
43ENGINE_CONF = {
44 'engine': 'worker-based',
45}
46
47# Mandelbrot & image settings...
48IMAGE_SIZE = (512, 512)
49CHUNK_COUNT = 8
50MAX_ITERATIONS = 25
51
52
53class MandelCalculator(task.Task):
54 def execute(self, image_config, mandelbrot_config, chunk):
55 """Returns the number of iterations before the computation "escapes".
56
57 Given the real and imaginary parts of a complex number, determine if it
58 is a candidate for membership in the mandelbrot set given a fixed
59 number of iterations.
60 """
61
62 # Parts borrowed from (credit to mark harris and benoît mandelbrot).
63 #
64 # http://nbviewer.ipython.org/gist/harrism/f5707335f40af9463c43
65 def mandelbrot(x, y, max_iters):
66 c = complex(x, y)
67 z = 0.0j
68 for i in range(max_iters):
69 z = z * z + c
70 if (z.real * z.real + z.imag * z.imag) >= 4:
71 return i
72 return max_iters
73
74 min_x, max_x, min_y, max_y, max_iters = mandelbrot_config
75 height, width = image_config['size']
76 pixel_size_x = (max_x - min_x) / width
77 pixel_size_y = (max_y - min_y) / height
78 block = []
79 for y in range(chunk[0], chunk[1]):
80 row = []
81 imag = min_y + y * pixel_size_y
82 for x in range(0, width):
83 real = min_x + x * pixel_size_x
84 row.append(mandelbrot(real, imag, max_iters))
85 block.append(row)
86 return block
87
88
89def calculate(engine_conf):
90 # Subdivide the work into X pieces, then request each worker to calculate
91 # one of those chunks and then later we will write these chunks out to
92 # an image bitmap file.
93
94 # An unordered flow is used here since the mandelbrot calculation is an
95 # example of an embarrassingly parallel computation that we can scatter
96 # across as many workers as possible.
97 flow = uf.Flow("mandelbrot")
98
99 # These symbols will be automatically given to tasks as input to their
100 # execute method, in this case these are constants used in the mandelbrot
101 # calculation.
102 store = {
103 'mandelbrot_config': [-2.0, 1.0, -1.0, 1.0, MAX_ITERATIONS],
104 'image_config': {
105 'size': IMAGE_SIZE,
106 }
107 }
108
109 # We need the task names to be in the right order so that we can extract
110 # the final results in the right order (we don't care about the order when
111 # executing).
112 task_names = []
113
114 # Compose our workflow.
115 height, _width = IMAGE_SIZE
116 chunk_size = int(math.ceil(height / float(CHUNK_COUNT)))
117 for i in range(0, CHUNK_COUNT):
118 chunk_name = 'chunk_%s' % i
119 task_name = "calculation_%s" % i
120 # Break the calculation up into chunk size pieces.
121 rows = [i * chunk_size, i * chunk_size + chunk_size]
122 flow.add(
123 MandelCalculator(task_name,
124 # This ensures the storage symbol with name
125 # 'chunk_name' is sent into the task's local
126 # symbol 'chunk'. This is how we give each
127 # calculator its own correct sequence of rows
128 # to work on.
129 rebind={'chunk': chunk_name}))
130 store[chunk_name] = rows
131 task_names.append(task_name)
132
133 # Now execute it.
134 eng = engines.load(flow, store=store, engine_conf=engine_conf)
135 eng.run()
136
137 # Gather all the results and order them for further processing.
138 gather = []
139 for name in task_names:
140 gather.extend(eng.storage.get(name))
141 points = []
142 for y, row in enumerate(gather):
143 for x, color in enumerate(row):
144 points.append(((x, y), color))
145 return points
146
147
148def write_image(results, output_filename=None):
149 print("Gathered %s results that represents a mandelbrot"
150 " image (using %s chunks that are computed jointly"
151 " by %s workers)." % (len(results), CHUNK_COUNT, WORKERS))
152 if not output_filename:
153 return
154
155 # Pillow (the PIL fork) saves us from writing our own image writer...
156 try:
157 from PIL import Image
158 except ImportError as e:
159 # To currently get this (may change in the future),
160 # $ pip install Pillow
161 raise RuntimeError("Pillow is required to write image files: %s" % e)
162
163 # Limit to 255, find the max and normalize to that...
164 color_max = 0
165 for _point, color in results:
166 color_max = max(color, color_max)
167
168 # Use gray scale since we don't really have other colors.
169 img = Image.new('L', IMAGE_SIZE, "black")
170 pixels = img.load()
171 for (x, y), color in results:
172 if color_max == 0:
173 color = 0
174 else:
175 color = int((float(color) / color_max) * 255.0)
176 pixels[x, y] = color
177 img.save(output_filename)
178
179
180def create_fractal():
181 logging.basicConfig(level=logging.ERROR)
182
183 # Setup our transport configuration and merge it into the worker and
184 # engine configuration so that both of those use it correctly.
185 shared_conf = dict(BASE_SHARED_CONF)
186 shared_conf.update({
187 'transport': 'memory',
188 'transport_options': {
189 'polling_interval': 0.1,
190 },
191 })
192
193 if len(sys.argv) >= 2:
194 output_filename = sys.argv[1]
195 else:
196 output_filename = None
197
198 worker_conf = dict(WORKER_CONF)
199 worker_conf.update(shared_conf)
200 engine_conf = dict(ENGINE_CONF)
201 engine_conf.update(shared_conf)
202 workers = []
203 worker_topics = []
204
205 print('Calculating your mandelbrot fractal of size %sx%s.' % IMAGE_SIZE)
206 try:
207 # Create a set of workers to simulate actual remote workers.
208 print('Running %s workers.' % (WORKERS))
209 for i in range(0, WORKERS):
210 worker_conf['topic'] = 'calculator_%s' % (i + 1)
211 worker_topics.append(worker_conf['topic'])
212 w = worker.Worker(**worker_conf)
213 runner = threading_utils.daemon_thread(w.run)
214 runner.start()
215 w.wait()
216 workers.append((runner, w.stop))
217
218 # Now use those workers to do something.
219 engine_conf['topics'] = worker_topics
220 results = calculate(engine_conf)
221 print('Execution finished.')
222 finally:
223 # And cleanup.
224 print('Stopping workers.')
225 while workers:
226 r, stopper = workers.pop()
227 stopper()
228 r.join()
229 print("Writing image...")
230 write_image(results, output_filename=output_filename)
231
232
233if __name__ == "__main__":
234 create_fractal()
Jobboard producer/consumer (simple)¶
Note
Full source located at jobboard_produce_consume_colors
1import collections, contextlib
2import logging
3import os
4import random
5import sys
6import threading
7import time
8
9logging.basicConfig(level=logging.ERROR)
10
11top_dir = os.path.abspath(os.path.join(os.path.dirname(__file__),
12 os.pardir,
13 os.pardir))
14sys.path.insert(0, top_dir)
15
16from zake import fake_client
17
18from taskflow import exceptions as excp
19from taskflow.jobs import backends
20from taskflow.utils import threading_utils
21
22# In this example we show how a jobboard can be used to post work for other
23# entities to work on. This example creates a set of jobs using one producer
24# thread (typically this would be split across many machines) and then has
25# other worker threads with their own jobboards select work using given
26# filters [red/blue] and then perform that work (consuming or abandoning
27# the job after it has been completed or failed).
28
29# Things to note:
30# - No persistence layer is used (or logbook), just the job details are used
31# to determine if a job should be selected by a worker or not.
32# - This example runs in a single process (this is expected to be atypical
33# but this example shows that it can be done if needed, for testing...)
34# - The iterjobs(), claim(), consume()/abandon() worker workflow.
35# - The post() producer workflow.
36
37SHARED_CONF = {
38 'path': "/taskflow/jobs",
39 'board': 'zookeeper',
40}
41
42# How many workers and producers of work will be created (as threads).
43PRODUCERS = 3
44WORKERS = 5
45
46# How many units of work each producer will create.
47PRODUCER_UNITS = 10
48
49# How many units of work are expected to be produced (used so workers can
50# know when to stop running and shut down; typically this would not be a
51# fixed value but we have to limit this example's execution time to be less than
52# infinity).
53EXPECTED_UNITS = PRODUCER_UNITS * PRODUCERS
54
55# Delay between producing/consuming more work.
56WORKER_DELAY, PRODUCER_DELAY = (0.5, 0.5)
57
58# To ensure threads don't trample other threads output.
59STDOUT_LOCK = threading.Lock()
60
61
62def dispatch_work(job):
63 # This is where the job's contained work *would* be done
64 time.sleep(1.0)
65
66
67def safe_print(name, message, prefix=""):
68 with STDOUT_LOCK:
69 if prefix:
70 print("{} {}: {}".format(prefix, name, message))
71 else:
72 print("{}: {}".format(name, message))
73
74
75def worker(ident, client, consumed):
76 # Create a personal board (using the same client so that it works in
77 # the same process) and start looking for jobs on the board that we want
78 # to perform.
79 name = "W-%s" % (ident)
80 safe_print(name, "started")
81 claimed_jobs = 0
82 consumed_jobs = 0
83 abandoned_jobs = 0
84 with backends.backend(name, SHARED_CONF.copy(), client=client) as board:
85 while len(consumed) != EXPECTED_UNITS:
86 favorite_color = random.choice(['blue', 'red'])
87 for job in board.iterjobs(ensure_fresh=True, only_unclaimed=True):
88 # See if we should even bother with it...
89 if job.details.get('color') != favorite_color:
90 continue
91 safe_print(name, "'%s' [attempting claim]" % (job))
92 try:
93 board.claim(job, name)
94 claimed_jobs += 1
95 safe_print(name, "'%s' [claimed]" % (job))
96 except (excp.NotFound, excp.UnclaimableJob):
97 safe_print(name, "'%s' [claim unsuccessful]" % (job))
98 else:
99 try:
100 dispatch_work(job)
101 board.consume(job, name)
102 safe_print(name, "'%s' [consumed]" % (job))
103 consumed_jobs += 1
104 consumed.append(job)
105 except Exception:
106 board.abandon(job, name)
107 abandoned_jobs += 1
108 safe_print(name, "'%s' [abandoned]" % (job))
109 time.sleep(WORKER_DELAY)
110 safe_print(name,
111 "finished (claimed %s jobs, consumed %s jobs,"
112 " abandoned %s jobs)" % (claimed_jobs, consumed_jobs,
113 abandoned_jobs), prefix=">>>")
114
115
116def producer(ident, client):
117 # Create a personal board (using the same client so that it works in
118 # the same process) and start posting jobs on the board that we want
119 # some entity to perform.
120 name = "P-%s" % (ident)
121 safe_print(name, "started")
122 with backends.backend(name, SHARED_CONF.copy(), client=client) as board:
123 for i in range(0, PRODUCER_UNITS):
124 job_name = "{}-{}".format(name, i)
125 details = {
126 'color': random.choice(['red', 'blue']),
127 }
128 job = board.post(job_name, book=None, details=details)
129 safe_print(name, "'%s' [posted]" % (job))
130 time.sleep(PRODUCER_DELAY)
131 safe_print(name, "finished", prefix=">>>")
132
133
134def main():
135 # TODO(harlowja): Hack to make eventlet work right, remove when the
136 # following is fixed: https://github.com/eventlet/eventlet/issues/230
137 from taskflow.utils import eventlet_utils as _eu # noqa
138 try:
139 import eventlet as _eventlet # noqa
140 except ImportError:
141 pass
142
143 with contextlib.closing(fake_client.FakeClient()) as c:
144 created = []
145 for i in range(0, PRODUCERS):
146 p = threading_utils.daemon_thread(producer, i + 1, c)
147 created.append(p)
148 p.start()
149 consumed = collections.deque()
150 for i in range(0, WORKERS):
151 w = threading_utils.daemon_thread(worker, i + 1, c, consumed)
152 created.append(w)
153 w.start()
154 while created:
155 t = created.pop()
156 t.join()
157 # At the end there should be nothing leftover, let's verify that.
158 board = backends.fetch('verifier', SHARED_CONF.copy(), client=c)
159 board.connect()
160 with contextlib.closing(board):
161 if board.job_count != 0 or len(consumed) != EXPECTED_UNITS:
162 return 1
163 return 0
164
165
166if __name__ == "__main__":
167 sys.exit(main())
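The zake FakeClient keeps the whole jobboard in-process; pointing the same producer/worker code at a real ZooKeeper cluster mostly means carrying the host list in the board configuration and dropping the client argument (the backend then creates its own kazoo client). A sketch, assuming a ZooKeeper server at localhost:2181, the same address the later 99_bottles example uses:

from taskflow.jobs import backends

REAL_CONF = {
    'board': 'zookeeper',
    'path': '/taskflow/jobs',
    'hosts': 'localhost:2181',  # assumed ZooKeeper address
}

# Without client=..., the backend connects to the configured 'hosts'.
with backends.backend('W-real', REAL_CONF.copy()) as board:
    for job in board.iterjobs(ensure_fresh=True, only_unclaimed=True):
        print("Saw job: %s" % job)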
Conductor simulating a CI pipeline¶
Note
Full source located at tox_conductor
1import contextlib, itertools
2import logging
3import os
4import shutil
5import socket
6import sys
7import tempfile
8import threading
9import time
10
11logging.basicConfig(level=logging.ERROR)
12
13top_dir = os.path.abspath(os.path.join(os.path.dirname(__file__),
14 os.pardir,
15 os.pardir))
16sys.path.insert(0, top_dir)
17
18from oslo_utils import timeutils
19from oslo_utils import uuidutils
20from zake import fake_client
21
22from taskflow.conductors import backends as conductors
23from taskflow import engines
24from taskflow.jobs import backends as boards
25from taskflow.patterns import linear_flow
26from taskflow.persistence import backends as persistence
27from taskflow.persistence import models
28from taskflow import task
29from taskflow.utils import threading_utils
30
31# INTRO: This example shows how a worker/producer can post desired work (jobs)
32# to a jobboard and a conductor can consume that work (jobs) from that jobboard
33# and execute those jobs in a reliable & async manner (for example, if the
34# conductor were to crash then the job will be released back onto the jobboard
35# and another conductor can attempt to finish it, from wherever that job last
36# left off).
37#
38# In this example an in-memory jobboard (and in-memory storage) is created and
39# used that simulates how this would be done at a larger scale (it is an
40# example after all).
41
42# Restrict how long this example runs for...
43RUN_TIME = 5
44REVIEW_CREATION_DELAY = 0.5
45SCAN_DELAY = 0.1
46NAME = "{}_{}".format(socket.getfqdn(), os.getpid())
47
48# This won't really use zookeeper but will use a local version of it using
49# the zake library that mimics an actual zookeeper cluster using threads and
50# an in-memory data structure.
51JOBBOARD_CONF = {
52 'board': 'zookeeper://localhost?path=/taskflow/tox/jobs',
53}
54
55
56class RunReview(task.Task):
57 # A dummy task that clones the review and runs tox...
58
59 def _clone_review(self, review, temp_dir):
60 print("Cloning review '{}' into {}".format(review['id'], temp_dir))
61
62 def _run_tox(self, temp_dir):
63 print("Running tox in %s" % temp_dir)
64
65 def execute(self, review, temp_dir):
66 self._clone_review(review, temp_dir)
67 self._run_tox(temp_dir)
68
69
70class MakeTempDir(task.Task):
71 # A task that creates and destroys a temporary dir (on failure).
72 #
73 # It provides the location of the temporary dir for other tasks to use
74 # as they see fit.
75
76 default_provides = 'temp_dir'
77
78 def execute(self):
79 return tempfile.mkdtemp()
80
81 def revert(self, *args, **kwargs):
82 temp_dir = kwargs.get(task.REVERT_RESULT)
83 if temp_dir:
84 shutil.rmtree(temp_dir)
85
86
87class CleanResources(task.Task):
88 # A task that cleans up any workflow resources.
89
90 def execute(self, temp_dir):
91 print("Removing %s" % temp_dir)
92 shutil.rmtree(temp_dir)
93
94
95def review_iter():
96 """Makes reviews (never-ending iterator/generator)."""
97 review_id_gen = itertools.count(0)
98 while True:
99 review_id = next(review_id_gen)
100 review = {
101 'id': review_id,
102 }
103 yield review
104
105
106# The reason this is at the module namespace level is important, since it must
107# be accessible from a conductor dispatching an engine; if it were a lambda
108# function, for example, it would not be re-importable and the conductor would
109# be unable to reference it when creating the workflow to run.
110def create_review_workflow():
111 """Factory method used to create a review workflow to run."""
112 f = linear_flow.Flow("tester")
113 f.add(
114 MakeTempDir(name="maker"),
115 RunReview(name="runner"),
116 CleanResources(name="cleaner")
117 )
118 return f
119
120
121def generate_reviewer(client, saver, name=NAME):
122 """Creates a review producer thread with the given name prefix."""
123 real_name = "%s_reviewer" % name
124 no_more = threading.Event()
125 jb = boards.fetch(real_name, JOBBOARD_CONF,
126 client=client, persistence=saver)
127
128 def make_save_book(saver, review_id):
129 # Record what we want to happen (sometime in the future).
130 book = models.LogBook("book_%s" % review_id)
131 detail = models.FlowDetail("flow_%s" % review_id,
132 uuidutils.generate_uuid())
133 book.add(detail)
134 # Associate the factory method we want to be called (in the future)
135 # with the book, so that the conductor will be able to call into
136 # that factory to retrieve the workflow objects that represent the
137 # work.
138 #
139 # These args and kwargs *can* be used to save any specific parameters
140 # into the factory when it is being called to create the workflow
141 # objects (typically used to tell a factory how to create a unique
142 # workflow that represents this review).
143 factory_args = ()
144 factory_kwargs = {}
145 engines.save_factory_details(detail, create_review_workflow,
146 factory_args, factory_kwargs)
147 with contextlib.closing(saver.get_connection()) as conn:
148 conn.save_logbook(book)
149 return book
150
151 def run():
152 """Periodically publishes 'fake' reviews to analyze."""
153 jb.connect()
154 review_generator = review_iter()
155 with contextlib.closing(jb):
156 while not no_more.is_set():
157 review = next(review_generator)
158 details = {
159 'store': {
160 'review': review,
161 },
162 }
163 job_name = "{}_{}".format(real_name, review['id'])
164 print("Posting review '%s'" % review['id'])
165 jb.post(job_name,
166 book=make_save_book(saver, review['id']),
167 details=details)
168 time.sleep(REVIEW_CREATION_DELAY)
169
170 # Return the unstarted thread, and a callback that can be used to
171 # shut down that thread (to avoid running forever).
172 return (threading_utils.daemon_thread(target=run), no_more.set)
173
174
175def generate_conductor(client, saver, name=NAME):
176 """Creates a conductor thread with the given name prefix."""
177 real_name = "%s_conductor" % name
178 jb = boards.fetch(name, JOBBOARD_CONF,
179 client=client, persistence=saver)
180 conductor = conductors.fetch("blocking", real_name, jb,
181 engine='parallel', wait_timeout=SCAN_DELAY)
182
183 def run():
184 jb.connect()
185 with contextlib.closing(jb):
186 conductor.run()
187
188 # Return the unstarted thread, and a callback that can be used to
189 # shut down that thread (to avoid running forever).
190 return (threading_utils.daemon_thread(target=run), conductor.stop)
191
192
193def main():
194 # Need to share the same backend, so that data can be shared...
195 persistence_conf = {
196 'connection': 'memory',
197 }
198 saver = persistence.fetch(persistence_conf)
199 with contextlib.closing(saver.get_connection()) as conn:
200 # This ensures that the needed backend setup/data directories/schema
201 # upgrades and so on... exist before they are attempted to be used...
202 conn.upgrade()
203 fc1 = fake_client.FakeClient()
204 # Done like this to share the same client storage location so the correct
205 # zookeeper features work across clients...
206 fc2 = fake_client.FakeClient(storage=fc1.storage)
207 entities = [
208 generate_reviewer(fc1, saver),
209 generate_conductor(fc2, saver),
210 ]
211 for t, stopper in entities:
212 t.start()
213 try:
214 watch = timeutils.StopWatch(duration=RUN_TIME)
215 watch.start()
216 while not watch.expired():
217 time.sleep(0.1)
218 finally:
219 for t, stopper in reversed(entities):
220 stopper()
221 t.join()
222
223
224if __name__ == '__main__':
225 main()
Conductor running 99 bottles of beer song requests¶
Note
Full source located at 99_bottles
1import contextlib, functools
2import logging
3import os
4import sys
5import time
6import traceback
7
8from kazoo import client
9
10top_dir = os.path.abspath(os.path.join(os.path.dirname(__file__),
11 os.pardir,
12 os.pardir))
13sys.path.insert(0, top_dir)
14
15from taskflow.conductors import backends as conductor_backends
16from taskflow import engines
17from taskflow.jobs import backends as job_backends
18from taskflow import logging as taskflow_logging
19from taskflow.patterns import linear_flow as lf
20from taskflow.persistence import backends as persistence_backends
21from taskflow.persistence import models
22from taskflow import task
23
24from oslo_utils import timeutils
25from oslo_utils import uuidutils
26
27# Instructions!
28#
29# 1. Install zookeeper (or change host listed below)
30# 2. Download this example, place in file '99_bottles.py'
31# 3. Run `python 99_bottles.py p` to place a song request onto the jobboard
32# 4. Run `python 99_bottles.py c` a few times (in different shells)
33# 5. On demand kill previously listed processes created in (4) and watch
34# the work resume on another process (and repeat)
35# 6. Keep enough workers alive to eventually finish the song (if desired).
36
37ME = os.getpid()
38ZK_HOST = "localhost:2181"
39JB_CONF = {
40 'hosts': ZK_HOST,
41 'board': 'zookeeper',
42 'path': '/taskflow/99-bottles-demo',
43}
44PERSISTENCE_URI = r"sqlite:////tmp/bottles.db"
45TAKE_DOWN_DELAY = 1.0
46PASS_AROUND_DELAY = 3.0
47HOW_MANY_BOTTLES = 99
48
49
50class TakeABottleDown(task.Task):
51 def execute(self, bottles_left):
52 sys.stdout.write('Take one down, ')
53 sys.stdout.flush()
54 time.sleep(TAKE_DOWN_DELAY)
55 return bottles_left - 1
56
57
58class PassItAround(task.Task):
59 def execute(self):
60 sys.stdout.write('pass it around, ')
61 sys.stdout.flush()
62 time.sleep(PASS_AROUND_DELAY)
63
64
65class Conclusion(task.Task):
66 def execute(self, bottles_left):
67 sys.stdout.write('%s bottles of beer on the wall...\n' % bottles_left)
68 sys.stdout.flush()
69
70
71def make_bottles(count):
72 # This is the function that will be called to generate the workflow
73 # and will also be called to regenerate it on resumption so that work
74 # can continue from where it last left off...
75
76 s = lf.Flow("bottle-song")
77
78 take_bottle = TakeABottleDown("take-bottle-%s" % count,
79 inject={'bottles_left': count},
80 provides='bottles_left')
81 pass_it = PassItAround("pass-%s-around" % count)
82 next_bottles = Conclusion("next-bottles-%s" % (count - 1))
83 s.add(take_bottle, pass_it, next_bottles)
84
85 for bottle in reversed(list(range(1, count))):
86 take_bottle = TakeABottleDown("take-bottle-%s" % bottle,
87 provides='bottles_left')
88 pass_it = PassItAround("pass-%s-around" % bottle)
89 next_bottles = Conclusion("next-bottles-%s" % (bottle - 1))
90 s.add(take_bottle, pass_it, next_bottles)
91
92 return s
93
94
95def run_conductor(only_run_once=False):
96 # This continuously runs consumers until it's stopped via ctrl-c or another
97 # kill signal...
98 event_watches = {}
99
100 # This will be triggered by the conductor doing various activities
101 # with engines, and is quite nice to be able to see the various timing
102 # segments (which is useful for debugging, or watching, or figuring out
103 # where to optimize).
104 def on_conductor_event(cond, event, details):
105 print("Event '%s' has been received..." % event)
106 print("Details = %s" % details)
107 if event.endswith("_start"):
108 w = timeutils.StopWatch()
109 w.start()
110 base_event = event[0:-len("_start")]
111 event_watches[base_event] = w
112 if event.endswith("_end"):
113 base_event = event[0:-len("_end")]
114 try:
115 w = event_watches.pop(base_event)
116 w.stop()
117 print("It took %0.3f seconds for event '%s' to finish"
118 % (w.elapsed(), base_event))
119 except KeyError:
120 pass
121 if event == 'running_end' and only_run_once:
122 cond.stop()
123
124 print("Starting conductor with pid: %s" % ME)
125 my_name = "conductor-%s" % ME
126 persist_backend = persistence_backends.fetch(PERSISTENCE_URI)
127 with contextlib.closing(persist_backend):
128 with contextlib.closing(persist_backend.get_connection()) as conn:
129 conn.upgrade()
130 job_backend = job_backends.fetch(my_name, JB_CONF,
131 persistence=persist_backend)
132 job_backend.connect()
133 with contextlib.closing(job_backend):
134 cond = conductor_backends.fetch('blocking', my_name, job_backend,
135 persistence=persist_backend)
136 on_conductor_event = functools.partial(on_conductor_event, cond)
137 cond.notifier.register(cond.notifier.ANY, on_conductor_event)
138 # Run forever, and kill -9 or ctrl-c me...
139 try:
140 cond.run()
141 finally:
142 cond.stop()
143 cond.wait()
144
145
146def run_poster():
147 # This just posts a single job and then ends...
148 print("Starting poster with pid: %s" % ME)
149 my_name = "poster-%s" % ME
150 persist_backend = persistence_backends.fetch(PERSISTENCE_URI)
151 with contextlib.closing(persist_backend):
152 with contextlib.closing(persist_backend.get_connection()) as conn:
153 conn.upgrade()
154 job_backend = job_backends.fetch(my_name, JB_CONF,
155 persistence=persist_backend)
156 job_backend.connect()
157 with contextlib.closing(job_backend):
158 # Create information in the persistence backend about the
159 # unit of work we want to complete and the factory that
160 # can be called to create the tasks that the work unit needs
161 # to be done.
162 lb = models.LogBook("post-from-%s" % my_name)
163 fd = models.FlowDetail("song-from-%s" % my_name,
164 uuidutils.generate_uuid())
165 lb.add(fd)
166 with contextlib.closing(persist_backend.get_connection()) as conn:
167 conn.save_logbook(lb)
168 engines.save_factory_details(fd, make_bottles,
169 [HOW_MANY_BOTTLES], {},
170 backend=persist_backend)
171 # Post, and be done with it!
172 jb = job_backend.post("song-from-%s" % my_name, book=lb)
173 print("Posted: %s" % jb)
174 print("Goodbye...")
175
176
177def main_local():
178 # Run locally; typically this is activated during unit testing when all
179 # the examples are checked to still function correctly...
180 global TAKE_DOWN_DELAY
181 global PASS_AROUND_DELAY
182 global JB_CONF
183 # Make everything go much faster (so that this finishes quickly).
184 PASS_AROUND_DELAY = 0.01
185 TAKE_DOWN_DELAY = 0.01
186 JB_CONF['path'] = JB_CONF['path'] + "-" + uuidutils.generate_uuid()
187 run_poster()
188 run_conductor(only_run_once=True)
189
190
191def check_for_zookeeper(timeout=1):
192 sys.stderr.write("Testing for the existence of a zookeeper server...\n")
193 sys.stderr.write("Please wait....\n")
194 with contextlib.closing(client.KazooClient()) as test_client:
195 try:
196 test_client.start(timeout=timeout)
197 except test_client.handler.timeout_exception:
198 sys.stderr.write("Zookeeper is needed for running this example!\n")
199 traceback.print_exc()
200 return False
201 else:
202 test_client.stop()
203 return True
204
205
206def main():
207 if not check_for_zookeeper():
208 return
209 if len(sys.argv) == 1:
210 main_local()
211 elif sys.argv[1] in ('p', 'c'):
212 if sys.argv[-1] == "v":
213 logging.basicConfig(level=taskflow_logging.TRACE)
214 else:
215 logging.basicConfig(level=logging.ERROR)
216 if sys.argv[1] == 'p':
217 run_poster()
218 else:
219 run_conductor()
220 else:
221 sys.stderr.write("%s p|c (v?)\n" % os.path.basename(sys.argv[0]))
222
223
224if __name__ == '__main__':
225 main()