/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.table.planner.runtime.stream.sql;

import java.io.Serializable;
import java.lang.invoke.MethodHandle;
import java.lang.invoke.SerializedLambda;
import java.time.Duration;
import java.time.LocalDateTime;
import java.time.ZoneId;
import java.time.ZoneOffset;
import java.util.Collection;
import java.util.concurrent.TimeUnit;
import org.apache.flink.api.common.ExecutionConfig;
import org.apache.flink.api.common.serialization.SerializerConfig;
import org.apache.flink.api.common.time.Time;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.api.java.typeutils.TypeExtractor;
import org.apache.flink.api.scala.typeutils.CaseClassTypeInfo;
import org.apache.flink.api.scala.typeutils.ScalaCaseClassSerializer;
import org.apache.flink.streaming.api.functions.sink.SinkFunction;
import org.apache.flink.streaming.api.scala.DataStream;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.TableConfig;
import org.apache.flink.table.api.TableSchema;
import org.apache.flink.table.api.Types;
import org.apache.flink.table.api.internal.TableEnvironmentInternal;
import org.apache.flink.table.api.package$;
import org.apache.flink.table.catalog.ResolvedSchema;
import org.apache.flink.table.expressions.Expression;
import org.apache.flink.table.planner.factories.TestValuesTableFactory;
import org.apache.flink.table.planner.plan.utils.JavaUserDefinedAggFunctions;
import org.apache.flink.table.planner.plan.utils.WindowEmitStrategy$;
import org.apache.flink.table.planner.runtime.stream.sql.GroupWindowITCase$;
import org.apache.flink.table.planner.runtime.utils.StreamingWithStateTestBase;
import org.apache.flink.table.planner.runtime.utils.TestData$;
import org.apache.flink.table.planner.runtime.utils.TestingAppendSink;
import org.apache.flink.table.planner.runtime.utils.TestingRetractSink;
import org.apache.flink.table.planner.runtime.utils.TestingRetractTableSink;
import org.apache.flink.table.planner.runtime.utils.TestingUpsertTableSink;
import org.apache.flink.table.planner.runtime.utils.TimeTestUtil;
import org.apache.flink.table.runtime.types.TypeInfoDataTypeConverter;
import org.apache.flink.table.sinks.TableSink;
import org.apache.flink.table.types.DataType;
import org.apache.flink.testutils.junit.extensions.parameterized.ParameterizedTestExtension;
import org.apache.flink.testutils.junit.extensions.parameterized.Parameters;
import org.apache.flink.types.Row;
import org.assertj.core.api.Assertions;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.TestTemplate;
import org.junit.jupiter.api.extension.ExtendWith;
import scala.Array$;
import scala.Function1;
import scala.Predef$;
import scala.Symbol;
import scala.Tuple3;
import scala.Tuple4;
import scala.collection.Seq;
import scala.collection.Seq$;
import scala.collection.TraversableOnce;
import scala.collection.immutable.;
import scala.collection.immutable.IndexedSeq$;
import scala.collection.immutable.List;
import scala.collection.immutable.List$;
import scala.collection.immutable.Nil$;
import scala.collection.immutable.StringOps;
import scala.collection.mutable.ArrayOps;
import scala.math.Ordering;
import scala.reflect.ClassTag$;
import scala.reflect.ScalaSignature;
import scala.runtime.BoxesRunTime;
import scala.runtime.LambdaDeserialize;
import scala.runtime.RichInt$;
import scala.runtime.SymbolLiteral;
import scala.runtime.java8.JFunction1;

@ExtendWith(value={ParameterizedTestExtension.class})
@ScalaSignature(bytes="\u0006\u0001\u0005\u001dh\u0001B\u0001\u0003\u0001M\u0011\u0011c\u0012:pkB<\u0016N\u001c3po&#6)Y:f\u0015\t\u0019A!A\u0002tc2T!!\u0002\u0004\u0002\rM$(/Z1n\u0015\t9\u0001\"A\u0004sk:$\u0018.\\3\u000b\u0005%Q\u0011a\u00029mC:tWM\u001d\u0006\u0003\u00171\tQ\u0001^1cY\u0016T!!\u0004\b\u0002\u000b\u0019d\u0017N\\6\u000b\u0005=\u0001\u0012AB1qC\u000eDWMC\u0001\u0012\u0003\ry'oZ\u0002\u0001'\t\u0001A\u0003\u0005\u0002\u001615\taC\u0003\u0002\u0018\r\u0005)Q\u000f^5mg&\u0011\u0011D\u0006\u0002\u001b'R\u0014X-Y7j]\u001e<\u0016\u000e\u001e5Ti\u0006$X\rV3ti\n\u000b7/\u001a\u0005\t7\u0001\u0011\t\u0011)A\u00059\u0005!Qn\u001c3f!\ti\u0012G\u0004\u0002\u001f_9\u0011qD\f\b\u0003A5r!!\t\u0017\u000f\u0005\tZcBA\u0012+\u001d\t!\u0013F\u0004\u0002&Q5\taE\u0003\u0002(%\u00051AH]8pizJ\u0011!E\u0005\u0003\u001fAI!!\u0004\b\n\u0005-a\u0011BA\u0005\u000b\u0013\t9\u0001\"\u0003\u0002\u0018\r%\u0011\u0001GF\u0001\u001b'R\u0014X-Y7j]\u001e<\u0016\u000e\u001e5Ti\u0006$X\rV3ti\n\u000b7/Z\u0005\u0003eM\u0012\u0001c\u0015;bi\u0016\u0014\u0015mY6f]\u0012lu\u000eZ3\u000b\u0005A2\u0002\u0002C\u001b\u0001\u0005\u0003\u0005\u000b\u0011\u0002\u001c\u0002\u001fU\u001cX\rV5nKN$\u0018-\u001c9Mij\u0004\"a\u000e\u001e\u000e\u0003aR\u0011!O\u0001\u0006g\u000e\fG.Y\u0005\u0003wa\u0012qAQ8pY\u0016\fg\u000eC\u0003>\u0001\u0011\u0005a(\u0001\u0004=S:LGO\u0010\u000b\u0004\u007f\u0005\u0013\u0005C\u0001!\u0001\u001b\u0005\u0011\u0001\"B\u000e=\u0001\u0004a\u0002\"B\u001b=\u0001\u00041\u0004b\u0002#\u0001\u0005\u0004%\t!R\u0001\u000e'\"\u000bej\u0012%B\u0013~SvJT#\u0016\u0003\u0019\u0003\"a\u0012'\u000e\u0003!S!!\u0013&\u0002\tQLW.\u001a\u0006\u0002\u0017\u0006!!.\u0019<b\u0013\ti\u0005J\u0001\u0004[_:,\u0017\n\u001a\u0005\u0007\u001f\u0002\u0001\u000b\u0011\u0002$\u0002\u001dMC\u0015IT$I\u0003&{&l\u0014(FA!9\u0011\u000b\u0001b\u0001\n\u0003\u0011\u0016\u0001G;qg\u0016\u0014HoU8ve\u000e,7)\u001e:sK:\u001c\u0017\u0010R1uCV\t1\u000bE\u0002U3nk\u0011!\u0016\u0006\u0003-^\u000b\u0011\"[7nkR\f'\r\\3\u000b\u0005aC\u0014AC2pY2,7\r^5p]&\u0011!,\u0016\u0002\u0005\u0019&\u001cH\u000f\u0005\u0002]?6\tQL\u0003\u0002_\u0019\u0005)A/\u001f9fg&\u0011\u0001-\u0018\u0002\u0004%><\bB\u00022\u0001A\u0003%1+A\rvaN,'\u000f^*pkJ\u001cWmQ;se\u0016t7-\u001f#bi\u0006\u0004\u0003\"\u00023\u0001\t\u0003*\u0017A\u00022fM>\u0014X\rF\u0001g!\t9t-\u0003\u0002iq\t!QK\\5uQ\t\u0019'\u000e\u0005\u0002le6\tAN\u0003\u0002n]\u0006\u0019\u0011\r]5\u000b\u0005=\u0004\u0018a\u00026va&$XM\u001d\u0006\u0003cB\tQA[;oSRL!a\u001d7\u0003\u0015\t+gm\u001c:f\u000b\u0006\u001c\u0007\u000eC\u0003v\u0001\u0011\u0005Q-\u0001\u000euKN$XI^3oiRKW.Z*mS\u0012LgnZ,j]\u0012|w\u000f\u000b\u0002uoB\u00111\u000e_\u0005\u0003s2\u0014A\u0002V3tiR+W\u000e\u001d7bi\u0016DQa\u001f\u0001\u0005\u0002\u0015\f\u0011\u0004^3ti\u000e\u000b7oY1eS:<G+^7cY\u0016<\u0016N\u001c3po\"\u0012!p\u001e\u0005\u0006}\u0002!\t!Z\u0001\u001di\u0016\u001cH/T5o\u001b\u0006Dx+\u001b;i)Vl'\r\\5oO^Kg\u000eZ8xQ\tix\u000f\u0003\u0004\u0002\u0004\u0001!\t!Z\u0001#i\u0016\u001cHoV5oI><\u0018iZ4sK\u001e\fG/Z(o\u0007>t7\u000f^1oiZ\u000bG.^3)\u0007\u0005\u0005q\u000fC\u0004\u0002\n\u0001!\t!a\u0003\u00029Q,7\u000f\u001e)s_\u000e$\u0018.\\3DCN\u001c\u0017\rZ3XS:$wn^!hOV\ta\rK\u0002\u0002\b]Da!!\u0005\u0001\t\u0003)\u0017A\u0007;fgR,e/\u001a8u)&lWmU3tg&|gnV5oI><\bfAA\bo\"1\u0011q\u0003\u0001\u0005\u0002\u0015\fA\u0006^3ti\u00163XM\u001c;US6,G+^7cY&twmV5oI><x+\u001b;i\u00032dwn\u001e'bi\u0016tWm]:)\u0007\u0005Uq\u000f\u0003\u0004\u0002\u001e\u0001!\t!Z\u0001\"i\u0016\u001cHoV5oI><\u0018iZ4sK\u001e\fG/Z(o+B\u001cXM\u001d;T_V\u00148-\u001a\u0015\u0004\u000379\bBBA\u0012\u0001\u0011\u0005Q-\u0001\u001auKN$x+\u001b8e_^\fum\u001a:fO\u0006$Xm\u00148VaN,'\u000f^*pkJ\u001cWmV5uQ\u0006cGn\\<MCR,g.Z:tQ\r\t\tc\u001e\u0005\u0007\u0003S\u0001A\u0011A3\u0002eQ,7\u000f^,j]\u0012|w/Q4he\u0016<\u0017\r^3P]V\u00038/\u001a:u'>,(oY3QkNDGm\\<o/\u0006$XM]7be.D3!a\nx\u0011\u0019\ty\u0003\u0001C\u0001K\u0006\u0011C/Z:u/&tGm\\<BO\u001e\u0014XmZ1uK>s'+\u001a;sC\u000e$8\u000b\u001e:fC6D3!!\fx\u0011\u0019\t)\u0004\u0001C\u0001K\u0006)D/Z:u\t&\u001cH/\u001b8di\u0006;wmV5uQ6+'oZ3P]\u00163XM\u001c;US6,7+Z:tS>twI]8va^Kg\u000eZ8xQ\r\t\u0019d\u001e\u0005\b\u0003w\u0001A\u0011BA\u001f\u0003E9\u0018\u000e\u001e5MCR,g)\u001b:f\t\u0016d\u0017-\u001f\u000b\u0006M\u0006}\u0012Q\n\u0005\t\u0003\u0003\nI\u00041\u0001\u0002D\u0005YA/\u00192mK\u000e{gNZ5h!\u0011\t)%!\u0013\u000e\u0005\u0005\u001d#BA7\u000b\u0013\u0011\tY%a\u0012\u0003\u0017Q\u000b'\r\\3D_:4\u0017n\u001a\u0005\t\u0003\u001f\nI\u00041\u0001\u0002R\u0005A\u0011N\u001c;feZ\fG\u000e\u0005\u0003\u0002T\u0005uSBAA+\u0015\rI\u0015q\u000b\u0006\u0005\u00033\nY&\u0001\u0004d_6lwN\u001c\u0006\u0003[2IA!a\u0018\u0002V\t!A+[7f\u0011\u001d\t\u0019\u0007\u0001C\u0005\u0003K\nQ\u0002\\8dC2$\u0015\r^3US6,G\u0003BA4\u0003[\u00022aRA5\u0013\r\tY\u0007\u0013\u0002\u000e\u0019>\u001c\u0017\r\u001c#bi\u0016$\u0016.\\3\t\u0011\u0005=\u0014\u0011\ra\u0001\u0003c\n1\"\u001a9pG\"\u001cVmY8oIB\u0019q'a\u001d\n\u0007\u0005U\u0004H\u0001\u0003M_:<\u0007f\u0002\u0001\u0002z\u0005\u0015\u0015q\u0011\t\u0005\u0003w\n\t)\u0004\u0002\u0002~)\u0019\u0011q\u00107\u0002\u0013\u0015DH/\u001a8tS>t\u0017\u0002BAB\u0003{\u0012!\"\u0012=uK:$w+\u001b;i\u0003\u00151\u0018\r\\;fY\t\tIi\t\u0002\u0002\fB!\u0011QRAO\u001b\t\tyI\u0003\u0003\u0002\u0012\u0006M\u0015!\u00049be\u0006lW\r^3sSj,GM\u0003\u0003\u0002\u0016\u0006]\u0015AC3yi\u0016t7/[8og*\u0019\u0011/!'\u000b\u0007\u0005mE\"A\u0005uKN$X\u000f^5mg&!\u0011qTAH\u0005i\u0001\u0016M]1nKR,'/\u001b>fIR+7\u000f^#yi\u0016t7/[8o\u000f\u001d\t\u0019K\u0001E\u0001\u0003K\u000b\u0011c\u0012:pkB<\u0016N\u001c3po&#6)Y:f!\r\u0001\u0015q\u0015\u0004\u0007\u0003\tA\t!!+\u0014\t\u0005\u001d\u00161\u0016\t\u0004o\u00055\u0016bAAXq\t1\u0011I\\=SK\u001aDq!PAT\t\u0003\t\u0019\f\u0006\u0002\u0002&\"A\u0011qWAT\t\u0003\tI,\u0001\u0006qCJ\fW.\u001a;feN$\"!a/\u0011\r\u0005u\u00161YAd\u001b\t\tyLC\u0002\u0002B*\u000bA!\u001e;jY&!\u0011QYA`\u0005)\u0019u\u000e\u001c7fGRLwN\u001c\t\u0006o\u0005%\u0017QZ\u0005\u0004\u0003\u0017D$!B!se\u0006L\b\u0003BAh\u0003+l!!!5\u000b\u0007\u0005M'*\u0001\u0003mC:<\u0017\u0002BAl\u0003#\u0014aa\u00142kK\u000e$\b\u0006CA[\u00037\f\t/a9\u0011\t\u00055\u0015Q\\\u0005\u0005\u0003?\fyI\u0001\u0006QCJ\fW.\u001a;feN\fAA\\1nK\u0006\u0012\u0011Q]\u0001('R\fG/\u001a\"bG.,g\u000eZ\u001f|aud\u0003%V:f)&lWm\u001d;b[BdEO\u001f\u0011>Am\fT\u0010")
public class GroupWindowITCase
extends StreamingWithStateTestBase {
    private final boolean useTimestampLtz;
    private final ZoneId SHANGHAI_ZONE;
    private final List<Row> upsertSourceCurrencyData;

    @Parameters(name="StateBackend={0}, UseTimestampLtz = {1}")
    public static Collection<Object[]> parameters() {
        return GroupWindowITCase$.MODULE$.parameters();
    }

    public ZoneId SHANGHAI_ZONE() {
        return this.SHANGHAI_ZONE;
    }

    public List<Row> upsertSourceCurrencyData() {
        return this.upsertSourceCurrencyData;
    }

    @Override
    @BeforeEach
    public void before() {
        super.before();
        String timestampDataId = TestValuesTableFactory.registerData(TestData$.MODULE$.timestampData());
        String timestampLtzDataId = TestValuesTableFactory.registerData(TestData$.MODULE$.timestampLtzData());
        this.tEnv().getConfig().setLocalTimeZone(this.SHANGHAI_ZONE());
        this.tEnv().executeSql(new StringOps(Predef$.MODULE$.augmentString(new StringBuilder(505).append("\n         |CREATE TABLE testTable (\n         | `ts` ").append((Object)(this.useTimestampLtz ? "BIGINT" : "STRING")).append(",\n         | `int` INT,\n         | `double` DOUBLE,\n         | `float` FLOAT,\n         | `bigdec` DECIMAL(10, 2),\n         | `string` STRING,\n         | `name` STRING,\n         | `rowtime` AS\n         | ").append((Object)(this.useTimestampLtz ? "TO_TIMESTAMP_LTZ(`ts`, 3)" : "TO_TIMESTAMP(`ts`)")).append(",\n         | proctime as PROCTIME(),\n         | WATERMARK for `rowtime` AS `rowtime` - INTERVAL '0.01' SECOND\n         |) WITH (\n         | 'connector' = 'values',\n         | 'data-id' = '").append((Object)(this.useTimestampLtz ? timestampLtzDataId : timestampDataId)).append("',\n         | 'failing-source' = 'true'\n         |)\n         |").toString())).stripMargin());
    }

    @TestTemplate
    public void testEventTimeSlidingWindow() {
        this.tEnv().createTemporarySystemFunction("concat_distinct_agg", JavaUserDefinedAggFunctions.ConcatDistinctAggFunction.class);
        String sql = new StringOps(Predef$.MODULE$.augmentString("\n        |SELECT\n        |  `string`,\n        |  HOP_START(rowtime, INTERVAL '0.004' SECOND, INTERVAL '0.005' SECOND),\n        |  HOP_ROWTIME(rowtime, INTERVAL '0.004' SECOND, INTERVAL '0.005' SECOND),\n        |  COUNT(1),\n        |  SUM(1),\n        |  COUNT(`int`),\n        |  COUNT(DISTINCT `float`),\n        |  concat_distinct_agg(name)\n        |FROM testTable\n        |GROUP BY `string`, HOP(rowtime, INTERVAL '0.004' SECOND, INTERVAL '0.005' SECOND)\n      ")).stripMargin();
        TestingAppendSink sink = new TestingAppendSink();
        org.apache.flink.table.api.bridge.scala.package$.MODULE$.tableConversions(this.tEnv().sqlQuery(sql)).toDataStream().addSink((SinkFunction)sink);
        this.env().execute();
        Seq expected = this.useTimestampLtz ? (Seq)Seq$.MODULE$.apply((Seq)Predef$.MODULE$.wrapRefArray((Object[])new String[]{"Hallo,1970-01-01T00:00,1969-12-31T16:00:00.004Z,1,1,1,1,a", "Hello world,1970-01-01T00:00:00.004,1969-12-31T16:00:00.008Z,1,1,1,1,a", "Hello world,1970-01-01T00:00:00.008,1969-12-31T16:00:00.012Z,1,1,1,1,a", "Hello world,1970-01-01T00:00:00.012,1969-12-31T16:00:00.016Z,1,1,1,1,b", "Hello world,1970-01-01T00:00:00.016,1969-12-31T16:00:00.020Z,1,1,1,1,b", "Hello,1970-01-01T00:00,1969-12-31T16:00:00.004Z,2,2,2,2,a", "Hello,1970-01-01T00:00:00.004,1969-12-31T16:00:00.008Z,3,3,3,2,a|b", "Hi,1970-01-01T00:00,1969-12-31T16:00:00.004Z,1,1,1,1,a", "null,1970-01-01T00:00:00.028,1969-12-31T16:00:00.032Z,1,1,1,1,null", "null,1970-01-01T00:00:00.032,1969-12-31T16:00:00.036Z,1,1,1,1,null"})) : (Seq)Seq$.MODULE$.apply((Seq)Predef$.MODULE$.wrapRefArray((Object[])new String[]{"Hallo,1970-01-01T00:00,1970-01-01T00:00:00.004,1,1,1,1,a", "Hello world,1970-01-01T00:00:00.004,1970-01-01T00:00:00.008,1,1,1,1,a", "Hello world,1970-01-01T00:00:00.008,1970-01-01T00:00:00.012,1,1,1,1,a", "Hello world,1970-01-01T00:00:00.012,1970-01-01T00:00:00.016,1,1,1,1,b", "Hello world,1970-01-01T00:00:00.016,1970-01-01T00:00:00.020,1,1,1,1,b", "Hello,1970-01-01T00:00,1970-01-01T00:00:00.004,2,2,2,2,a", "Hello,1970-01-01T00:00:00.004,1970-01-01T00:00:00.008,3,3,3,2,a|b", "Hi,1970-01-01T00:00,1970-01-01T00:00:00.004,1,1,1,1,a", "null,1970-01-01T00:00:00.028,1970-01-01T00:00:00.032,1,1,1,1,null", "null,1970-01-01T00:00:00.032,1970-01-01T00:00:00.036,1,1,1,1,null"}));
        Assertions.assertThat((Object)sink.getAppendResults().sorted((Ordering)Ordering.String$.MODULE$)).isEqualTo(expected.sorted((Ordering)Ordering.String$.MODULE$));
    }

    @TestTemplate
    public void testCascadingTumbleWindow() {
        String sql = new StringOps(Predef$.MODULE$.augmentString("\n        |SELECT SUM(cnt)\n        |FROM (\n        |  SELECT COUNT(1) AS cnt, TUMBLE_ROWTIME(rowtime, INTERVAL '10' SECOND) AS ts\n        |  FROM testTable\n        |  GROUP BY `int`, `string`, TUMBLE(rowtime, INTERVAL '10' SECOND)\n        |)\n        |GROUP BY TUMBLE(ts, INTERVAL '10' SECOND)\n        |")).stripMargin();
        TestingAppendSink sink = new TestingAppendSink();
        org.apache.flink.table.api.bridge.scala.package$.MODULE$.tableConversions(this.tEnv().sqlQuery(sql)).toDataStream().addSink((SinkFunction)sink);
        this.env().execute();
        Seq expected = (Seq)Seq$.MODULE$.apply((Seq)Predef$.MODULE$.wrapRefArray((Object[])new String[]{"9"}));
        Assertions.assertThat((Object)sink.getAppendResults().sorted((Ordering)Ordering.String$.MODULE$)).isEqualTo(expected.sorted((Ordering)Ordering.String$.MODULE$));
    }

    @TestTemplate
    public void testMinMaxWithTumblingWindow() {
        this.tEnv().getConfig().set("table.exec.emit.early-fire.enabled", "true");
        this.tEnv().getConfig().set("table.exec.emit.early-fire.delay", "1000 ms");
        String sql = new StringOps(Predef$.MODULE$.augmentString("\n        |SELECT\n        | MAX(max_ts),\n        | MIN(min_ts),\n        | `string`\n        |FROM(\n        | SELECT\n        | `string`,\n        | `int`,\n        | MAX(rowtime) as max_ts,\n        | MIN(rowtime) as min_ts\n        | FROM testTable\n        | GROUP BY `string`, `int`, TUMBLE(rowtime, INTERVAL '10' SECOND))\n        |GROUP BY `string`\n      ")).stripMargin();
        TestingRetractSink sink = new TestingRetractSink();
        org.apache.flink.table.api.bridge.scala.package$.MODULE$.tableConversions(this.tEnv().sqlQuery(sql)).toRetractStream(TypeExtractor.createTypeInfo(Row.class)).addSink((SinkFunction)sink);
        this.env().execute();
        Seq expected = this.useTimestampLtz ? (Seq)Seq$.MODULE$.apply((Seq)Predef$.MODULE$.wrapRefArray((Object[])new String[]{"1969-12-31T16:00:00.001Z,1969-12-31T16:00:00.001Z,Hi", "1969-12-31T16:00:00.002Z,1969-12-31T16:00:00.002Z,Hallo", "1969-12-31T16:00:00.007Z,1969-12-31T16:00:00.003Z,Hello", "1969-12-31T16:00:00.016Z,1969-12-31T16:00:00.008Z,Hello world", "1969-12-31T16:00:00.032Z,1969-12-31T16:00:00.032Z,null"})) : (Seq)Seq$.MODULE$.apply((Seq)Predef$.MODULE$.wrapRefArray((Object[])new String[]{"1970-01-01T00:00:00.001,1970-01-01T00:00:00.001,Hi", "1970-01-01T00:00:00.002,1970-01-01T00:00:00.002,Hallo", "1970-01-01T00:00:00.007,1970-01-01T00:00:00.003,Hello", "1970-01-01T00:00:00.016,1970-01-01T00:00:00.008,Hello world", "1970-01-01T00:00:00.032,1970-01-01T00:00:00.032,null"}));
        Assertions.assertThat((Object)sink.getRetractResults().sorted((Ordering)Ordering.String$.MODULE$)).isEqualTo(expected.sorted((Ordering)Ordering.String$.MODULE$));
    }

    @TestTemplate
    public void testWindowAggregateOnConstantValue() {
        String sql = new StringOps(Predef$.MODULE$.augmentString("\n        |SELECT TUMBLE_END(rowtime, INTERVAL '0.003' SECOND), COUNT(name)\n        |FROM testTable\n        | GROUP BY 'a', TUMBLE(rowtime, INTERVAL '0.003' SECOND)\n      ")).stripMargin();
        TestingAppendSink sink = new TestingAppendSink();
        org.apache.flink.table.api.bridge.scala.package$.MODULE$.tableConversions(this.tEnv().sqlQuery(sql)).toDataStream().addSink((SinkFunction)sink);
        this.env().execute();
        Seq expected = (Seq)Seq$.MODULE$.apply((Seq)Predef$.MODULE$.wrapRefArray((Object[])new String[]{"1970-01-01T00:00:00.003,2", "1970-01-01T00:00:00.006,2", "1970-01-01T00:00:00.009,3", "1970-01-01T00:00:00.018,1", "1970-01-01T00:00:00.033,0"}));
        Assertions.assertThat((Object)sink.getAppendResults().sorted((Ordering)Ordering.String$.MODULE$)).isEqualTo(expected.sorted((Ordering)Ordering.String$.MODULE$));
    }

    @TestTemplate
    public void testProctimeCascadeWindowAgg() {
        String sql = new StringOps(Predef$.MODULE$.augmentString("\n        | SELECT\n        |  cnt AS key,\n        |  TUMBLE_START(pt1, INTERVAL '0.01' SECOND) AS window_start,\n        |  TUMBLE_END(pt1, INTERVAL '0.01' SECOND) AS window_start,\n        |  TUMBLE_PROCTIME(pt1, INTERVAL '0.01' SECOND) as window_proctime,\n        |  MAX(s1) AS v1,\n        |  MAX(e1) AS v2\n        | FROM\n        | (SELECT\n        |   TUMBLE_START(proctime, INTERVAL '0.005' SECOND) as s1,\n        |   TUMBLE_END(proctime, INTERVAL '0.005' SECOND) e1,\n        |   TUMBLE_PROCTIME(proctime, INTERVAL '0.005' SECOND) as pt1,\n        |   COUNT(name) as cnt\n        |  FROM testTable\n        |  GROUP BY 'a', TUMBLE(proctime, INTERVAL '0.005' SECOND)\n        |  ) as T\n        | GROUP BY cnt, TUMBLE(pt1, INTERVAL '0.01' SECOND)\n      ")).stripMargin();
        ResolvedSchema resolvedSchema = this.tEnv().sqlQuery(sql).getResolvedSchema();
        Assertions.assertThat((String)resolvedSchema.toString()).isEqualTo(new StringOps(Predef$.MODULE$.augmentString("\n           |(\n           |  `key` BIGINT NOT NULL,\n           |  `window_start` TIMESTAMP(3) NOT NULL,\n           |  `window_start0` TIMESTAMP(3) NOT NULL,\n           |  `window_proctime` TIMESTAMP_LTZ(3) NOT NULL *PROCTIME*,\n           |  `v1` TIMESTAMP(3) NOT NULL,\n           |  `v2` TIMESTAMP(3) NOT NULL\n           |)\n         ")).stripMargin().trim());
    }

    @TestTemplate
    public void testEventTimeSessionWindow() {
        if (this.useTimestampLtz) {
            return;
        }
        .colon.colon sessionData = new .colon.colon((Object)new Tuple4((Object)BoxesRunTime.boxToLong((long)1L), (Object)BoxesRunTime.boxToInteger((int)1), (Object)"Hello", (Object)"a"), (List)new .colon.colon((Object)new Tuple4((Object)BoxesRunTime.boxToLong((long)2L), (Object)BoxesRunTime.boxToInteger((int)2), (Object)"Hello", (Object)"b"), (List)new .colon.colon((Object)new Tuple4((Object)BoxesRunTime.boxToLong((long)8L), (Object)BoxesRunTime.boxToInteger((int)8), (Object)"Hello", (Object)"a"), (List)new .colon.colon((Object)new Tuple4((Object)BoxesRunTime.boxToLong((long)9L), (Object)BoxesRunTime.boxToInteger((int)9), (Object)"Hello World", (Object)"b"), (List)new .colon.colon((Object)new Tuple4((Object)BoxesRunTime.boxToLong((long)4L), (Object)BoxesRunTime.boxToInteger((int)4), (Object)"Hello", (Object)"c"), (List)new .colon.colon((Object)new Tuple4((Object)BoxesRunTime.boxToLong((long)16L), (Object)BoxesRunTime.boxToInteger((int)16), (Object)"Hello", (Object)"d"), (List)Nil$.MODULE$))))));
        DataStream stream = this.failingDataSource(sessionData, new CaseClassTypeInfo<Tuple4<Object, Object, String, String>>(null){

            public /* synthetic */ TypeInformation[] protected$types($anon$4 x$1) {
                return x$1.types;
            }

            public TypeSerializer<Tuple4<Object, Object, String, String>> createSerializer(SerializerConfig serializerConfig) {
                TypeSerializer[] fieldSerializers = new TypeSerializer[this.getArity()];
                RichInt$.MODULE$.until$extension0(Predef$.MODULE$.intWrapper(0), this.getArity()).foreach$mVc$sp((Function1)(JFunction1.mcVI.sp & Serializable & scala.Serializable)i -> {
                    fieldSerializers$1[i] = this.protected$types(this)[i].createSerializer(serializerConfig);
                });
                ScalaCaseClassSerializer<Tuple4<Object, Object, String, String>> unused = new ScalaCaseClassSerializer<Tuple4<Object, Object, String, String>>(this, fieldSerializers){

                    public Tuple4<Object, Object, String, String> createInstance(Object[] fields) {
                        return new Tuple4((Object)BoxesRunTime.boxToLong((long)BoxesRunTime.unboxToLong((Object)fields[0])), (Object)BoxesRunTime.boxToInteger((int)BoxesRunTime.unboxToInt((Object)fields[1])), (Object)((String)fields[2]), (Object)((String)fields[3]));
                    }
                };
                return new ScalaCaseClassSerializer(this.getTypeClass(), fieldSerializers);
            }

            public TypeSerializer<Tuple4<Object, Object, String, String>> createSerializer(ExecutionConfig executionConfig) {
                return this.createSerializer(executionConfig.getSerializerConfig());
            }

            private static /* synthetic */ Object $deserializeLambda$(SerializedLambda serializedLambda) {
                return LambdaDeserialize.bootstrap("lambdaDeserialize", new MethodHandle[]{$anonfun$createSerializer$1(org.apache.flink.table.planner.runtime.stream.sql.GroupWindowITCase$$anon$4 org.apache.flink.api.common.serialization.SerializerConfig org.apache.flink.api.common.typeutils.TypeSerializer[] int )}, serializedLambda);
            }
        }).assignTimestampsAndWatermarks(new TimeTestUtil.TimestampAndWatermarkWithOffset(10L));
        Table table = org.apache.flink.table.api.bridge.scala.package$.MODULE$.dataStreamConversions(stream).toTable(this.tEnv(), (Seq)Predef$.MODULE$.wrapRefArray((Object[])new Expression[]{(Expression)package$.MODULE$.UnresolvedFieldExpression((Symbol)SymbolLiteral.bootstrap("apply", "rowtime")).rowtime(), package$.MODULE$.symbol2FieldExpression((Symbol)SymbolLiteral.bootstrap("apply", "int")), package$.MODULE$.symbol2FieldExpression((Symbol)SymbolLiteral.bootstrap("apply", "string")), package$.MODULE$.symbol2FieldExpression((Symbol)SymbolLiteral.bootstrap("apply", "name"))}));
        this.tEnv().createTemporaryView("T1", table);
        String sql = new StringOps(Predef$.MODULE$.augmentString("\n        |SELECT\n        |  `string`,\n        |  SESSION_START(rowtime, INTERVAL '0.005' SECOND),\n        |  SESSION_ROWTIME(rowtime, INTERVAL '0.005' SECOND),\n        |  COUNT(1),\n        |  SUM(1),\n        |  COUNT(`int`),\n        |  SUM(`int`),\n        |  COUNT(DISTINCT name)\n        |FROM T1\n        |GROUP BY `string`, SESSION(rowtime, INTERVAL '0.005' SECOND)\n      ")).stripMargin();
        TestingAppendSink sink = new TestingAppendSink();
        org.apache.flink.table.api.bridge.scala.package$.MODULE$.tableConversions(this.tEnv().sqlQuery(sql)).toDataStream().addSink((SinkFunction)sink);
        this.env().execute();
        Seq expected = (Seq)Seq$.MODULE$.apply((Seq)Predef$.MODULE$.wrapRefArray((Object[])new String[]{"Hello World,1970-01-01T00:00:00.009,1970-01-01T00:00:00.013,1,1,1,9,1", "Hello,1970-01-01T00:00:00.016,1970-01-01T00:00:00.020,1,1,1,16,1", "Hello,1970-01-01T00:00:00.001,1970-01-01T00:00:00.012,4,4,4,15,3"}));
        Assertions.assertThat((Object)sink.getAppendResults().sorted((Ordering)Ordering.String$.MODULE$)).isEqualTo(expected.sorted((Ordering)Ordering.String$.MODULE$));
    }

    @TestTemplate
    public void testEventTimeTumblingWindowWithAllowLateness() {
        if (this.useTimestampLtz) {
            return;
        }
        this.tEnv().getConfig().set(WindowEmitStrategy$.MODULE$.TABLE_EXEC_EMIT_ALLOW_LATENESS(), (Object)Duration.ofMillis(10L));
        this.withLateFireDelay(this.tEnv().getConfig(), Time.of((long)0L, (TimeUnit)TimeUnit.NANOSECONDS));
        List data = List$.MODULE$.apply((Seq)Predef$.MODULE$.wrapRefArray((Object[])new Tuple3[]{new Tuple3((Object)BoxesRunTime.boxToLong((long)1L), (Object)BoxesRunTime.boxToInteger((int)1), (Object)"Hi"), new Tuple3((Object)BoxesRunTime.boxToLong((long)2L), (Object)BoxesRunTime.boxToInteger((int)2), (Object)"Hello"), new Tuple3((Object)BoxesRunTime.boxToLong((long)4L), (Object)BoxesRunTime.boxToInteger((int)2), (Object)"Hello"), new Tuple3((Object)BoxesRunTime.boxToLong((long)8L), (Object)BoxesRunTime.boxToInteger((int)3), (Object)"Hello world"), new Tuple3((Object)BoxesRunTime.boxToLong((long)4L), (Object)BoxesRunTime.boxToInteger((int)3), (Object)"Hello"), new Tuple3((Object)BoxesRunTime.boxToLong((long)16L), (Object)BoxesRunTime.boxToInteger((int)3), (Object)"Hello world"), new Tuple3((Object)BoxesRunTime.boxToLong((long)9L), (Object)BoxesRunTime.boxToInteger((int)4), (Object)"Hello world"), new Tuple3((Object)BoxesRunTime.boxToLong((long)3L), (Object)BoxesRunTime.boxToInteger((int)1), (Object)"Hi")}));
        DataStream stream = this.failingDataSource(data, new CaseClassTypeInfo<Tuple3<Object, Object, String>>(null){

            public /* synthetic */ TypeInformation[] protected$types($anon$5 x$1) {
                return x$1.types;
            }

            public TypeSerializer<Tuple3<Object, Object, String>> createSerializer(SerializerConfig serializerConfig) {
                TypeSerializer[] fieldSerializers = new TypeSerializer[this.getArity()];
                RichInt$.MODULE$.until$extension0(Predef$.MODULE$.intWrapper(0), this.getArity()).foreach$mVc$sp((Function1)(JFunction1.mcVI.sp & Serializable & scala.Serializable)i -> {
                    fieldSerializers$2[i] = this.protected$types(this)[i].createSerializer(serializerConfig);
                });
                ScalaCaseClassSerializer<Tuple3<Object, Object, String>> unused = new ScalaCaseClassSerializer<Tuple3<Object, Object, String>>(this, fieldSerializers){

                    public Tuple3<Object, Object, String> createInstance(Object[] fields) {
                        return new Tuple3((Object)BoxesRunTime.boxToLong((long)BoxesRunTime.unboxToLong((Object)fields[0])), (Object)BoxesRunTime.boxToInteger((int)BoxesRunTime.unboxToInt((Object)fields[1])), (Object)((String)fields[2]));
                    }
                };
                return new ScalaCaseClassSerializer(this.getTypeClass(), fieldSerializers);
            }

            public TypeSerializer<Tuple3<Object, Object, String>> createSerializer(ExecutionConfig executionConfig) {
                return this.createSerializer(executionConfig.getSerializerConfig());
            }

            private static /* synthetic */ Object $deserializeLambda$(SerializedLambda serializedLambda) {
                return LambdaDeserialize.bootstrap("lambdaDeserialize", new MethodHandle[]{$anonfun$createSerializer$2(org.apache.flink.table.planner.runtime.stream.sql.GroupWindowITCase$$anon$5 org.apache.flink.api.common.serialization.SerializerConfig org.apache.flink.api.common.typeutils.TypeSerializer[] int )}, serializedLambda);
            }
        }).assignTimestampsAndWatermarks(new TimeTestUtil.TimestampAndWatermarkWithOffset(0L));
        Table table = org.apache.flink.table.api.bridge.scala.package$.MODULE$.dataStreamConversions(stream).toTable(this.tEnv(), (Seq)Predef$.MODULE$.wrapRefArray((Object[])new Expression[]{package$.MODULE$.symbol2FieldExpression((Symbol)SymbolLiteral.bootstrap("apply", "long")), package$.MODULE$.symbol2FieldExpression((Symbol)SymbolLiteral.bootstrap("apply", "int")), package$.MODULE$.symbol2FieldExpression((Symbol)SymbolLiteral.bootstrap("apply", "string")), (Expression)package$.MODULE$.UnresolvedFieldExpression((Symbol)SymbolLiteral.bootstrap("apply", "rowtime")).rowtime()}));
        this.tEnv().createTemporaryView("T1", table);
        this.tEnv().createTemporarySystemFunction("weightAvgFun", JavaUserDefinedAggFunctions.WeightedAvg.class);
        String sql = new StringOps(Predef$.MODULE$.augmentString("\n        |SELECT\n        |  `string`,\n        |  TUMBLE_START(rowtime, INTERVAL '0.005' SECOND) as w_start,\n        |  TUMBLE_END(rowtime, INTERVAL '0.005' SECOND),\n        |  COUNT(DISTINCT `long`),\n        |  COUNT(`int`),\n        |  CAST(AVG(`int`) AS INT),\n        |  weightAvgFun(`long`, `int`),\n        |  MIN(`int`),\n        |  MAX(`int`),\n        |  SUM(`int`)\n        |FROM T1\n        |GROUP BY `string`, TUMBLE(rowtime, INTERVAL '0.005' SECOND)\n      ")).stripMargin();
        Table result = this.tEnv().sqlQuery(sql);
        TypeInformation[] fieldTypes = (TypeInformation[])((Object[])new TypeInformation[]{Types.STRING(), Types.LOCAL_DATE_TIME(), Types.LOCAL_DATE_TIME(), Types.LONG(), Types.LONG(), Types.INT(), Types.LONG(), Types.INT(), Types.INT(), Types.INT()});
        String[] fieldNames = (String[])((TraversableOnce)new ArrayOps.ofRef(Predef$.MODULE$.refArrayOps((Object[])fieldTypes)).indices().map((Function1 & Serializable & scala.Serializable)x$1 -> GroupWindowITCase.$anonfun$testEventTimeTumblingWindowWithAllowLateness$1(BoxesRunTime.unboxToInt((Object)x$1)), IndexedSeq$.MODULE$.canBuildFrom())).toArray(ClassTag$.MODULE$.apply(String.class));
        TestingUpsertTableSink sink = new TestingUpsertTableSink(new int[]{0, 1}).configure(fieldNames, (TypeInformation<?>[])fieldTypes);
        ((TableEnvironmentInternal)this.tEnv()).registerTableSinkInternal("MySink", (TableSink)sink);
        result.executeInsert("MySink").await();
        Seq expected = (Seq)Seq$.MODULE$.apply((Seq)Predef$.MODULE$.wrapRefArray((Object[])new String[]{"Hi,1970-01-01T00:00,1970-01-01T00:00:00.005,1,1,1,1,1,1,1", "Hello,1970-01-01T00:00,1970-01-01T00:00:00.005,2,3,2,3,2,3,7", "Hello world,1970-01-01T00:00:00.015,1970-01-01T00:00:00.020,1,1,3,16,3,3,3", "Hello world,1970-01-01T00:00:00.005,1970-01-01T00:00:00.010,2,2,3,8,3,4,7"}));
        Assertions.assertThat((String)((TraversableOnce)sink.getUpsertResults().sorted((Ordering)Ordering.String$.MODULE$)).mkString("\n")).isEqualTo(((TraversableOnce)expected.sorted((Ordering)Ordering.String$.MODULE$)).mkString("\n"));
    }

    @TestTemplate
    public void testWindowAggregateOnUpsertSource() {
        this.env().setParallelism(1);
        String upsertSourceDataId = TestValuesTableFactory.registerData(this.upsertSourceCurrencyData());
        this.tEnv().executeSql(new StringOps(Predef$.MODULE$.augmentString(new StringBuilder(621).append("\n                       |CREATE TABLE upsert_currency (\n                       |  currency STRING,\n                       |  currency_no STRING,\n                       |  rate  BIGINT,\n                       |  currency_time TIMESTAMP(3),\n                       |  WATERMARK FOR currency_time AS currency_time - interval '5' SECOND,\n                       |  PRIMARY KEY(currency) NOT ENFORCED\n                       |) WITH (\n                       |  'connector' = 'values',\n                       |  'changelog-mode' = 'UA,D',\n                       |  'data-id' = '").append(upsertSourceDataId).append("'\n                       |)\n                       |").toString())).stripMargin());
        String sql = new StringOps(Predef$.MODULE$.augmentString("\n        |SELECT\n        |currency,\n        |COUNT(1) AS cnt,\n        |MAX(rate),\n        |TUMBLE_START(currency_time, INTERVAL '5' SECOND) as w_start,\n        |TUMBLE_END(currency_time, INTERVAL '5' SECOND) as w_end\n        |FROM upsert_currency\n        |GROUP BY currency, TUMBLE(currency_time, INTERVAL '5' SECOND)\n        |")).stripMargin();
        TestingAppendSink sink = new TestingAppendSink();
        org.apache.flink.table.api.bridge.scala.package$.MODULE$.tableConversions(this.tEnv().sqlQuery(sql)).toDataStream().addSink((SinkFunction)sink);
        this.env().execute();
        Seq expected = (Seq)Seq$.MODULE$.apply((Seq)Predef$.MODULE$.wrapRefArray((Object[])new String[]{"US Dollar,1,102,1970-01-01T00:00,1970-01-01T00:00:05", "Yen,1,1,1970-01-01T00:00,1970-01-01T00:00:05", "Euro,1,118,1970-01-01T00:00:15,1970-01-01T00:00:20", "RMB,1,702,1970-01-01T00:00,1970-01-01T00:00:05"}));
        Assertions.assertThat((Object)sink.getAppendResults().sorted((Ordering)Ordering.String$.MODULE$)).isEqualTo(expected.sorted((Ordering)Ordering.String$.MODULE$));
    }

    @TestTemplate
    public void testWindowAggregateOnUpsertSourceWithAllowLateness() {
        this.tEnv().getConfig().set(WindowEmitStrategy$.MODULE$.TABLE_EXEC_EMIT_ALLOW_LATENESS(), (Object)Duration.ofSeconds(15L));
        this.withLateFireDelay(this.tEnv().getConfig(), Time.of((long)0L, (TimeUnit)TimeUnit.NANOSECONDS));
        String upsertSourceDataId = TestValuesTableFactory.registerData(this.upsertSourceCurrencyData());
        this.tEnv().executeSql(new StringOps(Predef$.MODULE$.augmentString(new StringBuilder(621).append("\n                       |CREATE TABLE upsert_currency (\n                       |  currency STRING,\n                       |  currency_no STRING,\n                       |  rate  BIGINT,\n                       |  currency_time TIMESTAMP(3),\n                       |  WATERMARK FOR currency_time AS currency_time - interval '5' SECOND,\n                       |  PRIMARY KEY(currency) NOT ENFORCED\n                       |) WITH (\n                       |  'connector' = 'values',\n                       |  'changelog-mode' = 'UA,D',\n                       |  'data-id' = '").append(upsertSourceDataId).append("'\n                       |)\n                       |").toString())).stripMargin());
        String sql = new StringOps(Predef$.MODULE$.augmentString("\n        |SELECT\n        |currency,\n        |COUNT(1) AS cnt,\n        |MAX(rate),\n        |TUMBLE_START(currency_time, INTERVAL '5' SECOND) as w_start,\n        |TUMBLE_END(currency_time, INTERVAL '5' SECOND) as w_end\n        |FROM upsert_currency\n        |GROUP BY currency, TUMBLE(currency_time, INTERVAL '5' SECOND)\n        |")).stripMargin();
        Table table = this.tEnv().sqlQuery(sql);
        TableSchema schema = table.getSchema();
        TestingRetractTableSink sink = new TestingRetractTableSink().configure(schema.getFieldNames(), (TypeInformation<?>[])((TypeInformation[])new ArrayOps.ofRef(Predef$.MODULE$.refArrayOps((Object[])new ArrayOps.ofRef(Predef$.MODULE$.refArrayOps((Object[])schema.getFieldDataTypes())).map((Function1 & Serializable & scala.Serializable)x$2 -> (DataType)x$2.nullable(), Array$.MODULE$.canBuildFrom(ClassTag$.MODULE$.apply(DataType.class))))).map((Function1 & Serializable & scala.Serializable)x$1 -> TypeInfoDataTypeConverter.fromDataTypeToTypeInfo((DataType)x$1), Array$.MODULE$.canBuildFrom(ClassTag$.MODULE$.apply(TypeInformation.class)))));
        ((TableEnvironmentInternal)this.tEnv()).registerTableSinkInternal("MySink1", (TableSink)sink);
        table.executeInsert("MySink1").await();
        Seq expected = (Seq)Seq$.MODULE$.apply((Seq)Predef$.MODULE$.wrapRefArray((Object[])new String[]{"US Dollar,1,104,1970-01-01T00:00,1970-01-01T00:00:05", "Yen,1,1,1970-01-01T00:00,1970-01-01T00:00:05", "Euro,1,118,1970-01-01T00:00:15,1970-01-01T00:00:20"}));
        Assertions.assertThat((Object)sink.getRetractResults().sorted((Ordering)Ordering.String$.MODULE$)).isEqualTo(expected.sorted((Ordering)Ordering.String$.MODULE$));
    }

    @TestTemplate
    public void testWindowAggregateOnUpsertSourcePushdownWatermark() {
        this.env().setParallelism(1);
        String upsertSourceDataId = TestValuesTableFactory.registerData(this.upsertSourceCurrencyData());
        this.tEnv().executeSql(new StringOps(Predef$.MODULE$.augmentString(new StringBuilder(621).append("\n                       |CREATE TABLE upsert_currency (\n                       |  currency STRING,\n                       |  currency_no STRING,\n                       |  rate  BIGINT,\n                       |  currency_time TIMESTAMP(3),\n                       |  WATERMARK FOR currency_time AS currency_time - interval '5' SECOND,\n                       |  PRIMARY KEY(currency) NOT ENFORCED\n                       |) WITH (\n                       |  'connector' = 'values',\n                       |  'changelog-mode' = 'UA,D',\n                       |  'data-id' = '").append(upsertSourceDataId).append("'\n                       |)\n                       |").toString())).stripMargin());
        String sql = new StringOps(Predef$.MODULE$.augmentString("\n        |SELECT\n        |TUMBLE_START(currency_time, INTERVAL '5' SECOND) as w_start,\n        |TUMBLE_END(currency_time, INTERVAL '5' SECOND) as w_end,\n        |MAX(rate) AS max_rate\n        |FROM upsert_currency\n        |GROUP BY TUMBLE(currency_time, INTERVAL '5' SECOND)\n        |")).stripMargin();
        TestingAppendSink sink = new TestingAppendSink();
        org.apache.flink.table.api.bridge.scala.package$.MODULE$.tableConversions(this.tEnv().sqlQuery(sql)).toDataStream().addSink((SinkFunction)sink);
        this.env().execute();
        Seq expected = (Seq)Seq$.MODULE$.apply((Seq)Predef$.MODULE$.wrapRefArray((Object[])new String[]{"1970-01-01T00:00,1970-01-01T00:00:05,702", "1970-01-01T00:00:15,1970-01-01T00:00:20,118"}));
        Assertions.assertThat((Object)sink.getAppendResults().sorted((Ordering)Ordering.String$.MODULE$)).isEqualTo(expected.sorted((Ordering)Ordering.String$.MODULE$));
    }

    @TestTemplate
    public void testWindowAggregateOnRetractStream() {
        String sql = new StringOps(Predef$.MODULE$.augmentString("\n        |SELECT\n        |`string`,\n        |TUMBLE_START(rowtime, INTERVAL '0.005' SECOND) as w_start,\n        |TUMBLE_END(rowtime, INTERVAL '0.005' SECOND) as w_end,\n        |COUNT(1) AS cnt\n        |FROM\n        | (\n        | SELECT `string`, rowtime\n        | FROM (\n        |  SELECT *,\n        |  ROW_NUMBER() OVER (PARTITION BY `string` ORDER BY rowtime DESC) as rowNum\n        |   FROM testTable\n        | )\n        | WHERE rowNum = 1\n        |)\n        |GROUP BY `string`, TUMBLE(rowtime, INTERVAL '0.005' SECOND)\n        |")).stripMargin();
        TestingAppendSink sink = new TestingAppendSink();
        org.apache.flink.table.api.bridge.scala.package$.MODULE$.tableConversions(this.tEnv().sqlQuery(sql)).toDataStream().addSink((SinkFunction)sink);
        this.env().execute();
        Seq expected = (Seq)Seq$.MODULE$.apply((Seq)Predef$.MODULE$.wrapRefArray((Object[])new String[]{"Hi,1970-01-01T00:00,1970-01-01T00:00:00.005,1", "Hallo,1970-01-01T00:00,1970-01-01T00:00:00.005,1", "Hello,1970-01-01T00:00:00.005,1970-01-01T00:00:00.010,1", "Hello world,1970-01-01T00:00:00.015,1970-01-01T00:00:00.020,1", "null,1970-01-01T00:00:00.030,1970-01-01T00:00:00.035,1"}));
        Assertions.assertThat((Object)sink.getAppendResults().sorted((Ordering)Ordering.String$.MODULE$)).isEqualTo(expected.sorted((Ordering)Ordering.String$.MODULE$));
    }

    @TestTemplate
    public void testDistinctAggWithMergeOnEventTimeSessionGroupWindow() {
        if (this.useTimestampLtz) {
            return;
        }
        .colon.colon sessionWindowTestData = new .colon.colon((Object)new Tuple3((Object)BoxesRunTime.boxToLong((long)1L), (Object)BoxesRunTime.boxToInteger((int)2), (Object)"Hello"), (List)new .colon.colon((Object)new Tuple3((Object)BoxesRunTime.boxToLong((long)2L), (Object)BoxesRunTime.boxToInteger((int)2), (Object)"Hello"), (List)new .colon.colon((Object)new Tuple3((Object)BoxesRunTime.boxToLong((long)8L), (Object)BoxesRunTime.boxToInteger((int)2), (Object)"Hello"), (List)new .colon.colon((Object)new Tuple3((Object)BoxesRunTime.boxToLong((long)10L), (Object)BoxesRunTime.boxToInteger((int)3), (Object)"Hello"), (List)new .colon.colon((Object)new Tuple3((Object)BoxesRunTime.boxToLong((long)9L), (Object)BoxesRunTime.boxToInteger((int)9), (Object)"Hello World"), (List)new .colon.colon((Object)new Tuple3((Object)BoxesRunTime.boxToLong((long)4L), (Object)BoxesRunTime.boxToInteger((int)1), (Object)"Hello"), (List)new .colon.colon((Object)new Tuple3((Object)BoxesRunTime.boxToLong((long)16L), (Object)BoxesRunTime.boxToInteger((int)16), (Object)"Hello"), (List)Nil$.MODULE$)))))));
        DataStream stream = this.failingDataSource(sessionWindowTestData, new CaseClassTypeInfo<Tuple3<Object, Object, String>>(null){

            public /* synthetic */ TypeInformation[] protected$types($anon$6 x$1) {
                return x$1.types;
            }

            public TypeSerializer<Tuple3<Object, Object, String>> createSerializer(SerializerConfig serializerConfig) {
                TypeSerializer[] fieldSerializers = new TypeSerializer[this.getArity()];
                RichInt$.MODULE$.until$extension0(Predef$.MODULE$.intWrapper(0), this.getArity()).foreach$mVc$sp((Function1)(JFunction1.mcVI.sp & Serializable & scala.Serializable)i -> {
                    fieldSerializers$3[i] = this.protected$types(this)[i].createSerializer(serializerConfig);
                });
                ScalaCaseClassSerializer<Tuple3<Object, Object, String>> unused = new ScalaCaseClassSerializer<Tuple3<Object, Object, String>>(this, fieldSerializers){

                    public Tuple3<Object, Object, String> createInstance(Object[] fields) {
                        return new Tuple3((Object)BoxesRunTime.boxToLong((long)BoxesRunTime.unboxToLong((Object)fields[0])), (Object)BoxesRunTime.boxToInteger((int)BoxesRunTime.unboxToInt((Object)fields[1])), (Object)((String)fields[2]));
                    }
                };
                return new ScalaCaseClassSerializer(this.getTypeClass(), fieldSerializers);
            }

            public TypeSerializer<Tuple3<Object, Object, String>> createSerializer(ExecutionConfig executionConfig) {
                return this.createSerializer(executionConfig.getSerializerConfig());
            }

            private static /* synthetic */ Object $deserializeLambda$(SerializedLambda serializedLambda) {
                return LambdaDeserialize.bootstrap("lambdaDeserialize", new MethodHandle[]{$anonfun$createSerializer$3(org.apache.flink.table.planner.runtime.stream.sql.GroupWindowITCase$$anon$6 org.apache.flink.api.common.serialization.SerializerConfig org.apache.flink.api.common.typeutils.TypeSerializer[] int )}, serializedLambda);
            }
        }).assignTimestampsAndWatermarks(new TimeTestUtil.TimestampAndWatermarkWithOffset(10L));
        Table table = org.apache.flink.table.api.bridge.scala.package$.MODULE$.dataStreamConversions(stream).toTable(this.tEnv(), (Seq)Predef$.MODULE$.wrapRefArray((Object[])new Expression[]{package$.MODULE$.symbol2FieldExpression((Symbol)SymbolLiteral.bootstrap("apply", "a")), package$.MODULE$.symbol2FieldExpression((Symbol)SymbolLiteral.bootstrap("apply", "b")), package$.MODULE$.symbol2FieldExpression((Symbol)SymbolLiteral.bootstrap("apply", "c")), (Expression)package$.MODULE$.UnresolvedFieldExpression((Symbol)SymbolLiteral.bootstrap("apply", "rowtime")).rowtime()}));
        this.tEnv().createTemporaryView("MyTable", table);
        String sqlQuery = new StringOps(Predef$.MODULE$.augmentString("\n        |SELECT c,\n        |   COUNT(DISTINCT b),\n        |   SESSION_END(rowtime, INTERVAL '0.005' SECOND)\n        |FROM MyTable\n        |GROUP BY c, SESSION(rowtime, INTERVAL '0.005' SECOND)\n      ")).stripMargin();
        TestingAppendSink sink = new TestingAppendSink();
        org.apache.flink.table.api.bridge.scala.package$.MODULE$.tableConversions(this.tEnv().sqlQuery(sqlQuery)).toDataStream().addSink((SinkFunction)sink);
        this.env().execute();
        Seq expected = (Seq)Seq$.MODULE$.apply((Seq)Predef$.MODULE$.wrapRefArray((Object[])new String[]{"Hello World,1,1970-01-01T00:00:00.014", "Hello,1,1970-01-01T00:00:00.021", "Hello,3,1970-01-01T00:00:00.015"}));
        Assertions.assertThat((Object)sink.getAppendResults().sorted((Ordering)Ordering.String$.MODULE$)).isEqualTo(expected.sorted((Ordering)Ordering.String$.MODULE$));
    }

    private void withLateFireDelay(TableConfig tableConfig, Time interval) {
        long intervalInMillis = interval.toMilliseconds();
        Duration lateFireDelay = tableConfig.getOptional(WindowEmitStrategy$.MODULE$.TABLE_EXEC_EMIT_LATE_FIRE_DELAY()).orElse(null);
        if (lateFireDelay != null && lateFireDelay.toMillis() != intervalInMillis) {
            throw new RuntimeException("Currently not support different lateFireInterval configs in one job");
        }
        tableConfig.set(WindowEmitStrategy$.MODULE$.TABLE_EXEC_EMIT_LATE_FIRE_ENABLED(), (Object)BoxesRunTime.boxToBoolean((boolean)true));
        tableConfig.set(WindowEmitStrategy$.MODULE$.TABLE_EXEC_EMIT_LATE_FIRE_DELAY(), (Object)Duration.ofMillis(intervalInMillis));
    }

    private LocalDateTime localDateTime(long epochSecond) {
        return LocalDateTime.ofEpochSecond(epochSecond, 0, ZoneOffset.UTC);
    }

    public static final /* synthetic */ String $anonfun$testEventTimeTumblingWindowWithAllowLateness$1(int x$1) {
        return new StringBuilder(1).append("f").append(x$1).toString();
    }

    public GroupWindowITCase(StreamingWithStateTestBase.StateBackendMode mode, boolean useTimestampLtz) {
        this.useTimestampLtz = useTimestampLtz;
        super(mode);
        this.SHANGHAI_ZONE = ZoneId.of("Asia/Shanghai");
        this.upsertSourceCurrencyData = List$.MODULE$.apply((Seq)Predef$.MODULE$.wrapRefArray((Object[])new Row[]{TestValuesTableFactory.changelogRow("+U", "Euro", "no1", 114L, this.localDateTime(1L)), TestValuesTableFactory.changelogRow("+U", "US Dollar", "no1", 100L, this.localDateTime(1L)), TestValuesTableFactory.changelogRow("+U", "US Dollar", "no1", 102L, this.localDateTime(2L)), TestValuesTableFactory.changelogRow("+U", "Yen", "no1", 1L, this.localDateTime(3L)), TestValuesTableFactory.changelogRow("+U", "RMB", "no1", 702L, this.localDateTime(4L)), TestValuesTableFactory.changelogRow("+U", "Euro", "no1", 118L, this.localDateTime(18L)), TestValuesTableFactory.changelogRow("+U", "US Dollar", "no1", 104L, this.localDateTime(4L)), TestValuesTableFactory.changelogRow("-D", "RMB", "no1", 702L, this.localDateTime(4L))}));
    }
}

