/*
 * Decompiled with CFR 0.152.
 */
package com.aliyun.dms.subscribe.clients;

import com.aliyun.dts.subscribe.clients.ConsumerContext;
import com.aliyun.dts.subscribe.clients.common.Checkpoint;
import com.aliyun.dts.subscribe.clients.common.Util;
import com.aliyun.dts.subscribe.clients.formats.avro.Record;
import com.aliyun.dts.subscribe.clients.record.DefaultUserRecord;
import com.aliyun.dts.subscribe.clients.recordfetcher.OffsetCommitCallBack;
import com.aliyun.dts.subscribe.clients.recordgenerator.UserRecordGenerator;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.common.TopicPartition;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * {@link UserRecordGenerator} variant that passes each deserialized record through the
 * DB mapper exposed by the {@link ConsumerContext} (when one is present and reports
 * {@code isMapping()}) before offering it downstream.
 *
 * <p>Each pass of {@link #run()} peeks the head of {@code toProcessRecord}, deserializes its
 * value, optionally transforms it, wraps it in a {@link DefaultUserRecord} and offers it via
 * {@code offerRecord}. The head is removed from {@code toProcessRecord} only once the offer
 * has succeeded.
 */
public class UserRecordGeneratorWithDBMapping extends UserRecordGenerator {
    private static final Logger log = LoggerFactory.getLogger(UserRecordGeneratorWithDBMapping.class);

    /** Argument passed to {@code Util.sleepMS} between two peeks of an empty input queue. */
    private static final long EMPTY_QUEUE_SLEEP_MS = 5L;

    /** Number of consecutive empty peeks between two "no records" log lines (1000 x 5 ms = 5 s). */
    private static final int EMPTY_POLLS_PER_LOG = 1000;

    /** Timeout, in milliseconds, handed to each {@code offerRecord} attempt. */
    private static final int OFFER_TIMEOUT_MS = 1000;

    /** Number of failed offer attempts between two "offer failed" log lines (10 x 1 s = 10 s). */
    private static final int OFFER_TRIES_PER_LOG = 10;

    /**
     * Creates the generator; all arguments are forwarded unchanged to the superclass constructor.
     *
     * @param consumerContext      context queried for exit state, topic partitions and the DB mapper
     * @param toProcessRecord      queue of raw consumer records to deserialize
     * @param processedRecord      queue handed to the superclass for processed user records
     * @param offsetCommitCallBack callback used by {@link #commit()}; may be {@code null}
     */
    public UserRecordGeneratorWithDBMapping(ConsumerContext consumerContext, LinkedBlockingQueue<ConsumerRecord> toProcessRecord, LinkedBlockingQueue<DefaultUserRecord> processedRecord, OffsetCommitCallBack offsetCommitCallBack) {
        super(consumerContext, toProcessRecord, processedRecord, offsetCommitCallBack);
    }

    /**
     * Processes records until the consumer context reports that it has exited.
     *
     * <p>Any exception raised while handling a record is logged together with the raw and
     * parsed record, and the consumer context is asked to exit.
     */
    @Override
    public void run() {
        while (!this.consumerContext.isExited()) {
            ConsumerRecord toProcess = null;
            Record record = null;
            int fetchFailedCount = 0;
            try {
                // Wait for a record to show up at the head of the input queue (peek, not poll:
                // the record stays queued until it has been handed downstream).
                while (null == (toProcess = (ConsumerRecord) this.toProcessRecord.peek()) && !this.consumerContext.isExited()) {
                    Util.sleepMS(EMPTY_QUEUE_SLEEP_MS);
                    if (++fetchFailedCount % EMPTY_POLLS_PER_LOG == 0 && this.consumerContext.hasValidTopicPartitions()) {
                        log.info("UserRecordGenerator: haven't receive records from generator for  5s");
                    }
                }
                if (this.consumerContext.isExited()) {
                    return;
                }

                // The commit lambda below needs an effectively final reference.
                final ConsumerRecord consumerRecord = toProcess;
                record = this.fastDeserializer.deserialize((byte[]) consumerRecord.value());
                log.debug("UserRecordGenerator: meet [{}] record type", record.getOperation());
                if (this.consumerContext.getDbMapper() != null && this.consumerContext.getDbMapper().isMapping()) {
                    record = this.consumerContext.getDbMapper().transform(record);
                }

                // The lambda is the record's commit callback (presumably invoked when the consumer
                // commits this record - confirm against DefaultUserRecord): it updates the output
                // sensors, stores the new checkpoint and forwards it through commit().
                DefaultUserRecord defaultUserRecord = new DefaultUserRecord(
                        new TopicPartition(consumerRecord.topic(), consumerRecord.partition()),
                        consumerRecord.offset(),
                        record,
                        (tp, commitRecord, offset, metadata) -> {
                            this.recordStoreOutCountSensor.record(1.0);
                            this.recordStoreOutByteSensor.record(((byte[]) consumerRecord.value()).length);
                            this.commitCheckpoint = new Checkpoint(tp, commitRecord.getSourceTimestamp(), offset, metadata);
                            this.commit();
                        });

                // Keep offering until the record is accepted or the context exits.
                boolean offered;
                int offerTryCount = 0;
                while (!(offered = this.offerRecord(OFFER_TIMEOUT_MS, TimeUnit.MILLISECONDS, defaultUserRecord)) && !this.consumerContext.isExited()) {
                    if (++offerTryCount % OFFER_TRIES_PER_LOG == 0) {
                        log.info("UserRecordGenerator: offer user record has failed for a period (10s) [ {}]", record);
                    }
                }

                // Only drop the head once it has really been handed over; if the loop ended
                // because the context exited, the record was never offered and stays queued.
                if (offered) {
                    this.toProcessRecord.poll();
                }
            }
            catch (Exception e) {
                log.error("UserRecordGenerator: process record failed, raw consumer record [" + toProcess + "], parsed record [" + record + "], cause " + e.getMessage(), e);
                this.consumerContext.exit();
            }
        }
    }

    /**
     * Forwards the current {@code commitCheckpoint} to the offset commit callback.
     *
     * <p>Nothing is committed when no callback is configured, when the checkpoint has no
     * topic partition, or when its offset is {@code -1}.
     */
    private void commit() {
        if (null != this.offsetCommitCallBack && this.commitCheckpoint.getTopicPartition() != null && this.commitCheckpoint.getOffset() != -1L) {
            this.offsetCommitCallBack.commit(this.commitCheckpoint.getTopicPartition(), this.commitCheckpoint.getTimeStamp(), this.commitCheckpoint.getOffset(), this.commitCheckpoint.getInfo());
        }
    }
}

