/*
 * Decompiled with CFR 0.152.
 */
package com.aliyun.openservices.shade.com.alibaba.rocketmq.client.impl.factory;

import com.aliyun.openservices.shade.com.alibaba.rocketmq.client.ClientConfig;
import com.aliyun.openservices.shade.com.alibaba.rocketmq.client.RemoteClientConfig;
import com.aliyun.openservices.shade.com.alibaba.rocketmq.client.admin.MQAdminExtInner;
import com.aliyun.openservices.shade.com.alibaba.rocketmq.client.exception.MQBrokerException;
import com.aliyun.openservices.shade.com.alibaba.rocketmq.client.exception.MQClientException;
import com.aliyun.openservices.shade.com.alibaba.rocketmq.client.impl.ClientRemotingProcessor;
import com.aliyun.openservices.shade.com.alibaba.rocketmq.client.impl.FindBrokerResult;
import com.aliyun.openservices.shade.com.alibaba.rocketmq.client.impl.MQAdminImpl;
import com.aliyun.openservices.shade.com.alibaba.rocketmq.client.impl.MQClientAPIImpl;
import com.aliyun.openservices.shade.com.alibaba.rocketmq.client.impl.MQClientManager;
import com.aliyun.openservices.shade.com.alibaba.rocketmq.client.impl.consumer.DefaultMQPullConsumerImpl;
import com.aliyun.openservices.shade.com.alibaba.rocketmq.client.impl.consumer.DefaultMQPushConsumerImpl;
import com.aliyun.openservices.shade.com.alibaba.rocketmq.client.impl.consumer.MQConsumerInner;
import com.aliyun.openservices.shade.com.alibaba.rocketmq.client.impl.consumer.ProcessQueue;
import com.aliyun.openservices.shade.com.alibaba.rocketmq.client.impl.consumer.PullMessageService;
import com.aliyun.openservices.shade.com.alibaba.rocketmq.client.impl.consumer.RebalanceService;
import com.aliyun.openservices.shade.com.alibaba.rocketmq.client.impl.producer.MQProducerInner;
import com.aliyun.openservices.shade.com.alibaba.rocketmq.client.impl.producer.TopicPublishInfo;
import com.aliyun.openservices.shade.com.alibaba.rocketmq.client.log.ClientLogger;
import com.aliyun.openservices.shade.com.alibaba.rocketmq.client.ordermessage.OrderMessageHandler;
import com.aliyun.openservices.shade.com.alibaba.rocketmq.client.producer.DefaultMQProducer;
import com.aliyun.openservices.shade.com.alibaba.rocketmq.client.stat.ConsumerStatsManager;
import com.aliyun.openservices.shade.com.alibaba.rocketmq.common.MQVersion;
import com.aliyun.openservices.shade.com.alibaba.rocketmq.common.MixAll;
import com.aliyun.openservices.shade.com.alibaba.rocketmq.common.ServiceState;
import com.aliyun.openservices.shade.com.alibaba.rocketmq.common.ThreadFactoryImpl;
import com.aliyun.openservices.shade.com.alibaba.rocketmq.common.UtilAll;
import com.aliyun.openservices.shade.com.alibaba.rocketmq.common.constant.PermName;
import com.aliyun.openservices.shade.com.alibaba.rocketmq.common.filter.ExpressionType;
import com.aliyun.openservices.shade.com.alibaba.rocketmq.common.message.MessageExt;
import com.aliyun.openservices.shade.com.alibaba.rocketmq.common.message.MessageQueue;
import com.aliyun.openservices.shade.com.alibaba.rocketmq.common.protocol.NamespaceUtil;
import com.aliyun.openservices.shade.com.alibaba.rocketmq.common.protocol.body.ConsumeMessageDirectlyResult;
import com.aliyun.openservices.shade.com.alibaba.rocketmq.common.protocol.body.ConsumerRunningInfo;
import com.aliyun.openservices.shade.com.alibaba.rocketmq.common.protocol.heartbeat.ConsumeType;
import com.aliyun.openservices.shade.com.alibaba.rocketmq.common.protocol.heartbeat.ConsumerData;
import com.aliyun.openservices.shade.com.alibaba.rocketmq.common.protocol.heartbeat.HeartbeatData;
import com.aliyun.openservices.shade.com.alibaba.rocketmq.common.protocol.heartbeat.ProducerData;
import com.aliyun.openservices.shade.com.alibaba.rocketmq.common.protocol.heartbeat.SubscriptionData;
import com.aliyun.openservices.shade.com.alibaba.rocketmq.common.protocol.route.BrokerData;
import com.aliyun.openservices.shade.com.alibaba.rocketmq.common.protocol.route.QueueData;
import com.aliyun.openservices.shade.com.alibaba.rocketmq.common.protocol.route.QueueDesc;
import com.aliyun.openservices.shade.com.alibaba.rocketmq.common.protocol.route.TopicRouteData;
import com.aliyun.openservices.shade.com.alibaba.rocketmq.logging.InternalLogger;
import com.aliyun.openservices.shade.com.alibaba.rocketmq.remoting.RPCHook;
import com.aliyun.openservices.shade.com.alibaba.rocketmq.remoting.common.RemotingHelper;
import com.aliyun.openservices.shade.com.alibaba.rocketmq.remoting.exception.RemotingException;
import com.aliyun.openservices.shade.com.alibaba.rocketmq.remoting.netty.NettyClientConfig;
import com.aliyun.openservices.shade.com.alibaba.rocketmq.remoting.protocol.RemotingCommand;
import com.aliyun.openservices.shade.io.netty.channel.EventLoopGroup;
import com.aliyun.openservices.shade.io.netty.util.concurrent.EventExecutorGroup;
import com.aliyun.openservices.shade.org.apache.commons.lang3.StringUtils;
import java.io.UnsupportedEncodingException;
import java.net.DatagramSocket;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.Random;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;

public class MQClientInstance {
    private static final long LOCK_TIMEOUT_MILLIS = 3000L;
    private static final InternalLogger LOG = ClientLogger.getLog();
    private final ClientConfig clientConfig;
    private final RemoteClientConfig remoteClientConfig = new RemoteClientConfig();
    private final int instanceIndex;
    private final String clientId;
    private final long bootTimestamp = System.currentTimeMillis();
    private final ConcurrentMap<String, MQProducerInner> producerTable = new ConcurrentHashMap<String, MQProducerInner>();
    private final ConcurrentMap<String, MQConsumerInner> consumerTable = new ConcurrentHashMap<String, MQConsumerInner>();
    private final ConcurrentMap<String, MQAdminExtInner> adminExtTable = new ConcurrentHashMap<String, MQAdminExtInner>();
    private final NettyClientConfig nettyClientConfig;
    private final MQClientAPIImpl mQClientAPIImpl;
    private final MQAdminImpl mQAdminImpl;
    private final ConcurrentMap<String, TopicRouteData> topicRouteTable = new ConcurrentHashMap<String, TopicRouteData>();
    private final ConcurrentMap<String, ConcurrentMap<String, Long>> topicQueueOffsetTable = new ConcurrentHashMap<String, ConcurrentMap<String, Long>>();
    private final Lock lockNamesrv = new ReentrantLock();
    private final Lock lockHeartbeat = new ReentrantLock();
    private final ConcurrentMap<String, HashMap<Long, String>> brokerAddrTable = new ConcurrentHashMap<String, HashMap<Long, String>>();
    private final ConcurrentMap<String, HashMap<String, Integer>> brokerVersionTable = new ConcurrentHashMap<String, HashMap<String, Integer>>();
    private final ScheduledExecutorService scheduledExecutorService = new ScheduledThreadPoolExecutor(1, new ThreadFactoryImpl("MQClientFactoryScheduledThread", false));
    private final ScheduledExecutorService fetchRemoteConfigExecutorService = new ScheduledThreadPoolExecutor(1, new ThreadFactoryImpl("MQClientFactoryFetchRemoteConfigScheduledThread", false));
    private final ClientRemotingProcessor clientRemotingProcessor;
    private final PullMessageService pullMessageService;
    private final RebalanceService rebalanceService;
    private final DefaultMQProducer defaultMQProducer;
    private final ConsumerStatsManager consumerStatsManager;
    private final AtomicLong sendHeartbeatTimesTotal = new AtomicLong(0L);
    private ServiceState serviceState = ServiceState.CREATE_JUST;
    private DatagramSocket datagramSocket;
    private Random random = new Random();

    public MQClientInstance(ClientConfig clientConfig, int instanceIndex, String clientId) {
        this(clientConfig, instanceIndex, clientId, null);
    }

    public MQClientInstance(ClientConfig clientConfig, int instanceIndex, String clientId, RPCHook rpcHook) {
        this(clientConfig, instanceIndex, clientId, rpcHook, null, null);
    }

    public MQClientInstance(ClientConfig clientConfig, int instanceIndex, String clientId, RPCHook rpcHook, EventLoopGroup eventLoopGroup, EventExecutorGroup eventExecutorGroup) {
        this.clientConfig = clientConfig;
        this.instanceIndex = instanceIndex;
        this.nettyClientConfig = new NettyClientConfig();
        this.nettyClientConfig.setClientCallbackExecutorThreads(clientConfig.getClientCallbackExecutorThreads());
        this.nettyClientConfig.setUseTLS(clientConfig.isUseTLS());
        this.nettyClientConfig.setSockProxyJson(clientConfig.getSockProxyJson());
        this.nettyClientConfig.setDisableCallbackExecutor(clientConfig.isDisableCallbackExecutor());
        this.nettyClientConfig.setDisableNettyWorkerGroup(clientConfig.isDisableNettyWorkerGroup());
        this.clientRemotingProcessor = new ClientRemotingProcessor(this);
        this.mQClientAPIImpl = new MQClientAPIImpl(this.nettyClientConfig, this.clientRemotingProcessor, rpcHook, clientConfig, eventLoopGroup, eventExecutorGroup);
        if (this.clientConfig.getNamesrvAddr() != null) {
            this.mQClientAPIImpl.updateNameServerAddressList(this.clientConfig.getNamesrvAddr());
            LOG.info("user specified name server address: {}", (Object)this.clientConfig.getNamesrvAddr());
        }
        this.clientId = clientId;
        this.mQAdminImpl = new MQAdminImpl(this);
        this.pullMessageService = new PullMessageService(this);
        this.rebalanceService = new RebalanceService(this);
        this.defaultMQProducer = new DefaultMQProducer("CLIENT_INNER_PRODUCER");
        this.defaultMQProducer.resetClientConfig(clientConfig);
        this.consumerStatsManager = new ConsumerStatsManager(this.scheduledExecutorService);
        this.mQClientAPIImpl.setConsumerStatsManager(this.consumerStatsManager);
        LOG.info("created a new client Instance, FactoryIndex: {} ClinetID: {} {} {}, serializeType={}", new Object[]{this.instanceIndex, this.clientId, this.clientConfig, MQVersion.getVersionDesc(MQVersion.CURRENT_VERSION), RemotingCommand.getSerializeTypeConfigInThisServer()});
    }

    public static TopicPublishInfo topicRouteData2TopicPublishInfo(String topic, TopicRouteData route) {
        TopicPublishInfo info = new TopicPublishInfo();
        info.setTopicRouteData(route);
        String orderConf = route.getOrderTopicConf();
        if (!route.isHAOrderTopic() && orderConf != null && orderConf.length() > 0) {
            String[] brokers;
            for (String broker : brokers = orderConf.split(";")) {
                String[] item = broker.split(":");
                int nums = Integer.parseInt(item[1]);
                for (int i = 0; i < nums; ++i) {
                    MessageQueue mq = new MessageQueue(topic, item[0], i);
                    info.getMessageQueueList().add(mq);
                }
            }
            info.setOrderTopic(true);
        } else {
            List<QueueData> qds = route.getQueueDatas();
            Collections.sort(qds);
            for (QueueData qd : qds) {
                if (!PermName.isWriteable(qd.getPerm())) continue;
                BrokerData brokerData = null;
                for (BrokerData bd : route.getBrokerDatas()) {
                    if (!bd.getBrokerName().equals(qd.getBrokerName())) continue;
                    brokerData = bd;
                    break;
                }
                if (null == brokerData || !brokerData.getBrokerAddrs().containsKey(0L)) continue;
                if (route.isHAOrderTopic()) {
                    if (qd.getQueueDescList() == null) continue;
                    List<MessageQueue> messageQueueList = info.getMessageQueueList();
                    for (QueueDesc queueDesc : qd.getQueueDescList()) {
                        if (queueDesc.getQueueGroupId() >= route.getWriteQueueGroupNums()) continue;
                        messageQueueList.add(new MessageQueue(topic, qd.getBrokerName(), queueDesc.getMessageQueueId(), queueDesc.getQueueGroupId(), queueDesc.isMainQueue()));
                        if (!queueDesc.isMainQueue()) continue;
                        info.setMainQueuePreferred(true);
                    }
                    continue;
                }
                for (int i = 0; i < qd.getWriteQueueNums(); ++i) {
                    MessageQueue mq = new MessageQueue(topic, qd.getBrokerName(), i);
                    info.getMessageQueueList().add(mq);
                }
            }
            if (route.isHAOrderTopic()) {
                info.setOrderTopic(true);
            } else {
                info.setOrderTopic(false);
            }
        }
        return info;
    }

    public static Set<MessageQueue> topicRouteData2TopicSubscribeInfo(String topic, TopicRouteData route) {
        HashSet<MessageQueue> mqList = new HashSet<MessageQueue>();
        List<QueueData> qds = route.getQueueDatas();
        List<QueueDesc> qs = null;
        HashMap mqMap = new HashMap();
        for (QueueData qd : qds) {
            if (!PermName.isReadable(qd.getPerm())) continue;
            if (route.isHAOrderTopic()) {
                qs = qd.getQueueDescList();
                if (qs == null) continue;
                for (QueueDesc queueDesc : qs) {
                    if (queueDesc.getQueueGroupId() >= route.getReadQueueGroupNums()) continue;
                    MessageQueue mq = new MessageQueue(topic, qd.getBrokerName(), queueDesc.getMessageQueueId(), queueDesc.getQueueGroupId(), queueDesc.isMainQueue());
                    if (!mqMap.containsKey(mq.getQueueGroupId())) {
                        mqMap.put(mq.getQueueGroupId(), new ArrayList());
                    }
                    ((List)mqMap.get(mq.getQueueGroupId())).add(mq);
                }
                continue;
            }
            for (int i = 0; i < qd.getReadQueueNums(); ++i) {
                MessageQueue mq = new MessageQueue(topic, qd.getBrokerName(), i);
                mqList.add(mq);
            }
        }
        Iterator it = mqMap.entrySet().iterator();
        while (it.hasNext()) {
            Map.Entry entry = it.next();
            if (((List)entry.getValue()).size() != route.getOrderTopicQueueGroupSize()) {
                LOG.warn("Topic: {}, group id: {}. QueueGroup incomplete, expect size: {}, actual size: {}", topic, entry.getKey(), route.getOrderTopicQueueGroupSize(), ((List)entry.getValue()).size());
                it.remove();
                continue;
            }
            mqList.addAll((Collection)entry.getValue());
        }
        return mqList;
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    public void start() throws MQClientException {
        MQClientInstance mQClientInstance = this;
        synchronized (mQClientInstance) {
            switch (this.serviceState) {
                case CREATE_JUST: {
                    this.serviceState = ServiceState.START_FAILED;
                    if (null == this.clientConfig.getNamesrvAddr()) {
                        this.mQClientAPIImpl.fetchNameServerAddr();
                    }
                    this.mQClientAPIImpl.start();
                    this.startScheduledTask();
                    this.pullMessageService.start();
                    this.rebalanceService.start();
                    this.defaultMQProducer.getDefaultMQProducerImpl().start(false);
                    LOG.info("the client factory [{}] start OK", (Object)this.clientId);
                    this.serviceState = ServiceState.RUNNING;
                    break;
                }
                case RUNNING: {
                    break;
                }
                case SHUTDOWN_ALREADY: {
                    break;
                }
                case START_FAILED: {
                    throw new MQClientException("The Factory object[" + this.getClientId() + "] has been created before, and failed.", null);
                }
            }
        }
    }

    private void startScheduledTask() {
        if (null == this.clientConfig.getNamesrvAddr()) {
            this.scheduledExecutorService.scheduleAtFixedRate(new Runnable(){

                @Override
                public void run() {
                    try {
                        MQClientInstance.this.mQClientAPIImpl.fetchNameServerAddr();
                    }
                    catch (Exception e) {
                        LOG.error("ScheduledTask fetchNameServerAddr exception", e);
                    }
                }
            }, 10000L, 120000L, TimeUnit.MILLISECONDS);
        }
        this.scheduledExecutorService.scheduleAtFixedRate(new Runnable(){

            @Override
            public void run() {
                try {
                    MQClientInstance.this.updateTopicRouteInfoFromNameServer();
                }
                catch (Exception e) {
                    LOG.error("ScheduledTask updateTopicRouteInfoFromNameServer exception", e);
                }
            }
        }, 10L, this.clientConfig.getPollNameServerInterval(), TimeUnit.MILLISECONDS);
        if (this.clientConfig.isFetchRemoteClientConfigEnable()) {
            this.fetchRemoteConfigExecutorService.scheduleWithFixedDelay(new Runnable(){

                @Override
                public void run() {
                    try {
                        boolean lastOffline = MQClientInstance.this.remoteClientConfig.isOffline();
                        Properties property = MQClientInstance.this.mQClientAPIImpl.getRemoteClientConfig(3000L);
                        MixAll.properties2Object(property, MQClientInstance.this.remoteClientConfig);
                        LOG.warn("update offline config, {}, {} -> {}", MQClientInstance.this.clientId, lastOffline, MQClientInstance.this.remoteClientConfig.isOffline());
                        if (MQClientInstance.this.remoteClientConfig.isOffline()) {
                            MQClientInstance.this.allClientsOffline();
                        } else {
                            MQClientInstance.this.allClientsOnline();
                        }
                    }
                    catch (Exception e) {
                        LOG.warn("ScheduledTask getRemoteClientConfig failed, " + e.getMessage());
                    }
                }
            }, 10L, this.clientConfig.getClientConfigInterval(), TimeUnit.MILLISECONDS);
        }
        this.scheduledExecutorService.scheduleAtFixedRate(new Runnable(){

            @Override
            public void run() {
                if (MQClientInstance.this.getDefaultMQProducer().getDefaultMQProducerImpl().getServiceState() == ServiceState.RUNNING) {
                    try {
                        MQClientInstance.this.fetchConsumeQueueOffsetFromBroker();
                    }
                    catch (Exception e) {
                        LOG.warn("ScheduledTask updateConsumeQueueOffsetFromBroker failed, " + e.getMessage());
                    }
                }
            }
        }, 10L, this.clientConfig.getUpdateConsumeQueueOffsetInterval(), TimeUnit.MILLISECONDS);
        this.scheduledExecutorService.scheduleAtFixedRate(new Runnable(){

            @Override
            public void run() {
                try {
                    MQClientInstance.this.cleanOfflineBroker();
                    MQClientInstance.this.sendHeartbeatToAllBrokerWithLock();
                }
                catch (Exception e) {
                    LOG.error("ScheduledTask sendHeartbeatToAllBroker exception", e);
                }
            }
        }, 1000L, this.clientConfig.getHeartbeatBrokerInterval(), TimeUnit.MILLISECONDS);
        this.scheduledExecutorService.scheduleAtFixedRate(new Runnable(){

            @Override
            public void run() {
                try {
                    MQClientInstance.this.persistAllConsumerOffset();
                }
                catch (Exception e) {
                    LOG.error("ScheduledTask persistAllConsumerOffset exception", e);
                }
            }
        }, 10000L, this.clientConfig.getPersistConsumerOffsetInterval(), TimeUnit.MILLISECONDS);
        this.scheduledExecutorService.scheduleAtFixedRate(new Runnable(){

            @Override
            public void run() {
                try {
                    MQClientInstance.this.adjustThreadPool();
                }
                catch (Exception e) {
                    LOG.error("ScheduledTask adjustThreadPool exception", e);
                }
            }
        }, 1L, 1L, TimeUnit.MINUTES);
    }

    public String getClientId() {
        return this.clientId;
    }

    public RemoteClientConfig getRemoteClientConfig() {
        return this.remoteClientConfig;
    }

    public void updateTopicRouteInfoFromNameServer() {
        HashSet<String> topicList = new HashSet<String>();
        for (Map.Entry entry : this.consumerTable.entrySet()) {
            DefaultMQPullConsumerImpl consumer;
            String consumerGroup = (String)entry.getKey();
            MQConsumerInner impl = (MQConsumerInner)entry.getValue();
            if (impl == null) continue;
            Set<SubscriptionData> subList = impl.subscriptions();
            if (subList != null) {
                for (SubscriptionData subData : subList) {
                    topicList.add(subData.getTopic());
                }
            }
            if (impl instanceof DefaultMQPullConsumerImpl && (consumer = (DefaultMQPullConsumerImpl)impl).getDefaultMQPullConsumer().isAutoUpdateTopicRoute()) {
                topicList.addAll(consumer.getRebalanceImpl().getSubscriptionInner().keySet());
            }
            if (!(impl instanceof DefaultMQPushConsumerImpl)) continue;
            String retryTopic = "%RETRY%" + consumerGroup;
            topicList.add(retryTopic);
        }
        for (Map.Entry entry : this.producerTable.entrySet()) {
            MQProducerInner impl = (MQProducerInner)entry.getValue();
            if (impl == null) continue;
            Set<String> lst = impl.getPublishTopicList();
            topicList.addAll(lst);
        }
        for (String topic : topicList) {
            this.updateTopicRouteInfoFromNameServer(topic);
        }
    }

    public void fetchConsumeQueueOffsetFromBroker() throws MQClientException {
        for (Map.Entry entry : this.producerTable.entrySet()) {
            MQProducerInner impl = (MQProducerInner)entry.getValue();
            if (impl == null) continue;
            Set<String> list = impl.getPublishTopicList();
            for (String topic : list) {
                TopicPublishInfo info = impl.getTopicPublishInfo(topic);
                if (!info.isHAOrderTopic()) continue;
                this.fetchConsumeQueueOffsetFromBroker(info);
            }
        }
    }

    public void fetchConsumeQueueOffsetFromBroker(TopicPublishInfo info) throws MQClientException {
        if (info == null) {
            return;
        }
        for (MessageQueue mq : info.getMessageQueueList()) {
            String topic = mq.getTopic();
            long newOffset = this.getDefaultMQProducer().getDefaultMQProducerImpl().maxOffset(mq, false);
            this.putQueueOffset(topic, mq.generateKey(), newOffset);
        }
    }

    public Map<MessageQueue, Long> parseOffsetTableFromBroker(Map<MessageQueue, Long> offsetTable, String namespace) {
        HashMap<MessageQueue, Long> newOffsetTable = new HashMap<MessageQueue, Long>();
        if (StringUtils.isNotEmpty(namespace)) {
            for (Map.Entry<MessageQueue, Long> entry : offsetTable.entrySet()) {
                MessageQueue queue = entry.getKey();
                queue.setTopic(NamespaceUtil.withoutNamespace(queue.getTopic(), namespace));
                newOffsetTable.put(queue, entry.getValue());
            }
        } else {
            newOffsetTable.putAll(offsetTable);
        }
        return newOffsetTable;
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    private void cleanOfflineBroker() {
        block9: {
            try {
                if (!this.lockNamesrv.tryLock(3000L, TimeUnit.MILLISECONDS)) break block9;
                try {
                    ConcurrentHashMap updatedTable = new ConcurrentHashMap();
                    Iterator itBrokerTable = this.brokerAddrTable.entrySet().iterator();
                    while (itBrokerTable.hasNext()) {
                        Map.Entry entry = itBrokerTable.next();
                        String brokerName = (String)entry.getKey();
                        HashMap oneTable = (HashMap)entry.getValue();
                        HashMap cloneAddrTable = new HashMap();
                        cloneAddrTable.putAll(oneTable);
                        Iterator it = cloneAddrTable.entrySet().iterator();
                        while (it.hasNext()) {
                            Map.Entry ee = it.next();
                            String addr = (String)ee.getValue();
                            if (this.isBrokerAddrExistInTopicRouteTable(addr)) continue;
                            it.remove();
                            LOG.info("the broker addr[{} {}] is offline, remove it", (Object)brokerName, (Object)addr);
                        }
                        if (cloneAddrTable.isEmpty()) {
                            itBrokerTable.remove();
                            LOG.info("the broker[{}] name's host is offline, remove it", (Object)brokerName);
                            continue;
                        }
                        updatedTable.put(brokerName, cloneAddrTable);
                    }
                    if (!updatedTable.isEmpty()) {
                        this.brokerAddrTable.putAll(updatedTable);
                    }
                }
                finally {
                    this.lockNamesrv.unlock();
                }
            }
            catch (InterruptedException e) {
                LOG.warn("cleanOfflineBroker Exception", e);
            }
        }
    }

    public void checkClientInBroker() throws MQClientException {
        for (Map.Entry entry : this.consumerTable.entrySet()) {
            Set<SubscriptionData> subscriptionInner = ((MQConsumerInner)entry.getValue()).subscriptions();
            if (subscriptionInner == null || subscriptionInner.isEmpty()) {
                return;
            }
            for (SubscriptionData subscriptionData : subscriptionInner) {
                String addr;
                if (ExpressionType.isTagType(subscriptionData.getExpressionType()) || (addr = this.findBrokerAddrByTopic(subscriptionData.getTopic())) == null) continue;
                try {
                    this.getMQClientAPIImpl().checkClientInBroker(addr, (String)entry.getKey(), this.clientId, subscriptionData, 3000L);
                }
                catch (Exception e) {
                    if (e instanceof MQClientException) {
                        throw (MQClientException)e;
                    }
                    throw new MQClientException("Check client in broker error, maybe because you use " + subscriptionData.getExpressionType() + " to filter message, but server has not been upgraded to support!" + "This error would not affect the launch of consumer, but may has impact on message receiving if you " + "have use the new features which are not supported by server, please check the log!", e);
                }
            }
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    public void sendHeartbeatToAllBrokerWithTimedLock() {
        block8: {
            try {
                if (this.lockHeartbeat.tryLock(3000L, TimeUnit.MILLISECONDS)) {
                    try {
                        this.sendHeartbeatToAllBroker();
                        this.uploadFilterClassSource();
                        break block8;
                    }
                    catch (Exception e) {
                        LOG.error("sendHeartbeatToAllBroker exception", e);
                        break block8;
                    }
                    finally {
                        this.lockHeartbeat.unlock();
                    }
                }
                LOG.warn("lock heartBeat, but failed.");
            }
            catch (InterruptedException e) {
                LOG.warn("sendHeartbeatToAllBrokerWithTimedLock exception", e);
            }
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    public void sendHeartbeatToAllBrokerWithLock() {
        if (this.lockHeartbeat.tryLock()) {
            try {
                this.sendHeartbeatToAllBroker();
                this.uploadFilterClassSource();
            }
            catch (Exception e) {
                LOG.error("sendHeartbeatToAllBroker exception", e);
            }
            finally {
                this.lockHeartbeat.unlock();
            }
        } else {
            LOG.warn("lock heartBeat, but failed.");
        }
    }

    private void persistAllConsumerOffset() {
        for (Map.Entry entry : this.consumerTable.entrySet()) {
            MQConsumerInner impl = (MQConsumerInner)entry.getValue();
            if (impl.isOffline()) continue;
            impl.persistConsumerOffset();
        }
    }

    public void adjustThreadPool() {
        for (Map.Entry entry : this.consumerTable.entrySet()) {
            MQConsumerInner impl = (MQConsumerInner)entry.getValue();
            if (impl == null) continue;
            try {
                if (!(impl instanceof DefaultMQPushConsumerImpl)) continue;
                DefaultMQPushConsumerImpl dmq = (DefaultMQPushConsumerImpl)impl;
                dmq.adjustThreadPool();
            }
            catch (Exception exception) {}
        }
    }

    public boolean updateTopicRouteInfoFromNameServer(String topic) {
        return this.updateTopicRouteInfoFromNameServer(topic, false, null);
    }

    private boolean isBrokerAddrExistInTopicRouteTable(String addr) {
        for (Map.Entry entry : this.topicRouteTable.entrySet()) {
            TopicRouteData topicRouteData = (TopicRouteData)entry.getValue();
            List<BrokerData> bds = topicRouteData.getBrokerDatas();
            for (BrokerData bd : bds) {
                boolean exist;
                if (bd.getBrokerAddrs() == null || !(exist = bd.getBrokerAddrs().containsValue(addr))) continue;
                return true;
            }
        }
        return false;
    }

    private void sendHeartbeatToAllBroker() {
        HashSet<String> activeBrokerNameSet = new HashSet<String>();
        HashSet<String> activeTopicSet = new HashSet<String>();
        for (String topic : this.topicRouteTable.keySet()) {
            List<QueueData> queueDataList;
            TopicRouteData topicRouteData = (TopicRouteData)this.topicRouteTable.get(topic);
            if (null == topicRouteData || null == (queueDataList = topicRouteData.getQueueDatas())) continue;
            for (QueueData queueData : queueDataList) {
                String brokerName = queueData.getBrokerName();
                int readQueueNums = queueData.getReadQueueNums();
                int writeQueueNums = queueData.getWriteQueueNums();
                int perm = queueData.getPerm();
                if ((readQueueNums <= 0 || !PermName.isReadable(perm)) && (writeQueueNums <= 0 || !PermName.isWriteable(perm))) continue;
                activeBrokerNameSet.add(brokerName);
                activeTopicSet.add(topic);
            }
        }
        HeartbeatData heartbeatData = this.prepareHeartbeatData(activeTopicSet);
        boolean producerEmpty = heartbeatData.getProducerDataSet().isEmpty();
        boolean consumerEmpty = heartbeatData.getConsumerDataSet().isEmpty();
        if (producerEmpty && consumerEmpty) {
            LOG.warn("sending heartbeat, but no consumer and no producer");
            return;
        }
        if (this.brokerAddrTable.isEmpty()) {
            return;
        }
        long times = this.sendHeartbeatTimesTotal.getAndIncrement();
        for (String brokerName : activeBrokerNameSet) {
            HashMap brokerIdAddressTable = (HashMap)this.brokerAddrTable.get(brokerName);
            if (null == brokerIdAddressTable || brokerIdAddressTable.isEmpty()) continue;
            for (Map.Entry entry : brokerIdAddressTable.entrySet()) {
                Long id = (Long)entry.getKey();
                String addr = (String)entry.getValue();
                if (null == addr || consumerEmpty && id != 0L) continue;
                try {
                    int version = this.mQClientAPIImpl.sendHeartbeat(addr, heartbeatData, 3000L);
                    if (!this.brokerVersionTable.containsKey(brokerName)) {
                        this.brokerVersionTable.put(brokerName, new HashMap(4));
                    }
                    ((HashMap)this.brokerVersionTable.get(brokerName)).put(addr, version);
                    if (times % 20L != 0L) continue;
                    LOG.info("send heart beat to broker[{} {} {}] success", brokerName, id, addr);
                    LOG.info(heartbeatData.toString());
                }
                catch (Exception e) {
                    if (this.isBrokerInNameServer(addr)) {
                        LOG.info("send heart beat to broker[{} {} {}] failed", brokerName, id, addr);
                        continue;
                    }
                    LOG.info("send heart beat to broker[{} {} {}] exception, because the broker not up, forget it", brokerName, id, addr);
                }
            }
        }
    }

    private void uploadFilterClassSource() {
        for (Map.Entry next : this.consumerTable.entrySet()) {
            MQConsumerInner consumer = (MQConsumerInner)next.getValue();
            if (ConsumeType.CONSUME_PASSIVELY != consumer.consumeType()) continue;
            Set<SubscriptionData> subscriptions = consumer.subscriptions();
            for (SubscriptionData sub : subscriptions) {
                if (!sub.isClassFilterMode() || sub.getFilterClassSource() == null) continue;
                String consumerGroup = consumer.groupName();
                String className = sub.getSubString();
                String topic = sub.getTopic();
                String filterClassSource = sub.getFilterClassSource();
                try {
                    this.uploadFilterClassToAllFilterServer(consumerGroup, className, topic, filterClassSource);
                }
                catch (Exception e) {
                    LOG.error("uploadFilterClassToAllFilterServer Exception", e);
                }
            }
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     * Enabled aggressive block sorting
     * Enabled unnecessary exception pruning
     * Enabled aggressive exception aggregation
     */
    public boolean updateTopicRouteInfoFromNameServer(String topic, boolean isDefault, DefaultMQProducer defaultMQProducer) {
        try {
            if (!this.lockNamesrv.tryLock(3000L, TimeUnit.MILLISECONDS)) {
                LOG.warn("updateTopicRouteInfoFromNameServer tryLock timeout {}ms", (Object)3000L);
                return false;
            }
            try {
                TopicRouteData topicRouteData;
                if (isDefault && defaultMQProducer != null) {
                    topicRouteData = this.mQClientAPIImpl.getDefaultTopicRouteInfoFromNameServer(defaultMQProducer.getCreateTopicKey(), 3000L);
                    if (topicRouteData != null) {
                        for (QueueData data : topicRouteData.getQueueDatas()) {
                            int queueNums = Math.min(defaultMQProducer.getDefaultTopicQueueNums(), data.getReadQueueNums());
                            data.setReadQueueNums(queueNums);
                            data.setWriteQueueNums(queueNums);
                        }
                    }
                } else {
                    topicRouteData = this.mQClientAPIImpl.getTopicRouteInfoFromNameServer(topic, 3000L);
                }
                if (topicRouteData != null) {
                    Object impl;
                    TopicRouteData old = (TopicRouteData)this.topicRouteTable.get(topic);
                    boolean changed = this.topicRouteDataIsChange(old, topicRouteData);
                    if (!changed) {
                        changed = this.isNeedUpdateTopicRouteInfo(topic);
                    } else {
                        LOG.info("the topic[{}] route info changed, old[{}] ,new[{}]", topic, old, topicRouteData);
                    }
                    if (!changed) return false;
                    TopicRouteData cloneTopicRouteData = topicRouteData.cloneTopicRouteData();
                    for (BrokerData bd : topicRouteData.getBrokerDatas()) {
                        this.brokerAddrTable.put(bd.getBrokerName(), bd.getBrokerAddrs());
                    }
                    TopicPublishInfo publishInfo = MQClientInstance.topicRouteData2TopicPublishInfo(topic, topicRouteData);
                    if (publishInfo.isHAOrderTopic()) {
                        publishInfo.setQueueListMap(OrderMessageHandler.generateQueueListMap(publishInfo));
                    }
                    publishInfo.setHaveTopicRouterInfo(true);
                    for (Map.Entry entry : this.producerTable.entrySet()) {
                        impl = (MQProducerInner)entry.getValue();
                        if (impl == null) continue;
                        impl.updateTopicPublishInfo(topic, publishInfo);
                    }
                    Set<MessageQueue> subscribeInfo = MQClientInstance.topicRouteData2TopicSubscribeInfo(topic, topicRouteData);
                    Iterator it = this.consumerTable.entrySet().iterator();
                    while (true) {
                        Map.Entry entry;
                        if (!it.hasNext()) {
                            LOG.info("topicRouteTable.put. Topic = {}, TopicRouteData[{}]", (Object)topic, (Object)cloneTopicRouteData);
                            this.topicRouteTable.put(topic, cloneTopicRouteData);
                            boolean bl = true;
                            return bl;
                        }
                        entry = it.next();
                        impl = (MQConsumerInner)entry.getValue();
                        if (impl == null) continue;
                        impl.updateTopicSubscribeInfo(topic, subscribeInfo);
                    }
                }
                LOG.warn("updateTopicRouteInfoFromNameServer, getTopicRouteInfoFromNameServer return null, Topic: {}", (Object)topic);
                return false;
            }
            catch (Exception e) {
                if (topic.startsWith("%RETRY%")) return false;
                if (topic.equals("TBW102")) return false;
                LOG.warn("updateTopicRouteInfoFromNameServer Exception", e);
                if (!(e instanceof MQClientException)) return false;
                if (17 != ((MQClientException)e).getResponseCode()) return false;
                this.cleanNoneRouteTopic(topic);
                return false;
            }
            finally {
                this.lockNamesrv.unlock();
            }
        }
        catch (InterruptedException e) {
            LOG.warn("updateTopicRouteInfoFromNameServer Exception", e);
        }
        return false;
    }

    private void cleanNoneRouteTopic(String topic) {
        Object impl;
        TopicRouteData prev = (TopicRouteData)this.topicRouteTable.remove(topic);
        if (prev != null) {
            LOG.info("cleanNoneRouteTopic remove topic route data, {}, {}", (Object)topic, (Object)prev);
        }
        for (Map.Entry entry : this.producerTable.entrySet()) {
            impl = (MQProducerInner)entry.getValue();
            if (impl == null) continue;
            impl.removeTopicPublishInfo(topic);
        }
        for (Map.Entry entry : this.consumerTable.entrySet()) {
            impl = (MQConsumerInner)entry.getValue();
            if (impl == null) continue;
            impl.removeTopicSubscribeInfo(topic);
        }
        this.defaultMQProducer.getDefaultMQProducerImpl().removeTopicPublishInfo(topic);
    }

    private HeartbeatData prepareHeartbeatData(Set<String> activeTopicSet) {
        Object impl;
        HeartbeatData heartbeatData = new HeartbeatData();
        heartbeatData.setClientID(this.clientId);
        if (this.getClientConfig().isAllClientsOffline()) {
            return heartbeatData;
        }
        for (Map.Entry entry : this.consumerTable.entrySet()) {
            Set<SubscriptionData> subscriptions;
            impl = (MQConsumerInner)entry.getValue();
            if (impl == null || impl.isOffline() || null == (subscriptions = impl.subscriptions())) continue;
            boolean subscriptionsIsAvailable = false;
            for (SubscriptionData subscriptionData : subscriptions) {
                String topic;
                if (null == subscriptionData || !activeTopicSet.contains(topic = subscriptionData.getTopic())) continue;
                subscriptionsIsAvailable = true;
                break;
            }
            if (!subscriptionsIsAvailable) continue;
            ConsumerData consumerData = new ConsumerData();
            consumerData.setGroupName(impl.groupName());
            consumerData.setConsumeType(impl.consumeType());
            consumerData.setMessageModel(impl.messageModel());
            consumerData.setConsumeFromWhere(impl.consumeFromWhere());
            consumerData.getSubscriptionDataSet().addAll(subscriptions);
            consumerData.setUnitMode(impl.isUnitMode());
            heartbeatData.getConsumerDataSet().add(consumerData);
        }
        for (Map.Entry entry : this.producerTable.entrySet()) {
            impl = (MQProducerInner)entry.getValue();
            if (impl == null || impl.isOffline()) continue;
            ProducerData producerData = new ProducerData();
            producerData.setGroupName((String)entry.getKey());
            heartbeatData.getProducerDataSet().add(producerData);
        }
        return heartbeatData;
    }

    private boolean isBrokerInNameServer(String brokerAddr) {
        for (Map.Entry itNext : this.topicRouteTable.entrySet()) {
            List<BrokerData> brokerDatas = ((TopicRouteData)itNext.getValue()).getBrokerDatas();
            for (BrokerData bd : brokerDatas) {
                boolean contain = bd.getBrokerAddrs().containsValue(brokerAddr);
                if (!contain) continue;
                return true;
            }
        }
        return false;
    }

    private void uploadFilterClassToAllFilterServer(String consumerGroup, String fullClassName, String topic, String filterClassSource) throws UnsupportedEncodingException {
        byte[] classBody = null;
        int classCRC = 0;
        try {
            classBody = filterClassSource.getBytes("UTF-8");
            classCRC = UtilAll.crc32(classBody);
        }
        catch (Exception e1) {
            LOG.warn("uploadFilterClassToAllFilterServer Exception, ClassName: {} {}", (Object)fullClassName, (Object)RemotingHelper.exceptionSimpleDesc(e1));
        }
        TopicRouteData topicRouteData = (TopicRouteData)this.topicRouteTable.get(topic);
        if (topicRouteData != null && topicRouteData.getFilterServerTable() != null && !topicRouteData.getFilterServerTable().isEmpty()) {
            for (Map.Entry<String, List<String>> next : topicRouteData.getFilterServerTable().entrySet()) {
                List<String> value = next.getValue();
                for (String fsAddr : value) {
                    try {
                        this.mQClientAPIImpl.registerMessageFilterClass(fsAddr, consumerGroup, topic, fullClassName, classCRC, classBody, 5000L);
                        LOG.info("register message class filter to {} OK, ConsumerGroup: {} Topic: {} ClassName: {}", fsAddr, consumerGroup, topic, fullClassName);
                    }
                    catch (Exception e) {
                        LOG.error("uploadFilterClassToAllFilterServer Exception", e);
                    }
                }
            }
        } else {
            LOG.warn("register message class filter failed, because no filter server, ConsumerGroup: {} Topic: {} ClassName: {}", consumerGroup, topic, fullClassName);
        }
    }

    private boolean topicRouteDataIsChange(TopicRouteData olddata, TopicRouteData nowdata) {
        if (olddata == null || nowdata == null) {
            return true;
        }
        TopicRouteData old = olddata.cloneTopicRouteData();
        TopicRouteData now = nowdata.cloneTopicRouteData();
        Collections.sort(old.getQueueDatas());
        Collections.sort(old.getBrokerDatas());
        Collections.sort(now.getQueueDatas());
        Collections.sort(now.getBrokerDatas());
        return !old.equals(now);
    }

    private boolean isNeedUpdateTopicRouteInfo(String topic) {
        Object impl;
        Map.Entry entry;
        boolean result = false;
        Iterator it = this.producerTable.entrySet().iterator();
        while (it.hasNext() && !result) {
            entry = it.next();
            impl = (MQProducerInner)entry.getValue();
            if (impl == null) continue;
            result = impl.isPublishTopicNeedUpdate(topic);
        }
        it = this.consumerTable.entrySet().iterator();
        while (it.hasNext() && !result) {
            entry = it.next();
            impl = (MQConsumerInner)entry.getValue();
            if (impl == null) continue;
            result = impl.isSubscribeTopicNeedUpdate(topic);
        }
        return result;
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    public void shutdown() {
        if (!this.consumerTable.isEmpty()) {
            return;
        }
        if (!this.adminExtTable.isEmpty()) {
            return;
        }
        if (this.producerTable.size() > 1) {
            return;
        }
        MQClientInstance mQClientInstance = this;
        synchronized (mQClientInstance) {
            switch (this.serviceState) {
                case CREATE_JUST: {
                    break;
                }
                case RUNNING: {
                    this.defaultMQProducer.getDefaultMQProducerImpl().shutdown(false);
                    this.serviceState = ServiceState.SHUTDOWN_ALREADY;
                    this.pullMessageService.shutdown(true);
                    this.scheduledExecutorService.shutdown();
                    this.mQClientAPIImpl.shutdown();
                    this.rebalanceService.shutdown();
                    if (this.datagramSocket != null) {
                        this.datagramSocket.close();
                        this.datagramSocket = null;
                    }
                    MQClientManager.getInstance().removeClientFactory(this.clientId);
                    LOG.info("the client factory [{}] shutdown OK", (Object)this.clientId);
                    break;
                }
                case SHUTDOWN_ALREADY: {
                    break;
                }
            }
        }
    }

    public boolean registerConsumer(String group, MQConsumerInner consumer) {
        if (null == group || null == consumer) {
            return false;
        }
        MQConsumerInner prev = this.consumerTable.putIfAbsent(group, consumer);
        if (prev != null) {
            LOG.warn("the consumer group[" + group + "] exist already.");
            return false;
        }
        return true;
    }

    public void unregisterConsumer(String group) {
        this.consumerTable.remove(group);
        this.unregisterClientWithLock(null, group);
    }

    public void offlineConsumer(String group) {
        this.unregisterClientWithLock(null, group);
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    private void unregisterClientWithLock(String producerGroup, String consumerGroup) {
        block8: {
            try {
                if (this.lockHeartbeat.tryLock(3000L, TimeUnit.MILLISECONDS)) {
                    try {
                        this.unregisterClient(producerGroup, consumerGroup);
                        break block8;
                    }
                    catch (Exception e) {
                        LOG.error("unregisterClient exception", e);
                        break block8;
                    }
                    finally {
                        this.lockHeartbeat.unlock();
                    }
                }
                LOG.warn("lock heartBeat, but failed.");
            }
            catch (InterruptedException e) {
                LOG.warn("unregisterClientWithLock exception", e);
            }
        }
    }

    private void unregisterClient(String producerGroup, String consumerGroup) {
        for (Map.Entry entry : this.brokerAddrTable.entrySet()) {
            String brokerName = (String)entry.getKey();
            HashMap oneTable = (HashMap)entry.getValue();
            if (oneTable == null) continue;
            for (Map.Entry entry1 : oneTable.entrySet()) {
                String addr = (String)entry1.getValue();
                if (addr == null) continue;
                try {
                    this.mQClientAPIImpl.unregisterClient(addr, this.clientId, producerGroup, consumerGroup, 3000L);
                    LOG.info("unregister client[Producer: {} Consumer: {}] from broker[{} {} {}] success", producerGroup, consumerGroup, brokerName, entry1.getKey(), addr);
                }
                catch (RemotingException e) {
                    LOG.warn("unregister client RemotingException from broker: {}, {}", (Object)addr, (Object)e.getMessage());
                }
                catch (InterruptedException e) {
                    LOG.warn("unregister client InterruptedException from broker: {}, {}", (Object)addr, (Object)e.getMessage());
                }
                catch (MQBrokerException e) {
                    LOG.warn("unregister client MQBrokerException from broker: {}, {}", (Object)addr, (Object)e.getMessage());
                }
            }
        }
    }

    public boolean registerProducer(String group, MQProducerInner producer) {
        if (null == group || null == producer) {
            return false;
        }
        MQProducerInner prev = this.producerTable.putIfAbsent(group, producer);
        if (prev != null) {
            LOG.warn("the producer group[{}] exist already.", (Object)group);
            return false;
        }
        return true;
    }

    public void unregisterProducer(String group) {
        this.producerTable.remove(group);
        this.unregisterClientWithLock(group, null);
    }

    public void offlineProducer(String group) {
        this.unregisterClientWithLock(group, null);
    }

    public void allClientsOffline() {
        this.getClientConfig().setAllClientsOffline(true);
        for (Map.Entry entry : this.producerTable.entrySet()) {
            String producerName = (String)entry.getKey();
            MQProducerInner producerInner = (MQProducerInner)entry.getValue();
            if (producerName == null || producerName.equals("")) continue;
            producerInner.offline();
        }
        for (Map.Entry entry : this.consumerTable.entrySet()) {
            MQConsumerInner consumerInner;
            String consumerName = (String)entry.getKey();
            if (consumerName == null || consumerName.equals("") || null == (consumerInner = (MQConsumerInner)entry.getValue())) continue;
            consumerInner.offline();
        }
    }

    public void allClientsOnline() {
        this.getClientConfig().setAllClientsOffline(false);
        for (Map.Entry entry : this.producerTable.entrySet()) {
            String producerName = (String)entry.getKey();
            MQProducerInner producerInner = (MQProducerInner)entry.getValue();
            if (producerName == null || producerName.equals("")) continue;
            producerInner.online();
        }
        for (Map.Entry entry : this.consumerTable.entrySet()) {
            MQConsumerInner consumerInner;
            String consumerName = (String)entry.getKey();
            if (consumerName == null || consumerName.equals("") || null == (consumerInner = (MQConsumerInner)entry.getValue())) continue;
            consumerInner.online();
        }
    }

    public boolean registerAdminExt(String group, MQAdminExtInner admin) {
        if (null == group || null == admin) {
            return false;
        }
        MQAdminExtInner prev = this.adminExtTable.putIfAbsent(group, admin);
        if (prev != null) {
            LOG.warn("the admin group[{}] exist already.", (Object)group);
            return false;
        }
        return true;
    }

    public void unregisterAdminExt(String group) {
        this.adminExtTable.remove(group);
    }

    public void rebalanceLater(long delayMillis) {
        if (delayMillis <= 0L) {
            this.rebalanceService.wakeup();
        } else {
            this.scheduledExecutorService.schedule(new Runnable(){

                @Override
                public void run() {
                    MQClientInstance.this.rebalanceService.wakeup();
                }
            }, delayMillis, TimeUnit.MILLISECONDS);
        }
    }

    public void rebalanceImmediately() {
        this.rebalanceService.wakeup();
    }

    public boolean doRebalance() {
        boolean balanced = true;
        for (Map.Entry entry : this.consumerTable.entrySet()) {
            MQConsumerInner impl = (MQConsumerInner)entry.getValue();
            if (impl == null) continue;
            try {
                if (impl.doRebalance()) continue;
                balanced = false;
            }
            catch (Throwable e) {
                LOG.error("doRebalance exception", e);
            }
        }
        return balanced;
    }

    public MQProducerInner selectProducer(String group) {
        return (MQProducerInner)this.producerTable.get(group);
    }

    public MQConsumerInner selectConsumer(String group) {
        return (MQConsumerInner)this.consumerTable.get(group);
    }

    public FindBrokerResult findBrokerAddressInAdmin(String brokerName) {
        String brokerAddr = null;
        boolean slave = false;
        boolean found = false;
        HashMap map = (HashMap)this.brokerAddrTable.get(brokerName);
        if (map != null && !map.isEmpty()) {
            for (Map.Entry entry : map.entrySet()) {
                Long id = (Long)entry.getKey();
                brokerAddr = (String)entry.getValue();
                if (brokerAddr == null) continue;
                found = true;
                if (0L == id) {
                    slave = false;
                    break;
                }
                slave = true;
                break;
            }
        }
        if (found) {
            return new FindBrokerResult(brokerAddr, slave, this.findBrokerVersion(brokerName, brokerAddr));
        }
        return null;
    }

    public String findBrokerAddressInPublish(String brokerName) {
        HashMap map = (HashMap)this.brokerAddrTable.get(brokerName);
        if (map != null && !map.isEmpty()) {
            return (String)map.get(0L);
        }
        return null;
    }

    public FindBrokerResult findBrokerAddressInSubscribe(String brokerName, long brokerId, boolean onlyThisBroker) {
        String brokerAddr = null;
        boolean slave = false;
        boolean found = false;
        HashMap map = (HashMap)this.brokerAddrTable.get(brokerName);
        if (map != null && !map.isEmpty()) {
            brokerAddr = (String)map.get(brokerId);
            slave = brokerId != 0L;
            boolean bl = found = brokerAddr != null;
            if (!found && !onlyThisBroker) {
                Map.Entry entry = map.entrySet().iterator().next();
                brokerAddr = (String)entry.getValue();
                slave = (Long)entry.getKey() != 0L;
                found = true;
            }
        }
        if (found) {
            return new FindBrokerResult(brokerAddr, slave, this.findBrokerVersion(brokerName, brokerAddr));
        }
        return null;
    }

    public int findBrokerVersion(String brokerName, String brokerAddr) {
        if (this.brokerVersionTable.containsKey(brokerName) && ((HashMap)this.brokerVersionTable.get(brokerName)).containsKey(brokerAddr)) {
            return (Integer)((HashMap)this.brokerVersionTable.get(brokerName)).get(brokerAddr);
        }
        return 0;
    }

    public List<String> findConsumerIdList(String topic, String group) {
        String brokerAddr = this.findBrokerAddrByTopic(topic);
        if (null == brokerAddr) {
            this.updateTopicRouteInfoFromNameServer(topic);
            brokerAddr = this.findBrokerAddrByTopic(topic);
        }
        if (null != brokerAddr) {
            try {
                return this.mQClientAPIImpl.getConsumerIdListByGroup(brokerAddr, group, 3000L);
            }
            catch (Exception e) {
                LOG.warn("getConsumerIdListByGroup exception, " + brokerAddr + " " + group, e);
            }
        }
        return null;
    }

    public String findBrokerAddrByTopic(String topic) {
        List<BrokerData> brokers;
        TopicRouteData topicRouteData = (TopicRouteData)this.topicRouteTable.get(topic);
        if (topicRouteData != null && !(brokers = topicRouteData.getBrokerDatas()).isEmpty()) {
            int index = this.random.nextInt(brokers.size());
            BrokerData bd = brokers.get(index % brokers.size());
            return bd.selectBrokerAddr();
        }
        return null;
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    public void resetOffset(String topic, String group, Map<MessageQueue, Long> offsetTable) {
        DefaultMQPushConsumerImpl consumer = null;
        try {
            MQConsumerInner impl = (MQConsumerInner)this.consumerTable.get(group);
            if (impl == null || !(impl instanceof DefaultMQPushConsumerImpl)) {
                LOG.info("[reset-offset] consumer dose not exist. group={}", (Object)group);
                return;
            }
            consumer = (DefaultMQPushConsumerImpl)impl;
            consumer.suspend();
            ConcurrentMap<MessageQueue, ProcessQueue> processQueueTable = consumer.getRebalanceImpl().getProcessQueueTable();
            for (Map.Entry entry : processQueueTable.entrySet()) {
                MessageQueue mq = (MessageQueue)entry.getKey();
                if (!topic.equals(mq.getTopic()) || !offsetTable.containsKey(mq)) continue;
                ProcessQueue pq = (ProcessQueue)entry.getValue();
                pq.setDropped(true);
                pq.clear();
            }
            try {
                TimeUnit.SECONDS.sleep(10L);
            }
            catch (InterruptedException i$) {
                // empty catch block
            }
            Iterator iterator = processQueueTable.keySet().iterator();
            while (iterator.hasNext()) {
                MessageQueue mq = (MessageQueue)iterator.next();
                Long offset = offsetTable.get(mq);
                if (!topic.equals(mq.getTopic()) || offset == null) continue;
                try {
                    consumer.updateConsumeOffset(mq, offset);
                    consumer.getRebalanceImpl().removeUnnecessaryMessageQueue(mq, (ProcessQueue)processQueueTable.get(mq));
                    iterator.remove();
                }
                catch (Exception e) {
                    LOG.warn("reset offset failed. group={}, {}", group, mq, e);
                }
            }
        }
        finally {
            if (consumer != null) {
                consumer.resume();
            }
        }
    }

    public Map<MessageQueue, Long> getConsumerStatus(String topic, String group) {
        MQConsumerInner impl = (MQConsumerInner)this.consumerTable.get(group);
        if (impl != null && impl instanceof DefaultMQPushConsumerImpl) {
            DefaultMQPushConsumerImpl consumer = (DefaultMQPushConsumerImpl)impl;
            return consumer.getOffsetStore().cloneOffsetTable(topic);
        }
        if (impl != null && impl instanceof DefaultMQPullConsumerImpl) {
            DefaultMQPullConsumerImpl consumer = (DefaultMQPullConsumerImpl)impl;
            return consumer.getOffsetStore().cloneOffsetTable(topic);
        }
        return Collections.EMPTY_MAP;
    }

    public TopicRouteData getAnExistTopicRouteData(String topic) {
        return (TopicRouteData)this.topicRouteTable.get(topic);
    }

    public MQClientAPIImpl getMQClientAPIImpl() {
        return this.mQClientAPIImpl;
    }

    public MQAdminImpl getMQAdminImpl() {
        return this.mQAdminImpl;
    }

    public long getBootTimestamp() {
        return this.bootTimestamp;
    }

    public ScheduledExecutorService getScheduledExecutorService() {
        return this.scheduledExecutorService;
    }

    public PullMessageService getPullMessageService() {
        return this.pullMessageService;
    }

    public DefaultMQProducer getDefaultMQProducer() {
        return this.defaultMQProducer;
    }

    public ConcurrentMap<String, TopicRouteData> getTopicRouteTable() {
        return this.topicRouteTable;
    }

    public ConcurrentMap<String, ConcurrentMap<String, Long>> getTopicQueueOffsetTable() {
        return this.topicQueueOffsetTable;
    }

    public ConcurrentMap<String, Long> findQueueOffsetTable(String topic) {
        ConcurrentMap offsetTable = (ConcurrentMap)this.topicQueueOffsetTable.get(topic);
        return offsetTable;
    }

    public void putQueueOffset(String topic, String key, long newOffset) {
        Long oldOffset;
        ConcurrentMap<String, Long> offsetTable = this.findQueueOffsetTable(topic);
        if (offsetTable == null) {
            this.topicQueueOffsetTable.putIfAbsent(topic, new ConcurrentHashMap());
        }
        offsetTable = this.findQueueOffsetTable(topic);
        while ((oldOffset = offsetTable.putIfAbsent(key, newOffset)) != null && oldOffset < newOffset && !offsetTable.replace(key, oldOffset, newOffset)) {
        }
    }

    public ConsumeMessageDirectlyResult consumeMessageDirectly(MessageExt msg, String consumerGroup, String brokerName) {
        MQConsumerInner mqConsumerInner = (MQConsumerInner)this.consumerTable.get(consumerGroup);
        if (null != mqConsumerInner) {
            DefaultMQPushConsumerImpl consumer = (DefaultMQPushConsumerImpl)mqConsumerInner;
            ConsumeMessageDirectlyResult result = consumer.getConsumeMessageService().consumeMessageDirectly(msg, brokerName);
            return result;
        }
        return null;
    }

    public ConsumerRunningInfo consumerRunningInfo(String consumerGroup) {
        MQConsumerInner mqConsumerInner = (MQConsumerInner)this.consumerTable.get(consumerGroup);
        ConsumerRunningInfo consumerRunningInfo = mqConsumerInner.consumerRunningInfo();
        List<String> nsList = this.mQClientAPIImpl.getRemotingClient().getNameServerAddressList();
        StringBuilder strBuilder = new StringBuilder();
        if (nsList != null) {
            for (String addr : nsList) {
                strBuilder.append(addr).append(";");
            }
        }
        String nsAddr = strBuilder.toString();
        consumerRunningInfo.getProperties().put("PROP_NAMESERVER_ADDR", nsAddr);
        consumerRunningInfo.getProperties().put("PROP_CONSUME_TYPE", mqConsumerInner.consumeType().name());
        consumerRunningInfo.getProperties().put("PROP_CLIENT_VERSION", MQVersion.getVersionDesc(MQVersion.CURRENT_VERSION));
        return consumerRunningInfo;
    }

    public ConsumerStatsManager getConsumerStatsManager() {
        return this.consumerStatsManager;
    }

    public NettyClientConfig getNettyClientConfig() {
        return this.nettyClientConfig;
    }

    public ClientConfig getClientConfig() {
        return this.clientConfig;
    }

    public ConcurrentMap<String, MQProducerInner> getProducerTable() {
        return this.producerTable;
    }

    public ConcurrentMap<String, MQConsumerInner> getConsumerTable() {
        return this.consumerTable;
    }
}

