package com.taobao.notify.remotingservice;

import com.taobao.notify.client.IOClientSelector;
import com.taobao.notify.client.manager.ClientSubscriptionManager;
import com.taobao.notify.client.manager.NotifyGroup;
import com.taobao.notify.client.manager.NotifyGroupManager;
import com.taobao.notify.config.NotifyClientConfig;
import com.taobao.notify.diagnosis.manager.NotifyDiagnosisRecordManager;
import com.taobao.notify.remotingclient.addresses.AbstractMultiModeNSAddrListener;
import com.taobao.notify.remotingclient.addresses.MultiModeNSAddrDispatcherRegCenter;
import com.taobao.notify.remotingclient.addresses.NSAddressProcessor;
import com.taobao.notify.remotingclient.addresses.impl.DefaultMultiModeNSAddrDispatcherRegCenter;
import com.taobao.notify.remotingclient.addresses.impl.NSAddressLoadMode;
import com.taobao.notify.remotingclient.addresses.impl.listener.ConfigServerModeNSAddrListener;
import com.taobao.notify.remotingclient.addresses.impl.listener.LocalFileModeNSAddrListener;
import com.taobao.notify.remotingclient.addresses.impl.listener.RuntimeConfigModeNSAddrListener;
import com.taobao.notify.remotingclient.addresses.impl.processor.DefaultNSAddressProcessor;
import com.taobao.notify.remotingservice.responsitory.NewUrlManager;
import com.taobao.notify.remotingservice.responsitory.UrlManager;
import com.taobao.notify.subscription.Binding;
import com.taobao.notify.utils.LoggerPrefix;
import com.taobao.notify.utils.task.TaskManager;
import com.taobao.notify.utils.threadpool.ManagedThreadPoolExecutor;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import org.apache.log4j.Logger;

/* loaded from: input_file:com/taobao/notify/remotingservice/DefaultRemotingService.class */
public class DefaultRemotingService implements RemotingService {
    static final Logger logger = Logger.getLogger(DefaultRemotingService.class);
    private static final String LogPrefix = LoggerPrefix.makeLogPrefix(DefaultRemotingService.class);
    final Map<String, NSAddressProcessor<String, List<String>>> csListeners;
    final UrlManager urlManager;
    private final MultiModeNSAddrDispatcherRegCenter<String, List<String>> addrRegCenter;
    private final ThreadPoolExecutor deliverMessageWorkTP;
    private final ThreadPoolExecutor checkMessageWorkTP;
    private final TaskManager taskManager;
    private NSAddressLoadMode nsAddressLoadMode;

    public DefaultRemotingService(RemotingType remotingType, NotifyGroupManager notifyGroupManager, NotifyClientConfig notifyClientConfig, MultiModeNSAddrDispatcherRegCenter<String, List<String>> multiModeNSAddrDispatcherRegCenter) {
        this.csListeners = new HashMap();
        this.taskManager = new TaskManager("notify-client-task-manager");
        this.addrRegCenter = multiModeNSAddrDispatcherRegCenter;
        this.nsAddressLoadMode = notifyClientConfig.getNSAddressLoadMode();
        this.deliverMessageWorkTP = new ManagedThreadPoolExecutor(notifyClientConfig.getMessageTPConfig().getCorePoolSize(), notifyClientConfig.getMessageTPConfig().getMaxPoolSize(), notifyClientConfig.getMessageTPConfig().getKeepAliveTime(), notifyClientConfig.getMessageTPConfig().getMaxQueueSize(), "msgWorkTP-" + hashCode(), new ThreadPoolExecutor.AbortPolicy());
        this.checkMessageWorkTP = new ManagedThreadPoolExecutor(notifyClientConfig.getCheckMessageTPConfig().getCorePoolSize(), notifyClientConfig.getCheckMessageTPConfig().getMaxPoolSize(), notifyClientConfig.getCheckMessageTPConfig().getKeepAliveTime(), notifyClientConfig.getCheckMessageTPConfig().getMaxQueueSize(), "checkMsgWorkTP-" + hashCode(), new ThreadPoolExecutor.AbortPolicy());
        if (RemotingType.NotifyRemoting == remotingType) {
            this.urlManager = new NewUrlManager(notifyGroupManager, this.taskManager, notifyClientConfig.getConnectionCount(), this.deliverMessageWorkTP, this.checkMessageWorkTP);
        } else {
            this.urlManager = new NewUrlManager(notifyGroupManager, this.taskManager, notifyClientConfig.getConnectionCount(), this.deliverMessageWorkTP, this.checkMessageWorkTP);
            logger.warn(LogPrefix + "无效的通信层参数，使用新的通信层");
        }
    }

    public DefaultRemotingService(RemotingType remotingType, NotifyGroupManager notifyGroupManager) {
        this(remotingType, notifyGroupManager, new NotifyClientConfig(), new DefaultMultiModeNSAddrDispatcherRegCenter());
    }

    @Override // com.taobao.notify.remotingservice.RemotingService
    public synchronized void closeAllIOClients() {
        this.checkMessageWorkTP.shutdown();
        this.deliverMessageWorkTP.shutdown();
        this.urlManager.close();
        for (String str : this.csListeners.keySet()) {
            this.addrRegCenter.unregisteAddressListener(str);
            this.addrRegCenter.unregisteProcessCallback(str);
        }
        this.csListeners.clear();
        this.taskManager.close();
        showStatus();
    }

    public UrlManager getUrlManager() {
        return this.urlManager;
    }

    @Override // com.taobao.notify.remotingservice.RemotingService
    public synchronized void closeIOClients(String str, String str2, boolean z) {
        this.urlManager.deleteClientCfg(str, str2);
        if (!this.urlManager.isValidTopic(str)) {
            this.csListeners.remove(str);
            this.addrRegCenter.unregisteAddressListener(str);
            this.addrRegCenter.unregisteProcessCallback(str);
        }
        showStatus();
    }

    @Override // com.taobao.notify.remotingservice.RemotingService
    public void sendSubscription(String str, String str2, ClientSubscriptionManager clientSubscriptionManager) {
        this.urlManager.addSendSubscriptionTask(str, str2, this, clientSubscriptionManager);
    }

    @Override // com.taobao.notify.remotingservice.RemotingService
    public boolean isMock() {
        return false;
    }

    private AbstractMultiModeNSAddrListener<String, List<String>> buildCustomizeNSAddrListener(String str, NSAddressLoadMode nSAddressLoadMode, MultiModeNSAddrDispatcherRegCenter<String, List<String>> multiModeNSAddrDispatcherRegCenter, boolean z) {
        switch (nSAddressLoadMode) {
            case ConfigServer:
                return new ConfigServerModeNSAddrListener(multiModeNSAddrDispatcherRegCenter, z);
            case LocalFile:
                return new LocalFileModeNSAddrListener(multiModeNSAddrDispatcherRegCenter);
            case RuntimeConfig:
                return new RuntimeConfigModeNSAddrListener(multiModeNSAddrDispatcherRegCenter);
            default:
                return new ConfigServerModeNSAddrListener(multiModeNSAddrDispatcherRegCenter, z);
        }
    }

    @Override // com.taobao.notify.remotingservice.RemotingService
    public synchronized void createIOClients(String str, NotifyGroup notifyGroup) {
        NSAddressProcessor<String, List<String>> nSAddressProcessor = this.csListeners.get(str);
        if (null == nSAddressProcessor) {
            nSAddressProcessor = new DefaultNSAddressProcessor(this.urlManager, this.addrRegCenter);
            this.csListeners.put(str, nSAddressProcessor);
            AbstractMultiModeNSAddrListener<String, List<String>> buildCustomizeNSAddrListener = buildCustomizeNSAddrListener(str, this.nsAddressLoadMode, this.addrRegCenter, notifyGroup.isCacheable());
            NotifyDiagnosisRecordManager.getInstance().recordConnectStart(str);
            nSAddressProcessor.start(str);
            buildCustomizeNSAddrListener.start(str);
        }
        this.urlManager.addClientCfg(str, notifyGroup.getGroupId());
        synchronized (nSAddressProcessor) {
            while (!nSAddressProcessor.isHandled()) {
                try {
                    nSAddressProcessor.wait(notifyGroup.getWaitForConnTime());
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
                if (!nSAddressProcessor.isHandled()) {
                    NotifyDiagnosisRecordManager.getInstance().recordConnectConfigError(str);
                    throw new IllegalStateException("等待 " + notifyGroup.getWaitForConnTime() + " 毫秒超时,可能原因是您所在的本地hosts.xml文件配置或者ConfigServer环境中，目前没有NotifyServer支持Topic[" + str + "]");
                    break;
                }
            }
        }
        this.urlManager.createClients(str);
        showStatus();
    }

    @Override // com.taobao.notify.remotingservice.RemotingService
    public WrappedIOClient getIOClient(String str, String str2) throws RuntimeException {
        if (null == this.csListeners.get(str)) {
            String str3 = "没有向配置中心订阅TOPIC[" + str + "]对应的NS地址.";
            logger.warn(LogPrefix + str3);
            throw new RuntimeException(str3);
        }
        WrappedIOClient wrappedIOClient = null;
        for (int i = 0; i < 3 && null == wrappedIOClient; i++) {
            wrappedIOClient = this.urlManager.selectClient(str, str2);
        }
        if (null != wrappedIOClient) {
            return wrappedIOClient;
        }
        throw new RuntimeException("对应的nsServersTopic中没有可用的连接，nsServersTopic为：" + str);
    }

    @Override // com.taobao.notify.remotingservice.RemotingService
    public int getUrlCount(String str) {
        return this.urlManager.getUrls(str).size();
    }

    @Override // com.taobao.notify.remotingservice.RemotingService
    public List<String> getUrls(String str) {
        return this.urlManager.getUrls(str);
    }

    @Override // com.taobao.notify.remotingservice.RemotingService
    public void openSubscription(Binding binding) {
        this.urlManager.removeCloseSubscription(binding);
    }

    @Override // com.taobao.notify.remotingservice.RemotingService
    public void closeSubscription(Binding binding) {
        this.urlManager.addCloseSubscription(binding);
    }

    @Override // com.taobao.notify.remotingservice.RemotingService
    public Set<Binding> getClosedSubscriptions() {
        return this.urlManager.getClosedSubscriptions();
    }

    @Override // com.taobao.notify.remotingservice.RemotingService
    public int getConnectionCount() {
        return this.urlManager.getConnectionCount();
    }

    @Override // com.taobao.notify.remotingservice.RemotingService
    public void setConnectionCount(int i) {
        this.urlManager.setConnectionCount(i);
    }

    @Override // com.taobao.notify.remotingservice.RemotingService
    public void setMessageTPCorePoolSize(int i) {
        this.deliverMessageWorkTP.setCorePoolSize(i);
    }

    @Override // com.taobao.notify.remotingservice.RemotingService
    public void setMessageTPMaximumPoolSize(int i) {
        this.deliverMessageWorkTP.setMaximumPoolSize(i);
    }

    @Override // com.taobao.notify.remotingservice.RemotingService
    public void setMessageTPKeepAliveTime(long j) {
        this.deliverMessageWorkTP.setKeepAliveTime(j, TimeUnit.MILLISECONDS);
    }

    @Override // com.taobao.notify.remotingservice.RemotingService
    public void setCheckMessageTPCorePoolSize(int i) {
        this.checkMessageWorkTP.setCorePoolSize(i);
    }

    @Override // com.taobao.notify.remotingservice.RemotingService
    public void setCheckMessageTPMaximumPoolSize(int i) {
        this.checkMessageWorkTP.setMaximumPoolSize(i);
    }

    @Override // com.taobao.notify.remotingservice.RemotingService
    public void setCheckMessageTPKeepAliveTime(long j) {
        this.checkMessageWorkTP.setKeepAliveTime(j, TimeUnit.MILLISECONDS);
    }

    @Override // com.taobao.notify.remotingservice.RemotingService
    public int getMessageTPCorePoolSize() {
        return this.deliverMessageWorkTP.getCorePoolSize();
    }

    @Override // com.taobao.notify.remotingservice.RemotingService
    public int getMessageTPMaximumPoolSize() {
        return this.deliverMessageWorkTP.getMaximumPoolSize();
    }

    @Override // com.taobao.notify.remotingservice.RemotingService
    public long getMessageTPKeepAliveTime() {
        return this.deliverMessageWorkTP.getKeepAliveTime(TimeUnit.MILLISECONDS);
    }

    @Override // com.taobao.notify.remotingservice.RemotingService
    public int getCheckMessageTPCorePoolSize() {
        return this.checkMessageWorkTP.getCorePoolSize();
    }

    @Override // com.taobao.notify.remotingservice.RemotingService
    public int getCheckMessageTPMaximumPoolSize() {
        return this.checkMessageWorkTP.getMaximumPoolSize();
    }

    @Override // com.taobao.notify.remotingservice.RemotingService
    public long getCheckMessageTPKeepAliveTime() {
        return this.checkMessageWorkTP.getKeepAliveTime(TimeUnit.MILLISECONDS);
    }

    @Override // com.taobao.notify.remotingservice.RemotingService
    public IOClientSelector getIoClientSelector() {
        return this.urlManager.getIoClientSelector();
    }

    @Override // com.taobao.notify.remotingservice.RemotingService
    public void awaitReadyInterruptibly() throws InterruptedException {
        this.taskManager.await();
    }

    @Override // com.taobao.notify.remotingservice.RemotingService
    public void awaitReadyInterruptibly(long j, TimeUnit timeUnit) throws InterruptedException {
        this.taskManager.await(j, timeUnit);
    }

    @Override // com.taobao.notify.remotingservice.RemotingService
    public void setIoClientSelector(IOClientSelector iOClientSelector) {
        this.urlManager.setIoClientSelector(iOClientSelector);
    }

    @Override // com.taobao.notify.remotingservice.RemotingService
    public void showStatus() {
        if (logger.isInfoEnabled()) {
            logger.info(LogPrefix + "csListeners size is: " + this.csListeners.size());
            logger.info(LogPrefix + "csListeners is: " + this.csListeners);
        }
        this.urlManager.showStatus();
    }

    @Override // com.taobao.notify.remotingservice.RemotingService
    public void setNSAddressLoadMode(NSAddressLoadMode nSAddressLoadMode) {
        this.nsAddressLoadMode = nSAddressLoadMode;
    }

    @Override // com.taobao.notify.remotingservice.RemotingService
    public Map<String, Set<String>> getSnapshotOfTopics2Group() {
        return this.urlManager.getSnapshotOfTopics2Group();
    }

    @Override // com.taobao.notify.remotingservice.RemotingService
    public Map<String, List<String>> getSnapshotOfTopics2ServerUrl() {
        return this.urlManager.getSnapshotOfTopics2ServerUrl();
    }
}
