/*
 * Decompiled with CFR 0.152.
 */
package com.ohaotian.abilityadmin.pushClient.kafka.service.impl;

import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONObject;
import com.alibaba.fastjson.annotation.JSONField;
import com.alibaba.fastjson.parser.Feature;
import com.alibaba.fastjson.serializer.SerializerFeature;
import com.github.pagehelper.Page;
import com.github.pagehelper.page.PageMethod;
import com.ohaotian.abilityadmin.ability.model.bo.abilitydeploy.savedeploy.SaveAbilityDeployReqBO;
import com.ohaotian.abilityadmin.ability.service.AbilityDeployService;
import com.ohaotian.abilityadmin.mapper.AbilityMapper;
import com.ohaotian.abilityadmin.mapper.AbilityProvideDeployMapper;
import com.ohaotian.abilityadmin.mapper.AbilityProvideHttpCustomMapper;
import com.ohaotian.abilityadmin.mapper.AbilityProvideMqCacheDataMapper;
import com.ohaotian.abilityadmin.mapper.AbilityProvideMqKafkaMapper;
import com.ohaotian.abilityadmin.mapper.AbilityProvideMqMapper;
import com.ohaotian.abilityadmin.model.po.AbilityProvideDeployPO;
import com.ohaotian.abilityadmin.model.po.AbilityProvideHttpCustomPO;
import com.ohaotian.abilityadmin.model.po.AbilityProvideMqCacheDataPo;
import com.ohaotian.abilityadmin.model.po.AbilityProvideMqKafkaPo;
import com.ohaotian.abilityadmin.model.po.AbilityProvideMqPo;
import com.ohaotian.abilityadmin.pushClient.abilityMqBase.AbilityMessagePoolService;
import com.ohaotian.abilityadmin.pushClient.abilityMqBase.AbilityMqBaseService;
import com.ohaotian.abilityadmin.pushClient.kafka.model.bo.AbilityProvideMqKafkaReqBo;
import com.ohaotian.abilityadmin.pushClient.kafka.model.bo.KafkaConfigBo;
import com.ohaotian.abilityadmin.pushClient.kafka.model.bo.KafkaProperties;
import com.ohaotian.abilityadmin.pushClient.kafka.model.bo.KafkaResetSubReqBo;
import com.ohaotian.abilityadmin.pushClient.kafka.model.bo.KafkaSendReqBo;
import com.ohaotian.abilityadmin.pushClient.kafka.model.bo.KafkaSubRspBo;
import com.ohaotian.abilityadmin.pushClient.kafka.model.bo.QryMqDataReqBo;
import com.ohaotian.abilityadmin.pushClient.kafka.model.bo.QryMqDataRspBo;
import com.ohaotian.abilityadmin.pushClient.kafka.model.bo.QryMqKafkaReqBo;
import com.ohaotian.abilityadmin.pushClient.kafka.service.AbilityKafkaService;
import com.ohaotian.atp.base.service.IdGenerator;
import com.ohaotian.logplatform.util.GzipUtil;
import com.ohaotian.plugin.base.exception.ZTBusinessException;
import com.ohaotian.plugin.common.util.BeanMapper;
import com.ohaotian.portalcommon.config.cluster.AdminClusterConfig;
import com.ohaotian.portalcommon.model.bo.RspBO;
import com.ohaotian.portalcommon.model.page.RspPage;
import java.lang.reflect.Field;
import java.nio.charset.StandardCharsets;
import java.time.Duration;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.Date;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionStage;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import javax.annotation.Resource;
import org.apache.commons.lang3.StringUtils;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.apache.kafka.common.serialization.StringSerializer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.BeanUtils;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.boot.CommandLineRunner;
import org.springframework.stereotype.Service;
import org.springframework.util.ObjectUtils;

@Service
public class AbilityKafkaServiceImpl
implements AbilityKafkaService,
CommandLineRunner {
    private static final Logger log = LoggerFactory.getLogger(AbilityKafkaServiceImpl.class);
    @Resource
    private AbilityProvideMqKafkaMapper abilityProvideMqKafkaMapper;
    @Resource
    private AbilityProvideMqCacheDataMapper abilityProvideMqCacheDataMapper;
    @Resource
    private AbilityProvideMqMapper abilityProvideMqMapper;
    @Autowired
    private IdGenerator idGenerator;
    @Resource
    private AbilityMapper abilityMapper;
    @Resource
    private AbilityProvideHttpCustomMapper abilityProvideHttpCustomMapper;
    @Autowired
    private AbilityDeployService abilityDeployService;
    @Resource
    private AbilityProvideDeployMapper abilityProvideDeployMapper;
    @Autowired
    private AbilityMqBaseService abilityMqBaseService;
    @Autowired
    private AbilityMessagePoolService abilityMessagePoolService;
    @Autowired
    private AdminClusterConfig adminClusterConfig;
    private final Map<Long, Boolean> startKafkaMap = new ConcurrentHashMap<Long, Boolean>();
    @Qualifier(value="mqConsumerExecutorService")
    @Autowired
    private ExecutorService mqConsumerExecutorService;

    /*
     * Enabled aggressive block sorting
     * Enabled unnecessary exception pruning
     * Enabled aggressive exception aggregation
     */
    @Override
    public RspBO sendMessage(KafkaSendReqBo kafkaSendReqBo) {
        AbilityProvideMqKafkaPo abilityProvideMqKafkaPo = this.abilityProvideMqKafkaMapper.selectByPrimaryKey(kafkaSendReqBo.getMqId());
        if (StringUtils.isBlank((CharSequence)abilityProvideMqKafkaPo.getTopic())) {
            return RspBO.error((String)"topic\u4e3a\u7a7a\uff0c\u68c0\u67e5kafka\u914d\u7f6e");
        }
        if (abilityProvideMqKafkaPo.getIsRunning() != 1) {
            return RspBO.error((String)("\u5f53\u524dkafka\u914d\u7f6e\u4e0d\u53ef\u7528\uff0c\u5f53\u524d\u72b6\u6001\u4e3a:" + abilityProvideMqKafkaPo.getIsRunning()));
        }
        JSONObject kafkaConfigJson = JSON.parseObject((String)abilityProvideMqKafkaPo.getProperties());
        Properties producerProps = new Properties();
        producerProps.put("key.serializer", StringSerializer.class.getName());
        producerProps.put("value.serializer", StringSerializer.class.getName());
        producerProps.putAll((Map<?, ?>)kafkaConfigJson);
        String msgKey = StringUtils.isBlank((CharSequence)abilityProvideMqKafkaPo.getMsgKey()) ? "Message" : abilityProvideMqKafkaPo.getMsgKey();
        try (KafkaProducer producer = new KafkaProducer(producerProps);){
            Future send = producer.send(new ProducerRecord(abilityProvideMqKafkaPo.getTopic(), (Object)msgKey, (Object)kafkaSendReqBo.getData().toString()));
            RecordMetadata metadata = (RecordMetadata)send.get();
            RspBO rspBO = RspBO.success((Object)("\u6d88\u606f\u53d1\u9001\u5b8c\u6210:TOPIC-" + metadata.topic() + ", partition: " + metadata.partition() + ", offset: " + metadata.offset()));
            return rspBO;
        }
        catch (Exception e) {
            e.printStackTrace();
            return RspBO.error((String)("\u6d88\u606f\u53d1\u9001\u5f02\u5e38:TOPIC-" + abilityProvideMqKafkaPo.getTopic() + " ex-" + e.getMessage()));
        }
    }

    @Override
    public synchronized void subMessage(AbilityProvideMqKafkaPo mqKafkaPo, KafkaSubRspBo kafkaSubRspBo) {
        long mqDataId = this.idGenerator.snowflakeId();
        try {
            AbilityProvideMqCacheDataPo mqCacheDataPo = new AbilityProvideMqCacheDataPo();
            mqCacheDataPo.setMqDataId(mqDataId);
            mqCacheDataPo.setData(kafkaSubRspBo.getValue().getBytes(StandardCharsets.UTF_8));
            mqCacheDataPo.setMqId(kafkaSubRspBo.getMqId());
            mqCacheDataPo.setStatus(2);
            mqCacheDataPo.setCreateTime(new Date());
            mqCacheDataPo.setUpdateTime(new Date());
            this.abilityProvideMqCacheDataMapper.insert(mqCacheDataPo);
            if (ObjectUtils.isEmpty((Object)mqKafkaPo.getWaitTime())) {
                this.abilityMqBaseService.pushDataThread(mqDataId, kafkaSubRspBo.getAbilityPath(), kafkaSubRspBo.getValue());
            } else {
                this.abilityMessagePoolService.pushDataQueue(kafkaSubRspBo, mqCacheDataPo);
            }
        }
        catch (Exception e) {
            log.error("MqDataId:{},kafka\u6d88\u8d39\u6570\u636e\u63a8\u9001\u5931\u8d25\uff01", (Object)mqDataId);
            e.printStackTrace();
            RspBO.error((String)e.getMessage());
            return;
        }
        RspBO.success((Object)("MqDataId:" + mqDataId + " \u63a8\u9001\u5b8c\u6210\uff01"));
    }

    @Override
    public RspBO resetMessage(KafkaResetSubReqBo kafkaResetSubReqBo) {
        if (!ObjectUtils.isEmpty(kafkaResetSubReqBo.getMqDataIds())) {
            int count = 0;
            for (Long mqDataId : kafkaResetSubReqBo.getMqDataIds()) {
                AbilityProvideMqCacheDataPo mqCacheDataPo = new AbilityProvideMqCacheDataPo();
                BeanUtils.copyProperties((Object)kafkaResetSubReqBo, (Object)mqCacheDataPo);
                mqCacheDataPo.setMqDataId(mqDataId);
                count += this.resetPush(mqCacheDataPo).intValue();
            }
            return RspBO.success((Object)("\u5f02\u6b65\u89e6\u53d1\u6210\u529f\uff01\u5171\u89e6\u53d1\uff1a" + count + " \u6761\u6570\u636e\u3002"));
        }
        if (!ObjectUtils.isEmpty((Object)kafkaResetSubReqBo.getMqDataId()) || !ObjectUtils.isEmpty((Object)kafkaResetSubReqBo.getMqId())) {
            AbilityProvideMqCacheDataPo mqCacheDataPo = new AbilityProvideMqCacheDataPo();
            BeanUtils.copyProperties((Object)kafkaResetSubReqBo, (Object)mqCacheDataPo);
            return RspBO.success((Object)("\u5f02\u6b65\u89e6\u53d1\u6210\u529f\uff01\u5171\u89e6\u53d1\uff1a" + this.resetPush(mqCacheDataPo) + " \u6761\u6570\u636e\u3002"));
        }
        return RspBO.success((Object)"\u5f02\u6b65\u89e6\u53d1\u5931\u8d25\uff01\u6ca1\u6709\u6570\u636e\u53ef\u4ee5\u63a8\u9001\u3002");
    }

    public Integer resetPush(AbilityProvideMqCacheDataPo mqCacheDataPo) {
        List<AbilityProvideMqCacheDataPo> mqCacheDataList = this.abilityProvideMqCacheDataMapper.selectByAll(mqCacheDataPo);
        Runnable runnable = () -> mqCacheDataList.forEach(mqCacheData -> {
            try {
                AbilityProvideMqKafkaPo mqKafkaPo = this.abilityProvideMqKafkaMapper.selectByPrimaryKey(mqCacheData.getMqId());
                String abilityPath = this.abilityMqBaseService.getAbilityPath(mqKafkaPo.getAbilityId());
                if (ObjectUtils.isEmpty((Object)mqKafkaPo.getWaitTime())) {
                    this.abilityMqBaseService.pushDataThread(mqCacheData.getMqDataId(), abilityPath, new String(mqCacheData.getData(), StandardCharsets.UTF_8));
                } else {
                    KafkaSubRspBo kafkaSubRspBo = new KafkaSubRspBo();
                    kafkaSubRspBo.setAbilityPath(this.abilityMqBaseService.getAbilityPath(mqKafkaPo.getAbilityId()));
                    kafkaSubRspBo.setAbilityId(mqKafkaPo.getAbilityId());
                    kafkaSubRspBo.setMqId(mqKafkaPo.getMqId());
                    this.abilityMessagePoolService.pushDataQueue(kafkaSubRspBo, mqCacheDataPo);
                }
                TimeUnit.MILLISECONDS.sleep(1000L);
            }
            catch (Exception e) {
                log.error("\u4e8c\u6b21\u89e6\u53d1\u5931\u8d25\uff01 MQ_DATA_ID:{}", (Object)mqCacheData.getMqDataId());
                e.printStackTrace();
            }
        });
        Thread thread = new Thread(runnable);
        thread.start();
        return mqCacheDataList.size();
    }

    @Override
    public RspBO refreshAll() {
        try {
            for (Map.Entry<Long, Boolean> entry : this.startKafkaMap.entrySet()) {
                log.info(entry.getKey().toString());
                entry.setValue(Boolean.TRUE);
            }
            this.abilityMessagePoolService.releaseResources();
            while (this.startKafkaMap.size() != 0) {
                TimeUnit.SECONDS.sleep(2L);
            }
            this.onMessage();
            return RspBO.success((Object)"kafka\u5168\u91cf\u5237\u65b0\u6210\u529f");
        }
        catch (Exception e) {
            e.printStackTrace();
            return RspBO.error((String)("kafka\u5168\u91cf\u5237\u65b0\u5931\u8d25\uff0c" + e.getMessage()));
        }
    }

    @Override
    public RspBO refreshInc() {
        try {
            List<AbilityProvideMqKafkaPo> mqKafkaPoList = this.abilityProvideMqKafkaMapper.selectAllByType(0);
            mqKafkaPoList.forEach(mqKafkaPo -> {
                try {
                    if (mqKafkaPo.getIsRunning() == 1 && Boolean.FALSE.equals(this.startKafkaMap.containsKey(mqKafkaPo.getMqId()))) {
                        this.createConsumer((AbilityProvideMqKafkaPo)mqKafkaPo);
                    } else {
                        log.warn("\u589e\u91cf MQ_ID:{} \u6d88\u606f\u521b\u5efa\u8df3\u8fc7\uff01", (Object)mqKafkaPo.getMqId());
                    }
                }
                catch (Exception e) {
                    log.error("\u589e\u91cf MQ_ID:{} \u6d88\u606f\u521b\u5efa\u5931\u8d25\uff01", (Object)mqKafkaPo.getMqId());
                    e.printStackTrace();
                }
            });
            return RspBO.success((Object)"kafka\u589e\u91cf\u66f4\u65b0\u5b8c\u6210");
        }
        catch (Exception e) {
            e.printStackTrace();
            return RspBO.error((String)("kafka\u589e\u91cf\u66f4\u65b0\u5931\u8d25\uff0c" + e.getMessage()));
        }
    }

    @Override
    public RspBO<List<KafkaConfigBo>> qryMqList(QryMqKafkaReqBo qryMqKafkaReqBo) {
        if ("Kafka".equals(qryMqKafkaReqBo.getMqType())) {
            Page page = PageMethod.startPage((int)qryMqKafkaReqBo.getPageNo(), (int)qryMqKafkaReqBo.getPageSize());
            List<AbilityProvideMqKafkaPo> abilityProvideMqKafkaPos = this.abilityProvideMqKafkaMapper.qryMqList(qryMqKafkaReqBo);
            ArrayList<KafkaConfigBo> kafkaConfigBos = new ArrayList<KafkaConfigBo>();
            for (AbilityProvideMqKafkaPo abilityProvideMqKafkaPo : abilityProvideMqKafkaPos) {
                KafkaConfigBo kafkaConfigBo = this.abilityProvideMqKafkaPoToKafkaConfig(abilityProvideMqKafkaPo);
                if (!ObjectUtils.isEmpty((Object)kafkaConfigBo.getAbilityId()) && !ObjectUtils.isEmpty((Object)this.abilityMapper.queryByAbilityId(kafkaConfigBo.getAbilityId()))) {
                    String abilityEname = this.abilityMapper.queryByAbilityId(kafkaConfigBo.getAbilityId()).getAbilityEname();
                    kafkaConfigBo.setAbilityEname(abilityEname);
                }
                kafkaConfigBos.add(kafkaConfigBo);
            }
            RspPage rspPage = new RspPage(Integer.valueOf(qryMqKafkaReqBo.getPageSize()), Integer.valueOf(qryMqKafkaReqBo.getPageNo()), kafkaConfigBos, Long.valueOf(page.getTotal()));
            RspBO rspBO = RspBO.success((Object)rspPage);
            return rspBO;
        }
        throw new ZTBusinessException(qryMqKafkaReqBo.getMqType() + "\u7c7b\u578b\u6682\u65f6\u65e0\u6570\u636e!");
    }

    public AbilityProvideMqKafkaPo kafkaConfigToAbilityProvideMqKafkaPo(KafkaConfigBo kafkaConfigBo) {
        AbilityProvideMqKafkaPo abilityProvideMqKafkaPo = (AbilityProvideMqKafkaPo)BeanMapper.map((Object)kafkaConfigBo, AbilityProvideMqKafkaPo.class);
        JSONObject properties = JSON.parseObject((String)JSON.toJSONString((Object)kafkaConfigBo.getKafkaProperties(), (SerializerFeature[])new SerializerFeature[]{SerializerFeature.WriteMapNullValue}), (Feature[])new Feature[]{Feature.OrderedField});
        if (!ObjectUtils.isEmpty((Object)kafkaConfigBo.getOtherConfigurations())) {
            properties.putAll((Map)JSON.parseObject((String)kafkaConfigBo.getOtherConfigurations()));
        }
        if (!ObjectUtils.isEmpty((Object)properties)) {
            abilityProvideMqKafkaPo.setProperties(properties.toString());
        }
        return abilityProvideMqKafkaPo;
    }

    public KafkaConfigBo abilityProvideMqKafkaPoToKafkaConfig(AbilityProvideMqKafkaPo abilityProvideMqKafkaPo) {
        if (!ObjectUtils.isEmpty((Object)abilityProvideMqKafkaPo)) {
            Field[] fields;
            KafkaConfigBo kafkaConfigBo = (KafkaConfigBo)BeanMapper.map((Object)abilityProvideMqKafkaPo, KafkaConfigBo.class);
            kafkaConfigBo.setKafkaProperties((KafkaProperties)JSON.parseObject((String)abilityProvideMqKafkaPo.getProperties(), KafkaProperties.class));
            JSONObject otherConfigurations = JSON.parseObject((String)abilityProvideMqKafkaPo.getProperties());
            for (Field field : fields = KafkaProperties.class.getDeclaredFields()) {
                JSONField annotation = field.getAnnotation(JSONField.class);
                if (ObjectUtils.isEmpty((Object)annotation)) continue;
                String name = annotation.name();
                otherConfigurations.remove((Object)name);
            }
            kafkaConfigBo.setOtherConfigurations(otherConfigurations.toString());
            return kafkaConfigBo;
        }
        return null;
    }

    @Override
    public RspBO addMqInfo(AbilityProvideMqKafkaReqBo abilityProvideMqKafkaReqBo) {
        if ("Kafka".equals(abilityProvideMqKafkaReqBo.getMqType())) {
            AbilityProvideMqKafkaPo abilityProvideMqKafkaPo = this.kafkaConfigToAbilityProvideMqKafkaPo(abilityProvideMqKafkaReqBo.getKafkaConfigBo());
            abilityProvideMqKafkaPo.setDeployTime(new Date());
            abilityProvideMqKafkaPo.setMqId(this.idGenerator.snowflakeId());
            this.abilityProvideMqKafkaMapper.insertSelective(abilityProvideMqKafkaPo);
            AbilityProvideMqPo abilityProvideMqPo = new AbilityProvideMqPo();
            abilityProvideMqPo.setMqId(abilityProvideMqKafkaPo.getMqId());
            abilityProvideMqPo.setMqType(abilityProvideMqKafkaReqBo.getMqType());
            this.abilityProvideMqMapper.insert(abilityProvideMqPo);
            return RspBO.success((Object)"\u6dfb\u52a0kafka\u6d88\u8d39\u6d88\u606f\u961f\u5217\u4fe1\u606f\u6210\u529f");
        }
        throw new ZTBusinessException(abilityProvideMqKafkaReqBo.getMqType() + "\u7c7b\u578b\u6682\u65f6\u65e0\u6dfb\u52a0\u65b9\u6848!");
    }

    @Override
    public RspBO modMqInfo(AbilityProvideMqKafkaReqBo abilityProvideMqKafkaReqBo) {
        if ("Kafka".equals(abilityProvideMqKafkaReqBo.getMqType())) {
            AbilityProvideMqKafkaPo abilityProvideMqKafkaPo = this.kafkaConfigToAbilityProvideMqKafkaPo(abilityProvideMqKafkaReqBo.getKafkaConfigBo());
            abilityProvideMqKafkaPo.setUpdateTime(new Date());
            this.abilityProvideMqKafkaMapper.updateByPrimaryKeySelective(abilityProvideMqKafkaPo);
            return RspBO.success((Object)"\u4fee\u6539kafka\u6d88\u606f\u961f\u5217\u4fe1\u606f\u6210\u529f");
        }
        throw new ZTBusinessException(abilityProvideMqKafkaReqBo.getMqType() + "\u7c7b\u578b\u6682\u65f6\u65e0\u4fee\u6539\u65b9\u6848!");
    }

    @Override
    public RspBO qryMqInfoByMqId(KafkaResetSubReqBo kafkaResetSubReqBo) {
        if ("Kafka".equals(kafkaResetSubReqBo.getMqType())) {
            AbilityProvideMqKafkaPo abilityProvideMqKafkaPo = this.abilityProvideMqKafkaMapper.selectByPrimaryKey(kafkaResetSubReqBo.getMqId());
            if (!ObjectUtils.isEmpty((Object)abilityProvideMqKafkaPo)) {
                KafkaConfigBo kafkaConfigBo = this.abilityProvideMqKafkaPoToKafkaConfig(abilityProvideMqKafkaPo);
                if (kafkaConfigBo.getType() == 1) {
                    kafkaConfigBo.setAbilityPath("/mq/kafka/sendMessage");
                } else {
                    kafkaConfigBo.setAbilityPath(this.abilityMqBaseService.getAbilityPath(abilityProvideMqKafkaPo.getAbilityId()));
                }
                return RspBO.success((Object)kafkaConfigBo);
            }
            return RspBO.success(null);
        }
        throw new ZTBusinessException(kafkaResetSubReqBo.getMqType() + "\u7c7b\u578b\u6d88\u606f\u961f\u5217\u6682\u65e0\u6570\u636e!");
    }

    @Override
    public RspBO deleteMqInfo(KafkaResetSubReqBo kafkaResetSubReqBo) {
        if ("Kafka".equals(kafkaResetSubReqBo.getMqType())) {
            String resultMessage;
            if (!ObjectUtils.isEmpty((Object)kafkaResetSubReqBo.getMqId())) {
                Long mqId = kafkaResetSubReqBo.getMqId();
                AbilityProvideMqKafkaPo abilityProvideMqKafkaPo = this.abilityProvideMqKafkaMapper.selectByPrimaryKey(mqId);
                if (abilityProvideMqKafkaPo.getType() == 1) {
                    AbilityProvideHttpCustomPO abilityProvideHttpCustomReqPO = new AbilityProvideHttpCustomPO();
                    abilityProvideHttpCustomReqPO.setMqId(mqId);
                    AbilityProvideHttpCustomPO abilityProvideHttpCustomRspPO = this.abilityProvideHttpCustomMapper.queryLimitOne(abilityProvideHttpCustomReqPO);
                    if (!ObjectUtils.isEmpty((Object)abilityProvideHttpCustomRspPO)) {
                        Long abilityId = abilityProvideMqKafkaPo.getAbilityId();
                        Long provideDeployId = abilityProvideHttpCustomRspPO.getProvideDeployId();
                        AbilityProvideDeployPO abilityProvideDeployPO = this.abilityProvideDeployMapper.queryByProvideDeployId(provideDeployId);
                        Long clusterId = abilityProvideDeployPO.getClusterId();
                        SaveAbilityDeployReqBO saveAbilityDeployReqBO = new SaveAbilityDeployReqBO();
                        saveAbilityDeployReqBO.setAbilityId(abilityId);
                        saveAbilityDeployReqBO.setProvideDeployId(provideDeployId);
                        saveAbilityDeployReqBO.setClusterId(clusterId);
                        this.abilityDeployService.delAbilityDeploy(saveAbilityDeployReqBO);
                    }
                    this.abilityProvideMqKafkaMapper.deleteByPrimaryKey(mqId);
                    resultMessage = "\u5220\u9664\u63a8\u9001\u6d88\u606f\u961f\u5217\u6210\u529f";
                } else {
                    this.abilityProvideMqCacheDataMapper.deleteByMqId(mqId);
                    this.abilityProvideMqKafkaMapper.deleteByPrimaryKey(mqId);
                    resultMessage = "\u5220\u9664\u76d1\u542c\u6d88\u606f\u961f\u5217\u6210\u529f";
                }
                this.abilityProvideMqMapper.deleteByMqId(mqId);
            } else if (!ObjectUtils.isEmpty((Object)kafkaResetSubReqBo.getMqDataId())) {
                this.abilityProvideMqCacheDataMapper.deleteByPrimaryKey(kafkaResetSubReqBo.getMqDataId());
                resultMessage = "\u5220\u9664mqDataId\u4e3a " + kafkaResetSubReqBo.getMqDataId() + " \u7684\u6570\u636e\u6210\u529f\u6210\u529f";
            } else if (!ObjectUtils.isEmpty(kafkaResetSubReqBo.getMqDataIds())) {
                kafkaResetSubReqBo.getMqDataIds().forEach(mqDataId -> this.abilityProvideMqCacheDataMapper.deleteByPrimaryKey((Long)mqDataId));
                resultMessage = "\u6279\u91cf\u5220\u9664mqData\u6210\u529f\uff0c\u5171\u5220\u9664 " + kafkaResetSubReqBo.getMqDataIds().size() + " \u6761";
            } else {
                throw new ZTBusinessException("\u65e0\u6b63\u786e\u7684\u6570\u636e\u8f93\u5165\uff0c\u8bf7\u68c0\u67e5\u53c2\u6570!");
            }
            return RspBO.success((Object)resultMessage);
        }
        throw new ZTBusinessException(kafkaResetSubReqBo.getMqType() + "\u7c7b\u578b\u6d88\u606f\u961f\u5217\u6682\u65e0\u6570\u636e!");
    }

    @Override
    public RspBO qryMqDataList(QryMqDataReqBo qryMqDataReqBo) {
        List<Object> qryMqDataRspBoList = new ArrayList();
        if ("Kafka".equals(qryMqDataReqBo.getMqType())) {
            List<AbilityProvideMqKafkaPo> abilityProvideMqKafkaPos = this.abilityProvideMqKafkaMapper.qryMqList((QryMqKafkaReqBo)BeanMapper.map((Object)qryMqDataReqBo, QryMqKafkaReqBo.class));
            ArrayList<Long> mqIds = new ArrayList<Long>();
            abilityProvideMqKafkaPos.forEach(abilityProvideMqKafkaPo -> mqIds.add(abilityProvideMqKafkaPo.getMqId()));
            Page page = PageMethod.startPage((int)qryMqDataReqBo.getPageNo(), (int)qryMqDataReqBo.getPageSize());
            if (!ObjectUtils.isEmpty(mqIds)) {
                qryMqDataReqBo.setMqIds(mqIds);
                qryMqDataRspBoList = this.abilityProvideMqCacheDataMapper.qryMqDataList(qryMqDataReqBo);
            }
            RspPage rspPage = new RspPage(Integer.valueOf(qryMqDataReqBo.getPageSize()), Integer.valueOf(qryMqDataReqBo.getPageNo()), qryMqDataRspBoList, Long.valueOf(page.getTotal()));
            for (Object row : rspPage.getRows()) {
                QryMqDataRspBo qryMqDataRspBo = (QryMqDataRspBo)row;
                AbilityProvideMqKafkaPo abilityProvideMqKafkaPo2 = this.abilityProvideMqKafkaMapper.selectByPrimaryKey(Long.parseLong(qryMqDataRspBo.getMqId()));
                ((QryMqDataRspBo)row).setTopic(abilityProvideMqKafkaPo2.getTopic());
                ((QryMqDataRspBo)row).setGroupId(abilityProvideMqKafkaPo2.getGroupId());
            }
            byte[] result = GzipUtil.compress((String)JSON.toJSONString((Object)rspPage), (String)"UTF-8");
            return RspBO.success((Object)result);
        }
        throw new ZTBusinessException(qryMqDataReqBo.getMqType() + "\u6d88\u606f\u961f\u5217\u6682\u65f6\u65e0\u6570\u636e!");
    }

    public List<Long> getMqDataIdsByMqDataReqBo(QryMqDataReqBo qryMqDataReqBo) {
        ArrayList<Long> mqDataIds = new ArrayList<Long>();
        List<AbilityProvideMqKafkaPo> abilityProvideMqKafkaPos = this.abilityProvideMqKafkaMapper.qryMqList((QryMqKafkaReqBo)BeanMapper.map((Object)qryMqDataReqBo, QryMqKafkaReqBo.class));
        ArrayList<Long> mqIds = new ArrayList<Long>();
        abilityProvideMqKafkaPos.forEach(abilityProvideMqKafkaPo -> mqIds.add(abilityProvideMqKafkaPo.getMqId()));
        if (!ObjectUtils.isEmpty(mqIds)) {
            qryMqDataReqBo.setMqIds(mqIds);
            List<QryMqDataRspBo> qryMqDataRspBoList = this.abilityProvideMqCacheDataMapper.qryMqDataList(qryMqDataReqBo);
            for (QryMqDataRspBo qryMqDataRspBo : qryMqDataRspBoList) {
                mqDataIds.add(Long.parseLong(qryMqDataRspBo.getMqDataId()));
            }
        }
        Collections.reverse(mqDataIds);
        return mqDataIds;
    }

    @Override
    public RspBO deleteMqData(QryMqDataReqBo qryMqDataReqBo) {
        KafkaResetSubReqBo kafkaResetSubReqBo = new KafkaResetSubReqBo();
        kafkaResetSubReqBo.setMqDataIds(this.getMqDataIdsByMqDataReqBo(qryMqDataReqBo));
        kafkaResetSubReqBo.setMqType(qryMqDataReqBo.getMqType());
        return this.deleteMqInfo(kafkaResetSubReqBo);
    }

    @Override
    public RspBO pushMqData(QryMqDataReqBo qryMqDataReqBo) {
        KafkaResetSubReqBo kafkaResetSubReqBo = new KafkaResetSubReqBo();
        kafkaResetSubReqBo.setMqDataIds(this.getMqDataIdsByMqDataReqBo(qryMqDataReqBo));
        return this.resetMessage(kafkaResetSubReqBo);
    }

    private void onMessage() {
        List<AbilityProvideMqKafkaPo> mqKafkaPoList = this.abilityProvideMqKafkaMapper.selectAllByType(0);
        mqKafkaPoList.forEach(mqKafkaPo -> {
            try {
                if (mqKafkaPo.getIsRunning() == 1 && Boolean.FALSE.equals(this.startKafkaMap.containsKey(mqKafkaPo.getMqId()))) {
                    this.createConsumer((AbilityProvideMqKafkaPo)mqKafkaPo);
                    if (!ObjectUtils.isEmpty((Object)mqKafkaPo.getWaitTime())) {
                        this.abilityMessagePoolService.initWaitTimeQueueThread((AbilityProvideMqKafkaPo)mqKafkaPo);
                    }
                } else {
                    log.warn("\u521d\u59cb\u5316 MQ_ID:{} \u6d88\u606f\u521b\u5efa\u8df3\u8fc7\uff01", (Object)mqKafkaPo.getMqId());
                }
            }
            catch (Exception e) {
                log.error("\u521d\u59cb\u5316 MQ_ID:{} \u6d88\u606f\u521b\u5efa\u5931\u8d25\uff01", (Object)mqKafkaPo.getMqId());
                e.printStackTrace();
            }
        });
    }

    private void createConsumer(AbilityProvideMqKafkaPo mqKafkaPo) {
        Properties consumerProps = new Properties();
        consumerProps.put("group.id", mqKafkaPo.getGroupId());
        consumerProps.put("key.deserializer", StringDeserializer.class.getName());
        consumerProps.put("value.deserializer", StringDeserializer.class.getName());
        JSONObject kafkaConfigJson = JSON.parseObject((String)mqKafkaPo.getProperties());
        consumerProps.putAll((Map<?, ?>)kafkaConfigJson);
        List<String> topics = Arrays.asList(mqKafkaPo.getTopic().split(","));
        CompletionStage runFuture = CompletableFuture.runAsync(() -> {
            this.startKafkaMap.put(mqKafkaPo.getMqId(), Boolean.FALSE);
            Duration timeout = Duration.ofSeconds(1L);
            try (KafkaConsumer consumer = new KafkaConsumer(consumerProps);){
                consumer.subscribe((Collection)topics);
                log.info("\u5f00\u59cb\u76d1\u542c\u6570\u636e, MQ_ID:{} TOPIC:{} \u6d88\u606f\u521b\u5efa\u5b8c\u6210\uff01", (Object)mqKafkaPo.getMqId(), (Object)topics);
                do {
                    ConsumerRecords msgList;
                    if (null == (msgList = consumer.poll(timeout)) || msgList.count() <= 0) continue;
                    for (ConsumerRecord record : msgList) {
                        KafkaSubRspBo kafkaSubRspBo = new KafkaSubRspBo();
                        kafkaSubRspBo.setKey((String)record.key());
                        kafkaSubRspBo.setValue((String)record.value());
                        kafkaSubRspBo.setOffset(record.offset());
                        kafkaSubRspBo.setAbilityPath(this.abilityMqBaseService.getAbilityPath(mqKafkaPo.getAbilityId()));
                        kafkaSubRspBo.setAbilityId(mqKafkaPo.getAbilityId());
                        kafkaSubRspBo.setMqId(mqKafkaPo.getMqId());
                        this.subMessage(mqKafkaPo, kafkaSubRspBo);
                    }
                } while (this.startKafkaMap.get(mqKafkaPo.getMqId()).equals(Boolean.FALSE));
                this.startKafkaMap.remove(mqKafkaPo.getMqId());
            }
        }, this.mqConsumerExecutorService).exceptionally(e -> {
            log.info("\u76d1\u542c\u6570\u636e\u5f02\u5e38, MQ_ID:{} TOPIC:{} \u5f02\u5e38{}\uff01", new Object[]{mqKafkaPo.getMqId(), topics, e.getMessage()});
            e.printStackTrace();
            this.startKafkaMap.remove(mqKafkaPo.getMqId());
            return null;
        });
        ((CompletableFuture)runFuture).whenComplete((result, e) -> log.info("topics:{} \u91ca\u653e\u5b8c\u6210\uff01", (Object)topics));
    }

    public void run(String ... args) throws Exception {
        try {
            log.info("\u542f\u52a8kafka\u76d1\u542c\uff01");
            this.onMessage();
        }
        catch (Exception e) {
            log.error("\u542f\u52a8kafka\u76d1\u542c\u5f02\u5e38\uff01 ex:{}", (Object)e.getMessage());
        }
    }
}

