/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.table.planner.runtime.stream.sql;

import java.io.Serializable;
import java.time.LocalDateTime;
import java.time.format.DateTimeParseException;
import org.apache.flink.table.api.TableException;
import org.apache.flink.table.api.config.ExecutionConfigOptions;
import org.apache.flink.table.planner.factories.TestValuesTableFactory;
import org.apache.flink.table.planner.runtime.utils.StreamingWithStateTestBase;
import org.apache.flink.table.utils.LegacyRowResource;
import org.apache.flink.types.Row;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Rule;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;
import scala.Function1;
import scala.MatchError;
import scala.None$;
import scala.Option;
import scala.Predef$;
import scala.Some;
import scala.collection.JavaConversions$;
import scala.collection.Seq;
import scala.collection.Seq$;
import scala.collection.immutable.;
import scala.collection.immutable.List;
import scala.collection.immutable.List$;
import scala.collection.immutable.Nil$;
import scala.collection.immutable.StringOps;
import scala.math.Ordering;
import scala.reflect.ClassTag$;
import scala.reflect.ScalaSignature;
import scala.runtime.BoxesRunTime;

@RunWith(value=Parameterized.class)
@ScalaSignature(bytes="\u0006\u0001\u0005Eh\u0001B\u0001\u0003\u0001M\u0011!\u0003V3na>\u0014\u0018\r\u001c&pS:LEkQ1tK*\u00111\u0001B\u0001\u0004gFd'BA\u0003\u0007\u0003\u0019\u0019HO]3b[*\u0011q\u0001C\u0001\beVtG/[7f\u0015\tI!\"A\u0004qY\u0006tg.\u001a:\u000b\u0005-a\u0011!\u0002;bE2,'BA\u0007\u000f\u0003\u00151G.\u001b8l\u0015\ty\u0001#\u0001\u0004ba\u0006\u001c\u0007.\u001a\u0006\u0002#\u0005\u0019qN]4\u0004\u0001M\u0011\u0001\u0001\u0006\t\u0003+ai\u0011A\u0006\u0006\u0003/\u0019\tQ!\u001e;jYNL!!\u0007\f\u00035M#(/Z1nS:<w+\u001b;i'R\fG/\u001a+fgR\u0014\u0015m]3\t\u0011m\u0001!\u0011!Q\u0001\nq\tQa\u001d;bi\u0016\u0004\"!H\u0019\u000f\u0005yycBA\u0010/\u001d\t\u0001SF\u0004\u0002\"Y9\u0011!e\u000b\b\u0003G)r!\u0001J\u0015\u000f\u0005\u0015BS\"\u0001\u0014\u000b\u0005\u001d\u0012\u0012A\u0002\u001fs_>$h(C\u0001\u0012\u0013\ty\u0001#\u0003\u0002\u000e\u001d%\u00111\u0002D\u0005\u0003\u0013)I!a\u0002\u0005\n\u0005]1\u0011B\u0001\u0019\u0017\u0003i\u0019FO]3b[&twmV5uQN#\u0018\r^3UKN$()Y:f\u0013\t\u00114G\u0001\tTi\u0006$XMQ1dW\u0016tG-T8eK*\u0011\u0001G\u0006\u0005\u0006k\u0001!\tAN\u0001\u0007y%t\u0017\u000e\u001e \u0015\u0005]J\u0004C\u0001\u001d\u0001\u001b\u0005\u0011\u0001\"B\u000e5\u0001\u0004a\u0002\"B\u001e\u0001\t\u0003a\u0014AD;tKNdUmZ1dsJ{wo]\u000b\u0002{A\u0011a\bQ\u0007\u0002\u007f)\u0011qCC\u0005\u0003\u0003~\u0012\u0011\u0003T3hC\u000eL(k\\<SKN|WO]2fQ\tQ4\t\u0005\u0002E\u000f6\tQI\u0003\u0002G!\u0005)!.\u001e8ji&\u0011\u0001*\u0012\u0002\u0005%VdW\rC\u0004K\u0001\t\u0007I\u0011A&\u0002#A\u0014xn\u0019+j[\u0016|%\u000fZ3s\t\u0006$\u0018-F\u0001M!\riEKV\u0007\u0002\u001d*\u0011q\nU\u0001\nS6lW\u000f^1cY\u0016T!!\u0015*\u0002\u0015\r|G\u000e\\3di&|gNC\u0001T\u0003\u0015\u00198-\u00197b\u0013\t)fJ\u0001\u0003MSN$\bCA,[\u001b\u0005A&BA-\r\u0003\u0015!\u0018\u0010]3t\u0013\tY\u0006LA\u0002S_^Da!\u0018\u0001!\u0002\u0013a\u0015A\u00059s_\u000e$\u0016.\\3Pe\u0012,'\u000fR1uC\u0002Bqa\u0018\u0001C\u0002\u0013\u00051*\u0001\u000bqe>\u001cG+[7f\u0007V\u0014(/\u001a8ds\u0012\u000bG/\u0019\u0005\u0007C\u0002\u0001\u000b\u0011\u0002'\u0002+A\u0014xn\u0019+j[\u0016\u001cUO\u001d:f]\u000eLH)\u0019;bA!91\r\u0001b\u0001\n\u0003Y\u0015!\b9s_\u000e$\u0016.\\3DkJ\u0014XM\\2z\u0007\"\fgnZ3m_\u001e$\u0015\r^1\t\r\u0015\u0004\u0001\u0015!\u0003M\u0003y\u0001(o\\2US6,7)\u001e:sK:\u001c\u0017p\u00115b]\u001e,Gn\\4ECR\f\u0007\u0005C\u0004h\u0001\t\u0007I\u0011A&\u0002!I|w\u000fV5nK>\u0013H-\u001a:ECR\f\u0007BB5\u0001A\u0003%A*A\ts_^$\u0016.\\3Pe\u0012,'\u000fR1uC\u0002Bqa\u001b\u0001C\u0002\u0013\u00051*\u0001\u0011s_^$\u0016.\\3DkJ\u0014XM\\2z\t\u0006$\u0018-V:j]\u001elU\r^1US6,\u0007BB7\u0001A\u0003%A*A\u0011s_^$\u0016.\\3DkJ\u0014XM\\2z\t\u0006$\u0018-V:j]\u001elU\r^1US6,\u0007\u0005C\u0004p\u0001\t\u0007I\u0011A&\u0002EI|w\u000fV5nK\u000e+(O]3oGf$\u0015\r^1Vg&twMQ3g_J,G+[7f\u0011\u0019\t\b\u0001)A\u0005\u0019\u0006\u0019#o\\<US6,7)\u001e:sK:\u001c\u0017\u0010R1uCV\u001b\u0018N\\4CK\u001a|'/\u001a+j[\u0016\u0004\u0003bB:\u0001\u0005\u0004%\taS\u0001\u0019kB\u001cXM\u001d;T_V\u00148-Z\"veJ,gnY=ECR\f\u0007BB;\u0001A\u0003%A*A\rvaN,'\u000f^*pkJ\u001cWmQ;se\u0016t7-\u001f#bi\u0006\u0004\u0003bB<\u0001\u0005\u0004%\taS\u0001\u001ee><H+[7f\u0013:\u001cXM\u001d;P]2L8)\u001e:sK:\u001c\u0017\u0010R1uC\"1\u0011\u0010\u0001Q\u0001\n1\u000baD]8x)&lW-\u00138tKJ$xJ\u001c7z\u0007V\u0014(/\u001a8ds\u0012\u000bG/\u0019\u0011\t\u000bm\u0004A\u0011\u0001?\u0002\u000fA\u0014X\r]1sKR\tQ\u0010\u0005\u0002\u007f\u007f6\t!+C\u0002\u0002\u0002I\u0013A!\u00168ji\"\u001a!0!\u0002\u0011\u0007\u0011\u000b9!C\u0002\u0002\n\u0015\u0013aAQ3g_J,\u0007BBA\u0007\u0001\u0011\u0005A0\u0001\ruKN$\bK]8d)&lW\rV3na>\u0014\u0018\r\u001c&pS:DC!a\u0003\u0002\u0012A\u0019A)a\u0005\n\u0007\u0005UQI\u0001\u0003UKN$\bBBA\r\u0001\u0011\u0005A0\u0001\u000fuKN$\bK]8d)&lW\rT3giR+W\u000e]8sC2Tu.\u001b8)\t\u0005]\u0011\u0011\u0003\u0005\u0007\u0003?\u0001A\u0011\u0001?\u0002OQ,7\u000f\u001e)s_\u000e$\u0016.\\3UK6\u0004xN]1m\u0015>Lgn\u00115b]\u001e,Gn\\4T_V\u00148-\u001a\u0015\u0005\u0003;\t\t\u0002\u0003\u0004\u0002&\u0001!\t\u0001`\u0001!i\u0016\u001cH\u000f\u0015:pGRKW.\u001a+f[B|'/\u00197K_&tw+\u001b;i-&,w\u000f\u000b\u0003\u0002$\u0005E\u0001BBA\u0016\u0001\u0011\u0005A0\u0001\u0013uKN$\bK]8d)&lW\rT3giR+W\u000e]8sC2Tu.\u001b8XSRDg+[3xQ\u0011\tI#!\u0005\t\r\u0005E\u0002\u0001\"\u0001}\u0003\u001d\"Xm\u001d;Qe>\u001cG+[7f)\u0016l\u0007o\u001c:bY*{\u0017N\\,ji\"4\u0016.Z<O_:,\u0015/^5)\t\u0005=\u0012\u0011\u0003\u0005\u0007\u0003o\u0001A\u0011\u0001?\u0002eQ,7\u000f\u001e)s_\u000e$\u0016.\\3MK\u001a$H+Z7q_J\fGNS8j]^KG\u000f\u001b,jK^<\u0016\u000e\u001e5Qe\u0016$\u0017nY1uKNDC!!\u000e\u0002\u0012!1\u0011Q\b\u0001\u0005\u0002q\fQ\u0004^3tiB\u0013xn\u0019+j[\u0016lU\u000f\u001c;j)\u0016l\u0007o\u001c:bY*{\u0017N\u001c\u0015\u0005\u0003w\t\t\u0002\u0003\u0004\u0002D\u0001!\t\u0001`\u0001\u001ai\u0016\u001cH/\u0012<f]R$\u0016.\\3UK6\u0004xN]1m\u0015>Lg\u000e\u000b\u0003\u0002B\u0005E\u0001BBA%\u0001\u0011\u0005A0\u0001\u0018uKN$XI^3oiRKW.\u001a+f[B|'/\u00197K_&tG\u000b[1u\u0015>Lgn[3z\u0007>tG/Y5ogB[\u0007\u0006BA$\u0003#Aa!a\u0014\u0001\t\u0003a\u0018a\t;fgR,e/\u001a8u)&lW\rV3na>\u0014\u0018\r\u001c&pS:<\u0016\u000e\u001e5GS2$XM\u001d\u0015\u0005\u0003\u001b\n\t\u0002\u0003\u0004\u0002V\u0001!\t\u0001`\u0001\u001ei\u0016\u001cH/\u0012<f]R$\u0016.\\3MK\u001a$H+Z7q_J\fGNS8j]\"\"\u00111KA\t\u0011\u0019\tY\u0006\u0001C\u0001y\u0006\tD/Z:u\u000bZ,g\u000e\u001e+j[\u0016$V-\u001c9pe\u0006d'j\\5o\u0007\"\fgnZ3m_\u001e,6/\u001b8h\u0005\u00164wN]3US6,\u0007\u0006BA-\u0003#Aa!!\u0019\u0001\t\u0003a\u0018!\u000b;fgR,e/\u001a8u)&lW\rT3giR+W\u000e]8sC2Tu.\u001b8VaN,'\u000f^*pkJ\u001cW\r\u000b\u0003\u0002`\u0005E\u0001BBA4\u0001\u0011\u0005A0\u0001\u0014uKN$XI^3oiRKW.\u001a+f[B|'/\u00197K_&tw+\u001b;i\u001bVdG/[&fsNDC!!\u001a\u0002\u0012!1\u0011Q\u000e\u0001\u0005\u0002q\fa\u0006^3ti\u00163XM\u001c;US6,G+Z7q_J\fGNS8j]^KG\u000f\u001b(p]\u0016\u000bX/\u00197D_:$\u0017\u000e^5p]\"\"\u00111NA\t\u0011\u0019\t\u0019\b\u0001C\u0001y\u0006aC/Z:u\u000bZ,g\u000e\u001e+j[\u0016$V-\u001c9pe\u0006d'j\\5o\u000bF,\u0018\r\\\"p]\u0012LG/[8o\u001f:\\U-\u001f\u0015\u0005\u0003c\n\t\u0002\u0003\u0004\u0002z\u0001!\t\u0001`\u0001\u001fi\u0016\u001cH/\u0012<f]R$\u0016.\\3Nk2$\u0018\u000eV3na>\u0014\u0018\r\u001c&pS:DC!a\u001e\u0002\u0012!1\u0011q\u0010\u0001\u0005\u0002q\f\u0011\u0007^3ti\u00163XM\u001c;US6,G+Z7q_J\fGNS8j]^KG\u000f\u001b#fIV\u0004H.[2bi\u00164\u0015N]:u-&,w\u000f\u000b\u0003\u0002~\u0005E\u0001BBAC\u0001\u0011\u0005A0\u0001\u0019uKN$XI^3oiRKW.\u001a+f[B|'/\u00197K_&tw+\u001b;i\t\u0016$W\u000f\u001d7jG\u0006$X\rT1tiZKWm\u001e\u0015\u0005\u0003\u0007\u000b\t\u0002\u0003\u0004\u0002\f\u0002!\t\u0001`\u0001&i\u0016\u001cH/\u0012<f]R$\u0016.\\3MK\u001a$H+Z7q_J\fGNS8j]^KG\u000f\u001b,jK^DC!!#\u0002\u0012!1\u0011\u0011\u0013\u0001\u0005\u0002q\fa\u0005^3ti6Kg.\u001b\"bi\u000eDWI^3oiRKW.\u001a,jK^$V-\u001c9pe\u0006d'j\\5oQ\u0011\ty)!\u0005\t\u000f\u0005]\u0005\u0001\"\u0003\u0002\u001a\u0006y1M]3bi\u0016\u001c\u0016N\\6UC\ndW\rF\u0003~\u00037\u000by\u000b\u0003\u0005\u0002\u001e\u0006U\u0005\u0019AAP\u0003%!\u0018M\u00197f\u001d\u0006lW\r\u0005\u0003\u0002\"\u0006%f\u0002BAR\u0003K\u0003\"!\n*\n\u0007\u0005\u001d&+\u0001\u0004Qe\u0016$WMZ\u0005\u0005\u0003W\u000biK\u0001\u0004TiJLgn\u001a\u0006\u0004\u0003O\u0013\u0006\u0002CAY\u0003+\u0003\r!a-\u0002\u000f\r|G.^7ogB)a0!.\u0002 &\u0019\u0011q\u0017*\u0003\r=\u0003H/[8o\u0011\u001d\tY\f\u0001C\u0005\u0003{\u000bAb\u00195b]\u001e,Gn\\4S_^$RAVA`\u0003\u0007D\u0001\"!1\u0002:\u0002\u0007\u0011qT\u0001\u0005W&tG\r\u0003\u0005\u0002F\u0006e\u0006\u0019AAd\u0003\u00191\u0018\r\\;fgB)a0!3\u0002N&\u0019\u00111\u001a*\u0003\u0015q\u0012X\r]3bi\u0016$g\bE\u0002\u007f\u0003\u001fL1!!5S\u0005\r\te.\u001f\u0015\b\u0001\u0005U\u0017\u0011]Ar!\u0011\t9.!8\u000e\u0005\u0005e'bAAn\u000b\u00061!/\u001e8oKJLA!a8\u0002Z\n9!+\u001e8XSRD\u0017!\u0002<bYV,7EAAs!\u0011\t9/!<\u000e\u0005\u0005%(bAAv\u000b\u00069!/\u001e8oKJ\u001c\u0018\u0002BAx\u0003S\u0014Q\u0002U1sC6,G/\u001a:ju\u0016$\u0007")
public class TemporalJoinITCase
extends StreamingWithStateTestBase {
    private final List<Row> procTimeOrderData = new .colon.colon((Object)this.changelogRow("+I", (Seq<Object>)Predef$.MODULE$.genericWrapArray((Object)new Object[]{BoxesRunTime.boxToLong((long)1L), "Euro", "no1", BoxesRunTime.boxToLong((long)12L)})), (List)new .colon.colon((Object)this.changelogRow("+I", (Seq<Object>)Predef$.MODULE$.genericWrapArray((Object)new Object[]{BoxesRunTime.boxToLong((long)2L), "US Dollar", "no1", BoxesRunTime.boxToLong((long)14L)})), (List)new .colon.colon((Object)this.changelogRow("+I", (Seq<Object>)Predef$.MODULE$.genericWrapArray((Object)new Object[]{BoxesRunTime.boxToLong((long)3L), "US Dollar", "no2", BoxesRunTime.boxToLong((long)18L)})), (List)new .colon.colon((Object)this.changelogRow("+I", (Seq<Object>)Predef$.MODULE$.genericWrapArray((Object)new Object[]{BoxesRunTime.boxToLong((long)4L), "RMB", "no1", BoxesRunTime.boxToLong((long)40L)})), (List)Nil$.MODULE$))));
    private final List<Row> procTimeCurrencyData = new .colon.colon((Object)this.changelogRow("+I", (Seq<Object>)Predef$.MODULE$.genericWrapArray((Object)new Object[]{"Euro", "no1", BoxesRunTime.boxToLong((long)114L)})), (List)new .colon.colon((Object)this.changelogRow("+I", (Seq<Object>)Predef$.MODULE$.genericWrapArray((Object)new Object[]{"US Dollar", "no1", BoxesRunTime.boxToLong((long)102L)})), (List)new .colon.colon((Object)this.changelogRow("+I", (Seq<Object>)Predef$.MODULE$.genericWrapArray((Object)new Object[]{"Yen", "no1", BoxesRunTime.boxToLong((long)1L)})), (List)new .colon.colon((Object)this.changelogRow("+I", (Seq<Object>)Predef$.MODULE$.genericWrapArray((Object)new Object[]{"RMB", "no1", BoxesRunTime.boxToLong((long)702L)})), (List)new .colon.colon((Object)this.changelogRow("+I", (Seq<Object>)Predef$.MODULE$.genericWrapArray((Object)new Object[]{"Euro", "no1", BoxesRunTime.boxToLong((long)118L)})), (List)new .colon.colon((Object)this.changelogRow("+I", (Seq<Object>)Predef$.MODULE$.genericWrapArray((Object)new Object[]{"US Dollar", "no2", BoxesRunTime.boxToLong((long)106L)})), (List)Nil$.MODULE$))))));
    private final List<Row> procTimeCurrencyChangelogData = List$.MODULE$.apply((Seq)Predef$.MODULE$.wrapRefArray((Object[])new Row[]{this.changelogRow("+I", (Seq<Object>)Predef$.MODULE$.genericWrapArray((Object)new Object[]{"Euro", "no1", BoxesRunTime.boxToLong((long)114L)})), this.changelogRow("+I", (Seq<Object>)Predef$.MODULE$.genericWrapArray((Object)new Object[]{"US Dollar", "no1", BoxesRunTime.boxToLong((long)102L)})), this.changelogRow("+I", (Seq<Object>)Predef$.MODULE$.genericWrapArray((Object)new Object[]{"Yen", "no1", BoxesRunTime.boxToLong((long)1L)})), this.changelogRow("+I", (Seq<Object>)Predef$.MODULE$.genericWrapArray((Object)new Object[]{"RMB", "no1", BoxesRunTime.boxToLong((long)702L)})), this.changelogRow("-U", (Seq<Object>)Predef$.MODULE$.genericWrapArray((Object)new Object[]{"RMB", "no1", BoxesRunTime.boxToLong((long)702L)})), this.changelogRow("+U", (Seq<Object>)Predef$.MODULE$.genericWrapArray((Object)new Object[]{"RMB", "no1", BoxesRunTime.boxToLong((long)802L)})), this.changelogRow("+I", (Seq<Object>)Predef$.MODULE$.genericWrapArray((Object)new Object[]{"Euro", "no1", BoxesRunTime.boxToLong((long)118L)})), this.changelogRow("+I", (Seq<Object>)Predef$.MODULE$.genericWrapArray((Object)new Object[]{"US Dollar", "no2", BoxesRunTime.boxToLong((long)106L)}))}));
    private final List<Row> rowTimeOrderData = List$.MODULE$.apply((Seq)Predef$.MODULE$.wrapRefArray((Object[])new Row[]{this.changelogRow("+I", (Seq<Object>)Predef$.MODULE$.genericWrapArray((Object)new Object[]{BoxesRunTime.boxToLong((long)1L), "Euro", "no1", BoxesRunTime.boxToLong((long)12L), "2020-08-15T00:01:00"})), this.changelogRow("+I", (Seq<Object>)Predef$.MODULE$.genericWrapArray((Object)new Object[]{BoxesRunTime.boxToLong((long)2L), "US Dollar", "no1", BoxesRunTime.boxToLong((long)1L), "2020-08-15T00:02:00"})), this.changelogRow("+I", (Seq<Object>)Predef$.MODULE$.genericWrapArray((Object)new Object[]{BoxesRunTime.boxToLong((long)3L), "RMB", "no1", BoxesRunTime.boxToLong((long)40L), "2020-08-15T00:03:00"})), this.changelogRow("+I", (Seq<Object>)Predef$.MODULE$.genericWrapArray((Object)new Object[]{BoxesRunTime.boxToLong((long)4L), "Euro", "no1", BoxesRunTime.boxToLong((long)14L), "2020-08-16T00:04:00"})), this.changelogRow("-U", (Seq<Object>)Predef$.MODULE$.genericWrapArray((Object)new Object[]{BoxesRunTime.boxToLong((long)2L), "US Dollar", "no1", BoxesRunTime.boxToLong((long)1L), "2020-08-16T00:03:00"})), this.changelogRow("+U", (Seq<Object>)Predef$.MODULE$.genericWrapArray((Object)new Object[]{BoxesRunTime.boxToLong((long)2L), "US Dollar", "no1", BoxesRunTime.boxToLong((long)18L), "2020-08-16T00:03:00"})), this.changelogRow("+I", (Seq<Object>)Predef$.MODULE$.genericWrapArray((Object)new Object[]{BoxesRunTime.boxToLong((long)5L), "RMB", "no1", BoxesRunTime.boxToLong((long)40L), "2020-08-16T00:03:00"})), this.changelogRow("+I", (Seq<Object>)Predef$.MODULE$.genericWrapArray((Object)new Object[]{BoxesRunTime.boxToLong((long)6L), "RMB", "no1", BoxesRunTime.boxToLong((long)40L), "2020-08-16T00:04:00"})), this.changelogRow("-D", (Seq<Object>)Predef$.MODULE$.genericWrapArray((Object)new Object[]{BoxesRunTime.boxToLong((long)6L), "RMB", "no1", BoxesRunTime.boxToLong((long)40L), "2020-08-16T00:04:00"}))}));
    private final List<Row> rowTimeCurrencyDataUsingMetaTime = List$.MODULE$.apply((Seq)Predef$.MODULE$.wrapRefArray((Object[])new Row[]{this.changelogRow("+I", (Seq<Object>)Predef$.MODULE$.genericWrapArray((Object)new Object[]{"Euro", "no1", BoxesRunTime.boxToLong((long)114L), "2020-08-15T00:00:01"})), this.changelogRow("+I", (Seq<Object>)Predef$.MODULE$.genericWrapArray((Object)new Object[]{"US Dollar", "no1", BoxesRunTime.boxToLong((long)102L), "2020-08-15T00:00:02"})), this.changelogRow("+I", (Seq<Object>)Predef$.MODULE$.genericWrapArray((Object)new Object[]{"Yen", "no1", BoxesRunTime.boxToLong((long)1L), "2020-08-15T00:00:03"})), this.changelogRow("+I", (Seq<Object>)Predef$.MODULE$.genericWrapArray((Object)new Object[]{"RMB", "no1", BoxesRunTime.boxToLong((long)702L), "2020-08-15T00:00:04"})), this.changelogRow("-U", (Seq<Object>)Predef$.MODULE$.genericWrapArray((Object)new Object[]{"Euro", "no1", BoxesRunTime.boxToLong((long)114L), "2020-08-16T00:01:00"})), this.changelogRow("+U", (Seq<Object>)Predef$.MODULE$.genericWrapArray((Object)new Object[]{"Euro", "no1", BoxesRunTime.boxToLong((long)118L), "2020-08-16T00:01:00"})), this.changelogRow("-U", (Seq<Object>)Predef$.MODULE$.genericWrapArray((Object)new Object[]{"US Dollar", "no1", BoxesRunTime.boxToLong((long)102L), "2020-08-16T00:02:00"})), this.changelogRow("+U", (Seq<Object>)Predef$.MODULE$.genericWrapArray((Object)new Object[]{"US Dollar", "no1", BoxesRunTime.boxToLong((long)106L), "2020-08-16T00:02:00"})), this.changelogRow("-D", (Seq<Object>)Predef$.MODULE$.genericWrapArray((Object)new Object[]{"RMB", "no1", BoxesRunTime.boxToLong((long)708L), "2020-08-16T00:02:00"}))}));
    private final List<Row> rowTimeCurrencyDataUsingBeforeTime = List$.MODULE$.apply((Seq)Predef$.MODULE$.wrapRefArray((Object[])new Row[]{this.changelogRow("+I", (Seq<Object>)Predef$.MODULE$.genericWrapArray((Object)new Object[]{"Euro", "no1", BoxesRunTime.boxToLong((long)114L), "2020-08-15T00:00:01"})), this.changelogRow("+I", (Seq<Object>)Predef$.MODULE$.genericWrapArray((Object)new Object[]{"US Dollar", "no1", BoxesRunTime.boxToLong((long)102L), "2020-08-15T00:00:02"})), this.changelogRow("+I", (Seq<Object>)Predef$.MODULE$.genericWrapArray((Object)new Object[]{"Yen", "no1", BoxesRunTime.boxToLong((long)1L), "2020-08-15T00:00:03"})), this.changelogRow("+I", (Seq<Object>)Predef$.MODULE$.genericWrapArray((Object)new Object[]{"RMB", "no1", BoxesRunTime.boxToLong((long)702L), "2020-08-15T00:00:04"})), this.changelogRow("-U", (Seq<Object>)Predef$.MODULE$.genericWrapArray((Object)new Object[]{"Euro", "no1", BoxesRunTime.boxToLong((long)114L), "2020-08-15T00:00:01"})), this.changelogRow("+U", (Seq<Object>)Predef$.MODULE$.genericWrapArray((Object)new Object[]{"Euro", "no1", BoxesRunTime.boxToLong((long)118L), "2020-08-16T00:01:00"})), this.changelogRow("-U", (Seq<Object>)Predef$.MODULE$.genericWrapArray((Object)new Object[]{"US Dollar", "no1", BoxesRunTime.boxToLong((long)102L), "2020-08-15T00:00:02"})), this.changelogRow("+U", (Seq<Object>)Predef$.MODULE$.genericWrapArray((Object)new Object[]{"US Dollar", "no1", BoxesRunTime.boxToLong((long)106L), "2020-08-16T00:02:00"})), this.changelogRow("-D", (Seq<Object>)Predef$.MODULE$.genericWrapArray((Object)new Object[]{"RMB", "no1", BoxesRunTime.boxToLong((long)702L), "2020-08-15T00:00:04"}))}));
    private final List<Row> upsertSourceCurrencyData = new .colon.colon((Object)this.changelogRow("+U", (Seq<Object>)Predef$.MODULE$.genericWrapArray((Object)new Object[]{"Euro", "no1", BoxesRunTime.boxToLong((long)114L), "2020-08-15T00:00:01"})), (List)new .colon.colon((Object)this.changelogRow("+U", (Seq<Object>)Predef$.MODULE$.genericWrapArray((Object)new Object[]{"US Dollar", "no1", BoxesRunTime.boxToLong((long)102L), "2020-08-15T00:00:02"})), (List)new .colon.colon((Object)this.changelogRow("+U", (Seq<Object>)Predef$.MODULE$.genericWrapArray((Object)new Object[]{"Yen", "no1", BoxesRunTime.boxToLong((long)1L), "2020-08-15T00:00:03"})), (List)new .colon.colon((Object)this.changelogRow("+U", (Seq<Object>)Predef$.MODULE$.genericWrapArray((Object)new Object[]{"RMB", "no1", BoxesRunTime.boxToLong((long)702L), "2020-08-15T00:00:04"})), (List)new .colon.colon((Object)this.changelogRow("+U", (Seq<Object>)Predef$.MODULE$.genericWrapArray((Object)new Object[]{"Euro", "no1", BoxesRunTime.boxToLong((long)118L), "2020-08-16T00:01:00"})), (List)new .colon.colon((Object)this.changelogRow("+U", (Seq<Object>)Predef$.MODULE$.genericWrapArray((Object)new Object[]{"US Dollar", "no1", BoxesRunTime.boxToLong((long)104L), "2020-08-16T00:02:00"})), (List)new .colon.colon((Object)this.changelogRow("-D", (Seq<Object>)Predef$.MODULE$.genericWrapArray((Object)new Object[]{"RMB", "no1", BoxesRunTime.boxToLong((long)702L), "2020-08-15T00:00:04"})), (List)Nil$.MODULE$)))))));
    private final List<Row> rowTimeInsertOnlyCurrencyData = new .colon.colon((Object)this.changelogRow("+I", (Seq<Object>)Predef$.MODULE$.genericWrapArray((Object)new Object[]{"Euro", "no1", BoxesRunTime.boxToLong((long)114L), "2020-08-15T00:00:01"})), (List)new .colon.colon((Object)this.changelogRow("+I", (Seq<Object>)Predef$.MODULE$.genericWrapArray((Object)new Object[]{"US Dollar", "no1", BoxesRunTime.boxToLong((long)102L), "2020-08-15T00:00:02"})), (List)new .colon.colon((Object)this.changelogRow("+I", (Seq<Object>)Predef$.MODULE$.genericWrapArray((Object)new Object[]{"Yen", "no1", BoxesRunTime.boxToLong((long)1L), "2020-08-15T00:00:03"})), (List)new .colon.colon((Object)this.changelogRow("+I", (Seq<Object>)Predef$.MODULE$.genericWrapArray((Object)new Object[]{"RMB", "no1", BoxesRunTime.boxToLong((long)702L), "2020-08-15T00:00:04"})), (List)new .colon.colon((Object)this.changelogRow("+I", (Seq<Object>)Predef$.MODULE$.genericWrapArray((Object)new Object[]{"Euro", "no1", BoxesRunTime.boxToLong((long)118L), "2020-08-16T00:01:00"})), (List)new .colon.colon((Object)this.changelogRow("+I", (Seq<Object>)Predef$.MODULE$.genericWrapArray((Object)new Object[]{"US Dollar", "no1", BoxesRunTime.boxToLong((long)102L), "2020-08-16T00:02:00"})), (List)new .colon.colon((Object)this.changelogRow("+I", (Seq<Object>)Predef$.MODULE$.genericWrapArray((Object)new Object[]{"US Dollar", "no1", BoxesRunTime.boxToLong((long)106L), "2020-08-16T00:02:00"})), (List)Nil$.MODULE$)))))));

    @Rule
    public LegacyRowResource usesLegacyRows() {
        return LegacyRowResource.INSTANCE;
    }

    public List<Row> procTimeOrderData() {
        return this.procTimeOrderData;
    }

    public List<Row> procTimeCurrencyData() {
        return this.procTimeCurrencyData;
    }

    public List<Row> procTimeCurrencyChangelogData() {
        return this.procTimeCurrencyChangelogData;
    }

    public List<Row> rowTimeOrderData() {
        return this.rowTimeOrderData;
    }

    public List<Row> rowTimeCurrencyDataUsingMetaTime() {
        return this.rowTimeCurrencyDataUsingMetaTime;
    }

    public List<Row> rowTimeCurrencyDataUsingBeforeTime() {
        return this.rowTimeCurrencyDataUsingBeforeTime;
    }

    public List<Row> upsertSourceCurrencyData() {
        return this.upsertSourceCurrencyData;
    }

    public List<Row> rowTimeInsertOnlyCurrencyData() {
        return this.rowTimeInsertOnlyCurrencyData;
    }

    @Before
    public void prepare() {
        String procTimeOrderDataId = TestValuesTableFactory.registerData(this.procTimeOrderData());
        this.tEnv().executeSql(new StringOps(Predef$.MODULE$.augmentString(new StringBuilder(499).append("\n                       |CREATE TABLE orders_proctime (\n                       |  order_id BIGINT,\n                       |  currency STRING,\n                       |  currency_no STRING,\n                       |  amount BIGINT,\n                       |  proctime as PROCTIME()\n                       |) WITH (\n                       |  'connector' = 'values',\n                       |  'bounded' = 'false',\n                       |  'data-id' = '").append(procTimeOrderDataId).append("'\n                       |)\n                       |").toString())).stripMargin());
        String procTimeCurrencyDataId = TestValuesTableFactory.registerData(this.procTimeCurrencyData());
        this.tEnv().executeSql(new StringOps(Predef$.MODULE$.augmentString(new StringBuilder(584).append("\n                       |CREATE TABLE currency_proctime (\n                       |  currency STRING,\n                       |  currency_no STRING,\n                       |  rate BIGINT,\n                       |  proctime as PROCTIME(),\n                       |  PRIMARY KEY(currency, currency_no) NOT ENFORCED\n                       |) WITH (\n                       |  'connector' = 'values',\n                       |  'bounded' = 'false',\n                       |  'disable-lookup' = 'true',\n                       |  'data-id' = '").append(procTimeCurrencyDataId).append("'\n                       |)\n                       |").toString())).stripMargin());
        String procTimeCurrencyChangelogDataId = TestValuesTableFactory.registerData(this.procTimeCurrencyChangelogData());
        this.tEnv().executeSql(new StringOps(Predef$.MODULE$.augmentString(new StringBuilder(652).append("\n                       |CREATE TABLE changelog_currency_proctime (\n                       |  currency STRING,\n                       |  currency_no STRING,\n                       |  rate BIGINT,\n                       |  proctime as PROCTIME(),\n                       |  PRIMARY KEY(currency, currency_no) NOT ENFORCED\n                       |) WITH (\n                       |  'connector' = 'values',\n                       |  'bounded' = 'false',\n                       |  'disable-lookup' = 'true',\n                       |  'changelog-mode' = 'I,UA,UB,D',\n                       |  'data-id' = '").append(procTimeCurrencyChangelogDataId).append("'\n                       |)\n                       |").toString())).stripMargin());
        this.tEnv().executeSql(new StringOps(Predef$.MODULE$.augmentString("\n                       |CREATE VIEW latest_rates AS\n                       |SELECT\n                       |  currency,\n                       |  currency_no,\n                       |  rate,\n                       |  proctime FROM\n                       |      ( SELECT *, ROW_NUMBER() OVER (PARTITION BY currency, currency_no\n                       |        ORDER BY proctime DESC) AS rowNum\n                       |        FROM currency_proctime) T\n                       | WHERE rowNum = 1")).stripMargin());
        this.createSinkTable("proctime_default_sink", (Option<String>)new Some((Object)new StringOps(Predef$.MODULE$.augmentString("\n              |  order_id BIGINT,\n              |  currency STRING,\n              |  amount BIGINT,\n              |  l_time TIMESTAMP_LTZ(3),\n              |  rate BIGINT,\n              |  r_time TIMESTAMP_LTZ(3),\n              |  PRIMARY KEY(order_id) NOT ENFORCED\n              |")).stripMargin()));
        String rowTimeOrderDataId = TestValuesTableFactory.registerData(this.rowTimeOrderData());
        this.tEnv().executeSql(new StringOps(Predef$.MODULE$.augmentString(new StringBuilder(639).append("\n                       |CREATE TABLE orders_rowtime (\n                       |  order_id BIGINT,\n                       |  currency STRING,\n                       |  currency_no STRING,\n                       |  amount BIGINT,\n                       |  order_time TIMESTAMP(3),\n                       |  WATERMARK FOR order_time AS order_time,\n                       |  PRIMARY KEY (order_id) NOT ENFORCED\n                       |) WITH (\n                       |  'connector' = 'values',\n                       |  'changelog-mode' = 'I,UA,UB,D',\n                       |  'data-id' = '").append(rowTimeOrderDataId).append("'\n                       |)\n                       |").toString())).stripMargin());
        String rowTimeCurrencyDataId = TestValuesTableFactory.registerData(this.rowTimeCurrencyDataUsingMetaTime());
        this.tEnv().executeSql(new StringOps(Predef$.MODULE$.augmentString(new StringBuilder(646).append("\n                       |CREATE TABLE versioned_currency_with_single_key (\n                       |  currency STRING,\n                       |  currency_no STRING,\n                       |  rate  BIGINT,\n                       |  currency_time TIMESTAMP(3),\n                       |  WATERMARK FOR currency_time AS currency_time - interval '10' SECOND,\n                       |  PRIMARY KEY(currency) NOT ENFORCED\n                       |) WITH (\n                       |  'connector' = 'values',\n                       |  'changelog-mode' = 'I,UA,UB,D',\n                       |  'data-id' = '").append(rowTimeCurrencyDataId).append("'\n                       |)\n                       |").toString())).stripMargin());
        this.tEnv().executeSql(new StringOps(Predef$.MODULE$.augmentString(new StringBuilder(658).append("\n                       |CREATE TABLE versioned_currency_with_multi_key (\n                       |  currency STRING,\n                       |  currency_no STRING,\n                       |  rate  BIGINT,\n                       |  currency_time TIMESTAMP(3),\n                       |  WATERMARK FOR currency_time AS currency_time - interval '10' SECOND,\n                       |  PRIMARY KEY(currency, currency_no) NOT ENFORCED\n                       |) WITH (\n                       |  'connector' = 'values',\n                       |  'changelog-mode' = 'I,UA,UB,D',\n                       |  'data-id' = '").append(rowTimeCurrencyDataId).append("'\n                       |)\n                       |").toString())).stripMargin());
        String currencyDataUsingBeforeTimeId = TestValuesTableFactory.registerData(this.rowTimeCurrencyDataUsingBeforeTime());
        this.tEnv().executeSql(new StringOps(Predef$.MODULE$.augmentString(new StringBuilder(641).append("\n                       |CREATE TABLE currency_using_update_before_time (\n                       |  currency STRING,\n                       |  currency_no STRING,\n                       |  rate  BIGINT,\n                       |  currency_time TIMESTAMP(3),\n                       |  WATERMARK FOR currency_time AS currency_time - interval '2' DAY,\n                       |  PRIMARY KEY(currency) NOT ENFORCED\n                       |) WITH (\n                       |  'connector' = 'values',\n                       |  'changelog-mode' = 'I,UA,UB,D',\n                       |  'data-id' = '").append(currencyDataUsingBeforeTimeId).append("'\n                       |)\n                       |").toString())).stripMargin());
        String upsertSourceDataId = TestValuesTableFactory.registerData(this.upsertSourceCurrencyData());
        this.tEnv().executeSql(new StringOps(Predef$.MODULE$.augmentString(new StringBuilder(618).append("\n                       |CREATE TABLE upsert_currency (\n                       |  currency STRING,\n                       |  currency_no STRING,\n                       |  rate  BIGINT,\n                       |  currency_time TIMESTAMP(3),\n                       |  WATERMARK FOR currency_time AS currency_time - interval '2' DAY,\n                       |  PRIMARY KEY(currency) NOT ENFORCED\n                       |) WITH (\n                       |  'connector' = 'values',\n                       |  'changelog-mode' = 'UA,D',\n                       |  'data-id' = '").append(upsertSourceDataId).append("'\n                       |)\n                       |").toString())).stripMargin());
        this.createSinkTable("rowtime_default_sink", (Option<String>)None$.MODULE$);
        String rowTimeInsertOnlyCurrencyDataId = TestValuesTableFactory.registerData(this.rowTimeInsertOnlyCurrencyData());
        this.tEnv().executeSql(new StringOps(Predef$.MODULE$.augmentString(new StringBuilder(538).append("\n                       |CREATE TABLE currency_history (\n                       |  currency STRING,\n                       |  currency_no STRING,\n                       |  rate  BIGINT,\n                       |  currency_time TIMESTAMP(3),\n                       |  WATERMARK FOR currency_time AS currency_time - interval '0.001' SECOND\n                       |) WITH (\n                       |  'connector' = 'values',\n                       |  'data-id' = '").append(rowTimeInsertOnlyCurrencyDataId).append("',\n                       |  'changelog-mode' = 'I')\n                       |  ").toString())).stripMargin());
        this.tEnv().executeSql(new StringOps(Predef$.MODULE$.augmentString("\n         |CREATE VIEW currency_deduplicated_first_row AS\n         |SELECT\n         |  currency,\n         |  currency_no,\n         |  rate,\n         |  currency_time FROM\n         |      (SELECT *, ROW_NUMBER() OVER (PARTITION BY currency ORDER BY currency_time)\n         |       AS rowNum FROM currency_history) T\n         | WHERE rowNum = 1")).stripMargin());
        this.tEnv().executeSql(new StringOps(Predef$.MODULE$.augmentString("\n         |CREATE VIEW currency_deduplicated_last_row AS\n         |SELECT\n         |  currency,\n         |  currency_no,\n         |  rate,\n         |  currency_time FROM\n         |      (SELECT *, ROW_NUMBER() OVER (PARTITION BY currency ORDER BY currency_time DESC)\n         |       AS rowNum FROM currency_history) T\n         | WHERE rowNum = 1")).stripMargin());
    }

    @Test
    public void testProcTimeTemporalJoin() {
        String sql = "INSERT INTO proctime_default_sink  SELECT o.order_id, o.currency, o.amount, o.proctime, r.rate, r.proctime  FROM orders_proctime AS o  JOIN currency_proctime FOR SYSTEM_TIME AS OF o.proctime as r  ON o.currency = r.currency and o.currency_no = r.currency_no";
        this.expectedException().expect(TableException.class);
        this.expectedException().expectMessage("Processing-time temporal join is not supported yet.");
        this.tEnv().executeSql(sql).await();
    }

    @Test
    public void testProcTimeLeftTemporalJoin() {
        String sql = "INSERT INTO proctime_default_sink  SELECT o.order_id, o.currency, o.amount, o.proctime, r.rate, r.proctime  FROM orders_proctime AS o  LEFT JOIN currency_proctime FOR SYSTEM_TIME AS OF o.proctime as r  ON o.currency = r.currency and o.currency_no = r.currency_no";
        this.expectedException().expect(TableException.class);
        this.expectedException().expectMessage("Processing-time temporal join is not supported yet.");
        this.tEnv().executeSql(sql).await();
    }

    @Test
    public void testProcTimeTemporalJoinChangelogSource() {
        this.createSinkTable("proctime_sink1", (Option<String>)new Some((Object)new StringOps(Predef$.MODULE$.augmentString("\n              | currency STRING,\n              | currency_no STRING,\n              | rate BIGINT,\n              | proctime TIMESTAMP_LTZ(3)\n              | ")).stripMargin()));
        String sql = "INSERT INTO proctime_sink1  SELECT r.* FROM orders_proctime AS o  JOIN changelog_currency_proctime FOR SYSTEM_TIME AS OF o.proctime as r  ON o.currency = r.currency and o.currency_no = r.currency_no";
        this.expectedException().expect(TableException.class);
        this.expectedException().expectMessage("Processing-time temporal join is not supported yet.");
        this.tEnv().executeSql(sql).await();
    }

    @Test
    public void testProcTimeTemporalJoinWithView() {
        String sql = "INSERT INTO proctime_default_sink  SELECT o.order_id, o.currency, o.amount, o.proctime, r.rate, r.proctime  FROM orders_proctime AS o  JOIN latest_rates FOR SYSTEM_TIME AS OF o.proctime as r  ON o.currency = r.currency and o.currency_no = r.currency_no";
        this.expectedException().expect(TableException.class);
        this.expectedException().expectMessage("Processing-time temporal join is not supported yet.");
        this.tEnv().executeSql(sql).await();
    }

    @Test
    public void testProcTimeLeftTemporalJoinWithView() {
        String sql = "INSERT INTO proctime_default_sink  SELECT o.order_id, o.currency, o.amount, o.proctime, r.rate, r.proctime  FROM orders_proctime AS o  LEFT JOIN latest_rates FOR SYSTEM_TIME AS OF o.proctime as r  ON o.currency = r.currency and o.currency_no = r.currency_no";
        this.expectedException().expect(TableException.class);
        this.expectedException().expectMessage("Processing-time temporal join is not supported yet.");
        this.tEnv().executeSql(sql).await();
    }

    @Test
    public void testProcTimeTemporalJoinWithViewNonEqui() {
        String sql = "INSERT INTO proctime_default_sink  SELECT o.order_id, o.currency, o.amount, o.proctime, r.rate, r.proctime  FROM orders_proctime AS o  JOIN latest_rates FOR SYSTEM_TIME AS OF o.proctime AS r  ON o.currency = r.currency and o.currency_no = r.currency_no  AND o.amount > r.rate";
        this.expectedException().expect(TableException.class);
        this.expectedException().expectMessage("Processing-time temporal join is not supported yet.");
        this.tEnv().executeSql(sql).await();
    }

    @Test
    public void testProcTimeLeftTemporalJoinWithViewWithPredicates() {
        String sql = "INSERT INTO proctime_default_sink  SELECT o.order_id, o.currency, o.amount, o.proctime, r.rate, r.proctime  FROM orders_proctime AS o  LEFT JOIN latest_rates FOR SYSTEM_TIME AS OF o.proctime AS r  ON o.currency = r.currency and o.currency_no = r.currency_no AND o.amount > r.rate";
        this.expectedException().expect(TableException.class);
        this.expectedException().expectMessage("Processing-time temporal join is not supported yet.");
        this.tEnv().executeSql(sql).await();
    }

    @Test
    public void testProcTimeMultiTemporalJoin() {
        this.createSinkTable("proctime_sink2", (Option<String>)new Some((Object)new StringOps(Predef$.MODULE$.augmentString("\n              |  order_id BIGINT,\n              |  currency STRING,\n              |  amount BIGINT,\n              |  l_time TIMESTAMP_LTZ(3),\n              |  rate BIGINT,\n              |  r_time TIMESTAMP_LTZ(3),\n              |  PRIMARY KEY(order_id) NOT ENFORCED\n              |")).stripMargin()));
        String sql = "INSERT INTO proctime_sink2  SELECT o.order_id, o.currency, o.amount, o.proctime, r.rate, r1.proctime  FROM orders_proctime AS o  JOIN latest_rates FOR SYSTEM_TIME AS OF o.proctime as r  ON o.currency = r.currency and o.currency_no = r.currency_no  JOIN currency_proctime FOR SYSTEM_TIME AS OF o.proctime as r1 ON o.currency = r1.currency and o.currency_no = r1.currency_no";
        this.expectedException().expect(TableException.class);
        this.expectedException().expectMessage("Processing-time temporal join is not supported yet.");
        this.tEnv().executeSql(sql).await();
    }

    @Test
    public void testEventTimeTemporalJoin() {
        String sql = "INSERT INTO rowtime_default_sink  SELECT o.order_id, o.currency, o.amount, o.order_time, r.rate, r.currency_time  FROM orders_rowtime AS o JOIN versioned_currency_with_single_key  FOR SYSTEM_TIME AS OF o.order_time as r  ON o.currency = r.currency";
        this.tEnv().executeSql(sql).await();
        .colon.colon expected = new .colon.colon((Object)"1,Euro,12,2020-08-15T00:01,114,2020-08-15T00:00:01", (List)new .colon.colon((Object)"2,US Dollar,18,2020-08-16T00:03,106,2020-08-16T00:02", (List)new .colon.colon((Object)"3,RMB,40,2020-08-15T00:03,702,2020-08-15T00:00:04", (List)new .colon.colon((Object)"4,Euro,14,2020-08-16T00:04,118,2020-08-16T00:01", (List)Nil$.MODULE$))));
        Assert.assertEquals((Object)expected.sorted((Ordering)Ordering.String$.MODULE$), (Object)JavaConversions$.MODULE$.deprecated$u0020asScalaBuffer(TestValuesTableFactory.getResults("rowtime_default_sink")).sorted((Ordering)Ordering.String$.MODULE$));
    }

    @Test
    public void testEventTimeTemporalJoinThatJoinkeyContainsPk() {
        String sql = "INSERT INTO rowtime_default_sink  SELECT o.order_id, o.currency, o.amount, o.order_time, r.rate, r.currency_time  FROM orders_rowtime AS o JOIN versioned_currency_with_single_key  FOR SYSTEM_TIME AS OF o.order_time as r  ON o.currency = r.currency AND o.currency_no = r.currency_no";
        this.tEnv().executeSql(sql).await();
        .colon.colon expected = new .colon.colon((Object)"1,Euro,12,2020-08-15T00:01,114,2020-08-15T00:00:01", (List)new .colon.colon((Object)"2,US Dollar,18,2020-08-16T00:03,106,2020-08-16T00:02", (List)new .colon.colon((Object)"3,RMB,40,2020-08-15T00:03,702,2020-08-15T00:00:04", (List)new .colon.colon((Object)"4,Euro,14,2020-08-16T00:04,118,2020-08-16T00:01", (List)Nil$.MODULE$))));
        Assert.assertEquals((Object)expected.sorted((Ordering)Ordering.String$.MODULE$), (Object)JavaConversions$.MODULE$.deprecated$u0020asScalaBuffer(TestValuesTableFactory.getResults("rowtime_default_sink")).sorted((Ordering)Ordering.String$.MODULE$));
    }

    @Test
    public void testEventTimeTemporalJoinWithFilter() {
        this.tEnv().executeSql("CREATE VIEW v1 AS SELECT * FROM versioned_currency_with_single_key");
        String sql = "INSERT INTO rowtime_default_sink  SELECT o.order_id, o.currency, o.amount, o.order_time, r.rate, r.currency_time  FROM orders_rowtime AS o  JOIN v1 FOR SYSTEM_TIME AS OF o.order_time as r  ON o.currency = r.currency WHERE rate < 115";
        this.tEnv().executeSql(sql).await();
        .colon.colon expected = new .colon.colon((Object)"1,Euro,12,2020-08-15T00:01,114,2020-08-15T00:00:01", (List)new .colon.colon((Object)"2,US Dollar,18,2020-08-16T00:03,106,2020-08-16T00:02", (List)Nil$.MODULE$));
        Assert.assertEquals((Object)expected.sorted((Ordering)Ordering.String$.MODULE$), (Object)JavaConversions$.MODULE$.deprecated$u0020asScalaBuffer(TestValuesTableFactory.getResults("rowtime_default_sink")).sorted((Ordering)Ordering.String$.MODULE$));
    }

    @Test
    public void testEventTimeLeftTemporalJoin() {
        String sql = "INSERT INTO rowtime_default_sink  SELECT o.order_id, o.currency, o.amount, o.order_time, r.rate, r.currency_time  FROM orders_rowtime AS o LEFT JOIN versioned_currency_with_single_key  FOR SYSTEM_TIME AS OF o.order_time as r  ON o.currency = r.currency";
        this.tEnv().executeSql(sql).await();
        .colon.colon expected = new .colon.colon((Object)"1,Euro,12,2020-08-15T00:01,114,2020-08-15T00:00:01", (List)new .colon.colon((Object)"2,US Dollar,18,2020-08-16T00:03,106,2020-08-16T00:02", (List)new .colon.colon((Object)"3,RMB,40,2020-08-15T00:03,702,2020-08-15T00:00:04", (List)new .colon.colon((Object)"4,Euro,14,2020-08-16T00:04,118,2020-08-16T00:01", (List)new .colon.colon((Object)"5,RMB,40,2020-08-16T00:03,null,null", (List)Nil$.MODULE$)))));
        Assert.assertEquals((Object)expected.sorted((Ordering)Ordering.String$.MODULE$), (Object)JavaConversions$.MODULE$.deprecated$u0020asScalaBuffer(TestValuesTableFactory.getResults("rowtime_default_sink")).sorted((Ordering)Ordering.String$.MODULE$));
    }

    @Test
    public void testEventTimeTemporalJoinChangelogUsingBeforeTime() {
        String sql = "INSERT INTO rowtime_default_sink  SELECT o.order_id, o.currency, o.amount, o.order_time, r.rate, r.currency_time  FROM orders_rowtime AS o LEFT JOIN currency_using_update_before_time  FOR SYSTEM_TIME AS OF o.order_time as r  ON o.currency = r.currency";
        this.tEnv().executeSql(sql).await();
        .colon.colon expected = new .colon.colon((Object)"1,Euro,12,2020-08-15T00:01,114,2020-08-15T00:00:01", (List)new .colon.colon((Object)"2,US Dollar,18,2020-08-16T00:03,106,2020-08-16T00:02", (List)new .colon.colon((Object)"3,RMB,40,2020-08-15T00:03,null,null", (List)new .colon.colon((Object)"4,Euro,14,2020-08-16T00:04,118,2020-08-16T00:01", (List)new .colon.colon((Object)"5,RMB,40,2020-08-16T00:03,null,null", (List)Nil$.MODULE$)))));
        Assert.assertEquals((Object)expected.sorted((Ordering)Ordering.String$.MODULE$), (Object)JavaConversions$.MODULE$.deprecated$u0020asScalaBuffer(TestValuesTableFactory.getResults("rowtime_default_sink")).sorted((Ordering)Ordering.String$.MODULE$));
    }

    @Test
    public void testEventTimeLeftTemporalJoinUpsertSource() {
        String sql = "INSERT INTO rowtime_default_sink  SELECT o.order_id, o.currency, o.amount, o.order_time, r.rate, r.currency_time  FROM orders_rowtime AS o LEFT JOIN upsert_currency  FOR SYSTEM_TIME AS OF o.order_time as r  ON o.currency = r.currency ";
        this.tEnv().executeSql(sql).await();
        .colon.colon expected = new .colon.colon((Object)"1,Euro,12,2020-08-15T00:01,114,2020-08-15T00:00:01", (List)new .colon.colon((Object)"2,US Dollar,18,2020-08-16T00:03,104,2020-08-16T00:02", (List)new .colon.colon((Object)"3,RMB,40,2020-08-15T00:03,null,null", (List)new .colon.colon((Object)"4,Euro,14,2020-08-16T00:04,118,2020-08-16T00:01", (List)new .colon.colon((Object)"5,RMB,40,2020-08-16T00:03,null,null", (List)Nil$.MODULE$)))));
        Assert.assertEquals((Object)expected.sorted((Ordering)Ordering.String$.MODULE$), (Object)JavaConversions$.MODULE$.deprecated$u0020asScalaBuffer(TestValuesTableFactory.getResults("rowtime_default_sink")).sorted((Ordering)Ordering.String$.MODULE$));
    }

    @Test
    public void testEventTimeTemporalJoinWithMultiKeys() {
        String sql = "INSERT INTO rowtime_default_sink  SELECT o.order_id, o.currency, o.amount, o.order_time, r.rate, r.currency_time  FROM orders_rowtime AS o JOIN versioned_currency_with_multi_key  FOR SYSTEM_TIME AS OF o.order_time as r  ON o.currency_no = r.currency_no AND o.currency = r.currency";
        this.tEnv().executeSql(sql).await();
        .colon.colon expected = new .colon.colon((Object)"1,Euro,12,2020-08-15T00:01,114,2020-08-15T00:00:01", (List)new .colon.colon((Object)"2,US Dollar,18,2020-08-16T00:03,106,2020-08-16T00:02", (List)new .colon.colon((Object)"3,RMB,40,2020-08-15T00:03,702,2020-08-15T00:00:04", (List)new .colon.colon((Object)"4,Euro,14,2020-08-16T00:04,118,2020-08-16T00:01", (List)Nil$.MODULE$))));
        Assert.assertEquals((Object)expected.sorted((Ordering)Ordering.String$.MODULE$), (Object)JavaConversions$.MODULE$.deprecated$u0020asScalaBuffer(TestValuesTableFactory.getResults("rowtime_default_sink")).sorted((Ordering)Ordering.String$.MODULE$));
    }

    @Test
    public void testEventTimeTemporalJoinWithNonEqualCondition() {
        String sql = "INSERT INTO rowtime_default_sink  SELECT o.order_id, o.currency, o.amount, o.order_time, r.rate, r.currency_time  FROM orders_rowtime AS o JOIN currency_using_update_before_time  FOR SYSTEM_TIME AS OF o.order_time as r  ON o.currency = r.currency and o.currency_no = r.currency_no  and o.order_id < 5 and r.rate > 102";
        this.tEnv().executeSql(sql).await();
        .colon.colon expected = new .colon.colon((Object)"1,Euro,12,2020-08-15T00:01,114,2020-08-15T00:00:01", (List)new .colon.colon((Object)"2,US Dollar,18,2020-08-16T00:03,106,2020-08-16T00:02", (List)new .colon.colon((Object)"4,Euro,14,2020-08-16T00:04,118,2020-08-16T00:01", (List)Nil$.MODULE$)));
        Assert.assertEquals((Object)expected.sorted((Ordering)Ordering.String$.MODULE$), (Object)JavaConversions$.MODULE$.deprecated$u0020asScalaBuffer(TestValuesTableFactory.getResults("rowtime_default_sink")).sorted((Ordering)Ordering.String$.MODULE$));
    }

    @Test
    public void testEventTimeTemporalJoinEqualConditionOnKey() {
        String sql = "INSERT INTO rowtime_default_sink  SELECT o.order_id, o.currency, o.amount, o.order_time, r.rate, r.currency_time  FROM orders_rowtime AS o JOIN currency_using_update_before_time  FOR SYSTEM_TIME AS OF o.order_time as r  ON o.currency = r.currency and o.currency_no = r.currency_no  and o.currency = 'Euro' and r.rate > 102";
        this.tEnv().executeSql(sql).await();
        .colon.colon expected = new .colon.colon((Object)"1,Euro,12,2020-08-15T00:01,114,2020-08-15T00:00:01", (List)new .colon.colon((Object)"4,Euro,14,2020-08-16T00:04,118,2020-08-16T00:01", (List)Nil$.MODULE$));
        Assert.assertEquals((Object)expected.sorted((Ordering)Ordering.String$.MODULE$), (Object)JavaConversions$.MODULE$.deprecated$u0020asScalaBuffer(TestValuesTableFactory.getResults("rowtime_default_sink")).sorted((Ordering)Ordering.String$.MODULE$));
    }

    @Test
    public void testEventTimeMultiTemporalJoin() {
        this.createSinkTable("rowtime_sink1", (Option<String>)new Some((Object)new StringOps(Predef$.MODULE$.augmentString("\n           |  order_id BIGINT,\n           |  currency STRING,\n           |  amount BIGINT,\n           |  l_time TIMESTAMP(3),\n           |  rate BIGINT,\n           |  r_time TIMESTAMP(3),\n           |  r1_rate BIGINT,\n           |  r1_time TIMESTAMP(3),\n           |  PRIMARY KEY(order_id) NOT ENFORCED\n           |")).stripMargin()));
        String sql = "INSERT INTO rowtime_sink1  SELECT o.order_id, o.currency, o.amount, o.order_time, r.rate, r.currency_time, r1.rate, r1.currency_time FROM orders_rowtime AS o  LEFT JOIN versioned_currency_with_multi_key  FOR SYSTEM_TIME AS OF o.order_time as r  ON o.currency = r.currency and o.currency_no = r.currency_no  LEFT JOIN versioned_currency_with_single_key  FOR SYSTEM_TIME AS OF o.order_time as r1  ON o.currency = r1.currency";
        this.tEnv().executeSql(sql).await();
        .colon.colon expected = new .colon.colon((Object)"1,Euro,12,2020-08-15T00:01,114,2020-08-15T00:00:01,114,2020-08-15T00:00:01", (List)new .colon.colon((Object)"2,US Dollar,18,2020-08-16T00:03,106,2020-08-16T00:02,106,2020-08-16T00:02", (List)new .colon.colon((Object)"3,RMB,40,2020-08-15T00:03,702,2020-08-15T00:00:04,702,2020-08-15T00:00:04", (List)new .colon.colon((Object)"4,Euro,14,2020-08-16T00:04,118,2020-08-16T00:01,118,2020-08-16T00:01", (List)new .colon.colon((Object)"5,RMB,40,2020-08-16T00:03,null,null,null,null", (List)Nil$.MODULE$)))));
        Assert.assertEquals((Object)expected.sorted((Ordering)Ordering.String$.MODULE$), (Object)JavaConversions$.MODULE$.deprecated$u0020asScalaBuffer(TestValuesTableFactory.getResults("rowtime_sink1")).sorted((Ordering)Ordering.String$.MODULE$));
    }

    @Test
    public void testEventTimeTemporalJoinWithDeduplicateFirstView() {
        String sql = "INSERT INTO rowtime_default_sink  SELECT o.order_id, o.currency, o.amount, o.order_time, r.rate, r.currency_time  FROM orders_rowtime AS o  LEFT JOIN currency_deduplicated_first_row  FOR SYSTEM_TIME AS OF o.order_time as r  ON o.currency = r.currency";
        this.tEnv().executeSql(sql).await();
        .colon.colon expected = new .colon.colon((Object)"1,Euro,12,2020-08-15T00:01,114,2020-08-15T00:00:01", (List)new .colon.colon((Object)"2,US Dollar,18,2020-08-16T00:03,102,2020-08-15T00:00:02", (List)new .colon.colon((Object)"3,RMB,40,2020-08-15T00:03,702,2020-08-15T00:00:04", (List)new .colon.colon((Object)"4,Euro,14,2020-08-16T00:04,114,2020-08-15T00:00:01", (List)new .colon.colon((Object)"5,RMB,40,2020-08-16T00:03,702,2020-08-15T00:00:04", (List)Nil$.MODULE$)))));
        Assert.assertEquals((Object)expected.sorted((Ordering)Ordering.String$.MODULE$), (Object)JavaConversions$.MODULE$.deprecated$u0020asScalaBuffer(TestValuesTableFactory.getResults("rowtime_default_sink")).sorted((Ordering)Ordering.String$.MODULE$));
    }

    @Test
    public void testEventTimeTemporalJoinWithDeduplicateLastView() {
        String sql = "INSERT INTO rowtime_default_sink  SELECT o.order_id, o.currency, o.amount, o.order_time, r.rate, r.currency_time  FROM orders_rowtime AS o  JOIN currency_deduplicated_last_row  FOR SYSTEM_TIME AS OF o.order_time as r  ON o.currency = r.currency";
        this.tEnv().executeSql(sql).await();
        .colon.colon expected = new .colon.colon((Object)"1,Euro,12,2020-08-15T00:01,114,2020-08-15T00:00:01", (List)new .colon.colon((Object)"2,US Dollar,18,2020-08-16T00:03,106,2020-08-16T00:02", (List)new .colon.colon((Object)"3,RMB,40,2020-08-15T00:03,702,2020-08-15T00:00:04", (List)new .colon.colon((Object)"4,Euro,14,2020-08-16T00:04,118,2020-08-16T00:01", (List)new .colon.colon((Object)"5,RMB,40,2020-08-16T00:03,702,2020-08-15T00:00:04", (List)Nil$.MODULE$)))));
        Assert.assertEquals((Object)expected.sorted((Ordering)Ordering.String$.MODULE$), (Object)JavaConversions$.MODULE$.deprecated$u0020asScalaBuffer(TestValuesTableFactory.getResults("rowtime_default_sink")).sorted((Ordering)Ordering.String$.MODULE$));
    }

    @Test
    public void testEventTimeLeftTemporalJoinWithView() {
        String sql = "INSERT INTO rowtime_default_sink  SELECT o.order_id, o.currency, o.amount, o.order_time, r.rate, r.currency_time  FROM orders_rowtime AS o  LEFT JOIN currency_deduplicated_last_row  FOR SYSTEM_TIME AS OF o.order_time as r  ON o.currency = r.currency AND substr(o.currency, 1, 2) = 'US' ";
        this.tEnv().executeSql(sql).await();
        .colon.colon expected = new .colon.colon((Object)"1,Euro,12,2020-08-15T00:01,null,null", (List)new .colon.colon((Object)"2,US Dollar,18,2020-08-16T00:03,106,2020-08-16T00:02", (List)new .colon.colon((Object)"3,RMB,40,2020-08-15T00:03,null,null", (List)new .colon.colon((Object)"4,Euro,14,2020-08-16T00:04,null,null", (List)new .colon.colon((Object)"5,RMB,40,2020-08-16T00:03,null,null", (List)Nil$.MODULE$)))));
        Assert.assertEquals((Object)expected.sorted((Ordering)Ordering.String$.MODULE$), (Object)JavaConversions$.MODULE$.deprecated$u0020asScalaBuffer(TestValuesTableFactory.getResults("rowtime_default_sink")).sorted((Ordering)Ordering.String$.MODULE$));
    }

    @Test
    public void testMiniBatchEventTimeViewTemporalJoin() {
        this.tEnv().getConfig().set(ExecutionConfigOptions.TABLE_EXEC_MINIBATCH_ENABLED, (Object)BoxesRunTime.boxToBoolean((boolean)true));
        this.tEnv().getConfig().set(ExecutionConfigOptions.TABLE_EXEC_MINIBATCH_ALLOW_LATENCY.key(), "10 s");
        this.tEnv().getConfig().set(ExecutionConfigOptions.TABLE_EXEC_MINIBATCH_SIZE, (Object)BoxesRunTime.boxToLong((long)4L));
        String sql = "INSERT INTO rowtime_default_sink  SELECT o.order_id, o.currency, o.amount, o.order_time, r.rate, r.currency_time  FROM orders_rowtime AS o JOIN  currency_deduplicated_last_row  FOR SYSTEM_TIME AS OF o.order_time as r  ON o.currency = r.currency";
        this.tEnv().executeSql(sql).await();
        .colon.colon expected = new .colon.colon((Object)"1,Euro,12,2020-08-15T00:01,114,2020-08-15T00:00:01", (List)new .colon.colon((Object)"2,US Dollar,18,2020-08-16T00:03,106,2020-08-16T00:02", (List)new .colon.colon((Object)"3,RMB,40,2020-08-15T00:03,702,2020-08-15T00:00:04", (List)new .colon.colon((Object)"4,Euro,14,2020-08-16T00:04,118,2020-08-16T00:01", (List)new .colon.colon((Object)"5,RMB,40,2020-08-16T00:03,702,2020-08-15T00:00:04", (List)Nil$.MODULE$)))));
        Assert.assertEquals((Object)expected.sorted((Ordering)Ordering.String$.MODULE$), (Object)JavaConversions$.MODULE$.deprecated$u0020asScalaBuffer(TestValuesTableFactory.getResults("rowtime_default_sink")).sorted((Ordering)Ordering.String$.MODULE$));
    }

    private void createSinkTable(String tableName, Option<String> columns) {
        String string;
        Option<String> option = columns;
        if (option instanceof Some) {
            String cols;
            Some some = (Some)option;
            string = cols = (String)some.value();
        } else {
            string = new StringOps(Predef$.MODULE$.augmentString("\n           |  order_id BIGINT,\n           |  currency STRING,\n           |  amount BIGINT,\n           |  l_time TIMESTAMP(3),\n           |  rate BIGINT,\n           |  r_time TIMESTAMP(3),\n           |  PRIMARY KEY(order_id) NOT ENFORCED\n           |")).stripMargin();
        }
        String columnsDDL = string;
        this.tEnv().executeSql(new StringOps(Predef$.MODULE$.augmentString(new StringBuilder(313).append("\n                       |CREATE TABLE ").append(tableName).append(" (\n                       | ").append(columnsDDL).append("\n                       |) WITH (\n                       |  'connector' = 'values',\n                       |  'sink-insert-only' = 'false',\n                       |  'changelog-mode' = 'I,UA,UB,D'\n                       |)\n                       |").toString())).stripMargin());
    }

    private Row changelogRow(String kind, Seq<Object> values) {
        Seq objects = (Seq)values.map((Function1 & Serializable & scala.Serializable)x0$1 -> {
            Object object;
            Object object2 = x0$1;
            if (object2 instanceof Long) {
                long l = BoxesRunTime.unboxToLong((Object)object2);
                object = BoxesRunTime.boxToLong((long)l);
            } else if (object2 instanceof Integer) {
                int n = BoxesRunTime.unboxToInt((Object)object2);
                object = BoxesRunTime.boxToInteger((int)n);
            } else if (object2 instanceof String) {
                Object object3;
                String string = (String)object2;
                try {
                    object3 = LocalDateTime.parse(string);
                }
                catch (DateTimeParseException dateTimeParseException) {
                    object3 = string;
                }
                object = object3;
            } else if (object2 instanceof Object) {
                Object object4;
                object = object4 = object2;
            } else {
                throw new MatchError(object2);
            }
            return object;
        }, Seq$.MODULE$.canBuildFrom());
        return TestValuesTableFactory.changelogRow(kind, (Object[])objects.toArray(ClassTag$.MODULE$.Object()));
    }

    public TemporalJoinITCase(StreamingWithStateTestBase.StateBackendMode state) {
        super(state);
    }
}

