/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.table.planner.plan.rules.physical.stream;

import java.time.Duration;
import org.apache.flink.table.api.ExplainDetail;
import org.apache.flink.table.api.StatementSet;
import org.apache.flink.table.api.config.OptimizerConfigOptions;
import org.apache.flink.table.planner.plan.optimize.RelNodeBlockPlanBuilder$;
import org.apache.flink.table.planner.utils.AggregatePhaseStrategy;
import org.apache.flink.table.planner.utils.StreamTableTestUtil;
import org.apache.flink.table.planner.utils.TableTestBase;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import scala.Predef$;
import scala.collection.Seq;
import scala.collection.immutable.StringOps;
import scala.reflect.ScalaSignature;
import scala.runtime.BoxesRunTime;

@ScalaSignature(bytes="\u0006\u0001i4A!\u0001\u0002\u0001+\tQ2\t[1oO\u0016dwnZ'pI\u0016LeNZ3sK:\u001cW\rV3ti*\u00111\u0001B\u0001\u0007gR\u0014X-Y7\u000b\u0005\u00151\u0011\u0001\u00039isNL7-\u00197\u000b\u0005\u001dA\u0011!\u0002:vY\u0016\u001c(BA\u0005\u000b\u0003\u0011\u0001H.\u00198\u000b\u0005-a\u0011a\u00029mC:tWM\u001d\u0006\u0003\u001b9\tQ\u0001^1cY\u0016T!a\u0004\t\u0002\u000b\u0019d\u0017N\\6\u000b\u0005E\u0011\u0012AB1qC\u000eDWMC\u0001\u0014\u0003\ry'oZ\u0002\u0001'\t\u0001a\u0003\u0005\u0002\u001855\t\u0001D\u0003\u0002\u001a\u0015\u0005)Q\u000f^5mg&\u00111\u0004\u0007\u0002\u000e)\u0006\u0014G.\u001a+fgR\u0014\u0015m]3\t\u000bu\u0001A\u0011\u0001\u0010\u0002\rqJg.\u001b;?)\u0005y\u0002C\u0001\u0011\u0001\u001b\u0005\u0011\u0001b\u0002\u0012\u0001\u0005\u0004%IaI\u0001\u0005kRLG.F\u0001%!\t9R%\u0003\u0002'1\t\u00192\u000b\u001e:fC6$\u0016M\u00197f)\u0016\u001cH/\u0016;jY\"1\u0001\u0006\u0001Q\u0001\n\u0011\nQ!\u001e;jY\u0002BQA\u000b\u0001\u0005\u0002-\naAY3g_J,G#\u0001\u0017\u0011\u00055\u0002T\"\u0001\u0018\u000b\u0003=\nQa]2bY\u0006L!!\r\u0018\u0003\tUs\u0017\u000e\u001e\u0015\u0003SM\u0002\"\u0001N\u001e\u000e\u0003UR!AN\u001c\u0002\u0007\u0005\u0004\u0018N\u0003\u00029s\u00059!.\u001e9ji\u0016\u0014(B\u0001\u001e\u0013\u0003\u0015QWO\\5u\u0013\taTG\u0001\u0006CK\u001a|'/Z#bG\"DQA\u0010\u0001\u0005\u0002-\n!\u0002^3tiN+G.Z2uQ\ti\u0004\t\u0005\u00025\u0003&\u0011!)\u000e\u0002\u0005)\u0016\u001cH\u000fC\u0003E\u0001\u0011\u00051&A\nuKN$xJ\\3MKZ,Gn\u0012:pkB\u0014\u0015\u0010\u000b\u0002D\u0001\")q\t\u0001C\u0001W\u0005\tC/Z:u)^|G*\u001a<fY\u001e\u0013x.\u001e9Cs2{7-\u00197HY>\u0014\u0017\r\\(gM\"\u0012a\t\u0011\u0005\u0006\u0015\u0002!\taK\u0001!i\u0016\u001cH\u000fV<p\u0019\u00164X\r\\$s_V\u0004()\u001f'pG\u0006dw\t\\8cC2|e\u000e\u000b\u0002J\u0001\")Q\n\u0001C\u0001W\u0005\u0019C/Z:u)\u0016l\u0007o\u001c:bY*{\u0017N\\,ji\"$U\rZ;qY&\u001c\u0017\r^3WS\u0016<\bF\u0001'A\u0011\u0015\u0001\u0006\u0001\"\u0001,\u0003u!Xm\u001d;UK6\u0004xN]1m\u0015>LgnV5uQ\u000eC\u0017M\\4fY><\u0007FA(A\u0011\u0015\u0019\u0006\u0001\"\u0001,\u0003)\"Xm\u001d;UK6\u0004xN]1m\u0015>LgnV5uQ:{g.R9vC2\u001cuN\u001c3ji&|gn\u00148LKfD#A\u0015!\t\u000bY\u0003A\u0011A\u0016\u0002OQ,7\u000f\u001e+f[B|'/\u00197K_&tw+\u001b;i\u000bF,\u0018\r\\\"p]\u0012LG/[8o\u001f:\\U-\u001f\u0015\u0003+\u0002CQ!\u0017\u0001\u0005\u0002-\nQ\u0005^3tiR+W\u000e]8sC2Tu.\u001b8XSRDgj\u001c8FcV\fGnQ8oI&$\u0018n\u001c8)\u0005a\u0003\u0005\"\u0002/\u0001\t\u0003Y\u0013\u0001\u0006;fgR<%o\\;q\u0005f<\u0016\u000e\u001e5V]&|g\u000e\u000b\u0002\\\u0001\")q\f\u0001C\u0001W\u0005IC/Z:u!J|\u0007/Y4bi\u0016,\u0006\u000fZ1uK.Kg\u000eZ!n_:<'+\u001a7O_\u0012,'\t\\8dWND#A\u0018!\t\u000b\t\u0004A\u0011A2\u0002YQ,7\u000f^#mS6Lg.\u0019;f\u0007\"\fgnZ3m_\u001etuN]7bY&TX\rZ(o+B\u001cXM\u001d;TS:\\W#\u0001\u0017)\u0005\u0005\u0004\u0005\"\u00024\u0001\t\u0003\u0019\u0017A\u000b;fgR\\U-\u001a9DQ\u0006tw-\u001a7pO:{'/\\1mSj,Gm\u00148O_:,\u0006o]3siNKgn\u001b\u0015\u0003K\u0002CQ!\u001b\u0001\u0005\u0002-\nA\u0006^3ti\u0016c\u0017.\\5oCR,7\t[1oO\u0016dwn\u001a(pe6\fG.\u001b>fI>sW\u000b]:feRTu.\u001b8)\u0005!\u0004\u0005\"\u00027\u0001\t\u0003Y\u0013A\u000b;fgR\\U-\u001a9DQ\u0006tw-\u001a7pO:{'/\\1mSj,Gm\u00148O_:,\u0006o]3si*{\u0017N\u001c\u0015\u0003W\u0002CQa\u001c\u0001\u0005\nA\f!'\u001e9tKJ$X*\u00198bO\u0016$G+\u00192mK^KG\u000f[\"iC:<W\r\\8h\u001d>\u0014X.\u00197ju\u0016$Vm\u001d;P]NKgn\u001b\u000b\u0003YEDQA\u001d8A\u0002M\f\u0001\"[:VaN,'\u000f\u001e\t\u0003[QL!!\u001e\u0018\u0003\u000f\t{w\u000e\\3b]\")q\u000f\u0001C\u0005q\u0006\u0011T\u000f]:feRl\u0015M\\1hK\u0012$\u0016M\u00197f/&$\bn\u00115b]\u001e,Gn\\4O_Jl\u0017\r\\5{KR+7\u000f^(o\u0015>Lg\u000e\u0006\u0002-s\")!O\u001ea\u0001g\u0002")
public class ChangelogModeInferenceTest
extends TableTestBase {
    private final StreamTableTestUtil util = this.streamTestUtil(this.streamTestUtil$default$1());

    private StreamTableTestUtil util() {
        return this.util;
    }

    @BeforeEach
    public void before() {
        this.util().addTable(new StringOps(Predef$.MODULE$.augmentString("\n                    |CREATE TABLE MyTable (\n                    | word STRING,\n                    | number INT\n                    |) WITH (\n                    | 'connector' = 'COLLECTION',\n                    | 'is-bounded' = 'false'\n                    |)\n      ")).stripMargin());
        this.util().addTable(new StringOps(Predef$.MODULE$.augmentString("\n                    |CREATE TABLE Orders (\n                    | amount INT,\n                    | currency STRING,\n                    | rowtime TIMESTAMP(3),\n                    | proctime AS PROCTIME(),\n                    | WATERMARK FOR rowtime AS rowtime\n                    |) WITH (\n                    | 'connector' = 'COLLECTION',\n                    | 'is-bounded' = 'false'\n                    |)\n      ")).stripMargin());
        this.util().addTable(new StringOps(Predef$.MODULE$.augmentString("\n                    |CREATE TABLE ratesHistory (\n                    | currency STRING,\n                    | rate INT,\n                    | rowtime TIMESTAMP(3),\n                    | WATERMARK FOR rowtime AS rowtime\n                    |) WITH (\n                    |  'connector' = 'COLLECTION',\n                    |  'is-bounded' = 'false',\n                    |  'changelog-mode' = 'I'\n                    |)\n      ")).stripMargin());
        this.util().addTable(" CREATE VIEW DeduplicatedView AS SELECT currency, rate, rowtime FROM   (SELECT *,           ROW_NUMBER() OVER (PARTITION BY currency ORDER BY rowtime DESC) AS rowNum    FROM ratesHistory  ) T   WHERE rowNum = 1");
        this.util().addTable(new StringOps(Predef$.MODULE$.augmentString("\n                    |CREATE TABLE ratesChangelogStream (\n                    | currency STRING,\n                    | rate INT,\n                    | rowtime TIMESTAMP(3),\n                    | WATERMARK FOR rowtime as rowtime,\n                    | PRIMARY KEY(currency) NOT ENFORCED\n                    |) WITH (\n                    |  'connector' = 'values',\n                    |  'changelog-mode' = 'I,UA,UB,D'\n                    |)\n      ")).stripMargin());
        this.util().addTable(new StringOps(Predef$.MODULE$.augmentString("\n                    |CREATE TABLE upsert_managed_table (\n                    | id INT,\n                    | col1 INT,\n                    | col2 STRING,\n                    | PRIMARY KEY(id) NOT ENFORCED\n                    |) WITH (\n                    |  'changelog-mode' = 'I,UA,D'\n                    |)\n      ")).stripMargin());
        this.util().addTable(new StringOps(Predef$.MODULE$.augmentString("\n                    |CREATE TABLE upsert_sink_table (\n                    | id INT,\n                    | col1 INT,\n                    | col2 STRING,\n                    | PRIMARY KEY(id) NOT ENFORCED\n                    |) WITH (\n                    |  'connector' = 'values',\n                    |  'sink-changelog-mode-enforced' = 'I,UA,D'\n                    |)\n      ")).stripMargin());
        this.util().addTable(new StringOps(Predef$.MODULE$.augmentString("\n                    |CREATE TABLE all_change_sink_table (\n                    | id INT,\n                    | col1 INT,\n                    | col2 STRING,\n                    | PRIMARY KEY(id) NOT ENFORCED\n                    |) WITH (\n                    |  'connector' = 'values',\n                    |  'sink-changelog-mode-enforced' = 'I,UA,UB,D'\n                    |)\n      ")).stripMargin());
    }

    @Test
    public void testSelect() {
        this.util().verifyRelPlan("SELECT word, number FROM MyTable", (Seq<ExplainDetail>)Predef$.MODULE$.wrapRefArray((Object[])new ExplainDetail[]{ExplainDetail.CHANGELOG_MODE}));
    }

    @Test
    public void testOneLevelGroupBy() {
        this.util().verifyRelPlan("SELECT COUNT(number) FROM MyTable GROUP BY word", (Seq<ExplainDetail>)Predef$.MODULE$.wrapRefArray((Object[])new ExplainDetail[]{ExplainDetail.CHANGELOG_MODE}));
    }

    @Test
    public void testTwoLevelGroupByLocalGlobalOff() {
        String sql = new StringOps(Predef$.MODULE$.augmentString("\n        |SELECT cnt, COUNT(cnt) AS frequency FROM (\n        |  SELECT word, COUNT(number) as cnt FROM MyTable GROUP BY word\n        |) GROUP BY cnt\n      ")).stripMargin();
        this.util().verifyRelPlan(sql, (Seq<ExplainDetail>)Predef$.MODULE$.wrapRefArray((Object[])new ExplainDetail[]{ExplainDetail.CHANGELOG_MODE}));
    }

    @Test
    public void testTwoLevelGroupByLocalGlobalOn() {
        this.util().enableMiniBatch();
        this.util().tableEnv().getConfig().setIdleStateRetention(Duration.ofHours(1L));
        this.util().tableEnv().getConfig().set(OptimizerConfigOptions.TABLE_OPTIMIZER_AGG_PHASE_STRATEGY, (Object)AggregatePhaseStrategy.TWO_PHASE.toString());
        String sql = new StringOps(Predef$.MODULE$.augmentString("\n        |SELECT cnt, COUNT(cnt) AS frequency FROM (\n        |  SELECT word, COUNT(number) as cnt FROM MyTable GROUP BY word\n        |) GROUP BY cnt\n      ")).stripMargin();
        this.util().verifyRelPlan(sql, (Seq<ExplainDetail>)Predef$.MODULE$.wrapRefArray((Object[])new ExplainDetail[]{ExplainDetail.CHANGELOG_MODE}));
    }

    @Test
    public void testTemporalJoinWithDeduplicateView() {
        String sql = new StringOps(Predef$.MODULE$.augmentString("\n        |SELECT * FROM Orders AS o\n        | JOIN DeduplicatedView FOR SYSTEM_TIME AS OF o.rowtime AS r\n        | ON o.currency = r.currency\n      ")).stripMargin();
        this.util().verifyRelPlan(sql, (Seq<ExplainDetail>)Predef$.MODULE$.wrapRefArray((Object[])new ExplainDetail[]{ExplainDetail.CHANGELOG_MODE}));
    }

    @Test
    public void testTemporalJoinWithChangelog() {
        String sql = new StringOps(Predef$.MODULE$.augmentString("\n        |SELECT * FROM Orders AS o\n        | JOIN ratesChangelogStream FOR SYSTEM_TIME AS OF o.rowtime AS r\n        | ON o.currency = r.currency\n      ")).stripMargin();
        this.util().verifyRelPlan(sql, (Seq<ExplainDetail>)Predef$.MODULE$.wrapRefArray((Object[])new ExplainDetail[]{ExplainDetail.CHANGELOG_MODE}));
    }

    @Test
    public void testTemporalJoinWithNonEqualConditionOnKey() {
        String sql = new StringOps(Predef$.MODULE$.augmentString("\n        |SELECT * FROM Orders AS o\n        | JOIN ratesChangelogStream FOR SYSTEM_TIME AS OF o.rowtime AS r\n        | ON o.currency = r.currency and o.currency < 5\n      ")).stripMargin();
        this.util().verifyRelPlan(sql, (Seq<ExplainDetail>)Predef$.MODULE$.wrapRefArray((Object[])new ExplainDetail[]{ExplainDetail.CHANGELOG_MODE}));
    }

    @Test
    public void testTemporalJoinWithEqualConditionOnKey() {
        String sql = new StringOps(Predef$.MODULE$.augmentString("\n        |SELECT * FROM Orders AS o\n        | JOIN ratesChangelogStream FOR SYSTEM_TIME AS OF o.rowtime AS r\n        | ON o.currency = r.currency and o.currency = 5\n      ")).stripMargin();
        this.util().verifyRelPlan(sql, (Seq<ExplainDetail>)Predef$.MODULE$.wrapRefArray((Object[])new ExplainDetail[]{ExplainDetail.CHANGELOG_MODE}));
    }

    @Test
    public void testTemporalJoinWithNonEqualCondition() {
        String sql = new StringOps(Predef$.MODULE$.augmentString("\n        |SELECT * FROM Orders AS o\n        | JOIN ratesChangelogStream FOR SYSTEM_TIME AS OF o.rowtime AS r\n        | ON o.currency = r.currency and o.amount > 5 and r.rate > 100\n      ")).stripMargin();
        this.util().verifyRelPlan(sql, (Seq<ExplainDetail>)Predef$.MODULE$.wrapRefArray((Object[])new ExplainDetail[]{ExplainDetail.CHANGELOG_MODE}));
    }

    @Test
    public void testGroupByWithUnion() {
        this.util().addTable(new StringOps(Predef$.MODULE$.augmentString("\n                    |CREATE TABLE MyTable2 (\n                    | word STRING,\n                    | cnt INT\n                    |) WITH (\n                    | 'connector' = 'COLLECTION',\n                    | 'is-bounded' = 'false'\n                    |)\n      ")).stripMargin());
        String sql = new StringOps(Predef$.MODULE$.augmentString("\n        |SELECT cnt, COUNT(cnt) AS frequency FROM (\n        |   SELECT word, COUNT(number) AS cnt FROM MyTable GROUP BY word\n        |   UNION ALL\n        |   SELECT word, cnt FROM MyTable2\n        |) GROUP BY cnt\n      ")).stripMargin();
        this.util().verifyRelPlan(sql, (Seq<ExplainDetail>)Predef$.MODULE$.wrapRefArray((Object[])new ExplainDetail[]{ExplainDetail.CHANGELOG_MODE}));
    }

    @Test
    public void testPropagateUpdateKindAmongRelNodeBlocks() {
        this.util().tableEnv().getConfig().set(RelNodeBlockPlanBuilder$.MODULE$.TABLE_OPTIMIZER_REUSE_OPTIMIZE_BLOCK_WITH_DIGEST_ENABLED(), (Object)BoxesRunTime.boxToBoolean((boolean)true));
        this.util().addTable(new StringOps(Predef$.MODULE$.augmentString("\n                    |create table sink1 (\n                    |  a INT,\n                    |  b VARCHAR\n                    |) with (\n                    |  'connector' = 'values',\n                    |  'sink-insert-only' = 'false'\n                    |)\n                    |")).stripMargin());
        this.util().addTable(new StringOps(Predef$.MODULE$.augmentString("\n                    |create table sink2 (\n                    |  a INT,\n                    |  b VARCHAR,\n                    |  primary key (b) not enforced\n                    |) with (\n                    |  'connector' = 'values',\n                    |  'sink-insert-only' = 'false'\n                    |)\n                    |")).stripMargin());
        this.util().tableEnv().executeSql(new StringOps(Predef$.MODULE$.augmentString("\n                               |CREATE VIEW v1 AS\n                               |SELECT\n                               |  SUM(number) AS number, word\n                               |FROM MyTable\n                               |GROUP BY word\n                               |")).stripMargin());
        this.util().tableEnv().executeSql(new StringOps(Predef$.MODULE$.augmentString("\n                               |CREATE VIEW v2 AS\n                               |SELECT number + 1 AS number, word FROM v1\n                               |UNION ALL\n                               |SELECT number - 1 AS number, word FROM v1\n                               |")).stripMargin());
        StatementSet statementSet = this.util().tableEnv().createStatementSet();
        statementSet.addInsertSql(new StringOps(Predef$.MODULE$.augmentString("\n                                |INSERT INTO sink1 SELECT number, word FROM v2 WHERE word > 'a'\n                                |")).stripMargin());
        statementSet.addInsertSql(new StringOps(Predef$.MODULE$.augmentString("\n                                |INSERT INTO sink1 SELECT number, word FROM v2 WHERE word < 'a'\n                                |")).stripMargin());
        statementSet.addInsertSql(new StringOps(Predef$.MODULE$.augmentString("\n                                |INSERT INTO sink2 SELECT * FROM v1\n                                |")).stripMargin());
        this.util().verifyRelPlan(statementSet, (Seq<ExplainDetail>)Predef$.MODULE$.wrapRefArray((Object[])new ExplainDetail[]{ExplainDetail.CHANGELOG_MODE}));
    }

    @Test
    public void testEliminateChangelogNormalizedOnUpsertSink() {
        this.upsertManagedTableWithChangelogNormalizeTestOnSink(true);
    }

    @Test
    public void testKeepChangelogNormalizedOnNonUpsertSink() {
        this.upsertManagedTableWithChangelogNormalizeTestOnSink(false);
    }

    @Test
    public void testEliminateChangelogNormalizedOnUpsertJoin() {
        this.upsertManagedTableWithChangelogNormalizeTestOnJoin(true);
    }

    @Test
    public void testKeepChangelogNormalizedOnNonUpsertJoin() {
        this.upsertManagedTableWithChangelogNormalizeTestOnJoin(false);
    }

    private void upsertManagedTableWithChangelogNormalizeTestOnSink(boolean isUpsert) {
        String sinkTableName = isUpsert ? "upsert_sink_table" : "all_change_sink_table";
        String sql = new StringBuilder(47).append("INSERT INTO ").append(sinkTableName).append(" SELECT * FROM upsert_managed_table").toString();
        this.util().verifyRelPlanInsert(sql, (Seq<ExplainDetail>)Predef$.MODULE$.wrapRefArray((Object[])new ExplainDetail[]{ExplainDetail.CHANGELOG_MODE}));
    }

    private void upsertManagedTableWithChangelogNormalizeTestOnJoin(boolean isUpsert) {
        String sinkTableName = isUpsert ? "upsert_sink_table" : "all_change_sink_table";
        String sql = new StringOps(Predef$.MODULE$.augmentString(new StringBuilder(150).append("\n                 |INSERT INTO ").append(sinkTableName).append(" SELECT a.* FROM upsert_managed_table a\n                 |join upsert_managed_table b on a.id = b.id\n                 |").toString())).stripMargin();
        this.util().verifyRelPlanInsert(sql, (Seq<ExplainDetail>)Predef$.MODULE$.wrapRefArray((Object[])new ExplainDetail[]{ExplainDetail.CHANGELOG_MODE}));
    }
}

