/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.table.planner.runtime.stream.sql;

import java.util.concurrent.atomic.AtomicInteger;
import org.apache.flink.api.java.typeutils.TypeExtractor;
import org.apache.flink.core.testutils.CustomExtension;
import org.apache.flink.core.testutils.EachCallbackWrapper;
import org.apache.flink.core.testutils.FlinkAssertions;
import org.apache.flink.streaming.api.functions.sink.SinkFunction;
import org.apache.flink.streaming.api.scala.DataStream;
import org.apache.flink.table.api.TableException;
import org.apache.flink.table.api.bridge.scala.package$;
import org.apache.flink.table.planner.factories.TestValuesTableFactory;
import org.apache.flink.table.planner.runtime.utils.StreamingTestBase;
import org.apache.flink.table.planner.runtime.utils.TestData$;
import org.apache.flink.table.planner.runtime.utils.TestingAppendSink;
import org.apache.flink.table.planner.runtime.utils.TestingRetractSink;
import org.apache.flink.table.planner.utils.TestLegacyFilterableTableSource$;
import org.apache.flink.table.runtime.functions.scalar.SourceWatermarkFunction;
import org.apache.flink.table.utils.LegacyRowExtension;
import org.apache.flink.types.Row;
import org.assertj.core.api.Assertions;
import org.assertj.core.api.ThrowingConsumer;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.RegisterExtension;
import scala.Predef$;
import scala.collection.Seq;
import scala.collection.Seq$;
import scala.collection.TraversableOnce;
import scala.collection.immutable.StringOps;
import scala.math.Ordering;
import scala.reflect.ScalaSignature;

@ScalaSignature(bytes="\u0006\u0001\u0005\u0015c\u0001B\u0001\u0003\u0001M\u0011\u0011\u0003V1cY\u0016\u001cv.\u001e:dK&#6)Y:f\u0015\t\u0019A!A\u0002tc2T!!\u0002\u0004\u0002\rM$(/Z1n\u0015\t9\u0001\"A\u0004sk:$\u0018.\\3\u000b\u0005%Q\u0011a\u00029mC:tWM\u001d\u0006\u0003\u00171\tQ\u0001^1cY\u0016T!!\u0004\b\u0002\u000b\u0019d\u0017N\\6\u000b\u0005=\u0001\u0012AB1qC\u000eDWMC\u0001\u0012\u0003\ry'oZ\u0002\u0001'\t\u0001A\u0003\u0005\u0002\u001615\taC\u0003\u0002\u0018\r\u0005)Q\u000f^5mg&\u0011\u0011D\u0006\u0002\u0012'R\u0014X-Y7j]\u001e$Vm\u001d;CCN,\u0007\"B\u000e\u0001\t\u0003a\u0012A\u0002\u001fj]&$h\bF\u0001\u001e!\tq\u0002!D\u0001\u0003\u0011\u001d\u0001\u0003A1A\u0005\n\u0005\n\u0011aX\u000b\u0002EA\u00191\u0005\u000b\u0016\u000e\u0003\u0011R!!\n\u0014\u0002\u0013Q,7\u000f^;uS2\u001c(BA\u0014\r\u0003\u0011\u0019wN]3\n\u0005%\"#aE#bG\"\u001c\u0015\r\u001c7cC\u000e\\wK]1qa\u0016\u0014\bCA\u0016.\u001b\u0005a#BA\f\u000b\u0013\tqCF\u0001\nMK\u001e\f7-\u001f*po\u0016CH/\u001a8tS>t\u0007B\u0002\u0019\u0001A\u0003%!%\u0001\u0002`A!\u0012qF\r\t\u0003gqj\u0011\u0001\u000e\u0006\u0003kY\n\u0011\"\u001a=uK:\u001c\u0018n\u001c8\u000b\u0005]B\u0014aA1qS*\u0011\u0011HO\u0001\bUV\u0004\u0018\u000e^3s\u0015\tY\u0004#A\u0003kk:LG/\u0003\u0002>i\t\t\"+Z4jgR,'/\u0012=uK:\u001c\u0018n\u001c8\t\u000b}\u0002A\u0011\t!\u0002\r\t,gm\u001c:f)\u0005\t\u0005C\u0001\"F\u001b\u0005\u0019%\"\u0001#\u0002\u000bM\u001c\u0017\r\\1\n\u0005\u0019\u001b%\u0001B+oSRD#A\u0010%\u0011\u0005%SU\"\u0001\u001c\n\u0005-3$A\u0003\"fM>\u0014X-R1dQ\")Q\n\u0001C\u0001\u0001\u0006\tB/Z:u'&l\u0007\u000f\\3Qe>TWm\u0019;)\u00051{\u0005CA%Q\u0013\t\tfG\u0001\u0003UKN$\b\"B*\u0001\t\u0003\u0001\u0015A\u0007;fgR\u0004&o\u001c6fGR<\u0016\u000e\u001e5pkRLe\u000e];u%\u00164\u0007F\u0001*P\u0011\u00151\u0006\u0001\"\u0001A\u0003E!Xm\u001d;OKN$X\r\u001a)s_*,7\r\u001e\u0015\u0003+>CQ!\u0017\u0001\u0005\u0002\u0001\u000b\u0011\u0004^3ti:+7\u000f^3e!J|'.Z2u/&$\b.\u0013;f[\"\u0012\u0001l\u0014\u0005\u00069\u0002!\t\u0001Q\u0001\u001ei\u0016\u001cH\u000fV1cY\u0016\u001cv.\u001e:dK^KG\u000f\u001b$jYR,'/\u00192mK\"\u00121l\u0014\u0005\u0006?\u0002!\t\u0001Q\u0001&i\u0016\u001cH\u000fV1cY\u0016\u001cv.\u001e:dK^KG\u000f\u001b$v]\u000e$\u0018n\u001c8GS2$XM]1cY\u0016D#AX(\t\u000b\t\u0004A\u0011\u0001!\u0002+Q,7\u000f^%oaV$hi\u001c:nCR\u001cv.\u001e:dK\"\u0012\u0011m\u0014\u0005\u0006K\u0002!\t\u0001Q\u0001\u0011i\u0016\u001cH/\u00117m\t\u0006$\u0018\rV=qKND#\u0001Z(\t\u000b!\u0004A\u0011\u0001!\u00021Q,7\u000f^*j[BdW-T3uC\u0012\fG/Y!dG\u0016\u001c8\u000f\u000b\u0002h\u001f\")1\u000e\u0001C\u0001\u0001\u0006IB/Z:u\u0007>l\u0007\u000f\\3y\u001b\u0016$\u0018\rZ1uC\u0006\u001b7-Z:tQ\tQw\nC\u0003o\u0001\u0011\u0005\u0001)\u0001\u0011uKN$H)\u001e9mS\u000e\fG/Z'fi\u0006$\u0017\r^1Ge>l7+Y7f\u0017\u0016L\bFA7P\u0011\u0015\t\b\u0001\"\u0001A\u0003\u0019\"Xm\u001d;OKN$X\r\u001a)s_*,7\r^5p]^KG\u000f['fi\u0006$\u0017\r^1BG\u000e,7o\u001d\u0015\u0003a>CQ\u0001\u001e\u0001\u0005\u0002\u0001\u000b\u0001\u0004^3tiN{WO]2f/\u0006$XM]7be.Le\u000e\u0012#MQ\t\u0019x\nC\u0003x\u0001\u0011\u0005\u0001)\u0001\u000euKN$8k\\;sG\u0016<\u0016\r^3s[\u0006\u00148.\u00138Rk\u0016\u0014\u0018\u0010\u000b\u0002w\u001f\")!\u0010\u0001C\u0001\u0001\u00061B/Z:u'&l\u0007\u000f\\3OKN$X\r\u001a$jYR,'\u000f\u000b\u0002z\u001f\")Q\u0010\u0001C\u0001\u0001\u00069B/Z:u\u001d\u0016\u001cH/\u001a3GS2$XM](o\u0003J\u0014\u0018-\u001f\u0015\u0003y>Ca!!\u0001\u0001\t\u0003\u0001\u0015!\u0006;fgRtUm\u001d;fI\u001aKG\u000e^3s\u001f:l\u0015\r\u001d\u0015\u0003\u007f>Cq!a\u0002\u0001\t\u0013\tI!A\fj]:,'\u000fV3tiN+G\u000fU1sC2dW\r\\5t[R9\u0011)a\u0003\u0002&\u0005=\u0002\u0002CA\u0007\u0003\u000b\u0001\r!a\u0004\u0002\u0011A\u0014xN^5eKJ\u0004B!!\u0005\u0002 9!\u00111CA\u000e!\r\t)bQ\u0007\u0003\u0003/Q1!!\u0007\u0013\u0003\u0019a$o\\8u}%\u0019\u0011QD\"\u0002\rA\u0013X\rZ3g\u0013\u0011\t\t#a\t\u0003\rM#(/\u001b8h\u0015\r\tib\u0011\u0005\t\u0003O\t)\u00011\u0001\u0002*\u0005Y\u0001/\u0019:bY2,G.[:n!\r\u0011\u00151F\u0005\u0004\u0003[\u0019%aA%oi\"A\u0011\u0011GA\u0003\u0001\u0004\tI#A\u0003j]\u0012,\u0007\u0010\u0003\u0004\u00026\u0001!\t\u0001Q\u0001\"i\u0016\u001cH\u000fU1sC2dW\r\\5t[^KG\u000f[*pkJ\u001cWMR;oGRLwN\u001c\u0015\u0004\u0003gy\u0005BBA\u001e\u0001\u0011\u0005\u0001)\u0001\u0010uKN$\b+\u0019:bY2,G.[:n/&$\b.\u00138qkR4uN]7bi\"\u001a\u0011\u0011H(\t\r\u0005\u0005\u0003\u0001\"\u0001A\u0003u!Xm\u001d;QCJ\fG\u000e\\3mSNlw+\u001b;i\t\u0006$\u0018m\u0015;sK\u0006l\u0007fAA \u001f\u0002")
public class TableSourceITCase
extends StreamingTestBase {
    @RegisterExtension
    private final EachCallbackWrapper<LegacyRowExtension> _ = new EachCallbackWrapper((CustomExtension)new LegacyRowExtension());

    private EachCallbackWrapper<LegacyRowExtension> _() {
        return this._;
    }

    @Override
    @BeforeEach
    public void before() {
        super.before();
        String myTableDataId = TestValuesTableFactory.registerData(TestData$.MODULE$.smallData3());
        this.tEnv().executeSql(new StringOps(Predef$.MODULE$.augmentString(new StringBuilder(379).append("\n                       |CREATE TABLE MyTable (\n                       |  `a` INT,\n                       |  `b` BIGINT,\n                       |  `c` STRING\n                       |) WITH (\n                       |  'connector' = 'values',\n                       |  'data-id' = '").append(myTableDataId).append("',\n                       |  'bounded' = 'false'\n                       |)\n                       |").toString())).stripMargin());
        String filterableTableDataId = TestValuesTableFactory.registerData(TestLegacyFilterableTableSource$.MODULE$.defaultRows());
        this.tEnv().executeSql(new StringOps(Predef$.MODULE$.augmentString(new StringBuilder(488).append("\n                       |CREATE TABLE FilterableTable (\n                       |  name STRING,\n                       |  id BIGINT,\n                       |  amount INT,\n                       |  price DOUBLE\n                       |) WITH (\n                       |  'connector' = 'values',\n                       |  'filterable-fields' = 'amount',\n                       |  'data-id' = '").append(filterableTableDataId).append("',\n                       |  'bounded' = 'false'\n                       |)\n                       |").toString())).stripMargin());
        String metadataTableDataId = TestValuesTableFactory.registerData(TestData$.MODULE$.smallData5());
        this.tEnv().executeSql(new StringOps(Predef$.MODULE$.augmentString(new StringBuilder(564).append("\n         |CREATE TABLE MetadataTable (\n         |  `a` INT,\n         |  `other_metadata` INT METADATA FROM 'metadata_3',\n         |  `other_metadata2` AS CAST(`other_metadata` AS BIGINT),\n         |  `b` BIGINT,\n         |  `metadata_1` INT METADATA,\n         |  `computed` AS `metadata_1` * 2,\n         |  `metadata_2` STRING METADATA\n         |) WITH (\n         |  'connector' = 'values',\n         |  'data-id' = '").append(metadataTableDataId).append("',\n         |  'bounded' = 'false',\n         |  'readable-metadata' = 'metadata_1:INT, metadata_2:STRING, metadata_3:BIGINT'\n         |)\n         |").toString())).stripMargin());
        String nestedTableDataId = TestValuesTableFactory.registerData(TestData$.MODULE$.deepNestedRow());
        this.tEnv().executeSql(new StringOps(Predef$.MODULE$.augmentString(new StringBuilder(684).append("\n         |CREATE TABLE NestedTable (\n         |  id BIGINT,\n         |  deepNested ROW<\n         |     nested1 ROW<name STRING, `value.` INT>,\n         |     `nested2.` ROW<num INT, flag BOOLEAN>>,\n         |  nested ROW<name STRING, `value` INT>,\n         |  name STRING,\n         |  nestedItem ROW<deepArray ROW<`value` INT> ARRAY, deepMap MAP<STRING, INT>>,\n         |  lower_name AS LOWER(name)\n         |) WITH (\n         |  'connector' = 'values',\n         |  'nested-projection-supported' = 'true',\n         |  'filterable-fields' = '`nested.value`;`nestedItem.deepMap`;`nestedItem.deepArray`',\n         |  'data-id' = '").append(nestedTableDataId).append("',\n         |  'bounded' = 'true'\n         |)\n         |").toString())).stripMargin());
    }

    @Test
    public void testSimpleProject() {
        DataStream result = package$.MODULE$.tableConversions(this.tEnv().sqlQuery("SELECT a, c FROM MyTable")).toDataStream();
        TestingAppendSink sink = new TestingAppendSink();
        result.addSink((SinkFunction)sink);
        this.env().execute();
        Seq expected = (Seq)Seq$.MODULE$.apply((Seq)Predef$.MODULE$.wrapRefArray((Object[])new String[]{"1,Hi", "2,Hello", "3,Hello world"}));
        Assertions.assertThat((Object)sink.getAppendResults().sorted((Ordering)Ordering.String$.MODULE$)).isEqualTo(expected.sorted((Ordering)Ordering.String$.MODULE$));
    }

    @Test
    public void testProjectWithoutInputRef() {
        DataStream result = package$.MODULE$.tableConversions(this.tEnv().sqlQuery("SELECT COUNT(*) FROM MyTable")).toRetractStream(TypeExtractor.createTypeInfo(Row.class));
        TestingRetractSink sink = new TestingRetractSink();
        result.addSink((SinkFunction)sink).setParallelism(result.parallelism());
        this.env().execute();
        Assertions.assertThat((Object)sink.getRetractResults().sorted((Ordering)Ordering.String$.MODULE$)).isEqualTo((Object)Seq$.MODULE$.apply((Seq)Predef$.MODULE$.wrapRefArray((Object[])new String[]{"3"})));
    }

    @Test
    public void testNestedProject() {
        String query = new StringOps(Predef$.MODULE$.augmentString("\n        |SELECT id,\n        |    deepNested.nested1.name AS nestedName,\n        |    nested.`value` AS nestedValue,\n        |    deepNested.`nested2.`.flag AS nestedFlag,\n        |    deepNested.`nested2.`.num + deepNested.nested1.`value.` AS nestedNum,\n        |    lower_name\n        |FROM NestedTable\n      ")).stripMargin();
        DataStream result = package$.MODULE$.tableConversions(this.tEnv().sqlQuery(query)).toDataStream();
        TestingAppendSink sink = new TestingAppendSink();
        result.addSink((SinkFunction)sink);
        this.env().execute();
        Seq expected = (Seq)Seq$.MODULE$.apply((Seq)Predef$.MODULE$.wrapRefArray((Object[])new String[]{"1,Sarah,10000,true,1100,mary", "2,Rob,20000,false,2200,bob", "3,Mike,30000,true,3300,liz"}));
        Assertions.assertThat((Object)sink.getAppendResults().sorted((Ordering)Ordering.String$.MODULE$)).isEqualTo(expected.sorted((Ordering)Ordering.String$.MODULE$));
    }

    @Test
    public void testNestedProjectWithItem() {
        String query = new StringOps(Predef$.MODULE$.augmentString("\n        |SELECT nestedItem.deepArray[nestedItem.deepMap['Monday']] FROM  NestedTable\n        |")).stripMargin();
        DataStream result = package$.MODULE$.tableConversions(this.tEnv().sqlQuery(query)).toDataStream();
        TestingAppendSink sink = new TestingAppendSink();
        result.addSink((SinkFunction)sink);
        this.env().execute();
        Seq expected = (Seq)Seq$.MODULE$.apply((Seq)Predef$.MODULE$.wrapRefArray((Object[])new String[]{"1", "1", "1"}));
        Assertions.assertThat((Object)sink.getAppendResults().sorted((Ordering)Ordering.String$.MODULE$)).isEqualTo(expected.sorted((Ordering)Ordering.String$.MODULE$));
    }

    @Test
    public void testTableSourceWithFilterable() {
        String query = "SELECT id, amount, name FROM FilterableTable WHERE amount > 4 AND price < 9";
        DataStream result = package$.MODULE$.tableConversions(this.tEnv().sqlQuery(query)).toDataStream();
        TestingAppendSink sink = new TestingAppendSink();
        result.addSink((SinkFunction)sink);
        this.env().execute();
        Seq expected = (Seq)Seq$.MODULE$.apply((Seq)Predef$.MODULE$.wrapRefArray((Object[])new String[]{"5,5,Record_5", "6,6,Record_6", "7,7,Record_7", "8,8,Record_8"}));
        Assertions.assertThat((Object)sink.getAppendResults().sorted((Ordering)Ordering.String$.MODULE$)).isEqualTo(expected.sorted((Ordering)Ordering.String$.MODULE$));
    }

    @Test
    public void testTableSourceWithFunctionFilterable() {
        String query = "SELECT id, amount, name FROM FilterableTable WHERE amount > 4 AND price < 9 AND upper(name) = 'RECORD_5'";
        DataStream result = package$.MODULE$.tableConversions(this.tEnv().sqlQuery(query)).toDataStream();
        TestingAppendSink sink = new TestingAppendSink();
        result.addSink((SinkFunction)sink);
        this.env().execute();
        Seq expected = (Seq)Seq$.MODULE$.apply((Seq)Predef$.MODULE$.wrapRefArray((Object[])new String[]{"5,5,Record_5"}));
        Assertions.assertThat((Object)sink.getAppendResults().sorted((Ordering)Ordering.String$.MODULE$)).isEqualTo(expected.sorted((Ordering)Ordering.String$.MODULE$));
    }

    @Test
    public void testInputFormatSource() {
        String dataId = TestValuesTableFactory.registerData(TestData$.MODULE$.smallData3());
        this.tEnv().executeSql(new StringOps(Predef$.MODULE$.augmentString(new StringBuilder(263).append("\n         |CREATE TABLE MyInputFormatTable (\n         |  `a` INT,\n         |  `b` BIGINT,\n         |  `c` STRING\n         |) WITH (\n         |  'connector' = 'values',\n         |  'data-id' = '").append(dataId).append("',\n         |  'runtime-source' = 'InputFormat'\n         |)\n         |").toString())).stripMargin());
        DataStream result = package$.MODULE$.tableConversions(this.tEnv().sqlQuery("SELECT a, c FROM MyInputFormatTable")).toDataStream();
        TestingAppendSink sink = new TestingAppendSink();
        result.addSink((SinkFunction)sink);
        this.env().execute();
        Seq expected = (Seq)Seq$.MODULE$.apply((Seq)Predef$.MODULE$.wrapRefArray((Object[])new String[]{"1,Hi", "2,Hello", "3,Hello world"}));
        Assertions.assertThat((Object)sink.getAppendResults().sorted((Ordering)Ordering.String$.MODULE$)).isEqualTo(expected.sorted((Ordering)Ordering.String$.MODULE$));
    }

    @Test
    public void testAllDataTypes() {
        String dataId = TestValuesTableFactory.registerData(TestData$.MODULE$.fullDataTypesData());
        this.tEnv().executeSql(new StringOps(Predef$.MODULE$.augmentString(new StringBuilder(629).append("\n         |CREATE TABLE T (\n         |  `a` BOOLEAN,\n         |  `b` TINYINT,\n         |  `c` SMALLINT,\n         |  `d` INT,\n         |  `e` BIGINT,\n         |  `f` FLOAT,\n         |  `g` DOUBLE,\n         |  `h` DECIMAL(5, 2),\n         |  `x` DECIMAL(30, 10),\n         |  `i` VARCHAR(5),\n         |  `j` CHAR(5),\n         |  `k` DATE,\n         |  `l` TIME(0),\n         |  `m` TIMESTAMP(9),\n         |  `n` TIMESTAMP(9) WITH LOCAL TIME ZONE,\n         |  `o` ARRAY<BIGINT>,\n         |  `p` ROW<f1 BIGINT, f2 STRING, f3 DOUBLE>\n         |) WITH (\n         |  'connector' = 'values',\n         |  'data-id' = '").append(dataId).append("'\n         |)\n         |").toString())).stripMargin());
        DataStream result = package$.MODULE$.tableConversions(this.tEnv().sqlQuery("SELECT * FROM T")).toDataStream();
        TestingAppendSink sink = new TestingAppendSink();
        result.addSink((SinkFunction)sink);
        this.env().execute();
        Seq expected = (Seq)Seq$.MODULE$.apply((Seq)Predef$.MODULE$.wrapRefArray((Object[])new String[]{"true,127,32767,2147483647,9223372036854775807,-1.123,-1.123,5.10,1234567891012345.1000000000,1,1,1969-01-01,00:00:00.123,1969-01-01T00:00:00.123456789,1969-01-01T00:00:00.123456789Z,[1, 2, 3],1,a,2.3", "false,-128,-32768,-2147483648,-9223372036854775808,3.4,3.4,6.10,61234567891012345.1000000000,12,12,1970-09-30,01:01:01.123,1970-09-30T01:01:01.123456,1970-09-30T01:01:01.123456Z,[4, 5],null,b,4.56", "true,0,0,0,0,0.12,0.12,7.10,71234567891012345.1000000000,123,123,1990-12-24,08:10:24.123,1990-12-24T08:10:24.123,1990-12-24T08:10:24.123Z,[6, null, 7],3,null,7.86", "false,5,4,123,1234,1.2345,1.2345,8.12,812345678910123451.0123456789,1234,1234,2020-05-01,23:23:23,2020-05-01T23:23:23,2020-05-01T23:23:23Z,[8],4,c,null", "null,null,null,null,null,null,null,null,null,null,null,null,null,null,null,null,null"}));
        Assertions.assertThat((String)((TraversableOnce)sink.getAppendResults().sorted((Ordering)Ordering.String$.MODULE$)).mkString("\n")).isEqualTo(((TraversableOnce)expected.sorted((Ordering)Ordering.String$.MODULE$)).mkString("\n"));
    }

    @Test
    public void testSimpleMetadataAccess() {
        DataStream result = package$.MODULE$.tableConversions(this.tEnv().sqlQuery("SELECT `a`, `b`, `metadata_2` FROM MetadataTable")).toDataStream();
        TestingAppendSink sink = new TestingAppendSink();
        result.addSink((SinkFunction)sink);
        this.env().execute();
        Seq expected = (Seq)Seq$.MODULE$.apply((Seq)Predef$.MODULE$.wrapRefArray((Object[])new String[]{"1,1,Hallo", "2,2,Hallo Welt", "2,3,Hallo Welt wie"}));
        Assertions.assertThat((Object)sink.getAppendResults().sorted((Ordering)Ordering.String$.MODULE$)).isEqualTo(expected.sorted((Ordering)Ordering.String$.MODULE$));
    }

    @Test
    public void testComplexMetadataAccess() {
        DataStream result = package$.MODULE$.tableConversions(this.tEnv().sqlQuery("SELECT `a`, `other_metadata`, `b`, `metadata_2`, `computed` FROM MetadataTable")).toDataStream();
        TestingAppendSink sink = new TestingAppendSink();
        result.addSink((SinkFunction)sink);
        this.env().execute();
        Seq expected = (Seq)Seq$.MODULE$.apply((Seq)Predef$.MODULE$.wrapRefArray((Object[])new String[]{"1,1,1,Hallo,0", "2,2,2,Hallo Welt,2", "2,1,3,Hallo Welt wie,4"}));
        Assertions.assertThat((Object)sink.getAppendResults().sorted((Ordering)Ordering.String$.MODULE$)).isEqualTo(expected.sorted((Ordering)Ordering.String$.MODULE$));
    }

    @Test
    public void testDuplicateMetadataFromSameKey() {
        DataStream result = package$.MODULE$.tableConversions(this.tEnv().sqlQuery("SELECT other_metadata, other_metadata2, metadata_2 FROM MetadataTable")).toDataStream();
        TestingAppendSink sink = new TestingAppendSink();
        result.addSink((SinkFunction)sink);
        this.env().execute();
        Seq expected = (Seq)Seq$.MODULE$.apply((Seq)Predef$.MODULE$.wrapRefArray((Object[])new String[]{"1,1,Hallo", "1,1,Hallo Welt wie", "2,2,Hallo Welt"}));
        Assertions.assertThat((Object)sink.getAppendResults().sorted((Ordering)Ordering.String$.MODULE$)).isEqualTo(expected.sorted((Ordering)Ordering.String$.MODULE$));
    }

    @Test
    public void testNestedProjectionWithMetadataAccess() {
        String query = new StringOps(Predef$.MODULE$.augmentString("\n        |SELECT id,\n        |    deepNested.nested1.name AS nestedName,\n        |    nested.`value` AS nestedValue,\n        |    deepNested.`nested2.`.flag AS nestedFlag,\n        |    deepNested.`nested2.`.num + deepNested.nested1.`value.` AS nestedNum,\n        |    LOWER(name) as lowerName\n        |FROM NestedTable\n      ")).stripMargin();
        DataStream result = package$.MODULE$.tableConversions(this.tEnv().sqlQuery(query)).toDataStream();
        TestingAppendSink sink = new TestingAppendSink();
        result.addSink((SinkFunction)sink);
        this.env().execute();
        Seq expected = (Seq)Seq$.MODULE$.apply((Seq)Predef$.MODULE$.wrapRefArray((Object[])new String[]{"1,Sarah,10000,true,1100,mary", "2,Rob,20000,false,2200,bob", "3,Mike,30000,true,3300,liz"}));
        Assertions.assertThat((Object)sink.getAppendResults().sorted((Ordering)Ordering.String$.MODULE$)).isEqualTo(expected.sorted((Ordering)Ordering.String$.MODULE$));
    }

    @Test
    public void testSourceWatermarkInDDL() {
        String dataId = TestValuesTableFactory.registerData(TestData$.MODULE$.data3WithTimestamp());
        this.tEnv().executeSql(new StringOps(Predef$.MODULE$.augmentString(new StringBuilder(501).append("\n                       |CREATE TABLE tableWithWatermark (\n                       |  `a` INT,\n                       |  `b` BIGINT,\n                       |  `c` STRING,\n                       |  `ts` TIMESTAMP(3),\n                       |  WATERMARK FOR ts AS SOURCE_WATERMARK()\n                       |) WITH (\n                       |  'connector' = 'values',\n                       |  'data-id' = '").append(dataId).append("',\n                       |  'bounded' = 'false'\n                       |)\n                       |").toString())).stripMargin());
        Assertions.assertThatThrownBy(() -> this.tEnv().executeSql("SELECT * FROM tableWithWatermark").await()).satisfies(new ThrowingConsumer[]{FlinkAssertions.anyCauseMatches(TableException.class, (String)SourceWatermarkFunction.ERROR_MESSAGE)});
    }

    @Test
    public void testSourceWatermarkInQuery() {
        Assertions.assertThatThrownBy(() -> this.tEnv().executeSql("SELECT *, SOURCE_WATERMARK() FROM MyTable").print()).satisfies(new ThrowingConsumer[]{FlinkAssertions.anyCauseMatches(TableException.class, (String)SourceWatermarkFunction.ERROR_MESSAGE)});
    }

    @Test
    public void testSimpleNestedFilter() {
        String query = new StringOps(Predef$.MODULE$.augmentString("\n        |SELECT id, deepNested.nested1.name AS nestedName FROM NestedTable\n        |   WHERE nested.`value` > 20000\n    ")).stripMargin();
        DataStream result = package$.MODULE$.tableConversions(this.tEnv().sqlQuery(query)).toDataStream();
        TestingAppendSink sink = new TestingAppendSink();
        result.addSink((SinkFunction)sink);
        this.env().execute();
        Seq expected = (Seq)Seq$.MODULE$.apply((Seq)Predef$.MODULE$.wrapRefArray((Object[])new String[]{"3,Mike"}));
        Assertions.assertThat((Object)sink.getAppendResults().sorted((Ordering)Ordering.String$.MODULE$)).isEqualTo(expected.sorted((Ordering)Ordering.String$.MODULE$));
    }

    @Test
    public void testNestedFilterOnArray() {
        String query = new StringOps(Predef$.MODULE$.augmentString("\n        |SELECT id,\n        |   deepNested.nested1.name AS nestedName,\n        |   nestedItem.deepArray[2].`value` FROM NestedTable\n        |WHERE nestedItem.deepArray[2].`value` > 1\n      ")).stripMargin();
        DataStream result = package$.MODULE$.tableConversions(this.tEnv().sqlQuery(query)).toDataStream();
        TestingAppendSink sink = new TestingAppendSink();
        result.addSink((SinkFunction)sink);
        this.env().execute();
        Seq expected = (Seq)Seq$.MODULE$.apply((Seq)Predef$.MODULE$.wrapRefArray((Object[])new String[]{"1,Sarah,2", "2,Rob,2", "3,Mike,2"}));
        Assertions.assertThat((Object)sink.getAppendResults().sorted((Ordering)Ordering.String$.MODULE$)).isEqualTo(expected.sorted((Ordering)Ordering.String$.MODULE$));
    }

    @Test
    public void testNestedFilterOnMap() {
        String query = new StringOps(Predef$.MODULE$.augmentString("\n        |SELECT id,\n        |   deepNested.nested1.name AS nestedName,\n        |   nestedItem.deepMap['Monday'] FROM NestedTable\n        |WHERE nestedItem.deepMap['Monday'] = 1\n      ")).stripMargin();
        DataStream result = package$.MODULE$.tableConversions(this.tEnv().sqlQuery(query)).toDataStream();
        TestingAppendSink sink = new TestingAppendSink();
        result.addSink((SinkFunction)sink);
        this.env().execute();
        Seq expected = (Seq)Seq$.MODULE$.apply((Seq)Predef$.MODULE$.wrapRefArray((Object[])new String[]{"1,Sarah,1", "2,Rob,1", "3,Mike,1"}));
        Assertions.assertThat((Object)sink.getAppendResults().sorted((Ordering)Ordering.String$.MODULE$)).isEqualTo(expected.sorted((Ordering)Ordering.String$.MODULE$));
    }

    private void innerTestSetParallelism(String provider, int parallelism, int index) {
        String dataId = TestValuesTableFactory.registerData(TestData$.MODULE$.data1());
        String sourceTableName = new StringBuilder(18).append("test_para_source_").append(provider.toLowerCase().trim()).append("_").append(index).toString();
        String sinkTableName = new StringBuilder(16).append("test_para_sink_").append(provider.toLowerCase().trim()).append("_").append(index).toString();
        this.tEnv().executeSql(new StringOps(Predef$.MODULE$.augmentString(new StringBuilder(546).append("\n                       |CREATE TABLE ").append(sourceTableName).append(" (\n                       |  the_month INT,\n                       |  area STRING,\n                       |  product INT\n                       |) WITH (\n                       |  'connector' = 'values',\n                       |  'data-id' = '").append(dataId).append("',\n                       |  'bounded' = 'true',\n                       |  'runtime-source' = '").append(provider).append("',\n                       |  'scan.parallelism' = '").append(parallelism).append("',\n                       |  'enable-projection-push-down' = 'false'\n                       |)\n                       |").toString())).stripMargin());
        this.tEnv().executeSql(new StringOps(Predef$.MODULE$.augmentString(new StringBuilder(346).append("\n                       |CREATE TABLE ").append(sinkTableName).append(" (\n                       |  the_month INT,\n                       |  area STRING,\n                       |  product INT\n                       |) WITH (\n                       |  'connector' = 'values',\n                       |  'sink-insert-only' = 'true'\n                       |)\n                       |").toString())).stripMargin());
        this.tEnv().executeSql(new StringBuilder(27).append("INSERT INTO ").append(sinkTableName).append(" SELECT * FROM ").append(sourceTableName).toString()).await();
    }

    @Test
    public void testParallelismWithSourceFunction() {
        int negativeParallelism = -1;
        int validParallelism = 3;
        AtomicInteger index = new AtomicInteger(1);
        Assertions.assertThatThrownBy(() -> this.innerTestSetParallelism("SourceFunction", negativeParallelism, index.getAndIncrement())).hasMessageContaining("Invalid configured parallelism");
        this.innerTestSetParallelism("SourceFunction", validParallelism, index.getAndIncrement());
    }

    @Test
    public void testParallelismWithInputFormat() {
        int negativeParallelism = -1;
        int validParallelism = 3;
        AtomicInteger index = new AtomicInteger(2);
        Assertions.assertThatThrownBy(() -> this.innerTestSetParallelism("InputFormat", negativeParallelism, index.getAndIncrement())).hasMessageContaining("Invalid configured parallelism");
        this.innerTestSetParallelism("InputFormat", validParallelism, index.getAndIncrement());
    }

    @Test
    public void testParallelismWithDataStream() {
        int negativeParallelism = -1;
        int validParallelism = 3;
        AtomicInteger index = new AtomicInteger(3);
        Assertions.assertThatThrownBy(() -> this.innerTestSetParallelism("DataStream", negativeParallelism, index.getAndIncrement())).hasMessageContaining("Invalid configured parallelism");
        this.innerTestSetParallelism("DataStream", validParallelism, index.getAndIncrement());
    }
}

