/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.table.planner.runtime.stream.sql;

import java.io.Serializable;
import java.time.LocalDateTime;
import java.time.format.DateTimeParseException;
import org.apache.flink.core.testutils.CustomExtension;
import org.apache.flink.core.testutils.EachCallbackWrapper;
import org.apache.flink.table.api.TableException;
import org.apache.flink.table.api.config.ExecutionConfigOptions;
import org.apache.flink.table.planner.factories.TestValuesTableFactory;
import org.apache.flink.table.planner.runtime.utils.StreamingWithStateTestBase;
import org.apache.flink.table.utils.LegacyRowExtension;
import org.apache.flink.testutils.junit.extensions.parameterized.ParameterizedTestExtension;
import org.apache.flink.types.Row;
import org.assertj.core.api.Assertions;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.TestTemplate;
import org.junit.jupiter.api.extension.ExtendWith;
import org.junit.jupiter.api.extension.RegisterExtension;
import scala.Function1;
import scala.MatchError;
import scala.None$;
import scala.Option;
import scala.Predef$;
import scala.Some;
import scala.collection.JavaConversions$;
import scala.collection.Seq;
import scala.collection.Seq$;
import scala.collection.immutable.;
import scala.collection.immutable.List;
import scala.collection.immutable.List$;
import scala.collection.immutable.Nil$;
import scala.collection.immutable.StringOps;
import scala.math.Ordering;
import scala.reflect.ClassTag$;
import scala.reflect.ScalaSignature;
import scala.runtime.BoxesRunTime;

@ExtendWith(value={ParameterizedTestExtension.class})
@ScalaSignature(bytes="\u0006\u0001\t]a\u0001B\u0001\u0003\u0001M\u0011!\u0003V3na>\u0014\u0018\r\u001c&pS:LEkQ1tK*\u00111\u0001B\u0001\u0004gFd'BA\u0003\u0007\u0003\u0019\u0019HO]3b[*\u0011q\u0001C\u0001\beVtG/[7f\u0015\tI!\"A\u0004qY\u0006tg.\u001a:\u000b\u0005-a\u0011!\u0002;bE2,'BA\u0007\u000f\u0003\u00151G.\u001b8l\u0015\ty\u0001#\u0001\u0004ba\u0006\u001c\u0007.\u001a\u0006\u0002#\u0005\u0019qN]4\u0004\u0001M\u0011\u0001\u0001\u0006\t\u0003+ai\u0011A\u0006\u0006\u0003/\u0019\tQ!\u001e;jYNL!!\u0007\f\u00035M#(/Z1nS:<w+\u001b;i'R\fG/\u001a+fgR\u0014\u0015m]3\t\u0011m\u0001!\u0011!Q\u0001\nq\tQa\u001d;bi\u0016\u0004\"!H\u0019\u000f\u0005yycBA\u0010/\u001d\t\u0001SF\u0004\u0002\"Y9\u0011!e\u000b\b\u0003G)r!\u0001J\u0015\u000f\u0005\u0015BS\"\u0001\u0014\u000b\u0005\u001d\u0012\u0012A\u0002\u001fs_>$h(C\u0001\u0012\u0013\ty\u0001#\u0003\u0002\u000e\u001d%\u00111\u0002D\u0005\u0003\u0013)I!a\u0002\u0005\n\u0005]1\u0011B\u0001\u0019\u0017\u0003i\u0019FO]3b[&twmV5uQN#\u0018\r^3UKN$()Y:f\u0013\t\u00114G\u0001\tTi\u0006$XMQ1dW\u0016tG-T8eK*\u0011\u0001G\u0006\u0005\u0006k\u0001!\tAN\u0001\u0007y%t\u0017\u000e\u001e \u0015\u0005]J\u0004C\u0001\u001d\u0001\u001b\u0005\u0011\u0001\"B\u000e5\u0001\u0004a\u0002bB\u001e\u0001\u0005\u0004%I\u0001P\u0001\u0002?V\tQ\bE\u0002?\u0007\u0016k\u0011a\u0010\u0006\u0003\u0001\u0006\u000b\u0011\u0002^3tiV$\u0018\u000e\\:\u000b\u0005\tc\u0011\u0001B2pe\u0016L!\u0001R 
\u0003'\u0015\u000b7\r[\"bY2\u0014\u0017mY6Xe\u0006\u0004\b/\u001a:\u0011\u0005\u0019CU\"A$\u000b\u0005]Q\u0011BA%H\u0005IaUmZ1dsJ{w/\u0012=uK:\u001c\u0018n\u001c8\t\r-\u0003\u0001\u0015!\u0003>\u0003\ty\u0006\u0005\u000b\u0002K\u001bB\u0011ajV\u0007\u0002\u001f*\u0011\u0001+U\u0001\nKb$XM\\:j_:T!AU*\u0002\u0007\u0005\u0004\u0018N\u0003\u0002U+\u00069!.\u001e9ji\u0016\u0014(B\u0001,\u0011\u0003\u0015QWO\\5u\u0013\tAvJA\tSK\u001eL7\u000f^3s\u000bb$XM\\:j_:DqA\u0017\u0001C\u0002\u0013\u00051,A\tqe>\u001cG+[7f\u001fJ$WM\u001d#bi\u0006,\u0012\u0001\u0018\t\u0004;\u00124W\"\u00010\u000b\u0005}\u0003\u0017!C5n[V$\u0018M\u00197f\u0015\t\t'-\u0001\u0006d_2dWm\u0019;j_:T\u0011aY\u0001\u0006g\u000e\fG.Y\u0005\u0003Kz\u0013A\u0001T5tiB\u0011qM[\u0007\u0002Q*\u0011\u0011\u000eD\u0001\u0006if\u0004Xm]\u0005\u0003W\"\u00141AU8x\u0011\u0019i\u0007\u0001)A\u00059\u0006\u0011\u0002O]8d)&lWm\u0014:eKJ$\u0015\r^1!\u0011\u001dy\u0007A1A\u0005\u0002m\u000bA\u0003\u001d:pGRKW.Z\"veJ,gnY=ECR\f\u0007BB9\u0001A\u0003%A,A\u000bqe>\u001cG+[7f\u0007V\u0014(/\u001a8ds\u0012\u000bG/\u0019\u0011\t\u000fM\u0004!\u0019!C\u00017\u0006i\u0002O]8d)&lWmQ;se\u0016t7-_\"iC:<W\r\\8h\t\u0006$\u0018\r\u0003\u0004v\u0001\u0001\u0006I\u0001X\u0001\u001faJ|7\rV5nK\u000e+(O]3oGf\u001c\u0005.\u00198hK2|w\rR1uC\u0002Bqa\u001e\u0001C\u0002\u0013\u00051,\u0001\ts_^$\u0016.\\3Pe\u0012,'\u000fR1uC\"1\u0011\u0010\u0001Q\u0001\nq\u000b\u0011C]8x)&lWm\u0014:eKJ$\u0015\r^1!\u0011\u001dY\bA1A\u0005\u0002m\u000b\u0001E]8x)&lWmQ;se\u0016t7-\u001f#bi\u0006,6/\u001b8h\u001b\u0016$\u0018\rV5nK\"1Q\u0010\u0001Q\u0001\nq\u000b\u0011E]8x)&lWmQ;se\u0016t7-\u001f#bi\u0006,6/\u001b8h\u001b\u0016$\u0018\rV5nK\u0002Bqa 
\u0001C\u0002\u0013\u00051,\u0001\u0012s_^$\u0016.\\3DkJ\u0014XM\\2z\t\u0006$\u0018-V:j]\u001e\u0014UMZ8sKRKW.\u001a\u0005\b\u0003\u0007\u0001\u0001\u0015!\u0003]\u0003\r\u0012xn\u001e+j[\u0016\u001cUO\u001d:f]\u000eLH)\u0019;b+NLgn\u001a\"fM>\u0014X\rV5nK\u0002B\u0001\"a\u0002\u0001\u0005\u0004%\taW\u0001\u0019kB\u001cXM\u001d;T_V\u00148-Z\"veJ,gnY=ECR\f\u0007bBA\u0006\u0001\u0001\u0006I\u0001X\u0001\u001akB\u001cXM\u001d;T_V\u00148-Z\"veJ,gnY=ECR\f\u0007\u0005\u0003\u0005\u0002\u0010\u0001\u0011\r\u0011\"\u0001\\\u0003u\u0011xn\u001e+j[\u0016Len]3si>sG._\"veJ,gnY=ECR\f\u0007bBA\n\u0001\u0001\u0006I\u0001X\u0001\u001fe><H+[7f\u0013:\u001cXM\u001d;P]2L8)\u001e:sK:\u001c\u0017\u0010R1uC\u0002Bq!a\u0006\u0001\t\u0003\tI\"A\u0004qe\u0016\u0004\u0018M]3\u0015\u0005\u0005m\u0001\u0003BA\u000f\u0003?i\u0011AY\u0005\u0004\u0003C\u0011'\u0001B+oSRDC!!\u0006\u0002&A!\u0011qEA\u0015\u001b\u0005\t\u0016bAA\u0016#\nQ!)\u001a4pe\u0016,\u0015m\u00195\t\u000f\u0005=\u0002\u0001\"\u0001\u0002\u001a\u0005AB/Z:u!J|7\rV5nKR+W\u000e]8sC2Tu.\u001b8)\t\u00055\u00121\u0007\t\u0005\u0003O\t)$C\u0002\u00028E\u0013A\u0002V3tiR+W\u000e\u001d7bi\u0016Dq!a\u000f\u0001\t\u0003\tI\"\u0001\u000fuKN$\bK]8d)&lW\rT3giR+W\u000e]8sC2Tu.\u001b8)\t\u0005e\u00121\u0007\u0005\b\u0003\u0003\u0002A\u0011AA\r\u0003\u001d\"Xm\u001d;Qe>\u001cG+[7f)\u0016l\u0007o\u001c:bY*{\u0017N\\\"iC:<W\r\\8h'>,(oY3)\t\u0005}\u00121\u0007\u0005\b\u0003\u000f\u0002A\u0011AA\r\u0003\u0001\"Xm\u001d;Qe>\u001cG+[7f)\u0016l\u0007o\u001c:bY*{\u0017N\\,ji\"4\u0016.Z<)\t\u0005\u0015\u00131\u0007\u0005\b\u0003\u001b\u0002A\u0011AA\r\u0003\u0011\"Xm\u001d;Qe>\u001cG+[7f\u0019\u00164G\u000fV3na>\u0014\u0018\r\u001c&pS:<\u0016\u000e\u001e5WS\u0016<\b\u0006BA&\u0003gAq!a\u0015\u0001\t\u0003\tI\"A\u0014uKN$\bK]8d)&lW\rV3na>\u0014\u0018\r\u001c&pS:<\u0016\u000e\u001e5WS\u0016<hj\u001c8FcVL\u0007\u0006BA)\u0003gAq!!\u0017\u0001\t\u0003\tI\"\u0001\u001auKN$\bK]8d)&lW\rT3giR+W\u000e]8sC2Tu.\u001b8XSRDg+[3x/&$\b\u000e\u0015:fI&\u001c\u0017\r^3t
Q\u0011\t9&a\r\t\u000f\u0005}\u0003\u0001\"\u0001\u0002\u001a\u0005iB/Z:u!J|7\rV5nK6+H\u000e^5UK6\u0004xN]1m\u0015>Lg\u000e\u000b\u0003\u0002^\u0005M\u0002bBA3\u0001\u0011\u0005\u0011\u0011D\u0001\u001ai\u0016\u001cH/\u0012<f]R$\u0016.\\3UK6\u0004xN]1m\u0015>Lg\u000e\u000b\u0003\u0002d\u0005M\u0002bBA6\u0001\u0011\u0005\u0011\u0011D\u0001/i\u0016\u001cH/\u0012<f]R$\u0016.\\3UK6\u0004xN]1m\u0015>Lg\u000e\u00165bi*{\u0017N\\6fs\u000e{g\u000e^1j]N\u00046\u000e\u000b\u0003\u0002j\u0005M\u0002bBA9\u0001\u0011\u0005\u0011\u0011D\u0001$i\u0016\u001cH/\u0012<f]R$\u0016.\\3UK6\u0004xN]1m\u0015>LgnV5uQ\u001aKG\u000e^3sQ\u0011\ty'a\r\t\u000f\u0005]\u0004\u0001\"\u0001\u0002\u001a\u0005iB/Z:u\u000bZ,g\u000e\u001e+j[\u0016dUM\u001a;UK6\u0004xN]1m\u0015>Lg\u000e\u000b\u0003\u0002v\u0005M\u0002bBA?\u0001\u0011\u0005\u0011\u0011D\u00012i\u0016\u001cH/\u0012<f]R$\u0016.\\3UK6\u0004xN]1m\u0015>Lgn\u00115b]\u001e,Gn\\4Vg&twMQ3g_J,G+[7fQ\u0011\tY(a\r\t\u000f\u0005\r\u0005\u0001\"\u0001\u0002\u001a\u0005IC/Z:u\u000bZ,g\u000e\u001e+j[\u0016dUM\u001a;UK6\u0004xN]1m\u0015>Lg.\u00169tKJ$8k\\;sG\u0016DC!!!\u00024!9\u0011\u0011\u0012\u0001\u0005\u0002\u0005e\u0011A\n;fgR,e/\u001a8u)&lW\rV3na>\u0014\u0018\r\u001c&pS:<\u0016\u000e\u001e5Nk2$\u0018nS3zg\"\"\u0011qQA\u001a\u0011\u001d\ty\t\u0001C\u0001\u00033\ta\u0006^3ti\u00163XM\u001c;US6,G+Z7q_J\fGNS8j]^KG\u000f\u001b(p]\u0016\u000bX/\u00197D_:$\u0017\u000e^5p]\"\"\u0011QRA\u001a\u0011\u001d\t)\n\u0001C\u0001\u00033\tA\u0006^3ti\u00163XM\u001c;US6,G+Z7q_J\fGNS8j]\u0016\u000bX/\u00197D_:$\u0017\u000e^5p]>s7*Z=)\t\u0005M\u00151\u0007\u0005\b\u00037\u0003A\u0011AA\r\u0003y!Xm\u001d;Fm\u0016tG\u000fV5nK6+H\u000e^5UK6\u0004xN]1m\u0015>Lg\u000e\u000b\u0003\u0002\u001a\u0006M\u0002bBAQ\u0001\u0011\u0005\u0011\u0011D\u00012i\u0016\u001cH/\u0012<f]R$\u0016.\\3UK6\u0004xN]1m\u0015>LgnV5uQ\u0012+G-\u001e9mS\u000e\fG/\u001a$jeN$h+[3xQ\u0011\ty*a\r\t\u000f\u0005\u001d\u0006\u0001\"\u0001\u0002\u001a\u0005\u0001D/Z:u\u000bZ,g\u000e\u001e+j[\u0016$V-\u001c9p
e\u0006d'j\\5o/&$\b\u000eR3ekBd\u0017nY1uK2\u000b7\u000f\u001e,jK^DC!!*\u00024!9\u0011Q\u0016\u0001\u0005\u0002\u0005e\u0011!\n;fgR,e/\u001a8u)&lW\rT3giR+W\u000e]8sC2Tu.\u001b8XSRDg+[3xQ\u0011\tY+a\r\t\u000f\u0005M\u0006\u0001\"\u0001\u0002\u001a\u00051C/Z:u\u001b&t\u0017NQ1uG\",e/\u001a8u)&lWMV5foR+W\u000e]8sC2Tu.\u001b8)\t\u0005E\u00161\u0007\u0005\b\u0003s\u0003A\u0011BA^\u0003=\u0019'/Z1uKNKgn\u001b+bE2,GCBA\u000e\u0003{\u000b\t\u000e\u0003\u0005\u0002@\u0006]\u0006\u0019AAa\u0003%!\u0018M\u00197f\u001d\u0006lW\r\u0005\u0003\u0002D\u0006-g\u0002BAc\u0003\u000f\u0004\"!\n2\n\u0007\u0005%'-\u0001\u0004Qe\u0016$WMZ\u0005\u0005\u0003\u001b\fyM\u0001\u0004TiJLgn\u001a\u0006\u0004\u0003\u0013\u0014\u0007\u0002CAj\u0003o\u0003\r!!6\u0002\u000f\r|G.^7ogB1\u0011QDAl\u0003\u0003L1!!7c\u0005\u0019y\u0005\u000f^5p]\"9\u0011Q\u001c\u0001\u0005\n\u0005}\u0017\u0001D2iC:<W\r\\8h%><H#\u00024\u0002b\u0006\u0015\b\u0002CAr\u00037\u0004\r!!1\u0002\t-Lg\u000e\u001a\u0005\t\u0003O\fY\u000e1\u0001\u0002j\u00061a/\u00197vKN\u0004b!!\b\u0002l\u0006=\u0018bAAwE\nQAH]3qK\u0006$X\r\u001a \u0011\t\u0005u\u0011\u0011_\u0005\u0004\u0003g\u0014'aA!os\":\u0001!a>\u0002~\u0006}\bc\u0001(\u0002z&\u0019\u00111`(\u0003\u0015\u0015CH/\u001a8e/&$\b.A\u0003wC2,X\r\f\u0002\u0003\u0002\r\u0012!1\u0001\t\u0005\u0005\u000b\u0011\u0019\"\u0004\u0002\u0003\b)!!\u0011\u0002B\u0006\u00035\u0001\u0018M]1nKR,'/\u001b>fI*!!Q\u0002B\b\u0003))\u0007\u0010^3og&|gn\u001d\u0006\u0004-\nE!B\u0001!\r\u0013\u0011\u0011)Ba\u0002\u00035A\u000b'/Y7fi\u0016\u0014\u0018N_3e)\u0016\u001cH/\u0012=uK:\u001c\u0018n\u001c8")
public class TemporalJoinITCase
extends StreamingWithStateTestBase {
    /**
     * JUnit 5 extension registered for this test instance: a {@link LegacyRowExtension} wrapped in
     * an {@link EachCallbackWrapper}.
     * NOTE(review): the field name {@code _} presumably comes from an anonymous Scala {@code val};
     * {@code _} is not a legal identifier on newer Java language levels - confirm before compiling.
     */
    @RegisterExtension
    private final EachCallbackWrapper<LegacyRowExtension> _ = new EachCallbackWrapper((CustomExtension)new LegacyRowExtension());
    /**
     * Rows that {@code prepare()} registers as the data of table {@code orders_proctime}: four
     * {@code +I} records laid out as (order_id, currency, currency_no, amount).
     * NOTE(review): {@code new .colon.colon(...)} looks like the decompiler's rendering of Scala's
     * {@code ::} list cell and is not valid Java as written - confirm before compiling.
     */
    private final List<Row> procTimeOrderData = new .colon.colon((Object)this.changelogRow("+I", (Seq<Object>)Predef$.MODULE$.genericWrapArray((Object)new Object[]{BoxesRunTime.boxToLong((long)1L), "Euro", "no1", BoxesRunTime.boxToLong((long)12L)})), (List)new .colon.colon((Object)this.changelogRow("+I", (Seq<Object>)Predef$.MODULE$.genericWrapArray((Object)new Object[]{BoxesRunTime.boxToLong((long)2L), "US Dollar", "no1", BoxesRunTime.boxToLong((long)14L)})), (List)new .colon.colon((Object)this.changelogRow("+I", (Seq<Object>)Predef$.MODULE$.genericWrapArray((Object)new Object[]{BoxesRunTime.boxToLong((long)3L), "US Dollar", "no2", BoxesRunTime.boxToLong((long)18L)})), (List)new .colon.colon((Object)this.changelogRow("+I", (Seq<Object>)Predef$.MODULE$.genericWrapArray((Object)new Object[]{BoxesRunTime.boxToLong((long)4L), "RMB", "no1", BoxesRunTime.boxToLong((long)40L)})), (List)Nil$.MODULE$))));
    /**
     * Rows that {@code prepare()} registers as the data of table {@code currency_proctime}: six
     * {@code +I} records laid out as (currency, currency_no, rate). The key ("Euro", "no1")
     * appears twice, with rates 114 and 118.
     * NOTE(review): {@code new .colon.colon(...)} looks like the decompiler's rendering of Scala's
     * {@code ::} list cell and is not valid Java as written - confirm before compiling.
     */
    private final List<Row> procTimeCurrencyData = new .colon.colon((Object)this.changelogRow("+I", (Seq<Object>)Predef$.MODULE$.genericWrapArray((Object)new Object[]{"Euro", "no1", BoxesRunTime.boxToLong((long)114L)})), (List)new .colon.colon((Object)this.changelogRow("+I", (Seq<Object>)Predef$.MODULE$.genericWrapArray((Object)new Object[]{"US Dollar", "no1", BoxesRunTime.boxToLong((long)102L)})), (List)new .colon.colon((Object)this.changelogRow("+I", (Seq<Object>)Predef$.MODULE$.genericWrapArray((Object)new Object[]{"Yen", "no1", BoxesRunTime.boxToLong((long)1L)})), (List)new .colon.colon((Object)this.changelogRow("+I", (Seq<Object>)Predef$.MODULE$.genericWrapArray((Object)new Object[]{"RMB", "no1", BoxesRunTime.boxToLong((long)702L)})), (List)new .colon.colon((Object)this.changelogRow("+I", (Seq<Object>)Predef$.MODULE$.genericWrapArray((Object)new Object[]{"Euro", "no1", BoxesRunTime.boxToLong((long)118L)})), (List)new .colon.colon((Object)this.changelogRow("+I", (Seq<Object>)Predef$.MODULE$.genericWrapArray((Object)new Object[]{"US Dollar", "no2", BoxesRunTime.boxToLong((long)106L)})), (List)Nil$.MODULE$))))));
    /**
     * Rows that {@code prepare()} registers as the data of table
     * {@code changelog_currency_proctime}, laid out as (currency, currency_no, rate). Besides
     * {@code +I} records it contains one {@code -U}/{@code +U} pair that changes the RMB rate
     * from 702 to 802.
     */
    private final List<Row> procTimeCurrencyChangelogData = List$.MODULE$.apply((Seq)Predef$.MODULE$.wrapRefArray((Object[])new Row[]{this.changelogRow("+I", (Seq<Object>)Predef$.MODULE$.genericWrapArray((Object)new Object[]{"Euro", "no1", BoxesRunTime.boxToLong((long)114L)})), this.changelogRow("+I", (Seq<Object>)Predef$.MODULE$.genericWrapArray((Object)new Object[]{"US Dollar", "no1", BoxesRunTime.boxToLong((long)102L)})), this.changelogRow("+I", (Seq<Object>)Predef$.MODULE$.genericWrapArray((Object)new Object[]{"Yen", "no1", BoxesRunTime.boxToLong((long)1L)})), this.changelogRow("+I", (Seq<Object>)Predef$.MODULE$.genericWrapArray((Object)new Object[]{"RMB", "no1", BoxesRunTime.boxToLong((long)702L)})), this.changelogRow("-U", (Seq<Object>)Predef$.MODULE$.genericWrapArray((Object)new Object[]{"RMB", "no1", BoxesRunTime.boxToLong((long)702L)})), this.changelogRow("+U", (Seq<Object>)Predef$.MODULE$.genericWrapArray((Object)new Object[]{"RMB", "no1", BoxesRunTime.boxToLong((long)802L)})), this.changelogRow("+I", (Seq<Object>)Predef$.MODULE$.genericWrapArray((Object)new Object[]{"Euro", "no1", BoxesRunTime.boxToLong((long)118L)})), this.changelogRow("+I", (Seq<Object>)Predef$.MODULE$.genericWrapArray((Object)new Object[]{"US Dollar", "no2", BoxesRunTime.boxToLong((long)106L)}))}));
    /**
     * Rows that {@code prepare()} registers as the data of table {@code orders_rowtime}, laid out
     * as (order_id, currency, currency_no, amount, order_time) with the timestamp given as an
     * ISO-style string. The list mixes {@code +I} records with a {@code -U}/{@code +U} pair for
     * order 2 (amount 1 to 18) and a {@code -D} that retracts order 6 right after it is inserted.
     */
    private final List<Row> rowTimeOrderData = List$.MODULE$.apply((Seq)Predef$.MODULE$.wrapRefArray((Object[])new Row[]{this.changelogRow("+I", (Seq<Object>)Predef$.MODULE$.genericWrapArray((Object)new Object[]{BoxesRunTime.boxToLong((long)1L), "Euro", "no1", BoxesRunTime.boxToLong((long)12L), "2020-08-15T00:01:00"})), this.changelogRow("+I", (Seq<Object>)Predef$.MODULE$.genericWrapArray((Object)new Object[]{BoxesRunTime.boxToLong((long)2L), "US Dollar", "no1", BoxesRunTime.boxToLong((long)1L), "2020-08-15T00:02:00"})), this.changelogRow("+I", (Seq<Object>)Predef$.MODULE$.genericWrapArray((Object)new Object[]{BoxesRunTime.boxToLong((long)3L), "RMB", "no1", BoxesRunTime.boxToLong((long)40L), "2020-08-15T00:03:00"})), this.changelogRow("+I", (Seq<Object>)Predef$.MODULE$.genericWrapArray((Object)new Object[]{BoxesRunTime.boxToLong((long)4L), "Euro", "no1", BoxesRunTime.boxToLong((long)14L), "2020-08-16T00:04:00"})), this.changelogRow("-U", (Seq<Object>)Predef$.MODULE$.genericWrapArray((Object)new Object[]{BoxesRunTime.boxToLong((long)2L), "US Dollar", "no1", BoxesRunTime.boxToLong((long)1L), "2020-08-16T00:03:00"})), this.changelogRow("+U", (Seq<Object>)Predef$.MODULE$.genericWrapArray((Object)new Object[]{BoxesRunTime.boxToLong((long)2L), "US Dollar", "no1", BoxesRunTime.boxToLong((long)18L), "2020-08-16T00:03:00"})), this.changelogRow("+I", (Seq<Object>)Predef$.MODULE$.genericWrapArray((Object)new Object[]{BoxesRunTime.boxToLong((long)5L), "RMB", "no1", BoxesRunTime.boxToLong((long)40L), "2020-08-16T00:03:00"})), this.changelogRow("+I", (Seq<Object>)Predef$.MODULE$.genericWrapArray((Object)new Object[]{BoxesRunTime.boxToLong((long)6L), "RMB", "no1", BoxesRunTime.boxToLong((long)40L), "2020-08-16T00:04:00"})), this.changelogRow("-D", (Seq<Object>)Predef$.MODULE$.genericWrapArray((Object)new Object[]{BoxesRunTime.boxToLong((long)6L), "RMB", "no1", BoxesRunTime.boxToLong((long)40L), "2020-08-16T00:04:00"}))}));
    /**
     * Changelog rows laid out as (currency, currency_no, rate, currency_time). {@code prepare()}
     * registers this list once and uses the resulting data id for both
     * {@code versioned_currency_with_single_key} and {@code versioned_currency_with_multi_key}.
     * In this variant each {@code -U} and {@code -D} row carries the timestamp of the change
     * itself (the 2020-08-16 values), not the timestamp of the row being retracted.
     */
    private final List<Row> rowTimeCurrencyDataUsingMetaTime = List$.MODULE$.apply((Seq)Predef$.MODULE$.wrapRefArray((Object[])new Row[]{this.changelogRow("+I", (Seq<Object>)Predef$.MODULE$.genericWrapArray((Object)new Object[]{"Euro", "no1", BoxesRunTime.boxToLong((long)114L), "2020-08-15T00:00:01"})), this.changelogRow("+I", (Seq<Object>)Predef$.MODULE$.genericWrapArray((Object)new Object[]{"US Dollar", "no1", BoxesRunTime.boxToLong((long)102L), "2020-08-15T00:00:02"})), this.changelogRow("+I", (Seq<Object>)Predef$.MODULE$.genericWrapArray((Object)new Object[]{"Yen", "no1", BoxesRunTime.boxToLong((long)1L), "2020-08-15T00:00:03"})), this.changelogRow("+I", (Seq<Object>)Predef$.MODULE$.genericWrapArray((Object)new Object[]{"RMB", "no1", BoxesRunTime.boxToLong((long)702L), "2020-08-15T00:00:04"})), this.changelogRow("-U", (Seq<Object>)Predef$.MODULE$.genericWrapArray((Object)new Object[]{"Euro", "no1", BoxesRunTime.boxToLong((long)114L), "2020-08-16T00:01:00"})), this.changelogRow("+U", (Seq<Object>)Predef$.MODULE$.genericWrapArray((Object)new Object[]{"Euro", "no1", BoxesRunTime.boxToLong((long)118L), "2020-08-16T00:01:00"})), this.changelogRow("-U", (Seq<Object>)Predef$.MODULE$.genericWrapArray((Object)new Object[]{"US Dollar", "no1", BoxesRunTime.boxToLong((long)102L), "2020-08-16T00:02:00"})), this.changelogRow("+U", (Seq<Object>)Predef$.MODULE$.genericWrapArray((Object)new Object[]{"US Dollar", "no1", BoxesRunTime.boxToLong((long)106L), "2020-08-16T00:02:00"})), this.changelogRow("-D", (Seq<Object>)Predef$.MODULE$.genericWrapArray((Object)new Object[]{"RMB", "no1", BoxesRunTime.boxToLong((long)702L), "2020-08-16T00:02:00"}))}));
    /**
     * Changelog rows laid out as (currency, currency_no, rate, currency_time) that
     * {@code prepare()} registers as the data of table {@code currency_using_update_before_time}.
     * Unlike the "meta time" fixture, each {@code -U} and {@code -D} row here repeats the
     * timestamp of the earlier {@code +I} row it retracts (the 2020-08-15 values), while the
     * matching {@code +U} rows carry the new 2020-08-16 timestamps.
     */
    private final List<Row> rowTimeCurrencyDataUsingBeforeTime = List$.MODULE$.apply((Seq)Predef$.MODULE$.wrapRefArray((Object[])new Row[]{this.changelogRow("+I", (Seq<Object>)Predef$.MODULE$.genericWrapArray((Object)new Object[]{"Euro", "no1", BoxesRunTime.boxToLong((long)114L), "2020-08-15T00:00:01"})), this.changelogRow("+I", (Seq<Object>)Predef$.MODULE$.genericWrapArray((Object)new Object[]{"US Dollar", "no1", BoxesRunTime.boxToLong((long)102L), "2020-08-15T00:00:02"})), this.changelogRow("+I", (Seq<Object>)Predef$.MODULE$.genericWrapArray((Object)new Object[]{"Yen", "no1", BoxesRunTime.boxToLong((long)1L), "2020-08-15T00:00:03"})), this.changelogRow("+I", (Seq<Object>)Predef$.MODULE$.genericWrapArray((Object)new Object[]{"RMB", "no1", BoxesRunTime.boxToLong((long)702L), "2020-08-15T00:00:04"})), this.changelogRow("-U", (Seq<Object>)Predef$.MODULE$.genericWrapArray((Object)new Object[]{"Euro", "no1", BoxesRunTime.boxToLong((long)114L), "2020-08-15T00:00:01"})), this.changelogRow("+U", (Seq<Object>)Predef$.MODULE$.genericWrapArray((Object)new Object[]{"Euro", "no1", BoxesRunTime.boxToLong((long)118L), "2020-08-16T00:01:00"})), this.changelogRow("-U", (Seq<Object>)Predef$.MODULE$.genericWrapArray((Object)new Object[]{"US Dollar", "no1", BoxesRunTime.boxToLong((long)102L), "2020-08-15T00:00:02"})), this.changelogRow("+U", (Seq<Object>)Predef$.MODULE$.genericWrapArray((Object)new Object[]{"US Dollar", "no1", BoxesRunTime.boxToLong((long)106L), "2020-08-16T00:02:00"})), this.changelogRow("-D", (Seq<Object>)Predef$.MODULE$.genericWrapArray((Object)new Object[]{"RMB", "no1", BoxesRunTime.boxToLong((long)702L), "2020-08-15T00:00:04"}))}));
    /**
     * Rows that {@code prepare()} registers as the data of table {@code upsert_currency}, which is
     * declared with {@code 'changelog-mode' = 'UA,D'}. Accordingly the list holds only {@code +U}
     * records plus one trailing {@code -D} for RMB, laid out as
     * (currency, currency_no, rate, currency_time).
     * NOTE(review): {@code new .colon.colon(...)} looks like the decompiler's rendering of Scala's
     * {@code ::} list cell and is not valid Java as written - confirm before compiling.
     */
    private final List<Row> upsertSourceCurrencyData = new .colon.colon((Object)this.changelogRow("+U", (Seq<Object>)Predef$.MODULE$.genericWrapArray((Object)new Object[]{"Euro", "no1", BoxesRunTime.boxToLong((long)114L), "2020-08-15T00:00:01"})), (List)new .colon.colon((Object)this.changelogRow("+U", (Seq<Object>)Predef$.MODULE$.genericWrapArray((Object)new Object[]{"US Dollar", "no1", BoxesRunTime.boxToLong((long)102L), "2020-08-15T00:00:02"})), (List)new .colon.colon((Object)this.changelogRow("+U", (Seq<Object>)Predef$.MODULE$.genericWrapArray((Object)new Object[]{"Yen", "no1", BoxesRunTime.boxToLong((long)1L), "2020-08-15T00:00:03"})), (List)new .colon.colon((Object)this.changelogRow("+U", (Seq<Object>)Predef$.MODULE$.genericWrapArray((Object)new Object[]{"RMB", "no1", BoxesRunTime.boxToLong((long)702L), "2020-08-15T00:00:04"})), (List)new .colon.colon((Object)this.changelogRow("+U", (Seq<Object>)Predef$.MODULE$.genericWrapArray((Object)new Object[]{"Euro", "no1", BoxesRunTime.boxToLong((long)118L), "2020-08-16T00:01:00"})), (List)new .colon.colon((Object)this.changelogRow("+U", (Seq<Object>)Predef$.MODULE$.genericWrapArray((Object)new Object[]{"US Dollar", "no1", BoxesRunTime.boxToLong((long)104L), "2020-08-16T00:02:00"})), (List)new .colon.colon((Object)this.changelogRow("-D", (Seq<Object>)Predef$.MODULE$.genericWrapArray((Object)new Object[]{"RMB", "no1", BoxesRunTime.boxToLong((long)702L), "2020-08-15T00:00:04"})), (List)Nil$.MODULE$)))))));
    /**
     * Rows that {@code prepare()} registers as the data of the insert-only table
     * {@code currency_history} ({@code 'changelog-mode' = 'I'}), laid out as
     * (currency, currency_no, rate, currency_time). All seven records are {@code +I}; the last
     * two are both "US Dollar" rows stamped 2020-08-16T00:02:00, with rates 102 and 106.
     * NOTE(review): {@code new .colon.colon(...)} looks like the decompiler's rendering of Scala's
     * {@code ::} list cell and is not valid Java as written - confirm before compiling.
     */
    private final List<Row> rowTimeInsertOnlyCurrencyData = new .colon.colon((Object)this.changelogRow("+I", (Seq<Object>)Predef$.MODULE$.genericWrapArray((Object)new Object[]{"Euro", "no1", BoxesRunTime.boxToLong((long)114L), "2020-08-15T00:00:01"})), (List)new .colon.colon((Object)this.changelogRow("+I", (Seq<Object>)Predef$.MODULE$.genericWrapArray((Object)new Object[]{"US Dollar", "no1", BoxesRunTime.boxToLong((long)102L), "2020-08-15T00:00:02"})), (List)new .colon.colon((Object)this.changelogRow("+I", (Seq<Object>)Predef$.MODULE$.genericWrapArray((Object)new Object[]{"Yen", "no1", BoxesRunTime.boxToLong((long)1L), "2020-08-15T00:00:03"})), (List)new .colon.colon((Object)this.changelogRow("+I", (Seq<Object>)Predef$.MODULE$.genericWrapArray((Object)new Object[]{"RMB", "no1", BoxesRunTime.boxToLong((long)702L), "2020-08-15T00:00:04"})), (List)new .colon.colon((Object)this.changelogRow("+I", (Seq<Object>)Predef$.MODULE$.genericWrapArray((Object)new Object[]{"Euro", "no1", BoxesRunTime.boxToLong((long)118L), "2020-08-16T00:01:00"})), (List)new .colon.colon((Object)this.changelogRow("+I", (Seq<Object>)Predef$.MODULE$.genericWrapArray((Object)new Object[]{"US Dollar", "no1", BoxesRunTime.boxToLong((long)102L), "2020-08-16T00:02:00"})), (List)new .colon.colon((Object)this.changelogRow("+I", (Seq<Object>)Predef$.MODULE$.genericWrapArray((Object)new Object[]{"US Dollar", "no1", BoxesRunTime.boxToLong((long)106L), "2020-08-16T00:02:00"})), (List)Nil$.MODULE$)))))));

    /** Returns the wrapper held in the {@code _} extension field. */
    private EachCallbackWrapper<LegacyRowExtension> _() {
        return _;
    }

    /** Returns the fixture rows stored in the {@code procTimeOrderData} field. */
    public List<Row> procTimeOrderData() {
        return procTimeOrderData;
    }

    /** Returns the fixture rows stored in the {@code procTimeCurrencyData} field. */
    public List<Row> procTimeCurrencyData() {
        return procTimeCurrencyData;
    }

    /** Returns the fixture rows stored in the {@code procTimeCurrencyChangelogData} field. */
    public List<Row> procTimeCurrencyChangelogData() {
        return procTimeCurrencyChangelogData;
    }

    /** Returns the fixture rows stored in the {@code rowTimeOrderData} field. */
    public List<Row> rowTimeOrderData() {
        return rowTimeOrderData;
    }

    /** Returns the fixture rows stored in the {@code rowTimeCurrencyDataUsingMetaTime} field. */
    public List<Row> rowTimeCurrencyDataUsingMetaTime() {
        return rowTimeCurrencyDataUsingMetaTime;
    }

    /** Returns the fixture rows stored in the {@code rowTimeCurrencyDataUsingBeforeTime} field. */
    public List<Row> rowTimeCurrencyDataUsingBeforeTime() {
        return rowTimeCurrencyDataUsingBeforeTime;
    }

    /** Returns the fixture rows stored in the {@code upsertSourceCurrencyData} field. */
    public List<Row> upsertSourceCurrencyData() {
        return upsertSourceCurrencyData;
    }

    /** Returns the fixture rows stored in the {@code rowTimeInsertOnlyCurrencyData} field. */
    public List<Row> rowTimeInsertOnlyCurrencyData() {
        return rowTimeInsertOnlyCurrencyData;
    }

    /**
     * Per-test setup: registers each fixture list with {@link TestValuesTableFactory} and issues
     * the DDL that creates the source tables, views and sinks the test methods query.
     *
     * <p>Each DDL text is written with a {@code |} margin and passed through Scala's
     * {@code stripMargin} before it is executed. Sinks are created through
     * {@code createSinkTable}, which is defined outside this excerpt.
     */
    @BeforeEach
    public void prepare() {
        // ---- processing-time sources ----
        String ordersProcTimeId = TestValuesTableFactory.registerData(this.procTimeOrderData());
        String ordersProcTimeDdl = "\n                       |CREATE TABLE orders_proctime (\n                       |  order_id BIGINT,\n                       |  currency STRING,\n                       |  currency_no STRING,\n                       |  amount BIGINT,\n                       |  proctime as PROCTIME()\n                       |) WITH (\n                       |  'connector' = 'values',\n                       |  'bounded' = 'false',\n                       |  'data-id' = '" + ordersProcTimeId + "'\n                       |)\n                       |";
        this.tEnv().executeSql(new StringOps(Predef$.MODULE$.augmentString(ordersProcTimeDdl)).stripMargin());

        String currencyProcTimeId = TestValuesTableFactory.registerData(this.procTimeCurrencyData());
        String currencyProcTimeDdl = "\n                       |CREATE TABLE currency_proctime (\n                       |  currency STRING,\n                       |  currency_no STRING,\n                       |  rate BIGINT,\n                       |  proctime as PROCTIME(),\n                       |  PRIMARY KEY(currency, currency_no) NOT ENFORCED\n                       |) WITH (\n                       |  'connector' = 'values',\n                       |  'bounded' = 'false',\n                       |  'disable-lookup' = 'true',\n                       |  'data-id' = '" + currencyProcTimeId + "'\n                       |)\n                       |";
        this.tEnv().executeSql(new StringOps(Predef$.MODULE$.augmentString(currencyProcTimeDdl)).stripMargin());

        String changelogCurrencyId = TestValuesTableFactory.registerData(this.procTimeCurrencyChangelogData());
        String changelogCurrencyDdl = "\n                       |CREATE TABLE changelog_currency_proctime (\n                       |  currency STRING,\n                       |  currency_no STRING,\n                       |  rate BIGINT,\n                       |  proctime as PROCTIME(),\n                       |  PRIMARY KEY(currency, currency_no) NOT ENFORCED\n                       |) WITH (\n                       |  'connector' = 'values',\n                       |  'bounded' = 'false',\n                       |  'disable-lookup' = 'true',\n                       |  'changelog-mode' = 'I,UA,UB,D',\n                       |  'data-id' = '" + changelogCurrencyId + "'\n                       |)\n                       |";
        this.tEnv().executeSql(new StringOps(Predef$.MODULE$.augmentString(changelogCurrencyDdl)).stripMargin());

        // View keeping, per (currency, currency_no), the row ranked first by proctime DESC.
        String latestRatesDdl = "\n                       |CREATE VIEW latest_rates AS\n                       |SELECT\n                       |  currency,\n                       |  currency_no,\n                       |  rate,\n                       |  proctime FROM\n                       |      ( SELECT *, ROW_NUMBER() OVER (PARTITION BY currency, currency_no\n                       |        ORDER BY proctime DESC) AS rowNum\n                       |        FROM currency_proctime) T\n                       | WHERE rowNum = 1";
        this.tEnv().executeSql(new StringOps(Predef$.MODULE$.augmentString(latestRatesDdl)).stripMargin());

        // Sink for the processing-time tests, created with an explicit column list.
        String procTimeSinkColumns = new StringOps(Predef$.MODULE$.augmentString("\n              |  order_id BIGINT,\n              |  currency STRING,\n              |  amount BIGINT,\n              |  l_time TIMESTAMP_LTZ(3),\n              |  rate BIGINT,\n              |  r_time TIMESTAMP_LTZ(3),\n              |  PRIMARY KEY(order_id) NOT ENFORCED\n              |")).stripMargin();
        this.createSinkTable("proctime_default_sink", (Option<String>)new Some((Object)procTimeSinkColumns));

        // ---- event-time sources ----
        String ordersRowTimeId = TestValuesTableFactory.registerData(this.rowTimeOrderData());
        String ordersRowTimeDdl = "\n                       |CREATE TABLE orders_rowtime (\n                       |  order_id BIGINT,\n                       |  currency STRING,\n                       |  currency_no STRING,\n                       |  amount BIGINT,\n                       |  order_time TIMESTAMP(3),\n                       |  WATERMARK FOR order_time AS order_time,\n                       |  PRIMARY KEY (order_id) NOT ENFORCED\n                       |) WITH (\n                       |  'connector' = 'values',\n                       |  'changelog-mode' = 'I,UA,UB,D',\n                       |  'data-id' = '" + ordersRowTimeId + "'\n                       |)\n                       |";
        this.tEnv().executeSql(new StringOps(Predef$.MODULE$.augmentString(ordersRowTimeDdl)).stripMargin());

        // One registered data set backs both versioned tables; they differ only in primary key.
        String versionedCurrencyId = TestValuesTableFactory.registerData(this.rowTimeCurrencyDataUsingMetaTime());
        String singleKeyDdl = "\n                       |CREATE TABLE versioned_currency_with_single_key (\n                       |  currency STRING,\n                       |  currency_no STRING,\n                       |  rate  BIGINT,\n                       |  currency_time TIMESTAMP(3),\n                       |  WATERMARK FOR currency_time AS currency_time - interval '10' SECOND,\n                       |  PRIMARY KEY(currency) NOT ENFORCED\n                       |) WITH (\n                       |  'connector' = 'values',\n                       |  'changelog-mode' = 'I,UA,UB,D',\n                       |  'data-id' = '" + versionedCurrencyId + "'\n                       |)\n                       |";
        this.tEnv().executeSql(new StringOps(Predef$.MODULE$.augmentString(singleKeyDdl)).stripMargin());
        String multiKeyDdl = "\n                       |CREATE TABLE versioned_currency_with_multi_key (\n                       |  currency STRING,\n                       |  currency_no STRING,\n                       |  rate  BIGINT,\n                       |  currency_time TIMESTAMP(3),\n                       |  WATERMARK FOR currency_time AS currency_time - interval '10' SECOND,\n                       |  PRIMARY KEY(currency, currency_no) NOT ENFORCED\n                       |) WITH (\n                       |  'connector' = 'values',\n                       |  'changelog-mode' = 'I,UA,UB,D',\n                       |  'data-id' = '" + versionedCurrencyId + "'\n                       |)\n                       |";
        this.tEnv().executeSql(new StringOps(Predef$.MODULE$.augmentString(multiKeyDdl)).stripMargin());

        String beforeTimeCurrencyId = TestValuesTableFactory.registerData(this.rowTimeCurrencyDataUsingBeforeTime());
        String beforeTimeCurrencyDdl = "\n                       |CREATE TABLE currency_using_update_before_time (\n                       |  currency STRING,\n                       |  currency_no STRING,\n                       |  rate  BIGINT,\n                       |  currency_time TIMESTAMP(3),\n                       |  WATERMARK FOR currency_time AS currency_time - interval '2' DAY,\n                       |  PRIMARY KEY(currency) NOT ENFORCED\n                       |) WITH (\n                       |  'connector' = 'values',\n                       |  'changelog-mode' = 'I,UA,UB,D',\n                       |  'data-id' = '" + beforeTimeCurrencyId + "'\n                       |)\n                       |";
        this.tEnv().executeSql(new StringOps(Predef$.MODULE$.augmentString(beforeTimeCurrencyDdl)).stripMargin());

        String upsertCurrencyId = TestValuesTableFactory.registerData(this.upsertSourceCurrencyData());
        String upsertCurrencyDdl = "\n                       |CREATE TABLE upsert_currency (\n                       |  currency STRING,\n                       |  currency_no STRING,\n                       |  rate  BIGINT,\n                       |  currency_time TIMESTAMP(3),\n                       |  WATERMARK FOR currency_time AS currency_time - interval '2' DAY,\n                       |  PRIMARY KEY(currency) NOT ENFORCED\n                       |) WITH (\n                       |  'connector' = 'values',\n                       |  'changelog-mode' = 'UA,D',\n                       |  'data-id' = '" + upsertCurrencyId + "'\n                       |)\n                       |";
        this.tEnv().executeSql(new StringOps(Predef$.MODULE$.augmentString(upsertCurrencyDdl)).stripMargin());

        // Sink for the event-time tests; no column list is supplied (None).
        this.createSinkTable("rowtime_default_sink", (Option<String>)None$.MODULE$);

        // ---- insert-only history and the two deduplication views built on it ----
        String currencyHistoryId = TestValuesTableFactory.registerData(this.rowTimeInsertOnlyCurrencyData());
        String currencyHistoryDdl = "\n                       |CREATE TABLE currency_history (\n                       |  currency STRING,\n                       |  currency_no STRING,\n                       |  rate  BIGINT,\n                       |  currency_time TIMESTAMP(3),\n                       |  WATERMARK FOR currency_time AS currency_time - interval '0.001' SECOND\n                       |) WITH (\n                       |  'connector' = 'values',\n                       |  'data-id' = '" + currencyHistoryId + "',\n                       |  'changelog-mode' = 'I')\n                       |  ";
        this.tEnv().executeSql(new StringOps(Predef$.MODULE$.augmentString(currencyHistoryDdl)).stripMargin());

        String firstRowViewDdl = "\n         |CREATE VIEW currency_deduplicated_first_row AS\n         |SELECT\n         |  currency,\n         |  currency_no,\n         |  rate,\n         |  currency_time FROM\n         |      (SELECT *, ROW_NUMBER() OVER (PARTITION BY currency ORDER BY currency_time)\n         |       AS rowNum FROM currency_history) T\n         | WHERE rowNum = 1";
        this.tEnv().executeSql(new StringOps(Predef$.MODULE$.augmentString(firstRowViewDdl)).stripMargin());

        String lastRowViewDdl = "\n         |CREATE VIEW currency_deduplicated_last_row AS\n         |SELECT\n         |  currency,\n         |  currency_no,\n         |  rate,\n         |  currency_time FROM\n         |      (SELECT *, ROW_NUMBER() OVER (PARTITION BY currency ORDER BY currency_time DESC)\n         |       AS rowNum FROM currency_history) T\n         | WHERE rowNum = 1";
        this.tEnv().executeSql(new StringOps(Predef$.MODULE$.augmentString(lastRowViewDdl)).stripMargin());
    }

    /**
     * Verifies that an inner processing-time temporal join of {@code orders_proctime} against
     * {@code currency_proctime} is rejected with a {@link TableException} carrying the
     * "not supported yet" message.
     */
    @TestTemplate
    public void testProcTimeTemporalJoin() {
        String sql = "INSERT INTO proctime_default_sink  SELECT o.order_id, o.currency, o.amount, o.proctime, r.rate, r.proctime  FROM orders_proctime AS o  JOIN currency_proctime FOR SYSTEM_TIME AS OF o.proctime as r  ON o.currency = r.currency and o.currency_no = r.currency_no";
        // Use AssertJ's isInstanceOf: a plain `instanceof` applied to the returned assertion
        // object tests the assertion itself, never the thrown exception.
        Assertions.assertThatThrownBy(() -> this.tEnv().executeSql(sql).await())
                .isInstanceOf(TableException.class)
                .hasMessage("Processing-time temporal join is not supported yet.");
    }

    /**
     * Verifies that a left processing-time temporal join of {@code orders_proctime} against
     * {@code currency_proctime} is rejected with a {@link TableException} carrying the
     * "not supported yet" message.
     */
    @TestTemplate
    public void testProcTimeLeftTemporalJoin() {
        String sql = "INSERT INTO proctime_default_sink  SELECT o.order_id, o.currency, o.amount, o.proctime, r.rate, r.proctime  FROM orders_proctime AS o  LEFT JOIN currency_proctime FOR SYSTEM_TIME AS OF o.proctime as r  ON o.currency = r.currency and o.currency_no = r.currency_no";
        // Use AssertJ's isInstanceOf: a plain `instanceof` applied to the returned assertion
        // object tests the assertion itself, never the thrown exception.
        Assertions.assertThatThrownBy(() -> this.tEnv().executeSql(sql).await())
                .isInstanceOf(TableException.class)
                .hasMessage("Processing-time temporal join is not supported yet.");
    }

    /**
     * Verifies that a processing-time temporal join against the changelog table
     * {@code changelog_currency_proctime} is rejected with a {@link TableException} carrying the
     * "not supported yet" message.
     *
     * <p>A dedicated sink {@code proctime_sink1} is created first because the query selects
     * {@code r.*}, whose columns differ from the default sink schema.
     */
    @TestTemplate
    public void testProcTimeTemporalJoinChangelogSource() {
        String sinkColumns = new StringOps(Predef$.MODULE$.augmentString("\n              | currency STRING,\n              | currency_no STRING,\n              | rate BIGINT,\n              | proctime TIMESTAMP_LTZ(3)\n              | ")).stripMargin();
        this.createSinkTable("proctime_sink1", (Option<String>)new Some((Object)sinkColumns));
        String sql = "INSERT INTO proctime_sink1  SELECT r.* FROM orders_proctime AS o  JOIN changelog_currency_proctime FOR SYSTEM_TIME AS OF o.proctime as r  ON o.currency = r.currency and o.currency_no = r.currency_no";
        // Use AssertJ's isInstanceOf: a plain `instanceof` applied to the returned assertion
        // object tests the assertion itself, never the thrown exception.
        Assertions.assertThatThrownBy(() -> this.tEnv().executeSql(sql).await())
                .isInstanceOf(TableException.class)
                .hasMessage("Processing-time temporal join is not supported yet.");
    }

    /**
     * Verifies that an inner processing-time temporal join of {@code orders_proctime} against
     * {@code latest_rates} is rejected with a {@link TableException} carrying the
     * "not supported yet" message.
     */
    @TestTemplate
    public void testProcTimeTemporalJoinWithView() {
        String sql = "INSERT INTO proctime_default_sink  SELECT o.order_id, o.currency, o.amount, o.proctime, r.rate, r.proctime  FROM orders_proctime AS o  JOIN latest_rates FOR SYSTEM_TIME AS OF o.proctime as r  ON o.currency = r.currency and o.currency_no = r.currency_no";
        // Use AssertJ's isInstanceOf: a plain `instanceof` applied to the returned assertion
        // object tests the assertion itself, never the thrown exception.
        Assertions.assertThatThrownBy(() -> this.tEnv().executeSql(sql).await())
                .isInstanceOf(TableException.class)
                .hasMessage("Processing-time temporal join is not supported yet.");
    }

    /**
     * Verifies that a left processing-time temporal join of {@code orders_proctime} against
     * {@code latest_rates} is rejected with a {@link TableException} carrying the
     * "not supported yet" message.
     */
    @TestTemplate
    public void testProcTimeLeftTemporalJoinWithView() {
        String sql = "INSERT INTO proctime_default_sink  SELECT o.order_id, o.currency, o.amount, o.proctime, r.rate, r.proctime  FROM orders_proctime AS o  LEFT JOIN latest_rates FOR SYSTEM_TIME AS OF o.proctime as r  ON o.currency = r.currency and o.currency_no = r.currency_no";
        // Use AssertJ's isInstanceOf: a plain `instanceof` applied to the returned assertion
        // object tests the assertion itself, never the thrown exception.
        Assertions.assertThatThrownBy(() -> this.tEnv().executeSql(sql).await())
                .isInstanceOf(TableException.class)
                .hasMessage("Processing-time temporal join is not supported yet.");
    }

    /**
     * Verifies that an inner processing-time temporal join against {@code latest_rates} with an
     * additional non-equi predicate ({@code o.amount > r.rate}) is rejected with a
     * {@link TableException} carrying the "not supported yet" message.
     */
    @TestTemplate
    public void testProcTimeTemporalJoinWithViewNonEqui() {
        String sql = "INSERT INTO proctime_default_sink  SELECT o.order_id, o.currency, o.amount, o.proctime, r.rate, r.proctime  FROM orders_proctime AS o  JOIN latest_rates FOR SYSTEM_TIME AS OF o.proctime AS r  ON o.currency = r.currency and o.currency_no = r.currency_no  AND o.amount > r.rate";
        // Use AssertJ's isInstanceOf: a plain `instanceof` applied to the returned assertion
        // object tests the assertion itself, never the thrown exception.
        Assertions.assertThatThrownBy(() -> this.tEnv().executeSql(sql).await())
                .isInstanceOf(TableException.class)
                .hasMessage("Processing-time temporal join is not supported yet.");
    }

    /**
     * Verifies that a left processing-time temporal join against {@code latest_rates} with an
     * additional predicate ({@code o.amount > r.rate}) is rejected with a
     * {@link TableException} carrying the "not supported yet" message.
     */
    @TestTemplate
    public void testProcTimeLeftTemporalJoinWithViewWithPredicates() {
        String sql = "INSERT INTO proctime_default_sink  SELECT o.order_id, o.currency, o.amount, o.proctime, r.rate, r.proctime  FROM orders_proctime AS o  LEFT JOIN latest_rates FOR SYSTEM_TIME AS OF o.proctime AS r  ON o.currency = r.currency and o.currency_no = r.currency_no AND o.amount > r.rate";
        // Use AssertJ's isInstanceOf: a plain `instanceof` applied to the returned assertion
        // object tests the assertion itself, never the thrown exception.
        Assertions.assertThatThrownBy(() -> this.tEnv().executeSql(sql).await())
                .isInstanceOf(TableException.class)
                .hasMessage("Processing-time temporal join is not supported yet.");
    }

    /**
     * Verifies that chaining two processing-time temporal joins ({@code latest_rates} and
     * {@code currency_proctime}) is rejected with a {@link TableException} carrying the
     * "not supported yet" message.
     *
     * <p>The query writes into a dedicated sink {@code proctime_sink2}, created first.
     */
    @TestTemplate
    public void testProcTimeMultiTemporalJoin() {
        String sinkColumns = new StringOps(Predef$.MODULE$.augmentString("\n              |  order_id BIGINT,\n              |  currency STRING,\n              |  amount BIGINT,\n              |  l_time TIMESTAMP_LTZ(3),\n              |  rate BIGINT,\n              |  r_time TIMESTAMP_LTZ(3),\n              |  PRIMARY KEY(order_id) NOT ENFORCED\n              |")).stripMargin();
        this.createSinkTable("proctime_sink2", (Option<String>)new Some((Object)sinkColumns));
        String sql = "INSERT INTO proctime_sink2  SELECT o.order_id, o.currency, o.amount, o.proctime, r.rate, r1.proctime  FROM orders_proctime AS o  JOIN latest_rates FOR SYSTEM_TIME AS OF o.proctime as r  ON o.currency = r.currency and o.currency_no = r.currency_no  JOIN currency_proctime FOR SYSTEM_TIME AS OF o.proctime as r1 ON o.currency = r1.currency and o.currency_no = r1.currency_no";
        // Use AssertJ's isInstanceOf: a plain `instanceof` applied to the returned assertion
        // object tests the assertion itself, never the thrown exception.
        Assertions.assertThatThrownBy(() -> this.tEnv().executeSql(sql).await())
                .isInstanceOf(TableException.class)
                .hasMessage("Processing-time temporal join is not supported yet.");
    }

    /**
     * Runs an inner event-time temporal join of {@code orders_rowtime} against
     * {@code versioned_currency_with_single_key} on {@code currency} and compares the sorted
     * contents of {@code rowtime_default_sink} with the expected rows.
     */
    @TestTemplate
    public void testEventTimeTemporalJoin() {
        String insertStatement = "INSERT INTO rowtime_default_sink  SELECT o.order_id, o.currency, o.amount, o.order_time, r.rate, r.currency_time  FROM orders_rowtime AS o JOIN versioned_currency_with_single_key  FOR SYSTEM_TIME AS OF o.order_time as r  ON o.currency = r.currency";
        this.tEnv().executeSql(insertStatement).await();

        // Expected sink rows, built as a Scala cons list.
        List expectedRows =
                new .colon.colon((Object)"1,Euro,12,2020-08-15T00:01,114,2020-08-15T00:00:01",
                (List)new .colon.colon((Object)"2,US Dollar,18,2020-08-16T00:03,106,2020-08-16T00:02",
                (List)new .colon.colon((Object)"3,RMB,40,2020-08-15T00:03,702,2020-08-15T00:00:04",
                (List)new .colon.colon((Object)"4,Euro,14,2020-08-16T00:04,118,2020-08-16T00:01",
                (List)Nil$.MODULE$))));

        // Both sides are sorted so the comparison does not depend on emission order.
        Object actualRows = JavaConversions$.MODULE$.deprecated$u0020asScalaBuffer(TestValuesTableFactory.getResultsAsStrings("rowtime_default_sink")).sorted((Ordering)Ordering.String$.MODULE$);
        Assertions.assertThat(actualRows).isEqualTo(expectedRows.sorted((Ordering)Ordering.String$.MODULE$));
    }

    /**
     * Runs an inner event-time temporal join against
     * {@code versioned_currency_with_single_key} whose join condition covers both
     * {@code currency} and {@code currency_no}, then compares the sorted contents of
     * {@code rowtime_default_sink} with the expected rows.
     */
    @TestTemplate
    public void testEventTimeTemporalJoinThatJoinkeyContainsPk() {
        String insertStatement = "INSERT INTO rowtime_default_sink  SELECT o.order_id, o.currency, o.amount, o.order_time, r.rate, r.currency_time  FROM orders_rowtime AS o JOIN versioned_currency_with_single_key  FOR SYSTEM_TIME AS OF o.order_time as r  ON o.currency = r.currency AND o.currency_no = r.currency_no";
        this.tEnv().executeSql(insertStatement).await();

        // Expected sink rows, built as a Scala cons list.
        List expectedRows =
                new .colon.colon((Object)"1,Euro,12,2020-08-15T00:01,114,2020-08-15T00:00:01",
                (List)new .colon.colon((Object)"2,US Dollar,18,2020-08-16T00:03,106,2020-08-16T00:02",
                (List)new .colon.colon((Object)"3,RMB,40,2020-08-15T00:03,702,2020-08-15T00:00:04",
                (List)new .colon.colon((Object)"4,Euro,14,2020-08-16T00:04,118,2020-08-16T00:01",
                (List)Nil$.MODULE$))));

        // Both sides are sorted so the comparison does not depend on emission order.
        Object actualRows = JavaConversions$.MODULE$.deprecated$u0020asScalaBuffer(TestValuesTableFactory.getResultsAsStrings("rowtime_default_sink")).sorted((Ordering)Ordering.String$.MODULE$);
        Assertions.assertThat(actualRows).isEqualTo(expectedRows.sorted((Ordering)Ordering.String$.MODULE$));
    }

    /**
     * Creates view {@code v1} over {@code versioned_currency_with_single_key}, runs an inner
     * event-time temporal join against it filtered by {@code rate < 115}, and compares the
     * sorted contents of {@code rowtime_default_sink} with the expected rows.
     */
    @TestTemplate
    public void testEventTimeTemporalJoinWithFilter() {
        String createView = "CREATE VIEW v1 AS SELECT * FROM versioned_currency_with_single_key";
        this.tEnv().executeSql(createView);

        String insertStatement = "INSERT INTO rowtime_default_sink  SELECT o.order_id, o.currency, o.amount, o.order_time, r.rate, r.currency_time  FROM orders_rowtime AS o  JOIN v1 FOR SYSTEM_TIME AS OF o.order_time as r  ON o.currency = r.currency WHERE rate < 115";
        this.tEnv().executeSql(insertStatement).await();

        // Expected sink rows, built as a Scala cons list.
        List expectedRows =
                new .colon.colon((Object)"1,Euro,12,2020-08-15T00:01,114,2020-08-15T00:00:01",
                (List)new .colon.colon((Object)"2,US Dollar,18,2020-08-16T00:03,106,2020-08-16T00:02",
                (List)Nil$.MODULE$));

        // Both sides are sorted so the comparison does not depend on emission order.
        Object actualRows = JavaConversions$.MODULE$.deprecated$u0020asScalaBuffer(TestValuesTableFactory.getResultsAsStrings("rowtime_default_sink")).sorted((Ordering)Ordering.String$.MODULE$);
        Assertions.assertThat(actualRows).isEqualTo(expectedRows.sorted((Ordering)Ordering.String$.MODULE$));
    }

    /**
     * Runs a left event-time temporal join of {@code orders_rowtime} against
     * {@code versioned_currency_with_single_key} and compares the sorted contents of
     * {@code rowtime_default_sink} with the expected rows; order 5 is expected with
     * {@code null} rate and currency time.
     */
    @TestTemplate
    public void testEventTimeLeftTemporalJoin() {
        String insertStatement = "INSERT INTO rowtime_default_sink  SELECT o.order_id, o.currency, o.amount, o.order_time, r.rate, r.currency_time  FROM orders_rowtime AS o LEFT JOIN versioned_currency_with_single_key  FOR SYSTEM_TIME AS OF o.order_time as r  ON o.currency = r.currency";
        this.tEnv().executeSql(insertStatement).await();

        // Expected sink rows, built as a Scala cons list.
        List expectedRows =
                new .colon.colon((Object)"1,Euro,12,2020-08-15T00:01,114,2020-08-15T00:00:01",
                (List)new .colon.colon((Object)"2,US Dollar,18,2020-08-16T00:03,106,2020-08-16T00:02",
                (List)new .colon.colon((Object)"3,RMB,40,2020-08-15T00:03,702,2020-08-15T00:00:04",
                (List)new .colon.colon((Object)"4,Euro,14,2020-08-16T00:04,118,2020-08-16T00:01",
                (List)new .colon.colon((Object)"5,RMB,40,2020-08-16T00:03,null,null",
                (List)Nil$.MODULE$)))));

        // Both sides are sorted so the comparison does not depend on emission order.
        Object actualRows = JavaConversions$.MODULE$.deprecated$u0020asScalaBuffer(TestValuesTableFactory.getResultsAsStrings("rowtime_default_sink")).sorted((Ordering)Ordering.String$.MODULE$);
        Assertions.assertThat(actualRows).isEqualTo(expectedRows.sorted((Ordering)Ordering.String$.MODULE$));
    }

    /**
     * Runs a left event-time temporal join of {@code orders_rowtime} against
     * {@code currency_using_update_before_time} and compares the sorted contents of
     * {@code rowtime_default_sink} with the expected rows; orders 3 and 5 are expected with
     * {@code null} rate and currency time.
     */
    @TestTemplate
    public void testEventTimeTemporalJoinChangelogUsingBeforeTime() {
        String insertStatement = "INSERT INTO rowtime_default_sink  SELECT o.order_id, o.currency, o.amount, o.order_time, r.rate, r.currency_time  FROM orders_rowtime AS o LEFT JOIN currency_using_update_before_time  FOR SYSTEM_TIME AS OF o.order_time as r  ON o.currency = r.currency";
        this.tEnv().executeSql(insertStatement).await();

        // Expected sink rows, built as a Scala cons list.
        List expectedRows =
                new .colon.colon((Object)"1,Euro,12,2020-08-15T00:01,114,2020-08-15T00:00:01",
                (List)new .colon.colon((Object)"2,US Dollar,18,2020-08-16T00:03,106,2020-08-16T00:02",
                (List)new .colon.colon((Object)"3,RMB,40,2020-08-15T00:03,null,null",
                (List)new .colon.colon((Object)"4,Euro,14,2020-08-16T00:04,118,2020-08-16T00:01",
                (List)new .colon.colon((Object)"5,RMB,40,2020-08-16T00:03,null,null",
                (List)Nil$.MODULE$)))));

        // Both sides are sorted so the comparison does not depend on emission order.
        Object actualRows = JavaConversions$.MODULE$.deprecated$u0020asScalaBuffer(TestValuesTableFactory.getResultsAsStrings("rowtime_default_sink")).sorted((Ordering)Ordering.String$.MODULE$);
        Assertions.assertThat(actualRows).isEqualTo(expectedRows.sorted((Ordering)Ordering.String$.MODULE$));
    }

    /**
     * Runs a left event-time temporal join of {@code orders_rowtime} against the
     * {@code upsert_currency} table and compares the sorted contents of
     * {@code rowtime_default_sink} with the expected rows; orders 3 and 5 are expected with
     * {@code null} rate and currency time.
     */
    @TestTemplate
    public void testEventTimeLeftTemporalJoinUpsertSource() {
        String insertStatement = "INSERT INTO rowtime_default_sink  SELECT o.order_id, o.currency, o.amount, o.order_time, r.rate, r.currency_time  FROM orders_rowtime AS o LEFT JOIN upsert_currency  FOR SYSTEM_TIME AS OF o.order_time as r  ON o.currency = r.currency ";
        this.tEnv().executeSql(insertStatement).await();

        // Expected sink rows, built as a Scala cons list.
        List expectedRows =
                new .colon.colon((Object)"1,Euro,12,2020-08-15T00:01,114,2020-08-15T00:00:01",
                (List)new .colon.colon((Object)"2,US Dollar,18,2020-08-16T00:03,104,2020-08-16T00:02",
                (List)new .colon.colon((Object)"3,RMB,40,2020-08-15T00:03,null,null",
                (List)new .colon.colon((Object)"4,Euro,14,2020-08-16T00:04,118,2020-08-16T00:01",
                (List)new .colon.colon((Object)"5,RMB,40,2020-08-16T00:03,null,null",
                (List)Nil$.MODULE$)))));

        // Both sides are sorted so the comparison does not depend on emission order.
        Object actualRows = JavaConversions$.MODULE$.deprecated$u0020asScalaBuffer(TestValuesTableFactory.getResultsAsStrings("rowtime_default_sink")).sorted((Ordering)Ordering.String$.MODULE$);
        Assertions.assertThat(actualRows).isEqualTo(expectedRows.sorted((Ordering)Ordering.String$.MODULE$));
    }

    /**
     * Runs an inner event-time temporal join of {@code orders_rowtime} against
     * {@code versioned_currency_with_multi_key} on {@code currency_no} and {@code currency},
     * then compares the sorted contents of {@code rowtime_default_sink} with the expected rows.
     */
    @TestTemplate
    public void testEventTimeTemporalJoinWithMultiKeys() {
        String insertStatement = "INSERT INTO rowtime_default_sink  SELECT o.order_id, o.currency, o.amount, o.order_time, r.rate, r.currency_time  FROM orders_rowtime AS o JOIN versioned_currency_with_multi_key  FOR SYSTEM_TIME AS OF o.order_time as r  ON o.currency_no = r.currency_no AND o.currency = r.currency";
        this.tEnv().executeSql(insertStatement).await();

        // Expected sink rows, built as a Scala cons list.
        List expectedRows =
                new .colon.colon((Object)"1,Euro,12,2020-08-15T00:01,114,2020-08-15T00:00:01",
                (List)new .colon.colon((Object)"2,US Dollar,18,2020-08-16T00:03,106,2020-08-16T00:02",
                (List)new .colon.colon((Object)"3,RMB,40,2020-08-15T00:03,702,2020-08-15T00:00:04",
                (List)new .colon.colon((Object)"4,Euro,14,2020-08-16T00:04,118,2020-08-16T00:01",
                (List)Nil$.MODULE$))));

        // Both sides are sorted so the comparison does not depend on emission order.
        Object actualRows = JavaConversions$.MODULE$.deprecated$u0020asScalaBuffer(TestValuesTableFactory.getResultsAsStrings("rowtime_default_sink")).sorted((Ordering)Ordering.String$.MODULE$);
        Assertions.assertThat(actualRows).isEqualTo(expectedRows.sorted((Ordering)Ordering.String$.MODULE$));
    }

    /**
     * Runs an inner event-time temporal join against
     * {@code currency_using_update_before_time} whose condition adds the non-equi predicates
     * {@code o.order_id < 5} and {@code r.rate > 102}, then compares the sorted contents of
     * {@code rowtime_default_sink} with the expected rows.
     */
    @TestTemplate
    public void testEventTimeTemporalJoinWithNonEqualCondition() {
        String insertStatement = "INSERT INTO rowtime_default_sink  SELECT o.order_id, o.currency, o.amount, o.order_time, r.rate, r.currency_time  FROM orders_rowtime AS o JOIN currency_using_update_before_time  FOR SYSTEM_TIME AS OF o.order_time as r  ON o.currency = r.currency and o.currency_no = r.currency_no  and o.order_id < 5 and r.rate > 102";
        this.tEnv().executeSql(insertStatement).await();

        // Expected sink rows, built as a Scala cons list.
        List expectedRows =
                new .colon.colon((Object)"1,Euro,12,2020-08-15T00:01,114,2020-08-15T00:00:01",
                (List)new .colon.colon((Object)"2,US Dollar,18,2020-08-16T00:03,106,2020-08-16T00:02",
                (List)new .colon.colon((Object)"4,Euro,14,2020-08-16T00:04,118,2020-08-16T00:01",
                (List)Nil$.MODULE$)));

        // Both sides are sorted so the comparison does not depend on emission order.
        Object actualRows = JavaConversions$.MODULE$.deprecated$u0020asScalaBuffer(TestValuesTableFactory.getResultsAsStrings("rowtime_default_sink")).sorted((Ordering)Ordering.String$.MODULE$);
        Assertions.assertThat(actualRows).isEqualTo(expectedRows.sorted((Ordering)Ordering.String$.MODULE$));
    }

    /**
     * Runs an inner event-time temporal join against
     * {@code currency_using_update_before_time} whose condition also pins
     * {@code o.currency = 'Euro'} and requires {@code r.rate > 102}, then compares the sorted
     * contents of {@code rowtime_default_sink} with the expected rows.
     */
    @TestTemplate
    public void testEventTimeTemporalJoinEqualConditionOnKey() {
        String insertStatement = "INSERT INTO rowtime_default_sink  SELECT o.order_id, o.currency, o.amount, o.order_time, r.rate, r.currency_time  FROM orders_rowtime AS o JOIN currency_using_update_before_time  FOR SYSTEM_TIME AS OF o.order_time as r  ON o.currency = r.currency and o.currency_no = r.currency_no  and o.currency = 'Euro' and r.rate > 102";
        this.tEnv().executeSql(insertStatement).await();

        // Expected sink rows, built as a Scala cons list.
        List expectedRows =
                new .colon.colon((Object)"1,Euro,12,2020-08-15T00:01,114,2020-08-15T00:00:01",
                (List)new .colon.colon((Object)"4,Euro,14,2020-08-16T00:04,118,2020-08-16T00:01",
                (List)Nil$.MODULE$));

        // Both sides are sorted so the comparison does not depend on emission order.
        Object actualRows = JavaConversions$.MODULE$.deprecated$u0020asScalaBuffer(TestValuesTableFactory.getResultsAsStrings("rowtime_default_sink")).sorted((Ordering)Ordering.String$.MODULE$);
        Assertions.assertThat(actualRows).isEqualTo(expectedRows.sorted((Ordering)Ordering.String$.MODULE$));
    }

    /**
     * Chains two left event-time temporal joins ({@code versioned_currency_with_multi_key} and
     * {@code versioned_currency_with_single_key}) and compares the sorted contents of the
     * dedicated sink {@code rowtime_sink1} with the expected rows.
     *
     * <p>The sink is created with two extra columns ({@code r1_rate}, {@code r1_time}) to hold
     * the second join's output.
     */
    @TestTemplate
    public void testEventTimeMultiTemporalJoin() {
        String sinkColumns = new StringOps(Predef$.MODULE$.augmentString("\n           |  order_id BIGINT,\n           |  currency STRING,\n           |  amount BIGINT,\n           |  l_time TIMESTAMP(3),\n           |  rate BIGINT,\n           |  r_time TIMESTAMP(3),\n           |  r1_rate BIGINT,\n           |  r1_time TIMESTAMP(3),\n           |  PRIMARY KEY(order_id) NOT ENFORCED\n           |")).stripMargin();
        this.createSinkTable("rowtime_sink1", (Option<String>)new Some((Object)sinkColumns));

        String insertStatement = "INSERT INTO rowtime_sink1  SELECT o.order_id, o.currency, o.amount, o.order_time, r.rate, r.currency_time, r1.rate, r1.currency_time FROM orders_rowtime AS o  LEFT JOIN versioned_currency_with_multi_key  FOR SYSTEM_TIME AS OF o.order_time as r  ON o.currency = r.currency and o.currency_no = r.currency_no  LEFT JOIN versioned_currency_with_single_key  FOR SYSTEM_TIME AS OF o.order_time as r1  ON o.currency = r1.currency";
        this.tEnv().executeSql(insertStatement).await();

        // Expected sink rows, built as a Scala cons list.
        List expectedRows =
                new .colon.colon((Object)"1,Euro,12,2020-08-15T00:01,114,2020-08-15T00:00:01,114,2020-08-15T00:00:01",
                (List)new .colon.colon((Object)"2,US Dollar,18,2020-08-16T00:03,106,2020-08-16T00:02,106,2020-08-16T00:02",
                (List)new .colon.colon((Object)"3,RMB,40,2020-08-15T00:03,702,2020-08-15T00:00:04,702,2020-08-15T00:00:04",
                (List)new .colon.colon((Object)"4,Euro,14,2020-08-16T00:04,118,2020-08-16T00:01,118,2020-08-16T00:01",
                (List)new .colon.colon((Object)"5,RMB,40,2020-08-16T00:03,null,null,null,null",
                (List)Nil$.MODULE$)))));

        // Both sides are sorted so the comparison does not depend on emission order.
        Object actualRows = JavaConversions$.MODULE$.deprecated$u0020asScalaBuffer(TestValuesTableFactory.getResultsAsStrings("rowtime_sink1")).sorted((Ordering)Ordering.String$.MODULE$);
        Assertions.assertThat(actualRows).isEqualTo(expectedRows.sorted((Ordering)Ordering.String$.MODULE$));
    }

    /**
     * Runs a left event-time temporal join of {@code orders_rowtime} against the
     * {@code currency_deduplicated_first_row} view and compares the sorted contents of
     * {@code rowtime_default_sink} with the expected rows.
     */
    @TestTemplate
    public void testEventTimeTemporalJoinWithDeduplicateFirstView() {
        String insertStatement = "INSERT INTO rowtime_default_sink  SELECT o.order_id, o.currency, o.amount, o.order_time, r.rate, r.currency_time  FROM orders_rowtime AS o  LEFT JOIN currency_deduplicated_first_row  FOR SYSTEM_TIME AS OF o.order_time as r  ON o.currency = r.currency";
        this.tEnv().executeSql(insertStatement).await();

        // Expected sink rows, built as a Scala cons list.
        List expectedRows =
                new .colon.colon((Object)"1,Euro,12,2020-08-15T00:01,114,2020-08-15T00:00:01",
                (List)new .colon.colon((Object)"2,US Dollar,18,2020-08-16T00:03,102,2020-08-15T00:00:02",
                (List)new .colon.colon((Object)"3,RMB,40,2020-08-15T00:03,702,2020-08-15T00:00:04",
                (List)new .colon.colon((Object)"4,Euro,14,2020-08-16T00:04,114,2020-08-15T00:00:01",
                (List)new .colon.colon((Object)"5,RMB,40,2020-08-16T00:03,702,2020-08-15T00:00:04",
                (List)Nil$.MODULE$)))));

        // Both sides are sorted so the comparison does not depend on emission order.
        Object actualRows = JavaConversions$.MODULE$.deprecated$u0020asScalaBuffer(TestValuesTableFactory.getResultsAsStrings("rowtime_default_sink")).sorted((Ordering)Ordering.String$.MODULE$);
        Assertions.assertThat(actualRows).isEqualTo(expectedRows.sorted((Ordering)Ordering.String$.MODULE$));
    }

    /**
     * Runs an inner event-time temporal join of {@code orders_rowtime} against the
     * {@code currency_deduplicated_last_row} view and compares the sorted contents of
     * {@code rowtime_default_sink} with the expected rows.
     */
    @TestTemplate
    public void testEventTimeTemporalJoinWithDeduplicateLastView() {
        String insertStatement = "INSERT INTO rowtime_default_sink  SELECT o.order_id, o.currency, o.amount, o.order_time, r.rate, r.currency_time  FROM orders_rowtime AS o  JOIN currency_deduplicated_last_row  FOR SYSTEM_TIME AS OF o.order_time as r  ON o.currency = r.currency";
        this.tEnv().executeSql(insertStatement).await();

        // Expected sink rows, built as a Scala cons list.
        List expectedRows =
                new .colon.colon((Object)"1,Euro,12,2020-08-15T00:01,114,2020-08-15T00:00:01",
                (List)new .colon.colon((Object)"2,US Dollar,18,2020-08-16T00:03,106,2020-08-16T00:02",
                (List)new .colon.colon((Object)"3,RMB,40,2020-08-15T00:03,702,2020-08-15T00:00:04",
                (List)new .colon.colon((Object)"4,Euro,14,2020-08-16T00:04,118,2020-08-16T00:01",
                (List)new .colon.colon((Object)"5,RMB,40,2020-08-16T00:03,702,2020-08-15T00:00:04",
                (List)Nil$.MODULE$)))));

        // Both sides are sorted so the comparison does not depend on emission order.
        Object actualRows = JavaConversions$.MODULE$.deprecated$u0020asScalaBuffer(TestValuesTableFactory.getResultsAsStrings("rowtime_default_sink")).sorted((Ordering)Ordering.String$.MODULE$);
        Assertions.assertThat(actualRows).isEqualTo(expectedRows.sorted((Ordering)Ordering.String$.MODULE$));
    }

    /**
     * Runs a left event-time temporal join against the
     * {@code currency_deduplicated_last_row} view with the extra condition
     * {@code substr(o.currency, 1, 2) = 'US'}, then compares the sorted contents of
     * {@code rowtime_default_sink} with the expected rows; only the US Dollar order is
     * expected with a non-null rate.
     */
    @TestTemplate
    public void testEventTimeLeftTemporalJoinWithView() {
        String insertStatement = "INSERT INTO rowtime_default_sink  SELECT o.order_id, o.currency, o.amount, o.order_time, r.rate, r.currency_time  FROM orders_rowtime AS o  LEFT JOIN currency_deduplicated_last_row  FOR SYSTEM_TIME AS OF o.order_time as r  ON o.currency = r.currency AND substr(o.currency, 1, 2) = 'US' ";
        this.tEnv().executeSql(insertStatement).await();

        // Expected sink rows, built as a Scala cons list.
        List expectedRows =
                new .colon.colon((Object)"1,Euro,12,2020-08-15T00:01,null,null",
                (List)new .colon.colon((Object)"2,US Dollar,18,2020-08-16T00:03,106,2020-08-16T00:02",
                (List)new .colon.colon((Object)"3,RMB,40,2020-08-15T00:03,null,null",
                (List)new .colon.colon((Object)"4,Euro,14,2020-08-16T00:04,null,null",
                (List)new .colon.colon((Object)"5,RMB,40,2020-08-16T00:03,null,null",
                (List)Nil$.MODULE$)))));

        // Both sides are sorted so the comparison does not depend on emission order.
        Object actualRows = JavaConversions$.MODULE$.deprecated$u0020asScalaBuffer(TestValuesTableFactory.getResultsAsStrings("rowtime_default_sink")).sorted((Ordering)Ordering.String$.MODULE$);
        Assertions.assertThat(actualRows).isEqualTo(expectedRows.sorted((Ordering)Ordering.String$.MODULE$));
    }

    /**
     * Enables mini-batch execution (allowed latency "10 s", batch size 4), runs an inner
     * event-time temporal join against the {@code currency_deduplicated_last_row} view, and
     * compares the sorted contents of {@code rowtime_default_sink} with the expected rows.
     */
    @TestTemplate
    public void testMiniBatchEventTimeViewTemporalJoin() {
        // Mini-batch settings; Boolean.TRUE / Long.valueOf yield the same boxed values as the
        // Scala runtime boxing helpers.
        this.tEnv().getConfig().set(ExecutionConfigOptions.TABLE_EXEC_MINIBATCH_ENABLED, (Object)Boolean.TRUE);
        this.tEnv().getConfig().set(ExecutionConfigOptions.TABLE_EXEC_MINIBATCH_ALLOW_LATENCY.key(), "10 s");
        this.tEnv().getConfig().set(ExecutionConfigOptions.TABLE_EXEC_MINIBATCH_SIZE, (Object)Long.valueOf(4L));

        String insertStatement = "INSERT INTO rowtime_default_sink  SELECT o.order_id, o.currency, o.amount, o.order_time, r.rate, r.currency_time  FROM orders_rowtime AS o JOIN  currency_deduplicated_last_row  FOR SYSTEM_TIME AS OF o.order_time as r  ON o.currency = r.currency";
        this.tEnv().executeSql(insertStatement).await();

        // Expected sink rows, built as a Scala cons list.
        List expectedRows =
                new .colon.colon((Object)"1,Euro,12,2020-08-15T00:01,114,2020-08-15T00:00:01",
                (List)new .colon.colon((Object)"2,US Dollar,18,2020-08-16T00:03,106,2020-08-16T00:02",
                (List)new .colon.colon((Object)"3,RMB,40,2020-08-15T00:03,702,2020-08-15T00:00:04",
                (List)new .colon.colon((Object)"4,Euro,14,2020-08-16T00:04,118,2020-08-16T00:01",
                (List)new .colon.colon((Object)"5,RMB,40,2020-08-16T00:03,702,2020-08-15T00:00:04",
                (List)Nil$.MODULE$)))));

        // Both sides are sorted so the comparison does not depend on emission order.
        Object actualRows = JavaConversions$.MODULE$.deprecated$u0020asScalaBuffer(TestValuesTableFactory.getResultsAsStrings("rowtime_default_sink")).sorted((Ordering)Ordering.String$.MODULE$);
        Assertions.assertThat(actualRows).isEqualTo(expectedRows.sorted((Ordering)Ordering.String$.MODULE$));
    }

    /**
     * Registers a sink table backed by the {@code values} connector.
     *
     * <p>The table accepts the full changelog ({@code I,UA,UB,D}) and is created with
     * {@code 'sink-insert-only' = 'false'}.
     *
     * @param tableName name of the table to create
     * @param columns column DDL fragment to use when it is a {@code Some}; for any other value
     *     a default schema (order_id, currency, amount, l_time, rate, r_time with primary key
     *     order_id) is used
     */
    private void createSinkTable(String tableName, Option<String> columns) {
        // Explicit columns win; otherwise fall back to the default sink schema.
        String columnsDDL = columns instanceof Some
                ? (String)((Some)columns).value()
                : new StringOps(Predef$.MODULE$.augmentString("\n           |  order_id BIGINT,\n           |  currency STRING,\n           |  amount BIGINT,\n           |  l_time TIMESTAMP(3),\n           |  rate BIGINT,\n           |  r_time TIMESTAMP(3),\n           |  PRIMARY KEY(order_id) NOT ENFORCED\n           |")).stripMargin();

        StringBuilder ddl = new StringBuilder(313);
        ddl.append("\n                       |CREATE TABLE ");
        ddl.append(tableName);
        ddl.append(" (\n                       | ");
        ddl.append(columnsDDL);
        ddl.append("\n                       |) WITH (\n                       |  'connector' = 'values',\n                       |  'sink-insert-only' = 'false',\n                       |  'changelog-mode' = 'I,UA,UB,D'\n                       |)\n                       |");

        // stripMargin removes the leading whitespace and '|' marker from each line.
        String statement = new StringOps(Predef$.MODULE$.augmentString(ddl.toString())).stripMargin();
        this.tEnv().executeSql(statement);
    }

    /**
     * Builds a changelog {@link Row} of the given kind from loosely typed field values.
     *
     * <p>Each value is normalized before being handed to
     * {@code TestValuesTableFactory.changelogRow}: {@code Long} and {@code Integer} values are
     * re-boxed, strings that parse as an ISO local date-time become {@link LocalDateTime}
     * (other strings are kept as-is), and any other non-null object is passed through.
     *
     * @param kind row kind identifier forwarded to {@code TestValuesTableFactory.changelogRow}
     * @param values field values of the row
     * @return the row produced by {@code TestValuesTableFactory.changelogRow}
     * @throws MatchError if a value is {@code null}
     */
    private Row changelogRow(String kind, Seq<Object> values) {
        Function1 normalize = (Function1 & Serializable & scala.Serializable)raw -> {
            if (raw instanceof Long) {
                return BoxesRunTime.boxToLong((long)BoxesRunTime.unboxToLong((Object)raw));
            }
            if (raw instanceof Integer) {
                return BoxesRunTime.boxToInteger((int)BoxesRunTime.unboxToInt((Object)raw));
            }
            if (raw instanceof String) {
                String text = (String)raw;
                try {
                    return LocalDateTime.parse(text);
                }
                catch (DateTimeParseException notADateTime) {
                    // Not a timestamp literal: keep the plain string.
                    return text;
                }
            }
            if (raw == null) {
                // null matches none of the cases above.
                throw new MatchError(raw);
            }
            return raw;
        };
        Seq normalized = (Seq)values.map(normalize, Seq$.MODULE$.canBuildFrom());
        return TestValuesTableFactory.changelogRow(kind, (Object[])normalized.toArray(ClassTag$.MODULE$.Object()));
    }

    /**
     * Creates the test case for one state backend mode.
     *
     * @param state state backend mode, forwarded unchanged to the base-class constructor
     */
    public TemporalJoinITCase(StreamingWithStateTestBase.StateBackendMode state) {
        super(state);
    }
}

