/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.table.planner.plan.nodes.exec.stream;

import org.apache.flink.table.api.TableConfig;
import org.apache.flink.table.api.TableEnvironment;
import org.apache.flink.table.functions.UserDefinedFunction;
import org.apache.flink.table.planner.runtime.utils.JavaUserDefinedScalarFunctions;
import org.apache.flink.table.planner.utils.MockPythonTableFunction;
import org.apache.flink.table.planner.utils.StreamTableTestUtil;
import org.apache.flink.table.planner.utils.TableTestBase;
import org.junit.Before;
import org.junit.Test;

/**
 * JSON plan tests for streaming queries that correlate {@code MyTable} with the
 * table function registered under the name {@code TableFunc}.
 *
 * <p>Each test creates the {@code MySink} table, then passes a single
 * {@code INSERT INTO} statement to {@link StreamTableTestUtil#verifyJsonPlan}.
 */
public class PythonCorrelateJsonPlanTest
extends TableTestBase {
    /** DDL of the source table that every test query reads from. */
    private static final String SOURCE_TABLE_DDL =
            "CREATE TABLE MyTable (\n"
                    + "  a int,\n"
                    + "  b int,\n"
                    + "  c int,\n"
                    + "  d timestamp(3)\n"
                    + ") with (\n"
                    + "  'connector' = 'values',\n"
                    + "  'bounded' = 'false')";

    /** DDL of the two-column sink table that every test query writes to. */
    private static final String SINK_TABLE_DDL =
            "CREATE TABLE MySink (\n"
                    + "  a int,\n"
                    + "  b int\n"
                    + ") with (\n"
                    + "  'connector' = 'values',\n"
                    + "  'table-sink-class' = 'DEFAULT')";

    private StreamTableTestUtil util;
    private TableEnvironment tEnv;

    /**
     * Builds a fresh test utility from the default {@link TableConfig}, registers the
     * {@code TableFunc} and {@code pyFunc} temporary functions and creates the source table.
     */
    @Before
    public void setup() {
        util = streamTestUtil(TableConfig.getDefault());
        tEnv = util.getTableEnv();
        tEnv.createTemporaryFunction("TableFunc", new MockPythonTableFunction());
        tEnv.createTemporaryFunction(
                "pyFunc", new JavaUserDefinedScalarFunctions.PythonScalarFunction("pyFunc"));
        tEnv.executeSql(SOURCE_TABLE_DDL);
    }

    /** Verifies the JSON plan of a {@code LEFT JOIN LATERAL TABLE(...) ON TRUE} query. */
    @Test
    public void testPythonTableFunction() {
        createSinkTable();
        util.verifyJsonPlan(
                "INSERT INTO MySink SELECT x, y FROM MyTable LEFT JOIN LATERAL TABLE(TableFunc(a * a, pyFunc(a, b))) AS T(x, y) ON TRUE");
    }

    /**
     * Verifies the JSON plan of a comma-style lateral join whose {@code WHERE} clause
     * filters on the columns produced by the table function.
     */
    @Test
    public void testJoinWithFilter() {
        createSinkTable();
        util.verifyJsonPlan(
                "INSERT INTO MySink SELECT x, y FROM MyTable, LATERAL TABLE(TableFunc(a * a, pyFunc(a, b))) AS T(x, y) WHERE x = a and y + 1 = y * y");
    }

    /** Creates the {@code MySink} table in the table environment built by {@link #setup()}. */
    private void createSinkTable() {
        tEnv.executeSql(SINK_TABLE_DDL);
    }
}

