package com.aliyun.dts.subscribe.clients.recordfetcher;

import com.aliyun.dts.subscribe.clients.ConsumerContext;
import com.aliyun.dts.subscribe.clients.common.Checkpoint;
import com.aliyun.dts.subscribe.clients.common.Util;
import java.io.Closeable;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.Properties;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.Supplier;
import org.apache.commons.lang3.StringUtils;
import org.apache.kafka.clients.consumer.ConsumerRebalanceListener;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.consumer.OffsetAndTimestamp;
import org.apache.kafka.common.TopicPartition;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:com/aliyun/dts/subscribe/clients/recordfetcher/ConsumerWrap.class */
public abstract class ConsumerWrap implements Closeable {
    private static final Logger log = LoggerFactory.getLogger(ConsumerWrap.class);

    /* loaded from: input_file:com/aliyun/dts/subscribe/clients/recordfetcher/ConsumerWrap$DefaultConsumerWrap.class */
    public static class DefaultConsumerWrap extends ConsumerWrap {
        private AtomicBoolean firstStart = new AtomicBoolean(true);
        private KafkaConsumer<byte[], byte[]> consumer;
        private final long poolTimeOut;
        private final ConsumerContext consumerContext;

        public DefaultConsumerWrap(Properties properties, ConsumerContext consumerContext) {
            Properties properties2 = new Properties();
            Util.mergeSourceKafkaProperties(properties, properties2);
            checkConfig(properties2);
            this.consumer = new KafkaConsumer<>(properties2);
            this.poolTimeOut = Long.valueOf(properties.getProperty(Names.POLL_TIME_OUT, "500")).longValue();
            this.consumerContext = consumerContext;
        }

        @Override // com.aliyun.dts.subscribe.clients.recordfetcher.ConsumerWrap
        public void setFetchOffsetByOffset(TopicPartition topicPartition, Checkpoint checkpoint) {
            this.consumer.seek(topicPartition, checkpoint.getOffset());
        }

        @Override // com.aliyun.dts.subscribe.clients.recordfetcher.ConsumerWrap
        public void setFetchOffsetByTimestamp(TopicPartition topicPartition, Checkpoint checkpoint) {
            long timeStamp = checkpoint.getTimeStamp();
            OffsetAndTimestamp offsetAndTimestamp = (OffsetAndTimestamp) this.consumer.offsetsForTimes(Collections.singletonMap(topicPartition, Long.valueOf(timeStamp))).get(topicPartition);
            if (null != offsetAndTimestamp) {
                ConsumerWrap.log.info("RecordFetcher: seek for {} with checkpoint {}", topicPartition, checkpoint);
                this.consumer.seek(topicPartition, offsetAndTimestamp.offset());
            } else {
                ConsumerWrap.log.warn("Failed seek timestamp for topic [" + topicPartition + "] with timestamp [" + timeStamp + "] failed, set to beginning");
                ConsumerWrap.log.warn("Set to beginning");
                this.consumer.seekToBeginning(Collections.singleton(topicPartition));
            }
        }

        @Override // com.aliyun.dts.subscribe.clients.recordfetcher.ConsumerWrap
        public void assignTopic(TopicPartition topicPartition, Checkpoint checkpoint) {
            this.consumer.assign(Arrays.asList(topicPartition));
            this.consumerContext.setTopicPartitions(Collections.singleton(topicPartition));
            ConsumerWrap.log.info("RecordGenerator:  assigned for {} with checkpoint {}", topicPartition, checkpoint);
            setFetchOffsetByTimestamp(topicPartition, checkpoint);
        }

        @Override // com.aliyun.dts.subscribe.clients.recordfetcher.ConsumerWrap
        public void subscribeTopic(final TopicPartition topicPartition, final Supplier<Checkpoint> supplier) {
            this.consumer.subscribe(Arrays.asList(topicPartition.topic()), new ConsumerRebalanceListener() { // from class: com.aliyun.dts.subscribe.clients.recordfetcher.ConsumerWrap.DefaultConsumerWrap.1
                public void onPartitionsRevoked(Collection<TopicPartition> collection) {
                    ConsumerWrap.log.info("RecordFetcher consumer: partition revoked for [{}]", StringUtils.join(collection, ","));
                }

                public void onPartitionsAssigned(Collection<TopicPartition> collection) {
                    ConsumerWrap.log.info("RecordFetcher consumer: partition assigned for [{}]", StringUtils.join(collection, ","));
                    DefaultConsumerWrap.this.consumerContext.setTopicPartitions(collection);
                    if (!DefaultConsumerWrap.this.consumerContext.hasValidTopicPartitions()) {
                        ConsumerWrap.log.warn("In subscribe mode, recordFetcher consumer dose not assigned any partition, probably this client is a backup...");
                    }
                    if (collection.contains(topicPartition)) {
                        if (!DefaultConsumerWrap.this.firstStart.compareAndSet(true, false)) {
                            ConsumerWrap.log.info("RecordFetcher consumer:  subscribe for [{}]  reassign, do nothing", topicPartition);
                            return;
                        }
                        Checkpoint checkpoint = (Checkpoint) supplier.get();
                        DefaultConsumerWrap.this.setFetchOffsetByTimestamp(topicPartition, checkpoint);
                        ConsumerWrap.log.info("RecordFetcher consumer:  subscribe for [{}] with checkpoint [{}] first start", topicPartition, checkpoint);
                    }
                }
            });
        }

        @Override // com.aliyun.dts.subscribe.clients.recordfetcher.ConsumerWrap
        public ConsumerRecords<byte[], byte[]> poll() {
            return this.consumer.poll(this.poolTimeOut);
        }

        @Override // com.aliyun.dts.subscribe.clients.recordfetcher.ConsumerWrap
        public KafkaConsumer getRawConsumer() {
            return this.consumer;
        }

        @Override // java.io.Closeable, java.lang.AutoCloseable
        public synchronized void close() {
            if (null != this.consumer) {
                this.consumer.close();
            }
        }

        private void checkConfig(Properties properties) {
        }
    }

    public abstract void setFetchOffsetByOffset(TopicPartition topicPartition, Checkpoint checkpoint);

    public abstract void setFetchOffsetByTimestamp(TopicPartition topicPartition, Checkpoint checkpoint);

    public abstract void assignTopic(TopicPartition topicPartition, Checkpoint checkpoint);

    public abstract void subscribeTopic(TopicPartition topicPartition, Supplier<Checkpoint> supplier);

    public abstract ConsumerRecords<byte[], byte[]> poll();

    public abstract KafkaConsumer getRawConsumer();
}
