/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.table.planner.runtime.stream.sql;

import org.apache.flink.table.api.bridge.scala.StreamStatementSet;
import org.apache.flink.table.functions.UserDefinedFunction;
import org.apache.flink.table.planner.expressions.utils.TestNonDeterministicUdf;
import org.apache.flink.table.planner.factories.TestValuesTableFactory;
import org.apache.flink.table.planner.runtime.utils.BatchTestBase$;
import org.apache.flink.table.planner.runtime.utils.StreamingWithStateTestBase;
import org.apache.flink.table.planner.runtime.utils.TestData$;
import org.apache.flink.testutils.junit.extensions.parameterized.ParameterizedTestExtension;
import org.apache.flink.types.Row;
import org.assertj.core.api.Assertions;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.TestTemplate;
import org.junit.jupiter.api.extension.ExtendWith;
import scala.Predef$;
import scala.collection.JavaConversions$;
import scala.collection.Seq;
import scala.collection.Seq$;
import scala.collection.immutable.;
import scala.collection.immutable.List;
import scala.collection.immutable.List$;
import scala.collection.immutable.Nil$;
import scala.collection.immutable.StringOps;
import scala.math.Ordering;
import scala.reflect.ScalaSignature;
import scala.runtime.BoxesRunTime;

@ExtendWith(value={ParameterizedTestExtension.class})
@ScalaSignature(bytes="\u0006\u0001y4A!\u0001\u0002\u0001'\tyA+\u00192mKNKgn[%U\u0007\u0006\u001cXM\u0003\u0002\u0004\t\u0005\u00191/\u001d7\u000b\u0005\u00151\u0011AB:ue\u0016\fWN\u0003\u0002\b\u0011\u00059!/\u001e8uS6,'BA\u0005\u000b\u0003\u001d\u0001H.\u00198oKJT!a\u0003\u0007\u0002\u000bQ\f'\r\\3\u000b\u00055q\u0011!\u00024mS:\\'BA\b\u0011\u0003\u0019\t\u0007/Y2iK*\t\u0011#A\u0002pe\u001e\u001c\u0001a\u0005\u0002\u0001)A\u0011Q\u0003G\u0007\u0002-)\u0011qCB\u0001\u0006kRLGn]\u0005\u00033Y\u0011!d\u0015;sK\u0006l\u0017N\\4XSRD7\u000b^1uKR+7\u000f\u001e\"bg\u0016D\u0001b\u0007\u0001\u0003\u0002\u0003\u0006I\u0001H\u0001\u0005[>$W\r\u0005\u0002\u001ec9\u0011ad\f\b\u0003?9r!\u0001I\u0017\u000f\u0005\u0005bcB\u0001\u0012,\u001d\t\u0019#F\u0004\u0002%S9\u0011Q\u0005K\u0007\u0002M)\u0011qEE\u0001\u0007yI|w\u000e\u001e \n\u0003EI!a\u0004\t\n\u00055q\u0011BA\u0006\r\u0013\tI!\"\u0003\u0002\b\u0011%\u0011qCB\u0005\u0003aY\t!d\u0015;sK\u0006l\u0017N\\4XSRD7\u000b^1uKR+7\u000f\u001e\"bg\u0016L!AM\u001a\u0003!M#\u0018\r^3CC\u000e\\WM\u001c3N_\u0012,'B\u0001\u0019\u0017\u0011\u0015)\u0004\u0001\"\u00017\u0003\u0019a\u0014N\\5u}Q\u0011q'\u000f\t\u0003q\u0001i\u0011A\u0001\u0005\u00067Q\u0002\r\u0001\b\u0005\u0006w\u0001!\t\u0005P\u0001\u0007E\u00164wN]3\u0015\u0003u\u0002\"AP!\u000e\u0003}R\u0011\u0001Q\u0001\u0006g\u000e\fG.Y\u0005\u0003\u0005~\u0012A!\u00168ji\"\u0012!\b\u0012\t\u0003\u000b2k\u0011A\u0012\u0006\u0003\u000f\"\u000b1!\u00199j\u0015\tI%*A\u0004kkBLG/\u001a:\u000b\u0005-\u0003\u0012!\u00026v]&$\u0018BA'G\u0005)\u0011UMZ8sK\u0016\u000b7\r\u001b\u0005\u0006\u001f\u0002!\t\u0001P\u0001\u001ai\u0016\u001cHOS8j]\u0012K7o\u001c:eKJ\u001c\u0005.\u00198hK2{w\r\u000b\u0002O#B\u0011QIU\u0005\u0003'\u001a\u0013A\u0002V3tiR+W\u000e\u001d7bi\u0016DQ!\u0016\u0001\u0005\u0002q\n\u0011\u0004^3tiNKgn\u001b#jg>\u0014H-\u001a:DQ\u0006tw-\u001a'pO\"\u0012A+\u0015\u0005\u00061\u0002!\t\u0001P\u0001\"i\u0016\u001cHoU5oW\u0012K7o\u001c:eKJ\u001c\u0005.\u00198hK2{wmV5uQJ\u000bgn\u001b\
u0015\u0003/FCQa\u0017\u0001\u0005\u0002q\na\b^3ti\u000eC\u0017M\\4fY><7k\\;sG\u0016<\u0016\u000e\u001e5O_:$U\r^3s[&t\u0017n\u001d;jG\u001a+hnY*j].<\u0016\u000e\u001e5ES\u001a4WM]3oiB[\u0007F\u0001.R\u0011\u0015q\u0006\u0001\"\u0001=\u0003Q!Xm\u001d;J]N,'\u000f\u001e)beR\u001cu\u000e\\;n]\"\u0012Q,\u0015\u0005\u0006C\u0002!\t\u0001P\u0001\u0018i\u0016\u001cHo\u0011:fCR,G+\u00192mK\u0006\u001b8+\u001a7fGRD#\u0001Y)\t\u000b\u0011\u0004A\u0011\u0001\u001f\u0002KQ,7\u000f^\"sK\u0006$X\rV1cY\u0016\f5oU3mK\u000e$x+\u001b;i_V$x\n\u001d;j_:\u001c\bFA2R\u0011\u00159\u0007\u0001\"\u0001=\u0003E!Xm\u001d;QCJ$\u0018.\u00197J]N,'\u000f\u001e\u0015\u0003MFCC\u0001\u00016qcB\u00111N\\\u0007\u0002Y*\u0011QNR\u0001\nKb$XM\\:j_:L!a\u001c7\u0003\u0015\u0015CH/\u001a8e/&$\b.A\u0003wC2,X\rL\u0001sG\u0005\u0019\bC\u0001;}\u001b\u0005)(B\u0001<x\u00035\u0001\u0018M]1nKR,'/\u001b>fI*\u0011\u00010_\u0001\u000bKb$XM\\:j_:\u001c(BA&{\u0015\tYH\"A\u0005uKN$X\u000f^5mg&\u0011Q0\u001e\u0002\u001b!\u0006\u0014\u0018-\\3uKJL'0\u001a3UKN$X\t\u001f;f]NLwN\u001c")
public class TableSinkITCase
extends StreamingWithStateTestBase {
    /**
     * Registers the source tables shared by the tests in this class.
     *
     * <p>After delegating to the superclass setup, this creates four {@code values}-connector
     * tables: {@code src} (four identical {@code ("jason", 1)} rows), {@code award}
     * (votes/prize pairs keyed by votes), {@code people} (a single person/age row keyed by
     * person) and {@code users}, which is fed from {@code TestData$.MODULE$.userChangelog()}
     * and declared with changelog mode {@code I,UA,UB,D}.
     */
    @Override
    @BeforeEach
    public void before() {
        super.before();

        // src: four distinct Row instances that all carry ("jason", 1).
        Row[] voteRows = new Row[4];
        for (int i = 0; i < voteRows.length; i++) {
            voteRows[i] = BatchTestBase$.MODULE$.row((Seq<Object>)Predef$.MODULE$.genericWrapArray((Object)new Object[]{"jason", BoxesRunTime.boxToLong(1L)}));
        }
        String srcDataId = TestValuesTableFactory.registerData((Seq<Row>)((Seq)Seq$.MODULE$.apply((Seq)Predef$.MODULE$.wrapRefArray((Object[])voteRows))));
        String createSrc = "\n                       |CREATE TABLE src (person String, votes BIGINT) WITH(\n                       |  'connector' = 'values',\n                       |  'data-id' = '" + srcDataId + "'\n                       |)\n                       |";
        this.tEnv().executeSql(new StringOps(Predef$.MODULE$.augmentString(createSrc)).stripMargin());

        // award: one row per vote count, pairing it with a prize.
        long[] awardVotes = {1L, 2L, 3L, 4L};
        double[] awardPrizes = {5.2, 12.1, 18.3, 22.5};
        Row[] awardRows = new Row[awardVotes.length];
        for (int i = 0; i < awardRows.length; i++) {
            awardRows[i] = BatchTestBase$.MODULE$.row((Seq<Object>)Predef$.MODULE$.genericWrapArray((Object)new Object[]{BoxesRunTime.boxToLong(awardVotes[i]), BoxesRunTime.boxToDouble(awardPrizes[i])}));
        }
        String awardDataId = TestValuesTableFactory.registerData((Seq<Row>)((Seq)Seq$.MODULE$.apply((Seq)Predef$.MODULE$.wrapRefArray((Object[])awardRows))));
        String createAward = "\n         |CREATE TABLE award (votes BIGINT, prize DOUBLE, PRIMARY KEY(votes) NOT ENFORCED) WITH(\n         |  'connector' = 'values',\n         |  'data-id' = '" + awardDataId + "'\n         |)\n         |";
        this.tEnv().executeSql(new StringOps(Predef$.MODULE$.augmentString(createAward)).stripMargin());

        // people: a single ("jason", 22) row.
        Row[] peopleRows = new Row[]{
            BatchTestBase$.MODULE$.row((Seq<Object>)Predef$.MODULE$.genericWrapArray((Object)new Object[]{"jason", BoxesRunTime.boxToInteger(22)}))
        };
        String peopleDataId = TestValuesTableFactory.registerData((Seq<Row>)((Seq)Seq$.MODULE$.apply((Seq)Predef$.MODULE$.wrapRefArray((Object[])peopleRows))));
        String createPeople = "\n         |CREATE TABLE people (person STRING, age INT, PRIMARY KEY(person) NOT ENFORCED) WITH(\n         |  'connector' = 'values',\n         |  'data-id' = '" + peopleDataId + "'\n         |)\n         |";
        this.tEnv().executeSql(new StringOps(Predef$.MODULE$.augmentString(createPeople)).stripMargin());

        // users: changelog data supplied by the shared TestData object.
        String userDataId = TestValuesTableFactory.registerData(TestData$.MODULE$.userChangelog());
        String createUsers = "\n                       |CREATE TABLE users (\n                       |  user_id STRING,\n                       |  user_name STRING,\n                       |  email STRING,\n                       |  balance DECIMAL(18,2),\n                       |  primary key (user_id) not enforced\n                       |) WITH (\n                       | 'connector' = 'values',\n                       | 'data-id' = '" + userDataId + "',\n                       | 'changelog-mode' = 'I,UA,UB,D'\n                       |)\n                       |";
        this.tEnv().executeSql(new StringOps(Predef$.MODULE$.augmentString(createUsers)).stripMargin());
    }

    @TestTemplate
    public void testJoinDisorderChangeLog() {
        this.tEnv().executeSql(new StringOps(Predef$.MODULE$.augmentString("\n                      |CREATE TABLE JoinDisorderChangeLog (\n                      |  person STRING, votes BIGINT, prize DOUBLE, age INT,\n                      |  PRIMARY KEY(person) NOT ENFORCED) WITH(\n                      |  'connector' = 'values',\n                      |  'sink-insert-only' = 'false'\n                      |)\n                      |")).stripMargin());
        this.tEnv().executeSql(new StringOps(Predef$.MODULE$.augmentString("\n                    |INSERT INTO JoinDisorderChangeLog\n                    |SELECT T1.person, T1.sum_votes, T1.prize, T2.age FROM\n                    | (SELECT T.person, T.sum_votes, award.prize FROM\n                    |   (SELECT person, SUM(votes) AS sum_votes FROM src GROUP BY person) T,\n                    |   award\n                    |   WHERE T.sum_votes = award.votes) T1, people T2\n                    | WHERE T1.person = T2.person\n                    |")).stripMargin()).await();
        java.util.List<String> result = TestValuesTableFactory.getResultsAsStrings("JoinDisorderChangeLog");
        .colon.colon expected = new .colon.colon((Object)"+I[jason, 4, 22.5, 22]", (List)Nil$.MODULE$);
        Assertions.assertThat((Object)JavaConversions$.MODULE$.deprecated$u0020asScalaBuffer(result).sorted((Ordering)Ordering.String$.MODULE$)).isEqualTo(expected.sorted((Ordering)Ordering.String$.MODULE$));
    }

    @TestTemplate
    public void testSinkDisorderChangeLog() {
        this.tEnv().executeSql(new StringOps(Predef$.MODULE$.augmentString("\n                      |CREATE TABLE SinkDisorderChangeLog (\n                      |  person STRING, votes BIGINT, prize DOUBLE,\n                      |  PRIMARY KEY(person) NOT ENFORCED) WITH(\n                      |  'connector' = 'values',\n                      |  'sink-insert-only' = 'false'\n                      |)\n                      |")).stripMargin());
        this.tEnv().executeSql(new StringOps(Predef$.MODULE$.augmentString("\n                    |INSERT INTO SinkDisorderChangeLog\n                    |SELECT T.person, T.sum_votes, award.prize FROM\n                    |   (SELECT person, SUM(votes) AS sum_votes FROM src GROUP BY person) T, award\n                    |   WHERE T.sum_votes = award.votes\n                    |")).stripMargin()).await();
        java.util.List<String> result = TestValuesTableFactory.getResultsAsStrings("SinkDisorderChangeLog");
        .colon.colon expected = new .colon.colon((Object)"+I[jason, 4, 22.5]", (List)Nil$.MODULE$);
        Assertions.assertThat((Object)JavaConversions$.MODULE$.deprecated$u0020asScalaBuffer(result).sorted((Ordering)Ordering.String$.MODULE$)).isEqualTo(expected.sorted((Ordering)Ordering.String$.MODULE$));
    }

    @TestTemplate
    public void testSinkDisorderChangeLogWithRank() {
        this.tEnv().executeSql(new StringOps(Predef$.MODULE$.augmentString("\n                      |CREATE TABLE SinkRankChangeLog (\n                      |  person STRING, votes BIGINT,\n                      |  PRIMARY KEY(person) NOT ENFORCED) WITH(\n                      |  'connector' = 'values',\n                      |  'sink-insert-only' = 'false'\n                      |)\n                      |")).stripMargin());
        this.tEnv().executeSql(new StringOps(Predef$.MODULE$.augmentString("\n          |INSERT INTO SinkRankChangeLog\n          |SELECT person, sum_votes FROM\n          | (SELECT person, sum_votes,\n          |   ROW_NUMBER() OVER (PARTITION BY vote_section ORDER BY sum_votes DESC) AS rank_number\n          |   FROM (SELECT person, SUM(votes) AS sum_votes, SUM(votes) / 2 AS vote_section FROM src\n          |      GROUP BY person))\n          |   WHERE rank_number < 10\n          |")).stripMargin()).await();
        java.util.List<String> result = TestValuesTableFactory.getResultsAsStrings("SinkRankChangeLog");
        .colon.colon expected = new .colon.colon((Object)"+I[jason, 4]", (List)Nil$.MODULE$);
        Assertions.assertThat((Object)JavaConversions$.MODULE$.deprecated$u0020asScalaBuffer(result).sorted((Ordering)Ordering.String$.MODULE$)).isEqualTo(expected.sorted((Ordering)Ordering.String$.MODULE$));
    }

    @TestTemplate
    public void testChangelogSourceWithNonDeterministicFuncSinkWithDifferentPk() {
        this.tEnv().createTemporaryFunction("ndFunc", (UserDefinedFunction)new TestNonDeterministicUdf());
        this.tEnv().executeSql(new StringOps(Predef$.MODULE$.augmentString("\n                      |CREATE TABLE sink_with_pk (\n                      |  user_id STRING,\n                      |  user_name STRING,\n                      |  email STRING,\n                      |  balance DECIMAL(18,2),\n                      |  PRIMARY KEY(email) NOT ENFORCED\n                      |) WITH(\n                      |  'connector' = 'values',\n                      |  'sink-insert-only' = 'false'\n                      |)\n                      |")).stripMargin());
        this.tEnv().executeSql(new StringOps(Predef$.MODULE$.augmentString("\n                     |insert into sink_with_pk\n                     |select user_id, SPLIT_INDEX(ndFunc(user_name), '-', 0), email, balance\n                     |from users\n                     |")).stripMargin()).await();
        java.util.List<String> result = TestValuesTableFactory.getResultsAsStrings("sink_with_pk");
        .colon.colon expected = new .colon.colon((Object)"+I[user1, Tom, tom123@gmail.com, 8.10]", (List)new .colon.colon((Object)"+I[user3, Bailey, bailey@qq.com, 9.99]", (List)new .colon.colon((Object)"+I[user4, Tina, tina@gmail.com, 11.30]", (List)Nil$.MODULE$)));
        Assertions.assertThat((Object)JavaConversions$.MODULE$.deprecated$u0020asScalaBuffer(result).sorted((Ordering)Ordering.String$.MODULE$)).isEqualTo(expected.sorted((Ordering)Ordering.String$.MODULE$));
        java.util.List<String> rawResult = TestValuesTableFactory.getRawResultsAsStrings("sink_with_pk");
        List expectedRaw = List$.MODULE$.apply((Seq)Predef$.MODULE$.wrapRefArray((Object[])new String[]{"+I[user1, Tom, tom@gmail.com, 10.02]", "+I[user2, Jack, jack@hotmail.com, 71.20]", "-D[user1, Tom, tom@gmail.com, 10.02]", "+I[user1, Tom, tom123@gmail.com, 8.10]", "+I[user3, Bailey, bailey@gmail.com, 9.99]", "-D[user2, Jack, jack@hotmail.com, 71.20]", "+I[user4, Tina, tina@gmail.com, 11.30]", "-D[user3, Bailey, bailey@gmail.com, 9.99]", "+I[user3, Bailey, bailey@qq.com, 9.99]"}));
        Assertions.assertThat((Object)JavaConversions$.MODULE$.deprecated$u0020asScalaBuffer(rawResult).toList()).isEqualTo((Object)expectedRaw);
    }

    @TestTemplate
    public void testInsertPartColumn() {
        this.tEnv().executeSql(new StringOps(Predef$.MODULE$.augmentString("\n                      |CREATE TABLE zm_test (\n                      |  `person` String,\n                      |  `votes` BIGINT,\n                      |  `m1` MAP<STRING, BIGINT>,\n                      |  `m2` MAP<STRING NOT NULL, BIGINT>,\n                      |  `m3` MAP<STRING, BIGINT NOT NULL>,\n                      |  `m4` MAP<STRING NOT NULL, BIGINT NOT NULL>\n                      |) WITH (\n                      |  'connector' = 'values',\n                      |  'sink-insert-only' = 'true'\n                      |)\n                      |")).stripMargin());
        this.tEnv().executeSql(new StringOps(Predef$.MODULE$.augmentString("\n                    |insert into zm_test(`person`, `votes`)\n                    |  select\n                    |    `person`,\n                    |    `votes`\n                    |  from\n                    |    src\n                    |")).stripMargin()).await();
        java.util.List<String> result = TestValuesTableFactory.getResultsAsStrings("zm_test");
        .colon.colon expected = new .colon.colon((Object)"+I[jason, 1, null, null, null, null]", (List)new .colon.colon((Object)"+I[jason, 1, null, null, null, null]", (List)new .colon.colon((Object)"+I[jason, 1, null, null, null, null]", (List)new .colon.colon((Object)"+I[jason, 1, null, null, null, null]", (List)Nil$.MODULE$))));
        Assertions.assertThat((Object)JavaConversions$.MODULE$.deprecated$u0020asScalaBuffer(result).sorted((Ordering)Ordering.String$.MODULE$)).isEqualTo(expected.sorted((Ordering)Ordering.String$.MODULE$));
    }

    @TestTemplate
    public void testCreateTableAsSelect() {
        this.tEnv().executeSql(new StringOps(Predef$.MODULE$.augmentString("\n                    |CREATE TABLE MyCtasTable\n                    | WITH (\n                    |   'connector' = 'values',\n                    |   'sink-insert-only' = 'true'\n                    |) AS\n                    |  SELECT\n                    |    `person`,\n                    |    `votes`\n                    |  FROM\n                    |    src\n                    |")).stripMargin()).await();
        java.util.List<String> actual = TestValuesTableFactory.getResultsAsStrings("MyCtasTable");
        .colon.colon expected = new .colon.colon((Object)"+I[jason, 1]", (List)new .colon.colon((Object)"+I[jason, 1]", (List)new .colon.colon((Object)"+I[jason, 1]", (List)new .colon.colon((Object)"+I[jason, 1]", (List)Nil$.MODULE$))));
        Assertions.assertThat((Object)JavaConversions$.MODULE$.deprecated$u0020asScalaBuffer(actual).sorted((Ordering)Ordering.String$.MODULE$)).isEqualTo(expected.sorted((Ordering)Ordering.String$.MODULE$));
        StreamStatementSet statementSet = this.tEnv().createStatementSet();
        statementSet.addInsertSql(new StringOps(Predef$.MODULE$.augmentString("\n                                |CREATE TABLE MyCtasTableUseStatement\n                                | WITH (\n                                |   'connector' = 'values',\n                                |   'sink-insert-only' = 'true'\n                                |) AS\n                                |  SELECT\n                                |    `person`,\n                                |    `votes`\n                                |  FROM\n                                |    src\n                                |")).stripMargin());
        statementSet.execute().await();
        java.util.List<String> actualUseStatement = TestValuesTableFactory.getResultsAsStrings("MyCtasTableUseStatement");
        Assertions.assertThat((Object)JavaConversions$.MODULE$.deprecated$u0020asScalaBuffer(actualUseStatement).sorted((Ordering)Ordering.String$.MODULE$)).isEqualTo(expected.sorted((Ordering)Ordering.String$.MODULE$));
    }

    /**
     * Checks that a {@code CREATE TABLE ... AS SELECT} statement without a {@code WITH}
     * clause fails with the managed-table checkpointing error message asserted below.
     */
    @TestTemplate
    public void testCreateTableAsSelectWithoutOptions() {
        final String ctasWithoutOptions = "CREATE TABLE MyCtasTable AS SELECT `person`, `votes` FROM src";
        final String expectedMessage = "You should enable the checkpointing for sinking to managed table 'default_catalog.default_database.MyCtasTable', managed table relies on checkpoint to commit and the data is visible only after commit.";

        Assertions.assertThatThrownBy(() -> this.tEnv().executeSql(ctasWithoutOptions))
            .hasMessage(expectedMessage);
    }

    @TestTemplate
    public void testPartialInsert() {
        String srcDataId = TestValuesTableFactory.registerData((Seq<Row>)((Seq)Seq$.MODULE$.apply((Seq)Predef$.MODULE$.wrapRefArray((Object[])new Row[]{BatchTestBase$.MODULE$.row((Seq<Object>)Predef$.MODULE$.genericWrapArray((Object)new Object[]{BoxesRunTime.boxToLong((long)1L), "jason", BoxesRunTime.boxToLong((long)3L), "X", BoxesRunTime.boxToInteger((int)43)})), BatchTestBase$.MODULE$.row((Seq<Object>)Predef$.MODULE$.genericWrapArray((Object)new Object[]{BoxesRunTime.boxToLong((long)2L), "andy", BoxesRunTime.boxToLong((long)2L), "Y", BoxesRunTime.boxToInteger((int)32)})), BatchTestBase$.MODULE$.row((Seq<Object>)Predef$.MODULE$.genericWrapArray((Object)new Object[]{BoxesRunTime.boxToLong((long)3L), "clark", BoxesRunTime.boxToLong((long)1L), "Z", BoxesRunTime.boxToInteger((int)29)}))}))));
        this.tEnv().executeSql(new StringOps(Predef$.MODULE$.augmentString(new StringBuilder(416).append("\n                       |CREATE TABLE test_source (\n                       |  id bigint,\n                       |  person String,\n                       |  votes bigint,\n                       |  city String,\n                       |  age int)\n                       |WITH (\n                       |  'connector' = 'values',\n                       |  'data-id' = '").append(srcDataId).append("'\n                       |)\n                       |").toString())).stripMargin());
        this.tEnv().executeSql(new StringOps(Predef$.MODULE$.augmentString("\n                      |CREATE TABLE test_sink (\n                      |  id bigint,\n                      |  person String,\n                      |  votes bigint,\n                      |  city String,\n                      |  age int,\n                      |  primary key(id) not enforced\n                      |) WITH (\n                      |  'connector' = 'values',\n                      |  'sink-insert-only' = 'false'\n                      |)\n                      |")).stripMargin());
        this.tEnv().executeSql(new StringOps(Predef$.MODULE$.augmentString("\n                    |insert into test_sink (id, person, votes)\n                    |  select\n                    |    id,\n                    |    person,\n                    |    votes\n                    |  from\n                    |    test_source\n                    |")).stripMargin()).await();
        java.util.List<String> result = TestValuesTableFactory.getResultsAsStrings("test_sink");
        .colon.colon expected = new .colon.colon((Object)"+I[1, jason, 3, null, null]", (List)new .colon.colon((Object)"+I[2, andy, 2, null, null]", (List)new .colon.colon((Object)"+I[3, clark, 1, null, null]", (List)Nil$.MODULE$)));
        Assertions.assertThat((Object)JavaConversions$.MODULE$.deprecated$u0020asScalaBuffer(result).sorted((Ordering)Ordering.String$.MODULE$)).isEqualTo(expected.sorted((Ordering)Ordering.String$.MODULE$));
        this.tEnv().executeSql(new StringOps(Predef$.MODULE$.augmentString("\n                    |insert into test_sink (id, city, age)\n                    |  select\n                    |    id,\n                    |    city,\n                    |    age \n                    |  from\n                    |    test_source\n                    |")).stripMargin()).await();
        java.util.List<String> result2 = TestValuesTableFactory.getResultsAsStrings("test_sink");
        .colon.colon expected2 = new .colon.colon((Object)"+I[1, jason, 3, X, 43]", (List)new .colon.colon((Object)"+I[2, andy, 2, Y, 32]", (List)new .colon.colon((Object)"+I[3, clark, 1, Z, 29]", (List)Nil$.MODULE$)));
        Assertions.assertThat((Object)JavaConversions$.MODULE$.deprecated$u0020asScalaBuffer(result2).sorted((Ordering)Ordering.String$.MODULE$)).isEqualTo(expected2.sorted((Ordering)Ordering.String$.MODULE$));
    }

    /**
     * Creates the test case for one state backend mode.
     *
     * @param mode state backend mode, passed unchanged to the superclass constructor
     */
    public TableSinkITCase(final StreamingWithStateTestBase.StateBackendMode mode) {
        super(mode);
    }
}

