/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.table.planner.plan.nodes.exec.stream;

import org.apache.flink.table.api.TableConfig;
import org.apache.flink.table.api.TableEnvironment;
import org.apache.flink.table.functions.UserDefinedFunction;
import org.apache.flink.table.planner.runtime.utils.JavaUserDefinedAggFunctions;
import org.apache.flink.table.planner.utils.StreamTableTestUtil;
import org.apache.flink.table.planner.utils.TableTestBase;
import org.junit.Before;
import org.junit.Test;

/**
 * JSON plan tests for OVER aggregations that invoke the {@code pyFunc} aggregate function.
 *
 * <p>Every test registers the same {@code MySink} table and then hands one
 * {@code INSERT INTO MySink ...} statement to {@link StreamTableTestUtil#verifyJsonPlan(String)}.
 * NOTE(review): the verification itself (presumably a comparison against a stored plan) lives in
 * the test utility and is not visible from this file.
 */
public class PythonOverAggregateJsonPlanTest
extends TableTestBase {
    /** DDL of the source table {@code MyTable} shared by all tests. */
    private static final String SOURCE_TABLE_DDL =
            "CREATE TABLE MyTable (\n"
                    + "  a int,\n"
                    + "  b varchar,\n"
                    + "  c int not null,\n"
                    + "  rowtime timestamp(3),\n"
                    + "  proctime as PROCTIME(),\n"
                    + "  watermark for rowtime as rowtime) with (\n"
                    + "  'connector' = 'values',\n"
                    + "  'bounded' = 'false')";

    /** DDL of the sink table {@code MySink}; identical for every test in this class. */
    private static final String SINK_TABLE_DDL =
            "CREATE TABLE MySink (\n"
                    + "  a bigint,\n"
                    + "  b bigint\n"
                    + ") with (\n"
                    + "  'connector' = 'values',\n"
                    + "  'sink-insert-only' = 'false',\n"
                    + "  'table-sink-class' = 'DEFAULT')";

    private StreamTableTestUtil util;
    private TableEnvironment tEnv;

    /**
     * Creates a fresh test utility and table environment, registers the source table
     * {@code MyTable} and registers the aggregate function under the name {@code pyFunc}.
     */
    @Before
    public void setup() {
        util = streamTestUtil(TableConfig.getDefault());
        tEnv = util.getTableEnv();
        tEnv.executeSql(SOURCE_TABLE_DDL);
        tEnv.createTemporarySystemFunction("pyFunc", (UserDefinedFunction)new JavaUserDefinedAggFunctions.PandasAggregateFunction());
    }

    /** Partitioned, processing-time ordered RANGE window bounded to the preceding 2 hours. */
    @Test
    public void testProcTimeBoundedPartitionedRangeOver() {
        verifyPlanWithSink(
                "insert into MySink SELECT a,\n"
                        + "    pyFunc(c, c) OVER (PARTITION BY a ORDER BY proctime\n"
                        + "        RANGE BETWEEN INTERVAL '2' HOUR PRECEDING AND CURRENT ROW)\n"
                        + "FROM MyTable");
    }

    /** Non-partitioned, processing-time ordered RANGE window bounded to the preceding 10 seconds. */
    @Test
    public void testProcTimeBoundedNonPartitionedRangeOver() {
        // The leading space before FROM is part of the original statement and is kept as-is.
        verifyPlanWithSink(
                "insert into MySink SELECT a,\n"
                        + "    pyFunc(c, c) OVER (ORDER BY proctime\n"
                        + "        RANGE BETWEEN INTERVAL '10' SECOND PRECEDING AND CURRENT ROW)\n"
                        + " FROM MyTable");
    }

    /** Partitioned, processing-time ordered RANGE window with UNBOUNDED PRECEDING. */
    @Test
    public void testProcTimeUnboundedPartitionedRangeOver() {
        verifyPlanWithSink(
                "insert into MySink SELECT a,\n"
                        + "    pyFunc(c, c) OVER (PARTITION BY a ORDER BY proctime RANGE UNBOUNDED PRECEDING)\n"
                        + "FROM MyTable");
    }

    /** Partitioned, row-time ordered ROWS window over the 5 preceding rows and the current row. */
    @Test
    public void testRowTimeBoundedPartitionedRowsOver() {
        verifyPlanWithSink(
                "insert into MySink SELECT a,\n"
                        + "    pyFunc(c, c) OVER (PARTITION BY a ORDER BY rowtime\n"
                        + "        ROWS BETWEEN 5 preceding AND CURRENT ROW)\n"
                        + "FROM MyTable");
    }

    /**
     * Partitioned ROWS window over the 4 preceding rows and the current row, ordered by a
     * {@code proctime()} call rather than the {@code proctime} column of {@code MyTable}.
     */
    @Test
    public void testProcTimeBoundedPartitionedRowsOverWithBuiltinProctime() {
        verifyPlanWithSink(
                "insert into MySink SELECT a, "
                        + "  pyFunc(c, c) OVER ("
                        + "    PARTITION BY a ORDER BY proctime() ROWS BETWEEN 4 PRECEDING AND CURRENT ROW) "
                        + "FROM MyTable");
    }

    /**
     * Registers the shared sink table {@code MySink} and then passes the given statement to
     * {@link StreamTableTestUtil#verifyJsonPlan(String)}.
     *
     * @param insertSql the {@code INSERT INTO MySink ...} statement under test
     */
    private void verifyPlanWithSink(String insertSql) {
        // The sink must exist before the INSERT statement is handed to the planner.
        tEnv.executeSql(SINK_TABLE_DDL);
        util.verifyJsonPlan(insertSql);
    }
}

