/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.runtime.resourcemanager.active;

import java.util.Collection;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.Callable;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executor;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import javax.annotation.Nullable;
import org.apache.flink.annotation.VisibleForTesting;
import org.apache.flink.api.common.time.Time;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.runtime.akka.AkkaUtils;
import org.apache.flink.runtime.clusterframework.ApplicationStatus;
import org.apache.flink.runtime.clusterframework.TaskExecutorProcessSpec;
import org.apache.flink.runtime.clusterframework.TaskExecutorProcessUtils;
import org.apache.flink.runtime.clusterframework.types.ResourceID;
import org.apache.flink.runtime.clusterframework.types.ResourceIDRetrievable;
import org.apache.flink.runtime.concurrent.FutureUtils;
import org.apache.flink.runtime.concurrent.ScheduledExecutor;
import org.apache.flink.runtime.entrypoint.ClusterInformation;
import org.apache.flink.runtime.heartbeat.HeartbeatServices;
import org.apache.flink.runtime.highavailability.HighAvailabilityServices;
import org.apache.flink.runtime.io.network.partition.ResourceManagerPartitionTrackerFactory;
import org.apache.flink.runtime.metrics.groups.ResourceManagerMetricGroup;
import org.apache.flink.runtime.resourcemanager.JobLeaderIdService;
import org.apache.flink.runtime.resourcemanager.ResourceManager;
import org.apache.flink.runtime.resourcemanager.WorkerResourceSpec;
import org.apache.flink.runtime.resourcemanager.active.PendingWorkerCounter;
import org.apache.flink.runtime.resourcemanager.active.ResourceEventHandler;
import org.apache.flink.runtime.resourcemanager.active.ResourceManagerDriver;
import org.apache.flink.runtime.resourcemanager.exceptions.ResourceManagerException;
import org.apache.flink.runtime.resourcemanager.slotmanager.SlotManager;
import org.apache.flink.runtime.rpc.FatalErrorHandler;
import org.apache.flink.runtime.rpc.RpcService;
import org.apache.flink.util.Preconditions;

/**
 * A {@link ResourceManager} that obtains and releases workers through a
 * {@link ResourceManagerDriver} and reacts to the driver's callbacks via
 * {@link ResourceEventHandler}.
 *
 * <p>Bookkeeping kept by this class:
 * <ul>
 *   <li>{@code workerNodeMap} - every worker known to this manager (requested in this attempt
 *       or recovered from a previous one), keyed by its {@link ResourceID};</li>
 *   <li>{@code currentAttemptUnregisteredWorkers} - workers requested in this attempt that have
 *       not registered yet, together with the spec they were requested for;</li>
 *   <li>{@code pendingWorkerCounter} - per-spec count of requests that are still outstanding
 *       (requested but neither registered, failed nor removed).</li>
 * </ul>
 *
 * <p>NOTE(review): the maps are plain {@link HashMap}s without synchronization; presumably all
 * accesses (including driver callbacks) happen in the RPC main thread - confirm against the
 * driver implementations.
 *
 * @param <WorkerType> type of the worker handle produced by the driver
 */
public class ActiveResourceManager<WorkerType extends ResourceIDRetrievable>
        extends ResourceManager<WorkerType>
        implements ResourceEventHandler<WorkerType> {

    /** Flink configuration; used to derive the process spec of newly requested workers. */
    protected final Configuration flinkConfig;

    /** Driver that talks to the underlying resource provider. */
    private final ResourceManagerDriver<WorkerType> resourceManagerDriver;

    /** All workers currently tracked by this resource manager, by resource id. */
    private final Map<ResourceID, WorkerType> workerNodeMap;

    /** Number of outstanding worker requests per resource spec. */
    private final PendingWorkerCounter pendingWorkerCounter;

    /** Workers requested in the current attempt that have not registered yet. */
    private final Map<ResourceID, WorkerResourceSpec> currentAttemptUnregisteredWorkers;

    /**
     * Creates the resource manager.
     *
     * @param resourceManagerDriver driver used to request/release resources; must not be null
     * @param flinkConfig Flink configuration; must not be null
     * @throws NullPointerException if {@code flinkConfig} or {@code resourceManagerDriver} is null
     */
    public ActiveResourceManager(
            ResourceManagerDriver<WorkerType> resourceManagerDriver,
            Configuration flinkConfig,
            RpcService rpcService,
            ResourceID resourceId,
            HighAvailabilityServices highAvailabilityServices,
            HeartbeatServices heartbeatServices,
            SlotManager slotManager,
            ResourceManagerPartitionTrackerFactory clusterPartitionTrackerFactory,
            JobLeaderIdService jobLeaderIdService,
            ClusterInformation clusterInformation,
            FatalErrorHandler fatalErrorHandler,
            ResourceManagerMetricGroup resourceManagerMetricGroup,
            Executor ioExecutor) {
        super(
                rpcService,
                resourceId,
                highAvailabilityServices,
                heartbeatServices,
                slotManager,
                clusterPartitionTrackerFactory,
                jobLeaderIdService,
                clusterInformation,
                fatalErrorHandler,
                resourceManagerMetricGroup,
                AkkaUtils.getTimeoutAsTime(Preconditions.checkNotNull(flinkConfig)),
                ioExecutor);
        this.flinkConfig = flinkConfig;
        // Fail fast here instead of with an NPE on first use in initialize().
        this.resourceManagerDriver = Preconditions.checkNotNull(resourceManagerDriver);
        this.workerNodeMap = new HashMap<>();
        this.pendingWorkerCounter = new PendingWorkerCounter();
        this.currentAttemptUnregisteredWorkers = new HashMap<>();
    }

    // ------------------------------------------------------------------------
    //  ResourceManager
    // ------------------------------------------------------------------------

    /**
     * Initializes the driver, handing it this instance as event handler, an executor that
     * forwards to the main thread executor, and the IO executor.
     *
     * @throws ResourceManagerException if the driver fails to initialize
     */
    @Override
    protected void initialize() throws ResourceManagerException {
        try {
            this.resourceManagerDriver.initialize(this, new GatewayMainThreadExecutor(), this.ioExecutor);
        } catch (Exception e) {
            throw new ResourceManagerException("Cannot initialize resource provider.", e);
        }
    }

    /**
     * Terminates the driver and blocks until its termination future completes.
     *
     * @throws ResourceManagerException if termination fails or the wait is interrupted
     */
    @Override
    protected void terminate() throws ResourceManagerException {
        try {
            this.resourceManagerDriver.terminate().get();
        } catch (Exception e) {
            if (e instanceof InterruptedException) {
                // Preserve the interrupt for callers further up the stack; wrapping the
                // exception alone would silently clear the thread's interrupt status.
                Thread.currentThread().interrupt();
            }
            throw new ResourceManagerException("Cannot terminate resource provider.", e);
        }
    }

    /** Delegates to {@link ResourceManagerDriver#onGrantLeadership()}. */
    @Override
    protected CompletableFuture<Void> prepareLeadershipAsync() {
        return this.resourceManagerDriver.onGrantLeadership();
    }

    /** Delegates to {@link ResourceManagerDriver#onRevokeLeadership()}. */
    @Override
    protected CompletableFuture<Void> clearStateAsync() {
        return this.resourceManagerDriver.onRevokeLeadership();
    }

    /**
     * Deregisters the application through the driver.
     *
     * @param finalStatus final status reported to the resource provider
     * @param optionalDiagnostics optional diagnostics message, may be null
     * @throws ResourceManagerException if the driver fails to deregister
     */
    @Override
    protected void internalDeregisterApplication(ApplicationStatus finalStatus, @Nullable String optionalDiagnostics)
            throws ResourceManagerException {
        try {
            this.resourceManagerDriver.deregisterApplication(finalStatus, optionalDiagnostics);
        } catch (Exception e) {
            throw new ResourceManagerException("Cannot deregister application.", e);
        }
    }

    /**
     * Requests a new worker for the given spec.
     *
     * @return always {@code true}; a failing request is handled asynchronously by re-requesting
     */
    @Override
    public boolean startNewWorker(WorkerResourceSpec workerResourceSpec) {
        this.requestNewWorker(workerResourceSpec);
        return true;
    }

    /**
     * Looks up the tracked worker for a resource id.
     *
     * @return the tracked worker, or {@code null} if the id is not tracked
     */
    @Override
    protected WorkerType workerStarted(ResourceID resourceID) {
        return this.workerNodeMap.get(resourceID);
    }

    /**
     * Releases the worker through the driver and drops all local state for it.
     *
     * @return always {@code true}
     */
    @Override
    public boolean stopWorker(WorkerType worker) {
        final ResourceID resourceId = worker.getResourceID();
        this.resourceManagerDriver.releaseResource(worker);
        this.log.info("Stopping worker {}.", resourceId.getStringWithMetadata());
        this.clearStateForWorker(resourceId);
        return true;
    }

    /**
     * Marks a worker as registered. If it was requested in the current attempt, it is no longer
     * pending, so the pending counter for its spec is decreased.
     */
    @Override
    protected void onWorkerRegistered(WorkerType worker) {
        final ResourceID resourceId = worker.getResourceID();
        this.log.info("Worker {} is registered.", resourceId.getStringWithMetadata());

        final WorkerResourceSpec workerResourceSpec = this.currentAttemptUnregisteredWorkers.remove(resourceId);
        if (workerResourceSpec != null) {
            final int count = this.pendingWorkerCounter.decreaseAndGet(workerResourceSpec);
            this.log.info(
                    "Worker {} with resource spec {} was requested in current attempt. Current pending count after registering: {}.",
                    resourceId.getStringWithMetadata(),
                    workerResourceSpec,
                    count);
        }
    }

    // ------------------------------------------------------------------------
    //  ResourceEventHandler
    // ------------------------------------------------------------------------

    /**
     * Starts tracking workers recovered from a previous attempt. Must be called from the main
     * thread. Recovered workers are not counted as pending.
     */
    @Override
    public void onPreviousAttemptWorkersRecovered(Collection<WorkerType> recoveredWorkers) {
        this.getMainThreadExecutor().assertRunningInMainThread();
        this.log.info("Recovered {} workers from previous attempt.", recoveredWorkers.size());
        for (WorkerType worker : recoveredWorkers) {
            final ResourceID resourceId = worker.getResourceID();
            this.workerNodeMap.put(resourceId, worker);
            this.log.info("Worker {} recovered from previous attempt.", resourceId.getStringWithMetadata());
        }
    }

    /**
     * Handles termination of a worker: clears its state and, if it was tracked, re-requests
     * workers up to the required amount. The task manager connection is closed in either case.
     */
    @Override
    public void onWorkerTerminated(ResourceID resourceId, String diagnostics) {
        if (this.clearStateForWorker(resourceId)) {
            this.log.info("Worker {} is terminated. Diagnostics: {}", resourceId.getStringWithMetadata(), diagnostics);
            this.requestWorkerIfRequired();
        }
        this.closeTaskManagerConnection(resourceId, new Exception(diagnostics));
    }

    /** Forwards any driver error to the fatal error handling of the resource manager. */
    @Override
    public void onError(Throwable exception) {
        this.onFatalError(exception);
    }

    // ------------------------------------------------------------------------
    //  Internal
    // ------------------------------------------------------------------------

    /**
     * Increases the pending count for the spec and asks the driver for a matching resource.
     * On success the worker is tracked as "requested, not yet registered"; on failure the
     * pending count is rolled back and missing workers are re-requested.
     */
    private void requestNewWorker(WorkerResourceSpec workerResourceSpec) {
        final TaskExecutorProcessSpec taskExecutorProcessSpec =
                TaskExecutorProcessUtils.processSpecFromWorkerResourceSpec(this.flinkConfig, workerResourceSpec);
        final int pendingCount = this.pendingWorkerCounter.increaseAndGet(workerResourceSpec);
        this.log.info(
                "Requesting new worker with resource spec {}, current pending count: {}.",
                workerResourceSpec,
                pendingCount);

        final CompletableFuture<WorkerType> requestResourceFuture =
                this.resourceManagerDriver.requestResource(taskExecutorProcessSpec);
        // NOTE(review): if the driver returns an already-failed future, this callback runs
        // synchronously and re-enters requestWorkerIfRequired() - confirm drivers cannot
        // fail immediately in a loop.
        FutureUtils.assertNoException(requestResourceFuture.handle((worker, exception) -> {
            if (exception != null) {
                final int count = this.pendingWorkerCounter.decreaseAndGet(workerResourceSpec);
                // The trailing throwable is logged as the exception, not as a format argument.
                this.log.warn(
                        "Failed requesting worker with resource spec {}, current pending count: {}",
                        workerResourceSpec,
                        count,
                        exception);
                this.requestWorkerIfRequired();
            } else {
                final ResourceID resourceId = worker.getResourceID();
                this.workerNodeMap.put(resourceId, worker);
                this.currentAttemptUnregisteredWorkers.put(resourceId, workerResourceSpec);
                this.log.info(
                        "Requested worker {} with resource spec {}.",
                        resourceId.getStringWithMetadata(),
                        workerResourceSpec);
            }
            return null;
        }));
    }

    /**
     * Removes all state kept for the given worker. If the worker was requested in the current
     * attempt and never registered, its pending request is also discounted.
     *
     * @return {@code true} if the worker was tracked, {@code false} if it was unknown
     */
    private boolean clearStateForWorker(ResourceID resourceId) {
        final WorkerType worker = this.workerNodeMap.remove(resourceId);
        if (worker == null) {
            this.log.debug("Ignore unrecognized worker {}.", resourceId.getStringWithMetadata());
            return false;
        }

        final WorkerResourceSpec workerResourceSpec = this.currentAttemptUnregisteredWorkers.remove(resourceId);
        if (workerResourceSpec != null) {
            final int count = this.pendingWorkerCounter.decreaseAndGet(workerResourceSpec);
            this.log.info(
                    "Worker {} with resource spec {} was requested in current attempt and has not registered. Current pending count after removing: {}.",
                    resourceId.getStringWithMetadata(),
                    workerResourceSpec,
                    count);
        }
        return true;
    }

    /**
     * For every required spec, requests new workers until the pending count reaches the
     * required count. Each call to {@link #requestNewWorker} increases the pending count,
     * which is what terminates the loop.
     */
    private void requestWorkerIfRequired() {
        for (Map.Entry<WorkerResourceSpec, Integer> entry : this.getRequiredResources().entrySet()) {
            final WorkerResourceSpec workerResourceSpec = entry.getKey();
            final int requiredCount = entry.getValue();
            while (requiredCount > this.pendingWorkerCounter.getNum(workerResourceSpec)) {
                this.requestNewWorker(workerResourceSpec);
            }
        }
    }

    /** Runs the callable via {@code callAsync} with the given timeout; exposed for tests. */
    @VisibleForTesting
    <T> CompletableFuture<T> runInMainThread(Callable<T> callable, Time timeout) {
        return this.callAsync(callable, timeout);
    }

    /**
     * {@link ScheduledExecutor} that forwards every call to the enclosing resource manager's
     * main thread executor. The executor is looked up on each call rather than cached.
     */
    private class GatewayMainThreadExecutor implements ScheduledExecutor {

        private GatewayMainThreadExecutor() {
        }

        @Override
        public ScheduledFuture<?> schedule(Runnable command, long delay, TimeUnit unit) {
            return ActiveResourceManager.this.getMainThreadExecutor().schedule(command, delay, unit);
        }

        @Override
        public <V> ScheduledFuture<V> schedule(Callable<V> callable, long delay, TimeUnit unit) {
            return ActiveResourceManager.this.getMainThreadExecutor().schedule(callable, delay, unit);
        }

        @Override
        public ScheduledFuture<?> scheduleAtFixedRate(Runnable command, long initialDelay, long period, TimeUnit unit) {
            return ActiveResourceManager.this.getMainThreadExecutor().scheduleAtFixedRate(command, initialDelay, period, unit);
        }

        @Override
        public ScheduledFuture<?> scheduleWithFixedDelay(Runnable command, long initialDelay, long delay, TimeUnit unit) {
            return ActiveResourceManager.this.getMainThreadExecutor().scheduleWithFixedDelay(command, initialDelay, delay, unit);
        }

        @Override
        public void execute(Runnable command) {
            ActiveResourceManager.this.getMainThreadExecutor().execute(command);
        }
    }
}

