/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.table.planner.plan.stream.sql;

import java.io.Serializable;
import java.lang.invoke.MethodHandle;
import java.lang.invoke.SerializedLambda;
import org.apache.flink.api.common.ExecutionConfig;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.api.scala.typeutils.CaseClassTypeInfo;
import org.apache.flink.api.scala.typeutils.ScalaCaseClassSerializer;
import org.apache.flink.table.api.ExplainDetail;
import org.apache.flink.table.api.StatementSet;
import org.apache.flink.table.api.TableException;
import org.apache.flink.table.api.ValidationException;
import org.apache.flink.table.api.package$;
import org.apache.flink.table.expressions.Expression;
import org.apache.flink.table.planner.plan.optimize.RelNodeBlockPlanBuilder$;
import org.apache.flink.table.planner.plan.stream.sql.RankTest$;
import org.apache.flink.table.planner.utils.StreamTableTestUtil;
import org.apache.flink.table.planner.utils.TableTestBase;
import org.junit.Test;
import scala.Function1;
import scala.Predef$;
import scala.Symbol;
import scala.Tuple3;
import scala.Tuple4;
import scala.collection.Seq;
import scala.collection.immutable.StringOps;
import scala.reflect.ScalaSignature;
import scala.runtime.BoxesRunTime;
import scala.runtime.LambdaDeserialize;
import scala.runtime.RichInt$;
import scala.runtime.SymbolLiteral;
import scala.runtime.java8.JFunction1;

@ScalaSignature(bytes="\u0006\u0001\u0005me\u0001B\u0001\u0003\u0001M\u0011\u0001BU1oWR+7\u000f\u001e\u0006\u0003\u0007\u0011\t1a]9m\u0015\t)a!\u0001\u0004tiJ,\u0017-\u001c\u0006\u0003\u000f!\tA\u0001\u001d7b]*\u0011\u0011BC\u0001\ba2\fgN\\3s\u0015\tYA\"A\u0003uC\ndWM\u0003\u0002\u000e\u001d\u0005)a\r\\5oW*\u0011q\u0002E\u0001\u0007CB\f7\r[3\u000b\u0003E\t1a\u001c:h\u0007\u0001\u0019\"\u0001\u0001\u000b\u0011\u0005UAR\"\u0001\f\u000b\u0005]A\u0011!B;uS2\u001c\u0018BA\r\u0017\u00055!\u0016M\u00197f)\u0016\u001cHOQ1tK\")1\u0004\u0001C\u00019\u00051A(\u001b8jiz\"\u0012!\b\t\u0003=\u0001i\u0011A\u0001\u0005\bA\u0001\u0011\r\u0011\"\u0003\"\u0003\u0011)H/\u001b7\u0016\u0003\t\u0002\"!F\u0012\n\u0005\u00112\"aE*ue\u0016\fW\u000eV1cY\u0016$Vm\u001d;Vi&d\u0007B\u0002\u0014\u0001A\u0003%!%A\u0003vi&d\u0007\u0005C\u0003)\u0001\u0011\u0005\u0011&\u0001\ruKN$(+\u00198l\u000b:$W*^:u'B,7-\u001b4jK\u0012$\u0012A\u000b\t\u0003W9j\u0011\u0001\f\u0006\u0002[\u0005)1oY1mC&\u0011q\u0006\f\u0002\u0005+:LG\u000f\u000b\u0002(cA\u0011!'N\u0007\u0002g)\u0011A\u0007E\u0001\u0006UVt\u0017\u000e^\u0005\u0003mM\u0012A\u0001V3ti\")\u0001\b\u0001C\u0001S\u00059B/Z:u%\u0006t7.\u00128e\u0019\u0016\u001c8\u000f\u00165b]j+'o\u001c\u0015\u0003oEBQa\u000f\u0001\u0005\u0002%\nA\u0003^3tiJ\u000bgn[#oI2+7o\u001d+iC:\f\u0004F\u0001\u001e2\u0011\u0015q\u0004\u0001\"\u0001*\u0003a!Xm\u001d;SC:\\g)\u001e8di&|g.\u00138NS\u0012$G.\u001a\u0015\u0003{EBQ!\u0011\u0001\u0005\u0002%\n1\u0007^3tiJ{wOT;nE\u0016\u0014x+\u001b;i%\u0006t7.\u00128e\u0019\u0016\u001c8\u000f\u00165b]Fz%\u000fZ3s\u0005f\u0004&o\\2uS6,\u0017i]2)\u0005\u0001\u000b\u0004\"\u0002#\u0001\t\u0003I\u0013\u0001\u000e;fgR\u0014vn\u001e(v[\n,'oV5uQJ\u000bgn[#oI2+7o\u001d+iC:\ftJ\u001d3fe\nK\bK]8di&lW\rR3tG\"\u00121)\r\u0005\u0006\u000f\u0002!\t!K\u00013i\u0016\u001cHOU8x\u001dVl'-\u001a:XSRD'+\u00198l\u000b:$G*Z:t)\"\fg.M(sI\u0016\u0014()\u001f*poRLW.Z!tG\"\u0012a)\r\u0005\u0006\u0015\u0002!\t!K\u00014i\u0016\u001cHOU8x\u001dVl'-\u001a:XSRD'+\u00198l\
u000b:$G*Z:t)\"\fg.M(sI\u0016\u0014()\u001f*poRLW.\u001a#fg\u000eD#!S\u0019\t\u000b5\u0003A\u0011A\u0015\u0002]Q,7\u000f\u001e*b].<\u0016\u000e\u001e5SC:\\WI\u001c3MKN\u001cH\u000b[1oc=\u0013H-\u001a:CsB\u0013xn\u0019;j[\u0016\f5o\u0019\u0015\u0003\u0019FBQ\u0001\u0015\u0001\u0005\u0002%\nq\u0006^3tiJ\u000bgn[,ji\"\u0014\u0016M\\6F]\u0012dUm]:UQ\u0006t\u0017g\u0014:eKJ\u0014\u0015\u0010\u0015:pGRLW.\u001a#fg\u000eD#aT\u0019\t\u000bM\u0003A\u0011A\u0015\u00027Q,7\u000f\u001e*po:+XNY3s/&$\bnT;u\u001fJ$WM\u001d\"zQ\u0011\u0011\u0016'\u0016,\u0002\u0011\u0015D\b/Z2uK\u0012\u001c\u0013a\u0016\t\u00031\u0002t!!\u00170\u000f\u0005ikV\"A.\u000b\u0005q\u0013\u0012A\u0002\u001fs_>$h(C\u0001.\u0013\tyF&A\u0004qC\u000e\\\u0017mZ3\n\u0005\u0005\u0014'\u0001\u0005*v]RLW.Z#yG\u0016\u0004H/[8o\u0015\tyF\u0006C\u0003e\u0001\u0011\u0005\u0011&\u0001\fuKN$(+\u00198l/&$\bnT;u\u001fJ$WM\u001d\"zQ\u0011\u0019\u0017'\u00164$\u0003\u001d\u0004\"\u0001[6\u000e\u0003%T!A\u001b\u0006\u0002\u0007\u0005\u0004\u0018.\u0003\u0002mS\n\u0019b+\u00197jI\u0006$\u0018n\u001c8Fq\u000e,\u0007\u000f^5p]\")a\u000e\u0001C\u0001S\u0005YB/Z:u\t\u0016t7/\u001a*b].<\u0016\u000e\u001e5PkR|%\u000fZ3s\u0005fDC!\\\u0019VM\")\u0011\u000f\u0001C\u0001S\u0005aB/Z:u%><h*^7cKJ<\u0016\u000e\u001e5Nk2$\u0018n\u0012:pkB\u001c\b\u0006\u000292+ZCQ\u0001\u001e\u0001\u0005\u0002%\nq\u0003^3tiJ\u000bgn[,ji\"lU\u000f\u001c;j\u000fJ|W\u000f]:)\tM\fTK\u001a\u0005\u0006o\u0002!\t!K\u0001\u001di\u0016\u001cH\u000fR3og\u0016\u0014\u0016M\\6XSRDW*\u001e7uS\u001e\u0013x.\u001e9tQ\u00111\u0018'\u00164\t\u000bi\u0004A\u0011A\u0015\u0002\u0011Q,7\u000f\u001e+pa:C#!_\u0019\t\u000bu\u0004A\u0011A\u0015\u0002\u0013Q,7\u000f\u001e+pa:\u0013\u0004F\u0001?2\u0011\u0019\t\t\u0001\u0001C\u0001S\u0005QA/Z:u)>\u0004h\n\u001e5)\u0005}\f\u0004BBA\u0004\u0001\u0011\u0005\u0011&\u0001\nuKN$Hk\u001c9O/&$\bNR5mi\u0016\u0014\bfAA\u0003c!1\u0011Q\u0002\u0001\u0005\u0002%\n\u0001\u0003^3tiR{\u0007OT!gi\u0016\u0014\u0018iZ4)\u0007\u0005-\u0011\u0007\u0003\u
0004\u0002\u0014\u0001!\t!K\u0001\u0017i\u0016\u001cH\u000fV8q\u001d^KG\u000f[&fs\u000eC\u0017M\\4fI\"\u001a\u0011\u0011C\u0019\t\r\u0005e\u0001\u0001\"\u0001*\u0003e!Xm\u001d;V]\u0006\u0014\u0018pU8siR{\u0007OT(o'R\u0014\u0018N\\4)\u0007\u0005]\u0011\u0007\u0003\u0004\u0002 \u0001!\t!K\u0001\u0015i\u0016\u001cH\u000fV8q\u001d>\u0013H-\u001a:Cs\u000e{WO\u001c;)\u0007\u0005u\u0011\u0007\u0003\u0004\u0002&\u0001!\t!K\u0001\u001bi\u0016\u001cH\u000fV8q\u001d>\u0013H-\u001a:CsN+XnV5uQ\u000e{g\u000e\u001a\u0015\u0004\u0003G\t\u0004BBA\u0016\u0001\u0011\u0005\u0011&\u0001\u0010uKN$Hk\u001c9O\u001fJ$WM\u001d\"z'Vlw+\u001b;i\u0007\u0006\u001cXm\u00165f]\"\u001a\u0011\u0011F\u0019\t\r\u0005E\u0002\u0001\"\u0001*\u0003a!Xm\u001d;U_BtuJ\u001d3fe\nK8+^7XSRD\u0017J\u001a\u0015\u0004\u0003_\t\u0004BBA\u001c\u0001\u0011\u0005\u0011&\u0001\u0012uKN$Hk\u001c9O\u001fJ$WM\u001d\"z'Vlw+\u001b;i\r&dG/\u001a:DY\u0006,8/\u001a\u0015\u0004\u0003k\t\u0004BBA\u001f\u0001\u0011\u0005\u0011&A\u0012uKN$Hk\u001c9O\u001fJ$WM\u001d\"z'Vlw+\u001b;i\r&dG/\u001a:DY\u0006,8/\u001a\u001a)\u0007\u0005m\u0012\u0007\u0003\u0004\u0002D\u0001!\t!K\u0001\"i\u0016\u001cH\u000fV8q\u001d>\u0013H-\u001a:Cs\u000e{WO\u001c;B]\u0012|E\u000f[3s\r&,G\u000e\u001a\u0015\u0004\u0003\u0003\n\u0004BBA%\u0001\u0011\u0005\u0011&\u0001\u0010uKN$Hk\u001c9O/&$\bn\u0012:pkB\u0014\u0015pQ8ogR\fg\u000e^&fs\"\u001a\u0011qI\u0019\t\r\u0005=\u0003\u0001\"\u0001*\u00039!Xm\u001d;OKN$X\r\u001a+pa:C3!!\u00142\u0011\u0019\t)\u0006\u0001C\u0001S\u00059B/Z:u)>\u0004hJR8s-\u0006\u0014\u0018.\u00192mKNK'0\u001a\u0015\u0006\u0003'\nTK\u001a\u0005\u0007\u00037\u0002A\u0011A\u0015\u00027Q,7\u000f^\"sK\u0006$XMV5fo^KG\u000f\u001b*po:+XNY3sQ\r\tI&\r\u0005\u0007\u0003C\u0002A\u0011A\u0015\u0002/Q,7\u000f^\"peJ,G.\u0019;f'>\u0014H\u000fV8SC:\\\u0007fAA0c!1\u0011q\r\u0001\u0005\u0002%\nA\u0006^3ti\u000e{'O]3mCR,7k\u001c:u)>\u0014\u0016M\\6XSRDW*\u001e7uSBdWm\u0012:pkB\\U-_:)\u0007\u0005\u0015\u0014\u0007\u0003\u0004\u0002n\u0001!\t!K\u0001\u001fi\u
0016\u001cHOU1oW^KG\u000f[!o_RDWM\u001d*b].\f5/\u00138qkRD3!a\u001b2\u0011\u0019\t\u0019\b\u0001C\u0001S\u0005\u0019C/Z:u%\u0016$WO\u001c3b]R\u0014\u0016M\\6Ok6\u0014WM]\"pYVlgNU3n_Z,\u0007fAA9c!1\u0011\u0011\u0010\u0001\u0005\u0002%\n\u0001\u0005^3tiV\u0003H-\u0019;bE2,'+\u00198l/&$\b\u000eR3ekBd\u0017nY1uK\"\u001a\u0011qO\u0019\t\r\u0005}\u0004\u0001\"\u0001*\u0003\u0001\"Xm\u001d;Va\u0012\fG/\u00192mKJ\u000bgn[!gi\u0016\u0014Hj\\8lkBTu.\u001b8)\u0007\u0005u\u0014\u0007\u0003\u0004\u0002\u0006\u0002!\t!K\u0001'i\u0016\u001cH/\u00169eCR\f'\r\\3SC:\\\u0017I\u001a;fe&sG/\u001a:nK\u0012L\u0017\r^3TG\u0006t\u0007fAABc!1\u00111\u0012\u0001\u0005\u0002%\nQ\u0005^3tiJ\u000bgn[(viB,H/\u00169tKJ$8*Z=O_Rl\u0015\r^2i'&t7\u000eU6)\u0007\u0005%\u0015\u0007\u0003\u0004\u0002\u0012\u0002!\t!K\u0001 i\u0016\u001cHOU1oW>+H\u000f];u+B\u001cXM\u001d;LKfLenU5oWB[\u0007fAAHc!1\u0011q\u0013\u0001\u0005\u0002%\nQ\u0005^3tiJ\u000bgn[(viB,H\u000fT8tiV\u00038/\u001a:u\u0017\u0016Lx+\u001b;i'&t7\u000eU6)\u0007\u0005U\u0015\u0007")
public class RankTest
extends TableTestBase {
    /** Stream table test utility shared by the test methods, created via {@code streamTestUtil} with its default argument. */
    private final StreamTableTestUtil util = streamTestUtil(streamTestUtil$default$1());

    /** Returns the utility instance stored in the {@code util} field. */
    private StreamTableTestUtil util() {
        return util;
    }

    /**
     * Expects a {@link TableException} carrying "Rank end is not specified." when the
     * ROW_NUMBER result is only filtered with a lower bound ({@code rank_num >= 10}).
     */
    @Test
    public void testRankEndMustSpecified() {
        this.thrown().expectMessage("Rank end is not specified.");
        this.thrown().expect(TableException.class);
        String rawSql = "\n        |SELECT *\n        |FROM (\n        |  SELECT a, b, c,\n        |      ROW_NUMBER() OVER (PARTITION BY a ORDER BY b DESC) as rank_num\n        |  FROM MyTable)\n        |WHERE rank_num >= 10\n      ";
        String planSql = new StringOps(Predef$.MODULE$.augmentString(rawSql)).stripMargin();
        this.util().verifyExecPlan(planSql);
    }

    /**
     * Expects a {@link TableException} carrying "Rank end should not less than zero"
     * when the ROW_NUMBER result is filtered with {@code rank_num <= 0}.
     */
    @Test
    public void testRankEndLessThanZero() {
        this.thrown().expectMessage("Rank end should not less than zero");
        this.thrown().expect(TableException.class);
        String rawSql = "\n        |SELECT *\n        |FROM (\n        |  SELECT a, b, c,\n        |      ROW_NUMBER() OVER (PARTITION BY a ORDER BY b DESC) as rank_num\n        |  FROM MyTable)\n        |WHERE rank_num <= 0\n      ";
        String planSql = new StringOps(Predef$.MODULE$.augmentString(rawSql)).stripMargin();
        this.util().verifyExecPlan(planSql);
    }

    /**
     * Passes a ROW_NUMBER query (partition by a, order by b DESC) filtered with
     * {@code row_num <= 1} to {@code verifyExecPlan}.
     */
    @Test
    public void testRankEndLessThan1() {
        String rawSql = "\n        |SELECT *\n        |FROM (\n        |  SELECT a, b, c,\n        |      ROW_NUMBER() OVER (PARTITION BY a ORDER BY b DESC) as row_num\n        |  FROM MyTable)\n        |WHERE row_num <= 1\n      ";
        String planSql = new StringOps(Predef$.MODULE$.augmentString(rawSql)).stripMargin();
        this.util().verifyExecPlan(planSql);
    }

    /**
     * Passes a query whose RANK() column sits between other projected columns,
     * filtered with {@code rk < 10}, to {@code verifyExecPlan}.
     */
    @Test
    public void testRankFunctionInMiddle() {
        String rawSql = "\n        |SELECT * FROM (\n        | SELECT a, RANK() OVER (PARTITION BY a ORDER BY a) rk, b, c FROM MyTable) t\n        |WHERE rk < 10\n      ";
        String planSql = new StringOps(Predef$.MODULE$.augmentString(rawSql)).stripMargin();
        this.util().verifyExecPlan(planSql);
    }

    /**
     * Passes a ROW_NUMBER query ordered by {@code proctime ASC} and filtered with
     * {@code row_num <= 1} to {@code verifyExecPlan}.
     */
    @Test
    public void testRowNumberWithRankEndLessThan1OrderByProctimeAsc() {
        String rawSql = "\n        |SELECT a, b, c\n        |FROM (\n        |  SELECT a, b, c, proctime,\n        |       ROW_NUMBER() OVER (PARTITION BY a ORDER BY proctime ASC) as row_num\n        |  FROM MyTable)\n        |WHERE row_num <= 1\n      ";
        String planSql = new StringOps(Predef$.MODULE$.augmentString(rawSql)).stripMargin();
        this.util().verifyExecPlan(planSql);
    }

    /**
     * Passes a ROW_NUMBER query ordered by {@code proctime DESC} and filtered with
     * {@code row_num <= 1} to {@code verifyExecPlan}.
     */
    @Test
    public void testRowNumberWithRankEndLessThan1OrderByProctimeDesc() {
        String rawSql = "\n        |SELECT a, b, c\n        |FROM (\n        |  SELECT a, b, c, proctime,\n        |       ROW_NUMBER() OVER (PARTITION BY a ORDER BY proctime DESC) as row_num\n        |  FROM MyTable)\n        |WHERE row_num <= 1\n      ";
        String planSql = new StringOps(Predef$.MODULE$.augmentString(rawSql)).stripMargin();
        this.util().verifyExecPlan(planSql);
    }

    /**
     * Passes a ROW_NUMBER query ordered by {@code rowtime ASC} and filtered with
     * {@code row_num <= 1} to {@code verifyExecPlan}.
     */
    @Test
    public void testRowNumberWithRankEndLessThan1OrderByRowtimeAsc() {
        String rawSql = "\n        |SELECT a, b, c\n        |FROM (\n        |  SELECT a, b, c, rowtime,\n        |       ROW_NUMBER() OVER (PARTITION BY a ORDER BY rowtime ASC) as row_num\n        |  FROM MyTable)\n        |WHERE row_num <= 1\n      ";
        String planSql = new StringOps(Predef$.MODULE$.augmentString(rawSql)).stripMargin();
        this.util().verifyExecPlan(planSql);
    }

    /**
     * Passes a ROW_NUMBER query ordered by {@code rowtime DESC} and filtered with
     * {@code row_num <= 1} to {@code verifyExecPlan}.
     */
    @Test
    public void testRowNumberWithRankEndLessThan1OrderByRowtimeDesc() {
        String rawSql = "\n        |SELECT a, b, c\n        |FROM (\n        |  SELECT a, b, c, rowtime,\n        |       ROW_NUMBER() OVER (PARTITION BY a ORDER BY rowtime DESC) as row_num\n        |  FROM MyTable)\n        |WHERE row_num <= 1\n      ";
        String planSql = new StringOps(Predef$.MODULE$.augmentString(rawSql)).stripMargin();
        this.util().verifyExecPlan(planSql);
    }

    /**
     * Passes a RANK query ordered by {@code proctime ASC} and filtered with
     * {@code rk <= 1} to {@code verifyExecPlan}.
     */
    @Test
    public void testRankWithRankEndLessThan1OrderByProctimeAsc() {
        String rawSql = "\n        |SELECT a, b, c\n        |FROM (\n        |  SELECT a, b, c, proctime,\n        |       RANK() OVER (PARTITION BY a ORDER BY proctime ASC) as rk\n        |  FROM MyTable)\n        |WHERE rk <= 1\n      ";
        String planSql = new StringOps(Predef$.MODULE$.augmentString(rawSql)).stripMargin();
        this.util().verifyExecPlan(planSql);
    }

    /**
     * Passes a RANK query ordered by {@code proctime DESC} and filtered with
     * {@code rk <= 1} to {@code verifyExecPlan}.
     */
    @Test
    public void testRankWithRankEndLessThan1OrderByProctimeDesc() {
        String rawSql = "\n        |SELECT a, b, c\n        |FROM (\n        |  SELECT a, b, c, proctime,\n        |       RANK() OVER (PARTITION BY a ORDER BY proctime DESC) as rk\n        |  FROM MyTable)\n        |WHERE rk <= 1\n      ";
        String planSql = new StringOps(Predef$.MODULE$.augmentString(rawSql)).stripMargin();
        this.util().verifyExecPlan(planSql);
    }

    /**
     * Expects a {@link RuntimeException} for a ROW_NUMBER window that has a
     * PARTITION BY clause but no ORDER BY clause.
     */
    @Test(expected=RuntimeException.class)
    public void testRowNumberWithOutOrderBy() {
        String rawSql = "\n        |SELECT *\n        |FROM (\n        |  SELECT a, ROW_NUMBER() OVER (PARTITION BY b) as row_num\n        |  FROM MyTable)\n        |WHERE row_num <= a\n      ";
        String planSql = new StringOps(Predef$.MODULE$.augmentString(rawSql)).stripMargin();
        this.util().verifyExecPlan(planSql);
    }

    /**
     * Expects a {@link ValidationException} for a RANK window that has a
     * PARTITION BY clause but no ORDER BY clause.
     */
    @Test(expected=ValidationException.class)
    public void testRankWithOutOrderBy() {
        String rawSql = "\n        |SELECT *\n        |FROM (\n        |  SELECT a, RANK() OVER (PARTITION BY b) as rk\n        |  FROM MyTable)\n        |WHERE rk <= a\n      ";
        String planSql = new StringOps(Predef$.MODULE$.augmentString(rawSql)).stripMargin();
        this.util().verifyExecPlan(planSql);
    }

    /**
     * Expects a {@link ValidationException} for a DENSE_RANK window that has a
     * PARTITION BY clause but no ORDER BY clause.
     */
    @Test(expected=ValidationException.class)
    public void testDenseRankWithOutOrderBy() {
        String rawSql = "\n        |SELECT *\n        |FROM (\n        |  SELECT a, DENSE_RANK() OVER (PARTITION BY b) as rk\n        |  FROM MyTable)\n        |WHERE rk <= a\n      ";
        String planSql = new StringOps(Predef$.MODULE$.augmentString(rawSql)).stripMargin();
        this.util().verifyExecPlan(planSql);
    }

    /**
     * Expects a {@link RuntimeException} for a query with two ROW_NUMBER windows
     * over different partitions, the second of which has no ORDER BY clause.
     */
    @Test(expected=RuntimeException.class)
    public void testRowNumberWithMultiGroups() {
        String rawSql = "\n        |SELECT *\n        |FROM (\n        |  SELECT a, ROW_NUMBER() OVER (PARTITION BY b ORDER BY a) as row_num,\n        |         ROW_NUMBER() OVER (PARTITION BY a) as row_num1\n        |  FROM MyTable)\n        |WHERE row_num <= a\n      ";
        String planSql = new StringOps(Predef$.MODULE$.augmentString(rawSql)).stripMargin();
        this.util().verifyExecPlan(planSql);
    }

    /**
     * Expects a {@link ValidationException} for a query with two RANK windows
     * over different partitions, the second of which has no ORDER BY clause.
     */
    @Test(expected=ValidationException.class)
    public void testRankWithMultiGroups() {
        String rawSql = "\n        |SELECT *\n        |FROM (\n        |  SELECT a, RANK() OVER (PARTITION BY b ORDER BY a) as rk,\n        |         RANK() OVER (PARTITION BY a) as rk1\n        |  FROM MyTable)\n        |WHERE rk <= a\n      ";
        String planSql = new StringOps(Predef$.MODULE$.augmentString(rawSql)).stripMargin();
        this.util().verifyExecPlan(planSql);
    }

    /**
     * Expects a {@link ValidationException} for a query with two DENSE_RANK windows
     * over different partitions, the second of which has no ORDER BY clause.
     */
    @Test(expected=ValidationException.class)
    public void testDenseRankWithMultiGroups() {
        String rawSql = "\n        |SELECT *\n        |FROM (\n        |  SELECT a, DENSE_RANK() OVER (PARTITION BY b ORDER BY a) as rk,\n        |         DENSE_RANK() OVER (PARTITION BY a) as rk1\n        |  FROM MyTable)\n        |WHERE rk <= a\n      ";
        String planSql = new StringOps(Predef$.MODULE$.augmentString(rawSql)).stripMargin();
        this.util().verifyExecPlan(planSql);
    }

    /**
     * Passes a ROW_NUMBER query filtered with {@code row_num <= 10} to
     * {@code verifyRelPlan}, requesting {@link ExplainDetail#CHANGELOG_MODE}.
     */
    @Test
    public void testTopN() {
        String rawSql = "\n        |SELECT *\n        |FROM (\n        |  SELECT a, b, c,\n        |      ROW_NUMBER() OVER (PARTITION BY a ORDER BY b DESC) as row_num\n        |  FROM MyTable)\n        |WHERE row_num <= 10\n      ";
        String planSql = new StringOps(Predef$.MODULE$.augmentString(rawSql)).stripMargin();
        Seq<ExplainDetail> details = (Seq<ExplainDetail>)Predef$.MODULE$.wrapRefArray((Object[])new ExplainDetail[]{ExplainDetail.CHANGELOG_MODE});
        this.util().verifyRelPlan(planSql, details);
    }

    /**
     * Same query shape as the basic Top-N case but with the filter written as
     * {@code 10 >= row_num}; passed to {@code verifyRelPlan} with
     * {@link ExplainDetail#CHANGELOG_MODE}.
     */
    @Test
    public void testTopN2() {
        String rawSql = "\n        |SELECT *\n        |FROM (\n        |  SELECT a, b, c,\n        |      ROW_NUMBER() OVER (PARTITION BY a ORDER BY b DESC) as row_num\n        |  FROM MyTable)\n        |WHERE 10 >= row_num\n      ";
        String planSql = new StringOps(Predef$.MODULE$.augmentString(rawSql)).stripMargin();
        Seq<ExplainDetail> details = (Seq<ExplainDetail>)Predef$.MODULE$.wrapRefArray((Object[])new ExplainDetail[]{ExplainDetail.CHANGELOG_MODE});
        this.util().verifyRelPlan(planSql, details);
    }

    /**
     * Passes a ROW_NUMBER query filtered with the equality {@code row_num = 10} to
     * {@code verifyRelPlan}, requesting {@link ExplainDetail#CHANGELOG_MODE}.
     */
    @Test
    public void testTopNth() {
        String rawSql = "\n        |SELECT *\n        |FROM (\n        |  SELECT a, b, c,\n        |      ROW_NUMBER() OVER (PARTITION BY a ORDER BY b DESC) as row_num\n        |  FROM MyTable)\n        |WHERE row_num = 10\n      ";
        String planSql = new StringOps(Predef$.MODULE$.augmentString(rawSql)).stripMargin();
        Seq<ExplainDetail> details = (Seq<ExplainDetail>)Predef$.MODULE$.wrapRefArray((Object[])new ExplainDetail[]{ExplainDetail.CHANGELOG_MODE});
        this.util().verifyRelPlan(planSql, details);
    }

    /**
     * Passes a Top-N query with an inner filter ({@code c > 1000}) and an extra outer
     * predicate ({@code b IS NOT NULL}) to {@code verifyRelPlan}, requesting
     * {@link ExplainDetail#CHANGELOG_MODE}.
     */
    @Test
    public void testTopNWithFilter() {
        String rawSql = "\n        |SELECT row_num, a, c\n        |FROM (\n        |  SELECT a, b, c,\n        |      ROW_NUMBER() OVER (PARTITION BY a ORDER BY b DESC) as row_num\n        |  FROM MyTable\n        |  WHERE c > 1000)\n        |WHERE row_num <= 10 AND b IS NOT NULL\n      ";
        String planSql = new StringOps(Predef$.MODULE$.augmentString(rawSql)).stripMargin();
        Seq<ExplainDetail> details = (Seq<ExplainDetail>)Predef$.MODULE$.wrapRefArray((Object[])new ExplainDetail[]{ExplainDetail.CHANGELOG_MODE});
        this.util().verifyRelPlan(planSql, details);
    }

    /**
     * Builds a Top-N query over a {@code SUM(c)} aggregation grouped by a and b and
     * passes it to {@code verifyRelPlan}, requesting {@link ExplainDetail#CHANGELOG_MODE}.
     */
    @Test
    public void testTopNAfterAgg() {
        String aggSql = new StringOps(Predef$.MODULE$.augmentString("\n        |SELECT a, b, SUM(c) as sum_c\n        |FROM MyTable\n        |GROUP BY a, b\n      ")).stripMargin();
        // Embed the aggregation as the innermost FROM clause of the ranking query.
        StringBuilder rankSql = new StringBuilder(208);
        rankSql.append("\n         |SELECT *\n         |FROM (\n         |  SELECT a, b, sum_c,\n         |      ROW_NUMBER() OVER (PARTITION BY b ORDER BY sum_c DESC) as row_num\n         |  FROM (");
        rankSql.append(aggSql);
        rankSql.append("))\n         |WHERE row_num <= 10\n      ");
        String planSql = new StringOps(Predef$.MODULE$.augmentString(rankSql.toString())).stripMargin();
        Seq<ExplainDetail> details = (Seq<ExplainDetail>)Predef$.MODULE$.wrapRefArray((Object[])new ExplainDetail[]{ExplainDetail.CHANGELOG_MODE});
        this.util().verifyRelPlan(planSql, details);
    }

    /**
     * Builds a Top-N query partitioned by b over an aggregation that is grouped by a
     * only (b comes from {@code last_value(b)}), and passes it to {@code verifyRelPlan}
     * with {@link ExplainDetail#CHANGELOG_MODE}.
     */
    @Test
    public void testTopNWithKeyChanged() {
        String aggSql = new StringOps(Predef$.MODULE$.augmentString("\n        |SELECT a, last_value(b) as b, SUM(c) as sum_c\n        |FROM MyTable\n        |GROUP BY a\n      ")).stripMargin();
        // Embed the aggregation as the innermost FROM clause of the ranking query.
        StringBuilder rankSql = new StringBuilder(208);
        rankSql.append("\n         |SELECT *\n         |FROM (\n         |  SELECT a, b, sum_c,\n         |      ROW_NUMBER() OVER (PARTITION BY b ORDER BY sum_c DESC) as row_num\n         |  FROM (");
        rankSql.append(aggSql);
        rankSql.append("))\n         |WHERE row_num <= 10\n      ");
        String planSql = new StringOps(Predef$.MODULE$.augmentString(rankSql.toString())).stripMargin();
        Seq<ExplainDetail> details = (Seq<ExplainDetail>)Predef$.MODULE$.wrapRefArray((Object[])new ExplainDetail[]{ExplainDetail.CHANGELOG_MODE});
        this.util().verifyRelPlan(planSql, details);
    }

    /**
     * Registers a table source "T" with fields category, shopId and price, then passes a
     * Top-N query (top 3 per category ordered by {@code MAX(price)} ascending) to
     * {@code verifyRelPlan}, requesting {@link ExplainDetail#CHANGELOG_MODE}.
     *
     * <p>NOTE(review): the anonymous {@code CaseClassTypeInfo} below is decompiler output;
     * parts of it (the lambda body and {@code $deserializeLambda$}) do not look like
     * compilable Java and presumably stem from Scala compiler-generated type info.
     */
    @Test
    public void testUnarySortTopNOnString() {
        // Register table "T" backed by type information for Tuple3<String, Object, String>.
        this.util().addTableSource("T", (Seq<Expression>)Predef$.MODULE$.wrapRefArray((Object[])new Expression[]{package$.MODULE$.symbol2FieldExpression((Symbol)SymbolLiteral.bootstrap("apply", "category")), package$.MODULE$.symbol2FieldExpression((Symbol)SymbolLiteral.bootstrap("apply", "shopId")), package$.MODULE$.symbol2FieldExpression((Symbol)SymbolLiteral.bootstrap("apply", "price"))}), new CaseClassTypeInfo<Tuple3<String, Object, String>>(null){

            // Synthetic accessor exposing the protected `types` field of the given instance.
            public /* synthetic */ TypeInformation[] protected$types($anon$6 x$1) {
                return x$1.types;
            }

            // Creates one serializer per field (index 0 until getArity()) and wraps them in a case-class serializer.
            public TypeSerializer<Tuple3<String, Object, String>> createSerializer(ExecutionConfig executionConfig) {
                TypeSerializer[] fieldSerializers = new TypeSerializer[this.getArity()];
                RichInt$.MODULE$.until$extension0(Predef$.MODULE$.intWrapper(0), this.getArity()).foreach$mVc$sp((Function1)(JFunction1.mcVI.sp & Serializable & scala.Serializable)i -> {
                    fieldSerializers$2[i] = this.protected$types(this)[i].createSerializer(executionConfig);
                });
                // This anonymous serializer is assigned to a local that is never read afterwards.
                ScalaCaseClassSerializer<Tuple3<String, Object, String>> unused = new ScalaCaseClassSerializer<Tuple3<String, Object, String>>(this, fieldSerializers){

                    // Builds the tuple from the field array: (String, int, String).
                    public Tuple3<String, Object, String> createInstance(Object[] fields) {
                        return new Tuple3((Object)((String)fields[0]), (Object)BoxesRunTime.boxToInteger((int)BoxesRunTime.unboxToInt((Object)fields[1])), (Object)((String)fields[2]));
                    }
                };
                // The serializer actually returned is constructed from getTypeClass() and the field serializers.
                return new ScalaCaseClassSerializer(this.getTypeClass(), fieldSerializers);
            }

            // NOTE(review): decompiler rendering of the lambda-deserialization hook; the MethodHandle array literal is not valid Java.
            private static /* synthetic */ Object $deserializeLambda$(SerializedLambda serializedLambda) {
                return LambdaDeserialize.bootstrap("lambdaDeserialize", new MethodHandle[]{$anonfun$createSerializer$2(org.apache.flink.table.planner.plan.stream.sql.RankTest$$anon$6 org.apache.flink.api.common.ExecutionConfig org.apache.flink.api.common.typeutils.TypeSerializer[] int )}, serializedLambda);
            }
        });
        // Top 3 rows per category, ordered by the per-(category, shopId) maximum price ascending.
        String sql = new StringOps(Predef$.MODULE$.augmentString("\n        |SELECT *\n        |FROM (\n        |  SELECT category, shopId, max_price,\n        |      ROW_NUMBER() OVER (PARTITION BY category ORDER BY max_price ASC) as row_num\n        |  FROM (\n        |     SELECT category, shopId, MAX(price) as max_price\n        |     FROM T\n        |     GROUP BY category, shopId\n        |  ))\n        |WHERE row_num <= 3\n      ")).stripMargin();
        this.util().verifyRelPlan(sql, (Seq<ExplainDetail>)Predef$.MODULE$.wrapRefArray((Object[])new ExplainDetail[]{ExplainDetail.CHANGELOG_MODE}));
    }

    /**
     * Builds a Top-N query ordered by {@code COUNT(*)}, wraps it in an outer
     * {@code SELECT max(a)}, and passes the result to {@code verifyRelPlan} with
     * {@link ExplainDetail#CHANGELOG_MODE}.
     */
    @Test
    public void testTopNOrderByCount() {
        String aggSql = new StringOps(Predef$.MODULE$.augmentString("\n        |SELECT a, b, COUNT(*) as count_c\n        |FROM MyTable\n        |GROUP BY a, b\n      ")).stripMargin();
        // Ranking query around the aggregation.
        StringBuilder rankSql = new StringBuilder(212);
        rankSql.append("\n         |SELECT *\n         |FROM (\n         |  SELECT a, b, count_c,\n         |      ROW_NUMBER() OVER (PARTITION BY b ORDER BY count_c DESC) as row_num\n         |  FROM (");
        rankSql.append(aggSql);
        rankSql.append("))\n         |WHERE row_num <= 10\n      ");
        String topNSql = new StringOps(Predef$.MODULE$.augmentString(rankSql.toString())).stripMargin();
        // Outer aggregation over the ranking query.
        StringBuilder outerSql = new StringBuilder(40);
        outerSql.append("\n         |SELECT max(a) FROM (");
        outerSql.append(topNSql);
        outerSql.append(")\n       ");
        String planSql = new StringOps(Predef$.MODULE$.augmentString(outerSql.toString())).stripMargin();
        Seq<ExplainDetail> details = (Seq<ExplainDetail>)Predef$.MODULE$.wrapRefArray((Object[])new ExplainDetail[]{ExplainDetail.CHANGELOG_MODE});
        this.util().verifyRelPlan(planSql, details);
    }

    /**
     * Builds a Top-N query ordered by {@code SUM(c)} where the aggregation input is
     * restricted to {@code c >= 0}, and passes it to {@code verifyRelPlan} with
     * {@link ExplainDetail#CHANGELOG_MODE}.
     */
    @Test
    public void testTopNOrderBySumWithCond() {
        String aggSql = new StringOps(Predef$.MODULE$.augmentString("\n        |SELECT a, b, SUM(c) AS sum_c\n        |FROM MyTable\n        |WHERE c >= 0\n        |GROUP BY a, b\n      ")).stripMargin();
        // Embed the aggregation as the innermost FROM clause of the ranking query.
        StringBuilder rankSql = new StringBuilder(208);
        rankSql.append("\n         |SELECT *\n         |FROM (\n         |  SELECT a, b, sum_c,\n         |      ROW_NUMBER() OVER (PARTITION BY b ORDER BY sum_c DESC) AS row_num\n         |  FROM (");
        rankSql.append(aggSql);
        rankSql.append("))\n         |WHERE row_num <= 10\n      ");
        String planSql = new StringOps(Predef$.MODULE$.augmentString(rankSql.toString())).stripMargin();
        Seq<ExplainDetail> details = (Seq<ExplainDetail>)Predef$.MODULE$.wrapRefArray((Object[])new ExplainDetail[]{ExplainDetail.CHANGELOG_MODE});
        this.util().verifyRelPlan(planSql, details);
    }

    /**
     * Builds a Top-N query ordered by a {@code SUM} over a CASE WHEN expression and
     * passes it to {@code verifyExecPlan}.
     */
    @Test
    public void testTopNOrderBySumWithCaseWhen() {
        String aggSql = new StringOps(Predef$.MODULE$.augmentString("\n        |SELECT a, b, SUM(CASE WHEN c > 10 THEN 1 WHEN c < 0 THEN 0 ELSE null END) AS sum_c\n        |FROM MyTable\n        |GROUP BY a, b\n      ")).stripMargin();
        // Embed the aggregation as the innermost FROM clause of the ranking query.
        StringBuilder rankSql = new StringBuilder(208);
        rankSql.append("\n         |SELECT *\n         |FROM (\n         |  SELECT a, b, sum_c,\n         |      ROW_NUMBER() OVER (PARTITION BY b ORDER BY sum_c DESC) AS row_num\n         |  FROM (");
        rankSql.append(aggSql);
        rankSql.append("))\n         |WHERE row_num <= 10\n      ");
        String planSql = new StringOps(Predef$.MODULE$.augmentString(rankSql.toString())).stripMargin();
        this.util().verifyExecPlan(planSql);
    }

    /**
     * Builds a Top-N query ordered by a {@code SUM} over an {@code IF} expression and
     * passes it to {@code verifyExecPlan}.
     */
    @Test
    public void testTopNOrderBySumWithIf() {
        String aggSql = new StringOps(Predef$.MODULE$.augmentString("\n        |SELECT a, b, SUM(IF(c > 10, 1, 0)) as sum_c\n        |FROM MyTable\n        |GROUP BY a, b\n      ")).stripMargin();
        // Embed the aggregation as the innermost FROM clause of the ranking query.
        StringBuilder rankSql = new StringBuilder(208);
        rankSql.append("\n         |SELECT *\n         |FROM (\n         |  SELECT a, b, sum_c,\n         |      ROW_NUMBER() OVER (PARTITION BY b ORDER BY sum_c DESC) as row_num\n         |  FROM (");
        rankSql.append(aggSql);
        rankSql.append("))\n         |WHERE row_num <= 10\n      ");
        String planSql = new StringOps(Predef$.MODULE$.augmentString(rankSql.toString())).stripMargin();
        this.util().verifyExecPlan(planSql);
    }

    /**
     * Builds a Top-N query ordered (descending) by a {@code SUM(c)} with a FILTER clause
     * ({@code c >= 0 and a < 0}) and passes it to {@code verifyExecPlan}.
     */
    @Test
    public void testTopNOrderBySumWithFilterClause() {
        String aggSql = new StringOps(Predef$.MODULE$.augmentString("\n        |SELECT a, b, SUM(c) filter (where c >= 0 and a < 0) as sum_c\n        |FROM MyTable\n        |GROUP BY a, b\n      ")).stripMargin();
        // Embed the aggregation as the innermost FROM clause of the ranking query.
        StringBuilder rankSql = new StringBuilder(208);
        rankSql.append("\n         |SELECT *\n         |FROM (\n         |  SELECT a, b, sum_c,\n         |      ROW_NUMBER() OVER (PARTITION BY b ORDER BY sum_c DESC) AS row_num\n         |  FROM (");
        rankSql.append(aggSql);
        rankSql.append("))\n         |WHERE row_num <= 10\n      ");
        String planSql = new StringOps(Predef$.MODULE$.augmentString(rankSql.toString())).stripMargin();
        this.util().verifyExecPlan(planSql);
    }

    /**
     * Builds a Top-N query ordered (ascending) by a {@code SUM(c)} with a FILTER clause
     * ({@code c <= 0 AND a < 0}) and passes it to {@code verifyExecPlan}.
     */
    @Test
    public void testTopNOrderBySumWithFilterClause2() {
        String aggSql = new StringOps(Predef$.MODULE$.augmentString("\n        |SELECT a, b, SUM(c) FILTER (WHERE c <= 0 AND a < 0) AS sum_c\n        |FROM MyTable\n        |GROUP BY a, b\n      ")).stripMargin();
        // Embed the aggregation as the innermost FROM clause of the ranking query.
        StringBuilder rankSql = new StringBuilder(207);
        rankSql.append("\n         |SELECT *\n         |FROM (\n         |  SELECT a, b, sum_c,\n         |      ROW_NUMBER() OVER (PARTITION BY b ORDER BY sum_c ASC) AS row_num\n         |  FROM (");
        rankSql.append(aggSql);
        rankSql.append("))\n         |WHERE row_num <= 10\n      ");
        String planSql = new StringOps(Predef$.MODULE$.augmentString(rankSql.toString())).stripMargin();
        this.util().verifyExecPlan(planSql);
    }

    /**
     * Builds a Top-N query ordered by {@code count_c DESC, a ASC} over a
     * {@code COUNT(*)} aggregation and passes it to {@code verifyRelPlan} with
     * {@link ExplainDetail#CHANGELOG_MODE}.
     */
    @Test
    public void testTopNOrderByCountAndOtherField() {
        String aggSql = new StringOps(Predef$.MODULE$.augmentString("\n        |SELECT a, b, COUNT(*) AS count_c\n        |FROM MyTable\n        |GROUP BY a, b\n      ")).stripMargin();
        // Embed the aggregation as the innermost FROM clause of the ranking query.
        StringBuilder rankSql = new StringBuilder(219);
        rankSql.append("\n         |SELECT *\n         |FROM (\n         |  SELECT a, b, count_c,\n         |      ROW_NUMBER() OVER (PARTITION BY b ORDER BY count_c DESC, a ASC) AS row_num\n         |  FROM (");
        rankSql.append(aggSql);
        rankSql.append("))\n         |WHERE row_num <= 10\n      ");
        String planSql = new StringOps(Predef$.MODULE$.augmentString(rankSql.toString())).stripMargin();
        Seq<ExplainDetail> details = (Seq<ExplainDetail>)Predef$.MODULE$.wrapRefArray((Object[])new ExplainDetail[]{ExplainDetail.CHANGELOG_MODE});
        this.util().verifyRelPlan(planSql, details);
    }

    /**
     * Builds a Top-N query over an aggregation whose GROUP BY includes the constant
     * column {@code cn} ('cn') and passes it to {@code verifyRelPlan} with
     * {@link ExplainDetail#CHANGELOG_MODE}.
     */
    @Test
    public void testTopNWithGroupByConstantKey() {
        String aggSql = new StringOps(Predef$.MODULE$.augmentString("\n        |SELECT a, b, COUNT(*) AS count_c\n        |FROM (\n        |SELECT *, 'cn' AS cn\n        |FROM MyTable\n        |)\n        |GROUP BY a, b, cn\n      ")).stripMargin();
        // Embed the aggregation as the innermost FROM clause of the ranking query.
        StringBuilder rankSql = new StringBuilder(212);
        rankSql.append("\n         |SELECT *\n         |FROM (\n         |  SELECT a, b, count_c,\n         |      ROW_NUMBER() OVER (PARTITION BY a ORDER BY count_c DESC) AS row_num\n         |  FROM (");
        rankSql.append(aggSql);
        rankSql.append("))\n         |WHERE row_num <= 10\n      ");
        String planSql = new StringOps(Predef$.MODULE$.augmentString(rankSql.toString())).stripMargin();
        Seq<ExplainDetail> details = (Seq<ExplainDetail>)Predef$.MODULE$.wrapRefArray((Object[])new ExplainDetail[]{ExplainDetail.CHANGELOG_MODE});
        this.util().verifyRelPlan(planSql, details);
    }

    /**
     * Builds two nested Top-N queries (an inner one partitioned by a, an outer one with
     * no partition) over a {@code COUNT(*)} aggregation and passes the result to
     * {@code verifyRelPlan} with {@link ExplainDetail#CHANGELOG_MODE}.
     */
    @Test
    public void testNestedTopN() {
        String aggSql = new StringOps(Predef$.MODULE$.augmentString("\n        |SELECT a, b, COUNT(*) as count_c\n        |FROM (\n        |SELECT *, 'cn' as cn\n        |FROM MyTable\n        |)\n        |GROUP BY a, b, cn\n      ")).stripMargin();
        // Inner Top-N: ranks within each value of a.
        StringBuilder innerRank = new StringBuilder(212);
        innerRank.append("\n         |SELECT *\n         |FROM (\n         |  SELECT a, b, count_c,\n         |      ROW_NUMBER() OVER (PARTITION BY a ORDER BY count_c DESC) AS row_num\n         |  FROM (");
        innerRank.append(aggSql);
        innerRank.append("))\n         |WHERE row_num <= 10\n      ");
        String innerSql = new StringOps(Predef$.MODULE$.augmentString(innerRank.toString())).stripMargin();
        // Outer Top-N: ranks across the inner result without a partition key.
        StringBuilder outerRank = new StringBuilder(197);
        outerRank.append("\n         |SELECT *\n         |FROM (\n         |  SELECT a, b, count_c,\n         |    ROW_NUMBER() OVER (ORDER BY count_c DESC) as rank_num\n         |  FROM (");
        outerRank.append(innerSql);
        outerRank.append("))\n         |WHERE rank_num <= 10\n      ");
        String planSql = new StringOps(Predef$.MODULE$.augmentString(outerRank.toString())).stripMargin();
        Seq<ExplainDetail> details = (Seq<ExplainDetail>)Predef$.MODULE$.wrapRefArray((Object[])new ExplainDetail[]{ExplainDetail.CHANGELOG_MODE});
        this.util().verifyRelPlan(planSql, details);
    }

    /**
     * Expects a {@link ValidationException} for a Top-N query whose rank bound is a
     * column ({@code row_num <= a}) and whose subquery calls {@code add(max_c)}.
     */
    @Test(expected=ValidationException.class)
    public void testTopNForVariableSize() {
        String aggSql = new StringOps(Predef$.MODULE$.augmentString("\n        |SELECT a, b, add(max_c) as c\n        |FROM (\n        |  SELECT MAX(a) as a, b, MAX(c) as max_c\n        |  FROM MyTable\n        |  GROUP BY b\n        |)\n      ")).stripMargin();
        // Embed the subquery as the innermost FROM clause of the ranking query.
        StringBuilder rankSql = new StringBuilder(199);
        rankSql.append("\n         |SELECT *\n         |FROM (\n         |  SELECT a, b, c,\n         |      ROW_NUMBER() OVER (PARTITION BY b ORDER BY c DESC) as row_num\n         |  FROM (");
        rankSql.append(aggSql);
        rankSql.append("))\n         |WHERE row_num <= a\n      ");
        String planSql = new StringOps(Predef$.MODULE$.augmentString(rankSql.toString())).stripMargin();
        Seq<ExplainDetail> details = (Seq<ExplainDetail>)Predef$.MODULE$.wrapRefArray((Object[])new ExplainDetail[]{ExplainDetail.CHANGELOG_MODE});
        this.util().verifyRelPlan(planSql, details);
    }

    /**
     * Creates a source table, two views (an aggregation and a ROW_NUMBER over it) and a
     * print sink, then passes an INSERT selecting {@code row_num <= 3} from the second
     * view to {@code verifyExecPlanInsert}.
     */
    @Test
    public void testCreateViewWithRowNumber() {
        // Source table definition.
        String sourceDdl = new StringOps(Predef$.MODULE$.augmentString("\n        |CREATE TABLE test_source (\n        |  name STRING,\n        |  eat STRING,\n        |  age BIGINT\n        |) WITH (\n        |  'connector' = 'values',\n        |  'bounded' = 'false'\n        |)\n      ")).stripMargin();
        this.util().addTable(sourceDdl);
        // view1 aggregates the source; view2 adds a row number on top of view1.
        this.util().tableEnv().executeSql("create view view1 as select name, eat ,sum(age) as cnt\nfrom test_source group by name, eat");
        this.util().tableEnv().executeSql("create view view2 as\nselect *, ROW_NUMBER() OVER (PARTITION BY name ORDER BY cnt DESC) as row_num\nfrom view1");
        // Sink table definition.
        String sinkDdl = new StringOps(Predef$.MODULE$.augmentString("\n         |create table sink (\n         |  name varchar,\n         |  eat varchar,\n         |  cnt bigint\n         |)\n         |with(\n         |  'connector' = 'print'\n         |)\n         |")).stripMargin();
        this.util().addTable(sinkDdl);
        String insertSql = "insert into sink select name, eat, cnt\nfrom view2 where row_num <= 3";
        this.util().verifyExecPlanInsert(insertSql);
    }

    /**
     * Passes a query joining distinct values of a with a LATERAL subquery that uses
     * {@code ORDER BY c DESC LIMIT 3} to {@code verifyExecPlan}.
     */
    @Test
    public void testCorrelateSortToRank() {
        String rawSql = "\n         |SELECT a, b\n         |FROM\n         |  (SELECT DISTINCT a FROM MyTable) T1,\n         |  LATERAL (\n         |    SELECT b, c\n         |    FROM MyTable\n         |    WHERE a = T1.a\n         |    ORDER BY c\n         |    DESC LIMIT 3\n         |  )\n      ";
        String planSql = new StringOps(Predef$.MODULE$.augmentString(rawSql)).stripMargin();
        this.util().verifyExecPlan(planSql);
    }

    /**
     * Registers a stream table {@code T} with fields {@code a, b, c, d} plus a {@code proctime}
     * and a {@code rowtime} attribute, then verifies the exec plan of a query that joins the
     * distinct {@code (a, b)} pairs of {@code T} with a correlated {@code LATERAL} sub-query
     * ordered by {@code d} descending and limited to 3 rows.
     *
     * <p>NOTE(review): this file is CFR decompiler output (see the file header). The anonymous
     * {@code CaseClassTypeInfo} below contains decompiler residue ({@code fieldSerializers$3},
     * the pseudo-syntax inside the {@code MethodHandle[]}) that is not valid Java as written.
     */
    @Test
    public void testCorrelateSortToRankWithMultipleGroupKeys() {
        // Type information is supplied as an anonymous CaseClassTypeInfo over
        // Tuple4<Object, String, Object, Object>.
        // NOTE(review): the constructor arguments are shown as a bare `null`; presumably the
        // decompiler dropped the real arguments -- confirm against the original Scala source.
        this.util().addDataStream("T", (Seq<Expression>)Predef$.MODULE$.wrapRefArray((Object[])new Expression[]{package$.MODULE$.symbol2FieldExpression((Symbol)SymbolLiteral.bootstrap("apply", "a")), package$.MODULE$.symbol2FieldExpression((Symbol)SymbolLiteral.bootstrap("apply", "b")), package$.MODULE$.symbol2FieldExpression((Symbol)SymbolLiteral.bootstrap("apply", "c")), package$.MODULE$.symbol2FieldExpression((Symbol)SymbolLiteral.bootstrap("apply", "d")), (Expression)package$.MODULE$.UnresolvedFieldExpression((Symbol)SymbolLiteral.bootstrap("apply", "proctime")).proctime(), (Expression)package$.MODULE$.UnresolvedFieldExpression((Symbol)SymbolLiteral.bootstrap("apply", "rowtime")).rowtime()}), new CaseClassTypeInfo<Tuple4<Object, String, Object, Object>>(null){

            // Synthetic accessor that returns the `types` field of the given instance.
            public /* synthetic */ TypeInformation[] protected$types($anon$7 x$1) {
                return x$1.types;
            }

            // Builds one serializer per field (index 0 until getArity()) and returns a
            // ScalaCaseClassSerializer constructed from getTypeClass() and those serializers.
            public TypeSerializer<Tuple4<Object, String, Object, Object>> createSerializer(ExecutionConfig executionConfig) {
                TypeSerializer[] fieldSerializers = new TypeSerializer[this.getArity()];
                RichInt$.MODULE$.until$extension0(Predef$.MODULE$.intWrapper(0), this.getArity()).foreach$mVc$sp((Function1)(JFunction1.mcVI.sp & Serializable & scala.Serializable)i -> {
                    // NOTE(review): `fieldSerializers$3` is the decompiler's name for the
                    // captured `fieldSerializers` array above, presumably -- confirm.
                    fieldSerializers$3[i] = this.protected$types(this)[i].createSerializer(executionConfig);
                });
                // This anonymous serializer is assigned to `unused` and never read again in this
                // method; the value actually returned is the separate instance created below.
                ScalaCaseClassSerializer<Tuple4<Object, String, Object, Object>> unused = new ScalaCaseClassSerializer<Tuple4<Object, String, Object, Object>>(this, fieldSerializers){

                    // Rebuilds a Tuple4 from raw field values: [0] as int, [1] as String,
                    // [2] and [3] as long.
                    public Tuple4<Object, String, Object, Object> createInstance(Object[] fields) {
                        return new Tuple4((Object)BoxesRunTime.boxToInteger((int)BoxesRunTime.unboxToInt((Object)fields[0])), (Object)((String)fields[1]), (Object)BoxesRunTime.boxToLong((long)BoxesRunTime.unboxToLong((Object)fields[2])), (Object)BoxesRunTime.boxToLong((long)BoxesRunTime.unboxToLong((Object)fields[3])));
                    }
                };
                return new ScalaCaseClassSerializer(this.getTypeClass(), fieldSerializers);
            }

            // Synthetic lambda-deserialization hook. The MethodHandle[] contents are decompiler
            // pseudo-syntax describing the lambda's signature, not compilable Java.
            private static /* synthetic */ Object $deserializeLambda$(SerializedLambda serializedLambda) {
                return LambdaDeserialize.bootstrap("lambdaDeserialize", new MethodHandle[]{$anonfun$createSerializer$3(org.apache.flink.table.planner.plan.stream.sql.RankTest$$anon$7 org.apache.flink.api.common.ExecutionConfig org.apache.flink.api.common.typeutils.TypeSerializer[] int )}, serializedLambda);
            }
        });
        // Margin-prefixed SQL; stripMargin() removes the leading '|' markers.
        String query = new StringOps(Predef$.MODULE$.augmentString("\n         |SELECT a, b, c\n         |FROM\n         |  (SELECT DISTINCT a, b FROM T) T1,\n         |  LATERAL (\n         |    SELECT c, d\n         |    FROM T\n         |    WHERE a = T1.a and b = T1.b\n         |    ORDER BY d\n         |    DESC LIMIT 3\n         |  )\n      ")).stripMargin();
        this.util().verifyExecPlan(query);
    }

    /**
     * Verifies the exec plan of two nested {@code row_number()} filters over {@code MyTable}:
     * the inner one partitions by {@code a, c} and keeps {@code rna <= 100}, the outer one
     * partitions by {@code a} and keeps {@code rnb <= 200}; both order by {@code b} descending.
     */
    @Test
    public void testRankWithAnotherRankAsInput() {
        this.util().verifyExecPlan(
                new StringOps(
                        Predef$.MODULE$.augmentString(
                                "\n        |SELECT CAST(rna AS INT) AS rn1, CAST(rnb AS INT) AS rn2 FROM (\n        |  SELECT *, row_number() over (partition by a order by b desc) AS rnb\n        |  FROM (\n        |    SELECT *, row_number() over (partition by a, c order by b desc) AS rna\n        |    FROM MyTable\n        |  )\n        |  WHERE rna <= 100\n        |)\n        |WHERE rnb <= 200\n        |"))
                        .stripMargin());
    }

    /**
     * Registers a stream table {@code MyTable1} with fields {@code uri, reqcount, start_time,
     * bucket_id}, then verifies the exec plan of a query with two nested {@code ROW_NUMBER()}
     * filters (inner partitioned by {@code start_time, bucket_id}, outer by {@code start_time},
     * both ordered by {@code reqcount DESC} and limited to 100000) whose outermost projection
     * selects neither row-number column.
     *
     * <p>NOTE(review): this file is CFR decompiler output (see the file header). The anonymous
     * {@code CaseClassTypeInfo} below contains decompiler residue ({@code fieldSerializers$4},
     * the pseudo-syntax inside the {@code MethodHandle[]}) that is not valid Java as written.
     */
    @Test
    public void testRedundantRankNumberColumnRemove() {
        // Type information is supplied as an anonymous CaseClassTypeInfo over
        // Tuple4<String, Object, Object, Object>.
        // NOTE(review): the constructor arguments are shown as a bare `null`; presumably the
        // decompiler dropped the real arguments -- confirm against the original Scala source.
        this.util().addDataStream("MyTable1", (Seq<Expression>)Predef$.MODULE$.wrapRefArray((Object[])new Expression[]{package$.MODULE$.symbol2FieldExpression((Symbol)SymbolLiteral.bootstrap("apply", "uri")), package$.MODULE$.symbol2FieldExpression((Symbol)SymbolLiteral.bootstrap("apply", "reqcount")), package$.MODULE$.symbol2FieldExpression((Symbol)SymbolLiteral.bootstrap("apply", "start_time")), package$.MODULE$.symbol2FieldExpression((Symbol)SymbolLiteral.bootstrap("apply", "bucket_id"))}), new CaseClassTypeInfo<Tuple4<String, Object, Object, Object>>(null){

            // Synthetic accessor that returns the `types` field of the given instance.
            public /* synthetic */ TypeInformation[] protected$types($anon$8 x$1) {
                return x$1.types;
            }

            // Builds one serializer per field (index 0 until getArity()) and returns a
            // ScalaCaseClassSerializer constructed from getTypeClass() and those serializers.
            public TypeSerializer<Tuple4<String, Object, Object, Object>> createSerializer(ExecutionConfig executionConfig) {
                TypeSerializer[] fieldSerializers = new TypeSerializer[this.getArity()];
                RichInt$.MODULE$.until$extension0(Predef$.MODULE$.intWrapper(0), this.getArity()).foreach$mVc$sp((Function1)(JFunction1.mcVI.sp & Serializable & scala.Serializable)i -> {
                    // NOTE(review): `fieldSerializers$4` is the decompiler's name for the
                    // captured `fieldSerializers` array above, presumably -- confirm.
                    fieldSerializers$4[i] = this.protected$types(this)[i].createSerializer(executionConfig);
                });
                // This anonymous serializer is assigned to `unused` and never read again in this
                // method; the value actually returned is the separate instance created below.
                ScalaCaseClassSerializer<Tuple4<String, Object, Object, Object>> unused = new ScalaCaseClassSerializer<Tuple4<String, Object, Object, Object>>(this, fieldSerializers){

                    // Rebuilds a Tuple4 from raw field values: [0] as String, [1]..[3] as long.
                    public Tuple4<String, Object, Object, Object> createInstance(Object[] fields) {
                        return new Tuple4((Object)((String)fields[0]), (Object)BoxesRunTime.boxToLong((long)BoxesRunTime.unboxToLong((Object)fields[1])), (Object)BoxesRunTime.boxToLong((long)BoxesRunTime.unboxToLong((Object)fields[2])), (Object)BoxesRunTime.boxToLong((long)BoxesRunTime.unboxToLong((Object)fields[3])));
                    }
                };
                return new ScalaCaseClassSerializer(this.getTypeClass(), fieldSerializers);
            }

            // Synthetic lambda-deserialization hook. The MethodHandle[] contents are decompiler
            // pseudo-syntax describing the lambda's signature, not compilable Java.
            private static /* synthetic */ Object $deserializeLambda$(SerializedLambda serializedLambda) {
                return LambdaDeserialize.bootstrap("lambdaDeserialize", new MethodHandle[]{$anonfun$createSerializer$4(org.apache.flink.table.planner.plan.stream.sql.RankTest$$anon$8 org.apache.flink.api.common.ExecutionConfig org.apache.flink.api.common.typeutils.TypeSerializer[] int )}, serializedLambda);
            }
        });
        // Margin-prefixed SQL; stripMargin() removes the leading '|' markers.
        String sql = new StringOps(Predef$.MODULE$.augmentString("\n        |SELECT\n        |  CONCAT('http://txmov2.a.yximgs.com', uri) AS url,\n        |  reqcount AS download_count,\n        |  start_time AS `timestamp`\n        |FROM\n        |  (\n        |    SELECT\n        |      uri,\n        |      reqcount,\n        |      rownum_2,\n        |      start_time\n        |    FROM\n        |      (\n        |        SELECT\n        |          uri,\n        |          reqcount,\n        |          start_time,\n        |          ROW_NUMBER() OVER (\n        |            PARTITION BY start_time\n        |            ORDER BY\n        |              reqcount DESC\n        |          ) AS rownum_2\n        |        FROM\n        |          (\n        |            SELECT\n        |            uri,\n        |            reqcount,\n        |            start_time,\n        |            ROW_NUMBER() OVER (\n        |                PARTITION BY start_time, bucket_id\n        |                ORDER BY\n        |                reqcount DESC\n        |            ) AS rownum_1\n        |            FROM MyTable1\n        |          )\n        |        WHERE\n        |          rownum_1 <= 100000\n        |      )\n        |    WHERE\n        |      rownum_2 <= 100000\n        |  )\n        |")).stripMargin();
        this.util().verifyExecPlan(sql);
    }

    /**
     * Creates view {@code v0} (first row per {@code c} ordered by {@code PROCTIME()}) and view
     * {@code v1} (a filtered {@code SUM(a)} grouped by {@code c, b} over {@code v0}), then
     * verifies the rel plan of a {@code ROW_NUMBER()} query over {@code v1} that keeps
     * {@code rn < 10}.
     */
    @Test
    public void testUpdatableRankWithDeduplicate() {
        // View v0: keep only the row with rowNum = 1 per `c`.
        String dedupViewDdl = new StringOps(Predef$.MODULE$.augmentString("\n        |CREATE VIEW v0 AS\n        |SELECT *\n        |FROM (SELECT *, ROW_NUMBER() OVER (PARTITION BY `c`\n        |        ORDER BY `PROCTIME`()) AS `rowNum`\n        |        FROM MyTable)\n        |WHERE `rowNum` = 1\n        |")).stripMargin();
        this.util().tableEnv().executeSql(dedupViewDdl);

        // View v1: aggregate on top of v0.
        String aggViewDdl = new StringOps(Predef$.MODULE$.augmentString("\n        |CREATE VIEW v1 AS\n        |SELECT c, b, SUM(a) FILTER (WHERE a > 0) AS d FROM v0 GROUP BY c, b\n        |")).stripMargin();
        this.util().tableEnv().executeSql(aggViewDdl);

        // Rank query over v1 whose plan is verified.
        String rankQuery = new StringOps(Predef$.MODULE$.augmentString("\n        |SELECT c, b, d\n        |FROM (\n        |    SELECT\n        |       c, b, d,\n        |       ROW_NUMBER() OVER (PARTITION BY c, b ORDER BY d DESC) AS rn FROM v1\n        |) WHERE rn < 10\n        |")).stripMargin();
        this.util().verifyRelPlan(rankQuery);
    }

    /**
     * Adds a {@code LookupTable} backed by the {@code values} connector, creates view
     * {@code V1} that joins {@code MyTable} with it {@code FOR SYSTEM_TIME AS OF T.proctime},
     * and verifies the rel plan of a {@code ROW_NUMBER()} query ({@code rank_num <= 3}) over an
     * aggregation of {@code V1}.
     */
    @Test
    public void testUpdatableRankAfterLookupJoin() {
        String lookupTableDdl = new StringOps(Predef$.MODULE$.augmentString("\n         |CREATE TABLE LookupTable (\n         |  `id` INT,\n         |  `name` STRING,\n         |  `age` INT\n         |) WITH (\n         |  'connector' = 'values'\n         |)\n         |")).stripMargin();
        this.util().addTable(lookupTableDdl);

        String joinViewDdl = new StringOps(Predef$.MODULE$.augmentString("\n        |CREATE VIEW V1 AS\n        |SELECT *\n        |FROM MyTable AS T JOIN LookupTable FOR SYSTEM_TIME AS OF T.proctime AS D\n        |ON T.a = D.id\n        |")).stripMargin();
        this.util().tableEnv().executeSql(joinViewDdl);

        // The ranked query is passed directly to the plan verification.
        this.util().verifyRelPlan(
                new StringOps(
                        Predef$.MODULE$.augmentString(
                                "\n         |SELECT *\n         |FROM (\n         |  SELECT name, ids,\n         |      ROW_NUMBER() OVER (PARTITION BY name ORDER BY ids DESC) as rank_num\n         |  FROM (\n         |     SELECT name, SUM(id) FILTER (WHERE id > 0) as ids\n         |     FROM V1\n         |     GROUP BY name\n         |  ))\n         |WHERE rank_num <= 3\n         |"))
                        .stripMargin());
    }

    /**
     * Turns on {@code TABLE_OPTIMIZER_REUSE_OPTIMIZE_BLOCK_WITH_DIGEST_ENABLED}, creates an
     * aggregating view {@code v1} over {@code MyTable} and a {@code sink} table with primary key
     * {@code id}, then verifies the exec plan of a statement set holding two inserts into
     * {@code sink}: a plain {@code SELECT *} from {@code v1} and a {@code ROW_NUMBER()} query
     * ({@code rn < 3}) over {@code v1}.
     */
    @Test
    public void testUpdatableRankAfterIntermediateScan() {
        this.util()
                .tableEnv()
                .getConfig()
                .getConfiguration()
                .setBoolean(RelNodeBlockPlanBuilder$.MODULE$.TABLE_OPTIMIZER_REUSE_OPTIMIZE_BLOCK_WITH_DIGEST_ENABLED(), true);

        String aggViewDdl = new StringOps(Predef$.MODULE$.augmentString("\n        |CREATE VIEW v1 AS\n        |SELECT a, MAX(b) AS b, MIN(c) AS c\n        |FROM MyTable GROUP BY a\n        |")).stripMargin();
        this.util().tableEnv().executeSql(aggViewDdl);

        String sinkDdl = new StringOps(Predef$.MODULE$.augmentString("\n         |CREATE TABLE sink(\n         |  `id` INT,\n         |  `name` STRING,\n         |  `age` BIGINT,\n         |   primary key (id) not enforced\n         |) WITH (\n         |  'connector' = 'values',\n         |  'sink-insert-only' = 'false'\n         |)\n         |")).stripMargin();
        this.util().addTable(sinkDdl);

        // Both inserts go into one statement set; the first is added before the second.
        StatementSet statements = this.util().tableEnv().createStatementSet();
        String plainInsert = new StringOps(Predef$.MODULE$.augmentString("\n        |INSERT INTO sink\n        |SELECT * FROM v1\n        |")).stripMargin();
        statements.addInsertSql(plainInsert);
        String rankedInsert = new StringOps(Predef$.MODULE$.augmentString("\n        |INSERT INTO sink\n        |SELECT a, b, c FROM (\n        |  SELECT *, ROW_NUMBER() OVER (PARTITION BY a ORDER BY b DESC) AS rn\n        |  FROM v1\n        |) WHERE rn < 3\n        |")).stripMargin();
        statements.addInsertSql(rankedInsert);

        this.util().verifyExecPlan(statements);
    }

    /**
     * Creates a {@code sink} table with primary key {@code (a)} and verifies the explain output
     * (with {@code CHANGELOG_MODE} detail) of an insert fed by a {@code ROW_NUMBER()} query that
     * is partitioned by {@code b} and keeps {@code rn <= 100}.
     */
    @Test
    public void testRankOutputUpsertKeyNotMatchSinkPk() {
        String sinkDdl = new StringOps(Predef$.MODULE$.augmentString("\n        |CREATE TABLE sink (\n        | a INT,\n        | b VARCHAR,\n        | c BIGINT,\n        | PRIMARY KEY (a) NOT ENFORCED\n        |) WITH (\n        | 'connector' = 'values'\n        | ,'sink-insert-only' = 'false'\n        |)\n        |")).stripMargin();
        this.util().tableEnv().executeSql(sinkDdl);

        String insertSql = new StringOps(Predef$.MODULE$.augmentString("\n        |INSERT INTO sink\n        |SELECT a, b, c FROM (\n        |  SELECT *, ROW_NUMBER() OVER (PARTITION BY b ORDER BY c DESC) AS rn\n        |  FROM MyTable\n        |  )\n        |WHERE rn <= 100\n        |")).stripMargin();

        // Only the changelog-mode detail is requested for the explain output.
        ExplainDetail[] detailArray = new ExplainDetail[]{ExplainDetail.CHANGELOG_MODE};
        Seq<ExplainDetail> explainDetails = (Seq<ExplainDetail>)Predef$.MODULE$.wrapRefArray((Object[])detailArray);
        this.util().verifyExplainInsert(insertSql, explainDetails);
    }

    /**
     * Creates a {@code sink} table with primary key {@code (a, b)} and verifies the explain
     * output (with {@code CHANGELOG_MODE} detail) of an insert fed by a {@code ROW_NUMBER()}
     * query that is partitioned by {@code a} and keeps {@code rn <= 100}.
     */
    @Test
    public void testRankOutputUpsertKeyInSinkPk() {
        String sinkDdl = new StringOps(Predef$.MODULE$.augmentString("\n        |CREATE TABLE sink (\n        | a INT,\n        | b VARCHAR,\n        | c BIGINT,\n        | PRIMARY KEY (a, b) NOT ENFORCED\n        |) WITH (\n        | 'connector' = 'values'\n        | ,'sink-insert-only' = 'false'\n        |)\n        |")).stripMargin();
        this.util().tableEnv().executeSql(sinkDdl);

        String insertSql = new StringOps(Predef$.MODULE$.augmentString("\n        |INSERT INTO sink\n        |SELECT a, b, c FROM (\n        |  SELECT *, ROW_NUMBER() OVER (PARTITION BY a ORDER BY c DESC) AS rn\n        |  FROM MyTable\n        |  )\n        |WHERE rn <= 100\n        |")).stripMargin();

        // Only the changelog-mode detail is requested for the explain output.
        ExplainDetail[] detailArray = new ExplainDetail[]{ExplainDetail.CHANGELOG_MODE};
        Seq<ExplainDetail> explainDetails = (Seq<ExplainDetail>)Predef$.MODULE$.wrapRefArray((Object[])detailArray);
        this.util().verifyExplainInsert(insertSql, explainDetails);
    }

    /**
     * Creates a {@code sink} table with columns {@code a, c, rn} and primary key {@code (a)},
     * then verifies the explain output (with {@code CHANGELOG_MODE} detail) of an insert fed by
     * a {@code ROW_NUMBER()} query that is partitioned by {@code b}, keeps {@code rn <= 100},
     * and does not project {@code b}.
     */
    @Test
    public void testRankOutputLostUpsertKeyWithSinkPk() {
        String sinkDdl = new StringOps(Predef$.MODULE$.augmentString("\n        |CREATE TABLE sink (\n        | a INT,\n        | c BIGINT,\n        | rn BIGINT,\n        | PRIMARY KEY (a) NOT ENFORCED\n        |) WITH (\n        | 'connector' = 'values'\n        | ,'sink-insert-only' = 'false'\n        |)\n        |")).stripMargin();
        this.util().tableEnv().executeSql(sinkDdl);

        String insertSql = new StringOps(Predef$.MODULE$.augmentString("\n        |INSERT INTO sink\n        |SELECT a, c, rn FROM (\n        |  SELECT *, ROW_NUMBER() OVER (PARTITION BY b ORDER BY c DESC) AS rn\n        |  FROM MyTable\n        |  )\n        |WHERE rn <= 100\n        |")).stripMargin();

        // Only the changelog-mode detail is requested for the explain output.
        ExplainDetail[] detailArray = new ExplainDetail[]{ExplainDetail.CHANGELOG_MODE};
        Seq<ExplainDetail> explainDetails = (Seq<ExplainDetail>)Predef$.MODULE$.wrapRefArray((Object[])detailArray);
        this.util().verifyExplainInsert(insertSql, explainDetails);
    }

    /**
     * Registers the stream table {@code MyTable} (fields {@code a, b, c} plus a
     * {@code proctime} and a {@code rowtime} attribute) that the SQL in the test methods of this
     * class refers to.
     *
     * <p>NOTE(review): this file is CFR decompiler output (see the file header). The anonymous
     * {@code CaseClassTypeInfo} below contains decompiler residue ({@code fieldSerializers$1},
     * the pseudo-syntax inside the {@code MethodHandle[]}) that is not valid Java as written.
     */
    public RankTest() {
        // Type information is supplied as an anonymous CaseClassTypeInfo over
        // Tuple3<Object, String, Object>.
        // NOTE(review): the constructor arguments are shown as a bare `null`; presumably the
        // decompiler dropped the real arguments -- confirm against the original Scala source.
        this.util().addDataStream("MyTable", (Seq<Expression>)Predef$.MODULE$.wrapRefArray((Object[])new Expression[]{package$.MODULE$.symbol2FieldExpression((Symbol)SymbolLiteral.bootstrap("apply", "a")), package$.MODULE$.symbol2FieldExpression((Symbol)SymbolLiteral.bootstrap("apply", "b")), package$.MODULE$.symbol2FieldExpression((Symbol)SymbolLiteral.bootstrap("apply", "c")), (Expression)package$.MODULE$.UnresolvedFieldExpression((Symbol)SymbolLiteral.bootstrap("apply", "proctime")).proctime(), (Expression)package$.MODULE$.UnresolvedFieldExpression((Symbol)SymbolLiteral.bootstrap("apply", "rowtime")).rowtime()}), new CaseClassTypeInfo<Tuple3<Object, String, Object>>(null){

            // Synthetic accessor that returns the `types` field of the given instance.
            public /* synthetic */ TypeInformation[] protected$types($anon$5 x$1) {
                return x$1.types;
            }

            // Builds one serializer per field (index 0 until getArity()) and returns a
            // ScalaCaseClassSerializer constructed from getTypeClass() and those serializers.
            public TypeSerializer<Tuple3<Object, String, Object>> createSerializer(ExecutionConfig executionConfig) {
                TypeSerializer[] fieldSerializers = new TypeSerializer[this.getArity()];
                RichInt$.MODULE$.until$extension0(Predef$.MODULE$.intWrapper(0), this.getArity()).foreach$mVc$sp((Function1)(JFunction1.mcVI.sp & Serializable & scala.Serializable)i -> {
                    // NOTE(review): `fieldSerializers$1` is the decompiler's name for the
                    // captured `fieldSerializers` array above, presumably -- confirm.
                    fieldSerializers$1[i] = this.protected$types(this)[i].createSerializer(executionConfig);
                });
                // This anonymous serializer is assigned to `unused` and never read again in this
                // method; the value actually returned is the separate instance created below.
                ScalaCaseClassSerializer<Tuple3<Object, String, Object>> unused = new ScalaCaseClassSerializer<Tuple3<Object, String, Object>>(this, fieldSerializers){

                    // Rebuilds a Tuple3 from raw field values: [0] as int, [1] as String,
                    // [2] as long.
                    public Tuple3<Object, String, Object> createInstance(Object[] fields) {
                        return new Tuple3((Object)BoxesRunTime.boxToInteger((int)BoxesRunTime.unboxToInt((Object)fields[0])), (Object)((String)fields[1]), (Object)BoxesRunTime.boxToLong((long)BoxesRunTime.unboxToLong((Object)fields[2])));
                    }
                };
                return new ScalaCaseClassSerializer(this.getTypeClass(), fieldSerializers);
            }

            // Synthetic lambda-deserialization hook. The MethodHandle[] contents are decompiler
            // pseudo-syntax describing the lambda's signature, not compilable Java.
            private static /* synthetic */ Object $deserializeLambda$(SerializedLambda serializedLambda) {
                return LambdaDeserialize.bootstrap("lambdaDeserialize", new MethodHandle[]{$anonfun$createSerializer$1(org.apache.flink.table.planner.plan.stream.sql.RankTest$$anon$5 org.apache.flink.api.common.ExecutionConfig org.apache.flink.api.common.typeutils.TypeSerializer[] int )}, serializedLambda);
            }
        });
    }
}

