/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.table.planner.plan.rules.logical;

import java.time.Duration;
import org.apache.calcite.plan.Convention;
import org.apache.calcite.plan.RelOptRule;
import org.apache.calcite.plan.RelTrait;
import org.apache.calcite.plan.hep.HepMatchOrder;
import org.apache.calcite.rel.rules.CoreRules;
import org.apache.calcite.tools.RuleSets;
import org.apache.flink.table.api.TableConfig;
import org.apache.flink.table.api.config.ExecutionConfigOptions;
import org.apache.flink.table.functions.UserDefinedFunction;
import org.apache.flink.table.planner.plan.nodes.FlinkConventions;
import org.apache.flink.table.planner.plan.nodes.logical.FlinkLogicalCalc;
import org.apache.flink.table.planner.plan.nodes.logical.FlinkLogicalTableSourceScan;
import org.apache.flink.table.planner.plan.nodes.logical.FlinkLogicalWatermarkAssigner;
import org.apache.flink.table.planner.plan.optimize.program.FlinkChainedProgram;
import org.apache.flink.table.planner.plan.optimize.program.FlinkHepRuleSetProgramBuilder;
import org.apache.flink.table.planner.plan.optimize.program.FlinkOptimizeProgram;
import org.apache.flink.table.planner.plan.optimize.program.FlinkVolcanoProgramBuilder;
import org.apache.flink.table.planner.plan.optimize.program.HEP_RULES_EXECUTION_TYPE;
import org.apache.flink.table.planner.plan.optimize.program.StreamOptimizeContext;
import org.apache.flink.table.planner.plan.rules.logical.PushWatermarkIntoTableSourceScanAcrossCalcRule;
import org.apache.flink.table.planner.plan.rules.logical.PushWatermarkIntoTableSourceScanRule;
import org.apache.flink.table.planner.runtime.utils.JavaUserDefinedScalarFunctions;
import org.apache.flink.table.planner.utils.StreamTableTestUtil;
import org.apache.flink.table.planner.utils.TableTestBase;
import org.junit.Before;
import org.junit.Test;

public class PushWatermarkIntoTableSourceScanRuleTest
extends TableTestBase {
    private final StreamTableTestUtil util = this.streamTestUtil(new TableConfig());

    @Before
    public void setup() {
        FlinkChainedProgram program = new FlinkChainedProgram();
        program.addLast("Converters", (FlinkOptimizeProgram)FlinkVolcanoProgramBuilder.newBuilder().add(RuleSets.ofList((RelOptRule[])new RelOptRule[]{CoreRules.PROJECT_TO_CALC, CoreRules.FILTER_TO_CALC, FlinkLogicalCalc.CONVERTER(), FlinkLogicalTableSourceScan.CONVERTER(), FlinkLogicalWatermarkAssigner.CONVERTER()})).setRequiredOutputTraits((RelTrait[])new Convention[]{FlinkConventions.LOGICAL()}).build());
        program.addLast("PushWatermarkIntoTableSourceScanRule", (FlinkOptimizeProgram)FlinkHepRuleSetProgramBuilder.newBuilder().setHepRulesExecutionType(HEP_RULES_EXECUTION_TYPE.RULE_SEQUENCE()).setHepMatchOrder(HepMatchOrder.BOTTOM_UP).add(RuleSets.ofList((RelOptRule[])new RelOptRule[]{PushWatermarkIntoTableSourceScanRule.INSTANCE, PushWatermarkIntoTableSourceScanAcrossCalcRule.INSTANCE})).build());
        this.util.replaceStreamProgram((FlinkChainedProgram<StreamOptimizeContext>)program);
    }

    @Test
    public void testSimpleWatermark() {
        String ddl = "CREATE TABLE MyTable(  a INT,\n  b BIGINT,\n  c TIMESTAMP(3),\n  WATERMARK FOR c AS c - INTERVAL '5' SECOND\n) WITH (\n  'connector' = 'values',\n  'enable-watermark-push-down' = 'true',\n  'bounded' = 'false',\n  'disable-lookup' = 'true')";
        this.util.tableEnv().executeSql(ddl);
        this.util.verifyRelPlan("select a, c from MyTable");
    }

    @Test
    public void testWatermarkOnComputedColumn() {
        String ddl = "CREATE TABLE MyTable(  a INT,\n  b BIGINT,\n  c TIMESTAMP(3),\n  d AS c + INTERVAL '5' SECOND,\n  WATERMARK FOR d AS d - INTERVAL '5' SECOND\n) WITH (\n 'connector' = 'values',\n 'enable-watermark-push-down' = 'true',\n 'bounded' = 'false',\n 'disable-lookup' = 'true')";
        this.util.tableEnv().executeSql(ddl);
        this.util.verifyRelPlan("SELECT * from MyTable");
    }

    @Test
    public void testWatermarkOnComputedColumnWithQuery() {
        String ddl = "CREATE TABLE MyTable(  a INT,\n  b BIGINT,\n  c TIMESTAMP(3) NOT NULL,\n  d AS c + INTERVAL '5' SECOND,\n  WATERMARK FOR d AS d - INTERVAL '5' SECOND\n) WITH (\n  'connector' = 'values',\n  'enable-watermark-push-down' = 'true',\n  'bounded' = 'false',\n  'disable-lookup' = 'true')";
        this.util.tableEnv().executeSql(ddl);
        this.util.verifyRelPlan("SELECT a, b FROM MyTable WHERE d > TO_TIMESTAMP('2020-10-09 12:12:12')");
    }

    @Test
    public void testWatermarkOnComputedColumnWithMultipleInputs() {
        String ddl = "CREATE TABLE MyTable(  a STRING,\n  b STRING,\n  c as TO_TIMESTAMP(a, b),\n  WATERMARK FOR c AS c - INTERVAL '5' SECOND\n) WITH (\n  'connector' = 'values',\n  'enable-watermark-push-down' = 'true',\n  'bounded' = 'false',\n  'disable-lookup' = 'true')";
        this.util.tableEnv().executeSql(ddl);
        this.util.verifyRelPlan("SELECT * FROM MyTable");
    }

    @Test
    public void testWatermarkOnRow() {
        String ddl = "CREATE TABLE MyTable(  a INT,\n  b BIGINT,\n  c ROW<name STRING, d TIMESTAMP(3)>,  e AS c.d,  WATERMARK FOR e AS e - INTERVAL '5' SECOND\n) WITH (\n  'connector' = 'values',\n  'enable-watermark-push-down' = 'true',\n  'bounded' = 'false',\n  'disable-lookup' = 'true')";
        this.util.tableEnv().executeSql(ddl);
        this.util.verifyRelPlan("SELECT * FROM MyTable");
    }

    @Test
    public void testWatermarkOnNestedRow() {
        String ddl = "CREATE TABLE MyTable(  a INT,\n  b BIGINT,\n  c ROW<name STRING, d row<e STRING, f TIMESTAMP(3)>>,  g as c.d.f,  WATERMARK for g as g - INTERVAL '5' SECOND\n) WITH (\n  'connector' = 'values',\n  'enable-watermark-push-down' = 'true',\n  'bounded' = 'false',\n  'disable-lookup' = 'true')";
        this.util.tableEnv().executeSql(ddl);
        this.util.verifyRelPlan("SELECT * FROM MyTable");
    }

    @Test
    public void testWatermarkWithMultiInputUdf() {
        JavaUserDefinedScalarFunctions.JavaFunc5.closeCalled = false;
        JavaUserDefinedScalarFunctions.JavaFunc5.openCalled = false;
        this.util.addTemporarySystemFunction("func", (UserDefinedFunction)new JavaUserDefinedScalarFunctions.JavaFunc5());
        String ddl = "CREATE TABLE MyTable(\n  a INT,\n  b BIGINT,\n  c TIMESTAMP(3),\n  d AS func(c, a),\n  WATERMARK FOR d AS func(func(d, a), a)\n) WITH (\n  'connector' = 'values',\n  'enable-watermark-push-down' = 'true',\n  'bounded' = 'false',\n  'disable-lookup' = 'true')";
        this.util.tableEnv().executeSql(ddl);
        this.util.verifyRelPlan("SELECT * FROM MyTable");
    }

    @Test
    public void testWatermarkOnMetadata() {
        String ddl = "CREATE TABLE MyTable(  `a` INT,\n  `b` BIGINT,\n  `c` TIMESTAMP(3),\n  `metadata` BIGINT METADATA FROM 'metadata_2' VIRTUAL,\n  `computed` AS `metadata` + `b`,\n  WATERMARK for `c` as c - CAST(`metadata` + `computed` AS INTERVAL SECOND)\n) WITH (\n  'connector' = 'values',\n  'readable-metadata' = 'metadata_1:STRING,metadata_2:INT',\n  'enable-watermark-push-down' = 'true',\n  'bounded' = 'false',\n  'disable-lookup' = 'true')";
        this.util.tableEnv().executeSql(ddl);
        this.util.verifyRelPlan("SELECT * FROM MyTable");
    }

    @Test
    public void testWatermarkWithIdleSource() {
        this.util.tableEnv().getConfig().set(ExecutionConfigOptions.TABLE_EXEC_SOURCE_IDLE_TIMEOUT, (Object)Duration.ofMillis(1000L));
        String ddl = "CREATE TABLE MyTable(  a INT,\n  b BIGINT,\n  c TIMESTAMP(3),\n  WATERMARK FOR c AS c - INTERVAL '5' SECOND\n) WITH (\n  'connector' = 'values',\n  'enable-watermark-push-down' = 'true',\n  'bounded' = 'false',\n  'disable-lookup' = 'true')";
        this.util.tableEnv().executeSql(ddl);
        this.util.verifyRelPlan("select a, c from MyTable");
    }

    @Test
    public void testWatermarkWithPythonFunctionInComputedColumn() {
        this.util.tableEnv().createTemporaryFunction("parse_ts", (UserDefinedFunction)new JavaUserDefinedScalarFunctions.PythonTimestampScalarFunction());
        String ddl = "CREATE TABLE MyTable(  a INT,\n  b AS parse_ts(a),\n  WATERMARK FOR b AS b\n) WITH (\n 'connector' = 'values',\n 'enable-watermark-push-down' = 'true',\n 'bounded' = 'false',\n 'disable-lookup' = 'true')";
        this.util.tableEnv().executeSql(ddl);
        this.util.verifyRelPlan("SELECT * FROM MyTable");
    }
}

