/*
 * Decompiled with CFR 0.152.
 */
package org.apache.cassandra.service;

import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Preconditions;
import com.google.common.collect.ImmutableSet;
import com.google.common.collect.Sets;
import com.google.common.util.concurrent.Futures;
import java.lang.management.ManagementFactory;
import java.lang.management.RuntimeMXBean;
import java.net.InetAddress;
import java.net.UnknownHostException;
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Deque;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.UUID;
import java.util.concurrent.Future;
import java.util.concurrent.FutureTask;
import java.util.concurrent.TimeUnit;
import org.apache.cassandra.concurrent.LocalAwareExecutorService;
import org.apache.cassandra.concurrent.ScheduledExecutors;
import org.apache.cassandra.concurrent.Stage;
import org.apache.cassandra.concurrent.StageManager;
import org.apache.cassandra.config.Schema;
import org.apache.cassandra.db.Mutation;
import org.apache.cassandra.exceptions.RequestFailureReason;
import org.apache.cassandra.gms.ApplicationState;
import org.apache.cassandra.gms.EndpointState;
import org.apache.cassandra.gms.FailureDetector;
import org.apache.cassandra.gms.Gossiper;
import org.apache.cassandra.net.IAsyncCallbackWithFailure;
import org.apache.cassandra.net.MessageIn;
import org.apache.cassandra.net.MessageOut;
import org.apache.cassandra.net.MessagingService;
import org.apache.cassandra.schema.SchemaKeyspace;
import org.apache.cassandra.service.MigrationManager;
import org.apache.cassandra.utils.FBUtilities;
import org.apache.cassandra.utils.concurrent.WaitQueue;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class MigrationCoordinator {
    private static final Logger logger = LoggerFactory.getLogger(MigrationCoordinator.class);
    private static final RuntimeMXBean runtimeMXBean = ManagementFactory.getRuntimeMXBean();
    private static final Future<Void> FINISHED_FUTURE = Futures.immediateFuture(null);
    private static final int MIGRATION_DELAY_IN_MS = 60000;
    private static final int MAX_OUTSTANDING_VERSION_REQUESTS = 3;
    public static final MigrationCoordinator instance = new MigrationCoordinator();
    public static final String IGNORED_VERSIONS_PROP = "cassandra.skip_schema_check_for_versions";
    public static final String IGNORED_ENDPOINTS_PROP = "cassandra.skip_schema_check_for_endpoints";
    private static final Set<UUID> IGNORED_VERSIONS = MigrationCoordinator.getIgnoredVersions();
    private final Map<UUID, VersionInfo> versionInfo = new HashMap<UUID, VersionInfo>();
    private final Map<InetAddress, UUID> endpointVersions = new HashMap<InetAddress, UUID>();
    private final Set<InetAddress> ignoredEndpoints = MigrationCoordinator.getIgnoredEndpoints();

    private static ImmutableSet<UUID> getIgnoredVersions() {
        String s = System.getProperty(IGNORED_VERSIONS_PROP);
        if (s == null || s.isEmpty()) {
            return ImmutableSet.of();
        }
        ImmutableSet.Builder versions = ImmutableSet.builder();
        for (String version : s.split(",")) {
            versions.add((Object)UUID.fromString(version));
        }
        return versions.build();
    }

    private static Set<InetAddress> getIgnoredEndpoints() {
        HashSet<InetAddress> endpoints = new HashSet<InetAddress>();
        String s = System.getProperty(IGNORED_ENDPOINTS_PROP);
        if (s == null || s.isEmpty()) {
            return endpoints;
        }
        for (String endpoint : s.split(",")) {
            try {
                endpoints.add(InetAddress.getByName(endpoint));
            }
            catch (UnknownHostException e) {
                throw new RuntimeException(e);
            }
        }
        return endpoints;
    }

    public void start() {
        ScheduledExecutors.scheduledTasks.scheduleWithFixedDelay(this::pullUnreceivedSchemaVersions, 1L, 1L, TimeUnit.MINUTES);
    }

    public synchronized void reset() {
        this.versionInfo.clear();
    }

    synchronized List<Future<Void>> pullUnreceivedSchemaVersions() {
        ArrayList<Future<Void>> futures = new ArrayList<Future<Void>>();
        for (VersionInfo info : this.versionInfo.values()) {
            Future<Void> future;
            if (info.wasReceived() || info.outstandingRequests.size() > 0 || (future = this.maybePullSchema(info)) == null || future == FINISHED_FUTURE) continue;
            futures.add(future);
        }
        return futures;
    }

    synchronized Future<Void> maybePullSchema(VersionInfo info) {
        if (info.endpoints.isEmpty() || info.wasReceived() || !this.shouldPullSchema(info.version)) {
            return FINISHED_FUTURE;
        }
        if (info.outstandingRequests.size() >= this.getMaxOutstandingVersionRequests()) {
            return FINISHED_FUTURE;
        }
        int isize = info.requestQueue.size();
        for (int i = 0; i < isize; ++i) {
            InetAddress endpoint = info.requestQueue.remove();
            if (!info.endpoints.contains(endpoint)) continue;
            if (this.shouldPullFromEndpoint(endpoint) && info.outstandingRequests.add(endpoint)) {
                return this.scheduleSchemaPull(endpoint, info);
            }
            info.requestQueue.offer(endpoint);
        }
        return null;
    }

    public synchronized Map<UUID, Set<InetAddress>> outstandingVersions() {
        HashMap<UUID, Set<InetAddress>> map = new HashMap<UUID, Set<InetAddress>>();
        for (VersionInfo info : this.versionInfo.values()) {
            if (info.wasReceived()) continue;
            map.put(info.version, (Set<InetAddress>)ImmutableSet.copyOf(info.endpoints));
        }
        return map;
    }

    @VisibleForTesting
    protected VersionInfo getVersionInfoUnsafe(UUID version) {
        return this.versionInfo.get(version);
    }

    @VisibleForTesting
    protected int getMaxOutstandingVersionRequests() {
        return 3;
    }

    @VisibleForTesting
    protected boolean isAlive(InetAddress endpoint) {
        return FailureDetector.instance.isAlive(endpoint);
    }

    @VisibleForTesting
    protected boolean shouldPullSchema(UUID version) {
        if (Schema.instance.getVersion() == null) {
            logger.debug("Not pulling schema for version {}, because local schama version is not known yet", (Object)version);
            return false;
        }
        if (Schema.instance.isSameVersion(version)) {
            logger.debug("Not pulling schema for version {}, because schema versions match: local/real={}, local/compatible={}, remote={}", new Object[]{version, Schema.schemaVersionToString(Schema.instance.getRealVersion()), Schema.schemaVersionToString(Schema.instance.getAltVersion()), Schema.schemaVersionToString(version)});
            return false;
        }
        return true;
    }

    private static boolean is30Compatible(int version) {
        return version == MessagingService.current_version || version == 11;
    }

    @VisibleForTesting
    protected boolean shouldPullFromEndpoint(InetAddress endpoint) {
        if (endpoint.equals(FBUtilities.getBroadcastAddress())) {
            return false;
        }
        EndpointState state = Gossiper.instance.getEndpointStateForEndpoint(endpoint);
        if (state == null) {
            return false;
        }
        String releaseVersion = state.getApplicationState((ApplicationState)ApplicationState.RELEASE_VERSION).value;
        String ourMajorVersion = FBUtilities.getReleaseVersionMajor();
        if (!releaseVersion.startsWith(ourMajorVersion)) {
            logger.debug("Not pulling schema from {} because release version in Gossip is not major version {}, it is {}", new Object[]{endpoint, ourMajorVersion, releaseVersion});
            return false;
        }
        if (!MessagingService.instance().knowsVersion(endpoint)) {
            logger.debug("Not pulling schema from {} because their messaging version is unknown", (Object)endpoint);
            return false;
        }
        if (!MigrationCoordinator.is30Compatible(MessagingService.instance().getRawVersion(endpoint))) {
            logger.debug("Not pulling schema from {} because their schema format is incompatible", (Object)endpoint);
            return false;
        }
        if (Gossiper.instance.isGossipOnlyMember(endpoint)) {
            logger.debug("Not pulling schema from {} because it's a gossip only member", (Object)endpoint);
            return false;
        }
        return true;
    }

    @VisibleForTesting
    protected boolean shouldPullImmediately(InetAddress endpoint, UUID version) {
        if (Schema.instance.isEmpty() || runtimeMXBean.getUptime() < 60000L) {
            logger.debug("Immediately submitting migration task for {}, schema versions: local/real={}, local/compatible={}, remote={}", new Object[]{endpoint, Schema.schemaVersionToString(Schema.instance.getRealVersion()), Schema.schemaVersionToString(Schema.instance.getAltVersion()), Schema.schemaVersionToString(version)});
            return true;
        }
        return false;
    }

    @VisibleForTesting
    protected boolean isLocalVersion(UUID version) {
        return Schema.instance.isSameVersion(version);
    }

    synchronized boolean shouldApplySchemaFor(VersionInfo info) {
        if (info.wasReceived()) {
            return false;
        }
        return !this.isLocalVersion(info.version);
    }

    synchronized Future<Void> reportEndpointVersion(InetAddress endpoint, UUID version) {
        if (this.ignoredEndpoints.contains(endpoint) || IGNORED_VERSIONS.contains(version)) {
            this.endpointVersions.remove(endpoint);
            this.removeEndpointFromVersion(endpoint, null);
            return FINISHED_FUTURE;
        }
        UUID current = this.endpointVersions.put(endpoint, version);
        if (current != null && current.equals(version)) {
            return FINISHED_FUTURE;
        }
        VersionInfo info = this.versionInfo.computeIfAbsent(version, VersionInfo::new);
        if (this.isLocalVersion(version)) {
            info.markReceived();
        }
        info.endpoints.add(endpoint);
        info.requestQueue.addFirst(endpoint);
        this.removeEndpointFromVersion(endpoint, current);
        return this.maybePullSchema(info);
    }

    Future<Void> reportEndpointVersion(InetAddress endpoint, EndpointState state) {
        if (state == null) {
            return FINISHED_FUTURE;
        }
        UUID version = state.getSchemaVersion();
        if (version == null) {
            return FINISHED_FUTURE;
        }
        return this.reportEndpointVersion(endpoint, version);
    }

    private synchronized void removeEndpointFromVersion(InetAddress endpoint, UUID version) {
        if (version == null) {
            return;
        }
        VersionInfo info = this.versionInfo.get(version);
        if (info == null) {
            return;
        }
        info.endpoints.remove(endpoint);
        if (info.endpoints.isEmpty()) {
            info.waitQueue.signalAll();
            this.versionInfo.remove(version);
        }
    }

    public synchronized void removeAndIgnoreEndpoint(InetAddress endpoint) {
        Preconditions.checkArgument((endpoint != null ? 1 : 0) != 0);
        this.ignoredEndpoints.add(endpoint);
        ImmutableSet versions = ImmutableSet.copyOf(this.versionInfo.keySet());
        for (UUID version : versions) {
            this.removeEndpointFromVersion(endpoint, version);
        }
    }

    Future<Void> scheduleSchemaPull(InetAddress endpoint, VersionInfo info) {
        FutureTask<Object> task = new FutureTask<Object>(() -> this.pullSchema(new Callback(endpoint, info)), null);
        if (this.shouldPullImmediately(endpoint, info.version)) {
            MigrationCoordinator.submitToMigrationIfNotShutdown(task);
        } else {
            ScheduledExecutors.nonPeriodicTasks.schedule(() -> MigrationCoordinator.submitToMigrationIfNotShutdown(task), 60000L, TimeUnit.MILLISECONDS);
        }
        return task;
    }

    private static Future<?> submitToMigrationIfNotShutdown(Runnable task) {
        LocalAwareExecutorService stage = StageManager.getStage(Stage.MIGRATION);
        if (stage.isShutdown() || stage.isTerminated()) {
            logger.info("Skipped scheduled pulling schema from other nodes: the MIGRATION executor service has been shutdown.");
            return null;
        }
        return stage.submit(task);
    }

    @VisibleForTesting
    protected void mergeSchemaFrom(InetAddress endpoint, Collection<Mutation> mutations) {
        SchemaKeyspace.mergeSchemaAndAnnounceVersion(mutations);
    }

    private void pullSchema(Callback callback) {
        if (!this.isAlive(callback.endpoint)) {
            logger.warn("Can't send schema pull request: node {} is down.", (Object)callback.endpoint);
            callback.fail();
            return;
        }
        if (!this.shouldPullFromEndpoint(callback.endpoint)) {
            logger.info("Skipped sending a migration request: node {} has a higher major version now.", (Object)callback.endpoint);
            callback.fail();
            return;
        }
        logger.debug("Requesting schema from {}", (Object)callback.endpoint);
        this.sendMigrationMessage(callback);
    }

    protected void sendMigrationMessage(Callback callback) {
        MessageOut<Collection<Mutation>> message = new MessageOut<Collection<Mutation>>(MessagingService.Verb.MIGRATION_REQUEST, null, MigrationManager.MigrationsSerializer.instance);
        logger.info("Sending schema pull request to {} at {} with timeout {}", new Object[]{callback.endpoint, System.currentTimeMillis(), message.getTimeout()});
        MessagingService.instance().sendRR(message, callback.endpoint, callback, message.getTimeout(), true);
    }

    private synchronized Future<Void> pullComplete(InetAddress endpoint, VersionInfo info, boolean wasSuccessful) {
        if (wasSuccessful) {
            info.markReceived();
        }
        info.outstandingRequests.remove(endpoint);
        info.requestQueue.add(endpoint);
        return this.maybePullSchema(info);
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    public boolean awaitSchemaRequests(long waitMillis) {
        if (!FBUtilities.getBroadcastAddress().equals(InetAddress.getLoopbackAddress())) {
            Gossiper.waitToSettle();
        }
        WaitQueue.Signal signal = null;
        try {
            MigrationCoordinator migrationCoordinator = this;
            synchronized (migrationCoordinator) {
                ArrayList<WaitQueue.Signal> signalList;
                block13: {
                    signalList = new ArrayList<WaitQueue.Signal>(this.versionInfo.size());
                    for (VersionInfo version : this.versionInfo.values()) {
                        if (version.wasReceived()) continue;
                        signalList.add(version.register());
                    }
                    if (!signalList.isEmpty()) break block13;
                    boolean bl = true;
                    return bl;
                }
                WaitQueue.Signal[] signals = new WaitQueue.Signal[signalList.size()];
                signalList.toArray(signals);
                signal = WaitQueue.all(signals);
            }
            boolean bl = signal.awaitUntil(System.nanoTime() + TimeUnit.MILLISECONDS.toNanos(waitMillis));
            return bl;
        }
        catch (InterruptedException e) {
            throw new RuntimeException(e);
        }
        finally {
            if (signal != null) {
                signal.cancel();
            }
        }
    }

    class Callback
    implements IAsyncCallbackWithFailure<Collection<Mutation>> {
        final InetAddress endpoint;
        final VersionInfo info;

        public Callback(InetAddress endpoint, VersionInfo info) {
            this.endpoint = endpoint;
            this.info = info;
        }

        @Override
        public void onFailure(InetAddress from, RequestFailureReason failureReason) {
            this.fail();
        }

        Future<Void> fail() {
            return MigrationCoordinator.this.pullComplete(this.endpoint, this.info, false);
        }

        @Override
        public void response(MessageIn<Collection<Mutation>> message) {
            this.response((Collection)message.payload);
        }

        /*
         * WARNING - Removed try catching itself - possible behaviour change.
         */
        Future<Void> response(Collection<Mutation> mutations) {
            VersionInfo versionInfo = this.info;
            synchronized (versionInfo) {
                if (MigrationCoordinator.this.shouldApplySchemaFor(this.info)) {
                    try {
                        logger.debug("Pulled schema from endpoint {};", (Object)this.endpoint);
                        MigrationCoordinator.this.mergeSchemaFrom(this.endpoint, mutations);
                    }
                    catch (Exception e) {
                        logger.error(String.format("Unable to merge schema from %s", this.endpoint), (Throwable)e);
                        return this.fail();
                    }
                }
                return MigrationCoordinator.this.pullComplete(this.endpoint, this.info, true);
            }
        }

        @Override
        public boolean isLatencyForSnitch() {
            return false;
        }
    }

    static class VersionInfo {
        final UUID version;
        final Set<InetAddress> endpoints = Sets.newConcurrentHashSet();
        final Set<InetAddress> outstandingRequests = Sets.newConcurrentHashSet();
        final Deque<InetAddress> requestQueue = new ArrayDeque<InetAddress>();
        private final WaitQueue waitQueue = new WaitQueue();
        volatile boolean receivedSchema;

        VersionInfo(UUID version) {
            this.version = version;
        }

        WaitQueue.Signal register() {
            return this.waitQueue.register();
        }

        void markReceived() {
            if (this.receivedSchema) {
                return;
            }
            this.receivedSchema = true;
            this.waitQueue.signalAll();
        }

        boolean wasReceived() {
            return this.receivedSchema;
        }
    }
}

