package com.tydic.nicc.mq.starter.alimq;

import com.aliyun.openservices.ons.api.ONSFactory;
import com.aliyun.openservices.ons.api.OnExceptionContext;
import com.aliyun.openservices.ons.api.Producer;
import com.aliyun.openservices.ons.api.SendCallback;
import com.aliyun.openservices.ons.api.SendResult;
import com.aliyun.openservices.ons.api.transaction.TransactionProducer;
import com.aliyun.openservices.ons.api.transaction.TransactionStatus;
import com.tydic.nicc.mq.starter.KKMsgBuilder;
import com.tydic.nicc.mq.starter.annotation.KKMqTransactionCheckService;
import com.tydic.nicc.mq.starter.api.KKMqLocalTransactionChecker;
import com.tydic.nicc.mq.starter.api.KKMqLocalTransactionExecter;
import com.tydic.nicc.mq.starter.api.KKMqProducerHelper;
import com.tydic.nicc.mq.starter.api.KKMqSendCallback;
import com.tydic.nicc.mq.starter.autoconfigure.KKMqProducerProperties;
import com.tydic.nicc.mq.starter.autoconfigure.KKMqProperties;
import com.tydic.nicc.mq.starter.entity.KKMqSendResult;
import com.tydic.nicc.mq.starter.entity.eum.KKMqTransactionStatus;
import com.tydic.nicc.mq.starter.exception.KKMqMsgException;
import java.util.Date;
import java.util.Map;
import java.util.Properties;
import java.util.stream.Collectors;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.aop.framework.AopProxyUtils;
import org.springframework.aop.scope.ScopedProxyUtils;
import org.springframework.beans.BeanUtils;
import org.springframework.beans.factory.config.BeanDefinitionCustomizer;
import org.springframework.context.ConfigurableApplicationContext;
import org.springframework.context.support.GenericApplicationContext;
import org.springframework.messaging.MessagingException;
import org.springframework.util.StringUtils;

/* loaded from: input_file:com/tydic/nicc/mq/starter/alimq/AliMqProducerHelper.class */
public class AliMqProducerHelper implements KKMqProducerHelper {
    private static final Logger log = LoggerFactory.getLogger(AliMqProducerHelper.class);
    private KKMqProperties mqProperties;
    private Producer producer;
    private TransactionProducer transactionProducer;
    private Properties properties;
    private boolean isRunning;

    public AliMqProducerHelper(ConfigurableApplicationContext configurableApplicationContext, KKMqProperties kKMqProperties) {
        this.mqProperties = kKMqProperties;
        KKMqProducerProperties producer = kKMqProperties.getProducer();
        Properties properties = new Properties();
        properties.put("AccessKey", producer.getAccessKey());
        properties.put("SecretKey", producer.getSecretKey());
        properties.put("GROUP_ID", kKMqProperties.getProducer().getGroup());
        if (!StringUtils.isEmpty(kKMqProperties.getNamespace())) {
            properties.put("INSTANCE_ID", kKMqProperties.getNamespace());
        }
        properties.setProperty("SendMsgTimeoutMillis", String.valueOf(kKMqProperties.getProducer().getSendTimeout()));
        properties.put("NAMESRV_ADDR", kKMqProperties.getNameServer());
        this.properties = properties;
        this.producer = ONSFactory.createProducer(properties);
        if (!StringUtils.isEmpty(producer.getTransGroup())) {
            createTransProducer(configurableApplicationContext);
        }
        log.info("初始化 AliMqProducerHelper 完成:{}", kKMqProperties);
    }

    public void createTransProducer(ConfigurableApplicationContext configurableApplicationContext) {
        ((Map) configurableApplicationContext.getBeansWithAnnotation(KKMqTransactionCheckService.class).entrySet().stream().filter(entry -> {
            return !ScopedProxyUtils.isScopedTarget((String) entry.getKey());
        }).collect(Collectors.toMap((v0) -> {
            return v0.getKey();
        }, (v0) -> {
            return v0.getValue();
        }))).forEach((str, obj) -> {
            AopProxyUtils.ultimateTargetClass(obj);
            ((GenericApplicationContext) configurableApplicationContext).registerBean(KKMqLocalTransactionChecker.class.getName(), KKMqLocalTransactionChecker.class, new BeanDefinitionCustomizer[0]);
            Properties properties = new Properties();
            BeanUtils.copyProperties(this.properties, properties);
            properties.put("GROUP_ID", this.mqProperties.getProducer().getTransGroup());
            KKMqLocalTransactionChecker kKMqLocalTransactionChecker = (KKMqLocalTransactionChecker) obj;
            log.info("初始化 AliMqProducerHelper 注册事务处理服务:{}", kKMqLocalTransactionChecker);
            this.transactionProducer = ONSFactory.createTransactionProducer(this.properties, message -> {
                KKMqTransactionStatus check = kKMqLocalTransactionChecker.check(KKMsgBuilder.convertAliMsg(message));
                return check.equals(KKMqTransactionStatus.COMMIT) ? TransactionStatus.CommitTransaction : check.equals(KKMqTransactionStatus.ROLLBACK) ? TransactionStatus.RollbackTransaction : TransactionStatus.Unknow;
            });
        });
    }

    @Override // com.tydic.nicc.mq.starter.api.KKMqProducerHelper
    public void start() {
        try {
            if (!this.isRunning) {
                this.producer.start();
                if (this.transactionProducer != null) {
                    this.transactionProducer.start();
                }
                this.isRunning = true;
            }
        } catch (Exception e) {
            log.error("初始化 AliMqProducerHelper 启动异常:{}", this.mqProperties, e);
        }
    }

    @Override // com.tydic.nicc.mq.starter.api.KKMqProducerHelper
    public void sendTransactionMsg(String str, String str2, KKMqLocalTransactionExecter kKMqLocalTransactionExecter) {
        sendTransactionMsg(str, "*", str2, null, kKMqLocalTransactionExecter);
    }

    @Override // com.tydic.nicc.mq.starter.api.KKMqProducerHelper
    public void sendTransactionMsg(String str, String str2, String str3, Object obj, KKMqLocalTransactionExecter kKMqLocalTransactionExecter) {
        String parseMessageToString = KKMsgBuilder.parseMessageToString(str3);
        if (this.transactionProducer != null) {
            if (log.isTraceEnabled()) {
                log.trace("KKMQ生产者-发送事务消息:{}|{}|{}", new Object[]{str, str2, parseMessageToString});
            }
            this.transactionProducer.send(KKMsgBuilder.buildAliMsg(str, str2, parseMessageToString), (message, obj2) -> {
                KKMqTransactionStatus execute = kKMqLocalTransactionExecter.execute(KKMsgBuilder.convertAliMsg(message), obj2);
                return execute.equals(KKMqTransactionStatus.COMMIT) ? TransactionStatus.CommitTransaction : execute.equals(KKMqTransactionStatus.ROLLBACK) ? TransactionStatus.RollbackTransaction : TransactionStatus.Unknow;
            }, obj);
        }
    }

    @Override // com.tydic.nicc.mq.starter.api.KKMqProducerHelper
    public void sendAsyncMsg(String str, String str2, KKMqSendCallback kKMqSendCallback) {
        sendAsyncMsg(str, "*", str2, kKMqSendCallback);
    }

    @Override // com.tydic.nicc.mq.starter.api.KKMqProducerHelper
    public void sendAsyncMsg(String str, String str2, String str3, KKMqSendCallback kKMqSendCallback) {
        sendMsg(str, str2, str3, kKMqSendCallback);
    }

    @Override // com.tydic.nicc.mq.starter.api.KKMqProducerHelper
    public KKMqSendResult sendDelayMsgOnTime(String str, String str2, Object obj, Date date) {
        try {
            String parseMessageToString = KKMsgBuilder.parseMessageToString(obj);
            long currentTimeMillis = System.currentTimeMillis();
            if (log.isTraceEnabled()) {
                log.trace("KKMQ生产者-发送定时消息:{}|{}|{}|{}", new Object[]{str, str2, parseMessageToString, date});
            }
            if (date.getTime() <= System.currentTimeMillis()) {
                throw new KKMqMsgException("KKMQ生产者-发送定时消息异常,指定时间不得小于当前时间!");
            }
            SendResult send = this.producer.send(KKMsgBuilder.buildAliDelayMsg(str, str2, parseMessageToString, date.getTime()));
            long currentTimeMillis2 = System.currentTimeMillis() - currentTimeMillis;
            if (log.isTraceEnabled()) {
                log.trace("KKMQ生产者-发送定时消息-完成:{}|{}|{}", new Object[]{str, parseMessageToString, Long.valueOf(currentTimeMillis2)});
            }
            return KKMsgBuilder.convertAliMsgResult(send);
        } catch (Exception e) {
            log.error("KKMQ生产者-消息发送异常:", e);
            throw new MessagingException(e.getMessage(), e);
        }
    }

    @Override // com.tydic.nicc.mq.starter.api.KKMqProducerHelper
    public KKMqSendResult sendDelayMsg(String str, Object obj, Integer num) {
        return sendDelayMsg(str, "*", obj, num);
    }

    @Override // com.tydic.nicc.mq.starter.api.KKMqProducerHelper
    public KKMqSendResult sendDelayMsg(String str, String str2, Object obj, Integer num) {
        try {
            String parseMessageToString = KKMsgBuilder.parseMessageToString(obj);
            long currentTimeMillis = System.currentTimeMillis();
            if (log.isTraceEnabled()) {
                log.trace("KKMQ生产者-发送延时消息:{}|{}|{}|{}", new Object[]{str, str2, parseMessageToString, num});
            }
            SendResult send = this.producer.send(KKMsgBuilder.buildAliDelayMsg(str, str2, parseMessageToString, currentTimeMillis + (num.intValue() * 1000)));
            long currentTimeMillis2 = System.currentTimeMillis() - currentTimeMillis;
            if (log.isTraceEnabled()) {
                log.trace("KKMQ生产者-发送延时消息-完成:{}|{}|{}", new Object[]{str, parseMessageToString, Long.valueOf(currentTimeMillis2)});
            }
            return KKMsgBuilder.convertAliMsgResult(send);
        } catch (Exception e) {
            log.error("KKMQ生产者-消息发送异常:", e);
            throw new MessagingException(e.getMessage(), e);
        }
    }

    @Override // com.tydic.nicc.mq.starter.api.KKMqProducerHelper
    public KKMqSendResult sendMsg(String str, Object obj) {
        return sendMsg(str, "*", obj);
    }

    @Override // com.tydic.nicc.mq.starter.api.KKMqProducerHelper
    public KKMqSendResult sendMsg(String str, String str2, Object obj) {
        return sendMsg(str, str2, obj, null);
    }

    @Override // com.tydic.nicc.mq.starter.api.KKMqProducerHelper
    public KKMqSendResult sendMsg(String str, String str2, Object obj, final KKMqSendCallback kKMqSendCallback) {
        try {
            long currentTimeMillis = System.currentTimeMillis();
            String parseMessageToString = KKMsgBuilder.parseMessageToString(obj);
            KKMqSendResult kKMqSendResult = new KKMqSendResult(str);
            if (log.isTraceEnabled()) {
                log.trace("KKMQ生产者-发送消息:{}|{}|{}", new Object[]{str, str2, parseMessageToString});
            }
            if (kKMqSendCallback != null) {
                this.producer.sendAsync(KKMsgBuilder.buildAliMsg(str, str2, parseMessageToString), new SendCallback() { // from class: com.tydic.nicc.mq.starter.alimq.AliMqProducerHelper.1
                    public void onSuccess(SendResult sendResult) {
                        kKMqSendCallback.onSuccess(KKMsgBuilder.convertAliMsgResult(sendResult));
                    }

                    public void onException(OnExceptionContext onExceptionContext) {
                        kKMqSendCallback.onException(onExceptionContext.getException());
                    }
                });
            } else {
                kKMqSendResult = KKMsgBuilder.convertAliMsgResult(this.producer.send(KKMsgBuilder.buildAliMsg(str, str2, parseMessageToString)));
            }
            long currentTimeMillis2 = System.currentTimeMillis() - currentTimeMillis;
            if (log.isTraceEnabled()) {
                log.trace("KKMQ生产者-发送消息-完成:{}|{}|{}", new Object[]{str, parseMessageToString, Long.valueOf(currentTimeMillis2)});
            }
            return kKMqSendResult;
        } catch (Exception e) {
            log.error("KKMQ生产者-消息发送异常:", e);
            throw new MessagingException(e.getMessage(), e);
        }
    }
}
