/*
 * Decompiled with CFR 0.152.
 */
package com.alipay.sofa.sofamq.client.trace.dispatch.impl;

import com.alipay.sofa.sofamq.auth.ClientRPCHook;
import com.alipay.sofa.sofamq.auth.authority.SessionCredentials;
import com.alipay.sofa.sofamq.client.trace.common.TraceContext;
import com.alipay.sofa.sofamq.client.trace.common.TraceDataEncoder;
import com.alipay.sofa.sofamq.client.trace.common.TraceTransferBean;
import com.alipay.sofa.sofamq.client.trace.dispatch.NameServerAddressSetter;
import com.alipay.sofa.sofamq.org.shade.apache.rocketmq.client.common.ThreadLocalIndex;
import com.alipay.sofa.sofamq.org.shade.apache.rocketmq.client.exception.MQClientException;
import com.alipay.sofa.sofamq.org.shade.apache.rocketmq.client.impl.consumer.DefaultMQPullConsumerImpl;
import com.alipay.sofa.sofamq.org.shade.apache.rocketmq.client.impl.consumer.DefaultMQPushConsumerImpl;
import com.alipay.sofa.sofamq.org.shade.apache.rocketmq.client.impl.producer.DefaultMQProducerImpl;
import com.alipay.sofa.sofamq.org.shade.apache.rocketmq.client.impl.producer.TopicPublishInfo;
import com.alipay.sofa.sofamq.org.shade.apache.rocketmq.client.log.ClientLogger;
import com.alipay.sofa.sofamq.org.shade.apache.rocketmq.client.producer.DefaultMQProducer;
import com.alipay.sofa.sofamq.org.shade.apache.rocketmq.client.producer.MessageQueueSelector;
import com.alipay.sofa.sofamq.org.shade.apache.rocketmq.client.producer.SendCallback;
import com.alipay.sofa.sofamq.org.shade.apache.rocketmq.client.producer.SendResult;
import com.alipay.sofa.sofamq.org.shade.apache.rocketmq.common.ThreadFactoryImpl;
import com.alipay.sofa.sofamq.org.shade.apache.rocketmq.common.message.Message;
import com.alipay.sofa.sofamq.org.shade.apache.rocketmq.common.message.MessageQueue;
import com.alipay.sofa.sofamq.org.shade.apache.rocketmq.common.protocol.NamespaceUtil;
import com.alipay.sofa.sofamq.org.shade.apache.rocketmq.logging.InternalLogger;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.Set;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;

public class InnerTraceProducer {
    /** Client logger obtained from {@link ClientLogger#getLog()}. */
    private static final InternalLogger CLIENT_LOG = ClientLogger.getLog();
    /** Class-wide counter that supplies a distinct {@link #instanceId} to every instance. */
    private static final AtomicInteger instanceNum = new AtomicInteger(0);
    /** Sequence number of this instance; appended to the names of the threads it creates. */
    private final int instanceId = instanceNum.getAndIncrement();
    /** Capacity of {@link #appenderQueue}: the ASYNC_BUFFER_SIZE property rounded up to a power of two. */
    private final int queueSize;
    /** Upper bound on the number of contexts taken from {@link #traceContextQueue} per flush pass (MAX_BATCH_NUM property). */
    private final int batchSize;
    /** Core and maximum size of {@link #traceExecuter}: the processor count, but never fewer than 8. */
    private final int threadNum = Math.max(8, Runtime.getRuntime().availableProcessors());
    /** Producer through which encoded trace data is sent. */
    private final DefaultMQProducer traceProducer;
    /** Pool that executes the {@code AsyncAppenderRequest} tasks; backed by {@link #appenderQueue}. */
    private final ThreadPoolExecutor traceExecuter;
    /** Single-threaded scheduler used by {@code start()} to refresh the name server address periodically. */
    private final ScheduledExecutorService scheduledExecutorService;
    /** Number of contexts rejected by {@code append} because {@link #traceContextQueue} was full. */
    private AtomicLong discardCount;
    /** Thread running {@code AsyncRunnable}; created in {@code start()}. */
    private Thread worker;
    /** Bounded buffer (capacity 1024) of contexts waiting to be batched. */
    private ArrayBlockingQueue<TraceContext> traceContextQueue;
    /** Work queue handed to {@link #traceExecuter}. */
    private ArrayBlockingQueue<Runnable> appenderQueue;
    /** JVM shutdown hook created by {@code registerShutDownHook()}; {@code null} until then. */
    private volatile Thread shutDownHook;
    /** Set to {@code true} by {@code shutdown()}; read by {@code AsyncRunnable} to end its loop. */
    private volatile boolean stopped = false;
    /** Counter used to rotate over the candidate queues when a send targets specific brokers. */
    private volatile ThreadLocalIndex sendWhichQueue = new ThreadLocalIndex();
    /** Source of the current name server address. */
    private NameServerAddressSetter nameserverAddressSetter;
    /** Address most recently applied by the periodic refresh task; not assigned anywhere else in this class. */
    private String traceNamesrvAddr;
    /** Instance name -> (dispatcher id -> producer). */
    private static Map<String, Map<String, InnerTraceProducer>> dispatcherTable = new ConcurrentHashMap<String, Map<String, InnerTraceProducer>>();
    /** Shared (singleton) producers keyed by the INSTANCE_NAME property they were created with. */
    private static Map<String, InnerTraceProducer> innerTraceProducers = new ConcurrentHashMap<String, InnerTraceProducer>();
    /** Instance name -> flag recording whether the shared producer has been started. */
    private static Map<String, AtomicBoolean> isStarted = new ConcurrentHashMap<String, AtomicBoolean>();

    /**
     * Obtains a trace producer for the supplied configuration.
     *
     * <p>When the {@code TraceProducerSingleton} property is {@code false} a new,
     * untracked producer is returned on every call. Otherwise (the default) one
     * producer per {@code INSTANCE_NAME} property value is created on first use,
     * together with its dispatcher table and started flag, and reused afterwards.
     * Creation and lookup are serialized on the class monitor.
     *
     * @param properties              client configuration
     * @param nameserverAddressSetter supplier of the name server address
     * @return the new or shared producer
     */
    public static InnerTraceProducer getTraceDispatcherProducer(Properties properties, NameServerAddressSetter nameserverAddressSetter) {
        String singletonFlag = properties.getProperty("TraceProducerSingleton", "true");
        if (!Boolean.parseBoolean(singletonFlag)) {
            return new InnerTraceProducer(properties, nameserverAddressSetter);
        }
        synchronized (InnerTraceProducer.class) {
            String instanceName = properties.getProperty("INSTANCE_NAME");
            if (!innerTraceProducers.containsKey(instanceName)) {
                // First request for this instance name: create the producer and its bookkeeping.
                innerTraceProducers.put(instanceName, new InnerTraceProducer(properties, nameserverAddressSetter));
                dispatcherTable.put(instanceName, new ConcurrentHashMap<String, InnerTraceProducer>());
                isStarted.put(instanceName, new AtomicBoolean());
            }
            return innerTraceProducers.get(instanceName);
        }
    }

    /**
     * Obtains a trace producer that authenticates with the given credentials.
     *
     * <p>When the {@code TraceProducerSingleton} property is {@code false} a new,
     * untracked producer is returned on every call. Otherwise (the default) one
     * producer per {@code INSTANCE_NAME} property value is created on first use,
     * together with its dispatcher table and started flag, and reused afterwards.
     * Creation and lookup are serialized on the class monitor.
     *
     * @param properties              client configuration
     * @param sessionCredentials      credentials handed to the producer's RPC hook
     * @param nameserverAddressSetter supplier of the name server address
     * @return the new or shared producer
     */
    public static InnerTraceProducer getTraceDispatcherProducer(Properties properties, SessionCredentials sessionCredentials, NameServerAddressSetter nameserverAddressSetter) {
        String singletonFlag = properties.getProperty("TraceProducerSingleton", "true");
        if (!Boolean.parseBoolean(singletonFlag)) {
            return new InnerTraceProducer(properties, sessionCredentials, nameserverAddressSetter);
        }
        synchronized (InnerTraceProducer.class) {
            String instanceName = properties.getProperty("INSTANCE_NAME");
            if (!innerTraceProducers.containsKey(instanceName)) {
                // First request for this instance name: create the producer and its bookkeeping.
                innerTraceProducers.put(instanceName, new InnerTraceProducer(properties, sessionCredentials, nameserverAddressSetter));
                dispatcherTable.put(instanceName, new ConcurrentHashMap<String, InnerTraceProducer>());
                isStarted.put(instanceName, new AtomicBoolean());
            }
            return innerTraceProducers.get(instanceName);
        }
    }

    /**
     * Registers a dispatcher against {@code producer} and starts the producer when required.
     *
     * <p>A non-singleton producer is started on every registration. It is recorded in the
     * dispatcher table only when a table already exists for its instance name, because
     * producers created directly through a constructor have no table of their own.
     *
     * <p>A singleton producer is always recorded; its table and started flag are created
     * here if they are missing. The started flag ensures {@code start()} runs only for the
     * first registration, however many dispatchers share the producer.
     *
     * @param dispatcherId id of the dispatcher that will use the producer
     * @param producer     producer to register; {@code null} is ignored
     * @param isSingleton  whether the producer is shared between dispatchers
     * @throws MQClientException if starting the producer fails
     */
    public static void registerTraceDispatcher(String dispatcherId, InnerTraceProducer producer, boolean isSingleton) throws MQClientException {
        // Check for null before the first dereference.
        if (producer == null) {
            return;
        }
        String instanceName = producer.getInstanceName();
        if (!isSingleton) {
            Map<String, InnerTraceProducer> existing = dispatcherTable.get(instanceName);
            if (existing != null) {
                existing.put(dispatcherId, producer);
            }
            producer.start();
            return;
        }
        Map<String, InnerTraceProducer> dispatchers;
        AtomicBoolean started;
        // Same monitor as the factory methods, so lazy creation cannot race with them.
        synchronized (InnerTraceProducer.class) {
            dispatchers = dispatcherTable.get(instanceName);
            if (dispatchers == null) {
                dispatchers = new ConcurrentHashMap<String, InnerTraceProducer>();
                dispatcherTable.put(instanceName, dispatchers);
            }
            started = isStarted.get(instanceName);
            if (started == null) {
                started = new AtomicBoolean();
                isStarted.put(instanceName, started);
            }
        }
        dispatchers.put(dispatcherId, producer);
        if (started.compareAndSet(false, true)) {
            producer.start();
        }
    }

    /**
     * Removes a dispatcher registration and shuts the producer down when it is no longer needed.
     *
     * <p>The dispatcher is removed from the table belonging to the producer's instance
     * name. A non-singleton producer is then shut down unconditionally. A singleton
     * producer is shut down only when no dispatcher remains registered under its instance
     * name and it is currently marked as started; in that case it is also dropped from the
     * singleton cache so that a later request creates a fresh producer.
     *
     * @param dispatcherId id of the dispatcher being removed
     * @param producer     producer the dispatcher was registered with; {@code null} is ignored
     * @param isSingleton  whether the producer is shared between dispatchers
     */
    public static void unregisterTraceDispatcher(String dispatcherId, InnerTraceProducer producer, boolean isSingleton) {
        if (producer == null) {
            return;
        }
        String instanceName = producer.getInstanceName();
        // The outer table is keyed by instance name; dispatcher ids live in the nested map.
        Map<String, InnerTraceProducer> dispatchers = dispatcherTable.get(instanceName);
        if (dispatchers != null) {
            dispatchers.remove(dispatcherId);
        }
        if (!isSingleton) {
            producer.shutdown();
            return;
        }
        boolean noDispatcherLeft = dispatchers == null || dispatchers.isEmpty();
        AtomicBoolean started = isStarted.get(instanceName);
        // compareAndSet lets exactly one caller perform the shutdown.
        if (noDispatcherLeft && started != null && started.compareAndSet(true, false)) {
            innerTraceProducers.remove(instanceName);
            producer.shutdown();
        }
    }

    /**
     * Offers a trace context to the in-memory buffer without blocking.
     *
     * @param ctx context to enqueue
     * @return {@code true} if the context was accepted; {@code false} if the buffer was
     *         full, in which case the discard counter is incremented and the drop is logged
     */
    public boolean append(TraceContext ctx) {
        if (this.traceContextQueue.offer(ctx)) {
            return true;
        }
        long discarded = this.discardCount.incrementAndGet();
        CLIENT_LOG.info("buffer full" + discarded + " ,context is " + ctx);
        return false;
    }

    /**
     * Builds a producer whose credentials are read from the ACCESS_KEY / SECRET_KEY properties.
     *
     * <p>Allocates the context buffer, the send pool with its work queue, and the name
     * server refresh scheduler; no thread is started until {@code start()} is called.
     *
     * @param properties              client configuration (ASYNC_BUFFER_SIZE, MAX_BATCH_NUM, ...)
     * @param nameserverAddressSetter supplier of the name server address
     */
    public InnerTraceProducer(Properties properties, NameServerAddressSetter nameserverAddressSetter) {
        int requestedSize = Integer.parseInt(properties.getProperty("ASYNC_BUFFER_SIZE", "2048"));
        // Round the requested size up to the next power of two.
        int roundedSize = 1 << (32 - Integer.numberOfLeadingZeros(requestedSize - 1));
        this.queueSize = roundedSize;
        this.batchSize = Integer.parseInt(properties.getProperty("MAX_BATCH_NUM", "1"));
        this.discardCount = new AtomicLong(0L);
        this.traceContextQueue = new ArrayBlockingQueue<TraceContext>(1024);
        this.appenderQueue = new ArrayBlockingQueue<Runnable>(roundedSize);
        String sendThreadPrefix = "MQTraceSendThread_" + this.instanceId + "_";
        this.traceExecuter = new ThreadPoolExecutor(this.threadNum, this.threadNum, 60000L, TimeUnit.MILLISECONDS, this.appenderQueue, new ThreadFactoryImpl(sendThreadPrefix));
        this.scheduledExecutorService = new ScheduledThreadPoolExecutor(1, new ThreadFactory() {

            @Override
            public Thread newThread(Runnable task) {
                String threadName = "OnsTrace-UpdateNameServerThread" + InnerTraceProducer.this.instanceId;
                return new Thread(task, threadName);
            }
        });
        this.nameserverAddressSetter = nameserverAddressSetter;
        this.traceProducer = this.createProducer(properties, nameserverAddressSetter);
    }

    /**
     * Builds a producer that authenticates with caller-supplied credentials.
     *
     * <p>Allocates the context buffer, the send pool with its work queue, and the name
     * server refresh scheduler; no thread is started until {@code start()} is called.
     *
     * @param properties              client configuration (ASYNC_BUFFER_SIZE, MAX_BATCH_NUM, ...)
     * @param sessionCredentials      credentials handed to the producer's RPC hook
     * @param nameserverAddressSetter supplier of the name server address
     */
    public InnerTraceProducer(Properties properties, SessionCredentials sessionCredentials, NameServerAddressSetter nameserverAddressSetter) {
        int requestedSize = Integer.parseInt(properties.getProperty("ASYNC_BUFFER_SIZE", "2048"));
        // Round the requested size up to the next power of two.
        int roundedSize = 1 << (32 - Integer.numberOfLeadingZeros(requestedSize - 1));
        this.queueSize = roundedSize;
        this.batchSize = Integer.parseInt(properties.getProperty("MAX_BATCH_NUM", "1"));
        this.discardCount = new AtomicLong(0L);
        this.traceContextQueue = new ArrayBlockingQueue<TraceContext>(1024);
        this.appenderQueue = new ArrayBlockingQueue<Runnable>(roundedSize);
        String sendThreadPrefix = "MQTraceSendThread_" + this.instanceId + "_";
        this.traceExecuter = new ThreadPoolExecutor(this.threadNum, this.threadNum, 60000L, TimeUnit.MILLISECONDS, this.appenderQueue, new ThreadFactoryImpl(sendThreadPrefix));
        this.scheduledExecutorService = new ScheduledThreadPoolExecutor(1, new ThreadFactory() {

            @Override
            public Thread newThread(Runnable task) {
                String threadName = "OnsTrace-UpdateNameServerThread" + InnerTraceProducer.this.instanceId;
                return new Thread(task, threadName);
            }
        });
        this.nameserverAddressSetter = nameserverAddressSetter;
        this.traceProducer = this.createProducer(properties, sessionCredentials, nameserverAddressSetter);
    }

    /**
     * Returns the instance name configured on the underlying producer.
     *
     * @return the producer's instance name
     */
    public String getInstanceName() {
        final DefaultMQProducer delegate = this.traceProducer;
        return delegate.getInstanceName();
    }

    /**
     * Creates the underlying producer using credentials taken from the ACCESS_KEY and
     * SECRET_KEY properties.
     *
     * <p>The credentials are wrapped in a {@link SessionCredentials} and the rest of the
     * configuration is delegated to the credentials-based overload. Both construction
     * paths therefore derive the producer group the same way, with any '.' in the access
     * key replaced by '-'; previously this overload used the raw access key.
     *
     * @param properties              client configuration; ACCESS_KEY and SECRET_KEY must be present
     * @param nameserverAddressSetter supplier of the name server address
     * @return a configured, not yet started producer
     */
    private DefaultMQProducer createProducer(Properties properties, NameServerAddressSetter nameserverAddressSetter) {
        Properties sessionProperties = new Properties();
        sessionProperties.put("ACCESS_KEY", properties.getProperty("ACCESS_KEY"));
        sessionProperties.put("SECRET_KEY", properties.getProperty("SECRET_KEY"));
        SessionCredentials sessionCredentials = new SessionCredentials();
        sessionCredentials.updateContent(sessionProperties);
        return this.createProducer(properties, sessionCredentials, nameserverAddressSetter);
    }

    /**
     * Creates the underlying producer using caller-supplied credentials.
     *
     * <p>The producer group is the ACCESS_KEY property with '.' replaced by '-', followed
     * by {@code _INNER_TRACE_PRODUCER}. The instance name falls back to the current time
     * in milliseconds, and the maximum message size is MAX_MSG_SIZE minus 10000.
     *
     * @param properties              client configuration
     * @param sessionCredentials      credentials handed to the RPC hook
     * @param nameserverAddressSetter supplier of the name server address
     * @return a configured, not yet started producer
     */
    private DefaultMQProducer createProducer(Properties properties, SessionCredentials sessionCredentials, NameServerAddressSetter nameserverAddressSetter) {
        final String accessKey = properties.getProperty("ACCESS_KEY");
        final DefaultMQProducer result = new DefaultMQProducer(new ClientRPCHook(sessionCredentials));

        final String group = accessKey.replace('.', '-') + "_INNER_TRACE_PRODUCER";
        result.setProducerGroup(group);
        result.setSendMsgTimeout(5000);

        final String fallbackName = String.valueOf(System.currentTimeMillis());
        result.setInstanceName(properties.getProperty("INSTANCE_NAME", fallbackName));

        final String namesrvAddr = nameserverAddressSetter.getNewNameServerAddress();
        result.setNamesrvAddr(namesrvAddr);
        result.setVipChannelEnabled(false);

        final int configuredMax = Integer.parseInt(properties.getProperty("MAX_MSG_SIZE", "128000"));
        result.setMaxMessageSize(configuredMax - 10000);
        return result;
    }

    /**
     * Starts the underlying producer, the batching worker thread, the JVM shutdown hook
     * and the periodic name server refresh task.
     *
     * <p>The producer is started before the worker thread. If the producer fails to
     * start, no worker thread is left running, and the worker never hands batches to a
     * producer that has not been started.
     *
     * @throws MQClientException if the underlying producer cannot be started
     */
    private void start() throws MQClientException {
        this.traceProducer.start();
        this.worker = new ThreadFactoryImpl("MQ-AsyncArrayDispatcher-Thread" + this.instanceId, true).newThread(new AsyncRunnable());
        this.worker.start();
        this.registerShutDownHook();
        // Every 30 seconds, push a changed name server address into the running client.
        this.scheduledExecutorService.scheduleWithFixedDelay(new Runnable(){

            @Override
            public void run() {
                try {
                    String newAddrs = InnerTraceProducer.this.nameserverAddressSetter.getNewNameServerAddress();
                    if (newAddrs != null && !newAddrs.equals(InnerTraceProducer.this.traceNamesrvAddr)) {
                        CLIENT_LOG.info("trace producer update name server address , old is {}, new is {}", (Object)InnerTraceProducer.this.traceNamesrvAddr, (Object)newAddrs);
                        InnerTraceProducer.this.traceNamesrvAddr = newAddrs;
                        InnerTraceProducer.this.traceProducer.setNamesrvAddr(newAddrs);
                        InnerTraceProducer.this.traceProducer.getDefaultMQProducerImpl().getmQClientFactory().getMQClientAPIImpl().updateNameServerAddressList(newAddrs);
                    }
                }
                catch (Throwable t) {
                    // Catch everything so one failure does not cancel later runs, but log it.
                    CLIENT_LOG.warn("trace producer failed to refresh name server address", t);
                }
            }
        }, 30000L, 30000L, TimeUnit.MILLISECONDS);
    }

    /**
     * Stops this producer: drains buffered trace data (bounded by {@code flush()}), marks
     * the instance as stopped so the worker loop ends, then shuts down the send pool,
     * removes the JVM shutdown hook, and shuts down the refresh scheduler and the
     * underlying producer, in that order.
     */
    private void shutdown() {
        flush();
        stopped = true;

        traceExecuter.shutdown();
        removeShutdownHook();
        scheduledExecutorService.shutdown();
        traceProducer.shutdown();
    }

    /**
     * Makes a best-effort attempt, limited to roughly 500 ms, to hand buffered trace
     * contexts to the send pool and let the pool's queue drain.
     *
     * <p>The deadline applies to both queues. The previous condition parsed as
     * {@code contextsPending || (tasksPending && beforeDeadline)} because {@code &&} binds
     * tighter than {@code ||}, so the loop ignored the deadline while contexts were
     * pending and could run indefinitely if callers kept appending.
     */
    private void flush() {
        long deadline = System.currentTimeMillis() + 500L;
        while ((!this.traceContextQueue.isEmpty() || !this.appenderQueue.isEmpty()) && System.currentTimeMillis() <= deadline) {
            try {
                // Also acts as a short wait (poll timeout) when only the pool queue is non-empty.
                this.flushTraceContext();
            }
            catch (Throwable ignored) {
                // Best effort: keep draining until the deadline even if a batch is rejected.
            }
        }
        CLIENT_LOG.info("------end trace send " + this.traceContextQueue.size() + "   " + this.appenderQueue.size());
    }

    private void registerShutDownHook() {
        if (this.shutDownHook == null) {
            this.shutDownHook = new ThreadFactoryImpl("ShutdownHookMQTrace").newThread(new Runnable(){
                private volatile boolean hasShutdown = false;

                /*
                 * WARNING - Removed try catching itself - possible behaviour change.
                 */
                @Override
                public void run() {
                    4 var1_1 = this;
                    synchronized (var1_1) {
                        if (!this.hasShutdown) {
                            try {
                                InnerTraceProducer.this.flush();
                            }
                            catch (Throwable e) {
                                CLIENT_LOG.error("system mqtrace hook shutdown failed ,maybe loss some trace data");
                            }
                        }
                    }
                }
            });
            Runtime.getRuntime().addShutdownHook(this.shutDownHook);
        }
    }

    /**
     * Deregisters the JVM shutdown hook installed by {@code registerShutDownHook()}, if any.
     *
     * <p>{@link Runtime#removeShutdownHook(Thread)} throws {@link IllegalStateException}
     * once the JVM has begun shutting down. That case is tolerated so the caller can go
     * on releasing its remaining resources.
     */
    private void removeShutdownHook() {
        Thread hook = this.shutDownHook;
        if (hook == null) {
            return;
        }
        try {
            Runtime.getRuntime().removeShutdownHook(hook);
        }
        catch (IllegalStateException ignored) {
            // JVM shutdown already in progress; the hook can no longer be removed.
        }
    }

    /**
     * Collects up to {@code batchSize} contexts from the buffer, waiting at most 5 ms for
     * each, and submits them to the send pool as one request. Collection stops at the
     * first empty poll; nothing is submitted when no context was collected.
     */
    private void flushTraceContext() {
        List<TraceContext> batch = new ArrayList<TraceContext>(this.batchSize);
        int polled = 0;
        while (polled < this.batchSize) {
            TraceContext next;
            try {
                next = this.traceContextQueue.poll(5L, TimeUnit.MILLISECONDS);
            }
            catch (InterruptedException interrupted) {
                // An interrupt is handled like a timed-out poll; the interrupt status is not restored.
                next = null;
            }
            if (next == null) {
                break;
            }
            batch.add(next);
            polled++;
        }
        if (!batch.isEmpty()) {
            this.traceExecuter.submit(new AsyncAppenderRequest(batch));
        }
    }

    /**
     * Collects the names of the brokers that host queues of {@code topic}, as seen by the
     * given producer.
     *
     * <p>If the producer has no usable publish info for the namespaced topic, a
     * placeholder entry is registered and the route is refreshed from the name server
     * before the lookup is repeated.
     *
     * @param producer producer whose publish-info table is consulted
     * @param topic    topic name without namespace
     * @return broker names; empty when no route information is available
     */
    public static Set<String> tryGetMessageQueueBrokerSet(DefaultMQProducerImpl producer, String topic) {
        String namespace = producer.getDefaultMQProducer().getNamespace();
        String namespacedTopic = NamespaceUtil.wrapNamespace(namespace, topic);
        TopicPublishInfo publishInfo = (TopicPublishInfo)producer.getTopicPublishInfoTable().get(namespacedTopic);
        boolean usable = publishInfo != null && publishInfo.ok();
        if (!usable) {
            producer.getTopicPublishInfoTable().putIfAbsent(namespacedTopic, new TopicPublishInfo());
            producer.getmQClientFactory().updateTopicRouteInfoFromNameServer(namespacedTopic);
            publishInfo = (TopicPublishInfo)producer.getTopicPublishInfoTable().get(namespacedTopic);
        }
        Set<String> brokerNames = new HashSet<String>();
        if (!publishInfo.isHaveTopicRouterInfo() && !publishInfo.ok()) {
            return brokerNames;
        }
        for (MessageQueue mq : publishInfo.getMessageQueueList()) {
            brokerNames.add(mq.getBrokerName());
        }
        return brokerNames;
    }

    /**
     * Collects the names of the brokers that host queues of {@code topic}, as seen by the
     * given push consumer.
     *
     * @param consumer push consumer used to fetch the subscribed queues
     * @param topic    topic name without namespace
     * @return broker names; empty when the queues cannot be fetched (the failure is logged)
     */
    public static Set<String> tryGetMessageQueueBrokerSet(DefaultMQPushConsumerImpl consumer, String topic) {
        Set<String> brokerNames = new HashSet<String>();
        try {
            String namespace = consumer.getDefaultMQPushConsumer().getNamespace();
            String namespacedTopic = NamespaceUtil.wrapNamespace(namespace, topic);
            for (MessageQueue mq : consumer.fetchSubscribeMessageQueues(namespacedTopic)) {
                brokerNames.add(mq.getBrokerName());
            }
        }
        catch (MQClientException e) {
            CLIENT_LOG.info("fetch message queue failed, the topic is {}", (Object)topic);
        }
        return brokerNames;
    }

    /**
     * Collects the names of the brokers that host queues of {@code topic}, as seen by the
     * given pull consumer.
     *
     * @param consumer pull consumer used to fetch the subscribed queues
     * @param topic    topic name without namespace
     * @return broker names; empty when the queues cannot be fetched (the failure is logged)
     */
    public static Set<String> tryGetMessageQueueBrokerSet(DefaultMQPullConsumerImpl consumer, String topic) {
        Set<String> brokerNames = new HashSet<String>();
        try {
            String namespace = consumer.getDefaultMQPullConsumer().getNamespace();
            String namespacedTopic = NamespaceUtil.wrapNamespace(namespace, topic);
            for (MessageQueue mq : consumer.fetchSubscribeMessageQueues(namespacedTopic)) {
                brokerNames.add(mq.getBrokerName());
            }
        }
        catch (MQClientException e) {
            CLIENT_LOG.info("fetch message queue failed, the topic is {}", (Object)topic);
        }
        return brokerNames;
    }

    /**
     * Send-pool task that encodes a batch of trace contexts, groups them by topic and
     * region, and publishes each group to the trace topic.
     */
    class AsyncAppenderRequest
    implements Runnable {
        /** Contexts to send; never {@code null}. */
        List<TraceContext> contextList;

        /**
         * @param contextList contexts to send; {@code null} is treated as an empty batch
         */
        public AsyncAppenderRequest(List<TraceContext> contextList) {
            this.contextList = contextList != null ? contextList : new ArrayList<TraceContext>(1);
        }

        @Override
        public void run() {
            this.sendTraceData(this.contextList);
        }

        /**
         * Encodes the contexts, buckets them by "topic + U+0001 + regionId" and flushes
         * every bucket. Contexts without a region id or without trace beans are skipped.
         *
         * @param contextList contexts to encode and send
         */
        public void sendTraceData(List<TraceContext> contextList) {
            Map<String, List<TraceTransferBean>> transBeanMap = new HashMap<String, List<TraceTransferBean>>(16);
            for (TraceContext traceContext : contextList) {
                String currentRegionId = traceContext.getRegionId();
                if (currentRegionId == null || traceContext.getTraceBeans().isEmpty()) {
                    continue;
                }
                String topic = traceContext.getTraceBeans().get(0).getTopic();
                String key = topic + '\u0001' + currentRegionId;
                List<TraceTransferBean> transBeanList = transBeanMap.get(key);
                if (transBeanList == null) {
                    transBeanList = new ArrayList<TraceTransferBean>();
                    transBeanMap.put(key, transBeanList);
                }
                TraceTransferBean traceData = TraceDataEncoder.encoderFromContextBean(traceContext);
                traceData.setBrokerSet(traceContext.getBrokerSet());
                transBeanList.add(traceData);
            }
            for (Map.Entry<String, List<TraceTransferBean>> entry : transBeanMap.entrySet()) {
                // Limit -1 keeps a trailing empty region id, so key[1] always exists.
                String[] key = entry.getKey().split(String.valueOf('\u0001'), -1);
                this.flushData(entry.getValue(), key[0], key[1]);
            }
        }

        /**
         * Concatenates the encoded beans and sends them, starting a new message each time
         * the accumulated text reaches the producer's maximum message size.
         *
         * @param transBeanList   encoded beans of one topic/region bucket; cleared on return
         * @param topic           topic the traced messages belong to
         * @param currentRegionId region id of the bucket
         */
        private void flushData(List<TraceTransferBean> transBeanList, String topic, String currentRegionId) {
            if (transBeanList.isEmpty()) {
                return;
            }
            StringBuilder buffer = new StringBuilder(1024);
            int count = 0;
            Set<String> keySet = new HashSet<String>();
            Set<String> brokerSet = new HashSet<String>();
            for (TraceTransferBean bean : transBeanList) {
                // The broker set of the most recently appended bean is used for the send.
                brokerSet = bean.getBrokerSet();
                keySet.addAll(bean.getTransKey());
                buffer.append(bean.getTransData());
                ++count;
                if (buffer.length() < InnerTraceProducer.this.traceProducer.getMaxMessageSize()) {
                    continue;
                }
                this.sendTraceDataByMQ(keySet, buffer.toString(), topic, currentRegionId, brokerSet);
                buffer.delete(0, buffer.length());
                keySet.clear();
                count = 0;
            }
            if (count > 0) {
                this.sendTraceDataByMQ(keySet, buffer.toString(), topic, currentRegionId, brokerSet);
            }
            transBeanList.clear();
        }

        /**
         * Publishes one trace message asynchronously. The send is restricted to queues on
         * brokers that appear both in {@code dataBrokerSet} and in the trace topic's
         * route; when there is no such broker the producer's default queue selection is used.
         *
         * @param keySet          message keys
         * @param data            encoded trace payload
         * @param dataTopic       topic the traced messages belong to (not used for routing)
         * @param currentRegionId region id of the data (not used for routing)
         * @param dataBrokerSet   brokers that handled the traced messages; may be {@code null};
         *                        never modified
         */
        private void sendTraceDataByMQ(Set<String> keySet, final String data, String dataTopic, String currentRegionId, Set<String> dataBrokerSet) {
            String topic = "RMQ_SYS_TRACE_TOPIC";
            Message message = new Message(topic, data.getBytes());
            message.setKeys(keySet);
            try {
                Set<String> traceBrokerSet = InnerTraceProducer.tryGetMessageQueueBrokerSet(InnerTraceProducer.this.traceProducer.getDefaultMQProducerImpl(), topic);
                // Intersect on a private copy: the caller's set belongs to the trace
                // bean and may be reused for later sends.
                Set<String> targetBrokers = new HashSet<String>();
                if (dataBrokerSet != null) {
                    targetBrokers.addAll(dataBrokerSet);
                }
                targetBrokers.retainAll(traceBrokerSet);
                SendCallback callback = new SendCallback(){

                    @Override
                    public void onSuccess(SendResult sendResult) {
                    }

                    @Override
                    public void onException(Throwable e) {
                        CLIENT_LOG.warn("send trace data ,the traceData is " + data, e);
                    }
                };
                if (targetBrokers.isEmpty()) {
                    InnerTraceProducer.this.traceProducer.send(message, callback, 5000L);
                } else {
                    InnerTraceProducer.this.traceProducer.send(message, new MessageQueueSelector(){

                        @Override
                        public MessageQueue select(List<MessageQueue> mqs, Message msg, Object arg) {
                            Set<?> allowedBrokers = (Set<?>)arg;
                            List<MessageQueue> filterMqs = new ArrayList<MessageQueue>();
                            for (MessageQueue queue : mqs) {
                                if (allowedBrokers.contains(queue.getBrokerName())) {
                                    filterMqs.add(queue);
                                }
                            }
                            // If no queue matches, use every queue rather than dividing by zero.
                            List<MessageQueue> candidates = filterMqs.isEmpty() ? mqs : filterMqs;
                            int index = InnerTraceProducer.this.sendWhichQueue.getAndIncrement();
                            int pos = Math.abs(index) % candidates.size();
                            if (pos < 0) {
                                // Math.abs(Integer.MIN_VALUE) is still negative.
                                pos = 0;
                            }
                            return candidates.get(pos);
                        }
                    }, targetBrokers, callback);
                }
            }
            catch (Exception e) {
                CLIENT_LOG.warn("send trace data,the traceData is" + data, e);
            }
        }
    }

    /**
     * Body of the worker thread: repeatedly batches buffered contexts into send requests
     * until the enclosing producer has been marked as stopped.
     */
    class AsyncRunnable
    implements Runnable {
        /** Local copy of the stop signal; set after a pass that observed the producer's flag. */
        private boolean stopped;

        AsyncRunnable() {
        }

        @Override
        public void run() {
            while (!this.stopped) {
                try {
                    InnerTraceProducer.this.flushTraceContext();
                    if (InnerTraceProducer.this.stopped) {
                        this.stopped = true;
                    }
                }
                catch (Throwable ignored) {
                    // Deliberately swallowed so the loop keeps running.
                }
            }
        }
    }
}

