/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.runtime.jobmaster.slotpool;

import java.util.ArrayList;
import java.util.Collection;
import java.util.HashMap;
import java.util.List;
import java.util.Optional;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.stream.Collectors;
import java.util.stream.IntStream;
import org.apache.flink.api.common.JobID;
import org.apache.flink.api.common.time.Time;
import org.apache.flink.core.testutils.OneShotLatch;
import org.apache.flink.runtime.clusterframework.types.AllocationID;
import org.apache.flink.runtime.clusterframework.types.ResourceProfile;
import org.apache.flink.runtime.concurrent.ComponentMainThreadExecutor;
import org.apache.flink.runtime.concurrent.ComponentMainThreadExecutorServiceAdapter;
import org.apache.flink.runtime.concurrent.FutureUtils;
import org.apache.flink.runtime.executiongraph.utils.SimpleAckingTaskManagerGateway;
import org.apache.flink.runtime.jobmanager.scheduler.Locality;
import org.apache.flink.runtime.jobmanager.slots.DummySlotOwner;
import org.apache.flink.runtime.jobmanager.slots.TaskManagerGateway;
import org.apache.flink.runtime.jobmaster.AllocatedSlotInfo;
import org.apache.flink.runtime.jobmaster.AllocatedSlotReport;
import org.apache.flink.runtime.jobmaster.JobMasterId;
import org.apache.flink.runtime.jobmaster.LogicalSlot;
import org.apache.flink.runtime.jobmaster.SlotOwner;
import org.apache.flink.runtime.jobmaster.SlotRequestId;
import org.apache.flink.runtime.jobmaster.slotpool.DummyPayload;
import org.apache.flink.runtime.jobmaster.slotpool.PhysicalSlot;
import org.apache.flink.runtime.jobmaster.slotpool.SingleLogicalSlot;
import org.apache.flink.runtime.jobmaster.slotpool.SlotInfoWithUtilization;
import org.apache.flink.runtime.jobmaster.slotpool.SlotPool;
import org.apache.flink.runtime.jobmaster.slotpool.SlotPoolBuilder;
import org.apache.flink.runtime.jobmaster.slotpool.SlotPoolImpl;
import org.apache.flink.runtime.jobmaster.slotpool.SlotPoolUtils;
import org.apache.flink.runtime.jobmaster.slotpool.TestingSlotPoolImpl;
import org.apache.flink.runtime.messages.Acknowledge;
import org.apache.flink.runtime.resourcemanager.ResourceManagerGateway;
import org.apache.flink.runtime.resourcemanager.SlotRequest;
import org.apache.flink.runtime.resourcemanager.utils.TestingResourceManagerGateway;
import org.apache.flink.runtime.taskexecutor.slot.SlotOffer;
import org.apache.flink.runtime.taskmanager.LocalTaskManagerLocation;
import org.apache.flink.runtime.taskmanager.TaskManagerLocation;
import org.apache.flink.shaded.guava18.com.google.common.collect.ImmutableMap;
import org.apache.flink.util.ExceptionUtils;
import org.apache.flink.util.FlinkException;
import org.apache.flink.util.TestLogger;
import org.apache.flink.util.clock.Clock;
import org.apache.flink.util.clock.ManualClock;
import org.hamcrest.Description;
import org.hamcrest.Matcher;
import org.hamcrest.MatcherAssert;
import org.hamcrest.Matchers;
import org.hamcrest.TypeSafeDiagnosingMatcher;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;

public class SlotPoolImplTest
extends TestLogger {
    // Timeout used for bounded waits in these tests; reuses the value defined by SlotPoolUtils.
    private static final Time TIMEOUT = SlotPoolUtils.TIMEOUT;
    // Location of the task manager whose slots are offered to the pool; re-created in setUp().
    private TaskManagerLocation taskManagerLocation;
    // Task manager gateway stub passed along with slot offers; re-created in setUp().
    private SimpleAckingTaskManagerGateway taskManagerGateway;
    // Resource manager gateway stub on which tests install request/cancel consumers; re-created in setUp().
    private TestingResourceManagerGateway resourceManagerGateway;
    // Executor from ComponentMainThreadExecutorServiceAdapter.forMainThread(); handed to SlotPoolBuilder and SlotPoolImpl#start.
    private static final ComponentMainThreadExecutor mainThreadExecutor = ComponentMainThreadExecutorServiceAdapter.forMainThread();

    /** Creates a fresh task manager location and fresh gateway stubs before every test. */
    @Before
    public void setUp() throws Exception {
        taskManagerLocation = new LocalTaskManagerLocation();
        taskManagerGateway = new SimpleAckingTaskManagerGateway();
        resourceManagerGateway = new TestingResourceManagerGateway();
    }

    /**
     * Requests a new slot, offers a slot carrying the allocation id that was sent to the resource
     * manager, and checks that the pending request completes with exactly that slot.
     */
    @Test
    public void testAllocateSimpleSlot() throws Exception {
        final CompletableFuture<SlotRequest> rmRequestFuture = new CompletableFuture<>();
        resourceManagerGateway.setRequestSlotConsumer(rmRequestFuture::complete);

        try (TestingSlotPoolImpl slotPool = createAndSetUpSlotPool(resourceManagerGateway)) {
            final SlotRequestId slotRequestId = new SlotRequestId();
            final CompletableFuture<PhysicalSlot> slotFuture = SlotPoolUtils.requestNewAllocatedSlot(slotPool, slotRequestId);
            // nothing has been offered yet, so the request must still be pending
            Assert.assertFalse(slotFuture.isDone());

            final SlotRequest rmRequest = rmRequestFuture.get(TIMEOUT.toMilliseconds(), TimeUnit.MILLISECONDS);
            Assert.assertTrue(registerAndOfferSlot(taskManagerLocation, slotPool, rmRequest.getAllocationId()));

            final PhysicalSlot allocatedSlot = slotFuture.get(1L, TimeUnit.SECONDS);
            Assert.assertTrue(slotFuture.isDone());
            Assert.assertEquals(taskManagerLocation, allocatedSlot.getTaskManagerLocation());
            Assert.assertEquals(rmRequest.getAllocationId(), allocatedSlot.getAllocationId());
        }
    }

    /**
     * Issues two slot requests, fulfils the first one with an offered slot and then releases it;
     * the released slot is expected to complete the second, still pending request.
     */
    @Test
    public void testAllocationFulfilledByReturnedSlot() throws Exception {
        final ArrayBlockingQueue<SlotRequest> rmRequests = new ArrayBlockingQueue<>(2);
        resourceManagerGateway.setRequestSlotConsumer(request -> {
            // keep retrying until the bounded queue accepts the request
            boolean enqueued = rmRequests.offer(request);
            while (!enqueued) {
                enqueued = rmRequests.offer(request);
            }
        });

        try (TestingSlotPoolImpl slotPool = createAndSetUpSlotPool(resourceManagerGateway)) {
            final SlotRequestId firstRequestId = new SlotRequestId();
            final CompletableFuture<PhysicalSlot> firstFuture = SlotPoolUtils.requestNewAllocatedSlot(slotPool, firstRequestId);
            final SlotRequestId secondRequestId = new SlotRequestId();
            final CompletableFuture<PhysicalSlot> secondFuture = SlotPoolUtils.requestNewAllocatedSlot(slotPool, secondRequestId);

            Assert.assertFalse(firstFuture.isDone());
            Assert.assertFalse(secondFuture.isDone());

            // collect both requests that were forwarded to the resource manager
            final List<SlotRequest> forwardedRequests = new ArrayList<>(2);
            for (int received = 0; received < 2; received++) {
                forwardedRequests.add(rmRequests.poll(TIMEOUT.toMilliseconds(), TimeUnit.MILLISECONDS));
            }

            // only the first request gets a slot offered
            Assert.assertTrue(registerAndOfferSlot(taskManagerLocation, slotPool, forwardedRequests.get(0).getAllocationId()));
            final PhysicalSlot firstSlot = firstFuture.get(1L, TimeUnit.SECONDS);
            Assert.assertTrue(firstFuture.isDone());
            Assert.assertFalse(secondFuture.isDone());

            // returning the first slot is expected to hand it to the second request
            slotPool.releaseSlot(firstRequestId, null);
            final PhysicalSlot secondSlot = secondFuture.get(1L, TimeUnit.SECONDS);
            Assert.assertTrue(secondFuture.isDone());
            Assert.assertEquals(firstSlot, secondSlot);
        }
    }

    /**
     * Offers a slot before anything was requested and then allocates it by its allocation id;
     * the slot is expected to move from the available to the allocated slots.
     */
    @Test
    public void testAllocateWithFreeSlot() throws Exception {
        final CompletableFuture<SlotRequest> rmRequestFuture = new CompletableFuture<>();
        resourceManagerGateway.setRequestSlotConsumer(rmRequestFuture::complete);

        try (TestingSlotPoolImpl slotPool = createAndSetUpSlotPool(resourceManagerGateway)) {
            final AllocationID offeredAllocationId = new AllocationID();
            Assert.assertTrue(registerAndOfferSlot(taskManagerLocation, slotPool, offeredAllocationId));

            // before the allocation: one available slot, none allocated
            Assert.assertEquals(1L, slotPool.getAvailableSlots().size());
            Assert.assertEquals(0L, slotPool.getAllocatedSlots().size());

            Assert.assertTrue(slotPool.allocateAvailableSlot(new SlotRequestId(), offeredAllocationId, ResourceProfile.ANY).isPresent());

            // after the allocation the counts are swapped
            Assert.assertEquals(0L, slotPool.getAvailableSlots().size());
            Assert.assertEquals(1L, slotPool.getAllocatedSlots().size());
        }
    }

    /**
     * Walks through a sequence of slot offers (unregistered task manager, unknown allocation id,
     * requested allocation id, duplicates, conflicting slot index, conflicting task manager) and
     * checks for each whether the pool is expected to accept or reject it, both while the slot
     * request holds a slot and after that slot has been released.
     */
    @Test
    public void testOfferSlot() throws Exception {
        final CompletableFuture<SlotRequest> rmRequestFuture = new CompletableFuture<>();
        resourceManagerGateway.setRequestSlotConsumer(rmRequestFuture::complete);

        try (TestingSlotPoolImpl slotPool = createAndSetUpSlotPool(resourceManagerGateway)) {
            slotPool.registerTaskManager(taskManagerLocation.getResourceID());

            final SlotRequestId slotRequestId = new SlotRequestId();
            final CompletableFuture<PhysicalSlot> slotFuture = SlotPoolUtils.requestNewAllocatedSlot(slotPool, slotRequestId);
            Assert.assertFalse(slotFuture.isDone());

            final SlotRequest rmRequest = rmRequestFuture.get(TIMEOUT.toMilliseconds(), TimeUnit.MILLISECONDS);
            final SlotOffer requestedOffer = new SlotOffer(rmRequest.getAllocationId(), 0, ResourceProfile.ANY);

            // an offer coming from a task manager that was never registered is expected to be rejected
            final TaskManagerLocation unregisteredLocation = new LocalTaskManagerLocation();
            Assert.assertFalse(slotPool.offerSlot(unregisteredLocation, taskManagerGateway, requestedOffer));

            // a slot with an allocation id nobody asked for is accepted and fulfils the pending request
            final SlotOffer unrequestedOffer = new SlotOffer(new AllocationID(), 0, ResourceProfile.ANY);
            Assert.assertTrue(slotPool.offerSlot(taskManagerLocation, taskManagerGateway, unrequestedOffer));
            Assert.assertEquals(1L, slotPool.getAllocatedSlots().size());
            final PhysicalSlot allocatedSlot = slotFuture.get(TIMEOUT.toMilliseconds(), TimeUnit.MILLISECONDS);
            Assert.assertEquals(taskManagerLocation, allocatedSlot.getTaskManagerLocation());
            Assert.assertEquals(unrequestedOffer.getAllocationId(), allocatedSlot.getAllocationId());

            // the originally requested slot is accepted too and ends up among the available slots
            Assert.assertTrue(slotPool.offerSlot(taskManagerLocation, taskManagerGateway, requestedOffer));
            Assert.assertEquals(1L, slotPool.getAvailableSlots().size());

            // repeating the very same offer is accepted without changing the slot counts
            Assert.assertTrue(slotPool.offerSlot(taskManagerLocation, taskManagerGateway, requestedOffer));
            Assert.assertEquals(1L, slotPool.getAvailableSlots().size());
            Assert.assertEquals(1L, slotPool.getAllocatedSlots().size());

            // same allocation id but a different slot index is expected to be rejected
            final SlotOffer sameIdOtherIndexOffer = new SlotOffer(rmRequest.getAllocationId(), 1, ResourceProfile.ANY);
            Assert.assertFalse(slotPool.offerSlot(taskManagerLocation, taskManagerGateway, sameIdOtherIndexOffer));

            // same offer but from another task manager is expected to be rejected
            final TaskManagerLocation otherLocation = new LocalTaskManagerLocation();
            Assert.assertFalse(slotPool.offerSlot(otherLocation, taskManagerGateway, requestedOffer));

            // the same outcomes are expected once the request's slot has been released
            slotPool.releaseSlot(slotRequestId, null);
            Assert.assertTrue(slotPool.offerSlot(taskManagerLocation, taskManagerGateway, requestedOffer));
            Assert.assertFalse(slotPool.offerSlot(taskManagerLocation, taskManagerGateway, sameIdOtherIndexOffer));
            Assert.assertFalse(slotPool.offerSlot(otherLocation, taskManagerGateway, requestedOffer));
        }
    }

    /**
     * Allocates a slot, attaches a payload to a logical slot built on top of it and then releases
     * the owning task manager; the payload is expected to be released, the logical slot to be dead,
     * and a second pending request to stay unfulfilled.
     */
    @Test
    public void testReleaseResource() throws Exception {
        final CompletableFuture<SlotRequest> rmRequestFuture = new CompletableFuture<>();
        resourceManagerGateway.setRequestSlotConsumer(rmRequestFuture::complete);

        try (TestingSlotPoolImpl slotPool = createAndSetUpSlotPool(resourceManagerGateway)) {
            final SlotRequestId firstRequestId = new SlotRequestId();
            final CompletableFuture<PhysicalSlot> firstFuture = SlotPoolUtils.requestNewAllocatedSlot(slotPool, firstRequestId);
            final SlotRequest rmRequest = rmRequestFuture.get(TIMEOUT.toMilliseconds(), TimeUnit.MILLISECONDS);
            final CompletableFuture<PhysicalSlot> secondFuture = SlotPoolUtils.requestNewAllocatedSlot(slotPool, new SlotRequestId());

            Assert.assertTrue(registerAndOfferSlot(taskManagerLocation, slotPool, rmRequest.getAllocationId()));
            final PhysicalSlot firstSlot = firstFuture.get(1L, TimeUnit.SECONDS);
            Assert.assertTrue(firstFuture.isDone());
            Assert.assertFalse(secondFuture.isDone());

            // the payload completes this future when it is released
            final CompletableFuture<?> payloadReleased = new CompletableFuture<>();
            final SingleLogicalSlot logicalSlot = SingleLogicalSlot.allocateFromPhysicalSlot(
                    firstRequestId, firstSlot, Locality.UNKNOWN, new DummySlotOwner(), true);
            logicalSlot.tryAssignPayload(new DummyPayload(payloadReleased));

            slotPool.releaseTaskManager(
                    taskManagerLocation.getResourceID(),
                    new Exception("Releasing TaskManager in SlotPool for tests"));

            payloadReleased.get();
            Assert.assertFalse(logicalSlot.isAlive());

            // short pause before checking that the second request is still pending
            Thread.sleep(10L);
            Assert.assertFalse(secondFuture.isDone());
        }
    }

    /**
     * Cancels the first of two slot requests and afterwards offers the slot that had been
     * requested for it; the slot is expected to fulfil the second request, and the allocation
     * originally requested for the second request is expected to be cancelled at the resource manager.
     */
    @Test
    public void testFulfillingSlotRequestsWithUnusedOfferedSlots() throws Exception {
        try (TestingSlotPoolImpl slotPool = createAndSetUpSlotPool(resourceManagerGateway)) {
            final ArrayBlockingQueue<AllocationID> requestedAllocations = new ArrayBlockingQueue<>(2);
            resourceManagerGateway.setRequestSlotConsumer(request -> requestedAllocations.offer(request.getAllocationId()));
            final ArrayBlockingQueue<AllocationID> canceledAllocations = new ArrayBlockingQueue<>(2);
            resourceManagerGateway.setCancelSlotConsumer(canceledAllocations::offer);

            final SlotRequestId firstRequestId = new SlotRequestId();
            final SlotRequestId secondRequestId = new SlotRequestId();

            final CompletableFuture<PhysicalSlot> firstFuture = SlotPoolUtils.requestNewAllocatedSlot(slotPool, firstRequestId);
            final AllocationID firstAllocationId = requestedAllocations.take();
            final CompletableFuture<PhysicalSlot> secondFuture = SlotPoolUtils.requestNewAllocatedSlot(slotPool, secondRequestId);
            final AllocationID secondAllocationId = requestedAllocations.take();

            // cancel the first request; its future must fail with a FlinkException
            slotPool.releaseSlot(firstRequestId, null);
            try {
                firstFuture.get();
                Assert.fail("The first slot future should have failed because it was cancelled.");
            } catch (ExecutionException expected) {
                Assert.assertTrue(ExceptionUtils.stripExecutionException(expected) instanceof FlinkException);
            }
            Assert.assertEquals(firstAllocationId, canceledAllocations.take());

            // the slot of the cancelled request arrives anyway and serves the second request
            Assert.assertTrue(registerAndOfferSlot(taskManagerLocation, slotPool, firstAllocationId));
            Assert.assertEquals(firstAllocationId, secondFuture.get().getAllocationId());

            // the allocation that had been requested for the second request is no longer needed
            Assert.assertEquals(secondAllocationId, canceledAllocations.take());
        }
    }

    /**
     * Offers {@code numSlotOffers} slots to the pool and closes it; every offered slot is expected
     * to be handed back to the task manager through the gateway's free-slot function.
     */
    @Test
    public void testShutdownReleasesAllSlots() throws Exception {
        try (TestingSlotPoolImpl slotPool = createAndSetUpSlotPool(resourceManagerGateway)) {
            slotPool.registerTaskManager(taskManagerLocation.getResourceID());

            final int numSlotOffers = 2;
            final List<SlotOffer> slotOffers = new ArrayList<>(numSlotOffers);
            for (int i = 0; i < numSlotOffers; i++) {
                slotOffers.add(new SlotOffer(new AllocationID(), i, ResourceProfile.ANY));
            }

            // records every allocation id the pool asks the task manager to free
            final ArrayBlockingQueue<AllocationID> freedSlotQueue = new ArrayBlockingQueue<>(numSlotOffers);
            taskManagerGateway.setFreeSlotFunction((allocationID, cause) -> {
                try {
                    freedSlotQueue.put(allocationID);
                    return CompletableFuture.completedFuture(Acknowledge.get());
                } catch (InterruptedException e) {
                    return FutureUtils.completedExceptionally(e);
                }
            });

            final Collection<SlotOffer> acceptedSlotOffers = slotPool.offerSlots(taskManagerLocation, taskManagerGateway, slotOffers);
            MatcherAssert.assertThat(acceptedSlotOffers, Matchers.equalTo(slotOffers));

            // Closed explicitly to trigger the release; the try-with-resources statement closes
            // the pool a second time when the block exits.
            slotPool.close();

            // Wait with a bounded, blocking poll per slot instead of spinning on drainTo():
            // no CPU is burnt while waiting, and a free-slot call that never happens fails the
            // test with a clear message instead of hanging it forever.
            final List<AllocationID> freedSlots = new ArrayList<>(numSlotOffers);
            while (freedSlots.size() < numSlotOffers) {
                final AllocationID freedSlot = freedSlotQueue.poll(TIMEOUT.toMilliseconds(), TimeUnit.MILLISECONDS);
                Assert.assertNotNull("Timed out waiting for the slot pool to free all offered slots.", freedSlot);
                freedSlots.add(freedSlot);
            }

            MatcherAssert.assertThat(
                    freedSlots,
                    Matchers.containsInAnyOrder(slotOffers.stream().map(SlotOffer::getAllocationId).toArray()));
        }
    }

    /**
     * Leaves two slot requests pending and closes the pool; afterwards no pending request is
     * expected to remain and both allocations are expected to be cancelled at the resource manager.
     */
    @Test
    public void testShutdownCancelsAllPendingRequests() throws Exception {
        final ArrayBlockingQueue<AllocationID> canceledAllocations = new ArrayBlockingQueue<>(2);
        resourceManagerGateway.setCancelSlotConsumer(canceledAllocations::offer);

        try (TestingSlotPoolImpl slotPool = createAndSetUpSlotPool(resourceManagerGateway)) {
            slotPool.requestNewAllocatedSlot(new SlotRequestId(), ResourceProfile.UNKNOWN, TIMEOUT);
            slotPool.requestNewAllocatedSlot(new SlotRequestId(), ResourceProfile.UNKNOWN, TIMEOUT);
            MatcherAssert.assertThat(slotPool.getPendingRequests().values(), Matchers.hasSize(2));

            // explicit close; the try-with-resources statement closes the pool again on exit
            slotPool.close();

            MatcherAssert.assertThat(slotPool.getPendingRequests().values(), Matchers.hasSize(0));
            MatcherAssert.assertThat(canceledAllocations, Matchers.hasSize(2));
        }
    }

    /**
     * Offers two slots at different points of a manually advanced clock and triggers the idle
     * check once only the older one has exceeded the idle timeout; exactly that slot is expected
     * to be freed at the task manager.
     */
    @Test
    public void testCheckIdleSlot() throws Exception {
        final ManualClock manualClock = new ManualClock();

        try (TestingSlotPoolImpl slotPool = createAndSetUpSlotPool(resourceManagerGateway, manualClock, TIMEOUT)) {
            // records the allocation ids the pool asks the task manager to free
            final ArrayBlockingQueue<AllocationID> freedSlots = new ArrayBlockingQueue<>(1);
            taskManagerGateway.setFreeSlotFunction((allocationId, cause) -> {
                try {
                    freedSlots.put(allocationId);
                    return CompletableFuture.completedFuture(Acknowledge.get());
                } catch (InterruptedException e) {
                    return FutureUtils.completedExceptionally(e);
                }
            });

            final AllocationID expiredSlotID = new AllocationID();
            final AllocationID freshSlotID = new AllocationID();
            final SlotOffer expiringOffer = new SlotOffer(expiredSlotID, 0, ResourceProfile.ANY);
            final SlotOffer freshOffer = new SlotOffer(freshSlotID, 1, ResourceProfile.ANY);

            MatcherAssert.assertThat(slotPool.registerTaskManager(taskManagerLocation.getResourceID()), Matchers.is(true));
            MatcherAssert.assertThat(slotPool.offerSlot(taskManagerLocation, taskManagerGateway, expiringOffer), Matchers.is(true));

            // the second slot is offered a full timeout later than the first one
            manualClock.advanceTime(TIMEOUT.toMilliseconds(), TimeUnit.MILLISECONDS);
            MatcherAssert.assertThat(slotPool.offerSlot(taskManagerLocation, taskManagerGateway, freshOffer), Matchers.is(true));

            // one more millisecond pushes only the first slot past the idle timeout
            manualClock.advanceTime(1L, TimeUnit.MILLISECONDS);
            slotPool.triggerCheckIdleSlot();

            final AllocationID freedSlot = freedSlots.poll(TIMEOUT.toMilliseconds(), TimeUnit.MILLISECONDS);
            MatcherAssert.assertThat(freedSlot, Matchers.is(expiredSlotID));
            MatcherAssert.assertThat(freedSlots.isEmpty(), Matchers.is(true));
        }
    }

    /**
     * Lets the task manager's free-slot call fail for an idle slot and checks that the slot is
     * nevertheless gone from the pool: a subsequent slot request is expected to time out and no
     * available slot is expected to remain.
     */
    @Test
    public void testDiscardIdleSlotIfReleasingFailed() throws Exception {
        final ManualClock manualClock = new ManualClock();

        try (TestingSlotPoolImpl slotPool = createAndSetUpSlotPool(resourceManagerGateway, manualClock, TIMEOUT)) {
            final AllocationID expiredAllocationId = new AllocationID();
            final SlotOffer expiringOffer = new SlotOffer(expiredAllocationId, 0, ResourceProfile.ANY);

            // the free-slot call signals that it happened and then reports a failure
            final OneShotLatch freeSlotCalled = new OneShotLatch();
            taskManagerGateway.setFreeSlotFunction((allocationId, cause) -> {
                freeSlotCalled.trigger();
                return FutureUtils.completedExceptionally(new TimeoutException("Test failure"));
            });

            MatcherAssert.assertThat(slotPool.registerTaskManager(taskManagerLocation.getResourceID()), Matchers.is(true));
            MatcherAssert.assertThat(slotPool.offerSlot(taskManagerLocation, taskManagerGateway, expiringOffer), Matchers.is(true));

            // move past the idle timeout and run the idle check
            manualClock.advanceTime(TIMEOUT.toMilliseconds() + 1L, TimeUnit.MILLISECONDS);
            slotPool.triggerCheckIdleSlot();
            freeSlotCalled.await();

            final CompletableFuture<PhysicalSlot> slotFuture = SlotPoolUtils.requestNewAllocatedSlot(slotPool, new SlotRequestId());
            try {
                slotFuture.get(10L, TimeUnit.MILLISECONDS);
                Assert.fail("Expected to fail with a timeout.");
            } catch (TimeoutException expected) {
                // the request timed out as expected; verified below that nothing is available
            }
            Assert.assertEquals(0L, slotPool.getAvailableSlots().size());
        }
    }

    /**
     * Allocates {@code parallelism} slots from one task manager and then fails the allocations
     * one by one. Each failed allocation is expected to be freed at the task manager;
     * {@code failAllocation} is expected to return an empty optional while the task manager still
     * has slots in the pool and the task manager's resource id once its last slot has been failed.
     */
    @Test
    public void testFreeFailedSlots() throws Exception {
        try (TestingSlotPoolImpl slotPool = createAndSetUpSlotPool(resourceManagerGateway)) {
            // single source of truth for the number of requested/offered slots
            final int parallelism = 5;

            final ArrayBlockingQueue<AllocationID> allocationIds = new ArrayBlockingQueue<>(parallelism);
            resourceManagerGateway.setRequestSlotConsumer(slotRequest -> allocationIds.offer(slotRequest.getAllocationId()));

            final HashMap<SlotRequestId, CompletableFuture<PhysicalSlot>> slotRequestFutures = new HashMap<>(parallelism);
            for (int i = 0; i < parallelism; i++) {
                final SlotRequestId slotRequestId = new SlotRequestId();
                slotRequestFutures.put(slotRequestId, SlotPoolUtils.requestNewAllocatedSlot(slotPool, slotRequestId));
            }

            // one offer per requested allocation id, each on its own slot index
            final List<SlotOffer> slotOffers = new ArrayList<>(parallelism);
            for (int i = 0; i < parallelism; i++) {
                slotOffers.add(new SlotOffer(allocationIds.take(), i, ResourceProfile.ANY));
            }

            slotPool.registerTaskManager(taskManagerLocation.getResourceID());
            slotPool.offerSlots(taskManagerLocation, taskManagerGateway, slotOffers);

            // wait until every request has received its slot
            FutureUtils.waitForAll(slotRequestFutures.values()).get();

            final ArrayBlockingQueue<AllocationID> freedSlots = new ArrayBlockingQueue<>(1);
            taskManagerGateway.setFreeSlotFunction((allocationID, throwable) -> {
                freedSlots.offer(allocationID);
                return CompletableFuture.completedFuture(Acknowledge.get());
            });

            final FlinkException failException = new FlinkException("Test fail exception");

            // fail all but the last slot: the task manager is not reported as empty yet
            for (int i = 0; i < parallelism - 1; i++) {
                final SlotOffer slotOffer = slotOffers.get(i);
                MatcherAssert.assertThat(
                        slotPool.failAllocation(slotOffer.getAllocationId(), failException).isPresent(),
                        Matchers.is(false));
                MatcherAssert.assertThat(freedSlots.take(), Matchers.is(Matchers.equalTo(slotOffer.getAllocationId())));
            }

            // failing the last slot reports the now empty task manager
            final SlotOffer lastSlotOffer = slotOffers.get(parallelism - 1);
            MatcherAssert.assertThat(
                    slotPool.failAllocation(lastSlotOffer.getAllocationId(), failException).get(),
                    Matchers.is(Matchers.equalTo(taskManagerLocation.getResourceID())));
            MatcherAssert.assertThat(freedSlots.take(), Matchers.is(Matchers.equalTo(lastSlotOffer.getAllocationId())));
        }
    }

    /**
     * Offers one requested and one unrequested slot from the same task manager and checks that
     * the allocated-slot report created for that task manager carries the pool's job id and an
     * entry for each of the two slots.
     */
    @Test
    public void testCreateAllocatedSlotReport() throws Exception {
        final JobID jobId = new JobID();

        try (TestingSlotPoolImpl slotPool = createAndSetUpSlotPool(resourceManagerGateway, jobId)) {
            final ArrayBlockingQueue<AllocationID> requestedAllocations = new ArrayBlockingQueue<>(1);
            resourceManagerGateway.setRequestSlotConsumer(request -> requestedAllocations.offer(request.getAllocationId()));

            final CompletableFuture<PhysicalSlot> slotFuture = SlotPoolUtils.requestNewAllocatedSlot(slotPool, new SlotRequestId());

            final List<AllocatedSlotInfo> expectedSlotInfos = new ArrayList<>(2);
            final List<SlotOffer> slotOffers = new ArrayList<>(2);

            // slot 0 carries the allocation id of the pending request
            final AllocationID requestedId = requestedAllocations.take();
            slotOffers.add(new SlotOffer(requestedId, 0, ResourceProfile.ANY));
            expectedSlotInfos.add(new AllocatedSlotInfo(0, requestedId));

            // slot 1 carries an allocation id nobody requested
            final AllocationID unrequestedId = new AllocationID();
            slotOffers.add(new SlotOffer(unrequestedId, 1, ResourceProfile.ANY));
            expectedSlotInfos.add(new AllocatedSlotInfo(1, unrequestedId));

            slotPool.registerTaskManager(taskManagerLocation.getResourceID());
            slotPool.offerSlots(taskManagerLocation, taskManagerGateway, slotOffers);
            slotFuture.get();

            final AllocatedSlotReport slotReport = slotPool.createAllocatedSlotReport(taskManagerLocation.getResourceID());
            MatcherAssert.assertThat(jobId, Matchers.is(slotReport.getJobId()));
            MatcherAssert.assertThat(slotReport.getAllocatedSlotInfos(), Matchers.containsInAnyOrder(isEachEqual(expectedSlotInfos)));
        }
    }

    /**
     * Registers two task managers with four slots each, allocates two slots of the first and one
     * slot of the second, and checks that every remaining available slot reports the utilization
     * expected for its task manager (0.5 and 0.25 respectively, compared with a tolerance of 0.1).
     */
    @Test
    public void testCalculationOfTaskExecutorUtilization() throws Exception {
        try (TestingSlotPoolImpl slotPool = createAndSetUpSlotPool(resourceManagerGateway)) {
            final TaskManagerLocation firstTaskManagerLocation = new LocalTaskManagerLocation();
            final TaskManagerLocation secondTaskManagerLocation = new LocalTaskManagerLocation();

            final List<AllocationID> firstTaskManagersSlots = registerAndOfferSlots(firstTaskManagerLocation, slotPool, 4);
            final List<AllocationID> secondTaskManagersSlots = registerAndOfferSlots(secondTaskManagerLocation, slotPool, 4);

            // 2 of 4 slots in use on the first task manager, 1 of 4 on the second
            slotPool.allocateAvailableSlot(new SlotRequestId(), firstTaskManagersSlots.get(0), ResourceProfile.ANY);
            slotPool.allocateAvailableSlot(new SlotRequestId(), firstTaskManagersSlots.get(1), ResourceProfile.ANY);
            slotPool.allocateAvailableSlot(new SlotRequestId(), secondTaskManagersSlots.get(3), ResourceProfile.ANY);

            // Typed collection: iterating a raw Collection with a SlotInfoWithUtilization loop
            // variable does not compile.
            final Collection<SlotInfoWithUtilization> availableSlotsInformation = slotPool.getAvailableSlotsInformation();

            final ImmutableMap<TaskManagerLocation, Double> utilizationPerTaskExecutor = ImmutableMap.of(
                    firstTaskManagerLocation, 0.5,
                    secondTaskManagerLocation, 0.25);

            for (SlotInfoWithUtilization slotInfoWithUtilization : availableSlotsInformation) {
                final double expectedTaskExecutorUtilization =
                        utilizationPerTaskExecutor.get(slotInfoWithUtilization.getTaskManagerLocation());
                MatcherAssert.assertThat(
                        slotInfoWithUtilization.getTaskExecutorUtilization(),
                        Matchers.is(Matchers.closeTo(expectedTaskExecutorUtilization, 0.1)));
            }
        }
    }

    /**
     * Issues two slot requests and offers the slot that was requested for the second one. One
     * request is expected to stay pending, now keyed by the second request id and the first
     * allocation id, and no allocation is expected to be cancelled at the resource manager.
     */
    @Test
    public void testOrphanedAllocationCanBeRemapped() throws Exception {
        final List<AllocationID> requestedAllocations = new ArrayList<>();
        resourceManagerGateway.setRequestSlotConsumer(request -> requestedAllocations.add(request.getAllocationId()));
        final List<AllocationID> canceledAllocations = new ArrayList<>();
        resourceManagerGateway.setCancelSlotConsumer(canceledAllocations::add);

        try (TestingSlotPoolImpl slotPool = createAndSetUpSlotPool(resourceManagerGateway)) {
            final SlotRequestId firstRequestId = new SlotRequestId();
            final SlotRequestId secondRequestId = new SlotRequestId();
            SlotPoolUtils.requestNewAllocatedSlots(slotPool, firstRequestId, secondRequestId);

            final AllocationID firstAllocationId = requestedAllocations.get(0);
            final AllocationID secondAllocationId = requestedAllocations.get(1);

            // offer the slot that was requested on behalf of the second request
            registerAndOfferSlot(taskManagerLocation, slotPool, secondAllocationId);

            MatcherAssert.assertThat(slotPool.getPendingRequests().values(), Matchers.hasSize(1));
            MatcherAssert.assertThat(slotPool.getPendingRequests().containsKeyA(secondRequestId), Matchers.is(true));
            MatcherAssert.assertThat(slotPool.getPendingRequests().containsKeyB(firstAllocationId), Matchers.is(true));
            MatcherAssert.assertThat(canceledAllocations, Matchers.hasSize(0));
        }
    }

    /**
     * Issues two slot requests and offers a slot whose allocation id matches neither of them.
     * One request is expected to stay pending and the first requested allocation is expected to
     * be cancelled at the resource manager.
     */
    @Test
    public void testOrphanedAllocationIsCanceledIfNotRemapped() throws Exception {
        final List<AllocationID> requestedAllocations = new ArrayList<>();
        resourceManagerGateway.setRequestSlotConsumer(request -> requestedAllocations.add(request.getAllocationId()));
        final List<AllocationID> canceledAllocations = new ArrayList<>();
        resourceManagerGateway.setCancelSlotConsumer(canceledAllocations::add);

        try (TestingSlotPoolImpl slotPool = SlotPoolUtils.createAndSetUpSlotPool(resourceManagerGateway)) {
            final SlotRequestId firstRequestId = new SlotRequestId();
            final SlotRequestId secondRequestId = new SlotRequestId();
            SlotPoolUtils.requestNewAllocatedSlots(slotPool, firstRequestId, secondRequestId);

            final AllocationID firstAllocationId = requestedAllocations.get(0);
            final AllocationID secondAllocationId = requestedAllocations.get(1);

            // draw a fresh allocation id until it differs from both requested ones
            AllocationID randomAllocationId;
            do {
                randomAllocationId = new AllocationID();
            } while (randomAllocationId.equals(firstAllocationId) || randomAllocationId.equals(secondAllocationId));

            registerAndOfferSlot(taskManagerLocation, slotPool, randomAllocationId);

            MatcherAssert.assertThat(slotPool.getPendingRequests().values(), Matchers.hasSize(1));
            MatcherAssert.assertThat(canceledAllocations, Matchers.contains(firstAllocationId));
        }
    }

    /**
     * Starts a slot pool without a resource manager gateway, requests a slot and then offers one;
     * the request waiting for the resource manager is expected to be completed normally with the
     * offered slot.
     */
    @Test
    public void testSlotsOfferedWithoutResourceManagerConnected() throws Exception {
        try (TestingSlotPoolImpl slotPool = new TestingSlotPoolImpl(new JobID())) {
            slotPool.start(JobMasterId.generate(), "mock-address", mainThreadExecutor);

            final SlotRequestId slotRequestId = new SlotRequestId();
            final CompletableFuture<PhysicalSlot> pendingSlot = SlotPoolUtils.requestNewAllocatedSlot(slotPool, slotRequestId);
            MatcherAssert.assertThat(slotPool.getWaitingForResourceManager().values(), Matchers.hasSize(1));

            final AllocationID offeredAllocationId = new AllocationID();
            registerAndOfferSlot(taskManagerLocation, slotPool, offeredAllocationId);

            MatcherAssert.assertThat(slotPool.getWaitingForResourceManager().values(), Matchers.hasSize(0));
            MatcherAssert.assertThat(pendingSlot.isDone(), Matchers.is(true));
            MatcherAssert.assertThat(pendingSlot.isCompletedExceptionally(), Matchers.is(false));
            MatcherAssert.assertThat(pendingSlot.getNow(null).getAllocationId(), Matchers.is(offeredAllocationId));
        }
    }

    /**
     * Builds a testing slot pool on the shared main thread executor.
     *
     * @param resourceManagerGateway gateway handed to the builder
     * @return the slot pool produced by the builder
     */
    private static TestingSlotPoolImpl createAndSetUpSlotPool(ResourceManagerGateway resourceManagerGateway) throws Exception {
        final SlotPoolBuilder configuredBuilder =
                new SlotPoolBuilder(mainThreadExecutor).setResourceManagerGateway(resourceManagerGateway);
        return configuredBuilder.build();
    }

    /**
     * Builds a testing slot pool for a specific job on the shared main thread executor.
     *
     * @param resourceManagerGateway gateway handed to the builder
     * @param jobId job id handed to the builder
     * @return the slot pool produced by the builder
     */
    private static TestingSlotPoolImpl createAndSetUpSlotPool(ResourceManagerGateway resourceManagerGateway, JobID jobId) throws Exception {
        final SlotPoolBuilder configuredBuilder = new SlotPoolBuilder(mainThreadExecutor)
                .setResourceManagerGateway(resourceManagerGateway)
                .setJobId(jobId);
        return configuredBuilder.build();
    }

    /**
     * Builds a testing slot pool with a custom clock and idle slot timeout on the shared main
     * thread executor.
     *
     * @param resourceManagerGateway gateway handed to the builder
     * @param clock clock handed to the builder
     * @param idleSlotTimeout idle slot timeout handed to the builder
     * @return the slot pool produced by the builder
     */
    private static TestingSlotPoolImpl createAndSetUpSlotPool(ResourceManagerGateway resourceManagerGateway, Clock clock, Time idleSlotTimeout) throws Exception {
        final SlotPoolBuilder configuredBuilder = new SlotPoolBuilder(mainThreadExecutor)
                .setResourceManagerGateway(resourceManagerGateway)
                .setClock(clock)
                .setIdleSlotTimeout(idleSlotTimeout);
        return configuredBuilder.build();
    }

    /**
     * Registers the task manager at the pool and offers a single slot (index 0,
     * {@link ResourceProfile#ANY}) with the given allocation id through the shared gateway stub.
     *
     * @param taskManagerLocation task manager that owns the offered slot
     * @param slotPool pool receiving the registration and the offer
     * @param allocationId allocation id of the offered slot
     * @return whatever {@code offerSlot} returns for the offer
     */
    private boolean registerAndOfferSlot(TaskManagerLocation taskManagerLocation, SlotPoolImpl slotPool, AllocationID allocationId) {
        slotPool.registerTaskManager(taskManagerLocation.getResourceID());
        return slotPool.offerSlot(
                taskManagerLocation,
                taskManagerGateway,
                new SlotOffer(allocationId, 0, ResourceProfile.ANY));
    }

    /**
     * Registers the task manager at the pool and offers the requested number of slots, one per
     * slot index starting at 0, each with a freshly created allocation id. The offers are sent
     * through a new {@link SimpleAckingTaskManagerGateway}, not the shared gateway stub.
     *
     * @param taskManagerLocation task manager that owns the offered slots
     * @param slotPool pool receiving the registration and the offers
     * @param numberOfSlotsToRegister number of slots to offer
     * @return the allocation ids of the offered slots, ordered by slot index
     */
    private List<AllocationID> registerAndOfferSlots(TaskManagerLocation taskManagerLocation, SlotPoolImpl slotPool, int numberOfSlotsToRegister) {
        slotPool.registerTaskManager(taskManagerLocation.getResourceID());

        final List<AllocationID> allocationIds = new ArrayList<>();
        final List<SlotOffer> slotOffers = new ArrayList<>();
        for (int slotIndex = 0; slotIndex < numberOfSlotsToRegister; slotIndex++) {
            final AllocationID allocationId = new AllocationID();
            allocationIds.add(allocationId);
            slotOffers.add(new SlotOffer(allocationId, slotIndex, ResourceProfile.ANY));
        }

        slotPool.offerSlots(taskManagerLocation, new SimpleAckingTaskManagerGateway(), slotOffers);
        return allocationIds;
    }

    /**
     * Wraps every given slot info into a matcher created by {@code isEqualAllocatedSlotInfo},
     * preserving the iteration order of the input.
     *
     * @param allocatedSlotInfos expected slot infos
     * @return one matcher per expected slot info
     */
    private static Collection<Matcher<? super AllocatedSlotInfo>> isEachEqual(Collection<AllocatedSlotInfo> allocatedSlotInfos) {
        final List<Matcher<? super AllocatedSlotInfo>> matchers = new ArrayList<>();
        for (AllocatedSlotInfo expected : allocatedSlotInfos) {
            matchers.add(isEqualAllocatedSlotInfo(expected));
        }
        return matchers;
    }

    /**
     * Creates a matcher that accepts an {@link AllocatedSlotInfo} whose allocation id and slot
     * index both equal those of the expected instance.
     *
     * @param expectedAllocatedSlotInfo the slot info to compare against
     * @return matcher that, on mismatch, describes both the actual and the expected value
     */
    private static Matcher<AllocatedSlotInfo> isEqualAllocatedSlotInfo(final AllocatedSlotInfo expectedAllocatedSlotInfo) {
        return new TypeSafeDiagnosingMatcher<AllocatedSlotInfo>() {

            @Override
            public void describeTo(Description description) {
                description.appendText(describeAllocatedSlotInformation(expectedAllocatedSlotInfo));
            }

            // Textual form used for both the expected and the actual value.
            private String describeAllocatedSlotInformation(AllocatedSlotInfo allocatedSlotInformation) {
                return allocatedSlotInformation.toString();
            }

            @Override
            protected boolean matchesSafely(AllocatedSlotInfo item, Description mismatchDescription) {
                final boolean matches =
                        item.getAllocationId().equals(expectedAllocatedSlotInfo.getAllocationId())
                                && item.getSlotIndex() == expectedAllocatedSlotInfo.getSlotIndex();
                if (!matches) {
                    mismatchDescription
                            .appendText("Actual value ")
                            .appendText(describeAllocatedSlotInformation(item))
                            .appendText(" differs from expected value ")
                            .appendText(describeAllocatedSlotInformation(expectedAllocatedSlotInfo));
                }
                return matches;
            }
        };
    }
}

