/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.runtime.executiongraph;

import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.concurrent.Callable;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.Executor;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicReferenceFieldUpdater;
import org.apache.flink.api.common.Archiveable;
import org.apache.flink.api.common.accumulators.Accumulator;
import org.apache.flink.api.common.time.Time;
import org.apache.flink.runtime.JobException;
import org.apache.flink.runtime.accumulators.StringifiedAccumulatorResult;
import org.apache.flink.runtime.clusterframework.types.ResourceID;
import org.apache.flink.runtime.concurrent.ApplyFunction;
import org.apache.flink.runtime.concurrent.BiFunction;
import org.apache.flink.runtime.concurrent.Future;
import org.apache.flink.runtime.concurrent.FutureUtils;
import org.apache.flink.runtime.concurrent.impl.FlinkCompletableFuture;
import org.apache.flink.runtime.concurrent.impl.FlinkFuture;
import org.apache.flink.runtime.deployment.InputChannelDeploymentDescriptor;
import org.apache.flink.runtime.deployment.PartialInputChannelDeploymentDescriptor;
import org.apache.flink.runtime.deployment.ResultPartitionLocation;
import org.apache.flink.runtime.deployment.TaskDeploymentDescriptor;
import org.apache.flink.runtime.execution.ExecutionState;
import org.apache.flink.runtime.executiongraph.AccessExecution;
import org.apache.flink.runtime.executiongraph.ArchivedExecution;
import org.apache.flink.runtime.executiongraph.ExecutionAttemptID;
import org.apache.flink.runtime.executiongraph.ExecutionEdge;
import org.apache.flink.runtime.executiongraph.ExecutionGraph;
import org.apache.flink.runtime.executiongraph.ExecutionVertex;
import org.apache.flink.runtime.executiongraph.IOMetrics;
import org.apache.flink.runtime.executiongraph.IntermediateResultPartition;
import org.apache.flink.runtime.executiongraph.PartitionInfo;
import org.apache.flink.runtime.instance.SimpleSlot;
import org.apache.flink.runtime.instance.SlotProvider;
import org.apache.flink.runtime.io.network.ConnectionID;
import org.apache.flink.runtime.io.network.partition.ResultPartitionID;
import org.apache.flink.runtime.jobmanager.scheduler.CoLocationConstraint;
import org.apache.flink.runtime.jobmanager.scheduler.NoResourceAvailableException;
import org.apache.flink.runtime.jobmanager.scheduler.ScheduledUnit;
import org.apache.flink.runtime.jobmanager.scheduler.SlotSharingGroup;
import org.apache.flink.runtime.jobmanager.slots.TaskManagerGateway;
import org.apache.flink.runtime.messages.Acknowledge;
import org.apache.flink.runtime.messages.StackTraceSampleResponse;
import org.apache.flink.runtime.state.TaskStateHandles;
import org.apache.flink.runtime.taskmanager.TaskManagerLocation;
import org.apache.flink.util.ExceptionUtils;
import org.apache.flink.util.Preconditions;
import org.slf4j.Logger;

public class Execution
implements AccessExecution,
Archiveable<ArchivedExecution> {
    private static final AtomicReferenceFieldUpdater<Execution, ExecutionState> STATE_UPDATER = AtomicReferenceFieldUpdater.newUpdater(Execution.class, ExecutionState.class, "state");
    private static final Logger LOG = ExecutionGraph.LOG;
    private static final int NUM_CANCEL_CALL_TRIES = 3;
    private static final int NUM_STOP_CALL_TRIES = 3;
    private final ExecutionVertex vertex;
    private final ExecutionAttemptID attemptId;
    private final long[] stateTimestamps;
    private final int attemptNumber;
    private final Time timeout;
    private ConcurrentLinkedQueue<PartialInputChannelDeploymentDescriptor> partialInputChannelDeploymentDescriptors;
    private volatile ExecutionState state = ExecutionState.CREATED;
    private volatile SimpleSlot assignedResource;
    private volatile Throwable failureCause;
    private volatile TaskManagerLocation assignedResourceLocation;
    private TaskStateHandles taskStateHandles;
    private Executor executor;
    private final Object accumulatorLock = new Object();
    private volatile Map<String, Accumulator<?, ?>> userAccumulators;
    private IOMetrics ioMetrics;

    public Execution(Executor executor, ExecutionVertex vertex, int attemptNumber, long startTimestamp, Time timeout) {
        this.executor = (Executor)Preconditions.checkNotNull((Object)executor);
        this.vertex = (ExecutionVertex)Preconditions.checkNotNull((Object)vertex);
        this.attemptId = new ExecutionAttemptID();
        this.attemptNumber = attemptNumber;
        this.stateTimestamps = new long[ExecutionState.values().length];
        this.markTimestamp(ExecutionState.CREATED, startTimestamp);
        this.timeout = (Time)Preconditions.checkNotNull((Object)timeout);
        this.partialInputChannelDeploymentDescriptors = new ConcurrentLinkedQueue();
    }

    public ExecutionVertex getVertex() {
        return this.vertex;
    }

    @Override
    public ExecutionAttemptID getAttemptId() {
        return this.attemptId;
    }

    @Override
    public int getAttemptNumber() {
        return this.attemptNumber;
    }

    @Override
    public ExecutionState getState() {
        return this.state;
    }

    public SimpleSlot getAssignedResource() {
        return this.assignedResource;
    }

    @Override
    public TaskManagerLocation getAssignedResourceLocation() {
        return this.assignedResourceLocation;
    }

    public Throwable getFailureCause() {
        return this.failureCause;
    }

    @Override
    public String getFailureCauseAsString() {
        return ExceptionUtils.stringifyException((Throwable)this.getFailureCause());
    }

    @Override
    public long[] getStateTimestamps() {
        return this.stateTimestamps;
    }

    @Override
    public long getStateTimestamp(ExecutionState state) {
        return this.stateTimestamps[state.ordinal()];
    }

    public boolean isFinished() {
        return this.state.isTerminal();
    }

    public TaskStateHandles getTaskStateHandles() {
        return this.taskStateHandles;
    }

    public void setInitialState(TaskStateHandles checkpointStateHandles) {
        if (this.state != ExecutionState.CREATED) {
            throw new IllegalArgumentException("Can only assign operator state when execution attempt is in CREATED");
        }
        this.taskStateHandles = checkpointStateHandles;
    }

    public boolean scheduleForExecution(SlotProvider slotProvider, boolean queued) throws NoResourceAvailableException {
        if (slotProvider == null) {
            throw new IllegalArgumentException("Cannot send null Scheduler when scheduling execution.");
        }
        SlotSharingGroup sharingGroup = this.vertex.getJobVertex().getSlotSharingGroup();
        CoLocationConstraint locationConstraint = this.vertex.getLocationConstraint();
        if (locationConstraint != null && sharingGroup == null) {
            throw new RuntimeException("Trying to schedule with co-location constraint but without slot sharing allowed.");
        }
        if (this.transitionState(ExecutionState.CREATED, ExecutionState.SCHEDULED)) {
            ScheduledUnit toSchedule = locationConstraint == null ? new ScheduledUnit(this, sharingGroup) : new ScheduledUnit(this, sharingGroup, locationConstraint);
            Future<SimpleSlot> slotAllocationFuture = slotProvider.allocateSlot(toSchedule, queued);
            Future<Void> deploymentFuture = slotAllocationFuture.handle(new BiFunction<SimpleSlot, Throwable, Void>(){

                /*
                 * WARNING - Removed try catching itself - possible behaviour change.
                 */
                @Override
                public Void apply(SimpleSlot simpleSlot, Throwable throwable) {
                    if (simpleSlot != null) {
                        try {
                            Execution.this.deployToSlot(simpleSlot);
                        }
                        catch (Throwable t) {
                            try {
                                simpleSlot.releaseSlot();
                            }
                            finally {
                                Execution.this.markFailed(t);
                            }
                        }
                    } else {
                        Execution.this.markFailed(throwable);
                    }
                    return null;
                }
            });
            if (!queued && !deploymentFuture.isDone()) {
                this.markFailed(new IllegalArgumentException("The slot allocation future has not been completed yet."));
            }
            return true;
        }
        return false;
    }

    public void deployToSlot(SimpleSlot slot) throws JobException {
        if (slot == null) {
            throw new NullPointerException();
        }
        if (!slot.isAlive()) {
            throw new JobException("Target slot for deployment is not alive.");
        }
        ExecutionState previous = this.state;
        if (previous == ExecutionState.SCHEDULED || previous == ExecutionState.CREATED) {
            if (!this.transitionState(previous, ExecutionState.DEPLOYING)) {
                throw new IllegalStateException("Cannot deploy task: Concurrent deployment call race.");
            }
        } else {
            throw new IllegalStateException("The vertex must be in CREATED or SCHEDULED state to be deployed. Found state " + (Object)((Object)previous));
        }
        try {
            if (!slot.setExecutedVertex(this)) {
                throw new JobException("Could not assign the ExecutionVertex to the slot " + slot);
            }
            this.assignedResource = slot;
            this.assignedResourceLocation = slot.getTaskManagerLocation();
            if (this.state != ExecutionState.DEPLOYING) {
                slot.releaseSlot();
                return;
            }
            if (LOG.isInfoEnabled()) {
                LOG.info(String.format("Deploying %s (attempt #%d) to %s", this.vertex.getSimpleName(), this.attemptNumber, this.assignedResourceLocation.getHostname()));
            }
            TaskDeploymentDescriptor deployment = this.vertex.createDeploymentDescriptor(this.attemptId, slot, this.taskStateHandles, this.attemptNumber);
            this.vertex.getExecutionGraph().registerExecution(this);
            TaskManagerGateway taskManagerGateway = slot.getTaskManagerGateway();
            Future<Acknowledge> submitResultFuture = taskManagerGateway.submitTask(deployment, this.timeout);
            submitResultFuture.exceptionallyAsync(new ApplyFunction<Throwable, Void>(){

                @Override
                public Void apply(Throwable failure) {
                    if (failure instanceof TimeoutException) {
                        String taskname = Execution.this.vertex.getTaskName() + '(' + (Execution.this.getParallelSubtaskIndex() + 1) + '/' + Execution.this.vertex.getTotalNumberOfParallelSubtasks() + ") (" + (Object)((Object)Execution.this.attemptId) + ')';
                        Execution.this.markFailed(new Exception("Cannot deploy task " + taskname + " - TaskManager (" + Execution.this.assignedResourceLocation + ") not responding after a timeout of " + Execution.this.timeout, failure));
                    } else {
                        Execution.this.markFailed(failure);
                    }
                    return null;
                }
            }, this.executor);
        }
        catch (Throwable t) {
            this.markFailed(t);
            ExceptionUtils.rethrow((Throwable)t);
        }
    }

    public void stop() {
        SimpleSlot slot = this.assignedResource;
        if (slot != null) {
            final TaskManagerGateway taskManagerGateway = slot.getTaskManagerGateway();
            Future<Acknowledge> stopResultFuture = FutureUtils.retry(new Callable<Future<Acknowledge>>(){

                @Override
                public Future<Acknowledge> call() throws Exception {
                    return taskManagerGateway.stopTask(Execution.this.attemptId, Execution.this.timeout);
                }
            }, 3, this.executor);
            stopResultFuture.exceptionally(new ApplyFunction<Throwable, Void>(){

                @Override
                public Void apply(Throwable failure) {
                    LOG.info("Stopping task was not successful.", failure);
                    return null;
                }
            });
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    public void cancel() {
        ExecutionState current;
        block8: {
            while (true) {
                if ((current = this.state) == ExecutionState.CANCELING || current == ExecutionState.CANCELED) {
                    return;
                }
                if (current == ExecutionState.RUNNING || current == ExecutionState.DEPLOYING) {
                    if (!this.transitionState(current, ExecutionState.CANCELING)) continue;
                    this.sendCancelRpcCall();
                    return;
                }
                if (current == ExecutionState.FINISHED || current == ExecutionState.FAILED) {
                    this.sendFailIntermediateResultPartitionsRpcCall();
                    return;
                }
                if (current != ExecutionState.CREATED && current != ExecutionState.SCHEDULED) break block8;
                if (this.transitionState(current, ExecutionState.CANCELED)) break;
            }
            this.markTimestamp(ExecutionState.CANCELING, this.getStateTimestamp(ExecutionState.CANCELED));
            try {
                this.vertex.getExecutionGraph().deregisterExecution(this);
                if (this.assignedResource != null) {
                    this.assignedResource.releaseSlot();
                }
            }
            finally {
                this.vertex.executionCanceled();
            }
            return;
        }
        throw new IllegalStateException(current.name());
    }

    void scheduleOrUpdateConsumers(List<List<ExecutionEdge>> allConsumers) {
        int numConsumers = allConsumers.size();
        if (numConsumers > 1) {
            this.fail(new IllegalStateException("Currently, only a single consumer group per partition is supported."));
        } else if (numConsumers == 0) {
            return;
        }
        for (ExecutionEdge edge : allConsumers.get(0)) {
            Execution partitionExecution;
            final ExecutionVertex consumerVertex = edge.getTarget();
            Execution consumer = consumerVertex.getCurrentExecutionAttempt();
            ExecutionState consumerState = consumer.getState();
            IntermediateResultPartition partition = edge.getSource();
            if (consumerState == ExecutionState.CREATED) {
                partitionExecution = partition.getProducer().getCurrentExecutionAttempt();
                consumerVertex.cachePartitionInfo(PartialInputChannelDeploymentDescriptor.fromEdge(partition, partitionExecution));
                FlinkFuture.supplyAsync(new Callable<Void>(){

                    @Override
                    public Void call() throws Exception {
                        try {
                            consumerVertex.scheduleForExecution(consumerVertex.getExecutionGraph().getSlotProvider(), consumerVertex.getExecutionGraph().isQueuedSchedulingAllowed());
                        }
                        catch (Throwable t) {
                            consumerVertex.fail(new IllegalStateException("Could not schedule consumer vertex " + consumerVertex, t));
                        }
                        return null;
                    }
                }, this.executor);
                if (consumerVertex.getExecutionState() != ExecutionState.RUNNING) continue;
                consumerVertex.sendPartitionInfos();
                continue;
            }
            if (consumerState == ExecutionState.RUNNING) {
                ResultPartitionLocation partitionLocation;
                SimpleSlot consumerSlot = consumer.getAssignedResource();
                if (consumerSlot == null) continue;
                TaskManagerLocation partitionTaskManagerLocation = partition.getProducer().getCurrentAssignedResource().getTaskManagerLocation();
                ResourceID partitionTaskManager = partitionTaskManagerLocation.getResourceID();
                ResourceID consumerTaskManager = consumerSlot.getTaskManagerID();
                ResultPartitionID partitionId = new ResultPartitionID(partition.getPartitionId(), this.attemptId);
                if (consumerTaskManager.equals(partitionTaskManager)) {
                    partitionLocation = ResultPartitionLocation.createLocal();
                } else {
                    ConnectionID connectionId = new ConnectionID(partitionTaskManagerLocation, partition.getIntermediateResult().getConnectionIndex());
                    partitionLocation = ResultPartitionLocation.createRemote(connectionId);
                }
                InputChannelDeploymentDescriptor descriptor = new InputChannelDeploymentDescriptor(partitionId, partitionLocation);
                consumer.sendUpdatePartitionInfoRpcCall(Collections.singleton(new PartitionInfo(partition.getIntermediateResult().getId(), descriptor)));
                continue;
            }
            if (consumerState != ExecutionState.SCHEDULED && consumerState != ExecutionState.DEPLOYING) continue;
            partitionExecution = partition.getProducer().getCurrentExecutionAttempt();
            consumerVertex.cachePartitionInfo(PartialInputChannelDeploymentDescriptor.fromEdge(partition, partitionExecution));
            if (consumerVertex.getExecutionState() != ExecutionState.RUNNING) continue;
            consumerVertex.sendPartitionInfos();
        }
    }

    public void fail(Throwable t) {
        this.processFail(t, false);
    }

    public Future<StackTraceSampleResponse> requestStackTraceSample(int sampleId, int numSamples, Time delayBetweenSamples, int maxStrackTraceDepth, Time timeout) {
        SimpleSlot slot = this.assignedResource;
        if (slot != null) {
            TaskManagerGateway taskManagerGateway = slot.getTaskManagerGateway();
            return taskManagerGateway.requestStackTraceSample(this.attemptId, sampleId, numSamples, delayBetweenSamples, maxStrackTraceDepth, timeout);
        }
        return FlinkCompletableFuture.completedExceptionally(new Exception("The execution has no slot assigned."));
    }

    public void notifyCheckpointComplete(long checkpointId, long timestamp) {
        SimpleSlot slot = this.assignedResource;
        if (slot != null) {
            TaskManagerGateway taskManagerGateway = slot.getTaskManagerGateway();
            taskManagerGateway.notifyCheckpointComplete(this.attemptId, this.getVertex().getJobId(), checkpointId, timestamp);
        } else {
            LOG.debug("The execution has no slot assigned. This indicates that the execution is no longer running.");
        }
    }

    public void triggerCheckpoint(long checkpointId, long timestamp) {
        SimpleSlot slot = this.assignedResource;
        if (slot != null) {
            TaskManagerGateway taskManagerGateway = slot.getTaskManagerGateway();
            taskManagerGateway.triggerCheckpoint(this.attemptId, this.getVertex().getJobId(), checkpointId, timestamp);
        } else {
            LOG.debug("The execution has no slot assigned. This indicates that the execution is no longer running.");
        }
    }

    void markFailed(Throwable t) {
        this.processFail(t, true);
    }

    void markFinished() {
        this.markFinished(null, null);
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    void markFinished(Map<String, Accumulator<?, ?>> userAccumulators, IOMetrics metrics) {
        ExecutionState current;
        while ((current = this.state) == ExecutionState.RUNNING || current == ExecutionState.DEPLOYING) {
            if (!this.transitionState(current, ExecutionState.FINISHED)) continue;
            try {
                for (IntermediateResultPartition finishedPartition : this.getVertex().finishAllBlockingPartitions()) {
                    IntermediateResultPartition[] allPartitions;
                    for (IntermediateResultPartition partition : allPartitions = finishedPartition.getIntermediateResult().getPartitions()) {
                        this.scheduleOrUpdateConsumers(partition.getConsumers());
                    }
                }
                Object object = this.accumulatorLock;
                synchronized (object) {
                    this.userAccumulators = userAccumulators;
                }
                this.ioMetrics = metrics;
                this.assignedResource.releaseSlot();
                this.vertex.getExecutionGraph().deregisterExecution(this);
            }
            finally {
                this.vertex.executionFinished();
            }
            return;
        }
        if (current == ExecutionState.CANCELING) {
            this.cancelingComplete();
            return;
        }
        if (current == ExecutionState.CANCELED || current == ExecutionState.FAILED) {
            if (LOG.isDebugEnabled()) {
                LOG.debug("Task FINISHED, but concurrently went to state " + (Object)((Object)this.state));
            }
            return;
        }
        this.markFailed(new Exception("Vertex received FINISHED message while being in state " + (Object)((Object)this.state)));
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    void cancelingComplete() {
        ExecutionState current;
        block6: {
            do {
                if ((current = this.state) == ExecutionState.CANCELED) {
                    return;
                }
                if (current != ExecutionState.CANCELING && current != ExecutionState.RUNNING && current != ExecutionState.DEPLOYING) break block6;
            } while (!this.transitionState(current, ExecutionState.CANCELED));
            try {
                this.assignedResource.releaseSlot();
                this.vertex.getExecutionGraph().deregisterExecution(this);
            }
            finally {
                this.vertex.executionCanceled();
            }
            return;
        }
        if (current != ExecutionState.FAILED) {
            String message = String.format("Asynchronous race: Found state %s after successful cancel call.", new Object[]{this.state});
            LOG.error(message);
            this.vertex.getExecutionGraph().fail(new Exception(message));
        }
    }

    void cachePartitionInfo(PartialInputChannelDeploymentDescriptor partitionInfo) {
        this.partialInputChannelDeploymentDescriptors.add(partitionInfo);
    }

    void sendPartitionInfos() {
        if (this.partialInputChannelDeploymentDescriptors != null && !this.partialInputChannelDeploymentDescriptors.isEmpty()) {
            PartialInputChannelDeploymentDescriptor partialInputChannelDeploymentDescriptor;
            ArrayList<PartitionInfo> partitionInfos = new ArrayList<PartitionInfo>(this.partialInputChannelDeploymentDescriptors.size());
            while ((partialInputChannelDeploymentDescriptor = this.partialInputChannelDeploymentDescriptors.poll()) != null) {
                partitionInfos.add(new PartitionInfo(partialInputChannelDeploymentDescriptor.getResultId(), partialInputChannelDeploymentDescriptor.createInputChannelDeploymentDescriptor(this)));
            }
            this.sendUpdatePartitionInfoRpcCall(partitionInfos);
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    private boolean processFail(Throwable t, boolean isCallback) {
        ExecutionState current;
        do {
            if ((current = this.state) == ExecutionState.FAILED) {
                return false;
            }
            if (current == ExecutionState.CANCELED || current == ExecutionState.FINISHED) {
                if (LOG.isDebugEnabled()) {
                    LOG.debug("Ignoring transition of vertex {} to {} while being {}.", new Object[]{this.getVertexWithAttempt(), ExecutionState.FAILED, current});
                }
                return false;
            }
            if (current != ExecutionState.CANCELING) continue;
            this.cancelingComplete();
            return false;
        } while (!this.transitionState(current, ExecutionState.FAILED, t));
        this.failureCause = t;
        try {
            if (this.assignedResource != null) {
                this.assignedResource.releaseSlot();
            }
            this.vertex.getExecutionGraph().deregisterExecution(this);
        }
        finally {
            this.vertex.executionFailed(t);
        }
        if (!(isCallback || current != ExecutionState.RUNNING && current != ExecutionState.DEPLOYING)) {
            if (LOG.isDebugEnabled()) {
                LOG.debug("Sending out cancel request, to remove task execution from TaskManager.");
            }
            try {
                if (this.assignedResource != null) {
                    this.sendCancelRpcCall();
                }
            }
            catch (Throwable tt) {
                LOG.error("Error triggering cancel call while marking task as failed.", tt);
            }
        }
        return true;
    }

    boolean switchToRunning() {
        if (this.transitionState(ExecutionState.DEPLOYING, ExecutionState.RUNNING)) {
            this.sendPartitionInfos();
            return true;
        }
        ExecutionState currentState = this.state;
        if (currentState != ExecutionState.FINISHED && currentState != ExecutionState.CANCELED) {
            if (currentState == ExecutionState.CANCELING || currentState == ExecutionState.FAILED) {
                if (LOG.isDebugEnabled()) {
                    LOG.debug(String.format("Concurrent canceling/failing of %s while deployment was in progress.", this.getVertexWithAttempt()));
                }
                this.sendCancelRpcCall();
            } else {
                String message = String.format("Concurrent unexpected state transition of task %s to %s while deployment was in progress.", new Object[]{this.getVertexWithAttempt(), currentState});
                if (LOG.isDebugEnabled()) {
                    LOG.debug(message);
                }
                this.sendCancelRpcCall();
                this.markFailed(new Exception(message));
            }
        }
        return false;
    }

    private void sendCancelRpcCall() {
        SimpleSlot slot = this.assignedResource;
        if (slot != null) {
            final TaskManagerGateway taskManagerGateway = slot.getTaskManagerGateway();
            Future<Acknowledge> cancelResultFuture = FutureUtils.retry(new Callable<Future<Acknowledge>>(){

                @Override
                public Future<Acknowledge> call() throws Exception {
                    return taskManagerGateway.cancelTask(Execution.this.attemptId, Execution.this.timeout);
                }
            }, 3, this.executor);
            cancelResultFuture.exceptionallyAsync(new ApplyFunction<Throwable, Void>(){

                @Override
                public Void apply(Throwable failure) {
                    Execution.this.fail(new Exception("Task could not be canceled.", failure));
                    return null;
                }
            }, this.executor);
        }
    }

    private void sendFailIntermediateResultPartitionsRpcCall() {
        SimpleSlot slot = this.assignedResource;
        if (slot != null) {
            TaskManagerGateway taskManagerGateway = slot.getTaskManagerGateway();
            taskManagerGateway.failPartition(this.attemptId);
        }
    }

    private void sendUpdatePartitionInfoRpcCall(Iterable<PartitionInfo> partitionInfos) {
        SimpleSlot slot = this.assignedResource;
        if (slot != null) {
            TaskManagerGateway taskManagerGateway = slot.getTaskManagerGateway();
            final TaskManagerLocation taskManagerLocation = slot.getTaskManagerLocation();
            Future<Acknowledge> updatePartitionsResultFuture = taskManagerGateway.updatePartitions(this.attemptId, partitionInfos, this.timeout);
            updatePartitionsResultFuture.exceptionallyAsync(new ApplyFunction<Throwable, Void>(){

                @Override
                public Void apply(Throwable failure) {
                    Execution.this.fail(new IllegalStateException("Update task on TaskManager " + taskManagerLocation + " failed due to:", failure));
                    return null;
                }
            }, this.executor);
        }
    }

    private boolean transitionState(ExecutionState currentState, ExecutionState targetState) {
        return this.transitionState(currentState, targetState, null);
    }

    private boolean transitionState(ExecutionState currentState, ExecutionState targetState, Throwable error) {
        if (currentState.isTerminal()) {
            throw new IllegalStateException("Cannot leave terminal state " + (Object)((Object)currentState) + " to transition to " + (Object)((Object)targetState) + '.');
        }
        if (STATE_UPDATER.compareAndSet(this, currentState, targetState)) {
            this.markTimestamp(targetState);
            if (error == null) {
                LOG.info("{} ({}) switched from {} to {}.", new Object[]{this.getVertex().getTaskNameWithSubtaskIndex(), this.getAttemptId(), currentState, targetState});
            } else {
                LOG.info("{} ({}) switched from {} to {}.", new Object[]{this.getVertex().getTaskNameWithSubtaskIndex(), this.getAttemptId(), currentState, targetState, error});
            }
            try {
                this.vertex.notifyStateTransition(this.attemptId, targetState, error);
            }
            catch (Throwable t) {
                LOG.error("Error while notifying execution graph of execution state transition.", t);
            }
            return true;
        }
        return false;
    }

    private void markTimestamp(ExecutionState state) {
        this.markTimestamp(state, System.currentTimeMillis());
    }

    private void markTimestamp(ExecutionState state, long timestamp) {
        this.stateTimestamps[state.ordinal()] = timestamp;
    }

    public String getVertexWithAttempt() {
        return this.vertex.getSimpleName() + " - execution #" + this.attemptNumber;
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    public void setAccumulators(Map<String, Accumulator<?, ?>> userAccumulators) {
        Object object = this.accumulatorLock;
        synchronized (object) {
            if (!this.state.isTerminal()) {
                this.userAccumulators = userAccumulators;
            }
        }
    }

    public Map<String, Accumulator<?, ?>> getUserAccumulators() {
        return this.userAccumulators;
    }

    @Override
    public StringifiedAccumulatorResult[] getUserAccumulatorsStringified() {
        return StringifiedAccumulatorResult.stringifyAccumulatorResults(this.userAccumulators);
    }

    @Override
    public int getParallelSubtaskIndex() {
        return this.getVertex().getParallelSubtaskIndex();
    }

    @Override
    public IOMetrics getIOMetrics() {
        return this.ioMetrics;
    }

    public String toString() {
        return String.format("Attempt #%d (%s) @ %s - [%s]", new Object[]{this.attemptNumber, this.vertex.getSimpleName(), this.assignedResource == null ? "(unassigned)" : this.assignedResource.toString(), this.state});
    }

    public ArchivedExecution archive() {
        return new ArchivedExecution(this);
    }
}

