package com.taobao.notify.remotingclient.processor;

import com.taobao.eagleeye.EagleEye;
import com.taobao.gecko.core.command.Command;
import com.taobao.gecko.core.command.ResponseStatus;
import com.taobao.gecko.core.command.kernel.BooleanAckCommand;
import com.taobao.gecko.service.Connection;
import com.taobao.gecko.service.RequestProcessor;
import com.taobao.notify.client.manager.NotifyGroup;
import com.taobao.notify.client.manager.NotifyGroupManager;
import com.taobao.notify.diagnosis.manager.NotifyDiagnosisRecordManager;
import com.taobao.notify.message.Message;
import com.taobao.notify.message.MessageAccessor;
import com.taobao.notify.remoting.core.command.request.DeliverMessageCommand;
import com.taobao.notify.remoting.core.command.request.MessageCommitRollBackCommand;
import com.taobao.notify.remoting.core.command.request.SendMessageCommand;
import com.taobao.notify.remoting.core.command.response.MessageAckCommand;
import com.taobao.notify.remoting.core.command.response.NotifyBooleanAckCommand;
import com.taobao.notify.remotingclient.MessageListener;
import com.taobao.notify.remotingclient.MessageStatus;
import com.taobao.notify.remotingclient.logging.LoggingService;
import com.taobao.notify.utils.LoggerPrefix;
import java.util.HashMap;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.ThreadPoolExecutor;
import org.apache.commons.lang.StringUtils;
import org.apache.log4j.Logger;

/* loaded from: input_file:com/taobao/notify/remotingclient/processor/DeliverMessageProcessor.class */
/**
 * {@link RequestProcessor} for {@link DeliverMessageCommand}s.
 *
 * <p>For every delivered message the processor looks up the {@link NotifyGroup} named by the
 * message's target group, hands the message to that group's {@link MessageListener}, and answers
 * on the same {@link Connection} with an acknowledgement that reflects the listener's outcome
 * (acknowledged, rolled back, or failed with an exception). If the group exposes its own delivery
 * thread pool the work is submitted to it; otherwise it runs on the calling thread.
 */
public class DeliverMessageProcessor implements RequestProcessor<DeliverMessageCommand> {
    static final Logger logger = Logger.getLogger(DeliverMessageProcessor.class);
    private static final String LOG_PREFIX = LoggerPrefix.makeLogPrefix(DeliverMessageProcessor.class);

    /** Reason reported to the server when a subscriber rolls back without giving one. */
    private static final String DEFAULT_ROLLBACK_REASON = "消息经处理后，被订阅者回滚。但没有给出原因。";

    private final NotifyGroupManager notifyGroupManager;
    private final LoggingService loggingService = LoggingService.getInstance();

    /**
     * Pool handed in at construction and exposed through {@link #getExecutor()}; this class never
     * submits work to it itself.
     */
    private final ThreadPoolExecutor deliverMessageWorkTP;

    /**
     * Creates the processor.
     *
     * @param notifyGroupManager registry used to resolve a message's target group
     * @param threadPoolExecutor pool returned by {@link #getExecutor()}; {@code null} is accepted
     *                           but logged as an error
     */
    public DeliverMessageProcessor(NotifyGroupManager notifyGroupManager, ThreadPoolExecutor threadPoolExecutor) {
        if (threadPoolExecutor == null) {
            logger.error(LOG_PREFIX + "需要为DeliverMessageProcessor设置工作线程池,如果是集成测试环境，请忽略此信息");
        }
        this.notifyGroupManager = notifyGroupManager;
        this.deliverMessageWorkTP = threadPoolExecutor;
    }

    /**
     * Returns the pool supplied to the constructor (possibly {@code null}).
     *
     * <p>NOTE(review): presumably the remoting framework uses this pool to dispatch
     * {@link #handleRequest} - confirm against the {@code RequestProcessor} contract.
     */
    public ThreadPoolExecutor getExecutor() {
        return this.deliverMessageWorkTP;
    }

    /**
     * Handles one delivered message.
     *
     * <p>When the target group has its own delivery pool the processing is submitted to it; if
     * that pool rejects the task a {@code THREADPOOL_BUSY} acknowledgement is sent back. When the
     * group is unknown or has no pool, processing happens synchronously on the calling thread.
     *
     * @param deliverMessageCommand the incoming delivery request
     * @param connection            connection the request arrived on; used for the response
     * @throws NullPointerException if the delivered message carries no target group
     */
    public void handleRequest(final DeliverMessageCommand deliverMessageCommand, final Connection connection) {
        final Message message = deliverMessageCommand.getMessage().toProcessType();
        final MessageStatus messageStatus = new MessageStatus(message);
        final String targetGroup = deliverMessageCommand.getMessage().getTargetGroup();
        if (targetGroup == null) {
            throw new NullPointerException("Null targetGroup. \r\nMessage: \r\n" + message.toStringWithoutBody());
        }
        final String logKey = this.loggingService.logMessage(message);

        final NotifyGroup group = this.notifyGroupManager.getGroup(targetGroup);
        final MessageListener listener;
        final ThreadPoolExecutor groupExecutor;
        if (group != null) {
            listener = group.getMsgListener();
            groupExecutor = group.getDeliverMessageWorkTP();
        } else {
            listener = null;
            groupExecutor = null;
        }

        if (groupExecutor == null) {
            innerHandlerRequest(deliverMessageCommand, connection, message, messageStatus, logKey, listener);
            return;
        }
        try {
            groupExecutor.execute(new Runnable() {
                @Override
                public void run() {
                    DeliverMessageProcessor.this.innerHandlerRequest(
                            deliverMessageCommand, connection, message, messageStatus, logKey, listener);
                }
            });
        } catch (RejectedExecutionException e) {
            responseThreadPoolBusy(deliverMessageCommand, connection, message, logKey);
        }
    }

    /** Logs that the group's delivery pool is saturated and answers with {@code THREADPOOL_BUSY}. */
    private void responseThreadPoolBusy(DeliverMessageCommand deliverMessageCommand, Connection connection, Message message, String logKey) {
        final String busyText = "groupId:[" + message.getTargetGroup() + "]的deliverMessageWorkTP线程池繁忙";
        final BooleanAckCommand busyAck = connection.getRemotingContext().getCommandFactory()
                .createBooleanAckCommand(deliverMessageCommand.getRequestHeader(), ResponseStatus.THREADPOOL_BUSY, busyText);
        logger.warn(busyText);
        response(connection, message, logKey, busyAck);
    }

    /**
     * Reports the receipt of {@code message} to EagleEye.
     *
     * @param message      the received message
     * @param responseSize value passed to {@code EagleEye.responseSize}
     */
    private void recordEagleEyeLog(Message message, int responseSize) {
        final StringBuilder serviceName = new StringBuilder();
        serviceName.append("recv~").append(MessageAccessor.getOriginType(message).toString());
        serviceName.append(":").append(message.getTopic());
        serviceName.append(":").append(message.getMessageType() == null ? "" : message.getMessageType());
        serviceName.append(":").append(message.getGroupId());
        EagleEye.rpcServerRecv("Notify", serviceName.toString());
        EagleEye.responseSize(responseSize);
    }

    /**
     * Processes one delivered message on the current thread: sets up the EagleEye context, invokes
     * the listener (if any), sends the acknowledgement, records it for diagnosis, and finally sends
     * the listener's reply message when one was set. {@code EagleEye.rpcServerSend(3)} is always
     * called at the end, even when processing throws.
     *
     * <p>Public only as an artifact of decompilation (the decompiler noted the original access was
     * private); the signature is kept as-is for compatibility.
     *
     * @param deliverMessageCommand the incoming delivery request
     * @param connection            connection used for all responses
     * @param message               the message in process form
     * @param messageStatus         status object the listener reports its outcome through
     * @param str                   log key returned by {@code LoggingService.logMessage}
     * @param messageListener       the group's listener, or {@code null} if none is registered
     */
    public void innerHandlerRequest(DeliverMessageCommand deliverMessageCommand, Connection connection, Message message, MessageStatus messageStatus, String str, MessageListener messageListener) {
        prepareEagleEyeContext(deliverMessageCommand, connection, message);
        try {
            final MessageAckCommand ack;
            if (messageListener != null) {
                ack = deliverToListener(deliverMessageCommand, message, messageStatus, str, messageListener);
            } else {
                final String noListenerLog = "订阅者没有设置MessageListener,topic=" + message.getTopic()
                        + ",type=" + message.getMessageType() + ",groupId=" + message.getTargetGroup();
                this.loggingService.logResult(str, message, noListenerLog);
                logger.error(LOG_PREFIX + noListenerLog);
                ack = new NotifyBooleanAckCommand(deliverMessageCommand, ResponseStatus.UNKNOWN,
                        "订阅者没有设置MessageListener, Topic为：" + message.getTopic()
                                + " MessageType为：" + message.getMessageType()
                                + ",groupId=" + message.getTargetGroup());
            }

            response(connection, message, str, ack);
            NotifyDiagnosisRecordManager.getInstance().recordRecentReceivedMessage(message, ack);

            if (messageStatus.getReplyMessage() != null) {
                final SendMessageCommand reply = new SendMessageCommand(messageStatus.getReplyMessage());
                response(connection, message, str, reply);
                if (logger.isDebugEnabled()) {
                    logger.debug("本消息需要回执,回执信息:\n[replyId:" + reply.getMessage().getReplyId()
                            + "\nreplyTopic:" + reply.getMessage().getTopic()
                            + "\nreplyMessageType:" + reply.getMessage().getMessageType() + "]\n");
                }
            }
        } finally {
            EagleEye.rpcServerSend(3);
        }
    }

    /**
     * Best-effort EagleEye set-up: continues the trace carried in the message properties when both
     * trace id and rpc id are present, otherwise starts a new trace. Any failure is logged at debug
     * level and otherwise ignored so that tracing can never prevent message delivery.
     */
    private void prepareEagleEyeContext(DeliverMessageCommand deliverMessageCommand, Connection connection, Message message) {
        try {
            final String traceId = message.getStringProperty("eagleTraceId");
            final String parentRpcId = message.getStringProperty("eagleRpcId");
            final HashMap<String, String> rpcContext = new HashMap<String, String>();
            if (StringUtils.isNotBlank(traceId) && StringUtils.isNotBlank(parentRpcId)) {
                rpcContext.put("traceId", traceId);
                rpcContext.put("rpcId", EagleEye.generateMulticastRpcId(parentRpcId, message.getTargetGroup()));
                rpcContext.put("eagleEyeUserData", message.getStringProperty("eagleData"));
            } else {
                rpcContext.put("traceId", EagleEye.generateTraceId((String) null));
                rpcContext.put("rpcId", EagleEye.generateMulticastRpcId("9", message.getTargetGroup()));
            }
            EagleEye.setRpcContext(rpcContext);
            recordEagleEyeLog(message, deliverMessageCommand.getTotalBodyLength());
            EagleEye.remoteIp(connection.getRemoteSocketAddress().getAddress().getHostAddress());
        } catch (Throwable th) {
            // Deliberately not rethrown; debug level keeps a persistent failure from flooding the log.
            if (logger.isDebugEnabled()) {
                logger.debug(LOG_PREFIX + "Failed to set up EagleEye context; continuing without tracing", th);
            }
        }
    }

    /**
     * Invokes the listener and maps its outcome to the acknowledgement to send.
     *
     * <ul>
     *   <li>status {@code ROLLBACK}, or no status with the rollback-only flag set: {@code ERROR}
     *       acknowledgement carrying the rollback reason;</li>
     *   <li>status {@code COMMITTED} / {@code NOACTION}, or no status and no rollback flag:
     *       {@code NO_ERROR} {@link MessageAckCommand};</li>
     *   <li>a {@link RuntimeException} anywhere in this step: {@code EXCEPTION} acknowledgement.</li>
     * </ul>
     *
     * @return the acknowledgement, or {@code null} if the listener set a status other than the
     *         three handled above
     */
    private MessageAckCommand deliverToListener(DeliverMessageCommand deliverMessageCommand, Message message, MessageStatus messageStatus, String logKey, MessageListener messageListener) {
        try {
            messageListener.receiveMessage(message, messageStatus);

            final boolean rolledBack;
            if (messageStatus.getStatus() == null) {
                rolledBack = messageStatus.isRollbackOnly();
            } else if (messageStatus.getStatus() == MessageCommitRollBackCommand.Status.ROLLBACK) {
                rolledBack = true;
            } else if (messageStatus.getStatus() == MessageCommitRollBackCommand.Status.COMMITTED
                    || messageStatus.getStatus() == MessageCommitRollBackCommand.Status.NOACTION) {
                rolledBack = false;
            } else {
                // Unrecognised status: no acknowledgement is built for it.
                return null;
            }

            if (rolledBack) {
                final String reason = rollbackReason(messageStatus);
                this.loggingService.logResult(logKey, message, ResponseStatus.ERROR + ":" + reason);
                return new NotifyBooleanAckCommand(deliverMessageCommand, ResponseStatus.ERROR, reason);
            }
            this.loggingService.logResult(logKey, message, ResponseStatus.NO_ERROR);
            return new MessageAckCommand(deliverMessageCommand, ResponseStatus.NO_ERROR, (byte[]) null, message.getMessageId());
        } catch (RuntimeException e) {
            this.loggingService.logResult(logKey, message, "处理消息发生异常", e);
            logger.error(LOG_PREFIX + "处理消息发生异常", e);
            return new NotifyBooleanAckCommand(deliverMessageCommand, ResponseStatus.EXCEPTION, "发生异常：" + e.getMessage());
        }
    }

    /**
     * Returns the rollback reason set by the subscriber, or a default text (after logging a
     * warning) when the reason is {@code null} or empty.
     */
    private String rollbackReason(MessageStatus messageStatus) {
        final String reason = messageStatus.getReason();
        if (reason != null && !"".equals(reason)) {
            return reason;
        }
        logger.warn(LOG_PREFIX + "消息经处理后，被订阅者回滚。但没有给出原因，请设置原因，方便追踪问题!!!");
        return DEFAULT_ROLLBACK_REASON;
    }

    /**
     * Sends {@code command} on {@code connection}; a failure is logged and not propagated.
     */
    private void response(Connection connection, Message message, String logKey, Command command) {
        try {
            connection.response(command);
        } catch (Exception e) {
            this.loggingService.logResult(logKey, message, "发送消息失败", e);
            logger.error(LOG_PREFIX + "发送消息失败", e);
        }
    }
}
