/*
 * Decompiled with CFR 0.152.
 */
package monasca.persister;

import com.codahale.metrics.health.HealthCheck;
import com.google.common.util.concurrent.ThreadFactoryBuilder;
import com.google.inject.Guice;
import com.google.inject.Injector;
import com.google.inject.Key;
import com.google.inject.Module;
import com.google.inject.TypeLiteral;
import io.dropwizard.Application;
import io.dropwizard.setup.Bootstrap;
import io.dropwizard.setup.Environment;
import java.util.Locale;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ThreadFactory;
import monasca.common.model.event.AlarmStateTransitionedEvent;
import monasca.common.model.metric.MetricEnvelope;
import monasca.persister.PersisterModule;
import monasca.persister.configuration.PersisterConfig;
import monasca.persister.consumer.KafkaChannel;
import monasca.persister.consumer.KafkaChannelFactory;
import monasca.persister.consumer.KafkaConsumer;
import monasca.persister.consumer.KafkaConsumerFactory;
import monasca.persister.consumer.KafkaConsumerRunnableBasic;
import monasca.persister.consumer.KafkaConsumerRunnableBasicFactory;
import monasca.persister.consumer.ManagedConsumer;
import monasca.persister.consumer.ManagedConsumerFactory;
import monasca.persister.healthcheck.SimpleHealthCheck;
import monasca.persister.pipeline.ManagedPipeline;
import monasca.persister.pipeline.ManagedPipelineFactory;
import monasca.persister.pipeline.event.AlarmStateTransitionHandlerFactory;
import monasca.persister.pipeline.event.MetricHandlerFactory;
import monasca.persister.resource.Resource;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class PersisterApplication
extends Application<PersisterConfig> {
    private static final Logger logger = LoggerFactory.getLogger(PersisterApplication.class);

    public static void main(String[] args) throws Exception {
        if (args.length == 1 && args[0].toLowerCase().contains("version")) {
            PersisterApplication.showVersion();
            System.exit(0);
        }
        new PersisterApplication().run(args);
    }

    private static void showVersion() {
        Package pkg = Package.getPackage("monasca.persister");
        System.out.println("-------- Version Information --------");
        System.out.println(pkg.getImplementationVersion());
    }

    public void initialize(Bootstrap<PersisterConfig> bootstrap) {
    }

    public String getName() {
        return "monasca-persister";
    }

    public void run(PersisterConfig configuration, Environment environment) throws Exception {
        Injector injector = Guice.createInjector((Module[])new Module[]{new PersisterModule(configuration, environment)});
        environment.jersey().register((Object)new Resource());
        environment.healthChecks().register("test-health-check", (HealthCheck)new SimpleHealthCheck());
        KafkaChannelFactory kafkaChannelFactory = (KafkaChannelFactory)injector.getInstance(KafkaChannelFactory.class);
        ManagedConsumerFactory metricManagedConsumerFactory = (ManagedConsumerFactory)injector.getInstance(Key.get((TypeLiteral)new TypeLiteral<ManagedConsumerFactory<MetricEnvelope[]>>(){}));
        KafkaConsumerFactory kafkaMetricConsumerFactory = (KafkaConsumerFactory)injector.getInstance(Key.get((TypeLiteral)new TypeLiteral<KafkaConsumerFactory<MetricEnvelope[]>>(){}));
        KafkaConsumerRunnableBasicFactory kafkaMetricConsumerRunnableBasicFactory = (KafkaConsumerRunnableBasicFactory)injector.getInstance(Key.get((TypeLiteral)new TypeLiteral<KafkaConsumerRunnableBasicFactory<MetricEnvelope[]>>(){}));
        ThreadFactory threadFactory = new ThreadFactoryBuilder().setDaemon(true).build();
        int totalNumberOfThreads = configuration.getMetricConfiguration().getNumThreads() + configuration.getAlarmHistoryConfiguration().getNumThreads();
        ExecutorService executorService = Executors.newFixedThreadPool(totalNumberOfThreads, threadFactory);
        for (int i = 0; i < configuration.getMetricConfiguration().getNumThreads(); ++i) {
            String threadId = "metric-" + String.valueOf(i);
            KafkaChannel kafkaMetricChannel = kafkaChannelFactory.create(configuration.getMetricConfiguration(), threadId);
            ManagedPipeline<MetricEnvelope[]> managedMetricPipeline = this.getMetricPipeline(configuration, threadId, injector);
            KafkaConsumerRunnableBasic<MetricEnvelope[]> kafkaMetricConsumerRunnableBasic = kafkaMetricConsumerRunnableBasicFactory.create(managedMetricPipeline, kafkaMetricChannel, threadId);
            KafkaConsumer<MetricEnvelope[]> kafkaMetricConsumer = kafkaMetricConsumerFactory.create(kafkaMetricConsumerRunnableBasic, threadId, executorService);
            ManagedConsumer<MetricEnvelope[]> managedMetricConsumer = metricManagedConsumerFactory.create(kafkaMetricConsumer, threadId);
            environment.lifecycle().manage(managedMetricConsumer);
        }
        ManagedConsumerFactory alarmStateTransitionsManagedConsumerFactory = (ManagedConsumerFactory)injector.getInstance(Key.get((TypeLiteral)new TypeLiteral<ManagedConsumerFactory<AlarmStateTransitionedEvent>>(){}));
        KafkaConsumerFactory kafkaAlarmStateTransitionConsumerFactory = (KafkaConsumerFactory)injector.getInstance(Key.get((TypeLiteral)new TypeLiteral<KafkaConsumerFactory<AlarmStateTransitionedEvent>>(){}));
        KafkaConsumerRunnableBasicFactory kafkaAlarmStateTransitionConsumerRunnableBasicFactory = (KafkaConsumerRunnableBasicFactory)injector.getInstance(Key.get((TypeLiteral)new TypeLiteral<KafkaConsumerRunnableBasicFactory<AlarmStateTransitionedEvent>>(){}));
        for (int i = 0; i < configuration.getAlarmHistoryConfiguration().getNumThreads(); ++i) {
            String threadId = "alarm-state-transition-" + String.valueOf(i);
            KafkaChannel kafkaAlarmStateTransitionChannel = kafkaChannelFactory.create(configuration.getAlarmHistoryConfiguration(), threadId);
            ManagedPipeline<AlarmStateTransitionedEvent> managedAlarmStateTransitionPipeline = this.getAlarmStateHistoryPipeline(configuration, threadId, injector);
            KafkaConsumerRunnableBasic<AlarmStateTransitionedEvent> kafkaAlarmStateTransitionConsumerRunnableBasic = kafkaAlarmStateTransitionConsumerRunnableBasicFactory.create(managedAlarmStateTransitionPipeline, kafkaAlarmStateTransitionChannel, threadId);
            KafkaConsumer<AlarmStateTransitionedEvent> kafkaAlarmStateTransitionConsumer = kafkaAlarmStateTransitionConsumerFactory.create(kafkaAlarmStateTransitionConsumerRunnableBasic, threadId, executorService);
            ManagedConsumer<AlarmStateTransitionedEvent> managedAlarmStateTransitionConsumer = alarmStateTransitionsManagedConsumerFactory.create(kafkaAlarmStateTransitionConsumer, threadId);
            environment.lifecycle().manage(managedAlarmStateTransitionConsumer);
        }
    }

    private ManagedPipeline<MetricEnvelope[]> getMetricPipeline(PersisterConfig configuration, String threadId, Injector injector) {
        logger.debug("Creating metric pipeline [{}]...", (Object)threadId);
        int batchSize = configuration.getMetricConfiguration().getBatchSize();
        logger.debug("Batch size for metric pipeline [{}]", (Object)batchSize);
        MetricHandlerFactory metricEventHandlerFactory = (MetricHandlerFactory)injector.getInstance(MetricHandlerFactory.class);
        ManagedPipelineFactory managedPipelineFactory = (ManagedPipelineFactory)injector.getInstance(Key.get((TypeLiteral)new TypeLiteral<ManagedPipelineFactory<MetricEnvelope[]>>(){}));
        ManagedPipeline<MetricEnvelope[]> pipeline = managedPipelineFactory.create(metricEventHandlerFactory.create(configuration.getMetricConfiguration(), threadId, batchSize), threadId);
        logger.debug("Instance of metric pipeline [{}] fully created", (Object)threadId);
        return pipeline;
    }

    public ManagedPipeline<AlarmStateTransitionedEvent> getAlarmStateHistoryPipeline(PersisterConfig configuration, String threadId, Injector injector) {
        logger.debug("Creating alarm state history pipeline [{}]...", (Object)threadId);
        int batchSize = configuration.getAlarmHistoryConfiguration().getBatchSize();
        logger.debug("Batch size for each AlarmStateHistoryPipeline [{}]", (Object)batchSize);
        AlarmStateTransitionHandlerFactory alarmHistoryEventHandlerFactory = (AlarmStateTransitionHandlerFactory)injector.getInstance(AlarmStateTransitionHandlerFactory.class);
        ManagedPipelineFactory alarmStateTransitionPipelineFactory = (ManagedPipelineFactory)injector.getInstance((Key)new Key<ManagedPipelineFactory<AlarmStateTransitionedEvent>>(){});
        ManagedPipeline<AlarmStateTransitionedEvent> pipeline = alarmStateTransitionPipelineFactory.create(alarmHistoryEventHandlerFactory.create(configuration.getAlarmHistoryConfiguration(), threadId, batchSize), threadId);
        logger.debug("Instance of alarm state history pipeline [{}] fully created", (Object)threadId);
        return pipeline;
    }
}

