/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.runtime.scheduler.benchmark.e2e;

import java.util.Collection;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executor;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.stream.Collectors;
import java.util.stream.IntStream;
import org.apache.flink.runtime.clusterframework.types.AllocationID;
import org.apache.flink.runtime.clusterframework.types.ResourceProfile;
import org.apache.flink.runtime.concurrent.ComponentMainThreadExecutor;
import org.apache.flink.runtime.concurrent.ComponentMainThreadExecutorServiceAdapter;
import org.apache.flink.runtime.jobgraph.JobGraph;
import org.apache.flink.runtime.jobgraph.JobVertex;
import org.apache.flink.runtime.jobmanager.slots.TaskManagerGateway;
import org.apache.flink.runtime.jobmaster.JobMasterId;
import org.apache.flink.runtime.jobmaster.RpcTaskManagerGateway;
import org.apache.flink.runtime.jobmaster.slotpool.LocationPreferenceSlotSelectionStrategy;
import org.apache.flink.runtime.jobmaster.slotpool.PhysicalSlotProvider;
import org.apache.flink.runtime.jobmaster.slotpool.PhysicalSlotProviderImpl;
import org.apache.flink.runtime.jobmaster.slotpool.SlotPool;
import org.apache.flink.runtime.jobmaster.slotpool.SlotPoolBuilder;
import org.apache.flink.runtime.jobmaster.slotpool.SlotPoolImpl;
import org.apache.flink.runtime.jobmaster.slotpool.SlotSelectionStrategy;
import org.apache.flink.runtime.jobmaster.slotpool.TestingSlotPoolImpl;
import org.apache.flink.runtime.scheduler.DefaultScheduler;
import org.apache.flink.runtime.scheduler.ExecutionSlotAllocatorFactory;
import org.apache.flink.runtime.scheduler.SchedulerTestingUtils;
import org.apache.flink.runtime.scheduler.benchmark.JobConfiguration;
import org.apache.flink.runtime.scheduler.benchmark.SchedulerBenchmarkUtils;
import org.apache.flink.runtime.taskexecutor.TaskExecutorGateway;
import org.apache.flink.runtime.taskexecutor.TestingTaskExecutorGateway;
import org.apache.flink.runtime.taskexecutor.TestingTaskExecutorGatewayBuilder;
import org.apache.flink.runtime.taskexecutor.slot.SlotOffer;
import org.apache.flink.runtime.taskmanager.LocalTaskManagerLocation;

/**
 * Shared fixture for scheduler benchmarks.
 *
 * <p>{@link #setup(JobConfiguration)} creates a single-thread scheduled executor, wraps it in a
 * {@link ComponentMainThreadExecutor}, builds the {@link JobGraph} for the given configuration and
 * prepares a {@link PhysicalSlotProvider} whose slot pool has already been offered slots.
 * {@link #teardown()} shuts the executor down again.
 */
public class SchedulerBenchmarkBase {
    /** Executor created in {@link #setup(JobConfiguration)} and stopped in {@link #teardown()}. */
    ScheduledExecutorService scheduledExecutorService;
    /** Main-thread executor adapter built on top of {@link #scheduledExecutorService}. */
    ComponentMainThreadExecutor mainThreadExecutor;
    /** Job graph built from the job vertices created for the benchmark configuration. */
    JobGraph jobGraph;
    /** Slot provider backed by a slot pool that was pre-filled in {@link #setup(JobConfiguration)}. */
    PhysicalSlotProvider physicalSlotProvider;

    /**
     * Initializes the executor, the job graph and the physical slot provider.
     *
     * <p>If any initialization step fails, the freshly created executor is shut down before the
     * exception propagates, so a failed setup does not leave its thread behind.
     *
     * @param jobConfiguration configuration used to create the job vertices and the job graph
     * @throws Exception if creating the job graph or the slot provider fails
     */
    public void setup(JobConfiguration jobConfiguration) throws Exception {
        scheduledExecutorService = Executors.newSingleThreadScheduledExecutor();
        boolean initialized = false;
        try {
            mainThreadExecutor =
                    ComponentMainThreadExecutorServiceAdapter.forSingleThreadExecutor(scheduledExecutorService);
            List<JobVertex> jobVertices = SchedulerBenchmarkUtils.createDefaultJobVertices(jobConfiguration);
            jobGraph = SchedulerBenchmarkUtils.createJobGraph(jobVertices, jobConfiguration);
            physicalSlotProvider =
                    createPhysicalSlotProvider(jobConfiguration, jobVertices.size(), mainThreadExecutor);
            initialized = true;
        } finally {
            if (!initialized) {
                // Do not leak the executor thread when setup aborts half-way.
                scheduledExecutorService.shutdownNow();
            }
        }
    }

    /**
     * Stops the executor created by {@link #setup(JobConfiguration)}; a no-op if setup never ran.
     *
     * @throws Exception declared for subclasses; this implementation only calls {@code shutdownNow()}
     */
    public void teardown() throws Exception {
        if (scheduledExecutorService != null) {
            scheduledExecutorService.shutdownNow();
        }
    }

    /**
     * Builds a {@link PhysicalSlotProvider} on top of a slot pool that has been offered
     * {@code parallelism * numberOfJobVertices} slots.
     *
     * @param jobConfiguration supplies the parallelism used to size the slot pool
     * @param numberOfJobVertices number of job vertices in the benchmark job
     * @param mainThreadExecutor executor the slot pool is built with and on which slots are offered
     * @return a slot provider using the default location-preference selection strategy
     * @throws Exception if building the slot pool fails
     */
    private static PhysicalSlotProvider createPhysicalSlotProvider(
            JobConfiguration jobConfiguration,
            int numberOfJobVertices,
            ComponentMainThreadExecutor mainThreadExecutor) throws Exception {
        int slotPoolSize = jobConfiguration.getParallelism() * numberOfJobVertices;
        TestingSlotPoolImpl slotPool = new SlotPoolBuilder(mainThreadExecutor).build();
        TestingTaskExecutorGateway testingTaskExecutorGateway =
                new TestingTaskExecutorGatewayBuilder().createTestingTaskExecutorGateway();
        TaskManagerGateway taskManagerGateway =
                new RpcTaskManagerGateway(testingTaskExecutorGateway, JobMasterId.generate());
        offerSlots(slotPool, taskManagerGateway, slotPoolSize, mainThreadExecutor);
        return new PhysicalSlotProviderImpl(LocationPreferenceSlotSelectionStrategy.createDefault(), slotPool);
    }

    /**
     * Registers a new local task manager with the slot pool and offers it {@code slotPoolSize}
     * slots (slot indices {@code 0..slotPoolSize-1}, each with {@link ResourceProfile#ANY}).
     *
     * <p>The registration and the offer run on {@code mainThreadExecutor}; this method blocks
     * until that task has completed.
     *
     * @param slotPool pool receiving the slot offers
     * @param taskManagerGateway gateway passed along with the offered slots
     * @param slotPoolSize number of slots to offer
     * @param mainThreadExecutor executor on which the slot pool calls are made
     */
    private static void offerSlots(
            SlotPoolImpl slotPool,
            TaskManagerGateway taskManagerGateway,
            int slotPoolSize,
            ComponentMainThreadExecutor mainThreadExecutor) {
        LocalTaskManagerLocation taskManagerLocation = new LocalTaskManagerLocation();
        CompletableFuture.runAsync(() -> {
            slotPool.registerTaskManager(taskManagerLocation.getResourceID());
            List<SlotOffer> slotOffers = IntStream.range(0, slotPoolSize)
                    .mapToObj(i -> new SlotOffer(new AllocationID(), i, ResourceProfile.ANY))
                    .collect(Collectors.toList());
            slotPool.offerSlots(taskManagerLocation, taskManagerGateway, slotOffers);
        }, mainThreadExecutor).join();
    }

    /**
     * Creates a {@link DefaultScheduler} for the given job graph that allocates slots through a
     * slot-sharing execution slot allocator backed by {@code physicalSlotProvider}.
     *
     * @param jobGraph job graph to schedule
     * @param physicalSlotProvider provider handed to the slot-sharing allocator factory
     * @param mainThreadExecutor main-thread executor passed to the scheduler builder
     * @return the built scheduler
     * @throws Exception if the scheduler builder fails
     */
    static DefaultScheduler createScheduler(
            JobGraph jobGraph,
            PhysicalSlotProvider physicalSlotProvider,
            ComponentMainThreadExecutor mainThreadExecutor) throws Exception {
        return SchedulerTestingUtils.newSchedulerBuilder(jobGraph, mainThreadExecutor)
                .setExecutionSlotAllocatorFactory(
                        SchedulerTestingUtils.newSlotSharingExecutionSlotAllocatorFactory(physicalSlotProvider))
                .build();
    }
}

