/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.connector.kafka.sink.testutils;

import java.net.URL;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Properties;
import java.util.Random;
import java.util.concurrent.ThreadLocalRandom;
import java.util.concurrent.TimeUnit;
import org.apache.commons.lang3.RandomStringUtils;
import org.apache.commons.lang3.exception.ExceptionUtils;
import org.apache.flink.api.common.serialization.SerializationSchema;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.connector.sink2.Sink;
import org.apache.flink.connector.base.DeliveryGuarantee;
import org.apache.flink.connector.kafka.sink.KafkaRecordSerializationSchema;
import org.apache.flink.connector.kafka.sink.KafkaSink;
import org.apache.flink.connector.kafka.sink.KafkaSinkBuilder;
import org.apache.flink.connector.kafka.sink.testutils.KafkaDataReader;
import org.apache.flink.connector.testframe.external.ExternalSystemDataReader;
import org.apache.flink.connector.testframe.external.sink.DataStreamSinkV2ExternalContext;
import org.apache.flink.connector.testframe.external.sink.TestingSinkSettings;
import org.apache.flink.streaming.api.CheckpointingMode;
import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.NewTopic;
import org.apache.kafka.clients.admin.TopicDescription;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.TopicPartitionInfo;
import org.apache.kafka.common.errors.UnknownTopicOrPartitionException;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class KafkaSinkExternalContext
implements DataStreamSinkV2ExternalContext<String> {
    private static final Logger LOG = LoggerFactory.getLogger(KafkaSinkExternalContext.class);
    private static final String TOPIC_NAME_PREFIX = "kafka-single-topic";
    private static final long DEFAULT_TIMEOUT = 30L;
    private static final int RANDOM_STRING_MAX_LENGTH = 50;
    private static final int NUM_RECORDS_UPPER_BOUND = 500;
    private static final int NUM_RECORDS_LOWER_BOUND = 100;
    private static final int DEFAULT_TRANSACTION_TIMEOUT_IN_MS = 900000;
    protected String bootstrapServers;
    protected final String topicName;
    private final List<ExternalSystemDataReader<String>> readers = new ArrayList<ExternalSystemDataReader<String>>();
    protected int numSplits = 0;
    private List<URL> connectorJarPaths;
    protected final AdminClient kafkaAdminClient;

    public KafkaSinkExternalContext(String bootstrapServers, List<URL> connectorJarPaths) {
        this.bootstrapServers = bootstrapServers;
        this.connectorJarPaths = connectorJarPaths;
        this.topicName = "kafka-single-topic-" + ThreadLocalRandom.current().nextLong(Long.MAX_VALUE);
        this.kafkaAdminClient = this.createAdminClient();
    }

    private void createTopic(String topicName, int numPartitions, short replicationFactor) {
        LOG.debug("Creating new Kafka topic {} with {} partitions and {} replicas", new Object[]{topicName, numPartitions, replicationFactor});
        NewTopic newTopic = new NewTopic(topicName, numPartitions, replicationFactor);
        try {
            this.kafkaAdminClient.createTopics(Collections.singletonList(newTopic)).all().get(30L, TimeUnit.SECONDS);
        }
        catch (Exception e) {
            throw new RuntimeException(String.format("Cannot create topic '%s'", topicName), e);
        }
    }

    private void deleteTopic(String topicName) {
        block2: {
            LOG.debug("Deleting Kafka topic {}", (Object)topicName);
            try {
                this.kafkaAdminClient.deleteTopics(Collections.singletonList(topicName)).all().get(30L, TimeUnit.SECONDS);
            }
            catch (Exception e) {
                if (!(ExceptionUtils.getRootCause((Throwable)e) instanceof UnknownTopicOrPartitionException)) break block2;
                throw new RuntimeException(String.format("Cannot delete unknown Kafka topic '%s'", topicName), e);
            }
        }
    }

    private AdminClient createAdminClient() {
        Properties config = new Properties();
        config.setProperty("bootstrap.servers", this.bootstrapServers);
        return AdminClient.create((Properties)config);
    }

    public Sink<String> createSink(TestingSinkSettings sinkSettings) {
        if (!this.topicExists(this.topicName)) {
            this.createTopic(this.topicName, 4, (short)1);
        }
        KafkaSinkBuilder builder = KafkaSink.builder();
        Properties properties = new Properties();
        properties.put("transaction.timeout.ms", (Object)900000);
        builder.setBootstrapServers(this.bootstrapServers).setDeliveryGuarantee(this.toDeliveryGuarantee(sinkSettings.getCheckpointingMode())).setTransactionalIdPrefix("testingFramework").setKafkaProducerConfig(properties).setRecordSerializer(KafkaRecordSerializationSchema.builder().setTopic(this.topicName).setValueSerializationSchema((SerializationSchema)new SimpleStringSchema()).build());
        return builder.build();
    }

    public ExternalSystemDataReader<String> createSinkDataReader(TestingSinkSettings sinkSettings) {
        LOG.info("Fetching information for topic: {}", (Object)this.topicName);
        Map<String, TopicDescription> topicMetadata = this.getTopicMetadata(Arrays.asList(this.topicName));
        HashSet<TopicPartition> subscribedPartitions = new HashSet<TopicPartition>();
        for (TopicDescription topic : topicMetadata.values()) {
            for (TopicPartitionInfo partition : topic.partitions()) {
                subscribedPartitions.add(new TopicPartition(topic.name(), partition.partition()));
            }
        }
        Properties properties = new Properties();
        properties.setProperty("group.id", "flink-kafka-test" + subscribedPartitions.hashCode());
        properties.setProperty("bootstrap.servers", this.bootstrapServers);
        properties.setProperty("key.deserializer", StringDeserializer.class.getCanonicalName());
        properties.setProperty("value.deserializer", StringDeserializer.class.getCanonicalName());
        if (CheckpointingMode.EXACTLY_ONCE.equals((Object)sinkSettings.getCheckpointingMode())) {
            properties.setProperty("isolation.level", "read_committed");
        }
        properties.setProperty("auto.offset.reset", "earliest");
        this.readers.add(new KafkaDataReader(properties, subscribedPartitions));
        return this.readers.get(this.readers.size() - 1);
    }

    public List<String> generateTestData(TestingSinkSettings sinkSettings, long seed) {
        Random random = new Random(seed);
        ArrayList<String> randomStringRecords = new ArrayList<String>();
        int recordNum = random.nextInt(400) + 100;
        for (int i = 0; i < recordNum; ++i) {
            int stringLength = random.nextInt(50) + 1;
            randomStringRecords.add(RandomStringUtils.random((int)stringLength, (boolean)true, (boolean)true));
        }
        return randomStringRecords;
    }

    protected Map<String, TopicDescription> getTopicMetadata(List<String> topics) {
        try {
            return (Map)this.kafkaAdminClient.describeTopics(topics).all().get();
        }
        catch (Exception e) {
            throw new RuntimeException(String.format("Failed to get metadata for topics %s.", topics), e);
        }
    }

    private boolean topicExists(String topic) {
        try {
            this.kafkaAdminClient.describeTopics(Arrays.asList(topic)).all().get();
            return true;
        }
        catch (Exception e) {
            return false;
        }
    }

    public void close() {
        if (this.numSplits != 0) {
            this.deleteTopic(this.topicName);
        }
        this.readers.stream().filter(Objects::nonNull).forEach(reader -> {
            try {
                reader.close();
            }
            catch (Exception e) {
                if (this.kafkaAdminClient != null) {
                    this.kafkaAdminClient.close();
                }
                throw new RuntimeException("Cannot close split writer", e);
            }
        });
        this.readers.clear();
        if (this.kafkaAdminClient != null) {
            this.kafkaAdminClient.close();
        }
    }

    public String toString() {
        return "Single-topic Kafka";
    }

    public List<URL> getConnectorJarPaths() {
        return this.connectorJarPaths;
    }

    public TypeInformation<String> getProducedType() {
        return TypeInformation.of(String.class);
    }

    private DeliveryGuarantee toDeliveryGuarantee(CheckpointingMode checkpointingMode) {
        switch (checkpointingMode) {
            case EXACTLY_ONCE: {
                return DeliveryGuarantee.EXACTLY_ONCE;
            }
            case AT_LEAST_ONCE: {
                return DeliveryGuarantee.AT_LEAST_ONCE;
            }
        }
        throw new IllegalArgumentException(String.format("Only exactly-once and al-least-once checkpointing mode are supported, but actual is %s.", checkpointingMode));
    }
}

