package org.apache.rocketmq.streams.core.function.supplier;

import java.util.function.Supplier;
import org.apache.rocketmq.common.message.MessageQueue;
import org.apache.rocketmq.streams.core.common.Constant;
import org.apache.rocketmq.streams.core.exception.RecoverStateStoreThrowable;
import org.apache.rocketmq.streams.core.function.SelectAction;
import org.apache.rocketmq.streams.core.function.accumulator.Accumulator;
import org.apache.rocketmq.streams.core.metadata.Data;
import org.apache.rocketmq.streams.core.running.AbstractProcessor;
import org.apache.rocketmq.streams.core.running.Processor;
import org.apache.rocketmq.streams.core.running.StreamContext;
import org.apache.rocketmq.streams.core.state.StateStore;

/* loaded from: input_file:org/apache/rocketmq/streams/core/function/supplier/AccumulatorSupplier.class */
public class AccumulatorSupplier<K, V, R, OV> implements Supplier<Processor<V>> {
    private final String currentName;
    private final String parentName;
    private SelectAction<R, V> selectAction;
    private Accumulator<R, OV> accumulator;

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/apache/rocketmq/streams/core/function/supplier/AccumulatorSupplier$AccumulatorProcessor.class */
    public class AccumulatorProcessor extends AbstractProcessor<V> {
        private final String currentName;
        private final String parentName;
        private StateStore stateStore;
        private MessageQueue stateTopicMessageQueue;
        private SelectAction<R, V> selectAction;
        private Accumulator<R, OV> accumulator;

        public AccumulatorProcessor(String str, String str2, SelectAction<R, V> selectAction, Accumulator<R, OV> accumulator) {
            this.currentName = str;
            this.parentName = str2;
            this.selectAction = selectAction;
            this.accumulator = accumulator;
        }

        @Override // org.apache.rocketmq.streams.core.running.AbstractProcessor, org.apache.rocketmq.streams.core.running.Processor
        public void preProcess(StreamContext<V> streamContext) throws RecoverStateStoreThrowable {
            super.preProcess(streamContext);
            this.stateStore = super.waitStateReplay();
            this.stateTopicMessageQueue = new MessageQueue(getSourceTopic() + Constant.STATE_TOPIC_SUFFIX, getSourceBrokerName(), getSourceQueueId().intValue());
        }

        @Override // org.apache.rocketmq.streams.core.running.Processor
        public void process(V v) throws Throwable {
            Object key = this.context.getKey();
            byte[] object2Byte = super.object2Byte(key);
            byte[] bArr = this.stateStore.get(object2Byte);
            Accumulator<R, OV> m1clone = (bArr == null || bArr.length == 0) ? this.accumulator.m1clone() : (Accumulator) super.byte2Object(bArr);
            m1clone.addValue(this.selectAction.select(v));
            OV result = m1clone.result(null);
            this.stateStore.put(this.stateTopicMessageQueue, object2Byte, super.object2Byte(m1clone));
            this.context.forward(super.convert(new Data<>(key, result, Long.valueOf(this.context.getDataTime()), this.context.getHeader())));
        }
    }

    public AccumulatorSupplier(String str, String str2, SelectAction<R, V> selectAction, Accumulator<R, OV> accumulator) {
        this.currentName = str;
        this.parentName = str2;
        this.selectAction = selectAction;
        this.accumulator = accumulator;
    }

    @Override // java.util.function.Supplier
    public Processor<V> get() {
        return new AccumulatorProcessor(this.currentName, this.parentName, this.selectAction, this.accumulator);
    }
}
