package com.ohaotian.plugin.kafaka.impl;

import com.ohaotian.plugin.kafaka.KafkaConsumerListener;
import com.ohaotian.plugin.kafaka.config.KafkaConfigVO;
import com.ohaotian.plugin.kafaka.util.ExecutorProcessPool;
import java.time.Duration;
import java.util.Arrays;
import java.util.Iterator;
import java.util.Properties;
import org.apache.commons.lang3.StringUtils;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.KafkaConsumer;

/* loaded from: input_file:com/ohaotian/plugin/kafaka/impl/KafkaConsumerListenerImpl.class */
/**
 * Base class for Kafka listeners: builds a {@link KafkaConsumer} from a {@link KafkaConfigVO},
 * subscribes it to one or more topics and hands every polled record to {@link #onMessage}.
 *
 * <p>Each call to a {@code consume} method creates its own consumer and submits its own polling
 * task to {@link ExecutorProcessPool}. {@link #init} must be called before any other method so
 * that the configuration is available.
 */
public abstract class KafkaConsumerListenerImpl implements KafkaConsumerListener {
    /** How long a single {@code poll} call may block waiting for records. */
    private static final Duration POLL_TIMEOUT = Duration.ofSeconds(100L);

    /** Connection and consumer settings; assigned by {@link #init}. */
    private KafkaConfigVO kafkaConfigVO;

    /**
     * Stores the configuration and immediately starts consuming a single topic.
     *
     * @param str consumer group id
     * @param str2 topic to subscribe to
     * @param kafkaConfigVO settings used to build the consumer
     */
    public void init(String str, String str2, KafkaConfigVO kafkaConfigVO) {
        this.kafkaConfigVO = kafkaConfigVO;
        consume(str, str2);
    }

    /**
     * Builds a new consumer for the given group from the stored configuration.
     *
     * <p>Security protocol and SASL mechanism are only set when non-blank. A JAAS configuration
     * using {@code PlainLoginModule} is only set when both username and password are non-blank.
     *
     * @param str consumer group id
     * @return a new, unsubscribed consumer; the caller is responsible for closing it
     */
    @Override // com.ohaotian.plugin.kafaka.KafkaConsumerListener
    public KafkaConsumer<String, String> createConsumer(String str) {
        Properties properties = new Properties();
        properties.put("bootstrap.servers", this.kafkaConfigVO.getBootstrapServers());
        properties.put("group.id", str);
        properties.put("enable.auto.commit", this.kafkaConfigVO.getEnableAutoCommit());
        properties.put("auto.commit.interval.ms", this.kafkaConfigVO.getAutoCommitIntervalMs());
        properties.put("session.timeout.ms", this.kafkaConfigVO.getSessionTimeoutMs());
        properties.put("auto.offset.reset", this.kafkaConfigVO.getAutoOffsetReset());
        properties.put("value.deserializer", this.kafkaConfigVO.getValueDerializer());
        properties.put("key.deserializer", this.kafkaConfigVO.getKeyDerializer());
        if (StringUtils.isNotBlank(this.kafkaConfigVO.getSecurityProtocol())) {
            properties.setProperty("security.protocol", this.kafkaConfigVO.getSecurityProtocol());
        }
        if (StringUtils.isNotBlank(this.kafkaConfigVO.getSaslMechanism())) {
            properties.setProperty("sasl.mechanism", this.kafkaConfigVO.getSaslMechanism());
        }
        if (StringUtils.isNotBlank(this.kafkaConfigVO.getUsername()) && StringUtils.isNotBlank(this.kafkaConfigVO.getPassword())) {
            // NOTE: username and password are inserted verbatim (no quote escaping).
            properties.setProperty("sasl.jaas.config", "org.apache.kafka.common.security.plain.PlainLoginModule required\nusername=\"" + this.kafkaConfigVO.getUsername() + "\"\npassword=\"" + this.kafkaConfigVO.getPassword() + "\";");
        }
        return new KafkaConsumer<>(properties);
    }

    /**
     * Handles one record. Invoked on the polling thread, once per record, in the order the
     * records are returned by {@code poll}. An exception thrown here ends the polling loop.
     *
     * @param consumerRecord the record to process
     */
    public abstract void onMessage(ConsumerRecord<String, String> consumerRecord);

    /**
     * Starts consuming a single topic on a thread obtained from {@link ExecutorProcessPool}.
     *
     * @param str consumer group id
     * @param str2 topic to subscribe to
     */
    @Override // com.ohaotian.plugin.kafaka.KafkaConsumerListener
    public void consume(String str, String str2) {
        startConsumer(str, new String[] {str2});
    }

    /**
     * Starts consuming several topics with one consumer on a thread obtained from
     * {@link ExecutorProcessPool}.
     *
     * @param str consumer group id
     * @param strArr topics to subscribe to
     */
    @Override // com.ohaotian.plugin.kafaka.KafkaConsumerListener
    public void consume(String str, String... strArr) {
        startConsumer(str, strArr);
    }

    /** Creates a consumer, subscribes it to {@code topics} and submits its polling task. */
    private void startConsumer(String groupId, String[] topics) {
        final KafkaConsumer<String, String> consumer = createConsumer(groupId);
        try {
            consumer.subscribe(Arrays.asList(topics));
        } catch (RuntimeException e) {
            // The polling task will never run, so nothing else would release the consumer.
            consumer.close();
            throw e;
        }
        ExecutorProcessPool.getInstance().executeByCustomThread(new Runnable() {
            @Override // java.lang.Runnable
            public void run() {
                pollForever(consumer);
            }
        });
    }

    /**
     * Polls {@code consumer} in an endless loop and dispatches every record to
     * {@link #onMessage}. The loop only ends when {@code poll} or {@code onMessage} throws;
     * the consumer is then closed before the exception propagates, so its network resources
     * are not leaked.
     */
    private void pollForever(KafkaConsumer<String, String> consumer) {
        try (KafkaConsumer<String, String> owned = consumer) {
            while (true) {
                for (ConsumerRecord<String, String> record : owned.poll(POLL_TIMEOUT)) {
                    onMessage(record);
                }
            }
        }
    }
}
