/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.table.planner.analyze;

import org.apache.flink.table.api.ExplainDetail;
import org.apache.flink.table.api.StatementSet;
import org.apache.flink.table.api.TableConfig;
import org.apache.flink.table.functions.UserDefinedFunction;
import org.apache.flink.table.planner.expressions.utils.TestNonDeterministicUdf;
import org.apache.flink.table.planner.utils.PlanKind;
import org.apache.flink.table.planner.utils.StreamTableTestUtil;
import org.apache.flink.table.planner.utils.TableTestBase;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import scala.Enumeration;
import scala.Function0;
import scala.runtime.BoxedUnit;

class NonDeterministicUpdateAnalyzerTest
extends TableTestBase {
    private final StreamTableTestUtil util = this.streamTestUtil(TableConfig.getDefault());

    NonDeterministicUpdateAnalyzerTest() {
    }

    @BeforeEach
    void before() {
        this.util.getTableEnv().executeSql("create temporary table cdc (\n  a int,\n  b bigint,\n  c string,\n  d bigint,\n  primary key (a) not enforced\n) with (\n  'connector' = 'values',\n  'changelog-mode' = 'I,UA,UB,D'\n)");
        this.util.getTableEnv().executeSql("create temporary table cdc_with_meta (\n  a int,\n  b bigint,\n  c string,\n  d boolean,\n  metadata_1 int metadata,\n  metadata_2 string metadata,\n  metadata_3 bigint metadata,\n  primary key (a) not enforced\n) with (\n  'connector' = 'values',\n  'changelog-mode' = 'I,UA,UB,D',\n  'readable-metadata' = 'metadata_1:INT, metadata_2:STRING, metadata_3:BIGINT'\n)");
        this.util.getTableEnv().executeSql("create temporary table sink_with_pk (\n  a int,\n  b bigint,\n  c string,\n  primary key (a) not enforced\n) with (\n  'connector' = 'values',\n  'sink-insert-only' = 'false'\n)");
        this.util.getTableEnv().executeSql("create temporary table sink_without_pk (\n  a int,\n  b bigint,\n  c string\n) with (\n  'connector' = 'values',\n  'sink-insert-only' = 'false'\n)");
        this.util.getTableEnv().executeSql("create temporary table dim_with_pk (\n a int,\n b bigint,\n c string,\n primary key (a) not enforced\n) with (\n 'connector' = 'values'\n)");
        this.util.getTableEnv().createTemporaryFunction("ndFunc", (UserDefinedFunction)new TestNonDeterministicUdf());
    }

    @Test
    void testCdcWithMetaRenameSinkWithCompositePk() {
        this.util.getTableEnv().executeSql("create temporary table cdc_with_meta_rename (\n  a int,\n  b bigint,\n  c string,\n  d boolean,\n  metadata_3 bigint metadata,\n  e as metadata_3,\n  primary key (a) not enforced\n) with (\n  'connector' = 'values',\n  'changelog-mode' = 'I,UA,UB,D',\n  'readable-metadata' = 'metadata_3:BIGINT'\n)");
        this.util.getTableEnv().executeSql("create temporary table sink_with_composite_pk (\n  a int,\n  b bigint,\n  c string,\n  d bigint,\n  primary key (a,d) not enforced\n) with (\n  'connector' = 'values',\n  'sink-insert-only' = 'false'\n)");
        this.util.doVerifyPlanInsert("insert into sink_with_composite_pk\nselect a, b, c, e from cdc_with_meta_rename", new ExplainDetail[]{ExplainDetail.PLAN_ADVICE}, false, new Enumeration.Value[]{PlanKind.OPT_REL_WITH_ADVICE()});
    }

    @Test
    void testSourceWithComputedColumnSinkWithPk() {
        this.util.getTableEnv().executeSql("create temporary table cdc_with_computed_col (\n  a int,\n  b bigint,\n  c string,\n  d int,\n  `day` as DATE_FORMAT(CURRENT_TIMESTAMP, 'yyMMdd'),\n  primary key(a, c) not enforced\n) with (\n 'connector' = 'values',\n 'changelog-mode' = 'I,UA,UB,D'\n)");
        this.util.doVerifyPlanInsert("insert into sink_with_pk\nselect a, b, `day`\nfrom cdc_with_computed_col\nwhere b > 100", new ExplainDetail[]{ExplainDetail.PLAN_ADVICE}, false, new Enumeration.Value[]{PlanKind.OPT_REL_WITH_ADVICE()});
    }

    @Test
    void testCdcJoinDimWithPkNonDeterministicLocalCondition() {
        this.util.doVerifyPlanInsert("insert into sink_without_pk\nselect t1.a, t1.b, t1.c\nfrom (\n  select *, proctime() proctime from cdc\n) t1 join dim_with_pk for system_time as of t1.proctime as t2\non t1.a = t2.a and ndFunc(t2.b) > 100", new ExplainDetail[]{ExplainDetail.PLAN_ADVICE}, false, new Enumeration.Value[]{PlanKind.OPT_REL_WITH_ADVICE()});
    }

    @Test
    void testCdcWithMetaSinkWithPk() {
        this.util.doVerifyPlanInsert("insert into sink_with_pk\nselect a, metadata_3, c\nfrom cdc_with_meta", new ExplainDetail[]{ExplainDetail.PLAN_ADVICE}, false, new Enumeration.Value[]{PlanKind.OPT_REL_WITH_ADVICE()});
    }

    @Test
    void testGroupByNonDeterministicFuncWithCdcSource() {
        this.util.doVerifyPlanInsert("insert into sink_with_pk\nselect\n  a, count(*) cnt, `day`\nfrom (\n  select *, DATE_FORMAT(CURRENT_TIMESTAMP, 'yyMMdd') `day` from cdc\n) t\ngroup by `day`, a", new ExplainDetail[]{ExplainDetail.PLAN_ADVICE}, false, new Enumeration.Value[]{PlanKind.OPT_REL_WITH_ADVICE()});
    }

    @Test
    void testMultiSinkOnJoinedView() {
        this.util.getTableEnv().executeSql("create temporary table src1 (\n  a int,\n  b bigint,\n  c string,\n  d int,\n  primary key(a, c) not enforced\n) with (\n 'connector' = 'values',\n 'changelog-mode' = 'I,UA,UB,D'\n)");
        this.util.getTableEnv().executeSql("create temporary table src2 (\n  a int,\n  b bigint,\n  c string,\n  d int,\n  primary key(a, c) not enforced\n) with (\n 'connector' = 'values',\n 'changelog-mode' = 'I,UA,UB,D'\n)");
        this.util.getTableEnv().executeSql("create temporary table sink1 (\n  a int,\n  b string,\n  c bigint,\n  d bigint\n) with (\n 'connector' = 'values',\n 'sink-insert-only' = 'false'\n)");
        this.util.getTableEnv().executeSql("create temporary table sink2 (\n  a int,\n  b string,\n  c bigint,\n  d string\n) with (\n 'connector' = 'values',\n 'sink-insert-only' = 'false'\n)");
        this.util.getTableEnv().executeSql("create temporary view v1 as\nselect\n  t1.a as a, t1.`day` as `day`, t2.b as b, t2.c as c\nfrom (\n  select a, b, DATE_FORMAT(CURRENT_TIMESTAMP, 'yyMMdd') as `day`\n  from src1\n ) t1\njoin (\n  select b, CONCAT(c, DATE_FORMAT(CURRENT_TIMESTAMP, 'yyMMdd')) as `day`, c, d\n  from src2\n) t2\n on t1.a = t2.d");
        StatementSet stmtSet = this.util.getTableEnv().createStatementSet();
        stmtSet.addInsertSql("insert into sink1\n  select a, `day`, sum(b), count(distinct c)\n  from v1\n  group by a, `day`");
        stmtSet.addInsertSql("insert into sink2\n  select a, `day`, b, c\n  from v1\n  where b > 100");
        this.util.doVerifyPlan(stmtSet, new ExplainDetail[]{ExplainDetail.PLAN_ADVICE}, false, new Enumeration.Value[]{PlanKind.OPT_REL_WITH_ADVICE()}, (Function0<BoxedUnit>)((Function0)() -> BoxedUnit.UNIT), false);
    }

    @Test
    void testCdcJoinDimWithPkOutputNoPkSinkWithoutPk() {
        this.util.doVerifyPlanInsert("insert into sink_without_pk\n  select t1.a, t2.b, t1.c\n  from (\n    select *, proctime() proctime from cdc\n  ) t1 join dim_with_pk for system_time as of t1.proctime as t2\n  on t1.a = t2.a", new ExplainDetail[]{ExplainDetail.PLAN_ADVICE}, false, new Enumeration.Value[]{PlanKind.OPT_REL_WITH_ADVICE()});
    }
}

