/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.runtime.resourcemanager;

import java.util.UUID;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Executor;
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.TimeUnit;
import org.apache.flink.api.common.JobID;
import org.apache.flink.api.common.time.Time;
import org.apache.flink.runtime.clusterframework.types.ResourceID;
import org.apache.flink.runtime.entrypoint.ClusterInformation;
import org.apache.flink.runtime.heartbeat.HeartbeatServices;
import org.apache.flink.runtime.highavailability.HighAvailabilityServices;
import org.apache.flink.runtime.highavailability.TestingHighAvailabilityServices;
import org.apache.flink.runtime.highavailability.TestingHighAvailabilityServicesBuilder;
import org.apache.flink.runtime.io.network.partition.NoOpResourceManagerPartitionTracker;
import org.apache.flink.runtime.jobmaster.JobMasterId;
import org.apache.flink.runtime.jobmaster.JobMasterRegistrationSuccess;
import org.apache.flink.runtime.jobmaster.utils.TestingJobMasterGateway;
import org.apache.flink.runtime.jobmaster.utils.TestingJobMasterGatewayBuilder;
import org.apache.flink.runtime.leaderelection.TestingLeaderElectionService;
import org.apache.flink.runtime.leaderretrieval.SettableLeaderRetrievalService;
import org.apache.flink.runtime.metrics.groups.UnregisteredMetricGroups;
import org.apache.flink.runtime.registration.RegistrationResponse;
import org.apache.flink.runtime.resourcemanager.DefaultJobLeaderIdService;
import org.apache.flink.runtime.resourcemanager.JobLeaderIdService;
import org.apache.flink.runtime.resourcemanager.ResourceManager;
import org.apache.flink.runtime.resourcemanager.ResourceManagerGateway;
import org.apache.flink.runtime.resourcemanager.ResourceManagerId;
import org.apache.flink.runtime.resourcemanager.StandaloneResourceManager;
import org.apache.flink.runtime.resourcemanager.exceptions.ResourceManagerException;
import org.apache.flink.runtime.resourcemanager.slotmanager.SlotManager;
import org.apache.flink.runtime.resourcemanager.slotmanager.SlotManagerBuilder;
import org.apache.flink.runtime.resourcemanager.slotmanager.SlotManagerImpl;
import org.apache.flink.runtime.rpc.FatalErrorHandler;
import org.apache.flink.runtime.rpc.RpcGateway;
import org.apache.flink.runtime.rpc.RpcService;
import org.apache.flink.runtime.rpc.RpcUtils;
import org.apache.flink.runtime.rpc.TestingRpcService;
import org.apache.flink.runtime.rpc.exceptions.FencingTokenException;
import org.apache.flink.runtime.util.TestingFatalErrorHandler;
import org.apache.flink.util.ExceptionUtils;
import org.apache.flink.util.FlinkRuntimeException;
import org.apache.flink.util.TestLogger;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;

/**
 * Tests how a {@link ResourceManager} answers job master registration requests: the successful
 * case, requests carrying a wrong resource manager or job master fencing token, requests from an
 * address the RPC service cannot resolve, and requests for a job the high-availability services
 * do not know.
 */
public class ResourceManagerJobMasterTest extends TestLogger {

    /** Timeout used both for the RPC calls and for waiting on the futures they return. */
    private static final Time TIMEOUT = Time.seconds(10L);

    private TestingRpcService rpcService;
    private JobID jobId;
    private TestingJobMasterGateway jobMasterGateway;
    private ResourceID jobMasterResourceId;
    private SettableLeaderRetrievalService jobMasterLeaderRetrievalService;
    private TestingLeaderElectionService resourceManagerLeaderElectionService;
    private TestingHighAvailabilityServices haServices;
    private TestingFatalErrorHandler testingFatalErrorHandler;
    private ResourceManager<?> resourceManager;
    private ResourceManagerGateway resourceManagerGateway;

    /**
     * Builds the fixture: an RPC service with a registered job master gateway, HA services that
     * only know {@link #jobId}, and a started resource manager that has been granted leadership.
     *
     * @throws Exception if the resource manager cannot be started or leadership is not confirmed
     */
    @Before
    public void setup() throws Exception {
        rpcService = new TestingRpcService();
        jobId = new JobID();

        createAndRegisterJobMasterGateway();
        jobMasterResourceId = ResourceID.generate();

        jobMasterLeaderRetrievalService = new SettableLeaderRetrievalService(
                jobMasterGateway.getAddress(),
                jobMasterGateway.getFencingToken().toUUID());
        resourceManagerLeaderElectionService = new TestingLeaderElectionService();

        haServices = new TestingHighAvailabilityServicesBuilder()
                .setJobMasterLeaderRetrieverFunction(requestedJobId -> {
                    if (requestedJobId.equals(jobId)) {
                        return jobMasterLeaderRetrievalService;
                    }
                    // Report the id that was actually requested, not the one we do know.
                    throw new FlinkRuntimeException(
                            String.format("Unknown job id %s", requestedJobId));
                })
                .setResourceManagerLeaderElectionService(resourceManagerLeaderElectionService)
                .build();

        testingFatalErrorHandler = new TestingFatalErrorHandler();
        resourceManager = createAndStartResourceManager();

        // Grant leadership and wait for the confirmation before handing out the gateway.
        resourceManagerLeaderElectionService.isLeader(UUID.randomUUID()).get();
        resourceManagerGateway = resourceManager.getSelfGateway(ResourceManagerGateway.class);
    }

    /** Creates a testing job master gateway and registers it under its own address. */
    private void createAndRegisterJobMasterGateway() {
        jobMasterGateway = new TestingJobMasterGatewayBuilder().build();
        rpcService.registerGateway(jobMasterGateway.getAddress(), jobMasterGateway);
    }

    /**
     * Creates a {@link StandaloneResourceManager} wired to the fixture's RPC service, HA services
     * and fatal error handler, and starts it.
     *
     * @return the started resource manager
     * @throws Exception if construction or start-up fails
     */
    private ResourceManager<?> createAndStartResourceManager() throws Exception {
        ResourceID rmResourceId = ResourceID.generate();
        HeartbeatServices heartbeatServices = new HeartbeatServices(1000L, 1000L);
        JobLeaderIdService jobLeaderIdService = new DefaultJobLeaderIdService(
                haServices,
                rpcService.getScheduledExecutor(),
                Time.minutes(5L));
        SlotManager slotManager = SlotManagerBuilder.newBuilder()
                .setScheduledExecutor(rpcService.getScheduledExecutor())
                .build();

        StandaloneResourceManager standaloneResourceManager = new StandaloneResourceManager(
                rpcService,
                rmResourceId,
                haServices,
                heartbeatServices,
                slotManager,
                NoOpResourceManagerPartitionTracker::get,
                jobLeaderIdService,
                new ClusterInformation("localhost", 1234),
                testingFatalErrorHandler,
                UnregisteredMetricGroups.createUnregisteredResourceManagerMetricGroup(),
                Time.minutes(5L),
                RpcUtils.INF_TIMEOUT,
                ForkJoinPool.commonPool());

        standaloneResourceManager.start();
        return standaloneResourceManager;
    }

    /**
     * Shuts down the resource manager, the HA services and the RPC service (each only if it was
     * created), then rethrows any error recorded by the fatal error handler.
     *
     * @throws Exception if shutdown fails or a fatal error was recorded during the test
     */
    @After
    public void teardown() throws Exception {
        if (resourceManager != null) {
            RpcUtils.terminateRpcEndpoint(resourceManager, TIMEOUT);
        }
        if (haServices != null) {
            haServices.closeAndCleanupAllData();
        }
        if (rpcService != null) {
            RpcUtils.terminateRpcService(rpcService, TIMEOUT);
        }
        if (testingFatalErrorHandler != null && testingFatalErrorHandler.hasExceptionOccurred()) {
            testingFatalErrorHandler.rethrowError();
        }
    }

    /**
     * Registers the job master with its own fencing token, address and job id and expects a
     * {@link JobMasterRegistrationSuccess}.
     */
    @Test
    public void testRegisterJobMaster() throws Exception {
        CompletableFuture<RegistrationResponse> successfulFuture =
                resourceManagerGateway.registerJobManager(
                        jobMasterGateway.getFencingToken(),
                        jobMasterResourceId,
                        jobMasterGateway.getAddress(),
                        jobId,
                        TIMEOUT);

        RegistrationResponse response =
                successfulFuture.get(TIMEOUT.toMilliseconds(), TimeUnit.MILLISECONDS);
        Assert.assertTrue(response instanceof JobMasterRegistrationSuccess);
    }

    /**
     * Registers through a gateway that was connected with a freshly generated
     * {@link ResourceManagerId} and expects the call to fail with a {@link FencingTokenException}.
     */
    @Test
    public void testRegisterJobMasterWithUnmatchedLeaderSessionId1() throws Exception {
        ResourceManagerGateway wronglyFencedGateway = rpcService
                .connect(
                        resourceManager.getAddress(),
                        ResourceManagerId.generate(),
                        ResourceManagerGateway.class)
                .get(TIMEOUT.toMilliseconds(), TimeUnit.MILLISECONDS);

        CompletableFuture<RegistrationResponse> unMatchedLeaderFuture =
                wronglyFencedGateway.registerJobManager(
                        jobMasterGateway.getFencingToken(),
                        jobMasterResourceId,
                        jobMasterGateway.getAddress(),
                        jobId,
                        TIMEOUT);

        try {
            unMatchedLeaderFuture.get(TIMEOUT.toMilliseconds(), TimeUnit.MILLISECONDS);
            Assert.fail("Should fail because we are using the wrong fencing token.");
        } catch (ExecutionException e) {
            Assert.assertTrue(
                    ExceptionUtils.stripExecutionException(e) instanceof FencingTokenException);
        }
    }

    /**
     * Registers with a freshly generated {@link JobMasterId} instead of the gateway's fencing
     * token and expects a {@link RegistrationResponse.Failure}.
     */
    @Test
    public void testRegisterJobMasterWithUnmatchedLeaderSessionId2() throws Exception {
        JobMasterId differentJobMasterId = JobMasterId.generate();

        CompletableFuture<RegistrationResponse> unMatchedLeaderFuture =
                resourceManagerGateway.registerJobManager(
                        differentJobMasterId,
                        jobMasterResourceId,
                        jobMasterGateway.getAddress(),
                        jobId,
                        TIMEOUT);

        // Bounded wait: an unanswered registration must fail the test instead of hanging it.
        Assert.assertTrue(
                unMatchedLeaderFuture.get(TIMEOUT.toMilliseconds(), TimeUnit.MILLISECONDS)
                        instanceof RegistrationResponse.Failure);
    }

    /**
     * Registers from the address {@code /jobMasterAddress2}, which this test does not register
     * with the RPC service, and expects a {@link RegistrationResponse.Failure}.
     */
    @Test
    public void testRegisterJobMasterFromInvalidAddress() throws Exception {
        String invalidAddress = "/jobMasterAddress2";

        CompletableFuture<RegistrationResponse> invalidAddressFuture =
                resourceManagerGateway.registerJobManager(
                        new JobMasterId(HighAvailabilityServices.DEFAULT_LEADER_ID),
                        jobMasterResourceId,
                        invalidAddress,
                        jobId,
                        TIMEOUT);

        Assert.assertTrue(
                invalidAddressFuture.get(TIMEOUT.toMilliseconds(), TimeUnit.MILLISECONDS)
                        instanceof RegistrationResponse.Failure);
    }

    /**
     * Registers for a job id that the HA services' leader retriever function rejects and expects
     * the call to fail with a {@link ResourceManagerException}.
     */
    @Test
    public void testRegisterJobMasterWithFailureLeaderListener() throws Exception {
        JobID unknownJobIDToHAServices = new JobID();

        CompletableFuture<RegistrationResponse> registrationFuture =
                resourceManagerGateway.registerJobManager(
                        jobMasterGateway.getFencingToken(),
                        jobMasterResourceId,
                        jobMasterGateway.getAddress(),
                        unknownJobIDToHAServices,
                        TIMEOUT);

        try {
            registrationFuture.get(TIMEOUT.toMilliseconds(), TimeUnit.MILLISECONDS);
            Assert.fail("Expected to fail with a ResourceManagerException.");
        } catch (ExecutionException e) {
            Assert.assertTrue(
                    ExceptionUtils.stripExecutionException(e) instanceof ResourceManagerException);
        }

        // Drop any error the handler may have recorded so teardown() does not rethrow it.
        testingFatalErrorHandler.clearError();
    }
}

