/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.table.planner.plan.stream.sql;

import java.io.Serializable;
import java.lang.invoke.MethodHandle;
import java.lang.invoke.SerializedLambda;
import org.apache.flink.api.common.ExecutionConfig;
import org.apache.flink.api.common.serialization.SerializerConfig;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.api.scala.typeutils.CaseClassTypeInfo;
import org.apache.flink.api.scala.typeutils.ScalaCaseClassSerializer;
import org.apache.flink.table.api.ExplainDetail;
import org.apache.flink.table.api.StatementSet;
import org.apache.flink.table.api.TableException;
import org.apache.flink.table.api.ValidationException;
import org.apache.flink.table.api.package$;
import org.apache.flink.table.expressions.Expression;
import org.apache.flink.table.planner.plan.optimize.RelNodeBlockPlanBuilder$;
import org.apache.flink.table.planner.plan.stream.sql.RankTest$;
import org.apache.flink.table.planner.utils.StreamTableTestUtil;
import org.apache.flink.table.planner.utils.TableTestBase;
import org.assertj.core.api.Assertions;
import org.junit.jupiter.api.Test;
import scala.Function1;
import scala.Predef$;
import scala.Symbol;
import scala.Tuple3;
import scala.Tuple4;
import scala.collection.Seq;
import scala.collection.immutable.StringOps;
import scala.reflect.ScalaSignature;
import scala.runtime.BoxesRunTime;
import scala.runtime.LambdaDeserialize;
import scala.runtime.RichInt$;
import scala.runtime.SymbolLiteral;
import scala.runtime.java8.JFunction1;

@ScalaSignature(bytes="\u0006\u0001\u0005\re\u0001B\u0001\u0003\u0001M\u0011\u0001BU1oWR+7\u000f\u001e\u0006\u0003\u0007\u0011\t1a]9m\u0015\t)a!\u0001\u0004tiJ,\u0017-\u001c\u0006\u0003\u000f!\tA\u0001\u001d7b]*\u0011\u0011BC\u0001\ba2\fgN\\3s\u0015\tYA\"A\u0003uC\ndWM\u0003\u0002\u000e\u001d\u0005)a\r\\5oW*\u0011q\u0002E\u0001\u0007CB\f7\r[3\u000b\u0003E\t1a\u001c:h\u0007\u0001\u0019\"\u0001\u0001\u000b\u0011\u0005UAR\"\u0001\f\u000b\u0005]A\u0011!B;uS2\u001c\u0018BA\r\u0017\u00055!\u0016M\u00197f)\u0016\u001cHOQ1tK\")1\u0004\u0001C\u00019\u00051A(\u001b8jiz\"\u0012!\b\t\u0003=\u0001i\u0011A\u0001\u0005\bA\u0001\u0011\r\u0011\"\u0003\"\u0003\u0011)H/\u001b7\u0016\u0003\t\u0002\"!F\u0012\n\u0005\u00112\"aE*ue\u0016\fW\u000eV1cY\u0016$Vm\u001d;Vi&d\u0007B\u0002\u0014\u0001A\u0003%!%A\u0003vi&d\u0007\u0005C\u0003)\u0001\u0011\u0005\u0011&\u0001\ruKN$(+\u00198l\u000b:$W*^:u'B,7-\u001b4jK\u0012$\u0012A\u000b\t\u0003W9j\u0011\u0001\f\u0006\u0002[\u0005)1oY1mC&\u0011q\u0006\f\u0002\u0005+:LG\u000f\u000b\u0002(cA\u0011!'O\u0007\u0002g)\u0011A'N\u0001\u0004CBL'B\u0001\u001c8\u0003\u001dQW\u000f]5uKJT!\u0001\u000f\t\u0002\u000b),h.\u001b;\n\u0005i\u001a$\u0001\u0002+fgRDQ\u0001\u0010\u0001\u0005\u0002%\nq\u0003^3tiJ\u000bgn[#oI2+7o\u001d+iC:TVM]8)\u0005m\n\u0004\"B 
\u0001\t\u0003I\u0013\u0001\u0006;fgR\u0014\u0016M\\6F]\u0012dUm]:UQ\u0006t\u0017\u0007\u000b\u0002?c!)!\t\u0001C\u0001S\u0005AB/Z:u%\u0006t7NR;oGRLwN\\%o\u001b&$G\r\\3)\u0005\u0005\u000b\u0004\"B#\u0001\t\u0003I\u0013a\r;fgR\u0014vn\u001e(v[\n,'oV5uQJ\u000bgn[#oI2+7o\u001d+iC:\ftJ\u001d3fe\nK\bK]8di&lW-Q:dQ\t!\u0015\u0007C\u0003I\u0001\u0011\u0005\u0011&\u0001\u001buKN$(k\\<Ok6\u0014WM],ji\"\u0014\u0016M\\6F]\u0012dUm]:UQ\u0006t\u0017g\u0014:eKJ\u0014\u0015\u0010\u0015:pGRLW.\u001a#fg\u000eD#aR\u0019\t\u000b-\u0003A\u0011A\u0015\u0002eQ,7\u000f\u001e*po:+XNY3s/&$\bNU1oW\u0016sG\rT3tgRC\u0017M\\\u0019Pe\u0012,'OQ=S_^$\u0018.\\3Bg\u000eD#AS\u0019\t\u000b9\u0003A\u0011A\u0015\u0002gQ,7\u000f\u001e*po:+XNY3s/&$\bNU1oW\u0016sG\rT3tgRC\u0017M\\\u0019Pe\u0012,'OQ=S_^$\u0018.\\3EKN\u001c\u0007FA'2\u0011\u0015\t\u0006\u0001\"\u0001*\u00039\"Xm\u001d;SC:\\w+\u001b;i%\u0006t7.\u00128e\u0019\u0016\u001c8\u000f\u00165b]Fz%\u000fZ3s\u0005f\u0004&o\\2uS6,\u0017i]2)\u0005A\u000b\u0004\"\u0002+\u0001\t\u0003I\u0013a\f;fgR\u0014\u0016M\\6XSRD'+\u00198l\u000b:$G*Z:t)\"\fg.M(sI\u0016\u0014()\u001f)s_\u000e$\u0018.\\3EKN\u001c\u0007FA*2\u0011\u00159\u0006\u0001\"\u0001*\u0003m!Xm\u001d;S_^tU/\u001c2fe^KG\u000f[(vi>\u0013H-\u001a:Cs\"\u0012a+\r\u0005\u00065\u0002!\t!K\u0001\u0017i\u0016\u001cHOU1oW^KG\u000f[(vi>\u0013H-\u001a:Cs\"\u0012\u0011,\r\u0005\u0006;\u0002!\t!K\u0001\u001ci\u0016\u001cH\u000fR3og\u0016\u0014\u0016M\\6XSRDw*\u001e;Pe\u0012,'OQ=)\u0005q\u000b\u0004\"\u00021\u0001\t\u0003I\u0013\u0001\b;fgR\u0014vn\u001e(v[\n,'oV5uQ6+H\u000e^5He>,\bo\u001d\u0015\u0003?FBQa\u0019\u0001\u0005\u0002%\nq\u0003^3tiJ\u000bgn[,ji\"lU\u000f\u001c;j\u000fJ|W\u000f]:)\u0005\t\f\u0004\"\u00024\u0001\t\u0003I\u0013\u0001\b;fgR$UM\\:f%\u0006t7nV5uQ6+H\u000e^5He>,\bo\u001d\u0015\u0003KFBQ!\u001b\u0001\u0005\u0002%\n\u0001\u0002^3tiR{\u0007O\u0014\u0015\u0003QFBQ\u0001\u001c\u0001\u0005\u0002%\n\u0011\u0002^3tiR{\u0007O\u0014\u001a)\u0005-\f\u0004\"B8\u0001\t\u0003I\u0013A\u0003;fgR$v\u000e\u001d
(uQ\"\u0012a.\r\u0005\u0006e\u0002!\t!K\u0001\u0013i\u0016\u001cH\u000fV8q\u001d^KG\u000f\u001b$jYR,'\u000f\u000b\u0002rc!)Q\u000f\u0001C\u0001S\u0005\u0001B/Z:u)>\u0004h*\u00114uKJ\fum\u001a\u0015\u0003iFBQ\u0001\u001f\u0001\u0005\u0002%\na\u0003^3tiR{\u0007OT,ji\"\\U-_\"iC:<W\r\u001a\u0015\u0003oFBQa\u001f\u0001\u0005\u0002%\n\u0011\u0004^3tiVs\u0017M]=T_J$Hk\u001c9O\u001f:\u001cFO]5oO\"\u0012!0\r\u0005\u0006}\u0002!\t!K\u0001\u0015i\u0016\u001cH\u000fV8q\u001d>\u0013H-\u001a:Cs\u000e{WO\u001c;)\u0005u\f\u0004BBA\u0002\u0001\u0011\u0005\u0011&A\u000euKN$(k\\<Ok6\u0014WM],ji\"|W\u000f^(sI\u0016\u0014()\u001f\u0005\u0007\u0003\u000f\u0001A\u0011A\u0015\u0002AQ,7\u000f\u001e*po:+XNY3s/&$\bn\u0014:eKJ\u0014\u0015pQ8ogR\fg\u000e\u001e\u0015\u0004\u0003\u000b\t\u0004BBA\u0007\u0001\u0011\u0005\u0011&\u0001\u000euKN$Hk\u001c9O\u001fJ$WM\u001d\"z'Vlw+\u001b;i\u0007>tG\rK\u0002\u0002\fEBa!a\u0005\u0001\t\u0003I\u0013A\b;fgR$v\u000e\u001d(Pe\u0012,'OQ=Tk6<\u0016\u000e\u001e5DCN,w\u000b[3oQ\r\t\t\"\r\u0005\u0007\u00033\u0001A\u0011A\u0015\u00021Q,7\u000f\u001e+pa:{%\u000fZ3s\u0005f\u001cV/\\,ji\"Le\rK\u0002\u0002\u0018EBa!a\b\u0001\t\u0003I\u0013A\t;fgR$v\u000e\u001d(Pe\u0012,'OQ=Tk6<\u0016\u000e\u001e5GS2$XM]\"mCV\u001cX\rK\u0002\u0002\u001eEBa!!\n\u0001\t\u0003I\u0013a\t;fgR$v\u000e\u001d(Pe\u0012,'OQ=Tk6<\u0016\u000e\u001e5GS2$XM]\"mCV\u001cXM\r\u0015\u0004\u0003G\t\u0004BBA\u0016\u0001\u0011\u0005\u0011&A\u0011uKN$Hk\u001c9O\u001fJ$WM\u001d\"z\u0007>,h\u000e^!oI>#\b.\u001a:GS\u0016dG\rK\u0002\u0002*EBa!!\r\u0001\t\u0003I\u0013A\b;fgR$v\u000e\u001d(XSRDwI]8va\nK8i\u001c8ti\u0006tGoS3zQ\r\ty#\r\u0005\u0007\u0003o\u0001A\u0011A\u0015\u0002\u001dQ,7\u000f\u001e(fgR,G\rV8q\u001d\"\u001a\u0011QG\u0019\t\r\u0005u\u0002\u0001\"\u0001*\u0003]!Xm\u001d;U_Btei\u001c:WCJL\u0017M\u00197f'&TX\rK\u0002\u0002<EBa!a\u0011\u0001\t\u0003I\u0013a\u0007;fgR\u001c%/Z1uKZKWm^,ji\"\u0014vn\u001e(v[\n,'\u000fK\u0002\u0002BEBa!!\u0013\u0001\t\u0003I\u0013a\u0006;fgR\u001cuN\u001d:fY\u0006$XmU8siR
{'+\u00198lQ\r\t9%\r\u0005\u0007\u0003\u001f\u0002A\u0011A\u0015\u0002YQ,7\u000f^\"peJ,G.\u0019;f'>\u0014H\u000fV8SC:\\w+\u001b;i\u001bVdG/\u001b9mK\u001e\u0013x.\u001e9LKf\u001c\bfAA'c!1\u0011Q\u000b\u0001\u0005\u0002%\na\u0004^3tiJ\u000bgn[,ji\"\fen\u001c;iKJ\u0014\u0016M\\6Bg&s\u0007/\u001e;)\u0007\u0005M\u0013\u0007\u0003\u0004\u0002\\\u0001!\t!K\u0001$i\u0016\u001cHOU3ek:$\u0017M\u001c;SC:\\g*^7cKJ\u001cu\u000e\\;n]J+Wn\u001c<fQ\r\tI&\r\u0005\u0007\u0003C\u0002A\u0011A\u0015\u0002AQ,7\u000f^+qI\u0006$\u0018M\u00197f%\u0006t7nV5uQ\u0012+G-\u001e9mS\u000e\fG/\u001a\u0015\u0004\u0003?\n\u0004BBA4\u0001\u0011\u0005\u0011&\u0001\u0011uKN$X\u000b\u001d3bi\u0006\u0014G.\u001a*b].\fe\r^3s\u0019>|7.\u001e9K_&t\u0007fAA3c!1\u0011Q\u000e\u0001\u0005\u0002%\na\u0005^3tiV\u0003H-\u0019;bE2,'+\u00198l\u0003\u001a$XM]%oi\u0016\u0014X.\u001a3jCR,7kY1oQ\r\tY'\r\u0005\u0007\u0003g\u0002A\u0011A\u0015\u0002KQ,7\u000f\u001e*b].|U\u000f\u001e9viV\u00038/\u001a:u\u0017\u0016Lhj\u001c;NCR\u001c\u0007nU5oWB[\u0007fAA9c!1\u0011\u0011\u0010\u0001\u0005\u0002%\nq\u0004^3tiJ\u000bgn[(viB,H/\u00169tKJ$8*Z=J]NKgn\u001b)lQ\r\t9(\r\u0005\u0007\u0003\u007f\u0002A\u0011A\u0015\u0002KQ,7\u000f\u001e*b].|U\u000f\u001e9vi2{7\u000f^+qg\u0016\u0014HoS3z/&$\bnU5oWB[\u0007fAA?c\u0001")
public class RankTest
extends TableTestBase {
    // Test utility used by the test methods in this class; initialised once per test-class
    // instance from streamTestUtil(...) called with its default first argument.
    private final StreamTableTestUtil util = this.streamTestUtil(this.streamTestUtil$default$1());

    /** Returns the {@link StreamTableTestUtil} held by this test instance. */
    private StreamTableTestUtil util() {
        return this.util;
    }

    /**
     * Plans a ROW_NUMBER query whose outer filter is {@code rank_num >= 10} and expects the
     * planning call to fail with a {@link TableException} whose message contains
     * "Rank end is not specified.".
     */
    @Test
    public void testRankEndMustSpecified() {
        String sql = new StringOps(Predef$.MODULE$.augmentString("\n        |SELECT *\n        |FROM (\n        |  SELECT a, b, c,\n        |      ROW_NUMBER() OVER (PARTITION BY a ORDER BY b DESC) as rank_num\n        |  FROM MyTable)\n        |WHERE rank_num >= 10\n      ")).stripMargin();
        // Use AssertJ's isInstanceOf(Class) so the exception type is really asserted. A plain
        // `instanceof` applied to the assertion object only produces a boolean that is thrown away.
        Assertions.assertThatThrownBy(() -> this.util().verifyExecPlan(sql))
                .hasMessageContaining("Rank end is not specified.")
                .isInstanceOf(TableException.class);
    }

    /**
     * Plans a ROW_NUMBER query whose outer filter is {@code rank_num <= 0} and expects the
     * planning call to fail with a {@link TableException} whose message contains
     * "Rank end should not less than zero".
     */
    @Test
    public void testRankEndLessThanZero() {
        String sql = new StringOps(Predef$.MODULE$.augmentString("\n        |SELECT *\n        |FROM (\n        |  SELECT a, b, c,\n        |      ROW_NUMBER() OVER (PARTITION BY a ORDER BY b DESC) as rank_num\n        |  FROM MyTable)\n        |WHERE rank_num <= 0\n      ")).stripMargin();
        // Use AssertJ's isInstanceOf(Class) so the exception type is really asserted. A plain
        // `instanceof` applied to the assertion object only produces a boolean that is thrown away.
        Assertions.assertThatThrownBy(() -> this.util().verifyExecPlan(sql))
                .hasMessageContaining("Rank end should not less than zero")
                .isInstanceOf(TableException.class);
    }

    /**
     * Verifies the exec plan of a ROW_NUMBER query partitioned by {@code a}, ordered by
     * {@code b DESC} and filtered with {@code row_num <= 1}.
     */
    @Test
    public void testRankEndLessThan1() {
        StringOps margined = new StringOps(Predef$.MODULE$.augmentString("\n        |SELECT *\n        |FROM (\n        |  SELECT a, b, c,\n        |      ROW_NUMBER() OVER (PARTITION BY a ORDER BY b DESC) as row_num\n        |  FROM MyTable)\n        |WHERE row_num <= 1\n      "));
        String query = margined.stripMargin();
        this.util().verifyExecPlan(query);
    }

    /**
     * Verifies the exec plan when the RANK() column sits between other selected columns
     * ({@code a, rk, b, c}) and the outer filter is {@code rk < 10}.
     */
    @Test
    public void testRankFunctionInMiddle() {
        StringOps margined = new StringOps(Predef$.MODULE$.augmentString("\n        |SELECT * FROM (\n        | SELECT a, RANK() OVER (PARTITION BY a ORDER BY a) rk, b, c FROM MyTable) t\n        |WHERE rk < 10\n      "));
        String query = margined.stripMargin();
        this.util().verifyExecPlan(query);
    }

    /**
     * Verifies the exec plan of ROW_NUMBER ordered by {@code proctime ASC} with the filter
     * {@code row_num <= 1}.
     */
    @Test
    public void testRowNumberWithRankEndLessThan1OrderByProctimeAsc() {
        StringOps margined = new StringOps(Predef$.MODULE$.augmentString("\n        |SELECT a, b, c\n        |FROM (\n        |  SELECT a, b, c, proctime,\n        |       ROW_NUMBER() OVER (PARTITION BY a ORDER BY proctime ASC) as row_num\n        |  FROM MyTable)\n        |WHERE row_num <= 1\n      "));
        String query = margined.stripMargin();
        this.util().verifyExecPlan(query);
    }

    /**
     * Verifies the exec plan of ROW_NUMBER ordered by {@code proctime DESC} with the filter
     * {@code row_num <= 1}.
     */
    @Test
    public void testRowNumberWithRankEndLessThan1OrderByProctimeDesc() {
        StringOps margined = new StringOps(Predef$.MODULE$.augmentString("\n        |SELECT a, b, c\n        |FROM (\n        |  SELECT a, b, c, proctime,\n        |       ROW_NUMBER() OVER (PARTITION BY a ORDER BY proctime DESC) as row_num\n        |  FROM MyTable)\n        |WHERE row_num <= 1\n      "));
        String query = margined.stripMargin();
        this.util().verifyExecPlan(query);
    }

    /**
     * Verifies the exec plan of ROW_NUMBER ordered by {@code rowtime ASC} with the filter
     * {@code row_num <= 1}.
     */
    @Test
    public void testRowNumberWithRankEndLessThan1OrderByRowtimeAsc() {
        StringOps margined = new StringOps(Predef$.MODULE$.augmentString("\n        |SELECT a, b, c\n        |FROM (\n        |  SELECT a, b, c, rowtime,\n        |       ROW_NUMBER() OVER (PARTITION BY a ORDER BY rowtime ASC) as row_num\n        |  FROM MyTable)\n        |WHERE row_num <= 1\n      "));
        String query = margined.stripMargin();
        this.util().verifyExecPlan(query);
    }

    /**
     * Verifies the exec plan of ROW_NUMBER ordered by {@code rowtime DESC} with the filter
     * {@code row_num <= 1}.
     */
    @Test
    public void testRowNumberWithRankEndLessThan1OrderByRowtimeDesc() {
        StringOps margined = new StringOps(Predef$.MODULE$.augmentString("\n        |SELECT a, b, c\n        |FROM (\n        |  SELECT a, b, c, rowtime,\n        |       ROW_NUMBER() OVER (PARTITION BY a ORDER BY rowtime DESC) as row_num\n        |  FROM MyTable)\n        |WHERE row_num <= 1\n      "));
        String query = margined.stripMargin();
        this.util().verifyExecPlan(query);
    }

    /**
     * Verifies the exec plan of RANK() ordered by {@code proctime ASC} with the filter
     * {@code rk <= 1}.
     */
    @Test
    public void testRankWithRankEndLessThan1OrderByProctimeAsc() {
        StringOps margined = new StringOps(Predef$.MODULE$.augmentString("\n        |SELECT a, b, c\n        |FROM (\n        |  SELECT a, b, c, proctime,\n        |       RANK() OVER (PARTITION BY a ORDER BY proctime ASC) as rk\n        |  FROM MyTable)\n        |WHERE rk <= 1\n      "));
        String query = margined.stripMargin();
        this.util().verifyExecPlan(query);
    }

    /**
     * Verifies the exec plan of RANK() ordered by {@code proctime DESC} with the filter
     * {@code rk <= 1}.
     */
    @Test
    public void testRankWithRankEndLessThan1OrderByProctimeDesc() {
        StringOps margined = new StringOps(Predef$.MODULE$.augmentString("\n        |SELECT a, b, c\n        |FROM (\n        |  SELECT a, b, c, proctime,\n        |       RANK() OVER (PARTITION BY a ORDER BY proctime DESC) as rk\n        |  FROM MyTable)\n        |WHERE rk <= 1\n      "));
        String query = margined.stripMargin();
        this.util().verifyExecPlan(query);
    }

    /**
     * Expects a {@link RuntimeException} when planning ROW_NUMBER over a window that has a
     * PARTITION BY but no ORDER BY, filtered with {@code row_num <= a}.
     */
    @Test
    public void testRowNumberWithOutOrderBy() {
        StringOps margined = new StringOps(Predef$.MODULE$.augmentString("\n        |SELECT *\n        |FROM (\n        |  SELECT a, ROW_NUMBER() OVER (PARTITION BY b) as row_num\n        |  FROM MyTable)\n        |WHERE row_num <= a\n      "));
        String query = margined.stripMargin();
        Assertions.assertThatExceptionOfType(RuntimeException.class)
                .isThrownBy(() -> this.util().verifyExecPlan(query));
    }

    /**
     * Expects a {@link ValidationException} when planning RANK() over a window that has a
     * PARTITION BY but no ORDER BY, filtered with {@code rk <= a}.
     */
    @Test
    public void testRankWithOutOrderBy() {
        StringOps margined = new StringOps(Predef$.MODULE$.augmentString("\n        |SELECT *\n        |FROM (\n        |  SELECT a, RANK() OVER (PARTITION BY b) as rk\n        |  FROM MyTable)\n        |WHERE rk <= a\n      "));
        String query = margined.stripMargin();
        Assertions.assertThatExceptionOfType(ValidationException.class)
                .isThrownBy(() -> this.util().verifyExecPlan(query));
    }

    /**
     * Expects a {@link ValidationException} when planning DENSE_RANK() over a window that has a
     * PARTITION BY but no ORDER BY, filtered with {@code rk <= a}.
     */
    @Test
    public void testDenseRankWithOutOrderBy() {
        StringOps margined = new StringOps(Predef$.MODULE$.augmentString("\n        |SELECT *\n        |FROM (\n        |  SELECT a, DENSE_RANK() OVER (PARTITION BY b) as rk\n        |  FROM MyTable)\n        |WHERE rk <= a\n      "));
        String query = margined.stripMargin();
        Assertions.assertThatExceptionOfType(ValidationException.class)
                .isThrownBy(() -> this.util().verifyExecPlan(query));
    }

    /**
     * Expects a {@link RuntimeException} when planning a query with two ROW_NUMBER windows,
     * one of which has no ORDER BY.
     */
    @Test
    public void testRowNumberWithMultiGroups() {
        StringOps margined = new StringOps(Predef$.MODULE$.augmentString("\n        |SELECT *\n        |FROM (\n        |  SELECT a, ROW_NUMBER() OVER (PARTITION BY b ORDER BY a) as row_num,\n        |         ROW_NUMBER() OVER (PARTITION BY a) as row_num1\n        |  FROM MyTable)\n        |WHERE row_num <= a\n      "));
        String query = margined.stripMargin();
        Assertions.assertThatExceptionOfType(RuntimeException.class)
                .isThrownBy(() -> this.util().verifyExecPlan(query));
    }

    /**
     * Expects a {@link ValidationException} when planning a query with two RANK() windows,
     * one of which has no ORDER BY.
     */
    @Test
    public void testRankWithMultiGroups() {
        StringOps margined = new StringOps(Predef$.MODULE$.augmentString("\n        |SELECT *\n        |FROM (\n        |  SELECT a, RANK() OVER (PARTITION BY b ORDER BY a) as rk,\n        |         RANK() OVER (PARTITION BY a) as rk1\n        |  FROM MyTable)\n        |WHERE rk <= a\n      "));
        String query = margined.stripMargin();
        Assertions.assertThatExceptionOfType(ValidationException.class)
                .isThrownBy(() -> this.util().verifyExecPlan(query));
    }

    /**
     * Expects a {@link ValidationException} when planning a query with two DENSE_RANK() windows,
     * one of which has no ORDER BY.
     */
    @Test
    public void testDenseRankWithMultiGroups() {
        StringOps margined = new StringOps(Predef$.MODULE$.augmentString("\n        |SELECT *\n        |FROM (\n        |  SELECT a, DENSE_RANK() OVER (PARTITION BY b ORDER BY a) as rk,\n        |         DENSE_RANK() OVER (PARTITION BY a) as rk1\n        |  FROM MyTable)\n        |WHERE rk <= a\n      "));
        String query = margined.stripMargin();
        Assertions.assertThatExceptionOfType(ValidationException.class)
                .isThrownBy(() -> this.util().verifyExecPlan(query));
    }

    /**
     * Verifies the rel plan, with {@code CHANGELOG_MODE} explain detail, of a ROW_NUMBER query
     * filtered with {@code row_num <= 10}.
     */
    @Test
    public void testTopN() {
        StringOps margined = new StringOps(Predef$.MODULE$.augmentString("\n        |SELECT *\n        |FROM (\n        |  SELECT a, b, c,\n        |      ROW_NUMBER() OVER (PARTITION BY a ORDER BY b DESC) as row_num\n        |  FROM MyTable)\n        |WHERE row_num <= 10\n      "));
        String query = margined.stripMargin();
        Seq<ExplainDetail> details = (Seq<ExplainDetail>)Predef$.MODULE$.wrapRefArray((Object[])new ExplainDetail[]{ExplainDetail.CHANGELOG_MODE});
        this.util().verifyRelPlan(query, details);
    }

    /**
     * Same query shape as {@code testTopN} but with the filter written as
     * {@code 10 >= row_num}; verifies the rel plan with {@code CHANGELOG_MODE}.
     */
    @Test
    public void testTopN2() {
        StringOps margined = new StringOps(Predef$.MODULE$.augmentString("\n        |SELECT *\n        |FROM (\n        |  SELECT a, b, c,\n        |      ROW_NUMBER() OVER (PARTITION BY a ORDER BY b DESC) as row_num\n        |  FROM MyTable)\n        |WHERE 10 >= row_num\n      "));
        String query = margined.stripMargin();
        Seq<ExplainDetail> details = (Seq<ExplainDetail>)Predef$.MODULE$.wrapRefArray((Object[])new ExplainDetail[]{ExplainDetail.CHANGELOG_MODE});
        this.util().verifyRelPlan(query, details);
    }

    /**
     * Verifies the rel plan, with {@code CHANGELOG_MODE}, of a ROW_NUMBER query that selects
     * exactly one rank via {@code row_num = 10}.
     */
    @Test
    public void testTopNth() {
        StringOps margined = new StringOps(Predef$.MODULE$.augmentString("\n        |SELECT *\n        |FROM (\n        |  SELECT a, b, c,\n        |      ROW_NUMBER() OVER (PARTITION BY a ORDER BY b DESC) as row_num\n        |  FROM MyTable)\n        |WHERE row_num = 10\n      "));
        String query = margined.stripMargin();
        Seq<ExplainDetail> details = (Seq<ExplainDetail>)Predef$.MODULE$.wrapRefArray((Object[])new ExplainDetail[]{ExplainDetail.CHANGELOG_MODE});
        this.util().verifyRelPlan(query, details);
    }

    /**
     * Verifies the rel plan, with {@code CHANGELOG_MODE}, of a ROW_NUMBER query that filters
     * both inside the subquery ({@code c > 1000}) and outside
     * ({@code row_num <= 10 AND b IS NOT NULL}).
     */
    @Test
    public void testTopNWithFilter() {
        StringOps margined = new StringOps(Predef$.MODULE$.augmentString("\n        |SELECT row_num, a, c\n        |FROM (\n        |  SELECT a, b, c,\n        |      ROW_NUMBER() OVER (PARTITION BY a ORDER BY b DESC) as row_num\n        |  FROM MyTable\n        |  WHERE c > 1000)\n        |WHERE row_num <= 10 AND b IS NOT NULL\n      "));
        String query = margined.stripMargin();
        Seq<ExplainDetail> details = (Seq<ExplainDetail>)Predef$.MODULE$.wrapRefArray((Object[])new ExplainDetail[]{ExplainDetail.CHANGELOG_MODE});
        this.util().verifyRelPlan(query, details);
    }

    /**
     * Verifies the rel plan, with {@code CHANGELOG_MODE}, of a ROW_NUMBER query (top 10 per
     * {@code b} by {@code sum_c DESC}) whose input is a {@code GROUP BY a, b} aggregation.
     */
    @Test
    public void testTopNAfterAgg() {
        String aggregation = new StringOps(Predef$.MODULE$.augmentString("\n        |SELECT a, b, SUM(c) as sum_c\n        |FROM MyTable\n        |GROUP BY a, b\n      ")).stripMargin();
        StringBuilder text = new StringBuilder(208);
        text.append("\n         |SELECT *\n         |FROM (\n         |  SELECT a, b, sum_c,\n         |      ROW_NUMBER() OVER (PARTITION BY b ORDER BY sum_c DESC) as row_num\n         |  FROM (");
        text.append(aggregation);
        text.append("))\n         |WHERE row_num <= 10\n      ");
        String query = new StringOps(Predef$.MODULE$.augmentString(text.toString())).stripMargin();
        Seq<ExplainDetail> details = (Seq<ExplainDetail>)Predef$.MODULE$.wrapRefArray((Object[])new ExplainDetail[]{ExplainDetail.CHANGELOG_MODE});
        this.util().verifyRelPlan(query, details);
    }

    /**
     * Verifies the rel plan, with {@code CHANGELOG_MODE}, of a ROW_NUMBER query partitioned by
     * {@code b}, where {@code b} comes from {@code last_value(b)} in a {@code GROUP BY a}
     * aggregation.
     */
    @Test
    public void testTopNWithKeyChanged() {
        String aggregation = new StringOps(Predef$.MODULE$.augmentString("\n        |SELECT a, last_value(b) as b, SUM(c) as sum_c\n        |FROM MyTable\n        |GROUP BY a\n      ")).stripMargin();
        StringBuilder text = new StringBuilder(208);
        text.append("\n         |SELECT *\n         |FROM (\n         |  SELECT a, b, sum_c,\n         |      ROW_NUMBER() OVER (PARTITION BY b ORDER BY sum_c DESC) as row_num\n         |  FROM (");
        text.append(aggregation);
        text.append("))\n         |WHERE row_num <= 10\n      ");
        String query = new StringOps(Predef$.MODULE$.augmentString(text.toString())).stripMargin();
        Seq<ExplainDetail> details = (Seq<ExplainDetail>)Predef$.MODULE$.wrapRefArray((Object[])new ExplainDetail[]{ExplainDetail.CHANGELOG_MODE});
        this.util().verifyRelPlan(query, details);
    }

    /**
     * Registers a table source {@code T} with fields {@code category}, {@code shopId} and
     * {@code price}, typed by an anonymous {@code CaseClassTypeInfo} over
     * {@code Tuple3<String, Object, String>}, then verifies the rel plan (with
     * {@code CHANGELOG_MODE}) of a ROW_NUMBER query keeping {@code row_num <= 3} per
     * {@code category}, ordered by {@code MAX(price)} ascending.
     */
    @Test
    public void testUnarySortTopNOnString() {
        this.util().addTableSource("T", (Seq<Expression>)Predef$.MODULE$.wrapRefArray((Object[])new Expression[]{package$.MODULE$.symbol2FieldExpression((Symbol)SymbolLiteral.bootstrap("apply", "category")), package$.MODULE$.symbol2FieldExpression((Symbol)SymbolLiteral.bootstrap("apply", "shopId")), package$.MODULE$.symbol2FieldExpression((Symbol)SymbolLiteral.bootstrap("apply", "price"))}), new CaseClassTypeInfo<Tuple3<String, Object, String>>(null){

            // Synthetic accessor that returns the `types` field of the given type-info instance.
            public /* synthetic */ TypeInformation[] protected$types($anon$6 x$1) {
                return x$1.types;
            }

            // Creates one serializer per field (getArity() of them) from the field type infos
            // and returns a ScalaCaseClassSerializer built from this type's class and those
            // field serializers.
            public TypeSerializer<Tuple3<String, Object, String>> createSerializer(SerializerConfig serializerConfig) {
                TypeSerializer[] fieldSerializers = new TypeSerializer[this.getArity()];
                RichInt$.MODULE$.until$extension0(Predef$.MODULE$.intWrapper(0), this.getArity()).foreach$mVc$sp((Function1)(JFunction1.mcVI.sp & Serializable & scala.Serializable)i -> {
                    // NOTE(review): `fieldSerializers$2` looks like a decompiler-mangled
                    // reference to the `fieldSerializers` array above — confirm.
                    fieldSerializers$2[i] = this.protected$types(this)[i].createSerializer(serializerConfig);
                });
                // This anonymous serializer subclass is assigned to `unused` and is not the
                // object returned below.
                ScalaCaseClassSerializer<Tuple3<String, Object, String>> unused = new ScalaCaseClassSerializer<Tuple3<String, Object, String>>(this, fieldSerializers){

                    // Builds a Tuple3 from the three field values: (String, int, String).
                    public Tuple3<String, Object, String> createInstance(Object[] fields) {
                        return new Tuple3((Object)((String)fields[0]), (Object)BoxesRunTime.boxToInteger((int)BoxesRunTime.unboxToInt((Object)fields[1])), (Object)((String)fields[2]));
                    }
                };
                return new ScalaCaseClassSerializer(this.getTypeClass(), fieldSerializers);
            }

            // Delegates to the SerializerConfig overload using the config held by executionConfig.
            public TypeSerializer<Tuple3<String, Object, String>> createSerializer(ExecutionConfig executionConfig) {
                return this.createSerializer(executionConfig.getSerializerConfig());
            }

            // NOTE(review): the MethodHandle[] element below is not valid Java as written;
            // presumably decompiler output describing the lambda's method handle — confirm.
            private static /* synthetic */ Object $deserializeLambda$(SerializedLambda serializedLambda) {
                return LambdaDeserialize.bootstrap("lambdaDeserialize", new MethodHandle[]{$anonfun$createSerializer$2(org.apache.flink.table.planner.plan.stream.sql.RankTest$$anon$6 org.apache.flink.api.common.serialization.SerializerConfig org.apache.flink.api.common.typeutils.TypeSerializer[] int )}, serializedLambda);
            }
        });
        String sql = new StringOps(Predef$.MODULE$.augmentString("\n        |SELECT *\n        |FROM (\n        |  SELECT category, shopId, max_price,\n        |      ROW_NUMBER() OVER (PARTITION BY category ORDER BY max_price ASC) as row_num\n        |  FROM (\n        |     SELECT category, shopId, MAX(price) as max_price\n        |     FROM T\n        |     GROUP BY category, shopId\n        |  ))\n        |WHERE row_num <= 3\n      ")).stripMargin();
        this.util().verifyRelPlan(sql, (Seq<ExplainDetail>)Predef$.MODULE$.wrapRefArray((Object[])new ExplainDetail[]{ExplainDetail.CHANGELOG_MODE}));
    }

    /**
     * Verifies the rel plan, with {@code CHANGELOG_MODE}, of {@code SELECT max(a)} over a
     * ROW_NUMBER query (top 10 per {@code b} by {@code count_c DESC}) whose input is a
     * COUNT aggregation grouped by {@code a, b}.
     */
    @Test
    public void testTopNOrderByCount() {
        String aggregation = new StringOps(Predef$.MODULE$.augmentString("\n        |SELECT a, b, COUNT(*) as count_c\n        |FROM MyTable\n        |GROUP BY a, b\n      ")).stripMargin();
        StringBuilder rankText = new StringBuilder(212);
        rankText.append("\n         |SELECT *\n         |FROM (\n         |  SELECT a, b, count_c,\n         |      ROW_NUMBER() OVER (PARTITION BY b ORDER BY count_c DESC) as row_num\n         |  FROM (");
        rankText.append(aggregation);
        rankText.append("))\n         |WHERE row_num <= 10\n      ");
        String rankQuery = new StringOps(Predef$.MODULE$.augmentString(rankText.toString())).stripMargin();
        StringBuilder outerText = new StringBuilder(40);
        outerText.append("\n         |SELECT max(a) FROM (");
        outerText.append(rankQuery);
        outerText.append(")\n       ");
        String outerQuery = new StringOps(Predef$.MODULE$.augmentString(outerText.toString())).stripMargin();
        Seq<ExplainDetail> details = (Seq<ExplainDetail>)Predef$.MODULE$.wrapRefArray((Object[])new ExplainDetail[]{ExplainDetail.CHANGELOG_MODE});
        this.util().verifyRelPlan(outerQuery, details);
    }

    /**
     * Executes a ROW_NUMBER query whose OVER clause has a PARTITION BY but no ORDER BY and
     * expects the failure's root cause to carry the "requires order by clause" message.
     *
     * <p>Annotated with {@link Test} so that JUnit actually runs it; without the annotation
     * the method is never executed.
     */
    @Test
    public void testRowNumberWithoutOrderBy() {
        String sqlQuery = new StringOps(Predef$.MODULE$.augmentString("\n        |SELECT ROW_NUMBER() over (partition by a) FROM MyTable\n      ")).stripMargin();
        Assertions.assertThatThrownBy(() -> this.util().tableEnv().executeSql(sqlQuery)).hasRootCauseMessage("Over Agg: The window rank function requires order by clause with non-constant fields. please re-check the over window statement.");
    }

    /**
     * Executes a ROW_NUMBER query ordered by {@code key}, which the subquery defines as the
     * literal {@code '2023-03-29'}, and expects the failure's root cause to carry the
     * "requires order by clause with non-constant fields" message.
     */
    @Test
    public void testRowNumberWithOrderByConstant() {
        StringOps margined = new StringOps(Predef$.MODULE$.augmentString("\n        |SELECT *\n        |FROM (\n        |  SELECT a, b,\n        |  ROW_NUMBER() OVER (PARTITION BY b ORDER BY key) AS row_num\n        |  FROM (\n        |  SELECT *, '2023-03-29' AS key\n        |  FROM MyTable\n        |  ) tmp)\n        |WHERE row_num <= 10\n      "));
        String query = margined.stripMargin();
        String expectedRootCause = "Over Agg: The window rank function requires order by clause with non-constant fields. please re-check the over window statement.";
        Assertions.assertThatThrownBy(() -> this.util().tableEnv().executeSql(query))
                .hasRootCauseMessage(expectedRootCause);
    }

    /**
     * Verifies the rel plan, with {@code CHANGELOG_MODE}, of a top-10 ROW_NUMBER query ordered
     * by {@code sum_c DESC}, where the SUM aggregation input is pre-filtered with
     * {@code c >= 0}.
     */
    @Test
    public void testTopNOrderBySumWithCond() {
        String aggregation = new StringOps(Predef$.MODULE$.augmentString("\n        |SELECT a, b, SUM(c) AS sum_c\n        |FROM MyTable\n        |WHERE c >= 0\n        |GROUP BY a, b\n      ")).stripMargin();
        StringBuilder text = new StringBuilder(208);
        text.append("\n         |SELECT *\n         |FROM (\n         |  SELECT a, b, sum_c,\n         |      ROW_NUMBER() OVER (PARTITION BY b ORDER BY sum_c DESC) AS row_num\n         |  FROM (");
        text.append(aggregation);
        text.append("))\n         |WHERE row_num <= 10\n      ");
        String query = new StringOps(Predef$.MODULE$.augmentString(text.toString())).stripMargin();
        Seq<ExplainDetail> details = (Seq<ExplainDetail>)Predef$.MODULE$.wrapRefArray((Object[])new ExplainDetail[]{ExplainDetail.CHANGELOG_MODE});
        this.util().verifyRelPlan(query, details);
    }

    /**
     * Verifies the exec plan of a top-10 ROW_NUMBER query ordered by {@code sum_c DESC}, where
     * {@code sum_c} is a SUM over a CASE WHEN expression.
     */
    @Test
    public void testTopNOrderBySumWithCaseWhen() {
        String aggregation = new StringOps(Predef$.MODULE$.augmentString("\n        |SELECT a, b, SUM(CASE WHEN c > 10 THEN 1 WHEN c < 0 THEN 0 ELSE null END) AS sum_c\n        |FROM MyTable\n        |GROUP BY a, b\n      ")).stripMargin();
        StringBuilder text = new StringBuilder(208);
        text.append("\n         |SELECT *\n         |FROM (\n         |  SELECT a, b, sum_c,\n         |      ROW_NUMBER() OVER (PARTITION BY b ORDER BY sum_c DESC) AS row_num\n         |  FROM (");
        text.append(aggregation);
        text.append("))\n         |WHERE row_num <= 10\n      ");
        String query = new StringOps(Predef$.MODULE$.augmentString(text.toString())).stripMargin();
        this.util().verifyExecPlan(query);
    }

    /**
     * Verifies the exec plan of a top-10 ROW_NUMBER query ordered by {@code sum_c DESC}, where
     * {@code sum_c} is {@code SUM(IF(c > 10, 1, 0))}.
     */
    @Test
    public void testTopNOrderBySumWithIf() {
        String aggregation = new StringOps(Predef$.MODULE$.augmentString("\n        |SELECT a, b, SUM(IF(c > 10, 1, 0)) as sum_c\n        |FROM MyTable\n        |GROUP BY a, b\n      ")).stripMargin();
        StringBuilder text = new StringBuilder(208);
        text.append("\n         |SELECT *\n         |FROM (\n         |  SELECT a, b, sum_c,\n         |      ROW_NUMBER() OVER (PARTITION BY b ORDER BY sum_c DESC) as row_num\n         |  FROM (");
        text.append(aggregation);
        text.append("))\n         |WHERE row_num <= 10\n      ");
        String query = new StringOps(Predef$.MODULE$.augmentString(text.toString())).stripMargin();
        this.util().verifyExecPlan(query);
    }

    /**
     * Verifies the exec plan of a top-10 ROW_NUMBER query ordered by {@code sum_c DESC}, where
     * {@code sum_c} is a SUM with a {@code filter (where c >= 0 and a < 0)} clause.
     */
    @Test
    public void testTopNOrderBySumWithFilterClause() {
        String aggregation = new StringOps(Predef$.MODULE$.augmentString("\n        |SELECT a, b, SUM(c) filter (where c >= 0 and a < 0) as sum_c\n        |FROM MyTable\n        |GROUP BY a, b\n      ")).stripMargin();
        StringBuilder text = new StringBuilder(208);
        text.append("\n         |SELECT *\n         |FROM (\n         |  SELECT a, b, sum_c,\n         |      ROW_NUMBER() OVER (PARTITION BY b ORDER BY sum_c DESC) AS row_num\n         |  FROM (");
        text.append(aggregation);
        text.append("))\n         |WHERE row_num <= 10\n      ");
        String query = new StringOps(Predef$.MODULE$.augmentString(text.toString())).stripMargin();
        this.util().verifyExecPlan(query);
    }

    /**
     * Verifies the exec plan of a top-10 ROW_NUMBER query ordered by {@code sum_c ASC}, where
     * {@code sum_c} is a SUM with a {@code FILTER (WHERE c <= 0 AND a < 0)} clause.
     */
    @Test
    public void testTopNOrderBySumWithFilterClause2() {
        String aggregation = new StringOps(Predef$.MODULE$.augmentString("\n        |SELECT a, b, SUM(c) FILTER (WHERE c <= 0 AND a < 0) AS sum_c\n        |FROM MyTable\n        |GROUP BY a, b\n      ")).stripMargin();
        StringBuilder text = new StringBuilder(207);
        text.append("\n         |SELECT *\n         |FROM (\n         |  SELECT a, b, sum_c,\n         |      ROW_NUMBER() OVER (PARTITION BY b ORDER BY sum_c ASC) AS row_num\n         |  FROM (");
        text.append(aggregation);
        text.append("))\n         |WHERE row_num <= 10\n      ");
        String query = new StringOps(Predef$.MODULE$.augmentString(text.toString())).stripMargin();
        this.util().verifyExecPlan(query);
    }

    /**
     * Verifies the rel plan, with {@code CHANGELOG_MODE}, of a top-10 ROW_NUMBER query ordered
     * by {@code count_c DESC, a ASC} over a COUNT aggregation grouped by {@code a, b}.
     */
    @Test
    public void testTopNOrderByCountAndOtherField() {
        String aggregation = new StringOps(Predef$.MODULE$.augmentString("\n        |SELECT a, b, COUNT(*) AS count_c\n        |FROM MyTable\n        |GROUP BY a, b\n      ")).stripMargin();
        StringBuilder text = new StringBuilder(219);
        text.append("\n         |SELECT *\n         |FROM (\n         |  SELECT a, b, count_c,\n         |      ROW_NUMBER() OVER (PARTITION BY b ORDER BY count_c DESC, a ASC) AS row_num\n         |  FROM (");
        text.append(aggregation);
        text.append("))\n         |WHERE row_num <= 10\n      ");
        String query = new StringOps(Predef$.MODULE$.augmentString(text.toString())).stripMargin();
        Seq<ExplainDetail> details = (Seq<ExplainDetail>)Predef$.MODULE$.wrapRefArray((Object[])new ExplainDetail[]{ExplainDetail.CHANGELOG_MODE});
        this.util().verifyRelPlan(query, details);
    }

    /**
     * Verifies the rel plan, with {@code CHANGELOG_MODE}, of a top-10 ROW_NUMBER query whose
     * input aggregation groups by {@code a, b} and the constant column {@code cn}.
     */
    @Test
    public void testTopNWithGroupByConstantKey() {
        String aggregation = new StringOps(Predef$.MODULE$.augmentString("\n        |SELECT a, b, COUNT(*) AS count_c\n        |FROM (\n        |SELECT *, 'cn' AS cn\n        |FROM MyTable\n        |)\n        |GROUP BY a, b, cn\n      ")).stripMargin();
        StringBuilder text = new StringBuilder(212);
        text.append("\n         |SELECT *\n         |FROM (\n         |  SELECT a, b, count_c,\n         |      ROW_NUMBER() OVER (PARTITION BY a ORDER BY count_c DESC) AS row_num\n         |  FROM (");
        text.append(aggregation);
        text.append("))\n         |WHERE row_num <= 10\n      ");
        String query = new StringOps(Predef$.MODULE$.augmentString(text.toString())).stripMargin();
        Seq<ExplainDetail> details = (Seq<ExplainDetail>)Predef$.MODULE$.wrapRefArray((Object[])new ExplainDetail[]{ExplainDetail.CHANGELOG_MODE});
        this.util().verifyRelPlan(query, details);
    }

    /**
     * Verifies the rel plan, with {@code CHANGELOG_MODE}, of two nested ROW_NUMBER queries:
     * an inner top 10 per {@code a} and an outer top 10 without PARTITION BY, both ordered
     * by {@code count_c DESC}.
     */
    @Test
    public void testNestedTopN() {
        String aggregation = new StringOps(Predef$.MODULE$.augmentString("\n        |SELECT a, b, COUNT(*) as count_c\n        |FROM (\n        |SELECT *, 'cn' as cn\n        |FROM MyTable\n        |)\n        |GROUP BY a, b, cn\n      ")).stripMargin();
        StringBuilder innerText = new StringBuilder(212);
        innerText.append("\n         |SELECT *\n         |FROM (\n         |  SELECT a, b, count_c,\n         |      ROW_NUMBER() OVER (PARTITION BY a ORDER BY count_c DESC) AS row_num\n         |  FROM (");
        innerText.append(aggregation);
        innerText.append("))\n         |WHERE row_num <= 10\n      ");
        String innerRank = new StringOps(Predef$.MODULE$.augmentString(innerText.toString())).stripMargin();
        StringBuilder outerText = new StringBuilder(197);
        outerText.append("\n         |SELECT *\n         |FROM (\n         |  SELECT a, b, count_c,\n         |    ROW_NUMBER() OVER (ORDER BY count_c DESC) as rank_num\n         |  FROM (");
        outerText.append(innerRank);
        outerText.append("))\n         |WHERE rank_num <= 10\n      ");
        String query = new StringOps(Predef$.MODULE$.augmentString(outerText.toString())).stripMargin();
        Seq<ExplainDetail> details = (Seq<ExplainDetail>)Predef$.MODULE$.wrapRefArray((Object[])new ExplainDetail[]{ExplainDetail.CHANGELOG_MODE});
        this.util().verifyRelPlan(query, details);
    }

    /**
     * Expects a {@link ValidationException} from {@code verifyRelPlan} for a ROW_NUMBER query
     * filtered with {@code row_num <= a}, whose input subquery calls {@code add(max_c)}.
     */
    @Test
    public void testTopNForVariableSize() {
        String aggregation = new StringOps(Predef$.MODULE$.augmentString("\n        |SELECT a, b, add(max_c) as c\n        |FROM (\n        |  SELECT MAX(a) as a, b, MAX(c) as max_c\n        |  FROM MyTable\n        |  GROUP BY b\n        |)\n      ")).stripMargin();
        StringBuilder text = new StringBuilder(199);
        text.append("\n         |SELECT *\n         |FROM (\n         |  SELECT a, b, c,\n         |      ROW_NUMBER() OVER (PARTITION BY b ORDER BY c DESC) as row_num\n         |  FROM (");
        text.append(aggregation);
        text.append("))\n         |WHERE row_num <= a\n      ");
        String query = new StringOps(Predef$.MODULE$.augmentString(text.toString())).stripMargin();
        Assertions.assertThatExceptionOfType(ValidationException.class)
                .isThrownBy(() -> this.util().verifyRelPlan(query, (Seq<ExplainDetail>)Predef$.MODULE$.wrapRefArray((Object[])new ExplainDetail[]{ExplainDetail.CHANGELOG_MODE})));
    }

    /**
     * Creates a source table, an aggregating view ({@code view1}), a ROW_NUMBER view on top of
     * it ({@code view2}) and a print sink, then verifies the exec plan of an INSERT that keeps
     * {@code row_num <= 3} from {@code view2}.
     */
    @Test
    public void testCreateViewWithRowNumber() {
        String sourceDdl = new StringOps(Predef$.MODULE$.augmentString("\n                    |CREATE TABLE test_source (\n                    |  name STRING,\n                    |  eat STRING,\n                    |  age BIGINT\n                    |) WITH (\n                    |  'connector' = 'values',\n                    |  'bounded' = 'false'\n                    |)\n      ")).stripMargin();
        this.util().addTable(sourceDdl);

        String aggregatingView = "create view view1 as select name, eat ,sum(age) as cnt\nfrom test_source group by name, eat";
        this.util().tableEnv().executeSql(aggregatingView);

        String rankingView = "create view view2 as\nselect *, ROW_NUMBER() OVER (PARTITION BY name ORDER BY cnt DESC) as row_num\nfrom view1";
        this.util().tableEnv().executeSql(rankingView);

        String sinkDdl = new StringOps(Predef$.MODULE$.augmentString("\n         |create table sink (\n         |  name varchar,\n         |  eat varchar,\n         |  cnt bigint\n         |)\n         |with(\n         |  'connector' = 'print'\n         |)\n         |")).stripMargin();
        this.util().addTable(sinkDdl);

        String insert = "insert into sink select name, eat, cnt\nfrom view2 where row_num <= 3";
        this.util().verifyExecPlanInsert(insert);
    }

    /**
     * Verifies the exec plan of a {@code LATERAL} sub-query correlated on {@code a} that sorts by
     * {@code c} descending and limits the result to 3 rows.
     */
    @Test
    public void testCorrelateSortToRank() {
        String rawQuery = "\n         |SELECT a, b\n         |FROM\n         |  (SELECT DISTINCT a FROM MyTable) T1,\n         |  LATERAL (\n         |    SELECT b, c\n         |    FROM MyTable\n         |    WHERE a = T1.a\n         |    ORDER BY c\n         |    DESC LIMIT 3\n         |  )\n      ";
        String lateralQuery = new StringOps(Predef$.MODULE$.augmentString(rawQuery)).stripMargin();
        this.util().verifyExecPlan(lateralQuery);
    }

    /**
     * Verifies the exec plan of a {@code LATERAL} sub-query that is correlated on two columns
     * ({@code a} and {@code b}), sorts by {@code d} descending and limits the result to 3 rows.
     *
     * <p>Before running the query, the test registers data stream {@code T} with the fields
     * a, b, c, d plus a {@code proctime} and a {@code rowtime} attribute, described by an anonymous
     * {@code CaseClassTypeInfo} over {@code Tuple4<Object, String, Object, Object>}.
     */
    @Test
    public void testCorrelateSortToRankWithMultipleGroupKeys() {
        // NOTE(review): the type-info constructor arguments show up as `(null)` in this decompiled
        // output; the original arguments are not visible here.
        this.util().addDataStream("T", (Seq<Expression>)Predef$.MODULE$.wrapRefArray((Object[])new Expression[]{package$.MODULE$.symbol2FieldExpression((Symbol)SymbolLiteral.bootstrap("apply", "a")), package$.MODULE$.symbol2FieldExpression((Symbol)SymbolLiteral.bootstrap("apply", "b")), package$.MODULE$.symbol2FieldExpression((Symbol)SymbolLiteral.bootstrap("apply", "c")), package$.MODULE$.symbol2FieldExpression((Symbol)SymbolLiteral.bootstrap("apply", "d")), (Expression)package$.MODULE$.UnresolvedFieldExpression((Symbol)SymbolLiteral.bootstrap("apply", "proctime")).proctime(), (Expression)package$.MODULE$.UnresolvedFieldExpression((Symbol)SymbolLiteral.bootstrap("apply", "rowtime")).rowtime()}), new CaseClassTypeInfo<Tuple4<Object, String, Object, Object>>(null){

            // Synthetic accessor that returns the `types` field of the given instance.
            public /* synthetic */ TypeInformation[] protected$types($anon$7 x$1) {
                return x$1.types;
            }

            // Creates one serializer per field (indices 0 until getArity()) from the field's type information.
            public TypeSerializer<Tuple4<Object, String, Object, Object>> createSerializer(SerializerConfig serializerConfig) {
                TypeSerializer[] fieldSerializers = new TypeSerializer[this.getArity()];
                RichInt$.MODULE$.until$extension0(Predef$.MODULE$.intWrapper(0), this.getArity()).foreach$mVc$sp((Function1)(JFunction1.mcVI.sp & Serializable & scala.Serializable)i -> {
                    // NOTE(review): `fieldSerializers$3` looks like the decompiler's name for the captured
                    // `fieldSerializers` array - confirm before editing.
                    fieldSerializers$3[i] = this.protected$types(this)[i].createSerializer(serializerConfig);
                });
                // This anonymous subclass is only assigned to `unused`; it is not the instance returned below.
                ScalaCaseClassSerializer<Tuple4<Object, String, Object, Object>> unused = new ScalaCaseClassSerializer<Tuple4<Object, String, Object, Object>>(this, fieldSerializers){

                    // Rebuilds the tuple: fields[0] as int, fields[1] as String, fields[2] and fields[3] as long.
                    public Tuple4<Object, String, Object, Object> createInstance(Object[] fields) {
                        return new Tuple4((Object)BoxesRunTime.boxToInteger((int)BoxesRunTime.unboxToInt((Object)fields[0])), (Object)((String)fields[1]), (Object)BoxesRunTime.boxToLong((long)BoxesRunTime.unboxToLong((Object)fields[2])), (Object)BoxesRunTime.boxToLong((long)BoxesRunTime.unboxToLong((Object)fields[3])));
                    }
                };
                return new ScalaCaseClassSerializer(this.getTypeClass(), fieldSerializers);
            }

            // Delegates to the SerializerConfig overload using the config held by the ExecutionConfig.
            public TypeSerializer<Tuple4<Object, String, Object, Object>> createSerializer(ExecutionConfig executionConfig) {
                return this.createSerializer(executionConfig.getSerializerConfig());
            }

            // NOTE(review): the MethodHandle array element below is not valid Java source; presumably it is the
            // decompiler's rendering of a method-handle constant - do not hand-edit.
            private static /* synthetic */ Object $deserializeLambda$(SerializedLambda serializedLambda) {
                return LambdaDeserialize.bootstrap("lambdaDeserialize", new MethodHandle[]{$anonfun$createSerializer$3(org.apache.flink.table.planner.plan.stream.sql.RankTest$$anon$7 org.apache.flink.api.common.serialization.SerializerConfig org.apache.flink.api.common.typeutils.TypeSerializer[] int )}, serializedLambda);
            }
        });
        // Correlated Top-3 per (a, b), ordered by d descending.
        String query = new StringOps(Predef$.MODULE$.augmentString("\n         |SELECT a, b, c\n         |FROM\n         |  (SELECT DISTINCT a, b FROM T) T1,\n         |  LATERAL (\n         |    SELECT c, d\n         |    FROM T\n         |    WHERE a = T1.a and b = T1.b\n         |    ORDER BY d\n         |    DESC LIMIT 3\n         |  )\n      ")).stripMargin();
        this.util().verifyExecPlan(query);
    }

    /**
     * Verifies the exec plan of two stacked ROW_NUMBER() filters: the inner one partitions by
     * {@code (a, c)} and keeps {@code rna <= 100}, the outer one partitions by {@code a} and keeps
     * {@code rnb <= 200}.
     */
    @Test
    public void testRankWithAnotherRankAsInput() {
        String rawQuery = "\n        |SELECT CAST(rna AS INT) AS rn1, CAST(rnb AS INT) AS rn2 FROM (\n        |  SELECT *, row_number() over (partition by a order by b desc) AS rnb\n        |  FROM (\n        |    SELECT *, row_number() over (partition by a, c order by b desc) AS rna\n        |    FROM MyTable\n        |  )\n        |  WHERE rna <= 100\n        |)\n        |WHERE rnb <= 200\n        |";
        String nestedRankQuery = new StringOps(Predef$.MODULE$.augmentString(rawQuery)).stripMargin();
        this.util().verifyExecPlan(nestedRankQuery);
    }

    /**
     * Verifies the exec plan of a query with two nested ROW_NUMBER() filters ({@code rownum_1} and
     * {@code rownum_2}, both {@code <= 100000}) whose outermost projection selects neither rank
     * column.
     *
     * <p>Before running the query, the test registers data stream {@code MyTable1} with the fields
     * uri, reqcount, start_time and bucket_id, described by an anonymous {@code CaseClassTypeInfo}
     * over {@code Tuple4<String, Object, Object, Object>}.
     */
    @Test
    public void testRedundantRankNumberColumnRemove() {
        // NOTE(review): the type-info constructor arguments show up as `(null)` in this decompiled
        // output; the original arguments are not visible here.
        this.util().addDataStream("MyTable1", (Seq<Expression>)Predef$.MODULE$.wrapRefArray((Object[])new Expression[]{package$.MODULE$.symbol2FieldExpression((Symbol)SymbolLiteral.bootstrap("apply", "uri")), package$.MODULE$.symbol2FieldExpression((Symbol)SymbolLiteral.bootstrap("apply", "reqcount")), package$.MODULE$.symbol2FieldExpression((Symbol)SymbolLiteral.bootstrap("apply", "start_time")), package$.MODULE$.symbol2FieldExpression((Symbol)SymbolLiteral.bootstrap("apply", "bucket_id"))}), new CaseClassTypeInfo<Tuple4<String, Object, Object, Object>>(null){

            // Synthetic accessor that returns the `types` field of the given instance.
            public /* synthetic */ TypeInformation[] protected$types($anon$8 x$1) {
                return x$1.types;
            }

            // Creates one serializer per field (indices 0 until getArity()) from the field's type information.
            public TypeSerializer<Tuple4<String, Object, Object, Object>> createSerializer(SerializerConfig serializerConfig) {
                TypeSerializer[] fieldSerializers = new TypeSerializer[this.getArity()];
                RichInt$.MODULE$.until$extension0(Predef$.MODULE$.intWrapper(0), this.getArity()).foreach$mVc$sp((Function1)(JFunction1.mcVI.sp & Serializable & scala.Serializable)i -> {
                    // NOTE(review): `fieldSerializers$4` looks like the decompiler's name for the captured
                    // `fieldSerializers` array - confirm before editing.
                    fieldSerializers$4[i] = this.protected$types(this)[i].createSerializer(serializerConfig);
                });
                // This anonymous subclass is only assigned to `unused`; it is not the instance returned below.
                ScalaCaseClassSerializer<Tuple4<String, Object, Object, Object>> unused = new ScalaCaseClassSerializer<Tuple4<String, Object, Object, Object>>(this, fieldSerializers){

                    // Rebuilds the tuple: fields[0] as String, fields[1] to fields[3] as long.
                    public Tuple4<String, Object, Object, Object> createInstance(Object[] fields) {
                        return new Tuple4((Object)((String)fields[0]), (Object)BoxesRunTime.boxToLong((long)BoxesRunTime.unboxToLong((Object)fields[1])), (Object)BoxesRunTime.boxToLong((long)BoxesRunTime.unboxToLong((Object)fields[2])), (Object)BoxesRunTime.boxToLong((long)BoxesRunTime.unboxToLong((Object)fields[3])));
                    }
                };
                return new ScalaCaseClassSerializer(this.getTypeClass(), fieldSerializers);
            }

            // Delegates to the SerializerConfig overload using the config held by the ExecutionConfig.
            public TypeSerializer<Tuple4<String, Object, Object, Object>> createSerializer(ExecutionConfig executionConfig) {
                return this.createSerializer(executionConfig.getSerializerConfig());
            }

            // NOTE(review): the MethodHandle array element below is not valid Java source; presumably it is the
            // decompiler's rendering of a method-handle constant - do not hand-edit.
            private static /* synthetic */ Object $deserializeLambda$(SerializedLambda serializedLambda) {
                return LambdaDeserialize.bootstrap("lambdaDeserialize", new MethodHandle[]{$anonfun$createSerializer$4(org.apache.flink.table.planner.plan.stream.sql.RankTest$$anon$8 org.apache.flink.api.common.serialization.SerializerConfig org.apache.flink.api.common.typeutils.TypeSerializer[] int )}, serializedLambda);
            }
        });
        // Inner rank partitions by (start_time, bucket_id); outer rank partitions by start_time only.
        String sql = new StringOps(Predef$.MODULE$.augmentString("\n        |SELECT\n        |  CONCAT('http://txmov2.a.yximgs.com', uri) AS url,\n        |  reqcount AS download_count,\n        |  start_time AS `timestamp`\n        |FROM\n        |  (\n        |    SELECT\n        |      uri,\n        |      reqcount,\n        |      rownum_2,\n        |      start_time\n        |    FROM\n        |      (\n        |        SELECT\n        |          uri,\n        |          reqcount,\n        |          start_time,\n        |          ROW_NUMBER() OVER (\n        |            PARTITION BY start_time\n        |            ORDER BY\n        |              reqcount DESC\n        |          ) AS rownum_2\n        |        FROM\n        |          (\n        |            SELECT\n        |            uri,\n        |            reqcount,\n        |            start_time,\n        |            ROW_NUMBER() OVER (\n        |                PARTITION BY start_time, bucket_id\n        |                ORDER BY\n        |                reqcount DESC\n        |            ) AS rownum_1\n        |            FROM MyTable1\n        |          )\n        |        WHERE\n        |          rownum_1 <= 100000\n        |      )\n        |    WHERE\n        |      rownum_2 <= 100000\n        |  )\n        |")).stripMargin();
        this.util().verifyExecPlan(sql);
    }

    /**
     * Verifies the rel plan of a ROW_NUMBER() filter ({@code rn < 10}) over view {@code v1}, which
     * aggregates view {@code v0}; {@code v0} keeps the row with {@code rowNum = 1} per {@code c},
     * ordered by {@code PROCTIME()}.
     */
    @Test
    public void testUpdatableRankWithDeduplicate() {
        // v0: first row per c in processing-time order.
        String firstRowViewDdl = new StringOps(Predef$.MODULE$.augmentString("\n                               |CREATE VIEW v0 AS\n                               |SELECT *\n                               |FROM (SELECT *, ROW_NUMBER() OVER (PARTITION BY `c`\n                               |        ORDER BY `PROCTIME`()) AS `rowNum`\n                               |        FROM MyTable)\n                               |WHERE `rowNum` = 1\n                               |")).stripMargin();
        this.util().tableEnv().executeSql(firstRowViewDdl);

        // v1: filtered sum of a per (c, b) on top of v0.
        String aggregateViewDdl = new StringOps(Predef$.MODULE$.augmentString("\n                               |CREATE VIEW v1 AS\n                               |SELECT c, b, SUM(a) FILTER (WHERE a > 0) AS d FROM v0 GROUP BY c, b\n                               |")).stripMargin();
        this.util().tableEnv().executeSql(aggregateViewDdl);

        String rankQuery = new StringOps(Predef$.MODULE$.augmentString("\n                         |SELECT c, b, d\n                         |FROM (\n                         |    SELECT\n                         |       c, b, d,\n                         |       ROW_NUMBER() OVER (PARTITION BY c, b ORDER BY d DESC) AS rn FROM v1\n                         |) WHERE rn < 10\n                         |")).stripMargin();
        this.util().verifyRelPlan(rankQuery);
    }

    /**
     * Verifies the rel plan of a ROW_NUMBER() filter ({@code rank_num <= 3}) over an aggregation of
     * view {@code V1}, where {@code V1} joins {@code MyTable} with {@code LookupTable}
     * {@code FOR SYSTEM_TIME AS OF T.proctime}.
     */
    @Test
    public void testUpdatableRankAfterLookupJoin() {
        String lookupTableDdl = new StringOps(Predef$.MODULE$.augmentString("\n                     |CREATE TABLE LookupTable (\n                     |  `id` INT,\n                     |  `name` STRING,\n                     |  `age` INT\n                     |) WITH (\n                     |  'connector' = 'values'\n                     |)\n                     |")).stripMargin();
        this.util().addTable(lookupTableDdl);

        String joinViewDdl = new StringOps(Predef$.MODULE$.augmentString("\n        |CREATE VIEW V1 AS\n        |SELECT *\n        |FROM MyTable AS T JOIN LookupTable FOR SYSTEM_TIME AS OF T.proctime AS D\n        |ON T.a = D.id\n        |")).stripMargin();
        this.util().tableEnv().executeSql(joinViewDdl);

        String rawRankQuery = "\n         |SELECT *\n         |FROM (\n         |  SELECT name, ids,\n         |      ROW_NUMBER() OVER (PARTITION BY name ORDER BY ids DESC) as rank_num\n         |  FROM (\n         |     SELECT name, SUM(id) FILTER (WHERE id > 0) as ids\n         |     FROM V1\n         |     GROUP BY name\n         |  ))\n         |WHERE rank_num <= 3\n         |";
        String rankQuery = new StringOps(Predef$.MODULE$.augmentString(rawRankQuery)).stripMargin();
        this.util().verifyRelPlan(rankQuery);
    }

    /**
     * Verifies the exec plan of a statement set with two inserts into the same sink that both read
     * view {@code v1}: one selects it directly, the other applies a ROW_NUMBER() filter
     * ({@code rn < 3}). The reuse-optimize-block-with-digest option is set to {@code true} first.
     */
    @Test
    public void testUpdatableRankAfterIntermediateScan() {
        this.util().tableEnv().getConfig().set(RelNodeBlockPlanBuilder$.MODULE$.TABLE_OPTIMIZER_REUSE_OPTIMIZE_BLOCK_WITH_DIGEST_ENABLED(), (Object)BoxesRunTime.boxToBoolean((boolean)true));

        // Shared aggregating view read by both inserts.
        String aggregateViewDdl = new StringOps(Predef$.MODULE$.augmentString("\n                               |CREATE VIEW v1 AS\n                               |SELECT a, MAX(b) AS b, MIN(c) AS c\n                               |FROM MyTable GROUP BY a\n                               |")).stripMargin();
        this.util().tableEnv().executeSql(aggregateViewDdl);

        // Sink with primary key `id` and 'sink-insert-only' = 'false'.
        String sinkDdl = new StringOps(Predef$.MODULE$.augmentString("\n                     |CREATE TABLE sink(\n                     |  `id` INT,\n                     |  `name` STRING,\n                     |  `age` BIGINT,\n                     |   primary key (id) not enforced\n                     |) WITH (\n                     |  'connector' = 'values',\n                     |  'sink-insert-only' = 'false'\n                     |)\n                     |")).stripMargin();
        this.util().addTable(sinkDdl);

        String plainInsert = new StringOps(Predef$.MODULE$.augmentString("\n                           |INSERT INTO sink\n                           |SELECT * FROM v1\n                           |")).stripMargin();
        String rankedInsert = new StringOps(Predef$.MODULE$.augmentString("\n                           |INSERT INTO sink\n                           |SELECT a, b, c FROM (\n                           |  SELECT *, ROW_NUMBER() OVER (PARTITION BY a ORDER BY b DESC) AS rn\n                           |  FROM v1\n                           |) WHERE rn < 3\n                           |")).stripMargin();

        StatementSet statements = this.util().tableEnv().createStatementSet();
        statements.addInsertSql(plainInsert);
        statements.addInsertSql(rankedInsert);
        this.util().verifyExecPlan(statements);
    }

    /**
     * Explains (with changelog mode) an insert whose ROW_NUMBER() partitions by {@code b} while the
     * sink declares {@code a} as its primary key.
     */
    @Test
    public void testRankOutputUpsertKeyNotMatchSinkPk() {
        String sinkDdl = new StringOps(Predef$.MODULE$.augmentString("\n                               |CREATE TABLE sink (\n                               | a INT,\n                               | b VARCHAR,\n                               | c BIGINT,\n                               | PRIMARY KEY (a) NOT ENFORCED\n                               |) WITH (\n                               | 'connector' = 'values'\n                               | ,'sink-insert-only' = 'false'\n                               |)\n                               |")).stripMargin();
        this.util().tableEnv().executeSql(sinkDdl);

        String rawInsert = "\n        |INSERT INTO sink\n        |SELECT a, b, c FROM (\n        |  SELECT *, ROW_NUMBER() OVER (PARTITION BY b ORDER BY c DESC) AS rn\n        |  FROM MyTable\n        |  )\n        |WHERE rn <= 100\n        |";
        String insertStatement = new StringOps(Predef$.MODULE$.augmentString(rawInsert)).stripMargin();

        Seq<ExplainDetail> explainDetails = (Seq<ExplainDetail>)Predef$.MODULE$.wrapRefArray((Object[])new ExplainDetail[]{ExplainDetail.CHANGELOG_MODE});
        this.util().verifyExplainInsert(insertStatement, explainDetails);
    }

    /**
     * Explains (with changelog mode) an insert whose ROW_NUMBER() partitions by {@code a} while the
     * sink declares {@code (a, b)} as its primary key.
     */
    @Test
    public void testRankOutputUpsertKeyInSinkPk() {
        String sinkDdl = new StringOps(Predef$.MODULE$.augmentString("\n                               |CREATE TABLE sink (\n                               | a INT,\n                               | b VARCHAR,\n                               | c BIGINT,\n                               | PRIMARY KEY (a, b) NOT ENFORCED\n                               |) WITH (\n                               | 'connector' = 'values'\n                               | ,'sink-insert-only' = 'false'\n                               |)\n                               |")).stripMargin();
        this.util().tableEnv().executeSql(sinkDdl);

        String rawInsert = "\n        |INSERT INTO sink\n        |SELECT a, b, c FROM (\n        |  SELECT *, ROW_NUMBER() OVER (PARTITION BY a ORDER BY c DESC) AS rn\n        |  FROM MyTable\n        |  )\n        |WHERE rn <= 100\n        |";
        String insertStatement = new StringOps(Predef$.MODULE$.augmentString(rawInsert)).stripMargin();

        Seq<ExplainDetail> explainDetails = (Seq<ExplainDetail>)Predef$.MODULE$.wrapRefArray((Object[])new ExplainDetail[]{ExplainDetail.CHANGELOG_MODE});
        this.util().verifyExplainInsert(insertStatement, explainDetails);
    }

    /**
     * Explains (with changelog mode) an insert that ranks per {@code b} but projects only
     * {@code a, c, rn} into a sink whose primary key is {@code a}.
     */
    @Test
    public void testRankOutputLostUpsertKeyWithSinkPk() {
        String sinkDdl = new StringOps(Predef$.MODULE$.augmentString("\n                               |CREATE TABLE sink (\n                               | a INT,\n                               | c BIGINT,\n                               | rn BIGINT,\n                               | PRIMARY KEY (a) NOT ENFORCED\n                               |) WITH (\n                               | 'connector' = 'values'\n                               | ,'sink-insert-only' = 'false'\n                               |)\n                               |")).stripMargin();
        this.util().tableEnv().executeSql(sinkDdl);

        String rawInsert = "\n        |INSERT INTO sink\n        |SELECT a, c, rn FROM (\n        |  SELECT *, ROW_NUMBER() OVER (PARTITION BY b ORDER BY c DESC) AS rn\n        |  FROM MyTable\n        |  )\n        |WHERE rn <= 100\n        |";
        String insertStatement = new StringOps(Predef$.MODULE$.augmentString(rawInsert)).stripMargin();

        Seq<ExplainDetail> explainDetails = (Seq<ExplainDetail>)Predef$.MODULE$.wrapRefArray((Object[])new ExplainDetail[]{ExplainDetail.CHANGELOG_MODE});
        this.util().verifyExplainInsert(insertStatement, explainDetails);
    }

    /**
     * Registers the data stream {@code MyTable} that the SQL in this class's tests refers to.
     *
     * <p>The stream exposes the fields a, b, c plus a {@code proctime} and a {@code rowtime}
     * attribute, and is described by an anonymous {@code CaseClassTypeInfo} over
     * {@code Tuple3<Object, String, Object>}.
     */
    public RankTest() {
        // NOTE(review): the type-info constructor arguments show up as `(null)` in this decompiled
        // output; the original arguments are not visible here.
        this.util().addDataStream("MyTable", (Seq<Expression>)Predef$.MODULE$.wrapRefArray((Object[])new Expression[]{package$.MODULE$.symbol2FieldExpression((Symbol)SymbolLiteral.bootstrap("apply", "a")), package$.MODULE$.symbol2FieldExpression((Symbol)SymbolLiteral.bootstrap("apply", "b")), package$.MODULE$.symbol2FieldExpression((Symbol)SymbolLiteral.bootstrap("apply", "c")), (Expression)package$.MODULE$.UnresolvedFieldExpression((Symbol)SymbolLiteral.bootstrap("apply", "proctime")).proctime(), (Expression)package$.MODULE$.UnresolvedFieldExpression((Symbol)SymbolLiteral.bootstrap("apply", "rowtime")).rowtime()}), new CaseClassTypeInfo<Tuple3<Object, String, Object>>(null){

            // Synthetic accessor that returns the `types` field of the given instance.
            public /* synthetic */ TypeInformation[] protected$types($anon$5 x$1) {
                return x$1.types;
            }

            // Creates one serializer per field (indices 0 until getArity()) from the field's type information.
            public TypeSerializer<Tuple3<Object, String, Object>> createSerializer(SerializerConfig serializerConfig) {
                TypeSerializer[] fieldSerializers = new TypeSerializer[this.getArity()];
                RichInt$.MODULE$.until$extension0(Predef$.MODULE$.intWrapper(0), this.getArity()).foreach$mVc$sp((Function1)(JFunction1.mcVI.sp & Serializable & scala.Serializable)i -> {
                    // NOTE(review): `fieldSerializers$1` looks like the decompiler's name for the captured
                    // `fieldSerializers` array - confirm before editing.
                    fieldSerializers$1[i] = this.protected$types(this)[i].createSerializer(serializerConfig);
                });
                // This anonymous subclass is only assigned to `unused`; it is not the instance returned below.
                ScalaCaseClassSerializer<Tuple3<Object, String, Object>> unused = new ScalaCaseClassSerializer<Tuple3<Object, String, Object>>(this, fieldSerializers){

                    // Rebuilds the tuple: fields[0] as int, fields[1] as String, fields[2] as long.
                    public Tuple3<Object, String, Object> createInstance(Object[] fields) {
                        return new Tuple3((Object)BoxesRunTime.boxToInteger((int)BoxesRunTime.unboxToInt((Object)fields[0])), (Object)((String)fields[1]), (Object)BoxesRunTime.boxToLong((long)BoxesRunTime.unboxToLong((Object)fields[2])));
                    }
                };
                return new ScalaCaseClassSerializer(this.getTypeClass(), fieldSerializers);
            }

            // Delegates to the SerializerConfig overload using the config held by the ExecutionConfig.
            public TypeSerializer<Tuple3<Object, String, Object>> createSerializer(ExecutionConfig executionConfig) {
                return this.createSerializer(executionConfig.getSerializerConfig());
            }

            // NOTE(review): the MethodHandle array element below is not valid Java source; presumably it is the
            // decompiler's rendering of a method-handle constant - do not hand-edit.
            private static /* synthetic */ Object $deserializeLambda$(SerializedLambda serializedLambda) {
                return LambdaDeserialize.bootstrap("lambdaDeserialize", new MethodHandle[]{$anonfun$createSerializer$1(org.apache.flink.table.planner.plan.stream.sql.RankTest$$anon$5 org.apache.flink.api.common.serialization.SerializerConfig org.apache.flink.api.common.typeutils.TypeSerializer[] int )}, serializedLambda);
            }
        });
    }
}

