package com.taobao.drc.clusterclient;

import com.taobao.drc.clusterclient.AbstractClusterMessage;
import com.taobao.drc.clusterclient.partition.BaseCheckpoint;
import com.taobao.drc.clusterclient.partition.IPartition;
import com.taobao.drc.clusterclient.util.Gaugeable;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:com/taobao/drc/clusterclient/MessageNotifier.class */
/**
 * Hands batches of cluster messages to a {@link MessageListener} on a dedicated thread.
 *
 * <p>Producers enqueue batches with {@link #onMessages(List)}; the call blocks while the bounded
 * queue is full. A single notifier thread (started by {@link #start()}) takes batches off the
 * queue in FIFO order and passes each one to the listener. Simple throughput and backlog figures
 * are exposed through {@link #getMetrics()}.
 *
 * <p>The counters are {@code volatile} and are only ever written by the notifier thread
 * ({@code consumedCounter}, {@code lastNotifiedMessage}) or by {@link #getMetrics()}
 * (the sampling fields), so the non-atomic {@code +=} in {@link #run()} has a single writer.
 *
 * @param <M> the concrete message type delivered to the listener
 */
public class MessageNotifier<M extends AbstractClusterMessage<?, ? extends IPartition, ? extends BaseCheckpoint>> implements Runnable, Gaugeable {
    private static final String KEY_MESSAGE_PENDING_NUM = "queue.size";
    private static final String KEY_MESSAGE_CAPACITY = "queue.capacity";
    private static final String KEY_MESSAGE_SIZE = "notified.size";
    private static final String KEY_MESSAGE_LAST_PERIOD_MS = "notified.sample_period_ms";
    private static final String KEY_MESSAGE_RPS_LAST_PERIOD = "notified.rps";
    private static final String KEY_MESSAGE_LAST_NOTIFIED_CHECKPOINT = "notified.last_notified_checkpoint";
    private static final String KEY_MESSAGE_LATENCY = "notified.latency_sec";
    private static final Logger logger = LoggerFactory.getLogger(MessageNotifier.class);
    /** Capacity the queue was created with; reported as a metric. */
    private final int queueSize;
    /** Bounded queue of pending batches, created with fair ordering for blocked threads. */
    private final BlockingQueue<List<M>> msgQueue;
    private final MessageListener<M> messageListener;
    private final Thread thread;
    /** Total number of messages taken from the queue and offered to the listener. */
    private volatile long consumedCounter = 0;
    /** Value of {@code consumedCounter} at the previous {@link #getMetrics()} call. */
    private volatile long lastSampleCounter = 0;
    /** Wall-clock time (ms) of the previous {@link #getMetrics()} call. */
    private volatile long lastSampleMs = System.currentTimeMillis();
    /** Last message of the most recent batch the listener accepted without throwing. */
    private volatile M lastNotifiedMessage = null;
    private volatile boolean running;

    /**
     * Creates a notifier; the worker thread is created here but not started until
     * {@link #start()} is called.
     *
     * @param messageListener receiver of every batch; must not be {@code null}
     * @param i capacity of the pending-batch queue (passed to {@link ArrayBlockingQueue})
     * @param str prefix for the worker thread name; the name is {@code str + "notifier"}
     * @throws NullPointerException if {@code messageListener} is {@code null}
     */
    public MessageNotifier(MessageListener<M> messageListener, int i, String str) {
        if (messageListener == null) {
            throw new NullPointerException("messageListener");
        }
        this.queueSize = i;
        this.messageListener = messageListener;
        this.thread = new Thread(this);
        this.thread.setName(str + "notifier");
        this.msgQueue = new ArrayBlockingQueue<List<M>>(i, true);
        this.running = true;
    }

    /** Starts the notifier thread. */
    public void start() {
        this.thread.start();
    }

    /**
     * Stops the notifier thread and waits for it to exit.
     *
     * <p>The running flag is cleared before the interrupt so that the worker, once woken from a
     * blocking {@code take()}, sees the flag and leaves its loop.
     *
     * @throws InterruptedException if the calling thread is interrupted while waiting
     */
    public void shutdown() throws InterruptedException {
        this.running = false;
        this.thread.interrupt();
        this.thread.join();
    }

    /**
     * Worker loop: takes one batch at a time from the queue and passes it to the listener until
     * {@link #shutdown()} clears the running flag.
     *
     * <p>Exceptions thrown by the listener are logged and reported back through
     * {@code MessageListener.noException}; the loop then continues with the next batch.
     * {@link Error}s are not caught and terminate the thread.
     */
    @Override // java.lang.Runnable
    public void run() {
        while (this.running) {
            try {
                List<M> batch = this.msgQueue.take();
                try {
                    this.messageListener.notify(batch);
                } finally {
                    // The batch counts as consumed whether or not the listener threw.
                    this.consumedCounter += batch.size();
                }
                // Only reached when the listener returned normally.
                if (!batch.isEmpty()) {
                    this.lastNotifiedMessage = batch.get(batch.size() - 1);
                }
            } catch (InterruptedException e) {
                // Do not re-assert the interrupt here: if running were still true, the next
                // take() would throw immediately and the loop would spin. The loop condition
                // decides whether to exit.
                logger.info("Notified was interrupted");
            } catch (Exception e) {
                logger.error("Caught exception in notifier", e);
                this.messageListener.noException(e);
            }
        }
    }

    /**
     * Enqueues a batch for delivery, blocking while the queue is full.
     *
     * @param list the batch to deliver; must not be {@code null}
     * @throws NullPointerException if {@code list} is {@code null}
     * @throws InterruptedException if interrupted while waiting for queue space
     */
    public void onMessages(List<M> list) throws InterruptedException {
        if (list == null) {
            throw new NullPointerException("list");
        }
        this.msgQueue.put(list);
    }

    /**
     * Returns a snapshot of notifier metrics, keyed by metric name in sorted order.
     *
     * <p>Each call also resets the sampling window: the rate reported is the number of messages
     * consumed since the previous call divided by the elapsed time. The rate entries are omitted
     * when no time has elapsed since the previous call. The window update is not atomic, so
     * concurrent callers may observe overlapping windows.
     *
     * @return a new map containing the current metric values
     */
    @Override // com.taobao.drc.clusterclient.util.Gaugeable
    public Map<String, Object> getMetrics() {
        Map<String, Object> metrics = new TreeMap<String, Object>();
        long nowMs = System.currentTimeMillis();
        long elapsedMs = nowMs - this.lastSampleMs;
        // Read the volatile counter once so the total and the rate come from the same value.
        long consumed = this.consumedCounter;
        long consumedSinceLastSample = consumed - this.lastSampleCounter;
        this.lastSampleMs = nowMs;
        this.lastSampleCounter = consumed;
        metrics.put(KEY_MESSAGE_SIZE, Long.valueOf(consumed));
        if (elapsedMs > 0) {
            metrics.put(KEY_MESSAGE_LAST_PERIOD_MS, Long.valueOf(elapsedMs));
            metrics.put(KEY_MESSAGE_RPS_LAST_PERIOD, Long.valueOf((consumedSinceLastSample * 1000) / elapsedMs));
        }
        metrics.put(KEY_MESSAGE_CAPACITY, Integer.valueOf(this.queueSize));
        metrics.put(KEY_MESSAGE_PENDING_NUM, Integer.valueOf(this.msgQueue.size()));
        // Snapshot the volatile reference: the notifier thread may replace it at any moment, and
        // the null check and the dereference below must look at the same message.
        M last = this.lastNotifiedMessage;
        if (last != null) {
            BaseCheckpoint checkpoint = (BaseCheckpoint) last.getCheckpoint();
            // The checkpoint entry is written even when the checkpoint itself is null.
            metrics.put(KEY_MESSAGE_LAST_NOTIFIED_CHECKPOINT, checkpoint);
            if (checkpoint != null) {
                // NOTE(review): assumes the checkpoint timestamp is in epoch seconds - confirm.
                long latencySec = (nowMs / 1000) - Long.valueOf(checkpoint.getTimestamp()).longValue();
                metrics.put(KEY_MESSAGE_LATENCY, Long.valueOf(latencySec));
            }
        }
        return metrics;
    }
}
