/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.runtime.jobmaster.slotpool;

import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.Iterator;
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
import java.util.function.Consumer;
import java.util.stream.Collectors;
import javax.annotation.Nonnull;
import javax.annotation.Nullable;
import org.apache.flink.annotation.VisibleForTesting;
import org.apache.flink.api.common.JobID;
import org.apache.flink.api.common.time.Time;
import org.apache.flink.runtime.clusterframework.types.AllocationID;
import org.apache.flink.runtime.clusterframework.types.ResourceID;
import org.apache.flink.runtime.clusterframework.types.ResourceProfile;
import org.apache.flink.runtime.jobmanager.slots.TaskManagerGateway;
import org.apache.flink.runtime.jobmaster.SlotInfo;
import org.apache.flink.runtime.jobmaster.slotpool.AllocatedSlot;
import org.apache.flink.runtime.jobmaster.slotpool.AllocatedSlotPool;
import org.apache.flink.runtime.jobmaster.slotpool.DeclarativeSlotPool;
import org.apache.flink.runtime.jobmaster.slotpool.PhysicalSlot;
import org.apache.flink.runtime.jobmaster.slotpool.SlotInfoWithUtilization;
import org.apache.flink.runtime.messages.Acknowledge;
import org.apache.flink.runtime.slots.DefaultRequirementMatcher;
import org.apache.flink.runtime.slots.RequirementMatcher;
import org.apache.flink.runtime.slots.ResourceRequirement;
import org.apache.flink.runtime.taskexecutor.slot.SlotOffer;
import org.apache.flink.runtime.taskmanager.TaskManagerLocation;
import org.apache.flink.runtime.util.ResourceCounter;
import org.apache.flink.util.FlinkException;
import org.apache.flink.util.Preconditions;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Default {@link DeclarativeSlotPool} implementation.
 *
 * <p>The pool maintains two counters: {@code totalResourceRequirements} (what has been declared as
 * required) and {@code fulfilledResourceRequirements} (what the accepted slots are currently
 * counted against). Every accepted slot is additionally mapped to the requirement profile it is
 * counted against, so that the fulfilled counter can be corrected when a slot is reserved for a
 * different profile or is released.
 *
 * <p>This class contains no synchronization of its own.
 */
public class DefaultDeclarativeSlotPool implements DeclarativeSlotPool {
    private static final Logger LOG = LoggerFactory.getLogger(DefaultDeclarativeSlotPool.class);

    /** Callback that receives the full set of requirements whenever they are (re-)declared. */
    private final Consumer<? super Collection<ResourceRequirement>> notifyNewResourceRequirements;

    /** Minimum time a slot must have been free before {@link #releaseIdleSlots} may return it. */
    private final Time idleSlotTimeout;

    /** Timeout passed to the task manager gateway when freeing a slot. */
    private final Time rpcTimeout;

    /** Job id; only used for logging in this class. */
    private final JobID jobId;

    /** Underlying storage of the accepted slots. */
    private final AllocatedSlotPool slotPool;

    /** Requirement profile each accepted slot is currently counted against. */
    private final Map<AllocationID, ResourceProfile> slotToRequirementProfileMappings;

    private ResourceCounter totalResourceRequirements;
    private ResourceCounter fulfilledResourceRequirements;
    private DeclarativeSlotPool.NewSlotsListener newSlotsListener =
            DeclarativeSlotPool.NoOpNewSlotsListener.INSTANCE;
    private final RequirementMatcher requirementMatcher = new DefaultRequirementMatcher();

    /**
     * Creates a pool with empty total and fulfilled requirements.
     *
     * @param jobId id of the job, used in log messages
     * @param slotPool storage for the accepted slots
     * @param notifyNewResourceRequirements callback invoked with the current requirements each
     *     time they are declared
     * @param idleSlotTimeout time a free slot must have been idle before it may be returned
     * @param rpcTimeout timeout for the free-slot call on the task manager gateway
     */
    public DefaultDeclarativeSlotPool(
            JobID jobId,
            AllocatedSlotPool slotPool,
            Consumer<? super Collection<ResourceRequirement>> notifyNewResourceRequirements,
            Time idleSlotTimeout,
            Time rpcTimeout) {
        this.jobId = jobId;
        this.slotPool = slotPool;
        this.notifyNewResourceRequirements = notifyNewResourceRequirements;
        this.idleSlotTimeout = idleSlotTimeout;
        this.rpcTimeout = rpcTimeout;
        this.totalResourceRequirements = ResourceCounter.empty();
        this.fulfilledResourceRequirements = ResourceCounter.empty();
        this.slotToRequirementProfileMappings = new HashMap<>();
    }

    /**
     * Adds {@code increment} to the total requirements and re-declares them. An empty increment
     * is a no-op and does not trigger a declaration.
     */
    @Override
    public void increaseResourceRequirementsBy(ResourceCounter increment) {
        if (increment.isEmpty()) {
            return;
        }
        totalResourceRequirements = totalResourceRequirements.add(increment);
        declareResourceRequirements();
    }

    /**
     * Subtracts {@code decrement} from the total requirements and re-declares them. An empty
     * decrement is a no-op and does not trigger a declaration.
     */
    @Override
    public void decreaseResourceRequirementsBy(ResourceCounter decrement) {
        if (decrement.isEmpty()) {
            return;
        }
        totalResourceRequirements = totalResourceRequirements.subtract(decrement);
        declareResourceRequirements();
    }

    /** Replaces the total requirements and always re-declares them, even if they are empty. */
    @Override
    public void setResourceRequirements(ResourceCounter resourceRequirements) {
        totalResourceRequirements = resourceRequirements;
        declareResourceRequirements();
    }

    /** Logs the current requirements and hands them to the registered consumer. */
    private void declareResourceRequirements() {
        final Collection<ResourceRequirement> resourceRequirements = getResourceRequirements();

        LOG.debug(
                "Declare new resource requirements for job {}.{}\trequired resources: {}{}\tacquired resources: {}",
                jobId,
                System.lineSeparator(),
                resourceRequirements,
                System.lineSeparator(),
                fulfilledResourceRequirements);

        notifyNewResourceRequirements.accept(resourceRequirements);
    }

    /**
     * Returns the total requirements as a freshly created collection with one entry per resource
     * profile.
     */
    @Override
    public Collection<ResourceRequirement> getResourceRequirements() {
        final Collection<ResourceRequirement> currentResourceRequirements = new ArrayList<>();

        for (Map.Entry<ResourceProfile, Integer> resourceRequirement :
                totalResourceRequirements.getResourcesWithCount()) {
            currentResourceRequirements.add(
                    ResourceRequirement.create(
                            resourceRequirement.getKey(), resourceRequirement.getValue()));
        }

        return currentResourceRequirements;
    }

    /**
     * Processes slot offers from a task executor.
     *
     * <p>An offer is accepted if its slot is already contained in the pool, or if the requirement
     * matcher can assign it to a requirement. Newly accepted slots are added to the pool and
     * reported to the registered {@link DeclarativeSlotPool.NewSlotsListener}.
     *
     * @param offers slot offers to process
     * @param taskManagerLocation location of the offering task executor
     * @param taskManagerGateway gateway of the offering task executor
     * @param currentTime timestamp passed on to the slot pool for the added slots
     * @return the accepted offers, including offers for slots that were already in the pool
     */
    @Override
    public Collection<SlotOffer> offerSlots(
            Collection<? extends SlotOffer> offers,
            TaskManagerLocation taskManagerLocation,
            TaskManagerGateway taskManagerGateway,
            long currentTime) {
        LOG.debug(
                "Received {} slot offers from TaskExecutor {}.",
                offers.size(),
                taskManagerLocation);

        final Collection<SlotOffer> acceptedSlotOffers = new ArrayList<>();
        final ArrayList<AllocatedSlot> acceptedSlots = new ArrayList<>();

        for (SlotOffer offer : offers) {
            if (slotPool.containsSlot(offer.getAllocationId())) {
                // the slot is already known: acknowledge the offer again without re-adding it
                acceptedSlotOffers.add(offer);
                continue;
            }

            final Optional<AllocatedSlot> acceptedSlot =
                    matchOfferWithOutstandingRequirements(
                            offer, taskManagerLocation, taskManagerGateway);

            if (acceptedSlot.isPresent()) {
                acceptedSlotOffers.add(offer);
                acceptedSlots.add(acceptedSlot.get());
            } else {
                LOG.debug(
                        "Could not match offer {} to any outstanding requirement.",
                        offer.getAllocationId());
            }
        }

        slotPool.addSlots(acceptedSlots, currentTime);

        if (!acceptedSlots.isEmpty()) {
            LOG.debug(
                    "Acquired new resources; new total acquired resources: {}",
                    fulfilledResourceRequirements);
            newSlotsListener.notifyNewSlotsAreAvailable(acceptedSlots);
        }

        return acceptedSlotOffers;
    }

    /**
     * Tries to assign the offered slot to a requirement. On success the fulfilled counter is
     * increased for the matched requirement and the slot-to-requirement mapping is recorded; the
     * slot itself is not yet added to the slot pool here.
     *
     * @return the created slot, or an empty optional if the matcher found no requirement
     */
    private Optional<AllocatedSlot> matchOfferWithOutstandingRequirements(
            SlotOffer slotOffer,
            TaskManagerLocation taskManagerLocation,
            TaskManagerGateway taskManagerGateway) {
        final Optional<ResourceProfile> match =
                requirementMatcher.match(
                        slotOffer.getResourceProfile(),
                        totalResourceRequirements,
                        fulfilledResourceRequirements::getResourceCount);

        if (!match.isPresent()) {
            return Optional.empty();
        }

        final ResourceProfile matchedRequirement = match.get();
        LOG.debug(
                "Matched slot offer {} to requirement {}.",
                slotOffer.getAllocationId(),
                matchedRequirement);

        increaseAvailableResources(ResourceCounter.withResource(matchedRequirement, 1));

        final AllocatedSlot allocatedSlot =
                createAllocatedSlot(slotOffer, taskManagerLocation, taskManagerGateway);

        // remember which requirement the slot is counted against, so the fulfilled counter can
        // be corrected when the slot is reserved for another profile or released
        slotToRequirementProfileMappings.put(allocatedSlot.getAllocationId(), matchedRequirement);

        return Optional.of(allocatedSlot);
    }

    /** Returns the total requirements minus the fulfilled requirements. */
    @VisibleForTesting
    ResourceCounter calculateUnfulfilledResources() {
        return totalResourceRequirements.subtract(fulfilledResourceRequirements);
    }

    /** Builds an {@link AllocatedSlot} from the offer and the offering task executor. */
    private AllocatedSlot createAllocatedSlot(
            SlotOffer slotOffer,
            TaskManagerLocation taskManagerLocation,
            TaskManagerGateway taskManagerGateway) {
        return new AllocatedSlot(
                slotOffer.getAllocationId(),
                taskManagerLocation,
                slotOffer.getSlotIndex(),
                slotOffer.getResourceProfile(),
                taskManagerGateway);
    }

    /** Adds the given resources to the fulfilled counter. */
    private void increaseAvailableResources(ResourceCounter acceptedResources) {
        fulfilledResourceRequirements = fulfilledResourceRequirements.add(acceptedResources);
    }

    /**
     * Looks up the requirement profile the given slot is counted against.
     *
     * @throws NullPointerException if no mapping exists for the allocation id
     */
    @Nonnull
    private ResourceProfile getMatchingResourceProfile(AllocationID allocationId) {
        return Preconditions.checkNotNull(
                slotToRequirementProfileMappings.get(allocationId),
                "No matching resource profile found for %s",
                allocationId);
    }

    /**
     * Reserves the free slot with the given allocation id for the given requirement profile.
     *
     * <p>If the slot was counted against a different requirement profile when it was accepted,
     * the mapping, the fulfilled counter and the total requirements are adjusted.
     *
     * @param allocationId id of the slot to reserve
     * @param requiredSlotProfile profile the slot is reserved for
     * @return the reserved slot
     * @throws IllegalStateException if the slot's resource profile does not match {@code
     *     requiredSlotProfile}
     * @throws NullPointerException if no requirement mapping exists for the slot
     */
    @Override
    public PhysicalSlot reserveFreeSlot(
            AllocationID allocationId, ResourceProfile requiredSlotProfile) {
        final AllocatedSlot allocatedSlot = slotPool.reserveFreeSlot(allocationId);

        // Preconditions templates are filled via %s placeholders (not SLF4J-style {}), so the
        // arguments only show up in the message when %s is used
        Preconditions.checkState(
                allocatedSlot.getResourceProfile().isMatching(requiredSlotProfile),
                "Slot %s cannot fulfill the given requirement. SlotProfile=%s Requirement=%s",
                allocationId,
                allocatedSlot.getResourceProfile(),
                requiredSlotProfile);

        final ResourceProfile previouslyMatchedResourceProfile =
                Preconditions.checkNotNull(slotToRequirementProfileMappings.get(allocationId));

        if (!previouslyMatchedResourceProfile.equals(requiredSlotProfile)) {
            LOG.debug(
                    "Adjusting requirements because a slot was reserved for a different requirement than initially assumed. Slot={} assumedRequirement={} actualRequirement={}",
                    allocationId,
                    previouslyMatchedResourceProfile,
                    requiredSlotProfile);
            updateSlotToRequirementProfileMapping(allocationId, requiredSlotProfile);
            adjustRequirements(previouslyMatchedResourceProfile, requiredSlotProfile);
        }

        return allocatedSlot;
    }

    /**
     * Frees a previously reserved slot so it becomes available again.
     *
     * <p>The slot stays in the pool: neither the fulfilled counter nor the requirement mapping is
     * changed here. If the slot pool returned a slot, its payload is released with {@code cause}
     * and the registered listener is notified that the slot is available.
     *
     * @param allocationId id of the slot to free
     * @param cause reason passed to the payload release; may be {@code null}
     * @param currentTime timestamp passed on to the slot pool
     * @return the requirement the freed slot is counted against, or an empty counter if the slot
     *     pool did not return a slot
     */
    @Override
    public ResourceCounter freeReservedSlot(
            AllocationID allocationId, @Nullable Throwable cause, long currentTime) {
        LOG.debug("Free reserved slot {}.", allocationId);

        final Optional<AllocatedSlot> freedSlot =
                slotPool.freeReservedSlot(allocationId, currentTime);

        final Optional<ResourceCounter> previouslyFulfilledRequirement =
                freedSlot.map(Collections::singleton).map(this::getFulfilledRequirements);

        freedSlot.ifPresent(
                allocatedSlot -> {
                    releasePayload(Collections.singleton(allocatedSlot), cause);
                    newSlotsListener.notifyNewSlotsAreAvailable(
                            Collections.singletonList(allocatedSlot));
                });

        return previouslyFulfilledRequirement.orElseGet(ResourceCounter::empty);
    }

    /**
     * Re-maps the slot to {@code matchedResourceProfile} and moves one unit in the fulfilled
     * counter from the old profile to the new one.
     *
     * @throws NullPointerException if the slot had no previous mapping
     */
    private void updateSlotToRequirementProfileMapping(
            AllocationID allocationId, ResourceProfile matchedResourceProfile) {
        final ResourceProfile oldResourceProfile =
                Preconditions.checkNotNull(
                        slotToRequirementProfileMappings.put(allocationId, matchedResourceProfile),
                        "Expected slot profile matching to be non-empty.");

        fulfilledResourceRequirements =
                fulfilledResourceRequirements.add(matchedResourceProfile, 1);
        fulfilledResourceRequirements =
                fulfilledResourceRequirements.subtract(oldResourceProfile, 1);
    }

    /**
     * Shifts one unit of the total requirements from {@code newResourceProfile} to {@code
     * oldResourceProfile}. Each of the two steps re-declares the requirements.
     */
    private void adjustRequirements(
            ResourceProfile oldResourceProfile, ResourceProfile newResourceProfile) {
        decreaseResourceRequirementsBy(ResourceCounter.withResource(newResourceProfile, 1));
        increaseResourceRequirementsBy(ResourceCounter.withResource(oldResourceProfile, 1));
    }

    /**
     * Registers the listener notified about newly available slots.
     *
     * @throws IllegalStateException if a listener other than the no-op default is already set
     */
    @Override
    public void registerNewSlotsListener(DeclarativeSlotPool.NewSlotsListener newSlotsListener) {
        Preconditions.checkState(
                this.newSlotsListener == DeclarativeSlotPool.NoOpNewSlotsListener.INSTANCE,
                "DefaultDeclarativeSlotPool only supports a single slot listener.");
        this.newSlotsListener = newSlotsListener;
    }

    /**
     * Removes all slots of the given owner from the pool, releases their payloads and returns
     * the slots to their task manager.
     *
     * @return the requirements the removed slots were counted against
     */
    @Override
    public ResourceCounter releaseSlots(ResourceID owner, Exception cause) {
        final Collection<AllocatedSlot> removedSlots = slotPool.removeSlots(owner);

        // must be computed before the slots are released, because releasing drops the mapping
        final ResourceCounter previouslyFulfilledRequirements =
                getFulfilledRequirements(removedSlots);

        releasePayload(removedSlots, cause);
        releaseSlots(removedSlots, cause);

        return previouslyFulfilledRequirements;
    }

    /**
     * Removes a single slot from the pool, releases its payload and returns the slot to its task
     * manager.
     *
     * @return the requirement the removed slot was counted against, or an empty counter if the
     *     slot pool did not contain the slot
     */
    @Override
    public ResourceCounter releaseSlot(AllocationID allocationId, Exception cause) {
        final Optional<AllocatedSlot> removedSlot = slotPool.removeSlot(allocationId);

        // must be computed before the slot is released, because releasing drops the mapping
        final Optional<ResourceCounter> previouslyFulfilledRequirement =
                removedSlot.map(Collections::singleton).map(this::getFulfilledRequirements);

        removedSlot.ifPresent(
                allocatedSlot -> {
                    releasePayload(Collections.singleton(allocatedSlot), cause);
                    releaseSlots(Collections.singleton(allocatedSlot), cause);
                });

        return previouslyFulfilledRequirement.orElseGet(ResourceCounter::empty);
    }

    /** Releases the payload of every given slot with the given cause. */
    private void releasePayload(Iterable<? extends AllocatedSlot> allocatedSlots, Throwable cause) {
        for (AllocatedSlot allocatedSlot : allocatedSlots) {
            allocatedSlot.releasePayload(cause);
        }
    }

    /**
     * Returns free slots to their owners if they have been idle for at least the idle timeout
     * and are counted against a requirement that is fulfilled in excess of the total
     * requirements.
     *
     * @param currentTimeMillis current time, compared against each slot's free-since timestamp
     * @throws IllegalStateException if a slot reported as free cannot be removed from the pool
     */
    @Override
    public void releaseIdleSlots(long currentTimeMillis) {
        final Collection<AllocatedSlotPool.FreeSlotInfo> freeSlotsInformation =
                slotPool.getFreeSlotsInformation();

        ResourceCounter excessResources =
                fulfilledResourceRequirements.subtract(totalResourceRequirements);

        final Iterator<AllocatedSlotPool.FreeSlotInfo> freeSlotIterator =
                freeSlotsInformation.iterator();

        final Collection<AllocatedSlot> slotsToReturnToOwner = new ArrayList<>();

        while (!excessResources.isEmpty() && freeSlotIterator.hasNext()) {
            final AllocatedSlotPool.FreeSlotInfo idleSlot = freeSlotIterator.next();

            // Compare the elapsed idle time against the timeout instead of computing
            // "freeSince + timeout": the sum overflows for very large timeouts, which would make
            // every free slot look idle.
            final long idleTimeMillis = currentTimeMillis - idleSlot.getFreeSince();
            if (idleTimeMillis < idleSlotTimeout.toMilliseconds()) {
                continue;
            }

            final ResourceProfile matchingProfile =
                    getMatchingResourceProfile(idleSlot.getAllocationId());
            if (!excessResources.containsResource(matchingProfile)) {
                continue;
            }

            excessResources = excessResources.subtract(matchingProfile, 1);

            final AllocatedSlot allocatedSlot =
                    slotPool
                            .removeSlot(idleSlot.getAllocationId())
                            .orElseThrow(
                                    () ->
                                            new IllegalStateException(
                                                    String.format(
                                                            "Could not find slot for allocation id %s.",
                                                            idleSlot.getAllocationId())));
            slotsToReturnToOwner.add(allocatedSlot);
        }

        releaseSlots(
                slotsToReturnToOwner, new FlinkException("Returning idle slots to their owners."));
        LOG.debug(
                "Idle slots have been returned; new total acquired resources: {}",
                fulfilledResourceRequirements);
    }

    /**
     * Updates the bookkeeping for slots that were already removed from the slot pool and asks
     * their task managers to free them.
     *
     * <p>For every slot the fulfilled counter is decremented for its mapped requirement, the
     * mapping is dropped, and the free-slot call is sent. A failed call is only logged.
     *
     * @throws IllegalStateException if one of the slots is still in use
     */
    private void releaseSlots(Iterable<AllocatedSlot> slotsToReturnToOwner, Throwable cause) {
        for (AllocatedSlot slotToReturn : slotsToReturnToOwner) {
            Preconditions.checkState(!slotToReturn.isUsed(), "Free slot must not be used.");

            if (LOG.isDebugEnabled()) {
                // the trailing throwable is logged in addition to the message, so the cause is
                // only attached when debug logging is enabled
                LOG.info("Releasing slot [{}].", slotToReturn.getAllocationId(), cause);
            } else {
                LOG.info("Releasing slot [{}].", slotToReturn.getAllocationId());
            }

            final ResourceProfile matchingResourceProfile =
                    getMatchingResourceProfile(slotToReturn.getAllocationId());
            fulfilledResourceRequirements =
                    fulfilledResourceRequirements.subtract(matchingResourceProfile, 1);
            slotToRequirementProfileMappings.remove(slotToReturn.getAllocationId());

            final CompletableFuture<Acknowledge> freeSlotFuture =
                    slotToReturn
                            .getTaskManagerGateway()
                            .freeSlot(slotToReturn.getAllocationId(), cause, rpcTimeout);

            freeSlotFuture.whenComplete(
                    (ignored, throwable) -> {
                        if (throwable != null) {
                            // best effort: the slot is already discarded from the bookkeeping
                            LOG.debug(
                                    "Releasing slot [{}] of registered TaskExecutor {} failed. Discarding slot.",
                                    slotToReturn.getAllocationId(),
                                    slotToReturn.getTaskManagerId(),
                                    throwable);
                        }
                    });
        }
    }

    /** Returns information about the free slots as a newly collected list. */
    @Override
    public Collection<SlotInfoWithUtilization> getFreeSlotsInformation() {
        return slotPool.getFreeSlotsInformation().stream()
                .map(AllocatedSlotPool.FreeSlotInfo::asSlotInfo)
                .collect(Collectors.toList());
    }

    /** Returns the slot pool's information about all slots. */
    @Override
    public Collection<? extends SlotInfo> getAllSlotsInformation() {
        return slotPool.getAllSlotsInformation();
    }

    /** Delegates to the slot pool's check for a free slot with the given allocation id. */
    @Override
    public boolean containsFreeSlot(AllocationID allocationId) {
        return slotPool.containsFreeSlot(allocationId);
    }

    /** Delegates to the slot pool's check for slots of the given owner. */
    @Override
    public boolean containsSlots(ResourceID owner) {
        return slotPool.containsSlots(owner);
    }

    /**
     * Sums up the requirements the given slots are counted against.
     *
     * @throws NullPointerException if one of the slots has no requirement mapping
     */
    private ResourceCounter getFulfilledRequirements(
            Iterable<? extends AllocatedSlot> allocatedSlots) {
        ResourceCounter resourceDecrement = ResourceCounter.empty();

        for (AllocatedSlot allocatedSlot : allocatedSlots) {
            final ResourceProfile matchingResourceProfile =
                    getMatchingResourceProfile(allocatedSlot.getAllocationId());
            resourceDecrement = resourceDecrement.add(matchingResourceProfile, 1);
        }

        return resourceDecrement;
    }

    /** Returns the current fulfilled-requirements counter. */
    @VisibleForTesting
    ResourceCounter getFulfilledResourceRequirements() {
        return fulfilledResourceRequirements;
    }
}

