/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.runtime.jobmaster.slotpool;

import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.function.Function;
import java.util.function.Predicate;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import javax.annotation.Nonnull;
import javax.annotation.Nullable;
import org.apache.flink.annotation.VisibleForTesting;
import org.apache.flink.api.common.JobID;
import org.apache.flink.api.common.time.Time;
import org.apache.flink.runtime.clusterframework.types.AllocationID;
import org.apache.flink.runtime.clusterframework.types.ResourceID;
import org.apache.flink.runtime.clusterframework.types.ResourceProfile;
import org.apache.flink.runtime.clusterframework.types.SlotID;
import org.apache.flink.runtime.concurrent.ComponentMainThreadExecutor;
import org.apache.flink.runtime.concurrent.FutureUtils;
import org.apache.flink.runtime.jobmanager.scheduler.NoResourceAvailableException;
import org.apache.flink.runtime.jobmanager.slots.TaskManagerGateway;
import org.apache.flink.runtime.jobmaster.AllocatedSlotInfo;
import org.apache.flink.runtime.jobmaster.AllocatedSlotReport;
import org.apache.flink.runtime.jobmaster.JobMasterId;
import org.apache.flink.runtime.jobmaster.SlotInfo;
import org.apache.flink.runtime.jobmaster.SlotRequestId;
import org.apache.flink.runtime.jobmaster.slotpool.AllocatedSlot;
import org.apache.flink.runtime.jobmaster.slotpool.PhysicalSlot;
import org.apache.flink.runtime.jobmaster.slotpool.SlotInfoWithUtilization;
import org.apache.flink.runtime.jobmaster.slotpool.SlotPool;
import org.apache.flink.runtime.jobmaster.slotpool.SlotPoolService;
import org.apache.flink.runtime.messages.Acknowledge;
import org.apache.flink.runtime.resourcemanager.ResourceManagerGateway;
import org.apache.flink.runtime.resourcemanager.SlotRequest;
import org.apache.flink.runtime.resourcemanager.exceptions.UnfulfillableSlotRequestException;
import org.apache.flink.runtime.taskexecutor.slot.SlotOffer;
import org.apache.flink.runtime.taskmanager.TaskManagerLocation;
import org.apache.flink.runtime.util.DualKeyLinkedMap;
import org.apache.flink.shaded.guava18.com.google.common.collect.Iterables;
import org.apache.flink.util.ExceptionUtils;
import org.apache.flink.util.FlinkException;
import org.apache.flink.util.Preconditions;
import org.apache.flink.util.clock.Clock;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class SlotPoolImpl
implements SlotPool,
SlotPoolService {
    /** Logger created for the runtime class of this instance. */
    protected final Logger log = LoggerFactory.getLogger(this.getClass());
    /** Interval, in milliseconds, of the periodic status log (one minute). */
    private static final long STATUS_LOG_INTERVAL_MS = 60000L;
    /** Job this pool belongs to; sent along with slot requests and slot reports. */
    private final JobID jobId;
    /** TaskManagers that were registered; slot offers from any other TaskManager are rejected. */
    private final HashSet<ResourceID> registeredTaskManagers;
    /** Slots that are currently assigned to a slot request. */
    private final AllocatedSlots allocatedSlots;
    /** Slots held by the pool that are not assigned to any request. */
    private final AvailableSlots availableSlots;
    /** Requests already sent to the ResourceManager, addressable by slot request id and by allocation id. */
    private final DualKeyLinkedMap<SlotRequestId, AllocationID, PendingRequest> pendingRequests;
    /** Requests stashed while no ResourceManager is connected, in insertion order. */
    private final LinkedHashMap<SlotRequestId, PendingRequest> waitingForResourceManager;
    /** Timeout passed to the ResourceManager and TaskManager RPC calls made by the pool. */
    private final Time rpcTimeout;
    /** Age after which an unused available slot is released; also the period of the idle check. */
    private final Time idleSlotTimeout;
    /** Time a batch request may stay unfulfillable before it is timed out; also the period of that check. */
    private final Time batchSlotTimeout;
    /** Source of the relative timestamps used for the idle and batch timeout checks. */
    private final Clock clock;
    /** Set in {@code start}; {@code null} before. */
    private JobMasterId jobMasterId;
    /** Gateway of the connected ResourceManager, or {@code null} while disconnected. */
    @Nullable
    private ResourceManagerGateway resourceManagerGateway;
    /** Set in {@code start}; {@code null} before. */
    private String jobManagerAddress;
    /** Executor all pool operations are expected to run on; set in {@code start}. */
    private ComponentMainThreadExecutor componentMainThreadExecutor;
    /** When {@code false}, {@code checkBatchSlotTimeout} returns immediately and stops rescheduling itself. */
    protected boolean batchSlotRequestTimeoutCheckEnabled;

    /**
     * Creates an empty, not yet started slot pool.
     *
     * <p>The job master id, job manager address and main thread executor stay {@code null} until
     * {@code start} is called; no ResourceManager is connected initially.
     *
     * @param jobId id of the job the slots are requested for; must not be null
     * @param clock clock providing relative timestamps; must not be null
     * @param rpcTimeout timeout for RPC calls issued by the pool; must not be null
     * @param idleSlotTimeout time after which an unused slot is released; must not be null
     * @param batchSlotTimeout time after which an unfulfillable batch request is failed; must not be null
     */
    public SlotPoolImpl(JobID jobId, Clock clock, Time rpcTimeout, Time idleSlotTimeout, Time batchSlotTimeout) {
        this.jobId = Preconditions.checkNotNull(jobId);
        this.clock = Preconditions.checkNotNull(clock);
        this.rpcTimeout = Preconditions.checkNotNull(rpcTimeout);
        this.idleSlotTimeout = Preconditions.checkNotNull(idleSlotTimeout);
        this.batchSlotTimeout = Preconditions.checkNotNull(batchSlotTimeout);

        // parameterized instantiation instead of raw types, so the element types are checked
        this.registeredTaskManagers = new HashSet<>(16);
        this.allocatedSlots = new AllocatedSlots();
        this.availableSlots = new AvailableSlots();
        this.pendingRequests = new DualKeyLinkedMap<>(16);
        this.waitingForResourceManager = new LinkedHashMap<>(16);

        this.jobMasterId = null;
        this.resourceManagerGateway = null;
        this.jobManagerAddress = null;
        this.componentMainThreadExecutor = null;
        this.batchSlotRequestTimeoutCheckEnabled = true;
    }

    /**
     * Lists the slots that are currently assigned to slot requests.
     *
     * @return the slot information reported by the allocated-slots bookkeeping
     */
    @Override
    public Collection<SlotInfo> getAllocatedSlotsInformation() {
        return allocatedSlots.listSlotInfo();
    }

    /** Exposes the live bookkeeping of assigned slots; intended for tests. */
    @VisibleForTesting
    AllocatedSlots getAllocatedSlots() {
        return allocatedSlots;
    }

    /** Exposes the live bookkeeping of unassigned slots; intended for tests. */
    @VisibleForTesting
    AvailableSlots getAvailableSlots() {
        return availableSlots;
    }

    /** Exposes the live map of requests already sent to the ResourceManager; intended for tests. */
    @VisibleForTesting
    DualKeyLinkedMap<SlotRequestId, AllocationID, PendingRequest> getPendingRequests() {
        return pendingRequests;
    }

    /** Exposes the live map of requests stashed until a ResourceManager connects; intended for tests. */
    @VisibleForTesting
    Map<SlotRequestId, PendingRequest> getWaitingForResourceManager() {
        return waitingForResourceManager;
    }

    /**
     * Starts the pool: stores the job master identity and the main thread executor, then schedules
     * the periodic idle-slot check and batch-request timeout check. The periodic status log is only
     * scheduled when debug logging is enabled at start time.
     *
     * @param jobMasterId id of the job master, sent with every slot request
     * @param newJobManagerAddress address of the job manager, sent with every slot request
     * @param componentMainThreadExecutor executor on which the pool's scheduled work runs
     * @throws Exception declared by the interface; not thrown by the code in this method itself
     */
    @Override
    public void start(@Nonnull JobMasterId jobMasterId, @Nonnull String newJobManagerAddress, @Nonnull ComponentMainThreadExecutor componentMainThreadExecutor) throws Exception {
        this.jobMasterId = jobMasterId;
        this.jobManagerAddress = newJobManagerAddress;
        this.componentMainThreadExecutor = componentMainThreadExecutor;

        this.scheduleRunAsync(this::checkIdleSlot, this.idleSlotTimeout);
        this.scheduleRunAsync(this::checkBatchSlotTimeout, this.batchSlotTimeout);

        if (this.log.isDebugEnabled()) {
            // use the declared interval constant rather than repeating its value
            this.scheduleRunAsync(this::scheduledLogStatus, STATUS_LOG_INTERVAL_MS, TimeUnit.MILLISECONDS);
        }
    }

    /**
     * Asks the connected ResourceManager to cancel every slot request that has been sent to it.
     * Does nothing while no ResourceManager is connected.
     */
    private void cancelPendingSlotRequests() {
        final ResourceManagerGateway gateway = resourceManagerGateway;
        if (gateway == null) {
            return;
        }
        for (AllocationID pendingAllocationId : pendingRequests.keySetB()) {
            gateway.cancelSlotRequest(pendingAllocationId);
        }
    }

    /**
     * Stops the pool: cancels the outstanding requests at the ResourceManager, releases the slots of
     * every registered TaskManager and finally drops all bookkeeping.
     */
    @Override
    public void close() {
        log.info("Stopping SlotPool.");
        cancelPendingSlotRequests();

        for (ResourceID taskManagerId : registeredTaskManagers) {
            releaseTaskManagerInternal(
                    taskManagerId,
                    new FlinkException("Releasing TaskManager " + taskManagerId + ", because of stopping of SlotPool"));
        }

        clear();
    }

    /**
     * Connects the pool to the given ResourceManager and sends it every request that was stashed
     * while no ResourceManager was available, in the order the requests were stashed.
     *
     * <p>The stashed requests are copied and the stash is emptied <em>before</em> the first request
     * is sent. Sending a request registers it in {@code pendingRequests}; if the request then fails
     * right away (the failure callback may run in the calling thread when the ResourceManager's
     * response is already completed), {@code removePendingRequest} is invoked. With the request
     * still sitting in {@code waitingForResourceManager} that call would modify the map during
     * iteration and leave the request behind in {@code pendingRequests}.
     *
     * @param resourceManagerGateway gateway of the ResourceManager to request slots from; must not be null
     */
    @Override
    public void connectToResourceManager(@Nonnull ResourceManagerGateway resourceManagerGateway) {
        this.resourceManagerGateway = Preconditions.checkNotNull(resourceManagerGateway);

        // snapshot + clear first, so a request is never tracked in both maps at once
        final List<PendingRequest> stashedRequests = new ArrayList<>(this.waitingForResourceManager.values());
        this.waitingForResourceManager.clear();

        for (PendingRequest pendingRequest : stashedRequests) {
            this.requestSlotFromResourceManager(resourceManagerGateway, pendingRequest);
        }
    }

    /**
     * Forgets the current ResourceManager. Requests issued afterwards are stashed until
     * {@code connectToResourceManager} is called again.
     */
    @Override
    public void disconnectResourceManager() {
        resourceManagerGateway = null;
    }

    /**
     * Routes a new request: sent to the ResourceManager when one is connected, stashed otherwise.
     *
     * @param pendingRequest request to route
     * @return the future of the request, completed once a slot is assigned or the request fails
     */
    @Nonnull
    private CompletableFuture<AllocatedSlot> requestNewAllocatedSlotInternal(PendingRequest pendingRequest) {
        if (resourceManagerGateway != null) {
            requestSlotFromResourceManager(resourceManagerGateway, pendingRequest);
        } else {
            stashRequestWaitingForResourceManager(pendingRequest);
        }
        return pendingRequest.getAllocatedSlotFuture();
    }

    /**
     * Sends the request to the ResourceManager under a freshly generated allocation id and tracks
     * it in {@code pendingRequests}.
     *
     * <p>Two callbacks are installed: when the request's own future fails, the allocation id the
     * request carries at that moment is cancelled at the ResourceManager; when the ResourceManager
     * call itself fails, {@code slotRequestToResourceManagerFailed} is invoked.
     *
     * @param resourceManagerGateway gateway to send the request to; must not be null
     * @param pendingRequest request to send; must not be null
     */
    private void requestSlotFromResourceManager(ResourceManagerGateway resourceManagerGateway, PendingRequest pendingRequest) {
        Preconditions.checkNotNull(resourceManagerGateway);
        Preconditions.checkNotNull(pendingRequest);

        final AllocationID allocationId = new AllocationID();
        pendingRequest.setAllocationId(allocationId);
        pendingRequests.put(pendingRequest.getSlotRequestId(), allocationId, pendingRequest);

        pendingRequest.getAllocatedSlotFuture().whenComplete((ignoredSlot, slotFailure) -> {
            if (slotFailure == null) {
                return;
            }
            // read the allocation id at completion time, it can differ from the one created above
            final Optional<AllocationID> currentAllocationId = pendingRequest.getAllocationId();
            if (currentAllocationId.isPresent()) {
                resourceManagerGateway.cancelSlotRequest(currentAllocationId.get());
            }
        });

        log.info(
                "Requesting new slot [{}] and profile {} with allocation id {} from resource manager.",
                pendingRequest.getSlotRequestId(),
                pendingRequest.getResourceProfile(),
                allocationId);

        final CompletableFuture<Acknowledge> rmResponse = resourceManagerGateway.requestSlot(
                jobMasterId,
                new SlotRequest(jobId, allocationId, pendingRequest.getResourceProfile(), jobManagerAddress),
                rpcTimeout);

        FutureUtils.whenCompleteAsyncIfNotDone(rmResponse, componentMainThreadExecutor, (ignoredAck, rpcFailure) -> {
            if (rpcFailure != null) {
                slotRequestToResourceManagerFailed(pendingRequest.getSlotRequestId(), rpcFailure);
            }
        });
    }

    /**
     * Handles a failed request call to the ResourceManager.
     *
     * <p>Requests that are no longer tracked are only logged. Batch requests survive failures that
     * {@code isBatchRequestAndFailureCanBeIgnored} accepts. Every other request is removed and its
     * future is failed with a {@code NoResourceAvailableException} wrapping the failure.
     *
     * @param slotRequestID id of the request whose ResourceManager call failed
     * @param failure the failure reported for the call
     */
    private void slotRequestToResourceManagerFailed(SlotRequestId slotRequestID, Throwable failure) {
        final PendingRequest failedRequest = pendingRequests.getValueByKeyA(slotRequestID);

        if (failedRequest == null) {
            if (log.isDebugEnabled()) {
                log.debug("Unregistered slot request [{}] failed.", slotRequestID, failure);
            }
            return;
        }

        if (isBatchRequestAndFailureCanBeIgnored(failedRequest, failure)) {
            log.debug("Ignoring failed request to the resource manager for a batch slot request.");
            return;
        }

        removePendingRequest(slotRequestID);
        failedRequest.getAllocatedSlotFuture().completeExceptionally(
                new NoResourceAvailableException("No pooled slot available and request to ResourceManager for new slot failed", failure));
    }

    /**
     * Parks the request until a ResourceManager connects.
     *
     * @param pendingRequest request that cannot be sent right now
     */
    private void stashRequestWaitingForResourceManager(PendingRequest pendingRequest) {
        final SlotRequestId stashedId = pendingRequest.getSlotRequestId();
        log.info("Cannot serve slot request, no ResourceManager connected. Adding as pending request [{}]", stashedId);
        waitingForResourceManager.put(stashedId, pendingRequest);
    }

    /**
     * Tells whether a failure may be ignored for the given request.
     *
     * @param request request the failure was reported for
     * @param failure the reported failure
     * @return {@code true} only for batch requests whose failure chain contains no
     *     {@code UnfulfillableSlotRequestException}
     */
    private boolean isBatchRequestAndFailureCanBeIgnored(PendingRequest request, Throwable failure) {
        if (!request.isBatchRequest()) {
            return false;
        }
        final boolean causedByUnfulfillableRequest =
                ExceptionUtils.findThrowable(failure, UnfulfillableSlotRequestException.class).isPresent();
        return !causedByUnfulfillableRequest;
    }

    /**
     * Releases whatever is registered under the given slot request id. Must be called from the
     * main thread.
     *
     * @param slotRequestId id of the request to release
     * @param cause reason of the release, may be {@code null}
     */
    @Override
    public void releaseSlot(@Nonnull SlotRequestId slotRequestId, @Nullable Throwable cause) {
        componentMainThreadExecutor.assertRunningInMainThread();

        final String reason = cause == null ? "null" : cause.getMessage();
        log.debug("Releasing slot [{}] because: {}", slotRequestId, reason);

        releaseSingleSlot(slotRequestId, cause);
    }

    /**
     * Assigns the available slot with the given allocation id to the given request. Must be called
     * from the main thread.
     *
     * @param slotRequestId id of the request that takes the slot
     * @param allocationID allocation id of the wanted slot
     * @param requirementProfile not evaluated by this implementation
     * @return the slot, or an empty optional when no available slot has that allocation id
     */
    @Override
    public Optional<PhysicalSlot> allocateAvailableSlot(@Nonnull SlotRequestId slotRequestId, @Nonnull AllocationID allocationID, @Nonnull ResourceProfile requirementProfile) {
        componentMainThreadExecutor.assertRunningInMainThread();

        final AllocatedSlot takenSlot = availableSlots.tryRemove(allocationID);
        if (takenSlot == null) {
            return Optional.empty();
        }

        allocatedSlots.add(slotRequestId, takenSlot);
        return Optional.of(takenSlot);
    }

    /**
     * Requests a new slot for a streaming (non-batch) request. Must be called from the main thread.
     *
     * <p>When a timeout is given, the request's future is failed after that time and the request is
     * removed from the pool via {@code timeoutPendingSlotRequest}.
     *
     * @param slotRequestId id of the new request
     * @param resourceProfile resources the slot has to provide
     * @param timeout maximum time to wait for the slot, or {@code null} for no timeout
     * @return future that is completed with the assigned slot
     */
    @Override
    @Nonnull
    public CompletableFuture<PhysicalSlot> requestNewAllocatedSlot(@Nonnull SlotRequestId slotRequestId, @Nonnull ResourceProfile resourceProfile, @Nullable Time timeout) {
        componentMainThreadExecutor.assertRunningInMainThread();

        final PendingRequest streamingRequest = PendingRequest.createStreamingRequest(slotRequestId, resourceProfile);

        if (timeout != null) {
            FutureUtils
                    .orTimeout(
                            streamingRequest.getAllocatedSlotFuture(),
                            timeout.toMilliseconds(),
                            TimeUnit.MILLISECONDS,
                            componentMainThreadExecutor)
                    .whenComplete((ignored, failure) -> {
                        if (failure instanceof TimeoutException) {
                            timeoutPendingSlotRequest(slotRequestId);
                        }
                    });
        }

        return requestNewAllocatedSlotInternal(streamingRequest).thenApply(Function.identity());
    }

    /**
     * Requests a new slot for a batch request. Must be called from the main thread. No per-request
     * timeout is installed here; batch requests are covered by {@code checkBatchSlotTimeout}.
     *
     * @param slotRequestId id of the new request
     * @param resourceProfile resources the slot has to provide
     * @return future that is completed with the assigned slot
     */
    @Override
    @Nonnull
    public CompletableFuture<PhysicalSlot> requestNewAllocatedBatchSlot(@Nonnull SlotRequestId slotRequestId, @Nonnull ResourceProfile resourceProfile) {
        componentMainThreadExecutor.assertRunningInMainThread();

        final PendingRequest batchRequest = PendingRequest.createBatchRequest(slotRequestId, resourceProfile);
        return requestNewAllocatedSlotInternal(batchRequest).thenApply(Function.identity());
    }

    /**
     * Switches the periodic batch-request timeout check off. The next run of
     * {@code checkBatchSlotTimeout} returns immediately and does not reschedule itself.
     */
    @Override
    public void disableBatchSlotRequestTimeoutCheck() {
        batchSlotRequestTimeoutCheckEnabled = false;
    }

    /**
     * Lists all available slots together with the utilization of the TaskManager they live on.
     *
     * <p>The utilization of a TaskManager is the number of its assigned slots divided by the number
     * of its assigned plus available slots, as known to this pool.
     *
     * @return one entry per available slot
     */
    @Override
    @Nonnull
    public Collection<SlotInfoWithUtilization> getAvailableSlotsInformation() {
        final Map<ResourceID, Set<AllocatedSlot>> availableSlotsByTaskManager = this.availableSlots.getSlotsByTaskManager();
        final Map<ResourceID, Set<AllocatedSlot>> allocatedSlotsByTaskManager = this.allocatedSlots.getSlotsByTaskManager();

        return availableSlotsByTaskManager.entrySet().stream()
                .flatMap(entry -> {
                    // keep the element type instead of going through a raw Set
                    final Set<AllocatedSlot> freeSlots = entry.getValue();
                    final int numberAllocatedSlots = allocatedSlotsByTaskManager
                            .getOrDefault(entry.getKey(), Collections.emptySet())
                            .size();
                    final int numberAvailableSlots = freeSlots.size();
                    final double taskExecutorUtilization =
                            (double) numberAllocatedSlots / (numberAllocatedSlots + numberAvailableSlots);

                    return freeSlots.stream().map(slot -> SlotInfoWithUtilization.from(slot, taskExecutorUtilization));
                })
                .collect(Collectors.toList());
    }

    /**
     * Releases the pending request or the assigned slot registered under the given id.
     *
     * <p>A pending request is failed. An assigned slot gets its payload released and is then handed
     * to another matching request or put back into the available slots. Unknown ids are ignored.
     *
     * @param slotRequestId id to release
     * @param cause reason of the release, may be {@code null}
     */
    private void releaseSingleSlot(SlotRequestId slotRequestId, Throwable cause) {
        final PendingRequest releasedRequest = removePendingRequest(slotRequestId);
        if (releasedRequest != null) {
            failPendingRequest(
                    releasedRequest,
                    new FlinkException("Pending slot request with " + slotRequestId + " has been released.", cause));
            return;
        }

        final AllocatedSlot releasedSlot = allocatedSlots.remove(slotRequestId);
        if (releasedSlot == null) {
            log.debug("There is no allocated slot [{}]. Ignoring the release slot request.", slotRequestId);
            return;
        }

        releasedSlot.releasePayload(cause);
        tryFulfillSlotRequestOrMakeAvailable(releasedSlot);
    }

    /**
     * Removes the request with the given id from wherever it is tracked: first the stash of
     * requests waiting for a ResourceManager, then the requests already sent to it.
     *
     * @param requestId id of the request to remove
     * @return the removed request, or {@code null} when the id is unknown
     */
    @Nullable
    private PendingRequest removePendingRequest(SlotRequestId requestId) {
        final PendingRequest stashedRequest = waitingForResourceManager.remove(requestId);
        if (stashedRequest == null) {
            return pendingRequests.removeKeyA(requestId);
        }

        assert (!pendingRequests.containsKeyA(requestId)) : "A pending requests should only be part of either the pendingRequests or waitingForResourceManager but not both.";
        return stashedRequest;
    }

    /**
     * Fails the future of the given request unless it is already done.
     *
     * @param pendingRequest request to fail; must not be null
     * @param e failure to complete the future with; must not be null
     */
    private void failPendingRequest(PendingRequest pendingRequest, Exception e) {
        Preconditions.checkNotNull(pendingRequest);
        Preconditions.checkNotNull(e);

        final CompletableFuture<AllocatedSlot> slotFuture = pendingRequest.getAllocatedSlotFuture();
        if (slotFuture.isDone()) {
            return;
        }

        log.info("Failing pending slot request [{}]: {}", pendingRequest.getSlotRequestId(), e.getMessage());
        slotFuture.completeExceptionally(e);
    }

    /**
     * Hands an unused slot to the first matching outstanding request, or stores it as available
     * (stamped with the current relative time) when no request matches.
     *
     * <p>When the fulfilled request was sent to the ResourceManager under a different allocation
     * id than the one of the slot, {@code maybeRemapOrphanedAllocation} reconciles the two.
     *
     * @param allocatedSlot slot to place; must not be in use
     * @throws IllegalStateException if the slot is still in use
     */
    private void tryFulfillSlotRequestOrMakeAvailable(AllocatedSlot allocatedSlot) {
        Preconditions.checkState(!allocatedSlot.isUsed(), "Provided slot is still in use.");

        final PendingRequest matchingRequest = findMatchingPendingRequest(allocatedSlot);
        if (matchingRequest == null) {
            log.debug("Adding slot [{}] to available slots", allocatedSlot.getAllocationId());
            availableSlots.add(allocatedSlot, clock.relativeTimeMillis());
            return;
        }

        log.debug("Fulfilling pending slot request [{}] with slot [{}]", matchingRequest.getSlotRequestId(), allocatedSlot.getAllocationId());
        removePendingRequest(matchingRequest.getSlotRequestId());
        allocatedSlots.add(matchingRequest.getSlotRequestId(), allocatedSlot);
        matchingRequest.getAllocatedSlotFuture().complete(allocatedSlot);

        final Optional<AllocationID> requestedAllocationId = matchingRequest.getAllocationId();
        if (requestedAllocationId.isPresent()) {
            maybeRemapOrphanedAllocation(requestedAllocationId.get(), allocatedSlot.getAllocationId());
        }
    }

    /**
     * Finds the first outstanding request the slot's resources match. Requests already sent to the
     * ResourceManager are considered before the stashed ones, each group in iteration order.
     *
     * @param slot slot to find a request for
     * @return the first matching request, or {@code null} when none matches
     */
    private PendingRequest findMatchingPendingRequest(AllocatedSlot slot) {
        final ResourceProfile offeredResources = slot.getResourceProfile();
        return Stream
                .concat(pendingRequests.values().stream(), waitingForResourceManager.values().stream())
                .filter(candidate -> offeredResources.isMatching(candidate.getResourceProfile()))
                .findFirst()
                .orElse(null);
    }

    /**
     * Reconciles the bookkeeping after a request was fulfilled by a slot with a different
     * allocation id than the one the request was sent under.
     *
     * <p>If another pending request is waiting for the slot's allocation id, that request takes
     * over the now unused allocation id of the fulfilled request. Otherwise the unused allocation
     * id is cancelled at the ResourceManager, if one is connected.
     *
     * @param allocationIdOfRequest allocation id the fulfilled request was sent under
     * @param allocationIdOfSlot allocation id of the slot that fulfilled it
     */
    private void maybeRemapOrphanedAllocation(AllocationID allocationIdOfRequest, AllocationID allocationIdOfSlot) {
        if (allocationIdOfRequest.equals(allocationIdOfSlot)) {
            return;
        }

        final PendingRequest orphanedRequest = pendingRequests.getValueByKeyB(allocationIdOfSlot);
        if (orphanedRequest != null) {
            orphanedRequest.setAllocationId(allocationIdOfRequest);
            pendingRequests.put(orphanedRequest.getSlotRequestId(), allocationIdOfRequest, orphanedRequest);
            return;
        }

        if (resourceManagerGateway != null) {
            resourceManagerGateway.cancelSlotRequest(allocationIdOfRequest);
        }
    }

    /**
     * Offers several slots of one TaskManager to the pool.
     *
     * @param taskManagerLocation location of the offering TaskManager
     * @param taskManagerGateway gateway to talk to the offering TaskManager
     * @param offers slots being offered
     * @return the offers that were accepted, in the order they were offered
     */
    @Override
    public Collection<SlotOffer> offerSlots(TaskManagerLocation taskManagerLocation, TaskManagerGateway taskManagerGateway, Collection<SlotOffer> offers) {
        final ArrayList<SlotOffer> acceptedOffers = new ArrayList<>(offers.size());
        for (SlotOffer candidate : offers) {
            if (offerSlot(taskManagerLocation, taskManagerGateway, candidate)) {
                acceptedOffers.add(candidate);
            }
        }
        return acceptedOffers;
    }

    /**
     * Offers a single slot to the pool. Must be called from the main thread.
     *
     * <p>Offers from unregistered TaskManagers are rejected. An offer whose allocation id is
     * already known is accepted only when it refers to the very same slot (a repeated offer);
     * nothing changes in that case. Any other offer is accepted and the new slot is handed to a
     * matching request or stored as available.
     *
     * @param taskManagerLocation location of the offering TaskManager
     * @param taskManagerGateway gateway to talk to the offering TaskManager
     * @param slotOffer the offered slot
     * @return {@code true} when the offer is accepted
     */
    boolean offerSlot(TaskManagerLocation taskManagerLocation, TaskManagerGateway taskManagerGateway, SlotOffer slotOffer) {
        componentMainThreadExecutor.assertRunningInMainThread();

        final ResourceID resourceID = taskManagerLocation.getResourceID();
        final AllocationID allocationID = slotOffer.getAllocationId();

        if (!registeredTaskManagers.contains(resourceID)) {
            log.debug("Received outdated slot offering [{}] from unregistered TaskManager: {}", slotOffer.getAllocationId(), taskManagerLocation);
            return false;
        }

        AllocatedSlot knownSlot = allocatedSlots.get(allocationID);
        if (knownSlot == null) {
            knownSlot = availableSlots.get(allocationID);
        }

        if (knownSlot != null) {
            final SlotID knownSlotId = knownSlot.getSlotId();
            final SlotID offeredSlotId = new SlotID(taskManagerLocation.getResourceID(), slotOffer.getSlotIndex());
            if (!knownSlotId.equals(offeredSlotId)) {
                // same allocation id, different slot: reject the offer
                return false;
            }
            log.info("Received repeated offer for slot [{}]. Ignoring.", allocationID);
            return true;
        }

        final AllocatedSlot newSlot = new AllocatedSlot(
                allocationID,
                taskManagerLocation,
                slotOffer.getSlotIndex(),
                slotOffer.getResourceProfile(),
                taskManagerGateway);
        tryFulfillSlotRequestOrMakeAvailable(newSlot);
        return true;
    }

    /**
     * Fails the allocation with the given id. Must be called from the main thread.
     *
     * <p>If the id belongs to a pending request, that request is failed (unless it is a batch
     * request whose failure can be ignored) and an empty optional is returned. Otherwise the slot
     * with that allocation id, if any, is failed.
     *
     * @param allocationID id of the allocation to fail
     * @param cause reason of the failure
     * @return the id of the TaskManager that holds no more slots of this pool after the slot was
     *     removed, or an empty optional
     */
    @Override
    public Optional<ResourceID> failAllocation(AllocationID allocationID, Exception cause) {
        componentMainThreadExecutor.assertRunningInMainThread();

        final PendingRequest affectedRequest = pendingRequests.getValueByKeyB(allocationID);
        if (affectedRequest == null) {
            return tryFailingAllocatedSlot(allocationID, cause);
        }

        if (isBatchRequestAndFailureCanBeIgnored(affectedRequest, cause)) {
            log.debug("Ignoring allocation failure for batch slot request {}.", affectedRequest.getSlotRequestId());
        } else {
            removePendingRequest(affectedRequest.getSlotRequestId());
            failPendingRequest(affectedRequest, cause);
        }
        return Optional.empty();
    }

    /**
     * Variant of {@code failAllocation(AllocationID, Exception)} that additionally receives a
     * TaskManager id. This implementation does not use the id and simply delegates.
     *
     * @param resourceId ignored
     * @param allocationID id of the allocation to fail
     * @param cause reason of the failure
     * @return result of the delegate call
     */
    @Override
    public Optional<ResourceID> failAllocation(@Nullable ResourceID resourceId, AllocationID allocationID, Exception cause) {
        final Optional<ResourceID> emptiedTaskManager = failAllocation(allocationID, cause);
        return emptiedTaskManager;
    }

    /**
     * Removes the slot with the given allocation id (available slots are searched first, then the
     * assigned ones), asks its TaskManager to free it and releases its payload.
     *
     * @param allocationID allocation id of the slot to fail
     * @param cause reason of the failure
     * @return the TaskManager id when a slot was removed and that TaskManager holds no further
     *     slot of this pool; an empty optional otherwise
     */
    private Optional<ResourceID> tryFailingAllocatedSlot(AllocationID allocationID, Exception cause) {
        AllocatedSlot failedSlot = availableSlots.tryRemove(allocationID);
        if (failedSlot == null) {
            failedSlot = allocatedSlots.remove(allocationID);
        }
        if (failedSlot == null) {
            return Optional.empty();
        }

        log.debug("Failed allocated slot [{}]: {}", allocationID, cause.getMessage());
        failedSlot.getTaskManagerGateway().freeSlot(allocationID, cause, rpcTimeout);
        failedSlot.releasePayload(cause);

        final ResourceID taskManagerId = failedSlot.getTaskManagerId();
        if (availableSlots.containsTaskManager(taskManagerId) || allocatedSlots.containResource(taskManagerId)) {
            return Optional.empty();
        }
        return Optional.of(taskManagerId);
    }

    /**
     * Registers a TaskManager so that its slot offers are accepted. Must be called from the main
     * thread.
     *
     * @param resourceID id of the TaskManager
     * @return {@code true} when the TaskManager was not registered before
     */
    @Override
    public boolean registerTaskManager(ResourceID resourceID) {
        componentMainThreadExecutor.assertRunningInMainThread();
        log.debug("Register new TaskExecutor {}.", resourceID.getStringWithMetadata());

        final boolean newlyRegistered = registeredTaskManagers.add(resourceID);
        return newlyRegistered;
    }

    /**
     * Unregisters a TaskManager and releases all of its slots. Must be called from the main thread.
     *
     * @param resourceId id of the TaskManager
     * @param cause reason handed to the released slots
     * @return {@code true} when the TaskManager was registered, {@code false} otherwise (nothing is
     *     released in that case)
     */
    @Override
    public boolean releaseTaskManager(ResourceID resourceId, Exception cause) {
        componentMainThreadExecutor.assertRunningInMainThread();

        if (!registeredTaskManagers.remove(resourceId)) {
            return false;
        }
        releaseTaskManagerInternal(resourceId, cause);
        return true;
    }

    /**
     * Builds a report of every slot of the given TaskManager known to this pool, available slots
     * first, followed by the assigned ones.
     *
     * @param taskManagerId id of the TaskManager to report on
     * @return report carrying the slot number and allocation id of each slot
     */
    @Override
    public AllocatedSlotReport createAllocatedSlotReport(ResourceID taskManagerId) {
        final Set<AllocatedSlot> freeSlots = availableSlots.getSlotsForTaskManager(taskManagerId);
        final Set<AllocatedSlot> assignedSlots = allocatedSlots.getSlotsForTaskManager(taskManagerId);

        final List<AllocatedSlotInfo> slotInfos = Stream
                .concat(freeSlots.stream(), assignedSlots.stream())
                .map(slot -> new AllocatedSlotInfo(slot.getPhysicalSlotNumber(), slot.getAllocationId()))
                .collect(Collectors.toList());

        return new AllocatedSlotReport(jobId, slotInfos);
    }

    /**
     * Removes the request with the given id, if it is still outstanding, and fails its future with
     * a {@code TimeoutException}.
     *
     * @param slotRequestId id of the request that timed out
     */
    @VisibleForTesting
    protected void timeoutPendingSlotRequest(SlotRequestId slotRequestId) {
        log.info("Pending slot request [{}] timed out.", slotRequestId);

        final PendingRequest timedOutRequest = removePendingRequest(slotRequestId);
        if (timedOutRequest == null) {
            return;
        }
        timedOutRequest.getAllocatedSlotFuture()
                .completeExceptionally(new TimeoutException("Pending slot request timed out in SlotPool."));
    }

    /**
     * Removes every slot of the given TaskManager from the pool. The payload of the assigned slots
     * is released, and the TaskManager is asked to free each removed slot.
     *
     * @param resourceId id of the TaskManager
     * @param cause reason handed to payloads and to the TaskManager
     */
    private void releaseTaskManagerInternal(ResourceID resourceId, Exception cause) {
        final Set<AllocatedSlot> removedSlots = new HashSet<>(allocatedSlots.removeSlotsForTaskManager(resourceId));

        // only the formerly assigned slots carry a payload to release
        removedSlots.forEach(assignedSlot -> assignedSlot.releasePayload(cause));

        removedSlots.addAll(availableSlots.removeAllForTaskManager(resourceId));

        for (AllocatedSlot removedSlot : removedSlots) {
            removedSlot.getTaskManagerGateway().freeSlot(removedSlot.getAllocationId(), cause, rpcTimeout);
        }
    }

    /**
     * Releases every available slot that has been unused for longer than the idle timeout and
     * reschedules itself after one idle timeout.
     *
     * <p>Expired slots are collected first and removed afterwards. A failing free-slot call is only
     * logged; the slot stays removed from the pool.
     */
    protected void checkIdleSlot() {
        final long now = clock.relativeTimeMillis();

        final List<AllocatedSlot> expiredSlots = new ArrayList<>(availableSlots.size());
        for (SlotAndTimestamp candidate : availableSlots.availableSlots.values()) {
            if (now - candidate.timestamp() > idleSlotTimeout.toMilliseconds()) {
                expiredSlots.add(candidate.slot());
            }
        }

        final FlinkException cause = new FlinkException("Releasing idle slot.");
        for (AllocatedSlot expiredSlot : expiredSlots) {
            final AllocationID allocationID = expiredSlot.getAllocationId();
            if (availableSlots.tryRemove(allocationID) != null) {
                log.info("Releasing idle slot [{}].", allocationID);
                final CompletableFuture<Acknowledge> freeSlotFuture =
                        expiredSlot.getTaskManagerGateway().freeSlot(allocationID, cause, rpcTimeout);
                FutureUtils.whenCompleteAsyncIfNotDone(freeSlotFuture, componentMainThreadExecutor, (ignored, failure) -> {
                    if (failure != null) {
                        log.debug(
                                "Releasing slot [{}] of registered TaskExecutor {} failed. Discarding slot.",
                                allocationID,
                                expiredSlot.getTaskManagerId(),
                                failure);
                    }
                });
            }
        }

        scheduleRunAsync(this::checkIdleSlot, idleSlotTimeout);
    }

    /**
     * Times out batch requests that no slot currently held by the pool could fulfil for at least
     * the batch slot timeout, then reschedules itself after one batch slot timeout.
     *
     * <p>Returns immediately, without rescheduling, when the check has been disabled.
     */
    protected void checkBatchSlotTimeout() {
        if (!batchSlotRequestTimeoutCheckEnabled) {
            return;
        }

        final Collection<PendingRequest> pendingBatchRequests = getPendingBatchRequests();
        if (!pendingBatchRequests.isEmpty()) {
            final Predicate<PendingRequest> canBeFulfilled =
                    canBeFulfilledWithAllocatedSlot(getAllocatedResourceProfiles());

            // split first, act afterwards
            final List<PendingRequest> fulfillableRequests = new ArrayList<>();
            final List<PendingRequest> unfulfillableRequests = new ArrayList<>();
            for (PendingRequest batchRequest : pendingBatchRequests) {
                if (canBeFulfilled.test(batchRequest)) {
                    fulfillableRequests.add(batchRequest);
                } else {
                    unfulfillableRequests.add(batchRequest);
                }
            }

            final long currentTimestamp = clock.relativeTimeMillis();

            fulfillableRequests.forEach(PendingRequest::markFulfillable);

            for (PendingRequest unfulfillableRequest : unfulfillableRequests) {
                unfulfillableRequest.markUnfulfillable(currentTimestamp);
                final long deadline = unfulfillableRequest.getUnfulfillableSince() + batchSlotTimeout.toMilliseconds();
                if (deadline <= currentTimestamp) {
                    timeoutPendingSlotRequest(unfulfillableRequest.getSlotRequestId());
                }
            }
        }

        scheduleRunAsync(this::checkBatchSlotTimeout, batchSlotTimeout);
    }

    /**
     * Collects the distinct resource profiles of all slots held by the pool, available and
     * assigned alike.
     *
     * @return set of resource profiles
     */
    private Set<ResourceProfile> getAllocatedResourceProfiles() {
        final Set<ResourceProfile> profiles = new HashSet<>();
        for (SlotInfo availableSlot : getAvailableSlotsInformation()) {
            profiles.add(availableSlot.getResourceProfile());
        }
        for (SlotInfo assignedSlot : getAllocatedSlotsInformation()) {
            profiles.add(assignedSlot.getResourceProfile());
        }
        return profiles;
    }

    /**
     * Collects all outstanding batch requests: those sent to the ResourceManager first, then the
     * stashed ones.
     *
     * @return the outstanding batch requests
     */
    private Collection<PendingRequest> getPendingBatchRequests() {
        final List<PendingRequest> batchRequests = new ArrayList<>();
        for (PendingRequest sentRequest : pendingRequests.values()) {
            if (sentRequest.isBatchRequest()) {
                batchRequests.add(sentRequest);
            }
        }
        for (PendingRequest stashedRequest : waitingForResourceManager.values()) {
            if (stashedRequest.isBatchRequest()) {
                batchRequests.add(stashedRequest);
            }
        }
        return batchRequests;
    }

    /**
     * Creates a predicate telling whether a request could be served by a slot with one of the
     * given resource profiles.
     *
     * @param allocatedResourceProfiles profiles of the slots to test against
     * @return predicate that is {@code true} when at least one profile matches the request's profile
     */
    private Predicate<PendingRequest> canBeFulfilledWithAllocatedSlot(Set<ResourceProfile> allocatedResourceProfiles) {
        return pendingRequest -> allocatedResourceProfiles.stream()
                .anyMatch(heldProfile -> heldProfile.isMatching(pendingRequest.getResourceProfile()));
    }

    /** Drops all bookkeeping: slots, outstanding requests and registered TaskManagers. */
    private void clear() {
        availableSlots.clear();
        allocatedSlots.clear();
        pendingRequests.clear();
        waitingForResourceManager.clear();
        registeredTaskManagers.clear();
    }

    /**
     * Writes the pool status to the debug log and reschedules itself after the status log
     * interval.
     */
    private void scheduledLogStatus() {
        this.log.debug(this.printStatus());
        // use the declared interval constant rather than repeating its value
        this.scheduleRunAsync(this::scheduledLogStatus, STATUS_LOG_INTERVAL_MS, TimeUnit.MILLISECONDS);
    }

    /**
     * Renders a multi-line, human-readable summary of the pool: connection state, registered
     * TaskManagers, available and assigned slots and the requests sent to the ResourceManager.
     *
     * <p>The summary no longer ends with a lone closing brace; no opening brace is ever written, so
     * the trailing one only left the output unbalanced.
     *
     * @return the status text, each entry on its own tab-indented line
     */
    private String printStatus() {
        final StringBuilder builder = new StringBuilder(1024).append("Slot Pool Status:\n");

        builder.append("\tstatus: ");
        if (this.resourceManagerGateway != null) {
            builder.append("connected to ").append(this.resourceManagerGateway.getAddress()).append('\n');
        } else {
            builder.append("unconnected and waiting for ResourceManager ").append(this.waitingForResourceManager).append('\n');
        }

        builder.append("\tregistered TaskManagers: ").append(this.registeredTaskManagers).append('\n');
        builder.append("\tavailable slots: ").append(this.availableSlots.printAllSlots()).append('\n');
        builder.append("\tallocated slots: ").append(this.allocatedSlots.printAllSlots()).append('\n');
        builder.append("\tpending requests: ").append(this.pendingRequests.values()).append('\n');
        return builder.toString();
    }

    /**
     * Hands the runnable to the main thread executor.
     *
     * @param runnable work to execute
     */
    protected void runAsync(Runnable runnable) {
        componentMainThreadExecutor.execute(runnable);
    }

    /**
     * Schedules the runnable on the main thread executor after the given delay.
     *
     * @param runnable work to execute
     * @param delay delay before execution
     */
    protected void scheduleRunAsync(Runnable runnable, Time delay) {
        final long delaySize = delay.getSize();
        final TimeUnit delayUnit = delay.getUnit();
        scheduleRunAsync(runnable, delaySize, delayUnit);
    }

    /**
     * Schedules the runnable on the main thread executor after the given delay.
     *
     * @param runnable work to execute
     * @param delay amount of time to wait
     * @param unit unit of {@code delay}
     */
    protected void scheduleRunAsync(Runnable runnable, long delay, TimeUnit unit) {
        componentMainThreadExecutor.schedule(runnable, delay, unit);
    }

    /**
     * Immutable pair of a slot and a timestamp. {@code checkIdleSlot} compares the timestamp with
     * the clock's relative time to detect idle slots.
     */
    private static class SlotAndTimestamp {
        private final AllocatedSlot slot;
        private final long timestamp;

        /**
         * @param slot the slot
         * @param timestamp the timestamp associated with the slot
         */
        SlotAndTimestamp(AllocatedSlot slot, long timestamp) {
            this.slot = slot;
            this.timestamp = timestamp;
        }

        /** @return the slot */
        public AllocatedSlot slot() {
            return slot;
        }

        /** @return the timestamp associated with the slot */
        public long timestamp() {
            return timestamp;
        }

        /** Renders the pair as {@code <slot> @ <timestamp>}. */
        @Override
        public String toString() {
            return String.valueOf(slot) + " @ " + timestamp;
        }
    }

    /**
     * A slot request that has not been fulfilled yet.
     *
     * <p>Besides the immutable request data (request id, resource profile, batch flag and the
     * future that will carry the slot), it tracks two mutable pieces of state: an optional
     * {@link AllocationID} and the timestamp since which the request has been unfulfillable.
     * {@code Long.MAX_VALUE} in {@code unfillableSince} is the sentinel for "currently
     * fulfillable".
     */
    protected static class PendingRequest {
        private final SlotRequestId slotRequestId;
        private final ResourceProfile resourceProfile;
        private final boolean isBatchRequest;
        private final CompletableFuture<AllocatedSlot> allocatedSlotFuture;
        @Nullable
        private AllocationID allocationId;
        private long unfillableSince;

        /** Creates a request backed by a fresh, incomplete future. */
        private PendingRequest(
                final SlotRequestId slotRequestId,
                final ResourceProfile resourceProfile,
                final boolean isBatchRequest) {
            this(slotRequestId, resourceProfile, isBatchRequest, new CompletableFuture<>());
        }

        /**
         * Creates a request; all reference arguments must be non-null. The request starts out in
         * the "fulfillable" state.
         */
        private PendingRequest(
                final SlotRequestId slotRequestId,
                final ResourceProfile resourceProfile,
                final boolean isBatchRequest,
                final CompletableFuture<AllocatedSlot> allocatedSlotFuture) {
            this.slotRequestId = Preconditions.checkNotNull(slotRequestId);
            this.resourceProfile = Preconditions.checkNotNull(resourceProfile);
            this.isBatchRequest = isBatchRequest;
            this.allocatedSlotFuture = Preconditions.checkNotNull(allocatedSlotFuture);
            this.unfillableSince = Long.MAX_VALUE;
        }

        /** Factory for a request whose batch flag is {@code false}. */
        static PendingRequest createStreamingRequest(SlotRequestId slotRequestId, ResourceProfile resourceProfile) {
            return new PendingRequest(slotRequestId, resourceProfile, false);
        }

        /** Factory for a request whose batch flag is {@code true}. */
        static PendingRequest createBatchRequest(SlotRequestId slotRequestId, ResourceProfile resourceProfile) {
            return new PendingRequest(slotRequestId, resourceProfile, true);
        }

        public SlotRequestId getSlotRequestId() {
            return slotRequestId;
        }

        public CompletableFuture<AllocatedSlot> getAllocatedSlotFuture() {
            return allocatedSlotFuture;
        }

        public boolean isBatchRequest() {
            return isBatchRequest;
        }

        public ResourceProfile getResourceProfile() {
            return resourceProfile;
        }

        /** Prints the request id, resource profile and slot future. */
        @Override
        public String toString() {
            return "PendingRequest{slotRequestId=" + slotRequestId
                    + ", resourceProfile=" + resourceProfile
                    + ", allocatedSlotFuture=" + allocatedSlotFuture
                    + '}';
        }

        /** Resets the request to the "fulfillable" state. */
        void markFulfillable() {
            unfillableSince = Long.MAX_VALUE;
        }

        /**
         * Records {@code currentTimestamp} as the start of the unfulfillable period, unless the
         * request is already marked unfulfillable (the earlier timestamp is then kept).
         */
        void markUnfulfillable(long currentTimestamp) {
            if (!isFulfillable()) {
                return;
            }
            unfillableSince = currentTimestamp;
        }

        private boolean isFulfillable() {
            return Long.MAX_VALUE == unfillableSince;
        }

        /** Returns the recorded timestamp, or {@code Long.MAX_VALUE} while fulfillable. */
        long getUnfulfillableSince() {
            return unfillableSince;
        }

        void setAllocationId(AllocationID allocationId) {
            this.allocationId = allocationId;
        }

        /** Returns the allocation id if one has been set, otherwise an empty optional. */
        Optional<AllocationID> getAllocationId() {
            return Optional.ofNullable(allocationId);
        }
    }

    /**
     * Index of available slots, kept in three views that are updated together:
     * by allocation id (with the timestamp given on insertion), by task manager
     * {@link ResourceID}, and by the task manager's FQDN host name.
     *
     * <p>Empty per-task-manager and per-host sets are removed from their maps as soon as their
     * last slot is taken out.
     */
    protected static class AvailableSlots {
        private final HashMap<ResourceID, Set<AllocatedSlot>> availableSlotsByTaskManager = new HashMap<>();
        private final HashMap<String, Set<AllocatedSlot>> availableSlotsByHost = new HashMap<>();
        private final HashMap<AllocationID, SlotAndTimestamp> availableSlots = new HashMap<>();

        AvailableSlots() {
        }

        /**
         * Registers a slot in all three views.
         *
         * @param slot the slot to add; must not be null
         * @param timestamp the timestamp value stored alongside the slot
         * @throws IllegalStateException if a slot with the same allocation id is already
         *     contained; in that case the existing entry is left untouched
         */
        void add(AllocatedSlot slot, long timestamp) {
            Preconditions.checkNotNull(slot);
            // putIfAbsent (rather than put) keeps the already-registered entry and its timestamp
            // intact when the duplicate check below fails, so a rejected add has no side effects.
            SlotAndTimestamp previous =
                    this.availableSlots.putIfAbsent(slot.getAllocationId(), new SlotAndTimestamp(slot, timestamp));
            if (previous != null) {
                throw new IllegalStateException("slot already contained");
            }
            ResourceID resourceID = slot.getTaskManagerLocation().getResourceID();
            String host = slot.getTaskManagerLocation().getFQDNHostname();
            this.availableSlotsByTaskManager.computeIfAbsent(resourceID, k -> new HashSet<>()).add(slot);
            this.availableSlotsByHost.computeIfAbsent(host, k -> new HashSet<>()).add(slot);
        }

        /** Returns whether a slot with the given allocation id is contained. */
        boolean contains(AllocationID slotId) {
            return this.availableSlots.containsKey(slotId);
        }

        /** Returns the slot with the given allocation id, or {@code null} if there is none. */
        AllocatedSlot get(AllocationID allocationID) {
            SlotAndTimestamp slotAndTimestamp = this.availableSlots.get(allocationID);
            return slotAndTimestamp != null ? slotAndTimestamp.slot() : null;
        }

        /**
         * Removes every slot registered for the given task manager from all three views.
         *
         * @param taskManager the task manager whose slots are removed
         * @return the removed slots, or an empty set if none were registered
         */
        Set<AllocatedSlot> removeAllForTaskManager(ResourceID taskManager) {
            Set<AllocatedSlot> slotsForTm = this.availableSlotsByTaskManager.remove(taskManager);
            if (slotsForTm == null || slotsForTm.isEmpty()) {
                return Collections.emptySet();
            }
            // The host is taken from an arbitrary slot of the set.
            // NOTE(review): this relies on all slots of one task manager reporting the same host
            // name - confirm against TaskManagerLocation.
            String host = slotsForTm.iterator().next().getTaskManagerLocation().getFQDNHostname();
            Set<AllocatedSlot> slotsForHost = this.availableSlotsByHost.get(host);
            for (AllocatedSlot slot : slotsForTm) {
                this.availableSlots.remove(slot.getAllocationId());
                slotsForHost.remove(slot);
            }
            if (slotsForHost.isEmpty()) {
                this.availableSlotsByHost.remove(host);
            }
            return slotsForTm;
        }

        /**
         * Removes the slot with the given allocation id from all three views, if present.
         *
         * @return the removed slot, or {@code null} if no such slot was contained
         */
        AllocatedSlot tryRemove(AllocationID slotId) {
            SlotAndTimestamp sat = this.availableSlots.remove(slotId);
            if (sat == null) {
                return null;
            }
            AllocatedSlot slot = sat.slot();
            ResourceID resourceID = slot.getTaskManagerLocation().getResourceID();
            String host = slot.getTaskManagerLocation().getFQDNHostname();
            Set<AllocatedSlot> slotsForTm = this.availableSlotsByTaskManager.get(resourceID);
            Set<AllocatedSlot> slotsForHost = this.availableSlotsByHost.get(host);
            slotsForTm.remove(slot);
            slotsForHost.remove(slot);
            // Drop groups that became empty so the secondary maps only hold non-empty sets.
            if (slotsForTm.isEmpty()) {
                this.availableSlotsByTaskManager.remove(resourceID);
            }
            if (slotsForHost.isEmpty()) {
                this.availableSlotsByHost.remove(host);
            }
            return slot;
        }

        /** Returns a new list containing all currently contained slots. */
        @Nonnull
        List<SlotInfo> listSlotInfo() {
            return this.availableSlots.values().stream().map(SlotAndTimestamp::slot).collect(Collectors.toList());
        }

        /**
         * Like {@link #tryRemove(AllocationID)}, but fails if the slot is not contained.
         *
         * @throws IllegalStateException if no slot with the given allocation id is contained
         */
        private void remove(AllocationID slotId) throws IllegalStateException {
            if (this.tryRemove(slotId) == null) {
                throw new IllegalStateException("slot not contained");
            }
        }

        /** Returns the string form of all contained slot/timestamp pairs. */
        String printAllSlots() {
            return this.availableSlots.values().toString();
        }

        @VisibleForTesting
        boolean containsTaskManager(ResourceID resourceID) {
            return this.availableSlotsByTaskManager.containsKey(resourceID);
        }

        @VisibleForTesting
        public int size() {
            return this.availableSlots.size();
        }

        @VisibleForTesting
        void clear() {
            this.availableSlots.clear();
            this.availableSlotsByTaskManager.clear();
            this.availableSlotsByHost.clear();
        }

        /** Returns the slots registered for the given task manager, or an empty set. */
        Set<AllocatedSlot> getSlotsForTaskManager(ResourceID resourceId) {
            return this.availableSlotsByTaskManager.getOrDefault(resourceId, Collections.emptySet());
        }

        /** Returns an unmodifiable view of the by-task-manager map (the value sets are not wrapped). */
        Map<ResourceID, Set<AllocatedSlot>> getSlotsByTaskManager() {
            return Collections.unmodifiableMap(this.availableSlotsByTaskManager);
        }
    }

    /**
     * Bookkeeping for allocated slots. Each slot is reachable by its {@link AllocationID} and by
     * the {@link SlotRequestId} it was added under, and is additionally grouped by the
     * {@link ResourceID} of its task manager. Task manager groups that become empty are dropped.
     */
    static class AllocatedSlots {
        private final Map<ResourceID, Set<AllocatedSlot>> allocatedSlotsByTaskManager = new HashMap<>(16);
        private final DualKeyLinkedMap<AllocationID, SlotRequestId, AllocatedSlot> allocatedSlotsById =
                new DualKeyLinkedMap<>(16);

        AllocatedSlots() {
        }

        /**
         * Registers a slot under its allocation id and the given request id, and adds it to the
         * group of its task manager.
         */
        void add(SlotRequestId slotRequestId, AllocatedSlot allocatedSlot) {
            allocatedSlotsById.put(allocatedSlot.getAllocationId(), slotRequestId, allocatedSlot);
            allocatedSlotsByTaskManager
                    .computeIfAbsent(
                            allocatedSlot.getTaskManagerLocation().getResourceID(),
                            ignored -> new HashSet<>(4))
                    .add(allocatedSlot);
        }

        /** Looks up a slot by allocation id; {@code null} if unknown. */
        @Nullable
        AllocatedSlot get(AllocationID allocationID) {
            return allocatedSlotsById.getValueByKeyA(allocationID);
        }

        /** Tells whether a slot with the given allocation id is registered. */
        boolean contains(AllocationID slotAllocationId) {
            return allocatedSlotsById.containsKeyA(slotAllocationId);
        }

        /** Removes and returns the slot with the given allocation id; {@code null} if unknown. */
        @Nullable
        AllocatedSlot remove(AllocationID allocationID) {
            final AllocatedSlot removed = allocatedSlotsById.removeKeyA(allocationID);
            if (removed == null) {
                return null;
            }
            removeAllocatedSlot(removed);
            return removed;
        }

        /** Removes and returns the slot added under the given request id; {@code null} if unknown. */
        @Nullable
        AllocatedSlot remove(SlotRequestId slotRequestId) {
            final AllocatedSlot removed = allocatedSlotsById.removeKeyB(slotRequestId);
            if (removed == null) {
                return null;
            }
            removeAllocatedSlot(removed);
            return removed;
        }

        /** Takes the slot out of its task manager group and drops the group once it is empty. */
        private void removeAllocatedSlot(AllocatedSlot allocatedSlot) {
            Preconditions.checkNotNull(allocatedSlot);
            final ResourceID owner = allocatedSlot.getTaskManagerLocation().getResourceID();
            final Set<AllocatedSlot> group = allocatedSlotsByTaskManager.get(owner);
            group.remove(allocatedSlot);
            if (group.isEmpty()) {
                allocatedSlotsByTaskManager.remove(owner);
            }
        }

        /**
         * Removes all slots of the given task manager from both indexes.
         *
         * @return the removed slots, or an empty set if the task manager had none
         */
        Set<AllocatedSlot> removeSlotsForTaskManager(ResourceID resourceID) {
            final Set<AllocatedSlot> removedSlots = allocatedSlotsByTaskManager.remove(resourceID);
            if (removedSlots == null) {
                return Collections.emptySet();
            }
            for (final AllocatedSlot removedSlot : removedSlots) {
                allocatedSlotsById.removeKeyA(removedSlot.getAllocationId());
            }
            return removedSlots;
        }

        /** Empties both indexes. */
        void clear() {
            allocatedSlotsById.clear();
            allocatedSlotsByTaskManager.clear();
        }

        /** Returns the string form of the per-task-manager slot groups. */
        String printAllSlots() {
            return allocatedSlotsByTaskManager.values().toString();
        }

        @VisibleForTesting
        boolean containResource(ResourceID resourceID) {
            return allocatedSlotsByTaskManager.containsKey(resourceID);
        }

        @VisibleForTesting
        int size() {
            return allocatedSlotsById.size();
        }

        @VisibleForTesting
        Set<AllocatedSlot> getSlotsForTaskManager(ResourceID resourceId) {
            return allocatedSlotsByTaskManager.getOrDefault(resourceId, Collections.emptySet());
        }

        /** Returns a new collection holding all registered slots. */
        Collection<SlotInfo> listSlotInfo() {
            return new ArrayList<SlotInfo>(allocatedSlotsById.values());
        }

        /** Returns an unmodifiable view of the by-task-manager map (the value sets are not wrapped). */
        Map<ResourceID, Set<AllocatedSlot>> getSlotsByTaskManager() {
            return Collections.unmodifiableMap(allocatedSlotsByTaskManager);
        }
    }
}

