/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.table.planner.runtime.stream.sql;

import java.time.ZoneId;
import java.util.Collection;
import org.apache.flink.api.common.restartstrategy.RestartStrategies;
import org.apache.flink.streaming.api.CheckpointingMode;
import org.apache.flink.streaming.api.functions.sink.SinkFunction;
import org.apache.flink.table.api.bridge.scala.package$;
import org.apache.flink.table.planner.factories.TestValuesTableFactory;
import org.apache.flink.table.planner.runtime.stream.sql.WindowJoinITCase$;
import org.apache.flink.table.planner.runtime.utils.FailingCollectionSource;
import org.apache.flink.table.planner.runtime.utils.StreamingWithStateTestBase;
import org.apache.flink.table.planner.runtime.utils.TestData$;
import org.apache.flink.table.planner.runtime.utils.TestingAppendSink;
import org.apache.flink.testutils.junit.extensions.parameterized.ParameterizedTestExtension;
import org.apache.flink.testutils.junit.extensions.parameterized.Parameters;
import org.assertj.core.api.Assertions;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.TestTemplate;
import org.junit.jupiter.api.extension.ExtendWith;
import scala.Predef$;
import scala.collection.Seq;
import scala.collection.Seq$;
import scala.collection.TraversableOnce;
import scala.collection.immutable.StringOps;
import scala.math.Ordering;
import scala.reflect.ScalaSignature;

@ExtendWith(value={ParameterizedTestExtension.class})
@ScalaSignature(bytes="\u0006\u0001\u0005Ef\u0001B\u0001\u0003\u0001M\u0011\u0001cV5oI><(j\\5o\u0013R\u001b\u0015m]3\u000b\u0005\r!\u0011aA:rY*\u0011QAB\u0001\u0007gR\u0014X-Y7\u000b\u0005\u001dA\u0011a\u0002:v]RLW.\u001a\u0006\u0003\u0013)\tq\u0001\u001d7b]:,'O\u0003\u0002\f\u0019\u0005)A/\u00192mK*\u0011QBD\u0001\u0006M2Lgn\u001b\u0006\u0003\u001fA\ta!\u00199bG\",'\"A\t\u0002\u0007=\u0014xm\u0001\u0001\u0014\u0005\u0001!\u0002CA\u000b\u0019\u001b\u00051\"BA\f\u0007\u0003\u0015)H/\u001b7t\u0013\tIbC\u0001\u000eTiJ,\u0017-\\5oO^KG\u000f[*uCR,G+Z:u\u0005\u0006\u001cX\r\u0003\u0005\u001c\u0001\t\u0005\t\u0015!\u0003\u001d\u0003\u0011iw\u000eZ3\u0011\u0005u\tdB\u0001\u00100\u001d\tybF\u0004\u0002![9\u0011\u0011\u0005\f\b\u0003E-r!a\t\u0016\u000f\u0005\u0011JcBA\u0013)\u001b\u00051#BA\u0014\u0013\u0003\u0019a$o\\8u}%\t\u0011#\u0003\u0002\u0010!%\u0011QBD\u0005\u0003\u00171I!!\u0003\u0006\n\u0005\u001dA\u0011BA\f\u0007\u0013\t\u0001d#\u0001\u000eTiJ,\u0017-\\5oO^KG\u000f[*uCR,G+Z:u\u0005\u0006\u001cX-\u0003\u00023g\t\u00012\u000b^1uK\n\u000b7m[3oI6{G-\u001a\u0006\u0003aYA\u0001\"\u000e\u0001\u0003\u0002\u0003\u0006IAN\u0001\u0010kN,G+[7fgR\fW\u000e\u001d'uuB\u0011qGO\u0007\u0002q)\t\u0011(A\u0003tG\u0006d\u0017-\u0003\u0002<q\t9!i\\8mK\u0006t\u0007\"B\u001f\u0001\t\u0003q\u0014A\u0002\u001fj]&$h\bF\u0002@\u0003\n\u0003\"\u0001\u0011\u0001\u000e\u0003\tAQa\u0007\u001fA\u0002qAQ!\u000e\u001fA\u0002YBq\u0001\u0012\u0001C\u0002\u0013\u0005Q)A\u0007T\u0011\u0006su\tS!J?j{e*R\u000b\u0002\rB\u0011q\tT\u0007\u0002\u0011*\u0011\u0011JS\u0001\u0005i&lWMC\u0001L\u0003\u0011Q\u0017M^1\n\u00055C%A\u0002.p]\u0016LE\r\u0003\u0004P\u0001\u0001\u0006IAR\u0001\u000f'\"\u000bej\u0012%B\u0013~SvJT#!\u0011\u0015\t\u0006\u0001\"\u0011S\u0003\u0019\u0011WMZ8sKR\t1\u000b\u0005\u00028)&\u0011Q\u000b\u000f\u0002\u0005+:LG\u000f\u000b\u0002Q/B\u0011\u0001lX\u0007\u00023*\u0011!lW\u0001\u0004CBL'B\u0001/^\u0003\u001dQW\u000f]5uKJT!A\u0018\t\u0002\u000b),h.\u001b;\n\u0005\u0001L&A\u0003\"fM>\u0014X-
R1dQ\")!\r\u0001C\u0001%\u0006iA/Z:u\u0013:tWM\u001d&pS:D#!\u00193\u0011\u0005a+\u0017B\u00014Z\u00051!Vm\u001d;UK6\u0004H.\u0019;f\u0011\u0015A\u0007\u0001\"\u0001S\u0003I!Xm\u001d;J]:,'OS8j]>sw\u000b\u0016$)\u0005\u001d$\u0007\"B6\u0001\t\u0003\u0011\u0016\u0001\b;fgRLeN\\3s\u0015>Lgn\u00148X)\u001a;\u0016\u000e\u001e5PM\u001a\u001cX\r\u001e\u0015\u0003U\u0012DQA\u001c\u0001\u0005\u0002I\u000bA\u0005^3ti&sg.\u001a:K_&twJ\\,U\r^KG\u000f\u001b(fO\u0006$\u0018N^3PM\u001a\u001cX\r\u001e\u0015\u0003[\u0012DQ!\u001d\u0001\u0005\u0002I\u000b!\u0005^3ti&sg.\u001a:K_&tw+\u001b;i\u0013Ntu\u000e\u001e#jgRLgn\u0019;Ge>l\u0007F\u00019e\u0011\u0015!\b\u0001\"\u0001S\u0003\u001d\"Xm\u001d;J]:,'OS8j]^KG\u000f[%t\u001d>$H)[:uS:\u001cGO\u0012:p[>sw\u000b\u0016$)\u0005M$\u0007\"B<\u0001\t\u0003\u0011\u0016A\u0005;fgR\u001cV-\\5K_&tW\t_5tiND#A\u001e3\t\u000bi\u0004A\u0011\u0001*\u0002+Q,7\u000f^*f[&Tu.\u001b8Fq&\u001cHo],U\r\"\u0012\u0011\u0010\u001a\u0005\u0006{\u0002!\tAU\u0001\u000fi\u0016\u001cHoU3nS*{\u0017N\\%OQ\taH\r\u0003\u0004\u0002\u0002\u0001!\tAU\u0001\u0013i\u0016\u001cHoU3nS*{\u0017N\\%O?^#f\t\u000b\u0002\u0000I\"1\u0011q\u0001\u0001\u0005\u0002I\u000bQ\u0003^3ti\u0006sG/\u001b&pS:tu\u000e^#ySN$8\u000fK\u0002\u0002\u0006\u0011Da!!\u0004\u0001\t\u0003\u0011\u0016\u0001\u0007;fgR\fe\u000e^5K_&tgj\u001c;Fq&\u001cHo],U\r\"\u001a\u00111\u00023\t\r\u0005M\u0001\u0001\"\u0001S\u0003E!Xm\u001d;B]RL'j\\5o\u001d>$\u0018J\u0014\u0015\u0004\u0003#!\u0007BBA\r\u0001\u0011\u0005!+A\u000buKN$\u0018I\u001c;j\u0015>LgNT8u\u0013:{v\u000b\u0016$)\u0007\u0005]A\r\u0003\u0004\u0002 
\u0001!\tAU\u0001\ri\u0016\u001cH\u000fT3gi*{\u0017N\u001c\u0015\u0004\u0003;!\u0007BBA\u0013\u0001\u0011\u0005!+A\u0011uKN$H*\u001a4u\u0015>LgnV5uQ&\u001bhj\u001c;ESN$\u0018N\\2u\rJ|W\u000eK\u0002\u0002$\u0011Da!a\u000b\u0001\t\u0003\u0011\u0016!\u0004;fgR\u0014\u0016n\u001a5u\u0015>Lg\u000eK\u0002\u0002*\u0011Da!!\r\u0001\t\u0003\u0011\u0016A\t;fgR\u0014\u0016n\u001a5u\u0015>LgnV5uQ&\u001bhj\u001c;ESN$\u0018N\\2u\rJ|W\u000eK\u0002\u00020\u0011Da!a\u000e\u0001\t\u0003\u0011\u0016!\u0004;fgR|U\u000f^3s\u0015>Lg\u000eK\u0002\u00026\u0011Da!!\u0010\u0001\t\u0003\u0011\u0016A\t;fgR|U\u000f^3s\u0015>LgnV5uQ&\u001bhj\u001c;ESN$\u0018N\\2u\rJ|W\u000eK\u0002\u0002<\u0011Ds\u0001AA\"\u0003\u001f\n\t\u0006\u0005\u0003\u0002F\u0005-SBAA$\u0015\r\tI%W\u0001\nKb$XM\\:j_:LA!!\u0014\u0002H\tQQ\t\u001f;f]\u0012<\u0016\u000e\u001e5\u0002\u000bY\fG.^3-\u0005\u0005M3EAA+!\u0011\t9&a\u001a\u000e\u0005\u0005e#\u0002BA.\u0003;\nQ\u0002]1sC6,G/\u001a:ju\u0016$'\u0002BA0\u0003C\n!\"\u001a=uK:\u001c\u0018n\u001c8t\u0015\rq\u00161\r\u0006\u0004\u0003Kb\u0011!\u0003;fgR,H/\u001b7t\u0013\u0011\tI'!\u0017\u00035A\u000b'/Y7fi\u0016\u0014\u0018N_3e)\u0016\u001cH/\u0012=uK:\u001c\u0018n\u001c8\b\u000f\u00055$\u0001#\u0001\u0002p\u0005\u0001r+\u001b8e_^Tu.\u001b8J)\u000e\u000b7/\u001a\t\u0004\u0001\u0006EdAB\u0001\u0003\u0011\u0003\t\u0019h\u0005\u0003\u0002r\u0005U\u0004cA\u001c\u0002x%\u0019\u0011\u0011\u0010\u001d\u0003\r\u0005s\u0017PU3g\u0011\u001di\u0014\u0011\u000fC\u0001\u0003{\"\"!a\u001c\t\u0011\u0005\u0005\u0015\u0011\u000fC\u0001\u0003\u0007\u000b!\u0002]1sC6,G/\u001a:t)\t\t)\t\u0005\u0004\u0002\b\u00065\u0015\u0011S\u0007\u0003\u0003\u0013S1!a#K\u0003\u0011)H/\u001b7\n\t\u0005=\u0015\u0011\u0012\u0002\u000b\u0007>dG.Z2uS>t\u0007#B\u001c\u0002\u0014\u0006]\u0015bAAKq\t)\u0011I\u001d:bsB!\u0011\u0011TAP\u001b\t\tYJC\u0002\u0002\u001e*\u000bA\u0001\\1oO&!\u0011\u0011UAN\u0005\u0019y%M[3di\"B\u0011qPAS\u0003W\u000bi\u000b\u0005\u0003\u0002X\u0005\u001d\u0016\u0002BAU\u00033\u0012!\u0002U1
sC6,G/\u001a:t\u0003\u0011q\u0017-\\3\"\u0005\u0005=\u0016aJ*uCR,')Y2lK:$Wh\u001f\u0019~Y\u0001*6/\u001a+j[\u0016\u001cH/Y7q\u0019RT\b%\u0010\u0011|cu\u0004")
public class WindowJoinITCase
extends StreamingWithStateTestBase {
    private final boolean useTimestampLtz;
    private final ZoneId SHANGHAI_ZONE;

    /**
     * Supplies the argument sets for the parameterized test templates of this class.
     *
     * <p>The values are produced by the Scala companion object; the display name pattern
     * shows that each entry carries a state backend mode and the {@code useTimestampLtz} flag.
     *
     * @return the parameter combinations, one {@code Object[]} per invocation
     */
    @Parameters(name="StateBackend={0}, UseTimestampLtz = {1}")
    public static Collection<Object[]> parameters() {
        Collection<Object[]> combinations = WindowJoinITCase$.MODULE$.parameters();
        return combinations;
    }

    /**
     * Accessor for the zone id stored in the {@code SHANGHAI_ZONE} field.
     *
     * @return the zone id that {@link #before()} installs as the session's local time zone
     */
    public ZoneId SHANGHAI_ZONE() {
        return SHANGHAI_ZONE;
    }

    /**
     * Prepares the environment shared by every test template in this class.
     *
     * <p>After the parent set-up it enables exactly-once checkpointing (interval argument 100),
     * installs a fixed-delay restart strategy (1 attempt, 0 delay), resets
     * {@link FailingCollectionSource}, registers the test data sets and creates the tables
     * {@code T1} and {@code T2} on the {@code values} connector with
     * {@code 'failing-source' = 'true'}. The type of the {@code ts} column, the {@code rowtime}
     * expression and the registered data set all depend on {@code useTimestampLtz}. Finally the
     * session's local time zone is set to {@link #SHANGHAI_ZONE()}.
     */
    @Override
    @BeforeEach
    public void before() {
        super.before();
        env().enableCheckpointing(100L, CheckpointingMode.EXACTLY_ONCE);
        env().setRestartStrategy(RestartStrategies.fixedDelayRestart((int)1, (long)0L));
        FailingCollectionSource.reset();

        // Fragments that differ between the TIMESTAMP and TIMESTAMP_LTZ variants of the tables.
        String tsColumnType = useTimestampLtz ? "BIGINT" : "STRING";
        String rowtimeExpression = useTimestampLtz ? "TO_TIMESTAMP_LTZ(`ts`, 3)" : "TO_TIMESTAMP(`ts`)";

        // ---- T1 ----
        String plainDataId1 = TestValuesTableFactory.registerData(TestData$.MODULE$.windowDataWithTimestamp());
        String ltzDataId1 = TestValuesTableFactory.registerData(TestData$.MODULE$.windowDataWithLtzInShanghai());
        String t1DataId = useTimestampLtz ? ltzDataId1 : plainDataId1;
        String t1Ddl = new StringBuilder(460)
            .append("\n         |CREATE TABLE T1 ("
                + "\n         | `ts` ")
            .append(tsColumnType)
            .append(","
                + "\n         | `int` INT,"
                + "\n         | `double` DOUBLE,"
                + "\n         | `float` FLOAT,"
                + "\n         | `bigdec` DECIMAL(10, 2),"
                + "\n         | `string` STRING,"
                + "\n         | `name` STRING,"
                + "\n         | `rowtime` AS"
                + "\n         | ")
            .append(rowtimeExpression)
            .append(","
                + "\n         | WATERMARK for `rowtime` AS `rowtime` - INTERVAL '1' SECOND"
                + "\n         |) WITH ("
                + "\n         | 'connector' = 'values',"
                + "\n         | 'data-id' = '")
            .append(t1DataId)
            .append("',"
                + "\n         | 'failing-source' = 'true'"
                + "\n         |)"
                + "\n         |")
            .toString();
        tEnv().executeSql(new StringOps(Predef$.MODULE$.augmentString(t1Ddl)).stripMargin());

        // ---- T2 ----
        String plainDataId2 = TestValuesTableFactory.registerData(TestData$.MODULE$.windowData2WithTimestamp());
        String ltzDataId2 = TestValuesTableFactory.registerData(TestData$.MODULE$.windowData2WithLtzInShanghai());
        String t2DataId = useTimestampLtz ? ltzDataId2 : plainDataId2;
        String t2Ddl = new StringBuilder(460)
            .append("\n         |CREATE TABLE T2 ("
                + "\n         | `ts` ")
            .append(tsColumnType)
            .append(","
                + "\n         | `int` INT,"
                + "\n         | `double` DOUBLE,"
                + "\n         | `float` FLOAT,"
                + "\n         | `bigdec` DECIMAL(10, 2),"
                + "\n         | `string` STRING,"
                + "\n         | `name` STRING,"
                + "\n         | `rowtime` AS"
                + "\n         | ")
            .append(rowtimeExpression)
            .append(","
                + "\n         | WATERMARK for `rowtime` AS `rowtime` - INTERVAL '1' SECOND"
                + "\n         |) WITH ("
                + "\n         | 'connector' = 'values',"
                + "\n         | 'data-id' = '")
            .append(t2DataId)
            .append("',"
                + "\n         | 'failing-source' = 'true'"
                + "\n         |)"
                + "\n         |")
            .toString();
        tEnv().executeSql(new StringOps(Predef$.MODULE$.augmentString(t2Ddl)).stripMargin());

        tEnv().getConfig().setLocalTimeZone(SHANGHAI_ZONE());
    }

    /**
     * Inner window join of two 5-second tumbling-window aggregations over {@code T1} and
     * {@code T2}, joined on {@code window_start}, {@code window_end} and {@code name}.
     *
     * <p>The query result is collected through a {@link TestingAppendSink}; actual and expected
     * rows are sorted and compared as newline-joined text.
     */
    @TestTemplate
    public void testInnerJoin() {
        String query = new StringOps(Predef$.MODULE$.augmentString(
            "\n        |SELECT L.`name`, L.window_start, L.window_end, uv1, uv2"
            + "\n        |FROM ("
            + "\n        |SELECT"
            + "\n        |  `name`,"
            + "\n        |  window_start,"
            + "\n        |  window_end,"
            + "\n        |  COUNT(DISTINCT `string`) as uv1"
            + "\n        |FROM TABLE("
            + "\n        |   TUMBLE(TABLE T1, DESCRIPTOR(rowtime), INTERVAL '5' SECOND))"
            + "\n        |GROUP BY `name`, window_start, window_end"
            + "\n        |) L"
            + "\n        |JOIN ("
            + "\n        |SELECT"
            + "\n        |  `name`,"
            + "\n        |  window_start,"
            + "\n        |  window_end,"
            + "\n        |  COUNT(DISTINCT `string`) as uv2"
            + "\n        |FROM TABLE("
            + "\n        |   TUMBLE(TABLE T2, DESCRIPTOR(rowtime), INTERVAL '5' SECOND))"
            + "\n        |GROUP BY `name`, window_start, window_end"
            + "\n        |) R"
            + "\n        |ON L.window_start = R.window_start AND L.window_end = R.window_end AND L.`name` = R.`name`"
            + "\n        |")).stripMargin();

        TestingAppendSink resultSink = new TestingAppendSink();
        package$.MODULE$
            .tableConversions(tEnv().sqlQuery(query))
            .toDataStream()
            .addSink((SinkFunction)resultSink);
        env().execute();

        String[] expectedRows = new String[]{
            "b,2020-10-10T00:00:05,2020-10-10T00:00:10,2,2",
            "b,2020-10-10T00:00:15,2020-10-10T00:00:20,1,1",
            "b,2020-10-10T00:00:30,2020-10-10T00:00:35,1,1"
        };
        Seq expected = (Seq)Seq$.MODULE$.apply((Seq)Predef$.MODULE$.wrapRefArray((Object[])expectedRows));

        // Compare order-insensitively: both sides are sorted before being joined into text.
        String actualText = ((TraversableOnce)resultSink.getAppendResults().sorted((Ordering)Ordering.String$.MODULE$)).mkString("\n");
        String expectedText = ((TraversableOnce)expected.sorted((Ordering)Ordering.String$.MODULE$)).mkString("\n");
        Assertions.assertThat((String)actualText).isEqualTo(expectedText);
    }

    /**
     * Inner window join applied directly on the 5-second TUMBLE window table functions of
     * {@code T1} and {@code T2} (no aggregation), joined on {@code window_start},
     * {@code window_end} and {@code name}.
     *
     * <p>The first projected column and the expected rows depend on {@code useTimestampLtz}.
     * Actual and expected rows are sorted and compared as newline-joined text.
     */
    @TestTemplate
    public void testInnerJoinOnWTF() {
        String tsProjection = useTimestampLtz ? "TO_TIMESTAMP_LTZ(L.`ts`, 3)" : "TO_TIMESTAMP(L.`ts`)";
        String rawQuery = new StringBuilder(768)
            .append("\n         |SELECT"
                + "\n         |  ")
            .append(tsProjection)
            .append(","
                + "\n         |  L.`int`,"
                + "\n         |  L.`double`,"
                + "\n         |  L.`float`,"
                + "\n         |  L.`bigdec`,"
                + "\n         |  L.`string`,"
                + "\n         |  L.`name`,"
                + "\n         |  CAST(L.`rowtime` AS STRING),"
                + "\n         |  L.window_start,"
                + "\n         |  L.window_end,"
                + "\n         |  L.window_time,"
                + "\n         |  R.`int`,"
                + "\n         |  R.`string`,"
                + "\n         |  R.`name`"
                + "\n         |FROM ("
                + "\n         |SELECT *"
                + "\n         |FROM TABLE("
                + "\n         |   TUMBLE(TABLE T1, DESCRIPTOR(rowtime), INTERVAL '5' SECOND))"
                + "\n         |) L"
                + "\n         |JOIN ("
                + "\n         |SELECT *"
                + "\n         |FROM TABLE("
                + "\n         |   TUMBLE(TABLE T2, DESCRIPTOR(rowtime), INTERVAL '5' SECOND))"
                + "\n         |)R"
                + "\n         |ON L.window_start = R.window_start AND L.window_end = R.window_end AND"
                + "\n         |L.`name` = R.`name`"
                + "\n         |")
            .toString();
        String query = new StringOps(Predef$.MODULE$.augmentString(rawQuery)).stripMargin();

        TestingAppendSink resultSink = new TestingAppendSink();
        package$.MODULE$
            .tableConversions(tEnv().sqlQuery(query))
            .toDataStream()
            .addSink((SinkFunction)resultSink);
        env().execute();

        String[] expectedRows;
        if (useTimestampLtz) {
            expectedRows = new String[]{
                "2020-10-09T16:00:06Z,6,6.0,6.0,6.66,Hi,b,2020-10-10 00:00:06.000,2020-10-10T00:00:05,2020-10-10T00:00:10,2020-10-09T16:00:09.999Z,3,Hello,b",
                "2020-10-09T16:00:06Z,6,6.0,6.0,6.66,Hi,b,2020-10-10 00:00:06.000,2020-10-10T00:00:05,2020-10-10T00:00:10,2020-10-09T16:00:09.999Z,6,Hi,b",
                "2020-10-09T16:00:07Z,3,3.0,3.0,null,Hello,b,2020-10-10 00:00:07.000,2020-10-10T00:00:05,2020-10-10T00:00:10,2020-10-09T16:00:09.999Z,3,Hello,b",
                "2020-10-09T16:00:07Z,3,3.0,3.0,null,Hello,b,2020-10-10 00:00:07.000,2020-10-10T00:00:05,2020-10-10T00:00:10,2020-10-09T16:00:09.999Z,6,Hi,b",
                "2020-10-09T16:00:16Z,4,4.0,4.0,4.44,Hi,b,2020-10-10 00:00:16.000,2020-10-10T00:00:15,2020-10-10T00:00:20,2020-10-09T16:00:19.999Z,4,Hi,b",
                "2020-10-09T16:00:34Z,1,3.0,3.0,3.33,Comment#3,b,2020-10-10 00:00:34.000,2020-10-10T00:00:30,2020-10-10T00:00:35,2020-10-09T16:00:34.999Z,1,Comment#3,b"
            };
        } else {
            expectedRows = new String[]{
                "2020-10-10T00:00:06,6,6.0,6.0,6.66,Hi,b,2020-10-10 00:00:06.000,2020-10-10T00:00:05,2020-10-10T00:00:10,2020-10-10T00:00:09.999,3,Hello,b",
                "2020-10-10T00:00:06,6,6.0,6.0,6.66,Hi,b,2020-10-10 00:00:06.000,2020-10-10T00:00:05,2020-10-10T00:00:10,2020-10-10T00:00:09.999,6,Hi,b",
                "2020-10-10T00:00:07,3,3.0,3.0,null,Hello,b,2020-10-10 00:00:07.000,2020-10-10T00:00:05,2020-10-10T00:00:10,2020-10-10T00:00:09.999,3,Hello,b",
                "2020-10-10T00:00:07,3,3.0,3.0,null,Hello,b,2020-10-10 00:00:07.000,2020-10-10T00:00:05,2020-10-10T00:00:10,2020-10-10T00:00:09.999,6,Hi,b",
                "2020-10-10T00:00:16,4,4.0,4.0,4.44,Hi,b,2020-10-10 00:00:16.000,2020-10-10T00:00:15,2020-10-10T00:00:20,2020-10-10T00:00:19.999,4,Hi,b",
                "2020-10-10T00:00:34,1,3.0,3.0,3.33,Comment#3,b,2020-10-10 00:00:34.000,2020-10-10T00:00:30,2020-10-10T00:00:35,2020-10-10T00:00:34.999,1,Comment#3,b"
            };
        }
        Seq expected = (Seq)Seq$.MODULE$.apply((Seq)Predef$.MODULE$.wrapRefArray((Object[])expectedRows));

        // Compare order-insensitively: both sides are sorted before being joined into text.
        String actualText = ((TraversableOnce)resultSink.getAppendResults().sorted((Ordering)Ordering.String$.MODULE$)).mkString("\n");
        String expectedText = ((TraversableOnce)expected.sorted((Ordering)Ordering.String$.MODULE$)).mkString("\n");
        Assertions.assertThat((String)actualText).isEqualTo(expectedText);
    }

    /**
     * Same shape as the plain window-table-function inner join, but both TUMBLE calls pass a
     * fourth argument {@code INTERVAL '1' SECOND} (the window offset), so the expected windows
     * are {@code [:06, :11)}, {@code [:16, :21)} and {@code [:31, :36)}.
     *
     * <p>The first projected column and the expected rows depend on {@code useTimestampLtz}.
     * Actual and expected rows are sorted and compared as newline-joined text.
     */
    @TestTemplate
    public void testInnerJoinOnWTFWithOffset() {
        String tsProjection = useTimestampLtz ? "TO_TIMESTAMP_LTZ(L.`ts`, 3)" : "TO_TIMESTAMP(L.`ts`)";
        String rawQuery = new StringBuilder(809)
            .append("\n         |SELECT"
                + "\n         |  ")
            .append(tsProjection)
            .append(","
                + "\n         |  L.`int`,"
                + "\n         |  L.`double`,"
                + "\n         |  L.`float`,"
                + "\n         |  L.`bigdec`,"
                + "\n         |  L.`string`,"
                + "\n         |  L.`name`,"
                + "\n         |  CAST(L.`rowtime` AS STRING),"
                + "\n         |  L.window_start,"
                + "\n         |  L.window_end,"
                + "\n         |  L.window_time,"
                + "\n         |  R.`int`,"
                + "\n         |  R.`string`,"
                + "\n         |  R.`name`"
                + "\n         |FROM ("
                + "\n         |SELECT *"
                + "\n         |FROM TABLE("
                + "\n         |   TUMBLE(TABLE T1, DESCRIPTOR(rowtime), INTERVAL '5' SECOND, INTERVAL '1' SECOND))"
                + "\n         |) L"
                + "\n         |JOIN ("
                + "\n         |SELECT *"
                + "\n         |FROM TABLE("
                + "\n         |   TUMBLE(TABLE T2, DESCRIPTOR(rowtime),INTERVAL '5' SECOND, INTERVAL '1' SECOND))"
                + "\n         |)R"
                + "\n         |ON L.window_start = R.window_start AND L.window_end = R.window_end AND"
                + "\n         |L.`name` = R.`name`"
                + "\n         |")
            .toString();
        String query = new StringOps(Predef$.MODULE$.augmentString(rawQuery)).stripMargin();

        TestingAppendSink resultSink = new TestingAppendSink();
        package$.MODULE$
            .tableConversions(tEnv().sqlQuery(query))
            .toDataStream()
            .addSink((SinkFunction)resultSink);
        env().execute();

        String[] expectedRows;
        if (useTimestampLtz) {
            expectedRows = new String[]{
                "2020-10-09T16:00:06Z,6,6.0,6.0,6.66,Hi,b,2020-10-10 00:00:06.000,2020-10-10T00:00:06,2020-10-10T00:00:11,2020-10-09T16:00:10.999Z,3,Hello,b",
                "2020-10-09T16:00:06Z,6,6.0,6.0,6.66,Hi,b,2020-10-10 00:00:06.000,2020-10-10T00:00:06,2020-10-10T00:00:11,2020-10-09T16:00:10.999Z,6,Hi,b",
                "2020-10-09T16:00:07Z,3,3.0,3.0,null,Hello,b,2020-10-10 00:00:07.000,2020-10-10T00:00:06,2020-10-10T00:00:11,2020-10-09T16:00:10.999Z,3,Hello,b",
                "2020-10-09T16:00:07Z,3,3.0,3.0,null,Hello,b,2020-10-10 00:00:07.000,2020-10-10T00:00:06,2020-10-10T00:00:11,2020-10-09T16:00:10.999Z,6,Hi,b",
                "2020-10-09T16:00:16Z,4,4.0,4.0,4.44,Hi,b,2020-10-10 00:00:16.000,2020-10-10T00:00:16,2020-10-10T00:00:21,2020-10-09T16:00:20.999Z,4,Hi,b",
                "2020-10-09T16:00:34Z,1,3.0,3.0,3.33,Comment#3,b,2020-10-10 00:00:34.000,2020-10-10T00:00:31,2020-10-10T00:00:36,2020-10-09T16:00:35.999Z,1,Comment#3,b"
            };
        } else {
            expectedRows = new String[]{
                "2020-10-10T00:00:06,6,6.0,6.0,6.66,Hi,b,2020-10-10 00:00:06.000,2020-10-10T00:00:06,2020-10-10T00:00:11,2020-10-10T00:00:10.999,3,Hello,b",
                "2020-10-10T00:00:06,6,6.0,6.0,6.66,Hi,b,2020-10-10 00:00:06.000,2020-10-10T00:00:06,2020-10-10T00:00:11,2020-10-10T00:00:10.999,6,Hi,b",
                "2020-10-10T00:00:07,3,3.0,3.0,null,Hello,b,2020-10-10 00:00:07.000,2020-10-10T00:00:06,2020-10-10T00:00:11,2020-10-10T00:00:10.999,3,Hello,b",
                "2020-10-10T00:00:07,3,3.0,3.0,null,Hello,b,2020-10-10 00:00:07.000,2020-10-10T00:00:06,2020-10-10T00:00:11,2020-10-10T00:00:10.999,6,Hi,b",
                "2020-10-10T00:00:16,4,4.0,4.0,4.44,Hi,b,2020-10-10 00:00:16.000,2020-10-10T00:00:16,2020-10-10T00:00:21,2020-10-10T00:00:20.999,4,Hi,b",
                "2020-10-10T00:00:34,1,3.0,3.0,3.33,Comment#3,b,2020-10-10 00:00:34.000,2020-10-10T00:00:31,2020-10-10T00:00:36,2020-10-10T00:00:35.999,1,Comment#3,b"
            };
        }
        Seq expected = (Seq)Seq$.MODULE$.apply((Seq)Predef$.MODULE$.wrapRefArray((Object[])expectedRows));

        // Compare order-insensitively: both sides are sorted before being joined into text.
        String actualText = ((TraversableOnce)resultSink.getAppendResults().sorted((Ordering)Ordering.String$.MODULE$)).mkString("\n");
        String expectedText = ((TraversableOnce)expected.sorted((Ordering)Ordering.String$.MODULE$)).mkString("\n");
        Assertions.assertThat((String)actualText).isEqualTo(expectedText);
    }

    /**
     * Same shape as the plain window-table-function inner join, but both TUMBLE calls pass a
     * fourth argument {@code INTERVAL '-1' SECOND} (a negative window offset), so the expected
     * windows are {@code [:04, :09)}, {@code [:14, :19)} and {@code [:34, :39)}.
     *
     * <p>The first projected column and the expected rows depend on {@code useTimestampLtz}.
     * Actual and expected rows are sorted and compared as newline-joined text.
     */
    @TestTemplate
    public void testInnerJoinOnWTFWithNegativeOffset() {
        String tsProjection = useTimestampLtz ? "TO_TIMESTAMP_LTZ(L.`ts`, 3)" : "TO_TIMESTAMP(L.`ts`)";
        String rawQuery = new StringBuilder(812)
            .append("\n         |SELECT"
                + "\n         |  ")
            .append(tsProjection)
            .append(","
                + "\n         |  L.`int`,"
                + "\n         |  L.`double`,"
                + "\n         |  L.`float`,"
                + "\n         |  L.`bigdec`,"
                + "\n         |  L.`string`,"
                + "\n         |  L.`name`,"
                + "\n         |  CAST(L.`rowtime` AS STRING),"
                + "\n         |  L.window_start,"
                + "\n         |  L.window_end,"
                + "\n         |  L.window_time,"
                + "\n         |  R.`int`,"
                + "\n         |  R.`string`,"
                + "\n         |  R.`name`"
                + "\n         |FROM ("
                + "\n         |SELECT *"
                + "\n         |FROM TABLE("
                + "\n         |   TUMBLE(TABLE T1, DESCRIPTOR(rowtime), INTERVAL '5' SECOND, INTERVAL '-1' SECOND))"
                + "\n         |) L"
                + "\n         |JOIN ("
                + "\n         |SELECT *"
                + "\n         |FROM TABLE("
                + "\n         |   TUMBLE(TABLE T2, DESCRIPTOR(rowtime), INTERVAL '5' SECOND, INTERVAL '-1' SECOND))"
                + "\n         |)R"
                + "\n         |ON L.window_start = R.window_start AND L.window_end = R.window_end AND"
                + "\n         |L.`name` = R.`name`"
                + "\n         |")
            .toString();
        String query = new StringOps(Predef$.MODULE$.augmentString(rawQuery)).stripMargin();

        TestingAppendSink resultSink = new TestingAppendSink();
        package$.MODULE$
            .tableConversions(tEnv().sqlQuery(query))
            .toDataStream()
            .addSink((SinkFunction)resultSink);
        env().execute();

        String[] expectedRows;
        if (useTimestampLtz) {
            expectedRows = new String[]{
                "2020-10-09T16:00:06Z,6,6.0,6.0,6.66,Hi,b,2020-10-10 00:00:06.000,2020-10-10T00:00:04,2020-10-10T00:00:09,2020-10-09T16:00:08.999Z,3,Hello,b",
                "2020-10-09T16:00:06Z,6,6.0,6.0,6.66,Hi,b,2020-10-10 00:00:06.000,2020-10-10T00:00:04,2020-10-10T00:00:09,2020-10-09T16:00:08.999Z,6,Hi,b",
                "2020-10-09T16:00:07Z,3,3.0,3.0,null,Hello,b,2020-10-10 00:00:07.000,2020-10-10T00:00:04,2020-10-10T00:00:09,2020-10-09T16:00:08.999Z,3,Hello,b",
                "2020-10-09T16:00:07Z,3,3.0,3.0,null,Hello,b,2020-10-10 00:00:07.000,2020-10-10T00:00:04,2020-10-10T00:00:09,2020-10-09T16:00:08.999Z,6,Hi,b",
                "2020-10-09T16:00:16Z,4,4.0,4.0,4.44,Hi,b,2020-10-10 00:00:16.000,2020-10-10T00:00:14,2020-10-10T00:00:19,2020-10-09T16:00:18.999Z,4,Hi,b",
                "2020-10-09T16:00:34Z,1,3.0,3.0,3.33,Comment#3,b,2020-10-10 00:00:34.000,2020-10-10T00:00:34,2020-10-10T00:00:39,2020-10-09T16:00:38.999Z,1,Comment#3,b"
            };
        } else {
            expectedRows = new String[]{
                "2020-10-10T00:00:06,6,6.0,6.0,6.66,Hi,b,2020-10-10 00:00:06.000,2020-10-10T00:00:04,2020-10-10T00:00:09,2020-10-10T00:00:08.999,3,Hello,b",
                "2020-10-10T00:00:06,6,6.0,6.0,6.66,Hi,b,2020-10-10 00:00:06.000,2020-10-10T00:00:04,2020-10-10T00:00:09,2020-10-10T00:00:08.999,6,Hi,b",
                "2020-10-10T00:00:07,3,3.0,3.0,null,Hello,b,2020-10-10 00:00:07.000,2020-10-10T00:00:04,2020-10-10T00:00:09,2020-10-10T00:00:08.999,3,Hello,b",
                "2020-10-10T00:00:07,3,3.0,3.0,null,Hello,b,2020-10-10 00:00:07.000,2020-10-10T00:00:04,2020-10-10T00:00:09,2020-10-10T00:00:08.999,6,Hi,b",
                "2020-10-10T00:00:16,4,4.0,4.0,4.44,Hi,b,2020-10-10 00:00:16.000,2020-10-10T00:00:14,2020-10-10T00:00:19,2020-10-10T00:00:18.999,4,Hi,b",
                "2020-10-10T00:00:34,1,3.0,3.0,3.33,Comment#3,b,2020-10-10 00:00:34.000,2020-10-10T00:00:34,2020-10-10T00:00:39,2020-10-10T00:00:38.999,1,Comment#3,b"
            };
        }
        Seq expected = (Seq)Seq$.MODULE$.apply((Seq)Predef$.MODULE$.wrapRefArray((Object[])expectedRows));

        // Compare order-insensitively: both sides are sorted before being joined into text.
        String actualText = ((TraversableOnce)resultSink.getAppendResults().sorted((Ordering)Ordering.String$.MODULE$)).mkString("\n");
        String expectedText = ((TraversableOnce)expected.sorted((Ordering)Ordering.String$.MODULE$)).mkString("\n");
        Assertions.assertThat((String)actualText).isEqualTo(expectedText);
    }

    /**
     * Inner window join of two 5-second tumbling-window aggregations over {@code T1} and
     * {@code T2} whose name condition is {@code IS NOT DISTINCT from} instead of {@code =}.
     *
     * <p>Compared with the plain equality join, the expected output additionally contains a row
     * whose {@code name} is {@code null}. Actual and expected rows are sorted and compared as
     * newline-joined text.
     */
    @TestTemplate
    public void testInnerJoinWithIsNotDistinctFrom() {
        String query = new StringOps(Predef$.MODULE$.augmentString(
            "\n        |SELECT L.`name`, L.window_start, L.window_end, uv1, uv2"
            + "\n        |FROM ("
            + "\n        |SELECT"
            + "\n        |  `name`,"
            + "\n        |  window_start,"
            + "\n        |  window_end,"
            + "\n        |  COUNT(DISTINCT `string`) as uv1"
            + "\n        |FROM TABLE("
            + "\n        |   TUMBLE(TABLE T1, DESCRIPTOR(rowtime), INTERVAL '5' SECOND))"
            + "\n        |GROUP BY `name`, window_start, window_end"
            + "\n        |) L"
            + "\n        |JOIN ("
            + "\n        |SELECT"
            + "\n        |  `name`,"
            + "\n        |  window_start,"
            + "\n        |  window_end,"
            + "\n        |  COUNT(DISTINCT `string`) as uv2"
            + "\n        |FROM TABLE("
            + "\n        |   TUMBLE(TABLE T2, DESCRIPTOR(rowtime), INTERVAL '5' SECOND))"
            + "\n        |GROUP BY `name`, window_start, window_end"
            + "\n        |) R"
            + "\n        |ON L.window_start = R.window_start AND L.window_end = R.window_end AND"
            + "\n        |L.`name` IS NOT DISTINCT from R.`name`"
            + "\n        |")).stripMargin();

        TestingAppendSink resultSink = new TestingAppendSink();
        package$.MODULE$
            .tableConversions(tEnv().sqlQuery(query))
            .toDataStream()
            .addSink((SinkFunction)resultSink);
        env().execute();

        String[] expectedRows = new String[]{
            "b,2020-10-10T00:00:05,2020-10-10T00:00:10,2,2",
            "b,2020-10-10T00:00:15,2020-10-10T00:00:20,1,1",
            "b,2020-10-10T00:00:30,2020-10-10T00:00:35,1,1",
            "null,2020-10-10T00:00:30,2020-10-10T00:00:35,0,0"
        };
        Seq expected = (Seq)Seq$.MODULE$.apply((Seq)Predef$.MODULE$.wrapRefArray((Object[])expectedRows));

        // Compare order-insensitively: both sides are sorted before being joined into text.
        String actualText = ((TraversableOnce)resultSink.getAppendResults().sorted((Ordering)Ordering.String$.MODULE$)).mkString("\n");
        String expectedText = ((TraversableOnce)expected.sorted((Ordering)Ordering.String$.MODULE$)).mkString("\n");
        Assertions.assertThat((String)actualText).isEqualTo(expectedText);
    }

    @TestTemplate
    public void testInnerJoinWithIsNotDistinctFromOnWTF() {
        String sql = new StringOps(Predef$.MODULE$.augmentString(new StringBuilder(788).append("\n         |SELECT\n         |  ").append((Object)(this.useTimestampLtz ? "TO_TIMESTAMP_LTZ(L.`ts`, 3)" : "TO_TIMESTAMP(L.`ts`)")).append(",\n         |  L.`int`,\n         |  L.`double`,\n         |  L.`float`,\n         |  L.`bigdec`,\n         |  L.`string`,\n         |  L.`name`,\n         |  CAST(L.`rowtime` AS STRING),\n         |  L.window_start,\n         |  L.window_end,\n         |  L.window_time,\n         |  R.`int`,\n         |  R.`string`,\n         |  R.`name`\n         |FROM (\n         |SELECT *\n         |FROM TABLE(\n         |   TUMBLE(TABLE T1, DESCRIPTOR(rowtime), INTERVAL '5' SECOND))\n         |) L\n         |JOIN (\n         |SELECT *\n         |FROM TABLE(\n         |   TUMBLE(TABLE T2, DESCRIPTOR(rowtime), INTERVAL '5' SECOND))\n         |) R\n         |ON L.window_start = R.window_start AND L.window_end = R.window_end AND\n         |L.`name` IS NOT DISTINCT from R.`name`\n         |").toString())).stripMargin();
        TestingAppendSink sink = new TestingAppendSink();
        package$.MODULE$.tableConversions(this.tEnv().sqlQuery(sql)).toDataStream().addSink((SinkFunction)sink);
        this.env().execute();
        Seq expected = this.useTimestampLtz ? (Seq)Seq$.MODULE$.apply((Seq)Predef$.MODULE$.wrapRefArray((Object[])new String[]{"2020-10-09T16:00:06Z,6,6.0,6.0,6.66,Hi,b,2020-10-10 00:00:06.000,2020-10-10T00:00:05,2020-10-10T00:00:10,2020-10-09T16:00:09.999Z,3,Hello,b", "2020-10-09T16:00:06Z,6,6.0,6.0,6.66,Hi,b,2020-10-10 00:00:06.000,2020-10-10T00:00:05,2020-10-10T00:00:10,2020-10-09T16:00:09.999Z,6,Hi,b", "2020-10-09T16:00:07Z,3,3.0,3.0,null,Hello,b,2020-10-10 00:00:07.000,2020-10-10T00:00:05,2020-10-10T00:00:10,2020-10-09T16:00:09.999Z,3,Hello,b", "2020-10-09T16:00:07Z,3,3.0,3.0,null,Hello,b,2020-10-10 00:00:07.000,2020-10-10T00:00:05,2020-10-10T00:00:10,2020-10-09T16:00:09.999Z,6,Hi,b", "2020-10-09T16:00:16Z,4,4.0,4.0,4.44,Hi,b,2020-10-10 00:00:16.000,2020-10-10T00:00:15,2020-10-10T00:00:20,2020-10-09T16:00:19.999Z,4,Hi,b", "2020-10-09T16:00:32Z,7,7.0,7.0,7.77,null,null,2020-10-10 00:00:32.000,2020-10-10T00:00:30,2020-10-10T00:00:35,2020-10-09T16:00:34.999Z,7,null,null", "2020-10-09T16:00:34Z,1,3.0,3.0,3.33,Comment#3,b,2020-10-10 00:00:34.000,2020-10-10T00:00:30,2020-10-10T00:00:35,2020-10-09T16:00:34.999Z,1,Comment#3,b"})) : (Seq)Seq$.MODULE$.apply((Seq)Predef$.MODULE$.wrapRefArray((Object[])new String[]{"2020-10-10T00:00:06,6,6.0,6.0,6.66,Hi,b,2020-10-10 00:00:06.000,2020-10-10T00:00:05,2020-10-10T00:00:10,2020-10-10T00:00:09.999,3,Hello,b", "2020-10-10T00:00:06,6,6.0,6.0,6.66,Hi,b,2020-10-10 00:00:06.000,2020-10-10T00:00:05,2020-10-10T00:00:10,2020-10-10T00:00:09.999,6,Hi,b", "2020-10-10T00:00:07,3,3.0,3.0,null,Hello,b,2020-10-10 00:00:07.000,2020-10-10T00:00:05,2020-10-10T00:00:10,2020-10-10T00:00:09.999,3,Hello,b", "2020-10-10T00:00:07,3,3.0,3.0,null,Hello,b,2020-10-10 00:00:07.000,2020-10-10T00:00:05,2020-10-10T00:00:10,2020-10-10T00:00:09.999,6,Hi,b", "2020-10-10T00:00:16,4,4.0,4.0,4.44,Hi,b,2020-10-10 00:00:16.000,2020-10-10T00:00:15,2020-10-10T00:00:20,2020-10-10T00:00:19.999,4,Hi,b", "2020-10-10T00:00:32,7,7.0,7.0,7.77,null,null,2020-10-10 
00:00:32.000,2020-10-10T00:00:30,2020-10-10T00:00:35,2020-10-10T00:00:34.999,7,null,null", "2020-10-10T00:00:34,1,3.0,3.0,3.33,Comment#3,b,2020-10-10 00:00:34.000,2020-10-10T00:00:30,2020-10-10T00:00:35,2020-10-10T00:00:34.999,1,Comment#3,b"}));
        Assertions.assertThat((String)((TraversableOnce)sink.getAppendResults().sorted((Ordering)Ordering.String$.MODULE$)).mkString("\n")).isEqualTo(((TraversableOnce)expected.sorted((Ordering)Ordering.String$.MODULE$)).mkString("\n"));
    }

    /**
     * Window semi join written with {@code WHERE EXISTS}: keeps the 5-second tumbling-window
     * aggregates of {@code T1} for which an aggregate of {@code T2} with the same
     * {@code window_start}, {@code window_end} and {@code name} exists.
     *
     * <p>Actual and expected rows are sorted and compared as newline-joined text.
     */
    @TestTemplate
    public void testSemiJoinExists() {
        String query = new StringOps(Predef$.MODULE$.augmentString(
            "\n        |SELECT * FROM ("
            + "\n        |  SELECT"
            + "\n        |    `name`,"
            + "\n        |    window_start,"
            + "\n        |    window_end,"
            + "\n        |    COUNT(DISTINCT `string`) as uv1"
            + "\n        |    FROM TABLE("
            + "\n        |    TUMBLE(TABLE T1, DESCRIPTOR(rowtime), INTERVAL '5' SECOND))"
            + "\n        |  GROUP BY `name`, window_start, window_end"
            + "\n        |) L WHERE EXISTS ("
            + "\n        |SELECT * FROM("
            + "\n        |  SELECT"
            + "\n        |    `name`,"
            + "\n        |    window_start,"
            + "\n        |    window_end,"
            + "\n        |    COUNT(DISTINCT `string`) as uv2"
            + "\n        |    FROM TABLE("
            + "\n        |      TUMBLE(TABLE T2, DESCRIPTOR(rowtime), INTERVAL '5' SECOND))"
            + "\n        |    GROUP BY `name`, window_start, window_end"
            + "\n        |) R"
            + "\n        |  WHERE L.window_start = R.window_start AND L.window_end = R.window_end"
            + "\n        |        AND L.`name` = R.`name`)"
            + "\n        |")).stripMargin();

        TestingAppendSink resultSink = new TestingAppendSink();
        package$.MODULE$
            .tableConversions(tEnv().sqlQuery(query))
            .toDataStream()
            .addSink((SinkFunction)resultSink);
        env().execute();

        String[] expectedRows = new String[]{
            "b,2020-10-10T00:00:05,2020-10-10T00:00:10,2",
            "b,2020-10-10T00:00:15,2020-10-10T00:00:20,1",
            "b,2020-10-10T00:00:30,2020-10-10T00:00:35,1"
        };
        Seq expected = (Seq)Seq$.MODULE$.apply((Seq)Predef$.MODULE$.wrapRefArray((Object[])expectedRows));

        // Compare order-insensitively: both sides are sorted before being joined into text.
        String actualText = ((TraversableOnce)resultSink.getAppendResults().sorted((Ordering)Ordering.String$.MODULE$)).mkString("\n");
        String expectedText = ((TraversableOnce)expected.sorted((Ordering)Ordering.String$.MODULE$)).mkString("\n");
        Assertions.assertThat((String)actualText).isEqualTo(expectedText);
    }

    /**
     * Window semi join written with {@code WHERE EXISTS} directly on the 5-second TUMBLE window
     * table functions of {@code T1} and {@code T2} (no aggregation); only columns of the left
     * side are projected.
     *
     * <p>The first projected column and the expected rows depend on {@code useTimestampLtz}.
     * Actual and expected rows are sorted and compared as newline-joined text.
     */
    @TestTemplate
    public void testSemiJoinExistsWTF() {
        String tsProjection = useTimestampLtz ? "TO_TIMESTAMP_LTZ(L.`ts`, 3)" : "TO_TIMESTAMP(L.`ts`)";
        String rawQuery = new StringBuilder(714)
            .append("\n         |SELECT"
                + "\n         |  ")
            .append(tsProjection)
            .append(","
                + "\n         |  L.`int`,"
                + "\n         |  L.`double`,"
                + "\n         |  L.`float`,"
                + "\n         |  L.`bigdec`,"
                + "\n         |  L.`string`,"
                + "\n         |  L.`name`,"
                + "\n         |  CAST(L.`rowtime` AS STRING),"
                + "\n         |  L.window_start,"
                + "\n         |  L.window_end,"
                + "\n         |  L.window_time"
                + "\n         |FROM ("
                + "\n         |  SELECT *"
                + "\n         |    FROM TABLE("
                + "\n         |    TUMBLE(TABLE T1, DESCRIPTOR(rowtime), INTERVAL '5' SECOND))"
                + "\n         |) L WHERE EXISTS ("
                + "\n         |  SELECT *"
                + "\n         |    FROM TABLE("
                + "\n         |    TUMBLE(TABLE T2, DESCRIPTOR(rowtime), INTERVAL '5' SECOND)) R"
                + "\n         |WHERE L.window_start = R.window_start AND L.window_end = R.window_end"
                + "\n         |        AND L.`name` = R.`name`)"
                + "\n         |")
            .toString();
        String query = new StringOps(Predef$.MODULE$.augmentString(rawQuery)).stripMargin();

        TestingAppendSink resultSink = new TestingAppendSink();
        package$.MODULE$
            .tableConversions(tEnv().sqlQuery(query))
            .toDataStream()
            .addSink((SinkFunction)resultSink);
        env().execute();

        String[] expectedRows;
        if (useTimestampLtz) {
            expectedRows = new String[]{
                "2020-10-09T16:00:06Z,6,6.0,6.0,6.66,Hi,b,2020-10-10 00:00:06.000,2020-10-10T00:00:05,2020-10-10T00:00:10,2020-10-09T16:00:09.999Z",
                "2020-10-09T16:00:07Z,3,3.0,3.0,null,Hello,b,2020-10-10 00:00:07.000,2020-10-10T00:00:05,2020-10-10T00:00:10,2020-10-09T16:00:09.999Z",
                "2020-10-09T16:00:16Z,4,4.0,4.0,4.44,Hi,b,2020-10-10 00:00:16.000,2020-10-10T00:00:15,2020-10-10T00:00:20,2020-10-09T16:00:19.999Z",
                "2020-10-09T16:00:34Z,1,3.0,3.0,3.33,Comment#3,b,2020-10-10 00:00:34.000,2020-10-10T00:00:30,2020-10-10T00:00:35,2020-10-09T16:00:34.999Z"
            };
        } else {
            expectedRows = new String[]{
                "2020-10-10T00:00:06,6,6.0,6.0,6.66,Hi,b,2020-10-10 00:00:06.000,2020-10-10T00:00:05,2020-10-10T00:00:10,2020-10-10T00:00:09.999",
                "2020-10-10T00:00:07,3,3.0,3.0,null,Hello,b,2020-10-10 00:00:07.000,2020-10-10T00:00:05,2020-10-10T00:00:10,2020-10-10T00:00:09.999",
                "2020-10-10T00:00:16,4,4.0,4.0,4.44,Hi,b,2020-10-10 00:00:16.000,2020-10-10T00:00:15,2020-10-10T00:00:20,2020-10-10T00:00:19.999",
                "2020-10-10T00:00:34,1,3.0,3.0,3.33,Comment#3,b,2020-10-10 00:00:34.000,2020-10-10T00:00:30,2020-10-10T00:00:35,2020-10-10T00:00:34.999"
            };
        }
        Seq expected = (Seq)Seq$.MODULE$.apply((Seq)Predef$.MODULE$.wrapRefArray((Object[])expectedRows));

        // Compare order-insensitively: both sides are sorted before being joined into text.
        String actualText = ((TraversableOnce)resultSink.getAppendResults().sorted((Ordering)Ordering.String$.MODULE$)).mkString("\n");
        String expectedText = ((TraversableOnce)expected.sorted((Ordering)Ordering.String$.MODULE$)).mkString("\n");
        Assertions.assertThat((String)actualText).isEqualTo(expectedText);
    }

    /**
     * Filters the per-window aggregate of T1 with an {@code IN} sub-query over the
     * per-window aggregate of T2 (both use 5-second tumbling windows, correlated on
     * window_start and window_end) and compares the sorted sink output with the
     * sorted expected rows.
     */
    @TestTemplate
    public void testSemiJoinIN() {
        // Margin-formatted SQL; stripMargin() below removes everything up to each '|'.
        String marginSql = "\n"
                + "        |SELECT * FROM (\n"
                + "        |  SELECT\n"
                + "        |    `name`,\n"
                + "        |    window_start,\n"
                + "        |    window_end,\n"
                + "        |    COUNT(DISTINCT `string`) as uv1\n"
                + "        |    FROM TABLE(\n"
                + "        |    TUMBLE(TABLE T1, DESCRIPTOR(rowtime), INTERVAL '5' SECOND))\n"
                + "        |  GROUP BY `name`, window_start, window_end\n"
                + "        |) L WHERE L.`name` IN (\n"
                + "        |SELECT `name` FROM(\n"
                + "        |  SELECT\n"
                + "        |    `name`,\n"
                + "        |    window_start,\n"
                + "        |    window_end,\n"
                + "        |    COUNT(DISTINCT `string`) as uv2\n"
                + "        |    FROM TABLE(\n"
                + "        |      TUMBLE(TABLE T2, DESCRIPTOR(rowtime), INTERVAL '5' SECOND))\n"
                + "        |    GROUP BY `name`, window_start, window_end\n"
                + "        |) R\n"
                + "        |  WHERE L.window_start = R.window_start AND L.window_end = R.window_end)\n"
                + "        |";
        String query = new StringOps(Predef$.MODULE$.augmentString(marginSql)).stripMargin();

        TestingAppendSink appendSink = new TestingAppendSink();
        package$.MODULE$.tableConversions(this.tEnv().sqlQuery(query)).toDataStream().addSink((SinkFunction)appendSink);
        this.env().execute();

        String[] expectedRows = new String[]{
            "b,2020-10-10T00:00:05,2020-10-10T00:00:10,2",
            "b,2020-10-10T00:00:15,2020-10-10T00:00:20,1",
            "b,2020-10-10T00:00:30,2020-10-10T00:00:35,1"
        };
        Seq expected = (Seq)Seq$.MODULE$.apply((Seq)Predef$.MODULE$.wrapRefArray((Object[])expectedRows));

        // Both sides are sorted before comparison, so emission order is irrelevant.
        String actualText = ((TraversableOnce)appendSink.getAppendResults().sorted((Ordering)Ordering.String$.MODULE$)).mkString("\n");
        String expectedText = ((TraversableOnce)expected.sorted((Ordering)Ordering.String$.MODULE$)).mkString("\n");
        Assertions.assertThat((String)actualText).isEqualTo(expectedText);
    }

    /**
     * Same {@code IN} sub-query filter as the aggregated variant, but both sides read
     * rows directly from the TUMBLE windowing table function (no aggregation). The
     * first projected column and the expected rows depend on {@code useTimestampLtz}.
     */
    @TestTemplate
    public void testSemiJoinIN_WTF() {
        String timestampExpr = this.useTimestampLtz ? "TO_TIMESTAMP_LTZ(L.`ts`, 3)" : "TO_TIMESTAMP(L.`ts`)";

        // Margin-formatted SQL assembled around the timestamp expression.
        StringBuilder marginSql = new StringBuilder(724);
        marginSql.append("\n"
                + "         |SELECT\n"
                + "         |  ");
        marginSql.append((Object)timestampExpr);
        marginSql.append(",\n"
                + "         |  L.`int`,\n"
                + "         |  L.`double`,\n"
                + "         |  L.`float`,\n"
                + "         |  L.`bigdec`,\n"
                + "         |  L.`string`,\n"
                + "         |  L.`name`,\n"
                + "         |  CAST(L.`rowtime` AS STRING),\n"
                + "         |  L.window_start,\n"
                + "         |  L.window_end,\n"
                + "         |  L.window_time\n"
                + "         |FROM (\n"
                + "         |  SELECT *\n"
                + "         |    FROM TABLE(\n"
                + "         |    TUMBLE(TABLE T1, DESCRIPTOR(rowtime), INTERVAL '5' SECOND))\n"
                + "         |) L WHERE L.`name` IN (\n"
                + "         |  SELECT `name`\n"
                + "         |    FROM TABLE(\n"
                + "         |    TUMBLE(TABLE T2, DESCRIPTOR(rowtime), INTERVAL '5' SECOND)) R\n"
                + "         |WHERE L.window_start = R.window_start AND L.window_end = R.window_end\n"
                + "         |        AND L.`name` = R.`name`)\n"
                + "         |");
        String query = new StringOps(Predef$.MODULE$.augmentString(marginSql.toString())).stripMargin();

        TestingAppendSink appendSink = new TestingAppendSink();
        package$.MODULE$.tableConversions(this.tEnv().sqlQuery(query)).toDataStream().addSink((SinkFunction)appendSink);
        this.env().execute();

        String[] expectedRows;
        if (this.useTimestampLtz) {
            expectedRows = new String[]{
                "2020-10-09T16:00:06Z,6,6.0,6.0,6.66,Hi,b,2020-10-10 00:00:06.000,2020-10-10T00:00:05,2020-10-10T00:00:10,2020-10-09T16:00:09.999Z",
                "2020-10-09T16:00:07Z,3,3.0,3.0,null,Hello,b,2020-10-10 00:00:07.000,2020-10-10T00:00:05,2020-10-10T00:00:10,2020-10-09T16:00:09.999Z",
                "2020-10-09T16:00:16Z,4,4.0,4.0,4.44,Hi,b,2020-10-10 00:00:16.000,2020-10-10T00:00:15,2020-10-10T00:00:20,2020-10-09T16:00:19.999Z",
                "2020-10-09T16:00:34Z,1,3.0,3.0,3.33,Comment#3,b,2020-10-10 00:00:34.000,2020-10-10T00:00:30,2020-10-10T00:00:35,2020-10-09T16:00:34.999Z"
            };
        } else {
            expectedRows = new String[]{
                "2020-10-10T00:00:06,6,6.0,6.0,6.66,Hi,b,2020-10-10 00:00:06.000,2020-10-10T00:00:05,2020-10-10T00:00:10,2020-10-10T00:00:09.999",
                "2020-10-10T00:00:07,3,3.0,3.0,null,Hello,b,2020-10-10 00:00:07.000,2020-10-10T00:00:05,2020-10-10T00:00:10,2020-10-10T00:00:09.999",
                "2020-10-10T00:00:16,4,4.0,4.0,4.44,Hi,b,2020-10-10 00:00:16.000,2020-10-10T00:00:15,2020-10-10T00:00:20,2020-10-10T00:00:19.999",
                "2020-10-10T00:00:34,1,3.0,3.0,3.33,Comment#3,b,2020-10-10 00:00:34.000,2020-10-10T00:00:30,2020-10-10T00:00:35,2020-10-10T00:00:34.999"
            };
        }
        Seq expected = (Seq)Seq$.MODULE$.apply((Seq)Predef$.MODULE$.wrapRefArray((Object[])expectedRows));

        // Both sides are sorted before comparison, so emission order is irrelevant.
        String actualText = ((TraversableOnce)appendSink.getAppendResults().sorted((Ordering)Ordering.String$.MODULE$)).mkString("\n");
        String expectedText = ((TraversableOnce)expected.sorted((Ordering)Ordering.String$.MODULE$)).mkString("\n");
        Assertions.assertThat((String)actualText).isEqualTo(expectedText);
    }

    /**
     * Keeps the per-window aggregate rows of T1 for which {@code NOT EXISTS} finds no
     * row in the per-window aggregate of T2 with the same window_start, window_end
     * and name, then compares the sorted sink output with the sorted expected rows.
     */
    @TestTemplate
    public void testAntiJoinNotExists() {
        // Margin-formatted SQL; stripMargin() below removes everything up to each '|'.
        String marginSql = "\n"
                + "        |SELECT * FROM (\n"
                + "        |  SELECT\n"
                + "        |    `name`,\n"
                + "        |    window_start,\n"
                + "        |    window_end,\n"
                + "        |    COUNT(DISTINCT `string`) as uv1\n"
                + "        |    FROM TABLE(\n"
                + "        |    TUMBLE(TABLE T1, DESCRIPTOR(rowtime), INTERVAL '5' SECOND))\n"
                + "        |  GROUP BY `name`, window_start, window_end\n"
                + "        |) L WHERE NOT EXISTS (\n"
                + "        |SELECT * FROM(\n"
                + "        |  SELECT\n"
                + "        |    `name`,\n"
                + "        |    window_start,\n"
                + "        |    window_end,\n"
                + "        |    COUNT(DISTINCT `string`) as uv2\n"
                + "        |    FROM TABLE(\n"
                + "        |      TUMBLE(TABLE T2, DESCRIPTOR(rowtime), INTERVAL '5' SECOND))\n"
                + "        |    GROUP BY `name`, window_start, window_end\n"
                + "        |) R\n"
                + "        |  WHERE L.window_start = R.window_start AND L.window_end = R.window_end\n"
                + "        |        AND L.`name` = R.`name`)\n"
                + "        |";
        String query = new StringOps(Predef$.MODULE$.augmentString(marginSql)).stripMargin();

        TestingAppendSink appendSink = new TestingAppendSink();
        package$.MODULE$.tableConversions(this.tEnv().sqlQuery(query)).toDataStream().addSink((SinkFunction)appendSink);
        this.env().execute();

        String[] expectedRows = new String[]{
            "a,2020-10-10T00:00,2020-10-10T00:00:05,2",
            "a,2020-10-10T00:00:05,2020-10-10T00:00:10,1",
            "null,2020-10-10T00:00:30,2020-10-10T00:00:35,0"
        };
        Seq expected = (Seq)Seq$.MODULE$.apply((Seq)Predef$.MODULE$.wrapRefArray((Object[])expectedRows));

        // Both sides are sorted before comparison, so emission order is irrelevant.
        String actualText = ((TraversableOnce)appendSink.getAppendResults().sorted((Ordering)Ordering.String$.MODULE$)).mkString("\n");
        String expectedText = ((TraversableOnce)expected.sorted((Ordering)Ordering.String$.MODULE$)).mkString("\n");
        Assertions.assertThat((String)actualText).isEqualTo(expectedText);
    }

    /**
     * {@code NOT EXISTS} filter where both sides read rows directly from the TUMBLE
     * windowing table function (no aggregation), with an additional
     * {@code L.`float` IS NOT NULL} predicate. The first projected column and the
     * expected rows depend on {@code useTimestampLtz}.
     */
    @TestTemplate
    public void testAntiJoinNotExistsWTF() {
        String timestampExpr = this.useTimestampLtz ? "TO_TIMESTAMP_LTZ(L.`ts`, 3)" : "TO_TIMESTAMP(L.`ts`)";

        // Margin-formatted SQL assembled around the timestamp expression.
        StringBuilder marginSql = new StringBuilder(748);
        marginSql.append("\n"
                + "         |SELECT\n"
                + "         |  ");
        marginSql.append((Object)timestampExpr);
        marginSql.append(",\n"
                + "         |  L.`int`,\n"
                + "         |  L.`double`,\n"
                + "         |  L.`float`,\n"
                + "         |  L.`bigdec`,\n"
                + "         |  L.`string`,\n"
                + "         |  L.`name`,\n"
                + "         |  CAST(L.`rowtime` AS STRING),\n"
                + "         |  L.window_start,\n"
                + "         |  L.window_end,\n"
                + "         |  L.window_time\n"
                + "         |FROM (\n"
                + "         |  SELECT *\n"
                + "         |    FROM TABLE(\n"
                + "         |    TUMBLE(TABLE T1, DESCRIPTOR(rowtime), INTERVAL '5' SECOND))\n"
                + "         |) L WHERE NOT EXISTS (\n"
                + "         |  SELECT *\n"
                + "         |    FROM TABLE(\n"
                + "         |      TUMBLE(TABLE T2, DESCRIPTOR(rowtime), INTERVAL '5' SECOND)) R\n"
                + "         |  WHERE L.window_start = R.window_start AND L.window_end = R.window_end\n"
                + "         |        AND L.`name` = R.`name`) AND L.`float` IS NOT NULL\n"
                + "         |");
        String query = new StringOps(Predef$.MODULE$.augmentString(marginSql.toString())).stripMargin();

        TestingAppendSink appendSink = new TestingAppendSink();
        package$.MODULE$.tableConversions(this.tEnv().sqlQuery(query)).toDataStream().addSink((SinkFunction)appendSink);
        this.env().execute();

        String[] expectedRows;
        if (this.useTimestampLtz) {
            expectedRows = new String[]{
                "2020-10-09T16:00:01Z,1,1.0,1.0,1.11,Hi,a,2020-10-10 00:00:01.000,2020-10-10T00:00,2020-10-10T00:00:05,2020-10-09T16:00:04.999Z",
                "2020-10-09T16:00:02Z,2,2.0,2.0,2.22,Comment#1,a,2020-10-10 00:00:02.000,2020-10-10T00:00,2020-10-10T00:00:05,2020-10-09T16:00:04.999Z",
                "2020-10-09T16:00:03Z,2,2.0,2.0,2.22,Comment#1,a,2020-10-10 00:00:03.000,2020-10-10T00:00,2020-10-10T00:00:05,2020-10-09T16:00:04.999Z",
                "2020-10-09T16:00:04Z,5,5.0,5.0,5.55,null,a,2020-10-10 00:00:04.000,2020-10-10T00:00,2020-10-10T00:00:05,2020-10-09T16:00:04.999Z",
                "2020-10-09T16:00:08Z,3,null,3.0,3.33,Comment#2,a,2020-10-10 00:00:08.000,2020-10-10T00:00:05,2020-10-10T00:00:10,2020-10-09T16:00:09.999Z",
                "2020-10-09T16:00:32Z,7,7.0,7.0,7.77,null,null,2020-10-10 00:00:32.000,2020-10-10T00:00:30,2020-10-10T00:00:35,2020-10-09T16:00:34.999Z"
            };
        } else {
            expectedRows = new String[]{
                "2020-10-10T00:00:01,1,1.0,1.0,1.11,Hi,a,2020-10-10 00:00:01.000,2020-10-10T00:00,2020-10-10T00:00:05,2020-10-10T00:00:04.999",
                "2020-10-10T00:00:02,2,2.0,2.0,2.22,Comment#1,a,2020-10-10 00:00:02.000,2020-10-10T00:00,2020-10-10T00:00:05,2020-10-10T00:00:04.999",
                "2020-10-10T00:00:03,2,2.0,2.0,2.22,Comment#1,a,2020-10-10 00:00:03.000,2020-10-10T00:00,2020-10-10T00:00:05,2020-10-10T00:00:04.999",
                "2020-10-10T00:00:04,5,5.0,5.0,5.55,null,a,2020-10-10 00:00:04.000,2020-10-10T00:00,2020-10-10T00:00:05,2020-10-10T00:00:04.999",
                "2020-10-10T00:00:08,3,null,3.0,3.33,Comment#2,a,2020-10-10 00:00:08.000,2020-10-10T00:00:05,2020-10-10T00:00:10,2020-10-10T00:00:09.999",
                "2020-10-10T00:00:32,7,7.0,7.0,7.77,null,null,2020-10-10 00:00:32.000,2020-10-10T00:00:30,2020-10-10T00:00:35,2020-10-10T00:00:34.999"
            };
        }
        Seq expected = (Seq)Seq$.MODULE$.apply((Seq)Predef$.MODULE$.wrapRefArray((Object[])expectedRows));

        // Both sides are sorted before comparison, so emission order is irrelevant.
        String actualText = ((TraversableOnce)appendSink.getAppendResults().sorted((Ordering)Ordering.String$.MODULE$)).mkString("\n");
        String expectedText = ((TraversableOnce)expected.sorted((Ordering)Ordering.String$.MODULE$)).mkString("\n");
        Assertions.assertThat((String)actualText).isEqualTo(expectedText);
    }

    /**
     * Filters the per-window aggregate of T1 with a {@code NOT IN} sub-query over the
     * per-window aggregate of T2 (5-second tumbling windows, correlated on
     * window_start and window_end) and compares the sorted sink output with the
     * sorted expected rows.
     */
    @TestTemplate
    public void testAntiJoinNotIN() {
        // Margin-formatted SQL; stripMargin() below removes everything up to each '|'.
        String marginSql = "\n"
                + "        |SELECT * FROM (\n"
                + "        |  SELECT\n"
                + "        |    `name`,\n"
                + "        |    window_start,\n"
                + "        |    window_end,\n"
                + "        |    COUNT(DISTINCT `string`) as uv1\n"
                + "        |    FROM TABLE(\n"
                + "        |    TUMBLE(TABLE T1, DESCRIPTOR(rowtime), INTERVAL '5' SECOND))\n"
                + "        |  GROUP BY `name`, window_start, window_end\n"
                + "        |) L WHERE L.`name` NOT IN (\n"
                + "        |SELECT `name` FROM(\n"
                + "        |  SELECT\n"
                + "        |    `name`,\n"
                + "        |    window_start,\n"
                + "        |    window_end,\n"
                + "        |    COUNT(DISTINCT `string`) as uv2\n"
                + "        |    FROM TABLE(\n"
                + "        |      TUMBLE(TABLE T2, DESCRIPTOR(rowtime), INTERVAL '5' SECOND))\n"
                + "        |    GROUP BY `name`, window_start, window_end\n"
                + "        |) R\n"
                + "        |  WHERE L.window_start = R.window_start AND L.window_end = R.window_end)\n"
                + "        |";
        String query = new StringOps(Predef$.MODULE$.augmentString(marginSql)).stripMargin();

        TestingAppendSink appendSink = new TestingAppendSink();
        package$.MODULE$.tableConversions(this.tEnv().sqlQuery(query)).toDataStream().addSink((SinkFunction)appendSink);
        this.env().execute();

        String[] expectedRows = new String[]{
            "a,2020-10-10T00:00,2020-10-10T00:00:05,2",
            "a,2020-10-10T00:00:05,2020-10-10T00:00:10,1"
        };
        Seq expected = (Seq)Seq$.MODULE$.apply((Seq)Predef$.MODULE$.wrapRefArray((Object[])expectedRows));

        // Both sides are sorted before comparison, so emission order is irrelevant.
        String actualText = ((TraversableOnce)appendSink.getAppendResults().sorted((Ordering)Ordering.String$.MODULE$)).mkString("\n");
        String expectedText = ((TraversableOnce)expected.sorted((Ordering)Ordering.String$.MODULE$)).mkString("\n");
        Assertions.assertThat((String)actualText).isEqualTo(expectedText);
    }

    /**
     * {@code NOT IN} filter where both sides read rows directly from the TUMBLE
     * windowing table function (no aggregation), with an additional
     * {@code L.`float` IS NOT NULL} predicate. The first projected column and the
     * expected rows depend on {@code useTimestampLtz}.
     */
    @TestTemplate
    public void testAntiJoinNotIN_WTF() {
        String timestampExpr = this.useTimestampLtz ? "TO_TIMESTAMP_LTZ(L.`ts`, 3)" : "TO_TIMESTAMP(L.`ts`)";

        // Margin-formatted SQL assembled around the timestamp expression.
        StringBuilder marginSql = new StringBuilder(714);
        marginSql.append("\n"
                + "         |SELECT\n"
                + "         |  ");
        marginSql.append((Object)timestampExpr);
        marginSql.append(",\n"
                + "         |  L.`int`,\n"
                + "         |  L.`double`,\n"
                + "         |  L.`float`,\n"
                + "         |  L.`bigdec`,\n"
                + "         |  L.`string`,\n"
                + "         |  L.`name`,\n"
                + "         |  CAST(L.`rowtime` AS STRING),\n"
                + "         |  L.window_start,\n"
                + "         |  L.window_end,\n"
                + "         |  L.window_time\n"
                + "         |  FROM (\n"
                + "         |  SELECT *\n"
                + "         |    FROM TABLE(\n"
                + "         |    TUMBLE(TABLE T1, DESCRIPTOR(rowtime), INTERVAL '5' SECOND))\n"
                + "         |) L WHERE L.`name` NOT IN (\n"
                + "         |SELECT `name`\n"
                + "         |FROM\n"
                + "         |TABLE(TUMBLE(TABLE T2, DESCRIPTOR(rowtime), INTERVAL '5' SECOND)) R\n"
                + "         |WHERE L.window_start = R.window_start AND L.window_end = R.window_end) AND\n"
                + "         | L.`float` IS NOT NULL\n"
                + "         |");
        String query = new StringOps(Predef$.MODULE$.augmentString(marginSql.toString())).stripMargin();

        TestingAppendSink appendSink = new TestingAppendSink();
        package$.MODULE$.tableConversions(this.tEnv().sqlQuery(query)).toDataStream().addSink((SinkFunction)appendSink);
        this.env().execute();

        String[] expectedRows;
        if (this.useTimestampLtz) {
            expectedRows = new String[]{
                "2020-10-09T16:00:01Z,1,1.0,1.0,1.11,Hi,a,2020-10-10 00:00:01.000,2020-10-10T00:00,2020-10-10T00:00:05,2020-10-09T16:00:04.999Z",
                "2020-10-09T16:00:02Z,2,2.0,2.0,2.22,Comment#1,a,2020-10-10 00:00:02.000,2020-10-10T00:00,2020-10-10T00:00:05,2020-10-09T16:00:04.999Z",
                "2020-10-09T16:00:03Z,2,2.0,2.0,2.22,Comment#1,a,2020-10-10 00:00:03.000,2020-10-10T00:00,2020-10-10T00:00:05,2020-10-09T16:00:04.999Z",
                "2020-10-09T16:00:04Z,5,5.0,5.0,5.55,null,a,2020-10-10 00:00:04.000,2020-10-10T00:00,2020-10-10T00:00:05,2020-10-09T16:00:04.999Z",
                "2020-10-09T16:00:08Z,3,null,3.0,3.33,Comment#2,a,2020-10-10 00:00:08.000,2020-10-10T00:00:05,2020-10-10T00:00:10,2020-10-09T16:00:09.999Z"
            };
        } else {
            expectedRows = new String[]{
                "2020-10-10T00:00:01,1,1.0,1.0,1.11,Hi,a,2020-10-10 00:00:01.000,2020-10-10T00:00,2020-10-10T00:00:05,2020-10-10T00:00:04.999",
                "2020-10-10T00:00:02,2,2.0,2.0,2.22,Comment#1,a,2020-10-10 00:00:02.000,2020-10-10T00:00,2020-10-10T00:00:05,2020-10-10T00:00:04.999",
                "2020-10-10T00:00:03,2,2.0,2.0,2.22,Comment#1,a,2020-10-10 00:00:03.000,2020-10-10T00:00,2020-10-10T00:00:05,2020-10-10T00:00:04.999",
                "2020-10-10T00:00:04,5,5.0,5.0,5.55,null,a,2020-10-10 00:00:04.000,2020-10-10T00:00,2020-10-10T00:00:05,2020-10-10T00:00:04.999",
                "2020-10-10T00:00:08,3,null,3.0,3.33,Comment#2,a,2020-10-10 00:00:08.000,2020-10-10T00:00:05,2020-10-10T00:00:10,2020-10-10T00:00:09.999"
            };
        }
        Seq expected = (Seq)Seq$.MODULE$.apply((Seq)Predef$.MODULE$.wrapRefArray((Object[])expectedRows));

        // Both sides are sorted before comparison, so emission order is irrelevant.
        String actualText = ((TraversableOnce)appendSink.getAppendResults().sorted((Ordering)Ordering.String$.MODULE$)).mkString("\n");
        String expectedText = ((TraversableOnce)expected.sorted((Ordering)Ordering.String$.MODULE$)).mkString("\n");
        Assertions.assertThat((String)actualText).isEqualTo(expectedText);
    }

    /**
     * LEFT JOINs the per-window aggregates of T1 and T2 (5-second tumbling windows) on
     * window_start, window_end and {@code name} equality, then compares the sorted
     * sink output with the sorted expected rows.
     */
    @TestTemplate
    public void testLeftJoin() {
        // Margin-formatted SQL; stripMargin() below removes everything up to each '|'.
        String marginSql = "\n"
                + "        |SELECT L.`name`, L.window_start, L.window_end, uv1, uv2\n"
                + "        |FROM (\n"
                + "        |SELECT\n"
                + "        |  `name`,\n"
                + "        |  window_start,\n"
                + "        |  window_end,\n"
                + "        |  COUNT(DISTINCT `string`) as uv1\n"
                + "        |FROM TABLE(\n"
                + "        |   TUMBLE(TABLE T1, DESCRIPTOR(rowtime), INTERVAL '5' SECOND))\n"
                + "        |GROUP BY `name`, window_start, window_end\n"
                + "        |) L\n"
                + "        |LEFT JOIN (\n"
                + "        |SELECT\n"
                + "        |  `name`,\n"
                + "        |  window_start,\n"
                + "        |  window_end,\n"
                + "        |  COUNT(DISTINCT `string`) as uv2\n"
                + "        |FROM TABLE(\n"
                + "        |   TUMBLE(TABLE T2, DESCRIPTOR(rowtime), INTERVAL '5' SECOND))\n"
                + "        |GROUP BY `name`, window_start, window_end\n"
                + "        |) R\n"
                + "        |ON L.window_start = R.window_start AND L.window_end = R.window_end AND L.`name` = R.`name`\n"
                + "        |";
        String query = new StringOps(Predef$.MODULE$.augmentString(marginSql)).stripMargin();

        TestingAppendSink appendSink = new TestingAppendSink();
        package$.MODULE$.tableConversions(this.tEnv().sqlQuery(query)).toDataStream().addSink((SinkFunction)appendSink);
        this.env().execute();

        String[] expectedRows = new String[]{
            "a,2020-10-10T00:00,2020-10-10T00:00:05,2,null",
            "a,2020-10-10T00:00:05,2020-10-10T00:00:10,1,null",
            "b,2020-10-10T00:00:05,2020-10-10T00:00:10,2,2",
            "b,2020-10-10T00:00:15,2020-10-10T00:00:20,1,1",
            "b,2020-10-10T00:00:30,2020-10-10T00:00:35,1,1",
            "null,2020-10-10T00:00:30,2020-10-10T00:00:35,0,null"
        };
        Seq expected = (Seq)Seq$.MODULE$.apply((Seq)Predef$.MODULE$.wrapRefArray((Object[])expectedRows));

        // Both sides are sorted before comparison, so emission order is irrelevant.
        String actualText = ((TraversableOnce)appendSink.getAppendResults().sorted((Ordering)Ordering.String$.MODULE$)).mkString("\n");
        String expectedText = ((TraversableOnce)expected.sorted((Ordering)Ordering.String$.MODULE$)).mkString("\n");
        Assertions.assertThat((String)actualText).isEqualTo(expectedText);
    }

    /**
     * LEFT JOINs the per-window aggregates of T1 and T2 on window_start, window_end and
     * {@code name IS NOT DISTINCT FROM}; the expected rows include a match for the
     * row whose name is null on both sides.
     */
    @TestTemplate
    public void testLeftJoinWithIsNotDistinctFrom() {
        // Margin-formatted SQL; stripMargin() below removes everything up to each '|'.
        String marginSql = "\n"
                + "        |SELECT L.`name`, L.window_start, L.window_end, uv1, uv2\n"
                + "        |FROM (\n"
                + "        |SELECT\n"
                + "        |  `name`,\n"
                + "        |  window_start,\n"
                + "        |  window_end,\n"
                + "        |  COUNT(DISTINCT `string`) as uv1\n"
                + "        |FROM TABLE(\n"
                + "        |   TUMBLE(TABLE T1, DESCRIPTOR(rowtime), INTERVAL '5' SECOND))\n"
                + "        |GROUP BY `name`, window_start, window_end\n"
                + "        |) L\n"
                + "        |LEFT JOIN (\n"
                + "        |SELECT\n"
                + "        |  `name`,\n"
                + "        |  window_start,\n"
                + "        |  window_end,\n"
                + "        |  COUNT(DISTINCT `string`) as uv2\n"
                + "        |FROM TABLE(\n"
                + "        |   TUMBLE(TABLE T2, DESCRIPTOR(rowtime), INTERVAL '5' SECOND))\n"
                + "        |GROUP BY `name`, window_start, window_end\n"
                + "        |) R\n"
                + "        |ON L.window_start = R.window_start AND L.window_end = R.window_end AND\n"
                + "        |   L.`name` IS NOT DISTINCT from R.`name`\n"
                + "        |";
        String query = new StringOps(Predef$.MODULE$.augmentString(marginSql)).stripMargin();

        TestingAppendSink appendSink = new TestingAppendSink();
        package$.MODULE$.tableConversions(this.tEnv().sqlQuery(query)).toDataStream().addSink((SinkFunction)appendSink);
        this.env().execute();

        String[] expectedRows = new String[]{
            "a,2020-10-10T00:00,2020-10-10T00:00:05,2,null",
            "a,2020-10-10T00:00:05,2020-10-10T00:00:10,1,null",
            "b,2020-10-10T00:00:05,2020-10-10T00:00:10,2,2",
            "b,2020-10-10T00:00:15,2020-10-10T00:00:20,1,1",
            "b,2020-10-10T00:00:30,2020-10-10T00:00:35,1,1",
            "null,2020-10-10T00:00:30,2020-10-10T00:00:35,0,0"
        };
        Seq expected = (Seq)Seq$.MODULE$.apply((Seq)Predef$.MODULE$.wrapRefArray((Object[])expectedRows));

        // Both sides are sorted before comparison, so emission order is irrelevant.
        String actualText = ((TraversableOnce)appendSink.getAppendResults().sorted((Ordering)Ordering.String$.MODULE$)).mkString("\n");
        String expectedText = ((TraversableOnce)expected.sorted((Ordering)Ordering.String$.MODULE$)).mkString("\n");
        Assertions.assertThat((String)actualText).isEqualTo(expectedText);
    }

    /**
     * RIGHT JOINs the per-window aggregates of T1 and T2 (5-second tumbling windows) on
     * window_start, window_end and {@code name} equality, projecting the window
     * columns from the right side, then compares the sorted sink output with the
     * sorted expected rows.
     */
    @TestTemplate
    public void testRightJoin() {
        // Margin-formatted SQL; stripMargin() below removes everything up to each '|'.
        String marginSql = "\n"
                + "        |SELECT L.`name`, R.window_start, R.window_end, uv1, uv2\n"
                + "        |FROM (\n"
                + "        |SELECT\n"
                + "        |  `name`,\n"
                + "        |  window_start,\n"
                + "        |  window_end,\n"
                + "        |  COUNT(DISTINCT `string`) as uv1\n"
                + "        |FROM TABLE(\n"
                + "        |   TUMBLE(TABLE T1, DESCRIPTOR(rowtime), INTERVAL '5' SECOND))\n"
                + "        |GROUP BY `name`, window_start, window_end\n"
                + "        |) L\n"
                + "        |RIGHT JOIN (\n"
                + "        |SELECT\n"
                + "        |  `name`,\n"
                + "        |  window_start,\n"
                + "        |  window_end,\n"
                + "        |  COUNT(DISTINCT `string`) as uv2\n"
                + "        |FROM TABLE(\n"
                + "        |   TUMBLE(TABLE T2, DESCRIPTOR(rowtime), INTERVAL '5' SECOND))\n"
                + "        |GROUP BY `name`, window_start, window_end\n"
                + "        |) R\n"
                + "        |ON L.window_start = R.window_start AND L.window_end = R.window_end AND L.`name` = R.`name`\n"
                + "        |";
        String query = new StringOps(Predef$.MODULE$.augmentString(marginSql)).stripMargin();

        TestingAppendSink appendSink = new TestingAppendSink();
        package$.MODULE$.tableConversions(this.tEnv().sqlQuery(query)).toDataStream().addSink((SinkFunction)appendSink);
        this.env().execute();

        String[] expectedRows = new String[]{
            "null,2020-10-10T00:00,2020-10-10T00:00:05,null,2",
            "null,2020-10-10T00:00:05,2020-10-10T00:00:10,null,1",
            "b,2020-10-10T00:00:05,2020-10-10T00:00:10,2,2",
            "b,2020-10-10T00:00:15,2020-10-10T00:00:20,1,1",
            "b,2020-10-10T00:00:30,2020-10-10T00:00:35,1,1",
            "null,2020-10-10T00:00:30,2020-10-10T00:00:35,null,0"
        };
        Seq expected = (Seq)Seq$.MODULE$.apply((Seq)Predef$.MODULE$.wrapRefArray((Object[])expectedRows));

        // Both sides are sorted before comparison, so emission order is irrelevant.
        String actualText = ((TraversableOnce)appendSink.getAppendResults().sorted((Ordering)Ordering.String$.MODULE$)).mkString("\n");
        String expectedText = ((TraversableOnce)expected.sorted((Ordering)Ordering.String$.MODULE$)).mkString("\n");
        Assertions.assertThat((String)actualText).isEqualTo(expectedText);
    }

    /**
     * RIGHT JOINs the per-window aggregates of T1 and T2 on window_start, window_end
     * and {@code name IS NOT DISTINCT FROM}; the expected rows include a match for
     * the row whose name is null on both sides.
     */
    @TestTemplate
    public void testRightJoinWithIsNotDistinctFrom() {
        // Margin-formatted SQL; stripMargin() below removes everything up to each '|'.
        String marginSql = "\n"
                + "        |SELECT L.`name`, R.window_start, R.window_end, uv1, uv2\n"
                + "        |FROM (\n"
                + "        |SELECT\n"
                + "        |  `name`,\n"
                + "        |  window_start,\n"
                + "        |  window_end,\n"
                + "        |  COUNT(DISTINCT `string`) as uv1\n"
                + "        |FROM TABLE(\n"
                + "        |   TUMBLE(TABLE T1, DESCRIPTOR(rowtime), INTERVAL '5' SECOND))\n"
                + "        |GROUP BY `name`, window_start, window_end\n"
                + "        |) L\n"
                + "        |RIGHT JOIN (\n"
                + "        |SELECT\n"
                + "        |  `name`,\n"
                + "        |  window_start,\n"
                + "        |  window_end,\n"
                + "        |  COUNT(DISTINCT `string`) as uv2\n"
                + "        |FROM TABLE(\n"
                + "        |   TUMBLE(TABLE T2, DESCRIPTOR(rowtime), INTERVAL '5' SECOND))\n"
                + "        |GROUP BY `name`, window_start, window_end\n"
                + "        |) R\n"
                + "        |ON L.window_start = R.window_start AND L.window_end = R.window_end AND\n"
                + "        |   L.`name` IS NOT DISTINCT from R.`name`\n"
                + "        |";
        String query = new StringOps(Predef$.MODULE$.augmentString(marginSql)).stripMargin();

        TestingAppendSink appendSink = new TestingAppendSink();
        package$.MODULE$.tableConversions(this.tEnv().sqlQuery(query)).toDataStream().addSink((SinkFunction)appendSink);
        this.env().execute();

        String[] expectedRows = new String[]{
            "null,2020-10-10T00:00,2020-10-10T00:00:05,null,2",
            "null,2020-10-10T00:00:05,2020-10-10T00:00:10,null,1",
            "b,2020-10-10T00:00:05,2020-10-10T00:00:10,2,2",
            "b,2020-10-10T00:00:15,2020-10-10T00:00:20,1,1",
            "b,2020-10-10T00:00:30,2020-10-10T00:00:35,1,1",
            "null,2020-10-10T00:00:30,2020-10-10T00:00:35,0,0"
        };
        Seq expected = (Seq)Seq$.MODULE$.apply((Seq)Predef$.MODULE$.wrapRefArray((Object[])expectedRows));

        // Both sides are sorted before comparison, so emission order is irrelevant.
        String actualText = ((TraversableOnce)appendSink.getAppendResults().sorted((Ordering)Ordering.String$.MODULE$)).mkString("\n");
        String expectedText = ((TraversableOnce)expected.sorted((Ordering)Ordering.String$.MODULE$)).mkString("\n");
        Assertions.assertThat((String)actualText).isEqualTo(expectedText);
    }

    /**
     * FULL OUTER JOINs the per-window aggregates of T1 and T2 (5-second tumbling
     * windows) on window_start, window_end and {@code name} equality, projecting the
     * key columns of both sides, then compares the sorted sink output with the sorted
     * expected rows.
     */
    @TestTemplate
    public void testOuterJoin() {
        // Margin-formatted SQL; stripMargin() below removes everything up to each '|'.
        String marginSql = "\n"
                + "        |SELECT L.`name`, L.window_start, L.window_end, R.`name`, R.window_start, R.window_end,\n"
                + "        |uv1, uv2\n"
                + "        |FROM (\n"
                + "        |SELECT\n"
                + "        |  `name`,\n"
                + "        |  window_start,\n"
                + "        |  window_end,\n"
                + "        |  COUNT(DISTINCT `string`) as uv1\n"
                + "        |FROM TABLE(\n"
                + "        |   TUMBLE(TABLE T1, DESCRIPTOR(rowtime), INTERVAL '5' SECOND))\n"
                + "        |GROUP BY `name`, window_start, window_end\n"
                + "        |) L\n"
                + "        |FULL OUTER JOIN (\n"
                + "        |SELECT\n"
                + "        |  `name`,\n"
                + "        |  window_start,\n"
                + "        |  window_end,\n"
                + "        |  COUNT(DISTINCT `string`) as uv2\n"
                + "        |FROM TABLE(\n"
                + "        |   TUMBLE(TABLE T2, DESCRIPTOR(rowtime), INTERVAL '5' SECOND))\n"
                + "        |GROUP BY `name`, window_start, window_end\n"
                + "        |) R\n"
                + "        |ON L.window_start = R.window_start AND L.window_end = R.window_end AND L.`name` = R.`name`\n"
                + "        |";
        String query = new StringOps(Predef$.MODULE$.augmentString(marginSql)).stripMargin();

        TestingAppendSink appendSink = new TestingAppendSink();
        package$.MODULE$.tableConversions(this.tEnv().sqlQuery(query)).toDataStream().addSink((SinkFunction)appendSink);
        this.env().execute();

        String[] expectedRows = new String[]{
            "a,2020-10-10T00:00,2020-10-10T00:00:05,null,null,null,2,null",
            "a,2020-10-10T00:00:05,2020-10-10T00:00:10,null,null,null,1,null",
            "b,2020-10-10T00:00:05,2020-10-10T00:00:10,b,2020-10-10T00:00:05,2020-10-10T00:00:10,2,2",
            "b,2020-10-10T00:00:15,2020-10-10T00:00:20,b,2020-10-10T00:00:15,2020-10-10T00:00:20,1,1",
            "b,2020-10-10T00:00:30,2020-10-10T00:00:35,b,2020-10-10T00:00:30,2020-10-10T00:00:35,1,1",
            "null,2020-10-10T00:00:30,2020-10-10T00:00:35,null,null,null,0,null",
            "null,null,null,a1,2020-10-10T00:00,2020-10-10T00:00:05,null,2",
            "null,null,null,a1,2020-10-10T00:00:05,2020-10-10T00:00:10,null,1",
            "null,null,null,null,2020-10-10T00:00:30,2020-10-10T00:00:35,null,0"
        };
        Seq expected = (Seq)Seq$.MODULE$.apply((Seq)Predef$.MODULE$.wrapRefArray((Object[])expectedRows));

        // Both sides are sorted before comparison, so emission order is irrelevant.
        String actualText = ((TraversableOnce)appendSink.getAppendResults().sorted((Ordering)Ordering.String$.MODULE$)).mkString("\n");
        String expectedText = ((TraversableOnce)expected.sorted((Ordering)Ordering.String$.MODULE$)).mkString("\n");
        Assertions.assertThat((String)actualText).isEqualTo(expectedText);
    }

    /**
     * FULL OUTER JOINs the per-window aggregates of T1 and T2 on window_start,
     * window_end and {@code name IS NOT DISTINCT FROM}; the expected rows pair the
     * null-named rows of both sides into a single joined row.
     */
    @TestTemplate
    public void testOuterJoinWithIsNotDistinctFrom() {
        // Margin-formatted SQL; stripMargin() below removes everything up to each '|'.
        String marginSql = "\n"
                + "        |SELECT L.`name`, L.window_start, L.window_end, R.`name`, R.window_start, R.window_end,\n"
                + "        |uv1, uv2\n"
                + "        |FROM (\n"
                + "        |SELECT\n"
                + "        |  `name`,\n"
                + "        |  window_start,\n"
                + "        |  window_end,\n"
                + "        |  COUNT(DISTINCT `string`) as uv1\n"
                + "        |FROM TABLE(\n"
                + "        |   TUMBLE(TABLE T1, DESCRIPTOR(rowtime), INTERVAL '5' SECOND))\n"
                + "        |GROUP BY `name`, window_start, window_end\n"
                + "        |) L\n"
                + "        |FULL OUTER JOIN (\n"
                + "        |SELECT\n"
                + "        |  `name`,\n"
                + "        |  window_start,\n"
                + "        |  window_end,\n"
                + "        |  COUNT(DISTINCT `string`) as uv2\n"
                + "        |FROM TABLE(\n"
                + "        |   TUMBLE(TABLE T2, DESCRIPTOR(rowtime), INTERVAL '5' SECOND))\n"
                + "        |GROUP BY `name`, window_start, window_end\n"
                + "        |) R\n"
                + "        |ON L.window_start = R.window_start AND L.window_end = R.window_end AND\n"
                + "        |   L.`name` IS NOT DISTINCT from R.`name`\n"
                + "        |";
        String query = new StringOps(Predef$.MODULE$.augmentString(marginSql)).stripMargin();

        TestingAppendSink appendSink = new TestingAppendSink();
        package$.MODULE$.tableConversions(this.tEnv().sqlQuery(query)).toDataStream().addSink((SinkFunction)appendSink);
        this.env().execute();

        String[] expectedRows = new String[]{
            "a,2020-10-10T00:00,2020-10-10T00:00:05,null,null,null,2,null",
            "a,2020-10-10T00:00:05,2020-10-10T00:00:10,null,null,null,1,null",
            "b,2020-10-10T00:00:05,2020-10-10T00:00:10,b,2020-10-10T00:00:05,2020-10-10T00:00:10,2,2",
            "b,2020-10-10T00:00:15,2020-10-10T00:00:20,b,2020-10-10T00:00:15,2020-10-10T00:00:20,1,1",
            "b,2020-10-10T00:00:30,2020-10-10T00:00:35,b,2020-10-10T00:00:30,2020-10-10T00:00:35,1,1",
            "null,2020-10-10T00:00:30,2020-10-10T00:00:35,null,2020-10-10T00:00:30,2020-10-10T00:00:35,0,0",
            "null,null,null,a1,2020-10-10T00:00,2020-10-10T00:00:05,null,2",
            "null,null,null,a1,2020-10-10T00:00:05,2020-10-10T00:00:10,null,1"
        };
        Seq expected = (Seq)Seq$.MODULE$.apply((Seq)Predef$.MODULE$.wrapRefArray((Object[])expectedRows));

        // Both sides are sorted before comparison, so emission order is irrelevant.
        String actualText = ((TraversableOnce)appendSink.getAppendResults().sorted((Ordering)Ordering.String$.MODULE$)).mkString("\n");
        String expectedText = ((TraversableOnce)expected.sorted((Ordering)Ordering.String$.MODULE$)).mkString("\n");
        Assertions.assertThat((String)actualText).isEqualTo(expectedText);
    }

    /**
     * Creates one parameterized instance of the test.
     *
     * @param mode state backend mode, forwarded to the superclass constructor
     * @param useTimestampLtz selects the {@code TO_TIMESTAMP_LTZ} variant of the queries
     *     and the matching expected rows in the tests that branch on it
     */
    public WindowJoinITCase(StreamingWithStateTestBase.StateBackendMode mode, boolean useTimestampLtz) {
        // The explicit superclass constructor call has to be the first statement in
        // Java source, so the field stores follow it.
        // NOTE(review): assumes the superclass constructor does not read
        // useTimestampLtz through an overridden method -- confirm.
        super(mode);
        this.useTimestampLtz = useTimestampLtz;
        this.SHANGHAI_ZONE = ZoneId.of("Asia/Shanghai");
    }
}

