package com.ohaotian.abilityadmin.pushClient.kafka.service.impl;

import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONObject;
import com.alibaba.fastjson.annotation.JSONField;
import com.alibaba.fastjson.parser.Feature;
import com.alibaba.fastjson.serializer.SerializerFeature;
import com.github.pagehelper.Page;
import com.github.pagehelper.page.PageMethod;
import com.ohaotian.abilityadmin.ability.model.bo.abilitydeploy.savedeploy.SaveAbilityDeployReqBO;
import com.ohaotian.abilityadmin.ability.service.AbilityDeployService;
import com.ohaotian.abilityadmin.mapper.AbilityMapper;
import com.ohaotian.abilityadmin.mapper.AbilityProvideDeployMapper;
import com.ohaotian.abilityadmin.mapper.AbilityProvideHttpCustomMapper;
import com.ohaotian.abilityadmin.mapper.AbilityProvideMqCacheDataMapper;
import com.ohaotian.abilityadmin.mapper.AbilityProvideMqKafkaMapper;
import com.ohaotian.abilityadmin.mapper.AbilityProvideMqMapper;
import com.ohaotian.abilityadmin.model.po.AbilityProvideHttpCustomPO;
import com.ohaotian.abilityadmin.model.po.AbilityProvideMqCacheDataPo;
import com.ohaotian.abilityadmin.model.po.AbilityProvideMqKafkaPo;
import com.ohaotian.abilityadmin.model.po.AbilityProvideMqPo;
import com.ohaotian.abilityadmin.pushClient.abilityMqBase.AbilityMessagePoolService;
import com.ohaotian.abilityadmin.pushClient.abilityMqBase.AbilityMqBaseService;
import com.ohaotian.abilityadmin.pushClient.kafka.model.bo.AbilityProvideMqKafkaReqBo;
import com.ohaotian.abilityadmin.pushClient.kafka.model.bo.KafkaConfigBo;
import com.ohaotian.abilityadmin.pushClient.kafka.model.bo.KafkaProperties;
import com.ohaotian.abilityadmin.pushClient.kafka.model.bo.KafkaResetSubReqBo;
import com.ohaotian.abilityadmin.pushClient.kafka.model.bo.KafkaSendReqBo;
import com.ohaotian.abilityadmin.pushClient.kafka.model.bo.KafkaSubRspBo;
import com.ohaotian.abilityadmin.pushClient.kafka.model.bo.QryMqDataReqBo;
import com.ohaotian.abilityadmin.pushClient.kafka.model.bo.QryMqDataRspBo;
import com.ohaotian.abilityadmin.pushClient.kafka.model.bo.QryMqKafkaReqBo;
import com.ohaotian.abilityadmin.pushClient.kafka.service.AbilityKafkaService;
import com.ohaotian.atp.base.service.IdGenerator;
import com.ohaotian.logplatform.util.GzipUtil;
import com.ohaotian.plugin.base.exception.ZTBusinessException;
import com.ohaotian.plugin.common.util.BeanMapper;
import com.ohaotian.portalcommon.config.cluster.AdminClusterConfig;
import com.ohaotian.portalcommon.model.bo.RspBO;
import com.ohaotian.portalcommon.model.page.RspPage;
import java.lang.reflect.Field;
import java.nio.charset.StandardCharsets;
import java.time.Duration;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Date;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.TimeUnit;
import javax.annotation.Resource;
import org.apache.commons.lang3.StringUtils;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.apache.kafka.common.serialization.StringSerializer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.BeanUtils;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.boot.CommandLineRunner;
import org.springframework.stereotype.Service;
import org.springframework.util.ObjectUtils;

@Service
/* loaded from: input_file:com/ohaotian/abilityadmin/pushClient/kafka/service/impl/AbilityKafkaServiceImpl.class */
public class AbilityKafkaServiceImpl implements AbilityKafkaService, CommandLineRunner {
    private static final Logger log = LoggerFactory.getLogger(AbilityKafkaServiceImpl.class);

    @Resource
    private AbilityProvideMqKafkaMapper abilityProvideMqKafkaMapper;

    @Resource
    private AbilityProvideMqCacheDataMapper abilityProvideMqCacheDataMapper;

    @Resource
    private AbilityProvideMqMapper abilityProvideMqMapper;

    @Autowired
    private IdGenerator idGenerator;

    @Resource
    private AbilityMapper abilityMapper;

    @Resource
    private AbilityProvideHttpCustomMapper abilityProvideHttpCustomMapper;

    @Autowired
    private AbilityDeployService abilityDeployService;

    @Resource
    private AbilityProvideDeployMapper abilityProvideDeployMapper;

    @Autowired
    private AbilityMqBaseService abilityMqBaseService;

    @Autowired
    private AbilityMessagePoolService abilityMessagePoolService;

    @Autowired
    private AdminClusterConfig adminClusterConfig;
    private final Map<Long, Boolean> startKafkaMap = new ConcurrentHashMap();

    @Autowired
    @Qualifier("mqConsumerExecutorService")
    private ExecutorService mqConsumerExecutorService;

    @Override // com.ohaotian.abilityadmin.pushClient.kafka.service.AbilityKafkaService
    public RspBO sendMessage(KafkaSendReqBo kafkaSendReqBo) {
        AbilityProvideMqKafkaPo selectByPrimaryKey = this.abilityProvideMqKafkaMapper.selectByPrimaryKey(kafkaSendReqBo.getMqId());
        if (StringUtils.isBlank(selectByPrimaryKey.getTopic())) {
            return RspBO.error("topic为空，检查kafka配置");
        }
        if (selectByPrimaryKey.getIsRunning().intValue() != 1) {
            return RspBO.error("当前kafka配置不可用，当前状态为:" + selectByPrimaryKey.getIsRunning());
        }
        JSONObject parseObject = JSON.parseObject(selectByPrimaryKey.getProperties());
        Properties properties = new Properties();
        properties.put("key.serializer", StringSerializer.class.getName());
        properties.put("value.serializer", StringSerializer.class.getName());
        properties.putAll(parseObject);
        String msgKey = StringUtils.isBlank(selectByPrimaryKey.getMsgKey()) ? "Message" : selectByPrimaryKey.getMsgKey();
        try {
            KafkaProducer kafkaProducer = new KafkaProducer(properties);
            Throwable th = null;
            try {
                kafkaProducer.send(new ProducerRecord(selectByPrimaryKey.getTopic(), msgKey, kafkaSendReqBo.getData().toString()));
                if (kafkaProducer != null) {
                    if (0 != 0) {
                        try {
                            kafkaProducer.close();
                        } catch (Throwable th2) {
                            th.addSuppressed(th2);
                        }
                    } else {
                        kafkaProducer.close();
                    }
                }
                return RspBO.success("消息发送完成:TOPIC-" + selectByPrimaryKey.getTopic());
            } finally {
            }
        } catch (Exception e) {
            e.printStackTrace();
            return RspBO.error("消息发送异常:TOPIC-" + selectByPrimaryKey.getTopic() + " ex-" + e.getMessage());
        }
    }

    @Override // com.ohaotian.abilityadmin.pushClient.kafka.service.AbilityKafkaService
    public synchronized void subMessage(AbilityProvideMqKafkaPo abilityProvideMqKafkaPo, KafkaSubRspBo kafkaSubRspBo) {
        long snowflakeId = this.idGenerator.snowflakeId();
        try {
            AbilityProvideMqCacheDataPo abilityProvideMqCacheDataPo = new AbilityProvideMqCacheDataPo();
            abilityProvideMqCacheDataPo.setMqDataId(Long.valueOf(snowflakeId));
            abilityProvideMqCacheDataPo.setData(kafkaSubRspBo.getValue().getBytes(StandardCharsets.UTF_8));
            abilityProvideMqCacheDataPo.setMqId(Long.valueOf(kafkaSubRspBo.getMqId()));
            abilityProvideMqCacheDataPo.setStatus(2);
            abilityProvideMqCacheDataPo.setCreateTime(new Date());
            abilityProvideMqCacheDataPo.setUpdateTime(new Date());
            this.abilityProvideMqCacheDataMapper.insert(abilityProvideMqCacheDataPo);
            if (ObjectUtils.isEmpty(abilityProvideMqKafkaPo.getWaitTime())) {
                this.abilityMqBaseService.pushDataThread(Long.valueOf(snowflakeId), kafkaSubRspBo.getAbilityPath(), kafkaSubRspBo.getValue());
            } else {
                this.abilityMessagePoolService.pushDataQueue(kafkaSubRspBo, abilityProvideMqCacheDataPo);
            }
            RspBO.success("MqDataId:" + snowflakeId + " 推送完成！");
        } catch (Exception e) {
            log.error("MqDataId:{},kafka消费数据推送失败！", Long.valueOf(snowflakeId));
            e.printStackTrace();
            RspBO.error(e.getMessage());
        }
    }

    @Override // com.ohaotian.abilityadmin.pushClient.kafka.service.AbilityKafkaService
    public RspBO resetMessage(KafkaResetSubReqBo kafkaResetSubReqBo) {
        if (ObjectUtils.isEmpty(kafkaResetSubReqBo.getMqDataIds())) {
            if (ObjectUtils.isEmpty(kafkaResetSubReqBo.getMqDataId()) && ObjectUtils.isEmpty(kafkaResetSubReqBo.getMqId())) {
                return RspBO.success("异步触发失败！没有数据可以推送。");
            }
            AbilityProvideMqCacheDataPo abilityProvideMqCacheDataPo = new AbilityProvideMqCacheDataPo();
            BeanUtils.copyProperties(kafkaResetSubReqBo, abilityProvideMqCacheDataPo);
            return RspBO.success("异步触发成功！共触发：" + resetPush(abilityProvideMqCacheDataPo) + " 条数据。");
        }
        int i = 0;
        for (Long l : kafkaResetSubReqBo.getMqDataIds()) {
            AbilityProvideMqCacheDataPo abilityProvideMqCacheDataPo2 = new AbilityProvideMqCacheDataPo();
            BeanUtils.copyProperties(kafkaResetSubReqBo, abilityProvideMqCacheDataPo2);
            abilityProvideMqCacheDataPo2.setMqDataId(l);
            i += resetPush(abilityProvideMqCacheDataPo2).intValue();
        }
        return RspBO.success("异步触发成功！共触发：" + i + " 条数据。");
    }

    public Integer resetPush(AbilityProvideMqCacheDataPo abilityProvideMqCacheDataPo) {
        List<AbilityProvideMqCacheDataPo> selectByAll = this.abilityProvideMqCacheDataMapper.selectByAll(abilityProvideMqCacheDataPo);
        new Thread(() -> {
            selectByAll.forEach(abilityProvideMqCacheDataPo2 -> {
                try {
                    AbilityProvideMqKafkaPo selectByPrimaryKey = this.abilityProvideMqKafkaMapper.selectByPrimaryKey(abilityProvideMqCacheDataPo2.getMqId());
                    String abilityPath = this.abilityMqBaseService.getAbilityPath(selectByPrimaryKey.getAbilityId());
                    if (ObjectUtils.isEmpty(selectByPrimaryKey.getWaitTime())) {
                        this.abilityMqBaseService.pushDataThread(abilityProvideMqCacheDataPo2.getMqDataId(), abilityPath, new String(abilityProvideMqCacheDataPo2.getData(), StandardCharsets.UTF_8));
                    } else {
                        KafkaSubRspBo kafkaSubRspBo = new KafkaSubRspBo();
                        kafkaSubRspBo.setAbilityPath(this.abilityMqBaseService.getAbilityPath(selectByPrimaryKey.getAbilityId()));
                        kafkaSubRspBo.setAbilityId(selectByPrimaryKey.getAbilityId());
                        kafkaSubRspBo.setMqId(selectByPrimaryKey.getMqId().longValue());
                        this.abilityMessagePoolService.pushDataQueue(kafkaSubRspBo, abilityProvideMqCacheDataPo);
                    }
                    TimeUnit.MILLISECONDS.sleep(1000L);
                } catch (Exception e) {
                    log.error("二次触发失败！ MQ_DATA_ID:{}", abilityProvideMqCacheDataPo2.getMqDataId());
                    e.printStackTrace();
                }
            });
        }).start();
        return Integer.valueOf(selectByAll.size());
    }

    @Override // com.ohaotian.abilityadmin.pushClient.kafka.service.AbilityKafkaService
    public RspBO refreshAll() {
        try {
            for (Map.Entry<Long, Boolean> entry : this.startKafkaMap.entrySet()) {
                log.info(entry.getKey().toString());
                entry.setValue(Boolean.TRUE);
            }
            while (this.startKafkaMap.size() != 0) {
                TimeUnit.SECONDS.sleep(2L);
            }
            onMessage();
            return RspBO.success("kafka全量刷新成功");
        } catch (Exception e) {
            e.printStackTrace();
            return RspBO.error("kafka全量刷新失败，" + e.getMessage());
        }
    }

    @Override // com.ohaotian.abilityadmin.pushClient.kafka.service.AbilityKafkaService
    public RspBO refreshInc() {
        try {
            this.abilityProvideMqKafkaMapper.selectAllByType(0).forEach(abilityProvideMqKafkaPo -> {
                try {
                    if (abilityProvideMqKafkaPo.getIsRunning().intValue() == 1 && Boolean.FALSE.equals(Boolean.valueOf(this.startKafkaMap.containsKey(abilityProvideMqKafkaPo.getMqId())))) {
                        createConsumer(abilityProvideMqKafkaPo);
                    } else {
                        log.warn("增量 MQ_ID:{} 消息创建跳过！", abilityProvideMqKafkaPo.getMqId());
                    }
                } catch (Exception e) {
                    log.error("增量 MQ_ID:{} 消息创建失败！", abilityProvideMqKafkaPo.getMqId());
                    e.printStackTrace();
                }
            });
            return RspBO.success("kafka增量更新完成");
        } catch (Exception e) {
            e.printStackTrace();
            return RspBO.error("kafka增量更新失败，" + e.getMessage());
        }
    }

    @Override // com.ohaotian.abilityadmin.pushClient.kafka.service.AbilityKafkaService
    public RspBO<List<KafkaConfigBo>> qryMqList(QryMqKafkaReqBo qryMqKafkaReqBo) {
        if (!"Kafka".equals(qryMqKafkaReqBo.getMqType())) {
            throw new ZTBusinessException(qryMqKafkaReqBo.getMqType() + "类型暂时无数据!");
        }
        Page startPage = PageMethod.startPage(qryMqKafkaReqBo.getPageNo(), qryMqKafkaReqBo.getPageSize());
        List<AbilityProvideMqKafkaPo> qryMqList = this.abilityProvideMqKafkaMapper.qryMqList(qryMqKafkaReqBo);
        ArrayList arrayList = new ArrayList();
        Iterator<AbilityProvideMqKafkaPo> it = qryMqList.iterator();
        while (it.hasNext()) {
            KafkaConfigBo abilityProvideMqKafkaPoToKafkaConfig = abilityProvideMqKafkaPoToKafkaConfig(it.next());
            if (!ObjectUtils.isEmpty(abilityProvideMqKafkaPoToKafkaConfig.getAbilityId()) && !ObjectUtils.isEmpty(this.abilityMapper.queryByAbilityId(abilityProvideMqKafkaPoToKafkaConfig.getAbilityId()))) {
                abilityProvideMqKafkaPoToKafkaConfig.setAbilityEname(this.abilityMapper.queryByAbilityId(abilityProvideMqKafkaPoToKafkaConfig.getAbilityId()).getAbilityEname());
            }
            arrayList.add(abilityProvideMqKafkaPoToKafkaConfig);
        }
        return RspBO.success(new RspPage(Integer.valueOf(qryMqKafkaReqBo.getPageSize()), Integer.valueOf(qryMqKafkaReqBo.getPageNo()), arrayList, Long.valueOf(startPage.getTotal())));
    }

    public AbilityProvideMqKafkaPo kafkaConfigToAbilityProvideMqKafkaPo(KafkaConfigBo kafkaConfigBo) {
        AbilityProvideMqKafkaPo abilityProvideMqKafkaPo = (AbilityProvideMqKafkaPo) BeanMapper.map(kafkaConfigBo, AbilityProvideMqKafkaPo.class);
        JSONObject parseObject = JSON.parseObject(JSON.toJSONString(kafkaConfigBo.getKafkaProperties(), new SerializerFeature[]{SerializerFeature.WriteMapNullValue}), new Feature[]{Feature.OrderedField});
        if (!ObjectUtils.isEmpty(kafkaConfigBo.getOtherConfigurations())) {
            parseObject.putAll(JSON.parseObject(kafkaConfigBo.getOtherConfigurations()));
        }
        if (!ObjectUtils.isEmpty(parseObject)) {
            abilityProvideMqKafkaPo.setProperties(parseObject.toString());
        }
        return abilityProvideMqKafkaPo;
    }

    public KafkaConfigBo abilityProvideMqKafkaPoToKafkaConfig(AbilityProvideMqKafkaPo abilityProvideMqKafkaPo) {
        if (ObjectUtils.isEmpty(abilityProvideMqKafkaPo)) {
            return null;
        }
        KafkaConfigBo kafkaConfigBo = (KafkaConfigBo) BeanMapper.map(abilityProvideMqKafkaPo, KafkaConfigBo.class);
        kafkaConfigBo.setKafkaProperties((KafkaProperties) JSON.parseObject(abilityProvideMqKafkaPo.getProperties(), KafkaProperties.class));
        JSONObject parseObject = JSON.parseObject(abilityProvideMqKafkaPo.getProperties());
        for (Field field : KafkaProperties.class.getDeclaredFields()) {
            JSONField annotation = field.getAnnotation(JSONField.class);
            if (!ObjectUtils.isEmpty(annotation)) {
                parseObject.remove(annotation.name());
            }
        }
        kafkaConfigBo.setOtherConfigurations(parseObject.toString());
        return kafkaConfigBo;
    }

    @Override // com.ohaotian.abilityadmin.pushClient.kafka.service.AbilityKafkaService
    public RspBO addMqInfo(AbilityProvideMqKafkaReqBo abilityProvideMqKafkaReqBo) {
        if (!"Kafka".equals(abilityProvideMqKafkaReqBo.getMqType())) {
            throw new ZTBusinessException(abilityProvideMqKafkaReqBo.getMqType() + "类型暂时无添加方案!");
        }
        AbilityProvideMqKafkaPo kafkaConfigToAbilityProvideMqKafkaPo = kafkaConfigToAbilityProvideMqKafkaPo(abilityProvideMqKafkaReqBo.getKafkaConfigBo());
        kafkaConfigToAbilityProvideMqKafkaPo.setDeployTime(new Date());
        kafkaConfigToAbilityProvideMqKafkaPo.setMqId(Long.valueOf(this.idGenerator.snowflakeId()));
        this.abilityProvideMqKafkaMapper.insertSelective(kafkaConfigToAbilityProvideMqKafkaPo);
        AbilityProvideMqPo abilityProvideMqPo = new AbilityProvideMqPo();
        abilityProvideMqPo.setMqId(kafkaConfigToAbilityProvideMqKafkaPo.getMqId());
        abilityProvideMqPo.setMqType(abilityProvideMqKafkaReqBo.getMqType());
        this.abilityProvideMqMapper.insert(abilityProvideMqPo);
        return RspBO.success("添加kafka消费消息队列信息成功");
    }

    @Override // com.ohaotian.abilityadmin.pushClient.kafka.service.AbilityKafkaService
    public RspBO modMqInfo(AbilityProvideMqKafkaReqBo abilityProvideMqKafkaReqBo) {
        if (!"Kafka".equals(abilityProvideMqKafkaReqBo.getMqType())) {
            throw new ZTBusinessException(abilityProvideMqKafkaReqBo.getMqType() + "类型暂时无修改方案!");
        }
        AbilityProvideMqKafkaPo kafkaConfigToAbilityProvideMqKafkaPo = kafkaConfigToAbilityProvideMqKafkaPo(abilityProvideMqKafkaReqBo.getKafkaConfigBo());
        kafkaConfigToAbilityProvideMqKafkaPo.setUpdateTime(new Date());
        this.abilityProvideMqKafkaMapper.updateByPrimaryKeySelective(kafkaConfigToAbilityProvideMqKafkaPo);
        return RspBO.success("修改kafka消息队列信息成功");
    }

    @Override // com.ohaotian.abilityadmin.pushClient.kafka.service.AbilityKafkaService
    public RspBO qryMqInfoByMqId(KafkaResetSubReqBo kafkaResetSubReqBo) {
        if (!"Kafka".equals(kafkaResetSubReqBo.getMqType())) {
            throw new ZTBusinessException(kafkaResetSubReqBo.getMqType() + "类型消息队列暂无数据!");
        }
        AbilityProvideMqKafkaPo selectByPrimaryKey = this.abilityProvideMqKafkaMapper.selectByPrimaryKey(kafkaResetSubReqBo.getMqId());
        if (ObjectUtils.isEmpty(selectByPrimaryKey)) {
            return RspBO.success((Object) null);
        }
        KafkaConfigBo abilityProvideMqKafkaPoToKafkaConfig = abilityProvideMqKafkaPoToKafkaConfig(selectByPrimaryKey);
        if (abilityProvideMqKafkaPoToKafkaConfig.getType().intValue() == 1) {
            abilityProvideMqKafkaPoToKafkaConfig.setAbilityPath("/mq/kafka/sendMessage");
        } else {
            abilityProvideMqKafkaPoToKafkaConfig.setAbilityPath(this.abilityMqBaseService.getAbilityPath(selectByPrimaryKey.getAbilityId()));
        }
        return RspBO.success(abilityProvideMqKafkaPoToKafkaConfig);
    }

    @Override // com.ohaotian.abilityadmin.pushClient.kafka.service.AbilityKafkaService
    public RspBO deleteMqInfo(KafkaResetSubReqBo kafkaResetSubReqBo) {
        String str;
        if (!"Kafka".equals(kafkaResetSubReqBo.getMqType())) {
            throw new ZTBusinessException(kafkaResetSubReqBo.getMqType() + "类型消息队列暂无数据!");
        }
        if (!ObjectUtils.isEmpty(kafkaResetSubReqBo.getMqId())) {
            Long mqId = kafkaResetSubReqBo.getMqId();
            AbilityProvideMqKafkaPo selectByPrimaryKey = this.abilityProvideMqKafkaMapper.selectByPrimaryKey(mqId);
            if (selectByPrimaryKey.getType().intValue() == 1) {
                AbilityProvideHttpCustomPO abilityProvideHttpCustomPO = new AbilityProvideHttpCustomPO();
                abilityProvideHttpCustomPO.setMqId(mqId);
                AbilityProvideHttpCustomPO queryLimitOne = this.abilityProvideHttpCustomMapper.queryLimitOne(abilityProvideHttpCustomPO);
                if (!ObjectUtils.isEmpty(queryLimitOne)) {
                    Long abilityId = selectByPrimaryKey.getAbilityId();
                    Long provideDeployId = queryLimitOne.getProvideDeployId();
                    Long clusterId = this.abilityProvideDeployMapper.queryByProvideDeployId(provideDeployId).getClusterId();
                    SaveAbilityDeployReqBO saveAbilityDeployReqBO = new SaveAbilityDeployReqBO();
                    saveAbilityDeployReqBO.setAbilityId(abilityId);
                    saveAbilityDeployReqBO.setProvideDeployId(provideDeployId);
                    saveAbilityDeployReqBO.setClusterId(clusterId);
                    this.abilityDeployService.delAbilityDeploy(saveAbilityDeployReqBO);
                }
                this.abilityProvideMqKafkaMapper.deleteByPrimaryKey(mqId);
                str = "删除推送消息队列成功";
            } else {
                this.abilityProvideMqCacheDataMapper.deleteByMqId(mqId);
                this.abilityProvideMqKafkaMapper.deleteByPrimaryKey(mqId);
                str = "删除监听消息队列成功";
            }
            this.abilityProvideMqMapper.deleteByMqId(mqId);
        } else if (!ObjectUtils.isEmpty(kafkaResetSubReqBo.getMqDataId())) {
            this.abilityProvideMqCacheDataMapper.deleteByPrimaryKey(kafkaResetSubReqBo.getMqDataId());
            str = "删除mqDataId为 " + kafkaResetSubReqBo.getMqDataId() + " 的数据成功成功";
        } else {
            if (ObjectUtils.isEmpty(kafkaResetSubReqBo.getMqDataIds())) {
                throw new ZTBusinessException("无正确的数据输入，请检查参数!");
            }
            kafkaResetSubReqBo.getMqDataIds().forEach(l -> {
                this.abilityProvideMqCacheDataMapper.deleteByPrimaryKey(l);
            });
            str = "批量删除mqData成功，共删除 " + kafkaResetSubReqBo.getMqDataIds().size() + " 条";
        }
        return RspBO.success(str);
    }

    /* JADX WARN: Multi-variable type inference failed */
    @Override // com.ohaotian.abilityadmin.pushClient.kafka.service.AbilityKafkaService
    public RspBO qryMqDataList(QryMqDataReqBo qryMqDataReqBo) {
        List arrayList = new ArrayList();
        if (!"Kafka".equals(qryMqDataReqBo.getMqType())) {
            throw new ZTBusinessException(qryMqDataReqBo.getMqType() + "消息队列暂时无数据!");
        }
        List<AbilityProvideMqKafkaPo> qryMqList = this.abilityProvideMqKafkaMapper.qryMqList((QryMqKafkaReqBo) BeanMapper.map(qryMqDataReqBo, QryMqKafkaReqBo.class));
        ArrayList arrayList2 = new ArrayList();
        qryMqList.forEach(abilityProvideMqKafkaPo -> {
            arrayList2.add(abilityProvideMqKafkaPo.getMqId());
        });
        Page startPage = PageMethod.startPage(qryMqDataReqBo.getPageNo(), qryMqDataReqBo.getPageSize());
        if (!ObjectUtils.isEmpty(arrayList2)) {
            qryMqDataReqBo.setMqIds(arrayList2);
            arrayList = this.abilityProvideMqCacheDataMapper.qryMqDataList(qryMqDataReqBo);
        }
        RspPage rspPage = new RspPage(Integer.valueOf(qryMqDataReqBo.getPageSize()), Integer.valueOf(qryMqDataReqBo.getPageNo()), arrayList, Long.valueOf(startPage.getTotal()));
        for (Object obj : rspPage.getRows()) {
            AbilityProvideMqKafkaPo selectByPrimaryKey = this.abilityProvideMqKafkaMapper.selectByPrimaryKey(Long.valueOf(Long.parseLong(((QryMqDataRspBo) obj).getMqId())));
            ((QryMqDataRspBo) obj).setTopic(selectByPrimaryKey.getTopic());
            ((QryMqDataRspBo) obj).setGroupId(selectByPrimaryKey.getGroupId());
        }
        return RspBO.success(GzipUtil.compress(JSON.toJSONString(rspPage), "UTF-8"));
    }

    public List<Long> getMqDataIdsByMqDataReqBo(QryMqDataReqBo qryMqDataReqBo) {
        ArrayList arrayList = new ArrayList();
        List<AbilityProvideMqKafkaPo> qryMqList = this.abilityProvideMqKafkaMapper.qryMqList((QryMqKafkaReqBo) BeanMapper.map(qryMqDataReqBo, QryMqKafkaReqBo.class));
        ArrayList arrayList2 = new ArrayList();
        qryMqList.forEach(abilityProvideMqKafkaPo -> {
            arrayList2.add(abilityProvideMqKafkaPo.getMqId());
        });
        if (!ObjectUtils.isEmpty(arrayList2)) {
            qryMqDataReqBo.setMqIds(arrayList2);
            Iterator<QryMqDataRspBo> it = this.abilityProvideMqCacheDataMapper.qryMqDataList(qryMqDataReqBo).iterator();
            while (it.hasNext()) {
                arrayList.add(Long.valueOf(Long.parseLong(it.next().getMqDataId())));
            }
        }
        return arrayList;
    }

    @Override // com.ohaotian.abilityadmin.pushClient.kafka.service.AbilityKafkaService
    public RspBO deleteMqData(QryMqDataReqBo qryMqDataReqBo) {
        KafkaResetSubReqBo kafkaResetSubReqBo = new KafkaResetSubReqBo();
        kafkaResetSubReqBo.setMqDataIds(getMqDataIdsByMqDataReqBo(qryMqDataReqBo));
        kafkaResetSubReqBo.setMqType(qryMqDataReqBo.getMqType());
        return deleteMqInfo(kafkaResetSubReqBo);
    }

    @Override // com.ohaotian.abilityadmin.pushClient.kafka.service.AbilityKafkaService
    public RspBO pushMqData(QryMqDataReqBo qryMqDataReqBo) {
        KafkaResetSubReqBo kafkaResetSubReqBo = new KafkaResetSubReqBo();
        kafkaResetSubReqBo.setMqDataIds(getMqDataIdsByMqDataReqBo(qryMqDataReqBo));
        return resetMessage(kafkaResetSubReqBo);
    }

    private void onMessage() {
        this.abilityProvideMqKafkaMapper.selectAllByType(0).forEach(abilityProvideMqKafkaPo -> {
            try {
                if (abilityProvideMqKafkaPo.getIsRunning().intValue() == 1 && Boolean.FALSE.equals(Boolean.valueOf(this.startKafkaMap.containsKey(abilityProvideMqKafkaPo.getMqId())))) {
                    createConsumer(abilityProvideMqKafkaPo);
                    if (!ObjectUtils.isEmpty(abilityProvideMqKafkaPo.getWaitTime())) {
                        this.abilityMessagePoolService.initWaitTimeQueueThread(abilityProvideMqKafkaPo);
                    }
                } else {
                    log.warn("初始化 MQ_ID:{} 消息创建跳过！", abilityProvideMqKafkaPo.getMqId());
                }
            } catch (Exception e) {
                log.error("初始化 MQ_ID:{} 消息创建失败！", abilityProvideMqKafkaPo.getMqId());
                e.printStackTrace();
            }
        });
    }

    private void createConsumer(AbilityProvideMqKafkaPo abilityProvideMqKafkaPo) {
        Properties properties = new Properties();
        properties.put("group.id", abilityProvideMqKafkaPo.getGroupId());
        properties.put("key.deserializer", StringDeserializer.class.getName());
        properties.put("value.deserializer", StringDeserializer.class.getName());
        properties.putAll(JSON.parseObject(abilityProvideMqKafkaPo.getProperties()));
        List asList = Arrays.asList(abilityProvideMqKafkaPo.getTopic().split(","));
        CompletableFuture.runAsync(() -> {
            this.startKafkaMap.put(abilityProvideMqKafkaPo.getMqId(), Boolean.FALSE);
            Duration ofSeconds = Duration.ofSeconds(1L);
            KafkaConsumer kafkaConsumer = new KafkaConsumer(properties);
            Throwable th = null;
            try {
                kafkaConsumer.subscribe(asList);
                log.info("开始监听数据, MQ_ID:{} TOPIC:{} 消息创建完成！", abilityProvideMqKafkaPo.getMqId(), asList);
                do {
                    ConsumerRecords poll = kafkaConsumer.poll(ofSeconds);
                    if (null != poll && poll.count() > 0) {
                        Iterator it = poll.iterator();
                        while (it.hasNext()) {
                            ConsumerRecord consumerRecord = (ConsumerRecord) it.next();
                            KafkaSubRspBo kafkaSubRspBo = new KafkaSubRspBo();
                            kafkaSubRspBo.setKey((String) consumerRecord.key());
                            kafkaSubRspBo.setValue((String) consumerRecord.value());
                            kafkaSubRspBo.setOffset(consumerRecord.offset());
                            kafkaSubRspBo.setAbilityPath(this.abilityMqBaseService.getAbilityPath(abilityProvideMqKafkaPo.getAbilityId()));
                            kafkaSubRspBo.setAbilityId(abilityProvideMqKafkaPo.getAbilityId());
                            kafkaSubRspBo.setMqId(abilityProvideMqKafkaPo.getMqId().longValue());
                            subMessage(abilityProvideMqKafkaPo, kafkaSubRspBo);
                        }
                    }
                } while (this.startKafkaMap.get(abilityProvideMqKafkaPo.getMqId()).equals(Boolean.FALSE));
                this.startKafkaMap.remove(abilityProvideMqKafkaPo.getMqId());
                if (kafkaConsumer != null) {
                    if (0 == 0) {
                        kafkaConsumer.close();
                        return;
                    }
                    try {
                        kafkaConsumer.close();
                    } catch (Throwable th2) {
                        th.addSuppressed(th2);
                    }
                }
            } catch (Throwable th3) {
                if (kafkaConsumer != null) {
                    if (0 != 0) {
                        try {
                            kafkaConsumer.close();
                        } catch (Throwable th4) {
                            th.addSuppressed(th4);
                        }
                    } else {
                        kafkaConsumer.close();
                    }
                }
                throw th3;
            }
        }, this.mqConsumerExecutorService).exceptionally(th -> {
            log.info("监听数据异常, MQ_ID:{} TOPIC:{} 异常{}！", new Object[]{abilityProvideMqKafkaPo.getMqId(), asList, th.getMessage()});
            th.printStackTrace();
            return null;
        }).whenComplete((r5, th2) -> {
            log.info("topics:{} 释放完成！", asList);
        });
    }

    public void run(String... strArr) throws Exception {
        log.info("启动kafka监听！");
        onMessage();
    }
}
