/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.runtime.rpc.akka;

import akka.actor.ActorSystem;
import com.typesafe.config.Config;
import com.typesafe.config.ConfigFactory;
import java.net.InetAddress;
import java.net.UnknownHostException;
import java.util.Optional;
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.BiFunction;
import javax.annotation.Nullable;
import org.apache.flink.annotation.VisibleForTesting;
import org.apache.flink.configuration.AkkaOptions;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.runtime.akka.AkkaUtils;
import org.apache.flink.runtime.clusterframework.BootstrapTools;
import org.apache.flink.runtime.highavailability.HighAvailabilityServicesUtils;
import org.apache.flink.runtime.net.SSLUtils;
import org.apache.flink.runtime.rpc.akka.AkkaRpcService;
import org.apache.flink.runtime.rpc.akka.AkkaRpcServiceConfiguration;
import org.apache.flink.util.NetUtils;
import org.apache.flink.util.Preconditions;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class AkkaRpcServiceUtils {
    private static final Logger LOG = LoggerFactory.getLogger(AkkaRpcServiceUtils.class);
    private static final String AKKA_TCP = "akka.tcp";
    private static final String AKKA_SSL_TCP = "akka.ssl.tcp";
    static final String SUPERVISOR_NAME = "rpc";
    private static final String SIMPLE_AKKA_CONFIG_TEMPLATE = "akka {remote {netty.tcp {maximum-frame-size = %s}}}";
    private static final String MAXIMUM_FRAME_SIZE_PATH = "akka.remote.netty.tcp.maximum-frame-size";
    private static final AtomicLong nextNameOffset = new AtomicLong(0L);

    public static AkkaRpcService createRemoteRpcService(Configuration configuration, @Nullable String externalAddress, String externalPortRange, @Nullable String bindAddress, Optional<Integer> bindPort) throws Exception {
        AkkaRpcServiceBuilder akkaRpcServiceBuilder = AkkaRpcServiceUtils.remoteServiceBuilder(configuration, externalAddress, externalPortRange);
        if (bindAddress != null) {
            akkaRpcServiceBuilder.withBindAddress(bindAddress);
        }
        bindPort.ifPresent(akkaRpcServiceBuilder::withBindPort);
        return akkaRpcServiceBuilder.createAndStart();
    }

    public static AkkaRpcServiceBuilder remoteServiceBuilder(Configuration configuration, @Nullable String externalAddress, String externalPortRange) {
        return new AkkaRpcServiceBuilder(configuration, LOG, externalAddress, externalPortRange);
    }

    @VisibleForTesting
    public static AkkaRpcServiceBuilder remoteServiceBuilder(Configuration configuration, @Nullable String externalAddress, int externalPort) {
        return AkkaRpcServiceUtils.remoteServiceBuilder(configuration, externalAddress, String.valueOf(externalPort));
    }

    public static AkkaRpcServiceBuilder localServiceBuilder(Configuration configuration) {
        return new AkkaRpcServiceBuilder(configuration, LOG);
    }

    public static String getRpcUrl(String hostname, int port, String endpointName, HighAvailabilityServicesUtils.AddressResolution addressResolution, Configuration config) throws UnknownHostException {
        Preconditions.checkNotNull((Object)config, (String)"config is null");
        boolean sslEnabled = config.getBoolean(AkkaOptions.SSL_ENABLED) && SSLUtils.isInternalSSLEnabled(config);
        return AkkaRpcServiceUtils.getRpcUrl(hostname, port, endpointName, addressResolution, sslEnabled ? AkkaProtocol.SSL_TCP : AkkaProtocol.TCP);
    }

    public static String getRpcUrl(String hostname, int port, String endpointName, HighAvailabilityServicesUtils.AddressResolution addressResolution, AkkaProtocol akkaProtocol) throws UnknownHostException {
        Preconditions.checkNotNull((Object)hostname, (String)"hostname is null");
        Preconditions.checkNotNull((Object)endpointName, (String)"endpointName is null");
        Preconditions.checkArgument((boolean)NetUtils.isValidClientPort((int)port), (Object)"port must be in [1, 65535]");
        if (addressResolution == HighAvailabilityServicesUtils.AddressResolution.TRY_ADDRESS_RESOLUTION) {
            InetAddress.getByName(hostname);
        }
        String hostPort = NetUtils.unresolvedHostAndPortToNormalizedString((String)hostname, (int)port);
        return AkkaRpcServiceUtils.internalRpcUrl(endpointName, Optional.of(new RemoteAddressInformation(hostPort, akkaProtocol)));
    }

    public static String getLocalRpcUrl(String endpointName) {
        return AkkaRpcServiceUtils.internalRpcUrl(endpointName, Optional.empty());
    }

    private static String internalRpcUrl(String endpointName, Optional<RemoteAddressInformation> remoteAddressInformation) {
        String protocolPrefix = remoteAddressInformation.map(rai -> AkkaRpcServiceUtils.akkaProtocolToString(((RemoteAddressInformation)rai).getAkkaProtocol())).orElse("akka");
        Optional<String> optionalHostnameAndPort = remoteAddressInformation.map(rec$ -> ((RemoteAddressInformation)rec$).getHostnameAndPort());
        StringBuilder url = new StringBuilder(String.format("%s://flink", protocolPrefix));
        optionalHostnameAndPort.ifPresent(hostPort -> url.append("@").append((String)hostPort));
        url.append("/user/").append(SUPERVISOR_NAME).append("/").append(endpointName);
        return url.toString();
    }

    private static String akkaProtocolToString(AkkaProtocol akkaProtocol) {
        return akkaProtocol == AkkaProtocol.SSL_TCP ? AKKA_SSL_TCP : AKKA_TCP;
    }

    public static String createRandomName(String prefix) {
        long nameOffset;
        Preconditions.checkNotNull((Object)prefix, (String)"Prefix must not be null.");
        while (!nextNameOffset.compareAndSet(nameOffset = nextNameOffset.get(), nameOffset + 1L)) {
        }
        return prefix + '_' + nameOffset;
    }

    public static String createWildcardName(String prefix) {
        return prefix + "_*";
    }

    public static long extractMaximumFramesize(Configuration configuration) {
        String maxFrameSizeStr = configuration.getString(AkkaOptions.FRAMESIZE);
        String akkaConfigStr = String.format(SIMPLE_AKKA_CONFIG_TEMPLATE, maxFrameSizeStr);
        Config akkaConfig = ConfigFactory.parseString((String)akkaConfigStr);
        return akkaConfig.getBytes(MAXIMUM_FRAME_SIZE_PATH);
    }

    private AkkaRpcServiceUtils() {
    }

    public static class AkkaRpcServiceBuilder {
        private final Configuration configuration;
        private final Logger logger;
        @Nullable
        private final String externalAddress;
        @Nullable
        private final String externalPortRange;
        private String actorSystemName = AkkaUtils.getFlinkActorSystemName();
        @Nullable
        private BootstrapTools.ActorSystemExecutorConfiguration actorSystemExecutorConfiguration = null;
        @Nullable
        private Config customConfig = null;
        private String bindAddress = NetUtils.getWildcardIPAddress();
        @Nullable
        private Integer bindPort = null;

        private AkkaRpcServiceBuilder(Configuration configuration, Logger logger, @Nullable String externalAddress, String externalPortRange) {
            this.configuration = (Configuration)Preconditions.checkNotNull((Object)configuration);
            this.logger = (Logger)Preconditions.checkNotNull((Object)logger);
            this.externalAddress = externalAddress == null ? InetAddress.getLoopbackAddress().getHostAddress() : externalAddress;
            this.externalPortRange = (String)Preconditions.checkNotNull((Object)externalPortRange);
        }

        private AkkaRpcServiceBuilder(Configuration configuration, Logger logger) {
            this.configuration = (Configuration)Preconditions.checkNotNull((Object)configuration);
            this.logger = (Logger)Preconditions.checkNotNull((Object)logger);
            this.externalAddress = null;
            this.externalPortRange = null;
        }

        public AkkaRpcServiceBuilder withActorSystemName(String actorSystemName) {
            this.actorSystemName = (String)Preconditions.checkNotNull((Object)actorSystemName);
            return this;
        }

        public AkkaRpcServiceBuilder withActorSystemExecutorConfiguration(BootstrapTools.ActorSystemExecutorConfiguration actorSystemExecutorConfiguration) {
            this.actorSystemExecutorConfiguration = actorSystemExecutorConfiguration;
            return this;
        }

        public AkkaRpcServiceBuilder withCustomConfig(Config customConfig) {
            this.customConfig = customConfig;
            return this;
        }

        public AkkaRpcServiceBuilder withBindAddress(String bindAddress) {
            this.bindAddress = (String)Preconditions.checkNotNull((Object)bindAddress);
            return this;
        }

        public AkkaRpcServiceBuilder withBindPort(int bindPort) {
            Preconditions.checkArgument((boolean)NetUtils.isValidHostPort((int)bindPort), (Object)("Invalid port number: " + bindPort));
            this.bindPort = bindPort;
            return this;
        }

        public AkkaRpcService createAndStart() throws Exception {
            return this.createAndStart(AkkaRpcService::new);
        }

        public AkkaRpcService createAndStart(BiFunction<ActorSystem, AkkaRpcServiceConfiguration, AkkaRpcService> constructor) throws Exception {
            if (this.actorSystemExecutorConfiguration == null) {
                this.actorSystemExecutorConfiguration = BootstrapTools.ForkJoinExecutorConfiguration.fromConfiguration(this.configuration);
            }
            ActorSystem actorSystem = this.externalAddress == null ? BootstrapTools.startLocalActorSystem(this.configuration, this.actorSystemName, this.logger, this.actorSystemExecutorConfiguration, this.customConfig) : BootstrapTools.startRemoteActorSystem(this.configuration, this.actorSystemName, this.externalAddress, this.externalPortRange, this.bindAddress, Optional.ofNullable(this.bindPort), this.logger, this.actorSystemExecutorConfiguration, this.customConfig);
            return constructor.apply(actorSystem, AkkaRpcServiceConfiguration.fromConfiguration(this.configuration));
        }
    }

    public static enum AkkaProtocol {
        TCP,
        SSL_TCP;

    }

    private static final class RemoteAddressInformation {
        private final String hostnameAndPort;
        private final AkkaProtocol akkaProtocol;

        private RemoteAddressInformation(String hostnameAndPort, AkkaProtocol akkaProtocol) {
            this.hostnameAndPort = hostnameAndPort;
            this.akkaProtocol = akkaProtocol;
        }

        private String getHostnameAndPort() {
            return this.hostnameAndPort;
        }

        private AkkaProtocol getAkkaProtocol() {
            return this.akkaProtocol;
        }
    }
}

