/*
 * Decompiled with CFR 0.152.
 */
package org.apache.shenyu.plugin.logging.kafka.client;

import java.nio.charset.StandardCharsets;
import java.text.MessageFormat;
import java.util.ArrayList;
import java.util.Collection;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Properties;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
import net.jpountz.lz4.LZ4Compressor;
import net.jpountz.lz4.LZ4Factory;
import org.apache.commons.lang3.StringUtils;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.KafkaException;
import org.apache.kafka.common.errors.AuthorizationException;
import org.apache.kafka.common.errors.OutOfOrderSequenceException;
import org.apache.kafka.common.errors.ProducerFencedException;
import org.apache.kafka.common.serialization.StringSerializer;
import org.apache.shenyu.common.utils.JsonUtils;
import org.apache.shenyu.plugin.logging.common.client.AbstractLogConsumeClient;
import org.apache.shenyu.plugin.logging.common.entity.KafkaLogPushData;
import org.apache.shenyu.plugin.logging.common.entity.LZ4CompressData;
import org.apache.shenyu.plugin.logging.common.entity.ShenyuRequestLog;
import org.apache.shenyu.plugin.logging.common.utils.LogCollectConfigUtils;
import org.apache.shenyu.plugin.logging.kafka.config.KafkaLogCollectConfig;
import org.springframework.beans.BeanUtils;
import org.springframework.lang.NonNull;
import org.springframework.util.CollectionUtils;

public class KafkaLogCollectClient
extends AbstractLogConsumeClient<KafkaLogCollectConfig.KafkaLogConfig, ShenyuRequestLog> {
    private static Map<String, String> apiTopicMap = new HashMap<String, String>();
    private KafkaProducer<String, String> producer;
    private String topic;

    public void initClient0(@NonNull KafkaLogCollectConfig.KafkaLogConfig config) {
        if (Objects.isNull((Object)config) || StringUtils.isBlank((CharSequence)config.getNamesrvAddr()) || StringUtils.isBlank((CharSequence)config.getTopic())) {
            LOG.error("kafka props is empty. failed init kafka producer");
            return;
        }
        String topic = "shenyu-access-logging";
        String nameserverAddress = config.getNamesrvAddr();
        if (StringUtils.isBlank((CharSequence)topic) || StringUtils.isBlank((CharSequence)nameserverAddress)) {
            LOG.error("init kafkaLogCollectClient error, please check topic or nameserverAddress");
            return;
        }
        this.topic = topic;
        Properties props = new Properties();
        props.put("key.serializer", StringSerializer.class.getName());
        props.put("value.serializer", StringSerializer.class.getName());
        props.put("bootstrap.servers", config.getNamesrvAddr());
        if (!StringUtils.isBlank((CharSequence)config.getSecurityProtocol()) && !StringUtils.isBlank((CharSequence)config.getSaslMechanism())) {
            props.put("security.protocol", config.getSecurityProtocol());
            props.put("sasl.mechanism", config.getSaslMechanism());
            props.put("sasl.jaas.config", MessageFormat.format("org.apache.kafka.common.security.scram.ScramLoginModule required username=\"{0}\" password=\"{1}\";", config.getUserName(), config.getPassWord()));
        }
        this.producer = new KafkaProducer(props);
        ProducerRecord record = new ProducerRecord("shenyu-access-logging", (Object)StringSerializer.class.getName(), (Object)StringSerializer.class.getName());
        try {
            LOG.info("init kafkaLogCollectClient success");
        }
        catch (AuthorizationException | OutOfOrderSequenceException | ProducerFencedException e) {
            LOG.error("Init kafkaLogCollectClient error, We can't recover from these exceptions, so our only option is to close the producer and exit", e);
            this.producer.close();
        }
        catch (KafkaException e) {
            LOG.error("init kafkaLogCollectClient error\uff0cExceptions other than ProducerFencedException or OutOfOrderSequenceException or AuthorizationException, just abort the transaction and try again", (Throwable)e);
        }
    }

    public void consume0(@NonNull List<ShenyuRequestLog> logs) {
        logs.forEach(log -> {
            String logTopic = (String)StringUtils.defaultIfBlank((CharSequence)LogCollectConfigUtils.getTopic((String)log.getPath(), apiTopicMap), (CharSequence)this.topic);
            try {
                this.producer.send(this.toProducerRecord(logTopic, (ShenyuRequestLog)log));
            }
            catch (Exception e) {
                LOG.error("kafka push logs error", (Throwable)e);
            }
        });
    }

    private ProducerRecord<String, String> toProducerRecord(String logTopic, ShenyuRequestLog log) {
        byte[] bytes = JsonUtils.toJson((Object)log).getBytes(StandardCharsets.UTF_8);
        String compressAlg = (String)StringUtils.defaultIfBlank((CharSequence)KafkaLogCollectConfig.INSTANCE.getKafkaLogConfig().getCompressAlg(), (CharSequence)"");
        if ("LZ4".equalsIgnoreCase(compressAlg.trim())) {
            LZ4CompressData lz4CompressData = new LZ4CompressData(bytes.length, this.compressedByte(bytes));
            return new ProducerRecord(logTopic, (Object)log.getPath(), (Object)JsonUtils.toJson((Object)lz4CompressData));
        }
        KafkaLogPushData kafkaLogPushData = this.interceptKafkaLog(log);
        return new ProducerRecord(logTopic, (Object)log.getPath(), (Object)JsonUtils.toJson((Object)kafkaLogPushData));
    }

    private byte[] compressedByte(byte[] srcByte) {
        LZ4Factory factory = LZ4Factory.fastestInstance();
        LZ4Compressor compressor = factory.fastCompressor();
        return compressor.compress(srcByte);
    }

    public static void setTopic(Map<String, String> uriTopicMap) {
        apiTopicMap = uriTopicMap;
    }

    public void close0() {
        if (Objects.nonNull(this.producer)) {
            this.producer.close();
        }
    }

    private KafkaLogPushData interceptKafkaLog(ShenyuRequestLog requestLog) {
        KafkaLogPushData kafkaLogPushData = new KafkaLogPushData();
        BeanUtils.copyProperties((Object)requestLog, (Object)kafkaLogPushData);
        if (StringUtils.isNotBlank((CharSequence)requestLog.getOriginalParamJson())) {
            kafkaLogPushData.setRequestBody(requestLog.getOriginalParamJson());
        }
        HashMap<String, String> keyFieldMap = new HashMap<String, String>();
        if (!CollectionUtils.isEmpty((Collection)requestLog.getKeyFieldList()) && StringUtils.isNotBlank((CharSequence)kafkaLogPushData.getResponseBody())) {
            for (String keyField : requestLog.getKeyFieldList()) {
                String regex = keyField + ":([\\s\\S]*?),\"";
                Matcher matcher = Pattern.compile(regex).matcher(kafkaLogPushData.getResponseBody());
                ArrayList<String> valueList = new ArrayList<String>();
                while (matcher.find()) {
                    valueList.add(matcher.group(1).trim());
                }
                keyFieldMap.put(keyField, ((Object)valueList).toString());
            }
            kafkaLogPushData.setBusinessField(((Object)keyFieldMap).toString());
        }
        byte[] inParamBytes = kafkaLogPushData.getRequestBody().getBytes(StandardCharsets.UTF_8);
        byte[] outParamBytes = kafkaLogPushData.getResponseBody().getBytes(StandardCharsets.UTF_8);
        kafkaLogPushData.setRequestBody(new String(this.compressedByte(inParamBytes)));
        kafkaLogPushData.setResponseBody(new String(this.compressedByte(outParamBytes)));
        return kafkaLogPushData;
    }
}

