/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.table.planner.runtime.stream.sql;

import java.io.IOException;
import java.io.Serializable;
import java.time.DayOfWeek;
import java.time.LocalDateTime;
import java.time.ZoneId;
import java.time.ZoneOffset;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.Iterator;
import java.util.List;
import java.util.Objects;
import org.apache.flink.api.common.RuntimeExecutionMode;
import org.apache.flink.api.common.eventtime.TimestampAssignerSupplier;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.api.common.typeinfo.TypeHint;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.api.java.tuple.Tuple3;
import org.apache.flink.api.java.typeutils.EnumTypeInfo;
import org.apache.flink.api.java.typeutils.GenericTypeInfo;
import org.apache.flink.api.java.typeutils.TupleTypeInfo;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.KeyedProcessFunction;
import org.apache.flink.streaming.api.functions.ProcessFunction;
import org.apache.flink.streaming.api.functions.windowing.WindowFunction;
import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows;
import org.apache.flink.streaming.api.windowing.assigners.WindowAssigner;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.table.api.DataTypes;
import org.apache.flink.table.api.Expressions;
import org.apache.flink.table.api.Schema;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.TableConfig;
import org.apache.flink.table.api.TableDescriptor;
import org.apache.flink.table.api.TableResult;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
import org.apache.flink.table.catalog.Column;
import org.apache.flink.table.catalog.ResolvedSchema;
import org.apache.flink.table.catalog.WatermarkSpec;
import org.apache.flink.table.connector.ChangelogMode;
import org.apache.flink.table.expressions.Expression;
import org.apache.flink.table.expressions.ResolvedExpression;
import org.apache.flink.table.expressions.utils.ResolvedExpressionMock;
import org.apache.flink.table.planner.factories.TestValuesTableFactory;
import org.apache.flink.table.types.AbstractDataType;
import org.apache.flink.table.types.DataType;
import org.apache.flink.table.types.logical.RawType;
import org.apache.flink.test.util.AbstractTestBase;
import org.apache.flink.types.Either;
import org.apache.flink.types.Row;
import org.apache.flink.types.RowKind;
import org.apache.flink.util.CloseableIterator;
import org.apache.flink.util.CollectionUtil;
import org.apache.flink.util.Collector;
import org.hamcrest.Matcher;
import org.hamcrest.MatcherAssert;
import org.hamcrest.Matchers;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;

@RunWith(value=Parameterized.class)
public class DataStreamJavaITCase
extends AbstractTestBase {
    private StreamExecutionEnvironment env;
    @Parameterized.Parameter
    public ObjectReuse objectReuse;

    @Parameterized.Parameters(name="objectReuse = {0}")
    public static ObjectReuse[] objectReuse() {
        return ObjectReuse.values();
    }

    @Before
    public void before() {
        this.env = StreamExecutionEnvironment.getExecutionEnvironment();
        this.env.setRuntimeMode(RuntimeExecutionMode.STREAMING);
        this.env.setParallelism(4);
        if (this.objectReuse == ObjectReuse.ENABLED) {
            this.env.getConfig().enableObjectReuse();
        } else if (this.objectReuse == ObjectReuse.DISABLED) {
            this.env.getConfig().disableObjectReuse();
        }
    }

    @Test
    public void testFromDataStreamAtomic() {
        StreamTableEnvironment tableEnv = StreamTableEnvironment.create((StreamExecutionEnvironment)this.env);
        DataStreamSource dataStream = this.env.fromElements((Object[])new Integer[]{1, 2, 3, 4, 5});
        TableResult result = tableEnv.fromDataStream((DataStream)dataStream).execute();
        DataStreamJavaITCase.testSchema(result, new Column[]{Column.physical((String)"f0", (DataType)((DataType)DataTypes.INT().notNull()))});
        DataStreamJavaITCase.testResult(result, Row.of((Object[])new Object[]{1}), Row.of((Object[])new Object[]{2}), Row.of((Object[])new Object[]{3}), Row.of((Object[])new Object[]{4}), Row.of((Object[])new Object[]{5}));
    }

    @Test
    public void testToDataStreamAtomic() throws Exception {
        StreamTableEnvironment tableEnv = StreamTableEnvironment.create((StreamExecutionEnvironment)this.env);
        Table table = tableEnv.fromValues(new Object[]{1, 2, 3, 4, 5});
        DataStreamJavaITCase.testResult(tableEnv.toDataStream(table, Integer.class), 1, 2, 3, 4, 5);
    }

    @Test
    public void testFromDataStreamWithRow() {
        StreamTableEnvironment tableEnv = StreamTableEnvironment.create((StreamExecutionEnvironment)this.env);
        TypeInformation typeInfo = Types.ROW_NAMED((String[])new String[]{"b", "c", "a"}, (TypeInformation[])new TypeInformation[]{Types.INT, Types.ROW((TypeInformation[])new TypeInformation[]{Types.BOOLEAN, Types.STRING}), Types.MAP((TypeInformation)Types.STRING, (TypeInformation)Types.DOUBLE)});
        Row[] rows = new Row[]{Row.of((Object[])new Object[]{12, Row.of((Object[])new Object[]{false, "hello"}), Collections.singletonMap("world", 2.0)}), Row.of((Object[])new Object[]{null, Row.of((Object[])new Object[]{false, null}), Collections.singletonMap("world", null)})};
        DataStreamSource dataStream = this.env.fromCollection(Arrays.asList(rows), typeInfo);
        TableResult result = tableEnv.fromDataStream((DataStream)dataStream).execute();
        DataStreamJavaITCase.testSchema(result, new Column[]{Column.physical((String)"b", (DataType)DataTypes.INT()), Column.physical((String)"c", (DataType)DataTypes.ROW((DataTypes.Field[])new DataTypes.Field[]{DataTypes.FIELD((String)"f0", (DataType)DataTypes.BOOLEAN()), DataTypes.FIELD((String)"f1", (DataType)DataTypes.STRING())})), Column.physical((String)"a", (DataType)DataTypes.MAP((DataType)DataTypes.STRING(), (DataType)DataTypes.DOUBLE()))});
        DataStreamJavaITCase.testResult(result, rows);
    }

    @Test
    public void testToDataStreamWithRow() throws Exception {
        StreamTableEnvironment tableEnv = StreamTableEnvironment.create((StreamExecutionEnvironment)this.env);
        Row[] rows = new Row[]{Row.of((Object[])new Object[]{12, Row.of((Object[])new Object[]{false, "hello"}), Collections.singletonMap("world", 2.0)}), Row.of((Object[])new Object[]{null, Row.of((Object[])new Object[]{false, null}), Collections.singletonMap("world", 1.0)})};
        Table table = tableEnv.fromValues((Object[])rows);
        DataStreamJavaITCase.testResult(tableEnv.toDataStream(table), rows);
    }

    @Test
    public void testFromAndToDataStreamWithPojo() throws Exception {
        StreamTableEnvironment tableEnv = StreamTableEnvironment.create((StreamExecutionEnvironment)this.env);
        Object[] pojos = new ComplexPojo[]{ComplexPojo.of(42, "hello", new ImmutablePojo(42.0, null)), ComplexPojo.of(42, null, null)};
        DataStreamSource dataStream = this.env.fromElements(pojos);
        Table table = tableEnv.fromDataStream((DataStream)dataStream, Schema.newBuilder().column("c", (AbstractDataType)DataTypes.INT()).column("a", (AbstractDataType)DataTypes.STRING()).column("p", (AbstractDataType)DataTypes.of(ImmutablePojo.class)).build());
        DataStreamJavaITCase.testSchema(table, new Column[]{Column.physical((String)"c", (DataType)DataTypes.INT()), Column.physical((String)"a", (DataType)DataTypes.STRING()), Column.physical((String)"p", (DataType)DataTypes.STRUCTURED(ImmutablePojo.class, (DataTypes.Field[])new DataTypes.Field[]{DataTypes.FIELD((String)"d", (DataType)DataTypes.DOUBLE()), DataTypes.FIELD((String)"b", (DataType)DataTypes.BOOLEAN())}))});
        tableEnv.createTemporaryView("t", table);
        TableResult result = tableEnv.executeSql("SELECT p, p.d, p.b FROM t");
        DataStreamJavaITCase.testResult(result, Row.of((Object[])new Object[]{new ImmutablePojo(42.0, null), 42.0, null}), Row.of((Object[])new Object[]{null, null, null}));
        DataStreamJavaITCase.testResult(tableEnv.toDataStream(table, ComplexPojo.class), pojos);
    }

    @Test
    public void testFromAndToDataStreamWithRaw() throws Exception {
        StreamTableEnvironment tableEnv = StreamTableEnvironment.create((StreamExecutionEnvironment)this.env);
        List<Tuple2> rawRecords = Arrays.asList(Tuple2.of((Object)DayOfWeek.MONDAY, (Object)ZoneOffset.UTC), Tuple2.of((Object)DayOfWeek.FRIDAY, (Object)ZoneOffset.ofHours(5)));
        DataStreamSource dataStream = this.env.fromCollection(rawRecords);
        MatcherAssert.assertThat((Object)dataStream.getType(), (Matcher)Matchers.instanceOf(TupleTypeInfo.class));
        TupleTypeInfo tupleInfo = (TupleTypeInfo)dataStream.getType();
        MatcherAssert.assertThat((Object)tupleInfo.getFieldTypes()[0], (Matcher)Matchers.instanceOf(EnumTypeInfo.class));
        MatcherAssert.assertThat((Object)tupleInfo.getFieldTypes()[1], (Matcher)Matchers.instanceOf(GenericTypeInfo.class));
        Table table = tableEnv.fromDataStream((DataStream)dataStream);
        List columnDataTypes = table.getResolvedSchema().getColumnDataTypes();
        MatcherAssert.assertThat((Object)((DataType)columnDataTypes.get(0)).getLogicalType(), (Matcher)Matchers.instanceOf(RawType.class));
        MatcherAssert.assertThat((Object)((DataType)columnDataTypes.get(1)).getLogicalType(), (Matcher)Matchers.instanceOf(RawType.class));
        DataStreamJavaITCase.testResult(table.execute(), Row.of((Object[])new Object[]{DayOfWeek.MONDAY, ZoneOffset.UTC}), Row.of((Object[])new Object[]{DayOfWeek.FRIDAY, ZoneOffset.ofHours(5)}));
        DataStreamJavaITCase.testResult(tableEnv.toDataStream(table, (AbstractDataType)DataTypes.of((TypeInformation)dataStream.getType())), rawRecords.toArray(new Tuple2[0]));
    }

    @Test
    public void testFromAndToDataStreamEventTime() throws Exception {
        StreamTableEnvironment tableEnv = StreamTableEnvironment.create((StreamExecutionEnvironment)this.env);
        DataStream<Tuple3<Long, Integer, String>> dataStream = this.getWatermarkedDataStream();
        Table table = tableEnv.fromDataStream(dataStream, Schema.newBuilder().columnByMetadata("rowtime", "TIMESTAMP_LTZ(3)").watermark("rowtime", "SOURCE_WATERMARK()").build());
        DataStreamJavaITCase.testSchema(table, new ResolvedSchema(Arrays.asList(Column.physical((String)"f0", (DataType)((DataType)DataTypes.BIGINT().notNull())), Column.physical((String)"f1", (DataType)((DataType)DataTypes.INT().notNull())), Column.physical((String)"f2", (DataType)DataTypes.STRING()), Column.metadata((String)"rowtime", (DataType)DataTypes.TIMESTAMP_LTZ((int)3), null, (boolean)false)), Collections.singletonList(WatermarkSpec.of((String)"rowtime", (ResolvedExpression)ResolvedExpressionMock.of((DataType)DataTypes.TIMESTAMP_LTZ((int)3), (String)"`SOURCE_WATERMARK`()"))), null));
        tableEnv.createTemporaryView("t", table);
        TableResult result = tableEnv.executeSql("SELECT f2, SUM(f1) FROM t GROUP BY f2, TUMBLE(rowtime, INTERVAL '0.005' SECOND)");
        DataStreamJavaITCase.testResult(result, Row.of((Object[])new Object[]{"a", 47}), Row.of((Object[])new Object[]{"c", 1000}), Row.of((Object[])new Object[]{"c", 1000}));
        DataStreamJavaITCase.testResult(tableEnv.toDataStream(table).keyBy((KeySelector & Serializable)k -> k.getField("f2")).window((WindowAssigner)TumblingEventTimeWindows.of((Time)Time.milliseconds((long)5L))).apply((WindowFunction & Serializable)(key, window, input, out) -> {
            int sum = 0;
            for (Row row : input) {
                sum += ((Integer)row.getFieldAs("f1")).intValue();
            }
            out.collect((Object)Row.of((Object[])new Object[]{key, sum}));
        }).returns(Types.ROW((TypeInformation[])new TypeInformation[]{Types.STRING, Types.INT})), new Row[]{Row.of((Object[])new Object[]{"a", 47}), Row.of((Object[])new Object[]{"c", 1000}), Row.of((Object[])new Object[]{"c", 1000})});
    }

    @Test
    public void testFromAndToChangelogStreamEventTime() throws Exception {
        StreamTableEnvironment tableEnv = StreamTableEnvironment.create((StreamExecutionEnvironment)this.env);
        DataStream<Tuple3<Long, Integer, String>> dataStream = this.getWatermarkedDataStream();
        SingleOutputStreamOperator changelogStream = dataStream.map((MapFunction & Serializable)t -> Row.ofKind((RowKind)RowKind.INSERT, (Object[])new Object[]{t.f1, t.f2})).returns(Types.ROW((TypeInformation[])new TypeInformation[]{Types.INT, Types.STRING}));
        Table table = tableEnv.fromChangelogStream((DataStream)changelogStream, Schema.newBuilder().columnByMetadata("rowtime", (AbstractDataType)DataTypes.TIMESTAMP_LTZ((int)3)).columnByExpression("computed", (Expression)Expressions.$((String)"f1").upperCase()).watermark("rowtime", (Expression)Expressions.sourceWatermark()).build());
        tableEnv.createTemporaryView("t", table);
        Table reordered = tableEnv.sqlQuery("SELECT computed, rowtime, f0 FROM t");
        DataStream result = tableEnv.toChangelogStream(reordered, Schema.newBuilder().column("f1", (AbstractDataType)DataTypes.STRING()).columnByMetadata("rowtime", (AbstractDataType)DataTypes.TIMESTAMP_LTZ((int)3)).columnByExpression("ignored", (Expression)Expressions.$((String)"f1").upperCase()).column("f0", (AbstractDataType)DataTypes.INT()).build());
        DataStreamJavaITCase.testResult(result.keyBy((KeySelector & Serializable)k -> k.getField("f1")).window((WindowAssigner)TumblingEventTimeWindows.of((Time)Time.milliseconds((long)5L))).apply((WindowFunction & Serializable)(key, window, input, out) -> {
            int sum = 0;
            for (Row row : input) {
                sum += ((Integer)row.getFieldAs("f0")).intValue();
            }
            out.collect((Object)Row.of((Object[])new Object[]{key, sum}));
        }).returns(Types.ROW((TypeInformation[])new TypeInformation[]{Types.STRING, Types.INT})), new Row[]{Row.of((Object[])new Object[]{"A", 47}), Row.of((Object[])new Object[]{"C", 1000}), Row.of((Object[])new Object[]{"C", 1000})});
    }

    @Test
    public void testFromAndToChangelogStreamRetract() throws Exception {
        StreamTableEnvironment tableEnv = StreamTableEnvironment.create((StreamExecutionEnvironment)this.env);
        List<Either<Row, Row>> inputOrOutput = Arrays.asList(DataStreamJavaITCase.input(RowKind.INSERT, "bob", 0), DataStreamJavaITCase.output(RowKind.INSERT, "bob", 0), DataStreamJavaITCase.input(RowKind.UPDATE_BEFORE, "bob", 0), DataStreamJavaITCase.output(RowKind.DELETE, "bob", 0), DataStreamJavaITCase.input(RowKind.UPDATE_AFTER, "bob", 1), DataStreamJavaITCase.output(RowKind.INSERT, "bob", 1), DataStreamJavaITCase.input(RowKind.INSERT, "alice", 1), DataStreamJavaITCase.output(RowKind.INSERT, "alice", 1), DataStreamJavaITCase.input(RowKind.INSERT, "alice", 1), DataStreamJavaITCase.output(RowKind.UPDATE_BEFORE, "alice", 1), DataStreamJavaITCase.output(RowKind.UPDATE_AFTER, "alice", 2), DataStreamJavaITCase.input(RowKind.UPDATE_BEFORE, "alice", 1), DataStreamJavaITCase.output(RowKind.UPDATE_BEFORE, "alice", 2), DataStreamJavaITCase.output(RowKind.UPDATE_AFTER, "alice", 1), DataStreamJavaITCase.input(RowKind.UPDATE_AFTER, "alice", 2), DataStreamJavaITCase.output(RowKind.UPDATE_BEFORE, "alice", 1), DataStreamJavaITCase.output(RowKind.UPDATE_AFTER, "alice", 3), DataStreamJavaITCase.input(RowKind.UPDATE_BEFORE, "alice", 2), DataStreamJavaITCase.output(RowKind.UPDATE_BEFORE, "alice", 3), DataStreamJavaITCase.output(RowKind.UPDATE_AFTER, "alice", 1), DataStreamJavaITCase.input(RowKind.UPDATE_AFTER, "alice", 100), DataStreamJavaITCase.output(RowKind.UPDATE_BEFORE, "alice", 1), DataStreamJavaITCase.output(RowKind.UPDATE_AFTER, "alice", 101));
        DataStreamSource changelogStream = this.env.fromElements((Object[])DataStreamJavaITCase.getInput(inputOrOutput));
        tableEnv.createTemporaryView("t", tableEnv.fromChangelogStream((DataStream)changelogStream));
        Table result = tableEnv.sqlQuery("SELECT f0, SUM(f1) FROM t GROUP BY f0");
        DataStreamJavaITCase.testResult(result.execute(), DataStreamJavaITCase.getOutput(inputOrOutput));
        DataStreamJavaITCase.testResult(tableEnv.toChangelogStream(result), DataStreamJavaITCase.getOutput(inputOrOutput));
    }

    @Test
    public void testFromChangelogStreamUpsert() {
        StreamTableEnvironment tableEnv = StreamTableEnvironment.create((StreamExecutionEnvironment)this.env);
        List<Either<Row, Row>> inputOrOutput = Arrays.asList(DataStreamJavaITCase.input(RowKind.INSERT, "bob", 0), DataStreamJavaITCase.output(RowKind.INSERT, "bob", 0), DataStreamJavaITCase.input(RowKind.UPDATE_AFTER, "bob", 1), DataStreamJavaITCase.output(RowKind.UPDATE_BEFORE, "bob", 0), DataStreamJavaITCase.output(RowKind.UPDATE_AFTER, "bob", 1), DataStreamJavaITCase.input(RowKind.INSERT, "alice", 1), DataStreamJavaITCase.output(RowKind.INSERT, "alice", 1), DataStreamJavaITCase.input(RowKind.INSERT, "alice", 1), DataStreamJavaITCase.input(RowKind.UPDATE_AFTER, "alice", 2), DataStreamJavaITCase.output(RowKind.UPDATE_BEFORE, "alice", 1), DataStreamJavaITCase.output(RowKind.UPDATE_AFTER, "alice", 2), DataStreamJavaITCase.input(RowKind.UPDATE_AFTER, "alice", 100), DataStreamJavaITCase.output(RowKind.UPDATE_BEFORE, "alice", 2), DataStreamJavaITCase.output(RowKind.UPDATE_AFTER, "alice", 100));
        DataStreamSource changelogStream = this.env.fromElements((Object[])DataStreamJavaITCase.getInput(inputOrOutput));
        tableEnv.createTemporaryView("t", tableEnv.fromChangelogStream((DataStream)changelogStream, Schema.newBuilder().primaryKey(new String[]{"f0"}).build(), ChangelogMode.upsert()));
        Table result = tableEnv.sqlQuery("SELECT f0, SUM(f1) FROM t GROUP BY f0");
        DataStreamJavaITCase.testResult(result.execute(), DataStreamJavaITCase.getOutput(inputOrOutput));
    }

    @Test
    public void testFromAndToChangelogStreamUpsert() throws Exception {
        StreamTableEnvironment tableEnv = StreamTableEnvironment.create((StreamExecutionEnvironment)this.env);
        List<Either<Row, Row>> inputOrOutput = Arrays.asList(DataStreamJavaITCase.input(RowKind.INSERT, "bob", 0), DataStreamJavaITCase.output(RowKind.INSERT, "bob", 0), DataStreamJavaITCase.input(RowKind.UPDATE_AFTER, "bob", 1), DataStreamJavaITCase.output(RowKind.UPDATE_AFTER, "bob", 1), DataStreamJavaITCase.input(RowKind.INSERT, "alice", 1), DataStreamJavaITCase.output(RowKind.INSERT, "alice", 1), DataStreamJavaITCase.input(RowKind.INSERT, "alice", 1), DataStreamJavaITCase.input(RowKind.UPDATE_AFTER, "alice", 2), DataStreamJavaITCase.output(RowKind.UPDATE_AFTER, "alice", 2), DataStreamJavaITCase.input(RowKind.UPDATE_AFTER, "alice", 100), DataStreamJavaITCase.output(RowKind.UPDATE_AFTER, "alice", 100));
        DataStreamSource changelogStream = this.env.fromElements((Object[])DataStreamJavaITCase.getInput(inputOrOutput));
        tableEnv.createTemporaryView("t", tableEnv.fromChangelogStream((DataStream)changelogStream, Schema.newBuilder().primaryKey(new String[]{"f0"}).build(), ChangelogMode.upsert()));
        Table result = tableEnv.sqlQuery("SELECT f0, SUM(f1) FROM t GROUP BY f0");
        DataStreamJavaITCase.testResult(tableEnv.toChangelogStream(result, Schema.newBuilder().primaryKey(new String[]{"f0"}).build(), ChangelogMode.upsert()), DataStreamJavaITCase.getOutput(inputOrOutput));
    }

    @Test
    public void testToDataStreamCustomEventTime() throws Exception {
        StreamTableEnvironment tableEnv = StreamTableEnvironment.create((StreamExecutionEnvironment)this.env);
        TableConfig tableConfig = tableEnv.getConfig();
        ZoneId originalZone = tableConfig.getLocalTimeZone();
        tableConfig.setLocalTimeZone(ZoneId.of("Europe/Berlin"));
        LocalDateTime localDateTime1 = LocalDateTime.parse("1970-01-01T00:00:00.000");
        LocalDateTime localDateTime2 = LocalDateTime.parse("1970-01-01T01:00:00.000");
        DataStreamSource dataStream = this.env.fromElements((Object[])new Tuple2[]{new Tuple2((Object)localDateTime1, (Object)"alice"), new Tuple2((Object)localDateTime2, (Object)"bob")});
        Table table = tableEnv.fromDataStream((DataStream)dataStream, Schema.newBuilder().column("f0", "TIMESTAMP(3)").column("f1", "STRING").watermark("f0", "SOURCE_WATERMARK()").build());
        DataStreamJavaITCase.testSchema(table, new ResolvedSchema(Arrays.asList(Column.physical((String)"f0", (DataType)DataTypes.TIMESTAMP((int)3)), Column.physical((String)"f1", (DataType)DataTypes.STRING())), Collections.singletonList(WatermarkSpec.of((String)"f0", (ResolvedExpression)ResolvedExpressionMock.of((DataType)DataTypes.TIMESTAMP((int)3), (String)"`SOURCE_WATERMARK`()"))), null));
        SingleOutputStreamOperator rowtimeStream = tableEnv.toDataStream(table).process((ProcessFunction)new ProcessFunction<Row, Long>(){

            public void processElement(Row value, ProcessFunction.Context ctx, Collector<Long> out) {
                out.collect((Object)ctx.timestamp());
            }
        });
        DataStreamJavaITCase.testResult(rowtimeStream, new Long[]{localDateTime1.atOffset(ZoneOffset.UTC).toInstant().toEpochMilli(), localDateTime2.atOffset(ZoneOffset.UTC).toInstant().toEpochMilli()});
        tableConfig.setLocalTimeZone(originalZone);
    }

    @Test
    public void testComplexUnifiedPipelineBatch() {
        this.env.setRuntimeMode(RuntimeExecutionMode.BATCH);
        Table resultTable = this.getComplexUnifiedPipeline(this.env);
        DataStreamJavaITCase.testResult(resultTable.execute(), Row.of((Object[])new Object[]{"Bob", 1L}), Row.of((Object[])new Object[]{"Alice", 1L}));
    }

    @Test
    public void testTableStreamConversionBatch() throws Exception {
        this.env.setRuntimeMode(RuntimeExecutionMode.BATCH);
        DataStreamSource streamSource = this.env.fromElements((Object[])new Row[]{Row.of((Object[])new Object[]{"Alice"}), Row.of((Object[])new Object[]{"alice"}), Row.of((Object[])new Object[]{"lily"}), Row.of((Object[])new Object[]{"Bob"}), Row.of((Object[])new Object[]{"lily"}), Row.of((Object[])new Object[]{"lily"})});
        StreamTableEnvironment tableEnvironment = StreamTableEnvironment.create((StreamExecutionEnvironment)this.env);
        Table sourceTable = tableEnvironment.fromDataStream((DataStream)streamSource).as("word", new String[0]);
        tableEnvironment.createTemporaryView("tmp_table", sourceTable);
        Table resultTable = tableEnvironment.sqlQuery("select UPPER(word) as word from tmp_table");
        SingleOutputStreamOperator resultStream = tableEnvironment.toDataStream(resultTable).map((MapFunction & Serializable)row -> (String)row.getField("word")).returns(TypeInformation.of(String.class)).map((MapFunction & Serializable)s -> new Tuple2(s, (Object)1)).returns(TypeInformation.of((TypeHint)new TypeHint<Tuple2<String, Integer>>(){})).keyBy((KeySelector & Serializable)tuple -> (String)tuple.f0).sum(1);
        DataStreamJavaITCase.testResult(resultStream, new Tuple2[]{new Tuple2((Object)"ALICE", (Object)2), new Tuple2((Object)"BOB", (Object)1), new Tuple2((Object)"LILY", (Object)3)});
    }

    @Test
    public void testComplexUnifiedPipelineStreaming() {
        Table resultTable = this.getComplexUnifiedPipeline(this.env);
        DataStreamJavaITCase.testResult(resultTable.execute(), Row.of((Object[])new Object[]{"Bob", 1L}), Row.of((Object[])new Object[]{"Bob", 2L}), Row.of((Object[])new Object[]{"Bob", 3L}), Row.of((Object[])new Object[]{"Alice", 1L}));
    }

    @Test
    public void testAttachAsDataStream() throws Exception {
        StreamTableEnvironment tableEnv = StreamTableEnvironment.create((StreamExecutionEnvironment)this.env);
        String input1DataId = TestValuesTableFactory.registerData(Arrays.asList(Row.of((Object[])new Object[]{1, "a"}), Row.of((Object[])new Object[]{2, "b"})));
        tableEnv.createTemporaryTable("InputTable1", TableDescriptor.forConnector((String)"values").option("data-id", input1DataId).schema(Schema.newBuilder().column("i", (AbstractDataType)DataTypes.INT()).column("s", (AbstractDataType)DataTypes.STRING()).build()).build());
        tableEnv.createTemporaryTable("OutputTable1", TableDescriptor.forConnector((String)"values").schema(Schema.newBuilder().column("i", (AbstractDataType)DataTypes.INT()).column("s", (AbstractDataType)DataTypes.STRING()).build()).build());
        tableEnv.createTemporaryView("InputTable2", (DataStream)this.env.fromElements((Object[])new Integer[]{1, 2, 3}));
        tableEnv.createTemporaryTable("OutputTable2", TableDescriptor.forConnector((String)"values").schema(Schema.newBuilder().column("i", (AbstractDataType)DataTypes.INT()).build()).build());
        tableEnv.createStatementSet().addInsert("OutputTable1", tableEnv.from("InputTable1")).addInsert("OutputTable2", tableEnv.from("InputTable2")).attachAsDataStream();
        DataStreamJavaITCase.testResult(this.env.fromElements((Object[])new Integer[]{3, 4, 5}), new Integer[]{3, 4, 5});
        MatcherAssert.assertThat(TestValuesTableFactory.getResults("OutputTable1"), (Matcher)Matchers.containsInAnyOrder((Object[])new String[]{"+I[1, a]", "+I[2, b]"}));
        MatcherAssert.assertThat(TestValuesTableFactory.getResults("OutputTable2"), (Matcher)Matchers.containsInAnyOrder((Object[])new String[]{"+I[1]", "+I[2]", "+I[3]"}));
    }

    @Test
    public void testMultiChangelogStreamUpsert() throws Exception {
        StreamTableEnvironment tableEnv = StreamTableEnvironment.create((StreamExecutionEnvironment)this.env);
        this.createTableFromElements(tableEnv, "T1", ChangelogMode.insertOnly(), Schema.newBuilder().column("pk", "INT NOT NULL").column("x", "STRING NOT NULL").primaryKey(new String[]{"pk"}).build(), Arrays.asList(Types.INT, Types.STRING), Row.ofKind((RowKind)RowKind.INSERT, (Object[])new Object[]{1, "1"}), Row.ofKind((RowKind)RowKind.INSERT, (Object[])new Object[]{2, "2"}));
        this.createTableFromElements(tableEnv, "T2", ChangelogMode.upsert(), Schema.newBuilder().column("pk", "INT NOT NULL").column("y", "STRING NOT NULL").column("some_value", "DOUBLE NOT NULL").primaryKey(new String[]{"pk"}).build(), Arrays.asList(Types.INT, Types.STRING, Types.DOUBLE), Row.ofKind((RowKind)RowKind.INSERT, (Object[])new Object[]{1, "A", 1.0}), Row.ofKind((RowKind)RowKind.INSERT, (Object[])new Object[]{2, "B", 2.0}), Row.ofKind((RowKind)RowKind.UPDATE_AFTER, (Object[])new Object[]{1, "A", 1.1}), Row.ofKind((RowKind)RowKind.UPDATE_AFTER, (Object[])new Object[]{2, "B", 2.1}));
        this.createTableFromElements(tableEnv, "T3", ChangelogMode.insertOnly(), Schema.newBuilder().column("pk1", "STRING NOT NULL").column("pk2", "STRING NOT NULL").column("some_other_value", "DOUBLE NOT NULL").primaryKey(new String[]{"pk1", "pk2"}).build(), Arrays.asList(Types.STRING, Types.STRING, Types.DOUBLE), Row.ofKind((RowKind)RowKind.INSERT, (Object[])new Object[]{"1", "A", 10.0}), Row.ofKind((RowKind)RowKind.INSERT, (Object[])new Object[]{"1", "B", 11.0}));
        Table resultTable = tableEnv.sqlQuery("SELECT\nT1.pk,\nT2.some_value * T3.some_other_value,\nT3.pk1,\nT3.pk2\nFROM T1\nLEFT JOIN T2 on T1.pk = T2.pk\nLEFT JOIN T3 ON T1.x = T3.pk1 AND T2.y = T3.pk2");
        DataStream resultStream = tableEnv.toChangelogStream(resultTable, Schema.newBuilder().column("pk", "INT NOT NULL").column("some_calculated_value", "DOUBLE").column("pk1", "STRING").column("pk2", "STRING").primaryKey(new String[]{"pk"}).build(), ChangelogMode.upsert());
        DataStreamJavaITCase.testMaterializedResult((DataStream<Row>)resultStream, 0, Row.of((Object[])new Object[]{2, null, null, null}), Row.of((Object[])new Object[]{1, 11.0, "1", "A"}));
    }

    private Table getComplexUnifiedPipeline(StreamExecutionEnvironment env) {
        DataStreamSource allowedNamesStream = env.fromElements((Object[])new String[]{"Bob", "Alice"});
        StreamTableEnvironment tableEnv = StreamTableEnvironment.create((StreamExecutionEnvironment)env);
        tableEnv.createTemporaryView("AllowedNamesTable", tableEnv.fromDataStream((DataStream)allowedNamesStream).as("allowedName", new String[0]));
        Table nameCountTable = tableEnv.sqlQuery("SELECT name, COUNT(*) AS c FROM (VALUES ('Bob'), ('Alice'), ('Greg'), ('Bob')) AS NameTable(name) WHERE name IN (SELECT allowedName FROM AllowedNamesTable)GROUP BY name");
        DataStream nameCountStream = tableEnv.toChangelogStream(nameCountTable);
        SingleOutputStreamOperator updatesPerNameStream = nameCountStream.keyBy((KeySelector & Serializable)r -> (String)r.getFieldAs("name")).process((KeyedProcessFunction)new KeyedProcessFunction<String, Row, Tuple2<String, Long>>(){
            ValueState<Long> count;

            public void open(Configuration parameters) {
                this.count = this.getRuntimeContext().getState(new ValueStateDescriptor("count", Long.class));
            }

            public void processElement(Row r, KeyedProcessFunction.Context ctx, Collector<Tuple2<String, Long>> out) throws IOException {
                Long currentCount = (Long)this.count.value();
                if (currentCount == null) {
                    currentCount = 0L;
                }
                long updatedCount = currentCount + 1L;
                this.count.update((Object)updatedCount);
                out.collect((Object)Tuple2.of((Object)ctx.getCurrentKey(), (Object)updatedCount));
            }
        });
        tableEnv.createTemporaryView("UpdatesPerName", (DataStream)updatesPerNameStream);
        return tableEnv.sqlQuery("SELECT DISTINCT f0, f1 FROM UpdatesPerName");
    }

    private DataStream<Tuple3<Long, Integer, String>> getWatermarkedDataStream() {
        DataStreamSource dataStream = this.env.fromCollection(Arrays.asList(Tuple3.of((Object)1L, (Object)42, (Object)"a"), Tuple3.of((Object)2L, (Object)5, (Object)"a"), Tuple3.of((Object)3L, (Object)1000, (Object)"c"), Tuple3.of((Object)100L, (Object)1000, (Object)"c")), Types.TUPLE((TypeInformation[])new TypeInformation[]{Types.LONG, Types.INT, Types.STRING}));
        return dataStream.assignTimestampsAndWatermarks(WatermarkStrategy.forMonotonousTimestamps().withTimestampAssigner((TimestampAssignerSupplier & Serializable)ctx -> (element, recordTimestamp) -> (Long)element.f0));
    }

    private static Either<Row, Row> input(RowKind kind, Object ... fields) {
        return Either.Left((Object)Row.ofKind((RowKind)kind, (Object[])fields));
    }

    private static Row[] getInput(List<Either<Row, Row>> inputOrOutput) {
        return (Row[])inputOrOutput.stream().filter(Either::isLeft).map(Either::left).toArray(Row[]::new);
    }

    private static Either<Row, Row> output(RowKind kind, Object ... fields) {
        return Either.Right((Object)Row.ofKind((RowKind)kind, (Object[])fields));
    }

    private static Row[] getOutput(List<Either<Row, Row>> inputOrOutput) {
        return (Row[])inputOrOutput.stream().filter(Either::isRight).map(Either::right).toArray(Row[]::new);
    }

    private void createTableFromElements(StreamTableEnvironment tableEnv, String name, ChangelogMode changelogMode, Schema schema, List<TypeInformation<?>> fieldTypeInfo, Row ... elements) {
        String[] fieldNames = (String[])schema.getColumns().stream().map(Schema.UnresolvedColumn::getName).toArray(String[]::new);
        TypeInformation[] fieldTypes = fieldTypeInfo.toArray(new TypeInformation[0]);
        SingleOutputStreamOperator dataStream = this.env.fromElements((Object[])elements).returns(Types.ROW_NAMED((String[])fieldNames, (TypeInformation[])fieldTypes));
        Table table = tableEnv.fromChangelogStream((DataStream)dataStream, schema, changelogMode);
        tableEnv.createTemporaryView(name, table);
    }

    private static void testSchema(Table table, Column ... expectedColumns) {
        Assert.assertEquals((Object)ResolvedSchema.of((Column[])expectedColumns), (Object)table.getResolvedSchema());
    }

    private static void testSchema(Table table, ResolvedSchema expectedSchema) {
        Assert.assertEquals((Object)expectedSchema, (Object)table.getResolvedSchema());
    }

    private static void testSchema(TableResult result, Column ... expectedColumns) {
        Assert.assertEquals((Object)ResolvedSchema.of((Column[])expectedColumns), (Object)result.getResolvedSchema());
    }

    private static void testResult(TableResult result, Row ... expectedRows) {
        List actualRows = CollectionUtil.iteratorToList((Iterator)result.collect());
        MatcherAssert.assertThat((Object)actualRows, (Matcher)Matchers.containsInAnyOrder((Object[])expectedRows));
    }

    @SafeVarargs
    private static <T> void testResult(DataStream<T> dataStream, T ... expectedResult) throws Exception {
        try (CloseableIterator iterator = dataStream.executeAndCollect();){
            List list = CollectionUtil.iteratorToList((Iterator)iterator);
            MatcherAssert.assertThat((Object)list, (Matcher)Matchers.containsInAnyOrder((Object[])expectedResult));
        }
    }

    private static void testMaterializedResult(DataStream<Row> dataStream, int primaryKeyPos, Row ... expectedResult) throws Exception {
        try (CloseableIterator iterator = dataStream.executeAndCollect();){
            ArrayList materializedResult = new ArrayList();
            iterator.forEachRemaining(row -> {
                RowKind kind = row.getKind();
                row.setKind(RowKind.INSERT);
                switch (kind) {
                    case UPDATE_AFTER: {
                        Object primaryKeyValue = row.getField(primaryKeyPos);
                        assert (primaryKeyValue != null);
                        materializedResult.removeIf(r -> primaryKeyValue.equals(r.getField(primaryKeyPos)));
                    }
                    case INSERT: {
                        materializedResult.add(row);
                        break;
                    }
                    case UPDATE_BEFORE: 
                    case DELETE: {
                        materializedResult.remove(row);
                    }
                }
            });
            MatcherAssert.assertThat(materializedResult, (Matcher)Matchers.containsInAnyOrder((Object[])expectedResult));
        }
    }

    public static class ComplexPojo {
        public int c;
        public String a;
        public ImmutablePojo p;

        static ComplexPojo of(int c, String a, ImmutablePojo p) {
            ComplexPojo complexPojo = new ComplexPojo();
            complexPojo.c = c;
            complexPojo.a = a;
            complexPojo.p = p;
            return complexPojo;
        }

        public boolean equals(Object o) {
            if (this == o) {
                return true;
            }
            if (o == null || this.getClass() != o.getClass()) {
                return false;
            }
            ComplexPojo that = (ComplexPojo)o;
            return this.c == that.c && Objects.equals(this.a, that.a) && Objects.equals(this.p, that.p);
        }

        public int hashCode() {
            return Objects.hash(this.c, this.a, this.p);
        }
    }

    public static class ImmutablePojo {
        private final Boolean b;
        private final Double d;

        public ImmutablePojo(Double d, Boolean b) {
            this.d = d;
            this.b = b;
        }

        public Boolean getB() {
            return this.b;
        }

        public Double getD() {
            return this.d;
        }

        public boolean equals(Object o) {
            if (this == o) {
                return true;
            }
            if (o == null || this.getClass() != o.getClass()) {
                return false;
            }
            ImmutablePojo that = (ImmutablePojo)o;
            return Objects.equals(this.b, that.b) && Objects.equals(this.d, that.d);
        }

        public int hashCode() {
            return Objects.hash(this.b, this.d);
        }
    }

    static enum ObjectReuse {
        ENABLED,
        DISABLED;

    }
}

