/*
 * Decompiled with CFR 0.152.
 */
package com.alipay.sofa.sofamq.client;

import com.alipay.sofa.sofamq.client.ClientLoggerUtil;
import com.alipay.sofa.sofamq.client.MQClientRPCHook;
import com.alipay.sofa.sofamq.client.MQUtil;
import com.alipay.sofa.sofamq.client.Metrics;
import com.alipay.sofa.sofamq.client.MultiGenericTransactionCheckerImpl;
import com.alipay.sofa.sofamq.client.MultiTransactionChecker;
import com.alipay.sofa.sofamq.client.MultiTransactionCheckerImpl;
import com.alipay.sofa.sofamq.client.SofaMQProducerAbstract;
import com.alipay.sofa.sofamq.client.exception.ConfigurationException;
import com.alipay.sofa.sofamq.client.trace.SendMessageContext;
import com.alipay.sofa.sofamq.client.trace.TraceUtils;
import com.alipay.sofa.sofamq.client.trace.common.TraceBean;
import com.alipay.sofa.sofamq.client.trace.common.TraceContext;
import com.alipay.sofa.sofamq.client.trace.common.TraceType;
import com.alipay.sofa.sofamq.client.util.DevGroupUtils;
import com.alipay.sofa.sofamq.org.shade.apache.commons.lang.StringUtils;
import com.alipay.sofa.sofamq.org.shade.apache.rocketmq.client.Validators;
import com.alipay.sofa.sofamq.org.shade.apache.rocketmq.client.impl.CommunicationMode;
import com.alipay.sofa.sofamq.org.shade.apache.rocketmq.client.impl.producer.DefaultMQProducerImpl;
import com.alipay.sofa.sofamq.org.shade.apache.rocketmq.client.producer.DefaultMQProducer;
import com.alipay.sofa.sofamq.org.shade.apache.rocketmq.client.producer.LocalTransactionState;
import com.alipay.sofa.sofamq.org.shade.apache.rocketmq.client.producer.SendResult;
import com.alipay.sofa.sofamq.org.shade.apache.rocketmq.client.producer.TransactionCheckListener;
import com.alipay.sofa.sofamq.org.shade.apache.rocketmq.client.producer.TransactionMQProducer;
import com.alipay.sofa.sofamq.org.shade.apache.rocketmq.common.message.Message;
import com.alipay.sofa.sofamq.org.shade.apache.rocketmq.common.message.MessageAccessor;
import com.alipay.sofa.sofamq.org.shade.apache.rocketmq.common.message.MessageExt;
import com.alipay.sofa.sofamq.org.shade.apache.rocketmq.common.message.MessageType;
import com.alipay.sofa.sofamq.org.shade.apache.rocketmq.common.protocol.NamespaceUtil;
import com.alipay.sofa.sofamq.org.shade.apache.rocketmq.logging.InternalLogger;
import com.alipay.sofa.sofamq.org.shade.apache.rocketmq.remoting.RPCHook;
import io.openmessaging.api.GenericMessage;
import io.openmessaging.api.TransactionalResult;
import io.openmessaging.api.exception.OMSRuntimeException;
import io.openmessaging.api.transaction.GenericLocalTransactionChecker;
import io.openmessaging.api.transaction.GenericLocalTransactionExecuter;
import io.openmessaging.api.transaction.LocalTransactionChecker;
import io.openmessaging.api.transaction.LocalTransactionExecuter;
import io.openmessaging.api.transaction.TransactionProducer;
import io.openmessaging.api.transaction.TransactionStatus;
import java.util.ArrayList;
import java.util.Properties;
import java.util.concurrent.TimeUnit;

/**
 * {@link TransactionProducer} implementation that first sends a message flagged as a
 * transactional half message ({@link #prepare}) and afterwards reports the outcome of the
 * local transaction (commit / rollback) to the broker.
 *
 * <p>A {@link LocalTransactionChecker} must be available before any message can be prepared:
 * either one checker passed to the constructor, or per-topic checkers registered through
 * {@link #registerLocalTransactionChecker} / {@link #registerGenericLocalTransactionChecker}.
 */
public class TransactionProducerImpl
extends SofaMQProducerAbstract
implements TransactionProducer {
    protected static final InternalLogger LOGGER = ClientLoggerUtil.getClientLogger();
    /** Key used both in the producer {@link Properties} and as a per-message user property. */
    private static final String CHECK_IMMUNITY_TIME_KEY = "CHECK_IMMUNITY_TIME_IN_SECONDS";
    /** Sentinel meaning "no check-immunity time configured on the producer". */
    private static final long NOOP_CHECK_IMMUNITY_TIME = -1L;
    private static final long MAX_CHECK_IMMUNITY_TIME_IN_SECONDS = TimeUnit.MINUTES.toSeconds(10L);
    private static final long MIN_CHECK_IMMUNITY_TIME_IN_SECONDS = TimeUnit.MINUTES.toSeconds(1L);
    // Assigned from LocalFirstTransactionMQProducer's constructor (see newProducer).
    // NOTE(review): presumably newProducer runs during super(...) construction; if so this field
    // must stay non-final and WITHOUT an initializer, otherwise the value would be overwritten.
    private DefaultMQProducerImpl defaultMQProducerImpl;
    private long checkImmunityTimeInSeconds;
    private LocalTransactionChecker transactionChecker;

    /**
     * Creates a producer whose transaction check-backs are all answered by one checker.
     *
     * @param properties producer configuration, see {@link #TransactionProducerImpl(Properties)}
     * @param localTransactionChecker checker consulted for every check-back
     */
    public TransactionProducerImpl(Properties properties, LocalTransactionChecker localTransactionChecker) {
        this(properties);
        this.transactionChecker = localTransactionChecker;
        this.setTransactionListener(localTransactionChecker);
    }

    /**
     * Creates a producer without a checker; checkers have to be registered per topic before
     * {@link #prepare} can be used.
     *
     * @param properties producer configuration; an optional {@code CHECK_IMMUNITY_TIME_IN_SECONDS}
     *        entry sets the producer-wide check-immunity time
     * @throws NumberFormatException if the configured check-immunity time is not a number
     * @throws IllegalArgumentException if the configured check-immunity time is out of range
     */
    public TransactionProducerImpl(Properties properties) {
        super(properties);
        this.checkImmunityTimeInSeconds = Long.parseLong(
                properties.getProperty(CHECK_IMMUNITY_TIME_KEY, Long.toString(NOOP_CHECK_IMMUNITY_TIME)));
        if (this.checkImmunityTimeInSeconds != NOOP_CHECK_IMMUNITY_TIME) {
            this.checkImmunityTime(this.checkImmunityTimeInSeconds);
        }
    }

    /**
     * Installs a check listener on the underlying producer that converts the checked message,
     * delegates to {@code localTransactionChecker} and maps its answer to a
     * {@link LocalTransactionState}.
     */
    private void setTransactionListener(final LocalTransactionChecker localTransactionChecker) {
        ((TransactionMQProducer)this.defaultMQProducer).setTransactionCheckListener(new TransactionCheckListener(){

            @Override
            public LocalTransactionState checkLocalTransactionState(MessageExt msg) {
                io.openmessaging.api.Message message = MQUtil.msgConvert(msg);
                message.setMsgID(msg.getMsgId());
                TransactionStatus check = localTransactionChecker.check(message);
                return TransactionProducerImpl.this.handleTransactionCheckResult(message, check);
            }
        });
    }

    @Override
    protected DefaultMQProducer newProducer(String group) {
        return new LocalFirstTransactionMQProducer(this.getNamespace(), group, new MQClientRPCHook(this.sessionCredentials));
    }

    /**
     * Registers a generic (schema based) checker for {@code topic}.
     *
     * <p>All validation happens before any state is changed, so a rejected registration does not
     * leave an empty multi-checker installed on the producer.
     *
     * @return always {@code true}
     * @throws IllegalStateException if a checker of another kind is already installed
     * @throws ConfigurationException if no schema validator is configured
     */
    @Override
    public synchronized <T> boolean registerGenericLocalTransactionChecker(String topic, GenericLocalTransactionChecker<T> sourceChecker) {
        if (this.transactionChecker != null && !(this.transactionChecker instanceof MultiGenericTransactionCheckerImpl)) {
            throw new IllegalStateException(String.format("transactionChecker for %s do not support generic transactionChecker", topic));
        }
        if (this.schemaValidator == null) {
            throw new ConfigurationException(String.format("schema is not enabled, topic=%s", topic));
        }
        // Raw byte[] payloads are exempt from write-schema validation.
        if (!byte[].class.equals(sourceChecker.payloadClass())) {
            this.schemaValidator.validateWriteSchema(topic, sourceChecker.payloadClass());
        }
        if (this.transactionChecker == null) {
            this.transactionChecker = new MultiGenericTransactionCheckerImpl();
            this.setTransactionListener(this.transactionChecker);
        }
        MultiGenericTransactionCheckerImpl genericCheckers = (MultiGenericTransactionCheckerImpl)this.transactionChecker;
        genericCheckers.registerGenericTransactionChecker(topic, sourceChecker, this.schemaProvider);
        return true;
    }

    /**
     * Registers a plain (non-generic) checker for {@code topic}.
     *
     * <p>Synchronized like {@link #registerGenericLocalTransactionChecker}: both methods lazily
     * initialise the same {@code transactionChecker} field.
     *
     * @return always {@code true}
     * @throws IllegalStateException if a checker of another kind is already installed
     * @throws ConfigurationException if no schema validator is configured
     */
    @Override
    public synchronized boolean registerLocalTransactionChecker(String topic, LocalTransactionChecker sourceChecker) {
        if (this.transactionChecker != null && !(this.transactionChecker instanceof MultiTransactionCheckerImpl)) {
            throw new IllegalStateException(String.format("transactionChecker for %s do not support non-generic transactionChecker", topic));
        }
        // NOTE(review): a schema validator is required here even though no schema is validated
        // for non-generic checkers - kept as in the original; confirm this is intentional.
        if (this.schemaValidator == null) {
            throw new ConfigurationException(String.format("schema is not enabled, topic=%s", topic));
        }
        if (this.transactionChecker == null) {
            this.transactionChecker = new MultiTransactionCheckerImpl();
            this.setTransactionListener(this.transactionChecker);
        }
        MultiTransactionCheckerImpl topicCheckers = (MultiTransactionCheckerImpl)this.transactionChecker;
        topicCheckers.registerTransactionChecker(topic, sourceChecker);
        return true;
    }

    /**
     * Maps a checker answer to the broker-level state; commit and rollback additionally emit a
     * trace record. Any other answer (including {@code null}) yields {@code UNKNOW}.
     */
    private LocalTransactionState handleTransactionCheckResult(io.openmessaging.api.Message message, TransactionStatus check) {
        if (TransactionStatus.CommitTransaction == check) {
            this.sendTraceData(message, true);
            return LocalTransactionState.COMMIT_MESSAGE;
        }
        if (TransactionStatus.RollbackTransaction == check) {
            this.sendTraceData(message, false);
            return LocalTransactionState.ROLLBACK_MESSAGE;
        }
        return LocalTransactionState.UNKNOW;
    }

    /**
     * Prepares {@code message}, runs {@code executor} and commits or rolls back accordingly.
     *
     * <p>An executor that is {@code null}, throws, or returns {@code null} is treated as
     * {@link TransactionStatus#Unknow}: the outcome is not reported to the broker.
     *
     * @return the prepared transactional result
     * @throws OMSRuntimeException if the executor asked for a rollback, or preparing failed
     */
    @Override
    public io.openmessaging.api.SendResult send(io.openmessaging.api.Message message, LocalTransactionExecuter executor, Object arg) {
        TransactionalResult transactionalResult = this.prepare(message);
        TransactionStatus transactionStatus = TransactionStatus.Unknow;
        if (executor != null) {
            try {
                transactionStatus = executor.execute(message, arg);
            }
            catch (Throwable e) {
                // The half message is already sent; keep the status unknown instead of failing.
                LOGGER.warn("exec LocalTransactionExecuter exception", e);
            }
        }
        return this.completeTransaction(transactionalResult, transactionStatus);
    }

    /**
     * Generic-message variant of
     * {@link #send(io.openmessaging.api.Message, LocalTransactionExecuter, Object)}; the message
     * is handed to the executor as a {@link GenericMessage}.
     *
     * @return the prepared transactional result
     * @throws OMSRuntimeException if the executor asked for a rollback, or preparing failed
     */
    @Override
    public <T> io.openmessaging.api.SendResult send(io.openmessaging.api.Message message, GenericLocalTransactionExecuter<T> executor, Object arg) {
        TransactionalResult transactionalResult = this.prepare(message);
        TransactionStatus transactionStatus = TransactionStatus.Unknow;
        if (executor != null) {
            try {
                // The cast stays inside the try block so that a non-generic message is handled
                // like any other executor failure (logged, status unknown).
                transactionStatus = executor.execute((GenericMessage)message, arg);
            }
            catch (Throwable e) {
                LOGGER.warn("exec LocalTransactionExecuter exception", e);
            }
        }
        return this.completeTransaction(transactionalResult, transactionStatus);
    }

    /**
     * Finishes a prepared transaction according to the local transaction outcome.
     *
     * <p>Uses reference comparisons rather than a {@code switch} so that a {@code null} status
     * is handled like {@code Unknow} instead of raising a {@link NullPointerException}.
     */
    private io.openmessaging.api.SendResult completeTransaction(TransactionalResult transactionalResult, TransactionStatus transactionStatus) {
        if (TransactionStatus.CommitTransaction == transactionStatus) {
            transactionalResult.commit();
            return transactionalResult;
        }
        if (TransactionStatus.RollbackTransaction == transactionStatus) {
            transactionalResult.rollback();
            throw new OMSRuntimeException("local transaction branch failed ,so transaction rollback");
        }
        // Unknown outcome: nothing is reported to the broker for this message.
        return transactionalResult;
    }

    /**
     * Sends {@code message} as a transactional half message.
     *
     * <p>A {@code CHECK_IMMUNITY_TIME_IN_SECONDS} user property on the message is validated and
     * takes precedence; otherwise the producer-wide value (if configured) is copied onto the
     * message. On success the message id of {@code message} is set to the transaction id when
     * the broker returned one, else to the broker message id.
     *
     * @return a handle used to commit or roll back the half message
     * @throws IllegalStateException if no suitable transaction checker is registered
     * @throws IllegalArgumentException if the per-message check-immunity time is out of range
     * @throws OMSRuntimeException wrapping any failure while validating or sending
     */
    @Override
    public TransactionalResult prepare(io.openmessaging.api.Message message) {
        this.checkProducerServiceState(this.defaultMQProducer.getDefaultMQProducerImpl());
        this.validateTransactionChecker(message);
        Message msgRMQ = MQUtil.msgConvert(message);
        DevGroupUtils.injectDevGroupOnDemand(msgRMQ);
        msgRMQ.setTopic(NamespaceUtil.wrapNamespace(this.defaultMQProducer.getNamespace(), msgRMQ.getTopic()));
        String txnCheckSecInProperty = msgRMQ.getUserProperty(CHECK_IMMUNITY_TIME_KEY);
        if (txnCheckSecInProperty != null) {
            this.checkImmunityTime(Long.parseLong(txnCheckSecInProperty));
        } else if (this.checkImmunityTimeInSeconds != NOOP_CHECK_IMMUNITY_TIME) {
            msgRMQ.putUserProperty(CHECK_IMMUNITY_TIME_KEY, Long.toString(this.checkImmunityTimeInSeconds));
        }
        SendMessageContext sendMessageContext = TraceUtils.producerSendStart(msgRMQ).setGroup(this.group).setMsgType(MessageType.Trans_Msg_Half);
        try {
            Validators.checkMessage(msgRMQ, this.defaultMQProducer);
            MessageAccessor.putProperty(msgRMQ, "TRAN_MSG", "true");
            MessageAccessor.putProperty(msgRMQ, "PGROUP", this.defaultMQProducer.getProducerGroup());
            SendResult sendResultRMQ = this.defaultMQProducerImpl.send(msgRMQ);
            this.checkSendResultRMQ(sendResultRMQ);
            message.setMsgID(sendResultRMQ.getMsgId());
            if (sendResultRMQ.getTransactionId() != null) {
                msgRMQ.putUserProperty("__transactionId__", sendResultRMQ.getTransactionId());
                message.setMsgID(sendResultRMQ.getTransactionId());
            }
            sendMessageContext.setSendResult(sendResultRMQ);
            return new TransactionalResultImpl(sendResultRMQ, message, msgRMQ);
        }
        catch (Exception e) {
            sendMessageContext.setException(e);
            throw new OMSRuntimeException(e);
        }
        finally {
            // Trace and metrics are reported for successful and failed sends alike.
            TraceUtils.producerSendEnd(sendMessageContext);
            Metrics.send(sendMessageContext);
        }
    }

    /**
     * Verifies that a checker able to answer check-backs for {@code message} exists.
     *
     * @throws IllegalStateException if no checker is installed, if a multi-checker has no entry
     *         for the message topic, or if a message carrying a schema id is sent while the
     *         non-generic multi-checker is installed
     */
    private void validateTransactionChecker(io.openmessaging.api.Message message) {
        LocalTransactionChecker checker = this.transactionChecker;
        if (checker == null) {
            throw new IllegalStateException("transactionChecker is not registered");
        }
        if (checker instanceof MultiGenericTransactionCheckerImpl) {
            MultiGenericTransactionCheckerImpl genericCheckers = (MultiGenericTransactionCheckerImpl)checker;
            if (!genericCheckers.isTransactionCheckerRegistered(message.getTopic())) {
                throw new IllegalStateException(String.format("transactionChecker is not registered for %s", message.getTopic()));
            }
        }
        if (checker instanceof MultiTransactionCheckerImpl) {
            if (StringUtils.isNotEmpty(message.getSystemProperties("__SCHEMA.ID"))) {
                throw new IllegalStateException("generic message can not be sent with transactionChecker is registered");
            }
            MultiTransactionCheckerImpl topicCheckers = (MultiTransactionCheckerImpl)checker;
            if (!topicCheckers.isTransactionCheckerRegistered(message.getTopic())) {
                throw new IllegalStateException(String.format("transactionChecker is not registered for %s", message.getTopic()));
            }
        }
    }

    /**
     * Appends a publish trace record for the end of a transaction. Best effort: any failure is
     * logged and never propagated to the caller.
     *
     * @param message message whose topic and id are recorded
     * @param success value stored as the trace context's success flag
     */
    protected void sendTraceData(io.openmessaging.api.Message message, boolean success) {
        try {
            TraceContext context = new TraceContext();
            context.setTraceType(TraceType.Pub);
            context.setGroupName(NamespaceUtil.withoutNamespace(this.defaultMQProducer.getProducerGroup()));
            context.setRegionId("DefaultRegion");
            context.setSuccess(success);
            TraceBean traceBean = new TraceBean();
            traceBean.setTopic(NamespaceUtil.withoutNamespace(message.getTopic()));
            traceBean.setMsgId(message.getMsgID());
            traceBean.setMsgType(MessageType.Trans_msg_Commit);
            ArrayList<TraceBean> beans = new ArrayList<TraceBean>();
            beans.add(traceBean);
            context.setTraceBeans(beans);
            this.traceDispatcher.append(context);
        }
        catch (Exception e) {
            LOGGER.error("transaction commit tracedata failed", e);
        }
    }

    /**
     * Validates a check-immunity time.
     *
     * @throws IllegalArgumentException if {@code secs} is outside the allowed range of
     *         1 to 10 minutes (inclusive)
     */
    private void checkImmunityTime(long secs) {
        if (secs > MAX_CHECK_IMMUNITY_TIME_IN_SECONDS || secs < MIN_CHECK_IMMUNITY_TIME_IN_SECONDS) {
            throw new IllegalArgumentException(String.format("CHECK_IMMUNITY_TIME_IN_SECONDS %s should between [60, 600]", secs));
        }
    }

    /**
     * Handle for one prepared half message; {@link #commit()} and {@link #rollback()} report the
     * final state to the broker through the enclosing producer.
     */
    class TransactionalResultImpl
    extends TransactionalResult {
        private final SendResult sendResultRMQ;
        private final io.openmessaging.api.Message message;
        private final Message msgRMQ;

        TransactionalResultImpl(SendResult sendResultRMQ, io.openmessaging.api.Message message, Message msgRMQ) {
            this.sendResultRMQ = sendResultRMQ;
            this.message = message;
            this.msgRMQ = msgRMQ;
            this.setMessageId(sendResultRMQ.getMsgId());
            this.setTopic(sendResultRMQ.getMessageQueue().getTopic());
        }

        @Override
        public void commit() {
            this.endTransaction(LocalTransactionState.COMMIT_MESSAGE);
        }

        @Override
        public void rollback() {
            this.endTransaction(LocalTransactionState.ROLLBACK_MESSAGE);
        }

        /**
         * Reports {@code state} to the broker and records trace and metrics for the call.
         * {@code UNKNOW} is a no-op. Failures are logged and stored on the send context but
         * are not rethrown.
         */
        private void endTransaction(LocalTransactionState state) {
            if (state == LocalTransactionState.UNKNOW) {
                return;
            }
            SendMessageContext sendMessageContext = TraceUtils.producerSendStart(this.msgRMQ)
                    .setGroup(TransactionProducerImpl.this.group)
                    .setCommunicationMode(CommunicationMode.ONEWAY)
                    .setMsgType(MessageType.Trans_msg_Commit)
                    .setTransactionState(state);
            try {
                TransactionProducerImpl.this.defaultMQProducerImpl.endTransaction(this.sendResultRMQ, state, null);
                TransactionProducerImpl.this.sendTraceData(this.message, LocalTransactionState.COMMIT_MESSAGE == state);
            }
            catch (Exception e) {
                LOGGER.warn("local transaction execute " + state + ", but end broker transaction failed", e);
                sendMessageContext.setException(e);
            }
            finally {
                TraceUtils.producerSendEnd(sendMessageContext);
                Metrics.send(sendMessageContext);
            }
        }
    }

    /**
     * {@link TransactionMQProducer} backed by a {@code LocalFirstMQProducerImpl}; it also
     * publishes that implementation to the enclosing producer's {@code defaultMQProducerImpl}
     * field.
     */
    class LocalFirstTransactionMQProducer
    extends TransactionMQProducer {
        public LocalFirstTransactionMQProducer(String namespace, String producerGroup, RPCHook rpcHook) {
            this.namespace = namespace;
            this.producerGroup = producerGroup;
            this.defaultMQProducerImpl = new SofaMQProducerAbstract.LocalFirstMQProducerImpl(TransactionProducerImpl.this, this, rpcHook);
            TransactionProducerImpl.this.defaultMQProducerImpl = this.defaultMQProducerImpl;
        }
    }
}

