package com.aliyun.openservices.ons.shaded.org.apache.rocketmq.client.impl.consumer;

import apache.rocketmq.v1.AckMessageResponse;
import apache.rocketmq.v1.Broker;
import apache.rocketmq.v1.ConsumePolicy;
import apache.rocketmq.v1.FilterExpression;
import apache.rocketmq.v1.FilterType;
import apache.rocketmq.v1.ForwardMessageToDeadLetterQueueResponse;
import apache.rocketmq.v1.Partition;
import apache.rocketmq.v1.PullMessageRequest;
import apache.rocketmq.v1.QueryOffsetPolicy;
import apache.rocketmq.v1.QueryOffsetRequest;
import apache.rocketmq.v1.ReceiveMessageRequest;
import apache.rocketmq.v1.Resource;
import com.aliyun.openservices.ons.shaded.com.google.common.annotations.VisibleForTesting;
import com.aliyun.openservices.ons.shaded.com.google.common.base.Optional;
import com.aliyun.openservices.ons.shaded.com.google.common.base.Stopwatch;
import com.aliyun.openservices.ons.shaded.com.google.common.util.concurrent.FutureCallback;
import com.aliyun.openservices.ons.shaded.com.google.common.util.concurrent.Futures;
import com.aliyun.openservices.ons.shaded.com.google.common.util.concurrent.ListenableFuture;
import com.aliyun.openservices.ons.shaded.com.google.common.util.concurrent.RateLimiter;
import com.aliyun.openservices.ons.shaded.com.google.common.util.concurrent.SettableFuture;
import com.aliyun.openservices.ons.shaded.com.google.errorprone.annotations.concurrent.GuardedBy;
import com.aliyun.openservices.ons.shaded.com.google.protobuf.util.Durations;
import com.aliyun.openservices.ons.shaded.com.google.protobuf.util.Timestamps;
import com.aliyun.openservices.ons.shaded.com.google.rpc.Code;
import com.aliyun.openservices.ons.shaded.com.google.rpc.Status;
import com.aliyun.openservices.ons.shaded.org.apache.rocketmq.client.consumer.ConsumeStatus;
import com.aliyun.openservices.ons.shaded.org.apache.rocketmq.client.consumer.MessageModel;
import com.aliyun.openservices.ons.shaded.org.apache.rocketmq.client.consumer.PullMessageResult;
import com.aliyun.openservices.ons.shaded.org.apache.rocketmq.client.consumer.PullStatus;
import com.aliyun.openservices.ons.shaded.org.apache.rocketmq.client.consumer.ReceiveMessageResult;
import com.aliyun.openservices.ons.shaded.org.apache.rocketmq.client.consumer.ReceiveStatus;
import com.aliyun.openservices.ons.shaded.org.apache.rocketmq.client.consumer.filter.ExpressionType;
import com.aliyun.openservices.ons.shaded.org.apache.rocketmq.client.consumer.filter.FilterExpression;
import com.aliyun.openservices.ons.shaded.org.apache.rocketmq.client.consumer.listener.MessageListenerType;
import com.aliyun.openservices.ons.shaded.org.apache.rocketmq.client.message.MessageExt;
import com.aliyun.openservices.ons.shaded.org.apache.rocketmq.client.message.MessageHookPoint;
import com.aliyun.openservices.ons.shaded.org.apache.rocketmq.client.message.MessageHookPointStatus;
import com.aliyun.openservices.ons.shaded.org.apache.rocketmq.client.message.MessageImplAccessor;
import com.aliyun.openservices.ons.shaded.org.apache.rocketmq.client.message.MessageInterceptor;
import com.aliyun.openservices.ons.shaded.org.apache.rocketmq.client.message.MessageInterceptorContext;
import com.aliyun.openservices.ons.shaded.org.apache.rocketmq.client.message.MessageQueue;
import com.aliyun.openservices.ons.shaded.org.apache.rocketmq.client.route.Endpoints;
import com.aliyun.openservices.ons.shaded.org.apache.rocketmq.utility.SimpleFuture;
import com.aliyun.openservices.ons.shaded.org.slf4j.Logger;
import com.aliyun.openservices.ons.shaded.org.slf4j.LoggerFactory;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.locks.ReadWriteLock;
import java.util.concurrent.locks.ReentrantReadWriteLock;

/* loaded from: input_file:com/aliyun/openservices/ons/shaded/org/apache/rocketmq/client/impl/consumer/ProcessQueueImpl.class */
public class ProcessQueueImpl implements ProcessQueue {
    // Presumably the long-polling timeout of a receive request; no use is visible in this part of the file.
    public static final long RECEIVE_LONG_POLLING_TIMEOUT_MILLIS = 30000;
    // Same value as the 1000 ms delay literal used by receiveMessageLater().
    public static final long RECEIVE_LATER_DELAY_MILLIS = 1000;
    // Same value as the 30000 ms timeout literal passed to consumerImpl.pullMessage(...).
    public static final long PULL_LONG_POLLING_TIMEOUT_MILLIS = 30000;
    // Same value as the 1000 ms delay literal used by pullMessageLater(long).
    public static final long PULL_LATER_DELAY_MILLIS = 1000;
    // NOTE(review): presumably a retry delay for acknowledging fifo messages; usage is not visible here - confirm.
    public static final long ACK_FIFO_MESSAGE_DELAY_MILLIS = 100;
    // NOTE(review): presumably a retry delay for forwarding fifo messages to the DLQ; usage is not visible here - confirm.
    public static final long FORWARD_FIFO_MESSAGE_TO_DLQ_DELAY_MILLIS = 100;
    // Message queue this process queue is bound to (set once by the constructor).
    private final MessageQueue mq;
    // Filter expression handed to the constructor; no reads are visible in this part of the file.
    private final FilterExpression filterExpression;
    // Owning push consumer; supplies configuration, executors and the remote calls used below.
    private final PushConsumerImpl consumerImpl;
    // Idle threshold checked by expired(): twice the larger of the two 30000 ms operands, i.e. 60000 ms.
    public static final long MAX_IDLE_MILLIS = 2 * Math.max(30000L, 30000L);
    private static final Logger log = LoggerFactory.getLogger((Class<?>) ProcessQueueImpl.class);
    // System.nanoTime() of the latest pull request issued by pullMessageImmediately(long); read by expired().
    private volatile long activityNanoTime = System.nanoTime();
    // System.nanoTime() of the latest isCacheFull() call that returned true; Long.MIN_VALUE until that first happens.
    private volatile long cacheFullNanoTime = Long.MIN_VALUE;
    // Set by drop(); receiveMessage() stops issuing receive requests once it is true.
    private volatile boolean dropped = false;

    // Messages cached by cacheMessages(...) that have not been taken for consumption yet.
    @GuardedBy("pendingMessagesLock")
    private final List<MessageExt> pendingMessages = new ArrayList();
    private final ReadWriteLock pendingMessagesLock = new ReentrantReadWriteLock();

    // Messages moved out of pendingMessages by tryTakeMessages/tryTakeFifoMessage and not erased yet.
    @GuardedBy("inflightMessagesLock")
    private final List<MessageExt> inflightMessages = new ArrayList();
    private final ReadWriteLock inflightMessagesLock = new ReentrantReadWriteLock();
    // Sum of body lengths: increased in cacheMessages(...), decreased when an inflight message is erased.
    private final AtomicLong cachedMessagesBytes = new AtomicLong(0);
    // Flag flipped by fifoConsumptionInbound()/fifoConsumptionOutbound() via compare-and-set.
    private final AtomicBoolean fifoConsumptionOccupied = new AtomicBoolean(false);
    // Receives the offsets of cached messages and loses them on erase, when the consumer records offsets.
    private final NextOffsetRecord nextOffsetRecord = new NextOffsetRecord();

    /**
     * Creates the process queue of one message queue on behalf of a push consumer.
     *
     * @param pushConsumerImpl consumer that owns this process queue.
     * @param messageQueue     message queue this process queue is bound to.
     * @param filterExpression filter expression kept for this queue.
     */
    public ProcessQueueImpl(PushConsumerImpl pushConsumerImpl, MessageQueue messageQueue, FilterExpression filterExpression) {
        this.mq = messageQueue;
        this.filterExpression = filterExpression;
        this.consumerImpl = pushConsumerImpl;
    }

    /**
     * Flags this process queue as dropped. The flag is never reset by this method and is consulted by
     * {@link #receiveMessage()} before a new receive request is issued.
     */
    @Override
    public void drop() {
        dropped = true;
    }

    /**
     * Tries to occupy the fifo consumption flag of this queue.
     *
     * @return {@code true} if the flag was free and has just been set by this call, {@code false} if it
     *         was already set.
     */
    private boolean fifoConsumptionInbound() {
        final boolean occupiedByThisCall = fifoConsumptionOccupied.compareAndSet(false, true);
        return occupiedByThisCall;
    }

    /**
     * Frees the fifo consumption flag of this queue; does nothing if the flag is not currently set.
     */
    /* JADX INFO: Access modifiers changed from: private */
    public void fifoConsumptionOutbound() {
        fifoConsumptionOccupied.compareAndSet(true, false);
    }

    /**
     * Adds freshly fetched messages to the pending list and disposes of corrupted ones.
     *
     * <p>An intact message is appended to the pending list, its body length is added to the cached
     * bytes counter and its queue offset is collected. A corrupted message is never cached: in
     * broadcasting mode it is only logged, otherwise it is nacked (concurrent listener) and/or
     * forwarded to the dead letter queue (orderly listener). The collected offsets are handed to the
     * offset record when the consumer records offsets. All of this happens under the pending write lock.
     *
     * @param list messages to cache.
     */
    @VisibleForTesting
    public void cacheMessages(List<MessageExt> list) {
        final List<Long> cachedOffsets = new ArrayList<Long>();
        final MessageListenerType listenerType = consumerImpl.getMessageListener().getListenerType();
        final MessageModel messageModel = consumerImpl.getMessageModel();
        final String namespace = consumerImpl.getNamespace();
        pendingMessagesLock.writeLock().lock();
        try {
            for (MessageExt message : list) {
                final boolean corrupted = MessageImplAccessor.getMessageImpl(message).isCorrupted();
                if (!corrupted) {
                    pendingMessages.add(message);
                    cachedMessagesBytes.addAndGet(message.getBody().length);
                    cachedOffsets.add(Long.valueOf(message.getQueueOffset()));
                    continue;
                }
                if (MessageModel.BROADCASTING.equals(messageModel)) {
                    log.error("Message is corrupted, ignore it in broadcasting mode, namespace={}, mq={}, messageId={}, clientId={}", namespace, mq, message.getMsgId(), consumerImpl.getId());
                    continue;
                }
                // The two listener checks are independent of each other, exactly one is expected to match.
                if (MessageListenerType.CONCURRENTLY.equals(listenerType)) {
                    log.error("Message is corrupted, nack it for concurrently consumption, namespace={}, mq={}, messageId={}, clientId={}", namespace, mq, message.getMsgId(), consumerImpl.getId());
                    consumerImpl.nackMessage(message);
                }
                if (MessageListenerType.ORDERLY.equals(listenerType)) {
                    log.error("Message is corrupted, forward it to DLQ for fifo consumption, namespace={}, mq={}, messageId={}, clientId={}", namespace, mq, message.getMsgId(), consumerImpl.getId());
                    forwardToDeadLetterQueue(message);
                }
            }
            if (consumerImpl.isOffsetRecorded()) {
                nextOffsetRecord.add(cachedOffsets);
            }
        } finally {
            pendingMessagesLock.writeLock().unlock();
        }
    }

    /**
     * Moves up to {@code i} messages from the head of the pending list to the inflight list.
     *
     * <p>Without a rate limiter for the topic the whole head batch is taken at once. With a rate
     * limiter, messages are taken one by one for as long as a permit can be acquired without waiting.
     *
     * <p>Both write locks are released exactly once, by the {@code finally} block. Releasing them
     * inline before returning as well would make the second {@code unlock()} fail with
     * {@link IllegalMonitorStateException}, because the thread no longer holds the lock.
     *
     * @param i maximum number of messages to take; values below one yield an empty list.
     * @return the messages that were moved to the inflight list, in pending order; never {@code null}.
     */
    @Override
    public List<MessageExt> tryTakeMessages(int i) {
        pendingMessagesLock.writeLock().lock();
        inflightMessagesLock.writeLock().lock();
        try {
            final List<MessageExt> taken = new ArrayList<MessageExt>();
            final RateLimiter rateLimiter = consumerImpl.rateLimiter(mq.getTopic());
            if (null == rateLimiter) {
                final int batchSize = Math.max(0, Math.min(pendingMessages.size(), i));
                // View over the head of the pending list: clearing it removes exactly the taken prefix,
                // whereas an equality-based removeAll could also drop equal messages further down.
                final List<MessageExt> head = pendingMessages.subList(0, batchSize);
                taken.addAll(head);
                inflightMessages.addAll(head);
                head.clear();
                return taken;
            }
            while (!pendingMessages.isEmpty() && taken.size() < i && rateLimiter.tryAcquire()) {
                final MessageExt message = pendingMessages.remove(0);
                taken.add(message);
                inflightMessages.add(message);
            }
            return taken;
        } finally {
            inflightMessagesLock.writeLock().unlock();
            pendingMessagesLock.writeLock().unlock();
        }
    }

    /**
     * Erases a single message by delegating to {@code eraseMessages(List)} with a one-element list.
     *
     * @param messageExt message to erase.
     */
    /* JADX INFO: Access modifiers changed from: private */
    public void eraseMessage(MessageExt messageExt) {
        final List<MessageExt> single = new ArrayList<MessageExt>(1);
        single.add(messageExt);
        eraseMessages(single);
    }

    /**
     * Tries to take the head pending message for fifo consumption.
     *
     * <p>Nothing is taken when the pending list is empty, when the fifo consumption flag is already
     * occupied, or when a rate limiter exists for the topic and refuses a permit (the flag claimed by
     * this call is released again in that case). Otherwise the head message moves from the pending list
     * to the inflight list and the fifo flag stays occupied.
     *
     * @return the taken message, or an absent value if nothing was taken.
     */
    @Override
    public Optional<MessageExt> tryTakeFifoMessage() {
        pendingMessagesLock.writeLock().lock();
        inflightMessagesLock.writeLock().lock();
        try {
            if (pendingMessages.isEmpty()) {
                return Optional.absent();
            }
            if (!fifoConsumptionInbound()) {
                log.debug("Fifo consumption task are not finished, namespace={}, mq={}, clientId={}", consumerImpl.getNamespace(), mq, consumerImpl.getId());
                return Optional.absent();
            }
            final RateLimiter rateLimiter = consumerImpl.rateLimiter(mq.getTopic());
            if (null != rateLimiter && !rateLimiter.tryAcquire()) {
                // No permit available: give the fifo flag back so that a later attempt can proceed.
                fifoConsumptionOutbound();
                return Optional.absent();
            }
            final MessageExt head = pendingMessages.remove(0);
            inflightMessages.add(head);
            return Optional.of(head);
        } finally {
            inflightMessagesLock.writeLock().unlock();
            pendingMessagesLock.writeLock().unlock();
        }
    }

    /**
     * Logs the current number of pending and inflight messages and the cached bytes counter.
     * The list sizes are read while holding both read locks.
     */
    @Override
    public void doStats() {
        pendingMessagesLock.readLock().lock();
        inflightMessagesLock.readLock().lock();
        try {
            final int pendingQuantity = pendingMessages.size();
            final int inflightQuantity = inflightMessages.size();
            final long cachedBytes = cachedMessagesBytes.get();
            log.info("clientId={}, namespace={}, mq={}, pendingMessageQuantity={}, inflightMessageQuantity={}, cachedMessagesBytes={}", consumerImpl.getId(), consumerImpl.getNamespace(), mq, pendingQuantity, inflightQuantity, cachedBytes);
        } finally {
            inflightMessagesLock.readLock().unlock();
            pendingMessagesLock.readLock().unlock();
        }
    }

    /**
     * Handles the consumption result of a fifo message.
     *
     * <ul>
     *   <li>Broadcasting mode: the message is erased and the fifo flag released right away.</li>
     *   <li>Status {@code ERROR} with delivery attempts left: the attempt counter is incremented and
     *       the message is handed to the consume service again after the configured suspend time;
     *       the outcome of that redelivery re-enters this method.</li>
     *   <li>Anything else: the message is acknowledged (status {@code OK}) or forwarded to the dead
     *       letter queue; once that future completes the message is erased, the fifo flag released
     *       and the consume service signalled.</li>
     * </ul>
     *
     * @param messageExt    message whose consumption finished.
     * @param consumeStatus result reported for the message.
     */
    @Override
    public void eraseFifoMessage(final MessageExt messageExt, ConsumeStatus consumeStatus) {
        statsConsumptionStatus(consumeStatus);
        if (MessageModel.BROADCASTING.equals(consumerImpl.getMessageModel())) {
            eraseMessage(messageExt);
            fifoConsumptionOutbound();
            return;
        }
        final int maxDeliveryAttempts = consumerImpl.getMaxDeliveryAttempts();
        final int attempt = messageExt.getDeliveryAttempt();
        final ConsumeService consumeService = consumerImpl.getConsumeService();
        final String namespace = consumerImpl.getNamespace();

        final boolean redeliver = ConsumeStatus.ERROR.equals(consumeStatus) && attempt < maxDeliveryAttempts;
        if (redeliver) {
            MessageImplAccessor.getMessageImpl(messageExt).getSystemAttribute().setDeliveryAttempt(1 + attempt);
            final long suspendMillis = consumerImpl.getFifoConsumptionSuspendTimeMillis();
            log.debug("Prepare to redeliver the fifo message because of consumption failure, maxAttempt={}, attempt={}, namespace={}, mq={}, messageId={}, suspendTime={}ms, clientId={}", maxDeliveryAttempts, messageExt.getDeliveryAttempt(), namespace, mq, messageExt.getMsgId(), suspendMillis, consumerImpl.getId());
            Futures.addCallback(consumeService.consume(messageExt, suspendMillis, TimeUnit.MILLISECONDS), new FutureCallback<ConsumeStatus>() {
                @Override
                public void onSuccess(ConsumeStatus redeliveryStatus) {
                    eraseFifoMessage(messageExt, redeliveryStatus);
                }

                @Override
                public void onFailure(Throwable t) {
                    log.error("[Bug] Exception raised while fifo message redelivery, namespace={}, mq={}, messageId={}, attempt={}, maxAttempts={}, clientId={}", namespace, mq, messageExt.getMsgId(), messageExt.getDeliveryAttempt(), maxDeliveryAttempts, consumerImpl.getId(), t);
                }
            });
            return;
        }

        final boolean consumedOk = ConsumeStatus.OK.equals(consumeStatus);
        if (!consumedOk) {
            log.info("Failed to consume fifo message finally, run out of attempt times, maxAttempts={}, attempt={}, namespace={}, mq={}, messageId={}, clientId={}", maxDeliveryAttempts, attempt, namespace, mq, messageExt.getMsgId(), consumerImpl.getId());
        }
        // Erase the message and free the fifo slot only after the ack/forward future has completed.
        (consumedOk ? ackFifoMessage(messageExt) : forwardToDeadLetterQueue(messageExt)).addListener(new Runnable() {
            @Override
            public void run() {
                eraseMessage(messageExt);
                fifoConsumptionOutbound();
                consumeService.signal();
            }
        }, consumerImpl.getConsumptionExecutor());
    }

    /**
     * Removes the given messages from the inflight list under the inflight write lock.
     *
     * <p>For every message that was actually inflight, its body length is subtracted from the cached
     * bytes counter. When the consumer records offsets, the offsets of all given messages are removed
     * from the offset record and, if the record yields a next offset, that offset is stored via the
     * consumer.
     *
     * @param list messages to erase.
     */
    private void eraseMessages(List<MessageExt> list) {
        final List<Long> erasedOffsets = new ArrayList<Long>();
        inflightMessagesLock.writeLock().lock();
        try {
            for (MessageExt message : list) {
                final boolean wasInflight = inflightMessages.remove(message);
                if (wasInflight) {
                    cachedMessagesBytes.addAndGet(-message.getBody().length);
                }
                erasedOffsets.add(Long.valueOf(message.getQueueOffset()));
            }
            if (!consumerImpl.isOffsetRecorded()) {
                return;
            }
            nextOffsetRecord.remove(erasedOffsets);
            final Optional<Long> nextOffset = nextOffsetRecord.next();
            if (nextOffset.isPresent()) {
                consumerImpl.updateOffset(mq, nextOffset.get().longValue());
            }
        } finally {
            inflightMessagesLock.writeLock().unlock();
        }
    }

    /**
     * Records the consumption result of a batch, erases the batch and, in clustering mode, reports
     * the result: every message is acknowledged when the status is {@code OK}, otherwise every message
     * is nacked (with an error log for those that have used up their delivery attempts).
     *
     * @param list          messages whose consumption finished.
     * @param consumeStatus result reported for the whole batch.
     */
    @Override
    public void eraseMessages(List<MessageExt> list, ConsumeStatus consumeStatus) {
        statsConsumptionStatus(list.size(), consumeStatus);
        eraseMessages(list);
        if (!MessageModel.CLUSTERING.equals(consumerImpl.getMessageModel())) {
            return;
        }
        if (ConsumeStatus.OK.equals(consumeStatus)) {
            for (MessageExt consumed : list) {
                ackMessage(consumed);
            }
            return;
        }
        for (MessageExt failed : list) {
            final int maxDeliveryAttempts = consumerImpl.getMaxDeliveryAttempts();
            final int attempt = failed.getDeliveryAttempt();
            if (attempt >= maxDeliveryAttempts) {
                log.error("Failed to consume message finally, run out of attempt times, maxAttempts={}, attempt={}, namespace={}, mq={}, messageId={}, clientId={}", maxDeliveryAttempts, attempt, consumerImpl.getNamespace(), mq, failed.getMsgId(), consumerImpl.getId());
            }
            consumerImpl.nackMessage(failed);
        }
    }

    /**
     * Reacts to the result of a receive request.
     *
     * <p>On {@code OK} any received messages are cached, counted and signalled to the consume service,
     * and the next receive is triggered immediately. Every other status (DEADLINE_EXCEEDED,
     * RESOURCE_EXHAUSTED, NOT_FOUND, DATA_CORRUPTED, INTERNAL or anything else) is logged as an error
     * and the next receive is scheduled for later.
     *
     * @param receiveMessageResult result of the receive request.
     */
    /* JADX INFO: Access modifiers changed from: private */
    public void onReceiveMessageResult(ReceiveMessageResult receiveMessageResult) {
        final ReceiveStatus status = receiveMessageResult.getReceiveStatus();
        final List<MessageExt> received = receiveMessageResult.getMessagesFound();
        final Endpoints endpoints = receiveMessageResult.getEndpoints();
        switch (status) {
            case OK: {
                if (!received.isEmpty()) {
                    cacheMessages(received);
                    consumerImpl.getReceivedMessagesQuantity().getAndAdd(received.size());
                    consumerImpl.getConsumeService().signal();
                }
                log.debug("Receive message with OK, namespace={}, mq={}, endpoints={}, messages found count={}, clientId={}", consumerImpl.getNamespace(), mq, endpoints, received.size(), consumerImpl.getId());
                receiveMessage();
                break;
            }
            default: {
                log.error("Receive message with status={}, namespace={}, mq={}, endpoints={}, messages found count={}, clientId={}", status, consumerImpl.getNamespace(), mq, endpoints, received.size(), consumerImpl.getId());
                receiveMessageLater();
                break;
            }
        }
    }

    /**
     * Reacts to the result of a pull request.
     *
     * <p>On {@code OK} any pulled messages are cached, counted and signalled to the consume service,
     * and the next pull starts immediately from the result's next begin offset. Every other status
     * (DEADLINE_EXCEEDED, RESOURCE_EXHAUSTED, NOT_FOUND, OUT_OF_RANGE, INTERNAL or anything else) is
     * logged as an error and the pull from the next begin offset is scheduled for later.
     *
     * @param pullMessageResult result of the pull request.
     */
    /* JADX INFO: Access modifiers changed from: private */
    public void onPullMessageResult(PullMessageResult pullMessageResult) {
        final PullStatus status = pullMessageResult.getPullStatus();
        final List<MessageExt> pulled = pullMessageResult.getMessagesFound();
        switch (status) {
            case OK: {
                if (!pulled.isEmpty()) {
                    cacheMessages(pulled);
                    consumerImpl.getPulledMessagesQuantity().getAndAdd(pulled.size());
                    consumerImpl.getConsumeService().signal();
                }
                log.debug("Pull message with OK, namespace={}, mq={}, messages found count={}, clientId={}", consumerImpl.getNamespace(), mq, pulled.size(), consumerImpl.getId());
                pullMessage(pullMessageResult.getNextBeginOffset());
                break;
            }
            default: {
                log.error("Pull message with status={}, namespace={}, mq={}, messages found count={}, clientId={}", status, consumerImpl.getNamespace(), mq, pulled.size(), consumerImpl.getId());
                pullMessageLater(pullMessageResult.getNextBeginOffset());
                break;
            }
        }
    }

    /**
     * Issues the next receive request unless this queue has been dropped. When the cache is full the
     * request is postponed via {@link #receiveMessageLater()} instead of being sent immediately.
     */
    @VisibleForTesting
    public void receiveMessage() {
        if (dropped) {
            log.debug("Process queue has been dropped, no longer receive message, namespace={}, mq={}, clientId={}", consumerImpl.getNamespace(), mq, consumerImpl.getId());
            return;
        }
        if (isCacheFull()) {
            log.warn("Process queue cache is full, would receive message later, namespace={}, mq={}, clientId={}", consumerImpl.getNamespace(), mq, consumerImpl.getId());
            receiveMessageLater();
            return;
        }
        receiveMessageImmediately();
    }

    /**
     * Schedules {@link #receiveMessage()} on the consumer's scheduler after a delay of 1000 ms
     * (the value of {@code RECEIVE_LATER_DELAY_MILLIS}).
     *
     * <p>If scheduling fails while the scheduler is shut down the failure is ignored; otherwise it is
     * logged and scheduling is attempted again.
     */
    public void receiveMessageLater() {
        final ScheduledExecutorService scheduler = consumerImpl.getScheduler();
        try {
            final Runnable receiveTask = new Runnable() {
                @Override
                public void run() {
                    receiveMessage();
                }
            };
            scheduler.schedule(receiveTask, 1000L, TimeUnit.MILLISECONDS);
        } catch (Throwable t) {
            if (!scheduler.isShutdown()) {
                log.error("[Bug] Failed to schedule receive message request, namespace={}, mq={}, clientId={}", consumerImpl.getNamespace(), mq, consumerImpl.getId(), t);
                receiveMessageLater();
            }
        }
    }

    /**
     * Checks the cached message quantity and then the cached bytes against the consumer's per-queue
     * thresholds. Whenever a threshold is reached a warning is logged and the cache-full timestamp is
     * refreshed.
     *
     * @return {@code true} if the quantity or the bytes threshold is reached, {@code false} otherwise.
     */
    public boolean isCacheFull() {
        final long quantity = cachedMessagesQuantity();
        final int quantityThreshold = consumerImpl.cachedMessagesQuantityThresholdPerQueue();
        if (quantity >= quantityThreshold) {
            log.warn("Process queue total cached messages quantity exceeds the threshold, threshold={}, actual={}, namespace={}, mq={}, clientId={}", quantityThreshold, quantity, consumerImpl.getNamespace(), mq, consumerImpl.getId());
            cacheFullNanoTime = System.nanoTime();
            return true;
        }
        final int bytesThreshold = consumerImpl.cachedMessagesBytesThresholdPerQueue();
        final long bytes = cachedMessageBytes();
        if (bytes >= bytesThreshold) {
            log.warn("Process queue total cached messages memory exceeds the threshold, threshold={} bytes, actual={} bytes, namespace={}, mq={}, clientId={}", bytesThreshold, bytes, consumerImpl.getNamespace(), mq, consumerImpl.getId());
            cacheFullNanoTime = System.nanoTime();
            return true;
        }
        return false;
    }

    /**
     * Tells whether this process queue has been idle for too long.
     *
     * <p>The queue is expired when the latest recorded request activity is at least
     * {@code MAX_IDLE_MILLIS} old and the cache has not been reported full within the same period.
     *
     * <p>{@code cacheFullNanoTime} holds {@code Long.MIN_VALUE} until the cache is first found full.
     * Subtracting that sentinel from {@link System#nanoTime()} overflows, and for a non-negative
     * {@code nanoTime} yields a negative "idle time", which would keep a never-full queue from ever
     * expiring. The sentinel is therefore handled explicitly and treated as "idle forever".
     *
     * @return {@code true} if the queue is considered idle, {@code false} otherwise.
     */
    @Override
    public boolean expired() {
        final long receptionIdleMillis = TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - activityNanoTime);
        if (receptionIdleMillis < MAX_IDLE_MILLIS) {
            return false;
        }
        // Read the volatile field once so that the sentinel check and the subtraction see the same value.
        final long cacheFullAt = cacheFullNanoTime;
        final boolean cacheEverFull = Long.MIN_VALUE != cacheFullAt;
        final long cacheFullIdleMillis = cacheEverFull
            ? TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - cacheFullAt)
            : Long.MAX_VALUE;
        if (cacheFullIdleMillis < MAX_IDLE_MILLIS) {
            return false;
        }
        log.warn("Process queue is idle, reception idle time={}ms, cache full idle time={}ms, namespace={}, mq={}, clientId={}", receptionIdleMillis, cacheFullIdleMillis, consumerImpl.getNamespace(), mq, consumerImpl.getId());
        return true;
    }

    /**
     * Asks the broker of this queue for the offset to start consuming from.
     *
     * <p>The query policy follows the consumer's consume-from-where setting: {@code TIME_POINT} with
     * the configured timestamp for CONSUME_FROM_TIMESTAMP, {@code BEGINNING} for
     * CONSUME_FROM_FIRST_OFFSET and {@code END} for anything else.
     *
     * @return future of the queried offset, as returned by the consumer.
     */
    private ListenableFuture<Long> queryOffset() {
        final QueryOffsetRequest.Builder requestBuilder = QueryOffsetRequest.newBuilder();
        switch (consumerImpl.getConsumeFromWhere()) {
            case CONSUME_FROM_FIRST_OFFSET:
                requestBuilder.setPolicy(QueryOffsetPolicy.BEGINNING);
                break;
            case CONSUME_FROM_TIMESTAMP:
                requestBuilder.setPolicy(QueryOffsetPolicy.TIME_POINT);
                requestBuilder.setTimePoint(Timestamps.fromMillis(consumerImpl.getConsumeFromTimeMillis()));
                break;
            default:
                requestBuilder.setPolicy(QueryOffsetPolicy.END);
                break;
        }
        requestBuilder.setPartition(getPbPartition());
        final QueryOffsetRequest request = requestBuilder.build();
        final Endpoints endpoints = mq.getPartition().getBroker().getEndpoints();
        return consumerImpl.queryOffset(request, endpoints);
    }

    /**
     * Starts pulling for this queue from the proper offset.
     *
     * <p>When the consumer records offsets and the offset store holds one for this queue, pulling
     * starts from it; any exception raised on that path is logged and the process queue is dropped.
     * Otherwise the offset is queried remotely: on success it is stored (if offsets are recorded) and
     * pulling starts from it, on failure the process queue is dropped.
     */
    private void pullMessageImmediately() {
        if (consumerImpl.isOffsetRecorded()) {
            try {
                final Optional<Long> storedOffset = consumerImpl.readOffset(mq);
                if (storedOffset.isPresent()) {
                    pullMessage(storedOffset.get().longValue());
                    return;
                }
            } catch (Throwable t) {
                log.error("Exception raised while reading offset from offset store, drop message queue, namespace={}, mq={}, clientId={}", consumerImpl.getNamespace(), mq, consumerImpl.getId(), t);
                consumerImpl.dropProcessQueue(mq);
                return;
            }
        }
        log.info("Offset not found, try to query offset from remote, namespace={}, mq={}, clientId={}", consumerImpl.getNamespace(), mq, consumerImpl.getId());
        ListenableFuture<Long> offsetFuture;
        try {
            offsetFuture = queryOffset();
        } catch (Throwable t) {
            // Turn a synchronous failure into a failed future so that the callback below handles it too.
            final SettableFuture<Long> failedFuture = SettableFuture.create();
            failedFuture.setException(t);
            offsetFuture = failedFuture;
        }
        final Endpoints endpoints = mq.getPartition().getBroker().getEndpoints();
        Futures.addCallback(offsetFuture, new FutureCallback<Long>() {
            @Override
            public void onSuccess(Long offset) {
                log.info("Query offset successfully, namespace={}, mq={}, endpoints={}, offset={}, clientId={}", consumerImpl.getNamespace(), mq, endpoints, offset, consumerImpl.getId());
                if (consumerImpl.isOffsetRecorded()) {
                    consumerImpl.updateOffset(mq, offset.longValue());
                }
                pullMessage(offset.longValue());
            }

            @Override
            public void onFailure(Throwable t) {
                log.error("Exception raised while querying offset to pull, drop message queue, namespace={}, mq={}, endpoints={}, clientId={}", consumerImpl.getNamespace(), mq, endpoints, consumerImpl.getId(), t);
                consumerImpl.dropProcessQueue(mq);
            }
        });
    }

    /**
     * Sends a pull request for the given offset, unless the consumer is not running.
     *
     * <p>The activity timestamp is refreshed, the PRE_PULL hook is invoked, and the request is sent
     * with a 30000 ms timeout (the value of {@code PULL_LONG_POLLING_TIMEOUT_MILLIS}). When the
     * response arrives the POST_PULL hook is invoked (once without a message if nothing was found,
     * otherwise once per message) and the result is handed to {@code onPullMessageResult}. Any failure,
     * whether raised while sending, reported by the future or thrown while handling the result, is
     * logged and the pull for the same offset is scheduled for later.
     *
     * @param j offset to pull from.
     */
    private void pullMessageImmediately(final long j) {
        if (!consumerImpl.isRunning()) {
            log.info("Stop to pull message because consumer is not running, namespace={}, mq={}, clientId={}", consumerImpl.getNamespace(), mq, consumerImpl.getId());
            return;
        }
        try {
            final Endpoints endpoints = mq.getPartition().getBroker().getEndpoints();
            final PullMessageRequest request = wrapPullMessageRequest(j);
            activityNanoTime = System.nanoTime();
            final MessageInterceptorContext preContext = MessageInterceptorContext.builder()
                .setBatchSize(request.getBatchSize())
                .setTopic(mq.getTopic())
                .build();
            consumerImpl.intercept(MessageHookPoint.PRE_PULL, preContext);
            final Stopwatch stopwatch = Stopwatch.createStarted();
            Futures.addCallback(consumerImpl.pullMessage(request, endpoints, 30000L), new FutureCallback<PullMessageResult>() {
                @Override
                public void onSuccess(PullMessageResult pullMessageResult) {
                    final long duration = stopwatch.elapsed(MessageInterceptor.DEFAULT_TIME_UNIT);
                    final List<MessageExt> pulled = pullMessageResult.getMessagesFound();
                    final MessageHookPointStatus hookStatus;
                    if (PullStatus.OK.equals(pullMessageResult.getPullStatus())) {
                        hookStatus = MessageHookPointStatus.OK;
                    } else {
                        hookStatus = MessageHookPointStatus.ERROR;
                    }
                    final MessageInterceptorContext postContext = preContext.toBuilder()
                        .setDuration(duration)
                        .setStatus(hookStatus)
                        .build();
                    if (pulled.isEmpty()) {
                        consumerImpl.intercept(MessageHookPoint.POST_PULL, postContext);
                    }
                    for (MessageExt message : pulled) {
                        consumerImpl.intercept(MessageHookPoint.POST_PULL, message, postContext);
                    }
                    try {
                        onPullMessageResult(pullMessageResult);
                    } catch (Throwable t) {
                        log.error("[Bug] Exception raised while handling pull result, would pull later, namespace={} mq={}, endpoints={}, clientId={}", consumerImpl.getNamespace(), mq, endpoints, consumerImpl.getId(), t);
                        pullMessageLater(j);
                    }
                }

                @Override
                public void onFailure(Throwable t) {
                    final long duration = stopwatch.elapsed(MessageInterceptor.DEFAULT_TIME_UNIT);
                    final MessageInterceptorContext postContext = preContext.toBuilder()
                        .setDuration(duration)
                        .setStatus(MessageHookPointStatus.ERROR)
                        .setThrowable(t)
                        .build();
                    consumerImpl.intercept(MessageHookPoint.POST_PULL, postContext);
                    log.error("Exception raised while pull message, would pull later, namespace={}, mq={}, endpoints={}, clientId={}", consumerImpl.getNamespace(), mq, endpoints, consumerImpl.getId(), t);
                    pullMessageLater(j);
                }
            });
            consumerImpl.getPullTimes().getAndIncrement();
        } catch (Throwable t) {
            log.error("Exception raised while pull message, would pull message later, namespace={}, mq={}, clientId={}", consumerImpl.getNamespace(), mq, consumerImpl.getId(), t);
            pullMessageLater(j);
        }
    }

    /**
     * Schedules a deferred call to {@link #pullMessage(long)} on the consumer's scheduler, one second from now.
     *
     * <p>If submitting the task fails and the scheduler reports that it is shut down, the request is abandoned
     * silently. Otherwise the failure is logged and scheduling is attempted again immediately.
     *
     * @param j value handed to {@code pullMessage} when the delay elapses (presumably the pull offset).
     */
    public void pullMessageLater(final long j) {
        final ScheduledExecutorService executor = this.consumerImpl.getScheduler();
        try {
            final Runnable delayedPull = new Runnable() {
                @Override
                public void run() {
                    ProcessQueueImpl.this.pullMessage(j);
                }
            };
            executor.schedule(delayedPull, 1000L, TimeUnit.MILLISECONDS);
        } catch (Throwable th) {
            // A rejected task is expected once the scheduler is shut down; anything else is unexpected.
            if (!executor.isShutdown()) {
                log.error("[Bug] Failed to schedule pull message request, namespace={}, mq={}, clientId={}", this.consumerImpl.getNamespace(), this.mq, this.consumerImpl.getId(), th);
                pullMessageLater(j);
            }
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /**
     * Chooses between pulling now, postponing the pull, or giving up.
     *
     * <ul>
     *   <li>dropped queue: log and do nothing further;</li>
     *   <li>cache full: log a warning and retry through {@link #pullMessageLater(long)};</li>
     *   <li>otherwise: pull immediately.</li>
     * </ul>
     *
     * @param j value forwarded unchanged to the immediate or deferred pull.
     */
    public void pullMessage(long j) {
        final String clientId = this.consumerImpl.getId();
        if (this.dropped) {
            log.info("Process queue has been dropped, no longer pull message, namespace={}, mq={}, clientId={}", this.consumerImpl.getNamespace(), this.mq, clientId);
            return;
        }
        if (isCacheFull()) {
            log.warn("Process queue cache is full, would pull message later, namespace={}, mq={}, clientId={}", this.consumerImpl.getNamespace(), this.mq, clientId);
            pullMessageLater(j);
            return;
        }
        pullMessageImmediately(j);
    }

    /**
     * Computes the batch size to request from the server.
     *
     * @return the remaining room below the per-queue cached-message threshold, raised to at least 1 and capped
     *     by the consumer's maximum await batch size per queue.
     */
    private int getMaxAwaitBatchSize() {
        final int threshold = this.consumerImpl.cachedMessagesQuantityThresholdPerQueue();
        final int remainingRoom = threshold - cachedMessagesQuantity();
        // Always ask for at least one message, even when the cache is at or above the threshold.
        final int wanted = Math.max(remainingRoom, 1);
        return Math.min(wanted, this.consumerImpl.getMaxAwaitBatchSizePerQueue());
    }

    /**
     * Starts fetching messages right away: via the receive path when the consumer's message model is
     * {@code CLUSTERING}, via the pull path for any other model.
     */
    @Override
    public void fetchMessageImmediately() {
        final boolean clustering = MessageModel.CLUSTERING.equals(this.consumerImpl.getMessageModel());
        if (!clustering) {
            pullMessageImmediately();
            return;
        }
        receiveMessageImmediately();
    }

    /**
     * Assembles the protobuf request used to receive messages for this queue.
     *
     * <p>The consume policy is derived from the consumer's "consume from where" setting; any value without an
     * explicit mapping falls back to {@code RESUME}. The FIFO flag is set when the registered listener is of
     * the {@code ORDERLY} type.
     *
     * @return the fully populated request.
     */
    ReceiveMessageRequest wrapReceiveMessageRequest() {
        final int batchSize = getMaxAwaitBatchSize();

        final ReceiveMessageRequest.Builder request = ReceiveMessageRequest.newBuilder();
        request.setGroup(this.consumerImpl.getPbGroup());
        request.setClientId(this.consumerImpl.getId());
        request.setPartition(getPbPartition());
        request.setBatchSize(batchSize);
        request.setInvisibleDuration(Durations.fromMillis(this.consumerImpl.getConsumptionTimeoutMillis()));
        request.setAwaitTime(Durations.fromMillis(this.consumerImpl.getMaxAwaitTimeMillisPerQueue()));

        final ConsumePolicy policy;
        switch (this.consumerImpl.getConsumeFromWhere()) {
            case CONSUME_FROM_TIMESTAMP:
                policy = ConsumePolicy.TARGET_TIMESTAMP;
                break;
            case CONSUME_FROM_FIRST_OFFSET:
                policy = ConsumePolicy.PLAYBACK;
                break;
            case CONSUME_FROM_MAX_OFFSET:
                policy = ConsumePolicy.DISCARD;
                break;
            default:
                policy = ConsumePolicy.RESUME;
                break;
        }
        request.setConsumePolicy(policy);

        request.setFilterExpression(getPbFilterExpression());
        final boolean fifo = MessageListenerType.ORDERLY.equals(this.consumerImpl.getMessageListener().getListenerType());
        request.setFifoFlag(fifo);
        return request.build();
    }

    /**
     * Assembles the protobuf request used to pull messages for this queue.
     *
     * @param j offset placed into the request.
     * @return the request carrying group, partition, offset, batch size, await time, filter expression and
     *     client id.
     */
    PullMessageRequest wrapPullMessageRequest(long j) {
        final PullMessageRequest.Builder request = PullMessageRequest.newBuilder();
        request.setGroup(this.consumerImpl.getPbGroup());
        request.setPartition(getPbPartition());
        request.setOffset(j);
        request.setBatchSize(getMaxAwaitBatchSize());
        request.setAwaitTime(Durations.fromMillis(this.consumerImpl.getMaxAwaitTimeMillisPerQueue()));
        request.setFilterExpression(getPbFilterExpression());
        request.setClientId(this.consumerImpl.getId());
        return request.build();
    }

    /**
     * Issues a receive-message call for this queue without delay.
     *
     * <p>Does nothing except log when the consumer is not running. Otherwise the {@code PRE_RECEIVE} hook is
     * fired, the request is sent with a 30000 ms timeout argument, and the {@code POST_RECEIVE} hook is fired
     * from the callback (once per received message, or once without a message when none were found). Any
     * failure, whether raised while sending, reported by the future, or thrown while handling the result, is
     * logged and followed by {@code receiveMessageLater()}.
     */
    private void receiveMessageImmediately() {
        if (!this.consumerImpl.isRunning()) {
            log.info("Stop to receive message because consumer is not running, namespace={}, mq={}, clientId={}", this.consumerImpl.getNamespace(), this.mq, this.consumerImpl.getId());
            return;
        }
        try {
            final Endpoints endpoints = this.mq.getPartition().getBroker().getEndpoints();
            final ReceiveMessageRequest request = wrapReceiveMessageRequest();
            this.activityNanoTime = System.nanoTime();
            final MessageInterceptorContext preContext = MessageInterceptorContext.builder()
                .setBatchSize(request.getBatchSize())
                .setTopic(this.mq.getTopic())
                .build();
            this.consumerImpl.intercept(MessageHookPoint.PRE_RECEIVE, preContext);
            final Stopwatch stopwatch = Stopwatch.createStarted();
            Futures.addCallback(this.consumerImpl.receiveMessage(request, endpoints, 30000L), new FutureCallback<ReceiveMessageResult>() {
                @Override
                public void onSuccess(ReceiveMessageResult receiveMessageResult) {
                    final long duration = stopwatch.elapsed(MessageInterceptor.DEFAULT_TIME_UNIT);
                    final List<MessageExt> messages = receiveMessageResult.getMessagesFound();
                    final MessageInterceptorContext postContext = preContext.toBuilder()
                        .setDuration(duration)
                        .setStatus(ReceiveStatus.OK.equals(receiveMessageResult.getReceiveStatus()) ? MessageHookPointStatus.OK : MessageHookPointStatus.ERROR)
                        .build();
                    // With no messages the hook still fires once, without a message argument.
                    if (messages.isEmpty()) {
                        ProcessQueueImpl.this.consumerImpl.intercept(MessageHookPoint.POST_RECEIVE, postContext);
                    }
                    for (MessageExt message : messages) {
                        ProcessQueueImpl.this.consumerImpl.intercept(MessageHookPoint.POST_RECEIVE, message, postContext);
                    }
                    try {
                        ProcessQueueImpl.this.onReceiveMessageResult(receiveMessageResult);
                    } catch (Throwable th) {
                        log.error("[Bug] Exception raised while handling receive result, would receive later, namespace={}, mq={}, endpoints={}, clientId={}", ProcessQueueImpl.this.consumerImpl.getNamespace(), ProcessQueueImpl.this.mq, endpoints, ProcessQueueImpl.this.consumerImpl.getId(), th);
                        ProcessQueueImpl.this.receiveMessageLater();
                    }
                }

                @Override
                public void onFailure(Throwable th) {
                    final MessageInterceptorContext failureContext = preContext.toBuilder()
                        .setDuration(stopwatch.elapsed(MessageInterceptor.DEFAULT_TIME_UNIT))
                        .setStatus(MessageHookPointStatus.ERROR)
                        .setThrowable(th)
                        .build();
                    ProcessQueueImpl.this.consumerImpl.intercept(MessageHookPoint.POST_RECEIVE, failureContext);
                    log.error("Exception raised while message reception, would receive later, namespace={}, mq={}, endpoints={}, clientId={}", ProcessQueueImpl.this.consumerImpl.getNamespace(), ProcessQueueImpl.this.mq, endpoints, ProcessQueueImpl.this.consumerImpl.getId(), th);
                    ProcessQueueImpl.this.receiveMessageLater();
                }
            });
            this.consumerImpl.getReceptionTimes().getAndIncrement();
        } catch (Throwable th) {
            log.error("Exception raised while message reception, would receive later, namespace={}, mq={}, clientId={}", this.consumerImpl.getNamespace(), this.mq, this.consumerImpl.getId(), th);
            receiveMessageLater();
        }
    }

    /**
     * Starts acknowledging a FIFO message, beginning with attempt number 1.
     *
     * @param messageExt message to acknowledge.
     * @return a future that is marked as done once an acknowledgement attempt succeeds.
     */
    private SimpleFuture ackFifoMessage(MessageExt messageExt) {
        final SimpleFuture completion = new SimpleFuture();
        ackFifoMessage(messageExt, 1, completion);
        return completion;
    }

    /* JADX INFO: Access modifiers changed from: private */
    /**
     * Performs one acknowledgement attempt for a FIFO message.
     *
     * <p>On an {@code OK} response the future is marked as done. A non-OK response or a failed call is logged
     * and handed to {@code ackFifoMessageLater} with the attempt counter increased by one.
     *
     * @param messageExt   message to acknowledge.
     * @param i            1-based attempt number, used for the request and for logging.
     * @param simpleFuture future completed when the acknowledgement eventually succeeds.
     */
    public void ackFifoMessage(final MessageExt messageExt, final int i, final SimpleFuture simpleFuture) {
        final Endpoints endpoints = messageExt.getEndpoints();
        final String namespace = this.consumerImpl.getNamespace();
        final String clientId = this.consumerImpl.getId();
        Futures.addCallback(this.consumerImpl.ackMessage(messageExt, i), new FutureCallback<AckMessageResponse>() {
            @Override
            public void onSuccess(AckMessageResponse ackMessageResponse) {
                final Status status = ackMessageResponse.getCommon().getStatus();
                final Code code = Code.forNumber(status.getCode());
                if (Code.OK.equals(code)) {
                    // Only retries are worth an info line; first-attempt success stays quiet.
                    if (i > 1) {
                        log.info("Re-ack fifo message successfully, clientId={}, attempt={}, messageId={}, namespace={}, mq={}, endpoints={}", clientId, i, messageExt.getMsgId(), namespace, ProcessQueueImpl.this.mq, endpoints);
                    }
                    simpleFuture.markAsDone();
                    return;
                }
                log.error("Failed to ack fifo message, would attempt to re-ack later, clientId={}, attempt={}, messageId={}, namespace={}, mq={}, code={}, endpoints={}, status message=[{}]", clientId, i, messageExt.getMsgId(), namespace, ProcessQueueImpl.this.mq, code, endpoints, status.getMessage());
                ProcessQueueImpl.this.ackFifoMessageLater(messageExt, i + 1, simpleFuture);
            }

            @Override
            public void onFailure(Throwable th) {
                log.error("Exception raised while ack fifo message, clientId={}, would attempt to re-ack later, attempt={}, messageId={}, namespace={}, mq={}, endpoints={}", clientId, i, messageExt.getMsgId(), namespace, ProcessQueueImpl.this.mq, endpoints, th);
                ProcessQueueImpl.this.ackFifoMessageLater(messageExt, i + 1, simpleFuture);
            }
        });
    }

    /* JADX INFO: Access modifiers changed from: private */
    /**
     * Schedules another FIFO acknowledgement attempt 100 ms from now.
     *
     * <p>Nothing is scheduled when the process queue has been dropped; in that case the future is left
     * untouched. If submitting the task fails and the scheduler is shut down, the request is abandoned
     * silently; otherwise the failure is logged together with its cause and scheduling is retried with the
     * attempt counter increased by one.
     *
     * @param messageExt   message to acknowledge.
     * @param i            attempt number to use for the deferred acknowledgement.
     * @param simpleFuture future to complete when the acknowledgement eventually succeeds.
     */
    public void ackFifoMessageLater(final MessageExt messageExt, final int i, final SimpleFuture simpleFuture) {
        String msgId = messageExt.getMsgId();
        String id = this.consumerImpl.getId();
        if (this.dropped) {
            log.info("Process queue was dropped, give up to ack message, namespace={}, mq={}, messageId={}, clientId={}", this.consumerImpl.getNamespace(), this.mq, msgId, id);
            return;
        }
        ScheduledExecutorService scheduler = this.consumerImpl.getScheduler();
        try {
            scheduler.schedule(new Runnable() {
                @Override
                public void run() {
                    ProcessQueueImpl.this.ackFifoMessage(messageExt, i, simpleFuture);
                }
            }, 100L, TimeUnit.MILLISECONDS);
        } catch (Throwable th) {
            // A rejected task is expected once the scheduler is shut down; anything else is unexpected.
            if (scheduler.isShutdown()) {
                return;
            }
            // Pass the throwable as the trailing argument so the cause of the scheduling failure is logged.
            log.error("[Bug] Failed to schedule ack fifo message request, namespace={}, mq={}, msgId={}, clientId={}", this.consumerImpl.getNamespace(), this.mq, msgId, id, th);
            ackFifoMessageLater(messageExt, 1 + i, simpleFuture);
        }
    }

    /**
     * Starts forwarding a message to the dead letter queue, beginning with attempt number 1.
     *
     * @param messageExt message to forward.
     * @return a future that is marked as done once a forwarding attempt succeeds.
     */
    private SimpleFuture forwardToDeadLetterQueue(MessageExt messageExt) {
        final SimpleFuture completion = new SimpleFuture();
        forwardToDeadLetterQueue(messageExt, 1, completion);
        return completion;
    }

    /* JADX INFO: Access modifiers changed from: private */
    /**
     * Performs one attempt to forward a message to the dead letter queue.
     *
     * <p>On an {@code OK} response the future is marked as done. A non-OK response or a failed call is logged
     * and handed to {@code forwardToDeadLetterQueueLater} with the attempt counter increased by one.
     *
     * @param messageExt   message to forward.
     * @param i            1-based attempt number, used for the request and for logging.
     * @param simpleFuture future completed when forwarding eventually succeeds.
     */
    public void forwardToDeadLetterQueue(final MessageExt messageExt, final int i, final SimpleFuture simpleFuture) {
        final ListenableFuture<ForwardMessageToDeadLetterQueueResponse> future = this.consumerImpl.forwardMessageToDeadLetterQueue(messageExt, i);
        final String namespace = this.consumerImpl.getNamespace();
        final String clientId = this.consumerImpl.getId();
        Futures.addCallback(future, new FutureCallback<ForwardMessageToDeadLetterQueueResponse>() {
            @Override
            public void onSuccess(ForwardMessageToDeadLetterQueueResponse forwardMessageToDeadLetterQueueResponse) {
                final Status status = forwardMessageToDeadLetterQueueResponse.getCommon().getStatus();
                final Code code = Code.forNumber(status.getCode());
                if (Code.OK.equals(code)) {
                    // Only retries are worth an info line; first-attempt success stays quiet.
                    if (i > 1) {
                        log.info("Re-forward message to DLQ successfully, clientId={}, attempt={}, messageId={}, namespace={}, mq={}", clientId, i, messageExt.getMsgId(), namespace, ProcessQueueImpl.this.mq);
                    }
                    simpleFuture.markAsDone();
                    return;
                }
                log.error("Failed to forward message to DLQ, would attempt to re-forward later, clientId={}, messageId={}, attempt={}, namespace={}, mq={}, code={}, status message=[{}]", clientId, messageExt.getMsgId(), i, namespace, ProcessQueueImpl.this.mq, code, status.getMessage());
                ProcessQueueImpl.this.forwardToDeadLetterQueueLater(messageExt, i + 1, simpleFuture);
            }

            @Override
            public void onFailure(Throwable th) {
                log.error("Exception raised while forward message to DLQ, would attempt to re-forward later, clientId={}, attempt={}, messageId={}, namespace={}, mq={}", clientId, i, messageExt.getMsgId(), namespace, ProcessQueueImpl.this.mq, th);
                ProcessQueueImpl.this.forwardToDeadLetterQueueLater(messageExt, i + 1, simpleFuture);
            }
        });
    }

    /* JADX INFO: Access modifiers changed from: private */
    /**
     * Schedules another dead-letter-queue forwarding attempt 100 ms from now.
     *
     * <p>Nothing is scheduled when the process queue has been dropped; in that case the future is left
     * untouched. If submitting the task fails and the scheduler is shut down, the request is abandoned
     * silently; otherwise the failure is logged together with its cause and scheduling is retried with the
     * attempt counter increased by one.
     *
     * @param messageExt   message to forward.
     * @param i            attempt number to use for the deferred forwarding.
     * @param simpleFuture future to complete when forwarding eventually succeeds.
     */
    public void forwardToDeadLetterQueueLater(final MessageExt messageExt, final int i, final SimpleFuture simpleFuture) {
        String msgId = messageExt.getMsgId();
        String id = this.consumerImpl.getId();
        if (this.dropped) {
            log.info("Process queue was dropped, give up to forward message to DLQ, namespace={}, mq={}, messageId={}, clientId={}", this.consumerImpl.getNamespace(), this.mq, msgId, id);
            return;
        }
        ScheduledExecutorService scheduler = this.consumerImpl.getScheduler();
        try {
            scheduler.schedule(new Runnable() {
                @Override
                public void run() {
                    ProcessQueueImpl.this.forwardToDeadLetterQueue(messageExt, i, simpleFuture);
                }
            }, 100L, TimeUnit.MILLISECONDS);
        } catch (Throwable th) {
            // A rejected task is expected once the scheduler is shut down; anything else is unexpected.
            if (scheduler.isShutdown()) {
                return;
            }
            // Pass the throwable as the trailing argument so the cause of the scheduling failure is logged.
            log.error("[Bug] Failed to schedule DLQ message request, namespace={}, mq={}, msgId={}, clientId={}", this.consumerImpl.getNamespace(), this.mq, msgId, id, th);
            forwardToDeadLetterQueueLater(messageExt, 1 + i, simpleFuture);
        }
    }

    /**
     * Acknowledges a message and logs the outcome.
     *
     * <p>This is fire-and-forget: a non-OK response or a failed call is only logged, no retry is scheduled
     * here.
     *
     * @param messageExt message to acknowledge.
     */
    public void ackMessage(final MessageExt messageExt) {
        final ListenableFuture<AckMessageResponse> future = this.consumerImpl.ackMessage(messageExt);
        final String namespace = this.consumerImpl.getNamespace();
        final String clientId = this.consumerImpl.getId();
        Futures.addCallback(future, new FutureCallback<AckMessageResponse>() {
            @Override
            public void onSuccess(AckMessageResponse ackMessageResponse) {
                final Status status = ackMessageResponse.getCommon().getStatus();
                final Code code = Code.forNumber(status.getCode());
                if (!Code.OK.equals(code)) {
                    log.error("Failed to ack message, clientId={}, messageId={}, namespace={}, mq={}, code={}, status message=[{}]", clientId, messageExt.getMsgId(), namespace, ProcessQueueImpl.this.mq, code, status.getMessage());
                    return;
                }
                log.trace("Ack message successfully, clientId={}, messageId={}, namespace={}, mq={}", clientId, messageExt.getMsgId(), namespace, ProcessQueueImpl.this.mq);
            }

            @Override
            public void onFailure(Throwable th) {
                log.error("Exception raised while ack message, clientId={}, messageId={}, namespace={}, mq={}", clientId, messageExt.getMsgId(), namespace, ProcessQueueImpl.this.mq, th);
            }
        });
    }

    /**
     * Converts this queue's filter expression into its protobuf form.
     *
     * @return a protobuf filter expression carrying the same expression text; the type is {@code SQL} for
     *     {@code SQL92} and {@code TAG} for {@code TAG} or any other expression type.
     */
    private apache.rocketmq.v1.FilterExpression getPbFilterExpression() {
        final ExpressionType sourceType = this.filterExpression.getExpressionType();
        final FilterExpression.Builder pbFilter = apache.rocketmq.v1.FilterExpression.newBuilder();
        pbFilter.setExpression(this.filterExpression.getExpression());

        final FilterType pbType;
        switch (sourceType) {
            case SQL92:
                pbType = FilterType.SQL;
                break;
            case TAG:
            default:
                pbType = FilterType.TAG;
                break;
        }
        return pbFilter.setType(pbType).build();
    }

    /**
     * Builds the protobuf partition describing this message queue.
     *
     * @return a partition whose topic resource carries the consumer namespace and the queue's topic name, and
     *     whose id and broker name are taken from the message queue.
     */
    private Partition getPbPartition() {
        final Resource topicResource = Resource.newBuilder()
            .setResourceNamespace(this.consumerImpl.getNamespace())
            .setName(this.mq.getTopic())
            .build();

        final Partition.Builder partition = Partition.newBuilder();
        partition.setTopic(topicResource);
        partition.setId(this.mq.getQueueId());
        partition.setBroker(Broker.newBuilder().setName(this.mq.getBrokerName()).build());
        return partition.build();
    }

    /**
     * Counts the messages currently held by this process queue.
     *
     * @return the size of {@code pendingMessages} plus the size of {@code inflightMessages}, read while both
     *     read locks are held.
     */
    public int cachedMessagesQuantity() {
        // Acquire order: pending first, then inflight.
        this.pendingMessagesLock.readLock().lock();
        this.inflightMessagesLock.readLock().lock();
        try {
            return this.pendingMessages.size() + this.inflightMessages.size();
        } finally {
            // Release in reverse order of acquisition.
            this.inflightMessagesLock.readLock().unlock();
            this.pendingMessagesLock.readLock().unlock();
        }
    }

    /**
     * Counts the in-flight messages of this process queue.
     *
     * @return the size of {@code inflightMessages}, read under its read lock.
     */
    public int inflightMessagesQuantity() {
        final int quantity;
        this.inflightMessagesLock.readLock().lock();
        try {
            quantity = this.inflightMessages.size();
        } finally {
            this.inflightMessagesLock.readLock().unlock();
        }
        return quantity;
    }

    /**
     * Reads the cached-bytes counter of this process queue.
     *
     * @return the current value of {@code cachedMessagesBytes}.
     */
    public long cachedMessageBytes() {
        final long bytes = cachedMessagesBytes.get();
        return bytes;
    }

    /**
     * Records the consumption outcome of a single message.
     *
     * @param consumeStatus outcome to record.
     */
    private void statsConsumptionStatus(ConsumeStatus consumeStatus) {
        final int singleMessage = 1;
        this.statsConsumptionStatus(singleMessage, consumeStatus);
    }

    /**
     * Adds a number of messages to the consumer's consumption counters.
     *
     * @param i             number of messages to add.
     * @param consumeStatus {@code OK} increases the OK counter; any other value increases the error counter.
     */
    private void statsConsumptionStatus(int i, ConsumeStatus consumeStatus) {
        if (!ConsumeStatus.OK.equals(consumeStatus)) {
            this.consumerImpl.getConsumptionErrorQuantity().addAndGet(i);
            return;
        }
        this.consumerImpl.getConsumptionOkQuantity().addAndGet(i);
    }

    /**
     * Returns the message queue this process queue is bound to.
     *
     * @return the {@code mq} field.
     */
    @Override
    public MessageQueue getMessageQueue() {
        return mq;
    }
}
