Source code for monasca_log_api.healthcheck.kafka_check
# Copyright 2015 FUJITSU LIMITED
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
#      http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import collections
import kafka.client as client
from oslo_config import cfg
from oslo_log import log
LOG = log.getLogger(__name__)
CONF = cfg.CONF
kafka_check_opts = [
    cfg.StrOpt('kafka_url',
               required=True,
               help='Url to kafka server'),
    cfg.ListOpt('kafka_topics',
                required=True,
                default=['logs'],
                help='Verify existence of configured topics')
]
kafka_check_group = cfg.OptGroup(name='kafka_healthcheck',
                                 title='kafka_healthcheck')
cfg.CONF.register_group(kafka_check_group)
cfg.CONF.register_opts(kafka_check_opts, kafka_check_group)
CheckResult = collections.namedtuple('CheckResult', ['healthy', 'message'])
"""Result from the healthcheck, contains healthy(boolean) and message"""
# TODO(feature) monasca-common candidate
[docs]class KafkaHealthCheck(object):
    """Evaluates kafka health
    Healthcheck verifies if:
    * kafka server is up and running
    * there is a configured topic in kafka
    If following conditions are met healthcheck returns healthy status.
    Otherwise unhealthy status is returned with explanation.
     Example of middleware configuration:
    .. code-block:: ini
      [kafka_healthcheck]
      kafka_url = localhost:8900
      kafka_topics = log
    Note:
        It is possible to specify multiple topics if necessary.
        Just separate them with ,
    """
[docs]    def healthcheck(self):
        url = CONF.kafka_healthcheck.kafka_url
        try:
            kafka_client = client.KafkaClient(hosts=url)
        except client.KafkaUnavailableError as ex:
            LOG.error(repr(ex))
            error_str = 'Could not connect to kafka at %s' % url
            return CheckResult(healthy=False, message=error_str)
        result = self._verify_topics(kafka_client)
        self._disconnect_gracefully(kafka_client)
        return result
    # noinspection PyMethodMayBeStatic 
    def _verify_topics(self, kafka_client):
        topics = CONF.kafka_healthcheck.kafka_topics
        for t in topics:
            # kafka client loads metadata for topics as fast
            # as possible (happens in __init__), therefore this
            # topic_partitions is sure to be filled
            for_topic = t in kafka_client.topic_partitions
            if not for_topic:
                error_str = 'Kafka: Topic %s not found' % t
                LOG.error(error_str)
                return CheckResult(healthy=False, message=error_str)
        return CheckResult(healthy=True, message='OK')
    # noinspection PyMethodMayBeStatic
    def _disconnect_gracefully(self, kafka_client):
        # at this point, client is connected so it must be closed
        # regardless of topic existence
        try:
            kafka_client.close()
        except Exception as ex:
            # log that something went wrong and move on
            LOG.error(repr(ex))