package org.apache.rocketmq.streams.core.state;

import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import org.apache.commons.lang3.StringUtils;
import org.apache.rocketmq.client.consumer.DefaultLitePullConsumer;
import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.common.CountDownLatch2;
import org.apache.rocketmq.common.consumer.ConsumeFromWhere;
import org.apache.rocketmq.common.message.Message;
import org.apache.rocketmq.common.message.MessageExt;
import org.apache.rocketmq.common.message.MessageQueue;
import org.apache.rocketmq.common.protocol.route.QueueData;
import org.apache.rocketmq.common.protocol.route.TopicRouteData;
import org.apache.rocketmq.streams.core.common.Constant;
import org.apache.rocketmq.streams.core.exception.RecoverStateStoreThrowable;
import org.apache.rocketmq.streams.core.function.ValueMapperAction;
import org.apache.rocketmq.streams.core.metadata.StreamConfig;
import org.apache.rocketmq.streams.core.serialization.ShuffleProtocol;
import org.apache.rocketmq.streams.core.util.Pair;
import org.apache.rocketmq.streams.core.util.RocketMQUtil;
import org.apache.rocketmq.streams.core.util.Utils;
import org.apache.rocketmq.streams.core.window.WindowKey;
import org.apache.rocketmq.tools.admin.DefaultMQAdminExt;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/rocketmq/streams/core/state/RocketMQStore.class */
/**
 * {@link StateStore} that serves reads and writes from a local {@link RocksDBStore} and mirrors
 * the entries to RocketMQ "state topic" queues, from which they are replayed again in
 * {@link #loadState(Set)}.
 *
 * <p>Every source-topic queue handed to this class is first translated to its state-topic queue
 * through the inherited {@code convertSourceTopicQueue2StateTopicQueue} helpers. Replay and
 * removal run on an internal fixed pool of 8 threads; callers synchronise with an ongoing replay
 * through {@link #waitIfNotReady(MessageQueue)}.
 */
public class RocketMQStore extends AbstractStore implements StateStore {
    private static final Logger logger = LoggerFactory.getLogger(RocketMQStore.class.getName());

    /** Longest time, in milliseconds, {@link #waitIfNotReady} blocks on a recovering queue. */
    private static final long RECOVER_WAIT_MILLIS = 5000L;
    /** Waits longer than this many milliseconds are reported in the log. */
    private static final long SLOW_RECOVER_MILLIS = 2000L;
    /** How long {@code loadState}/{@code removeState} wait for their background task. */
    private static final long ASYNC_WAIT_MILLIS = 100L;
    /** Timeout, in milliseconds, of a single poll while replaying a state topic. */
    private static final long POLL_TIMEOUT_MILLIS = 50L;
    /** Buffered messages are replayed as soon as the buffer grows beyond this size. */
    private static final int REPLAY_BATCH_SIZE = 1000;

    private final DefaultMQProducer producer;
    private final DefaultMQAdminExt mqAdmin;
    private final RocksDBStore rocksDBStore;
    private final Properties properties;
    private final ExecutorService executor = Executors.newFixedThreadPool(8);
    private final ShuffleProtocol protocol = new ShuffleProtocol();
    /** One latch per state-topic queue; it is counted down once that queue has been replayed. */
    private final ConcurrentHashMap<MessageQueue, CountDownLatch2> recoveringQueueMutex = new ConcurrentHashMap<>();

    /**
     * @param defaultMQProducer producer used to publish state and delete-marker messages
     * @param rocksDBStore      local store that holds the actual key/value data
     * @param defaultMQAdminExt admin client used to inspect routes and create state topics
     * @param properties        configuration; {@code rocketmq.namesrv.addr} is read by
     *                          {@link #loadState(Set)}
     */
    public RocketMQStore(DefaultMQProducer defaultMQProducer, RocksDBStore rocksDBStore, DefaultMQAdminExt defaultMQAdminExt, Properties properties) {
        this.producer = defaultMQProducer;
        this.mqAdmin = defaultMQAdminExt;
        this.rocksDBStore = rocksDBStore;
        this.properties = properties;
    }

    /** No initialisation is required by this implementation. */
    @Override
    public void init() throws Throwable {
    }

    /**
     * Loads the state of the queues in {@code set} and then drops the state of the queues in
     * {@code set2}.
     *
     * @param set  source-topic queues whose state must be loaded
     * @param set2 source-topic queues whose state must be removed
     */
    @Override
    public void recover(Set<MessageQueue> set, Set<MessageQueue> set2) throws Throwable {
        loadState(set);
        removeState(set2);
    }

    /**
     * Blocks for at most {@value #RECOVER_WAIT_MILLIS} ms until the state queue that backs
     * {@code messageQueue} has been replayed.
     *
     * <p>When the wait times out the method still returns normally; the timeout is only logged.
     *
     * @param messageQueue source-topic queue about to be processed
     * @throws RecoverStateStoreThrowable if no replay was ever registered for the queue, or if
     *                                    the wait is interrupted or fails
     */
    @Override
    public void waitIfNotReady(MessageQueue messageQueue) throws RecoverStateStoreThrowable {
        MessageQueue stateQueue = convertSourceTopicQueue2StateTopicQueue(messageQueue);
        CountDownLatch2 latch = this.recoveringQueueMutex.get(stateQueue);
        if (latch == null) {
            // Same exception type as before, but with a cause that names the problem instead of
            // an anonymous NullPointerException.
            throw new RecoverStateStoreThrowable(
                new IllegalStateException("no state recovery registered for queue " + stateQueue));
        }

        final long begin = System.currentTimeMillis();
        final boolean ready;
        try {
            ready = latch.await(RECOVER_WAIT_MILLIS, TimeUnit.MILLISECONDS);
        } catch (InterruptedException interrupted) {
            // Restore the flag so code further up the stack can still observe the interruption.
            Thread.currentThread().interrupt();
            throw new RecoverStateStoreThrowable(interrupted);
        } catch (Throwable failure) {
            throw new RecoverStateStoreThrowable(failure);
        }

        long cost = System.currentTimeMillis() - begin;
        if (!ready) {
            logger.warn("state of queue {} still not recovered after {} ms, continue anyway.", stateQueue, cost);
        }
        if (cost > SLOW_RECOVER_MILLIS) {
            logger.error("recover finish, consume time:{} ms.", cost);
        }
    }

    /**
     * @param bArr key to look up
     * @return an empty array when {@code bArr} is {@code null} or empty, otherwise whatever the
     *         RocksDB store returns for the key
     */
    @Override
    public byte[] get(byte[] bArr) throws Throwable {
        if (bArr == null || bArr.length == 0) {
            return new byte[0];
        }
        return this.rocksDBStore.get(bArr);
    }

    /**
     * Records the key as "in calculating" for the given queue and writes the entry to RocksDB.
     *
     * @param messageQueue queue the key is registered under
     * @param bArr         key
     * @param bArr2        value
     */
    @Override
    public void put(MessageQueue messageQueue, byte[] bArr, byte[] bArr2) throws Throwable {
        super.putInCalculating(buildKey(messageQueue), bArr);
        this.rocksDBStore.put(bArr, bArr2);
    }

    /** Delegates to the RocksDB store; an empty {@code str} yields an empty, mutable list. */
    @Override
    public List<Pair<byte[], byte[]>> searchStateLessThanWatermark(String str, long j, ValueMapperAction<byte[], WindowKey> valueMapperAction) throws Throwable {
        if (StringUtils.isEmpty(str)) {
            return new ArrayList<>();
        }
        return this.rocksDBStore.searchStateLessThanWatermark(str, j, valueMapperAction);
    }

    /** Delegates to the RocksDB store; an empty {@code str} yields an empty, mutable list. */
    @Override
    public List<Pair<String, byte[]>> searchByKeyPrefix(String str, ValueMapperAction<String, byte[]> valueMapperAction, ValueMapperAction<byte[], String> valueMapperAction2) throws Throwable {
        if (StringUtils.isEmpty(str)) {
            return new ArrayList<>();
        }
        return this.rocksDBStore.searchByKeyPrefix(str, valueMapperAction, valueMapperAction2);
    }

    /**
     * Deletes a key: publishes a delete marker to the state queue that owns the key, then removes
     * the key from RocksDB and from the inherited bookkeeping. A {@code null} or empty key is
     * ignored.
     *
     * <p>The marker is a message whose body is {@code Constant.EMPTY_BODY}, whose message key is
     * the hex form of {@code bArr} and which carries the user property
     * {@code Constant.EMPTY_BODY = Constant.TRUE}; {@code replayState} skips keys whose newest
     * message carries that property.
     *
     * @param bArr key to delete
     */
    @Override
    public void delete(byte[] bArr) throws Throwable {
        if (bArr == null || bArr.length == 0) {
            return;
        }

        // The owner string splits into [brokerName, topic, queueId], as the MessageQueue
        // constructor arguments below show.
        String[] owner = Utils.split(super.whichStateTopicQueueBelongTo(bArr));
        String stateTopic = owner[1];
        MessageQueue stateQueue = new MessageQueue(stateTopic, owner[0], Integer.parseInt(owner[2]));

        Message marker = new Message(stateTopic, Constant.EMPTY_BODY.getBytes(StandardCharsets.UTF_8));
        marker.setKeys(Utils.toHexString(bArr));
        marker.putUserProperty(Constant.SHUFFLE_KEY_CLASS_NAME, bArr.getClass().getName());
        marker.putUserProperty(Constant.EMPTY_BODY, Constant.TRUE);
        this.producer.send(marker, stateQueue);

        this.rocksDBStore.deleteByKey(bArr);
        super.removeAllKey(bArr);
        if (logger.isDebugEnabled()) {
            logger.debug("delete key: {},MessageQueue: {}", keyText(bArr), stateQueue);
        }
    }

    /**
     * Publishes every "in calculating" key of the given queues, together with its current RocksDB
     * value, to the matching state queue and then clears that queue's "in calculating" set.
     *
     * <p>A queue without pending keys is skipped; the remaining queues are still processed.
     * Keys that have no value in RocksDB are not published.
     *
     * @param set source-topic queues to persist; {@code null} or empty is a no-op
     */
    @Override
    public void persist(Set<MessageQueue> set) throws Throwable {
        if (set == null || set.isEmpty()) {
            return;
        }

        for (MessageQueue stateQueue : convertSourceTopicQueue2StateTopicQueue(set)) {
            String queueKey = buildKey(stateQueue);
            Set<byte[]> pendingKeys = super.getInCalculating(queueKey);
            if (pendingKeys == null || pendingKeys.isEmpty()) {
                // Nothing to do for this queue. Moving on (instead of stopping the whole loop)
                // keeps the pending keys of the other queues from being silently dropped.
                continue;
            }

            createStateTopic(stateQueue.getTopic());
            for (byte[] key : pendingKeys) {
                byte[] value = this.rocksDBStore.get(key);
                if (value == null) {
                    continue;
                }
                Message message = new Message(stateQueue.getTopic(), this.protocol.merge(key, value));
                message.setKeys(Utils.toHexString(key));
                if (logger.isDebugEnabled()) {
                    logger.debug("persist key: {},messageQueue: {}", keyText(key), stateQueue);
                }
                this.producer.send(message, stateQueue);
            }
            super.removeCalculating(queueKey);
        }
    }

    /**
     * Starts replaying the state queues that back {@code set} into RocksDB.
     *
     * <p>A pull consumer is assigned to the state queues and rewound to their beginning; the
     * replay itself runs on the internal executor. This method waits at most
     * {@value #ASYNC_WAIT_MILLIS} ms for it, after which the replay continues in the background.
     * The consumer is shut down by the background task, or here if set-up fails before the task
     * could be submitted.
     *
     * @param set source-topic queues whose state must be loaded; {@code null} or empty is a no-op
     * @throws Throwable if set-up fails, or if the replay fails within the wait window
     */
    public void loadState(Set<MessageQueue> set) throws Throwable {
        if (set == null || set.isEmpty()) {
            return;
        }

        DefaultLitePullConsumer consumer = new DefaultLitePullConsumer(StreamConfig.ROCKETMQ_STREAMS_STATE_CONSUMER_GROUP);
        consumer.setNamesrvAddr(this.properties.getProperty("rocketmq.namesrv.addr"));
        consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);
        consumer.setAutoCommit(false);

        Future<?> replay;
        try {
            consumer.start();
            Set<MessageQueue> stateQueues = convertSourceTopicQueue2StateTopicQueue(set);
            for (MessageQueue stateQueue : stateQueues) {
                createStateTopic(stateQueue.getTopic());
            }
            consumer.assign(stateQueues);
            for (MessageQueue stateQueue : stateQueues) {
                consumer.seekToBegin(stateQueue);
            }
            replay = this.executor.submit(() -> replayThenShutdown(consumer));
        } catch (Throwable setupFailure) {
            // The background task never took ownership of the consumer, so release it here.
            consumer.shutdown();
            throw setupFailure;
        }
        awaitBriefly(replay);
    }

    /**
     * Drops the locally held state of the given queues: deletes their keys from RocksDB, clears
     * the inherited bookkeeping and forgets their recovery latches. The work runs on the internal
     * executor and this method waits at most {@value #ASYNC_WAIT_MILLIS} ms for it.
     *
     * @param set source-topic queues whose state must be removed; {@code null} or empty is a no-op
     * @throws Throwable if the removal fails within the wait window
     */
    public void removeState(Set<MessageQueue> set) throws Throwable {
        if (set == null || set.isEmpty()) {
            return;
        }
        awaitBriefly(this.executor.submit(() -> dropState(set)));
    }

    /** Background task of {@link #loadState}: replays, logs failures, always stops the consumer. */
    private void replayThenShutdown(DefaultLitePullConsumer consumer) {
        try {
            pullToLast(consumer);
        } catch (Throwable failure) {
            logger.error("pull to last error.", failure);
            throw new RuntimeException(failure);
        } finally {
            consumer.shutdown();
        }
    }

    /** Background task of {@link #removeState}. */
    private void dropState(Set<MessageQueue> sourceQueues) {
        try {
            // Re-checked here because the task runs later than the check in removeState.
            if (sourceQueues.isEmpty()) {
                return;
            }
            Set<MessageQueue> stateQueues = convertSourceTopicQueue2StateTopicQueue(sourceQueues);
            Set<String> queueKeys = stateQueues.stream()
                .map((MessageQueue stateQueue) -> buildKey(stateQueue))
                .collect(Collectors.toSet());
            for (String queueKey : queueKeys) {
                for (byte[] key : super.getAll(queueKey)) {
                    this.rocksDBStore.deleteByKey(key);
                }
                super.removeAll(queueKey);
            }
            for (MessageQueue stateQueue : stateQueues) {
                this.recoveringQueueMutex.remove(stateQueue);
            }
        } catch (Throwable failure) {
            logger.error("remove state error", failure);
            throw new RuntimeException(failure);
        }
    }

    /**
     * Waits up to {@value #ASYNC_WAIT_MILLIS} ms for a background task. A failure of the task
     * within that window propagates as the exception thrown by {@link Future#get(long, TimeUnit)}.
     */
    private void awaitBriefly(Future<?> task) throws Exception {
        try {
            task.get(ASYNC_WAIT_MILLIS, TimeUnit.MILLISECONDS);
        } catch (TimeoutException stillRunning) {
            // Deliberate: the task keeps running in the background; callers synchronise with a
            // replay through waitIfNotReady.
        } catch (InterruptedException interrupted) {
            // Keep returning normally as before, but do not lose the interruption.
            Thread.currentThread().interrupt();
        }
    }

    /**
     * Polls the consumer until a poll comes back empty, replaying the messages into RocksDB in
     * batches, then releases the recovery latch of every assigned queue.
     *
     * <p>NOTE(review): a single empty {@value #POLL_TIMEOUT_MILLIS} ms poll is treated as "end of
     * queue" - confirm that this cannot end the replay early against a slow broker.
     */
    private void pullToLast(DefaultLitePullConsumer consumer) throws Throwable {
        Set<MessageQueue> assignedQueues = consumer.assignment();
        for (MessageQueue stateQueue : assignedQueues) {
            // An existing latch is kept, so an already replayed queue stays "ready".
            this.recoveringQueueMutex.computeIfAbsent(stateQueue, ignored -> new CountDownLatch2(1));
        }

        List<MessageExt> batch = new ArrayList<>();
        List<MessageExt> polled = consumer.poll(POLL_TIMEOUT_MILLIS);
        while (polled != null && !polled.isEmpty()) {
            batch.addAll(polled);
            if (batch.size() > REPLAY_BATCH_SIZE) {
                replayState(batch);
                batch.clear();
            }
            polled = consumer.poll(POLL_TIMEOUT_MILLIS);
        }
        replayState(batch);

        for (MessageQueue stateQueue : assignedQueues) {
            CountDownLatch2 latch = this.recoveringQueueMutex.get(stateQueue);
            // removeState may have dropped the latch while the replay was still running.
            if (latch != null) {
                latch.countDown();
            }
        }
    }

    /**
     * Applies one batch of state messages: per state queue and per message key only the message
     * with the highest queue offset is considered, and it is written to RocksDB unless it is a
     * delete marker.
     *
     * <p>NOTE(review): a delete marker is only skipped, never applied. If the value it supersedes
     * was written by an earlier batch, that value stays in RocksDB - confirm this is intended.
     */
    private void replayState(List<MessageExt> messages) throws Throwable {
        if (messages == null || messages.isEmpty()) {
            return;
        }

        Map<String, List<MessageExt>> byQueue = messages.stream()
            .collect(Collectors.groupingBy((MessageExt message) -> buildKey(message)));
        for (List<MessageExt> queueMessages : byQueue.values()) {
            Map<String, List<MessageExt>> byMessageKey = queueMessages.stream()
                .collect(Collectors.groupingBy((MessageExt message) -> message.getKeys()));
            for (List<MessageExt> versions : byMessageKey.values()) {
                MessageExt latest = latestByQueueOffset(versions);
                if (Constant.TRUE.equals(latest.getUserProperty(Constant.EMPTY_BODY))) {
                    continue;
                }
                Pair<byte[], byte[]> entry = this.protocol.split(latest.getBody());
                byte[] key = entry.getKey();
                byte[] value = entry.getValue();
                MessageQueue stateQueue = new MessageQueue(latest.getTopic(), latest.getBrokerName(), latest.getQueueId());
                if (logger.isDebugEnabled()) {
                    logger.debug("recover state, key: {}, stateTopicQueue: {}", keyText(key), stateQueue);
                }
                super.putInRecover(buildKey(stateQueue), key);
                this.rocksDBStore.put(key, value);
            }
        }
    }

    /**
     * Returns the message with the highest queue offset; among equal offsets the last one in list
     * order wins, which is what a stable ascending sort followed by "take last" would yield.
     *
     * @param versions non-empty list of messages
     */
    private static MessageExt latestByQueueOffset(List<MessageExt> versions) {
        MessageExt latest = versions.get(0);
        for (MessageExt candidate : versions) {
            if (Long.compare(candidate.getQueueOffset(), latest.getQueueOffset()) >= 0) {
                latest = candidate;
            }
        }
        return latest;
    }

    /** Null-safe UTF-8 rendering of a key, used for log output only. */
    private static String keyText(byte[] key) {
        return key == null ? "null" : new String(key, StandardCharsets.UTF_8);
    }

    /**
     * Creates the state topic as a static compact topic unless it already exists. The queue count
     * and the clusters are taken from the route of the corresponding source topic.
     */
    private void createStateTopic(String stateTopic) throws Exception {
        if (RocketMQUtil.checkWhetherExist(stateTopic)) {
            return;
        }
        Pair<Integer, Set<String>> layout = getTotalQueueNumAndClusters(stateTopic2SourceTopic(stateTopic));
        int queueCount = layout.getKey();
        RocketMQUtil.createStaticCompactTopic(this.mqAdmin, stateTopic, queueCount, layout.getValue());
    }

    /**
     * @return the sum of the read-queue counts over all queue data of the topic's route, paired
     *         with the distinct cluster names of the route's brokers
     */
    private Pair<Integer, Set<String>> getTotalQueueNumAndClusters(String sourceTopic) throws Exception {
        TopicRouteData route = this.mqAdmin.examineTopicRouteInfo(sourceTopic);

        int totalQueues = 0;
        for (QueueData queueData : route.getQueueDatas()) {
            totalQueues += queueData.getReadQueueNums();
        }
        Set<String> clusters = route.getBrokerDatas().stream()
            .map(broker -> broker.getCluster())
            .collect(Collectors.toSet());
        return new Pair<>(totalQueues, clusters);
    }

    /** Closes the RocksDB store and shuts the internal executor down, even if closing fails. */
    @Override
    public void close() throws Exception {
        try {
            this.rocksDBStore.close();
        } finally {
            this.executor.shutdown();
        }
    }
}
