package com.aliyun.dts.subscribe.clients.check.util;

import java.util.Arrays;
import java.util.Collections;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.kafka.clients.ApiVersions;
import org.apache.kafka.clients.ClientRequest;
import org.apache.kafka.clients.ClientResponse;
import org.apache.kafka.clients.ClientUtils;
import org.apache.kafka.clients.KafkaClient;
import org.apache.kafka.clients.Metadata;
import org.apache.kafka.clients.NetworkClient;
import org.apache.kafka.common.Cluster;
import org.apache.kafka.common.Node;
import org.apache.kafka.common.config.AbstractConfig;
import org.apache.kafka.common.config.ConfigDef;
import org.apache.kafka.common.metrics.JmxReporter;
import org.apache.kafka.common.metrics.MetricConfig;
import org.apache.kafka.common.metrics.Metrics;
import org.apache.kafka.common.metrics.MetricsReporter;
import org.apache.kafka.common.network.ChannelBuilder;
import org.apache.kafka.common.network.Selector;
import org.apache.kafka.common.requests.AbstractRequest;
import org.apache.kafka.common.requests.AbstractResponse;
import org.apache.kafka.common.requests.MetadataRequest;
import org.apache.kafka.common.requests.MetadataResponse;
import org.apache.kafka.common.utils.LogContext;
import org.apache.kafka.common.utils.SystemTime;
import org.apache.kafka.common.utils.Time;

/* loaded from: input_file:com/aliyun/dts/subscribe/clients/check/util/NodeCommandClient.class */
public class NodeCommandClient {
    public static final ConfigDef DEFAULT_CLIENT_CONFIG = NodeCommandClientConfig.configDef().withClientSaslSupport().withClientSslSupport();

    /* loaded from: input_file:com/aliyun/dts/subscribe/clients/check/util/NodeCommandClient$CommandClient.class */
    public static final class CommandClient {
        private final KafkaClient kafkaClient;
        private final CommandClientConfig commandClientConfig;
        private final List<MetricsReporter> reporters;
        private static final int MAX_INFLIGHT_REQUESTS = 100;
        private static final AtomicInteger incrementCounter = new AtomicInteger(MAX_INFLIGHT_REQUESTS);

        public CommandClient(CommandClientConfig commandClientConfig) {
            this.commandClientConfig = commandClientConfig;
            SystemTime systemTime = new SystemTime();
            LinkedHashMap linkedHashMap = new LinkedHashMap();
            String string = this.commandClientConfig.getString(NodeCommandClientConfig.CLIENT_ID_CONFIG);
            linkedHashMap.put("client-id", string);
            Metadata metadata = new Metadata(this.commandClientConfig.getLong(NodeCommandClientConfig.RETRY_BACKOFF_MS_CONFIG).longValue(), this.commandClientConfig.getLong(NodeCommandClientConfig.METADATA_MAX_AGE_CONFIG).longValue(), false);
            metadata.update(Cluster.bootstrap(ClientUtils.parseAndValidateAddresses(this.commandClientConfig.getList(NodeCommandClientConfig.BOOTSTRAP_SERVERS_CONFIG))), Collections.EMPTY_SET, systemTime.milliseconds());
            MetricConfig tags = new MetricConfig().samples(this.commandClientConfig.getInt(NodeCommandClientConfig.METRICS_NUM_SAMPLES_CONFIG).intValue()).timeWindow(this.commandClientConfig.getLong(NodeCommandClientConfig.METRICS_SAMPLE_WINDOW_MS_CONFIG).longValue(), TimeUnit.MILLISECONDS).tags(linkedHashMap);
            this.reporters = this.commandClientConfig.getConfiguredInstances(NodeCommandClientConfig.METRIC_REPORTER_CLASSES_CONFIG, MetricsReporter.class);
            this.reporters.add(new JmxReporter("kafka.admin.client"));
            Metrics metrics = new Metrics(tags, this.reporters, systemTime);
            ChannelBuilder createChannelBuilder = ClientUtils.createChannelBuilder(this.commandClientConfig);
            LogContext createLogContext = NodeCommandClient.createLogContext(string);
            this.kafkaClient = new NetworkClient(new Selector(this.commandClientConfig.getLong(NodeCommandClientConfig.CONNECTIONS_MAX_IDLE_MS_CONFIG).longValue(), metrics, systemTime, "kafka-client", createChannelBuilder, createLogContext), metadata, string, MAX_INFLIGHT_REQUESTS, this.commandClientConfig.getLong(NodeCommandClientConfig.RECONNECT_BACKOFF_MS_CONFIG).longValue(), this.commandClientConfig.getLong(NodeCommandClientConfig.RECONNECT_BACKOFF_MAX_MS_CONFIG).longValue(), this.commandClientConfig.getInt(NodeCommandClientConfig.SEND_BUFFER_CONFIG).intValue(), this.commandClientConfig.getInt(NodeCommandClientConfig.RECEIVE_BUFFER_CONFIG).intValue(), this.commandClientConfig.getInt(NodeCommandClientConfig.REQUEST_TIMEOUT_MS_CONFIG).intValue(), systemTime, true, new ApiVersions(), createLogContext);
        }

        public void close() {
            if (null != this.kafkaClient) {
                try {
                    this.kafkaClient.close();
                } catch (Exception e) {
                }
            }
        }

        private Node ensureOneNodeIsReady(List<Node> list) {
            Node node = null;
            long milliseconds = Time.SYSTEM.milliseconds() + this.commandClientConfig.getInt(NodeCommandClientConfig.REQUEST_TIMEOUT_MS_CONFIG).intValue();
            boolean z = false;
            while (!z && Time.SYSTEM.milliseconds() < milliseconds) {
                for (Node node2 : list) {
                    if (this.kafkaClient.ready(node2, Time.SYSTEM.milliseconds())) {
                        node = node2;
                        z = true;
                        break;
                    }
                }
                try {
                    this.kafkaClient.poll(this.commandClientConfig.getLong(NodeCommandClientConfig.POLL_MS_CONFIG).longValue(), Time.SYSTEM.milliseconds());
                } catch (Exception e) {
                    throw new CommandClientProviderException("Could not poll.", e);
                }
            }
            if (node == null) {
                throw new CommandClientProviderException("Could not find any available broker. Check your NodeCommandClientConfig setting 'bootstrap.servers'. This error might also occur, if you try to connect to pre-0.10 brokers. Kafka Streams requires broker version 0.10.1.x or higher.");
            }
            return node;
        }

        private ClientResponse sendRequestSync(ClientRequest clientRequest) {
            try {
                this.kafkaClient.send(clientRequest, Time.SYSTEM.milliseconds());
                long milliseconds = Time.SYSTEM.milliseconds() + this.commandClientConfig.getInt(NodeCommandClientConfig.REQUEST_TIMEOUT_MS_CONFIG).intValue();
                while (Time.SYSTEM.milliseconds() < milliseconds) {
                    try {
                        List poll = this.kafkaClient.poll(this.commandClientConfig.getLong(NodeCommandClientConfig.POLL_MS_CONFIG).longValue(), Time.SYSTEM.milliseconds());
                        if (!poll.isEmpty()) {
                            if (poll.size() > 1) {
                                throw new CommandClientProviderException("Sent one request but received multiple or no responses.");
                            }
                            ClientResponse clientResponse = (ClientResponse) poll.get(0);
                            if (clientResponse.requestHeader().correlationId() == clientRequest.correlationId()) {
                                return clientResponse;
                            }
                            throw new CommandClientProviderException("Inconsistent response received from the broker " + clientRequest.destination() + ", expected correlation id " + clientRequest.correlationId() + ", but received " + clientResponse.requestHeader().correlationId());
                        }
                    } catch (IllegalStateException e) {
                        throw new CommandClientProviderException("Could not poll.", e);
                    }
                }
                throw new CommandClientProviderException("Failed to get response from broker within timeout");
            } catch (Exception e2) {
                throw new CommandClientProviderException("Could not send request.", e2);
            }
        }

        private <T extends AbstractResponse> T doRequest(Node node, AbstractRequest.Builder builder, Class cls, String str) {
            ensureOneNodeIsReady(Arrays.asList(node));
            if (null == node || node.isEmpty()) {
                throw new CommandClientProviderException("destination is required");
            }
            ClientResponse sendRequestSync = sendRequestSync(this.kafkaClient.newClientRequest(node.idString(), builder, Time.SYSTEM.milliseconds(), true));
            if (!sendRequestSync.hasResponse()) {
                throw new CommandClientProviderException("Empty response for client request.");
            }
            if (cls.isAssignableFrom(sendRequestSync.responseBody().getClass())) {
                return (T) sendRequestSync.responseBody();
            }
            throw new CommandClientProviderException("Inconsistent response type for internal topic  " + str + "Request. Expected " + str + "Response but received " + sendRequestSync.responseBody().getClass().getName());
        }

        public MetadataResponse fetchMetadata() {
            return doRequest(getAnyReadyBrokerId(), MetadataRequest.Builder.allTopics(), MetadataResponse.class, "MetaData");
        }

        private Node getAnyReadyBrokerId() {
            Metadata metadata = new Metadata(this.commandClientConfig.getLong(NodeCommandClientConfig.RETRY_BACKOFF_MS_CONFIG).longValue(), this.commandClientConfig.getLong(NodeCommandClientConfig.METADATA_MAX_AGE_CONFIG).longValue(), false);
            metadata.update(Cluster.bootstrap(ClientUtils.parseAndValidateAddresses(this.commandClientConfig.getList(NodeCommandClientConfig.BOOTSTRAP_SERVERS_CONFIG))), Collections.EMPTY_SET, Time.SYSTEM.milliseconds());
            return ensureOneNodeIsReady(metadata.fetch().nodes());
        }
    }

    /* loaded from: input_file:com/aliyun/dts/subscribe/clients/check/util/NodeCommandClient$CommandClientConfig.class */
    public static class CommandClientConfig extends AbstractConfig {
        public static CommandClientConfig fromNodeCommandClientConfig(NodeCommandClientConfig nodeCommandClientConfig) {
            return new CommandClientConfig(nodeCommandClientConfig.originals());
        }

        public CommandClientConfig(Map<?, ?> map) {
            super(NodeCommandClient.DEFAULT_CLIENT_CONFIG, map);
        }
    }

    /* loaded from: input_file:com/aliyun/dts/subscribe/clients/check/util/NodeCommandClient$CommandClientProviderException.class */
    public static final class CommandClientProviderException extends RuntimeException {
        public CommandClientProviderException(String str, Exception exc) {
            super(str, exc);
        }

        public CommandClientProviderException(String str) {
            super(str);
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    public static LogContext createLogContext(String str) {
        return new LogContext("[NodeCommandClient clientId=" + str + "] ");
    }
}
