package com.taobao.notify.client.impl;

import com.taobao.config.client.PublisherRegistrar;
import com.taobao.config.client.PublisherRegistration;
import com.taobao.eagleeye.EagleEye;
import com.taobao.notify.client.INotifyClientEx;
import com.taobao.notify.client.IOClientSelector;
import com.taobao.notify.client.NotifyClient;
import com.taobao.notify.client.exception.NotifyClientAsynSendMessageException;
import com.taobao.notify.client.exception.NotifyClientCreateConnectionException;
import com.taobao.notify.client.exception.NotifyClientIllegalArgumentException;
import com.taobao.notify.client.manager.ClientSubscriptionManager;
import com.taobao.notify.client.manager.NotifyGroup;
import com.taobao.notify.client.manager.NotifyGroupManager;
import com.taobao.notify.client.manager.PublishTopicsManager;
import com.taobao.notify.common.config.MessageProperties;
import com.taobao.notify.common.config.threadpool.ThreadPoolConfig;
import com.taobao.notify.config.NotifyClientConfig;
import com.taobao.notify.config.SubscriptMsgDetailInfo;
import com.taobao.notify.message.BytesMessage;
import com.taobao.notify.message.Message;
import com.taobao.notify.message.MessageAccessor;
import com.taobao.notify.message.PackagedMessage;
import com.taobao.notify.message.StringMessage;
import com.taobao.notify.remoting.core.command.request.MessageCommitRollBackCommand;
import com.taobao.notify.remotingclient.AsynSendResultListener;
import com.taobao.notify.remotingclient.CheckMessageListener;
import com.taobao.notify.remotingclient.DefaultNotifyManager;
import com.taobao.notify.remotingclient.InnerSendResult;
import com.taobao.notify.remotingclient.MessageListener;
import com.taobao.notify.remotingclient.MessageStatus;
import com.taobao.notify.remotingclient.SendMessageCallback;
import com.taobao.notify.remotingclient.SendResult;
import com.taobao.notify.remotingclient.SendResultType;
import com.taobao.notify.remotingclient.addresses.MultiModeNSAddrDispatcherRegCenter;
import com.taobao.notify.remotingclient.addresses.MultiModeNSAddrListener;
import com.taobao.notify.remotingclient.addresses.impl.DefaultMultiModeNSAddrDispatcherRegCenter;
import com.taobao.notify.remotingclient.addresses.impl.NSAddressLoadMode;
import com.taobao.notify.remotingclient.impl.AsynSendMessageTask;
import com.taobao.notify.remotingclient.logging.LoggingService;
import com.taobao.notify.remotingservice.DefaultRemotingService;
import com.taobao.notify.remotingservice.IntegratedMockRemotingService;
import com.taobao.notify.remotingservice.RemotingService;
import com.taobao.notify.remotingservice.WrappedIOClient;
import com.taobao.notify.subscription.Binding;
import com.taobao.notify.tools.ClientUtils;
import com.taobao.notify.tools.DataIdTools;
import com.taobao.notify.utils.LoggerPrefix;
import com.taobao.notify.utils.NotifyStatLog;
import com.taobao.notify.utils.UniqId;
import com.taobao.notify.utils.threadpool.ManagedThreadPoolExecutor;
import java.util.ArrayList;
import java.util.Collection;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.Future;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import org.apache.commons.lang.StringUtils;
import org.apache.log4j.Logger;

/* loaded from: input_file:com/taobao/notify/client/impl/DefaultNotifyClient.class */
public class DefaultNotifyClient implements NotifyClient, INotifyClientEx {
    private static final String LogPrefix = LoggerPrefix.makeLogPrefix(DefaultNotifyClient.class);
    static final Logger logger = Logger.getLogger(DefaultNotifyClient.class);
    private volatile NotifyClientConfig notifyClientConfig;
    private static final int MAX_TIMES = 3;
    protected final NotifyGroupManager notifyGroupManager;
    private final RemotingService remotingService;
    protected final ClientSubscriptionManager clientSubscriptionManager;
    protected final PublishTopicsManager publishTopicsManager;
    private final ThreadPoolExecutor asynSendMessageWorkTP;
    private final LoggingService loggingService;
    private final ReliableAsynSendManager reliableAsynSendManager;
    private final MessageProperties reliableMessageProperties;
    private IOClientSelector ioClientSelector;
    private final MultiModeNSAddrDispatcherRegCenter<String, List<String>> addrRegCenter;

    /* JADX INFO: Access modifiers changed from: package-private */
    /* renamed from: com.taobao.notify.client.impl.DefaultNotifyClient$1, reason: invalid class name */
    /* loaded from: input_file:com/taobao/notify/client/impl/DefaultNotifyClient$1.class */
    public static /* synthetic */ class AnonymousClass1 {
        static final /* synthetic */ int[] $SwitchMap$com$taobao$notify$remoting$core$command$request$MessageCommitRollBackCommand$Status = new int[MessageCommitRollBackCommand.Status.values().length];

        static {
            try {
                $SwitchMap$com$taobao$notify$remoting$core$command$request$MessageCommitRollBackCommand$Status[MessageCommitRollBackCommand.Status.ROLLBACK.ordinal()] = 1;
            } catch (NoSuchFieldError e) {
            }
            try {
                $SwitchMap$com$taobao$notify$remoting$core$command$request$MessageCommitRollBackCommand$Status[MessageCommitRollBackCommand.Status.COMMITTED.ordinal()] = 2;
            } catch (NoSuchFieldError e2) {
            }
            try {
                $SwitchMap$com$taobao$notify$remoting$core$command$request$MessageCommitRollBackCommand$Status[MessageCommitRollBackCommand.Status.NOACTION.ordinal()] = DefaultNotifyClient.MAX_TIMES;
            } catch (NoSuchFieldError e3) {
            }
        }
    }

    public DefaultNotifyClient() {
        this(new NotifyClientConfig());
    }

    public DefaultNotifyClient(NotifyClientConfig notifyClientConfig) {
        this.notifyClientConfig = null;
        this.notifyGroupManager = new NotifyGroupManager();
        this.clientSubscriptionManager = new ClientSubscriptionManager();
        this.publishTopicsManager = new PublishTopicsManager();
        this.loggingService = LoggingService.getInstance();
        this.reliableMessageProperties = new MessageProperties();
        if (null == this.notifyClientConfig) {
            this.notifyClientConfig = notifyClientConfig;
        }
        this.addrRegCenter = new DefaultMultiModeNSAddrDispatcherRegCenter();
        if (notifyClientConfig.isDebug()) {
            this.remotingService = new IntegratedMockRemotingService(this.clientSubscriptionManager, notifyClientConfig.getDebugRemoteSubscribers(), this.notifyGroupManager, notifyClientConfig.getDebugLocalPort());
            logger.warn(LogPrefix + "注意：现在使用的是集成测试Debug模式的NotifyClient");
            logger.warn(LogPrefix + "目前集成测试Debug模式的本地端口为：" + notifyClientConfig.getDebugLocalPort());
            logger.warn(LogPrefix + "目前集成测试Debug模式使用的直接连接有：" + notifyClientConfig.getDebugRemoteSubscribers());
        } else {
            this.remotingService = new DefaultRemotingService(this.notifyClientConfig.getRemotingType(), this.notifyGroupManager, notifyClientConfig, this.addrRegCenter);
        }
        this.reliableMessageProperties.setCompressSize(Integer.MAX_VALUE);
        this.reliableMessageProperties.setMaxStringMessageSize(Integer.MAX_VALUE);
        this.asynSendMessageWorkTP = new ManagedThreadPoolExecutor(notifyClientConfig.getAsynSendMessageTPConfig().getCorePoolSize(), notifyClientConfig.getAsynSendMessageTPConfig().getMaxPoolSize(), notifyClientConfig.getAsynSendMessageTPConfig().getKeepAliveTime(), notifyClientConfig.getAsynSendMessageTPConfig().getMaxQueueSize(), "asynSendMsgTP-" + hashCode(), new ThreadPoolExecutor.AbortPolicy());
        this.reliableAsynSendManager = new ReliableAsynSendManager(this, this.notifyGroupManager, notifyClientConfig);
        this.ioClientSelector = this.remotingService.getIoClientSelector();
    }

    public RemotingService getRemotingService() {
        return this.remotingService;
    }

    @Override // com.taobao.notify.client.NotifyClientStub
    public NotifyClientConfig getNotifyClientConfig() {
        return this.notifyClientConfig;
    }

    @Override // com.taobao.notify.client.NotifyClientStub
    public synchronized void addGroup(String str, String str2, String str3, MessageListener messageListener, CheckMessageListener checkMessageListener, MessageProperties messageProperties, long j) {
        addGroup(str, str2, str3, messageListener, checkMessageListener, messageProperties, j, null);
    }

    @Override // com.taobao.notify.client.NotifyClientStub
    public synchronized void addGroup(String str, String str2, String str3, MessageListener messageListener, CheckMessageListener checkMessageListener, MessageProperties messageProperties, long j, NotifyClientConfig notifyClientConfig) {
        if (this.notifyGroupManager.containsGroup(str)) {
            logger.warn(LogPrefix + "已添加该组，不允许重复添加, GroupID为：" + str);
            return;
        }
        NotifyGroup build = new NotifyGroup.Builder(str, str2, str3).setMsgListener(messageListener).setCheckMsgListener(checkMessageListener).setMessageProperties(messageProperties).setWaitForConnTime(j).build();
        if (null != notifyClientConfig) {
            if (notifyClientConfig.getCheckMessageTPConfig().isPropertyChanged()) {
                build.setCheckMessageWorkTP(buildThreadPoolExecutor(notifyClientConfig.getCheckMessageTPConfig(), str, "-checkMsgWorkTP-"));
            }
            if (notifyClientConfig.getMessageTPConfig().isPropertyChanged()) {
                build.setDeliverMessageWorkTP(buildThreadPoolExecutor(notifyClientConfig.getMessageTPConfig(), str, "-msgWorkTP-"));
            }
            build.setCacheable(notifyClientConfig.isCacheable());
        }
        this.notifyGroupManager.addGroup(build);
    }

    private ThreadPoolExecutor buildThreadPoolExecutor(ThreadPoolConfig threadPoolConfig, String str, String str2) {
        return new ManagedThreadPoolExecutor(threadPoolConfig.getCorePoolSize(), threadPoolConfig.getMaxPoolSize(), threadPoolConfig.getKeepAliveTime(), threadPoolConfig.getMaxQueueSize(), str + str2 + hashCode(), new ThreadPoolExecutor.AbortPolicy());
    }

    @Override // com.taobao.notify.client.NotifySubscriber
    public void setMessageListener(String str, MessageListener messageListener) {
        if (!this.notifyGroupManager.containsGroup(str)) {
            throw new NotifyClientIllegalArgumentException("无效的GroupID" + str);
        }
        this.notifyGroupManager.getGroup(str).setMsgListener(messageListener);
    }

    @Override // com.taobao.notify.client.NotifyPublisher
    public void setCheckMessageListener(String str, CheckMessageListener checkMessageListener) {
        if (!this.notifyGroupManager.containsGroup(str)) {
            throw new NotifyClientIllegalArgumentException("无效的GroupID" + str);
        }
        this.notifyGroupManager.getGroup(str).setCheckMsgListener(checkMessageListener);
    }

    @Override // com.taobao.notify.client.NotifyClientStub
    public boolean isValidGroup(String str) {
        if (null == str) {
            return false;
        }
        return this.notifyGroupManager.containsGroup(str);
    }

    @Override // com.taobao.notify.client.NotifyClientStub
    public synchronized void removeGroup(String str) {
        if (!this.notifyGroupManager.containsGroup(str)) {
            logger.warn(LogPrefix + "没有添加过该组，不能删除, GroupID为：" + str);
            return;
        }
        Set<String> removeTopics = this.publishTopicsManager.removeTopics(str);
        Map<String, Set<Binding>> removeBindingByGroup = this.clientSubscriptionManager.removeBindingByGroup(str);
        if (null != removeBindingByGroup) {
            innerCloseSubscribeConn(str, removeBindingByGroup.keySet());
        }
        innerClosePublishConn(str, removeTopics);
        this.notifyGroupManager.removeGroup(str);
        if (this.notifyGroupManager.groupSize() == 0) {
            this.reliableAsynSendManager.close();
        }
    }

    @Override // com.taobao.notify.client.NotifyClientStub
    public synchronized void close() {
        this.reliableAsynSendManager.close();
        this.asynSendMessageWorkTP.shutdown();
        Iterator it = new HashSet(this.notifyGroupManager.getGroupIds()).iterator();
        while (it.hasNext()) {
            removeGroup((String) it.next());
        }
        this.remotingService.closeAllIOClients();
        this.notifyGroupManager.clear();
    }

    @Override // com.taobao.notify.client.NotifyPublisher
    public synchronized void addPublishTopic(String str, String str2) {
        if (this.publishTopicsManager.isValidTopic(str, str2)) {
            return;
        }
        this.publishTopicsManager.addTopic(str, str2);
        innerCreatePublishConn(str, str2);
    }

    @Override // com.taobao.notify.client.NotifyPublisher
    public synchronized void resetPublishTopics(String str, Collection<String> collection) {
        innerClosePublishConn(str, this.publishTopicsManager.resetTopics(str, collection));
        Iterator<String> it = collection.iterator();
        while (it.hasNext()) {
            innerCreatePublishConn(str, it.next());
        }
    }

    @Override // com.taobao.notify.client.NotifyPublisher
    public Future<SendResult> asynSendMessage(Message message, SendMessageCallback sendMessageCallback, AsynSendResultListener asynSendResultListener) {
        try {
            return this.asynSendMessageWorkTP.submit(new AsynSendMessageTask(this, asynSendResultListener, message, sendMessageCallback));
        } catch (RejectedExecutionException e) {
            throw new NotifyClientAsynSendMessageException(e, "RejectedExecutionException, 线程队列满了");
        }
    }

    @Override // com.taobao.notify.client.NotifyPublisher
    public Future<SendResult> asynSendMessage(Message message, AsynSendResultListener asynSendResultListener) {
        return asynSendMessage(message, null, asynSendResultListener);
    }

    @Override // com.taobao.notify.client.NotifyPublisher
    public SendResult reliableAsynSendMessage(Message message, SendMessageCallback sendMessageCallback) {
        recordEagleEyeLog(message, "raSend~", true);
        if (null == message.getMessageId()) {
            message.setMessageId(UniqId.getInstance().getUniqIDHash());
        }
        InnerSendResult checkMessage = checkMessage(message);
        if (null != checkMessage) {
            EagleEye.rpcClientRecv("01", MAX_TIMES);
            return getSendResult(checkMessage);
        }
        boolean z = false;
        if (null == sendMessageCallback) {
            z = true;
        }
        long currentTimeMillis = System.currentTimeMillis();
        Message sendType = message.toSendType(this.reliableMessageProperties);
        SendResult addMessage = this.reliableAsynSendManager.addMessage(sendType, z);
        NotifyStatLog.addStatValue2("notifyClient", addMessage.isSuccess() ? "SendMessageCount_RA_S" : "SendMessageCount_RA_F", message.getGroupId(), message.getTopic() + "=>" + message.getMessageType(), System.currentTimeMillis() - currentTimeMillis);
        if (!addMessage.isSuccess()) {
            EagleEye.rpcClientRecv("01", MAX_TIMES);
            return addMessage;
        }
        EagleEye.rpcClientRecv("00", MAX_TIMES);
        if (null != sendMessageCallback) {
            MessageStatus messageStatus = new MessageStatus();
            try {
                addMessage.setModel(sendMessageCallback.doInTransaction(messageStatus));
            } catch (Throwable th) {
                logger.error(LogPrefix + "RuntimeException throw in doInTranscation", th);
                addMessage.setErrorMessage("RuntimeException throw in doInTranscation. Detail:" + th.getMessage());
                messageStatus.setRollbackOnly();
            }
            if (messageStatus.getStatus() != null) {
                switch (AnonymousClass1.$SwitchMap$com$taobao$notify$remoting$core$command$request$MessageCommitRollBackCommand$Status[messageStatus.getStatus().ordinal()]) {
                    case 1:
                        addMessage.setErrorMessage("Notify Client 主动回滚消息，原因为：" + messageStatus.getReason());
                        addMessage.setSuccess(false);
                        addMessage.setSendResultType(SendResultType.ROLLBACK);
                        addMessage = this.reliableAsynSendManager.rollbackMessage(sendType, addMessage);
                        break;
                    case 2:
                        addMessage = this.reliableAsynSendManager.commitMessage(sendType, addMessage);
                        break;
                    case MAX_TIMES /* 3 */:
                        addMessage.setErrorMessage("Notify Client 主动设置NoAction，原因为：" + messageStatus.getReason());
                        addMessage.setSuccess(false);
                        addMessage.setSendResultType(SendResultType.ERROR);
                        break;
                }
            } else if (messageStatus.isRollbackOnly()) {
                addMessage.setErrorMessage("Notify Client 主动回滚消息，原因为：" + messageStatus.getReason());
                addMessage.setSuccess(false);
                addMessage.setSendResultType(SendResultType.ROLLBACK);
                addMessage = this.reliableAsynSendManager.rollbackMessage(sendType, addMessage);
            } else {
                addMessage = this.reliableAsynSendManager.commitMessage(sendType, addMessage);
            }
        }
        return addMessage;
    }

    @Override // com.taobao.notify.client.NotifyPublisher
    public SendResult reliableAsynSendMessage(Message message) {
        return reliableAsynSendMessage(message, null);
    }

    @Override // com.taobao.notify.client.NotifyPublisher
    public SendResult sendMessage(Message message) {
        return getSendResult(sendMessage(message, true));
    }

    @Override // com.taobao.notify.client.NotifyPublisher
    public SendResult sendMessage(Message message, SendMessageCallback sendMessageCallback) {
        InnerSendResult sendMessage = sendMessage(message, false);
        if (!sendMessage.isSuccess()) {
            return getSendResult(sendMessage);
        }
        MessageStatus messageStatus = new MessageStatus();
        RuntimeException runtimeException = null;
        if (null != sendMessageCallback) {
            try {
                sendMessage.setModel(sendMessageCallback.doInTransaction(messageStatus));
            } catch (Throwable th) {
                runtimeException = new RuntimeException(th);
                logger.error(LogPrefix + "runtimeException throw in doInTranscation", th);
                sendMessage.setErrorMessage("RuntimeException throw in doInTranscation. Detail:" + th.getMessage());
                messageStatus.setRollbackOnly();
            }
            if (messageStatus.getStatus() != null) {
                switch (AnonymousClass1.$SwitchMap$com$taobao$notify$remoting$core$command$request$MessageCommitRollBackCommand$Status[messageStatus.getStatus().ordinal()]) {
                    case 1:
                        sendMessage.setErrorMessage("Notify Client 主动回滚消息，原因为：" + messageStatus.getReason());
                        sendMessage.setSuccess(false);
                        sendMessage.setSendResultType(SendResultType.ROLLBACK);
                        break;
                    case MAX_TIMES /* 3 */:
                        sendMessage.setErrorMessage("Notify Client 主动设置noAction状态,等待check，topic=" + message.getTopic() + "  messageType=" + message.getMessageType() + " groupId=" + message.getGroupId() + "  原因为：" + messageStatus.getReason());
                        sendMessage.setSuccess(false);
                        sendMessage.setSendResultType(SendResultType.ERROR);
                        break;
                }
            } else if (messageStatus.isRollbackOnly()) {
                sendMessage.setErrorMessage("Notify Client 主动回滚消息，原因为：" + messageStatus.getReason());
                sendMessage.setSuccess(false);
                sendMessage.setSendResultType(SendResultType.ROLLBACK);
            }
            if (message instanceof PackagedMessage) {
                List<Message> messageList = ((PackagedMessage) message).getMessageList();
                HashMap hashMap = new HashMap();
                for (Message message2 : messageList) {
                    hashMap.put(UniqId.getInstance().bytes2string(message2.getMessageId()), message2);
                }
                int sendResultMessageInfoSize = sendMessage.getSendResultMessageInfoSize();
                long postDelayTime = message.getPostDelayTime();
                for (int i = 0; i < sendResultMessageInfoSize; i++) {
                    Message message3 = (Message) hashMap.get(sendMessage.getMessageId(i));
                    if (null == message3) {
                        logger.error(LogPrefix + "send half message：回馈消息ID不在发送的消息中：回馈的消息ID为：" + sendMessage.getMessageId(i));
                    } else {
                        sendMessage.getRemotingClient().sendOneWay(message3.getMessageId(), sendMessage.getServerData(i), !messageStatus.isRollbackOnly(), message3.getPostDelayTime() > postDelayTime ? message3.getPostDelayTime() : postDelayTime, messageStatus.getStatus());
                    }
                }
            } else {
                if (sendMessage.getSendResultMessageInfoSize() != 1) {
                    logger.error(LogPrefix + "send half message：回馈消息数量不对，期望是1，实际是" + sendMessage.getSendResultMessageInfoSize());
                }
                sendMessage.getRemotingClient().sendOneWay(message.getMessageId(), sendMessage.getServerData(0), !messageStatus.isRollbackOnly(), message.getPostDelayTime(), messageStatus.getStatus());
            }
        }
        SendResult sendResult = getSendResult(sendMessage);
        sendResult.setRuntimeException(runtimeException);
        return sendResult;
    }

    private long getMessageSize(Message message) {
        if ((message instanceof BytesMessage) && ((BytesMessage) message).getBody() != null) {
            return ((BytesMessage) message).getBody().length;
        }
        if (!(message instanceof StringMessage) || ((StringMessage) message).getBody() == null) {
            return 0L;
        }
        return ((StringMessage) message).getBody().getBytes().length;
    }

    @Override // com.taobao.notify.client.NotifyPublisher
    public InnerSendResult sendMessage(Message message, boolean z) {
        String topic = message.getTopic();
        String groupId = message.getGroupId();
        if (null == message.getMessageId()) {
            message.setMessageId(UniqId.getInstance().getUniqIDHash());
        }
        message.setCommitted(z);
        InnerSendResult checkMessage = checkMessage(message);
        if (null != checkMessage) {
            if (!isRaMessage(message)) {
                recordEagleEyeLog(message, "send~", false);
                EagleEye.rpcClientRecv("01", MAX_TIMES);
            }
            return checkMessage;
        }
        if (!this.publishTopicsManager.isValidTopic(groupId, topic)) {
            addPublishTopic(groupId, topic);
        }
        String nSServersPubDataId = DataIdTools.getNSServersPubDataId(topic);
        if (message instanceof StringMessage) {
            if (((StringMessage) message).getBody() == null) {
                ((StringMessage) message).setBody("");
            }
        } else if ((message instanceof BytesMessage) && ((BytesMessage) message).getBody() == null) {
            ((BytesMessage) message).setBody(new byte[0]);
        }
        Message sendType = message.toSendType(this.notifyGroupManager.getMessageProperties(groupId));
        if (!isRaMessage(sendType)) {
            recordEagleEyeLog(sendType, "send~", false);
        }
        return innerSendMessage(sendType, groupId, nSServersPubDataId);
    }

    private boolean isRaMessage(Message message) {
        if (message instanceof PackagedMessage) {
            List messageList = ((PackagedMessage) message).getMessageList();
            if (messageList.isEmpty()) {
                return false;
            }
            message = (Message) messageList.iterator().next();
        }
        try {
            return message.getBooleanProperty("eagleRaSend");
        } catch (Exception e) {
            return false;
        }
    }

    private void recordEagleEyeLog(Message message, String str, boolean z) {
        StringBuilder sb = new StringBuilder();
        sb.append(str).append(MessageAccessor.getOriginType(message).toString()).append(":").append(message.getTopic()).append(":").append(message.getMessageType() == null ? "" : message.getMessageType()).append(":").append(message.getGroupId());
        EagleEye.startRpc("Notify", sb.toString());
        EagleEye.rpcClientSend();
        String traceId = EagleEye.getTraceId();
        String rpcId = EagleEye.getRpcId();
        String exportUserData = EagleEye.exportUserData();
        long j = 0;
        if (StringUtils.isNotBlank(traceId) && StringUtils.isNotBlank(rpcId)) {
            if (message instanceof PackagedMessage) {
                int i = 1;
                for (Message message2 : ((PackagedMessage) message).getMessageList()) {
                    message2.setStringProperty("eagleTraceId", traceId);
                    message2.setStringProperty("eagleRpcId", rpcId + "." + i);
                    message2.setBooleanProperty("eagleRaSend", z);
                    if (StringUtils.isNotBlank(exportUserData)) {
                        message2.setStringProperty("eagleData", exportUserData);
                    }
                    i++;
                    j += getMessageSize(message2);
                }
            } else {
                message.setStringProperty("eagleTraceId", traceId);
                message.setStringProperty("eagleRpcId", rpcId + ".1");
                message.setBooleanProperty("eagleRaSend", z);
                if (StringUtils.isNotBlank(exportUserData)) {
                    message.setStringProperty("eagleData", exportUserData);
                }
                j = 0 + getMessageSize(message);
            }
        }
        EagleEye.requestSize(j);
    }

    protected InnerSendResult checkMessage(Message message) {
        String topic = message.getTopic();
        String groupId = message.getGroupId();
        String messageType = message.getMessageType();
        InnerSendResult innerSendResult = new InnerSendResult();
        if (null == topic) {
            logger.error(LogPrefix + "没有设置Topic" + message.toExtStringWithoutBody());
            innerSendResult.clearSendResultMessageInfo();
            innerSendResult.addSendResultMessageInfo(UniqId.getInstance().bytes2string(message.getMessageId()), null);
            innerSendResult.setSuccess(false);
            innerSendResult.setErrorMessage("没有设置Topic");
            innerSendResult.setSendResultType(SendResultType.FORMAT_ERROR);
            return innerSendResult;
        }
        if (null == messageType) {
            logger.error(LogPrefix + "没有设置MessageType" + message.toExtStringWithoutBody());
            innerSendResult.clearSendResultMessageInfo();
            innerSendResult.addSendResultMessageInfo(UniqId.getInstance().bytes2string(message.getMessageId()), null);
            innerSendResult.setSuccess(false);
            innerSendResult.setErrorMessage("没有设置MessageType");
            innerSendResult.setSendResultType(SendResultType.FORMAT_ERROR);
            return innerSendResult;
        }
        if (null == groupId) {
            logger.error(LogPrefix + "没有设置GroupID" + message.toExtStringWithoutBody());
            innerSendResult.clearSendResultMessageInfo();
            innerSendResult.addSendResultMessageInfo(UniqId.getInstance().bytes2string(message.getMessageId()), null);
            innerSendResult.setSuccess(false);
            innerSendResult.setErrorMessage("没有设置GroupID");
            innerSendResult.setSendResultType(SendResultType.FORMAT_ERROR);
            return innerSendResult;
        }
        if (message instanceof PackagedMessage) {
            List messageList = ((PackagedMessage) message).getMessageList();
            if (((PackagedMessage) message).getMessageListSize() == 0) {
                logger.error(LogPrefix + "不能传输不包含Message的PackagedMessage" + message.toExtStringWithoutBody());
                innerSendResult.clearSendResultMessageInfo();
                innerSendResult.addSendResultMessageInfo(UniqId.getInstance().bytes2string(message.getMessageId()), null);
                innerSendResult.setSuccess(false);
                innerSendResult.setErrorMessage("不能传输不包含Message的PackagedMessage");
                innerSendResult.setSendResultType(SendResultType.ERROR);
                return innerSendResult;
            }
            String topic2 = ((Message) messageList.iterator().next()).getTopic();
            Iterator it = messageList.iterator();
            while (it.hasNext()) {
                if (!((Message) it.next()).getTopic().equals(topic2)) {
                    logger.error(LogPrefix + "不能传输包含不同的topic的PackagedMessage" + message.toExtStringWithoutBody());
                    innerSendResult.clearSendResultMessageInfo();
                    innerSendResult.addSendResultMessageInfo(UniqId.getInstance().bytes2string(message.getMessageId()), null);
                    innerSendResult.setSuccess(false);
                    innerSendResult.setErrorMessage("不能传输包含不同的topic的PackagedMessage");
                    innerSendResult.setSendResultType(SendResultType.ERROR);
                    return innerSendResult;
                }
            }
        }
        if (this.notifyGroupManager.containsGroup(groupId)) {
            return null;
        }
        logger.error(LogPrefix + "设置了无效的GroupID" + message.toExtStringWithoutBody());
        innerSendResult.clearSendResultMessageInfo();
        innerSendResult.addSendResultMessageInfo(UniqId.getInstance().bytes2string(message.getMessageId()), null);
        innerSendResult.setSuccess(false);
        innerSendResult.setErrorMessage("设置了无效的GroupID");
        innerSendResult.setSendResultType(SendResultType.FORMAT_ERROR);
        return innerSendResult;
    }

    /* JADX WARN: Finally extract failed */
    protected InnerSendResult innerSendMessage(Message message, String str, String str2) {
        String logMessage = this.loggingService.logMessage(message);
        InnerSendResult innerSendResult = new InnerSendResult();
        if (message instanceof BytesMessage) {
            BytesMessage bytesMessage = (BytesMessage) message;
            if (bytesMessage.getBody() != null && bytesMessage.getBody().length > 1232896) {
                innerSendResult.setErrorMessage("消息大小超过限定字节大小 1232896");
                innerSendResult.setSuccess(false);
                innerSendResult.clearSendResultMessageInfo();
                innerSendResult.addSendResultMessageInfo(UniqId.getInstance().bytes2string(message.getMessageId()), null);
                innerSendResult.setSendResultType(SendResultType.ERROR);
                return innerSendResult;
            }
        }
        int i = 0;
        boolean z = true;
        int clientPostTimeout = message.getClientPostTimeout();
        long j = 0;
        while (true) {
            int i2 = i;
            i++;
            if (i2 >= MAX_TIMES) {
                if (innerSendResult.isSuccess() && j > 200) {
                    NotifyStatLog.addStatValue2("notifyClient", "SendMessageCount_WS", message.getGroupId(), message.getTopic() + "=>" + message.getMessageType(), j);
                }
                EagleEye.rpcClientRecv("01", MAX_TIMES);
                this.loggingService.logResult(logMessage, message, "Send Message Error Result: " + innerSendResult);
                return innerSendResult;
            }
            if (!z && j > clientPostTimeout) {
                EagleEye.rpcClientRecv("01", MAX_TIMES);
                return createTimeoutResult(message, logMessage, innerSendResult, j);
            }
            z = false;
            WrappedIOClient innerGetClient = innerGetClient(message, innerSendResult, str2);
            if (innerGetClient == null) {
                EagleEye.rpcClientRecv("01", MAX_TIMES);
                return createNoClientResult(message, logMessage, innerSendResult, System.currentTimeMillis());
            }
            long currentTimeMillis = System.currentTimeMillis();
            try {
                try {
                    innerSendResult = innerGetClient.sendWithSync(message, clientPostTimeout);
                    innerSendResult.setAcceptMessageHost(innerGetClient.getRemoteAddress());
                    EagleEye.remoteIp(innerGetClient.getRemoteAddress());
                    this.loggingService.logResult(logMessage, message, "Send Message to " + innerGetClient.getRemoteAddress() + " Result: " + innerSendResult);
                } catch (Throwable th) {
                    innerSendResult = createExceptionResult(message, System.currentTimeMillis(), th);
                    long currentTimeMillis2 = System.currentTimeMillis() - currentTimeMillis;
                    j += currentTimeMillis2;
                    NotifyStatLog.addStatValue2("notifyClient", innerSendResult.isSuccess() ? "SendMessageCount_S" : "SendMessageCount_F", message.getGroupId(), message.getTopic() + "=>" + message.getMessageType(), j);
                    if (this.ioClientSelector instanceof ResponseTimeWeightIOClientSelector) {
                        ((ResponseTimeWeightIOClientSelector) this.ioClientSelector).updateRT(innerGetClient.getURL(), clientPostTimeout, currentTimeMillis2);
                    }
                }
                if (innerSendResult.isSuccess()) {
                    EagleEye.rpcClientRecv("00", MAX_TIMES);
                    long currentTimeMillis3 = System.currentTimeMillis() - currentTimeMillis;
                    NotifyStatLog.addStatValue2("notifyClient", innerSendResult.isSuccess() ? "SendMessageCount_S" : "SendMessageCount_F", message.getGroupId(), message.getTopic() + "=>" + message.getMessageType(), j + currentTimeMillis3);
                    if (this.ioClientSelector instanceof ResponseTimeWeightIOClientSelector) {
                        ((ResponseTimeWeightIOClientSelector) this.ioClientSelector).updateRT(innerGetClient.getURL(), clientPostTimeout, currentTimeMillis3);
                    }
                    return innerSendResult;
                }
                long currentTimeMillis4 = System.currentTimeMillis() - currentTimeMillis;
                j += currentTimeMillis4;
                NotifyStatLog.addStatValue2("notifyClient", innerSendResult.isSuccess() ? "SendMessageCount_S" : "SendMessageCount_F", message.getGroupId(), message.getTopic() + "=>" + message.getMessageType(), j);
                if (this.ioClientSelector instanceof ResponseTimeWeightIOClientSelector) {
                    ((ResponseTimeWeightIOClientSelector) this.ioClientSelector).updateRT(innerGetClient.getURL(), clientPostTimeout, currentTimeMillis4);
                }
            } catch (Throwable th2) {
                long currentTimeMillis5 = System.currentTimeMillis() - currentTimeMillis;
                NotifyStatLog.addStatValue2("notifyClient", innerSendResult.isSuccess() ? "SendMessageCount_S" : "SendMessageCount_F", message.getGroupId(), message.getTopic() + "=>" + message.getMessageType(), j + currentTimeMillis5);
                if (this.ioClientSelector instanceof ResponseTimeWeightIOClientSelector) {
                    ((ResponseTimeWeightIOClientSelector) this.ioClientSelector).updateRT(innerGetClient.getURL(), clientPostTimeout, currentTimeMillis5);
                }
                throw th2;
            }
        }
    }

    private InnerSendResult createTimeoutResult(Message message, String str, InnerSendResult innerSendResult, long j) {
        innerSendResult.setErrorMessage("发消息时通信异常：发送超时, " + message.toString());
        innerSendResult.setSuccess(false);
        innerSendResult.clearSendResultMessageInfo();
        innerSendResult.setSendResultType(SendResultType.TIMEOUT);
        innerSendResult.addSendResultMessageInfo(UniqId.getInstance().bytes2string(message.getMessageId()), null);
        this.loggingService.logResult(str, message, "Send Message Timeout Result: " + innerSendResult);
        NotifyStatLog.addStatValue2("notifyClient", "SendMessageCount_Timeout", message.getGroupId(), message.getTopic() + "=>" + message.getMessageType(), j);
        return innerSendResult;
    }

    private InnerSendResult createExceptionResult(Message message, long j, Throwable th) {
        InnerSendResult innerSendResult = new InnerSendResult();
        innerSendResult.setSuccess(false);
        innerSendResult.clearSendResultMessageInfo();
        innerSendResult.setSendResultType(SendResultType.EXCEPTION);
        innerSendResult.addSendResultMessageInfo(UniqId.getInstance().bytes2string(message.getMessageId()), null);
        innerSendResult.setErrorMessage("发消息时通信异常：" + th.getMessage());
        logger.error(LogPrefix + innerSendResult.getErrorMessage(), th);
        NotifyStatLog.addStatValue2("notifyClient", "SendMessageCount_RE", message.getGroupId(), message.getTopic() + "=>" + message.getMessageType(), System.currentTimeMillis() - j);
        return innerSendResult;
    }

    private InnerSendResult createNoClientResult(Message message, String str, InnerSendResult innerSendResult, long j) {
        innerSendResult.setErrorMessage("发消息时无对应连接,Topic[" + message.getTopic() + "]MessageType[" + message.getMessageType() + "]GroupId[" + message.getGroupId() + "]");
        innerSendResult.setSuccess(false);
        innerSendResult.clearSendResultMessageInfo();
        innerSendResult.setSendResultType(SendResultType.NO_CONNECTION);
        innerSendResult.addSendResultMessageInfo(UniqId.getInstance().bytes2string(message.getMessageId()), null);
        long currentTimeMillis = System.currentTimeMillis() - j;
        this.loggingService.logResult(str, message, new StringBuilder().append("Send Message client==null Result: ").append(innerSendResult).toString());
        NotifyStatLog.addStatValue2("notifyClient", "SendMessageCount_NC", message.getGroupId(), new StringBuilder().append(message.getTopic()).append("=>").append(message.getMessageType()).toString(), currentTimeMillis);
        return innerSendResult;
    }

    @Override // com.taobao.notify.client.NotifySubscriber
    public void subscribeMessage(String str, String str2, String str3, boolean z, int i) {
        subscribeMessage(str, str2, str3, z, i, 5);
    }

    @Override // com.taobao.notify.client.NotifySubscriber
    public void subscribe(Binding binding) {
        this.clientSubscriptionManager.addBinding(binding);
        innerSubscribeMessage(binding.getGroup(), this.clientSubscriptionManager.getBindingStringByGroup(binding.getGroup()));
    }

    @Override // com.taobao.notify.client.NotifySubscriber
    public void subscribe(List<Binding> list) {
        Set<String> addBindings = this.clientSubscriptionManager.addBindings(list);
        for (String str : getGroupSetFromBindingList(list)) {
            innerCloseSubscribeConn(str, addBindings);
            innerSubscribeMessage(str, this.clientSubscriptionManager.getBindingStringByGroup(str));
        }
    }

    private Set<String> getGroupSetFromBindingList(List<Binding> list) {
        HashSet hashSet = new HashSet();
        Iterator<Binding> it = list.iterator();
        while (it.hasNext()) {
            hashSet.add(it.next().getGroup());
        }
        return hashSet;
    }

    /* JADX INFO: Access modifiers changed from: protected */
    public List<Binding> innerSubsrcibeMessages(String str, Map<String, Map<String, SubscriptMsgDetailInfo>> map) {
        ArrayList arrayList = new ArrayList();
        for (Map.Entry<String, Map<String, SubscriptMsgDetailInfo>> entry : map.entrySet()) {
            String key = entry.getKey();
            Map<String, SubscriptMsgDetailInfo> value = entry.getValue();
            if (value != null) {
                for (Map.Entry<String, SubscriptMsgDetailInfo> entry2 : value.entrySet()) {
                    String key2 = entry2.getKey();
                    SubscriptMsgDetailInfo value2 = entry2.getValue();
                    arrayList.add(Binding.direct(key, key2, str, value2.getWaterMark(), value2.isPersistence()));
                }
            }
        }
        return arrayList;
    }

    @Override // com.taobao.notify.client.NotifySubscriber
    public void subscribeMessages(String str, Map<String, Map<String, SubscriptMsgDetailInfo>> map) {
        innerCloseSubscribeConn(str, this.clientSubscriptionManager.addBindings(str, innerSubsrcibeMessages(str, map)));
        innerSubscribeMessage(str, this.clientSubscriptionManager.getBindingStringByGroup(str));
    }

    private WrappedIOClient innerGetClient(Message message, InnerSendResult innerSendResult, String str) {
        WrappedIOClient wrappedIOClient = null;
        try {
            wrappedIOClient = this.remotingService.getIOClient(str, message.getGroupId());
        } catch (RuntimeException e) {
            innerSendResult.setSuccess(false);
            innerSendResult.setErrorMessage("发消息时取连接失败：" + e.getMessage() + ".MessageTopic:" + message.getTopic() + ".MessageType:" + message.getMessageType());
            innerSendResult.clearSendResultMessageInfo();
            innerSendResult.addSendResultMessageInfo(UniqId.getInstance().bytes2string(message.getMessageId()), null);
            logger.error(new StringBuilder().append(LogPrefix).append(innerSendResult.getErrorMessage()).toString());
        }
        return wrappedIOClient;
    }

    @Override // com.taobao.notify.client.NotifyPublisher
    public void setInitServiceHostsForPub(String str, List<String> list) {
        this.addrRegCenter.setInitServiceHostsForPub(str, list);
    }

    @Override // com.taobao.notify.client.NotifyPublisher
    public MultiModeNSAddrListener<String, List<String>> getAddressListenerForPub(String str) {
        return this.addrRegCenter.getAddressListenerForPub(str);
    }

    private void innerCreatePublishConn(String str, String str2) {
        String nSServersPubDataId = DataIdTools.getNSServersPubDataId(str2);
        NotifyGroup group = this.notifyGroupManager.getGroup(str);
        if (null == group) {
            throw new NotifyClientIllegalArgumentException("无效的groupId: " + str);
        }
        try {
            this.remotingService.createIOClients(nSServersPubDataId, group);
        } catch (IllegalStateException e) {
            throw new NotifyClientCreateConnectionException(e, "创建发布消息的连接失败, Topic是[" + nSServersPubDataId + "], GroupID是[" + str + "]");
        } catch (Throwable th) {
            logger.error(LogPrefix + "创建发布消息的连接失败, Topic是[" + nSServersPubDataId + "], GroupID是[" + str + "]");
        }
    }

    @Override // com.taobao.notify.client.NotifySubscriber
    public MultiModeNSAddrListener<String, List<String>> getAddressListenerForSub(String str) {
        return this.addrRegCenter.getAddressListenerForSub(str);
    }

    @Override // com.taobao.notify.client.NotifySubscriber
    public void setInitServiceHostsForSub(String str, List<String> list) {
        this.addrRegCenter.setInitServiceHostsForSub(str, list);
    }

    private void innerCreateSubscribeConn(String str, String str2) {
        String nSServersSubDataId = DataIdTools.getNSServersSubDataId(str2);
        NotifyGroup group = this.notifyGroupManager.getGroup(str);
        if (null == group) {
            throw new NotifyClientIllegalArgumentException("无效的groupId: " + str);
        }
        try {
            this.remotingService.createIOClients(nSServersSubDataId, group);
        } catch (IllegalStateException e) {
            throw new NotifyClientCreateConnectionException(e, "创建订阅消息的连接失败, Topic是[" + nSServersSubDataId + "], GroupID是[" + str + "]");
        } catch (Throwable th) {
            logger.error(LogPrefix + "创建订阅消息的连接失败");
        }
    }

    private void innerClosePublishConn(String str, Collection<String> collection) {
        if (null == collection) {
            return;
        }
        Iterator<String> it = collection.iterator();
        while (it.hasNext()) {
            this.remotingService.closeIOClients(DataIdTools.getNSServersPubDataId(it.next()), str, false);
        }
    }

    private void innerCloseSubscribeConn(String str, Collection<String> collection) {
        if (null == collection) {
            return;
        }
        Iterator<String> it = collection.iterator();
        while (it.hasNext()) {
            this.remotingService.closeIOClients(DataIdTools.getNSServersSubDataId(it.next()), str, true);
        }
    }

    protected SendResult getSendResult(InnerSendResult innerSendResult) {
        SendResult sendResult = new SendResult();
        sendResult.setSuccess(innerSendResult.isSuccess());
        sendResult.setMessageId(innerSendResult.getMessageId(0));
        sendResult.setErrorMessage(innerSendResult.getErrorMessage());
        sendResult.setModel(innerSendResult.getModel());
        sendResult.setSendResultType(innerSendResult.getSendResultType());
        sendResult.setAcceptMessageHost(innerSendResult.getAcceptMessageHost());
        return sendResult;
    }

    private void innerSubscribeMessage(String str, Map<String, String> map) {
        for (Map.Entry<String, String> entry : map.entrySet()) {
            String key = entry.getKey();
            String value = entry.getValue();
            innerCreateSubscribeConn(str, key);
            if (this.remotingService.isMock()) {
                innerSubscribeMessageToConfigServer(str, key, value);
            } else {
                this.remotingService.sendSubscription(key, str, this.clientSubscriptionManager);
            }
        }
        logger.info(LogPrefix + "订阅消息：" + subInfoToString(str, map));
    }

    private void innerSubscribeMessageToConfigServer(String str, String str2, String str3) {
        PublisherRegistration publisherRegistration = new PublisherRegistration(DefaultNotifyManager.class.getName(), DataIdTools.getNotifySubscriptionBindingInfoDataId(str2), ClientUtils.getAppName() + " " + str);
        publisherRegistration.setPersistency(false);
        PublisherRegistrar.register(publisherRegistration).publish(str3);
    }

    private String subInfoToString(String str, Map<String, String> map) {
        StringBuilder sb = new StringBuilder(str);
        for (Map.Entry<String, String> entry : map.entrySet()) {
            String value = entry.getValue();
            sb.append("Topic: " + entry.getKey() + "[\r\n");
            sb.append(value);
            sb.append("]");
        }
        return sb.toString();
    }

    @Override // com.taobao.notify.client.NotifySubscriber
    public void setMessageTPCorePoolSize(int i) {
        if (i <= 0) {
            throw new NotifyClientIllegalArgumentException("无效的MessageTPCorePoolSize");
        }
        this.remotingService.setMessageTPCorePoolSize(i);
    }

    @Override // com.taobao.notify.client.NotifySubscriber
    public void setMessageTPMaximumPoolSize(int i) {
        if (i <= 0) {
            throw new NotifyClientIllegalArgumentException("无效的MessageTPMaximumPoolSize");
        }
        this.remotingService.setMessageTPMaximumPoolSize(i);
    }

    @Override // com.taobao.notify.client.NotifySubscriber
    public void setMessageTPKeepAliveTime(long j) {
        if (j <= 0) {
            throw new NotifyClientIllegalArgumentException("无效的MessageTPKeepAliveTime");
        }
        this.remotingService.setMessageTPKeepAliveTime(j);
    }

    @Override // com.taobao.notify.client.NotifyPublisher
    public void setCheckMessageTPCorePoolSize(int i) {
        if (i <= 0) {
            throw new NotifyClientIllegalArgumentException("无效的CheckMessageTPCorePoolSize");
        }
        this.remotingService.setCheckMessageTPCorePoolSize(i);
    }

    @Override // com.taobao.notify.client.NotifyPublisher
    public void setCheckMessageTPMaximumPoolSize(int i) {
        if (i <= 0) {
            throw new NotifyClientIllegalArgumentException("无效的CheckMessageTPMaximumPoolSize");
        }
        this.remotingService.setCheckMessageTPMaximumPoolSize(i);
    }

    @Override // com.taobao.notify.client.NotifyPublisher
    public void setCheckMessageTPKeepAliveTime(long j) {
        if (j <= 0) {
            throw new NotifyClientIllegalArgumentException("无效的CheckMessageTPKeepAliveTime");
        }
        this.remotingService.setCheckMessageTPKeepAliveTime(j);
    }

    @Override // com.taobao.notify.client.NotifySubscriber
    public int getMessageTPCorePoolSize() {
        return this.remotingService.getMessageTPCorePoolSize();
    }

    @Override // com.taobao.notify.client.NotifySubscriber
    public synchronized int getMessageTPMaximumPoolSize() {
        return this.remotingService.getMessageTPMaximumPoolSize();
    }

    @Override // com.taobao.notify.client.NotifySubscriber
    public long getMessageTPKeepAliveTime() {
        return this.remotingService.getMessageTPKeepAliveTime();
    }

    @Override // com.taobao.notify.client.NotifyPublisher
    public int getCheckMessageTPCorePoolSize() {
        return this.remotingService.getCheckMessageTPCorePoolSize();
    }

    @Override // com.taobao.notify.client.NotifyPublisher
    public int getCheckMessageTPMaximumPoolSize() {
        return this.remotingService.getCheckMessageTPMaximumPoolSize();
    }

    @Override // com.taobao.notify.client.NotifyPublisher
    public long getCheckMessageTPKeepAliveTime() {
        return this.remotingService.getCheckMessageTPKeepAliveTime();
    }

    @Override // com.taobao.notify.client.NotifyPublisher
    public void setMessageCompressSize(String str, int i) {
        if (i <= 0) {
            throw new NotifyClientIllegalArgumentException("无效的MessageCompressSize");
        }
        NotifyGroup group = this.notifyGroupManager.getGroup(str);
        if (null == group) {
            throw new NotifyClientIllegalArgumentException("无效的GroupID");
        }
        group.getMessageProperties().setCompressSize(i);
    }

    @Override // com.taobao.notify.client.NotifyPublisher
    public void setMaxStringMessageSize(String str, int i) {
        if (i <= 0) {
            throw new NotifyClientIllegalArgumentException("无效的MaxStringMessageSize");
        }
        NotifyGroup group = this.notifyGroupManager.getGroup(str);
        if (null == group) {
            throw new NotifyClientIllegalArgumentException("无效的GroupID");
        }
        group.getMessageProperties().setMaxStringMessageSize(i);
    }

    @Override // com.taobao.notify.client.NotifyPublisher
    public int getMessageCompressSize(String str) {
        NotifyGroup group = this.notifyGroupManager.getGroup(str);
        if (null == group) {
            throw new NotifyClientIllegalArgumentException("无效的GroupID");
        }
        return group.getMessageProperties().getCompressSize();
    }

    @Override // com.taobao.notify.client.NotifyPublisher
    public int getMaxStringMessageSize(String str) {
        NotifyGroup group = this.notifyGroupManager.getGroup(str);
        if (null == group) {
            throw new NotifyClientIllegalArgumentException("无效的GroupID");
        }
        return group.getMessageProperties().getMaxStringMessageSize();
    }

    @Override // com.taobao.notify.client.NotifySubscriber
    public void closeSubscription(Binding binding) {
        this.remotingService.closeSubscription(binding);
    }

    @Override // com.taobao.notify.client.NotifySubscriber
    public void openSubscription(Binding binding) {
        this.remotingService.openSubscription(binding);
    }

    @Override // com.taobao.notify.client.NotifyClient
    public void setReliableAsyncSendConfig(NotifyClientConfig notifyClientConfig) {
        this.reliableAsynSendManager.setNotifyClientConfig(notifyClientConfig);
    }

    @Override // com.taobao.notify.client.NotifyClientStub
    public int getConnectionCount() {
        return this.remotingService.getConnectionCount();
    }

    @Override // com.taobao.notify.client.NotifyClientStub
    public void setConnectionCount(int i) {
        if (i <= 0) {
            throw new NotifyClientIllegalArgumentException("无效的ConnectionCount");
        }
        this.remotingService.setConnectionCount(i);
    }

    @Override // com.taobao.notify.client.NotifyClientStub
    public void showStatus() {
        this.remotingService.showStatus();
    }

    @Override // com.taobao.notify.client.NotifyClientStub
    public Set<String> getGroupIds() {
        return this.notifyGroupManager.getGroupIds();
    }

    @Override // com.taobao.notify.client.NotifyClient
    public synchronized void setCheckMessageTPCorePoolSize(String str, int i) {
        if (i <= 0) {
            throw new IllegalArgumentException("corePoolSize必须大于0");
        }
        ThreadPoolExecutor checkMessageWorkTPCreateNessary = getCheckMessageWorkTPCreateNessary(str);
        if (checkMessageWorkTPCreateNessary != null) {
            checkMessageWorkTPCreateNessary.setCorePoolSize(i);
        }
    }

    private ThreadPoolExecutor getCheckMessageWorkTPCreateNessary(String str) {
        ThreadPoolExecutor checkMessageWorkTP = getCheckMessageWorkTP(str);
        NotifyGroup group = this.notifyGroupManager.getGroup(str);
        if (checkMessageWorkTP == null) {
            checkMessageWorkTP = new ManagedThreadPoolExecutor(this.notifyClientConfig.getCheckMessageTPConfig().getCorePoolSize(), this.notifyClientConfig.getCheckMessageTPConfig().getMaxPoolSize(), this.notifyClientConfig.getCheckMessageTPConfig().getKeepAliveTime(), this.notifyClientConfig.getCheckMessageTPConfig().getMaxQueueSize(), str + "-checkMsgWorkTP-" + hashCode(), new ThreadPoolExecutor.AbortPolicy());
            group.setCheckMessageWorkTP(checkMessageWorkTP);
        }
        return checkMessageWorkTP;
    }

    @Override // com.taobao.notify.client.NotifyClient
    public synchronized void setCheckMessageTPKeepAliveTime(String str, long j) {
        if (j <= 0) {
            throw new IllegalArgumentException("keepAliveTime必须大于0");
        }
        ThreadPoolExecutor checkMessageWorkTPCreateNessary = getCheckMessageWorkTPCreateNessary(str);
        if (checkMessageWorkTPCreateNessary != null) {
            checkMessageWorkTPCreateNessary.setKeepAliveTime(j, TimeUnit.MILLISECONDS);
        }
    }

    @Override // com.taobao.notify.client.NotifyClient
    public synchronized void setCheckMessageTPMaximumPoolSize(String str, int i) {
        if (i <= 0) {
            throw new IllegalArgumentException("maximumPoolSize必须大于0");
        }
        ThreadPoolExecutor checkMessageWorkTPCreateNessary = getCheckMessageWorkTPCreateNessary(str);
        if (checkMessageWorkTPCreateNessary != null) {
            checkMessageWorkTPCreateNessary.setMaximumPoolSize(i);
        }
    }

    @Override // com.taobao.notify.client.NotifyClient
    public synchronized void setMessageTPCorePoolSize(String str, int i) {
        if (i <= 0) {
            throw new IllegalArgumentException("corePoolSize必须大于0");
        }
        ThreadPoolExecutor deliverMessageWorkTPCreateNessary = getDeliverMessageWorkTPCreateNessary(str);
        if (deliverMessageWorkTPCreateNessary != null) {
            deliverMessageWorkTPCreateNessary.setCorePoolSize(i);
        }
    }

    private ThreadPoolExecutor getDeliverMessageWorkTPCreateNessary(String str) {
        ThreadPoolExecutor deliverMessageWorkTP = getDeliverMessageWorkTP(str);
        NotifyGroup group = this.notifyGroupManager.getGroup(str);
        if (deliverMessageWorkTP == null) {
            deliverMessageWorkTP = new ManagedThreadPoolExecutor(this.notifyClientConfig.getMessageTPConfig().getCorePoolSize(), this.notifyClientConfig.getMessageTPConfig().getMaxPoolSize(), this.notifyClientConfig.getMessageTPConfig().getKeepAliveTime(), this.notifyClientConfig.getMessageTPConfig().getMaxQueueSize(), str + "-msgWorkTP-" + hashCode(), new ThreadPoolExecutor.AbortPolicy());
            group.setDeliverMessageWorkTP(deliverMessageWorkTP);
        }
        return deliverMessageWorkTP;
    }

    @Override // com.taobao.notify.client.NotifyClient
    public synchronized void setMessageTPKeepAliveTime(String str, long j) {
        if (j <= 0) {
            throw new IllegalArgumentException("keepAliveTime必须大于0");
        }
        ThreadPoolExecutor deliverMessageWorkTPCreateNessary = getDeliverMessageWorkTPCreateNessary(str);
        if (deliverMessageWorkTPCreateNessary != null) {
            deliverMessageWorkTPCreateNessary.setKeepAliveTime(j, TimeUnit.MILLISECONDS);
        }
    }

    @Override // com.taobao.notify.client.NotifyClient
    public synchronized void setMessageTPMaximumPoolSize(String str, int i) {
        if (i <= 0) {
            throw new IllegalArgumentException("maximumPoolSize必须大于0");
        }
        ThreadPoolExecutor deliverMessageWorkTPCreateNessary = getDeliverMessageWorkTPCreateNessary(str);
        if (deliverMessageWorkTPCreateNessary != null) {
            deliverMessageWorkTPCreateNessary.setMaximumPoolSize(i);
        }
    }

    @Override // com.taobao.notify.client.NotifyClient
    public synchronized int getCheckMessageTPCorePoolSize(String str) {
        ThreadPoolExecutor checkMessageWorkTP = getCheckMessageWorkTP(str);
        if (checkMessageWorkTP != null) {
            return checkMessageWorkTP.getCorePoolSize();
        }
        return -1;
    }

    @Override // com.taobao.notify.client.NotifyClient
    public synchronized long getCheckMessageTPKeepAliveTime(String str) {
        ThreadPoolExecutor checkMessageWorkTP = getCheckMessageWorkTP(str);
        if (checkMessageWorkTP != null) {
            return checkMessageWorkTP.getKeepAliveTime(TimeUnit.MILLISECONDS);
        }
        return -1L;
    }

    @Override // com.taobao.notify.client.NotifyClient
    public synchronized int getCheckMessageTPMaximumPoolSize(String str) {
        ThreadPoolExecutor checkMessageWorkTP = getCheckMessageWorkTP(str);
        if (checkMessageWorkTP != null) {
            return checkMessageWorkTP.getMaximumPoolSize();
        }
        return -1;
    }

    private ThreadPoolExecutor getCheckMessageWorkTP(String str) {
        NotifyGroup group = this.notifyGroupManager.getGroup(str);
        if (group == null) {
            throw new IllegalArgumentException("找不到分组" + str);
        }
        return group.getCheckMessageWorkTP();
    }

    @Override // com.taobao.notify.client.NotifyClient
    public synchronized int getMessageTPCorePoolSize(String str) {
        ThreadPoolExecutor deliverMessageWorkTP = getDeliverMessageWorkTP(str);
        if (deliverMessageWorkTP != null) {
            return deliverMessageWorkTP.getCorePoolSize();
        }
        return -1;
    }

    @Override // com.taobao.notify.client.NotifyClient
    public synchronized long getMessageTPKeepAliveTime(String str) {
        ThreadPoolExecutor deliverMessageWorkTP = getDeliverMessageWorkTP(str);
        if (deliverMessageWorkTP != null) {
            return deliverMessageWorkTP.getKeepAliveTime(TimeUnit.MILLISECONDS);
        }
        return -1L;
    }

    @Override // com.taobao.notify.client.NotifyClient
    public synchronized int getMessageTPMaximumPoolSize(String str) {
        ThreadPoolExecutor deliverMessageWorkTP = getDeliverMessageWorkTP(str);
        if (deliverMessageWorkTP != null) {
            return deliverMessageWorkTP.getMaximumPoolSize();
        }
        return -1;
    }

    private ThreadPoolExecutor getDeliverMessageWorkTP(String str) {
        NotifyGroup group = this.notifyGroupManager.getGroup(str);
        if (group == null) {
            throw new IllegalArgumentException("找不到分组" + str);
        }
        return group.getDeliverMessageWorkTP();
    }

    @Override // com.taobao.notify.client.NotifyClientStub
    public void awaitReadyInterruptibly() throws InterruptedException {
        this.remotingService.awaitReadyInterruptibly();
    }

    @Override // com.taobao.notify.client.NotifyClientStub
    public void awaitReadyInterruptibly(long j, TimeUnit timeUnit) throws InterruptedException {
        this.remotingService.awaitReadyInterruptibly(j, timeUnit);
    }

    @Override // com.taobao.notify.client.NotifyPublisher
    public boolean initReliableAsynSendMessageModule() {
        return this.reliableAsynSendManager.init();
    }

    @Override // com.taobao.notify.client.NotifyClientStub
    public IOClientSelector getIoClientSelector() {
        return this.remotingService.getIoClientSelector();
    }

    @Override // com.taobao.notify.client.NotifyClientStub
    public void setIoClientSelector(IOClientSelector iOClientSelector) {
        this.remotingService.setIoClientSelector(iOClientSelector);
        this.ioClientSelector = iOClientSelector;
    }

    @Override // com.taobao.notify.client.NotifyPublisher
    public int getMessageTotalCount() {
        return this.reliableAsynSendManager.getMessageTotalCount();
    }

    @Override // com.taobao.notify.client.NotifyPublisher
    public int getRemainMessageCount() {
        return this.reliableAsynSendManager.getRemainCommitMessageCount();
    }

    @Override // com.taobao.notify.client.NotifyPublisher
    public boolean isSuspendRaliableAsynTask() {
        return this.reliableAsynSendManager.isSuspendRaliableAsynTask();
    }

    @Override // com.taobao.notify.client.NotifyPublisher
    public void resumeReliableAsynTask() {
        this.reliableAsynSendManager.resumeReliableAsynTask();
    }

    @Override // com.taobao.notify.client.NotifyPublisher
    public void suspendRaliableAsynTask() {
        this.reliableAsynSendManager.suspendRaliableAsynTask();
    }

    @Override // com.taobao.notify.client.NotifyClient
    public NSAddressLoadMode getNSAddressLoadMode() {
        return this.notifyClientConfig.getNSAddressLoadMode();
    }

    @Override // com.taobao.notify.client.NotifyClient
    public void setNSAddressLoadMode(NSAddressLoadMode nSAddressLoadMode) {
        this.notifyClientConfig.setNSAddressLoadMode(nSAddressLoadMode);
        this.remotingService.setNSAddressLoadMode(nSAddressLoadMode);
    }

    @Override // com.taobao.notify.client.NotifyClient
    public void setServiceHostsPath(String str) {
        this.addrRegCenter.setServiceHostsPath(str);
    }

    @Override // com.taobao.notify.client.INotifyClientEx
    public Set<Binding> getClosedSubscriptions() {
        return this.remotingService.getClosedSubscriptions();
    }

    @Override // com.taobao.notify.client.INotifyClientEx
    public Map<String, List<String>> getTopics2ServerURLByGroupId(String str) {
        Map<String, Set<String>> snapshotOfTopics2Group = this.remotingService.getSnapshotOfTopics2Group();
        Map<String, List<String>> snapshotOfTopics2ServerUrl = this.remotingService.getSnapshotOfTopics2ServerUrl();
        HashMap hashMap = new HashMap(8);
        for (Map.Entry<String, Set<String>> entry : snapshotOfTopics2Group.entrySet()) {
            if (entry.getValue().contains(str)) {
                hashMap.put(entry.getKey(), snapshotOfTopics2ServerUrl.get(entry.getKey()));
            }
        }
        return hashMap;
    }

    @Override // com.taobao.notify.client.INotifyClientEx
    public boolean isCheckMessageListenerExists(String str) {
        return this.notifyGroupManager.getGroup(str).getCheckMsgListener() != null;
    }

    @Override // com.taobao.notify.client.INotifyClientEx
    public boolean isMessageListenerExists(String str) {
        return this.notifyGroupManager.getGroup(str).getMsgListener() != null;
    }

    @Override // com.taobao.notify.client.NotifySubscriber
    public void subscribeMessage(String str, String str2, String str3, boolean z, int i, int i2) {
        subscribe(Binding.direct(str2, str3, str, i, z, i2));
    }
}
