/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.client.deployment.executors;

import java.net.URI;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
import org.apache.flink.annotation.Internal;
import org.apache.flink.api.common.JobID;
import org.apache.flink.api.common.Plan;
import org.apache.flink.api.dag.Pipeline;
import org.apache.flink.client.FlinkPipelineTranslationUtil;
import org.apache.flink.client.deployment.ClusterClientJobClientAdapter;
import org.apache.flink.client.program.MiniClusterClient;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.DeploymentOptions;
import org.apache.flink.configuration.RestOptions;
import org.apache.flink.configuration.TaskManagerOptions;
import org.apache.flink.core.execution.JobClient;
import org.apache.flink.core.execution.PipelineExecutor;
import org.apache.flink.runtime.jobgraph.JobGraph;
import org.apache.flink.runtime.minicluster.MiniCluster;
import org.apache.flink.runtime.minicluster.MiniClusterConfiguration;
import org.apache.flink.runtime.minicluster.RpcServiceSharing;
import org.apache.flink.util.Preconditions;

@Internal
public class LocalExecutor
implements PipelineExecutor {
    public static final String NAME = "local";

    public CompletableFuture<JobClient> execute(Pipeline pipeline, Configuration configuration) throws Exception {
        Preconditions.checkNotNull((Object)pipeline);
        Preconditions.checkNotNull((Object)configuration);
        Preconditions.checkState((boolean)configuration.getBoolean(DeploymentOptions.ATTACHED));
        JobGraph jobGraph = this.getJobGraph(pipeline, configuration);
        MiniCluster miniCluster = this.startMiniCluster(jobGraph, configuration);
        MiniClusterClient clusterClient = new MiniClusterClient(configuration, miniCluster);
        CompletableFuture<JobID> jobIdFuture = clusterClient.submitJob(jobGraph);
        ((CompletableFuture)jobIdFuture.thenCompose(clusterClient::requestJobResult)).thenAccept(jobResult -> clusterClient.shutDownCluster());
        return jobIdFuture.thenApply(jobID -> new ClusterClientJobClientAdapter(() -> clusterClient, (JobID)jobID));
    }

    private JobGraph getJobGraph(Pipeline pipeline, Configuration configuration) {
        if (pipeline instanceof Plan) {
            Plan plan = (Plan)pipeline;
            int slotsPerTaskManager = configuration.getInteger(TaskManagerOptions.NUM_TASK_SLOTS, plan.getMaximumParallelism());
            int numTaskManagers = configuration.getInteger("local.number-taskmanager", 1);
            plan.setDefaultParallelism(slotsPerTaskManager * numTaskManagers);
        }
        return FlinkPipelineTranslationUtil.getJobGraph(pipeline, configuration, 1);
    }

    private MiniCluster startMiniCluster(JobGraph jobGraph, Configuration configuration) throws Exception {
        if (!configuration.contains(RestOptions.BIND_PORT)) {
            configuration.setString(RestOptions.BIND_PORT, "0");
        }
        int numTaskManagers = configuration.getInteger("local.number-taskmanager", 1);
        int numSlotsPerTaskManager = configuration.getInteger(TaskManagerOptions.NUM_TASK_SLOTS, jobGraph.getMaximumParallelism());
        MiniClusterConfiguration miniClusterConfiguration = new MiniClusterConfiguration.Builder().setConfiguration(configuration).setNumTaskManagers(numTaskManagers).setRpcServiceSharing(RpcServiceSharing.SHARED).setNumSlotsPerTaskManager(numSlotsPerTaskManager).build();
        MiniCluster miniCluster = new MiniCluster(miniClusterConfiguration);
        miniCluster.start();
        configuration.setInteger(RestOptions.PORT, ((URI)miniCluster.getRestAddress().get()).getPort());
        return miniCluster;
    }

    private void shutdownMiniCluster(MiniCluster miniCluster) {
        try {
            if (miniCluster != null) {
                miniCluster.close();
            }
        }
        catch (Exception e) {
            throw new CompletionException(e);
        }
    }
}

