/*
 * Decompiled with CFR 0.152.
 */
package com.taobao.drc.clusterclient.impl;

import com.taobao.drc.clusterclient.AbstractClusterMessage;
import com.taobao.drc.clusterclient.BaseClusterContext;
import com.taobao.drc.clusterclient.MessageListener;
import com.taobao.drc.clusterclient.MessageNotifier;
import com.taobao.drc.clusterclient.PartitionClientFactory;
import com.taobao.drc.clusterclient.coordinator.CoordinatorManager;
import com.taobao.drc.clusterclient.coordinator.impl.DefaultCoordinatorFactory;
import com.taobao.drc.clusterclient.impl.DefaultPartitionManager;
import com.taobao.drc.clusterclient.partition.BaseCheckpoint;
import com.taobao.drc.clusterclient.partition.IPartition;
import com.taobao.drc.clusterclient.partition.PartitionStateChangeListener;
import com.taobao.drc.clusterclient.util.NetworkUtils;
import java.util.ArrayList;
import java.util.IdentityHashMap;
import java.util.List;
import java.util.concurrent.ExecutionException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class DefaultClusterClientImpl<C extends BaseClusterContext, M extends AbstractClusterMessage<?, ? extends IPartition, ? extends BaseCheckpoint>, L extends MessageListener<M>, F extends PartitionClientFactory> {
    private static final String DEFAULT_LOCAL_IP = "127.0.0.1";
    private static final Logger logger = LoggerFactory.getLogger(DefaultClusterClientImpl.class);
    private final List<MessageNotifier<M>> notifiers;
    private final DefaultPartitionManager<C, MessageNotifier<M>> partitionManager;
    private boolean started = false;
    private boolean stopped = false;
    private final C context;

    public DefaultClusterClientImpl(C context, PartitionClientFactory<C, MessageNotifier<M>> partitionClientFactory, List<L> messageListeners, List<PartitionStateChangeListener> partitionStateChangeListeners, String version) {
        this(context, partitionClientFactory, messageListeners, partitionStateChangeListeners, version, CoordinatorManager.getInstance(true, new DefaultCoordinatorFactory(), ((BaseClusterContext)context).getThreadNamePrefix(), ((BaseClusterContext)context).getMaxConsumerNumPerCoordinator(), ((BaseClusterContext)context).getMaxIOThreadNumPerCoordinator()));
    }

    public DefaultClusterClientImpl(C context, PartitionClientFactory<C, MessageNotifier<M>> partitionClientFactory, List<L> messageListeners, List<PartitionStateChangeListener> partitionStateChangeListeners, String version, CoordinatorManager coordinatorManager) {
        if (messageListeners.isEmpty()) {
            throw new IllegalArgumentException("No listeners!");
        }
        this.context = context;
        this.notifiers = new ArrayList<MessageNotifier<M>>();
        IdentityHashMap<MessageListener, Boolean> listenerMap = new IdentityHashMap<MessageListener, Boolean>();
        for (MessageListener listener : messageListeners) {
            if (listenerMap.put(listener, true) == null) {
                this.notifiers.add(new MessageNotifier(listener, ((BaseClusterContext)context).getMaxSizeOfNotifyMessageQueue(), ((BaseClusterContext)context).getThreadNamePrefix()));
                continue;
            }
            logger.warn("Listener [{}] has been added multiple times", (Object)listener);
        }
        this.partitionManager = new DefaultPartitionManager<C, MessageNotifier<M>>(coordinatorManager, partitionClientFactory, context, version, this.getLocalIp(), this.notifiers, partitionStateChangeListeners);
    }

    private String getLocalIp() {
        try {
            return NetworkUtils.getLocalIp();
        }
        catch (Exception e) {
            logger.error("Failed to get local ip, fallback to [{}]", (Object)DEFAULT_LOCAL_IP, (Object)e);
            return DEFAULT_LOCAL_IP;
        }
    }

    public void start() throws ExecutionException, InterruptedException {
        if (!this.started) {
            for (MessageNotifier<M> notifier : this.notifiers) {
                notifier.start();
            }
            this.partitionManager.start();
            this.started = true;
            logger.info("Started cluster client for [{}][{}]", (Object)((BaseClusterContext)this.context).getAppGuid(), (Object)((BaseClusterContext)this.context).getAppGroup());
        } else {
            logger.warn("Cluster client for [{}][{}] has already been started", (Object)((BaseClusterContext)this.context).getAppGuid(), (Object)((BaseClusterContext)this.context).getAppGroup());
        }
    }

    public void stop() throws InterruptedException {
        if (!this.started) {
            logger.warn("Cluster client for [{}][{}] cannot be stopped before started", (Object)((BaseClusterContext)this.context).getAppGuid(), (Object)((BaseClusterContext)this.context).getAppGroup());
            return;
        }
        if (!this.stopped) {
            this.partitionManager.stop();
            for (MessageNotifier<M> notifier : this.notifiers) {
                notifier.shutdown();
            }
            this.partitionManager.waitForStop(60L);
            this.stopped = true;
            logger.info("Stopped cluster client for [{}][{}]", (Object)((BaseClusterContext)this.context).getAppGuid(), (Object)((BaseClusterContext)this.context).getAppGroup());
        } else {
            this.partitionManager.waitForStop(60L);
            logger.warn("Cluster client for [{}][{}] has already been stopped", (Object)((BaseClusterContext)this.context).getAppGuid(), (Object)((BaseClusterContext)this.context).getAppGroup());
        }
    }

    public void waitForStop(long timeLimitInSec) throws InterruptedException {
        if (!this.stopped) {
            logger.warn("Cluster client for [{}][{}] has not yet been stopped", (Object)((BaseClusterContext)this.context).getAppGuid(), (Object)((BaseClusterContext)this.context).getAppGroup());
            return;
        }
        this.partitionManager.waitForStop(timeLimitInSec);
    }
}

