package com.aliyun.dts.subscribe.clients.recordgenerator;

import com.aliyun.dts.subscribe.clients.ConsumerContext;
import com.aliyun.dts.subscribe.clients.common.Checkpoint;
import com.aliyun.dts.subscribe.clients.common.Util;
import com.aliyun.dts.subscribe.clients.common.WorkThread;
import com.aliyun.dts.subscribe.clients.formats.avro.Record;
import com.aliyun.dts.subscribe.clients.record.DefaultUserRecord;
import com.aliyun.dts.subscribe.clients.recordfetcher.OffsetCommitCallBack;
import java.io.Closeable;
import java.io.IOException;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.metrics.Metrics;
import org.apache.kafka.common.metrics.Sensor;
import org.apache.kafka.common.metrics.stats.SimpleRate;
import org.apache.kafka.common.metrics.stats.Total;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:com/aliyun/dts/subscribe/clients/recordgenerator/UserRecordGenerator.class */
/**
 * Worker that turns raw Kafka {@link ConsumerRecord}s into {@link DefaultUserRecord}s.
 *
 * <p>{@link #run()} repeatedly peeks the head of the input queue, deserializes its value with an
 * {@link AvroDeserializer}, wraps the result in a {@link DefaultUserRecord} and offers it to the
 * output queue. The raw record is removed from the input queue only after the offer loop ends.
 *
 * <p>The loop stops as soon as {@link ConsumerContext#isExited()} reports {@code true}. Any
 * exception raised while handling a record is logged and causes {@link ConsumerContext#exit()}
 * to be called.
 */
public class UserRecordGenerator implements Runnable, Closeable {
    private static final Logger log = LoggerFactory.getLogger(UserRecordGenerator.class);

    /** Pause between two peeks of an empty input queue, in milliseconds. */
    private static final long IDLE_POLL_INTERVAL_MS = 5L;
    /** Number of empty peeks between two "no records" log lines (1000 * 5ms = 5s). */
    private static final int IDLE_POLLS_PER_LOG = 1000;
    /** Timeout of a single offer to the output queue, in milliseconds. */
    private static final int OFFER_TIMEOUT_MS = 1000;
    /** Number of timed-out offers between two "offer failed" log lines (10 * 1s = 10s). */
    private static final int FAILED_OFFERS_PER_LOG = 10;

    private final ConsumerContext consumerContext;
    private final LinkedBlockingQueue<ConsumerRecord> toProcessRecord;
    private final LinkedBlockingQueue<DefaultUserRecord> processedRecord;
    // NOTE(review): never assigned anywhere in this class, so it is always null here.
    private WorkThread commitThread;
    private final OffsetCommitCallBack offsetCommitCallBack;
    private final Metrics metrics;
    private final Sensor recordStoreOutCountSensor;
    private final Sensor recordStoreOutByteSensor;
    private final AvroDeserializer fastDeserializer = new AvroDeserializer();
    /** Last checkpoint reported through a record's commit callback; starts as a "nothing yet" placeholder. */
    private volatile Checkpoint commitCheckpoint = new Checkpoint(null, -1, -1, "-1");

    /**
     * Creates the generator and registers its queue-size gauges and throughput sensors.
     *
     * @param consumerContext      shared context; supplies the metrics registry and the exit flag
     * @param linkedBlockingQueue  input queue of raw records to deserialize
     * @param linkedBlockingQueue2 output queue receiving the generated user records
     * @param offsetCommitCallBack callback invoked from {@code commit()}; may be {@code null},
     *                             in which case nothing is forwarded
     */
    public UserRecordGenerator(ConsumerContext consumerContext, LinkedBlockingQueue<ConsumerRecord> linkedBlockingQueue, LinkedBlockingQueue<DefaultUserRecord> linkedBlockingQueue2, OffsetCommitCallBack offsetCommitCallBack) {
        this.consumerContext = consumerContext;
        this.toProcessRecord = linkedBlockingQueue;
        this.processedRecord = linkedBlockingQueue2;
        this.offsetCommitCallBack = offsetCommitCallBack;
        this.metrics = consumerContext.getDtsMetrics().getCoreMetrics();

        // Gauges exposing the current depth of both queues.
        this.metrics.addMetric(this.metrics.metricName("DStoreRecordQueue", "UserRecordGenerator"), (metricConfig, j) -> {
            return linkedBlockingQueue.size();
        });
        this.metrics.addMetric(this.metrics.metricName("DefaultUserRecordQueue", "UserRecordGenerator"), (metricConfig2, j2) -> {
            return linkedBlockingQueue2.size();
        });

        // Count and byte sensors, each recorded once per invocation of a record's commit callback.
        this.recordStoreOutCountSensor = this.metrics.sensor("record-store-out-row");
        this.recordStoreOutCountSensor.add(this.metrics.metricName("outCounts", "recordstore"), new Total());
        this.recordStoreOutCountSensor.add(this.metrics.metricName("outRps", "recordstore"), new SimpleRate());
        this.recordStoreOutByteSensor = this.metrics.sensor("record-store-out-byte");
        this.recordStoreOutByteSensor.add(this.metrics.metricName("outBytes", "recordstore"), new Total());
        this.recordStoreOutByteSensor.add(this.metrics.metricName("outBps", "recordstore"), new SimpleRate());
    }

    /**
     * Processes records from the input queue until the context reports that it has exited.
     *
     * <p>Each iteration is fully guarded: a failure while waiting, deserializing, wrapping or
     * offering is logged together with the raw and parsed record, after which the context is
     * asked to exit so the loop terminates.
     */
    @Override // java.lang.Runnable
    public void run() {
        while (!this.consumerContext.isExited()) {
            ConsumerRecord rawRecord = null;
            Record parsedRecord = null;
            try {
                rawRecord = awaitNextRecord();
                if (this.consumerContext.isExited()) {
                    return;
                }
                if (rawRecord == null) {
                    // Only possible if the exit flag was observed set and then cleared again; retry.
                    continue;
                }

                final byte[] payload = (byte[]) rawRecord.value();
                parsedRecord = this.fastDeserializer.deserialize(payload);
                log.debug("UserRecordGenerator: meet [{}] record type", parsedRecord.getOperation());

                DefaultUserRecord userRecord = buildUserRecord(rawRecord, parsedRecord, payload.length);

                int failedOffers = 0;
                while (!offerRecord(OFFER_TIMEOUT_MS, TimeUnit.MILLISECONDS, userRecord) && !this.consumerContext.isExited()) {
                    failedOffers++;
                    if (failedOffers % FAILED_OFFERS_PER_LOG == 0) {
                        log.info("UserRecordGenerator: offer user record has failed for a period (10s) [ " + parsedRecord + "]");
                    }
                }

                // The head was only peeked so far; drop it now that the offer loop is done.
                this.toProcessRecord.poll();
            } catch (Exception e) {
                log.error("UserRecordGenerator: process record failed, raw consumer record [" + rawRecord + "], parsed record [" + parsedRecord + "], cause " + e.getMessage(), e);
                this.consumerContext.exit();
            }
        }
    }

    /**
     * Waits until the input queue has a head element or the context has exited.
     *
     * @return the head of the input queue (not removed), or {@code null} if the wait ended
     *         because the context exited while the queue was still empty
     */
    private ConsumerRecord awaitNextRecord() {
        int idlePolls = 0;
        ConsumerRecord head;
        while ((head = this.toProcessRecord.peek()) == null && !this.consumerContext.isExited()) {
            Util.sleepMS(IDLE_POLL_INTERVAL_MS);
            idlePolls++;
            if (idlePolls % IDLE_POLLS_PER_LOG == 0 && this.consumerContext.hasValidTopicPartitions()) {
                log.info("UserRecordGenerator: haven't receive records from generator for  5s");
            }
        }
        return head;
    }

    /**
     * Wraps a parsed record in a {@link DefaultUserRecord} whose callback updates the sensors,
     * stores the reported checkpoint and forwards it via {@link #commit()}.
     *
     * @param consumerRecord raw record supplying topic, partition and offset
     * @param parsed         deserialized payload of {@code consumerRecord}
     * @param payloadSize    length in bytes of the raw payload, recorded on the byte sensor
     * @return the user record to hand to the output queue
     */
    private DefaultUserRecord buildUserRecord(ConsumerRecord consumerRecord, Record parsed, int payloadSize) {
        TopicPartition source = new TopicPartition(consumerRecord.topic(), consumerRecord.partition());
        return new DefaultUserRecord(source, consumerRecord.offset(), parsed, (topicPartition, record, j, str) -> {
            this.recordStoreOutCountSensor.record(1.0d);
            this.recordStoreOutByteSensor.record(payloadSize);
            this.commitCheckpoint = new Checkpoint(topicPartition, record.getSourceTimestamp().longValue(), j, str);
            commit();
        });
    }

    /**
     * Offers a record to the output queue, waiting up to the given timeout.
     *
     * @param i                 timeout value
     * @param timeUnit          unit of {@code i}
     * @param defaultUserRecord record to enqueue
     * @return {@code true} if the record was enqueued; {@code false} on timeout or if the offer
     *         threw (the exception is logged, not propagated)
     */
    private boolean offerRecord(int i, TimeUnit timeUnit, DefaultUserRecord defaultUserRecord) {
        try {
            return this.processedRecord.offer(defaultUserRecord, i, timeUnit);
        } catch (Exception e) {
            log.error("UserRecordGenerator: offer record failed, record[" + defaultUserRecord + "], cause " + e.getMessage(), e);
            return false;
        }
    }

    /**
     * Asks the context to exit and stops the commit thread if one is present.
     *
     * @throws IOException declared by {@link Closeable}; not thrown by the code visible here
     */
    @Override // java.io.Closeable, java.lang.AutoCloseable
    public void close() throws IOException {
        this.consumerContext.exit();
        // commitThread is never assigned in this class; guard against the resulting null.
        if (this.commitThread != null) {
            this.commitThread.stop();
        }
    }

    /**
     * Forwards the most recent checkpoint to the offset commit callback.
     *
     * <p>Does nothing when no callback was supplied or when the checkpoint is still the initial
     * placeholder (no topic partition, or offset {@code -1}).
     */
    private void commit() {
        // Read the volatile field once so all four values come from the same checkpoint.
        final Checkpoint checkpoint = this.commitCheckpoint;
        if (null == this.offsetCommitCallBack || checkpoint.getTopicPartition() == null || checkpoint.getOffset() == -1) {
            return;
        }
        this.offsetCommitCallBack.commit(checkpoint.getTopicPartition(), checkpoint.getTimeStamp(), checkpoint.getOffset(), checkpoint.getInfo());
    }
}
