/*
 * Decompiled with CFR 0.152.
 */
package com.ohaotian.plugin.kafaka.impl;

import com.alibaba.fastjson.JSONObject;
import com.ohaotian.plugin.kafaka.KafkaProducerService;
import com.ohaotian.plugin.kafaka.config.KafkaConfigVO;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.Set;
import java.util.concurrent.ExecutionException;
import org.apache.commons.lang3.StringUtils;
import org.apache.commons.lang3.exception.ExceptionUtils;
import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.CreateTopicsResult;
import org.apache.kafka.clients.admin.DeleteTopicsResult;
import org.apache.kafka.clients.admin.DescribeTopicsResult;
import org.apache.kafka.clients.admin.ListTopicsResult;
import org.apache.kafka.clients.admin.NewTopic;
import org.apache.kafka.clients.admin.TopicDescription;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.KafkaFuture;
import org.apache.kafka.common.errors.TopicExistsException;
import org.apache.kafka.common.errors.UnknownTopicOrPartitionException;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;

/**
 * Spring-managed implementation of {@link KafkaProducerService}.
 *
 * <p>Sends JSON string messages to Kafka and offers a few topic administration
 * helpers (create / delete / describe / list). Connection settings are read from
 * the injected {@link KafkaConfigVO}.
 *
 * <p>Every {@code sendMessage} call opens a fresh {@link KafkaProducer} and closes
 * it before returning; closing flushes the records handed to {@code send}. The
 * futures returned by {@code send} are not inspected, so asynchronous delivery
 * failures are not reported to the caller.
 *
 * <p>The {@link AdminClient} is created lazily on first use and kept in a static
 * field, so it is shared by all instances of this class. The lazy initialisation
 * is not synchronised.
 */
@Service(value="kafkaProducerService")
public class KafkaProducerServiceImpl implements KafkaProducerService {
    /** Shared admin client; {@code null} until the first admin operation or {@link #createAdmin(String)}. */
    private static AdminClient admin;

    @Autowired
    private KafkaConfigVO kafkaConfigVO;

    /**
     * Builds a new producer from the injected configuration.
     *
     * <p>The caller owns the returned producer and is responsible for closing it.
     *
     * @return a newly created producer with String keys and values
     */
    @Override
    public KafkaProducer<String, String> createProducer() {
        Properties props = new Properties();
        props.put("bootstrap.servers", this.kafkaConfigVO.getBootstrapServers());
        // NOTE(review): the key serializer is taken from getValueSerializer(). This looks like a
        // copy/paste slip, but KafkaConfigVO is not visible here so it is left as-is - confirm
        // whether a dedicated key-serializer getter exists and should be used instead.
        props.put("key.serializer", this.kafkaConfigVO.getValueSerializer());
        props.put("value.serializer", this.kafkaConfigVO.getValueSerializer());
        props.put("acks", this.kafkaConfigVO.getAcks());
        props.put("retries", this.kafkaConfigVO.getRetries());
        props.put("batch.size", this.kafkaConfigVO.getBatchSize());
        props.put("linger.ms", this.kafkaConfigVO.getLingerMs());
        props.put("buffer.memory", this.kafkaConfigVO.getBufferMemory());
        this.applySecuritySettings(props);
        return new KafkaProducer<>(props);
    }

    /**
     * Sends a single message to the given topic.
     *
     * @param topic       destination topic
     * @param jsonMessage record value, sent without a key
     */
    @Override
    public void sendMessage(String topic, String jsonMessage) {
        // try-with-resources guarantees the producer is closed even when send() throws.
        try (KafkaProducer<String, String> producer = this.createProducer()) {
            producer.send(new ProducerRecord<>(topic, jsonMessage));
        }
    }

    /**
     * Sends several messages to the given topic using one producer.
     *
     * @param topic        destination topic
     * @param jsonMessages record values, sent in order without keys; nothing is
     *                     done when this is {@code null} or empty
     * @throws InterruptedException declared for interface compatibility; not thrown by this implementation
     */
    @Override
    public void sendMessage(String topic, String ... jsonMessages) throws InterruptedException {
        if (jsonMessages == null || jsonMessages.length == 0) {
            // Avoid opening a broker connection when there is nothing to send.
            return;
        }
        try (KafkaProducer<String, String> producer = this.createProducer()) {
            for (String jsonMessage : jsonMessages) {
                producer.send(new ProducerRecord<>(topic, jsonMessage));
            }
        }
    }

    /**
     * Serialises each map to JSON and sends the results to the given topic using
     * one producer.
     *
     * @param topic                     destination topic
     * @param mapMessageToJSONForArray  maps to serialise and send, in order; nothing
     *                                  is done when this is {@code null} or empty
     */
    @Override
    public void sendMessage(String topic, List<Map<Object, Object>> mapMessageToJSONForArray) {
        if (mapMessageToJSONForArray == null || mapMessageToJSONForArray.isEmpty()) {
            return;
        }
        try (KafkaProducer<String, String> producer = this.createProducer()) {
            for (Map<Object, Object> mapMessageToJSON : mapMessageToJSONForArray) {
                producer.send(new ProducerRecord<>(topic, KafkaProducerServiceImpl.toJson(mapMessageToJSON)));
            }
        }
    }

    /**
     * Serialises the map to JSON and sends it to the given topic.
     *
     * @param topic             destination topic
     * @param mapMessageToJSON  map to serialise and send
     */
    @Override
    public void sendMessage(String topic, Map<Object, Object> mapMessageToJSON) {
        // Serialise first so a conversion failure does not open a producer at all.
        String json = KafkaProducerServiceImpl.toJson(mapMessageToJSON);
        try (KafkaProducer<String, String> producer = this.createProducer()) {
            producer.send(new ProducerRecord<>(topic, json));
        }
    }

    /**
     * Creates a topic and waits for the broker's answer, printing the outcome.
     *
     * <p>An already existing topic is reported as "existed"; any other failure is
     * printed to {@code System.err}. No exception is propagated for broker-side
     * failures.
     *
     * @param name              topic name
     * @param numPartitions     number of partitions
     * @param replicationFactor replication factor (narrowed to {@code short})
     */
    @Override
    public void createTopic(String name, int numPartitions, int replicationFactor) {
        NewTopic topic = new NewTopic(name, numPartitions, (short)replicationFactor)
                .configs(new HashMap<String, String>());
        CreateTopicsResult result = this.adminClient().createTopics(Arrays.asList(topic));
        KafkaProducerServiceImpl.awaitAndReport(result.values(), " created", TopicExistsException.class, " existed");
    }

    /**
     * Deletes one or more topics and waits for the broker's answer, printing the
     * outcome per topic.
     *
     * <p>A topic unknown to the broker is reported as "not exist"; any other
     * failure is printed to {@code System.err}.
     *
     * @param name  topic to delete
     * @param names additional topics to delete; may be empty or {@code null}
     */
    @Override
    public void deleteTopic(String name, String ... names) {
        // Arrays.asList returns a fixed-size view whose add() throws
        // UnsupportedOperationException, so the names are copied into a growable list.
        List<String> topics = new ArrayList<>();
        if (names != null) {
            topics.addAll(Arrays.asList(names));
        }
        topics.add(name);
        DeleteTopicsResult result = this.adminClient().deleteTopics(topics);
        KafkaProducerServiceImpl.awaitAndReport(result.values(), " deleted", UnknownTopicOrPartitionException.class, " not exist");
    }

    /**
     * Fetches the description of a topic and prints its name, partitions
     * (leader, replicas, in-sync replicas) and internal flag.
     *
     * <p>A topic unknown to the broker is reported as "not exist"; any other
     * failure is printed to {@code System.err}.
     *
     * @param name topic to describe
     */
    @Override
    public void describeTopic(String name) {
        List<String> topics = new ArrayList<>();
        topics.add(name);
        DescribeTopicsResult result = this.adminClient().describeTopics(topics);
        for (Map.Entry<String, KafkaFuture<TopicDescription>> entry : result.values().entrySet()) {
            String topic = entry.getKey();
            try {
                // Resolve the future once and reuse the description.
                TopicDescription description = entry.getValue().get();
                System.out.println("topic " + topic + " describe");
                System.out.println("\t name: " + description.name());
                System.out.println("\t partitions: ");
                description.partitions().forEach(p -> {
                    System.out.println("\t\t index: " + p.partition());
                    System.out.println("\t\t\t leader: " + p.leader());
                    System.out.println("\t\t\t replicas: " + p.replicas());
                    System.out.println("\t\t\t isr: " + p.isr());
                });
                System.out.println("\t internal: " + description.isInternal());
            }
            catch (InterruptedException e) {
                // Restore the flag so callers can observe the interruption, and stop waiting.
                Thread.currentThread().interrupt();
                return;
            }
            catch (ExecutionException e) {
                KafkaProducerServiceImpl.reportFailure(topic, e, UnknownTopicOrPartitionException.class, " not exist");
            }
        }
    }

    /** Creates the shared admin client against the configured bootstrap servers. */
    private void createAdmin() {
        this.createAdmin(this.kafkaConfigVO.getBootstrapServers());
    }

    /**
     * Creates the shared admin client against the given servers, replacing and
     * closing any previously created one.
     *
     * <p>The security settings from the injected configuration (security protocol,
     * SASL mechanism, credentials) are applied in the same way as for producers, so
     * admin operations can authenticate when the configuration enables security.
     *
     * @param servers value for {@code bootstrap.servers}
     */
    public void createAdmin(String servers) {
        Properties props = new Properties();
        props.put("bootstrap.servers", servers);
        this.applySecuritySettings(props);
        AdminClient previous = admin;
        // Publish the new client before closing the old one so readers of the
        // field never pick up a client that is already closed.
        admin = AdminClient.create((Properties)props);
        if (previous != null) {
            previous.close();
        }
    }

    /**
     * Lists the topic names known to the cluster and prints them tab-separated.
     *
     * @return the topic names, or {@code null} when the lookup fails or the
     *         thread is interrupted while waiting
     */
    @Override
    public Set<String> listTopic() {
        ListTopicsResult result = this.adminClient().listTopics();
        try {
            // Resolve the future once and reuse the set for printing and returning.
            Set<String> names = result.names().get();
            names.stream().map(x -> x + "\t").forEach(System.out::print);
            return names;
        }
        catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            e.printStackTrace();
            return null;
        }
        catch (ExecutionException e) {
            e.printStackTrace();
            return null;
        }
    }

    /** Returns the shared admin client, creating it from the configuration on first use. */
    private AdminClient adminClient() {
        if (admin == null) {
            this.createAdmin();
        }
        return admin;
    }

    /** Copies the optional security protocol, SASL mechanism and PLAIN credentials into {@code props}. */
    private void applySecuritySettings(Properties props) {
        if (StringUtils.isNotBlank((CharSequence)this.kafkaConfigVO.getSecurityProtocol())) {
            props.setProperty("security.protocol", this.kafkaConfigVO.getSecurityProtocol());
        }
        if (StringUtils.isNotBlank((CharSequence)this.kafkaConfigVO.getSaslMechanism())) {
            props.setProperty("sasl.mechanism", this.kafkaConfigVO.getSaslMechanism());
        }
        if (StringUtils.isNotBlank((CharSequence)this.kafkaConfigVO.getUsername()) && StringUtils.isNotBlank((CharSequence)this.kafkaConfigVO.getPassword())) {
            String jaasConfig = "org.apache.kafka.common.security.plain.PlainLoginModule required\nusername=\"" + this.kafkaConfigVO.getUsername() + "\"\npassword=\"" + this.kafkaConfigVO.getPassword() + "\";";
            props.setProperty("sasl.jaas.config", jaasConfig);
        }
    }

    /** Converts a map to its JSON text using fastjson. */
    private static String toJson(Map<Object, Object> mapMessageToJSON) {
        return JSONObject.toJSON(mapMessageToJSON).toString();
    }

    /**
     * Waits for each per-topic future and prints "topic NAME" followed by
     * {@code successSuffix} on success; failures go through {@link #reportFailure}.
     * Stops waiting (with the interrupt flag restored) when the thread is interrupted.
     */
    private static void awaitAndReport(Map<String, KafkaFuture<Void>> futures, String successSuffix,
            Class<? extends Throwable> toleratedCause, String toleratedSuffix) {
        for (Map.Entry<String, KafkaFuture<Void>> entry : futures.entrySet()) {
            String topic = entry.getKey();
            try {
                entry.getValue().get();
                System.out.println("topic " + topic + successSuffix);
            }
            catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                return;
            }
            catch (ExecutionException e) {
                KafkaProducerServiceImpl.reportFailure(topic, e, toleratedCause, toleratedSuffix);
            }
        }
    }

    /**
     * Prints the tolerated outcome to {@code System.out} when the root cause is of
     * the tolerated type; otherwise prints the failure to {@code System.err} so it
     * is not silently lost.
     */
    private static void reportFailure(String topic, ExecutionException e,
            Class<? extends Throwable> toleratedCause, String toleratedSuffix) {
        Throwable root = ExceptionUtils.getRootCause((Throwable)e);
        if (toleratedCause.isInstance(root)) {
            System.out.println("topic " + topic + toleratedSuffix);
            return;
        }
        System.err.println("topic " + topic + " operation failed: " + (root != null ? root : e));
    }
}

