/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.table.planner.plan.nodes.exec.stream;

import java.time.Duration;
import org.apache.flink.table.api.TableConfig;
import org.apache.flink.table.api.TableEnvironment;
import org.apache.flink.table.api.config.ExecutionConfigOptions;
import org.apache.flink.table.api.config.OptimizerConfigOptions;
import org.apache.flink.table.planner.plan.rules.physical.stream.IncrementalAggregateRule;
import org.apache.flink.table.planner.utils.AggregatePhaseStrategy;
import org.apache.flink.table.planner.utils.StreamTableTestUtil;
import org.apache.flink.table.planner.utils.TableTestBase;
import org.junit.Before;
import org.junit.Test;

/**
 * JSON plan tests for streaming queries planned with two-phase, split-distinct,
 * mini-batched aggregation and the incremental aggregate rule switched on.
 *
 * <p>Every test registers its own {@code MySink} table and hands an
 * {@code INSERT INTO} statement to {@link StreamTableTestUtil#verifyJsonPlan}.
 * NOTE(review): the expected plan is presumably loaded from a test resource by the
 * test utility; confirm in {@code TableTestBase}.
 */
public class IncrementalAggregateJsonPlanTest
extends TableTestBase {
    private StreamTableTestUtil util;
    private TableEnvironment tEnv;

    /**
     * Creates a fresh streaming test utility, applies the optimizer and mini-batch
     * options used by all tests in this class, and registers the unbounded
     * {@code MyTable} source backed by the {@code values} connector.
     */
    @Before
    public void setup() {
        util = streamTestUtil(TableConfig.getDefault());
        tEnv = util.getTableEnv();

        // Values are passed with their natural types (no Object casts) so that each
        // one matches the type parameter of the ConfigOption it is paired with.
        tEnv.getConfig()
                .set(
                        OptimizerConfigOptions.TABLE_OPTIMIZER_AGG_PHASE_STRATEGY,
                        AggregatePhaseStrategy.TWO_PHASE.name())
                .set(OptimizerConfigOptions.TABLE_OPTIMIZER_DISTINCT_AGG_SPLIT_ENABLED, true)
                .set(ExecutionConfigOptions.TABLE_EXEC_MINIBATCH_ENABLED, true)
                .set(
                        ExecutionConfigOptions.TABLE_EXEC_MINIBATCH_ALLOW_LATENCY,
                        Duration.ofSeconds(10L))
                .set(ExecutionConfigOptions.TABLE_EXEC_MINIBATCH_SIZE, 5L)
                .set(IncrementalAggregateRule.TABLE_OPTIMIZER_INCREMENTAL_AGG_ENABLED(), true);

        String srcTableDdl =
                "CREATE TABLE MyTable (\n"
                        + "  a bigint,\n"
                        + "  b int not null,\n"
                        + "  c varchar,\n"
                        + "  d bigint\n"
                        + ") with (\n"
                        + "  'connector' = 'values',\n"
                        + "  'bounded' = 'false')";
        tEnv.executeSql(srcTableDdl);
    }

    /**
     * Verifies the JSON plan of a single {@code count(distinct c)} aggregation
     * grouped by {@code a}, written into a sink that is not insert-only.
     */
    @Test
    public void testIncrementalAggregate() {
        String sinkTableDdl =
                "CREATE TABLE MySink (\n"
                        + "  a bigint,\n"
                        + "  c bigint\n"
                        + ") with (\n"
                        + "  'connector' = 'values',\n"
                        + "  'sink-insert-only' = 'false',\n"
                        + "  'table-sink-class' = 'DEFAULT')";
        tEnv.executeSql(sinkTableDdl);

        util.verifyJsonPlan(
                "insert into MySink "
                        + "select a, count(distinct c) as c from MyTable group by a");
    }

    /**
     * Verifies the JSON plan of a nested aggregation: the inner query groups
     * {@code MyTable} by {@code a}, and the outer query computes {@code sum},
     * {@code count(distinct ...)} and {@code count(1)} over that result, grouped by
     * the inner count.
     */
    @Test
    public void testIncrementalAggregateWithSumCountDistinctAndRetraction() {
        String sinkTableDdl =
                "CREATE TABLE MySink (\n"
                        + "  b bigint,\n"
                        + "  sum_b int,\n"
                        + "  cnt_distinct_b bigint,\n"
                        + "  cnt1 bigint\n"
                        + ") with (\n"
                        + "  'connector' = 'values',\n"
                        + "  'sink-insert-only' = 'false',\n"
                        + "  'table-sink-class' = 'DEFAULT')";
        tEnv.executeSql(sinkTableDdl);

        // The irregular spacing inside the statement is kept exactly as it was.
        util.verifyJsonPlan(
                "insert into MySink "
                        + "select b, sum(b1), count(distinct b1), count(1) "
                        + " from "
                        + "   (select a, count(b) as b, max(b) as b1 from MyTable group by a)"
                        + " group by b");
    }
}

