/*
 * Decompiled with CFR 0.152.
 */
package com.ohaotian.abilityadmin.pushClient.abilityMqBase.impl;

import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONObject;
import com.alibaba.fastjson.TypeReference;
import com.alibaba.fastjson.parser.Feature;
import com.ohaotian.abilityadmin.config.pubsub.ChannelNaming;
import com.ohaotian.abilityadmin.config.pubsub.loader.PubSubRes;
import com.ohaotian.abilityadmin.config.pubsub.properties.PubSubResProperties;
import com.ohaotian.abilityadmin.mapper.AbilityProvideMqCacheDataMapper;
import com.ohaotian.abilityadmin.model.po.AbilityProvideMqCacheDataPo;
import com.ohaotian.abilityadmin.model.po.AbilityProvideMqKafkaPo;
import com.ohaotian.abilityadmin.pushClient.abilityMqBase.AbilityMessagePoolService;
import com.ohaotian.abilityadmin.pushClient.abilityMqBase.AbilityMqBaseService;
import com.ohaotian.abilityadmin.pushClient.abilityMqBase.model.bo.MqBaseReqBo;
import com.ohaotian.abilityadmin.pushClient.kafka.model.bo.KafkaSubRspBo;
import com.ohaotian.plugin.cache.CacheClient;
import com.ohaotian.portalcommon.config.cluster.AdminClusterConfig;
import com.ohaotian.portalcommon.constant.ConstantBaseVersion;
import com.ohaotian.portalcommon.model.bo.RedisSyncDataBo;
import com.ohaotian.portalcommon.model.bo.RspBO;
import com.ohaotian.portalcommon.util.GsonUtil;
import java.nio.charset.StandardCharsets;
import java.util.HashMap;
import java.util.HashSet;
import java.util.LinkedList;
import java.util.Map;
import java.util.Queue;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionStage;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import javax.annotation.Resource;
import org.apache.commons.lang3.StringUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.scheduling.annotation.Scheduled;
import org.springframework.scheduling.concurrent.ThreadPoolTaskScheduler;
import org.springframework.stereotype.Service;
import org.springframework.util.ObjectUtils;

/**
 * Cache-backed message pool that pushes MQ payloads one at a time per MQ/ability pair.
 *
 * <p>Each pair owns a cache entry (see {@link #getQueueKey}) holding a JSON array of
 * {@code mqDataId} strings. Three parties cooperate:
 * <ul>
 *   <li>{@link #pushDataQueue} requests an "add" of a new id (directly on the master node,
 *       or by publishing a message addressed to the master node otherwise);</li>
 *   <li>a worker started by {@link #initWaitTimeQueueThread} reads the head id, pushes its
 *       payload, and requests a "poll" of that head;</li>
 *   <li>{@link #operationQueueExec} applies at most one queued "add"/"poll" operation to the
 *       cache entry per scheduler tick, on the master node only.</li>
 * </ul>
 *
 * <p>Thread-safety: {@code resRetentionMap}, {@code operationQueue} and {@code haveToPushSet}
 * are concurrent collections because they are shared between the worker threads and the
 * scheduler thread.
 */
@Service
public class AbilityMessagePoolServiceImpl
implements AbilityMessagePoolService {
    private static final Logger log = LoggerFactory.getLogger(AbilityMessagePoolServiceImpl.class);
    /** How long a worker sleeps between checks while its queue is paused. */
    private static final long PAUSED_POLL_MILLIS = 10000L;
    /** How long a worker sleeps when the cache holds no entry for its queue key. */
    private static final long NO_DATA_POLL_MILLIS = 10000L;
    /** How long a worker waits for an already-pushed head id to be removed by the scheduler. */
    private static final long PENDING_REMOVAL_WAIT_MILLIS = 1000L;
    /** Short back-off used where the worker would otherwise spin (not master / empty queue). */
    private static final long IDLE_BACKOFF_MILLIS = 100L;
    /** Back-off applied after an unexpected error so a persistent failure cannot spin. */
    private static final long ERROR_BACKOFF_MILLIS = 1000L;
    @Resource
    private AbilityProvideMqCacheDataMapper abilityProvideMqCacheDataMapper;
    @Autowired
    private AbilityMqBaseService abilityMqBaseService;
    @Autowired
    private CacheClient cacheClient;
    @Autowired
    private AdminClusterConfig adminClusterConfig;
    @Autowired
    private ConstantBaseVersion constantBaseVersion;
    @Autowired
    private PubSubResProperties pubSubResProperties;
    @Autowired
    private PubSubRes pubSubRes;
    @Resource
    private ThreadPoolTaskScheduler threadPoolTaskScheduler;
    /** mqId -> keep-running flag; a worker leaves its loop once its flag is FALSE. */
    private final Map<Long, Boolean> resRetentionMap = new ConcurrentHashMap<Long, Boolean>();
    /** Pending cache mutations; each map carries the entries "key", "operation" and "mqDataId". */
    private final LinkedBlockingQueue<Map<String, String>> operationQueue = new LinkedBlockingQueue<>();
    /**
     * Ids that have been pushed but whose "poll" operation has not been applied yet.
     * Concurrent because workers add to it while the scheduler thread removes from it.
     */
    private final Set<String> haveToPushSet = ConcurrentHashMap.newKeySet();
    /** Suffix appended to a queue key to form the cache key of its pause flag. */
    public static final String STOP_FLAG = "_stop";
    // Not read anywhere in this class.
    @Value(value="${atp.mq.operation.waitTime:100}")
    private long operationQueueWaitTime;
    @Qualifier(value="mqDelayExecutorService")
    @Autowired
    private ExecutorService mqDelayExecutorService;

    /**
     * Starts the asynchronous worker for the given MQ configuration on
     * {@code mqDelayExecutorService} and returns immediately.
     *
     * @param mqKafkaPo MQ configuration; its ability id, MQ id, topic and wait time are read
     */
    @Override
    public void initWaitTimeQueueThread(AbilityProvideMqKafkaPo mqKafkaPo) {
        String abilityPath = this.abilityMqBaseService.getAbilityPath(mqKafkaPo.getAbilityId());
        String queueKey = this.getQueueKey(mqKafkaPo.getAbilityId(), mqKafkaPo.getMqId());
        CompletableFuture<Void> runFuture = CompletableFuture
                .runAsync(() -> this.runPushLoop(mqKafkaPo, abilityPath, queueKey), this.mqDelayExecutorService)
                .exceptionally(e -> {
                    // Trailing throwable makes SLF4J log the stack trace.
                    log.error("\u5f02\u6b65\u5ef6\u8fdf\u63a8\u9001\u5f02\u5e38, MQ_ID:{} TOPIC:{} \u5f02\u5e38{}\uff01", mqKafkaPo.getMqId(), mqKafkaPo.getTopic(), e.getMessage(), e);
                    return null;
                });
        runFuture.whenComplete((result, e) -> log.info("\u961f\u5217\u5f02\u6b65\u5ef6\u8fdf\u63a8\u9001 MQ_ID:{} \u91ca\u653e\u5b8c\u6210\uff01", mqKafkaPo.getMqId()));
        log.info("\u961f\u5217\u5f02\u6b65\u5ef6\u8fdf\u63a8\u9001 key:{} \u542f\u52a8\u5b8c\u6210", queueKey);
    }

    /**
     * Worker loop: while this node is master and the queue is not paused, pushes the head id
     * of the cached queue and requests its removal. Runs until the retention flag for the MQ
     * id is set to FALSE (see {@link #releaseResources}) or the thread is interrupted.
     */
    private void runPushLoop(AbilityProvideMqKafkaPo mqKafkaPo, String abilityPath, String queueKey) {
        Long mqId = mqKafkaPo.getMqId();
        this.resRetentionMap.put(mqId, Boolean.TRUE);
        // Tracks whether the cached queue has been reset since this node last became master.
        boolean queueResetDone = false;
        while (!Thread.currentThread().isInterrupted() && !Boolean.FALSE.equals(this.resRetentionMap.get(mqId))) {
            try {
                if (Boolean.FALSE.equals(this.adminClusterConfig.isMaster())) {
                    queueResetDone = false;
                    // Back off instead of spinning while another node is master.
                    TimeUnit.MILLISECONDS.sleep(IDLE_BACKOFF_MILLIS);
                    continue;
                }
                if (!queueResetDone) {
                    log.info("\u6e05\u9664\u6570\u636e{}", queueKey);
                    this.cacheClient.set(queueKey, (Object)"[]");
                    queueResetDone = true;
                }
                if (!ObjectUtils.isEmpty((Object)this.cacheClient.get(queueKey + STOP_FLAG))) {
                    log.info("\u6682\u505c\u6d88\u606f\u63a8\u9001");
                    TimeUnit.MILLISECONDS.sleep(PAUSED_POLL_MILLIS);
                    continue;
                }
                Object value = this.cacheClient.get(queueKey);
                if (ObjectUtils.isEmpty(value)) {
                    TimeUnit.MILLISECONDS.sleep(NO_DATA_POLL_MILLIS);
                    continue;
                }
                String mqDataId = headOf(value);
                if (StringUtils.isBlank(mqDataId)) {
                    // Entry exists but has no usable head (e.g. "[]"): back off rather than
                    // re-reading the cache in a tight loop.
                    TimeUnit.MILLISECONDS.sleep(IDLE_BACKOFF_MILLIS);
                    continue;
                }
                if (this.haveToPushSet.contains(mqDataId)) {
                    log.warn("\u8be5\u4fe1\u606f\u5df2\u7ecf\u53d1\u9001\uff0c\u51c6\u5907\u79fb\u9664\u4e2d\uff0c\u7b49\u5f851s \u8be5\u4fe1\u606fid:{}, \u5f85\u79fb\u9664\u4fe1\u606f\u96c6\u5408{}", mqDataId, this.haveToPushSet);
                    TimeUnit.MILLISECONDS.sleep(PENDING_REMOVAL_WAIT_MILLIS);
                    continue;
                }
                // Re-read the head so that a queue change between the two reads is detected.
                Object newValue = this.cacheClient.get(queueKey);
                String newMqDataId = headOf(newValue);
                // mqDataId is non-null here, so a null re-read counts as a mismatch, not an NPE.
                if (!mqDataId.equals(newMqDataId)) {
                    log.warn("\u63a8\u9001\u4fe1\u606f\u4e0d\u5339\u914d\uff0c\u8df3\u8fc7\u8be5\u6761\u4fe1\u606f\uff01\u539fid{}, \u65b0id{}", mqDataId, newMqDataId);
                    continue;
                }
                this.pushAndRequestRemoval(mqKafkaPo, abilityPath, queueKey, mqDataId);
            }
            catch (InterruptedException e) {
                // Restore the flag; the loop condition then terminates the worker.
                Thread.currentThread().interrupt();
                log.warn("{} push worker interrupted, stopping", queueKey);
            }
            catch (Exception e) {
                log.error("{} \u5f02\u6b65\u5ef6\u8fdf\u63a8\u9001\u5f02\u5e38\uff01ex:{}", queueKey, e.getMessage(), e);
                backOffAfterError();
            }
        }
    }

    /**
     * Loads the payload for {@code mqDataId}, pushes it, then marks the id as awaiting
     * removal and enqueues the "poll" operation for the scheduler.
     *
     * <p>A non-negative wait time selects {@code pushDataThread} followed by a sleep of that
     * many milliseconds; a negative wait time selects {@code pushDataWait}.
     *
     * @throws IllegalStateException if no payload is stored for {@code mqDataId}
     */
    private void pushAndRequestRemoval(AbilityProvideMqKafkaPo mqKafkaPo, String abilityPath, String queueKey, String mqDataId) throws Exception {
        AbilityProvideMqCacheDataPo mqCacheDataPo = this.abilityProvideMqCacheDataMapper.selectByPrimaryKey(Long.parseLong(mqDataId));
        if (mqCacheDataPo == null || mqCacheDataPo.getData() == null) {
            throw new IllegalStateException("no cached payload found for mqDataId " + mqDataId);
        }
        String payload = new String(mqCacheDataPo.getData(), StandardCharsets.UTF_8);
        if (mqKafkaPo.getWaitTime() >= 0) {
            this.abilityMqBaseService.pushDataThread(Long.parseLong(mqDataId), abilityPath, payload);
            log.info("\u5f02\u6b65\u63a8\u9001mqDataId\uff1a{}\u5b8c\u6210\uff0c\u7b49\u5f85{}ms\uff0c\u8fdb\u884c\u4e0b\u4e00\u6761\u63a8\u9001 key:{}", mqDataId, mqKafkaPo.getWaitTime(), queueKey);
            TimeUnit.MILLISECONDS.sleep(mqKafkaPo.getWaitTime().intValue());
        } else {
            this.abilityMqBaseService.pushDataWait(Long.parseLong(mqDataId), abilityPath, payload);
            log.info("\u987a\u5e8f\u63a8\u9001mqDataId\uff1a{}\u5b8c\u6210\uff0c\u8fdb\u884c\u4e0b\u4e00\u6761\u63a8\u9001 key:{}", mqDataId, queueKey);
        }
        Map<String, String> operation = new HashMap<>();
        operation.put("key", queueKey);
        operation.put("operation", "poll");
        operation.put("mqDataId", mqDataId);
        // Register the id BEFORE publishing the operation, so the scheduler can never try to
        // remove it from the set before it has been added.
        this.haveToPushSet.add(mqDataId);
        this.operationQueue.add(operation);
    }

    /** Parses a cached value (via {@code String.valueOf}) as a JSON array of id strings. */
    private static Queue<String> parseQueue(Object cachedValue) {
        return JSON.parseObject(String.valueOf(cachedValue), new TypeReference<Queue<String>>(){}, new Feature[0]);
    }

    /** Returns the first id of the cached queue, or {@code null} when there is none. */
    private static String headOf(Object cachedValue) {
        Queue<String> ids = parseQueue(cachedValue);
        return ids == null ? null : ids.poll();
    }

    /** Sleeps for the error back-off; an interrupt is preserved on the thread. */
    private static void backOffAfterError() {
        try {
            TimeUnit.MILLISECONDS.sleep(ERROR_BACKOFF_MILLIS);
        }
        catch (InterruptedException interrupted) {
            Thread.currentThread().interrupt();
        }
    }

    /**
     * Requests that a new message id be appended to the cached queue.
     *
     * <p>On the master node the "add" operation is enqueued locally. On any other node the
     * request is published to the channel derived from the master entry stored in the cache.
     *
     * @param kafkaSubRspBo supplies the ability id and MQ id that identify the queue
     * @param mqCacheDataPo supplies the {@code mqDataId} to append
     */
    @Override
    public synchronized void pushDataQueue(KafkaSubRspBo kafkaSubRspBo, AbilityProvideMqCacheDataPo mqCacheDataPo) {
        String key = this.getQueueKey(kafkaSubRspBo.getAbilityId(), kafkaSubRspBo.getMqId());
        if (Boolean.FALSE.equals(this.adminClusterConfig.isMaster())) {
            // NOTE(review): a missing master entry in the cache makes the next line throw a
            // NullPointerException; left as is because callers may depend on the failure.
            Object master = this.cacheClient.get(this.constantBaseVersion.getMASTER_NAME());
            Map<?, ?> node = GsonUtil.toStringMap(master.toString());
            String channel = ChannelNaming.get(this.pubSubResProperties.getWeb2admin() + "_" + (String)node.get("name"));
            JSONObject extData = new JSONObject();
            extData.put("kafkaSubRspBo", (Object)kafkaSubRspBo);
            extData.put("mqCacheDataPo", (Object)mqCacheDataPo);
            RedisSyncDataBo redisSyncDataRspBo = new RedisSyncDataBo();
            redisSyncDataRspBo.setCode("MQ_OPERATION_QUEUE");
            redisSyncDataRspBo.setExtData(extData);
            this.pubSubRes.getClient().publish(channel, GsonUtil.toJson((Object)redisSyncDataRspBo));
            log.info("\u5f02\u6b65\u5ef6\u8fdf\u63a8\u9001 \u901a\u77e5\u4e3b\u8282\u70b9\u8fdb\u884c\u6570\u636e\u64cd\u4f5c key:{} mqDataId:{}", key, mqCacheDataPo.getMqDataId());
            return;
        }
        Map<String, String> operation = new HashMap<>();
        operation.put("key", key);
        operation.put("operation", "add");
        operation.put("mqDataId", String.valueOf(mqCacheDataPo.getMqDataId()));
        this.operationQueue.add(operation);
    }

    /**
     * Writes the pause flag for the queue identified by the request.
     *
     * <p>NOTE(review): the flag value written is an empty string, while the worker loop treats
     * the flag as set only when {@code ObjectUtils.isEmpty} of the cached value is false.
     * Confirm that {@code CacheClient.get} returns a non-empty value for this key.
     */
    @Override
    public RspBO<String> stopPushDataQueue(MqBaseReqBo mqBaseReqBo) {
        String key = this.getQueueKey(mqBaseReqBo.getAbilityId(), mqBaseReqBo.getMqId()) + STOP_FLAG;
        this.cacheClient.set(key, (Object)"");
        log.warn("\u5f02\u6b65\u5ef6\u8fdf\u63a8\u9001\u6682\u505c MQ_ID:{}", mqBaseReqBo.getMqId());
        return RspBO.success((Object)"\u6682\u505c\u6210\u529f\uff01");
    }

    /** Deletes the pause flag for the queue identified by the request. */
    @Override
    public RspBO<String> startPushDataQueue(MqBaseReqBo mqBaseReqBo) {
        String key = this.getQueueKey(mqBaseReqBo.getAbilityId(), mqBaseReqBo.getMqId()) + STOP_FLAG;
        this.cacheClient.delete(key);
        log.warn("\u5f02\u6b65\u5ef6\u8fdf\u63a8\u9001\u5f00\u542f MQ_ID:{}", mqBaseReqBo.getMqId());
        return RspBO.success((Object)"\u5f00\u542f\u6210\u529f");
    }

    /** Deletes the cached queue entry identified by the request. */
    @Override
    public RspBO<String> deletePushDataQueue(MqBaseReqBo mqBaseReqBo) {
        String key = this.getQueueKey(mqBaseReqBo.getAbilityId(), mqBaseReqBo.getMqId());
        this.cacheClient.delete(key);
        log.info("\u5f02\u6b65\u5ef6\u8fdf\u63a8\u9001 key:{} \u5220\u9664\u5b8c\u6210", key);
        return RspBO.success((Object)"\u5220\u9664\u6210\u529f");
    }

    /**
     * Sets every retention flag to FALSE; each worker leaves its loop the next time it
     * evaluates the loop condition.
     */
    @Override
    public void releaseResources() {
        for (Map.Entry<Long, Boolean> entry : this.resRetentionMap.entrySet()) {
            log.info(entry.getKey().toString());
            entry.setValue(Boolean.FALSE);
        }
    }

    /**
     * Scheduled every second: on the master node, takes at most one pending operation and
     * applies it to the cached queue. "add" appends the id (creating the entry if absent);
     * any other operation removes the head of the queue and clears it from
     * {@code haveToPushSet}.
     */
    @Scheduled(cron="* * * * * ?")
    private void operationQueueExec() {
        try {
            if (!Boolean.TRUE.equals(this.adminClusterConfig.isMaster())) {
                return;
            }
            Map<String, String> operationMap = this.operationQueue.poll();
            if (ObjectUtils.isEmpty(operationMap)) {
                return;
            }
            log.info("\u6267\u884c\u7684operationMap\u5185\u5bb9\u4e3a:{}", operationMap);
            String key = operationMap.get("key");
            String mqDataId = operationMap.get("mqDataId");
            boolean isAdd = "add".equals(operationMap.get("operation"));
            Object value = this.cacheClient.get(key);
            if (ObjectUtils.isEmpty(value)) {
                if (isAdd) {
                    LinkedList<String> data = new LinkedList<>();
                    data.add(String.valueOf(mqDataId));
                    this.cacheClient.set(key, (Object)JSON.toJSONString((Object)data));
                } else {
                    // Nothing cached to remove from; only record the operation.
                    log.info("operationMap\u7684\u5185\u5bb9\u4e3a:{}", operationMap);
                }
                return;
            }
            Queue<String> data = parseQueue(value);
            if (isAdd) {
                data.add(String.valueOf(mqDataId));
                this.cacheClient.set(key, (Object)JSON.toJSONString((Object)data));
                return;
            }
            String judge = data.poll();
            this.cacheClient.set(key, (Object)JSON.toJSONString((Object)data));
            // The concurrent set rejects null, so an empty queue is checked explicitly and
            // reported the same way as a failed removal.
            if (judge == null || !this.haveToPushSet.remove(judge)) {
                log.warn("\u79fb\u9664mqId\uff1a{}\u5931\u8d25\uff0c\u9700\u8981\u5b9a\u4f4d\u95ee\u9898\uff01\uff01\uff01", judge);
            }
        }
        catch (Exception e) {
            log.error("mq\u7ba1\u7406\u64cd\u4f5c\u6267\u884c\u5668\u5f02\u5e38 ex:{}", e.getMessage(), e);
        }
    }

    /** Builds the cache key {@code <mqId>_<abilityPath>} for the given ability and MQ id. */
    private String getQueueKey(Long abilityId, Long mqId) {
        String abilityPath = this.abilityMqBaseService.getAbilityPath(abilityId);
        return mqId + "_" + abilityPath;
    }
}

