/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.table.planner.plan.optimize;

import java.lang.invoke.LambdaMetafactory;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Comparator;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.function.Consumer;
import java.util.function.Function;
import java.util.function.IntPredicate;
import java.util.function.Predicate;
import java.util.stream.Collectors;
import org.apache.calcite.rel.RelNode;
import org.apache.calcite.rel.core.AggregateCall;
import org.apache.calcite.rel.type.RelDataType;
import org.apache.calcite.rex.RexCall;
import org.apache.calcite.rex.RexInputRef;
import org.apache.calcite.rex.RexLiteral;
import org.apache.calcite.rex.RexLocalRef;
import org.apache.calcite.rex.RexNode;
import org.apache.calcite.rex.RexProgram;
import org.apache.calcite.sql.SqlExplainLevel;
import org.apache.calcite.sql.SqlKind;
import org.apache.calcite.util.ImmutableBitSet;
import org.apache.flink.table.api.TableException;
import org.apache.flink.table.api.constraints.UniqueConstraint;
import org.apache.flink.table.catalog.Column;
import org.apache.flink.table.connector.source.abilities.SupportsReadingMetadata;
import org.apache.flink.table.data.RowData;
import org.apache.flink.table.planner.calcite.FlinkTypeFactory;
import org.apache.flink.table.planner.connectors.DynamicSourceUtils;
import org.apache.flink.table.planner.plan.metadata.FlinkRelMetadataQuery;
import org.apache.flink.table.planner.plan.nodes.exec.spec.OverSpec;
import org.apache.flink.table.planner.plan.nodes.physical.common.CommonPhysicalJoin;
import org.apache.flink.table.planner.plan.nodes.physical.stream.StreamPhysicalCalcBase;
import org.apache.flink.table.planner.plan.nodes.physical.stream.StreamPhysicalChangelogNormalize;
import org.apache.flink.table.planner.plan.nodes.physical.stream.StreamPhysicalCorrelateBase;
import org.apache.flink.table.planner.plan.nodes.physical.stream.StreamPhysicalDataStreamScan;
import org.apache.flink.table.planner.plan.nodes.physical.stream.StreamPhysicalDeduplicate;
import org.apache.flink.table.planner.plan.nodes.physical.stream.StreamPhysicalDropUpdateBefore;
import org.apache.flink.table.planner.plan.nodes.physical.stream.StreamPhysicalExchange;
import org.apache.flink.table.planner.plan.nodes.physical.stream.StreamPhysicalExpand;
import org.apache.flink.table.planner.plan.nodes.physical.stream.StreamPhysicalGroupAggregateBase;
import org.apache.flink.table.planner.plan.nodes.physical.stream.StreamPhysicalLegacySink;
import org.apache.flink.table.planner.plan.nodes.physical.stream.StreamPhysicalLegacyTableSourceScan;
import org.apache.flink.table.planner.plan.nodes.physical.stream.StreamPhysicalLimit;
import org.apache.flink.table.planner.plan.nodes.physical.stream.StreamPhysicalLookupJoin;
import org.apache.flink.table.planner.plan.nodes.physical.stream.StreamPhysicalMatch;
import org.apache.flink.table.planner.plan.nodes.physical.stream.StreamPhysicalMiniBatchAssigner;
import org.apache.flink.table.planner.plan.nodes.physical.stream.StreamPhysicalOverAggregateBase;
import org.apache.flink.table.planner.plan.nodes.physical.stream.StreamPhysicalRank;
import org.apache.flink.table.planner.plan.nodes.physical.stream.StreamPhysicalRel;
import org.apache.flink.table.planner.plan.nodes.physical.stream.StreamPhysicalSink;
import org.apache.flink.table.planner.plan.nodes.physical.stream.StreamPhysicalSort;
import org.apache.flink.table.planner.plan.nodes.physical.stream.StreamPhysicalSortLimit;
import org.apache.flink.table.planner.plan.nodes.physical.stream.StreamPhysicalTableSourceScan;
import org.apache.flink.table.planner.plan.nodes.physical.stream.StreamPhysicalTemporalSort;
import org.apache.flink.table.planner.plan.nodes.physical.stream.StreamPhysicalUnion;
import org.apache.flink.table.planner.plan.nodes.physical.stream.StreamPhysicalValues;
import org.apache.flink.table.planner.plan.nodes.physical.stream.StreamPhysicalWatermarkAssigner;
import org.apache.flink.table.planner.plan.nodes.physical.stream.StreamPhysicalWindowAggregateBase;
import org.apache.flink.table.planner.plan.nodes.physical.stream.StreamPhysicalWindowDeduplicate;
import org.apache.flink.table.planner.plan.nodes.physical.stream.StreamPhysicalWindowRank;
import org.apache.flink.table.planner.plan.nodes.physical.stream.StreamPhysicalWindowTableFunction;
import org.apache.flink.table.planner.plan.schema.TableSourceTable;
import org.apache.flink.table.planner.plan.utils.ChangelogPlanUtils;
import org.apache.flink.table.planner.plan.utils.FlinkRelOptUtil;
import org.apache.flink.table.planner.plan.utils.FlinkRexUtil;
import org.apache.flink.table.planner.plan.utils.JoinUtil;
import org.apache.flink.table.planner.plan.utils.OverAggregateUtil;
import org.apache.flink.table.planner.plan.utils.RankProcessStrategy;
import org.apache.flink.table.planner.utils.JavaScalaConversionUtil;
import org.apache.flink.table.planner.utils.ShortcutUtils;
import org.apache.flink.table.runtime.operators.join.FlinkJoinType;
import org.apache.flink.table.runtime.operators.join.stream.state.JoinInputSideSpec;
import org.apache.flink.table.runtime.typeutils.InternalTypeInfo;
import org.apache.flink.table.types.logical.RowType;
import org.apache.flink.types.RowKind;
import scala.collection.JavaConverters;

public class StreamNonDeterministicUpdatePlanVisitor {
    private static final ImmutableBitSet NO_REQUIRED_DETERMINISM = ImmutableBitSet.of();
    private static final String NON_DETERMINISTIC_CONDITION_ERROR_MSG_TEMPLATE = "There exists non deterministic function: '%s' in condition: '%s' which may cause wrong result in update pipeline.";

    public StreamPhysicalRel visit(StreamPhysicalRel rel) {
        return this.visit(rel, NO_REQUIRED_DETERMINISM);
    }

    /*
     * Unable to fully structure code
     */
    public StreamPhysicalRel visit(StreamPhysicalRel rel, ImmutableBitSet requireDeterminism) {
        block49: {
            block51: {
                block50: {
                    if (rel instanceof StreamPhysicalSink) {
                        if (this.inputInsertOnly(rel)) {
                            return this.transmitDeterminismRequirement(rel, StreamNonDeterministicUpdatePlanVisitor.NO_REQUIRED_DETERMINISM);
                        }
                        sink = (StreamPhysicalSink)rel;
                        primaryKey = sink.contextResolvedTable().getResolvedSchema().getPrimaryKeyIndexes();
                        requireInputDeterminism = sink.upsertMaterialize() != false || null == primaryKey || primaryKey.length == 0 ? ImmutableBitSet.range(sink.getInput().getRowType().getFieldCount()) : ImmutableBitSet.of(primaryKey);
                        return this.transmitDeterminismRequirement(sink, requireInputDeterminism);
                    }
                    if (rel instanceof StreamPhysicalLegacySink) {
                        if (this.inputInsertOnly(rel)) {
                            return this.transmitDeterminismRequirement(rel, StreamNonDeterministicUpdatePlanVisitor.NO_REQUIRED_DETERMINISM);
                        }
                        sink = (StreamPhysicalLegacySink)rel;
                        tableSchema = sink.sink().getTableSchema();
                        primaryKey = tableSchema.getPrimaryKey();
                        columns = Arrays.asList(tableSchema.getFieldNames());
                        requireInputDeterminism = primaryKey.isPresent() ? ImmutableBitSet.of((Iterable<Integer>)((UniqueConstraint)primaryKey.get()).getColumns().stream().map((Function<String, Integer>)LambdaMetafactory.metafactory(null, null, null, (Ljava/lang/Object;)Ljava/lang/Object;, indexOf(java.lang.Object ), (Ljava/lang/String;)Ljava/lang/Integer;)(columns)).collect(Collectors.toList())) : ImmutableBitSet.range(columns.size());
                        return this.transmitDeterminismRequirement(rel, requireInputDeterminism);
                    }
                    if (rel instanceof StreamPhysicalCalcBase) {
                        if (this.inputInsertOnly(rel) || requireDeterminism.isEmpty()) {
                            return this.transmitDeterminismRequirement(rel, StreamNonDeterministicUpdatePlanVisitor.NO_REQUIRED_DETERMINISM);
                        }
                        calc = (StreamPhysicalCalcBase)rel;
                        this.checkNonDeterministicRexProgram(requireDeterminism, calc.getProgram(), calc);
                        projects = calc.getProgram().getProjectList().stream().map((Function<RexLocalRef, RexNode>)LambdaMetafactory.metafactory(null, null, null, (Ljava/lang/Object;)Ljava/lang/Object;, lambda$visit$0(org.apache.flink.table.planner.plan.nodes.physical.stream.StreamPhysicalCalcBase org.apache.calcite.rex.RexLocalRef ), (Lorg/apache/calcite/rex/RexLocalRef;)Lorg/apache/calcite/rex/RexNode;)((StreamPhysicalCalcBase)calc)).collect(Collectors.toList());
                        outFromSourcePos = this.extractSourceMapping(projects);
                        conv2Inputs = requireDeterminism.toList().stream().map((Function<Integer, Integer>)LambdaMetafactory.metafactory(null, null, null, (Ljava/lang/Object;)Ljava/lang/Object;, lambda$visit$2(java.util.Map org.apache.flink.table.planner.plan.nodes.physical.stream.StreamPhysicalCalcBase java.lang.Integer ), (Ljava/lang/Integer;)Ljava/lang/Integer;)(outFromSourcePos, (StreamPhysicalCalcBase)calc)).filter((Predicate<Integer>)LambdaMetafactory.metafactory(null, null, null, (Ljava/lang/Object;)Z, lambda$visit$3(java.lang.Integer ), (Ljava/lang/Integer;)Z)()).collect(Collectors.toList());
                        return this.transmitDeterminismRequirement(calc, ImmutableBitSet.of(conv2Inputs));
                    }
                    if (rel instanceof StreamPhysicalCorrelateBase) {
                        if (this.inputInsertOnly(rel) || requireDeterminism.isEmpty()) {
                            return this.transmitDeterminismRequirement(rel, StreamNonDeterministicUpdatePlanVisitor.NO_REQUIRED_DETERMINISM);
                        }
                        correlate = (StreamPhysicalCorrelateBase)rel;
                        if (correlate.condition().isDefined()) {
                            rexNode = (RexNode)correlate.condition().get();
                            this.checkNonDeterministicCondition(rexNode, correlate);
                        }
                        leftFieldCnt = correlate.inputRel().getRowType().getFieldCount();
                        ndCall = FlinkRexUtil.getNonDeterministicCallNameInStreaming(correlate.scan().getCall());
                        if (ndCall.isPresent() && !(unsatisfiedColumns = requireDeterminism.toList().stream().filter((Predicate<Integer>)LambdaMetafactory.metafactory(null, null, null, (Ljava/lang/Object;)Z, lambda$visit$4(int java.lang.Integer ), (Ljava/lang/Integer;)Z)((int)leftFieldCnt)).collect(Collectors.toList())).isEmpty()) {
                            this.throwNonDeterministicColumnsError(unsatisfiedColumns, correlate.getRowType(), correlate, null, ndCall);
                        }
                        if ((fromLeft = requireDeterminism.toList().stream().filter((Predicate<Integer>)LambdaMetafactory.metafactory(null, null, null, (Ljava/lang/Object;)Z, lambda$visit$5(int java.lang.Integer ), (Ljava/lang/Integer;)Z)((int)leftFieldCnt)).collect(Collectors.toList())).isEmpty()) {
                            return this.transmitDeterminismRequirement(correlate, StreamNonDeterministicUpdatePlanVisitor.NO_REQUIRED_DETERMINISM);
                        }
                        return this.transmitDeterminismRequirement(correlate, ImmutableBitSet.of(fromLeft));
                    }
                    if (!(rel instanceof StreamPhysicalLookupJoin)) break block49;
                    if (this.inputInsertOnly(rel) || requireDeterminism.isEmpty()) {
                        return this.transmitDeterminismRequirement(rel, StreamNonDeterministicUpdatePlanVisitor.NO_REQUIRED_DETERMINISM);
                    }
                    lookupJoin = (StreamPhysicalLookupJoin)rel;
                    JavaScalaConversionUtil.toJava(lookupJoin.remainingCondition()).ifPresent((Consumer<RexNode>)LambdaMetafactory.metafactory(null, null, null, (Ljava/lang/Object;)V, lambda$visit$6(org.apache.flink.table.planner.plan.nodes.physical.stream.StreamPhysicalLookupJoin org.apache.calcite.rex.RexNode ), (Lorg/apache/calcite/rex/RexNode;)V)((StreamNonDeterministicUpdatePlanVisitor)this, (StreamPhysicalLookupJoin)lookupJoin));
                    JavaScalaConversionUtil.toJava(lookupJoin.calcOnTemporalTable()).ifPresent((Consumer<RexProgram>)LambdaMetafactory.metafactory(null, null, null, (Ljava/lang/Object;)V, lambda$visit$7(org.apache.calcite.util.ImmutableBitSet org.apache.flink.table.planner.plan.nodes.physical.stream.StreamPhysicalLookupJoin org.apache.calcite.rex.RexProgram ), (Lorg/apache/calcite/rex/RexProgram;)V)((StreamNonDeterministicUpdatePlanVisitor)this, (ImmutableBitSet)requireDeterminism, (StreamPhysicalLookupJoin)lookupJoin));
                    leftFieldCnt = lookupJoin.getInput().getRowType().getFieldCount();
                    requireRight = requireDeterminism.toList().stream().filter((Predicate<Integer>)LambdaMetafactory.metafactory(null, null, null, (Ljava/lang/Object;)Z, lambda$visit$8(int java.lang.Integer ), (Ljava/lang/Integer;)Z)((int)leftFieldCnt)).collect(Collectors.toList());
                    omitUpsertMaterialize = false;
                    if (!requireRight.isEmpty()) break block50;
                    omitUpsertMaterialize = true;
                    break block51;
                }
                outputPkIdx = lookupJoin.getOutputIndexesOfTemporalTablePrimaryKey();
                outputPkBitSet = ImmutableBitSet.of(outputPkIdx);
                if (!Arrays.stream(outputPkIdx).allMatch((IntPredicate)LambdaMetafactory.metafactory(null, null, null, (I)Z, lambda$visit$9(org.apache.flink.table.planner.plan.nodes.physical.stream.StreamPhysicalLookupJoin int ), (I)Z)((StreamPhysicalLookupJoin)lookupJoin))) ** GOTO lbl-1000
                if (requireRight.stream().allMatch((Predicate<Integer>)LambdaMetafactory.metafactory(null, null, null, (Ljava/lang/Object;)Z, get(int ), (Ljava/lang/Integer;)Z)((ImmutableBitSet)outputPkBitSet))) {
                    v0 = true;
                } else lbl-1000:
                // 2 sources

                {
                    v0 = false;
                }
                omitUpsertMaterialize = v0;
            }
            requireLeft = requireDeterminism.toList().stream().filter((Predicate<Integer>)LambdaMetafactory.metafactory(null, null, null, (Ljava/lang/Object;)Z, lambda$visit$10(int java.lang.Integer ), (Ljava/lang/Integer;)Z)((int)leftFieldCnt)).collect(Collectors.toList());
            if (omitUpsertMaterialize) {
                return this.transmitDeterminismRequirement(lookupJoin, ImmutableBitSet.of(requireLeft));
            }
            return this.transmitDeterminismRequirement(lookupJoin.copy(true), ImmutableBitSet.of(requireLeft));
        }
        if (rel instanceof StreamPhysicalTableSourceScan) {
            if (!requireDeterminism.isEmpty()) {
                tableScan = (StreamPhysicalTableSourceScan)rel;
                insertOnly = tableScan.tableSource().getChangelogMode().containsOnly(RowKind.INSERT);
                supportsReadingMetadata = tableScan.tableSource() instanceof SupportsReadingMetadata;
                if (!insertOnly && supportsReadingMetadata) {
                    sourceTable = tableScan.getTable().unwrap(TableSourceTable.class);
                    metadataColumns = DynamicSourceUtils.extractMetadataColumns(sourceTable.contextResolvedTable().getResolvedSchema());
                    metaColumnSet = metadataColumns.stream().map((Function<Column.MetadataColumn, String>)LambdaMetafactory.metafactory(null, null, null, (Ljava/lang/Object;)Ljava/lang/Object;, getName(), (Lorg/apache/flink/table/catalog/Column$MetadataColumn;)Ljava/lang/String;)()).collect(Collectors.toSet());
                    columns = tableScan.getRowType().getFieldNames();
                    metadataCauseErr = new ArrayList<String>();
                    for (index = 0; index < columns.size(); ++index) {
                        column = columns.get(index);
                        if (!metaColumnSet.contains(column) || !requireDeterminism.get(index)) continue;
                        metadataCauseErr.add(column);
                    }
                    if (!metadataCauseErr.isEmpty()) {
                        errorMsg = new StringBuilder();
                        errorMsg.append("The metadata column(s): '").append(String.join((CharSequence)", ", metadataCauseErr.toArray(new String[0]))).append("' in cdc source may cause wrong result or error on").append(" downstream operators, please consider removing these").append(" columns or use a non-cdc source that only has insert").append(" messages.\nsource node:\n").append(FlinkRelOptUtil.toString(tableScan, SqlExplainLevel.DIGEST_ATTRIBUTES, false, true, false, true, false));
                        throw new TableException(errorMsg.toString());
                    }
                }
            }
            return rel;
        }
        if (rel instanceof StreamPhysicalLegacyTableSourceScan || rel instanceof StreamPhysicalDataStreamScan || rel instanceof StreamPhysicalValues) {
            return rel;
        }
        if (rel instanceof StreamPhysicalGroupAggregateBase) {
            groupAgg = (StreamPhysicalGroupAggregateBase)rel;
            if (this.inputInsertOnly(groupAgg)) {
                if (!requireDeterminism.isEmpty()) {
                    this.checkUnsatisfiedDeterminism(requireDeterminism, groupAgg.grouping().length, JavaConverters.seqAsJavaList(groupAgg.aggCalls()), groupAgg.getRowType(), groupAgg);
                }
                return this.transmitDeterminismRequirement(groupAgg, StreamNonDeterministicUpdatePlanVisitor.NO_REQUIRED_DETERMINISM);
            }
            return this.transmitDeterminismRequirement(groupAgg, ImmutableBitSet.range(groupAgg.getInput().getRowType().getFieldCount()));
        }
        if (rel instanceof StreamPhysicalWindowAggregateBase) {
            windowAgg = (StreamPhysicalWindowAggregateBase)rel;
            if (this.inputInsertOnly(windowAgg)) {
                if (!requireDeterminism.isEmpty()) {
                    this.checkUnsatisfiedDeterminism(requireDeterminism, windowAgg.grouping().length, JavaConverters.seqAsJavaList(windowAgg.aggCalls()), windowAgg.getRowType(), windowAgg);
                }
                return this.transmitDeterminismRequirement(windowAgg, StreamNonDeterministicUpdatePlanVisitor.NO_REQUIRED_DETERMINISM);
            }
            return this.transmitDeterminismRequirement(windowAgg, ImmutableBitSet.range(windowAgg.getInput().getRowType().getFieldCount()));
        }
        if (rel instanceof StreamPhysicalExpand) {
            expand = (StreamPhysicalExpand)rel;
            return this.transmitDeterminismRequirement(expand, requireDeterminism.except(ImmutableBitSet.of(new int[]{expand.expandIdIndex()})));
        }
        if (rel instanceof CommonPhysicalJoin) {
            join = (CommonPhysicalJoin)rel;
            leftRel = (StreamPhysicalRel)join.getLeft();
            rightRel = (StreamPhysicalRel)join.getRight();
            leftInputHasUpdate = this.inputInsertOnly(leftRel) == false;
            rightInputHasUpdate = this.inputInsertOnly(rightRel) == false;
            innerOrSemi = join.joinSpec().getJoinType() == FlinkJoinType.INNER || join.joinSpec().getJoinType() == FlinkJoinType.SEMI;
            ndCall = FlinkRexUtil.getNonDeterministicCallNameInStreaming(join.getCondition());
            if ((leftInputHasUpdate || rightInputHasUpdate || !innerOrSemi) && ndCall.isPresent()) {
                this.throwNonDeterministicConditionError(ndCall.get(), join.getCondition(), (StreamPhysicalRel)join);
            }
            leftFieldCnt = leftRel.getRowType().getFieldCount();
            newLeft = this.visitJoinChild(requireDeterminism, leftRel, leftInputHasUpdate, leftFieldCnt, true, join.joinSpec().getLeftKeys(), JavaConverters.seqAsJavaList(join.getUpsertKeys(leftRel, join.joinSpec().getLeftKeys())));
            newRight = this.visitJoinChild(requireDeterminism, rightRel, rightInputHasUpdate, leftFieldCnt, false, join.joinSpec().getRightKeys(), JavaConverters.seqAsJavaList(join.getUpsertKeys(rightRel, join.joinSpec().getRightKeys())));
            return (StreamPhysicalRel)join.copy(join.getTraitSet(), join.getCondition(), newLeft, newRight, join.getJoinType(), join.isSemiJoin());
        }
        if (rel instanceof StreamPhysicalOverAggregateBase) {
            overAgg = (StreamPhysicalOverAggregateBase)rel;
            if (this.inputInsertOnly(overAgg)) {
                if (!requireDeterminism.isEmpty()) {
                    inputFieldCnt = overAgg.getInput().getRowType().getFieldCount();
                    overSpec = OverAggregateUtil.createOverSpec(overAgg.logicWindow());
                    aggOutputIndex = inputFieldCnt;
                    for (OverSpec.GroupSpec groupSpec : overSpec.getGroups()) {
                        this.checkUnsatisfiedDeterminism(requireDeterminism, aggOutputIndex, groupSpec.getAggCalls(), overAgg.getRowType(), overAgg);
                        aggOutputIndex += groupSpec.getAggCalls().size();
                    }
                }
                return this.transmitDeterminismRequirement(overAgg, StreamNonDeterministicUpdatePlanVisitor.NO_REQUIRED_DETERMINISM);
            }
            return this.transmitDeterminismRequirement(overAgg, this.mappingRequireDeterminismToInput(requireDeterminism, overAgg));
        }
        if (rel instanceof StreamPhysicalRank) {
            rank = (StreamPhysicalRank)rel;
            if (this.inputInsertOnly(rank)) {
                return this.transmitDeterminismRequirement(rank, StreamNonDeterministicUpdatePlanVisitor.NO_REQUIRED_DETERMINISM);
            }
            inputFieldCnt = rank.getInput().getRowType().getFieldCount();
            if (rank.rankStrategy() instanceof RankProcessStrategy.UpdateFastStrategy) {
                bitSetBuilder = ImmutableBitSet.builder();
                rank.partitionKey().toList().forEach((Consumer<Integer>)LambdaMetafactory.metafactory(null, null, null, (Ljava/lang/Object;)V, set(int ), (Ljava/lang/Integer;)V)((ImmutableBitSet.Builder)bitSetBuilder));
                rank.orderKey().getKeys().toIntegerList().forEach((Consumer<Integer>)LambdaMetafactory.metafactory(null, null, null, (Ljava/lang/Object;)V, set(int ), (Ljava/lang/Integer;)V)((ImmutableBitSet.Builder)bitSetBuilder));
                if (rank.outputRankNumber()) {
                    bitSetBuilder.set(inputFieldCnt);
                }
                return this.transmitDeterminismRequirement(rank, requireDeterminism.except(bitSetBuilder.build()));
            }
            if (rank.rankStrategy() instanceof RankProcessStrategy.RetractStrategy) {
                return this.transmitDeterminismRequirement(rank, ImmutableBitSet.range(inputFieldCnt));
            }
            throw new TableException(String.format("Can not infer the determinism for unsupported rank strategy: %s, this is a bug, please file an issue.", new Object[]{rank.rankStrategy()}));
        }
        if (rel instanceof StreamPhysicalDeduplicate) {
            dedup = (StreamPhysicalDeduplicate)rel;
            if (this.inputInsertOnly(dedup)) {
                return this.transmitDeterminismRequirement(dedup, StreamNonDeterministicUpdatePlanVisitor.NO_REQUIRED_DETERMINISM);
            }
            return this.transmitDeterminismRequirement(dedup, requireDeterminism.except(ImmutableBitSet.of(dedup.getUniqueKeys())));
        }
        if (rel instanceof StreamPhysicalWindowDeduplicate) {
            winDedup = (StreamPhysicalWindowDeduplicate)rel;
            if (this.inputInsertOnly(winDedup)) {
                return this.transmitDeterminismRequirement(winDedup, StreamNonDeterministicUpdatePlanVisitor.NO_REQUIRED_DETERMINISM);
            }
            return this.transmitDeterminismRequirement(winDedup, requireDeterminism.clear(winDedup.orderKey()).union(ImmutableBitSet.of(winDedup.partitionKeys())));
        }
        if (rel instanceof StreamPhysicalWindowRank) {
            winRank = (StreamPhysicalWindowRank)rel;
            if (this.inputInsertOnly(winRank)) {
                return this.transmitDeterminismRequirement(winRank, StreamNonDeterministicUpdatePlanVisitor.NO_REQUIRED_DETERMINISM);
            }
            inputFieldCnt = winRank.getInput().getRowType().getFieldCount();
            return this.transmitDeterminismRequirement(winRank, requireDeterminism.intersect(ImmutableBitSet.range(inputFieldCnt)).union(winRank.partitionKey()));
        }
        if (rel instanceof StreamPhysicalWindowTableFunction) {
            winTVF = (StreamPhysicalWindowTableFunction)rel;
            if (this.inputInsertOnly(winTVF)) {
                return this.transmitDeterminismRequirement(winTVF, StreamNonDeterministicUpdatePlanVisitor.NO_REQUIRED_DETERMINISM);
            }
            return this.transmitDeterminismRequirement(winTVF, requireDeterminism.intersect(ImmutableBitSet.range(winTVF.getInput().getRowType().getFieldCount())));
        }
        if (rel instanceof StreamPhysicalChangelogNormalize || rel instanceof StreamPhysicalDropUpdateBefore || rel instanceof StreamPhysicalMiniBatchAssigner || rel instanceof StreamPhysicalUnion || rel instanceof StreamPhysicalSort || rel instanceof StreamPhysicalLimit || rel instanceof StreamPhysicalSortLimit || rel instanceof StreamPhysicalTemporalSort || rel instanceof StreamPhysicalWatermarkAssigner || rel instanceof StreamPhysicalExchange) {
            return this.transmitDeterminismRequirement(rel, requireDeterminism);
        }
        if (rel instanceof StreamPhysicalMatch) {
            throw new TableException("Unsupported to resolve non-deterministic issue in match-recognize.");
        }
        throw new UnsupportedOperationException(String.format("Unsupported to visit node %s, please add the visit implementation if it is a newly added stream physical node.", new Object[]{rel.getClass().getSimpleName()}));
    }

    private boolean inputInsertOnly(StreamPhysicalRel rel) {
        return ChangelogPlanUtils.inputInsertOnly(rel);
    }

    private StreamPhysicalRel transmitDeterminismRequirement(StreamPhysicalRel parent, ImmutableBitSet requireDeterminism) {
        List<RelNode> newChildren = this.visitInputs(parent, requireDeterminism);
        return (StreamPhysicalRel)parent.copy(parent.getTraitSet(), newChildren);
    }

    private List<RelNode> visitInputs(StreamPhysicalRel parent, ImmutableBitSet requireDeterminism) {
        ArrayList<RelNode> newChildren = new ArrayList<RelNode>();
        for (int index = 0; index < parent.getInputs().size(); ++index) {
            StreamPhysicalRel input = (StreamPhysicalRel)parent.getInput(index);
            newChildren.add(this.visit(input, this.requireDeterminismExcludeUpsertKey(input, requireDeterminism)));
        }
        return newChildren;
    }

    private StreamPhysicalRel visitJoinChild(ImmutableBitSet requireDeterminism, StreamPhysicalRel rel, boolean inputHasUpdate, int leftFieldCnt, boolean isLeft, int[] joinKeys, List<int[]> inputUniqueKeys) {
        JoinInputSideSpec joinInputSideSpec = JoinUtil.analyzeJoinInput(ShortcutUtils.unwrapClassLoader(rel), (InternalTypeInfo<RowData>)InternalTypeInfo.of((RowType)FlinkTypeFactory.toLogicalRowType(rel.getRowType())), joinKeys, inputUniqueKeys);
        ImmutableBitSet inputRequireDeterminism = inputHasUpdate ? (joinInputSideSpec.hasUniqueKey() || joinInputSideSpec.joinKeyContainsUniqueKey() ? (isLeft ? ImmutableBitSet.of(requireDeterminism.toList().stream().filter(index -> index < leftFieldCnt).collect(Collectors.toList())) : ImmutableBitSet.of(requireDeterminism.toList().stream().filter(index -> index >= leftFieldCnt).map(index -> index - leftFieldCnt).collect(Collectors.toList()))) : ImmutableBitSet.range(rel.getRowType().getFieldCount())) : NO_REQUIRED_DETERMINISM;
        return this.transmitDeterminismRequirement(rel, inputRequireDeterminism);
    }

    private Map<Integer, Integer> extractSourceMapping(List<RexNode> projects) {
        HashMap<Integer, Integer> mapOutFromInPos = new HashMap<Integer, Integer>();
        for (int index = 0; index < projects.size(); ++index) {
            RexNode expr = projects.get(index);
            if (expr instanceof RexInputRef) {
                mapOutFromInPos.put(index, ((RexInputRef)expr).getIndex());
                continue;
            }
            if (expr instanceof RexCall) {
                RexCall call = (RexCall)expr;
                if (!call.getKind().equals((Object)SqlKind.AS) && !call.getKind().equals((Object)SqlKind.CAST) || !(call.getOperands().get(0) instanceof RexInputRef)) continue;
                RexInputRef ref = (RexInputRef)call.getOperands().get(0);
                mapOutFromInPos.put(index, ref.getIndex());
                continue;
            }
            if (!(expr instanceof RexLiteral)) continue;
            mapOutFromInPos.put(index, -1);
        }
        return mapOutFromInPos;
    }

    private void checkNonDeterministicRexProgram(ImmutableBitSet requireDeterminism, RexProgram program, StreamPhysicalRel relatedRel) {
        if (null != program.getCondition()) {
            RexNode rexNode = program.expandLocalRef(program.getCondition());
            this.checkNonDeterministicCondition(rexNode, relatedRel);
        }
        List projects = program.getProjectList().stream().map(expr -> program.expandLocalRef((RexLocalRef)expr)).collect(Collectors.toList());
        HashMap<Integer, String> nonDeterministicCols = new HashMap<Integer, String>();
        for (int index2 = 0; index2 < projects.size(); ++index2) {
            Optional<String> ndCall = FlinkRexUtil.getNonDeterministicCallNameInStreaming((RexNode)projects.get(index2));
            if (!ndCall.isPresent()) continue;
            nonDeterministicCols.put(index2, ndCall.get());
        }
        List<Integer> unsatisfiedColumns = requireDeterminism.toList().stream().filter(index -> nonDeterministicCols.containsKey(index)).collect(Collectors.toList());
        if (!unsatisfiedColumns.isEmpty()) {
            this.throwNonDeterministicColumnsError(unsatisfiedColumns, relatedRel.getRowType(), relatedRel, nonDeterministicCols, Optional.empty());
        }
    }

    private void checkNonDeterministicCondition(RexNode condition, StreamPhysicalRel relatedRel) {
        Optional<String> ndCall = FlinkRexUtil.getNonDeterministicCallNameInStreaming(condition);
        if (ndCall.isPresent()) {
            this.throwNonDeterministicConditionError(ndCall.get(), condition, relatedRel);
        }
    }

    private void checkUnsatisfiedDeterminism(ImmutableBitSet requireDeterminism, int aggStartIndex, List<AggregateCall> aggCalls, RelDataType rowType, StreamPhysicalRel relatedRel) {
        HashMap<Integer, String> nonDeterministicOutput = new HashMap<Integer, String>();
        int aggOutputIndex = aggStartIndex;
        for (AggregateCall aggCall : aggCalls) {
            if (!aggCall.getAggregation().isDeterministic() || aggCall.getAggregation().isDynamicFunction()) {
                nonDeterministicOutput.put(aggOutputIndex, aggCall.getAggregation().getName());
            }
            ++aggOutputIndex;
        }
        List<Integer> unsatisfiedColumns = requireDeterminism.toList().stream().filter(nonDeterministicOutput::containsKey).collect(Collectors.toList());
        if (!unsatisfiedColumns.isEmpty()) {
            this.throwNonDeterministicColumnsError(unsatisfiedColumns, rowType, relatedRel, nonDeterministicOutput, Optional.empty());
        }
    }

    private void throwNonDeterministicConditionError(String ndCall, RexNode condition, StreamPhysicalRel relatedRel) throws TableException {
        StringBuilder errorMsg = new StringBuilder();
        errorMsg.append(String.format(NON_DETERMINISTIC_CONDITION_ERROR_MSG_TEMPLATE, ndCall, condition));
        errorMsg.append("\nrelated rel plan:\n").append(FlinkRelOptUtil.toString(relatedRel, SqlExplainLevel.DIGEST_ATTRIBUTES, false, true, false, true, false));
        throw new TableException(errorMsg.toString());
    }

    private void throwNonDeterministicColumnsError(List<Integer> indexes, RelDataType rowType, StreamPhysicalRel relatedRel, Map<Integer, String> ndCallMap, Optional<String> ndCallName) throws TableException {
        StringBuilder errorMsg = new StringBuilder();
        errorMsg.append("The column(s): ");
        int index = 0;
        for (String column : rowType.getFieldNames()) {
            if (indexes.contains(index)) {
                errorMsg.append(column).append("(generated by non-deterministic function: ");
                if (ndCallName.isPresent()) {
                    errorMsg.append(ndCallName.get());
                } else {
                    errorMsg.append(ndCallMap.get(index));
                }
                errorMsg.append(" ) ");
            }
            ++index;
        }
        errorMsg.append("can not satisfy the determinism requirement for correctly processing update message('UB'/'UA'/'D' in changelogMode, not 'I' only), this usually happens when input node has no upsertKey(upsertKeys=[{}]) or current node outputs non-deterministic update messages. Please consider removing these non-deterministic columns or making them deterministic by using deterministic functions.\n");
        errorMsg.append("\nrelated rel plan:\n").append(FlinkRelOptUtil.toString(relatedRel, SqlExplainLevel.DIGEST_ATTRIBUTES, false, true, false, true, false));
        throw new TableException(errorMsg.toString());
    }

    private ImmutableBitSet mappingRequireDeterminismToInput(ImmutableBitSet requireDeterminism, StreamPhysicalOverAggregateBase overAgg) {
        int inputFieldCnt = overAgg.getInput().getRowType().getFieldCount();
        List requireInputIndexes = requireDeterminism.toList().stream().filter(index -> index < inputFieldCnt).collect(Collectors.toList());
        if (requireInputIndexes.size() == inputFieldCnt) {
            return ImmutableBitSet.range(inputFieldCnt);
        }
        HashSet<Integer> allRequiredInputSet = new HashSet<Integer>(requireInputIndexes);
        OverSpec overSpec = OverAggregateUtil.createOverSpec(overAgg.logicWindow());
        Arrays.stream(overSpec.getPartition().getFieldIndices()).forEach(allRequiredInputSet::add);
        overSpec.getGroups().forEach(OverSpec.GroupSpec::getAggCalls);
        int aggOutputIndex = inputFieldCnt;
        for (OverSpec.GroupSpec groupSpec : overSpec.getGroups()) {
            for (AggregateCall aggCall : groupSpec.getAggCalls()) {
                if (requireDeterminism.get(aggOutputIndex)) {
                    this.requiredSourceInput(aggCall, allRequiredInputSet);
                }
                ++aggOutputIndex;
            }
        }
        assert (allRequiredInputSet.size() <= inputFieldCnt);
        return ImmutableBitSet.of(new ArrayList<Integer>(allRequiredInputSet));
    }

    private void requiredSourceInput(AggregateCall aggCall, Set<Integer> requiredInputSet) {
        requiredInputSet.addAll(aggCall.getArgList());
        if (aggCall.filterArg > -1) {
            requiredInputSet.add(aggCall.filterArg);
        }
    }

    private ImmutableBitSet requireDeterminismExcludeUpsertKey(StreamPhysicalRel inputRel, ImmutableBitSet requireDeterminism) {
        ImmutableBitSet finalRequireDeterminism;
        FlinkRelMetadataQuery fmq = FlinkRelMetadataQuery.reuseOrCreate(inputRel.getCluster().getMetadataQuery());
        Set<ImmutableBitSet> inputUpsertKeys = fmq.getUpsertKeys(inputRel);
        if (inputUpsertKeys == null || inputUpsertKeys.isEmpty()) {
            finalRequireDeterminism = requireDeterminism;
        } else if (inputUpsertKeys.stream().anyMatch(uk -> uk.contains(requireDeterminism))) {
            finalRequireDeterminism = NO_REQUIRED_DETERMINISM;
        } else {
            List leftKeys = inputUpsertKeys.stream().map(requireDeterminism::except).collect(Collectors.toList());
            if (leftKeys.isEmpty()) {
                finalRequireDeterminism = NO_REQUIRED_DETERMINISM;
            } else {
                leftKeys.sort(Comparator.comparingInt(ImmutableBitSet::cardinality));
                finalRequireDeterminism = (ImmutableBitSet)leftKeys.get(0);
            }
        }
        return finalRequireDeterminism;
    }

    private static /* synthetic */ boolean lambda$visit$10(int leftFieldCnt, Integer index) {
        return index < leftFieldCnt;
    }

    private static /* synthetic */ boolean lambda$visit$9(StreamPhysicalLookupJoin lookupJoin, int index) {
        return lookupJoin.allLookupKeys().contains((Object)index);
    }

    private static /* synthetic */ boolean lambda$visit$8(int leftFieldCnt, Integer index) {
        return index >= leftFieldCnt;
    }

    private /* synthetic */ void lambda$visit$7(ImmutableBitSet requireDeterminism, StreamPhysicalLookupJoin lookupJoin, RexProgram calc) {
        this.checkNonDeterministicRexProgram(requireDeterminism, calc, lookupJoin);
    }

    private /* synthetic */ void lambda$visit$6(StreamPhysicalLookupJoin lookupJoin, RexNode condi) {
        this.checkNonDeterministicCondition(condi, lookupJoin);
    }

    private static /* synthetic */ boolean lambda$visit$5(int leftFieldCnt, Integer index) {
        return index < leftFieldCnt;
    }

    private static /* synthetic */ boolean lambda$visit$4(int leftFieldCnt, Integer index) {
        return index >= leftFieldCnt;
    }

    private static /* synthetic */ boolean lambda$visit$3(Integer index) {
        return index != -1;
    }

    private static /* synthetic */ Integer lambda$visit$2(Map outFromSourcePos, StreamPhysicalCalcBase calc, Integer out) {
        return (Integer)Optional.ofNullable(outFromSourcePos.get(out)).orElseThrow(() -> new TableException(String.format("Invalid pos:%d over projection:%s", out, calc.getProgram())));
    }

    private static /* synthetic */ RexNode lambda$visit$0(StreamPhysicalCalcBase calc, RexLocalRef expr) {
        return calc.getProgram().expandLocalRef(expr);
    }
}

