package com.tydic.nicc.mq.starter.rocketmq;

import com.tydic.nicc.mq.starter.KKMqUtils;
import com.tydic.nicc.mq.starter.KKMsgBuilder;
import com.tydic.nicc.mq.starter.annotation.KKMqTransactionCheckService;
import com.tydic.nicc.mq.starter.api.KKMqLocalTransactionChecker;
import com.tydic.nicc.mq.starter.api.KKMqLocalTransactionExecter;
import com.tydic.nicc.mq.starter.api.KKMqProducerHelper;
import com.tydic.nicc.mq.starter.api.KKMqSendCallback;
import com.tydic.nicc.mq.starter.autoconfigure.KKMqProducerProperties;
import com.tydic.nicc.mq.starter.autoconfigure.KKMqProperties;
import com.tydic.nicc.mq.starter.entity.KKMqSendResult;
import com.tydic.nicc.mq.starter.entity.eum.KKMqTransactionStatus;
import com.tydic.nicc.mq.starter.exception.KKMqMsgException;
import java.util.Date;
import java.util.Map;
import java.util.stream.Collectors;
import org.apache.commons.lang3.StringUtils;
import org.apache.rocketmq.acl.common.AclClientRPCHook;
import org.apache.rocketmq.acl.common.SessionCredentials;
import org.apache.rocketmq.client.AccessChannel;
import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.client.producer.LocalTransactionState;
import org.apache.rocketmq.client.producer.SendCallback;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.client.producer.TransactionMQProducer;
import org.apache.rocketmq.common.message.Message;
import org.apache.rocketmq.remoting.RPCHook;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.aop.scope.ScopedProxyUtils;
import org.springframework.beans.factory.config.BeanDefinitionCustomizer;
import org.springframework.context.ConfigurableApplicationContext;
import org.springframework.context.support.GenericApplicationContext;
import org.springframework.messaging.MessagingException;

/* loaded from: input_file:com/tydic/nicc/mq/starter/rocketmq/RocketMqProducerHelper.class */
public class RocketMqProducerHelper implements KKMqProducerHelper {
    private static final Logger log = LoggerFactory.getLogger(RocketMqProducerHelper.class);
    private TransactionMQProducer transProducer;
    private DefaultMQProducer producer;
    private KKMqProperties mqProperties;
    private RPCHook rpcHook;
    private boolean isRunning = false;

    public RocketMqProducerHelper(ConfigurableApplicationContext configurableApplicationContext, KKMqProperties kKMqProperties) {
        this.mqProperties = kKMqProperties;
        KKMqProducerProperties producer = kKMqProperties.getProducer();
        String group = producer.getGroup();
        if (!StringUtils.isEmpty(producer.getAccessKey()) && !StringUtils.isEmpty(producer.getSecretKey())) {
            this.rpcHook = new AclClientRPCHook(new SessionCredentials(producer.getAccessKey(), producer.getSecretKey()));
        }
        if (this.rpcHook != null) {
            log.info("KKMQ生产者-初始化 RocketMqProducerHelper 使用ACL:{}", producer);
            if (AccessChannel.CLOUD.name().equals(kKMqProperties.getProducer().getAccessChannel())) {
                this.producer = new DefaultMQProducer(group, this.rpcHook, true, (String) null);
                this.producer.setAccessChannel(AccessChannel.valueOf(kKMqProperties.getProducer().getAccessChannel()));
            } else {
                this.producer = new DefaultMQProducer(group, this.rpcHook);
            }
            if (!StringUtils.isEmpty(producer.getTransGroup())) {
                this.transProducer = new TransactionMQProducer(producer.getTransGroup(), this.rpcHook);
            }
        } else {
            this.producer = new DefaultMQProducer();
            this.producer.setProducerGroup(group);
            if (!StringUtils.isEmpty(producer.getTransGroup())) {
                this.transProducer = new TransactionMQProducer(producer.getTransGroup());
            }
        }
        if (!StringUtils.isEmpty(kKMqProperties.getNamespace())) {
            this.producer.setNamespace(kKMqProperties.getNamespace());
        }
        this.producer.setNamesrvAddr(kKMqProperties.getNameServer());
        this.producer.setSendMsgTimeout(producer.getSendTimeout().intValue());
        this.producer.setRetryTimesWhenSendFailed(producer.getRetryTimesWhenSendFailed().intValue());
        this.producer.setRetryTimesWhenSendAsyncFailed(producer.getRetryTimesWhenSendAsyncFailed().intValue());
        this.producer.setMaxMessageSize(producer.getMaxMessageSize().intValue());
        if (!StringUtils.isEmpty(producer.getTransGroup()) && this.transProducer != null) {
            this.transProducer.setNamesrvAddr(kKMqProperties.getNameServer());
            this.transProducer.setSendMsgTimeout(producer.getSendTimeout().intValue());
            this.transProducer.setRetryTimesWhenSendFailed(producer.getRetryTimesWhenSendFailed().intValue());
            this.transProducer.setRetryTimesWhenSendAsyncFailed(producer.getRetryTimesWhenSendAsyncFailed().intValue());
            this.transProducer.setMaxMessageSize(producer.getMaxMessageSize().intValue());
            if (!StringUtils.isEmpty(kKMqProperties.getNamespace())) {
                this.transProducer.setNamespace(kKMqProperties.getNamespace());
            }
            registListener(configurableApplicationContext);
        }
        log.info("KKMQ生产者-初始化 RocketMqProducerHelper 完成:{}", kKMqProperties);
    }

    public void registListener(ConfigurableApplicationContext configurableApplicationContext) {
        ((Map) configurableApplicationContext.getBeansWithAnnotation(KKMqTransactionCheckService.class).entrySet().stream().filter(entry -> {
            return !ScopedProxyUtils.isScopedTarget((String) entry.getKey());
        }).collect(Collectors.toMap((v0) -> {
            return v0.getKey();
        }, (v0) -> {
            return v0.getValue();
        }))).forEach((str, obj) -> {
            ((GenericApplicationContext) configurableApplicationContext).registerBean(KKMqLocalTransactionChecker.class.getName(), KKMqLocalTransactionChecker.class, new BeanDefinitionCustomizer[0]);
            KKMqLocalTransactionChecker kKMqLocalTransactionChecker = (KKMqLocalTransactionChecker) obj;
            log.info("KKMQ生产者-初始化 RocketMqProducerHelper 注册事务处理服务:{}", kKMqLocalTransactionChecker);
            this.transProducer.setTransactionCheckListener(messageExt -> {
                KKMqTransactionStatus check = kKMqLocalTransactionChecker.check(KKMsgBuilder.convertRocketMsg(messageExt));
                return check.equals(KKMqTransactionStatus.COMMIT) ? LocalTransactionState.COMMIT_MESSAGE : check.equals(KKMqTransactionStatus.ROLLBACK) ? LocalTransactionState.ROLLBACK_MESSAGE : LocalTransactionState.UNKNOW;
            });
        });
    }

    @Override // com.tydic.nicc.mq.starter.api.KKMqProducerHelper
    public void start() {
        try {
            if (!this.isRunning) {
                this.producer.start();
                if (this.transProducer != null) {
                    this.transProducer.start();
                }
                this.isRunning = true;
            }
        } catch (Exception e) {
            log.error("KKMQ生产者-初始化 RocketMqProducerHelper 启动异常:{}", this.mqProperties, e);
        }
    }

    @Override // com.tydic.nicc.mq.starter.api.KKMqProducerHelper
    public void sendTransactionMsg(String str, String str2, KKMqLocalTransactionExecter kKMqLocalTransactionExecter) {
        sendTransactionMsg(str, "*", str2, null, kKMqLocalTransactionExecter);
    }

    @Override // com.tydic.nicc.mq.starter.api.KKMqProducerHelper
    public void sendTransactionMsg(String str, String str2, String str3, Object obj, KKMqLocalTransactionExecter kKMqLocalTransactionExecter) {
        try {
            this.transProducer.sendMessageInTransaction(KKMsgBuilder.buildRocketMsg(str, str2, KKMsgBuilder.parseMessageToString(str3)), (message, obj2) -> {
                KKMqTransactionStatus execute = kKMqLocalTransactionExecter.execute(KKMsgBuilder.convertRocketMsg(message), obj2);
                return execute.equals(KKMqTransactionStatus.COMMIT) ? LocalTransactionState.COMMIT_MESSAGE : execute.equals(KKMqTransactionStatus.ROLLBACK) ? LocalTransactionState.ROLLBACK_MESSAGE : LocalTransactionState.UNKNOW;
            }, obj);
        } catch (Exception e) {
            log.error("KKMQ生产者-发送事务消息异常:", e);
            throw new MessagingException(e.getMessage(), e);
        }
    }

    @Override // com.tydic.nicc.mq.starter.api.KKMqProducerHelper
    public void sendAsyncMsg(String str, String str2, KKMqSendCallback kKMqSendCallback) {
        sendAsyncMsg(str, "*", str2, kKMqSendCallback);
    }

    @Override // com.tydic.nicc.mq.starter.api.KKMqProducerHelper
    public void sendAsyncMsg(String str, String str2, String str3, KKMqSendCallback kKMqSendCallback) {
        sendMsg(str, str2, str3, kKMqSendCallback);
    }

    @Override // com.tydic.nicc.mq.starter.api.KKMqProducerHelper
    public KKMqSendResult sendDelayMsgOnTime(String str, String str2, Object obj, Date date) {
        String parseMessageToString = KKMsgBuilder.parseMessageToString(obj);
        if (log.isTraceEnabled()) {
            log.trace("KKMQ生产者-发送定时消息:{}|{}|{}|{}", new Object[]{str, str2, parseMessageToString, date});
        }
        long time = date.getTime() - System.currentTimeMillis();
        if (time <= 0) {
            throw new KKMqMsgException("KKMQ生产者-发送定时消息异常,指定时间不得小于当前时间!");
        }
        return sendDelayMsg(str, str2, obj, Integer.valueOf(Math.toIntExact(time / 1000)));
    }

    @Override // com.tydic.nicc.mq.starter.api.KKMqProducerHelper
    public KKMqSendResult sendDelayMsg(String str, Object obj, Integer num) {
        return sendDelayMsg(str, "*", obj, num);
    }

    @Override // com.tydic.nicc.mq.starter.api.KKMqProducerHelper
    public KKMqSendResult sendDelayMsg(String str, String str2, Object obj, Integer num) {
        int rmqDelayLevel = KKMqUtils.getRmqDelayLevel(num.intValue());
        try {
            String parseMessageToString = KKMsgBuilder.parseMessageToString(obj);
            if (log.isTraceEnabled()) {
                log.trace("KKMQ生产者-发送延时消息:{}|{}|{}|{}", new Object[]{str, str2, parseMessageToString, Integer.valueOf(rmqDelayLevel)});
            }
            Message message = new Message(str, str2, parseMessageToString.getBytes("UTF-8"));
            long currentTimeMillis = System.currentTimeMillis();
            if (rmqDelayLevel > 0) {
                message.setDelayTimeLevel(rmqDelayLevel);
            }
            SendResult send = this.producer.send(message, this.mqProperties.getProducer().getSendTimeout().intValue());
            long currentTimeMillis2 = System.currentTimeMillis() - currentTimeMillis;
            if (log.isTraceEnabled()) {
                log.trace("KKMQ生产者-发送延时消息-完成:{}|{}|{}", new Object[]{str, parseMessageToString, Long.valueOf(currentTimeMillis2)});
            }
            return KKMsgBuilder.convertRocketMsgResult(send);
        } catch (Exception e) {
            log.error("KKMQ生产者-发送延时消息-异常:level = {} ,context = {}", new Object[]{Integer.valueOf(rmqDelayLevel), obj, num});
            throw new MessagingException(e.getMessage(), e);
        }
    }

    @Override // com.tydic.nicc.mq.starter.api.KKMqProducerHelper
    public KKMqSendResult sendMsg(String str, Object obj) {
        return sendMsg(str, "*", obj);
    }

    @Override // com.tydic.nicc.mq.starter.api.KKMqProducerHelper
    public KKMqSendResult sendMsg(String str, String str2, Object obj) {
        return sendMsg(str, str2, obj, null);
    }

    @Override // com.tydic.nicc.mq.starter.api.KKMqProducerHelper
    public KKMqSendResult sendMsg(String str, String str2, Object obj, final KKMqSendCallback kKMqSendCallback) {
        try {
            String parseMessageToString = KKMsgBuilder.parseMessageToString(obj);
            if (log.isTraceEnabled()) {
                log.trace("KKMQ生产者-发送消息:{}|{}|{}", new Object[]{str, str2, parseMessageToString});
            }
            long currentTimeMillis = System.currentTimeMillis();
            SendResult sendResult = new SendResult();
            if (kKMqSendCallback != null) {
                this.producer.send(KKMsgBuilder.buildRocketMsg(str, str2, parseMessageToString), new SendCallback() { // from class: com.tydic.nicc.mq.starter.rocketmq.RocketMqProducerHelper.1
                    public void onSuccess(SendResult sendResult2) {
                        kKMqSendCallback.onSuccess(KKMsgBuilder.convertRocketMsgResult(sendResult2));
                    }

                    public void onException(Throwable th) {
                        kKMqSendCallback.onException(th);
                    }
                });
            } else {
                sendResult = this.producer.send(KKMsgBuilder.buildRocketMsg(str, str2, parseMessageToString));
            }
            long currentTimeMillis2 = System.currentTimeMillis() - currentTimeMillis;
            if (log.isTraceEnabled()) {
                log.trace("KKMQ生产者-发送消息-完成:{}|{}|{}", new Object[]{str, parseMessageToString, Long.valueOf(currentTimeMillis2)});
            }
            return KKMsgBuilder.convertRocketMsgResult(sendResult);
        } catch (Exception e) {
            log.error("消息发送异常:", e);
            throw new MessagingException(e.getMessage(), e);
        }
    }
}
