/*
 * Decompiled with CFR 0.152.
 */
package com.ohaotian.plugin.mq.proxy.ext.redismq;

import com.fasterxml.jackson.databind.ObjectMapper;
import com.ohaotian.plugin.mq.proxy.CacheStore;
import com.ohaotian.plugin.mq.proxy.ProxyMessage;
import com.ohaotian.plugin.mq.proxy.ext.ProxyMqTransactionChecker;
import com.ohaotian.plugin.mq.proxy.ext.redismq.RedisMqMessageSender;
import com.ohaotian.plugin.mq.proxy.ext.redismq.RedisTransactionListener;
import com.ohaotian.plugin.mq.proxy.status.ProxyTransactionStatus;
import java.util.Timer;
import java.util.TimerTask;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import redis.clients.jedis.Jedis;
import redis.clients.jedis.JedisPool;

/**
 * Re-checks the local transaction state of pending ("half") messages using a
 * ladder of Redis lists named {@code QUEUE-1} .. {@code QUEUE-16}.
 *
 * <p>New messages enter {@code QUEUE-1}. A timer task per queue periodically
 * drains its queue and asks the {@link ProxyMqTransactionChecker} for the
 * transaction status of each message:
 * <ul>
 *   <li>{@code COMMIT} - the message is handed to the {@link RedisMqMessageSender};</li>
 *   <li>{@code UNKNOW} (or a failure while checking/sending) - the message is
 *       moved to the next queue, which is polled less frequently; on the last
 *       queue the message is given up and an error is logged;</li>
 *   <li>any other status - the message is dropped.</li>
 * </ul>
 *
 * <p>Queue {@code i} is polled every {@code 10000 * i} milliseconds, after an
 * initial delay of 10000 milliseconds.
 */
public class RedisMqTransactionCheckListener
implements RedisTransactionListener {
    /** Common prefix of the Redis list keys used as check queues. */
    private static final String QUEUE_PREFIX = "QUEUE-";
    /** Number of check queues, i.e. maximum number of checks per message. */
    private static final int MAX_QUEUES = 16;
    /** Initial delay of every task and base polling period, in milliseconds. */
    private static final long CHECK_PERIOD_MILLIS = 10000L;
    /** Upper bound of messages handled by one task execution. */
    private static final int MAX_MESSAGES_PER_RUN = 1000;

    private final Logger logger = LoggerFactory.getLogger(this.getClass());
    private final ProxyMqTransactionChecker mqTransactionChecker = new ProxyMqTransactionChecker();
    private final JedisPool jedisPool;
    // Initialised eagerly so addCheckList() also works before startup() has run
    // (it used to be assigned inside startup() only, leaving a null key until then).
    private final String defaultQueue = QUEUE_PREFIX + 1;
    private final Timer timer = new Timer("redis-mq-transaction-check");
    private final RedisMqMessageSender messageSender;
    private final ThreadLocal<ObjectMapper> objectMapperThreadLocal = new ThreadLocal<ObjectMapper>(){

        @Override
        protected ObjectMapper initialValue() {
            return new ObjectMapper();
        }
    };

    /**
     * Creates a listener that stores its check queues in the given pool and
     * publishes committed messages through the given sender.
     *
     * @param jedisPool     pool used for every queue push/pop
     * @param messageSender sender invoked for messages whose status is {@code COMMIT}
     */
    public RedisMqTransactionCheckListener(JedisPool jedisPool, RedisMqMessageSender messageSender) {
        this.jedisPool = jedisPool;
        this.messageSender = messageSender;
    }

    /**
     * Schedules one {@link QueueCheckTask} per check queue. Queue {@code i} is
     * polled every {@code CHECK_PERIOD_MILLIS * i} milliseconds; the last queue
     * has no successor, so messages still undecided there are given up.
     */
    public void startup() {
        for (int i = 1; i <= MAX_QUEUES; ++i) {
            String checkQueue = QUEUE_PREFIX + i;
            String nextQueue = i == MAX_QUEUES ? null : QUEUE_PREFIX + (i + 1);
            this.timer.schedule(new QueueCheckTask(this.jedisPool, checkQueue, nextQueue), CHECK_PERIOD_MILLIS, CHECK_PERIOD_MILLIS * i);
        }
    }

    /** Cancels the timer and thereby all scheduled queue checks. */
    public void shutdown() {
        this.timer.cancel();
    }

    /**
     * Misspelled historical name of {@link #shutdown()}, kept for existing callers.
     *
     * @deprecated use {@link #shutdown()}
     */
    @Deprecated
    public void shtudown() {
        this.shutdown();
    }

    /**
     * Registers a message for transaction checking by pushing it to the first queue.
     *
     * @param message message to check later
     * @throws IllegalArgumentException if the message cannot be serialised or pushed
     */
    public void addCheckList(ProxyMessage message) {
        this.pushCheckList(this.defaultQueue, message);
    }

    /**
     * Serialises the message as JSON and LPUSHes it onto the given Redis list.
     *
     * @param checkQueue Redis key of the target list
     * @param message    message to enqueue
     * @throws IllegalArgumentException wrapping any serialisation or Redis failure
     */
    public void pushCheckList(String checkQueue, ProxyMessage message) {
        Jedis jedis = null;
        try {
            this.logger.debug("push [{}] to QUEUE[{}]", message, checkQueue);
            jedis = this.jedisPool.getResource();
            jedis.lpush(checkQueue, this.objectMapperThreadLocal.get().writeValueAsString(message));
        }
        catch (Exception e) {
            throw new IllegalArgumentException("add check queue[" + checkQueue + "] error", e);
        }
        finally {
            if (jedis != null) {
                jedis.close();
            }
        }
    }

    /**
     * LPOPs one entry from the given Redis list and deserialises it.
     *
     * @param checkQueue Redis key of the list to pop from
     * @return the popped message, or {@code null} when the list is empty
     * @throws IllegalArgumentException wrapping any Redis or deserialisation failure
     */
    public ProxyMessage popCheckList(String checkQueue) {
        Jedis jedis = null;
        try {
            jedis = this.jedisPool.getResource();
            String value = jedis.lpop(checkQueue);
            if (value == null) {
                return null;
            }
            return this.objectMapperThreadLocal.get().readValue(value, ProxyMessage.class);
        }
        catch (Exception e) {
            // Message used to say "add check queue", which pointed at the wrong operation.
            throw new IllegalArgumentException("pop check queue[" + checkQueue + "] error", e);
        }
        finally {
            if (jedis != null) {
                jedis.close();
            }
        }
    }

    /**
     * Asks the transaction checker for the state of the local transaction that
     * produced the message, identified by message id, subject and tag.
     *
     * @param msg message whose transaction is checked
     * @return status reported by the checker
     */
    public ProxyTransactionStatus checkLocalTransactionState(ProxyMessage msg) {
        return this.mqTransactionChecker.check(msg.getMessageId(), null, msg.getSubject(), msg.getTag());
    }

    /**
     * Forwards the cache store to the underlying transaction checker.
     *
     * @param cacheStore store handed to the checker
     */
    public void setCacheStore(CacheStore cacheStore) {
        this.mqTransactionChecker.setCacheStore(cacheStore);
    }

    /**
     * Periodic task that drains one check queue, at most
     * {@code MAX_MESSAGES_PER_RUN} messages per execution.
     */
    class QueueCheckTask
    extends TimerTask {
        // Retained for constructor compatibility; queue access goes through the
        // enclosing listener's pushCheckList/popCheckList.
        private final JedisPool jedisPool;
        private final String checkQueue;
        /** Queue that undecided messages move to, or {@code null} for the last queue. */
        private final String nextQueue;

        QueueCheckTask(JedisPool jedisPool, String checkQueue, String nextQueue) {
            this.jedisPool = jedisPool;
            this.checkQueue = checkQueue;
            this.nextQueue = nextQueue;
        }

        @Override
        public void run() {
            try {
                for (int i = 0; i < MAX_MESSAGES_PER_RUN; ++i) {
                    ProxyMessage message = RedisMqTransactionCheckListener.this.popCheckList(this.checkQueue);
                    if (message == null) {
                        return;
                    }
                    this.process(message);
                }
            }
            catch (Throwable e) {
                // Nothing may escape run(): an uncaught throwable terminates the
                // shared Timer thread and silently cancels every queue check.
                // Abort this execution only; the next scheduled run retries.
                RedisMqTransactionCheckListener.this.logger.error("check queue[" + this.checkQueue + "] aborted", e);
            }
        }

        /** Checks a single message and commits, escalates or drops it. */
        private void process(ProxyMessage message) {
            try {
                ProxyTransactionStatus transactionStatus = RedisMqTransactionCheckListener.this.checkLocalTransactionState(message);
                RedisMqTransactionCheckListener.this.logger.debug("Check[{}] status[{}]", message, transactionStatus);
                if (transactionStatus == ProxyTransactionStatus.UNKNOW) {
                    if (this.nextQueue != null) {
                        RedisMqTransactionCheckListener.this.pushCheckList(this.nextQueue, message);
                    } else {
                        RedisMqTransactionCheckListener.this.logger.error("max check reached for message[" + message + "]");
                    }
                } else if (transactionStatus == ProxyTransactionStatus.COMMIT) {
                    RedisMqTransactionCheckListener.this.messageSender.send(message);
                }
                // Any other status: the message is intentionally dropped.
            }
            catch (Throwable e) {
                this.escalateAfterFailure(message, e);
            }
        }

        /**
         * Moves a message whose check/send failed to the next queue, or gives
         * it up on the last queue. Never throws; every failure is logged with
         * its cause so a lost message is traceable.
         */
        private void escalateAfterFailure(ProxyMessage message, Throwable cause) {
            if (this.nextQueue == null) {
                RedisMqTransactionCheckListener.this.logger.error("max check reached for message[" + message + "]", cause);
                return;
            }
            RedisMqTransactionCheckListener.this.logger.warn("check failed for message[" + message + "], moving to QUEUE[" + this.nextQueue + "]", cause);
            try {
                RedisMqTransactionCheckListener.this.pushCheckList(this.nextQueue, message);
            }
            catch (Throwable pushFailure) {
                RedisMqTransactionCheckListener.this.logger.error("unable to requeue message[" + message + "] to QUEUE[" + this.nextQueue + "], message dropped", pushFailure);
            }
        }
    }
}

