/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.table.planner.plan.stream.sql;

import java.io.Serializable;
import java.lang.invoke.MethodHandle;
import java.lang.invoke.SerializedLambda;
import org.apache.flink.api.common.ExecutionConfig;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.api.scala.typeutils.CaseClassTypeInfo;
import org.apache.flink.api.scala.typeutils.ScalaCaseClassSerializer;
import org.apache.flink.table.api.ExplainDetail;
import org.apache.flink.table.api.StatementSet;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.TableException;
import org.apache.flink.table.api.ValidationException;
import org.apache.flink.table.api.package$;
import org.apache.flink.table.expressions.Expression;
import org.apache.flink.table.planner.plan.stream.sql.TableSinkTest$;
import org.apache.flink.table.planner.utils.StreamTableTestUtil;
import org.apache.flink.table.planner.utils.TableTestBase;
import org.junit.Test;
import scala.Function1;
import scala.Predef$;
import scala.Symbol;
import scala.Tuple3;
import scala.collection.Seq;
import scala.collection.immutable.StringOps;
import scala.reflect.ScalaSignature;
import scala.runtime.BoxesRunTime;
import scala.runtime.LambdaDeserialize;
import scala.runtime.RichInt$;
import scala.runtime.SymbolLiteral;
import scala.runtime.java8.JFunction1;

@ScalaSignature(bytes="\u0006\u0001\u001d4A!\u0001\u0002\u0001'\tiA+\u00192mKNKgn\u001b+fgRT!a\u0001\u0003\u0002\u0007M\fHN\u0003\u0002\u0006\r\u000511\u000f\u001e:fC6T!a\u0002\u0005\u0002\tAd\u0017M\u001c\u0006\u0003\u0013)\tq\u0001\u001d7b]:,'O\u0003\u0002\f\u0019\u0005)A/\u00192mK*\u0011QBD\u0001\u0006M2Lgn\u001b\u0006\u0003\u001fA\ta!\u00199bG\",'\"A\t\u0002\u0007=\u0014xm\u0001\u0001\u0014\u0005\u0001!\u0002CA\u000b\u0019\u001b\u00051\"BA\f\t\u0003\u0015)H/\u001b7t\u0013\tIbCA\u0007UC\ndW\rV3ti\n\u000b7/\u001a\u0005\u00067\u0001!\t\u0001H\u0001\u0007y%t\u0017\u000e\u001e \u0015\u0003u\u0001\"A\b\u0001\u000e\u0003\tAq\u0001\t\u0001C\u0002\u0013%\u0011%\u0001\u0003vi&dW#\u0001\u0012\u0011\u0005U\u0019\u0013B\u0001\u0013\u0017\u0005M\u0019FO]3b[R\u000b'\r\\3UKN$X\u000b^5m\u0011\u00191\u0003\u0001)A\u0005E\u0005)Q\u000f^5mA!)\u0001\u0006\u0001C\u0001S\u0005\u0011C/Z:u\u0013:\u001cXM\u001d;NSNl\u0017\r^2i)f\u0004XMR8s\u000b6\u0004H/_\"iCJ$\u0012A\u000b\t\u0003W9j\u0011\u0001\f\u0006\u0002[\u0005)1oY1mC&\u0011q\u0006\f\u0002\u0005+:LG\u000f\u000b\u0002(cA\u0011!'N\u0007\u0002g)\u0011A\u0007E\u0001\u0006UVt\u0017\u000e^\u0005\u0003mM\u0012A\u0001V3ti\")\u0001\b\u0001C\u0001S\u0005QB/Z:u\u000bb\u001cW\r\u001d;j_:4uN]!qa\u0016tGmU5oW\"\u0012q'\r\u0005\u0006w\u0001!\t!K\u0001\u001ei\u0016\u001cH/\u0012=dKB$\u0018n\u001c8G_J|e/\u001a:BO\u001e\u0014XmZ1uK\"\u0012!(\r\u0005\u0006}\u0001!\t!K\u0001\u000fi\u0016\u001cH/\u00119qK:$7+\u001b8lQ\ti\u0014\u0007C\u0003B\u0001\u0011\u0005\u0011&\u0001\tuKN$(+\u001a;sC\u000e$8+\u001b8lc!\u0012\u0001)\r\u0005\u0006\t\u0002!\t!K\u0001\u0011i\u0016\u001cHOU3ue\u0006\u001cGoU5oWJB#aQ\u0019\t\u000b\u001d\u0003A\u0011A\u0015\u0002\u001dQ,7\u000f^+qg\u0016\u0014HoU5oW\"\u0012a)\r\u0005\u0006\u0015\u0002!\t!K\u0001\u0019i\u0016\u001cH/\u00169tKJ$8+\u001b8l/&$\bNR5mi\u0016\u0014\bFA%2\u0011\u0015i\u0005\u0001\"\u0001*\u0003a!Xm\u001d;SKR\u0014\u0018m\u0019;B]\u0012,\u0006o]3siNKgn\u001b\u0015\u0003\u0019FBQ\u0001\u0015\u0001\u0005\u0002%\na\u0004^3ti\u0006\u0003\b/\u001a8e+B\u001cXM\u001d;B]\u0012\u0014V\r\u001e:bGR\u001c\u0016N\\6)\u0005=\u000b\u0004\"B*\u0001\t\u0003I\u0013\u0001\f;fgR,\u0005pY3qi&|gNR8s/JLG/\u001b8h-&\u0014H/^1m\u001b\u0016$\u0018\rZ1uC\u000e{G.^7oQ\t\u0011\u0016\u0007C\u0003W\u0001\u0011\u0005\u0011&\u0001\u0017uKN$X\t_2faRLwN\u001c$pe^\u0013\u0018\u000e^5oO&sg/\u00197jI6+G/\u00193bi\u0006\u001cu\u000e\\;n]\"\u0012Q+\r\u0005\u00063\u0002!\t!K\u0001\u0013i\u0016\u001cH/T3uC\u0012\fG/Y\"pYVlg\u000e\u000b\u0002Yc!)A\f\u0001C\u0001S\u0005\tC/Z:u'&t7\u000eR5t_J$WM]\"iC:<W\rT8h/&$\bNS8j]\"\u00121,\r\u0005\u0006?\u0002!\t!K\u0001\"i\u0016\u001cHoU5oW\u0012K7o\u001c:eKJ\u001c\u0005.\u00198hK2{wmV5uQJ\u000bgn\u001b\u0015\u0003=FBQA\u0019\u0001\u0005\u0002%\nA\u0003^3ti&s7/\u001a:u!\u0006\u0014HoQ8mk6t\u0007FA12\u0011\u0015)\u0007\u0001\"\u0001*\u0003\u0015\"Xm\u001d;J]N,'\u000f^,ji\"$\u0016M]4fi\u000e{G.^7og\u0006sGmU9m\u0011&tG\u000f\u000b\u0002ec\u0001")
public class TableSinkTest
extends TableTestBase {
    private final StreamTableTestUtil util = this.streamTestUtil(this.streamTestUtil$default$1());

    private StreamTableTestUtil util() {
        return this.util;
    }

    @Test
    public void testInsertMismatchTypeForEmptyChar() {
        this.util().addTable(new StringOps(Predef$.MODULE$.augmentString("\n         |CREATE TABLE my_sink (\n         |  name STRING,\n         |  email STRING,\n         |  message_offset BIGINT\n         |) WITH (\n         |  'connector' = 'values'\n         |)\n         |")).stripMargin());
        this.thrown().expect(ValidationException.class);
        this.thrown().expectMessage("Query schema: [a: INT, EXPR$1: CHAR(0) NOT NULL, EXPR$2: CHAR(0) NOT NULL]\nSink schema:  [name: STRING, email: STRING, message_offset: BIGINT]");
        this.util().verifyExecPlanInsert("INSERT INTO my_sink SELECT a, '', '' FROM MyTable");
    }

    @Test
    public void testExceptionForAppendSink() {
        this.util().addTable(new StringOps(Predef$.MODULE$.augmentString("\n         |CREATE TABLE appendSink (\n         |  `a` BIGINT\n         |) WITH (\n         |  'connector' = 'values',\n         |  'sink-insert-only' = 'true'\n         |)\n         |")).stripMargin());
        StatementSet stmtSet = this.util().tableEnv().createStatementSet();
        stmtSet.addInsertSql("INSERT INTO appendSink SELECT COUNT(*) AS cnt FROM MyTable GROUP BY a");
        this.thrown().expect(TableException.class);
        this.thrown().expectMessage("Table sink 'default_catalog.default_database.appendSink' doesn't support consuming update changes which is produced by node GroupAggregate(groupBy=[a], select=[a, COUNT(*) AS cnt])");
        this.util().verifyRelPlan(stmtSet, (Seq<ExplainDetail>)Predef$.MODULE$.wrapRefArray((Object[])new ExplainDetail[]{ExplainDetail.CHANGELOG_MODE}));
    }

    @Test
    public void testExceptionForOverAggregate() {
        this.util().addTable(new StringOps(Predef$.MODULE$.augmentString("\n         |CREATE TABLE retractSink1 (\n         |  `cnt` BIGINT\n         |) WITH (\n         |  'connector' = 'values',\n         |  'sink-insert-only' = 'false'\n         |)\n         |")).stripMargin());
        this.util().addTable(new StringOps(Predef$.MODULE$.augmentString("\n         |CREATE TABLE retractSink2 (\n         |  `cnt` BIGINT,\n         |  `total` BIGINT\n         |) WITH (\n         |  'connector' = 'values',\n         |  'sink-insert-only' = 'false'\n         |)\n         |")).stripMargin());
        Table table = this.util().tableEnv().sqlQuery("SELECT COUNT(*) AS cnt FROM MyTable GROUP BY a");
        this.util().tableEnv().createTemporaryView("TempTable", table);
        StatementSet stmtSet = this.util().tableEnv().createStatementSet();
        stmtSet.addInsertSql("INSERT INTO retractSink1 SELECT * FROM TempTable");
        stmtSet.addInsertSql("INSERT INTO retractSink2 SELECT cnt, SUM(cnt) OVER (ORDER BY PROCTIME()) FROM TempTable");
        this.thrown().expect(TableException.class);
        this.thrown().expectMessage("OverAggregate doesn't support consuming update changes which is produced by node GroupAggregate(groupBy=[a], select=[a, COUNT(*) AS cnt])");
        this.util().verifyRelPlan(stmtSet, (Seq<ExplainDetail>)Predef$.MODULE$.wrapRefArray((Object[])new ExplainDetail[]{ExplainDetail.CHANGELOG_MODE}));
    }

    @Test
    public void testAppendSink() {
        this.util().addTable(new StringOps(Predef$.MODULE$.augmentString("\n         |CREATE TABLE appendSink (\n         |  `a` BIGINT,\n         |  `b` STRING\n         |) WITH (\n         |  'connector' = 'values',\n         |  'sink-insert-only' = 'true'\n         |)\n         |")).stripMargin());
        StatementSet stmtSet = this.util().tableEnv().createStatementSet();
        stmtSet.addInsertSql("INSERT INTO appendSink SELECT a + b, c FROM MyTable");
        this.util().verifyRelPlan(stmtSet, (Seq<ExplainDetail>)Predef$.MODULE$.wrapRefArray((Object[])new ExplainDetail[]{ExplainDetail.CHANGELOG_MODE}));
    }

    @Test
    public void testRetractSink1() {
        this.util().addTable(new StringOps(Predef$.MODULE$.augmentString("\n         |CREATE TABLE retractSink (\n         |  `a` INT,\n         |  `cnt` BIGINT\n         |) WITH (\n         |  'connector' = 'values',\n         |  'sink-insert-only' = 'false'\n         |)\n         |")).stripMargin());
        StatementSet stmtSet = this.util().tableEnv().createStatementSet();
        stmtSet.addInsertSql("INSERT INTO retractSink SELECT a, COUNT(*) AS cnt FROM MyTable GROUP BY a");
        this.util().verifyRelPlan(stmtSet, (Seq<ExplainDetail>)Predef$.MODULE$.wrapRefArray((Object[])new ExplainDetail[]{ExplainDetail.CHANGELOG_MODE}));
    }

    @Test
    public void testRetractSink2() {
        this.util().addTable(new StringOps(Predef$.MODULE$.augmentString("\n         |CREATE TABLE retractSink (\n         |  `cnt` BIGINT,\n         |  `a` BIGINT\n         |) WITH (\n         |  'connector' = 'values',\n         |  'sink-insert-only' = 'false'\n         |)\n         |")).stripMargin());
        String dml = new StringOps(Predef$.MODULE$.augmentString("\n        |INSERT INTO retractSink\n        |SELECT cnt, COUNT(a) AS a FROM (\n        |    SELECT a, COUNT(*) AS cnt FROM MyTable GROUP BY a) t\n        |GROUP BY cnt\n      ")).stripMargin();
        StatementSet stmtSet = this.util().tableEnv().createStatementSet();
        stmtSet.addInsertSql(dml);
        this.util().verifyRelPlan(stmtSet, (Seq<ExplainDetail>)Predef$.MODULE$.wrapRefArray((Object[])new ExplainDetail[]{ExplainDetail.CHANGELOG_MODE}));
    }

    @Test
    public void testUpsertSink() {
        this.util().addTable(new StringOps(Predef$.MODULE$.augmentString("\n         |CREATE TABLE upsertSink (\n         |  `a` INT,\n         |  `cnt` BIGINT,\n         |  PRIMARY KEY (a) NOT ENFORCED\n         |) WITH (\n         |  'connector' = 'values',\n         |  'sink-insert-only' = 'false'\n         |)\n         |")).stripMargin());
        StatementSet stmtSet = this.util().tableEnv().createStatementSet();
        stmtSet.addInsertSql("INSERT INTO upsertSink SELECT a, COUNT(*) AS cnt FROM MyTable GROUP BY a");
        this.util().verifyRelPlan(stmtSet, (Seq<ExplainDetail>)Predef$.MODULE$.wrapRefArray((Object[])new ExplainDetail[]{ExplainDetail.CHANGELOG_MODE}));
    }

    @Test
    public void testUpsertSinkWithFilter() {
        this.util().addTable(new StringOps(Predef$.MODULE$.augmentString("\n         |CREATE TABLE upsertSink (\n         |  `a` INT,\n         |  `cnt` BIGINT,\n         |  PRIMARY KEY (a) NOT ENFORCED\n         |) WITH (\n         |  'connector' = 'values',\n         |  'sink-insert-only' = 'false'\n         |)\n         |")).stripMargin());
        String sql = new StringOps(Predef$.MODULE$.augmentString("\n        |INSERT INTO upsertSink\n        |SELECT *\n        |FROM (SELECT a, COUNT(*) AS cnt FROM MyTable GROUP BY a)\n        |WHERE cnt < 10\n        |")).stripMargin();
        StatementSet stmtSet = this.util().tableEnv().createStatementSet();
        stmtSet.addInsertSql(sql);
        this.util().verifyRelPlan(stmtSet, (Seq<ExplainDetail>)Predef$.MODULE$.wrapRefArray((Object[])new ExplainDetail[]{ExplainDetail.CHANGELOG_MODE}));
    }

    @Test
    public void testRetractAndUpsertSink() {
        this.util().addTable(new StringOps(Predef$.MODULE$.augmentString("\n         |CREATE TABLE retractSink (\n         |  `b` BIGINT,\n         |  `cnt` BIGINT\n         |) WITH (\n         |  'connector' = 'values',\n         |  'sink-insert-only' = 'false'\n         |)\n         |")).stripMargin());
        this.util().addTable(new StringOps(Predef$.MODULE$.augmentString("\n         |CREATE TABLE upsertSink (\n         |  `b` BIGINT,\n         |  `cnt` BIGINT,\n         |  PRIMARY KEY (b) NOT ENFORCED\n         |) WITH (\n         |  'connector' = 'values',\n         |  'sink-insert-only' = 'false'\n         |)\n         |")).stripMargin());
        Table table = this.util().tableEnv().sqlQuery("SELECT b, COUNT(a) AS cnt FROM MyTable GROUP BY b");
        this.util().tableEnv().createTemporaryView("TempTable", table);
        StatementSet stmtSet = this.util().tableEnv().createStatementSet();
        stmtSet.addInsertSql("INSERT INTO retractSink SELECT b, cnt FROM TempTable WHERE b < 4");
        stmtSet.addInsertSql("INSERT INTO upsertSink SELECT b, cnt FROM TempTable WHERE b >= 4 AND b < 6");
        stmtSet.addInsertSql("INSERT INTO upsertSink SELECT cnt, COUNT(b) AS frequency FROM TempTable WHERE b < 4 GROUP BY cnt");
        this.util().verifyRelPlan(stmtSet, (Seq<ExplainDetail>)Predef$.MODULE$.wrapRefArray((Object[])new ExplainDetail[]{ExplainDetail.CHANGELOG_MODE}));
    }

    @Test
    public void testAppendUpsertAndRetractSink() {
        this.util().addDataStream("MyTable2", (Seq<Expression>)Predef$.MODULE$.wrapRefArray((Object[])new Expression[]{package$.MODULE$.symbol2FieldExpression((Symbol)SymbolLiteral.bootstrap("apply", "d")), package$.MODULE$.symbol2FieldExpression((Symbol)SymbolLiteral.bootstrap("apply", "e")), package$.MODULE$.symbol2FieldExpression((Symbol)SymbolLiteral.bootstrap("apply", "f"))}), new CaseClassTypeInfo<Tuple3<Object, Object, String>>(null){

            public /* synthetic */ TypeInformation[] protected$types($anon$5 x$1) {
                return x$1.types;
            }

            public TypeSerializer<Tuple3<Object, Object, String>> createSerializer(ExecutionConfig executionConfig) {
                TypeSerializer[] fieldSerializers = new TypeSerializer[this.getArity()];
                RichInt$.MODULE$.until$extension0(Predef$.MODULE$.intWrapper(0), this.getArity()).foreach$mVc$sp((Function1)(JFunction1.mcVI.sp & Serializable & scala.Serializable)i -> {
                    fieldSerializers$2[i] = this.protected$types(this)[i].createSerializer(executionConfig);
                });
                ScalaCaseClassSerializer<Tuple3<Object, Object, String>> unused = new ScalaCaseClassSerializer<Tuple3<Object, Object, String>>(this, fieldSerializers){

                    public Tuple3<Object, Object, String> createInstance(Object[] fields) {
                        return new Tuple3((Object)BoxesRunTime.boxToInteger((int)BoxesRunTime.unboxToInt((Object)fields[0])), (Object)BoxesRunTime.boxToLong((long)BoxesRunTime.unboxToLong((Object)fields[1])), (Object)((String)fields[2]));
                    }
                };
                return new ScalaCaseClassSerializer(this.getTypeClass(), fieldSerializers);
            }

            private static /* synthetic */ Object $deserializeLambda$(SerializedLambda serializedLambda) {
                return LambdaDeserialize.bootstrap("lambdaDeserialize", new MethodHandle[]{$anonfun$createSerializer$2(org.apache.flink.table.planner.plan.stream.sql.TableSinkTest$$anon$5 org.apache.flink.api.common.ExecutionConfig org.apache.flink.api.common.typeutils.TypeSerializer[] int )}, serializedLambda);
            }
        });
        this.util().addDataStream("MyTable3", (Seq<Expression>)Predef$.MODULE$.wrapRefArray((Object[])new Expression[]{package$.MODULE$.symbol2FieldExpression((Symbol)SymbolLiteral.bootstrap("apply", "i")), package$.MODULE$.symbol2FieldExpression((Symbol)SymbolLiteral.bootstrap("apply", "j")), package$.MODULE$.symbol2FieldExpression((Symbol)SymbolLiteral.bootstrap("apply", "k"))}), new CaseClassTypeInfo<Tuple3<Object, Object, String>>(null){

            public /* synthetic */ TypeInformation[] protected$types($anon$6 x$1) {
                return x$1.types;
            }

            public TypeSerializer<Tuple3<Object, Object, String>> createSerializer(ExecutionConfig executionConfig) {
                TypeSerializer[] fieldSerializers = new TypeSerializer[this.getArity()];
                RichInt$.MODULE$.until$extension0(Predef$.MODULE$.intWrapper(0), this.getArity()).foreach$mVc$sp((Function1)(JFunction1.mcVI.sp & Serializable & scala.Serializable)i -> {
                    fieldSerializers$3[i] = this.protected$types(this)[i].createSerializer(executionConfig);
                });
                ScalaCaseClassSerializer<Tuple3<Object, Object, String>> unused = new ScalaCaseClassSerializer<Tuple3<Object, Object, String>>(this, fieldSerializers){

                    public Tuple3<Object, Object, String> createInstance(Object[] fields) {
                        return new Tuple3((Object)BoxesRunTime.boxToInteger((int)BoxesRunTime.unboxToInt((Object)fields[0])), (Object)BoxesRunTime.boxToLong((long)BoxesRunTime.unboxToLong((Object)fields[1])), (Object)((String)fields[2]));
                    }
                };
                return new ScalaCaseClassSerializer(this.getTypeClass(), fieldSerializers);
            }

            private static /* synthetic */ Object $deserializeLambda$(SerializedLambda serializedLambda) {
                return LambdaDeserialize.bootstrap("lambdaDeserialize", new MethodHandle[]{$anonfun$createSerializer$3(org.apache.flink.table.planner.plan.stream.sql.TableSinkTest$$anon$6 org.apache.flink.api.common.ExecutionConfig org.apache.flink.api.common.typeutils.TypeSerializer[] int )}, serializedLambda);
            }
        });
        this.util().addTable(new StringOps(Predef$.MODULE$.augmentString("\n         |CREATE TABLE appendSink (\n         |  `a` INT,\n         |  `b` BIGINT\n         |) WITH (\n         |  'connector' = 'values',\n         |  'sink-insert-only' = 'true'\n         |)\n         |")).stripMargin());
        Table table = this.util().tableEnv().sqlQuery("SELECT a, b FROM MyTable UNION ALL SELECT d, e FROM MyTable2");
        this.util().tableEnv().createTemporaryView("TempTable", table);
        StatementSet stmtSet = this.util().tableEnv().createStatementSet();
        stmtSet.addInsertSql("INSERT INTO appendSink SELECT * FROM TempTable");
        this.util().addTable(new StringOps(Predef$.MODULE$.augmentString("\n         |CREATE TABLE retractSink (\n         |  `total_sum` INT\n         |) WITH (\n         |  'connector' = 'values',\n         |  'sink-insert-only' = 'false'\n         |)\n         |")).stripMargin());
        Table table1 = this.util().tableEnv().sqlQuery("SELECT a, b FROM TempTable UNION ALL SELECT i, j FROM MyTable3");
        this.util().tableEnv().createTemporaryView("TempTable1", table1);
        stmtSet.addInsertSql("INSERT INTO retractSink SELECT SUM(a) AS total_sum FROM TempTable1");
        this.util().addTable(new StringOps(Predef$.MODULE$.augmentString("\n         |CREATE TABLE upsertSink (\n         |  `a` INT,\n         |  `total_min` BIGINT,\n         |  PRIMARY KEY (a) NOT ENFORCED\n         |) WITH (\n         |  'connector' = 'values',\n         |  'sink-insert-only' = 'false'\n         |)\n         |")).stripMargin());
        stmtSet.addInsertSql("INSERT INTO upsertSink SELECT a, MIN(b) AS total_min FROM TempTable1 GROUP BY a");
        this.util().verifyRelPlan(stmtSet, (Seq<ExplainDetail>)Predef$.MODULE$.wrapRefArray((Object[])new ExplainDetail[]{ExplainDetail.CHANGELOG_MODE}));
    }

    @Test
    public void testExceptionForWritingVirtualMetadataColumn() {
        this.util().addTable(new StringOps(Predef$.MODULE$.augmentString("\n         |CREATE TABLE MetadataTable (\n         |  `a` INT,\n         |  `m_3` INT METADATA FROM 'metadata_3' VIRTUAL,\n         |  `m_2` INT METADATA FROM 'metadata_2',\n         |  `b` BIGINT,\n         |  `c` INT,\n         |  `metadata_1` STRING METADATA\n         |) WITH (\n         |  'connector' = 'values',\n         |  'readable-metadata' = 'metadata_1:STRING, metadata_2:BIGINT, metadata_3:BIGINT',\n         |  'writable-metadata' = 'metadata_1:STRING, metadata_2:BIGINT'\n         |)\n       ")).stripMargin());
        String sql = new StringOps(Predef$.MODULE$.augmentString("\n        |INSERT INTO MetadataTable\n        |SELECT *\n        |FROM MetadataTable\n        |")).stripMargin();
        StatementSet stmtSet = this.util().tableEnv().createStatementSet();
        stmtSet.addInsertSql(sql);
        this.thrown().expect(ValidationException.class);
        this.thrown().expectMessage("Query schema: [a: INT, m_3: INT, m_2: INT, b: BIGINT, c: INT, metadata_1: STRING]\nSink schema:  [a: INT, m_2: INT, b: BIGINT, c: INT, metadata_1: STRING]");
        this.util().verifyRelPlan(stmtSet);
    }

    @Test
    public void testExceptionForWritingInvalidMetadataColumn() {
        this.util().addTable(new StringOps(Predef$.MODULE$.augmentString("\n         |CREATE TABLE MetadataTable (\n         |  `a` INT,\n         |  `metadata_1` TIMESTAMP(3) METADATA\n         |) WITH (\n         |  'connector' = 'values',\n         |  'writable-metadata' = 'metadata_1:BOOLEAN'\n         |)\n       ")).stripMargin());
        String sql = new StringOps(Predef$.MODULE$.augmentString("\n        |INSERT INTO MetadataTable\n        |SELECT TIMESTAMP '1990-10-14 06:00:00.000'\n        |")).stripMargin();
        StatementSet stmtSet = this.util().tableEnv().createStatementSet();
        stmtSet.addInsertSql(sql);
        this.thrown().expect(ValidationException.class);
        this.thrown().expectMessage("Invalid data type for metadata column 'metadata_1' of table 'default_catalog.default_database.MetadataTable'. The column cannot be declared as 'TIMESTAMP(3)' because the type must be castable to metadata type 'BOOLEAN'.");
        this.util().verifyRelPlan(stmtSet);
    }

    @Test
    public void testMetadataColumn() {
        this.util().addTable(new StringOps(Predef$.MODULE$.augmentString("\n         |CREATE TABLE MetadataTable (\n         |  `a` INT,\n         |  `m_3` INT METADATA FROM 'metadata_3' VIRTUAL,\n         |  `m_2` INT METADATA FROM 'metadata_2',\n         |  `b` BIGINT,\n         |  `c` INT,\n         |  `metadata_1` STRING METADATA\n         |) WITH (\n         |  'connector' = 'values',\n         |  'readable-metadata' = 'metadata_1:STRING, metadata_2:BIGINT, metadata_3:BIGINT',\n         |  'writable-metadata' = 'metadata_1:STRING, metadata_2:BIGINT'\n         |)\n       ")).stripMargin());
        String sql = new StringOps(Predef$.MODULE$.augmentString("\n        |INSERT INTO MetadataTable\n        |SELECT `a`, `m_2`, `b`, `c`, `metadata_1`\n        |FROM MetadataTable\n        |")).stripMargin();
        StatementSet stmtSet = this.util().tableEnv().createStatementSet();
        stmtSet.addInsertSql(sql);
        this.util().verifyRelPlan(stmtSet);
    }

    @Test
    public void testSinkDisorderChangeLogWithJoin() {
        this.util().tableEnv().executeSql(new StringOps(Predef$.MODULE$.augmentString("\n        |CREATE TABLE SinkJoinChangeLog (\n        |  person STRING, votes BIGINT, prize DOUBLE,\n        |  PRIMARY KEY(person) NOT ENFORCED) WITH(\n        |  'connector' = 'values',\n        |  'sink-insert-only' = 'false'\n        |)\n        |")).stripMargin());
        this.util().verifyExecPlanInsert(new StringOps(Predef$.MODULE$.augmentString("\n        |INSERT INTO SinkJoinChangeLog\n        |SELECT T.person, T.sum_votes, award.prize FROM\n        |   (SELECT person, SUM(votes) AS sum_votes FROM src GROUP BY person) T, award\n        |   WHERE T.sum_votes = award.votes\n        |")).stripMargin());
    }

    @Test
    public void testSinkDisorderChangeLogWithRank() {
        this.util().tableEnv().executeSql(new StringOps(Predef$.MODULE$.augmentString("\n        |CREATE TABLE SinkRankChangeLog (\n        |  person STRING, votes BIGINT,\n        |  PRIMARY KEY(person) NOT ENFORCED) WITH(\n        |  'connector' = 'values',\n        |  'sink-insert-only' = 'false'\n        |)\n        |")).stripMargin());
        this.util().verifyExecPlanInsert(new StringOps(Predef$.MODULE$.augmentString("\n        |INSERT INTO SinkRankChangeLog\n        |SELECT person, sum_votes FROM\n        | (SELECT person, sum_votes,\n        |   ROW_NUMBER() OVER (PARTITION BY vote_section ORDER BY sum_votes DESC) AS rank_number\n        |   FROM (SELECT person, SUM(votes) AS sum_votes, SUM(votes) / 2 AS vote_section FROM src\n        |      GROUP BY person))\n        |   WHERE rank_number < 10\n        |")).stripMargin());
    }

    @Test
    public void testInsertPartColumn() {
        this.util().addTable(new StringOps(Predef$.MODULE$.augmentString("\n         |CREATE TABLE zm_test (\n         |  `a` BIGINT,\n         |  `m1` MAP<STRING, BIGINT>,\n         |  `m2` MAP<STRING NOT NULL, BIGINT>,\n         |  `m3` MAP<STRING, BIGINT NOT NULL>,\n         |  `m4` MAP<STRING NOT NULL, BIGINT NOT NULL>\n         |) WITH (\n         |  'connector' = 'values',\n         |  'sink-insert-only' = 'true'\n         |)\n         |")).stripMargin());
        StatementSet stmtSet = this.util().tableEnv().createStatementSet();
        stmtSet.addInsertSql("INSERT INTO zm_test(`a`) SELECT `a` FROM MyTable");
        this.util().verifyRelPlan(stmtSet, (Seq<ExplainDetail>)Predef$.MODULE$.wrapRefArray((Object[])new ExplainDetail[]{ExplainDetail.CHANGELOG_MODE}));
    }

    @Test
    public void testInsertWithTargetColumnsAndSqlHint() {
        this.util().addTable(new StringOps(Predef$.MODULE$.augmentString("\n                     |CREATE TABLE appendSink (\n                     |  `a` BIGINT,\n                     |  `b` STRING\n                     |) WITH (\n                     |  'connector' = 'values',\n                     |  'sink-insert-only' = 'false'\n                     |)\n                     |")).stripMargin());
        StatementSet stmtSet = this.util().tableEnv().createStatementSet();
        stmtSet.addInsertSql("INSERT INTO appendSink /*+ OPTIONS('sink.parallelism' = '1') */(a, b) SELECT a + b, c FROM MyTable");
        this.util().verifyRelPlan(stmtSet, (Seq<ExplainDetail>)Predef$.MODULE$.wrapRefArray((Object[])new ExplainDetail[]{ExplainDetail.CHANGELOG_MODE}));
    }

    public TableSinkTest() {
        this.util().addDataStream("MyTable", (Seq<Expression>)Predef$.MODULE$.wrapRefArray((Object[])new Expression[]{package$.MODULE$.symbol2FieldExpression((Symbol)SymbolLiteral.bootstrap("apply", "a")), package$.MODULE$.symbol2FieldExpression((Symbol)SymbolLiteral.bootstrap("apply", "b")), package$.MODULE$.symbol2FieldExpression((Symbol)SymbolLiteral.bootstrap("apply", "c"))}), new CaseClassTypeInfo<Tuple3<Object, Object, String>>(null){

            public /* synthetic */ TypeInformation[] protected$types($anon$4 x$1) {
                return x$1.types;
            }

            public TypeSerializer<Tuple3<Object, Object, String>> createSerializer(ExecutionConfig executionConfig) {
                TypeSerializer[] fieldSerializers = new TypeSerializer[this.getArity()];
                RichInt$.MODULE$.until$extension0(Predef$.MODULE$.intWrapper(0), this.getArity()).foreach$mVc$sp((Function1)(JFunction1.mcVI.sp & Serializable & scala.Serializable)i -> {
                    fieldSerializers$1[i] = this.protected$types(this)[i].createSerializer(executionConfig);
                });
                ScalaCaseClassSerializer<Tuple3<Object, Object, String>> unused = new ScalaCaseClassSerializer<Tuple3<Object, Object, String>>(this, fieldSerializers){

                    public Tuple3<Object, Object, String> createInstance(Object[] fields) {
                        return new Tuple3((Object)BoxesRunTime.boxToInteger((int)BoxesRunTime.unboxToInt((Object)fields[0])), (Object)BoxesRunTime.boxToLong((long)BoxesRunTime.unboxToLong((Object)fields[1])), (Object)((String)fields[2]));
                    }
                };
                return new ScalaCaseClassSerializer(this.getTypeClass(), fieldSerializers);
            }

            private static /* synthetic */ Object $deserializeLambda$(SerializedLambda serializedLambda) {
                return LambdaDeserialize.bootstrap("lambdaDeserialize", new MethodHandle[]{$anonfun$createSerializer$1(org.apache.flink.table.planner.plan.stream.sql.TableSinkTest$$anon$4 org.apache.flink.api.common.ExecutionConfig org.apache.flink.api.common.typeutils.TypeSerializer[] int )}, serializedLambda);
            }
        });
        this.util().tableEnv().executeSql(new StringOps(Predef$.MODULE$.augmentString("\n      |CREATE TABLE src (person String, votes BIGINT) WITH(\n      |  'connector' = 'values'\n      |)\n      |")).stripMargin());
        this.util().tableEnv().executeSql(new StringOps(Predef$.MODULE$.augmentString("\n      |CREATE TABLE award (votes BIGINT, prize DOUBLE, PRIMARY KEY(votes) NOT ENFORCED) WITH(\n      |  'connector' = 'values'\n      |)\n      |")).stripMargin());
        this.util().tableEnv().executeSql(new StringOps(Predef$.MODULE$.augmentString("\n      |CREATE TABLE people (person STRING, age INT, PRIMARY KEY(person) NOT ENFORCED) WITH(\n      |  'connector' = 'values'\n      |)\n      |")).stripMargin());
    }
}

