/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.table.planner.plan.rules.physical.stream;

import org.apache.flink.api.common.time.Time;
import org.apache.flink.table.api.ExplainDetail;
import org.apache.flink.table.api.StatementSet;
import org.apache.flink.table.api.config.OptimizerConfigOptions;
import org.apache.flink.table.planner.plan.optimize.RelNodeBlockPlanBuilder$;
import org.apache.flink.table.planner.utils.AggregatePhaseStrategy;
import org.apache.flink.table.planner.utils.StreamTableTestUtil;
import org.apache.flink.table.planner.utils.TableTestBase;
import org.junit.Before;
import org.junit.Test;
import scala.Predef$;
import scala.collection.Seq;
import scala.collection.immutable.StringOps;
import scala.reflect.ScalaSignature;
import scala.runtime.BoxesRunTime;

@ScalaSignature(bytes="\u0006\u0001Y4A!\u0001\u0002\u0001+\tQ2\t[1oO\u0016dwnZ'pI\u0016LeNZ3sK:\u001cW\rV3ti*\u00111\u0001B\u0001\u0007gR\u0014X-Y7\u000b\u0005\u00151\u0011\u0001\u00039isNL7-\u00197\u000b\u0005\u001dA\u0011!\u0002:vY\u0016\u001c(BA\u0005\u000b\u0003\u0011\u0001H.\u00198\u000b\u0005-a\u0011a\u00029mC:tWM\u001d\u0006\u0003\u001b9\tQ\u0001^1cY\u0016T!a\u0004\t\u0002\u000b\u0019d\u0017N\\6\u000b\u0005E\u0011\u0012AB1qC\u000eDWMC\u0001\u0014\u0003\ry'oZ\u0002\u0001'\t\u0001a\u0003\u0005\u0002\u001855\t\u0001D\u0003\u0002\u001a\u0015\u0005)Q\u000f^5mg&\u00111\u0004\u0007\u0002\u000e)\u0006\u0014G.\u001a+fgR\u0014\u0015m]3\t\u000bu\u0001A\u0011\u0001\u0010\u0002\rqJg.\u001b;?)\u0005y\u0002C\u0001\u0011\u0001\u001b\u0005\u0011\u0001b\u0002\u0012\u0001\u0005\u0004%IaI\u0001\u0005kRLG.F\u0001%!\t9R%\u0003\u0002'1\t\u00192\u000b\u001e:fC6$\u0016M\u00197f)\u0016\u001cH/\u0016;jY\"1\u0001\u0006\u0001Q\u0001\n\u0011\nQ!\u001e;jY\u0002BQA\u000b\u0001\u0005\u0002-\naAY3g_J,G#\u0001\u0017\u0011\u00055\u0002T\"\u0001\u0018\u000b\u0003=\nQa]2bY\u0006L!!\r\u0018\u0003\tUs\u0017\u000e\u001e\u0015\u0003SM\u0002\"\u0001N\u001c\u000e\u0003UR!A\u000e\n\u0002\u000b),h.\u001b;\n\u0005a*$A\u0002\"fM>\u0014X\rC\u0003;\u0001\u0011\u00051&\u0001\u0006uKN$8+\u001a7fGRD#!\u000f\u001f\u0011\u0005Qj\u0014B\u0001 6\u0005\u0011!Vm\u001d;\t\u000b\u0001\u0003A\u0011A\u0016\u0002'Q,7\u000f^(oK2+g/\u001a7He>,\bOQ=)\u0005}b\u0004\"B\"\u0001\t\u0003Y\u0013!\t;fgR$vo\u001c'fm\u0016dwI]8va\nKHj\\2bY\u001ecwNY1m\u001f\u001a4\u0007F\u0001\"=\u0011\u00151\u0005\u0001\"\u0001,\u0003\u0001\"Xm\u001d;Uo>dUM^3m\u000fJ|W\u000f\u001d\"z\u0019>\u001c\u0017\r\\$m_\n\fGn\u00148)\u0005\u0015c\u0004\"B%\u0001\t\u0003Y\u0013a\t;fgR$V-\u001c9pe\u0006d'j\\5o/&$\b\u000eR3ekBd\u0017nY1uKZKWm\u001e\u0015\u0003\u0011rBQ\u0001\u0014\u0001\u0005\u0002-\nQ\u0004^3tiR+W\u000e]8sC2Tu.\u001b8XSRD7\t[1oO\u0016dwn\u001a\u0015\u0003\u0017rBQa\u0014\u0001\u0005\u0002-\n!\u0006^3tiR+W\u000e]8sC2Tu.\u001b8XSRDgj\u001c8FcV\fGnQ8oI&$\u0018n\u001c8P].+\u0017\u0010\u000b\u0002Oy!)!\u000b\u0001C\u0001W\u00059C/Z:u)\u0016l\u0007o\u001c:bY*{\u0017N\\,ji\",\u0015/^1m\u0007>tG-\u001b;j_:|enS3zQ\t\tF\bC\u0003V\u0001\u0011\u00051&A\u0013uKN$H+Z7q_J\fGNS8j]^KG\u000f\u001b(p]\u0016\u000bX/\u00197D_:$\u0017\u000e^5p]\"\u0012A\u000b\u0010\u0005\u00061\u0002!\taK\u0001\u0015i\u0016\u001cHo\u0012:pkB\u0014\u0015pV5uQVs\u0017n\u001c8)\u0005]c\u0004\"B.\u0001\t\u0003Y\u0013!\u000b;fgR\u0004&o\u001c9bO\u0006$X-\u00169eCR,7*\u001b8e\u00036|gn\u001a*fY:{G-\u001a\"m_\u000e\\7\u000f\u000b\u0002[y!)a\f\u0001C\u0001?\u0006aC/Z:u\u000b2LW.\u001b8bi\u0016\u001c\u0005.\u00198hK2|wMT8s[\u0006d\u0017N_3e\u001f:,\u0006o]3siNKgn[\u000b\u0002Y!\u0012Q\f\u0010\u0005\u0006E\u0002!\taX\u0001+i\u0016\u001cHoS3fa\u000eC\u0017M\\4fY><gj\u001c:nC2L'0\u001a3P]:{g.\u00169tKJ$8+\u001b8lQ\t\tG\bC\u0003f\u0001\u0011\u00051&\u0001\u0017uKN$X\t\\5nS:\fG/Z\"iC:<W\r\\8h\u001d>\u0014X.\u00197ju\u0016$wJ\\+qg\u0016\u0014HOS8j]\"\u0012A\r\u0010\u0005\u0006Q\u0002!\taK\u0001+i\u0016\u001cHoS3fa\u000eC\u0017M\\4fY><gj\u001c:nC2L'0\u001a3P]:{g.\u00169tKJ$(j\\5oQ\t9G\bC\u0003l\u0001\u0011%A.\u0001\u001avaN,'\u000f^'b]\u0006<W\r\u001a+bE2,w+\u001b;i\u0007\"\fgnZ3m_\u001etuN]7bY&TX\rV3ti>s7+\u001b8l)\taS\u000eC\u0003oU\u0002\u0007q.\u0001\u0005jgV\u00038/\u001a:u!\ti\u0003/\u0003\u0002r]\t9!i\\8mK\u0006t\u0007\"B:\u0001\t\u0013!\u0018AM;qg\u0016\u0014H/T1oC\u001e,G\rV1cY\u0016<\u0016\u000e\u001e5DQ\u0006tw-\u001a7pO:{'/\\1mSj,G+Z:u\u001f:Tu.\u001b8\u0015\u00051*\b\"\u00028s\u0001\u0004y\u0007")
public class ChangelogModeInferenceTest
extends TableTestBase {
    private final StreamTableTestUtil util = this.streamTestUtil(this.streamTestUtil$default$1());

    private StreamTableTestUtil util() {
        return this.util;
    }

    @Before
    public void before() {
        this.util().addTable(new StringOps(Predef$.MODULE$.augmentString("\n                    |CREATE TABLE MyTable (\n                    | word STRING,\n                    | number INT\n                    |) WITH (\n                    | 'connector' = 'COLLECTION',\n                    | 'is-bounded' = 'false'\n                    |)\n      ")).stripMargin());
        this.util().addTable(new StringOps(Predef$.MODULE$.augmentString("\n                    |CREATE TABLE Orders (\n                    | amount INT,\n                    | currency STRING,\n                    | rowtime TIMESTAMP(3),\n                    | proctime AS PROCTIME(),\n                    | WATERMARK FOR rowtime AS rowtime\n                    |) WITH (\n                    | 'connector' = 'COLLECTION',\n                    | 'is-bounded' = 'false'\n                    |)\n      ")).stripMargin());
        this.util().addTable(new StringOps(Predef$.MODULE$.augmentString("\n                    |CREATE TABLE ratesHistory (\n                    | currency STRING,\n                    | rate INT,\n                    | rowtime TIMESTAMP(3),\n                    | WATERMARK FOR rowtime AS rowtime\n                    |) WITH (\n                    |  'connector' = 'COLLECTION',\n                    |  'is-bounded' = 'false',\n                    |  'changelog-mode' = 'I'\n                    |)\n      ")).stripMargin());
        this.util().addTable(" CREATE VIEW DeduplicatedView AS SELECT currency, rate, rowtime FROM   (SELECT *,           ROW_NUMBER() OVER (PARTITION BY currency ORDER BY rowtime DESC) AS rowNum    FROM ratesHistory  ) T   WHERE rowNum = 1");
        this.util().addTable(new StringOps(Predef$.MODULE$.augmentString("\n                    |CREATE TABLE ratesChangelogStream (\n                    | currency STRING,\n                    | rate INT,\n                    | rowtime TIMESTAMP(3),\n                    | WATERMARK FOR rowtime as rowtime,\n                    | PRIMARY KEY(currency) NOT ENFORCED\n                    |) WITH (\n                    |  'connector' = 'values',\n                    |  'changelog-mode' = 'I,UA,UB,D'\n                    |)\n      ")).stripMargin());
        this.util().addTable(new StringOps(Predef$.MODULE$.augmentString("\n                    |CREATE TABLE upsert_managed_table (\n                    | id INT,\n                    | col1 INT,\n                    | col2 STRING,\n                    | PRIMARY KEY(id) NOT ENFORCED\n                    |) WITH (\n                    |  'changelog-mode' = 'I,UA,D'\n                    |)\n      ")).stripMargin());
        this.util().addTable(new StringOps(Predef$.MODULE$.augmentString("\n                    |CREATE TABLE upsert_sink_table (\n                    | id INT,\n                    | col1 INT,\n                    | col2 STRING,\n                    | PRIMARY KEY(id) NOT ENFORCED\n                    |) WITH (\n                    |  'connector' = 'values',\n                    |  'sink-changelog-mode-enforced' = 'I,UA,D'\n                    |)\n      ")).stripMargin());
        this.util().addTable(new StringOps(Predef$.MODULE$.augmentString("\n                    |CREATE TABLE all_change_sink_table (\n                    | id INT,\n                    | col1 INT,\n                    | col2 STRING,\n                    | PRIMARY KEY(id) NOT ENFORCED\n                    |) WITH (\n                    |  'connector' = 'values',\n                    |  'sink-changelog-mode-enforced' = 'I,UA,UB,D'\n                    |)\n      ")).stripMargin());
    }

    @Test
    public void testSelect() {
        this.util().verifyRelPlan("SELECT word, number FROM MyTable", (Seq<ExplainDetail>)Predef$.MODULE$.wrapRefArray((Object[])new ExplainDetail[]{ExplainDetail.CHANGELOG_MODE}));
    }

    @Test
    public void testOneLevelGroupBy() {
        this.util().verifyRelPlan("SELECT COUNT(number) FROM MyTable GROUP BY word", (Seq<ExplainDetail>)Predef$.MODULE$.wrapRefArray((Object[])new ExplainDetail[]{ExplainDetail.CHANGELOG_MODE}));
    }

    @Test
    public void testTwoLevelGroupByLocalGlobalOff() {
        String sql = new StringOps(Predef$.MODULE$.augmentString("\n        |SELECT cnt, COUNT(cnt) AS frequency FROM (\n        |  SELECT word, COUNT(number) as cnt FROM MyTable GROUP BY word\n        |) GROUP BY cnt\n      ")).stripMargin();
        this.util().verifyRelPlan(sql, (Seq<ExplainDetail>)Predef$.MODULE$.wrapRefArray((Object[])new ExplainDetail[]{ExplainDetail.CHANGELOG_MODE}));
    }

    @Test
    public void testTwoLevelGroupByLocalGlobalOn() {
        this.util().enableMiniBatch();
        this.util().tableEnv().getConfig().setIdleStateRetentionTime(Time.hours((long)1L), Time.hours((long)2L));
        this.util().tableEnv().getConfig().set(OptimizerConfigOptions.TABLE_OPTIMIZER_AGG_PHASE_STRATEGY, (Object)AggregatePhaseStrategy.TWO_PHASE.toString());
        String sql = new StringOps(Predef$.MODULE$.augmentString("\n        |SELECT cnt, COUNT(cnt) AS frequency FROM (\n        |  SELECT word, COUNT(number) as cnt FROM MyTable GROUP BY word\n        |) GROUP BY cnt\n      ")).stripMargin();
        this.util().verifyRelPlan(sql, (Seq<ExplainDetail>)Predef$.MODULE$.wrapRefArray((Object[])new ExplainDetail[]{ExplainDetail.CHANGELOG_MODE}));
    }

    @Test
    public void testTemporalJoinWithDeduplicateView() {
        String sql = new StringOps(Predef$.MODULE$.augmentString("\n        |SELECT * FROM Orders AS o\n        | JOIN DeduplicatedView FOR SYSTEM_TIME AS OF o.rowtime AS r\n        | ON o.currency = r.currency\n      ")).stripMargin();
        this.util().verifyRelPlan(sql, (Seq<ExplainDetail>)Predef$.MODULE$.wrapRefArray((Object[])new ExplainDetail[]{ExplainDetail.CHANGELOG_MODE}));
    }

    @Test
    public void testTemporalJoinWithChangelog() {
        String sql = new StringOps(Predef$.MODULE$.augmentString("\n        |SELECT * FROM Orders AS o\n        | JOIN ratesChangelogStream FOR SYSTEM_TIME AS OF o.rowtime AS r\n        | ON o.currency = r.currency\n      ")).stripMargin();
        this.util().verifyRelPlan(sql, (Seq<ExplainDetail>)Predef$.MODULE$.wrapRefArray((Object[])new ExplainDetail[]{ExplainDetail.CHANGELOG_MODE}));
    }

    @Test
    public void testTemporalJoinWithNonEqualConditionOnKey() {
        String sql = new StringOps(Predef$.MODULE$.augmentString("\n        |SELECT * FROM Orders AS o\n        | JOIN ratesChangelogStream FOR SYSTEM_TIME AS OF o.rowtime AS r\n        | ON o.currency = r.currency and o.currency < 5\n      ")).stripMargin();
        this.util().verifyRelPlan(sql, (Seq<ExplainDetail>)Predef$.MODULE$.wrapRefArray((Object[])new ExplainDetail[]{ExplainDetail.CHANGELOG_MODE}));
    }

    @Test
    public void testTemporalJoinWithEqualConditionOnKey() {
        String sql = new StringOps(Predef$.MODULE$.augmentString("\n        |SELECT * FROM Orders AS o\n        | JOIN ratesChangelogStream FOR SYSTEM_TIME AS OF o.rowtime AS r\n        | ON o.currency = r.currency and o.currency = 5\n      ")).stripMargin();
        this.util().verifyRelPlan(sql, (Seq<ExplainDetail>)Predef$.MODULE$.wrapRefArray((Object[])new ExplainDetail[]{ExplainDetail.CHANGELOG_MODE}));
    }

    @Test
    public void testTemporalJoinWithNonEqualCondition() {
        String sql = new StringOps(Predef$.MODULE$.augmentString("\n        |SELECT * FROM Orders AS o\n        | JOIN ratesChangelogStream FOR SYSTEM_TIME AS OF o.rowtime AS r\n        | ON o.currency = r.currency and o.amount > 5 and r.rate > 100\n      ")).stripMargin();
        this.util().verifyRelPlan(sql, (Seq<ExplainDetail>)Predef$.MODULE$.wrapRefArray((Object[])new ExplainDetail[]{ExplainDetail.CHANGELOG_MODE}));
    }

    @Test
    public void testGroupByWithUnion() {
        this.util().addTable(new StringOps(Predef$.MODULE$.augmentString("\n                    |CREATE TABLE MyTable2 (\n                    | word STRING,\n                    | cnt INT\n                    |) WITH (\n                    | 'connector' = 'COLLECTION',\n                    | 'is-bounded' = 'false'\n                    |)\n      ")).stripMargin());
        String sql = new StringOps(Predef$.MODULE$.augmentString("\n        |SELECT cnt, COUNT(cnt) AS frequency FROM (\n        |   SELECT word, COUNT(number) AS cnt FROM MyTable GROUP BY word\n        |   UNION ALL\n        |   SELECT word, cnt FROM MyTable2\n        |) GROUP BY cnt\n      ")).stripMargin();
        this.util().verifyRelPlan(sql, (Seq<ExplainDetail>)Predef$.MODULE$.wrapRefArray((Object[])new ExplainDetail[]{ExplainDetail.CHANGELOG_MODE}));
    }

    @Test
    public void testPropagateUpdateKindAmongRelNodeBlocks() {
        this.util().tableEnv().getConfig().set(RelNodeBlockPlanBuilder$.MODULE$.TABLE_OPTIMIZER_REUSE_OPTIMIZE_BLOCK_WITH_DIGEST_ENABLED(), (Object)BoxesRunTime.boxToBoolean((boolean)true));
        this.util().addTable(new StringOps(Predef$.MODULE$.augmentString("\n                    |create table sink1 (\n                    |  a INT,\n                    |  b VARCHAR\n                    |) with (\n                    |  'connector' = 'values',\n                    |  'sink-insert-only' = 'false'\n                    |)\n                    |")).stripMargin());
        this.util().addTable(new StringOps(Predef$.MODULE$.augmentString("\n                    |create table sink2 (\n                    |  a INT,\n                    |  b VARCHAR,\n                    |  primary key (b) not enforced\n                    |) with (\n                    |  'connector' = 'values',\n                    |  'sink-insert-only' = 'false'\n                    |)\n                    |")).stripMargin());
        this.util().tableEnv().executeSql(new StringOps(Predef$.MODULE$.augmentString("\n                               |CREATE VIEW v1 AS\n                               |SELECT\n                               |  SUM(number) AS number, word\n                               |FROM MyTable\n                               |GROUP BY word\n                               |")).stripMargin());
        this.util().tableEnv().executeSql(new StringOps(Predef$.MODULE$.augmentString("\n                               |CREATE VIEW v2 AS\n                               |SELECT number + 1 AS number, word FROM v1\n                               |UNION ALL\n                               |SELECT number - 1 AS number, word FROM v1\n                               |")).stripMargin());
        StatementSet statementSet = this.util().tableEnv().createStatementSet();
        statementSet.addInsertSql(new StringOps(Predef$.MODULE$.augmentString("\n                                |INSERT INTO sink1 SELECT number, word FROM v2 WHERE word > 'a'\n                                |")).stripMargin());
        statementSet.addInsertSql(new StringOps(Predef$.MODULE$.augmentString("\n                                |INSERT INTO sink1 SELECT number, word FROM v2 WHERE word < 'a'\n                                |")).stripMargin());
        statementSet.addInsertSql(new StringOps(Predef$.MODULE$.augmentString("\n                                |INSERT INTO sink2 SELECT * FROM v1\n                                |")).stripMargin());
        this.util().verifyRelPlan(statementSet, (Seq<ExplainDetail>)Predef$.MODULE$.wrapRefArray((Object[])new ExplainDetail[]{ExplainDetail.CHANGELOG_MODE}));
    }

    @Test
    public void testEliminateChangelogNormalizedOnUpsertSink() {
        this.upsertManagedTableWithChangelogNormalizeTestOnSink(true);
    }

    @Test
    public void testKeepChangelogNormalizedOnNonUpsertSink() {
        this.upsertManagedTableWithChangelogNormalizeTestOnSink(false);
    }

    @Test
    public void testEliminateChangelogNormalizedOnUpsertJoin() {
        this.upsertManagedTableWithChangelogNormalizeTestOnJoin(true);
    }

    @Test
    public void testKeepChangelogNormalizedOnNonUpsertJoin() {
        this.upsertManagedTableWithChangelogNormalizeTestOnJoin(false);
    }

    private void upsertManagedTableWithChangelogNormalizeTestOnSink(boolean isUpsert) {
        String sinkTableName = isUpsert ? "upsert_sink_table" : "all_change_sink_table";
        String sql = new StringBuilder(47).append("INSERT INTO ").append(sinkTableName).append(" SELECT * FROM upsert_managed_table").toString();
        this.util().verifyRelPlanInsert(sql, (Seq<ExplainDetail>)Predef$.MODULE$.wrapRefArray((Object[])new ExplainDetail[]{ExplainDetail.CHANGELOG_MODE}));
    }

    private void upsertManagedTableWithChangelogNormalizeTestOnJoin(boolean isUpsert) {
        String sinkTableName = isUpsert ? "upsert_sink_table" : "all_change_sink_table";
        String sql = new StringOps(Predef$.MODULE$.augmentString(new StringBuilder(150).append("\n                 |INSERT INTO ").append(sinkTableName).append(" SELECT a.* FROM upsert_managed_table a\n                 |join upsert_managed_table b on a.id = b.id\n                 |").toString())).stripMargin();
        this.util().verifyRelPlanInsert(sql, (Seq<ExplainDetail>)Predef$.MODULE$.wrapRefArray((Object[])new ExplainDetail[]{ExplainDetail.CHANGELOG_MODE}));
    }
}

