package org.apache.rocketmq.streams.core.rstream;

import org.apache.rocketmq.streams.core.function.AggregateAction;
import org.apache.rocketmq.streams.core.function.FilterAction;
import org.apache.rocketmq.streams.core.function.ValueMapperAction;
import org.apache.rocketmq.streams.core.function.accumulator.Accumulator;
import org.apache.rocketmq.streams.core.function.accumulator.CountAccumulator;
import org.apache.rocketmq.streams.core.function.supplier.FilterSupplier;
import org.apache.rocketmq.streams.core.function.supplier.SinkSupplier;
import org.apache.rocketmq.streams.core.function.supplier.ValueChangeSupplier;
import org.apache.rocketmq.streams.core.function.supplier.WindowAccumulatorSupplier;
import org.apache.rocketmq.streams.core.function.supplier.WindowAggregateSupplier;
import org.apache.rocketmq.streams.core.serialization.KeyValueSerializer;
import org.apache.rocketmq.streams.core.topology.virtual.GraphNode;
import org.apache.rocketmq.streams.core.topology.virtual.ProcessorNode;
import org.apache.rocketmq.streams.core.topology.virtual.ShuffleProcessorNode;
import org.apache.rocketmq.streams.core.topology.virtual.SinkGraphNode;
import org.apache.rocketmq.streams.core.util.OperatorNameMaker;
import org.apache.rocketmq.streams.core.window.WindowInfo;

/* loaded from: input_file:org/apache/rocketmq/streams/core/rstream/WindowStreamImpl.class */
/**
 * {@link WindowStream} implementation that turns each operation into a graph node and
 * registers that node with the owning {@link Pipeline}, using {@code parent} as the
 * upstream node.
 *
 * <p>Project types are used in raw form throughout; their generic signatures are not
 * visible from this file.
 *
 * @param <K> key type of the stream
 * @param <V> value type of the stream
 */
public class WindowStreamImpl<K, V> implements WindowStream<K, V> {
    /** Pipeline that receives every node created by this stream. */
    private final Pipeline pipeline;
    /** Upstream node that new nodes are attached to. */
    private final GraphNode parent;
    /** Window definition forwarded to window suppliers and to the pipeline. */
    private final WindowInfo windowInfo;

    /**
     * Creates a window stream bound to the given pipeline, upstream node and window.
     *
     * @param pipeline   pipeline that new nodes are registered with
     * @param graphNode  upstream node used as the parent of new nodes
     * @param windowInfo window definition for this stream
     */
    public WindowStreamImpl(Pipeline pipeline, GraphNode graphNode, WindowInfo windowInfo) {
        this.pipeline = pipeline;
        this.parent = graphNode;
        this.windowInfo = windowInfo;
    }

    /**
     * Adds a window accumulator operator built from a {@link CountAccumulator} and an
     * identity function.
     *
     * @return the stream the pipeline returns for the newly registered node
     */
    @Override
    public WindowStream<K, Integer> count() {
        String operatorName = OperatorNameMaker.makeName(OperatorNameMaker.WINDOW_COUNT_PREFIX, pipeline.getJobId());
        // The third argument hands each value back unchanged.
        WindowAccumulatorSupplier supplier =
                new WindowAccumulatorSupplier(operatorName, windowInfo, value -> value, new CountAccumulator());

        // A parent that reports itself as a shuffle node gets a shuffle-aware processor node.
        if (parent.shuffleNode()) {
            return pipeline.addWindowStreamVirtualNode(
                    new ShuffleProcessorNode(operatorName, parent.getName(), supplier), parent, windowInfo);
        }
        return pipeline.addWindowStreamVirtualNode(
                new ProcessorNode(operatorName, parent.getName(), supplier), parent, windowInfo);
    }

    /**
     * Adds a filter operator wrapping the given action.
     *
     * @param filterAction action handed to the {@link FilterSupplier}
     * @return the stream the pipeline returns for the newly registered node
     */
    @Override
    public WindowStream<K, V> filter(FilterAction<V> filterAction) {
        String operatorName = OperatorNameMaker.makeName(OperatorNameMaker.FILTER_PREFIX, pipeline.getJobId());
        ProcessorNode filterNode = new ProcessorNode(operatorName, parent.getName(), new FilterSupplier(filterAction));
        return pipeline.addWindowStreamVirtualNode(filterNode, parent, windowInfo);
    }

    /**
     * Adds a map operator wrapping the given value mapper.
     *
     * @param valueMapperAction mapper handed to the {@link ValueChangeSupplier}
     * @param <OUT>             value type produced by the mapper
     * @return the stream the pipeline returns for the newly registered node
     */
    @Override
    public <OUT> WindowStream<K, OUT> map(ValueMapperAction<V, OUT> valueMapperAction) {
        String operatorName = OperatorNameMaker.makeName(OperatorNameMaker.MAP_PREFIX, pipeline.getJobId());
        ProcessorNode mapNode =
                new ProcessorNode(operatorName, parent.getName(), new ValueChangeSupplier(valueMapperAction));
        return pipeline.addWindowStreamVirtualNode(mapNode, parent, windowInfo);
    }

    /**
     * Adds a window aggregate operator driven by the given aggregate action.
     *
     * @param aggregateAction action handed to the {@link WindowAggregateSupplier}
     * @param <OUT>           result type of the aggregation
     * @return the stream the pipeline returns for the newly registered node
     */
    @Override
    public <OUT> WindowStream<K, OUT> aggregate(AggregateAction<K, V, OUT> aggregateAction) {
        String operatorName = OperatorNameMaker.makeName(OperatorNameMaker.WINDOW_AGGREGATE_PREFIX, pipeline.getJobId());
        // The third argument always yields null; presumably the initial aggregate value --
        // confirm against WindowAggregateSupplier.
        WindowAggregateSupplier supplier =
                new WindowAggregateSupplier(operatorName, windowInfo, () -> null, aggregateAction);

        // A parent that reports itself as a shuffle node gets a shuffle-aware processor node.
        if (parent.shuffleNode()) {
            return pipeline.addWindowStreamVirtualNode(
                    new ShuffleProcessorNode(operatorName, parent.getName(), supplier), parent, windowInfo);
        }
        return pipeline.addWindowStreamVirtualNode(
                new ProcessorNode(operatorName, parent.getName(), supplier), parent, windowInfo);
    }

    /**
     * Adds a window accumulator operator driven by the given accumulator and an identity
     * function.
     *
     * @param accumulator accumulator handed to the {@link WindowAccumulatorSupplier}
     * @param <OUT>       result type of the accumulator
     * @return the stream the pipeline returns for the newly registered node
     */
    @Override
    public <OUT> WindowStream<K, OUT> aggregate(Accumulator<V, OUT> accumulator) {
        String operatorName = OperatorNameMaker.makeName(OperatorNameMaker.WINDOW_AGGREGATE_PREFIX, pipeline.getJobId());
        // The third argument hands each value back unchanged.
        WindowAccumulatorSupplier supplier =
                new WindowAccumulatorSupplier(operatorName, windowInfo, value -> value, accumulator);

        // A parent that reports itself as a shuffle node gets a shuffle-aware processor node.
        if (parent.shuffleNode()) {
            return pipeline.addWindowStreamVirtualNode(
                    new ShuffleProcessorNode(operatorName, parent.getName(), supplier), parent, windowInfo);
        }
        return pipeline.addWindowStreamVirtualNode(
                new ProcessorNode(operatorName, parent.getName(), supplier), parent, windowInfo);
    }

    /**
     * Wraps the same pipeline and parent node in an {@link RStreamImpl}; no node is added.
     *
     * @return a new {@code RStreamImpl} over this stream's pipeline and parent node
     */
    @Override
    public RStream<V> toRStream() {
        return new RStreamImpl(pipeline, parent);
    }

    /**
     * Registers a sink node with the pipeline.
     *
     * @param str                passed to both the sink node and the {@link SinkSupplier};
     *                           presumably the destination topic name -- confirm with callers
     * @param keyValueSerializer serializer handed to the {@link SinkSupplier}
     */
    @Override
    public void sink(String str, KeyValueSerializer<K, V> keyValueSerializer) {
        String operatorName = OperatorNameMaker.makeName(OperatorNameMaker.SINK_PREFIX, pipeline.getJobId());
        SinkGraphNode sinkNode =
                new SinkGraphNode(operatorName, parent.getName(), str, new SinkSupplier(str, keyValueSerializer));
        pipeline.addVirtualSink(sinkNode, parent);
    }
}
