package com.ohaotian.plugin.mq.proxy.ext.tonglkq;

import com.ohaotian.plugin.mq.proxy.MqRecordLog;
import com.ohaotian.plugin.mq.proxy.ProxyMessage;
import com.ohaotian.plugin.mq.proxy.impl.IProxyMessageConsumerWrapper;
import com.ohaotian.plugin.mq.proxy.status.ProxyConsumerStatus;
import java.util.Iterator;
import java.util.Set;
import javax.jms.JMSException;
import javax.jms.QueueReceiver;
import javax.jms.TextMessage;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:com/ohaotian/plugin/mq/proxy/ext/tonglkq/TongLKQMqMessageListener.class */
public class TongLKQMqMessageListener implements TLQMessageListener, Runnable {
    private static final Logger log = LoggerFactory.getLogger(TongLKQMqMessageListener.class);
    private final Set<IProxyMessageConsumerWrapper> localConsumers;
    private MqRecordLog mqRecordLog;
    private final QueueReceiver queueReceiver;
    private final String subject;
    private volatile boolean running = true;

    public TongLKQMqMessageListener(String str, Set<IProxyMessageConsumerWrapper> set, MqRecordLog mqRecordLog, QueueReceiver queueReceiver) {
        this.mqRecordLog = mqRecordLog;
        this.localConsumers = set;
        this.queueReceiver = queueReceiver;
        this.subject = str;
    }

    @Override // com.ohaotian.plugin.mq.proxy.ext.tonglkq.TLQMessageListener
    public ProxyConsumerStatus onMessage(QueueReceiver queueReceiver, Set<IProxyMessageConsumerWrapper> set) {
        ProxyMessage proxyMessage = new ProxyMessage();
        try {
            TextMessage receive = queueReceiver.receive(6000L);
            if (receive != null && (receive instanceof TextMessage)) {
                proxyMessage = new ProxyMessage(this.subject, "*", receive.getText());
                Iterator<IProxyMessageConsumerWrapper> it = set.iterator();
                while (it.hasNext()) {
                    it.next().onMessage(proxyMessage);
                }
                proxyMessage.setStatus(2);
                this.mqRecordLog.thread(proxyMessage, this.mqRecordLog);
            }
            return null;
        } catch (JMSException e) {
            proxyMessage.setStatus(3);
            this.mqRecordLog.thread(proxyMessage, this.mqRecordLog);
            e.printStackTrace();
            this.running = Boolean.FALSE.booleanValue();
            return null;
        }
    }

    @Override // java.lang.Runnable
    public void run() {
        while (this.running) {
            try {
                onMessage(this.queueReceiver, this.localConsumers);
            } catch (Exception e) {
                log.error("Unexpected error in message listener", e);
                this.running = Boolean.FALSE.booleanValue();
            }
        }
    }

    public String toString() {
        return "TongLKQMqMessageListener";
    }
}
