/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.runtime.executiongraph;

import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionStage;
import java.util.concurrent.Executor;
import java.util.concurrent.TimeoutException;
import java.util.function.Function;
import java.util.stream.Collectors;
import javax.annotation.Nullable;
import org.apache.flink.annotation.VisibleForTesting;
import org.apache.flink.api.common.Archiveable;
import org.apache.flink.api.common.accumulators.Accumulator;
import org.apache.flink.api.common.time.Time;
import org.apache.flink.core.io.InputSplit;
import org.apache.flink.runtime.JobException;
import org.apache.flink.runtime.accumulators.StringifiedAccumulatorResult;
import org.apache.flink.runtime.checkpoint.CheckpointOptions;
import org.apache.flink.runtime.checkpoint.JobManagerTaskRestore;
import org.apache.flink.runtime.clusterframework.types.AllocationID;
import org.apache.flink.runtime.clusterframework.types.ResourceID;
import org.apache.flink.runtime.concurrent.ComponentMainThreadExecutor;
import org.apache.flink.runtime.deployment.ResultPartitionDeploymentDescriptor;
import org.apache.flink.runtime.deployment.TaskDeploymentDescriptor;
import org.apache.flink.runtime.deployment.TaskDeploymentDescriptorFactory;
import org.apache.flink.runtime.execution.ExecutionState;
import org.apache.flink.runtime.executiongraph.AccessExecution;
import org.apache.flink.runtime.executiongraph.ArchivedExecution;
import org.apache.flink.runtime.executiongraph.DefaultExecutionGraph;
import org.apache.flink.runtime.executiongraph.ErrorInfo;
import org.apache.flink.runtime.executiongraph.ExecutionAttemptID;
import org.apache.flink.runtime.executiongraph.ExecutionVertex;
import org.apache.flink.runtime.executiongraph.IOMetrics;
import org.apache.flink.runtime.executiongraph.IntermediateResultPartition;
import org.apache.flink.runtime.executiongraph.PartitionInfo;
import org.apache.flink.runtime.io.network.partition.JobMasterPartitionTracker;
import org.apache.flink.runtime.io.network.partition.ResultPartitionID;
import org.apache.flink.runtime.jobgraph.IntermediateDataSetID;
import org.apache.flink.runtime.jobgraph.IntermediateResultPartitionID;
import org.apache.flink.runtime.jobgraph.OperatorID;
import org.apache.flink.runtime.jobmanager.slots.TaskManagerGateway;
import org.apache.flink.runtime.jobmaster.LogicalSlot;
import org.apache.flink.runtime.messages.Acknowledge;
import org.apache.flink.runtime.operators.coordination.OperatorEvent;
import org.apache.flink.runtime.operators.coordination.TaskNotRunningException;
import org.apache.flink.runtime.scheduler.strategy.ConsumerVertexGroup;
import org.apache.flink.runtime.scheduler.strategy.ExecutionVertexID;
import org.apache.flink.runtime.shuffle.PartitionDescriptor;
import org.apache.flink.runtime.shuffle.ProducerDescriptor;
import org.apache.flink.runtime.shuffle.ShuffleDescriptor;
import org.apache.flink.runtime.shuffle.ShuffleMaster;
import org.apache.flink.runtime.taskmanager.TaskManagerLocation;
import org.apache.flink.util.ExceptionUtils;
import org.apache.flink.util.FlinkException;
import org.apache.flink.util.OptionalFailure;
import org.apache.flink.util.Preconditions;
import org.apache.flink.util.SerializedValue;
import org.apache.flink.util.concurrent.FutureUtils;
import org.slf4j.Logger;

public class Execution
implements AccessExecution,
Archiveable<ArchivedExecution>,
LogicalSlot.Payload {
    private static final Logger LOG = DefaultExecutionGraph.LOG;
    private static final int NUM_CANCEL_CALL_TRIES = 3;
    private final Executor executor;
    private final ExecutionVertex vertex;
    private final ExecutionAttemptID attemptId;
    private final long[] stateTimestamps;
    private final int attemptNumber;
    private final Time rpcTimeout;
    private final Collection<PartitionInfo> partitionInfos;
    private final CompletableFuture<ExecutionState> terminalStateFuture;
    private final CompletableFuture<?> releaseFuture;
    private final CompletableFuture<TaskManagerLocation> taskManagerLocationFuture;
    private final CompletableFuture<?> initializingOrRunningFuture;
    private volatile ExecutionState state = ExecutionState.CREATED;
    private LogicalSlot assignedResource;
    private Optional<ErrorInfo> failureCause = Optional.empty();
    @Nullable
    private JobManagerTaskRestore taskRestore;
    @Nullable
    private AllocationID assignedAllocationID;
    private final Object accumulatorLock = new Object();
    private Map<String, Accumulator<?, ?>> userAccumulators;
    private IOMetrics ioMetrics;
    private Map<IntermediateResultPartitionID, ResultPartitionDeploymentDescriptor> producedPartitions;

    public Execution(Executor executor, ExecutionVertex vertex, int attemptNumber, long startTimestamp, Time rpcTimeout) {
        this.executor = (Executor)Preconditions.checkNotNull((Object)executor);
        this.vertex = (ExecutionVertex)Preconditions.checkNotNull((Object)vertex);
        this.attemptId = new ExecutionAttemptID();
        this.rpcTimeout = (Time)Preconditions.checkNotNull((Object)rpcTimeout);
        this.attemptNumber = attemptNumber;
        this.stateTimestamps = new long[ExecutionState.values().length];
        this.markTimestamp(ExecutionState.CREATED, startTimestamp);
        this.partitionInfos = new ArrayList<PartitionInfo>(16);
        this.producedPartitions = Collections.emptyMap();
        this.terminalStateFuture = new CompletableFuture();
        this.releaseFuture = new CompletableFuture();
        this.taskManagerLocationFuture = new CompletableFuture();
        this.initializingOrRunningFuture = new CompletableFuture();
        this.assignedResource = null;
    }

    public ExecutionVertex getVertex() {
        return this.vertex;
    }

    @Override
    public ExecutionAttemptID getAttemptId() {
        return this.attemptId;
    }

    @Override
    public int getAttemptNumber() {
        return this.attemptNumber;
    }

    @Override
    public ExecutionState getState() {
        return this.state;
    }

    @Nullable
    public AllocationID getAssignedAllocationID() {
        return this.assignedAllocationID;
    }

    public CompletableFuture<TaskManagerLocation> getTaskManagerLocationFuture() {
        return this.taskManagerLocationFuture;
    }

    public LogicalSlot getAssignedResource() {
        return this.assignedResource;
    }

    public Optional<ResultPartitionDeploymentDescriptor> getResultPartitionDeploymentDescriptor(IntermediateResultPartitionID id) {
        return Optional.ofNullable(this.producedPartitions.get(id));
    }

    public boolean tryAssignResource(LogicalSlot logicalSlot) {
        this.assertRunningInJobMasterMainThread();
        Preconditions.checkNotNull((Object)logicalSlot);
        if (this.state == ExecutionState.SCHEDULED || this.state == ExecutionState.CREATED) {
            if (this.assignedResource == null) {
                this.assignedResource = logicalSlot;
                if (logicalSlot.tryAssignPayload(this)) {
                    if (!(this.state != ExecutionState.SCHEDULED && this.state != ExecutionState.CREATED || this.taskManagerLocationFuture.isDone())) {
                        this.taskManagerLocationFuture.complete(logicalSlot.getTaskManagerLocation());
                        this.assignedAllocationID = logicalSlot.getAllocationId();
                        this.getVertex().setLatestPriorSlotAllocation(this.assignedResource.getTaskManagerLocation(), logicalSlot.getAllocationId());
                        return true;
                    }
                    this.assignedResource = null;
                    return false;
                }
                this.assignedResource = null;
                return false;
            }
            return false;
        }
        return false;
    }

    public InputSplit getNextInputSplit() {
        LogicalSlot slot = this.getAssignedResource();
        String host = slot != null ? slot.getTaskManagerLocation().getHostname() : null;
        return this.vertex.getNextInputSplit(host);
    }

    @Override
    public TaskManagerLocation getAssignedResourceLocation() {
        LogicalSlot currentAssignedResource = this.assignedResource;
        return currentAssignedResource != null ? currentAssignedResource.getTaskManagerLocation() : null;
    }

    @Override
    public Optional<ErrorInfo> getFailureInfo() {
        return this.failureCause;
    }

    @Override
    public long[] getStateTimestamps() {
        return this.stateTimestamps;
    }

    @Override
    public long getStateTimestamp(ExecutionState state) {
        return this.stateTimestamps[state.ordinal()];
    }

    public boolean isFinished() {
        return this.state.isTerminal();
    }

    @Nullable
    public JobManagerTaskRestore getTaskRestore() {
        return this.taskRestore;
    }

    public void setInitialState(JobManagerTaskRestore taskRestore) {
        this.taskRestore = taskRestore;
    }

    public CompletableFuture<?> getInitializingOrRunningFuture() {
        return this.initializingOrRunningFuture;
    }

    public CompletableFuture<ExecutionState> getTerminalStateFuture() {
        return this.terminalStateFuture;
    }

    public CompletableFuture<?> getReleaseFuture() {
        return this.releaseFuture;
    }

    public CompletableFuture<Void> registerProducedPartitions(TaskManagerLocation location, boolean notifyPartitionDataAvailable) {
        this.assertRunningInJobMasterMainThread();
        return FutureUtils.thenApplyAsyncIfNotDone(Execution.registerProducedPartitions(this.vertex, location, this.attemptId, notifyPartitionDataAvailable), (Executor)this.vertex.getExecutionGraphAccessor().getJobMasterMainThreadExecutor(), producedPartitionsCache -> {
            this.producedPartitions = producedPartitionsCache;
            if (this.getState() == ExecutionState.SCHEDULED) {
                this.startTrackingPartitions(location.getResourceID(), producedPartitionsCache.values());
            } else {
                LOG.info("Discarding late registered partitions for {} task {}.", (Object)this.getState(), (Object)this.attemptId);
                for (ResultPartitionDeploymentDescriptor desc : producedPartitionsCache.values()) {
                    this.getVertex().getExecutionGraphAccessor().getShuffleMaster().releasePartitionExternally(desc.getShuffleDescriptor());
                }
            }
            return null;
        });
    }

    @VisibleForTesting
    static CompletableFuture<Map<IntermediateResultPartitionID, ResultPartitionDeploymentDescriptor>> registerProducedPartitions(ExecutionVertex vertex, TaskManagerLocation location, ExecutionAttemptID attemptId, boolean notifyPartitionDataAvailable) {
        ProducerDescriptor producerDescriptor = ProducerDescriptor.create(location, attemptId);
        Collection<IntermediateResultPartition> partitions = vertex.getProducedPartitions().values();
        ArrayList<CompletionStage> partitionRegistrations = new ArrayList<CompletionStage>(partitions.size());
        for (IntermediateResultPartition partition : partitions) {
            PartitionDescriptor partitionDescriptor = PartitionDescriptor.from(partition);
            int maxParallelism = Execution.getPartitionMaxParallelism(partition);
            CompletableFuture<?> shuffleDescriptorFuture = vertex.getExecutionGraphAccessor().getShuffleMaster().registerPartitionWithProducer(vertex.getJobId(), partitionDescriptor, producerDescriptor);
            CompletionStage partitionRegistration = shuffleDescriptorFuture.thenApply(shuffleDescriptor -> new ResultPartitionDeploymentDescriptor(partitionDescriptor, (ShuffleDescriptor)shuffleDescriptor, maxParallelism, notifyPartitionDataAvailable));
            partitionRegistrations.add(partitionRegistration);
        }
        return FutureUtils.combineAll(partitionRegistrations).thenApply(rpdds -> {
            LinkedHashMap producedPartitions = new LinkedHashMap(partitions.size());
            rpdds.forEach(rpdd -> producedPartitions.put(rpdd.getPartitionId(), rpdd));
            return producedPartitions;
        });
    }

    private static int getPartitionMaxParallelism(IntermediateResultPartition partition) {
        return partition.getIntermediateResult().getConsumerExecutionJobVertex().getMaxParallelism();
    }

    public void deploy() throws JobException {
        this.assertRunningInJobMasterMainThread();
        LogicalSlot slot = this.assignedResource;
        Preconditions.checkNotNull((Object)slot, (String)"In order to deploy the execution we first have to assign a resource via tryAssignResource.");
        if (!slot.isAlive()) {
            throw new JobException("Target slot (TaskManager) for deployment is no longer alive.");
        }
        ExecutionState previous = this.state;
        if (previous == ExecutionState.SCHEDULED) {
            if (!this.transitionState(previous, ExecutionState.DEPLOYING)) {
                throw new IllegalStateException("Cannot deploy task: Concurrent deployment call race.");
            }
        } else {
            throw new IllegalStateException("The vertex must be in SCHEDULED state to be deployed. Found state " + (Object)((Object)previous));
        }
        if (this != slot.getPayload()) {
            throw new IllegalStateException(String.format("The execution %s has not been assigned to the assigned slot.", this));
        }
        try {
            if (this.state != ExecutionState.DEPLOYING) {
                slot.releaseSlot(new FlinkException("Actual state of execution " + this + " (" + (Object)((Object)this.state) + ") does not match expected state DEPLOYING."));
                return;
            }
            LOG.info("Deploying {} (attempt #{}) with attempt id {} and vertex id {} to {} with allocation id {}", new Object[]{this.vertex.getTaskNameWithSubtaskIndex(), this.attemptNumber, this.vertex.getCurrentExecutionAttempt().getAttemptId(), this.vertex.getID(), this.getAssignedResourceLocation(), slot.getAllocationId()});
            TaskDeploymentDescriptor deployment = TaskDeploymentDescriptorFactory.fromExecutionVertex(this.vertex, this.attemptNumber).createDeploymentDescriptor(slot.getAllocationId(), this.taskRestore, this.producedPartitions.values());
            this.taskRestore = null;
            TaskManagerGateway taskManagerGateway = slot.getTaskManagerGateway();
            ComponentMainThreadExecutor jobMasterMainThreadExecutor = this.vertex.getExecutionGraphAccessor().getJobMasterMainThreadExecutor();
            this.getVertex().notifyPendingDeployment(this);
            ((CompletableFuture)CompletableFuture.supplyAsync(() -> taskManagerGateway.submitTask(deployment, this.rpcTimeout), this.executor).thenCompose(Function.identity())).whenCompleteAsync((ack, failure) -> {
                if (failure == null) {
                    this.vertex.notifyCompletedDeployment(this);
                } else {
                    Throwable actualFailure = ExceptionUtils.stripCompletionException((Throwable)failure);
                    if (actualFailure instanceof TimeoutException) {
                        String taskname = this.vertex.getTaskNameWithSubtaskIndex() + " (" + this.attemptId + ')';
                        this.markFailed(new Exception("Cannot deploy task " + taskname + " - TaskManager (" + this.getAssignedResourceLocation() + ") not responding after a rpcTimeout of " + this.rpcTimeout, actualFailure));
                    } else {
                        this.markFailed(actualFailure);
                    }
                }
            }, (Executor)jobMasterMainThreadExecutor);
        }
        catch (Throwable t) {
            this.markFailed(t);
        }
    }

    public void cancel() {
        ExecutionState current;
        block5: {
            this.assertRunningInJobMasterMainThread();
            while (true) {
                if ((current = this.state) == ExecutionState.CANCELING || current == ExecutionState.CANCELED) {
                    return;
                }
                if (current == ExecutionState.INITIALIZING || current == ExecutionState.RUNNING || current == ExecutionState.DEPLOYING) {
                    if (!this.startCancelling(3)) continue;
                    return;
                }
                if (current == ExecutionState.FINISHED) {
                    this.sendReleaseIntermediateResultPartitionsRpcCall();
                    return;
                }
                if (current == ExecutionState.FAILED) {
                    return;
                }
                if (current != ExecutionState.CREATED && current != ExecutionState.SCHEDULED) break block5;
                if (this.cancelAtomically()) break;
            }
            return;
        }
        throw new IllegalStateException(current.name());
    }

    public CompletableFuture<?> suspend() {
        switch (this.state) {
            case RUNNING: 
            case INITIALIZING: 
            case DEPLOYING: 
            case CREATED: 
            case SCHEDULED: {
                if (this.cancelAtomically()) break;
                throw new IllegalStateException(String.format("Could not directly go to %s from %s.", ExecutionState.CANCELED.name(), this.state.name()));
            }
            case CANCELING: {
                this.completeCancelling();
                break;
            }
            case FINISHED: {
                this.sendReleaseIntermediateResultPartitionsRpcCall();
                break;
            }
            case FAILED: 
            case CANCELED: {
                break;
            }
            default: {
                throw new IllegalStateException(this.state.name());
            }
        }
        return this.releaseFuture;
    }

    private void updatePartitionConsumers(IntermediateResultPartition partition) {
        Optional<ConsumerVertexGroup> consumerVertexGroup = partition.getConsumerVertexGroupOptional();
        if (!consumerVertexGroup.isPresent()) {
            return;
        }
        for (ExecutionVertexID consumerVertexId : consumerVertexGroup.get()) {
            ExecutionVertex consumerVertex = this.vertex.getExecutionGraphAccessor().getExecutionVertexOrThrow(consumerVertexId);
            Execution consumer = consumerVertex.getCurrentExecutionAttempt();
            ExecutionState consumerState = consumer.getState();
            if (consumerState != ExecutionState.DEPLOYING && consumerState != ExecutionState.RUNNING && consumerState != ExecutionState.INITIALIZING) continue;
            PartitionInfo partitionInfo = Execution.createPartitionInfo(partition);
            if (consumerState == ExecutionState.DEPLOYING) {
                consumerVertex.cachePartitionInfo(partitionInfo);
                continue;
            }
            consumer.sendUpdatePartitionInfoRpcCall(Collections.singleton(partitionInfo));
        }
    }

    private static PartitionInfo createPartitionInfo(IntermediateResultPartition consumedPartition) {
        IntermediateDataSetID intermediateDataSetID = consumedPartition.getIntermediateResult().getId();
        ShuffleDescriptor shuffleDescriptor = TaskDeploymentDescriptorFactory.getConsumedPartitionShuffleDescriptor(consumedPartition, TaskDeploymentDescriptorFactory.PartitionLocationConstraint.MUST_BE_KNOWN);
        return new PartitionInfo(intermediateDataSetID, shuffleDescriptor);
    }

    @Override
    public void fail(Throwable t) {
        this.processFail(t, true);
    }

    public void notifyCheckpointOnComplete(long completedCheckpointId, long completedTimestamp, long lastSubsumedCheckpointId) {
        LogicalSlot slot = this.assignedResource;
        if (slot != null) {
            TaskManagerGateway taskManagerGateway = slot.getTaskManagerGateway();
            taskManagerGateway.notifyCheckpointOnComplete(this.attemptId, this.getVertex().getJobId(), completedCheckpointId, completedTimestamp, lastSubsumedCheckpointId);
        } else {
            LOG.debug("The execution has no slot assigned. This indicates that the execution is no longer running.");
        }
    }

    public void notifyCheckpointAborted(long abortCheckpointId, long latestCompletedCheckpointId, long timestamp) {
        LogicalSlot slot = this.assignedResource;
        if (slot != null) {
            TaskManagerGateway taskManagerGateway = slot.getTaskManagerGateway();
            taskManagerGateway.notifyCheckpointAborted(this.attemptId, this.getVertex().getJobId(), abortCheckpointId, latestCompletedCheckpointId, timestamp);
        } else {
            LOG.debug("The execution has no slot assigned. This indicates that the execution is no longer running.");
        }
    }

    public CompletableFuture<Acknowledge> triggerCheckpoint(long checkpointId, long timestamp, CheckpointOptions checkpointOptions) {
        return this.triggerCheckpointHelper(checkpointId, timestamp, checkpointOptions);
    }

    public CompletableFuture<Acknowledge> triggerSynchronousSavepoint(long checkpointId, long timestamp, CheckpointOptions checkpointOptions) {
        return this.triggerCheckpointHelper(checkpointId, timestamp, checkpointOptions);
    }

    private CompletableFuture<Acknowledge> triggerCheckpointHelper(long checkpointId, long timestamp, CheckpointOptions checkpointOptions) {
        LogicalSlot slot = this.assignedResource;
        if (slot != null) {
            TaskManagerGateway taskManagerGateway = slot.getTaskManagerGateway();
            return taskManagerGateway.triggerCheckpoint(this.attemptId, this.getVertex().getJobId(), checkpointId, timestamp, checkpointOptions);
        }
        LOG.debug("The execution has no slot assigned. This indicates that the execution is no longer running.");
        return CompletableFuture.completedFuture(Acknowledge.get());
    }

    public CompletableFuture<Acknowledge> sendOperatorEvent(OperatorID operatorId, SerializedValue<OperatorEvent> event) {
        this.assertRunningInJobMasterMainThread();
        LogicalSlot slot = this.assignedResource;
        if (slot != null && (this.getState() == ExecutionState.RUNNING || this.getState() == ExecutionState.INITIALIZING)) {
            TaskManagerGateway eventGateway = slot.getTaskManagerGateway();
            return eventGateway.sendOperatorEventToTask(this.getAttemptId(), operatorId, event);
        }
        return FutureUtils.completedExceptionally((Throwable)((Object)new TaskNotRunningException('\"' + this.vertex.getTaskNameWithSubtaskIndex() + "\" is not running, but in state " + (Object)((Object)this.getState()))));
    }

    void markFailed(Throwable t) {
        this.processFail(t, false);
    }

    void markFailed(Throwable t, boolean cancelTask, Map<String, Accumulator<?, ?>> userAccumulators, IOMetrics metrics, boolean releasePartitions, boolean fromSchedulerNg) {
        this.processFail(t, cancelTask, userAccumulators, metrics, releasePartitions, fromSchedulerNg);
    }

    @VisibleForTesting
    public void markFinished() {
        this.markFinished(null, null);
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    void markFinished(Map<String, Accumulator<?, ?>> userAccumulators, IOMetrics metrics) {
        ExecutionState current;
        this.assertRunningInJobMasterMainThread();
        while ((current = this.state) == ExecutionState.INITIALIZING || current == ExecutionState.RUNNING || current == ExecutionState.DEPLOYING) {
            if (!this.transitionState(current, ExecutionState.FINISHED)) continue;
            try {
                this.finishPartitionsAndUpdateConsumers();
                this.updateAccumulatorsAndMetrics(userAccumulators, metrics);
                this.releaseAssignedResource(null);
                this.vertex.getExecutionGraphAccessor().deregisterExecution(this);
            }
            finally {
                this.vertex.executionFinished(this);
            }
            return;
        }
        if (current == ExecutionState.CANCELING) {
            this.completeCancelling(userAccumulators, metrics, true);
            return;
        }
        if (current == ExecutionState.CANCELED || current == ExecutionState.FAILED) {
            if (LOG.isDebugEnabled()) {
                LOG.debug("Task FINISHED, but concurrently went to state " + (Object)((Object)this.state));
            }
            return;
        }
        this.markFailed(new Exception("Vertex received FINISHED message while being in state " + (Object)((Object)this.state)));
    }

    private void finishPartitionsAndUpdateConsumers() {
        List<IntermediateResultPartition> finishedPartitions = this.getVertex().finishAllBlockingPartitions();
        for (IntermediateResultPartition partition : finishedPartitions) {
            this.updatePartitionConsumers(partition);
        }
    }

    private boolean cancelAtomically() {
        if (this.startCancelling(0)) {
            this.completeCancelling();
            return true;
        }
        return false;
    }

    private boolean startCancelling(int numberCancelRetries) {
        if (this.transitionState(this.state, ExecutionState.CANCELING)) {
            this.taskManagerLocationFuture.cancel(false);
            this.sendCancelRpcCall(numberCancelRetries);
            return true;
        }
        return false;
    }

    void completeCancelling() {
        this.completeCancelling(null, null, true);
    }

    void completeCancelling(Map<String, Accumulator<?, ?>> userAccumulators, IOMetrics metrics, boolean releasePartitions) {
        ExecutionState current;
        block3: {
            do {
                if ((current = this.state) == ExecutionState.CANCELED) {
                    return;
                }
                if (current != ExecutionState.CANCELING && current != ExecutionState.RUNNING && current != ExecutionState.INITIALIZING && current != ExecutionState.DEPLOYING) break block3;
                this.updateAccumulatorsAndMetrics(userAccumulators, metrics);
            } while (!this.transitionState(current, ExecutionState.CANCELED));
            this.finishCancellation(releasePartitions);
            return;
        }
        if (current != ExecutionState.FAILED) {
            String message = String.format("Asynchronous race: Found %s in state %s after successful cancel call.", new Object[]{this.vertex.getTaskNameWithSubtaskIndex(), this.state});
            LOG.error(message);
            this.vertex.getExecutionGraphAccessor().failGlobal(new Exception(message));
        }
    }

    private void finishCancellation(boolean releasePartitions) {
        this.releaseAssignedResource(new FlinkException("Execution " + this + " was cancelled."));
        this.vertex.getExecutionGraphAccessor().deregisterExecution(this);
        this.handlePartitionCleanup(releasePartitions, releasePartitions);
    }

    void cachePartitionInfo(PartitionInfo partitionInfo) {
        this.partitionInfos.add(partitionInfo);
    }

    private void sendPartitionInfos() {
        if (!this.partitionInfos.isEmpty()) {
            this.sendUpdatePartitionInfoRpcCall(new ArrayList<PartitionInfo>(this.partitionInfos));
            this.partitionInfos.clear();
        }
    }

    private void processFail(Throwable t, boolean cancelTask) {
        this.processFail(t, cancelTask, null, null, true, false);
    }

    private void processFail(Throwable t, boolean cancelTask, Map<String, Accumulator<?, ?>> userAccumulators, IOMetrics metrics, boolean releasePartitions, boolean fromSchedulerNg) {
        this.assertRunningInJobMasterMainThread();
        ExecutionState current = this.state;
        if (current == ExecutionState.FAILED) {
            return;
        }
        if (current == ExecutionState.CANCELED || current == ExecutionState.FINISHED) {
            if (LOG.isDebugEnabled()) {
                LOG.debug("Ignoring transition of vertex {} to {} while being {}.", new Object[]{this.getVertexWithAttempt(), ExecutionState.FAILED, current});
            }
            return;
        }
        if (current == ExecutionState.CANCELING) {
            this.completeCancelling(userAccumulators, metrics, true);
            return;
        }
        if (!fromSchedulerNg) {
            this.vertex.getExecutionGraphAccessor().notifySchedulerNgAboutInternalTaskFailure(this.attemptId, t, cancelTask, releasePartitions);
            return;
        }
        Preconditions.checkState((boolean)this.transitionState(current, ExecutionState.FAILED, t));
        this.failureCause = Optional.of(ErrorInfo.createErrorInfoWithNullableCause(t, this.getStateTimestamp(ExecutionState.FAILED)));
        this.updateAccumulatorsAndMetrics(userAccumulators, metrics);
        this.releaseAssignedResource(t);
        this.vertex.getExecutionGraphAccessor().deregisterExecution(this);
        this.maybeReleasePartitionsAndSendCancelRpcCall(current, cancelTask, releasePartitions);
    }

    private void maybeReleasePartitionsAndSendCancelRpcCall(ExecutionState stateBeforeFailed, boolean cancelTask, boolean releasePartitions) {
        this.handlePartitionCleanup(releasePartitions, releasePartitions);
        if (cancelTask && (stateBeforeFailed == ExecutionState.RUNNING || stateBeforeFailed == ExecutionState.INITIALIZING || stateBeforeFailed == ExecutionState.DEPLOYING)) {
            if (LOG.isDebugEnabled()) {
                LOG.debug("Sending out cancel request, to remove task execution from TaskManager.");
            }
            try {
                if (this.assignedResource != null) {
                    this.sendCancelRpcCall(3);
                }
            }
            catch (Throwable tt) {
                LOG.error("Error triggering cancel call while marking task {} as failed.", (Object)this.getVertex().getTaskNameWithSubtaskIndex(), (Object)tt);
            }
        }
    }

    boolean switchToRecovering() {
        if (this.switchTo(ExecutionState.DEPLOYING, ExecutionState.INITIALIZING)) {
            this.sendPartitionInfos();
            return true;
        }
        return false;
    }

    boolean switchToRunning() {
        return this.switchTo(ExecutionState.INITIALIZING, ExecutionState.RUNNING);
    }

    private boolean switchTo(ExecutionState from, ExecutionState to) {
        if (this.transitionState(from, to)) {
            return true;
        }
        ExecutionState currentState = this.state;
        if (currentState != ExecutionState.FINISHED && currentState != ExecutionState.CANCELED) {
            if (currentState == ExecutionState.CANCELING || currentState == ExecutionState.FAILED) {
                if (LOG.isDebugEnabled()) {
                    LOG.debug("Concurrent canceling/failing of {} while deployment was in progress.", (Object)this.getVertexWithAttempt());
                }
                this.sendCancelRpcCall(3);
            } else {
                String message = String.format("Concurrent unexpected state transition of task %s to %s while deployment was in progress.", new Object[]{this.getVertexWithAttempt(), currentState});
                LOG.debug(message);
                this.sendCancelRpcCall(3);
                this.markFailed(new Exception(message));
            }
        }
        return false;
    }

    private void sendCancelRpcCall(int numberRetries) {
        LogicalSlot slot = this.assignedResource;
        if (slot != null) {
            TaskManagerGateway taskManagerGateway = slot.getTaskManagerGateway();
            ComponentMainThreadExecutor jobMasterMainThreadExecutor = this.getVertex().getExecutionGraphAccessor().getJobMasterMainThreadExecutor();
            CompletableFuture cancelResultFuture = FutureUtils.retry(() -> taskManagerGateway.cancelTask(this.attemptId, this.rpcTimeout), (int)numberRetries, (Executor)jobMasterMainThreadExecutor);
            cancelResultFuture.whenComplete((ack, failure) -> {
                if (failure != null) {
                    this.fail(new Exception("Task could not be canceled.", (Throwable)failure));
                }
            });
        }
    }

    private void startTrackingPartitions(ResourceID taskExecutorId, Collection<ResultPartitionDeploymentDescriptor> partitions) {
        JobMasterPartitionTracker partitionTracker = this.vertex.getExecutionGraphAccessor().getPartitionTracker();
        for (ResultPartitionDeploymentDescriptor partition : partitions) {
            partitionTracker.startTrackingPartition(taskExecutorId, partition);
        }
    }

    void handlePartitionCleanup(boolean releasePipelinedPartitions, boolean releaseBlockingPartitions) {
        if (releasePipelinedPartitions) {
            this.sendReleaseIntermediateResultPartitionsRpcCall();
        }
        Collection<ResultPartitionID> partitionIds = this.getPartitionIds();
        JobMasterPartitionTracker partitionTracker = this.getVertex().getExecutionGraphAccessor().getPartitionTracker();
        if (!partitionIds.isEmpty()) {
            if (releaseBlockingPartitions) {
                LOG.info("Discarding the results produced by task execution {}.", (Object)this.attemptId);
                partitionTracker.stopTrackingAndReleasePartitions(partitionIds);
            } else {
                partitionTracker.stopTrackingPartitions(partitionIds);
            }
        }
    }

    private Collection<ResultPartitionID> getPartitionIds() {
        return this.producedPartitions.values().stream().map(ResultPartitionDeploymentDescriptor::getShuffleDescriptor).map(ShuffleDescriptor::getResultPartitionID).collect(Collectors.toList());
    }

    private void sendReleaseIntermediateResultPartitionsRpcCall() {
        LOG.info("Discarding the results produced by task execution {}.", (Object)this.attemptId);
        LogicalSlot slot = this.assignedResource;
        if (slot != null) {
            TaskManagerGateway taskManagerGateway = slot.getTaskManagerGateway();
            ShuffleMaster<?> shuffleMaster = this.getVertex().getExecutionGraphAccessor().getShuffleMaster();
            Set<ResultPartitionID> partitionIds = this.producedPartitions.values().stream().filter(resultPartitionDeploymentDescriptor -> resultPartitionDeploymentDescriptor.getPartitionType().isPipelined()).map(ResultPartitionDeploymentDescriptor::getShuffleDescriptor).peek(shuffleMaster::releasePartitionExternally).map(ShuffleDescriptor::getResultPartitionID).collect(Collectors.toSet());
            if (!partitionIds.isEmpty()) {
                taskManagerGateway.releasePartitions(this.getVertex().getJobId(), partitionIds);
            }
        }
    }

    private void sendUpdatePartitionInfoRpcCall(Iterable<PartitionInfo> partitionInfos) {
        LogicalSlot slot = this.assignedResource;
        if (slot != null) {
            TaskManagerGateway taskManagerGateway = slot.getTaskManagerGateway();
            TaskManagerLocation taskManagerLocation = slot.getTaskManagerLocation();
            CompletableFuture<Acknowledge> updatePartitionsResultFuture = taskManagerGateway.updatePartitions(this.attemptId, partitionInfos, this.rpcTimeout);
            updatePartitionsResultFuture.whenCompleteAsync((ack, failure) -> {
                if (failure != null) {
                    this.fail(new IllegalStateException("Update to task [" + this.getVertexWithAttempt() + "] on TaskManager " + taskManagerLocation + " failed", (Throwable)failure));
                }
            }, (Executor)this.getVertex().getExecutionGraphAccessor().getJobMasterMainThreadExecutor());
        }
    }

    private void releaseAssignedResource(@Nullable Throwable cause) {
        this.assertRunningInJobMasterMainThread();
        LogicalSlot slot = this.assignedResource;
        if (slot != null) {
            ComponentMainThreadExecutor jobMasterMainThreadExecutor = this.getVertex().getExecutionGraphAccessor().getJobMasterMainThreadExecutor();
            slot.releaseSlot(cause).whenComplete((ignored, throwable) -> {
                jobMasterMainThreadExecutor.assertRunningInMainThread();
                if (throwable != null) {
                    this.releaseFuture.completeExceptionally((Throwable)throwable);
                } else {
                    this.releaseFuture.complete(null);
                }
            });
        } else {
            this.releaseFuture.complete(null);
        }
    }

    public void transitionState(ExecutionState targetState) {
        this.transitionState(this.state, targetState);
    }

    private boolean transitionState(ExecutionState currentState, ExecutionState targetState) {
        return this.transitionState(currentState, targetState, null);
    }

    private boolean transitionState(ExecutionState currentState, ExecutionState targetState, Throwable error) {
        if (currentState.isTerminal()) {
            throw new IllegalStateException("Cannot leave terminal state " + (Object)((Object)currentState) + " to transition to " + (Object)((Object)targetState) + '.');
        }
        if (this.state == currentState) {
            this.state = targetState;
            this.markTimestamp(targetState);
            if (error == null) {
                LOG.info("{} ({}) switched from {} to {}.", new Object[]{this.getVertex().getTaskNameWithSubtaskIndex(), this.getAttemptId(), currentState, targetState});
            } else if (LOG.isInfoEnabled()) {
                LOG.info("{} ({}) switched from {} to {} on {}.", new Object[]{this.getVertex().getTaskNameWithSubtaskIndex(), this.getAttemptId(), currentState, targetState, this.getLocationInformation(), ExceptionUtils.stripCompletionException((Throwable)error)});
            }
            if (targetState == ExecutionState.INITIALIZING || targetState == ExecutionState.RUNNING) {
                this.initializingOrRunningFuture.complete(null);
            } else if (targetState.isTerminal()) {
                this.terminalStateFuture.complete(targetState);
            }
            try {
                this.vertex.notifyStateTransition(this, currentState, targetState);
            }
            catch (Throwable t) {
                LOG.error("Error while notifying execution graph of execution state transition.", t);
            }
            return true;
        }
        return false;
    }

    private String getLocationInformation() {
        if (this.assignedResource != null) {
            return this.assignedResource.getTaskManagerLocation().toString();
        }
        return "[unassigned resource]";
    }

    private void markTimestamp(ExecutionState state) {
        this.markTimestamp(state, System.currentTimeMillis());
    }

    private void markTimestamp(ExecutionState state, long timestamp) {
        this.stateTimestamps[state.ordinal()] = timestamp;
    }

    public String getVertexWithAttempt() {
        return this.vertex.getTaskNameWithSubtaskIndex() + " - execution #" + this.attemptNumber;
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    public void setAccumulators(Map<String, Accumulator<?, ?>> userAccumulators) {
        Object object = this.accumulatorLock;
        synchronized (object) {
            if (!this.state.isTerminal()) {
                this.userAccumulators = userAccumulators;
            }
        }
    }

    public Map<String, Accumulator<?, ?>> getUserAccumulators() {
        return this.userAccumulators;
    }

    @Override
    public StringifiedAccumulatorResult[] getUserAccumulatorsStringified() {
        Map<String, OptionalFailure> accumulators = this.userAccumulators == null ? null : this.userAccumulators.entrySet().stream().collect(Collectors.toMap(Map.Entry::getKey, entry -> OptionalFailure.of(entry.getValue())));
        return StringifiedAccumulatorResult.stringifyAccumulatorResults(accumulators);
    }

    @Override
    public int getParallelSubtaskIndex() {
        return this.getVertex().getParallelSubtaskIndex();
    }

    @Override
    public IOMetrics getIOMetrics() {
        return this.ioMetrics;
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    private void updateAccumulatorsAndMetrics(Map<String, Accumulator<?, ?>> userAccumulators, IOMetrics metrics) {
        if (userAccumulators != null) {
            Object object = this.accumulatorLock;
            synchronized (object) {
                this.userAccumulators = userAccumulators;
            }
        }
        if (metrics != null) {
            this.ioMetrics = metrics;
        }
    }

    public String toString() {
        LogicalSlot slot = this.assignedResource;
        return String.format("Attempt #%d (%s) @ %s - [%s]", new Object[]{this.attemptNumber, this.vertex.getTaskNameWithSubtaskIndex(), slot == null ? "(unassigned)" : slot, this.state});
    }

    public ArchivedExecution archive() {
        return new ArchivedExecution(this);
    }

    private void assertRunningInJobMasterMainThread() {
        this.vertex.getExecutionGraphAccessor().getJobMasterMainThreadExecutor().assertRunningInMainThread();
    }
}

