/*
 * Decompiled with CFR 0.152.
 */
package monasca.common.messaging.kafka;

import com.codahale.metrics.health.HealthCheck;
import com.google.common.base.Joiner;
import java.nio.charset.StandardCharsets;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import kafka.consumer.Consumer;
import kafka.consumer.ConsumerConfig;
import kafka.consumer.ConsumerIterator;
import kafka.consumer.KafkaStream;
import kafka.javaapi.consumer.ConsumerConnector;
import kafka.javaapi.producer.Producer;
import kafka.message.MessageAndMetadata;
import kafka.producer.KeyedMessage;
import kafka.producer.ProducerConfig;
import monasca.common.messaging.kafka.KafkaConfiguration;

public class KafkaHealthCheck
extends HealthCheck {
    private final KafkaConfiguration config;

    public KafkaHealthCheck(KafkaConfiguration config) {
        this.config = config;
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    protected HealthCheck.Result check() throws Exception {
        Producer<String, String> producer = null;
        ConsumerConnector consumer = null;
        ExecutorService executor = null;
        try {
            producer = this.createProducer();
            consumer = this.createConsumer();
            KeyedMessage keyedMessage = new KeyedMessage(this.config.healthCheckTopic, null, (Object)"test");
            producer.send(keyedMessage);
            HashMap<String, Integer> topicCountMap = new HashMap<String, Integer>();
            topicCountMap.put(this.config.healthCheckTopic, 1);
            final ConsumerIterator streamIterator = ((KafkaStream)((List)consumer.createMessageStreams(topicCountMap).get(this.config.healthCheckTopic)).get(0)).iterator();
            final Thread mainThread = Thread.currentThread();
            executor = Executors.newSingleThreadExecutor();
            executor.execute(new Runnable(){

                @Override
                public void run() {
                    while (streamIterator.hasNext()) {
                        MessageAndMetadata data = streamIterator.next();
                        String msg = new String((byte[])data.message());
                        System.out.println("Received " + msg);
                        if (!msg.equals("test")) continue;
                        mainThread.interrupt();
                        return;
                    }
                }
            });
            try {
                Thread.sleep(5000L);
            }
            catch (InterruptedException ignore) {
                // empty catch block
            }
            HealthCheck.Result result = HealthCheck.Result.healthy();
            return result;
        }
        catch (Exception e) {
            HealthCheck.Result result = HealthCheck.Result.unhealthy((Throwable)e);
            return result;
        }
        finally {
            if (executor != null) {
                executor.shutdownNow();
            }
            if (producer != null) {
                try {
                    producer.close();
                }
                catch (Exception ignore) {}
            }
            if (consumer != null) {
                consumer.commitOffsets();
                try {
                    consumer.shutdown();
                }
                catch (Exception ignore) {}
            }
        }
    }

    Producer<String, String> createProducer() {
        Properties props = new Properties();
        props.put("metadata.broker.list", Joiner.on((char)',').join((Object[])this.config.brokerUris));
        props.put("serializer.class", "kafka.serializer.StringEncoder");
        props.put("request.required.acks", "1");
        ProducerConfig config = new ProducerConfig(props);
        return new Producer(config);
    }

    ConsumerConnector createConsumer() {
        Properties props = new Properties();
        props.put("zookeeper.connect", Joiner.on((char)',').join((Object[])this.config.zookeeperUris));
        props.put("group.id", "test");
        props.put("auto.offset.reset", "largest");
        ConsumerConfig config = new ConsumerConfig(props);
        return Consumer.createJavaConsumerConnector((ConsumerConfig)config);
    }
}

