/*
 * Decompiled with CFR 0.152.
 */
package com.aliyun.dts.subscribe.clients.recordfetcher;

import com.aliyun.dts.subscribe.clients.ConsumerContext;
import com.aliyun.dts.subscribe.clients.common.Checkpoint;
import com.aliyun.dts.subscribe.clients.common.Util;
import com.aliyun.dts.subscribe.clients.exception.TimestampSeekException;
import com.aliyun.dts.subscribe.clients.metastore.KafkaMetaStore;
import com.aliyun.dts.subscribe.clients.metastore.LocalFileMetaStore;
import com.aliyun.dts.subscribe.clients.metastore.MetaStoreCenter;
import com.aliyun.dts.subscribe.clients.recordfetcher.ConsumerWrap;
import java.io.Closeable;
import java.io.IOException;
import java.util.Properties;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.commons.lang3.StringUtils;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.metrics.MeasurableStat;
import org.apache.kafka.common.metrics.Metrics;
import org.apache.kafka.common.metrics.Sensor;
import org.apache.kafka.common.metrics.stats.SimpleRate;
import org.apache.kafka.common.metrics.stats.Total;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Polls records from a Kafka consumer and hands them to a queue for downstream processing.
 *
 * <p>{@link #run()} is the fetch loop: it builds a {@link ConsumerWrap}, positions it from a
 * checkpoint (user store, local file store, Kafka store, or the configured initial checkpoint),
 * polls, and offers every non-trivial record to {@code toProcessRecord}. Errors are retried with
 * a fixed back-off unless they are {@link TimestampSeekException}s or the retry budget is spent,
 * in which case the consumer context is asked to exit.
 *
 * <p>Checkpoints published through {@link #setToCommitCheckpoint(Checkpoint)} are persisted from
 * inside the fetch loop, at most once per {@code ConsumerContext#getCheckpointCommitInterval()}.
 */
public class KafkaRecordFetcher implements Runnable, Closeable {
    private static final Logger log = LoggerFactory.getLogger(KafkaRecordFetcher.class);
    private static final String LOCAL_FILE_STORE_NAME = "localCheckpointStore";
    private static final String KAFKA_STORE_NAME = "kafkaCheckpointStore";
    private static final String USER_STORE_NAME = "userCheckpointStore";
    private final ConsumerContext consumerContext;
    private final LinkedBlockingQueue<ConsumerRecord> toProcessRecord;
    /** Maximum number of recoverable errors tolerated over the lifetime of {@link #run()}. */
    private final int tryTime;
    /** Back-off, in milliseconds, between a recoverable error and the next reconnect. */
    private final long tryBackTimeMS;
    /** True until the forced initial checkpoint has been consumed by the first connect. */
    private final AtomicBoolean useCheckpointConfig;
    private final Checkpoint initialCheckpoint;
    /** Latest checkpoint waiting to be persisted; written by other threads via the setter. */
    private volatile Checkpoint toCommitCheckpoint = null;
    private final ConsumerContext.ConsumerSubscribeMode subscribeMode;
    private final TopicPartition topicPartition;
    private final String groupID;
    private final MetaStoreCenter metaStoreCenter = new MetaStoreCenter();
    /** Earliest wall-clock time (ms) at which the next checkpoint commit may happen. */
    private long nextCommitTimestamp;
    private final Sensor recordStoreInCountSensor;
    private final Sensor recordStoreInByteSensor;

    /**
     * Creates a fetcher bound to partition 0 of the topic configured in {@code consumerContext}.
     *
     * @param consumerContext configuration, exit flag and metrics registry for this consumer
     * @param toProcessRecord queue that receives every fetched record
     */
    public KafkaRecordFetcher(ConsumerContext consumerContext, LinkedBlockingQueue<ConsumerRecord> toProcessRecord) {
        this.consumerContext = consumerContext;
        this.toProcessRecord = toProcessRecord;
        this.useCheckpointConfig = new AtomicBoolean(consumerContext.isForceUseCheckpoint());
        this.initialCheckpoint = consumerContext.getInitialCheckpoint();
        this.subscribeMode = consumerContext.getSubscribeMode();
        this.topicPartition = new TopicPartition(consumerContext.getTopic(), 0);
        this.groupID = consumerContext.getGroupID();
        this.tryTime = 150;
        this.tryBackTimeMS = 10000L;
        if (consumerContext.isUseLocalCheckpointStore()) {
            String localStoreName = this.composeLocalFileStoreName(LOCAL_FILE_STORE_NAME, this.groupID);
            this.metaStoreCenter.registerStore(localStoreName, new LocalFileMetaStore(localStoreName));
        }
        if (consumerContext.getUserRegisteredStore() != null) {
            this.metaStoreCenter.registerStore(USER_STORE_NAME, consumerContext.getUserRegisteredStore());
        }
        log.info("RecordGenerator: try time [{}], try backTimeMS [{}]", this.tryTime, this.tryBackTimeMS);
        Metrics metrics = consumerContext.getDtsMetrics().getCoreMetrics();
        this.recordStoreInCountSensor = metrics.sensor("record-store-in-row");
        this.recordStoreInCountSensor.add(metrics.metricName("inCounts", "recordstore"), new Total());
        this.recordStoreInCountSensor.add(metrics.metricName("inRps", "recordstore"), new SimpleRate());
        this.recordStoreInByteSensor = metrics.sensor("record-store-in-byte");
        this.recordStoreInByteSensor.add(metrics.metricName("inBytes", "recordstore"), new Total());
        this.recordStoreInByteSensor.add(metrics.metricName("inBps", "recordstore"), new SimpleRate());
    }

    /**
     * Fetch loop: connect, poll, enqueue; reconnect on recoverable errors until the consumer
     * context reports that it has exited.
     */
    @Override
    public void run() {
        // NOTE(review): the retry counter is never reset after a successful reconnect, so the
        // budget of tryTime errors spans the whole lifetime of this loop - confirm this is intended.
        int haveTryTime = 0;
        String message = "first start";
        while (!this.consumerContext.isExited()) {
            // Scoped per attempt so that a failed reconnect never re-closes the previous wrapper.
            ConsumerWrap kafkaConsumerWrap = null;
            try {
                kafkaConsumerWrap = this.getConsumerWrap(message);
                while (!this.consumerContext.isExited()) {
                    // Checkpoints are committed from the polling thread, interleaved with poll().
                    this.mayCommitCheckpoint();
                    ConsumerRecords<byte[], byte[]> records = kafkaConsumerWrap.poll();
                    for (ConsumerRecord<byte[], byte[]> record : records) {
                        byte[] value = record.value();
                        // NOTE(review): records with no value or at most 2 bytes are skipped;
                        // presumably these are placeholder records - confirm with the producer side.
                        if (value == null || value.length <= 2) {
                            continue;
                        }
                        this.offerUntilAcceptedOrExited(record);
                    }
                }
            }
            catch (Throwable e) {
                if (this.isErrorRecoverable(e) && haveTryTime++ < this.tryTime) {
                    log.warn("KafkaRecordFetcher: error meet cause " + e.getMessage() + ", recover time [" + haveTryTime + "]", e);
                    Util.sleepMS(this.tryBackTimeMS);
                    message = "reconnect";
                    continue;
                }
                log.error("KafkaRecordFetcher: unrecoverable error  " + e.getMessage() + ", have try time [" + haveTryTime + "]", e);
                this.consumerContext.exit();
            }
            finally {
                // Runs on every path out of the try, including the 'continue' above.
                Util.swallowErrorClose(kafkaConsumerWrap);
            }
        }
    }

    /**
     * Keeps offering {@code record} in one-second attempts until the queue accepts it or the
     * consumer context has exited, logging once every ten failed attempts.
     */
    private void offerUntilAcceptedOrExited(ConsumerRecord<byte[], byte[]> record) {
        int offerTryCount = 0;
        while (!this.offerRecord(1000, TimeUnit.MILLISECONDS, record) && !this.consumerContext.isExited()) {
            if (++offerTryCount % 10 == 0) {
                log.info("KafkaRecordFetcher: offer kafka record has failed for a period (10s) [ {}]", record);
            }
        }
    }

    /**
     * Offers one record to the processing queue and, only if it was accepted, updates the
     * in-count and in-byte sensors.
     *
     * @return {@code true} if the queue accepted the record within the timeout; {@code false}
     *         on timeout or if the offer threw
     */
    private boolean offerRecord(int timeOut, TimeUnit timeUnit, ConsumerRecord<byte[], byte[]> record) {
        boolean accepted;
        try {
            accepted = this.toProcessRecord.offer(record, timeOut, timeUnit);
        }
        catch (Exception e) {
            // The interrupt status is intentionally not restored here: the caller retries in a
            // loop, and a still-set flag would make every following offer() fail immediately.
            log.error("UserRecordGenerator: offer record failed, record[" + record + "], cause " + e.getMessage(), (Throwable)e);
            return false;
        }
        if (accepted) {
            // Metrics are recorded once per accepted record rather than once per attempt, so
            // retries against a full queue do not inflate the counters.
            try {
                this.recordStoreInCountSensor.record(1.0);
                this.recordStoreInByteSensor.record((double)record.value().length);
            }
            catch (RuntimeException e) {
                // A metrics failure must not cause an already queued record to be offered again.
                log.warn("UserRecordGenerator: record metrics failed, cause " + e.getMessage(), (Throwable)e);
            }
        }
        return accepted;
    }

    /** Every error is considered retryable except a {@link TimestampSeekException}. */
    private boolean isErrorRecoverable(Throwable e) {
        return !(e instanceof TimestampSeekException);
    }

    /**
     * Builds a consumer, registers the Kafka-backed checkpoint store for it, and positions it
     * according to the configured subscribe mode.
     *
     * <p>If anything fails after the consumer has been created, the consumer is closed before
     * the error propagates, because the caller never receives a reference it could close.
     *
     * @param message label describing why the consumer is being created, used for logging only
     * @return a consumer that is subscribed or assigned to {@code topicPartition}
     */
    private ConsumerWrap getConsumerWrap(String message) {
        ConsumerWrap kafkaConsumerWrap = this.getConsumerWrap();
        try {
            this.metaStoreCenter.registerStore(KAFKA_STORE_NAME, new KafkaMetaStore(kafkaConsumerWrap.getRawConsumer()));
            Checkpoint checkpoint = this.resolveStartCheckpoint();
            switch (this.subscribeMode) {
                case SUBSCRIBE: {
                    // The checkpoint is resolved lazily by the wrapper through this callback.
                    kafkaConsumerWrap.subscribeTopic(this.topicPartition, () -> {
                        Checkpoint ret = this.getSubscribeCheckpoint();
                        if (null == ret || Checkpoint.INVALID_STREAM_CHECKPOINT == ret) {
                            log.info("Subscribe checkpoint is null, use initialCheckpoint: " + this.initialCheckpoint);
                            ret = this.initialCheckpoint;
                        }
                        return ret;
                    });
                    break;
                }
                case ASSIGN: {
                    kafkaConsumerWrap.assignTopic(this.topicPartition, checkpoint);
                    break;
                }
                default: {
                    throw new RuntimeException("RecordGenerator: unknown mode not support");
                }
            }
            log.info("RecordGenerator:{}, checkpoint {}", message, checkpoint);
            return kafkaConsumerWrap;
        }
        catch (RuntimeException | Error e) {
            Util.swallowErrorClose(kafkaConsumerWrap);
            throw e;
        }
    }

    /**
     * Chooses the checkpoint to start from: the initial checkpoint when it is forced (first
     * call only) or when no valid stored checkpoint exists, otherwise the stored one.
     */
    private Checkpoint resolveStartCheckpoint() {
        if (this.useCheckpointConfig.compareAndSet(true, false)) {
            // Log after selecting the checkpoint so the message shows the real value.
            Checkpoint forced = this.initialCheckpoint;
            log.info("RecordGenerator: force use initial checkpoint [{}] to start", forced);
            return forced;
        }
        Checkpoint stored = this.getCheckpoint();
        if (null == stored || Checkpoint.INVALID_STREAM_CHECKPOINT == stored) {
            log.info("RecordGenerator: use initial checkpoint [{}] to start", this.initialCheckpoint);
            return this.initialCheckpoint;
        }
        log.info("RecordGenerator: load checkpoint from checkpoint store success, current checkpoint [{}]", stored);
        return stored;
    }

    /** Creates a new consumer wrapper from the Kafka properties of the consumer context. */
    private ConsumerWrap getConsumerWrap() {
        Properties properties = this.consumerContext.getKafkaProperties();
        return new ConsumerWrap.DefaultConsumerWrap(properties, this.consumerContext);
    }

    /**
     * Looks up the stored checkpoint, trying the user store, then the local file store, then
     * the Kafka store, and returning the first non-null result.
     *
     * @return the stored checkpoint, or {@code null} if no store has one
     */
    private Checkpoint getCheckpoint() {
        Checkpoint checkpoint = this.metaStoreCenter.seek(USER_STORE_NAME, this.topicPartition, this.groupID);
        log.info("Firstly, try load checkpoint from user defined shared store: " + checkpoint);
        if (null == checkpoint) {
            checkpoint = this.metaStoreCenter.seek(this.composeLocalFileStoreName(LOCAL_FILE_STORE_NAME, this.groupID), this.topicPartition, this.groupID);
            log.info("User defined shared store checkpoint is null, try load checkpoint from local: " + checkpoint);
        }
        if (null == checkpoint) {
            checkpoint = this.metaStoreCenter.seek(KAFKA_STORE_NAME, this.topicPartition, this.groupID);
            log.info("User defined shared store checkpoint and local checkpoint is null, try load checkpoint from DStore: " + checkpoint);
        }
        return checkpoint;
    }

    /**
     * Looks up the stored checkpoint for subscribe mode: the user store first, then the Kafka
     * store. The local file store is not consulted here.
     *
     * @return the stored checkpoint, or {@code null} if neither store has one
     */
    private Checkpoint getSubscribeCheckpoint() {
        Checkpoint checkpoint = this.metaStoreCenter.seek(USER_STORE_NAME, this.topicPartition, this.groupID);
        log.info("Firstly, try load checkpoint from user defined shared store: " + checkpoint);
        if (null == checkpoint) {
            checkpoint = this.metaStoreCenter.seek(KAFKA_STORE_NAME, this.topicPartition, this.groupID);
            log.info("User defined shared store checkpoint is null, try load checkpoint from DStore: " + checkpoint);
        }
        return checkpoint;
    }

    /**
     * Persists the pending checkpoint, if there is one and the commit interval has elapsed,
     * then schedules the next allowed commit time.
     */
    private void mayCommitCheckpoint() {
        // Read the volatile field once: another thread may replace or clear it at any time.
        Checkpoint pending = this.toCommitCheckpoint;
        if (null != pending && System.currentTimeMillis() >= this.nextCommitTimestamp) {
            this.commitCheckpoint(pending.getTopicPartition(), pending);
            // Best effort: do not discard a newer checkpoint published while committing.
            if (this.toCommitCheckpoint == pending) {
                this.toCommitCheckpoint = null;
            }
            this.nextCommitTimestamp = System.currentTimeMillis() + this.consumerContext.getCheckpointCommitInterval();
        }
    }

    /**
     * Stores {@code checkpoint} for this consumer group through the meta store center.
     * Does nothing if either argument is {@code null}.
     *
     * @param topicPartition partition the checkpoint belongs to
     * @param checkpoint     checkpoint to store
     */
    public void commitCheckpoint(TopicPartition topicPartition, Checkpoint checkpoint) {
        if (null != topicPartition && null != checkpoint) {
            this.metaStoreCenter.store(topicPartition, this.groupID, checkpoint);
        }
    }

    /**
     * Publishes a checkpoint to be persisted by the fetch loop on a later iteration; a newer
     * call replaces any checkpoint that has not been persisted yet.
     *
     * @param committedCheckpoint checkpoint to persist
     */
    public void setToCommitCheckpoint(Checkpoint committedCheckpoint) {
        this.toCommitCheckpoint = committedCheckpoint;
    }

    /** Builds the local file store name as {@code prefix + "-" + sid}, with nulls joined as empty. */
    private String composeLocalFileStoreName(String prefix, String sid) {
        return StringUtils.join(prefix, "-", sid);
    }

    /** Asks the consumer context to exit, which makes {@link #run()} leave its loops. */
    @Override
    public void close() throws IOException {
        this.consumerContext.exit();
    }
}

