/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.runtime.akka;

import akka.actor.ActorNotFound;
import akka.actor.ActorRef;
import akka.actor.ActorSystem;
import akka.actor.Address;
import akka.actor.AddressFromURIString$;
import akka.actor.RobustActorSystem$;
import akka.pattern.AskableActorRef$;
import akka.pattern.package$;
import akka.util.Timeout;
import akka.util.Timeout$;
import com.typesafe.config.Config;
import com.typesafe.config.ConfigFactory;
import com.typesafe.config.ConfigMergeable;
import java.io.IOException;
import java.net.BindException;
import java.net.InetSocketAddress;
import java.net.MalformedURLException;
import java.net.UnknownHostException;
import java.util.concurrent.Callable;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeoutException;
import org.apache.flink.api.common.time.Time;
import org.apache.flink.configuration.AkkaOptions;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.IllegalConfigurationException;
import org.apache.flink.configuration.SecurityOptions;
import org.apache.flink.runtime.akka.EscalatingSupervisorStrategy;
import org.apache.flink.runtime.akka.RemoteAddressExtension;
import org.apache.flink.runtime.akka.RemoteAddressExtension$;
import org.apache.flink.runtime.clusterframework.BootstrapTools;
import org.apache.flink.runtime.concurrent.FutureUtils;
import org.apache.flink.runtime.net.SSLUtils;
import org.apache.flink.shaded.akka.org.jboss.netty.channel.ChannelException;
import org.apache.flink.shaded.akka.org.jboss.netty.logging.InternalLoggerFactory;
import org.apache.flink.shaded.akka.org.jboss.netty.logging.Slf4JLoggerFactory;
import org.apache.flink.util.NetUtils;
import org.apache.flink.util.TimeUtils;
import org.apache.flink.util.function.FunctionUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import scala.Function0;
import scala.Function1;
import scala.MatchError;
import scala.None$;
import scala.Option;
import scala.PartialFunction;
import scala.Predef$;
import scala.Serializable;
import scala.Some;
import scala.StringContext;
import scala.Tuple2;
import scala.collection.Seq;
import scala.collection.immutable.Nil$;
import scala.collection.immutable.StringOps;
import scala.collection.mutable.StringBuilder;
import scala.concurrent.Await$;
import scala.concurrent.ExecutionContext;
import scala.concurrent.Future;
import scala.concurrent.Future$;
import scala.concurrent.duration.Duration;
import scala.concurrent.duration.FiniteDuration;
import scala.runtime.BoxesRunTime;
import scala.util.Failure;
import scala.util.Try;
import scala.util.Try$;

public final class AkkaUtils$ {
    /** Singleton instance of this class; assigned inside the private constructor. */
    public static final AkkaUtils$ MODULE$;
    /** Logger of this class; obtained from {@code LoggerFactory} in the constructor. */
    private final Logger LOG;
    /** Name used for actor systems created without an explicit name; set to "flink" in the constructor. */
    private final String FLINK_ACTOR_SYSTEM_NAME;

    // Creating one instance at class-initialization time lets the constructor publish it in MODULE$.
    static {
        new AkkaUtils$();
    }

    /**
     * Accessor for the logger held by this singleton.
     *
     * @return the logger stored in the {@code LOG} field
     */
    public Logger LOG() {
        final Logger logger = this.LOG;
        return logger;
    }

    /**
     * Accessor for the actor system name constant.
     *
     * @return the value of the {@code FLINK_ACTOR_SYSTEM_NAME} field
     */
    public String FLINK_ACTOR_SYSTEM_NAME() {
        final String systemName = this.FLINK_ACTOR_SYSTEM_NAME;
        return systemName;
    }

    /**
     * Bean-style alias for {@link #FLINK_ACTOR_SYSTEM_NAME()}.
     *
     * @return the actor system name constant
     */
    public String getFlinkActorSystemName() {
        final String systemName = this.FLINK_ACTOR_SYSTEM_NAME();
        return systemName;
    }

    /**
     * Creates an actor system from an Akka config that is built without a listening address.
     *
     * @param configuration Flink configuration the Akka config is derived from
     * @return the newly created actor system
     */
    public ActorSystem createLocalActorSystem(Configuration configuration) {
        Option<Tuple2<String, Object>> noAddress = (Option<Tuple2<String, Object>>)None$.MODULE$;
        Config localConfig = this.getAkkaConfig(configuration, noAddress);
        return this.createActorSystem(localConfig);
    }

    /**
     * Creates an actor system for the given hostname and port.
     *
     * <p>The pair is wrapped into an {@code Option} and handed to the overload taking a listening address.
     *
     * @param configuration Flink configuration the Akka config is derived from
     * @param hostname hostname part of the listening address
     * @param port port part of the listening address
     * @return the newly created actor system
     */
    public ActorSystem createActorSystem(Configuration configuration, String hostname, int port) {
        Tuple2 hostAndPort = new Tuple2((Object)hostname, (Object)BoxesRunTime.boxToInteger((int)port));
        Option<Tuple2<String, Object>> listeningAddress = (Option<Tuple2<String, Object>>)new Some((Object)hostAndPort);
        return this.createActorSystem(configuration, listeningAddress);
    }

    /**
     * Creates an actor system from the Akka config derived for the given optional listening address.
     *
     * @param configuration Flink configuration the Akka config is derived from
     * @param listeningAddress optional (hostname, port) pair passed on to {@code getAkkaConfig}
     * @return the newly created actor system
     */
    public ActorSystem createActorSystem(Configuration configuration, Option<Tuple2<String, Object>> listeningAddress) {
        final Config derivedConfig = this.getAkkaConfig(configuration, listeningAddress);
        final ActorSystem actorSystem = this.createActorSystem(derivedConfig);
        return actorSystem;
    }

    /**
     * Creates an actor system with the default name returned by {@link #FLINK_ACTOR_SYSTEM_NAME()}.
     *
     * @param akkaConfig Akka configuration for the new actor system
     * @return the newly created actor system
     */
    public ActorSystem createActorSystem(Config akkaConfig) {
        final String defaultName = this.FLINK_ACTOR_SYSTEM_NAME();
        return this.createActorSystem(defaultName, akkaConfig);
    }

    /**
     * Creates an actor system with the given name and Akka configuration.
     *
     * @param actorSystemName name of the actor system
     * @param akkaConfig Akka configuration for the new actor system
     * @return the actor system returned by {@code RobustActorSystem.create}
     */
    public ActorSystem createActorSystem(String actorSystemName, Config akkaConfig) {
        // Install the SLF4J-backed factory as Netty's default logger factory before the system is created.
        InternalLoggerFactory nettyLoggerFactory = new Slf4JLoggerFactory();
        InternalLoggerFactory.setDefaultFactory(nettyLoggerFactory);

        ActorSystem actorSystem = RobustActorSystem$.MODULE$.create(actorSystemName, akkaConfig);
        return actorSystem;
    }

    /**
     * Creates an actor system from the config returned by {@link #getDefaultAkkaConfig()}.
     *
     * @return the newly created actor system
     */
    public ActorSystem createDefaultActorSystem() {
        final Config defaultConfig = this.getDefaultAkkaConfig();
        return this.createActorSystem(defaultConfig);
    }

    /**
     * Builds the Akka config for an external hostname/port using a caller-supplied executor config.
     *
     * <p>No bind address is passed on, so the four-argument {@code Option} overload receives {@code None} for it.
     *
     * @param configuration Flink configuration the Akka config is derived from
     * @param hostname external hostname
     * @param port external port
     * @param executorConfig executor configuration used as fallback of the basic config
     * @return the resulting Akka config
     */
    public Config getAkkaConfig(Configuration configuration, String hostname, int port, Config executorConfig) {
        Tuple2 hostAndPort = new Tuple2((Object)hostname, (Object)BoxesRunTime.boxToInteger((int)port));
        Option<Tuple2<String, Object>> externalAddress = (Option<Tuple2<String, Object>>)new Some((Object)hostAndPort);
        Option<Tuple2<String, Object>> noBindAddress = (Option<Tuple2<String, Object>>)None$.MODULE$;
        return this.getAkkaConfig(configuration, externalAddress, noBindAddress, executorConfig);
    }

    /**
     * Builds the Akka config for the given external hostname and port.
     *
     * @param configuration Flink configuration the Akka config is derived from
     * @param hostname external hostname
     * @param port external port
     * @return the resulting Akka config
     */
    public Config getAkkaConfig(Configuration configuration, String hostname, int port) {
        Tuple2 hostAndPort = new Tuple2((Object)hostname, (Object)BoxesRunTime.boxToInteger((int)port));
        Option<Tuple2<String, Object>> externalAddress = (Option<Tuple2<String, Object>>)new Some((Object)hostAndPort);
        return this.getAkkaConfig(configuration, externalAddress);
    }

    /**
     * Builds the Akka config without any external address.
     *
     * @param configuration Flink configuration the Akka config is derived from
     * @return the resulting Akka config
     */
    public Config getAkkaConfig(Configuration configuration) {
        Option<Tuple2<String, Object>> noAddress = (Option<Tuple2<String, Object>>)None$.MODULE$;
        return this.getAkkaConfig(configuration, noAddress);
    }

    /**
     * Builds the Akka config for an optional external address, using a fork-join executor config
     * derived from the Flink configuration and no explicit bind address.
     *
     * @param configuration Flink configuration the Akka config is derived from
     * @param externalAddress optional (hostname, port) pair of the external address
     * @return the resulting Akka config
     * @throws UnknownHostException declared by the delegate overload
     */
    public Config getAkkaConfig(Configuration configuration, Option<Tuple2<String, Object>> externalAddress) throws UnknownHostException {
        BootstrapTools.ForkJoinExecutorConfiguration forkJoinSettings = BootstrapTools.ForkJoinExecutorConfiguration.fromConfiguration(configuration);
        Config executorConfig = this.getForkJoinExecutorConfig(forkJoinSettings);
        Option<Tuple2<String, Object>> noBindAddress = (Option<Tuple2<String, Object>>)None$.MODULE$;
        return this.getAkkaConfig(configuration, externalAddress, noBindAddress, executorConfig);
    }

    /**
     * Builds the Akka config from the basic settings, the executor settings and, if an external
     * address is given, the remote settings.
     *
     * <ul>
     *   <li>{@code externalAddress} is {@code None}: the basic config with {@code executorConfig} as fallback is returned.</li>
     *   <li>{@code externalAddress} is defined and {@code bindAddress} is defined: the remote config uses the bind hostname and port.</li>
     *   <li>{@code externalAddress} is defined and {@code bindAddress} is {@code None}: the remote config binds to
     *       {@code NetUtils.getWildcardIPAddress()} on the external port.</li>
     * </ul>
     *
     * @param configuration Flink configuration the Akka config is derived from
     * @param externalAddress optional (hostname, port) pair of the external address
     * @param bindAddress optional (hostname, port) pair to bind to
     * @param executorConfig executor configuration used as fallback of the basic config
     * @return the resulting Akka config
     * @throws UnknownHostException declared by this method's signature
     * @throws MatchError if either option is neither {@code None} nor a {@code Some} holding a non-null pair
     */
    public Config getAkkaConfig(Configuration configuration, Option<Tuple2<String, Object>> externalAddress, Option<Tuple2<String, Object>> bindAddress, Config executorConfig) throws UnknownHostException {
        Config defaultConfig = this.getBasicAkkaConfig(configuration).withFallback((ConfigMergeable)executorConfig);

        // Without an external address only the basic and executor settings are returned.
        if (None$.MODULE$.equals(externalAddress)) {
            return defaultConfig;
        }
        Tuple2<String, Object> external = externalAddress instanceof Some
                ? ((Some<Tuple2<String, Object>>)externalAddress).x()
                : null;
        if (external == null) {
            throw new MatchError(externalAddress);
        }
        String externalHostname = (String)external._1();
        int externalPort = external._2$mcI$sp();

        Tuple2<String, Object> bind = bindAddress instanceof Some
                ? ((Some<Tuple2<String, Object>>)bindAddress).x()
                : null;
        Config remoteConfig;
        if (bind != null) {
            String bindHostname = (String)bind._1();
            int bindPort = bind._2$mcI$sp();
            remoteConfig = this.getRemoteAkkaConfig(configuration, bindHostname, bindPort, externalHostname, externalPort);
        } else if (None$.MODULE$.equals(bindAddress)) {
            // No explicit bind address: bind to the wildcard address on the external port.
            remoteConfig = this.getRemoteAkkaConfig(configuration, NetUtils.getWildcardIPAddress(), externalPort, externalHostname, externalPort);
        } else {
            throw new MatchError(bindAddress);
        }
        // Remote settings take precedence; everything else falls back to the default config.
        return remoteConfig.withFallback((ConfigMergeable)defaultConfig);
    }

    /**
     * Builds the Akka config for a fresh, empty Flink configuration with the external address ("", 0).
     *
     * @return the resulting Akka config
     */
    public Config getDefaultAkkaConfig() {
        Configuration emptyConfiguration = new Configuration();
        Tuple2 emptyHostAndPortZero = new Tuple2((Object)"", (Object)BoxesRunTime.boxToInteger((int)0));
        Option<Tuple2<String, Object>> externalAddress = (Option<Tuple2<String, Object>>)new Some((Object)emptyHostAndPortZero);
        return this.getAkkaConfig(emptyConfiguration, externalAddress);
    }

    /**
     * Renders and parses the basic (non-remote) Akka settings.
     *
     * <p>Values taken from the Flink configuration: dispatcher throughput, lifecycle-event logging
     * (used for both dead-letter settings) and JVM exit on fatal error. The log level comes from
     * {@link #getLogLevel()} and the guardian supervisor strategy is the canonical class name of
     * {@code EscalatingSupervisorStrategy}.
     *
     * @param configuration Flink configuration to read the options from
     * @return the parsed Akka config
     */
    private Config getBasicAkkaConfig(Configuration configuration) {
        int throughput = configuration.getInteger(AkkaOptions.DISPATCHER_THROUGHPUT);
        boolean lifecycleEventsEnabled = configuration.getBoolean(AkkaOptions.LOG_LIFECYCLE_EVENTS);
        String exitOnFatalErrorSwitch = configuration.getBoolean(AkkaOptions.JVM_EXIT_ON_FATAL_ERROR) ? "on" : "off";
        String lifecycleEventsSwitch = lifecycleEventsEnabled ? "on" : "off";
        String akkaLogLevel = this.getLogLevel();
        String guardianStrategy = EscalatingSupervisorStrategy.class.getCanonicalName();

        // Template fragments; one value is interpolated between each pair of neighbours.
        String[] templateParts = new String[]{
            "\n        |akka {\n        | daemonic = off\n        |\n        | loggers = [\"akka.event.slf4j.Slf4jLogger\"]\n        | logging-filter = \"akka.event.slf4j.Slf4jLoggingFilter\"\n        | log-config-on-start = off\n        | logger-startup-timeout = 30s\n        |\n        | jvm-exit-on-fatal-error = ",
            "\n        |\n        | serialize-messages = off\n        |\n        | loglevel = ",
            "\n        | stdout-loglevel = OFF\n        |\n        | log-dead-letters = ",
            "\n        | log-dead-letters-during-shutdown = ",
            "\n        |\n        | actor {\n        |   guardian-supervisor-strategy = ",
            "\n        |\n        |   warn-about-java-serializer-usage = off\n        |\n        |   default-dispatcher {\n        |     throughput = ",
            "\n        |   }\n        |\n        |   supervisor-dispatcher {\n        |     type = Dispatcher\n        |     executor = \"thread-pool-executor\"\n        |     thread-pool-executor {\n        |       core-pool-size-min = 1\n        |       core-pool-size-max = 1\n        |     }\n        |   }\n        | }\n        |}\n      "
        };
        Object[] templateValues = new Object[]{exitOnFatalErrorSwitch, akkaLogLevel, lifecycleEventsSwitch, lifecycleEventsSwitch, guardianStrategy, BoxesRunTime.boxToInteger((int)throughput)};

        StringContext template = new StringContext((Seq)Predef$.MODULE$.wrapRefArray((Object[])templateParts));
        String rendered = template.s((Seq)Predef$.MODULE$.genericWrapArray((Object)templateValues));
        String config = new StringOps(Predef$.MODULE$.augmentString(rendered)).stripMargin();
        return ConfigFactory.parseString((String)config);
    }

    /**
     * Renders and parses the default-dispatcher settings for a thread-pool executor.
     *
     * @param configuration source of the thread priority and the minimum/maximum number of threads
     * @return the parsed Akka config
     */
    public Config getThreadPoolExecutorConfig(BootstrapTools.FixedThreadPoolExecutorConfiguration configuration) {
        int priority = configuration.getThreadPriority();
        int corePoolSizeMin = configuration.getMinNumThreads();
        int corePoolSizeMax = configuration.getMaxNumThreads();

        String[] templateParts = new String[]{
            "\n       |akka {\n       |  actor {\n       |    default-dispatcher {\n       |      type = akka.dispatch.PriorityThreadsDispatcher\n       |      executor = \"thread-pool-executor\"\n       |      thread-priority = ",
            "\n       |      thread-pool-executor {\n       |        core-pool-size-min = ",
            "\n       |        core-pool-size-max = ",
            "\n       |      }\n       |    }\n       |  }\n       |}\n        "
        };
        Object[] templateValues = new Object[]{BoxesRunTime.boxToInteger((int)priority), BoxesRunTime.boxToInteger((int)corePoolSizeMin), BoxesRunTime.boxToInteger((int)corePoolSizeMax)};

        StringContext template = new StringContext((Seq)Predef$.MODULE$.wrapRefArray((Object[])templateParts));
        String rendered = template.s((Seq)Predef$.MODULE$.genericWrapArray((Object)templateValues));
        String configString = new StringOps(Predef$.MODULE$.augmentString(rendered)).stripMargin();
        return ConfigFactory.parseString((String)configString);
    }

    /**
     * Renders and parses the default-dispatcher settings for a fork-join executor.
     *
     * @param configuration source of the parallelism factor and the minimum/maximum parallelism
     * @return the parsed Akka config
     */
    public Config getForkJoinExecutorConfig(BootstrapTools.ForkJoinExecutorConfiguration configuration) {
        double parallelismFactor = configuration.getParallelismFactor();
        int parallelismMin = configuration.getMinParallelism();
        int parallelismMax = configuration.getMaxParallelism();

        String[] templateParts = new String[]{
            "\n       |akka {\n       |  actor {\n       |    default-dispatcher {\n       |      executor = \"fork-join-executor\"\n       |      fork-join-executor {\n       |        parallelism-factor = ",
            "\n       |        parallelism-min = ",
            "\n       |        parallelism-max = ",
            "\n       |      }\n       |    }\n       |  }\n       |}"
        };
        Object[] templateValues = new Object[]{BoxesRunTime.boxToDouble((double)parallelismFactor), BoxesRunTime.boxToInteger((int)parallelismMin), BoxesRunTime.boxToInteger((int)parallelismMax)};

        StringContext template = new StringContext((Seq)Predef$.MODULE$.wrapRefArray((Object[])templateParts));
        String rendered = template.s((Seq)Predef$.MODULE$.genericWrapArray((Object)templateValues));
        String configString = new StringOps(Predef$.MODULE$.augmentString(rendered)).stripMargin();
        return ConfigFactory.parseString((String)configString);
    }

    /**
     * Parses a fixed fork-join dispatcher config (parallelism factor 1.0, min 2, max 4).
     *
     * @return the parsed Akka config
     */
    public Config testDispatcherConfig() {
        String[] templateParts = new String[]{"\n         |akka {\n         |  actor {\n         |    default-dispatcher {\n         |      fork-join-executor {\n         |        parallelism-factor = 1.0\n         |        parallelism-min = 2\n         |        parallelism-max = 4\n         |      }\n         |    }\n         |  }\n         |}\n      "};
        StringContext template = new StringContext((Seq)Predef$.MODULE$.wrapRefArray((Object[])templateParts));
        // The template has no placeholders, so it is rendered with an empty argument list.
        String rendered = template.s((Seq)Nil$.MODULE$);
        String config = new StringOps(Predef$.MODULE$.augmentString(rendered)).stripMargin();
        return ConfigFactory.parseString((String)config);
    }

    /**
     * Renders and parses the remoting part of the Akka config.
     *
     * <p>Three template strings are rendered and concatenated before parsing: the general remote
     * settings, the hostname settings, and (only when SSL is enabled) the SSL transport settings.
     * The parsed config is resolved before it is returned.
     *
     * @param configuration Flink configuration to read the options from
     * @param bindAddress value written to {@code bind-hostname}
     * @param port value written to {@code bind-port}
     * @param externalHostname external hostname; normalized before it is written to {@code hostname}
     * @param externalPort value written to {@code port}
     * @return the parsed and resolved Akka config
     */
    private Config getRemoteAkkaConfig(Configuration configuration, String bindAddress, int port, String externalHostname, int externalPort) {
        String normalizedExternalHostname = NetUtils.unresolvedHostToNormalizedString((String)externalHostname);
        java.time.Duration akkaAskTimeout = this.getTimeout(configuration);
        // The startup timeout falls back to ten times the ask timeout when it is not configured.
        String startupTimeout = TimeUtils.getStringInMillis((java.time.Duration)TimeUtils.parseDuration((String)configuration.getString(AkkaOptions.STARTUP_TIMEOUT, TimeUtils.getStringInMillis((java.time.Duration)akkaAskTimeout.multipliedBy(10L)))));
        String akkaTCPTimeout = TimeUtils.getStringInMillis((java.time.Duration)TimeUtils.parseDuration((String)configuration.getString(AkkaOptions.TCP_TIMEOUT)));
        String akkaFramesize = configuration.getString(AkkaOptions.FRAMESIZE);
        boolean lifecycleEvents = configuration.getBoolean(AkkaOptions.LOG_LIFECYCLE_EVENTS);
        String logLifecycleEvents = lifecycleEvents ? "on" : "off";
        // SSL is used only if both the Akka SSL option and internal SSL are enabled.
        boolean akkaEnableSSLConfig = configuration.getBoolean(AkkaOptions.SSL_ENABLED) && SSLUtils.isInternalSSLEnabled(configuration);
        long retryGateClosedFor = configuration.getLong(AkkaOptions.RETRY_GATE_CLOSED_FOR);
        String akkaEnableSSL = akkaEnableSSLConfig ? "on" : "off";
        // Each internal SSL option falls back to the corresponding general SSL option.
        String akkaSSLKeyStore = configuration.getString(SecurityOptions.SSL_INTERNAL_KEYSTORE, configuration.getString(SecurityOptions.SSL_KEYSTORE));
        String akkaSSLKeyStorePassword = configuration.getString(SecurityOptions.SSL_INTERNAL_KEYSTORE_PASSWORD, configuration.getString(SecurityOptions.SSL_KEYSTORE_PASSWORD));
        String akkaSSLKeyPassword = configuration.getString(SecurityOptions.SSL_INTERNAL_KEY_PASSWORD, configuration.getString(SecurityOptions.SSL_KEY_PASSWORD));
        String akkaSSLTrustStore = configuration.getString(SecurityOptions.SSL_INTERNAL_TRUSTSTORE, configuration.getString(SecurityOptions.SSL_TRUSTSTORE));
        String akkaSSLTrustStorePassword = configuration.getString(SecurityOptions.SSL_INTERNAL_TRUSTSTORE_PASSWORD, configuration.getString(SecurityOptions.SSL_TRUSTSTORE_PASSWORD));
        String akkaSSLCertFingerprintString = configuration.getString(SecurityOptions.SSL_INTERNAL_CERT_FINGERPRINT);
        // Comma-separated fingerprints become a list of quoted strings; no fingerprints yields an empty list.
        String akkaSSLCertFingerprints = akkaSSLCertFingerprintString == null ? "[]" : Predef$.MODULE$.refArrayOps((Object[])akkaSSLCertFingerprintString.split(",")).toList().mkString("[\"", "\",\"", "\"]");
        String akkaSSLProtocol = configuration.getString(SecurityOptions.SSL_PROTOCOL);
        String akkaSSLAlgorithmsString = configuration.getString(SecurityOptions.SSL_ALGORITHMS);
        // Comma-separated algorithms become an unquoted list; there is no null check on the configured value here.
        String akkaSSLAlgorithms = Predef$.MODULE$.refArrayOps((Object[])akkaSSLAlgorithmsString.split(",")).toList().mkString("[", ",", "]");
        int clientSocketWorkerPoolPoolSizeMin = configuration.getInteger(AkkaOptions.CLIENT_SOCKET_WORKER_POOL_SIZE_MIN);
        int clientSocketWorkerPoolPoolSizeMax = configuration.getInteger(AkkaOptions.CLIENT_SOCKET_WORKER_POOL_SIZE_MAX);
        double clientSocketWorkerPoolPoolSizeFactor = configuration.getDouble(AkkaOptions.CLIENT_SOCKET_WORKER_POOL_SIZE_FACTOR);
        int serverSocketWorkerPoolPoolSizeMin = configuration.getInteger(AkkaOptions.SERVER_SOCKET_WORKER_POOL_SIZE_MIN);
        int serverSocketWorkerPoolPoolSizeMax = configuration.getInteger(AkkaOptions.SERVER_SOCKET_WORKER_POOL_SIZE_MAX);
        double serverSocketWorkerPoolPoolSizeFactor = configuration.getDouble(AkkaOptions.SERVER_SOCKET_WORKER_POOL_SIZE_FACTOR);
        // General remote settings: "port" receives the external port, "bind-port" the port to bind to.
        String configString = new StringOps(Predef$.MODULE$.augmentString(new StringContext((Seq)Predef$.MODULE$.wrapRefArray((Object[])new String[]{"\n         |akka {\n         |  actor {\n         |    provider = \"akka.remote.RemoteActorRefProvider\"\n         |  }\n         |\n         |  remote {\n         |    startup-timeout = ", "\n         |\n         |    # disable the transport failure detector by setting very high values\n         |    transport-failure-detector{\n         |      acceptable-heartbeat-pause = 6000 s\n         |      heartbeat-interval = 1000 s\n         |      threshold = 300\n         |    }\n         |\n         |    netty {\n         |      tcp {\n         |        transport-class = \"akka.remote.transport.netty.NettyTransport\"\n         |        port = ", "\n         |        bind-port = ", "\n         |        connection-timeout = ", "\n         |        maximum-frame-size = ", "\n         |        tcp-nodelay = on\n         |\n         |        client-socket-worker-pool {\n         |          pool-size-min = ", "\n         |          pool-size-max = ", "\n         |          pool-size-factor = ", "\n         |        }\n         |\n         |        server-socket-worker-pool {\n         |          pool-size-min = ", "\n         |          pool-size-max = ", "\n         |          pool-size-factor = ", "\n         |        }\n         |      }\n         |    }\n         |\n         |    log-remote-lifecycle-events = ", "\n         |\n         |    retry-gate-closed-for = ", "\n         |  }\n         |}\n       "})).s((Seq)Predef$.MODULE$.genericWrapArray((Object)new Object[]{startupTimeout, BoxesRunTime.boxToInteger((int)externalPort), BoxesRunTime.boxToInteger((int)port), akkaTCPTimeout, akkaFramesize, BoxesRunTime.boxToInteger((int)clientSocketWorkerPoolPoolSizeMin), BoxesRunTime.boxToInteger((int)clientSocketWorkerPoolPoolSizeMax), BoxesRunTime.boxToDouble((double)clientSocketWorkerPoolPoolSizeFactor), 
BoxesRunTime.boxToInteger((int)serverSocketWorkerPoolPoolSizeMin), BoxesRunTime.boxToInteger((int)serverSocketWorkerPoolPoolSizeMax), BoxesRunTime.boxToDouble((double)serverSocketWorkerPoolPoolSizeFactor), logLifecycleEvents, new StringBuilder().append(retryGateClosedFor).append((Object)" ms").toString()})))).stripMargin();
        // A null or empty normalized hostname is written as the empty string.
        String effectiveHostname = normalizedExternalHostname != null && new StringOps(Predef$.MODULE$.augmentString(normalizedExternalHostname)).nonEmpty() ? normalizedExternalHostname : "";
        String hostnameConfigString = new StringOps(Predef$.MODULE$.augmentString(new StringContext((Seq)Predef$.MODULE$.wrapRefArray((Object[])new String[]{"\n         |akka {\n         |  remote {\n         |    netty {\n         |      tcp {\n         |        hostname = \"", "\"\n         |        bind-hostname = \"", "\"\n         |      }\n         |    }\n         |  }\n         |}\n       "})).s((Seq)Predef$.MODULE$.genericWrapArray((Object)new Object[]{effectiveHostname, bindAddress})))).stripMargin();
        // The SSL section is rendered only when SSL is enabled; otherwise nothing is appended.
        String sslConfigString = akkaEnableSSLConfig ? new StringOps(Predef$.MODULE$.augmentString(new StringContext((Seq)Predef$.MODULE$.wrapRefArray((Object[])new String[]{"\n         |akka {\n         |  remote {\n         |\n         |    enabled-transports = [\"akka.remote.netty.ssl\"]\n         |\n         |    netty {\n         |\n         |      ssl = ${akka.remote.netty.tcp}\n         |\n         |      ssl {\n         |\n         |        enable-ssl = ", "\n         |        ssl-engine-provider = org.apache.flink.runtime.akka.CustomSSLEngineProvider\n         |        security {\n         |          key-store = \"", "\"\n         |          key-store-password = \"", "\"\n         |          key-password = \"", "\"\n         |          trust-store = \"", "\"\n         |          trust-store-password = \"", "\"\n         |          protocol = ", "\n         |          enabled-algorithms = ", "\n         |          random-number-generator = \"\"\n         |          require-mutual-authentication = on\n         |          cert-fingerprints = ", "\n         |        }\n         |      }\n         |    }\n         |  }\n         |}\n       "})).s((Seq)Predef$.MODULE$.genericWrapArray((Object)new Object[]{akkaEnableSSL, akkaSSLKeyStore, akkaSSLKeyStorePassword, akkaSSLKeyPassword, akkaSSLTrustStore, akkaSSLTrustStorePassword, akkaSSLProtocol, akkaSSLAlgorithms, akkaSSLCertFingerprints})))).stripMargin() : "";
        // resolve() substitutes the ${akka.remote.netty.tcp} reference used by the SSL section.
        return ConfigFactory.parseString((String)new StringBuilder().append((Object)configString).append((Object)hostnameConfigString).append((Object)sslConfigString).toString()).resolve();
    }

    /**
     * Maps the most verbose level enabled on this class's logger to a level name.
     *
     * @return "TRACE", "DEBUG", "INFO", "WARNING" or "ERROR" for the first enabled level in that
     *     order, or "OFF" if none of them is enabled
     */
    public String getLogLevel() {
        if (this.LOG().isTraceEnabled()) {
            return "TRACE";
        }
        if (this.LOG().isDebugEnabled()) {
            return "DEBUG";
        }
        if (this.LOG().isInfoEnabled()) {
            return "INFO";
        }
        if (this.LOG().isWarnEnabled()) {
            return "WARNING";
        }
        if (this.LOG().isErrorEnabled()) {
            return "ERROR";
        }
        return "OFF";
    }

    /**
     * Resolves the child actor with the given name below the parent's path.
     *
     * @param parent actor whose path is extended by {@code child}
     * @param child name of the child actor
     * @param system actor system used for the actor selection
     * @param timeout timeout for resolving the selection
     * @return future of the resolved actor reference
     */
    public Future<ActorRef> getChild(ActorRef parent, String child, ActorSystem system, FiniteDuration timeout) {
        Timeout resolveTimeout = Timeout$.MODULE$.durationToTimeout(timeout);
        return system
                .actorSelection(parent.path().$div(child))
                .resolveOne(resolveTimeout);
    }

    /**
     * Resolves the actor at the given path.
     *
     * @param path path handed to {@code ActorSystem.actorSelection}
     * @param system actor system used for the actor selection
     * @param timeout timeout for resolving the selection
     * @return future of the resolved actor reference
     */
    public Future<ActorRef> getActorRefFuture(String path, ActorSystem system, FiniteDuration timeout) {
        Timeout resolveTimeout = Timeout$.MODULE$.durationToTimeout(timeout);
        return system
                .actorSelection(path)
                .resolveOne(resolveTimeout);
    }

    /**
     * Resolves the actor at the given path and blocks until the result is available.
     *
     * @param path path of the actor to resolve
     * @param system actor system used for the lookup
     * @param timeout timeout used both for the lookup and for awaiting its result
     * @return the resolved actor reference
     * @throws IOException if the lookup fails with {@code ActorNotFound}, a {@code TimeoutException}
     *     or an {@code IOException}; the original failure is attached as the cause
     */
    public ActorRef getActorRef(String path, ActorSystem system, FiniteDuration timeout) throws IOException {
        try {
            return (ActorRef)Await$.MODULE$.result(this.getActorRefFuture(path, system, timeout), (Duration)timeout);
        }
        catch (Throwable failure) {
            if (failure instanceof ActorNotFound || failure instanceof TimeoutException) {
                String message = "Actor at " + path + " not reachable. " + "Please make sure that the actor is running and its port is reachable.";
                throw new IOException(message, failure);
            }
            if (failure instanceof IOException) {
                throw new IOException("Could not connect to the actor at " + path, (IOException)failure);
            }
            // Any other failure is propagated unchanged.
            throw failure;
        }
    }

    /**
     * Runs {@code body2} as a future on the given execution context and, if that future fails,
     * runs it again until the retry budget is used up.
     *
     * @param body2 computation to execute
     * @param tries number of retries left after a failed attempt
     * @param executionContext context the future and its recovery run on
     * @return future of the first successful result, or a failed future carrying the last failure
     */
    public <T> Future<T> retry(Function0<T> body2, int tries, ExecutionContext executionContext) {
        // Decompiled form of a Scala partial function; it captures the three method arguments.
        return Future$.MODULE$.apply(body2, executionContext).recoverWith((PartialFunction)new Serializable(body2, tries, executionContext){
            public static final long serialVersionUID = 0L;
            private final Function0 body$1;
            private final int tries$1;
            private final ExecutionContext executionContext$1;

            public final <A1 extends Throwable, B1> B1 applyOrElse(A1 x1, Function1<A1, B1> function1) {
                Object object;
                A1 A1 = x1;
                if (A1 != null) {
                    A1 A12 = A1;
                    // Retry with one try less while tries remain; otherwise fail with the current failure.
                    object = this.tries$1 > 0 ? AkkaUtils$.MODULE$.retry(this.body$1, this.tries$1 - 1, this.executionContext$1) : Future$.MODULE$.failed(A12);
                } else {
                    object = function1.apply(x1);
                }
                return (B1)object;
            }

            // Defined for every non-null throwable.
            public final boolean isDefinedAt(Throwable x1) {
                Throwable throwable = x1;
                boolean bl = throwable != null;
                return bl;
            }
            {
                this.body$1 = body$1;
                this.tries$1 = tries$1;
                this.executionContext$1 = executionContext$1;
            }
        }, executionContext);
    }

    /**
     * Variant of {@code retry} for a {@code Callable}: the callable is wrapped into a Scala
     * {@code Function0} and handed to the {@code Function0} overload.
     *
     * @param callable computation to execute
     * @param tries retry budget passed on to the {@code Function0} overload
     * @param executionContext context passed on to the {@code Function0} overload
     * @return the future returned by the {@code Function0} overload
     */
    public <T> Future<T> retry(Callable<T> callable, int tries, ExecutionContext executionContext) {
        // Decompiled form of a Scala closure whose apply() simply invokes the callable.
        return this.retry((Function0<T>)new Serializable(callable){
            public static final long serialVersionUID = 0L;
            private final Callable callable$1;

            public final T apply() {
                return (T)this.callable$1.call();
            }
            {
                this.callable$1 = callable$1;
            }
        }, tries, executionContext);
    }

    /**
     * Asks {@code target} with {@code message} and, if the ask future fails, asks again until the
     * retry budget is used up.
     *
     * @param target actor to ask
     * @param message message to send
     * @param tries number of retries left after a failed attempt
     * @param executionContext context the recovery runs on
     * @param timeout timeout of each individual ask
     * @return future of the first successful answer, or a failed future carrying the last failure
     */
    public Future<Object> retry(ActorRef target, Object message, int tries, ExecutionContext executionContext, FiniteDuration timeout) {
        // Decompiled form of Scala's `target ? message`, with the timeout and default sender spelled out.
        ActorRef qual$1 = package$.MODULE$.ask(target);
        Object x$1 = message;
        Timeout x$2 = Timeout$.MODULE$.durationToTimeout(timeout);
        ActorRef x$3 = AskableActorRef$.MODULE$.$qmark$default$3$extension(qual$1, x$1);
        return AskableActorRef$.MODULE$.$qmark$extension1(qual$1, x$1, x$2, x$3).recoverWith((PartialFunction)new Serializable(target, message, tries, executionContext, timeout){
            public static final long serialVersionUID = 0L;
            private final ActorRef target$1;
            private final Object message$1;
            private final int tries$2;
            private final ExecutionContext executionContext$2;
            private final FiniteDuration timeout$1;

            public final <A1 extends Throwable, B1> B1 applyOrElse(A1 x2, Function1<A1, B1> function1) {
                Object object;
                A1 A1 = x2;
                if (A1 != null) {
                    A1 A12 = A1;
                    // Ask again with one try less while tries remain; otherwise fail with the current failure.
                    object = this.tries$2 > 0 ? AkkaUtils$.MODULE$.retry(this.target$1, this.message$1, this.tries$2 - 1, this.executionContext$2, this.timeout$1) : Future$.MODULE$.failed(A12);
                } else {
                    object = function1.apply(x2);
                }
                return (B1)object;
            }

            // Defined for every non-null throwable.
            public final boolean isDefinedAt(Throwable x2) {
                Throwable throwable = x2;
                boolean bl = throwable != null;
                return bl;
            }
            {
                this.target$1 = target$1;
                this.message$1 = message$1;
                this.tries$2 = tries$2;
                this.executionContext$2 = executionContext$2;
                this.timeout$1 = timeout$1;
            }
        }, executionContext);
    }

    /**
     * Reads the {@code AkkaOptions.ASK_TIMEOUT} option and parses it as a duration.
     *
     * @param config Flink configuration to read the option from
     * @return the parsed duration
     */
    public java.time.Duration getTimeout(Configuration config) {
        final String askTimeout = config.getString(AkkaOptions.ASK_TIMEOUT);
        return TimeUtils.parseDuration((String)askTimeout);
    }

    /**
     * Reads the ask timeout via {@link #getTimeout(Configuration)} and converts it to a Flink
     * {@code Time} with millisecond resolution.
     *
     * @param config Flink configuration to read the ask timeout from
     * @return the ask timeout in milliseconds
     * @throws IllegalConfigurationException if parsing fails with a {@code NumberFormatException};
     *     the message is {@link #formatDurationParsingErrorMessage()} and the parsing failure is
     *     attached as the cause
     */
    public Time getTimeoutAsTime(Configuration config) {
        try {
            java.time.Duration duration = this.getTimeout(config);
            return Time.milliseconds((long)duration.toMillis());
        }
        catch (NumberFormatException numberFormatException) {
            // Chain the parsing failure so the original error is not lost.
            throw new IllegalConfigurationException(this.formatDurationParsingErrorMessage(), numberFormatException);
        }
    }

    /**
     * Parses the default value of {@code AkkaOptions.ASK_TIMEOUT} and converts it to a Flink
     * {@code Time} with millisecond resolution.
     *
     * @return the default ask timeout in milliseconds
     */
    public Time getDefaultTimeout() {
        String defaultAskTimeout = (String)AkkaOptions.ASK_TIMEOUT.defaultValue();
        long millis = TimeUtils.parseDuration((String)defaultAskTimeout).toMillis();
        return Time.milliseconds((long)millis);
    }

    /**
     * Reads the {@code AkkaOptions.LOOKUP_TIMEOUT} option and parses it as a duration.
     *
     * @param config Flink configuration to read the option from
     * @return the parsed duration
     */
    public java.time.Duration getLookupTimeout(Configuration config) {
        final String lookupTimeout = config.getString(AkkaOptions.LOOKUP_TIMEOUT);
        return TimeUtils.parseDuration((String)lookupTimeout);
    }

    /**
     * Returns the address reported by the {@code RemoteAddressExtension} of the actor system.
     *
     * @param system actor system to query
     * @return the address exposed by the extension
     */
    public Address getAddress(ActorSystem system) {
        RemoteAddressExtension extension = (RemoteAddressExtension)RemoteAddressExtension$.MODULE$.apply(system);
        return extension.address();
    }

    /**
     * Renders the actor's path together with the address of the given actor system.
     *
     * @param system actor system whose address is used
     * @param actor actor whose path is rendered
     * @return the actor path as a string including the system's address
     */
    public String getAkkaURL(ActorSystem system, ActorRef actor) {
        final Address systemAddress = this.getAddress(system);
        final String url = actor.path().toStringWithAddress(systemAddress);
        return url;
    }

    /**
     * Prefixes the given path with the string form of the actor system's address.
     *
     * @param system actor system whose address is used
     * @param path path appended to the address as is
     * @return address string followed by {@code path}
     */
    public String getAkkaURL(ActorSystem system, String path) {
        final String addressPrefix = this.getAddress(system).toString();
        return addressPrefix + path;
    }

    /**
     * Extracts host and port from an Akka URL and returns them as an {@code InetSocketAddress}.
     *
     * @param akkaURL Akka URL to parse
     * @return socket address built from the host and port contained in the URL
     * @throws Exception if the URL cannot be parsed or does not contain both a host and a port;
     *     the underlying {@code MalformedURLException} is attached as the cause
     */
    public InetSocketAddress getInetSocketAddressFromAkkaURL(String akkaURL) throws Exception {
        try {
            Address address = this.getAddressFromAkkaURL(akkaURL);
            Option<?> host = (Option<?>)address.host();
            Option<?> port = (Option<?>)address.port();
            if (host instanceof Some && port instanceof Some) {
                String hostname = (String)((Some<?>)host).x();
                int portValue = BoxesRunTime.unboxToInt((Object)((Some<?>)port).x());
                return new InetSocketAddress(hostname, portValue);
            }
            throw new MalformedURLException("Akka URL " + akkaURL + " does not contain both a host and a port");
        }
        catch (MalformedURLException malformedURLException) {
            // Keep the parsing failure as the cause instead of discarding it.
            throw new Exception("Could not retrieve InetSocketAddress from Akka URL " + akkaURL, malformedURLException);
        }
    }

    /**
     * Parses an Akka URL into an {@code Address} via {@code AddressFromURIString}.
     *
     * @param akkaURL Akka URL to parse
     * @return the parsed address
     * @throws MalformedURLException declared for URLs that cannot be parsed
     */
    public Address getAddressFromAkkaURL(String akkaURL) throws MalformedURLException {
        final Address parsed = AddressFromURIString$.MODULE$.apply(akkaURL);
        return parsed;
    }

    /**
     * Returns the error message describing the expected duration format.
     *
     * <p>Every unit group in the message is enclosed in its own pair of parentheses.
     *
     * @return the user-facing error message
     */
    public String formatDurationParsingErrorMessage() {
        return "Duration format must be \"val unit\", where 'val' is a number and 'unit' is (d|day)|(h|hour)|(min|minute)|(s|sec|second)|(ms|milli|millisecond)|(\u00b5s|micro|microsecond)|(ns|nano|nanosecond)";
    }

    /**
     * Builds the local Akka URL of a user actor by appending its name to "akka://flink/user/".
     *
     * @param actorName name of the actor
     * @return the prefix followed by {@code actorName}
     */
    public String getLocalAkkaURL(String actorName) {
        final String localUserPrefix = "akka://flink/user/";
        return localUserPrefix + actorName;
    }

    /**
     * Evaluates {@code fn} repeatedly until it succeeds, fails with a non-retryable error, or
     * {@code stopCond} signals that no further retries should be made.
     *
     * <p>An attempt is retried when it fails with a {@code BindException}, or with another
     * {@code Exception} whose cause is a {@code ChannelException}. Outcomes:
     * <ul>
     *   <li>success, or a failure that is not an {@code Exception}: the attempt's result is returned as is;</li>
     *   <li>a non-retryable {@code Exception}: a {@code Failure} holding that exception;</li>
     *   <li>{@code BindException} with {@code stopCond} true: a {@code Failure} holding the {@code BindException};</li>
     *   <li>{@code ChannelException} cause with {@code stopCond} true: a {@code Failure} holding a
     *       {@code RuntimeException} whose cause is the failed attempt's exception.</li>
     * </ul>
     *
     * <p>NOTE(review): only the sleep before the first retry uses {@code maxSleepBetweenRetries}.
     * Afterwards the bound is replaced by {@code retryOnBindException$default$3()}, which is 0, so
     * later retries do not sleep. This is kept as is; confirm whether it is intended.
     *
     * @param fn computation to evaluate
     * @param stopCond evaluated after a retryable failure; {@code true} stops retrying
     * @param maxSleepBetweenRetries upper bound in milliseconds for the random sleep before the first retry
     * @return the result of the last attempt as described above
     */
    public <T> Try<T> retryOnBindException(Function0<T> fn, Function0<Object> stopCond, long maxSleepBetweenRetries) {
        while (true) {
            Try<T> attempt = Try$.MODULE$.apply(fn);
            if (!(attempt instanceof Failure)) {
                return attempt;
            }
            Throwable error = ((Failure<T>)attempt).exception();
            if (!(error instanceof Exception)) {
                return attempt;
            }
            boolean bindFailure = error instanceof BindException;
            if (!bindFailure && !(error.getCause() instanceof ChannelException)) {
                return new Failure<T>(error);
            }
            if (stopCond.apply$mcZ$sp()) {
                // Chain the last failure so the reason for giving up stays visible to the caller.
                Throwable reported = bindFailure
                        ? error
                        : new RuntimeException("Unable to do further retries starting the actor system", error);
                return new Failure<T>(reported);
            }
            this.sleepBeforeRetry$1(maxSleepBetweenRetries);
            maxSleepBetweenRetries = this.retryOnBindException$default$3();
        }
    }

    /**
     * Default value of the third parameter ({@code maxSleepBetweenRetries}) of {@code retryOnBindException}.
     *
     * @return {@code 0}
     */
    public <T> long retryOnBindException$default$3() {
        final long defaultMaxSleep = 0L;
        return defaultMaxSleep;
    }

    /**
     * Terminates the actor system and exposes the termination as a Java future without a value.
     *
     * @param actorSystem actor system to terminate
     * @return future that completes when the termination future completes; its result is discarded
     */
    public CompletableFuture<Void> terminateActorSystem(ActorSystem actorSystem) {
        CompletableFuture<?> termination = FutureUtils.toJava(actorSystem.terminate());
        return termination.thenAccept(FunctionUtils.ignoreFn());
    }

    /**
     * Sleeps for a random time below the given bound; does nothing if the bound is not positive.
     *
     * @param maxSleepBetweenRetries$1 exclusive upper bound of the sleep time in milliseconds
     */
    private final void sleepBeforeRetry$1(long maxSleepBetweenRetries$1) {
        if (maxSleepBetweenRetries$1 <= 0L) {
            return;
        }
        long pauseMillis = (long)(Math.random() * (double)maxSleepBetweenRetries$1);
        this.LOG().info("Retrying after bind exception. Sleeping for " + pauseMillis + " ms.");
        Thread.sleep(pauseMillis);
    }

    /**
     * Private constructor of the singleton: publishes the instance in {@code MODULE$}, obtains the
     * class logger and sets the actor system name to "flink".
     */
    private AkkaUtils$() {
        // Publish the instance first; this mirrors how the decompiler renders a Scala object.
        MODULE$ = this;
        this.LOG = LoggerFactory.getLogger(this.getClass());
        this.FLINK_ACTOR_SYSTEM_NAME = "flink";
    }
}

