/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.table.planner.runtime.utils;

import java.io.File;
import java.io.IOException;
import java.nio.file.Path;
import java.util.Collections;
import java.util.List;
import org.apache.flink.table.api.TableEnvironment;
import org.apache.flink.table.api.TableException;
import org.apache.flink.table.api.config.TableConfigOptions;
import org.apache.flink.table.connector.sink.abilities.SupportsStaging;
import org.apache.flink.table.planner.factories.TestSupportsStagingTableFactory;
import org.apache.flink.table.planner.factories.utils.TestCollectionTableFactory;
import org.apache.flink.test.junit5.MiniClusterExtension;
import org.apache.flink.types.Row;
import org.apache.flink.util.FileUtils;
import org.assertj.core.api.AbstractThrowableAssert;
import org.assertj.core.api.Assertions;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.RegisterExtension;
import org.junit.jupiter.api.io.TempDir;

/**
 * Base class for integration tests of atomic {@code REPLACE TABLE AS} and {@code CREATE OR REPLACE
 * TABLE AS} statements.
 *
 * <p>Subclasses supply the {@link TableEnvironment} under test via {@link #getTableEnvironment()}.
 * Before each test a source table {@code t1} backed by the {@code COLLECTION} connector is
 * registered with the single row {@code (1, "ZM")}. The tests write through the {@code
 * test-staging} connector and then inspect the static {@code JOB_STATUS_CHANGE_PROCESS} and {@code
 * STAGING_PURPOSE_LIST} collections of {@link TestSupportsStagingTableFactory}, which are cleared
 * again after each test.
 */
public abstract class AtomicRtasITCaseBase {
    @RegisterExtension
    private static final MiniClusterExtension MINI_CLUSTER_EXTENSION = new MiniClusterExtension();

    /** Table environment used by the current test; re-created in {@link #setup()}. */
    protected TableEnvironment tEnv;

    /**
     * Creates the table environment the tests run against.
     *
     * @return the {@link TableEnvironment} to use for one test
     */
    protected abstract TableEnvironment getTableEnvironment();

    /** Obtains a fresh table environment and registers the source table {@code t1}. */
    @BeforeEach
    void setup() {
        tEnv = getTableEnvironment();
        List<Row> sourceData = Collections.singletonList(Row.of(1, "ZM"));
        TestCollectionTableFactory.reset();
        TestCollectionTableFactory.initData(sourceData);
        tEnv.executeSql("create table t1(a int, b varchar) with ('connector' = 'COLLECTION')");
    }

    /** Clears the static bookkeeping of the staging test factory so tests do not leak state. */
    @AfterEach
    void clean() {
        TestSupportsStagingTableFactory.JOB_STATUS_CHANGE_PROCESS.clear();
        TestSupportsStagingTableFactory.STAGING_PURPOSE_LIST.clear();
    }

    @Test
    void testAtomicReplaceTableAs(@TempDir Path temporaryFolder) throws Exception {
        commonTestForAtomicReplaceTableAs(
                "atomic_replace_table", false, true, temporaryFolder.toFile());
    }

    @Test
    void testAtomicReplaceTableAsWithReplacedTableNotExists(@TempDir Path temporaryFolder)
            throws Exception {
        commonTestForAtomicReplaceTableAs(
                "atomic_replace_table_not_exists", false, false, temporaryFolder.toFile());
    }

    @Test
    void testAtomicCreateOrReplaceTableAs(@TempDir Path temporaryFolder) throws Exception {
        commonTestForAtomicReplaceTableAs(
                "atomic_create_or_replace_table", true, true, temporaryFolder.toFile());
    }

    @Test
    void testAtomicCreateOrReplaceTableAsWithReplacedTableNotExists(@TempDir Path temporaryFolder)
            throws Exception {
        commonTestForAtomicReplaceTableAs(
                "atomic_create_or_replace_table_not_exists",
                true,
                false,
                temporaryFolder.toFile());
    }

    /**
     * Runs an atomic (CREATE OR) REPLACE TABLE AS statement and verifies its outcome.
     *
     * @param tableName name of the table to replace
     * @param isCreateOrReplace {@code true} for {@code CREATE OR REPLACE TABLE}, {@code false} for
     *     plain {@code REPLACE TABLE}
     * @param isCreateReplacedTable whether the table to be replaced is created up front
     * @param tmpDataFolder directory passed to the sink as its {@code data-dir} option
     * @throws Exception if awaiting the job or reading the data file fails
     */
    private void commonTestForAtomicReplaceTableAs(
            String tableName,
            boolean isCreateOrReplace,
            boolean isCreateReplacedTable,
            File tmpDataFolder)
            throws Exception {
        if (isCreateReplacedTable) {
            tEnv.executeSql("create table " + tableName + " (a int) with ('connector' = 'PRINT')");
        }
        tEnv.getConfig().set(TableConfigOptions.TABLE_RTAS_CTAS_ATOMICITY_ENABLED, true);

        String dataDir = tmpDataFolder.getAbsolutePath();
        String sql =
                getCreateOrReplaceSqlFragment(isCreateOrReplace, tableName)
                        + " with ('connector' = 'test-staging', 'data-dir' = '"
                        + dataDir
                        + "') as select * from t1";

        // A plain REPLACE TABLE AS against a missing table is expected to be rejected.
        if (!isCreateOrReplace && !isCreateReplacedTable) {
            Assertions.assertThatThrownBy(() -> tEnv.executeSql(sql))
                    .isInstanceOf(TableException.class)
                    .hasMessage(
                            "The table `default_catalog`.`default_database`.`"
                                    + tableName
                                    + "` to be replaced doesn't exist. You can try to use CREATE TABLE AS statement or CREATE OR REPLACE TABLE AS statement.");
            return;
        }

        tEnv.executeSql(sql).await();

        // The table is listed only if it was created before the statement ran.
        // NOTE(review): presumably the test-staging sink never registers the table in the
        // catalog itself - confirm against TestSupportsStagingTableFactory.
        if (isCreateReplacedTable) {
            Assertions.assertThat(tEnv.listTables()).contains(tableName);
        } else {
            Assertions.assertThat(tEnv.listTables()).doesNotContain(tableName);
        }

        verifyDataFile(dataDir, "data");
        Assertions.assertThat(TestSupportsStagingTableFactory.JOB_STATUS_CHANGE_PROCESS)
                .hasSize(2);
        Assertions.assertThat(TestSupportsStagingTableFactory.JOB_STATUS_CHANGE_PROCESS)
                .contains("begin", "commit");
        Assertions.assertThat(TestSupportsStagingTableFactory.STAGING_PURPOSE_LIST).hasSize(1);

        SupportsStaging.StagingPurpose expectedPurpose =
                isCreateOrReplace
                        ? SupportsStaging.StagingPurpose.CREATE_OR_REPLACE_TABLE_AS
                        : SupportsStaging.StagingPurpose.REPLACE_TABLE_AS;
        Assertions.assertThat(TestSupportsStagingTableFactory.STAGING_PURPOSE_LIST)
                .contains(expectedPurpose);
    }

    @Test
    void testAtomicReplaceTableAsWithException(@TempDir Path temporaryFolder) {
        commonTestForAtomicReplaceTableAsWithException(
                "atomic_replace_table_fail", false, temporaryFolder.toFile());
    }

    @Test
    void testAtomicCreateOrReplaceTableAsWithException(@TempDir Path temporaryFolder) {
        commonTestForAtomicReplaceTableAsWithException(
                "atomic_create_or_replace_table_fail", true, temporaryFolder.toFile());
    }

    /**
     * Runs an atomic (CREATE OR) REPLACE TABLE AS statement with the sink option {@code
     * 'sink-fail' = 'true'} and verifies that the recorded job status changes are {@code begin}
     * and {@code abort}.
     *
     * @param tableName name of the pre-created table to replace
     * @param isCreateOrReplace {@code true} for {@code CREATE OR REPLACE TABLE}, {@code false} for
     *     plain {@code REPLACE TABLE}
     * @param tmpDataFolder directory passed to the sink as its {@code data-dir} option
     */
    private void commonTestForAtomicReplaceTableAsWithException(
            String tableName, boolean isCreateOrReplace, File tmpDataFolder) {
        tEnv.executeSql("create table " + tableName + " (a int) with ('connector' = 'PRINT')");
        tEnv.getConfig().set(TableConfigOptions.TABLE_RTAS_CTAS_ATOMICITY_ENABLED, true);

        String dataDir = tmpDataFolder.getAbsolutePath();
        String sql =
                getCreateOrReplaceSqlFragment(isCreateOrReplace, tableName)
                        + " with ('connector' = 'test-staging', 'data-dir' = '"
                        + dataDir
                        + "', 'sink-fail' = 'true') as select * from t1";

        Assertions.assertThatCode(() -> tEnv.executeSql(sql).await())
                .hasRootCauseMessage("Test StagedTable abort method.");
        Assertions.assertThat(TestSupportsStagingTableFactory.JOB_STATUS_CHANGE_PROCESS)
                .hasSize(2);
        Assertions.assertThat(TestSupportsStagingTableFactory.JOB_STATUS_CHANGE_PROCESS)
                .contains("begin", "abort");
    }

    @Test
    void testWithoutAtomicReplaceTableAs(@TempDir Path temporaryFolder) throws Exception {
        commonTestForWithoutAtomicReplaceTableAs(
                "non_atomic_replace_table", false, temporaryFolder.toFile());
    }

    @Test
    void testWithoutAtomicCreateOrReplaceTableAs(@TempDir Path temporaryFolder) throws Exception {
        commonTestForWithoutAtomicReplaceTableAs(
                "non_atomic_create_or_replace_table", true, temporaryFolder.toFile());
    }

    /**
     * Runs a (CREATE OR) REPLACE TABLE AS statement with atomicity disabled and verifies that the
     * table is listed, the {@code _data} file holds the source row, and nothing was recorded in the
     * staging factory's collections.
     *
     * @param tableName name of the pre-created table to replace
     * @param isCreateOrReplace {@code true} for {@code CREATE OR REPLACE TABLE}, {@code false} for
     *     plain {@code REPLACE TABLE}
     * @param tmpDataFolder directory passed to the sink as its {@code data-dir} option
     * @throws Exception if awaiting the job or reading the data file fails
     */
    private void commonTestForWithoutAtomicReplaceTableAs(
            String tableName, boolean isCreateOrReplace, File tmpDataFolder) throws Exception {
        tEnv.executeSql("create table " + tableName + " (a int) with ('connector' = 'PRINT')");
        tEnv.getConfig().set(TableConfigOptions.TABLE_RTAS_CTAS_ATOMICITY_ENABLED, false);

        String dataDir = tmpDataFolder.getAbsolutePath();
        String sql =
                getCreateOrReplaceSqlFragment(isCreateOrReplace, tableName)
                        + " with ('connector' = 'test-staging', 'data-dir' = '"
                        + dataDir
                        + "') as select * from t1";
        tEnv.executeSql(sql).await();

        Assertions.assertThat(tEnv.listTables()).contains(tableName);
        // With atomicity disabled the expected file name is "_data" rather than "data".
        verifyDataFile(dataDir, "_data");
        Assertions.assertThat(TestSupportsStagingTableFactory.JOB_STATUS_CHANGE_PROCESS).isEmpty();
        Assertions.assertThat(TestSupportsStagingTableFactory.STAGING_PURPOSE_LIST).isEmpty();
    }

    /**
     * Asserts that {@code fileName} exists as a regular file in {@code dataDir} and that its UTF-8
     * content equals the single source row rendered as {@code "1,ZM"}.
     *
     * @param dataDir directory expected to contain the file
     * @param fileName name of the file inside {@code dataDir}
     * @throws IOException if the file cannot be read
     */
    private void verifyDataFile(String dataDir, String fileName) throws IOException {
        File dataFile = new File(dataDir, fileName);
        Assertions.assertThat(dataFile).exists();
        Assertions.assertThat(dataFile).isFile();
        Assertions.assertThat(FileUtils.readFileUtf8(dataFile)).isEqualTo("1,ZM");
    }

    /**
     * Builds the leading part of the statement under test.
     *
     * @param isCreateOrReplace selects {@code create or replace table} over {@code replace table}
     * @param tableName name of the target table
     * @return the statement prefix, including a leading space, followed by the table name
     */
    private String getCreateOrReplaceSqlFragment(boolean isCreateOrReplace, String tableName) {
        String keyword = isCreateOrReplace ? " create or replace table " : " replace table ";
        return keyword + tableName;
    }
}

