/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.streaming.api.graph;

import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import javax.annotation.Nullable;
import org.apache.commons.lang3.StringUtils;
import org.apache.flink.annotation.Internal;
import org.apache.flink.api.common.JobID;
import org.apache.flink.api.common.functions.Function;
import org.apache.flink.api.common.operators.ResourceSpec;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.IllegalConfigurationException;
import org.apache.flink.optimizer.plantranslate.JobGraphGenerator;
import org.apache.flink.runtime.checkpoint.CheckpointRetentionPolicy;
import org.apache.flink.runtime.checkpoint.MasterTriggerRestoreHook;
import org.apache.flink.runtime.io.network.partition.ResultPartitionType;
import org.apache.flink.runtime.jobgraph.DistributionPattern;
import org.apache.flink.runtime.jobgraph.InputOutputFormatContainer;
import org.apache.flink.runtime.jobgraph.InputOutputFormatVertex;
import org.apache.flink.runtime.jobgraph.JobEdge;
import org.apache.flink.runtime.jobgraph.JobGraph;
import org.apache.flink.runtime.jobgraph.JobVertex;
import org.apache.flink.runtime.jobgraph.JobVertexID;
import org.apache.flink.runtime.jobgraph.OperatorID;
import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable;
import org.apache.flink.runtime.jobgraph.tasks.CheckpointCoordinatorConfiguration;
import org.apache.flink.runtime.jobgraph.tasks.JobCheckpointingSettings;
import org.apache.flink.runtime.jobmanager.scheduler.CoLocationGroup;
import org.apache.flink.runtime.jobmanager.scheduler.SlotSharingGroup;
import org.apache.flink.runtime.operators.util.TaskConfig;
import org.apache.flink.streaming.api.CheckpointingMode;
import org.apache.flink.streaming.api.checkpoint.WithMasterCheckpointHook;
import org.apache.flink.streaming.api.environment.CheckpointConfig;
import org.apache.flink.streaming.api.graph.FunctionMasterCheckpointHookFactory;
import org.apache.flink.streaming.api.graph.StreamConfig;
import org.apache.flink.streaming.api.graph.StreamEdge;
import org.apache.flink.streaming.api.graph.StreamGraph;
import org.apache.flink.streaming.api.graph.StreamGraphHasher;
import org.apache.flink.streaming.api.graph.StreamGraphHasherV2;
import org.apache.flink.streaming.api.graph.StreamGraphUserHashHasher;
import org.apache.flink.streaming.api.graph.StreamNode;
import org.apache.flink.streaming.api.operators.ChainingStrategy;
import org.apache.flink.streaming.api.operators.StreamOperatorFactory;
import org.apache.flink.streaming.api.operators.UdfStreamOperatorFactory;
import org.apache.flink.streaming.api.transformations.ShuffleMode;
import org.apache.flink.streaming.runtime.partitioner.ForwardPartitioner;
import org.apache.flink.streaming.runtime.partitioner.RescalePartitioner;
import org.apache.flink.streaming.runtime.partitioner.StreamPartitioner;
import org.apache.flink.streaming.runtime.tasks.StreamIterationHead;
import org.apache.flink.streaming.runtime.tasks.StreamIterationTail;
import org.apache.flink.util.FlinkRuntimeException;
import org.apache.flink.util.SerializedValue;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@Internal
public class StreamingJobGraphGenerator {
    /** Logger used for the debug messages emitted while vertices and edges are created. */
    private static final Logger LOG = LoggerFactory.getLogger(StreamingJobGraphGenerator.class);
    /** The stream graph that is being translated. */
    private final StreamGraph streamGraph;
    /** Job vertices created by createJobVertex, keyed by the stream node ID of the chain's start node. */
    private final Map<Integer, JobVertex> jobVertices;
    /** The job graph under construction; returned by the instance method createJobGraph(). */
    private final JobGraph jobGraph;
    /** Stream node IDs for which a job vertex already exists; createChain returns early for these. */
    private final Collection<Integer> builtVertices;
    /** Edges between job vertices, in the order in which connect() recorded them. */
    private final List<StreamEdge> physicalEdgesInOrder;
    /** Per chain start node ID: the configs of the non-head operators of that chain, keyed by their node ID. */
    private final Map<Integer, Map<Integer, StreamConfig>> chainedConfigs;
    /** Config of every stream node handled by setVertexConfig, keyed by node ID. */
    private final Map<Integer, StreamConfig> vertexConfigs;
    /** Name of the (sub-)chain that begins at a node, keyed by node ID; filled by createChain. */
    private final Map<Integer, String> chainedNames;
    /** Merged minimum resources of the (sub-)chain that begins at a node, keyed by node ID. */
    private final Map<Integer, ResourceSpec> chainedMinResources;
    /** Merged preferred resources of the (sub-)chain that begins at a node, keyed by node ID. */
    private final Map<Integer, ResourceSpec> chainedPreferredResources;
    /** Input/output formats collected per chain, keyed by the chain's start node ID. */
    private final Map<Integer, InputOutputFormatContainer> chainedInputOutputFormats;
    /** Hasher whose results are used for the job vertex IDs and primary operator IDs. */
    private final StreamGraphHasher defaultStreamGraphHasher;
    /** Additional hashers whose results become the alternative (legacy / user-defined) IDs. */
    private final List<StreamGraphHasher> legacyStreamGraphHashers;

    /**
     * Translates the given stream graph into a {@link JobGraph}.
     *
     * <p>Delegates to {@link #createJobGraph(StreamGraph, JobID)} with a {@code null} job ID.
     *
     * @param streamGraph the stream graph to translate
     * @return the generated job graph
     */
    public static JobGraph createJobGraph(StreamGraph streamGraph) {
        return StreamingJobGraphGenerator.createJobGraph(streamGraph, null);
    }

    /**
     * Translates the given stream graph into a {@link JobGraph}.
     *
     * <p>A fresh generator instance is created for every call, so no state is shared between
     * translations.
     *
     * @param streamGraph the stream graph to translate
     * @param jobID the ID handed to the {@link JobGraph} constructor; may be {@code null}
     * @return the generated job graph
     */
    public static JobGraph createJobGraph(StreamGraph streamGraph, @Nullable JobID jobID) {
        return new StreamingJobGraphGenerator(streamGraph, jobID).createJobGraph();
    }

    /**
     * Creates a generator for the given stream graph with a {@code null} job ID.
     *
     * <p>NOTE(review): no code in this file invokes this constructor; the static factory always
     * uses the two-argument variant.
     *
     * @param streamGraph the stream graph to translate
     */
    private StreamingJobGraphGenerator(StreamGraph streamGraph) {
        this(streamGraph, null);
    }

    /**
     * Creates a generator for one translation run and initialises all bookkeeping collections
     * to empty.
     *
     * @param streamGraph the stream graph to translate; its job name becomes the job graph's name
     * @param jobID the ID handed to the {@link JobGraph} constructor; may be {@code null}
     */
    private StreamingJobGraphGenerator(StreamGraph streamGraph, @Nullable JobID jobID) {
        this.streamGraph = streamGraph;

        // Hashers that produce the primary and the alternative node hashes.
        this.defaultStreamGraphHasher = new StreamGraphHasherV2();
        this.legacyStreamGraphHashers = Arrays.asList(new StreamGraphUserHashHasher());

        // Bookkeeping about job vertices and the edges between them.
        this.jobVertices = new HashMap<>();
        this.builtVertices = new HashSet<>();
        this.physicalEdgesInOrder = new ArrayList<>();

        // Per-node and per-chain information gathered while chains are built.
        this.chainedConfigs = new HashMap<>();
        this.vertexConfigs = new HashMap<>();
        this.chainedNames = new HashMap<>();
        this.chainedMinResources = new HashMap<>();
        this.chainedPreferredResources = new HashMap<>();
        this.chainedInputOutputFormats = new HashMap<>();

        this.jobGraph = new JobGraph(jobID, streamGraph.getJobName());
    }

    /**
     * Performs the translation of the stream graph into the job graph held by this generator.
     *
     * <p>Steps, in order: copy the schedule mode, compute the default and the legacy node hashes,
     * build the operator chains (which creates the job vertices and connects them), register the
     * physical input edges, configure slot sharing / co-location, configure checkpointing, add the
     * user artifacts and finally attach the execution config.
     *
     * @return the fully populated job graph
     * @throws IllegalConfigurationException if the execution config cannot be serialized; the
     *     underlying {@link IOException} is attached as the cause
     */
    private JobGraph createJobGraph() {
        this.jobGraph.setScheduleMode(this.streamGraph.getScheduleMode());

        // Primary hashes identify the vertices/operators; legacy hashes yield alternative IDs.
        Map<Integer, byte[]> hashes = this.defaultStreamGraphHasher.traverseStreamGraphAndGenerateHashes(this.streamGraph);

        List<Map<Integer, byte[]>> legacyHashes = new ArrayList<>(this.legacyStreamGraphHashers.size());
        for (StreamGraphHasher hasher : this.legacyStreamGraphHashers) {
            legacyHashes.add(hasher.traverseStreamGraphAndGenerateHashes(this.streamGraph));
        }

        // Filled by createChain: chain start node ID -> (primary hash, legacy hash) per operator.
        Map<Integer, List<Tuple2<byte[], byte[]>>> chainedOperatorHashes = new HashMap<>();

        this.setChaining(hashes, legacyHashes, chainedOperatorHashes);
        this.setPhysicalEdges();
        this.setSlotSharingAndCoLocation();
        this.configureCheckpointing();

        JobGraphGenerator.addUserArtifactEntries(this.streamGraph.getUserArtifacts(), this.jobGraph);

        try {
            this.jobGraph.setExecutionConfig(this.streamGraph.getExecutionConfig());
        }
        catch (IOException e) {
            // Keep the original exception as the cause so the offending type can be diagnosed.
            throw new IllegalConfigurationException("Could not serialize the ExecutionConfig.This indicates that non-serializable types (like custom serializers) were registered", e);
        }
        return this.jobGraph;
    }

    /**
     * Registers, for every node that is the target of at least one physical edge, the list of
     * its incoming physical edges on that node's {@link StreamConfig}.
     *
     * <p>The edges of each target keep the relative order they have in
     * {@code physicalEdgesInOrder}.
     */
    private void setPhysicalEdges() {
        // Group the physical edges by the node they point to.
        Map<Integer, List<StreamEdge>> inEdgesByTarget = new HashMap<>();
        for (StreamEdge edge : this.physicalEdgesInOrder) {
            inEdgesByTarget.computeIfAbsent(edge.getTargetId(), k -> new ArrayList<>()).add(edge);
        }

        for (Map.Entry<Integer, List<StreamEdge>> entry : inEdgesByTarget.entrySet()) {
            this.vertexConfigs.get(entry.getKey()).setInPhysicalEdges(entry.getValue());
        }
    }

    /**
     * Builds the operator chains by starting one chain at every source node of the stream graph.
     *
     * @param hashes primary hash per stream node ID
     * @param legacyHashes one map of alternative hashes per legacy hasher
     * @param chainedOperatorHashes output map that {@code createChain} fills with the hashes of
     *     the operators of each chain
     */
    private void setChaining(Map<Integer, byte[]> hashes, List<Map<Integer, byte[]>> legacyHashes, Map<Integer, List<Tuple2<byte[], byte[]>>> chainedOperatorHashes) {
        // Each source is both the start node and the current node of its chain (chain index 0).
        this.streamGraph.getSourceIDs().forEach(
                sourceId -> this.createChain(sourceId, sourceId, hashes, legacyHashes, 0, chainedOperatorHashes));
    }

    /**
     * Recursively builds the operator chain that starts at {@code startNodeId}, processing the
     * node {@code currentNodeId} in this call.
     *
     * <p>Out edges of the current node are split into chainable and non-chainable ones. Chainable
     * targets are processed recursively as part of the same chain; every non-chainable target
     * starts a new chain of its own. After the recursion the name, resources and input/output
     * formats of the current node are recorded and its {@link StreamConfig} is filled. If the
     * current node is the chain start, the job vertex is created and connected along all
     * transitive out edges of the chain; otherwise the config is stored as a chained config of the
     * start node.
     *
     * @param startNodeId ID of the first node of the chain
     * @param currentNodeId ID of the node processed by this call
     * @param hashes primary hash per stream node ID
     * @param legacyHashes one map of alternative hashes per legacy hasher
     * @param chainIndex position of the current node within the chain (0 for the start node)
     * @param chainedOperatorHashes collects, per chain start node, the (primary, legacy) hash
     *     pairs of the chain's operators
     * @return the non-chainable out edges reachable from the current node within this chain; an
     *     empty list if a job vertex for {@code startNodeId} has already been built
     */
    private List<StreamEdge> createChain(Integer startNodeId, Integer currentNodeId, Map<Integer, byte[]> hashes, List<Map<Integer, byte[]>> legacyHashes, int chainIndex, Map<Integer, List<Tuple2<byte[], byte[]>>> chainedOperatorHashes) {
        if (this.builtVertices.contains(startNodeId)) {
            return new ArrayList<>();
        }

        List<StreamEdge> transitiveOutEdges = new ArrayList<>();
        List<StreamEdge> chainableOutputs = new ArrayList<>();
        List<StreamEdge> nonChainableOutputs = new ArrayList<>();

        StreamNode currentNode = this.streamGraph.getStreamNode(currentNodeId);
        for (StreamEdge outEdge : currentNode.getOutEdges()) {
            if (StreamingJobGraphGenerator.isChainable(outEdge, this.streamGraph)) {
                chainableOutputs.add(outEdge);
            } else {
                nonChainableOutputs.add(outEdge);
            }
        }

        // Chainable targets continue this chain; their boundary edges bubble up to the caller.
        for (StreamEdge chainable : chainableOutputs) {
            transitiveOutEdges.addAll(this.createChain(startNodeId, chainable.getTargetId(), hashes, legacyHashes, chainIndex + 1, chainedOperatorHashes));
        }

        // Non-chainable targets are chain boundaries and start a chain of their own.
        for (StreamEdge nonChainable : nonChainableOutputs) {
            transitiveOutEdges.add(nonChainable);
            this.createChain(nonChainable.getTargetId(), nonChainable.getTargetId(), hashes, legacyHashes, 0, chainedOperatorHashes);
        }

        List<Tuple2<byte[], byte[]>> operatorHashes = chainedOperatorHashes.computeIfAbsent(startNodeId, k -> new ArrayList<>());

        byte[] primaryHashBytes = hashes.get(currentNodeId);
        OperatorID currentOperatorId = new OperatorID(primaryHashBytes);

        for (Map<Integer, byte[]> legacyHash : legacyHashes) {
            operatorHashes.add(new Tuple2<>(primaryHashBytes, legacyHash.get(currentNodeId)));
        }

        // These lookups rely on the chained targets having been processed by the recursion above.
        this.chainedNames.put(currentNodeId, this.createChainedName(currentNodeId, chainableOutputs));
        this.chainedMinResources.put(currentNodeId, this.createChainedMinResources(currentNodeId, chainableOutputs));
        this.chainedPreferredResources.put(currentNodeId, this.createChainedPreferredResources(currentNodeId, chainableOutputs));

        if (currentNode.getInputFormat() != null) {
            this.getOrCreateFormatContainer(startNodeId).addInputFormat(currentOperatorId, currentNode.getInputFormat());
        }
        if (currentNode.getOutputFormat() != null) {
            this.getOrCreateFormatContainer(startNodeId).addOutputFormat(currentOperatorId, currentNode.getOutputFormat());
        }

        // The chain head writes into the job vertex configuration, chained operators get their own.
        StreamConfig config = currentNodeId.equals(startNodeId)
                ? this.createJobVertex(startNodeId, hashes, legacyHashes, chainedOperatorHashes)
                : new StreamConfig(new Configuration());

        this.setVertexConfig(currentNodeId, config, chainableOutputs, nonChainableOutputs);

        if (currentNodeId.equals(startNodeId)) {
            config.setChainStart();
            config.setChainIndex(0);
            config.setOperatorName(this.streamGraph.getStreamNode(currentNodeId).getOperatorName());
            config.setOutEdgesInOrder(transitiveOutEdges);
            config.setOutEdges(this.streamGraph.getStreamNode(currentNodeId).getOutEdges());

            for (StreamEdge edge : transitiveOutEdges) {
                this.connect(startNodeId, edge);
            }

            // May be null when the chain consists of the head operator only.
            config.setTransitiveChainedTaskConfigs(this.chainedConfigs.get(startNodeId));
        } else {
            this.chainedConfigs.computeIfAbsent(startNodeId, k -> new HashMap<>());

            config.setChainIndex(chainIndex);
            StreamNode streamNode = this.streamGraph.getStreamNode(currentNodeId);
            config.setOperatorName(streamNode.getOperatorName());
            this.chainedConfigs.get(startNodeId).put(currentNodeId, config);
        }

        config.setOperatorID(currentOperatorId);

        if (chainableOutputs.isEmpty()) {
            config.setChainEnd();
        }
        return transitiveOutEdges;
    }

    /**
     * Returns the input/output format container of the chain starting at {@code startNodeId},
     * creating and registering it on first access.
     *
     * @param startNodeId ID of the chain's start node
     * @return the container associated with that chain
     */
    private InputOutputFormatContainer getOrCreateFormatContainer(Integer startNodeId) {
        return this.chainedInputOutputFormats.computeIfAbsent(startNodeId, nodeId -> {
            // A new container is bound to the context class loader of the calling thread.
            ClassLoader contextClassLoader = Thread.currentThread().getContextClassLoader();
            return new InputOutputFormatContainer(contextClassLoader);
        });
    }

    /**
     * Builds the display name of the (sub-)chain that begins at the given node.
     *
     * <p>Without chained outputs the name is the operator name itself; with one chained output it
     * is {@code "name -> childChain"}; with several it is {@code "name -> (child1, child2, ...)"}.
     * The child names are read from {@code chainedNames}.
     *
     * @param vertexID ID of the node the (sub-)chain begins at
     * @param chainedOutputs out edges of that node which are chained
     * @return the name of the (sub-)chain
     */
    private String createChainedName(Integer vertexID, List<StreamEdge> chainedOutputs) {
        String ownName = this.streamGraph.getStreamNode(vertexID).getOperatorName();
        int chainedCount = chainedOutputs.size();

        if (chainedCount == 0) {
            return ownName;
        }
        if (chainedCount == 1) {
            StreamEdge onlyEdge = chainedOutputs.get(0);
            return ownName + " -> " + this.chainedNames.get(onlyEdge.getTargetId());
        }

        List<String> branchNames = new ArrayList<>();
        for (StreamEdge chainedEdge : chainedOutputs) {
            branchNames.add(this.chainedNames.get(chainedEdge.getTargetId()));
        }
        return ownName + " -> (" + StringUtils.join(branchNames, ", ") + ")";
    }

    /**
     * Computes the minimum resources of the (sub-)chain that begins at the given node by merging
     * the node's own minimum resources with the already recorded minimum resources of every
     * chained target.
     *
     * @param vertexID ID of the node the (sub-)chain begins at
     * @param chainedOutputs out edges of that node which are chained
     * @return the merged minimum resources
     */
    private ResourceSpec createChainedMinResources(Integer vertexID, List<StreamEdge> chainedOutputs) {
        ResourceSpec merged = this.streamGraph.getStreamNode(vertexID).getMinResources();
        for (StreamEdge chainedEdge : chainedOutputs) {
            ResourceSpec downstream = this.chainedMinResources.get(chainedEdge.getTargetId());
            merged = merged.merge(downstream);
        }
        return merged;
    }

    /**
     * Computes the preferred resources of the (sub-)chain that begins at the given node by merging
     * the node's own preferred resources with the already recorded preferred resources of every
     * chained target.
     *
     * @param vertexID ID of the node the (sub-)chain begins at
     * @param chainedOutputs out edges of that node which are chained
     * @return the merged preferred resources
     */
    private ResourceSpec createChainedPreferredResources(Integer vertexID, List<StreamEdge> chainedOutputs) {
        ResourceSpec merged = this.streamGraph.getStreamNode(vertexID).getPreferredResources();
        for (StreamEdge chainedEdge : chainedOutputs) {
            ResourceSpec downstream = this.chainedPreferredResources.get(chainedEdge.getTargetId());
            merged = merged.merge(downstream);
        }
        return merged;
    }

    /**
     * Creates the job vertex for the chain that starts at the given stream node, registers it in
     * the job graph and in the generator's bookkeeping, and returns a {@link StreamConfig} backed
     * by the vertex's configuration.
     *
     * <p>The vertex ID is derived from the node's primary hash; alternative vertex IDs come from
     * the legacy hashes (entries without a hash for this node are skipped). If input/output
     * formats were collected for the chain, an {@link InputOutputFormatVertex} is created and the
     * formats are written into its configuration; otherwise a plain {@link JobVertex} is used.
     *
     * @param streamNodeId ID of the chain's start node
     * @param hashes primary hash per stream node ID
     * @param legacyHashes one map of alternative hashes per legacy hasher
     * @param chainedOperatorHashes (primary, legacy) hash pairs of the operators per chain start
     * @return a stream config that writes into the new vertex's configuration
     * @throws IllegalStateException if no primary hash exists for the node
     */
    private StreamConfig createJobVertex(Integer streamNodeId, Map<Integer, byte[]> hashes, List<Map<Integer, byte[]>> legacyHashes, Map<Integer, List<Tuple2<byte[], byte[]>>> chainedOperatorHashes) {
        StreamNode streamNode = this.streamGraph.getStreamNode(streamNodeId);

        byte[] hash = hashes.get(streamNodeId);
        if (hash == null) {
            throw new IllegalStateException("Cannot find node hash. Did you generate them before calling this method?");
        }
        JobVertexID jobVertexId = new JobVertexID(hash);

        List<JobVertexID> legacyJobVertexIds = new ArrayList<>(legacyHashes.size());
        for (Map<Integer, byte[]> legacyHash : legacyHashes) {
            byte[] legacyHashBytes = legacyHash.get(streamNodeId);
            if (legacyHashBytes != null) {
                legacyJobVertexIds.add(new JobVertexID(legacyHashBytes));
            }
        }

        List<Tuple2<byte[], byte[]>> chainedOperators = chainedOperatorHashes.get(streamNodeId);
        List<OperatorID> chainedOperatorVertexIds = new ArrayList<>();
        List<OperatorID> userDefinedChainedOperatorVertexIds = new ArrayList<>();
        if (chainedOperators != null) {
            for (Tuple2<byte[], byte[]> chainedOperator : chainedOperators) {
                chainedOperatorVertexIds.add(new OperatorID(chainedOperator.f0));
                // A null entry keeps both lists index-aligned when no alternative hash exists.
                userDefinedChainedOperatorVertexIds.add(chainedOperator.f1 != null ? new OperatorID(chainedOperator.f1) : null);
            }
        }

        // Declared as the common supertype: either branch below may supply the instance.
        JobVertex jobVertex;
        if (this.chainedInputOutputFormats.containsKey(streamNodeId)) {
            jobVertex = new InputOutputFormatVertex(this.chainedNames.get(streamNodeId), jobVertexId, legacyJobVertexIds, chainedOperatorVertexIds, userDefinedChainedOperatorVertexIds);
            this.chainedInputOutputFormats.get(streamNodeId).write(new TaskConfig(jobVertex.getConfiguration()));
        } else {
            jobVertex = new JobVertex(this.chainedNames.get(streamNodeId), jobVertexId, legacyJobVertexIds, chainedOperatorVertexIds, userDefinedChainedOperatorVertexIds);
        }

        jobVertex.setResources(this.chainedMinResources.get(streamNodeId), this.chainedPreferredResources.get(streamNodeId));
        jobVertex.setInvokableClass(streamNode.getJobVertexClass());

        // A non-positive node parallelism leaves the vertex's own parallelism untouched.
        int parallelism = streamNode.getParallelism();
        if (parallelism > 0) {
            jobVertex.setParallelism(parallelism);
        } else {
            parallelism = jobVertex.getParallelism();
        }
        jobVertex.setMaxParallelism(streamNode.getMaxParallelism());

        if (LOG.isDebugEnabled()) {
            LOG.debug("Parallelism set: {} for {}", parallelism, streamNodeId);
        }

        jobVertex.setInputDependencyConstraint(this.streamGraph.getExecutionConfig().getDefaultInputDependencyConstraint());

        this.jobVertices.put(streamNodeId, jobVertex);
        this.builtVertices.add(streamNodeId);
        this.jobGraph.addVertex(jobVertex);

        return new StreamConfig(jobVertex.getConfiguration());
    }

    /**
     * Copies the settings of a stream node into its {@link StreamConfig} and remembers the config
     * in {@code vertexConfigs}.
     *
     * <p>This covers the serializers (including one side-output serializer per tagged out edge),
     * the operator factory, output selectors, chained and non-chained outputs, time
     * characteristic, state backend, checkpointing flags, state partitioners and, for iteration
     * head/tail vertices, the iteration ID and wait time.
     *
     * @param vertexID ID of the stream node
     * @param config the config to fill
     * @param chainableOutputs out edges of the node that are chained
     * @param nonChainableOutputs out edges of the node that leave the chain
     */
    private void setVertexConfig(Integer vertexID, StreamConfig config, List<StreamEdge> chainableOutputs, List<StreamEdge> nonChainableOutputs) {
        StreamNode node = this.streamGraph.getStreamNode(vertexID);

        config.setVertexID(vertexID);
        config.setBufferTimeout(node.getBufferTimeout());

        config.setTypeSerializerIn1(node.getTypeSerializerIn1());
        config.setTypeSerializerIn2(node.getTypeSerializerIn2());
        config.setTypeSerializerOut(node.getTypeSerializerOut());

        // Side-output serializers: chained edges first, then the non-chained ones.
        for (List<StreamEdge> outputs : Arrays.asList(chainableOutputs, nonChainableOutputs)) {
            for (StreamEdge output : outputs) {
                if (output.getOutputTag() != null) {
                    config.setTypeSerializerSideOut(output.getOutputTag(), output.getOutputTag().getTypeInfo().createSerializer(this.streamGraph.getExecutionConfig()));
                }
            }
        }

        config.setStreamOperatorFactory(node.getOperatorFactory());
        config.setOutputSelectors(node.getOutputSelectors());

        config.setNumberOfOutputs(nonChainableOutputs.size());
        config.setNonChainedOutputs(nonChainableOutputs);
        config.setChainedOutputs(chainableOutputs);

        config.setTimeCharacteristic(this.streamGraph.getTimeCharacteristic());

        CheckpointConfig checkpointConfig = this.streamGraph.getCheckpointConfig();
        config.setStateBackend(this.streamGraph.getStateBackend());

        boolean checkpointingEnabled = checkpointConfig.isCheckpointingEnabled();
        config.setCheckpointingEnabled(checkpointingEnabled);
        // With checkpointing disabled the mode is fixed to at-least-once.
        config.setCheckpointMode(checkpointingEnabled ? checkpointConfig.getCheckpointingMode() : CheckpointingMode.AT_LEAST_ONCE);

        config.setStatePartitioner(0, node.getStatePartitioner1());
        config.setStatePartitioner(1, node.getStatePartitioner2());
        config.setStateKeySerializer(node.getStateKeySerializer());

        Class<? extends AbstractInvokable> invokableClass = node.getJobVertexClass();
        boolean isIterationVertex = invokableClass.equals(StreamIterationHead.class) || invokableClass.equals(StreamIterationTail.class);
        if (isIterationVertex) {
            config.setIterationId(this.streamGraph.getBrokerID(vertexID));
            config.setIterationWaitTime(this.streamGraph.getLoopTimeout(vertexID));
        }

        this.vertexConfigs.put(vertexID, config);
    }

    /**
     * Connects the job vertex of the chain head with the job vertex of the edge's target node.
     *
     * <p>The edge is appended to {@code physicalEdgesInOrder}, the input count in the downstream
     * vertex's config is incremented, and a job edge is created. Forward and rescale partitioners
     * use a point-wise distribution pattern, every other partitioner all-to-all. The result
     * partition type follows the edge's shuffle mode; for {@code UNDEFINED} it depends on whether
     * the stream graph requests blocking connections between chains.
     *
     * @param headOfChain stream node ID of the upstream chain's head
     * @param edge the edge leaving the upstream chain
     * @throws UnsupportedOperationException if the edge's shuffle mode is not handled
     */
    private void connect(Integer headOfChain, StreamEdge edge) {
        this.physicalEdgesInOrder.add(edge);

        Integer targetNodeId = edge.getTargetId();
        JobVertex upstream = this.jobVertices.get(headOfChain);
        JobVertex downstream = this.jobVertices.get(targetNodeId);

        // The downstream vertex gains one more input.
        StreamConfig downstreamConfig = new StreamConfig(downstream.getConfiguration());
        downstreamConfig.setNumberOfInputs(downstreamConfig.getNumberOfInputs() + 1);

        StreamPartitioner<?> partitioner = edge.getPartitioner();

        ResultPartitionType partitionType;
        switch (edge.getShuffleMode()) {
            case PIPELINED:
                partitionType = ResultPartitionType.PIPELINED_BOUNDED;
                break;
            case BATCH:
                partitionType = ResultPartitionType.BLOCKING;
                break;
            case UNDEFINED:
                if (this.streamGraph.isBlockingConnectionsBetweenChains()) {
                    partitionType = ResultPartitionType.BLOCKING;
                } else {
                    partitionType = ResultPartitionType.PIPELINED_BOUNDED;
                }
                break;
            default:
                throw new UnsupportedOperationException("Data exchange mode " + edge.getShuffleMode() + " is not supported yet.");
        }

        boolean pointwise = partitioner instanceof ForwardPartitioner || partitioner instanceof RescalePartitioner;
        DistributionPattern pattern = pointwise ? DistributionPattern.POINTWISE : DistributionPattern.ALL_TO_ALL;

        JobEdge jobEdge = downstream.connectNewDataSetAsInput(upstream, pattern, partitionType);
        jobEdge.setShipStrategyName(partitioner.toString());

        if (LOG.isDebugEnabled()) {
            LOG.debug("CONNECTED: {} - {} -> {}", partitioner.getClass().getSimpleName(), headOfChain, targetNodeId);
        }
    }

    /**
     * Decides whether the two operators connected by the given edge can be placed in one chain.
     *
     * <p>All of the following must hold: the downstream node has exactly one input edge; both
     * nodes have an operator factory; both are in the same slot sharing group; the downstream
     * chaining strategy is {@code ALWAYS}; the upstream chaining strategy is {@code HEAD} or
     * {@code ALWAYS}; the edge uses a {@link ForwardPartitioner}; the shuffle mode is not
     * {@code BATCH}; both nodes have the same parallelism; and chaining is enabled on the graph.
     *
     * @param edge the edge to examine
     * @param streamGraph the graph the edge belongs to
     * @return {@code true} if source and target of the edge can be chained
     */
    public static boolean isChainable(StreamEdge edge, StreamGraph streamGraph) {
        StreamNode upstream = streamGraph.getSourceVertex(edge);
        StreamNode downstream = streamGraph.getTargetVertex(edge);

        StreamOperatorFactory<?> upstreamFactory = upstream.getOperatorFactory();
        StreamOperatorFactory<?> downstreamFactory = downstream.getOperatorFactory();

        // The checks run in a fixed order; the first failing one ends the evaluation.
        if (downstream.getInEdges().size() != 1) {
            return false;
        }
        if (downstreamFactory == null || upstreamFactory == null) {
            return false;
        }
        if (!upstream.isSameSlotSharingGroup(downstream)) {
            return false;
        }
        if (downstreamFactory.getChainingStrategy() != ChainingStrategy.ALWAYS) {
            return false;
        }
        ChainingStrategy upstreamStrategy = upstreamFactory.getChainingStrategy();
        if (upstreamStrategy != ChainingStrategy.HEAD && upstreamStrategy != ChainingStrategy.ALWAYS) {
            return false;
        }
        if (!(edge.getPartitioner() instanceof ForwardPartitioner)) {
            return false;
        }
        if (edge.getShuffleMode() == ShuffleMode.BATCH) {
            return false;
        }
        if (upstream.getParallelism() != downstream.getParallelism()) {
            return false;
        }
        return streamGraph.isChainingEnabled();
    }

    /**
     * Assigns slot sharing groups and co-location groups to all job vertices.
     *
     * <p>Vertices whose stream node names the same slot sharing group key share one
     * {@link SlotSharingGroup} instance. Vertices whose stream node names the same co-location
     * key share one {@link CoLocationGroup}; such vertices must have a slot sharing group, and all
     * members of one co-location group must belong to the same slot sharing group.
     *
     * @throws IllegalStateException if a co-location key is set without a slot sharing group, or
     *     if one co-location key is used across different slot sharing groups
     */
    private void setSlotSharingAndCoLocation() {
        Map<String, SlotSharingGroup> slotSharingGroups = new HashMap<>();
        // Co-location key -> (slot sharing group of the first member, the co-location group).
        Map<String, Tuple2<SlotSharingGroup, CoLocationGroup>> coLocationGroups = new HashMap<>();

        for (Map.Entry<Integer, JobVertex> entry : this.jobVertices.entrySet()) {
            StreamNode node = this.streamGraph.getStreamNode(entry.getKey());
            JobVertex vertex = entry.getValue();

            // Configure the slot sharing group.
            String slotSharingGroupKey = node.getSlotSharingGroup();
            final SlotSharingGroup sharingGroup;
            if (slotSharingGroupKey != null) {
                sharingGroup = slotSharingGroups.computeIfAbsent(slotSharingGroupKey, k -> new SlotSharingGroup());
                vertex.setSlotSharingGroup(sharingGroup);
            } else {
                sharingGroup = null;
            }

            // Configure the co-location constraint, if any.
            String coLocationGroupKey = node.getCoLocationGroup();
            if (coLocationGroupKey == null) {
                continue;
            }
            if (sharingGroup == null) {
                throw new IllegalStateException("Cannot use a co-location constraint without a slot sharing group");
            }

            Tuple2<SlotSharingGroup, CoLocationGroup> constraint = coLocationGroups.computeIfAbsent(coLocationGroupKey, k -> new Tuple2<>(sharingGroup, new CoLocationGroup()));

            // Identity comparison: members must share the very same slot sharing group instance.
            if (constraint.f0 != sharingGroup) {
                throw new IllegalStateException("Cannot co-locate operators from different slot sharing groups");
            }

            vertex.updateCoLocationGroup(constraint.f1);
            constraint.f1.addVertex(vertex);
        }
    }

    /**
     * Builds the {@link JobCheckpointingSettings} from the stream graph's checkpoint config and
     * attaches them to the job graph.
     *
     * <p>The settings consist of the vertices that trigger, acknowledge and commit checkpoints,
     * the coordinator configuration (interval, timeout, pauses, concurrency, retention policy,
     * exactly-once flag, recovery preference, tolerable failures), the serialized state backend
     * and the serialized master hooks of user functions implementing
     * {@link WithMasterCheckpointHook}.
     *
     * @throws IllegalStateException if externalized checkpoints are enabled without a cleanup
     *     mode, or if the checkpointing mode is neither exactly-once nor at-least-once
     * @throws FlinkRuntimeException if the master hooks or the state backend cannot be serialized
     */
    private void configureCheckpointing() {
        SerializedValue serializedStateBackend;
        SerializedValue serializedHooks;
        boolean isExactlyOnce;
        CheckpointRetentionPolicy retentionAfterTermination;
        CheckpointConfig cfg = this.streamGraph.getCheckpointConfig();
        long interval = cfg.getCheckpointInterval();
        // NOTE(review): intervals below 10 are replaced by Long.MAX_VALUE, presumably to disable
        // periodic checkpoints; the literal looks like an inlined constant - confirm its origin.
        if (interval < 10L) {
            interval = Long.MAX_VALUE;
        }
        ArrayList<JobVertexID> triggerVertices = new ArrayList<JobVertexID>();
        ArrayList<JobVertexID> ackVertices = new ArrayList<JobVertexID>(this.jobVertices.size());
        ArrayList<JobVertexID> commitVertices = new ArrayList<JobVertexID>(this.jobVertices.size());
        // Only input vertices trigger checkpoints; every vertex acknowledges and commits them.
        for (JobVertex vertex : this.jobVertices.values()) {
            if (vertex.isInputVertex()) {
                triggerVertices.add(vertex.getID());
            }
            commitVertices.add(vertex.getID());
            ackVertices.add(vertex.getID());
        }
        // Map the externalized-checkpoint cleanup mode onto a retention policy.
        if (cfg.isExternalizedCheckpointsEnabled()) {
            CheckpointConfig.ExternalizedCheckpointCleanup cleanup = cfg.getExternalizedCheckpointCleanup();
            if (cleanup == null) {
                throw new IllegalStateException("Externalized checkpoints enabled, but no cleanup mode configured.");
            }
            retentionAfterTermination = cleanup.deleteOnCancellation() ? CheckpointRetentionPolicy.RETAIN_ON_FAILURE : CheckpointRetentionPolicy.RETAIN_ON_CANCELLATION;
        } else {
            retentionAfterTermination = CheckpointRetentionPolicy.NEVER_RETAIN_AFTER_TERMINATION;
        }
        CheckpointingMode mode = cfg.getCheckpointingMode();
        if (mode == CheckpointingMode.EXACTLY_ONCE) {
            isExactlyOnce = true;
        } else if (mode == CheckpointingMode.AT_LEAST_ONCE) {
            isExactlyOnce = false;
        } else {
            throw new IllegalStateException("Unexpected checkpointing mode. Did not expect there to be another checkpointing mode besides exactly-once or at-least-once.");
        }
        // Collect a hook factory for every UDF operator whose user function provides a master hook.
        ArrayList<FunctionMasterCheckpointHookFactory> hooks = new ArrayList<FunctionMasterCheckpointHookFactory>();
        for (StreamNode node : this.streamGraph.getStreamNodes()) {
            Function f;
            if (!(node.getOperatorFactory() instanceof UdfStreamOperatorFactory) || !((f = ((UdfStreamOperatorFactory)node.getOperatorFactory()).getUserFunction()) instanceof WithMasterCheckpointHook)) continue;
            hooks.add(new FunctionMasterCheckpointHookFactory((WithMasterCheckpointHook)f));
        }
        // No hooks are represented by null rather than by an empty serialized array.
        if (hooks.isEmpty()) {
            serializedHooks = null;
        } else {
            try {
                MasterTriggerRestoreHook.Factory[] asArray = hooks.toArray(new MasterTriggerRestoreHook.Factory[hooks.size()]);
                serializedHooks = new SerializedValue((Object)asArray);
            }
            catch (IOException e) {
                throw new FlinkRuntimeException("Trigger/restore hook is not serializable", (Throwable)e);
            }
        }
        // A missing state backend is likewise passed on as null.
        if (this.streamGraph.getStateBackend() == null) {
            serializedStateBackend = null;
        } else {
            try {
                serializedStateBackend = new SerializedValue((Object)this.streamGraph.getStateBackend());
            }
            catch (IOException e) {
                throw new FlinkRuntimeException("State backend is not serializable", (Throwable)e);
            }
        }
        JobCheckpointingSettings settings = new JobCheckpointingSettings(triggerVertices, ackVertices, commitVertices, new CheckpointCoordinatorConfiguration(interval, cfg.getCheckpointTimeout(), cfg.getMinPauseBetweenCheckpoints(), cfg.getMaxConcurrentCheckpoints(), retentionAfterTermination, isExactlyOnce, cfg.isPreferCheckpointForRecovery(), cfg.getTolerableCheckpointFailureNumber()), serializedStateBackend, serializedHooks);
        this.jobGraph.setSnapshotSettings(settings);
    }
}

