package com.aliyun.openservices.shade.com.alibaba.rocketmq.client.impl.consumer;

import com.aliyun.openservices.shade.com.alibaba.rocketmq.client.QueryResult;
import com.aliyun.openservices.shade.com.alibaba.rocketmq.client.Validators;
import com.aliyun.openservices.shade.com.alibaba.rocketmq.client.consumer.DefaultMQPullConsumer;
import com.aliyun.openservices.shade.com.alibaba.rocketmq.client.consumer.PullCallback;
import com.aliyun.openservices.shade.com.alibaba.rocketmq.client.consumer.PullResult;
import com.aliyun.openservices.shade.com.alibaba.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
import com.aliyun.openservices.shade.com.alibaba.rocketmq.client.consumer.store.LocalFileOffsetStore;
import com.aliyun.openservices.shade.com.alibaba.rocketmq.client.consumer.store.OffsetStore;
import com.aliyun.openservices.shade.com.alibaba.rocketmq.client.consumer.store.ReadOffsetType;
import com.aliyun.openservices.shade.com.alibaba.rocketmq.client.consumer.store.RemoteBrokerOffsetStore;
import com.aliyun.openservices.shade.com.alibaba.rocketmq.client.exception.MQBrokerException;
import com.aliyun.openservices.shade.com.alibaba.rocketmq.client.exception.MQClientException;
import com.aliyun.openservices.shade.com.alibaba.rocketmq.client.hook.ConsumeMessageContext;
import com.aliyun.openservices.shade.com.alibaba.rocketmq.client.hook.ConsumeMessageHook;
import com.aliyun.openservices.shade.com.alibaba.rocketmq.client.hook.FilterMessageHook;
import com.aliyun.openservices.shade.com.alibaba.rocketmq.client.impl.CommunicationMode;
import com.aliyun.openservices.shade.com.alibaba.rocketmq.client.impl.MQClientManager;
import com.aliyun.openservices.shade.com.alibaba.rocketmq.client.impl.factory.MQClientInstance;
import com.aliyun.openservices.shade.com.alibaba.rocketmq.client.log.ClientLogger;
import com.aliyun.openservices.shade.com.alibaba.rocketmq.common.MixAll;
import com.aliyun.openservices.shade.com.alibaba.rocketmq.common.ServiceState;
import com.aliyun.openservices.shade.com.alibaba.rocketmq.common.UtilAll;
import com.aliyun.openservices.shade.com.alibaba.rocketmq.common.consumer.ConsumeFromWhere;
import com.aliyun.openservices.shade.com.alibaba.rocketmq.common.filter.FilterAPI;
import com.aliyun.openservices.shade.com.alibaba.rocketmq.common.help.FAQUrl;
import com.aliyun.openservices.shade.com.alibaba.rocketmq.common.message.Message;
import com.aliyun.openservices.shade.com.alibaba.rocketmq.common.message.MessageAccessor;
import com.aliyun.openservices.shade.com.alibaba.rocketmq.common.message.MessageExt;
import com.aliyun.openservices.shade.com.alibaba.rocketmq.common.message.MessageQueue;
import com.aliyun.openservices.shade.com.alibaba.rocketmq.common.protocol.body.ConsumerRunningInfo;
import com.aliyun.openservices.shade.com.alibaba.rocketmq.common.protocol.heartbeat.ConsumeType;
import com.aliyun.openservices.shade.com.alibaba.rocketmq.common.protocol.heartbeat.MessageModel;
import com.aliyun.openservices.shade.com.alibaba.rocketmq.common.protocol.heartbeat.SubscriptionData;
import com.aliyun.openservices.shade.com.alibaba.rocketmq.common.sysflag.PullSysFlag;
import com.aliyun.openservices.shade.com.alibaba.rocketmq.remoting.RPCHook;
import com.aliyun.openservices.shade.com.alibaba.rocketmq.remoting.common.RemotingHelper;
import com.aliyun.openservices.shade.com.alibaba.rocketmq.remoting.exception.RemotingException;
import com.taobao.metaq.trace.core.common.MetaQTraceConstants;
import java.util.ArrayList;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Properties;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import org.slf4j.Logger;

/* loaded from: input_file:WEB-INF/lib/ons-client-1.2.7-ForEagleEye.jar:com/aliyun/openservices/shade/com/alibaba/rocketmq/client/impl/consumer/DefaultMQPullConsumerImpl.class */
/**
 * Pull-style consumer implementation that is configured by a {@link DefaultMQPullConsumer}.
 *
 * <p>The instance moves through the {@link ServiceState} life cycle: it is created in
 * {@code CREATE_JUST}, becomes {@code RUNNING} after a successful {@link #start()} and
 * {@code SHUTDOWN_ALREADY} after {@link #shutdown()}. Most operations call
 * {@code makeSureStateOK()} first and therefore fail with an {@link MQClientException}
 * unless the consumer is {@code RUNNING}.
 *
 * <p>NOTE(review): {@code serviceState} and the hook lists are plain (non-volatile,
 * non-concurrent) fields; no synchronization is visible in this class — confirm the
 * intended threading model before sharing an instance across threads.
 */
public class DefaultMQPullConsumerImpl implements MQConsumerInner {
    private final DefaultMQPullConsumer defaultMQPullConsumer;
    private final RPCHook rpcHook;
    private MQClientInstance mQClientFactory;
    private PullAPIWrapper pullAPIWrapper;
    private OffsetStore offsetStore;
    private final Logger log = ClientLogger.getLog();
    private final long consumerStartTimestamp = System.currentTimeMillis();
    private final ArrayList<ConsumeMessageHook> consumeMessageHookList = new ArrayList<>();
    private final ArrayList<FilterMessageHook> filterMessageHookList = new ArrayList<>();
    private ServiceState serviceState = ServiceState.CREATE_JUST;
    private RebalanceImpl rebalanceImpl = new RebalancePullImpl(this);

    /**
     * Creates an implementation bound to the given facade. Nothing is started here;
     * call {@link #start()} before using the consumer.
     *
     * @param defaultMQPullConsumer source of all configuration (group, message model, timeouts, ...)
     * @param rPCHook hook handed to the client instance factory in {@link #start()}
     */
    public DefaultMQPullConsumerImpl(DefaultMQPullConsumer defaultMQPullConsumer, RPCHook rPCHook) {
        this.defaultMQPullConsumer = defaultMQPullConsumer;
        this.rpcHook = rPCHook;
    }

    /**
     * Adds a hook that is invoked around synchronous pulls (see {@code pullSyncImpl}).
     *
     * @param consumeMessageHook hook to append to the hook list
     */
    public void registerConsumeMessageHook(ConsumeMessageHook consumeMessageHook) {
        this.consumeMessageHookList.add(consumeMessageHook);
        this.log.info("register consumeMessageHook Hook, {}", consumeMessageHook.hookName());
    }

    /**
     * Same as {@link #createTopic(String, String, int, int)} with {@code 0} as the last argument.
     *
     * @throws MQClientException if the consumer is not running or the admin call fails
     */
    public void createTopic(String str, String str2, int i) throws MQClientException {
        createTopic(str, str2, i, 0);
    }

    /**
     * Delegates to the admin implementation of the client factory; the arguments are
     * forwarded unchanged.
     *
     * @throws MQClientException if the consumer is not running or the admin call fails
     */
    public void createTopic(String str, String str2, int i, int i2) throws MQClientException {
        makeSureStateOK();
        this.mQClientFactory.getMQAdminImpl().createTopic(str, str2, i, i2);
    }

    /**
     * Guards operations that require a started consumer.
     *
     * @throws MQClientException if the current state is anything other than {@code RUNNING}
     */
    private void makeSureStateOK() throws MQClientException {
        if (this.serviceState == ServiceState.RUNNING) {
            return;
        }
        throw new MQClientException("The consumer service state not OK, " + this.serviceState + FAQUrl.suggestTodo("http://docs.aliyun.com/cn#/pub/ons/faq/exceptions&service_not_ok"), (Throwable) null);
    }

    /**
     * Reads the consume offset of a queue from the offset store.
     *
     * @param messageQueue queue whose offset is requested
     * @param z {@code true} to read with {@code READ_FROM_STORE}; {@code false} to read with
     *          {@code MEMORY_FIRST_THEN_STORE}
     * @return whatever the offset store reports for the queue
     * @throws MQClientException if the consumer is not running
     */
    public long fetchConsumeOffset(MessageQueue messageQueue, boolean z) throws MQClientException {
        makeSureStateOK();
        ReadOffsetType readType = z ? ReadOffsetType.READ_FROM_STORE : ReadOffsetType.MEMORY_FIRST_THEN_STORE;
        return this.offsetStore.readOffset(messageQueue, readType);
    }

    /**
     * Returns the queues of the given topic that are currently present in the
     * rebalance process-queue table.
     *
     * @param str topic name; must not be {@code null}
     * @return a new set containing the matching queues (possibly empty)
     * @throws MQClientException if the consumer is not running
     * @throws IllegalArgumentException if {@code str} is {@code null}
     */
    public Set<MessageQueue> fetchMessageQueuesInBalance(String str) throws MQClientException {
        makeSureStateOK();
        if (null == str) {
            throw new IllegalArgumentException("topic is null");
        }
        Set<MessageQueue> queuesOfTopic = new HashSet<>();
        for (MessageQueue candidate : this.rebalanceImpl.getProcessQueueTable().keySet()) {
            if (candidate.getTopic().equals(str)) {
                queuesOfTopic.add(candidate);
            }
        }
        return queuesOfTopic;
    }

    /**
     * Delegates to the admin implementation's {@code fetchPublishMessageQueues}.
     *
     * @throws MQClientException if the consumer is not running or the lookup fails
     */
    public List<MessageQueue> fetchPublishMessageQueues(String str) throws MQClientException {
        makeSureStateOK();
        return this.mQClientFactory.getMQAdminImpl().fetchPublishMessageQueues(str);
    }

    /**
     * Delegates to the admin implementation's {@code fetchSubscribeMessageQueues}.
     *
     * @throws MQClientException if the consumer is not running or the lookup fails
     */
    public Set<MessageQueue> fetchSubscribeMessageQueues(String str) throws MQClientException {
        makeSureStateOK();
        return this.mQClientFactory.getMQAdminImpl().fetchSubscribeMessageQueues(str);
    }

    /**
     * Delegates to the admin implementation's {@code earliestMsgStoreTime}.
     *
     * @throws MQClientException if the consumer is not running or the lookup fails
     */
    public long earliestMsgStoreTime(MessageQueue messageQueue) throws MQClientException {
        makeSureStateOK();
        return this.mQClientFactory.getMQAdminImpl().earliestMsgStoreTime(messageQueue);
    }

    /**
     * Delegates to the admin implementation's {@code maxOffset}.
     *
     * @throws MQClientException if the consumer is not running or the lookup fails
     */
    public long maxOffset(MessageQueue messageQueue) throws MQClientException {
        makeSureStateOK();
        return this.mQClientFactory.getMQAdminImpl().maxOffset(messageQueue);
    }

    /**
     * Delegates to the admin implementation's {@code minOffset}.
     *
     * @throws MQClientException if the consumer is not running or the lookup fails
     */
    public long minOffset(MessageQueue messageQueue) throws MQClientException {
        makeSureStateOK();
        return this.mQClientFactory.getMQAdminImpl().minOffset(messageQueue);
    }

    /**
     * Synchronous, non-blocking pull using the configured consumer pull timeout.
     *
     * @see #pull(MessageQueue, String, long, int, long)
     */
    public PullResult pull(MessageQueue messageQueue, String str, long j, int i) throws MQClientException, RemotingException, MQBrokerException, InterruptedException {
        return pull(messageQueue, str, j, i, this.defaultMQPullConsumer.getConsumerPullTimeoutMillis());
    }

    /**
     * Synchronous, non-blocking pull.
     *
     * @param messageQueue queue to pull from; must not be {@code null}
     * @param str subscription expression used to build the subscription data
     * @param j offset to pull from; must be {@code >= 0}
     * @param i maximum number of messages; must be {@code > 0}
     * @param j2 timeout passed to the pull kernel
     * @return the pull result returned by the pull kernel
     * @throws MQClientException on invalid arguments, wrong state or an unparsable expression
     */
    public PullResult pull(MessageQueue messageQueue, String str, long j, int i, long j2) throws MQClientException, RemotingException, MQBrokerException, InterruptedException {
        return pullSyncImpl(messageQueue, str, j, i, false, j2);
    }

    /**
     * Shared implementation of the synchronous pull variants.
     *
     * <p>Only the subscription parsing is wrapped into "parse subscription error";
     * transport, broker and interruption failures of the pull itself propagate with
     * their own types, as declared in the {@code throws} clause.
     *
     * @param mq queue to pull from
     * @param subExpression subscription expression
     * @param offset offset to start from
     * @param maxNums maximum number of messages
     * @param block whether the suspend flag is set; also selects
     *              {@code consumerTimeoutMillisWhenSuspend} as the timeout
     * @param timeout timeout used when {@code block} is {@code false}
     */
    private PullResult pullSyncImpl(MessageQueue mq, String subExpression, long offset, int maxNums, boolean block, long timeout) throws MQClientException, RemotingException, MQBrokerException, InterruptedException {
        makeSureStateOK();
        if (null == mq) {
            throw new MQClientException("mq is null", (Throwable) null);
        }
        if (offset < 0) {
            throw new MQClientException("offset < 0", (Throwable) null);
        }
        if (maxNums <= 0) {
            throw new MQClientException("maxNums <= 0", (Throwable) null);
        }
        subscriptionAutomatically(mq.getTopic());
        int sysFlag = PullSysFlag.buildSysFlag(false, block, true, false);

        SubscriptionData subscriptionData;
        try {
            subscriptionData = FilterAPI.buildSubscriptionData(this.defaultMQPullConsumer.getConsumerGroup(), mq.getTopic(), subExpression);
        } catch (Exception e) {
            throw new MQClientException("parse subscription error", e);
        }

        long brokerSuspendMillis = this.defaultMQPullConsumer.getBrokerSuspendMaxTimeMillis();
        long timeoutMillis = block ? this.defaultMQPullConsumer.getConsumerTimeoutMillisWhenSuspend() : timeout;
        PullResult pullResult = this.pullAPIWrapper.pullKernelImpl(mq, subscriptionData.getSubString(), 0L, offset, maxNums, sysFlag, 0L, brokerSuspendMillis, timeoutMillis, CommunicationMode.SYNC, null);
        this.pullAPIWrapper.processPullResult(mq, pullResult, subscriptionData);

        if (!this.consumeMessageHookList.isEmpty()) {
            ConsumeMessageContext hookContext = new ConsumeMessageContext();
            hookContext.setConsumerGroup(groupName());
            hookContext.setMq(mq);
            hookContext.setMsgList(pullResult.getMsgFoundList());
            hookContext.setSuccess(false);
            executeHookBefore(hookContext);
            // The pull itself is reported to the hooks as a successful consumption.
            hookContext.setStatus(ConsumeConcurrentlyStatus.CONSUME_SUCCESS.toString());
            hookContext.setSuccess(true);
            executeHookAfter(hookContext);
        }
        return pullResult;
    }

    /**
     * Registers a {@code "*"} subscription for the topic in the rebalance subscription
     * table if none exists yet. Best effort: a failure is logged and otherwise ignored.
     *
     * @param topic topic that is about to be pulled
     */
    private void subscriptionAutomatically(String topic) {
        if (this.rebalanceImpl.getSubscriptionInner().containsKey(topic)) {
            return;
        }
        try {
            SubscriptionData wildcard = FilterAPI.buildSubscriptionData(this.defaultMQPullConsumer.getConsumerGroup(), topic, "*");
            this.rebalanceImpl.getSubscriptionInner().putIfAbsent(topic, wildcard);
        } catch (Exception e) {
            this.log.warn("subscriptionAutomatically exception, topic: " + topic, e);
        }
    }

    /** @return the consumer group configured on the facade */
    @Override // com.aliyun.openservices.shade.com.alibaba.rocketmq.client.impl.consumer.MQConsumerInner
    public String groupName() {
        return this.defaultMQPullConsumer.getConsumerGroup();
    }

    /**
     * Invokes {@code consumeMessageBefore} on every registered hook. A failing hook is
     * logged and does not prevent the remaining hooks from running.
     *
     * @param consumeMessageContext context passed to each hook
     */
    public void executeHookBefore(ConsumeMessageContext consumeMessageContext) {
        for (ConsumeMessageHook hook : this.consumeMessageHookList) {
            try {
                hook.consumeMessageBefore(consumeMessageContext);
            } catch (Throwable th) {
                this.log.warn("consumeMessageHook consumeMessageBefore exception", th);
            }
        }
    }

    /**
     * Invokes {@code consumeMessageAfter} on every registered hook. A failing hook is
     * logged and does not prevent the remaining hooks from running.
     *
     * @param consumeMessageContext context passed to each hook
     */
    public void executeHookAfter(ConsumeMessageContext consumeMessageContext) {
        for (ConsumeMessageHook hook : this.consumeMessageHookList) {
            try {
                hook.consumeMessageAfter(consumeMessageContext);
            } catch (Throwable th) {
                this.log.warn("consumeMessageHook consumeMessageAfter exception", th);
            }
        }
    }

    /** @return the message model configured on the facade */
    @Override // com.aliyun.openservices.shade.com.alibaba.rocketmq.client.impl.consumer.MQConsumerInner
    public MessageModel messageModel() {
        return this.defaultMQPullConsumer.getMessageModel();
    }

    /** @return always {@code CONSUME_ACTIVELY} */
    @Override // com.aliyun.openservices.shade.com.alibaba.rocketmq.client.impl.consumer.MQConsumerInner
    public ConsumeType consumeType() {
        return ConsumeType.CONSUME_ACTIVELY;
    }

    /** @return always {@code CONSUME_FROM_LAST_OFFSET} */
    @Override // com.aliyun.openservices.shade.com.alibaba.rocketmq.client.impl.consumer.MQConsumerInner
    public ConsumeFromWhere consumeFromWhere() {
        return ConsumeFromWhere.CONSUME_FROM_LAST_OFFSET;
    }

    /**
     * Builds a {@code "*"} subscription (sub-version {@code 0}) for every registered topic.
     *
     * <p>A topic whose subscription cannot be built is logged and skipped instead of
     * aborting the whole call.
     *
     * @return a new set of subscriptions; empty when no topics are registered
     */
    @Override // com.aliyun.openservices.shade.com.alibaba.rocketmq.client.impl.consumer.MQConsumerInner
    public Set<SubscriptionData> subscriptions() {
        Set<SubscriptionData> result = new HashSet<>();
        Set<String> registerTopics = this.defaultMQPullConsumer.getRegisterTopics();
        if (registerTopics == null) {
            return result;
        }
        // Iterate under the set's own monitor, as the topic set is owned by the facade.
        synchronized (registerTopics) {
            for (String topic : registerTopics) {
                try {
                    SubscriptionData subscriptionData = FilterAPI.buildSubscriptionData(groupName(), topic, "*");
                    subscriptionData.setSubVersion(0L);
                    result.add(subscriptionData);
                } catch (Exception e) {
                    this.log.error("parse subscription error", e);
                }
            }
        }
        return result;
    }

    /** Triggers a rebalance ({@code doRebalance(false)}) if a rebalance implementation is present. */
    @Override // com.aliyun.openservices.shade.com.alibaba.rocketmq.client.impl.consumer.MQConsumerInner
    public void doRebalance() {
        if (this.rebalanceImpl != null) {
            this.rebalanceImpl.doRebalance(false);
        }
    }

    /**
     * Persists the offsets of all queues currently in the process-queue table.
     * Never throws: any failure (including "not running") is logged.
     */
    @Override // com.aliyun.openservices.shade.com.alibaba.rocketmq.client.impl.consumer.MQConsumerInner
    public void persistConsumerOffset() {
        try {
            makeSureStateOK();
            Set<MessageQueue> queues = new HashSet<>();
            Set<MessageQueue> allocated = this.rebalanceImpl.getProcessQueueTable().keySet();
            if (allocated != null) {
                // Copy so the store works on a snapshot rather than the live key view.
                queues.addAll(allocated);
            }
            this.offsetStore.persistAll(queues);
        } catch (Exception e) {
            this.log.error("group: " + this.defaultMQPullConsumer.getConsumerGroup() + " persistConsumerOffset exception", e);
        }
    }

    /**
     * Stores the queue set of a topic in the rebalance topic table, but only for
     * topics that have a subscription.
     *
     * @param str topic name
     * @param set queues of the topic
     */
    @Override // com.aliyun.openservices.shade.com.alibaba.rocketmq.client.impl.consumer.MQConsumerInner
    public void updateTopicSubscribeInfo(String str, Set<MessageQueue> set) {
        ConcurrentHashMap<String, SubscriptionData> subscriptionTable = this.rebalanceImpl.getSubscriptionInner();
        if (subscriptionTable != null && subscriptionTable.containsKey(str)) {
            this.rebalanceImpl.getTopicSubscribeInfoTable().put(str, set);
        }
    }

    /**
     * @param str topic name
     * @return {@code true} when the topic is subscribed but has no entry in the topic
     *         subscribe-info table yet
     */
    @Override // com.aliyun.openservices.shade.com.alibaba.rocketmq.client.impl.consumer.MQConsumerInner
    public boolean isSubscribeTopicNeedUpdate(String str) {
        ConcurrentHashMap<String, SubscriptionData> subscriptionTable = this.rebalanceImpl.getSubscriptionInner();
        return subscriptionTable != null
                && subscriptionTable.containsKey(str)
                && !this.rebalanceImpl.getTopicSubscribeInfoTable().containsKey(str);
    }

    /** @return the unit-mode flag configured on the facade */
    @Override // com.aliyun.openservices.shade.com.alibaba.rocketmq.client.impl.consumer.MQConsumerInner
    public boolean isUnitMode() {
        return this.defaultMQPullConsumer.isUnitMode();
    }

    /**
     * Builds a running-info snapshot: the facade's properties plus the start timestamp,
     * and the current {@link #subscriptions()}.
     */
    @Override // com.aliyun.openservices.shade.com.alibaba.rocketmq.client.impl.consumer.MQConsumerInner
    public ConsumerRunningInfo consumerRunningInfo() {
        Properties props = MixAll.object2Properties(this.defaultMQPullConsumer);
        props.put("PROP_CONSUMER_START_TIMESTAMP", String.valueOf(this.consumerStartTimestamp));

        ConsumerRunningInfo info = new ConsumerRunningInfo();
        info.setProperties(props);
        info.getSubscriptionSet().addAll(subscriptions());
        return info;
    }

    /**
     * Asynchronous, non-blocking pull using the configured consumer pull timeout.
     *
     * @see #pull(MessageQueue, String, long, int, PullCallback, long)
     */
    public void pull(MessageQueue messageQueue, String str, long j, int i, PullCallback pullCallback) throws MQClientException, RemotingException, InterruptedException {
        pull(messageQueue, str, j, i, pullCallback, this.defaultMQPullConsumer.getConsumerPullTimeoutMillis());
    }

    /**
     * Asynchronous, non-blocking pull.
     *
     * @param messageQueue queue to pull from; must not be {@code null}
     * @param str subscription expression used to build the subscription data
     * @param j offset to pull from; must be {@code >= 0}
     * @param i maximum number of messages; must be {@code > 0}
     * @param pullCallback receives the processed result or the failure; must not be {@code null}
     * @param j2 timeout passed to the pull kernel
     * @throws MQClientException on invalid arguments, wrong state, an unparsable expression
     *                           or a broker error
     */
    public void pull(MessageQueue messageQueue, String str, long j, int i, PullCallback pullCallback, long j2) throws MQClientException, RemotingException, InterruptedException {
        pullAsyncImpl(messageQueue, str, j, i, pullCallback, false, j2);
    }

    /**
     * Shared implementation of the asynchronous pull variants.
     *
     * <p>Only the subscription parsing is reported as "parse subscription error".
     * A broker error from the pull kernel is wrapped into an {@link MQClientException};
     * transport and interruption failures propagate with their own types.
     *
     * @param mq queue to pull from
     * @param subExpression subscription expression
     * @param offset offset to start from
     * @param maxNums maximum number of messages
     * @param pullCallback user callback
     * @param block whether the suspend flag is set; also selects
     *              {@code consumerTimeoutMillisWhenSuspend} as the timeout
     * @param timeout timeout used when {@code block} is {@code false}
     */
    private void pullAsyncImpl(final MessageQueue mq, String subExpression, long offset, int maxNums, final PullCallback pullCallback, boolean block, long timeout) throws MQClientException, RemotingException, InterruptedException {
        makeSureStateOK();
        if (null == mq) {
            throw new MQClientException("mq is null", (Throwable) null);
        }
        if (offset < 0) {
            throw new MQClientException("offset < 0", (Throwable) null);
        }
        if (maxNums <= 0) {
            throw new MQClientException("maxNums <= 0", (Throwable) null);
        }
        if (null == pullCallback) {
            throw new MQClientException("pullCallback is null", (Throwable) null);
        }
        subscriptionAutomatically(mq.getTopic());
        try {
            int sysFlag = PullSysFlag.buildSysFlag(false, block, true, false);

            final SubscriptionData subscriptionData;
            try {
                subscriptionData = FilterAPI.buildSubscriptionData(this.defaultMQPullConsumer.getConsumerGroup(), mq.getTopic(), subExpression);
            } catch (Exception e) {
                throw new MQClientException("parse subscription error", e);
            }

            long brokerSuspendMillis = this.defaultMQPullConsumer.getBrokerSuspendMaxTimeMillis();
            long timeoutMillis = block ? this.defaultMQPullConsumer.getConsumerTimeoutMillisWhenSuspend() : timeout;
            this.pullAPIWrapper.pullKernelImpl(mq, subscriptionData.getSubString(), 0L, offset, maxNums, sysFlag, 0L, brokerSuspendMillis, timeoutMillis, CommunicationMode.ASYNC, new PullCallback() {
                @Override // com.aliyun.openservices.shade.com.alibaba.rocketmq.client.consumer.PullCallback
                public void onSuccess(PullResult pullResult) {
                    // Post-process the raw result before handing it to the user callback.
                    PullResult processed = DefaultMQPullConsumerImpl.this.pullAPIWrapper.processPullResult(mq, pullResult, subscriptionData);
                    pullCallback.onSuccess(processed);
                }

                @Override // com.aliyun.openservices.shade.com.alibaba.rocketmq.client.consumer.PullCallback
                public void onException(Throwable th) {
                    pullCallback.onException(th);
                }
            });
        } catch (MQBrokerException e) {
            throw new MQClientException("pullAsync unknow exception", e);
        }
    }

    /**
     * Synchronous pull with the suspend flag set; the effective timeout is the
     * configured {@code consumerTimeoutMillisWhenSuspend}.
     */
    public PullResult pullBlockIfNotFound(MessageQueue messageQueue, String str, long j, int i) throws MQClientException, RemotingException, MQBrokerException, InterruptedException {
        return pullSyncImpl(messageQueue, str, j, i, true, getDefaultMQPullConsumer().getConsumerPullTimeoutMillis());
    }

    /** @return the facade this implementation was created with */
    public DefaultMQPullConsumer getDefaultMQPullConsumer() {
        return this.defaultMQPullConsumer;
    }

    /**
     * Asynchronous pull with the suspend flag set; the effective timeout is the
     * configured {@code consumerTimeoutMillisWhenSuspend}.
     */
    public void pullBlockIfNotFound(MessageQueue messageQueue, String str, long j, int i, PullCallback pullCallback) throws MQClientException, RemotingException, InterruptedException {
        pullAsyncImpl(messageQueue, str, j, i, pullCallback, true, getDefaultMQPullConsumer().getConsumerPullTimeoutMillis());
    }

    /**
     * Delegates to the admin implementation's {@code queryMessage}; arguments are forwarded unchanged.
     *
     * @throws MQClientException if the consumer is not running or the query fails
     */
    public QueryResult queryMessage(String str, String str2, int i, long j, long j2) throws MQClientException, InterruptedException {
        makeSureStateOK();
        return this.mQClientFactory.getMQAdminImpl().queryMessage(str, str2, i, j, j2);
    }

    /**
     * Delegates to the admin implementation's {@code queryMessageByUniqKey}.
     *
     * @throws MQClientException if the consumer is not running or the query fails
     */
    public MessageExt queryMessageByUniqKey(String str, String str2) throws MQClientException, InterruptedException {
        makeSureStateOK();
        return this.mQClientFactory.getMQAdminImpl().queryMessageByUniqKey(str, str2);
    }

    /**
     * Delegates to the admin implementation's {@code searchOffset}.
     *
     * @throws MQClientException if the consumer is not running or the lookup fails
     */
    public long searchOffset(MessageQueue messageQueue, long j) throws MQClientException {
        makeSureStateOK();
        return this.mQClientFactory.getMQAdminImpl().searchOffset(messageQueue, j);
    }

    /**
     * Sends a message back using this consumer's own group.
     *
     * @see #sendMessageBack(MessageExt, int, String, String)
     */
    public void sendMessageBack(MessageExt messageExt, int i, String str) throws RemotingException, MQBrokerException, InterruptedException, MQClientException {
        sendMessageBack(messageExt, i, str, this.defaultMQPullConsumer.getConsumerGroup());
    }

    /**
     * Sends a message back to the broker; if that fails for any reason, the message is
     * re-published to this group's retry topic through the client's default producer.
     *
     * @param messageExt message to send back
     * @param i value forwarded to {@code consumerSendMessageBack} (presumably the delay level — TODO confirm)
     * @param str broker name; when {@code null} the address is derived from the message's store host
     * @param str2 consumer group; a blank value falls back to this consumer's group
     */
    public void sendMessageBack(MessageExt messageExt, int i, String str, String str2) throws RemotingException, MQBrokerException, InterruptedException, MQClientException {
        try {
            String brokerAddr = null != str
                    ? this.mQClientFactory.findBrokerAddressInPublish(str)
                    : RemotingHelper.parseSocketAddressAddr(messageExt.getStoreHost());
            if (UtilAll.isBlank(str2)) {
                str2 = this.defaultMQPullConsumer.getConsumerGroup();
            }
            // NOTE(review): DIAMOND_TIMEOUT is used as the request timeout here; this looks like a
            // decompiler constant substitution — confirm the intended value.
            this.mQClientFactory.getMQClientAPIImpl().consumerSendMessageBack(brokerAddr, messageExt, str2, i, MetaQTraceConstants.DIAMOND_TIMEOUT, this.defaultMQPullConsumer.getMaxReconsumeTimes());
        } catch (Exception e) {
            this.log.error("sendMessageBack Exception, " + this.defaultMQPullConsumer.getConsumerGroup(), e);

            // Fallback: rebuild the message for the retry topic and publish it directly.
            Message retryMessage = new Message(MixAll.getRetryTopic(this.defaultMQPullConsumer.getConsumerGroup()), messageExt.getBody());
            String originMessageId = MessageAccessor.getOriginMessageId(messageExt);
            MessageAccessor.setOriginMessageId(retryMessage, UtilAll.isBlank(originMessageId) ? messageExt.getMsgId() : originMessageId);
            retryMessage.setFlag(messageExt.getFlag());
            MessageAccessor.setProperties(retryMessage, messageExt.getProperties());
            MessageAccessor.putProperty(retryMessage, "RETRY_TOPIC", messageExt.getTopic());
            MessageAccessor.setReconsumeTime(retryMessage, String.valueOf(messageExt.getReconsumeTimes() + 1));
            MessageAccessor.setMaxReconsumeTimes(retryMessage, String.valueOf(this.defaultMQPullConsumer.getMaxReconsumeTimes()));
            retryMessage.setDelayTimeLevel(3 + messageExt.getReconsumeTimes());
            this.mQClientFactory.getDefaultMQProducer().send(retryMessage);
        }
    }

    /**
     * Shuts the consumer down if it is {@code RUNNING}: persists offsets, unregisters the
     * group, shuts down the client factory and marks the state {@code SHUTDOWN_ALREADY}.
     * In any other state this is a no-op.
     */
    public void shutdown() {
        if (this.serviceState != ServiceState.RUNNING) {
            return;
        }
        persistConsumerOffset();
        this.mQClientFactory.unregisterConsumer(this.defaultMQPullConsumer.getConsumerGroup());
        this.mQClientFactory.shutdown();
        this.log.info("the consumer [{}] shutdown OK", this.defaultMQPullConsumer.getConsumerGroup());
        this.serviceState = ServiceState.SHUTDOWN_ALREADY;
    }

    /**
     * Starts the consumer. Only allowed from {@code CREATE_JUST}.
     *
     * <p>The state is set to {@code START_FAILED} up front and only becomes
     * {@code RUNNING} after every step succeeded, so an exception thrown half-way
     * leaves the consumer in {@code START_FAILED}. The one exception is a duplicate
     * group registration, which resets the state to {@code CREATE_JUST}.
     *
     * @throws MQClientException on invalid configuration, duplicate group registration,
     *                           or when called in {@code RUNNING}, {@code SHUTDOWN_ALREADY}
     *                           or {@code START_FAILED}
     */
    public void start() throws MQClientException {
        switch (this.serviceState) {
            case CREATE_JUST:
                this.serviceState = ServiceState.START_FAILED;
                checkConfig();
                copySubscription();
                if (this.defaultMQPullConsumer.getMessageModel() == MessageModel.CLUSTERING) {
                    this.defaultMQPullConsumer.changeInstanceNameToPID();
                }

                this.mQClientFactory = MQClientManager.getInstance().getAndCreateMQClientInstance(this.defaultMQPullConsumer, this.rpcHook);

                // Wire the rebalance component with the facade's configuration.
                this.rebalanceImpl.setConsumerGroup(this.defaultMQPullConsumer.getConsumerGroup());
                this.rebalanceImpl.setMessageModel(this.defaultMQPullConsumer.getMessageModel());
                this.rebalanceImpl.setAllocateMessageQueueStrategy(this.defaultMQPullConsumer.getAllocateMessageQueueStrategy());
                this.rebalanceImpl.setmQClientFactory(this.mQClientFactory);

                this.pullAPIWrapper = new PullAPIWrapper(this.mQClientFactory, this.defaultMQPullConsumer.getConsumerGroup(), isUnitMode());
                this.pullAPIWrapper.registerFilterMessageHook(this.filterMessageHookList);

                // A user-supplied offset store wins; otherwise pick one by message model.
                if (this.defaultMQPullConsumer.getOffsetStore() != null) {
                    this.offsetStore = this.defaultMQPullConsumer.getOffsetStore();
                } else {
                    switch (this.defaultMQPullConsumer.getMessageModel()) {
                        case BROADCASTING:
                            this.offsetStore = new LocalFileOffsetStore(this.mQClientFactory, this.defaultMQPullConsumer.getConsumerGroup());
                            break;
                        case CLUSTERING:
                            this.offsetStore = new RemoteBrokerOffsetStore(this.mQClientFactory, this.defaultMQPullConsumer.getConsumerGroup());
                            break;
                    }
                }
                this.offsetStore.load();

                if (!this.mQClientFactory.registerConsumer(this.defaultMQPullConsumer.getConsumerGroup(), this)) {
                    this.serviceState = ServiceState.CREATE_JUST;
                    throw new MQClientException("The consumer group[" + this.defaultMQPullConsumer.getConsumerGroup() + "] has been created before, specify another name please." + FAQUrl.suggestTodo(FAQUrl.GROUP_NAME_DUPLICATE_URL), (Throwable) null);
                }
                this.mQClientFactory.start();
                this.log.info("the consumer [{}] start OK", this.defaultMQPullConsumer.getConsumerGroup());
                this.serviceState = ServiceState.RUNNING;
                return;
            case RUNNING:
            case SHUTDOWN_ALREADY:
            case START_FAILED:
                throw new MQClientException("The PullConsumer service state not OK, maybe started once, " + this.serviceState + FAQUrl.suggestTodo("http://docs.aliyun.com/cn#/pub/ons/faq/exceptions&service_not_ok"), (Throwable) null);
            default:
                return;
        }
    }

    /**
     * Validates the facade's configuration before start-up.
     *
     * @throws MQClientException if the group is invalid, {@code null} or the reserved
     *                           {@code DEFAULT_CONSUMER}; if the message model or allocation
     *                           strategy is missing; or if {@code consumerTimeoutMillisWhenSuspend}
     *                           is smaller than {@code brokerSuspendMaxTimeMillis}
     */
    private void checkConfig() throws MQClientException {
        Validators.checkGroup(this.defaultMQPullConsumer.getConsumerGroup());
        if (null == this.defaultMQPullConsumer.getConsumerGroup()) {
            throw new MQClientException("consumerGroup is null" + FAQUrl.suggestTodo(FAQUrl.CLIENT_PARAMETER_CHECK_URL), (Throwable) null);
        }
        if (this.defaultMQPullConsumer.getConsumerGroup().equals("DEFAULT_CONSUMER")) {
            throw new MQClientException("consumerGroup can not equal DEFAULT_CONSUMER, please specify another one." + FAQUrl.suggestTodo(FAQUrl.CLIENT_PARAMETER_CHECK_URL), (Throwable) null);
        }
        if (null == this.defaultMQPullConsumer.getMessageModel()) {
            throw new MQClientException("messageModel is null" + FAQUrl.suggestTodo(FAQUrl.CLIENT_PARAMETER_CHECK_URL), (Throwable) null);
        }
        if (null == this.defaultMQPullConsumer.getAllocateMessageQueueStrategy()) {
            throw new MQClientException("allocateMessageQueueStrategy is null" + FAQUrl.suggestTodo(FAQUrl.CLIENT_PARAMETER_CHECK_URL), (Throwable) null);
        }
        if (this.defaultMQPullConsumer.getConsumerTimeoutMillisWhenSuspend() < this.defaultMQPullConsumer.getBrokerSuspendMaxTimeMillis()) {
            throw new MQClientException("Long polling mode, the consumer consumerTimeoutMillisWhenSuspend must greater than brokerSuspendMaxTimeMillis" + FAQUrl.suggestTodo(FAQUrl.CLIENT_PARAMETER_CHECK_URL), (Throwable) null);
        }
    }

    /**
     * Copies the facade's registered topics into the rebalance subscription table as
     * {@code "*"} subscriptions.
     *
     * @throws MQClientException if any subscription cannot be built
     */
    private void copySubscription() throws MQClientException {
        try {
            Set<String> registerTopics = this.defaultMQPullConsumer.getRegisterTopics();
            if (registerTopics == null) {
                return;
            }
            for (String topic : registerTopics) {
                SubscriptionData wildcard = FilterAPI.buildSubscriptionData(this.defaultMQPullConsumer.getConsumerGroup(), topic, "*");
                this.rebalanceImpl.getSubscriptionInner().put(topic, wildcard);
            }
        } catch (Exception e) {
            throw new MQClientException("subscription exception", e);
        }
    }

    /**
     * Updates the offset of a queue in the offset store (with {@code false} as the
     * store's third argument).
     *
     * @throws MQClientException if the consumer is not running
     */
    public void updateConsumeOffset(MessageQueue messageQueue, long j) throws MQClientException {
        makeSureStateOK();
        this.offsetStore.updateOffset(messageQueue, j, false);
    }

    /**
     * Delegates to the admin implementation's {@code viewMessage}.
     *
     * @throws MQClientException if the consumer is not running or the lookup fails
     */
    public MessageExt viewMessage(String str) throws RemotingException, MQBrokerException, InterruptedException, MQClientException {
        makeSureStateOK();
        return this.mQClientFactory.getMQAdminImpl().viewMessage(str);
    }

    /**
     * Adds a filter hook. The hook list is handed to the pull API wrapper in {@link #start()}.
     *
     * @param filterMessageHook hook to append to the hook list
     */
    public void registerFilterMessageHook(FilterMessageHook filterMessageHook) {
        this.filterMessageHookList.add(filterMessageHook);
        this.log.info("register FilterMessageHook Hook, {}", filterMessageHook.hookName());
    }

    /** @return the offset store in use; {@code null} until set or until {@link #start()} ran */
    public OffsetStore getOffsetStore() {
        return this.offsetStore;
    }

    /** Replaces the offset store. */
    public void setOffsetStore(OffsetStore offsetStore) {
        this.offsetStore = offsetStore;
    }

    /** @return the pull API wrapper; {@code null} until set or until {@link #start()} ran */
    public PullAPIWrapper getPullAPIWrapper() {
        return this.pullAPIWrapper;
    }

    /** Replaces the pull API wrapper. */
    public void setPullAPIWrapper(PullAPIWrapper pullAPIWrapper) {
        this.pullAPIWrapper = pullAPIWrapper;
    }

    /** @return the current life-cycle state */
    public ServiceState getServiceState() {
        return this.serviceState;
    }

    /** Overrides the life-cycle state. */
    public void setServiceState(ServiceState serviceState) {
        this.serviceState = serviceState;
    }

    /** @return the wall-clock time (ms) captured when this instance was constructed */
    public long getConsumerStartTimestamp() {
        return this.consumerStartTimestamp;
    }

    /** @return the rebalance component used by this consumer */
    public RebalanceImpl getRebalanceImpl() {
        return this.rebalanceImpl;
    }
}
