/*
 * Decompiled with CFR 0.152.
 */
package org.apache.kafka.streams.processor.internals;

import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ExecutionException;
import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.AdminClientConfig;
import org.apache.kafka.clients.admin.CreateTopicsResult;
import org.apache.kafka.clients.admin.DescribeTopicsResult;
import org.apache.kafka.clients.admin.NewTopic;
import org.apache.kafka.clients.admin.TopicDescription;
import org.apache.kafka.common.KafkaFuture;
import org.apache.kafka.common.errors.TimeoutException;
import org.apache.kafka.common.errors.TopicExistsException;
import org.apache.kafka.common.utils.LogContext;
import org.apache.kafka.common.utils.Utils;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.errors.StreamsException;
import org.apache.kafka.streams.processor.internals.InternalTopicConfig;
import org.slf4j.Logger;

public class InternalTopicManager {
    private static final String INTERRUPTED_ERROR_MESSAGE = "Thread got interrupted. This indicates a bug. Please report at https://issues.apache.org/jira/projects/KAFKA or dev-mailing list (https://kafka.apache.org/contact).";
    private final Logger log;
    private final long windowChangeLogAdditionalRetention;
    private final Map<String, String> defaultTopicConfigs = new HashMap<String, String>();
    private final short replicationFactor;
    private final AdminClient adminClient;
    private final int retries;

    public InternalTopicManager(AdminClient adminClient, StreamsConfig streamsConfig) {
        this.adminClient = adminClient;
        LogContext logContext = new LogContext(String.format("stream-thread [%s] ", Thread.currentThread().getName()));
        this.log = logContext.logger(this.getClass());
        this.replicationFactor = streamsConfig.getInt("replication.factor").shortValue();
        this.windowChangeLogAdditionalRetention = streamsConfig.getLong("windowstore.changelog.additional.retention.ms");
        this.retries = new AdminClientConfig(streamsConfig.getAdminConfigs("dummy")).getInt("retries");
        this.log.debug("Configs:" + Utils.NL, new Object[]{"\t{} = {}" + Utils.NL, "\t{} = {}" + Utils.NL, "\t{} = {}", "retries", this.retries, "replication.factor", this.replicationFactor, "windowstore.changelog.additional.retention.ms", this.windowChangeLogAdditionalRetention});
        for (Map.Entry entry : streamsConfig.originalsWithPrefix("topic.").entrySet()) {
            if (entry.getValue() == null) continue;
            this.defaultTopicConfigs.put((String)entry.getKey(), entry.getValue().toString());
        }
    }

    public void makeReady(Map<String, InternalTopicConfig> topics) {
        Map<String, Integer> existingTopicPartitions = this.getNumPartitions(topics.keySet());
        Set<InternalTopicConfig> topicsToBeCreated = this.validateTopicPartitions(topics.values(), existingTopicPartitions);
        if (topicsToBeCreated.size() > 0) {
            HashSet<NewTopic> newTopics = new HashSet<NewTopic>();
            for (InternalTopicConfig internalTopicConfig : topicsToBeCreated) {
                Map<String, String> topicConfig = internalTopicConfig.getProperties(this.defaultTopicConfigs, this.windowChangeLogAdditionalRetention);
                this.log.debug("Going to create topic {} with {} partitions and config {}.", new Object[]{internalTopicConfig.name(), internalTopicConfig.numberOfPartitions(), topicConfig});
                newTopics.add(new NewTopic(internalTopicConfig.name(), internalTopicConfig.numberOfPartitions(), this.replicationFactor).configs(topicConfig));
            }
            int remainingRetries = this.retries;
            do {
                boolean retry = false;
                CreateTopicsResult createTopicsResult = this.adminClient.createTopics(newTopics);
                HashSet createTopicNames = new HashSet();
                for (Map.Entry createTopicResult : createTopicsResult.values().entrySet()) {
                    try {
                        ((KafkaFuture)createTopicResult.getValue()).get();
                        createTopicNames.add(createTopicResult.getKey());
                    }
                    catch (ExecutionException couldNotCreateTopic) {
                        Throwable cause = couldNotCreateTopic.getCause();
                        String topicName = (String)createTopicResult.getKey();
                        if (cause instanceof TimeoutException) {
                            retry = true;
                            this.log.debug("Could not get number of partitions for topic {} due to timeout. Will try again (remaining retries {}).", (Object)topicName, (Object)(remainingRetries - 1));
                            continue;
                        }
                        if (cause instanceof TopicExistsException) {
                            createTopicNames.add(createTopicResult.getKey());
                            this.log.info("Topic {} exist already: {}", (Object)topicName, (Object)couldNotCreateTopic.toString());
                            continue;
                        }
                        throw new StreamsException(String.format("Could not create topic %s.", topicName), couldNotCreateTopic);
                    }
                    catch (InterruptedException fatalException) {
                        Thread.currentThread().interrupt();
                        this.log.error(INTERRUPTED_ERROR_MESSAGE, (Throwable)fatalException);
                        throw new IllegalStateException(INTERRUPTED_ERROR_MESSAGE, fatalException);
                    }
                }
                if (retry) {
                    Iterator it = newTopics.iterator();
                    while (it.hasNext()) {
                        if (!createTopicNames.contains(((NewTopic)it.next()).name())) continue;
                        it.remove();
                    }
                } else {
                    return;
                }
            } while (remainingRetries-- > 0);
            String timeoutAndRetryError = "Could not create topics. This can happen if the Kafka cluster is temporary not available. You can increase admin client config `retries` to be resilient against this error.";
            this.log.error("Could not create topics. This can happen if the Kafka cluster is temporary not available. You can increase admin client config `retries` to be resilient against this error.");
            throw new StreamsException("Could not create topics. This can happen if the Kafka cluster is temporary not available. You can increase admin client config `retries` to be resilient against this error.");
        }
    }

    protected Map<String, Integer> getNumPartitions(Set<String> topics) {
        this.log.debug("Trying to check if topics {} have been created with expected number of partitions.", topics);
        int remainingRetries = this.retries;
        do {
            boolean retry = false;
            DescribeTopicsResult describeTopicsResult = this.adminClient.describeTopics(topics);
            Map futures = describeTopicsResult.values();
            HashMap<String, Integer> existingNumberOfPartitionsPerTopic = new HashMap<String, Integer>();
            for (Map.Entry topicFuture : futures.entrySet()) {
                try {
                    TopicDescription topicDescription = (TopicDescription)((KafkaFuture)topicFuture.getValue()).get();
                    existingNumberOfPartitionsPerTopic.put((String)topicFuture.getKey(), topicDescription.partitions().size());
                }
                catch (InterruptedException fatalException) {
                    Thread.currentThread().interrupt();
                    this.log.error(INTERRUPTED_ERROR_MESSAGE, (Throwable)fatalException);
                    throw new IllegalStateException(INTERRUPTED_ERROR_MESSAGE, fatalException);
                }
                catch (ExecutionException couldNotDescribeTopicException) {
                    Throwable cause = couldNotDescribeTopicException.getCause();
                    if (cause instanceof TimeoutException) {
                        retry = true;
                        this.log.debug("Could not get number of partitions for topic {} due to timeout. Will try again (remaining retries {}).", topicFuture.getKey(), (Object)(remainingRetries - 1));
                        continue;
                    }
                    String error = "Could not get number of partitions for topic {} due to {}";
                    this.log.debug("Could not get number of partitions for topic {} due to {}", topicFuture.getKey(), (Object)cause.toString());
                }
            }
            if (!retry) {
                return existingNumberOfPartitionsPerTopic;
            }
            topics.removeAll(existingNumberOfPartitionsPerTopic.keySet());
        } while (remainingRetries-- > 0);
        return Collections.emptyMap();
    }

    private Set<InternalTopicConfig> validateTopicPartitions(Collection<InternalTopicConfig> topicsPartitionsMap, Map<String, Integer> existingTopicNamesPartitions) {
        HashSet<InternalTopicConfig> topicsToBeCreated = new HashSet<InternalTopicConfig>();
        for (InternalTopicConfig topic : topicsPartitionsMap) {
            int numberOfPartitions = topic.numberOfPartitions();
            if (existingTopicNamesPartitions.containsKey(topic.name())) {
                if (existingTopicNamesPartitions.get(topic.name()).equals(numberOfPartitions)) continue;
                String errorMsg = String.format("Existing internal topic %s has invalid partitions: expected: %d; actual: %d. Use 'kafka.tools.StreamsResetter' tool to clean up invalid topics before processing.", topic.name(), numberOfPartitions, existingTopicNamesPartitions.get(topic.name()));
                this.log.error(errorMsg);
                throw new StreamsException(errorMsg);
            }
            topicsToBeCreated.add(topic);
        }
        return topicsToBeCreated;
    }
}

