/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.table.planner.plan.stream.sql;

import org.apache.flink.table.api.TableException;
import org.apache.flink.table.planner.plan.utils.JavaUserDefinedAggFunctions;
import org.apache.flink.table.planner.utils.StreamTableTestUtil;
import org.apache.flink.table.planner.utils.TableTestBase;
import org.junit.Test;
import scala.Predef$;
import scala.collection.immutable.StringOps;
import scala.reflect.ScalaSignature;

@ScalaSignature(bytes="\u0006\u0001A4A!\u0001\u0002\u0001'\tqq+\u001b8e_^\u0014\u0016M\\6UKN$(BA\u0002\u0005\u0003\r\u0019\u0018\u000f\u001c\u0006\u0003\u000b\u0019\taa\u001d;sK\u0006l'BA\u0004\t\u0003\u0011\u0001H.\u00198\u000b\u0005%Q\u0011a\u00029mC:tWM\u001d\u0006\u0003\u00171\tQ\u0001^1cY\u0016T!!\u0004\b\u0002\u000b\u0019d\u0017N\\6\u000b\u0005=\u0001\u0012AB1qC\u000eDWMC\u0001\u0012\u0003\ry'oZ\u0002\u0001'\t\u0001A\u0003\u0005\u0002\u001615\taC\u0003\u0002\u0018\u0011\u0005)Q\u000f^5mg&\u0011\u0011D\u0006\u0002\u000e)\u0006\u0014G.\u001a+fgR\u0014\u0015m]3\t\u000bm\u0001A\u0011\u0001\u000f\u0002\rqJg.\u001b;?)\u0005i\u0002C\u0001\u0010\u0001\u001b\u0005\u0011\u0001b\u0002\u0011\u0001\u0005\u0004%I!I\u0001\u0005kRLG.F\u0001#!\t)2%\u0003\u0002%-\t\u00192\u000b\u001e:fC6$\u0016M\u00197f)\u0016\u001cH/\u0016;jY\"1a\u0005\u0001Q\u0001\n\t\nQ!\u001e;jY\u0002BQ\u0001\u000b\u0001\u0005\u0002%\n1\u0007^3tiNKW\u000e\u001d7jMf$V/\u001c2mK^Kg\u000eZ8x)Z3%)\u001a4pe\u0016<\u0016N\u001c3poJ\u000bgn[,ji\"\u001c\u0015\r\\2\u0015\u0003)\u0002\"a\u000b\u0018\u000e\u00031R\u0011!L\u0001\u0006g\u000e\fG.Y\u0005\u0003_1\u0012A!\u00168ji\"\u0012q%\r\t\u0003eUj\u0011a\r\u0006\u0003iA\tQA[;oSRL!AN\u001a\u0003\tQ+7\u000f\u001e\u0005\u0006q\u0001!\t!K\u0001,i\u0016\u001cHoU5na2Lg-\u001f+v[\ndWmV5oI><HK\u0016$CK\u001a|'/Z,j]\u0012|wOU1oW\"\u0012q'\r\u0005\u0006w\u0001!\t!K\u0001*i\u0016\u001cH/\u00168tkB\u0004xN\u001d;fI^Kg\u000eZ8x)Z3u\fV;nE2,wJ\u001c)s_\u000e$\u0018.\\3)\u0005i\n\u0004\"\u0002 \u0001\t\u0003I\u0013\u0001\r;fgR\u001c\u0016.\u001c9mS\u001aL\bj\u001c9XS:$wn\u001e+W\r\n+gm\u001c:f/&tGm\\<SC:\\w+\u001b;i\u0007\u0006d7\r\u000b\u0002>c!)\u0011\t\u0001C\u0001S\u0005AC/Z:u'&l\u0007\u000f\\5gs\"{\u0007oV5oI><HK\u0016$CK\u001a|'/Z,j]\u0012|wOU1oW\"\u0012\u0001)\r\u0005\u0006\t\u0002!\t!K\u0001'i\u0016\u001cH/\u00168tkB\u0004xN\u001d;fI^Kg\u000eZ8x)Z3u\fS8q\u001f:\u0004&o\\2uS6,\u0007FA\"2\u0011\u00159\u0005\u0001\"\u0001*\u0003U\"Xm\u001d;TS6\u0004H.\u001b4z\u0007VlW\u000f\\1uK^Kg\u000eZ8x)Z3%)\u001a4pe\u0016<\u0016N\u001c3poJ\u000bgn[,ji\"\u001c\u0015\r\\2)\u0005\u0019\u000b\u0004\"\u0002&\u0001\t\u0003I\u0013!\f;fgR\u001c\u0016.\u001c9mS\u001aL8)^7vY\u0006$XmV5oI><HK\u0016$CK\u001a|'/Z,j]\u0012|wOU1oW\"\u0012\u0011*\r\u0005\u0006\u001b\u0002!\t!K\u0001,i\u0016\u001cH/\u00168tkB\u0004xN\u001d;fI^Kg\u000eZ8x)Z3ulQ;nk2\fG/Z(o!J|7\r^5nK\"\u0012A*\r\u0005\u0006!\u0002!\t!K\u0001\u001ci\u0016\u001cHo\u00148Uk6\u0014G.Z,j]\u0012|w/Q4he\u0016<\u0017\r^3)\u0005=\u000b\u0004\"B*\u0001\t\u0003I\u0013!\n;fgR|e\u000eV;nE2,w+\u001b8e_^\fum\u001a:fO\u0006$Xm\u00148Qe>\u001cG/[7fQ\t\u0011\u0016\u0007C\u0003W\u0001\u0011\u0005\u0011&\u0001\ruKN$xJ\u001c%pa^Kg\u000eZ8x\u0003\u001e<'/Z4bi\u0016D#!V\u0019\t\u000be\u0003A\u0011A\u0015\u0002EQ,7\u000f^(o\u0011>\u0004x+\u001b8e_^\fum\u001a:fO\u0006$Xm\u00148Qe>\u001cG/[7fQ\tA\u0016\u0007C\u0003]\u0001\u0011\u0005\u0011&A\u000fuKN$xJ\\\"v[Vd\u0017\r^3XS:$wn^!hOJ,w-\u0019;fQ\tY\u0016\u0007C\u0003`\u0001\u0011\u0005\u0011&A\u0014uKN$xJ\\\"v[Vd\u0017\r^3XS:$wn^!hOJ,w-\u0019;f\u001f:\u0004&o\\2uS6,\u0007F\u000102\u0011\u0015\u0011\u0007\u0001\"\u0001*\u0003\u001d\"Xm\u001d;US6,\u0017\t\u001e;sS\n,H/\u001a)s_B\fw-\u0019;f\r>\u0014x+\u001b8e_^\u0014\u0016M\\6)\u0005\u0005\f\u0004\"B3\u0001\t\u0003I\u0013\u0001\u000b;fgR$\u0016.\\3BiR\u0014\u0018NY;uKB\u0013x\u000e]1hCR,gi\u001c:XS:$wn\u001e*b].\f\u0004F\u000132\u0011\u0015A\u0007\u0001\"\u0001*\u0003A!Xm\u001d;SC:\\g)\u001e8di&|g\u000e\u000b\u0002hc!)1\u000e\u0001C\u0001S\u0005)B/Z:u\t\u0016t7/\u001a*b].4UO\\2uS>t\u0007F\u000162\u0011\u0015q\u0007\u0001\"\u0001*\u0003U!Xm\u001d;WCJL\u0017M\u00197f%\u0006t7NU1oO\u0016D#!\\\u0019")
public class WindowRankTest
extends TableTestBase {
    private final StreamTableTestUtil util = this.streamTestUtil(this.streamTestUtil$default$1());

    private StreamTableTestUtil util() {
        return this.util;
    }

    @Test
    public void testSimplifyTumbleWindowTVFBeforeWindowRankWithCalc() {
        String sql = new StringOps(Predef$.MODULE$.augmentString("\n        |SELECT window_start, window_end, window_time, a, b, c, d, e\n        |FROM (\n        |SELECT *,\n        |   ROW_NUMBER() OVER(PARTITION BY a, window_start, window_end ORDER BY b DESC) as rownum\n        |FROM TABLE(TUMBLE(TABLE MyTable, DESCRIPTOR(rowtime), INTERVAL '15' MINUTE))\n        |)\n        |WHERE rownum <= 3\n      ")).stripMargin();
        this.util().verifyRelPlan(sql);
    }

    @Test
    public void testSimplifyTumbleWindowTVFBeforeWindowRank() {
        String sql = new StringOps(Predef$.MODULE$.augmentString("\n        |SELECT *\n        |FROM (\n        |SELECT *,\n        |   ROW_NUMBER() OVER(PARTITION BY a, window_start, window_end ORDER BY b DESC) as rownum\n        |FROM TABLE(TUMBLE(TABLE MyTable, DESCRIPTOR(rowtime), INTERVAL '15' MINUTE))\n        |)\n        |WHERE rownum <= 3\n      ")).stripMargin();
        this.util().verifyRelPlan(sql);
    }

    @Test
    public void testUnsupportedWindowTVF_TumbleOnProctime() {
        String sql = new StringOps(Predef$.MODULE$.augmentString("\n        |SELECT window_start, window_end, window_time, a, b, c, d, e\n        |FROM (\n        |SELECT *,\n        |   ROW_NUMBER() OVER(PARTITION BY a, window_start, window_end ORDER BY b DESC) as rownum\n        |FROM TABLE(TUMBLE(TABLE MyTable, DESCRIPTOR(proctime), INTERVAL '15' MINUTE))\n        |)\n        |WHERE rownum <= 3\n      ")).stripMargin();
        this.thrown().expectMessage("Processing time Window TopN is not supported yet.");
        this.thrown().expect(TableException.class);
        this.util().verifyExplain(sql);
    }

    @Test
    public void testSimplifyHopWindowTVFBeforeWindowRankWithCalc() {
        String sql = new StringOps(Predef$.MODULE$.augmentString("\n        |SELECT window_start, window_end, window_time, a, b, c, d, e\n        |FROM (\n        |SELECT *,\n        |   ROW_NUMBER() OVER(PARTITION BY a, window_start, window_end ORDER BY b DESC) as rownum\n        |FROM TABLE(\n        |  HOP(TABLE MyTable, DESCRIPTOR(rowtime), INTERVAL '5' MINUTE, INTERVAL '10' MINUTE))\n        |)\n        |WHERE rownum <= 3\n      ")).stripMargin();
        this.util().verifyRelPlan(sql);
    }

    @Test
    public void testSimplifyHopWindowTVFBeforeWindowRank() {
        String sql = new StringOps(Predef$.MODULE$.augmentString("\n        |SELECT *\n        |FROM (\n        |SELECT *,\n        |   ROW_NUMBER() OVER(PARTITION BY a, window_start, window_end ORDER BY b DESC) as rownum\n        |FROM TABLE(\n        |  HOP(TABLE MyTable, DESCRIPTOR(rowtime), INTERVAL '5' MINUTE, INTERVAL '10' MINUTE))\n        |)\n        |WHERE rownum <= 3\n      ")).stripMargin();
        this.util().verifyRelPlan(sql);
    }

    @Test
    public void testUnsupportedWindowTVF_HopOnProctime() {
        String sql = new StringOps(Predef$.MODULE$.augmentString("\n        |SELECT window_start, window_end, window_time, a, b, c, d, e\n        |FROM (\n        |SELECT *,\n        |   ROW_NUMBER() OVER(PARTITION BY a, window_start, window_end ORDER BY b DESC) as rownum\n        |FROM TABLE(\n        |  HOP(TABLE MyTable, DESCRIPTOR(proctime), INTERVAL '5' MINUTE, INTERVAL '10' MINUTE))\n        |)\n        |WHERE rownum <= 3\n      ")).stripMargin();
        this.thrown().expectMessage("Processing time Window TopN is not supported yet.");
        this.thrown().expect(TableException.class);
        this.util().verifyExplain(sql);
    }

    @Test
    public void testSimplifyCumulateWindowTVFBeforeWindowRankWithCalc() {
        String sql = new StringOps(Predef$.MODULE$.augmentString("\n        |SELECT window_start, window_end, window_time, a, b, c, d, e\n        |FROM (\n        |SELECT *,\n        |   ROW_NUMBER() OVER(PARTITION BY a, window_start, window_end ORDER BY b DESC) as rownum\n        |FROM TABLE(\n        |  CUMULATE(TABLE MyTable, DESCRIPTOR(rowtime), INTERVAL '10' MINUTE, INTERVAL '1' HOUR))\n        |)\n        |WHERE rownum <= 3\n      ")).stripMargin();
        this.util().verifyRelPlan(sql);
    }

    @Test
    public void testSimplifyCumulateWindowTVFBeforeWindowRank() {
        String sql = new StringOps(Predef$.MODULE$.augmentString("\n        |SELECT *\n        |FROM (\n        |SELECT *,\n        |   ROW_NUMBER() OVER(PARTITION BY a, window_start, window_end ORDER BY b DESC) as rownum\n        |FROM TABLE(\n        |  CUMULATE(TABLE MyTable, DESCRIPTOR(rowtime), INTERVAL '10' MINUTE, INTERVAL '1' HOUR))\n        |)\n        |WHERE rownum <= 3\n      ")).stripMargin();
        this.util().verifyRelPlan(sql);
    }

    @Test
    public void testUnsupportedWindowTVF_CumulateOnProctime() {
        String sql = new StringOps(Predef$.MODULE$.augmentString("\n        |SELECT window_start, window_end, window_time, a, b, c, d, e\n        |FROM (\n        |SELECT *,\n        |   ROW_NUMBER() OVER(PARTITION BY a, window_start, window_end ORDER BY b DESC) as rownum\n        |FROM TABLE(\n        |  CUMULATE(TABLE MyTable, DESCRIPTOR(proctime), INTERVAL '10' MINUTE, INTERVAL '1' HOUR))\n        |)\n        |WHERE rownum <= 3\n      ")).stripMargin();
        this.thrown().expectMessage("Processing time Window TopN is not supported yet.");
        this.thrown().expect(TableException.class);
        this.util().verifyExplain(sql);
    }

    @Test
    public void testOnTumbleWindowAggregate() {
        String sql = new StringOps(Predef$.MODULE$.augmentString("\n        |SELECT window_start, window_end, window_time, a, cnt, sum_d, max_d, wAvg, uv\n        |FROM (\n        |SELECT *,\n        |   ROW_NUMBER() OVER(PARTITION BY window_start, window_end ORDER BY cnt DESC) as rownum\n        |FROM (\n        |  SELECT\n        |    a,\n        |    window_start,\n        |    window_end,\n        |    window_time,\n        |    count(*) as cnt,\n        |    sum(d) as sum_d,\n        |    max(d) filter (where b > 1000) as max_d,\n        |    weightedAvg(b, e) AS wAvg,\n        |    count(distinct c) AS uv\n        |  FROM TABLE(TUMBLE(TABLE MyTable, DESCRIPTOR(rowtime), INTERVAL '15' MINUTE))\n        |  GROUP BY a, window_start, window_end, window_time\n        |  )\n        |)\n        |WHERE rownum <= 3\n      ")).stripMargin();
        this.util().verifyRelPlan(sql);
    }

    @Test
    public void testOnTumbleWindowAggregateOnProctime() {
        String sql = new StringOps(Predef$.MODULE$.augmentString("\n        |SELECT window_start, window_end, window_time, a, cnt, sum_d, max_d, wAvg, uv\n        |FROM (\n        |SELECT *,\n        |   ROW_NUMBER() OVER(PARTITION BY a, window_start, window_end ORDER BY cnt DESC) as rownum\n        |FROM (\n        |  SELECT\n        |    a,\n        |    window_start,\n        |    window_end,\n        |    window_time,\n        |    count(*) as cnt,\n        |    sum(d) as sum_d,\n        |    max(d) filter (where b > 1000) as max_d,\n        |    weightedAvg(b, e) AS wAvg,\n        |    count(distinct c) AS uv\n        |  FROM TABLE(TUMBLE(TABLE MyTable, DESCRIPTOR(proctime), INTERVAL '15' MINUTE))\n        |  GROUP BY a, window_start, window_end, window_time\n        |  )\n        |)\n        |WHERE rownum <= 3\n      ")).stripMargin();
        this.thrown().expect(TableException.class);
        this.thrown().expectMessage("Processing time Window TopN is not supported yet.");
        this.util().verifyExplain(sql);
    }

    @Test
    public void testOnHopWindowAggregate() {
        String sql = new StringOps(Predef$.MODULE$.augmentString("\n        |SELECT window_start, window_end, window_time, a, cnt, sum_d, max_d, wAvg, uv\n        |FROM (\n        |SELECT *,\n        |   ROW_NUMBER() OVER(PARTITION BY a, window_start, window_end ORDER BY cnt DESC) as rownum\n        |FROM (\n        |  SELECT\n        |    a,\n        |    window_start,\n        |    window_end,\n        |    window_time,\n        |    count(*) as cnt,\n        |    sum(d) as sum_d,\n        |    max(d) filter (where b > 1000) as max_d,\n        |    weightedAvg(b, e) AS wAvg,\n        |    count(distinct c) AS uv\n        |  FROM TABLE(\n        |    HOP(TABLE MyTable, DESCRIPTOR(rowtime), INTERVAL '5' MINUTE, INTERVAL '10' MINUTE))\n        |  GROUP BY a, window_start, window_end, window_time\n        |  )\n        |)\n        |WHERE rownum <= 3\n      ")).stripMargin();
        this.util().verifyRelPlan(sql);
    }

    @Test
    public void testOnHopWindowAggregateOnProctime() {
        String sql = new StringOps(Predef$.MODULE$.augmentString("\n        |SELECT window_start, window_end, window_time, a, cnt, sum_d, max_d, wAvg, uv\n        |FROM (\n        |SELECT *,\n        |   ROW_NUMBER() OVER(PARTITION BY a, window_start, window_end ORDER BY cnt DESC) as rownum\n        |FROM (\n        |  SELECT\n        |    a,\n        |    window_start,\n        |    window_end,\n        |    window_time,\n        |    count(*) as cnt,\n        |    sum(d) as sum_d,\n        |    max(d) filter (where b > 1000) as max_d,\n        |    weightedAvg(b, e) AS wAvg,\n        |    count(distinct c) AS uv\n        |  FROM TABLE(\n        |    HOP(TABLE MyTable, DESCRIPTOR(proctime), INTERVAL '5' MINUTE, INTERVAL '10' MINUTE))\n        |  GROUP BY a, window_start, window_end, window_time\n        |  )\n        |)\n        |WHERE rownum <= 3\n      ")).stripMargin();
        this.thrown().expect(TableException.class);
        this.thrown().expectMessage("Processing time Window TopN is not supported yet.");
        this.util().verifyExplain(sql);
    }

    @Test
    public void testOnCumulateWindowAggregate() {
        String sql = new StringOps(Predef$.MODULE$.augmentString("\n        |SELECT window_start, window_end, window_time, a, cnt, sum_d, max_d, wAvg, uv\n        |FROM (\n        |SELECT *,\n        |   ROW_NUMBER() OVER(\n        |     PARTITION BY window_start, window_end ORDER BY cnt DESC) as rownum\n        |FROM (\n        |  SELECT\n        |    a,\n        |    window_start,\n        |    window_end,\n        |    window_time,\n        |    count(*) as cnt,\n        |    sum(d) as sum_d,\n        |    max(d) filter (where b > 1000) as max_d,\n        |    weightedAvg(b, e) AS wAvg,\n        |    count(distinct c) AS uv\n        |  FROM TABLE(\n        |    CUMULATE(\n        |      TABLE MyTable, DESCRIPTOR(rowtime), INTERVAL '10' MINUTE, INTERVAL '1' HOUR))\n        |  GROUP BY a, window_start, window_end, window_time\n        |  )\n        |)\n        |WHERE rownum <= 3\n      ")).stripMargin();
        this.util().verifyRelPlan(sql);
    }

    @Test
    public void testOnCumulateWindowAggregateOnProctime() {
        String sql = new StringOps(Predef$.MODULE$.augmentString("\n        |SELECT window_start, window_end, window_time, a, cnt, sum_d, max_d, wAvg, uv\n        |FROM (\n        |SELECT *,\n        |   ROW_NUMBER() OVER(\n        |     PARTITION BY a, window_start, window_end ORDER BY cnt DESC) as rownum\n        |FROM (\n        |  SELECT\n        |    a,\n        |    window_start,\n        |    window_end,\n        |    window_time,\n        |    count(*) as cnt,\n        |    sum(d) as sum_d,\n        |    max(d) filter (where b > 1000) as max_d,\n        |    weightedAvg(b, e) AS wAvg,\n        |    count(distinct c) AS uv\n        |  FROM TABLE(\n        |    CUMULATE(\n        |      TABLE MyTable, DESCRIPTOR(proctime), INTERVAL '10' MINUTE, INTERVAL '1' HOUR))\n        |  GROUP BY a, window_start, window_end, window_time\n        |  )\n        |)\n        |WHERE rownum <= 3\n      ")).stripMargin();
        this.thrown().expect(TableException.class);
        this.thrown().expectMessage("Processing time Window TopN is not supported yet.");
        this.util().verifyExplain(sql);
    }

    @Test
    public void testTimeAttributePropagateForWindowRank() {
        this.util().tableEnv().executeSql(new StringOps(Predef$.MODULE$.augmentString("\n        |CREATE VIEW tmp AS\n        |SELECT window_time as rowtime, a, b, c, d, e\n        |FROM (\n        |SELECT *,\n        |   ROW_NUMBER() OVER(PARTITION BY a, window_start, window_end ORDER BY b DESC) as rownum\n        |FROM TABLE(TUMBLE(TABLE MyTable, DESCRIPTOR(rowtime), INTERVAL '15' MINUTE))\n        |)\n        |WHERE rownum <= 3\n      ")).stripMargin());
        String sql = new StringOps(Predef$.MODULE$.augmentString("\n        |SELECT\n        |   a,\n        |   window_start,\n        |   window_end,\n        |   count(*),\n        |   sum(d),\n        |   max(d) filter (where b > 1000),\n        |   weightedAvg(b, e) AS wAvg,\n        |   count(distinct c) AS uv\n        |FROM TABLE(TUMBLE(TABLE tmp, DESCRIPTOR(rowtime), INTERVAL '15' MINUTE))\n        |GROUP BY a, window_start, window_end\n      ")).stripMargin();
        this.util().verifyRelPlan(sql);
    }

    @Test
    public void testTimeAttributePropagateForWindowRank1() {
        this.util().tableEnv().executeSql(new StringOps(Predef$.MODULE$.augmentString("\n        |CREATE VIEW tmp1 AS\n        |SELECT window_time as rowtime, a, cnt, sum_d, max_d, wAvg, uv\n        |FROM (\n        |  SELECT *,\n        |    ROW_NUMBER() OVER(PARTITION BY a, window_start, window_end ORDER BY cnt DESC) as rownum\n        |  FROM (\n        |    SELECT\n        |      a,\n        |      window_start,\n        |      window_end,\n        |      window_time,\n        |      count(*) as cnt,\n        |      sum(d) as sum_d,\n        |      max(d) filter (where b > 1000) as max_d,\n        |      weightedAvg(b, e) AS wAvg,\n        |      count(distinct c) AS uv\n        |    FROM TABLE(TUMBLE(TABLE MyTable, DESCRIPTOR(rowtime), INTERVAL '15' MINUTE))\n        |    GROUP BY a, window_start, window_end, window_time\n        |  )\n        |)\n        |WHERE rownum <= 3\n      ")).stripMargin());
        String sql = new StringOps(Predef$.MODULE$.augmentString("\n        |SELECT\n        |   a,\n        |   window_start,\n        |   window_end,\n        |   sum(cnt),\n        |   sum(sum_d),\n        |   max(max_d)\n        |FROM TABLE(TUMBLE(TABLE tmp1, DESCRIPTOR(rowtime), INTERVAL '1' HOUR))\n        |GROUP BY a, window_start, window_end\n      ")).stripMargin();
        this.util().verifyRelPlan(sql);
    }

    @Test
    public void testRankFunction() {
        String sql = new StringOps(Predef$.MODULE$.augmentString("\n        |SELECT window_start, window_end, window_time, a, cnt, sum_d, max_d, wAvg, uv\n        |FROM (\n        |SELECT *,\n        |   RANK() OVER(PARTITION BY window_start, window_end ORDER BY cnt DESC) as rownum\n        |FROM (\n        |  SELECT\n        |    a,\n        |    window_start,\n        |    window_end,\n        |    window_time,\n        |    count(*) as cnt,\n        |    sum(d) as sum_d,\n        |    max(d) filter (where b > 1000) as max_d,\n        |    weightedAvg(b, e) AS wAvg,\n        |    count(distinct c) AS uv\n        |  FROM TABLE(TUMBLE(TABLE MyTable, DESCRIPTOR(rowtime), INTERVAL '15' MINUTE))\n        |  GROUP BY a, window_start, window_end, window_time\n        |  )\n        |)\n        |WHERE rownum <= 3\n      ")).stripMargin();
        this.thrown().expect(TableException.class);
        this.thrown().expectMessage("RANK() function is not supported on Window TopN currently, only ROW_NUMBER() is supported.");
        this.util().verifyExplain(sql);
    }

    @Test
    public void testDenseRankFunction() {
        String sql = new StringOps(Predef$.MODULE$.augmentString("\n        |SELECT window_start, window_end, window_time, a, cnt, sum_d, max_d, wAvg, uv\n        |FROM (\n        |SELECT *,\n        |   DENSE_RANK() OVER(PARTITION BY window_start, window_end ORDER BY cnt DESC) as rownum\n        |FROM (\n        |  SELECT\n        |    a,\n        |    window_start,\n        |    window_end,\n        |    window_time,\n        |    count(*) as cnt,\n        |    sum(d) as sum_d,\n        |    max(d) filter (where b > 1000) as max_d,\n        |    weightedAvg(b, e) AS wAvg,\n        |    count(distinct c) AS uv\n        |  FROM TABLE(TUMBLE(TABLE MyTable, DESCRIPTOR(rowtime), INTERVAL '15' MINUTE))\n        |  GROUP BY a, window_start, window_end, window_time\n        |  )\n        |)\n        |WHERE rownum <= 3\n      ")).stripMargin();
        this.thrown().expect(TableException.class);
        this.thrown().expectMessage("DENSE_RANK() function is not supported on Window TopN currently, only ROW_NUMBER() is supported.");
        this.util().verifyExplain(sql);
    }

    @Test
    public void testVariableRankRange() {
        String sql = new StringOps(Predef$.MODULE$.augmentString("\n        |SELECT window_start, window_end, window_time, a, cnt, sum_d, max_d, wAvg, uv\n        |FROM (\n        |SELECT *,\n        |   ROW_NUMBER() OVER(PARTITION BY window_start, window_end ORDER BY cnt DESC) as rownum\n        |FROM (\n        |  SELECT\n        |    a,\n        |    window_start,\n        |    window_end,\n        |    window_time,\n        |    count(*) as cnt,\n        |    sum(d) as sum_d,\n        |    max(d) filter (where b > 1000) as max_d,\n        |    max(b) as max_b,\n        |    weightedAvg(b, e) AS wAvg,\n        |    count(distinct c) AS uv\n        |  FROM TABLE(TUMBLE(TABLE MyTable, DESCRIPTOR(rowtime), INTERVAL '15' MINUTE))\n        |  GROUP BY a, window_start, window_end, window_time\n        |  )\n        |)\n        |WHERE rownum <= max_b\n      ")).stripMargin();
        this.thrown().expect(TableException.class);
        this.thrown().expectMessage("Rank strategy rankEnd=max_b is not supported on window rank currently.");
        this.util().verifyExplain(sql);
    }

    public WindowRankTest() {
        this.util().addTemporarySystemFunction("weightedAvg", JavaUserDefinedAggFunctions.WeightedAvgWithMerge.class);
        this.util().tableEnv().executeSql(new StringOps(Predef$.MODULE$.augmentString("\n                              |CREATE TABLE MyTable (\n                              |  a INT,\n                              |  b BIGINT,\n                              |  c STRING NOT NULL,\n                              |  d DECIMAL(10, 3),\n                              |  e BIGINT,\n                              |  rowtime TIMESTAMP(3),\n                              |  proctime as PROCTIME(),\n                              |  WATERMARK FOR rowtime AS rowtime - INTERVAL '1' SECOND\n                              |) with (\n                              |  'connector' = 'values'\n                              |)\n                              |")).stripMargin());
    }
}

