package com.ohaotian.abilityadmin.pushClient.abilityMqBase.impl;

import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONObject;
import com.alibaba.fastjson.TypeReference;
import com.alibaba.fastjson.parser.Feature;
import com.fasterxml.jackson.annotation.JsonProperty;
import com.ohaotian.abilityadmin.config.pubsub.ChannelNaming;
import com.ohaotian.abilityadmin.config.pubsub.loader.PubSubRes;
import com.ohaotian.abilityadmin.config.pubsub.properties.PubSubResProperties;
import com.ohaotian.abilityadmin.mapper.AbilityProvideMqCacheDataMapper;
import com.ohaotian.abilityadmin.model.po.AbilityProvideMqCacheDataPo;
import com.ohaotian.abilityadmin.model.po.AbilityProvideMqKafkaPo;
import com.ohaotian.abilityadmin.pushClient.abilityMqBase.AbilityMessagePoolService;
import com.ohaotian.abilityadmin.pushClient.abilityMqBase.AbilityMqBaseService;
import com.ohaotian.abilityadmin.pushClient.abilityMqBase.model.bo.MqBaseReqBo;
import com.ohaotian.abilityadmin.pushClient.kafka.model.bo.KafkaSubRspBo;
import com.ohaotian.plugin.cache.CacheClient;
import com.ohaotian.portalcommon.config.cluster.AdminClusterConfig;
import com.ohaotian.portalcommon.constant.ConstantBaseVersion;
import com.ohaotian.portalcommon.model.bo.RedisSyncDataBo;
import com.ohaotian.portalcommon.model.bo.RspBO;
import com.ohaotian.portalcommon.util.GsonUtil;
import java.nio.charset.StandardCharsets;
import java.util.HashMap;
import java.util.HashSet;
import java.util.LinkedList;
import java.util.Map;
import java.util.Queue;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.TimeUnit;
import org.apache.commons.lang3.StringUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.stereotype.Service;
import org.springframework.util.ObjectUtils;

@Service
/* loaded from: input_file:com/ohaotian/abilityadmin/pushClient/abilityMqBase/impl/AbilityMessagePoolServiceImpl.class */
public class AbilityMessagePoolServiceImpl implements AbilityMessagePoolService {
    private static final Logger log = LoggerFactory.getLogger(AbilityMessagePoolServiceImpl.class);

    @Autowired
    private AbilityProvideMqCacheDataMapper abilityProvideMqCacheDataMapper;

    @Autowired
    private AbilityMqBaseService abilityMqBaseService;

    @Autowired
    private CacheClient cacheClient;

    @Autowired
    private AdminClusterConfig adminClusterConfig;

    @Autowired
    private ConstantBaseVersion constantBaseVersion;

    @Autowired
    private PubSubResProperties pubSubResProperties;

    @Autowired
    private PubSubRes pubSubRes;
    private final Map<Long, Boolean> resRetentionMap = new ConcurrentHashMap();
    private final Queue<Map<String, String>> operationQueue = new LinkedList();
    private final Set<String> haveToPushSet = new HashSet();
    public static final String STOP_FLAG = "_stop";

    @Value("${atp.mq.operation.waitTime:100}")
    private long operationQueueWaitTime;

    @Autowired
    @Qualifier("mqDelayExecutorService")
    private ExecutorService mqDelayExecutorService;

    @Override // com.ohaotian.abilityadmin.pushClient.abilityMqBase.AbilityMessagePoolService
    public void initWaitTimeQueueThread(AbilityProvideMqKafkaPo abilityProvideMqKafkaPo) {
        String abilityPath = this.abilityMqBaseService.getAbilityPath(abilityProvideMqKafkaPo.getAbilityId());
        String queueKey = getQueueKey(abilityProvideMqKafkaPo.getAbilityId(), abilityProvideMqKafkaPo.getMqId());
        CompletableFuture.runAsync(() -> {
            this.resRetentionMap.put(abilityProvideMqKafkaPo.getMqId(), Boolean.TRUE);
            while (true) {
                try {
                } catch (Exception e) {
                    log.error("{} 异步延迟推送异常！ex:{}", queueKey, e.getMessage());
                }
                if (Boolean.FALSE.equals(this.resRetentionMap.get(abilityProvideMqKafkaPo.getMqId()))) {
                    log.info("队列资源释放 mqId:{}", abilityProvideMqKafkaPo.getMqId());
                    return;
                }
                if (!Boolean.FALSE.equals(Boolean.valueOf(this.adminClusterConfig.isMaster()))) {
                    if (ObjectUtils.isEmpty(this.cacheClient.get(queueKey + STOP_FLAG))) {
                        Object obj = this.cacheClient.get(queueKey);
                        if (ObjectUtils.isEmpty(obj)) {
                            log.warn("等待10s，重新获取 key:{}", queueKey);
                            TimeUnit.MILLISECONDS.sleep(10000L);
                        } else {
                            String str = (String) ((Queue) JSON.parseObject(String.valueOf(obj), new TypeReference<Queue<String>>() { // from class: com.ohaotian.abilityadmin.pushClient.abilityMqBase.impl.AbilityMessagePoolServiceImpl.1
                            }, new Feature[0])).poll();
                            if (StringUtils.isNotBlank(str)) {
                                if (this.haveToPushSet.contains(str)) {
                                    log.warn("异步延迟推送操作执行器异常，等待2s，重新获取 key:{}", queueKey);
                                    TimeUnit.MILLISECONDS.sleep(20000L);
                                } else {
                                    AbilityProvideMqCacheDataPo selectByPrimaryKey = this.abilityProvideMqCacheDataMapper.selectByPrimaryKey(Long.valueOf(Long.parseLong(str)));
                                    if (abilityProvideMqKafkaPo.getWaitTime().intValue() >= 0) {
                                        this.abilityMqBaseService.pushDataThread(Long.valueOf(Long.parseLong(str)), abilityPath, new String(selectByPrimaryKey.getData(), StandardCharsets.UTF_8));
                                        log.info("推送mqDataId：{}完成，等待{}ms，进行下一条推送 key:{}", new Object[]{str, abilityProvideMqKafkaPo.getWaitTime(), queueKey});
                                        TimeUnit.MILLISECONDS.sleep(abilityProvideMqKafkaPo.getWaitTime().intValue());
                                    } else {
                                        this.abilityMqBaseService.pushDataWait(Long.valueOf(Long.parseLong(str)), abilityPath, new String(selectByPrimaryKey.getData(), StandardCharsets.UTF_8));
                                        log.info("推送mqDataId：{}完成，进行下一条推送 key:{}", str, queueKey);
                                    }
                                    HashMap hashMap = new HashMap();
                                    hashMap.put("key", queueKey);
                                    hashMap.put("operation", "poll");
                                    hashMap.put("mqDataId", str);
                                    this.operationQueue.add(hashMap);
                                    this.haveToPushSet.add(str);
                                    TimeUnit.MILLISECONDS.sleep(this.operationQueueWaitTime * 2);
                                }
                            }
                        }
                    } else {
                        log.info("暂停消息推送");
                        TimeUnit.MILLISECONDS.sleep(10000L);
                    }
                }
            }
        }, this.mqDelayExecutorService).exceptionally(th -> {
            log.info("异步延迟推送, MQ_ID:{} TOPIC:{} 异常{}！", new Object[]{abilityProvideMqKafkaPo.getMqId(), abilityProvideMqKafkaPo.getTopic(), th.getMessage()});
            th.printStackTrace();
            return null;
        }).whenComplete((r5, th2) -> {
            log.info("异步延迟推送 MQ_ID:{} 释放完成！", abilityProvideMqKafkaPo.getMqId());
        });
        log.info("异步延迟推送 key:{} 启动完成", queueKey);
    }

    @Override // com.ohaotian.abilityadmin.pushClient.abilityMqBase.AbilityMessagePoolService
    public synchronized void pushDataQueue(KafkaSubRspBo kafkaSubRspBo, AbilityProvideMqCacheDataPo abilityProvideMqCacheDataPo) {
        String queueKey = getQueueKey(kafkaSubRspBo.getAbilityId(), Long.valueOf(kafkaSubRspBo.getMqId()));
        if (!Boolean.FALSE.equals(Boolean.valueOf(this.adminClusterConfig.isMaster()))) {
            HashMap hashMap = new HashMap();
            hashMap.put("key", queueKey);
            hashMap.put("operation", "add");
            hashMap.put("mqDataId", String.valueOf(abilityProvideMqCacheDataPo.getMqDataId()));
            this.operationQueue.add(hashMap);
            log.info("添加过后的operationQueue:{}", this.operationQueue);
            return;
        }
        String str = ChannelNaming.get(this.pubSubResProperties.getWeb2admin() + "_" + ((String) GsonUtil.toStringMap(this.cacheClient.get(this.constantBaseVersion.getMASTER_NAME()).toString()).get("name")));
        RedisSyncDataBo redisSyncDataBo = new RedisSyncDataBo();
        redisSyncDataBo.setCode("MQ_OPERATION_QUEUE");
        JSONObject jSONObject = new JSONObject();
        jSONObject.put("kafkaSubRspBo", kafkaSubRspBo);
        jSONObject.put("mqCacheDataPo", abilityProvideMqCacheDataPo);
        redisSyncDataBo.setExtData(jSONObject);
        this.pubSubRes.getClient().publish(str, GsonUtil.toJson(redisSyncDataBo));
        log.info("异步延迟推送 通知主节点进行数据操作 key:{} mqDataId:{}", queueKey, abilityProvideMqCacheDataPo.getMqDataId());
    }

    @Override // com.ohaotian.abilityadmin.pushClient.abilityMqBase.AbilityMessagePoolService
    public RspBO<String> stopPushDataQueue(MqBaseReqBo mqBaseReqBo) {
        this.cacheClient.set(getQueueKey(mqBaseReqBo.getAbilityId(), mqBaseReqBo.getMqId()) + STOP_FLAG, JsonProperty.USE_DEFAULT_NAME);
        log.warn("异步延迟推送暂停 MQ_ID:{}", mqBaseReqBo.getMqId());
        return RspBO.success("暂停成功");
    }

    @Override // com.ohaotian.abilityadmin.pushClient.abilityMqBase.AbilityMessagePoolService
    public RspBO<String> startPushDataQueue(MqBaseReqBo mqBaseReqBo) {
        this.cacheClient.delete(getQueueKey(mqBaseReqBo.getAbilityId(), mqBaseReqBo.getMqId()) + STOP_FLAG);
        log.warn("异步延迟推送开启 MQ_ID:{}", mqBaseReqBo.getMqId());
        return RspBO.success("开启成功");
    }

    @Override // com.ohaotian.abilityadmin.pushClient.abilityMqBase.AbilityMessagePoolService
    public RspBO<String> deletePushDataQueue(MqBaseReqBo mqBaseReqBo) {
        String queueKey = getQueueKey(mqBaseReqBo.getAbilityId(), mqBaseReqBo.getMqId());
        this.cacheClient.delete(queueKey);
        log.info("异步延迟推送 key:{} 删除完成", queueKey);
        return RspBO.success("删除成功");
    }

    @Override // com.ohaotian.abilityadmin.pushClient.abilityMqBase.AbilityMessagePoolService
    public void releaseResources() {
        for (Map.Entry<Long, Boolean> entry : this.resRetentionMap.entrySet()) {
            log.info(entry.getKey().toString());
            entry.setValue(Boolean.FALSE);
        }
    }

    @Bean
    private void operationQueueExec() {
        log.info("初始化 异步延迟推送操作执行器！");
        new Thread(() -> {
            while (true) {
                try {
                    if (Boolean.TRUE.equals(Boolean.valueOf(this.adminClusterConfig.isMaster()))) {
                        Map<String, String> poll = this.operationQueue.poll();
                        if (!ObjectUtils.isEmpty(poll)) {
                            log.info("执行的operationMap内容为:{}", poll);
                            String str = poll.get("key");
                            String str2 = poll.get("operation");
                            String str3 = poll.get("mqDataId");
                            Object obj = this.cacheClient.get(str);
                            if (ObjectUtils.isEmpty(obj)) {
                                if ("add".equals(str2)) {
                                    LinkedList linkedList = new LinkedList();
                                    linkedList.add(String.valueOf(str3));
                                    this.cacheClient.set(str, JSON.toJSONString(linkedList));
                                    log.info("异步延迟推送 添加数据操作 key:{} mqDataId:{}", str, str3);
                                } else {
                                    log.info("operationMap的内容为:{}", poll);
                                }
                            } else if ("add".equals(str2)) {
                                Queue queue = (Queue) JSON.parseObject(String.valueOf(obj), new TypeReference<Queue<String>>() { // from class: com.ohaotian.abilityadmin.pushClient.abilityMqBase.impl.AbilityMessagePoolServiceImpl.2
                                }, new Feature[0]);
                                queue.add(String.valueOf(str3));
                                this.cacheClient.set(str, JSON.toJSONString(queue));
                                log.info("异步延迟推送 添加数据操作 key:{} mqDataId:{}", str, str3);
                            } else {
                                Queue queue2 = (Queue) JSON.parseObject(String.valueOf(obj), new TypeReference<Queue<String>>() { // from class: com.ohaotian.abilityadmin.pushClient.abilityMqBase.impl.AbilityMessagePoolServiceImpl.3
                                }, new Feature[0]);
                                this.haveToPushSet.remove((String) queue2.poll());
                                this.cacheClient.set(str, JSON.toJSONString(queue2));
                                log.info("异步延迟推送 移除数据操作 key:{} mqDataId:{}", str, str3);
                            }
                            TimeUnit.MILLISECONDS.sleep(this.operationQueueWaitTime);
                        }
                    }
                } catch (Exception e) {
                    log.error("mq管理操作执行器异常 ex:{}", e.getMessage());
                    e.printStackTrace();
                    return;
                }
            }
        }).start();
    }

    private String getQueueKey(Long l, Long l2) {
        return l2 + "_" + this.abilityMqBaseService.getAbilityPath(l);
    }
}
