/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.table.planner.plan.nodes.exec.common;

import java.time.Instant;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.ExecutionException;
import java.util.stream.Collectors;
import org.apache.flink.api.connector.sink.Sink;
import org.apache.flink.api.connector.sink.SinkWriter;
import org.apache.flink.core.io.SimpleVersionedSerializer;
import org.apache.flink.core.testutils.FlinkAssertions;
import org.apache.flink.runtime.testutils.MiniClusterResourceConfiguration;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSink;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.sink.SinkFunction;
import org.apache.flink.streaming.api.functions.source.SourceFunction;
import org.apache.flink.streaming.api.transformations.SinkV1Adapter;
import org.apache.flink.streaming.api.watermark.Watermark;
import org.apache.flink.streaming.runtime.operators.sink.TestSink;
import org.apache.flink.table.api.DataTypes;
import org.apache.flink.table.api.ExplainDetail;
import org.apache.flink.table.api.Schema;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.TableDescriptor;
import org.apache.flink.table.api.TableResult;
import org.apache.flink.table.api.ValidationException;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
import org.apache.flink.table.api.config.ExecutionConfigOptions;
import org.apache.flink.table.connector.ProviderContext;
import org.apache.flink.table.connector.sink.DataStreamSinkProvider;
import org.apache.flink.table.connector.sink.DynamicTableSink;
import org.apache.flink.table.connector.sink.SinkProvider;
import org.apache.flink.table.connector.sink.SinkV2Provider;
import org.apache.flink.table.connector.source.DynamicTableSource;
import org.apache.flink.table.connector.source.ScanTableSource;
import org.apache.flink.table.connector.source.SourceFunctionProvider;
import org.apache.flink.table.data.RowData;
import org.apache.flink.table.planner.factories.TableFactoryHarness;
import org.apache.flink.table.types.AbstractDataType;
import org.apache.flink.table.types.DataType;
import org.apache.flink.test.junit5.MiniClusterExtension;
import org.apache.flink.testutils.junit.SharedObjectsExtension;
import org.apache.flink.testutils.junit.SharedReference;
import org.apache.flink.testutils.junit.extensions.parameterized.Parameter;
import org.apache.flink.testutils.junit.extensions.parameterized.ParameterizedTestExtension;
import org.apache.flink.testutils.junit.extensions.parameterized.Parameters;
import org.apache.flink.types.Row;
import org.assertj.core.api.AbstractThrowableAssert;
import org.assertj.core.api.Assertions;
import org.assertj.core.api.ThrowingConsumer;
import org.jetbrains.annotations.NotNull;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.TestTemplate;
import org.junit.jupiter.api.extension.ExtendWith;
import org.junit.jupiter.api.extension.RegisterExtension;

@ExtendWith(value={ParameterizedTestExtension.class})
class CommonExecSinkITCase {
    private static final int PARALLELISM = 4;
    @RegisterExtension
    private static final MiniClusterExtension MINI_CLUSTER_EXTENSION = new MiniClusterExtension(new MiniClusterResourceConfiguration.Builder().setNumberTaskManagers(1).setNumberSlotsPerTaskManager(4).build());
    @RegisterExtension
    private final SharedObjectsExtension sharedObjects = SharedObjectsExtension.create();
    @Parameter
    private boolean useSinkV2;
    private StreamExecutionEnvironment env;

    CommonExecSinkITCase() {
    }

    @Parameters
    private static Collection<Boolean> useSinkV2() {
        return Arrays.asList(true, false);
    }

    @BeforeEach
    void before() {
        this.env = StreamExecutionEnvironment.getExecutionEnvironment();
        this.env.setParallelism(4);
    }

    @TestTemplate
    void testStreamRecordTimestampInserterSinkRuntimeProvider() throws ExecutionException, InterruptedException {
        StreamTableEnvironment tableEnv = StreamTableEnvironment.create((StreamExecutionEnvironment)this.env);
        SharedReference timestamps = this.sharedObjects.add(new ArrayList());
        List<Row> rows = Arrays.asList(Row.of((Object[])new Object[]{1, "foo", Instant.parse("2020-11-10T12:34:56.123Z")}), Row.of((Object[])new Object[]{2, "foo", Instant.parse("2020-11-10T11:34:56.789Z")}), Row.of((Object[])new Object[]{3, "foo", Instant.parse("2020-11-11T10:11:22.777Z")}), Row.of((Object[])new Object[]{4, "foo", Instant.parse("2020-11-11T10:11:23.888Z")}));
        TableDescriptor sourceDescriptor = TableFactoryHarness.newBuilder().schema(CommonExecSinkITCase.schemaStreamRecordTimestampInserter(true)).source(new TestSource(rows)).sink(this.buildRuntimeSinkProvider(new TestTimestampWriter(timestamps))).build();
        tableEnv.createTable("T1", sourceDescriptor);
        String sqlStmt = "INSERT INTO T1 SELECT * FROM T1";
        CommonExecSinkITCase.assertPlan(tableEnv, "INSERT INTO T1 SELECT * FROM T1", true);
        tableEnv.executeSql("INSERT INTO T1 SELECT * FROM T1").await();
        CommonExecSinkITCase.assertTimestampResults((SharedReference<List<Long>>)timestamps, rows);
    }

    @TestTemplate
    void testStreamRecordTimestampInserterDataStreamSinkProvider() throws ExecutionException, InterruptedException {
        StreamTableEnvironment tableEnv = StreamTableEnvironment.create((StreamExecutionEnvironment)this.env);
        final SharedReference timestamps = this.sharedObjects.add(new ArrayList());
        List<Row> rows = Arrays.asList(Row.of((Object[])new Object[]{1, "foo", Instant.parse("2020-11-10T11:34:56.123Z")}), Row.of((Object[])new Object[]{2, "foo", Instant.parse("2020-11-10T12:34:56.789Z")}), Row.of((Object[])new Object[]{3, "foo", Instant.parse("2020-11-11T10:11:22.777Z")}), Row.of((Object[])new Object[]{4, "foo", Instant.parse("2020-11-11T10:11:23.888Z")}));
        SinkFunction<RowData> sinkFunction = new SinkFunction<RowData>(){

            public void invoke(RowData value, SinkFunction.Context context) {
                CommonExecSinkITCase.addElement(timestamps, context.timestamp());
            }
        };
        TableDescriptor sourceDescriptor = TableFactoryHarness.newBuilder().schema(CommonExecSinkITCase.schemaStreamRecordTimestampInserter(true)).source(new TestSource(rows)).sink(new TableFactoryHarness.SinkBase((SinkFunction)sinkFunction){
            final /* synthetic */ SinkFunction val$sinkFunction;
            {
                this.val$sinkFunction = sinkFunction;
            }

            public DataStreamSinkProvider getSinkRuntimeProvider(DynamicTableSink.Context context) {
                return new DataStreamSinkProvider(){

                    public DataStreamSink<?> consumeDataStream(ProviderContext providerContext, DataStream<RowData> dataStream) {
                        return dataStream.addSink(val$sinkFunction);
                    }
                };
            }
        }).build();
        tableEnv.createTable("T1", sourceDescriptor);
        String sqlStmt = "INSERT INTO T1 SELECT * FROM T1";
        CommonExecSinkITCase.assertPlan(tableEnv, "INSERT INTO T1 SELECT * FROM T1", true);
        tableEnv.executeSql("INSERT INTO T1 SELECT * FROM T1").await();
        Collections.sort((List)timestamps.get());
        CommonExecSinkITCase.assertTimestampResults((SharedReference<List<Long>>)timestamps, rows);
    }

    @TestTemplate
    void testStreamRecordTimestampInserterNotApplied() {
        StreamTableEnvironment tableEnv = StreamTableEnvironment.create((StreamExecutionEnvironment)this.env);
        SharedReference timestamps = this.sharedObjects.add(new ArrayList());
        List<Row> rows = Arrays.asList(Row.of((Object[])new Object[]{1, "foo", Instant.parse("2020-11-10T11:34:56.123Z")}), Row.of((Object[])new Object[]{2, "foo", Instant.parse("2020-11-10T12:34:56.789Z")}), Row.of((Object[])new Object[]{3, "foo", Instant.parse("2020-11-11T10:11:22.777Z")}), Row.of((Object[])new Object[]{4, "foo", Instant.parse("2020-11-11T10:11:23.888Z")}));
        TableDescriptor sourceDescriptor = TableFactoryHarness.newBuilder().schema(CommonExecSinkITCase.schemaStreamRecordTimestampInserter(false)).source(new TestSource(rows)).sink(this.buildRuntimeSinkProvider(new TestTimestampWriter(timestamps))).build();
        tableEnv.createTable("T1", sourceDescriptor);
        CommonExecSinkITCase.assertPlan(tableEnv, "INSERT INTO T1 SELECT * FROM T1", false);
    }

    @TestTemplate
    void testUnifiedSinksAreUsableWithDataStreamSinkProvider() throws ExecutionException, InterruptedException {
        StreamTableEnvironment tableEnv = StreamTableEnvironment.create((StreamExecutionEnvironment)this.env);
        SharedReference fetched = this.sharedObjects.add(new ArrayList());
        List<Row> rows = Arrays.asList(Row.of((Object[])new Object[]{1}), Row.of((Object[])new Object[]{2}));
        TableDescriptor sourceDescriptor = TableFactoryHarness.newBuilder().schema(Schema.newBuilder().column("a", (AbstractDataType)DataTypes.INT()).build()).source(new TestSource(rows)).sink(this.buildDataStreamSinkProvider((SharedReference<List<RowData>>)fetched)).build();
        tableEnv.createTable("T1", sourceDescriptor);
        String sqlStmt = "INSERT INTO T1 SELECT * FROM T1";
        tableEnv.executeSql("INSERT INTO T1 SELECT * FROM T1").await();
        List fetchedRows = ((List)fetched.get()).stream().map(r -> r.getInt(0)).sorted().collect(Collectors.toList());
        Assertions.assertThat((int)((Integer)fetchedRows.get(0))).isEqualTo(1);
        Assertions.assertThat((int)((Integer)fetchedRows.get(1))).isEqualTo(2);
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @TestTemplate
    void testCharLengthEnforcer() throws ExecutionException, InterruptedException {
        StreamTableEnvironment tableEnv = StreamTableEnvironment.create((StreamExecutionEnvironment)this.env);
        List<Row> rows = Arrays.asList(Row.of((Object[])new Object[]{1, "Apache Flink", "SQL RuleZ", 11, 111, "SQL"}), Row.of((Object[])new Object[]{2, "Apache", "SQL", 22, 222, "Flink"}), Row.of((Object[])new Object[]{3, "Apache", "Flink", 33, 333, "Apache Flink SQL"}), Row.of((Object[])new Object[]{4, "Flink Project", "SQL or SeQueL?", 44, 444, "Apache Flink SQL"}));
        TableDescriptor sourceDescriptor = TableFactoryHarness.newBuilder().schema(CommonExecSinkITCase.schemaForCharLengthEnforcer()).source(new TestSource(rows)).build();
        tableEnv.createTable("T1", sourceDescriptor);
        TableResult result = tableEnv.executeSql("SELECT * FROM T1");
        result.await();
        ArrayList results = new ArrayList();
        result.collect().forEachRemaining(results::add);
        Assertions.assertThat(results).containsExactlyInAnyOrderElementsOf(rows);
        try {
            tableEnv.getConfig().set(ExecutionConfigOptions.TABLE_EXEC_SINK_TYPE_LENGTH_ENFORCER.key(), ExecutionConfigOptions.TypeLengthEnforcer.TRIM_PAD.name());
            result = tableEnv.executeSql("SELECT * FROM T1");
            result.await();
            List<Row> expected = Arrays.asList(Row.of((Object[])new Object[]{1, "Apache F", "SQL Ru", 11, 111, "SQL"}), Row.of((Object[])new Object[]{2, "Apache  ", "SQL   ", 22, 222, "Flink"}), Row.of((Object[])new Object[]{3, "Apache  ", "Flink ", 33, 333, "Apache"}), Row.of((Object[])new Object[]{4, "Flink Pr", "SQL or", 44, 444, "Apache"}));
            ArrayList resultsTrimmed = new ArrayList();
            result.collect().forEachRemaining(resultsTrimmed::add);
            Assertions.assertThat(resultsTrimmed).containsExactlyInAnyOrderElementsOf(expected);
        }
        finally {
            tableEnv.getConfig().set(ExecutionConfigOptions.TABLE_EXEC_SINK_TYPE_LENGTH_ENFORCER.key(), ExecutionConfigOptions.TypeLengthEnforcer.IGNORE.name());
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @TestTemplate
    void testBinaryLengthEnforcer() throws ExecutionException, InterruptedException {
        StreamTableEnvironment tableEnv = StreamTableEnvironment.create((StreamExecutionEnvironment)this.env);
        List<Row> rows = Arrays.asList(Row.of((Object[])new Object[]{1, new byte[]{1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12}, new byte[]{1, 2, 3, 4, 5, 6, 7, 8}, 11, 111, new byte[]{1, 2, 3}}), Row.of((Object[])new Object[]{2, new byte[]{1, 2, 3, 4, 5}, new byte[]{1, 2, 3}, 22, 222, new byte[]{1, 2, 3, 4, 5, 6}}), Row.of((Object[])new Object[]{3, new byte[]{1, 2, 3, 4, 5, 6}, new byte[]{1, 2, 3, 4, 5}, 33, 333, new byte[]{1, 2, 3, 4, 5, 6, 7, 8}}), Row.of((Object[])new Object[]{4, new byte[]{1, 2, 3, 4, 5, 6, 7, 8}, new byte[]{1, 2, 3, 4, 5, 6}, 44, 444, new byte[]{1, 2, 3, 4, 5, 6, 7, 8, 9, 10}}));
        TableDescriptor sourceDescriptor = TableFactoryHarness.newBuilder().schema(CommonExecSinkITCase.schemaForBinaryLengthEnforcer()).source(new TestSource(rows)).build();
        tableEnv.createTable("T1", sourceDescriptor);
        TableResult result = tableEnv.executeSql("SELECT * FROM T1");
        result.await();
        ArrayList results = new ArrayList();
        result.collect().forEachRemaining(results::add);
        Assertions.assertThat(results).containsExactlyInAnyOrderElementsOf(rows);
        try {
            tableEnv.getConfig().set(ExecutionConfigOptions.TABLE_EXEC_SINK_TYPE_LENGTH_ENFORCER.key(), ExecutionConfigOptions.TypeLengthEnforcer.TRIM_PAD.name());
            result = tableEnv.executeSql("SELECT * FROM T1");
            result.await();
            List<Row> expected = Arrays.asList(Row.of((Object[])new Object[]{1, new byte[]{1, 2, 3, 4, 5, 6, 7, 8}, new byte[]{1, 2, 3, 4, 5, 6}, 11, 111, new byte[]{1, 2, 3}}), Row.of((Object[])new Object[]{2, new byte[]{1, 2, 3, 4, 5, 0, 0, 0}, new byte[]{1, 2, 3, 0, 0, 0}, 22, 222, new byte[]{1, 2, 3, 4, 5, 6}}), Row.of((Object[])new Object[]{3, new byte[]{1, 2, 3, 4, 5, 6, 0, 0}, new byte[]{1, 2, 3, 4, 5, 0}, 33, 333, new byte[]{1, 2, 3, 4, 5, 6}}), Row.of((Object[])new Object[]{4, new byte[]{1, 2, 3, 4, 5, 6, 7, 8}, new byte[]{1, 2, 3, 4, 5, 6}, 44, 444, new byte[]{1, 2, 3, 4, 5, 6}}));
            ArrayList resultsTrimmed = new ArrayList();
            result.collect().forEachRemaining(resultsTrimmed::add);
            Assertions.assertThat(resultsTrimmed).containsExactlyInAnyOrderElementsOf(expected);
        }
        finally {
            tableEnv.getConfig().set(ExecutionConfigOptions.TABLE_EXEC_SINK_TYPE_LENGTH_ENFORCER.key(), ExecutionConfigOptions.TypeLengthEnforcer.IGNORE.name());
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @TestTemplate
    void testNullEnforcer() throws ExecutionException, InterruptedException {
        StreamTableEnvironment tableEnv = StreamTableEnvironment.create((StreamExecutionEnvironment)this.env);
        List<Row> rows = Arrays.asList(Row.of((Object[])new Object[]{1, "Apache", 11}), Row.of((Object[])new Object[]{2, null, 22}), Row.of((Object[])new Object[]{null, "Flink", 33}), Row.of((Object[])new Object[]{null, null, 44}));
        SharedReference results = this.sharedObjects.add(new ArrayList());
        tableEnv.createTable("T1", TableFactoryHarness.newBuilder().schema(CommonExecSinkITCase.schemaForNotNullEnforcer()).source(new TestSource(rows)).sink(this.buildRuntimeSinkProvider(new RecordWriter(results))).build());
        ((AbstractThrowableAssert)Assertions.assertThatThrownBy(() -> tableEnv.executeSql("INSERT INTO T1 SELECT * FROM T1").await()).isInstanceOf(ExecutionException.class)).satisfies(new ThrowingConsumer[]{FlinkAssertions.anyCauseMatches((String)"Column 'b' is NOT NULL, however, a null value is being written into it. You can set job configuration 'table.exec.sink.not-null-enforcer'='DROP' to suppress this exception and drop such records silently.")});
        ((List)results.get()).clear();
        ((AbstractThrowableAssert)Assertions.assertThatThrownBy(() -> tableEnv.executeSql("INSERT INTO T1(a, b) SELECT (a, b) FROM T1").await()).isInstanceOf(ValidationException.class)).hasMessage("SQL validation failed. At line 0, column 0: Column 'c' has no default value and does not allow NULLs");
        try {
            tableEnv.getConfig().set(ExecutionConfigOptions.TABLE_EXEC_SINK_NOT_NULL_ENFORCER.key(), ExecutionConfigOptions.NotNullEnforcer.DROP.name());
            ((List)results.get()).clear();
            tableEnv.executeSql("INSERT INTO T1 SELECT * FROM T1").await();
            Assertions.assertThat((int)((List)results.get()).size()).isEqualTo(2);
            Assertions.assertThat((int)((RowData)((List)results.get()).get(0)).getInt(0)).isEqualTo(1);
            Assertions.assertThat((String)((RowData)((List)results.get()).get(0)).getString(1).toString()).isEqualTo("Apache");
            Assertions.assertThat((int)((RowData)((List)results.get()).get(0)).getInt(2)).isEqualTo(11);
            Assertions.assertThat((boolean)((RowData)((List)results.get()).get(1)).isNullAt(0)).isTrue();
            Assertions.assertThat((String)((RowData)((List)results.get()).get(1)).getString(1).toString()).isEqualTo("Flink");
            Assertions.assertThat((int)((RowData)((List)results.get()).get(1)).getInt(2)).isEqualTo(33);
        }
        finally {
            tableEnv.getConfig().set(ExecutionConfigOptions.TABLE_EXEC_SINK_NOT_NULL_ENFORCER.key(), ExecutionConfigOptions.NotNullEnforcer.ERROR.name());
        }
    }

    @TestTemplate
    void testFromValuesWatermarkPropagation() throws Exception {
        StreamTableEnvironment tableEnv = StreamTableEnvironment.create((StreamExecutionEnvironment)this.env);
        final SharedReference watermarks = this.sharedObjects.add(new ArrayList());
        SinkFunction<RowData> sinkFunction = new SinkFunction<RowData>(){

            public void writeWatermark(org.apache.flink.api.common.eventtime.Watermark watermark) {
                CommonExecSinkITCase.addElement(watermarks, watermark.getTimestamp());
            }
        };
        TableDescriptor sinkDescriptor = TableFactoryHarness.newBuilder().sink(new TableFactoryHarness.SinkBase((SinkFunction)sinkFunction){
            final /* synthetic */ SinkFunction val$sinkFunction;
            {
                this.val$sinkFunction = sinkFunction;
            }

            public DataStreamSinkProvider getSinkRuntimeProvider(DynamicTableSink.Context context) {
                return new DataStreamSinkProvider(){

                    public DataStreamSink<?> consumeDataStream(ProviderContext providerContext, DataStream<RowData> dataStream) {
                        return dataStream.addSink(val$sinkFunction);
                    }
                };
            }
        }).build();
        Table source = tableEnv.fromValues((AbstractDataType)DataTypes.ROW((DataTypes.Field[])new DataTypes.Field[]{DataTypes.FIELD((String)"a", (DataType)DataTypes.INT())}), new Object[]{Row.of((Object[])new Object[]{1}), Row.of((Object[])new Object[]{2}), Row.of((Object[])new Object[]{3})});
        source.executeInsert(sinkDescriptor).await();
        Assertions.assertThat((int)((List)watermarks.get()).size()).isEqualTo(this.env.getParallelism());
        for (Long watermark : (List)watermarks.get()) {
            Assertions.assertThat((Long)watermark).isEqualTo(Watermark.MAX_WATERMARK.getTimestamp());
        }
    }

    private static <T> void addElement(SharedReference<List<T>> elements, T element) {
        elements.applySync(l -> l.add(element));
    }

    private static TestSink<RowData> buildRecordWriterTestSink(TestSink.DefaultSinkWriter<RowData> writer) {
        return TestSink.newBuilder().setWriter(writer).setCommittableSerializer((SimpleVersionedSerializer)TestSink.StringCommittableSerializer.INSTANCE).build();
    }

    private TableFactoryHarness.SinkBase buildRuntimeSinkProvider(final TestSink.DefaultSinkWriter<RowData> writer) {
        return new TableFactoryHarness.SinkBase(){

            @Override
            public DynamicTableSink.SinkRuntimeProvider getSinkRuntimeProvider(DynamicTableSink.Context context) {
                TestSink sink = CommonExecSinkITCase.buildRecordWriterTestSink((TestSink.DefaultSinkWriter<RowData>)writer);
                if (CommonExecSinkITCase.this.useSinkV2) {
                    return SinkV2Provider.of((org.apache.flink.api.connector.sink2.Sink)SinkV1Adapter.wrap((Sink)sink));
                }
                return SinkProvider.of((Sink)sink);
            }
        };
    }

    @NotNull
    private TableFactoryHarness.SinkBase buildDataStreamSinkProvider(final SharedReference<List<RowData>> fetched) {
        return new TableFactoryHarness.SinkBase(){

            public DataStreamSinkProvider getSinkRuntimeProvider(DynamicTableSink.Context context) {
                return new DataStreamSinkProvider(){

                    public DataStreamSink<?> consumeDataStream(ProviderContext providerContext, DataStream<RowData> dataStream) {
                        TestSink sink = CommonExecSinkITCase.buildRecordWriterTestSink((TestSink.DefaultSinkWriter<RowData>)new RecordWriter(fetched));
                        if (CommonExecSinkITCase.this.useSinkV2) {
                            return dataStream.sinkTo(SinkV1Adapter.wrap((Sink)sink));
                        }
                        return dataStream.sinkTo((Sink)sink);
                    }
                };
            }
        };
    }

    private static void assertPlan(StreamTableEnvironment tableEnv, String sql, boolean containsStreamRecordTimestampInserter) {
        String explainStr = tableEnv.explainSql(sql, new ExplainDetail[]{ExplainDetail.JSON_EXECUTION_PLAN});
        String containedStr = "StreamRecordTimestampInserter(rowtime field: 2";
        if (containsStreamRecordTimestampInserter) {
            Assertions.assertThat((String)explainStr).contains(new CharSequence[]{"StreamRecordTimestampInserter(rowtime field: 2"});
        } else {
            Assertions.assertThat((String)explainStr).doesNotContain(new CharSequence[]{"StreamRecordTimestampInserter(rowtime field: 2"});
        }
    }

    private static Schema schemaStreamRecordTimestampInserter(boolean withWatermark) {
        Schema.Builder builder = Schema.newBuilder().column("a", "INT").column("b", "STRING").column("ts", "TIMESTAMP_LTZ(3)");
        if (withWatermark) {
            builder.watermark("ts", "ts");
        }
        return builder.build();
    }

    private static Schema schemaForCharLengthEnforcer() {
        return Schema.newBuilder().column("a", "INT").column("b", "CHAR(8)").column("c", "CHAR(6)").column("d", "INT").column("e", "INT").column("f", "VARCHAR(6)").build();
    }

    private static Schema schemaForBinaryLengthEnforcer() {
        return Schema.newBuilder().column("a", "INT").column("b", "BINARY(8)").column("c", "BINARY(6)").column("d", "INT").column("e", "INT").column("f", "VARBINARY(6)").build();
    }

    private static Schema schemaForNotNullEnforcer() {
        return Schema.newBuilder().column("a", "INT").column("b", "STRING NOT NULL").column("c", "INT NOT NULL").build();
    }

    private static void assertTimestampResults(SharedReference<List<Long>> timestamps, List<Row> rows) {
        Assertions.assertThat((List)((List)timestamps.get())).hasSize(rows.size());
        for (int i = 0; i < rows.size(); ++i) {
            Assertions.assertThat((Instant)Instant.ofEpochMilli((Long)((List)timestamps.get()).get(i))).isEqualTo(rows.get(i).getField(2));
        }
    }

    private static class RecordWriter
    extends TestSink.DefaultSinkWriter<RowData> {
        private final SharedReference<List<RowData>> rows;

        private RecordWriter(SharedReference<List<RowData>> rows) {
            this.rows = rows;
        }

        public void write(RowData element, SinkWriter.Context context) {
            CommonExecSinkITCase.addElement(this.rows, element);
            super.write((Object)element, context);
        }
    }

    private static class TestTimestampWriter
    extends TestSink.DefaultSinkWriter<RowData> {
        private final SharedReference<List<Long>> timestamps;

        private TestTimestampWriter(SharedReference<List<Long>> timestamps) {
            this.timestamps = timestamps;
        }

        public void write(RowData element, SinkWriter.Context context) {
            CommonExecSinkITCase.addElement(this.timestamps, context.timestamp());
            super.write((Object)element, context);
        }
    }

    private static class TestSourceFunction
    implements SourceFunction<RowData> {
        private final List<Row> rows;
        private final DynamicTableSource.DataStructureConverter converter;

        public TestSourceFunction(List<Row> rows, DynamicTableSource.DataStructureConverter converter) {
            this.rows = rows;
            this.converter = converter;
        }

        public void run(SourceFunction.SourceContext<RowData> ctx) throws Exception {
            this.rows.stream().map(row -> (RowData)this.converter.toInternal(row)).forEach(arg_0 -> ctx.collect(arg_0));
        }

        public void cancel() {
        }
    }

    private static class TestSource
    extends TableFactoryHarness.ScanSourceBase {
        private final List<Row> rows;

        private TestSource(List<Row> rows) {
            super(false);
            this.rows = rows;
        }

        @Override
        public ScanTableSource.ScanRuntimeProvider getScanRuntimeProvider(ScanTableSource.ScanContext context) {
            DynamicTableSource.DataStructureConverter converter = context.createDataStructureConverter(this.getFactoryContext().getPhysicalRowDataType());
            return SourceFunctionProvider.of((SourceFunction)new TestSourceFunction(this.rows, converter), (boolean)false);
        }
    }
}

