/*
 * Decompiled with CFR 0.152.
 */
package io.gravitee.reporter.elasticsearch.indexer;

import io.gravitee.elasticsearch.client.Client;
import io.gravitee.node.api.Node;
import io.gravitee.reporter.api.Reportable;
import io.gravitee.reporter.elasticsearch.config.PipelineConfiguration;
import io.gravitee.reporter.elasticsearch.config.ReporterConfiguration;
import io.gravitee.reporter.elasticsearch.indexer.AbstractIndexer;
import io.gravitee.reporter.elasticsearch.indexer.DocumentBulkProcessor;
import io.gravitee.reporter.elasticsearch.indexer.name.IndexNameGenerator;
import io.reactivex.rxjava3.core.Flowable;
import io.reactivex.rxjava3.processors.PublishProcessor;
import io.reactivex.rxjava3.schedulers.Schedulers;
import io.vertx.rxjava3.core.Vertx;
import java.util.concurrent.TimeUnit;
import javax.annotation.PostConstruct;
import org.reactivestreams.Publisher;
import org.reactivestreams.Subscriber;
import org.springframework.beans.factory.annotation.Autowired;

public abstract class BulkIndexer
extends AbstractIndexer {
    @Autowired
    private Client client;
    @Autowired
    private Vertx vertx;
    private final PublishProcessor<Reportable> bulkProcessor = PublishProcessor.create();

    protected BulkIndexer(ReporterConfiguration configuration, PipelineConfiguration pipelineConfiguration, IndexNameGenerator indexNameGenerator, Node node) {
        super(configuration, pipelineConfiguration, indexNameGenerator, node);
    }

    @PostConstruct
    public void initialize() {
        this.bulkProcessor.onBackpressureBuffer().observeOn(Schedulers.io()).flatMap(reportable -> Flowable.just((Object)reportable).map(this::transform).onErrorResumeWith((Publisher)Flowable.empty())).buffer(this.configuration.getFlushInterval().longValue(), TimeUnit.SECONDS, this.configuration.getBulkActions().intValue()).filter(payload -> !payload.isEmpty()).subscribe((Subscriber)new DocumentBulkProcessor(this.client));
    }

    @Override
    public void index(Reportable reportable) {
        this.bulkProcessor.onNext((Object)reportable);
    }
}

