/*
 * Decompiled with CFR 0.152.
 */
package com.aliyun.dts.subscribe.clients.metastore;

import com.aliyun.dts.subscribe.clients.common.Checkpoint;
import com.aliyun.dts.subscribe.clients.metastore.MetaStore;
import java.util.Collections;
import java.util.Map;
import java.util.concurrent.Future;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.clients.consumer.OffsetCommitCallback;
import org.apache.kafka.common.KafkaException;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.internals.KafkaFutureImpl;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class KafkaMetaStore
implements MetaStore<Checkpoint> {
    private static final Logger log = LoggerFactory.getLogger(KafkaMetaStore.class);
    private volatile KafkaConsumer kafkaConsumer;

    public KafkaMetaStore(KafkaConsumer kafkaConsumer) {
        this.kafkaConsumer = kafkaConsumer;
    }

    public void resetKafkaConsumer(KafkaConsumer newConsumer) {
        this.kafkaConsumer = newConsumer;
    }

    @Override
    public Future<Checkpoint> serializeTo(final TopicPartition topicPartition, final String group, final Checkpoint value) {
        final KafkaFutureImpl ret = new KafkaFutureImpl();
        if (null != this.kafkaConsumer) {
            OffsetAndMetadata offsetAndMetadata = new OffsetAndMetadata(value.getOffset(), String.valueOf(value.getTimeStamp()));
            this.kafkaConsumer.commitAsync(Collections.singletonMap(topicPartition, offsetAndMetadata), new OffsetCommitCallback(){

                public void onComplete(Map<TopicPartition, OffsetAndMetadata> offsets, Exception exception) {
                    if (null != exception) {
                        log.warn("KafkaMetaStore: Commit offset for group[" + group + "] topicPartition[" + topicPartition.toString() + "] " + value.toString() + " failed cause " + exception.getMessage(), (Throwable)exception);
                        ret.completeExceptionally((Throwable)exception);
                    } else {
                        log.debug("KafkaMetaStore:Commit offset success for group[{}] topicPartition [{}] {}", new Object[]{group, topicPartition, value});
                        ret.complete((Object)value);
                    }
                }
            });
        } else {
            log.warn("KafkaMetaStore: kafka consumer not set, ignore report");
            ret.complete((Object)value);
        }
        return ret;
    }

    @Override
    public Checkpoint deserializeFrom(TopicPartition topicPartition, String group) {
        if (null != this.kafkaConsumer) {
            OffsetAndMetadata offsetAndMetadata = this.kafkaConsumer.committed(topicPartition);
            if (null != offsetAndMetadata) {
                return new Checkpoint(topicPartition, Long.valueOf(offsetAndMetadata.metadata()), offsetAndMetadata.offset(), offsetAndMetadata.metadata());
            }
            return null;
        }
        log.warn("KafkaMetaStore: kafka consumer not set, ignore fetch offset");
        throw new KafkaException("KafkaMetaStore: kafka consumer not set, ignore fetch offset for group[" + group + "] and tp [" + topicPartition + "]");
    }
}

