/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.table.planner.plan.stream.sql;

import java.io.Serializable;
import java.lang.invoke.MethodHandle;
import java.lang.invoke.SerializedLambda;
import java.time.Duration;
import org.apache.flink.api.common.ExecutionConfig;
import org.apache.flink.api.common.serialization.SerializerConfig;
import org.apache.flink.api.common.time.Time;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.api.scala.typeutils.CaseClassTypeInfo;
import org.apache.flink.api.scala.typeutils.ScalaCaseClassSerializer;
import org.apache.flink.table.api.StatementSet;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.TableConfig;
import org.apache.flink.table.api.config.ExecutionConfigOptions;
import org.apache.flink.table.api.internal.TableEnvironmentInternal;
import org.apache.flink.table.api.package$;
import org.apache.flink.table.expressions.Expression;
import org.apache.flink.table.functions.UserDefinedFunction;
import org.apache.flink.table.planner.plan.stream.sql.MiniBatchIntervalInferTest$;
import org.apache.flink.table.planner.plan.utils.WindowEmitStrategy$;
import org.apache.flink.table.planner.utils.StreamTableTestUtil;
import org.apache.flink.table.planner.utils.TableTestBase;
import org.apache.flink.table.sinks.AppendStreamTableSink;
import org.apache.flink.table.sinks.RetractStreamTableSink;
import org.apache.flink.table.types.logical.BigIntType;
import org.apache.flink.table.types.logical.IntType;
import org.apache.flink.table.types.logical.LogicalType;
import org.apache.flink.table.types.logical.VarCharType;
import org.apache.flink.types.Row;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import scala.Function1;
import scala.Predef$;
import scala.StringContext;
import scala.Symbol;
import scala.Tuple3;
import scala.collection.Seq;
import scala.collection.immutable.Nil$;
import scala.collection.immutable.StringOps;
import scala.reflect.ScalaSignature;
import scala.runtime.BoxesRunTime;
import scala.runtime.LambdaDeserialize;
import scala.runtime.RichInt$;
import scala.runtime.SymbolLiteral;
import scala.runtime.java8.JFunction1;

@ScalaSignature(bytes="\u0006\u0001\u0005Ub\u0001B\u0001\u0003\u0001M\u0011!$T5oS\n\u000bGo\u00195J]R,'O^1m\u0013:4WM\u001d+fgRT!a\u0001\u0003\u0002\u0007M\fHN\u0003\u0002\u0006\r\u000511\u000f\u001e:fC6T!a\u0002\u0005\u0002\tAd\u0017M\u001c\u0006\u0003\u0013)\tq\u0001\u001d7b]:,'O\u0003\u0002\f\u0019\u0005)A/\u00192mK*\u0011QBD\u0001\u0006M2Lgn\u001b\u0006\u0003\u001fA\ta!\u00199bG\",'\"A\t\u0002\u0007=\u0014xm\u0001\u0001\u0014\u0005\u0001!\u0002CA\u000b\u0019\u001b\u00051\"BA\f\t\u0003\u0015)H/\u001b7t\u0013\tIbCA\u0007UC\ndW\rV3ti\n\u000b7/\u001a\u0005\u00067\u0001!\t\u0001H\u0001\u0007y%t\u0017\u000e\u001e \u0015\u0003u\u0001\"A\b\u0001\u000e\u0003\tAq\u0001\t\u0001C\u0002\u0013%\u0011%\u0001\u0003vi&dW#\u0001\u0012\u0011\u0005U\u0019\u0013B\u0001\u0013\u0017\u0005M\u0019FO]3b[R\u000b'\r\\3UKN$X\u000b^5m\u0011\u00191\u0003\u0001)A\u0005E\u0005)Q\u000f^5mA!9\u0001\u0006\u0001b\u0001\n\u0003I\u0013AB*U%&su)F\u0001+!\tY\u0003'D\u0001-\u0015\tic&A\u0004m_\u001eL7-\u00197\u000b\u0005=R\u0011!\u0002;za\u0016\u001c\u0018BA\u0019-\u0005-1\u0016M]\"iCJ$\u0016\u0010]3\t\rM\u0002\u0001\u0015!\u0003+\u0003\u001d\u0019FKU%O\u000f\u0002Bq!\u000e\u0001C\u0002\u0013\u0005a'\u0001\u0003M\u001f:;U#A\u001c\u0011\u0005-B\u0014BA\u001d-\u0005)\u0011\u0015nZ%oiRK\b/\u001a\u0005\u0007w\u0001\u0001\u000b\u0011B\u001c\u0002\u000b1{ej\u0012\u0011\t\u000fu\u0002!\u0019!C\u0001}\u0005\u0019\u0011J\u0014+\u0016\u0003}\u0002\"a\u000b!\n\u0005\u0005c#aB%oiRK\b/\u001a\u0005\u0007\u0007\u0002\u0001\u000b\u0011B 
\u0002\t%sE\u000b\t\u0005\u0006\u000b\u0002!\tAR\u0001\u0006g\u0016$X\u000f\u001d\u000b\u0002\u000fB\u0011\u0001jS\u0007\u0002\u0013*\t!*A\u0003tG\u0006d\u0017-\u0003\u0002M\u0013\n!QK\\5uQ\t!e\n\u0005\u0002P-6\t\u0001K\u0003\u0002R%\u0006\u0019\u0011\r]5\u000b\u0005M#\u0016a\u00026va&$XM\u001d\u0006\u0003+B\tQA[;oSRL!a\u0016)\u0003\u0015\t+gm\u001c:f\u000b\u0006\u001c\u0007\u000eC\u0003Z\u0001\u0011\u0005a)A\tuKN$X*\u001b8j\u0005\u0006$8\r[(oYfD#\u0001W.\u0011\u0005=c\u0016BA/Q\u0005\u0011!Vm\u001d;\t\u000b}\u0003A\u0011\u0001$\u0002=Q,7\u000f^'j]&\u0014\u0015\r^2i\u001f:d\u0017PR8s\t\u0006$\u0018m\u0015;sK\u0006l\u0007F\u00010\\\u0011\u0015\u0011\u0007\u0001\"\u0001G\u0003\u0001\"Xm\u001d;SK\u0012,h\u000eZ1oi^\u000bG/\u001a:nCJ\\G)\u001a4j]&$\u0018n\u001c8)\u0005\u0005\\\u0006\"B3\u0001\t\u00031\u0015a\u0006;fgR<\u0016N\u001c3po^KG\u000f[#be2Lh)\u001b:fQ\t!7\fC\u0003i\u0001\u0011\u0005a)A\tuKN$x+\u001b8e_^\u001c\u0015m]2bI\u0016D#aZ.\t\u000b-\u0004A\u0011\u0001$\u0002;Q,7\u000f^%oi\u0016\u0014h/\u00197K_&tw+\u001b;i\u001b&t\u0017NQ1uG\"D#A[.\t\u000b9\u0004A\u0011\u0001$\u0002AQ,7\u000f\u001e*poRLW.\u001a*poN|e/\u001a:XSRDW*\u001b8j\u0005\u0006$8\r\u001b\u0015\u0003[nCQ!\u001d\u0001\u0005\u0002\u0019\u000b!\u0006^3tiR+W\u000e]8sC2$\u0016M\u00197f\rVt7\r^5p]*{\u0017N\\,ji\"l\u0015N\\5CCR\u001c\u0007\u000e\u000b\u0002q7\")A\u000f\u0001C\u0001\r\u0006\u0001C/Z:u\u001bVdG/[(qKJ\fGo\u001c:OK\u0016$7oV1uKJl\u0017M]62Q\t\u00198\fC\u0003x\u0001\u0011\u0005a)\u0001\u0011uKN$X*\u001e7uS>\u0003XM]1u_JtU-\u001a3t/\u0006$XM]7be.\u0014\u0004F\u0001<\\\u0011\u0015Q\b\u0001\"\u0001G\u0003\u0001\"Xm\u001d;Nk2$\u0018n\u00149fe\u0006$xN\u001d(fK\u0012\u001cx+\u0019;fe6\f'o[\u001a)\u0005e\\\u0006\"B?\u0001\t\u00031\u0015\u0001\b;fgRlU\u000f\u001c;ja2,w+\u001b8e_^\fum\u001a:fO\u0006$Xm\u001d\u0015\u0003ynCa!!\u0001\u0001\t\u00031\u0015\u0001\n;fgRl\u0015N\\5CCR\u001c\u0007n\u00148ECR\f7\u000b\u001e:fC6<\u0016\u000e\u001e5S_^$\u0016.\\3)\u0005}\\\u0006BBA\u0004\u0001\u0011\u0005a)\u0001
\u0018uKN$xJ^3s/&tGm\\<NS:L')\u0019;dQ>sG)\u0019;b'R\u0014X-Y7XSRD'k\\<US6,\u0007fAA\u00037\"9\u0011Q\u0002\u0001\u0005\n\u0005=\u0011AE<ji\",\u0015M\u001d7z\r&\u0014X\rR3mCf$RaRA\t\u0003?A\u0001\"a\u0005\u0002\f\u0001\u0007\u0011QC\u0001\fi\u0006\u0014G.Z\"p]\u001aLw\r\u0005\u0003\u0002\u0018\u0005mQBAA\r\u0015\t\t&\"\u0003\u0003\u0002\u001e\u0005e!a\u0003+bE2,7i\u001c8gS\u001eD\u0001\"!\t\u0002\f\u0001\u0007\u00111E\u0001\tS:$XM\u001d<bYB!\u0011QEA\u0019\u001b\t\t9C\u0003\u0003\u0002*\u0005-\u0012\u0001\u0002;j[\u0016TA!!\f\u00020\u000511m\\7n_:T!!\u0015\u0007\n\t\u0005M\u0012q\u0005\u0002\u0005)&lW\r")
public class MiniBatchIntervalInferTest
extends TableTestBase {
    /** Stream test utility obtained from {@code streamTestUtil} with its default argument; every test method in this class goes through it. */
    private final StreamTableTestUtil util = this.streamTestUtil(this.streamTestUtil$default$1());
    /** Logical string type, taken from {@code VarCharType.STRING_TYPE}. */
    private final VarCharType STRING = VarCharType.STRING_TYPE;
    /** A {@code BigIntType} built with its no-argument constructor. */
    private final BigIntType LONG = new BigIntType();
    /** An {@code IntType} built with its no-argument constructor. */
    private final IntType INT = new IntType();

    /**
     * Accessor for the {@code util} field.
     *
     * @return the stream test utility created when this instance was constructed
     */
    private StreamTableTestUtil util() {
        return util;
    }

    /**
     * Accessor for the {@code STRING} field.
     *
     * @return the {@code VarCharType} stored in this instance
     */
    public VarCharType STRING() {
        return STRING;
    }

    /**
     * Accessor for the {@code LONG} field.
     *
     * @return the {@code BigIntType} stored in this instance
     */
    public BigIntType LONG() {
        return LONG;
    }

    /**
     * Accessor for the {@code INT} field.
     *
     * @return the {@code IntType} stored in this instance
     */
    public IntType INT() {
        return INT;
    }

    /**
     * Per-test fixture (JUnit {@code @BeforeEach}).
     *
     * <p>Registers two data streams, {@code MyDataStream1} and {@code MyDataStream2}, each with
     * fields {@code a}, {@code b}, {@code c} plus a {@code proctime} and a {@code rowtime}
     * attribute; creates {@code MyTable1} and the two {@code LIKE}-derived tables
     * {@code wmTable1} / {@code wmTable2}, which add a watermark on {@code rowtime}; and finally
     * sets {@code TABLE_EXEC_MINIBATCH_ENABLED} to {@code true}.
     *
     * <p>NOTE(review): this file is decompiler output. The anonymous {@code CaseClassTypeInfo}
     * subclasses below contain constructs that are not valid Java source.
     */
    @BeforeEach
    public void setup() {
        // First stream: element type is Tuple3<Object, String, Object>; createInstance below unboxes (int, String, long).
        // NOTE(review): the `null` constructor argument looks like a decompiler placeholder — confirm against the Scala source.
        this.util().addDataStream("MyDataStream1", (Seq<Expression>)Predef$.MODULE$.wrapRefArray((Object[])new Expression[]{package$.MODULE$.symbol2FieldExpression((Symbol)SymbolLiteral.bootstrap("apply", "a")), package$.MODULE$.symbol2FieldExpression((Symbol)SymbolLiteral.bootstrap("apply", "b")), package$.MODULE$.symbol2FieldExpression((Symbol)SymbolLiteral.bootstrap("apply", "c")), (Expression)package$.MODULE$.UnresolvedFieldExpression((Symbol)SymbolLiteral.bootstrap("apply", "proctime")).proctime(), (Expression)package$.MODULE$.UnresolvedFieldExpression((Symbol)SymbolLiteral.bootstrap("apply", "rowtime")).rowtime()}), new CaseClassTypeInfo<Tuple3<Object, String, Object>>(null){

            // Synthetic accessor exposing the `types` member of the given instance.
            public /* synthetic */ TypeInformation[] protected$types($anon$5 x$1) {
                return x$1.types;
            }

            // Creates one serializer per field (getArity() of them) and returns a ScalaCaseClassSerializer over them.
            public TypeSerializer<Tuple3<Object, String, Object>> createSerializer(SerializerConfig serializerConfig) {
                TypeSerializer[] fieldSerializers = new TypeSerializer[this.getArity()];
                RichInt$.MODULE$.until$extension0(Predef$.MODULE$.intWrapper(0), this.getArity()).foreach$mVc$sp((Function1)(JFunction1.mcVI.sp & Serializable & scala.Serializable)i -> {
                    // NOTE(review): `fieldSerializers$1` is not declared in this view; presumably the decompiler's name for the captured `fieldSerializers` array — confirm.
                    fieldSerializers$1[i] = this.protected$types(this)[i].createSerializer(serializerConfig);
                });
                // NOTE(review): `unused` is never read in this method; presumably emitted on purpose by the code that generated this class — confirm before removing.
                ScalaCaseClassSerializer<Tuple3<Object, String, Object>> unused = new ScalaCaseClassSerializer<Tuple3<Object, String, Object>>(this, fieldSerializers){

                    public Tuple3<Object, String, Object> createInstance(Object[] fields) {
                        return new Tuple3((Object)BoxesRunTime.boxToInteger((int)BoxesRunTime.unboxToInt((Object)fields[0])), (Object)((String)fields[1]), (Object)BoxesRunTime.boxToLong((long)BoxesRunTime.unboxToLong((Object)fields[2])));
                    }
                };
                return new ScalaCaseClassSerializer(this.getTypeClass(), fieldSerializers);
            }

            // Overload taking an ExecutionConfig: delegates to the SerializerConfig variant above.
            public TypeSerializer<Tuple3<Object, String, Object>> createSerializer(ExecutionConfig executionConfig) {
                return this.createSerializer(executionConfig.getSerializerConfig());
            }

            // NOTE(review): the MethodHandle[] initializer below is raw decompiler output, not compilable Java.
            private static /* synthetic */ Object $deserializeLambda$(SerializedLambda serializedLambda) {
                return LambdaDeserialize.bootstrap("lambdaDeserialize", new MethodHandle[]{$anonfun$createSerializer$1(org.apache.flink.table.planner.plan.stream.sql.MiniBatchIntervalInferTest$$anon$5 org.apache.flink.api.common.serialization.SerializerConfig org.apache.flink.api.common.typeutils.TypeSerializer[] int )}, serializedLambda);
            }
        });
        // Second stream: same field list and element type as MyDataStream1, with its own anonymous type info ($anon$6).
        this.util().addDataStream("MyDataStream2", (Seq<Expression>)Predef$.MODULE$.wrapRefArray((Object[])new Expression[]{package$.MODULE$.symbol2FieldExpression((Symbol)SymbolLiteral.bootstrap("apply", "a")), package$.MODULE$.symbol2FieldExpression((Symbol)SymbolLiteral.bootstrap("apply", "b")), package$.MODULE$.symbol2FieldExpression((Symbol)SymbolLiteral.bootstrap("apply", "c")), (Expression)package$.MODULE$.UnresolvedFieldExpression((Symbol)SymbolLiteral.bootstrap("apply", "proctime")).proctime(), (Expression)package$.MODULE$.UnresolvedFieldExpression((Symbol)SymbolLiteral.bootstrap("apply", "rowtime")).rowtime()}), new CaseClassTypeInfo<Tuple3<Object, String, Object>>(null){

            // Synthetic accessor exposing the `types` member of the given instance.
            public /* synthetic */ TypeInformation[] protected$types($anon$6 x$1) {
                return x$1.types;
            }

            // Creates one serializer per field (getArity() of them) and returns a ScalaCaseClassSerializer over them.
            public TypeSerializer<Tuple3<Object, String, Object>> createSerializer(SerializerConfig serializerConfig) {
                TypeSerializer[] fieldSerializers = new TypeSerializer[this.getArity()];
                RichInt$.MODULE$.until$extension0(Predef$.MODULE$.intWrapper(0), this.getArity()).foreach$mVc$sp((Function1)(JFunction1.mcVI.sp & Serializable & scala.Serializable)i -> {
                    // NOTE(review): `fieldSerializers$2` is not declared in this view; presumably the captured `fieldSerializers` array — confirm.
                    fieldSerializers$2[i] = this.protected$types(this)[i].createSerializer(serializerConfig);
                });
                // NOTE(review): `unused` is never read in this method; confirm intent before removing.
                ScalaCaseClassSerializer<Tuple3<Object, String, Object>> unused = new ScalaCaseClassSerializer<Tuple3<Object, String, Object>>(this, fieldSerializers){

                    public Tuple3<Object, String, Object> createInstance(Object[] fields) {
                        return new Tuple3((Object)BoxesRunTime.boxToInteger((int)BoxesRunTime.unboxToInt((Object)fields[0])), (Object)((String)fields[1]), (Object)BoxesRunTime.boxToLong((long)BoxesRunTime.unboxToLong((Object)fields[2])));
                    }
                };
                return new ScalaCaseClassSerializer(this.getTypeClass(), fieldSerializers);
            }

            // Overload taking an ExecutionConfig: delegates to the SerializerConfig variant above.
            public TypeSerializer<Tuple3<Object, String, Object>> createSerializer(ExecutionConfig executionConfig) {
                return this.createSerializer(executionConfig.getSerializerConfig());
            }

            // NOTE(review): the MethodHandle[] initializer below is raw decompiler output, not compilable Java.
            private static /* synthetic */ Object $deserializeLambda$(SerializedLambda serializedLambda) {
                return LambdaDeserialize.bootstrap("lambdaDeserialize", new MethodHandle[]{$anonfun$createSerializer$2(org.apache.flink.table.planner.plan.stream.sql.MiniBatchIntervalInferTest$$anon$6 org.apache.flink.api.common.serialization.SerializerConfig org.apache.flink.api.common.typeutils.TypeSerializer[] int )}, serializedLambda);
            }
        });
        // Base table using the 'values' connector; it declares no watermark.
        this.util().addTable(new StringOps(Predef$.MODULE$.augmentString("\n                     |CREATE TABLE MyTable1 (\n                     |  `a` INT,\n                     |  `b` STRING,\n                     |  `c` BIGINT,\n                     |  proctime AS PROCTIME(),\n                     |  rowtime TIMESTAMP(3)\n                     |) WITH (\n                     |  'connector' = 'values'\n                     |)\n                     |")).stripMargin());
        // wmTable1 and wmTable2 copy MyTable1 (INCLUDING ALL) and add WATERMARK FOR rowtime AS rowtime.
        this.util().addTable(new StringOps(Predef$.MODULE$.augmentString("\n                     |CREATE TABLE wmTable1 (\n                     |  WATERMARK FOR rowtime AS rowtime\n                     |) LIKE MyTable1 (INCLUDING ALL)\n                     |")).stripMargin());
        this.util().addTable(new StringOps(Predef$.MODULE$.augmentString("\n                     |CREATE TABLE wmTable2 (\n                     |  WATERMARK FOR rowtime AS rowtime\n                     |) LIKE MyTable1 (INCLUDING ALL)\n                     |")).stripMargin());
        // Mini-batch is switched on for every test; each test sets its own allow-latency.
        this.util().tableEnv().getConfig().set(ExecutionConfigOptions.TABLE_EXEC_MINIBATCH_ENABLED, (Object)BoxesRunTime.boxToBoolean((boolean)true));
    }

    /**
     * Sets the mini-batch allow-latency to 1 second and passes a grouped aggregation over
     * {@code MyTable1} to {@code verifyExecPlan}.
     */
    @Test
    public void testMiniBatchOnly() {
        final String query = "SELECT b, COUNT(DISTINCT a), MAX(b), SUM(c) FROM MyTable1 GROUP BY b";
        this.util().tableEnv().getConfig().set(ExecutionConfigOptions.TABLE_EXEC_MINIBATCH_ALLOW_LATENCY, Duration.ofSeconds(1L));
        this.util().verifyExecPlan(query);
    }

    /**
     * Sets the mini-batch allow-latency to 1 second and passes a grouped aggregation over the
     * registered stream {@code MyDataStream1} to {@code verifyExecPlan}.
     */
    @Test
    public void testMiniBatchOnlyForDataStream() {
        final String query = "SELECT b, COUNT(DISTINCT a), MAX(b), SUM(c) FROM MyDataStream1 GROUP BY b";
        this.util().tableEnv().getConfig().set(ExecutionConfigOptions.TABLE_EXEC_MINIBATCH_ALLOW_LATENCY, Duration.ofSeconds(1L));
        this.util().verifyExecPlan(query);
    }

    /**
     * Sets the mini-batch allow-latency to 1 second and passes a grouped aggregation over
     * {@code wmTable1} (the table that declares a watermark) to {@code verifyExecPlan}.
     */
    @Test
    public void testRedundantWatermarkDefinition() {
        final String query = "SELECT b, COUNT(DISTINCT a), MAX(b), SUM(c) FROM wmTable1 GROUP BY b";
        this.util().tableEnv().getConfig().set(ExecutionConfigOptions.TABLE_EXEC_MINIBATCH_ALLOW_LATENCY, Duration.ofSeconds(1L));
        this.util().verifyExecPlan(query);
    }

    /**
     * Sets the allow-latency to 1 second, enables early fire with a 500 ms delay through
     * {@code withEarlyFireDelay}, and passes a HOP-window aggregation wrapped in an outer
     * {@code GROUP BY} to {@code verifyExecPlan}.
     */
    @Test
    public void testWindowWithEarlyFire() {
        // The margin-formatted text is pure data, so it can be prepared before touching the config.
        String marginedSql = "\n        | SELECT b, SUM(cnt)\n        | FROM (\n        |   SELECT b,\n        |     COUNT(a) as cnt,\n        |     HOP_START(rowtime, INTERVAL '5' SECOND, INTERVAL '6' SECOND) as w_start,\n        |     HOP_END(rowtime, INTERVAL '5' SECOND, INTERVAL '6' SECOND) as w_end\n        |   FROM wmTable1\n        |   GROUP BY b, HOP(rowtime, INTERVAL '5' SECOND, INTERVAL '6' SECOND)\n        | )\n        | GROUP BY b\n      ";
        String query = new StringOps(Predef$.MODULE$.augmentString(marginedSql)).stripMargin();

        TableConfig config = this.util().tableEnv().getConfig();
        config.set(ExecutionConfigOptions.TABLE_EXEC_MINIBATCH_ALLOW_LATENCY, Duration.ofSeconds(1L));
        this.withEarlyFireDelay(config, Time.milliseconds(500L));

        this.util().verifyExecPlan(query);
    }

    /**
     * Sets the allow-latency to 3 seconds and passes a query to {@code verifyExecPlan} in which a
     * 10-second TUMBLE aggregation feeds a second, 5-second TUMBLE aggregation.
     */
    @Test
    public void testWindowCascade() {
        String marginedSql = "\n        | SELECT b,\n        |   SUM(cnt)\n        | FROM (\n        |   SELECT b,\n        |     COUNT(a) as cnt,\n        |     TUMBLE_ROWTIME(rowtime, INTERVAL '10' SECOND) as rt\n        |   FROM wmTable1\n        |   GROUP BY b, TUMBLE(rowtime, INTERVAL '10' SECOND)\n        | )\n        | GROUP BY b, TUMBLE(rt, INTERVAL '5' SECOND)\n      ";
        String query = new StringOps(Predef$.MODULE$.augmentString(marginedSql)).stripMargin();
        this.util().tableEnv().getConfig().set(ExecutionConfigOptions.TABLE_EXEC_MINIBATCH_ALLOW_LATENCY, Duration.ofSeconds(3L));
        this.util().verifyExecPlan(query);
    }

    /**
     * Sets the allow-latency to 1 second and passes a query to {@code verifyExecPlan} that
     * aggregates the result of a join between {@code wmTable1} and {@code wmTable2} bounded by a
     * rowtime {@code BETWEEN} condition.
     */
    @Test
    public void testIntervalJoinWithMiniBatch() {
        String marginedSql = "\n        | SELECT b, COUNT(a)\n        | FROM (\n        |   SELECT t1.a as a, t1.b as b\n        |   FROM\n        |     wmTable1 as t1 JOIN wmTable2 as t2\n        |   ON\n        |     t1.a = t2.a AND t1.rowtime BETWEEN t2.rowtime - INTERVAL '5' SECOND AND\n        |     t2.rowtime + INTERVAL '10' SECOND\n        | )\n        | GROUP BY b\n      ";
        String query = new StringOps(Predef$.MODULE$.augmentString(marginedSql)).stripMargin();
        this.util().tableEnv().getConfig().set(ExecutionConfigOptions.TABLE_EXEC_MINIBATCH_ALLOW_LATENCY, Duration.ofSeconds(1L));
        this.util().verifyExecPlan(query);
    }

    /**
     * Sets the allow-latency to 1 second and passes a query to {@code verifyExecPlan} that groups
     * by the result of a rowtime-ordered {@code ROWS BETWEEN 5 preceding AND CURRENT ROW} OVER
     * aggregation on {@code wmTable1}.
     */
    @Test
    public void testRowtimeRowsOverWithMiniBatch() {
        String marginedSql = "\n        | SELECT cnt, COUNT(c)\n        | FROM (\n        |   SELECT c, COUNT(a)\n        |   OVER (PARTITION BY c ORDER BY rowtime ROWS BETWEEN 5 preceding AND CURRENT ROW) as cnt\n        |   FROM wmTable1\n        | )\n        | GROUP BY cnt\n      ";
        String query = new StringOps(Predef$.MODULE$.augmentString(marginedSql)).stripMargin();
        this.util().tableEnv().getConfig().set(ExecutionConfigOptions.TABLE_EXEC_MINIBATCH_ALLOW_LATENCY, Duration.ofSeconds(1L));
        this.util().verifyExecPlan(query);
    }

    /**
     * Registers {@code Orders} and {@code RatesHistory} (built from the two data streams via
     * {@code addTableWithWatermark} on {@code rowtime} with offset 0), sets the allow-latency to
     * 1 second, registers the temporal table function {@code Rates} (time attribute
     * {@code rowtime}, key {@code b}), and passes a LATERAL TABLE join query to
     * {@code verifyExecPlan}.
     */
    @Test
    public void testTemporalTableFunctionJoinWithMiniBatch() {
        // Pure string preparation; independent of the registrations below.
        String marginedSql = "\n        | SELECT r_a, COUNT(o_a)\n        | FROM (\n        |   SELECT o.a as o_a, r.a as r_a\n        |   FROM Orders As o,\n        |   LATERAL TABLE (Rates(o.rowtime)) as r\n        |   WHERE o.b = r.b\n        | )\n        | GROUP BY r_a\n      ";
        String query = new StringOps(Predef$.MODULE$.augmentString(marginedSql)).stripMargin();

        this.util().addTableWithWatermark("Orders", this.util().tableEnv().from("MyDataStream1"), "rowtime", 0L);
        this.util().addTableWithWatermark("RatesHistory", this.util().tableEnv().from("MyDataStream2"), "rowtime", 0L);
        this.util().tableEnv().getConfig().set(ExecutionConfigOptions.TABLE_EXEC_MINIBATCH_ALLOW_LATENCY, Duration.ofSeconds(1L));
        this.util().addTemporarySystemFunction(
            "Rates",
            (UserDefinedFunction)this.util().tableEnv().from("RatesHistory").createTemporalTableFunction(
                package$.MODULE$.FieldExpression(new StringContext((Seq)Predef$.MODULE$.wrapRefArray((Object[])new String[]{"rowtime"}))).$((Seq)Nil$.MODULE$),
                package$.MODULE$.FieldExpression(new StringContext((Seq)Predef$.MODULE$.wrapRefArray((Object[])new String[]{"b"}))).$((Seq)Nil$.MODULE$)));

        this.util().verifyExecPlan(query);
    }

    /**
     * Sets the allow-latency to 1 second and passes a query to {@code verifyExecPlan} in which a
     * rowtime-bounded join of {@code wmTable1} and {@code wmTable2} feeds a 5-second TUMBLE
     * aggregation.
     */
    @Test
    public void testMultiOperatorNeedsWatermark1() {
        String marginedSql = "\n        | SELECT\n        |   b, COUNT(a),\n        |   TUMBLE_START(rt, INTERVAL '5' SECOND),\n        |   TUMBLE_END(rt, INTERVAL '5' SECOND)\n        | FROM (\n        |   SELECT t1.a as a, t1.b as b, t1.rowtime as rt\n        |   FROM\n        |     wmTable1 as t1 JOIN wmTable2 as t2\n        |   ON\n        |     t1.a = t2.a AND t1.rowtime BETWEEN t2.rowtime - INTERVAL '5' SECOND AND\n        |     t2.rowtime + INTERVAL '10' SECOND\n        | )\n        | GROUP BY b,TUMBLE(rt, INTERVAL '5' SECOND)\n      ";
        String query = new StringOps(Predef$.MODULE$.augmentString(marginedSql)).stripMargin();
        this.util().tableEnv().getConfig().set(ExecutionConfigOptions.TABLE_EXEC_MINIBATCH_ALLOW_LATENCY, Duration.ofSeconds(1L));
        this.util().verifyExecPlan(query);
    }

    /**
     * Sets the allow-latency to 6 seconds and passes a query to {@code verifyExecPlan} in which a
     * TUMBLE aggregation on {@code wmTable1} and a HOP aggregation on {@code wmTable2} are joined
     * on a rowtime range, and the join result feeds a rowtime-ordered OVER aggregation.
     */
    @Test
    public void testMultiOperatorNeedsWatermark2() {
        String marginedSql = "\n        | SELECT b, COUNT(a)\n        | OVER (PARTITION BY b ORDER BY rt ROWS BETWEEN 5 preceding AND CURRENT ROW)\n        | FROM (\n        |  SELECT t1.a as a, t1.b as b, t1.rt as rt\n        |  FROM\n        |  (\n        |    SELECT b,\n        |     COUNT(a) as a,\n        |     TUMBLE_ROWTIME(rowtime, INTERVAL '5' SECOND) as rt\n        |    FROM wmTable1\n        |    GROUP BY b, TUMBLE(rowtime, INTERVAL '5' SECOND)\n        |  ) as t1\n        |  JOIN\n        |  (\n        |    SELECT b,\n        |     COUNT(a) as a,\n        |     HOP_ROWTIME(rowtime, INTERVAL '5' SECOND, INTERVAL '6' SECOND) as rt\n        |    FROM wmTable2\n        |    GROUP BY b, HOP(rowtime, INTERVAL '5' SECOND, INTERVAL '6' SECOND)\n        |  ) as t2\n        |  ON\n        |    t1.a = t2.a AND t1.rt BETWEEN t2.rt - INTERVAL '5' SECOND AND\n        |    t2.rt + INTERVAL '10' SECOND\n        | )\n      ";
        String query = new StringOps(Predef$.MODULE$.augmentString(marginedSql)).stripMargin();
        this.util().tableEnv().getConfig().set(ExecutionConfigOptions.TABLE_EXEC_MINIBATCH_ALLOW_LATENCY, Duration.ofSeconds(6L));
        this.util().verifyExecPlan(query);
    }

    /**
     * Sets the allow-latency to 6 seconds and passes a query to {@code verifyExecPlan} that joins
     * a plain grouped aggregation over {@code MyTable1} with a grouped aggregation over a HOP
     * window result from {@code wmTable1}.
     */
    @Test
    public void testMultiOperatorNeedsWatermark3() {
        String marginedSql = "\n        |  SELECT t1.a, t1.b\n        |  FROM (\n        |    SELECT a, COUNT(b) as b FROM MyTable1 GROUP BY a\n        |  ) as t1\n        |  JOIN (\n        |    SELECT b, COUNT(a) as a\n        |    FROM (\n        |      SELECT b, COUNT(a) as a,\n        |         HOP_ROWTIME(rowtime, INTERVAL '5' SECOND, INTERVAL '6' SECOND) as rt\n        |      FROM wmTable1\n        |      GROUP BY b, HOP(rowtime, INTERVAL '5' SECOND, INTERVAL '6' SECOND)\n        |    )\n        |    GROUP BY b\n        |  ) as t2\n        |  ON t1.a = t2.a\n      ";
        String query = new StringOps(Predef$.MODULE$.augmentString(marginedSql)).stripMargin();
        this.util().tableEnv().getConfig().set(ExecutionConfigOptions.TABLE_EXEC_MINIBATCH_ALLOW_LATENCY, Duration.ofSeconds(6L));
        this.util().verifyExecPlan(query);
    }

    /**
     * Builds a statement set with three inserts that share upstream views and passes it to
     * {@code verifyExplain}.
     *
     * <p>Steps, in order: create tables {@code T1} and {@code T2}; set the allow-latency to 500 ms
     * and the mini-batch size to 300; define {@code TempTable1} (rowtime-bounded join of T1 and
     * T2) and {@code TempTable2} (6-second TUMBLE aggregation over TempTable1); then register
     * three sinks fed by a HOP aggregation over TempTable2, a 9-second TUMBLE aggregation over
     * TempTable1, and a plain {@code GROUP BY} over TempTable2. The third sink is a retract sink
     * although it is registered under the name {@code appendSink3}.
     */
    @Test
    public void testMultipleWindowAggregates() {
        StatementSet statements = this.util().tableEnv().createStatementSet();

        // Source tables.
        String t1Ddl = new StringOps(Predef$.MODULE$.augmentString("\n                     |CREATE TABLE T1 (\n                     | id1 INT,\n                     | rowtime TIMESTAMP(3),\n                     | `text` STRING,\n                     | WATERMARK FOR rowtime AS rowtime\n                     |) WITH (\n                     | 'connector' = 'values'\n                     |)\n                     |")).stripMargin();
        this.util().addTable(t1Ddl);
        String t2Ddl = new StringOps(Predef$.MODULE$.augmentString("\n                     |CREATE TABLE T2 (\n                     | id2 INT,\n                     | rowtime TIMESTAMP(3),\n                     | cnt INT,\n                     | name STRING,\n                     | goods STRING,\n                     | WATERMARK FOR rowtime AS rowtime\n                     |) WITH (\n                     | 'connector' = 'values'\n                     |)\n                     |")).stripMargin();
        this.util().addTable(t2Ddl);

        // Mini-batch settings specific to this test.
        this.util().tableEnv().getConfig().set(ExecutionConfigOptions.TABLE_EXEC_MINIBATCH_ALLOW_LATENCY, Duration.ofMillis(500L));
        this.util().tableEnv().getConfig().set(ExecutionConfigOptions.TABLE_EXEC_MINIBATCH_SIZE, BoxesRunTime.boxToLong(300L));

        // TempTable1: rowtime-bounded join of T1 and T2.
        Table joined = this.util().tableEnv().sqlQuery(new StringOps(Predef$.MODULE$.augmentString("\n                                          |SELECT id1, T1.rowtime AS ts, text\n                                          |  FROM T1, T2\n                                          |WHERE id1 = id2\n                                          |      AND T1.rowtime > T2.rowtime - INTERVAL '5' MINUTE\n                                          |      AND T1.rowtime < T2.rowtime + INTERVAL '3' MINUTE\n      ")).stripMargin());
        this.util().tableEnv().createTemporaryView("TempTable1", joined);

        // TempTable2: 6-second tumbling aggregation over TempTable1.
        Table tumbled = this.util().tableEnv().sqlQuery(new StringOps(Predef$.MODULE$.augmentString("\n                                          |SELECT id1,\n                                          |    LISTAGG(text, '#') as text,\n                                          |    TUMBLE_ROWTIME(ts, INTERVAL '6' SECOND) as ts\n                                          |FROM TempTable1\n                                          |GROUP BY TUMBLE(ts, INTERVAL '6' SECOND), id1\n      ")).stripMargin());
        this.util().tableEnv().createTemporaryView("TempTable2", tumbled);

        // Insert 1: HOP aggregation over TempTable2 into an append sink.
        Table hopped = this.util().tableEnv().sqlQuery(new StringOps(Predef$.MODULE$.augmentString("\n                               |SELECT id1,\n                               |    LISTAGG(text, '*')\n                               |FROM TempTable2\n                               |GROUP BY HOP(ts, INTERVAL '12' SECOND, INTERVAL '4' SECOND), id1\n      ")).stripMargin());
        AppendStreamTableSink<Row> firstSink = this.util().createAppendTableSink(new String[]{"a", "b"}, new LogicalType[]{this.INT(), this.STRING()});
        ((TableEnvironmentInternal)this.util().tableEnv()).registerTableSinkInternal("appendSink1", firstSink);
        statements.addInsert("appendSink1", hopped);

        // Insert 2: 9-second tumbling aggregation over TempTable1 into an append sink.
        Table tumbledAgain = this.util().tableEnv().sqlQuery(new StringOps(Predef$.MODULE$.augmentString("\n                                          |SELECT id1,\n                                          |    LISTAGG(text, '-')\n                                          |FROM TempTable1\n                                          |GROUP BY TUMBLE(ts, INTERVAL '9' SECOND), id1\n      ")).stripMargin());
        AppendStreamTableSink<Row> secondSink = this.util().createAppendTableSink(new String[]{"a", "b"}, new LogicalType[]{this.INT(), this.STRING()});
        ((TableEnvironmentInternal)this.util().tableEnv()).registerTableSinkInternal("appendSink2", secondSink);
        statements.addInsert("appendSink2", tumbledAgain);

        // Insert 3: plain grouped count over TempTable2 into a retract sink.
        Table counted = this.util().tableEnv().sqlQuery(new StringOps(Predef$.MODULE$.augmentString("\n                                          |SELECT id1,\n                                          |    COUNT(text)\n                                          |FROM TempTable2\n                                          |GROUP BY id1\n      ")).stripMargin());
        RetractStreamTableSink<Row> retractSink = this.util().createRetractTableSink(new String[]{"a", "b"}, new LogicalType[]{this.INT(), this.LONG()});
        ((TableEnvironmentInternal)this.util().tableEnv()).registerTableSinkInternal("appendSink3", retractSink);
        statements.addInsert("appendSink3", counted);

        this.util().verifyExplain(statements);
    }

    /**
     * Registers data stream {@code T1} with fields {@code long}, {@code int}, {@code str} and a
     * {@code rowtime} attribute, sets the mini-batch allow-latency to 1 second, and passes a
     * 10-second TUMBLE aggregation over {@code T1} to {@code verifyExecPlan}.
     *
     * <p>NOTE(review): the anonymous {@code CaseClassTypeInfo} below is decompiler output and
     * contains constructs that are not valid Java source.
     */
    @Test
    public void testMiniBatchOnDataStreamWithRowTime() {
        // Element type is Tuple3<Object, Object, String>; createInstance below unboxes (long, int, String).
        this.util().addDataStream("T1", (Seq<Expression>)Predef$.MODULE$.wrapRefArray((Object[])new Expression[]{package$.MODULE$.symbol2FieldExpression((Symbol)SymbolLiteral.bootstrap("apply", "long")), package$.MODULE$.symbol2FieldExpression((Symbol)SymbolLiteral.bootstrap("apply", "int")), package$.MODULE$.symbol2FieldExpression((Symbol)SymbolLiteral.bootstrap("apply", "str")), (Expression)package$.MODULE$.UnresolvedFieldExpression((Symbol)SymbolLiteral.bootstrap("apply", "rowtime")).rowtime()}), new CaseClassTypeInfo<Tuple3<Object, Object, String>>(null){

            // Synthetic accessor exposing the `types` member of the given instance.
            public /* synthetic */ TypeInformation[] protected$types($anon$7 x$1) {
                return x$1.types;
            }

            // Creates one serializer per field (getArity() of them) and returns a ScalaCaseClassSerializer over them.
            public TypeSerializer<Tuple3<Object, Object, String>> createSerializer(SerializerConfig serializerConfig) {
                TypeSerializer[] fieldSerializers = new TypeSerializer[this.getArity()];
                RichInt$.MODULE$.until$extension0(Predef$.MODULE$.intWrapper(0), this.getArity()).foreach$mVc$sp((Function1)(JFunction1.mcVI.sp & Serializable & scala.Serializable)i -> {
                    // NOTE(review): `fieldSerializers$3` is not declared in this view; presumably the captured `fieldSerializers` array — confirm.
                    fieldSerializers$3[i] = this.protected$types(this)[i].createSerializer(serializerConfig);
                });
                // NOTE(review): `unused` is never read in this method; confirm intent before removing.
                ScalaCaseClassSerializer<Tuple3<Object, Object, String>> unused = new ScalaCaseClassSerializer<Tuple3<Object, Object, String>>(this, fieldSerializers){

                    public Tuple3<Object, Object, String> createInstance(Object[] fields) {
                        return new Tuple3((Object)BoxesRunTime.boxToLong((long)BoxesRunTime.unboxToLong((Object)fields[0])), (Object)BoxesRunTime.boxToInteger((int)BoxesRunTime.unboxToInt((Object)fields[1])), (Object)((String)fields[2]));
                    }
                };
                return new ScalaCaseClassSerializer(this.getTypeClass(), fieldSerializers);
            }

            // Overload taking an ExecutionConfig: delegates to the SerializerConfig variant above.
            public TypeSerializer<Tuple3<Object, Object, String>> createSerializer(ExecutionConfig executionConfig) {
                return this.createSerializer(executionConfig.getSerializerConfig());
            }

            // NOTE(review): the MethodHandle[] initializer below is raw decompiler output, not compilable Java.
            private static /* synthetic */ Object $deserializeLambda$(SerializedLambda serializedLambda) {
                return LambdaDeserialize.bootstrap("lambdaDeserialize", new MethodHandle[]{$anonfun$createSerializer$3(org.apache.flink.table.planner.plan.stream.sql.MiniBatchIntervalInferTest$$anon$7 org.apache.flink.api.common.serialization.SerializerConfig org.apache.flink.api.common.typeutils.TypeSerializer[] int )}, serializedLambda);
            }
        });
        this.util().tableEnv().getConfig().set(ExecutionConfigOptions.TABLE_EXEC_MINIBATCH_ALLOW_LATENCY, (Object)Duration.ofSeconds(1L));
        // stripMargin removes the leading whitespace and '|' margin from each line of the literal.
        String sql = new StringOps(Predef$.MODULE$.augmentString("\n        |SELECT long,\n        |  COUNT(str) as cnt,\n        |  TUMBLE_END(rowtime, INTERVAL '10' SECOND) as rt\n        |FROM T1\n        |GROUP BY long, TUMBLE(rowtime, INTERVAL '10' SECOND)\n      ")).stripMargin();
        this.util().verifyExecPlan(sql);
    }

    /**
     * Registers data stream {@code T1} with fields {@code long}, {@code int}, {@code str} and a
     * {@code rowtime} attribute, sets the mini-batch allow-latency to 3 seconds, and passes a
     * query to {@code verifyExecPlan} that groups by the result of a rowtime-ordered
     * {@code ROWS BETWEEN 5 preceding AND CURRENT ROW} OVER aggregation.
     *
     * <p>NOTE(review): the anonymous {@code CaseClassTypeInfo} below is decompiler output and
     * contains constructs that are not valid Java source.
     */
    @Test
    public void testOverWindowMiniBatchOnDataStreamWithRowTime() {
        // Element type is Tuple3<Object, Object, String>; createInstance below unboxes (long, int, String).
        this.util().addDataStream("T1", (Seq<Expression>)Predef$.MODULE$.wrapRefArray((Object[])new Expression[]{package$.MODULE$.symbol2FieldExpression((Symbol)SymbolLiteral.bootstrap("apply", "long")), package$.MODULE$.symbol2FieldExpression((Symbol)SymbolLiteral.bootstrap("apply", "int")), package$.MODULE$.symbol2FieldExpression((Symbol)SymbolLiteral.bootstrap("apply", "str")), (Expression)package$.MODULE$.UnresolvedFieldExpression((Symbol)SymbolLiteral.bootstrap("apply", "rowtime")).rowtime()}), new CaseClassTypeInfo<Tuple3<Object, Object, String>>(null){

            // Synthetic accessor exposing the `types` member of the given instance.
            public /* synthetic */ TypeInformation[] protected$types($anon$8 x$1) {
                return x$1.types;
            }

            // Creates one serializer per field (getArity() of them) and returns a ScalaCaseClassSerializer over them.
            public TypeSerializer<Tuple3<Object, Object, String>> createSerializer(SerializerConfig serializerConfig) {
                TypeSerializer[] fieldSerializers = new TypeSerializer[this.getArity()];
                RichInt$.MODULE$.until$extension0(Predef$.MODULE$.intWrapper(0), this.getArity()).foreach$mVc$sp((Function1)(JFunction1.mcVI.sp & Serializable & scala.Serializable)i -> {
                    // NOTE(review): `fieldSerializers$4` is not declared in this view; presumably the captured `fieldSerializers` array — confirm.
                    fieldSerializers$4[i] = this.protected$types(this)[i].createSerializer(serializerConfig);
                });
                // NOTE(review): `unused` is never read in this method; confirm intent before removing.
                ScalaCaseClassSerializer<Tuple3<Object, Object, String>> unused = new ScalaCaseClassSerializer<Tuple3<Object, Object, String>>(this, fieldSerializers){

                    public Tuple3<Object, Object, String> createInstance(Object[] fields) {
                        return new Tuple3((Object)BoxesRunTime.boxToLong((long)BoxesRunTime.unboxToLong((Object)fields[0])), (Object)BoxesRunTime.boxToInteger((int)BoxesRunTime.unboxToInt((Object)fields[1])), (Object)((String)fields[2]));
                    }
                };
                return new ScalaCaseClassSerializer(this.getTypeClass(), fieldSerializers);
            }

            // Overload taking an ExecutionConfig: delegates to the SerializerConfig variant above.
            public TypeSerializer<Tuple3<Object, Object, String>> createSerializer(ExecutionConfig executionConfig) {
                return this.createSerializer(executionConfig.getSerializerConfig());
            }

            // NOTE(review): the MethodHandle[] initializer below is raw decompiler output, not compilable Java.
            private static /* synthetic */ Object $deserializeLambda$(SerializedLambda serializedLambda) {
                return LambdaDeserialize.bootstrap("lambdaDeserialize", new MethodHandle[]{$anonfun$createSerializer$4(org.apache.flink.table.planner.plan.stream.sql.MiniBatchIntervalInferTest$$anon$8 org.apache.flink.api.common.serialization.SerializerConfig org.apache.flink.api.common.typeutils.TypeSerializer[] int )}, serializedLambda);
            }
        });
        this.util().tableEnv().getConfig().set(ExecutionConfigOptions.TABLE_EXEC_MINIBATCH_ALLOW_LATENCY, (Object)Duration.ofSeconds(3L));
        // stripMargin removes the leading whitespace and '|' margin from each line of the literal.
        String sql = new StringOps(Predef$.MODULE$.augmentString("\n        | SELECT cnt, COUNT(`int`)\n        | FROM (\n        |   SELECT `int`,\n        |    COUNT(str) OVER\n        |      (PARTITION BY long ORDER BY rowtime ROWS BETWEEN 5 preceding AND CURRENT ROW) as cnt\n        |   FROM T1\n        | )\n        | GROUP BY cnt\n      ")).stripMargin();
        this.util().verifyExecPlan(sql);
    }

    /**
     * Enables early fire on the given config and sets its delay to {@code interval}.
     *
     * <p>If an early-fire delay is already present on the config and differs (compared in
     * milliseconds) from the requested one, nothing is written and an exception is thrown.
     *
     * @param tableConfig the configuration to update
     * @param interval the early-fire delay to apply
     * @throws RuntimeException if a different early-fire delay is already configured
     */
    private void withEarlyFireDelay(TableConfig tableConfig, Time interval) {
        long requestedMillis = interval.toMilliseconds();
        Duration configuredDelay = tableConfig.getOptional(WindowEmitStrategy$.MODULE$.TABLE_EXEC_EMIT_EARLY_FIRE_DELAY()).orElse(null);

        // Happy path: nothing configured yet, or the existing delay already matches.
        if (configuredDelay == null || configuredDelay.toMillis() == requestedMillis) {
            tableConfig.set(WindowEmitStrategy$.MODULE$.TABLE_EXEC_EMIT_EARLY_FIRE_ENABLED(), BoxesRunTime.boxToBoolean(true));
            tableConfig.set(WindowEmitStrategy$.MODULE$.TABLE_EXEC_EMIT_EARLY_FIRE_DELAY(), Duration.ofMillis(requestedMillis));
            return;
        }

        throw new RuntimeException("Currently not support different earlyFireInterval configs in one job");
    }
}

