package com.ohaotian.plugin.mq.proxy.ext.tonglkq;

import com.ohaotian.plugin.mq.proxy.MqRecordLog;
import com.ohaotian.plugin.mq.proxy.ProxyMessageType;
import com.ohaotian.plugin.mq.proxy.config.ApolloConfigVO;
import com.ohaotian.plugin.mq.proxy.constants.Strategy;
import com.ohaotian.plugin.mq.proxy.impl.ConsumerRegisterInfo;
import com.ohaotian.plugin.mq.proxy.impl.MQRegister;
import com.tongtech.tmqi.QueueConnectionFactory;
import java.util.UUID;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import javax.jms.JMSException;
import javax.jms.Queue;
import javax.jms.QueueConnection;
import javax.jms.QueueReceiver;
import javax.jms.QueueSession;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.BeansException;
import org.springframework.context.ApplicationContext;
import org.springframework.context.ApplicationContextAware;
import org.springframework.core.env.Environment;

/* loaded from: input_file:com/ohaotian/plugin/mq/proxy/ext/tonglkq/TongLKQMessageConsumerRegister.class */
/**
 * {@link MQRegister} implementation that consumes one TongLINK/Q queue over JMS.
 *
 * <p>Lifecycle: {@link #register} records the consumer description and opens the JMS
 * connection, {@link #startup} launches a background loop that runs a
 * {@code TongLKQMqMessageListener} and re-opens the connection whenever the listener
 * returns, and {@link #shutdown} stops that loop and closes the JMS resources.
 *
 * <p>The JMS handles are written by the consumer thread (on reconnect) and read by the
 * thread calling {@link #shutdown}, hence they are {@code volatile}.
 */
public class TongLKQMessageConsumerRegister implements MQRegister, ApplicationContextAware {
    private static final Logger log = LoggerFactory.getLogger(TongLKQMessageConsumerRegister.class);
    private static final String MQ_ENDPOINT_PROPERTY = "mq.endpoint";
    private static final String CONSUMER_THREAD_NAME_PREFIX = "tlq-consumer-";
    /** Pause between two reconnect attempts. */
    private static final long RECONNECT_DELAY_MILLIS = 60000L;
    /** How long {@link #shutdown} waits for the consumer thread before interrupting it. */
    private static final long SHUTDOWN_TIMEOUT_SECONDS = 5L;

    private MqRecordLog mqRecordLog;
    private Environment environment;
    private volatile QueueConnection queueConnection;
    private volatile QueueSession queueSession;
    private volatile Queue queue;
    private volatile QueueReceiver queueReceiver;
    private volatile String subject;
    private volatile ConsumerRegisterInfo consumerInfo;
    private final AtomicBoolean started = new AtomicBoolean(false);
    private final ExecutorService listenerExecutor = Executors.newCachedThreadPool(
            task -> new Thread(task, CONSUMER_THREAD_NAME_PREFIX + UUID.randomUUID()));

    /**
     * Stores the consumer description, validates it and opens the JMS connection.
     *
     * <p>A {@link JMSException} while connecting is logged and not propagated; the
     * reconnect logic started by {@link #startup} will retry later.
     *
     * @param consumerRegisterInfo subject, message types and consumer wrappers to serve
     * @param apolloConfigVO       not used by this implementation
     * @throws IllegalStateException         if the application context has not been injected
     *                                       or the {@code mq.endpoint} property is missing
     * @throws UnsupportedOperationException if a declared message type is not supported
     */
    @Override // com.ohaotian.plugin.mq.proxy.impl.MQRegister
    public void register(ConsumerRegisterInfo consumerRegisterInfo, ApolloConfigVO apolloConfigVO) {
        this.consumerInfo = consumerRegisterInfo;
        validateDependenciesInitialized();
        this.subject = consumerRegisterInfo.getSubject();
        validateMessageTypes(consumerRegisterInfo);
        try {
            initializeMqConnection();
            log.info("Registered TLQ consumer for subject: {}", this.subject);
        } catch (JMSException e) {
            // The logger already records the stack trace; no printStackTrace() needed.
            log.error("Failed to initialize TLQ connection", e);
        }
    }

    /**
     * @param str strategy name to test
     * @return whether this register handles the given strategy
     */
    @Override // com.ohaotian.plugin.mq.proxy.impl.MQRegister
    public boolean support(String str) {
        return strategySupported(str);
    }

    /**
     * @param str strategy name to test
     * @return the result of {@link Strategy#isTongLKQ(String)} for {@code str}
     */
    public static boolean strategySupported(String str) {
        return Strategy.isTongLKQ(str);
    }

    /**
     * Starts the background consume/reconnect loop. Calling it while already started is a no-op.
     *
     * @throws RuntimeException if the loop cannot be submitted to the executor
     *                          (for example after {@link #shutdown})
     */
    @Override // com.ohaotian.plugin.mq.proxy.impl.MQRegister
    public void startup() {
        if (!this.started.compareAndSet(false, true)) {
            return;
        }
        try {
            CompletableFuture.runAsync(this::consumeLoop, this.listenerExecutor)
                    .whenComplete((ignored, failure) -> {
                        if (failure != null) {
                            // Keep the stack trace: a dead consumer loop must be diagnosable.
                            log.error("TLQ consumer subject: {} exceptionally", this.subject, failure);
                        }
                        log.info("TLQ consumer subject: {} Release!", this.subject);
                    });
            log.info("TLQ consumer started successfully for subject: {}", this.subject);
        } catch (RuntimeException e) {
            this.started.set(false);
            throw new RuntimeException("TLQ connection initialization failed", e);
        }
    }

    /**
     * Body of the consumer thread: run the listener, and when it returns, reconnect and
     * run it again, until {@link #shutdown} clears the {@code started} flag.
     */
    private void consumeLoop() {
        while (this.started.get()) {
            log.info("TLQ consumer started for subject: {}", this.subject);
            // NOTE(review): run() is presumed to block until the receiver fails — confirm
            // against TongLKQMqMessageListener.
            new TongLKQMqMessageListener(this.subject, this.consumerInfo.getConsumerWrappers(),
                    this.mqRecordLog, this.queueReceiver).run();
            if (!this.started.get()) {
                // Listener ended because of shutdown(); do not reconnect.
                return;
            }
            log.warn("TLQ consumer exceptionally for subject: {} wait 60s", this.subject);
            if (!reconnectUntilSuccessful()) {
                return;
            }
        }
    }

    /**
     * Retries opening the JMS connection every {@link #RECONNECT_DELAY_MILLIS} until it
     * succeeds, the register is shut down, or the thread is interrupted.
     *
     * @return {@code true} if a fresh connection is ready, {@code false} if the loop must stop
     */
    private boolean reconnectUntilSuccessful() {
        int attempt = 0;
        while (this.started.get()) {
            attempt++;
            try {
                TimeUnit.MILLISECONDS.sleep(RECONNECT_DELAY_MILLIS);
                if (!this.started.get()) {
                    return false;
                }
                // Release the previous (broken) handles before replacing them.
                closeMqResources();
                initializeMqConnection();
                if (!this.started.get()) {
                    // shutdown() ran while we were connecting; it may have missed the new handles.
                    closeMqResources();
                    return false;
                }
                log.info("Re-Registered TLQ consumer for subject: {} count: {}", this.subject, attempt);
                return true;
            } catch (JMSException e) {
                log.error("Failed to initialize TLQ connection. count: {}", attempt, e);
            } catch (InterruptedException e) {
                // Restore the flag so the executor sees the interruption, then stop quietly.
                Thread.currentThread().interrupt();
                log.info("TLQ consumer reconnect interrupted for subject: {}", this.subject);
                return false;
            }
        }
        return false;
    }

    /**
     * Opens connection, session, queue and receiver for {@link #subject} and starts delivery.
     * The fields are only updated once every step succeeded; on failure the new connection
     * is closed again so nothing leaks.
     *
     * @throws JMSException          if any JMS call fails
     * @throws IllegalStateException if the {@code mq.endpoint} property is missing or empty
     */
    private void initializeMqConnection() throws JMSException {
        String endpoint = this.environment.getProperty(MQ_ENDPOINT_PROPERTY);
        if (endpoint == null || endpoint.isEmpty()) {
            throw new IllegalStateException("MQ endpoint configuration is missing");
        }
        QueueConnectionFactory connectionFactory = new QueueConnectionFactory();
        connectionFactory.setProperty("tmqiAddressList", endpoint);
        QueueConnection connection = connectionFactory.createQueueConnection();
        try {
            QueueSession session = connection.createQueueSession(false, QueueSession.AUTO_ACKNOWLEDGE);
            Queue destination = session.createQueue(this.subject);
            QueueReceiver receiver = session.createReceiver(destination);
            connection.start();
            this.queueConnection = connection;
            this.queueSession = session;
            this.queue = destination;
            this.queueReceiver = receiver;
        } catch (JMSException | RuntimeException e) {
            try {
                connection.close();
            } catch (Exception closeFailure) {
                e.addSuppressed(closeFailure);
            }
            throw e;
        }
    }

    /**
     * Stops the consumer loop and closes the JMS resources. Calling it while not started is a no-op.
     */
    @Override // com.ohaotian.plugin.mq.proxy.impl.MQRegister
    public void shutdown() {
        if (this.started.compareAndSet(true, false)) {
            shutdownExecutorGracefully();
            closeMqResources();
            log.info("TLQ consumer stopped successfully for subject: {}", this.subject);
        }
    }

    /**
     * Waits up to {@link #SHUTDOWN_TIMEOUT_SECONDS} for the consumer thread to finish and
     * interrupts it if it does not (for instance while sleeping before a reconnect).
     */
    private void shutdownExecutorGracefully() {
        this.listenerExecutor.shutdown();
        try {
            if (!this.listenerExecutor.awaitTermination(SHUTDOWN_TIMEOUT_SECONDS, TimeUnit.SECONDS)) {
                log.warn("TLQ consumer threads did not terminate gracefully");
                this.listenerExecutor.shutdownNow();
            }
        } catch (InterruptedException e) {
            log.warn("Interrupted during thread pool shutdown", e);
            this.listenerExecutor.shutdownNow();
            Thread.currentThread().interrupt();
        }
    }

    /**
     * Closes receiver, session and connection, in that order. Failures are logged with
     * their stack trace and do not prevent the remaining resources from being closed.
     */
    private void closeMqResources() {
        QueueReceiver receiver = this.queueReceiver;
        if (receiver != null) {
            try {
                receiver.close();
            } catch (Exception e) {
                log.error("Error closing TLQ queue receiver", e);
            }
        }
        QueueSession session = this.queueSession;
        if (session != null) {
            try {
                session.close();
            } catch (Exception e) {
                log.error("Error closing TLQ queue session", e);
            }
        }
        QueueConnection connection = this.queueConnection;
        if (connection != null) {
            try {
                connection.close();
            } catch (Exception e) {
                log.error("Error closing TLQ queue connection", e);
            }
        }
    }

    /**
     * Picks up the {@link MqRecordLog} bean and the {@link Environment} from the context.
     *
     * @param applicationContext context supplied by Spring
     * @throws BeansException if the {@link MqRecordLog} bean cannot be obtained
     */
    @Override
    public void setApplicationContext(ApplicationContext applicationContext) throws BeansException {
        this.mqRecordLog = (MqRecordLog) applicationContext.getBean(MqRecordLog.class);
        this.environment = applicationContext.getEnvironment();
    }

    /** Fails fast when {@link #setApplicationContext} has not populated the dependencies. */
    private void validateDependenciesInitialized() {
        if (this.mqRecordLog == null || this.environment == null) {
            throw new IllegalStateException("Dependencies not initialized through ApplicationContext");
        }
    }

    /**
     * Rejects the registration if any declared message type is unsupported.
     *
     * @throws UnsupportedOperationException naming the offending type and subject
     */
    private void validateMessageTypes(ConsumerRegisterInfo consumerRegisterInfo) {
        for (ProxyMessageType proxyMessageType : consumerRegisterInfo.getMessageTypes()) {
            if (!supportConcurrently(proxyMessageType)) {
                throw new UnsupportedOperationException("Unsupported messageType[" + proxyMessageType + "] for subject[" + consumerRegisterInfo.getSubject() + "]");
            }
        }
    }

    /** @return whether {@code proxyMessageType} is one of the four types this register accepts */
    private boolean supportConcurrently(ProxyMessageType proxyMessageType) {
        return proxyMessageType == ProxyMessageType.ASYNCHRONOUS
                || proxyMessageType == ProxyMessageType.SYNCHRONIZATION
                || proxyMessageType == ProxyMessageType.ONEWAY
                || proxyMessageType == ProxyMessageType.TRANSACTION;
    }
}
