/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.table.planner.plan.stream.sql;

import java.io.Serializable;
import java.lang.invoke.MethodHandle;
import java.lang.invoke.SerializedLambda;
import org.apache.flink.api.common.ExecutionConfig;
import org.apache.flink.api.common.serialization.SerializerConfig;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.api.scala.typeutils.CaseClassTypeInfo;
import org.apache.flink.api.scala.typeutils.ScalaCaseClassSerializer;
import org.apache.flink.core.testutils.FlinkAssertions;
import org.apache.flink.table.api.ExplainDetail;
import org.apache.flink.table.api.TableConfig;
import org.apache.flink.table.api.TableException;
import org.apache.flink.table.api.ValidationException;
import org.apache.flink.table.api.config.ExecutionConfigOptions;
import org.apache.flink.table.api.package$;
import org.apache.flink.table.expressions.Expression;
import org.apache.flink.table.functions.UserDefinedFunction;
import org.apache.flink.table.planner.expressions.utils.Func0$;
import org.apache.flink.table.planner.factories.TestValuesTableFactory;
import org.apache.flink.table.planner.plan.stream.sql.TableScanTest$;
import org.apache.flink.table.planner.utils.StreamTableTestUtil;
import org.apache.flink.table.planner.utils.TableTestBase;
import org.assertj.core.api.Assertions;
import org.assertj.core.api.ThrowingConsumer;
import org.junit.jupiter.api.Test;
import scala.Function1;
import scala.Predef$;
import scala.Symbol;
import scala.Tuple3;
import scala.collection.Seq;
import scala.collection.immutable.StringOps;
import scala.reflect.ScalaSignature;
import scala.runtime.BoxesRunTime;
import scala.runtime.LambdaDeserialize;
import scala.runtime.RichInt$;
import scala.runtime.SymbolLiteral;
import scala.runtime.java8.JFunction1;

@ScalaSignature(bytes="\u0006\u0001\u0005md\u0001B\u0001\u0003\u0001M\u0011Q\u0002V1cY\u0016\u001c6-\u00198UKN$(BA\u0002\u0005\u0003\r\u0019\u0018\u000f\u001c\u0006\u0003\u000b\u0019\taa\u001d;sK\u0006l'BA\u0004\t\u0003\u0011\u0001H.\u00198\u000b\u0005%Q\u0011a\u00029mC:tWM\u001d\u0006\u0003\u00171\tQ\u0001^1cY\u0016T!!\u0004\b\u0002\u000b\u0019d\u0017N\\6\u000b\u0005=\u0001\u0012AB1qC\u000eDWMC\u0001\u0012\u0003\ry'oZ\u0002\u0001'\t\u0001A\u0003\u0005\u0002\u001615\taC\u0003\u0002\u0018\u0011\u0005)Q\u000f^5mg&\u0011\u0011D\u0006\u0002\u000e)\u0006\u0014G.\u001a+fgR\u0014\u0015m]3\t\u000bm\u0001A\u0011\u0001\u000f\u0002\rqJg.\u001b;?)\u0005i\u0002C\u0001\u0010\u0001\u001b\u0005\u0011\u0001b\u0002\u0011\u0001\u0005\u0004%I!I\u0001\u0005kRLG.F\u0001#!\t)2%\u0003\u0002%-\t\u00192\u000b\u001e:fC6$\u0016M\u00197f)\u0016\u001cH/\u0016;jY\"1a\u0005\u0001Q\u0001\n\t\nQ!\u001e;jY\u0002BQ\u0001\u000b\u0001\u0005\u0002%\n\u0011\u0004^3ti2+w-Y2z)\u0006\u0014G.Z*pkJ\u001cWmU2b]R\t!\u0006\u0005\u0002,]5\tAFC\u0001.\u0003\u0015\u00198-\u00197b\u0013\tyCF\u0001\u0003V]&$\bFA\u00142!\t\u0011\u0014(D\u00014\u0015\t!T'A\u0002ba&T!AN\u001c\u0002\u000f),\b/\u001b;fe*\u0011\u0001\bE\u0001\u0006UVt\u0017\u000e^\u0005\u0003uM\u0012A\u0001V3ti\")A\b\u0001C\u0001S\u0005\u0011B/Z:u\t\u0006$\u0018m\u0015;sK\u0006l7kY1oQ\tY\u0014\u0007C\u0003@\u0001\u0011\u0005\u0011&\u0001\tuKN$H\t\u0012'UC\ndWmU2b]\"\u0012a(\r\u0005\u0006\u0005\u0002!\t!K\u0001\u001ai\u0016\u001cH\u000f\u0012#M/&$\bnQ8naV$X\rZ\"pYVlg\u000e\u000b\u0002Bc!)Q\t\u0001C\u0001S\u0005\u0001C/Z:u\t\u0012cu+\u001b;i%><H+\u001f9f\u0007>l\u0007/\u001e;fI\u000e{G.^7oQ\t!\u0015\u0007C\u0003I\u0001\u0011\u0005\u0011&A\ruKN$H\t\u0012'XSRDW*\u001a;bI\u0006$\u0018mQ8mk6t\u0007FA$2\u0011\u0015Y\u0005\u0001\"\u0001*\u0003I\"Xm\u001d;E\t2;\u0016\u000e\u001e5NKR\fG-\u0019;b)\"\fGoQ8oM2L7\r^:XSRD\u0007\u000b[=tS\u000e\fGnQ8mk6t\u0007F\u0001&2\u0011\u0015q\u0005\u0001\"\u0001*\u0003-\"Xm\u001d;E\t2;\u0016\u000e\u001e5NKR\fG-\u0019;b\u0007>dW/\u001c8Qe>TWm\u0019;j_:\u0004Vo\u001d5E_^t\u0007FA'2\u0011\u0015\t\u0006\u0001\"\u0001*\u0003\t\"Xm\u001d;E\t2;\u0016\u000e\u001e5XCR,'/\\1sW\u000e{W\u000e];uK\u0012\u001cu\u000e\\;n]\"\u0012\u0001+\r\u0005\u0006)\u0002!\t!K\u0001&i\u0016\u001cH\u000f\u0012#M/&$\bnQ8naV$X\rZ\"pYVlgNU3gKJ\u0014vn\u001e;j[\u0016D#aU\u0019\t\u000b]\u0003A\u0011A\u0015\u0002[Q,7\u000f\u001e#E\u0019^KG\u000f['vYRL\u0007\u000f\\3D_2,XN\\:Ge>l7+Y7f\u001b\u0016$\u0018\rZ1uC.+\u0017\u0010\u000b\u0002Wc!)!\f\u0001C\u0001S\u0005qC/Z:u\t\u0012cu+\u001b;i\u001bVdG/\u001b9mK\u000e{G.^7og\u001a\u0013x.\\*b[\u0016lU\r^1eCR\f7*Z=3Q\tI\u0016\u0007C\u0003^\u0001\u0011\u0005\u0011&A\u0014uKN$8*Z=x_J$7oV5uQ^\u000bG/\u001a:nCJ\\7i\\7qkR,GmQ8mk6t\u0007F\u0001/2\u0011\u0015\u0001\u0007\u0001\"\u0001*\u0003]!Xm\u001d;TG\u0006twJ\u001c\"pk:$W\rZ*pkJ\u001cW\r\u000b\u0002`c!)1\r\u0001C\u0001S\u0005YB/Z:u\r&dG/\u001a:P]\u000eC\u0017M\\4fY><7k\\;sG\u0016D#AY\u0019\t\u000b\u0019\u0004A\u0011A\u0015\u00023Q,7\u000f^*dC:|en\u00115b]\u001e,Gn\\4T_V\u00148-\u001a\u0015\u0003KFBQ!\u001b\u0001\u0005\u0002%\na\u0005^3tiVs\u0017n\u001c8DQ\u0006tw-\u001a7pON{WO]2f\u0003:$\u0017iZ4sK\u001e\fG/[8oQ\tA\u0017\u0007C\u0003m\u0001\u0011\u0005\u0011&\u0001\u0010uKN$\u0018iZ4sK\u001e\fG/Z(o\u0007\"\fgnZ3m_\u001e\u001cv.\u001e:dK\"\u00121.\r\u0005\u0006_\u0002!\t!K\u0001\u001ai\u0016\u001cHOS8j]>s7\t[1oO\u0016dwnZ*pkJ\u001cW\r\u000b\u0002oc!)!\u000f\u0001C\u0001S\u0005aC/Z:u\u0015>Lgn\u00148DQ\u0006tw-\u001a7pON{WO]2f/&$\b.\u0012<f]R\u001cH)\u001e9mS\u000e\fG/\u001a\u0015\u0003cFBQ!\u001e\u0001\u0005\u0002%\n\u0001\u0004^3ti*{\u0017N\\(o\u001d>,\u0006\u000fZ1uKN{WO]2fQ\t!\u0018\u0007C\u0003y\u0001\u0011\u0005\u0011&\u0001\fuKN$(j\\5o\u001f:,\u0006o]3siN{WO]2fQ\t9\u0018\u0007C\u0003|\u0001\u0011%A0\u0001\nwKJLg-\u001f&pS:|enU8ve\u000e,GC\u0001\u0016~\u0011\u0015q(\u00101\u0001\u0000\u00035\u0019\u0007.\u00198hK2|w-T8eKB!\u0011\u0011AA\b\u001d\u0011\t\u0019!a\u0003\u0011\u0007\u0005\u0015A&\u0004\u0002\u0002\b)\u0019\u0011\u0011\u0002\n\u0002\rq\u0012xn\u001c;?\u0013\r\ti\u0001L\u0001\u0007!J,G-\u001a4\n\t\u0005E\u00111\u0003\u0002\u0007'R\u0014\u0018N\\4\u000b\u0007\u00055A\u0006\u0003\u0004\u0002\u0018\u0001!\t!K\u0001 i\u0016\u001cHoV1uKJl\u0017M]6B]\u0012\u001c\u0005.\u00198hK2|wmU8ve\u000e,\u0007fAA\u000bc!1\u0011Q\u0004\u0001\u0005\u0002%\na\u0005^3ti\u000eC\u0017M\\4fY><7k\\;sG\u0016<\u0016\u000e\u001e5Fm\u0016tGo\u001d#va2L7-\u0019;fQ\r\tY\"\r\u0005\u0007\u0003G\u0001A\u0011A\u0015\u0002-Q,7\u000f^*dC:|e.\u00169tKJ$8k\\;sG\u0016D3!!\t2\u0011\u0019\tI\u0003\u0001C\u0001S\u0005qC/Z:u+B\u001cXM\u001d;T_V\u00148-Z,ji\"\u001cu.\u001c9vi\u0016$7i\u001c7v[:\fe\u000eZ,bi\u0016\u0014X.\u0019:lQ\r\t9#\r\u0005\u0007\u0003_\u0001A\u0011A\u0015\u0002KQ,7\u000f^+qg\u0016\u0014HoU8ve\u000e,w+\u001b;i/\u0006$XM]7be.\u0004Vo\u001d5E_^t\u0007fAA\u0017c!1\u0011Q\u0007\u0001\u0005\u0002%\n1\u0005^3tiVs\u0017n\u001c8VaN,'\u000f^*pkJ\u001cW-\u00118e\u0003\u001e<'/Z4bi&|g\u000eK\u0002\u00024EBa!a\u000f\u0001\t\u0003I\u0013a\u0007;fgR\fum\u001a:fO\u0006$Xm\u00148VaN,'\u000f^*pkJ\u001cW\rK\u0002\u0002:EBa!!\u0011\u0001\t\u0003I\u0013!\n;fgR\fum\u001a:fO\u0006$Xm\u00148VaN,'\u000f^*pkJ\u001cW\r\u0015:j[\u0006\u0014\u0018pS3zQ\r\ty$\r\u0005\u0007\u0003\u000f\u0002A\u0011A\u0015\u0002MQ,7\u000f\u001e)s_\u000e$\u0016.\\3UK6\u0004xN]1m\u0015>Lgn\u00148VaN,'\u000f^*pkJ\u001cW\rK\u0002\u0002FEBa!!\u0014\u0001\t\u0003I\u0013a\n;fgR,e/\u001a8u)&lW\rV3na>\u0014\u0018\r\u001c&pS:|e.\u00169tKJ$8k\\;sG\u0016D3!a\u00132\u0011\u0019\t\u0019\u0006\u0001C\u0001S\u0005!C/Z:u/&tGm\\<BO\u001e\u0014XmZ1uK>s7\t[1oO\u0016dwnZ*pkJ\u001cW\rK\u0002\u0002REBa!!\u0017\u0001\t\u0003I\u0013A\b;fgRLeN^1mS\u0012\u001cv.\u001e:dK\u000eC\u0017M\\4fY><Wj\u001c3fQ\r\t9&\r\u0005\u0007\u0003?\u0002A\u0011A\u0015\u0002IQ,7\u000f^'jgNLgn\u001a)sS6\f'/_&fs\u001a{'/\u00169tKJ$8k\\;sG\u0016D3!!\u00182\u0011\u0019\t)\u0007\u0001C\u0001S\u00059C/Z:u\u001b&\u001c8/\u001b8h!JLW.\u0019:z\u0017\u0016Lhi\u001c:Fm\u0016tGo\u001d#va2L7-\u0019;fQ\r\t\u0019'\r\u0005\u0007\u0003W\u0002A\u0011A\u0015\u0002;Q,7\u000f^%om\u0006d\u0017\u000eZ*dC:|e\u000eT8pWV\u00048k\\;sG\u0016D3!!\u001b2\u0011\u0019\t\t\b\u0001C\u0001S\u0005qB/Z:u\u0013:4\u0018\r\\5e/\u0006$XM]7be.|U\u000f\u001e9viRK\b/\u001a\u0015\u0004\u0003_\n\u0004BBA<\u0001\u0011\u0005\u0011&A\u000euKN$8+\u001a;QCJ\fG\u000e\\3mSNlgi\u001c:T_V\u00148-\u001a\u0015\u0004\u0003k\n\u0004")
public class TableScanTest
extends TableTestBase {
    private final StreamTableTestUtil util = this.streamTestUtil(this.streamTestUtil$default$1());

    private StreamTableTestUtil util() {
        return this.util;
    }

    @Test
    public void testLegacyTableSourceScan() {
        this.util().addTableSource("MyTable", (Seq<Expression>)Predef$.MODULE$.wrapRefArray((Object[])new Expression[]{package$.MODULE$.symbol2FieldExpression((Symbol)SymbolLiteral.bootstrap("apply", "a")), package$.MODULE$.symbol2FieldExpression((Symbol)SymbolLiteral.bootstrap("apply", "b")), package$.MODULE$.symbol2FieldExpression((Symbol)SymbolLiteral.bootstrap("apply", "c"))}), new CaseClassTypeInfo<Tuple3<Object, Object, String>>(null){

            public /* synthetic */ TypeInformation[] protected$types($anon$3 x$1) {
                return x$1.types;
            }

            public TypeSerializer<Tuple3<Object, Object, String>> createSerializer(SerializerConfig serializerConfig) {
                TypeSerializer[] fieldSerializers = new TypeSerializer[this.getArity()];
                RichInt$.MODULE$.until$extension0(Predef$.MODULE$.intWrapper(0), this.getArity()).foreach$mVc$sp((Function1)(JFunction1.mcVI.sp & Serializable & scala.Serializable)i -> {
                    fieldSerializers$1[i] = this.protected$types(this)[i].createSerializer(serializerConfig);
                });
                ScalaCaseClassSerializer<Tuple3<Object, Object, String>> unused = new ScalaCaseClassSerializer<Tuple3<Object, Object, String>>(this, fieldSerializers){

                    public Tuple3<Object, Object, String> createInstance(Object[] fields) {
                        return new Tuple3((Object)BoxesRunTime.boxToInteger((int)BoxesRunTime.unboxToInt((Object)fields[0])), (Object)BoxesRunTime.boxToLong((long)BoxesRunTime.unboxToLong((Object)fields[1])), (Object)((String)fields[2]));
                    }
                };
                return new ScalaCaseClassSerializer(this.getTypeClass(), fieldSerializers);
            }

            public TypeSerializer<Tuple3<Object, Object, String>> createSerializer(ExecutionConfig executionConfig) {
                return this.createSerializer(executionConfig.getSerializerConfig());
            }

            private static /* synthetic */ Object $deserializeLambda$(SerializedLambda serializedLambda) {
                return LambdaDeserialize.bootstrap("lambdaDeserialize", new MethodHandle[]{$anonfun$createSerializer$1(org.apache.flink.table.planner.plan.stream.sql.TableScanTest$$anon$3 org.apache.flink.api.common.serialization.SerializerConfig org.apache.flink.api.common.typeutils.TypeSerializer[] int )}, serializedLambda);
            }
        });
        this.util().verifyExecPlan("SELECT * FROM MyTable");
    }

    @Test
    public void testDataStreamScan() {
        this.util().addDataStream("DataStreamTable", (Seq<Expression>)Predef$.MODULE$.wrapRefArray((Object[])new Expression[]{package$.MODULE$.symbol2FieldExpression((Symbol)SymbolLiteral.bootstrap("apply", "a")), package$.MODULE$.symbol2FieldExpression((Symbol)SymbolLiteral.bootstrap("apply", "b")), package$.MODULE$.symbol2FieldExpression((Symbol)SymbolLiteral.bootstrap("apply", "c"))}), new CaseClassTypeInfo<Tuple3<Object, Object, String>>(null){

            public /* synthetic */ TypeInformation[] protected$types($anon$4 x$1) {
                return x$1.types;
            }

            public TypeSerializer<Tuple3<Object, Object, String>> createSerializer(SerializerConfig serializerConfig) {
                TypeSerializer[] fieldSerializers = new TypeSerializer[this.getArity()];
                RichInt$.MODULE$.until$extension0(Predef$.MODULE$.intWrapper(0), this.getArity()).foreach$mVc$sp((Function1)(JFunction1.mcVI.sp & Serializable & scala.Serializable)i -> {
                    fieldSerializers$2[i] = this.protected$types(this)[i].createSerializer(serializerConfig);
                });
                ScalaCaseClassSerializer<Tuple3<Object, Object, String>> unused = new ScalaCaseClassSerializer<Tuple3<Object, Object, String>>(this, fieldSerializers){

                    public Tuple3<Object, Object, String> createInstance(Object[] fields) {
                        return new Tuple3((Object)BoxesRunTime.boxToInteger((int)BoxesRunTime.unboxToInt((Object)fields[0])), (Object)BoxesRunTime.boxToLong((long)BoxesRunTime.unboxToLong((Object)fields[1])), (Object)((String)fields[2]));
                    }
                };
                return new ScalaCaseClassSerializer(this.getTypeClass(), fieldSerializers);
            }

            public TypeSerializer<Tuple3<Object, Object, String>> createSerializer(ExecutionConfig executionConfig) {
                return this.createSerializer(executionConfig.getSerializerConfig());
            }

            private static /* synthetic */ Object $deserializeLambda$(SerializedLambda serializedLambda) {
                return LambdaDeserialize.bootstrap("lambdaDeserialize", new MethodHandle[]{$anonfun$createSerializer$2(org.apache.flink.table.planner.plan.stream.sql.TableScanTest$$anon$4 org.apache.flink.api.common.serialization.SerializerConfig org.apache.flink.api.common.typeutils.TypeSerializer[] int )}, serializedLambda);
            }
        });
        this.util().verifyExecPlan("SELECT * FROM DataStreamTable");
    }

    @Test
    public void testDDLTableScan() {
        this.util().addTable(new StringOps(Predef$.MODULE$.augmentString("\n                    |CREATE TABLE src (\n                    |  ts TIMESTAMP(3),\n                    |  a INT,\n                    |  b DOUBLE,\n                    |  WATERMARK FOR ts AS ts - INTERVAL '0.001' SECOND\n                    |) WITH (\n                    |  'connector' = 'values'\n                    |)\n      ")).stripMargin());
        this.util().verifyExecPlan("SELECT * FROM src WHERE a > 1");
    }

    @Test
    public void testDDLWithComputedColumn() {
        this.util().addTemporarySystemFunction("my_udf", (UserDefinedFunction)Func0$.MODULE$);
        this.util().addTable(new StringOps(Predef$.MODULE$.augmentString("\n                     |create table t1(\n                     |  a int,\n                     |  b varchar,\n                     |  c as a + 1,\n                     |  d as to_timestamp(b),\n                     |  e as my_udf(a)\n                     |) with (\n                     |  'connector' = 'values'\n                     |)\n       ")).stripMargin());
        this.util().verifyExecPlan("SELECT * FROM t1");
    }

    @Test
    public void testDDLWithRowTypeComputedColumn() {
        this.util().addTable(new StringOps(Predef$.MODULE$.augmentString("\n                     |create table t1(\n                     |  a int,\n                     |  b varchar,\n                     |  c as row(a, b)\n                     |) with (\n                     |  'connector' = 'values'\n                     |)\n       ")).stripMargin());
        this.util().verifyExecPlan("SELECT * FROM t1");
    }

    @Test
    public void testDDLWithMetadataColumn() {
        this.util().addTable(new StringOps(Predef$.MODULE$.augmentString("\n         |CREATE TABLE MetadataTable (\n         |  `a` INT,\n         |  `other_metadata` INT METADATA FROM 'metadata_3' VIRTUAL,\n         |  `b` BIGINT,\n         |  `c` INT,\n         |  `metadata_1` STRING METADATA,\n         |  `computed` AS UPPER(`metadata_1`)\n         |) WITH (\n         |  'connector' = 'values',\n         |  'bounded' = 'false',\n         |  'readable-metadata' = 'metadata_1:STRING, metadata_2:BOOLEAN, metadata_3:BIGINT',\n         |  'writable-metadata' = 'metadata_1:STRING, metadata_2:BOOLEAN'\n         |)\n       ")).stripMargin());
        this.util().verifyExecPlan("SELECT * FROM MetadataTable");
    }

    @Test
    public void testDDLWithMetadataThatConflictsWithPhysicalColumn() {
        this.util().addTable(new StringOps(Predef$.MODULE$.augmentString("\n                     |CREATE TABLE MetadataTable (\n                     |  `timestamp` TIMESTAMP(9),\n                     |  `metadata_timestamp` TIMESTAMP(0) METADATA FROM 'timestamp',\n                     |  `other` STRING METADATA,\n                     |  `computed_other` AS UPPER(`other`),\n                     |  `computed_timestamp` AS CAST(`metadata_timestamp` AS STRING)\n                     |) WITH (\n                     |  'connector' = 'values',\n                     |  'bounded' = 'false',\n                     |  'readable-metadata' = 'timestamp:TIMESTAMP(0), other:STRING'\n                     |)\n       ")).stripMargin());
        this.util().verifyExecPlan("SELECT * FROM MetadataTable");
    }

    @Test
    public void testDDLWithMetadataColumnProjectionPushDown() {
        this.util().addTable(new StringOps(Predef$.MODULE$.augmentString("\n         |CREATE TABLE MetadataTable (\n         |  `a` INT,\n         |  `other_metadata` INT METADATA FROM 'metadata_3' VIRTUAL,\n         |  `b` BIGINT,\n         |  `c` INT,\n         |  `metadata_1` STRING METADATA\n         |) WITH (\n         |  'connector' = 'values',\n         |  'bounded' = 'false',\n         |  'readable-metadata' = 'metadata_1:STRING, metadata_2:BOOLEAN, metadata_3:BIGINT',\n         |  'writable-metadata' = 'metadata_1:STRING, metadata_2:BOOLEAN'\n         |)\n       ")).stripMargin());
        this.util().verifyExecPlan("SELECT `b`, `other_metadata` FROM MetadataTable");
    }

    @Test
    public void testDDLWithWatermarkComputedColumn() {
        this.util().addTemporarySystemFunction("my_udf", (UserDefinedFunction)Func0$.MODULE$);
        this.util().addTable(new StringOps(Predef$.MODULE$.augmentString("\n                     |create table t1(\n                     |  a int,\n                     |  b varchar,\n                     |  c as a + 1,\n                     |  d as to_timestamp(b),\n                     |  e as my_udf(a),\n                     |  WATERMARK FOR d AS d - INTERVAL '0.001' SECOND\n                     |) with (\n                     |  'connector' = 'values'\n                     |)\n       ")).stripMargin());
        this.util().verifyExecPlan("SELECT * FROM t1");
    }

    @Test
    public void testDDLWithComputedColumnReferRowtime() {
        this.util().addTable(new StringOps(Predef$.MODULE$.augmentString("\n                    |CREATE TABLE src (\n                    |  ts TIMESTAMP(3),\n                    |  a INT,\n                    |  b DOUBLE,\n                    |  my_ts AS ts - INTERVAL '0.001' SECOND,\n                    |  proc AS PROCTIME(),\n                    |  WATERMARK FOR ts AS ts - INTERVAL '0.001' SECOND\n                    |) WITH (\n                    |  'connector' = 'values'\n                    |)\n      ")).stripMargin());
        this.util().verifyExecPlan("SELECT * FROM src WHERE a > 1");
    }

    @Test
    public void testDDLWithMultipleColumnsFromSameMetadataKey() {
        Assertions.assertThatThrownBy(() -> this.util().tableEnv().executeSql(new StringOps(Predef$.MODULE$.augmentString("\n                                                        |CREATE TABLE source (\n                                                        |  a INT METADATA,\n                                                        |  b INT METADATA FROM 'a'\n                                                        |) WITH (\n                                                        |  'connector' = 'COLLECTION'\n                                                        |)\n                                                        |")).stripMargin())).satisfies(new ThrowingConsumer[]{FlinkAssertions.anyCauseMatches(ValidationException.class, (String)"The column `a` and `b` in the table are both from the same metadata key 'a'. Please specify one of the columns as the metadata column and use the computed column syntax to specify the others.")});
    }

    @Test
    public void testDDLWithMultipleColumnsFromSameMetadataKey2() {
        this.util().tableEnv().executeSql(new StringOps(Predef$.MODULE$.augmentString("\n                               |CREATE TABLE source (\n                               |  a INT METADATA\n                               |) WITH (\n                               |  'connector' = 'COLLECTION'\n                               |)\n                               |")).stripMargin());
        Assertions.assertThatThrownBy(() -> this.util().tableEnv().executeSql(new StringOps(Predef$.MODULE$.augmentString("\n            |CREATE TABLE like_source (\n            |  b INT METADATA FROM 'a'\n            |)\n            |WITH (\n            |  'connector' = 'COLLECTION'\n            |) LIKE source (\n            |  INCLUDING METADATA\n            |)\n            |")).stripMargin())).satisfies(new ThrowingConsumer[]{FlinkAssertions.anyCauseMatches((String)"The column `a` and `b` in the table are both from the same metadata key 'a'. Please specify one of the columns as the metadata column and use the computed column syntax to specify the others.")});
    }

    @Test
    public void testKeywordsWithWatermarkComputedColumn() {
        this.util().addTemporarySystemFunction("my_udf", (UserDefinedFunction)Func0$.MODULE$);
        this.util().addTable(new StringOps(Predef$.MODULE$.augmentString("\n                     |create table t1(\n                     |  a int,\n                     |  b varchar,\n                     |  `time` time,\n                     |  mytime as `time`,\n                     |  `current_time` as current_time,\n                     |  json_row ROW<`timestamp` TIMESTAMP(3)>,\n                     |  `timestamp` AS json_row.`timestamp`,\n                     |  WATERMARK FOR `timestamp` AS `timestamp`\n                     |) with (\n                     |  'connector' = 'values'\n                     |)\n       ")).stripMargin());
        this.util().verifyExecPlan("SELECT * FROM t1");
    }

    @Test
    public void testScanOnBoundedSource() {
        this.util().addTable(new StringOps(Predef$.MODULE$.augmentString("\n                    |CREATE TABLE src (\n                    |  ts TIMESTAMP(3),\n                    |  a INT,\n                    |  b DOUBLE\n                    |) WITH (\n                    |  'connector' = 'values',\n                    |  'bounded' = 'true'\n                    |)\n      ")).stripMargin());
        this.util().verifyRelPlan("SELECT * FROM src WHERE a > 1", (Seq<ExplainDetail>)Predef$.MODULE$.wrapRefArray((Object[])new ExplainDetail[]{ExplainDetail.CHANGELOG_MODE}));
    }

    @Test
    public void testFilterOnChangelogSource() {
        this.util().addTable(new StringOps(Predef$.MODULE$.augmentString("\n                    |CREATE TABLE src (\n                    |  ts TIMESTAMP(3),\n                    |  a INT,\n                    |  b DOUBLE\n                    |) WITH (\n                    |  'connector' = 'values',\n                    |  'changelog-mode' = 'I,UA,UB,D'\n                    |)\n      ")).stripMargin());
        this.util().verifyRelPlan("SELECT * FROM src WHERE a > 1", (Seq<ExplainDetail>)Predef$.MODULE$.wrapRefArray((Object[])new ExplainDetail[]{ExplainDetail.CHANGELOG_MODE}));
    }

    @Test
    public void testScanOnChangelogSource() {
        this.util().addTable(new StringOps(Predef$.MODULE$.augmentString("\n                    |CREATE TABLE src (\n                    |  ts TIMESTAMP(3),\n                    |  a INT,\n                    |  b DOUBLE\n                    |) WITH (\n                    |  'connector' = 'values',\n                    |  'changelog-mode' = 'I,UA,UB,D'\n                    |)\n      ")).stripMargin());
        this.util().verifyRelPlan("SELECT b,a,ts FROM src", (Seq<ExplainDetail>)Predef$.MODULE$.wrapRefArray((Object[])new ExplainDetail[]{ExplainDetail.CHANGELOG_MODE}));
    }

    @Test
    public void testUnionChangelogSourceAndAggregation() {
        this.util().addTable(new StringOps(Predef$.MODULE$.augmentString("\n                    |CREATE TABLE changelog_src (\n                    |  ts TIMESTAMP(3),\n                    |  a INT,\n                    |  b DOUBLE\n                    |) WITH (\n                    |  'connector' = 'values',\n                    |  'changelog-mode' = 'I,UA,UB,D'\n                    |)\n      ")).stripMargin());
        this.util().addTable(new StringOps(Predef$.MODULE$.augmentString("\n                    |CREATE TABLE append_src (\n                    |  ts TIMESTAMP(3),\n                    |  a INT,\n                    |  b DOUBLE\n                    |) WITH (\n                    |  'connector' = 'values',\n                    |  'changelog-mode' = 'I'\n                    |)\n      ")).stripMargin());
        String query = new StringOps(Predef$.MODULE$.augmentString("\n                  |SELECT b, ts, a\n                  |FROM (\n                  |  SELECT * FROM changelog_src\n                  |  UNION ALL\n                  |  SELECT MAX(ts) as t, a, MAX(b) as b FROM append_src GROUP BY a\n                  |)\n                  |")).stripMargin();
        this.util().verifyRelPlan(query, (Seq<ExplainDetail>)Predef$.MODULE$.wrapRefArray((Object[])new ExplainDetail[]{ExplainDetail.CHANGELOG_MODE}));
    }

    @Test
    public void testAggregateOnChangelogSource() {
        this.util().addTable(new StringOps(Predef$.MODULE$.augmentString("\n                    |CREATE TABLE src (\n                    |  ts TIMESTAMP(3),\n                    |  a INT,\n                    |  b DOUBLE\n                    |) WITH (\n                    |  'connector' = 'values',\n                    |  'changelog-mode' = 'I,UA,UB'\n                    |)\n      ")).stripMargin());
        this.util().verifyRelPlan("SELECT COUNT(*) FROM src WHERE a > 1", (Seq<ExplainDetail>)Predef$.MODULE$.wrapRefArray((Object[])new ExplainDetail[]{ExplainDetail.CHANGELOG_MODE}));
    }

    @Test
    public void testJoinOnChangelogSource() {
        this.verifyJoinOnSource("I,UB,UA");
    }

    @Test
    public void testJoinOnChangelogSourceWithEventsDuplicate() {
        this.util().tableEnv().getConfig().set(ExecutionConfigOptions.TABLE_EXEC_SOURCE_CDC_EVENTS_DUPLICATE, (Object)BoxesRunTime.boxToBoolean((boolean)true));
        this.verifyJoinOnSource("I,UB,UA");
    }

    @Test
    public void testJoinOnNoUpdateSource() {
        this.verifyJoinOnSource("I,D");
    }

    @Test
    public void testJoinOnUpsertSource() {
        this.verifyJoinOnSource("UA,D");
    }

    private void verifyJoinOnSource(String changelogMode) {
        this.util().addTable(new StringOps(Predef$.MODULE$.augmentString("\n                    |CREATE TABLE orders (\n                    |  amount BIGINT,\n                    |  currency_id BIGINT,\n                    |  currency_name STRING\n                    |) WITH (\n                    | 'connector' = 'values',\n                    | 'changelog-mode' = 'I'\n                    |)\n                    |")).stripMargin());
        this.util().addTable(new StringOps(Predef$.MODULE$.augmentString(new StringBuilder(397).append("\n                     |CREATE TABLE rates_history (\n                     |  currency_id BIGINT,\n                     |  currency_name STRING,\n                     |  rate BIGINT,\n                     |  PRIMARY KEY (currency_id) NOT ENFORCED\n                     |) WITH (\n                     |  'connector' = 'values',\n                     |  'changelog-mode' = '").append(changelogMode).append("'\n                     |)\n      ").toString())).stripMargin());
        String sql = new StringOps(Predef$.MODULE$.augmentString("\n        |SELECT o.currency_name, o.amount, r.rate, o.amount * r.rate\n        |FROM orders AS o JOIN rates_history AS r\n        |ON o.currency_id = r.currency_id AND o.currency_name = r.currency_name\n        |")).stripMargin();
        this.util().verifyRelPlan(sql, (Seq<ExplainDetail>)Predef$.MODULE$.wrapRefArray((Object[])new ExplainDetail[]{ExplainDetail.CHANGELOG_MODE}));
    }

    @Test
    public void testWatermarkAndChangelogSource() {
        this.util().addTable(new StringOps(Predef$.MODULE$.augmentString("\n                    |CREATE TABLE src (\n                    |  ts TIMESTAMP(3),\n                    |  a INT,\n                    |  b DOUBLE,\n                    |  WATERMARK FOR `ts` AS `ts` - INTERVAL '5' SECOND\n                    |) WITH (\n                    |  'connector' = 'values',\n                    |  'changelog-mode' = 'I,UB,UA,D'\n                    |)\n      ")).stripMargin());
        this.util().verifyRelPlan("SELECT * FROM src WHERE a > 1", (Seq<ExplainDetail>)Predef$.MODULE$.wrapRefArray((Object[])new ExplainDetail[]{ExplainDetail.CHANGELOG_MODE}));
    }

    @Test
    public void testChangelogSourceWithEventsDuplicate() {
        this.util().tableEnv().getConfig().set(ExecutionConfigOptions.TABLE_EXEC_SOURCE_CDC_EVENTS_DUPLICATE, (Object)BoxesRunTime.boxToBoolean((boolean)true));
        this.util().addTable(new StringOps(Predef$.MODULE$.augmentString("\n                    |CREATE TABLE src (\n                    |  id STRING,\n                    |  a INT,\n                    |  b AS a + 1,\n                    |  c STRING,\n                    |  ts as to_timestamp(c),\n                    |  PRIMARY KEY (id) NOT ENFORCED,\n                    |  WATERMARK FOR ts AS ts - INTERVAL '1' SECOND\n                    |) WITH (\n                    |  'connector' = 'values',\n                    |  'changelog-mode' = 'I,UB,UA,D'\n                    |)\n      ")).stripMargin());
        this.util().verifyRelPlan("SELECT a, b, c FROM src WHERE a > 1", (Seq<ExplainDetail>)Predef$.MODULE$.wrapRefArray((Object[])new ExplainDetail[]{ExplainDetail.CHANGELOG_MODE}));
    }

    @Test
    public void testScanOnUpsertSource() {
        this.util().addTable(new StringOps(Predef$.MODULE$.augmentString("\n                    |CREATE TABLE src (\n                    |  ts TIMESTAMP(3),\n                    |  id1 STRING,\n                    |  a INT,\n                    |  id2 BIGINT,\n                    |  b DOUBLE,\n                    |  PRIMARY KEY (id2, id1) NOT ENFORCED\n                    |) WITH (\n                    |  'connector' = 'values',\n                    |  'changelog-mode' = 'UA'\n                    |)\n      ")).stripMargin());
        this.util().verifyRelPlan("SELECT id1, a, b FROM src", (Seq<ExplainDetail>)Predef$.MODULE$.wrapRefArray((Object[])new ExplainDetail[]{ExplainDetail.CHANGELOG_MODE}));
    }

    @Test
    public void testUpsertSourceWithComputedColumnAndWatermark() {
        this.util().addTable(new StringOps(Predef$.MODULE$.augmentString("\n                    |CREATE TABLE src (\n                    |  id STRING,\n                    |  a INT,\n                    |  b AS a + 1,\n                    |  c STRING,\n                    |  ts as to_timestamp(c),\n                    |  PRIMARY KEY (id) NOT ENFORCED,\n                    |  WATERMARK FOR ts AS ts - INTERVAL '1' SECOND\n                    |) WITH (\n                    |  'connector' = 'values',\n                    |  'changelog-mode' = 'UA,D'\n                    |)\n      ")).stripMargin());
        this.util().verifyRelPlan("SELECT a, b, c FROM src WHERE a > 1", (Seq<ExplainDetail>)Predef$.MODULE$.wrapRefArray((Object[])new ExplainDetail[]{ExplainDetail.CHANGELOG_MODE}));
    }

    @Test
    public void testUpsertSourceWithWatermarkPushDown() {
        this.util().addTable(new StringOps(Predef$.MODULE$.augmentString("\n                    |CREATE TABLE src (\n                    |  id STRING,\n                    |  a INT,\n                    |  b AS a + 1,\n                    |  c STRING,\n                    |  ts as to_timestamp(c),\n                    |  PRIMARY KEY (id) NOT ENFORCED,\n                    |  WATERMARK FOR ts AS ts - INTERVAL '1' SECOND\n                    |) WITH (\n                    |  'connector' = 'values',\n                    |  'changelog-mode' = 'UA,D',\n                    |  'enable-watermark-push-down' = 'true',\n                    |  'disable-lookup' = 'true'\n                    |)\n      ")).stripMargin());
        this.util().verifyRelPlan("SELECT id, ts FROM src", (Seq<ExplainDetail>)Predef$.MODULE$.wrapRefArray((Object[])new ExplainDetail[]{ExplainDetail.CHANGELOG_MODE}));
    }

    @Test
    public void testUnionUpsertSourceAndAggregation() {
        this.util().addTable(new StringOps(Predef$.MODULE$.augmentString("\n                    |CREATE TABLE upsert_src (\n                    |  ts TIMESTAMP(3),\n                    |  a INT,\n                    |  b DOUBLE,\n                    |  PRIMARY KEY (a) NOT ENFORCED\n                    |) WITH (\n                    |  'connector' = 'values',\n                    |  'changelog-mode' = 'UA,D'\n                    |)\n      ")).stripMargin());
        this.util().addTable(new StringOps(Predef$.MODULE$.augmentString("\n                    |CREATE TABLE append_src (\n                    |  ts TIMESTAMP(3),\n                    |  a INT,\n                    |  b DOUBLE\n                    |) WITH (\n                    |  'connector' = 'values',\n                    |  'changelog-mode' = 'I'\n                    |)\n      ")).stripMargin());
        String query = new StringOps(Predef$.MODULE$.augmentString("\n        |SELECT b, ts, a\n        |FROM (\n        |  SELECT * FROM upsert_src\n        |  UNION ALL\n        |  SELECT MAX(ts) as t, a, MAX(b) as b FROM append_src GROUP BY a\n        |)\n        |")).stripMargin();
        this.util().verifyRelPlan(query, (Seq<ExplainDetail>)Predef$.MODULE$.wrapRefArray((Object[])new ExplainDetail[]{ExplainDetail.CHANGELOG_MODE}));
    }

    @Test
    public void testAggregateOnUpsertSource() {
        this.util().addTable(new StringOps(Predef$.MODULE$.augmentString("\n                    |CREATE TABLE src (\n                    |  ts TIMESTAMP(3),\n                    |  a INT,\n                    |  b DOUBLE,\n                    |  c STRING,\n                    |  PRIMARY KEY (a) NOT ENFORCED\n                    |) WITH (\n                    |  'connector' = 'values',\n                    |  'changelog-mode' = 'UA,D'\n                    |)\n      ")).stripMargin());
        this.util().verifyRelPlan("SELECT b, COUNT(*), MAX(ts), MIN(ts) FROM src GROUP BY b", (Seq<ExplainDetail>)Predef$.MODULE$.wrapRefArray((Object[])new ExplainDetail[]{ExplainDetail.CHANGELOG_MODE}));
    }

    @Test
    public void testAggregateOnUpsertSourcePrimaryKey() {
        this.util().addTable(new StringOps(Predef$.MODULE$.augmentString("\n                    |CREATE TABLE src (\n                    |  ts TIMESTAMP(3),\n                    |  a INT,\n                    |  b DOUBLE,\n                    |  c STRING,\n                    |  PRIMARY KEY (a) NOT ENFORCED\n                    |) WITH (\n                    |  'connector' = 'values',\n                    |  'changelog-mode' = 'UA,D'\n                    |)\n      ")).stripMargin());
        this.util().verifyRelPlan("SELECT a, COUNT(*), MAX(ts), MIN(ts) FROM src GROUP BY a", (Seq<ExplainDetail>)Predef$.MODULE$.wrapRefArray((Object[])new ExplainDetail[]{ExplainDetail.CHANGELOG_MODE}));
    }

    @Test
    public void testProcTimeTemporalJoinOnUpsertSource() {
        this.util().addTable(new StringOps(Predef$.MODULE$.augmentString("\n                    |CREATE TABLE orders (\n                    |  amount BIGINT,\n                    |  currency STRING,\n                    |  proctime AS PROCTIME()\n                    |) WITH (\n                    | 'connector' = 'values',\n                    | 'changelog-mode' = 'I'\n                    |)\n                    |")).stripMargin());
        this.util().addTable(new StringOps(Predef$.MODULE$.augmentString("\n                    |CREATE TABLE rates_history (\n                    |  currency STRING PRIMARY KEY NOT ENFORCED,\n                    |  rate BIGINT\n                    |) WITH (\n                    |  'connector' = 'values',\n                    |  'changelog-mode' = 'UA,D',\n                    |  'disable-lookup' = 'true'\n                    |)\n      ")).stripMargin());
        String sql = new StringOps(Predef$.MODULE$.augmentString("\n        |SELECT o.currency, o.amount, r.rate, o.amount * r.rate\n        |FROM orders AS o LEFT JOIN rates_history FOR SYSTEM_TIME AS OF o.proctime AS r\n        |ON o.currency = r.currency\n        |")).stripMargin();
        this.util().verifyRelPlan(sql, (Seq<ExplainDetail>)Predef$.MODULE$.wrapRefArray((Object[])new ExplainDetail[]{ExplainDetail.CHANGELOG_MODE}));
    }

    @Test
    public void testEventTimeTemporalJoinOnUpsertSource() {
        this.util().addTable(new StringOps(Predef$.MODULE$.augmentString("\n                    |CREATE TABLE orders (\n                    |  amount BIGINT,\n                    |  currency STRING,\n                    |  rowtime TIMESTAMP(3),\n                    |  WATERMARK FOR rowtime AS rowtime\n                    |) WITH (\n                    | 'connector' = 'values',\n                    | 'changelog-mode' = 'I'\n                    |)\n                    |")).stripMargin());
        this.util().addTable(new StringOps(Predef$.MODULE$.augmentString("\n                    |CREATE TABLE rates_history (\n                    |  currency STRING PRIMARY KEY NOT ENFORCED,\n                    |  rate BIGINT,\n                    |  rowtime TIMESTAMP(3),\n                    |  WATERMARK FOR rowtime AS rowtime\n                    |) WITH (\n                    |  'connector' = 'values',\n                    |  'changelog-mode' = 'UA,D',\n                    |  'disable-lookup' = 'true'\n                    |)\n      ")).stripMargin());
        String sql = new StringOps(Predef$.MODULE$.augmentString("\n        |SELECT o.currency, o.amount, r.rate, o.amount * r.rate\n        |FROM orders AS o LEFT JOIN rates_history FOR SYSTEM_TIME AS OF o.rowtime AS r\n        |ON o.currency = r.currency\n        |")).stripMargin();
        this.util().verifyRelPlan(sql, (Seq<ExplainDetail>)Predef$.MODULE$.wrapRefArray((Object[])new ExplainDetail[]{ExplainDetail.CHANGELOG_MODE}));
    }

    @Test
    public void testWindowAggregateOnChangelogSource() {
        this.util().addTable(new StringOps(Predef$.MODULE$.augmentString("\n                    |CREATE TABLE src (\n                    |  ts AS PROCTIME(),\n                    |  a INT,\n                    |  b DOUBLE\n                    |) WITH (\n                    |  'connector' = 'values',\n                    |  'changelog-mode' = 'I,UA,UB'\n                    |)\n      ")).stripMargin());
        String query = new StringOps(Predef$.MODULE$.augmentString("\n        |SELECT TUMBLE_START(ts, INTERVAL '10' SECOND), COUNT(*)\n        |FROM src\n        |GROUP BY TUMBLE(ts, INTERVAL '10' SECOND)\n        |")).stripMargin();
        this.util().verifyRelPlan(query, (Seq<ExplainDetail>)Predef$.MODULE$.wrapRefArray((Object[])new ExplainDetail[]{ExplainDetail.CHANGELOG_MODE}));
    }

    @Test
    public void testInvalidSourceChangelogMode() {
        this.util().addTable(new StringOps(Predef$.MODULE$.augmentString("\n                    |CREATE TABLE src (\n                    |  ts TIMESTAMP(3),\n                    |  a INT,\n                    |  b DOUBLE\n                    |) WITH (\n                    |  'connector' = 'values',\n                    |  'changelog-mode' = 'I,UB,D'\n                    |)\n      ")).stripMargin());
        boolean cfr_ignored_0 = Assertions.assertThatThrownBy(() -> this.util().verifyRelPlan("SELECT * FROM src WHERE a > 1", (Seq<ExplainDetail>)Predef$.MODULE$.wrapRefArray((Object[])new ExplainDetail[]{ExplainDetail.CHANGELOG_MODE}))).hasMessageContaining("Invalid source for table 'default_catalog.default_database.src'. A ScanTableSource doesn't support a changelog stream that contains UPDATE_BEFORE but no UPDATE_AFTER. Please adapt the implementation of class 'org.apache.flink.table.planner.factories.TestValuesTableFactory$TestValuesScanLookupTableSource'.") instanceof ValidationException;
    }

    @Test
    public void testMissingPrimaryKeyForUpsertSource() {
        this.util().addTable(new StringOps(Predef$.MODULE$.augmentString("\n                    |CREATE TABLE src (\n                    |  ts TIMESTAMP(3),\n                    |  a INT,\n                    |  b DOUBLE\n                    |) WITH (\n                    |  'connector' = 'values',\n                    |  'changelog-mode' = 'I,UA,D'\n                    |)\n      ")).stripMargin());
        boolean cfr_ignored_0 = Assertions.assertThatThrownBy(() -> this.util().verifyRelPlan("SELECT * FROM src WHERE a > 1", (Seq<ExplainDetail>)Predef$.MODULE$.wrapRefArray((Object[])new ExplainDetail[]{ExplainDetail.CHANGELOG_MODE}))).hasMessageContaining("Table 'default_catalog.default_database.src' produces a changelog stream that contains UPDATE_AFTER but no UPDATE_BEFORE. This requires defining a primary key constraint on the table.") instanceof TableException;
    }

    @Test
    public void testMissingPrimaryKeyForEventsDuplicate() {
        this.util().addTable(new StringOps(Predef$.MODULE$.augmentString("\n                    |CREATE TABLE src (\n                    |  ts TIMESTAMP(3),\n                    |  a INT,\n                    |  b DOUBLE\n                    |) WITH (\n                    |  'connector' = 'values',\n                    |  'changelog-mode' = 'I,UB,UA,D'\n                    |)\n      ")).stripMargin());
        this.util().tableEnv().getConfig().set(ExecutionConfigOptions.TABLE_EXEC_SOURCE_CDC_EVENTS_DUPLICATE, (Object)BoxesRunTime.boxToBoolean((boolean)true));
        boolean cfr_ignored_0 = Assertions.assertThatThrownBy(() -> this.util().verifyRelPlan("SELECT * FROM src WHERE a > 1", (Seq<ExplainDetail>)Predef$.MODULE$.wrapRefArray((Object[])new ExplainDetail[]{ExplainDetail.CHANGELOG_MODE}))).hasMessageContaining("Configuration 'table.exec.source.cdc-events-duplicate' is enabled which requires the changelog sources to define a PRIMARY KEY. However, table 'default_catalog.default_database.src' doesn't have a primary key.") instanceof TableException;
    }

    @Test
    public void testInvalidScanOnLookupSource() {
        this.util().addTable(new StringOps(Predef$.MODULE$.augmentString(new StringBuilder(306).append("\n                     |CREATE TABLE src (\n                     |  ts TIMESTAMP(3),\n                     |  a INT,\n                     |  b DOUBLE\n                     |) WITH (\n                     |  'connector' = 'values',\n                     |  'table-source-class' = '").append(TestValuesTableFactory.MockedLookupTableSource.class.getName()).append("'\n                     |)\n      ").toString())).stripMargin());
        boolean cfr_ignored_0 = Assertions.assertThatThrownBy(() -> this.util().verifyRelPlan("SELECT * FROM src", (Seq<ExplainDetail>)Predef$.MODULE$.wrapRefArray((Object[])new ExplainDetail[]{ExplainDetail.CHANGELOG_MODE}))).hasMessageContaining("Cannot generate a valid execution plan for the given query") instanceof ValidationException;
    }

    @Test
    public void testInvalidWatermarkOutputType() {
        boolean cfr_ignored_0 = Assertions.assertThatThrownBy(() -> this.util().addTable(new StringOps(Predef$.MODULE$.augmentString("\n                                             |CREATE TABLE src (\n                                             |  ts TIMESTAMP(3),\n                                             |  a INT,\n                                             |  b DOUBLE,\n                                             |  WATERMARK FOR `ts` AS ''\n                                             |) WITH (\n                                             |  'connector' = 'values'\n                                             |)\n      ")).stripMargin())).hasMessageContaining("Invalid data type of expression for watermark definition. The field must be of type TIMESTAMP(p) or TIMESTAMP_LTZ(p), the supported precision 'p' is from 0 to 3, but the watermark expression type is CHAR(0) NOT NULL") instanceof ValidationException;
    }

    @Test
    public void testSetParallelismForSource() {
        TableConfig config = TableConfig.getDefault();
        config.set(ExecutionConfigOptions.TABLE_EXEC_SIMPLIFY_OPERATOR_NAME_ENABLED, (Object)BoxesRunTime.boxToBoolean((boolean)false));
        StreamTableTestUtil util = this.streamTestUtil(config);
        util.addTable(new StringOps(Predef$.MODULE$.augmentString("\n                    |CREATE TABLE changelog_src (\n                    |  id INT,\n                    |  a STRING,\n                    |  PRIMARY KEY (id) NOT ENFORCED\n                    |) WITH (\n                    |  'connector' = 'values',\n                    |  'bounded' = 'true',\n                    |  'runtime-source' = 'DataStream',\n                    |  'scan.parallelism' = '5',\n                    |  'enable-projection-push-down' = 'false',\n                    |  'changelog-mode' = 'I,UA,D'\n                    |)\n      ")).stripMargin());
        util.addTable(new StringOps(Predef$.MODULE$.augmentString("\n                    |CREATE TABLE src (\n                    |  id INT,\n                    |  b STRING,\n                    |  c INT\n                    |) WITH (\n                    |  'connector' = 'values',\n                    |  'bounded' = 'true',\n                    |  'runtime-source' = 'DataStream',\n                    |  'scan.parallelism' = '3',\n                    |  'enable-projection-push-down' = 'false'\n                    |)\n      ")).stripMargin());
        util.verifyTransformation("SELECT * FROM src LEFT JOIN changelog_src on src.id = changelog_src.id WHERE src.c > 1");
    }
}

