# -*- coding: utf-8 -*-
# Copyright (C) 2014 Yahoo! Inc. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import os
import platform
import socket
import sys
import futurist
from oslo_utils import reflection
from taskflow.engines.worker_based import endpoint
from taskflow.engines.worker_based import server
from taskflow import logging
from taskflow import task as t_task
from taskflow.utils import banner
from taskflow.utils import misc
from taskflow.utils import threading_utils as tu
LOG = logging.getLogger(__name__)
[docs]class Worker(object):
"""Worker that can be started on a remote host for handling tasks requests.
:param url: broker url
:param exchange: broker exchange name
:param topic: topic name under which worker is stated
:param tasks: task list that worker is capable of performing, items in
the list can be one of the following types; 1, a string naming the
python module name to search for tasks in or the task class name; 2, a
python module to search for tasks in; 3, a task class object that
will be used to create tasks from.
:param executor: custom executor object that can used for processing
requests in separate threads (if not provided one will be created)
:param threads_count: threads count to be passed to the
default executor (used only if an executor is not
passed in)
:param transport: transport to be used (e.g. amqp, memory, etc.)
:param transport_options: transport specific options (see:
http://kombu.readthedocs.org/ for what these
options imply and are expected to be)
:param retry_options: retry specific options
(see: :py:attr:`~.proxy.Proxy.DEFAULT_RETRY_OPTIONS`)
"""
def __init__(self, exchange, topic, tasks,
executor=None, threads_count=None, url=None,
transport=None, transport_options=None,
retry_options=None):
self._topic = topic
self._executor = executor
self._owns_executor = False
if self._executor is None:
self._executor = futurist.ThreadPoolExecutor(
max_workers=threads_count)
self._owns_executor = True
self._endpoints = self._derive_endpoints(tasks)
self._exchange = exchange
self._server = server.Server(topic, exchange, self._executor,
self._endpoints, url=url,
transport=transport,
transport_options=transport_options,
retry_options=retry_options)
@staticmethod
def _derive_endpoints(tasks):
"""Derive endpoints from list of strings, classes or packages."""
derived_tasks = misc.find_subclasses(tasks, t_task.Task)
return [endpoint.Endpoint(task) for task in derived_tasks]
@misc.cachedproperty
def banner(self):
"""A banner that can be useful to display before running."""
connection_details = self._server.connection_details
transport = connection_details.transport
if transport.driver_version:
transport_driver = "%s v%s" % (transport.driver_name,
transport.driver_version)
else:
transport_driver = transport.driver_name
try:
hostname = socket.getfqdn()
except socket.error:
hostname = "???"
try:
pid = os.getpid()
except OSError:
pid = "???"
chapters = {
'Connection details': {
'Driver': transport_driver,
'Exchange': self._exchange,
'Topic': self._topic,
'Transport': transport.driver_type,
'Uri': connection_details.uri,
},
'Powered by': {
'Executor': reflection.get_class_name(self._executor),
'Thread count': getattr(self._executor, 'max_workers', "???"),
},
'Supported endpoints': [str(ep) for ep in self._endpoints],
'System details': {
'Hostname': hostname,
'Pid': pid,
'Platform': platform.platform(),
'Python': sys.version.split("\n", 1)[0].strip(),
'Thread id': tu.get_ident(),
},
}
return banner.make_banner('WBE worker', chapters)
[docs] def run(self, display_banner=True, banner_writer=None):
"""Runs the worker."""
if display_banner:
if banner_writer is None:
for line in self.banner.splitlines():
LOG.info(line)
else:
banner_writer(self.banner)
self._server.start()
[docs] def wait(self):
"""Wait until worker is started."""
self._server.wait()
[docs] def stop(self):
"""Stop worker."""
self._server.stop()
if self._owns_executor:
self._executor.shutdown()
if __name__ == '__main__':
import argparse
import logging as log
parser = argparse.ArgumentParser()
parser.add_argument("--exchange", required=True)
parser.add_argument("--connection-url", required=True)
parser.add_argument("--topic", required=True)
parser.add_argument("--task", action='append',
metavar="TASK", default=[])
parser.add_argument("-v", "--verbose", action='store_true')
args = parser.parse_args()
if args.verbose:
log.basicConfig(level=logging.DEBUG, format="")
w = Worker(args.exchange, args.topic, args.task, url=args.connection_url)
w.run()