/*
 * Decompiled with CFR 0.152.
 */
package monasca.persister.pipeline.event;

import com.codahale.metrics.Meter;
import com.codahale.metrics.Timer;
import com.fasterxml.jackson.databind.ObjectMapper;
import io.dropwizard.setup.Environment;
import monasca.persister.configuration.PipelineConfig;
import monasca.persister.repository.RepoException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Base class for pipeline handlers that accumulate incoming messages and
 * flush them to a repository either when a configured batch size has been
 * reached or when a configured amount of time has elapsed.
 *
 * <p>Subclasses supply the message handling ({@link #process(String)}), the
 * repository write ({@link #flushRepository()}) and the configuration of the
 * shared {@link #objectMapper} ({@link #initObjectMapper()}).
 *
 * <p>This class performs no synchronization on its counters or on the flush
 * deadline.
 *
 * @param <T> type parameter declared by the handler hierarchy; it is not
 *            referenced inside this class
 */
public abstract class FlushableHandler<T> {
    private static final Logger logger = LoggerFactory.getLogger(FlushableHandler.class);

    /** Number of pending messages at which {@link #onEvent(String)} triggers a flush. */
    private final int batchSize;
    /**
     * Wall-clock time (epoch millis) at or after which a heartbeat triggers a flush.
     * Starts at construction time, so the first heartbeat always flushes.
     */
    private long flushTimeMillis = System.currentTimeMillis();
    private final long millisBetweenFlushes;
    private final int secondsBetweenFlushes;
    /** Messages accumulated since the last successful flush. */
    private int msgCount = 0;
    /** Number of successful flushes performed by this handler. */
    private long batchCount = 0L;
    private final Meter processedMeter;
    private final Meter flushMeter;
    private final Timer flushTimer;
    protected final String threadId;
    protected ObjectMapper objectMapper = new ObjectMapper();
    protected final String handlerName;

    /**
     * Creates the handler and registers its meters and timer under a name of
     * the form {@code <concrete class name>[<threadId>].<metric>}.
     *
     * <p>NOTE: this constructor calls the overridable
     * {@link #initObjectMapper()} as its last step, i.e. before any subclass
     * constructor body or subclass field initializer has run. Implementations
     * of {@code initObjectMapper()} must therefore not rely on subclass state.
     *
     * @param configuration source of the maximum batch time, in seconds
     * @param environment   provides the metric registry used for the meters and timer
     * @param threadId      identifier used in log messages and in the metric names
     * @param batchSize     number of pending messages that triggers a flush
     */
    protected FlushableHandler(PipelineConfig configuration, Environment environment, String threadId, int batchSize) {
        this.threadId = threadId;
        this.handlerName = String.format("%s[%s]", this.getClass().getName(), threadId);
        this.processedMeter = environment.metrics().meter(this.handlerName + "." + "events-processed-meter");
        this.flushMeter = environment.metrics().meter(this.handlerName + "." + "flush-meter");
        this.flushTimer = environment.metrics().timer(this.handlerName + "." + "flush-timer");
        this.secondsBetweenFlushes = configuration.getMaxBatchTime();
        // Multiply in long arithmetic: an int product would overflow for
        // intervals above ~2.1 million seconds before being widened.
        this.millisBetweenFlushes = this.secondsBetweenFlushes * 1000L;
        this.batchSize = batchSize;
        this.initObjectMapper();
    }

    /**
     * Hook for subclasses to configure {@link #objectMapper}. Invoked from the
     * base-class constructor, so subclass fields are not yet initialized when
     * it runs.
     */
    protected abstract void initObjectMapper();

    /**
     * Writes the accumulated data to the repository.
     *
     * @return the count that {@link #flush()} logs as flushed messages and returns
     * @throws RepoException if the repository write fails
     */
    protected abstract int flushRepository() throws RepoException;

    /**
     * Handles one non-null message.
     *
     * @param var1 the raw message
     * @return the amount to add to the pending message count
     */
    protected abstract int process(String var1);

    /**
     * Entry point for incoming events.
     *
     * <p>A {@code null} message is treated as a heartbeat: nothing is
     * processed, and a flush is performed only if the flush deadline has
     * passed. A non-null message is handed to {@link #process(String)}, its
     * return value is added to the pending count, the processed meter is
     * marked once, and a flush is performed if the pending count has reached
     * the batch size.
     *
     * @param msg the message, or {@code null} for a heartbeat
     * @return {@code true} if a flush was performed and it reported more than
     *         zero flushed messages; {@code false} otherwise
     * @throws RepoException if a triggered flush fails
     */
    public boolean onEvent(String msg) throws RepoException {
        if (msg == null) {
            return this.isFlushTime() && this.flush() > 0;
        }
        this.msgCount += this.process(msg);
        this.processedMeter.mark();
        return this.isBatchSize() && this.flush() > 0;
    }

    /** Returns whether the pending message count has reached the batch size. */
    private boolean isBatchSize() {
        logger.debug("[{}]: checking batch size", this.threadId);
        if (this.msgCount >= this.batchSize) {
            logger.debug("[{}]: batch sized {} attained", this.threadId, this.batchSize);
            return true;
        }
        logger.debug("[{}]: batch size now at {}, batch size {} not attained", this.threadId, this.msgCount, this.batchSize);
        return false;
    }

    /** Returns whether the current time is at or past the flush deadline. */
    private boolean isFlushTime() {
        logger.debug("[{}]: got heartbeat message, checking flush time. flush every {} seconds.", this.threadId, this.secondsBetweenFlushes);
        long now = System.currentTimeMillis();
        if (this.flushTimeMillis <= now) {
            logger.debug("[{}]: {} ms past flush time. flushing to repository now.", this.threadId, now - this.flushTimeMillis);
            return true;
        }
        logger.debug("[{}]: {} ms to next flush time. no need to flush at this time.", this.threadId, this.flushTimeMillis - now);
        return false;
    }

    /**
     * Flushes the accumulated data to the repository, timing the repository
     * call. On success the flush meter is marked, the next flush deadline is
     * pushed out by the configured interval, the pending message count is
     * reset to zero and the batch count is incremented.
     *
     * <p>If {@link #flushRepository()} throws, the timer is still stopped, but
     * the meter, deadline and counters are left untouched.
     *
     * @return the count returned by {@link #flushRepository()}
     * @throws RepoException if the repository write fails
     */
    public int flush() throws RepoException {
        logger.debug("[{}]: flushing", this.threadId);
        final Timer.Context context = this.flushTimer.time();
        final int msgFlushCnt;
        try {
            msgFlushCnt = this.flushRepository();
        } finally {
            // Always stop the timer so a failing flush does not abandon the context.
            context.stop();
        }
        this.flushMeter.mark();
        this.flushTimeMillis = System.currentTimeMillis() + this.millisBetweenFlushes;
        logger.debug("[{}]: flushed {} msg", this.threadId, msgFlushCnt);
        this.msgCount = 0;
        ++this.batchCount;
        return msgFlushCnt;
    }

    /** Returns the number of successful flushes performed so far. */
    protected long getBatchCount() {
        return this.batchCount;
    }

    /** Returns the number of messages accumulated since the last successful flush. */
    protected int getMsgCount() {
        return this.msgCount;
    }
}

