/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.table.planner.runtime.stream.sql;

import org.apache.flink.api.common.restartstrategy.RestartStrategies;
import org.apache.flink.streaming.api.CheckpointingMode;
import org.apache.flink.streaming.api.functions.sink.SinkFunction;
import org.apache.flink.table.api.bridge.scala.package$;
import org.apache.flink.table.planner.factories.TestValuesTableFactory;
import org.apache.flink.table.planner.plan.utils.JavaUserDefinedAggFunctions;
import org.apache.flink.table.planner.runtime.utils.FailingCollectionSource;
import org.apache.flink.table.planner.runtime.utils.StreamingWithStateTestBase;
import org.apache.flink.table.planner.runtime.utils.TestData$;
import org.apache.flink.table.planner.runtime.utils.TestingAppendSink;
import org.apache.flink.testutils.junit.extensions.parameterized.ParameterizedTestExtension;
import org.assertj.core.api.Assertions;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.TestTemplate;
import org.junit.jupiter.api.extension.ExtendWith;
import scala.Predef$;
import scala.collection.Seq;
import scala.collection.Seq$;
import scala.collection.TraversableOnce;
import scala.collection.immutable.StringOps;
import scala.math.Ordering;
import scala.reflect.ScalaSignature;

@ExtendWith(value={ParameterizedTestExtension.class})
@ScalaSignature(bytes="\u0006\u0001\u0005eb\u0001B\u0001\u0003\u0001M\u0011\u0001cV5oI><(+\u00198l\u0013R\u001b\u0015m]3\u000b\u0005\r!\u0011aA:rY*\u0011QAB\u0001\u0007gR\u0014X-Y7\u000b\u0005\u001dA\u0011a\u0002:v]RLW.\u001a\u0006\u0003\u0013)\tq\u0001\u001d7b]:,'O\u0003\u0002\f\u0019\u0005)A/\u00192mK*\u0011QBD\u0001\u0006M2Lgn\u001b\u0006\u0003\u001fA\ta!\u00199bG\",'\"A\t\u0002\u0007=\u0014xm\u0001\u0001\u0014\u0005\u0001!\u0002CA\u000b\u0019\u001b\u00051\"BA\f\u0007\u0003\u0015)H/\u001b7t\u0013\tIbC\u0001\u000eTiJ,\u0017-\\5oO^KG\u000f[*uCR,G+Z:u\u0005\u0006\u001cX\r\u0003\u0005\u001c\u0001\t\u0005\t\u0015!\u0003\u001d\u0003\u0011iw\u000eZ3\u0011\u0005u\tdB\u0001\u00100\u001d\tybF\u0004\u0002![9\u0011\u0011\u0005\f\b\u0003E-r!a\t\u0016\u000f\u0005\u0011JcBA\u0013)\u001b\u00051#BA\u0014\u0013\u0003\u0019a$o\\8u}%\t\u0011#\u0003\u0002\u0010!%\u0011QBD\u0005\u0003\u00171I!!\u0003\u0006\n\u0005\u001dA\u0011BA\f\u0007\u0013\t\u0001d#\u0001\u000eTiJ,\u0017-\\5oO^KG\u000f[*uCR,G+Z:u\u0005\u0006\u001cX-\u0003\u00023g\t\u00012\u000b^1uK\n\u000b7m[3oI6{G-\u001a\u0006\u0003aYAQ!\u000e\u0001\u0005\u0002Y\na\u0001P5oSRtDCA\u001c:!\tA\u0004!D\u0001\u0003\u0011\u0015YB\u00071\u0001\u001d\u0011\u0015Y\u0004\u0001\"\u0011=\u0003\u0019\u0011WMZ8sKR\tQ\b\u0005\u0002?\u00036\tqHC\u0001A\u0003\u0015\u00198-\u00197b\u0013\t\u0011uH\u0001\u0003V]&$\bF\u0001\u001eE!\t)E*D\u0001G\u0015\t9\u0005*A\u0002ba&T!!\u0013&\u0002\u000f),\b/\u001b;fe*\u00111\nE\u0001\u0006UVt\u0017\u000e^\u0005\u0003\u001b\u001a\u0013!BQ3g_J,W)Y2i\u0011\u0015y\u0005\u0001\"\u0001=\u0003A!Xm\u001d;Uk6\u0014G.Z,j]\u0012|w\u000f\u000b\u0002O#B\u0011QIU\u0005\u0003'\u001a\u0013A\u0002V3tiR+W\u000e\u001d7bi\u0016DQ!\u0016\u0001\u0005\u0002q\na\u0004^3tiR+XN\u00197f/&tGm\\<XSRD'+\u00198l\u001f\u001a47/\u001a;)\u0005Q\u000b\u0006\"\u0002-\u0001\t\u0003a\u0014!\t;fgR$V/\u001c2mK^Kg\u000eZ8x/&$\bn\\;u%\u0006t7NT;nE\u0016\u0014\bFA,R\u0011\u0015Y\u0006\u0001\"\u0001=\u0003M!Xm\u001d;Uk6\u0014G.Z,j]\u0012|w\u000f\u0016,GQ\tQ\u0016\u000bC\u0003_\u0001\u0011\u0005A(A\u000fuKN$H+^7cY\u0016<\u0016N\u001c3poR3fiV5uQ>3gm]3uQ\ti\u0016\u000bC\u0003b\u0001\u0011\u0005A(A\u0013uKN$H+^7cY\u0016<\u0016N\u001c3poR3fiV5uQ:+w-\u0019;jm\u0016|eMZ:fi\"\u0012\u0001-\u0015\u0005\u0006I\u0002!\t\u0001P\u0001\u001ci\u0016\u001cH\u000fV;nE2,w+\u001b8e_^$fKR,ji\"\u001c\u0015\r\\2)\u0005\r\f\u0006\"B4\u0001\t\u0003a\u0014!\u0004;fgRDu\u000e],j]\u0012|w\u000f\u000b\u0002g#\")!\u000e\u0001C\u0001y\u0005YB/Z:u\u0011>\u0004x+\u001b8e_^<\u0016\u000e\u001e5SC:\\wJ\u001a4tKRD#![)\t\u000b5\u0004A\u0011\u0001\u001f\u0002=Q,7\u000f\u001e%pa^Kg\u000eZ8x/&$\bn\\;u%\u0006t7NT;nE\u0016\u0014\bF\u00017R\u0011\u0015\u0001\b\u0001\"\u0001=\u0003A!Xm\u001d;I_B<\u0016N\u001c3poR3f\t\u000b\u0002p#\")1\u000f\u0001C\u0001y\u0005AB/Z:u\u0011>\u0004x+\u001b8e_^$fKR,ji\"\u001c\u0015\r\\2)\u0005I\f\u0006\"\u0002<\u0001\t\u0003a\u0014A\u0005;fgR\u001cU/\\;mCR,w+\u001b8e_^D#!^)\t\u000be\u0004A\u0011\u0001\u001f\u0002AQ,7\u000f^\"v[Vd\u0017\r^3XS:$wn^,ji\"\u0014\u0016M\\6PM\u001a\u001cX\r\u001e\u0015\u0003qFCQ\u0001 \u0001\u0005\u0002q\n1\u0005^3ti\u000e+X.\u001e7bi\u0016<\u0016N\u001c3po^KG\u000f[8viJ\u000bgn\u001b(v[\n,'\u000f\u000b\u0002|#\")q\u0010\u0001C\u0001y\u0005AA/Z:u)>\u0004\u0018\u0007\u000b\u0002\u007f#\"1\u0011Q\u0001\u0001\u0005\u0002q\nQ\u0003^3ti\u000e+X.\u001e7bi\u0016<\u0016N\u001c3poR3f\tK\u0002\u0002\u0004ECa!a\u0003\u0001\t\u0003a\u0014!\b;fgR\u001cU/\\;mCR,w+\u001b8e_^$fKR,ji\"\u001c\u0015\r\\2)\u0007\u0005%\u0011\u000bK\u0004\u0001\u0003#\ti\"a\b\u0011\t\u0005M\u0011\u0011D\u0007\u0003\u0003+Q1!a\u0006G\u0003%)\u0007\u0010^3og&|g.\u0003\u0003\u0002\u001c\u0005U!AC#yi\u0016tGmV5uQ\u0006)a/\u00197vK2\u0012\u0011\u0011E\u0012\u0003\u0003G\u0001B!!\n\u000265\u0011\u0011q\u0005\u0006\u0005\u0003S\tY#A\u0007qCJ\fW.\u001a;fe&TX\r\u001a\u0006\u0005\u0003[\ty#\u0001\u0006fqR,gn]5p]NT1aSA\u0019\u0015\r\t\u0019\u0004D\u0001\ni\u0016\u001cH/\u001e;jYNLA!a\u000e\u0002(\tQ\u0002+\u0019:b[\u0016$XM]5{K\u0012$Vm\u001d;FqR,gn]5p]\u0002")
public class WindowRankITCase
extends StreamingWithStateTestBase {
    @Override
    @BeforeEach
    public void before() {
        super.before();
        this.env().enableCheckpointing(100L, CheckpointingMode.EXACTLY_ONCE);
        this.env().setRestartStrategy(RestartStrategies.fixedDelayRestart((int)1, (long)0L));
        FailingCollectionSource.reset();
        String dataId = TestValuesTableFactory.registerData(TestData$.MODULE$.windowDataWithTimestamp());
        this.tEnv().executeSql(new StringOps(Predef$.MODULE$.augmentString(new StringBuilder(697).append("\n                       |CREATE TABLE T1 (\n                       | `ts` STRING,\n                       | `int` INT,\n                       | `double` DOUBLE,\n                       | `float` FLOAT,\n                       | `bigdec` DECIMAL(10, 2),\n                       | `string` STRING,\n                       | `name` STRING,\n                       | `rowtime` AS TO_TIMESTAMP(`ts`),\n                       | WATERMARK for `rowtime` AS `rowtime` - INTERVAL '1' SECOND\n                       |) WITH (\n                       | 'connector' = 'values',\n                       | 'data-id' = '").append(dataId).append("',\n                       | 'failing-source' = 'true'\n                       |)\n                       |").toString())).stripMargin());
        this.tEnv().createFunction("concat_distinct_agg", JavaUserDefinedAggFunctions.ConcatDistinctAggFunction.class);
    }

    @TestTemplate
    public void testTumbleWindow() {
        String sql = new StringOps(Predef$.MODULE$.augmentString("\n        |SELECT * FROM\n        |(\n        |  SELECT *,\n        |    ROW_NUMBER() OVER(\n        |      PARTITION BY window_start, window_end ORDER BY sum_b DESC) as rownum\n        |  FROM (\n        |    SELECT\n        |      `name`,\n        |      window_start,\n        |      window_end,\n        |      COUNT(*) as cnt,\n        |      SUM(`bigdec`) as sum_b,\n        |      MAX(`double`) as max_d,\n        |      MIN(`float`) as min_f,\n        |      COUNT(DISTINCT `string`) as uv,\n        |      concat_distinct_agg(`string`) as distinct_str\n        |    FROM TABLE(\n        |      TUMBLE(TABLE T1, DESCRIPTOR(rowtime), INTERVAL '5' SECOND))\n        |  GROUP BY `name`, window_start, window_end\n        |  )\n        |)\n        |WHERE rownum <= 2\n      ")).stripMargin();
        TestingAppendSink sink = new TestingAppendSink();
        package$.MODULE$.tableConversions(this.tEnv().sqlQuery(sql)).toDataStream().addSink((SinkFunction)sink);
        this.env().execute();
        Seq expected = (Seq)Seq$.MODULE$.apply((Seq)Predef$.MODULE$.wrapRefArray((Object[])new String[]{"a,2020-10-10T00:00,2020-10-10T00:00:05,4,11.10,5.0,1.0,2,Hi|Comment#1,1", "a,2020-10-10T00:00:05,2020-10-10T00:00:10,1,3.33,null,3.0,1,Comment#2,2", "b,2020-10-10T00:00:05,2020-10-10T00:00:10,2,6.66,6.0,3.0,2,Hello|Hi,1", "b,2020-10-10T00:00:15,2020-10-10T00:00:20,1,4.44,4.0,4.0,1,Hi,1", "b,2020-10-10T00:00:30,2020-10-10T00:00:35,1,3.33,3.0,3.0,1,Comment#3,2", "null,2020-10-10T00:00:30,2020-10-10T00:00:35,1,7.77,7.0,7.0,0,null,1"}));
        Assertions.assertThat((String)((TraversableOnce)sink.getAppendResults().sorted((Ordering)Ordering.String$.MODULE$)).mkString("\n")).isEqualTo(((TraversableOnce)expected.sorted((Ordering)Ordering.String$.MODULE$)).mkString("\n"));
    }

    @TestTemplate
    public void testTumbleWindowWithRankOffset() {
        String sql = new StringOps(Predef$.MODULE$.augmentString("\n        |SELECT *\n        |FROM\n        |(\n        |  SELECT *,\n        |    ROW_NUMBER() OVER(\n        |      PARTITION BY window_start, window_end ORDER BY sum_b DESC) as rownum\n        |  FROM (\n        |    SELECT\n        |      `name`,\n        |      window_start,\n        |      window_end,\n        |      COUNT(*) as cnt,\n        |      SUM(`bigdec`) as sum_b,\n        |      MAX(`double`) as max_d,\n        |      MIN(`float`) as min_f,\n        |      COUNT(DISTINCT `string`) as uv,\n        |      concat_distinct_agg(`string`) as distinct_str\n        |    FROM TABLE(\n        |      TUMBLE(TABLE T1, DESCRIPTOR(rowtime), INTERVAL '5' SECOND))\n        |  GROUP BY `name`, window_start, window_end\n        |  )\n        |)\n        |WHERE rownum > 1 AND rownum <= 2\n      ")).stripMargin();
        TestingAppendSink sink = new TestingAppendSink();
        package$.MODULE$.tableConversions(this.tEnv().sqlQuery(sql)).toDataStream().addSink((SinkFunction)sink);
        this.env().execute();
        Seq expected = (Seq)Seq$.MODULE$.apply((Seq)Predef$.MODULE$.wrapRefArray((Object[])new String[]{"a,2020-10-10T00:00:05,2020-10-10T00:00:10,1,3.33,null,3.0,1,Comment#2,2", "b,2020-10-10T00:00:30,2020-10-10T00:00:35,1,3.33,3.0,3.0,1,Comment#3,2"}));
        Assertions.assertThat((String)((TraversableOnce)sink.getAppendResults().sorted((Ordering)Ordering.String$.MODULE$)).mkString("\n")).isEqualTo(((TraversableOnce)expected.sorted((Ordering)Ordering.String$.MODULE$)).mkString("\n"));
    }

    @TestTemplate
    public void testTumbleWindowWithoutRankNumber() {
        String sql = new StringOps(Predef$.MODULE$.augmentString("\n        |SELECT `name`, window_start, window_end, cnt, sum_b, max_d, min_f, uv, distinct_str\n        |FROM\n        |(\n        |  SELECT *,\n        |    ROW_NUMBER() OVER(\n        |      PARTITION BY window_start, window_end ORDER BY sum_b DESC) as rownum\n        |  FROM (\n        |    SELECT\n        |      `name`,\n        |      window_start,\n        |      window_end,\n        |      COUNT(*) as cnt,\n        |      SUM(`bigdec`) as sum_b,\n        |      MAX(`double`) as max_d,\n        |      MIN(`float`) as min_f,\n        |      COUNT(DISTINCT `string`) as uv,\n        |      concat_distinct_agg(`string`) as distinct_str\n        |    FROM TABLE(\n        |      TUMBLE(TABLE T1, DESCRIPTOR(rowtime), INTERVAL '5' SECOND))\n        |  GROUP BY `name`, window_start, window_end\n        |  )\n        |)\n        |WHERE rownum > 1 AND rownum <= 2\n      ")).stripMargin();
        TestingAppendSink sink = new TestingAppendSink();
        package$.MODULE$.tableConversions(this.tEnv().sqlQuery(sql)).toDataStream().addSink((SinkFunction)sink);
        this.env().execute();
        Seq expected = (Seq)Seq$.MODULE$.apply((Seq)Predef$.MODULE$.wrapRefArray((Object[])new String[]{"a,2020-10-10T00:00:05,2020-10-10T00:00:10,1,3.33,null,3.0,1,Comment#2", "b,2020-10-10T00:00:30,2020-10-10T00:00:35,1,3.33,3.0,3.0,1,Comment#3"}));
        Assertions.assertThat((String)((TraversableOnce)sink.getAppendResults().sorted((Ordering)Ordering.String$.MODULE$)).mkString("\n")).isEqualTo(((TraversableOnce)expected.sorted((Ordering)Ordering.String$.MODULE$)).mkString("\n"));
    }

    @TestTemplate
    public void testTumbleWindowTVF() {
        String sql = new StringOps(Predef$.MODULE$.augmentString("\n         |SELECT\n         |  TO_TIMESTAMP(`ts`),\n         |  `int`,\n         |  `double`,\n         |  `float`,\n         |  `bigdec`,\n         |  `string`,\n         |  `name`,\n         |  CAST(`rowtime` AS STRING),\n         |  window_start,\n         |  window_end,\n         |  window_time\n         |FROM (\n         |  SELECT *,\n         |    ROW_NUMBER() OVER(\n         |      PARTITION BY window_start, window_end, `name` ORDER BY `int` DESC) as rownum\n         |  FROM TABLE(\n         |      TUMBLE(TABLE T1, DESCRIPTOR(rowtime), INTERVAL '5' SECOND))\n         |)\n         |WHERE rownum <= 2\n      ")).stripMargin();
        TestingAppendSink sink = new TestingAppendSink();
        package$.MODULE$.tableConversions(this.tEnv().sqlQuery(sql)).toDataStream().addSink((SinkFunction)sink);
        this.env().execute();
        Seq expected = (Seq)Seq$.MODULE$.apply((Seq)Predef$.MODULE$.wrapRefArray((Object[])new String[]{"2020-10-10T00:00:04,5,5.0,5.0,5.55,null,a,2020-10-10 00:00:04.000,2020-10-10T00:00,2020-10-10T00:00:05,2020-10-10T00:00:04.999", "2020-10-10T00:00:02,2,2.0,2.0,2.22,Comment#1,a,2020-10-10 00:00:02.000,2020-10-10T00:00,2020-10-10T00:00:05,2020-10-10T00:00:04.999", "2020-10-10T00:00:08,3,null,3.0,3.33,Comment#2,a,2020-10-10 00:00:08.000,2020-10-10T00:00:05,2020-10-10T00:00:10,2020-10-10T00:00:09.999", "2020-10-10T00:00:06,6,6.0,6.0,6.66,Hi,b,2020-10-10 00:00:06.000,2020-10-10T00:00:05,2020-10-10T00:00:10,2020-10-10T00:00:09.999", "2020-10-10T00:00:07,3,3.0,3.0,null,Hello,b,2020-10-10 00:00:07.000,2020-10-10T00:00:05,2020-10-10T00:00:10,2020-10-10T00:00:09.999", "2020-10-10T00:00:16,4,4.0,4.0,4.44,Hi,b,2020-10-10 00:00:16.000,2020-10-10T00:00:15,2020-10-10T00:00:20,2020-10-10T00:00:19.999", "2020-10-10T00:00:32,7,7.0,7.0,7.77,null,null,2020-10-10 00:00:32.000,2020-10-10T00:00:30,2020-10-10T00:00:35,2020-10-10T00:00:34.999", "2020-10-10T00:00:34,1,3.0,3.0,3.33,Comment#3,b,2020-10-10 00:00:34.000,2020-10-10T00:00:30,2020-10-10T00:00:35,2020-10-10T00:00:34.999"}));
        Assertions.assertThat((String)((TraversableOnce)sink.getAppendResults().sorted((Ordering)Ordering.String$.MODULE$)).mkString("\n")).isEqualTo(((TraversableOnce)expected.sorted((Ordering)Ordering.String$.MODULE$)).mkString("\n"));
    }

    @TestTemplate
    public void testTumbleWindowTVFWithOffset() {
        String sql = new StringOps(Predef$.MODULE$.augmentString("\n         |SELECT\n         |  TO_TIMESTAMP(`ts`),\n         |  `int`,\n         |  `double`,\n         |  `float`,\n         |  `bigdec`,\n         |  `string`,\n         |  `name`,\n         |  CAST(`rowtime` AS STRING),\n         |  window_start,\n         |  window_end,\n         |  window_time\n         |FROM (\n         |  SELECT *,\n         |    ROW_NUMBER() OVER(\n         |      PARTITION BY window_start, window_end, `name` ORDER BY `int` DESC) as rownum\n         |  FROM TABLE(\n         |      TUMBLE(TABLE T1, DESCRIPTOR(rowtime), INTERVAL '5' SECOND, INTERVAL '1' SECOND))\n         |)\n         |WHERE rownum <= 2\n      ")).stripMargin();
        TestingAppendSink sink = new TestingAppendSink();
        package$.MODULE$.tableConversions(this.tEnv().sqlQuery(sql)).toDataStream().addSink((SinkFunction)sink);
        this.env().execute();
        Seq expected = (Seq)Seq$.MODULE$.apply((Seq)Predef$.MODULE$.wrapRefArray((Object[])new String[]{"2020-10-10T00:00:04,5,5.0,5.0,5.55,null,a,2020-10-10 00:00:04.000,2020-10-10T00:00:01,2020-10-10T00:00:06,2020-10-10T00:00:05.999", "2020-10-10T00:00:02,2,2.0,2.0,2.22,Comment#1,a,2020-10-10 00:00:02.000,2020-10-10T00:00:01,2020-10-10T00:00:06,2020-10-10T00:00:05.999", "2020-10-10T00:00:08,3,null,3.0,3.33,Comment#2,a,2020-10-10 00:00:08.000,2020-10-10T00:00:06,2020-10-10T00:00:11,2020-10-10T00:00:10.999", "2020-10-10T00:00:06,6,6.0,6.0,6.66,Hi,b,2020-10-10 00:00:06.000,2020-10-10T00:00:06,2020-10-10T00:00:11,2020-10-10T00:00:10.999", "2020-10-10T00:00:07,3,3.0,3.0,null,Hello,b,2020-10-10 00:00:07.000,2020-10-10T00:00:06,2020-10-10T00:00:11,2020-10-10T00:00:10.999", "2020-10-10T00:00:16,4,4.0,4.0,4.44,Hi,b,2020-10-10 00:00:16.000,2020-10-10T00:00:16,2020-10-10T00:00:21,2020-10-10T00:00:20.999", "2020-10-10T00:00:32,7,7.0,7.0,7.77,null,null,2020-10-10 00:00:32.000,2020-10-10T00:00:31,2020-10-10T00:00:36,2020-10-10T00:00:35.999", "2020-10-10T00:00:34,1,3.0,3.0,3.33,Comment#3,b,2020-10-10 00:00:34.000,2020-10-10T00:00:31,2020-10-10T00:00:36,2020-10-10T00:00:35.999"}));
        Assertions.assertThat((String)((TraversableOnce)sink.getAppendResults().sorted((Ordering)Ordering.String$.MODULE$)).mkString("\n")).isEqualTo(((TraversableOnce)expected.sorted((Ordering)Ordering.String$.MODULE$)).mkString("\n"));
    }

    @TestTemplate
    public void testTumbleWindowTVFWithNegativeOffset() {
        String sql = new StringOps(Predef$.MODULE$.augmentString("\n         |SELECT\n         |  TO_TIMESTAMP(`ts`),\n         |  `int`,\n         |  `double`,\n         |  `float`,\n         |  `bigdec`,\n         |  `string`,\n         |  `name`,\n         |  CAST(`rowtime` AS STRING),\n         |  window_start,\n         |  window_end,\n         |  window_time\n         |FROM (\n         |  SELECT *,\n         |    ROW_NUMBER() OVER(\n         |      PARTITION BY window_start, window_end, `name` ORDER BY `int` DESC) as rownum\n         |  FROM TABLE(\n         |      TUMBLE(TABLE T1, DESCRIPTOR(rowtime), INTERVAL '5' SECOND, INTERVAL '-1' SECOND))\n         |)\n         |WHERE rownum <= 2\n      ")).stripMargin();
        TestingAppendSink sink = new TestingAppendSink();
        package$.MODULE$.tableConversions(this.tEnv().sqlQuery(sql)).toDataStream().addSink((SinkFunction)sink);
        this.env().execute();
        Seq expected = (Seq)Seq$.MODULE$.apply((Seq)Predef$.MODULE$.wrapRefArray((Object[])new String[]{"2020-10-10T00:00:02,2,2.0,2.0,2.22,Comment#1,a,2020-10-10 00:00:02.000,2020-10-09T23:59:59,2020-10-10T00:00:04,2020-10-10T00:00:03.999", "2020-10-10T00:00:03,2,2.0,2.0,2.22,Comment#1,a,2020-10-10 00:00:03.000,2020-10-09T23:59:59,2020-10-10T00:00:04,2020-10-10T00:00:03.999", "2020-10-10T00:00:04,5,5.0,5.0,5.55,null,a,2020-10-10 00:00:04.000,2020-10-10T00:00:04,2020-10-10T00:00:09,2020-10-10T00:00:08.999", "2020-10-10T00:00:04,5,5.0,null,5.55,Hi,a,2020-10-10 00:00:04.000,2020-10-10T00:00:04,2020-10-10T00:00:09,2020-10-10T00:00:08.999", "2020-10-10T00:00:06,6,6.0,6.0,6.66,Hi,b,2020-10-10 00:00:06.000,2020-10-10T00:00:04,2020-10-10T00:00:09,2020-10-10T00:00:08.999", "2020-10-10T00:00:07,3,3.0,3.0,null,Hello,b,2020-10-10 00:00:07.000,2020-10-10T00:00:04,2020-10-10T00:00:09,2020-10-10T00:00:08.999", "2020-10-10T00:00:16,4,4.0,4.0,4.44,Hi,b,2020-10-10 00:00:16.000,2020-10-10T00:00:14,2020-10-10T00:00:19,2020-10-10T00:00:18.999", "2020-10-10T00:00:32,7,7.0,7.0,7.77,null,null,2020-10-10 00:00:32.000,2020-10-10T00:00:29,2020-10-10T00:00:34,2020-10-10T00:00:33.999", "2020-10-10T00:00:34,1,3.0,3.0,3.33,Comment#3,b,2020-10-10 00:00:34.000,2020-10-10T00:00:34,2020-10-10T00:00:39,2020-10-10T00:00:38.999"}));
        Assertions.assertThat((String)((TraversableOnce)sink.getAppendResults().sorted((Ordering)Ordering.String$.MODULE$)).mkString("\n")).isEqualTo(((TraversableOnce)expected.sorted((Ordering)Ordering.String$.MODULE$)).mkString("\n"));
    }

    @TestTemplate
    public void testTumbleWindowTVFWithCalc() {
        String sql = new StringOps(Predef$.MODULE$.augmentString("\n        |SELECT\n        |  `int`,\n        |  `string`,\n        |  `name`,\n        |  window_start,\n        |  window_end,\n        |  window_time\n        |FROM (\n        |  SELECT *,\n        |    ROW_NUMBER() OVER(\n        |      PARTITION BY window_start, window_end, `name` ORDER BY `int` DESC) as rownum\n        |  FROM TABLE(\n        |      TUMBLE(TABLE T1, DESCRIPTOR(rowtime), INTERVAL '5' SECOND))\n        |)\n        |WHERE rownum <= 2\n      ")).stripMargin();
        TestingAppendSink sink = new TestingAppendSink();
        package$.MODULE$.tableConversions(this.tEnv().sqlQuery(sql)).toDataStream().addSink((SinkFunction)sink);
        this.env().execute();
        Seq expected = (Seq)Seq$.MODULE$.apply((Seq)Predef$.MODULE$.wrapRefArray((Object[])new String[]{"5,null,a,2020-10-10T00:00,2020-10-10T00:00:05,2020-10-10T00:00:04.999", "2,Comment#1,a,2020-10-10T00:00,2020-10-10T00:00:05,2020-10-10T00:00:04.999", "3,Comment#2,a,2020-10-10T00:00:05,2020-10-10T00:00:10,2020-10-10T00:00:09.999", "6,Hi,b,2020-10-10T00:00:05,2020-10-10T00:00:10,2020-10-10T00:00:09.999", "3,Hello,b,2020-10-10T00:00:05,2020-10-10T00:00:10,2020-10-10T00:00:09.999", "4,Hi,b,2020-10-10T00:00:15,2020-10-10T00:00:20,2020-10-10T00:00:19.999", "7,null,null,2020-10-10T00:00:30,2020-10-10T00:00:35,2020-10-10T00:00:34.999", "1,Comment#3,b,2020-10-10T00:00:30,2020-10-10T00:00:35,2020-10-10T00:00:34.999"}));
        Assertions.assertThat((String)((TraversableOnce)sink.getAppendResults().sorted((Ordering)Ordering.String$.MODULE$)).mkString("\n")).isEqualTo(((TraversableOnce)expected.sorted((Ordering)Ordering.String$.MODULE$)).mkString("\n"));
    }

    @TestTemplate
    public void testHopWindow() {
        String sql = new StringOps(Predef$.MODULE$.augmentString("\n        |SELECT *\n        |FROM\n        |(\n        |  SELECT *,\n        |    ROW_NUMBER() OVER(\n        |      PARTITION BY window_start, window_end ORDER BY sum_b DESC) as rownum\n        |  FROM (\n        |    SELECT\n        |      `name`,\n        |      window_start,\n        |      window_end,\n        |      COUNT(*) as cnt,\n        |      SUM(`bigdec`) as sum_b,\n        |      MAX(`double`) as max_d,\n        |      MIN(`float`) as min_f,\n        |      COUNT(DISTINCT `string`) as uv,\n        |      concat_distinct_agg(`string`) as distinct_str\n        |  FROM TABLE(\n        |    HOP(TABLE T1, DESCRIPTOR(rowtime), INTERVAL '5' SECOND, INTERVAL '10' SECOND))\n        |    GROUP BY `name`, window_start, window_end\n        |  )\n        |)\n        |WHERE rownum <= 2\n      ")).stripMargin();
        TestingAppendSink sink = new TestingAppendSink();
        package$.MODULE$.tableConversions(this.tEnv().sqlQuery(sql)).toDataStream().addSink((SinkFunction)sink);
        this.env().execute();
        Seq expected = (Seq)Seq$.MODULE$.apply((Seq)Predef$.MODULE$.wrapRefArray((Object[])new String[]{"a,2020-10-09T23:59:55,2020-10-10T00:00:05,4,11.10,5.0,1.0,2,Hi|Comment#1,1", "a,2020-10-10T00:00,2020-10-10T00:00:10,6,19.98,5.0,1.0,3,Comment#2|Hi|Comment#1,1", "a,2020-10-10T00:00:05,2020-10-10T00:00:15,1,3.33,null,3.0,1,Comment#2,2", "b,2020-10-10T00:00,2020-10-10T00:00:10,2,6.66,6.0,3.0,2,Hello|Hi,2", "b,2020-10-10T00:00:05,2020-10-10T00:00:15,2,6.66,6.0,3.0,2,Hello|Hi,1", "b,2020-10-10T00:00:10,2020-10-10T00:00:20,1,4.44,4.0,4.0,1,Hi,1", "b,2020-10-10T00:00:15,2020-10-10T00:00:25,1,4.44,4.0,4.0,1,Hi,1", "b,2020-10-10T00:00:25,2020-10-10T00:00:35,1,3.33,3.0,3.0,1,Comment#3,2", "b,2020-10-10T00:00:30,2020-10-10T00:00:40,1,3.33,3.0,3.0,1,Comment#3,2", "null,2020-10-10T00:00:25,2020-10-10T00:00:35,1,7.77,7.0,7.0,0,null,1", "null,2020-10-10T00:00:30,2020-10-10T00:00:40,1,7.77,7.0,7.0,0,null,1"}));
        Assertions.assertThat((String)((TraversableOnce)sink.getAppendResults().sorted((Ordering)Ordering.String$.MODULE$)).mkString("\n")).isEqualTo(((TraversableOnce)expected.sorted((Ordering)Ordering.String$.MODULE$)).mkString("\n"));
    }

    @TestTemplate
    public void testHopWindowWithRankOffset() {
        String sql = new StringOps(Predef$.MODULE$.augmentString("\n        |SELECT *\n        |FROM\n        |(\n        |  SELECT *,\n        |    ROW_NUMBER() OVER(\n        |      PARTITION BY window_start, window_end ORDER BY sum_b DESC) as rownum\n        |  FROM (\n        |    SELECT\n        |      `name`,\n        |      window_start,\n        |      window_end,\n        |      COUNT(*) as cnt,\n        |      SUM(`bigdec`) as sum_b,\n        |      MAX(`double`) as max_d,\n        |      MIN(`float`) as min_f,\n        |      COUNT(DISTINCT `string`) as uv,\n        |      concat_distinct_agg(`string`) as distinct_str\n        |  FROM TABLE(\n        |    HOP(TABLE T1, DESCRIPTOR(rowtime), INTERVAL '5' SECOND, INTERVAL '10' SECOND))\n        |    GROUP BY `name`, window_start, window_end\n        |  )\n        |)\n        |WHERE rownum > 1 AND rownum <= 2\n      ")).stripMargin();
        TestingAppendSink sink = new TestingAppendSink();
        package$.MODULE$.tableConversions(this.tEnv().sqlQuery(sql)).toDataStream().addSink((SinkFunction)sink);
        this.env().execute();
        Seq expected = (Seq)Seq$.MODULE$.apply((Seq)Predef$.MODULE$.wrapRefArray((Object[])new String[]{"a,2020-10-10T00:00:05,2020-10-10T00:00:15,1,3.33,null,3.0,1,Comment#2,2", "b,2020-10-10T00:00,2020-10-10T00:00:10,2,6.66,6.0,3.0,2,Hello|Hi,2", "b,2020-10-10T00:00:25,2020-10-10T00:00:35,1,3.33,3.0,3.0,1,Comment#3,2", "b,2020-10-10T00:00:30,2020-10-10T00:00:40,1,3.33,3.0,3.0,1,Comment#3,2"}));
        Assertions.assertThat((String)((TraversableOnce)sink.getAppendResults().sorted((Ordering)Ordering.String$.MODULE$)).mkString("\n")).isEqualTo(((TraversableOnce)expected.sorted((Ordering)Ordering.String$.MODULE$)).mkString("\n"));
    }

    @TestTemplate
    public void testHopWindowWithoutRankNumber() {
        String sql = new StringOps(Predef$.MODULE$.augmentString("\n        |SELECT `name`, window_start, window_end, cnt, sum_b, max_d, min_f, uv, distinct_str\n        |FROM\n        |(\n        |  SELECT *,\n        |    ROW_NUMBER() OVER(\n        |      PARTITION BY window_start, window_end ORDER BY sum_b DESC) as rownum\n        |  FROM (\n        |    SELECT\n        |      `name`,\n        |      window_start,\n        |      window_end,\n        |      COUNT(*) as cnt,\n        |      SUM(`bigdec`) as sum_b,\n        |      MAX(`double`) as max_d,\n        |      MIN(`float`) as min_f,\n        |      COUNT(DISTINCT `string`) as uv,\n        |      concat_distinct_agg(`string`) as distinct_str\n        |  FROM TABLE(\n        |    HOP(TABLE T1, DESCRIPTOR(rowtime), INTERVAL '5' SECOND, INTERVAL '10' SECOND))\n        |    GROUP BY `name`, window_start, window_end\n        |  )\n        |)\n        |WHERE rownum > 1 AND rownum <= 2\n      ")).stripMargin();
        TestingAppendSink sink = new TestingAppendSink();
        package$.MODULE$.tableConversions(this.tEnv().sqlQuery(sql)).toDataStream().addSink((SinkFunction)sink);
        this.env().execute();
        Seq expected = (Seq)Seq$.MODULE$.apply((Seq)Predef$.MODULE$.wrapRefArray((Object[])new String[]{"a,2020-10-10T00:00:05,2020-10-10T00:00:15,1,3.33,null,3.0,1,Comment#2", "b,2020-10-10T00:00,2020-10-10T00:00:10,2,6.66,6.0,3.0,2,Hello|Hi", "b,2020-10-10T00:00:25,2020-10-10T00:00:35,1,3.33,3.0,3.0,1,Comment#3", "b,2020-10-10T00:00:30,2020-10-10T00:00:40,1,3.33,3.0,3.0,1,Comment#3"}));
        Assertions.assertThat((String)((TraversableOnce)sink.getAppendResults().sorted((Ordering)Ordering.String$.MODULE$)).mkString("\n")).isEqualTo(((TraversableOnce)expected.sorted((Ordering)Ordering.String$.MODULE$)).mkString("\n"));
    }

    @TestTemplate
    public void testHopWindowTVF() {
        String sql = new StringOps(Predef$.MODULE$.augmentString("\n         |SELECT\n         |  TO_TIMESTAMP(`ts`),\n         |  `int`,\n         |  `double`,\n         |  `float`,\n         |  `bigdec`,\n         |  `string`,\n         |  `name`,\n         |  CAST(`rowtime` AS STRING),\n         |  window_start,\n         |  window_end,\n         |  window_time\n         |FROM (\n         |  SELECT *,\n         |    ROW_NUMBER() OVER(\n         |      PARTITION BY window_start, window_end, `name` ORDER BY `int` DESC) as rownum\n         |  FROM TABLE(\n         |      HOP(TABLE T1, DESCRIPTOR(rowtime), INTERVAL '5' SECOND, INTERVAL '10' SECOND))\n         |)\n         |WHERE rownum <= 2\n      ")).stripMargin();
        TestingAppendSink sink = new TestingAppendSink();
        package$.MODULE$.tableConversions(this.tEnv().sqlQuery(sql)).toDataStream().addSink((SinkFunction)sink);
        this.env().execute();
        Seq expected = (Seq)Seq$.MODULE$.apply((Seq)Predef$.MODULE$.wrapRefArray((Object[])new String[]{"2020-10-10T00:00:04,5,5.0,5.0,5.55,null,a,2020-10-10 00:00:04.000,2020-10-09T23:59:55,2020-10-10T00:00:05,2020-10-10T00:00:04.999", "2020-10-10T00:00:02,2,2.0,2.0,2.22,Comment#1,a,2020-10-10 00:00:02.000,2020-10-09T23:59:55,2020-10-10T00:00:05,2020-10-10T00:00:04.999", "2020-10-10T00:00:04,5,5.0,5.0,5.55,null,a,2020-10-10 00:00:04.000,2020-10-10T00:00,2020-10-10T00:00:10,2020-10-10T00:00:09.999", "2020-10-10T00:00:04,5,5.0,null,5.55,Hi,a,2020-10-10 00:00:04.000,2020-10-10T00:00,2020-10-10T00:00:10,2020-10-10T00:00:09.999", "2020-10-10T00:00:06,6,6.0,6.0,6.66,Hi,b,2020-10-10 00:00:06.000,2020-10-10T00:00,2020-10-10T00:00:10,2020-10-10T00:00:09.999", "2020-10-10T00:00:06,6,6.0,6.0,6.66,Hi,b,2020-10-10 00:00:06.000,2020-10-10T00:00:05,2020-10-10T00:00:15,2020-10-10T00:00:14.999", "2020-10-10T00:00:07,3,3.0,3.0,null,Hello,b,2020-10-10 00:00:07.000,2020-10-10T00:00,2020-10-10T00:00:10,2020-10-10T00:00:09.999", "2020-10-10T00:00:08,3,null,3.0,3.33,Comment#2,a,2020-10-10 00:00:08.000,2020-10-10T00:00:05,2020-10-10T00:00:15,2020-10-10T00:00:14.999", "2020-10-10T00:00:07,3,3.0,3.0,null,Hello,b,2020-10-10 00:00:07.000,2020-10-10T00:00:05,2020-10-10T00:00:15,2020-10-10T00:00:14.999", "2020-10-10T00:00:16,4,4.0,4.0,4.44,Hi,b,2020-10-10 00:00:16.000,2020-10-10T00:00:10,2020-10-10T00:00:20,2020-10-10T00:00:19.999", "2020-10-10T00:00:16,4,4.0,4.0,4.44,Hi,b,2020-10-10 00:00:16.000,2020-10-10T00:00:15,2020-10-10T00:00:25,2020-10-10T00:00:24.999", "2020-10-10T00:00:32,7,7.0,7.0,7.77,null,null,2020-10-10 00:00:32.000,2020-10-10T00:00:25,2020-10-10T00:00:35,2020-10-10T00:00:34.999", "2020-10-10T00:00:32,7,7.0,7.0,7.77,null,null,2020-10-10 00:00:32.000,2020-10-10T00:00:30,2020-10-10T00:00:40,2020-10-10T00:00:39.999", "2020-10-10T00:00:34,1,3.0,3.0,3.33,Comment#3,b,2020-10-10 00:00:34.000,2020-10-10T00:00:25,2020-10-10T00:00:35,2020-10-10T00:00:34.999", "2020-10-10T00:00:34,1,3.0,3.0,3.33,Comment#3,b,2020-10-10 00:00:34.000,2020-10-10T00:00:30,2020-10-10T00:00:40,2020-10-10T00:00:39.999"}));
        Assertions.assertThat((String)((TraversableOnce)sink.getAppendResults().sorted((Ordering)Ordering.String$.MODULE$)).mkString("\n")).isEqualTo(((TraversableOnce)expected.sorted((Ordering)Ordering.String$.MODULE$)).mkString("\n"));
    }

    @TestTemplate
    public void testHopWindowTVFWithCalc() {
        String sql = new StringOps(Predef$.MODULE$.augmentString("\n        |SELECT\n        |  `int`,\n        |  `string`,\n        |  `name`,\n        |  window_start,\n        |  window_end,\n        |  window_time\n        |FROM (\n        |  SELECT *,\n        |    ROW_NUMBER() OVER(\n        |      PARTITION BY window_start, window_end, `name` ORDER BY `int` DESC) as rownum\n        |  FROM TABLE(\n        |      HOP(TABLE T1, DESCRIPTOR(rowtime), INTERVAL '5' SECOND, INTERVAL '10' SECOND))\n        |)\n        |WHERE rownum <= 2\n      ")).stripMargin();
        TestingAppendSink sink = new TestingAppendSink();
        package$.MODULE$.tableConversions(this.tEnv().sqlQuery(sql)).toDataStream().addSink((SinkFunction)sink);
        this.env().execute();
        Seq expected = (Seq)Seq$.MODULE$.apply((Seq)Predef$.MODULE$.wrapRefArray((Object[])new String[]{"5,null,a,2020-10-09T23:59:55,2020-10-10T00:00:05,2020-10-10T00:00:04.999", "2,Comment#1,a,2020-10-09T23:59:55,2020-10-10T00:00:05,2020-10-10T00:00:04.999", "5,null,a,2020-10-10T00:00,2020-10-10T00:00:10,2020-10-10T00:00:09.999", "5,Hi,a,2020-10-10T00:00,2020-10-10T00:00:10,2020-10-10T00:00:09.999", "6,Hi,b,2020-10-10T00:00,2020-10-10T00:00:10,2020-10-10T00:00:09.999", "6,Hi,b,2020-10-10T00:00:05,2020-10-10T00:00:15,2020-10-10T00:00:14.999", "3,Hello,b,2020-10-10T00:00,2020-10-10T00:00:10,2020-10-10T00:00:09.999", "3,Comment#2,a,2020-10-10T00:00:05,2020-10-10T00:00:15,2020-10-10T00:00:14.999", "3,Hello,b,2020-10-10T00:00:05,2020-10-10T00:00:15,2020-10-10T00:00:14.999", "4,Hi,b,2020-10-10T00:00:10,2020-10-10T00:00:20,2020-10-10T00:00:19.999", "4,Hi,b,2020-10-10T00:00:15,2020-10-10T00:00:25,2020-10-10T00:00:24.999", "7,null,null,2020-10-10T00:00:25,2020-10-10T00:00:35,2020-10-10T00:00:34.999", "7,null,null,2020-10-10T00:00:30,2020-10-10T00:00:40,2020-10-10T00:00:39.999", "1,Comment#3,b,2020-10-10T00:00:25,2020-10-10T00:00:35,2020-10-10T00:00:34.999", "1,Comment#3,b,2020-10-10T00:00:30,2020-10-10T00:00:40,2020-10-10T00:00:39.999"}));
        Assertions.assertThat((String)((TraversableOnce)sink.getAppendResults().sorted((Ordering)Ordering.String$.MODULE$)).mkString("\n")).isEqualTo(((TraversableOnce)expected.sorted((Ordering)Ordering.String$.MODULE$)).mkString("\n"));
    }

    @TestTemplate
    public void testCumulateWindow() {
        String sql = new StringOps(Predef$.MODULE$.augmentString("\n        |SELECT * FROM\n        |(\n        |  SELECT *,\n        |    ROW_NUMBER() OVER(\n        |      PARTITION BY window_start, window_end ORDER BY sum_b DESC) as rownum\n        |  FROM (\n        |    SELECT\n        |      `name`,\n        |      window_start,\n        |      window_end,\n        |      COUNT(*) as cnt,\n        |      SUM(`bigdec`) as sum_b,\n        |      MAX(`double`) as max_d,\n        |      MIN(`float`) as min_f,\n        |      COUNT(DISTINCT `string`) as uv,\n        |      concat_distinct_agg(`string`) as distinct_str\n        |    FROM TABLE(\n        |      CUMULATE(\n        |        TABLE T1,\n        |        DESCRIPTOR(rowtime),\n        |        INTERVAL '5' SECOND,\n        |        INTERVAL '15' SECOND))\n        |    GROUP BY `name`, window_start, window_end\n        |  )\n        |)\n        |WHERE rownum <= 2\n      ")).stripMargin();
        TestingAppendSink sink = new TestingAppendSink();
        package$.MODULE$.tableConversions(this.tEnv().sqlQuery(sql)).toDataStream().addSink((SinkFunction)sink);
        this.env().execute();
        Seq expected = (Seq)Seq$.MODULE$.apply((Seq)Predef$.MODULE$.wrapRefArray((Object[])new String[]{"a,2020-10-10T00:00,2020-10-10T00:00:05,4,11.10,5.0,1.0,2,Hi|Comment#1,1", "a,2020-10-10T00:00,2020-10-10T00:00:10,6,19.98,5.0,1.0,3,Hi|Comment#1|Comment#2,1", "a,2020-10-10T00:00,2020-10-10T00:00:15,6,19.98,5.0,1.0,3,Hi|Comment#1|Comment#2,1", "b,2020-10-10T00:00,2020-10-10T00:00:10,2,6.66,6.0,3.0,2,Hello|Hi,2", "b,2020-10-10T00:00,2020-10-10T00:00:15,2,6.66,6.0,3.0,2,Hello|Hi,2", "b,2020-10-10T00:00:15,2020-10-10T00:00:20,1,4.44,4.0,4.0,1,Hi,1", "b,2020-10-10T00:00:15,2020-10-10T00:00:25,1,4.44,4.0,4.0,1,Hi,1", "b,2020-10-10T00:00:15,2020-10-10T00:00:30,1,4.44,4.0,4.0,1,Hi,1", "b,2020-10-10T00:00:30,2020-10-10T00:00:35,1,3.33,3.0,3.0,1,Comment#3,2", "b,2020-10-10T00:00:30,2020-10-10T00:00:40,1,3.33,3.0,3.0,1,Comment#3,2", "b,2020-10-10T00:00:30,2020-10-10T00:00:45,1,3.33,3.0,3.0,1,Comment#3,2", "null,2020-10-10T00:00:30,2020-10-10T00:00:35,1,7.77,7.0,7.0,0,null,1", "null,2020-10-10T00:00:30,2020-10-10T00:00:40,1,7.77,7.0,7.0,0,null,1", "null,2020-10-10T00:00:30,2020-10-10T00:00:45,1,7.77,7.0,7.0,0,null,1"}));
        Assertions.assertThat((String)((TraversableOnce)sink.getAppendResults().sorted((Ordering)Ordering.String$.MODULE$)).mkString("\n")).isEqualTo(((TraversableOnce)expected.sorted((Ordering)Ordering.String$.MODULE$)).mkString("\n"));
    }

    @TestTemplate
    public void testCumulateWindowWithRankOffset() {
        String sql = new StringOps(Predef$.MODULE$.augmentString("\n        |SELECT * FROM\n        |(\n        |  SELECT *,\n        |    ROW_NUMBER() OVER(\n        |      PARTITION BY window_start, window_end ORDER BY sum_b DESC) as rownum\n        |  FROM (\n        |    SELECT\n        |      `name`,\n        |      window_start,\n        |      window_end,\n        |      COUNT(*) as cnt,\n        |      SUM(`bigdec`) as sum_b,\n        |      MAX(`double`) as max_d,\n        |      MIN(`float`) as min_f,\n        |      COUNT(DISTINCT `string`) as uv,\n        |      concat_distinct_agg(`string`) as distinct_str\n        |    FROM TABLE(\n        |      CUMULATE(\n        |        TABLE T1,\n        |        DESCRIPTOR(rowtime),\n        |        INTERVAL '5' SECOND,\n        |        INTERVAL '15' SECOND))\n        |    GROUP BY `name`, window_start, window_end\n        |  )\n        |)\n        |WHERE rownum > 1 AND rownum <= 2\n      ")).stripMargin();
        TestingAppendSink sink = new TestingAppendSink();
        package$.MODULE$.tableConversions(this.tEnv().sqlQuery(sql)).toDataStream().addSink((SinkFunction)sink);
        this.env().execute();
        Seq expected = (Seq)Seq$.MODULE$.apply((Seq)Predef$.MODULE$.wrapRefArray((Object[])new String[]{"b,2020-10-10T00:00,2020-10-10T00:00:10,2,6.66,6.0,3.0,2,Hello|Hi,2", "b,2020-10-10T00:00,2020-10-10T00:00:15,2,6.66,6.0,3.0,2,Hello|Hi,2", "b,2020-10-10T00:00:30,2020-10-10T00:00:35,1,3.33,3.0,3.0,1,Comment#3,2", "b,2020-10-10T00:00:30,2020-10-10T00:00:40,1,3.33,3.0,3.0,1,Comment#3,2", "b,2020-10-10T00:00:30,2020-10-10T00:00:45,1,3.33,3.0,3.0,1,Comment#3,2"}));
        Assertions.assertThat((String)((TraversableOnce)sink.getAppendResults().sorted((Ordering)Ordering.String$.MODULE$)).mkString("\n")).isEqualTo(((TraversableOnce)expected.sorted((Ordering)Ordering.String$.MODULE$)).mkString("\n"));
    }

    @TestTemplate
    public void testCumulateWindowWithoutRankNumber() {
        String sql = new StringOps(Predef$.MODULE$.augmentString("\n        |SELECT `name`, window_start, window_end, cnt, sum_b, max_d, min_f, uv, distinct_str\n        |FROM\n        |(\n        |  SELECT *,\n        |    ROW_NUMBER() OVER(\n        |      PARTITION BY window_start, window_end ORDER BY sum_b DESC) as rownum\n        |  FROM (\n        |    SELECT\n        |      `name`,\n        |      window_start,\n        |      window_end,\n        |      COUNT(*) as cnt,\n        |      SUM(`bigdec`) as sum_b,\n        |      MAX(`double`) as max_d,\n        |      MIN(`float`) as min_f,\n        |      COUNT(DISTINCT `string`) as uv,\n        |      concat_distinct_agg(`string`) as distinct_str\n        |    FROM TABLE(\n        |      CUMULATE(\n        |        TABLE T1,\n        |        DESCRIPTOR(rowtime),\n        |        INTERVAL '5' SECOND,\n        |        INTERVAL '15' SECOND))\n        |    GROUP BY `name`, window_start, window_end\n        |  )\n        |)\n        |WHERE rownum > 1 AND rownum <= 2\n      ")).stripMargin();
        TestingAppendSink sink = new TestingAppendSink();
        package$.MODULE$.tableConversions(this.tEnv().sqlQuery(sql)).toDataStream().addSink((SinkFunction)sink);
        this.env().execute();
        Seq expected = (Seq)Seq$.MODULE$.apply((Seq)Predef$.MODULE$.wrapRefArray((Object[])new String[]{"b,2020-10-10T00:00,2020-10-10T00:00:10,2,6.66,6.0,3.0,2,Hello|Hi", "b,2020-10-10T00:00,2020-10-10T00:00:15,2,6.66,6.0,3.0,2,Hello|Hi", "b,2020-10-10T00:00:30,2020-10-10T00:00:35,1,3.33,3.0,3.0,1,Comment#3", "b,2020-10-10T00:00:30,2020-10-10T00:00:40,1,3.33,3.0,3.0,1,Comment#3", "b,2020-10-10T00:00:30,2020-10-10T00:00:45,1,3.33,3.0,3.0,1,Comment#3"}));
        Assertions.assertThat((String)((TraversableOnce)sink.getAppendResults().sorted((Ordering)Ordering.String$.MODULE$)).mkString("\n")).isEqualTo(((TraversableOnce)expected.sorted((Ordering)Ordering.String$.MODULE$)).mkString("\n"));
    }

    @TestTemplate
    public void testTop1() {
        String sql = new StringOps(Predef$.MODULE$.augmentString("\n        |SELECT * FROM\n        |(\n        |  SELECT *,\n        |    ROW_NUMBER() OVER(\n        |      PARTITION BY window_start, window_end ORDER BY sum_b DESC) as rownum\n        |  FROM (\n        |    SELECT\n        |      `name`,\n        |      window_start,\n        |      window_end,\n        |      COUNT(*) as cnt,\n        |      SUM(`bigdec`) as sum_b\n        |    FROM TABLE(\n        |      TUMBLE(TABLE T1, DESCRIPTOR(rowtime), INTERVAL '5' SECOND))\n        |  GROUP BY `name`, window_start, window_end\n        |  )\n        |)\n        |WHERE rownum <= 1\n      ")).stripMargin();
        TestingAppendSink sink = new TestingAppendSink();
        package$.MODULE$.tableConversions(this.tEnv().sqlQuery(sql)).toDataStream().addSink((SinkFunction)sink);
        this.env().execute();
        Seq expected = (Seq)Seq$.MODULE$.apply((Seq)Predef$.MODULE$.wrapRefArray((Object[])new String[]{"a,2020-10-10T00:00,2020-10-10T00:00:05,4,11.10,1", "b,2020-10-10T00:00:05,2020-10-10T00:00:10,2,6.66,1", "b,2020-10-10T00:00:15,2020-10-10T00:00:20,1,4.44,1", "null,2020-10-10T00:00:30,2020-10-10T00:00:35,1,7.77,1"}));
        Assertions.assertThat((String)((TraversableOnce)sink.getAppendResults().sorted((Ordering)Ordering.String$.MODULE$)).mkString("\n")).isEqualTo(((TraversableOnce)expected.sorted((Ordering)Ordering.String$.MODULE$)).mkString("\n"));
    }

    @TestTemplate
    public void testCumulateWindowTVF() {
        String sql = new StringOps(Predef$.MODULE$.augmentString("\n         |SELECT\n         |  TO_TIMESTAMP(`ts`),\n         |  `int`,\n         |  `double`,\n         |  `float`,\n         |  `bigdec`,\n         |  `string`,\n         |  `name`,\n         |  CAST(`rowtime` AS STRING),\n         |  window_start,\n         |  window_end,\n         |  window_time\n         |FROM (\n         |  SELECT *,\n         |    ROW_NUMBER() OVER(\n         |      PARTITION BY window_start, window_end, `name` ORDER BY `int` DESC) as rownum\n         |  FROM TABLE(\n         |      CUMULATE(\n         |        TABLE T1,\n         |        DESCRIPTOR(rowtime),\n         |        INTERVAL '5' SECOND,\n         |        INTERVAL '15' SECOND))\n         |)\n         |WHERE rownum <= 2\n      ")).stripMargin();
        TestingAppendSink sink = new TestingAppendSink();
        package$.MODULE$.tableConversions(this.tEnv().sqlQuery(sql)).toDataStream().addSink((SinkFunction)sink);
        this.env().execute();
        Seq expected = (Seq)Seq$.MODULE$.apply((Seq)Predef$.MODULE$.wrapRefArray((Object[])new String[]{"2020-10-10T00:00:04,5,5.0,5.0,5.55,null,a,2020-10-10 00:00:04.000,2020-10-10T00:00,2020-10-10T00:00:05,2020-10-10T00:00:04.999", "2020-10-10T00:00:02,2,2.0,2.0,2.22,Comment#1,a,2020-10-10 00:00:02.000,2020-10-10T00:00,2020-10-10T00:00:05,2020-10-10T00:00:04.999", "2020-10-10T00:00:04,5,5.0,5.0,5.55,null,a,2020-10-10 00:00:04.000,2020-10-10T00:00,2020-10-10T00:00:10,2020-10-10T00:00:09.999", "2020-10-10T00:00:04,5,5.0,null,5.55,Hi,a,2020-10-10 00:00:04.000,2020-10-10T00:00,2020-10-10T00:00:10,2020-10-10T00:00:09.999", "2020-10-10T00:00:04,5,5.0,5.0,5.55,null,a,2020-10-10 00:00:04.000,2020-10-10T00:00,2020-10-10T00:00:15,2020-10-10T00:00:14.999", "2020-10-10T00:00:04,5,5.0,null,5.55,Hi,a,2020-10-10 00:00:04.000,2020-10-10T00:00,2020-10-10T00:00:15,2020-10-10T00:00:14.999", "2020-10-10T00:00:06,6,6.0,6.0,6.66,Hi,b,2020-10-10 00:00:06.000,2020-10-10T00:00,2020-10-10T00:00:10,2020-10-10T00:00:09.999", "2020-10-10T00:00:06,6,6.0,6.0,6.66,Hi,b,2020-10-10 00:00:06.000,2020-10-10T00:00,2020-10-10T00:00:15,2020-10-10T00:00:14.999", "2020-10-10T00:00:07,3,3.0,3.0,null,Hello,b,2020-10-10 00:00:07.000,2020-10-10T00:00,2020-10-10T00:00:10,2020-10-10T00:00:09.999", "2020-10-10T00:00:07,3,3.0,3.0,null,Hello,b,2020-10-10 00:00:07.000,2020-10-10T00:00,2020-10-10T00:00:15,2020-10-10T00:00:14.999", "2020-10-10T00:00:16,4,4.0,4.0,4.44,Hi,b,2020-10-10 00:00:16.000,2020-10-10T00:00:15,2020-10-10T00:00:20,2020-10-10T00:00:19.999", "2020-10-10T00:00:16,4,4.0,4.0,4.44,Hi,b,2020-10-10 00:00:16.000,2020-10-10T00:00:15,2020-10-10T00:00:25,2020-10-10T00:00:24.999", "2020-10-10T00:00:16,4,4.0,4.0,4.44,Hi,b,2020-10-10 00:00:16.000,2020-10-10T00:00:15,2020-10-10T00:00:30,2020-10-10T00:00:29.999", "2020-10-10T00:00:32,7,7.0,7.0,7.77,null,null,2020-10-10 00:00:32.000,2020-10-10T00:00:30,2020-10-10T00:00:35,2020-10-10T00:00:34.999", "2020-10-10T00:00:32,7,7.0,7.0,7.77,null,null,2020-10-10 00:00:32.000,2020-10-10T00:00:30,2020-10-10T00:00:40,2020-10-10T00:00:39.999", "2020-10-10T00:00:32,7,7.0,7.0,7.77,null,null,2020-10-10 00:00:32.000,2020-10-10T00:00:30,2020-10-10T00:00:45,2020-10-10T00:00:44.999", "2020-10-10T00:00:34,1,3.0,3.0,3.33,Comment#3,b,2020-10-10 00:00:34.000,2020-10-10T00:00:30,2020-10-10T00:00:35,2020-10-10T00:00:34.999", "2020-10-10T00:00:34,1,3.0,3.0,3.33,Comment#3,b,2020-10-10 00:00:34.000,2020-10-10T00:00:30,2020-10-10T00:00:40,2020-10-10T00:00:39.999", "2020-10-10T00:00:34,1,3.0,3.0,3.33,Comment#3,b,2020-10-10 00:00:34.000,2020-10-10T00:00:30,2020-10-10T00:00:45,2020-10-10T00:00:44.999"}));
        Assertions.assertThat((String)((TraversableOnce)sink.getAppendResults().sorted((Ordering)Ordering.String$.MODULE$)).mkString("\n")).isEqualTo(((TraversableOnce)expected.sorted((Ordering)Ordering.String$.MODULE$)).mkString("\n"));
    }

    @TestTemplate
    public void testCumulateWindowTVFWithCalc() {
        String sql = new StringOps(Predef$.MODULE$.augmentString("\n        |SELECT\n        |  `int`,\n        |  `string`,\n        |  `name`,\n        |  window_start,\n        |  window_end,\n        |  window_time\n        |FROM (\n        |  SELECT *,\n        |    ROW_NUMBER() OVER(\n        |      PARTITION BY window_start, window_end, `name` ORDER BY `int` DESC) as rownum\n        |  FROM TABLE(\n        |      CUMULATE(\n        |        TABLE T1,\n        |        DESCRIPTOR(rowtime),\n        |        INTERVAL '5' SECOND,\n        |        INTERVAL '15' SECOND))\n        |)\n        |WHERE rownum <= 2\n      ")).stripMargin();
        TestingAppendSink sink = new TestingAppendSink();
        package$.MODULE$.tableConversions(this.tEnv().sqlQuery(sql)).toDataStream().addSink((SinkFunction)sink);
        this.env().execute();
        Seq expected = (Seq)Seq$.MODULE$.apply((Seq)Predef$.MODULE$.wrapRefArray((Object[])new String[]{"5,null,a,2020-10-10T00:00,2020-10-10T00:00:05,2020-10-10T00:00:04.999", "2,Comment#1,a,2020-10-10T00:00,2020-10-10T00:00:05,2020-10-10T00:00:04.999", "5,null,a,2020-10-10T00:00,2020-10-10T00:00:10,2020-10-10T00:00:09.999", "5,Hi,a,2020-10-10T00:00,2020-10-10T00:00:10,2020-10-10T00:00:09.999", "5,null,a,2020-10-10T00:00,2020-10-10T00:00:15,2020-10-10T00:00:14.999", "5,Hi,a,2020-10-10T00:00,2020-10-10T00:00:15,2020-10-10T00:00:14.999", "6,Hi,b,2020-10-10T00:00,2020-10-10T00:00:10,2020-10-10T00:00:09.999", "6,Hi,b,2020-10-10T00:00,2020-10-10T00:00:15,2020-10-10T00:00:14.999", "3,Hello,b,2020-10-10T00:00,2020-10-10T00:00:10,2020-10-10T00:00:09.999", "3,Hello,b,2020-10-10T00:00,2020-10-10T00:00:15,2020-10-10T00:00:14.999", "4,Hi,b,2020-10-10T00:00:15,2020-10-10T00:00:20,2020-10-10T00:00:19.999", "4,Hi,b,2020-10-10T00:00:15,2020-10-10T00:00:25,2020-10-10T00:00:24.999", "4,Hi,b,2020-10-10T00:00:15,2020-10-10T00:00:30,2020-10-10T00:00:29.999", "7,null,null,2020-10-10T00:00:30,2020-10-10T00:00:35,2020-10-10T00:00:34.999", "7,null,null,2020-10-10T00:00:30,2020-10-10T00:00:40,2020-10-10T00:00:39.999", "7,null,null,2020-10-10T00:00:30,2020-10-10T00:00:45,2020-10-10T00:00:44.999", "1,Comment#3,b,2020-10-10T00:00:30,2020-10-10T00:00:35,2020-10-10T00:00:34.999", "1,Comment#3,b,2020-10-10T00:00:30,2020-10-10T00:00:40,2020-10-10T00:00:39.999", "1,Comment#3,b,2020-10-10T00:00:30,2020-10-10T00:00:45,2020-10-10T00:00:44.999"}));
        Assertions.assertThat((String)((TraversableOnce)sink.getAppendResults().sorted((Ordering)Ordering.String$.MODULE$)).mkString("\n")).isEqualTo(((TraversableOnce)expected.sorted((Ordering)Ordering.String$.MODULE$)).mkString("\n"));
    }

    public WindowRankITCase(StreamingWithStateTestBase.StateBackendMode mode) {
        super(mode);
    }
}

