package com.taobao.drc.clusterclient.impl;

import com.taobao.drc.clusterclient.AbstractClusterMessage;
import com.taobao.drc.clusterclient.BaseClusterContext;
import com.taobao.drc.clusterclient.MessageListener;
import com.taobao.drc.clusterclient.MessageNotifier;
import com.taobao.drc.clusterclient.PartitionClientFactory;
import com.taobao.drc.clusterclient.coordinator.CoordinatorManager;
import com.taobao.drc.clusterclient.coordinator.impl.DefaultCoordinatorFactory;
import com.taobao.drc.clusterclient.partition.BaseCheckpoint;
import com.taobao.drc.clusterclient.partition.IPartition;
import com.taobao.drc.clusterclient.partition.PartitionStateChangeListener;
import com.taobao.drc.clusterclient.util.NetworkUtils;
import java.util.ArrayList;
import java.util.IdentityHashMap;
import java.util.Iterator;
import java.util.List;
import java.util.concurrent.ExecutionException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:com/taobao/drc/clusterclient/impl/DefaultClusterClientImpl.class */
/**
 * Cluster client that owns one {@link MessageNotifier} per distinct listener and a
 * {@link DefaultPartitionManager}, and drives their start/stop lifecycle together.
 *
 * <p>NOTE(review): {@code started} and {@code stopped} are plain fields and nothing in this
 * class synchronizes access to them; presumably the lifecycle methods are driven from a single
 * thread - confirm with callers.
 *
 * @param <C> cluster context type
 * @param <M> message type delivered to listeners
 * @param <L> listener type
 * @param <F> partition client factory type (not referenced by any member of this class)
 */
public class DefaultClusterClientImpl<C extends BaseClusterContext, M extends AbstractClusterMessage<?, ? extends IPartition, ? extends BaseCheckpoint>, L extends MessageListener<M>, F extends PartitionClientFactory> {
    private static final String DEFAULT_LOCAL_IP = "127.0.0.1";
    /**
     * Timeout handed to {@code partitionManager.waitForStop} from {@link #stop()}.
     * The unit is whatever the partition manager expects - TODO confirm.
     */
    private static final long STOP_WAIT_TIMEOUT = 60L;
    private static final Logger logger = LoggerFactory.getLogger(DefaultClusterClientImpl.class);
    private final List<MessageNotifier<M>> notifiers;
    private final DefaultPartitionManager<C, MessageNotifier<M>> partitionManager;
    /** Set once {@link #start()} has completed; defaults to {@code false}. */
    private boolean started;
    /** Set once {@link #stop()} has completed; defaults to {@code false}. */
    private boolean stopped;
    private final C context;

    /**
     * Creates a client using the coordinator manager obtained from
     * {@code CoordinatorManager.getInstance(...)}, configured from the context's thread name
     * prefix and per-coordinator consumer / IO thread limits.
     *
     * @param c                      cluster context; must not be {@code null}
     * @param partitionClientFactory factory passed through to the partition manager
     * @param list                   message listeners; must contain at least one element
     * @param list2                  partition state change listeners, passed through to the partition manager
     * @param str                    opaque string passed through to the partition manager
     * @throws IllegalArgumentException if {@code list} is {@code null} or empty
     */
    public DefaultClusterClientImpl(C c, PartitionClientFactory<C, MessageNotifier<M>> partitionClientFactory, List<L> list, List<PartitionStateChangeListener> list2, String str) {
        this(c, partitionClientFactory, list, list2, str, CoordinatorManager.getInstance(true, new DefaultCoordinatorFactory(), c.getThreadNamePrefix(), c.getMaxConsumerNumPerCoordinator(), c.getMaxIOThreadNumPerCoordinator()));
    }

    /**
     * Creates a client on top of an explicitly supplied coordinator manager.
     *
     * <p>One notifier is created per distinct listener instance (identity comparison, not
     * {@code equals}); repeated instances are skipped with a warning.
     *
     * @param c                      cluster context
     * @param partitionClientFactory factory passed through to the partition manager
     * @param list                   message listeners; must contain at least one element
     * @param list2                  partition state change listeners, passed through to the partition manager
     * @param str                    opaque string passed through to the partition manager
     * @param coordinatorManager     coordinator manager passed through to the partition manager
     * @throws IllegalArgumentException if {@code list} is {@code null} or empty
     */
    public DefaultClusterClientImpl(C c, PartitionClientFactory<C, MessageNotifier<M>> partitionClientFactory, List<L> list, List<PartitionStateChangeListener> list2, String str, CoordinatorManager coordinatorManager) {
        // A null list is treated like an empty one so the caller gets a descriptive error
        // instead of a bare NullPointerException.
        if (list == null || list.isEmpty()) {
            throw new IllegalArgumentException("DefaultClusterClientImpl No listeners!");
        }
        this.context = c;
        this.notifiers = new ArrayList<>();
        // Identity-based map: the same listener instance registered twice gets only one notifier.
        IdentityHashMap<L, Boolean> seenListeners = new IdentityHashMap<>();
        for (L listener : list) {
            if (seenListeners.put(listener, Boolean.TRUE) == null) {
                this.notifiers.add(new MessageNotifier<>(listener, c.getMaxSizeOfNotifyMessageQueue(), c.getThreadNamePrefix(), c.isNotifierFair()));
            } else {
                logger.warn("Listener [{}] has been added multiple times", listener);
            }
        }
        // Logs the size of the incoming list, which still counts duplicates.
        logger.debug("DefaultClusterClientImpl:messageListeners num:[{}]", list.size());
        this.partitionManager = new DefaultPartitionManager<>(coordinatorManager, partitionClientFactory, c, str, getLocalIp(), this.notifiers, list2);
    }

    /**
     * Resolves the local IP through {@code NetworkUtils.getLocalIp()}, falling back to
     * {@value #DEFAULT_LOCAL_IP} (and logging the failure) if that call throws.
     */
    private String getLocalIp() {
        try {
            return NetworkUtils.getLocalIp();
        } catch (Exception e) {
            logger.error("Failed to get local ip, fallback to [{}]", DEFAULT_LOCAL_IP, e);
            return DEFAULT_LOCAL_IP;
        }
    }

    /**
     * Starts every notifier and then the partition manager. A repeated call after a
     * successful start only logs a warning.
     *
     * @throws ExecutionException   propagated from the underlying start calls
     * @throws InterruptedException propagated from the underlying start calls
     */
    public void start() throws ExecutionException, InterruptedException {
        if (this.started) {
            logger.warn("Cluster client for [{}][{}] has already been started", this.context.getAppGuid(), this.context.getAppGroup());
            return;
        }
        for (MessageNotifier<M> notifier : this.notifiers) {
            notifier.start();
        }
        this.partitionManager.start();
        // Only flagged after everything above returned normally.
        this.started = true;
    }

    /**
     * Stops the partition manager, shuts down every notifier, then waits on the partition
     * manager before marking the client stopped.
     *
     * <p>Calling this before {@link #start()} only logs a warning. Calling it again after a
     * completed stop waits on the partition manager once more and then logs a warning.
     *
     * @throws InterruptedException propagated from the underlying stop / wait calls
     */
    public void stop() throws InterruptedException {
        if (!this.started) {
            logger.warn("Cluster client for [{}][{}] cannot be stopped before started", this.context.getAppGuid(), this.context.getAppGroup());
            return;
        }
        if (this.stopped) {
            this.partitionManager.waitForStop(STOP_WAIT_TIMEOUT);
            logger.warn("Cluster client for [{}][{}] has already been stopped", this.context.getAppGuid(), this.context.getAppGroup());
            return;
        }
        this.partitionManager.stop();
        for (MessageNotifier<M> notifier : this.notifiers) {
            notifier.shutdown();
        }
        this.partitionManager.waitForStop(STOP_WAIT_TIMEOUT);
        this.stopped = true;
        logger.info("Stopped cluster client for [{}][{}]", this.context.getAppGuid(), this.context.getAppGroup());
    }

    /**
     * Delegates to the partition manager's {@code waitForStop} when {@link #stop()} has
     * completed; otherwise logs a warning and returns immediately.
     *
     * @param j timeout forwarded unchanged to the partition manager (unit not visible here - TODO confirm)
     * @throws InterruptedException propagated from the partition manager
     */
    public void waitForStop(long j) throws InterruptedException {
        if (!this.stopped) {
            logger.warn("Cluster client for [{}][{}] has not yet been stopped", this.context.getAppGuid(), this.context.getAppGroup());
            return;
        }
        this.partitionManager.waitForStop(j);
    }
}
