package com.aliyun.dts.subscribe.clients.recordfetcher;

import com.aliyun.dts.subscribe.clients.ConsumerContext;
import com.aliyun.dts.subscribe.clients.common.Checkpoint;
import com.aliyun.dts.subscribe.clients.common.Util;
import com.aliyun.dts.subscribe.clients.exception.TimestampSeekException;
import com.aliyun.dts.subscribe.clients.metastore.KafkaMetaStore;
import com.aliyun.dts.subscribe.clients.metastore.LocalFileMetaStore;
import com.aliyun.dts.subscribe.clients.metastore.MetaStoreCenter;
import com.aliyun.dts.subscribe.clients.recordfetcher.ConsumerWrap;
import java.io.Closeable;
import java.io.IOException;
import java.util.Iterator;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.metrics.Metrics;
import org.apache.kafka.common.metrics.Sensor;
import org.apache.kafka.common.metrics.stats.SimpleRate;
import org.apache.kafka.common.metrics.stats.Total;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:com/aliyun/dts/subscribe/clients/recordfetcher/KafkaRecordFetcher.class */
public class KafkaRecordFetcher implements Runnable, Closeable {
    private static final Logger log = LoggerFactory.getLogger(KafkaRecordFetcher.class);
    private static final String LOCAL_FILE_STORE_NAME = "localCheckpointStore";
    private static final String KAFKA_STORE_NAME = "kafkaCheckpointStore";
    private static final String USER_STORE_NAME = "userCheckpointStore";
    private ConsumerContext consumerContext;
    private LinkedBlockingQueue<ConsumerRecord> toProcessRecord;
    private final AtomicBoolean useCheckpointConfig;
    private final Checkpoint initialCheckpoint;
    private final ConsumerContext.ConsumerSubscribeMode subscribeMode;
    private final TopicPartition topicPartition;
    private final String groupID;
    private long nextCommitTimestamp;
    private final Sensor recordStoreInCountSensor;
    private final Sensor recordStoreInByteSensor;
    private volatile Checkpoint toCommitCheckpoint = null;
    private final MetaStoreCenter metaStoreCenter = new MetaStoreCenter();
    private final int tryTime = 150;
    private final long tryBackTimeMS = 10000;

    public KafkaRecordFetcher(ConsumerContext consumerContext, LinkedBlockingQueue<ConsumerRecord> linkedBlockingQueue) {
        this.consumerContext = consumerContext;
        this.toProcessRecord = linkedBlockingQueue;
        this.useCheckpointConfig = new AtomicBoolean(consumerContext.isForceUseCheckpoint());
        this.initialCheckpoint = consumerContext.getInitialCheckpoint();
        this.subscribeMode = consumerContext.getSubscribeMode();
        this.topicPartition = new TopicPartition(consumerContext.getTopic(), 0);
        this.groupID = consumerContext.getGroupID();
        this.metaStoreCenter.registerStore(LOCAL_FILE_STORE_NAME, new LocalFileMetaStore(LOCAL_FILE_STORE_NAME));
        if (consumerContext.getUserRegisteredStore() != null) {
            this.metaStoreCenter.registerStore(USER_STORE_NAME, consumerContext.getUserRegisteredStore());
        }
        log.info("RecordGenerator: try time [" + this.tryTime + "], try backTimeMS [" + this.tryBackTimeMS + "]");
        Metrics coreMetrics = consumerContext.getDtsMetrics().getCoreMetrics();
        this.recordStoreInCountSensor = coreMetrics.sensor("record-store-in-row");
        this.recordStoreInCountSensor.add(coreMetrics.metricName("inCounts", "recordstore"), new Total());
        this.recordStoreInCountSensor.add(coreMetrics.metricName("inRps", "recordstore"), new SimpleRate());
        this.recordStoreInByteSensor = coreMetrics.sensor("record-store-in-byte");
        this.recordStoreInByteSensor.add(coreMetrics.metricName("inBytes", "recordstore"), new Total());
        this.recordStoreInByteSensor.add(coreMetrics.metricName("inBps", "recordstore"), new SimpleRate());
    }

    @Override // java.lang.Runnable
    public void run() {
        int i = 0;
        String str = "first start";
        ConsumerWrap consumerWrap = null;
        while (!this.consumerContext.isExited()) {
            try {
                try {
                    consumerWrap = getConsumerWrap(str);
                    while (!this.consumerContext.isExited()) {
                        mayCommitCheckpoint();
                        Iterator it = consumerWrap.poll().iterator();
                        while (it.hasNext()) {
                            ConsumerRecord<byte[], byte[]> consumerRecord = (ConsumerRecord) it.next();
                            int i2 = 0;
                            if (consumerRecord.value() != null && ((byte[]) consumerRecord.value()).length > 2) {
                                while (!offerRecord(1000, TimeUnit.MILLISECONDS, consumerRecord) && !this.consumerContext.isExited()) {
                                    i2++;
                                    if (i2 % 10 == 0) {
                                        log.info("KafkaRecordFetcher: offer kafka record has failed for a period (10s) [ " + consumerRecord + "]");
                                    }
                                }
                            }
                        }
                    }
                    Util.swallowErrorClose(consumerWrap);
                } catch (Throwable th) {
                    if (isErrorRecoverable(th)) {
                        int i3 = i;
                        i++;
                        if (i3 < this.tryTime) {
                            log.warn("KafkaRecordFetcher: error meet cause " + th.getMessage() + ", recover time [" + i + "]", th);
                            Util.sleepMS(this.tryBackTimeMS);
                            str = "reconnect";
                            Util.swallowErrorClose(consumerWrap);
                        }
                    }
                    log.error("KafkaRecordFetcher: unrecoverable error  " + th.getMessage() + ", have try time [" + i + "]", th);
                    this.consumerContext.exit();
                    Util.swallowErrorClose(consumerWrap);
                }
            } catch (Throwable th2) {
                Util.swallowErrorClose(consumerWrap);
                throw th2;
            }
        }
    }

    private boolean offerRecord(int i, TimeUnit timeUnit, ConsumerRecord<byte[], byte[]> consumerRecord) {
        try {
            this.recordStoreInCountSensor.record(1.0d);
            this.recordStoreInByteSensor.record(((byte[]) consumerRecord.value()).length);
            return this.toProcessRecord.offer(consumerRecord, i, timeUnit);
        } catch (Exception e) {
            log.error("UserRecordGenerator: offer record failed, record[" + consumerRecord + "], cause " + e.getMessage(), e);
            return false;
        }
    }

    private boolean isErrorRecoverable(Throwable th) {
        return !(th instanceof TimestampSeekException);
    }

    private ConsumerWrap getConsumerWrap(String str) {
        Checkpoint checkpoint;
        ConsumerWrap consumerWrap = getConsumerWrap();
        this.metaStoreCenter.registerStore(KAFKA_STORE_NAME, new KafkaMetaStore(consumerWrap.getRawConsumer()));
        if (this.useCheckpointConfig.compareAndSet(true, false)) {
            log.info("RecordGenerator: force use initial checkpoint [{}] to start", (Object) null);
            checkpoint = this.initialCheckpoint;
        } else {
            checkpoint = getCheckpoint();
            if (null == checkpoint || Checkpoint.INVALID_STREAM_CHECKPOINT == checkpoint) {
                checkpoint = this.initialCheckpoint;
                log.info("RecordGenerator: use initial checkpoint [{}] to start", checkpoint);
            } else {
                log.info("RecordGenerator: load checkpoint from checkpoint store success, current checkpoint [{}]", checkpoint);
            }
        }
        switch (this.subscribeMode) {
            case SUBSCRIBE:
                consumerWrap.subscribeTopic(this.topicPartition, () -> {
                    Checkpoint subscribeCheckpoint = getSubscribeCheckpoint();
                    if (null == subscribeCheckpoint || Checkpoint.INVALID_STREAM_CHECKPOINT == subscribeCheckpoint) {
                        log.info("Subscribe checkpoint is null, use initialCheckpoint: " + this.initialCheckpoint);
                        subscribeCheckpoint = this.initialCheckpoint;
                    }
                    return subscribeCheckpoint;
                });
                break;
            case ASSIGN:
                consumerWrap.assignTopic(this.topicPartition, checkpoint);
                break;
            default:
                throw new RuntimeException("RecordGenerator: unknown mode not support");
        }
        log.info("RecordGenerator:" + str + ", checkpoint " + checkpoint);
        return consumerWrap;
    }

    private ConsumerWrap getConsumerWrap() {
        return new ConsumerWrap.DefaultConsumerWrap(this.consumerContext.getKafkaProperties(), this.consumerContext);
    }

    private Checkpoint getCheckpoint() {
        Checkpoint seek = this.metaStoreCenter.seek(LOCAL_FILE_STORE_NAME, this.topicPartition, this.groupID);
        if (null == seek) {
            seek = this.metaStoreCenter.seek(KAFKA_STORE_NAME, this.topicPartition, this.groupID);
        }
        if (null == seek) {
            seek = this.metaStoreCenter.seek(USER_STORE_NAME, this.topicPartition, this.groupID);
            log.info("DStore checkpoint is null, load checkpoint from user defined shared store: " + seek);
        }
        return seek;
    }

    private Checkpoint getSubscribeCheckpoint() {
        Checkpoint seek = this.metaStoreCenter.seek(KAFKA_STORE_NAME, this.topicPartition, this.groupID);
        log.info("Load checkpoint from DStore: " + seek);
        if (null == seek) {
            seek = this.metaStoreCenter.seek(USER_STORE_NAME, this.topicPartition, this.groupID);
            log.info("DStore checkpoint is null, load checkpoint from user defined shared store: " + seek);
        }
        return seek;
    }

    private void mayCommitCheckpoint() {
        if (null == this.toCommitCheckpoint || System.currentTimeMillis() < this.nextCommitTimestamp) {
            return;
        }
        commitCheckpoint(this.toCommitCheckpoint.getTopicPartition(), this.toCommitCheckpoint);
        this.toCommitCheckpoint = null;
        this.nextCommitTimestamp = System.currentTimeMillis() + this.consumerContext.getCheckpointCommitInterval();
    }

    public void commitCheckpoint(TopicPartition topicPartition, Checkpoint checkpoint) {
        if (null == topicPartition || null == checkpoint) {
            return;
        }
        this.metaStoreCenter.store(topicPartition, this.groupID, checkpoint);
    }

    public void setToCommitCheckpoint(Checkpoint checkpoint) {
        this.toCommitCheckpoint = checkpoint;
    }

    @Override // java.io.Closeable, java.lang.AutoCloseable
    public void close() throws IOException {
        this.consumerContext.exit();
    }
}
