package org.apache.rocketmq.streams.core.function.supplier;

import java.util.Iterator;
import java.util.List;
import java.util.Properties;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Supplier;
import org.apache.rocketmq.common.message.MessageQueue;
import org.apache.rocketmq.streams.core.common.Constant;
import org.apache.rocketmq.streams.core.exception.RecoverStateStoreThrowable;
import org.apache.rocketmq.streams.core.function.AggregateAction;
import org.apache.rocketmq.streams.core.metadata.Data;
import org.apache.rocketmq.streams.core.running.AbstractWindowProcessor;
import org.apache.rocketmq.streams.core.running.Processor;
import org.apache.rocketmq.streams.core.running.StreamContext;
import org.apache.rocketmq.streams.core.util.Pair;
import org.apache.rocketmq.streams.core.util.Utils;
import org.apache.rocketmq.streams.core.window.Window;
import org.apache.rocketmq.streams.core.window.WindowInfo;
import org.apache.rocketmq.streams.core.window.WindowKey;
import org.apache.rocketmq.streams.core.window.WindowState;
import org.apache.rocketmq.streams.core.window.WindowStore;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/rocketmq/streams/core/function/supplier/WindowAggregateSupplier.class */
public class WindowAggregateSupplier<K, V, OV> implements Supplier<Processor<V>> {
    /** Logger shared by this supplier and the processors nested in it. */
    private static final Logger logger = LoggerFactory.getLogger(WindowAggregateSupplier.class.getName());
    /** Operator name passed to every processor created by {@code get()}. */
    private final String name;
    /** Window definition; its window type decides which processor {@code get()} builds. */
    private final WindowInfo windowInfo;
    /** Supplies the accumulator value used when no stored value exists for a window. */
    private final Supplier<OV> initAction;
    /** Folds one record into an accumulator value. */
    private final AggregateAction<K, V, OV> aggregateAction;

    /* loaded from: input_file:org/apache/rocketmq/streams/core/function/supplier/WindowAggregateSupplier$CommonWindowFire.class */
    /**
     * Base class of the window processors created by this supplier. It holds the window state
     * store and provides the routine that emits stored windows downstream.
     */
    public abstract class CommonWindowFire extends AbstractWindowProcessor<V> {
        /** Window state store; not assigned here, subclasses set it in {@code preProcess}. */
        protected WindowStore<K, OV> windowStore;

        public CommonWindowFire() {
        }

        /**
         * Queries the store with {@code searchLessThanWatermark} and, for every returned window,
         * forwards its aggregated value downstream and then deletes the stored state.
         *
         * @param j   watermark, used as the window-end component of the search key
         * @param str operator name, used as the operator component of the search key
         * @param k   record key; its hex string is used as the key component of the search key
         * @throws Throwable whatever the store lookup, forwarding or deletion throws
         */
        protected void fireWindowEndTimeLassThanWatermark(long j, String str, K k) throws Throwable {
            WindowKey searchKey = new WindowKey(str, toHexString(k), Long.valueOf(j), 0L);
            List<Pair<WindowKey, WindowState<K, OV>>> matches = this.windowStore.searchLessThanWatermark(searchKey);

            // The result list is processed from its last element to its first.
            int idx = matches.size();
            while (--idx >= 0) {
                Pair<WindowKey, WindowState<K, OV>> match = matches.get(idx);
                WindowKey firedKey = match.getKey();
                WindowState<K, OV> firedState = match.getValue();

                // Publish the window bounds to downstream operators through the context header.
                Properties header = this.context.getHeader();
                header.put(Constant.WINDOW_START_TIME, firedKey.getWindowStart());
                header.put(Constant.WINDOW_END_TIME, firedKey.getWindowEnd());

                Data outgoing = super.convert(new Data<>(firedState.getKey(), firedState.getValue(), Long.valueOf(firedState.getRecordLastTimestamp()), header));
                if (WindowAggregateSupplier.logger.isDebugEnabled()) {
                    WindowAggregateSupplier.logger.debug("fire window, windowKey={}, search watermark={}, window: [{} - {}], data to next:[{}]",
                            firedKey.toString(),
                            Long.valueOf(j),
                            Utils.format(firedKey.getWindowStart().longValue()),
                            Utils.format(firedKey.getWindowEnd().longValue()),
                            outgoing);
                }

                this.context.forward(outgoing);
                // The window has been emitted; drop its state from the store.
                this.windowStore.deleteByKey(firedKey);
            }
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/apache/rocketmq/streams/core/function/supplier/WindowAggregateSupplier$SessionWindowAggregateProcessor.class */
    /**
     * Processor for session windows. For every record it fires stored sessions whose end lies
     * before the watermark, folds the record into a stored session that covers the record time,
     * and opens a new session when {@link #fireIfSessionOut} asks for one.
     */
    public class SessionWindowAggregateProcessor extends WindowAggregateSupplier<K, V, OV>.CommonWindowFire {
        /** Operator name used in window keys: the supplied name plus this class's simple name. */
        private final String name;
        private final WindowInfo windowInfo;
        private final Supplier<OV> initAction;
        private final AggregateAction<K, V, OV> aggregateAction;
        /** Queue passed to every {@code windowStore.put}; built in {@link #preProcess}. */
        private MessageQueue stateTopicMessageQueue;

        /**
         * @param str             operator name prefix
         * @param windowInfo      window definition; its session timeout is read while processing
         * @param supplier        supplies the initial accumulator of a new session
         * @param aggregateAction folds one record into an accumulator
         */
        public SessionWindowAggregateProcessor(String str, WindowInfo windowInfo, Supplier<OV> supplier, AggregateAction<K, V, OV> aggregateAction) {
            super();
            this.name = str + SessionWindowAggregateProcessor.class.getSimpleName();
            this.windowInfo = windowInfo;
            this.initAction = supplier;
            this.aggregateAction = aggregateAction;
        }

        /**
         * Runs the parent initialisation, then creates the window store from the replayed state
         * and derives the state-topic message queue from the source topic, broker and queue id.
         */
        @Override
        public void preProcess(StreamContext<V> streamContext) throws RecoverStateStoreThrowable {
            super.preProcess(streamContext);
            this.windowStore = new WindowStore<>(super.waitStateReplay(), WindowState::byte2WindowState, WindowState::windowState2Byte);
            this.stateTopicMessageQueue = new MessageQueue(getSourceTopic() + Constant.STATE_TOPIC_SUFFIX, getSourceBrokerName(), getSourceQueueId().intValue());
        }

        /**
         * Handles one record: delegates to {@link #fireIfSessionOut} and, when that returns a
         * window range, stores a new session state seeded from {@code initAction}.
         *
         * @param v the record value
         * @throws Throwable whatever the store, the aggregate action or forwarding throws
         */
        @Override
        public void process(V v) throws Throwable {
            // The context hands the key back untyped; it is the K this processor was built for.
            @SuppressWarnings("unchecked")
            K key = (K) this.context.getKey();
            long dataTime = this.context.getDataTime();
            long watermark = this.context.getWatermark();

            Pair<Long, Long> newSession = fireIfSessionOut(key, v, dataTime, watermark);
            if (newSession == null) {
                return;
            }

            OV aggregated = this.aggregateAction.calculate(key, v, this.initAction.get());
            WindowState<K, OV> state = new WindowState<>(key, aggregated, dataTime);
            if (dataTime < state.getRecordEarliestTimestamp()) {
                // The earliest record time is what fire() reports as the session start.
                state.setRecordEarliestTimestamp(dataTime);
            }

            // Pair is (begin, end); WindowKey takes the end before the start.
            WindowKey windowKey = new WindowKey(this.name, super.toHexString(key), newSession.getValue(), newSession.getKey());
            WindowAggregateSupplier.logger.info("new session window, with key={}, valueTime={}, sessionBegin=[{}], sessionEnd=[{}]",
                    key,
                    Long.valueOf(dataTime),
                    Utils.format(newSession.getKey().longValue()),
                    Utils.format(newSession.getValue().longValue()));
            this.windowStore.put(this.stateTopicMessageQueue, windowKey, state);
        }

        /**
         * Fires stored sessions that ended before the watermark, then tries to fold the record
         * into one of the remaining sessions.
         *
         * @param key       record key
         * @param data      record value
         * @param dataTime  record timestamp
         * @param watermark current watermark
         * @return (begin, end) of a session window the caller must create, or {@code null} when
         *         the record was discarded as late or no new session is required
         * @throws Throwable whatever the store, the aggregate action or forwarding throws
         */
        private Pair<Long, Long> fireIfSessionOut(K key, V data, long dataTime, long watermark) throws Throwable {
            // NOTE(review): the search key carries a null record key, so whether this lookup is
            // limited to `key` depends on how WindowStore.searchMatchKeyPrefix treats null - confirm.
            List<Pair<WindowKey, WindowState<K, OV>>> sessions = this.windowStore.searchMatchKeyPrefix(new WindowKey(this.name, null, 0L, 0L));
            if (sessions.isEmpty()) {
                return new Pair<>(Long.valueOf(dataTime), Long.valueOf(sessionEndFor(dataTime)));
            }
            WindowAggregateSupplier.logger.debug("exist session state num={}", Integer.valueOf(sessions.size()));

            // Snapshot the size: entries are removed while iterating, so comparing the running
            // counter with the live size would miss (or misplace) the last element.
            final int initialCount = sessions.size();
            int seen = 0;
            long lastSessionEnd = 0;
            long maxFiredSessionEnd = Long.MIN_VALUE;

            // Pass 1: fire and drop every session whose end is before the watermark.
            Iterator<Pair<WindowKey, WindowState<K, OV>>> it = sessions.iterator();
            while (it.hasNext()) {
                Pair<WindowKey, WindowState<K, OV>> session = it.next();
                WindowAggregateSupplier.logger.debug("exist session state{}=[{}]", Integer.valueOf(seen), session);
                seen++;

                WindowKey sessionKey = session.getKey();
                WindowState<K, OV> sessionState = session.getValue();
                long sessionEnd = sessionKey.getWindowEnd().longValue();
                if (seen == initialCount) {
                    lastSessionEnd = sessionEnd;
                }
                if (sessionEnd < watermark) {
                    fire(key, sessionKey, sessionState);
                    it.remove();
                    maxFiredSessionEnd = Long.max(sessionEnd, maxFiredSessionEnd);
                }
            }

            if (dataTime < maxFiredSessionEnd) {
                WindowAggregateSupplier.logger.warn("late data, discard. key=[{}], data=[{}], dataTime < maxFireSessionEnd: [{}] < [{}]",
                        key, data, Long.valueOf(dataTime), Long.valueOf(maxFiredSessionEnd));
                return null;
            }

            // Pass 2: fold the record into a surviving session that covers dataTime.
            boolean needNewSession = false;
            WindowKey staleKey = null;
            for (int i = 0; i < sessions.size(); i++) {
                Pair<WindowKey, WindowState<K, OV>> session = sessions.get(i);
                WindowKey sessionKey = session.getKey();
                WindowState<K, OV> sessionState = session.getValue();

                if (sessionKey.getWindowEnd().longValue() < dataTime) {
                    // This session ended before the record; a new session will be requested.
                    needNewSession = true;
                } else if (sessionKey.getWindowStart().longValue() <= dataTime) {
                    if (WindowAggregateSupplier.logger.isDebugEnabled()) {
                        WindowAggregateSupplier.logger.debug("data belong to exist session window.dataTime=[{}], window:[{} - {}]",
                                Long.valueOf(dataTime),
                                Utils.format(sessionKey.getWindowStart().longValue()),
                                Utils.format(sessionKey.getWindowEnd().longValue()));
                    }
                    sessionState.setValue(this.aggregateAction.calculate(key, data, sessionState.getValue()));
                    sessionState.setRecordLastTimestamp(dataTime);
                    if (dataTime < sessionState.getRecordEarliestTimestamp()) {
                        sessionState.setRecordEarliestTimestamp(dataTime);
                    }
                    // Only the last session may be extended: re-key it with the later end and
                    // remember the old key so it is deleted after the new entry is written.
                    if (i == sessions.size() - 1) {
                        long candidateEnd = sessionEndFor(dataTime);
                        if (sessionKey.getWindowEnd().longValue() < candidateEnd) {
                            if (WindowAggregateSupplier.logger.isDebugEnabled()) {
                                WindowAggregateSupplier.logger.debug("update exist session window, before:[{} - {}], after:[{} - {}]",
                                        Utils.format(sessionKey.getWindowStart().longValue()),
                                        Utils.format(sessionKey.getWindowEnd().longValue()),
                                        Utils.format(sessionKey.getWindowStart().longValue()),
                                        Utils.format(candidateEnd));
                            }
                            staleKey = sessionKey;
                            sessionKey = new WindowKey(sessionKey.getOperatorName(), sessionKey.getKey2String(), Long.valueOf(candidateEnd), sessionKey.getWindowStart());
                        }
                    }
                } else {
                    WindowAggregateSupplier.logger.warn("discard data: key=[{}], data=[{}], dataTime=[{}], watermark=[{}]",
                            key, data, Long.valueOf(dataTime), Long.valueOf(watermark));
                }

                // Executed for every remaining session, whichever branch was taken above.
                this.windowStore.put(this.stateTopicMessageQueue, sessionKey, sessionState);
                // staleKey stays null unless the last session was re-keyed above.
                this.windowStore.deleteByKey(staleKey);
            }

            if (sessions.isEmpty() || needNewSession) {
                return new Pair<>(Long.valueOf(lastSessionEnd), Long.valueOf(sessionEndFor(dataTime)));
            }
            return null;
        }

        /** Returns {@code dataTime} plus the configured session timeout in milliseconds. */
        private long sessionEndFor(long dataTime) {
            return dataTime + this.windowInfo.getSessionTimeout().toMilliseconds();
        }

        /**
         * Emits one session downstream and deletes its stored state. The reported window start is
         * the state's earliest record timestamp, or the key's window start when that timestamp
         * still equals {@code Long.MAX_VALUE}.
         *
         * @param key       record key, used for logging only
         * @param windowKey store key of the session being fired
         * @param state     stored state of the session being fired
         * @throws Throwable whatever forwarding or the store deletion throws
         */
        private void fire(K key, WindowKey windowKey, WindowState<K, OV> state) throws Throwable {
            long sessionEnd = windowKey.getWindowEnd().longValue();
            long earliest = state.getRecordEarliestTimestamp();
            long sessionBegin = earliest == Long.MAX_VALUE ? windowKey.getWindowStart().longValue() : earliest;

            // String.valueOf keeps a null key from failing the fire inside a log statement.
            WindowAggregateSupplier.logger.info("fire session,windowKey={}, search keyPrefix={}, window: [{} - {}]",
                    windowKey, String.valueOf(key), Utils.format(sessionBegin), Utils.format(sessionEnd));

            Properties header = this.context.getHeader();
            header.put(Constant.WINDOW_START_TIME, Long.valueOf(sessionBegin));
            header.put(Constant.WINDOW_END_TIME, Long.valueOf(sessionEnd));
            this.context.forward(super.convert(new Data<>(state.getKey(), state.getValue(), Long.valueOf(state.getRecordLastTimestamp()), header)));
            this.windowStore.deleteByKey(windowKey);
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/apache/rocketmq/streams/core/function/supplier/WindowAggregateSupplier$WindowAggregateProcessor.class */
    /**
     * Processor for sliding and tumbling windows. For every record it fires windows found below
     * the watermark, folds the record into each window returned by {@code calculateWindow}, and
     * fires once more afterwards.
     */
    public class WindowAggregateProcessor extends WindowAggregateSupplier<K, V, OV>.CommonWindowFire {
        private final WindowInfo windowInfo;
        /** Operator name used in window keys: the supplied name plus this class's simple name. */
        private final String name;
        private final Supplier<OV> initAction;
        private final AggregateAction<K, V, OV> aggregateAction;
        /** Queue passed to every {@code windowStore.put}; built in {@link #preProcess}. */
        private MessageQueue stateTopicMessageQueue;
        /** Error captured by the trailing fire of one {@code process} call, rethrown by the next. */
        private final AtomicReference<Throwable> errorReference = new AtomicReference<>(null);

        /**
         * @param str             operator name prefix
         * @param windowInfo      window definition handed to {@code calculateWindow}
         * @param supplier        supplies the accumulator when a window has no stored value
         * @param aggregateAction folds one record into an accumulator
         */
        public WindowAggregateProcessor(String str, WindowInfo windowInfo, Supplier<OV> supplier, AggregateAction<K, V, OV> aggregateAction) {
            super();
            this.name = str + WindowAggregateProcessor.class.getSimpleName();
            this.windowInfo = windowInfo;
            this.initAction = supplier;
            this.aggregateAction = aggregateAction;
        }

        /**
         * Runs the parent initialisation, then creates the window store from the replayed state
         * and derives the state-topic message queue from the source topic, broker and queue id.
         */
        @Override
        public void preProcess(StreamContext<V> streamContext) throws RecoverStateStoreThrowable {
            super.preProcess(streamContext);
            this.windowStore = new WindowStore<>(super.waitStateReplay(), WindowState::byte2WindowState, WindowState::windowState2Byte);
            this.stateTopicMessageQueue = new MessageQueue(getSourceTopic() + Constant.STATE_TOPIC_SUFFIX, getSourceBrokerName(), getSourceQueueId().intValue());
        }

        /**
         * Handles one record. Records older than the watermark are dropped. Otherwise the record
         * is aggregated into every window it falls in, and a window's state is rewritten only
         * when the aggregate result is null or differs from the previous value.
         *
         * @param v the record value
         * @throws Throwable an error deferred from the previous call, or whatever the store, the
         *                   aggregate action or the leading fire throws
         */
        @Override
        public void process(V v) throws Throwable {
            // Take and clear the deferred error in one atomic step before doing any work.
            Throwable deferred = this.errorReference.getAndSet(null);
            if (deferred != null) {
                throw deferred;
            }

            // The context hands the key back untyped; it is the K this processor was built for.
            @SuppressWarnings("unchecked")
            K key = (K) this.context.getKey();
            long dataTime = this.context.getDataTime();
            long watermark = this.context.getWatermark();
            if (dataTime < watermark) {
                WindowAggregateSupplier.logger.warn("discard data:[{}], watermark[{}] > time[{}],", v, Long.valueOf(watermark), Long.valueOf(dataTime));
                return;
            }

            fireWindowEndTimeLassThanWatermark(watermark, this.name, key);

            for (Window window : super.calculateWindow(this.windowInfo, dataTime)) {
                if (WindowAggregateSupplier.logger.isDebugEnabled()) {
                    WindowAggregateSupplier.logger.debug("timestamp={}. time -> window: {}->{}", Long.valueOf(dataTime), Utils.format(dataTime), window);
                }
                WindowKey windowKey = new WindowKey(this.name, super.toHexString(key), Long.valueOf(window.getEndTime()), Long.valueOf(window.getStartTime()));
                WindowState<K, OV> stored = this.windowStore.get(windowKey);
                OV oldValue = (stored == null || stored.getValue() == null) ? this.initAction.get() : stored.getValue();
                OV newValue = this.aggregateAction.calculate(key, v, oldValue);
                // Skip the write when the aggregate reports a value equal to the previous one.
                if (newValue == null || !newValue.equals(oldValue)) {
                    this.windowStore.put(this.stateTopicMessageQueue, windowKey, new WindowState<>(key, newValue, dataTime));
                }
            }

            try {
                fireWindowEndTimeLassThanWatermark(watermark, this.name, key);
            } catch (Throwable t) {
                // The failure is deferred to the next process() call; log it as well, because it
                // is dropped when an earlier error is still pending or no further call arrives.
                WindowAggregateSupplier.logger.error("fire window error, it will be rethrown by the next process call", t);
                this.errorReference.compareAndSet(null, t);
            }
        }
    }

    /**
     * Stores the configuration that {@code get()} later passes to the processor it creates.
     *
     * @param str             operator name
     * @param windowInfo      window definition; its window type is read by {@code get()}
     * @param supplier        supplier of the initial accumulator value
     * @param aggregateAction action that folds one record into an accumulator
     */
    public WindowAggregateSupplier(String str, WindowInfo windowInfo, Supplier<OV> supplier, AggregateAction<K, V, OV> aggregateAction) {
        this.aggregateAction = aggregateAction;
        this.initAction = supplier;
        this.windowInfo = windowInfo;
        this.name = str;
    }

    /**
     * Builds a new processor matching the configured window type: a
     * {@code WindowAggregateProcessor} for sliding and tumbling windows, a
     * {@code SessionWindowAggregateProcessor} for session windows.
     *
     * @return a freshly created processor
     * @throws RuntimeException if the window type is none of the three handled constants
     */
    @Override
    public Processor<V> get() {
        final WindowInfo.WindowType type = this.windowInfo.getWindowType();
        final Processor<V> processor;
        switch (type) {
            case SESSION_WINDOW:
                processor = new SessionWindowAggregateProcessor(this.name, this.windowInfo, this.initAction, this.aggregateAction);
                break;
            case SLIDING_WINDOW:
            case TUMBLING_WINDOW:
                processor = new WindowAggregateProcessor(this.name, this.windowInfo, this.initAction, this.aggregateAction);
                break;
            default:
                throw new RuntimeException("window type is error, WindowType=" + type);
        }
        return processor;
    }
}
