/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.table.planner.runtime.stream.jsonplan;

import java.time.Duration;
import java.util.Arrays;
import java.util.List;
import org.apache.flink.table.api.config.ExecutionConfigOptions;
import org.apache.flink.table.functions.UserDefinedFunction;
import org.apache.flink.table.planner.factories.TestValuesTableFactory;
import org.apache.flink.table.planner.plan.utils.JavaUserDefinedAggFunctions;
import org.apache.flink.table.planner.runtime.utils.TestData;
import org.apache.flink.table.planner.utils.JavaScalaConversionUtil;
import org.apache.flink.table.planner.utils.JsonPlanTestBase;
import org.apache.flink.types.Row;
import org.junit.Before;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;

@RunWith(value=Parameterized.class)
public class GroupAggregateJsonPlanITCase
extends JsonPlanTestBase {
    @Parameterized.Parameter
    public boolean isMiniBatchEnabled;

    @Parameterized.Parameters(name="isMiniBatchEnabled={0}")
    public static List<Boolean> testData() {
        return Arrays.asList(true, false);
    }

    @Override
    @Before
    public void setup() throws Exception {
        super.setup();
        if (this.isMiniBatchEnabled) {
            this.tableEnv.getConfig().set(ExecutionConfigOptions.TABLE_EXEC_MINIBATCH_ENABLED, (Object)true).set(ExecutionConfigOptions.TABLE_EXEC_MINIBATCH_ALLOW_LATENCY, (Object)Duration.ofSeconds(10L)).set(ExecutionConfigOptions.TABLE_EXEC_MINIBATCH_SIZE, (Object)5L);
        } else {
            this.tableEnv.getConfig().set(ExecutionConfigOptions.TABLE_EXEC_MINIBATCH_ENABLED, (Object)false);
        }
    }

    @Test
    public void testSimpleAggCallsWithGroupBy() throws Exception {
        this.createTestValuesSourceTable("MyTable", (List<Row>)JavaScalaConversionUtil.toJava(TestData.smallData3()), "a int", "b bigint", "c varchar");
        this.createTestNonInsertOnlyValuesSinkTable("MySink", "b bigint", "cnt bigint", "avg_a double", "min_c varchar", "primary key (b) not enforced");
        this.compileSqlAndExecutePlan("insert into MySink select b, count(*) as cnt, avg(a) filter (where a > 1) as avg_a, min(c) as min_c from MyTable group by b").await();
        List<String> result = TestValuesTableFactory.getResults("MySink");
        this.assertResult(Arrays.asList("+I[1, 1, null, Hi]", "+I[2, 2, 2.0, Hello]"), result);
    }

    @Test
    public void testDistinctAggCalls() throws Exception {
        this.createTestValuesSourceTable("MyTable", (List<Row>)JavaScalaConversionUtil.toJava(TestData.data2()), "a int", "b bigint", "c int", "d varchar", "e bigint");
        this.createTestNonInsertOnlyValuesSinkTable("MySink", "e bigint", "cnt_a1 bigint", "cnt_a2 bigint", "sum_a bigint", "sum_b bigint", "avg_b double", "cnt_d bigint", "primary key (e) not enforced");
        this.compileSqlAndExecutePlan("insert into MySink select e, count(distinct a) filter (where b > 10) as cnt_a1, count(distinct a) as cnt_a2, sum(distinct a) as sum_a, sum(distinct b) as sum_b, avg(b) as avg_b, count(distinct d) as concat_d from MyTable group by e").await();
        List<String> result = TestValuesTableFactory.getResults("MySink");
        this.assertResult(Arrays.asList("+I[1, 1, 4, 12, 32, 6.0, 5]", "+I[2, 1, 4, 14, 57, 8.0, 7]", "+I[3, 1, 2, 8, 31, 10.0, 3]"), result);
    }

    @Test
    public void testUserDefinedAggCallsWithoutMerge() throws Exception {
        this.tableEnv.createTemporaryFunction("my_sum1", (UserDefinedFunction)new JavaUserDefinedAggFunctions.VarSum1AggFunction());
        this.tableEnv.createFunction("my_avg", JavaUserDefinedAggFunctions.WeightedAvg.class);
        this.tableEnv.createTemporarySystemFunction("my_sum2", JavaUserDefinedAggFunctions.VarSum2AggFunction.class);
        this.createTestValuesSourceTable("MyTable", (List<Row>)JavaScalaConversionUtil.toJava(TestData.data2()), "a int", "b bigint", "c int", "d varchar", "e bigint");
        this.createTestNonInsertOnlyValuesSinkTable("MySink", "d bigint", "s1 bigint", "s2 bigint", "s3 bigint", "primary key (d) not enforced");
        this.compileSqlAndExecutePlan("insert into MySink select e, my_sum1(c, 10) as s1, my_sum2(5, c) as s2, my_avg(e, a) as s3 from MyTable group by e").await();
        List<String> result = TestValuesTableFactory.getResults("MySink");
        this.assertResult(Arrays.asList("+I[1, 77, 0, 1]", "+I[2, 120, 0, 2]", "+I[3, 58, 0, 3]"), result);
    }

    @Test
    public void testUserDefinedAggCallsWithMerge() throws Exception {
        this.tableEnv.createFunction("my_avg", JavaUserDefinedAggFunctions.WeightedAvgWithMerge.class);
        this.tableEnv.createTemporarySystemFunction("my_concat_agg", JavaUserDefinedAggFunctions.ConcatDistinctAggFunction.class);
        this.createTestValuesSourceTable("MyTable", (List<Row>)JavaScalaConversionUtil.toJava(TestData.data2()), "a int", "b bigint", "c int", "d varchar", "e bigint");
        this.createTestNonInsertOnlyValuesSinkTable("MySink", "d bigint", "s1 bigint", "c1 varchar", "primary key (d) not enforced");
        this.compileSqlAndExecutePlan("insert into MySink select e, my_avg(e, a) as s1, my_concat_agg(d) as c1 from MyTable group by e").await();
        List<String> result = TestValuesTableFactory.getResults("MySink");
        this.assertResult(Arrays.asList("+I[1, 1, Hallo Welt wie|Hallo|GHI|EFG|DEF]", "+I[2, 2, Hallo Welt wie gehts?|Hallo Welt|ABC|FGH|CDE|JKL|KLM]", "+I[3, 3, HIJ|IJK|BCD]"), result);
    }
}

