/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.connector.kafka.sink.testutils;

import java.time.Duration;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.Iterator;
import java.util.LinkedList;
import java.util.List;
import java.util.Properties;
import org.apache.flink.connector.testframe.external.ExternalSystemDataReader;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.errors.WakeupException;

public class KafkaDataReader
implements ExternalSystemDataReader<String> {
    private final KafkaConsumer<String, String> consumer;

    public KafkaDataReader(Properties properties, Collection<TopicPartition> partitions) {
        this.consumer = new KafkaConsumer(properties);
        this.consumer.assign(partitions);
        this.consumer.seekToBeginning(partitions);
    }

    public List<String> poll(Duration timeout) {
        ConsumerRecords consumerRecords;
        LinkedList<String> result = new LinkedList<String>();
        try {
            consumerRecords = this.consumer.poll(timeout);
        }
        catch (WakeupException we) {
            return Collections.emptyList();
        }
        Iterator iterator = consumerRecords.iterator();
        while (iterator.hasNext()) {
            result.add((String)((ConsumerRecord)iterator.next()).value());
        }
        return result;
    }

    public void close() throws Exception {
        if (this.consumer != null) {
            this.consumer.close();
        }
    }
}

