/*
 * Decompiled with CFR 0.152.
 */
package com.ohaotian.abilityadmin.pushClient.kafka.service.impl;

import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONObject;
import com.ohaotian.abilityadmin.mapper.AbilityProvideMqCacheDataMapper;
import com.ohaotian.abilityadmin.mapper.AbilityProvideMqKafkaMapper;
import com.ohaotian.abilityadmin.model.po.AbilityProvideMqCacheDataPo;
import com.ohaotian.abilityadmin.model.po.AbilityProvideMqKafkaPo;
import com.ohaotian.abilityadmin.pushClient.kafka.model.bo.KafkaResetSubReqBo;
import com.ohaotian.abilityadmin.pushClient.kafka.model.bo.KafkaSendReqBo;
import com.ohaotian.abilityadmin.pushClient.kafka.model.bo.KafkaSubRspBo;
import com.ohaotian.abilityadmin.pushClient.kafka.service.AbilityKafkaService;
import com.ohaotian.plugin.util.HpartyCheckHttpUtil;
import com.ohaotian.portalcommon.model.bo.RspBO;
import com.ohaotian.portalcommon.util.ExecutorProcessPool;
import com.ohaotian.portalcommon.util.IdGenerator;
import java.nio.charset.StandardCharsets;
import java.time.Duration;
import java.util.Arrays;
import java.util.Collection;
import java.util.Date;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.concurrent.ConcurrentHashMap;
import org.apache.commons.lang3.StringUtils;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.apache.kafka.common.serialization.StringSerializer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.BeanUtils;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.CommandLineRunner;
import org.springframework.http.HttpStatus;
import org.springframework.stereotype.Service;
import org.springframework.util.ObjectUtils;

@Service
public class AbilityKafkaServiceImpl
implements AbilityKafkaService,
CommandLineRunner {
    private static final Logger log = LoggerFactory.getLogger(AbilityKafkaServiceImpl.class);
    @Autowired
    private AbilityProvideMqKafkaMapper abilityProvideMqKafkaMapper;
    @Autowired
    private AbilityProvideMqCacheDataMapper abilityProvideMqCacheDataMapper;
    private final Map<Long, KafkaConsumer<String, String>> startKafkaMap = new ConcurrentHashMap<Long, KafkaConsumer<String, String>>();

    @Override
    public RspBO sendMessage(KafkaSendReqBo kafkaSendReqBo) {
        AbilityProvideMqKafkaPo abilityProvideMqKafkaPo = this.abilityProvideMqKafkaMapper.selectByPrimaryKey(kafkaSendReqBo.getKafkaId());
        if (StringUtils.isBlank((CharSequence)abilityProvideMqKafkaPo.getTopic())) {
            return RspBO.error((String)"topic\u4e3a\u7a7a\uff0c\u68c0\u67e5kafka\u914d\u7f6e");
        }
        if (abilityProvideMqKafkaPo.getIsRunning() != 1) {
            return RspBO.error((String)("\u5f53\u524dkafka\u914d\u7f6e\u4e0d\u53ef\u7528\uff0c\u5f53\u524d\u72b6\u6001\u4e3a:" + abilityProvideMqKafkaPo.getIsRunning()));
        }
        JSONObject kafkaConfigJson = JSON.parseObject((String)abilityProvideMqKafkaPo.getProperties());
        Properties producerProps = new Properties();
        producerProps.put("key.serializer", StringSerializer.class.getName());
        producerProps.put("value.serializer", StringSerializer.class.getName());
        producerProps.putAll((Map<?, ?>)kafkaConfigJson);
        String msgKey = StringUtils.isBlank((CharSequence)abilityProvideMqKafkaPo.getMsgkey()) ? "Message" : abilityProvideMqKafkaPo.getMsgkey();
        try (KafkaProducer producer = new KafkaProducer(producerProps);){
            producer.send(new ProducerRecord(abilityProvideMqKafkaPo.getTopic(), (Object)msgKey, (Object)JSON.toJSONString((Object)kafkaSendReqBo.getData())));
        }
        catch (Exception e) {
            e.printStackTrace();
            return RspBO.error((String)("\u6d88\u606f\u53d1\u9001\u5f02\u5e38:TOPIC-" + abilityProvideMqKafkaPo.getTopic() + " ex-" + e.getMessage()));
        }
        return RspBO.success((Object)("\u6d88\u606f\u53d1\u9001\u5b8c\u6210:TOPIC-" + abilityProvideMqKafkaPo.getTopic()));
    }

    @Override
    public RspBO subMessage(KafkaSubRspBo kafkaSubRspBo) {
        long mqDataId = new IdGenerator().snowflakeId();
        try {
            AbilityProvideMqCacheDataPo mqCacheDataPo = new AbilityProvideMqCacheDataPo();
            mqCacheDataPo.setMqDataId(mqDataId);
            mqCacheDataPo.setData(kafkaSubRspBo.getValue().getBytes(StandardCharsets.UTF_8));
            mqCacheDataPo.setMqId(kafkaSubRspBo.getMqId());
            mqCacheDataPo.setStatus(2);
            mqCacheDataPo.setCreateTime(new Date());
            mqCacheDataPo.setUpdateTime(new Date());
            this.abilityProvideMqCacheDataMapper.insert(mqCacheDataPo);
            this.pushData(mqDataId, kafkaSubRspBo.getAbilityPath(), kafkaSubRspBo.getValue());
        }
        catch (Exception e) {
            log.error("MqDataId:{},kafka\u6d88\u8d39\u6570\u636e\u63a8\u9001\u5931\u8d25\uff01", (Object)mqDataId);
            e.printStackTrace();
            return RspBO.error((String)e.getMessage());
        }
        return RspBO.success((Object)("MqDataId:" + mqDataId + " \u63a8\u9001\u5b8c\u6210\uff01"));
    }

    @Override
    public RspBO resetMessage(KafkaResetSubReqBo kafkaResetSubReqBo) {
        AbilityProvideMqCacheDataPo mqCacheDataPo = new AbilityProvideMqCacheDataPo();
        BeanUtils.copyProperties((Object)kafkaResetSubReqBo, (Object)mqCacheDataPo);
        List<AbilityProvideMqCacheDataPo> mqCacheDataList = this.abilityProvideMqCacheDataMapper.selectByAll(mqCacheDataPo);
        mqCacheDataList.forEach(mqCacheData -> {
            try {
                AbilityProvideMqKafkaPo mqKafkaPo = this.abilityProvideMqKafkaMapper.selectByPrimaryKey(mqCacheData.getMqId());
                this.pushData(mqCacheData.getMqDataId(), mqKafkaPo.getAbilityPath(), new String(mqCacheData.getData(), StandardCharsets.UTF_8));
            }
            catch (Exception e) {
                log.error("\u4e8c\u6b21\u89e6\u53d1\u5931\u8d25\uff01 MQ_DATA_ID:{}", (Object)mqCacheData.getMqDataId());
                e.printStackTrace();
            }
        });
        return RspBO.success((Object)("\u5f02\u6b65\u89e6\u53d1\u6210\u529f\uff01\u5171\u89e6\u53d1\uff1a" + mqCacheDataList.size() + " \u6761\u6570\u636e\u3002"));
    }

    @Override
    public RspBO refreshAll() {
        try {
            Iterator<Map.Entry<Long, KafkaConsumer<String, String>>> iterator = this.startKafkaMap.entrySet().iterator();
            while (iterator.hasNext()) {
                Map.Entry<Long, KafkaConsumer<String, String>> entry = iterator.next();
                entry.getValue().close();
                log.info("kafka mq_id:{} \u5173\u95ed\u5b8c\u6210\uff01", entry.getValue());
                iterator.remove();
            }
            this.onMessage();
            return RspBO.success((Object)"kafka\u5168\u91cf\u5237\u65b0\u6210\u529f");
        }
        catch (Exception e) {
            e.printStackTrace();
            return RspBO.error((String)("kafka\u5168\u91cf\u5237\u65b0\u5931\u8d25\uff0c" + e.getMessage()));
        }
    }

    @Override
    public RspBO refreshInc() {
        try {
            List<AbilityProvideMqKafkaPo> mqKafkaPoList = this.abilityProvideMqKafkaMapper.selectAllByType(0);
            mqKafkaPoList.forEach(mqKafkaPo -> {
                try {
                    if (mqKafkaPo.getIsRunning() == 1 && Boolean.FALSE.equals(this.startKafkaMap.containsKey(mqKafkaPo.getKafkaId()))) {
                        this.createConsumer((AbilityProvideMqKafkaPo)mqKafkaPo);
                    } else {
                        log.warn("\u589e\u91cf MQ_ID:{} \u6d88\u606f\u521b\u5efa\u8df3\u8fc7\uff01", (Object)mqKafkaPo.getKafkaId());
                    }
                }
                catch (Exception e) {
                    log.error("\u589e\u91cf MQ_ID:{} \u6d88\u606f\u521b\u5efa\u5931\u8d25\uff01", (Object)mqKafkaPo.getKafkaId());
                    e.printStackTrace();
                }
            });
            return RspBO.success((Object)"kafka\u589e\u91cf\u66f4\u65b0\u5b8c\u6210");
        }
        catch (Exception e) {
            e.printStackTrace();
            return RspBO.error((String)("kafka\u589e\u91cf\u66f4\u65b0\u5931\u8d25\uff0c" + e.getMessage()));
        }
    }

    private void onMessage() {
        List<AbilityProvideMqKafkaPo> mqKafkaPoList = this.abilityProvideMqKafkaMapper.selectAllByType(0);
        mqKafkaPoList.forEach(mqKafkaPo -> {
            try {
                if (mqKafkaPo.getIsRunning() == 1) {
                    this.createConsumer((AbilityProvideMqKafkaPo)mqKafkaPo);
                } else {
                    log.warn("\u521d\u59cb\u5316 MQ_ID:{} \u6d88\u606f\u521b\u5efa\u8df3\u8fc7\uff01", (Object)mqKafkaPo.getKafkaId());
                }
            }
            catch (Exception e) {
                log.error("\u521d\u59cb\u5316 MQ_ID:{} \u6d88\u606f\u521b\u5efa\u5931\u8d25\uff01", (Object)mqKafkaPo.getKafkaId());
                e.printStackTrace();
            }
        });
    }

    private void createConsumer(AbilityProvideMqKafkaPo mqKafkaPo) {
        Properties consumerProps = new Properties();
        consumerProps.put("group.id", mqKafkaPo.getGroupid());
        consumerProps.put("key.deserializer", StringDeserializer.class.getName());
        consumerProps.put("value.deserializer", StringDeserializer.class.getName());
        JSONObject kafkaConfigJson = JSON.parseObject((String)mqKafkaPo.getProperties());
        consumerProps.putAll((Map<?, ?>)kafkaConfigJson);
        List<String> topics = Arrays.asList(mqKafkaPo.getTopic().split(","));
        Runnable runnable = () -> this.lambda$createConsumer$3(consumerProps, topics, mqKafkaPo);
        Thread thread = new Thread(runnable);
        thread.start();
    }

    private void pushData(final Long mqDataId, final String abilityPath, final String value) {
        String success = "success";
        Runnable runPush = new Runnable(){

            @Override
            public void run() {
                AbilityProvideMqCacheDataPo failMqCacheDataPo = new AbilityProvideMqCacheDataPo();
                failMqCacheDataPo.setMqDataId(mqDataId);
                failMqCacheDataPo.setUpdateTime(new Date());
                try {
                    HashMap<String, String> headers = new HashMap<String, String>();
                    headers.put("Content-Type", "application/json;charset=UTF-8");
                    log.info("MQ_DATA_ID: {} ,\u63a8\u9001\u6570\u636e\u5730\u5740: {}", (Object)mqDataId, (Object)abilityPath);
                    HpartyCheckHttpUtil.doPostJson((String)abilityPath, (String)value, (status, map) -> {
                        failMqCacheDataPo.setRemake(map.getBytes(StandardCharsets.UTF_8));
                        if (status.intValue() != HttpStatus.OK.value()) {
                            failMqCacheDataPo.setStatus(0);
                            AbilityKafkaServiceImpl.this.abilityProvideMqCacheDataMapper.updateByPrimaryKeySelective(failMqCacheDataPo);
                            return RspBO.error((String)map);
                        }
                        JSONObject rsp = JSON.parseObject((String)map);
                        if (Boolean.TRUE.equals(rsp.get((Object)"success"))) {
                            failMqCacheDataPo.setStatus(1);
                        } else {
                            failMqCacheDataPo.setStatus(0);
                        }
                        AbilityKafkaServiceImpl.this.abilityProvideMqCacheDataMapper.updateByPrimaryKeySelective(failMqCacheDataPo);
                        return RspBO.success((Object)map);
                    }, headers);
                }
                catch (Exception e) {
                    failMqCacheDataPo.setStatus(0);
                    if (ObjectUtils.isEmpty((Object)e.getMessage())) {
                        failMqCacheDataPo.setRemake("Exception null!".getBytes(StandardCharsets.UTF_8));
                    } else {
                        failMqCacheDataPo.setRemake(e.getMessage().getBytes(StandardCharsets.UTF_8));
                    }
                    AbilityKafkaServiceImpl.this.abilityProvideMqCacheDataMapper.updateByPrimaryKeySelective(failMqCacheDataPo);
                    throw new RuntimeException(e);
                }
            }
        };
        ExecutorProcessPool.getInstance().executeByCustomThread(runPush);
    }

    public void run(String ... args) throws Exception {
        log.info("\u542f\u52a8kafka\u76d1\u542c\uff01");
        this.onMessage();
    }

    /*
     * Unable to fully structure code
     */
    private /* synthetic */ void lambda$createConsumer$3(Properties consumerProps, List topics, AbilityProvideMqKafkaPo mqKafkaPo) {
        timeout = Duration.ofSeconds(1L);
        consumer = new KafkaConsumer(consumerProps);
        var7_6 = null;
        try {
            try {
                consumer.subscribe((Collection)topics);
                this.startKafkaMap.put(mqKafkaPo.getKafkaId(), (KafkaConsumer<String, String>)consumer);
                AbilityKafkaServiceImpl.log.info("\u5f00\u59cb\u76d1\u542c\u6570\u636e, MQ_ID:{} TOPIC:{} \u6d88\u606f\u521b\u5efa\u5b8c\u6210\uff01", (Object)mqKafkaPo.getKafkaId(), (Object)topics);
                block6: while (true) {
                    if (null == (msgList = consumer.poll(timeout)) || msgList.count() <= 0) {
                        continue;
                    }
                    var8_8 = msgList.iterator();
                    while (true) {
                        if (var8_8.hasNext()) ** break;
                        continue block6;
                        record = (ConsumerRecord)var8_8.next();
                        kafkaSubRspBo = new KafkaSubRspBo();
                        kafkaSubRspBo.setKey((String)record.key());
                        kafkaSubRspBo.setValue((String)record.value());
                        kafkaSubRspBo.setOffset(record.offset());
                        kafkaSubRspBo.setAbilityPath(mqKafkaPo.getAbilityPath());
                        kafkaSubRspBo.setMqId(mqKafkaPo.getKafkaId());
                        this.subMessage(kafkaSubRspBo);
                    }
                    break;
                }
            }
            catch (Throwable var8_9) {
                var7_6 = var8_9;
                throw var8_9;
            }
        }
        catch (Throwable var11_12) {
            if (consumer != null) {
                if (var7_6 != null) {
                    try {
                        consumer.close();
                    }
                    catch (Throwable var12_13) {
                        var7_6.addSuppressed(var12_13);
                    }
                } else {
                    consumer.close();
                }
            }
            throw var11_12;
        }
    }
}

