/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.table.planner.plan.stream.sql;

import java.io.Serializable;
import java.lang.invoke.MethodHandle;
import java.lang.invoke.SerializedLambda;
import org.apache.flink.api.common.ExecutionConfig;
import org.apache.flink.api.common.serialization.SerializerConfig;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.api.scala.typeutils.CaseClassTypeInfo;
import org.apache.flink.api.scala.typeutils.ScalaCaseClassSerializer;
import org.apache.flink.table.api.DataTypes;
import org.apache.flink.table.api.StatementSet;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.config.ExecutionConfigOptions;
import org.apache.flink.table.api.config.OptimizerConfigOptions;
import org.apache.flink.table.api.package$;
import org.apache.flink.table.expressions.Expression;
import org.apache.flink.table.functions.UserDefinedFunction;
import org.apache.flink.table.planner.plan.stream.sql.SubplanReuseTest$;
import org.apache.flink.table.planner.runtime.utils.JavaUserDefinedScalarFunctions;
import org.apache.flink.table.planner.runtime.utils.JavaUserDefinedTableFunctions;
import org.apache.flink.table.planner.utils.StreamTableTestUtil;
import org.apache.flink.table.planner.utils.TableTestBase;
import org.apache.flink.table.runtime.functions.aggregate.FirstValueAggFunction;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import scala.Function1;
import scala.Predef$;
import scala.Symbol;
import scala.Tuple3;
import scala.collection.Seq;
import scala.collection.immutable.StringOps;
import scala.reflect.ScalaSignature;
import scala.runtime.BoxesRunTime;
import scala.runtime.LambdaDeserialize;
import scala.runtime.RichInt$;
import scala.runtime.SymbolLiteral;
import scala.runtime.java8.JFunction1;

@ScalaSignature(bytes="\u0006\u0001\u0005%b\u0001B\u0001\u0003\u0001M\u0011\u0001cU;ca2\fgNU3vg\u0016$Vm\u001d;\u000b\u0005\r!\u0011aA:rY*\u0011QAB\u0001\u0007gR\u0014X-Y7\u000b\u0005\u001dA\u0011\u0001\u00029mC:T!!\u0003\u0006\u0002\u000fAd\u0017M\u001c8fe*\u00111\u0002D\u0001\u0006i\u0006\u0014G.\u001a\u0006\u0003\u001b9\tQA\u001a7j].T!a\u0004\t\u0002\r\u0005\u0004\u0018m\u00195f\u0015\u0005\t\u0012aA8sO\u000e\u00011C\u0001\u0001\u0015!\t)\u0002$D\u0001\u0017\u0015\t9\u0002\"A\u0003vi&d7/\u0003\u0002\u001a-\tiA+\u00192mKR+7\u000f\u001e\"bg\u0016DQa\u0007\u0001\u0005\u0002q\ta\u0001P5oSRtD#A\u000f\u0011\u0005y\u0001Q\"\u0001\u0002\t\u000f\u0001\u0002!\u0019!C\u0005C\u0005!Q\u000f^5m+\u0005\u0011\u0003CA\u000b$\u0013\t!cCA\nTiJ,\u0017-\u001c+bE2,G+Z:u+RLG\u000e\u0003\u0004'\u0001\u0001\u0006IAI\u0001\u0006kRLG\u000e\t\u0005\u0006Q\u0001!\t!K\u0001\u0007E\u00164wN]3\u0015\u0003)\u0002\"a\u000b\u0018\u000e\u00031R\u0011!L\u0001\u0006g\u000e\fG.Y\u0005\u0003_1\u0012A!\u00168ji\"\u0012q%\r\t\u0003eej\u0011a\r\u0006\u0003iU\n1!\u00199j\u0015\t1t'A\u0004kkBLG/\u001a:\u000b\u0005a\u0002\u0012!\u00026v]&$\u0018B\u0001\u001e4\u0005)\u0011UMZ8sK\u0016\u000b7\r\u001b\u0005\u0006y\u0001!\t!K\u0001\u0018i\u0016\u001cH\u000fR5tC\ndWmU;ca2\fgNU3vg\u0016D#a\u000f \u0011\u0005Iz\u0014B\u0001!4\u0005\u0011!Vm\u001d;\t\u000b\t\u0003A\u0011A\u0015\u0002IQ,7\u000f^*vEBd\u0017M\u001c*fkN,w+\u001b;i\t&4g-\u001a:f]R\u0014vn\u001e+za\u0016D#!\u0011 \t\u000b\u0015\u0003A\u0011A\u0015\u00025Q,7\u000f^#oC\ndWMU3vg\u0016$\u0016M\u00197f'>,(oY3)\u0005\u0011s\u0004\"\u0002%\u0001\t\u0003I\u0013a\u0007;fgR$\u0015n]1cY\u0016\u0014V-^:f)\u0006\u0014G.Z*pkJ\u001cW\r\u000b\u0002H}!)1\n\u0001C\u0001S\u00051B/Z:u'V\u0014\u0007\u000f\\1o%\u0016,8/Z(o\u0007\u0006d7\r\u000b\u0002K}!)a\n\u0001C\u0001S\u0005\tD/Z:u'V\u0014\u0007\u000f\\1o%\u0016,8/Z(o\u0007\u0006d7mV5uQ:{g\u000eR3uKJl\u0017N\\5ti&\u001c\u0007K]8kK\u000e$\bFA'?\u0011\u0015\t\u0006\u0001\"\u0001*\u00035\"Xm\u001d;Tk\n\u0004H.\u00198SKV\u001cXm\u00148DC2\u001cw+\u001b;i\u001d>tG)\u001a;fe6Lg.[:uS\u000e,FM\u001a\u0015\u0003!zBQ\u0001\u0016\u0001\u0005\u0002%\n!\u0004^3tiN+(\r\u001d7b]J+Wo]3P]\u0016C8\r[1oO\u0016D#a\u0015 \t\u000b]\u0003A\u0011A\u0015\u0002AQ,7\u000f^*vEBd\u0017M\u001c*fkN,wJ\\$s_V\u0004\u0018iZ4sK\u001e\fG/\u001a\u0015\u0003-zBQA\u0017\u0001\u0005\u0002%\na\u0007^3tiN+(\r\u001d7b]J+Wo]3P]\u0006;wM]3hCR,w+\u001b;i\u001d>tG)\u001a;fe6Lg.[:uS\u000e\fumZ\"bY2D#!\u0017 \t\u000bu\u0003A\u0011A\u0015\u0002-Q,7\u000f^*vEBd\u0017M\u001c*fkN,wJ\\*peRD#\u0001\u0018 \t\u000b\u0001\u0004A\u0011A\u0015\u0002/Q,7\u000f^*vEBd\u0017M\u001c*fkN,wJ\u001c'j[&$\bFA0?\u0011\u0015\u0019\u0007\u0001\"\u0001*\u0003m!Xm\u001d;Tk\n\u0004H.\u00198SKV\u001cXm\u00148T_J$H*[7ji\"\u0012!M\u0010\u0005\u0006M\u0002!\t!K\u0001\u0017i\u0016\u001cHoU;ca2\fgNU3vg\u0016|eNS8j]\"\u0012QM\u0010\u0005\u0006S\u0002!\t!K\u00014i\u0016\u001cHoU;ca2\fgNU3vg\u0016|eNS8j]:{g\u000eR3uKJl\u0017N\\5ti&\u001c'j\\5o\u0007>tG-\u001b;j_:D#\u0001\u001b \t\u000b1\u0004A\u0011A\u0015\u00029Q,7\u000f^*vEBd\u0017M\u001c*fkN,wJ\\(wKJ<\u0016N\u001c3po\"\u00121N\u0010\u0005\u0006_\u0002!\t!K\u00018i\u0016\u001cHoU;ca2\fgNU3vg\u0016|en\u0014<fe^Kg\u000eZ8x/&$\bNT8o\t\u0016$XM]7j]&\u001cH/[2BO\u001e\u001c\u0015\r\u001c7)\u00059t\u0004\"\u0002:\u0001\t\u0003I\u0013a\u0007;fgR\u001cVO\u00199mC:\u0014V-^:f\u001f:\u001cuN\u001d:fY\u0006$X\r\u000b\u0002r}!)Q\u000f\u0001C\u0001S\u0005\u0019D/Z:u'V\u0014\u0007\u000f\\1o%\u0016,8/Z(o\u0007>\u0014(/\u001a7bi\u0016<\u0016\u000e\u001e5O_:$U\r^3s[&t\u0017n\u001d;jGV#EK\u0012\u0015\u0003izBQ\u0001\u001f\u0001\u0005\u0002%\n1\u0005^3tiN+(\r\u001d7b]J+Wo]3XSRDG)\u001f8b[&\u001cg)\u001e8di&|g\u000e\u000b\u0002x}!)1\u0010\u0001C\u0001S\u0005)C/Z:u\u000b:\f'\r\\3SKV\u001cX\rV1cY\u0016\u001cv.\u001e:dK>sg*Z<T_V\u00148-\u001a\u0015\u0003uzBQA \u0001\u0005\u0002%\na\u0005^3ti\u0012K7/\u00192mKJ+Wo]3UC\ndWmU8ve\u000e,wJ\u001c(foN{WO]2fQ\tih\b\u0003\u0004\u0002\u0004\u0001!\t!K\u00017i\u0016\u001cHOU3vg\u0016$\u0016M\u00197f'>,(oY3XSRD\u0007K]8kK\u000e$\b+^:i\t><h.\u00118e\u001b\u0016$\u0018\rR1uC.+\u00170\r\u0015\u0004\u0003\u0003q\u0004BBA\u0005\u0001\u0011\u0005\u0011&A\u001buKN$(+Z;tKR\u000b'\r\\3T_V\u00148-Z,ji\"\u0004&o\u001c6fGR\u0004Vo\u001d5E_^t\u0017I\u001c3NKR\fG)\u0019;b\u0017\u0016L\bfAA\u0004}!1\u0011q\u0002\u0001\u0005\u0002%\na\u0007^3tiN{WO]2f%\u0016,8/Z,ji\",U\u000e\u001d;z\r&dG/\u001a:D_:$\u0017I\u001c3JO:|'/Z#naRLh)\u001b7uKJD3!!\u0004?\u0011\u0019\t)\u0002\u0001C\u0001S\u0005QD/Z:u'>,(oY3SKV\u001cXmV5uQ\u0016k\u0007\u000f^=GS2$XM]\"p]\u0012\fe\u000eZ%h]>\u0014X-R7qif4\u0015\u000e\u001c;feR\u0013X/\u001a\u0015\u0004\u0003'q\u0004BBA\u000e\u0001\u0011\u0005\u0011&A\u001euKN$8k\\;sG\u0016\u0014V-^:f/&$\b.R7qif4\u0015\u000e\u001c;fe\u000e{g\u000eZ!oI&;gn\u001c:f\u000b6\u0004H/\u001f$jYR,'\u000f\u0016:vKJB3!!\u0007?\u0011\u0019\t\t\u0003\u0001C\u0001S\u0005YD/Z:u'>,(oY3SKV\u001cXmV5uQ\u0016k\u0007\u000f^=GS2$XM]\"p]\u0012\fe\u000eZ%h]>\u0014X-R7qif4\u0015\u000e\u001c;feR\u0013X/Z\u001a)\u0007\u0005}a\b\u0003\u0004\u0002(\u0001!I!K\u0001\u0015i\u0016\u001cHOU3vg\u0016|eNT3x'>,(oY3")
public class SubplanReuseTest
extends TableTestBase {
    private final StreamTableTestUtil util = this.streamTestUtil(this.streamTestUtil$default$1());

    private StreamTableTestUtil util() {
        return this.util;
    }

    @BeforeEach
    public void before() {
        this.util().tableEnv().getConfig().set(OptimizerConfigOptions.TABLE_OPTIMIZER_REUSE_SUB_PLAN_ENABLED, (Object)BoxesRunTime.boxToBoolean((boolean)true));
        this.util().tableEnv().getConfig().set(OptimizerConfigOptions.TABLE_OPTIMIZER_REUSE_SOURCE_ENABLED, (Object)BoxesRunTime.boxToBoolean((boolean)false));
        this.util().addTableSource("x", (Seq<Expression>)Predef$.MODULE$.wrapRefArray((Object[])new Expression[]{package$.MODULE$.symbol2FieldExpression((Symbol)SymbolLiteral.bootstrap("apply", "a")), package$.MODULE$.symbol2FieldExpression((Symbol)SymbolLiteral.bootstrap("apply", "b")), package$.MODULE$.symbol2FieldExpression((Symbol)SymbolLiteral.bootstrap("apply", "c"))}), new CaseClassTypeInfo<Tuple3<Object, Object, String>>(null){

            public /* synthetic */ TypeInformation[] protected$types($anon$3 x$1) {
                return x$1.types;
            }

            public TypeSerializer<Tuple3<Object, Object, String>> createSerializer(SerializerConfig serializerConfig) {
                TypeSerializer[] fieldSerializers = new TypeSerializer[this.getArity()];
                RichInt$.MODULE$.until$extension0(Predef$.MODULE$.intWrapper(0), this.getArity()).foreach$mVc$sp((Function1)(JFunction1.mcVI.sp & Serializable & scala.Serializable)i -> {
                    fieldSerializers$1[i] = this.protected$types(this)[i].createSerializer(serializerConfig);
                });
                ScalaCaseClassSerializer<Tuple3<Object, Object, String>> unused = new ScalaCaseClassSerializer<Tuple3<Object, Object, String>>(this, fieldSerializers){

                    public Tuple3<Object, Object, String> createInstance(Object[] fields) {
                        return new Tuple3((Object)BoxesRunTime.boxToInteger((int)BoxesRunTime.unboxToInt((Object)fields[0])), (Object)BoxesRunTime.boxToLong((long)BoxesRunTime.unboxToLong((Object)fields[1])), (Object)((String)fields[2]));
                    }
                };
                return new ScalaCaseClassSerializer(this.getTypeClass(), fieldSerializers);
            }

            public TypeSerializer<Tuple3<Object, Object, String>> createSerializer(ExecutionConfig executionConfig) {
                return this.createSerializer(executionConfig.getSerializerConfig());
            }

            private static /* synthetic */ Object $deserializeLambda$(SerializedLambda serializedLambda) {
                return LambdaDeserialize.bootstrap("lambdaDeserialize", new MethodHandle[]{$anonfun$createSerializer$1(org.apache.flink.table.planner.plan.stream.sql.SubplanReuseTest$$anon$3 org.apache.flink.api.common.serialization.SerializerConfig org.apache.flink.api.common.typeutils.TypeSerializer[] int )}, serializedLambda);
            }
        });
        this.util().addTableSource("y", (Seq<Expression>)Predef$.MODULE$.wrapRefArray((Object[])new Expression[]{package$.MODULE$.symbol2FieldExpression((Symbol)SymbolLiteral.bootstrap("apply", "d")), package$.MODULE$.symbol2FieldExpression((Symbol)SymbolLiteral.bootstrap("apply", "e")), package$.MODULE$.symbol2FieldExpression((Symbol)SymbolLiteral.bootstrap("apply", "f"))}), new CaseClassTypeInfo<Tuple3<Object, Object, String>>(null){

            public /* synthetic */ TypeInformation[] protected$types($anon$4 x$1) {
                return x$1.types;
            }

            public TypeSerializer<Tuple3<Object, Object, String>> createSerializer(SerializerConfig serializerConfig) {
                TypeSerializer[] fieldSerializers = new TypeSerializer[this.getArity()];
                RichInt$.MODULE$.until$extension0(Predef$.MODULE$.intWrapper(0), this.getArity()).foreach$mVc$sp((Function1)(JFunction1.mcVI.sp & Serializable & scala.Serializable)i -> {
                    fieldSerializers$2[i] = this.protected$types(this)[i].createSerializer(serializerConfig);
                });
                ScalaCaseClassSerializer<Tuple3<Object, Object, String>> unused = new ScalaCaseClassSerializer<Tuple3<Object, Object, String>>(this, fieldSerializers){

                    public Tuple3<Object, Object, String> createInstance(Object[] fields) {
                        return new Tuple3((Object)BoxesRunTime.boxToInteger((int)BoxesRunTime.unboxToInt((Object)fields[0])), (Object)BoxesRunTime.boxToLong((long)BoxesRunTime.unboxToLong((Object)fields[1])), (Object)((String)fields[2]));
                    }
                };
                return new ScalaCaseClassSerializer(this.getTypeClass(), fieldSerializers);
            }

            public TypeSerializer<Tuple3<Object, Object, String>> createSerializer(ExecutionConfig executionConfig) {
                return this.createSerializer(executionConfig.getSerializerConfig());
            }

            private static /* synthetic */ Object $deserializeLambda$(SerializedLambda serializedLambda) {
                return LambdaDeserialize.bootstrap("lambdaDeserialize", new MethodHandle[]{$anonfun$createSerializer$2(org.apache.flink.table.planner.plan.stream.sql.SubplanReuseTest$$anon$4 org.apache.flink.api.common.serialization.SerializerConfig org.apache.flink.api.common.typeutils.TypeSerializer[] int )}, serializedLambda);
            }
        });
    }

    @Test
    public void testDisableSubplanReuse() {
        this.util().tableEnv().getConfig().set(OptimizerConfigOptions.TABLE_OPTIMIZER_REUSE_SUB_PLAN_ENABLED, (Object)BoxesRunTime.boxToBoolean((boolean)false));
        String sqlQuery = new StringOps(Predef$.MODULE$.augmentString("\n        |WITH r AS (\n        | SELECT a, SUM(b) as b, SUM(e) as e FROM x, y WHERE a = d AND c > 100 GROUP BY a\n        |)\n        |SELECT r1.a, r1.b, r2.e FROM r r1, r r2 WHERE r1.b > 10 AND r2.e < 20 AND r1.a = r2.a\n      ")).stripMargin();
        this.util().verifyRelPlanNotExpected(sqlQuery, (Seq<String>)Predef$.MODULE$.wrapRefArray((Object[])new String[]{"Reused"}));
    }

    @Test
    public void testSubplanReuseWithDifferentRowType() {
        this.util().tableEnv().getConfig().set(OptimizerConfigOptions.TABLE_OPTIMIZER_REUSE_SOURCE_ENABLED, (Object)BoxesRunTime.boxToBoolean((boolean)false));
        String sqlQuery = new StringOps(Predef$.MODULE$.augmentString("\n        |WITH t1 AS (SELECT CAST(a as BIGINT) AS a, SUM(b) AS b FROM x GROUP BY CAST(a as BIGINT)),\n        |     t2 AS (SELECT CAST(a as DOUBLE) AS a, SUM(b) AS b FROM x GROUP BY CAST(a as DOUBLE))\n        |SELECT t1.*, t2.* FROM t1, t2 WHERE t1.b = t2.b\n      ")).stripMargin();
        this.util().verifyRelPlanNotExpected(sqlQuery, (Seq<String>)Predef$.MODULE$.wrapRefArray((Object[])new String[]{"Reused"}));
    }

    @Test
    public void testEnableReuseTableSource() {
        this.util().tableEnv().getConfig().set(OptimizerConfigOptions.TABLE_OPTIMIZER_REUSE_SOURCE_ENABLED, (Object)BoxesRunTime.boxToBoolean((boolean)true));
        String sqlQuery = new StringOps(Predef$.MODULE$.augmentString("\n        |WITH t AS (SELECT x.a AS a, x.b AS b, y.d AS d, y.e AS e FROM x, y WHERE x.a = y.d)\n        |SELECT t1.*, t2.* FROM t t1, t t2 WHERE t1.b = t2.e AND t1.a < 10 AND t2.a > 5\n      ")).stripMargin();
        this.util().verifyExecPlan(sqlQuery);
    }

    @Test
    public void testDisableReuseTableSource() {
        this.util().tableEnv().getConfig().set(OptimizerConfigOptions.TABLE_OPTIMIZER_REUSE_SOURCE_ENABLED, (Object)BoxesRunTime.boxToBoolean((boolean)false));
        String sqlQuery = new StringOps(Predef$.MODULE$.augmentString("\n        |WITH t AS (SELECT * FROM x, y WHERE x.a = y.d)\n        |SELECT t1.*, t2.* FROM t t1, t t2 WHERE t1.b = t2.e AND t1.a < 10 AND t2.a > 5\n      ")).stripMargin();
        this.util().verifyExecPlan(sqlQuery);
    }

    @Test
    public void testSubplanReuseOnCalc() {
        String sqlQuery = new StringOps(Predef$.MODULE$.augmentString("\n        |WITH r AS (SELECT a, b, c FROM x WHERE c LIKE 'test%')\n        |(SELECT r.a, LOWER(c) AS c, y.e FROM r, y WHERE r.a = y.d)\n        |UNION ALL\n        |(SELECT r.a, LOWER(c) AS c, y.e FROM r, y WHERE r.a = y.d)\n      ")).stripMargin();
        this.util().verifyExecPlan(sqlQuery);
    }

    @Test
    public void testSubplanReuseOnCalcWithNonDeterministicProject() {
        this.util().addTemporarySystemFunction("random_udf", (UserDefinedFunction)new JavaUserDefinedScalarFunctions.NonDeterministicUdf());
        String sqlQuery = new StringOps(Predef$.MODULE$.augmentString("\n        |(SELECT a, random_udf() FROM x WHERE a > 10)\n        |UNION ALL\n        |(SELECT a, random_udf() FROM x WHERE a > 10)\n      ")).stripMargin();
        this.util().verifyExecPlan(sqlQuery);
    }

    @Test
    public void testSubplanReuseOnCalcWithNonDeterministicUdf() {
        this.util().addTemporarySystemFunction("random_udf", (UserDefinedFunction)new JavaUserDefinedScalarFunctions.NonDeterministicUdf());
        String sqlQuery = new StringOps(Predef$.MODULE$.augmentString("\n        |(SELECT a FROM x WHERE b > random_udf(a))\n        |UNION ALL\n        |(SELECT a FROM x WHERE b > random_udf(a))\n      ")).stripMargin();
        this.util().verifyExecPlan(sqlQuery);
    }

    @Test
    public void testSubplanReuseOnExchange() {
        String sqlQuery = new StringOps(Predef$.MODULE$.augmentString("\n        |WITH r AS (SELECT a, b, c FROM x WHERE c LIKE 'test%')\n        |SELECT * FROM r, y WHERE a = d AND e > 10\n        |UNION ALL\n        |SELECT * FROM r, y WHERE a = d AND f <> ''\n      ")).stripMargin();
        this.util().verifyExecPlan(sqlQuery);
    }

    @Test
    public void testSubplanReuseOnGroupAggregate() {
        String sqlQuery = new StringOps(Predef$.MODULE$.augmentString("\n        |WITH r AS (SELECT c, SUM(a) a, SUM(b) b FROM x GROUP BY c)\n        |SELECT * FROM r r1, r r2 WHERE r1.a = r2.b AND r2.a > 1\n      ")).stripMargin();
        this.util().verifyExecPlan(sqlQuery);
    }

    @Test
    public void testSubplanReuseOnAggregateWithNonDeterministicAggCall() {
        this.util().addTemporarySystemFunction("MyFirst", (UserDefinedFunction)new FirstValueAggFunction(DataTypes.INT().getLogicalType()));
        this.util().addTemporarySystemFunction("MyLast", (UserDefinedFunction)new FirstValueAggFunction(DataTypes.BIGINT().getLogicalType()));
        String sqlQuery = new StringOps(Predef$.MODULE$.augmentString("\n        |WITH r AS (SELECT c, MyFirst(a) a, MyLast(b) b FROM x GROUP BY c)\n        |SELECT * FROM r r1, r r2 WHERE r1.a = r2.b AND r2.a > 1\n      ")).stripMargin();
        this.util().verifyExecPlan(sqlQuery);
    }

    @Test
    public void testSubplanReuseOnSort() {
        String sqlQuery = new StringOps(Predef$.MODULE$.augmentString("\n        |WITH r AS (SELECT c, SUM(a) a, SUM(b) b FROM x GROUP BY c ORDER BY a, b DESC)\n        |SELECT * FROM r r1, r r2 WHERE r1.a = r2.a AND r1.a > 1 AND r2.b < 10\n      ")).stripMargin();
        this.util().verifyExecPlan(sqlQuery);
    }

    @Test
    public void testSubplanReuseOnLimit() {
        this.util().tableEnv().getConfig().set(ExecutionConfigOptions.TABLE_EXEC_DISABLED_OPERATORS, (Object)"HashJoin,SortMergeJoin");
        String sqlQuery = new StringOps(Predef$.MODULE$.augmentString("\n        |WITH r AS (SELECT a, b FROM x LIMIT 10)\n        |\n        |SELECT a, b FROM r WHERE a > 10\n        |UNION ALL\n        |SELECT a, b * 2 AS b FROM r WHERE b < 10\n      ")).stripMargin();
        this.util().verifyExecPlan(sqlQuery);
    }

    @Test
    public void testSubplanReuseOnSortLimit() {
        String sqlQuery = new StringOps(Predef$.MODULE$.augmentString("\n        |WITH r AS (SELECT c, SUM(a) a, SUM(b) b FROM x GROUP BY c ORDER BY a, b DESC LIMIT 10)\n        |\n        |SELECT a, b FROM r WHERE a > 10\n        |UNION ALL\n        |SELECT a, b * 2 AS b FROM r WHERE b < 10\n      ")).stripMargin();
        this.util().verifyExecPlan(sqlQuery);
    }

    @Test
    public void testSubplanReuseOnJoin() {
        String sqlQuery = new StringOps(Predef$.MODULE$.augmentString("\n        |WITH r AS (SELECT * FROM x FULL OUTER JOIN y ON ABS(a) = ABS(d) OR c = f\n        |           WHERE b > 1 and e < 2)\n        |\n        |SELECT a, b FROM r\n        |UNION ALL\n        |SELECT a, b * 2 AS b FROM r\n      ")).stripMargin();
        this.util().verifyExecPlan(sqlQuery);
    }

    @Test
    public void testSubplanReuseOnJoinNonDeterministicJoinCondition() {
        this.util().addTemporarySystemFunction("random_udf", (UserDefinedFunction)new JavaUserDefinedScalarFunctions.NonDeterministicUdf());
        String sqlQuery = new StringOps(Predef$.MODULE$.augmentString("\n        |WITH r AS (SELECT * FROM x FULL OUTER JOIN y ON random_udf(a) = random_udf(d) OR c = f\n        |           WHERE b > 1 and e < 2)\n        |\n        |SELECT a, b FROM r\n        |UNION ALL\n        |SELECT a, b * 2 AS b FROM r\n      ")).stripMargin();
        this.util().verifyExecPlan(sqlQuery);
    }

    @Test
    public void testSubplanReuseOnOverWindow() {
        String sqlQuery = new StringOps(Predef$.MODULE$.augmentString("\n        |WITH r AS (SELECT a, b, RANK() OVER (ORDER BY c DESC) FROM x)\n        |SELECT * FROM r r1, r r2 WHERE r1.a = r2.a AND r1.b < 100 AND r2.b > 10\n      ")).stripMargin();
        this.util().verifyExecPlan(sqlQuery);
    }

    @Test
    public void testSubplanReuseOnOverWindowWithNonDeterministicAggCall() {
        this.util().addTemporarySystemFunction("MyFirst", (UserDefinedFunction)new FirstValueAggFunction(DataTypes.STRING().getLogicalType()));
        String sqlQuery = new StringOps(Predef$.MODULE$.augmentString("\n        |WITH r AS (SELECT a, b, MyFirst(c) OVER (PARTITION BY c ORDER BY c DESC) FROM x)\n        |SELECT * FROM r r1, r r2 WHERE r1.a = r2.a AND r1.b < 100 AND r2.b > 10\n      ")).stripMargin();
        this.util().verifyExecPlan(sqlQuery);
    }

    @Test
    public void testSubplanReuseOnCorrelate() {
        this.util().addTemporarySystemFunction("str_split", (UserDefinedFunction)new JavaUserDefinedTableFunctions.StringSplit());
        String sqlQuery = new StringOps(Predef$.MODULE$.augmentString("\n        |WITH r AS (SELECT a, b, c, v FROM x, LATERAL TABLE(str_split(c, '-')) AS T(v))\n        |SELECT * FROM r r1, r r2 WHERE r1.v = r2.v\n      ")).stripMargin();
        this.util().verifyExecPlan(sqlQuery);
    }

    @Test
    public void testSubplanReuseOnCorrelateWithNonDeterministicUDTF() {
        this.util().addTemporarySystemFunction("TableFun", (UserDefinedFunction)new JavaUserDefinedTableFunctions.NonDeterministicTableFunc());
        String sqlQuery = new StringOps(Predef$.MODULE$.augmentString("\n        |WITH r AS (SELECT a, b, c, s FROM x, LATERAL TABLE(TableFun(c)) AS T(s))\n        |SELECT * FROM r r1, r r2 WHERE r1.c = r2.s\n      ")).stripMargin();
        this.util().verifyExecPlan(sqlQuery);
    }

    @Test
    public void testSubplanReuseWithDynamicFunction() {
        Table sqlQuery = this.util().tableEnv().sqlQuery(new StringOps(Predef$.MODULE$.augmentString("\n                                            |(SELECT a AS random FROM x ORDER BY rand() LIMIT 1)\n                                            |INTERSECT\n                                            |(SELECT a AS random FROM x ORDER BY rand() LIMIT 1)\n                                            |INTERSECT\n                                            |(SELECT a AS random FROM x ORDER BY rand() LIMIT 1)\n      ")).stripMargin());
        this.util().verifyExecPlan(sqlQuery);
    }

    @Test
    public void testEnableReuseTableSourceOnNewSource() {
        this.util().tableEnv().getConfig().set(OptimizerConfigOptions.TABLE_OPTIMIZER_REUSE_SOURCE_ENABLED, (Object)BoxesRunTime.boxToBoolean((boolean)true));
        this.testReuseOnNewSource();
    }

    @Test
    public void testDisableReuseTableSourceOnNewSource() {
        this.util().tableEnv().getConfig().set(OptimizerConfigOptions.TABLE_OPTIMIZER_REUSE_SOURCE_ENABLED, (Object)BoxesRunTime.boxToBoolean((boolean)false));
        this.testReuseOnNewSource();
    }

    @Test
    public void testReuseTableSourceWithProjectPushDownAndMetaDataKey1() {
        String ddl1 = new StringOps(Predef$.MODULE$.augmentString("\n         | CREATE TABLE reuseTable (\n         |  a bigint,\n         |  b varchar(64),\n         |  c bigint,\n         |  d STRING,\n         |  ts1 TIMESTAMP(3) METADATA,\n         |  ts2 TIMESTAMP(3) METADATA\n         | ) WITH (\n         |  'connector' = 'values',\n         |  'readable-metadata' = 'ts1:TIMESTAMP(3), ts2:TIMESTAMP(3)'\n         | )\n         |")).stripMargin();
        this.util().tableEnv().executeSql(ddl1);
        String ddl2 = new StringOps(Predef$.MODULE$.augmentString("\n         | CREATE TABLE sink1 (\n         |  a1 bigint,\n         |  b1 VARCHAR(32),\n         |  my_time1 timestamp,\n         |  d1 DECIMAL(20,2)\n         | ) WITH (\n         |  'connector' = 'values'\n         | )\n         |")).stripMargin();
        this.util().tableEnv().executeSql(ddl2);
        String ddl3 = new StringOps(Predef$.MODULE$.augmentString("\n         | CREATE TABLE sink2 (\n         |  a2 bigint,\n         | `update_time` timestamp\n         | ) WITH (\n         | 'connector' = 'values'\n         | )\n         |")).stripMargin();
        this.util().tableEnv().executeSql(ddl3);
        String query1 = new StringOps(Predef$.MODULE$.augmentString("\n         | SELECT a, b, ts1, CAST(d AS DECIMAL(28,2)) AS d1\n         | FROM reuseTable\n         |")).stripMargin();
        Table table = this.util().tableEnv().sqlQuery(query1);
        this.util().tableEnv().createTemporaryView("view1", table);
        String query2 = new StringOps(Predef$.MODULE$.augmentString("\n         | SELECT a, ts1 AS update_time\n         | FROM reuseTable\n         |")).stripMargin();
        Table table2 = this.util().tableEnv().sqlQuery(query2);
        this.util().tableEnv().createTemporaryView("view2", table2);
        StatementSet stmtSet = this.util().tableEnv().createStatementSet();
        stmtSet.addInsertSql("INSERT INTO sink1 SELECT a, b, ts1, d1 FROM view1");
        stmtSet.addInsertSql("INSERT INTO sink2 SELECT a, update_time FROM view2");
        this.util().verifyExecPlan(stmtSet);
    }

    @Test
    public void testReuseTableSourceWithProjectPushDownAndMetaDataKey() {
        String ddl1 = new StringOps(Predef$.MODULE$.augmentString("\n         | CREATE TABLE reuseTable (\n         |  a bigint,\n         |  b varchar(64),\n         |  c bigint,\n         |  d STRING,\n         |  my_time TIMESTAMP(3) METADATA FROM 'ts1',\n         |  unUse_time TIMESTAMP(3) METADATA FROM 'ts2'\n         | ) WITH (\n         |  'connector' = 'values',\n         |  'readable-metadata' = 'ts1:TIMESTAMP(3), ts2:TIMESTAMP(3)'\n         | )\n         |")).stripMargin();
        this.util().tableEnv().executeSql(ddl1);
        String ddl2 = new StringOps(Predef$.MODULE$.augmentString("\n         | CREATE TABLE sink1 (\n         |  a1 bigint,\n         |  b1 VARCHAR(32),\n         |  my_time1 timestamp,\n         |  d1 DECIMAL(20,2)\n         | ) WITH (\n         |  'connector' = 'values'\n         | )\n         |")).stripMargin();
        this.util().tableEnv().executeSql(ddl2);
        String ddl3 = new StringOps(Predef$.MODULE$.augmentString("\n         | CREATE TABLE sink2 (\n         |  a2 bigint,\n         | `update_time` timestamp\n         | ) WITH (\n         | 'connector' = 'values'\n         | )\n         |")).stripMargin();
        this.util().tableEnv().executeSql(ddl3);
        String query1 = new StringOps(Predef$.MODULE$.augmentString("\n         | SELECT a, b, my_time, CAST(d AS DECIMAL(28,2)) AS d1\n         | FROM reuseTable\n         |")).stripMargin();
        Table table = this.util().tableEnv().sqlQuery(query1);
        this.util().tableEnv().createTemporaryView("view1", table);
        String query2 = new StringOps(Predef$.MODULE$.augmentString("\n         | SELECT a, my_time AS update_time\n         | FROM reuseTable\n         |")).stripMargin();
        Table table2 = this.util().tableEnv().sqlQuery(query2);
        this.util().tableEnv().createTemporaryView("view2", table2);
        StatementSet stmtSet = this.util().tableEnv().createStatementSet();
        stmtSet.addInsertSql("INSERT INTO sink1 SELECT a, b, my_time, d1 FROM view1");
        stmtSet.addInsertSql("INSERT INTO sink2 SELECT a, update_time FROM view2");
        this.util().verifyExecPlan(stmtSet);
    }

    @Test
    public void testSourceReuseWithEmptyFilterCondAndIgnoreEmptyFilter() {
        this.util().addTable(new StringOps(Predef$.MODULE$.augmentString("\n                     |create table MyTable(\n                     |  a int,\n                     |  b bigint,\n                     |  c varchar\n                     |) with (\n                     |  'connector' = 'values',\n                     |  'bounded' = 'true'\n                     |)\n       ")).stripMargin());
        String sqlQuery = new StringOps(Predef$.MODULE$.augmentString("\n        | SELECT * FROM MyTable T1, MyTable T2 WHERE T1.a = T2.a AND T1.a > 5\n      ")).stripMargin();
        this.util().verifyExecPlan(sqlQuery);
    }

    @Test
    public void testSourceReuseWithEmptyFilterCondAndIgnoreEmptyFilterTrue() {
        this.util().tableEnv().getConfig().set(OptimizerConfigOptions.TABLE_OPTIMIZER_REUSE_SOURCE_ENABLED, (Object)BoxesRunTime.boxToBoolean((boolean)true));
        this.util().addTable(new StringOps(Predef$.MODULE$.augmentString("\n                     |create table MyTable(\n                     |  a int,\n                     |  b bigint,\n                     |  c varchar\n                     |) with (\n                     |  'connector' = 'values',\n                     |  'bounded' = 'true'\n                     |)\n       ")).stripMargin());
        String sqlQuery = new StringOps(Predef$.MODULE$.augmentString("\n        | SELECT * FROM MyTable T1, MyTable T2 WHERE T1.a = T2.a AND T1.a > 5\n      ")).stripMargin();
        this.util().verifyExecPlan(sqlQuery);
    }

    @Test
    public void testSourceReuseWithEmptyFilterCondAndIgnoreEmptyFilterTrue2() {
        this.util().tableEnv().getConfig().set(OptimizerConfigOptions.TABLE_OPTIMIZER_REUSE_SOURCE_ENABLED, (Object)BoxesRunTime.boxToBoolean((boolean)true));
        this.util().addTable(new StringOps(Predef$.MODULE$.augmentString("\n                     |create table MyTable(\n                     |  a int,\n                     |  b bigint,\n                     |  c varchar\n                     |) with (\n                     |  'connector' = 'values',\n                     |  'bounded' = 'true'\n                     |)\n       ")).stripMargin());
        String sqlQuery = new StringOps(Predef$.MODULE$.augmentString("\n        | SELECT * FROM MyTable T1, MyTable T2 WHERE T1.a = T2.a AND T2.a < 10\n      ")).stripMargin();
        this.util().verifyExecPlan(sqlQuery);
    }

    @Test
    public void testSourceReuseWithEmptyFilterCondAndIgnoreEmptyFilterTrue3() {
        this.util().tableEnv().getConfig().set(OptimizerConfigOptions.TABLE_OPTIMIZER_REUSE_SOURCE_ENABLED, (Object)BoxesRunTime.boxToBoolean((boolean)true));
        this.util().addTable(new StringOps(Predef$.MODULE$.augmentString("\n                     |create table MyTable(\n                     |  a int,\n                     |  b bigint,\n                     |  c varchar\n                     |) with (\n                     |  'connector' = 'values',\n                     |  'bounded' = 'true'\n                     |)\n       ")).stripMargin());
        String sqlQuery = new StringOps(Predef$.MODULE$.augmentString("\n        | SELECT * FROM MyTable T1, MyTable T2 WHERE T1.a = T2.a AND T1.a > 10 AND T2.a < 10\n      ")).stripMargin();
        this.util().verifyExecPlan(sqlQuery);
    }

    private void testReuseOnNewSource() {
        this.util().addTable(new StringOps(Predef$.MODULE$.augmentString("\n                     |create table newX(\n                     |  a int,\n                     |  b bigint,\n                     |  c varchar\n                     |) with (\n                     |  'connector' = 'values'\n                     |)\n       ")).stripMargin());
        this.util().addTable(new StringOps(Predef$.MODULE$.augmentString("\n                     |create table newY(\n                     |  d int,\n                     |  e bigint,\n                     |  f varchar\n                     |) with (\n                     |  'connector' = 'values'\n                     |)\n       ")).stripMargin());
        String sqlQuery = new StringOps(Predef$.MODULE$.augmentString("\n        |WITH t AS (\n        |  SELECT newX.a AS a, newX.b AS b, newY.d AS d, newY.e AS e\n        |  FROM newX, newY\n        |  WHERE newX.a = newY.d)\n        |SELECT t1.*, t2.* FROM t t1, t t2 WHERE t1.b = t2.e AND t1.a < 10 AND t2.a > 5\n      ")).stripMargin();
        this.util().verifyExecPlan(sqlQuery);
    }
}

