/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.table.api;

import java.io.File;
import java.io.IOException;
import java.net.URI;
import java.nio.charset.Charset;
import java.nio.charset.StandardCharsets;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.util.Arrays;
import java.util.List;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import org.apache.commons.io.FileUtils;
import org.apache.flink.FlinkVersion;
import org.apache.flink.core.testutils.FlinkAssertions;
import org.apache.flink.table.api.CompiledPlan;
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.ExplainDetail;
import org.apache.flink.table.api.Expressions;
import org.apache.flink.table.api.PlanReference;
import org.apache.flink.table.api.TableEnvironment;
import org.apache.flink.table.api.TableException;
import org.apache.flink.table.api.TableResult;
import org.apache.flink.table.api.config.ExecutionConfigOptions;
import org.apache.flink.table.api.config.TableConfigOptions;
import org.apache.flink.table.api.internal.TableResultInternal;
import org.apache.flink.table.expressions.Expression;
import org.apache.flink.table.planner.utils.JsonPlanTestBase;
import org.apache.flink.table.planner.utils.JsonTestUtils;
import org.apache.flink.table.planner.utils.TableTestUtil;
import org.apache.flink.testutils.junit.utils.TempDirUtils;
import org.assertj.core.api.AbstractThrowableAssert;
import org.assertj.core.api.Assertions;
import org.assertj.core.api.ThrowingConsumer;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;

class CompiledPlanITCase
extends JsonPlanTestBase {
    private static final List<String> DATA = Arrays.asList("1,1,hi", "2,1,hello", "3,2,hello world");
    private static final String[] COLUMNS_DEFINITION = new String[]{"a bigint", "b int", "c varchar"};

    CompiledPlanITCase() {
    }

    @Override
    @BeforeEach
    protected void setup() throws Exception {
        super.setup();
        String srcTableDdl = "CREATE TABLE MyTable (\n" + String.join((CharSequence)",", COLUMNS_DEFINITION) + ") with (\n  'connector' = 'values',\n  'bounded' = 'false')";
        this.tableEnv.executeSql(srcTableDdl);
        String sinkTableDdl = "CREATE TABLE MySink (\n" + String.join((CharSequence)",", COLUMNS_DEFINITION) + ") with (\n  'connector' = 'values',\n  'table-sink-class' = 'DEFAULT')";
        this.tableEnv.executeSql(sinkTableDdl);
    }

    @Test
    void testCompilePlanSql() throws IOException {
        CompiledPlan compiledPlan = this.tableEnv.compilePlanSql("INSERT INTO MySink SELECT * FROM MyTable");
        String expected = TableTestUtil.readFromResource("/jsonplan/testGetJsonPlan.out");
        Assertions.assertThat((String)TableTestUtil.replaceExecNodeId(TableTestUtil.replaceFlinkVersion(TableTestUtil.getFormattedJson(compiledPlan.asJsonString())))).isEqualTo(TableTestUtil.replaceExecNodeId(TableTestUtil.replaceFlinkVersion(TableTestUtil.getFormattedJson(expected))));
    }

    @Test
    void testExecutePlanSql() throws Exception {
        File sinkPath = this.createSourceSinkTables();
        this.tableEnv.compilePlanSql("INSERT INTO sink SELECT * FROM src").execute().await();
        this.assertResult(DATA, sinkPath);
    }

    @Test
    void testExecuteCtasPlanSql() throws Exception {
        this.createTestCsvSourceTable("src", DATA, COLUMNS_DEFINITION);
        File sinkPath = TempDirUtils.newFolder((Path)this.tempFolder);
        Assertions.assertThatThrownBy(() -> this.tableEnv.compilePlanSql(String.format("CREATE TABLE sink\nWITH (\n  'connector' = 'filesystem',\n  'format' = 'testcsv',\n  'path' = '%s'\n) AS SELECT * FROM src", sinkPath.getAbsolutePath())).execute()).satisfies(new ThrowingConsumer[]{FlinkAssertions.anyCauseMatches(TableException.class, (String)"Unsupported SQL query! compilePlanSql() only accepts a single SQL statement of type INSERT")});
    }

    @Test
    void testExecutePlanTable() throws Exception {
        File sinkPath = this.createSourceSinkTables();
        this.tableEnv.from("src").select(new Expression[]{Expressions.$((String)"*")}).insertInto("sink").compilePlan().execute().await();
        this.assertResult(DATA, sinkPath);
    }

    @Test
    void testCompileWriteToFileAndThenExecuteSql() throws Exception {
        Path planPath = Paths.get(TempDirUtils.newFolder((Path)this.tempFolder, (String[])new String[]{"plan"}).getPath(), "plan.json");
        File sinkPath = this.createSourceSinkTables();
        CompiledPlan plan = this.tableEnv.compilePlanSql("INSERT INTO sink SELECT * FROM src");
        plan.writeToFile(planPath);
        this.tableEnv.executeSql(String.format("EXECUTE PLAN '%s'", planPath.toAbsolutePath())).await();
        this.assertResult(DATA, sinkPath);
    }

    @Test
    void testCompileWriteToFilePathWithSchemeAndThenExecuteSql() throws Exception {
        Path planPath = Paths.get(TempDirUtils.newFolder((Path)this.tempFolder, (String[])new String[]{"plan"}).getPath(), "plan.json");
        File sinkPath = this.createSourceSinkTables();
        this.tableEnv.executeSql(String.format("COMPILE PLAN 'file://%s' FOR INSERT INTO sink SELECT * FROM src", planPath.toAbsolutePath()));
        this.tableEnv.executeSql(String.format("EXECUTE PLAN 'file://%s'", planPath.toAbsolutePath())).await();
        this.assertResult(DATA, sinkPath);
    }

    @Test
    void testCompilePlan() throws Exception {
        Path planPath = Paths.get(TempDirUtils.newFolder((Path)this.tempFolder, (String[])new String[]{"plan"}).getPath(), "plan.json").toAbsolutePath();
        File sinkPath = this.createSourceSinkTables();
        TableResult tableResult = this.tableEnv.executeSql(String.format("COMPILE PLAN '%s' FOR INSERT INTO sink SELECT * FROM src", planPath));
        Assertions.assertThat((Object)tableResult).isEqualTo((Object)TableResultInternal.TABLE_RESULT_OK);
        Assertions.assertThat((File)planPath.toFile()).exists();
        Assertions.assertThatThrownBy(() -> this.tableEnv.executeSql(String.format("COMPILE PLAN '%s' FOR INSERT INTO sink SELECT * FROM src", planPath))).satisfies(new ThrowingConsumer[]{FlinkAssertions.anyCauseMatches(TableException.class, (String)"Cannot overwrite the plan file")});
        this.tableEnv.executeSql(String.format("EXECUTE PLAN '%s'", planPath)).await();
        this.assertResult(DATA, sinkPath);
    }

    @Test
    void testCompilePlanWithStatementSet() throws Exception {
        Path planPath = Paths.get(TempDirUtils.newFolder((Path)this.tempFolder, (String[])new String[]{"plan"}).getPath(), "plan.json").toAbsolutePath();
        this.createTestCsvSourceTable("src", DATA, COLUMNS_DEFINITION);
        File sinkAPath = this.createTestCsvSinkTable("sinkA", COLUMNS_DEFINITION);
        File sinkBPath = this.createTestCsvSinkTable("sinkB", COLUMNS_DEFINITION);
        TableResult tableResult = this.tableEnv.executeSql(String.format("COMPILE PLAN '%s' FOR STATEMENT SET BEGIN INSERT INTO sinkA SELECT * FROM src;INSERT INTO sinkB SELECT a + 1, b + 1, CONCAT(c, '-something') FROM src;END", planPath));
        Assertions.assertThat((Object)tableResult).isEqualTo((Object)TableResultInternal.TABLE_RESULT_OK);
        Assertions.assertThat((File)planPath.toFile()).exists();
        this.tableEnv.executeSql(String.format("EXECUTE PLAN '%s'", planPath)).await();
        this.assertResult(DATA, sinkAPath);
        this.assertResult(Arrays.asList("2,2,hi-something", "3,2,hello-something", "4,3,hello world-something"), sinkBPath);
    }

    @Test
    void testCompilePlanIfNotExists() throws Exception {
        Path planPath = Paths.get(TempDirUtils.newFolder((Path)this.tempFolder, (String[])new String[]{"plan"}).getPath(), "plan.json").toAbsolutePath();
        File sinkPath = this.createSourceSinkTables();
        TableResult tableResult = this.tableEnv.executeSql(String.format("COMPILE PLAN '%s' IF NOT EXISTS FOR INSERT INTO sink SELECT * FROM src", planPath));
        Assertions.assertThat((Object)tableResult).isEqualTo((Object)TableResultInternal.TABLE_RESULT_OK);
        Assertions.assertThat((File)planPath.toFile()).exists();
        Assertions.assertThat((Object)this.tableEnv.executeSql(String.format("COMPILE PLAN '%s' IF NOT EXISTS FOR INSERT INTO sink SELECT a + 1, b + 1, CONCAT(c, '-something') FROM src", planPath))).isEqualTo((Object)TableResultInternal.TABLE_RESULT_OK);
        this.tableEnv.executeSql(String.format("EXECUTE PLAN '%s'", planPath)).await();
        this.assertResult(DATA, sinkPath);
    }

    @Test
    void testCompilePlanOverwrite() throws Exception {
        this.tableEnv.getConfig().set(TableConfigOptions.PLAN_FORCE_RECOMPILE, (Object)true);
        Path planPath = Paths.get(URI.create(TempDirUtils.newFolder((Path)this.tempFolder, (String[])new String[]{"plan"}).getPath()).getPath(), "plan.json").toAbsolutePath();
        List<String> expectedData = Arrays.asList("2,2,hi-something", "3,2,hello-something", "4,3,hello world-something");
        File sinkPath = this.createSourceSinkTables();
        TableResult tableResult = this.tableEnv.executeSql(String.format("COMPILE PLAN '%s' FOR INSERT INTO sink SELECT IF(a > b, a, b) AS a, b + 1 AS b, SUBSTR(c, 1, 4) AS c FROM src WHERE a > 10", planPath));
        Assertions.assertThat((Object)tableResult).isEqualTo((Object)TableResultInternal.TABLE_RESULT_OK);
        Assertions.assertThat((File)planPath.toFile()).exists();
        Assertions.assertThat((Object)this.tableEnv.executeSql(String.format("COMPILE PLAN '%s' FOR INSERT INTO sink SELECT a + 1, b + 1, CONCAT(c, '-something') FROM src", planPath))).isEqualTo((Object)TableResultInternal.TABLE_RESULT_OK);
        Assertions.assertThat((boolean)TableTestUtil.isValidJson(FileUtils.readFileToString((File)planPath.toFile(), (Charset)StandardCharsets.UTF_8))).isTrue();
        this.tableEnv.executeSql(String.format("EXECUTE PLAN '%s'", planPath)).await();
        this.assertResult(expectedData, sinkPath);
    }

    @Test
    void testCompileAndExecutePlan() throws Exception {
        Path planPath = Paths.get(TempDirUtils.newFolder((Path)this.tempFolder, (String[])new String[]{"plan"}).getPath(), "plan.json").toAbsolutePath();
        File sinkPath = this.createSourceSinkTables();
        this.tableEnv.executeSql(String.format("COMPILE AND EXECUTE PLAN '%s' FOR INSERT INTO sink SELECT * FROM src", planPath)).await();
        Assertions.assertThat((File)planPath.toFile()).exists();
        this.assertResult(DATA, sinkPath);
    }

    @Test
    void testCompileAndExecutePlanWithStatementSet() throws Exception {
        Path planPath = Paths.get(TempDirUtils.newFolder((Path)this.tempFolder, (String[])new String[]{"plan"}).getPath(), "plan.json").toAbsolutePath();
        this.createTestCsvSourceTable("src", DATA, COLUMNS_DEFINITION);
        File sinkAPath = this.createTestCsvSinkTable("sinkA", COLUMNS_DEFINITION);
        File sinkBPath = this.createTestCsvSinkTable("sinkB", COLUMNS_DEFINITION);
        this.tableEnv.executeSql(String.format("COMPILE AND EXECUTE PLAN '%s' FOR STATEMENT SET BEGIN INSERT INTO sinkA SELECT * FROM src;INSERT INTO sinkB SELECT a + 1, b + 1, CONCAT(c, '-something') FROM src;END", planPath)).await();
        Assertions.assertThat((File)planPath.toFile()).exists();
        this.assertResult(DATA, sinkAPath);
        this.assertResult(Arrays.asList("2,2,hi-something", "3,2,hello-something", "4,3,hello world-something"), sinkBPath);
    }

    @Test
    void testExplainPlan() throws IOException {
        String planFromResources = JsonTestUtils.setFlinkVersion(JsonTestUtils.readFromResource("/jsonplan/testGetJsonPlan.out"), FlinkVersion.current()).toString();
        String actual = this.tableEnv.loadPlan(PlanReference.fromJsonString((String)planFromResources)).explain(new ExplainDetail[]{ExplainDetail.JSON_EXECUTION_PLAN});
        String expected = TableTestUtil.readFromResource("/explain/testExplainJsonPlan.out");
        Assertions.assertThat((String)TableTestUtil.replaceNodeIdInOperator(TableTestUtil.replaceStreamNodeId(actual))).isEqualTo(expected);
    }

    @Test
    void testPersistedConfigOption() throws Exception {
        List<String> data = Stream.concat(DATA.stream(), Stream.of("4,2,This string is long", "5,3,This is an even longer string")).collect(Collectors.toList());
        String[] sinkColumnDefinitions = new String[]{"a bigint", "b int", "c varchar(11)"};
        this.createTestCsvSourceTable("src", data, COLUMNS_DEFINITION);
        File sinkPath = this.createTestCsvSinkTable("sink", sinkColumnDefinitions);
        this.tableEnv.getConfig().getConfiguration().set(ExecutionConfigOptions.TABLE_EXEC_SINK_TYPE_LENGTH_ENFORCER, (Object)ExecutionConfigOptions.TypeLengthEnforcer.TRIM_PAD);
        CompiledPlan plan = this.tableEnv.compilePlanSql("INSERT INTO sink SELECT * FROM src");
        this.tableEnv.getConfig().getConfiguration().set(ExecutionConfigOptions.TABLE_EXEC_SINK_TYPE_LENGTH_ENFORCER, (Object)ExecutionConfigOptions.TypeLengthEnforcer.IGNORE);
        plan.execute().await();
        List<String> expected = Stream.concat(DATA.stream(), Stream.of("4,2,This string", "5,3,This is an ")).collect(Collectors.toList());
        this.assertResult(expected, sinkPath);
    }

    @Test
    void testBatchMode() {
        this.tableEnv = TableEnvironment.create((EnvironmentSettings)EnvironmentSettings.inBatchMode());
        String srcTableDdl = "CREATE TABLE src (\n  a bigint\n) with (\n  'connector' = 'values',\n  'bounded' = 'true')";
        this.tableEnv.executeSql(srcTableDdl);
        String sinkTableDdl = "CREATE TABLE sink (\n  a bigint\n) with (\n  'connector' = 'values',\n  'table-sink-class' = 'DEFAULT')";
        this.tableEnv.executeSql(sinkTableDdl);
        ((AbstractThrowableAssert)Assertions.assertThatThrownBy(() -> this.tableEnv.compilePlanSql("INSERT INTO sink SELECT * FROM src")).isInstanceOf(UnsupportedOperationException.class)).hasMessage("The compiled plan feature is not supported in batch mode.");
    }

    private File createSourceSinkTables() throws IOException {
        this.createTestCsvSourceTable("src", DATA, COLUMNS_DEFINITION);
        return this.createTestCsvSinkTable("sink", COLUMNS_DEFINITION);
    }
}

