/*
 * Decompiled with CFR 0.152.
 */
package com.alibaba.otter.canal.prometheus.impl;

import com.alibaba.otter.canal.instance.core.CanalInstance;
import com.alibaba.otter.canal.prometheus.CanalInstanceExports;
import com.alibaba.otter.canal.prometheus.InstanceRegistry;
import com.alibaba.otter.canal.sink.CanalEventSink;
import com.alibaba.otter.canal.sink.entry.EntryEventSink;
import com.google.common.base.Preconditions;
import io.prometheus.client.Collector;
import io.prometheus.client.CounterMetricFamily;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.atomic.AtomicLong;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Prometheus {@link Collector} that exports the sink blocking time of every
 * registered {@link CanalInstance} as the counter
 * {@code canal_instance_sink_blocking_time}.
 *
 * <p>The class is a singleton obtained through {@link #instance()}. Registered
 * instances are kept in a {@link ConcurrentHashMap} keyed by destination name.
 */
public class SinkCollector extends Collector implements InstanceRegistry {
    private static final Logger logger = LoggerFactory.getLogger(SinkCollector.class);
    /** Divisor applied to the raw counter value before it is exported. */
    private static final long NANO_PER_MILLI = 1000000L;
    private static final String SINK_BLOCKING_TIME = "canal_instance_sink_blocking_time";
    private static final String SINK_BLOCK_TIME_HELP = "Total sink blocking time in milliseconds";
    /** Metrics holders of the currently registered instances, keyed by destination. */
    private final ConcurrentMap<String, SinkMetricsHolder> instances = new ConcurrentHashMap<String, SinkMetricsHolder>();

    private SinkCollector() {
    }

    /**
     * Returns the shared collector.
     *
     * @return the single {@code SinkCollector} of this class loader
     */
    public static SinkCollector instance() {
        return SingletonHolder.SINGLETON;
    }

    /**
     * Builds one counter family containing a sample per registered instance.
     *
     * @return a list holding the single {@code canal_instance_sink_blocking_time} family;
     *         the family has no samples when nothing is registered
     */
    @Override
    public List<Collector.MetricFamilySamples> collect() {
        List<Collector.MetricFamilySamples> mfs = new ArrayList<Collector.MetricFamilySamples>();
        CounterMetricFamily blockingCounter = new CounterMetricFamily(SINK_BLOCKING_TIME, SINK_BLOCK_TIME_HELP, CanalInstanceExports.DEST_LABELS_LIST);
        for (SinkMetricsHolder smh : this.instances.values()) {
            // The help text promises milliseconds; the raw counter is presumably
            // accumulated in nanoseconds (see NANO_PER_MILLI) - confirm in EntryEventSink.
            blockingCounter.addMetric(smh.destLabelValues, smh.eventsSinkBlockingTime.doubleValue() / NANO_PER_MILLI);
        }
        mfs.add(blockingCounter);
        return mfs;
    }

    /**
     * Starts exporting the sink blocking time of the given instance. An entry
     * already registered under the same destination is replaced and a warning
     * is logged.
     *
     * @param instance the instance whose event sink is observed
     * @throws IllegalArgumentException if the instance's sink is not an {@link EntryEventSink}
     * @throws NullPointerException if the sink exposes no blocking-time counter
     */
    @Override
    public void register(CanalInstance instance) {
        String destination = instance.getDestination();
        // Validate the sink before anything is allocated or published.
        CanalEventSink sink = instance.getEventSink();
        if (!(sink instanceof EntryEventSink)) {
            throw new IllegalArgumentException("CanalEventSink must be EntryEventSink");
        }
        AtomicLong blockingTime = ((EntryEventSink) sink).getEventsSinkBlockingTime();
        Preconditions.checkNotNull(blockingTime, "eventsSinkBlockingTime of instance %s must not be null", destination);
        SinkMetricsHolder holder = new SinkMetricsHolder(Collections.singletonList(destination), blockingTime);
        SinkMetricsHolder old = this.instances.put(destination, holder);
        if (old != null) {
            logger.warn("Remove stale SinkCollector for instance {}.", destination);
        }
    }

    /**
     * Stops exporting metrics for the given instance. Does nothing when the
     * instance's destination is not registered.
     *
     * @param instance the instance to remove
     */
    @Override
    public void unregister(CanalInstance instance) {
        String destination = instance.getDestination();
        this.instances.remove(destination);
    }

    /**
     * Immutable pairing of a destination's label values with its blocking-time counter.
     * Declared static so that it carries no reference to the enclosing collector.
     */
    private static final class SinkMetricsHolder {
        private final AtomicLong eventsSinkBlockingTime;
        private final List<String> destLabelValues;

        private SinkMetricsHolder(List<String> destLabelValues, AtomicLong eventsSinkBlockingTime) {
            this.destLabelValues = destLabelValues;
            this.eventsSinkBlockingTime = eventsSinkBlockingTime;
        }
    }

    /** Holds the singleton so that it is created when this nested class is initialized. */
    private static class SingletonHolder {
        private static final SinkCollector SINGLETON = new SinkCollector();

        private SingletonHolder() {
        }
    }
}

