/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.runtime.state;

import java.util.Optional;
import javax.annotation.Nullable;
import org.apache.flink.annotation.Internal;
import org.apache.flink.configuration.CheckpointingOptions;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.IllegalConfigurationException;
import org.apache.flink.configuration.ReadableConfig;
import org.apache.flink.core.fs.Path;
import org.apache.flink.runtime.state.CheckpointStorage;
import org.apache.flink.runtime.state.CheckpointStorageFactory;
import org.apache.flink.runtime.state.ConfigurableCheckpointStorage;
import org.apache.flink.runtime.state.StateBackend;
import org.apache.flink.runtime.state.delegate.DelegatingStateBackend;
import org.apache.flink.runtime.state.storage.FileSystemCheckpointStorage;
import org.apache.flink.runtime.state.storage.JobManagerCheckpointStorage;
import org.apache.flink.util.DynamicCodeLoadingException;
import org.apache.flink.util.Preconditions;
import org.slf4j.Logger;

@Internal
public class CheckpointStorageLoader {
    private static final String JOB_MANAGER_STORAGE_NAME = "jobmanager";
    private static final String FILE_SYSTEM_STORAGE_NAME = "filesystem";

    public static Optional<CheckpointStorage> fromConfig(ReadableConfig config, ClassLoader classLoader, @Nullable Logger logger) throws IllegalStateException, DynamicCodeLoadingException {
        CheckpointStorageFactory factory;
        Preconditions.checkNotNull((Object)config, (String)"config");
        Preconditions.checkNotNull((Object)classLoader, (String)"classLoader");
        String storageName = (String)config.get(CheckpointingOptions.CHECKPOINT_STORAGE);
        if (storageName == null) {
            if (logger != null) {
                logger.debug("The configuration {} has not be set in the current sessions flink-conf.yaml. Falling back to a default CheckpointStorage type. Users are strongly encouraged explicitly set this configuration so they understand how their applications are checkpointing snapshots for fault-tolerance.", (Object)CheckpointingOptions.CHECKPOINT_STORAGE.key());
            }
            return Optional.empty();
        }
        switch (storageName.toLowerCase()) {
            case "jobmanager": {
                return Optional.of(CheckpointStorageLoader.createJobManagerCheckpointStorage(config, classLoader, logger));
            }
            case "filesystem": {
                return Optional.of(CheckpointStorageLoader.createFileSystemCheckpointStorage(config, classLoader, logger));
            }
        }
        if (logger != null) {
            logger.info("Loading state backend via factory '{}'", (Object)storageName);
        }
        try {
            Class<CheckpointStorageFactory> clazz = Class.forName(storageName, false, classLoader).asSubclass(CheckpointStorageFactory.class);
            factory = clazz.newInstance();
        }
        catch (ClassNotFoundException e) {
            throw new DynamicCodeLoadingException("Cannot find configured state backend factory class: " + storageName, (Throwable)e);
        }
        catch (ClassCastException | IllegalAccessException | InstantiationException e) {
            throw new DynamicCodeLoadingException("The class configured under '" + CheckpointingOptions.CHECKPOINT_STORAGE.key() + "' is not a valid checkpoint storage factory (" + storageName + ')', (Throwable)e);
        }
        return Optional.of(factory.createFromConfig(config, classLoader));
    }

    public static CheckpointStorage load(@Nullable CheckpointStorage fromApplication, @Nullable Path defaultSavepointDirectory, StateBackend configuredStateBackend, Configuration config, ClassLoader classLoader, @Nullable Logger logger) throws IllegalConfigurationException, DynamicCodeLoadingException {
        StateBackend rootStateBackend;
        Preconditions.checkNotNull((Object)config, (String)"config");
        Preconditions.checkNotNull((Object)classLoader, (String)"classLoader");
        Preconditions.checkNotNull((Object)configuredStateBackend, (String)"statebackend");
        if (defaultSavepointDirectory != null) {
            config.set(CheckpointingOptions.SAVEPOINT_DIRECTORY, (Object)defaultSavepointDirectory.toString());
        }
        StateBackend stateBackend = rootStateBackend = configuredStateBackend instanceof DelegatingStateBackend ? ((DelegatingStateBackend)configuredStateBackend).getDelegatedStateBackend() : configuredStateBackend;
        if (rootStateBackend instanceof CheckpointStorage) {
            if (logger != null) {
                logger.info("Using legacy state backend {} as Job checkpoint storage", (Object)rootStateBackend);
            }
            return (CheckpointStorage)((Object)rootStateBackend);
        }
        if (fromApplication instanceof ConfigurableCheckpointStorage) {
            if (logger != null) {
                logger.info("Using job/cluster config to configure application-defined checkpoint storage: {}", (Object)fromApplication);
            }
            return ((ConfigurableCheckpointStorage)fromApplication).configure((ReadableConfig)config, classLoader);
        }
        if (fromApplication != null) {
            if (logger != null) {
                logger.info("Using application defined checkpoint storage: {}", (Object)fromApplication);
            }
            return fromApplication;
        }
        return CheckpointStorageLoader.fromConfig((ReadableConfig)config, classLoader, logger).orElseGet(() -> CheckpointStorageLoader.createDefaultCheckpointStorage((ReadableConfig)config, classLoader, logger));
    }

    private static CheckpointStorage createDefaultCheckpointStorage(ReadableConfig config, ClassLoader classLoader, @Nullable Logger logger) {
        if (config.getOptional(CheckpointingOptions.CHECKPOINTS_DIRECTORY).isPresent()) {
            return CheckpointStorageLoader.createFileSystemCheckpointStorage(config, classLoader, logger);
        }
        return CheckpointStorageLoader.createJobManagerCheckpointStorage(config, classLoader, logger);
    }

    private static CheckpointStorage createFileSystemCheckpointStorage(ReadableConfig config, ClassLoader classLoader, @Nullable Logger logger) {
        FileSystemCheckpointStorage storage = FileSystemCheckpointStorage.createFromConfig(config, classLoader);
        if (logger != null) {
            logger.info("Checkpoint storage is set to '{}': (checkpoints \"{}\")", (Object)FILE_SYSTEM_STORAGE_NAME, (Object)storage.getCheckpointPath());
        }
        return storage;
    }

    private static CheckpointStorage createJobManagerCheckpointStorage(ReadableConfig config, ClassLoader classLoader, @Nullable Logger logger) {
        if (logger != null) {
            logger.info("Checkpoint storage is set to '{}'", (Object)JOB_MANAGER_STORAGE_NAME);
        }
        return JobManagerCheckpointStorage.createFromConfig(config, classLoader);
    }
}

