package com.taobao.notify.client.impl;

import com.taobao.common.store.Store;
import com.taobao.common.store.journal.JournalStore;
import com.taobao.notify.client.NotifyClient;
import com.taobao.notify.client.manager.NotifyGroupManager;
import com.taobao.notify.codec.Deserializer;
import com.taobao.notify.codec.Serializer;
import com.taobao.notify.codec.impl.Hessian1Deserializer;
import com.taobao.notify.codec.impl.Hessian1Serializer;
import com.taobao.notify.common.StatConstants;
import com.taobao.notify.config.NotifyClientConfig;
import com.taobao.notify.message.Message;
import com.taobao.notify.remotingclient.SendResult;
import com.taobao.notify.remotingclient.SendResultType;
import com.taobao.notify.remotingclient.impl.LRUSoftMessageCache;
import com.taobao.notify.remotingclient.impl.ReliableAsynTraverseMessageTask;
import com.taobao.notify.utils.LoggerPrefix;
import com.taobao.notify.utils.UniqId;
import java.io.File;
import java.io.IOException;
import java.util.Iterator;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.log4j.Logger;

/* loaded from: input_file:com/taobao/notify/client/impl/ReliableAsynSendManager.class */
/**
 * Manages the local, file-backed store used for "reliable asynchronous" message sending.
 *
 * <p>Messages are serialized with Hessian and written to a {@link JournalStore} under
 * {@code localMessagePath}; an optional {@link LRUSoftMessageCache} keeps recently used entries
 * in memory. A {@link ReliableAsynTraverseMessageTask} is run on a single-thread executor once
 * the manager is initialized.
 *
 * <p>Initialization is lazy: most public operations call {@link #init()} first. {@link #init()}
 * and {@link #close()} are {@code synchronized}; the other methods are not, so callers that mix
 * {@link #close()} with concurrent store operations must coordinate externally.
 */
public class ReliableAsynSendManager {
    private static final String LogPrefix = LoggerPrefix.makeLogPrefix(ReliableAsynSendManager.class);
    static final Logger logger = Logger.getLogger(ReliableAsynSendManager.class);

    /** Single-thread executor that runs the traverse task; non-null only while initialized. */
    private ExecutorService reliableAsynSendMessageWorkTP;
    /** Backing store; created in {@link #init()} and released in {@link #close()}. */
    private Store store;
    private ReliableAsynTraverseMessageTask reliableAsynTraverseMessageTask;
    private final NotifyClient notifyClient;
    private String localMessagePath;
    private String storeName;
    private boolean isForceToDisk;
    private long traverseKeepAliveTime;
    private int traverseCorePoolSize;
    private int traverseMaxPoolSize;
    private int traverseMaxQueueSize;
    private int maxStoreSize;
    /** Optional in-memory cache in front of the store; {@code null} when caching is disabled. */
    private LRUSoftMessageCache cache;
    private long maxStoreFileCount;
    private final NotifyGroupManager notifyGroupManager;
    private final AtomicInteger messageTotalCount = new AtomicInteger(0);
    private final AtomicInteger remainCommitMessageCount = new AtomicInteger(0);
    private volatile boolean isInit = false;
    /** Suspend flag remembered while not initialized; handed to the traverse task in init(). */
    private volatile boolean isSuspendBeforeInit = false;
    private final Serializer serializer = new Hessian1Serializer();
    private final Deserializer deserializer = new Hessian1Deserializer();

    /**
     * Creates a manager configured from {@code notifyClientConfig}. The store itself is not
     * opened here; that happens on the first call to {@link #init()}.
     *
     * @param notifyClient client handed to the traverse task
     * @param notifyGroupManager group manager handed to the traverse task
     * @param notifyClientConfig source of thread-pool, store and cache settings
     */
    public ReliableAsynSendManager(NotifyClient notifyClient, NotifyGroupManager notifyGroupManager, NotifyClientConfig notifyClientConfig) {
        this.notifyClient = notifyClient;
        this.notifyGroupManager = notifyGroupManager;
        applyStoreAndPoolSettings(notifyClientConfig);
        if (notifyClientConfig.isEnableReliableAsynSendCache()) {
            this.cache = new LRUSoftMessageCache(notifyClientConfig.getReliableAsynSendCacheLowWaterMark(), notifyClientConfig.getReliableAsynSendCacheHighWaterMark());
        }
    }

    /**
     * Re-applies configuration. This is only honoured while the current store path still equals
     * {@link StatConstants#DEFAULT_CLIENT_STORE4J_PATH} (case-insensitive); otherwise a warning is
     * logged and nothing changes.
     *
     * <p>When the config requests pre-initialization the manager is closed and re-initialized
     * with the new settings. Cache settings are then updated, and if the manager is initialized
     * the traverse task's pool parameters are refreshed.
     *
     * @param notifyClientConfig new configuration
     */
    public void setNotifyClientConfig(NotifyClientConfig notifyClientConfig) {
        if (!this.localMessagePath.equalsIgnoreCase(StatConstants.DEFAULT_CLIENT_STORE4J_PATH)) {
            logger.warn(LogPrefix + " 已经设置过可靠异步的存储路径无法重设 路径Path:[" + notifyClientConfig.getLocalMessagePath() + "]");
            return;
        }
        applyStoreAndPoolSettings(notifyClientConfig);
        this.isSuspendBeforeInit = notifyClientConfig.isReliableAsynSendTaskStatus();
        if (notifyClientConfig.isPreInitializeReliableAsynSendManager()) {
            close();
            init();
        }
        if (!notifyClientConfig.isEnableReliableAsynSendCache()) {
            this.cache = null;
        } else if (null == this.cache) {
            this.cache = new LRUSoftMessageCache(notifyClientConfig.getReliableAsynSendCacheLowWaterMark(), notifyClientConfig.getReliableAsynSendCacheHighWaterMark());
        } else {
            this.cache.setHighWaterMark(notifyClientConfig.getReliableAsynSendCacheHighWaterMark());
            this.cache.setLowWaterMark(notifyClientConfig.getReliableAsynSendCacheLowWaterMark());
        }
        if (this.isInit) {
            this.reliableAsynTraverseMessageTask.setCorePoolSize(this.traverseCorePoolSize);
            this.reliableAsynTraverseMessageTask.setMaxPoolSize(this.traverseMaxPoolSize);
            this.reliableAsynTraverseMessageTask.setMaxQueueSize(this.traverseMaxQueueSize);
            this.reliableAsynTraverseMessageTask.setKeepAliveTime(this.traverseKeepAliveTime);
        }
    }

    /** Copies the thread-pool and store settings shared by the constructor and the setter. */
    private void applyStoreAndPoolSettings(NotifyClientConfig notifyClientConfig) {
        this.traverseCorePoolSize = notifyClientConfig.getReliableAsynSendMessageTPConfig().getCorePoolSize();
        this.traverseMaxPoolSize = notifyClientConfig.getReliableAsynSendMessageTPConfig().getMaxPoolSize();
        this.traverseMaxQueueSize = notifyClientConfig.getReliableAsynSendMessageTPConfig().getMaxQueueSize();
        this.traverseKeepAliveTime = notifyClientConfig.getReliableAsynSendMessageTPConfig().getKeepAliveTime();
        this.localMessagePath = notifyClientConfig.getLocalMessagePath();
        this.storeName = notifyClientConfig.getStoreName();
        this.isForceToDisk = notifyClientConfig.isForceToDisk();
        this.maxStoreSize = notifyClientConfig.getMaxStoreSize();
        this.maxStoreFileCount = notifyClientConfig.getMaxStoreFileCount();
    }

    /**
     * Opens the store, creates the traverse task and starts it. Does nothing if already
     * initialized.
     *
     * @return {@code true} if the manager is initialized after the call; {@code false} if opening
     *     the store failed with an {@link IOException} (the error is logged)
     * @throws RuntimeException if any other exception occurs during initialization; the original
     *     exception is attached as the cause
     */
    public synchronized boolean init() {
        if (this.isInit) {
            return this.isInit;
        }
        try {
            File storeDir = new File(this.localMessagePath);
            // mkdirs() reports failure only through its return value; surface it in the log so a
            // later store failure can be traced back to the missing directory.
            if (!storeDir.exists() && !storeDir.mkdirs() && !storeDir.isDirectory()) {
                logger.warn(LogPrefix + "创建可靠异步存储目录失败，路径为：" + this.localMessagePath);
            }
            this.store = new JournalStore(this.localMessagePath, "NotifyClientMessageStore_" + this.storeName, this.isForceToDisk, false, false);
            this.store.setMaxFileCount(this.maxStoreFileCount);
            this.reliableAsynTraverseMessageTask = new ReliableAsynTraverseMessageTask(this.notifyClient, this.notifyGroupManager, this, this.traverseCorePoolSize, this.traverseMaxPoolSize, this.traverseKeepAliveTime, this.traverseMaxQueueSize);
            this.reliableAsynTraverseMessageTask.setSuspend(this.isSuspendBeforeInit);
            this.reliableAsynSendMessageWorkTP = Executors.newSingleThreadExecutor();
            // The flag must be set before counting: initMessageCount() goes through
            // getIterator()/getMessageInStore4j(), which both call init() again.
            this.isInit = true;
            initMessageCount();
            this.reliableAsynSendMessageWorkTP.execute(this.reliableAsynTraverseMessageTask);
            return this.isInit;
        } catch (IOException e) {
            logger.error(LogPrefix + "初始化Store4j错误，路径为：" + this.localMessagePath, e);
            return this.isInit;
        } catch (Exception e) {
            // Keep the cause so the real failure is not hidden behind the generic message.
            logger.error("创建可靠异步目录不成功", e);
            throw new RuntimeException("创建可靠异步目录不成功", e);
        }
    }

    /**
     * Seeds both counters with the number of stored messages whose {@code isEnabledSend()} flag
     * is set. If iteration fails with an {@link IOException}, the counters are set to however
     * many messages had been counted up to that point.
     */
    private void initMessageCount() {
        int sendable = 0;
        try {
            Iterator<byte[]> ids = getIterator();
            while (ids.hasNext()) {
                MessageInStore4j stored = getMessageInStore4j(ids.next());
                if (null != stored && stored.isEnabledSend()) {
                    sendable++;
                }
            }
        } catch (IOException e) {
            logger.error(LogPrefix + "初始化消息计数器失败! 本次数据只记录自启动以来的消息条数!", e);
        } finally {
            this.remainCommitMessageCount.set(sendable);
            this.messageTotalCount.set(sendable);
        }
    }

    /** @return whether {@link #init()} has completed and {@link #close()} has not been called since */
    public boolean isInit() {
        return this.isInit;
    }

    /**
     * Stops the traverse task, shuts the executor down and closes the store. Does nothing when the
     * manager is not initialized. A failure to close the store is logged, not thrown.
     */
    public synchronized void close() {
        if (!this.isInit) {
            return;
        }
        this.isInit = false;
        this.reliableAsynTraverseMessageTask.setRun(false);
        this.reliableAsynSendMessageWorkTP.shutdown();
        this.reliableAsynTraverseMessageTask = null;
        this.reliableAsynSendMessageWorkTP = null;
        try {
            if (null != this.store) {
                this.store.close();
            }
        } catch (IOException e) {
            logger.error(LogPrefix + "关闭Store4j文件失败", e);
        }
        this.store = null;
    }

    /**
     * Persists {@code message} in the local store (and cache, if enabled).
     *
     * @param message message to store; its message id is used as the store key
     * @param z passed as the second argument of the {@code MessageInStore4j} constructor
     *     (presumably the "enabled for sending" flag — confirm against that class); when
     *     {@code true} both message counters are incremented
     * @return a result that is successful unless initialization failed, the store already holds
     *     {@code maxStoreSize} or more entries, or writing failed with an {@link IOException}
     */
    public SendResult addMessage(Message message, boolean z) {
        SendResult sendResult = new SendResult();
        if (!init()) {
            sendResult.setSuccess(false);
            sendResult.setErrorMessage("添加Message到Store4j错误，Store4j初始化错误");
            sendResult.setRuntimeException(new RuntimeException("Store4j初始化错误"));
            sendResult.setSendResultType(SendResultType.ERROR);
            logger.error(LogPrefix + "添加Message到Store4j错误，Store4j初始化错误");
            return sendResult;
        }
        sendResult.setMessageId(UniqId.getInstance().bytes2string(message.getMessageId()));
        sendResult.setSuccess(true);
        sendResult.setSendResultType(SendResultType.SUCCESS);
        if (this.store.size() >= this.maxStoreSize) {
            sendResult.setSuccess(false);
            sendResult.setErrorMessage("Store4j中存储的消息量超过阀值");
            sendResult.setRuntimeException(new RuntimeException("Store4j中存储的消息量超过阀值"));
            sendResult.setSendResultType(SendResultType.ERROR);
            logger.error(LogPrefix + "Store4j中存储的消息量超过阀值");
            return sendResult;
        }
        try {
            MessageInStore4j stored = new MessageInStore4j(message, z);
            this.store.add(message.getMessageId(), this.serializer.encodeObject(stored));
            if (null != this.cache) {
                this.cache.put(message.getMessageId(), stored);
            }
            if (z) {
                this.messageTotalCount.incrementAndGet();
                this.remainCommitMessageCount.incrementAndGet();
            }
        } catch (IOException e) {
            sendResult.setSuccess(false);
            sendResult.setSendResultType(SendResultType.EXCEPTION);
            sendResult.setErrorMessage("添加Message到Store4j错误");
            sendResult.setRuntimeException(new RuntimeException("消息存储在本地失败", e));
            logger.error(LogPrefix + "添加Message到Store4j错误，MessageID：" + sendResult.getMessageId(), e);
        }
        return sendResult;
    }

    /**
     * Marks a stored message as enabled for sending and rewrites it in the store. A cached entry
     * is updated in place; otherwise a new entry is built from {@code message}. Both counters are
     * incremented on success.
     *
     * @param message message being committed
     * @param sendResult result object to update on failure; returned unchanged on success
     * @return {@code sendResult}
     */
    public SendResult commitMessage(Message message, SendResult sendResult) {
        if (!init()) {
            sendResult.setSuccess(false);
            sendResult.setErrorMessage("更新Message到Store4j错误，Store4j初始化错误");
            sendResult.setRuntimeException(new RuntimeException("Store4j初始化错误"));
            logger.error(LogPrefix + "更新Message到Store4j错误，Store4j初始化错误");
            return sendResult;
        }
        try {
            MessageInStore4j stored = null;
            if (null != this.cache) {
                stored = this.cache.get(message.getMessageId());
            }
            if (null != stored) {
                stored.setEnabledSend(true);
            } else {
                stored = new MessageInStore4j(message, true);
                if (null != this.cache) {
                    this.cache.put(message.getMessageId(), stored);
                }
            }
            this.store.update(message.getMessageId(), this.serializer.encodeObject(stored));
            this.messageTotalCount.incrementAndGet();
            this.remainCommitMessageCount.incrementAndGet();
        } catch (IOException e) {
            sendResult.setSuccess(false);
            sendResult.setErrorMessage("更新Message到Store4j错误");
            sendResult.setRuntimeException(new RuntimeException("更新在本地的Message失败", e));
            logger.error(LogPrefix + "更新Message到Store4j错误，MessageID：" + sendResult.getMessageId(), e);
        }
        return sendResult;
    }

    /**
     * Removes a message from the store and the cache without touching the counters.
     *
     * @param message message being rolled back
     * @param sendResult result object to update on failure; returned unchanged on success
     * @return {@code sendResult}
     */
    public SendResult rollbackMessage(Message message, SendResult sendResult) {
        if (!init()) {
            sendResult.setSuccess(false);
            sendResult.setErrorMessage("回滚Store4j中消息错误，Store4j初始化错误");
            sendResult.setRuntimeException(new RuntimeException("Store4j初始化错误"));
            logger.error(LogPrefix + "回滚Store4j中消息错误，Store4j初始化错误");
            return sendResult;
        }
        try {
            this.store.remove(message.getMessageId());
            if (null != this.cache) {
                this.cache.remove(message.getMessageId());
            }
        } catch (IOException e) {
            sendResult.setSuccess(false);
            sendResult.setErrorMessage("回滚消息时，删除Store4j中的Message错误");
            sendResult.setRuntimeException(new RuntimeException("回滚消息时，删除存储在本地的Message失败", e));
            logger.error(LogPrefix + "回滚消息时，删除Store4j中的Message错误，MessageID：" + sendResult.getMessageId(), e);
        }
        return sendResult;
    }

    /**
     * Removes the message with the given id from the store and cache and decrements the
     * remaining-message counter. Does nothing if initialization fails; an {@link IOException}
     * from the store is logged, not thrown.
     *
     * @param bArr message id
     */
    public void removeMessage(byte[] bArr) {
        if (!init()) {
            return;
        }
        try {
            if (!this.store.remove(bArr)) {
                // Format the id explicitly: concatenating a byte[] would only print its identity.
                logger.error("无法删除数据：" + describeMessageId(bArr));
            }
            if (null != this.cache) {
                this.cache.remove(bArr);
            }
            // NOTE(review): the counter is decremented even when the store reported that nothing
            // was removed — confirm this is intended.
            this.remainCommitMessageCount.decrementAndGet();
        } catch (IOException e) {
            logger.error(LogPrefix + "删除消息时，删除Store4j中的Message错误，MessageID：" + UniqId.getInstance().bytes2string(bArr), e);
        }
    }

    /**
     * Looks a message up by id, consulting the cache first and then the store. Entries read from
     * the store are put into the cache when caching is enabled.
     *
     * @param bArr message id
     * @return the stored message, or {@code null} if initialization fails, the id is unknown,
     *     deserialization fails or yields {@code null}, or any other error occurs (all logged)
     */
    public MessageInStore4j getMessageInStore4j(byte[] bArr) {
        if (!init()) {
            return null;
        }
        if (null != this.cache) {
            MessageInStore4j cached = this.cache.get(bArr);
            if (null != cached) {
                return cached;
            }
        }
        try {
            byte[] raw = this.store.get(bArr);
            if (null == raw) {
                logger.warn(LogPrefix + "获取一个空的messageId: " + UniqId.getInstance().bytes2string(bArr));
                return null;
            }
            try {
                MessageInStore4j decoded = decodeWithOwnClassLoader(raw);
                if (null == decoded) {
                    logger.warn(LogPrefix + "反序列化MessageInStore4j有误，null == messageInStore4j  MsgId:[" + describeMessageId(bArr) + "]");
                    return null;
                }
                if (null != this.cache) {
                    this.cache.put(bArr, decoded);
                }
                return decoded;
            } catch (Exception e) {
                logger.warn(LogPrefix + "反序列化MessageInStore4j有误  MsgId:[" + describeMessageId(bArr) + "]", e);
                return null;
            }
        } catch (Throwable t) {
            logger.error(LogPrefix + "获取Store4j中消息错误", t);
            return null;
        }
    }

    /**
     * Deserializes {@code raw} with this class's class loader installed as the thread context
     * class loader, restoring the previous loader afterwards even if decoding fails.
     */
    private MessageInStore4j decodeWithOwnClassLoader(byte[] raw) throws Exception {
        Thread current = Thread.currentThread();
        ClassLoader previous = current.getContextClassLoader();
        try {
            current.setContextClassLoader(getClass().getClassLoader());
            return (MessageInStore4j) this.deserializer.decodeObject(raw);
        } finally {
            current.setContextClassLoader(previous);
        }
    }

    /** Renders a message id for log output; {@code null} ids are rendered as {@code "null"}. */
    private static String describeMessageId(byte[] messageId) {
        return null == messageId ? "null" : UniqId.getInstance().bytes2string(messageId);
    }

    /**
     * @return the store's key iterator, or {@code null} if initialization fails
     * @throws IOException if the store cannot create the iterator
     */
    public Iterator<byte[]> getIterator() throws IOException {
        if (init()) {
            return this.store.iterator();
        }
        return null;
    }

    /** Suspends the traverse task if running, and remembers the state for a later init(). */
    public void suspendRaliableAsynTask() {
        if (this.isInit) {
            this.reliableAsynTraverseMessageTask.setSuspend(true);
        }
        this.isSuspendBeforeInit = true;
    }

    /** Resumes the traverse task if running, and remembers the state for a later init(). */
    public void resumeReliableAsynTask() {
        if (this.isInit) {
            this.reliableAsynTraverseMessageTask.setSuspend(false);
        }
        this.isSuspendBeforeInit = false;
    }

    /**
     * @return the traverse task's suspend state when initialized, otherwise the state that will
     *     be applied on the next init()
     */
    public boolean isSuspendRaliableAsynTask() {
        if (this.isInit) {
            return this.reliableAsynTraverseMessageTask.isSuspend();
        }
        return this.isSuspendBeforeInit;
    }

    /** @return current value of the total-message counter */
    public int getMessageTotalCount() {
        return this.messageTotalCount.get();
    }

    /** @return current value of the remaining-message counter */
    public int getRemainCommitMessageCount() {
        return this.remainCommitMessageCount.get();
    }

    /** @return number of entries in the store, or 0 when the manager is not initialized */
    public int storeSize() {
        if (this.isInit) {
            return this.store.size();
        }
        return 0;
    }

    /** @return the backing store; {@code null} before init() and after close() unless set explicitly */
    public Store getStore() {
        return this.store;
    }

    /** Replaces the backing store reference; the previous store is not closed by this call. */
    public void setStore(Store store) {
        this.store = store;
    }
}
