/*
 * Decompiled with CFR 0.152.
 */
package com.alipay.sofa.sofamq.org.shade.apache.rocketmq.client.impl.consumer;

import com.alipay.sofa.sofamq.org.shade.apache.rocketmq.client.consumer.AllocateMessageQueueStrategy;
import com.alipay.sofa.sofamq.org.shade.apache.rocketmq.client.consumer.MessageQueueListener;
import com.alipay.sofa.sofamq.org.shade.apache.rocketmq.client.consumer.store.ReadOffsetType;
import com.alipay.sofa.sofamq.org.shade.apache.rocketmq.client.exception.MQClientException;
import com.alipay.sofa.sofamq.org.shade.apache.rocketmq.client.impl.consumer.DefaultLitePullConsumerImpl;
import com.alipay.sofa.sofamq.org.shade.apache.rocketmq.client.impl.consumer.ProcessQueue;
import com.alipay.sofa.sofamq.org.shade.apache.rocketmq.client.impl.consumer.PullRequest;
import com.alipay.sofa.sofamq.org.shade.apache.rocketmq.client.impl.consumer.RebalanceImpl;
import com.alipay.sofa.sofamq.org.shade.apache.rocketmq.client.impl.factory.MQClientInstance;
import com.alipay.sofa.sofamq.org.shade.apache.rocketmq.common.UtilAll;
import com.alipay.sofa.sofamq.org.shade.apache.rocketmq.common.consumer.ConsumeFromWhere;
import com.alipay.sofa.sofamq.org.shade.apache.rocketmq.common.message.MessageQueue;
import com.alipay.sofa.sofamq.org.shade.apache.rocketmq.common.protocol.heartbeat.ConsumeType;
import com.alipay.sofa.sofamq.org.shade.apache.rocketmq.common.protocol.heartbeat.MessageModel;
import com.alipay.sofa.sofamq.org.shade.apache.rocketmq.common.utils.ThreadUtils;
import java.util.Collections;
import java.util.List;
import java.util.Set;
import java.util.concurrent.ExecutorService;

/**
 * {@link RebalanceImpl} specialization that works on behalf of a
 * {@link DefaultLitePullConsumerImpl}.
 *
 * <p>It forwards queue-assignment notifications to the listeners registered on the lite pull
 * consumer, persists and drops offsets of queues handed to
 * {@link #removeUnnecessaryMessageQueue(MessageQueue, ProcessQueue)}, and computes the offset
 * from which a queue should start being pulled.
 */
public class RebalanceLitePullImpl extends RebalanceImpl {
    /**
     * Topic-name prefix that gets special start-offset handling in
     * {@link #computePullFromWhere(MessageQueue)} (presumably the retry-topic prefix).
     */
    private static final String RETRY_TOPIC_PREFIX = "%RETRY%";

    /**
     * Executor shared by all instances on which per-topic listeners are notified, so their
     * callbacks do not run on the thread that calls {@link #messageQueueChanged}.
     * NOTE(review): the {@code true} argument presumably marks the worker thread as a daemon -
     * confirm against ThreadUtils.
     */
    private static final ExecutorService messageQueueListenersInTopicExecutor = ThreadUtils.newSingleThreadExecutor("Lite-Pull-Rebalance-Topic", true);

    /** Consumer implementation whose listeners and offset store this rebalancer uses. */
    private final DefaultLitePullConsumerImpl litePullConsumerImpl;

    /**
     * Creates an instance bound to the given consumer, passing {@code null} for the consumer
     * group, message model, allocation strategy and client factory.
     *
     * @param litePullConsumerImpl consumer implementation this rebalancer serves
     */
    public RebalanceLitePullImpl(DefaultLitePullConsumerImpl litePullConsumerImpl) {
        this(null, null, null, null, litePullConsumerImpl);
    }

    /**
     * Creates an instance bound to the given consumer; the first four arguments are handed
     * unchanged to the {@link RebalanceImpl} constructor.
     *
     * @param consumerGroup consumer group passed to the superclass
     * @param messageModel message model passed to the superclass
     * @param allocateMessageQueueStrategy allocation strategy passed to the superclass
     * @param mQClientFactory client instance passed to the superclass
     * @param litePullConsumerImpl consumer implementation this rebalancer serves
     */
    public RebalanceLitePullImpl(String consumerGroup, MessageModel messageModel, AllocateMessageQueueStrategy allocateMessageQueueStrategy, MQClientInstance mQClientFactory, DefaultLitePullConsumerImpl litePullConsumerImpl) {
        super(consumerGroup, messageModel, allocateMessageQueueStrategy, mQClientFactory);
        this.litePullConsumerImpl = litePullConsumerImpl;
    }

    /**
     * Notifies the consumer's listeners that the queues of {@code topic} changed.
     *
     * <p>The consumer-wide listener, if one is set, is called on the calling thread. Listeners
     * registered for the topic are called afterwards on the shared single-thread executor.
     * Anything thrown by a listener is logged and does not stop the remaining listeners.
     *
     * @param topic topic whose queues changed
     * @param mqAll queue set passed through to the listeners as their second argument
     * @param mqDivided queue set passed through to the listeners as their third argument
     */
    @Override
    public void messageQueueChanged(final String topic, final Set<MessageQueue> mqAll, final Set<MessageQueue> mqDivided) {
        MessageQueueListener messageQueueListener = this.litePullConsumerImpl.getDefaultLitePullConsumer().getMessageQueueListener();
        if (messageQueueListener != null) {
            try {
                messageQueueListener.messageQueueChanged(topic, mqAll, mqDivided);
            } catch (Throwable e) {
                log.error("messageQueueChanged exception", e);
            }
        }

        final List<MessageQueueListener> messageQueueListenersInTopic = this.litePullConsumerImpl.getDefaultLitePullConsumer().getMessageQueueListenerInTopic(topic);
        // isEmpty() covers every empty list, not just the Collections.EMPTY_LIST singleton,
        // so no do-nothing task is queued when there is nobody to notify.
        if (messageQueueListenersInTopic == null || messageQueueListenersInTopic.isEmpty()) {
            return;
        }

        messageQueueListenersInTopicExecutor.submit(new Runnable() {

            @Override
            public void run() {
                for (MessageQueueListener messageQueueListenerInTopic : messageQueueListenersInTopic) {
                    try {
                        messageQueueListenerInTopic.messageQueueChanged(topic, mqAll, mqDivided);
                    } catch (Throwable e) {
                        // Isolate failures so one listener cannot starve the others.
                        RebalanceImpl.log.error("messageQueueChanged in topic exception", e);
                    }
                }
            }
        });
    }

    /**
     * Persists the offset of {@code mq} and then removes it from the offset store.
     *
     * @param mq queue being given up
     * @param pq process queue of {@code mq}; not used by this implementation
     * @return always {@code true}
     */
    @Override
    public boolean removeUnnecessaryMessageQueue(MessageQueue mq, ProcessQueue pq) {
        // Persist first so the latest offset is saved before the entry is dropped.
        this.litePullConsumerImpl.getOffsetStore().persist(mq);
        this.litePullConsumerImpl.getOffsetStore().removeOffset(mq);
        return true;
    }

    /**
     * @return always {@link ConsumeType#CONSUME_ACTIVELY}
     */
    @Override
    public ConsumeType consumeType() {
        return ConsumeType.CONSUME_ACTIVELY;
    }

    /**
     * Removes the offset recorded for {@code mq} from the consumer's offset store.
     *
     * @param mq queue whose offset entry is dropped
     */
    @Override
    public void removeDirtyOffset(MessageQueue mq) {
        this.litePullConsumerImpl.getOffsetStore().removeOffset(mq);
    }

    /**
     * Computes the offset from which {@code mq} should be pulled, based on the consumer's
     * {@link ConsumeFromWhere} setting.
     *
     * <p>For the three handled settings, an offset {@code >= 0} read from the offset store is
     * returned as is. When the store yields exactly {@code -1}:
     * <ul>
     *   <li>{@code CONSUME_FROM_LAST_OFFSET}: {@code 0} for topics starting with
     *       {@code %RETRY%}, otherwise the queue's max offset;</li>
     *   <li>{@code CONSUME_FROM_FIRST_OFFSET}: {@code 0};</li>
     *   <li>{@code CONSUME_FROM_TIMESTAMP}: the queue's max offset for topics starting with
     *       {@code %RETRY%}, otherwise the offset found for the configured consume
     *       timestamp (format {@code yyyyMMddHHmmss}).</li>
     * </ul>
     * Any other stored value, any other setting, and any {@link MQClientException} raised by a
     * lookup yield {@code -1}; lookup failures are logged.
     *
     * @param mq queue to compute the start offset for
     * @return the start offset, or {@code -1} when none could be determined
     */
    @Override
    public long computePullFromWhere(MessageQueue mq) {
        ConsumeFromWhere consumeFromWhere = this.litePullConsumerImpl.getDefaultLitePullConsumer().getConsumeFromWhere();
        long result = -1L;
        switch (consumeFromWhere) {
            case CONSUME_FROM_LAST_OFFSET: {
                long lastOffset = this.readLitePullStoredOffset(mq);
                if (lastOffset >= 0L) {
                    result = lastOffset;
                } else if (-1L == lastOffset) {
                    if (mq.getTopic().startsWith(RETRY_TOPIC_PREFIX)) {
                        result = 0L;
                    } else {
                        result = this.maxOffsetOrNoOffset(mq);
                    }
                }
                break;
            }
            case CONSUME_FROM_FIRST_OFFSET: {
                long lastOffset = this.readLitePullStoredOffset(mq);
                if (lastOffset >= 0L) {
                    result = lastOffset;
                } else if (-1L == lastOffset) {
                    result = 0L;
                }
                break;
            }
            case CONSUME_FROM_TIMESTAMP: {
                long lastOffset = this.readLitePullStoredOffset(mq);
                if (lastOffset >= 0L) {
                    result = lastOffset;
                } else if (-1L == lastOffset) {
                    if (mq.getTopic().startsWith(RETRY_TOPIC_PREFIX)) {
                        result = this.maxOffsetOrNoOffset(mq);
                    } else {
                        try {
                            // NOTE(review): if UtilAll.parseDate can return null for a malformed
                            // timestamp, this line throws NullPointerException - confirm.
                            long timestamp = UtilAll.parseDate(this.litePullConsumerImpl.getDefaultLitePullConsumer().getConsumeTimestamp(), "yyyyMMddHHmmss").getTime();
                            result = this.mQClientFactory.getMQAdminImpl().searchOffset(mq, timestamp);
                        } catch (MQClientException e) {
                            log.error("computePullFromWhere searchOffset exception, mq=" + mq, e);
                            result = -1L;
                        }
                    }
                }
                break;
            }
            default:
                // Settings not handled above leave the result at -1 without touching the store.
                break;
        }
        return result;
    }

    /** Reads the offset of {@code mq} from the offset store using MEMORY_FIRST_THEN_STORE. */
    private long readLitePullStoredOffset(MessageQueue mq) {
        return this.litePullConsumerImpl.getOffsetStore().readOffset(mq, ReadOffsetType.MEMORY_FIRST_THEN_STORE);
    }

    /** Returns the max offset of {@code mq}, or {@code -1} (after logging) if the lookup fails. */
    private long maxOffsetOrNoOffset(MessageQueue mq) {
        try {
            return this.mQClientFactory.getMQAdminImpl().maxOffset(mq);
        } catch (MQClientException e) {
            log.error("computePullFromWhere maxOffset exception, mq=" + mq, e);
            return -1L;
        }
    }

    /**
     * Intentionally does nothing in this implementation.
     *
     * @param pullRequestList pull requests; ignored
     */
    @Override
    public void dispatchPullRequest(List<PullRequest> pullRequestList) {
    }

    /**
     * @return a new, empty {@link ProcessQueue}
     */
    @Override
    public ProcessQueue createProcessQueue() {
        return new ProcessQueue();
    }
}

