package com.aliyun.dts.subscribe.clients.recordprocessor;

import com.aliyun.dts.subscribe.clients.ConsumerContext;
import com.aliyun.dts.subscribe.clients.common.RecordListener;
import com.aliyun.dts.subscribe.clients.common.Util;
import com.aliyun.dts.subscribe.clients.record.DefaultUserRecord;
import java.io.Closeable;
import java.util.Iterator;
import java.util.Map;
import java.util.concurrent.LinkedBlockingQueue;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:com/aliyun/dts/subscribe/clients/recordprocessor/EtlRecordProcessor.class */
/**
 * Worker that takes records from a shared queue and hands each one to every
 * registered {@link RecordListener}.
 *
 * <p>A record is only removed from the queue after all listeners have consumed it
 * without throwing. Any failure while waiting for or dispatching a record is
 * logged, the {@link ConsumerContext} is asked to exit, and the worker stops.
 *
 * <p>NOTE(review): the listener map is supplied by the caller and is mutated by
 * {@link #registerRecordListener}; whether it is safe to register listeners while
 * {@link #run()} is iterating depends on the map implementation passed in.
 */
public class EtlRecordProcessor implements Runnable, Closeable {
    private static final Logger log = LoggerFactory.getLogger(EtlRecordProcessor.class);

    /** Pause between two polls of an empty queue, in milliseconds. */
    private static final long IDLE_SLEEP_MS = 5L;

    /** Number of empty polls between two "no records" log lines (1000 * 5 ms = 5 s). */
    private static final int IDLE_POLLS_PER_LOG = 1000;

    private final LinkedBlockingQueue<DefaultUserRecord> toProcessRecord;
    private final Map<String, RecordListener> recordListeners;
    private final ConsumerContext consumerContext;

    /**
     * @param consumerContext     shared context used to observe and request shutdown
     * @param linkedBlockingQueue queue the records to process are read from
     * @param map                 listeners keyed by name; later registrations are stored in this same map
     */
    public EtlRecordProcessor(ConsumerContext consumerContext, LinkedBlockingQueue<DefaultUserRecord> linkedBlockingQueue, Map<String, RecordListener> map) {
        this.consumerContext = consumerContext;
        this.toProcessRecord = linkedBlockingQueue;
        this.recordListeners = map;
    }

    /**
     * Processes queued records until the consumer context reports that it has exited.
     *
     * <p>Each iteration waits for a record, passes it to every listener, and only then
     * removes it from the queue. If anything in that sequence throws, the error is
     * logged, the context is asked to exit and this method returns, so a failing
     * listener can no longer kill the thread silently while the rest of the consumer
     * keeps running.
     */
    @Override // java.lang.Runnable
    public void run() {
        while (!this.consumerContext.isExited()) {
            try {
                DefaultUserRecord record = awaitNextRecord();
                if (this.consumerContext.isExited()) {
                    return;
                }
                for (RecordListener listener : this.recordListeners.values()) {
                    listener.consume(record);
                }
                // Drop the head only after every listener has seen it.
                this.toProcessRecord.poll();
            } catch (Exception e) {
                log.error("EtlRecordProcessor: process record failed, cause " + e.getMessage(), e);
                this.consumerContext.exit();
                // Return explicitly instead of relying on the loop condition so a
                // persistent failure can never turn into a busy loop.
                return;
            }
        }
    }

    /**
     * Polls the head of the queue (without removing it) until a record is available
     * or the consumer context has exited.
     *
     * @return the record at the head of the queue, or {@code null} if the context
     *         exited while the queue was still empty
     */
    private DefaultUserRecord awaitNextRecord() {
        int idlePolls = 0;
        DefaultUserRecord head = this.toProcessRecord.peek();
        while (null == head && !this.consumerContext.isExited()) {
            Util.sleepMS(IDLE_SLEEP_MS);
            idlePolls++;
            if (idlePolls % IDLE_POLLS_PER_LOG == 0 && this.consumerContext.hasValidTopicPartitions()) {
                log.info("EtlRecordProcessor: haven't receive records from generator for  5s");
            }
            head = this.toProcessRecord.peek();
        }
        return head;
    }

    /**
     * Registers a listener under the given name, replacing any listener previously
     * stored under that name.
     *
     * @param str            name to register the listener under; must not be {@code null}
     * @param recordListener listener to invoke for every record; must not be {@code null}
     */
    public void registerRecordListener(String str, RecordListener recordListener) {
        Util.require(null != str && null != recordListener, "null value not accepted");
        this.recordListeners.put(str, recordListener);
    }

    /** Requests shutdown by asking the consumer context to exit. */
    @Override // java.io.Closeable, java.lang.AutoCloseable
    public void close() {
        this.consumerContext.exit();
    }
}
