Source code for monasca_log_api.reference.common.log_publisher

# Copyright 2015 kornicameister@gmail.com
# Copyright 2016-2017 FUJITSU LIMITED
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
#      http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.

import time

import falcon
from monasca_common.kafka import producer
from monasca_common.rest import utils as rest_utils
from oslo_config import cfg
from oslo_log import log

from monasca_log_api.monitoring import client
from monasca_log_api.monitoring import metrics
from monasca_log_api.reference.common import model

LOG = log.getLogger(__name__)
CONF = cfg.CONF

_MAX_MESSAGE_SIZE = 1048576
_RETRY_AFTER = 60
_TIMESTAMP_KEY_SIZE = len(
    bytearray(str(int(time.time() * 1000)).encode('utf-8')))
_TRUNCATED_PROPERTY_SIZE = len(
    bytearray('"truncated": true'.encode('utf-8')))
_KAFKA_META_DATA_SIZE = 32
_TRUNCATION_SAFE_OFFSET = 1
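
# Worked size budget (a sketch using the defaults above): with
# max_message_size = 1048576 bytes, a serialized envelope may occupy at most
# 1048576 - 13 (millisecond timestamp key, currently 13 bytes) - 32 (Kafka
# metadata) - 1 (safety offset) = 1048530 bytes before its log message is
# truncated; truncation additionally reserves 17 bytes for the
# '"truncated": true' property that gets added to the envelope.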

log_publisher_opts = [
    cfg.StrOpt('kafka_url',
               required=True,
               help='URL to Kafka server'),
    cfg.MultiStrOpt('topics',
                    default=['logs'],
                    help='Kafka topics to which logs are published'),
    cfg.IntOpt('max_message_size',
               default=_MAX_MESSAGE_SIZE,
               required=True,
               help=('Maximum message size that can be sent '
                     'to Kafka, defaults to %d bytes' % _MAX_MESSAGE_SIZE))
]

log_publisher_group = cfg.OptGroup(name='log_publisher', title='log_publisher')

cfg.CONF.register_group(log_publisher_group)
cfg.CONF.register_opts(log_publisher_opts, log_publisher_group)
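
# A minimal sketch of reading the options above once a configuration file has
# been parsed (the config-file path below is hypothetical):
#
#     from oslo_config import cfg
#     cfg.CONF(['--config-file', '/etc/monasca/log-api.conf'])
#     print(cfg.CONF.log_publisher.kafka_url)  # e.g. 'localhost:8900'
#     print(cfg.CONF.log_publisher.topics)     # defaults to ['logs']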


class InvalidMessageException(Exception):
    pass

class LogPublisher(object):
    """Publishes log data to Kafka.

    LogPublisher is able to send a single message to multiple
    configured topics. It uses the following configuration written
    in the conf file::

        [log_publisher]
        topics = 'logs'
        kafka_url = 'localhost:8900'

    Note:
        Uses :py:class:`monasca_common.kafka.producer.KafkaProducer`
        to ship logs to Kafka. For more details, see the
        `monasca_common`_ GitHub repository.

    .. _monasca_common: https://github.com/openstack/monasca-common

    """

    def __init__(self):
        self._topics = CONF.log_publisher.topics
        self.max_message_size = CONF.log_publisher.max_message_size

        self._kafka_publisher = producer.KafkaProducer(
            url=CONF.log_publisher.kafka_url
        )

        self._statsd = client.get_client()

        # set up counters, gauges, etc.
        self._logs_published_counter = self._statsd.get_counter(
            metrics.LOGS_PUBLISHED_METRIC
        )
        self._publish_time_ms = self._statsd.get_timer(
            metrics.LOGS_PUBLISH_TIME_METRIC
        )
        self._logs_lost_counter = self._statsd.get_counter(
            metrics.LOGS_PUBLISHED_LOST_METRIC
        )
        self._logs_truncated_gauge = self._statsd.get_gauge(
            metrics.LOGS_TRUNCATED_METRIC
        )

        LOG.info('Initializing LogPublisher <%s>', self)

    def send_message(self, messages):
        """Sends messages to each configured topic.

        Note:
            Falsy messages (i.e. empty) are not shipped to Kafka.

        See also:
            * :py:class:`monasca_log_api.common.model.Envelope`
            * :py:meth:`._is_message_valid`

        :param dict|list messages: instance (or instances) of log envelope
        """
        if not messages:
            return
        if not isinstance(messages, list):
            messages = [messages]

        sent_counter = 0
        num_of_msgs = len(messages)

        LOG.debug('About to publish %d messages to %s topics',
                  num_of_msgs, self._topics)

        try:
            send_messages = []
            for message in messages:
                msg = self._transform_message(message)
                send_messages.append(msg)
            with self._publish_time_ms.time(name=None):
                self._publish(send_messages)
            sent_counter = len(send_messages)
        except Exception:
            LOG.exception('Failure in publishing messages to kafka')
            raise
        finally:
            self._after_publish(sent_counter, num_of_msgs)
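
    # Usage sketch (illustrative only; the Envelope factory call and payload
    # shape below are assumptions -- see
    # monasca_log_api.reference.common.model for the actual API):
    #
    #     publisher = LogPublisher()
    #     envelope = model.Envelope.new_envelope(      # hypothetical call
    #         log={'message': 'a log line'},           # hypothetical shape
    #         tenant_id='tenant-1',
    #         region='region-1',
    #         dimensions={'hostname': 'host-1'},
    #     )
    #     publisher.send_message([envelope])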

    def _transform_message(self, message):
        """Transforms a message into JSON.

        The transformation of a single element consists of
        the following operations:

        * checking if the message is valid
          (:py:func:`.LogPublisher._is_message_valid`)
        * truncating the message if necessary
          (:py:func:`.LogPublisher._truncate`)

        :param model.Envelope message: instance of message
        :return: serialized message
        :rtype: str
        """
        if not self._is_message_valid(message):
            raise InvalidMessageException()
        truncated = self._truncate(message)
        return self._ensure_type_bytes(truncated)

    def _ensure_type_bytes(self, message):
        """Ensures that the message has the proper type.

        The Kafka client expects posted messages to have a certain
        data type (:py:func:`six.binary_type`). This method encodes
        the message so that this is always the case, regardless of
        whether the codebase runs under :py:data:`six.PY2` or
        :py:data:`six.PY3`.
        """
        return message.encode('utf-8')

    def _truncate(self, envelope):
        """Truncates the message if needed.

        Each message sent to Kafka is verified. This method checks
        whether the message, serialized to JSON, exceeds the maximum
        size allowed on the Kafka queue. If so, it truncates the
        message property of the log by the difference between the
        message size and the allowed size.

        :param Envelope envelope: original envelope
        :return: serialized message
        :rtype: str
        """
        msg_str = model.serialize_envelope(envelope)
        envelope_size = ((len(bytearray(msg_str, 'utf-8', 'replace')) +
                          _TIMESTAMP_KEY_SIZE +
                          _KAFKA_META_DATA_SIZE)
                         if msg_str is not None else -1)

        diff_size = ((envelope_size - self.max_message_size) +
                     _TRUNCATION_SAFE_OFFSET)

        if diff_size > 1:
            truncated_by = diff_size + _TRUNCATED_PROPERTY_SIZE

            LOG.warning(('Detected message that exceeds %d bytes, '
                         'message will be truncated by %d bytes'),
                        self.max_message_size,
                        truncated_by)

            log_msg = envelope['log']['message']
            truncated_log_msg = log_msg[:-truncated_by]

            envelope['log']['truncated'] = True
            envelope['log']['message'] = truncated_log_msg
            self._logs_truncated_gauge.send(name=None, value=truncated_by)

            msg_str = rest_utils.as_json(envelope)
        else:
            self._logs_truncated_gauge.send(name=None, value=0)

        return msg_str

    def _publish(self, messages):
        """Publishes messages to Kafka.

        :param list messages: list of messages
        """
        num_of_msg = len(messages)

        LOG.debug('Publishing %d messages', num_of_msg)

        try:
            for topic in self._topics:
                self._kafka_publisher.publish(
                    topic,
                    messages
                )
                LOG.debug('Sent %d messages to topic %s',
                          num_of_msg, topic)
        except Exception as ex:
            raise falcon.HTTPServiceUnavailable('Service unavailable',
                                                str(ex), _RETRY_AFTER)

    @staticmethod
    def _is_message_valid(message):
        """Validates the message before sending.

        Checks if the message is an instance of
        :py:class:`model.Envelope`. Being an instance of this class
        ensures that all required keys are present and have values.
        """
        return message and isinstance(message, model.Envelope)

    def _after_publish(self, send_count, to_send_count):
        """Executed after publishing to send metrics.

        :param int send_count: how many messages have been sent
        :param int to_send_count: how many messages should have been sent
        """
        failed_to_send = to_send_count - send_count

        if failed_to_send == 0:
            LOG.debug('Successfully published all [%d] messages',
                      send_count)
        else:
            error_str = ('Failed to send all messages, %d '
                         'messages out of %d have not been published')
            LOG.error(error_str, failed_to_send, to_send_count)

        self._logs_published_counter.increment(value=send_count)
        self._logs_lost_counter.increment(value=failed_to_send)
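
# Worked truncation example (a sketch with assumed numbers): if a serialized
# envelope weighs 1048600 bytes, then envelope_size = 1048600 + 13 + 32 =
# 1048645, diff_size = (1048645 - 1048576) + 1 = 70, and the log message is
# shortened by truncated_by = 70 + 17 = 87 bytes, leaving room for the
# '"truncated": true' property added to the truncated envelope.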