/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.connector.file.table;

import org.apache.flink.core.testutils.CommonTestUtils;
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.ExplainDetail;
import org.apache.flink.table.api.TableEnvironment;
import org.apache.flink.table.api.ValidationException;
import org.apache.flink.table.api.config.ExecutionConfigOptions;
import org.apache.flink.table.planner.utils.TableTestUtil;
import org.junit.Assert;
import org.junit.Test;

public class FileSystemTableSinkTest {
    @Test
    public void testExceptionWhenSettingParallelismWithUpdatingQuery() {
        TableEnvironment tEnv = TableEnvironment.create((EnvironmentSettings)EnvironmentSettings.inStreamingMode());
        String testSourceTableName = "test_source_table";
        tEnv.executeSql(FileSystemTableSinkTest.buildSourceTableSql("test_source_table", false));
        String testSinkTableName = "test_sink_table";
        tEnv.executeSql(FileSystemTableSinkTest.buildSinkTableSql("test_sink_table", 10, false));
        String sql = String.format("INSERT INTO %s SELECT DISTINCT * FROM %s", "test_sink_table", "test_source_table");
        CommonTestUtils.assertThrows((String)"filesystem sink doesn't support setting parallelism (10) by 'sink.parallelism' when the input stream is not INSERT only.", ValidationException.class, () -> tEnv.explainSql(sql, new ExplainDetail[0]));
    }

    @Test
    public void testFileSystemTableSinkWithParallelismInStreaming() {
        int parallelism = 5;
        TableEnvironment tEnv = TableEnvironment.create((EnvironmentSettings)EnvironmentSettings.inStreamingMode());
        tEnv.getConfig().set(ExecutionConfigOptions.TABLE_EXEC_RESOURCE_DEFAULT_PARALLELISM, (Object)8);
        String testSourceTableName = "test_source_table";
        tEnv.executeSql(FileSystemTableSinkTest.buildSourceTableSql("test_source_table", false));
        String testSinkTableName = "test_sink_table";
        tEnv.executeSql(FileSystemTableSinkTest.buildSinkTableSql("test_sink_table", 5, false));
        String sql0 = FileSystemTableSinkTest.buildInsertIntoSql("test_sink_table", "test_source_table");
        String actualNormal = tEnv.explainSql(sql0, new ExplainDetail[]{ExplainDetail.JSON_EXECUTION_PLAN});
        String expectedNormal = TableTestUtil.readFromResource("/explain/filesystem/testFileSystemTableSinkWithParallelismInStreamingSql0.out");
        Assert.assertEquals((Object)TableTestUtil.replaceNodeIdInOperator(TableTestUtil.replaceStreamNodeId(TableTestUtil.replaceStageId(expectedNormal))), (Object)TableTestUtil.replaceNodeIdInOperator(TableTestUtil.replaceStreamNodeId(TableTestUtil.replaceStageId(actualNormal))));
        String testCompactSinkTableName = "test_compact_sink_table";
        tEnv.executeSql(FileSystemTableSinkTest.buildSinkTableSql("test_compact_sink_table", 5, true));
        String sql1 = FileSystemTableSinkTest.buildInsertIntoSql("test_compact_sink_table", "test_source_table");
        String actualCompact = tEnv.explainSql(sql1, new ExplainDetail[]{ExplainDetail.JSON_EXECUTION_PLAN});
        String expectedCompact = TableTestUtil.readFromResource("/explain/filesystem/testFileSystemTableSinkWithParallelismInStreamingSql1.out");
        Assert.assertEquals((Object)TableTestUtil.replaceNodeIdInOperator(TableTestUtil.replaceStreamNodeId(TableTestUtil.replaceStageId(expectedCompact))), (Object)TableTestUtil.replaceNodeIdInOperator(TableTestUtil.replaceStreamNodeId(TableTestUtil.replaceStageId(actualCompact))));
    }

    @Test
    public void testFileSystemTableSinkWithParallelismInBatch() {
        int parallelism = 5;
        TableEnvironment tEnv = TableEnvironment.create((EnvironmentSettings)EnvironmentSettings.inBatchMode());
        tEnv.getConfig().set(ExecutionConfigOptions.TABLE_EXEC_RESOURCE_DEFAULT_PARALLELISM, (Object)8);
        String testSourceTableName = "test_source_table";
        String testSinkTableName = "test_sink_table";
        tEnv.executeSql(FileSystemTableSinkTest.buildSourceTableSql("test_source_table", true));
        tEnv.executeSql(FileSystemTableSinkTest.buildSinkTableSql("test_sink_table", 5, false));
        String sql = FileSystemTableSinkTest.buildInsertIntoSql("test_sink_table", "test_source_table");
        String actual = tEnv.explainSql(sql, new ExplainDetail[]{ExplainDetail.JSON_EXECUTION_PLAN});
        String expected = TableTestUtil.readFromResource("/explain/filesystem/testFileSystemTableSinkWithParallelismInBatch.out");
        Assert.assertEquals((Object)TableTestUtil.replaceNodeIdInOperator(TableTestUtil.replaceStreamNodeId(TableTestUtil.replaceStageId(expected))), (Object)TableTestUtil.replaceNodeIdInOperator(TableTestUtil.replaceStreamNodeId(TableTestUtil.replaceStageId(actual))));
    }

    private static String buildSourceTableSql(String testSourceTableName, boolean bounded) {
        return String.format("CREATE TABLE %s ( id BIGINT, real_col FLOAT, double_col DOUBLE, decimal_col DECIMAL(10, 4)) WITH ( 'connector' = 'values', 'bounded' = '%s')", testSourceTableName, bounded);
    }

    private static String buildSinkTableSql(String tableName, int parallelism, boolean autoCompaction) {
        return String.format("CREATE TABLE %s ( id BIGINT, real_col FLOAT, double_col DOUBLE, decimal_col DECIMAL(10, 4)) WITH ( 'connector' = 'filesystem', 'path' = '/tmp', 'auto-compaction' = '%s', 'format' = 'testcsv', 'sink.parallelism' = '%s')", tableName, autoCompaction, parallelism);
    }

    private static String buildInsertIntoSql(String sinkTable, String sourceTable) {
        return String.format("INSERT INTO %s SELECT * FROM %s", sinkTable, sourceTable);
    }
}

