/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.runtime.taskexecutor;

import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.UUID;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.Executor;
import javax.annotation.Nullable;
import javax.annotation.concurrent.GuardedBy;
import javax.annotation.concurrent.ThreadSafe;
import org.apache.flink.annotation.VisibleForTesting;
import org.apache.flink.api.common.JobID;
import org.apache.flink.api.common.time.Time;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.runtime.highavailability.HighAvailabilityServices;
import org.apache.flink.runtime.jobmaster.JMTMRegistrationRejection;
import org.apache.flink.runtime.jobmaster.JMTMRegistrationSuccess;
import org.apache.flink.runtime.jobmaster.JobMasterGateway;
import org.apache.flink.runtime.jobmaster.JobMasterId;
import org.apache.flink.runtime.leaderretrieval.LeaderRetrievalListener;
import org.apache.flink.runtime.leaderretrieval.LeaderRetrievalService;
import org.apache.flink.runtime.registration.RegisteredRpcConnection;
import org.apache.flink.runtime.registration.RegistrationResponse;
import org.apache.flink.runtime.registration.RetryingRegistration;
import org.apache.flink.runtime.registration.RetryingRegistrationConfiguration;
import org.apache.flink.runtime.rpc.RpcService;
import org.apache.flink.runtime.taskexecutor.JobLeaderListener;
import org.apache.flink.runtime.taskexecutor.JobLeaderService;
import org.apache.flink.runtime.taskmanager.UnresolvedTaskManagerLocation;
import org.apache.flink.util.Preconditions;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * {@link JobLeaderService} implementation that keeps, for every monitored job, a
 * {@link LeaderRetrievalService} paired with the listener that receives its leader notifications.
 *
 * <p>The service is created in the {@code CREATED} state, switched to {@code STARTED} by
 * {@code start(...)} and to {@code STOPPED} by {@code stop()}.
 */
public class DefaultJobLeaderService
implements JobLeaderService {
    private static final Logger LOG = LoggerFactory.getLogger(DefaultJobLeaderService.class);
    /** Location of the owner; handed to every job manager registration attempt. */
    private final UnresolvedTaskManagerLocation ownLocation;
    /** Per-job pair of leader retrieval service and the listener registered with it. */
    private final Map<JobID, Tuple2<LeaderRetrievalService, JobManagerLeaderListener>> jobLeaderServices;
    /** Configuration passed to every {@code JobManagerRetryingRegistration} created by this service. */
    private final RetryingRegistrationConfiguration retryingRegistrationConfiguration;
    /** Current lifecycle state; volatile, and read without a lock by the public methods. */
    private volatile State state;
    /** RPC address of the owner; {@code null} until {@code start(...)} has been called. */
    private String ownerAddress;
    /** RPC service used for registrations; {@code null} until {@code start(...)} has been called. */
    private RpcService rpcService;
    /** Source of the per-job leader retrieval services; {@code null} until {@code start(...)} has been called. */
    private HighAvailabilityServices highAvailabilityServices;
    /** Receiver of leadership and error callbacks; {@code null} until {@code start(...)} has been called. */
    private JobLeaderListener jobLeaderListener;

    /**
     * Creates the service in the {@code CREATED} state with an empty job table.
     *
     * @param location location of the owner, must not be {@code null}
     * @param retryingRegistrationConfiguration configuration for registration attempts, must not
     *     be {@code null}
     */
    public DefaultJobLeaderService(UnresolvedTaskManagerLocation location, RetryingRegistrationConfiguration retryingRegistrationConfiguration) {
        ownLocation = Preconditions.checkNotNull(location);
        this.retryingRegistrationConfiguration = Preconditions.checkNotNull(retryingRegistrationConfiguration);

        jobLeaderServices = new ConcurrentHashMap<>(4);
        state = State.CREATED;

        // The collaborators below are only supplied by start(...).
        ownerAddress = null;
        rpcService = null;
        highAvailabilityServices = null;
        jobLeaderListener = null;
    }

    /**
     * Supplies the collaborators of this service and switches it to the {@code STARTED} state.
     *
     * @param initialOwnerAddress RPC address of the owner, must not be {@code null}
     * @param initialRpcService RPC service used for registrations, must not be {@code null}
     * @param initialHighAvailabilityServices provider of the leader retrieval services, must not
     *     be {@code null}
     * @param initialJobLeaderListener receiver of leadership callbacks, must not be {@code null}
     * @throws IllegalStateException if the service is not in the {@code CREATED} state
     */
    @Override
    public void start(String initialOwnerAddress, RpcService initialRpcService, HighAvailabilityServices initialHighAvailabilityServices, JobLeaderListener initialJobLeaderListener) {
        final boolean notFresh = state != State.CREATED;
        if (notFresh) {
            throw new IllegalStateException("The service has already been started.");
        }

        LOG.info("Start job leader service.");

        ownerAddress = Preconditions.checkNotNull(initialOwnerAddress);
        rpcService = Preconditions.checkNotNull(initialRpcService);
        highAvailabilityServices = Preconditions.checkNotNull(initialHighAvailabilityServices);
        jobLeaderListener = Preconditions.checkNotNull(initialJobLeaderListener);

        // Only flip the state once every collaborator has been validated and stored.
        state = State.STARTED;
    }

    /**
     * Stops every registered listener and leader retrieval service and switches the service to
     * the {@code STOPPED} state.
     *
     * <p>A failure while stopping one entry does not prevent the remaining entries from being
     * stopped: all entries are attempted, the job table is cleared and the state is set to
     * {@code STOPPED} in any case. The first failure is rethrown afterwards with every later
     * failure attached as a suppressed exception.
     *
     * @throws Exception the first exception raised while stopping an entry
     */
    @Override
    public void stop() throws Exception {
        LOG.info("Stop job leader service.");

        Exception firstFailure = null;
        if (State.STARTED == this.state) {
            for (Tuple2<LeaderRetrievalService, JobManagerLeaderListener> leaderRetrievalServiceEntry : this.jobLeaderServices.values()) {
                try {
                    // Listener first, so that late notifications are ignored while the
                    // retrieval service shuts down.
                    leaderRetrievalServiceEntry.f1.stop();
                    leaderRetrievalServiceEntry.f0.stop();
                } catch (Exception e) {
                    if (firstFailure == null) {
                        firstFailure = e;
                    } else if (firstFailure != e) {
                        // addSuppressed rejects self-suppression, hence the identity check.
                        firstFailure.addSuppressed(e);
                    }
                }
            }
            this.jobLeaderServices.clear();
        }
        this.state = State.STOPPED;

        if (firstFailure != null) {
            throw firstFailure;
        }
    }

    /**
     * Stops monitoring the given job. Does nothing if the job is not registered.
     *
     * <p>A failure while stopping the job's leader retrieval service is logged and not
     * propagated.
     *
     * @param jobId job to remove
     * @throws IllegalStateException if the service is not in the {@code STARTED} state
     */
    @Override
    public void removeJob(JobID jobId) {
        Preconditions.checkState(state == State.STARTED, "The service is currently not running.");

        final Tuple2<LeaderRetrievalService, JobManagerLeaderListener> removed = jobLeaderServices.remove(jobId);
        if (removed == null) {
            return;
        }

        LOG.info("Remove job {} from job leader monitoring.", jobId);

        removed.f1.stop();
        try {
            removed.f0.stop();
        } catch (Exception stopFailure) {
            LOG.info("Could not properly stop the LeaderRetrievalService for job {}.", jobId, stopFailure);
        }
    }

    /**
     * Starts monitoring the leader of the given job. An entry already registered under the same
     * job id is replaced and shut down.
     *
     * @param jobId job to monitor
     * @param defaultTargetAddress address passed to
     *     {@code HighAvailabilityServices#getJobManagerLeaderRetriever} for this job
     * @throws IllegalStateException if the service is not in the {@code STARTED} state
     * @throws Exception if obtaining or starting the new leader retrieval service, or stopping
     *     a replaced one, fails
     */
    @Override
    public void addJob(JobID jobId, String defaultTargetAddress) throws Exception {
        Preconditions.checkState(State.STARTED == this.state, "The service is currently not running.");
        LOG.info("Add job {} for job leader monitoring.", jobId);

        final LeaderRetrievalService leaderRetrievalService = this.highAvailabilityServices.getJobManagerLeaderRetriever(jobId, defaultTargetAddress);
        final JobManagerLeaderListener jobManagerLeaderListener = new JobManagerLeaderListener(jobId);
        final Tuple2<LeaderRetrievalService, JobManagerLeaderListener> oldEntry = this.jobLeaderServices.put(jobId, Tuple2.of(leaderRetrievalService, jobManagerLeaderListener));

        if (oldEntry != null) {
            // Stop the replaced listener before its retrieval service, as removeJob and stop
            // do. The entry is no longer reachable through the map, so if the retrieval
            // service's stop() threw first, the listener and its RPC connection would never
            // be closed.
            oldEntry.f1.stop();
            oldEntry.f0.stop();
        }

        leaderRetrievalService.start(jobManagerLeaderListener);
    }

    /**
     * Asks the listener of the given job to reconnect. Logs a message and does nothing else if
     * the job is not registered.
     *
     * @param jobId job whose connection should be re-established, must not be {@code null}
     */
    @Override
    public void reconnect(JobID jobId) {
        Preconditions.checkNotNull(jobId, "JobID must not be null.");

        final Tuple2<LeaderRetrievalService, JobManagerLeaderListener> registered = jobLeaderServices.get(jobId);
        if (registered == null) {
            LOG.info("Cannot reconnect to job {} because it is not registered.", jobId);
            return;
        }

        registered.f1.reconnect();
    }

    /**
     * Tells whether the given job is currently registered with this service.
     *
     * @param jobId job to look up
     * @return {@code true} if an entry exists for {@code jobId}
     * @throws IllegalStateException if the service is not in the {@code STARTED} state
     */
    @Override
    @VisibleForTesting
    public boolean containsJob(JobID jobId) {
        final boolean running = state == State.STARTED;
        Preconditions.checkState(running, "The service is currently not running.");
        return jobLeaderServices.containsKey(jobId);
    }

    private static enum State {
        CREATED,
        STARTED,
        STOPPED;

    }

    private static final class JobManagerRetryingRegistration
    extends RetryingRegistration<JobMasterId, JobMasterGateway, JMTMRegistrationSuccess, JMTMRegistrationRejection> {
        private final String taskManagerRpcAddress;
        private final UnresolvedTaskManagerLocation unresolvedTaskManagerLocation;
        private final JobID jobId;

        JobManagerRetryingRegistration(Logger log, RpcService rpcService, String targetName, Class<JobMasterGateway> targetType, String targetAddress, JobMasterId jobMasterId, RetryingRegistrationConfiguration retryingRegistrationConfiguration, String taskManagerRpcAddress, UnresolvedTaskManagerLocation unresolvedTaskManagerLocation, JobID jobId) {
            super(log, rpcService, targetName, targetType, targetAddress, jobMasterId, retryingRegistrationConfiguration);
            this.taskManagerRpcAddress = taskManagerRpcAddress;
            this.unresolvedTaskManagerLocation = (UnresolvedTaskManagerLocation)Preconditions.checkNotNull((Object)unresolvedTaskManagerLocation);
            this.jobId = (JobID)Preconditions.checkNotNull((Object)jobId);
        }

        @Override
        protected CompletableFuture<RegistrationResponse> invokeRegistration(JobMasterGateway gateway, JobMasterId fencingToken, long timeoutMillis) {
            return gateway.registerTaskManager(this.taskManagerRpcAddress, this.unresolvedTaskManagerLocation, this.jobId, Time.milliseconds((long)timeoutMillis));
        }
    }

    /**
     * Leader retrieval listener for a single job. It keeps at most one RPC connection to the
     * job manager it was last told about and forwards leadership changes and errors to the
     * enclosing service's {@code jobLeaderListener}.
     */
    @ThreadSafe
    private final class JobManagerLeaderListener
    implements LeaderRetrievalListener {
        /** Monitor guarding {@code rpcConnection} and {@code currentJobMasterId}. */
        private final Object lock = new Object();
        /** Job this listener is responsible for. */
        private final JobID jobId;
        /** Connection to the currently known job manager, or {@code null} if there is none. */
        @Nullable
        @GuardedBy(value="lock")
        private RegisteredRpcConnection<JobMasterId, JobMasterGateway, JMTMRegistrationSuccess, JMTMRegistrationRejection> rpcConnection;
        /** Leader id belonging to {@code rpcConnection}; set and cleared together with it. */
        @Nullable
        @GuardedBy(value="lock")
        private JobMasterId currentJobMasterId;
        /** Set once by {@code stop()}; volatile because {@code handleError} reads it without the lock. */
        private volatile boolean stopped;

        /**
         * Creates a running listener without any connection.
         *
         * @param jobId job this listener is responsible for, must not be {@code null}
         */
        private JobManagerLeaderListener(JobID jobId) {
            this.jobId = Preconditions.checkNotNull(jobId);

            // No leader is known yet.
            currentJobMasterId = null;
            rpcConnection = null;
            stopped = false;
        }

        /**
         * Reads the current leader id under the listener lock.
         *
         * @return the leader id of the current connection, or {@code null} if there is none
         */
        private JobMasterId getCurrentJobMasterId() {
            synchronized (lock) {
                return currentJobMasterId;
            }
        }

        /**
         * Marks the listener as stopped and closes its RPC connection. Calling it again after
         * the first call has no effect.
         */
        public void stop() {
            synchronized (lock) {
                if (stopped) {
                    return;
                }
                stopped = true;
                closeRpcConnection();
            }
        }

        /**
         * Triggers a reconnect on the current RPC connection. Only logs a debug message if the
         * listener is stopped or no connection exists.
         *
         * @throws IllegalStateException if the connection reports that the reconnect attempt was
         *     not possible
         */
        public void reconnect() {
            synchronized (lock) {
                if (stopped) {
                    LOG.debug("Cannot reconnect because the JobManagerLeaderListener has already been stopped.");
                    return;
                }
                if (rpcConnection == null) {
                    LOG.debug("Cannot reconnect to an unknown JobMaster.");
                    return;
                }

                final boolean reconnectTriggered = rpcConnection.tryReconnect();
                Preconditions.checkState(reconnectTriggered, "Illegal concurrent modification of the JobManagerLeaderListener rpc connection.");
            }
        }

        /**
         * Reacts to new leader information for the job.
         *
         * <ul>
         *   <li>Stopped listener: the notification is only logged.
         *   <li>{@code null} or empty address: the current connection is closed and, if a leader
         *       was known, the service's listener is told that it lost leadership.
         *   <li>Leader id equal to the current one: ignored as a duplicate.
         *   <li>Otherwise: the current connection is closed and a new one is opened.
         * </ul>
         *
         * <p>The lost-leadership callback is invoked after the lock has been released.
         *
         * @param leaderAddress address of the new leader, {@code null} or empty if there is none
         * @param leaderId leader session id of the new leader, may be {@code null}
         */
        @Override
        public void notifyLeaderAddress(@Nullable String leaderAddress, @Nullable UUID leaderId) {
            JobMasterId lostJobMasterId = null;

            synchronized (lock) {
                if (stopped) {
                    LOG.debug("{}'s leader retrieval listener reported a new leader for job {}. However, the service is no longer running.", DefaultJobLeaderService.class.getSimpleName(), jobId);
                } else {
                    final JobMasterId announcedJobMasterId = JobMasterId.fromUuidOrNull(leaderId);
                    LOG.debug("New leader information for job {}. Address: {}, leader id: {}.", jobId, leaderAddress, announcedJobMasterId);

                    final boolean noLeader = leaderAddress == null || leaderAddress.isEmpty();
                    if (noLeader) {
                        // Remember the old leader before closeRpcConnection() clears it.
                        lostJobMasterId = currentJobMasterId;
                        closeRpcConnection();
                    } else if (!Objects.equals(announcedJobMasterId, currentJobMasterId)) {
                        closeRpcConnection();
                        openRpcConnectionTo(leaderAddress, announcedJobMasterId);
                    } else {
                        LOG.debug("Ongoing attempt to connect to leader of job {}. Ignoring duplicate leader information.", jobId);
                    }
                }
            }

            if (lostJobMasterId != null) {
                DefaultJobLeaderService.this.jobLeaderListener.jobManagerLostLeadership(jobId, lostJobMasterId);
            }
        }

        /**
         * Creates and starts a connection to the given leader. Must be called with the lock held
         * and only after the previous connection has been closed.
         *
         * @param leaderAddress address of the job manager to connect to
         * @param jobMasterId leader id of that job manager
         * @throws IllegalStateException if a connection or leader id is still set
         */
        @GuardedBy("lock")
        private void openRpcConnectionTo(String leaderAddress, JobMasterId jobMasterId) {
            final boolean previousStillOpen = currentJobMasterId != null || rpcConnection != null;
            Preconditions.checkState(!previousStillOpen, "Cannot open a new rpc connection if the previous connection has not been closed.");

            currentJobMasterId = jobMasterId;
            rpcConnection = new JobManagerRegisteredRpcConnection(LOG, leaderAddress, jobMasterId, DefaultJobLeaderService.this.rpcService.getExecutor());

            LOG.info("Try to register at job manager {} with leader id {}.", leaderAddress, jobMasterId.toUUID());
            rpcConnection.start();
        }

        /**
         * Closes the current connection, if any, and forgets both the connection and its leader
         * id. Must be called with the lock held.
         */
        @GuardedBy("lock")
        private void closeRpcConnection() {
            if (rpcConnection == null) {
                return;
            }

            rpcConnection.close();
            rpcConnection = null;
            currentJobMasterId = null;
        }

        /**
         * Forwards an error to the service's listener, unless this listener has already been
         * stopped, in which case the error is only logged at debug level.
         *
         * @param exception error to forward
         */
        @Override
        public void handleError(Exception exception) {
            if (!stopped) {
                DefaultJobLeaderService.this.jobLeaderListener.handleError(exception);
                return;
            }

            LOG.debug("{}'s leader retrieval listener reported an exception for job {}. However, the service is no longer running.", DefaultJobLeaderService.class.getSimpleName(), jobId, exception);
        }

        /**
         * RPC connection to one job manager. Registration outcomes are passed on to the service's
         * listener only while this connection's leader id is still the enclosing listener's
         * current one; otherwise they are logged at debug level as obsolete.
         */
        private final class JobManagerRegisteredRpcConnection
                extends RegisteredRpcConnection<JobMasterId, JobMasterGateway, JMTMRegistrationSuccess, JMTMRegistrationRejection> {

            JobManagerRegisteredRpcConnection(Logger log, String targetAddress, JobMasterId jobMasterId, Executor executor) {
                super(log, targetAddress, jobMasterId, executor);
            }

            /** Builds the retrying registration for this connection's target address and leader id. */
            @Override
            protected RetryingRegistration<JobMasterId, JobMasterGateway, JMTMRegistrationSuccess, JMTMRegistrationRejection> generateRegistration() {
                final DefaultJobLeaderService service = DefaultJobLeaderService.this;
                return new JobManagerRetryingRegistration(
                        LOG,
                        service.rpcService,
                        "JobManager",
                        JobMasterGateway.class,
                        getTargetAddress(),
                        getTargetLeaderId(),
                        service.retryingRegistrationConfiguration,
                        service.ownerAddress,
                        service.ownLocation,
                        JobManagerLeaderListener.this.jobId);
            }

            /** Reports a successful registration as gained leadership if it is still current. */
            @Override
            protected void onRegistrationSuccess(JMTMRegistrationSuccess success) {
                final Runnable notifyGained = () -> {
                    log.info("Successful registration at job manager {} for job {}.", getTargetAddress(), JobManagerLeaderListener.this.jobId);
                    DefaultJobLeaderService.this.jobLeaderListener.jobManagerGainedLeadership(JobManagerLeaderListener.this.jobId, getTargetGateway(), success);
                };
                final Runnable logObsolete = () -> log.debug("Encountered obsolete JobManager registration success from {} with leader session ID {}.", getTargetAddress(), getTargetLeaderId());

                runIfValidRegistrationAttemptOrElse(notifyGained, logObsolete);
            }

            /** Reports a rejected registration to the service's listener if it is still current. */
            @Override
            protected void onRegistrationRejection(JMTMRegistrationRejection rejection) {
                final Runnable notifyRejected = () -> {
                    log.info("Rejected registration at job manager {} for job {}.", getTargetAddress(), JobManagerLeaderListener.this.jobId);
                    DefaultJobLeaderService.this.jobLeaderListener.jobManagerRejectedRegistration(JobManagerLeaderListener.this.jobId, getTargetAddress(), rejection);
                };
                final Runnable logObsolete = () -> log.debug("Encountered obsolete JobManager registration rejection {} from {} with leader session ID {}.", rejection, getTargetAddress(), getTargetLeaderId());

                runIfValidRegistrationAttemptOrElse(notifyRejected, logObsolete);
            }

            /** Reports a failed registration as an error to the service's listener if it is still current. */
            @Override
            protected void onRegistrationFailure(Throwable failure) {
                final Runnable notifyFailed = () -> {
                    log.info("Failed to register at job  manager {} for job {}.", getTargetAddress(), JobManagerLeaderListener.this.jobId);
                    DefaultJobLeaderService.this.jobLeaderListener.handleError(failure);
                };
                final Runnable logObsolete = () -> log.debug("Obsolete JobManager registration failure from {} with leader session ID {}.", getTargetAddress(), getTargetLeaderId(), failure);

                runIfValidRegistrationAttemptOrElse(notifyFailed, logObsolete);
            }

            /**
             * Runs {@code runIfValid} when this connection's leader id equals the enclosing
             * listener's current leader id, and {@code runIfInvalid} otherwise.
             */
            private void runIfValidRegistrationAttemptOrElse(Runnable runIfValid, Runnable runIfInvalid) {
                final boolean stillCurrent = Objects.equals(getTargetLeaderId(), JobManagerLeaderListener.this.getCurrentJobMasterId());
                final Runnable chosen = stillCurrent ? runIfValid : runIfInvalid;
                chosen.run();
            }
        }
    }
}

