/*
 * Decompiled with CFR 0.152.
 */
package com.aliyun.drc.clusterclient.impl;

import com.aliyun.drc.client.Listener;
import com.aliyun.drc.client.message.DataMessage;
import com.aliyun.drc.clusterclient.ClusterListener;
import com.aliyun.drc.clusterclient.impl.ClientCluster;
import com.aliyun.drc.clusterclient.message.ClusterMessage;
import com.aliyun.drc.clusterclient.partition.Checkpoint;
import com.aliyun.drc.clusterclient.partition.Partition;
import java.util.ArrayList;
import java.util.concurrent.TimeUnit;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class DrcClientListener
implements Listener {
    private static Logger logger = LoggerFactory.getLogger(DrcClientListener.class);
    private static final long LOG_TAGGING_PERIOD_IN_MS = 60000L;
    private Partition partition;
    private ClusterListener clusterListener;
    private ClientCluster clientCluster;
    private long lastStatisticsTime = 0L;

    public DrcClientListener(ClusterListener clusterListener, ClientCluster clientCluster) {
        this.clusterListener = clusterListener;
        this.clientCluster = clientCluster;
    }

    private void logTaggingInPeriod(DataMessage.Record record) {
        long currentTime = System.currentTimeMillis();
        if (this.lastStatisticsTime <= 0L) {
            this.lastStatisticsTime = currentTime;
        }
        if (currentTime - this.lastStatisticsTime >= 60000L) {
            this.lastStatisticsTime = currentTime;
            Checkpoint checkpoint = new Checkpoint(record);
            logger.info("checkpoint tagging in msgQueue:[" + checkpoint.toString() + "], msgQueue size:" + this.clusterListener.getMessageQueueSize());
        }
    }

    public void notify(DataMessage message) throws Exception {
        ArrayList<ClusterMessage> messages = new ArrayList<ClusterMessage>();
        block3: for (DataMessage.Record record : message.getRecordList()) {
            this.logTaggingInPeriod(record);
            switch (record.getOpt()) {
                case HEARTBEAT: {
                    this.partition.pushRecordToPartition(record);
                    continue block3;
                }
            }
            ClusterMessage msg = new ClusterMessage(record, this.partition);
            messages.add(msg);
            this.partition.pushRecordToPartition(record);
        }
        if (messages.size() == 0) {
            return;
        }
        long timeNeedSleep = this.clusterListener.notifyMessages(this.partition, messages);
        if (timeNeedSleep > 0L) {
            TimeUnit.MILLISECONDS.sleep(timeNeedSleep);
        }
    }

    public void notifyRuntimeLog(String level, String log) throws Exception {
        if (level.equalsIgnoreCase("INFO")) {
            logger.info(log);
        } else if (level.equalsIgnoreCase("ERROR")) {
            logger.error(log);
        } else if (level.equalsIgnoreCase("WARN")) {
            logger.warn(log);
        } else {
            logger.warn(log);
        }
    }

    public void handleException(Exception e) {
        try {
            logger.warn("Handle DrcClientListener exception, stop partition...", (Throwable)e);
            Thread doStopThread = new Thread(new Runnable(){

                @Override
                public void run() {
                    DrcClientListener.this.clientCluster.doStop(DrcClientListener.this.partition.getName());
                }
            });
            doStopThread.setName("DTS-DoStop-Thread");
            doStopThread.start();
            doStopThread.join();
        }
        catch (Exception e1) {
            logger.error("DrcClientListener do stop exception.", (Throwable)e1);
        }
    }

    public ClusterListener getListener() {
        return this.clusterListener;
    }

    public void setPartition(Partition partition) {
        this.partition = partition;
    }

    public Partition getPartition() {
        return this.partition;
    }
}

