/*
 * Decompiled with CFR 0.152.
 */
package com.alibaba.otter.canal.prometheus.impl;

import com.alibaba.otter.canal.instance.core.CanalInstance;
import com.alibaba.otter.canal.parse.CanalEventParser;
import com.alibaba.otter.canal.parse.inbound.group.GroupEventParser;
import com.alibaba.otter.canal.parse.inbound.mysql.AbstractMysqlEventParser;
import com.alibaba.otter.canal.prometheus.InstanceRegistry;
import com.google.common.base.Preconditions;
import io.prometheus.client.Collector;
import io.prometheus.client.CounterMetricFamily;
import io.prometheus.client.GaugeMetricFamily;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.atomic.AtomicLong;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Prometheus {@link Collector} that exposes parser-level metrics of registered
 * canal instances.
 *
 * <p>Three metric families are produced on every {@link #collect()} call:
 * <ul>
 *   <li>{@code canal_instance_received_binlog_bytes} (counter, labels
 *       {@code destination}, {@code parser})</li>
 *   <li>{@code canal_instance_parser_mode} (gauge, labels {@code destination},
 *       {@code parallel})</li>
 *   <li>{@code canal_instance_publish_blocking_time} (counter, labels
 *       {@code destination}, {@code parser}); only added to the result when at
 *       least one parallel parser contributed a sample</li>
 * </ul>
 *
 * <p>Registered instances are kept in a {@link ConcurrentMap} keyed by destination.
 * The class is a singleton obtained through {@link #instance()}.
 */
public class ParserCollector extends Collector implements InstanceRegistry {
    private static final Logger logger = LoggerFactory.getLogger(ParserCollector.class);
    /** Divisor applied to the publish blocking time before it is reported in milliseconds. */
    private static final long NANO_PER_MILLI = 1000000L;
    private static final String PUBLISH_BLOCKING = "canal_instance_publish_blocking_time";
    private static final String RECEIVED_BINLOG = "canal_instance_received_binlog_bytes";
    private static final String PARSER_MODE = "canal_instance_parser_mode";
    private static final String DEST_LABEL = "destination";
    private static final String MODE_LABEL = "parallel";
    private static final String PARSER_LABEL = "parser";
    private static final String PUBLISH_BLOCKING_HELP = "Publish blocking time of dump thread in milliseconds";
    private static final String RECEIVED_BINLOG_HELP = "Received binlog bytes";
    private static final String MODE_HELP = "Parser mode(parallel/serial) of instance";
    private final List<String> modeLabels = Arrays.asList(DEST_LABEL, MODE_LABEL);
    private final List<String> parserLabels = Arrays.asList(DEST_LABEL, PARSER_LABEL);
    /** Metric holders of the registered instances, keyed by instance destination. */
    private final ConcurrentMap<String, ParserMetricsHolder> instances = new ConcurrentHashMap<String, ParserMetricsHolder>();

    private ParserCollector() {
    }

    /**
     * Returns the shared collector.
     *
     * @return the singleton {@code ParserCollector}
     */
    public static ParserCollector instance() {
        return SingletonHolder.SINGLETON;
    }

    /**
     * Builds the metric families from the current values of all registered holders.
     *
     * @return the received-bytes counter and the mode gauge, followed by the
     *         publish-blocking counter when it contains at least one sample
     */
    @Override
    public List<Collector.MetricFamilySamples> collect() {
        List<Collector.MetricFamilySamples> mfs = new ArrayList<Collector.MetricFamilySamples>();
        CounterMetricFamily bytesCounter = new CounterMetricFamily(RECEIVED_BINLOG, RECEIVED_BINLOG_HELP, this.parserLabels);
        GaugeMetricFamily modeGauge = new GaugeMetricFamily(PARSER_MODE, MODE_HELP, this.modeLabels);
        CounterMetricFamily blockingCounter = new CounterMetricFamily(PUBLISH_BLOCKING, PUBLISH_BLOCKING_HELP, this.parserLabels);
        for (ParserMetricsHolder holder : this.instances.values()) {
            if (holder instanceof GroupParserMetricsHolder) {
                this.groupCollect(bytesCounter, blockingCounter, modeGauge, (GroupParserMetricsHolder) holder);
            } else {
                this.singleCollect(bytesCounter, blockingCounter, modeGauge, holder);
            }
        }
        mfs.add(bytesCounter);
        mfs.add(modeGauge);
        // The blocking counter is only populated by parallel parsers; skip the family when it is empty.
        if (!blockingCounter.samples.isEmpty()) {
            mfs.add(blockingCounter);
        }
        return mfs;
    }

    /**
     * Adds the samples of every member of a grouped parser.
     *
     * <p>The mode gauge is labelled by destination and mode only, so members of one
     * group that share a mode would otherwise produce samples with identical label
     * sets. Each distinct mode label set is therefore emitted once per group.
     */
    private void groupCollect(CounterMetricFamily bytesCounter, CounterMetricFamily blockingCounter, GaugeMetricFamily modeGauge, GroupParserMetricsHolder group) {
        // At most two distinct label sets (parallel = true / false) exist per destination.
        List<List<String>> emittedModes = new ArrayList<List<String>>(2);
        for (ParserMetricsHolder member : group.holders) {
            this.addParserSamples(bytesCounter, blockingCounter, member);
            if (!emittedModes.contains(member.modeLabelValues)) {
                emittedModes.add(member.modeLabelValues);
                modeGauge.addMetric(member.modeLabelValues, 1.0);
            }
        }
    }

    /** Adds the parser-level counters and the mode gauge sample of a single (non-group) holder. */
    private void singleCollect(CounterMetricFamily bytesCounter, CounterMetricFamily blockingCounter, GaugeMetricFamily modeGauge, ParserMetricsHolder holder) {
        this.addParserSamples(bytesCounter, blockingCounter, holder);
        modeGauge.addMetric(holder.modeLabelValues, 1.0);
    }

    /** Adds the per-parser counter samples; the blocking time is reported for parallel parsers only. */
    private void addParserSamples(CounterMetricFamily bytesCounter, CounterMetricFamily blockingCounter, ParserMetricsHolder holder) {
        if (holder.isParallel) {
            blockingCounter.addMetric(holder.parserLabelValues, holder.eventsPublishBlockingTime.doubleValue() / NANO_PER_MILLI);
        }
        bytesCounter.addMetric(holder.parserLabelValues, holder.receivedBinlogBytes.doubleValue());
    }

    /**
     * Registers the parser metrics of {@code instance} under its destination,
     * replacing (and logging a warning about) any holder previously stored for
     * the same destination.
     *
     * @param instance the canal instance whose event parser is to be observed
     * @throws IllegalArgumentException if the instance's event parser is neither an
     *         {@code AbstractMysqlEventParser} nor a {@code GroupEventParser}
     */
    @Override
    public void register(CanalInstance instance) {
        ParserMetricsHolder holder;
        String destination = instance.getDestination();
        CanalEventParser parser = instance.getEventParser();
        if (parser instanceof AbstractMysqlEventParser) {
            // A stand-alone parser is reported with parser id "0"; group members are numbered from 1.
            holder = this.singleHolder(destination, (AbstractMysqlEventParser) parser, "0");
        } else if (parser instanceof GroupEventParser) {
            holder = this.groupHolder(destination, (GroupEventParser) parser);
        } else {
            throw new IllegalArgumentException("CanalEventParser must be either AbstractMysqlEventParser or GroupEventParser.");
        }
        Preconditions.checkNotNull(holder);
        ParserMetricsHolder old = this.instances.put(destination, holder);
        if (old != null) {
            logger.warn("Remove stale ParserCollector for instance {}.", destination);
        }
    }

    /**
     * Captures the counters and label values of one MySQL event parser.
     *
     * @param destination instance destination, used as the first label value
     * @param parser      parser whose counters are referenced by the holder
     * @param id          value of the {@code parser} label
     * @return a populated holder
     * @throws NullPointerException if the parser returns a {@code null} counter
     */
    private ParserMetricsHolder singleHolder(String destination, AbstractMysqlEventParser parser, String id) {
        // Read the mode once so the label value and the flag cannot disagree.
        boolean parallel = parser.isParallel();
        ParserMetricsHolder holder = new ParserMetricsHolder();
        holder.parserLabelValues = Arrays.asList(destination, id);
        holder.modeLabelValues = Arrays.asList(destination, Boolean.toString(parallel));
        holder.eventsPublishBlockingTime = parser.getEventsPublishBlockingTime();
        holder.receivedBinlogBytes = parser.getReceivedBinlogBytes();
        holder.isParallel = parallel;
        Preconditions.checkNotNull(holder.eventsPublishBlockingTime, "eventsPublishBlockingTime of parser must not be null");
        Preconditions.checkNotNull(holder.receivedBinlogBytes, "receivedBinlogBytes of parser must not be null");
        return holder;
    }

    /**
     * Builds a group holder with one child holder per {@code AbstractMysqlEventParser}
     * in the group. Children are labelled with their 1-based position in the group's
     * parser list; entries of any other type (including {@code null}) are skipped
     * with a warning.
     */
    private GroupParserMetricsHolder groupHolder(String destination, GroupEventParser group) {
        List<?> parsers = group.getEventParsers();
        GroupParserMetricsHolder groupHolder = new GroupParserMetricsHolder();
        int num = parsers.size();
        for (int i = 0; i < num; ++i) {
            Object parser = parsers.get(i);
            if (parser instanceof AbstractMysqlEventParser) {
                ParserMetricsHolder single = this.singleHolder(destination, (AbstractMysqlEventParser) parser, Integer.toString(i + 1));
                groupHolder.holders.add(single);
                continue;
            }
            logger.warn("Null or non AbstractMysqlEventParser, ignore.");
        }
        return groupHolder;
    }

    /**
     * Removes the holder stored for the destination of {@code instance}, if any.
     *
     * @param instance the canal instance to stop observing
     */
    @Override
    public void unregister(CanalInstance instance) {
        String destination = instance.getDestination();
        this.instances.remove(destination);
    }

    /** Holder for a grouped parser; only {@link #holders} is populated, the inherited fields stay unset. */
    private static class GroupParserMetricsHolder extends ParserMetricsHolder {
        private final List<ParserMetricsHolder> holders;

        private GroupParserMetricsHolder() {
            this.holders = new ArrayList<ParserMetricsHolder>();
        }
    }

    /** Label values and counter references captured for a single parser at registration time. */
    private static class ParserMetricsHolder {
        private List<String> parserLabelValues;
        private List<String> modeLabelValues;
        private AtomicLong receivedBinlogBytes;
        private AtomicLong eventsPublishBlockingTime;
        private boolean isParallel;

        private ParserMetricsHolder() {
        }
    }

    /** Holds the singleton so that it is created when this nested class is initialized. */
    private static class SingletonHolder {
        private static final ParserCollector SINGLETON = new ParserCollector();

        private SingletonHolder() {
        }
    }
}

