package org.apache.rocketmq.streams.core.function.supplier;

import java.util.function.Supplier;
import org.apache.rocketmq.common.message.MessageQueue;
import org.apache.rocketmq.streams.core.common.Constant;
import org.apache.rocketmq.streams.core.exception.RecoverStateStoreThrowable;
import org.apache.rocketmq.streams.core.function.AggregateAction;
import org.apache.rocketmq.streams.core.metadata.Data;
import org.apache.rocketmq.streams.core.running.AbstractProcessor;
import org.apache.rocketmq.streams.core.running.Processor;
import org.apache.rocketmq.streams.core.running.StreamContext;
import org.apache.rocketmq.streams.core.state.StateStore;

/* loaded from: input_file:org/apache/rocketmq/streams/core/function/supplier/AggregateSupplier.class */
public class AggregateSupplier<K, V, OV> implements Supplier<Processor<V>> {
    private final String currentName;
    private final String parentName;
    private Supplier<OV> initAction;
    private AggregateAction<K, V, OV> aggregateAction;

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/apache/rocketmq/streams/core/function/supplier/AggregateSupplier$AggregateProcessor.class */
    public class AggregateProcessor extends AbstractProcessor<V> {
        private final String currentName;
        private final String parentName;
        private final Supplier<OV> initAction;
        private final AggregateAction<K, V, OV> aggregateAction;
        private StateStore stateStore;
        private MessageQueue stateTopicMessageQueue;

        public AggregateProcessor(String str, String str2, Supplier<OV> supplier, AggregateAction<K, V, OV> aggregateAction) {
            this.currentName = str;
            this.parentName = str2;
            this.initAction = supplier;
            this.aggregateAction = aggregateAction;
        }

        @Override // org.apache.rocketmq.streams.core.running.AbstractProcessor, org.apache.rocketmq.streams.core.running.Processor
        public void preProcess(StreamContext<V> streamContext) throws RecoverStateStoreThrowable {
            super.preProcess(streamContext);
            this.stateStore = super.waitStateReplay();
            this.stateTopicMessageQueue = new MessageQueue(getSourceTopic() + Constant.STATE_TOPIC_SUFFIX, getSourceBrokerName(), getSourceQueueId().intValue());
        }

        /* JADX WARN: Multi-variable type inference failed */
        @Override // org.apache.rocketmq.streams.core.running.Processor
        public void process(V v) throws Throwable {
            Object key = this.context.getKey();
            byte[] object2Byte = super.object2Byte(key);
            byte[] bArr = this.stateStore.get(object2Byte);
            Object calculate = this.aggregateAction.calculate(key, v, (bArr == null || bArr.length == 0) ? this.initAction.get() : super.byte2Object(bArr));
            this.stateStore.put(this.stateTopicMessageQueue, object2Byte, super.object2Byte(calculate));
            this.context.forward(super.convert(new Data<>(key, calculate, Long.valueOf(this.context.getDataTime()), this.context.getHeader())));
        }
    }

    public AggregateSupplier(String str, String str2, Supplier<OV> supplier, AggregateAction<K, V, OV> aggregateAction) {
        this.currentName = str;
        this.parentName = str2;
        this.initAction = supplier;
        this.aggregateAction = aggregateAction;
    }

    @Override // java.util.function.Supplier
    public Processor<V> get() {
        return new AggregateProcessor(this.currentName, this.parentName, this.initAction, this.aggregateAction);
    }
}
