# Source code for monasca_log_api.healthcheck.kafka_check
# Copyright 2015 FUJITSU LIMITED
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import collections
import kafka.client as client
from oslo_config import cfg
from oslo_log import log
# Module-level handles: logger for this module and the global oslo config.
LOG = log.getLogger(__name__)
CONF = cfg.CONF

# Option group under which the healthcheck settings live
# (the ``[kafka_healthcheck]`` section of the configuration).
kafka_check_group = cfg.OptGroup(
    name='kafka_healthcheck',
    title='kafka_healthcheck',
)

# Options read by the healthcheck: where kafka is and which topics
# are expected to exist there.
kafka_check_opts = [
    cfg.StrOpt(
        'kafka_url',
        required=True,
        help='Url to kafka server',
    ),
    cfg.ListOpt(
        'kafka_topics',
        required=True,
        default=['logs'],
        help='Verify existence of configured topics',
    ),
]

# CONF is the very same object as cfg.CONF, so registering through the
# alias is equivalent to registering on cfg.CONF directly.
CONF.register_group(kafka_check_group)
CONF.register_opts(kafka_check_opts, kafka_check_group)
# Lightweight (healthy, message) record returned by the healthcheck:
# ``healthy`` is the boolean verdict, ``message`` the explanation
# ('OK' or an error description).
CheckResult = collections.namedtuple('CheckResult', ['healthy', 'message'])
"""Result from the healthcheck, contains healthy(boolean) and message"""
# TODO(feature) monasca-common candidate
class KafkaHealthCheck(object):
    """Evaluates kafka health

    Healthcheck verifies if:

    * kafka server is up and running
    * there is a configured topic in kafka

    If following conditions are met healthcheck returns healthy status.
    Otherwise unhealthy status is returned with explanation.

    Example of middleware configuration:

    .. code-block:: ini

      [kafka_healthcheck]
      kafka_url = localhost:8900
      kafka_topics = log

    Note:
        It is possible to specify multiple topics if necessary.
        Just separate them with ,

    """

    def healthcheck(self):
        """Connect to kafka and verify that the configured topics exist.

        The kafka url is read from ``CONF.kafka_healthcheck.kafka_url``.

        :return: ``CheckResult(healthy=True, message='OK')`` when the
                 connection succeeds and every configured topic is found,
                 otherwise an unhealthy ``CheckResult`` whose message
                 explains what failed
        """
        url = CONF.kafka_healthcheck.kafka_url

        try:
            kafka_client = client.KafkaClient(hosts=url)
        except client.KafkaUnavailableError as ex:
            LOG.error(repr(ex))
            error_str = 'Could not connect to kafka at %s' % url
            return CheckResult(healthy=False, message=error_str)

        # From here on the client is connected; make sure it gets closed
        # even if topic verification raises unexpectedly.
        try:
            return self._verify_topics(kafka_client)
        finally:
            self._disconnect_gracefully(kafka_client)

    # noinspection PyMethodMayBeStatic
    def _verify_topics(self, kafka_client):
        """Check that each configured topic is known to ``kafka_client``.

        Topics are read from ``CONF.kafka_healthcheck.kafka_topics`` and
        checked in order; the first missing one is logged and reported.

        :param kafka_client: connected client exposing ``topic_partitions``
        :return: unhealthy ``CheckResult`` naming the first missing topic,
                 or a healthy ``CheckResult`` with message ``'OK'``
        """
        topics = CONF.kafka_healthcheck.kafka_topics

        for t in topics:
            # kafka client loads metadata for topics as fast
            # as possible (happens in __init__), therefore this
            # topic_partitions is sure to be filled
            for_topic = t in kafka_client.topic_partitions
            if not for_topic:
                error_str = 'Kafka: Topic %s not found' % t
                LOG.error(error_str)
                return CheckResult(healthy=False, message=error_str)

        return CheckResult(healthy=True, message='OK')

    # noinspection PyMethodMayBeStatic
    def _disconnect_gracefully(self, kafka_client):
        """Close ``kafka_client``, logging (not raising) any failure.

        :param kafka_client: client to be closed
        """
        # at this point, client is connected so it must be closed
        # regardless of topic existence
        try:
            kafka_client.close()
        except Exception as ex:
            # log that something went wrong and move on
            LOG.error(repr(ex))