/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.table.planner.plan.nodes.exec.testutils;

import java.net.URI;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.nio.file.StandardCopyOption;
import java.nio.file.attribute.FileAttribute;
import java.util.ArrayList;
import java.util.Collections;
import java.util.EnumSet;
import java.util.HashMap;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.stream.Stream;
import org.apache.commons.collections.CollectionUtils;
import org.apache.flink.api.common.JobStatus;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.StateBackendOptions;
import org.apache.flink.core.execution.JobClient;
import org.apache.flink.core.execution.SavepointFormatType;
import org.apache.flink.runtime.jobgraph.RestoreMode;
import org.apache.flink.runtime.jobgraph.SavepointRestoreSettings;
import org.apache.flink.runtime.testutils.CommonTestUtils;
import org.apache.flink.table.api.CompiledPlan;
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.PlanReference;
import org.apache.flink.table.api.TableEnvironment;
import org.apache.flink.table.api.TableResult;
import org.apache.flink.table.api.config.TableConfigOptions;
import org.apache.flink.table.planner.factories.TestValuesTableFactory;
import org.apache.flink.table.planner.plan.nodes.exec.ExecNode;
import org.apache.flink.table.planner.plan.nodes.exec.ExecNodeMetadata;
import org.apache.flink.table.planner.plan.utils.ExecNodeMetadataUtil;
import org.apache.flink.table.test.program.SinkTestStep;
import org.apache.flink.table.test.program.SourceTestStep;
import org.apache.flink.table.test.program.SqlTestStep;
import org.apache.flink.table.test.program.StatementSetTestStep;
import org.apache.flink.table.test.program.TableTestProgram;
import org.apache.flink.table.test.program.TableTestProgramRunner;
import org.apache.flink.table.test.program.TestStep;
import org.apache.flink.test.junit5.MiniClusterExtension;
import org.assertj.core.api.Assertions;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.Disabled;
import org.junit.jupiter.api.MethodOrderer;
import org.junit.jupiter.api.Order;
import org.junit.jupiter.api.TestInstance;
import org.junit.jupiter.api.TestMethodOrder;
import org.junit.jupiter.api.extension.ExtendWith;
import org.junit.jupiter.api.io.TempDir;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.Arguments;
import org.junit.jupiter.params.provider.MethodSource;

/**
 * Base class for savepoint restore tests of a single {@link ExecNode} implementation.
 *
 * <p>The class offers two parameterized test methods:
 *
 * <ul>
 *   <li>{@link #generateTestSetupFiles(TableTestProgram)} (disabled by default) compiles the
 *       program's plan, writes it below {@code src/test/resources/restore-tests}, runs the job on
 *       the "before restore" data, stops it with a savepoint and moves that savepoint next to the
 *       plan.
 *   <li>{@code testRestore} loads the stored plan, restores from the stored savepoint, feeds the
 *       "after restore" data and verifies that every sink holds the expected "before restore" plus
 *       "after restore" rows.
 * </ul>
 *
 * <p>{@code testRestore} is executed once per combination of supported program and
 * {@link ExecNodeMetadata} annotation found on the exec node under test.
 */
@ExtendWith(MiniClusterExtension.class)
@TestInstance(TestInstance.Lifecycle.PER_CLASS)
@TestMethodOrder(MethodOrderer.OrderAnnotation.class)
public abstract class RestoreTestBase implements TableTestProgramRunner {

    /** Exec node class whose {@link ExecNodeMetadata} annotations select plan/savepoint folders. */
    private final Class<? extends ExecNode<?>> execNodeUnderTest;

    /** Controls how the sources behave, and how results are verified, after the restore. */
    private final AfterRestoreSource afterRestoreSource;

    /** Scratch directory that receives the savepoint before it is moved into the resources. */
    @TempDir
    private Path tmpDir;

    /**
     * Creates a restore test whose sources are {@link AfterRestoreSource#FINITE} after the restore.
     *
     * @param execNodeUnderTest exec node class under test
     */
    protected RestoreTestBase(Class<? extends ExecNode<?>> execNodeUnderTest) {
        this(execNodeUnderTest, AfterRestoreSource.FINITE);
    }

    /**
     * Creates a restore test with an explicit source behavior for the restore phase.
     *
     * @param execNodeUnderTest exec node class under test
     * @param state source behavior to use in {@code testRestore}
     */
    protected RestoreTestBase(
            Class<? extends ExecNode<?>> execNodeUnderTest, AfterRestoreSource state) {
        this.execNodeUnderTest = execNodeUnderTest;
        this.afterRestoreSource = state;
    }

    /** Returns the setup step kinds that this runner knows how to apply. */
    public EnumSet<TestStep.TestKind> supportedSetupSteps() {
        return EnumSet.of(
                TestStep.TestKind.CONFIG,
                TestStep.TestKind.FUNCTION,
                TestStep.TestKind.TEMPORAL_FUNCTION,
                TestStep.TestKind.SOURCE_WITH_RESTORE_DATA,
                TestStep.TestKind.SINK_WITH_RESTORE_DATA);
    }

    /** Returns the run step kinds that this runner knows how to execute. */
    public EnumSet<TestStep.TestKind> supportedRunSteps() {
        return EnumSet.of(TestStep.TestKind.SQL, TestStep.TestKind.STATEMENT_SET);
    }

    /** Clears all data registered with {@link TestValuesTableFactory} after each test. */
    @AfterEach
    public void clearData() {
        TestValuesTableFactory.clearAllData();
    }

    /** Returns every {@link ExecNodeMetadata} annotation declared on the exec node under test. */
    private List<ExecNodeMetadata> getAllMetadata() {
        return ExecNodeMetadataUtil.extractMetadataFromAnnotation(this.execNodeUnderTest);
    }

    /** Returns the latest {@link ExecNodeMetadata} annotation of the exec node under test. */
    private ExecNodeMetadata getLatestMetadata() {
        return ExecNodeMetadataUtil.latestAnnotation(this.execNodeUnderTest);
    }

    /**
     * Argument provider for {@code testRestore}: one {@code (program, metadata)} pair for every
     * supported program under every metadata annotation.
     */
    private Stream<Arguments> createSpecs() {
        return getAllMetadata().stream()
                .flatMap(
                        metadata ->
                                supportedPrograms().stream()
                                        .map(program -> Arguments.of(program, metadata)));
    }

    /**
     * Registers an observer on the given sink table and adds a future to {@code futures} that is
     * completed once the sink's contents equal the expected rows (as a multiset, order ignored).
     *
     * @param futures collection that receives the newly created completion future
     * @param sinkTestStep sink whose table is observed
     * @param ignoreAfter if {@code true}, only the "before restore" rows are expected; otherwise
     *     the "before restore" and "after restore" rows together
     */
    private void registerSinkObserver(
            List<CompletableFuture<?>> futures, SinkTestStep sinkTestStep, boolean ignoreAfter) {
        final CompletableFuture<Object> future = new CompletableFuture<>();
        futures.add(future);
        final String tableName = sinkTestStep.name;
        // The callback arguments are not used; the sink's current contents are re-read through
        // getExpectedResults(...) on every invocation.
        TestValuesTableFactory.registerLocalRawResultsObserver(
                tableName,
                (unusedInt, unusedStrings) -> {
                    final List<String> expectedRows =
                            new ArrayList<>(sinkTestStep.getExpectedBeforeRestoreAsStrings());
                    if (!ignoreAfter) {
                        expectedRows.addAll(sinkTestStep.getExpectedAfterRestoreAsStrings());
                    }
                    final List<String> actualRows = getExpectedResults(sinkTestStep, tableName);
                    if (CollectionUtils.isEqualCollection(actualRows, expectedRows)) {
                        future.complete(null);
                    }
                });
    }

    /**
     * Generates the plan file and the savepoint that {@code testRestore} later consumes.
     *
     * <p>The method is {@link Disabled} and therefore does not run unless the annotation is
     * removed. Files are written for the latest metadata annotation only.
     *
     * @param program test program to compile and run
     * @throws Exception if compiling, executing, stopping the job or moving the savepoint fails
     */
    @Disabled
    @ParameterizedTest
    @MethodSource("supportedPrograms")
    @Order(0)
    public void generateTestSetupFiles(TableTestProgram program) throws Exception {
        final EnvironmentSettings settings = EnvironmentSettings.inStreamingMode();
        settings.getConfiguration().set(StateBackendOptions.STATE_BACKEND, "rocksdb");
        final TableEnvironment tEnv = TableEnvironment.create(settings);
        program.getSetupConfigOptionTestSteps().forEach(s -> s.apply(tEnv));
        tEnv.getConfig()
                .set(
                        TableConfigOptions.PLAN_COMPILE_CATALOG_OBJECTS,
                        TableConfigOptions.CatalogPlanCompilation.SCHEMA);

        for (SourceTestStep sourceTestStep : program.getSetupSourceTestSteps()) {
            final String id = TestValuesTableFactory.registerData(sourceTestStep.dataBeforeRestore);
            final HashMap<String, String> options = new HashMap<>();
            options.put("connector", "values");
            options.put("data-id", id);
            // Non-terminating source: the job must still be running when the savepoint is taken.
            options.put("terminating", "false");
            options.put("runtime-source", "NewSource");
            sourceTestStep.apply(tEnv, options);
        }

        final List<CompletableFuture<?>> futures = new ArrayList<>();
        for (SinkTestStep sinkTestStep : program.getSetupSinkTestSteps()) {
            // Only the "before restore" rows can show up in this phase.
            registerSinkObserver(futures, sinkTestStep, true);
            final HashMap<String, String> options = new HashMap<>();
            options.put("connector", "values");
            options.put("sink-insert-only", "false");
            sinkTestStep.apply(tEnv, options);
        }

        program.getSetupFunctionTestSteps().forEach(s -> s.apply(tEnv));
        program.getSetupTemporalFunctionTestSteps().forEach(s -> s.apply(tEnv));

        final CompiledPlan compiledPlan;
        if (program.runSteps.get(0).getKind() == TestStep.TestKind.STATEMENT_SET) {
            final StatementSetTestStep statementSetTestStep = program.getRunStatementSetTestStep();
            compiledPlan = statementSetTestStep.compiledPlan(tEnv);
        } else {
            final SqlTestStep sqlTestStep = program.getRunSqlTestStep();
            compiledPlan = tEnv.compilePlanSql(sqlTestStep.sql);
        }
        compiledPlan.writeToFile(getPlanPath(program, getLatestMetadata()));

        final TableResult tableResult = compiledPlan.execute();
        // Block until every sink has received its expected "before restore" rows.
        CompletableFuture.allOf(futures.toArray(new CompletableFuture[0])).get();

        final JobClient jobClient = tableResult.getJobClient().get();
        final String savepoint =
                jobClient
                        .stopWithSavepoint(false, this.tmpDir.toString(), SavepointFormatType.DEFAULT)
                        .get();
        CommonTestUtils.waitForJobStatus(jobClient, Collections.singletonList(JobStatus.FINISHED));

        // Relocate the savepoint from the temp directory into the test resources.
        final Path savepointPath = Paths.get(new URI(savepoint));
        final Path savepointDirPath = getSavepointPath(program, getLatestMetadata());
        Files.createDirectories(savepointDirPath);
        Files.move(savepointPath, savepointDirPath, StandardCopyOption.ATOMIC_MOVE);
    }

    /**
     * Restores the stored plan from the stored savepoint and verifies the sink contents.
     *
     * <p>With {@link AfterRestoreSource#INFINITE} the sources are non-terminating, so the test
     * waits for the sink observers to report the expected rows and then cancels the job. With
     * {@link AfterRestoreSource#FINITE} the test awaits job completion and asserts afterwards.
     *
     * @param program test program to restore
     * @param metadata metadata annotation selecting the plan and savepoint to restore from
     * @throws Exception if loading the plan or executing the job fails
     */
    @ParameterizedTest
    @MethodSource("createSpecs")
    @Order(1)
    void testRestore(TableTestProgram program, ExecNodeMetadata metadata) throws Exception {
        final boolean infiniteSource = this.afterRestoreSource == AfterRestoreSource.INFINITE;

        final EnvironmentSettings settings = EnvironmentSettings.inStreamingMode();
        final SavepointRestoreSettings restoreSettings =
                SavepointRestoreSettings.forPath(
                        getSavepointPath(program, metadata).toString(),
                        false,
                        RestoreMode.NO_CLAIM);
        SavepointRestoreSettings.toConfiguration(restoreSettings, settings.getConfiguration());
        settings.getConfiguration().set(StateBackendOptions.STATE_BACKEND, "rocksdb");
        final TableEnvironment tEnv = TableEnvironment.create(settings);
        tEnv.getConfig()
                .set(
                        TableConfigOptions.PLAN_RESTORE_CATALOG_OBJECTS,
                        TableConfigOptions.CatalogPlanRestore.IDENTIFIER);
        program.getSetupConfigOptionTestSteps().forEach(s -> s.apply(tEnv));

        for (SourceTestStep sourceTestStep : program.getSetupSourceTestSteps()) {
            final String id = TestValuesTableFactory.registerData(sourceTestStep.dataAfterRestore);
            final HashMap<String, String> options = new HashMap<>();
            options.put("connector", "values");
            options.put("data-id", id);
            options.put("runtime-source", "NewSource");
            if (infiniteSource) {
                options.put("terminating", "false");
            }
            sourceTestStep.apply(tEnv, options);
        }

        final List<CompletableFuture<?>> futures = new ArrayList<>();
        for (SinkTestStep sinkTestStep : program.getSetupSinkTestSteps()) {
            if (infiniteSource) {
                // The job never finishes on its own, so completion is detected via observers.
                registerSinkObserver(futures, sinkTestStep, false);
            }
            final HashMap<String, String> options = new HashMap<>();
            options.put("connector", "values");
            options.put("disable-lookup", "true");
            options.put("sink-insert-only", "false");
            sinkTestStep.apply(tEnv, options);
        }

        program.getSetupFunctionTestSteps().forEach(s -> s.apply(tEnv));
        program.getSetupTemporalFunctionTestSteps().forEach(s -> s.apply(tEnv));

        final CompiledPlan compiledPlan =
                tEnv.loadPlan(PlanReference.fromFile(getPlanPath(program, metadata)));

        if (infiniteSource) {
            final TableResult tableResult = compiledPlan.execute();
            CompletableFuture.allOf(futures.toArray(new CompletableFuture[0])).get();
            tableResult.getJobClient().get().cancel().get();
        } else {
            compiledPlan.execute().await();
            for (SinkTestStep sinkTestStep : program.getSetupSinkTestSteps()) {
                final List<String> actualResults =
                        getExpectedResults(sinkTestStep, sinkTestStep.name);
                final String[] expectedResults =
                        Stream.concat(
                                        sinkTestStep.getExpectedBeforeRestoreAsStrings().stream(),
                                        sinkTestStep.getExpectedAfterRestoreAsStrings().stream())
                                .toArray(String[]::new);
                Assertions.assertThat(actualResults).containsExactlyInAnyOrder(expectedResults);
            }
        }
    }

    /** Returns the location of the compiled plan JSON for the given program and metadata. */
    private Path getPlanPath(TableTestProgram program, ExecNodeMetadata metadata) {
        return Paths.get(
                getTestResourceDirectory(program, metadata) + "/plan/" + program.id + ".json");
    }

    /** Returns the savepoint directory for the given program and metadata. */
    private Path getSavepointPath(TableTestProgram program, ExecNodeMetadata metadata) {
        return Paths.get(getTestResourceDirectory(program, metadata) + "/savepoint/");
    }

    /**
     * Builds the per-program resource directory below the {@code user.dir} system property, using
     * the metadata name and version plus the program id as path segments.
     */
    private String getTestResourceDirectory(TableTestProgram program, ExecNodeMetadata metadata) {
        return String.format(
                "%s/src/test/resources/restore-tests/%s_%d/%s",
                System.getProperty("user.dir"), metadata.name(), metadata.version(), program.id);
    }

    /**
     * Reads the rows currently held by the given sink table. Despite the method's name, this is
     * the actual sink content that callers compare against the step's expected rows.
     *
     * @param sinkTestStep sink step; decides between raw and regular results
     * @param tableName name of the sink table to read
     * @return raw results if the step tests changelog data, otherwise the regular results
     */
    private static List<String> getExpectedResults(SinkTestStep sinkTestStep, String tableName) {
        if (sinkTestStep.getTestChangelogData()) {
            return TestValuesTableFactory.getRawResultsAsStrings(tableName);
        }
        return TestValuesTableFactory.getResultsAsStrings(tableName);
    }

    /** Behavior of the sources during the restore phase. */
    protected enum AfterRestoreSource {
        /** Sources use their default termination; the test awaits the job, then asserts. */
        FINITE,
        /** Sources are non-terminating; the test waits on sink observers, then cancels the job. */
        INFINITE
    }
}

