/*
 * Decompiled with CFR 0.152.
 */
package kafka.controller;

import java.io.Serializable;
import java.util.List;
import java.util.concurrent.LinkedBlockingQueue;
import kafka.cluster.Broker;
import kafka.cluster.BrokerEndPoint;
import kafka.controller.ControllerBrokerStateInfo;
import kafka.controller.ControllerChannelManager$;
import kafka.controller.ControllerContext;
import kafka.controller.QueueItem;
import kafka.controller.RequestSendThread;
import kafka.server.KafkaConfig;
import kafka.utils.Log4jController$;
import kafka.utils.Logging;
import org.apache.kafka.clients.ManualMetadataUpdater;
import org.apache.kafka.clients.MetadataUpdater;
import org.apache.kafka.clients.NetworkClient;
import org.apache.kafka.common.Node;
import org.apache.kafka.common.metrics.Metrics;
import org.apache.kafka.common.network.ChannelBuilder;
import org.apache.kafka.common.network.ChannelBuilders;
import org.apache.kafka.common.network.LoginType;
import org.apache.kafka.common.network.Selectable;
import org.apache.kafka.common.network.Selector;
import org.apache.kafka.common.protocol.ApiKeys;
import org.apache.kafka.common.protocol.SecurityProtocol;
import org.apache.kafka.common.requests.AbstractRequest;
import org.apache.kafka.common.requests.AbstractResponse;
import org.apache.kafka.common.utils.Time;
import org.apache.log4j.Logger;
import scala.Function0;
import scala.Function1;
import scala.MatchError;
import scala.None$;
import scala.Option;
import scala.Predef;
import scala.Predef$;
import scala.Some;
import scala.Tuple2;
import scala.collection.JavaConverters$;
import scala.collection.Map;
import scala.collection.Seq;
import scala.collection.Seq$;
import scala.collection.immutable.StringOps;
import scala.collection.mutable.HashMap;
import scala.reflect.ScalaSignature;
import scala.runtime.BoxedUnit;
import scala.runtime.BoxesRunTime;

/*
 * Duplicate member names - consider using --renamedupmembers true
 */
@ScalaSignature(bytes="\u0006\u0001\u0005Mf\u0001B\u0001\u0003\u0001\u001d\u0011\u0001dQ8oiJ|G\u000e\\3s\u0007\"\fgN\\3m\u001b\u0006t\u0017mZ3s\u0015\t\u0019A!\u0001\u0006d_:$(o\u001c7mKJT\u0011!B\u0001\u0006W\u000647.Y\u0002\u0001'\r\u0001\u0001B\u0004\t\u0003\u00131i\u0011A\u0003\u0006\u0002\u0017\u0005)1oY1mC&\u0011QB\u0003\u0002\u0007\u0003:L(+\u001a4\u0011\u0005=\u0011R\"\u0001\t\u000b\u0005E!\u0011!B;uS2\u001c\u0018BA\n\u0011\u0005\u001daunZ4j]\u001eD\u0001\"\u0006\u0001\u0003\u0002\u0003\u0006IAF\u0001\u0012G>tGO]8mY\u0016\u00148i\u001c8uKb$\bCA\f\u0019\u001b\u0005\u0011\u0011BA\r\u0003\u0005E\u0019uN\u001c;s_2dWM]\"p]R,\u0007\u0010\u001e\u0005\t7\u0001\u0011\t\u0011)A\u00059\u000511m\u001c8gS\u001e\u0004\"!\b\u0011\u000e\u0003yQ!a\b\u0003\u0002\rM,'O^3s\u0013\t\tcDA\u0006LC\u001a\\\u0017mQ8oM&<\u0007\u0002C\u0012\u0001\u0005\u0003\u0005\u000b\u0011\u0002\u0013\u0002\tQLW.\u001a\t\u0003K9j\u0011A\n\u0006\u0003#\u001dR!\u0001K\u0015\u0002\r\r|W.\\8o\u0015\t)!F\u0003\u0002,Y\u00051\u0011\r]1dQ\u0016T\u0011!L\u0001\u0004_J<\u0017BA\u0018'\u0005\u0011!\u0016.\\3\t\u0011E\u0002!\u0011!Q\u0001\nI\nq!\\3ue&\u001c7\u000f\u0005\u00024k5\tAG\u0003\u00022O%\u0011a\u0007\u000e\u0002\b\u001b\u0016$(/[2t\u0011!A\u0004A!A!\u0002\u0013I\u0014\u0001\u0005;ie\u0016\fGMT1nKB\u0013XMZ5y!\rI!\bP\u0005\u0003w)\u0011aa\u00149uS>t\u0007CA\u001fE\u001d\tq$\t\u0005\u0002@\u00155\t\u0001I\u0003\u0002B\r\u00051AH]8pizJ!a\u0011\u0006\u0002\rA\u0013X\rZ3g\u0013\t)eI\u0001\u0004TiJLgn\u001a\u0006\u0003\u0007*AQ\u0001\u0013\u0001\u0005\u0002%\u000ba\u0001P5oSRtDC\u0002&L\u00196su\n\u0005\u0002\u0018\u0001!)Qc\u0012a\u0001-!)1d\u0012a\u00019!)1e\u0012a\u0001I!)\u0011g\u0012a\u0001e!9\u0001h\u0012I\u0001\u0002\u0004I\u0004bB)\u0001\u0005\u0004%\tBU\u0001\u0010EJ|7.\u001a:Ti\u0006$X-\u00138g_V\t1\u000b\u0005\u0003U3nsV\"A+\u000b\u0005Y;\u0016aB7vi\u0006\u0014G.\u001a\u0006\u00031*\t!bY8mY\u0016\u001cG/[8o\u0013\tQVKA\u0004ICNDW*\u00199\u0011\u0005%a\u0016BA/\u000b\u0005\rIe\u000e\u001e\t\u0003/}K!\u0001\u0019\u0002\u00033\r{g\u000e\u001e:pY2,'O\u0011:pW\u0016\u00148\u000b^1uK&sgm\u001c\u0005\u0007E\u0002\u0001\u000b\u0011B*\u0002!\t\u0014xn[3s'R\fG/Z%oM>\u0004\u0003b\u00023\u0001\u0005\u0004%I!Z\u0001\u000bEJ|7.\u001a:M_\u000e\\W#\u00014\u0011\u0005\u001ddW\"\u00015\u000b\u0005%T\u0017\u0001\u00027b]\u001eT\u0011a[\u0001\u0005U\u00064\u0018-\u0003\u0002nQ\n1qJ\u00196fGRDaa\u001c\u0001!\u0002\u00131\u0017a\u00032s_.,'\u000fT8dW\u0002BQ!\u001d\u0001\u0005\u0002I\fqa\u001d;beR,\b\u000fF\u0001t!\tIA/\u0003\u0002v\u0015\t!QK\\5u\u0011\u00159\b\u0001\"\u0001s\u0003!\u0019\b.\u001e;e_^t\u0007\"B=\u0001\t\u0003Q\u0018aC:f]\u0012\u0014V-];fgR$ra]>~\u0003\u0017\tY\u0004C\u0003}q\u0002\u00071,\u0001\u0005ce>\\WM]%e\u0011\u0015q\b\u00101\u0001\u0000\u0003\u0019\t\u0007/[&fsB!\u0011\u0011AA\u0004\u001b\t\t\u0019AC\u0002\u0002\u0006\u001d\n\u0001\u0002\u001d:pi>\u001cw\u000e\\\u0005\u0005\u0003\u0013\t\u0019AA\u0004Ba&\\U-_:\t\u000f\u00055\u0001\u00101\u0001\u0002\u0010\u00059!/Z9vKN$\b\u0007BA\t\u0003S\u0001b!a\u0005\u0002 \u0005\u0015b\u0002BA\u000b\u00037i!!a\u0006\u000b\u0007\u0005eq%\u0001\u0005sKF,Xm\u001d;t\u0013\u0011\ti\"a\u0006\u0002\u001f\u0005\u00137\u000f\u001e:bGR\u0014V-];fgRLA!!\t\u0002$\t9!)^5mI\u0016\u0014(\u0002BA\u000f\u0003/\u0001B!a\n\u0002*1\u0001A\u0001DA\u0016\u0003\u0017\t\t\u0011!A\u0003\u0002\u00055\"aA0%cE!\u0011qFA\u001b!\rI\u0011\u0011G\u0005\u0004\u0003gQ!a\u0002(pi\"Lgn\u001a\t\u0005\u0003+\t9$\u0003\u0003\u0002:\u0005]!aD!cgR\u0014\u0018m\u0019;SKF,Xm\u001d;\t\u0013\u0005u\u0002\u0010%AA\u0002\u0005}\u0012\u0001C2bY2\u0014\u0017mY6\u0011\r%\t\t%!\u0012t\u0013\r\t\u0019E\u0003\u0002\n\rVt7\r^5p]F\u0002B!!\u0006\u0002H%!\u0011\u0011JA\f\u0005A\t%m\u001d;sC\u000e$(+Z:q_:\u001cX\rC\u0004\u0002N\u0001!\t!a\u0014\u0002\u0013\u0005$GM\u0011:pW\u0016\u0014HcA:\u0002R!A\u00111KA&\u0001\u0004\t)&\u0001\u0004ce>\\WM\u001d\t\u0005\u0003/\ni&\u0004\u0002\u0002Z)\u0019\u00111\f\u0003\u0002\u000f\rdWo\u001d;fe&!\u0011qLA-\u0005\u0019\u0011%o\\6fe\"9\u00111\r\u0001\u0005\u0002\u0005\u0015\u0014\u0001\u0004:f[>4XM\u0011:pW\u0016\u0014HcA:\u0002h!1A0!\u0019A\u0002mCq!a\u001b\u0001\t\u0013\ti'\u0001\u0007bI\u0012tUm\u001e\"s_.,'\u000fF\u0002t\u0003_B\u0001\"a\u0015\u0002j\u0001\u0007\u0011Q\u000b\u0005\b\u0003g\u0002A\u0011BA;\u0003Q\u0011X-\\8wK\u0016C\u0018n\u001d;j]\u001e\u0014%o\\6feR\u00191/a\u001e\t\u000f\u0005e\u0014\u0011\u000fa\u0001=\u0006Y!M]8lKJ\u001cF/\u0019;f\u0011\u001d\ti\b\u0001C\t\u0003\u007f\nac\u001d;beR\u0014V-];fgR\u001cVM\u001c3UQJ,\u0017\r\u001a\u000b\u0004g\u0006\u0005\u0005B\u0002?\u0002|\u0001\u00071\fC\u0005\u0002\u0006\u0002\t\n\u0011\"\u0001\u0002\b\u0006)2/\u001a8e%\u0016\fX/Z:uI\u0011,g-Y;mi\u0012\"TCAAEU\u0011\ty$a#,\u0005\u00055\u0005\u0003BAH\u00033k!!!%\u000b\t\u0005M\u0015QS\u0001\nk:\u001c\u0007.Z2lK\u0012T1!a&\u000b\u0003)\tgN\\8uCRLwN\\\u0005\u0005\u00037\u000b\tJA\tv]\u000eDWmY6fIZ\u000b'/[1oG\u0016<\u0011\"a(\u0003\u0003\u0003E\t!!)\u00021\r{g\u000e\u001e:pY2,'o\u00115b]:,G.T1oC\u001e,'\u000fE\u0002\u0018\u0003G3\u0001\"\u0001\u0002\u0002\u0002#\u0005\u0011QU\n\u0004\u0003GC\u0001b\u0002%\u0002$\u0012\u0005\u0011\u0011\u0016\u000b\u0003\u0003CC!\"!,\u0002$F\u0005I\u0011AAX\u0003m!C.Z:tS:LG\u000fJ4sK\u0006$XM\u001d\u0013eK\u001a\fW\u000f\u001c;%kU\u0011\u0011\u0011\u0017\u0016\u0004s\u0005-\u0005")
public class ControllerChannelManager
implements Logging {
    private final ControllerContext controllerContext;
    private final KafkaConfig config;
    private final Time time;
    private final Metrics metrics;
    private final Option<String> threadNamePrefix;
    private final HashMap<Object, ControllerBrokerStateInfo> brokerStateInfo;
    private final Object brokerLock;
    private final String loggerName;
    private Logger logger;
    private String logIdent;
    private final Log4jController$ kafka$utils$Logging$$log4jController;
    private volatile boolean bitmap$0;

    public static Option<String> $lessinit$greater$default$5() {
        return ControllerChannelManager$.MODULE$.$lessinit$greater$default$5();
    }

    @Override
    public void trace(Function0<String> msg) {
        Logging.trace$(this, msg);
    }

    @Override
    public Object trace(Function0<Throwable> e) {
        return Logging.trace$(this, e);
    }

    @Override
    public void trace(Function0<String> msg, Function0<Throwable> e) {
        Logging.trace$(this, msg, e);
    }

    @Override
    public void swallowTrace(Function0<BoxedUnit> action) {
        Logging.swallowTrace$(this, action);
    }

    @Override
    public boolean isDebugEnabled() {
        return Logging.isDebugEnabled$(this);
    }

    @Override
    public void debug(Function0<String> msg) {
        Logging.debug$(this, msg);
    }

    @Override
    public Object debug(Function0<Throwable> e) {
        return Logging.debug$(this, e);
    }

    @Override
    public void debug(Function0<String> msg, Function0<Throwable> e) {
        Logging.debug$(this, msg, e);
    }

    @Override
    public void swallowDebug(Function0<BoxedUnit> action) {
        Logging.swallowDebug$(this, action);
    }

    @Override
    public void info(Function0<String> msg) {
        Logging.info$(this, msg);
    }

    @Override
    public Object info(Function0<Throwable> e) {
        return Logging.info$(this, e);
    }

    @Override
    public void info(Function0<String> msg, Function0<Throwable> e) {
        Logging.info$(this, msg, e);
    }

    @Override
    public void swallowInfo(Function0<BoxedUnit> action) {
        Logging.swallowInfo$(this, action);
    }

    @Override
    public void warn(Function0<String> msg) {
        Logging.warn$(this, msg);
    }

    @Override
    public Object warn(Function0<Throwable> e) {
        return Logging.warn$(this, e);
    }

    @Override
    public void warn(Function0<String> msg, Function0<Throwable> e) {
        Logging.warn$(this, msg, e);
    }

    @Override
    public void swallowWarn(Function0<BoxedUnit> action) {
        Logging.swallowWarn$(this, action);
    }

    @Override
    public void swallow(Function0<BoxedUnit> action) {
        Logging.swallow$(this, action);
    }

    @Override
    public void error(Function0<String> msg) {
        Logging.error$(this, msg);
    }

    @Override
    public Object error(Function0<Throwable> e) {
        return Logging.error$(this, e);
    }

    @Override
    public void error(Function0<String> msg, Function0<Throwable> e) {
        Logging.error$(this, msg, e);
    }

    @Override
    public void swallowError(Function0<BoxedUnit> action) {
        Logging.swallowError$(this, action);
    }

    @Override
    public void fatal(Function0<String> msg) {
        Logging.fatal$(this, msg);
    }

    @Override
    public Object fatal(Function0<Throwable> e) {
        return Logging.fatal$(this, e);
    }

    @Override
    public void fatal(Function0<String> msg, Function0<Throwable> e) {
        Logging.fatal$(this, msg, e);
    }

    @Override
    public String loggerName() {
        return this.loggerName;
    }

    private Logger logger$lzycompute() {
        ControllerChannelManager controllerChannelManager = this;
        synchronized (controllerChannelManager) {
            if (!this.bitmap$0) {
                this.logger = Logging.logger$(this);
                this.bitmap$0 = true;
            }
        }
        return this.logger;
    }

    @Override
    public Logger logger() {
        return !this.bitmap$0 ? this.logger$lzycompute() : this.logger;
    }

    @Override
    public String logIdent() {
        return this.logIdent;
    }

    @Override
    public void logIdent_$eq(String x$1) {
        this.logIdent = x$1;
    }

    @Override
    public Log4jController$ kafka$utils$Logging$$log4jController() {
        return this.kafka$utils$Logging$$log4jController;
    }

    @Override
    public void kafka$utils$Logging$_setter_$loggerName_$eq(String x$1) {
        this.loggerName = x$1;
    }

    @Override
    public final void kafka$utils$Logging$_setter_$kafka$utils$Logging$$log4jController_$eq(Log4jController$ x$1) {
        this.kafka$utils$Logging$$log4jController = x$1;
    }

    public HashMap<Object, ControllerBrokerStateInfo> brokerStateInfo() {
        return this.brokerStateInfo;
    }

    private Object brokerLock() {
        return this.brokerLock;
    }

    public void startup() {
        Object object = this.brokerLock();
        synchronized (object) {
            this.brokerStateInfo().foreach((Function1 & Serializable & scala.Serializable)brokerState -> {
                this.startRequestSendThread(brokerState._1$mcI$sp());
                return BoxedUnit.UNIT;
            });
        }
    }

    public void shutdown() {
        Object object = this.brokerLock();
        synchronized (object) {
            this.brokerStateInfo().values().foreach((Function1 & Serializable & scala.Serializable)brokerState -> {
                this.removeExistingBroker(brokerState);
                return BoxedUnit.UNIT;
            });
        }
    }

    public void sendRequest(int brokerId, ApiKeys apiKey, AbstractRequest.Builder<? extends AbstractRequest> request, Function1<AbstractResponse, BoxedUnit> callback) {
        Object object = this.brokerLock();
        synchronized (object) {
            Option stateInfoOpt;
            Option option = stateInfoOpt = this.brokerStateInfo().get((Object)BoxesRunTime.boxToInteger((int)brokerId));
            if (option instanceof Some) {
                Some some = (Some)option;
                ControllerBrokerStateInfo stateInfo = (ControllerBrokerStateInfo)some.value();
                stateInfo.messageQueue().put(new QueueItem(apiKey, request, callback));
                BoxedUnit boxedUnit = BoxedUnit.UNIT;
            } else if (None$.MODULE$.equals(option)) {
                this.warn((Function0<String>)(Function0 & Serializable & scala.Serializable)() -> new StringOps(Predef$.MODULE$.augmentString("Not sending request %s to broker %d, since it is offline.")).format((Seq)Predef$.MODULE$.genericWrapArray((Object)new Object[]{request, BoxesRunTime.boxToInteger((int)brokerId)})));
                BoxedUnit boxedUnit = BoxedUnit.UNIT;
            } else {
                throw new MatchError((Object)option);
            }
        }
    }

    public Function1<AbstractResponse, BoxedUnit> sendRequest$default$4() {
        return null;
    }

    public void addBroker(Broker broker) {
        Object object = this.brokerLock();
        synchronized (object) {
            if (!this.brokerStateInfo().contains((Object)BoxesRunTime.boxToInteger((int)broker.id()))) {
                this.addNewBroker(broker);
                this.startRequestSendThread(broker.id());
            }
        }
    }

    public void removeBroker(int brokerId) {
        Object object = this.brokerLock();
        synchronized (object) {
            this.removeExistingBroker((ControllerBrokerStateInfo)this.brokerStateInfo().apply((Object)BoxesRunTime.boxToInteger((int)brokerId)));
        }
    }

    private void addNewBroker(Broker broker) {
        String string;
        LinkedBlockingQueue<QueueItem> messageQueue = new LinkedBlockingQueue<QueueItem>();
        this.debug((Function0<String>)(Function0 & Serializable & scala.Serializable)() -> new StringOps(Predef$.MODULE$.augmentString("Controller %d trying to connect to broker %d")).format((Seq)Predef$.MODULE$.genericWrapArray((Object)new Object[]{BoxesRunTime.boxToInteger((int)$this.config.brokerId()), BoxesRunTime.boxToInteger((int)broker.id())})));
        BrokerEndPoint brokerEndPoint = broker.getBrokerEndPoint(this.config.interBrokerListenerName());
        Node brokerNode = new Node(broker.id(), brokerEndPoint.host(), brokerEndPoint.port());
        ChannelBuilder channelBuilder = ChannelBuilders.clientChannelBuilder((SecurityProtocol)this.config.interBrokerSecurityProtocol(), (LoginType)LoginType.SERVER, (java.util.Map)this.config.values(), (String)this.config.saslMechanismInterBrokerProtocol(), (boolean)this.config.saslInterBrokerHandshakeRequestEnable());
        Selector selector = new Selector(-1, -1L, this.metrics, this.time, "controller-channel", (java.util.Map)JavaConverters$.MODULE$.mapAsJavaMapConverter((Map)Predef$.MODULE$.Map().apply((Seq)Predef$.MODULE$.wrapRefArray((Object[])new Tuple2[]{Predef.ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc((Object)"broker-id"), (Object)((Object)BoxesRunTime.boxToInteger((int)broker.id())).toString())}))).asJava(), false, channelBuilder);
        NetworkClient networkClient = new NetworkClient((Selectable)selector, (MetadataUpdater)new ManualMetadataUpdater((List)JavaConverters$.MODULE$.seqAsJavaListConverter((Seq)Seq$.MODULE$.apply((Seq)Predef$.MODULE$.wrapRefArray((Object[])new Node[]{brokerNode}))).asJava()), ((Object)BoxesRunTime.boxToInteger((int)this.config.brokerId())).toString(), 1, 0L, -1, -1, Predef$.MODULE$.Integer2int(this.config.requestTimeoutMs()), this.time, false);
        Option<String> option = this.threadNamePrefix;
        if (None$.MODULE$.equals(option)) {
            string = new StringOps(Predef$.MODULE$.augmentString("Controller-%d-to-broker-%d-send-thread")).format((Seq)Predef$.MODULE$.genericWrapArray((Object)new Object[]{BoxesRunTime.boxToInteger((int)this.config.brokerId()), BoxesRunTime.boxToInteger((int)broker.id())}));
        } else if (option instanceof Some) {
            Some some = (Some)option;
            String name = (String)some.value();
            string = new StringOps(Predef$.MODULE$.augmentString("%s:Controller-%d-to-broker-%d-send-thread")).format((Seq)Predef$.MODULE$.genericWrapArray((Object)new Object[]{name, BoxesRunTime.boxToInteger((int)this.config.brokerId()), BoxesRunTime.boxToInteger((int)broker.id())}));
        } else {
            throw new MatchError(option);
        }
        String threadName = string;
        RequestSendThread requestThread = new RequestSendThread(this.config.brokerId(), this.controllerContext, messageQueue, networkClient, brokerNode, this.config, this.time, threadName);
        requestThread.setDaemon(false);
        this.brokerStateInfo().put((Object)BoxesRunTime.boxToInteger((int)broker.id()), (Object)new ControllerBrokerStateInfo(networkClient, brokerNode, messageQueue, requestThread));
    }

    private void removeExistingBroker(ControllerBrokerStateInfo brokerState) {
        try {
            brokerState.requestSendThread().shutdown();
            brokerState.networkClient().close();
            brokerState.messageQueue().clear();
            this.brokerStateInfo().remove((Object)BoxesRunTime.boxToInteger((int)brokerState.brokerNode().id()));
        }
        catch (Throwable e) {
            this.error((Function0<String>)(Function0 & Serializable & scala.Serializable)() -> "Error while removing broker by the controller", (Function0<Throwable>)(Function0 & Serializable & scala.Serializable)() -> e);
        }
    }

    public void startRequestSendThread(int brokerId) {
        block0: {
            RequestSendThread requestThread = ((ControllerBrokerStateInfo)this.brokerStateInfo().apply((Object)BoxesRunTime.boxToInteger((int)brokerId))).requestSendThread();
            Thread.State state = requestThread.getState();
            Thread.State state2 = Thread.State.NEW;
            if (state != null ? !((Object)((Object)state)).equals((Object)state2) : state2 != null) break block0;
            requestThread.start();
        }
    }

    public ControllerChannelManager(ControllerContext controllerContext, KafkaConfig config, Time time, Metrics metrics, Option<String> threadNamePrefix) {
        this.controllerContext = controllerContext;
        this.config = config;
        this.time = time;
        this.metrics = metrics;
        this.threadNamePrefix = threadNamePrefix;
        Logging.$init$(this);
        this.brokerStateInfo = new HashMap();
        this.brokerLock = new Object();
        this.logIdent_$eq("[Channel manager on controller " + config.brokerId() + "]: ");
        controllerContext.liveBrokers().foreach((Function1 & Serializable & scala.Serializable)broker -> {
            this.addNewBroker(broker);
            return BoxedUnit.UNIT;
        });
    }
}

