/*
 * Decompiled with CFR 0.152.
 */
package com.alipay.sofa.sofamq.client;

import com.alipay.sofa.sofamq.client.ClientLoggerUtil;
import com.alipay.sofa.sofamq.client.LdcSubMode;
import com.alipay.sofa.sofamq.client.MQClientRPCHook;
import com.alipay.sofa.sofamq.client.SofaMQClientAbstract;
import com.alipay.sofa.sofamq.client.function.Predicate;
import com.alipay.sofa.sofamq.client.rebalance.LocalFirstAllocateMessageQueueStrategy;
import com.alipay.sofa.sofamq.client.trace.common.TraceDispatcherType;
import com.alipay.sofa.sofamq.client.trace.dispatch.NameServerAddressSetter;
import com.alipay.sofa.sofamq.client.trace.dispatch.impl.AsyncArrayDispatcher;
import com.alipay.sofa.sofamq.client.trace.hook.ConsumeMessageHookImpl;
import com.alipay.sofa.sofamq.client.trace.hook.DevGroupConsumeMessageHook;
import com.alipay.sofa.sofamq.client.util.DevGroupUtils;
import com.alipay.sofa.sofamq.com.shade.alibaba.fastjson.JSON;
import com.alipay.sofa.sofamq.com.shade.alibaba.fastjson.TypeReference;
import com.alipay.sofa.sofamq.com.shade.alibaba.fastjson.parser.Feature;
import com.alipay.sofa.sofamq.org.shade.apache.commons.lang3.StringUtils;
import com.alipay.sofa.sofamq.org.shade.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
import com.alipay.sofa.sofamq.org.shade.apache.rocketmq.client.consumer.rebalance.AllocateMessageQueueAveragelyByCircle;
import com.alipay.sofa.sofamq.org.shade.apache.rocketmq.client.exception.MQClientException;
import com.alipay.sofa.sofamq.org.shade.apache.rocketmq.client.hook.FilterMessageContext;
import com.alipay.sofa.sofamq.org.shade.apache.rocketmq.client.hook.FilterMessageHook;
import com.alipay.sofa.sofamq.org.shade.apache.rocketmq.client.impl.MQClientAPIImpl;
import com.alipay.sofa.sofamq.org.shade.apache.rocketmq.common.UtilAll;
import com.alipay.sofa.sofamq.org.shade.apache.rocketmq.common.message.MessageExt;
import com.alipay.sofa.sofamq.org.shade.apache.rocketmq.logging.InternalLogger;
import com.alipay.sofa.sofamq.org.shade.apache.rocketmq.remoting.exception.RemotingException;
import io.openmessaging.api.ConsumerBase;
import io.openmessaging.api.MessageSelector;
import io.openmessaging.api.exception.OMSRuntimeException;
import java.util.Collections;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Properties;

public class SofaMQConsumerAbstract
extends SofaMQClientAbstract
implements ConsumerBase {
    /** Client logger shared by this class and its subclasses, obtained from {@link ClientLoggerUtil}. */
    protected static final InternalLogger LOGGER = ClientLoggerUtil.getClientLogger();
    /** Underlying push consumer; created and configured in the constructor, and every operation here delegates to it. */
    protected final DefaultMQPushConsumer defaultMQPushConsumer;
    /** Consumer group id, taken from the "groupId" property in the constructor. */
    protected String group;
    /** Largest accepted "maxCachedMessageSizeInMiB" setting; the constructor clamps larger values down to 2048. */
    private static final int MAX_CACHED_MESSAGE_SIZE_IN_MIB = 2048;
    /** Smallest accepted "maxCachedMessageSizeInMiB" setting; the constructor clamps smaller values up to 16. */
    private static final int MIN_CACHED_MESSAGE_SIZE_IN_MIB = 16;
    /** Largest accepted "maxCachedMessageAmount" setting; the constructor clamps larger values down to 50000. */
    private static final int MAX_CACHED_MESSAGE_AMOUNT = 50000;
    /** Smallest accepted "maxCachedMessageAmount" setting; the constructor clamps smaller values up to 100. */
    private static final int MIN_CACHED_MESSAGE_AMOUNT = 100;
    /** Value passed to {@code setPullThresholdSizeForTopic}; 512 unless overridden by the "maxCachedMessageSizeInMiB" property. */
    private int maxCachedMessageSizeInMiB = 512;
    /** Value passed to {@code setPullThresholdForTopic}; 5000 unless overridden by the "maxCachedMessageAmount" property. */
    private int maxCachedMessageAmount = 5000;
    /** LDC predicate assigned by {@link #initLdcPredicate()}; {@code filter} consults it only when {@code ldc} is set. */
    private Predicate<MessageExt> predicate = null;
    /** Optional predicate installed through {@code registerDevGroupPredicate}; skipped by {@code filter} while {@code null}. */
    private Predicate<MessageExt> devGroupPredicate = null;
    /** Predicate on the "__ROUTER.SITE_MSG" user property; set by the constructor and by {@code switchSiteMessagePredicate}. */
    private Predicate<MessageExt> siteMessagePredicate = null;
    /** LDC subscription mode; stays {@code DEFAULT} unless {@code ldc} is set and the "ldcSubMode" property says otherwise. */
    private LdcSubMode ldcSubMode = LdcSubMode.DEFAULT;

    /**
     * Creates the underlying {@link DefaultMQPushConsumer} and configures it from {@code properties}.
     *
     * <p>Properties read here: "groupId" (required), "maxRetryTimes", "maxBatchMessageCount",
     * "consumeTimeout", "isVipChannelEnabled", "consumeThreadNums", "maxCachedMessageAmount",
     * "maxCachedMessageSizeInMiB", "msgTraceSwitch" and, when {@code ldc} is set, "ldcSubMode".
     *
     * <p>Non-numeric "maxRetryTimes" and "consumeTimeout" values are ignored with a warning so the
     * consumer keeps its defaults. Other numeric properties still fail with
     * {@link NumberFormatException} when they cannot be parsed.
     *
     * @param properties client configuration
     * @throws OMSRuntimeException if the "groupId" property is missing or empty
     * @throws NumberFormatException if "maxBatchMessageCount", "consumeThreadNums",
     *         "maxCachedMessageAmount" or "maxCachedMessageSizeInMiB" is set but not an integer
     * @throws IllegalArgumentException if {@code ldc} is set and "ldcSubMode" names no {@link LdcSubMode} constant
     */
    public SofaMQConsumerAbstract(Properties properties) {
        super(properties);
        String consumerGroup = properties.getProperty("groupId");
        if (StringUtils.isEmpty(consumerGroup)) {
            throw new OMSRuntimeException("ConsumerId property is null");
        }
        this.group = consumerGroup;
        this.defaultMQPushConsumer = new DefaultMQPushConsumer(this.getNamespace(), consumerGroup, new MQClientRPCHook(this.sessionCredentials));
        this.defaultMQPushConsumer.setAllocateMessageQueueStrategy(new LocalFirstAllocateMessageQueueStrategy(new AllocateMessageQueueAveragelyByCircle()));

        String maxReconsumeTimes = properties.getProperty("maxRetryTimes");
        if (!UtilAll.isBlank(maxReconsumeTimes)) {
            try {
                this.defaultMQPushConsumer.setMaxReconsumeTimes(Integer.parseInt(maxReconsumeTimes));
            } catch (NumberFormatException ignored) {
                // Best effort: keep the consumer default, but make the bad setting visible.
                LOGGER.warn("consumer[{}] ignores non-numeric maxRetryTimes[{}]", consumerGroup, maxReconsumeTimes);
            }
        }

        String maxBatchMessageCount = properties.getProperty("maxBatchMessageCount");
        if (!UtilAll.isBlank(maxBatchMessageCount)) {
            this.defaultMQPushConsumer.setPullBatchSize(Integer.parseInt(maxBatchMessageCount));
        }

        String consumeTimeout = properties.getProperty("consumeTimeout");
        if (!UtilAll.isBlank(consumeTimeout)) {
            try {
                this.defaultMQPushConsumer.setConsumeTimeout(Integer.parseInt(consumeTimeout));
            } catch (NumberFormatException ignored) {
                // Best effort: keep the consumer default, but make the bad setting visible.
                LOGGER.warn("consumer[{}] ignores non-numeric consumeTimeout[{}]", consumerGroup, consumeTimeout);
            }
        }

        boolean isVipChannelEnabled = Boolean.parseBoolean(properties.getProperty("isVipChannelEnabled", "false"));
        this.defaultMQPushConsumer.setVipChannelEnabled(isVipChannelEnabled);
        this.defaultMQPushConsumer.setInstanceName(this.buildInstanceName());
        this.defaultMQPushConsumer.setNamesrvAddr(this.getNameServerAddr());

        String consumeThreadNums = properties.getProperty("consumeThreadNums");
        if (!UtilAll.isBlank(consumeThreadNums)) {
            // A single setting drives both bounds, giving a fixed-size consume thread pool.
            int threadCount = Integer.parseInt(consumeThreadNums);
            this.defaultMQPushConsumer.setConsumeThreadMin(threadCount);
            this.defaultMQPushConsumer.setConsumeThreadMax(threadCount);
        }

        String configuredCachedMessageAmount = properties.getProperty("maxCachedMessageAmount");
        if (!UtilAll.isBlank(configuredCachedMessageAmount)) {
            int requestedAmount = Integer.parseInt(configuredCachedMessageAmount);
            // Clamp into [MIN_CACHED_MESSAGE_AMOUNT, MAX_CACHED_MESSAGE_AMOUNT].
            this.maxCachedMessageAmount = Math.max(MIN_CACHED_MESSAGE_AMOUNT, Math.min(MAX_CACHED_MESSAGE_AMOUNT, requestedAmount));
        }
        this.defaultMQPushConsumer.setPullThresholdForTopic(this.maxCachedMessageAmount);

        String configuredCachedMessageSizeInMiB = properties.getProperty("maxCachedMessageSizeInMiB");
        if (!UtilAll.isBlank(configuredCachedMessageSizeInMiB)) {
            int requestedSize = Integer.parseInt(configuredCachedMessageSizeInMiB);
            // Clamp into [MIN_CACHED_MESSAGE_SIZE_IN_MIB, MAX_CACHED_MESSAGE_SIZE_IN_MIB].
            this.maxCachedMessageSizeInMiB = Math.max(MIN_CACHED_MESSAGE_SIZE_IN_MIB, Math.min(MAX_CACHED_MESSAGE_SIZE_IN_MIB, requestedSize));
        }
        this.defaultMQPushConsumer.setPullThresholdSizeForTopic(this.maxCachedMessageSizeInMiB);

        this.defaultMQPushConsumer.registerFilterMessageHook(new FilterMessageHookImpl());
        this.installTraceHooks(properties);

        if (this.ldc) {
            this.ldcSubMode = LdcSubMode.valueOf(properties.getProperty("ldcSubMode", LdcSubMode.DEFAULT.toString()));
            this.initLdcPredicate();
        }
        this.initSiteMessagePredicate();
    }

    /**
     * Registers the message-trace consume hooks unless "msgTraceSwitch" is explicitly set to a
     * non-true value. Any failure during setup is logged and otherwise ignored, so a broken
     * trace configuration never prevents the consumer from being constructed.
     */
    private void installTraceHooks(Properties properties) {
        String msgTraceSwitch = properties.getProperty("msgTraceSwitch");
        if (!UtilAll.isBlank(msgTraceSwitch) && !Boolean.parseBoolean(msgTraceSwitch)) {
            LOGGER.info("MQ Client Disable the Trace Hook!");
            return;
        }
        try {
            Properties tempProperties = new Properties();
            tempProperties.put("ACCESS_KEY", this.sessionCredentials.getAccessKey());
            tempProperties.put("SECRET_KEY", this.sessionCredentials.getSecretKey());
            tempProperties.put("MAX_MSG_SIZE", "128000");
            tempProperties.put("ASYNC_BUFFER_SIZE", "2048");
            tempProperties.put("MAX_BATCH_NUM", "100");
            tempProperties.put("INSTANCE_NAME", "PID_CLIENT_INNER_TRACE_PRODUCER" + (StringUtils.isBlank(this.cell) ? "" : "_" + this.cell));
            tempProperties.put("DispatcherType", TraceDispatcherType.CONSUMER.name());
            AsyncArrayDispatcher dispatcher = new AsyncArrayDispatcher(tempProperties, this.sessionCredentials, new NameServerAddressSetter() {

                @Override
                public String getNewNameServerAddress() {
                    return SofaMQConsumerAbstract.this.getNameServerAddr();
                }
            });
            dispatcher.setHostConsumer(this.defaultMQPushConsumer.getDefaultMQPushConsumerImpl());
            this.traceDispatcher = dispatcher;
            this.defaultMQPushConsumer.getDefaultMQPushConsumerImpl().registerConsumeMessageHook(new ConsumeMessageHookImpl(this.traceDispatcher));
            if (DevGroupUtils.inStableEnvironment()) {
                this.defaultMQPushConsumer.getDefaultMQPushConsumerImpl().registerConsumeMessageHook(new DevGroupConsumeMessageHook());
            }
        } catch (Throwable e) {
            LOGGER.error("system mqtrace hook init failed ,maybe can't send message trace data", e);
        }
    }

    /**
     * Installs the dev-group predicate, which {@code filter} evaluates before any other check.
     *
     * @param predicate test a message must pass to be kept; {@code null} makes {@code filter} skip this check
     */
    protected void registerDevGroupPredicate(Predicate<MessageExt> predicate) {
        this.devGroupPredicate = predicate;
    }

    /**
     * Installs the default site-message predicate: a message is accepted unless its
     * "__ROUTER.SITE_MSG" user property equals "true" (case-insensitive).
     */
    private void initSiteMessagePredicate() {
        this.siteMessagePredicate = new Predicate<MessageExt>() {

            @Override
            public boolean test(MessageExt messageExt) {
                String mark = messageExt.getUserProperty("__ROUTER.SITE_MSG");
                if (StringUtils.isBlank(mark)) {
                    // No marker at all: not a site message, so keep it.
                    return true;
                }
                return !StringUtils.equalsIgnoreCase(mark, "true");
            }
        };
    }

    /**
     * Switches the site-message predicate between its two modes.
     *
     * @param enableSiteMessage {@code true} to accept only messages whose "__ROUTER.SITE_MSG"
     *        user property equals "true" (case-insensitive); {@code false} to restore the
     *        default predicate, which rejects exactly those messages
     */
    protected void switchSiteMessagePredicate(boolean enableSiteMessage) {
        if (enableSiteMessage) {
            this.siteMessagePredicate = new Predicate<MessageExt>() {

                @Override
                public boolean test(MessageExt messageExt) {
                    String mark = messageExt.getUserProperty("__ROUTER.SITE_MSG");
                    if (StringUtils.isBlank(mark)) {
                        // No marker: not a site message, so drop it in this mode.
                        return false;
                    }
                    return StringUtils.equalsIgnoreCase(mark, "true");
                }
            };
        } else {
            this.initSiteMessagePredicate();
        }
    }

    /**
     * Selects the LDC predicate matching the current {@code ldcSubMode}.
     *
     * <p>{@code DEFAULT} accepts every message. Each other mode accepts a message when one
     * mode-specific message property is absent or equals this client's {@code cell}:
     * {@code LOCAL} uses "__UNIT.ORICELL", {@code GZONE} "__UNIT.TARGET_G",
     * {@code RZONE} "__UNIT.TARGET_R" and {@code CZONE} "__UNIT.TARGET_C".
     * For any other mode the predicate is left unchanged.
     */
    protected void initLdcPredicate() {
        if (this.ldcSubMode == LdcSubMode.DEFAULT) {
            this.predicate = new Predicate<MessageExt>() {

                @Override
                public boolean test(MessageExt messageExt) {
                    return true;
                }
            };
        } else if (this.ldcSubMode == LdcSubMode.LOCAL) {
            this.predicate = this.newCellMatchPredicate("__UNIT.ORICELL");
        } else if (this.ldcSubMode == LdcSubMode.GZONE) {
            this.predicate = this.newCellMatchPredicate("__UNIT.TARGET_G");
        } else if (this.ldcSubMode == LdcSubMode.RZONE) {
            this.predicate = this.newCellMatchPredicate("__UNIT.TARGET_R");
        } else if (this.ldcSubMode == LdcSubMode.CZONE) {
            this.predicate = this.newCellMatchPredicate("__UNIT.TARGET_C");
        }
    }

    /**
     * Builds a predicate that accepts a message when {@code propertyKey} is absent from it or
     * equals this client's {@code cell}; the cell is read each time the predicate is evaluated.
     */
    private Predicate<MessageExt> newCellMatchPredicate(final String propertyKey) {
        return new Predicate<MessageExt>() {

            @Override
            public boolean test(MessageExt messageExt) {
                String markedCell = messageExt.getProperty(propertyKey);
                return markedCell == null || SofaMQConsumerAbstract.this.cell.equals(markedCell);
            }
        };
    }

    /**
     * Applies a new name server address list to the push consumer's configuration and to the
     * client API instance reached through its client factory.
     *
     * @param newAddrs name server addresses, passed through unchanged to both targets
     */
    @Override
    protected void updateNameServerAddr(String newAddrs) {
        this.defaultMQPushConsumer.setNamesrvAddr(newAddrs);
        MQClientAPIImpl clientApi = this.defaultMQPushConsumer
                .getDefaultMQPushConsumerImpl()
                .getmQClientFactory()
                .getMQClientAPIImpl();
        clientApi.updateNameServerAddressList(newAddrs);
    }

    /**
     * Logs the request and subscribes the underlying push consumer to {@code topic} with a
     * sub-expression filter.
     *
     * @param topic topic to subscribe to
     * @param subExpression filter expression handed unchanged to the push consumer
     * @throws OMSRuntimeException wrapping any {@link MQClientException} raised by the push consumer
     */
    protected void subscribe(String topic, String subExpression) {
        LOGGER.info("consumer[{}] subscribe (topic[{}],subExpression[{}])", this.group, topic, subExpression);
        try {
            this.defaultMQPushConsumer.subscribe(topic, subExpression);
        }
        catch (MQClientException e) {
            // Surface the checked client exception as the unchecked OMS exception, keeping the cause.
            throw new OMSRuntimeException("defaultMQPushConsumer subscribe exception", (Throwable)e);
        }
    }

    /**
     * Logs the request and subscribes the underlying push consumer to {@code topic} using a
     * message selector, converted via {@code toMessageSelector}.
     *
     * @param topic topic to subscribe to
     * @param selector OMS message selector; it is serialized to JSON for the log line
     * @throws OMSRuntimeException wrapping any {@link MQClientException} raised by the push consumer
     */
    protected void subscribe(String topic, MessageSelector selector) {
        // NOTE(review): the selector is serialized eagerly, even if info logging is disabled.
        LOGGER.info("consumer[{}] subscribe (topic[{}],selector[{}])", this.group, topic, JSON.toJSONString(selector));
        try {
            this.defaultMQPushConsumer.subscribe(topic, this.toMessageSelector(selector));
        }
        catch (MQClientException e) {
            // Surface the checked client exception as the unchecked OMS exception, keeping the cause.
            throw new OMSRuntimeException("Consumer subscribe exception", (Throwable)e);
        }
    }

    /**
     * Logs the request and removes the subscription for {@code topic} from the underlying push consumer.
     *
     * @param topic topic to unsubscribe from
     */
    @Override
    public void unsubscribe(String topic) {
        LOGGER.info("consumer[{}] unsubscribe (topic[{}])", (Object)this.group, (Object)topic);
        this.defaultMQPushConsumer.unsubscribe(topic);
    }

    /**
     * Starts the consumer at most once per started/shutdown cycle.
     *
     * <p>The {@code started} flag is flipped first. If {@link #needStart()} then says this
     * client should not consume, the start is skipped and only logged. Otherwise the push
     * consumer is started, followed by {@code super.start()}.
     *
     * @throws OMSRuntimeException wrapping any exception raised while starting
     */
    @Override
    public void start() {
        try {
            if (!this.started.compareAndSet(false, true)) {
                // Already started (or being started) by an earlier call.
                return;
            }
            if (!this.needStart()) {
                LOGGER.info("Skip start consumer[{}] cell[{}] ldcSubMode[{}]", new Object[]{this.group, this.cell, this.ldcSubMode});
                return;
            }
            this.defaultMQPushConsumer.start();
            super.start();
            LOGGER.info("start consumer[{}] with [props:{}]", (Object)this.group, (Object)this.properties);
        } catch (Exception failure) {
            // NOTE(review): 'started' stays true after a failed start, so a later start() call
            // becomes a silent no-op - confirm whether that is intended.
            String errorMsg = String.format("start consumer[%s] with [props:%s] fail, errorMsg=%s", this.group, this.properties, failure.getMessage());
            LOGGER.error(errorMsg, failure);
            throw new OMSRuntimeException(errorMsg, (Throwable)failure);
        }
    }

    /**
     * Shuts down the push consumer if this instance is currently marked as started, then always
     * delegates to {@code super.shutdown()}.
     */
    @Override
    public void shutdown() {
        // Only the call that flips started from true to false shuts the push consumer down.
        if (this.started.compareAndSet(true, false)) {
            this.defaultMQPushConsumer.shutdown();
            LOGGER.info("shutdown consumer[{}] with [props:{}]", (Object)this.group, (Object)this.properties);
        }
        // Runs regardless of the started flag.
        super.shutdown();
    }

    /**
     * Decides whether the push consumer should actually be started on this client.
     *
     * <p>Without LDC, or in {@code DEFAULT}/{@code LOCAL} mode, the answer is always yes. In
     * {@code RZONE}, {@code GZONE} and {@code CZONE} modes the client's {@code cell} must start
     * with "RZ", "GZ" or "CZ" respectively.
     *
     * @return {@code true} if the consumer should start
     * @throws OMSRuntimeException if {@code ldcSubMode} is not one of the handled modes
     */
    protected boolean needStart() {
        if (!this.ldc) {
            return true;
        }
        String requiredCellPrefix;
        switch (this.ldcSubMode) {
            case DEFAULT:
            case LOCAL:
                return true;
            case RZONE:
                requiredCellPrefix = "RZ";
                break;
            case GZONE:
                requiredCellPrefix = "GZ";
                break;
            case CZONE:
                requiredCellPrefix = "CZ";
                break;
            default:
                throw new OMSRuntimeException("Unknown ldcSubMode " + this.ldcSubMode);
        }
        return this.cell.startsWith(requiredCellPrefix);
    }

    /**
     * Decides whether a pulled message should be kept for consumption.
     *
     * <p>Checks run in order and stop at the first rejection: the dev-group predicate (if
     * registered), the site-message predicate (if set), and finally the LDC predicate, which is
     * consulted only when {@code ldc} is set.
     *
     * @param message message to evaluate
     * @return {@code true} to keep the message, {@code false} to drop it
     */
    protected boolean filter(MessageExt message) {
        if (this.devGroupPredicate != null && !this.devGroupPredicate.test(message)) {
            return false;
        }
        if (this.siteMessagePredicate != null && !this.siteMessagePredicate.test(message)) {
            return false;
        }
        if (!this.ldc) {
            return true;
        }
        return this.predicate.test(message);
    }

    /**
     * Validates the mandatory arguments of a subscribe call; the topic is checked first.
     *
     * @param topic topic being subscribed to
     * @param listener listener that will receive the messages
     * @throws OMSRuntimeException if {@code topic} or {@code listener} is {@code null}
     */
    protected void checkSubscribeParams(String topic, Object listener) {
        if (topic == null) {
            throw new OMSRuntimeException("topic is null");
        }
        if (listener == null) {
            throw new OMSRuntimeException("listener is null");
        }
    }

    /**
     * Verifies through the schema validator that {@code clazz} can read messages of {@code topic}.
     *
     * @param topic topic whose registered schema is checked
     * @param clazz class the subscriber wants to read messages as
     * @throws OMSRuntimeException if the validator reports the class as not compatible
     */
    protected void checkSubscribeCompatibility(String topic, Class clazz) {
        boolean compatible = this.schemaValidator.validateReadSchema(topic, clazz);
        if (compatible) {
            return;
        }
        String detail = String.format("%s is not compatible with schema registered of topic: %s", clazz.getName(), topic);
        throw new OMSRuntimeException(detail);
    }

    /**
     * Reads the KV config stored under namespace "SOFAMQ_LDC_ELASTIC" for this client's
     * {@code cell}, using a 3000 ms timeout.
     *
     * @return {@code null} when the stored value is blank or the lookup fails with response
     *         code 22; an empty map when the value is the literal "NOOP"; otherwise the value
     *         parsed as a JSON object of string pairs
     * @throws MQClientException for lookup failures other than response code 22
     * @throws RemotingException declared by the underlying lookup
     * @throws InterruptedException declared by the underlying lookup
     */
    protected Map<String, String> getElasticCells() throws RemotingException, MQClientException, InterruptedException {
        MQClientAPIImpl clientApi = this.defaultMQPushConsumer.getDefaultMQPushConsumerImpl().getmQClientFactory().getMQClientAPIImpl();
        try {
            String rawValue = clientApi.getKVConfigValue("SOFAMQ_LDC_ELASTIC", this.cell, 3000L);
            if (StringUtils.isBlank(rawValue)) {
                return null;
            }
            if ("NOOP".equals(rawValue)) {
                return Collections.<String, String>emptyMap();
            }
            return JSON.parseObject(rawValue, new TypeReference<Map<String, String>>(){}, new Feature[0]);
        } catch (MQClientException ex) {
            // NOTE(review): 22 is presumably the "not found" response code - confirm against ResponseCode.
            if (ex.getResponseCode() != 22) {
                throw ex;
            }
            return null;
        }
    }

    /**
     * Reads the site destination zones stored in the KV config for this client's {@code cell},
     * using a 3000 ms timeout.
     *
     * @param needConsumeFromProd {@code true} to read namespace
     *        "SOFAMQ_SITE_DEST_ZONES_INCLUDE_PROD", {@code false} to read
     *        "SOFAMQ_SITE_DEST_ZONES_CURRENT_ENV"
     * @return the stored value parsed as a JSON object of string pairs, or {@code null} when the
     *         lookup fails with response code 22
     * @throws MQClientException for lookup failures other than response code 22; any other
     *         exception is wrapped in one. If the calling thread was interrupted, its interrupt
     *         flag is restored before the wrapper is thrown.
     */
    protected Map<String, String> getSiteDestZones(boolean needConsumeFromProd) throws MQClientException {
        String namespace = needConsumeFromProd ? "SOFAMQ_SITE_DEST_ZONES_INCLUDE_PROD" : "SOFAMQ_SITE_DEST_ZONES_CURRENT_ENV";
        try {
            MQClientAPIImpl mqClientAPI = this.defaultMQPushConsumer.getDefaultMQPushConsumerImpl().getmQClientFactory().getMQClientAPIImpl();
            String siteDestZonesStr = mqClientAPI.getKVConfigValue(namespace, this.cell, 3000L);
            return JSON.parseObject(siteDestZonesStr, new TypeReference<Map<String, String>>(){}, new Feature[0]);
        } catch (MQClientException ex) {
            // NOTE(review): 22 is presumably the "not found" response code - confirm against ResponseCode.
            if (ex.getResponseCode() == 22) {
                return null;
            }
            throw ex;
        } catch (Exception ex) {
            if (ex instanceof InterruptedException) {
                // Wrapping would otherwise hide the interruption from callers further up the stack.
                Thread.currentThread().interrupt();
            }
            throw new MQClientException("Get site destination zones catch exception", ex);
        }
    }

    /**
     * Filter hook registered on the push consumer; it removes from each batch every message
     * rejected by the enclosing consumer's {@code filter} method.
     */
    class FilterMessageHookImpl
    implements FilterMessageHook {
        FilterMessageHookImpl() {
        }

        /** @return the fixed hook name "SOFAMQ_FILTER" */
        @Override
        public String hookName() {
            return "SOFAMQ_FILTER";
        }

        /**
         * Removes rejected messages from the context's message list in place.
         *
         * @param context filter context whose message list is pruned; a {@code null} or empty
         *        list is left untouched
         */
        @Override
        public void filterMessage(FilterMessageContext context) {
            List<MessageExt> candidates = context.getMsgList();
            if (candidates == null || candidates.isEmpty()) {
                return;
            }
            for (Iterator<MessageExt> cursor = candidates.iterator(); cursor.hasNext(); ) {
                MessageExt candidate = cursor.next();
                if (!SofaMQConsumerAbstract.this.filter(candidate)) {
                    // Removing through the iterator keeps the traversal valid.
                    cursor.remove();
                }
            }
        }
    }
}

