/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.runtime.resourcemanager.slotmanager;

import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.concurrent.CancellationException;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executor;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import javax.annotation.Nonnull;
import javax.annotation.Nullable;
import org.apache.flink.annotation.VisibleForTesting;
import org.apache.flink.api.common.JobID;
import org.apache.flink.api.common.time.Time;
import org.apache.flink.runtime.clusterframework.types.AllocationID;
import org.apache.flink.runtime.clusterframework.types.ResourceProfile;
import org.apache.flink.runtime.clusterframework.types.SlotID;
import org.apache.flink.runtime.concurrent.ScheduledExecutor;
import org.apache.flink.runtime.instance.InstanceID;
import org.apache.flink.runtime.messages.Acknowledge;
import org.apache.flink.runtime.metrics.groups.SlotManagerMetricGroup;
import org.apache.flink.runtime.resourcemanager.ResourceManagerId;
import org.apache.flink.runtime.resourcemanager.SlotRequest;
import org.apache.flink.runtime.resourcemanager.WorkerResourceSpec;
import org.apache.flink.runtime.resourcemanager.exceptions.ResourceManagerException;
import org.apache.flink.runtime.resourcemanager.exceptions.UnfulfillableSlotRequestException;
import org.apache.flink.runtime.resourcemanager.registration.TaskExecutorConnection;
import org.apache.flink.runtime.resourcemanager.slotmanager.PendingSlotRequest;
import org.apache.flink.runtime.resourcemanager.slotmanager.PendingTaskManagerSlot;
import org.apache.flink.runtime.resourcemanager.slotmanager.ResourceActions;
import org.apache.flink.runtime.resourcemanager.slotmanager.SlotManager;
import org.apache.flink.runtime.resourcemanager.slotmanager.SlotManagerConfiguration;
import org.apache.flink.runtime.resourcemanager.slotmanager.SlotManagerException;
import org.apache.flink.runtime.resourcemanager.slotmanager.SlotManagerUtils;
import org.apache.flink.runtime.resourcemanager.slotmanager.SlotMatchingStrategy;
import org.apache.flink.runtime.resourcemanager.slotmanager.SlotState;
import org.apache.flink.runtime.resourcemanager.slotmanager.TaskManagerRegistration;
import org.apache.flink.runtime.resourcemanager.slotmanager.TaskManagerSlot;
import org.apache.flink.runtime.resourcemanager.slotmanager.TaskManagerSlotId;
import org.apache.flink.runtime.rest.messages.taskmanager.SlotInfo;
import org.apache.flink.runtime.slots.ResourceRequirements;
import org.apache.flink.runtime.taskexecutor.SlotReport;
import org.apache.flink.runtime.taskexecutor.SlotStatus;
import org.apache.flink.runtime.taskexecutor.TaskExecutorGateway;
import org.apache.flink.runtime.taskexecutor.exceptions.SlotAllocationException;
import org.apache.flink.runtime.taskexecutor.exceptions.SlotOccupiedException;
import org.apache.flink.util.FlinkException;
import org.apache.flink.util.MathUtils;
import org.apache.flink.util.OptionalConsumer;
import org.apache.flink.util.Preconditions;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class SlotManagerImpl
implements SlotManager {
    /** Logger for this class. */
    private static final Logger LOG = LoggerFactory.getLogger(SlotManagerImpl.class);
    /** Executor on which the two periodic checks are scheduled in {@code start}. */
    private final ScheduledExecutor scheduledExecutor;
    /** Taken from the configuration's task manager request timeout; its use is not visible in this part of the file. */
    private final Time taskManagerRequestTimeout;
    /** Delay between runs of the periodic slot request timeout check. */
    private final Time slotRequestTimeout;
    /** Delay between runs of the periodic task manager timeout/redundancy check. */
    private final Time taskManagerTimeout;
    /** Every registered slot, keyed by its slot id. */
    private final HashMap<SlotID, TaskManagerSlot> slots;
    /** Slots handed to the matching strategy as candidates; a matched slot is expected to be in state FREE. */
    private final LinkedHashMap<SlotID, TaskManagerSlot> freeSlots;
    /** Registered task managers, keyed by instance id. */
    private final HashMap<InstanceID, TaskManagerRegistration> taskManagerRegistrations;
    /** Maps an allocation id to the slot that was reported as allocated for it. */
    private final HashMap<AllocationID, SlotID> fulfilledSlotRequests;
    /** Slot requests that have been registered and not yet removed, keyed by allocation id. */
    private final HashMap<AllocationID, PendingSlotRequest> pendingSlotRequests;
    /** Slots expected from workers that were requested but whose slots have not been registered yet. */
    private final HashMap<TaskManagerSlotId, PendingTaskManagerSlot> pendingSlots;
    /** Strategy used to pick a free slot for a requested resource profile. */
    private final SlotMatchingStrategy slotMatchingStrategy;
    /** Set in {@code start}, cleared in {@code suspend}. */
    private ResourceManagerId resourceManagerId;
    /** Executor that the scheduled checks are handed over to; set in {@code start}. */
    private Executor mainThreadExecutor;
    /** Callbacks used to allocate/release resources and report allocation failures; set in {@code start}, cleared in {@code suspend}. */
    private ResourceActions resourceActions;
    /** Handle of the periodic task manager timeout/redundancy check, or null while not scheduled. */
    private ScheduledFuture<?> taskManagerTimeoutsAndRedundancyCheck;
    /** Handle of the periodic slot request timeout check, or null while not scheduled. */
    private ScheduledFuture<?> slotRequestTimeoutCheck;
    /** True between {@code start} and {@code suspend}. */
    private boolean started;
    /** Taken from the configuration; its use is not visible in this part of the file. */
    private final boolean waitResultConsumedBeforeRelease;
    /** Upper bound for the sum of registered and pending slots. */
    private final int maxSlotNum;
    /** Taken from the configuration; presumably the number of spare task managers to keep (TODO confirm against the redundancy check). */
    private final int redundantTaskManagerNum;
    /** When true, a request that neither registered nor pending slots can match is failed instead of being kept pending. */
    private boolean failUnfulfillableRequest = true;
    /** Worker spec passed to {@code ResourceActions.allocateResource} whenever a new worker is requested. */
    private final WorkerResourceSpec defaultWorkerResourceSpec;
    /** Number of pending slots created per requested worker. */
    private final int numSlotsPerWorker;
    /** Resource profile of each pending slot, derived from the default worker spec and the slots per worker. */
    private final ResourceProfile defaultSlotResourceProfile;
    /** Metric group on which the slot gauges are registered; closed in {@code close}. */
    private final SlotManagerMetricGroup slotManagerMetricGroup;

    /**
     * Creates a slot manager in the not-started state.
     *
     * <p>All settings are read once from the given configuration. The bookkeeping maps start
     * empty, and the fields that {@code start} populates (resource manager id, resource actions,
     * main thread executor and the two scheduled checks) are explicitly initialised to null.
     *
     * @param scheduledExecutor executor used later to schedule the periodic checks; must not be null
     * @param slotManagerConfiguration source of timeouts, limits and the matching strategy; must not be null
     * @param slotManagerMetricGroup metric group for the slot gauges; must not be null
     * @throws NullPointerException if any argument is null
     */
    public SlotManagerImpl(ScheduledExecutor scheduledExecutor, SlotManagerConfiguration slotManagerConfiguration, SlotManagerMetricGroup slotManagerMetricGroup) {
        this.scheduledExecutor = Preconditions.checkNotNull(scheduledExecutor);
        Preconditions.checkNotNull(slotManagerConfiguration);
        this.slotMatchingStrategy = slotManagerConfiguration.getSlotMatchingStrategy();
        this.taskManagerRequestTimeout = slotManagerConfiguration.getTaskManagerRequestTimeout();
        this.slotRequestTimeout = slotManagerConfiguration.getSlotRequestTimeout();
        this.taskManagerTimeout = slotManagerConfiguration.getTaskManagerTimeout();
        this.waitResultConsumedBeforeRelease = slotManagerConfiguration.isWaitResultConsumedBeforeRelease();
        this.defaultWorkerResourceSpec = slotManagerConfiguration.getDefaultWorkerResourceSpec();
        this.numSlotsPerWorker = slotManagerConfiguration.getNumSlotsPerWorker();
        this.defaultSlotResourceProfile = SlotManagerUtils.generateDefaultSlotResourceProfile(this.defaultWorkerResourceSpec, this.numSlotsPerWorker);
        this.slotManagerMetricGroup = Preconditions.checkNotNull(slotManagerMetricGroup);
        this.maxSlotNum = slotManagerConfiguration.getMaxSlotNum();
        this.redundantTaskManagerNum = slotManagerConfiguration.getRedundantTaskManagerNum();

        // Parameterised construction (diamond) instead of raw types, so no unchecked conversions.
        this.slots = new HashMap<>(16);
        this.freeSlots = new LinkedHashMap<>(16);
        this.taskManagerRegistrations = new HashMap<>(4);
        this.fulfilledSlotRequests = new HashMap<>(16);
        this.pendingSlotRequests = new HashMap<>(16);
        this.pendingSlots = new HashMap<>(16);

        // Populated by start().
        this.resourceManagerId = null;
        this.resourceActions = null;
        this.mainThreadExecutor = null;
        this.taskManagerTimeoutsAndRedundancyCheck = null;
        this.slotRequestTimeoutCheck = null;
        this.started = false;
    }

    /**
     * Reports how many slots are currently registered.
     *
     * @return the size of the registered-slot map
     */
    @Override
    public int getNumberRegisteredSlots() {
        final int registeredSlotCount = slots.size();
        return registeredSlotCount;
    }

    /**
     * Reports how many slots the given task manager has registered.
     *
     * @param instanceId instance id of the task manager
     * @return the registration's slot count, or 0 if no task manager is registered under that id
     */
    @Override
    public int getNumberRegisteredSlotsOf(InstanceID instanceId) {
        final TaskManagerRegistration registration = taskManagerRegistrations.get(instanceId);
        return registration == null ? 0 : registration.getNumberRegisteredSlots();
    }

    /**
     * Reports how many slots are currently held in the free-slot map.
     *
     * @return the size of the free-slot map
     */
    @Override
    public int getNumberFreeSlots() {
        final int freeSlotCount = freeSlots.size();
        return freeSlotCount;
    }

    /**
     * Reports how many free slots the given task manager has.
     *
     * @param instanceId instance id of the task manager
     * @return the registration's free slot count, or 0 if no task manager is registered under that id
     */
    @Override
    public int getNumberFreeSlotsOf(InstanceID instanceId) {
        final TaskManagerRegistration registration = taskManagerRegistrations.get(instanceId);
        return registration == null ? 0 : registration.getNumberFreeSlots();
    }

    /**
     * Reports the workers that are still outstanding, derived from the pending slots.
     *
     * @return a single-entry map from the default worker spec to the result of
     *     {@code MathUtils.divideRoundUp(pendingSlots, numSlotsPerWorker)}, or an empty map when
     *     that result is not positive
     */
    @Override
    public Map<WorkerResourceSpec, Integer> getRequiredResources() {
        final int outstandingWorkers = MathUtils.divideRoundUp(pendingSlots.size(), numSlotsPerWorker);
        if (outstandingWorkers <= 0) {
            return Collections.emptyMap();
        }
        return Collections.singletonMap(defaultWorkerResourceSpec, outstandingWorkers);
    }

    /**
     * Sums up the total resource of every registered task manager.
     *
     * @return the merged total resource, starting from {@code ResourceProfile.ZERO}
     */
    @Override
    public ResourceProfile getRegisteredResource() {
        ResourceProfile accumulated = ResourceProfile.ZERO;
        for (TaskManagerRegistration registration : taskManagerRegistrations.values()) {
            accumulated = accumulated.merge(registration.getTotalResource());
        }
        return accumulated;
    }

    /**
     * Looks up the total resource of a single task manager.
     *
     * @param instanceID instance id of the task manager
     * @return the registration's total resource, or {@code ResourceProfile.ZERO} when the task
     *     manager is unknown or reports no total resource
     */
    @Override
    public ResourceProfile getRegisteredResourceOf(InstanceID instanceID) {
        final TaskManagerRegistration registration = taskManagerRegistrations.get(instanceID);
        final ResourceProfile totalResource = registration == null ? null : registration.getTotalResource();
        return totalResource != null ? totalResource : ResourceProfile.ZERO;
    }

    /**
     * Sums up the free resource of every registered task manager, where a task manager's free
     * resource is its default slot profile multiplied by its number of free slots.
     *
     * @return the merged free resource, starting from {@code ResourceProfile.ZERO}
     */
    @Override
    public ResourceProfile getFreeResource() {
        ResourceProfile accumulated = ResourceProfile.ZERO;
        for (TaskManagerRegistration registration : taskManagerRegistrations.values()) {
            final ResourceProfile freeOfTaskManager = registration.getDefaultSlotResourceProfile().multiply(registration.getNumberFreeSlots());
            accumulated = accumulated.merge(freeOfTaskManager);
        }
        return accumulated;
    }

    /**
     * Computes the free resource of a single task manager as its default slot profile multiplied
     * by its number of free slots.
     *
     * @param instanceID instance id of the task manager
     * @return the computed free resource, or {@code ResourceProfile.ZERO} when the task manager
     *     is unknown or the computation yields no profile
     */
    @Override
    public ResourceProfile getFreeResourceOf(InstanceID instanceID) {
        final TaskManagerRegistration registration = taskManagerRegistrations.get(instanceID);
        final ResourceProfile freeResource = registration == null
                ? null
                : registration.getDefaultSlotResourceProfile().multiply(registration.getNumberFreeSlots());
        return freeResource != null ? freeResource : ResourceProfile.ZERO;
    }

    /**
     * Always yields an empty collection; this implementation produces no per-instance
     * allocation details.
     *
     * @param instanceID instance id of the task manager (unused)
     * @return an empty list
     */
    @Override
    public Collection<SlotInfo> getAllocatedSlotsOf(InstanceID instanceID) {
        final Collection<SlotInfo> noAllocatedSlots = Collections.emptyList();
        return noAllocatedSlots;
    }

    /**
     * Reports how many pending task manager slots are currently tracked.
     *
     * @return the size of the pending-slot map
     */
    @VisibleForTesting
    public int getNumberPendingTaskManagerSlots() {
        final int pendingSlotCount = pendingSlots.size();
        return pendingSlotCount;
    }

    /**
     * Reports how many slot requests are currently pending.
     *
     * @return the size of the pending-request map
     */
    @Override
    public int getNumberPendingSlotRequests() {
        final int pendingRequestCount = pendingSlotRequests.size();
        return pendingRequestCount;
    }

    /**
     * Counts the pending task manager slots that already have a pending slot request assigned.
     *
     * @return the number of pending slots whose assigned request is non-null
     */
    @VisibleForTesting
    public int getNumberAssignedPendingTaskManagerSlots() {
        int assignedCount = 0;
        for (PendingTaskManagerSlot pendingSlot : pendingSlots.values()) {
            if (pendingSlot.getAssignedPendingSlotRequest() != null) {
                assignedCount++;
            }
        }
        return assignedCount;
    }

    /**
     * Starts the slot manager: stores the collaborators, marks it as started, schedules the two
     * periodic checks and registers the slot gauges.
     *
     * <p>Both checks are scheduled with zero initial delay and are forwarded to the main thread
     * executor rather than being run directly on the scheduler.
     *
     * @param newResourceManagerId id to operate under; must not be null
     * @param newMainThreadExecutor executor the periodic checks are handed to; must not be null
     * @param newResourceActions resource callbacks; must not be null
     */
    @Override
    public void start(ResourceManagerId newResourceManagerId, Executor newMainThreadExecutor, ResourceActions newResourceActions) {
        LOG.info("Starting the SlotManager.");

        resourceManagerId = Preconditions.checkNotNull(newResourceManagerId);
        mainThreadExecutor = Preconditions.checkNotNull(newMainThreadExecutor);
        resourceActions = Preconditions.checkNotNull(newResourceActions);
        started = true;

        final Runnable taskManagerCheck = () -> mainThreadExecutor.execute(() -> checkTaskManagerTimeoutsAndRedundancy());
        taskManagerTimeoutsAndRedundancyCheck = scheduledExecutor.scheduleWithFixedDelay(
                taskManagerCheck, 0L, taskManagerTimeout.toMilliseconds(), TimeUnit.MILLISECONDS);

        final Runnable slotRequestCheck = () -> mainThreadExecutor.execute(() -> checkSlotRequestTimeouts());
        slotRequestTimeoutCheck = scheduledExecutor.scheduleWithFixedDelay(
                slotRequestCheck, 0L, slotRequestTimeout.toMilliseconds(), TimeUnit.MILLISECONDS);

        registerSlotManagerMetrics();
    }

    /**
     * Registers two gauges on the metric group: the number of free slots and the number of
     * registered slots, both reported as {@code long}.
     */
    private void registerSlotManagerMetrics() {
        final String availableSlotsMetric = "taskSlotsAvailable";
        final String totalSlotsMetric = "taskSlotsTotal";
        slotManagerMetricGroup.gauge(availableSlotsMetric, () -> (long) getNumberFreeSlots());
        slotManagerMetricGroup.gauge(totalSlotsMetric, () -> (long) getNumberRegisteredSlots());
    }

    /**
     * Suspends the slot manager.
     *
     * <p>In order: cancels both periodic checks (without interrupting a running one), cancels
     * and drops every pending slot request, unregisters every task manager, and finally clears
     * the resource manager id and resource actions and marks the manager as not started.
     */
    @Override
    public void suspend() {
        LOG.info("Suspending the SlotManager.");

        final ScheduledFuture<?> taskManagerCheck = taskManagerTimeoutsAndRedundancyCheck;
        if (taskManagerCheck != null) {
            taskManagerCheck.cancel(false);
            taskManagerTimeoutsAndRedundancyCheck = null;
        }

        final ScheduledFuture<?> requestTimeoutCheck = slotRequestTimeoutCheck;
        if (requestTimeoutCheck != null) {
            requestTimeoutCheck.cancel(false);
            slotRequestTimeoutCheck = null;
        }

        for (PendingSlotRequest request : pendingSlotRequests.values()) {
            cancelPendingSlotRequest(request);
        }
        pendingSlotRequests.clear();

        // Iterate over a snapshot: unregistering removes entries from the registration map.
        for (InstanceID instanceId : new ArrayList<>(taskManagerRegistrations.keySet())) {
            unregisterTaskManager(instanceId, new SlotManagerException("The slot manager is being suspended."));
        }

        resourceManagerId = null;
        resourceActions = null;
        started = false;
    }

    /**
     * Closes the slot manager by suspending it and then closing its metric group.
     *
     * <p>The metric group is closed in a {@code finally} block so that it is released even when
     * suspending fails; the failure from {@code suspend()} still propagates to the caller.
     *
     * @throws Exception if suspending or closing the metric group fails
     */
    @Override
    public void close() throws Exception {
        LOG.info("Closing the SlotManager.");
        try {
            this.suspend();
        } finally {
            this.slotManagerMetricGroup.close();
        }
    }

    /**
     * No-op in this implementation: the body is empty, so the call has no effect.
     *
     * @param jobId job whose requirements would be cleared (unused)
     */
    @Override
    public void clearResourceRequirements(JobID jobId) {
    }

    /**
     * No-op in this implementation: the body is empty, so the call has no effect.
     *
     * @param resourceRequirements requirements that would be processed (unused)
     */
    @Override
    public void processResourceRequirements(ResourceRequirements resourceRequirements) {
    }

    /**
     * Registers a slot request and immediately tries to serve it.
     *
     * <p>The request is recorded as pending before the attempt. If the attempt fails with a
     * {@code ResourceManagerException}, the pending entry is removed again and the failure is
     * rethrown wrapped in a new {@code ResourceManagerException}.
     *
     * @param slotRequest the request to register
     * @return true if the request was registered; false if it was recognised as a duplicate
     * @throws ResourceManagerException if the request could not be fulfilled
     */
    @Override
    public boolean registerSlotRequest(SlotRequest slotRequest) throws ResourceManagerException {
        checkInit();

        final AllocationID allocationId = slotRequest.getAllocationId();
        if (checkDuplicateRequest(allocationId)) {
            LOG.debug("Ignoring a duplicate slot request with allocation id {}.", allocationId);
            return false;
        }

        final PendingSlotRequest newRequest = new PendingSlotRequest(slotRequest);
        pendingSlotRequests.put(allocationId, newRequest);
        try {
            internalRequestSlot(newRequest);
            return true;
        } catch (ResourceManagerException failure) {
            // Roll back the bookkeeping before reporting the failure.
            pendingSlotRequests.remove(allocationId);
            throw new ResourceManagerException("Could not fulfill slot request " + allocationId + '.', failure);
        }
    }

    /**
     * Removes a pending slot request and cancels it.
     *
     * @param allocationId allocation id of the request to remove
     * @return true if a pending request was found and cancelled; false otherwise
     */
    @Override
    public boolean unregisterSlotRequest(AllocationID allocationId) {
        checkInit();

        final PendingSlotRequest removedRequest = pendingSlotRequests.remove(allocationId);
        if (removedRequest == null) {
            LOG.debug("No pending slot request with allocation id {} found. Ignoring unregistration request.", allocationId);
            return false;
        }

        LOG.debug("Cancel slot request {}.", allocationId);
        cancelPendingSlotRequest(removedRequest);
        return true;
    }

    /**
     * Registers a task manager together with the slots from its initial slot report.
     *
     * <p>If the instance id is already registered, the report is processed as a regular slot
     * status report and nothing new is registered. If registering the reported slots would
     * exceed the maximum slot number, the resource is released instead.
     *
     * @param taskExecutorConnection connection to the task manager
     * @param initialSlotReport slots reported by the task manager
     * @param totalResourceProfile total resource of the task manager
     * @param defaultSlotResourceProfile default slot profile of the task manager
     * @return true if the task manager was newly registered; false otherwise
     */
    @Override
    public boolean registerTaskManager(TaskExecutorConnection taskExecutorConnection, SlotReport initialSlotReport, ResourceProfile totalResourceProfile, ResourceProfile defaultSlotResourceProfile) {
        checkInit();
        LOG.debug("Registering TaskManager {} under {} at the SlotManager.", taskExecutorConnection.getResourceID().getStringWithMetadata(), taskExecutorConnection.getInstanceID());

        final InstanceID instanceId = taskExecutorConnection.getInstanceID();

        if (taskManagerRegistrations.containsKey(instanceId)) {
            reportSlotStatus(instanceId, initialSlotReport);
            return false;
        }

        if (isMaxSlotNumExceededAfterRegistration(initialSlotReport)) {
            LOG.info("The total number of slots exceeds the max limitation {}, release the excess resource.", maxSlotNum);
            resourceActions.releaseResource(instanceId, new FlinkException("The total number of slots exceeds the max limitation."));
            return false;
        }

        final ArrayList<SlotID> reportedSlotIds = new ArrayList<>();
        for (SlotStatus reported : initialSlotReport) {
            reportedSlotIds.add(reported.getSlotID());
        }

        // The registration has to be in place before the slots are registered one by one.
        final TaskManagerRegistration registration = new TaskManagerRegistration(taskExecutorConnection, reportedSlotIds, totalResourceProfile, defaultSlotResourceProfile);
        taskManagerRegistrations.put(instanceId, registration);

        for (SlotStatus reported : initialSlotReport) {
            registerSlot(reported.getSlotID(), reported.getAllocationID(), reported.getJobID(), reported.getResourceProfile(), taskExecutorConnection);
        }
        return true;
    }

    /**
     * Removes a task manager registration and runs the internal unregistration for it.
     *
     * @param instanceId instance id of the task manager
     * @param cause reason passed on to the internal unregistration
     * @return true if a registration existed and was removed; false otherwise
     */
    @Override
    public boolean unregisterTaskManager(InstanceID instanceId, Exception cause) {
        checkInit();
        LOG.debug("Unregister TaskManager {} from the SlotManager.", instanceId);

        final TaskManagerRegistration removedRegistration = taskManagerRegistrations.remove(instanceId);
        if (removedRegistration == null) {
            LOG.debug("There is no task manager registered with instance ID {}. Ignoring this message.", instanceId);
            return false;
        }

        internalUnregisterTaskManager(removedRegistration, cause);
        return true;
    }

    /**
     * Applies a slot report from a registered task manager by updating every reported slot.
     *
     * @param instanceId instance id of the reporting task manager
     * @param slotReport the reported slot statuses
     * @return true if the task manager is registered and the report was applied; false otherwise
     */
    @Override
    public boolean reportSlotStatus(InstanceID instanceId, SlotReport slotReport) {
        checkInit();

        final boolean knownTaskManager = taskManagerRegistrations.get(instanceId) != null;
        if (!knownTaskManager) {
            LOG.debug("Received slot report for unknown task manager with instance id {}. Ignoring this report.", instanceId);
            return false;
        }

        LOG.debug("Received slot report from instance {}: {}.", instanceId, slotReport);
        for (SlotStatus reported : slotReport) {
            updateSlot(reported.getSlotID(), reported.getAllocationID(), reported.getJobID());
        }
        return true;
    }

    /**
     * Frees a slot, provided it is registered, currently allocated, and allocated under the
     * given allocation id. In every other case the call is logged at debug level and ignored.
     *
     * @param slotId id of the slot to free
     * @param allocationId allocation id the slot is expected to be allocated under
     * @throws IllegalStateException if the slot belongs to a task manager that is not registered
     */
    @Override
    public void freeSlot(SlotID slotId, AllocationID allocationId) {
        this.checkInit();

        TaskManagerSlot slot = this.slots.get(slotId);
        if (null == slot) {
            LOG.debug("Trying to free a slot {} which has not been registered. Ignoring this message.", slotId);
            return;
        }

        if (slot.getState() != SlotState.ALLOCATED) {
            // The placeholder names the slot, so log the slot id (previously the allocation id was passed).
            LOG.debug("Slot {} has not been allocated.", slotId);
            return;
        }

        if (!Objects.equals(allocationId, slot.getAllocationId())) {
            LOG.debug("Received request to free slot {} with expected allocation id {}, but actual allocation id {} differs. Ignoring the request.", slotId, allocationId, slot.getAllocationId());
            return;
        }

        TaskManagerRegistration taskManagerRegistration = this.taskManagerRegistrations.get(slot.getInstanceId());
        if (taskManagerRegistration == null) {
            throw new IllegalStateException("Trying to free a slot from a TaskManager " + slot.getInstanceId() + " which has not been registered.");
        }

        // A null allocation id and job id mark the slot as no longer allocated.
        this.updateSlotState(slot, taskManagerRegistration, null, null);
    }

    /**
     * Sets whether unfulfillable requests are failed.
     *
     * <p>When the flag switches from false to true, every pending request that has no pending
     * task manager slot assigned and cannot be matched by any registered or pending slot is
     * removed and reported as an allocation failure.
     *
     * @param failUnfulfillableRequest the new value of the flag
     */
    @Override
    public void setFailUnfulfillableRequest(boolean failUnfulfillableRequest) {
        final boolean switchedOn = !this.failUnfulfillableRequest && failUnfulfillableRequest;
        if (switchedOn) {
            final Iterator<PendingSlotRequest> requests = pendingSlotRequests.values().iterator();
            while (requests.hasNext()) {
                final PendingSlotRequest request = requests.next();
                final boolean keep = request.getAssignedPendingTaskManagerSlot() != null
                        || isFulfillableByRegisteredOrPendingSlots(request.getResourceProfile());
                if (!keep) {
                    requests.remove();
                    resourceActions.notifyAllocationFailure(
                            request.getJobId(),
                            request.getAllocationId(),
                            new UnfulfillableSlotRequestException(request.getAllocationId(), request.getResourceProfile()));
                }
            }
        }
        this.failUnfulfillableRequest = failUnfulfillableRequest;
    }

    /**
     * Finds the first pending request that is not yet assigned and that the given slot profile
     * matches.
     *
     * @param slotResourceProfile profile of the slot looking for a request
     * @return the first suitable pending request, or null if there is none
     */
    private PendingSlotRequest findMatchingRequest(ResourceProfile slotResourceProfile) {
        for (PendingSlotRequest candidate : pendingSlotRequests.values()) {
            final boolean suitable = !candidate.isAssigned() && slotResourceProfile.isMatching(candidate.getResourceProfile());
            if (suitable) {
                return candidate;
            }
        }
        return null;
    }

    /**
     * Asks the matching strategy for a free slot that fits the requested profile. A slot that is
     * found is removed from the free-slot map before it is returned.
     *
     * @param requestResourceProfile profile the slot has to match
     * @return the matched slot, or an empty optional if the strategy found none
     * @throws IllegalStateException if the matched slot is not in state FREE
     */
    private Optional<TaskManagerSlot> findMatchingSlot(ResourceProfile requestResourceProfile) {
        final Optional<TaskManagerSlot> match = slotMatchingStrategy.findMatchingSlot(requestResourceProfile, freeSlots.values(), this::getNumberRegisteredSlotsOf);
        if (match.isPresent()) {
            final TaskManagerSlot matchedSlot = match.get();
            Preconditions.checkState(
                    matchedSlot.getState() == SlotState.FREE,
                    "TaskManagerSlot %s is not in state FREE but %s.",
                    matchedSlot.getSlotId(),
                    matchedSlot.getState());
            freeSlots.remove(matchedSlot.getSlotId());
        }
        return match;
    }

    /**
     * Registers a reported slot.
     *
     * <p>A slot already known under the same id is removed first. If the new slot is reported
     * without an allocation and a pending task manager slot with exactly the same profile
     * exists, that pending slot is consumed: the new slot is either handled as a free slot or
     * allocated to the request that was assigned to the pending slot. Otherwise the slot's
     * state is updated from the reported allocation.
     *
     * @param slotId id of the reported slot
     * @param allocationId reported allocation id, or null if the slot is not allocated
     * @param jobId reported job id
     * @param resourceProfile profile of the reported slot
     * @param taskManagerConnection connection to the owning task manager
     */
    private void registerSlot(SlotID slotId, AllocationID allocationId, JobID jobId, ResourceProfile resourceProfile, TaskExecutorConnection taskManagerConnection) {
        if (slots.containsKey(slotId)) {
            removeSlot(slotId, new SlotManagerException(String.format("Re-registration of slot %s. This indicates that the TaskExecutor has re-connected.", slotId)));
        }

        final TaskManagerSlot newSlot = createAndRegisterTaskManagerSlot(slotId, resourceProfile, taskManagerConnection);

        PendingTaskManagerSlot consumedPendingSlot = null;
        if (allocationId == null) {
            consumedPendingSlot = findExactlyMatchingPendingTaskManagerSlot(resourceProfile);
        }

        if (consumedPendingSlot == null) {
            updateSlot(slotId, allocationId, jobId);
            return;
        }

        pendingSlots.remove(consumedPendingSlot.getTaskManagerSlotId());
        final PendingSlotRequest waitingRequest = consumedPendingSlot.getAssignedPendingSlotRequest();
        if (waitingRequest != null) {
            waitingRequest.unassignPendingTaskManagerSlot();
            allocateSlot(newSlot, waitingRequest);
        } else {
            handleFreeSlot(newSlot);
        }
    }

    /**
     * Creates a slot object and stores it in the registered-slot map under its id.
     *
     * @param slotId id of the new slot
     * @param resourceProfile profile of the new slot
     * @param taskManagerConnection connection to the owning task manager
     * @return the newly created slot
     */
    @Nonnull
    private TaskManagerSlot createAndRegisterTaskManagerSlot(SlotID slotId, ResourceProfile resourceProfile, TaskExecutorConnection taskManagerConnection) {
        final TaskManagerSlot created = new TaskManagerSlot(slotId, resourceProfile, taskManagerConnection);
        slots.put(slotId, created);
        return created;
    }

    /**
     * Finds the first pending task manager slot whose profile equals the given profile.
     *
     * @param resourceProfile profile to look for
     * @return the first pending slot with an equal profile, or null if there is none
     */
    @Nullable
    private PendingTaskManagerSlot findExactlyMatchingPendingTaskManagerSlot(ResourceProfile resourceProfile) {
        return pendingSlots.values().stream()
                .filter(candidate -> isPendingSlotExactlyMatchingResourceProfile(candidate, resourceProfile))
                .findFirst()
                .orElse(null);
    }

    /**
     * Checks whether a pending slot's profile equals the given profile.
     *
     * @param pendingTaskManagerSlot pending slot to inspect
     * @param resourceProfile profile to compare against
     * @return true if both profiles are equal
     */
    private boolean isPendingSlotExactlyMatchingResourceProfile(PendingTaskManagerSlot pendingTaskManagerSlot, ResourceProfile resourceProfile) {
        final ResourceProfile pendingProfile = pendingTaskManagerSlot.getResourceProfile();
        return pendingProfile.equals(resourceProfile);
    }

    /**
     * Decides whether registering the reported slots would exceed the maximum slot number.
     *
     * <p>The limit is first checked against all reported slots. Only if that check fails is it
     * re-checked against the reported slots that do not replace an existing pending slot.
     *
     * @param initialSlotReport slot report of the registering task manager
     * @return true if the limit would be exceeded in both checks
     */
    private boolean isMaxSlotNumExceededAfterRegistration(SlotReport initialSlotReport) {
        return isMaxSlotNumExceededAfterAdding(initialSlotReport.getNumSlotStatus())
                && isMaxSlotNumExceededAfterAdding(getNumNonPendingReportedNewSlots(initialSlotReport));
    }

    /**
     * Counts the reported slots that cannot be paired with a pending task manager slot.
     *
     * <p>Each reported slot without an allocation id is paired with at most one pending slot of
     * exactly the same profile, and each pending slot is used for at most one pairing.
     *
     * @param slotReport the slot report to inspect
     * @return the number of reported slots minus the number of pairings found
     */
    private int getNumNonPendingReportedNewSlots(SlotReport slotReport) {
        final HashSet<TaskManagerSlotId> pairedPendingSlots = new HashSet<>();
        for (SlotStatus reported : slotReport) {
            if (reported.getAllocationID() != null) {
                continue;
            }
            for (PendingTaskManagerSlot candidate : pendingSlots.values()) {
                final boolean unused = !pairedPendingSlots.contains(candidate.getTaskManagerSlotId());
                if (unused && isPendingSlotExactlyMatchingResourceProfile(candidate, reported.getResourceProfile())) {
                    pairedPendingSlots.add(candidate.getTaskManagerSlotId());
                    // One pending slot per reported slot; move on to the next reported slot.
                    break;
                }
            }
        }
        return slotReport.getNumSlotStatus() - pairedPendingSlots.size();
    }

    /**
     * Updates the state of a registered slot from a reported allocation.
     *
     * @param slotId id of the slot to update
     * @param allocationId reported allocation id, or null if the slot is reported as not allocated
     * @param jobId reported job id
     * @return true if the slot is known and was updated; false if the slot is unknown
     * @throws IllegalStateException if the slot belongs to a task manager that is not registered
     */
    private boolean updateSlot(SlotID slotId, AllocationID allocationId, JobID jobId) {
        final TaskManagerSlot knownSlot = slots.get(slotId);
        if (knownSlot == null) {
            LOG.debug("Trying to update unknown slot with slot id {}.", slotId);
            return false;
        }

        final TaskManagerRegistration owner = taskManagerRegistrations.get(knownSlot.getInstanceId());
        if (owner == null) {
            throw new IllegalStateException("Trying to update a slot from a TaskManager " + knownSlot.getInstanceId() + " which has not been registered.");
        }

        updateSlotState(knownSlot, owner, allocationId, jobId);
        return true;
    }

    /**
     * Reconciles the tracked state of a slot with a reported allocation.
     *
     * <p>A non-null {@code allocationId} means the slot is reported as allocated; the slot is
     * moved to that allocation and the allocation is recorded in {@code fulfilledSlotRequests}.
     * A null {@code allocationId} means the slot is reported as not allocated; an ALLOCATED slot
     * is freed, a FREE slot is handled as free again, and a PENDING slot is left untouched.
     *
     * @param slot the slot whose state is updated
     * @param taskManagerRegistration registration of the task manager owning the slot
     * @param allocationId reported allocation id, or null if the slot is reported as not allocated
     * @param jobId reported job id, or null
     */
    private void updateSlotState(TaskManagerSlot slot, TaskManagerRegistration taskManagerRegistration, @Nullable AllocationID allocationId, @Nullable JobID jobId) {
        if (null != allocationId) {
            // The slot is reported as allocated under allocationId.
            switch (slot.getState()) {
                case PENDING: {
                    // A slot request is currently assigned to this slot.
                    PendingSlotRequest pendingSlotRequest = slot.getAssignedSlotRequest();
                    if (Objects.equals((Object)pendingSlotRequest.getAllocationId(), (Object)allocationId)) {
                        // The report confirms the assigned request: cancel it, drop it from the pending requests and complete the allocation.
                        this.cancelPendingSlotRequest(pendingSlotRequest);
                        this.pendingSlotRequests.remove((Object)pendingSlotRequest.getAllocationId());
                        slot.completeAllocation(allocationId, jobId);
                    } else {
                        // The slot is reported as allocated for a different allocation than the assigned request.
                        slot.clearPendingSlotRequest();
                        slot.updateAllocation(allocationId, jobId);
                        // If the reported allocation is itself still pending here, it is removed and cancelled.
                        PendingSlotRequest actualPendingSlotRequest = this.pendingSlotRequests.remove((Object)allocationId);
                        if (actualPendingSlotRequest != null) {
                            this.cancelPendingSlotRequest(actualPendingSlotRequest);
                        }
                        // The request that was assigned to this slot is rejected.
                        this.rejectPendingSlotRequest(pendingSlotRequest, new Exception("Task manager reported slot " + slot.getSlotId() + " being already allocated."));
                    }
                    taskManagerRegistration.occupySlot();
                    break;
                }
                case ALLOCATED: {
                    // Nothing to do when the reported allocation equals the tracked one.
                    if (Objects.equals((Object)allocationId, (Object)slot.getAllocationId())) break;
                    // Otherwise switch the slot to the reported allocation.
                    // NOTE(review): the fulfilledSlotRequests entry of the previous allocation is not removed on this path — confirm this is intended.
                    slot.freeSlot();
                    slot.updateAllocation(allocationId, jobId);
                    break;
                }
                case FREE: {
                    // The slot was tracked as free: take it out of the free slots and mark it occupied.
                    this.freeSlots.remove(slot.getSlotId());
                    slot.updateAllocation(allocationId, jobId);
                    taskManagerRegistration.occupySlot();
                    // No break needed: this is the last case of the switch.
                }
            }
            // In every case the reported allocation is recorded against this slot.
            this.fulfilledSlotRequests.put(allocationId, slot.getSlotId());
        } else {
            // The slot is reported as not allocated.
            switch (slot.getState()) {
                case FREE: {
                    this.handleFreeSlot(slot);
                    break;
                }
                case PENDING: {
                    // A request is assigned to the slot; its state is left unchanged here.
                    break;
                }
                case ALLOCATED: {
                    // Free the slot, forget its previous allocation and hand it to the free-slot handling.
                    AllocationID oldAllocation = slot.getAllocationId();
                    slot.freeSlot();
                    this.fulfilledSlotRequests.remove((Object)oldAllocation);
                    taskManagerRegistration.freeSlot();
                    this.handleFreeSlot(slot);
                    // No break needed: this is the last case of the switch.
                }
            }
        }
    }

    /**
     * Tries to serve a pending slot request: a matching free slot is allocated if one exists,
     * otherwise the request falls back to a pending task manager slot.
     *
     * @param pendingSlotRequest the request to serve
     * @throws ResourceManagerException if the fallback cannot fulfil the request
     */
    private void internalRequestSlot(PendingSlotRequest pendingSlotRequest) throws ResourceManagerException {
        final Optional<TaskManagerSlot> freeSlot = findMatchingSlot(pendingSlotRequest.getResourceProfile());
        if (freeSlot.isPresent()) {
            allocateSlot(freeSlot.get(), pendingSlotRequest);
        } else {
            fulfillPendingSlotRequestWithPendingTaskManagerSlot(pendingSlotRequest);
        }
    }

    /**
     * Assigns a pending task manager slot to the request.
     *
     * <p>An unassigned, matching pending slot is reused if one exists; otherwise a new resource
     * is requested. If neither yields a pending slot, the request stays pending, unless
     * unfulfillable requests are to be failed and no registered or pending slot could ever match it.
     *
     * @param pendingSlotRequest the request to serve
     * @throws ResourceManagerException as {@code UnfulfillableSlotRequestException} when the
     *     request cannot be fulfilled and failing such requests is enabled
     */
    private void fulfillPendingSlotRequestWithPendingTaskManagerSlot(PendingSlotRequest pendingSlotRequest) throws ResourceManagerException {
        final ResourceProfile requestedProfile = pendingSlotRequest.getResourceProfile();

        Optional<PendingTaskManagerSlot> pendingSlot = findFreeMatchingPendingTaskManagerSlot(requestedProfile);
        if (!pendingSlot.isPresent()) {
            pendingSlot = allocateResource(requestedProfile);
        }

        if (pendingSlot.isPresent()) {
            assignPendingTaskManagerSlot(pendingSlotRequest, pendingSlot.get());
            return;
        }

        if (failUnfulfillableRequest && !isFulfillableByRegisteredOrPendingSlots(pendingSlotRequest.getResourceProfile())) {
            throw new UnfulfillableSlotRequestException(pendingSlotRequest.getAllocationId(), pendingSlotRequest.getResourceProfile());
        }
    }

    /**
     * Finds the first pending task manager slot that has no request assigned and whose profile
     * matches the required profile.
     *
     * @param requiredResourceProfile profile the pending slot has to match
     * @return the first suitable pending slot, or an empty optional if there is none
     */
    private Optional<PendingTaskManagerSlot> findFreeMatchingPendingTaskManagerSlot(ResourceProfile requiredResourceProfile) {
        return pendingSlots.values().stream()
                .filter(candidate -> candidate.getAssignedPendingSlotRequest() == null
                        && candidate.getResourceProfile().isMatching(requiredResourceProfile))
                .findFirst();
    }

    /**
     * Checks whether any registered slot or any pending task manager slot matches the profile,
     * regardless of whether that slot is currently free or assigned.
     *
     * @param resourceProfile profile to check
     * @return true if at least one registered or pending slot matches
     */
    private boolean isFulfillableByRegisteredOrPendingSlots(ResourceProfile resourceProfile) {
        final boolean matchedByRegisteredSlot = slots.values().stream()
                .anyMatch(registered -> registered.getResourceProfile().isMatching(resourceProfile));
        if (matchedByRegisteredSlot) {
            return true;
        }
        return pendingSlots.values().stream()
                .anyMatch(pending -> pending.getResourceProfile().isMatching(resourceProfile));
    }

    /**
     * Checks whether adding the given number of slots to the registered and pending slots would
     * exceed the maximum slot number.
     *
     * @param numNewSlot number of slots that would be added
     * @return true if registered + pending + new slots is greater than the maximum
     */
    private boolean isMaxSlotNumExceededAfterAdding(int numNewSlot) {
        final int slotsAfterAdding = getNumberRegisteredSlots() + getNumberPendingTaskManagerSlots() + numNewSlot;
        return slotsAfterAdding > maxSlotNum;
    }

    /**
     * Requests the given number of additional workers and logs a warning if fewer could be
     * requested.
     *
     * @param number number of workers to request
     */
    private void allocateRedundantTaskManagers(int number) {
        final int actuallyAllocated = allocateResources(number);
        if (actuallyAllocated == number) {
            return;
        }
        LOG.warn("Expect to allocate {} taskManagers. Actually allocate {} taskManagers.", number, actuallyAllocated);
    }

    /**
     * Requests workers one at a time with the default slot profile and stops at the first
     * request that fails.
     *
     * @param workerNum number of workers to request
     * @return the number of workers that were requested successfully
     */
    private int allocateResources(int workerNum) {
        int succeeded = 0;
        while (succeeded < workerNum) {
            if (!allocateResource(defaultSlotResourceProfile).isPresent()) {
                break;
            }
            succeeded++;
        }
        return succeeded;
    }

    /**
     * Requests one new worker and registers its expected slots as pending task manager slots.
     *
     * <p>No worker is requested when the slots of one more worker would exceed the maximum slot
     * number, when the default slot profile does not match the requested profile, or when the
     * resource actions decline the allocation.
     *
     * @param requestedSlotResourceProfile profile the new slots have to match
     * @return the pending slot created last for the new worker, or an empty optional if no worker was requested
     */
    private Optional<PendingTaskManagerSlot> allocateResource(ResourceProfile requestedSlotResourceProfile) {
        final int registeredBefore = getNumberRegisteredSlots();
        final int pendingBefore = getNumberPendingTaskManagerSlots();

        if (isMaxSlotNumExceededAfterAdding(numSlotsPerWorker)) {
            LOG.warn("Could not allocate {} more slots. The number of registered and pending slots is {}, while the maximum is {}.", numSlotsPerWorker, pendingBefore + registeredBefore, maxSlotNum);
            return Optional.empty();
        }

        final boolean profileServable = defaultSlotResourceProfile.isMatching(requestedSlotResourceProfile);
        if (!profileServable || !resourceActions.allocateResource(defaultWorkerResourceSpec)) {
            return Optional.empty();
        }

        // One pending slot per slot of the new worker; the one created last is returned.
        PendingTaskManagerSlot lastCreated = null;
        int created = 0;
        while (created < numSlotsPerWorker) {
            lastCreated = new PendingTaskManagerSlot(defaultSlotResourceProfile);
            pendingSlots.put(lastCreated.getTaskManagerSlotId(), lastCreated);
            created++;
        }
        return Optional.of(Preconditions.checkNotNull(lastCreated, "At least one pending slot should be created."));
    }

    /**
     * Links a pending slot request and a pending task manager slot with each other.
     *
     * @param pendingSlotRequest the request to link
     * @param pendingTaskManagerSlot the pending slot to link
     */
    private void assignPendingTaskManagerSlot(PendingSlotRequest pendingSlotRequest, PendingTaskManagerSlot pendingTaskManagerSlot) {
        // NOTE(review): both sides are updated, slot first; keep this order unless the callee
        // preconditions have been checked.
        pendingTaskManagerSlot.assignPendingSlotRequest(pendingSlotRequest);
        pendingSlotRequest.assignPendingTaskManagerSlot(pendingTaskManagerSlot);
    }

    /**
     * Allocates the given slot for the given pending slot request by sending a slot request to
     * the task executor that owns the slot.
     *
     * <p>The slot and the request are linked before the request is sent. The outcome of the
     * request is processed asynchronously on {@code mainThreadExecutor}.
     *
     * @param taskManagerSlot the slot to allocate; a precondition check requires it to be in
     *     state {@code SlotState.FREE}
     * @param pendingSlotRequest the pending request the slot is allocated for
     * @throws IllegalStateException if no task manager is registered under the slot's instance id
     */
    private void allocateSlot(TaskManagerSlot taskManagerSlot, PendingSlotRequest pendingSlotRequest) {
        Preconditions.checkState((taskManagerSlot.getState() == SlotState.FREE ? 1 : 0) != 0);
        TaskExecutorConnection taskExecutorConnection = taskManagerSlot.getTaskManagerConnection();
        TaskExecutorGateway gateway = taskExecutorConnection.getTaskExecutorGateway();
        // Future stored on the pending request; completing or cancelling it drives the handler below.
        CompletableFuture<Acknowledge> completableFuture = new CompletableFuture<Acknowledge>();
        AllocationID allocationId = pendingSlotRequest.getAllocationId();
        SlotID slotId = taskManagerSlot.getSlotId();
        InstanceID instanceID = taskManagerSlot.getInstanceId();
        // Link slot and request, then drop any pending task manager slot that was assigned to the request.
        taskManagerSlot.assignPendingSlotRequest(pendingSlotRequest);
        pendingSlotRequest.setRequestFuture(completableFuture);
        this.returnPendingTaskManagerSlotIfAssigned(pendingSlotRequest);
        TaskManagerRegistration taskManagerRegistration = this.taskManagerRegistrations.get((Object)instanceID);
        if (taskManagerRegistration == null) {
            throw new IllegalStateException("Could not find a registered task manager for instance id " + (Object)((Object)instanceID) + '.');
        }
        taskManagerRegistration.markUsed();
        // Send the actual slot request to the task executor.
        CompletableFuture<Acknowledge> requestFuture = gateway.requestSlot(slotId, pendingSlotRequest.getJobId(), allocationId, pendingSlotRequest.getResourceProfile(), pendingSlotRequest.getTargetAddress(), this.resourceManagerId, this.taskManagerRequestTimeout);
        // Forward the outcome of the request into the future held by the pending request.
        requestFuture.whenComplete((acknowledge, throwable) -> {
            if (acknowledge != null) {
                completableFuture.complete((Acknowledge)acknowledge);
            } else {
                completableFuture.completeExceptionally((Throwable)throwable);
            }
        });
        // React to the outcome on the main thread executor.
        completableFuture.whenCompleteAsync((acknowledge, throwable) -> {
            try {
                if (acknowledge != null) {
                    // Acknowledged: record the slot as allocated for this request.
                    this.updateSlot(slotId, allocationId, pendingSlotRequest.getJobId());
                } else {
                    if (throwable instanceof SlotOccupiedException) {
                        // Update the slot with the allocation id and job id carried by the exception.
                        SlotOccupiedException exception = (SlotOccupiedException)throwable;
                        this.updateSlot(slotId, exception.getAllocationId(), exception.getJobId());
                    } else {
                        // Any other failure: detach the request from the slot.
                        this.removeSlotRequestFromSlot(slotId, allocationId);
                    }
                    // A cancellation is only logged; every other failure goes through failed-request handling.
                    if (!(throwable instanceof CancellationException)) {
                        this.handleFailedSlotRequest(slotId, allocationId, (Throwable)throwable);
                    } else {
                        LOG.debug("Slot allocation request {} has been cancelled.", (Object)allocationId, throwable);
                    }
                }
            }
            catch (Exception e) {
                // Exceptions raised while handling the outcome are logged and not propagated.
                LOG.error("Error while completing the slot allocation.", (Throwable)e);
            }
        }, this.mainThreadExecutor);
    }

    /**
     * Removes the link between the given request and its assigned pending task manager slot,
     * if there is one. Does nothing when the request has no pending slot assigned.
     *
     * @param pendingSlotRequest the request whose pending slot assignment is dissolved
     */
    private void returnPendingTaskManagerSlotIfAssigned(PendingSlotRequest pendingSlotRequest) {
        final PendingTaskManagerSlot assignedSlot = pendingSlotRequest.getAssignedPendingTaskManagerSlot();
        if (assignedSlot == null) {
            return;
        }
        assignedSlot.unassignPendingSlotRequest();
        pendingSlotRequest.unassignPendingTaskManagerSlot();
    }

    /**
     * Handles a slot that is free: it is either allocated right away for a matching pending
     * request, or stored in {@code freeSlots} when no matching request is found.
     *
     * @param freeSlot the slot to handle; a precondition check requires it to be in state
     *     {@code SlotState.FREE}
     */
    private void handleFreeSlot(TaskManagerSlot freeSlot) {
        Preconditions.checkState(freeSlot.getState() == SlotState.FREE);

        final PendingSlotRequest matchingRequest = this.findMatchingRequest(freeSlot.getResourceProfile());
        if (matchingRequest == null) {
            // Nobody is waiting for a slot like this one; keep it available.
            this.freeSlots.put(freeSlot.getSlotId(), freeSlot);
            return;
        }
        this.allocateSlot(freeSlot, matchingRequest);
    }

    /**
     * Removes every slot identified by the given ids, passing the same cause for each removal.
     *
     * @param slotsToRemove ids of the slots to remove
     * @param cause the reason for the removal
     */
    private void removeSlots(Iterable<SlotID> slotsToRemove, Exception cause) {
        final Iterator<SlotID> slotIds = slotsToRemove.iterator();
        while (slotIds.hasNext()) {
            this.removeSlot(slotIds.next(), cause);
        }
    }

    /**
     * Removes the slot with the given id from the bookkeeping.
     *
     * <p>If the slot is in state {@code SlotState.PENDING}, its assigned slot request is
     * rejected with the given cause. If the slot carries an allocation id, the matching entry is
     * dropped from {@code fulfilledSlotRequests} and an allocation failure is reported through
     * {@code resourceActions}. An unknown slot id is only logged.
     *
     * @param slotId id of the slot to remove
     * @param cause the reason for the removal
     */
    private void removeSlot(SlotID slotId, Exception cause) {
        final TaskManagerSlot removedSlot = this.slots.remove(slotId);
        if (removedSlot == null) {
            LOG.debug("There was no slot registered with slot id {}.", slotId);
            return;
        }

        this.freeSlots.remove(slotId);

        if (removedSlot.getState() == SlotState.PENDING) {
            this.rejectPendingSlotRequest(removedSlot.getAssignedSlotRequest(), cause);
        }

        final AllocationID oldAllocationId = removedSlot.getAllocationId();
        if (oldAllocationId != null) {
            this.fulfilledSlotRequests.remove(oldAllocationId);
            this.resourceActions.notifyAllocationFailure(removedSlot.getJobId(), oldAllocationId, cause);
        }
    }

    /**
     * Detaches a pending slot request from a slot.
     *
     * <p>The request is only detached when the slot is in state {@code SlotState.PENDING} and
     * its assigned request carries the given allocation id; in that case the pending request is
     * cleared and the slot state is updated with a {@code null} allocation id and job id. In
     * every other case the call is only logged.
     *
     * @param slotId id of the slot to detach the request from
     * @param allocationId allocation id of the request that should be detached
     * @throws IllegalStateException if the slot qualifies but no task manager is registered
     *     under the slot's instance id
     */
    private void removeSlotRequestFromSlot(SlotID slotId, AllocationID allocationId) {
        final TaskManagerSlot slot = this.slots.get(slotId);
        if (slot == null) {
            LOG.debug("There was no slot with {} registered. Probably this slot has been already freed.", slotId);
            return;
        }

        // The assigned request is only dereferenced once the slot is known to be pending.
        final boolean pendingForThisAllocation = slot.getState() == SlotState.PENDING
                && Objects.equals(allocationId, slot.getAssignedSlotRequest().getAllocationId());
        if (!pendingForThisAllocation) {
            LOG.debug("Ignore slot request removal for slot {}.", slotId);
            return;
        }

        final InstanceID ownerId = slot.getInstanceId();
        final TaskManagerRegistration owner = this.taskManagerRegistrations.get(ownerId);
        if (owner == null) {
            throw new IllegalStateException("Trying to remove slot request from slot for which there is no TaskManager " + ownerId + " is registered.");
        }

        slot.clearPendingSlotRequest();
        this.updateSlotState(slot, owner, null, null);
    }

    /**
     * Handles a slot request that failed for the given slot.
     *
     * <p>If the request is still pending, its request future is reset and the request is
     * submitted again through {@code internalRequestSlot}. Should that throw a
     * {@code ResourceManagerException}, the request is removed from {@code pendingSlotRequests}
     * and an allocation failure is reported through {@code resourceActions}. A request that is
     * no longer pending is only logged.
     *
     * @param slotId id of the slot the request failed for
     * @param allocationId allocation id of the failed request
     * @param cause the failure
     */
    private void handleFailedSlotRequest(SlotID slotId, AllocationID allocationId, Throwable cause) {
        final PendingSlotRequest failedRequest = this.pendingSlotRequests.get(allocationId);
        LOG.debug("Slot request with allocation id {} failed for slot {}.", allocationId, slotId, cause);

        if (failedRequest == null) {
            LOG.debug("There was not pending slot request with allocation id {}. Probably the request has been fulfilled or cancelled.", allocationId);
            return;
        }

        // Clear the stale future before the request is tried again.
        failedRequest.setRequestFuture(null);
        try {
            this.internalRequestSlot(failedRequest);
        } catch (ResourceManagerException e) {
            this.pendingSlotRequests.remove(allocationId);
            this.resourceActions.notifyAllocationFailure(failedRequest.getJobId(), allocationId, e);
        }
    }

    /**
     * Rejects a pending slot request by completing its request future exceptionally with a
     * {@code SlotAllocationException} wrapping the given cause. If the request has no request
     * future, this is only logged.
     *
     * @param pendingSlotRequest the request to reject
     * @param cause the reason for the rejection
     */
    private void rejectPendingSlotRequest(PendingSlotRequest pendingSlotRequest, Exception cause) {
        final CompletableFuture<Acknowledge> requestFuture = pendingSlotRequest.getRequestFuture();
        if (requestFuture == null) {
            LOG.debug("Cannot reject pending slot request {}, since no request has been sent.", pendingSlotRequest.getAllocationId());
            return;
        }
        requestFuture.completeExceptionally(new SlotAllocationException(cause));
    }

    /**
     * Cancels a pending slot request: any assigned pending task manager slot is returned and the
     * request future, if present, is cancelled without interruption.
     *
     * @param pendingSlotRequest the request to cancel
     */
    private void cancelPendingSlotRequest(PendingSlotRequest pendingSlotRequest) {
        // Read the future before the pending slot is returned, as the original code does.
        final CompletableFuture<Acknowledge> requestFuture = pendingSlotRequest.getRequestFuture();
        this.returnPendingTaskManagerSlotIfAssigned(pendingSlotRequest);
        if (requestFuture == null) {
            return;
        }
        requestFuture.cancel(false);
    }

    /**
     * Releases timed-out task managers and tops up redundant ones.
     *
     * <p>A task manager counts as timed out when the time elapsed since
     * {@code getIdleSince()} is at least {@code taskManagerTimeout}. With
     * {@code slotsDiff = redundantTaskManagerNum * numSlotsPerWorker - freeSlots.size()}:
     * <ul>
     *   <li>if every registered slot is free, all timed-out task managers are released;</li>
     *   <li>else if {@code slotsDiff > 0}, enough task managers to cover the missing slots
     *       (rounded up) are requested;</li>
     *   <li>otherwise at most {@code -slotsDiff / numSlotsPerWorker} timed-out task managers
     *       are released.</li>
     * </ul>
     * Does nothing when no task manager is registered.
     */
    @VisibleForTesting
    void checkTaskManagerTimeoutsAndRedundancy() {
        if (this.taskManagerRegistrations.isEmpty()) {
            return;
        }

        final long now = System.currentTimeMillis();

        // Collect the candidates first; releasing happens only after the iteration is done.
        final ArrayList<TaskManagerRegistration> timedOut = new ArrayList<>(this.taskManagerRegistrations.size());
        for (TaskManagerRegistration registration : this.taskManagerRegistrations.values()) {
            final long idleMillis = now - registration.getIdleSince();
            if (idleMillis >= this.taskManagerTimeout.toMilliseconds()) {
                timedOut.add(registration);
            }
        }

        final int freeSlotCount = this.freeSlots.size();
        final int slotsDiff = this.redundantTaskManagerNum * this.numSlotsPerWorker - freeSlotCount;

        if (freeSlotCount == this.slots.size()) {
            this.releaseTaskExecutors(timedOut, timedOut.size());
        } else if (slotsDiff > 0) {
            final int requiredTaskManagers = MathUtils.divideRoundUp(slotsDiff, this.numSlotsPerWorker);
            this.allocateRedundantTaskManagers(requiredTaskManagers);
        } else {
            final int maxReleaseNum = -slotsDiff / this.numSlotsPerWorker;
            this.releaseTaskExecutors(timedOut, Math.min(maxReleaseNum, timedOut.size()));
        }
    }

    /**
     * Releases the first {@code releaseNum} entries of the given list.
     *
     * <p>When {@code waitResultConsumedBeforeRelease} is set, each task manager goes through
     * {@code releaseTaskExecutorIfPossible}; otherwise it is released directly.
     *
     * @param timedOutTaskManagers the release candidates
     * @param releaseNum how many candidates, counted from the start of the list, to release
     */
    private void releaseTaskExecutors(ArrayList<TaskManagerRegistration> timedOutTaskManagers, int releaseNum) {
        for (int i = 0; i < releaseNum; i++) {
            final TaskManagerRegistration candidate = timedOutTaskManagers.get(i);
            if (this.waitResultConsumedBeforeRelease) {
                this.releaseTaskExecutorIfPossible(candidate);
            } else {
                this.releaseTaskExecutor(candidate.getInstanceId());
            }
        }
    }

    /**
     * Asks the task executor whether it can be released and, on the main thread executor,
     * releases it if it answers positively and its {@code getIdleSince()} value has not changed
     * since this method was called.
     *
     * @param taskManagerRegistration the task manager that is a candidate for release
     */
    private void releaseTaskExecutorIfPossible(TaskManagerRegistration taskManagerRegistration) {
        // Snapshot taken before the asynchronous round trip; compared again in the callback.
        final long idleSinceAtRequest = taskManagerRegistration.getIdleSince();

        taskManagerRegistration.getTaskManagerConnection().getTaskExecutorGateway().canBeReleased().thenAcceptAsync(releasable -> {
            final InstanceID timedOutTaskManagerId = taskManagerRegistration.getInstanceId();
            final boolean stillIdle = idleSinceAtRequest == taskManagerRegistration.getIdleSince();
            if (stillIdle && releasable) {
                this.releaseTaskExecutor(timedOutTaskManagerId);
            }
        }, this.mainThreadExecutor);
    }

    /**
     * Releases the given task executor through {@code resourceActions}, stating the idle
     * timeout as the cause.
     *
     * @param timedOutTaskManagerId instance id of the task executor to release
     */
    private void releaseTaskExecutor(InstanceID timedOutTaskManagerId) {
        final FlinkException idleTimeoutCause = new FlinkException("TaskExecutor exceeded the idle timeout.");
        LOG.debug("Release TaskExecutor {} because it exceeded the idle timeout.", timedOutTaskManagerId);
        this.resourceActions.releaseResource(timedOutTaskManagerId, idleTimeoutCause);
    }

    /**
     * Fails every pending slot request whose age, measured from its creation timestamp, is at
     * least {@code slotRequestTimeout}.
     *
     * <p>A timed-out request is removed from {@code pendingSlotRequests}, cancelled if it is
     * assigned, and reported through {@code resourceActions} with a {@code TimeoutException}.
     */
    private void checkSlotRequestTimeouts() {
        if (this.pendingSlotRequests.isEmpty()) {
            return;
        }

        final long now = System.currentTimeMillis();
        // An explicit iterator is used so entries can be removed while iterating.
        final Iterator<Map.Entry<AllocationID, PendingSlotRequest>> pendingEntries = this.pendingSlotRequests.entrySet().iterator();
        while (pendingEntries.hasNext()) {
            final PendingSlotRequest candidate = pendingEntries.next().getValue();
            final long ageMillis = now - candidate.getCreationTimestamp();
            if (ageMillis >= this.slotRequestTimeout.toMilliseconds()) {
                pendingEntries.remove();
                if (candidate.isAssigned()) {
                    this.cancelPendingSlotRequest(candidate);
                }
                this.resourceActions.notifyAllocationFailure(candidate.getJobId(), candidate.getAllocationId(), new TimeoutException("The allocation could not be fulfilled in time."));
            }
        }
    }

    /**
     * Removes all slots of the given task manager registration.
     *
     * @param taskManagerRegistration the registration whose slots are removed; must not be
     *     {@code null}
     * @param cause the reason passed on to each slot removal
     */
    private void internalUnregisterTaskManager(TaskManagerRegistration taskManagerRegistration, Exception cause) {
        Preconditions.checkNotNull(taskManagerRegistration);

        final Iterable<SlotID> registeredSlotIds = taskManagerRegistration.getSlots();
        this.removeSlots(registeredSlotIds, cause);
    }

    /**
     * Checks whether a slot request with the given allocation id is already known, either as a
     * pending or as a fulfilled request.
     *
     * @param allocationId the allocation id to look up
     * @return {@code true} if the id is a key of {@code pendingSlotRequests} or
     *     {@code fulfilledSlotRequests}
     */
    private boolean checkDuplicateRequest(AllocationID allocationId) {
        if (this.pendingSlotRequests.containsKey(allocationId)) {
            return true;
        }
        return this.fulfilledSlotRequests.containsKey(allocationId);
    }

    /**
     * Verifies, via a state precondition on {@code started}, that the slot manager has been
     * started.
     */
    private void checkInit() {
        Preconditions.checkState(this.started, "The slot manager has not been started.");
    }

    /**
     * Looks up a registered slot by its id.
     *
     * @param slotId id of the slot
     * @return the entry stored in {@code slots} under the given id (whatever the map returns
     *     for an unknown id otherwise)
     */
    @VisibleForTesting
    TaskManagerSlot getSlot(SlotID slotId) {
        return this.slots.get(slotId);
    }

    /**
     * Looks up a pending slot request by its allocation id.
     *
     * @param allocationId allocation id of the request
     * @return the entry stored in {@code pendingSlotRequests} under the given id
     */
    @VisibleForTesting
    PendingSlotRequest getSlotRequest(AllocationID allocationId) {
        final PendingSlotRequest pendingRequest = this.pendingSlotRequests.get(allocationId);
        return pendingRequest;
    }

    /**
     * Tells whether the task manager registered under the given instance id is idle.
     *
     * @param instanceId instance id of the task manager
     * @return the registration's {@code isIdle()} value, or {@code false} when no task manager
     *     is registered under the id
     */
    @VisibleForTesting
    boolean isTaskManagerIdle(InstanceID instanceId) {
        final TaskManagerRegistration registration = this.taskManagerRegistrations.get(instanceId);
        return registration != null && registration.isIdle();
    }
}

