/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.table.planner.factories;

import java.io.IOException;
import java.time.Duration;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.Comparator;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.BiConsumer;
import java.util.function.Function;
import java.util.stream.Collectors;
import javax.annotation.Nullable;
import org.apache.flink.api.common.eventtime.Watermark;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.io.InputFormat;
import org.apache.flink.api.common.io.OutputFormat;
import org.apache.flink.api.common.serialization.SerializerConfig;
import org.apache.flink.api.common.serialization.SerializerConfigImpl;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.api.connector.source.Boundedness;
import org.apache.flink.api.connector.source.Source;
import org.apache.flink.api.java.io.CollectionInputFormat;
import org.apache.flink.configuration.ConfigOption;
import org.apache.flink.configuration.ConfigOptions;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.ReadableConfig;
import org.apache.flink.connector.source.DynamicFilteringValuesSource;
import org.apache.flink.connector.source.TerminatingLogic;
import org.apache.flink.connector.source.ValuesSource;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSink;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.sink.SinkFunction;
import org.apache.flink.streaming.api.functions.source.FromElementsFunction;
import org.apache.flink.streaming.api.functions.source.SourceFunction;
import org.apache.flink.table.api.TableException;
import org.apache.flink.table.api.TableSchema;
import org.apache.flink.table.api.ValidationException;
import org.apache.flink.table.api.WatermarkSpec;
import org.apache.flink.table.catalog.CatalogTable;
import org.apache.flink.table.connector.ChangelogMode;
import org.apache.flink.table.connector.ProviderContext;
import org.apache.flink.table.connector.RuntimeConverter;
import org.apache.flink.table.connector.sink.DataStreamSinkProvider;
import org.apache.flink.table.connector.sink.DynamicTableSink;
import org.apache.flink.table.connector.sink.OutputFormatProvider;
import org.apache.flink.table.connector.sink.SinkFunctionProvider;
import org.apache.flink.table.connector.sink.abilities.SupportsOverwrite;
import org.apache.flink.table.connector.sink.abilities.SupportsPartitioning;
import org.apache.flink.table.connector.sink.abilities.SupportsWritingMetadata;
import org.apache.flink.table.connector.source.AsyncTableFunctionProvider;
import org.apache.flink.table.connector.source.DataStreamScanProvider;
import org.apache.flink.table.connector.source.DynamicTableSource;
import org.apache.flink.table.connector.source.InputFormatProvider;
import org.apache.flink.table.connector.source.LookupTableSource;
import org.apache.flink.table.connector.source.ScanTableSource;
import org.apache.flink.table.connector.source.SourceFunctionProvider;
import org.apache.flink.table.connector.source.SourceProvider;
import org.apache.flink.table.connector.source.TableFunctionProvider;
import org.apache.flink.table.connector.source.abilities.SupportsAggregatePushDown;
import org.apache.flink.table.connector.source.abilities.SupportsDynamicFiltering;
import org.apache.flink.table.connector.source.abilities.SupportsFilterPushDown;
import org.apache.flink.table.connector.source.abilities.SupportsLimitPushDown;
import org.apache.flink.table.connector.source.abilities.SupportsPartitionPushDown;
import org.apache.flink.table.connector.source.abilities.SupportsProjectionPushDown;
import org.apache.flink.table.connector.source.abilities.SupportsReadingMetadata;
import org.apache.flink.table.connector.source.abilities.SupportsSourceWatermark;
import org.apache.flink.table.connector.source.abilities.SupportsWatermarkPushDown;
import org.apache.flink.table.connector.source.lookup.AsyncLookupFunctionProvider;
import org.apache.flink.table.connector.source.lookup.FullCachingLookupProvider;
import org.apache.flink.table.connector.source.lookup.LookupFunctionProvider;
import org.apache.flink.table.connector.source.lookup.LookupOptions;
import org.apache.flink.table.connector.source.lookup.PartialCachingAsyncLookupProvider;
import org.apache.flink.table.connector.source.lookup.PartialCachingLookupProvider;
import org.apache.flink.table.connector.source.lookup.cache.DefaultLookupCache;
import org.apache.flink.table.connector.source.lookup.cache.LookupCache;
import org.apache.flink.table.connector.source.lookup.cache.trigger.CacheReloadTrigger;
import org.apache.flink.table.connector.source.lookup.cache.trigger.PeriodicCacheReloadTrigger;
import org.apache.flink.table.data.GenericRowData;
import org.apache.flink.table.data.RowData;
import org.apache.flink.table.data.util.DataFormatConverters;
import org.apache.flink.table.expressions.AggregateExpression;
import org.apache.flink.table.expressions.FieldReferenceExpression;
import org.apache.flink.table.expressions.ResolvedExpression;
import org.apache.flink.table.factories.DynamicTableFactory;
import org.apache.flink.table.factories.DynamicTableSinkFactory;
import org.apache.flink.table.factories.DynamicTableSourceFactory;
import org.apache.flink.table.factories.FactoryUtil;
import org.apache.flink.table.functions.AsyncLookupFunction;
import org.apache.flink.table.functions.AsyncTableFunction;
import org.apache.flink.table.functions.FunctionDefinition;
import org.apache.flink.table.functions.LookupFunction;
import org.apache.flink.table.functions.TableFunction;
import org.apache.flink.table.planner.codegen.CodeGeneratorContext;
import org.apache.flink.table.planner.codegen.ProjectionCodeGenerator;
import org.apache.flink.table.planner.factories.TestValuesRuntimeFunctions;
import org.apache.flink.table.planner.functions.aggfunctions.Count1AggFunction;
import org.apache.flink.table.planner.functions.aggfunctions.CountAggFunction;
import org.apache.flink.table.planner.functions.aggfunctions.MaxAggFunction;
import org.apache.flink.table.planner.functions.aggfunctions.MinAggFunction;
import org.apache.flink.table.planner.functions.aggfunctions.Sum0AggFunction;
import org.apache.flink.table.planner.functions.aggfunctions.SumAggFunction;
import org.apache.flink.table.planner.runtime.utils.FailingCollectionSource;
import org.apache.flink.table.planner.utils.FilterUtils;
import org.apache.flink.table.planner.utils.JavaScalaConversionUtil;
import org.apache.flink.table.runtime.functions.table.fullcache.inputformat.FullCacheTestInputFormat;
import org.apache.flink.table.runtime.generated.GeneratedProjection;
import org.apache.flink.table.types.DataType;
import org.apache.flink.table.types.logical.BigIntType;
import org.apache.flink.table.types.logical.LogicalType;
import org.apache.flink.table.types.logical.RowType;
import org.apache.flink.table.types.logical.utils.LogicalTypeParser;
import org.apache.flink.table.types.utils.DataTypeUtils;
import org.apache.flink.table.types.utils.TypeConversions;
import org.apache.flink.table.utils.TableSchemaUtils;
import org.apache.flink.types.Row;
import org.apache.flink.types.RowKind;
import org.apache.flink.util.FlinkException;
import org.apache.flink.util.InstantiationUtil;
import org.apache.flink.util.Preconditions;
import org.assertj.core.api.Assertions;
import scala.collection.Seq;

public final class TestValuesTableFactory
implements DynamicTableSourceFactory,
DynamicTableSinkFactory {
    private static final AtomicInteger idCounter = new AtomicInteger(0);
    private static final Map<String, Collection<Row>> registeredData = new HashMap<String, Collection<Row>>();
    private static final Map<String, Collection<RowData>> registeredRowData = new HashMap<String, Collection<RowData>>();
    public static final AtomicInteger RESOURCE_COUNTER = new AtomicInteger();
    public static final String IDENTIFIER = "values";
    private static final ConfigOption<String> DATA_ID = ConfigOptions.key((String)"data-id").stringType().noDefaultValue();
    private static final ConfigOption<Boolean> BOUNDED = ConfigOptions.key((String)"bounded").booleanType().defaultValue((Object)false);
    private static final ConfigOption<Boolean> TERMINATING = ConfigOptions.key((String)"terminating").booleanType().defaultValue((Object)true).withDescription("Declares the behaviour of sources after all data has been produced. It is separate from 'bounded', because even if a source is unbounded it can stop producing records and shutdown.");
    private static final ConfigOption<String> CHANGELOG_MODE = ConfigOptions.key((String)"changelog-mode").stringType().defaultValue((Object)"I");
    private static final ConfigOption<String> RUNTIME_SOURCE = ConfigOptions.key((String)"runtime-source").stringType().defaultValue((Object)"SourceFunction").withDescription("Accepted values are: SourceFunction, InputFormat, DataStream, NewSource");
    private static final ConfigOption<Boolean> FAILING_SOURCE = ConfigOptions.key((String)"failing-source").booleanType().defaultValue((Object)false);
    private static final ConfigOption<String> RUNTIME_SINK = ConfigOptions.key((String)"runtime-sink").stringType().defaultValue((Object)"SinkFunction");
    private static final ConfigOption<String> TABLE_SOURCE_CLASS = ConfigOptions.key((String)"table-source-class").stringType().defaultValue((Object)"DEFAULT");
    private static final ConfigOption<String> TABLE_SINK_CLASS = ConfigOptions.key((String)"table-sink-class").stringType().defaultValue((Object)"DEFAULT");
    private static final ConfigOption<String> LOOKUP_FUNCTION_CLASS = ConfigOptions.key((String)"lookup-function-class").stringType().noDefaultValue();
    private static final ConfigOption<Integer> LOOKUP_THRESHOLD = ConfigOptions.key((String)"start-lookup-threshold").intType().defaultValue((Object)-1).withDescription("The threshold which backend lookup function will not do real lookup for a key (returns null value immediately) until its lookup times beyond");
    private static final ConfigOption<Boolean> ASYNC_ENABLED = ConfigOptions.key((String)"async").booleanType().defaultValue((Object)false);
    private static final ConfigOption<Boolean> DISABLE_LOOKUP = ConfigOptions.key((String)"disable-lookup").booleanType().defaultValue((Object)false);
    private static final ConfigOption<Boolean> SINK_INSERT_ONLY = ConfigOptions.key((String)"sink-insert-only").booleanType().defaultValue((Object)true);
    private static final ConfigOption<Integer> SINK_EXPECTED_MESSAGES_NUM = ConfigOptions.key((String)"sink-expected-messages-num").intType().defaultValue((Object)-1);
    private static final ConfigOption<Boolean> ENABLE_PROJECTION_PUSH_DOWN = ConfigOptions.key((String)"enable-projection-push-down").booleanType().defaultValue((Object)true);
    private static final ConfigOption<Boolean> NESTED_PROJECTION_SUPPORTED = ConfigOptions.key((String)"nested-projection-supported").booleanType().defaultValue((Object)false);
    private static final ConfigOption<List<String>> FILTERABLE_FIELDS = ConfigOptions.key((String)"filterable-fields").stringType().asList().noDefaultValue();
    private static final ConfigOption<List<String>> DYNAMIC_FILTERING_FIELDS = ConfigOptions.key((String)"dynamic-filtering-fields").stringType().asList().noDefaultValue();
    private static final ConfigOption<Boolean> ENABLE_WATERMARK_PUSH_DOWN = ConfigOptions.key((String)"enable-watermark-push-down").booleanType().defaultValue((Object)false);
    private static final ConfigOption<Boolean> INTERNAL_DATA = ConfigOptions.key((String)"register-internal-data").booleanType().defaultValue((Object)false).withDescription("The registered data is internal type data, which can be collected by the source directly.");
    private static final ConfigOption<Map<String, String>> READABLE_METADATA = ConfigOptions.key((String)"readable-metadata").mapType().defaultValue(Collections.emptyMap()).withDescription("Optional map of 'metadata_key:data_type,...'. The order will be alphabetically. The metadata is part of the data when enabled.");
    private static final ConfigOption<Map<String, String>> WRITABLE_METADATA = ConfigOptions.key((String)"writable-metadata").mapType().defaultValue(Collections.emptyMap()).withDescription("Optional map of 'metadata_key:data_type'. The order will be alphabetically. The metadata is part of the data when enabled.");
    private static final ConfigOption<Boolean> SINK_DROP_LATE_EVENT = ConfigOptions.key((String)"sink.drop-late-event").booleanType().defaultValue((Object)false).withDescription("Option to determine whether to discard the late event.");
    private static final ConfigOption<Integer> SOURCE_NUM_ELEMENT_TO_SKIP = ConfigOptions.key((String)"source.num-element-to-skip").intType().defaultValue((Object)-1).withDescription("Option to define the number of elements to skip.");
    private static final ConfigOption<Integer> SOURCE_SLEEP_AFTER_ELEMENTS = ConfigOptions.key((String)"source.sleep-after-elements").intType().defaultValue((Object)-1).withDescription("Option to specify the number of elements to process before sleeping for a specific amount of time. The default value is -1, which means that this process is skipped.");
    private static final ConfigOption<Duration> SOURCE_SLEEP_TIME = ConfigOptions.key((String)"source.sleep-time").durationType().defaultValue((Object)Duration.ofMillis(0L)).withDescription("Option to specify the amount of time to sleep after processing every N elements. The default value is 0, which means that no sleep is performed");
    private static final ConfigOption<List<String>> PARTITION_LIST = ConfigOptions.key((String)"partition-list").stringType().asList().defaultValues((Object[])new String[0]);
    private static final ConfigOption<String> SINK_CHANGELOG_MODE_ENFORCED = ConfigOptions.key((String)"sink-changelog-mode-enforced").stringType().noDefaultValue();
    private static final ConfigOption<Integer> SOURCE_PARALLELISM = FactoryUtil.SOURCE_PARALLELISM;
    private static final ConfigOption<Integer> SINK_PARALLELISM = FactoryUtil.SINK_PARALLELISM;

    public static String registerData(Collection<Row> data) {
        String id = String.valueOf(idCounter.incrementAndGet());
        registeredData.put(id, data);
        return id;
    }

    public static String registerData(Seq<Row> data) {
        return TestValuesTableFactory.registerData(JavaScalaConversionUtil.toJava(data));
    }

    public static String registerRowData(Collection<RowData> data) {
        String id = String.valueOf(idCounter.incrementAndGet());
        registeredRowData.put(id, data);
        return id;
    }

    public static String registerRowData(Seq<RowData> data) {
        return TestValuesTableFactory.registerRowData(JavaScalaConversionUtil.toJava(data));
    }

    public static List<String> getRawResultsAsStrings(String tableName) {
        return TestValuesRuntimeFunctions.getRawResultsAsStrings(tableName);
    }

    public static List<Row> getRawResults(String tableName) {
        return TestValuesRuntimeFunctions.getRawResults(tableName);
    }

    public static List<String> getOnlyRawResultsAsStrings() {
        return TestValuesRuntimeFunctions.getOnlyRawResultsAsStrings();
    }

    public static List<String> getResultsAsStrings(String tableName) {
        return TestValuesRuntimeFunctions.getResultsAsStrings(tableName);
    }

    public static List<Row> getResults(String tableName) {
        return TestValuesRuntimeFunctions.getResults(tableName);
    }

    public static void registerLocalRawResultsObserver(String tableName, BiConsumer<Integer, List<Row>> observer) {
        TestValuesRuntimeFunctions.registerLocalRawResultsObserver(tableName, observer);
    }

    public static List<Watermark> getWatermarkOutput(String tableName) {
        return TestValuesRuntimeFunctions.getWatermarks(tableName);
    }

    public static void clearAllData() {
        registeredData.clear();
        registeredRowData.clear();
        TestValuesRuntimeFunctions.clearResults();
    }

    public static Row changelogRow(String rowKind, Object ... values) {
        RowKind kind = TestValuesTableFactory.parseRowKind(rowKind);
        return Row.ofKind((RowKind)kind, (Object[])values);
    }

    private static RowKind parseRowKind(String rowKindShortString) {
        switch (rowKindShortString) {
            case "+I": {
                return RowKind.INSERT;
            }
            case "-U": {
                return RowKind.UPDATE_BEFORE;
            }
            case "+U": {
                return RowKind.UPDATE_AFTER;
            }
            case "-D": {
                return RowKind.DELETE;
            }
        }
        throw new IllegalArgumentException("Unsupported RowKind string: " + rowKindShortString);
    }

    public String factoryIdentifier() {
        return IDENTIFIER;
    }

    public DynamicTableSource createDynamicTableSource(DynamicTableFactory.Context context) {
        Boundedness boundedness;
        FactoryUtil.TableFactoryHelper helper = FactoryUtil.createTableFactoryHelper((DynamicTableFactory)this, (DynamicTableFactory.Context)context);
        helper.validate();
        ChangelogMode changelogMode = this.parseChangelogMode((String)helper.getOptions().get(CHANGELOG_MODE));
        String runtimeSource = (String)helper.getOptions().get(RUNTIME_SOURCE);
        boolean isBounded = (Boolean)helper.getOptions().get(BOUNDED);
        boolean isFinite = (Boolean)helper.getOptions().get(TERMINATING);
        String dataId = (String)helper.getOptions().get(DATA_ID);
        String sourceClass = (String)helper.getOptions().get(TABLE_SOURCE_CLASS);
        boolean isAsync = (Boolean)helper.getOptions().get(ASYNC_ENABLED);
        String lookupFunctionClass = (String)helper.getOptions().get(LOOKUP_FUNCTION_CLASS);
        boolean disableLookup = (Boolean)helper.getOptions().get(DISABLE_LOOKUP);
        boolean enableProjectionPushDown = (Boolean)helper.getOptions().get(ENABLE_PROJECTION_PUSH_DOWN);
        boolean nestedProjectionSupported = (Boolean)helper.getOptions().get(NESTED_PROJECTION_SUPPORTED);
        boolean enableWatermarkPushDown = (Boolean)helper.getOptions().get(ENABLE_WATERMARK_PUSH_DOWN);
        boolean failingSource = (Boolean)helper.getOptions().get(FAILING_SOURCE);
        int numElementToSkip = (Integer)helper.getOptions().get(SOURCE_NUM_ELEMENT_TO_SKIP);
        boolean internalData = (Boolean)helper.getOptions().get(INTERNAL_DATA);
        int lookupThreshold = (Integer)helper.getOptions().get(LOOKUP_THRESHOLD);
        int sleepAfterElements = (Integer)helper.getOptions().get(SOURCE_SLEEP_AFTER_ELEMENTS);
        long sleepTimeMillis = ((Duration)helper.getOptions().get(SOURCE_SLEEP_TIME)).toMillis();
        Integer parallelism = (Integer)helper.getOptions().get(SOURCE_PARALLELISM);
        DefaultLookupCache cache = null;
        if (((LookupOptions.LookupCacheType)helper.getOptions().get(LookupOptions.CACHE_TYPE)).equals((Object)LookupOptions.LookupCacheType.PARTIAL)) {
            cache = DefaultLookupCache.fromConfig((ReadableConfig)helper.getOptions());
        }
        PeriodicCacheReloadTrigger reloadTrigger = null;
        if (((LookupOptions.LookupCacheType)helper.getOptions().get(LookupOptions.CACHE_TYPE)).equals((Object)LookupOptions.LookupCacheType.FULL)) {
            reloadTrigger = PeriodicCacheReloadTrigger.fromConfig((ReadableConfig)helper.getOptions());
        }
        Optional filterableFields = helper.getOptions().getOptional(FILTERABLE_FIELDS);
        HashSet filterableFieldsSet = new HashSet();
        filterableFields.ifPresent(filterableFieldsSet::addAll);
        Optional dynamicFilteringFields = helper.getOptions().getOptional(DYNAMIC_FILTERING_FIELDS);
        HashSet dynamicFilteringFieldsSet = new HashSet();
        dynamicFilteringFields.ifPresent(dynamicFilteringFieldsSet::addAll);
        Map<String, DataType> readableMetadata = TestValuesTableFactory.convertToMetadataMap((Map)helper.getOptions().get(READABLE_METADATA), context.getClassLoader());
        if (!isFinite && isBounded) {
            throw new IllegalArgumentException("Source can not be bounded and infinite at the same time.");
        }
        TerminatingLogic terminating = isFinite ? TerminatingLogic.FINITE : TerminatingLogic.INFINITE;
        Boundedness boundedness2 = boundedness = isBounded ? Boundedness.BOUNDED : Boundedness.CONTINUOUS_UNBOUNDED;
        if (sourceClass.equals("DEFAULT")) {
            Map<Map<String, String>, Collection<Row>> partition2Rows;
            if (internalData) {
                return new TestValuesScanTableSourceWithInternalData(dataId, isBounded, sleepAfterElements, sleepTimeMillis);
            }
            Collection data = registeredData.getOrDefault(dataId, Collections.emptyList());
            List<Map<String, String>> partitions = TestValuesTableFactory.parsePartitionList((List)helper.getOptions().get(PARTITION_LIST));
            DataType producedDataType = context.getPhysicalRowDataType();
            if (!partitions.isEmpty()) {
                partition2Rows = TestValuesTableFactory.mapPartitionToRow(producedDataType, data, partitions);
            } else {
                partitions = Collections.emptyList();
                partition2Rows = new HashMap<Map<String, String>, Collection<Row>>();
                partition2Rows.put(Collections.emptyMap(), data);
            }
            if (!enableProjectionPushDown) {
                return new TestValuesScanTableSourceWithoutProjectionPushDown(producedDataType, changelogMode, boundedness, terminating, runtimeSource, failingSource, partition2Rows, nestedProjectionSupported, null, Collections.emptyList(), filterableFieldsSet, dynamicFilteringFieldsSet, numElementToSkip, Long.MAX_VALUE, partitions, readableMetadata, null, parallelism);
            }
            if (disableLookup) {
                if (enableWatermarkPushDown) {
                    return new TestValuesScanTableSourceWithWatermarkPushDown(producedDataType, changelogMode, terminating, runtimeSource, failingSource, partition2Rows, context.getObjectIdentifier().getObjectName(), nestedProjectionSupported, (int[][])null, Collections.emptyList(), filterableFieldsSet, dynamicFilteringFieldsSet, numElementToSkip, Long.MAX_VALUE, partitions, readableMetadata, null);
                }
                return new TestValuesScanTableSource(producedDataType, changelogMode, boundedness, terminating, runtimeSource, failingSource, partition2Rows, nestedProjectionSupported, null, Collections.emptyList(), filterableFieldsSet, dynamicFilteringFieldsSet, numElementToSkip, Long.MAX_VALUE, partitions, readableMetadata, null);
            }
            return new TestValuesScanLookupTableSource(context.getCatalogTable().getResolvedSchema().toPhysicalRowDataType(), producedDataType, changelogMode, boundedness, terminating, runtimeSource, failingSource, partition2Rows, isAsync, lookupFunctionClass, nestedProjectionSupported, null, Collections.emptyList(), filterableFieldsSet, dynamicFilteringFieldsSet, numElementToSkip, Long.MAX_VALUE, partitions, readableMetadata, null, (LookupCache)cache, (CacheReloadTrigger)reloadTrigger, lookupThreshold);
        }
        try {
            return (DynamicTableSource)InstantiationUtil.instantiate((String)sourceClass, DynamicTableSource.class, (ClassLoader)Thread.currentThread().getContextClassLoader());
        }
        catch (FlinkException e) {
            throw new TableException("Can't instantiate class " + sourceClass, (Throwable)e);
        }
    }

    public DynamicTableSink createDynamicTableSink(DynamicTableFactory.Context context) {
        FactoryUtil.TableFactoryHelper helper = FactoryUtil.createTableFactoryHelper((DynamicTableFactory)this, (DynamicTableFactory.Context)context);
        helper.validate();
        String sinkClass = (String)helper.getOptions().get(TABLE_SINK_CLASS);
        boolean isInsertOnly = (Boolean)helper.getOptions().get(SINK_INSERT_ONLY);
        String runtimeSink = (String)helper.getOptions().get(RUNTIME_SINK);
        int expectedNum = (Integer)helper.getOptions().get(SINK_EXPECTED_MESSAGES_NUM);
        Integer parallelism = (Integer)helper.getOptions().get(SINK_PARALLELISM);
        boolean dropLateEvent = (Boolean)helper.getOptions().get(SINK_DROP_LATE_EVENT);
        Map<String, DataType> writableMetadata = TestValuesTableFactory.convertToMetadataMap((Map)helper.getOptions().get(WRITABLE_METADATA), context.getClassLoader());
        ChangelogMode changelogMode = Optional.ofNullable(helper.getOptions().get(SINK_CHANGELOG_MODE_ENFORCED)).map(this::parseChangelogMode).orElse(null);
        DataType consumedType = context.getCatalogTable().getSchema().toPhysicalRowDataType();
        int[] primaryKeyIndices = TableSchemaUtils.getPrimaryKeyIndices((TableSchema)context.getCatalogTable().getSchema());
        TableSchema tableSchema = TableSchemaUtils.getPhysicalSchema((TableSchema)context.getCatalogTable().getSchema());
        if (sinkClass.equals("DEFAULT")) {
            int rowTimeIndex = TestValuesTableFactory.validateAndExtractRowtimeIndex((CatalogTable)context.getCatalogTable(), dropLateEvent, isInsertOnly);
            return new TestValuesTableSink(consumedType, primaryKeyIndices, context.getObjectIdentifier().getObjectName(), isInsertOnly, runtimeSink, expectedNum, writableMetadata, parallelism, changelogMode, rowTimeIndex, tableSchema);
        }
        try {
            return (DynamicTableSink)InstantiationUtil.instantiate((String)sinkClass, DynamicTableSink.class, (ClassLoader)Thread.currentThread().getContextClassLoader());
        }
        catch (FlinkException e) {
            throw new TableException("Can't instantiate class " + sinkClass, (Throwable)e);
        }
    }

    public Set<ConfigOption<?>> requiredOptions() {
        return Collections.emptySet();
    }

    public Set<ConfigOption<?>> optionalOptions() {
        return new HashSet(Arrays.asList(DATA_ID, CHANGELOG_MODE, BOUNDED, TERMINATING, RUNTIME_SOURCE, TABLE_SOURCE_CLASS, FAILING_SOURCE, LOOKUP_FUNCTION_CLASS, LOOKUP_THRESHOLD, ASYNC_ENABLED, DISABLE_LOOKUP, TABLE_SOURCE_CLASS, TABLE_SINK_CLASS, SINK_INSERT_ONLY, RUNTIME_SINK, SINK_EXPECTED_MESSAGES_NUM, ENABLE_PROJECTION_PUSH_DOWN, NESTED_PROJECTION_SUPPORTED, FILTERABLE_FIELDS, DYNAMIC_FILTERING_FIELDS, PARTITION_LIST, READABLE_METADATA, SINK_PARALLELISM, SINK_CHANGELOG_MODE_ENFORCED, WRITABLE_METADATA, ENABLE_WATERMARK_PUSH_DOWN, SINK_DROP_LATE_EVENT, SOURCE_NUM_ELEMENT_TO_SKIP, SOURCE_SLEEP_AFTER_ELEMENTS, SOURCE_SLEEP_TIME, SOURCE_PARALLELISM, INTERNAL_DATA, LookupOptions.CACHE_TYPE, LookupOptions.PARTIAL_CACHE_EXPIRE_AFTER_ACCESS, LookupOptions.PARTIAL_CACHE_EXPIRE_AFTER_WRITE, LookupOptions.PARTIAL_CACHE_CACHE_MISSING_KEY, LookupOptions.PARTIAL_CACHE_MAX_ROWS, LookupOptions.FULL_CACHE_RELOAD_STRATEGY, LookupOptions.FULL_CACHE_PERIODIC_RELOAD_INTERVAL, LookupOptions.FULL_CACHE_PERIODIC_RELOAD_SCHEDULE_MODE, LookupOptions.FULL_CACHE_TIMED_RELOAD_ISO_TIME, LookupOptions.FULL_CACHE_TIMED_RELOAD_INTERVAL_IN_DAYS));
    }

    private static int validateAndExtractRowtimeIndex(CatalogTable sinkTable, boolean dropLateEvent, boolean isInsertOnly) {
        if (!dropLateEvent) {
            return -1;
        }
        if (!isInsertOnly) {
            throw new ValidationException("Option 'sink.drop-late-event' only works for insert-only sink now.");
        }
        TableSchema schema = sinkTable.getSchema();
        List watermarkSpecs = schema.getWatermarkSpecs();
        if (watermarkSpecs.size() == 0) {
            throw new ValidationException("Please define the watermark in the schema that is used to indicate the rowtime column. The sink function will compare the rowtime and the current watermark to determine whether the event is late.");
        }
        String rowtimeName = ((WatermarkSpec)watermarkSpecs.get(0)).getRowtimeAttribute();
        return Arrays.asList(schema.getFieldNames()).indexOf(rowtimeName);
    }

    private static List<Map<String, String>> parsePartitionList(List<String> stringPartitions) {
        return stringPartitions.stream().map(partition -> {
            HashMap spec = new HashMap();
            Arrays.stream(partition.split(",")).forEach(pair -> {
                String[] split = pair.split(":");
                spec.put(split[0].trim(), split[1].trim());
            });
            return spec;
        }).collect(Collectors.toList());
    }

    private static Map<Map<String, String>, Collection<Row>> mapPartitionToRow(DataType producedDataType, Collection<Row> rows, List<Map<String, String>> partitions) {
        HashMap<Map<String, String>, Collection<Row>> map = new HashMap<Map<String, String>, Collection<Row>>();
        for (Map<String, String> partition : partitions) {
            map.put(partition, new ArrayList());
        }
        List fieldNames = DataTypeUtils.flattenToNames((DataType)producedDataType);
        block1: for (Row row : rows) {
            for (Map<String, String> partition : partitions) {
                boolean match = true;
                for (Map.Entry<String, String> entry : partition.entrySet()) {
                    int index = fieldNames.indexOf(entry.getKey());
                    if (index < 0) {
                        throw new IllegalArgumentException(String.format("Illegal partition list: partition key %s is not found in schema.", entry.getKey()));
                    }
                    if (entry.getValue() != null) {
                        match = row.getField(index) == null ? false : entry.getValue().equals(Objects.requireNonNull(row.getField(index)).toString());
                    } else {
                        boolean bl = match = row.getField(index) == null;
                    }
                    if (match) continue;
                    break;
                }
                if (!match) continue;
                ((Collection)map.get(partition)).add(row);
                continue block1;
            }
        }
        return map;
    }

    private ChangelogMode parseChangelogMode(String string) {
        ChangelogMode.Builder builder = ChangelogMode.newBuilder();
        block12: for (String split : string.split(",")) {
            switch (split.trim()) {
                case "I": {
                    builder.addContainedKind(RowKind.INSERT);
                    continue block12;
                }
                case "UB": {
                    builder.addContainedKind(RowKind.UPDATE_BEFORE);
                    continue block12;
                }
                case "UA": {
                    builder.addContainedKind(RowKind.UPDATE_AFTER);
                    continue block12;
                }
                case "D": {
                    builder.addContainedKind(RowKind.DELETE);
                    continue block12;
                }
                default: {
                    throw new IllegalArgumentException("Invalid ChangelogMode string: " + string);
                }
            }
        }
        return builder.build();
    }

    private static Map<String, DataType> convertToMetadataMap(Map<String, String> metadataOption, ClassLoader classLoader) {
        return metadataOption.keySet().stream().sorted().collect(Collectors.toMap(Function.identity(), key -> {
            String typeString = (String)metadataOption.get(key);
            LogicalType type = LogicalTypeParser.parse((String)typeString, (ClassLoader)classLoader);
            return TypeConversions.fromLogicalToDataType((LogicalType)type);
        }, (u, v) -> {
            throw new IllegalStateException();
        }, LinkedHashMap::new));
    }

    private static class FromRowDataSourceFunction
    implements SourceFunction<RowData> {
        private final String dataId;
        private final int sleepAfterElements;
        private final long sleepTimeMillis;
        private final AtomicInteger elementCtr = new AtomicInteger(0);
        private volatile boolean isRunning = true;

        public FromRowDataSourceFunction(String dataId, int sleepAfterElements, long sleepTimeMillis) {
            this.dataId = dataId;
            this.sleepAfterElements = sleepAfterElements;
            this.sleepTimeMillis = sleepTimeMillis;
        }

        public void run(SourceFunction.SourceContext<RowData> ctx) throws Exception {
            Collection values = registeredRowData.getOrDefault(this.dataId, Collections.emptyList());
            Iterator valueIter = values.iterator();
            while (this.isRunning && valueIter.hasNext()) {
                ctx.collect(valueIter.next());
                if (this.elementCtr.incrementAndGet() < this.sleepAfterElements || this.sleepTimeMillis <= 0L) continue;
                try {
                    Thread.sleep(this.sleepTimeMillis);
                    this.elementCtr.set(0);
                }
                catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                    break;
                }
            }
        }

        public void cancel() {
            this.isRunning = false;
        }
    }

    public static class TestSinkContextTableSink
    implements DynamicTableSink {
        public static final List<Long> ROWTIMES = new ArrayList<Long>();

        public ChangelogMode getChangelogMode(ChangelogMode requestedMode) {
            return ChangelogMode.insertOnly();
        }

        /*
         * WARNING - Removed try catching itself - possible behaviour change.
         */
        public DynamicTableSink.SinkRuntimeProvider getSinkRuntimeProvider(DynamicTableSink.Context context) {
            List<Long> list = ROWTIMES;
            synchronized (list) {
                ROWTIMES.clear();
            }
            SinkFunction<RowData> sinkFunction = new SinkFunction<RowData>(){
                private static final long serialVersionUID = -4871941979714977824L;

                /*
                 * WARNING - Removed try catching itself - possible behaviour change.
                 */
                public void invoke(RowData value, SinkFunction.Context context) throws Exception {
                    List<Long> list = ROWTIMES;
                    synchronized (list) {
                        ROWTIMES.add(context.timestamp());
                    }
                }
            };
            return SinkFunctionProvider.of((SinkFunction)sinkFunction);
        }

        public DynamicTableSink copy() {
            return new TestSinkContextTableSink();
        }

        public String asSummaryString() {
            return "TestSinkContextTableSink";
        }
    }

    private static class TestValuesTableSink
    implements DynamicTableSink,
    SupportsWritingMetadata,
    SupportsPartitioning,
    SupportsOverwrite {
        private DataType consumedDataType;
        private int[] primaryKeyIndices;
        private final String tableName;
        private final boolean isInsertOnly;
        private final String runtimeSink;
        private final int expectedNum;
        private final Map<String, DataType> writableMetadata;
        private final Integer parallelism;
        private final ChangelogMode changelogModeEnforced;
        private final int rowtimeIndex;
        private final TableSchema tableSchema;

        private TestValuesTableSink(DataType consumedDataType, int[] primaryKeyIndices, String tableName, boolean isInsertOnly, String runtimeSink, int expectedNum, Map<String, DataType> writableMetadata, @Nullable Integer parallelism, @Nullable ChangelogMode changelogModeEnforced, int rowtimeIndex, TableSchema tableSchema) {
            this.consumedDataType = consumedDataType;
            this.primaryKeyIndices = primaryKeyIndices;
            this.tableName = tableName;
            this.isInsertOnly = isInsertOnly;
            this.runtimeSink = runtimeSink;
            this.expectedNum = expectedNum;
            this.writableMetadata = writableMetadata;
            this.parallelism = parallelism;
            this.changelogModeEnforced = changelogModeEnforced;
            this.rowtimeIndex = rowtimeIndex;
            this.tableSchema = tableSchema;
        }

        public ChangelogMode getChangelogMode(ChangelogMode requestedMode) {
            if (this.changelogModeEnforced != null) {
                return this.changelogModeEnforced;
            }
            if (this.isInsertOnly) {
                return ChangelogMode.insertOnly();
            }
            if (this.primaryKeyIndices.length > 0) {
                return ChangelogMode.upsert();
            }
            return requestedMode;
        }

        public DynamicTableSink.SinkRuntimeProvider getSinkRuntimeProvider(DynamicTableSink.Context context) {
            TestValuesRuntimeFunctions.AbstractExactlyOnceSink sinkFunction;
            final DynamicTableSink.DataStructureConverter converter = context.createDataStructureConverter(this.consumedDataType);
            final Optional<Integer> parallelismOption = Optional.ofNullable(this.parallelism);
            Boolean isEnforcedInsertOnly = Optional.ofNullable(this.changelogModeEnforced).map(changelogMode -> changelogMode.equals((Object)ChangelogMode.insertOnly())).orElse(false);
            Boolean isInsertOnly = isEnforcedInsertOnly != false || this.isInsertOnly;
            if (isInsertOnly.booleanValue()) {
                Preconditions.checkArgument((this.expectedNum == -1 ? 1 : 0) != 0, (Object)("Appending Sink doesn't support '" + SINK_EXPECTED_MESSAGES_NUM.key() + "' yet."));
                switch (this.runtimeSink) {
                    case "SinkFunction": {
                        return new SinkFunctionProvider(){

                            public Optional<Integer> getParallelism() {
                                return parallelismOption;
                            }

                            public SinkFunction<RowData> createSinkFunction() {
                                return new TestValuesRuntimeFunctions.AppendingSinkFunction(tableName, consumedDataType, converter, rowtimeIndex);
                            }
                        };
                    }
                    case "OutputFormat": {
                        return new OutputFormatProvider(){

                            public OutputFormat<RowData> createOutputFormat() {
                                return new TestValuesRuntimeFunctions.AppendingOutputFormat(tableName, converter);
                            }

                            public Optional<Integer> getParallelism() {
                                return parallelismOption;
                            }
                        };
                    }
                    case "DataStream": {
                        return new DataStreamSinkProvider(){

                            public DataStreamSink<?> consumeDataStream(ProviderContext providerContext, DataStream<RowData> dataStream) {
                                DataStreamSink sink = dataStream.addSink((SinkFunction)new TestValuesRuntimeFunctions.AppendingSinkFunction(tableName, consumedDataType, converter, rowtimeIndex));
                                providerContext.generateUid("sink-function").ifPresent(arg_0 -> ((DataStreamSink)sink).uid(arg_0));
                                return sink;
                            }

                            public Optional<Integer> getParallelism() {
                                return parallelismOption;
                            }
                        };
                    }
                }
                throw new IllegalArgumentException("Unsupported runtime sink class: " + this.runtimeSink);
            }
            Assertions.assertThat((boolean)this.runtimeSink.equals("SinkFunction")).isTrue();
            if (this.primaryKeyIndices.length > 0) {
                int[][] targetColumns = context.getTargetColumns().orElse(new int[0][]);
                Preconditions.checkArgument((boolean)Arrays.stream(targetColumns).allMatch(subArr -> ((int[])subArr).length <= 1), (Object)"partial-insert composite columns are not supported yet!");
                sinkFunction = new TestValuesRuntimeFunctions.KeyedUpsertingSinkFunction(this.tableName, this.consumedDataType, converter, this.primaryKeyIndices, Arrays.stream(targetColumns).mapToInt(a -> a[0]).toArray(), this.expectedNum, this.tableSchema.getFieldCount());
            } else {
                Preconditions.checkArgument((this.expectedNum == -1 ? 1 : 0) != 0, (Object)("Retracting Sink doesn't support '" + SINK_EXPECTED_MESSAGES_NUM.key() + "' yet."));
                sinkFunction = new TestValuesRuntimeFunctions.RetractingSinkFunction(this.tableName, this.consumedDataType, converter);
            }
            return SinkFunctionProvider.of((SinkFunction)sinkFunction, (Integer)this.parallelism);
        }

        public DynamicTableSink copy() {
            return new TestValuesTableSink(this.consumedDataType, this.primaryKeyIndices, this.tableName, this.isInsertOnly, this.runtimeSink, this.expectedNum, this.writableMetadata, this.parallelism, this.changelogModeEnforced, this.rowtimeIndex, this.tableSchema);
        }

        public String asSummaryString() {
            return "TestValues";
        }

        public Map<String, DataType> listWritableMetadata() {
            return this.writableMetadata;
        }

        public void applyWritableMetadata(List<String> metadataKeys, DataType consumedDataType) {
            this.consumedDataType = consumedDataType;
        }

        public void applyStaticPartition(Map<String, String> partition) {
        }

        public boolean requiresPartitionGrouping(boolean supportsGrouping) {
            return supportsGrouping;
        }

        public void applyOverwrite(boolean overwrite) {
        }
    }

    private static class TestValuesScanTableSourceWithInternalData
    implements ScanTableSource {
        private final String dataId;
        private final boolean bounded;
        private final int sleepAfterElements;
        private final long sleepTimeMillis;

        public TestValuesScanTableSourceWithInternalData(String dataId, boolean bounded, int sleepAfterElements, long sleepTimeMillis) {
            this.dataId = dataId;
            this.bounded = bounded;
            this.sleepAfterElements = sleepAfterElements;
            this.sleepTimeMillis = sleepTimeMillis;
        }

        public ChangelogMode getChangelogMode() {
            return ChangelogMode.insertOnly();
        }

        public ScanTableSource.ScanRuntimeProvider getScanRuntimeProvider(ScanTableSource.ScanContext runtimeProviderContext) {
            FromRowDataSourceFunction sourceFunction = new FromRowDataSourceFunction(this.dataId, this.sleepAfterElements, this.sleepTimeMillis);
            return SourceFunctionProvider.of((SourceFunction)sourceFunction, (boolean)this.bounded);
        }

        public DynamicTableSource copy() {
            return new TestValuesScanTableSourceWithInternalData(this.dataId, this.bounded, this.sleepAfterElements, this.sleepTimeMillis);
        }

        public String asSummaryString() {
            return "TestValuesWithInternalData";
        }
    }

    public static class MockedLookupTableSource
    implements LookupTableSource {
        public LookupTableSource.LookupRuntimeProvider getLookupRuntimeProvider(LookupTableSource.LookupContext context) {
            return null;
        }

        public DynamicTableSource copy() {
            return null;
        }

        public String asSummaryString() {
            return null;
        }
    }

    private static class TestValuesScanLookupTableSource
    extends TestValuesScanTableSource
    implements LookupTableSource,
    SupportsDynamicFiltering {
        @Nullable
        private final String lookupFunctionClass;
        @Nullable
        private final LookupCache cache;
        @Nullable
        private final CacheReloadTrigger reloadTrigger;
        private final boolean isAsync;
        private final int lookupThreshold;
        private final DataType originType;

        private TestValuesScanLookupTableSource(DataType originType, DataType producedDataType, ChangelogMode changelogMode, Boundedness boundedness, TerminatingLogic terminating, String runtimeSource, boolean failingSource, Map<Map<String, String>, Collection<Row>> data, boolean isAsync, @Nullable String lookupFunctionClass, boolean nestedProjectionSupported, int[][] projectedFields, List<ResolvedExpression> filterPredicates, Set<String> filterableFields, Set<String> dynamicFilteringFields, int numElementToSkip, long limit, List<Map<String, String>> allPartitions, Map<String, DataType> readableMetadata, @Nullable int[] projectedMetadataFields, @Nullable LookupCache cache, @Nullable CacheReloadTrigger reloadTrigger, int lookupThreshold) {
            super(producedDataType, changelogMode, boundedness, terminating, runtimeSource, failingSource, data, nestedProjectionSupported, projectedFields, filterPredicates, filterableFields, dynamicFilteringFields, numElementToSkip, limit, allPartitions, readableMetadata, projectedMetadataFields);
            this.originType = originType;
            this.lookupFunctionClass = lookupFunctionClass;
            this.isAsync = isAsync;
            this.cache = cache;
            this.reloadTrigger = reloadTrigger;
            this.lookupThreshold = lookupThreshold;
        }

        public LookupTableSource.LookupRuntimeProvider getLookupRuntimeProvider(LookupTableSource.LookupContext context) {
            Collection rows;
            if (this.lookupFunctionClass != null) {
                try {
                    Class<?> clazz = Class.forName(this.lookupFunctionClass);
                    Object udtf = InstantiationUtil.instantiate(clazz);
                    if (udtf instanceof LookupFunction) {
                        return LookupFunctionProvider.of((LookupFunction)((LookupFunction)udtf));
                    }
                    if (udtf instanceof AsyncLookupFunction) {
                        return AsyncLookupFunctionProvider.of((AsyncLookupFunction)((AsyncLookupFunction)udtf));
                    }
                    if (udtf instanceof TableFunction) {
                        return TableFunctionProvider.of((TableFunction)((TableFunction)udtf));
                    }
                    return AsyncTableFunctionProvider.of((AsyncTableFunction)((AsyncTableFunction)udtf));
                }
                catch (ClassNotFoundException e) {
                    throw new IllegalArgumentException("Could not instantiate class: " + this.lookupFunctionClass);
                }
            }
            int[] lookupIndices = Arrays.stream(context.getKeys()).mapToInt(k -> k[0]).toArray();
            if (this.allPartitions.equals(Collections.EMPTY_LIST)) {
                rows = this.data.getOrDefault(Collections.EMPTY_MAP, Collections.EMPTY_LIST);
            } else {
                rows = new ArrayList();
                this.allPartitions.forEach(key -> rows.addAll(this.data.getOrDefault(key, new ArrayList())));
            }
            List data = new ArrayList(rows);
            if (this.numElementToSkip > 0) {
                data = this.numElementToSkip >= data.size() ? Collections.EMPTY_LIST : data.subList(this.numElementToSkip, data.size());
            }
            if (this.nestedProjectionSupported) {
                throw new UnsupportedOperationException("nestedProjectionSupported is unsupported for lookup source currently.");
            }
            DynamicTableSource.DataStructureConverter converter = context.createDataStructureConverter(this.originType);
            RowType originRowType = RowType.of((LogicalType[])this.originType.getLogicalType().getChildren().toArray(new LogicalType[0]));
            RowType producedRowType = RowType.of((LogicalType[])this.producedDataType.getLogicalType().getChildren().toArray(new LogicalType[0]));
            Optional<GeneratedProjection> generatedProjection = this.genProjection(originRowType, producedRowType);
            if (this.isAsync) {
                TestValuesRuntimeFunctions.AsyncTestValueLookupFunction asyncLookupFunction = this.getTestValuesAsyncLookupFunction(data, lookupIndices, producedRowType, converter, generatedProjection);
                if (this.cache == null) {
                    return AsyncLookupFunctionProvider.of((AsyncLookupFunction)asyncLookupFunction);
                }
                return PartialCachingAsyncLookupProvider.of((AsyncLookupFunction)asyncLookupFunction, (LookupCache)this.cache);
            }
            TestValuesRuntimeFunctions.TestValuesLookupFunction lookupFunction = this.getTestValuesLookupFunction(data, lookupIndices, producedRowType, converter, generatedProjection);
            if (this.cache != null) {
                return PartialCachingLookupProvider.of((LookupFunction)lookupFunction, (LookupCache)this.cache);
            }
            if (this.reloadTrigger != null) {
                DataFormatConverters.RowConverter rowConverter = new DataFormatConverters.RowConverter(this.originType.getChildren().toArray(new DataType[0]));
                FullCacheTestInputFormat inputFormat = new FullCacheTestInputFormat(data, generatedProjection, rowConverter);
                return FullCachingLookupProvider.of((ScanTableSource.ScanRuntimeProvider)InputFormatProvider.of((InputFormat)inputFormat), (CacheReloadTrigger)this.reloadTrigger);
            }
            return LookupFunctionProvider.of((LookupFunction)lookupFunction);
        }

        private Optional<GeneratedProjection> genProjection(RowType originRowType, RowType producedRowType) {
            if (null == this.projectedPhysicalFields) {
                return Optional.empty();
            }
            CodeGeneratorContext context = new CodeGeneratorContext((ReadableConfig)new Configuration(), Thread.currentThread().getContextClassLoader());
            int[] mapping = Arrays.stream(this.projectedPhysicalFields).mapToInt(levelOne -> levelOne[0]).toArray();
            return Optional.of(ProjectionCodeGenerator.generateProjection((CodeGeneratorContext)context, (String)"InternalProjection", (RowType)originRowType, (RowType)producedRowType, (int[])mapping, GenericRowData.class));
        }

        private TestValuesRuntimeFunctions.AsyncTestValueLookupFunction getTestValuesAsyncLookupFunction(List<Row> data, int[] lookupIndices, RowType producedRowType, DynamicTableSource.DataStructureConverter converter, Optional<GeneratedProjection> projection) {
            if (this.lookupThreshold > 0) {
                return new TestValuesRuntimeFunctions.TestNoLookupUntilNthAccessAsyncLookupFunction(data, lookupIndices, producedRowType, converter, projection, this.lookupThreshold);
            }
            return new TestValuesRuntimeFunctions.AsyncTestValueLookupFunction(data, lookupIndices, producedRowType, converter, projection);
        }

        private TestValuesRuntimeFunctions.TestValuesLookupFunction getTestValuesLookupFunction(List<Row> data, int[] lookupIndices, RowType producedRowType, DynamicTableSource.DataStructureConverter converter, Optional<GeneratedProjection> generatedProjection) {
            if (this.lookupThreshold > 0) {
                return new TestValuesRuntimeFunctions.TestNoLookupUntilNthAccessLookupFunction(data, lookupIndices, producedRowType, converter, generatedProjection, this.lookupThreshold);
            }
            return new TestValuesRuntimeFunctions.TestValuesLookupFunction(data, lookupIndices, producedRowType, converter, generatedProjection);
        }

        @Override
        public DynamicTableSource copy() {
            return new TestValuesScanLookupTableSource(this.originType, this.producedDataType, this.changelogMode, this.boundedness, this.terminating, this.runtimeSource, this.failingSource, this.data, this.isAsync, this.lookupFunctionClass, this.nestedProjectionSupported, this.projectedPhysicalFields, this.filterPredicates, this.filterableFields, this.dynamicFilteringFields, this.numElementToSkip, this.limit, this.allPartitions, this.readableMetadata, this.projectedMetadataFields, this.cache, this.reloadTrigger, this.lookupThreshold);
        }
    }

    private static class TestValuesScanTableSourceWithWatermarkPushDown
    extends TestValuesScanTableSource
    implements SupportsWatermarkPushDown,
    SupportsSourceWatermark {
        private final String tableName;
        private WatermarkStrategy<RowData> watermarkStrategy;

        private TestValuesScanTableSourceWithWatermarkPushDown(DataType producedDataType, ChangelogMode changelogMode, TerminatingLogic terminating, String runtimeSource, boolean failingSource, Map<Map<String, String>, Collection<Row>> data, String tableName, boolean nestedProjectionSupported, @Nullable int[][] projectedPhysicalFields, List<ResolvedExpression> filterPredicates, Set<String> filterableFields, Set<String> dynamicFilteringFields, int numElementToSkip, long limit, List<Map<String, String>> allPartitions, Map<String, DataType> readableMetadata, @Nullable int[] projectedMetadataFields) {
            super(producedDataType, changelogMode, Boundedness.CONTINUOUS_UNBOUNDED, terminating, runtimeSource, failingSource, data, nestedProjectionSupported, projectedPhysicalFields, filterPredicates, filterableFields, dynamicFilteringFields, numElementToSkip, limit, allPartitions, readableMetadata, projectedMetadataFields);
            this.tableName = tableName;
        }

        public void applyWatermark(WatermarkStrategy<RowData> watermarkStrategy) {
            this.watermarkStrategy = watermarkStrategy;
        }

        public void applySourceWatermark() {
            this.watermarkStrategy = WatermarkStrategy.noWatermarks();
        }

        @Override
        public ScanTableSource.ScanRuntimeProvider getScanRuntimeProvider(ScanTableSource.ScanContext runtimeProviderContext) {
            TypeInformation type = runtimeProviderContext.createTypeInformation(this.producedDataType);
            TypeSerializer serializer = type.createSerializer((SerializerConfig)new SerializerConfigImpl());
            DynamicTableSource.DataStructureConverter converter = runtimeProviderContext.createDataStructureConverter(this.producedDataType);
            converter.open(RuntimeConverter.Context.create((ClassLoader)TestValuesTableFactory.class.getClassLoader()));
            Collection<RowData> values = this.convertToRowData(converter);
            try {
                return SourceFunctionProvider.of((SourceFunction)new TestValuesRuntimeFunctions.FromElementSourceFunctionWithWatermark(this.tableName, (TypeSerializer<RowData>)serializer, values, this.watermarkStrategy), (boolean)false);
            }
            catch (IOException e) {
                throw new TableException("Fail to init source function", (Throwable)e);
            }
        }

        @Override
        public DynamicTableSource copy() {
            TestValuesScanTableSourceWithWatermarkPushDown newSource = new TestValuesScanTableSourceWithWatermarkPushDown(this.producedDataType, this.changelogMode, this.terminating, this.runtimeSource, this.failingSource, this.data, this.tableName, this.nestedProjectionSupported, this.projectedPhysicalFields, this.filterPredicates, this.filterableFields, this.dynamicFilteringFields, this.numElementToSkip, this.limit, this.allPartitions, this.readableMetadata, this.projectedMetadataFields);
            newSource.watermarkStrategy = this.watermarkStrategy;
            return newSource;
        }
    }

    private static class TestValuesScanTableSource
    extends TestValuesScanTableSourceWithoutProjectionPushDown
    implements SupportsProjectionPushDown {
        private TestValuesScanTableSource(DataType producedDataType, ChangelogMode changelogMode, Boundedness boundedness, TerminatingLogic terminating, String runtimeSource, boolean failingSource, Map<Map<String, String>, Collection<Row>> data, boolean nestedProjectionSupported, @Nullable int[][] projectedPhysicalFields, List<ResolvedExpression> filterPredicates, Set<String> filterableFields, Set<String> dynamicFilteringFields, int numElementToSkip, long limit, List<Map<String, String>> allPartitions, Map<String, DataType> readableMetadata, @Nullable int[] projectedMetadataFields) {
            super(producedDataType, changelogMode, boundedness, terminating, runtimeSource, failingSource, data, nestedProjectionSupported, projectedPhysicalFields, filterPredicates, filterableFields, dynamicFilteringFields, numElementToSkip, limit, allPartitions, readableMetadata, projectedMetadataFields, null);
        }

        @Override
        public DynamicTableSource copy() {
            return new TestValuesScanTableSource(this.producedDataType, this.changelogMode, this.boundedness, this.terminating, this.runtimeSource, this.failingSource, this.data, this.nestedProjectionSupported, this.projectedPhysicalFields, this.filterPredicates, this.filterableFields, this.dynamicFilteringFields, this.numElementToSkip, this.limit, this.allPartitions, this.readableMetadata, this.projectedMetadataFields);
        }

        public boolean supportsNestedProjection() {
            return this.nestedProjectionSupported;
        }

        public void applyProjection(int[][] projectedFields, DataType producedDataType) {
            this.producedDataType = producedDataType;
            this.projectedPhysicalFields = projectedFields;
        }
    }

    private static class TestValuesScanTableSourceWithoutProjectionPushDown
    implements ScanTableSource,
    SupportsFilterPushDown,
    SupportsLimitPushDown,
    SupportsPartitionPushDown,
    SupportsReadingMetadata,
    SupportsAggregatePushDown,
    SupportsDynamicFiltering {
        protected DataType producedDataType;
        protected final ChangelogMode changelogMode;
        protected final Boundedness boundedness;
        protected final TerminatingLogic terminating;
        protected final String runtimeSource;
        protected final boolean failingSource;
        protected Map<Map<String, String>, Collection<Row>> data;
        protected final boolean nestedProjectionSupported;
        @Nullable
        protected int[][] projectedPhysicalFields;
        protected List<ResolvedExpression> filterPredicates;
        protected final Set<String> filterableFields;
        protected final Set<String> dynamicFilteringFields;
        protected long limit;
        protected int numElementToSkip;
        protected List<Map<String, String>> allPartitions;
        protected final Map<String, DataType> readableMetadata;
        @Nullable
        protected int[] projectedMetadataFields;
        @Nullable
        private int[] groupingSet;
        private List<AggregateExpression> aggregateExpressions;
        private List<String> acceptedPartitionFilterFields;
        private final Integer parallelism;

        private TestValuesScanTableSourceWithoutProjectionPushDown(DataType producedDataType, ChangelogMode changelogMode, Boundedness boundedness, TerminatingLogic terminating, String runtimeSource, boolean failingSource, Map<Map<String, String>, Collection<Row>> data, boolean nestedProjectionSupported, @Nullable int[][] projectedPhysicalFields, List<ResolvedExpression> filterPredicates, Set<String> filterableFields, Set<String> dynamicFilteringFields, int numElementToSkip, long limit, List<Map<String, String>> allPartitions, Map<String, DataType> readableMetadata, @Nullable int[] projectedMetadataFields, @Nullable Integer parallelism) {
            this.producedDataType = producedDataType;
            this.changelogMode = changelogMode;
            this.boundedness = boundedness;
            this.terminating = terminating;
            this.runtimeSource = runtimeSource;
            this.failingSource = failingSource;
            this.data = data;
            this.nestedProjectionSupported = nestedProjectionSupported;
            this.projectedPhysicalFields = projectedPhysicalFields;
            this.filterPredicates = filterPredicates;
            this.filterableFields = filterableFields;
            this.dynamicFilteringFields = dynamicFilteringFields;
            this.numElementToSkip = numElementToSkip;
            this.limit = limit;
            this.allPartitions = allPartitions;
            this.readableMetadata = readableMetadata;
            this.projectedMetadataFields = projectedMetadataFields;
            this.groupingSet = null;
            this.aggregateExpressions = Collections.emptyList();
            this.parallelism = parallelism;
        }

        public ChangelogMode getChangelogMode() {
            return this.changelogMode;
        }

        public ScanTableSource.ScanRuntimeProvider getScanRuntimeProvider(ScanTableSource.ScanContext runtimeProviderContext) {
            TypeInformation type = runtimeProviderContext.createTypeInformation(this.producedDataType);
            TypeSerializer serializer = type.createSerializer((SerializerConfig)new SerializerConfigImpl());
            DynamicTableSource.DataStructureConverter converter = runtimeProviderContext.createDataStructureConverter(this.producedDataType);
            converter.open(RuntimeConverter.Context.create((ClassLoader)TestValuesTableFactory.class.getClassLoader()));
            switch (this.runtimeSource) {
                case "SourceFunction": {
                    try {
                        Preconditions.checkArgument((this.terminating == TerminatingLogic.FINITE ? 1 : 0) != 0, (Object)"Values Source doesn't support infinite SourceFunction.");
                        Collection<RowData> values = this.convertToRowData(converter);
                        Object sourceFunction = this.failingSource ? new FailingCollectionSource<RowData>(serializer, values, values.size() / 2) : new FromElementsFunction(serializer, values);
                        return SourceFunctionProvider.of((SourceFunction)sourceFunction, (this.boundedness == Boundedness.BOUNDED ? 1 : 0) != 0, (Integer)this.parallelism);
                    }
                    catch (IOException e) {
                        throw new TableException("Fail to init source function", (Throwable)e);
                    }
                }
                case "InputFormat": {
                    Preconditions.checkArgument((!this.failingSource ? 1 : 0) != 0, (Object)"Values InputFormat Source doesn't support as failing source.");
                    Preconditions.checkArgument((this.terminating == TerminatingLogic.FINITE ? 1 : 0) != 0, (Object)"Values Source doesn't support infinite InputFormat.");
                    Collection<RowData> values = this.convertToRowData(converter);
                    return InputFormatProvider.of((InputFormat)new CollectionInputFormat(values, serializer), (Integer)this.parallelism);
                }
                case "DataStream": {
                    Preconditions.checkArgument((!this.failingSource ? 1 : 0) != 0, (Object)"Values DataStream Source doesn't support as failing source.");
                    Preconditions.checkArgument((this.terminating == TerminatingLogic.FINITE ? 1 : 0) != 0, (Object)"Values Source doesn't support infinite DataStream.");
                    try {
                        Collection<RowData> values2 = this.convertToRowData(converter);
                        final FromElementsFunction function = new FromElementsFunction(serializer, values2);
                        return new DataStreamScanProvider(){

                            public DataStream<RowData> produceDataStream(ProviderContext providerContext, StreamExecutionEnvironment execEnv) {
                                DataStreamSource sourceStream = execEnv.addSource((SourceFunction)function);
                                providerContext.generateUid("source-function").ifPresent(arg_0 -> ((DataStreamSource)sourceStream).uid(arg_0));
                                return sourceStream;
                            }

                            public Optional<Integer> getParallelism() {
                                return Optional.ofNullable(parallelism);
                            }

                            public boolean isBounded() {
                                return boundedness == Boundedness.BOUNDED;
                            }
                        };
                    }
                    catch (IOException e) {
                        throw new TableException("Fail to init data stream source", (Throwable)e);
                    }
                }
                case "NewSource": {
                    Preconditions.checkArgument((!this.failingSource ? 1 : 0) != 0, (Object)"Values Source doesn't support as failing new source.");
                    if (this.acceptedPartitionFilterFields == null || this.acceptedPartitionFilterFields.isEmpty()) {
                        Collection<RowData> values2 = this.convertToRowData(converter);
                        return SourceProvider.of((Source)new ValuesSource(this.terminating, this.boundedness, values2, (TypeSerializer<RowData>)serializer), (Integer)this.parallelism);
                    }
                    Map<Map<String, String>, Collection<RowData>> partitionValues = this.convertToPartitionedRowData(converter);
                    DynamicFilteringValuesSource source = new DynamicFilteringValuesSource(this.terminating, this.boundedness, partitionValues, (TypeSerializer<RowData>)serializer, this.acceptedPartitionFilterFields);
                    return SourceProvider.of((Source)source, (Integer)this.parallelism);
                }
            }
            throw new IllegalArgumentException("Unsupported runtime source class: " + this.runtimeSource);
        }

        public SupportsFilterPushDown.Result applyFilters(List<ResolvedExpression> filters) {
            ArrayList<ResolvedExpression> acceptedFilters = new ArrayList<ResolvedExpression>();
            ArrayList<ResolvedExpression> remainingFilters = new ArrayList<ResolvedExpression>();
            for (ResolvedExpression expr : filters) {
                if (FilterUtils.shouldPushDown(expr, this.filterableFields)) {
                    acceptedFilters.add(expr);
                    continue;
                }
                remainingFilters.add(expr);
            }
            this.filterPredicates = acceptedFilters;
            this.data = this.filterAllData(this.data);
            return SupportsFilterPushDown.Result.of(acceptedFilters, remainingFilters);
        }

        private Function<String, Comparable<?>> getValueGetter(Row row) {
            List fieldNames = DataTypeUtils.flattenToNames((DataType)this.producedDataType);
            return fieldName -> {
                int idx = fieldNames.indexOf(fieldName);
                return (Comparable)row.getField(idx);
            };
        }

        private Function<int[], Comparable<?>> getNestedValueGetter(Row row) {
            return fieldIndices -> {
                Object current = row;
                for (int i = 0; i < ((int[])fieldIndices).length - 1; ++i) {
                    current = current.getField(fieldIndices[i]);
                }
                return (Comparable)current.getField(fieldIndices[((int[])fieldIndices).length - 1]);
            };
        }

        public DynamicTableSource copy() {
            return new TestValuesScanTableSourceWithoutProjectionPushDown(this.producedDataType, this.changelogMode, this.boundedness, this.terminating, this.runtimeSource, this.failingSource, this.data, this.nestedProjectionSupported, this.projectedPhysicalFields, this.filterPredicates, this.filterableFields, this.dynamicFilteringFields, this.numElementToSkip, this.limit, this.allPartitions, this.readableMetadata, this.projectedMetadataFields, this.parallelism);
        }

        public String asSummaryString() {
            return "TestValues";
        }

        protected Collection<RowData> convertToRowData(DynamicTableSource.DataStructureConverter converter) {
            ArrayList<RowData> result = new ArrayList<RowData>();
            for (Collection<RowData> rowData : this.convertToPartitionedRowData(converter).values()) {
                result.addAll(rowData);
            }
            return result;
        }

        protected Map<Map<String, String>, Collection<RowData>> convertToPartitionedRowData(DynamicTableSource.DataStructureConverter converter) {
            HashMap<Map<String, String>, Collection<RowData>> result = new HashMap<Map<String, String>, Collection<RowData>>();
            int size = 0;
            int numSkipped = 0;
            for (Map<String, String> partition : this.data.keySet()) {
                ArrayList<RowData> partitionResult = new ArrayList<RowData>();
                result.put(partition, partitionResult);
                Collection<Row> rowsInPartition = this.data.get(partition);
                int numToSkipInPartition = 0;
                if (numSkipped < this.numElementToSkip) {
                    numToSkipInPartition = Math.min(rowsInPartition.size(), this.numElementToSkip - numSkipped);
                }
                numSkipped += numToSkipInPartition;
                List<Row> rowsRetained = rowsInPartition.stream().skip(numToSkipInPartition).map(row -> {
                    Row projectedRow = this.projectRow((Row)row);
                    projectedRow.setKind(row.getKind());
                    return projectedRow;
                }).collect(Collectors.toList());
                if (!this.aggregateExpressions.isEmpty()) {
                    rowsRetained = this.applyAggregatesToRows(rowsRetained);
                }
                for (Row row2 : rowsRetained) {
                    RowData rowData = (RowData)converter.toInternal((Object)row2);
                    if (rowData != null) {
                        rowData.setRowKind(row2.getKind());
                        partitionResult.add(rowData);
                        ++size;
                    }
                    if ((long)size < this.limit) continue;
                    return result;
                }
            }
            return result;
        }

        private List<Row> applyAggregatesToRows(List<Row> rows) {
            if (this.groupingSet != null && this.groupingSet.length > 0) {
                HashMap<Row, ArrayList<Row>> buffer = new HashMap<Row, ArrayList<Row>>();
                for (Row row : rows) {
                    Row bufferKey = new Row(this.groupingSet.length);
                    for (int i = 0; i < this.groupingSet.length; ++i) {
                        bufferKey.setField(i, row.getField(this.groupingSet[i]));
                    }
                    if (buffer.containsKey(bufferKey)) {
                        ((List)buffer.get(bufferKey)).add(row);
                        continue;
                    }
                    buffer.put(bufferKey, new ArrayList<Row>(Collections.singletonList(row)));
                }
                ArrayList<Row> result = new ArrayList<Row>();
                for (Map.Entry entry : buffer.entrySet()) {
                    result.add(Row.join((Row)((Row)entry.getKey()), (Row[])new Row[]{this.accumulateRows((List)entry.getValue())}));
                }
                return result;
            }
            return Collections.singletonList(this.accumulateRows(rows));
        }

        private Row accumulateRows(List<Row> rows) {
            Row result = new Row(this.aggregateExpressions.size());
            for (int i = 0; i < this.aggregateExpressions.size(); ++i) {
                int argIndex;
                FunctionDefinition aggFunction = this.aggregateExpressions.get(i).getFunctionDefinition();
                List arguments = this.aggregateExpressions.get(i).getArgs();
                if (aggFunction instanceof MinAggFunction) {
                    argIndex = ((FieldReferenceExpression)arguments.get(0)).getFieldIndex();
                    Row minRow = rows.stream().min(Comparator.comparing(row -> (Comparable)row.getFieldAs(argIndex))).orElse(null);
                    result.setField(i, minRow != null ? minRow.getField(argIndex) : null);
                    continue;
                }
                if (aggFunction instanceof MaxAggFunction) {
                    argIndex = ((FieldReferenceExpression)arguments.get(0)).getFieldIndex();
                    Row maxRow = rows.stream().max(Comparator.comparing(row -> (Comparable)row.getFieldAs(argIndex))).orElse(null);
                    result.setField(i, maxRow != null ? maxRow.getField(argIndex) : null);
                    continue;
                }
                if (aggFunction instanceof SumAggFunction) {
                    argIndex = ((FieldReferenceExpression)arguments.get(0)).getFieldIndex();
                    Long finalSum = rows.stream().filter(row -> row.getField(argIndex) != null).mapToLong(row -> (Long)row.getFieldAs(argIndex)).sum();
                    boolean allNull = rows.stream().noneMatch(r -> r.getField(argIndex) != null);
                    result.setField(i, (Object)(allNull ? null : finalSum));
                    continue;
                }
                if (aggFunction instanceof Sum0AggFunction) {
                    argIndex = ((FieldReferenceExpression)arguments.get(0)).getFieldIndex();
                    Long finalSum0 = rows.stream().filter(row -> row.getField(argIndex) != null).mapToLong(row -> (Long)row.getFieldAs(argIndex)).sum();
                    result.setField(i, (Object)finalSum0);
                    continue;
                }
                if (aggFunction instanceof CountAggFunction) {
                    argIndex = ((FieldReferenceExpression)arguments.get(0)).getFieldIndex();
                    long count = rows.stream().filter(r -> r.getField(argIndex) != null).count();
                    result.setField(i, (Object)count);
                    continue;
                }
                if (!(aggFunction instanceof Count1AggFunction)) continue;
                result.setField(i, (Object)rows.size());
            }
            return result;
        }

        private Map<Map<String, String>, Collection<Row>> filterAllData(Map<Map<String, String>, Collection<Row>> allData) {
            HashMap<Map<String, String>, Collection<Row>> result = new HashMap<Map<String, String>, Collection<Row>>();
            for (Map<String, String> partition : allData.keySet()) {
                ArrayList<Row> remainData = new ArrayList<Row>();
                for (Row row : allData.get(partition)) {
                    boolean isRetained = FilterUtils.isRetainedAfterApplyingFilterPredicates(this.filterPredicates, this.getValueGetter(row), this.getNestedValueGetter(row));
                    if (!isRetained) continue;
                    remainData.add(row);
                }
                result.put(partition, remainData);
            }
            return result;
        }

        private Row projectRow(Row row) {
            int i;
            if (this.projectedPhysicalFields == null) {
                return row;
            }
            int originPhysicalSize = row.getArity() - this.readableMetadata.size();
            int newLength = this.projectedPhysicalFields.length + (this.projectedMetadataFields == null ? 0 : this.projectedMetadataFields.length);
            Object[] newValues = new Object[newLength];
            for (i = 0; i < this.projectedPhysicalFields.length; ++i) {
                Object field = row;
                for (int dim : this.projectedPhysicalFields[i]) {
                    field = field.getField(dim);
                }
                newValues[i] = field;
            }
            for (i = this.projectedPhysicalFields.length; i < newValues.length; ++i) {
                newValues[i] = row.getField(this.projectedMetadataFields[i - this.projectedPhysicalFields.length] + originPhysicalSize);
            }
            return Row.of((Object[])newValues);
        }

        public Optional<List<Map<String, String>>> listPartitions() {
            if (this.allPartitions.isEmpty()) {
                return Optional.empty();
            }
            return Optional.of(this.allPartitions);
        }

        public void applyPartitions(List<Map<String, String>> remainingPartitions) {
            if (this.allPartitions.isEmpty()) {
                if (!remainingPartitions.isEmpty()) {
                    this.allPartitions = remainingPartitions;
                    this.data = TestValuesTableFactory.mapPartitionToRow(this.producedDataType, this.data.get(Collections.EMPTY_MAP), remainingPartitions);
                } else {
                    remainingPartitions = Collections.singletonList(Collections.emptyMap());
                    this.data.put(Collections.emptyMap(), Collections.emptyList());
                }
            } else {
                this.allPartitions = remainingPartitions;
                if (remainingPartitions.isEmpty()) {
                    remainingPartitions = Collections.singletonList(Collections.emptyMap());
                }
            }
            this.data = this.pruneDataByRemainingPartitions(remainingPartitions, this.data);
        }

        private Map<Map<String, String>, Collection<Row>> pruneDataByRemainingPartitions(List<Map<String, String>> remainingPartitions, Map<Map<String, String>, Collection<Row>> allData) {
            HashMap<Map<String, String>, Collection<Row>> result = new HashMap<Map<String, String>, Collection<Row>>();
            for (Map<String, String> remainingPartition : remainingPartitions) {
                result.put(remainingPartition, allData.getOrDefault(remainingPartition, Collections.emptyList()));
            }
            return result;
        }

        public boolean applyAggregates(List<int[]> groupingSets, List<AggregateExpression> aggregateExpressions, DataType producedDataType) {
            if (groupingSets.size() > 1) {
                return false;
            }
            ArrayList<AggregateExpression> aggExpressions = new ArrayList<AggregateExpression>();
            for (AggregateExpression aggExpression : aggregateExpressions) {
                FunctionDefinition functionDefinition = aggExpression.getFunctionDefinition();
                if (!(functionDefinition instanceof MinAggFunction || functionDefinition instanceof MaxAggFunction || functionDefinition instanceof SumAggFunction || functionDefinition instanceof Sum0AggFunction || functionDefinition instanceof CountAggFunction || functionDefinition instanceof Count1AggFunction)) {
                    return false;
                }
                if (aggExpression.getFilterExpression().isPresent() || aggExpression.isApproximate() || aggExpression.isDistinct()) {
                    return false;
                }
                if (aggExpression.getArgs().stream().anyMatch(field -> !(field.getOutputDataType().getLogicalType() instanceof BigIntType) && !(functionDefinition instanceof CountAggFunction) && !(functionDefinition instanceof Count1AggFunction))) {
                    return false;
                }
                aggExpressions.add(aggExpression);
            }
            this.groupingSet = groupingSets.get(0);
            this.aggregateExpressions = aggExpressions;
            this.producedDataType = producedDataType;
            return true;
        }

        public void applyLimit(long limit) {
            this.limit = limit;
        }

        public Map<String, DataType> listReadableMetadata() {
            return this.readableMetadata;
        }

        public void applyReadableMetadata(List<String> remainingMetadataKeys, DataType newProducedDataType) {
            this.producedDataType = newProducedDataType;
            ArrayList<String> allMetadataKeys = new ArrayList<String>(this.listReadableMetadata().keySet());
            this.projectedMetadataFields = remainingMetadataKeys.stream().mapToInt(allMetadataKeys::indexOf).toArray();
        }

        public List<String> listAcceptedFilterFields() {
            return new ArrayList<String>(this.dynamicFilteringFields);
        }

        public void applyDynamicFiltering(List<String> candidateFilterFields) {
            this.acceptedPartitionFilterFields = candidateFilterFields;
        }
    }
}

