package com.aliyun.dts.subscribe.clients.metastore;

import com.aliyun.dts.subscribe.clients.common.Checkpoint;
import java.util.Collections;
import java.util.Map;
import java.util.concurrent.Future;
import org.apache.commons.lang3.StringUtils;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.clients.consumer.OffsetCommitCallback;
import org.apache.kafka.common.KafkaException;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.internals.KafkaFutureImpl;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:com/aliyun/dts/subscribe/clients/metastore/KafkaMetaStore.class */
public class KafkaMetaStore implements MetaStore<Checkpoint> {
    private static final Logger log = LoggerFactory.getLogger(KafkaMetaStore.class);
    private volatile KafkaConsumer kafkaConsumer;

    public KafkaMetaStore(KafkaConsumer kafkaConsumer) {
        this.kafkaConsumer = kafkaConsumer;
    }

    public void resetKafkaConsumer(KafkaConsumer kafkaConsumer) {
        this.kafkaConsumer = kafkaConsumer;
    }

    @Override // com.aliyun.dts.subscribe.clients.metastore.MetaStore
    public Future<Checkpoint> serializeTo(final TopicPartition topicPartition, final String str, final Checkpoint checkpoint) {
        final KafkaFutureImpl kafkaFutureImpl = new KafkaFutureImpl();
        if (null != this.kafkaConsumer) {
            this.kafkaConsumer.commitAsync(Collections.singletonMap(topicPartition, new OffsetAndMetadata(checkpoint.getOffset(), String.valueOf(checkpoint.getTimeStamp()))), new OffsetCommitCallback() { // from class: com.aliyun.dts.subscribe.clients.metastore.KafkaMetaStore.1
                public void onComplete(Map<TopicPartition, OffsetAndMetadata> map, Exception exc) {
                    if (null != exc) {
                        KafkaMetaStore.log.warn("KafkaMetaStore: Commit offset for group[" + str + "] topicPartition[" + topicPartition.toString() + "] " + checkpoint.toString() + " failed cause " + exc.getMessage(), exc);
                        kafkaFutureImpl.completeExceptionally(exc);
                    } else {
                        KafkaMetaStore.log.debug("KafkaMetaStore:Commit offset success for group[{}] topicPartition [{}] {}", new Object[]{str, topicPartition, checkpoint});
                        kafkaFutureImpl.complete(checkpoint);
                    }
                }
            });
        } else {
            log.warn("KafkaMetaStore: kafka consumer not set, ignore report");
            kafkaFutureImpl.complete(checkpoint);
        }
        return kafkaFutureImpl;
    }

    /* JADX WARN: Can't rename method to resolve collision */
    @Override // com.aliyun.dts.subscribe.clients.metastore.MetaStore
    public Checkpoint deserializeFrom(TopicPartition topicPartition, String str) {
        if (null == this.kafkaConsumer) {
            log.warn("KafkaMetaStore: kafka consumer not set, ignore fetch offset");
            throw new KafkaException("KafkaMetaStore: kafka consumer not set, ignore fetch offset for group[" + str + "] and tp [" + topicPartition + "]");
        }
        OffsetAndMetadata committed = this.kafkaConsumer.committed(topicPartition);
        if (null == committed || !StringUtils.isNotEmpty(committed.metadata())) {
            return null;
        }
        return new Checkpoint(topicPartition, Long.valueOf(committed.metadata()).longValue(), committed.offset(), committed.metadata());
    }
}
