/*
 * Decompiled with CFR 0.152.
 */
package com.ohaotian.plugin.mq.proxy.ext.tonglkq;

import com.ohaotian.plugin.mq.proxy.MqRecordLog;
import com.ohaotian.plugin.mq.proxy.ProxyMessageType;
import com.ohaotian.plugin.mq.proxy.config.ApolloConfigVO;
import com.ohaotian.plugin.mq.proxy.constants.Strategy;
import com.ohaotian.plugin.mq.proxy.ext.tonglkq.TongLKQMqMessageListener;
import com.ohaotian.plugin.mq.proxy.impl.ConsumerRegisterInfo;
import com.ohaotian.plugin.mq.proxy.impl.MQRegister;
import com.tongtech.tmqi.QueueConnectionFactory;
import java.util.UUID;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionStage;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import javax.jms.JMSException;
import javax.jms.Queue;
import javax.jms.QueueConnection;
import javax.jms.QueueReceiver;
import javax.jms.QueueSession;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.BeansException;
import org.springframework.context.ApplicationContext;
import org.springframework.context.ApplicationContextAware;
import org.springframework.core.env.Environment;

public class TongLKQMessageConsumerRegister
implements MQRegister,
ApplicationContextAware {
    private static final Logger log = LoggerFactory.getLogger(TongLKQMessageConsumerRegister.class);
    private MqRecordLog mqRecordLog;
    private Environment environment;
    private QueueConnection queueConnection;
    private QueueSession queueSession;
    private Queue queue;
    private QueueReceiver queueReceiver;
    private String subject;
    private final AtomicBoolean started = new AtomicBoolean(false);
    private static final String MQ_ENDPOINT_PROPERTY = "mq.endpoint";
    private static final String CONSUMER_THREAD_NAME_PREFIX = "tlq-consumer-";
    private ConsumerRegisterInfo consumerInfo;
    private final ExecutorService listenerExecutor = Executors.newCachedThreadPool(r -> new Thread(r, CONSUMER_THREAD_NAME_PREFIX + UUID.randomUUID()));

    @Override
    public void register(ConsumerRegisterInfo consumerInfo, ApolloConfigVO apolloConfigVO) {
        this.consumerInfo = consumerInfo;
        this.validateDependenciesInitialized();
        this.subject = consumerInfo.getSubject();
        this.validateMessageTypes(consumerInfo);
        try {
            this.initializeMqConnection();
            log.info("Registered TLQ consumer for subject: {}", (Object)this.subject);
        }
        catch (JMSException e) {
            log.error("Failed to initialize TLQ connection", (Throwable)e);
            e.printStackTrace();
        }
    }

    @Override
    public boolean support(String strategy) {
        return TongLKQMessageConsumerRegister.strategySupported(strategy);
    }

    public static boolean strategySupported(String strategy) {
        return Strategy.isTongLKQ(strategy);
    }

    @Override
    public void startup() {
        if (!this.started.compareAndSet(false, true)) {
            return;
        }
        try {
            CompletionStage runFuture = CompletableFuture.runAsync(() -> {
                while (true) {
                    log.info("TLQ consumer started for subject: {}", (Object)this.subject);
                    TongLKQMqMessageListener listener = new TongLKQMqMessageListener(this.subject, this.consumerInfo.getConsumerWrappers(), this.mqRecordLog, this.queueReceiver);
                    listener.run();
                    log.warn("TLQ consumer exceptionally for subject: {} wait 60s", (Object)this.subject);
                    Boolean initCheck = Boolean.FALSE;
                    int count = 0;
                    do {
                        try {
                            TimeUnit.MILLISECONDS.sleep(60000L);
                            this.initializeMqConnection();
                            initCheck = Boolean.TRUE;
                            log.info("Re-Registered TLQ consumer for subject: {} count: {}", (Object)this.subject, (Object)(++count));
                        }
                        catch (InterruptedException e) {
                            throw new RuntimeException(e);
                        }
                        catch (JMSException e) {
                            log.error("Failed to initialize TLQ connection. count: {}", (Object)count, (Object)e);
                        }
                    } while (Boolean.FALSE.equals(initCheck));
                }
            }, this.listenerExecutor).exceptionally(e -> {
                log.info("TLQ consumer subject: {} exceptionally: {}", (Object)this.subject, (Object)e.getMessage());
                return null;
            });
            ((CompletableFuture)runFuture).whenComplete((result, e) -> log.info("TLQ consumer subject: {} Release!", (Object)this.subject));
            log.info("TLQ consumer started successfully for subject: {}", (Object)this.subject);
        }
        catch (Exception e2) {
            this.started.set(false);
            throw new RuntimeException("TLQ connection initialization failed", e2);
        }
    }

    private void initializeMqConnection() throws JMSException {
        String tlqUrl = this.environment.getProperty(MQ_ENDPOINT_PROPERTY);
        if (tlqUrl == null || tlqUrl.isEmpty()) {
            throw new IllegalStateException("MQ endpoint configuration is missing");
        }
        QueueConnectionFactory queueConnectionFactory = new QueueConnectionFactory();
        queueConnectionFactory.setProperty("tmqiAddressList", tlqUrl);
        this.queueConnection = queueConnectionFactory.createQueueConnection();
        this.queueSession = this.queueConnection.createQueueSession(false, 1);
        this.queue = this.queueSession.createQueue(this.subject);
        this.queueReceiver = this.queueSession.createReceiver(this.queue);
        this.queueConnection.start();
    }

    @Override
    public void shutdown() {
        if (!this.started.compareAndSet(true, false)) {
            return;
        }
        this.shutdownExecutorGracefully();
        this.closeMqResources();
        log.info("TLQ consumer stopped successfully for subject: {}", (Object)this.subject);
    }

    private void shutdownExecutorGracefully() {
        this.listenerExecutor.shutdown();
        try {
            if (!this.listenerExecutor.awaitTermination(5L, TimeUnit.SECONDS)) {
                log.warn("TLQ consumer threads did not terminate gracefully");
            }
        }
        catch (InterruptedException e) {
            log.warn("Interrupted during thread pool shutdown", (Throwable)e);
            Thread.currentThread().interrupt();
        }
    }

    private void closeMqResources() {
        if (this.queueReceiver != null) {
            try {
                this.queueReceiver.close();
            }
            catch (Exception e) {
                log.error("Error closing", (Object)e.getMessage());
            }
        }
        if (this.queueSession != null) {
            try {
                this.queueSession.close();
            }
            catch (Exception e) {
                log.error("Error closing", (Object)e.getMessage());
            }
        }
        if (this.queueConnection != null) {
            try {
                this.queueConnection.close();
            }
            catch (Exception e) {
                log.error("Error closing", (Object)e.getMessage());
            }
        }
    }

    public void setApplicationContext(ApplicationContext applicationContext) throws BeansException {
        this.mqRecordLog = (MqRecordLog)applicationContext.getBean(MqRecordLog.class);
        this.environment = applicationContext.getEnvironment();
    }

    private void validateDependenciesInitialized() {
        if (this.mqRecordLog == null || this.environment == null) {
            throw new IllegalStateException("Dependencies not initialized through ApplicationContext");
        }
    }

    private void validateMessageTypes(ConsumerRegisterInfo consumerInfo) {
        for (ProxyMessageType messageType : consumerInfo.getMessageTypes()) {
            if (this.supportConcurrently(messageType)) continue;
            throw new UnsupportedOperationException("Unsupported messageType[" + (Object)((Object)messageType) + "] for subject[" + consumerInfo.getSubject() + "]");
        }
    }

    private boolean supportConcurrently(ProxyMessageType messageType) {
        return messageType == ProxyMessageType.ASYNCHRONOUS || messageType == ProxyMessageType.SYNCHRONIZATION || messageType == ProxyMessageType.ONEWAY || messageType == ProxyMessageType.TRANSACTION;
    }
}

