package org.apache.rocketmq.streams.core.function.supplier;

import java.util.Comparator;
import java.util.Iterator;
import java.util.List;
import java.util.Properties;
import java.util.function.Supplier;
import org.apache.rocketmq.common.message.MessageQueue;
import org.apache.rocketmq.streams.core.common.Constant;
import org.apache.rocketmq.streams.core.exception.RecoverStateStoreThrowable;
import org.apache.rocketmq.streams.core.function.ValueJoinAction;
import org.apache.rocketmq.streams.core.metadata.Data;
import org.apache.rocketmq.streams.core.running.AbstractWindowProcessor;
import org.apache.rocketmq.streams.core.running.Processor;
import org.apache.rocketmq.streams.core.running.StreamContext;
import org.apache.rocketmq.streams.core.util.Pair;
import org.apache.rocketmq.streams.core.util.Utils;
import org.apache.rocketmq.streams.core.window.JoinType;
import org.apache.rocketmq.streams.core.window.StreamType;
import org.apache.rocketmq.streams.core.window.Window;
import org.apache.rocketmq.streams.core.window.WindowInfo;
import org.apache.rocketmq.streams.core.window.WindowKey;
import org.apache.rocketmq.streams.core.window.WindowState;
import org.apache.rocketmq.streams.core.window.WindowStore;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/rocketmq/streams/core/function/supplier/JoinWindowAggregateSupplier.class */
/**
 * Supplies processors that join two keyed streams inside time windows.
 *
 * <p>Each call to {@link #get()} creates a new {@link JoinStreamWindowAggregateProcessor}
 * configured with this supplier's name, window definition, join type and join action.
 *
 * @param <K>   key type stored in the window state
 * @param <V1>  value type of the left stream
 * @param <V2>  value type of the right stream
 * @param <OUT> result type produced by the join action
 */
public class JoinWindowAggregateSupplier<K, V1, V2, OUT> implements Supplier<Processor<? super OUT>> {
    private static final Logger logger = LoggerFactory.getLogger(JoinWindowAggregateSupplier.class.getName());
    private final String name;
    private final WindowInfo windowInfo;
    private final ValueJoinAction<V1, V2, OUT> joinAction;
    private final JoinType joinType;

    /**
     * Processor that stores incoming left/right records per window and, on every record,
     * joins and forwards the window states returned by
     * {@code WindowStore#searchLessThanWatermark} for the current watermark.
     */
    public class JoinStreamWindowAggregateProcessor extends AbstractWindowProcessor<Object> {
        private final String name;
        private final WindowInfo windowInfo;
        private final JoinType joinType;
        private final ValueJoinAction<V1, V2, OUT> joinAction;
        private MessageQueue stateTopicMessageQueue;
        private WindowStore<K, V1> leftWindowStore;
        private WindowStore<K, V2> rightWindowStore;

        /**
         * @param str             base name; combined with this class' simple name to build the state key prefix
         * @param windowInfo      window definition used to compute the windows a record belongs to
         * @param joinType        join semantics applied when windows are fired
         * @param valueJoinAction function combining a left and a right value into the output value
         */
        public JoinStreamWindowAggregateProcessor(String str, WindowInfo windowInfo, JoinType joinType, ValueJoinAction<V1, V2, OUT> valueJoinAction) {
            this.name = Utils.buildKey(str, JoinStreamWindowAggregateProcessor.class.getSimpleName());
            this.windowInfo = windowInfo;
            this.joinType = joinType;
            this.joinAction = valueJoinAction;
        }

        /**
         * Creates the left and right window stores and the state-topic message queue
         * derived from the source topic, broker name and queue id.
         *
         * @param streamContext context handed to the parent {@code preProcess}
         * @throws RecoverStateStoreThrowable propagated from the parent class calls
         */
        @Override
        public void preProcess(StreamContext<Object> streamContext) throws RecoverStateStoreThrowable {
            super.preProcess(streamContext);
            this.leftWindowStore = new WindowStore<>(super.waitStateReplay(), WindowState::byte2WindowState, WindowState::windowState2Byte);
            this.rightWindowStore = new WindowStore<>(super.waitStateReplay(), WindowState::byte2WindowState, WindowState::windowState2Byte);
            this.stateTopicMessageQueue = new MessageQueue(getSourceTopic() + Constant.STATE_TOPIC_SUFFIX, getSourceBrokerName(), getSourceQueueId().intValue());
        }

        /**
         * Stores the record in the window store of the stream it came from, then fires windows
         * for the current watermark. Records whose data time is earlier than the watermark are dropped.
         *
         * @param obj the record value; its stream side is read from the {@code Constant.STREAM_TAG} header
         * @throws Throwable propagated from the state store, the join action or the downstream forward
         */
        @Override
        public void process(Object obj) throws Throwable {
            Object key = this.context.getKey();
            long dataTime = this.context.getDataTime();
            Properties header = this.context.getHeader();
            long watermark = this.context.getWatermark();
            WindowInfo.JoinStream joinStream = (WindowInfo.JoinStream) header.get(Constant.STREAM_TAG);
            if (dataTime < watermark) {
                // Late record: neither stored nor used to trigger a fire.
                return;
            }
            StreamType streamType = joinStream.getStreamType();
            store(key, obj, dataTime, streamType);
            fire(watermark, streamType);
        }

        /**
         * Writes one window state per window the timestamp falls into, into the store
         * matching {@code streamType}.
         *
         * @param obj        record key
         * @param obj2       record value
         * @param j          record timestamp
         * @param streamType side of the join the record belongs to
         * @throws Throwable propagated from the window store
         */
        // The context exposes key and value as Object; the casts restore the store's generic
        // types and are erased at runtime, so they cannot fail here.
        @SuppressWarnings("unchecked")
        private void store(Object obj, Object obj2, long j, StreamType streamType) throws Throwable {
            String buildKey = Utils.buildKey(this.name, streamType.name());
            for (Window window : super.calculateWindow(this.windowInfo, j)) {
                // Guarded so the message (and Utils.format) is only built when debug is enabled.
                if (JoinWindowAggregateSupplier.logger.isDebugEnabled()) {
                    JoinWindowAggregateSupplier.logger.debug("timestamp=" + j + ". time -> window: " + Utils.format(j) + "->" + window);
                }
                WindowKey windowKey = new WindowKey(buildKey, super.toHexString(obj), Long.valueOf(window.getEndTime()), Long.valueOf(window.getStartTime()));
                switch (streamType) {
                    case LEFT_STREAM:
                        this.leftWindowStore.put(this.stateTopicMessageQueue, windowKey, new WindowState<>((K) obj, (V1) obj2, j));
                        break;
                    case RIGHT_STREAM:
                        this.rightWindowStore.put(this.stateTopicMessageQueue, windowKey, new WindowState<>((K) obj, (V2) obj2, j));
                        break;
                    default:
                        break;
                }
            }
        }

        /**
         * Looks up the left and right states selected by the watermark, joins them according
         * to the configured join type, forwards the results and finally deletes every state
         * that was looked up.
         *
         * @param j          current watermark
         * @param streamType side of the record that triggered this call; for {@code LEFT_JOIN}
         *                   results are only emitted when it is {@code LEFT_STREAM}
         * @throws Throwable propagated from the window stores, the join action or the forward
         */
        private void fire(long j, StreamType streamType) throws Throwable {
            WindowKey leftBound = new WindowKey(Utils.buildKey(this.name, StreamType.LEFT_STREAM.name()), null, Long.valueOf(j), 0L);
            List<Pair<WindowKey, WindowState<K, V1>>> leftPairs = this.leftWindowStore.searchLessThanWatermark(leftBound);
            WindowKey rightBound = new WindowKey(Utils.buildKey(this.name, StreamType.RIGHT_STREAM.name()), null, Long.valueOf(j), 0L);
            List<Pair<WindowKey, WindowState<K, V2>>> rightPairs = this.rightWindowStore.searchLessThanWatermark(rightBound);
            if (leftPairs.isEmpty() && rightPairs.isEmpty()) {
                return;
            }

            // Process windows in ascending order of their end time.
            leftPairs.sort(Comparator.comparing(pair -> pair.getKey().getWindowEnd()));
            rightPairs.sort(Comparator.comparing(pair -> pair.getKey().getWindowEnd()));

            switch (this.joinType) {
                case INNER_JOIN:
                    fireInnerJoin(leftPairs, rightPairs);
                    break;
                case LEFT_JOIN:
                    switch (streamType) {
                        case LEFT_STREAM:
                            fireLeftJoin(leftPairs, rightPairs);
                            break;
                        default:
                            // A right-stream record does not emit anything for a left join.
                            break;
                    }
                    break;
                default:
                    break;
            }

            // The looked-up states are removed whether or not they produced output.
            for (Pair<WindowKey, WindowState<K, V1>> leftPair : leftPairs) {
                this.leftWindowStore.deleteByKey(leftPair.getKey());
            }
            for (Pair<WindowKey, WindowState<K, V2>> rightPair : rightPairs) {
                this.rightWindowStore.deleteByKey(rightPair.getKey());
            }
        }

        /** Forwards one joined result for every left/right pair sharing the same key-and-window. */
        private void fireInnerJoin(List<Pair<WindowKey, WindowState<K, V1>>> leftPairs,
                                   List<Pair<WindowKey, WindowState<K, V2>>> rightPairs) throws Throwable {
            for (Pair<WindowKey, WindowState<K, V1>> leftPair : leftPairs) {
                String leftKeyAndWindow = leftPair.getKey().getKeyAndWindow();
                for (Pair<WindowKey, WindowState<K, V2>> rightPair : rightPairs) {
                    if (leftKeyAndWindow.equals(rightPair.getKey().getKeyAndWindow())) {
                        OUT joined = this.joinAction.apply(leftPair.getValue().getValue(), rightPair.getValue().getValue());
                        forwardJoined(leftPair.getKey(), joined);
                    }
                }
            }
        }

        /**
         * Forwards one result per left entry, joined with the first right entry sharing the same
         * key-and-window, or with {@code null} when there is none.
         */
        private void fireLeftJoin(List<Pair<WindowKey, WindowState<K, V1>>> leftPairs,
                                  List<Pair<WindowKey, WindowState<K, V2>>> rightPairs) throws Throwable {
            for (Pair<WindowKey, WindowState<K, V1>> leftPair : leftPairs) {
                String leftKeyAndWindow = leftPair.getKey().getKeyAndWindow();
                Pair<WindowKey, WindowState<K, V2>> matched = null;
                // Bounded search: stops at the first match or when the right side is exhausted.
                for (Pair<WindowKey, WindowState<K, V2>> rightPair : rightPairs) {
                    if (rightPair.getKey().getKeyAndWindow().equals(leftKeyAndWindow)) {
                        matched = rightPair;
                        break;
                    }
                }
                V1 leftValue = leftPair.getValue().getValue();
                V2 rightValue = matched == null ? null : matched.getValue().getValue();
                OUT joined = this.joinAction.apply(leftValue, rightValue);
                forwardJoined(leftPair.getKey(), joined);
            }
        }

        /**
         * Writes the window start/end of {@code windowKey} into the context header and forwards
         * {@code joined} downstream with the context's current key and data time.
         */
        private void forwardJoined(WindowKey windowKey, OUT joined) throws Throwable {
            Properties header = this.context.getHeader();
            header.put(Constant.WINDOW_START_TIME, windowKey.getWindowStart());
            header.put(Constant.WINDOW_END_TIME, windowKey.getWindowEnd());
            this.context.forward(super.convert(new Data<>(this.context.getKey(), joined, Long.valueOf(this.context.getDataTime()), header)));
        }
    }

    /**
     * @param str             base name handed to every created processor
     * @param windowInfo      window definition; its join stream supplies the join type
     * @param valueJoinAction function combining a left and a right value into the output value
     */
    public JoinWindowAggregateSupplier(String str, WindowInfo windowInfo, ValueJoinAction<V1, V2, OUT> valueJoinAction) {
        this.name = str;
        this.windowInfo = windowInfo;
        this.joinType = windowInfo.getJoinStream().getJoinType();
        this.joinAction = valueJoinAction;
    }

    /**
     * Creates a new join processor bound to this supplier's configuration.
     *
     * @return a fresh {@link JoinStreamWindowAggregateProcessor}
     */
    @Override
    public Processor<Object> get() {
        return new JoinStreamWindowAggregateProcessor(this.name, this.windowInfo, this.joinType, this.joinAction);
    }
}
