/*
 * Decompiled with CFR 0.152.
 */
package com.tongtech.jms.ra.core;

import com.tongtech.jms.ra.core.Activation;
import com.tongtech.jms.ra.core.ActivationBase;
import com.tongtech.jms.ra.core.Delivery;
import com.tongtech.jms.ra.core.DeliveryStats;
import com.tongtech.jms.ra.core.RAJMSActivationSpec;
import com.tongtech.jms.ra.core.RAJMSObjectFactory;
import com.tongtech.jms.ra.core.RAJMSResourceAdapter;
import com.tongtech.jms.ra.core.SyncDelivery;
import com.tongtech.jms.ra.localization.Localizer;
import com.tongtech.jms.ra.util.Exc;
import com.tongtech.jms.ra.util.Logger;
import com.tongtech.jms.ra.util.Str;
import com.tongtech.jms.ra.util.UrlParser;
import com.tongtech.jms.ra.util.XAssert;
import java.lang.reflect.Method;
import java.rmi.MarshalledObject;
import java.util.Properties;
import javax.jms.Destination;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageListener;
import javax.jms.MessageProducer;
import javax.jms.Session;
import javax.resource.ResourceException;
import javax.resource.spi.UnavailableException;
import javax.resource.spi.endpoint.MessageEndpoint;
import javax.resource.spi.endpoint.MessageEndpointFactory;
import javax.transaction.xa.XAResource;

public class TopicToQueueActivation
extends ActivationBase {
    private static Logger sLog = Logger.getLogger(TopicToQueueActivation.class);
    private ActivationBase mTopicToQueue;
    private ActivationBase mQueue;
    private String mQueuename;
    private static final Localizer LOCALE = Localizer.get();

    public TopicToQueueActivation(RAJMSResourceAdapter ra, MessageEndpointFactory epf, RAJMSActivationSpec spec) throws Exception {
        super(ra, epf, spec);
        UrlParser subscriberinfo = new UrlParser(spec.getSubscriptionName());
        Properties props = subscriberinfo.getQueryProperties();
        String subscribername = props.getProperty("subscribername");
        String defaultQueuename = "LOADBALQ_" + spec.getDestination() + "_" + subscribername;
        this.mQueuename = props.getProperty("queue", defaultQueuename);
        RAJMSActivationSpec topicSpec = this.copy(spec);
        topicSpec.setSubscriptionName(subscribername);
        topicSpec.setConcurrencyMode(RAJMSActivationSpec.DELIVERYCONCURRENCY_STRS[3]);
        topicSpec.setEndpointPoolMaxSize("1");
        if (!Str.empty(topicSpec.getMBeanName())) {
            String mbeanname = props.getProperty("mbeanname", topicSpec.getMBeanName() + "-LOADBALQ");
            topicSpec.setMBeanName(mbeanname);
        }
        topicSpec.setBatchSize(props.getProperty("batchsize", "10"));
        topicSpec.setHoldUntilAck("0");
        Properties options = new Properties();
        Str.deserializeProperties(Str.parseProperties("JMSJCA.sep=", topicSpec.getOptions()), options);
        options.setProperty("JMSJCA.concurrencymode", RAJMSActivationSpec.DELIVERYCONCURRENCY_STRS[3]);
        topicSpec.setOptions(Str.serializeProperties(options));
        RAJMSActivationSpec queueSpec = this.copy(spec);
        queueSpec.setDestinationType("javax.jms.Queue");
        queueSpec.setSubscriptionName(null);
        queueSpec.setDestination(this.mQueuename);
        queueSpec.setSubscriptionDurability("NonDurable");
        queueSpec.setClientId(null);
        MessageEndpointFactory topicEPF = new MessageEndpointFactory(){

            public MessageEndpoint createEndpoint(XAResource arg0) throws UnavailableException {
                return null;
            }

            public boolean isDeliveryTransacted(Method arg0) throws NoSuchMethodException {
                return false;
            }
        };
        this.mQueue = this.getObjectFactory().createActivation(ra, epf, queueSpec);
        this.mTopicToQueue = new Activation(ra, topicEPF, topicSpec){
            private String queueName;
            {
                this.queueName = TopicToQueueActivation.this.mQueue.getName();
            }

            @Override
            public Delivery createDelivery() throws Exception {
                TopicToQueueDelivery ret = new TopicToQueueDelivery(this, this.getStats(), TopicToQueueActivation.this.mQueuename);
                return ret;
            }

            @Override
            public String getName() {
                return super.getName() + " >> [" + this.queueName + "]";
            }

            @Override
            protected void logDeliveryInitiationException(int attemptPlusOne, int dt, Exception e) {
                if (e instanceof Exc.ConsumerCreationException) {
                    if (attemptPlusOne == 1) {
                        sLog.info(LOCALE.x("E093: [{0}]: message delivery could not be initiated due to a failure to create the subscriber. Assuming that this deployment is on a node in a cluster, there is likely another cluster node already receiving messages from this subscriber and delivering them to the load balancing queue where this deployment will receive them. The subscriber creation attempt will be retried periodically to detect when the active subscriber disconnects. Unsuccessful attempts to subscribe will not be logged. The subscriber could not created because of the following error: {3}", this.getName(), Integer.toString(attemptPlusOne), Integer.toString(dt), e.getCause()), e.getCause());
                    }
                } else {
                    super.logDeliveryInitiationException(attemptPlusOne, dt, e);
                }
            }
        };
    }

    private RAJMSActivationSpec copy(RAJMSActivationSpec tocopy) throws Exception {
        MarshalledObject<RAJMSActivationSpec> copier = new MarshalledObject<RAJMSActivationSpec>(tocopy);
        return copier.get();
    }

    @Override
    public void activate() throws Exception {
        try {
            this.mTopicToQueue.activate();
            this.mQueue.activate();
        }
        catch (Exception e) {
            this.deactivate();
            throw e;
        }
    }

    @Override
    public void deactivate() {
        this.mTopicToQueue.deactivate();
        this.mQueue.deactivate();
    }

    public class TopicToQueueDelivery
    extends SyncDelivery {
        private String mQueueName;

        public TopicToQueueDelivery(Activation a, DeliveryStats stats, String queuename) throws Exception {
            super(a, stats);
            XAssert.xassert(a.isTopic());
            XAssert.xassert(!a.isCMT());
            this.mQueueName = queuename;
        }

        @Override
        protected MessageEndpoint createMessageEndpoint(XAResource xa, Session s) throws Exception {
            return new Copier(s);
        }

        @Override
        protected Class getSessionClass() {
            return Session.class;
        }

        @Override
        protected int getDomain() {
            return 6;
        }

        private class Copier
        implements MessageEndpoint,
        MessageListener {
            private Session mSession;
            private MessageProducer mProducer;

            public Copier(Session s) throws Exception {
                this.mSession = s;
                RAJMSObjectFactory o = TopicToQueueDelivery.this.mActivation.getObjectFactory();
                Destination dest = o.createDestination(this.mSession, false, false, TopicToQueueDelivery.this.mActivation.getActivationSpec(), null, TopicToQueueDelivery.this.mActivation.getRA(), TopicToQueueDelivery.this.mQueueName);
                this.mProducer = o.createMessageProducer(this.mSession, TopicToQueueDelivery.this.mActivation.isCMT() && !TopicToQueueDelivery.this.mActivation.isXAEmulated(), false, dest, TopicToQueueDelivery.this.mActivation.getRA());
            }

            public void afterDelivery() throws ResourceException {
            }

            public void beforeDelivery(Method arg0) throws NoSuchMethodException, ResourceException {
            }

            public void release() {
                if (this.mProducer != null) {
                    try {
                        this.mProducer.close();
                        this.mProducer = null;
                    }
                    catch (JMSException e) {
                        sLog.warn(LOCALE.x("E094 = This {0} could not be closed properly: {1}", this.mProducer.getClass().getName(), (Object)e), e);
                    }
                }
            }

            public void onMessage(Message m) {
                try {
                    if (m.getObjectProperty("JMSJCA.EndOfBatch") == null) {
                        TopicToQueueDelivery.this.mActivation.getObjectFactory().send(false, this.mProducer, m, m.getJMSPriority(), m.getJMSDeliveryMode());
                    }
                }
                catch (JMSException e) {
                    throw Exc.rtexc(LOCALE.x("E149: Redirecting message failed: {0}", (Object)e), e);
                }
            }
        }
    }
}

