/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.table.planner.runtime.stream.sql;

import java.time.ZoneId;
import java.util.Collection;
import org.apache.flink.api.common.restartstrategy.RestartStrategies;
import org.apache.flink.api.java.typeutils.TypeExtractor;
import org.apache.flink.streaming.api.CheckpointingMode;
import org.apache.flink.streaming.api.functions.sink.SinkFunction;
import org.apache.flink.table.api.bridge.scala.package$;
import org.apache.flink.table.planner.factories.TestValuesTableFactory;
import org.apache.flink.table.planner.runtime.stream.sql.WindowJoinITCase$;
import org.apache.flink.table.planner.runtime.utils.FailingCollectionSource;
import org.apache.flink.table.planner.runtime.utils.StreamingWithStateTestBase;
import org.apache.flink.table.planner.runtime.utils.TestData$;
import org.apache.flink.table.planner.runtime.utils.TestingAppendSink;
import org.apache.flink.types.Row;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;
import scala.Predef$;
import scala.collection.Seq;
import scala.collection.Seq$;
import scala.collection.TraversableOnce;
import scala.collection.immutable.StringOps;
import scala.math.Ordering;
import scala.reflect.ScalaSignature;

@RunWith(value=Parameterized.class)
@ScalaSignature(bytes="\u0006\u0001\u0005\u0015f\u0001B\u0001\u0003\u0001M\u0011\u0001cV5oI><(j\\5o\u0013R\u001b\u0015m]3\u000b\u0005\r!\u0011aA:rY*\u0011QAB\u0001\u0007gR\u0014X-Y7\u000b\u0005\u001dA\u0011a\u0002:v]RLW.\u001a\u0006\u0003\u0013)\tq\u0001\u001d7b]:,'O\u0003\u0002\f\u0019\u0005)A/\u00192mK*\u0011QBD\u0001\u0006M2Lgn\u001b\u0006\u0003\u001fA\ta!\u00199bG\",'\"A\t\u0002\u0007=\u0014xm\u0001\u0001\u0014\u0005\u0001!\u0002CA\u000b\u0019\u001b\u00051\"BA\f\u0007\u0003\u0015)H/\u001b7t\u0013\tIbC\u0001\u000eTiJ,\u0017-\\5oO^KG\u000f[*uCR,G+Z:u\u0005\u0006\u001cX\r\u0003\u0005\u001c\u0001\t\u0005\t\u0015!\u0003\u001d\u0003\u0011iw\u000eZ3\u0011\u0005u\tdB\u0001\u00100\u001d\tybF\u0004\u0002![9\u0011\u0011\u0005\f\b\u0003E-r!a\t\u0016\u000f\u0005\u0011JcBA\u0013)\u001b\u00051#BA\u0014\u0013\u0003\u0019a$o\\8u}%\t\u0011#\u0003\u0002\u0010!%\u0011QBD\u0005\u0003\u00171I!!\u0003\u0006\n\u0005\u001dA\u0011BA\f\u0007\u0013\t\u0001d#\u0001\u000eTiJ,\u0017-\\5oO^KG\u000f[*uCR,G+Z:u\u0005\u0006\u001cX-\u0003\u00023g\t\u00012\u000b^1uK\n\u000b7m[3oI6{G-\u001a\u0006\u0003aYA\u0001\"\u000e\u0001\u0003\u0002\u0003\u0006IAN\u0001\u0010kN,G+[7fgR\fW\u000e\u001d'uuB\u0011qGO\u0007\u0002q)\t\u0011(A\u0003tG\u0006d\u0017-\u0003\u0002<q\t9!i\\8mK\u0006t\u0007\"B\u001f\u0001\t\u0003q\u0014A\u0002\u001fj]&$h\bF\u0002@\u0003\n\u0003\"\u0001\u0011\u0001\u000e\u0003\tAQa\u0007\u001fA\u0002qAQ!\u000e\u001fA\u0002YBq\u0001\u0012\u0001C\u0002\u0013\u0005Q)A\u0007T\u0011\u0006su\tS!J?j{e*R\u000b\u0002\rB\u0011q\tT\u0007\u0002\u0011*\u0011\u0011JS\u0001\u0005i&lWMC\u0001L\u0003\u0011Q\u0017M^1\n\u00055C%A\u0002.p]\u0016LE\r\u0003\u0004P\u0001\u0001\u0006IAR\u0001\u000f'\"\u000bej\u0012%B\u0013~SvJT#!\u0011\u0015\t\u0006\u0001\"\u0011S\u0003\u0019\u0011WMZ8sKR\t1\u000b\u0005\u00028)&\u0011Q\u000b\u000f\u0002\u0005+:LG\u000f\u000b\u0002Q/B\u0011\u0001lW\u0007\u00023*\u0011!\fE\u0001\u0006UVt\u0017\u000e^\u0005\u00039f\u0013aAQ3g_J,\u0007\"\u00020\u0001\t\u0003\u0011\u0016!\u0004;fgRLeN\\3s\u0015>Lg\u000e\u000b\u0002^AB\u0011\u0001,Y\u0005\u0003Ef\u0013A\u0001V3ti\")A\r\u0001C\u0001%\u0006\u0011B/Z:u\u0013:tWM\u001d&pS:|en\u0016+GQ\t\u0019\u0007\rC\u0003h\u0001\u0011\u0005!+\u0001\u000fuKN$\u0018J\u001c8fe*{\u0017N\\(o/R3u+\u001b;i\u001f\u001a47/\u001a;)\u0005\u0019\u0004\u0007\"\u00026\u0001\t\u0003\u0011\u0016\u0001\n;fgRLeN\\3s\u0015>Lgn\u00148X)\u001a;\u0016\u000e\u001e5OK\u001e\fG/\u001b<f\u001f\u001a47/\u001a;)\u0005%\u0004\u0007\"B7\u0001\t\u0003\u0011\u0016A\t;fgRLeN\\3s\u0015>LgnV5uQ&\u001bhj\u001c;ESN$\u0018N\\2u\rJ|W\u000e\u000b\u0002mA\")\u0001\u000f\u0001C\u0001%\u00069C/Z:u\u0013:tWM\u001d&pS:<\u0016\u000e\u001e5Jg:{G\u000fR5ti&t7\r\u001e$s_6|en\u0016+GQ\ty\u0007\rC\u0003t\u0001\u0011\u0005!+\u0001\nuKN$8+Z7j\u0015>Lg.\u0012=jgR\u001c\bF\u0001:a\u0011\u00151\b\u0001\"\u0001S\u0003U!Xm\u001d;TK6L'j\\5o\u000bbL7\u000f^:X)\u001aC#!\u001e1\t\u000be\u0004A\u0011\u0001*\u0002\u001dQ,7\u000f^*f[&Tu.\u001b8J\u001d\"\u0012\u0001\u0010\u0019\u0005\u0006y\u0002!\tAU\u0001\u0013i\u0016\u001cHoU3nS*{\u0017N\\%O?^#f\t\u000b\u0002|A\")q\u0010\u0001C\u0001%\u0006)B/Z:u\u0003:$\u0018NS8j]:{G/\u0012=jgR\u001c\bF\u0001@a\u0011\u0019\t)\u0001\u0001C\u0001%\u0006AB/Z:u\u0003:$\u0018NS8j]:{G/\u0012=jgR\u001cx\u000b\u0016$)\u0007\u0005\r\u0001\r\u0003\u0004\u0002\f\u0001!\tAU\u0001\u0012i\u0016\u001cH/\u00118uS*{\u0017N\u001c(pi&s\u0005fAA\u0005A\"1\u0011\u0011\u0003\u0001\u0005\u0002I\u000bQ\u0003^3ti\u0006sG/\u001b&pS:tu\u000e^%O?^#f\tK\u0002\u0002\u0010\u0001Da!a\u0006\u0001\t\u0003\u0011\u0016\u0001\u0004;fgRdUM\u001a;K_&t\u0007fAA\u000bA\"1\u0011Q\u0004\u0001\u0005\u0002I\u000b\u0011\u0005^3ti2+g\r\u001e&pS:<\u0016\u000e\u001e5Jg:{G\u000fR5ti&t7\r\u001e$s_6D3!a\u0007a\u0011\u0019\t\u0019\u0003\u0001C\u0001%\u0006iA/Z:u%&<\u0007\u000e\u001e&pS:D3!!\ta\u0011\u0019\tI\u0003\u0001C\u0001%\u0006\u0011C/Z:u%&<\u0007\u000e\u001e&pS:<\u0016\u000e\u001e5Jg:{G\u000fR5ti&t7\r\u001e$s_6D3!a\na\u0011\u0019\ty\u0003\u0001C\u0001%\u0006iA/Z:u\u001fV$XM\u001d&pS:D3!!\fa\u0011\u0019\t)\u0004\u0001C\u0001%\u0006\u0011C/Z:u\u001fV$XM\u001d&pS:<\u0016\u000e\u001e5Jg:{G\u000fR5ti&t7\r\u001e$s_6D3!a\raQ\u001d\u0001\u00111HA$\u0003\u0013\u0002B!!\u0010\u0002D5\u0011\u0011q\b\u0006\u0004\u0003\u0003J\u0016A\u0002:v]:,'/\u0003\u0003\u0002F\u0005}\"a\u0002*v]^KG\u000f[\u0001\u0006m\u0006dW/Z\u0012\u0003\u0003\u0017\u0002B!!\u0014\u0002T5\u0011\u0011q\n\u0006\u0004\u0003#J\u0016a\u0002:v]:,'o]\u0005\u0005\u0003+\nyEA\u0007QCJ\fW.\u001a;fe&TX\rZ\u0004\b\u00033\u0012\u0001\u0012AA.\u0003A9\u0016N\u001c3po*{\u0017N\\%U\u0007\u0006\u001cX\rE\u0002A\u0003;2a!\u0001\u0002\t\u0002\u0005}3\u0003BA/\u0003C\u00022aNA2\u0013\r\t)\u0007\u000f\u0002\u0007\u0003:L(+\u001a4\t\u000fu\ni\u0006\"\u0001\u0002jQ\u0011\u00111\f\u0005\t\u0003[\ni\u0006\"\u0001\u0002p\u0005Q\u0001/\u0019:b[\u0016$XM]:\u0015\u0005\u0005E\u0004CBA:\u0003s\ni(\u0004\u0002\u0002v)\u0019\u0011q\u000f&\u0002\tU$\u0018\u000e\\\u0005\u0005\u0003w\n)H\u0001\u0006D_2dWm\u0019;j_:\u0004RaNA@\u0003\u0007K1!!!9\u0005\u0015\t%O]1z!\u0011\t))a#\u000e\u0005\u0005\u001d%bAAE\u0015\u0006!A.\u00198h\u0013\u0011\ti)a\"\u0003\r=\u0013'.Z2uQ!\tY'!%\u0002 \u0006\u0005\u0006\u0003BAJ\u00033sA!!\u0014\u0002\u0016&!\u0011qSA(\u00035\u0001\u0016M]1nKR,'/\u001b>fI&!\u00111TAO\u0005)\u0001\u0016M]1nKR,'o\u001d\u0006\u0005\u0003/\u000by%\u0001\u0003oC6,\u0017EAAR\u0003\u001d\u001aF/\u0019;f\u0005\u0006\u001c7.\u001a8e{m\u0004T\u0010\f\u0011Vg\u0016$\u0016.\\3ti\u0006l\u0007\u000f\u0014;{Au\u000230M?")
public class WindowJoinITCase
extends StreamingWithStateTestBase {
    private final boolean useTimestampLtz;
    private final ZoneId SHANGHAI_ZONE;

    @Parameterized.Parameters(name="StateBackend={0}, UseTimestampLtz = {1}")
    public static Collection<Object[]> parameters() {
        return WindowJoinITCase$.MODULE$.parameters();
    }

    public ZoneId SHANGHAI_ZONE() {
        return this.SHANGHAI_ZONE;
    }

    @Override
    @Before
    public void before() {
        super.before();
        this.env().enableCheckpointing(100L, CheckpointingMode.EXACTLY_ONCE);
        this.env().setRestartStrategy(RestartStrategies.fixedDelayRestart((int)1, (long)0L));
        FailingCollectionSource.reset();
        String dataId1 = TestValuesTableFactory.registerData(TestData$.MODULE$.windowDataWithTimestamp());
        String dataIdWithLtz = TestValuesTableFactory.registerData(TestData$.MODULE$.windowDataWithLtzInShanghai());
        this.tEnv().executeSql(new StringOps(Predef$.MODULE$.augmentString(new StringBuilder(460).append("\n         |CREATE TABLE T1 (\n         | `ts` ").append((Object)(this.useTimestampLtz ? "BIGINT" : "STRING")).append(",\n         | `int` INT,\n         | `double` DOUBLE,\n         | `float` FLOAT,\n         | `bigdec` DECIMAL(10, 2),\n         | `string` STRING,\n         | `name` STRING,\n         | `rowtime` AS\n         | ").append((Object)(this.useTimestampLtz ? "TO_TIMESTAMP_LTZ(`ts`, 3)" : "TO_TIMESTAMP(`ts`)")).append(",\n         | WATERMARK for `rowtime` AS `rowtime` - INTERVAL '1' SECOND\n         |) WITH (\n         | 'connector' = 'values',\n         | 'data-id' = '").append((Object)(this.useTimestampLtz ? dataIdWithLtz : dataId1)).append("',\n         | 'failing-source' = 'true'\n         |)\n         |").toString())).stripMargin());
        String dataId2 = TestValuesTableFactory.registerData(TestData$.MODULE$.windowData2WithTimestamp());
        String dataIdWithLtz2 = TestValuesTableFactory.registerData(TestData$.MODULE$.windowData2WithLtzInShanghai());
        this.tEnv().executeSql(new StringOps(Predef$.MODULE$.augmentString(new StringBuilder(460).append("\n         |CREATE TABLE T2 (\n         | `ts` ").append((Object)(this.useTimestampLtz ? "BIGINT" : "STRING")).append(",\n         | `int` INT,\n         | `double` DOUBLE,\n         | `float` FLOAT,\n         | `bigdec` DECIMAL(10, 2),\n         | `string` STRING,\n         | `name` STRING,\n         | `rowtime` AS\n         | ").append((Object)(this.useTimestampLtz ? "TO_TIMESTAMP_LTZ(`ts`, 3)" : "TO_TIMESTAMP(`ts`)")).append(",\n         | WATERMARK for `rowtime` AS `rowtime` - INTERVAL '1' SECOND\n         |) WITH (\n         | 'connector' = 'values',\n         | 'data-id' = '").append((Object)(this.useTimestampLtz ? dataIdWithLtz2 : dataId2)).append("',\n         | 'failing-source' = 'true'\n         |)\n         |").toString())).stripMargin());
        this.tEnv().getConfig().setLocalTimeZone(this.SHANGHAI_ZONE());
    }

    @Test
    public void testInnerJoin() {
        String sql = new StringOps(Predef$.MODULE$.augmentString("\n        |SELECT L.`name`, L.window_start, L.window_end, uv1, uv2\n        |FROM (\n        |SELECT\n        |  `name`,\n        |  window_start,\n        |  window_end,\n        |  COUNT(DISTINCT `string`) as uv1\n        |FROM TABLE(\n        |   TUMBLE(TABLE T1, DESCRIPTOR(rowtime), INTERVAL '5' SECOND))\n        |GROUP BY `name`, window_start, window_end\n        |) L\n        |JOIN (\n        |SELECT\n        |  `name`,\n        |  window_start,\n        |  window_end,\n        |  COUNT(DISTINCT `string`) as uv2\n        |FROM TABLE(\n        |   TUMBLE(TABLE T2, DESCRIPTOR(rowtime), INTERVAL '5' SECOND))\n        |GROUP BY `name`, window_start, window_end\n        |) R\n        |ON L.window_start = R.window_start AND L.window_end = R.window_end AND L.`name` = R.`name`\n        |")).stripMargin();
        TestingAppendSink sink = new TestingAppendSink();
        package$.MODULE$.tableConversions(this.tEnv().sqlQuery(sql)).toAppendStream(TypeExtractor.createTypeInfo(Row.class)).addSink((SinkFunction)sink);
        this.env().execute();
        Seq expected = (Seq)Seq$.MODULE$.apply((Seq)Predef$.MODULE$.wrapRefArray((Object[])new String[]{"b,2020-10-10T00:00:05,2020-10-10T00:00:10,2,2", "b,2020-10-10T00:00:15,2020-10-10T00:00:20,1,1", "b,2020-10-10T00:00:30,2020-10-10T00:00:35,1,1"}));
        Assert.assertEquals((Object)((TraversableOnce)expected.sorted((Ordering)Ordering.String$.MODULE$)).mkString("\n"), (Object)((TraversableOnce)sink.getAppendResults().sorted((Ordering)Ordering.String$.MODULE$)).mkString("\n"));
    }

    @Test
    public void testInnerJoinOnWTF() {
        String sql = new StringOps(Predef$.MODULE$.augmentString(new StringBuilder(768).append("\n         |SELECT\n         |  ").append((Object)(this.useTimestampLtz ? "TO_TIMESTAMP_LTZ(L.`ts`, 3)" : "TO_TIMESTAMP(L.`ts`)")).append(",\n         |  L.`int`,\n         |  L.`double`,\n         |  L.`float`,\n         |  L.`bigdec`,\n         |  L.`string`,\n         |  L.`name`,\n         |  CAST(L.`rowtime` AS STRING),\n         |  L.window_start,\n         |  L.window_end,\n         |  L.window_time,\n         |  R.`int`,\n         |  R.`string`,\n         |  R.`name`\n         |FROM (\n         |SELECT *\n         |FROM TABLE(\n         |   TUMBLE(TABLE T1, DESCRIPTOR(rowtime), INTERVAL '5' SECOND))\n         |) L\n         |JOIN (\n         |SELECT *\n         |FROM TABLE(\n         |   TUMBLE(TABLE T2, DESCRIPTOR(rowtime), INTERVAL '5' SECOND))\n         |)R\n         |ON L.window_start = R.window_start AND L.window_end = R.window_end AND\n         |L.`name` = R.`name`\n         |").toString())).stripMargin();
        TestingAppendSink sink = new TestingAppendSink();
        package$.MODULE$.tableConversions(this.tEnv().sqlQuery(sql)).toAppendStream(TypeExtractor.createTypeInfo(Row.class)).addSink((SinkFunction)sink);
        this.env().execute();
        Seq expected = this.useTimestampLtz ? (Seq)Seq$.MODULE$.apply((Seq)Predef$.MODULE$.wrapRefArray((Object[])new String[]{"2020-10-09T16:00:06Z,6,6.0,6.0,6.66,Hi,b,2020-10-10 00:00:06.000,2020-10-10T00:00:05,2020-10-10T00:00:10,2020-10-09T16:00:09.999Z,3,Hello,b", "2020-10-09T16:00:06Z,6,6.0,6.0,6.66,Hi,b,2020-10-10 00:00:06.000,2020-10-10T00:00:05,2020-10-10T00:00:10,2020-10-09T16:00:09.999Z,6,Hi,b", "2020-10-09T16:00:07Z,3,3.0,3.0,null,Hello,b,2020-10-10 00:00:07.000,2020-10-10T00:00:05,2020-10-10T00:00:10,2020-10-09T16:00:09.999Z,3,Hello,b", "2020-10-09T16:00:07Z,3,3.0,3.0,null,Hello,b,2020-10-10 00:00:07.000,2020-10-10T00:00:05,2020-10-10T00:00:10,2020-10-09T16:00:09.999Z,6,Hi,b", "2020-10-09T16:00:16Z,4,4.0,4.0,4.44,Hi,b,2020-10-10 00:00:16.000,2020-10-10T00:00:15,2020-10-10T00:00:20,2020-10-09T16:00:19.999Z,4,Hi,b", "2020-10-09T16:00:34Z,1,3.0,3.0,3.33,Comment#3,b,2020-10-10 00:00:34.000,2020-10-10T00:00:30,2020-10-10T00:00:35,2020-10-09T16:00:34.999Z,1,Comment#3,b"})) : (Seq)Seq$.MODULE$.apply((Seq)Predef$.MODULE$.wrapRefArray((Object[])new String[]{"2020-10-10T00:00:06,6,6.0,6.0,6.66,Hi,b,2020-10-10 00:00:06.000,2020-10-10T00:00:05,2020-10-10T00:00:10,2020-10-10T00:00:09.999,3,Hello,b", "2020-10-10T00:00:06,6,6.0,6.0,6.66,Hi,b,2020-10-10 00:00:06.000,2020-10-10T00:00:05,2020-10-10T00:00:10,2020-10-10T00:00:09.999,6,Hi,b", "2020-10-10T00:00:07,3,3.0,3.0,null,Hello,b,2020-10-10 00:00:07.000,2020-10-10T00:00:05,2020-10-10T00:00:10,2020-10-10T00:00:09.999,3,Hello,b", "2020-10-10T00:00:07,3,3.0,3.0,null,Hello,b,2020-10-10 00:00:07.000,2020-10-10T00:00:05,2020-10-10T00:00:10,2020-10-10T00:00:09.999,6,Hi,b", "2020-10-10T00:00:16,4,4.0,4.0,4.44,Hi,b,2020-10-10 00:00:16.000,2020-10-10T00:00:15,2020-10-10T00:00:20,2020-10-10T00:00:19.999,4,Hi,b", "2020-10-10T00:00:34,1,3.0,3.0,3.33,Comment#3,b,2020-10-10 00:00:34.000,2020-10-10T00:00:30,2020-10-10T00:00:35,2020-10-10T00:00:34.999,1,Comment#3,b"}));
        Assert.assertEquals((Object)((TraversableOnce)expected.sorted((Ordering)Ordering.String$.MODULE$)).mkString("\n"), (Object)((TraversableOnce)sink.getAppendResults().sorted((Ordering)Ordering.String$.MODULE$)).mkString("\n"));
    }

    @Test
    public void testInnerJoinOnWTFWithOffset() {
        String sql = new StringOps(Predef$.MODULE$.augmentString(new StringBuilder(809).append("\n         |SELECT\n         |  ").append((Object)(this.useTimestampLtz ? "TO_TIMESTAMP_LTZ(L.`ts`, 3)" : "TO_TIMESTAMP(L.`ts`)")).append(",\n         |  L.`int`,\n         |  L.`double`,\n         |  L.`float`,\n         |  L.`bigdec`,\n         |  L.`string`,\n         |  L.`name`,\n         |  CAST(L.`rowtime` AS STRING),\n         |  L.window_start,\n         |  L.window_end,\n         |  L.window_time,\n         |  R.`int`,\n         |  R.`string`,\n         |  R.`name`\n         |FROM (\n         |SELECT *\n         |FROM TABLE(\n         |   TUMBLE(TABLE T1, DESCRIPTOR(rowtime), INTERVAL '5' SECOND, INTERVAL '1' SECOND))\n         |) L\n         |JOIN (\n         |SELECT *\n         |FROM TABLE(\n         |   TUMBLE(TABLE T2, DESCRIPTOR(rowtime),INTERVAL '5' SECOND, INTERVAL '1' SECOND))\n         |)R\n         |ON L.window_start = R.window_start AND L.window_end = R.window_end AND\n         |L.`name` = R.`name`\n         |").toString())).stripMargin();
        TestingAppendSink sink = new TestingAppendSink();
        package$.MODULE$.tableConversions(this.tEnv().sqlQuery(sql)).toAppendStream(TypeExtractor.createTypeInfo(Row.class)).addSink((SinkFunction)sink);
        this.env().execute();
        Seq expected = this.useTimestampLtz ? (Seq)Seq$.MODULE$.apply((Seq)Predef$.MODULE$.wrapRefArray((Object[])new String[]{"2020-10-09T16:00:06Z,6,6.0,6.0,6.66,Hi,b,2020-10-10 00:00:06.000,2020-10-10T00:00:06,2020-10-10T00:00:11,2020-10-09T16:00:10.999Z,3,Hello,b", "2020-10-09T16:00:06Z,6,6.0,6.0,6.66,Hi,b,2020-10-10 00:00:06.000,2020-10-10T00:00:06,2020-10-10T00:00:11,2020-10-09T16:00:10.999Z,6,Hi,b", "2020-10-09T16:00:07Z,3,3.0,3.0,null,Hello,b,2020-10-10 00:00:07.000,2020-10-10T00:00:06,2020-10-10T00:00:11,2020-10-09T16:00:10.999Z,3,Hello,b", "2020-10-09T16:00:07Z,3,3.0,3.0,null,Hello,b,2020-10-10 00:00:07.000,2020-10-10T00:00:06,2020-10-10T00:00:11,2020-10-09T16:00:10.999Z,6,Hi,b", "2020-10-09T16:00:16Z,4,4.0,4.0,4.44,Hi,b,2020-10-10 00:00:16.000,2020-10-10T00:00:16,2020-10-10T00:00:21,2020-10-09T16:00:20.999Z,4,Hi,b", "2020-10-09T16:00:34Z,1,3.0,3.0,3.33,Comment#3,b,2020-10-10 00:00:34.000,2020-10-10T00:00:31,2020-10-10T00:00:36,2020-10-09T16:00:35.999Z,1,Comment#3,b"})) : (Seq)Seq$.MODULE$.apply((Seq)Predef$.MODULE$.wrapRefArray((Object[])new String[]{"2020-10-10T00:00:06,6,6.0,6.0,6.66,Hi,b,2020-10-10 00:00:06.000,2020-10-10T00:00:06,2020-10-10T00:00:11,2020-10-10T00:00:10.999,3,Hello,b", "2020-10-10T00:00:06,6,6.0,6.0,6.66,Hi,b,2020-10-10 00:00:06.000,2020-10-10T00:00:06,2020-10-10T00:00:11,2020-10-10T00:00:10.999,6,Hi,b", "2020-10-10T00:00:07,3,3.0,3.0,null,Hello,b,2020-10-10 00:00:07.000,2020-10-10T00:00:06,2020-10-10T00:00:11,2020-10-10T00:00:10.999,3,Hello,b", "2020-10-10T00:00:07,3,3.0,3.0,null,Hello,b,2020-10-10 00:00:07.000,2020-10-10T00:00:06,2020-10-10T00:00:11,2020-10-10T00:00:10.999,6,Hi,b", "2020-10-10T00:00:16,4,4.0,4.0,4.44,Hi,b,2020-10-10 00:00:16.000,2020-10-10T00:00:16,2020-10-10T00:00:21,2020-10-10T00:00:20.999,4,Hi,b", "2020-10-10T00:00:34,1,3.0,3.0,3.33,Comment#3,b,2020-10-10 00:00:34.000,2020-10-10T00:00:31,2020-10-10T00:00:36,2020-10-10T00:00:35.999,1,Comment#3,b"}));
        Assert.assertEquals((Object)((TraversableOnce)expected.sorted((Ordering)Ordering.String$.MODULE$)).mkString("\n"), (Object)((TraversableOnce)sink.getAppendResults().sorted((Ordering)Ordering.String$.MODULE$)).mkString("\n"));
    }

    @Test
    public void testInnerJoinOnWTFWithNegativeOffset() {
        String sql = new StringOps(Predef$.MODULE$.augmentString(new StringBuilder(812).append("\n         |SELECT\n         |  ").append((Object)(this.useTimestampLtz ? "TO_TIMESTAMP_LTZ(L.`ts`, 3)" : "TO_TIMESTAMP(L.`ts`)")).append(",\n         |  L.`int`,\n         |  L.`double`,\n         |  L.`float`,\n         |  L.`bigdec`,\n         |  L.`string`,\n         |  L.`name`,\n         |  CAST(L.`rowtime` AS STRING),\n         |  L.window_start,\n         |  L.window_end,\n         |  L.window_time,\n         |  R.`int`,\n         |  R.`string`,\n         |  R.`name`\n         |FROM (\n         |SELECT *\n         |FROM TABLE(\n         |   TUMBLE(TABLE T1, DESCRIPTOR(rowtime), INTERVAL '5' SECOND, INTERVAL '-1' SECOND))\n         |) L\n         |JOIN (\n         |SELECT *\n         |FROM TABLE(\n         |   TUMBLE(TABLE T2, DESCRIPTOR(rowtime), INTERVAL '5' SECOND, INTERVAL '-1' SECOND))\n         |)R\n         |ON L.window_start = R.window_start AND L.window_end = R.window_end AND\n         |L.`name` = R.`name`\n         |").toString())).stripMargin();
        TestingAppendSink sink = new TestingAppendSink();
        package$.MODULE$.tableConversions(this.tEnv().sqlQuery(sql)).toAppendStream(TypeExtractor.createTypeInfo(Row.class)).addSink((SinkFunction)sink);
        this.env().execute();
        Seq expected = this.useTimestampLtz ? (Seq)Seq$.MODULE$.apply((Seq)Predef$.MODULE$.wrapRefArray((Object[])new String[]{"2020-10-09T16:00:06Z,6,6.0,6.0,6.66,Hi,b,2020-10-10 00:00:06.000,2020-10-10T00:00:04,2020-10-10T00:00:09,2020-10-09T16:00:08.999Z,3,Hello,b", "2020-10-09T16:00:06Z,6,6.0,6.0,6.66,Hi,b,2020-10-10 00:00:06.000,2020-10-10T00:00:04,2020-10-10T00:00:09,2020-10-09T16:00:08.999Z,6,Hi,b", "2020-10-09T16:00:07Z,3,3.0,3.0,null,Hello,b,2020-10-10 00:00:07.000,2020-10-10T00:00:04,2020-10-10T00:00:09,2020-10-09T16:00:08.999Z,3,Hello,b", "2020-10-09T16:00:07Z,3,3.0,3.0,null,Hello,b,2020-10-10 00:00:07.000,2020-10-10T00:00:04,2020-10-10T00:00:09,2020-10-09T16:00:08.999Z,6,Hi,b", "2020-10-09T16:00:16Z,4,4.0,4.0,4.44,Hi,b,2020-10-10 00:00:16.000,2020-10-10T00:00:14,2020-10-10T00:00:19,2020-10-09T16:00:18.999Z,4,Hi,b", "2020-10-09T16:00:34Z,1,3.0,3.0,3.33,Comment#3,b,2020-10-10 00:00:34.000,2020-10-10T00:00:34,2020-10-10T00:00:39,2020-10-09T16:00:38.999Z,1,Comment#3,b"})) : (Seq)Seq$.MODULE$.apply((Seq)Predef$.MODULE$.wrapRefArray((Object[])new String[]{"2020-10-10T00:00:06,6,6.0,6.0,6.66,Hi,b,2020-10-10 00:00:06.000,2020-10-10T00:00:04,2020-10-10T00:00:09,2020-10-10T00:00:08.999,3,Hello,b", "2020-10-10T00:00:06,6,6.0,6.0,6.66,Hi,b,2020-10-10 00:00:06.000,2020-10-10T00:00:04,2020-10-10T00:00:09,2020-10-10T00:00:08.999,6,Hi,b", "2020-10-10T00:00:07,3,3.0,3.0,null,Hello,b,2020-10-10 00:00:07.000,2020-10-10T00:00:04,2020-10-10T00:00:09,2020-10-10T00:00:08.999,3,Hello,b", "2020-10-10T00:00:07,3,3.0,3.0,null,Hello,b,2020-10-10 00:00:07.000,2020-10-10T00:00:04,2020-10-10T00:00:09,2020-10-10T00:00:08.999,6,Hi,b", "2020-10-10T00:00:16,4,4.0,4.0,4.44,Hi,b,2020-10-10 00:00:16.000,2020-10-10T00:00:14,2020-10-10T00:00:19,2020-10-10T00:00:18.999,4,Hi,b", "2020-10-10T00:00:34,1,3.0,3.0,3.33,Comment#3,b,2020-10-10 00:00:34.000,2020-10-10T00:00:34,2020-10-10T00:00:39,2020-10-10T00:00:38.999,1,Comment#3,b"}));
        Assert.assertEquals((Object)((TraversableOnce)expected.sorted((Ordering)Ordering.String$.MODULE$)).mkString("\n"), (Object)((TraversableOnce)sink.getAppendResults().sorted((Ordering)Ordering.String$.MODULE$)).mkString("\n"));
    }

    @Test
    public void testInnerJoinWithIsNotDistinctFrom() {
        String sql = new StringOps(Predef$.MODULE$.augmentString("\n        |SELECT L.`name`, L.window_start, L.window_end, uv1, uv2\n        |FROM (\n        |SELECT\n        |  `name`,\n        |  window_start,\n        |  window_end,\n        |  COUNT(DISTINCT `string`) as uv1\n        |FROM TABLE(\n        |   TUMBLE(TABLE T1, DESCRIPTOR(rowtime), INTERVAL '5' SECOND))\n        |GROUP BY `name`, window_start, window_end\n        |) L\n        |JOIN (\n        |SELECT\n        |  `name`,\n        |  window_start,\n        |  window_end,\n        |  COUNT(DISTINCT `string`) as uv2\n        |FROM TABLE(\n        |   TUMBLE(TABLE T2, DESCRIPTOR(rowtime), INTERVAL '5' SECOND))\n        |GROUP BY `name`, window_start, window_end\n        |) R\n        |ON L.window_start = R.window_start AND L.window_end = R.window_end AND\n        |L.`name` IS NOT DISTINCT from R.`name`\n        |")).stripMargin();
        TestingAppendSink sink = new TestingAppendSink();
        package$.MODULE$.tableConversions(this.tEnv().sqlQuery(sql)).toAppendStream(TypeExtractor.createTypeInfo(Row.class)).addSink((SinkFunction)sink);
        this.env().execute();
        Seq expected = (Seq)Seq$.MODULE$.apply((Seq)Predef$.MODULE$.wrapRefArray((Object[])new String[]{"b,2020-10-10T00:00:05,2020-10-10T00:00:10,2,2", "b,2020-10-10T00:00:15,2020-10-10T00:00:20,1,1", "b,2020-10-10T00:00:30,2020-10-10T00:00:35,1,1", "null,2020-10-10T00:00:30,2020-10-10T00:00:35,0,0"}));
        Assert.assertEquals((Object)((TraversableOnce)expected.sorted((Ordering)Ordering.String$.MODULE$)).mkString("\n"), (Object)((TraversableOnce)sink.getAppendResults().sorted((Ordering)Ordering.String$.MODULE$)).mkString("\n"));
    }

    @Test
    public void testInnerJoinWithIsNotDistinctFromOnWTF() {
        String sql = new StringOps(Predef$.MODULE$.augmentString(new StringBuilder(788).append("\n         |SELECT\n         |  ").append((Object)(this.useTimestampLtz ? "TO_TIMESTAMP_LTZ(L.`ts`, 3)" : "TO_TIMESTAMP(L.`ts`)")).append(",\n         |  L.`int`,\n         |  L.`double`,\n         |  L.`float`,\n         |  L.`bigdec`,\n         |  L.`string`,\n         |  L.`name`,\n         |  CAST(L.`rowtime` AS STRING),\n         |  L.window_start,\n         |  L.window_end,\n         |  L.window_time,\n         |  R.`int`,\n         |  R.`string`,\n         |  R.`name`\n         |FROM (\n         |SELECT *\n         |FROM TABLE(\n         |   TUMBLE(TABLE T1, DESCRIPTOR(rowtime), INTERVAL '5' SECOND))\n         |) L\n         |JOIN (\n         |SELECT *\n         |FROM TABLE(\n         |   TUMBLE(TABLE T2, DESCRIPTOR(rowtime), INTERVAL '5' SECOND))\n         |) R\n         |ON L.window_start = R.window_start AND L.window_end = R.window_end AND\n         |L.`name` IS NOT DISTINCT from R.`name`\n         |").toString())).stripMargin();
        TestingAppendSink sink = new TestingAppendSink();
        package$.MODULE$.tableConversions(this.tEnv().sqlQuery(sql)).toAppendStream(TypeExtractor.createTypeInfo(Row.class)).addSink((SinkFunction)sink);
        this.env().execute();
        Seq expected = this.useTimestampLtz ? (Seq)Seq$.MODULE$.apply((Seq)Predef$.MODULE$.wrapRefArray((Object[])new String[]{"2020-10-09T16:00:06Z,6,6.0,6.0,6.66,Hi,b,2020-10-10 00:00:06.000,2020-10-10T00:00:05,2020-10-10T00:00:10,2020-10-09T16:00:09.999Z,3,Hello,b", "2020-10-09T16:00:06Z,6,6.0,6.0,6.66,Hi,b,2020-10-10 00:00:06.000,2020-10-10T00:00:05,2020-10-10T00:00:10,2020-10-09T16:00:09.999Z,6,Hi,b", "2020-10-09T16:00:07Z,3,3.0,3.0,null,Hello,b,2020-10-10 00:00:07.000,2020-10-10T00:00:05,2020-10-10T00:00:10,2020-10-09T16:00:09.999Z,3,Hello,b", "2020-10-09T16:00:07Z,3,3.0,3.0,null,Hello,b,2020-10-10 00:00:07.000,2020-10-10T00:00:05,2020-10-10T00:00:10,2020-10-09T16:00:09.999Z,6,Hi,b", "2020-10-09T16:00:16Z,4,4.0,4.0,4.44,Hi,b,2020-10-10 00:00:16.000,2020-10-10T00:00:15,2020-10-10T00:00:20,2020-10-09T16:00:19.999Z,4,Hi,b", "2020-10-09T16:00:32Z,7,7.0,7.0,7.77,null,null,2020-10-10 00:00:32.000,2020-10-10T00:00:30,2020-10-10T00:00:35,2020-10-09T16:00:34.999Z,7,null,null", "2020-10-09T16:00:34Z,1,3.0,3.0,3.33,Comment#3,b,2020-10-10 00:00:34.000,2020-10-10T00:00:30,2020-10-10T00:00:35,2020-10-09T16:00:34.999Z,1,Comment#3,b"})) : (Seq)Seq$.MODULE$.apply((Seq)Predef$.MODULE$.wrapRefArray((Object[])new String[]{"2020-10-10T00:00:06,6,6.0,6.0,6.66,Hi,b,2020-10-10 00:00:06.000,2020-10-10T00:00:05,2020-10-10T00:00:10,2020-10-10T00:00:09.999,3,Hello,b", "2020-10-10T00:00:06,6,6.0,6.0,6.66,Hi,b,2020-10-10 00:00:06.000,2020-10-10T00:00:05,2020-10-10T00:00:10,2020-10-10T00:00:09.999,6,Hi,b", "2020-10-10T00:00:07,3,3.0,3.0,null,Hello,b,2020-10-10 00:00:07.000,2020-10-10T00:00:05,2020-10-10T00:00:10,2020-10-10T00:00:09.999,3,Hello,b", "2020-10-10T00:00:07,3,3.0,3.0,null,Hello,b,2020-10-10 00:00:07.000,2020-10-10T00:00:05,2020-10-10T00:00:10,2020-10-10T00:00:09.999,6,Hi,b", "2020-10-10T00:00:16,4,4.0,4.0,4.44,Hi,b,2020-10-10 00:00:16.000,2020-10-10T00:00:15,2020-10-10T00:00:20,2020-10-10T00:00:19.999,4,Hi,b", "2020-10-10T00:00:32,7,7.0,7.0,7.77,null,null,2020-10-10 00:00:32.000,2020-10-10T00:00:30,2020-10-10T00:00:35,2020-10-10T00:00:34.999,7,null,null", "2020-10-10T00:00:34,1,3.0,3.0,3.33,Comment#3,b,2020-10-10 00:00:34.000,2020-10-10T00:00:30,2020-10-10T00:00:35,2020-10-10T00:00:34.999,1,Comment#3,b"}));
        Assert.assertEquals((Object)((TraversableOnce)expected.sorted((Ordering)Ordering.String$.MODULE$)).mkString("\n"), (Object)((TraversableOnce)sink.getAppendResults().sorted((Ordering)Ordering.String$.MODULE$)).mkString("\n"));
    }

    @Test
    public void testSemiJoinExists() {
        String sql = new StringOps(Predef$.MODULE$.augmentString("\n        |SELECT * FROM (\n        |  SELECT\n        |    `name`,\n        |    window_start,\n        |    window_end,\n        |    COUNT(DISTINCT `string`) as uv1\n        |    FROM TABLE(\n        |    TUMBLE(TABLE T1, DESCRIPTOR(rowtime), INTERVAL '5' SECOND))\n        |  GROUP BY `name`, window_start, window_end\n        |) L WHERE EXISTS (\n        |SELECT * FROM(\n        |  SELECT\n        |    `name`,\n        |    window_start,\n        |    window_end,\n        |    COUNT(DISTINCT `string`) as uv2\n        |    FROM TABLE(\n        |      TUMBLE(TABLE T2, DESCRIPTOR(rowtime), INTERVAL '5' SECOND))\n        |    GROUP BY `name`, window_start, window_end\n        |) R\n        |  WHERE L.window_start = R.window_start AND L.window_end = R.window_end\n        |        AND L.`name` = R.`name`)\n        |")).stripMargin();
        TestingAppendSink sink = new TestingAppendSink();
        package$.MODULE$.tableConversions(this.tEnv().sqlQuery(sql)).toAppendStream(TypeExtractor.createTypeInfo(Row.class)).addSink((SinkFunction)sink);
        this.env().execute();
        Seq expected = (Seq)Seq$.MODULE$.apply((Seq)Predef$.MODULE$.wrapRefArray((Object[])new String[]{"b,2020-10-10T00:00:05,2020-10-10T00:00:10,2", "b,2020-10-10T00:00:15,2020-10-10T00:00:20,1", "b,2020-10-10T00:00:30,2020-10-10T00:00:35,1"}));
        Assert.assertEquals((Object)((TraversableOnce)expected.sorted((Ordering)Ordering.String$.MODULE$)).mkString("\n"), (Object)((TraversableOnce)sink.getAppendResults().sorted((Ordering)Ordering.String$.MODULE$)).mkString("\n"));
    }

    @Test
    public void testSemiJoinExistsWTF() {
        String sql = new StringOps(Predef$.MODULE$.augmentString(new StringBuilder(714).append("\n         |SELECT\n         |  ").append((Object)(this.useTimestampLtz ? "TO_TIMESTAMP_LTZ(L.`ts`, 3)" : "TO_TIMESTAMP(L.`ts`)")).append(",\n         |  L.`int`,\n         |  L.`double`,\n         |  L.`float`,\n         |  L.`bigdec`,\n         |  L.`string`,\n         |  L.`name`,\n         |  CAST(L.`rowtime` AS STRING),\n         |  L.window_start,\n         |  L.window_end,\n         |  L.window_time\n         |FROM (\n         |  SELECT *\n         |    FROM TABLE(\n         |    TUMBLE(TABLE T1, DESCRIPTOR(rowtime), INTERVAL '5' SECOND))\n         |) L WHERE EXISTS (\n         |  SELECT *\n         |    FROM TABLE(\n         |    TUMBLE(TABLE T2, DESCRIPTOR(rowtime), INTERVAL '5' SECOND)) R\n         |WHERE L.window_start = R.window_start AND L.window_end = R.window_end\n         |        AND L.`name` = R.`name`)\n         |").toString())).stripMargin();
        TestingAppendSink sink = new TestingAppendSink();
        package$.MODULE$.tableConversions(this.tEnv().sqlQuery(sql)).toAppendStream(TypeExtractor.createTypeInfo(Row.class)).addSink((SinkFunction)sink);
        this.env().execute();
        Seq expected = this.useTimestampLtz ? (Seq)Seq$.MODULE$.apply((Seq)Predef$.MODULE$.wrapRefArray((Object[])new String[]{"2020-10-09T16:00:06Z,6,6.0,6.0,6.66,Hi,b,2020-10-10 00:00:06.000,2020-10-10T00:00:05,2020-10-10T00:00:10,2020-10-09T16:00:09.999Z", "2020-10-09T16:00:07Z,3,3.0,3.0,null,Hello,b,2020-10-10 00:00:07.000,2020-10-10T00:00:05,2020-10-10T00:00:10,2020-10-09T16:00:09.999Z", "2020-10-09T16:00:16Z,4,4.0,4.0,4.44,Hi,b,2020-10-10 00:00:16.000,2020-10-10T00:00:15,2020-10-10T00:00:20,2020-10-09T16:00:19.999Z", "2020-10-09T16:00:34Z,1,3.0,3.0,3.33,Comment#3,b,2020-10-10 00:00:34.000,2020-10-10T00:00:30,2020-10-10T00:00:35,2020-10-09T16:00:34.999Z"})) : (Seq)Seq$.MODULE$.apply((Seq)Predef$.MODULE$.wrapRefArray((Object[])new String[]{"2020-10-10T00:00:06,6,6.0,6.0,6.66,Hi,b,2020-10-10 00:00:06.000,2020-10-10T00:00:05,2020-10-10T00:00:10,2020-10-10T00:00:09.999", "2020-10-10T00:00:07,3,3.0,3.0,null,Hello,b,2020-10-10 00:00:07.000,2020-10-10T00:00:05,2020-10-10T00:00:10,2020-10-10T00:00:09.999", "2020-10-10T00:00:16,4,4.0,4.0,4.44,Hi,b,2020-10-10 00:00:16.000,2020-10-10T00:00:15,2020-10-10T00:00:20,2020-10-10T00:00:19.999", "2020-10-10T00:00:34,1,3.0,3.0,3.33,Comment#3,b,2020-10-10 00:00:34.000,2020-10-10T00:00:30,2020-10-10T00:00:35,2020-10-10T00:00:34.999"}));
        Assert.assertEquals((Object)((TraversableOnce)expected.sorted((Ordering)Ordering.String$.MODULE$)).mkString("\n"), (Object)((TraversableOnce)sink.getAppendResults().sorted((Ordering)Ordering.String$.MODULE$)).mkString("\n"));
    }

    @Test
    public void testSemiJoinIN() {
        String sql = new StringOps(Predef$.MODULE$.augmentString("\n        |SELECT * FROM (\n        |  SELECT\n        |    `name`,\n        |    window_start,\n        |    window_end,\n        |    COUNT(DISTINCT `string`) as uv1\n        |    FROM TABLE(\n        |    TUMBLE(TABLE T1, DESCRIPTOR(rowtime), INTERVAL '5' SECOND))\n        |  GROUP BY `name`, window_start, window_end\n        |) L WHERE L.`name` IN (\n        |SELECT `name` FROM(\n        |  SELECT\n        |    `name`,\n        |    window_start,\n        |    window_end,\n        |    COUNT(DISTINCT `string`) as uv2\n        |    FROM TABLE(\n        |      TUMBLE(TABLE T2, DESCRIPTOR(rowtime), INTERVAL '5' SECOND))\n        |    GROUP BY `name`, window_start, window_end\n        |) R\n        |  WHERE L.window_start = R.window_start AND L.window_end = R.window_end)\n        |")).stripMargin();
        TestingAppendSink sink = new TestingAppendSink();
        package$.MODULE$.tableConversions(this.tEnv().sqlQuery(sql)).toAppendStream(TypeExtractor.createTypeInfo(Row.class)).addSink((SinkFunction)sink);
        this.env().execute();
        Seq expected = (Seq)Seq$.MODULE$.apply((Seq)Predef$.MODULE$.wrapRefArray((Object[])new String[]{"b,2020-10-10T00:00:05,2020-10-10T00:00:10,2", "b,2020-10-10T00:00:15,2020-10-10T00:00:20,1", "b,2020-10-10T00:00:30,2020-10-10T00:00:35,1"}));
        Assert.assertEquals((Object)((TraversableOnce)expected.sorted((Ordering)Ordering.String$.MODULE$)).mkString("\n"), (Object)((TraversableOnce)sink.getAppendResults().sorted((Ordering)Ordering.String$.MODULE$)).mkString("\n"));
    }

    @Test
    public void testSemiJoinIN_WTF() {
        String sql = new StringOps(Predef$.MODULE$.augmentString(new StringBuilder(724).append("\n         |SELECT\n         |  ").append((Object)(this.useTimestampLtz ? "TO_TIMESTAMP_LTZ(L.`ts`, 3)" : "TO_TIMESTAMP(L.`ts`)")).append(",\n         |  L.`int`,\n         |  L.`double`,\n         |  L.`float`,\n         |  L.`bigdec`,\n         |  L.`string`,\n         |  L.`name`,\n         |  CAST(L.`rowtime` AS STRING),\n         |  L.window_start,\n         |  L.window_end,\n         |  L.window_time\n         |FROM (\n         |  SELECT *\n         |    FROM TABLE(\n         |    TUMBLE(TABLE T1, DESCRIPTOR(rowtime), INTERVAL '5' SECOND))\n         |) L WHERE L.`name` IN (\n         |  SELECT `name`\n         |    FROM TABLE(\n         |    TUMBLE(TABLE T2, DESCRIPTOR(rowtime), INTERVAL '5' SECOND)) R\n         |WHERE L.window_start = R.window_start AND L.window_end = R.window_end\n         |        AND L.`name` = R.`name`)\n         |").toString())).stripMargin();
        TestingAppendSink sink = new TestingAppendSink();
        package$.MODULE$.tableConversions(this.tEnv().sqlQuery(sql)).toAppendStream(TypeExtractor.createTypeInfo(Row.class)).addSink((SinkFunction)sink);
        this.env().execute();
        Seq expected = this.useTimestampLtz ? (Seq)Seq$.MODULE$.apply((Seq)Predef$.MODULE$.wrapRefArray((Object[])new String[]{"2020-10-09T16:00:06Z,6,6.0,6.0,6.66,Hi,b,2020-10-10 00:00:06.000,2020-10-10T00:00:05,2020-10-10T00:00:10,2020-10-09T16:00:09.999Z", "2020-10-09T16:00:07Z,3,3.0,3.0,null,Hello,b,2020-10-10 00:00:07.000,2020-10-10T00:00:05,2020-10-10T00:00:10,2020-10-09T16:00:09.999Z", "2020-10-09T16:00:16Z,4,4.0,4.0,4.44,Hi,b,2020-10-10 00:00:16.000,2020-10-10T00:00:15,2020-10-10T00:00:20,2020-10-09T16:00:19.999Z", "2020-10-09T16:00:34Z,1,3.0,3.0,3.33,Comment#3,b,2020-10-10 00:00:34.000,2020-10-10T00:00:30,2020-10-10T00:00:35,2020-10-09T16:00:34.999Z"})) : (Seq)Seq$.MODULE$.apply((Seq)Predef$.MODULE$.wrapRefArray((Object[])new String[]{"2020-10-10T00:00:06,6,6.0,6.0,6.66,Hi,b,2020-10-10 00:00:06.000,2020-10-10T00:00:05,2020-10-10T00:00:10,2020-10-10T00:00:09.999", "2020-10-10T00:00:07,3,3.0,3.0,null,Hello,b,2020-10-10 00:00:07.000,2020-10-10T00:00:05,2020-10-10T00:00:10,2020-10-10T00:00:09.999", "2020-10-10T00:00:16,4,4.0,4.0,4.44,Hi,b,2020-10-10 00:00:16.000,2020-10-10T00:00:15,2020-10-10T00:00:20,2020-10-10T00:00:19.999", "2020-10-10T00:00:34,1,3.0,3.0,3.33,Comment#3,b,2020-10-10 00:00:34.000,2020-10-10T00:00:30,2020-10-10T00:00:35,2020-10-10T00:00:34.999"}));
        Assert.assertEquals((Object)((TraversableOnce)expected.sorted((Ordering)Ordering.String$.MODULE$)).mkString("\n"), (Object)((TraversableOnce)sink.getAppendResults().sorted((Ordering)Ordering.String$.MODULE$)).mkString("\n"));
    }

    @Test
    public void testAntiJoinNotExists() {
        String sql = new StringOps(Predef$.MODULE$.augmentString("\n        |SELECT * FROM (\n        |  SELECT\n        |    `name`,\n        |    window_start,\n        |    window_end,\n        |    COUNT(DISTINCT `string`) as uv1\n        |    FROM TABLE(\n        |    TUMBLE(TABLE T1, DESCRIPTOR(rowtime), INTERVAL '5' SECOND))\n        |  GROUP BY `name`, window_start, window_end\n        |) L WHERE NOT EXISTS (\n        |SELECT * FROM(\n        |  SELECT\n        |    `name`,\n        |    window_start,\n        |    window_end,\n        |    COUNT(DISTINCT `string`) as uv2\n        |    FROM TABLE(\n        |      TUMBLE(TABLE T2, DESCRIPTOR(rowtime), INTERVAL '5' SECOND))\n        |    GROUP BY `name`, window_start, window_end\n        |) R\n        |  WHERE L.window_start = R.window_start AND L.window_end = R.window_end\n        |        AND L.`name` = R.`name`)\n        |")).stripMargin();
        TestingAppendSink sink = new TestingAppendSink();
        package$.MODULE$.tableConversions(this.tEnv().sqlQuery(sql)).toAppendStream(TypeExtractor.createTypeInfo(Row.class)).addSink((SinkFunction)sink);
        this.env().execute();
        Seq expected = (Seq)Seq$.MODULE$.apply((Seq)Predef$.MODULE$.wrapRefArray((Object[])new String[]{"a,2020-10-10T00:00,2020-10-10T00:00:05,2", "a,2020-10-10T00:00:05,2020-10-10T00:00:10,1", "null,2020-10-10T00:00:30,2020-10-10T00:00:35,0"}));
        Assert.assertEquals((Object)((TraversableOnce)expected.sorted((Ordering)Ordering.String$.MODULE$)).mkString("\n"), (Object)((TraversableOnce)sink.getAppendResults().sorted((Ordering)Ordering.String$.MODULE$)).mkString("\n"));
    }

    @Test
    public void testAntiJoinNotExistsWTF() {
        String sql = new StringOps(Predef$.MODULE$.augmentString(new StringBuilder(748).append("\n         |SELECT\n         |  ").append((Object)(this.useTimestampLtz ? "TO_TIMESTAMP_LTZ(L.`ts`, 3)" : "TO_TIMESTAMP(L.`ts`)")).append(",\n         |  L.`int`,\n         |  L.`double`,\n         |  L.`float`,\n         |  L.`bigdec`,\n         |  L.`string`,\n         |  L.`name`,\n         |  CAST(L.`rowtime` AS STRING),\n         |  L.window_start,\n         |  L.window_end,\n         |  L.window_time\n         |FROM (\n         |  SELECT *\n         |    FROM TABLE(\n         |    TUMBLE(TABLE T1, DESCRIPTOR(rowtime), INTERVAL '5' SECOND))\n         |) L WHERE NOT EXISTS (\n         |  SELECT *\n         |    FROM TABLE(\n         |      TUMBLE(TABLE T2, DESCRIPTOR(rowtime), INTERVAL '5' SECOND)) R\n         |  WHERE L.window_start = R.window_start AND L.window_end = R.window_end\n         |        AND L.`name` = R.`name`) AND L.`float` IS NOT NULL\n         |").toString())).stripMargin();
        TestingAppendSink sink = new TestingAppendSink();
        package$.MODULE$.tableConversions(this.tEnv().sqlQuery(sql)).toAppendStream(TypeExtractor.createTypeInfo(Row.class)).addSink((SinkFunction)sink);
        this.env().execute();
        Seq expected = this.useTimestampLtz ? (Seq)Seq$.MODULE$.apply((Seq)Predef$.MODULE$.wrapRefArray((Object[])new String[]{"2020-10-09T16:00:01Z,1,1.0,1.0,1.11,Hi,a,2020-10-10 00:00:01.000,2020-10-10T00:00,2020-10-10T00:00:05,2020-10-09T16:00:04.999Z", "2020-10-09T16:00:02Z,2,2.0,2.0,2.22,Comment#1,a,2020-10-10 00:00:02.000,2020-10-10T00:00,2020-10-10T00:00:05,2020-10-09T16:00:04.999Z", "2020-10-09T16:00:03Z,2,2.0,2.0,2.22,Comment#1,a,2020-10-10 00:00:03.000,2020-10-10T00:00,2020-10-10T00:00:05,2020-10-09T16:00:04.999Z", "2020-10-09T16:00:04Z,5,5.0,5.0,5.55,null,a,2020-10-10 00:00:04.000,2020-10-10T00:00,2020-10-10T00:00:05,2020-10-09T16:00:04.999Z", "2020-10-09T16:00:08Z,3,null,3.0,3.33,Comment#2,a,2020-10-10 00:00:08.000,2020-10-10T00:00:05,2020-10-10T00:00:10,2020-10-09T16:00:09.999Z", "2020-10-09T16:00:32Z,7,7.0,7.0,7.77,null,null,2020-10-10 00:00:32.000,2020-10-10T00:00:30,2020-10-10T00:00:35,2020-10-09T16:00:34.999Z"})) : (Seq)Seq$.MODULE$.apply((Seq)Predef$.MODULE$.wrapRefArray((Object[])new String[]{"2020-10-10T00:00:01,1,1.0,1.0,1.11,Hi,a,2020-10-10 00:00:01.000,2020-10-10T00:00,2020-10-10T00:00:05,2020-10-10T00:00:04.999", "2020-10-10T00:00:02,2,2.0,2.0,2.22,Comment#1,a,2020-10-10 00:00:02.000,2020-10-10T00:00,2020-10-10T00:00:05,2020-10-10T00:00:04.999", "2020-10-10T00:00:03,2,2.0,2.0,2.22,Comment#1,a,2020-10-10 00:00:03.000,2020-10-10T00:00,2020-10-10T00:00:05,2020-10-10T00:00:04.999", "2020-10-10T00:00:04,5,5.0,5.0,5.55,null,a,2020-10-10 00:00:04.000,2020-10-10T00:00,2020-10-10T00:00:05,2020-10-10T00:00:04.999", "2020-10-10T00:00:08,3,null,3.0,3.33,Comment#2,a,2020-10-10 00:00:08.000,2020-10-10T00:00:05,2020-10-10T00:00:10,2020-10-10T00:00:09.999", "2020-10-10T00:00:32,7,7.0,7.0,7.77,null,null,2020-10-10 00:00:32.000,2020-10-10T00:00:30,2020-10-10T00:00:35,2020-10-10T00:00:34.999"}));
        Assert.assertEquals((Object)((TraversableOnce)expected.sorted((Ordering)Ordering.String$.MODULE$)).mkString("\n"), (Object)((TraversableOnce)sink.getAppendResults().sorted((Ordering)Ordering.String$.MODULE$)).mkString("\n"));
    }

    @Test
    public void testAntiJoinNotIN() {
        String sql = new StringOps(Predef$.MODULE$.augmentString("\n        |SELECT * FROM (\n        |  SELECT\n        |    `name`,\n        |    window_start,\n        |    window_end,\n        |    COUNT(DISTINCT `string`) as uv1\n        |    FROM TABLE(\n        |    TUMBLE(TABLE T1, DESCRIPTOR(rowtime), INTERVAL '5' SECOND))\n        |  GROUP BY `name`, window_start, window_end\n        |) L WHERE L.`name` NOT IN (\n        |SELECT `name` FROM(\n        |  SELECT\n        |    `name`,\n        |    window_start,\n        |    window_end,\n        |    COUNT(DISTINCT `string`) as uv2\n        |    FROM TABLE(\n        |      TUMBLE(TABLE T2, DESCRIPTOR(rowtime), INTERVAL '5' SECOND))\n        |    GROUP BY `name`, window_start, window_end\n        |) R\n        |  WHERE L.window_start = R.window_start AND L.window_end = R.window_end)\n        |")).stripMargin();
        TestingAppendSink sink = new TestingAppendSink();
        package$.MODULE$.tableConversions(this.tEnv().sqlQuery(sql)).toAppendStream(TypeExtractor.createTypeInfo(Row.class)).addSink((SinkFunction)sink);
        this.env().execute();
        Seq expected = (Seq)Seq$.MODULE$.apply((Seq)Predef$.MODULE$.wrapRefArray((Object[])new String[]{"a,2020-10-10T00:00,2020-10-10T00:00:05,2", "a,2020-10-10T00:00:05,2020-10-10T00:00:10,1"}));
        Assert.assertEquals((Object)((TraversableOnce)expected.sorted((Ordering)Ordering.String$.MODULE$)).mkString("\n"), (Object)((TraversableOnce)sink.getAppendResults().sorted((Ordering)Ordering.String$.MODULE$)).mkString("\n"));
    }

    @Test
    public void testAntiJoinNotIN_WTF() {
        String sql = new StringOps(Predef$.MODULE$.augmentString(new StringBuilder(714).append("\n         |SELECT\n         |  ").append((Object)(this.useTimestampLtz ? "TO_TIMESTAMP_LTZ(L.`ts`, 3)" : "TO_TIMESTAMP(L.`ts`)")).append(",\n         |  L.`int`,\n         |  L.`double`,\n         |  L.`float`,\n         |  L.`bigdec`,\n         |  L.`string`,\n         |  L.`name`,\n         |  CAST(L.`rowtime` AS STRING),\n         |  L.window_start,\n         |  L.window_end,\n         |  L.window_time\n         |  FROM (\n         |  SELECT *\n         |    FROM TABLE(\n         |    TUMBLE(TABLE T1, DESCRIPTOR(rowtime), INTERVAL '5' SECOND))\n         |) L WHERE L.`name` NOT IN (\n         |SELECT `name`\n         |FROM\n         |TABLE(TUMBLE(TABLE T2, DESCRIPTOR(rowtime), INTERVAL '5' SECOND)) R\n         |WHERE L.window_start = R.window_start AND L.window_end = R.window_end) AND\n         | L.`float` IS NOT NULL\n         |").toString())).stripMargin();
        TestingAppendSink sink = new TestingAppendSink();
        package$.MODULE$.tableConversions(this.tEnv().sqlQuery(sql)).toAppendStream(TypeExtractor.createTypeInfo(Row.class)).addSink((SinkFunction)sink);
        this.env().execute();
        Seq expected = this.useTimestampLtz ? (Seq)Seq$.MODULE$.apply((Seq)Predef$.MODULE$.wrapRefArray((Object[])new String[]{"2020-10-09T16:00:01Z,1,1.0,1.0,1.11,Hi,a,2020-10-10 00:00:01.000,2020-10-10T00:00,2020-10-10T00:00:05,2020-10-09T16:00:04.999Z", "2020-10-09T16:00:02Z,2,2.0,2.0,2.22,Comment#1,a,2020-10-10 00:00:02.000,2020-10-10T00:00,2020-10-10T00:00:05,2020-10-09T16:00:04.999Z", "2020-10-09T16:00:03Z,2,2.0,2.0,2.22,Comment#1,a,2020-10-10 00:00:03.000,2020-10-10T00:00,2020-10-10T00:00:05,2020-10-09T16:00:04.999Z", "2020-10-09T16:00:04Z,5,5.0,5.0,5.55,null,a,2020-10-10 00:00:04.000,2020-10-10T00:00,2020-10-10T00:00:05,2020-10-09T16:00:04.999Z", "2020-10-09T16:00:08Z,3,null,3.0,3.33,Comment#2,a,2020-10-10 00:00:08.000,2020-10-10T00:00:05,2020-10-10T00:00:10,2020-10-09T16:00:09.999Z"})) : (Seq)Seq$.MODULE$.apply((Seq)Predef$.MODULE$.wrapRefArray((Object[])new String[]{"2020-10-10T00:00:01,1,1.0,1.0,1.11,Hi,a,2020-10-10 00:00:01.000,2020-10-10T00:00,2020-10-10T00:00:05,2020-10-10T00:00:04.999", "2020-10-10T00:00:02,2,2.0,2.0,2.22,Comment#1,a,2020-10-10 00:00:02.000,2020-10-10T00:00,2020-10-10T00:00:05,2020-10-10T00:00:04.999", "2020-10-10T00:00:03,2,2.0,2.0,2.22,Comment#1,a,2020-10-10 00:00:03.000,2020-10-10T00:00,2020-10-10T00:00:05,2020-10-10T00:00:04.999", "2020-10-10T00:00:04,5,5.0,5.0,5.55,null,a,2020-10-10 00:00:04.000,2020-10-10T00:00,2020-10-10T00:00:05,2020-10-10T00:00:04.999", "2020-10-10T00:00:08,3,null,3.0,3.33,Comment#2,a,2020-10-10 00:00:08.000,2020-10-10T00:00:05,2020-10-10T00:00:10,2020-10-10T00:00:09.999"}));
        Assert.assertEquals((Object)((TraversableOnce)expected.sorted((Ordering)Ordering.String$.MODULE$)).mkString("\n"), (Object)((TraversableOnce)sink.getAppendResults().sorted((Ordering)Ordering.String$.MODULE$)).mkString("\n"));
    }

    @Test
    public void testLeftJoin() {
        String sql = new StringOps(Predef$.MODULE$.augmentString("\n        |SELECT L.`name`, L.window_start, L.window_end, uv1, uv2\n        |FROM (\n        |SELECT\n        |  `name`,\n        |  window_start,\n        |  window_end,\n        |  COUNT(DISTINCT `string`) as uv1\n        |FROM TABLE(\n        |   TUMBLE(TABLE T1, DESCRIPTOR(rowtime), INTERVAL '5' SECOND))\n        |GROUP BY `name`, window_start, window_end\n        |) L\n        |LEFT JOIN (\n        |SELECT\n        |  `name`,\n        |  window_start,\n        |  window_end,\n        |  COUNT(DISTINCT `string`) as uv2\n        |FROM TABLE(\n        |   TUMBLE(TABLE T2, DESCRIPTOR(rowtime), INTERVAL '5' SECOND))\n        |GROUP BY `name`, window_start, window_end\n        |) R\n        |ON L.window_start = R.window_start AND L.window_end = R.window_end AND L.`name` = R.`name`\n        |")).stripMargin();
        TestingAppendSink sink = new TestingAppendSink();
        package$.MODULE$.tableConversions(this.tEnv().sqlQuery(sql)).toAppendStream(TypeExtractor.createTypeInfo(Row.class)).addSink((SinkFunction)sink);
        this.env().execute();
        Seq expected = (Seq)Seq$.MODULE$.apply((Seq)Predef$.MODULE$.wrapRefArray((Object[])new String[]{"a,2020-10-10T00:00,2020-10-10T00:00:05,2,null", "a,2020-10-10T00:00:05,2020-10-10T00:00:10,1,null", "b,2020-10-10T00:00:05,2020-10-10T00:00:10,2,2", "b,2020-10-10T00:00:15,2020-10-10T00:00:20,1,1", "b,2020-10-10T00:00:30,2020-10-10T00:00:35,1,1", "null,2020-10-10T00:00:30,2020-10-10T00:00:35,0,null"}));
        Assert.assertEquals((Object)((TraversableOnce)expected.sorted((Ordering)Ordering.String$.MODULE$)).mkString("\n"), (Object)((TraversableOnce)sink.getAppendResults().sorted((Ordering)Ordering.String$.MODULE$)).mkString("\n"));
    }

    @Test
    public void testLeftJoinWithIsNotDistinctFrom() {
        String sql = new StringOps(Predef$.MODULE$.augmentString("\n        |SELECT L.`name`, L.window_start, L.window_end, uv1, uv2\n        |FROM (\n        |SELECT\n        |  `name`,\n        |  window_start,\n        |  window_end,\n        |  COUNT(DISTINCT `string`) as uv1\n        |FROM TABLE(\n        |   TUMBLE(TABLE T1, DESCRIPTOR(rowtime), INTERVAL '5' SECOND))\n        |GROUP BY `name`, window_start, window_end\n        |) L\n        |LEFT JOIN (\n        |SELECT\n        |  `name`,\n        |  window_start,\n        |  window_end,\n        |  COUNT(DISTINCT `string`) as uv2\n        |FROM TABLE(\n        |   TUMBLE(TABLE T2, DESCRIPTOR(rowtime), INTERVAL '5' SECOND))\n        |GROUP BY `name`, window_start, window_end\n        |) R\n        |ON L.window_start = R.window_start AND L.window_end = R.window_end AND\n        |   L.`name` IS NOT DISTINCT from R.`name`\n        |")).stripMargin();
        TestingAppendSink sink = new TestingAppendSink();
        package$.MODULE$.tableConversions(this.tEnv().sqlQuery(sql)).toAppendStream(TypeExtractor.createTypeInfo(Row.class)).addSink((SinkFunction)sink);
        this.env().execute();
        Seq expected = (Seq)Seq$.MODULE$.apply((Seq)Predef$.MODULE$.wrapRefArray((Object[])new String[]{"a,2020-10-10T00:00,2020-10-10T00:00:05,2,null", "a,2020-10-10T00:00:05,2020-10-10T00:00:10,1,null", "b,2020-10-10T00:00:05,2020-10-10T00:00:10,2,2", "b,2020-10-10T00:00:15,2020-10-10T00:00:20,1,1", "b,2020-10-10T00:00:30,2020-10-10T00:00:35,1,1", "null,2020-10-10T00:00:30,2020-10-10T00:00:35,0,0"}));
        Assert.assertEquals((Object)((TraversableOnce)expected.sorted((Ordering)Ordering.String$.MODULE$)).mkString("\n"), (Object)((TraversableOnce)sink.getAppendResults().sorted((Ordering)Ordering.String$.MODULE$)).mkString("\n"));
    }

    @Test
    public void testRightJoin() {
        String sql = new StringOps(Predef$.MODULE$.augmentString("\n        |SELECT L.`name`, R.window_start, R.window_end, uv1, uv2\n        |FROM (\n        |SELECT\n        |  `name`,\n        |  window_start,\n        |  window_end,\n        |  COUNT(DISTINCT `string`) as uv1\n        |FROM TABLE(\n        |   TUMBLE(TABLE T1, DESCRIPTOR(rowtime), INTERVAL '5' SECOND))\n        |GROUP BY `name`, window_start, window_end\n        |) L\n        |RIGHT JOIN (\n        |SELECT\n        |  `name`,\n        |  window_start,\n        |  window_end,\n        |  COUNT(DISTINCT `string`) as uv2\n        |FROM TABLE(\n        |   TUMBLE(TABLE T2, DESCRIPTOR(rowtime), INTERVAL '5' SECOND))\n        |GROUP BY `name`, window_start, window_end\n        |) R\n        |ON L.window_start = R.window_start AND L.window_end = R.window_end AND L.`name` = R.`name`\n        |")).stripMargin();
        TestingAppendSink sink = new TestingAppendSink();
        package$.MODULE$.tableConversions(this.tEnv().sqlQuery(sql)).toAppendStream(TypeExtractor.createTypeInfo(Row.class)).addSink((SinkFunction)sink);
        this.env().execute();
        Seq expected = (Seq)Seq$.MODULE$.apply((Seq)Predef$.MODULE$.wrapRefArray((Object[])new String[]{"null,2020-10-10T00:00,2020-10-10T00:00:05,null,2", "null,2020-10-10T00:00:05,2020-10-10T00:00:10,null,1", "b,2020-10-10T00:00:05,2020-10-10T00:00:10,2,2", "b,2020-10-10T00:00:15,2020-10-10T00:00:20,1,1", "b,2020-10-10T00:00:30,2020-10-10T00:00:35,1,1", "null,2020-10-10T00:00:30,2020-10-10T00:00:35,null,0"}));
        Assert.assertEquals((Object)((TraversableOnce)expected.sorted((Ordering)Ordering.String$.MODULE$)).mkString("\n"), (Object)((TraversableOnce)sink.getAppendResults().sorted((Ordering)Ordering.String$.MODULE$)).mkString("\n"));
    }

    @Test
    public void testRightJoinWithIsNotDistinctFrom() {
        String sql = new StringOps(Predef$.MODULE$.augmentString("\n        |SELECT L.`name`, R.window_start, R.window_end, uv1, uv2\n        |FROM (\n        |SELECT\n        |  `name`,\n        |  window_start,\n        |  window_end,\n        |  COUNT(DISTINCT `string`) as uv1\n        |FROM TABLE(\n        |   TUMBLE(TABLE T1, DESCRIPTOR(rowtime), INTERVAL '5' SECOND))\n        |GROUP BY `name`, window_start, window_end\n        |) L\n        |RIGHT JOIN (\n        |SELECT\n        |  `name`,\n        |  window_start,\n        |  window_end,\n        |  COUNT(DISTINCT `string`) as uv2\n        |FROM TABLE(\n        |   TUMBLE(TABLE T2, DESCRIPTOR(rowtime), INTERVAL '5' SECOND))\n        |GROUP BY `name`, window_start, window_end\n        |) R\n        |ON L.window_start = R.window_start AND L.window_end = R.window_end AND\n        |   L.`name` IS NOT DISTINCT from R.`name`\n        |")).stripMargin();
        TestingAppendSink sink = new TestingAppendSink();
        package$.MODULE$.tableConversions(this.tEnv().sqlQuery(sql)).toAppendStream(TypeExtractor.createTypeInfo(Row.class)).addSink((SinkFunction)sink);
        this.env().execute();
        Seq expected = (Seq)Seq$.MODULE$.apply((Seq)Predef$.MODULE$.wrapRefArray((Object[])new String[]{"null,2020-10-10T00:00,2020-10-10T00:00:05,null,2", "null,2020-10-10T00:00:05,2020-10-10T00:00:10,null,1", "b,2020-10-10T00:00:05,2020-10-10T00:00:10,2,2", "b,2020-10-10T00:00:15,2020-10-10T00:00:20,1,1", "b,2020-10-10T00:00:30,2020-10-10T00:00:35,1,1", "null,2020-10-10T00:00:30,2020-10-10T00:00:35,0,0"}));
        Assert.assertEquals((Object)((TraversableOnce)expected.sorted((Ordering)Ordering.String$.MODULE$)).mkString("\n"), (Object)((TraversableOnce)sink.getAppendResults().sorted((Ordering)Ordering.String$.MODULE$)).mkString("\n"));
    }

    @Test
    public void testOuterJoin() {
        String sql = new StringOps(Predef$.MODULE$.augmentString("\n        |SELECT L.`name`, L.window_start, L.window_end, R.`name`, R.window_start, R.window_end,\n        |uv1, uv2\n        |FROM (\n        |SELECT\n        |  `name`,\n        |  window_start,\n        |  window_end,\n        |  COUNT(DISTINCT `string`) as uv1\n        |FROM TABLE(\n        |   TUMBLE(TABLE T1, DESCRIPTOR(rowtime), INTERVAL '5' SECOND))\n        |GROUP BY `name`, window_start, window_end\n        |) L\n        |FULL OUTER JOIN (\n        |SELECT\n        |  `name`,\n        |  window_start,\n        |  window_end,\n        |  COUNT(DISTINCT `string`) as uv2\n        |FROM TABLE(\n        |   TUMBLE(TABLE T2, DESCRIPTOR(rowtime), INTERVAL '5' SECOND))\n        |GROUP BY `name`, window_start, window_end\n        |) R\n        |ON L.window_start = R.window_start AND L.window_end = R.window_end AND L.`name` = R.`name`\n        |")).stripMargin();
        TestingAppendSink sink = new TestingAppendSink();
        package$.MODULE$.tableConversions(this.tEnv().sqlQuery(sql)).toAppendStream(TypeExtractor.createTypeInfo(Row.class)).addSink((SinkFunction)sink);
        this.env().execute();
        Seq expected = (Seq)Seq$.MODULE$.apply((Seq)Predef$.MODULE$.wrapRefArray((Object[])new String[]{"a,2020-10-10T00:00,2020-10-10T00:00:05,null,null,null,2,null", "a,2020-10-10T00:00:05,2020-10-10T00:00:10,null,null,null,1,null", "b,2020-10-10T00:00:05,2020-10-10T00:00:10,b,2020-10-10T00:00:05,2020-10-10T00:00:10,2,2", "b,2020-10-10T00:00:15,2020-10-10T00:00:20,b,2020-10-10T00:00:15,2020-10-10T00:00:20,1,1", "b,2020-10-10T00:00:30,2020-10-10T00:00:35,b,2020-10-10T00:00:30,2020-10-10T00:00:35,1,1", "null,2020-10-10T00:00:30,2020-10-10T00:00:35,null,null,null,0,null", "null,null,null,a1,2020-10-10T00:00,2020-10-10T00:00:05,null,2", "null,null,null,a1,2020-10-10T00:00:05,2020-10-10T00:00:10,null,1", "null,null,null,null,2020-10-10T00:00:30,2020-10-10T00:00:35,null,0"}));
        Assert.assertEquals((Object)((TraversableOnce)expected.sorted((Ordering)Ordering.String$.MODULE$)).mkString("\n"), (Object)((TraversableOnce)sink.getAppendResults().sorted((Ordering)Ordering.String$.MODULE$)).mkString("\n"));
    }

    @Test
    public void testOuterJoinWithIsNotDistinctFrom() {
        String sql = new StringOps(Predef$.MODULE$.augmentString("\n        |SELECT L.`name`, L.window_start, L.window_end, R.`name`, R.window_start, R.window_end,\n        |uv1, uv2\n        |FROM (\n        |SELECT\n        |  `name`,\n        |  window_start,\n        |  window_end,\n        |  COUNT(DISTINCT `string`) as uv1\n        |FROM TABLE(\n        |   TUMBLE(TABLE T1, DESCRIPTOR(rowtime), INTERVAL '5' SECOND))\n        |GROUP BY `name`, window_start, window_end\n        |) L\n        |FULL OUTER JOIN (\n        |SELECT\n        |  `name`,\n        |  window_start,\n        |  window_end,\n        |  COUNT(DISTINCT `string`) as uv2\n        |FROM TABLE(\n        |   TUMBLE(TABLE T2, DESCRIPTOR(rowtime), INTERVAL '5' SECOND))\n        |GROUP BY `name`, window_start, window_end\n        |) R\n        |ON L.window_start = R.window_start AND L.window_end = R.window_end AND\n        |   L.`name` IS NOT DISTINCT from R.`name`\n        |")).stripMargin();
        TestingAppendSink sink = new TestingAppendSink();
        package$.MODULE$.tableConversions(this.tEnv().sqlQuery(sql)).toAppendStream(TypeExtractor.createTypeInfo(Row.class)).addSink((SinkFunction)sink);
        this.env().execute();
        Seq expected = (Seq)Seq$.MODULE$.apply((Seq)Predef$.MODULE$.wrapRefArray((Object[])new String[]{"a,2020-10-10T00:00,2020-10-10T00:00:05,null,null,null,2,null", "a,2020-10-10T00:00:05,2020-10-10T00:00:10,null,null,null,1,null", "b,2020-10-10T00:00:05,2020-10-10T00:00:10,b,2020-10-10T00:00:05,2020-10-10T00:00:10,2,2", "b,2020-10-10T00:00:15,2020-10-10T00:00:20,b,2020-10-10T00:00:15,2020-10-10T00:00:20,1,1", "b,2020-10-10T00:00:30,2020-10-10T00:00:35,b,2020-10-10T00:00:30,2020-10-10T00:00:35,1,1", "null,2020-10-10T00:00:30,2020-10-10T00:00:35,null,2020-10-10T00:00:30,2020-10-10T00:00:35,0,0", "null,null,null,a1,2020-10-10T00:00,2020-10-10T00:00:05,null,2", "null,null,null,a1,2020-10-10T00:00:05,2020-10-10T00:00:10,null,1"}));
        Assert.assertEquals((Object)((TraversableOnce)expected.sorted((Ordering)Ordering.String$.MODULE$)).mkString("\n"), (Object)((TraversableOnce)sink.getAppendResults().sorted((Ordering)Ordering.String$.MODULE$)).mkString("\n"));
    }

    public WindowJoinITCase(StreamingWithStateTestBase.StateBackendMode mode, boolean useTimestampLtz) {
        this.useTimestampLtz = useTimestampLtz;
        super(mode);
        this.SHANGHAI_ZONE = ZoneId.of("Asia/Shanghai");
    }
}

