package com.haotian.plugin.proxy.mq.ext.rocketmq;

import com.haotian.plugin.proxy.mq.CacheStore;
import com.haotian.plugin.proxy.mq.ProxyExceptionContext;
import com.haotian.plugin.proxy.mq.ProxyMessage;
import com.haotian.plugin.proxy.mq.ProxyMessageType;
import com.haotian.plugin.proxy.mq.ProxySendResult;
import com.haotian.plugin.proxy.mq.callback.ProxyLocalTransactionExecuter;
import com.haotian.plugin.proxy.mq.callback.ProxySendCallback;
import com.haotian.plugin.proxy.mq.ext.ProxyMessageProducerEx;
import com.haotian.plugin.proxy.mq.ext.ProxyMqTransactionChecker;
import com.haotian.plugin.proxy.mq.internal.ProxyMessageConfig;
import com.haotian.plugin.proxy.mq.internal.ProxyMessageException;
import com.haotian.plugin.proxy.mq.status.ProxyTransactionStatus;
import java.io.UnsupportedEncodingException;
import java.util.Iterator;
import java.util.Map;
import java.util.Properties;
import java.util.Set;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.client.producer.LocalTransactionExecuter;
import org.apache.rocketmq.client.producer.LocalTransactionState;
import org.apache.rocketmq.client.producer.SendCallback;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.client.producer.TransactionMQProducer;
import org.apache.rocketmq.client.producer.TransactionSendResult;
import org.apache.rocketmq.common.message.Message;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.BeansException;
import org.springframework.context.ApplicationContext;
import org.springframework.context.ApplicationContextAware;
import org.springframework.util.CollectionUtils;

/* loaded from: input_file:com/haotian/plugin/proxy/mq/ext/rocketmq/RocketMqMessageSender.class */
public class RocketMqMessageSender implements ProxyMessageProducerEx, ApplicationContextAware {
    private DefaultMQProducer producer;
    private TransactionMQProducer transProductor;
    private final String subject;
    private final Map<ProxyMessageType, Set<String>> typeTagsMapping;
    private RocketMqTransactionCheckListener transactionCheckerListener;
    private CacheStore cacheStore;
    private final ProxyMessageConfig messageConfig;
    private final Logger logger = LoggerFactory.getLogger(getClass().getName());
    private Properties mergedProps = new Properties();

    public RocketMqMessageSender(ProxyMessageConfig proxyMessageConfig, Map<ProxyMessageType, Set<String>> map) {
        this.subject = proxyMessageConfig.getSubject();
        this.messageConfig = proxyMessageConfig;
        this.typeTagsMapping = map;
    }

    public ProxySendResult send(ProxyMessage proxyMessage) {
        checkStatus(proxyMessage, ProxyMessageType.SYNCHRONIZATION);
        try {
            return getProxySendResult(this.producer.send(getMessage(proxyMessage)));
        } catch (Exception e) {
            throw new ProxyMessageException("send subject[" + proxyMessage.getSubject() + "] tag[" + proxyMessage.getTag() + "] error", e);
        }
    }

    public ProxySendResult sendInTransaction(final ProxyMessage proxyMessage, final ProxyLocalTransactionExecuter proxyLocalTransactionExecuter, Object obj) {
        checkStatus(proxyMessage, ProxyMessageType.TRANSACTION);
        try {
            TransactionSendResult sendMessageInTransaction = this.transProductor.sendMessageInTransaction(getMessage(proxyMessage), new LocalTransactionExecuter() { // from class: com.haotian.plugin.proxy.mq.ext.rocketmq.RocketMqMessageSender.1
                public LocalTransactionState executeLocalTransactionBranch(Message message, Object obj2) {
                    ProxyTransactionStatus exec = proxyLocalTransactionExecuter.exec(proxyMessage, obj2);
                    return exec == ProxyTransactionStatus.COMMIT ? LocalTransactionState.COMMIT_MESSAGE : exec == ProxyTransactionStatus.ROLLBACK ? LocalTransactionState.ROLLBACK_MESSAGE : exec == ProxyTransactionStatus.UNKNOW ? LocalTransactionState.UNKNOW : LocalTransactionState.UNKNOW;
                }
            }, obj);
            ProxySendResult proxySendResult = getProxySendResult(sendMessageInTransaction);
            this.cacheStore.set(sendMessageInTransaction.getMsgId(), ProxyTransactionStatus.COMMIT.toString(), ProxyMqTransactionChecker.MQ_TRAN_CHECK_EXPIRE);
            if (this.logger.isDebugEnabled()) {
                this.logger.debug("send tran msg msgId={" + proxySendResult.getMsgId() + "}  topic=" + proxyMessage.getSubject() + " tag=" + proxyMessage.getTag() + "  body={" + proxyMessage.getContent() + "}  SendResult={" + proxySendResult.getStatus() + "}");
            }
            return proxySendResult;
        } catch (Exception e) {
            throw new ProxyMessageException("send subject[" + proxyMessage.getSubject() + "] tag[" + proxyMessage.getTag() + "] error", e);
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    public ProxySendResult getProxySendResult(SendResult sendResult) {
        ProxySendResult proxySendResult = new ProxySendResult();
        proxySendResult.setMsgId(sendResult.getMsgId());
        proxySendResult.setStatus(sendResult.getSendStatus().name());
        return proxySendResult;
    }

    private Message getMessage(ProxyMessage proxyMessage) {
        try {
            return new Message(proxyMessage.getSubject(), proxyMessage.getTag(), proxyMessage.getContent().getBytes("UTF-8"));
        } catch (UnsupportedEncodingException e) {
            throw new RuntimeException("Convert rocketmq message error", e);
        }
    }

    public void send(ProxyMessage proxyMessage, final ProxySendCallback proxySendCallback) {
        checkStatus(proxyMessage, ProxyMessageType.ASYNCHRONOUS);
        try {
            this.producer.send(getMessage(proxyMessage), new SendCallback() { // from class: com.haotian.plugin.proxy.mq.ext.rocketmq.RocketMqMessageSender.2
                public void onSuccess(SendResult sendResult) {
                    if (proxySendCallback != null) {
                        proxySendCallback.onSuccess(RocketMqMessageSender.this.getProxySendResult(sendResult));
                    }
                }

                public void onException(Throwable th) {
                    if (proxySendCallback != null) {
                        proxySendCallback.onException(new ProxyExceptionContext(th));
                    }
                }
            });
        } catch (Throwable th) {
            if (proxySendCallback == null) {
                throw new ProxyMessageException("send subject[" + proxyMessage.getSubject() + "] tag[" + proxyMessage.getTag() + "] error", th);
            }
            proxySendCallback.onException(new ProxyExceptionContext(th));
        }
    }

    private void checkStatus(ProxyMessage proxyMessage, ProxyMessageType proxyMessageType) {
        if (!this.subject.equals(proxyMessage.getSubject())) {
            throw new IllegalArgumentException("Unsupported subject[" + proxyMessage.getSubject() + "].Supported subject[" + this.subject + "].");
        }
        Set<String> set = this.typeTagsMapping.get(proxyMessageType);
        if (set == null || !(set.contains(proxyMessage.getTag()) || set.contains("*"))) {
            throw new IllegalStateException("Subject[" + proxyMessage.getSubject() + "]unsupported tag[" + proxyMessage.getTag() + "].Supported tags " + (set == null ? "[]" : set) + " for messageType[" + proxyMessageType + "].");
        }
    }

    public void sendOneway(ProxyMessage proxyMessage) {
        checkStatus(proxyMessage, ProxyMessageType.ONEWAY);
        try {
            this.producer.sendOneway(getMessage(proxyMessage));
        } catch (Throwable th) {
            throw new ProxyMessageException("send subject[" + proxyMessage.getSubject() + "] tag[" + proxyMessage.getTag() + "] error", th);
        }
    }

    @Override // com.haotian.plugin.proxy.mq.ext.ProxyMessageProducerEx
    public void startup() {
        if (this.typeTagsMapping.containsKey(ProxyMessageType.TRANSACTION)) {
            this.transProductor = new TransactionMQProducer(this.subject + "_" + ProxyMessageType.TRANSACTION);
            this.transactionCheckerListener = new RocketMqTransactionCheckListener();
            this.transactionCheckerListener.setCacheStore(this.cacheStore);
        } else {
            this.transProductor = null;
        }
        boolean z = false;
        if (this.typeTagsMapping.containsKey(ProxyMessageType.SYNCHRONIZATION)) {
            z = true;
        }
        if (!z && this.typeTagsMapping.containsKey(ProxyMessageType.ASYNCHRONOUS)) {
            z = true;
        }
        if (!z && this.typeTagsMapping.containsKey(ProxyMessageType.ONEWAY)) {
            z = true;
        }
        if (!z && this.typeTagsMapping.containsKey(ProxyMessageType.ORDERED)) {
            z = true;
        }
        if (z) {
            this.producer = new DefaultMQProducer(this.subject + "_Productor");
        } else {
            this.producer = null;
        }
        try {
            if (this.producer != null) {
                this.producer.setNamesrvAddr(this.messageConfig.getProperties().getProperty("mq.rocket.namesrvaddr", this.mergedProps.getProperty("mq.rocket.namesrvaddr")));
                this.producer.setRetryTimesWhenSendFailed(0);
                this.producer.setRetryTimesWhenSendAsyncFailed(0);
                this.producer.start();
            }
            if (this.transProductor != null) {
                this.transProductor.setNamesrvAddr(this.messageConfig.getProperties().getProperty("mq.rocket.namesrvaddr", this.mergedProps.getProperty("mq.rocket.namesrvaddr")));
                this.transProductor.setTransactionCheckListener(this.transactionCheckerListener);
                this.transProductor.setCheckThreadPoolMinSize(5);
                this.transProductor.setCheckThreadPoolMaxSize(50);
                this.transProductor.setCheckRequestHoldMax(2000);
                this.transProductor.start();
            }
        } catch (MQClientException e) {
            throw new ProxyMessageException("start product[" + this.subject + "] error", e);
        }
    }

    @Override // com.haotian.plugin.proxy.mq.ext.ProxyMessageProducerEx
    public void shutdown() {
        if (this.producer != null) {
            this.producer.shutdown();
        }
        if (this.transProductor != null) {
            this.transProductor.shutdown();
        }
    }

    public void setApplicationContext(ApplicationContext applicationContext) throws BeansException {
        this.cacheStore = (CacheStore) applicationContext.getBean(CacheStore.class);
        Map beansOfType = applicationContext.getBeansOfType(Properties.class);
        if (beansOfType != null) {
            Iterator it = beansOfType.values().iterator();
            while (it.hasNext()) {
                CollectionUtils.mergePropertiesIntoMap((Properties) it.next(), this.mergedProps);
            }
        }
    }
}
