/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.table.planner.runtime.utils;

import java.io.File;
import java.io.IOException;
import java.nio.file.Path;
import java.util.Collections;
import java.util.List;
import org.apache.flink.table.api.TableEnvironment;
import org.apache.flink.table.api.config.TableConfigOptions;
import org.apache.flink.table.connector.sink.abilities.SupportsStaging;
import org.apache.flink.table.planner.factories.TestSupportsStagingTableFactory;
import org.apache.flink.table.planner.factories.utils.TestCollectionTableFactory;
import org.apache.flink.test.junit5.MiniClusterExtension;
import org.apache.flink.types.Row;
import org.apache.flink.util.FileUtils;
import org.assertj.core.api.Assertions;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.RegisterExtension;
import org.junit.jupiter.api.io.TempDir;

/**
 * Base class for integration tests of {@code CREATE TABLE ... AS SELECT} (CTAS) executed with
 * and without {@link TableConfigOptions#TABLE_RTAS_CTAS_ATOMICITY_ENABLED}.
 *
 * <p>Subclasses supply the {@link TableEnvironment} to test against via
 * {@link #getTableEnvironment()}. Before each test a source table {@code t1} holding the single
 * row {@code (1, "ZM")} is registered; after each test the static bookkeeping lists of
 * {@link TestSupportsStagingTableFactory} are cleared so that tests do not observe each other's
 * recorded state.
 */
public abstract class AtomicCtasITCaseBase {
    @RegisterExtension
    private static final MiniClusterExtension MINI_CLUSTER_EXTENSION = new MiniClusterExtension();

    /** Table environment under test; assigned afresh in {@link #setup()} before every test. */
    protected TableEnvironment tEnv;

    /**
     * Provides the table environment the tests run against.
     *
     * @return the environment to use for the current test
     */
    protected abstract TableEnvironment getTableEnvironment();

    /** Obtains the table environment and registers the one-row source table {@code t1}. */
    @BeforeEach
    void setup() {
        tEnv = getTableEnvironment();
        List<Row> sourceData = Collections.singletonList(Row.of(1, "ZM"));
        TestCollectionTableFactory.reset();
        TestCollectionTableFactory.initData(sourceData);
        tEnv.executeSql("create table t1(a int, b varchar) with ('connector' = 'COLLECTION')");
    }

    /** Clears the static state recorded by the staging test factory. */
    @AfterEach
    void clean() {
        TestSupportsStagingTableFactory.JOB_STATUS_CHANGE_PROCESS.clear();
        TestSupportsStagingTableFactory.STAGING_PURPOSE_LIST.clear();
    }

    @Test
    void testAtomicCtas(@TempDir Path temporaryFolder) throws Exception {
        commonTestForAtomicCtas("atomic_ctas_table", false, temporaryFolder.toFile());
    }

    @Test
    void testAtomicCtasIfNotExists(@TempDir Path temporaryFolder) throws Exception {
        commonTestForAtomicCtas(
                "atomic_ctas_if_not_exists_table", true, temporaryFolder.toFile());
    }

    /**
     * Runs a CTAS statement with atomicity enabled and checks the recorded outcome.
     *
     * @param tableName name of the table to create
     * @param ifNotExists whether to add the {@code if not exists} clause to the statement
     * @param tmpDataFolder directory handed to the sink through the {@code data-dir} option
     * @throws Exception if executing the statement or reading the result file fails
     */
    private void commonTestForAtomicCtas(String tableName, boolean ifNotExists, File tmpDataFolder)
            throws Exception {
        // Pass the boolean directly: an Object-typed argument does not match a typed option.
        tEnv.getConfig().set(TableConfigOptions.TABLE_RTAS_CTAS_ATOMICITY_ENABLED, true);
        String dataDir = tmpDataFolder.getAbsolutePath();
        String sqlFragment = ifNotExists ? " if not exists " + tableName : tableName;
        tEnv.executeSql(
                        "create table "
                                + sqlFragment
                                + " with ('connector' = 'test-staging', 'data-dir' = '"
                                + dataDir
                                + "') as select * from t1")
                .await();

        // NOTE(review): the table is expected NOT to be listed after an atomic CTAS; presumably
        // the test staging sink never registers it in the catalog - confirm against
        // TestSupportsStagingTableFactory.
        Assertions.assertThat(tEnv.listTables()).doesNotContain(tableName);
        verifyDataFile(dataDir, "data");

        Assertions.assertThat(TestSupportsStagingTableFactory.JOB_STATUS_CHANGE_PROCESS)
                .hasSize(2)
                .contains("begin", "commit");

        SupportsStaging.StagingPurpose expectedPurpose =
                ifNotExists
                        ? SupportsStaging.StagingPurpose.CREATE_TABLE_AS_IF_NOT_EXISTS
                        : SupportsStaging.StagingPurpose.CREATE_TABLE_AS;
        // Exactly one entry, and it is the expected purpose.
        Assertions.assertThat(TestSupportsStagingTableFactory.STAGING_PURPOSE_LIST)
                .containsExactly(expectedPurpose);
    }

    /**
     * Runs an atomic CTAS against a sink configured with {@code 'sink-fail' = 'true'} and checks
     * that the failure surfaces with the expected root-cause message and that "begin" and
     * "abort" were recorded.
     */
    @Test
    void testAtomicCtasWithException(@TempDir Path temporaryFolder) throws Exception {
        tEnv.getConfig().set(TableConfigOptions.TABLE_RTAS_CTAS_ATOMICITY_ENABLED, true);
        String dataDir = temporaryFolder.toFile().getAbsolutePath();
        Assertions.assertThatCode(
                        () ->
                                tEnv.executeSql(
                                                "create table atomic_ctas_table_fail with ('connector' = 'test-staging', 'data-dir' = '"
                                                        + dataDir
                                                        + "', 'sink-fail' = 'true') as select * from t1")
                                        .await())
                .hasRootCauseMessage("Test StagedTable abort method.");
        Assertions.assertThat(TestSupportsStagingTableFactory.JOB_STATUS_CHANGE_PROCESS)
                .hasSize(2)
                .contains("begin", "abort");
    }

    /**
     * Runs CTAS with atomicity disabled: the table must be listed, the data must be in the file
     * {@code _data}, and no job-status change or staging purpose may have been recorded.
     */
    @Test
    void testWithoutAtomicCtas(@TempDir Path temporaryFolder) throws Exception {
        tEnv.getConfig().set(TableConfigOptions.TABLE_RTAS_CTAS_ATOMICITY_ENABLED, false);
        String dataDir = temporaryFolder.toFile().getAbsolutePath();
        tEnv.executeSql(
                        "create table atomic_ctas_table with ('connector' = 'test-staging', 'data-dir' = '"
                                + dataDir
                                + "') as select * from t1")
                .await();
        Assertions.assertThat(tEnv.listTables()).contains("atomic_ctas_table");
        verifyDataFile(dataDir, "_data");
        Assertions.assertThat(TestSupportsStagingTableFactory.JOB_STATUS_CHANGE_PROCESS).isEmpty();
        Assertions.assertThat(TestSupportsStagingTableFactory.STAGING_PURPOSE_LIST).isEmpty();
    }

    /**
     * Asserts that {@code fileName} exists as a regular file inside {@code dataDir} and that its
     * UTF-8 content is exactly {@code "1,ZM"}.
     *
     * @param dataDir directory expected to contain the file
     * @param fileName name of the file to check
     * @throws IOException if the file cannot be read
     */
    private void verifyDataFile(String dataDir, String fileName) throws IOException {
        File dataFile = new File(dataDir, fileName);
        Assertions.assertThat(dataFile).exists().isFile();
        Assertions.assertThat(FileUtils.readFileUtf8(dataFile)).isEqualTo("1,ZM");
    }
}

