Examples¶
While developing TaskFlow, the team has worked hard to make sure the various concepts are explained by relevant examples. Here are a few selected examples to get you started, ordered by perceived complexity.
To explore more of these examples, please check out the examples directory in the TaskFlow source tree.
Note
If the examples provided are not satisfactory (or up to your standards), contributions to improve them are welcome and very much appreciated. The higher the quality and the clearer the examples, the more useful they are for everyone.
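Most of the examples below share the same basic shape: define one or more tasks, compose them into a flow pattern, and hand the flow to an engine to run. As a quick orientation, here is a minimal sketch of that pattern (the task and flow names are made up for illustration; this is not one of the example files in the tree):

from taskflow import engines
from taskflow.patterns import linear_flow as lf
from taskflow import task


class Greeter(task.Task):
    def execute(self, who):
        # Tasks receive their inputs as execute() arguments; 'who' will be
        # satisfied from the store passed to the engine below.
        print("Hello, %s!" % who)


# Composing a flow does not run anything; the engine does the running.
flow = lf.Flow('greet').add(Greeter())
engines.run(flow, store={'who': 'world'})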
Hello world¶
Note
Full source located at hello_world.
import logging
import os
import sys

logging.basicConfig(level=logging.ERROR)

top_dir = os.path.abspath(os.path.join(os.path.dirname(__file__),
                                       os.pardir,
                                       os.pardir))
sys.path.insert(0, top_dir)

from taskflow import engines
from taskflow.patterns import linear_flow as lf
from taskflow.patterns import unordered_flow as uf
from taskflow import task


# INTRO: This is the de facto hello world equivalent for taskflow; it shows how
# an overly simplistic workflow can be created that runs using different
# engines using different styles of execution (all can be used to run in
# parallel if a workflow is provided that is parallelizable).

class PrinterTask(task.Task):
    def __init__(self, name, show_name=True, inject=None):
        super(PrinterTask, self).__init__(name, inject=inject)
        self._show_name = show_name

    def execute(self, output):
        if self._show_name:
            print("%s: %s" % (self.name, output))
        else:
            print(output)


# This will be the work that we want done, which for this example is just to
# print 'hello world' (like a song) using different tasks and different
# execution models.
song = lf.Flow("beats")

# Unordered flows, when run, can be run in parallel; and a chorus is everyone
# singing at once of course!
hi_chorus = uf.Flow('hello')
world_chorus = uf.Flow('world')
for (name, hello, world) in [('bob', 'hello', 'world'),
                             ('joe', 'hellooo', 'worllllld'),
                             ('sue', "helloooooo!", 'wooorllld!')]:
    hi_chorus.add(PrinterTask("%s@hello" % name,
                              # This will show up to the execute() method of
                              # the task as the argument named 'output' (which
                              # will allow us to print the character we want).
                              inject={'output': hello}))
    world_chorus.add(PrinterTask("%s@world" % name,
                                 inject={'output': world}))

# The composition starts with the conductor and then runs in sequence with
# the chorus running in parallel, but no matter what the 'hello' chorus must
# always run before the 'world' chorus (otherwise the world will fall apart).
song.add(PrinterTask("conductor@begin",
                     show_name=False, inject={'output': "*ding*"}),
         hi_chorus,
         world_chorus,
         PrinterTask("conductor@end",
                     show_name=False, inject={'output': "*dong*"}))

# Run in parallel using eventlet green threads...
try:
    import eventlet as _eventlet  # noqa
except ImportError:
    # No eventlet currently active, skip running with it...
    pass
else:
    print("-- Running in parallel using eventlet --")
    e = engines.load(song, executor='greenthreaded', engine='parallel',
                     max_workers=1)
    e.run()


# Run in parallel using real threads...
print("-- Running in parallel using threads --")
e = engines.load(song, executor='threaded', engine='parallel',
                 max_workers=1)
e.run()


# Run in parallel using external processes...
print("-- Running in parallel using processes --")
e = engines.load(song, executor='processes', engine='parallel',
                 max_workers=1)
e.run()


# Run serially (aka, even if the workflow could have been run in parallel, it
# will not be when run in this mode)...
print("-- Running serially --")
e = engines.load(song, engine='serial')
e.run()
print("-- Statistics gathered --")
print(e.statistics)
Passing values from and to tasks¶
Note
Full source located at simple_linear_pass.
import logging
import os
import sys

logging.basicConfig(level=logging.ERROR)

self_dir = os.path.abspath(os.path.dirname(__file__))
top_dir = os.path.abspath(os.path.join(os.path.dirname(__file__),
                                       os.pardir,
                                       os.pardir))
sys.path.insert(0, top_dir)
sys.path.insert(0, self_dir)

from taskflow import engines
from taskflow.patterns import linear_flow
from taskflow import task

# INTRO: This example shows how a task (in a linear/serial workflow) can
# produce an output that can then be consumed/used by a downstream task.


class TaskA(task.Task):
    default_provides = 'a'

    def execute(self):
        print("Executing '%s'" % (self.name))
        return 'a'


class TaskB(task.Task):
    def execute(self, a):
        print("Executing '%s'" % (self.name))
        print("Got input '%s'" % (a))


print("Constructing...")
wf = linear_flow.Flow("pass-from-to")
wf.add(TaskA('a'), TaskB('b'))

print("Loading...")
e = engines.load(wf)

print("Compiling...")
e.compile()

print("Preparing...")
e.prepare()

print("Running...")
e.run()

print("Done...")
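If you want to inspect the value produced by TaskA after the run above finishes, it stays available through the engine's storage. A small hedged addition (reusing the engine object e from the listing and the storage accessors used by other examples in this tree):

# After e.run() completes, the symbol 'a' provided by TaskA can be read
# back out of the engine's storage for inspection or further use.
print("TaskA produced: %s" % e.storage.fetch('a'))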
Using listeners¶
Note
Full source located at echo_listener.
import logging
import os
import sys

logging.basicConfig(level=logging.DEBUG)

top_dir = os.path.abspath(os.path.join(os.path.dirname(__file__),
                                       os.pardir,
                                       os.pardir))
sys.path.insert(0, top_dir)

from taskflow import engines
from taskflow.listeners import logging as logging_listener
from taskflow.patterns import linear_flow as lf
from taskflow import task

# INTRO: This example walks through a miniature workflow which will do a
# simple echo operation; during this execution a listener is associated with
# the engine to receive all notifications about what the flow has performed,
# this example dumps that output to stdout for viewing (at debug level
# to show all the information which is possible).


class Echo(task.Task):
    def execute(self):
        print(self.name)


# Generate the work to be done (but don't do it yet).
wf = lf.Flow('abc')
wf.add(Echo('a'))
wf.add(Echo('b'))
wf.add(Echo('c'))

# This will associate the listener with the engine (the listener
# will automatically register for notifications with the engine and deregister
# when the context is exited).
e = engines.load(wf)
with logging_listener.DynamicLoggingListener(e):
    e.run()
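The context manager is just a convenience; the same listener can be attached and detached explicitly if you need to control its lifetime yourself. A hedged sketch (reusing the engine e from the listing above and the listener's register()/deregister() pair):

listener = logging_listener.DynamicLoggingListener(e)
listener.register()
try:
    e.run()
finally:
    # Detach the listener once we no longer want its notifications.
    listener.deregister()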
Using listeners (to watch a phone call)¶
Note
Full source located at simple_linear_listening.
import logging
import os
import sys

logging.basicConfig(level=logging.ERROR)

top_dir = os.path.abspath(os.path.join(os.path.dirname(__file__),
                                       os.pardir,
                                       os.pardir))
sys.path.insert(0, top_dir)

import taskflow.engines
from taskflow.patterns import linear_flow as lf
from taskflow import task
from taskflow.types import notifier

ANY = notifier.Notifier.ANY

# INTRO: In this example we create two tasks (this time as functions instead
# of task subclasses as in the simple_linear.py example), each of which ~calls~
# a given ~phone~ number (provided as a function input) in a linear fashion
# (one after the other).
#
# For a workflow which is serial this shows an extremely simple way
# of structuring your tasks (the code that does the work) into a linear
# sequence (the flow) and then passing the work off to an engine, with some
# initial data to be run in a reliable manner.
#
# This example shows a basic usage of the taskflow structures without involving
# the complexity of persistence. Using the structures that taskflow provides
# via tasks and flows makes it possible for you to easily at a later time
# hook in a persistence layer (and then gain the functionality that offers)
# when you decide the complexity of adding that layer in is 'worth it' for your
# application's usage pattern (which some applications may not need).
#
# It **also** adds on to the simple_linear.py example by adding a set of
# callback functions which the engine will call when a flow state transition
# or task state transition occurs. These types of functions are useful for
# updating task or flow progress, or for debugging, sending notifications to
# external systems, or for other yet unknown future usage that you may create!


def call_jim(context):
    print("Calling jim.")
    print("Context = %s" % (sorted(context.items(), key=lambda x: x[0])))


def call_joe(context):
    print("Calling joe.")
    print("Context = %s" % (sorted(context.items(), key=lambda x: x[0])))


def flow_watch(state, details):
    print('Flow => %s' % state)


def task_watch(state, details):
    print('Task %s => %s' % (details.get('task_name'), state))


# Wrap your functions into a task type that knows how to treat your functions
# as tasks. There was previous work done to just allow a function to be
# directly passed, but in python 3.0 there is no easy way to capture an
# instance method, so this wrapping approach was decided upon instead which
# can attach to instance methods (if that's desired).
flow = lf.Flow("Call-them")
flow.add(task.FunctorTask(execute=call_jim))
flow.add(task.FunctorTask(execute=call_joe))

# Now load (but do not run) the flow using the provided initial data.
engine = taskflow.engines.load(flow, store={
    'context': {
        "joe_number": 444,
        "jim_number": 555,
    }
})

# This is where we attach our callback functions to the 2 different
# notification objects that an engine exposes. The usage of ANY (kleene star)
# here means that we want to be notified on all state changes; if you want to
# restrict to a specific state change, just register that instead.
engine.notifier.register(ANY, flow_watch)
engine.atom_notifier.register(ANY, task_watch)

# And now run!
engine.run()
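Callbacks registered this way can also be removed once they are no longer wanted, for example before reusing the engine. A hedged sketch (reusing the engine and callback functions from the listing above, and the notifier's deregister() counterpart to register()):

# After the run, detach the callbacks so later activity on this engine no
# longer reports flow or task state transitions to them.
engine.notifier.deregister(ANY, flow_watch)
engine.atom_notifier.deregister(ANY, task_watch)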
Dumping an in-memory backend¶
Note
Full source located at dump_memory_backend.
import logging
import os
import sys

logging.basicConfig(level=logging.ERROR)

self_dir = os.path.abspath(os.path.dirname(__file__))
top_dir = os.path.abspath(os.path.join(os.path.dirname(__file__),
                                       os.pardir,
                                       os.pardir))
sys.path.insert(0, top_dir)
sys.path.insert(0, self_dir)

from taskflow import engines
from taskflow.patterns import linear_flow as lf
from taskflow import task

# INTRO: in this example we create a dummy flow with a dummy task, and run
# it using an in-memory backend, and pre/post run we dump out the contents
# of the in-memory backend's tree structure (which can be quite useful to
# look at for debugging or other analysis).


class PrintTask(task.Task):
    def execute(self):
        print("Running '%s'" % self.name)

# Make a little flow and run it...
f = lf.Flow('root')
for alpha in ['a', 'b', 'c']:
    f.add(PrintTask(alpha))

e = engines.load(f)
e.compile()
e.prepare()

# After prepare the storage layer + backend can now be accessed safely...
backend = e.storage.backend

print("----------")
print("Before run")
print("----------")
print(backend.memory.pformat())
print("----------")

e.run()

print("---------")
print("After run")
print("---------")
for path in backend.memory.ls_r(backend.memory.root_path, absolute=True):
    value = backend.memory[path]
    if value:
        print("%s -> %s" % (path, value))
    else:
        print("%s" % (path))
Making phone calls¶
Note
Full source located at simple_linear.
import logging
import os
import sys

logging.basicConfig(level=logging.ERROR)

top_dir = os.path.abspath(os.path.join(os.path.dirname(__file__),
                                       os.pardir,
                                       os.pardir))
sys.path.insert(0, top_dir)

import taskflow.engines
from taskflow.patterns import linear_flow as lf
from taskflow import task

# INTRO: In this example we create two tasks, each of which ~calls~ a given
# ~phone~ number (provided as a function input) in a linear fashion (one after
# the other). For a workflow which is serial this shows an extremely simple way
# of structuring your tasks (the code that does the work) into a linear
# sequence (the flow) and then passing the work off to an engine, with some
# initial data to be run in a reliable manner.
#
# NOTE(harlowja): This example shows a basic usage of the taskflow structures
# without involving the complexity of persistence. Using the structures that
# taskflow provides via tasks and flows makes it possible for you to easily at
# a later time hook in a persistence layer (and then gain the functionality
# that offers) when you decide the complexity of adding that layer in
# is 'worth it' for your application's usage pattern (which certain
# applications may not need).


class CallJim(task.Task):
    def execute(self, jim_number, *args, **kwargs):
        print("Calling jim %s." % jim_number)


class CallJoe(task.Task):
    def execute(self, joe_number, *args, **kwargs):
        print("Calling joe %s." % joe_number)


# Create your flow and associated tasks (the work to be done).
flow = lf.Flow('simple-linear').add(
    CallJim(),
    CallJoe()
)

# Now run that flow using the provided initial data (store below).
taskflow.engines.run(flow, store=dict(joe_number=444,
                                      jim_number=555))
Making phone calls (automatically reverting)¶
Note
Full source located at reverting_linear.
import logging
import os
import sys

logging.basicConfig(level=logging.ERROR)

top_dir = os.path.abspath(os.path.join(os.path.dirname(__file__),
                                       os.pardir,
                                       os.pardir))
sys.path.insert(0, top_dir)

import taskflow.engines
from taskflow.patterns import linear_flow as lf
from taskflow import task

# INTRO: In this example we create three tasks, each of which ~calls~ a given
# number (provided as a function input); one of those tasks *fails* calling a
# given number (the suzzie calling), which causes the workflow to enter the
# reverting process, which activates the revert methods of the previous two
# phone ~calls~.
#
# This simulated calling makes it appear like all three calls occur or all
# three don't occur (transaction-like capabilities). No persistence layer is
# used here, so reverting and executing will *not* be tolerant of process
# failure.


class CallJim(task.Task):
    def execute(self, jim_number, *args, **kwargs):
        print("Calling jim %s." % jim_number)

    def revert(self, jim_number, *args, **kwargs):
        print("Calling %s and apologizing." % jim_number)


class CallJoe(task.Task):
    def execute(self, joe_number, *args, **kwargs):
        print("Calling joe %s." % joe_number)

    def revert(self, joe_number, *args, **kwargs):
        print("Calling %s and apologizing." % joe_number)


class CallSuzzie(task.Task):
    def execute(self, suzzie_number, *args, **kwargs):
        raise IOError("Suzzie not home right now.")


# Create your flow and associated tasks (the work to be done).
flow = lf.Flow('simple-linear').add(
    CallJim(),
    CallJoe(),
    CallSuzzie()
)

try:
    # Now run that flow using the provided initial data (store below).
    taskflow.engines.run(flow, store=dict(joe_number=444,
                                          jim_number=555,
                                          suzzie_number=666))
except Exception as e:
    # NOTE(harlowja): This exception will be the exception that came out of the
    # 'CallSuzzie' task instead of a different exception; this is useful since
    # typically surrounding code wants to handle the original exception and not
    # a wrapped or altered one.
    #
    # *WARNING* If this flow was multi-threaded and multiple active tasks threw
    # exceptions then the above exception would be wrapped into a combined
    # exception (the object has methods to iterate over the contained
    # exceptions). See: exceptions.py and the class 'WrappedFailure' to look at
    # how to deal with multiple tasks failing while running.
    #
    # You will also note that this is not a problem in this case since no
    # parallelism is involved; this is ensured by the usage of a linear flow
    # and the default engine type which is 'serial' vs being 'parallel'.
    print("Flow failed: %s" % e)
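The warning in the comments above about multiple concurrent failures can be handled explicitly. A hedged sketch of what that might look like (same flow and store as the listing, but run with a parallel engine; WrappedFailure is the class the comment points at and can be iterated to inspect each contained failure):

from taskflow import exceptions as exc

try:
    taskflow.engines.run(flow, engine='parallel',
                         store=dict(joe_number=444, jim_number=555,
                                    suzzie_number=666))
except exc.WrappedFailure as wrapped:
    # Multiple tasks failing at once arrive as a single wrapped failure
    # whose contained failures can be examined one by one.
    for failure in wrapped:
        print("Flow failed: %s" % failure)
except Exception as e:
    print("Flow failed: %s" % e)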
Building a car¶
Note
Full source located at build_a_car.
import logging
import os
import sys


logging.basicConfig(level=logging.ERROR)

top_dir = os.path.abspath(os.path.join(os.path.dirname(__file__),
                                       os.pardir,
                                       os.pardir))
sys.path.insert(0, top_dir)


import taskflow.engines
from taskflow.patterns import graph_flow as gf
from taskflow.patterns import linear_flow as lf
from taskflow import task
from taskflow.types import notifier

ANY = notifier.Notifier.ANY

import example_utils as eu  # noqa


# INTRO: This example shows how a graph flow and linear flow can be used
# together to execute dependent & non-dependent tasks by going through the
# steps required to build a simplistic car (an assembly line if you will). It
# also shows how raw functions can be wrapped into a task object instead of
# being forced to use the more *heavy* task base class. This is useful in
# scenarios where pre-existing code has functions that you easily want to
# plug in to taskflow, without requiring a large amount of code changes.


def build_frame():
    return 'steel'


def build_engine():
    return 'honda'


def build_doors():
    return '2'


def build_wheels():
    return '4'


# These just return true to indicate success; in the real world they would
# do more than just that.

def install_engine(frame, engine):
    return True


def install_doors(frame, windows_installed, doors):
    return True


def install_windows(frame, doors):
    return True


def install_wheels(frame, engine, engine_installed, wheels):
    return True


def trash(**kwargs):
    eu.print_wrapped("Throwing away pieces of car!")


def startup(**kwargs):
    # If you want to see the rollback function being activated try uncommenting
    # the following line.
    #
    # raise ValueError("Car not verified")
    return True


def verify(spec, **kwargs):
    # If the car is not what we ordered throw away the car (trigger reversion).
    for key, value in kwargs.items():
        if spec[key] != value:
            raise Exception("Car doesn't match spec!")
    return True


# These two functions connect into the state transition notification emission
# points that the engine outputs; they can be used to log state transitions
# that are occurring, or they can be used to suspend the engine (or perform
# other useful activities).
def flow_watch(state, details):
    print('Flow => %s' % state)


def task_watch(state, details):
    print('Task %s => %s' % (details.get('task_name'), state))


flow = lf.Flow("make-auto").add(
    task.FunctorTask(startup, revert=trash, provides='ran'),
    # A graph flow allows automatic dependency based ordering; the ordering
    # is determined by analyzing the symbols required and provided and ordering
    # execution based on a functioning order (if one exists).
    gf.Flow("install-parts").add(
        task.FunctorTask(build_frame, provides='frame'),
        task.FunctorTask(build_engine, provides='engine'),
        task.FunctorTask(build_doors, provides='doors'),
        task.FunctorTask(build_wheels, provides='wheels'),
        # These *_installed outputs allow for other tasks to depend on certain
        # actions being performed (aka the components were installed); another
        # way to do this is to link() the tasks manually instead of creating
        # an 'artificial' data dependency that accomplishes the same goal the
        # manual linking would result in.
        task.FunctorTask(install_engine, provides='engine_installed'),
        task.FunctorTask(install_doors, provides='doors_installed'),
        task.FunctorTask(install_windows, provides='windows_installed'),
        task.FunctorTask(install_wheels, provides='wheels_installed')),
    task.FunctorTask(verify, requires=['frame',
                                       'engine',
                                       'doors',
                                       'wheels',
                                       'engine_installed',
                                       'doors_installed',
                                       'windows_installed',
                                       'wheels_installed']))

# This dictionary will be provided to the tasks as a specification for what
# the tasks should produce; in this example this specification will influence
# what those tasks do and what output they create. Different tasks depend on
# different information from this specification, all of which will be provided
# automatically by the engine to those tasks.
spec = {
    "frame": 'steel',
    "engine": 'honda',
    "doors": '2',
    "wheels": '4',
    # These are used to compare the result product; a car without the pieces
    # installed is not a car after all.
    "engine_installed": True,
    "doors_installed": True,
    "windows_installed": True,
    "wheels_installed": True,
}


engine = taskflow.engines.load(flow, store={'spec': spec.copy()})

# This registers all (ANY) state transitions to trigger a call to the
# flow_watch function for flow state transitions, and registers the
# same all (ANY) state transitions for task state transitions.
engine.notifier.register(ANY, flow_watch)
engine.atom_notifier.register(ANY, task_watch)

eu.print_wrapped("Building a car")
engine.run()

# Alter the specification and ensure that the reverting logic gets triggered,
# since the car produced by the build_doors function will still have '2' doors
# (not the 5 that the spec now asks for); this will cause the verification
# task to mark the produced car as not matching the desired spec.
spec['doors'] = 5

engine = taskflow.engines.load(flow, store={'spec': spec.copy()})
engine.notifier.register(ANY, flow_watch)
engine.atom_notifier.register(ANY, task_watch)

eu.print_wrapped("Building a wrong car that doesn't match specification")
try:
    engine.run()
except Exception as e:
    eu.print_wrapped("Flow failed: %s" % e)
Iterating over the alphabet (using processes)¶
Note
Full source located at alphabet_soup.
import fractions
import functools
import logging
import os
import string
import sys
import time

logging.basicConfig(level=logging.ERROR)

self_dir = os.path.abspath(os.path.dirname(__file__))
top_dir = os.path.abspath(os.path.join(os.path.dirname(__file__),
                                       os.pardir,
                                       os.pardir))
sys.path.insert(0, top_dir)
sys.path.insert(0, self_dir)

from taskflow import engines
from taskflow import exceptions
from taskflow.patterns import linear_flow
from taskflow import task


# In this example we show how a simple linear set of tasks can be executed
# using local processes (and not threads or remote workers) with minimal (if
# any) modification to those tasks to make them safe to run in this mode.
#
# This is useful since it allows further scaling up your workflows when thread
# execution starts to become a bottleneck (which it can start to be due to the
# GIL in python). It also offers an intermediary scalable runner that can be
# used when the scale and/or setup of remote workers is not desirable.


def progress_printer(task, event_type, details):
    # This callback, attached to each task, will be called in the local
    # process (not the child processes)...
    progress = details.pop('progress')
    progress = int(progress * 100.0)
    print("Task '%s' reached %d%% completion" % (task.name, progress))


class AlphabetTask(task.Task):
    # Seconds of delay between each progress part.
    _DELAY = 0.1

    # This task will run in X main stages (each with a different progress
    # report that will be delivered back to the running process...). The
    # initial 0% and 100% are triggered automatically by the engine when
    # a task is started and finished (so that's why those are not emitted
    # here).
    _PROGRESS_PARTS = [fractions.Fraction("%s/5" % x) for x in range(1, 5)]

    def execute(self):
        for p in self._PROGRESS_PARTS:
            self.update_progress(p)
            time.sleep(self._DELAY)


print("Constructing...")
soup = linear_flow.Flow("alphabet-soup")
for letter in string.ascii_lowercase:
    abc = AlphabetTask(letter)
    abc.notifier.register(task.EVENT_UPDATE_PROGRESS,
                          functools.partial(progress_printer, abc))
    soup.add(abc)
try:
    print("Loading...")
    e = engines.load(soup, engine='parallel', executor='processes')
    print("Compiling...")
    e.compile()
    print("Preparing...")
    e.prepare()
    print("Running...")
    e.run()
    print("Done: %s" % e.statistics)
except exceptions.NotImplementedError as e:
    print(e)
Watching execution timing¶
Note
Full source located at timing_listener.
import logging
import os
import random
import sys
import time

logging.basicConfig(level=logging.ERROR)

top_dir = os.path.abspath(os.path.join(os.path.dirname(__file__),
                                       os.pardir,
                                       os.pardir))
sys.path.insert(0, top_dir)

from taskflow import engines
from taskflow.listeners import timing
from taskflow.patterns import linear_flow as lf
from taskflow import task

# INTRO: in this example we will attach a listener to an engine
# and have variable run time tasks run and show how the listener will print
# out how long those tasks took (when they started and when they finished).
#
# This shows how timing metrics can be gathered (or attached onto an engine)
# after a workflow has been constructed, making it easy to gather metrics
# dynamically for situations where this kind of information is applicable (or
# even adding this information on at a later point in the future when your
# application starts to slow down).


class VariableTask(task.Task):
    def __init__(self, name):
        super(VariableTask, self).__init__(name)
        self._sleepy_time = random.random()

    def execute(self):
        time.sleep(self._sleepy_time)


f = lf.Flow('root')
f.add(VariableTask('a'), VariableTask('b'), VariableTask('c'))
e = engines.load(f)
with timing.PrintingDurationListener(e):
    e.run()
Distance calculator¶
Note
Full source located at distance_calculator.
import collections
import math
import os
import sys

top_dir = os.path.abspath(os.path.join(os.path.dirname(__file__),
                                       os.pardir,
                                       os.pardir))
sys.path.insert(0, top_dir)

from taskflow import engines
from taskflow.patterns import linear_flow
from taskflow import task

# INTRO: This shows how to use a task's/atom's ability to take requirements
# from its execute function's default parameters and shows how to provide those
# via different methods when needed, to influence those parameters to in
# this case calculate the distance between two points in 2D space.

# A 2D point.
Point = collections.namedtuple("Point", "x,y")


def is_near(val, expected, tolerance=0.001):
    # Floats don't really provide equality...
    if val > (expected + tolerance):
        return False
    if val < (expected - tolerance):
        return False
    return True


class DistanceTask(task.Task):
    # See: http://en.wikipedia.org/wiki/Distance#Distance_in_Euclidean_space

    default_provides = 'distance'

    def execute(self, a=Point(0, 0), b=Point(0, 0)):
        return math.sqrt(math.pow(b.x - a.x, 2) + math.pow(b.y - a.y, 2))


if __name__ == '__main__':
    # For these we rely on the execute() method's points by default being
    # at the origin (and we override it with store values when we want) at
    # execution time (which then influences what is calculated).
    any_distance = linear_flow.Flow("origin").add(DistanceTask())
    results = engines.run(any_distance)
    print(results)
    print("%s is near-enough to %s: %s" % (results['distance'],
                                           0.0,
                                           is_near(results['distance'], 0.0)))

    results = engines.run(any_distance, store={'a': Point(1, 1)})
    print(results)
    print("%s is near-enough to %s: %s" % (results['distance'],
                                           1.4142,
                                           is_near(results['distance'],
                                                   1.4142)))

    results = engines.run(any_distance, store={'a': Point(10, 10)})
    print(results)
    print("%s is near-enough to %s: %s" % (results['distance'],
                                           14.14199,
                                           is_near(results['distance'],
                                                   14.14199)))

    results = engines.run(any_distance,
                          store={'a': Point(5, 5), 'b': Point(10, 10)})
    print(results)
    print("%s is near-enough to %s: %s" % (results['distance'],
                                           7.07106,
                                           is_near(results['distance'],
                                                   7.07106)))

    # For this we use the ability to override at task creation time the
    # optional arguments so that we don't need to continue to send them
    # in via the 'store' argument like in the above (and we fix the new
    # starting point 'a' at (10, 10) instead of (0, 0))...

    ten_distance = linear_flow.Flow("ten")
    ten_distance.add(DistanceTask(inject={'a': Point(10, 10)}))
    results = engines.run(ten_distance, store={'b': Point(10, 10)})
    print(results)
    print("%s is near-enough to %s: %s" % (results['distance'],
                                           0.0,
                                           is_near(results['distance'], 0.0)))

    results = engines.run(ten_distance)
    print(results)
    print("%s is near-enough to %s: %s" % (results['distance'],
                                           14.14199,
                                           is_near(results['distance'],
                                                   14.14199)))
Table multiplier (in parallel)¶
Note
Full source located at parallel_table_multiply.
import csv
import logging
import os
import random
import sys

logging.basicConfig(level=logging.ERROR)

top_dir = os.path.abspath(os.path.join(os.path.dirname(__file__),
                                       os.pardir,
                                       os.pardir))
sys.path.insert(0, top_dir)

import futurist

from taskflow import engines
from taskflow.patterns import unordered_flow as uf
from taskflow import task

# INTRO: This example walks through a miniature workflow which does a parallel
# table modification where each row in the table gets adjusted by a thread, or
# green thread (if eventlet is available) in parallel and then the result
# is reformed into a new table and some verifications are performed on it
# to ensure everything went as expected.


MULTIPLER = 10


class RowMultiplier(task.Task):
    """Performs a modification of an input row, creating an output row."""

    def __init__(self, name, index, row, multiplier):
        super(RowMultiplier, self).__init__(name=name)
        self.index = index
        self.multiplier = multiplier
        self.row = row

    def execute(self):
        return [r * self.multiplier for r in self.row]


def make_flow(table):
    # This creation will allow for parallel computation (since the flow here
    # is specifically unordered; and when things are unordered they have
    # no dependencies and when things have no dependencies they can just be
    # run at the same time, limited in concurrency by the executor or max
    # workers of that executor...)
    f = uf.Flow("root")
    for i, row in enumerate(table):
        f.add(RowMultiplier("m-%s" % i, i, row, MULTIPLER))
    # NOTE(harlowja): at this point nothing has run; the above is just
    # defining what should be done (but not actually doing it) and associating
    # the ordering dependencies that should be enforced (the flow pattern used
    # forces this). The engine in the later main() function will actually
    # perform this work...
    return f


def main():
    if len(sys.argv) == 2:
        tbl = []
        with open(sys.argv[1], 'rb') as fh:
            reader = csv.reader(fh)
            for row in reader:
                tbl.append([float(r) if r else 0.0 for r in row])
    else:
        # Make some random table out of thin air...
        tbl = []
        cols = random.randint(1, 100)
        rows = random.randint(1, 100)
        for _i in range(0, rows):
            row = []
            for _j in range(0, cols):
                row.append(random.random())
            tbl.append(row)

    # Generate the work to be done.
    f = make_flow(tbl)

    # Now run it (using the specified executor)...
    try:
        executor = futurist.GreenThreadPoolExecutor(max_workers=5)
    except RuntimeError:
        # No eventlet currently active, use real threads instead.
        executor = futurist.ThreadPoolExecutor(max_workers=5)
    try:
        e = engines.load(f, engine='parallel', executor=executor)
        for st in e.run_iter():
            print(st)
    finally:
        executor.shutdown()

    # Find the old rows and put them into place...
    #
    # TODO(harlowja): probably easier just to sort instead of search...
    computed_tbl = []
    for i in range(0, len(tbl)):
        for t in f:
            if t.index == i:
                computed_tbl.append(e.storage.get(t.name))

    # Do some basic validation (which causes the return code of this process
    # to be different if things were not as expected...)
    if len(computed_tbl) != len(tbl):
        return 1
    else:
        return 0


if __name__ == "__main__":
    sys.exit(main())
Linear equation solver (explicit dependencies)¶
Note
Full source located at calculate_linear.
import logging
import os
import sys

logging.basicConfig(level=logging.ERROR)

top_dir = os.path.abspath(os.path.join(os.path.dirname(__file__),
                                       os.pardir,
                                       os.pardir))
sys.path.insert(0, top_dir)

import taskflow.engines
from taskflow.patterns import linear_flow as lf
from taskflow import task


# INTRO: In this example a linear flow is used to group four tasks to calculate
# a value. A single added task is used twice, showing how this can be done
# and how the twice-added task takes in different bound values. In the first
# case it uses default parameters ('x' and 'y') and in the second case
# arguments are bound with ('z', 'd') keys from the engine's internal storage
# mechanism.
#
# A multiplier task uses a binding that another task also provides, but this
# example explicitly shows that the 'z' parameter is bound with the 'a' key.
# This shows that if a task depends on a key named the same as a key provided
# from another task the name can be remapped to take the desired key from a
# different origin.


# This task provides some values from a static set as a result of execution;
# this can be useful when you want to provide values to other tasks that
# depend on those values existing before those tasks can run.
#
# NOTE(harlowja): this usage is *deprecated* in favor of a simpler mechanism
# that just provides those values on engine running by prepopulating the
# storage backend before your tasks are run (which accomplishes a similar goal
# in a more uniform manner).
class Provider(task.Task):

    def __init__(self, name, *args, **kwargs):
        super(Provider, self).__init__(name=name, **kwargs)
        self._provide = args

    def execute(self):
        return self._provide


# This task adds two input variables and returns the result.
#
# Note that since this task does not have a revert() function (since addition
# is a stateless operation) there are no side-effects that this function needs
# to undo if some later operation fails.
class Adder(task.Task):
    def execute(self, x, y):
        return x + y


# This task multiplies an input variable by a multiplier and returns the
# result.
#
# Note that since this task does not have a revert() function (since
# multiplication is a stateless operation) there are no side-effects that
# this function needs to undo if some later operation fails.
class Multiplier(task.Task):
    def __init__(self, name, multiplier, provides=None, rebind=None):
        super(Multiplier, self).__init__(name=name, provides=provides,
                                         rebind=rebind)
        self._multiplier = multiplier

    def execute(self, z):
        return z * self._multiplier


# Note here that the ordering is established so that the correct sequences
# of operations occur where the adding and multiplying is done according
# to the expected and typical mathematical model. A graph flow could also be
# used here to automatically infer & ensure the correct ordering.
flow = lf.Flow('root').add(
    # Provide the initial values for other tasks to depend on.
    #
    # x = 2, y = 3, d = 5
    Provider("provide-adder", 2, 3, 5, provides=('x', 'y', 'd')),
    # z = x+y = 5
    Adder("add-1", provides='z'),
    # a = z+d = 10
    Adder("add-2", provides='a', rebind=['z', 'd']),
    # Calculate 'r = a*3 = 30'
    #
    # Note here that the 'z' argument of the execute() function will not be
    # bound to the 'z' variable provided from the above 'provider' object but
    # instead the 'z' argument will be taken from the 'a' variable provided
    # by the second adder ('add-2') listed above.
    Multiplier("multi", 3, provides='r', rebind={'z': 'a'})
)

# The result here will be all results (from all tasks), which is stored in an
# in-memory storage location that backs this engine since it is not configured
# with persistence storage.
results = taskflow.engines.run(flow)
print(results)
Linear equation solver (inferred dependencies)¶
Source: graph_flow.py
import logging
import os
import sys

logging.basicConfig(level=logging.ERROR)

top_dir = os.path.abspath(os.path.join(os.path.dirname(__file__),
                                       os.pardir,
                                       os.pardir))
sys.path.insert(0, top_dir)

import taskflow.engines
from taskflow.patterns import graph_flow as gf
from taskflow.patterns import linear_flow as lf
from taskflow import task


# In this example there are complex *inferred* dependencies between tasks that
# are used to perform a simple set of linear equations.
#
# As you will see below the tasks just define what they require as input
# and produce as output (named values). Then the user doesn't care about
# ordering the tasks (in this case the tasks calculate pieces of the overall
# equation).
#
# As you will notice a graph flow resolves dependencies automatically using the
# tasks' symbol requirements and provided symbol values and no ordering
# dependency has to be manually created.
#
# Also notice that flows of any types can be nested into a graph flow, showing
# that subflow dependencies (and associated ordering) will be inferred too.


class Adder(task.Task):

    def execute(self, x, y):
        return x + y


flow = gf.Flow('root').add(
    lf.Flow('nested_linear').add(
        # x2 = y3+y4 = 12
        Adder("add2", provides='x2', rebind=['y3', 'y4']),
        # x1 = y1+y2 = 4
        Adder("add1", provides='x1', rebind=['y1', 'y2'])
    ),
    # x5 = x1+x3 = 20
    Adder("add5", provides='x5', rebind=['x1', 'x3']),
    # x3 = x1+x2 = 16
    Adder("add3", provides='x3', rebind=['x1', 'x2']),
    # x4 = x2+y5 = 21
    Adder("add4", provides='x4', rebind=['x2', 'y5']),
    # x6 = x5+x4 = 41
    Adder("add6", provides='x6', rebind=['x5', 'x4']),
    # x7 = x6+x6 = 82
    Adder("add7", provides='x7', rebind=['x6', 'x6']))

# Provide the initial variable inputs using a storage dictionary.
store = {
    "y1": 1,
    "y2": 3,
    "y3": 5,
    "y4": 7,
    "y5": 9,
}

# These are the expected values that should be created.
unexpected = 0
expected = [
    ('x1', 4),
    ('x2', 12),
    ('x3', 16),
    ('x4', 21),
    ('x5', 20),
    ('x6', 41),
    ('x7', 82),
]

result = taskflow.engines.run(
    flow, engine='serial', store=store)

print("Single threaded engine result %s" % result)
for (name, value) in expected:
    actual = result.get(name)
    if actual != value:
        sys.stderr.write("%s != %s\n" % (actual, value))
        unexpected += 1

result = taskflow.engines.run(
    flow, engine='parallel', store=store)

print("Multi threaded engine result %s" % result)
for (name, value) in expected:
    actual = result.get(name)
    if actual != value:
        sys.stderr.write("%s != %s\n" % (actual, value))
        unexpected += 1

if unexpected:
    sys.exit(1)
Linear equation solver (in parallel)¶
Note
Full source located at calculate_in_parallel.
import logging
import os
import sys

logging.basicConfig(level=logging.ERROR)

top_dir = os.path.abspath(os.path.join(os.path.dirname(__file__),
                                       os.pardir,
                                       os.pardir))
sys.path.insert(0, top_dir)

import taskflow.engines
from taskflow.patterns import linear_flow as lf
from taskflow.patterns import unordered_flow as uf
from taskflow import task

# INTRO: These examples show how a linear flow and an unordered flow can be
# used together to execute calculations in parallel and then use the
# result for the next task/s. The adder task is used for all calculations
# and argument bindings are used to set correct parameters for each task.


# This task provides some values from a static set as a result of execution;
# this can be useful when you want to provide values to other tasks that
# depend on those values existing before those tasks can run.
#
# NOTE(harlowja): this usage is *deprecated* in favor of a simpler mechanism
# that provides those values on engine running by prepopulating the storage
# backend before your tasks are run (which accomplishes a similar goal in a
# more uniform manner).
class Provider(task.Task):
    def __init__(self, name, *args, **kwargs):
        super(Provider, self).__init__(name=name, **kwargs)
        self._provide = args

    def execute(self):
        return self._provide


# This task adds two input variables and returns the result of that addition.
#
# Note that since this task does not have a revert() function (since addition
# is a stateless operation) there are no side-effects that this function needs
# to undo if some later operation fails.
class Adder(task.Task):
    def execute(self, x, y):
        return x + y


flow = lf.Flow('root').add(
    # Provide the initial values for other tasks to depend on.
    #
    # x1 = 2, y1 = 3, x2 = 5, y2 = 8
    Provider("provide-adder", 2, 3, 5, 8,
             provides=('x1', 'y1', 'x2', 'y2')),
    # Note here that we define the flow that contains the 2 adders to be an
    # unordered flow since the order in which these execute does not matter;
    # another way to solve this would be to use a graph_flow pattern, which
    # also can run in parallel (since they have no ordering dependencies).
    uf.Flow('adders').add(
        # Calculate 'z1 = x1+y1 = 5'
        #
        # Rebind here means that the execute() function x argument will be
        # satisfied from a previous output named 'x1', and the y argument
        # of execute() will be populated from the previous output named 'y1'
        #
        # The output (result of adding) will be mapped into a variable named
        # 'z1' which can then be referred to and depended on by other tasks.
        Adder(name="add", provides='z1', rebind=['x1', 'y1']),
        # z2 = x2+y2 = 13
        Adder(name="add-2", provides='z2', rebind=['x2', 'y2']),
    ),
    # r = z1+z2 = 18
    Adder(name="sum-1", provides='r', rebind=['z1', 'z2']))


# The result here will be all results (from all tasks), which is stored in an
# in-memory storage location that backs this engine since it is not configured
# with persistence storage.
result = taskflow.engines.run(flow, engine='parallel')
print(result)
Creating a volume (in parallel)¶
Note
Full source located at create_parallel_volume.
import contextlib
import logging
import os
import random
import sys
import time

logging.basicConfig(level=logging.ERROR)

top_dir = os.path.abspath(os.path.join(os.path.dirname(__file__),
                                       os.pardir,
                                       os.pardir))
sys.path.insert(0, top_dir)

from oslo_utils import reflection

from taskflow import engines
from taskflow.listeners import printing
from taskflow.patterns import unordered_flow as uf
from taskflow import task

# INTRO: These examples show how unordered_flow can be used to create a large
# number of fake volumes in parallel (or serially, depending on a constant that
# can be easily changed).


@contextlib.contextmanager
def show_time(name):
    start = time.time()
    yield
    end = time.time()
    print(" -- %s took %0.3f seconds" % (name, end - start))


# This affects how many volumes to create and how much time to *simulate*
# passing for that volume to be created.
MAX_CREATE_TIME = 3
VOLUME_COUNT = 5

# This will be used to determine if all the volumes are created in parallel
# or whether the volumes are created serially (in an undefined order, since
# an unordered flow is used). Note that there is a disconnect between the
# ordering and the concept of parallelism (since unordered items can still be
# run in a serial ordering). A typical use-case for offering both is to allow
# for debugging using a serial approach, while when running at a larger scale
# one would likely want to use the parallel approach.
#
# If you switch this flag from serial to parallel you can see the overall
# time difference that this causes.
SERIAL = False
if SERIAL:
    engine = 'serial'
else:
    engine = 'parallel'


class VolumeCreator(task.Task):
    def __init__(self, volume_id):
        # Note here that the volume name is composed of the name of the class
        # along with the volume id that is being created; since a name of a
        # task uniquely identifies that task in storage it is important that
        # the name be relevant and identifiable if the task is recreated for
        # subsequent resumption (if applicable).
        #
        # UUIDs are *not* used as they can not be tied back to a previous
        # task's state on resumption (since they are unique and will vary for
        # each task that is created). A name based off the volume id that is
        # to be created is more easily tied back to the original task so that
        # the volume create can be resumed/reverted, and is much easier to use
        # for audit and tracking purposes.
        base_name = reflection.get_callable_name(self)
        super(VolumeCreator, self).__init__(name="%s-%s" % (base_name,
                                                            volume_id))
        self._volume_id = volume_id

    def execute(self):
        print("Making volume %s" % (self._volume_id))
        time.sleep(random.random() * MAX_CREATE_TIME)
        print("Finished making volume %s" % (self._volume_id))


# Assume there is no ordering dependency between volumes.
flow = uf.Flow("volume-maker")
for i in range(0, VOLUME_COUNT):
    flow.add(VolumeCreator(volume_id="vol-%s" % (i)))


# Show how much time the overall engine loading and running takes.
with show_time(name=flow.name.title()):
    eng = engines.load(flow, engine=engine)
    # This context manager automatically adds (and automatically removes) a
    # helpful set of state transition notification printing helper utilities
    # that show you exactly what transitions the engine is going through
    # while running the various volume create tasks.
    with printing.PrintingListener(eng):
        eng.run()
Summation mapper(s) and reducer (in parallel)¶
Note
Full source located at simple_map_reduce.
import logging
import os
import sys

logging.basicConfig(level=logging.ERROR)

self_dir = os.path.abspath(os.path.dirname(__file__))
top_dir = os.path.abspath(os.path.join(os.path.dirname(__file__),
                                       os.pardir,
                                       os.pardir))
sys.path.insert(0, top_dir)
sys.path.insert(0, self_dir)

# INTRO: These examples show a simplistic map/reduce implementation where
# a set of mapper(s) will sum a series of input numbers (in parallel) and
# return their individual summed result. A reducer will then use those
# produced values and perform a final summation and this result will then be
# printed (and verified to ensure the calculation was as expected).

from taskflow import engines
from taskflow.patterns import linear_flow
from taskflow.patterns import unordered_flow
from taskflow import task


class SumMapper(task.Task):
    def execute(self, inputs):
        # Sums some set of provided inputs.
        return sum(inputs)


class TotalReducer(task.Task):
    def execute(self, *args, **kwargs):
        # Reduces all mapped summed outputs into a single value.
        total = 0
        for (k, v) in kwargs.items():
            # If any other kwargs were passed in, we don't want to use those
            # in the calculation of the total...
            if k.startswith('reduction_'):
                total += v
        return total


def chunk_iter(chunk_size, upperbound):
    """Yields back chunk size pieces from zero to upperbound - 1."""
    chunk = []
    for i in range(0, upperbound):
        chunk.append(i)
        if len(chunk) == chunk_size:
            yield chunk
            chunk = []


# Upper bound of numbers to sum for example purposes...
UPPER_BOUND = 10000

# How many mappers we want to have.
SPLIT = 10

# How big of a chunk we want to give each mapper.
CHUNK_SIZE = UPPER_BOUND // SPLIT

# This will be the workflow we will compose and run.
w = linear_flow.Flow("root")

# The mappers will run in parallel.
store = {}
provided = []
mappers = unordered_flow.Flow('map')
for i, chunk in enumerate(chunk_iter(CHUNK_SIZE, UPPER_BOUND)):
    mapper_name = 'mapper_%s' % i
    # Give that mapper some information to compute.
    store[mapper_name] = chunk
    # The reducer uses all of the outputs of the mappers, so it needs
    # to be recorded that it needs access to them (under a specific name).
    provided.append("reduction_%s" % i)
    mappers.add(SumMapper(name=mapper_name,
                          rebind={'inputs': mapper_name},
                          provides=provided[-1]))
w.add(mappers)

# The reducer will run last (after all the mappers).
w.add(TotalReducer('reducer', requires=provided))

# Now go!
e = engines.load(w, engine='parallel', store=store, max_workers=4)
print("Running a parallel engine with options: %s" % e.options)
e.run()

# Now get the result the reducer created.
total = e.storage.get('reducer')
print("Calculated result = %s" % total)

# Calculate it manually to verify that it worked...
calc_total = sum(range(0, UPPER_BOUND))
if calc_total != total:
    sys.exit(1)
Storing & emitting a bill¶
Note
Full source located at fake_billing.
import json
import logging
import os
import sys
import time

logging.basicConfig(level=logging.ERROR)

top_dir = os.path.abspath(os.path.join(os.path.dirname(__file__),
                                       os.pardir,
                                       os.pardir))
sys.path.insert(0, top_dir)

from oslo_utils import uuidutils

from taskflow import engines
from taskflow.listeners import printing
from taskflow.patterns import graph_flow as gf
from taskflow.patterns import linear_flow as lf
from taskflow import task
from taskflow.utils import misc

# INTRO: This example walks through a miniature workflow which simulates
# the reception of an API request, creation of a database entry, driver
# activation (which invokes a 'fake' webservice) and final completion.
#
# This example also shows how a function/object (in this case the url sending)
# that occurs during driver activation can update the progress of a task
# without being aware of the internals of how to do this by associating a
# callback that the url sending can update as the sending progresses from 0.0%
# complete to 100% complete.


class DB(object):
    def query(self, sql):
        print("Querying with: %s" % (sql))


class UrlCaller(object):
    def __init__(self):
        self._send_time = 0.5
        self._chunks = 25

    def send(self, url, data, status_cb=None):
        sleep_time = float(self._send_time) / self._chunks
        for i in range(0, len(data)):
            time.sleep(sleep_time)
            # As we send the data, each chunk we 'fake' send will progress
            # the sending progress that much further to 100%.
            if status_cb:
                status_cb(float(i) / len(data))


# Since engines save the output of tasks to an optional persistent storage
# backend, resources have to be dealt with in a slightly different manner since
# resources are transient and can *not* be persisted (or serialized). For tasks
# that require access to a set of resources it is a common pattern to provide
# an object (in this case this object) on construction of those tasks via the
# task constructor.
class ResourceFetcher(object):
    def __init__(self):
        self._db_handle = None
        self._url_handle = None

    @property
    def db_handle(self):
        if self._db_handle is None:
            self._db_handle = DB()
        return self._db_handle

    @property
    def url_handle(self):
        if self._url_handle is None:
            self._url_handle = UrlCaller()
        return self._url_handle


class ExtractInputRequest(task.Task):
    def __init__(self, resources):
        super(ExtractInputRequest, self).__init__(provides="parsed_request")
        self._resources = resources

    def execute(self, request):
        return {
            'user': request.user,
            'user_id': misc.as_int(request.id),
            'request_id': uuidutils.generate_uuid(),
        }


class MakeDBEntry(task.Task):
    def __init__(self, resources):
        super(MakeDBEntry, self).__init__()
        self._resources = resources

    def execute(self, parsed_request):
        db_handle = self._resources.db_handle
        db_handle.query("INSERT %s INTO mydb" % (parsed_request))

    def revert(self, result, parsed_request):
        db_handle = self._resources.db_handle
        db_handle.query("DELETE %s FROM mydb IF EXISTS" % (parsed_request))


class ActivateDriver(task.Task):
    def __init__(self, resources):
        super(ActivateDriver, self).__init__(provides='sent_to')
        self._resources = resources
        self._url = "http://blahblah.com"

    def execute(self, parsed_request):
        print("Sending billing data to %s" % (self._url))
        url_sender = self._resources.url_handle
        # Note that here we attach our update_progress function (which is a
        # function that the engine also 'binds' to) to the progress function
        # that the url sending helper class uses. This allows the task progress
        # to be tied to the url sending progress, which is very useful for
        # downstream systems to be aware of what a task is doing at any time.
        url_sender.send(self._url, json.dumps(parsed_request),
                        status_cb=self.update_progress)
        return self._url

    def update_progress(self, progress, **kwargs):
        # Override the parent method to also print out the status.
        super(ActivateDriver, self).update_progress(progress, **kwargs)
        print("%s is %0.2f%% done" % (self.name, progress * 100))


class DeclareSuccess(task.Task):
    def execute(self, sent_to):
        print("Done!")
        print("All data processed and sent to %s" % (sent_to))


class DummyUser(object):
    def __init__(self, user, id_):
        self.user = user
        self.id = id_


# Resources (db handles and similar) of course can *not* be persisted so we
# need to make sure that we pass this resource fetcher to the tasks' constructor
# so that the tasks have access to any needed resources (the resources are
# lazily loaded so that they are only created when they are used).
resources = ResourceFetcher()
flow = lf.Flow("initialize-me")

# 1. First we extract the api request into a usable format.
# 2. Then we go ahead and make a database entry for our request.
flow.add(ExtractInputRequest(resources), MakeDBEntry(resources))

# 3. Then we activate our payment method and finally declare success.
sub_flow = gf.Flow("after-initialize")
sub_flow.add(ActivateDriver(resources), DeclareSuccess())
flow.add(sub_flow)

# Initially populate the storage with the following request object;
# prepopulating this allows the tasks that depend on the 'request' variable
# to start processing (in this case this is the ExtractInputRequest task).
store = {
    'request': DummyUser(user="bob", id_="1.35"),
}
eng = engines.load(flow, engine='serial', store=store)

# This context manager automatically adds (and automatically removes) a
# helpful set of state transition notification printing helper utilities
# that show you exactly what transitions the engine is going through
# while running the various billing related tasks.
with printing.PrintingListener(eng):
    eng.run()
Suspending a workflow & resuming¶
Note
Full source located at resume_from_backend.
1
2import contextlib
3import logging
4import os
5import sys
6
7logging.basicConfig(level=logging.ERROR)
8
9self_dir = os.path.abspath(os.path.dirname(__file__))
10top_dir = os.path.abspath(os.path.join(os.path.dirname(__file__),
11 os.pardir,
12 os.pardir))
13sys.path.insert(0, top_dir)
14sys.path.insert(0, self_dir)
15
16from oslo_utils import uuidutils
17
18import taskflow.engines
19from taskflow.patterns import linear_flow as lf
20from taskflow.persistence import models
21from taskflow import task
22
23import example_utils as eu # noqa
24
25# INTRO: In this example linear_flow is used to group three tasks, one which
26# will suspend the future work the engine may do. This suspend engine is then
27# discarded and the workflow is reloaded from the persisted data and then the
28# workflow is resumed from where it was suspended. This allows you to see how
29# to start an engine, have a task stop the engine from doing future work (if
30# a multi-threaded engine is being used, then the currently active work is not
31# preempted) and then resume the work later.
32#
33# Usage:
34#
35# With a filesystem directory as backend
36#
37# python taskflow/examples/resume_from_backend.py
38#
39# With ZooKeeper as backend
40#
41# python taskflow/examples/resume_from_backend.py \
42# zookeeper://127.0.0.1:2181/taskflow/resume_from_backend/
43
44
45# UTILITY FUNCTIONS #########################################
46
47
48def print_task_states(flowdetail, msg):
49 eu.print_wrapped(msg)
50 print("Flow '%s' state: %s" % (flowdetail.name, flowdetail.state))
51 # Sort by these so that our test validation doesn't get confused by the
52    # order in which the items in the flow detail may appear.
53 items = sorted((td.name, td.version, td.state, td.results)
54 for td in flowdetail)
55 for item in items:
56 print(" %s==%s: %s, result=%s" % item)
57
58
59def find_flow_detail(backend, lb_id, fd_id):
60 conn = backend.get_connection()
61 lb = conn.get_logbook(lb_id)
62 return lb.find(fd_id)
63
64
65# CREATE FLOW ###############################################
66
67
68class InterruptTask(task.Task):
69 def execute(self):
70 # DO NOT TRY THIS AT HOME
71 engine.suspend()
72
73
74class TestTask(task.Task):
75 def execute(self):
76 print('executing %s' % self)
77 return 'ok'
78
79
80def flow_factory():
81 return lf.Flow('resume from backend example').add(
82 TestTask(name='first'),
83 InterruptTask(name='boom'),
84 TestTask(name='second'))
85
86
87# INITIALIZE PERSISTENCE ####################################
88
89with eu.get_backend() as backend:
90
91 # Create a place where the persistence information will be stored.
92 book = models.LogBook("example")
93 flow_detail = models.FlowDetail("resume from backend example",
94 uuid=uuidutils.generate_uuid())
95 book.add(flow_detail)
96 with contextlib.closing(backend.get_connection()) as conn:
97 conn.save_logbook(book)
98
99 # CREATE AND RUN THE FLOW: FIRST ATTEMPT ####################
100
101 flow = flow_factory()
102 engine = taskflow.engines.load(flow, flow_detail=flow_detail,
103 book=book, backend=backend)
104
105 print_task_states(flow_detail, "At the beginning, there is no state")
106 eu.print_wrapped("Running")
107 engine.run()
108 print_task_states(flow_detail, "After running")
109
110 # RE-CREATE, RESUME, RUN ####################################
111
112 eu.print_wrapped("Resuming and running again")
113
114    # NOTE(harlowja): reload the flow detail from the backend; this will allow us
115 # to resume the flow from its suspended state, but first we need to search
116 # for the right flow details in the correct logbook where things are
117 # stored.
118 #
119 # We could avoid re-loading the engine and just do engine.run() again, but
120 # this example shows how another process may unsuspend a given flow and
121    # start it again for situations where this is useful to do (say the process
122 # running the above flow crashes).
123 flow2 = flow_factory()
124 flow_detail_2 = find_flow_detail(backend, book.uuid, flow_detail.uuid)
125 engine2 = taskflow.engines.load(flow2,
126 flow_detail=flow_detail_2,
127 backend=backend, book=book)
128 engine2.run()
129 print_task_states(flow_detail_2, "At the end")
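As the NOTE above mentions, re-loading from the backend is only required when a different process (or a later run) performs the resumption; the very same engine object could simply be run again. A minimal in-memory sketch of that shorter path, reusing flow_factory() and the module-level engine name from the listing and involving no persistence backend at all:

    engine = taskflow.engines.load(flow_factory())
    engine.run()   # InterruptTask suspends the engine partway through.
    engine.run()   # Running the same engine again finishes the remaining work.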
Creating a virtual machine (resumable)¶
Note
Full source located at resume_vm_boot
1
2import contextlib
3import hashlib
4import logging
5import os
6import random
7import sys
8import time
9
10logging.basicConfig(level=logging.ERROR)
11
12self_dir = os.path.abspath(os.path.dirname(__file__))
13top_dir = os.path.abspath(os.path.join(os.path.dirname(__file__),
14 os.pardir,
15 os.pardir))
16sys.path.insert(0, top_dir)
17sys.path.insert(0, self_dir)
18
19import futurist
20from oslo_utils import uuidutils
21
22from taskflow import engines
23from taskflow import exceptions as exc
24from taskflow.patterns import graph_flow as gf
25from taskflow.patterns import linear_flow as lf
26from taskflow.persistence import models
27from taskflow import task
28
29import example_utils as eu # noqa
30
31# INTRO: This example shows how a hierarchy of flows can be used to create a
32# vm in a reliable & resumable manner using taskflow + a miniature version of
33# what nova does while booting a vm.
34
35
36@contextlib.contextmanager
37def slow_down(how_long=0.5):
38 try:
39 yield how_long
40 finally:
41 if len(sys.argv) > 1:
42        # Only bother to do this if user input was provided.
43 print("** Ctrl-c me please!!! **")
44 time.sleep(how_long)
45
46
47class PrintText(task.Task):
48 """Just inserts some text print outs in a workflow."""
49 def __init__(self, print_what, no_slow=False):
50 content_hash = hashlib.md5(print_what.encode('utf-8')).hexdigest()[0:8]
51 super(PrintText, self).__init__(name="Print: %s" % (content_hash))
52 self._text = print_what
53 self._no_slow = no_slow
54
55 def execute(self):
56 if self._no_slow:
57 eu.print_wrapped(self._text)
58 else:
59 with slow_down():
60 eu.print_wrapped(self._text)
61
62
63class DefineVMSpec(task.Task):
64 """Defines a vm specification to be."""
65 def __init__(self, name):
66 super(DefineVMSpec, self).__init__(provides='vm_spec', name=name)
67
68 def execute(self):
69 return {
70 'type': 'kvm',
71 'disks': 2,
72 'vcpu': 1,
73 'ips': 1,
74 'volumes': 3,
75 }
76
77
78class LocateImages(task.Task):
79 """Locates where the vm images are."""
80 def __init__(self, name):
81 super(LocateImages, self).__init__(provides='image_locations',
82 name=name)
83
84 def execute(self, vm_spec):
85 image_locations = {}
86 for i in range(0, vm_spec['disks']):
87 url = "http://www.yahoo.com/images/%s" % (i)
88 image_locations[url] = "/tmp/%s.img" % (i)
89 return image_locations
90
91
92class DownloadImages(task.Task):
93 """Downloads all the vm images."""
94 def __init__(self, name):
95 super(DownloadImages, self).__init__(provides='download_paths',
96 name=name)
97
98 def execute(self, image_locations):
99 for src, loc in image_locations.items():
100 with slow_down(1):
101 print("Downloading from %s => %s" % (src, loc))
102 return sorted(image_locations.values())
103
104
105class CreateNetworkTpl(task.Task):
106 """Generates the network settings file to be placed in the images."""
107 SYSCONFIG_CONTENTS = """DEVICE=eth%s
108BOOTPROTO=static
109IPADDR=%s
110ONBOOT=yes"""
111
112 def __init__(self, name):
113 super(CreateNetworkTpl, self).__init__(provides='network_settings',
114 name=name)
115
116 def execute(self, ips):
117 settings = []
118 for i, ip in enumerate(ips):
119 settings.append(self.SYSCONFIG_CONTENTS % (i, ip))
120 return settings
121
122
123class AllocateIP(task.Task):
124 """Allocates the ips for the given vm."""
125 def __init__(self, name):
126 super(AllocateIP, self).__init__(provides='ips', name=name)
127
128 def execute(self, vm_spec):
129 ips = []
130 for _i in range(0, vm_spec.get('ips', 0)):
131 ips.append("192.168.0.%s" % (random.randint(1, 254)))
132 return ips
133
134
135class WriteNetworkSettings(task.Task):
136 """Writes all the network settings into the downloaded images."""
137 def execute(self, download_paths, network_settings):
138 for j, path in enumerate(download_paths):
139 with slow_down(1):
140 print("Mounting %s to /tmp/%s" % (path, j))
141 for i, setting in enumerate(network_settings):
142 filename = ("/tmp/etc/sysconfig/network-scripts/"
143 "ifcfg-eth%s" % (i))
144 with slow_down(1):
145 print("Writing to %s" % (filename))
146 print(setting)
147
148
149class BootVM(task.Task):
150 """Fires off the vm boot operation."""
151 def execute(self, vm_spec):
152 print("Starting vm!")
153 with slow_down(1):
154 print("Created: %s" % (vm_spec))
155
156
157class AllocateVolumes(task.Task):
158 """Allocates the volumes for the vm."""
159 def execute(self, vm_spec):
160 volumes = []
161 for i in range(0, vm_spec['volumes']):
162 with slow_down(1):
163 volumes.append("/dev/vda%s" % (i + 1))
164 print("Allocated volume %s" % volumes[-1])
165 return volumes
166
167
168class FormatVolumes(task.Task):
169 """Formats the volumes for the vm."""
170 def execute(self, volumes):
171 for v in volumes:
172 print("Formatting volume %s" % v)
173 with slow_down(1):
174 pass
175 print("Formatted volume %s" % v)
176
177
178def create_flow():
179 # Setup the set of things to do (mini-nova).
180 flow = lf.Flow("root").add(
181 PrintText("Starting vm creation.", no_slow=True),
182 lf.Flow('vm-maker').add(
183 # First create a specification for the final vm to-be.
184 DefineVMSpec("define_spec"),
185 # This does all the image stuff.
186 gf.Flow("img-maker").add(
187 LocateImages("locate_images"),
188 DownloadImages("download_images"),
189 ),
190 # This does all the network stuff.
191 gf.Flow("net-maker").add(
192 AllocateIP("get_my_ips"),
193 CreateNetworkTpl("fetch_net_settings"),
194 WriteNetworkSettings("write_net_settings"),
195 ),
196 # This does all the volume stuff.
197 gf.Flow("volume-maker").add(
198 AllocateVolumes("allocate_my_volumes", provides='volumes'),
199 FormatVolumes("volume_formatter"),
200 ),
201 # Finally boot it all.
202 BootVM("boot-it"),
203 ),
204 # Ya it worked!
205 PrintText("Finished vm create.", no_slow=True),
206 PrintText("Instance is running!", no_slow=True))
207 return flow
208
209eu.print_wrapped("Initializing")
210
211# Setup the persistence & resumption layer.
212with eu.get_backend() as backend:
213
214 # Try to find a previously passed in tracking id...
215 try:
216 book_id, flow_id = sys.argv[2].split("+", 1)
217 if not uuidutils.is_uuid_like(book_id):
218 book_id = None
219 if not uuidutils.is_uuid_like(flow_id):
220 flow_id = None
221 except (IndexError, ValueError):
222 book_id = None
223 flow_id = None
224
225 # Set up how we want our engine to run, serial, parallel...
226 try:
227 executor = futurist.GreenThreadPoolExecutor(max_workers=5)
228 except RuntimeError:
229 # No eventlet installed, just let the default be used instead.
230 executor = None
231
232    # Create/fetch a logbook that will track the workflow's work.
233 book = None
234 flow_detail = None
235 if all([book_id, flow_id]):
236 # Try to find in a prior logbook and flow detail...
237 with contextlib.closing(backend.get_connection()) as conn:
238 try:
239 book = conn.get_logbook(book_id)
240 flow_detail = book.find(flow_id)
241 except exc.NotFound:
242 pass
243 if book is None and flow_detail is None:
244 book = models.LogBook("vm-boot")
245 with contextlib.closing(backend.get_connection()) as conn:
246 conn.save_logbook(book)
247 engine = engines.load_from_factory(create_flow,
248 backend=backend, book=book,
249 engine='parallel',
250 executor=executor)
251 print("!! Your tracking id is: '%s+%s'" % (book.uuid,
252 engine.storage.flow_uuid))
253 print("!! Please submit this on later runs for tracking purposes")
254 else:
255 # Attempt to load from a previously partially completed flow.
256 engine = engines.load_from_detail(flow_detail, backend=backend,
257 engine='parallel', executor=executor)
258
259 # Make me my vm please!
260 eu.print_wrapped('Running')
261 engine.run()
262
263# How to use.
264#
265# 1. $ python me.py "sqlite:////tmp/nova.db"
266# 2. ctrl-c before this finishes
267# 3. Find the tracking id (search for 'Your tracking id is')
268# 4. $ python me.py "sqlite:////tmp/nova.db" "$tracking_id"
269# 5. Watch it pick up where it left off.
270# 6. Profit!
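The flow returned by create_flow() does not itself depend on the persistence machinery; every input a task needs is produced by another task in the hierarchy. For a quick local check, a sketch that runs it once with the default engine (no logbook, no tracking id, no resumption) could be as small as:

    from taskflow import engines

    # Runs the whole mini-nova flow once, start to finish, in this process.
    engines.run(create_flow())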
Creating a volume (resumable)¶
Note
Full source located at resume_volume_create
1
2import contextlib
3import hashlib
4import logging
5import os
6import random
7import sys
8import time
9
10logging.basicConfig(level=logging.ERROR)
11
12self_dir = os.path.abspath(os.path.dirname(__file__))
13top_dir = os.path.abspath(os.path.join(os.path.dirname(__file__),
14 os.pardir,
15 os.pardir))
16sys.path.insert(0, top_dir)
17sys.path.insert(0, self_dir)
18
19from oslo_utils import uuidutils
20
21from taskflow import engines
22from taskflow.patterns import graph_flow as gf
23from taskflow.patterns import linear_flow as lf
24from taskflow.persistence import models
25from taskflow import task
26
27import example_utils # noqa
28
29# INTRO: This example shows how a hierarchy of flows can be used to create a
30# pseudo-volume in a reliable & resumable manner using taskflow + a miniature
31# version of what cinder does while creating a volume (very miniature).
32
33
34@contextlib.contextmanager
35def slow_down(how_long=0.5):
36 try:
37 yield how_long
38 finally:
39 print("** Ctrl-c me please!!! **")
40 time.sleep(how_long)
41
42
43def find_flow_detail(backend, book_id, flow_id):
44 # NOTE(harlowja): this is used to attempt to find a given logbook with
45    # a given id and a given flow detail inside that logbook; we need this
46    # reference so that we can resume the correct flow (as a logbook tracks
47    # flows and a flow detail tracks an individual flow).
48 #
49 # Without a reference to the logbook and the flow details in that logbook
50 # we will not know exactly what we should resume and that would mean we
51 # can't resume what we don't know.
52 with contextlib.closing(backend.get_connection()) as conn:
53 lb = conn.get_logbook(book_id)
54 return lb.find(flow_id)
55
56
57class PrintText(task.Task):
58 def __init__(self, print_what, no_slow=False):
59 content_hash = hashlib.md5(print_what.encode('utf-8')).hexdigest()[0:8]
60 super(PrintText, self).__init__(name="Print: %s" % (content_hash))
61 self._text = print_what
62 self._no_slow = no_slow
63
64 def execute(self):
65 if self._no_slow:
66 print("-" * (len(self._text)))
67 print(self._text)
68 print("-" * (len(self._text)))
69 else:
70 with slow_down():
71 print("-" * (len(self._text)))
72 print(self._text)
73 print("-" * (len(self._text)))
74
75
76class CreateSpecForVolumes(task.Task):
77 def execute(self):
78 volumes = []
79 for i in range(0, random.randint(1, 10)):
80 volumes.append({
81 'type': 'disk',
82 'location': "/dev/vda%s" % (i + 1),
83 })
84 return volumes
85
86
87class PrepareVolumes(task.Task):
88 def execute(self, volume_specs):
89 for v in volume_specs:
90 with slow_down():
91 print("Dusting off your hard drive %s" % (v))
92 with slow_down():
93 print("Taking a well deserved break.")
94 print("Your drive %s has been certified." % (v))
95
96
97# Setup the set of things to do (mini-cinder).
98flow = lf.Flow("root").add(
99 PrintText("Starting volume create", no_slow=True),
100 gf.Flow('maker').add(
101 CreateSpecForVolumes("volume_specs", provides='volume_specs'),
102 PrintText("I need a nap, it took me a while to build those specs."),
103 PrepareVolumes(),
104 ),
105 PrintText("Finished volume create", no_slow=True))
106
107# Setup the persistence & resumption layer.
108with example_utils.get_backend() as backend:
109 try:
110 book_id, flow_id = sys.argv[2].split("+", 1)
111 except (IndexError, ValueError):
112 book_id = None
113 flow_id = None
114
115 if not all([book_id, flow_id]):
116 # If no 'tracking id' (think a fedex or ups tracking id) is provided
117 # then we create one by creating a logbook (where flow details are
118 # stored) and creating a flow detail (where flow and task state is
119        # stored). The combination of these 2 objects' unique ids (uuids) allows
120 # the users of taskflow to reassociate the workflows that were
121 # potentially running (and which may have partially completed) back
122 # with taskflow so that those workflows can be resumed (or reverted)
123        # after a process/thread/engine has failed in some way.
124 book = models.LogBook('resume-volume-create')
125 flow_detail = models.FlowDetail("root", uuid=uuidutils.generate_uuid())
126 book.add(flow_detail)
127 with contextlib.closing(backend.get_connection()) as conn:
128 conn.save_logbook(book)
129 print("!! Your tracking id is: '%s+%s'" % (book.uuid,
130 flow_detail.uuid))
131 print("!! Please submit this on later runs for tracking purposes")
132 else:
133 flow_detail = find_flow_detail(backend, book_id, flow_id)
134
135 # Load and run.
136 engine = engines.load(flow,
137 flow_detail=flow_detail,
138 backend=backend, engine='serial')
139 engine.run()
140
141# How to use.
142#
143# 1. $ python me.py "sqlite:////tmp/cinder.db"
144# 2. ctrl-c before this finishes
145# 3. Find the tracking id (search for 'Your tracking id is')
146# 4. $ python me.py "sqlite:////tmp/cinder.db" "$tracking_id"
147# 5. Profit!
Running engines via iteration¶
Note
Full source located at run_by_iter
1
2import logging
3import os
4import sys
5
6logging.basicConfig(level=logging.ERROR)
7
8self_dir = os.path.abspath(os.path.dirname(__file__))
9top_dir = os.path.abspath(os.path.join(os.path.dirname(__file__),
10 os.pardir,
11 os.pardir))
12sys.path.insert(0, top_dir)
13sys.path.insert(0, self_dir)
14
15
16from taskflow import engines
17from taskflow.patterns import linear_flow as lf
18from taskflow import task
19
20
21# INTRO: This example shows how to run a set of engines at the same time, each
22# engine running its own flow, using a single thread of control to iterate over
23# each engine (which causes that engine to advance to its next state during
24# each iteration).
25
26
27class EchoTask(task.Task):
28 def execute(self, value):
29 print(value)
30 return chr(ord(value) + 1)
31
32
33def make_alphabet_flow(i):
34 f = lf.Flow("alphabet_%s" % (i))
35 start_value = 'A'
36 end_value = 'Z'
37 curr_value = start_value
38 while ord(curr_value) <= ord(end_value):
39 next_value = chr(ord(curr_value) + 1)
40 if curr_value != end_value:
41 f.add(EchoTask(name="echoer_%s" % curr_value,
42 rebind={'value': curr_value},
43 provides=next_value))
44 else:
45 f.add(EchoTask(name="echoer_%s" % curr_value,
46 rebind={'value': curr_value}))
47 curr_value = next_value
48 return f
49
50
51# Adjust this number to change how many engines/flows run at once.
52flow_count = 1
53flows = []
54for i in range(0, flow_count):
55 f = make_alphabet_flow(i + 1)
56    flows.append(f)
57engine_iters = []
58for f in flows:
59 e = engines.load(f)
60 e.compile()
61 e.storage.inject({'A': 'A'})
62 e.prepare()
63 engine_iters.append(e.run_iter())
64while engine_iters:
65 for it in list(engine_iters):
66 try:
67 print(next(it))
68 except StopIteration:
69 engine_iters.remove(it)
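Once the loop above drains every iterator, each engine's storage still holds everything its tasks provided. A follow-on sketch (keeping references to the engines rather than only their run_iter() iterators, and using the same storage.fetch_all() call that appears in the distributed example further below) could inspect those results:

    alphabet_engines = [engines.load(f) for f in flows]
    for e in alphabet_engines:
        e.compile()
        e.storage.inject({'A': 'A'})
        e.prepare()

    iters = [e.run_iter() for e in alphabet_engines]
    while iters:
        for it in list(iters):
            try:
                next(it)
            except StopIteration:
                iters.remove(it)

    for e in alphabet_engines:
        # The injected 'A' plus every provided letter (B..Z) ends up in storage.
        print(sorted(e.storage.fetch_all().keys()))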
Controlling retries using a retry controller¶
Note
Full source located at retry_flow
1
2import logging
3import os
4import sys
5
6logging.basicConfig(level=logging.ERROR)
7
8top_dir = os.path.abspath(os.path.join(os.path.dirname(__file__),
9 os.pardir,
10 os.pardir))
11sys.path.insert(0, top_dir)
12
13import taskflow.engines
14from taskflow.patterns import linear_flow as lf
15from taskflow import retry
16from taskflow import task
17
18# INTRO: In this example we create a retry controller that receives a phone
19# directory and tries different phone numbers. The next task tries to call Jim
20# using the given number. If it is not a Jim's number, the task raises an
21# exception and retry controller takes the next number from the phone
22# directory and retries the call.
23#
24# This example shows a basic usage of retry controllers in a flow.
25# Retry controllers allow reverting and retrying a failed subflow with new
26# parameters.
27
28
29class CallJim(task.Task):
30 def execute(self, jim_number):
31 print("Calling jim %s." % jim_number)
32 if jim_number != 555:
33 raise Exception("Wrong number!")
34 else:
35 print("Hello Jim!")
36
37 def revert(self, jim_number, **kwargs):
38 print("Wrong number, apologizing.")
39
40
41# Create your flow and associated tasks (the work to be done).
42flow = lf.Flow('retrying-linear',
43 retry=retry.ParameterizedForEach(
44 rebind=['phone_directory'],
45 provides='jim_number')).add(CallJim())
46
47# Now run that flow using the provided initial data (store below).
48taskflow.engines.run(flow, store={'phone_directory': [333, 444, 555, 666]})
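The same ParameterizedForEach wiring fits any "try candidates until one succeeds" problem: each time the subflow is reverted, the retry controller feeds the next value from the rebound collection back in. An illustrative sketch (the Download task and the mirror URLs are hypothetical; only the classes and calls shown above are used):

    import taskflow.engines
    from taskflow.patterns import linear_flow as lf
    from taskflow import retry
    from taskflow import task


    class Download(task.Task):
        def execute(self, mirror_url):
            # Pretend only the last mirror is reachable (purely illustrative).
            if mirror_url != 'http://mirror3':
                raise IOError("unreachable: %s" % mirror_url)
            print("fetched from %s" % mirror_url)

        def revert(self, mirror_url, **kwargs):
            print("%s failed, trying the next mirror..." % mirror_url)


    flow = lf.Flow('mirror-retry',
                   retry=retry.ParameterizedForEach(
                       rebind=['mirrors'],
                       provides='mirror_url')).add(Download())

    taskflow.engines.run(flow, store={'mirrors': ['http://mirror1',
                                                  'http://mirror2',
                                                  'http://mirror3']})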
Distributed execution (simple)¶
Note
Full source located at wbe_simple_linear
1
2import json
3import logging
4import os
5import sys
6import tempfile
7
8top_dir = os.path.abspath(os.path.join(os.path.dirname(__file__),
9 os.pardir,
10 os.pardir))
11sys.path.insert(0, top_dir)
12
13from taskflow import engines
14from taskflow.engines.worker_based import worker
15from taskflow.patterns import linear_flow as lf
16from taskflow.tests import utils
17from taskflow.utils import threading_utils
18
19import example_utils # noqa
20
21# INTRO: This example walks through a miniature workflow which shows how to
22# start up a number of workers (these workers will process task execution and
23# reversion requests using any provided input data), then build a flow out of
24# tasks those workers are *capable* of running (the engine can not dispatch
25# tasks that the workers are not able to run; that would end in failure), and
26# finally execute that workflow seamlessly using those workers to perform the
27# actual execution.
28#
29# NOTE(harlowja): this example simulates the expected larger number of workers
30# by using a set of threads (which in this example simulate the remote workers
31# that would typically be running on other external machines).
32
33# A filesystem can also be used as the queue transport (useful as a simple
34# transport type that does not involve setting up a larger mq system). If this
35# is false then the memory transport is used instead; both work in standalone
36# setups.
37USE_FILESYSTEM = False
38BASE_SHARED_CONF = {
39 'exchange': 'taskflow',
40}
41
42# Until https://github.com/celery/kombu/issues/398 is resolved it is not
43# recommended to run many worker threads in this example due to the types
44# of errors mentioned in that issue.
45MEMORY_WORKERS = 2
46FILE_WORKERS = 1
47WORKER_CONF = {
48    # These are the tasks the worker can execute; they *must* be importable,
49 # typically this list is used to restrict what workers may execute to
50 # a smaller set of *allowed* tasks that are known to be safe (one would
51 # not want to allow all python code to be executed).
52 'tasks': [
53 'taskflow.tests.utils:TaskOneArgOneReturn',
54 'taskflow.tests.utils:TaskMultiArgOneReturn'
55 ],
56}
57
58
59def run(engine_options):
60 flow = lf.Flow('simple-linear').add(
61 utils.TaskOneArgOneReturn(provides='result1'),
62 utils.TaskMultiArgOneReturn(provides='result2')
63 )
64 eng = engines.load(flow,
65 store=dict(x=111, y=222, z=333),
66 engine='worker-based', **engine_options)
67 eng.run()
68 return eng.storage.fetch_all()
69
70
71if __name__ == "__main__":
72 logging.basicConfig(level=logging.ERROR)
73
74 # Setup our transport configuration and merge it into the worker and
75 # engine configuration so that both of those use it correctly.
76 shared_conf = dict(BASE_SHARED_CONF)
77
78 tmp_path = None
79 if USE_FILESYSTEM:
80 worker_count = FILE_WORKERS
81 tmp_path = tempfile.mkdtemp(prefix='wbe-example-')
82 shared_conf.update({
83 'transport': 'filesystem',
84 'transport_options': {
85 'data_folder_in': tmp_path,
86 'data_folder_out': tmp_path,
87 'polling_interval': 0.1,
88 },
89 })
90 else:
91 worker_count = MEMORY_WORKERS
92 shared_conf.update({
93 'transport': 'memory',
94 'transport_options': {
95 'polling_interval': 0.1,
96 },
97 })
98 worker_conf = dict(WORKER_CONF)
99 worker_conf.update(shared_conf)
100 engine_options = dict(shared_conf)
101 workers = []
102 worker_topics = []
103
104 try:
105 # Create a set of workers to simulate actual remote workers.
106 print('Running %s workers.' % (worker_count))
107 for i in range(0, worker_count):
108 worker_conf['topic'] = 'worker-%s' % (i + 1)
109 worker_topics.append(worker_conf['topic'])
110 w = worker.Worker(**worker_conf)
111 runner = threading_utils.daemon_thread(w.run)
112 runner.start()
113 w.wait()
114 workers.append((runner, w.stop))
115
116 # Now use those workers to do something.
117 print('Executing some work.')
118 engine_options['topics'] = worker_topics
119 result = run(engine_options)
120 print('Execution finished.')
121 # This is done so that the test examples can work correctly
122 # even when the keys change order (which will happen in various
123 # python versions).
124 print("Result = %s" % json.dumps(result, sort_keys=True))
125 finally:
126 # And cleanup.
127 print('Stopping workers.')
128 while workers:
129 r, stopper = workers.pop()
130 stopper()
131 r.join()
132 if tmp_path:
133 example_utils.rm_path(tmp_path)
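Nothing in the flow built by run() above is worker-specific. For comparison, a sketch that executes the very same two test tasks with an ordinary in-process engine (no workers and no transport configuration) looks like this:

    from taskflow import engines
    from taskflow.patterns import linear_flow as lf
    from taskflow.tests import utils

    flow = lf.Flow('simple-linear').add(
        utils.TaskOneArgOneReturn(provides='result1'),
        utils.TaskMultiArgOneReturn(provides='result2'))
    eng = engines.load(flow, store=dict(x=111, y=222, z=333))
    eng.run()
    # The same result1/result2 values the worker-based run produces.
    print(eng.storage.fetch_all())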
Distributed notification (simple)¶
Note
Full source located at wbe_event_sender
1
2import logging
3import os
4import string
5import sys
6import time
7
8top_dir = os.path.abspath(os.path.join(os.path.dirname(__file__),
9 os.pardir,
10 os.pardir))
11sys.path.insert(0, top_dir)
12
13from taskflow import engines
14from taskflow.engines.worker_based import worker
15from taskflow.patterns import linear_flow as lf
16from taskflow import task
17from taskflow.types import notifier
18from taskflow.utils import threading_utils
19
20ANY = notifier.Notifier.ANY
21
22# INTRO: This example shows how to use a remote worker's event notification
23# attribute to proxy back task event notifications to the controlling process.
24#
25# In this case a simple set of events is triggered by a worker running a
26# task (simulated to be remote by using a kombu memory transport and threads).
27# Those events that the 'remote worker' produces will then be proxied back to
28# the task that the engine is running 'remotely', and then they will be emitted
29# back to the original callbacks that exist in the originating engine
30# process/thread. This creates a one-way *notification* channel that can
31# transparently be used in-process or out-of-process (using remote workers and
32# so on), and that allows a task to signal to its controlling process some sort of
33# action that has occurred that the task may need to tell others about (for
34# example to trigger some type of response when the task reaches 50% done...).
35
36
37def event_receiver(event_type, details):
38 """This is the callback that (in this example) doesn't do much..."""
39    print("Received event '%s'" % event_type)
40 print("Details = %s" % details)
41
42
43class EventReporter(task.Task):
44 """This is the task that will be running 'remotely' (not really remote)."""
45
46 EVENTS = tuple(string.ascii_uppercase)
47 EVENT_DELAY = 0.1
48
49 def execute(self):
50 for i, e in enumerate(self.EVENTS):
51 details = {
52 'leftover': self.EVENTS[i:],
53 }
54 self.notifier.notify(e, details)
55 time.sleep(self.EVENT_DELAY)
56
57
58BASE_SHARED_CONF = {
59 'exchange': 'taskflow',
60 'transport': 'memory',
61 'transport_options': {
62 'polling_interval': 0.1,
63 },
64}
65
66# Until https://github.com/celery/kombu/issues/398 is resolved it is not
67# recommended to run many worker threads in this example due to the types
68# of errors mentioned in that issue.
69MEMORY_WORKERS = 1
70WORKER_CONF = {
71 'tasks': [
72 # Used to locate which tasks we can run (we don't want to allow
73        # arbitrary code/tasks to be run by any worker since that would
74 # open up a variety of vulnerabilities).
75 '%s:EventReporter' % (__name__),
76 ],
77}
78
79
80def run(engine_options):
81 reporter = EventReporter()
82 reporter.notifier.register(ANY, event_receiver)
83 flow = lf.Flow('event-reporter').add(reporter)
84 eng = engines.load(flow, engine='worker-based', **engine_options)
85 eng.run()
86
87
88if __name__ == "__main__":
89 logging.basicConfig(level=logging.ERROR)
90
91 # Setup our transport configuration and merge it into the worker and
92 # engine configuration so that both of those objects use it correctly.
93 worker_conf = dict(WORKER_CONF)
94 worker_conf.update(BASE_SHARED_CONF)
95 engine_options = dict(BASE_SHARED_CONF)
96 workers = []
97
98 # These topics will be used to request worker information on; those
99    # workers will respond with their capabilities, which the executing engine
100    # will use to match pending tasks to a capable worker. This will cause
101    # the task to be sent for execution, and the engine will wait until it
102    # is finished (a response is received); then the engine will either
103    # continue with other tasks, do some retry/failure resolution logic or
104    # stop (and potentially re-raise the remote worker's failure)...
105 worker_topics = []
106
107 try:
108 # Create a set of worker threads to simulate actual remote workers...
109 print('Running %s workers.' % (MEMORY_WORKERS))
110 for i in range(0, MEMORY_WORKERS):
111 # Give each one its own unique topic name so that they can
112 # correctly communicate with the engine (they will all share the
113 # same exchange).
114 worker_conf['topic'] = 'worker-%s' % (i + 1)
115 worker_topics.append(worker_conf['topic'])
116 w = worker.Worker(**worker_conf)
117 runner = threading_utils.daemon_thread(w.run)
118 runner.start()
119 w.wait()
120 workers.append((runner, w.stop))
121
122 # Now use those workers to do something.
123 print('Executing some work.')
124 engine_options['topics'] = worker_topics
125        run(engine_options)
126 print('Execution finished.')
127 finally:
128 # And cleanup.
129 print('Stopping workers.')
130 while workers:
131 r, stopper = workers.pop()
132 stopper()
133 r.join()
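The notification mechanism itself does not require workers; a task's notifier behaves the same way when the engine runs in-process. A minimal sketch (the Pinger task is hypothetical; only the register()/notify() calls and the ANY selector shown above are used):

    from taskflow import engines
    from taskflow import task
    from taskflow.patterns import linear_flow as lf
    from taskflow.types import notifier

    ANY = notifier.Notifier.ANY


    class Pinger(task.Task):
        def execute(self):
            # Emit a custom event; anything registered on this task's notifier sees it.
            self.notifier.notify('ping', {'from': self.name})


    def on_event(event_type, details):
        print("got '%s' with %s" % (event_type, details))


    pinger = Pinger()
    pinger.notifier.register(ANY, on_event)
    engines.load(lf.Flow('ping').add(pinger)).run()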
Distributed mandelbrot (complex)¶
Note
Full source located at wbe_mandelbrot
Output¶
Code¶
1
2import logging
3import math
4import os
5import sys
6
7top_dir = os.path.abspath(os.path.join(os.path.dirname(__file__),
8 os.pardir,
9 os.pardir))
10sys.path.insert(0, top_dir)
11
12from taskflow import engines
13from taskflow.engines.worker_based import worker
14from taskflow.patterns import unordered_flow as uf
15from taskflow import task
16from taskflow.utils import threading_utils
17
18# INTRO: This example walks through a workflow that will in parallel compute
19# a mandelbrot result set (using X 'remote' workers) and then combine their
20# results together to form a final mandelbrot fractal image. It shows a usage
21# of taskflow to perform a well-known embarrassingly parallel problem that has
22# the added benefit of also being an elegant visualization.
23#
24# NOTE(harlowja): this example simulates the expected larger number of workers
25# by using a set of threads (which in this example simulate the remote workers
26# that would typically be running on other external machines).
27#
28# NOTE(harlowja): to have it produce an image run (after installing pillow):
29#
30# $ python taskflow/examples/wbe_mandelbrot.py output.png
31
32BASE_SHARED_CONF = {
33 'exchange': 'taskflow',
34}
35WORKERS = 2
36WORKER_CONF = {
37    # These are the tasks the worker can execute; they *must* be importable,
38 # typically this list is used to restrict what workers may execute to
39 # a smaller set of *allowed* tasks that are known to be safe (one would
40 # not want to allow all python code to be executed).
41 'tasks': [
42 '%s:MandelCalculator' % (__name__),
43 ],
44}
45ENGINE_CONF = {
46 'engine': 'worker-based',
47}
48
49# Mandelbrot & image settings...
50IMAGE_SIZE = (512, 512)
51CHUNK_COUNT = 8
52MAX_ITERATIONS = 25
53
54
55class MandelCalculator(task.Task):
56 def execute(self, image_config, mandelbrot_config, chunk):
57 """Returns the number of iterations before the computation "escapes".
58
59 Given the real and imaginary parts of a complex number, determine if it
60 is a candidate for membership in the mandelbrot set given a fixed
61 number of iterations.
62 """
63
64 # Parts borrowed from (credit to mark harris and benoît mandelbrot).
65 #
66 # http://nbviewer.ipython.org/gist/harrism/f5707335f40af9463c43
67 def mandelbrot(x, y, max_iters):
68 c = complex(x, y)
69 z = 0.0j
70 for i in range(max_iters):
71 z = z * z + c
72 if (z.real * z.real + z.imag * z.imag) >= 4:
73 return i
74 return max_iters
75
76 min_x, max_x, min_y, max_y, max_iters = mandelbrot_config
77 height, width = image_config['size']
78 pixel_size_x = (max_x - min_x) / width
79 pixel_size_y = (max_y - min_y) / height
80 block = []
81 for y in range(chunk[0], chunk[1]):
82 row = []
83 imag = min_y + y * pixel_size_y
84 for x in range(0, width):
85 real = min_x + x * pixel_size_x
86 row.append(mandelbrot(real, imag, max_iters))
87 block.append(row)
88 return block
89
90
91def calculate(engine_conf):
92 # Subdivide the work into X pieces, then request each worker to calculate
93 # one of those chunks and then later we will write these chunks out to
94 # an image bitmap file.
95
96    # An unordered flow is used here since the mandelbrot calculation is an
97 # example of an embarrassingly parallel computation that we can scatter
98 # across as many workers as possible.
99 flow = uf.Flow("mandelbrot")
100
101 # These symbols will be automatically given to tasks as input to their
102 # execute method, in this case these are constants used in the mandelbrot
103 # calculation.
104 store = {
105 'mandelbrot_config': [-2.0, 1.0, -1.0, 1.0, MAX_ITERATIONS],
106 'image_config': {
107 'size': IMAGE_SIZE,
108 }
109 }
110
111 # We need the task names to be in the right order so that we can extract
112 # the final results in the right order (we don't care about the order when
113 # executing).
114 task_names = []
115
116 # Compose our workflow.
117 height, _width = IMAGE_SIZE
118 chunk_size = int(math.ceil(height / float(CHUNK_COUNT)))
119 for i in range(0, CHUNK_COUNT):
120 chunk_name = 'chunk_%s' % i
121 task_name = "calculation_%s" % i
122 # Break the calculation up into chunk size pieces.
123 rows = [i * chunk_size, i * chunk_size + chunk_size]
124 flow.add(
125 MandelCalculator(task_name,
126 # This ensures the storage symbol with name
127                             # 'chunk_name' is sent into the task's local
128 # symbol 'chunk'. This is how we give each
129 # calculator its own correct sequence of rows
130 # to work on.
131 rebind={'chunk': chunk_name}))
132 store[chunk_name] = rows
133 task_names.append(task_name)
134
135 # Now execute it.
136 eng = engines.load(flow, store=store, engine_conf=engine_conf)
137 eng.run()
138
139 # Gather all the results and order them for further processing.
140 gather = []
141 for name in task_names:
142 gather.extend(eng.storage.get(name))
143 points = []
144 for y, row in enumerate(gather):
145 for x, color in enumerate(row):
146 points.append(((x, y), color))
147 return points
148
149
150def write_image(results, output_filename=None):
151    print("Gathered %s results that represent a mandelbrot"
152 " image (using %s chunks that are computed jointly"
153 " by %s workers)." % (len(results), CHUNK_COUNT, WORKERS))
154 if not output_filename:
155 return
156
157 # Pillow (the PIL fork) saves us from writing our own image writer...
158 try:
159 from PIL import Image
160 except ImportError as e:
161 # To currently get this (may change in the future),
162 # $ pip install Pillow
163 raise RuntimeError("Pillow is required to write image files: %s" % e)
164
165 # Limit to 255, find the max and normalize to that...
166 color_max = 0
167 for _point, color in results:
168 color_max = max(color, color_max)
169
170 # Use gray scale since we don't really have other colors.
171 img = Image.new('L', IMAGE_SIZE, "black")
172 pixels = img.load()
173 for (x, y), color in results:
174 if color_max == 0:
175 color = 0
176 else:
177 color = int((float(color) / color_max) * 255.0)
178 pixels[x, y] = color
179 img.save(output_filename)
180
181
182def create_fractal():
183 logging.basicConfig(level=logging.ERROR)
184
185 # Setup our transport configuration and merge it into the worker and
186 # engine configuration so that both of those use it correctly.
187 shared_conf = dict(BASE_SHARED_CONF)
188 shared_conf.update({
189 'transport': 'memory',
190 'transport_options': {
191 'polling_interval': 0.1,
192 },
193 })
194
195 if len(sys.argv) >= 2:
196 output_filename = sys.argv[1]
197 else:
198 output_filename = None
199
200 worker_conf = dict(WORKER_CONF)
201 worker_conf.update(shared_conf)
202 engine_conf = dict(ENGINE_CONF)
203 engine_conf.update(shared_conf)
204 workers = []
205 worker_topics = []
206
207 print('Calculating your mandelbrot fractal of size %sx%s.' % IMAGE_SIZE)
208 try:
209 # Create a set of workers to simulate actual remote workers.
210 print('Running %s workers.' % (WORKERS))
211 for i in range(0, WORKERS):
212 worker_conf['topic'] = 'calculator_%s' % (i + 1)
213 worker_topics.append(worker_conf['topic'])
214 w = worker.Worker(**worker_conf)
215 runner = threading_utils.daemon_thread(w.run)
216 runner.start()
217 w.wait()
218 workers.append((runner, w.stop))
219
220 # Now use those workers to do something.
221 engine_conf['topics'] = worker_topics
222 results = calculate(engine_conf)
223 print('Execution finished.')
224 finally:
225 # And cleanup.
226 print('Stopping workers.')
227 while workers:
228 r, stopper = workers.pop()
229 stopper()
230 r.join()
231 print("Writing image...")
232 write_image(results, output_filename=output_filename)
233
234
235if __name__ == "__main__":
236 create_fractal()
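Each worker's unit of work is a single MandelCalculator.execute() call, so running one chunk directly (no engine, no workers) is a quick way to see what a task returns. A small sketch reusing the constants from the listing (the task name is arbitrary):

    calc = MandelCalculator("single_chunk")
    block = calc.execute(
        image_config={'size': IMAGE_SIZE},
        mandelbrot_config=[-2.0, 1.0, -1.0, 1.0, MAX_ITERATIONS],
        chunk=[0, 64])
    # One list of iteration counts per image row in the chunk.
    print("Computed %s rows of %s values each" % (len(block), len(block[0])))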
Jobboard producer/consumer (simple)¶
Note
Full source located at jobboard_produce_consume_colors
1
2import collections
3import contextlib
4import logging
5import os
6import random
7import sys
8import threading
9import time
10
11logging.basicConfig(level=logging.ERROR)
12
13top_dir = os.path.abspath(os.path.join(os.path.dirname(__file__),
14 os.pardir,
15 os.pardir))
16sys.path.insert(0, top_dir)
17
18from zake import fake_client
19
20from taskflow import exceptions as excp
21from taskflow.jobs import backends
22from taskflow.utils import threading_utils
23
24# In this example we show how a jobboard can be used to post work for other
25# entities to work on. This example creates a set of jobs using one producer
26# thread (typically this would be split across many machines) and then has
27# other worker threads, each with their own jobboard, select work using given
28# filters [red/blue] and then perform that work (consuming or abandoning
29# the job after it has completed or failed).
30
31# Things to note:
32# - No persistence layer (or logbook) is used; just the job details are used
33# to determine if a job should be selected by a worker or not.
34# - This example runs in a single process (this is expected to be atypical
35# but this example shows that it can be done if needed, for testing...)
36# - The iterjobs(), claim(), consume()/abandon() worker workflow.
37# - The post() producer workflow.
38
39SHARED_CONF = {
40 'path': "/taskflow/jobs",
41 'board': 'zookeeper',
42}
43
44# How many workers and producers of work will be created (as threads).
45PRODUCERS = 3
46WORKERS = 5
47
48# How many units of work each producer will create.
49PRODUCER_UNITS = 10
50
51# How many units of work are expected to be produced (used so workers can
52# know when to stop running and shut down; typically this would not be
53# a known value, but we have to limit this example's execution time to less than
54# infinity).
55EXPECTED_UNITS = PRODUCER_UNITS * PRODUCERS
56
57# Delay between producing/consuming more work.
58WORKER_DELAY, PRODUCER_DELAY = (0.5, 0.5)
59
60# To ensure threads don't trample other threads output.
61STDOUT_LOCK = threading.Lock()
62
63
64def dispatch_work(job):
65    # This is where the job's contained work *would* be done
66 time.sleep(1.0)
67
68
69def safe_print(name, message, prefix=""):
70 with STDOUT_LOCK:
71 if prefix:
72 print("%s %s: %s" % (prefix, name, message))
73 else:
74 print("%s: %s" % (name, message))
75
76
77def worker(ident, client, consumed):
78 # Create a personal board (using the same client so that it works in
79 # the same process) and start looking for jobs on the board that we want
80 # to perform.
81 name = "W-%s" % (ident)
82 safe_print(name, "started")
83 claimed_jobs = 0
84 consumed_jobs = 0
85 abandoned_jobs = 0
86 with backends.backend(name, SHARED_CONF.copy(), client=client) as board:
87 while len(consumed) != EXPECTED_UNITS:
88 favorite_color = random.choice(['blue', 'red'])
89 for job in board.iterjobs(ensure_fresh=True, only_unclaimed=True):
90 # See if we should even bother with it...
91 if job.details.get('color') != favorite_color:
92 continue
93 safe_print(name, "'%s' [attempting claim]" % (job))
94 try:
95 board.claim(job, name)
96 claimed_jobs += 1
97 safe_print(name, "'%s' [claimed]" % (job))
98 except (excp.NotFound, excp.UnclaimableJob):
99 safe_print(name, "'%s' [claim unsuccessful]" % (job))
100 else:
101 try:
102 dispatch_work(job)
103 board.consume(job, name)
104 safe_print(name, "'%s' [consumed]" % (job))
105 consumed_jobs += 1
106 consumed.append(job)
107 except Exception:
108 board.abandon(job, name)
109 abandoned_jobs += 1
110 safe_print(name, "'%s' [abandoned]" % (job))
111 time.sleep(WORKER_DELAY)
112 safe_print(name,
113 "finished (claimed %s jobs, consumed %s jobs,"
114 " abandoned %s jobs)" % (claimed_jobs, consumed_jobs,
115 abandoned_jobs), prefix=">>>")
116
117
118def producer(ident, client):
119 # Create a personal board (using the same client so that it works in
120 # the same process) and start posting jobs on the board that we want
121 # some entity to perform.
122 name = "P-%s" % (ident)
123 safe_print(name, "started")
124 with backends.backend(name, SHARED_CONF.copy(), client=client) as board:
125 for i in range(0, PRODUCER_UNITS):
126 job_name = "%s-%s" % (name, i)
127 details = {
128 'color': random.choice(['red', 'blue']),
129 }
130 job = board.post(job_name, book=None, details=details)
131 safe_print(name, "'%s' [posted]" % (job))
132 time.sleep(PRODUCER_DELAY)
133 safe_print(name, "finished", prefix=">>>")
134
135
136def main():
137 # TODO(harlowja): Hack to make eventlet work right, remove when the
138 # following is fixed: https://github.com/eventlet/eventlet/issues/230
139 from taskflow.utils import eventlet_utils as _eu # noqa
140 try:
141 import eventlet as _eventlet # noqa
142 except ImportError:
143 pass
144
145 with contextlib.closing(fake_client.FakeClient()) as c:
146 created = []
147 for i in range(0, PRODUCERS):
148 p = threading_utils.daemon_thread(producer, i + 1, c)
149 created.append(p)
150 p.start()
151 consumed = collections.deque()
152 for i in range(0, WORKERS):
153 w = threading_utils.daemon_thread(worker, i + 1, c, consumed)
154 created.append(w)
155 w.start()
156 while created:
157 t = created.pop()
158 t.join()
159        # At the end there should be nothing left over; let's verify that.
160 board = backends.fetch('verifier', SHARED_CONF.copy(), client=c)
161 board.connect()
162 with contextlib.closing(board):
163 if board.job_count != 0 or len(consumed) != EXPECTED_UNITS:
164 return 1
165 return 0
166
167
168if __name__ == "__main__":
169 sys.exit(main())
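The post/claim/consume cycle can also be exercised without any threads, which may help when first getting familiar with the jobboard calls used above. A compact sketch using the same zake fake client and the same board methods (a single 'poster' name is used for both posting and consuming here, purely for brevity):

    import contextlib

    from zake import fake_client

    from taskflow.jobs import backends

    conf = {'path': "/taskflow/jobs", 'board': 'zookeeper'}
    with contextlib.closing(fake_client.FakeClient()) as client:
        with backends.backend('poster', conf.copy(), client=client) as board:
            board.post("job-1", book=None, details={'color': 'red'})
            for job in board.iterjobs(ensure_fresh=True, only_unclaimed=True):
                board.claim(job, 'poster')
                board.consume(job, 'poster')
                print("consumed %s" % job)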
Conductor simulating a CI pipeline¶
Note
Full source located at tox_conductor
1
2import contextlib
3import itertools
4import logging
5import os
6import shutil
7import socket
8import sys
9import tempfile
10import threading
11import time
12
13logging.basicConfig(level=logging.ERROR)
14
15top_dir = os.path.abspath(os.path.join(os.path.dirname(__file__),
16 os.pardir,
17 os.pardir))
18sys.path.insert(0, top_dir)
19
20from oslo_utils import timeutils
21from oslo_utils import uuidutils
22from zake import fake_client
23
24from taskflow.conductors import backends as conductors
25from taskflow import engines
26from taskflow.jobs import backends as boards
27from taskflow.patterns import linear_flow
28from taskflow.persistence import backends as persistence
29from taskflow.persistence import models
30from taskflow import task
31from taskflow.utils import threading_utils
32
33# INTRO: This example shows how a worker/producer can post desired work (jobs)
34# to a jobboard and a conductor can consume that work (jobs) from that jobboard
35# and execute those jobs in a reliable & async manner (for example, if the
36# conductor were to crash then the job would be released back onto the jobboard
37# and another conductor can attempt to finish it, from wherever that job last
38# left off).
39#
40# In this example an in-memory jobboard (and in-memory storage) is created and
41# used that simulates how this would be done at a larger scale (it is an
42# example after all).
43
44# Restrict how long this example runs for...
45RUN_TIME = 5
46REVIEW_CREATION_DELAY = 0.5
47SCAN_DELAY = 0.1
48NAME = "%s_%s" % (socket.getfqdn(), os.getpid())
49
50# This won't really use zookeeper but will use a local version of it using
51# the zake library that mimics an actual zookeeper cluster using threads and
52# an in-memory data structure.
53JOBBOARD_CONF = {
54 'board': 'zookeeper://localhost?path=/taskflow/tox/jobs',
55}
56
57
58class RunReview(task.Task):
59 # A dummy task that clones the review and runs tox...
60
61 def _clone_review(self, review, temp_dir):
62 print("Cloning review '%s' into %s" % (review['id'], temp_dir))
63
64 def _run_tox(self, temp_dir):
65 print("Running tox in %s" % temp_dir)
66
67 def execute(self, review, temp_dir):
68 self._clone_review(review, temp_dir)
69 self._run_tox(temp_dir)
70
71
72class MakeTempDir(task.Task):
73 # A task that creates and destroys a temporary dir (on failure).
74 #
75 # It provides the location of the temporary dir for other tasks to use
76 # as they see fit.
77
78 default_provides = 'temp_dir'
79
80 def execute(self):
81 return tempfile.mkdtemp()
82
83 def revert(self, *args, **kwargs):
84 temp_dir = kwargs.get(task.REVERT_RESULT)
85 if temp_dir:
86 shutil.rmtree(temp_dir)
87
88
89class CleanResources(task.Task):
90 # A task that cleans up any workflow resources.
91
92 def execute(self, temp_dir):
93 print("Removing %s" % temp_dir)
94 shutil.rmtree(temp_dir)
95
96
97def review_iter():
98 """Makes reviews (never-ending iterator/generator)."""
99 review_id_gen = itertools.count(0)
100 while True:
101 review_id = next(review_id_gen)
102 review = {
103 'id': review_id,
104 }
105 yield review
106
107
108# It is important that this lives at the module namespace level, since it must
109# be accessible from a conductor dispatching an engine; if it were a lambda
110# function, for example, it would not be re-importable and the conductor would
111# be unable to reference it when creating the workflow to run.
112def create_review_workflow():
113 """Factory method used to create a review workflow to run."""
114 f = linear_flow.Flow("tester")
115 f.add(
116 MakeTempDir(name="maker"),
117 RunReview(name="runner"),
118 CleanResources(name="cleaner")
119 )
120 return f
121
122
123def generate_reviewer(client, saver, name=NAME):
124 """Creates a review producer thread with the given name prefix."""
125 real_name = "%s_reviewer" % name
126 no_more = threading.Event()
127 jb = boards.fetch(real_name, JOBBOARD_CONF,
128 client=client, persistence=saver)
129
130 def make_save_book(saver, review_id):
131 # Record what we want to happen (sometime in the future).
132 book = models.LogBook("book_%s" % review_id)
133 detail = models.FlowDetail("flow_%s" % review_id,
134 uuidutils.generate_uuid())
135 book.add(detail)
136 # Associate the factory method we want to be called (in the future)
137 # with the book, so that the conductor will be able to call into
138 # that factory to retrieve the workflow objects that represent the
139 # work.
140 #
141 # These args and kwargs *can* be used to save any specific parameters
142 # into the factory when it is being called to create the workflow
143 # objects (typically used to tell a factory how to create a unique
144 # workflow that represents this review).
145 factory_args = ()
146 factory_kwargs = {}
147 engines.save_factory_details(detail, create_review_workflow,
148 factory_args, factory_kwargs)
149 with contextlib.closing(saver.get_connection()) as conn:
150 conn.save_logbook(book)
151 return book
152
153 def run():
154 """Periodically publishes 'fake' reviews to analyze."""
155 jb.connect()
156 review_generator = review_iter()
157 with contextlib.closing(jb):
158 while not no_more.is_set():
159 review = next(review_generator)
160 details = {
161 'store': {
162 'review': review,
163 },
164 }
165 job_name = "%s_%s" % (real_name, review['id'])
166 print("Posting review '%s'" % review['id'])
167 jb.post(job_name,
168 book=make_save_book(saver, review['id']),
169 details=details)
170 time.sleep(REVIEW_CREATION_DELAY)
171
172 # Return the unstarted thread, and a callback that can be used
173    # to shut down that thread (to avoid running forever).
174 return (threading_utils.daemon_thread(target=run), no_more.set)
175
176
177def generate_conductor(client, saver, name=NAME):
178 """Creates a conductor thread with the given name prefix."""
179 real_name = "%s_conductor" % name
180 jb = boards.fetch(name, JOBBOARD_CONF,
181 client=client, persistence=saver)
182 conductor = conductors.fetch("blocking", real_name, jb,
183 engine='parallel', wait_timeout=SCAN_DELAY)
184
185 def run():
186 jb.connect()
187 with contextlib.closing(jb):
188 conductor.run()
189
190 # Return the unstarted thread, and a callback that can be used
191    # to shut down that thread (to avoid running forever).
192 return (threading_utils.daemon_thread(target=run), conductor.stop)
193
194
195def main():
196 # Need to share the same backend, so that data can be shared...
197 persistence_conf = {
198 'connection': 'memory',
199 }
200 saver = persistence.fetch(persistence_conf)
201 with contextlib.closing(saver.get_connection()) as conn:
202 # This ensures that the needed backend setup/data directories/schema
203 # upgrades and so on... exist before they are attempted to be used...
204 conn.upgrade()
205 fc1 = fake_client.FakeClient()
206 # Done like this to share the same client storage location so the correct
207 # zookeeper features work across clients...
208 fc2 = fake_client.FakeClient(storage=fc1.storage)
209 entities = [
210 generate_reviewer(fc1, saver),
211 generate_conductor(fc2, saver),
212 ]
213 for t, stopper in entities:
214 t.start()
215 try:
216 watch = timeutils.StopWatch(duration=RUN_TIME)
217 watch.start()
218 while not watch.expired():
219 time.sleep(0.1)
220 finally:
221 for t, stopper in reversed(entities):
222 stopper()
223 t.join()
224
225
226if __name__ == '__main__':
227 main()
Conductor running 99 bottles of beer song requests¶
Note
Full source located at 99_bottles
1
2import contextlib
3import functools
4import logging
5import os
6import sys
7import time
8import traceback
9
10from kazoo import client
11
12top_dir = os.path.abspath(os.path.join(os.path.dirname(__file__),
13 os.pardir,
14 os.pardir))
15sys.path.insert(0, top_dir)
16
17from taskflow.conductors import backends as conductor_backends
18from taskflow import engines
19from taskflow.jobs import backends as job_backends
20from taskflow import logging as taskflow_logging
21from taskflow.patterns import linear_flow as lf
22from taskflow.persistence import backends as persistence_backends
23from taskflow.persistence import models
24from taskflow import task
25
26from oslo_utils import timeutils
27from oslo_utils import uuidutils
28
29# Instructions!
30#
31# 1. Install zookeeper (or change host listed below)
32# 2. Download this example, place in file '99_bottles.py'
33# 3. Run `python 99_bottles.py p` to place a song request onto the jobboard
34# 4. Run `python 99_bottles.py c` a few times (in different shells)
35# 5. On demand kill previously listed processes created in (4) and watch
36# the work resume on another process (and repeat)
37# 6. Keep enough workers alive to eventually finish the song (if desired).
38
39ME = os.getpid()
40ZK_HOST = "localhost:2181"
41JB_CONF = {
42 'hosts': ZK_HOST,
43 'board': 'zookeeper',
44 'path': '/taskflow/99-bottles-demo',
45}
46PERSISTENCE_URI = r"sqlite:////tmp/bottles.db"
47TAKE_DOWN_DELAY = 1.0
48PASS_AROUND_DELAY = 3.0
49HOW_MANY_BOTTLES = 99
50
51
52class TakeABottleDown(task.Task):
53 def execute(self, bottles_left):
54 sys.stdout.write('Take one down, ')
55 sys.stdout.flush()
56 time.sleep(TAKE_DOWN_DELAY)
57 return bottles_left - 1
58
59
60class PassItAround(task.Task):
61 def execute(self):
62 sys.stdout.write('pass it around, ')
63 sys.stdout.flush()
64 time.sleep(PASS_AROUND_DELAY)
65
66
67class Conclusion(task.Task):
68 def execute(self, bottles_left):
69 sys.stdout.write('%s bottles of beer on the wall...\n' % bottles_left)
70 sys.stdout.flush()
71
72
73def make_bottles(count):
74 # This is the function that will be called to generate the workflow
75 # and will also be called to regenerate it on resumption so that work
76 # can continue from where it last left off...
77
78 s = lf.Flow("bottle-song")
79
80 take_bottle = TakeABottleDown("take-bottle-%s" % count,
81 inject={'bottles_left': count},
82 provides='bottles_left')
83 pass_it = PassItAround("pass-%s-around" % count)
84 next_bottles = Conclusion("next-bottles-%s" % (count - 1))
85 s.add(take_bottle, pass_it, next_bottles)
86
87 for bottle in reversed(list(range(1, count))):
88 take_bottle = TakeABottleDown("take-bottle-%s" % bottle,
89 provides='bottles_left')
90 pass_it = PassItAround("pass-%s-around" % bottle)
91 next_bottles = Conclusion("next-bottles-%s" % (bottle - 1))
92 s.add(take_bottle, pass_it, next_bottles)
93
94 return s
95
96
97def run_conductor(only_run_once=False):
98    # This continuously runs consumers until it's stopped via ctrl-c or other
99 # kill signal...
100 event_watches = {}
101
102 # This will be triggered by the conductor doing various activities
103    # with engines, and it is quite nice to be able to see the various timing
104 # segments (which is useful for debugging, or watching, or figuring out
105 # where to optimize).
106 def on_conductor_event(cond, event, details):
107 print("Event '%s' has been received..." % event)
108 print("Details = %s" % details)
109 if event.endswith("_start"):
110 w = timeutils.StopWatch()
111 w.start()
112 base_event = event[0:-len("_start")]
113 event_watches[base_event] = w
114 if event.endswith("_end"):
115 base_event = event[0:-len("_end")]
116 try:
117 w = event_watches.pop(base_event)
118 w.stop()
119 print("It took %0.3f seconds for event '%s' to finish"
120 % (w.elapsed(), base_event))
121 except KeyError:
122 pass
123 if event == 'running_end' and only_run_once:
124 cond.stop()
125
126 print("Starting conductor with pid: %s" % ME)
127 my_name = "conductor-%s" % ME
128 persist_backend = persistence_backends.fetch(PERSISTENCE_URI)
129 with contextlib.closing(persist_backend):
130 with contextlib.closing(persist_backend.get_connection()) as conn:
131 conn.upgrade()
132 job_backend = job_backends.fetch(my_name, JB_CONF,
133 persistence=persist_backend)
134 job_backend.connect()
135 with contextlib.closing(job_backend):
136 cond = conductor_backends.fetch('blocking', my_name, job_backend,
137 persistence=persist_backend)
138 on_conductor_event = functools.partial(on_conductor_event, cond)
139 cond.notifier.register(cond.notifier.ANY, on_conductor_event)
140 # Run forever, and kill -9 or ctrl-c me...
141 try:
142 cond.run()
143 finally:
144 cond.stop()
145 cond.wait()
146
147
148def run_poster():
149 # This just posts a single job and then ends...
150 print("Starting poster with pid: %s" % ME)
151 my_name = "poster-%s" % ME
152 persist_backend = persistence_backends.fetch(PERSISTENCE_URI)
153 with contextlib.closing(persist_backend):
154 with contextlib.closing(persist_backend.get_connection()) as conn:
155 conn.upgrade()
156 job_backend = job_backends.fetch(my_name, JB_CONF,
157 persistence=persist_backend)
158 job_backend.connect()
159 with contextlib.closing(job_backend):
160 # Create information in the persistence backend about the
161 # unit of work we want to complete and the factory that
162            # can be called to create the tasks that are needed to get that
163            # work unit done.
164 lb = models.LogBook("post-from-%s" % my_name)
165 fd = models.FlowDetail("song-from-%s" % my_name,
166 uuidutils.generate_uuid())
167 lb.add(fd)
168 with contextlib.closing(persist_backend.get_connection()) as conn:
169 conn.save_logbook(lb)
170 engines.save_factory_details(fd, make_bottles,
171 [HOW_MANY_BOTTLES], {},
172 backend=persist_backend)
173 # Post, and be done with it!
174 jb = job_backend.post("song-from-%s" % my_name, book=lb)
175 print("Posted: %s" % jb)
176 print("Goodbye...")
177
178
179def main_local():
180    # Run locally; typically this is activated during unit testing when all
181    # the examples are checked to still function correctly...
182 global TAKE_DOWN_DELAY
183 global PASS_AROUND_DELAY
184 global JB_CONF
185 # Make everything go much faster (so that this finishes quickly).
186 PASS_AROUND_DELAY = 0.01
187 TAKE_DOWN_DELAY = 0.01
188 JB_CONF['path'] = JB_CONF['path'] + "-" + uuidutils.generate_uuid()
189 run_poster()
190 run_conductor(only_run_once=True)
191
192
193def check_for_zookeeper(timeout=1):
194 sys.stderr.write("Testing for the existence of a zookeeper server...\n")
195 sys.stderr.write("Please wait....\n")
196 with contextlib.closing(client.KazooClient()) as test_client:
197 try:
198 test_client.start(timeout=timeout)
199 except test_client.handler.timeout_exception:
200 sys.stderr.write("Zookeeper is needed for running this example!\n")
201 traceback.print_exc()
202 return False
203 else:
204 test_client.stop()
205 return True
206
207
208def main():
209 if not check_for_zookeeper():
210 return
211 if len(sys.argv) == 1:
212 main_local()
213 elif sys.argv[1] in ('p', 'c'):
214 if sys.argv[-1] == "v":
215 logging.basicConfig(level=taskflow_logging.TRACE)
216 else:
217 logging.basicConfig(level=logging.ERROR)
218 if sys.argv[1] == 'p':
219 run_poster()
220 else:
221 run_conductor()
222 else:
223 sys.stderr.write("%s p|c (v?)\n" % os.path.basename(sys.argv[0]))
224
225
226if __name__ == '__main__':
227 main()
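For a quick sanity check of the song flow by itself (no zookeeper, no jobboard, no conductor and no persistence), the make_bottles() factory above can be handed straight to an engine. A minimal sketch, assuming this module is importable and keeping the module-level delays as-is:

    from taskflow import engines

    # Sings a short version of the song in-process; resumption is not exercised here.
    engines.run(make_bottles(3))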