/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.table.planner.plan.stream.sql;

import java.io.Serializable;
import java.lang.invoke.MethodHandle;
import java.lang.invoke.SerializedLambda;
import org.apache.flink.api.common.ExecutionConfig;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.api.scala.typeutils.CaseClassTypeInfo;
import org.apache.flink.api.scala.typeutils.ScalaCaseClassSerializer;
import org.apache.flink.core.testutils.FlinkAssertions;
import org.apache.flink.table.api.ExplainDetail;
import org.apache.flink.table.api.TableException;
import org.apache.flink.table.api.ValidationException;
import org.apache.flink.table.api.config.ExecutionConfigOptions;
import org.apache.flink.table.api.package$;
import org.apache.flink.table.expressions.Expression;
import org.apache.flink.table.functions.ScalarFunction;
import org.apache.flink.table.planner.expressions.utils.Func0$;
import org.apache.flink.table.planner.factories.TestValuesTableFactory;
import org.apache.flink.table.planner.plan.stream.sql.TableScanTest$;
import org.apache.flink.table.planner.utils.StreamTableTestUtil;
import org.apache.flink.table.planner.utils.TableTestBase;
import org.assertj.core.api.Assertions;
import org.junit.Test;
import scala.Function1;
import scala.Predef$;
import scala.Symbol;
import scala.Tuple3;
import scala.collection.Seq;
import scala.collection.immutable.StringOps;
import scala.reflect.ScalaSignature;
import scala.runtime.BoxesRunTime;
import scala.runtime.LambdaDeserialize;
import scala.runtime.RichInt$;
import scala.runtime.SymbolLiteral;
import scala.runtime.java8.JFunction1;

@ScalaSignature(bytes="\u0006\u0001\u0005\u001dd\u0001B\u0001\u0003\u0001M\u0011Q\u0002V1cY\u0016\u001c6-\u00198UKN$(BA\u0002\u0005\u0003\r\u0019\u0018\u000f\u001c\u0006\u0003\u000b\u0019\taa\u001d;sK\u0006l'BA\u0004\t\u0003\u0011\u0001H.\u00198\u000b\u0005%Q\u0011a\u00029mC:tWM\u001d\u0006\u0003\u00171\tQ\u0001^1cY\u0016T!!\u0004\b\u0002\u000b\u0019d\u0017N\\6\u000b\u0005=\u0001\u0012AB1qC\u000eDWMC\u0001\u0012\u0003\ry'oZ\u0002\u0001'\t\u0001A\u0003\u0005\u0002\u001615\taC\u0003\u0002\u0018\u0011\u0005)Q\u000f^5mg&\u0011\u0011D\u0006\u0002\u000e)\u0006\u0014G.\u001a+fgR\u0014\u0015m]3\t\u000bm\u0001A\u0011\u0001\u000f\u0002\rqJg.\u001b;?)\u0005i\u0002C\u0001\u0010\u0001\u001b\u0005\u0011\u0001b\u0002\u0011\u0001\u0005\u0004%I!I\u0001\u0005kRLG.F\u0001#!\t)2%\u0003\u0002%-\t\u00192\u000b\u001e:fC6$\u0016M\u00197f)\u0016\u001cH/\u0016;jY\"1a\u0005\u0001Q\u0001\n\t\nQ!\u001e;jY\u0002BQ\u0001\u000b\u0001\u0005\u0002%\n\u0011\u0004^3ti2+w-Y2z)\u0006\u0014G.Z*pkJ\u001cWmU2b]R\t!\u0006\u0005\u0002,]5\tAFC\u0001.\u0003\u0015\u00198-\u00197b\u0013\tyCF\u0001\u0003V]&$\bFA\u00142!\t\u0011T'D\u00014\u0015\t!\u0004#A\u0003kk:LG/\u0003\u00027g\t!A+Z:u\u0011\u0015A\u0004\u0001\"\u0001*\u0003I!Xm\u001d;ECR\f7\u000b\u001e:fC6\u001c6-\u00198)\u0005]\n\u0004\"B\u001e\u0001\t\u0003I\u0013\u0001\u0005;fgR$E\t\u0014+bE2,7kY1oQ\tQ\u0014\u0007C\u0003?\u0001\u0011\u0005\u0011&A\ruKN$H\t\u0012'XSRD7i\\7qkR,GmQ8mk6t\u0007FA\u001f2\u0011\u0015\t\u0005\u0001\"\u0001*\u0003e!Xm\u001d;E\t2;\u0016\u000e\u001e5NKR\fG-\u0019;b\u0007>dW/\u001c8)\u0005\u0001\u000b\u0004\"\u0002#\u0001\t\u0003I\u0013A\r;fgR$E\tT,ji\"lU\r^1eCR\fG\u000b[1u\u0007>tg\r\\5diN<\u0016\u000e\u001e5QQf\u001c\u0018nY1m\u0007>dW/\u001c8)\u0005\r\u000b\u0004\"B$\u0001\t\u0003I\u0013a\u000b;fgR$E\tT,ji\"lU\r^1eCR\f7i\u001c7v[:\u0004&o\u001c6fGRLwN\u001c)vg\"$un\u001e8)\u0005\u0019\u000b\u0004\"\u0002&\u0001\t\u0003I\u0013A\t;fgR$E\tT,ji\"<\u0016\r^3s[\u0006\u00148nQ8naV$X\rZ\"pYVlg\u000e\u000b\u0002Jc!)Q\n\u0001C\u0001S\u0005)C/
Z:u\t\u0012cu+\u001b;i\u0007>l\u0007/\u001e;fI\u000e{G.^7o%\u00164WM\u001d*poRLW.\u001a\u0015\u0003\u0019FBQ\u0001\u0015\u0001\u0005\u0002%\nQ\u0006^3ti\u0012#EjV5uQ6+H\u000e^5qY\u0016\u001cu\u000e\\;n]N4%o\\7TC6,W*\u001a;bI\u0006$\u0018mS3zQ\ty\u0015\u0007C\u0003T\u0001\u0011\u0005\u0011&\u0001\u0018uKN$H\t\u0012'XSRDW*\u001e7uSBdWmQ8mk6t7O\u0012:p[N\u000bW.Z'fi\u0006$\u0017\r^1LKf\u0014\u0004F\u0001*2\u0011\u00151\u0006\u0001\"\u0001*\u0003\u001d\"Xm\u001d;LKf<xN\u001d3t/&$\bnV1uKJl\u0017M]6D_6\u0004X\u000f^3e\u0007>dW/\u001c8)\u0005U\u000b\u0004\"B-\u0001\t\u0003I\u0013a\u0006;fgR\u001c6-\u00198P]\n{WO\u001c3fIN{WO]2fQ\tA\u0016\u0007C\u0003]\u0001\u0011\u0005\u0011&A\u000euKN$h)\u001b7uKJ|en\u00115b]\u001e,Gn\\4T_V\u00148-\u001a\u0015\u00037FBQa\u0018\u0001\u0005\u0002%\n\u0011\u0004^3tiN\u001b\u0017M\\(o\u0007\"\fgnZ3m_\u001e\u001cv.\u001e:dK\"\u0012a,\r\u0005\u0006E\u0002!\t!K\u0001'i\u0016\u001cH/\u00168j_:\u001c\u0005.\u00198hK2|wmU8ve\u000e,\u0017I\u001c3BO\u001e\u0014XmZ1uS>t\u0007FA12\u0011\u0015)\u0007\u0001\"\u0001*\u0003y!Xm\u001d;BO\u001e\u0014XmZ1uK>s7\t[1oO\u0016dwnZ*pkJ\u001cW\r\u000b\u0002ec!)\u0001\u000e\u0001C\u0001S\u0005IB/Z:u\u0015>Lgn\u00148DQ\u0006tw-\u001a7pON{WO]2fQ\t9\u0017\u0007C\u0003l\u0001\u0011\u0005\u0011&\u0001\u0017uKN$(j\\5o\u001f:\u001c\u0005.\u00198hK2|wmU8ve\u000e,w+\u001b;i\u000bZ,g\u000e^:EkBd\u0017nY1uK\"\u0012!.\r\u0005\u0006]\u0002!\t!K\u0001\u0019i\u0016\u001cHOS8j]>sgj\\+qI\u0006$XmU8ve\u000e,\u0007FA72\u0011\u0015\t\b\u0001\"\u0001*\u0003Y!Xm\u001d;K_&twJ\\+qg\u0016\u0014HoU8ve\u000e,\u0007F\u000192\u0011\u0015!\b\u0001\"\u0003v\u0003I1XM]5gs*{\u0017N\\(o'>,(oY3\u0015\u0005)2\b\"B<t\u0001\u0004A\u0018!D2iC:<W\r\\8h\u001b>$W\rE\u0002z\u0003\u0003q!A\u001f@\u0011\u0005mdS\"\u0001?\u000b\u0005u\u0014\u0012A\u0002\u001fs_>$h(\u0003\u0002\u0000Y\u00051\u0001K]3eK\u001aLA!a\u0001\u0002\u0006\t11\u000b\u001e:j]\u001eT!a 
\u0017\t\r\u0005%\u0001\u0001\"\u0001*\u0003}!Xm\u001d;XCR,'/\\1sW\u0006sGm\u00115b]\u001e,Gn\\4T_V\u00148-\u001a\u0015\u0004\u0003\u000f\t\u0004BBA\b\u0001\u0011\u0005\u0011&\u0001\u0014uKN$8\t[1oO\u0016dwnZ*pkJ\u001cWmV5uQ\u00163XM\u001c;t\tV\u0004H.[2bi\u0016D3!!\u00042\u0011\u0019\t)\u0002\u0001C\u0001S\u00051B/Z:u'\u000e\fgn\u00148VaN,'\u000f^*pkJ\u001cW\rK\u0002\u0002\u0014EBa!a\u0007\u0001\t\u0003I\u0013A\f;fgR,\u0006o]3siN{WO]2f/&$\bnQ8naV$X\rZ\"pYVlg.\u00118e/\u0006$XM]7be.D3!!\u00072\u0011\u0019\t\t\u0003\u0001C\u0001S\u0005)C/Z:u+B\u001cXM\u001d;T_V\u00148-Z,ji\"<\u0016\r^3s[\u0006\u00148\u000eU;tQ\u0012{wO\u001c\u0015\u0004\u0003?\t\u0004BBA\u0014\u0001\u0011\u0005\u0011&A\u0012uKN$XK\\5p]V\u00038/\u001a:u'>,(oY3B]\u0012\fum\u001a:fO\u0006$\u0018n\u001c8)\u0007\u0005\u0015\u0012\u0007\u0003\u0004\u0002.\u0001!\t!K\u0001\u001ci\u0016\u001cH/Q4he\u0016<\u0017\r^3P]V\u00038/\u001a:u'>,(oY3)\u0007\u0005-\u0012\u0007\u0003\u0004\u00024\u0001!\t!K\u0001&i\u0016\u001cH/Q4he\u0016<\u0017\r^3P]V\u00038/\u001a:u'>,(oY3Qe&l\u0017M]=LKfD3!!\r2\u0011\u0019\tI\u0004\u0001C\u0001S\u00051C/Z:u!J|7\rV5nKR+W\u000e]8sC2Tu.\u001b8P]V\u00038/\u001a:u'>,(oY3)\u0007\u0005]\u0012\u0007\u0003\u0004\u0002@\u0001!\t!K\u0001(i\u0016\u001cH/\u0012<f]R$\u0016.\\3UK6\u0004xN]1m\u0015>Lgn\u00148VaN,'\u000f^*pkJ\u001cW\rK\u0002\u0002>EBa!!\u0012\u0001\t\u0003I\u0013\u0001\n;fgR<\u0016N\u001c3po\u0006;wM]3hCR,wJ\\\"iC:<W\r\\8h'>,(oY3)\u0007\u0005\r\u0013\u0007\u0003\u0004\u0002L\u0001!\t!K\u0001\u001fi\u0016\u001cH/\u00138wC2LGmU8ve\u000e,7\t[1oO\u0016dwnZ'pI\u0016D3!!\u00132\u0011\u0019\t\t\u0006\u0001C\u0001S\u0005!C/Z:u\u001b&\u001c8/\u001b8h!JLW.\u0019:z\u0017\u0016Lhi\u001c:VaN,'\u000f^*pkJ\u001cW\rK\u0002\u0002PEBa!a\u0016\u0001\t\u0003I\u0013a\n;fgRl\u0015n]:j]\u001e\u0004&/[7bef\\U-\u001f$pe\u00163XM\u001c;t\tV\u0004H.[2bi\u0016D3!!\u00162\u0011\u0019\ti\u0006\u0001C\u0001S\u0005iB/Z:u\u0013:4\u0018\r\\5e'\u000e\fgn\u00148M_>\\W\u000f]*pkJ\u001cW\rK\u0002\u0002\\EBa!a\u0019\u0001
\t\u0003I\u0013A\b;fgRLeN^1mS\u0012<\u0016\r^3s[\u0006\u00148nT;uaV$H+\u001f9fQ\r\t\t'\r")
public class TableScanTest
extends TableTestBase {
    /** Test utility shared by every test in this instance; obtained from {@code streamTestUtil} with that method's default argument. */
    private final StreamTableTestUtil util = streamTestUtil(streamTestUtil$default$1());

    /** Returns the {@code util} field. */
    private StreamTableTestUtil util() {
        return util;
    }

    /**
     * Registers a table source named "MyTable" with the fields a, b and c, typed by an anonymous
     * {@code CaseClassTypeInfo} over {@code Tuple3<Object, Object, String>}, and then passes
     * {@code SELECT * FROM MyTable} to {@code verifyExecPlan}.
     *
     * <p>NOTE(review): this method is decompiler output. Parts of the anonymous class below are
     * not valid Java as printed; they are flagged inline.
     */
    @Test
    public void testLegacyTableSourceScan() {
        this.util().addTableSource("MyTable", (Seq<Expression>)Predef$.MODULE$.wrapRefArray((Object[])new Expression[]{package$.MODULE$.symbol2FieldExpression((Symbol)SymbolLiteral.bootstrap("apply", "a")), package$.MODULE$.symbol2FieldExpression((Symbol)SymbolLiteral.bootstrap("apply", "b")), package$.MODULE$.symbol2FieldExpression((Symbol)SymbolLiteral.bootstrap("apply", "c"))}), new CaseClassTypeInfo<Tuple3<Object, Object, String>>(null){

            // Synthetic accessor: returns the 'types' member of the instance it is given.
            public /* synthetic */ TypeInformation[] protected$types($anon$3 x$1) {
                return x$1.types;
            }

            public TypeSerializer<Tuple3<Object, Object, String>> createSerializer(ExecutionConfig executionConfig) {
                // One serializer slot per field, sized by getArity().
                TypeSerializer[] fieldSerializers = new TypeSerializer[this.getArity()];
                // For each index in [0, getArity()), stores the serializer created by the i-th field type.
                // NOTE(review): the lambda writes to 'fieldSerializers$1', which is not declared in this
                // method as printed -- presumably the captured 'fieldSerializers' array; confirm.
                RichInt$.MODULE$.until$extension0(Predef$.MODULE$.intWrapper(0), this.getArity()).foreach$mVc$sp((Function1)(JFunction1.mcVI.sp & Serializable & scala.Serializable)i -> {
                    fieldSerializers$1[i] = this.protected$types(this)[i].createSerializer(executionConfig);
                });
                // This anonymous subclass instance is assigned and never read; the method returns a
                // separate, non-subclassed serializer below.
                // NOTE(review): presumably kept on purpose by whatever generated this code -- confirm
                // before removing.
                ScalaCaseClassSerializer<Tuple3<Object, Object, String>> unused = new ScalaCaseClassSerializer<Tuple3<Object, Object, String>>(this, fieldSerializers){

                    // Builds the tuple from fields[0] unboxed as int, fields[1] unboxed as long and fields[2] cast to String.
                    public Tuple3<Object, Object, String> createInstance(Object[] fields) {
                        return new Tuple3((Object)BoxesRunTime.boxToInteger((int)BoxesRunTime.unboxToInt((Object)fields[0])), (Object)BoxesRunTime.boxToLong((long)BoxesRunTime.unboxToLong((Object)fields[1])), (Object)((String)fields[2]));
                    }
                };
                return new ScalaCaseClassSerializer(this.getTypeClass(), fieldSerializers);
            }

            // NOTE(review): the MethodHandle[] initializer below is a decompiler rendering of the lambda
            // target and is not compilable Java as written.
            private static /* synthetic */ Object $deserializeLambda$(SerializedLambda serializedLambda) {
                return LambdaDeserialize.bootstrap("lambdaDeserialize", new MethodHandle[]{$anonfun$createSerializer$1(org.apache.flink.table.planner.plan.stream.sql.TableScanTest$$anon$3 org.apache.flink.api.common.ExecutionConfig org.apache.flink.api.common.typeutils.TypeSerializer[] int )}, serializedLambda);
            }
        });
        this.util().verifyExecPlan("SELECT * FROM MyTable");
    }

    /**
     * Registers a data stream named "DataStreamTable" with the fields a, b and c, typed by an
     * anonymous {@code CaseClassTypeInfo} over {@code Tuple3<Object, Object, String>}, and then
     * passes {@code SELECT * FROM DataStreamTable} to {@code verifyExecPlan}.
     *
     * <p>NOTE(review): this method is decompiler output. Parts of the anonymous class below are
     * not valid Java as printed; they are flagged inline.
     */
    @Test
    public void testDataStreamScan() {
        this.util().addDataStream("DataStreamTable", (Seq<Expression>)Predef$.MODULE$.wrapRefArray((Object[])new Expression[]{package$.MODULE$.symbol2FieldExpression((Symbol)SymbolLiteral.bootstrap("apply", "a")), package$.MODULE$.symbol2FieldExpression((Symbol)SymbolLiteral.bootstrap("apply", "b")), package$.MODULE$.symbol2FieldExpression((Symbol)SymbolLiteral.bootstrap("apply", "c"))}), new CaseClassTypeInfo<Tuple3<Object, Object, String>>(null){

            // Synthetic accessor: returns the 'types' member of the instance it is given.
            public /* synthetic */ TypeInformation[] protected$types($anon$4 x$1) {
                return x$1.types;
            }

            public TypeSerializer<Tuple3<Object, Object, String>> createSerializer(ExecutionConfig executionConfig) {
                // One serializer slot per field, sized by getArity().
                TypeSerializer[] fieldSerializers = new TypeSerializer[this.getArity()];
                // For each index in [0, getArity()), stores the serializer created by the i-th field type.
                // NOTE(review): the lambda writes to 'fieldSerializers$2', which is not declared in this
                // method as printed -- presumably the captured 'fieldSerializers' array; confirm.
                RichInt$.MODULE$.until$extension0(Predef$.MODULE$.intWrapper(0), this.getArity()).foreach$mVc$sp((Function1)(JFunction1.mcVI.sp & Serializable & scala.Serializable)i -> {
                    fieldSerializers$2[i] = this.protected$types(this)[i].createSerializer(executionConfig);
                });
                // This anonymous subclass instance is assigned and never read; the method returns a
                // separate, non-subclassed serializer below.
                // NOTE(review): presumably kept on purpose by whatever generated this code -- confirm
                // before removing.
                ScalaCaseClassSerializer<Tuple3<Object, Object, String>> unused = new ScalaCaseClassSerializer<Tuple3<Object, Object, String>>(this, fieldSerializers){

                    // Builds the tuple from fields[0] unboxed as int, fields[1] unboxed as long and fields[2] cast to String.
                    public Tuple3<Object, Object, String> createInstance(Object[] fields) {
                        return new Tuple3((Object)BoxesRunTime.boxToInteger((int)BoxesRunTime.unboxToInt((Object)fields[0])), (Object)BoxesRunTime.boxToLong((long)BoxesRunTime.unboxToLong((Object)fields[1])), (Object)((String)fields[2]));
                    }
                };
                return new ScalaCaseClassSerializer(this.getTypeClass(), fieldSerializers);
            }

            // NOTE(review): the MethodHandle[] initializer below is a decompiler rendering of the lambda
            // target and is not compilable Java as written.
            private static /* synthetic */ Object $deserializeLambda$(SerializedLambda serializedLambda) {
                return LambdaDeserialize.bootstrap("lambdaDeserialize", new MethodHandle[]{$anonfun$createSerializer$2(org.apache.flink.table.planner.plan.stream.sql.TableScanTest$$anon$4 org.apache.flink.api.common.ExecutionConfig org.apache.flink.api.common.typeutils.TypeSerializer[] int )}, serializedLambda);
            }
        });
        this.util().verifyExecPlan("SELECT * FROM DataStreamTable");
    }

    /** Adds the watermarked {@code src} table via DDL and hands a filtered scan over it to {@code verifyExecPlan}. */
    @Test
    public void testDDLTableScan() {
        final String ddl = new StringOps(Predef$.MODULE$.augmentString("\n                    |CREATE TABLE src (\n                    |  ts TIMESTAMP(3),\n                    |  a INT,\n                    |  b DOUBLE,\n                    |  WATERMARK FOR ts AS ts - INTERVAL '0.001' SECOND\n                    |) WITH (\n                    |  'connector' = 'values'\n                    |)\n      ")).stripMargin();
        final StreamTableTestUtil testUtil = util();
        testUtil.addTable(ddl);
        testUtil.verifyExecPlan("SELECT * FROM src WHERE a > 1");
    }

    /** Registers {@code my_udf}, adds table {@code t1} with three computed columns, and hands a full scan to {@code verifyExecPlan}. */
    @Test
    public void testDDLWithComputedColumn() {
        final StreamTableTestUtil testUtil = util();
        final ScalarFunction udf = (ScalarFunction)Func0$.MODULE$;
        testUtil.tableEnv().registerFunction("my_udf", udf);
        final String ddl = new StringOps(Predef$.MODULE$.augmentString("\n                     |create table t1(\n                     |  a int,\n                     |  b varchar,\n                     |  c as a + 1,\n                     |  d as to_timestamp(b),\n                     |  e as my_udf(a)\n                     |) with (\n                     |  'connector' = 'values'\n                     |)\n       ")).stripMargin();
        testUtil.addTable(ddl);
        testUtil.verifyExecPlan("SELECT * FROM t1");
    }

    /** Adds {@code MetadataTable} (metadata columns plus a column computed from one of them) and hands a full scan to {@code verifyExecPlan}. */
    @Test
    public void testDDLWithMetadataColumn() {
        final String ddl = new StringOps(Predef$.MODULE$.augmentString("\n         |CREATE TABLE MetadataTable (\n         |  `a` INT,\n         |  `other_metadata` INT METADATA FROM 'metadata_3' VIRTUAL,\n         |  `b` BIGINT,\n         |  `c` INT,\n         |  `metadata_1` STRING METADATA,\n         |  `computed` AS UPPER(`metadata_1`)\n         |) WITH (\n         |  'connector' = 'values',\n         |  'bounded' = 'false',\n         |  'readable-metadata' = 'metadata_1:STRING, metadata_2:BOOLEAN, metadata_3:BIGINT',\n         |  'writable-metadata' = 'metadata_1:STRING, metadata_2:BOOLEAN'\n         |)\n       ")).stripMargin();
        final StreamTableTestUtil testUtil = util();
        testUtil.addTable(ddl);
        testUtil.verifyExecPlan("SELECT * FROM MetadataTable");
    }

    /** Adds a {@code MetadataTable} whose physical column {@code timestamp} shares its name with a metadata key, then hands a full scan to {@code verifyExecPlan}. */
    @Test
    public void testDDLWithMetadataThatConflictsWithPhysicalColumn() {
        final String ddl = new StringOps(Predef$.MODULE$.augmentString("\n                     |CREATE TABLE MetadataTable (\n                     |  `timestamp` TIMESTAMP(9),\n                     |  `metadata_timestamp` TIMESTAMP(0) METADATA FROM 'timestamp',\n                     |  `other` STRING METADATA,\n                     |  `computed_other` AS UPPER(`other`),\n                     |  `computed_timestamp` AS CAST(`metadata_timestamp` AS STRING)\n                     |) WITH (\n                     |  'connector' = 'values',\n                     |  'bounded' = 'false',\n                     |  'readable-metadata' = 'timestamp:TIMESTAMP(0), other:STRING'\n                     |)\n       ")).stripMargin();
        final StreamTableTestUtil testUtil = util();
        testUtil.addTable(ddl);
        testUtil.verifyExecPlan("SELECT * FROM MetadataTable");
    }

    /** Adds {@code MetadataTable} and hands a projection of one physical and one metadata column to {@code verifyExecPlan}. */
    @Test
    public void testDDLWithMetadataColumnProjectionPushDown() {
        final String ddl = new StringOps(Predef$.MODULE$.augmentString("\n         |CREATE TABLE MetadataTable (\n         |  `a` INT,\n         |  `other_metadata` INT METADATA FROM 'metadata_3' VIRTUAL,\n         |  `b` BIGINT,\n         |  `c` INT,\n         |  `metadata_1` STRING METADATA\n         |) WITH (\n         |  'connector' = 'values',\n         |  'bounded' = 'false',\n         |  'readable-metadata' = 'metadata_1:STRING, metadata_2:BOOLEAN, metadata_3:BIGINT',\n         |  'writable-metadata' = 'metadata_1:STRING, metadata_2:BOOLEAN'\n         |)\n       ")).stripMargin();
        final StreamTableTestUtil testUtil = util();
        testUtil.addTable(ddl);
        testUtil.verifyExecPlan("SELECT `b`, `other_metadata` FROM MetadataTable");
    }

    /** Registers {@code my_udf}, adds table {@code t1} with a watermark on computed column {@code d}, and hands a full scan to {@code verifyExecPlan}. */
    @Test
    public void testDDLWithWatermarkComputedColumn() {
        final StreamTableTestUtil testUtil = util();
        final ScalarFunction udf = (ScalarFunction)Func0$.MODULE$;
        testUtil.tableEnv().registerFunction("my_udf", udf);
        final String ddl = new StringOps(Predef$.MODULE$.augmentString("\n                     |create table t1(\n                     |  a int,\n                     |  b varchar,\n                     |  c as a + 1,\n                     |  d as to_timestamp(b),\n                     |  e as my_udf(a),\n                     |  WATERMARK FOR d AS d - INTERVAL '0.001' SECOND\n                     |) with (\n                     |  'connector' = 'values'\n                     |)\n       ")).stripMargin();
        testUtil.addTable(ddl);
        testUtil.verifyExecPlan("SELECT * FROM t1");
    }

    /** Adds {@code src} with a computed column derived from the watermarked {@code ts} column plus a PROCTIME() column, then hands a filtered scan to {@code verifyExecPlan}. */
    @Test
    public void testDDLWithComputedColumnReferRowtime() {
        final String ddl = new StringOps(Predef$.MODULE$.augmentString("\n                    |CREATE TABLE src (\n                    |  ts TIMESTAMP(3),\n                    |  a INT,\n                    |  b DOUBLE,\n                    |  my_ts AS ts - INTERVAL '0.001' SECOND,\n                    |  proc AS PROCTIME(),\n                    |  WATERMARK FOR ts AS ts - INTERVAL '0.001' SECOND\n                    |) WITH (\n                    |  'connector' = 'values'\n                    |)\n      ")).stripMargin();
        final StreamTableTestUtil testUtil = util();
        testUtil.addTable(ddl);
        testUtil.verifyExecPlan("SELECT * FROM src WHERE a > 1");
    }

    /** Expects executing a DDL with two columns mapped to metadata key 'a' to fail with a {@code ValidationException} cause carrying the message below. */
    @Test
    public void testDDLWithMultipleColumnsFromSameMetadataKey() {
        final String ddl = new StringOps(Predef$.MODULE$.augmentString("\n                                                        |CREATE TABLE source (\n                                                        |  a INT METADATA,\n                                                        |  b INT METADATA FROM 'a'\n                                                        |) WITH (\n                                                        |  'connector' = 'COLLECTION'\n                                                        |)\n                                                        |")).stripMargin();
        final String expectedMessage = "The column `a` and `b` in the table are both from the same metadata key 'a'. Please specify one of the columns as the metadata column and use the computed column syntax to specify the others.";
        Assertions.assertThatThrownBy(() -> this.util().tableEnv().executeSql(ddl))
                .satisfies(FlinkAssertions.anyCauseMatches(ValidationException.class, expectedMessage));
    }

    /** Creates {@code source} with metadata column {@code a}, then expects a LIKE-derived table that maps {@code b} to the same key to fail with the message below. */
    @Test
    public void testDDLWithMultipleColumnsFromSameMetadataKey2() {
        final String sourceDdl = new StringOps(Predef$.MODULE$.augmentString("\n                               |CREATE TABLE source (\n                               |  a INT METADATA\n                               |) WITH (\n                               |  'connector' = 'COLLECTION'\n                               |)\n                               |")).stripMargin();
        this.util().tableEnv().executeSql(sourceDdl);

        final String likeDdl = new StringOps(Predef$.MODULE$.augmentString("\n            |CREATE TABLE like_source (\n            |  b INT METADATA FROM 'a'\n            |)\n            |WITH (\n            |  'connector' = 'COLLECTION'\n            |) LIKE source (\n            |  INCLUDING METADATA\n            |)\n            |")).stripMargin();
        final String expectedMessage = "The column `a` and `b` in the table are both from the same metadata key 'a'. Please specify one of the columns as the metadata column and use the computed column syntax to specify the others.";
        Assertions.assertThatThrownBy(() -> this.util().tableEnv().executeSql(likeDdl))
                .satisfies(FlinkAssertions.anyCauseMatches(expectedMessage));
    }

    /** Registers {@code my_udf}, adds table {@code t1} whose columns use back-quoted keyword names (including the watermark column), and hands a full scan to {@code verifyExecPlan}. */
    @Test
    public void testKeywordsWithWatermarkComputedColumn() {
        final StreamTableTestUtil testUtil = util();
        final ScalarFunction udf = (ScalarFunction)Func0$.MODULE$;
        testUtil.tableEnv().registerFunction("my_udf", udf);
        final String ddl = new StringOps(Predef$.MODULE$.augmentString("\n                     |create table t1(\n                     |  a int,\n                     |  b varchar,\n                     |  `time` time,\n                     |  mytime as `time`,\n                     |  `current_time` as current_time,\n                     |  json_row ROW<`timestamp` TIMESTAMP(3)>,\n                     |  `timestamp` AS json_row.`timestamp`,\n                     |  WATERMARK FOR `timestamp` AS `timestamp`\n                     |) with (\n                     |  'connector' = 'values'\n                     |)\n       ")).stripMargin();
        testUtil.addTable(ddl);
        testUtil.verifyExecPlan("SELECT * FROM t1");
    }

    /** Adds {@code src} with 'bounded' = 'true' and hands a filtered scan to {@code verifyRelPlan} with the CHANGELOG_MODE explain detail. */
    @Test
    public void testScanOnBoundedSource() {
        final String ddl = new StringOps(Predef$.MODULE$.augmentString("\n                    |CREATE TABLE src (\n                    |  ts TIMESTAMP(3),\n                    |  a INT,\n                    |  b DOUBLE\n                    |) WITH (\n                    |  'connector' = 'values',\n                    |  'bounded' = 'true'\n                    |)\n      ")).stripMargin();
        final StreamTableTestUtil testUtil = util();
        testUtil.addTable(ddl);
        final Seq<ExplainDetail> details = (Seq<ExplainDetail>)Predef$.MODULE$.wrapRefArray((Object[])new ExplainDetail[]{ExplainDetail.CHANGELOG_MODE});
        testUtil.verifyRelPlan("SELECT * FROM src WHERE a > 1", details);
    }

    /** Adds {@code src} with changelog mode 'I,UA,UB,D' and hands a filtered scan to {@code verifyRelPlan} with the CHANGELOG_MODE explain detail. */
    @Test
    public void testFilterOnChangelogSource() {
        final String ddl = new StringOps(Predef$.MODULE$.augmentString("\n                    |CREATE TABLE src (\n                    |  ts TIMESTAMP(3),\n                    |  a INT,\n                    |  b DOUBLE\n                    |) WITH (\n                    |  'connector' = 'values',\n                    |  'changelog-mode' = 'I,UA,UB,D'\n                    |)\n      ")).stripMargin();
        final StreamTableTestUtil testUtil = util();
        testUtil.addTable(ddl);
        final Seq<ExplainDetail> details = (Seq<ExplainDetail>)Predef$.MODULE$.wrapRefArray((Object[])new ExplainDetail[]{ExplainDetail.CHANGELOG_MODE});
        testUtil.verifyRelPlan("SELECT * FROM src WHERE a > 1", details);
    }

    /** Adds {@code src} with changelog mode 'I,UA,UB,D' and hands a reordered projection to {@code verifyRelPlan} with the CHANGELOG_MODE explain detail. */
    @Test
    public void testScanOnChangelogSource() {
        final String ddl = new StringOps(Predef$.MODULE$.augmentString("\n                    |CREATE TABLE src (\n                    |  ts TIMESTAMP(3),\n                    |  a INT,\n                    |  b DOUBLE\n                    |) WITH (\n                    |  'connector' = 'values',\n                    |  'changelog-mode' = 'I,UA,UB,D'\n                    |)\n      ")).stripMargin();
        final StreamTableTestUtil testUtil = util();
        testUtil.addTable(ddl);
        final Seq<ExplainDetail> details = (Seq<ExplainDetail>)Predef$.MODULE$.wrapRefArray((Object[])new ExplainDetail[]{ExplainDetail.CHANGELOG_MODE});
        testUtil.verifyRelPlan("SELECT b,a,ts FROM src", details);
    }

    /** Adds a changelog table and an insert-only table, then hands a UNION ALL of the first with an aggregation over the second to {@code verifyRelPlan}. */
    @Test
    public void testUnionChangelogSourceAndAggregation() {
        final StreamTableTestUtil testUtil = util();

        final String changelogDdl = new StringOps(Predef$.MODULE$.augmentString("\n                    |CREATE TABLE changelog_src (\n                    |  ts TIMESTAMP(3),\n                    |  a INT,\n                    |  b DOUBLE\n                    |) WITH (\n                    |  'connector' = 'values',\n                    |  'changelog-mode' = 'I,UA,UB,D'\n                    |)\n      ")).stripMargin();
        testUtil.addTable(changelogDdl);

        final String appendDdl = new StringOps(Predef$.MODULE$.augmentString("\n                    |CREATE TABLE append_src (\n                    |  ts TIMESTAMP(3),\n                    |  a INT,\n                    |  b DOUBLE\n                    |) WITH (\n                    |  'connector' = 'values',\n                    |  'changelog-mode' = 'I'\n                    |)\n      ")).stripMargin();
        testUtil.addTable(appendDdl);

        final String unionSql = new StringOps(Predef$.MODULE$.augmentString("\n                  |SELECT b, ts, a\n                  |FROM (\n                  |  SELECT * FROM changelog_src\n                  |  UNION ALL\n                  |  SELECT MAX(ts) as t, a, MAX(b) as b FROM append_src GROUP BY a\n                  |)\n                  |")).stripMargin();
        final Seq<ExplainDetail> details = (Seq<ExplainDetail>)Predef$.MODULE$.wrapRefArray((Object[])new ExplainDetail[]{ExplainDetail.CHANGELOG_MODE});
        testUtil.verifyRelPlan(unionSql, details);
    }

    /** Adds {@code src} with changelog mode 'I,UA,UB' and hands a filtered COUNT(*) to {@code verifyRelPlan} with the CHANGELOG_MODE explain detail. */
    @Test
    public void testAggregateOnChangelogSource() {
        final String ddl = new StringOps(Predef$.MODULE$.augmentString("\n                    |CREATE TABLE src (\n                    |  ts TIMESTAMP(3),\n                    |  a INT,\n                    |  b DOUBLE\n                    |) WITH (\n                    |  'connector' = 'values',\n                    |  'changelog-mode' = 'I,UA,UB'\n                    |)\n      ")).stripMargin();
        final StreamTableTestUtil testUtil = util();
        testUtil.addTable(ddl);
        final Seq<ExplainDetail> details = (Seq<ExplainDetail>)Predef$.MODULE$.wrapRefArray((Object[])new ExplainDetail[]{ExplainDetail.CHANGELOG_MODE});
        testUtil.verifyRelPlan("SELECT COUNT(*) FROM src WHERE a > 1", details);
    }

    /** Runs the shared join verification with changelog mode "I,UB,UA" for the rates table. */
    @Test
    public void testJoinOnChangelogSource() {
        final String ratesChangelogMode = "I,UB,UA";
        verifyJoinOnSource(ratesChangelogMode);
    }

    /** Sets TABLE_EXEC_SOURCE_CDC_EVENTS_DUPLICATE to true, then runs the shared join verification with changelog mode "I,UB,UA". */
    @Test
    public void testJoinOnChangelogSourceWithEventsDuplicate() {
        final Object duplicateEvents = BoxesRunTime.boxToBoolean(true);
        util().tableEnv().getConfig().set(ExecutionConfigOptions.TABLE_EXEC_SOURCE_CDC_EVENTS_DUPLICATE, duplicateEvents);

        final String ratesChangelogMode = "I,UB,UA";
        verifyJoinOnSource(ratesChangelogMode);
    }

    /** Runs the shared join verification with changelog mode "I,D" for the rates table. */
    @Test
    public void testJoinOnNoUpdateSource() {
        final String ratesChangelogMode = "I,D";
        verifyJoinOnSource(ratesChangelogMode);
    }

    /** Runs the shared join verification with changelog mode "UA,D" for the rates table. */
    @Test
    public void testJoinOnUpsertSource() {
        final String ratesChangelogMode = "UA,D";
        verifyJoinOnSource(ratesChangelogMode);
    }

    /**
     * Adds an insert-only {@code orders} table and a {@code rates_history} table keyed on
     * {@code currency_id}, then hands a join of the two to {@code verifyRelPlan} with the
     * CHANGELOG_MODE explain detail.
     *
     * @param changelogMode value spliced into the 'changelog-mode' option of {@code rates_history}
     */
    private void verifyJoinOnSource(String changelogMode) {
        final StreamTableTestUtil testUtil = util();

        final String ordersDdl = new StringOps(Predef$.MODULE$.augmentString("\n                    |CREATE TABLE orders (\n                    |  amount BIGINT,\n                    |  currency_id BIGINT,\n                    |  currency_name STRING\n                    |) WITH (\n                    | 'connector' = 'values',\n                    | 'changelog-mode' = 'I'\n                    |)\n                    |")).stripMargin();
        testUtil.addTable(ordersDdl);

        // Same text the original assembled through a StringBuilder, written as a concatenation.
        final String ratesTemplate = "\n                     |CREATE TABLE rates_history (\n                     |  currency_id BIGINT,\n                     |  currency_name STRING,\n                     |  rate BIGINT,\n                     |  PRIMARY KEY (currency_id) NOT ENFORCED\n                     |) WITH (\n                     |  'connector' = 'values',\n                     |  'changelog-mode' = '" + changelogMode + "'\n                     |)\n      ";
        final String ratesDdl = new StringOps(Predef$.MODULE$.augmentString(ratesTemplate)).stripMargin();
        testUtil.addTable(ratesDdl);

        final String joinSql = new StringOps(Predef$.MODULE$.augmentString("\n        |SELECT o.currency_name, o.amount, r.rate, o.amount * r.rate\n        |FROM orders AS o JOIN rates_history AS r\n        |ON o.currency_id = r.currency_id AND o.currency_name = r.currency_name\n        |")).stripMargin();
        final Seq<ExplainDetail> details = (Seq<ExplainDetail>)Predef$.MODULE$.wrapRefArray((Object[])new ExplainDetail[]{ExplainDetail.CHANGELOG_MODE});
        testUtil.verifyRelPlan(joinSql, details);
    }

    /** Adds a watermarked {@code src} with changelog mode 'I,UB,UA,D' and hands a filtered scan to {@code verifyRelPlan} with the CHANGELOG_MODE explain detail. */
    @Test
    public void testWatermarkAndChangelogSource() {
        final String ddl = new StringOps(Predef$.MODULE$.augmentString("\n                    |CREATE TABLE src (\n                    |  ts TIMESTAMP(3),\n                    |  a INT,\n                    |  b DOUBLE,\n                    |  WATERMARK FOR `ts` AS `ts` - INTERVAL '5' SECOND\n                    |) WITH (\n                    |  'connector' = 'values',\n                    |  'changelog-mode' = 'I,UB,UA,D'\n                    |)\n      ")).stripMargin();
        final StreamTableTestUtil testUtil = util();
        testUtil.addTable(ddl);
        final Seq<ExplainDetail> details = (Seq<ExplainDetail>)Predef$.MODULE$.wrapRefArray((Object[])new ExplainDetail[]{ExplainDetail.CHANGELOG_MODE});
        testUtil.verifyRelPlan("SELECT * FROM src WHERE a > 1", details);
    }

    /** Sets TABLE_EXEC_SOURCE_CDC_EVENTS_DUPLICATE to true, adds a keyed, watermarked changelog {@code src}, and hands a filtered projection to {@code verifyRelPlan}. */
    @Test
    public void testChangelogSourceWithEventsDuplicate() {
        final StreamTableTestUtil testUtil = util();
        final Object duplicateEvents = BoxesRunTime.boxToBoolean(true);
        testUtil.tableEnv().getConfig().set(ExecutionConfigOptions.TABLE_EXEC_SOURCE_CDC_EVENTS_DUPLICATE, duplicateEvents);

        final String ddl = new StringOps(Predef$.MODULE$.augmentString("\n                    |CREATE TABLE src (\n                    |  id STRING,\n                    |  a INT,\n                    |  b AS a + 1,\n                    |  c STRING,\n                    |  ts as to_timestamp(c),\n                    |  PRIMARY KEY (id) NOT ENFORCED,\n                    |  WATERMARK FOR ts AS ts - INTERVAL '1' SECOND\n                    |) WITH (\n                    |  'connector' = 'values',\n                    |  'changelog-mode' = 'I,UB,UA,D'\n                    |)\n      ")).stripMargin();
        testUtil.addTable(ddl);

        final Seq<ExplainDetail> details = (Seq<ExplainDetail>)Predef$.MODULE$.wrapRefArray((Object[])new ExplainDetail[]{ExplainDetail.CHANGELOG_MODE});
        testUtil.verifyRelPlan("SELECT a, b, c FROM src WHERE a > 1", details);
    }

    /** Adds {@code src} with a composite primary key and changelog mode 'UA', then hands a projection to {@code verifyRelPlan} with the CHANGELOG_MODE explain detail. */
    @Test
    public void testScanOnUpsertSource() {
        final String ddl = new StringOps(Predef$.MODULE$.augmentString("\n                    |CREATE TABLE src (\n                    |  ts TIMESTAMP(3),\n                    |  id1 STRING,\n                    |  a INT,\n                    |  id2 BIGINT,\n                    |  b DOUBLE,\n                    |  PRIMARY KEY (id2, id1) NOT ENFORCED\n                    |) WITH (\n                    |  'connector' = 'values',\n                    |  'changelog-mode' = 'UA'\n                    |)\n      ")).stripMargin();
        final StreamTableTestUtil testUtil = util();
        testUtil.addTable(ddl);
        final Seq<ExplainDetail> details = (Seq<ExplainDetail>)Predef$.MODULE$.wrapRefArray((Object[])new ExplainDetail[]{ExplainDetail.CHANGELOG_MODE});
        testUtil.verifyRelPlan("SELECT id1, a, b FROM src", details);
    }

    /** Adds a keyed {@code src} with computed columns, a watermark and changelog mode 'UA,D', then hands a filtered projection to {@code verifyRelPlan}. */
    @Test
    public void testUpsertSourceWithComputedColumnAndWatermark() {
        final String ddl = new StringOps(Predef$.MODULE$.augmentString("\n                    |CREATE TABLE src (\n                    |  id STRING,\n                    |  a INT,\n                    |  b AS a + 1,\n                    |  c STRING,\n                    |  ts as to_timestamp(c),\n                    |  PRIMARY KEY (id) NOT ENFORCED,\n                    |  WATERMARK FOR ts AS ts - INTERVAL '1' SECOND\n                    |) WITH (\n                    |  'connector' = 'values',\n                    |  'changelog-mode' = 'UA,D'\n                    |)\n      ")).stripMargin();
        final StreamTableTestUtil testUtil = util();
        testUtil.addTable(ddl);
        final Seq<ExplainDetail> details = (Seq<ExplainDetail>)Predef$.MODULE$.wrapRefArray((Object[])new ExplainDetail[]{ExplainDetail.CHANGELOG_MODE});
        testUtil.verifyRelPlan("SELECT a, b, c FROM src WHERE a > 1", details);
    }

    /** Adds a keyed, watermarked {@code src} with 'enable-watermark-push-down' and 'disable-lookup' set to 'true', then hands a projection to {@code verifyRelPlan}. */
    @Test
    public void testUpsertSourceWithWatermarkPushDown() {
        final String ddl = new StringOps(Predef$.MODULE$.augmentString("\n                    |CREATE TABLE src (\n                    |  id STRING,\n                    |  a INT,\n                    |  b AS a + 1,\n                    |  c STRING,\n                    |  ts as to_timestamp(c),\n                    |  PRIMARY KEY (id) NOT ENFORCED,\n                    |  WATERMARK FOR ts AS ts - INTERVAL '1' SECOND\n                    |) WITH (\n                    |  'connector' = 'values',\n                    |  'changelog-mode' = 'UA,D',\n                    |  'enable-watermark-push-down' = 'true',\n                    |  'disable-lookup' = 'true'\n                    |)\n      ")).stripMargin();
        final StreamTableTestUtil testUtil = util();
        testUtil.addTable(ddl);
        final Seq<ExplainDetail> details = (Seq<ExplainDetail>)Predef$.MODULE$.wrapRefArray((Object[])new ExplainDetail[]{ExplainDetail.CHANGELOG_MODE});
        testUtil.verifyRelPlan("SELECT id, ts FROM src", details);
    }

    /** Adds a keyed 'UA,D' table and an insert-only table, then hands a UNION ALL of the first with an aggregation over the second to {@code verifyRelPlan}. */
    @Test
    public void testUnionUpsertSourceAndAggregation() {
        final StreamTableTestUtil testUtil = util();

        final String upsertDdl = new StringOps(Predef$.MODULE$.augmentString("\n                    |CREATE TABLE upsert_src (\n                    |  ts TIMESTAMP(3),\n                    |  a INT,\n                    |  b DOUBLE,\n                    |  PRIMARY KEY (a) NOT ENFORCED\n                    |) WITH (\n                    |  'connector' = 'values',\n                    |  'changelog-mode' = 'UA,D'\n                    |)\n      ")).stripMargin();
        testUtil.addTable(upsertDdl);

        final String appendDdl = new StringOps(Predef$.MODULE$.augmentString("\n                    |CREATE TABLE append_src (\n                    |  ts TIMESTAMP(3),\n                    |  a INT,\n                    |  b DOUBLE\n                    |) WITH (\n                    |  'connector' = 'values',\n                    |  'changelog-mode' = 'I'\n                    |)\n      ")).stripMargin();
        testUtil.addTable(appendDdl);

        final String unionSql = new StringOps(Predef$.MODULE$.augmentString("\n        |SELECT b, ts, a\n        |FROM (\n        |  SELECT * FROM upsert_src\n        |  UNION ALL\n        |  SELECT MAX(ts) as t, a, MAX(b) as b FROM append_src GROUP BY a\n        |)\n        |")).stripMargin();
        final Seq<ExplainDetail> details = (Seq<ExplainDetail>)Predef$.MODULE$.wrapRefArray((Object[])new ExplainDetail[]{ExplainDetail.CHANGELOG_MODE});
        testUtil.verifyRelPlan(unionSql, details);
    }

    /**
     * Registers a {@code src} table with changelog mode {@code 'UA,D'} and primary key
     * {@code a}, then passes an aggregation grouped by the non-key column {@code b} to
     * {@code verifyRelPlan} with {@link ExplainDetail#CHANGELOG_MODE}.
     */
    @Test
    public void testAggregateOnUpsertSource() {
        String sourceDdl = new StringOps(Predef$.MODULE$.augmentString("\n                    |CREATE TABLE src (\n                    |  ts TIMESTAMP(3),\n                    |  a INT,\n                    |  b DOUBLE,\n                    |  c STRING,\n                    |  PRIMARY KEY (a) NOT ENFORCED\n                    |) WITH (\n                    |  'connector' = 'values',\n                    |  'changelog-mode' = 'UA,D'\n                    |)\n      ")).stripMargin();
        this.util().addTable(sourceDdl);

        Seq<ExplainDetail> explainDetails = (Seq<ExplainDetail>)Predef$.MODULE$.wrapRefArray((Object[])new ExplainDetail[]{ExplainDetail.CHANGELOG_MODE});
        this.util().verifyRelPlan("SELECT b, COUNT(*), MAX(ts), MIN(ts) FROM src GROUP BY b", explainDetails);
    }

    /**
     * Registers a {@code src} table with changelog mode {@code 'UA,D'} and primary key
     * {@code a}, then passes an aggregation grouped by that primary-key column to
     * {@code verifyRelPlan} with {@link ExplainDetail#CHANGELOG_MODE}.
     */
    @Test
    public void testAggregateOnUpsertSourcePrimaryKey() {
        String sourceDdl = new StringOps(Predef$.MODULE$.augmentString("\n                    |CREATE TABLE src (\n                    |  ts TIMESTAMP(3),\n                    |  a INT,\n                    |  b DOUBLE,\n                    |  c STRING,\n                    |  PRIMARY KEY (a) NOT ENFORCED\n                    |) WITH (\n                    |  'connector' = 'values',\n                    |  'changelog-mode' = 'UA,D'\n                    |)\n      ")).stripMargin();
        this.util().addTable(sourceDdl);

        Seq<ExplainDetail> explainDetails = (Seq<ExplainDetail>)Predef$.MODULE$.wrapRefArray((Object[])new ExplainDetail[]{ExplainDetail.CHANGELOG_MODE});
        this.util().verifyRelPlan("SELECT a, COUNT(*), MAX(ts), MIN(ts) FROM src GROUP BY a", explainDetails);
    }

    /**
     * Registers an {@code orders} table (changelog mode {@code 'I'}) with a
     * {@code PROCTIME()} computed column and a {@code rates_history} table (changelog mode
     * {@code 'UA,D'}, primary key {@code currency}), then passes a
     * {@code LEFT JOIN ... FOR SYSTEM_TIME AS OF o.proctime} query to {@code verifyRelPlan}
     * with {@link ExplainDetail#CHANGELOG_MODE}.
     */
    @Test
    public void testProcTimeTemporalJoinOnUpsertSource() {
        String ordersDdl = new StringOps(Predef$.MODULE$.augmentString("\n                    |CREATE TABLE orders (\n                    |  amount BIGINT,\n                    |  currency STRING,\n                    |  proctime AS PROCTIME()\n                    |) WITH (\n                    | 'connector' = 'values',\n                    | 'changelog-mode' = 'I'\n                    |)\n                    |")).stripMargin();
        this.util().addTable(ordersDdl);

        String ratesDdl = new StringOps(Predef$.MODULE$.augmentString("\n                    |CREATE TABLE rates_history (\n                    |  currency STRING PRIMARY KEY NOT ENFORCED,\n                    |  rate BIGINT\n                    |) WITH (\n                    |  'connector' = 'values',\n                    |  'changelog-mode' = 'UA,D',\n                    |  'disable-lookup' = 'true'\n                    |)\n      ")).stripMargin();
        this.util().addTable(ratesDdl);

        String joinSql = new StringOps(Predef$.MODULE$.augmentString("\n        |SELECT o.currency, o.amount, r.rate, o.amount * r.rate\n        |FROM orders AS o LEFT JOIN rates_history FOR SYSTEM_TIME AS OF o.proctime AS r\n        |ON o.currency = r.currency\n        |")).stripMargin();
        Seq<ExplainDetail> explainDetails = (Seq<ExplainDetail>)Predef$.MODULE$.wrapRefArray((Object[])new ExplainDetail[]{ExplainDetail.CHANGELOG_MODE});
        this.util().verifyRelPlan(joinSql, explainDetails);
    }

    /**
     * Registers an {@code orders} table (changelog mode {@code 'I'}) and a
     * {@code rates_history} table (changelog mode {@code 'UA,D'}, primary key
     * {@code currency}), both with a watermark on {@code rowtime}, then passes a
     * {@code LEFT JOIN ... FOR SYSTEM_TIME AS OF o.rowtime} query to {@code verifyRelPlan}
     * with {@link ExplainDetail#CHANGELOG_MODE}.
     */
    @Test
    public void testEventTimeTemporalJoinOnUpsertSource() {
        String ordersDdl = new StringOps(Predef$.MODULE$.augmentString("\n                    |CREATE TABLE orders (\n                    |  amount BIGINT,\n                    |  currency STRING,\n                    |  rowtime TIMESTAMP(3),\n                    |  WATERMARK FOR rowtime AS rowtime\n                    |) WITH (\n                    | 'connector' = 'values',\n                    | 'changelog-mode' = 'I'\n                    |)\n                    |")).stripMargin();
        this.util().addTable(ordersDdl);

        String ratesDdl = new StringOps(Predef$.MODULE$.augmentString("\n                    |CREATE TABLE rates_history (\n                    |  currency STRING PRIMARY KEY NOT ENFORCED,\n                    |  rate BIGINT,\n                    |  rowtime TIMESTAMP(3),\n                    |  WATERMARK FOR rowtime AS rowtime\n                    |) WITH (\n                    |  'connector' = 'values',\n                    |  'changelog-mode' = 'UA,D',\n                    |  'disable-lookup' = 'true'\n                    |)\n      ")).stripMargin();
        this.util().addTable(ratesDdl);

        String joinSql = new StringOps(Predef$.MODULE$.augmentString("\n        |SELECT o.currency, o.amount, r.rate, o.amount * r.rate\n        |FROM orders AS o LEFT JOIN rates_history FOR SYSTEM_TIME AS OF o.rowtime AS r\n        |ON o.currency = r.currency\n        |")).stripMargin();
        Seq<ExplainDetail> explainDetails = (Seq<ExplainDetail>)Predef$.MODULE$.wrapRefArray((Object[])new ExplainDetail[]{ExplainDetail.CHANGELOG_MODE});
        this.util().verifyRelPlan(joinSql, explainDetails);
    }

    /**
     * Registers a {@code src} table with changelog mode {@code 'I,UA,UB'} and a
     * {@code PROCTIME()} computed column {@code ts}, then passes a 10-second {@code TUMBLE}
     * window count over {@code ts} to {@code verifyRelPlan} with
     * {@link ExplainDetail#CHANGELOG_MODE}.
     */
    @Test
    public void testWindowAggregateOnChangelogSource() {
        String sourceDdl = new StringOps(Predef$.MODULE$.augmentString("\n                    |CREATE TABLE src (\n                    |  ts AS PROCTIME(),\n                    |  a INT,\n                    |  b DOUBLE\n                    |) WITH (\n                    |  'connector' = 'values',\n                    |  'changelog-mode' = 'I,UA,UB'\n                    |)\n      ")).stripMargin();
        this.util().addTable(sourceDdl);

        String windowSql = new StringOps(Predef$.MODULE$.augmentString("\n        |SELECT TUMBLE_START(ts, INTERVAL '10' SECOND), COUNT(*)\n        |FROM src\n        |GROUP BY TUMBLE(ts, INTERVAL '10' SECOND)\n        |")).stripMargin();
        Seq<ExplainDetail> explainDetails = (Seq<ExplainDetail>)Predef$.MODULE$.wrapRefArray((Object[])new ExplainDetail[]{ExplainDetail.CHANGELOG_MODE});
        this.util().verifyRelPlan(windowSql, explainDetails);
    }

    /**
     * Registers a {@code src} table with changelog mode {@code 'I,UB,D'} (UPDATE_BEFORE but no
     * UPDATE_AFTER) and expects {@code verifyRelPlan} to raise a {@link ValidationException}
     * carrying the "Invalid source" message below.
     */
    @Test
    public void testInvalidSourceChangelogMode() {
        String sourceDdl = new StringOps(Predef$.MODULE$.augmentString("\n                    |CREATE TABLE src (\n                    |  ts TIMESTAMP(3),\n                    |  a INT,\n                    |  b DOUBLE\n                    |) WITH (\n                    |  'connector' = 'values',\n                    |  'changelog-mode' = 'I,UB,D'\n                    |)\n      ")).stripMargin();
        String expectedMessage = "Invalid source for table 'default_catalog.default_database.src'. A ScanTableSource doesn't support a changelog stream that contains UPDATE_BEFORE but no UPDATE_AFTER. Please adapt the implementation of class 'org.apache.flink.table.planner.factories.TestValuesTableFactory$TestValuesScanLookupTableSource'.";

        this.util().addTable(sourceDdl);

        // The expectation is armed only after the table is registered, as in the original flow.
        this.thrown().expect(ValidationException.class);
        this.thrown().expectMessage(expectedMessage);

        Seq<ExplainDetail> explainDetails = (Seq<ExplainDetail>)Predef$.MODULE$.wrapRefArray((Object[])new ExplainDetail[]{ExplainDetail.CHANGELOG_MODE});
        this.util().verifyRelPlan("SELECT * FROM src WHERE a > 1", explainDetails);
    }

    /**
     * Registers a {@code src} table with changelog mode {@code 'I,UA,D'} and no primary key,
     * and expects {@code verifyRelPlan} to raise a {@link TableException} whose message asks
     * for a primary key constraint on the table.
     */
    @Test
    public void testMissingPrimaryKeyForUpsertSource() {
        String sourceDdl = new StringOps(Predef$.MODULE$.augmentString("\n                    |CREATE TABLE src (\n                    |  ts TIMESTAMP(3),\n                    |  a INT,\n                    |  b DOUBLE\n                    |) WITH (\n                    |  'connector' = 'values',\n                    |  'changelog-mode' = 'I,UA,D'\n                    |)\n      ")).stripMargin();
        String expectedMessage = "Table 'default_catalog.default_database.src' produces a changelog stream that contains UPDATE_AFTER but no UPDATE_BEFORE. This requires defining a primary key constraint on the table.";

        this.util().addTable(sourceDdl);

        // The expectation is armed only after the table is registered, as in the original flow.
        this.thrown().expect(TableException.class);
        this.thrown().expectMessage(expectedMessage);

        Seq<ExplainDetail> explainDetails = (Seq<ExplainDetail>)Predef$.MODULE$.wrapRefArray((Object[])new ExplainDetail[]{ExplainDetail.CHANGELOG_MODE});
        this.util().verifyRelPlan("SELECT * FROM src WHERE a > 1", explainDetails);
    }

    /**
     * Registers a {@code src} table with changelog mode {@code 'I,UB,UA,D'} and no primary
     * key, sets {@link ExecutionConfigOptions#TABLE_EXEC_SOURCE_CDC_EVENTS_DUPLICATE} to
     * {@code true} on the table config, and expects {@code verifyRelPlan} to raise a
     * {@link TableException} stating that the option requires a primary key.
     */
    @Test
    public void testMissingPrimaryKeyForEventsDuplicate() {
        String sourceDdl = new StringOps(Predef$.MODULE$.augmentString("\n                    |CREATE TABLE src (\n                    |  ts TIMESTAMP(3),\n                    |  a INT,\n                    |  b DOUBLE\n                    |) WITH (\n                    |  'connector' = 'values',\n                    |  'changelog-mode' = 'I,UB,UA,D'\n                    |)\n      ")).stripMargin();
        String expectedMessage = "Configuration 'table.exec.source.cdc-events-duplicate' is enabled which requires the changelog sources to define a PRIMARY KEY. However, table 'default_catalog.default_database.src' doesn't have a primary key.";

        this.util().addTable(sourceDdl);

        // The flag is passed as a boxed value with static type Object, as in the original call.
        Object duplicateEventsEnabled = BoxesRunTime.boxToBoolean(true);
        this.util().tableEnv().getConfig().set(ExecutionConfigOptions.TABLE_EXEC_SOURCE_CDC_EVENTS_DUPLICATE, duplicateEventsEnabled);

        this.thrown().expect(TableException.class);
        this.thrown().expectMessage(expectedMessage);

        Seq<ExplainDetail> explainDetails = (Seq<ExplainDetail>)Predef$.MODULE$.wrapRefArray((Object[])new ExplainDetail[]{ExplainDetail.CHANGELOG_MODE});
        this.util().verifyRelPlan("SELECT * FROM src WHERE a > 1", explainDetails);
    }

    /**
     * Registers a {@code src} table whose {@code 'table-source-class'} option is the class name
     * of {@link TestValuesTableFactory.MockedLookupTableSource}, and expects a plain
     * {@code SELECT *} passed to {@code verifyRelPlan} to raise a {@link TableException}
     * containing "Cannot generate a valid execution plan for the given query".
     */
    @Test
    public void testInvalidScanOnLookupSource() {
        // The source class name is spliced into the DDL between the two literal fragments.
        String lookupSourceClass = TestValuesTableFactory.MockedLookupTableSource.class.getName();
        String rawDdl = "\n                     |CREATE TABLE src (\n                     |  ts TIMESTAMP(3),\n                     |  a INT,\n                     |  b DOUBLE\n                     |) WITH (\n                     |  'connector' = 'values',\n                     |  'table-source-class' = '"
            + lookupSourceClass
            + "'\n                     |)\n      ";
        String sourceDdl = new StringOps(Predef$.MODULE$.augmentString(rawDdl)).stripMargin();

        this.util().addTable(sourceDdl);

        this.thrown().expect(TableException.class);
        this.thrown().expectMessage("Cannot generate a valid execution plan for the given query");

        Seq<ExplainDetail> explainDetails = (Seq<ExplainDetail>)Predef$.MODULE$.wrapRefArray((Object[])new ExplainDetail[]{ExplainDetail.CHANGELOG_MODE});
        this.util().verifyRelPlan("SELECT * FROM src", explainDetails);
    }

    /**
     * Sets the expectation for a {@link ValidationException} about the watermark strategy type
     * and then registers a {@code src} table whose watermark expression is the empty string
     * literal {@code ''}. No query is run afterwards; {@code addTable} is the last call.
     */
    @Test
    public void testInvalidWatermarkOutputType() {
        String expectedMessage = "Watermark strategy '' must be of type TIMESTAMP or TIMESTAMP_LTZ but is of type 'CHAR(0) NOT NULL'.";
        String sourceDdl = new StringOps(Predef$.MODULE$.augmentString("\n                    |CREATE TABLE src (\n                    |  ts TIMESTAMP(3),\n                    |  a INT,\n                    |  b DOUBLE,\n                    |  WATERMARK FOR `ts` AS ''\n                    |) WITH (\n                    |  'connector' = 'values'\n                    |)\n      ")).stripMargin();

        // The expectation must be armed before addTable, which is the final call of this test.
        this.thrown().expect(ValidationException.class);
        this.thrown().expectMessage(expectedMessage);

        this.util().addTable(sourceDdl);
    }
}

