/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.runtime.taskexecutor;

import java.io.IOException;
import java.net.InetSocketAddress;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.Map;
import java.util.Objects;
import java.util.UUID;
import java.util.concurrent.TimeoutException;
import org.apache.flink.annotation.VisibleForTesting;
import org.apache.flink.api.common.JobID;
import org.apache.flink.runtime.accumulators.AccumulatorSnapshot;
import org.apache.flink.runtime.blob.BlobCache;
import org.apache.flink.runtime.broadcast.BroadcastVariableManager;
import org.apache.flink.runtime.checkpoint.CheckpointOptions;
import org.apache.flink.runtime.clusterframework.types.AllocationID;
import org.apache.flink.runtime.clusterframework.types.ResourceID;
import org.apache.flink.runtime.clusterframework.types.SlotID;
import org.apache.flink.runtime.concurrent.AcceptFunction;
import org.apache.flink.runtime.concurrent.ApplyFunction;
import org.apache.flink.runtime.concurrent.Future;
import org.apache.flink.runtime.concurrent.impl.FlinkCompletableFuture;
import org.apache.flink.runtime.deployment.TaskDeploymentDescriptor;
import org.apache.flink.runtime.execution.librarycache.BlobLibraryCacheManager;
import org.apache.flink.runtime.execution.librarycache.LibraryCacheManager;
import org.apache.flink.runtime.executiongraph.ExecutionAttemptID;
import org.apache.flink.runtime.executiongraph.JobInformation;
import org.apache.flink.runtime.executiongraph.PartitionInfo;
import org.apache.flink.runtime.executiongraph.TaskInformation;
import org.apache.flink.runtime.filecache.FileCache;
import org.apache.flink.runtime.heartbeat.HeartbeatListener;
import org.apache.flink.runtime.heartbeat.HeartbeatManager;
import org.apache.flink.runtime.heartbeat.HeartbeatServices;
import org.apache.flink.runtime.heartbeat.HeartbeatTarget;
import org.apache.flink.runtime.highavailability.HighAvailabilityServices;
import org.apache.flink.runtime.io.disk.iomanager.IOManager;
import org.apache.flink.runtime.io.network.NetworkEnvironment;
import org.apache.flink.runtime.io.network.netty.PartitionProducerStateChecker;
import org.apache.flink.runtime.io.network.partition.ResultPartitionConsumableNotifier;
import org.apache.flink.runtime.io.network.partition.consumer.SingleInputGate;
import org.apache.flink.runtime.jobgraph.IntermediateDataSetID;
import org.apache.flink.runtime.jobmaster.JMTMRegistrationSuccess;
import org.apache.flink.runtime.jobmaster.JobMasterGateway;
import org.apache.flink.runtime.leaderretrieval.LeaderRetrievalListener;
import org.apache.flink.runtime.memory.MemoryManager;
import org.apache.flink.runtime.messages.Acknowledge;
import org.apache.flink.runtime.metrics.MetricRegistry;
import org.apache.flink.runtime.metrics.groups.TaskManagerMetricGroup;
import org.apache.flink.runtime.metrics.groups.TaskMetricGroup;
import org.apache.flink.runtime.registration.RegistrationConnectionListener;
import org.apache.flink.runtime.resourcemanager.ResourceManagerGateway;
import org.apache.flink.runtime.rpc.FatalErrorHandler;
import org.apache.flink.runtime.rpc.RpcEndpoint;
import org.apache.flink.runtime.rpc.RpcMethod;
import org.apache.flink.runtime.rpc.RpcService;
import org.apache.flink.runtime.rpc.akka.AkkaRpcServiceUtils;
import org.apache.flink.runtime.taskexecutor.JobLeaderListener;
import org.apache.flink.runtime.taskexecutor.JobLeaderService;
import org.apache.flink.runtime.taskexecutor.JobManagerConnection;
import org.apache.flink.runtime.taskexecutor.JobManagerTable;
import org.apache.flink.runtime.taskexecutor.TaskExecutorGateway;
import org.apache.flink.runtime.taskexecutor.TaskExecutorRegistrationSuccess;
import org.apache.flink.runtime.taskexecutor.TaskExecutorToResourceManagerConnection;
import org.apache.flink.runtime.taskexecutor.TaskManagerConfiguration;
import org.apache.flink.runtime.taskexecutor.exceptions.CheckpointException;
import org.apache.flink.runtime.taskexecutor.exceptions.PartitionException;
import org.apache.flink.runtime.taskexecutor.exceptions.SlotAllocationException;
import org.apache.flink.runtime.taskexecutor.exceptions.SlotOccupiedException;
import org.apache.flink.runtime.taskexecutor.exceptions.TaskException;
import org.apache.flink.runtime.taskexecutor.exceptions.TaskSubmissionException;
import org.apache.flink.runtime.taskexecutor.rpc.RpcCheckpointResponder;
import org.apache.flink.runtime.taskexecutor.rpc.RpcInputSplitProvider;
import org.apache.flink.runtime.taskexecutor.rpc.RpcPartitionStateChecker;
import org.apache.flink.runtime.taskexecutor.rpc.RpcResultPartitionConsumableNotifier;
import org.apache.flink.runtime.taskexecutor.slot.SlotActions;
import org.apache.flink.runtime.taskexecutor.slot.SlotNotActiveException;
import org.apache.flink.runtime.taskexecutor.slot.SlotNotFoundException;
import org.apache.flink.runtime.taskexecutor.slot.SlotOffer;
import org.apache.flink.runtime.taskexecutor.slot.TaskSlot;
import org.apache.flink.runtime.taskexecutor.slot.TaskSlotTable;
import org.apache.flink.runtime.taskmanager.CheckpointResponder;
import org.apache.flink.runtime.taskmanager.Task;
import org.apache.flink.runtime.taskmanager.TaskExecutionState;
import org.apache.flink.runtime.taskmanager.TaskManagerActions;
import org.apache.flink.runtime.taskmanager.TaskManagerLocation;
import org.apache.flink.util.ExceptionUtils;
import org.apache.flink.util.Preconditions;

public class TaskExecutor
extends RpcEndpoint<TaskExecutorGateway> {
    /** Base name handed to {@code AkkaRpcServiceUtils.createRandomName} when the RPC endpoint name is generated. */
    public static final String TASK_MANAGER_NAME = "taskmanager";
    /** Location of this task manager; assigned once in the constructor. */
    private final TaskManagerLocation taskManagerLocation;
    // NOTE(review): not referenced in the visible part of this file; presumably a bound for valid blob server ports -- confirm at the use site.
    public static final int MAX_BLOB_PORT = 65536;
    /** High-availability services; source of the ResourceManager leader retriever and handed to the job leader service on start. */
    private final HighAvailabilityServices haServices;
    /** Static configuration; read here for the slot count check and for RPC/slot timeouts. */
    private final TaskManagerConfiguration taskManagerConfiguration;
    /** Handed to every submitted {@link Task}; shut down in {@code shutDown()}. */
    private final IOManager ioManager;
    /** Handed to every submitted {@link Task}; shut down in {@code shutDown()}. */
    private final MemoryManager memoryManager;
    /** Handed to every submitted {@link Task}; also used to release result partitions; shut down in {@code shutDown()}. */
    private final NetworkEnvironment networkEnvironment;
    // NOTE(review): only assigned in the visible part of this file; usage not shown here.
    private final MetricRegistry metricRegistry;
    /** Heartbeat manager for the JobManagers this executor is connected to. */
    private final HeartbeatManager<Void, Void> jobManagerHeartbeatManager;
    /** Heartbeat manager for the ResourceManager this executor is connected to. */
    private final HeartbeatManager<Void, Void> resourceManagerHeartbeatManager;
    // NOTE(review): only assigned in the visible part of this file; presumably the sink behind onFatalError -- confirm.
    private final FatalErrorHandler fatalErrorHandler;
    /** Parent metric group under which a per-task group is registered for each submitted task. */
    private final TaskManagerMetricGroup taskManagerMetricGroup;
    /** Handed to every submitted {@link Task}. */
    private final BroadcastVariableManager broadcastVariableManager;
    /** Handed to every submitted {@link Task}; shut down in {@code shutDown()}. */
    private final FileCache fileCache;
    /** Current connection (or connection attempt) to the ResourceManager; {@code null} when there is none. */
    private TaskExecutorToResourceManagerConnection resourceManagerConnection;
    /** JobManager connections keyed by the JobManager's resource ID. */
    private Map<ResourceID, JobManagerConnection> jobManagerConnections;
    /** Bookkeeping of slots, their allocations and the tasks running in them. */
    private final TaskSlotTable taskSlotTable;
    /** JobManager connections keyed by job ID. */
    private final JobManagerTable jobManagerTable;
    /** Service to which jobs are added so that their JobManager leader can be tracked. */
    private final JobLeaderService jobLeaderService;

    /**
     * Creates the task executor endpoint under a randomly suffixed {@link #TASK_MANAGER_NAME}.
     *
     * <p>The configured number of slots must be positive, and every collaborator that is
     * stored in a field must be non-null. Two heartbeat managers (one for JobManagers, one
     * for the ResourceManager) are created from {@code heartbeatServices}, using the
     * scheduled executor of {@code rpcService}.
     *
     * @throws IllegalArgumentException if the configured number of slots is not larger than 0
     * @throws NullPointerException if one of the null-checked arguments is {@code null}
     */
    public TaskExecutor(RpcService rpcService, TaskManagerConfiguration taskManagerConfiguration, TaskManagerLocation taskManagerLocation, MemoryManager memoryManager, IOManager ioManager, NetworkEnvironment networkEnvironment, HighAvailabilityServices haServices, HeartbeatServices heartbeatServices, MetricRegistry metricRegistry, TaskManagerMetricGroup taskManagerMetricGroup, BroadcastVariableManager broadcastVariableManager, FileCache fileCache, TaskSlotTable taskSlotTable, JobManagerTable jobManagerTable, JobLeaderService jobLeaderService, FatalErrorHandler fatalErrorHandler) {
        super(rpcService, AkkaRpcServiceUtils.createRandomName(TASK_MANAGER_NAME));

        Preconditions.checkArgument(
            taskManagerConfiguration.getNumberSlots() > 0,
            "The number of slots has to be larger than 0.");

        this.taskManagerConfiguration = Preconditions.checkNotNull(taskManagerConfiguration);
        this.taskManagerLocation = Preconditions.checkNotNull(taskManagerLocation);
        this.memoryManager = Preconditions.checkNotNull(memoryManager);
        this.ioManager = Preconditions.checkNotNull(ioManager);
        this.networkEnvironment = Preconditions.checkNotNull(networkEnvironment);
        this.haServices = Preconditions.checkNotNull(haServices);
        this.metricRegistry = Preconditions.checkNotNull(metricRegistry);
        this.taskSlotTable = Preconditions.checkNotNull(taskSlotTable);
        this.fatalErrorHandler = Preconditions.checkNotNull(fatalErrorHandler);
        this.taskManagerMetricGroup = Preconditions.checkNotNull(taskManagerMetricGroup);
        this.broadcastVariableManager = Preconditions.checkNotNull(broadcastVariableManager);
        this.fileCache = Preconditions.checkNotNull(fileCache);
        this.jobManagerTable = Preconditions.checkNotNull(jobManagerTable);
        this.jobLeaderService = Preconditions.checkNotNull(jobLeaderService);

        this.jobManagerConnections = new HashMap<>(4);

        // The heartbeat managers are created last; they are identified by this executor's resource ID.
        this.jobManagerHeartbeatManager = heartbeatServices.createHeartbeatManager(
            getResourceID(),
            new JobManagerHeartbeatListener(),
            rpcService.getScheduledExecutor(),
            log);
        this.resourceManagerHeartbeatManager = heartbeatServices.createHeartbeatManager(
            getResourceID(),
            new ResourceManagerHeartbeatListener(),
            rpcService.getScheduledExecutor(),
            log);
    }

    /**
     * Starts the endpoint, then the ResourceManager leader retrieval, the slot table and
     * the job leader service, in that order.
     *
     * <p>A failure to start the leader retrieval is routed to {@code onFatalErrorAsync}
     * instead of being thrown; the remaining services are still started afterwards.
     *
     * @throws Exception if the super class or one of the remaining services fails to start
     */
    @Override
    public void start() throws Exception {
        super.start();

        try {
            haServices.getResourceManagerLeaderRetriever().start(new ResourceManagerLeaderListener());
        } catch (Exception e) {
            onFatalErrorAsync(e);
        }

        taskSlotTable.start(new SlotActionsImpl());

        jobLeaderService.start(
            getAddress(),
            getRpcService(),
            haServices,
            new JobLeaderListenerImpl());
    }

    /**
     * Stops all services owned by this task executor and then the RPC endpoint itself.
     *
     * <p>Every shutdown step is attempted even if an earlier one throws, so that a single
     * failing component cannot leave the remaining ones running. The first exception is
     * kept as the primary failure, later ones are attached to it as suppressed exceptions,
     * and the aggregate is rethrown once all steps have run.
     *
     * @throws Exception if at least one shutdown step failed
     */
    @Override
    public void shutDown() throws Exception {
        log.info("Stopping TaskManager {}.", getAddress());

        Exception exception = null;

        try {
            taskSlotTable.stop();
        } catch (Exception e) {
            exception = ExceptionUtils.firstOrSuppressed(e, exception);
        }

        try {
            if (isConnectedToResourceManager()) {
                resourceManagerConnection.close();
            }
        } catch (Exception e) {
            exception = ExceptionUtils.firstOrSuppressed(e, exception);
        }

        try {
            jobManagerHeartbeatManager.stop();
        } catch (Exception e) {
            exception = ExceptionUtils.firstOrSuppressed(e, exception);
        }

        try {
            resourceManagerHeartbeatManager.stop();
        } catch (Exception e) {
            exception = ExceptionUtils.firstOrSuppressed(e, exception);
        }

        try {
            ioManager.shutdown();
        } catch (Exception e) {
            exception = ExceptionUtils.firstOrSuppressed(e, exception);
        }

        try {
            memoryManager.shutdown();
        } catch (Exception e) {
            exception = ExceptionUtils.firstOrSuppressed(e, exception);
        }

        try {
            networkEnvironment.shutdown();
        } catch (Exception e) {
            exception = ExceptionUtils.firstOrSuppressed(e, exception);
        }

        try {
            fileCache.shutdown();
        } catch (Exception e) {
            exception = ExceptionUtils.firstOrSuppressed(e, exception);
        }

        try {
            super.shutDown();
        } catch (Exception e) {
            exception = ExceptionUtils.firstOrSuppressed(e, exception);
        }

        if (exception != null) {
            ExceptionUtils.rethrowException(exception, "Error while shutting the TaskExecutor down.");
        }

        // Only reached when every step above completed without an exception.
        log.info("Stopped TaskManager {}.", getAddress());
    }

    /**
     * Accepts a task deployment from a JobManager, registers the task in its slot and
     * starts the task thread.
     *
     * <p>The submission is rejected when the job/task information cannot be deserialized,
     * when no JobManager connection exists for the job, when the caller's leader id does
     * not match the connection's leader id, when there is no active slot for the given
     * job and allocation, or when the slot table already holds a task with the same
     * execution id.
     *
     * @param tdd descriptor of the task to deploy
     * @param jobManagerLeaderId leader id the submitting JobManager claims to have
     * @return acknowledgement once the task thread has been started
     * @throws TaskSubmissionException if the submission is rejected for one of the reasons above
     */
    @RpcMethod
    public Acknowledge submitTask(TaskDeploymentDescriptor tdd, UUID jobManagerLeaderId) throws TaskSubmissionException {
        final JobInformation jobInformation;
        final TaskInformation taskInformation;
        try {
            final ClassLoader classLoader = getClass().getClassLoader();
            jobInformation = (JobInformation) tdd.getSerializedJobInformation().deserializeValue(classLoader);
            taskInformation = (TaskInformation) tdd.getSerializedTaskInformation().deserializeValue(getClass().getClassLoader());
        } catch (IOException | ClassNotFoundException e) {
            throw new TaskSubmissionException("Could not deserialize the job or task information.", e);
        }

        final JobID jobId = jobInformation.getJobId();
        final JobManagerConnection connection = jobManagerTable.get(jobId);

        if (connection == null) {
            final String message = "Could not submit task because there is no JobManager associated for the job " + jobId + '.';
            log.debug(message);
            throw new TaskSubmissionException(message);
        }

        if (!Objects.equals(connection.getLeaderId(), jobManagerLeaderId)) {
            final String message = "Rejecting the task submission because the job manager leader id " + jobManagerLeaderId + " does not match the expected job manager leader id " + connection.getLeaderId() + '.';
            log.debug(message);
            throw new TaskSubmissionException(message);
        }

        if (!taskSlotTable.existsActiveSlot(jobId, tdd.getAllocationId())) {
            final String message = "No task slot allocated for job ID " + jobId + " and allocation ID " + tdd.getAllocationId() + '.';
            log.debug(message);
            throw new TaskSubmissionException(message);
        }

        final TaskMetricGroup metricGroup = taskManagerMetricGroup.addTaskForJob(
            jobInformation.getJobId(),
            jobInformation.getJobName(),
            taskInformation.getJobVertexId(),
            tdd.getExecutionAttemptId(),
            taskInformation.getTaskName(),
            tdd.getSubtaskIndex(),
            tdd.getAttemptNumber());

        final RpcInputSplitProvider splitProvider = new RpcInputSplitProvider(
            connection.getLeaderId(),
            connection.getJobManagerGateway(),
            jobInformation.getJobId(),
            taskInformation.getJobVertexId(),
            tdd.getExecutionAttemptId(),
            taskManagerConfiguration.getTimeout());

        // Per-JobManager services the task talks to while it runs.
        final TaskManagerActions actions = connection.getTaskManagerActions();
        final CheckpointResponder responder = connection.getCheckpointResponder();
        final LibraryCacheManager libraries = connection.getLibraryCacheManager();
        final ResultPartitionConsumableNotifier consumableNotifier = connection.getResultPartitionConsumableNotifier();
        final PartitionProducerStateChecker producerStateChecker = connection.getPartitionStateChecker();

        final Task task = new Task(
            jobInformation,
            taskInformation,
            tdd.getExecutionAttemptId(),
            tdd.getAllocationId(),
            tdd.getSubtaskIndex(),
            tdd.getAttemptNumber(),
            tdd.getProducedPartitions(),
            tdd.getInputGates(),
            tdd.getTargetSlotNumber(),
            tdd.getTaskStateHandles(),
            memoryManager,
            ioManager,
            networkEnvironment,
            broadcastVariableManager,
            actions,
            splitProvider,
            responder,
            libraries,
            fileCache,
            taskManagerConfiguration,
            metricGroup,
            consumableNotifier,
            producerStateChecker,
            getRpcService().getExecutor());

        log.info("Received task {}.", task.getTaskInfo().getTaskNameWithSubtasks());

        final boolean added;
        try {
            added = taskSlotTable.addTask(task);
        } catch (SlotNotActiveException | SlotNotFoundException e) {
            throw new TaskSubmissionException("Could not submit task.", e);
        }

        if (!added) {
            final String message = "TaskManager already contains a task for id " + task.getExecutionId() + '.';
            log.debug(message);
            throw new TaskSubmissionException(message);
        }

        task.startTaskThread();
        return Acknowledge.get();
    }

    /**
     * Cancels the task belonging to the given execution attempt.
     *
     * @param executionAttemptID execution attempt whose task should be cancelled
     * @return acknowledgement once cancellation has been triggered
     * @throws TaskException if no such task is known to the slot table, or if triggering
     *     the cancellation throws
     */
    @RpcMethod
    public Acknowledge cancelTask(ExecutionAttemptID executionAttemptID) throws TaskException {
        final Task task = taskSlotTable.getTask(executionAttemptID);

        if (task == null) {
            // The message names the cancel operation so it cannot be confused with stopTask.
            final String message = "Cannot find task to cancel for execution " + executionAttemptID + '.';
            log.debug(message);
            throw new TaskException(message);
        }

        try {
            task.cancelExecution();
            return Acknowledge.get();
        } catch (Throwable t) {
            throw new TaskException("Cannot cancel task for execution " + executionAttemptID + '.', t);
        }
    }

    /**
     * Stops the task belonging to the given execution attempt.
     *
     * @param executionAttemptID execution attempt whose task should be stopped
     * @return acknowledgement once the stop has been triggered
     * @throws TaskException if no such task is known to the slot table, or if triggering
     *     the stop throws
     */
    @RpcMethod
    public Acknowledge stopTask(ExecutionAttemptID executionAttemptID) throws TaskException {
        final Task task = taskSlotTable.getTask(executionAttemptID);

        if (task == null) {
            final String message = "Cannot find task to stop for execution " + executionAttemptID + '.';
            log.debug(message);
            throw new TaskException(message);
        }

        try {
            task.stopExecution();
            return Acknowledge.get();
        } catch (Throwable t) {
            throw new TaskException("Cannot stop task for execution " + executionAttemptID + '.', t);
        }
    }

    /**
     * Updates the input channels of a task with new partition location information.
     *
     * <p>Each update is handed to the RPC service's executor. If an update fails there with
     * an {@link IOException} or {@link InterruptedException}, the task is failed externally.
     * An update for an unknown task is logged and acknowledged.
     *
     * @param executionAttemptID execution attempt whose input gates are updated
     * @param partitionInfos partition infos to apply, processed in iteration order
     * @return acknowledgement
     * @throws PartitionException if the task has no input gate for one of the partition
     *     infos; updates scheduled before that element are not revoked
     */
    @RpcMethod
    public Acknowledge updatePartitions(final ExecutionAttemptID executionAttemptID, Iterable<PartitionInfo> partitionInfos) throws PartitionException {
        final Task task = taskSlotTable.getTask(executionAttemptID);

        if (task == null) {
            log.debug("Discard update for input partitions of task {}. Task is no longer running.", executionAttemptID);
            return Acknowledge.get();
        }

        for (final PartitionInfo partitionInfo : partitionInfos) {
            final IntermediateDataSetID dataSetId = partitionInfo.getIntermediateDataSetID();
            final SingleInputGate inputGate = task.getInputGateById(dataSetId);

            if (inputGate == null) {
                throw new PartitionException("No reader with ID " + dataSetId + " for task " + executionAttemptID + " was found.");
            }

            getRpcService().execute(new Runnable() {
                @Override
                public void run() {
                    try {
                        inputGate.updateInputChannel(partitionInfo.getInputChannelDeploymentDescriptor());
                    } catch (IOException | InterruptedException e) {
                        TaskExecutor.this.log.error("Could not update input data location for task {}. Trying to fail task.", task.getTaskInfo().getTaskName(), e);

                        try {
                            task.failExternally(e);
                        } catch (RuntimeException re) {
                            // Failing the task is best effort; a runtime failure here is only logged.
                            TaskExecutor.this.log.error("Failed canceling task with execution ID {} after task update failure.", executionAttemptID, re);
                        }
                    }
                }
            });
        }

        return Acknowledge.get();
    }

    /**
     * Releases all result partitions produced by the given execution attempt.
     *
     * <p>Any {@link Throwable} raised while releasing is forwarded to {@code onFatalError}.
     *
     * @param executionAttemptID execution attempt whose produced partitions are discarded
     */
    @RpcMethod
    public void failPartition(ExecutionAttemptID executionAttemptID) {
        log.info("Discarding the results produced by task execution {}.", executionAttemptID);

        try {
            networkEnvironment
                .getResultPartitionManager()
                .releasePartitionsProducedBy(executionAttemptID);
        } catch (Throwable t) {
            onFatalError(t);
        }
    }

    /**
     * Forwards a heartbeat request from a JobManager to the JobManager heartbeat manager.
     * No payload is attached.
     *
     * @param resourceID resource ID of the requesting JobManager
     */
    @RpcMethod
    public void heartbeatFromJobManager(ResourceID resourceID) {
        jobManagerHeartbeatManager.requestHeartbeat(resourceID, null);
    }

    /**
     * Forwards a heartbeat request from the ResourceManager to the ResourceManager
     * heartbeat manager. No payload is attached.
     *
     * @param resourceID resource ID of the requesting ResourceManager
     */
    @RpcMethod
    public void heartbeatFromResourceManager(ResourceID resourceID) {
        resourceManagerHeartbeatManager.requestHeartbeat(resourceID, null);
    }

    /**
     * Triggers a checkpoint barrier on the task of the given execution attempt.
     *
     * @param executionAttemptID execution attempt to checkpoint
     * @param checkpointId id of the checkpoint
     * @param checkpointTimestamp timestamp of the checkpoint
     * @param checkpointOptions options forwarded to the task
     * @return acknowledgement once the barrier has been triggered
     * @throws CheckpointException if the task is unknown to the slot table
     */
    @RpcMethod
    public Acknowledge triggerCheckpoint(ExecutionAttemptID executionAttemptID, long checkpointId, long checkpointTimestamp, CheckpointOptions checkpointOptions) throws CheckpointException {
        log.debug("Trigger checkpoint {}@{} for {}.", checkpointId, checkpointTimestamp, executionAttemptID);

        final Task task = taskSlotTable.getTask(executionAttemptID);

        if (task == null) {
            final String message = "TaskManager received a checkpoint request for unknown task " + executionAttemptID + '.';
            log.debug(message);
            throw new CheckpointException(message);
        }

        task.triggerCheckpointBarrier(checkpointId, checkpointTimestamp, checkpointOptions);
        return Acknowledge.get();
    }

    /**
     * Notifies the task of the given execution attempt that a checkpoint has completed.
     *
     * @param executionAttemptID execution attempt to notify
     * @param checkpointId id of the completed checkpoint
     * @param checkpointTimestamp timestamp of the checkpoint; only used for logging here
     * @return acknowledgement once the task has been notified
     * @throws CheckpointException if the task is unknown to the slot table
     */
    @RpcMethod
    public Acknowledge confirmCheckpoint(ExecutionAttemptID executionAttemptID, long checkpointId, long checkpointTimestamp) throws CheckpointException {
        log.debug("Confirm checkpoint {}@{} for {}.", checkpointId, checkpointTimestamp, executionAttemptID);

        final Task task = taskSlotTable.getTask(executionAttemptID);

        if (task == null) {
            final String message = "TaskManager received a checkpoint confirmation for unknown task " + executionAttemptID + '.';
            log.debug(message);
            throw new CheckpointException(message);
        }

        task.notifyCheckpointComplete(checkpointId);
        return Acknowledge.get();
    }

    /**
     * Handles a slot request from the ResourceManager.
     *
     * <p>The request is rejected when there is no ResourceManager connection or when the
     * given leader id does not match the connection's target leader id. A free slot is
     * allocated for the job; a slot that is already allocated is only accepted when it is
     * allocated for the same job and allocation id. Afterwards the slot is either offered
     * to an already known JobManager, or the job is added to the job leader service. If
     * adding the job fails, the slot is freed again before the failure is reported.
     *
     * @param slotId slot to allocate
     * @param jobId job the slot is requested for
     * @param allocationId id of the allocation
     * @param targetAddress address passed to the job leader service together with the job
     * @param rmLeaderId leader id of the requesting ResourceManager
     * @return acknowledgement when the slot is allocated for the request
     * @throws SlotAllocationException if the request is rejected or the allocation fails;
     *     a {@link SlotOccupiedException} is thrown when the slot is taken by another allocation
     */
    @RpcMethod
    public Acknowledge requestSlot(SlotID slotId, JobID jobId, AllocationID allocationId, String targetAddress, UUID rmLeaderId) throws SlotAllocationException {
        log.info("Receive slot request {} for job {} from resource manager with leader id {}.", allocationId, jobId, rmLeaderId);

        if (resourceManagerConnection == null) {
            log.debug("TaskManager is not connected to a resource manager.");
            throw new SlotAllocationException("TaskManager is not connected to a resource manager.");
        }

        if (!resourceManagerConnection.getTargetLeaderId().equals(rmLeaderId)) {
            final String message = "The leader id " + rmLeaderId + " does not match with the leader id of the connected resource manager " + resourceManagerConnection.getTargetLeaderId() + '.';
            log.debug(message);
            throw new SlotAllocationException(message);
        }

        if (!taskSlotTable.isSlotFree(slotId.getSlotNumber())) {
            if (!taskSlotTable.isAllocated(slotId.getSlotNumber(), jobId, allocationId)) {
                final String message = "The slot " + slotId + " has already been allocated for a different job.";
                log.info(message);
                throw new SlotOccupiedException(message, taskSlotTable.getCurrentAllocation(slotId.getSlotNumber()));
            }
        } else if (taskSlotTable.allocateSlot(slotId.getSlotNumber(), jobId, allocationId, taskManagerConfiguration.getTimeout())) {
            log.info("Allocated slot for {}.", allocationId);
        } else {
            log.info("Could not allocate slot for {}.", allocationId);
            throw new SlotAllocationException("Could not allocate slot.");
        }

        if (jobManagerTable.contains(jobId)) {
            offerSlotsToJobManager(jobId);
            return Acknowledge.get();
        }

        try {
            jobLeaderService.addJob(jobId, targetAddress);
            return Acknowledge.get();
        } catch (Exception e) {
            // Roll back the allocation made above.
            try {
                taskSlotTable.freeSlot(allocationId);
            } catch (SlotNotFoundException slotNotFoundException) {
                onFatalError(slotNotFoundException);
            }

            // Sanity check: the slot must be free again after the roll back.
            if (!taskSlotTable.isSlotFree(slotId.getSlotNumber())) {
                onFatalError(new Exception("Could not free slot " + slotId));
            }

            throw new SlotAllocationException("Could not add job to job leader service.", e);
        }
    }

    /**
     * Closes the connection to the JobManager of the given job.
     *
     * @param jobId job whose JobManager connection is closed
     * @param cause reason for the disconnect, passed on to {@code closeJobManagerConnection}
     */
    @RpcMethod
    public void disconnectJobManager(JobID jobId, Exception cause) {
        closeJobManagerConnection(jobId, cause);
    }

    /**
     * Closes the connection to the ResourceManager.
     *
     * @param cause reason for the disconnect, passed on to {@code closeResourceManagerConnection}
     */
    @RpcMethod
    public void disconnectResourceManager(Exception cause) {
        closeResourceManagerConnection(cause);
    }

    /**
     * Reacts to a change of the ResourceManager leader.
     *
     * <p>An existing connection (or connection attempt) is closed and dropped. If a new
     * leader address is given, a new connection to it is created and started.
     *
     * @param newLeaderAddress address of the new leader, or {@code null} if there is
     *     currently no leader
     * @param newLeaderId leader id of the new leader
     */
    private void notifyOfNewResourceManagerLeader(String newLeaderAddress, UUID newLeaderId) {
        final boolean leaderAvailable = newLeaderAddress != null;

        if (resourceManagerConnection != null) {
            if (leaderAvailable) {
                log.info("ResourceManager leader changed from {} to {}. Registering at new leader.", resourceManagerConnection.getTargetAddress(), newLeaderAddress);
            } else {
                log.info("Current ResourceManager {} lost leader status. Waiting for new ResourceManager leader.", resourceManagerConnection.getTargetAddress());
            }

            // Drop the current connection or connection attempt.
            resourceManagerConnection.close();
            resourceManagerConnection = null;
        }

        if (leaderAvailable) {
            log.info("Attempting to register at ResourceManager {}", newLeaderAddress);

            resourceManagerConnection = new TaskExecutorToResourceManagerConnection(
                log,
                getRpcService(),
                getAddress(),
                getResourceID(),
                taskSlotTable.createSlotReport(getResourceID()),
                newLeaderAddress,
                newLeaderId,
                getMainThreadExecutor(),
                new ResourceManagerRegistrationListener());
            resourceManagerConnection.start();
        }
    }

    /**
     * Starts heartbeat monitoring of the ResourceManager with the given resource ID.
     *
     * <p>Received heartbeats are answered through the gateway of the current
     * ResourceManager connection; heartbeat requests towards the target are a no-op.
     *
     * @param resourceManagerResourceId resource ID of the ResourceManager to monitor
     */
    private void establishResourceManagerConnection(ResourceID resourceManagerResourceId) {
        final HeartbeatTarget<Void> heartbeatTarget = new HeartbeatTarget<Void>() {
            @Override
            public void receiveHeartbeat(ResourceID resourceID, Void payload) {
                // The gateway is looked up on every heartbeat from the connection that is current at that time.
                ResourceManagerGateway gateway = (ResourceManagerGateway) TaskExecutor.this.resourceManagerConnection.getTargetGateway();
                gateway.heartbeatFromTaskManager(resourceID);
            }

            @Override
            public void requestHeartbeat(ResourceID resourceID, Void payload) {
                // Intentionally empty: this side never requests heartbeats from the target.
            }
        };

        resourceManagerHeartbeatManager.monitorTarget(resourceManagerResourceId, heartbeatTarget);
    }

    /**
     * Closes the ResourceManager connection, if this executor is connected to one.
     *
     * <p>Stops heartbeat monitoring of the ResourceManager, tells it that this task
     * manager disconnects, then closes and drops the connection.
     *
     * @param cause reason for closing; logged and sent to the ResourceManager
     */
    private void closeResourceManagerConnection(Exception cause) {
        if (!isConnectedToResourceManager()) {
            return;
        }

        log.info("Close ResourceManager connection {}.", resourceManagerConnection.getResourceManagerId(), cause);

        resourceManagerHeartbeatManager.unmonitorTarget(resourceManagerConnection.getResourceManagerId());

        final ResourceManagerGateway gateway = (ResourceManagerGateway) resourceManagerConnection.getTargetGateway();
        gateway.disconnectTaskManager(getResourceID(), cause);

        resourceManagerConnection.close();
        resourceManagerConnection = null;
    }

    /**
     * Offers all slots allocated for the given job to the job's JobManager.
     *
     * <p>Each allocated slot is first marked active. A slot that cannot be marked active
     * (the slot table refuses, or the slot is not found) is reported to the JobManager via
     * {@code failSlot} and is left out of the offer, so a slot is never reported as failed
     * and offered in the same round. The remaining slots are offered; once the JobManager
     * answers, slots it did not accept are freed, provided the JobManager connection is
     * still valid for the leader id the offer was made with.
     *
     * <p>If the offer fails with a {@link TimeoutException} it is retried; on any other
     * failure all offered slots are freed.
     *
     * @param jobId job whose allocated slots are offered
     */
    private void offerSlotsToJobManager(final JobID jobId) {
        final JobManagerConnection jobManagerConnection = jobManagerTable.get(jobId);

        if (jobManagerConnection == null) {
            log.debug("There is no job manager connection to the leader of job {}.", jobId);
            return;
        }

        if (!taskSlotTable.hasAllocatedSlots(jobId)) {
            log.debug("There are no unassigned slots for the job {}.", jobId);
            return;
        }

        log.info("Offer reserved slots to the leader of job {}.", jobId);

        final JobMasterGateway jobMasterGateway = jobManagerConnection.getJobManagerGateway();
        final Iterator<TaskSlot> reservedSlotsIterator = taskSlotTable.getAllocatedSlots(jobId);
        final UUID leaderId = jobManagerConnection.getLeaderId();
        final HashSet<SlotOffer> reservedSlots = new HashSet<>(2);

        while (reservedSlotsIterator.hasNext()) {
            final SlotOffer offer = reservedSlotsIterator.next().generateSlotOffer();

            boolean markedActive = false;
            SlotNotFoundException slotNotFound = null;
            try {
                markedActive = taskSlotTable.markSlotActive(offer.getAllocationId());
            } catch (SlotNotFoundException e) {
                slotNotFound = e;
            }

            if (!markedActive) {
                // The slot cannot be used: tell the JobManager and do not offer it.
                final String message = "Could not mark slot " + offer.getAllocationId() + " of job " + jobId + " active.";
                log.debug(message);
                jobMasterGateway.failSlot(getResourceID(), offer.getAllocationId(), leaderId, new Exception(message, slotNotFound));
                continue;
            }

            reservedSlots.add(offer);
        }

        final Future<Iterable<SlotOffer>> acceptedSlotsFuture = jobMasterGateway.offerSlots(
            getResourceID(),
            reservedSlots,
            leaderId,
            taskManagerConfiguration.getTimeout());

        final Future<Void> acceptedSlotsAcceptFuture = acceptedSlotsFuture.thenAcceptAsync(new AcceptFunction<Iterable<SlotOffer>>() {
            @Override
            public void accept(Iterable<SlotOffer> acceptedSlots) {
                if (!TaskExecutor.this.isJobManagerConnectionValid(jobId, leaderId)) {
                    // The answer belongs to an outdated leader session.
                    TaskExecutor.this.log.debug("Discard offer slot response since there is a new leader for the job {}.", jobId);
                    return;
                }

                // Whatever remains after removing the accepted slots was rejected.
                for (SlotOffer acceptedSlot : acceptedSlots) {
                    reservedSlots.remove(acceptedSlot);
                }

                final Exception e = new Exception("The slot was rejected by the JobManager.");
                for (SlotOffer rejectedSlot : reservedSlots) {
                    TaskExecutor.this.freeSlot(rejectedSlot.getAllocationId(), e);
                }
            }
        }, getMainThreadExecutor());

        acceptedSlotsAcceptFuture.exceptionally(new ApplyFunction<Throwable, Void>() {
            @Override
            public Void apply(Throwable throwable) {
                if (throwable instanceof TimeoutException) {
                    TaskExecutor.this.log.info("Slot offering to JobManager did not finish in time. Retrying the slot offering.");
                    TaskExecutor.this.offerSlotsToJobManager(jobId);
                } else {
                    TaskExecutor.this.log.warn("Slot offering to JobManager failed. Freeing the slots and returning them to the ResourceManager.", throwable);
                    for (SlotOffer reservedSlot : reservedSlots) {
                        TaskExecutor.this.freeSlot(reservedSlot.getAllocationId(), throwable);
                    }
                }
                return null;
            }
        });
    }

    /**
     * Establishes the connection to the JobManager of the given job after a successful
     * registration.
     *
     * <p>An existing connection for the job is closed first if it belongs to a different
     * leader id. The new connection is stored under both the JobManager's resource ID and
     * the job ID, heartbeat monitoring of the JobManager is started, and the job's
     * allocated slots are offered to it.
     *
     * @param jobId job the JobManager is responsible for
     * @param jobMasterGateway gateway to the JobManager
     * @param jobManagerLeaderId leader id of the JobManager
     * @param registrationSuccess registration response carrying the JobManager's resource
     *     ID and blob port
     */
    private void establishJobManagerConnection(JobID jobId, final JobMasterGateway jobMasterGateway, UUID jobManagerLeaderId, JMTMRegistrationSuccess registrationSuccess) {
        log.info("Establish JobManager connection for job {}.", jobId);

        if (jobManagerTable.contains(jobId)) {
            final JobManagerConnection previousConnection = jobManagerTable.get(jobId);

            if (!previousConnection.getLeaderId().equals(jobManagerLeaderId)) {
                closeJobManagerConnection(jobId, new Exception("Found new job leader for job id " + jobId + '.'));
            }
        }

        final ResourceID jobManagerResourceID = registrationSuccess.getResourceID();
        final JobManagerConnection connection = associateWithJobManager(
            jobId,
            jobManagerResourceID,
            jobMasterGateway,
            jobManagerLeaderId,
            registrationSuccess.getBlobPort());

        jobManagerConnections.put(jobManagerResourceID, connection);
        jobManagerTable.put(jobId, connection);

        final HeartbeatTarget<Void> heartbeatTarget = new HeartbeatTarget<Void>() {
            @Override
            public void receiveHeartbeat(ResourceID resourceID, Void payload) {
                jobMasterGateway.heartbeatFromTaskManager(resourceID);
            }

            @Override
            public void requestHeartbeat(ResourceID resourceID, Void payload) {
                // Intentionally empty: this side never requests heartbeats from the target.
            }
        };
        jobManagerHeartbeatManager.monitorTarget(jobManagerResourceID, heartbeatTarget);

        offerSlotsToJobManager(jobId);
    }

    /**
     * Tears down the connection to the JobManager of the given job.
     *
     * <p>All tasks of the job are failed, every active slot of the job is marked inactive (or
     * freed if that is not possible), and the registered connection, if any, is removed,
     * un-monitored and disassociated.
     *
     * @param jobId job whose JobManager connection is closed
     * @param cause reason passed on to the JobManager when disconnecting
     */
    private void closeJobManagerConnection(JobID jobId, Exception cause) {
        this.log.info("Close JobManager connection for job {}.", (Object)jobId);

        // Fail every task that belongs to the job.
        Iterator<Task> tasks = this.taskSlotTable.getTasks(jobId);
        while (tasks.hasNext()) {
            tasks.next().failExternally(new Exception("JobManager responsible for " + jobId + " lost the leadership."));
        }

        // Deactivate the job's slots; a slot that cannot be marked inactive is freed instead.
        Iterator<AllocationID> activeSlots = this.taskSlotTable.getActiveSlots(jobId);
        while (activeSlots.hasNext()) {
            AllocationID activeSlot = activeSlots.next();
            try {
                if (!this.taskSlotTable.markSlotInactive(activeSlot, this.taskManagerConfiguration.getTimeout())) {
                    this.freeSlot(activeSlot, new Exception("Slot could not be marked inactive."));
                }
            } catch (SlotNotFoundException e) {
                // Log the allocation id of the slot (the placeholder refers to the slot, not the job).
                this.log.debug("Could not mark the slot {} inactive.", (Object)activeSlot, (Object)e);
            }
        }

        JobManagerConnection jobManagerConnection = this.jobManagerTable.remove(jobId);
        if (jobManagerConnection != null) {
            try {
                this.jobManagerHeartbeatManager.unmonitorTarget(jobManagerConnection.getResourceID());
                this.jobManagerConnections.remove(jobManagerConnection.getResourceID());
                this.disassociateFromJobManager(jobManagerConnection, cause);
            } catch (IOException e) {
                this.log.warn("Could not properly disassociate from JobManager {}.", (Object)jobManagerConnection.getJobManagerGateway().getAddress(), (Object)e);
            }
        }
    }

    /**
     * Creates the {@link JobManagerConnection} for a job, including the helper objects that
     * talk to the JobManager (task manager actions, checkpoint responder, library cache,
     * result partition notifier and partition state checker).
     *
     * @param jobID job the connection belongs to; checked for null
     * @param resourceID resource id of the JobManager; checked for null
     * @param jobMasterGateway gateway to the JobManager; checked for null
     * @param jobManagerLeaderId leader id of the JobManager; checked for null
     * @param blobPort port of the JobManager's blob server; must lie in the range 1..65535,
     *     otherwise it is rejected by {@code Preconditions.checkArgument}
     * @return the newly created connection
     * @throws RuntimeException if the BLOB cache or the library cache cannot be created
     */
    private JobManagerConnection associateWithJobManager(JobID jobID, ResourceID resourceID, JobMasterGateway jobMasterGateway, UUID jobManagerLeaderId, int blobPort) {
        Preconditions.checkNotNull((Object)jobID);
        Preconditions.checkNotNull((Object)resourceID);
        Preconditions.checkNotNull((Object)jobManagerLeaderId);
        Preconditions.checkNotNull((Object)jobMasterGateway);
        // Both bounds have to hold; combining them with "||" would accept every int value.
        Preconditions.checkArgument(blobPort > 0 && blobPort < 65536, (Object)"Blob server port is out of range.");

        TaskManagerActionsImpl taskManagerActions = new TaskManagerActionsImpl(jobManagerLeaderId, jobMasterGateway);
        RpcCheckpointResponder checkpointResponder = new RpcCheckpointResponder(jobMasterGateway);

        // The blob server is addressed via the JobManager's host name and the given port.
        InetSocketAddress blobServerAddress = new InetSocketAddress(jobMasterGateway.getHostname(), blobPort);

        final BlobLibraryCacheManager libraryCacheManager;
        try {
            BlobCache blobCache = new BlobCache(blobServerAddress, this.taskManagerConfiguration.getConfiguration(), this.haServices.createBlobStore());
            libraryCacheManager = new BlobLibraryCacheManager(blobCache, this.taskManagerConfiguration.getCleanupInterval());
        } catch (IOException e) {
            // Log the failure and rethrow it unchecked with the original exception as cause.
            String message = "Could not create BLOB cache or library cache.";
            this.log.error(message, (Throwable)e);
            throw new RuntimeException(message, e);
        }

        RpcResultPartitionConsumableNotifier resultPartitionConsumableNotifier = new RpcResultPartitionConsumableNotifier(jobManagerLeaderId, jobMasterGateway, this.getRpcService().getExecutor(), this.taskManagerConfiguration.getTimeout());
        RpcPartitionStateChecker partitionStateChecker = new RpcPartitionStateChecker(jobManagerLeaderId, jobMasterGateway);

        return new JobManagerConnection(jobID, resourceID, jobMasterGateway, jobManagerLeaderId, taskManagerActions, checkpointResponder, libraryCacheManager, resultPartitionConsumableNotifier, partitionStateChecker);
    }

    /**
     * Tells the JobManager behind the given connection that this TaskManager disconnects and
     * shuts down the connection's library cache manager afterwards.
     *
     * @param jobManagerConnection connection to dissolve; checked for null
     * @param cause reason handed to the JobManager
     * @throws IOException declared for failures raised while shutting down
     */
    private void disassociateFromJobManager(JobManagerConnection jobManagerConnection, Exception cause) throws IOException {
        Preconditions.checkNotNull((Object)jobManagerConnection);

        // Notify the JobManager first, then release the library cache.
        jobManagerConnection.getJobManagerGateway().disconnectTaskManager(this.getResourceID(), cause);
        jobManagerConnection.getLibraryCacheManager().shutdown();
    }

    /**
     * Fails the task registered for the given execution attempt, if there is one.
     *
     * @param executionAttemptID execution attempt whose task should be failed
     * @param cause failure cause handed to the task
     */
    private void failTask(ExecutionAttemptID executionAttemptID, Throwable cause) {
        Task taskToFail = this.taskSlotTable.getTask(executionAttemptID);
        if (taskToFail == null) {
            this.log.debug("Cannot find task to fail for execution {}.", (Object)executionAttemptID);
            return;
        }

        try {
            taskToFail.failExternally(cause);
        } catch (Throwable failure) {
            // Any problem while failing the task is only logged.
            this.log.error("Could not fail task {}.", (Object)executionAttemptID, (Object)failure);
        }
    }

    /**
     * Sends the given task execution state to the JobManager. If the resulting future completes
     * exceptionally, the corresponding task is failed on the main thread executor.
     *
     * @param jobMasterLeaderId leader id of the JobManager
     * @param jobMasterGateway gateway the update is sent to
     * @param taskExecutionState state to report
     */
    private void updateTaskExecutionState(UUID jobMasterLeaderId, JobMasterGateway jobMasterGateway, TaskExecutionState taskExecutionState) {
        final ExecutionAttemptID attemptId = taskExecutionState.getID();

        jobMasterGateway
            .updateTaskExecutionState(jobMasterLeaderId, taskExecutionState)
            .exceptionallyAsync(new ApplyFunction<Throwable, Void>(){

                @Override
                public Void apply(Throwable updateFailure) {
                    TaskExecutor.this.failTask(attemptId, updateFailure);
                    return null;
                }
            }, this.getMainThreadExecutor());
    }

    /**
     * Removes the task of the given execution attempt from the slot table and reports its final
     * execution state, together with accumulator and IO metric snapshots, to the JobManager.
     *
     * <p>A task that is not yet in a terminal state is failed before its state is reported.
     *
     * @param jobMasterLeaderId leader id of the JobManager
     * @param jobMasterGateway gateway the final state is sent to
     * @param executionAttemptID execution attempt to unregister
     */
    private void unregisterTaskAndNotifyFinalState(UUID jobMasterLeaderId, JobMasterGateway jobMasterGateway, ExecutionAttemptID executionAttemptID) {
        Task removedTask = this.taskSlotTable.removeTask(executionAttemptID);
        if (removedTask == null) {
            this.log.error("Cannot find task with ID {} to unregister.", (Object)executionAttemptID);
            return;
        }

        if (!removedTask.getExecutionState().isTerminal()) {
            try {
                removedTask.failExternally(new IllegalStateException("Task is being remove from TaskManager."));
            } catch (Exception e) {
                this.log.error("Could not properly fail task.", (Throwable)e);
            }
        }

        this.log.info("Un-registering task and sending final execution state {} to JobManager for task {} {}.", new Object[]{removedTask.getExecutionState(), removedTask.getTaskInfo().getTaskName(), removedTask.getExecutionId()});

        AccumulatorSnapshot accumulators = removedTask.getAccumulatorRegistry().getSnapshot();
        TaskExecutionState finalState = new TaskExecutionState(removedTask.getJobID(), removedTask.getExecutionId(), removedTask.getExecutionState(), removedTask.getFailureCause(), accumulators, removedTask.getMetricGroup().getIOMetricGroup().createSnapshot());
        this.updateTaskExecutionState(jobMasterLeaderId, jobMasterGateway, finalState);
    }

    /**
     * Frees the slot with the given allocation id in the slot table and, if a slot index other
     * than -1 is returned and a ResourceManager connection exists, notifies the ResourceManager
     * that the slot is available again.
     *
     * @param allocationId allocation id of the slot to free; checked for null
     * @param cause reason for freeing the slot
     */
    private void freeSlot(AllocationID allocationId, Throwable cause) {
        Preconditions.checkNotNull((Object)allocationId);
        try {
            int slotIndex = this.taskSlotTable.freeSlot(allocationId, cause);
            if (slotIndex == -1 || !this.isConnectedToResourceManager()) {
                // nothing to report, or nobody to report it to
                return;
            }

            ResourceManagerGateway gateway = (ResourceManagerGateway)this.resourceManagerConnection.getTargetGateway();
            gateway.notifySlotAvailable(this.resourceManagerConnection.getTargetLeaderId(), this.resourceManagerConnection.getRegistrationId(), new SlotID(this.getResourceID(), slotIndex), allocationId);
        } catch (SlotNotFoundException e) {
            this.log.debug("Could not free slot for allocation id {}.", (Object)allocationId, (Object)e);
        }
    }

    /**
     * Frees the slot with the given allocation id, using a generic exception as the cause.
     *
     * @param allocationId allocation id of the slot to free
     */
    private void freeSlot(AllocationID allocationId) {
        Exception reason = new Exception("The slot " + allocationId + " is beeing freed.");
        this.freeSlot(allocationId, reason);
    }

    /**
     * Frees the slot with the given allocation id if the slot table accepts the timeout ticket;
     * otherwise the timeout is only logged.
     *
     * @param allocationId allocation id of the timed-out slot; checked for null
     * @param ticket ticket identifying the timeout; checked for null
     */
    private void timeoutSlot(AllocationID allocationId, UUID ticket) {
        Preconditions.checkNotNull((Object)allocationId);
        Preconditions.checkNotNull((Object)ticket);

        if (!this.taskSlotTable.isValidTimeout(allocationId, ticket)) {
            this.log.debug("Received an invalid timeout for allocation id {} with ticket {}.", (Object)allocationId, (Object)ticket);
            return;
        }

        Exception timeoutCause = new Exception("The slot " + allocationId + " has timed out.");
        this.freeSlot(allocationId, timeoutCause);
    }

    /**
     * Tells whether a ResourceManager connection object exists and reports itself as connected.
     *
     * @return {@code true} if there is a connected ResourceManager connection
     */
    private boolean isConnectedToResourceManager() {
        if (this.resourceManagerConnection == null) {
            return false;
        }
        return this.resourceManagerConnection.isConnected();
    }

    /**
     * Checks whether a JobManager connection is registered for the job and carries the given
     * leader id.
     *
     * @param jobId job to look up
     * @param leaderId leader id the registered connection must have
     * @return {@code true} if a connection exists and its leader id equals {@code leaderId}
     */
    private boolean isJobManagerConnectionValid(JobID jobId, UUID leaderId) {
        JobManagerConnection registered = this.jobManagerTable.get(jobId);
        if (registered == null) {
            return false;
        }
        return Objects.equals(registered.getLeaderId(), leaderId);
    }

    /**
     * Returns the resource id taken from this TaskExecutor's task manager location.
     *
     * @return the resource id of this TaskExecutor
     */
    public ResourceID getResourceID() {
        return taskManagerLocation.getResourceID();
    }

    /**
     * Schedules {@link #onFatalError(Throwable)} for the given error via {@code runAsync}.
     *
     * @param t the fatal error
     */
    void onFatalErrorAsync(final Throwable t) {
        Runnable reportFatalError = new Runnable(){

            @Override
            public void run() {
                TaskExecutor.this.onFatalError(t);
            }
        };
        this.runAsync(reportFatalError);
    }

    /**
     * Logs the given error and hands it to the fatal error handler.
     *
     * @param t the fatal error
     */
    void onFatalError(Throwable t) {
        log.error("Fatal error occurred.", t);
        fatalErrorHandler.onFatalError(t);
    }

    /**
     * Exposes the current ResourceManager connection for tests.
     *
     * @return the ResourceManager connection field; it may be {@code null}, as other code in
     *     this class checks it for null before use
     */
    @VisibleForTesting
    TaskExecutorToResourceManagerConnection getResourceManagerConnection() {
        return resourceManagerConnection;
    }

    /**
     * Heartbeat listener for the ResourceManager. A heartbeat timeout closes the
     * ResourceManager connection; no payload is consumed or produced.
     */
    private class ResourceManagerHeartbeatListener
    implements HeartbeatListener<Void, Void> {
        private ResourceManagerHeartbeatListener() {
        }

        /** Closes the ResourceManager connection asynchronously via {@code runAsync}. */
        @Override
        public void notifyHeartbeatTimeout(final ResourceID resourceId) {
            Runnable closeConnection = new Runnable(){

                @Override
                public void run() {
                    TaskExecutor.this.log.info("The heartbeat of ResourceManager with id {} timed out.", (Object)resourceId);
                    String reason = "The heartbeat of ResourceManager with id " + resourceId + " timed out.";
                    TaskExecutor.this.closeResourceManagerConnection(new TimeoutException(reason));
                }
            };
            TaskExecutor.this.runAsync(closeConnection);
        }

        /** Payloads are ignored. */
        @Override
        public void reportPayload(ResourceID resourceID, Void payload) {
        }

        /** Returns an already completed future holding {@code null}. */
        @Override
        public Future<Void> retrievePayload() {
            return FlinkCompletableFuture.completed(null);
        }
    }

    /**
     * Heartbeat listener for JobManagers. A heartbeat timeout closes the connection registered
     * for the timed-out JobManager, if any; no payload is consumed or produced.
     */
    private class JobManagerHeartbeatListener
    implements HeartbeatListener<Void, Void> {
        private JobManagerHeartbeatListener() {
        }

        /** Closes the matching JobManager connection asynchronously via {@code runAsync}. */
        @Override
        public void notifyHeartbeatTimeout(final ResourceID resourceID) {
            Runnable closeTimedOutConnection = new Runnable(){

                @Override
                public void run() {
                    TaskExecutor.this.log.info("The heartbeat of JobManager with id {} timed out.", (Object)resourceID);

                    // A missing entry and a null entry are treated alike: nothing to close.
                    JobManagerConnection timedOutConnection = (JobManagerConnection)TaskExecutor.this.jobManagerConnections.get(resourceID);
                    if (timedOutConnection == null) {
                        return;
                    }

                    String reason = "The heartbeat of JobManager with id " + resourceID + " timed out.";
                    TaskExecutor.this.closeJobManagerConnection(timedOutConnection.getJobID(), new TimeoutException(reason));
                }
            };
            TaskExecutor.this.runAsync(closeTimedOutConnection);
        }

        /** Payloads are ignored. */
        @Override
        public void reportPayload(ResourceID resourceID, Void payload) {
        }

        /** Returns an already completed future holding {@code null}. */
        @Override
        public Future<Void> retrievePayload() {
            return FlinkCompletableFuture.completed(null);
        }
    }

    /**
     * {@link SlotActions} implementation that forwards slot requests to the enclosing
     * TaskExecutor through {@code runAsync}.
     */
    private class SlotActionsImpl
    implements SlotActions {
        private SlotActionsImpl() {
        }

        /** Schedules freeing of the slot with the given allocation id. */
        @Override
        public void freeSlot(final AllocationID allocationId) {
            Runnable freeAction = new Runnable(){

                @Override
                public void run() {
                    // Explicitly target the outer method; this class has a method of the same name.
                    TaskExecutor.this.freeSlot(allocationId);
                }
            };
            TaskExecutor.this.runAsync(freeAction);
        }

        /** Schedules timeout handling for the slot with the given allocation id and ticket. */
        @Override
        public void timeoutSlot(final AllocationID allocationId, final UUID ticket) {
            Runnable timeoutAction = new Runnable(){

                @Override
                public void run() {
                    // Explicitly target the outer method; this class has a method of the same name.
                    TaskExecutor.this.timeoutSlot(allocationId, ticket);
                }
            };
            TaskExecutor.this.runAsync(timeoutAction);
        }
    }

    /**
     * {@link TaskManagerActions} implementation bound to one JobManager (leader id and gateway).
     * Calls are delegated to the enclosing TaskExecutor.
     */
    private final class TaskManagerActionsImpl
    implements TaskManagerActions {
        private final UUID jobMasterLeaderId;
        private final JobMasterGateway jobMasterGateway;

        /** Stores the leader id and gateway after checking both for null. */
        private TaskManagerActionsImpl(UUID jobMasterLeaderId, JobMasterGateway jobMasterGateway) {
            this.jobMasterLeaderId = (UUID)Preconditions.checkNotNull((Object)jobMasterLeaderId);
            this.jobMasterGateway = (JobMasterGateway)Preconditions.checkNotNull((Object)jobMasterGateway);
        }

        /** Schedules un-registration of the task and reporting of its final state. */
        @Override
        public void notifyFinalState(final ExecutionAttemptID executionAttemptID) {
            Runnable unregisterAction = new Runnable(){

                @Override
                public void run() {
                    TaskExecutor.this.unregisterTaskAndNotifyFinalState(TaskManagerActionsImpl.this.jobMasterLeaderId, TaskManagerActionsImpl.this.jobMasterGateway, executionAttemptID);
                }
            };
            TaskExecutor.this.runAsync(unregisterAction);
        }

        /** Logs the error and passes it to the fatal error handler directly, without {@code runAsync}. */
        @Override
        public void notifyFatalError(String message, Throwable cause) {
            TaskExecutor.this.log.error(message, cause);
            TaskExecutor.this.fatalErrorHandler.onFatalError(cause);
        }

        /** Schedules failing of the task belonging to the given execution attempt. */
        @Override
        public void failTask(final ExecutionAttemptID executionAttemptID, final Throwable cause) {
            Runnable failAction = new Runnable(){

                @Override
                public void run() {
                    // Explicitly target the outer method; this class has a method of the same name.
                    TaskExecutor.this.failTask(executionAttemptID, cause);
                }
            };
            TaskExecutor.this.runAsync(failAction);
        }

        /** Forwards the state update to the enclosing TaskExecutor directly, without {@code runAsync}. */
        @Override
        public void updateTaskExecutionState(TaskExecutionState taskExecutionState) {
            TaskExecutor.this.updateTaskExecutionState(this.jobMasterLeaderId, this.jobMasterGateway, taskExecutionState);
        }
    }

    /**
     * Listener for the outcome of the registration at the ResourceManager. Success establishes
     * the ResourceManager connection; failure is reported as a fatal error.
     */
    private final class ResourceManagerRegistrationListener
    implements RegistrationConnectionListener<TaskExecutorRegistrationSuccess> {
        private ResourceManagerRegistrationListener() {
        }

        /** Schedules establishing the connection to the ResourceManager named in the response. */
        @Override
        public void onRegistrationSuccess(TaskExecutorRegistrationSuccess success) {
            // Read the id eagerly, before the asynchronous action is scheduled.
            final ResourceID resourceManagerId = success.getResourceManagerId();
            Runnable establishConnection = new Runnable(){

                @Override
                public void run() {
                    TaskExecutor.this.establishResourceManagerConnection(resourceManagerId);
                }
            };
            TaskExecutor.this.runAsync(establishConnection);
        }

        /** Reports the registration failure as a fatal error. */
        @Override
        public void onRegistrationFailure(Throwable failure) {
            TaskExecutor.this.onFatalErrorAsync(failure);
        }
    }

    /**
     * {@link JobLeaderListener} that reacts to JobManager leadership changes by establishing or
     * closing the corresponding JobManager connection via {@code runAsync}.
     */
    private final class JobLeaderListenerImpl
    implements JobLeaderListener {
        private JobLeaderListenerImpl() {
        }

        /** Schedules establishing the connection to the new job leader. */
        @Override
        public void jobManagerGainedLeadership(final JobID jobId, final JobMasterGateway jobManagerGateway, final UUID jobLeaderId, final JMTMRegistrationSuccess registrationMessage) {
            Runnable connectToLeader = new Runnable(){

                @Override
                public void run() {
                    TaskExecutor.this.establishJobManagerConnection(jobId, jobManagerGateway, jobLeaderId, registrationMessage);
                }
            };
            TaskExecutor.this.runAsync(connectToLeader);
        }

        /** Logs the leadership loss and schedules closing the job's JobManager connection. */
        @Override
        public void jobManagerLostLeadership(final JobID jobId, UUID jobLeaderId) {
            TaskExecutor.this.log.info("JobManager for job {} with leader id {} lost leadership.", (Object)jobId, (Object)jobLeaderId);
            Runnable disconnectFromLeader = new Runnable(){

                @Override
                public void run() {
                    Exception reason = new Exception("Job leader for job id " + jobId + " lost leadership.");
                    TaskExecutor.this.closeJobManagerConnection(jobId, reason);
                }
            };
            TaskExecutor.this.runAsync(disconnectFromLeader);
        }

        /** Reports the error as a fatal error. */
        @Override
        public void handleError(Throwable throwable) {
            TaskExecutor.this.onFatalErrorAsync(throwable);
        }
    }

    /**
     * {@link LeaderRetrievalListener} for the ResourceManager leader. New leader information is
     * forwarded to the enclosing TaskExecutor; errors are reported as fatal.
     */
    private final class ResourceManagerLeaderListener
    implements LeaderRetrievalListener {
        private ResourceManagerLeaderListener() {
        }

        /** Schedules handling of the new ResourceManager leader address and session id. */
        @Override
        public void notifyLeaderAddress(final String leaderAddress, final UUID leaderSessionID) {
            Runnable announceLeader = new Runnable(){

                @Override
                public void run() {
                    TaskExecutor.this.notifyOfNewResourceManagerLeader(leaderAddress, leaderSessionID);
                }
            };
            TaskExecutor.this.runAsync(announceLeader);
        }

        /** Reports the exception as a fatal error. */
        @Override
        public void handleError(Exception exception) {
            TaskExecutor.this.onFatalErrorAsync(exception);
        }
    }
}

