package com.taobao.notify.remotingclient.processor;

import com.taobao.notify.client.manager.NotifyGroup;
import com.taobao.notify.client.manager.NotifyGroupManager;
import com.taobao.notify.message.BytesMessage;
import com.taobao.notify.message.Message;
import com.taobao.notify.message.StringMessage;
import com.taobao.notify.remotingclient.MessageListener;
import com.taobao.notify.remotingclient.MessageListenerAsync;
import com.taobao.notify.remotingclient.MessageStatus;
import com.taobao.notify.remotingclient.concurrent.MarkedRunnable;
import com.taobao.notify.remotingclient.logging.LoggingService;
import com.taobao.notify.transferobject.ReceiveMessageResponse;
import com.taobao.notify.utils.ConcurrentHashSet;
import com.taobao.notify.utils.LoggerPrefix;
import com.taobao.notify.utils.Profiler;
import com.taobao.remoting.RequestProcessor;
import java.util.Collection;
import java.util.Map;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.atomic.AtomicLong;

/* loaded from: input_file:com/taobao/notify/remotingclient/processor/MessageProcessorAsync.class */
/**
 * Message processor that adds support for {@link MessageListenerAsync} listeners.
 *
 * <p>For such listeners the delivery is bracketed by {@code preReceive} / {@code postReceive}
 * callbacks and, when {@link #async} is set, the delivery itself is submitted to the executor
 * returned by {@code getExecutor()} instead of running on the calling thread. Plain
 * {@link MessageListener} instances are always invoked on the calling thread.
 *
 * @param <T> concrete message type handled by the processor
 */
public abstract class MessageProcessorAsync<T extends Message> extends MessageProcessor<T> {
    /** Prefix prepended to this class's warn/error log lines. */
    private static final String LogPrefix = LoggerPrefix.makeLogPrefix(MessageProcessorAsync.class);
    /** Receives a record of every message and its processing result; set in the constructor. */
    private LoggingService loggingService;
    /** When true, MessageListenerAsync deliveries are submitted to the executor; otherwise they run inline. */
    private final boolean async;
    /**
     * Marks of tasks whose submission was rejected by the executor. A mark is added when
     * {@code execute} throws RejectedExecutionException and removed after a later accepted
     * submission that carries the same mark. Exposed to tasks via {@code getRejectedChannels()}.
     */
    private final Collection<Integer> rejectedChannels;
    /** Minimum gap between two executor-state log lines; compared against System.currentTimeMillis() values. */
    private final long printTPLogInterval = 2000;
    /** Timestamp (System.currentTimeMillis()) of the last executor-state log line; starts at -1. */
    private AtomicLong printTpLogLastTime;

    /* loaded from: input_file:com/taobao/notify/remotingclient/processor/MessageProcessorAsync$BytesMessageProcessorAsync.class */
    /** Processor specialisation for {@link BytesMessage} requests. */
    public static class BytesMessageProcessorAsync extends MessageProcessorAsync<BytesMessage> {
        /**
         * @param notifyGroupManager registry used to look up the listener of a target group
         * @param threadPoolExecutor executor handed to the parent processor
         * @param z                  {@code true} to dispatch async-listener deliveries to the executor
         */
        public BytesMessageProcessorAsync(NotifyGroupManager notifyGroupManager, ThreadPoolExecutor threadPoolExecutor, boolean z) {
            super(notifyGroupManager, threadPoolExecutor, z);
        }

        /** @return the request class this processor handles, {@code BytesMessage.class} */
        public Class<BytesMessage> interest() {
            return BytesMessage.class;
        }

        /**
         * Untyped entry point that forwards to the typed {@code handleRequest(BytesMessage, ...)}.
         *
         * <p>The request must be cast to the message type. The previous cast targeted the
         * processor class itself, which does not type-check against the typed overload.
         *
         * @param obj               the incoming request; must be a {@link BytesMessage}
         * @param appResponseOutput sink for the processing response
         * @throws ClassCastException if {@code obj} is not a {@link BytesMessage}
         */
        @Override // com.taobao.notify.remotingclient.processor.MessageProcessorAsync, com.taobao.notify.remotingclient.processor.MessageProcessor
        public /* bridge */ /* synthetic */ void handleRequest(Object obj, RequestProcessor.AppResponseOutput appResponseOutput) {
            super.handleRequest((BytesMessage) obj, appResponseOutput);
        }
    }

    /* loaded from: input_file:com/taobao/notify/remotingclient/processor/MessageProcessorAsync$StringMessageProcessorAsync.class */
    /** Processor specialisation for {@link StringMessage} requests. */
    public static class StringMessageProcessorAsync extends MessageProcessorAsync<StringMessage> {
        /**
         * @param notifyGroupManager registry used to look up the listener of a target group
         * @param threadPoolExecutor executor handed to the parent processor
         * @param z                  {@code true} to dispatch async-listener deliveries to the executor
         */
        public StringMessageProcessorAsync(NotifyGroupManager notifyGroupManager, ThreadPoolExecutor threadPoolExecutor, boolean z) {
            super(notifyGroupManager, threadPoolExecutor, z);
        }

        /** @return the request class this processor handles, {@code StringMessage.class} */
        public Class<StringMessage> interest() {
            return StringMessage.class;
        }

        /**
         * Untyped entry point that forwards to the typed {@code handleRequest(StringMessage, ...)}.
         *
         * <p>The request must be cast to the message type. The previous cast targeted the
         * processor class itself, which does not type-check against the typed overload.
         *
         * @param obj               the incoming request; must be a {@link StringMessage}
         * @param appResponseOutput sink for the processing response
         * @throws ClassCastException if {@code obj} is not a {@link StringMessage}
         */
        @Override // com.taobao.notify.remotingclient.processor.MessageProcessorAsync, com.taobao.notify.remotingclient.processor.MessageProcessor
        public /* bridge */ /* synthetic */ void handleRequest(Object obj, RequestProcessor.AppResponseOutput appResponseOutput) {
            super.handleRequest((StringMessage) obj, appResponseOutput);
        }
    }

    /**
     * Creates the processor.
     *
     * @param notifyGroupManager registry used to look up the listener of a target group
     * @param threadPoolExecutor executor handed to the parent processor
     * @param z                  {@code true} to submit {@link MessageListenerAsync} deliveries to the
     *                           executor; {@code false} to run them on the calling thread
     */
    public MessageProcessorAsync(NotifyGroupManager notifyGroupManager, ThreadPoolExecutor threadPoolExecutor, boolean z) {
        super(notifyGroupManager, threadPoolExecutor);
        this.loggingService = LoggingService.getInstance();
        this.rejectedChannels = new ConcurrentHashSet();
        // printTPLogInterval is a final field initialised at its declaration; assigning it
        // again here is illegal, so the redundant assignment has been dropped.
        this.printTpLogLastTime = new AtomicLong(-1L);
        this.async = z;
    }

    /**
     * Entry point for an incoming message; hands the request straight to
     * {@link #innerHandleRequest}.
     *
     * @param t                 the incoming message
     * @param appResponseOutput sink for the processing response
     */
    @Override // com.taobao.notify.remotingclient.processor.MessageProcessor
    public void handleRequest(T t, RequestProcessor.AppResponseOutput appResponseOutput) {
        this.innerHandleRequest(t, appResponseOutput);
    }

    /**
     * Delivers one incoming message to the listener registered for its target group.
     *
     * <ul>
     *   <li>No listener for the group: a {@code NO_LISTENER} response is written.</li>
     *   <li>Plain {@link MessageListener}: delivered on the calling thread.</li>
     *   <li>{@link MessageListenerAsync}: {@code preReceive} is called first. If it throws a
     *       {@link RuntimeException} the status is marked rollback-only and the normal response
     *       path is used to report it. Otherwise the delivery runs either on the calling thread
     *       or, when {@code async} is set, on the executor; {@code postReceive} is invoked
     *       exactly once after the delivery in both cases.</li>
     * </ul>
     *
     * <p>Only {@code preReceive} is guarded by the rollback handler. Failures raised later
     * (for example by {@code postReceive}) propagate to the caller instead of being reported
     * as a preReceive failure and triggering a second delivery/response.
     *
     * @param t                 the incoming message
     * @param appResponseOutput sink for the processing response
     * @throws NullPointerException if the message carries no target group
     */
    protected void innerHandleRequest(T t, final RequestProcessor.AppResponseOutput appResponseOutput) {
        logExecutorStateIfDue();

        String targetGroup = t.getTargetGroup();
        final Message processType = t.toProcessType();
        if (targetGroup == null) {
            throw new NullPointerException("Null targetGroup. \r\nMessage: \r\n" + processType.toStringWithoutBody());
        }

        NotifyGroup group = this.notifyGroupManager.getGroup(targetGroup);
        MessageListener messageListener = (group != null) ? group.getMsgListener() : null;
        if (messageListener == null) {
            responseOutput("订阅者没有设置MessageListener", ReceiveMessageResponse.Result.NO_LISTENER, processType, null, true, appResponseOutput);
            return;
        }

        final MessageStatus messageStatus = new MessageStatus();
        if (!(messageListener instanceof MessageListenerAsync)) {
            messageReceive(messageStatus, processType, messageListener, appResponseOutput);
            return;
        }

        final MessageListenerAsync messageListenerAsync = (MessageListenerAsync) messageListener;
        final Object preReceive;
        try {
            preReceive = messageListenerAsync.preReceive(processType, messageStatus);
        } catch (RuntimeException e) {
            // messageReceive skips the listener for a rollback-only status and only writes the error response.
            messageStatus.setRollbackOnly();
            messageStatus.setReason("preReceive异常，消息将重试" + e);
            messageReceive(messageStatus, processType, messageListenerAsync, appResponseOutput);
            return;
        }

        if (!this.async) {
            receiveThenPostReceive(messageStatus, processType, messageListenerAsync, appResponseOutput);
            return;
        }

        final MarkedRunnable task = new MarkedRunnable() { // from class: com.taobao.notify.remotingclient.processor.MessageProcessorAsync.1
            @Override // java.lang.Runnable
            public void run() {
                MessageProcessorAsync.this.receiveThenPostReceive(messageStatus, processType, messageListenerAsync, appResponseOutput);
            }

            public Object getContext() {
                return preReceive;
            }

            public Map<Integer, Integer> getChannelInfo() {
                return null;
            }

            public Collection<Integer> getRejectedChannels() {
                return MessageProcessorAsync.this.rejectedChannels;
            }
        };

        boolean accepted = true;
        try {
            getExecutor().execute(task);
        } catch (RejectedExecutionException e) {
            accepted = false;
            this.rejectedChannels.add(task.getMark());
            logger.error("Rejected channel=" + task.getMark());
            onRejectedExecutionException(t, appResponseOutput);
        } finally {
            // An accepted submission clears any earlier rejection recorded for the same mark.
            if (accepted && task.getMark() != null) {
                this.rejectedChannels.remove(task.getMark());
                if (logger.isDebugEnabled()) {
                    logger.debug("Channel=" + task.getMark() + ", context=" + preReceive);
                }
            }
        }
    }

    /** Logs the executor at info level, at most once per {@code printTPLogInterval} milliseconds. */
    private void logExecutorStateIfDue() {
        long lastLogged = this.printTpLogLastTime.get();
        long now = System.currentTimeMillis();
        // The compare-and-set lets only one of several racing threads emit the line.
        if (logger.isInfoEnabled() && now > lastLogged + this.printTPLogInterval && this.printTpLogLastTime.compareAndSet(lastLogged, now)) {
            logger.info(getExecutor());
        }
    }

    /** Runs messageReceive and then always calls postReceive once with its result (true if it threw). */
    private void receiveThenPostReceive(MessageStatus messageStatus, Message message, MessageListenerAsync listener, RequestProcessor.AppResponseOutput appResponseOutput) {
        boolean receiveResult = true;
        try {
            receiveResult = messageReceive(messageStatus, message, listener, appResponseOutput);
        } finally {
            listener.postReceive(message, messageStatus, receiveResult);
        }
    }

    /**
     * Invokes the listener (unless the status is already rollback-only) and writes the
     * matching response: {@code SUCCESS}, {@code ERROR} for a rollback, or {@code EXCEPT}
     * when a {@link RuntimeException} escapes. The whole call is wrapped in a
     * {@code notifyClient.onReceiveMessage} profiler section.
     *
     * @param messageStatus     status object shared with the listener
     * @param message           the message being delivered
     * @param messageListener   listener to invoke
     * @param appResponseOutput sink for the processing response
     * @return {@code false} only when the message was not rolled back and the SUCCESS response
     *         was written; {@code true} in every other case
     */
    protected boolean messageReceive(MessageStatus messageStatus, Message message, MessageListener messageListener, RequestProcessor.AppResponseOutput appResponseOutput) {
        final String profilerSection = "notifyClient.onReceiveMessage";
        Profiler.enter(profilerSection);
        boolean outcome = true;
        try {
            if (!messageStatus.isRollbackOnly()) {
                messageListener.receiveMessage(message, messageStatus);
            }
            if (!messageStatus.isRollbackOnly()) {
                boolean written = responseOutput(null, ReceiveMessageResponse.Result.SUCCESS, message, null, false, appResponseOutput);
                outcome = !written;
            } else {
                String rollbackReason = messageStatus.getReason();
                if (rollbackReason == null || rollbackReason.length() == 0) {
                    // The subscriber rolled back without giving a reason: substitute a default and warn.
                    rollbackReason = "消息经处理后，被订阅者回滚。但没有给出原因。";
                    logger.warn(LogPrefix + "消息经处理后，被订阅者回滚。但没有给出原因，请设置原因，方便追踪问题!!!");
                }
                responseOutput(ReceiveMessageResponse.Result.ERROR + ":" + rollbackReason, ReceiveMessageResponse.Result.ERROR, message, null, false, appResponseOutput);
            }
        } catch (RuntimeException failure) {
            responseOutput("处理消息发生异常" + failure.getMessage(), ReceiveMessageResponse.Result.EXCEPT, message, failure, true, appResponseOutput);
        } finally {
            Profiler.release(profilerSection);
        }
        return outcome;
    }

    /**
     * Builds a {@link ReceiveMessageResponse}, records the outcome through the logging
     * service, and writes the response to the caller.
     *
     * @param str               error text to attach to the response; ignored when null or empty
     * @param result            result code to send
     * @param message           the message the response refers to
     * @param exc               exception to record with the result, or {@code null}
     * @param z                 whether a non-empty {@code str} is also logged at error level
     * @param appResponseOutput sink the response is written to
     * @return {@code true} if the response was written, {@code false} if writing threw
     */
    protected boolean responseOutput(String str, ReceiveMessageResponse.Result result, Message message, Exception exc, boolean z, RequestProcessor.AppResponseOutput appResponseOutput) {
        ReceiveMessageResponse response = new ReceiveMessageResponse();
        boolean hasErrorText = str != null && str.length() > 0;
        if (hasErrorText) {
            response.setErrorMsg(str);
        }
        if (hasErrorText && z) {
            logger.error(LogPrefix + str);
        }

        String messageLog = this.loggingService.logMessage(message);
        if (exc != null) {
            this.loggingService.logResult(messageLog, message, str, exc);
        } else {
            this.loggingService.logResult(messageLog, message, str);
        }

        response.setResult(result.toByte());
        boolean written;
        try {
            appResponseOutput.write(response);
            written = true;
        } catch (Exception writeFailure) {
            this.loggingService.logResult(messageLog, message, "发送处理消息结果失败", writeFailure);
            logger.error(LogPrefix + "发送处理消息结果失败", writeFailure);
            written = false;
        }
        return written;
    }
}
