package com.ohaotian.plugin.kafaka.impl;

import com.alibaba.fastjson.JSONObject;
import com.ohaotian.plugin.kafaka.KafkaProducerService;
import com.ohaotian.plugin.kafaka.config.KafkaConfigVO;
import java.io.PrintStream;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Properties;
import java.util.Set;
import java.util.concurrent.ExecutionException;
import java.util.stream.Stream;
import org.apache.commons.lang3.StringUtils;
import org.apache.commons.lang3.exception.ExceptionUtils;
import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.ListTopicsResult;
import org.apache.kafka.clients.admin.NewTopic;
import org.apache.kafka.clients.admin.TopicDescription;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.KafkaFuture;
import org.apache.kafka.common.errors.TopicExistsException;
import org.apache.kafka.common.errors.UnknownTopicOrPartitionException;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;

@Service("kafkaProducerService")
/* loaded from: input_file:com/ohaotian/plugin/kafaka/impl/KafkaProducerServiceImpl.class */
public class KafkaProducerServiceImpl implements KafkaProducerService {
    private static AdminClient admin;

    @Autowired
    private KafkaConfigVO kafkaConfigVO;

    @Override // com.ohaotian.plugin.kafaka.KafkaProducerService
    public KafkaProducer<String, String> createProducer() {
        Properties properties = new Properties();
        properties.put("bootstrap.servers", this.kafkaConfigVO.getBootstrapServers());
        properties.put("key.serializer", this.kafkaConfigVO.getValueSerializer());
        properties.put("value.serializer", this.kafkaConfigVO.getValueSerializer());
        properties.put("acks", this.kafkaConfigVO.getAcks());
        properties.put("retries", this.kafkaConfigVO.getRetries());
        properties.put("batch.size", this.kafkaConfigVO.getBatchSize());
        properties.put("linger.ms", this.kafkaConfigVO.getLingerMs());
        properties.put("buffer.memory", this.kafkaConfigVO.getBufferMemory());
        if (StringUtils.isNotBlank(this.kafkaConfigVO.getSecurityProtocol())) {
            properties.setProperty("security.protocol", this.kafkaConfigVO.getSecurityProtocol());
        }
        if (StringUtils.isNotBlank(this.kafkaConfigVO.getSaslMechanism())) {
            properties.setProperty("sasl.mechanism", this.kafkaConfigVO.getSaslMechanism());
        }
        if (StringUtils.isNotBlank(this.kafkaConfigVO.getUsername()) && StringUtils.isNotBlank(this.kafkaConfigVO.getPassword())) {
            properties.setProperty("sasl.jaas.config", "org.apache.kafka.common.security.plain.PlainLoginModule required\nusername=\"" + this.kafkaConfigVO.getUsername() + "\"\npassword=\"" + this.kafkaConfigVO.getPassword() + "\";");
        }
        return new KafkaProducer<>(properties);
    }

    @Override // com.ohaotian.plugin.kafaka.KafkaProducerService
    public void sendMessage(String str, String str2) {
        KafkaProducer<String, String> createProducer = createProducer();
        createProducer.send(new ProducerRecord(str, str2));
        createProducer.close();
    }

    @Override // com.ohaotian.plugin.kafaka.KafkaProducerService
    public void sendMessage(String str, String... strArr) throws InterruptedException {
        KafkaProducer<String, String> createProducer = createProducer();
        for (String str2 : strArr) {
            createProducer.send(new ProducerRecord(str, str2));
        }
        createProducer.close();
    }

    @Override // com.ohaotian.plugin.kafaka.KafkaProducerService
    public void sendMessage(String str, List<Map<Object, Object>> list) {
        KafkaProducer<String, String> createProducer = createProducer();
        Iterator<Map<Object, Object>> it = list.iterator();
        while (it.hasNext()) {
            createProducer.send(new ProducerRecord(str, JSONObject.toJSON(it.next()).toString()));
        }
        createProducer.close();
    }

    @Override // com.ohaotian.plugin.kafaka.KafkaProducerService
    public void sendMessage(String str, Map<Object, Object> map) {
        KafkaProducer<String, String> createProducer = createProducer();
        createProducer.send(new ProducerRecord(str, JSONObject.toJSON(map).toString()));
        createProducer.close();
    }

    @Override // com.ohaotian.plugin.kafaka.KafkaProducerService
    public void createTopic(String str, int i, int i2) {
        if (admin == null) {
            createAdmin();
        }
        for (Map.Entry entry : admin.createTopics(Arrays.asList(new NewTopic(str, i, (short) i2).configs(new HashMap()))).values().entrySet()) {
            try {
                ((KafkaFuture) entry.getValue()).get();
                System.out.println("topic " + ((String) entry.getKey()) + " created");
            } catch (InterruptedException | ExecutionException e) {
                if (ExceptionUtils.getRootCause(e) instanceof TopicExistsException) {
                    System.out.println("topic " + ((String) entry.getKey()) + " existed");
                }
            }
        }
    }

    @Override // com.ohaotian.plugin.kafaka.KafkaProducerService
    public void deleteTopic(String str, String... strArr) {
        if (admin == null) {
            createAdmin();
        }
        new HashMap();
        List asList = Arrays.asList(strArr);
        asList.add(str);
        for (Map.Entry entry : admin.deleteTopics(asList).values().entrySet()) {
            try {
                ((KafkaFuture) entry.getValue()).get();
                System.out.println("topic " + ((String) entry.getKey()) + " deleted");
            } catch (InterruptedException | ExecutionException e) {
                if (ExceptionUtils.getRootCause(e) instanceof UnknownTopicOrPartitionException) {
                    System.out.println("topic " + ((String) entry.getKey()) + " not exist");
                }
            }
        }
    }

    @Override // com.ohaotian.plugin.kafaka.KafkaProducerService
    public void describeTopic(String str) {
        if (admin == null) {
            createAdmin();
        }
        new HashMap();
        ArrayList arrayList = new ArrayList();
        arrayList.add(str);
        for (Map.Entry entry : admin.describeTopics(arrayList).values().entrySet()) {
            try {
                ((KafkaFuture) entry.getValue()).get();
                System.out.println("topic " + ((String) entry.getKey()) + " describe");
                System.out.println("\t name: " + ((TopicDescription) ((KafkaFuture) entry.getValue()).get()).name());
                System.out.println("\t partitions: ");
                ((TopicDescription) ((KafkaFuture) entry.getValue()).get()).partitions().stream().forEach(topicPartitionInfo -> {
                    System.out.println("\t\t index: " + topicPartitionInfo.partition());
                    System.out.println("\t\t\t leader: " + topicPartitionInfo.leader());
                    System.out.println("\t\t\t replicas: " + topicPartitionInfo.replicas());
                    System.out.println("\t\t\t isr: " + topicPartitionInfo.isr());
                });
                System.out.println("\t internal: " + ((TopicDescription) ((KafkaFuture) entry.getValue()).get()).isInternal());
            } catch (InterruptedException | ExecutionException e) {
                if (ExceptionUtils.getRootCause(e) instanceof UnknownTopicOrPartitionException) {
                    System.out.println("topic " + ((String) entry.getKey()) + " not exist");
                }
            }
        }
    }

    private void createAdmin() {
        createAdmin(this.kafkaConfigVO.getBootstrapServers());
    }

    public void createAdmin(String str) {
        Properties properties = new Properties();
        properties.put("bootstrap.servers", str);
        admin = AdminClient.create(properties);
    }

    @Override // com.ohaotian.plugin.kafaka.KafkaProducerService
    public Set<String> listTopic() {
        if (admin == null) {
            createAdmin();
        }
        ListTopicsResult listTopics = admin.listTopics();
        try {
            Stream map = ((Set) listTopics.names().get()).stream().map(str -> {
                return str + "\t";
            });
            PrintStream printStream = System.out;
            Objects.requireNonNull(printStream);
            map.forEach(printStream::print);
            return (Set) listTopics.names().get();
        } catch (InterruptedException | ExecutionException e) {
            e.printStackTrace();
            return null;
        }
    }
}
