/*
 * Decompiled with CFR 0.152.
 */
package kafka.testkit;

import java.io.ByteArrayOutputStream;
import java.io.File;
import java.io.IOException;
import java.io.PrintStream;
import java.net.InetSocketAddress;
import java.nio.file.Files;
import java.nio.file.Paths;
import java.nio.file.attribute.FileAttribute;
import java.util.AbstractMap;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.TreeMap;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;
import java.util.stream.Collectors;
import kafka.raft.KafkaRaftManager;
import kafka.raft.RaftManager;
import kafka.server.BrokerServer;
import kafka.server.ControllerServer;
import kafka.server.KafkaConfig;
import kafka.server.KafkaConfig$;
import kafka.server.KafkaRaftServer;
import kafka.server.MetaProperties;
import kafka.server.Server;
import kafka.testkit.BrokerNode;
import kafka.testkit.ControllerNode;
import kafka.testkit.TestKitNodes;
import kafka.tools.StorageTool;
import kafka.utils.Logging;
import org.apache.kafka.common.Node;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.metrics.Metrics;
import org.apache.kafka.common.network.ListenerName;
import org.apache.kafka.common.utils.ThreadUtils;
import org.apache.kafka.common.utils.Time;
import org.apache.kafka.common.utils.Utils;
import org.apache.kafka.controller.Controller;
import org.apache.kafka.metadata.MetadataRecordSerde;
import org.apache.kafka.raft.RaftConfig;
import org.apache.kafka.server.common.ApiMessageAndVersion;
import org.apache.kafka.server.common.serialization.RecordSerde;
import org.apache.kafka.test.TestUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import scala.Option;
import scala.collection.JavaConverters;
import scala.collection.Seq;

public class KafkaClusterTestKit
implements AutoCloseable {
    private static final Logger log = LoggerFactory.getLogger(KafkaClusterTestKit.class);
    private final ExecutorService executorService;
    private final TestKitNodes nodes;
    private final Map<Integer, ControllerServer> controllers;
    private final Map<Integer, BrokerServer> brokers;
    private final Map<Integer, KafkaRaftManager<ApiMessageAndVersion>> raftManagers;
    private final ControllerQuorumVotersFutureManager controllerQuorumVotersFutureManager;
    private final File baseDirectory;

    private KafkaClusterTestKit(ExecutorService executorService, TestKitNodes nodes, Map<Integer, ControllerServer> controllers, Map<Integer, BrokerServer> brokers, Map<Integer, KafkaRaftManager<ApiMessageAndVersion>> raftManagers, ControllerQuorumVotersFutureManager controllerQuorumVotersFutureManager, File baseDirectory) {
        this.executorService = executorService;
        this.nodes = nodes;
        this.controllers = controllers;
        this.brokers = brokers;
        this.raftManagers = raftManagers;
        this.controllerQuorumVotersFutureManager = controllerQuorumVotersFutureManager;
        this.baseDirectory = baseDirectory;
    }

    public void format() throws Exception {
        ArrayList futures = new ArrayList();
        try {
            int nodeId;
            for (Map.Entry<Integer, ControllerServer> entry : this.controllers.entrySet()) {
                nodeId = entry.getKey();
                ControllerServer controller = entry.getValue();
                this.formatNodeAndLog(this.nodes.controllerProperties(nodeId), controller.config().metadataLogDir(), (Logging)controller, futures::add);
            }
            for (Map.Entry<Integer, ControllerServer> entry : this.brokers.entrySet()) {
                nodeId = entry.getKey();
                BrokerServer broker = (BrokerServer)entry.getValue();
                this.formatNodeAndLog(this.nodes.brokerProperties(nodeId), broker.config().metadataLogDir(), (Logging)broker, futures::add);
            }
            for (Future future : futures) {
                future.get();
            }
        }
        catch (Exception e) {
            for (Future future : futures) {
                future.cancel(true);
            }
            throw e;
        }
    }

    private void formatNodeAndLog(MetaProperties properties, String metadataLogDir, Logging loggingMixin, Consumer<Future<?>> futureConsumer) {
        futureConsumer.accept(this.executorService.submit(() -> {
            try (ByteArrayOutputStream stream = new ByteArrayOutputStream();){
                try {
                    block27: {
                        PrintStream out;
                        block28: {
                            out = new PrintStream(stream);
                            Throwable throwable = null;
                            try {
                                StorageTool.formatCommand((PrintStream)out, (scala.collection.immutable.Seq)JavaConverters.asScalaBuffer(Collections.singletonList(metadataLogDir)).toSeq(), (MetaProperties)properties, (boolean)false);
                                if (out == null) break block27;
                                if (throwable == null) break block28;
                            }
                            catch (Throwable throwable3) {
                                try {
                                    throwable = throwable3;
                                    throw throwable3;
                                }
                                catch (Throwable throwable4) {
                                    if (out == null) throw throwable4;
                                    if (throwable == null) {
                                        out.close();
                                        throw throwable4;
                                    }
                                    try {
                                        out.close();
                                        throw throwable4;
                                    }
                                    catch (Throwable throwable5) {
                                        throwable.addSuppressed(throwable5);
                                        throw throwable4;
                                    }
                                }
                            }
                            try {
                                out.close();
                            }
                            catch (Throwable throwable2) {
                                throwable.addSuppressed(throwable2);
                            }
                        }
                        out.close();
                    }
                }
                finally {
                    String[] stringArray = stream.toString().split(String.format("%n", new Object[0]));
                    int n = stringArray.length;
                    int n2 = 0;
                    while (n2 < n) {
                        String line = stringArray[n2];
                        loggingMixin.info(() -> line);
                        ++n2;
                    }
                    return;
                }
            }
            catch (IOException e) {
                throw new RuntimeException(e);
            }
        }));
    }

    public void startup() throws ExecutionException, InterruptedException {
        ArrayList<Object> futures = new ArrayList<Object>();
        try {
            for (ControllerServer controllerServer : this.controllers.values()) {
                futures.add(this.executorService.submit(() -> ((ControllerServer)controllerServer).startup()));
            }
            for (KafkaRaftManager kafkaRaftManager : this.raftManagers.values()) {
                futures.add(this.controllerQuorumVotersFutureManager.future.thenRunAsync(() -> ((KafkaRaftManager)kafkaRaftManager).startup()));
            }
            for (BrokerServer brokerServer : this.brokers.values()) {
                futures.add(this.executorService.submit(() -> ((BrokerServer)brokerServer).startup()));
            }
            for (Future future : futures) {
                future.get();
            }
        }
        catch (Exception e) {
            for (Future future : futures) {
                future.cancel(true);
            }
            throw e;
        }
    }

    public void waitForReadyBrokers() throws ExecutionException, InterruptedException {
        ControllerServer controllerServer = this.controllers.values().iterator().next();
        Controller controller = controllerServer.controller();
        controller.waitForReadyBrokers(this.brokers.size()).get();
    }

    public Properties controllerClientProperties() throws ExecutionException, InterruptedException {
        Properties properties = new Properties();
        if (!this.controllers.isEmpty()) {
            List controllerNodes = RaftConfig.voterConnectionsToNodes((Map)((Map)this.controllerQuorumVotersFutureManager.future.get()));
            StringBuilder bld = new StringBuilder();
            String prefix = "";
            for (Node node : controllerNodes) {
                bld.append(prefix).append(node.id()).append('@');
                bld.append(node.host()).append(":").append(node.port());
                prefix = ",";
            }
            properties.setProperty("controller.quorum.voters", bld.toString());
            properties.setProperty("bootstrap.servers", controllerNodes.stream().map(n -> n.host() + ":" + n.port()).collect(Collectors.joining(",")));
        }
        return properties;
    }

    public Properties clientProperties() {
        Properties properties = new Properties();
        if (!this.brokers.isEmpty()) {
            StringBuilder bld = new StringBuilder();
            String prefix = "";
            for (Map.Entry<Integer, BrokerServer> entry : this.brokers.entrySet()) {
                ListenerName listenerName;
                int brokerId = entry.getKey();
                BrokerServer broker = entry.getValue();
                int port = broker.boundPort(listenerName = this.nodes.externalListenerName());
                if (port <= 0) {
                    throw new RuntimeException("Broker " + brokerId + " does not yet have a bound port for " + listenerName + ".  Did you start the cluster yet?");
                }
                bld.append(prefix).append("localhost:").append(port);
                prefix = ",";
            }
            properties.setProperty("bootstrap.servers", bld.toString());
        }
        return properties;
    }

    public Map<Integer, ControllerServer> controllers() {
        return this.controllers;
    }

    public Map<Integer, BrokerServer> brokers() {
        return this.brokers;
    }

    public Map<Integer, KafkaRaftManager<ApiMessageAndVersion>> raftManagers() {
        return this.raftManagers;
    }

    public TestKitNodes nodes() {
        return this.nodes;
    }

    @Override
    public void close() throws Exception {
        ArrayList futureEntries = new ArrayList();
        try {
            this.controllerQuorumVotersFutureManager.close();
            for (Map.Entry<Integer, BrokerServer> entry : this.brokers.entrySet()) {
                int n = entry.getKey();
                BrokerServer broker = entry.getValue();
                futureEntries.add(new AbstractMap.SimpleImmutableEntry("broker" + n, this.executorService.submit(() -> ((BrokerServer)broker).shutdown())));
            }
            this.waitForAllFutures(futureEntries);
            futureEntries.clear();
            for (Map.Entry<Integer, BrokerServer> entry : this.controllers.entrySet()) {
                int n = entry.getKey();
                ControllerServer controller = (ControllerServer)entry.getValue();
                futureEntries.add(new AbstractMap.SimpleImmutableEntry("controller" + n, this.executorService.submit(() -> ((ControllerServer)controller).shutdown())));
            }
            this.waitForAllFutures(futureEntries);
            futureEntries.clear();
            for (Map.Entry<Integer, BrokerServer> entry : this.raftManagers.entrySet()) {
                int n = entry.getKey();
                KafkaRaftManager raftManager = (KafkaRaftManager)entry.getValue();
                futureEntries.add(new AbstractMap.SimpleImmutableEntry("raftManager" + n, this.executorService.submit(() -> ((KafkaRaftManager)raftManager).shutdown())));
            }
            this.waitForAllFutures(futureEntries);
            futureEntries.clear();
            Utils.delete((File)this.baseDirectory);
        }
        catch (Exception e) {
            for (Map.Entry entry : futureEntries) {
                ((Future)entry.getValue()).cancel(true);
            }
            throw e;
        }
        finally {
            this.executorService.shutdownNow();
            this.executorService.awaitTermination(5L, TimeUnit.MINUTES);
        }
    }

    private void waitForAllFutures(List<Map.Entry<String, Future<?>>> futureEntries) throws Exception {
        for (Map.Entry<String, Future<?>> entry : futureEntries) {
            log.debug("waiting for {} to shut down.", (Object)entry.getKey());
            entry.getValue().get();
            log.debug("{} successfully shut down.", (Object)entry.getKey());
        }
    }

    public static class Builder {
        private TestKitNodes nodes;
        private Map<String, String> configProps = new HashMap<String, String>();

        public Builder(TestKitNodes nodes) {
            this.nodes = nodes;
        }

        public Builder setConfigProp(String key, String value) {
            this.configProps.put(key, value);
            return this;
        }

        public KafkaClusterTestKit build() throws Exception {
            HashMap<Integer, ControllerServer> controllers = new HashMap<Integer, ControllerServer>();
            HashMap<Integer, BrokerServer> brokers = new HashMap<Integer, BrokerServer>();
            HashMap<Integer, KafkaRaftManager> raftManagers = new HashMap<Integer, KafkaRaftManager>();
            String uninitializedQuorumVotersString = this.nodes.controllerNodes().keySet().stream().map(controllerNode -> String.format("%d@0.0.0.0:0", controllerNode)).collect(Collectors.joining(","));
            int numOfExecutorThreads = (this.nodes.brokerNodes().size() + this.nodes.controllerNodes().size()) * 2;
            ExecutorService executorService = null;
            ControllerQuorumVotersFutureManager connectFutureManager = new ControllerQuorumVotersFutureManager(this.nodes.controllerNodes().size());
            File baseDirectory = null;
            try {
                KafkaRaftManager raftManager;
                TopicPartition metadataPartition;
                MetaProperties metaProperties;
                String threadNamePrefix;
                KafkaConfig config;
                HashMap<String, String> props;
                baseDirectory = TestUtils.tempDirectory();
                this.nodes = this.nodes.copyWithAbsolutePaths(baseDirectory.getAbsolutePath());
                executorService = Executors.newFixedThreadPool(numOfExecutorThreads, ThreadUtils.createThreadFactory((String)"KafkaClusterTestKit%d", (boolean)false));
                for (ControllerNode controllerNode2 : this.nodes.controllerNodes().values()) {
                    props = new HashMap<String, String>(this.configProps);
                    props.put(KafkaConfig$.MODULE$.ProcessRolesProp(), "controller");
                    props.put(KafkaConfig$.MODULE$.NodeIdProp(), Integer.toString(controllerNode2.id()));
                    props.put(KafkaConfig$.MODULE$.MetadataLogDirProp(), controllerNode2.metadataDirectory());
                    props.put(KafkaConfig$.MODULE$.ListenerSecurityProtocolMapProp(), "CONTROLLER:PLAINTEXT");
                    props.put(KafkaConfig$.MODULE$.ListenersProp(), "CONTROLLER://localhost:0");
                    props.put(KafkaConfig$.MODULE$.ControllerListenerNamesProp(), "CONTROLLER");
                    props.put("controller.quorum.voters", uninitializedQuorumVotersString);
                    Builder.setupNodeDirectories(baseDirectory, controllerNode2.metadataDirectory(), Collections.emptyList());
                    config = new KafkaConfig(props, false, Option.empty());
                    threadNamePrefix = String.format("controller%d_", controllerNode2.id());
                    metaProperties = MetaProperties.apply((String)this.nodes.clusterId().toString(), (int)controllerNode2.id());
                    metadataPartition = new TopicPartition(KafkaRaftServer.MetadataTopic(), 0);
                    raftManager = new KafkaRaftManager(metaProperties, config, (RecordSerde)new MetadataRecordSerde(), metadataPartition, KafkaRaftServer.MetadataTopicId(), Time.SYSTEM, new Metrics(), Option.apply((Object)threadNamePrefix), connectFutureManager.future);
                    ControllerServer controller = new ControllerServer(this.nodes.controllerProperties(controllerNode2.id()), config, (RaftManager)raftManager, Time.SYSTEM, new Metrics(), Option.apply((Object)threadNamePrefix), connectFutureManager.future);
                    controllers.put(controllerNode2.id(), controller);
                    controller.socketServerFirstBoundPortFuture().whenComplete((port, e) -> {
                        if (e != null) {
                            connectFutureManager.fail((Throwable)e);
                        } else {
                            connectFutureManager.registerPort(controllerNode2.id(), (int)port);
                        }
                    });
                    raftManagers.put(controllerNode2.id(), raftManager);
                }
                for (BrokerNode brokerNode : this.nodes.brokerNodes().values()) {
                    props = new HashMap<String, String>(this.configProps);
                    props.put(KafkaConfig$.MODULE$.ProcessRolesProp(), "broker");
                    props.put(KafkaConfig$.MODULE$.BrokerIdProp(), Integer.toString(brokerNode.id()));
                    props.put(KafkaConfig$.MODULE$.MetadataLogDirProp(), brokerNode.metadataDirectory());
                    props.put(KafkaConfig$.MODULE$.LogDirsProp(), String.join((CharSequence)",", brokerNode.logDataDirectories()));
                    props.put(KafkaConfig$.MODULE$.ListenerSecurityProtocolMapProp(), "EXTERNAL:PLAINTEXT,CONTROLLER:PLAINTEXT");
                    props.put(KafkaConfig$.MODULE$.ListenersProp(), "EXTERNAL://localhost:0");
                    props.put(KafkaConfig$.MODULE$.InterBrokerListenerNameProp(), this.nodes.interBrokerListenerName().value());
                    props.put(KafkaConfig$.MODULE$.ControllerListenerNamesProp(), "CONTROLLER");
                    Builder.setupNodeDirectories(baseDirectory, brokerNode.metadataDirectory(), brokerNode.logDataDirectories());
                    props.put("controller.quorum.voters", uninitializedQuorumVotersString);
                    props.putAll(brokerNode.propertyOverrides());
                    config = new KafkaConfig(props, false, Option.empty());
                    threadNamePrefix = String.format("broker%d_", brokerNode.id());
                    metaProperties = MetaProperties.apply((String)this.nodes.clusterId().toString(), (int)brokerNode.id());
                    metadataPartition = new TopicPartition(KafkaRaftServer.MetadataTopic(), 0);
                    raftManager = new KafkaRaftManager(metaProperties, config, (RecordSerde)new MetadataRecordSerde(), metadataPartition, KafkaRaftServer.MetadataTopicId(), Time.SYSTEM, new Metrics(), Option.apply((Object)threadNamePrefix), connectFutureManager.future);
                    BrokerServer broker = new BrokerServer(config, this.nodes.brokerProperties(brokerNode.id()), (RaftManager)raftManager, Time.SYSTEM, new Metrics(), Option.apply((Object)threadNamePrefix), (Seq)JavaConverters.asScalaBuffer(Collections.emptyList()).toSeq(), connectFutureManager.future, Server.SUPPORTED_FEATURES());
                    brokers.put(brokerNode.id(), broker);
                    raftManagers.put(brokerNode.id(), raftManager);
                }
            }
            catch (Exception e2) {
                if (executorService != null) {
                    executorService.shutdownNow();
                    executorService.awaitTermination(5L, TimeUnit.MINUTES);
                }
                for (ControllerServer controller : controllers.values()) {
                    controller.shutdown();
                }
                for (BrokerServer brokerServer : brokers.values()) {
                    brokerServer.shutdown();
                }
                for (KafkaRaftManager raftManager : raftManagers.values()) {
                    raftManager.shutdown();
                }
                connectFutureManager.close();
                if (baseDirectory != null) {
                    Utils.delete((File)baseDirectory);
                }
                throw e2;
            }
            return new KafkaClusterTestKit(executorService, this.nodes, controllers, brokers, raftManagers, connectFutureManager, baseDirectory);
        }

        private static void setupNodeDirectories(File baseDirectory, String metadataDirectory, Collection<String> logDataDirectories) throws Exception {
            Files.createDirectories(new File(baseDirectory, "local").toPath(), new FileAttribute[0]);
            Files.createDirectories(Paths.get(metadataDirectory, new String[0]), new FileAttribute[0]);
            for (String logDataDirectory : logDataDirectories) {
                Files.createDirectories(Paths.get(logDataDirectory, new String[0]), new FileAttribute[0]);
            }
        }
    }

    private static class ControllerQuorumVotersFutureManager
    implements AutoCloseable {
        private final int expectedControllers;
        private final CompletableFuture<Map<Integer, RaftConfig.AddressSpec>> future = new CompletableFuture();
        private final Map<Integer, Integer> controllerPorts = new TreeMap<Integer, Integer>();

        ControllerQuorumVotersFutureManager(int expectedControllers) {
            this.expectedControllers = expectedControllers;
        }

        synchronized void registerPort(int nodeId, int port) {
            this.controllerPorts.put(nodeId, port);
            if (this.controllerPorts.size() >= this.expectedControllers) {
                this.future.complete(this.controllerPorts.entrySet().stream().collect(Collectors.toMap(Map.Entry::getKey, entry -> new RaftConfig.InetAddressSpec(new InetSocketAddress("localhost", (int)((Integer)entry.getValue()))))));
            }
        }

        void fail(Throwable e) {
            this.future.completeExceptionally(e);
        }

        @Override
        public void close() {
            this.future.cancel(true);
        }
    }
}

