/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.table.planner.runtime.stream.jsonplan;

import java.io.IOException;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.ExecutionException;
import org.apache.flink.table.functions.UserDefinedFunction;
import org.apache.flink.table.planner.factories.TestValuesTableFactory;
import org.apache.flink.table.planner.plan.utils.JavaUserDefinedAggFunctions;
import org.apache.flink.table.planner.runtime.utils.TestData;
import org.apache.flink.table.planner.utils.JavaScalaConversionUtil;
import org.apache.flink.table.planner.utils.JsonPlanTestBase;
import org.apache.flink.types.Row;
import org.junit.Test;

public class OverAggregateJsonPlanITCase
extends JsonPlanTestBase {
    @Test
    public void testProcTimeBoundedPartitionedRowsOver() throws ExecutionException, InterruptedException, IOException {
        this.createTestValuesSourceTable("MyTable", (List<Row>)JavaScalaConversionUtil.toJava(TestData.data5()), "a int", "b bigint", "c int", "d string", "e bigint", "proctime as PROCTIME()");
        this.createTestNonInsertOnlyValuesSinkTable("MySink", "a bigint", "b bigint", "c bigint");
        String sql = "insert into MySink SELECT a,   SUM(c) OVER (    PARTITION BY a ORDER BY proctime ROWS BETWEEN 4 PRECEDING AND CURRENT ROW),   MIN(c) OVER (    PARTITION BY a ORDER BY proctime ROWS BETWEEN 4 PRECEDING AND CURRENT ROW) FROM MyTable";
        this.compileSqlAndExecutePlan(sql).await();
        List<String> expected = Arrays.asList("+I[1, 0, 0]", "+I[2, 1, 1]", "+I[2, 3, 1]", "+I[3, 3, 3]", "+I[3, 7, 3]", "+I[3, 12, 3]", "+I[4, 6, 6]", "+I[4, 13, 6]", "+I[4, 21, 6]", "+I[4, 30, 6]", "+I[5, 10, 10]", "+I[5, 21, 10]", "+I[5, 33, 10]", "+I[5, 46, 10]", "+I[5, 60, 10]");
        this.assertResult(expected, TestValuesTableFactory.getResults("MySink"));
    }

    @Test
    public void testProcTimeUnboundedNonPartitionedRangeOver() throws IOException, ExecutionException, InterruptedException {
        List<Row> data = Arrays.asList(Row.of((Object[])new Object[]{1L, 1, "Hello"}), Row.of((Object[])new Object[]{2L, 2, "Hello"}), Row.of((Object[])new Object[]{3L, 3, "Hello"}), Row.of((Object[])new Object[]{4L, 4, "Hello"}), Row.of((Object[])new Object[]{5L, 5, "Hello"}), Row.of((Object[])new Object[]{6L, 6, "Hello"}), Row.of((Object[])new Object[]{7L, 7, "Hello World"}), Row.of((Object[])new Object[]{8L, 8, "Hello World"}), Row.of((Object[])new Object[]{20L, 20, "Hello World"}));
        this.createTestValuesSourceTable("MyTable", data, "a bigint", "b int", "c string", "proctime as PROCTIME()");
        this.createTestNonInsertOnlyValuesSinkTable("MySink", "a string", "b int", "c string");
        String sql = "insert into MySink SELECT c, sum1, maxnull\nFROM (\n SELECT c,\n  max(cast(null as varchar)) OVER\n   (PARTITION BY c ORDER BY proctime ROWS BETWEEN UNBOUNDED preceding AND CURRENT ROW)\n   as maxnull,\n  sum(1) OVER\n   (PARTITION BY c ORDER BY proctime ROWS BETWEEN UNBOUNDED preceding AND CURRENT ROW)\n   as sum1\n FROM MyTable\n)";
        this.compileSqlAndExecutePlan(sql).await();
        List<String> expected = Arrays.asList("+I[Hello World, 1, null]", "+I[Hello World, 2, null]", "+I[Hello World, 3, null]", "+I[Hello, 1, null]", "+I[Hello, 2, null]", "+I[Hello, 3, null]", "+I[Hello, 4, null]", "+I[Hello, 5, null]", "+I[Hello, 6, null]");
        this.assertResult(expected, TestValuesTableFactory.getResults("MySink"));
    }

    @Test
    public void testRowTimeBoundedPartitionedRangeOver() throws IOException, ExecutionException, InterruptedException {
        List<Row> data = Arrays.asList(Row.of((Object[])new Object[]{10L, 1L, 1, "Hello"}), Row.of((Object[])new Object[]{15L, 1L, 15, "Hello"}), Row.of((Object[])new Object[]{16L, 1L, 16, "Hello"}), Row.of((Object[])new Object[]{20L, 2L, 2, "Hello"}), Row.of((Object[])new Object[]{20L, 2L, 2, "Hello"}), Row.of((Object[])new Object[]{20L, 2L, 3, "Hello"}), Row.of((Object[])new Object[]{30L, 3L, 3, "Hello"}), Row.of((Object[])new Object[]{40L, 4L, 4, "Hello"}), Row.of((Object[])new Object[]{50L, 5L, 5, "Hello"}), Row.of((Object[])new Object[]{60L, 6L, 6, "Hello"}), Row.of((Object[])new Object[]{65L, 6L, 65, "Hello"}), Row.of((Object[])new Object[]{90L, 6L, 9, "Hello"}), Row.of((Object[])new Object[]{95L, 6L, 18, "Hello"}), Row.of((Object[])new Object[]{90L, 6L, 9, "Hello"}), Row.of((Object[])new Object[]{100L, 7L, 7, "Hello World"}), Row.of((Object[])new Object[]{110L, 7L, 17, "Hello World"}), Row.of((Object[])new Object[]{110L, 7L, 77, "Hello World"}), Row.of((Object[])new Object[]{140L, 7L, 18, "Hello World"}), Row.of((Object[])new Object[]{150L, 8L, 8, "Hello World"}), Row.of((Object[])new Object[]{200L, 20L, 20, "Hello World"}));
        this.createTestValuesSourceTable("MyTable", data, "ts bigint", "a bigint", "b int", "c string", "rowtime as TO_TIMESTAMP(FROM_UNIXTIME(ts))", "watermark for rowtime as rowtime - INTERVAL '10' second");
        this.tableEnv.createTemporaryFunction("LTCNT", (UserDefinedFunction)new JavaUserDefinedAggFunctions.LargerThanCount());
        String sql = "insert into MySink SELECT   c, b,  LTCNT(a, CAST('4' AS BIGINT)) OVER (PARTITION BY c ORDER BY rowtime RANGE     BETWEEN INTERVAL '10' SECOND PRECEDING AND CURRENT ROW),   COUNT(a) OVER (PARTITION BY c ORDER BY rowtime RANGE     BETWEEN INTERVAL '10' SECOND PRECEDING AND CURRENT ROW),   SUM(a) OVER (PARTITION BY c ORDER BY rowtime RANGE     BETWEEN INTERVAL '10' SECOND PRECEDING AND CURRENT ROW) FROM MyTable";
        this.createTestNonInsertOnlyValuesSinkTable("MySink", "a string", "b int", "c bigint", "d bigint", "e bigint");
        this.compileSqlAndExecutePlan(sql).await();
        List<String> expected = Arrays.asList("+I[Hello, 1, 0, 1, 1]", "+I[Hello, 15, 0, 2, 2]", "+I[Hello, 16, 0, 3, 3]", "+I[Hello, 2, 0, 6, 9]", "+I[Hello, 3, 0, 6, 9]", "+I[Hello, 2, 0, 6, 9]", "+I[Hello, 3, 0, 4, 9]", "+I[Hello, 4, 0, 2, 7]", "+I[Hello, 5, 1, 2, 9]", "+I[Hello, 6, 2, 2, 11]", "+I[Hello, 65, 2, 2, 12]", "+I[Hello, 9, 2, 2, 12]", "+I[Hello, 9, 2, 2, 12]", "+I[Hello, 18, 3, 3, 18]", "+I[Hello World, 17, 3, 3, 21]", "+I[Hello World, 7, 1, 1, 7]", "+I[Hello World, 77, 3, 3, 21]", "+I[Hello World, 18, 1, 1, 7]", "+I[Hello World, 8, 2, 2, 15]", "+I[Hello World, 20, 1, 1, 20]");
        this.assertResult(expected, TestValuesTableFactory.getResults("MySink"));
    }
}

