/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.table.planner.plan.nodes.exec.common;

import java.time.Duration;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Optional;
import java.util.stream.Collectors;
import java.util.stream.IntStream;
import org.apache.flink.api.common.io.OutputFormat;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.api.connector.sink.Sink;
import org.apache.flink.api.dag.Transformation;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.api.java.typeutils.InputTypeConfigurable;
import org.apache.flink.configuration.ReadableConfig;
import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonProperty;
import org.apache.flink.streaming.api.datastream.CustomSinkOperatorUidHashes;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSink;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.sink.OutputFormatSinkFunction;
import org.apache.flink.streaming.api.functions.sink.SinkFunction;
import org.apache.flink.streaming.api.operators.SimpleOperatorFactory;
import org.apache.flink.streaming.api.operators.StreamOperator;
import org.apache.flink.streaming.api.operators.StreamOperatorFactory;
import org.apache.flink.streaming.api.transformations.LegacySinkTransformation;
import org.apache.flink.streaming.api.transformations.OneInputTransformation;
import org.apache.flink.streaming.api.transformations.PartitionTransformation;
import org.apache.flink.streaming.runtime.partitioner.KeyGroupStreamPartitioner;
import org.apache.flink.streaming.runtime.partitioner.StreamPartitioner;
import org.apache.flink.table.api.TableException;
import org.apache.flink.table.api.config.ExecutionConfigOptions;
import org.apache.flink.table.catalog.ResolvedSchema;
import org.apache.flink.table.connector.ChangelogMode;
import org.apache.flink.table.connector.ParallelismProvider;
import org.apache.flink.table.connector.ProviderContext;
import org.apache.flink.table.connector.sink.DataStreamSinkProvider;
import org.apache.flink.table.connector.sink.DynamicTableSink;
import org.apache.flink.table.connector.sink.OutputFormatProvider;
import org.apache.flink.table.connector.sink.SinkFunctionProvider;
import org.apache.flink.table.connector.sink.SinkProvider;
import org.apache.flink.table.connector.sink.SinkV2Provider;
import org.apache.flink.table.data.RowData;
import org.apache.flink.table.planner.codegen.EqualiserCodeGenerator;
import org.apache.flink.table.planner.connectors.TransformationSinkProvider;
import org.apache.flink.table.planner.plan.nodes.exec.ExecNodeBase;
import org.apache.flink.table.planner.plan.nodes.exec.ExecNodeConfig;
import org.apache.flink.table.planner.plan.nodes.exec.ExecNodeContext;
import org.apache.flink.table.planner.plan.nodes.exec.InputProperty;
import org.apache.flink.table.planner.plan.nodes.exec.MultipleTransformationTranslator;
import org.apache.flink.table.planner.plan.nodes.exec.spec.DynamicTableSinkSpec;
import org.apache.flink.table.planner.plan.nodes.exec.stream.StreamExecNode;
import org.apache.flink.table.planner.plan.nodes.exec.utils.ExecNodeUtil;
import org.apache.flink.table.planner.plan.nodes.exec.utils.TransformationMetadata;
import org.apache.flink.table.planner.plan.utils.KeySelectorUtil;
import org.apache.flink.table.runtime.connector.sink.SinkRuntimeProviderContext;
import org.apache.flink.table.runtime.generated.GeneratedRecordEqualiser;
import org.apache.flink.table.runtime.keyselector.RowDataKeySelector;
import org.apache.flink.table.runtime.operators.sink.ConstraintEnforcer;
import org.apache.flink.table.runtime.operators.sink.SinkOperator;
import org.apache.flink.table.runtime.operators.sink.SinkUpsertMaterializer;
import org.apache.flink.table.runtime.operators.sink.StreamRecordTimestampInserter;
import org.apache.flink.table.runtime.typeutils.InternalSerializers;
import org.apache.flink.table.runtime.typeutils.InternalTypeInfo;
import org.apache.flink.table.runtime.util.StateConfigUtil;
import org.apache.flink.table.types.logical.LogicalType;
import org.apache.flink.table.types.logical.LogicalTypeRoot;
import org.apache.flink.table.types.logical.RowType;
import org.apache.flink.table.types.logical.utils.LogicalTypeChecks;
import org.apache.flink.types.RowKind;

public abstract class CommonExecSink
extends ExecNodeBase<Object>
implements MultipleTransformationTranslator<Object> {
    public static final String CONSTRAINT_VALIDATOR_TRANSFORMATION = "constraint-validator";
    public static final String PARTITIONER_TRANSFORMATION = "partitioner";
    public static final String UPSERT_MATERIALIZE_TRANSFORMATION = "upsert-materialize";
    public static final String TIMESTAMP_INSERTER_TRANSFORMATION = "timestamp-inserter";
    public static final String SINK_TRANSFORMATION = "sink";
    public static final String FIELD_NAME_DYNAMIC_TABLE_SINK = "dynamicTableSink";
    @JsonProperty(value="dynamicTableSink")
    protected final DynamicTableSinkSpec tableSinkSpec;
    private final ChangelogMode inputChangelogMode;
    private final boolean isBounded;

    protected CommonExecSink(int id, ExecNodeContext context, ReadableConfig persistedConfig, DynamicTableSinkSpec tableSinkSpec, ChangelogMode inputChangelogMode, boolean isBounded, List<InputProperty> inputProperties, LogicalType outputType, String description) {
        super(id, context, persistedConfig, inputProperties, outputType, description);
        this.tableSinkSpec = tableSinkSpec;
        this.inputChangelogMode = inputChangelogMode;
        this.isBounded = isBounded;
    }

    @Override
    public String getSimplifiedName() {
        return this.tableSinkSpec.getContextResolvedTable().getIdentifier().getObjectName();
    }

    public DynamicTableSinkSpec getTableSinkSpec() {
        return this.tableSinkSpec;
    }

    protected Transformation<Object> createSinkTransformation(StreamExecutionEnvironment streamExecEnv, ExecNodeConfig config, Transformation<RowData> inputTransform, DynamicTableSink tableSink, int rowtimeFieldIndex, boolean upsertMaterialize) {
        boolean hasPk;
        ResolvedSchema schema = this.tableSinkSpec.getContextResolvedTable().getResolvedSchema();
        DynamicTableSink.SinkRuntimeProvider runtimeProvider = tableSink.getSinkRuntimeProvider((DynamicTableSink.Context)new SinkRuntimeProviderContext(this.isBounded));
        RowType physicalRowType = this.getPhysicalRowType(schema);
        int[] primaryKeys = this.getPrimaryKeyIndices(physicalRowType, schema);
        int sinkParallelism = this.deriveSinkParallelism(inputTransform, runtimeProvider);
        int inputParallelism = inputTransform.getParallelism();
        boolean inputInsertOnly = this.inputChangelogMode.containsOnly(RowKind.INSERT);
        boolean bl = hasPk = primaryKeys.length > 0;
        if (!inputInsertOnly && sinkParallelism != inputParallelism && !hasPk) {
            throw new TableException(String.format("The sink for table '%s' has a configured parallelism of %s, while the input parallelism is %s. Since the configured parallelism is different from the input's parallelism and the changelog mode is not insert-only, a primary key is required but could not be found.", this.tableSinkSpec.getContextResolvedTable().getIdentifier().asSummaryString(), sinkParallelism, inputParallelism));
        }
        boolean needMaterialization = !inputInsertOnly && upsertMaterialize;
        Transformation<RowData> sinkTransform = this.applyConstraintValidations(inputTransform, config, physicalRowType);
        if (hasPk) {
            sinkTransform = this.applyKeyBy(config, sinkTransform, primaryKeys, sinkParallelism, inputParallelism, needMaterialization);
        }
        if (needMaterialization) {
            sinkTransform = this.applyUpsertMaterialize(sinkTransform, primaryKeys, sinkParallelism, config, physicalRowType);
        }
        return this.applySinkProvider(sinkTransform, streamExecEnv, runtimeProvider, rowtimeFieldIndex, sinkParallelism, config);
    }

    private Transformation<RowData> applyConstraintValidations(Transformation<RowData> inputTransform, ExecNodeConfig config, RowType physicalRowType) {
        ConstraintEnforcer constraintEnforcer;
        List<ConstraintEnforcer.FieldInfo> binaryFieldInfo;
        ConstraintEnforcer.Builder validatorBuilder = ConstraintEnforcer.newBuilder();
        String[] fieldNames = physicalRowType.getFieldNames().toArray(new String[0]);
        int[] notNullFieldIndices = this.getNotNullFieldIndices(physicalRowType);
        if (notNullFieldIndices.length > 0) {
            ExecutionConfigOptions.NotNullEnforcer notNullEnforcer = (ExecutionConfigOptions.NotNullEnforcer)config.get(ExecutionConfigOptions.TABLE_EXEC_SINK_NOT_NULL_ENFORCER);
            List notNullFieldNames = Arrays.stream(notNullFieldIndices).mapToObj(idx -> fieldNames[idx]).collect(Collectors.toList());
            validatorBuilder.addNotNullConstraint(notNullEnforcer, notNullFieldIndices, notNullFieldNames, fieldNames);
        }
        ExecutionConfigOptions.TypeLengthEnforcer typeLengthEnforcer = (ExecutionConfigOptions.TypeLengthEnforcer)config.get(ExecutionConfigOptions.TABLE_EXEC_SINK_TYPE_LENGTH_ENFORCER);
        List<ConstraintEnforcer.FieldInfo> charFieldInfo = this.getFieldInfoForLengthEnforcer(physicalRowType, LengthEnforcerType.CHAR);
        if (!charFieldInfo.isEmpty()) {
            List charFieldNames = charFieldInfo.stream().map(cfi -> fieldNames[cfi.fieldIdx()]).collect(Collectors.toList());
            validatorBuilder.addCharLengthConstraint(typeLengthEnforcer, charFieldInfo, charFieldNames, fieldNames);
        }
        if (!(binaryFieldInfo = this.getFieldInfoForLengthEnforcer(physicalRowType, LengthEnforcerType.BINARY)).isEmpty()) {
            List binaryFieldNames = binaryFieldInfo.stream().map(cfi -> fieldNames[cfi.fieldIdx()]).collect(Collectors.toList());
            validatorBuilder.addBinaryLengthConstraint(typeLengthEnforcer, binaryFieldInfo, binaryFieldNames, fieldNames);
        }
        if ((constraintEnforcer = validatorBuilder.build()) != null) {
            return ExecNodeUtil.createOneInputTransformation(inputTransform, this.createTransformationMeta(CONSTRAINT_VALIDATOR_TRANSFORMATION, constraintEnforcer.getOperatorName(), "ConstraintEnforcer", config), constraintEnforcer, this.getInputTypeInfo(), inputTransform.getParallelism());
        }
        return inputTransform;
    }

    private int[] getNotNullFieldIndices(RowType physicalType) {
        return IntStream.range(0, physicalType.getFieldCount()).filter(pos -> !physicalType.getTypeAt(pos).isNullable()).toArray();
    }

    private List<ConstraintEnforcer.FieldInfo> getFieldInfoForLengthEnforcer(RowType physicalType, LengthEnforcerType enforcerType) {
        LogicalTypeRoot staticType = null;
        LogicalTypeRoot variableType = null;
        int maxLength = 0;
        switch (enforcerType) {
            case CHAR: {
                staticType = LogicalTypeRoot.CHAR;
                variableType = LogicalTypeRoot.VARCHAR;
                maxLength = Integer.MAX_VALUE;
                break;
            }
            case BINARY: {
                staticType = LogicalTypeRoot.BINARY;
                variableType = LogicalTypeRoot.VARBINARY;
                maxLength = Integer.MAX_VALUE;
            }
        }
        ArrayList<ConstraintEnforcer.FieldInfo> fieldsAndLengths = new ArrayList<ConstraintEnforcer.FieldInfo>();
        for (int i = 0; i < physicalType.getFieldCount(); ++i) {
            LogicalType type = physicalType.getTypeAt(i);
            boolean isStatic = type.is(staticType);
            if (isStatic && LogicalTypeChecks.getLength((LogicalType)type) < maxLength || type.is(variableType) && LogicalTypeChecks.getLength((LogicalType)type) < maxLength) {
                fieldsAndLengths.add(new ConstraintEnforcer.FieldInfo(i, Integer.valueOf(LogicalTypeChecks.getLength((LogicalType)type)), isStatic));
                continue;
            }
            if (!isStatic) continue;
            fieldsAndLengths.add(new ConstraintEnforcer.FieldInfo(i, null, isStatic));
        }
        return fieldsAndLengths;
    }

    private int deriveSinkParallelism(Transformation<RowData> inputTransform, DynamicTableSink.SinkRuntimeProvider runtimeProvider) {
        int inputParallelism = inputTransform.getParallelism();
        if (!(runtimeProvider instanceof ParallelismProvider)) {
            return inputParallelism;
        }
        ParallelismProvider parallelismProvider = (ParallelismProvider)runtimeProvider;
        return parallelismProvider.getParallelism().map(sinkParallelism -> {
            if (sinkParallelism <= 0) {
                throw new TableException(String.format("Invalid configured parallelism %s for table '%s'.", sinkParallelism, this.tableSinkSpec.getContextResolvedTable().getIdentifier().asSummaryString()));
            }
            return sinkParallelism;
        }).orElse(inputParallelism);
    }

    private Transformation<RowData> applyKeyBy(ExecNodeConfig config, Transformation<RowData> inputTransform, int[] primaryKeys, int sinkParallelism, int inputParallelism, boolean needMaterialize) {
        ExecutionConfigOptions.SinkKeyedShuffle sinkShuffleByPk = (ExecutionConfigOptions.SinkKeyedShuffle)config.get(ExecutionConfigOptions.TABLE_EXEC_SINK_KEYED_SHUFFLE);
        boolean sinkKeyBy = false;
        switch (sinkShuffleByPk) {
            case NONE: {
                break;
            }
            case AUTO: {
                sinkKeyBy = sinkParallelism != inputParallelism && sinkParallelism != 1;
                break;
            }
            case FORCE: {
                boolean bl = sinkKeyBy = sinkParallelism != 1;
            }
        }
        if (!sinkKeyBy && !needMaterialize) {
            return inputTransform;
        }
        RowDataKeySelector selector = KeySelectorUtil.getRowDataSelector(primaryKeys, this.getInputTypeInfo());
        KeyGroupStreamPartitioner partitioner = new KeyGroupStreamPartitioner((KeySelector)selector, 128);
        PartitionTransformation partitionedTransform = new PartitionTransformation(inputTransform, (StreamPartitioner)partitioner);
        this.createTransformationMeta(PARTITIONER_TRANSFORMATION, "Partitioner", "Partitioner", config).fill(partitionedTransform);
        partitionedTransform.setParallelism(sinkParallelism);
        return partitionedTransform;
    }

    private Transformation<RowData> applyUpsertMaterialize(Transformation<RowData> inputTransform, int[] primaryKeys, int sinkParallelism, ExecNodeConfig config, RowType physicalRowType) {
        GeneratedRecordEqualiser equaliser = new EqualiserCodeGenerator(physicalRowType).generateRecordEqualiser("SinkMaterializeEqualiser");
        SinkUpsertMaterializer operator = new SinkUpsertMaterializer(StateConfigUtil.createTtlConfig((long)((Duration)config.get(ExecutionConfigOptions.IDLE_STATE_RETENTION)).toMillis()), (TypeSerializer)InternalSerializers.create((RowType)physicalRowType), equaliser);
        String[] fieldNames = physicalRowType.getFieldNames().toArray(new String[0]);
        List pkFieldNames = Arrays.stream(primaryKeys).mapToObj(idx -> fieldNames[idx]).collect(Collectors.toList());
        OneInputTransformation materializeTransform = ExecNodeUtil.createOneInputTransformation(inputTransform, this.createTransformationMeta(UPSERT_MATERIALIZE_TRANSFORMATION, String.format("SinkMaterializer(pk=[%s])", String.join((CharSequence)", ", pkFieldNames)), "SinkMaterializer", config), operator, inputTransform.getOutputType(), sinkParallelism);
        RowDataKeySelector keySelector = KeySelectorUtil.getRowDataSelector(primaryKeys, (InternalTypeInfo<RowData>)InternalTypeInfo.of((RowType)physicalRowType));
        materializeTransform.setStateKeySelector((KeySelector)keySelector);
        materializeTransform.setStateKeyType((TypeInformation)keySelector.getProducedType());
        return materializeTransform;
    }

    private Transformation<?> applySinkProvider(final Transformation<RowData> inputTransform, StreamExecutionEnvironment env, DynamicTableSink.SinkRuntimeProvider runtimeProvider, final int rowtimeFieldIndex, int sinkParallelism, final ExecNodeConfig config) {
        TransformationMetadata sinkMeta = this.createTransformationMeta(SINK_TRANSFORMATION, config);
        if (runtimeProvider instanceof DataStreamSinkProvider) {
            Transformation<RowData> sinkTransformation = this.applyRowtimeTransformation(inputTransform, rowtimeFieldIndex, sinkParallelism, config);
            DataStream dataStream = new DataStream(env, sinkTransformation);
            DataStreamSinkProvider provider = (DataStreamSinkProvider)runtimeProvider;
            return provider.consumeDataStream(this.createProviderContext(config), dataStream).getTransformation();
        }
        if (runtimeProvider instanceof TransformationSinkProvider) {
            TransformationSinkProvider provider = (TransformationSinkProvider)runtimeProvider;
            return provider.createTransformation(new TransformationSinkProvider.Context(){

                @Override
                public Transformation<RowData> getInputTransformation() {
                    return inputTransform;
                }

                @Override
                public int getRowtimeIndex() {
                    return rowtimeFieldIndex;
                }

                public Optional<String> generateUid(String name) {
                    return CommonExecSink.this.createProviderContext(config).generateUid(name);
                }
            });
        }
        if (runtimeProvider instanceof SinkFunctionProvider) {
            SinkFunction sinkFunction = ((SinkFunctionProvider)runtimeProvider).createSinkFunction();
            return this.createSinkFunctionTransformation((SinkFunction<RowData>)sinkFunction, env, inputTransform, rowtimeFieldIndex, sinkMeta, sinkParallelism);
        }
        if (runtimeProvider instanceof OutputFormatProvider) {
            OutputFormat outputFormat = ((OutputFormatProvider)runtimeProvider).createOutputFormat();
            OutputFormatSinkFunction sinkFunction = new OutputFormatSinkFunction(outputFormat);
            return this.createSinkFunctionTransformation((SinkFunction<RowData>)sinkFunction, env, inputTransform, rowtimeFieldIndex, sinkMeta, sinkParallelism);
        }
        if (runtimeProvider instanceof SinkProvider) {
            Transformation<RowData> sinkTransformation = this.applyRowtimeTransformation(inputTransform, rowtimeFieldIndex, sinkParallelism, config);
            DataStream dataStream = new DataStream(env, sinkTransformation);
            Transformation transformation = DataStreamSink.forSinkV1((DataStream)dataStream, (Sink)((SinkProvider)runtimeProvider).createSink(), (CustomSinkOperatorUidHashes)CustomSinkOperatorUidHashes.DEFAULT).getTransformation();
            transformation.setParallelism(sinkParallelism);
            sinkMeta.fill(transformation);
            return transformation;
        }
        if (runtimeProvider instanceof SinkV2Provider) {
            Transformation<RowData> sinkTransformation = this.applyRowtimeTransformation(inputTransform, rowtimeFieldIndex, sinkParallelism, config);
            DataStream dataStream = new DataStream(env, sinkTransformation);
            Transformation transformation = DataStreamSink.forSink((DataStream)dataStream, (org.apache.flink.api.connector.sink2.Sink)((SinkV2Provider)runtimeProvider).createSink(), (CustomSinkOperatorUidHashes)CustomSinkOperatorUidHashes.DEFAULT).getTransformation();
            transformation.setParallelism(sinkParallelism);
            sinkMeta.fill(transformation);
            return transformation;
        }
        throw new TableException("Unsupported sink runtime provider.");
    }

    private ProviderContext createProviderContext(ExecNodeConfig config) {
        return name -> {
            if (this instanceof StreamExecNode && config.shouldSetUid()) {
                return Optional.of(this.createTransformationUid(name));
            }
            return Optional.empty();
        };
    }

    private Transformation<?> createSinkFunctionTransformation(SinkFunction<RowData> sinkFunction, StreamExecutionEnvironment env, Transformation<RowData> inputTransformation, int rowtimeFieldIndex, TransformationMetadata transformationMetadata, int sinkParallelism) {
        SinkOperator operator = new SinkOperator((SinkFunction)env.clean(sinkFunction), rowtimeFieldIndex);
        if (sinkFunction instanceof InputTypeConfigurable) {
            ((InputTypeConfigurable)sinkFunction).setInputType(this.getInputTypeInfo(), env.getConfig());
        }
        LegacySinkTransformation transformation = new LegacySinkTransformation(inputTransformation, transformationMetadata.getName(), (StreamOperatorFactory)SimpleOperatorFactory.of((StreamOperator)operator), sinkParallelism);
        transformationMetadata.fill(transformation);
        return transformation;
    }

    private Transformation<RowData> applyRowtimeTransformation(Transformation<RowData> inputTransform, int rowtimeFieldIndex, int sinkParallelism, ExecNodeConfig config) {
        if (rowtimeFieldIndex == -1) {
            return inputTransform;
        }
        return ExecNodeUtil.createOneInputTransformation(inputTransform, this.createTransformationMeta(TIMESTAMP_INSERTER_TRANSFORMATION, String.format("StreamRecordTimestampInserter(rowtime field: %s)", rowtimeFieldIndex), "StreamRecordTimestampInserter", config), new StreamRecordTimestampInserter(rowtimeFieldIndex), inputTransform.getOutputType(), sinkParallelism);
    }

    private InternalTypeInfo<RowData> getInputTypeInfo() {
        return InternalTypeInfo.of((LogicalType)this.getInputEdges().get(0).getOutputType());
    }

    private int[] getPrimaryKeyIndices(RowType sinkRowType, ResolvedSchema schema) {
        return schema.getPrimaryKey().map(k -> k.getColumns().stream().mapToInt(arg_0 -> ((RowType)sinkRowType).getFieldIndex(arg_0)).toArray()).orElse(new int[0]);
    }

    private RowType getPhysicalRowType(ResolvedSchema schema) {
        return (RowType)schema.toPhysicalRowDataType().getLogicalType();
    }

    private static enum LengthEnforcerType {
        CHAR,
        BINARY;

    }
}

