package com.ohaotian.base.mq;

import com.ohaotian.base.cache.CacheService;
import com.ohaotian.base.mq.bo.MessageInfoBO;
import com.ohaotian.base.mq.bo.MqConstants;
import com.ohaotian.base.mq.exception.ResourceException;
import com.ohaotian.base.mq.interfce.MqCunsumer;
import com.ohaotian.base.util.SerializeUtils;
import java.util.List;
import java.util.Map;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:WEB-INF/lib/plugin-mq-1.1-20180204.040337-1.jar:com/ohaotian/base/mq/NativeMqListener.class */
public class NativeMqListener extends Thread {
    private static final Logger log = LoggerFactory.getLogger(NativeMqListener.class);
    public static Map<String, List<MqCunsumer>> nativeTopics = null;
    public CacheService cacheService;

    public NativeMqListener(CacheService cacheService) {
        this.cacheService = cacheService;
    }

    @Override // java.lang.Thread, java.lang.Runnable
    public void run() {
        while (true) {
            try {
                doBusi();
            } catch (Exception e) {
                log.error("本地消费者模式下出现异常了！请检查错误！", (Throwable) e);
            }
            try {
                Thread.sleep(600L);
            } catch (InterruptedException e2) {
                log.error("线程sleep异常！", (Throwable) e2);
            }
        }
    }

    private void doBusi() {
        for (String str : this.cacheService.getkeys("LOCALMQ_*")) {
            MessageInfoBO messageInfoBO = (MessageInfoBO) this.cacheService.get(str, MessageInfoBO.class);
            if (messageInfoBO != null && messageInfoBO.getConsumerStatus() == null) {
                messageInfoBO.setConsumerStatus(MqConstants.CONSUMER_STATUS_UPDATE);
                this.cacheService.put(str, messageInfoBO);
                String topic = messageInfoBO.getTopic();
                if (nativeTopics == null) {
                    nativeTopics = MqCunsumerSubscribe.nativeTopics;
                }
                List<MqCunsumer> list = nativeTopics.get(topic);
                if (list == null) {
                    messageInfoBO.setConsumerStatus(null);
                    this.cacheService.put(str, messageInfoBO);
                } else {
                    try {
                        try {
                            try {
                                for (MqCunsumer mqCunsumer : list) {
                                    if (messageInfoBO.getTag().equals(mqCunsumer.subscribe().getTag())) {
                                        mqCunsumer.execute(messageInfoBO.getTag(), SerializeUtils.deserialize(messageInfoBO.getBody()));
                                    }
                                }
                                this.cacheService.delete(str);
                            } catch (ResourceException e) {
                                log.error("业务执行失败！消息消费失败！请检查错误", (Throwable) e);
                                this.cacheService.delete(str);
                            }
                        } catch (Exception e2) {
                            log.error("系统异常!消息消费失败！请检查错误", (Throwable) e2);
                            this.cacheService.delete(str);
                        }
                    } catch (Throwable th) {
                        this.cacheService.delete(str);
                        throw th;
                    }
                }
            }
        }
    }
}
