/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.table.planner.plan.nodes.exec.stream;

import org.apache.flink.table.api.TableConfig;
import org.apache.flink.table.api.TableEnvironment;
import org.apache.flink.table.planner.utils.StreamTableTestUtil;
import org.apache.flink.table.planner.utils.TableTestBase;
import org.junit.Before;
import org.junit.Test;

public class WindowJoinJsonPlanTest
extends TableTestBase {
    private StreamTableTestUtil util;
    private TableEnvironment tEnv;

    @Before
    public void setup() {
        this.util = this.streamTestUtil(TableConfig.getDefault());
        this.tEnv = this.util.getTableEnv();
        String srcTable1Ddl = "CREATE TABLE MyTable (\n a INT,\n b BIGINT,\n c VARCHAR,\n `rowtime` AS TO_TIMESTAMP(c),\n proctime as PROCTIME(),\n WATERMARK for `rowtime` AS `rowtime` - INTERVAL '1' SECOND\n) WITH (\n 'connector' = 'values')\n";
        this.tEnv.executeSql(srcTable1Ddl);
        String srcTable2Ddl = "CREATE TABLE MyTable2 (\n a INT,\n b BIGINT,\n c VARCHAR,\n `rowtime` AS TO_TIMESTAMP(c),\n proctime as PROCTIME(),\n WATERMARK for `rowtime` AS `rowtime` - INTERVAL '1' SECOND\n) WITH (\n 'connector' = 'values')\n";
        this.tEnv.executeSql(srcTable2Ddl);
    }

    @Test
    public void testEventTimeTumbleWindow() {
        String sinkTableDdl = "CREATE TABLE MySink (\n l_a INT,\n window_start TIMESTAMP(3),\n window_end TIMESTAMP(3),\n l_cnt BIGINT,\n l_uv BIGINT,\n r_a INT,\n r_cnt BIGINT,\n r_uv BIGINT\n) WITH (\n 'connector' = 'values')\n";
        this.tEnv.executeSql(sinkTableDdl);
        this.util.verifyJsonPlan("insert into MySink select\n  L.a,\n  L.window_start,\n  L.window_end,\n  L.cnt,\n  L.uv,\n  R.a,\n  R.cnt,\n  R.uv\nFROM (\n  SELECT\n    a,\n    window_start,\n    window_end,\n    count(*) as cnt,\n    count(distinct c) AS uv\n  FROM TABLE(TUMBLE(TABLE MyTable, DESCRIPTOR(rowtime), INTERVAL '15' MINUTE))\n  GROUP BY a, window_start, window_end, window_time\n) L\nJOIN (\n  SELECT\n    a,\n    window_start,\n    window_end,\n    count(*) as cnt,\n    count(distinct c) AS uv\n  FROM TABLE(TUMBLE(TABLE MyTable2, DESCRIPTOR(rowtime), INTERVAL '15' MINUTE))\n  GROUP BY a, window_start, window_end, window_time\n) R\nON L.window_start = R.window_start AND L.window_end = R.window_end AND L.a = R.a");
    }
}

