Source code for osprofiler.drivers.messaging

# Copyright 2016 Mirantis Inc.
# All Rights Reserved.
#
#    Licensed under the Apache License, Version 2.0 (the "License"); you may
#    not use this file except in compliance with the License. You may obtain
#    a copy of the License at
#
#         http://www.apache.org/licenses/LICENSE-2.0
#
#    Unless required by applicable law or agreed to in writing, software
#    distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
#    WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
#    License for the specific language governing permissions and limitations
#    under the License.

import functools
import signal
import time
from typing import Any

from oslo_config import cfg
from oslo_utils import importutils

from osprofiler.drivers import base


[docs] class Messaging(base.Driver): def __init__( self, connection_str: str, project: str | None = None, service: str | None = None, host: str | None = None, context: Any = None, conf: cfg.ConfigOpts | None = None, transport_url: str | None = None, idle_timeout: int = 1, **kwargs: Any, ) -> None: """Driver that uses messaging as transport for notifications :param connection_str: OSProfiler driver connection string, equals to messaging:// :param project: project name that will be included into notification :param service: service name that will be included into notification :param host: host name that will be included into notification :param context: oslo.messaging context :param conf: oslo.config CONF object :param transport_url: oslo.messaging transport, e.g. rabbit://rabbit:password@devstack:5672/ :param idle_timeout: how long to wait for new notifications after the last one seen in the trace; this parameter is useful to collect full trace of asynchronous commands, e.g. when user runs `osprofiler` right after `openstack server create` :param kwargs: black hole for any other parameters """ self.oslo_messaging = importutils.try_import("oslo_messaging") if not self.oslo_messaging: raise ValueError( "Oslo.messaging library is required for messaging driver" ) super().__init__( connection_str, project=project, service=service, host=host ) self.context = context if not conf: oslo_config = importutils.try_import("oslo_config") if not oslo_config: raise ValueError( "Oslo.config library is required for messaging driver" ) conf = oslo_config.cfg.CONF transport_kwargs: dict[str, Any] = {} if transport_url: transport_kwargs["url"] = transport_url self.transport = self.oslo_messaging.get_notification_transport( conf, **transport_kwargs ) self.client = self.oslo_messaging.Notifier( self.transport, publisher_id=self.host, driver="messaging", topics=["profiler"], retry=0, ) self.idle_timeout = idle_timeout
[docs] @classmethod def get_name(cls) -> str: return "messaging"
[docs] def notify( self, info: dict[str, Any], context: Any = None, **kwargs: Any ) -> None: """Send notifications to backend via oslo.messaging notifier API. :param info: Contains information about trace element. In payload dict there are always 3 ids: "base_id" - uuid that is common for all notifications related to one trace. "parent_id" - uuid of parent element in trace "trace_id" - uuid of current element in trace With parent_id and trace_id it's quite simple to build tree of trace elements, which simplify analyze of trace. :param context: request context that is mostly used to specify current active user and tenant. """ info["project"] = self.project info["service"] = self.service self.client.info( context or self.context, "profiler.{}".format(info["service"]), info, )
[docs] def get_report(self, base_id: str) -> dict[str, Any]: notification_endpoint = NotifyEndpoint(self.oslo_messaging, base_id) endpoints = [notification_endpoint] targets = [self.oslo_messaging.Target(topic="profiler")] server = self.oslo_messaging.notify.get_notification_listener( self.transport, targets, endpoints, executor="threading" ) state: dict[str, bool] = dict(running=False) sfn = functools.partial(signal_handler, state=state) # modify signal handlers to handle interruption gracefully old_sigterm_handler = signal.signal(signal.SIGTERM, sfn) old_sigint_handler = signal.signal(signal.SIGINT, sfn) try: server.start() except self.oslo_messaging.server.ServerListenError: # failed to start the server raise except SignalExit: print( "Execution interrupted while trying to connect to " "messaging server. No data was collected." ) return {} # connected to server, now read the data try: # run until the trace is complete state["running"] = True while state["running"]: last_read_time = notification_endpoint.get_last_read_time() wait = self.idle_timeout - (time.time() - last_read_time) if wait < 0: state["running"] = False else: time.sleep(wait) except SignalExit: print("Execution interrupted. Terminating") finally: server.stop() server.wait() # restore original signal handlers signal.signal(signal.SIGTERM, old_sigterm_handler) signal.signal(signal.SIGINT, old_sigint_handler) events = notification_endpoint.get_messages() if not events: print( f"No events are collected for Trace UUID {base_id}. " f"Please note that osprofiler has read ALL events from " f"profiler topic, but has not found any for specified Trace " f"UUID." ) for n in events: trace_id = n["trace_id"] parent_id = n["parent_id"] name = n["name"] project = n["project"] service = n["service"] host = n["info"]["host"] timestamp = n["timestamp"] self._append_results( trace_id, parent_id, name, project, service, host, timestamp, n ) return self._parse_results()
[docs] class NotifyEndpoint: def __init__(self, oslo_messaging: Any, base_id: str) -> None: self.received_messages: list[Any] = [] self.last_read_time = time.time() self.filter_rule = oslo_messaging.NotificationFilter( payload={"base_id": base_id} )
[docs] def info( self, ctxt: Any, publisher_id: Any, event_type: Any, payload: Any, metadata: Any, ) -> None: self.received_messages.append(payload) self.last_read_time = time.time()
[docs] def get_messages(self) -> list[Any]: return self.received_messages
[docs] def get_last_read_time(self) -> float: return self.last_read_time # time when the latest event was received
[docs] class SignalExit(BaseException): pass
[docs] def signal_handler(signum: Any, frame: Any, state: dict[str, bool]) -> None: state["running"] = False raise SignalExit()