/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.table.planner.runtime.batch.sql;

import java.util.Arrays;
import java.util.stream.Stream;
import org.apache.flink.api.common.BatchShuffleMode;
import org.apache.flink.configuration.MemorySize;
import org.apache.flink.table.api.ExplainDetail;
import org.apache.flink.table.api.TableEnvironment;
import org.apache.flink.table.api.config.ExecutionConfigOptions;
import org.apache.flink.table.api.config.OptimizerConfigOptions;
import org.apache.flink.table.catalog.Catalog;
import org.apache.flink.table.catalog.ObjectPath;
import org.apache.flink.table.catalog.stats.CatalogTableStatistics;
import org.apache.flink.table.planner.factories.TestValuesCatalog;
import org.apache.flink.table.planner.factories.TestValuesTableFactory;
import org.apache.flink.table.planner.runtime.utils.BatchTestBase;
import org.apache.flink.table.planner.runtime.utils.TestData;
import org.apache.flink.table.planner.utils.JavaScalaConversionUtil;
import org.apache.flink.types.Row;
import org.assertj.core.api.Assertions;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.Arguments;
import org.junit.jupiter.params.provider.MethodSource;
import scala.collection.Iterable;
import scala.collection.Seq;

class RuntimeFilterITCase
extends BatchTestBase {
    private final TestValuesCatalog catalog = new TestValuesCatalog("testCatalog", "test_database", true);
    private TableEnvironment tEnv;

    RuntimeFilterITCase() {
    }

    static Stream<Arguments> parameters() {
        return Stream.of(Arguments.of((Object[])new Object[]{BatchShuffleMode.ALL_EXCHANGES_BLOCKING, true}), Arguments.of((Object[])new Object[]{BatchShuffleMode.ALL_EXCHANGES_BLOCKING, false}), Arguments.of((Object[])new Object[]{BatchShuffleMode.ALL_EXCHANGES_PIPELINED, true}), Arguments.of((Object[])new Object[]{BatchShuffleMode.ALL_EXCHANGES_PIPELINED, false}));
    }

    @Override
    @BeforeEach
    public void before() throws Exception {
        super.before();
        this.tEnv = this.tEnv();
        this.catalog.open();
        this.tEnv.registerCatalog("testCatalog", (Catalog)this.catalog);
        this.tEnv.useCatalog("testCatalog");
        this.tEnv.getConfig().set(OptimizerConfigOptions.TABLE_OPTIMIZER_RUNTIME_FILTER_ENABLED, (Object)true);
        this.tEnv.getConfig().set(OptimizerConfigOptions.TABLE_OPTIMIZER_RUNTIME_FILTER_MAX_BUILD_DATA_SIZE, (Object)MemorySize.parse((String)"10m"));
        this.tEnv.executeSql(String.format("CREATE TABLE dim (\n  x INT,\n  y INT,\n  z BIGINT\n)  WITH (\n 'connector' = 'values',\n 'disable-lookup' = 'true',\n 'data-id' = '%s',\n 'bounded' = 'true'\n)", TestValuesTableFactory.registerData(TestData.data7())));
        this.tEnv.executeSql(String.format("CREATE TABLE fact (\n  `a` INT,\n  `b` BIGINT,\n  `c` INT,\n  `d` VARCHAR,\n  `e` BIGINT\n) WITH (\n  'connector' = 'values',\n  'runtime-source' = 'NewSource',\n  'data-id' = '%s',\n  'disable-lookup' = 'true',\n  'bounded' = 'true'\n)", TestValuesTableFactory.registerData(TestData.data5())));
        this.catalog.alterTableStatistics(new ObjectPath(this.tEnv.getCurrentDatabase(), "dim"), new CatalogTableStatistics(131072L, 1, 1L, 1L), false);
        this.catalog.alterTableStatistics(new ObjectPath(this.tEnv.getCurrentDatabase(), "fact"), new CatalogTableStatistics(0x40000000L, 1, 1L, 1L), false);
    }

    @ParameterizedTest(name="mode = {0}, ofcg = {1}")
    @MethodSource(value={"parameters"})
    void testSimpleRuntimeFilter(BatchShuffleMode shuffleMode, boolean ofcg) {
        this.tEnv.getConfig().set(ExecutionConfigOptions.TABLE_EXEC_OPERATOR_FUSION_CODEGEN_ENABLED, (Object)ofcg);
        RuntimeFilterITCase.configBatchShuffleMode(this.tEnv.getConfig(), shuffleMode);
        String sql = "select * from fact, dim where x = a and z >= 3";
        Assertions.assertThat((String)this.tEnv().explainSql(sql, new ExplainDetail[0])).contains(new CharSequence[]{"RuntimeFilter"});
        this.checkResult(sql, (Seq<Row>)JavaScalaConversionUtil.toScala(Arrays.asList(Row.of((Object[])new Object[]{3, 4, 3, "Hallo Welt wie gehts?", 2, 3, 4, 3}), Row.of((Object[])new Object[]{3, 5, 4, "ABC", 2, 3, 4, 3}), Row.of((Object[])new Object[]{3, 6, 5, "BCD", 3, 3, 4, 3}), Row.of((Object[])new Object[]{5, 11, 10, "GHI", 1, 5, 10, 3}), Row.of((Object[])new Object[]{5, 12, 11, "HIJ", 3, 5, 10, 3}), Row.of((Object[])new Object[]{5, 13, 12, "IJK", 3, 5, 10, 3}), Row.of((Object[])new Object[]{5, 14, 13, "JKL", 2, 5, 10, 3}), Row.of((Object[])new Object[]{5, 15, 14, "KLM", 2, 5, 10, 3}))), false);
    }

    @ParameterizedTest(name="mode = {0}, ofcg = {1}")
    @MethodSource(value={"parameters"})
    void testRuntimeFilterWithBuildSidePushDown(BatchShuffleMode shuffleMode, boolean ofcg) {
        this.tEnv.getConfig().set(ExecutionConfigOptions.TABLE_EXEC_OPERATOR_FUSION_CODEGEN_ENABLED, (Object)ofcg);
        RuntimeFilterITCase.configBatchShuffleMode(this.tEnv.getConfig(), shuffleMode);
        this.tEnv.getConfig().set(OptimizerConfigOptions.TABLE_OPTIMIZER_BROADCAST_JOIN_THRESHOLD, (Object)-1L);
        this.tEnv.getConfig().set(OptimizerConfigOptions.TABLE_OPTIMIZER_AGG_PHASE_STRATEGY, (Object)"ONE_PHASE");
        String sql = "select * from fact join (select x, sum(z) from dim where z = 2 group by x) dimSide on x = a";
        Assertions.assertThat((String)this.tEnv().explainSql(sql, new ExplainDetail[0])).contains(new CharSequence[]{"RuntimeFilter"});
        this.checkResult(sql, (Seq<Row>)JavaScalaConversionUtil.toScala(Arrays.asList(Row.of((Object[])new Object[]{2, 2, 1, "Hallo Welt", 2, 2, 2}), Row.of((Object[])new Object[]{2, 3, 2, "Hallo Welt wie", 1, 2, 2}), Row.of((Object[])new Object[]{3, 4, 3, "Hallo Welt wie gehts?", 2, 3, 2}), Row.of((Object[])new Object[]{3, 5, 4, "ABC", 2, 3, 2}), Row.of((Object[])new Object[]{3, 6, 5, "BCD", 3, 3, 2}), Row.of((Object[])new Object[]{4, 10, 9, "FGH", 2, 4, 4}), Row.of((Object[])new Object[]{4, 7, 6, "CDE", 2, 4, 4}), Row.of((Object[])new Object[]{4, 8, 7, "DEF", 1, 4, 4}), Row.of((Object[])new Object[]{4, 9, 8, "EFG", 1, 4, 4}), Row.of((Object[])new Object[]{5, 11, 10, "GHI", 1, 5, 2}), Row.of((Object[])new Object[]{5, 12, 11, "HIJ", 3, 5, 2}), Row.of((Object[])new Object[]{5, 13, 12, "IJK", 3, 5, 2}), Row.of((Object[])new Object[]{5, 14, 13, "JKL", 2, 5, 2}), Row.of((Object[])new Object[]{5, 15, 14, "KLM", 2, 5, 2}))), false);
    }

    @ParameterizedTest(name="mode = {0}, ofcg = {1}")
    @MethodSource(value={"parameters"})
    void testRuntimeFilterWithProbeSidePushDown(BatchShuffleMode shuffleMode, boolean ofcg) throws Exception {
        this.tEnv.getConfig().set(ExecutionConfigOptions.TABLE_EXEC_OPERATOR_FUSION_CODEGEN_ENABLED, (Object)ofcg);
        RuntimeFilterITCase.configBatchShuffleMode(this.tEnv.getConfig(), shuffleMode);
        this.tEnv.executeSql(String.format("CREATE TABLE fact2 (\n  `a` INT,\n  `b` BIGINT\n) WITH (\n  'connector' = 'values',\n  'runtime-source' = 'NewSource',\n  'data-id' = '%s',\n  'disable-lookup' = 'true',\n  'bounded' = 'true'\n)", TestValuesTableFactory.registerData((Seq<Row>)((Iterable)TestData.data5().take(5)).toList())));
        this.catalog.alterTableStatistics(new ObjectPath(this.tEnv.getCurrentDatabase(), "fact2"), new CatalogTableStatistics(0x40000000L, 1, 1L, 1L), false);
        String sql = "select * from fact, fact2, dim where fact.a = fact2.a and fact.a = dim.x and z = 2";
        Assertions.assertThat((String)this.tEnv().explainSql(sql, new ExplainDetail[0])).contains(new CharSequence[]{"RuntimeFilter"});
        this.checkResult(sql, (Seq<Row>)JavaScalaConversionUtil.toScala(Arrays.asList(Row.of((Object[])new Object[]{2, 2, 1, "Hallo Welt", 2, 2, 2, 2, 2, 2}), Row.of((Object[])new Object[]{2, 2, 1, "Hallo Welt", 2, 2, 3, 2, 2, 2}), Row.of((Object[])new Object[]{2, 3, 2, "Hallo Welt wie", 1, 2, 2, 2, 2, 2}), Row.of((Object[])new Object[]{2, 3, 2, "Hallo Welt wie", 1, 2, 3, 2, 2, 2}), Row.of((Object[])new Object[]{3, 4, 3, "Hallo Welt wie gehts?", 2, 3, 4, 3, 3, 2}), Row.of((Object[])new Object[]{3, 4, 3, "Hallo Welt wie gehts?", 2, 3, 5, 3, 3, 2}), Row.of((Object[])new Object[]{3, 5, 4, "ABC", 2, 3, 4, 3, 3, 2}), Row.of((Object[])new Object[]{3, 5, 4, "ABC", 2, 3, 5, 3, 3, 2}), Row.of((Object[])new Object[]{3, 6, 5, "BCD", 3, 3, 4, 3, 3, 2}), Row.of((Object[])new Object[]{3, 6, 5, "BCD", 3, 3, 5, 3, 3, 2}))), false);
    }
}

