/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.table.planner.runtime.stream.jsonplan;

import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import org.apache.flink.table.api.config.OptimizerConfigOptions;
import org.apache.flink.table.planner.factories.TestValuesTableFactory;
import org.apache.flink.table.planner.runtime.utils.TestData;
import org.apache.flink.table.planner.utils.AggregatePhaseStrategy;
import org.apache.flink.table.planner.utils.JavaScalaConversionUtil;
import org.apache.flink.table.planner.utils.JsonPlanTestBase;
import org.apache.flink.testutils.junit.extensions.parameterized.Parameter;
import org.apache.flink.testutils.junit.extensions.parameterized.ParameterizedTestExtension;
import org.apache.flink.testutils.junit.extensions.parameterized.Parameters;
import org.apache.flink.types.Row;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.TestTemplate;
import org.junit.jupiter.api.extension.ExtendWith;

/**
 * Integration tests that run windowed aggregation queries (TUMBLE, HOP, CUMULATE and SESSION
 * table-valued functions) through {@code compileSqlAndExecutePlan} and compare the rows collected
 * by the {@code MySink} test sink with a fixed expected list.
 *
 * <p>The class is parameterized over {@link AggregatePhaseStrategy}: {@link #parameters()} supplies
 * {@code ONE_PHASE} and {@code TWO_PHASE}, and {@link #setup()} writes the injected value into the
 * table config before every test.
 */
@ExtendWith(ParameterizedTestExtension.class)
class WindowAggregateJsonITCase extends JsonPlanTestBase {

    /** Aggregation phase strategy injected for the current invocation. */
    @Parameter
    private AggregatePhaseStrategy aggPhase;

    /**
     * Supplies the parameter sets for this class.
     *
     * @return one single-element row per aggregation phase strategy under test
     */
    @Parameters(name = "agg_phase = {0}")
    private static Object[] parameters() {
        return new Object[][] {
            {AggregatePhaseStrategy.ONE_PHASE},
            {AggregatePhaseStrategy.TWO_PHASE}
        };
    }

    /**
     * Registers the {@code MyTable} source backed by {@code TestData.windowDataWithTimestamp()} and
     * applies the injected aggregation phase strategy to the table config.
     *
     * @throws Exception if the base-class setup or the table registration fails
     */
    @Override
    @BeforeEach
    protected void setup() throws Exception {
        super.setup();

        // A plain map instead of double-brace initialisation: no anonymous HashMap subclass and
        // no hidden reference to this test instance.
        final Map<String, String> sourceOptions = new HashMap<>();
        sourceOptions.put("enable-watermark-push-down", "true");
        sourceOptions.put("failing-source", "true");

        final List<Row> sourceRows =
                JavaScalaConversionUtil.toJava(TestData.windowDataWithTimestamp());

        createTestValuesSourceTable(
                "MyTable",
                sourceRows,
                new String[] {
                    "ts STRING",
                    "`int` INT",
                    "`double` DOUBLE",
                    "`float` FLOAT",
                    "`bigdec` DECIMAL(10, 2)",
                    "`string` STRING",
                    "`name` STRING",
                    "`rowtime` AS TO_TIMESTAMP(`ts`)",
                    "WATERMARK for `rowtime` AS `rowtime` - INTERVAL '1' SECOND"
                },
                sourceOptions);

        // The value is passed with its real type (no cast to Object) so that it matches the
        // option's declared value type.
        tableEnv.getConfig()
                .set(
                        OptimizerConfigOptions.TABLE_OPTIMIZER_AGG_PHASE_STRATEGY,
                        aggPhase.toString());
    }

    /**
     * Aggregates {@code MyTable} over 5-second TUMBLE windows grouped by name and window bounds,
     * checking the count, the integer sum and the distinct string count per window.
     *
     * @throws Exception if plan compilation or job execution fails
     */
    @TestTemplate
    void testEventTimeTumbleWindow() throws Exception {
        createTestValuesSinkTable(
                "MySink",
                "name STRING",
                "window_start TIMESTAMP(3)",
                "window_end TIMESTAMP(3)",
                "cnt BIGINT",
                "sum_int INT",
                "distinct_cnt BIGINT");
        compileSqlAndExecutePlan(
                        "insert into MySink select\n"
                                + "  name,\n"
                                + "  window_start,\n"
                                + "  window_end,\n"
                                + "  COUNT(*),\n"
                                + "  SUM(`int`),\n"
                                + "  COUNT(DISTINCT `string`)\n"
                                + "FROM TABLE(\n"
                                + "   TUMBLE(TABLE MyTable, DESCRIPTOR(rowtime), INTERVAL '5' SECOND))\n"
                                + "GROUP BY name, window_start, window_end")
                .await();

        List<String> result = TestValuesTableFactory.getResultsAsStrings("MySink");
        assertResult(
                Arrays.asList(
                        "+I[a, 2020-10-10T00:00, 2020-10-10T00:00:05, 4, 10, 2]",
                        "+I[a, 2020-10-10T00:00:05, 2020-10-10T00:00:10, 1, 3, 1]",
                        "+I[b, 2020-10-10T00:00:05, 2020-10-10T00:00:10, 2, 9, 2]",
                        "+I[b, 2020-10-10T00:00:15, 2020-10-10T00:00:20, 1, 4, 1]",
                        "+I[b, 2020-10-10T00:00:30, 2020-10-10T00:00:35, 1, 1, 1]",
                        "+I[null, 2020-10-10T00:00:30, 2020-10-10T00:00:35, 1, 7, 0]"),
                result);
    }

    /**
     * Counts rows per name over HOP windows declared with the intervals 5 seconds and 10 seconds.
     *
     * @throws Exception if plan compilation or job execution fails
     */
    @TestTemplate
    void testEventTimeHopWindow() throws Exception {
        createTestValuesSinkTable("MySink", "name STRING", "cnt BIGINT");
        compileSqlAndExecutePlan(
                        "insert into MySink select\n"
                                + "  name,\n"
                                + "  COUNT(*)\n"
                                + "FROM TABLE(\n"
                                + "   HOP(TABLE MyTable, DESCRIPTOR(rowtime), INTERVAL '5' SECOND, INTERVAL '10' SECOND))\n"
                                + "GROUP BY name, window_start, window_end")
                .await();

        List<String> result = TestValuesTableFactory.getResultsAsStrings("MySink");
        assertResult(
                Arrays.asList(
                        "+I[a, 1]",
                        "+I[a, 4]",
                        "+I[a, 6]",
                        "+I[b, 1]",
                        "+I[b, 1]",
                        "+I[b, 1]",
                        "+I[b, 1]",
                        "+I[b, 2]",
                        "+I[b, 2]",
                        "+I[null, 1]",
                        "+I[null, 1]"),
                result);
    }

    /**
     * Counts rows per name over CUMULATE windows declared with the intervals 5 seconds and 15
     * seconds.
     *
     * @throws Exception if plan compilation or job execution fails
     */
    @TestTemplate
    void testEventTimeCumulateWindow() throws Exception {
        createTestValuesSinkTable("MySink", "name STRING", "cnt BIGINT");
        // There is intentionally no separator between "))" and "GROUP BY": the SQL text is kept
        // exactly as it was.
        compileSqlAndExecutePlan(
                        "insert into MySink select\n"
                                + "  name,\n"
                                + "  COUNT(*)\n"
                                + "FROM TABLE(\n"
                                + "  CUMULATE(\n"
                                + "     TABLE MyTable,\n"
                                + "     DESCRIPTOR(rowtime),\n"
                                + "     INTERVAL '5' SECOND,\n"
                                + "     INTERVAL '15' SECOND))"
                                + "GROUP BY name, window_start, window_end")
                .await();

        List<String> result = TestValuesTableFactory.getResultsAsStrings("MySink");
        assertResult(
                Arrays.asList(
                        "+I[a, 4]",
                        "+I[a, 6]",
                        "+I[a, 6]",
                        "+I[b, 1]",
                        "+I[b, 1]",
                        "+I[b, 1]",
                        "+I[b, 1]",
                        "+I[b, 1]",
                        "+I[b, 1]",
                        "+I[b, 2]",
                        "+I[b, 2]",
                        "+I[null, 1]",
                        "+I[null, 1]",
                        "+I[null, 1]"),
                result);
    }

    /**
     * Runs a CUMULATE window query with {@code max} and {@code count(distinct ...)} after setting
     * {@code TABLE_OPTIMIZER_DISTINCT_AGG_SPLIT_ENABLED} to {@code true}.
     *
     * @throws Exception if plan compilation or job execution fails
     */
    @TestTemplate
    void testDistinctSplitEnabled() throws Exception {
        tableEnv.getConfig()
                .set(OptimizerConfigOptions.TABLE_OPTIMIZER_DISTINCT_AGG_SPLIT_ENABLED, true);
        createTestValuesSinkTable(
                "MySink", "name STRING", "max_double DOUBLE", "cnt_distinct_int BIGINT");
        compileSqlAndExecutePlan(
                        "insert into MySink select name, "
                                + "   max(`double`),\n"
                                + "   count(distinct `int`) "
                                + "FROM TABLE ("
                                + "  CUMULATE(\n"
                                + "     TABLE MyTable,\n"
                                + "     DESCRIPTOR(rowtime),\n"
                                + "     INTERVAL '5' SECOND,\n"
                                + "     INTERVAL '15' SECOND))"
                                + "GROUP BY name, window_start, window_end")
                .await();

        List<String> result = TestValuesTableFactory.getResultsAsStrings("MySink");
        assertResult(
                Arrays.asList(
                        "+I[a, 5.0, 3]",
                        "+I[a, 5.0, 4]",
                        "+I[a, 5.0, 4]",
                        "+I[b, 3.0, 1]",
                        "+I[b, 3.0, 1]",
                        "+I[b, 3.0, 1]",
                        "+I[b, 4.0, 1]",
                        "+I[b, 4.0, 1]",
                        "+I[b, 4.0, 1]",
                        "+I[b, 6.0, 2]",
                        "+I[b, 6.0, 2]",
                        "+I[null, 7.0, 1]",
                        "+I[null, 7.0, 1]",
                        "+I[null, 7.0, 1]"),
                result);
    }

    /**
     * Counts rows per name over SESSION windows partitioned by name and declared with a 5-second
     * interval.
     *
     * @throws Exception if plan compilation or job execution fails
     */
    @TestTemplate
    void testEventTimeSessionWindow() throws Exception {
        createTestValuesSinkTable("MySink", "name STRING", "cnt BIGINT");
        compileSqlAndExecutePlan(
                        "insert into MySink select\n"
                                + "  name,\n"
                                + "  COUNT(*)\n"
                                + "FROM TABLE(\n"
                                + "  SESSION(\n"
                                + "     TABLE MyTable PARTITION BY name,\n"
                                + "     DESCRIPTOR(rowtime),\n"
                                + "     INTERVAL '5' SECOND))"
                                + "GROUP BY name, window_start, window_end")
                .await();

        List<String> result = TestValuesTableFactory.getResultsAsStrings("MySink");
        assertResult(
                Arrays.asList("+I[a, 6]", "+I[b, 1]", "+I[b, 1]", "+I[b, 2]", "+I[null, 1]"),
                result);
    }
}

