/*
 * Decompiled with CFR 0.152.
 */
package com.aliyun.dts.subscribe.clients.recordgenerator;

import com.aliyun.dts.subscribe.clients.ConsumerContext;
import com.aliyun.dts.subscribe.clients.common.Checkpoint;
import com.aliyun.dts.subscribe.clients.common.Util;
import com.aliyun.dts.subscribe.clients.common.WorkThread;
import com.aliyun.dts.subscribe.clients.formats.avro.Record;
import com.aliyun.dts.subscribe.clients.record.DefaultUserRecord;
import com.aliyun.dts.subscribe.clients.recordfetcher.OffsetCommitCallBack;
import com.aliyun.dts.subscribe.clients.recordgenerator.AvroDeserializer;
import java.io.Closeable;
import java.io.IOException;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.metrics.MeasurableStat;
import org.apache.kafka.common.metrics.Metrics;
import org.apache.kafka.common.metrics.Sensor;
import org.apache.kafka.common.metrics.stats.SimpleRate;
import org.apache.kafka.common.metrics.stats.Total;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class UserRecordGenerator
implements Runnable,
Closeable {
    private static final Logger log = LoggerFactory.getLogger(UserRecordGenerator.class);
    private ConsumerContext consumerContext;
    private final LinkedBlockingQueue<ConsumerRecord> toProcessRecord;
    private final AvroDeserializer fastDeserializer;
    private final LinkedBlockingQueue<DefaultUserRecord> processedRecord;
    private volatile Checkpoint commitCheckpoint;
    private WorkThread commitThread;
    private final OffsetCommitCallBack offsetCommitCallBack;
    private Metrics metrics;
    private final Sensor recordStoreOutCountSensor;
    private final Sensor recordStoreOutByteSensor;

    public UserRecordGenerator(ConsumerContext consumerContext, LinkedBlockingQueue<ConsumerRecord> toProcessRecord, LinkedBlockingQueue<DefaultUserRecord> processedRecord, OffsetCommitCallBack offsetCommitCallBack) {
        this.consumerContext = consumerContext;
        this.toProcessRecord = toProcessRecord;
        this.fastDeserializer = new AvroDeserializer();
        this.processedRecord = processedRecord;
        this.offsetCommitCallBack = offsetCommitCallBack;
        this.commitCheckpoint = new Checkpoint(null, -1L, -1L, "-1");
        this.metrics = consumerContext.getDtsMetrics().getCoreMetrics();
        this.metrics.addMetric(this.metrics.metricName("DStoreRecordQueue", "UserRecordGenerator"), (config, now) -> toProcessRecord.size());
        this.metrics.addMetric(this.metrics.metricName("DefaultUserRecordQueue", "UserRecordGenerator"), (config, now) -> processedRecord.size());
        this.recordStoreOutCountSensor = this.metrics.sensor("record-store-out-row");
        this.recordStoreOutCountSensor.add(this.metrics.metricName("outCounts", "recordstore"), (MeasurableStat)new Total());
        this.recordStoreOutCountSensor.add(this.metrics.metricName("outRps", "recordstore"), (MeasurableStat)new SimpleRate());
        this.recordStoreOutByteSensor = this.metrics.sensor("record-store-out-byte");
        this.recordStoreOutByteSensor.add(this.metrics.metricName("outBytes", "recordstore"), (MeasurableStat)new Total());
        this.recordStoreOutByteSensor.add(this.metrics.metricName("outBps", "recordstore"), (MeasurableStat)new SimpleRate());
    }

    @Override
    public void run() {
        while (!this.consumerContext.isExited()) {
            ConsumerRecord toProcess = null;
            Record record = null;
            int fetchFailedCount = 0;
            try {
                while (null == (toProcess = this.toProcessRecord.peek()) && !this.consumerContext.isExited()) {
                    Util.sleepMS(5L);
                    if (++fetchFailedCount % 1000 != 0 || !this.consumerContext.hasValidTopicPartitions()) continue;
                    log.info("UserRecordGenerator: haven't receive records from generator for  5s");
                }
                if (this.consumerContext.isExited()) {
                    return;
                }
                ConsumerRecord consumerRecord = toProcess;
                consumerRecord.timestamp();
                record = this.fastDeserializer.deserialize((byte[])consumerRecord.value());
                log.debug("UserRecordGenerator: meet [{}] record type", (Object)record.getOperation());
                DefaultUserRecord defaultUserRecord = new DefaultUserRecord(new TopicPartition(consumerRecord.topic(), consumerRecord.partition()), consumerRecord.offset(), record, (tp, commitRecord, offset, metadata) -> {
                    this.recordStoreOutCountSensor.record(1.0);
                    this.recordStoreOutByteSensor.record((double)((byte[])consumerRecord.value()).length);
                    this.commitCheckpoint = new Checkpoint(tp, commitRecord.getSourceTimestamp(), offset, metadata);
                    this.commit();
                });
                int offerTryCount = 0;
                while (!this.offerRecord(1000, TimeUnit.MILLISECONDS, defaultUserRecord) && !this.consumerContext.isExited()) {
                    if (++offerTryCount % 10 != 0) continue;
                    log.info("UserRecordGenerator: offer user record has failed for a period (10s) [ " + (Object)((Object)record) + "]");
                }
                this.toProcessRecord.poll();
            }
            catch (Exception e) {
                log.error("UserRecordGenerator: process record failed, raw consumer record [" + toProcess + "], parsed record [" + record + "], cause " + e.getMessage(), (Throwable)e);
                this.consumerContext.exit();
            }
        }
    }

    private boolean offerRecord(int timeOut, TimeUnit timeUnit, DefaultUserRecord defaultUserRecord) {
        try {
            return this.processedRecord.offer(defaultUserRecord, timeOut, timeUnit);
        }
        catch (Exception e) {
            log.error("UserRecordGenerator: offer record failed, record[" + defaultUserRecord + "], cause " + e.getMessage(), (Throwable)e);
            return false;
        }
    }

    @Override
    public void close() throws IOException {
        this.consumerContext.exit();
        this.commitThread.stop();
    }

    private void commit() {
        if (null != this.offsetCommitCallBack && this.commitCheckpoint.getTopicPartition() != null && this.commitCheckpoint.getOffset() != -1L) {
            this.offsetCommitCallBack.commit(this.commitCheckpoint.getTopicPartition(), this.commitCheckpoint.getTimeStamp(), this.commitCheckpoint.getOffset(), this.commitCheckpoint.getInfo());
        }
    }
}

