/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.runtime.leaderretrieval;

import java.io.ByteArrayInputStream;
import java.io.ObjectInputStream;
import java.util.Objects;
import java.util.UUID;
import org.apache.flink.runtime.leaderretrieval.LeaderRetrievalListener;
import org.apache.flink.runtime.leaderretrieval.LeaderRetrievalService;
import org.apache.flink.shaded.org.apache.curator.framework.CuratorFramework;
import org.apache.flink.shaded.org.apache.curator.framework.recipes.cache.ChildData;
import org.apache.flink.shaded.org.apache.curator.framework.recipes.cache.NodeCache;
import org.apache.flink.shaded.org.apache.curator.framework.recipes.cache.NodeCacheListener;
import org.apache.flink.shaded.org.apache.curator.framework.state.ConnectionState;
import org.apache.flink.shaded.org.apache.curator.framework.state.ConnectionStateListener;
import org.apache.flink.util.Preconditions;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class ZooKeeperLeaderRetrievalService
implements LeaderRetrievalService,
NodeCacheListener {
    private static final Logger LOG = LoggerFactory.getLogger(ZooKeeperLeaderRetrievalService.class);
    private final CuratorFramework client;
    private final NodeCache cache;
    private volatile LeaderRetrievalListener leaderListener;
    private String lastLeaderAddress;
    private UUID lastLeaderSessionID;
    private final ConnectionStateListener connectionStateListener = new ConnectionStateListener(){

        @Override
        public void stateChanged(CuratorFramework client, ConnectionState newState) {
            ZooKeeperLeaderRetrievalService.this.handleStateChange(newState);
        }
    };

    public ZooKeeperLeaderRetrievalService(CuratorFramework client, String retrievalPath) {
        this.client = client;
        this.cache = new NodeCache(client, retrievalPath);
    }

    @Override
    public void start(LeaderRetrievalListener listener) throws Exception {
        Preconditions.checkNotNull((Object)listener, (String)"Listener must not be null.");
        Preconditions.checkState((this.leaderListener == null ? 1 : 0) != 0, (Object)"ZooKeeperLeaderRetrievalService can only be started once.");
        LOG.info("Starting ZooKeeperLeaderRetrievalService.");
        this.leaderListener = listener;
        this.cache.getListenable().addListener(this);
        this.cache.start();
        this.client.getConnectionStateListenable().addListener(this.connectionStateListener);
    }

    @Override
    public void stop() throws Exception {
        LOG.info("Stopping ZooKeeperLeaderRetrievalService.");
        this.client.getConnectionStateListenable().removeListener(this.connectionStateListener);
        this.cache.close();
        this.client.close();
    }

    @Override
    public void nodeChanged() throws Exception {
        try {
            UUID leaderSessionID;
            String leaderAddress;
            LOG.debug("Leader node has changed.");
            ChildData childData = this.cache.getCurrentData();
            if (childData == null) {
                leaderAddress = null;
                leaderSessionID = null;
            } else {
                byte[] data = childData.getData();
                if (data == null || data.length == 0) {
                    leaderAddress = null;
                    leaderSessionID = null;
                } else {
                    ByteArrayInputStream bais = new ByteArrayInputStream(data);
                    ObjectInputStream ois = new ObjectInputStream(bais);
                    leaderAddress = ois.readUTF();
                    leaderSessionID = (UUID)ois.readObject();
                }
            }
            if (!Objects.equals(leaderAddress, this.lastLeaderAddress) || !Objects.equals(leaderSessionID, this.lastLeaderSessionID)) {
                LOG.debug("New leader information: Leader={}, session ID={}.", leaderAddress, leaderSessionID);
                this.lastLeaderAddress = leaderAddress;
                this.lastLeaderSessionID = leaderSessionID;
                this.leaderListener.notifyLeaderAddress(leaderAddress, leaderSessionID);
            }
        }
        catch (Exception e) {
            this.leaderListener.handleError(new Exception("Could not handle node changed event.", e));
            throw e;
        }
    }

    protected void handleStateChange(ConnectionState newState) {
        switch (newState) {
            case CONNECTED: {
                LOG.debug("Connected to ZooKeeper quorum. Leader retrieval can start.");
                break;
            }
            case SUSPENDED: {
                LOG.warn("Connection to ZooKeeper suspended. Can no longer retrieve the leader from ZooKeeper.");
                break;
            }
            case RECONNECTED: {
                LOG.info("Connection to ZooKeeper was reconnected. Leader retrieval can be restarted.");
                break;
            }
            case LOST: {
                LOG.warn("Connection to ZooKeeper lost. Can no longer retrieve the leader from ZooKeeper.");
            }
        }
    }
}

