/*
 * Decompiled with CFR 0.152.
 */
package com.tongtech.jmsclient;

import com.tongtech.jmsclient.ReadChannel;
import com.tongtech.log.Logger;
import com.tongtech.log.LoggerFactory;
import com.tongtech.tmqi.AdministeredObject;
import com.tongtech.tmqi.io.ReadOnlyPacket;
import com.tongtech.tmqi.io.ReadWritePacket;
import com.tongtech.tmqi.jmsclient.BrowserConsumer;
import com.tongtech.tmqi.jmsclient.ConnectionImpl;
import com.tongtech.tmqi.jmsclient.Consumer;
import com.tongtech.tmqi.jmsclient.FlowControl;
import com.tongtech.tmqi.jmsclient.MessageImpl;
import com.tongtech.tmqi.jmsclient.SessionImpl;
import com.tongtech.tmqi.jmsclient.SessionQueue;
import java.util.logging.Level;
import javax.jms.JMSException;

public class ReadMessageChannel
extends ReadChannel
implements Runnable {
    static Logger logger = LoggerFactory.getLogger(ReadMessageChannel.class);
    protected static final String iReadMessageChannel = "iReadMessageChannel-";
    public Thread readMessageChannelThread = null;

    public ReadMessageChannel(ConnectionImpl connection) {
        super(connection);
        this.init();
    }

    private void init() {
        this.connection.flowControl = this.flowControl = new FlowControl(this.connection);
        this.flowControl.start();
        this.readMessageChannelThread = new Thread(this);
        if (this.connection.hasDaemonThreads()) {
            this.readMessageChannelThread.setDaemon(true);
        }
        this.readMessageChannelThread.setName(iReadMessageChannel + this.connection.getLocalID());
        this.readMessageChannelThread.start();
    }

    private void dispatch(ReadWritePacket pkt) throws JMSException {
        switch (pkt.getPacketType()) {
            case 137: {
                this.processFlowPaused(pkt);
                break;
            }
            case 160: {
                this.receivedGoodByeReply = true;
                this.close();
                break;
            }
            case 1: 
            case 2: 
            case 3: 
            case 4: 
            case 5: 
            case 6: 
            case 7: {
                this.processJMSMessage(pkt);
                break;
            }
            case 120: {
                this.processStopReply(pkt);
                break;
            }
            default: {
                if (this.isClosed) break;
                String errString = AdministeredObject.cr.getKString("W2000");
                logger.warn(errString);
                pkt.dump(logger);
                this.checkConnectionState();
            }
        }
    }

    @Override
    public synchronized void close() {
        if (this.isClosed) {
            return;
        }
        this.connection.sayGoodbye(new Integer(0));
        this.isClosed = true;
        this.flowControl.close();
    }

    private void processFlowPaused(ReadWritePacket pkt) {
        long consumerId = pkt.getConsumerID();
        Consumer consumer = this.interestTable.getConsumer(new Long(consumerId));
        if (consumer != null) {
            this.flowControl.requestResume(consumer);
        }
    }

    protected void processJMSMessage(ReadWritePacket pkt) throws JMSException {
        SessionQueue sessionQ = null;
        Consumer consumer = null;
        Long sessionId = null;
        this.flowControl.messageReceived();
        if (pkt.getFlowPaused()) {
            this.flowControl.requestConnectionFlowResume();
        }
        long id = pkt.getConsumerID();
        consumer = this.interestTable.getConsumer(new Long(id));
        if (logger.isTraceEnabled()) {
            logger.trace("++ReceivedMessage, id={}, consumer={}", new Long(id), (Object)consumer);
        }
        if (consumer != null) {
            sessionId = consumer.getReadQueueId();
            if (sessionId != null) {
                sessionQ = this.readQTable.get(sessionId);
                if (sessionQ != null) {
                    this.flowControl.messageReceived(consumer);
                    if (pkt.getConsumerFlow()) {
                        this.flowControl.requestResume(consumer);
                    }
                    if (consumer instanceof BrowserConsumer) {
                        this.deliverToBrowserConsumer((BrowserConsumer)consumer, pkt);
                    } else {
                        sessionQ.enqueueNotify(pkt);
                    }
                } else {
                    String errorString = AdministeredObject.cr.getKString("W2001");
                    String pktstr = errorString + "\n" + pkt.toVerboseString();
                    ConnectionImpl.connectionLogger.log(Level.WARNING, pktstr);
                }
            } else {
                if (logger.isTraceEnabled()) {
                    logger.trace("ERROR: NO session (null) for packet: ");
                }
                pkt.dump(logger);
                String msg = "No Session for pkt: \n" + pkt.toVerboseString();
                ConnectionImpl.connectionLogger.log(Level.FINE, msg);
            }
        } else {
            if (logger.isTraceEnabled()) {
                logger.trace("ERROR: NO consumer for packet: ");
            }
            pkt.dump(logger);
            String msg = "No consumer for pkt: \n" + pkt.toVerboseString();
            ConnectionImpl.connectionLogger.log(Level.FINE, msg);
        }
    }

    protected void deliverToBrowserConsumer(BrowserConsumer consumer, ReadOnlyPacket pkt) throws JMSException {
        MessageImpl message = this.protocolHandler.getJMSMessage(pkt);
        SessionImpl session = consumer.session;
        message.setSession(session);
        consumer.onMessage(message);
    }

    private void checkConnectionState() {
        try {
            if (this.protocolHandler.isClosed()) {
                logger.warn("Fatal Error: ReadChannel closing due to protocol handler closed.");
                this.close();
            }
        }
        catch (Exception e) {
            logger.info("", e);
        }
    }

    @Override
    public void run() {
        ReadWritePacket packet = null;
        while (!this.isClosed) {
            try {
                if (logger.isDebugEnabled()) {
                    logger.debug("Waiting for read message...{}", new Boolean(this.isClosed));
                }
                packet = this.protocolHandler.readPacket(1);
                this.dispatch(packet);
            }
            catch (JMSException e) {
                if (logger.isTraceEnabled()) {
                    logger.trace("ReadMessageChannel[connection closed={}, received goodbye-reply={}] : {}", new Object[]{new Boolean(this.connection.isClosed), new Boolean(this.receivedGoodByeReply), e.getMessage()}, (Object)e);
                }
                if (this.isFatalErrorSet) {
                    this.fatalError(this.savedError);
                    System.out.println("ReadMessageChannel exit 1...");
                    return;
                }
                if (this.connection.isClosed || this.receivedGoodByeReply) {
                    this.connection.connectionIsBroken = true;
                    this.closeIOAndNotify();
                    System.out.println("ReadMessageChannel exit 2...");
                    return;
                }
                if (this.isBrokerNonResponsive) {
                    this.isBrokerNonResponsive = false;
                    this.connection.triggerConnectionClosedEvent("E207", null);
                } else {
                    this.connection.triggerConnectionClosedEvent("E206", e);
                }
                this.recover2(e);
            }
            catch (Exception ex) {
                logger.warn("no message!!");
                try {
                    Thread.sleep(1000L);
                }
                catch (Exception exception) {
                }
            }
            catch (Throwable error) {
                this.fatalError(error);
            }
        }
        if (logger.isDebugEnabled()) {
            logger.debug("ReadMessageChannel exit 4...");
        }
    }

    protected void processStopReply(ReadWritePacket pkt) throws JMSException {
        long ackId = pkt.getConsumerID();
        SessionQueue ackQ = this.ackQTable.get(new Long(ackId));
        if (ackQ != null) {
            ackQ.enqueueNotify(pkt);
        } else if (!(this.connection.connectionIsBroken || this.connection.reconnecting || this.connection.isCloseCalled || 124 == pkt.getPacketType() || 128 == pkt.getPacketType())) {
            String errorString = AdministeredObject.cr.getKString("W2001");
            String pktstr = errorString + "\n" + pkt.toVerboseString();
            logger.trace("{} PacketType:{}", (Object)pktstr, (Object)new Integer(pkt.getPacketType()));
        }
    }
}

