package com.taobao.notify.remotingservice.responsitory;

import com.taobao.gecko.core.command.Constants;
import com.taobao.gecko.service.RemotingClient;
import com.taobao.gecko.service.RemotingContext;
import com.taobao.gecko.service.RemotingFactory;
import com.taobao.gecko.service.config.ClientConfig;
import com.taobao.gecko.service.exception.NotifyRemotingException;
import com.taobao.notify.client.IOClientSelector;
import com.taobao.notify.client.exception.NotifyClientException;
import com.taobao.notify.client.impl.RoomPriorityIOClientSelector;
import com.taobao.notify.client.manager.ClientSubscriptionManager;
import com.taobao.notify.client.manager.NotifyGroupManager;
import com.taobao.notify.clientinfo.ClientInfo;
import com.taobao.notify.clientinfo.ControlInfo;
import com.taobao.notify.diagnosis.infobean.ConnectServerResultInfo;
import com.taobao.notify.diagnosis.manager.NotifyDiagnosisRecordManager;
import com.taobao.notify.remoting.core.command.request.CheckMessageCommand;
import com.taobao.notify.remoting.core.command.request.DeliverMessageCommand;
import com.taobao.notify.remoting.core.command.request.DeliverRawMessageCommand;
import com.taobao.notify.remoting.service.config.NotifyWireFormatType;
import com.taobao.notify.remotingclient.impl.NewReconnectionListener;
import com.taobao.notify.remotingclient.impl.NewSendCloseSubscriptionTaskProcessor;
import com.taobao.notify.remotingclient.impl.NewSendControllInfoTaskProcessor;
import com.taobao.notify.remotingclient.impl.NewSendOpenSubscriptionTaskProcessor;
import com.taobao.notify.remotingclient.impl.SendCloseSubscriptionTask;
import com.taobao.notify.remotingclient.impl.SendControllInfoTask;
import com.taobao.notify.remotingclient.impl.SendOpenSubscriptionTask;
import com.taobao.notify.remotingclient.impl.SendSubscriptionTask;
import com.taobao.notify.remotingclient.impl.SendSubscriptionTaskProcessor;
import com.taobao.notify.remotingclient.processor.DeliverMessageProcessor;
import com.taobao.notify.remotingclient.processor.DeliverRawMessageProcessor;
import com.taobao.notify.remotingclient.processor.NewCheckMessageProcessor;
import com.taobao.notify.remotingservice.NewWrappedIOClient;
import com.taobao.notify.remotingservice.RemotingService;
import com.taobao.notify.remotingservice.WrappedIOClient;
import com.taobao.notify.subscription.Binding;
import com.taobao.notify.tools.ClientUtils;
import com.taobao.notify.tools.DataIdTools;
import com.taobao.notify.utils.LoggerPrefix;
import com.taobao.notify.utils.task.TaskManager;
import com.taobao.notify.utils.task.TaskProcessor;
import com.taobao.notify.utils.threadpool.ManagedThreadPoolExecutor;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Queue;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.CopyOnWriteArraySet;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import org.apache.log4j.Logger;

/* loaded from: input_file:com/taobao/notify/remotingservice/responsitory/NewUrlManager.class */
public class NewUrlManager implements UrlManager {
    private static final String LogPrefix = LoggerPrefix.makeLogPrefix(NewUrlManager.class);
    private static final Logger log = Logger.getLogger(NewUrlManager.class);
    private final TaskManager taskManager;
    private final NotifyGroupManager notifyGroupManager;
    private volatile int connectionCount;
    private static final int CLIENT_CHECK_INTERVAL = 10;
    private ScheduledExecutorService clientCheckTP;
    private RemotingClient remotingClient = null;
    private NewWrappedIOClientFactory ioClientFactory = null;
    private volatile IOClientSelector ioClientSelector = new RoomPriorityIOClientSelector();
    private final Set<Binding> closedSubscriptions = new CopyOnWriteArraySet();
    private final TaskProcessor sendControllInfoTaskProcessor = new NewSendControllInfoTaskProcessor();
    private final TaskProcessor sendSubscriptionTaskProcessor = new SendSubscriptionTaskProcessor();
    private final TaskProcessor sendOpenSubscriptionTaskProcessor = new NewSendOpenSubscriptionTaskProcessor();
    private final TaskProcessor sendCloseSubscriptionTaskProcessor = new NewSendCloseSubscriptionTaskProcessor();
    private final ConcurrentHashMap<String, Set<String>> groupsOfTopics = new ConcurrentHashMap<>();
    private final ServerUrlResponsitory serverUrlResponsitory = new ServerUrlResponsitory();
    private final ControlInfoResponsitory controlInfoResponsitory = new ControlInfoResponsitory();
    private volatile long connectionTimeout = 5000;
    private final Map<Binding, Long> opBindingMap = new ConcurrentHashMap();
    private ThreadPoolExecutor connectWorkTP = null;
    private ConcurrentHashMap<String, List<String>> connectedClientUrls = new ConcurrentHashMap<>();

    @Override // com.taobao.notify.remotingservice.responsitory.UrlManager
    public void close() {
        log.info(LogPrefix + "......关闭所有连接开始......");
        Iterator<String> it = this.serverUrlResponsitory.getAllNsServersTopic().iterator();
        while (it.hasNext()) {
            closeClients(it.next(), true);
        }
        if (null != this.remotingClient) {
            try {
                this.remotingClient.stop();
            } catch (NotifyRemotingException e) {
                log.error(LogPrefix + "关闭remotingClient出错", e);
            }
        }
        this.groupsOfTopics.clear();
        this.controlInfoResponsitory.clear();
        log.info(LogPrefix + "......关闭所有连接结束......");
        if (this.clientCheckTP != null) {
            this.clientCheckTP.shutdown();
        }
        if (this.connectWorkTP != null) {
            this.connectWorkTP.shutdown();
        }
    }

    public NewUrlManager(NotifyGroupManager notifyGroupManager, TaskManager taskManager, int i, ThreadPoolExecutor threadPoolExecutor, ThreadPoolExecutor threadPoolExecutor2) {
        this.connectionCount = 1;
        this.notifyGroupManager = notifyGroupManager;
        this.connectionCount = i;
        this.taskManager = taskManager;
        init(threadPoolExecutor, threadPoolExecutor2);
    }

    public TaskManager getTaskManager() {
        return this.taskManager;
    }

    private void init(ThreadPoolExecutor threadPoolExecutor, ThreadPoolExecutor threadPoolExecutor2) {
        ClientConfig clientConfig = new ClientConfig();
        clientConfig.setConnectTimeout(this.connectionTimeout);
        clientConfig.setWireFormatType(new NotifyWireFormatType());
        try {
            this.remotingClient = RemotingFactory.connect(clientConfig);
            this.remotingClient.addConnectionLifeCycleListener(new NewReconnectionListener(this));
            HashMap hashMap = new HashMap();
            hashMap.put(DeliverMessageCommand.class, new DeliverMessageProcessor(this.notifyGroupManager, threadPoolExecutor));
            hashMap.put(CheckMessageCommand.class, new NewCheckMessageProcessor(this.notifyGroupManager, threadPoolExecutor2));
            hashMap.put(DeliverRawMessageCommand.class, new DeliverRawMessageProcessor(this.notifyGroupManager, threadPoolExecutor));
            if (ClientUtils.isTestModel()) {
                hashMap.putAll(ClientUtils.getNewTestProcessor());
            }
            this.remotingClient.addAllProcessors(hashMap);
            this.ioClientFactory = new NewWrappedIOClientFactory(this.remotingClient);
            this.clientCheckTP = Executors.newSingleThreadScheduledExecutor();
            startClientUrlCheck();
            initConnectWorkTP();
        } catch (NotifyRemotingException e) {
            log.error(LogPrefix + "启动通信层失败", e);
            throw new NotifyClientException("启动通信层失败", e);
        }
    }

    private void initConnectWorkTP() {
        this.connectWorkTP = new ManagedThreadPoolExecutor(5, 10, 60L, 1000, "Notify-Connect-pool", new ThreadPoolExecutor.CallerRunsPolicy());
    }

    private void startClientUrlCheck() {
        this.clientCheckTP.scheduleWithFixedDelay(new Runnable() { // from class: com.taobao.notify.remotingservice.responsitory.NewUrlManager.1
            @Override // java.lang.Runnable
            public void run() {
                NewUrlManager.this.checkClient();
            }
        }, 0L, 10L, TimeUnit.SECONDS);
    }

    /* JADX INFO: Access modifiers changed from: private */
    public void checkClient() {
        Map<String, List<String>> snapshotOfTopics2ServerUrl = this.serverUrlResponsitory.getSnapshotOfTopics2ServerUrl();
        ConcurrentHashMap<String, List<String>> concurrentHashMap = new ConcurrentHashMap<>();
        for (Map.Entry<String, List<String>> entry : snapshotOfTopics2ServerUrl.entrySet()) {
            for (String str : entry.getValue()) {
                if (this.remotingClient.isConnected(str)) {
                    List<String> list = concurrentHashMap.get(entry.getKey());
                    if (list == null) {
                        list = new CopyOnWriteArrayList();
                        concurrentHashMap.put(entry.getKey(), list);
                    }
                    list.add(str);
                }
            }
        }
        this.connectedClientUrls = concurrentHashMap;
    }

    /* JADX WARN: Multi-variable type inference failed */
    /* JADX WARN: Type inference failed for: r0v6, types: [java.util.Set] */
    @Override // com.taobao.notify.remotingservice.responsitory.UrlManager
    public void createClients(String str, List<String> list, boolean z) {
        if (z) {
            Iterator<String> it = list.iterator();
            while (it.hasNext()) {
                this.serverUrlResponsitory.addUrl(str, it.next());
            }
        }
        CopyOnWriteArraySet copyOnWriteArraySet = new CopyOnWriteArraySet();
        CopyOnWriteArraySet<String> copyOnWriteArraySet2 = (Set) this.groupsOfTopics.putIfAbsent(str, copyOnWriteArraySet);
        if (null == copyOnWriteArraySet2) {
            copyOnWriteArraySet2 = copyOnWriteArraySet;
        }
        for (String str2 : list) {
            synchronized (copyOnWriteArraySet2) {
                HashMap hashMap = new HashMap();
                for (String str3 : copyOnWriteArraySet2) {
                    hashMap.put(str3, new ConnectResult(this.connectWorkTP.submit(new ConnectTask(this.remotingClient, str2, this.connectionCount)), addClientInfo(str2, this.notifyGroupManager.getGroup(str3).getClientInfo())));
                }
                checkConnectResult(str2, str, hashMap);
            }
        }
    }

    private void checkConnectResult(String str, String str2, Map<String, ConnectResult> map) {
        for (Map.Entry<String, ConnectResult> entry : map.entrySet()) {
            ConnectServerResultInfo connectServerResultInfo = new ConnectServerResultInfo();
            connectServerResultInfo.setGroupId(entry.getKey());
            connectServerResultInfo.setNsDataId(str2);
            connectServerResultInfo.setUrl(str);
            try {
                ConnectResult value = entry.getValue();
                if (value.getFuture().get(this.connectionTimeout, TimeUnit.MILLISECONDS).booleanValue()) {
                    NewWrappedIOClient newWrappedIOClient = this.ioClientFactory.getNewWrappedIOClient(str);
                    if (null != newWrappedIOClient) {
                        addSendControllInfoTask(str, newWrappedIOClient, value.getControllInfo());
                        addSendCloseSubscriptionTask(str, newWrappedIOClient);
                    }
                    log.info(LogPrefix + str2 + "下， 创建连接成功：" + str);
                    connectServerResultInfo.setSuccess(true);
                }
            } catch (InterruptedException e) {
                log.error(str2 + "连接被中断" + str, e);
                connectServerResultInfo.setSuccess(false);
                connectServerResultInfo.setFailureReason("和server等待创建[" + str2 + "]下的连接被中断" + str + e.getMessage());
            } catch (ExecutionException e2) {
                log.error(str2 + "连接异常" + str, e2);
                connectServerResultInfo.setSuccess(false);
                connectServerResultInfo.setFailureReason("和server等待创建[" + str2 + "]下的连接异常" + str + e2.getMessage());
            } catch (TimeoutException e3) {
                log.error(str2 + "连接超时" + this.connectionTimeout + "ms,连接=" + str, e3);
                connectServerResultInfo.setSuccess(false);
                connectServerResultInfo.setFailureReason("和server创建[" + str2 + "]下的连接失败或等待创建连接超时" + str + e3.getMessage());
            }
            NotifyDiagnosisRecordManager.getInstance().recordConnectNotifyResult(connectServerResultInfo);
        }
    }

    @Override // com.taobao.notify.remotingservice.responsitory.UrlManager
    public void createClients(String str) {
        createClients(str, this.serverUrlResponsitory.getUrls(str), false);
    }

    private List<String> checkGetClientArgument(String str, String str2) {
        Set<String> set = this.groupsOfTopics.get(str);
        if (null == set) {
            StringBuilder clientArgumentStringBuilder = getClientArgumentStringBuilder(str, str2);
            clientArgumentStringBuilder.append(" 但是该Topic不存在.");
            log.error(LogPrefix + clientArgumentStringBuilder.toString());
            return null;
        }
        if (!set.contains(str2)) {
            StringBuilder clientArgumentStringBuilder2 = getClientArgumentStringBuilder(str, str2);
            clientArgumentStringBuilder2.append(" 但是该GroupID不存在.");
            log.error(LogPrefix + clientArgumentStringBuilder2.toString());
            return null;
        }
        List<String> list = this.connectedClientUrls.get(str);
        if (list == null || list.isEmpty()) {
            list = this.serverUrlResponsitory.getUrls(str);
        }
        if (null != list && !list.isEmpty()) {
            return list;
        }
        StringBuilder clientArgumentStringBuilder3 = getClientArgumentStringBuilder(str, str2);
        clientArgumentStringBuilder3.append(" 但是没有处理该TOPIC的NotifyServer服务器.");
        log.error(LogPrefix + clientArgumentStringBuilder3.toString());
        return null;
    }

    private StringBuilder getClientArgumentStringBuilder(String str, String str2) {
        StringBuilder sb = new StringBuilder();
        sb.append("取NOTIFY SERVER连接. GroupID为[").append(str2).append("]的客户端获取TOPIC[");
        sb.append(str).append("]对应的Connection");
        return sb;
    }

    @Override // com.taobao.notify.remotingservice.responsitory.UrlManager
    public WrappedIOClient selectClient(String str, String str2) {
        List<String> checkGetClientArgument = checkGetClientArgument(str, str2);
        if (null == checkGetClientArgument || checkGetClientArgument.size() == 0) {
            return null;
        }
        return this.ioClientFactory.getNewWrappedIOClient(this.ioClientSelector.select(str, str2, checkGetClientArgument));
    }

    @Override // com.taobao.notify.remotingservice.responsitory.UrlManager
    public Queue<WrappedIOClient> allClients(String str, String str2) {
        List<String> checkGetClientArgument = checkGetClientArgument(str, str2);
        if (null == checkGetClientArgument) {
            return null;
        }
        LinkedBlockingQueue linkedBlockingQueue = new LinkedBlockingQueue();
        Iterator<String> it = checkGetClientArgument.iterator();
        while (it.hasNext()) {
            NewWrappedIOClient newWrappedIOClient = this.ioClientFactory.getNewWrappedIOClient(it.next());
            if (null != newWrappedIOClient) {
                linkedBlockingQueue.offer(newWrappedIOClient);
            }
        }
        return linkedBlockingQueue;
    }

    @Override // com.taobao.notify.remotingservice.responsitory.UrlManager
    public void closeClients(String str, boolean z) {
        Iterator<String> it = this.serverUrlResponsitory.getUrls(str).iterator();
        while (it.hasNext()) {
            closeClients(str, it.next(), z);
        }
    }

    @Override // com.taobao.notify.remotingservice.responsitory.UrlManager
    public boolean closeClients(String str, String str2, boolean z) {
        if (z) {
            this.serverUrlResponsitory.removeUrl(str, str2);
        }
        log.warn(LogPrefix + "删除nsServersTopic与url的对应关系, URL是：" + str2 + "nsServersTopic是：" + str);
        if (this.serverUrlResponsitory.isValidUrl(str2)) {
            return false;
        }
        try {
            deleteSendControllInfoTask(str2);
            this.remotingClient.close(str2, false);
            if (z) {
                this.controlInfoResponsitory.remove(str2);
            }
            log.warn(LogPrefix + "关闭连接成功, URL是：" + str2);
            return true;
        } catch (Exception e) {
            log.error(LogPrefix + "关闭连接失败, URL是：" + str2, e);
            return false;
        }
    }

    @Override // com.taobao.notify.remotingservice.responsitory.UrlManager
    public synchronized void updateClients(String str, List<String> list) {
        for (String str2 : this.serverUrlResponsitory.getUrls(str)) {
            if (!list.contains(str2)) {
                this.serverUrlResponsitory.removeUrl(str, str2);
                if (!this.serverUrlResponsitory.isValidUrl(str2)) {
                    closeClients(str, str2, true);
                    StringBuilder sb = new StringBuilder();
                    sb.append("########## ");
                    sb.append("关闭已过期的NOTIFY SERVER地址对应的连接[");
                    sb.append(str2).append("].");
                    log.warn(LogPrefix + sb.toString());
                }
            }
        }
        this.serverUrlResponsitory.replaceUrls(str, list);
    }

    /* JADX WARN: Multi-variable type inference failed */
    /* JADX WARN: Type inference failed for: r0v4, types: [java.util.Set] */
    @Override // com.taobao.notify.remotingservice.responsitory.UrlManager
    public void addClientCfg(String str, String str2) {
        CopyOnWriteArraySet copyOnWriteArraySet = new CopyOnWriteArraySet();
        CopyOnWriteArraySet copyOnWriteArraySet2 = (Set) this.groupsOfTopics.putIfAbsent(str, copyOnWriteArraySet);
        if (null == copyOnWriteArraySet2) {
            copyOnWriteArraySet2 = copyOnWriteArraySet;
        }
        if (copyOnWriteArraySet2.contains(str2)) {
            log.info(LogPrefix + "在添加ConfigCfg中，TOPIC[" + str + "]下面不能重复添加GroupId为[" + str2 + "]的ClientCfg.");
        } else {
            copyOnWriteArraySet2.add(str2);
        }
    }

    @Override // com.taobao.notify.remotingservice.responsitory.UrlManager
    public void deleteClientCfg(String str, String str2) {
        StringBuilder sb = new StringBuilder();
        sb.append("在删除ConfigCfg中，删除nsServersTopic[").append(str).append("]下面的GroupID为[");
        sb.append(str2).append("]的ClientCfg.");
        Set<String> set = this.groupsOfTopics.get(str);
        if (null == set) {
            sb.append("但没有该nsServersTopic");
            log.warn(LogPrefix + sb.toString());
            return;
        }
        if (0 == set.size()) {
            this.groupsOfTopics.remove(str);
            sb.append("但该nsServersTopic对应的groupID集合为空");
            log.warn(LogPrefix + sb.toString());
            return;
        }
        if (false == set.remove(str2)) {
            sb.append(" 但是该GroupID对应的ClientCfg不存在.");
            log.warn(LogPrefix + sb.toString());
            return;
        }
        if (0 == set.size()) {
            this.groupsOfTopics.remove(str);
        }
        sb.append(" 完成删除GroupID对应的ClientCfg.");
        List<String> urls = this.serverUrlResponsitory.getUrls(str);
        if (null == urls || urls.isEmpty()) {
            sb.append(" 但是没有处理该nsServersTopic的服务器 的URL.");
            log.warn(LogPrefix + sb.toString());
            return;
        }
        for (String str3 : urls) {
            ControlInfo controlInfo = this.controlInfoResponsitory.get(str3);
            if (null != controlInfo) {
                if (controlInfo.removeClientInfoByGroupId(str2)) {
                    try {
                        addSendControllInfoTask(str3, this.ioClientFactory.getNewWrappedIOClient(str3));
                    } catch (Exception e) {
                        log.error(LogPrefix + "推送ControlInfo失败,如果是在关闭DefaultNotifyManager时抛出此异常，请忽略", e);
                    }
                }
                if (controlInfo.getClientInfoCount() == 0) {
                    closeClients(str, str3, true);
                    this.controlInfoResponsitory.remove(str3);
                }
            } else {
                closeClients(str, str3, true);
            }
        }
        log.info(LogPrefix + sb.toString());
    }

    @Override // com.taobao.notify.remotingservice.responsitory.UrlManager
    public List<String> getUrls(String str) {
        return this.serverUrlResponsitory.getUrls(str);
    }

    private ControlInfo addClientInfo(String str, ClientInfo clientInfo) {
        ControlInfo controlInfo = new ControlInfo();
        ControlInfo putIfAbsent = this.controlInfoResponsitory.putIfAbsent(str, controlInfo);
        if (putIfAbsent == null) {
            putIfAbsent = controlInfo;
        }
        putIfAbsent.addClientInfo(clientInfo);
        return putIfAbsent;
    }

    @Override // com.taobao.notify.remotingservice.responsitory.UrlManager
    public void addSendControllInfoTask(String str, WrappedIOClient wrappedIOClient) {
        ControlInfo controlInfo = this.controlInfoResponsitory.get(str);
        if (controlInfo != null) {
            addSendControllInfoTask(str, wrappedIOClient, controlInfo);
        }
    }

    @Override // com.taobao.notify.remotingservice.responsitory.UrlManager
    public void addSendSubscriptionTask(String str, String str2, RemotingService remotingService, ClientSubscriptionManager clientSubscriptionManager) {
        String str3 = str + "/" + str2;
        this.taskManager.addProcessor(str3, this.sendSubscriptionTaskProcessor);
        this.taskManager.addTask(str3, new SendSubscriptionTask(str, str2, remotingService, clientSubscriptionManager));
    }

    private void addSendControllInfoTask(String str, WrappedIOClient wrappedIOClient, ControlInfo controlInfo) {
        this.taskManager.addProcessor(str, this.sendControllInfoTaskProcessor);
        this.taskManager.addTask(str, new SendControllInfoTask(controlInfo, wrappedIOClient), true);
        if (wrappedIOClient != null) {
            log.info(LogPrefix + "连接IP:" + wrappedIOClient.getRemoteAddress() + ",url:" + str + "成功，推送ControlInfo:" + controlInfo);
        }
    }

    private void deleteSendControllInfoTask(String str) {
        this.taskManager.removeTask(str);
        this.taskManager.removeProcessor(str);
    }

    private void addSendOpenSubscriptionTask(Binding binding) {
        StringBuilder sb = new StringBuilder();
        sb.append("open:").append(binding.getTopic()).append(":").append(binding.getKey()).append(":").append(binding.getGroup());
        Queue<WrappedIOClient> allClients = allClients(DataIdTools.getNSServersSubDataId(binding.getTopic()), binding.getGroup());
        this.taskManager.addProcessor(sb.toString(), this.sendOpenSubscriptionTaskProcessor);
        this.taskManager.addTask(sb.toString(), new SendOpenSubscriptionTask(binding, allClients), true);
    }

    @Override // com.taobao.notify.remotingservice.responsitory.UrlManager
    public void addSendCloseSubscriptionTask(String str, WrappedIOClient wrappedIOClient) {
        if (0 != this.closedSubscriptions.size()) {
            for (Binding binding : this.closedSubscriptions) {
                if (this.serverUrlResponsitory.isValidUrl(DataIdTools.getNSServersSubDataId(binding.getTopic()), str)) {
                    addSendCloseSubscriptionTask(binding, wrappedIOClient);
                }
            }
        }
    }

    private void addSendCloseSubscriptionTask(Binding binding, WrappedIOClient wrappedIOClient) {
        StringBuilder sb = new StringBuilder();
        sb.append("close:").append(binding.getTopic()).append(":").append(binding.getKey()).append(":").append(binding.getGroup());
        LinkedBlockingQueue linkedBlockingQueue = new LinkedBlockingQueue();
        linkedBlockingQueue.add(wrappedIOClient);
        this.taskManager.addProcessor(sb.toString(), this.sendCloseSubscriptionTaskProcessor);
        this.taskManager.addTask(sb.toString(), new SendCloseSubscriptionTask(binding, linkedBlockingQueue), true);
    }

    private void addSendCloseSubscriptionTask(Binding binding) {
        StringBuilder sb = new StringBuilder();
        sb.append("close:").append(binding.getTopic()).append(":").append(binding.getKey()).append(":").append(binding.getGroup());
        Queue<WrappedIOClient> allClients = allClients(DataIdTools.getNSServersSubDataId(binding.getTopic()), binding.getGroup());
        this.taskManager.addProcessor(sb.toString(), this.sendCloseSubscriptionTaskProcessor);
        this.taskManager.addTask(sb.toString(), new SendCloseSubscriptionTask(binding, allClients), true);
    }

    void clearOpBindingMap() {
        long currentTimeMillis = System.currentTimeMillis();
        for (Map.Entry<Binding, Long> entry : this.opBindingMap.entrySet()) {
            if (null != entry.getValue() && currentTimeMillis - entry.getValue().longValue() > 5000) {
                this.opBindingMap.remove(entry.getKey());
            }
        }
    }

    void checkOpBindMap(Binding binding) {
        clearOpBindingMap();
        Long l = this.opBindingMap.get(binding);
        if (l != null && System.currentTimeMillis() - l.longValue() < 2000) {
            throw new NotifyClientException("不允许短时间对同一个订阅关系反复进行打开关闭操作");
        }
        this.opBindingMap.put(binding, Long.valueOf(System.currentTimeMillis()));
    }

    @Override // com.taobao.notify.remotingservice.responsitory.UrlManager
    public void addCloseSubscription(Binding binding) {
        if (this.closedSubscriptions.contains(binding)) {
            return;
        }
        checkOpBindMap(binding);
        this.closedSubscriptions.add(binding);
        addSendCloseSubscriptionTask(binding);
    }

    @Override // com.taobao.notify.remotingservice.responsitory.UrlManager
    public void removeCloseSubscription(Binding binding) {
        checkOpBindMap(binding);
        if (this.closedSubscriptions.remove(binding)) {
            addSendOpenSubscriptionTask(binding);
        }
    }

    @Override // com.taobao.notify.remotingservice.responsitory.UrlManager
    public Set<Binding> getClosedSubscriptions() {
        return new HashSet(this.closedSubscriptions);
    }

    @Override // com.taobao.notify.remotingservice.responsitory.UrlManager
    public boolean isValidTopic(String str) {
        return this.groupsOfTopics.keySet().contains(str);
    }

    @Override // com.taobao.notify.remotingservice.responsitory.UrlManager
    public int getConnectionCount() {
        return this.connectionCount;
    }

    @Override // com.taobao.notify.remotingservice.responsitory.UrlManager
    public void setConnectionCount(int i) {
        this.connectionCount = i;
    }

    @Override // com.taobao.notify.remotingservice.responsitory.UrlManager
    public IOClientSelector getIoClientSelector() {
        return this.ioClientSelector;
    }

    @Override // com.taobao.notify.remotingservice.responsitory.UrlManager
    public void setIoClientSelector(IOClientSelector iOClientSelector) {
        this.ioClientSelector = iOClientSelector;
    }

    @Override // com.taobao.notify.remotingservice.responsitory.UrlManager
    public void showStatus() {
        log.info(LogPrefix + "serverUrlResponsitory size is:" + this.serverUrlResponsitory.getAllUrls().size());
        log.info(LogPrefix + "serverUrlResponsitory is:" + this.serverUrlResponsitory.toString());
        log.info(LogPrefix + "serverUrlResponsitory AllUrls are:" + this.serverUrlResponsitory.getAllUrls());
        log.info(LogPrefix + "controlInfoResponsitory size is:" + this.controlInfoResponsitory.size());
        log.info(LogPrefix + "controlInfoResponsitory is:" + this.controlInfoResponsitory.toString());
        log.info(LogPrefix + "groupOfTopics is:" + this.groupsOfTopics);
        RemotingContext remotingContext = this.remotingClient.getRemotingContext();
        if (null == remotingContext) {
            log.info(LogPrefix + "all urls: null");
        } else {
            log.info(LogPrefix + "all urls: " + remotingContext.getConnectionsByGroup(Constants.DEFAULT_GROUP));
        }
    }

    @Override // com.taobao.notify.remotingservice.responsitory.UrlManager
    public Map<String, Set<String>> getSnapshotOfTopics2Group() {
        return new HashMap(this.groupsOfTopics);
    }

    @Override // com.taobao.notify.remotingservice.responsitory.UrlManager
    public Map<String, List<String>> getSnapshotOfTopics2ServerUrl() {
        return new HashMap(this.serverUrlResponsitory.getSnapshotOfTopics2ServerUrl());
    }
}
