/*
 * Decompiled with CFR 0.152.
 */
package monasca.thresh.infrastructure.thresholding;

import backtype.storm.spout.SpoutOutputCollector;
import backtype.storm.task.TopologyContext;
import backtype.storm.topology.base.BaseRichSpout;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import kafka.consumer.Consumer;
import kafka.consumer.ConsumerConfig;
import kafka.consumer.ConsumerIterator;
import kafka.consumer.KafkaStream;
import kafka.javaapi.consumer.ConsumerConnector;
import monasca.common.configuration.KafkaConsumerConfiguration;
import monasca.common.configuration.KafkaConsumerProperties;
import monasca.thresh.KafkaSpoutConfig;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public abstract class KafkaSpout
extends BaseRichSpout
implements Runnable {
    private static final Logger logger = LoggerFactory.getLogger(KafkaSpout.class);
    private static final long serialVersionUID = 744004533863562119L;
    private final KafkaSpoutConfig kafkaSpoutConfig;
    private transient ConsumerConnector consumerConnector;
    private transient List<KafkaStream<byte[], byte[]>> streams = null;
    private SpoutOutputCollector collector;
    private volatile boolean shouldContinue;
    private byte[] message;
    private Thread readerThread;
    private String spoutName;
    private boolean waiting = false;

    protected KafkaSpout(KafkaSpoutConfig kafkaSpoutConfig) {
        this.kafkaSpoutConfig = kafkaSpoutConfig;
    }

    public void activate() {
        logger.info("Activated");
        if (this.streams == null) {
            HashMap<String, Integer> topicCountMap = new HashMap<String, Integer>();
            topicCountMap.put(this.kafkaSpoutConfig.kafkaConsumerConfiguration.getTopic(), new Integer(1));
            Map consumerMap = this.consumerConnector.createMessageStreams(topicCountMap);
            this.streams = (List)consumerMap.get(this.kafkaSpoutConfig.kafkaConsumerConfiguration.getTopic());
        }
    }

    public void open(Map conf, TopologyContext context, SpoutOutputCollector collector) {
        logger.info("Opened");
        this.collector = collector;
        logger.info(" topic = " + this.kafkaSpoutConfig.kafkaConsumerConfiguration.getTopic());
        this.spoutName = String.format("%s-%d", context.getThisComponentId(), context.getThisTaskId());
        Properties kafkaProperties = KafkaConsumerProperties.createKafkaProperties((KafkaConsumerConfiguration)this.kafkaSpoutConfig.kafkaConsumerConfiguration);
        kafkaProperties.setProperty("consumer.id", String.valueOf(context.getThisTaskId()));
        ConsumerConfig consumerConfig = new ConsumerConfig(kafkaProperties);
        this.consumerConnector = Consumer.createJavaConsumerConnector((ConsumerConfig)consumerConfig);
    }

    public synchronized void deactivate() {
        logger.info("deactivated");
        this.consumerConnector.shutdown();
        this.shouldContinue = false;
        this.notify();
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @Override
    public void run() {
        while (this.shouldContinue) {
            ConsumerIterator it = this.streams.get(0).iterator();
            if (!it.hasNext()) continue;
            byte[] message = (byte[])it.next().message();
            KafkaSpout kafkaSpout = this;
            synchronized (kafkaSpout) {
                this.message = message;
                if (this.waiting) {
                    this.notify();
                }
                while (this.message != null && this.shouldContinue) {
                    try {
                        this.wait();
                    }
                    catch (InterruptedException e) {
                        logger.info("Wait interrupted", (Throwable)e);
                    }
                }
            }
        }
        logger.info("readerThread {} exited", (Object)this.readerThread.getName());
        this.readerThread = null;
    }

    public void nextTuple() {
        logger.debug("nextTuple called");
        this.checkReaderRunning();
        byte[] message = this.getMessage();
        if (message != null) {
            logger.debug("streams iterator has next");
            this.processMessage(message, this.collector);
        }
    }

    private void checkReaderRunning() {
        this.shouldContinue = true;
        if (this.readerThread == null) {
            String threadName = String.format("%s reader", this.spoutName);
            this.readerThread = new Thread((Runnable)this, threadName);
            this.readerThread.start();
            logger.info("Started Reader Thread {}", (Object)this.readerThread.getName());
        }
    }

    private byte[] tryToGetMessage() {
        byte[] result = this.message;
        if (result != null) {
            this.message = null;
            this.notify();
        }
        return result;
    }

    private synchronized byte[] getMessage() {
        byte[] result = this.tryToGetMessage();
        if (result != null) {
            return result;
        }
        this.waiting = true;
        try {
            this.wait(this.kafkaSpoutConfig.maxWaitTime.intValue());
        }
        catch (InterruptedException e) {
            logger.info("Sleep interrupted", (Throwable)e);
        }
        this.waiting = false;
        return this.tryToGetMessage();
    }

    protected abstract void processMessage(byte[] var1, SpoutOutputCollector var2);
}

