/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.table.planner.plan.stream.sql.join;

import org.apache.flink.table.api.ExplainDetail;
import org.apache.flink.table.api.TableEnvironment;
import org.apache.flink.table.api.TableException;
import org.apache.flink.table.api.ValidationException;
import org.apache.flink.table.planner.utils.StreamTableTestUtil;
import org.apache.flink.table.planner.utils.TableTestBase;
import org.assertj.core.api.Assertions;
import org.assertj.core.api.ThrowableAssert;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import scala.Predef$;
import scala.collection.Seq;
import scala.collection.immutable.StringOps;
import scala.reflect.ScalaSignature;

@ScalaSignature(bytes="\u0006\u0001\u0005]d\u0001B\u0001\u0003\u0001U\u0011\u0001\u0003V3na>\u0014\u0018\r\u001c&pS:$Vm\u001d;\u000b\u0005\r!\u0011\u0001\u00026pS:T!!\u0002\u0004\u0002\u0007M\fHN\u0003\u0002\b\u0011\u000511\u000f\u001e:fC6T!!\u0003\u0006\u0002\tAd\u0017M\u001c\u0006\u0003\u00171\tq\u0001\u001d7b]:,'O\u0003\u0002\u000e\u001d\u0005)A/\u00192mK*\u0011q\u0002E\u0001\u0006M2Lgn\u001b\u0006\u0003#I\ta!\u00199bG\",'\"A\n\u0002\u0007=\u0014xm\u0001\u0001\u0014\u0005\u00011\u0002CA\f\u001b\u001b\u0005A\"BA\r\u000b\u0003\u0015)H/\u001b7t\u0013\tY\u0002DA\u0007UC\ndW\rV3ti\n\u000b7/\u001a\u0005\u0006;\u0001!\tAH\u0001\u0007y%t\u0017\u000e\u001e \u0015\u0003}\u0001\"\u0001\t\u0001\u000e\u0003\tAqA\t\u0001C\u0002\u0013\u00051%\u0001\u0003vi&dW#\u0001\u0013\u0011\u0005])\u0013B\u0001\u0014\u0019\u0005M\u0019FO]3b[R\u000b'\r\\3UKN$X\u000b^5m\u0011\u0019A\u0003\u0001)A\u0005I\u0005)Q\u000f^5mA!)!\u0006\u0001C\u0001W\u00051!-\u001a4pe\u0016$\u0012\u0001\f\t\u0003[Aj\u0011A\f\u0006\u0002_\u0005)1oY1mC&\u0011\u0011G\f\u0002\u0005+:LG\u000f\u000b\u0002*gA\u0011AgO\u0007\u0002k)\u0011agN\u0001\u0004CBL'B\u0001\u001d:\u0003\u001dQW\u000f]5uKJT!A\u000f\n\u0002\u000b),h.\u001b;\n\u0005q*$A\u0003\"fM>\u0014X-R1dQ\")a\b\u0001C\u0001W\u00059C/Z:u\u000bZ,g\u000e\u001e+j[\u0016$V-\u001c9pe\u0006d'j\\5o\u001f:dUmZ1dsN{WO]2fQ\ti\u0004\t\u0005\u00025\u0003&\u0011!)\u000e\u0002\u0005)\u0016\u001cH\u000fC\u0003E\u0001\u0011\u00051&\u0001\u0014uKN$\bK]8d)&lW\rV3na>\u0014\u0018\r\u001c&pS:|e\u000eT3hC\u000eL8k\\;sG\u0016D#a\u0011!\t\u000b\u001d\u0003A\u0011A\u0016\u00023Q,7\u000f^#wK:$H+[7f)\u0016l\u0007o\u001c:bY*{\u0017N\u001c\u0015\u0003\r\u0002CQA\u0013\u0001\u0005\u0002-\na\u0006^3ti\u00163XM\u001c;US6,G+Z7q_J\fGNS8j]>sG+[7fgR\fW\u000e\u001d'uuJ{w\u000f^5nK\"\u0012\u0011\n\u0011\u0005\u0006\u001b\u0002!\taK\u0001\"i\u0016\u001cH/\u0012<f]R$\u0016.\\3UK6\u0004xN]1m\u0015>LgnV5uQZKWm\u001e\u0015\u0003\u0019\u0002CQ\u0001\u0015\u0001\u0005\u0002-\na\u0007^3ti\u00163XM\u001c;US6,G+Z7q_J\fGNS8j]^KG\u000f\u001b,jK^<\u0016\u000e\u001e5D_:\u001cH/\u00198u\u0007>tG-\u001b;j_:D#a\u0014!\t\u000bM\u0003A\u0011A\u0016\u0002mQ,7\u000f^#wK:$H+[7f)\u0016l\u0007o\u001c:bY*{\u0017N\\,ji\"4\u0016.Z<XSRDg)\u001e8di&|gnQ8oI&$\u0018n\u001c8)\u0005I\u0003\u0005\"\u0002,\u0001\t\u0003Y\u0013\u0001\u000b;fgR,e/\u001a8u)&lW\rV3na>\u0014\u0018\r\u001c&pS:<\u0016\u000e\u001e5WS\u0016<hj\u001c8FcVL\u0007FA+A\u0011\u0015I\u0006\u0001\"\u0001,\u0003=\"Xm\u001d;Fm\u0016tG\u000fV5nKR+W\u000e]8sC2Tu.\u001b8XSRDg+[3x/&$\b\u000e\u0015:fI&\u001c\u0017\r^3tQ\tA\u0006\tC\u0003]\u0001\u0011\u00051&A\u001auKN$XI^3oiRKW.\u001a'fMR$V-\u001c9pe\u0006d'j\\5o/&$\bNV5fo^KG\u000f\u001b)sK\u0012L7-\u0019;fg\"\u00121\f\u0011\u0005\u0006?\u0002!\taK\u0001(i\u0016\u001cH\u000f\u0015:pGRKW.\u001a+f[B|'/\u00197K_&tw+\u001b;i\u0019\u0006\u001cHOU8x-&,w\u000f\u000b\u0002_\u0001\")!\r\u0001C\u0001W\u0005IC/Z:u!J|7\rV5nKR+W\u000e]8sC2Tu.\u001b8XSRDG*Y:u-\u0006dW/\u001a,jK^D#!\u0019!\t\u000b\u0015\u0004A\u0011A\u0016\u0002OQ,7\u000f\u001e)s_\u000e$\u0016.\\3UK6\u0004xN]1m\u0015>LgnV5uQZKWm\u001e(p]\u0016\u000bX/\u001b\u0015\u0003I\u0002CQ\u0001\u001b\u0001\u0005\u0002-\na\u0006^3tiB\u0013xn\u0019+j[\u0016$V-\u001c9pe\u0006d'j\\5o/&$\bNV5fo^KG\u000f\u001b)sK\u0012L7-\u0019;fg\"\u0012q\r\u0011\u0005\u0006W\u0002!\taK\u00016i\u0016\u001cH\u000f\u0015:pGRKW.\u001a+f[B|'/\u00197K_&tw+\u001b;i\u0007>l\u0007/\u001e;fI\u000e{G.^7o\u0003:$\u0007+^:i\t><h\u000e\u000b\u0002k\u0001\")a\u000e\u0001C\u0001W\u00051D/Z:u\u000bZ,g\u000e\u001e+j[\u0016$V-\u001c9pe\u0006d'j\\5o/&$\bnQ8naV$X\rZ\"pYVlg.\u00118e!V\u001c\b\u000eR8x]\"\u0012Q\u000e\u0011\u0005\u0006c\u0002!\taK\u0001)i\u0016\u001cH\u000f\u0015:pGRKW.\u001a+f[B|'/\u00197K_&tw+\u001b;i\u0005&tGn\\4T_V\u00148-\u001a\u0015\u0003a\u0002CQ\u0001\u001e\u0001\u0005\u0002-\nQ\u0007^3tiB\u0013xn\u0019+j[\u0016$V-\u001c9pe\u0006d'j\\5o/&$\bNV5fo^KG\u000f[\"p]N$\u0018M\u001c;D_:$\u0017\u000e^5p]\"\u00121\u000f\u0011\u0005\u0006o\u0002!\taK\u0001:i\u0016\u001cH\u000f\u0015:pGRKW.\u001a'fMR$V-\u001c9pe\u0006d'j\\5o/&$\bNV5fo^KG\u000f[\"p]N$\u0018M\u001c;D_:$\u0017\u000e^5p]\"\u0012a\u000f\u0011\u0005\u0006u\u0002!\taK\u00016i\u0016\u001cH\u000f\u0015:pGRKW.\u001a+f[B|'/\u00197K_&tw+\u001b;i-&,woV5uQ\u001a+hn\u0019;j_:\u001cuN\u001c3ji&|g\u000e\u000b\u0002z\u0001\")Q\u0010\u0001C\u0001W\u0005YB/Z:u\u0013:4\u0018\r\\5e)\u0016l\u0007o\u001c:bYR\u000b'\r\u001c&pS:D#\u0001 !\t\r\u0005\u0005\u0001\u0001\"\u0001,\u0003\u0015\"Xm\u001d;Fm\u0016tG\u000fV5nKR+W\u000e]8sC2Tu.\u001b8U_NKgn[,ji\"\u00046\u000e\u000b\u0002\u0000\u0001\"1\u0011q\u0001\u0001\u0005\u0002-\n!\u0006^3tiR+W\u000e]8sC2Tu.\u001b8VaN,'\u000f^*pkJ\u001cWmV5uQB{7\u000f\u001e$jYR,'\u000fK\u0002\u0002\u0006\u0001Ca!!\u0004\u0001\t\u0003Y\u0013!\u000b;fgR$V-\u001c9pe\u0006d'j\\5o+B\u001cXM\u001d;T_V\u00148-Z,ji\"\u0004&/\u001a$jYR,'\u000fK\u0002\u0002\f\u0001Cq!a\u0005\u0001\t\u0013\t)\"A\u000bfqB,7\r^#yG\u0016\u0004H/[8o)\"\u0014xn\u001e8\u0015\u000f1\n9\"a\f\u00024!9Q!!\u0005A\u0002\u0005e\u0001\u0003BA\u000e\u0003SqA!!\b\u0002&A\u0019\u0011q\u0004\u0018\u000e\u0005\u0005\u0005\"bAA\u0012)\u00051AH]8pizJ1!a\n/\u0003\u0019\u0001&/\u001a3fM&!\u00111FA\u0017\u0005\u0019\u0019FO]5oO*\u0019\u0011q\u0005\u0018\t\u0011\u0005E\u0012\u0011\u0003a\u0001\u00033\t\u0001b[3zo>\u0014Hm\u001d\u0005\u000b\u0003k\t\t\u0002%AA\u0002\u0005]\u0012!B2mCjT\b\u0007BA\u001d\u0003\u0007\u0002b!a\u0007\u0002<\u0005}\u0012\u0002BA\u001f\u0003[\u0011Qa\u00117bgN\u0004B!!\u0011\u0002D1\u0001A\u0001DA#\u0003g\t\t\u0011!A\u0003\u0002\u0005\u001d#aA0%cE!\u0011\u0011JA(!\ri\u00131J\u0005\u0004\u0003\u001br#a\u0002(pi\"Lgn\u001a\t\u0005\u0003#\nYF\u0004\u0003\u0002T\u0005]c\u0002BA\u0010\u0003+J\u0011aL\u0005\u0004\u00033r\u0013a\u00029bG.\fw-Z\u0005\u0005\u0003;\nyFA\u0005UQJ|w/\u00192mK*\u0019\u0011\u0011\f\u0018\t\u000f\u0005\r\u0004\u0001\"\u0003\u0002f\u0005Ab/\u001a:jMf$&/\u00198tY\u0006$\u0018n\u001c8Tk\u000e\u001cWm]:\u0015\u00071\n9\u0007C\u0004\u0006\u0003C\u0002\r!!\u0007\t\u0013\u0005-\u0004!%A\u0005\n\u00055\u0014aH3ya\u0016\u001cG/\u0012=dKB$\u0018n\u001c8UQJ|wO\u001c\u0013eK\u001a\fW\u000f\u001c;%gU\u0011\u0011q\u000e\u0019\u0005\u0003c\n)\b\u0005\u0004\u0002\u001c\u0005m\u00121\u000f\t\u0005\u0003\u0003\n)\b\u0002\u0007\u0002F\u0005%\u0014\u0011!A\u0001\u0006\u0003\t9\u0005")
public class TemporalJoinTest
extends TableTestBase {
    private final StreamTableTestUtil util = this.streamTestUtil(this.streamTestUtil$default$1());

    public StreamTableTestUtil util() {
        return this.util;
    }

    @BeforeEach
    public void before() {
        this.util().addTable(new StringOps(Predef$.MODULE$.augmentString("\n                    |CREATE TABLE Orders (\n                    | amount INT,\n                    | currency STRING,\n                    | rowtime TIMESTAMP(3),\n                    | proctime AS PROCTIME(),\n                    | WATERMARK FOR rowtime AS rowtime\n                    |) WITH (\n                    | 'connector' = 'values'\n                    |)\n      ")).stripMargin());
        this.util().addTable(new StringOps(Predef$.MODULE$.augmentString("\n                    |CREATE TABLE RatesHistory (\n                    | currency STRING,\n                    | rate INT,\n                    | rowtime TIMESTAMP(3),\n                    | WATERMARK FOR rowtime AS rowtime\n                    |) WITH (\n                    | 'connector' = 'values'\n                    |)\n      ")).stripMargin());
        this.util().addTable(new StringOps(Predef$.MODULE$.augmentString("\n                    |CREATE TABLE RatesHistoryWithPK (\n                    | currency STRING,\n                    | rate INT,\n                    | rowtime TIMESTAMP(3),\n                    | WATERMARK FOR rowtime AS rowtime,\n                    | PRIMARY KEY(currency) NOT ENFORCED\n                    |) WITH (\n                    | 'connector' = 'values',\n                    | 'disable-lookup' = 'true'\n                    |)\n      ")).stripMargin());
        this.util().addTable(new StringOps(Predef$.MODULE$.augmentString("\n                    |CREATE TABLE RatesBinlogWithComputedColumn (\n                    | currency STRING,\n                    | rate INT,\n                    | rate1 AS rate + 1,\n                    | proctime AS PROCTIME(),\n                    | rowtime TIMESTAMP(3),\n                    | WATERMARK FOR rowtime AS rowtime,\n                    | PRIMARY KEY(currency) NOT ENFORCED\n                    |) WITH (\n                    | 'connector' = 'values',\n                    | 'changelog-mode' = 'I,UB,UA,D',\n                    | 'disable-lookup' = 'true'\n                    |)\n      ")).stripMargin());
        this.util().addTable(new StringOps(Predef$.MODULE$.augmentString("\n                    |CREATE TABLE RatesBinlogWithoutWatermark (\n                    | currency STRING,\n                    | rate INT,\n                    | rate1 AS rate + 1,\n                    | proctime AS PROCTIME(),\n                    | rowtime TIMESTAMP(3),\n                    | PRIMARY KEY(currency) NOT ENFORCED\n                    |) WITH (\n                    | 'connector' = 'values',\n                    | 'changelog-mode' = 'I,UB,UA,D',\n                    | 'disable-lookup' = 'true'\n                    |)\n      ")).stripMargin());
        this.util().addTable(new StringOps(Predef$.MODULE$.augmentString("\n                    |CREATE TABLE UpsertRates (\n                    | currency STRING,\n                    | rate INT,\n                    | valid VARCHAR,\n                    | rowtime TIMESTAMP(3),\n                    | WATERMARK FOR rowtime AS rowtime,\n                    | PRIMARY KEY(currency) NOT ENFORCED\n                    |) WITH (\n                    | 'connector' = 'values',\n                    | 'changelog-mode' = 'I,UA,D',\n                    | 'disable-lookup' = 'true'\n                    |)\n      ")).stripMargin());
        this.util().addTable(new StringOps(Predef$.MODULE$.augmentString("\n                    |CREATE TABLE RatesOnly (\n                    | currency STRING,\n                    | rate INT,\n                    | proctime AS PROCTIME()\n                    |) WITH (\n                    | 'connector' = 'values'\n                    |)\n      ")).stripMargin());
        this.util().addTable(new StringOps(Predef$.MODULE$.augmentString("\n                    |CREATE TABLE RatesHistoryLegacy (\n                    | currency STRING,\n                    | rate INT,\n                    | rowtime TIMESTAMP(3),\n                    | WATERMARK FOR rowtime AS rowtime,\n                    | PRIMARY KEY(currency) NOT ENFORCED\n                    |) WITH (\n                    | 'connector' = 'COLLECTION',\n                    | 'is-bounded' = 'false'\n                    |)\n      ")).stripMargin());
        this.util().addTable(" CREATE VIEW rates_last_row_rowtime AS SELECT currency, rate, rowtime FROM   (SELECT *,           ROW_NUMBER() OVER (PARTITION BY currency ORDER BY rowtime DESC) AS rowNum    FROM RatesHistory  ) T   WHERE rowNum = 1");
        this.util().addTable(" CREATE VIEW rates_last_row_proctime AS SELECT T.currency, T.rate, T.proctime FROM   (SELECT *,           ROW_NUMBER() OVER (PARTITION BY currency ORDER BY proctime DESC) AS rowNum    FROM RatesOnly  ) T   WHERE T.rowNum = 1");
        this.util().addTable("CREATE VIEW rates_last_value AS SELECT currency, LAST_VALUE(rate) AS rate FROM RatesHistory GROUP BY currency ");
        this.util().tableEnv().executeSql(new StringOps(Predef$.MODULE$.augmentString("\n                                |CREATE TABLE OrdersLtz (\n                                | amount INT,\n                                | currency STRING,\n                                | ts BIGINT,\n                                | rowtime AS TO_TIMESTAMP_LTZ(ts, 3),\n                                | WATERMARK FOR rowtime AS rowtime\n                                |) WITH (\n                                | 'connector' = 'values'\n                                |)\n      ")).stripMargin());
        this.util().tableEnv().executeSql(new StringOps(Predef$.MODULE$.augmentString("\n                                |CREATE TABLE RatesLtz (\n                                | currency STRING,\n                                | rate INT,\n                                | ts BIGINT,\n                                | rowtime as TO_TIMESTAMP_LTZ(ts, 3),\n                                | WATERMARK FOR rowtime AS rowtime,\n                                | PRIMARY KEY(currency) NOT ENFORCED\n                                |) WITH (\n                                | 'connector' = 'values'\n                                |)\n      ")).stripMargin());
    }

    @Test
    public void testEventTimeTemporalJoinOnLegacySource() {
        String sqlQuery = "SELECT * FROM Orders AS o JOIN RatesHistoryLegacy FOR SYSTEM_TIME AS OF o.rowtime AS r ON o.currency = r.currency";
        this.util().verifyExecPlan(sqlQuery);
    }

    @Test
    public void testProcTimeTemporalJoinOnLegacySource() {
        String sqlQuery = "SELECT * FROM Orders AS o JOIN RatesHistoryLegacy FOR SYSTEM_TIME AS OF o.proctime AS r ON o.currency = r.currency";
        this.util().verifyExecPlan(sqlQuery);
    }

    @Test
    public void testEventTimeTemporalJoin() {
        String sqlQuery = "SELECT * FROM Orders AS o JOIN RatesHistoryWithPK FOR SYSTEM_TIME AS OF o.rowtime AS r ON o.currency = r.currency";
        this.util().verifyExecPlan(sqlQuery);
    }

    @Test
    public void testEventTimeTemporalJoinOnTimestampLtzRowtime() {
        String sqlQuery = "SELECT * FROM OrdersLtz AS o JOIN RatesLtz FOR SYSTEM_TIME AS OF o.rowtime AS r ON o.currency = r.currency";
        this.util().verifyExecPlan(sqlQuery);
    }

    @Test
    public void testEventTimeTemporalJoinWithView() {
        String sqlQuery = "SELECT * FROM Orders AS o JOIN rates_last_row_rowtime FOR SYSTEM_TIME AS OF o.rowtime AS r ON o.currency = r.currency";
        this.util().verifyExecPlan(sqlQuery);
    }

    @Test
    public void testEventTimeTemporalJoinWithViewWithConstantCondition() {
        String sqlQuery = "SELECT * FROM Orders AS o JOIN rates_last_row_rowtime FOR SYSTEM_TIME AS OF o.rowtime AS r ON o.currency = r.currency AND r.rate + 1 = 100";
        this.util().verifyExecPlan(sqlQuery);
    }

    @Test
    public void testEventTimeTemporalJoinWithViewWithFunctionCondition() {
        String sqlQuery = "SELECT * FROM Orders AS o JOIN rates_last_row_rowtime FOR SYSTEM_TIME AS OF o.rowtime AS r ON o.currency = r.currency AND 'RMB-100' = concat('RMB-', cast(r.rate AS STRING))";
        this.util().verifyExecPlan(sqlQuery);
    }

    @Test
    public void testEventTimeTemporalJoinWithViewNonEqui() {
        String sqlQuery = "SELECT * FROM Orders AS o JOIN rates_last_row_rowtime FOR SYSTEM_TIME AS OF o.rowtime AS r ON o.currency = r.currency AND o.amount > r.rate";
        this.util().verifyExecPlan(sqlQuery);
    }

    @Test
    public void testEventTimeTemporalJoinWithViewWithPredicates() {
        String sqlQuery = "SELECT * FROM Orders AS o JOIN rates_last_row_rowtime FOR SYSTEM_TIME AS OF o.rowtime AS r ON o.currency = r.currency AND amount > 10 AND r.rate < 100";
        this.util().verifyExecPlan(sqlQuery);
    }

    @Test
    public void testEventTimeLeftTemporalJoinWithViewWithPredicates() {
        String sqlQuery = "SELECT * FROM Orders AS o LEFT JOIN rates_last_row_rowtime FOR SYSTEM_TIME AS OF o.rowtime AS r ON o.currency = r.currency AND amount > 10 AND r.rate < 100";
        this.util().verifyExecPlan(sqlQuery);
    }

    @Test
    public void testProcTimeTemporalJoinWithLastRowView() {
        String sqlQuery = "SELECT * FROM Orders AS o JOIN rates_last_row_proctime FOR SYSTEM_TIME AS OF o.proctime AS r on o.currency = r.currency";
        this.util().verifyExecPlan(sqlQuery);
    }

    @Test
    public void testProcTimeTemporalJoinWithLastValueView() {
        String sqlQuery = "SELECT * FROM Orders AS o JOIN rates_last_value FOR SYSTEM_TIME AS OF o.proctime AS r on o.currency = r.currency";
        this.util().verifyExecPlan(sqlQuery);
    }

    @Test
    public void testProcTimeTemporalJoinWithViewNonEqui() {
        String sqlQuery = "SELECT * FROM Orders AS o JOIN rates_last_value FOR SYSTEM_TIME AS OF o.proctime AS r on o.currency = r.currency AND o.amount > r.rate";
        this.util().verifyExecPlan(sqlQuery);
    }

    @Test
    public void testProcTimeTemporalJoinWithViewWithPredicates() {
        String sqlQuery = "SELECT * FROM Orders AS o JOIN rates_last_value FOR SYSTEM_TIME AS OF o.proctime AS r on o.currency = r.currency AND o.amount > 10 AND r.rate < 100";
        this.util().verifyExecPlan(sqlQuery);
    }

    @Test
    public void testProcTimeTemporalJoinWithComputedColumnAndPushDown() {
        String sqlQuery = "SELECT o.currency, r.currency, rate1 FROM Orders AS o JOIN RatesBinlogWithComputedColumn FOR SYSTEM_TIME AS OF o.proctime AS r on o.currency = r.currency AND o.amount > 10 AND r.rate < 100";
        this.util().verifyExecPlan(sqlQuery);
    }

    @Test
    public void testEventTimeTemporalJoinWithComputedColumnAndPushDown() {
        String sqlQuery = "SELECT o.currency, r.currency, rate1 FROM Orders AS o JOIN RatesBinlogWithComputedColumn FOR SYSTEM_TIME AS OF o.rowtime AS r on o.currency = r.currency AND o.amount > 10 AND r.rate < 100";
        this.util().verifyExecPlan(sqlQuery);
    }

    @Test
    public void testProcTimeTemporalJoinWithBinlogSource() {
        String sqlQuery = "SELECT o.currency, r.currency, rate1 FROM Orders AS o JOIN RatesBinlogWithoutWatermark FOR SYSTEM_TIME AS OF o.proctime AS r on o.currency = r.currency AND o.amount > 10 AND r.rate < 100";
        this.util().verifyExecPlan(sqlQuery);
    }

    @Test
    public void testProcTimeTemporalJoinWithViewWithConstantCondition() {
        String sqlQuery = "SELECT * FROM Orders AS o JOIN rates_last_row_rowtime FOR SYSTEM_TIME AS OF o.proctime AS r on o.currency = r.currency AND r.rate + 1 = 100";
        this.util().verifyExecPlan(sqlQuery);
    }

    @Test
    public void testProcTimeLeftTemporalJoinWithViewWithConstantCondition() {
        String sqlQuery = "SELECT * FROM Orders AS o LEFT JOIN rates_last_row_rowtime FOR SYSTEM_TIME AS OF o.proctime AS r on o.currency = r.currency AND r.rate + 1 = 100";
        this.util().verifyExecPlan(sqlQuery);
    }

    @Test
    public void testProcTimeTemporalJoinWithViewWithFunctionCondition() {
        String sqlQuery = "SELECT * FROM Orders AS o JOIN rates_last_row_rowtime FOR SYSTEM_TIME AS OF o.proctime AS r on o.currency = r.currency AND 'RMB-100' = concat('RMB-', cast(r.rate AS STRING))";
        this.util().verifyExecPlan(sqlQuery);
    }

    @Test
    public void testInvalidTemporalTablJoin() {
        this.util().addTable(new StringOps(Predef$.MODULE$.augmentString("\n                    |CREATE TABLE leftTableWithoutTimeAttribute (\n                    | amount INT,\n                    | currency STRING,\n                    | ts TIMESTAMP(3)\n                    |) WITH (\n                    | 'connector' = 'values'\n                    |)\n      ")).stripMargin());
        String sqlQuery1 = "SELECT * FROM leftTableWithoutTimeAttribute AS o JOIN RatesHistoryWithPK FOR SYSTEM_TIME AS OF o.ts AS r ON o.currency = r.currency";
        this.expectExceptionThrown(sqlQuery1, new StringBuilder(101).append("Temporal table join currently only supports 'FOR SYSTEM_TIME AS OF'").append(" left table's time attribute field").toString(), ValidationException.class);
        String sqlQuery2 = "SELECT * FROM Orders AS o JOIN RatesHistoryWithPK FOR SYSTEM_TIME AS OF o.rowtime AS r ON o.amount = r.rate";
        this.expectExceptionThrown(sqlQuery2, "Temporal table's primary key [currency0] must be included in the equivalence condition of temporal join, but current temporal join condition is [amount=rate].", ValidationException.class);
        this.util().addTable(new StringOps(Predef$.MODULE$.augmentString("\n                    |CREATE TABLE versionedTableWithoutPk (\n                    | currency STRING,\n                    | rate INT,\n                    | rowtime TIMESTAMP(3),\n                    | WATERMARK FOR rowtime AS rowtime\n                    |) WITH (\n                    | 'connector' = 'values'\n                    |)\n      ")).stripMargin());
        String sqlQuery3 = "SELECT * FROM Orders AS o JOIN versionedTableWithoutPk FOR SYSTEM_TIME AS OF o.rowtime AS r ON o.currency = r.currency";
        this.expectExceptionThrown(sqlQuery3, "Temporal Table Join requires primary key in versioned table, but no primary key can be found. The physical plan is:\nFlinkLogicalJoin(condition=[AND(=($1, $4), __INITIAL_TEMPORAL_JOIN_CONDITION($2, $6, __TEMPORAL_JOIN_LEFT_KEY($1), __TEMPORAL_JOIN_RIGHT_KEY($4)))], joinType=[inner])", ValidationException.class);
        this.util().addTable(new StringOps(Predef$.MODULE$.augmentString("\n                    |CREATE TABLE versionedTableWithoutTimeAttribute (\n                    | currency STRING,\n                    | rate INT,\n                    | rowtime TIMESTAMP(3),\n                    | PRIMARY KEY(currency) NOT ENFORCED\n                    |) WITH (\n                    | 'connector' = 'values'\n                    |)\n      ")).stripMargin());
        String sqlQuery4 = "SELECT * FROM Orders AS o JOIN versionedTableWithoutTimeAttribute FOR SYSTEM_TIME AS OF o.rowtime AS r ON o.currency = r.currency";
        this.expectExceptionThrown(sqlQuery4, new StringBuilder(139).append("Event-Time Temporal Table Join requires both primary key and row time attribute in ").append("versioned table, but no row time attribute can be found.").toString(), ValidationException.class);
        this.util().addTable(new StringOps(Predef$.MODULE$.augmentString("\n                    |CREATE TABLE versionedTableWithoutRowtime (\n                    | currency STRING,\n                    | rate INT,\n                    | rowtime TIMESTAMP(3),\n                    | proctime AS PROCTIME(),\n                    | PRIMARY KEY(currency) NOT ENFORCED\n                    |) WITH (\n                    | 'connector' = 'values'\n                    |)\n      ")).stripMargin());
        String sqlQuery5 = "SELECT * FROM Orders AS o JOIN versionedTableWithoutRowtime FOR SYSTEM_TIME AS OF o.rowtime AS r ON o.currency = r.currency";
        this.expectExceptionThrown(sqlQuery5, new StringBuilder(139).append("Event-Time Temporal Table Join requires both primary key and row time attribute in ").append("versioned table, but no row time attribute can be found.").toString(), ValidationException.class);
        String sqlQuery8 = new StringOps(Predef$.MODULE$.augmentString("\n         |SELECT *\n         | FROM OrdersLtz AS o JOIN\n         | RatesHistoryWithPK FOR SYSTEM_TIME AS OF o.rowtime AS r\n         | ON o.currency = r.currency\n          ")).stripMargin();
        this.expectExceptionThrown(sqlQuery8, "Event-Time Temporal Table Join requires same rowtime type in left table and versioned table, but the rowtime types are TIMESTAMP_LTZ(3) *ROWTIME* and TIMESTAMP(3) *ROWTIME*.", ValidationException.class);
    }

    @Test
    public void testEventTimeTemporalJoinToSinkWithPk() {
        TableEnvironment tEnv = this.util().tableEnv();
        tEnv.executeSql(new StringOps(Predef$.MODULE$.augmentString("\n                       |CREATE TABLE orders_rowtime (\n                       |  order_id BIGINT,\n                       |  currency STRING,\n                       |  currency_no STRING,\n                       |  amount BIGINT,\n                       |  order_time TIMESTAMP(3),\n                       |  WATERMARK FOR order_time AS order_time,\n                       |  PRIMARY KEY (order_id) NOT ENFORCED\n                       |) WITH (\n                       |  'connector' = 'values',\n                       |  'changelog-mode' = 'I,UA,UB,D',\n                       |  'data-id' = 'rowTimeOrderDataId'\n                       |)\n                       |")).stripMargin());
        tEnv.executeSql(new StringOps(Predef$.MODULE$.augmentString("\n                       |CREATE TABLE versioned_currency_with_multi_key (\n                       |  currency STRING,\n                       |  currency_no STRING,\n                       |  rate  BIGINT,\n                       |  currency_time TIMESTAMP(3),\n                       |  WATERMARK FOR currency_time AS currency_time - interval '10' SECOND,\n                       |  PRIMARY KEY(currency, currency_no) NOT ENFORCED\n                       |) WITH (\n                       |  'connector' = 'values',\n                       |  'changelog-mode' = 'I,UA,UB,D',\n                       |  'data-id' = 'rowTimeCurrencyDataId'\n                       |)\n                       |")).stripMargin());
        tEnv.executeSql(new StringOps(Predef$.MODULE$.augmentString("\n                       |CREATE TABLE rowtime_default_sink (\n                       |  order_id BIGINT,\n                       |  currency STRING,\n                       |  amount BIGINT,\n                       |  l_time TIMESTAMP(3),\n                       |  rate BIGINT,\n                       |  r_time TIMESTAMP(3),\n                       |  PRIMARY KEY(order_id) NOT ENFORCED\n                       |) WITH (\n                       |  'connector' = 'values',\n                       |  'sink-insert-only' = 'false',\n                       |  'changelog-mode' = 'I,UA,UB,D'\n                       |)\n                       |")).stripMargin());
        String sql = new StringOps(Predef$.MODULE$.augmentString("\n        |INSERT INTO rowtime_default_sink\n        |  SELECT o.order_id, o.currency, o.amount, o.order_time, r.rate, r.currency_time\n        |  FROM orders_rowtime AS o JOIN versioned_currency_with_multi_key\n        |    FOR SYSTEM_TIME AS OF o.order_time as r\n        |      ON o.currency_no = r.currency_no AND o.currency = r.currency\n        |")).stripMargin();
        this.util().verifyExplainInsert(sql, (Seq<ExplainDetail>)Predef$.MODULE$.wrapRefArray((Object[])new ExplainDetail[]{ExplainDetail.CHANGELOG_MODE}));
    }

    @Test
    public void testTemporalJoinUpsertSourceWithPostFilter() {
        String sqlQuery = "SELECT * FROM Orders AS o JOIN UpsertRates FOR SYSTEM_TIME AS OF o.rowtime AS r ON o.currency = r.currency WHERE valid = 'true'";
        this.util().verifyExplain(sqlQuery, (Seq<ExplainDetail>)Predef$.MODULE$.wrapRefArray((Object[])new ExplainDetail[]{ExplainDetail.CHANGELOG_MODE}));
    }

    @Test
    public void testTemporalJoinUpsertSourceWithPreFilter() {
        this.util().tableEnv().executeSql(new StringOps(Predef$.MODULE$.augmentString("\n                                |CREATE TEMPORARY VIEW V1 AS\n                                |SELECT * FROM UpsertRates WHERE valid = 'true'\n                                |")).stripMargin());
        String sqlQuery = "SELECT * FROM Orders AS o JOIN V1 FOR SYSTEM_TIME AS OF o.rowtime AS r ON o.currency = r.currency";
        this.expectExceptionThrown(sqlQuery, "Filter is not allowed for right changelog input of event time temporal join, it will corrupt the versioning of data.", TableException.class);
    }

    private void expectExceptionThrown(String sql, String keywords, Class<? extends Throwable> clazz) {
        ThrowableAssert.ThrowingCallable callable = () -> this.verifyTranslationSuccess(sql);
        if (keywords != null) {
            Assertions.assertThatExceptionOfType(clazz).isThrownBy(callable).withMessageContaining(keywords);
        } else {
            Assertions.assertThatExceptionOfType(clazz).isThrownBy(callable);
        }
    }

    private Class<? extends Throwable> expectExceptionThrown$default$3() {
        return ValidationException.class;
    }

    private void verifyTranslationSuccess(String sql) {
        this.util().tableEnv().sqlQuery(sql).explain(new ExplainDetail[0]);
    }
}

