/*
 * Decompiled with CFR 0.152.
 */
package com.ohaotian.plugin.kafaka.impl;

import com.ohaotian.plugin.kafaka.KafkaConsumerListener;
import com.ohaotian.plugin.kafaka.config.KafkaConfigVO;
import com.ohaotian.plugin.kafaka.util.ExecutorProcessPool;
import java.time.Duration;
import java.util.Arrays;
import java.util.Iterator;
import java.util.Properties;
import org.apache.commons.lang3.StringUtils;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;

public abstract class KafkaConsumerListenerImpl
implements KafkaConsumerListener {
    private KafkaConfigVO kafkaConfigVO;

    public void init(String groupId, String topic, KafkaConfigVO kafkaConfigVO) {
        this.kafkaConfigVO = kafkaConfigVO;
        this.consume(groupId, topic);
    }

    @Override
    public KafkaConsumer<String, String> createConsumer(String groupId) {
        Properties props = new Properties();
        props.put("bootstrap.servers", this.kafkaConfigVO.getBootstrapServers());
        props.put("group.id", groupId);
        props.put("enable.auto.commit", this.kafkaConfigVO.getEnableAutoCommit());
        props.put("auto.commit.interval.ms", this.kafkaConfigVO.getAutoCommitIntervalMs());
        props.put("session.timeout.ms", this.kafkaConfigVO.getSessionTimeoutMs());
        props.put("auto.offset.reset", this.kafkaConfigVO.getAutoOffsetReset());
        props.put("value.deserializer", this.kafkaConfigVO.getValueDerializer());
        props.put("key.deserializer", this.kafkaConfigVO.getKeyDerializer());
        if (StringUtils.isNotBlank((CharSequence)this.kafkaConfigVO.getSecurityProtocol())) {
            props.setProperty("security.protocol", this.kafkaConfigVO.getSecurityProtocol());
        }
        if (StringUtils.isNotBlank((CharSequence)this.kafkaConfigVO.getSaslMechanism())) {
            props.setProperty("sasl.mechanism", this.kafkaConfigVO.getSaslMechanism());
        }
        if (StringUtils.isNotBlank((CharSequence)this.kafkaConfigVO.getUsername()) && StringUtils.isNotBlank((CharSequence)this.kafkaConfigVO.getPassword())) {
            String jassc = "org.apache.kafka.common.security.plain.PlainLoginModule required\nusername=\"" + this.kafkaConfigVO.getUsername() + "\"\npassword=\"" + this.kafkaConfigVO.getPassword() + "\";";
            props.setProperty("sasl.jaas.config", jassc);
        }
        return new KafkaConsumer(props);
    }

    public abstract void onMessage(ConsumerRecord<String, String> var1);

    @Override
    public void consume(String groupId, String topic) {
        final KafkaConsumer<String, String> consumer = this.createConsumer(groupId);
        consumer.subscribe(Arrays.asList(topic));
        Runnable runnable = new Runnable(){

            @Override
            public void run() {
                block0: while (true) {
                    ConsumerRecords records = consumer.poll(Duration.ofSeconds(100L));
                    Iterator iterator = records.iterator();
                    while (true) {
                        if (!iterator.hasNext()) continue block0;
                        ConsumerRecord record = (ConsumerRecord)iterator.next();
                        KafkaConsumerListenerImpl.this.onMessage((ConsumerRecord<String, String>)record);
                    }
                    break;
                }
            }
        };
        ExecutorProcessPool.getInstance().executeByCustomThread(runnable);
    }

    @Override
    public void consume(String groupId, String ... topics) {
        final KafkaConsumer<String, String> consumer = this.createConsumer(groupId);
        consumer.subscribe(Arrays.asList(topics));
        Runnable runnable = new Runnable(){

            @Override
            public void run() {
                block0: while (true) {
                    ConsumerRecords records = consumer.poll(Duration.ofSeconds(100L));
                    Iterator iterator = records.iterator();
                    while (true) {
                        if (!iterator.hasNext()) continue block0;
                        ConsumerRecord record = (ConsumerRecord)iterator.next();
                        KafkaConsumerListenerImpl.this.onMessage((ConsumerRecord<String, String>)record);
                    }
                    break;
                }
            }
        };
        ExecutorProcessPool.getInstance().executeByCustomThread(runnable);
    }
}

