package com.dic.bid.common.datasync.producer;

import cn.hutool.core.collection.CollUtil;
import cn.hutool.core.util.StrUtil;
import com.alibaba.fastjson.JSONObject;
import com.dic.bid.common.core.config.DataSourceContextHolder;
import com.dic.bid.common.core.constant.ErrorCodeEnum;
import com.dic.bid.common.core.exception.MyRuntimeException;
import com.dic.bid.common.core.object.CallResult;
import com.dic.bid.common.core.util.ContextUtil;
import com.dic.bid.common.datasync.base.service.BaseProducerTransactionService;
import com.dic.bid.common.datasync.config.DataSyncProducerProperties;
import com.dic.bid.common.datasync.constant.DataSyncCommandType;
import com.dic.bid.common.datasync.constant.DataSyncConstant;
import com.dic.bid.common.datasync.event.DataSyncTransEvent;
import com.dic.bid.common.datasync.model.DataSyncProducerTrans;
import com.dic.bid.common.datasync.object.TransactionMessageLocalInfo;
import com.dic.bid.common.datasync.service.DataSyncProducerTransService;
import com.dic.bid.common.sequence.wrapper.IdGeneratorWrapper;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
import java.util.stream.Collectors;
import javax.annotation.PostConstruct;
import org.apache.commons.lang3.concurrent.BasicThreadFactory;
import org.apache.rocketmq.client.producer.LocalTransactionState;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.client.producer.SendStatus;
import org.apache.rocketmq.client.producer.TransactionSendResult;
import org.apache.rocketmq.spring.core.RocketMQTemplate;
import org.redisson.api.RLock;
import org.redisson.api.RedissonClient;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
import org.springframework.context.ApplicationEventPublisher;
import org.springframework.messaging.support.MessageBuilder;
import org.springframework.stereotype.Component;

/**
 * RocketMQ producer used by the data-sync module.
 *
 * <p>Two sending paths are offered:
 * <ul>
 *   <li>{@link #sendTransactionalMessage} sends a RocketMQ transactional message and
 *       translates the broker / local-transaction outcome into a {@link CallResult}.</li>
 *   <li>{@code sendOrderly(...)} only persists the message through
 *       {@link DataSyncProducerTransService#saveNew} and publishes a
 *       {@link DataSyncTransEvent}; a single background thread started in
 *       {@link #initialize()} later reads the persisted rows and sends them with
 *       {@code syncSendOrderly}.</li>
 * </ul>
 *
 * <p>The bean is only created when {@code datasync.producer.enabled=true}.
 */
@ConditionalOnProperty(name = {"datasync.producer.enabled"}, havingValue = "true")
@Component
public class DataSyncRocketMqProducer {
    private static final Logger log = LoggerFactory.getLogger(DataSyncRocketMqProducer.class);

    @Autowired
    private DataSyncProducerProperties producerProperties;

    @Autowired
    private RocketMQTemplate rocketMqTemplate;

    @Autowired
    private RedissonClient redissonClient;

    @Autowired
    private DataSyncProducerTransService producerTransService;

    @Autowired
    private ApplicationEventPublisher applicationEventPublisher;

    @Autowired
    private IdGeneratorWrapper idGenerator;

    /** Redisson lock taken (via {@code tryLock}) around every drain of the persisted messages. */
    private RLock distLock;

    /**
     * Datasource types the background sender iterates over; {@code null}/empty means
     * "use whatever datasource is current". Volatile because it is written by the caller
     * of {@link #registerProducerDatasourceTypes} and read by the scheduler thread.
     */
    private volatile List<Integer> producerDatasourceTypes = null;

    /** Local lock/condition pair the background sender waits on; also handed to {@link DataSyncTransEvent}. */
    private final Lock lock = new ReentrantLock();
    private final Condition condition = this.lock.newCondition();

    /**
     * Resolves the distributed lock and starts the single-threaded daemon scheduler that
     * runs {@link #messageSender()} with a fixed delay of one second.
     */
    @PostConstruct
    public void initialize() {
        // Assign the lock BEFORE scheduling: submitting the task is what establishes the
        // happens-before edge to the worker thread, so everything the worker needs must be
        // written first.
        this.distLock = this.redissonClient.getLock(this.producerProperties.getDistributeLockName());
        ScheduledThreadPoolExecutor scheduler = new ScheduledThreadPoolExecutor(
                1,
                new BasicThreadFactory.Builder()
                        .namingPattern("schedule-message-sender-%d")
                        .daemon(true)
                        .build());
        scheduler.scheduleWithFixedDelay(this::messageSender, 1L, 1L, TimeUnit.SECONDS);
    }

    /**
     * Registers the datasource types the background sender should switch through on each run.
     *
     * @param list datasource type ids; {@code null} or empty disables datasource switching
     */
    public void registerProducerDatasourceTypes(List<Integer> list) {
        this.producerDatasourceTypes = list;
    }

    /**
     * Sends a RocketMQ transactional message and reports the combined outcome.
     *
     * <p>The transaction id is built as {@code <generated id>-<str2>-<obj>} and is also used
     * as the message {@code KEYS} header.
     *
     * @param baseProducerTransactionService local-transaction service stored on the
     *        {@link TransactionMessageLocalInfo} passed as the transaction argument
     * @param obj value appended to the transaction id
     * @param str destination topic
     * @param str2 message type (header and transaction-id component)
     * @param str3 command type
     * @param jSONObject message payload
     * @return {@link CallResult#ok()} unless the send status is not {@code SEND_OK} or the local
     *         transaction state is {@code UNKNOW} / {@code ROLLBACK_MESSAGE}
     * @throws MyRuntimeException if the local info carries an exception cause after sending
     */
    public CallResult sendTransactionalMessage(BaseProducerTransactionService baseProducerTransactionService, Object obj, String str, String str2, String str3, JSONObject jSONObject) {
        String transId = this.idGenerator.nextStringId() + "-" + str2 + "-" + obj;
        String traceId = currentTraceId();

        TransactionMessageLocalInfo localInfo = new TransactionMessageLocalInfo();
        localInfo.setLocalService(baseProducerTransactionService);
        localInfo.setTopic(str);
        localInfo.setTransId(transId);
        localInfo.setMessageType(str2);
        localInfo.setCommandType(str3);
        localInfo.setMessageData(jSONObject);
        localInfo.setProducerTraceId(traceId);

        TransactionSendResult sendResult = this.rocketMqTemplate.sendMessageInTransaction(
                str,
                MessageBuilder.withPayload(jSONObject)
                        .setHeader(DataSyncConstant.MESSAGE_HEADER_KEY_MESSAGE_TYPE, str2)
                        .setHeader(DataSyncConstant.MESSAGE_HEADER_KEY_COMMAND_TYPE, str3)
                        .setHeader(DataSyncConstant.MESSAGE_HEADER_KEY_PRODUCER_TRACE_ID, traceId)
                        .setHeader("TRANSACTION_ID", transId)
                        .setHeader("KEYS", transId)
                        .build(),
                localInfo);
        return verifySendResult(sendResult, localInfo);
    }

    /**
     * Persists a message for later ordered delivery and publishes a {@link DataSyncTransEvent}
     * carrying this producer's lock and condition.
     *
     * <p>All arguments are forwarded positionally to
     * {@link DataSyncProducerTransService#saveNew}, followed by the current trace id.
     * NOTE(review): the roles of {@code obj}, {@code str}, {@code str2}, {@code str4} and
     * {@code str5} are defined by {@code saveNew}, which is not visible here — confirm there.
     * {@code str3} is the command type (the enum overload passes {@code name()} in that slot).
     */
    public void sendOrderly(Object obj, String str, String str2, String str3, String str4, String str5) {
        this.producerTransService.saveNew(obj, str, str2, str3, str4, str5, currentTraceId());
        this.applicationEventPublisher.publishEvent(new DataSyncTransEvent(this.lock, this.condition));
    }

    /**
     * Same as the six-argument overload, passing {@code str2} again as the last argument.
     */
    public void sendOrderly(Object obj, String str, String str2, String str3, String str4) {
        sendOrderly(obj, str, str2, str3, str4, str2);
    }

    /**
     * Same as the six-argument overload, using {@code dataSyncCommandType.name()} as the command
     * type and passing {@code str2} again as the last argument.
     */
    public void sendOrderly(Object obj, String str, String str2, DataSyncCommandType dataSyncCommandType, String str3) {
        sendOrderly(obj, str, str2, dataSyncCommandType.name(), str3, str2);
    }

    /**
     * Reads the {@code traceId} attribute of the current HTTP request.
     * NOTE(review): this dereferences {@code ContextUtil.getHttpRequest()} unconditionally, as the
     * original code did — confirm it cannot be {@code null} for callers outside a web request.
     */
    private String currentTraceId() {
        return (String) ContextUtil.getHttpRequest().getAttribute("traceId");
    }

    /**
     * One run of the background task: waits on the condition, then drains persisted messages
     * once per registered datasource type (or once for the current datasource if none are
     * registered). Exceptions are logged and swallowed so the fixed-delay schedule keeps running.
     */
    private void messageSender() {
        List<Integer> datasourceTypes = null;
        this.lock.lock();
        try {
            safeWait();
            // Read the field once, after the wait, so the loop and the cleanup below agree
            // even if registerProducerDatasourceTypes is called concurrently.
            datasourceTypes = this.producerDatasourceTypes;
            if (CollUtil.isEmpty(datasourceTypes)) {
                doSend();
            } else {
                for (Integer datasourceType : datasourceTypes) {
                    DataSourceContextHolder.setDataSourceType(datasourceType);
                    doSend();
                }
            }
        } catch (Exception e) {
            log.error("Failed to messageSender", e);
        } finally {
            try {
                // Only undo the datasource switch if this run could have performed one.
                if (CollUtil.isNotEmpty(datasourceTypes)) {
                    DataSourceContextHolder.setDataSourceType((Integer) null);
                }
            } finally {
                this.lock.unlock();
            }
        }
    }

    /**
     * Waits on the condition for at most {@code fetchTransIntervalMins} minutes; returns on
     * signal or timeout alike. Must be called with {@link #lock} held.
     *
     * <p>NOTE(review): a signal issued while no thread is waiting here is not remembered, so a
     * message saved between two runs is presumably picked up only after the timeout — confirm
     * against the {@link DataSyncTransEvent} listener.
     *
     * @throws MyRuntimeException if the thread is interrupted (the interrupt flag is restored)
     */
    private void safeWait() {
        try {
            this.condition.await(this.producerProperties.getFetchTransIntervalMins().intValue(), TimeUnit.MINUTES);
        } catch (InterruptedException e) {
            log.error("Failed to call safeWait", e);
            Thread.currentThread().interrupt();
            throw new MyRuntimeException(e);
        }
    }

    /**
     * Drains persisted messages while holding the distributed lock. Does nothing if the lock
     * cannot be acquired immediately. Batches are processed until one comes back empty or a
     * send fails.
     */
    private void doSend() {
        if (!this.distLock.tryLock()) {
            return;
        }
        try {
            boolean batchFullySent;
            do {
                batchFullySent = sendNextBatch();
            } while (batchFullySent);
        } catch (Exception e) {
            log.error("Failed to execute sendMessage or batchUpdate", e);
        } finally {
            this.distLock.unlock();
        }
    }

    /**
     * Fetches the next batch from {@code getLatestNonUpdateTransList()} and sends it in order,
     * stopping at the first failure.
     *
     * <p>Rows that were sent successfully are always passed to {@code batchUpdate}, including
     * when a later row in the same batch fails; otherwise they would be fetched and sent again
     * on the next run.
     *
     * @return {@code true} if the batch was non-empty and every row was sent, i.e. the caller
     *         should fetch another batch
     */
    private boolean sendNextBatch() {
        List<DataSyncProducerTrans> pending = this.producerTransService.getLatestNonUpdateTransList();
        if (CollUtil.isEmpty(pending)) {
            log.info("No DataSync Trans Data");
            return false;
        }
        List<DataSyncProducerTrans> sent = new ArrayList<>(pending.size());
        for (DataSyncProducerTrans trans : pending) {
            if (!trySend(trans)) {
                break;
            }
            sent.add(trans);
        }
        if (!sent.isEmpty()) {
            this.producerTransService.batchUpdate(
                    sent.stream().map(DataSyncProducerTrans::getId).collect(Collectors.toList()));
        }
        return sent.size() == pending.size();
    }

    /**
     * Sends one persisted message with {@code syncSendOrderly}, using the row's queue selector key.
     *
     * @return {@code true} only if the broker reported {@code SEND_OK}; a different status or an
     *         exception is logged and yields {@code false}
     */
    private boolean trySend(DataSyncProducerTrans trans) {
        try {
            SendResult sendResult = this.rocketMqTemplate.syncSendOrderly(
                    trans.getMessageTopic(),
                    MessageBuilder.withPayload(trans.getMessageData())
                            .setHeader(DataSyncConstant.MESSAGE_HEADER_KEY_MESSAGE_TYPE, trans.getMessageType())
                            .setHeader(DataSyncConstant.MESSAGE_HEADER_KEY_COMMAND_TYPE, trans.getMessageCommand())
                            .setHeader(DataSyncConstant.MESSAGE_HEADER_KEY_PRODUCER_TRACE_ID, trans.getProducerTraceId())
                            .setHeader("TRANSACTION_ID", trans.getTransId())
                            .setHeader("KEYS", trans.getTransId())
                            .build(),
                    trans.getMessageQueueSelectorKey());
            if (sendResult.getSendStatus() == SendStatus.SEND_OK) {
                return true;
            }
            log.error("Failed to send message for TransId {}, MessageType {}, CommandType{}, traceId{}, sendResult{}",
                    trans.getTransId(), trans.getMessageType(), trans.getMessageCommand(),
                    trans.getProducerTraceId(), sendResult.getSendStatus());
            return false;
        } catch (Exception e) {
            log.error("Failed to execute sendMessage or batchUpdate", e);
            return false;
        }
    }

    /**
     * Maps the result of a transactional send to a {@link CallResult}.
     *
     * <p>Checks, in order: broker send status, an exception recorded on the local info
     * (rethrown), local transaction state {@code UNKNOW}, then {@code ROLLBACK_MESSAGE}. For a
     * rollback the local info's own error message is preferred when it is not blank.
     *
     * @throws MyRuntimeException wrapping the recorded exception cause, if any
     */
    private CallResult verifySendResult(TransactionSendResult transactionSendResult, TransactionMessageLocalInfo transactionMessageLocalInfo) {
        if (transactionSendResult.getSendStatus() != SendStatus.SEND_OK) {
            return CallResult.error("消息发送失败，发送状态 [" + transactionSendResult.getSendStatus().name() + "]!");
        }
        if (transactionMessageLocalInfo.getExceptionCause() != null) {
            throw new MyRuntimeException(transactionMessageLocalInfo.getExceptionCause());
        }
        LocalTransactionState localState = transactionSendResult.getLocalTransactionState();
        if (localState == LocalTransactionState.UNKNOW) {
            log.warn("Timeout for waiting Message [TransId: {}] Local Transaction notice.", transactionMessageLocalInfo.getTransId());
            return CallResult.error(ErrorCodeEnum.TRANSACTION_MESSAGE_LOCAL_STATUS_UNKNOW.getErrorMessage());
        }
        if (localState != LocalTransactionState.ROLLBACK_MESSAGE) {
            return CallResult.ok();
        }
        String localError = transactionMessageLocalInfo.getErrorMessage();
        if (StrUtil.isNotBlank(localError)) {
            return CallResult.error(localError);
        }
        return CallResult.error(ErrorCodeEnum.TRANSACTION_MESSAGE_LOCAL_STATUS_ROLLBACK.getErrorMessage());
    }
}
