/*
 * Decompiled with CFR 0.152.
 */
package com.tongtech.tmqi.jmsclient;

import com.tongtech.jms.FileMessage;
import com.tongtech.log.Logger;
import com.tongtech.log.LoggerFactory;
import com.tongtech.remote.protocol.command.MessageId;
import com.tongtech.tmqi.AdministeredObject;
import com.tongtech.tmqi.Destination;
import com.tongtech.tmqi.io.ReadOnlyPacket;
import com.tongtech.tmqi.io.SysMessageID;
import com.tongtech.tmqi.jmsclient.Consumer;
import com.tongtech.tmqi.jmsclient.ExceptionHandler;
import com.tongtech.tmqi.jmsclient.FileMessageStatus;
import com.tongtech.tmqi.jmsclient.MessageImpl;
import com.tongtech.tmqi.jmsclient.ReceiveQueue;
import com.tongtech.tmqi.jmsclient.SessionImpl;
import com.tongtech.tmqi.jmsclient.SessionQueue;
import com.tongtech.tmqi.jmsclient.SessionReader;
import com.tongtech.tmqi.jmsclient.Traceable;
import com.tongtech.tmqi.util.TupleOutput;
import java.io.File;
import java.io.PrintStream;
import java.util.Hashtable;
import java.util.Vector;
import java.util.logging.Level;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageConsumer;
import javax.jms.MessageListener;

public class MessageConsumerImpl
extends Consumer
implements MessageConsumer,
Traceable,
FileMessageStatus {
    protected MessageListener messageListener = null;
    protected SessionImpl session = null;
    protected ReceiveQueue receiveQueue = null;
    static Logger logger = LoggerFactory.getLogger(MessageConsumerImpl.class);
    private boolean syncReadFlag = true;
    private SysMessageID lastDeliveredID = null;
    private volatile FileMessageStatus snapshot;
    private StringBuffer TranMsgFileName = null;

    public MessageConsumerImpl(SessionImpl session, javax.jms.Destination dest, String messageSelector, boolean noLocal) throws JMSException {
        super(session.getConnection(), dest, messageSelector, noLocal);
        this.session = session;
        this.init();
    }

    public MessageConsumerImpl(SessionImpl session, javax.jms.Destination dest) throws JMSException {
        super(session.getConnection(), dest);
        this.session = session;
    }

    protected void init() throws JMSException {
        try {
            this.checkConsumerCreation();
            this.receiveQueue = new ReceiveQueue();
            if (!this.session.getTransacted()) {
                this.acknowledgeMode = this.session.acknowledgeMode;
            }
            this.TranMsgFileName = new StringBuffer();
            if (logger.isTraceEnabled()) {
                logger.trace("start add interest.");
            }
            this.addInterest();
            if (logger.isTraceEnabled()) {
                logger.trace("I_CONSUMER_CREATED");
            }
        }
        catch (JMSException jmse) {
            ExceptionHandler.throwJMSException(jmse);
        }
    }

    private void addInterest() throws JMSException {
        this.session.checkConsumerCreation();
        this.registerInterest();
    }

    private void removeInterest() throws JMSException {
        this.session.removeMessageConsumer(this);
        this.deregisterInterest();
    }

    private synchronized void setSyncReadFlag(boolean flag) {
        this.syncReadFlag = flag;
    }

    protected synchronized boolean getSyncReadFlag() {
        return this.syncReadFlag;
    }

    protected void checkReceive() throws JMSException {
        JMSException jmse;
        String errorString;
        this.checkState();
        if (!this.getSyncReadFlag()) {
            errorString = AdministeredObject.cr.getKString("C4032");
            jmse = new JMSException(errorString, "C4032");
            ExceptionHandler.throwJMSException(jmse);
        }
        if (this.session.failoverOccurred && this.session.acknowledgeMode == 2) {
            errorString = AdministeredObject.cr.getKString("C4106");
            jmse = new JMSException(errorString, "C4106");
            ExceptionHandler.throwJMSException(jmse);
        }
        this.session.checkFailOver();
    }

    protected void stop() {
        this.receiveQueue.stop();
    }

    protected void stopNoWait() {
        this.receiveQueue.stopNoWait();
    }

    protected void start() {
        this.receiveQueue.start();
    }

    protected SessionQueue getReceiveQueue() {
        return this.receiveQueue;
    }

    @Override
    public SessionImpl getSession() {
        return this.session;
    }

    @Override
    public Long getReadQueueId() {
        return this.session.getSessionId();
    }

    @Override
    protected void onMessage(MessageImpl message) throws JMSException {
        logger.info("MessageConsumer onMessage {} , msgid={}", new Object[]{new Boolean(this.getSyncReadFlag()), message.getJMSMessageID()});
        if (this.getSyncReadFlag()) {
            this.receiveQueue.enqueueNotify(message);
        } else if (this.receiveQueue.isEmpty()) {
            this.deliverAndAcknowledge(message);
        } else {
            this.receiveQueue.enqueueNotify(message);
            this.onMessageToListenerFromReceiveQueue();
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    protected void onMessageToListenerFromReceiveQueue() throws JMSException {
        if (!this.receiveQueue.isEmpty()) {
            ReceiveQueue receiveQueue = this.receiveQueue;
            synchronized (receiveQueue) {
                while (!this.receiveQueue.isEmpty()) {
                    MessageImpl message = (MessageImpl)this.receiveQueue.dequeue();
                    if (message == null) continue;
                    this.deliverAndAcknowledge(message);
                }
                this.session.sessionQueue.listenerCount.decrementAndGet();
            }
        }
    }

    protected void deliverAndAcknowledge(MessageImpl message) throws JMSException {
        block5: {
            this.session.sessionReader.setCurrentMessage(message);
            TupleOutput out = new TupleOutput();
            out.writeLong(message.getInterestID());
            out.writeString(message.getJMSMessageID());
            out.writeBoolean(message.getPacket().getIsQueue());
            out.writeString(message.getPacket().getDestination());
            out.writeString(message.getPacket().getSrcNode());
            out.writeString(message.getPacket().getSubQueue());
            out.writeLong(message.getSeqId());
            message.setRealAckMsgInfo(out);
            try {
                this.messageListener.onMessage((Message)message);
            }
            catch (Exception e) {
                logger.warn("", e);
                if (this.session.getTransacted() || this.session.acknowledgeMode == 2) break block5;
                message.doAcknowledge = false;
                message.setJMSRedelivered(true);
                try {
                    this.messageListener.onMessage((Message)message);
                    message.doAcknowledge = true;
                }
                catch (Exception e1) {
                    logger.warn("", e1);
                }
            }
        }
        if (message.doAcknowledge && !message.consumerInRA) {
            logger.debug("message.doAcknowledge :{} ", new Boolean(message.doAcknowledge));
            this.session.acknowledge(message);
        }
        this.lastDeliveredID = message.getMessageID();
    }

    public MessageListener getMessageListener() throws JMSException {
        this.checkState();
        return this.messageListener;
    }

    public void setMessageListener(MessageListener listener) throws JMSException {
        this.checkState();
        this.messageListener = listener;
        if (listener == null) {
            this.setSyncReadFlag(true);
        } else {
            this.setSyncReadFlag(false);
            if (this.receiveQueue.size() > 0) {
                this.session.sessionQueue.setListenerLateNotify();
            }
        }
    }

    public Message receive() throws JMSException {
        return this.receive(0L);
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    public Message receive(long timeout) throws JMSException {
        MessageImpl message = null;
        this.checkReceive();
        try {
            if (logger.isTraceEnabled()) {
                logger.trace("waiting for receive message 1 {},timeout:{}", this.receiveQueue, (Object)new Long(timeout));
            }
            message = (MessageImpl)this.receiveQueue.dequeueWait(timeout);
            if (logger.isTraceEnabled()) {
                logger.trace("received message {}", message == null);
            }
            if (message != null) {
                this.session.acknowledge(message);
                this.lastDeliveredID = message.getMessageID();
                if (this.session.isTransacted && message instanceof FileMessage) {
                    FileMessage fileMsg = (FileMessage)((Object)message);
                    this.setTranMsgFileName(fileMsg.getFile());
                }
            } else if (this.session.connection.connectionIsBroken) {
                String errorString = AdministeredObject.cr.getKString("C4063");
                com.tongtech.jms.JMSException jmse = new com.tongtech.jms.JMSException(errorString, "C4063");
                if (this.session.connection.readChannel != null) {
                    this.session.connection.readChannel.savedJMSException(jmse);
                }
                ExceptionHandler.throwJMSException(jmse);
            }
        }
        catch (Error e) {
            e.printStackTrace();
            ExceptionHandler.logError(e);
        }
        finally {
            this.receiveQueue.setReceiveInProcess(false);
        }
        return message;
    }

    public Message receiveNoWait() throws JMSException {
        MessageImpl message = null;
        this.checkReceive();
        try {
            if (logger.isDebugEnabled()) {
                logger.debug("receiveQueue is Locked:{}", new Boolean(this.receiveQueue.getIsLocked()));
            }
            if (this.receiveQueue.getIsLocked()) {
                Message message2 = null;
                return message2;
            }
            this.receiveQueue.setReceiveInProcess(true);
            message = (MessageImpl)this.receiveQueue.dequeue();
            if (message != null) {
                this.session.acknowledge(message);
                this.lastDeliveredID = message.getMessageID();
            }
        }
        finally {
            this.receiveQueue.setReceiveInProcess(false);
        }
        return message;
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    public void close() throws JMSException {
        int reduceFlowCount = 0;
        ReceiveQueue receiveQueue = this.receiveQueue;
        synchronized (receiveQueue) {
            if (this.isClosed) {
                return;
            }
            this.isClosed = true;
        }
        try {
            if (Thread.currentThread() != this.session.sessionReader.sessionThread) {
                this.session.sessionQueue.stop(true);
            }
            this.stop();
            reduceFlowCount = this.receiveQueue.size();
            this.receiveQueue.close();
            if (!this.session.connection.isBroken() && !this.session.connection.recoverInProcess) {
                if (this.session.dupsOkAckOnTimeout) {
                    this.session.syncedDupsOkCommitAcknowledge();
                }
                this.removeInterest();
            }
            if (this.session.isTransacted || this.session.acknowledgeMode == 2 || this.session.acknowledgeMode == 3) {
                this.session.removeUnAckedMessages(this.interestId);
            }
            this.deleteFileOfRollbackMsg();
            this.removeUndeliveredMessages();
            if (Thread.currentThread() == this.session.sessionReader.sessionThread) {
                this.session.sessionReader.currentMessage.doAcknowledge = false;
            } else {
                this.session.sessionQueue.start();
            }
            if (logger.isTraceEnabled()) {
                logger.trace("message consumer closed ...");
            }
            if (logger.isTraceEnabled()) {
                logger.trace("{}", this);
            }
            this.messageListener = null;
            this.isClosed = true;
        }
        finally {
            this.session.resetConnectionFlowControl(reduceFlowCount);
            if (SessionImpl.sessionLogger.isLoggable(Level.FINE)) {
                this.logLifeCycle("I301");
            }
        }
    }

    public void deleteFileOfRollbackMsg() throws JMSException {
        Object[] msgs = this.receiveQueue.toArray();
        for (int i = 0; i < msgs.length; ++i) {
            FileMessage fileMsg;
            File f;
            if (msgs[i] == null || !(msgs[i] instanceof FileMessage) || !(f = new File((fileMsg = (FileMessage)msgs[i]).getFile())).exists() || !f.isFile()) continue;
            f.delete();
        }
    }

    protected void removeUndeliveredMessages() throws JMSException {
        int reduceFlowCount = 0;
        Object[] obj = this.session.sessionQueue.toArray();
        int size = obj.length;
        if (size > 0) {
            int i;
            Vector<ReadOnlyPacket> removeq = new Vector<ReadOnlyPacket>();
            long consumerID = this.interestId;
            for (i = 0; i < size; ++i) {
                FileMessage fileMsg;
                File f;
                ReadOnlyPacket pkt = (ReadOnlyPacket)obj[i];
                if (pkt == null || pkt.getConsumerID() != consumerID) continue;
                removeq.addElement(pkt);
                SessionReader sessionReader = new SessionReader(this.session);
                MessageImpl msg = sessionReader.getJMSMessage(pkt);
                if (!(msg instanceof FileMessage) || !(f = new File((fileMsg = (FileMessage)((Object)msg)).getFile())).exists() || !f.isFile()) continue;
                f.delete();
            }
            reduceFlowCount = removeq.size();
            for (i = 0; i < removeq.size(); ++i) {
                if (logger.isTraceEnabled()) {
                    logger.trace("removing msg from sessionq: {}", removeq.elementAt(i));
                }
                this.session.sessionQueue.remove(removeq.elementAt(i));
            }
            this.session.resetConnectionFlowControl(reduceFlowCount);
        }
    }

    public SysMessageID getLastDeliveredID() {
        return this.lastDeliveredID;
    }

    @Override
    public void dump(PrintStream ps) {
    }

    @Override
    protected Hashtable getDebugState(boolean verbose) {
        Hashtable ht = super.getDebugState(verbose);
        ht.put("# pending", String.valueOf(this.receiveQueue.size()));
        ht.put("syncReadFlag", String.valueOf(this.syncReadFlag));
        if (verbose) {
            ht.put("receiveQueue", this.receiveQueue);
        }
        return ht;
    }

    protected void setTranMsgFileName(String fileName) {
        this.TranMsgFileName.append(fileName);
        this.TranMsgFileName.append(";");
    }

    public String getTranMsgFileName() {
        return this.TranMsgFileName.toString();
    }

    public void clearTranMsgFileName() {
        int sb_length = this.TranMsgFileName.length();
        this.TranMsgFileName.delete(0, sb_length);
    }

    public Object TEST_GetAttribute(String name) {
        if (name.startsWith("FlowControl")) {
            return this.session.readChannel.getFlowControl().TEST_GetAttribute(name, this);
        }
        return null;
    }

    public void logLifeCycle(String key) {
        if (SessionImpl.sessionLogger.isLoggable(Level.FINE)) {
            SessionImpl.sessionLogger.log(Level.FINE, key, this);
        }
    }

    @Override
    public String toString() {
        String destName = null;
        try {
            destName = ((Destination)this.destination).getName();
        }
        catch (Exception exception) {
            // empty catch block
        }
        return this.session.toString() + ", ConsumerID=" + this.getInterestId() + ", DestName=" + destName;
    }

    public void setFileProgressSnapshot(FileMessageStatus snapshot) {
        this.snapshot = snapshot;
    }

    public FileMessageStatus getFileProgress() {
        return this.snapshot;
    }

    @Override
    public int getFileProgressPercentage() {
        if (this.snapshot != null) {
            return this.snapshot.getFileProgressPercentage();
        }
        return 0;
    }

    @Override
    public MessageId getMessageId() {
        if (this.snapshot != null) {
            return this.snapshot.getMessageId();
        }
        return null;
    }

    @Override
    public boolean isFileTransporting() {
        return this.snapshot.isFileTransporting();
    }
}

