/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.table.planner.plan.stream.sql.join;

import org.apache.flink.table.api.StatementSet;
import org.apache.flink.table.api.TableException;
import org.apache.flink.table.planner.utils.StreamTableTestUtil;
import org.apache.flink.table.planner.utils.TableTestBase;
import org.assertj.core.api.Assertions;
import org.junit.jupiter.api.Test;
import scala.Predef$;
import scala.collection.Seq;
import scala.collection.immutable.StringOps;
import scala.reflect.ScalaSignature;

@ScalaSignature(bytes="\u0006\u0001\u0005\u0015d\u0001B\u0001\u0003\u0001U\u0011abV5oI><(j\\5o)\u0016\u001cHO\u0003\u0002\u0004\t\u0005!!n\\5o\u0015\t)a!A\u0002tc2T!a\u0002\u0005\u0002\rM$(/Z1n\u0015\tI!\"\u0001\u0003qY\u0006t'BA\u0006\r\u0003\u001d\u0001H.\u00198oKJT!!\u0004\b\u0002\u000bQ\f'\r\\3\u000b\u0005=\u0001\u0012!\u00024mS:\\'BA\t\u0013\u0003\u0019\t\u0007/Y2iK*\t1#A\u0002pe\u001e\u001c\u0001a\u0005\u0002\u0001-A\u0011qCG\u0007\u00021)\u0011\u0011DC\u0001\u0006kRLGn]\u0005\u00037a\u0011Q\u0002V1cY\u0016$Vm\u001d;CCN,\u0007\"B\u000f\u0001\t\u0003q\u0012A\u0002\u001fj]&$h\bF\u0001 !\t\u0001\u0003!D\u0001\u0003\u0011\u001d\u0011\u0003A1A\u0005\n\r\nA!\u001e;jYV\tA\u0005\u0005\u0002\u0018K%\u0011a\u0005\u0007\u0002\u0014'R\u0014X-Y7UC\ndW\rV3tiV#\u0018\u000e\u001c\u0005\u0007Q\u0001\u0001\u000b\u0011\u0002\u0013\u0002\u000bU$\u0018\u000e\u001c\u0011\t\u000b)\u0002A\u0011A\u0016\u0002mQ,7\u000f^*j[Bd\u0017NZ=Uk6\u0014G.Z,j]\u0012|w\u000f\u0016,G\u0005\u00164wN]3XS:$wn\u001e&pS:<\u0016\u000e\u001e5Uo>\u001c\u0015\r\\2\u0015\u00031\u0002\"!\f\u0019\u000e\u00039R\u0011aL\u0001\u0006g\u000e\fG.Y\u0005\u0003c9\u0012A!\u00168ji\"\u0012\u0011f\r\t\u0003imj\u0011!\u000e\u0006\u0003m]\n1!\u00199j\u0015\tA\u0014(A\u0004kkBLG/\u001a:\u000b\u0005i\u0012\u0012!\u00026v]&$\u0018B\u0001\u001f6\u0005\u0011!Vm\u001d;\t\u000by\u0002A\u0011A\u0016\u0002oQ,7\u000f^*j[Bd\u0017NZ=Uk6\u0014G.Z,j]\u0012|w\u000f\u0016,G\u0005\u00164wN]3XS:$wn\u001e&pS:<\u0016\u000e\u001e5MK\u001a$8)\u00197dQ\ti4\u0007C\u0003B\u0001\u0011\u00051&\u0001\u001duKN$8+[7qY&4\u0017\u0010V;nE2,w+\u001b8e_^$fK\u0012\"fM>\u0014XmV5oI><(j\\5o/&$\bNU5hQR\u001c\u0015\r\\2)\u0005\u0001\u001b\u0004\"\u0002#\u0001\t\u0003Y\u0013a\u000b;fgR\u001c\u0016.\u001c9mS\u001aLH+^7cY\u0016<\u0016N\u001c3poR3fIQ3g_J,w+\u001b8e_^Tu.\u001b8)\u0005\r\u001b\u0004\"B$\u0001\t\u0003Y\u0013!\u000b;fgR,fn];qa>\u0014H/\u001a3XS:$wn\u001e+W\r~#V/\u001c2mK>s\u0007K]8di&lW\r\u000b\u0002Gg!)!\n\u0001C\u0001W\u0005\u0019D/Z:u'&l\u0007\u000f\\5gs\"{\u0007oV5oI><HK\u0016$CK\u001a|'/Z,j]\u0012|wOS8j]^KG\u000f\u001b+x_\u000e\u000bGn\u0019\u0015\u0003\u0013NBQ!\u0014\u0001\u0005\u0002-\nA\u0007^3tiNKW\u000e\u001d7jMfDu\u000e],j]\u0012|w\u000f\u0016,G\u0005\u00164wN]3XS:$wn\u001e&pS:<\u0016\u000e\u001e5MK\u001a$8)\u00197dQ\ta5\u0007C\u0003Q\u0001\u0011\u00051&A\u001buKN$8+[7qY&4\u0017\u0010S8q/&tGm\\<U-\u001a\u0013UMZ8sK^Kg\u000eZ8x\u0015>LgnV5uQJKw\r\u001b;DC2\u001c\u0007FA(4\u0011\u0015\u0019\u0006\u0001\"\u0001,\u0003!\"Xm\u001d;TS6\u0004H.\u001b4z\u0011>\u0004x+\u001b8e_^$fK\u0012\"fM>\u0014XmV5oI><(j\\5oQ\t\u00116\u0007C\u0003W\u0001\u0011\u00051&\u0001\u0014uKN$XK\\:vaB|'\u000f^3e/&tGm\\<U-\u001a{\u0006j\u001c9P]B\u0013xn\u0019;j[\u0016D#!V\u001a\t\u000be\u0003A\u0011A\u0016\u0002qQ,7\u000f^*j[Bd\u0017NZ=Dk6,H.\u0019;f/&tGm\\<U-\u001a\u0013UMZ8sK^Kg\u000eZ8x\u0015>LgnV5uQR;xnQ1mG\"\u0012\u0001l\r\u0005\u00069\u0002!\taK\u0001:i\u0016\u001cHoU5na2Lg-_\"v[Vd\u0017\r^3XS:$wn\u001e+W\r\n+gm\u001c:f/&tGm\\<K_&tw+\u001b;i\u0019\u00164GoQ1mG\"\u00121l\r\u0005\u0006?\u0002!\taK\u0001;i\u0016\u001cHoU5na2Lg-_\"v[Vd\u0017\r^3XS:$wn\u001e+W\r\n+gm\u001c:f/&tGm\\<K_&tw+\u001b;i%&<\u0007\u000e^\"bY\u000eD#AX\u001a\t\u000b\t\u0004A\u0011A\u0016\u0002[Q,7\u000f^*j[Bd\u0017NZ=Dk6,H.\u0019;f/&tGm\\<U-\u001a\u0013UMZ8sK^Kg\u000eZ8x\u0015>Lg\u000e\u000b\u0002bg!)Q\r\u0001C\u0001W\u0005YC/Z:u+:\u001cX\u000f\u001d9peR,GmV5oI><HK\u0016$`\u0007VlW\u000f\\1uK>s\u0007K]8di&lW\r\u000b\u0002eg!)\u0001\u000e\u0001C\u0001W\u0005)B/Z:u\u001d>$8+Y7f/&tGm\\<UsB,\u0007FA44\u0011\u0015Y\u0007\u0001\"\u0001,\u0003U!Xm\u001d;O_R\u001c\u0016-\\3XS:$wn^*qK\u000eD#A[\u001a\t\u000b9\u0004A\u0011A\u0016\u00029Q,7\u000f\u001e(piN\u000bW.\u001a+j[\u0016\fE\u000f\u001e:jEV$X\rV=qK\"\u0012Qn\r\u0005\u0006c\u0002!\taK\u0001,i\u0016\u001cH/T5tg^Kg\u000eZ8x\u000b:$\u0017J\\\"p]\u0012LG/[8o\r>\u0014H+^7cY\u0016<\u0016N\u001c3po\"\u0012\u0001o\r\u0005\u0006i\u0002!\taK\u0001.i\u0016\u001cH/T5tg^Kg\u000eZ8x'R\f'\u000f^%o\u0007>tG-\u001b;j_:4uN\u001d+v[\ndWmV5oI><\bFA:4\u0011\u00159\b\u0001\"\u0001,\u0003!\"Xm\u001d;NSN\u001cx+\u001b8e_^,e\u000eZ%o\u0007>tG-\u001b;j_:4uN\u001d%pa^Kg\u000eZ8xQ\t18\u0007C\u0003{\u0001\u0011\u00051&\u0001\u0016uKN$X*[:t/&tGm\\<Ti\u0006\u0014H/\u00138D_:$\u0017\u000e^5p]\u001a{'\u000fS8q/&tGm\\<)\u0005e\u001c\u0004\"B?\u0001\t\u0003Y\u0013!\f;fgRl\u0015n]:XS:$wn^#oI&s7i\u001c8eSRLwN\u001c$pe\u000e+X.\u001e7bi\u0016<\u0016N\u001c3po\"\u0012Ap\r\u0005\u0007\u0003\u0003\u0001A\u0011A\u0016\u0002_Q,7\u000f^'jgN<\u0016N\u001c3poN#\u0018M\u001d;J]\u000e{g\u000eZ5uS>tgi\u001c:Dk6,H.\u0019;f/&tGm\\<)\u0005}\u001c\u0004BBA\u0004\u0001\u0011\u00051&A\u000euKN$xJ\u001c+v[\ndWmV5oI><\u0018iZ4sK\u001e\fG/\u001a\u0015\u0004\u0003\u000b\u0019\u0004BBA\u0007\u0001\u0011\u00051&A\u0013uKN$xJ\u001c+v[\ndWmV5oI><\u0018iZ4sK\u001e\fG/Z(o!J|7\r^5nK\"\u001a\u00111B\u001a\t\r\u0005M\u0001\u0001\"\u0001,\u0003a!Xm\u001d;P]\"{\u0007oV5oI><\u0018iZ4sK\u001e\fG/\u001a\u0015\u0004\u0003#\u0019\u0004BBA\r\u0001\u0011\u00051&\u0001\u0012uKN$xJ\u001c%pa^Kg\u000eZ8x\u0003\u001e<'/Z4bi\u0016|e\u000e\u0015:pGRLW.\u001a\u0015\u0004\u0003/\u0019\u0004BBA\u0010\u0001\u0011\u00051&A\u000fuKN$xJ\\\"v[Vd\u0017\r^3XS:$wn^!hOJ,w-\u0019;fQ\r\tib\r\u0005\u0007\u0003K\u0001A\u0011A\u0016\u0002OQ,7\u000f^(o\u0007VlW\u000f\\1uK^Kg\u000eZ8x\u0003\u001e<'/Z4bi\u0016|e\u000e\u0015:pGRLW.\u001a\u0015\u0004\u0003G\u0019\u0004BBA\u0016\u0001\u0011\u00051&A\ruKN$x+\u001b8e_^Tu.\u001b8XSRDgj\u001c8FcVL\u0007fAA\u0015g!1\u0011\u0011\u0007\u0001\u0005\u0002-\nq\u0005^3tiRKW.Z!uiJL'-\u001e;f!J|\u0007/Y4bi\u00164uN],j]\u0012|wOS8j]\"\u001a\u0011qF\u001a\t\r\u0005]\u0002\u0001\"\u0001,\u0003!\"Xm\u001d;US6,\u0017\t\u001e;sS\n,H/\u001a)s_B\fw-\u0019;f\r>\u0014x+\u001b8e_^Tu.\u001b82Q\r\t)d\r\u0005\u0007\u0003{\u0001A\u0011A\u0016\u0002QQ,7\u000f^,j]\u0012|w\u000f\u0015:pa\u0016\u0014H/\u001f)s_B\fw-\u0019;f\r>\u0014x+\u001b8e_^Tu.\u001b8)\u0007\u0005m2\u0007\u0003\u0004\u0002D\u0001!\taK\u0001\u000fi\u0016\u001cHoU3nS*{\u0017N\\%OQ\r\t\te\r\u0005\u0007\u0003\u0013\u0002A\u0011A\u0016\u0002\u001bQ,7\u000f^*f[&,\u00050[:uQ\r\t9e\r\u0005\u0007\u0003\u001f\u0002A\u0011A\u0016\u0002#Q,7\u000f^!oi&Tu.\u001b8O_RLe\nK\u0002\u0002NMBa!!\u0016\u0001\t\u0003Y\u0013\u0001\u0006;fgR\fe\u000e^5K_&tgj\u001c;Fq&\u001cH\u000fK\u0002\u0002TMBa!a\u0017\u0001\t\u0003Y\u0013!\b;fgRTu.\u001b8XSRD\u0017j\u001d(pi\u0012K7\u000f^5oGR4%o\\7)\u0007\u0005e3\u0007\u0003\u0004\u0002b\u0001!\taK\u0001\u0014i\u0016\u001cHOS8j]R{W*\u001e7uSNKgn\u001b\u0015\u0004\u0003?\u001a\u0004")
public class WindowJoinTest
extends TableTestBase {
    private final StreamTableTestUtil util = this.streamTestUtil(this.streamTestUtil$default$1());

    private StreamTableTestUtil util() {
        return this.util;
    }

    @Test
    public void testSimplifyTumbleWindowTVFBeforeWindowJoinWithTwoCalc() {
        String sql = new StringOps(Predef$.MODULE$.augmentString("\n        |SELECT *\n        |FROM (\n        |  SELECT *\n        |  FROM TABLE(TUMBLE(TABLE MyTable, DESCRIPTOR(rowtime), INTERVAL '15' MINUTE))\n        |  WHERE c > 10\n        |) L\n        |JOIN (\n        |  SELECT *\n        |  FROM TABLE(TUMBLE(TABLE MyTable2, DESCRIPTOR(rowtime), INTERVAL '15' MINUTE))\n        |  WHERE c > 10\n        |) R\n        |ON L.window_start = R.window_start AND L.window_end = R.window_end AND L.a = R.a\n      ")).stripMargin();
        this.util().verifyRelPlan(sql);
    }

    @Test
    public void testSimplifyTumbleWindowTVFBeforeWindowJoinWithLeftCalc() {
        String sql = new StringOps(Predef$.MODULE$.augmentString("\n        |SELECT *\n        |FROM (\n        |  SELECT *\n        |  FROM TABLE(TUMBLE(TABLE MyTable, DESCRIPTOR(rowtime), INTERVAL '15' MINUTE))\n        |  WHERE c > 10\n        |) L\n        |JOIN (\n        |  SELECT *\n        |  FROM TABLE(TUMBLE(TABLE MyTable2, DESCRIPTOR(rowtime), INTERVAL '15' MINUTE))\n        |) R\n        |ON L.window_start = R.window_start AND L.window_end = R.window_end AND L.a = R.a\n      ")).stripMargin();
        this.util().verifyRelPlan(sql);
    }

    @Test
    public void testSimplifyTumbleWindowTVFBeforeWindowJoinWithRightCalc() {
        String sql = new StringOps(Predef$.MODULE$.augmentString("\n        |SELECT *\n        |FROM (\n        |  SELECT *\n        |  FROM TABLE(TUMBLE(TABLE MyTable, DESCRIPTOR(rowtime), INTERVAL '15' MINUTE))\n        |) L\n        |JOIN (\n        |  SELECT *\n        |  FROM TABLE(TUMBLE(TABLE MyTable2, DESCRIPTOR(rowtime), INTERVAL '15' MINUTE))\n        |  WHERE c > 10\n        |) R\n        |ON L.window_start = R.window_start AND L.window_end = R.window_end AND L.a = R.a\n      ")).stripMargin();
        this.util().verifyRelPlan(sql);
    }

    @Test
    public void testSimplifyTumbleWindowTVFBeforeWindowJoin() {
        String sql = new StringOps(Predef$.MODULE$.augmentString("\n        |SELECT *\n        |FROM (\n        |  SELECT *\n        |  FROM TABLE(TUMBLE(TABLE MyTable, DESCRIPTOR(rowtime), INTERVAL '15' MINUTE))\n        |) L\n        |JOIN (\n        |  SELECT *\n        |  FROM TABLE(TUMBLE(TABLE MyTable2, DESCRIPTOR(rowtime), INTERVAL '15' MINUTE))\n        |) R\n        |ON L.window_start = R.window_start AND L.window_end = R.window_end AND L.a = R.a\n      ")).stripMargin();
        this.util().verifyRelPlan(sql);
    }

    @Test
    public void testUnsupportedWindowTVF_TumbleOnProctime() {
        String sql = new StringOps(Predef$.MODULE$.augmentString("\n        |SELECT L.a, L.b, L.c, R.a, R.b, R.c\n        |FROM (\n        |  SELECT *\n        |  FROM TABLE(TUMBLE(TABLE MyTable, DESCRIPTOR(proctime), INTERVAL '15' MINUTE))\n        |) L\n        |JOIN (\n        |  SELECT *\n        |  FROM TABLE(TUMBLE(TABLE MyTable2, DESCRIPTOR(proctime), INTERVAL '15' MINUTE))\n        |) R\n        |ON L.window_start = R.window_start AND L.window_end = R.window_end AND L.a = R.a\n      ")).stripMargin();
        boolean cfr_ignored_0 = Assertions.assertThatThrownBy(() -> this.util().verifyExplain(sql)).hasMessageContaining("Processing time Window Join is not supported yet.") instanceof TableException;
    }

    @Test
    public void testSimplifyHopWindowTVFBeforeWindowJoinWithTwoCalc() {
        String sql = new StringOps(Predef$.MODULE$.augmentString("\n        |SELECT *\n        |FROM (\n        |  SELECT *\n        |  FROM TABLE(\n        |  HOP(TABLE MyTable, DESCRIPTOR(rowtime), INTERVAL '5' MINUTE, INTERVAL '10' MINUTE))\n        |  WHERE c > 10\n        |) L\n        |JOIN (\n        |  SELECT *\n        |  FROM TABLE(\n        |  HOP(TABLE MyTable2, DESCRIPTOR(rowtime), INTERVAL '5' MINUTE, INTERVAL '10' MINUTE))\n        |  WHERE c > 10\n        |) R\n        |ON L.window_start = R.window_start AND L.window_end = R.window_end AND L.a = R.a\n      ")).stripMargin();
        this.util().verifyRelPlan(sql);
    }

    @Test
    public void testSimplifyHopWindowTVFBeforeWindowJoinWithLeftCalc() {
        String sql = new StringOps(Predef$.MODULE$.augmentString("\n        |SELECT *\n        |FROM (\n        |  SELECT *\n        |  FROM TABLE(\n        |  HOP(TABLE MyTable, DESCRIPTOR(rowtime), INTERVAL '5' MINUTE, INTERVAL '10' MINUTE))\n        |  WHERE c > 10\n        |) L\n        |JOIN (\n        |  SELECT *\n        |  FROM TABLE(\n        |  HOP(TABLE MyTable2, DESCRIPTOR(rowtime), INTERVAL '5' MINUTE, INTERVAL '10' MINUTE))\n        |) R\n        |ON L.window_start = R.window_start AND L.window_end = R.window_end AND L.a = R.a\n      ")).stripMargin();
        this.util().verifyRelPlan(sql);
    }

    @Test
    public void testSimplifyHopWindowTVFBeforeWindowJoinWithRightCalc() {
        String sql = new StringOps(Predef$.MODULE$.augmentString("\n        |SELECT *\n        |FROM (\n        |  SELECT *\n        |  FROM TABLE(\n        |  HOP(TABLE MyTable, DESCRIPTOR(rowtime), INTERVAL '5' MINUTE, INTERVAL '10' MINUTE))\n        |) L\n        |JOIN (\n        |  SELECT *\n        |  FROM TABLE(\n        |  HOP(TABLE MyTable2, DESCRIPTOR(rowtime), INTERVAL '5' MINUTE, INTERVAL '10' MINUTE))\n        |  WHERE c > 10\n        |) R\n        |ON L.window_start = R.window_start AND L.window_end = R.window_end AND L.a = R.a\n      ")).stripMargin();
        this.util().verifyRelPlan(sql);
    }

    @Test
    public void testSimplifyHopWindowTVFBeforeWindowJoin() {
        String sql = new StringOps(Predef$.MODULE$.augmentString("\n        |SELECT *\n        |FROM (\n        |  SELECT *\n        |  FROM TABLE(\n        |  HOP(TABLE MyTable, DESCRIPTOR(rowtime), INTERVAL '5' MINUTE, INTERVAL '10' MINUTE))\n        |) L\n        |JOIN (\n        |  SELECT *\n        |  FROM TABLE(\n        |  HOP(TABLE MyTable2, DESCRIPTOR(rowtime), INTERVAL '5' MINUTE, INTERVAL '10' MINUTE))\n        |) R\n        |ON L.window_start = R.window_start AND L.window_end = R.window_end AND L.a = R.a\n      ")).stripMargin();
        this.util().verifyRelPlan(sql);
    }

    @Test
    public void testUnsupportedWindowTVF_HopOnProctime() {
        String sql = new StringOps(Predef$.MODULE$.augmentString("\n        |SELECT L.a, L.b, L.c, R.a, R.b, R.c\n        |FROM (\n        |  SELECT *\n        |  FROM TABLE(\n        |  HOP(TABLE MyTable, DESCRIPTOR(proctime), INTERVAL '5' MINUTE, INTERVAL '10' MINUTE))\n        |) L\n        |JOIN (\n        |  SELECT *\n        |  FROM TABLE(\n        |  HOP(TABLE MyTable2, DESCRIPTOR(proctime), INTERVAL '5' MINUTE, INTERVAL '10' MINUTE))\n        |) R\n        |ON L.window_start = R.window_start AND L.window_end = R.window_end AND L.a = R.a\n      ")).stripMargin();
        boolean cfr_ignored_0 = Assertions.assertThatThrownBy(() -> this.util().verifyExplain(sql)).hasMessageContaining("Processing time Window Join is not supported yet.") instanceof TableException;
    }

    @Test
    public void testSimplifyCumulateWindowTVFBeforeWindowJoinWithTwoCalc() {
        String sql = new StringOps(Predef$.MODULE$.augmentString("\n        |SELECT *\n        |FROM (\n        |  SELECT *\n        |  FROM TABLE(\n        |  CUMULATE(TABLE MyTable, DESCRIPTOR(rowtime), INTERVAL '10' MINUTE, INTERVAL '1' HOUR))\n        |  WHERE c > 10\n        |) L\n        |JOIN (\n        |  SELECT *\n        |  FROM TABLE(\n        |  CUMULATE(TABLE MyTable2, DESCRIPTOR(rowtime), INTERVAL '10' MINUTE, INTERVAL '1' HOUR))\n        |  WHERE c > 10\n        |) R\n        |ON L.window_start = R.window_start AND L.window_end = R.window_end AND L.a = R.a\n      ")).stripMargin();
        this.util().verifyRelPlan(sql);
    }

    @Test
    public void testSimplifyCumulateWindowTVFBeforeWindowJoinWithLeftCalc() {
        String sql = new StringOps(Predef$.MODULE$.augmentString("\n        |SELECT *\n        |FROM (\n        |  SELECT *\n        |  FROM TABLE(\n        |  CUMULATE(TABLE MyTable, DESCRIPTOR(rowtime), INTERVAL '10' MINUTE, INTERVAL '1' HOUR))\n        |  WHERE c > 10\n        |) L\n        |JOIN (\n        |  SELECT *\n        |  FROM TABLE(\n        |  CUMULATE(TABLE MyTable2, DESCRIPTOR(rowtime), INTERVAL '10' MINUTE, INTERVAL '1' HOUR))\n        |) R\n        |ON L.window_start = R.window_start AND L.window_end = R.window_end AND L.a = R.a\n      ")).stripMargin();
        this.util().verifyRelPlan(sql);
    }

    @Test
    public void testSimplifyCumulateWindowTVFBeforeWindowJoinWithRightCalc() {
        String sql = new StringOps(Predef$.MODULE$.augmentString("\n        |SELECT *\n        |FROM (\n        |  SELECT *\n        |  FROM TABLE(\n        |  CUMULATE(TABLE MyTable, DESCRIPTOR(rowtime), INTERVAL '10' MINUTE, INTERVAL '1' HOUR))\n        |) L\n        |JOIN (\n        |  SELECT *\n        |  FROM TABLE(\n        |  CUMULATE(TABLE MyTable2, DESCRIPTOR(rowtime), INTERVAL '10' MINUTE, INTERVAL '1' HOUR))\n        |  WHERE c > 10\n        |) R\n        |ON L.window_start = R.window_start AND L.window_end = R.window_end AND L.a = R.a\n      ")).stripMargin();
        this.util().verifyRelPlan(sql);
    }

    @Test
    public void testSimplifyCumulateWindowTVFBeforeWindowJoin() {
        String sql = new StringOps(Predef$.MODULE$.augmentString("\n        |SELECT *\n        |FROM (\n        |  SELECT *\n        |  FROM TABLE(\n        |  CUMULATE(TABLE MyTable, DESCRIPTOR(rowtime), INTERVAL '10' MINUTE, INTERVAL '1' HOUR))\n        |) L\n        |JOIN (\n        |  SELECT *\n        |  FROM TABLE(\n        |  CUMULATE(TABLE MyTable2, DESCRIPTOR(rowtime), INTERVAL '10' MINUTE, INTERVAL '1' HOUR))\n        |) R\n        |ON L.window_start = R.window_start AND L.window_end = R.window_end AND L.a = R.a\n      ")).stripMargin();
        this.util().verifyRelPlan(sql);
    }

    @Test
    public void testUnsupportedWindowTVF_CumulateOnProctime() {
        String sql = new StringOps(Predef$.MODULE$.augmentString("\n        |SELECT L.a, L.b, L.c, R.a, R.b, R.c\n        |FROM (\n        |  SELECT *\n        |  FROM TABLE(\n        |  CUMULATE(TABLE MyTable, DESCRIPTOR(proctime), INTERVAL '10' MINUTE, INTERVAL '1' HOUR))\n        |) L\n        |JOIN (\n        |  SELECT *\n        |  FROM TABLE(\n        |  CUMULATE(TABLE MyTable2, DESCRIPTOR(proctime), INTERVAL '10' MINUTE, INTERVAL '1' HOUR))\n        |) R\n        |ON L.window_start = R.window_start AND L.window_end = R.window_end AND L.a = R.a\n      ")).stripMargin();
        boolean cfr_ignored_0 = Assertions.assertThatThrownBy(() -> this.util().verifyExplain(sql)).hasMessageContaining("Processing time Window Join is not supported yet.") instanceof TableException;
    }

    @Test
    public void testNotSameWindowType() {
        String sql = new StringOps(Predef$.MODULE$.augmentString("\n        |SELECT L.*, R.*\n        |FROM (\n        |  SELECT\n        |    a,\n        |    window_start,\n        |    window_end,\n        |    window_time,\n        |    count(*) as cnt,\n        |    count(distinct c) AS uv\n        |  FROM TABLE(\n        |  HOP(TABLE MyTable, DESCRIPTOR(rowtime), INTERVAL '5' MINUTE, INTERVAL '10' MINUTE))\n        |  GROUP BY a, window_start, window_end, window_time\n        |) L\n        |JOIN (\n        |  SELECT\n        |    a,\n        |    window_start,\n        |    window_end,\n        |    window_time,\n        |    count(*) as cnt,\n        |    count(distinct c) AS uv\n        |  FROM TABLE(\n        |  CUMULATE(TABLE MyTable2, DESCRIPTOR(rowtime), INTERVAL '10' MINUTE, INTERVAL '1' HOUR))\n        |  GROUP BY a, window_start, window_end, window_time\n        |) R\n        |ON L.window_start = R.window_start AND L.window_end = R.window_end AND L.a = R.a\n      ")).stripMargin();
        this.util().verifyRelPlan(sql);
    }

    @Test
    public void testNotSameWindowSpec() {
        String sql = new StringOps(Predef$.MODULE$.augmentString("\n        |SELECT L.*, R.*\n        |FROM (\n        |  SELECT\n        |    a,\n        |    window_start,\n        |    window_end,\n        |    window_time,\n        |    count(*) as cnt,\n        |    count(distinct c) AS uv\n        |  FROM TABLE(\n        |  CUMULATE(TABLE MyTable, DESCRIPTOR(rowtime), INTERVAL '10' MINUTE, INTERVAL '2' HOUR))\n        |  GROUP BY a, window_start, window_end, window_time\n        |) L\n        |JOIN (\n        |  SELECT\n        |    a,\n        |    window_start,\n        |    window_end,\n        |    window_time,\n        |    count(*) as cnt,\n        |    count(distinct c) AS uv\n        |  FROM TABLE(\n        |  CUMULATE(TABLE MyTable2, DESCRIPTOR(rowtime), INTERVAL '10' MINUTE, INTERVAL '1' HOUR))\n        |  GROUP BY a, window_start, window_end, window_time\n        |) R\n        |ON L.window_start = R.window_start AND L.window_end = R.window_end AND L.a = R.a\n      ")).stripMargin();
        this.util().verifyRelPlan(sql);
    }

    @Test
    public void testNotSameTimeAttributeType() {
        String sql = new StringOps(Predef$.MODULE$.augmentString("\n        |SELECT L.*, R.*\n        |FROM (\n        |  SELECT\n        |    a,\n        |    window_start,\n        |    window_end,\n        |    window_time,\n        |    count(*) as cnt,\n        |    count(distinct c) AS uv\n        |  FROM TABLE(\n        |    CUMULATE(\n        |      TABLE MyTable, DESCRIPTOR(proctime), INTERVAL '10' MINUTE, INTERVAL '1' HOUR))\n        |  GROUP BY a, window_start, window_end, window_time\n        |) L\n        |JOIN (\n        |  SELECT\n        |    a,\n        |    window_start,\n        |    window_end,\n        |    window_time,\n        |    count(*) as cnt,\n        |    count(distinct c) AS uv\n        |  FROM TABLE(\n        |    CUMULATE(\n        |      TABLE MyTable2, DESCRIPTOR(rowtime), INTERVAL '10' MINUTE, INTERVAL '1' HOUR))\n        |  GROUP BY a, window_start, window_end, window_time\n        |) R\n        |ON L.window_start = R.window_start AND L.window_end = R.window_end AND L.a = R.a\n      ")).stripMargin();
        this.util().verifyRelPlan(sql);
    }

    @Test
    public void testMissWindowEndInConditionForTumbleWindow() {
        String sql = new StringOps(Predef$.MODULE$.augmentString("\n        |SELECT L.*, R.*\n        |FROM (\n        |  SELECT\n        |    a,\n        |    window_start,\n        |    window_end,\n        |    window_time,\n        |    count(*) as cnt,\n        |    count(distinct c) AS uv\n        |  FROM TABLE(TUMBLE(TABLE MyTable, DESCRIPTOR(rowtime), INTERVAL '15' MINUTE))\n        |  GROUP BY a, window_start, window_end, window_time\n        |) L\n        |JOIN (\n        |  SELECT\n        |    a,\n        |    window_start,\n        |    window_end,\n        |    window_time,\n        |    count(*) as cnt,\n        |    count(distinct c) AS uv\n        |  FROM TABLE(TUMBLE(TABLE MyTable2, DESCRIPTOR(rowtime), INTERVAL '15' MINUTE))\n        |  GROUP BY a, window_start, window_end, window_time\n        |) R\n        |ON L.window_start = R.window_start AND L.a = R.a\n      ")).stripMargin();
        this.util().verifyRelPlan(sql);
    }

    @Test
    public void testMissWindowStartInConditionForTumbleWindow() {
        String sql = new StringOps(Predef$.MODULE$.augmentString("\n        |SELECT L.*, R.*\n        |FROM (\n        |  SELECT\n        |    a,\n        |    window_start,\n        |    window_end,\n        |    window_time,\n        |    count(*) as cnt,\n        |    count(distinct c) AS uv\n        |  FROM TABLE(TUMBLE(TABLE MyTable, DESCRIPTOR(rowtime), INTERVAL '15' MINUTE))\n        |  GROUP BY a, window_start, window_end, window_time\n        |) L\n        |JOIN (\n        |  SELECT\n        |    a,\n        |    window_start,\n        |    window_end,\n        |    window_time,\n        |    count(*) as cnt,\n        |    count(distinct c) AS uv\n        |  FROM TABLE(TUMBLE(TABLE MyTable2, DESCRIPTOR(rowtime), INTERVAL '15' MINUTE))\n        |  GROUP BY a, window_start, window_end, window_time\n        |) R\n        |ON L.window_end = R.window_end AND L.a = R.a\n      ")).stripMargin();
        this.util().verifyRelPlan(sql);
    }

    @Test
    public void testMissWindowEndInConditionForHopWindow() {
        String sql = new StringOps(Predef$.MODULE$.augmentString("\n        |SELECT L.*, R.*\n        |FROM (\n        |  SELECT\n        |    a,\n        |    window_start,\n        |    window_end,\n        |    window_time,\n        |    count(*) as cnt,\n        |    count(distinct c) AS uv\n        |  FROM TABLE(\n        |  HOP(TABLE MyTable, DESCRIPTOR(rowtime), INTERVAL '5' MINUTE, INTERVAL '10' MINUTE))\n        |  GROUP BY a, window_start, window_end, window_time\n        |) L\n        |JOIN (\n        |  SELECT\n        |    a,\n        |    window_start,\n        |    window_end,\n        |    window_time,\n        |    count(*) as cnt,\n        |    count(distinct c) AS uv\n        |  FROM TABLE(\n        |  HOP(TABLE MyTable2, DESCRIPTOR(rowtime), INTERVAL '5' MINUTE, INTERVAL '10' MINUTE))\n        |  GROUP BY a, window_start, window_end, window_time\n        |) R\n        |ON L.window_start = R.window_start AND L.a = R.a\n      ")).stripMargin();
        this.util().verifyRelPlan(sql);
    }

    @Test
    public void testMissWindowStartInConditionForHopWindow() {
        String sql = new StringOps(Predef$.MODULE$.augmentString("\n        |SELECT L.*, R.*\n        |FROM (\n        |  SELECT\n        |    a,\n        |    window_start,\n        |    window_end,\n        |    window_time,\n        |    count(*) as cnt,\n        |    count(distinct c) AS uv\n        |  FROM TABLE(\n        |  HOP(TABLE MyTable, DESCRIPTOR(rowtime), INTERVAL '5' MINUTE, INTERVAL '10' MINUTE))\n        |  GROUP BY a, window_start, window_end, window_time\n        |) L\n        |JOIN (\n        |  SELECT\n        |    a,\n        |    window_start,\n        |    window_end,\n        |    window_time,\n        |    count(*) as cnt,\n        |    count(distinct c) AS uv\n        |  FROM TABLE(\n        |  HOP(TABLE MyTable2, DESCRIPTOR(rowtime), INTERVAL '5' MINUTE, INTERVAL '10' MINUTE))\n        |  GROUP BY a, window_start, window_end, window_time\n        |) R\n        |ON L.window_end = R.window_end AND L.a = R.a\n      ")).stripMargin();
        this.util().verifyRelPlan(sql);
    }

    @Test
    public void testMissWindowEndInConditionForCumulateWindow() {
        String sql = new StringOps(Predef$.MODULE$.augmentString("\n        |SELECT L.*, R.*\n        |FROM (\n        |  SELECT\n        |    a,\n        |    window_start,\n        |    window_end,\n        |    window_time,\n        |    count(*) as cnt,\n        |    count(distinct c) AS uv\n        |  FROM TABLE(\n        |    CUMULATE(\n        |      TABLE MyTable, DESCRIPTOR(rowtime), INTERVAL '10' MINUTE, INTERVAL '1' HOUR))\n        |  GROUP BY a, window_start, window_end, window_time\n        |) L\n        |JOIN (\n        |  SELECT\n        |    a,\n        |    window_start,\n        |    window_end,\n        |    window_time,\n        |    count(*) as cnt,\n        |    count(distinct c) AS uv\n        |  FROM TABLE(\n        |    CUMULATE(\n        |      TABLE MyTable2, DESCRIPTOR(rowtime), INTERVAL '10' MINUTE, INTERVAL '1' HOUR))\n        |  GROUP BY a, window_start, window_end, window_time\n        |) R\n        |ON L.window_start = R.window_start AND L.a = R.a\n      ")).stripMargin();
        this.util().verifyRelPlan(sql);
    }

    @Test
    public void testMissWindowStartInConditionForCumulateWindow() {
        String sql = new StringOps(Predef$.MODULE$.augmentString("\n        |SELECT L.*, R.*\n        |FROM (\n        |  SELECT\n        |    a,\n        |    window_start,\n        |    window_end,\n        |    window_time,\n        |    count(*) as cnt,\n        |    count(distinct c) AS uv\n        |  FROM TABLE(\n        |    CUMULATE(\n        |      TABLE MyTable, DESCRIPTOR(rowtime), INTERVAL '10' MINUTE, INTERVAL '1' HOUR))\n        |  GROUP BY a, window_start, window_end, window_time\n        |) L\n        |JOIN (\n        |  SELECT\n        |    a,\n        |    window_start,\n        |    window_end,\n        |    window_time,\n        |    count(*) as cnt,\n        |    count(distinct c) AS uv\n        |  FROM TABLE(\n        |    CUMULATE(\n        |      TABLE MyTable2, DESCRIPTOR(rowtime), INTERVAL '10' MINUTE, INTERVAL '1' HOUR))\n        |  GROUP BY a, window_start, window_end, window_time\n        |) R\n        |ON L.window_end = R.window_end AND L.a = R.a\n      ")).stripMargin();
        this.util().verifyRelPlan(sql);
    }

    @Test
    public void testOnTumbleWindowAggregate() {
        String sql = new StringOps(Predef$.MODULE$.augmentString("\n        |SELECT L.*, R.*\n        |FROM (\n        |  SELECT\n        |    a,\n        |    window_start,\n        |    window_end,\n        |    window_time,\n        |    count(*) as cnt,\n        |    count(distinct c) AS uv\n        |  FROM TABLE(TUMBLE(TABLE MyTable, DESCRIPTOR(rowtime), INTERVAL '15' MINUTE))\n        |  GROUP BY a, window_start, window_end, window_time\n        |) L\n        |JOIN (\n        |  SELECT\n        |    a,\n        |    window_start,\n        |    window_end,\n        |    window_time,\n        |    count(*) as cnt,\n        |    count(distinct c) AS uv\n        |  FROM TABLE(TUMBLE(TABLE MyTable2, DESCRIPTOR(rowtime), INTERVAL '15' MINUTE))\n        |  GROUP BY a, window_start, window_end, window_time\n        |) R\n        |ON L.window_start = R.window_start AND L.window_end = R.window_end AND L.a = R.a\n      ")).stripMargin();
        this.util().verifyRelPlan(sql);
    }

    @Test
    public void testOnTumbleWindowAggregateOnProctime() {
        String sql = new StringOps(Predef$.MODULE$.augmentString("\n        |SELECT L.*, R.*\n        |FROM (\n        |  SELECT\n        |    a,\n        |    window_start,\n        |    window_end,\n        |    window_time,\n        |    count(*) as cnt,\n        |    count(distinct c) AS uv\n        |  FROM TABLE(TUMBLE(TABLE MyTable, DESCRIPTOR(proctime), INTERVAL '15' MINUTE))\n        |  GROUP BY a, window_start, window_end, window_time\n        |) L\n        |JOIN (\n        |  SELECT\n        |    a,\n        |    window_start,\n        |    window_end,\n        |    window_time,\n        |    count(*) as cnt,\n        |    count(distinct c) AS uv\n        |  FROM TABLE(TUMBLE(TABLE MyTable2, DESCRIPTOR(proctime), INTERVAL '15' MINUTE))\n        |  GROUP BY a, window_start, window_end, window_time\n        |) R\n        |ON L.window_start = R.window_start AND L.window_end = R.window_end AND L.a = R.a\n      ")).stripMargin();
        this.util().verifyRelPlan(sql);
    }

    @Test
    public void testOnHopWindowAggregate() {
        String sql = new StringOps(Predef$.MODULE$.augmentString("\n        |SELECT L.*, R.*\n        |FROM (\n        |  SELECT\n        |    a,\n        |    window_start,\n        |    window_end,\n        |    window_time,\n        |    count(*) as cnt,\n        |    count(distinct c) AS uv\n        |  FROM TABLE(\n        |  HOP(TABLE MyTable, DESCRIPTOR(rowtime), INTERVAL '5' MINUTE, INTERVAL '10' MINUTE))\n        |  GROUP BY a, window_start, window_end, window_time\n        |) L\n        |JOIN (\n        |  SELECT\n        |    a,\n        |    window_start,\n        |    window_end,\n        |    window_time,\n        |    count(*) as cnt,\n        |    count(distinct c) AS uv\n        |  FROM TABLE(\n        |  HOP(TABLE MyTable2, DESCRIPTOR(rowtime), INTERVAL '5' MINUTE, INTERVAL '10' MINUTE))\n        |  GROUP BY a, window_start, window_end, window_time\n        |) R\n        |ON L.window_start = R.window_start AND L.window_end = R.window_end AND L.a = R.a\n      ")).stripMargin();
        this.util().verifyRelPlan(sql);
    }

    @Test
    public void testOnHopWindowAggregateOnProctime() {
        String sql = new StringOps(Predef$.MODULE$.augmentString("\n        |SELECT L.*, R.*\n        |FROM (\n        |  SELECT\n        |    a,\n        |    window_start,\n        |    window_end,\n        |    window_time,\n        |    count(*) as cnt,\n        |    count(distinct c) AS uv\n        |  FROM TABLE(\n        |  HOP(TABLE MyTable, DESCRIPTOR(proctime), INTERVAL '5' MINUTE, INTERVAL '10' MINUTE))\n        |  GROUP BY a, window_start, window_end, window_time\n        |) L\n        |JOIN (\n        |  SELECT\n        |    a,\n        |    window_start,\n        |    window_end,\n        |    window_time,\n        |    count(*) as cnt,\n        |    count(distinct c) AS uv\n        |  FROM TABLE(\n        |  HOP(TABLE MyTable2, DESCRIPTOR(proctime), INTERVAL '5' MINUTE, INTERVAL '10' MINUTE))\n        |  GROUP BY a, window_start, window_end, window_time\n        |) R\n        |ON L.window_start = R.window_start AND L.window_end = R.window_end AND L.a = R.a\n      ")).stripMargin();
        this.util().verifyRelPlan(sql);
    }

    @Test
    public void testOnCumulateWindowAggregate() {
        String sql = new StringOps(Predef$.MODULE$.augmentString("\n        |SELECT L.*, R.*\n        |FROM (\n        |  SELECT\n        |    a,\n        |    window_start,\n        |    window_end,\n        |    window_time,\n        |    count(*) as cnt,\n        |    count(distinct c) AS uv\n        |  FROM TABLE(\n        |    CUMULATE(\n        |      TABLE MyTable, DESCRIPTOR(rowtime), INTERVAL '10' MINUTE, INTERVAL '1' HOUR))\n        |  GROUP BY a, window_start, window_end, window_time\n        |) L\n        |JOIN (\n        |  SELECT\n        |    a,\n        |    window_start,\n        |    window_end,\n        |    window_time,\n        |    count(*) as cnt,\n        |    count(distinct c) AS uv\n        |  FROM TABLE(\n        |    CUMULATE(\n        |      TABLE MyTable2, DESCRIPTOR(rowtime), INTERVAL '10' MINUTE, INTERVAL '1' HOUR))\n        |  GROUP BY a, window_start, window_end, window_time\n        |) R\n        |ON L.window_start = R.window_start AND L.window_end = R.window_end AND L.a = R.a\n      ")).stripMargin();
        this.util().verifyRelPlan(sql);
    }

    @Test
    public void testOnCumulateWindowAggregateOnProctime() {
        String sql = new StringOps(Predef$.MODULE$.augmentString("\n        |SELECT L.*, R.*\n        |FROM (\n        |  SELECT\n        |    a,\n        |    window_start,\n        |    window_end,\n        |    window_time,\n        |    count(*) as cnt,\n        |    count(distinct c) AS uv\n        |  FROM TABLE(\n        |    CUMULATE(\n        |      TABLE MyTable, DESCRIPTOR(proctime), INTERVAL '10' MINUTE, INTERVAL '1' HOUR))\n        |  GROUP BY a, window_start, window_end, window_time\n        |) L\n        |JOIN (\n        |  SELECT\n        |    a,\n        |    window_start,\n        |    window_end,\n        |    window_time,\n        |    count(*) as cnt,\n        |    count(distinct c) AS uv\n        |  FROM TABLE(\n        |    CUMULATE(\n        |      TABLE MyTable2, DESCRIPTOR(proctime), INTERVAL '10' MINUTE, INTERVAL '1' HOUR))\n        |  GROUP BY a, window_start, window_end, window_time\n        |) R\n        |ON L.window_start = R.window_start AND L.window_end = R.window_end AND L.a = R.a\n      ")).stripMargin();
        this.util().verifyRelPlan(sql);
    }

    @Test
    public void testWindowJoinWithNonEqui() {
        String sql = new StringOps(Predef$.MODULE$.augmentString("\n        |SELECT L.*, R.*\n        |FROM (\n        |  SELECT\n        |    a,\n        |    window_start,\n        |    window_end,\n        |    window_time,\n        |    count(*) as cnt,\n        |    count(distinct c) AS uv\n        |  FROM TABLE(\n        |    CUMULATE(\n        |      TABLE MyTable, DESCRIPTOR(proctime), INTERVAL '10' MINUTE, INTERVAL '1' HOUR))\n        |  GROUP BY a, window_start, window_end, window_time\n        |) L\n        |JOIN (\n        |  SELECT\n        |    a,\n        |    window_start,\n        |    window_end,\n        |    window_time,\n        |    count(*) as cnt,\n        |    count(distinct c) AS uv\n        |  FROM TABLE(\n        |    CUMULATE(\n        |      TABLE MyTable2, DESCRIPTOR(proctime), INTERVAL '10' MINUTE, INTERVAL '1' HOUR))\n        |  GROUP BY a, window_start, window_end, window_time\n        |) R\n        |ON L.window_start = R.window_start AND L.window_end = R.window_end AND L.a = R.a AND\n        | CAST(L.window_start AS BIGINT) > R.uv\n      ")).stripMargin();
        this.util().verifyRelPlan(sql);
    }

    @Test
    public void testTimeAttributePropagateForWindowJoin() {
        this.util().tableEnv().executeSql(new StringOps(Predef$.MODULE$.augmentString("\n                                |CREATE TABLE MyTable3 (\n                                |  a INT,\n                                |  b STRING NOT NULL,\n                                |  c BIGINT,\n                                |  rowtime TIMESTAMP(3),\n                                |  proctime as PROCTIME(),\n                                |  WATERMARK FOR rowtime AS rowtime - INTERVAL '1' SECOND\n                                |) with (\n                                |  'connector' = 'values'\n                                |)\n                                |")).stripMargin());
        this.util().tableEnv().executeSql(new StringOps(Predef$.MODULE$.augmentString("\n        |CREATE VIEW tmp AS\n        |SELECT\n        |  L.window_time as rowtime,\n        |  L.a as a,\n        |  L.b as l_b,\n        |  L.c as l_c,\n        |  R.b as r_b,\n        |  R.c as r_c\n        |FROM (\n        |  SELECT *\n        |  FROM TABLE(TUMBLE(TABLE MyTable, DESCRIPTOR(rowtime), INTERVAL '15' MINUTE))\n        |) L\n        |JOIN (\n        |  SELECT *\n        |  FROM TABLE(TUMBLE(TABLE MyTable2, DESCRIPTOR(rowtime), INTERVAL '15' MINUTE))\n        |) R\n        |ON L.window_start = R.window_start AND L.window_end = R.window_end AND L.a = R.a\n      ")).stripMargin());
        String sql = new StringOps(Predef$.MODULE$.augmentString("\n        |SELECT tmp.*, MyTable3.* FROM tmp JOIN MyTable3 ON\n        | tmp.a = MyTable3.a AND\n        | tmp.rowtime BETWEEN\n        |   MyTable3.rowtime - INTERVAL '10' SECOND AND\n        |   MyTable3.rowtime + INTERVAL '1' HOUR\n        |")).stripMargin();
        this.util().verifyRelPlan(sql);
    }

    @Test
    public void testTimeAttributePropagateForWindowJoin1() {
        this.util().tableEnv().executeSql(new StringOps(Predef$.MODULE$.augmentString("\n                                |CREATE TABLE MyTable4 (\n                                |  a INT,\n                                |  b STRING NOT NULL,\n                                |  c BIGINT,\n                                |  rowtime TIMESTAMP(3),\n                                |  proctime as PROCTIME(),\n                                |  WATERMARK FOR rowtime AS rowtime - INTERVAL '1' SECOND\n                                |) with (\n                                |  'connector' = 'values'\n                                |)\n                                |")).stripMargin());
        this.util().tableEnv().executeSql(new StringOps(Predef$.MODULE$.augmentString("\n        |CREATE VIEW tmp1 AS\n        |SELECT\n        |  L.window_time as rowtime,\n        |  L.a,\n        |  L.cnt as l_cnt,\n        |  L.uv as l_uv,\n        |  R.cnt as r_cnt,\n        |  R.uv as r_uv\n        |FROM (\n        |  SELECT\n        |    a,\n        |    window_start,\n        |    window_end,\n        |    window_time,\n        |    count(*) as cnt,\n        |    count(distinct c) AS uv\n        |  FROM TABLE(\n        |    CUMULATE(\n        |      TABLE MyTable, DESCRIPTOR(rowtime), INTERVAL '10' MINUTE, INTERVAL '1' HOUR))\n        |  GROUP BY a, window_start, window_end, window_time\n        |) L\n        |JOIN (\n        |  SELECT\n        |    a,\n        |    window_start,\n        |    window_end,\n        |    window_time,\n        |    count(*) as cnt,\n        |    count(distinct c) AS uv\n        |  FROM TABLE(\n        |    CUMULATE(\n        |      TABLE MyTable2, DESCRIPTOR(rowtime), INTERVAL '10' MINUTE, INTERVAL '1' HOUR))\n        |  GROUP BY a, window_start, window_end, window_time\n        |) R\n        |ON L.window_start = R.window_start AND L.window_end = R.window_end AND L.a = R.a\n      ")).stripMargin());
        String sql = new StringOps(Predef$.MODULE$.augmentString("\n        |SELECT tmp1.*, MyTable4.* FROM tmp1 JOIN MyTable4 ON\n        | tmp1.a = MyTable4.a AND\n        | tmp1.rowtime BETWEEN\n        |   MyTable4.rowtime - INTERVAL '10' SECOND AND\n        |   MyTable4.rowtime + INTERVAL '1' HOUR\n        |")).stripMargin();
        this.util().verifyRelPlan(sql);
    }

    @Test
    public void testWindowPropertyPropagateForWindowJoin() {
        this.util().tableEnv().executeSql(new StringOps(Predef$.MODULE$.augmentString("\n        |CREATE VIEW tmp2 AS\n        |SELECT\n        |  L.window_start as window_start,\n        |  L.window_end as window_end,\n        |  L.a,\n        |  L.cnt as l_cnt,\n        |  L.uv as l_uv,\n        |  R.cnt as r_cnt,\n        |  R.uv as r_uv\n        |FROM (\n        |  SELECT\n        |    a,\n        |    window_start,\n        |    window_end,\n        |    window_time,\n        |    count(*) as cnt,\n        |    count(distinct c) AS uv\n        |  FROM TABLE(\n        |    CUMULATE(\n        |      TABLE MyTable, DESCRIPTOR(rowtime), INTERVAL '10' MINUTE, INTERVAL '1' HOUR))\n        |  GROUP BY a, window_start, window_end, window_time\n        |) L\n        |JOIN (\n        |  SELECT\n        |    a,\n        |    window_start,\n        |    window_end,\n        |    window_time,\n        |    count(*) as cnt,\n        |    count(distinct c) AS uv\n        |  FROM TABLE(\n        |    CUMULATE(\n        |      TABLE MyTable2, DESCRIPTOR(rowtime), INTERVAL '10' MINUTE, INTERVAL '1' HOUR))\n        |  GROUP BY a, window_start, window_end, window_time\n        |) R\n        |ON L.window_start = R.window_start AND L.window_end = R.window_end AND L.a = R.a\n      ")).stripMargin());
        String sql = new StringOps(Predef$.MODULE$.augmentString("\n        |SELECT * FROM\n        |(\n        |  SELECT *,\n        |    ROW_NUMBER() OVER(\n        |      PARTITION BY window_start, window_end ORDER BY l_cnt DESC) as rownum\n        |  FROM tmp2\n        |)\n        |WHERE rownum <= 3\n      ")).stripMargin();
        this.util().verifyRelPlan(sql);
    }

    @Test
    public void testSemiJoinIN() {
        String sql = new StringOps(Predef$.MODULE$.augmentString("\n        |SELECT * FROM (\n        |  SELECT\n        |    a,\n        |    window_start,\n        |    window_end,\n        |    window_time,\n        |    count(*) as cnt,\n        |    count(distinct c) AS uv\n        |  FROM TABLE(TUMBLE(TABLE MyTable, DESCRIPTOR(rowtime), INTERVAL '15' MINUTE))\n        |  GROUP BY a, window_start, window_end, window_time\n        |) L WHERE L.a IN (\n        |SELECT a FROM (\n        |  SELECT\n        |    a,\n        |    window_start,\n        |    window_end,\n        |    window_time,\n        |    count(*) as cnt,\n        |    count(distinct c) AS uv\n        |  FROM TABLE(TUMBLE(TABLE MyTable2, DESCRIPTOR(rowtime), INTERVAL '15' MINUTE))\n        |  GROUP BY a, window_start, window_end, window_time\n        |) R\n        |WHERE L.window_start = R.window_start AND L.window_end = R.window_end)\n      ")).stripMargin();
        this.util().verifyRelPlan(sql);
    }

    @Test
    public void testSemiExist() {
        String sql = new StringOps(Predef$.MODULE$.augmentString("\n        |SELECT * FROM (\n        |  SELECT\n        |    a,\n        |    window_start,\n        |    window_end,\n        |    window_time,\n        |    count(*) as cnt,\n        |    count(distinct c) AS uv\n        |  FROM TABLE(TUMBLE(TABLE MyTable, DESCRIPTOR(rowtime), INTERVAL '15' MINUTE))\n        |  GROUP BY a, window_start, window_end, window_time\n        |) L WHERE EXISTS (\n        |SELECT * FROM (\n        |  SELECT\n        |    a,\n        |    window_start,\n        |    window_end,\n        |    window_time,\n        |    count(*) as cnt,\n        |    count(distinct c) AS uv\n        |  FROM TABLE(TUMBLE(TABLE MyTable2, DESCRIPTOR(rowtime), INTERVAL '15' MINUTE))\n        |  GROUP BY a, window_start, window_end, window_time\n        |) R\n        |WHERE L.window_start = R.window_start AND L.window_end = R.window_end AND L.a = R.a)\n      ")).stripMargin();
        this.util().verifyRelPlan(sql);
    }

    @Test
    public void testAntiJoinNotIN() {
        String sql = new StringOps(Predef$.MODULE$.augmentString("\n        |SELECT * FROM (\n        |  SELECT\n        |    a,\n        |    window_start,\n        |    window_end,\n        |    window_time,\n        |    count(*) as cnt,\n        |    count(distinct c) AS uv\n        |  FROM TABLE(TUMBLE(TABLE MyTable, DESCRIPTOR(rowtime), INTERVAL '15' MINUTE))\n        |  GROUP BY a, window_start, window_end, window_time\n        |) L WHERE L.a NOT IN (\n        |SELECT a FROM (\n        |  SELECT\n        |    a,\n        |    window_start,\n        |    window_end,\n        |    window_time,\n        |    count(*) as cnt,\n        |    count(distinct c) AS uv\n        |  FROM TABLE(TUMBLE(TABLE MyTable2, DESCRIPTOR(rowtime), INTERVAL '15' MINUTE))\n        |  GROUP BY a, window_start, window_end, window_time\n        |) R\n        |WHERE L.window_start = R.window_start AND L.window_end = R.window_end)\n      ")).stripMargin();
        this.util().verifyRelPlan(sql);
    }

    @Test
    public void testAntiJoinNotExist() {
        String sql = new StringOps(Predef$.MODULE$.augmentString("\n        |SELECT * FROM (\n        |  SELECT\n        |    a,\n        |    window_start,\n        |    window_end,\n        |    window_time,\n        |    count(*) as cnt,\n        |    count(distinct c) AS uv\n        |  FROM TABLE(TUMBLE(TABLE MyTable, DESCRIPTOR(rowtime), INTERVAL '15' MINUTE))\n        |  GROUP BY a, window_start, window_end, window_time\n        |) L WHERE NOT EXISTS (\n        |SELECT * FROM (\n        |  SELECT\n        |    a,\n        |    window_start,\n        |    window_end,\n        |    window_time,\n        |    count(*) as cnt,\n        |    count(distinct c) AS uv\n        |  FROM TABLE(TUMBLE(TABLE MyTable2, DESCRIPTOR(rowtime), INTERVAL '15' MINUTE))\n        |  GROUP BY a, window_start, window_end, window_time\n        |) R\n        |WHERE L.window_start = R.window_start AND L.window_end = R.window_end AND L.a = R.a)\n      ")).stripMargin();
        this.util().verifyRelPlan(sql);
    }

    @Test
    public void testJoinWithIsNotDistinctFrom() {
        String sql = new StringOps(Predef$.MODULE$.augmentString("\n        |SELECT L.*, R.*\n        |FROM (\n        |  SELECT\n        |    a,\n        |    window_start,\n        |    window_end,\n        |    window_time,\n        |    count(*) as cnt,\n        |    count(distinct c) AS uv\n        |  FROM TABLE(TUMBLE(TABLE MyTable, DESCRIPTOR(rowtime), INTERVAL '15' MINUTE))\n        |  GROUP BY a, window_start, window_end, window_time\n        |) L\n        |JOIN (\n        |  SELECT\n        |    a,\n        |    window_start,\n        |    window_end,\n        |    window_time,\n        |    count(*) as cnt,\n        |    count(distinct c) AS uv\n        |  FROM TABLE(TUMBLE(TABLE MyTable2, DESCRIPTOR(rowtime), INTERVAL '15' MINUTE))\n        |  GROUP BY a, window_start, window_end, window_time\n        |) R\n        |ON L.window_start = R.window_start AND L.window_end = R.window_end AND\n        |L.a IS NOT DISTINCT FROM R.a\n      ")).stripMargin();
        this.util().verifyRelPlan(sql);
    }

    @Test
    public void testJoinToMultiSink() {
        String sourceDdl = new StringOps(Predef$.MODULE$.augmentString("\n        |CREATE TABLE food_order (\n        | user_id STRING,\n        | order_id STRING,\n        | amount INT,\n        | event_time TIMESTAMP(3),\n        | WATERMARK FOR event_time AS event_time\n        |) WITH (\n        |'connector' = 'values')\n        |")).stripMargin();
        this.util().tableEnv().executeSql(sourceDdl);
        String query = new StringOps(Predef$.MODULE$.augmentString("\n        |CREATE TEMPORARY VIEW food_view AS\n        |WITH food AS ( \n        |  SELECT user_id, \n        |         window_start, \n        |         window_end \n        |  FROM TABLE(TUMBLE(TABLE food_order, DESCRIPTOR(event_time), INTERVAL '1' MINUTES)) \n        |  GROUP BY \n        |  user_id,\n        |  window_start,\n        |  window_end)\n        |SELECT food.window_start\n        |     ,food.window_end\n        |     ,food.user_id\n        |     ,DATE_FORMAT(food.window_end + INTERVAL '7' HOUR, 'yyyyMMdd') AS dt\n        |     ,DATE_FORMAT(food.window_end + INTERVAL '7' HOUR, 'HH') AS `hour`\n        |FROM food\n        |LEFT JOIN food AS a ON food.user_id = a.user_id\n        |AND food.window_start = a.window_start\n        |AND food.window_end = a.window_end\n        |")).stripMargin();
        this.util().tableEnv().executeSql(query);
        String sinkDdl = new StringOps(Predef$.MODULE$.augmentString("\n        |CREATE TABLE %s (\n        | window_start TIMESTAMP(3),\n        | window_end TIMESTAMP(3),\n        | user_id STRING,\n        | dt STRING,\n        | `hour` STRING\n        |) WITH (\n        | 'connector' = 'values')\n        |")).stripMargin();
        this.util().tableEnv().executeSql(new StringOps(Predef$.MODULE$.augmentString(sinkDdl)).format((Seq)Predef$.MODULE$.genericWrapArray((Object)new Object[]{"sink1"})));
        this.util().tableEnv().executeSql(new StringOps(Predef$.MODULE$.augmentString(sinkDdl)).format((Seq)Predef$.MODULE$.genericWrapArray((Object)new Object[]{"sink2"})));
        StatementSet statementSet = this.util().tableEnv().createStatementSet();
        statementSet.addInsertSql("INSERT INTO sink1 SELECT * FROM food_view");
        statementSet.addInsertSql("INSERT INTO sink2 SELECT * FROM food_view");
        this.util().verifyRelPlan(statementSet);
    }

    public WindowJoinTest() {
        this.util().tableEnv().executeSql(new StringOps(Predef$.MODULE$.augmentString("\n                              |CREATE TABLE MyTable (\n                              |  a INT,\n                              |  b STRING NOT NULL,\n                              |  c BIGINT,\n                              |  rowtime TIMESTAMP(3),\n                              |  proctime as PROCTIME(),\n                              |  WATERMARK FOR rowtime AS rowtime - INTERVAL '1' SECOND\n                              |) with (\n                              |  'connector' = 'values'\n                              |)\n                              |")).stripMargin());
        this.util().tableEnv().executeSql(new StringOps(Predef$.MODULE$.augmentString("\n                              |CREATE TABLE MyTable2 (\n                              |  a INT,\n                              |  b STRING NOT NULL,\n                              |  c BIGINT,\n                              |  rowtime TIMESTAMP(3),\n                              |  proctime as PROCTIME(),\n                              |  WATERMARK FOR rowtime AS rowtime - INTERVAL '1' SECOND\n                              |) with (\n                              |  'connector' = 'values'\n                              |)\n                              |")).stripMargin());
    }
}

