/*
 * Decompiled with CFR 0.152.
 */
package com.ohaotian.base.mq;

import com.aliyun.openservices.ons.api.Action;
import com.aliyun.openservices.ons.api.ConsumeContext;
import com.aliyun.openservices.ons.api.Message;
import com.aliyun.openservices.ons.api.MessageListener;
import com.ohaotian.base.cache.CacheService;
import com.ohaotian.base.mq.bo.MessageInfoBO;
import com.ohaotian.base.mq.bo.MqConstants;
import com.ohaotian.base.mq.bo.MqSubScribeSingleBO;
import com.ohaotian.base.mq.interfce.MqCunsumer;
import com.ohaotian.base.mq.util.LogUtils;
import com.ohaotian.base.util.SerializeUtils;
import java.util.Date;
import java.util.List;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * ONS {@link MessageListener} that deserializes the body of each incoming message and hands
 * the result to every registered {@link MqCunsumer} whose subscription tag is either
 * {@code "*"} or equal to the tag of the message.
 */
public class MqMessageListener implements MessageListener {
    private static final Logger log = LoggerFactory.getLogger(MqMessageListener.class);

    /** Subscription tag that accepts every message regardless of its own tag. */
    private static final String ANY_TAG = "*";

    /** Per-thread trace id holder; this class only declares it and never reads or writes it. */
    public static final ThreadLocal<String> traceIds = new ThreadLocal<String>();

    /** Stored by the constructor/setter; not read anywhere inside this class. */
    private CacheService cacheService;

    /** Stored by the constructor/{@link #setTag(String)}; not read anywhere inside this class. */
    private String subExpression;

    /** Consumers that are offered each incoming message, in list order. */
    private List<MqCunsumer> mqcList;

    /**
     * Creates a listener that dispatches to the given consumers.
     *
     * @param cacheService  cache service kept on the instance
     * @param mqcList       consumers to dispatch messages to
     * @param subExpression subscription expression kept on the instance
     */
    public MqMessageListener(CacheService cacheService, List<MqCunsumer> mqcList, String subExpression) {
        this.cacheService = cacheService;
        this.mqcList = mqcList;
        this.subExpression = subExpression;
    }

    /**
     * Replaces the cache service held by this listener.
     *
     * @param cacheService the new cache service
     */
    public void setCacheService(CacheService cacheService) {
        this.cacheService = cacheService;
    }

    /**
     * Replaces the subscription expression held by this listener.
     *
     * @param subExpression the new subscription expression
     */
    public void setTag(String subExpression) {
        this.subExpression = subExpression;
    }

    /**
     * Deserializes the message body and executes every consumer whose subscription tag matches.
     *
     * <p>Any exception raised while deserializing or executing a consumer is logged and turned
     * into {@link Action#ReconsumeLater}; consumers later in the list are then not executed for
     * this delivery.
     *
     * @param msg     the delivered message
     * @param context the consume context supplied by the client (unused here)
     * @return {@link Action#CommitMessage} when at least one consumer ran and none failed,
     *         {@link Action#ReconsumeLater} when an exception occurred, or {@code null} when no
     *         consumer matched the message tag
     */
    public Action consume(Message msg, ConsumeContext context) {
        String msgId = msg.getMsgID();
        String topic = msg.getTopic();
        String tag = msg.getTag();
        // Query the logger on every call so that log-level changes made after this listener was
        // constructed are honoured.
        if (log.isDebugEnabled()) {
            log.info("msgid:[" + msgId + "]topic[" + topic + "]tag[" + tag + "]key[" + msg.getKey() + "]");
        }
        byte[] body = msg.getBody();
        Action ret = null;
        // Log context returned by the most recent start-log call; stays null until a consumer matches.
        String[] infos = null;
        try {
            // NOTE(review): if SerializeUtils uses native Java deserialization, message bodies
            // must come from trusted producers only -- confirm against SerializeUtils.
            Object msginfo = SerializeUtils.deserialize(body);
            for (MqCunsumer mqc : this.mqcList) {
                MqSubScribeSingleBO bo = mqc.subscribe();
                if (!tagMatches(bo.getTag(), tag)) {
                    continue;
                }
                infos = LogUtils.printMQConsumerStartLog(msg, mqc, msginfo);
                mqc.execute(tag, msginfo);
                ret = Action.CommitMessage;
            }
            LogUtils.printMQConsumerEndLog(infos);
        }
        catch (Exception e) {
            log.error("\u6d88\u8d39\u5931\u8d25\uff01msgId =" + msgId + "\u6d88\u606f\u7684\u4e3b\u9898\u662f" + topic + "\u6d88\u606f\u7684tag=" + tag, e);
            ret = Action.ReconsumeLater;
            LogUtils.printMQConsumerErrorLog(infos, e);
        }
        return ret;
    }

    /**
     * Tells whether a consumer subscribed with {@code subscribedTag} should receive a message
     * carrying {@code messageTag}.
     *
     * <p>A message without a tag ({@code null}) is matched only by the wildcard subscription;
     * the comparison itself never throws.
     */
    private static boolean tagMatches(String subscribedTag, String messageTag) {
        if (ANY_TAG.equals(subscribedTag)) {
            return true;
        }
        return messageTag != null && messageTag.equals(subscribedTag);
    }

    /**
     * Stamps the record with the current time and the given consumer status.
     * Not called from within this class.
     */
    private void updateMessageInfoBO(MessageInfoBO msgInfo, Integer consumerStatus) {
        msgInfo.setConsumerTime(new Date());
        msgInfo.setConsumerStatus(consumerStatus);
    }

    /**
     * Stamps the record with the current time and {@code MqConstants.CONSUMER_STATUS_UPDATE}.
     * Not called from within this class.
     */
    private void insertMessageInfoBO(MessageInfoBO msgInfo) {
        msgInfo.setConsumerTime(new Date());
        msgInfo.setConsumerStatus(MqConstants.CONSUMER_STATUS_UPDATE);
    }

    /**
     * Returns the consumer list held by this listener (the same instance, not a copy).
     *
     * @return the consumers messages are dispatched to
     */
    public List<MqCunsumer> getMqcList() {
        return this.mqcList;
    }

    /**
     * Replaces the consumer list used for dispatching.
     *
     * @param mqcList the new consumers
     */
    public void setMqcList(List<MqCunsumer> mqcList) {
        this.mqcList = mqcList;
    }
}

