/*
 * Decompiled with CFR 0.152.
 */
package com.alipay.sofa.sofamq.client;

import com.alipay.sofa.sofamq.client.ClientLoggerUtil;
import com.alipay.sofa.sofamq.client.LdcSendMessageHook;
import com.alipay.sofa.sofamq.client.MQClientRPCHook;
import com.alipay.sofa.sofamq.client.MQUtil;
import com.alipay.sofa.sofamq.client.MessageBuilderImpl;
import com.alipay.sofa.sofamq.client.Metrics;
import com.alipay.sofa.sofamq.client.SofaMQClientAbstract;
import com.alipay.sofa.sofamq.client.trace.SendMessageContext;
import com.alipay.sofa.sofamq.client.trace.TraceUtils;
import com.alipay.sofa.sofamq.client.trace.common.TraceDispatcherType;
import com.alipay.sofa.sofamq.client.trace.dispatch.NameServerAddressSetter;
import com.alipay.sofa.sofamq.client.trace.dispatch.impl.AsyncArrayDispatcher;
import com.alipay.sofa.sofamq.client.trace.hook.ClientSendMessageHookImpl;
import com.alipay.sofa.sofamq.client.util.DevGroupUtils;
import com.alipay.sofa.sofamq.org.shade.apache.commons.lang3.StringUtils;
import com.alipay.sofa.sofamq.org.shade.apache.rocketmq.client.exception.MQBrokerException;
import com.alipay.sofa.sofamq.org.shade.apache.rocketmq.client.exception.MQClientException;
import com.alipay.sofa.sofamq.org.shade.apache.rocketmq.client.impl.CommunicationMode;
import com.alipay.sofa.sofamq.org.shade.apache.rocketmq.client.impl.producer.DefaultMQProducerImpl;
import com.alipay.sofa.sofamq.org.shade.apache.rocketmq.client.impl.producer.TopicPublishInfo;
import com.alipay.sofa.sofamq.org.shade.apache.rocketmq.client.producer.DefaultMQProducer;
import com.alipay.sofa.sofamq.org.shade.apache.rocketmq.client.producer.MessageQueueSelector;
import com.alipay.sofa.sofamq.org.shade.apache.rocketmq.client.producer.SendResult;
import com.alipay.sofa.sofamq.org.shade.apache.rocketmq.common.UtilAll;
import com.alipay.sofa.sofamq.org.shade.apache.rocketmq.common.message.Message;
import com.alipay.sofa.sofamq.org.shade.apache.rocketmq.common.message.MessageClientIDSetter;
import com.alipay.sofa.sofamq.org.shade.apache.rocketmq.common.message.MessageQueue;
import com.alipay.sofa.sofamq.org.shade.apache.rocketmq.common.message.MessageType;
import com.alipay.sofa.sofamq.org.shade.apache.rocketmq.logging.InternalLogger;
import com.alipay.sofa.sofamq.org.shade.apache.rocketmq.remoting.RPCHook;
import com.alipay.sofa.sofamq.org.shade.apache.rocketmq.remoting.exception.RemotingConnectException;
import com.alipay.sofa.sofamq.org.shade.apache.rocketmq.remoting.exception.RemotingTimeoutException;
import io.openmessaging.api.MessageBuilder;
import io.openmessaging.api.OnExceptionContext;
import io.openmessaging.api.Producer;
import io.openmessaging.api.ProducerBase;
import io.openmessaging.api.SendCallback;
import io.openmessaging.api.exception.OMSRuntimeException;
import io.openmessaging.api.order.OrderProducer;
import java.util.ArrayList;
import java.util.Collections;
import java.util.LinkedList;
import java.util.List;
import java.util.Properties;
import java.util.concurrent.ExecutorService;

public class SofaMQProducerAbstract
extends SofaMQClientAbstract
implements ProducerBase,
Producer,
OrderProducer {
    /** Logger shared by this class and its subclasses, obtained from {@link ClientLoggerUtil#getClientLogger()}. */
    protected static final InternalLogger LOGGER = ClientLoggerUtil.getClientLogger();
    /** Wrapped producer that all send, start and shutdown calls delegate to; assigned in {@link #initInnerProducer(Properties)}. */
    protected DefaultMQProducer defaultMQProducer;
    /** Raw value of the {@code groupId} property; may be null or empty because the default group name is not written back here. */
    protected String group;

    /**
     * Creates the producer. The superclass processes {@code properties} first, after which the
     * wrapped producer is built and configured from the same properties.
     *
     * @param properties client configuration
     */
    public SofaMQProducerAbstract(Properties properties) {
        super(properties);
        initInnerProducer(properties);
    }

    /**
     * Builds the wrapped {@link DefaultMQProducer} and configures it from {@code properties}.
     *
     * <p>Properties read here: {@code groupId} (falls back to
     * {@code __SOFAMQ_PRODUCER_DEFAULT_GROUP} when empty), {@code isVipChannelEnabled}
     * (default {@code false}), {@code sendMsgTimeoutMills} (default 5000),
     * {@code exactlyOnceDelivery} and {@code msgTraceSwitch}. Message tracing is set up unless
     * {@code msgTraceSwitch} is present, non-blank and does not parse as {@code true}; a failure
     * while setting up tracing is logged and does not abort initialisation.
     *
     * @param properties client configuration
     */
    protected void initInnerProducer(Properties properties) {
        String configuredGroup = properties.getProperty("groupId");
        // The field keeps the raw configured value; only the inner producer sees the default.
        this.group = configuredGroup;
        String effectiveGroup = StringUtils.isEmpty(configuredGroup) ? "__SOFAMQ_PRODUCER_DEFAULT_GROUP" : configuredGroup;
        this.defaultMQProducer = this.newProducer(effectiveGroup);
        this.defaultMQProducer.setProducerGroup(effectiveGroup);

        boolean vipChannel = Boolean.parseBoolean(properties.getProperty("isVipChannelEnabled", "false"));
        this.defaultMQProducer.setVipChannelEnabled(vipChannel);

        int sendTimeout = 5000;
        if (properties.containsKey("sendMsgTimeoutMills")) {
            sendTimeout = Integer.parseInt(properties.get("sendMsgTimeoutMills").toString());
        }
        this.defaultMQProducer.setSendMsgTimeout(sendTimeout);

        if (properties.containsKey("exactlyOnceDelivery")) {
            boolean exactlyOnce = Boolean.parseBoolean(properties.get("exactlyOnceDelivery").toString());
            this.defaultMQProducer.setAddExtendUniqInfo(exactlyOnce);
        }

        this.defaultMQProducer.setInstanceName(this.buildInstanceName());
        this.defaultMQProducer.setNamesrvAddr(this.getNameServerAddr());
        this.defaultMQProducer.setMaxMessageSize(0x400000);

        String traceSwitch = properties.getProperty("msgTraceSwitch");
        // Tracing stays on when the switch is absent/blank or explicitly "true".
        boolean traceEnabled = UtilAll.isBlank(traceSwitch) || Boolean.parseBoolean(traceSwitch);
        if (traceEnabled) {
            try {
                Properties traceProps = new Properties();
                traceProps.put("ACCESS_KEY", this.sessionCredentials.getAccessKey());
                traceProps.put("SECRET_KEY", this.sessionCredentials.getSecretKey());
                traceProps.put("MAX_MSG_SIZE", "128000");
                traceProps.put("ASYNC_BUFFER_SIZE", "2048");
                traceProps.put("MAX_BATCH_NUM", "100");
                String traceInstanceName = "PID_CLIENT_INNER_TRACE_PRODUCER";
                if (!StringUtils.isBlank(this.cell)) {
                    traceInstanceName = traceInstanceName + "_" + this.cell;
                }
                traceProps.put("INSTANCE_NAME", traceInstanceName);
                traceProps.put("DispatcherType", TraceDispatcherType.PRODUCER.name());
                // The setter is queried lazily so the dispatcher always sees the current address.
                NameServerAddressSetter addressSetter = new NameServerAddressSetter(){

                    @Override
                    public String getNewNameServerAddress() {
                        return SofaMQProducerAbstract.this.getNameServerAddr();
                    }
                };
                AsyncArrayDispatcher arrayDispatcher = new AsyncArrayDispatcher(traceProps, this.sessionCredentials, addressSetter);
                arrayDispatcher.setHostProducer(this.defaultMQProducer.getDefaultMQProducerImpl());
                this.traceDispatcher = arrayDispatcher;
                this.defaultMQProducer.getDefaultMQProducerImpl().registerSendMessageHook(new ClientSendMessageHookImpl(this.traceDispatcher));
            }
            catch (Throwable e) {
                LOGGER.error("system mqtrace hook init failed ,maybe can't send message trace data.", e);
            }
        } else {
            LOGGER.info("MQ Client Disable the Trace Hook!");
        }
        this.registerSendMessageHook(properties);
    }

    /**
     * Registers an {@link LdcSendMessageHook} on the wrapped producer when the {@code ldc}
     * flag is set; does nothing otherwise.
     *
     * @param properties client configuration handed to the hook
     */
    protected void registerSendMessageHook(Properties properties) {
        if (!this.ldc) {
            return;
        }
        DefaultMQProducerImpl producerImpl = this.defaultMQProducer.getDefaultMQProducerImpl();
        producerImpl.registerSendMessageHook(new LdcSendMessageHook(properties));
    }

    /**
     * Factory for the wrapped producer. This implementation returns a
     * {@link LocalFirstMQProducer} bound to this client's namespace and session credentials.
     *
     * @param group producer group name
     * @return a new, not yet started producer
     */
    protected DefaultMQProducer newProducer(String group) {
        String namespace = this.getNamespace();
        RPCHook credentialsHook = new MQClientRPCHook(this.sessionCredentials);
        return new LocalFirstMQProducer(namespace, group, credentialsHook);
    }

    /**
     * Starts the wrapped producer and then the superclass, at most once per
     * start/shutdown cycle; repeated calls while already started are no-ops.
     *
     * @throws OMSRuntimeException if start-up fails; the original exception is the cause
     */
    @Override
    public void start() {
        try {
            if (!this.started.compareAndSet(false, true)) {
                return;
            }
            this.defaultMQProducer.start();
            super.start();
            LOGGER.info("start producer[{}] with [props:{}]", (Object)this.group, (Object)this.properties);
        }
        catch (Exception e) {
            // NOTE(review): the started flag was already flipped to true above and is not
            // reset here, so a later start() call is skipped - confirm this is intended.
            String failure = String.format("start producer with [props:%s] fail, errorMsg=%s", this.properties, e.getMessage());
            LOGGER.error(failure, e);
            throw new OMSRuntimeException(failure, (Throwable)e);
        }
    }

    /**
     * Shuts down the wrapped producer if it was started, then always delegates to the
     * superclass shutdown.
     */
    @Override
    public void shutdown() {
        boolean wasStarted = this.started.compareAndSet(true, false);
        if (wasStarted) {
            this.defaultMQProducer.shutdown();
            LOGGER.info("shutdown producer[{}] with [props:{}]", (Object)this.group, (Object)this.properties);
        }
        super.shutdown();
    }

    /**
     * Applies a new name server address list to the wrapped producer's configuration and to
     * the client API instance reached through its client factory.
     *
     * @param newAddrs the new name server address list
     */
    @Override
    protected void updateNameServerAddr(String newAddrs) {
        this.defaultMQProducer.setNamesrvAddr(newAddrs);
        DefaultMQProducerImpl producerImpl = this.defaultMQProducer.getDefaultMQProducerImpl();
        producerImpl.getmQClientFactory().getMQClientAPIImpl().updateNameServerAddressList(newAddrs);
    }

    /**
     * Translates a failure raised while sending into an {@link OMSRuntimeException}.
     *
     * <p>An {@link OMSRuntimeException} is handed back unchanged. It is returned rather than
     * thrown so that callers which need the value (the asynchronous send callback) still
     * receive it; callers that write {@code throw checkProducerException(...)} observe the
     * same exception either way. An {@link MQClientException} is mapped to a descriptive
     * message based on its cause (connect failure, timeout, broker error) or, when it has no
     * cause, on its response code ({@code -1} and {@code 13} are recognised). Anything else
     * is wrapped with a generic message.
     *
     * <p>Every exception created here carries {@code e} as its cause so the original stack
     * trace is not lost.
     *
     * @param topic topic of the message that failed
     * @param msgId id of the message that failed; may be null if none was assigned yet
     * @param e     the failure to translate
     * @return the exception to report to the caller; never null
     */
    protected OMSRuntimeException checkProducerException(String topic, String msgId, Throwable e) {
        if (e instanceof OMSRuntimeException) {
            return (OMSRuntimeException)e;
        }
        if (e instanceof MQClientException) {
            Throwable cause = e.getCause();
            if (cause instanceof RemotingConnectException) {
                return new OMSRuntimeException(String.format("Connect broker failed, Topic=%s, msgId=%s. [props:%s]", topic, msgId, this.properties), e);
            }
            if (cause instanceof RemotingTimeoutException) {
                return new OMSRuntimeException(String.format("Send message to broker timeout, %dms, Topic=%s, msgId=%s. [props:%s]", this.defaultMQProducer.getSendMsgTimeout(), topic, msgId, this.properties), e);
            }
            if (cause instanceof MQBrokerException) {
                MQBrokerException brokerEx = (MQBrokerException)cause;
                return new OMSRuntimeException(String.format("Receive a broker exception, Topic=%s, msgId=%s, %s. [props:%s]", topic, msgId, brokerEx.getErrorMessage(), this.properties), e);
            }
            if (cause == null) {
                // Response codes are only consulted when the client exception has no cause.
                int responseCode = ((MQClientException)e).getResponseCode();
                if (-1 == responseCode) {
                    return new OMSRuntimeException(String.format("Topic does not exist, Topic=%s, msgId=%s. [props:%s]", topic, msgId, this.properties), e);
                }
                if (13 == responseCode) {
                    return new OMSRuntimeException(String.format("ONS Client check message exception, Topic=%s, msgId=%s. [props:%s]", topic, msgId, this.properties), e);
                }
            }
        }
        return new OMSRuntimeException("defaultMQProducer send exception", e);
    }

    /**
     * Creates a message builder. When a schema provider is configured the builder is given
     * this client's schema validator; otherwise a plain builder is returned.
     *
     * @param <T> payload type of the messages to build
     * @return a new message builder
     */
    @Override
    public <T> MessageBuilder<T> messageBuilder() {
        if (this.schemaProvider == null) {
            return new MessageBuilderImpl();
        }
        return new MessageBuilderImpl(this.schemaValidator);
    }

    /**
     * Sends a message synchronously through the wrapped producer.
     *
     * <p>On success the id reported by the wrapped producer is written back to
     * {@code message}. Trace and metrics are finished in every case.
     *
     * @param message the message to send
     * @return the topic and message id reported by the wrapped producer
     * @throws OMSRuntimeException if the send fails, as translated by
     *         {@link #checkProducerException(String, String, Throwable)}
     */
    @Override
    public io.openmessaging.api.SendResult send(io.openmessaging.api.Message message) {
        this.checkProducerServiceState(this.defaultMQProducer.getDefaultMQProducerImpl());
        Message rmqMessage = MQUtil.msgConvert(message);
        DevGroupUtils.injectDevGroupOnDemand(rmqMessage);
        SendMessageContext traceContext = TraceUtils.producerSendStart(rmqMessage).setGroup(this.group);
        try {
            SendResult rmqResult = this.defaultMQProducer.send(rmqMessage);
            this.checkSendResultRMQ(rmqResult);
            traceContext.setSendResult(rmqResult);
            message.setMsgID(rmqResult.getMsgId());
            return this.sendResultConvert(rmqResult);
        }
        catch (Exception e) {
            traceContext.setException(e);
            LOGGER.error(String.format("Send message Exception, %s", message), e);
            throw this.checkProducerException(message.getTopic(), message.getMsgID(), e);
        }
        finally {
            TraceUtils.producerSendEnd(traceContext);
            Metrics.send(traceContext);
        }
    }

    /**
     * Sends a message in one-way mode through the wrapped producer.
     *
     * <p>After the call returns, the unique id carried by the converted message is written
     * back to {@code message}. Trace and metrics are finished in every case.
     *
     * @param message the message to send
     * @throws OMSRuntimeException if the send fails, as translated by
     *         {@link #checkProducerException(String, String, Throwable)}
     */
    @Override
    public void sendOneway(io.openmessaging.api.Message message) {
        this.checkProducerServiceState(this.defaultMQProducer.getDefaultMQProducerImpl());
        Message rmqMessage = MQUtil.msgConvert(message);
        DevGroupUtils.injectDevGroupOnDemand(rmqMessage);
        SendMessageContext traceContext = TraceUtils.producerSendStart(rmqMessage).setGroup(this.group).setCommunicationMode(CommunicationMode.ONEWAY);
        try {
            this.defaultMQProducer.sendOneway(rmqMessage);
            String uniqueId = MessageClientIDSetter.getUniqID(rmqMessage);
            message.setMsgID(uniqueId);
        }
        catch (Exception e) {
            traceContext.setException(e);
            LOGGER.error(String.format("Send message oneway Exception, %s", message), e);
            throw this.checkProducerException(message.getTopic(), message.getMsgID(), e);
        }
        finally {
            TraceUtils.producerSendEnd(traceContext);
            Metrics.send(traceContext);
        }
    }

    /**
     * Hands a message to the wrapped producer for asynchronous sending.
     *
     * <p>When the hand-off succeeds, trace and metrics are finished by the converted callback;
     * when the hand-off itself throws, they are finished here before the exception is
     * rethrown.
     *
     * @param message      the message to send
     * @param sendCallback receives the outcome of the send
     * @throws OMSRuntimeException if the message cannot be handed to the wrapped producer
     */
    @Override
    public void sendAsync(io.openmessaging.api.Message message, SendCallback sendCallback) {
        this.checkProducerServiceState(this.defaultMQProducer.getDefaultMQProducerImpl());
        Message rmqMessage = MQUtil.msgConvert(message);
        DevGroupUtils.injectDevGroupOnDemand(rmqMessage);
        SendMessageContext traceContext = TraceUtils.producerSendStart(rmqMessage).setGroup(this.group).setCommunicationMode(CommunicationMode.ASYNC);
        try {
            com.alipay.sofa.sofamq.org.shade.apache.rocketmq.client.producer.SendCallback rmqCallback = this.sendCallbackConvert(message, sendCallback, traceContext);
            this.defaultMQProducer.send(rmqMessage, rmqCallback);
            message.setMsgID(MessageClientIDSetter.getUniqID(rmqMessage));
        }
        catch (Exception e) {
            SendMessageContext failedContext = traceContext.setException(e);
            TraceUtils.producerSendEnd(failedContext);
            Metrics.send(traceContext);
            LOGGER.error(String.format("Send message async Exception, %s", message), e);
            throw this.checkProducerException(message.getTopic(), message.getMsgID(), e);
        }
    }

    /**
     * Forwards the given executor to the wrapped producer's
     * {@code setCallbackExecutor}.
     *
     * @param callbackExecutor executor to hand to the wrapped producer
     */
    @Override
    public void setCallbackExecutor(ExecutorService callbackExecutor) {
        DefaultMQProducer producer = this.defaultMQProducer;
        producer.setCallbackExecutor(callbackExecutor);
    }

    /**
     * Adapts a user-facing {@link SendCallback} to the callback type expected by the wrapped
     * producer, finishing trace and metrics for the send exactly once.
     *
     * <p>A result that fails {@code checkSendResultRMQ} is reported through
     * {@code sendCallback.onException}. Exceptions thrown by the user's own
     * {@code onSuccess} are not treated as send failures: they propagate to the invoking
     * thread and do not cause trace or metrics to be recorded a second time.
     *
     * @param message            the message being sent; supplies topic and id for error reports
     * @param sendCallback       the user callback to notify
     * @param sendMessageContext trace context of this send
     * @return the adapted callback
     */
    private com.alipay.sofa.sofamq.org.shade.apache.rocketmq.client.producer.SendCallback sendCallbackConvert(final io.openmessaging.api.Message message, final SendCallback sendCallback, final SendMessageContext sendMessageContext) {
        return new com.alipay.sofa.sofamq.org.shade.apache.rocketmq.client.producer.SendCallback(){

            @Override
            public void onSuccess(SendResult sendResult) {
                // Only the result validation is guarded; user code runs outside the try so its
                // exceptions are never re-reported as a failed send.
                try {
                    SofaMQProducerAbstract.this.checkSendResultRMQ(sendResult);
                }
                catch (OMSRuntimeException e) {
                    this.onException(e);
                    return;
                }
                TraceUtils.producerSendEnd(sendMessageContext.setSendResult(sendResult));
                Metrics.send(sendMessageContext);
                sendCallback.onSuccess(SofaMQProducerAbstract.this.sendResultConvert(sendResult));
            }

            @Override
            public void onException(Throwable e) {
                TraceUtils.producerSendEnd(sendMessageContext.setException(e));
                Metrics.send(sendMessageContext);
                String topic = message.getTopic();
                String msgId = message.getMsgID();
                OMSRuntimeException onsEx;
                try {
                    onsEx = SofaMQProducerAbstract.this.checkProducerException(topic, msgId, e);
                }
                catch (OMSRuntimeException rethrown) {
                    // The translation may throw instead of returning; the user callback must
                    // still be told about the failure.
                    onsEx = rethrown;
                }
                OnExceptionContext context = new OnExceptionContext();
                context.setTopic(topic);
                context.setMessageId(msgId);
                context.setException(onsEx);
                sendCallback.onException(context);
            }
        };
    }

    /**
     * Copies the queue's topic and the message id from the wrapped producer's result into a
     * new user-facing result.
     *
     * @param rmqSendResult result returned by the wrapped producer
     * @return the converted result
     */
    private io.openmessaging.api.SendResult sendResultConvert(SendResult rmqSendResult) {
        io.openmessaging.api.SendResult converted = new io.openmessaging.api.SendResult();
        MessageQueue queue = rmqSendResult.getMessageQueue();
        converted.setTopic(queue.getTopic());
        converted.setMessageId(rmqSendResult.getMsgId());
        return converted;
    }

    /**
     * Sends an ordered message synchronously. The target queue is chosen from the hash code
     * of {@code shardingKey}, so equal keys map to the same index for a given queue count.
     *
     * <p>The sharding key is also stored on {@code message}, and on success the id reported
     * by the wrapped producer is written back to it. Trace and metrics are finished in every
     * case.
     *
     * @param message     the message to send
     * @param shardingKey non-blank key that determines the queue
     * @return the message's topic and the id reported by the wrapped producer
     * @throws OMSRuntimeException if {@code shardingKey} is blank or the send fails
     */
    @Override
    public io.openmessaging.api.SendResult send(io.openmessaging.api.Message message, String shardingKey) {
        if (UtilAll.isBlank(shardingKey)) {
            throw new OMSRuntimeException("'shardingKey' is blank.");
        }
        message.setShardingKey(shardingKey);
        this.checkProducerServiceState(this.defaultMQProducer.getDefaultMQProducerImpl());
        Message rmqMessage = MQUtil.msgConvert(message);
        DevGroupUtils.injectDevGroupOnDemand(rmqMessage);
        SendMessageContext traceContext = TraceUtils.producerSendStart(rmqMessage).setGroup(this.group).setMsgType(MessageType.Order_Msg);
        try {
            MessageQueueSelector hashSelector = new MessageQueueSelector(){

                @Override
                public MessageQueue select(List<MessageQueue> mqs, Message msg, Object arg) {
                    int hash = Math.abs(arg.hashCode());
                    // Math.abs(Integer.MIN_VALUE) is still negative; clamp that one case to 0.
                    int index = hash < 0 ? 0 : hash;
                    return mqs.get(index % mqs.size());
                }
            };
            SendResult rmqResult = this.defaultMQProducer.send(rmqMessage, hashSelector, shardingKey);
            this.checkSendResultRMQ(rmqResult);
            traceContext.setSendResult(rmqResult);
            message.setMsgID(rmqResult.getMsgId());
            io.openmessaging.api.SendResult converted = new io.openmessaging.api.SendResult();
            converted.setTopic(message.getTopic());
            converted.setMessageId(rmqResult.getMsgId());
            return converted;
        }
        catch (Exception e) {
            traceContext.setException(e);
            throw new OMSRuntimeException("defaultMQProducer send order exception", (Throwable)e);
        }
        finally {
            TraceUtils.producerSendEnd(traceContext);
            Metrics.send(traceContext);
        }
    }

    /**
     * Copy of a {@link TopicPublishInfo} whose queue list is narrowed to the queues of one
     * data center, identified by broker names that start with {@code dataCenter + "@"}.
     * When no queue matches, the full queue list of the source is used instead.
     */
    static class LocalFirstTopicPublishInfo
    extends TopicPublishInfo {
        /**
         * @param topicPublishInfo source publish info to copy settings and queues from
         * @param dataCenter       data center name used to build the broker-name prefix
         */
        public LocalFirstTopicPublishInfo(TopicPublishInfo topicPublishInfo, String dataCenter) {
            this.setOrderTopic(topicPublishInfo.isOrderTopic());
            this.setHaveTopicRouterInfo(topicPublishInfo.isHaveTopicRouterInfo());
            this.setSendWhichQueue(topicPublishInfo.getSendWhichQueue());
            this.setTopicRouteData(topicPublishInfo.getTopicRouteData());
            // Read the source list once so the filter and the fallback see the same queues.
            List<MessageQueue> allQueues = topicPublishInfo.getMessageQueueList();
            // The prefix does not change per queue, so build it once outside the loop.
            String localPrefix = dataCenter + "@";
            // ArrayList gives constant-time get(int), unlike LinkedList.
            // NOTE(review): presumably consumers of this list pick queues by index - confirm.
            List<MessageQueue> localQueues = new ArrayList<MessageQueue>(allQueues.size());
            for (MessageQueue queue : allQueues) {
                if (queue.getBrokerName().startsWith(localPrefix)) {
                    localQueues.add(queue);
                }
            }
            if (localQueues.isEmpty()) {
                localQueues.addAll(allQueues);
            }
            this.setMessageQueueList(Collections.unmodifiableList(localQueues));
        }
    }

    /**
     * Producer implementation that wraps every publish info it finds in a
     * {@link LocalFirstTopicPublishInfo} built for the enclosing client's data center.
     */
    class LocalFirstMQProducerImpl
    extends DefaultMQProducerImpl {
        public LocalFirstMQProducerImpl(DefaultMQProducer defaultMQProducer, RPCHook rpcHook) {
            super(defaultMQProducer, rpcHook);
        }

        /**
         * Looks up the publish info through the superclass and, when one is found, returns a
         * data-center-filtered copy of it.
         *
         * @param topic topic to look up
         * @return the filtered publish info, or null when the superclass finds none
         */
        @Override
        protected TopicPublishInfo tryToFindTopicPublishInfo(String topic) {
            TopicPublishInfo found = super.tryToFindTopicPublishInfo(topic);
            return found == null ? null : new LocalFirstTopicPublishInfo(found, SofaMQProducerAbstract.this.dataCenter);
        }
    }

    /**
     * {@link DefaultMQProducer} whose implementation object is a
     * {@link LocalFirstMQProducerImpl} tied to the enclosing client.
     */
    class LocalFirstMQProducer
    extends DefaultMQProducer {
        /**
         * @param namespace     namespace assigned to this producer
         * @param producerGroup producer group name
         * @param rpcHook       hook passed on to the implementation object
         */
        public LocalFirstMQProducer(String namespace, String producerGroup, RPCHook rpcHook) {
            this.namespace = namespace;
            this.producerGroup = producerGroup;
            // Created after namespace and group are assigned, since it receives this producer.
            LocalFirstMQProducerImpl localFirstImpl = new LocalFirstMQProducerImpl(this, rpcHook);
            this.defaultMQProducerImpl = localFirstImpl;
        }
    }
}

