/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.test.util;

import java.io.IOException;
import java.net.MalformedURLException;
import java.net.URL;
import java.util.Collection;
import java.util.concurrent.CompletableFuture;
import java.util.stream.Stream;
import org.apache.flink.api.dag.Pipeline;
import org.apache.flink.client.deployment.executors.PipelineExecutorUtils;
import org.apache.flink.configuration.ConfigOption;
import org.apache.flink.configuration.ConfigUtils;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.DeploymentOptions;
import org.apache.flink.configuration.PipelineOptions;
import org.apache.flink.configuration.WritableConfig;
import org.apache.flink.core.execution.JobClient;
import org.apache.flink.core.execution.PipelineExecutor;
import org.apache.flink.core.execution.PipelineExecutorFactory;
import org.apache.flink.core.execution.PipelineExecutorServiceLoader;
import org.apache.flink.core.fs.FileSystem;
import org.apache.flink.core.fs.Path;
import org.apache.flink.runtime.jobgraph.JobGraph;
import org.apache.flink.runtime.jobgraph.SavepointRestoreSettings;
import org.apache.flink.runtime.minicluster.MiniCluster;
import org.apache.flink.runtime.minicluster.MiniClusterJobClient;
import org.apache.flink.streaming.api.graph.StreamGraph;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * A {@link PipelineExecutorServiceLoader} whose executors submit jobs to the {@link MiniCluster}
 * instance handed to the constructor.
 *
 * <p>The loader exposes exactly one executor name, {@link #NAME}. Use
 * {@link #updateConfigurationForMiniCluster(Configuration, Collection, Collection)} to prepare a
 * {@link Configuration} that targets this executor.
 */
public class MiniClusterPipelineExecutorServiceLoader implements PipelineExecutorServiceLoader {
    private static final Logger LOG =
            LoggerFactory.getLogger(MiniClusterPipelineExecutorServiceLoader.class);

    /** Executor name reported by this loader and written to {@link DeploymentOptions#TARGET}. */
    public static final String NAME = "minicluster";

    private final MiniCluster miniCluster;

    /**
     * Creates a loader bound to the given mini cluster.
     *
     * @param miniCluster cluster that every executor produced by this loader submits jobs to
     */
    public MiniClusterPipelineExecutorServiceLoader(MiniCluster miniCluster) {
        this.miniCluster = miniCluster;
    }

    /**
     * Rewrites the given configuration in place so that it targets the mini cluster executor.
     *
     * <p>The options {@link PipelineOptions#JARS}, {@link PipelineOptions#CLASSPATHS},
     * {@link DeploymentOptions#TARGET} and {@link DeploymentOptions#ATTACHED} are overwritten. A
     * warning is logged for each of them that the configuration already contains.
     *
     * @param config configuration to modify; the same instance is returned
     * @param jarFiles jar paths, stored as fully qualified URL strings
     * @param classPaths classpath entries, stored via {@link URL#toString()}
     * @return the {@code config} instance that was passed in
     * @throws RuntimeException if the file system of a jar path cannot be obtained or the
     *     qualified path cannot be converted to a URL
     */
    public static Configuration updateConfigurationForMiniCluster(
            Configuration config, Collection<Path> jarFiles, Collection<URL> classPaths) {
        // Announce every option that is about to be replaced before touching the configuration.
        checkOverridesOption(config, PipelineOptions.JARS);
        checkOverridesOption(config, PipelineOptions.CLASSPATHS);
        checkOverridesOption(config, DeploymentOptions.TARGET);
        checkOverridesOption(config, DeploymentOptions.ATTACHED);

        // No raw-type or Object casts here: the option's type parameter must line up with the
        // value being written so the compiler can check these calls.
        ConfigUtils.encodeCollectionToConfig(
                config,
                PipelineOptions.JARS,
                jarFiles,
                MiniClusterPipelineExecutorServiceLoader::getAbsoluteURL);
        ConfigUtils.encodeCollectionToConfig(
                config, PipelineOptions.CLASSPATHS, classPaths, URL::toString);
        config.set(DeploymentOptions.TARGET, NAME);
        config.set(DeploymentOptions.ATTACHED, true);
        return config;
    }

    /** Logs a warning if {@code config} already contains {@code option}. */
    private static void checkOverridesOption(Configuration config, ConfigOption<?> option) {
        if (config.contains(option)) {
            LOG.warn("Overriding config setting '{}' for MiniCluster.", option.key());
        }
    }

    /**
     * Qualifies {@code path} against its own file system and renders the result as a URL string.
     *
     * @throws RuntimeException wrapping the {@link IOException} raised while resolving the file
     *     system, or the {@link MalformedURLException} raised while building the URL
     */
    private static String getAbsoluteURL(Path path) {
        final FileSystem fs;
        try {
            fs = path.getFileSystem();
        } catch (IOException e) {
            throw new RuntimeException(
                    String.format("Could not get FileSystem from %s", path), e);
        }
        try {
            return path.makeQualified(fs).toUri().toURL().toString();
        } catch (MalformedURLException e) {
            throw new RuntimeException(String.format("Could not get URL from %s", path), e);
        }
    }

    /**
     * Returns a new factory bound to this loader's mini cluster.
     *
     * @param configuration not consulted by this implementation
     */
    @Override
    public PipelineExecutorFactory getExecutorFactory(Configuration configuration) {
        return new MiniClusterPipelineExecutorFactory(miniCluster);
    }

    /** Returns a stream containing only {@link #NAME}. */
    @Override
    public Stream<String> getExecutorNames() {
        return Stream.of(NAME);
    }

    /** {@link PipelineExecutor} that submits the translated job graph to a {@link MiniCluster}. */
    private static final class MiniClusterExecutor implements PipelineExecutor {
        private final MiniCluster miniCluster;

        public MiniClusterExecutor(MiniCluster miniCluster) {
            this.miniCluster = miniCluster;
        }

        /**
         * Translates the pipeline into a {@link JobGraph}, submits it to the mini cluster and
         * wraps the submission result in a {@link MiniClusterJobClient}.
         *
         * @param pipeline pipeline to run
         * @param configuration configuration used for the job graph translation
         * @param userCodeClassLoader class loader passed on to the created job client
         * @return future completing with a client for the submitted job
         * @throws Exception propagated from job graph translation or submission
         */
        @Override
        public CompletableFuture<JobClient> execute(
                Pipeline pipeline, Configuration configuration, ClassLoader userCodeClassLoader)
                throws Exception {
            final JobGraph jobGraph = PipelineExecutorUtils.getJobGraph(pipeline, configuration);

            // If the job graph carries the "none" settings and the pipeline is a StreamGraph,
            // use the restore settings stored on the StreamGraph instead.
            // NOTE(review): the identity comparison presumably relies on none() returning a
            // shared instance - confirm before replacing it with equals().
            if (jobGraph.getSavepointRestoreSettings() == SavepointRestoreSettings.none()
                    && pipeline instanceof StreamGraph) {
                jobGraph.setSavepointRestoreSettings(
                        ((StreamGraph) pipeline).getSavepointRestoreSettings());
            }

            return miniCluster
                    .submitJob(jobGraph)
                    .thenApply(
                            result ->
                                    new MiniClusterJobClient(
                                            result.getJobID(),
                                            miniCluster,
                                            userCodeClassLoader,
                                            MiniClusterJobClient.JobFinalizationBehavior.NOTHING));
        }
    }

    /** Factory producing {@link MiniClusterExecutor}s for a fixed {@link MiniCluster}. */
    private static final class MiniClusterPipelineExecutorFactory
            implements PipelineExecutorFactory {
        private final MiniCluster miniCluster;

        public MiniClusterPipelineExecutorFactory(MiniCluster miniCluster) {
            this.miniCluster = miniCluster;
        }

        /** Returns {@link MiniClusterPipelineExecutorServiceLoader#NAME}. */
        @Override
        public String getName() {
            return MiniClusterPipelineExecutorServiceLoader.NAME;
        }

        /** Always returns {@code true}; the configuration is not inspected. */
        @Override
        public boolean isCompatibleWith(Configuration configuration) {
            return true;
        }

        /** Returns a new executor bound to this factory's mini cluster. */
        @Override
        public PipelineExecutor getExecutor(Configuration configuration) {
            return new MiniClusterExecutor(miniCluster);
        }
    }
}

