[jboss-svn-commits] JBL Code SVN: r35463 - in labs/jbossesb/branches/JBESB_4_9_CP/product/rosetta: tests/src/org/jboss/internal/soa/esb/rosetta/pooling and 1 other directory.

jboss-svn-commits at lists.jboss.org jboss-svn-commits at lists.jboss.org
Fri Oct 8 08:42:14 EDT 2010


Author: kevin.conner at jboss.com
Date: 2010-10-08 08:42:13 -0400 (Fri, 08 Oct 2010)
New Revision: 35463

Modified:
   labs/jbossesb/branches/JBESB_4_9_CP/product/rosetta/src/org/jboss/internal/soa/esb/rosetta/pooling/JmsConnectionPool.java
   labs/jbossesb/branches/JBESB_4_9_CP/product/rosetta/src/org/jboss/internal/soa/esb/rosetta/pooling/JmsSession.java
   labs/jbossesb/branches/JBESB_4_9_CP/product/rosetta/src/org/jboss/internal/soa/esb/rosetta/pooling/JmsXASession.java
   labs/jbossesb/branches/JBESB_4_9_CP/product/rosetta/tests/src/org/jboss/internal/soa/esb/rosetta/pooling/JmsConnectionPoolUnitTest.java
   labs/jbossesb/branches/JBESB_4_9_CP/product/rosetta/tests/src/org/jboss/internal/soa/esb/rosetta/pooling/MaxSessionsPerConnectionUnitTest.java
Log:
Track session users, producers/consumers and change caching of XA sessions: JBESB-3201

Modified: labs/jbossesb/branches/JBESB_4_9_CP/product/rosetta/src/org/jboss/internal/soa/esb/rosetta/pooling/JmsConnectionPool.java
===================================================================
--- labs/jbossesb/branches/JBESB_4_9_CP/product/rosetta/src/org/jboss/internal/soa/esb/rosetta/pooling/JmsConnectionPool.java	2010-10-08 12:13:15 UTC (rev 35462)
+++ labs/jbossesb/branches/JBESB_4_9_CP/product/rosetta/src/org/jboss/internal/soa/esb/rosetta/pooling/JmsConnectionPool.java	2010-10-08 12:42:13 UTC (rev 35463)
@@ -71,6 +71,7 @@
 	private static int CONFIGURED_POOL_SIZE = DEFAULT_POOL_SIZE;
 	private static int CONFIGURED_SLEEP = DEFAULT_SLEEP;
 
+	static final int XA_TRANSACTED = -1 ;
 	/**
 	 * The executor used to create sessions.
 	 */
@@ -245,11 +246,12 @@
             final JmsXASession currentSession = getXASession() ;
             if (currentSession != null)
             {
+                currentSession.incrementReferenceCount() ;
                 return currentSession ;
             }
         }
 
-        final int mode = (transacted ? Session.SESSION_TRANSACTED : acknowledgeMode) ;
+        final int mode = (transacted ? XA_TRANSACTED : acknowledgeMode) ;
         final long end = System.currentTimeMillis() + (sleepTime * 1000) ;
         boolean emitExpiry = LOGGER.isDebugEnabled() ;
 
@@ -512,7 +514,8 @@
         return getSessionsInPool(Session.AUTO_ACKNOWLEDGE) +
             getSessionsInPool(Session.CLIENT_ACKNOWLEDGE) +
             getSessionsInPool(Session.DUPS_OK_ACKNOWLEDGE) +
-            getSessionsInPool(Session.SESSION_TRANSACTED) ;
+            getSessionsInPool(Session.SESSION_TRANSACTED) +
+            getSessionsInPool(XA_TRANSACTED) ;
     }
     
     /**
@@ -596,7 +599,7 @@
      * @param session The XA session.
      * @throws ConnectionException if there is no transaction active.
      */
-    synchronized void associateTransaction(final JmsXASession session)
+    void associateTransaction(final JmsXASession session)
         throws ConnectionException
     {
         JmsSessionPool sessionPool = findOwnerPool(session);
@@ -610,7 +613,7 @@
      * Disassociate the JMS XA Session from a transaction.
      * @param session The XA session.
      */
-    synchronized void disassociateTransaction(final JmsXASession session)
+    void disassociateTransaction(final JmsXASession session)
     {
         JmsSessionPool sessionPool = findOwnerPool(session);
 
@@ -618,7 +621,22 @@
             sessionPool.disassociateTransaction(session);
         }
     }
+    
+    /**
+     * Check that the session is associated with the current transaction.
+     * @param session The XA session.
+     */
+    boolean isAssociated(final JmsXASession session)
+    {
+        JmsSessionPool sessionPool = findOwnerPool(session);
 
+        if(sessionPool != null) {
+            return sessionPool.isAssociated(session);
+        } else {
+            return false ;
+        }
+    }
+
     JmsSessionPool findOwnerPool(final JmsSession session) {
         return session.getSessionPool() ;
     }
@@ -715,10 +733,12 @@
             freeSessionsMap.put(Session.AUTO_ACKNOWLEDGE, new ArrayList<JmsSession>() );
             freeSessionsMap.put(Session.CLIENT_ACKNOWLEDGE, new ArrayList<JmsSession>() );
             freeSessionsMap.put(Session.DUPS_OK_ACKNOWLEDGE, new ArrayList<JmsSession>() );
+            freeSessionsMap.put(Session.SESSION_TRANSACTED, new ArrayList<JmsSession>() );
 
             inUseSessionsMap.put(Session.AUTO_ACKNOWLEDGE, new ArrayList<JmsSession>() );
             inUseSessionsMap.put(Session.CLIENT_ACKNOWLEDGE, new ArrayList<JmsSession>() );
             inUseSessionsMap.put(Session.DUPS_OK_ACKNOWLEDGE, new ArrayList<JmsSession>() );
+            inUseSessionsMap.put(Session.SESSION_TRANSACTED, new ArrayList<JmsSession>() );
         }
 
         public synchronized void removeSessionPool() {
@@ -746,6 +766,11 @@
             if (freeSessions.size() > 0)
             {
                 final JmsSession session = freeSessions.remove(freeSessions.size()-1);
+                if (transacted)
+                {
+                    final JmsXASession xaSession = (JmsXASession)session ;
+                    xaSession.incrementReferenceCount() ;
+                }
                 inUseSessions.add(session);
                 return session ;
             } else if (getSessionsInPool() < maxSessionsPerConnection) {
@@ -795,8 +820,8 @@
                     {
                         final XAConnectionFactory factory = (XAConnectionFactory)factoryConnection ;
                         jmsConnection = useJMSSecurity ? factory.createXAConnection(username,password): factory.createXAConnection();
-                        freeSessionsMap.put(Session.SESSION_TRANSACTED, new ArrayList<JmsSession>() );
-                        inUseSessionsMap.put(Session.SESSION_TRANSACTED, new ArrayList<JmsSession>() );
+                        freeSessionsMap.put(XA_TRANSACTED, new ArrayList<JmsSession>() );
+                        inUseSessionsMap.put(XA_TRANSACTED, new ArrayList<JmsSession>() );
                     }
                     else if (factoryConnection instanceof ConnectionFactory)
                     {
@@ -848,7 +873,9 @@
                     final JmsSession session ;
 
                     if (transacted) {
-                        session = new JmsXASession(JmsConnectionPool.this, JmsSessionPool.this, ((XAConnection)currentConnection).createXASession(), currentID, acknowledgeMode);
+                        final JmsXASession xaSession = new JmsXASession(JmsConnectionPool.this, JmsSessionPool.this, ((XAConnection)currentConnection).createXASession(), currentID, acknowledgeMode);
+                        xaSession.incrementReferenceCount() ;
+                        session = xaSession ;
                     } else {
                         session = new JmsSession(JmsConnectionPool.this, JmsSessionPool.this, currentConnection.createSession(transacted, acknowledgeMode), currentID, acknowledgeMode);
                     }
@@ -1004,7 +1031,7 @@
          * @param session The XA session.
          * @throws ConnectionException if there is no transaction active.
          */
-        synchronized void associateTransaction(final JmsXASession session)
+        void associateTransaction(final JmsXASession session)
             throws ConnectionException
         {
             final Object tx = getTransaction() ;
@@ -1012,20 +1039,51 @@
             {
                 throw new ConnectionException("No active transaction") ;
             }
-            transactionsToSessions.put(tx, session) ;
-            sessionsToTransactions.put(session, tx) ;
+            synchronized (this)
+            {
+                transactionsToSessions.put(tx, session) ;
+                sessionsToTransactions.put(session, tx) ;
+            }
         }
 
         /**
          * Disassociate the JMS XA Session from a transaction.
          * @param session The XA session.
          */
-        synchronized void disassociateTransaction(final JmsXASession session)
+        void disassociateTransaction(final JmsXASession session)
         {
-            final Object tx = sessionsToTransactions.remove(session) ;
-            transactionsToSessions.remove(tx) ;
+            final Object tx ;
+            synchronized(this)
+            {
+                tx = sessionsToTransactions.remove(session) ;
+                transactionsToSessions.remove(tx) ;
+            }
         }
 
+        /**
+         * Check that the session is associated with the current transaction.
+         * @param session The XA session.
+         */
+        boolean isAssociated(final JmsXASession session)
+        {
+            try
+            {
+                final Object current = TransactionStrategy.getTransactionStrategy(true).getTransaction() ;
+                final Object tx ;
+                synchronized(this)
+                {
+                    tx = sessionsToTransactions.get(session) ;
+                }
+                if (tx != null)
+                {
+                    return tx.equals(current) ;
+                }
+            }
+            catch (final TransactionStrategyException tse) {} // ignore
+            
+            return false ;
+        }
+
         public void releaseInUseSession(JmsSession session) {
             final int mode = session.getRequestedAcknowledgeMode() ;
 
@@ -1053,7 +1111,7 @@
 
                     final ArrayList<JmsSession> sessions = (freeSessionsMap == null ? null : freeSessionsMap.get(mode));
                     if (sessions != null) {
-                        sessions.add(session) ;
+                        sessions.add(session.duplicateSession()) ;
                     }
                 }
                 session.releaseResources() ;

Modified: labs/jbossesb/branches/JBESB_4_9_CP/product/rosetta/src/org/jboss/internal/soa/esb/rosetta/pooling/JmsSession.java
===================================================================
--- labs/jbossesb/branches/JBESB_4_9_CP/product/rosetta/src/org/jboss/internal/soa/esb/rosetta/pooling/JmsSession.java	2010-10-08 12:13:15 UTC (rev 35462)
+++ labs/jbossesb/branches/JBESB_4_9_CP/product/rosetta/src/org/jboss/internal/soa/esb/rosetta/pooling/JmsSession.java	2010-10-08 12:42:13 UTC (rev 35463)
@@ -63,21 +63,29 @@
      */
     private final JmsSessionPool sessionPool ;
     /**
-     * The session delegate.
+     * The underlying session.
      */
     private final Session session ;
     /**
+     * The session delegate.
+     */
+    private final Session sessionDelegate ;
+    /**
      * The pool instance id.
      */
     private final long id ;
     /**
      * The session acknowledge mode.
      */
-    private final int acknowledgeMode ;
+    protected final int acknowledgeMode ;
     /**
      * The requested acknowledge mode for this session.
      */
     private final int requestedAcknowledgeMode ;
+    /**
+     * The flag indicating whether this session is invalid.
+     */
+    protected volatile boolean invalid ;
 
     /**
      * Flag indicating whether this session is suspect or not.
@@ -98,6 +106,27 @@
     private HashSet<MessageProducer> messageProducerSet ;
     
     /**
+     * Duplicate the session wrapper.
+     * @param connectionPool The connection pool associated with this session.
+     * @param sessionPool The session pool associated with this session.
+     * @param session The underlying session to be wrapped.
+     * @param id The pool instance id.
+     * @param requestedAcknowledgeMode The requested acknowledge mode for this session.
+     * @param acknowledgeMode The original acknowledge mode for this session.
+     */
+    JmsSession(final JmsConnectionPool connectionPool, final JmsSessionPool sessionPool, final Session session, final long id, final int requestedAcknowledgeMode,
+            final int acknowledgeMode)
+    {
+        this.connectionPool = connectionPool ;
+        this.sessionPool = sessionPool ;
+        this.id = id ;
+        this.session = session ;
+        this.sessionDelegate = (Session)getExceptionHandler(connectionPool, Session.class, session) ;
+        this.requestedAcknowledgeMode = requestedAcknowledgeMode ;
+        this.acknowledgeMode = acknowledgeMode ;
+    }
+    
+    /**
      * Create the session wrapper.
      * @param connectionPool The connection pool associated with this session.
      * @param sessionPool The session pool associated with this session.
@@ -112,7 +141,8 @@
         this.connectionPool = connectionPool ;
         this.sessionPool = sessionPool ;
         this.id = id ;
-        this.session = (Session)getExceptionHandler(connectionPool, Session.class, session) ;
+        this.session = session ;
+        this.sessionDelegate = (Session)getExceptionHandler(connectionPool, Session.class, session) ;
         this.requestedAcknowledgeMode = requestedAcknowledgeMode ;
         acknowledgeMode = session.getAcknowledgeMode() ;
         // Workaround for JBESB-1873
@@ -129,132 +159,132 @@
     
     public void close() throws JMSException
     {
-        session.close();
+        sessionDelegate.close();
     }
 
     public void commit() throws JMSException
     {
         setSuspect(true) ;
-        session.commit();
+        sessionDelegate.commit();
         setSuspect(false) ;
     }
 
     public QueueBrowser createBrowser(Queue arg0, String arg1)
             throws JMSException
     {
-        return trackQueueBrowser(session.createBrowser(arg0, arg1));
+        return trackQueueBrowser(sessionDelegate.createBrowser(arg0, arg1));
     }
 
     public QueueBrowser createBrowser(Queue arg0) throws JMSException
     {
-        return trackQueueBrowser(session.createBrowser(arg0));
+        return trackQueueBrowser(sessionDelegate.createBrowser(arg0));
     }
 
     public BytesMessage createBytesMessage() throws JMSException
     {
         associate() ;
-        return session.createBytesMessage();
+        return sessionDelegate.createBytesMessage();
     }
 
     public MessageConsumer createConsumer(Destination arg0, String arg1,
             boolean arg2) throws JMSException
     {
-        return trackMessageConsumer(session.createConsumer(arg0, arg1, arg2));
+        return trackMessageConsumer(sessionDelegate.createConsumer(arg0, arg1, arg2));
     }
 
     public MessageConsumer createConsumer(Destination arg0, String arg1)
             throws JMSException
     {
-        return trackMessageConsumer(session.createConsumer(arg0, arg1));
+        return trackMessageConsumer(sessionDelegate.createConsumer(arg0, arg1));
     }
 
     public MessageConsumer createConsumer(Destination arg0) throws JMSException
     {
-        return trackMessageConsumer(session.createConsumer(arg0));
+        return trackMessageConsumer(sessionDelegate.createConsumer(arg0));
     }
 
     public TopicSubscriber createDurableSubscriber(Topic arg0, String arg1,
             String arg2, boolean arg3) throws JMSException
     {
-        return trackTopicSubscriber(session.createDurableSubscriber(arg0, arg1, arg2, arg3));
+        return trackTopicSubscriber(sessionDelegate.createDurableSubscriber(arg0, arg1, arg2, arg3));
     }
 
     public TopicSubscriber createDurableSubscriber(Topic arg0, String arg1)
             throws JMSException
     {
-        return trackTopicSubscriber(session.createDurableSubscriber(arg0, arg1));
+        return trackTopicSubscriber(sessionDelegate.createDurableSubscriber(arg0, arg1));
     }
 
     public MapMessage createMapMessage() throws JMSException
     {
         associate() ;
-        return session.createMapMessage();
+        return sessionDelegate.createMapMessage();
     }
 
     public Message createMessage() throws JMSException
     {
         associate() ;
-        return session.createMessage();
+        return sessionDelegate.createMessage();
     }
 
     public ObjectMessage createObjectMessage() throws JMSException
     {
         associate() ;
-        return session.createObjectMessage();
+        return sessionDelegate.createObjectMessage();
     }
 
     public ObjectMessage createObjectMessage(Serializable arg0)
             throws JMSException
     {
         associate() ;
-        return session.createObjectMessage(arg0);
+        return sessionDelegate.createObjectMessage(arg0);
     }
 
     public MessageProducer createProducer(Destination arg0) throws JMSException
     {
-        return trackMessageProducer(session.createProducer(arg0));
+        return trackMessageProducer(sessionDelegate.createProducer(arg0));
     }
 
     public Queue createQueue(String arg0) throws JMSException
     {
         associate() ;
-        return session.createQueue(arg0);
+        return sessionDelegate.createQueue(arg0);
     }
 
     public StreamMessage createStreamMessage() throws JMSException
     {
         associate() ;
-        return session.createStreamMessage();
+        return sessionDelegate.createStreamMessage();
     }
 
     public TemporaryQueue createTemporaryQueue() throws JMSException
     {
         associate() ;
-        return session.createTemporaryQueue();
+        return sessionDelegate.createTemporaryQueue();
     }
 
     public TemporaryTopic createTemporaryTopic() throws JMSException
     {
         associate() ;
-        return session.createTemporaryTopic();
+        return sessionDelegate.createTemporaryTopic();
     }
 
     public TextMessage createTextMessage() throws JMSException
     {
         associate() ;
-        return session.createTextMessage();
+        return sessionDelegate.createTextMessage();
     }
 
     public TextMessage createTextMessage(String arg0) throws JMSException
     {
         associate() ;
-        return session.createTextMessage(arg0);
+        return sessionDelegate.createTextMessage(arg0);
     }
 
     public Topic createTopic(String arg0) throws JMSException
     {
         associate() ;
-        return session.createTopic(arg0);
+        return sessionDelegate.createTopic(arg0);
     }
 
     public int getAcknowledgeMode() throws JMSException
@@ -265,43 +295,43 @@
     public MessageListener getMessageListener() throws JMSException
     {
         associate() ;
-        return session.getMessageListener();
+        return sessionDelegate.getMessageListener();
     }
 
     public boolean getTransacted() throws JMSException
     {
         associate() ;
-        return session.getTransacted();
+        return sessionDelegate.getTransacted();
     }
 
     public void recover() throws JMSException
     {
         associate() ;
-        session.recover();
+        sessionDelegate.recover();
     }
 
     public void rollback() throws JMSException
     {
         setSuspect(true) ;
-        session.rollback();
+        sessionDelegate.rollback();
         setSuspect(false) ;
     }
 
     public void run()
     {
-        session.run();
+        sessionDelegate.run();
     }
 
     public void setMessageListener(MessageListener arg0) throws JMSException
     {
         associate() ;
-        session.setMessageListener(arg0);
+        sessionDelegate.setMessageListener(arg0);
     }
 
     public void unsubscribe(String arg0) throws JMSException
     {
         associate() ;
-        session.unsubscribe(arg0);
+        sessionDelegate.unsubscribe(arg0);
     }
 
     private synchronized QueueBrowser trackQueueBrowser(QueueBrowser queueBrowser)
@@ -423,12 +453,21 @@
     protected void associate()
         throws JMSException
     {
+        if (invalid)
+        {
+            throw new JMSException("Session no longer valid") ;
+        }
     }
     
     protected JmsSessionPool getSessionPool()
     {
         return sessionPool ;
     }
+
+    protected Session getSession()
+    {
+        return session ;
+    }
     
     protected void setSuspect(final boolean suspect)
     {
@@ -444,6 +483,17 @@
     {
         return requestedAcknowledgeMode ;
     }
+    
+    public boolean isInvalid()
+    {
+        return invalid ;
+    }
+    
+    public JmsSession duplicateSession()
+    {
+        invalid = true ;
+        return new JmsSession(connectionPool, sessionPool, session, id, requestedAcknowledgeMode, acknowledgeMode) ;
+    }
 
     /**
      * Wrap the object in an exception handler.

Modified: labs/jbossesb/branches/JBESB_4_9_CP/product/rosetta/src/org/jboss/internal/soa/esb/rosetta/pooling/JmsXASession.java
===================================================================
--- labs/jbossesb/branches/JBESB_4_9_CP/product/rosetta/src/org/jboss/internal/soa/esb/rosetta/pooling/JmsXASession.java	2010-10-08 12:13:15 UTC (rev 35462)
+++ labs/jbossesb/branches/JBESB_4_9_CP/product/rosetta/src/org/jboss/internal/soa/esb/rosetta/pooling/JmsXASession.java	2010-10-08 12:42:13 UTC (rev 35463)
@@ -25,14 +25,16 @@
 import java.lang.reflect.InvocationTargetException;
 import java.lang.reflect.Method;
 import java.lang.reflect.Proxy;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicInteger;
 
 import javax.jms.JMSException;
 import javax.jms.MessageConsumer;
 import javax.jms.MessageProducer;
 import javax.jms.QueueBrowser;
+import javax.jms.Session;
 import javax.jms.TopicSubscriber;
 import javax.jms.XASession;
-import javax.transaction.Status;
 import javax.transaction.Synchronization;
 import javax.transaction.xa.XAResource;
 
@@ -53,22 +55,42 @@
     /**
      * The session delegate.
      */
-    private final XASession session ;
+    private final XASession xaSessionDelegate ;
     
     /**
-     * Flag indicating whether this session is associated with a transaction.
+     * Cleanup actions
      */
-    private boolean associated ;
+    private enum Cleanup { close, release }
     
     /**
-     * Cleanup actions
+     * The cleanup action for the synchronization.
      */
-    private enum Cleanup { close, release, none }
+    private Cleanup cleanupAction = Cleanup.close ;
+    /**
+     * Flag representing whether the handlers are valid or not.
+     */
+    private AtomicBoolean handlerValid = new AtomicBoolean(true) ;
+    /**
+     * Use count for tracking references.
+     */
+    private AtomicInteger refCount = new AtomicInteger(0) ;
     
     /**
-     * The cleanup action for the synchronization.
+     * Duplicate the session wrapper.
+     * @param connectionPool The current connection pool
+     * @param sessionPool The current session pool
+     * @param session The underlying session.
+     * @param id The pool instance id.
+     * @param requestedAcknowledgeMode The requested acknowledge mode for this session.
+     * @param acknowledgeMode The original acknowledge mode for this session.
      */
-    private Cleanup cleanupAction = Cleanup.none ;
+    JmsXASession(final JmsConnectionPool connectionPool, final JmsSessionPool sessionPool, final Session session,
+        final long id, final int requestedAcknowledgeMode, final int acknowledgeMode)
+    {
+        super(connectionPool, sessionPool, session, id, requestedAcknowledgeMode, acknowledgeMode) ;
+        this.connectionPool = connectionPool ;
+        this.xaSessionDelegate = (XASession)getExceptionHandler(connectionPool, XASession.class, session) ;
+    }
     
     /**
      * Create the session wrapper.
@@ -84,7 +106,7 @@
     {
         super(connectionPool, sessionPool, session, id, requestedAcknowledgeMode) ;
         this.connectionPool = connectionPool ;
-        this.session = (XASession)getExceptionHandler(connectionPool, XASession.class, session) ;
+        this.xaSessionDelegate = (XASession)getExceptionHandler(connectionPool, XASession.class, session) ;
     }
     
     @Override
@@ -111,67 +133,119 @@
     @Override
     protected MessageProducer getMessageProducer(MessageProducer messageProducer)
     {
-        final InvocationHandler handler = new AssociationHandler(messageProducer) ;
+        final InvocationHandler handler = new AssociationHandler(messageProducer, handlerValid) ;
         return (MessageProducer)Proxy.newProxyInstance(getClass().getClassLoader(), new Class[] {MessageProducer.class}, handler );
     }
     
     @Override
     protected MessageConsumer getMessageConsumer(MessageConsumer messageConsumer)
     {
-        final InvocationHandler handler = new AssociationHandler(messageConsumer) ;
+        final InvocationHandler handler = new AssociationHandler(messageConsumer, handlerValid) ;
         return (MessageConsumer)Proxy.newProxyInstance(getClass().getClassLoader(), new Class[] {MessageConsumer.class}, handler);
     }
     
     @Override
     protected QueueBrowser getQueueBrowser(QueueBrowser queueBrowser)
     {
-        final InvocationHandler handler = new AssociationHandler(queueBrowser) ;
+        final InvocationHandler handler = new AssociationHandler(queueBrowser, handlerValid) ;
         return (QueueBrowser)Proxy.newProxyInstance(getClass().getClassLoader(), new Class[] {QueueBrowser.class}, handler);
     }
     
     @Override
     protected TopicSubscriber getTopicSubscriber(TopicSubscriber topicSubscriber)
     {
-        final InvocationHandler handler = new AssociationHandler(topicSubscriber) ;
+        final InvocationHandler handler = new AssociationHandler(topicSubscriber, handlerValid) ;
         return (TopicSubscriber)Proxy.newProxyInstance(getClass().getClassLoader(), new Class[] {TopicSubscriber.class}, handler);
     }
     
-    protected synchronized void handleCloseSession(final JmsConnectionPool jmsConnectionPool)
+    private void releaseSession()
     {
-        if (associated)
+        try
         {
-            cleanupAction = Cleanup.close ;
+            connectionPool.handleReleaseSession(this) ;
         }
-        else
+        finally
         {
+            handlerValid.set(false) ;
+            handlerValid = new AtomicBoolean(true) ;
+        }
+    }
+    
+    private void closeSession()
+    {
+        try
+        {
             connectionPool.handleCloseSession(this) ;
         }
+        finally
+        {
+            handlerValid.set(false) ;
+            handlerValid = new AtomicBoolean(true) ;
+        }
     }
+
+    public JmsSession duplicateSession()
+    {
+        invalid = true ;
+        return new JmsXASession(connectionPool, getSessionPool(), getSession(), getId(), getRequestedAcknowledgeMode(), acknowledgeMode) ;
+    }
     
+    protected void incrementReferenceCount()
+    {
+        refCount.incrementAndGet() ;
+    }
+    
+    protected synchronized void handleCloseSession(final JmsConnectionPool jmsConnectionPool)
+    {
+        final boolean associated = connectionPool.isAssociated(this) ;
+        final int count = refCount.decrementAndGet() ;
+        if (!associated && (count == 0))
+        {
+            if (cleanupAction == Cleanup.release)
+            {
+                releaseSession() ;
+            }
+            else
+            {
+                closeSession() ;
+            }
+        }
+    }
+    
     protected synchronized void handleReleaseSession(JmsConnectionPool jmsConnectionPool)
     {
+        final boolean associated = connectionPool.isAssociated(this) ;
+        final int count = refCount.decrementAndGet() ;
         if (associated)
         {
             cleanupAction = Cleanup.release ;
         }
-        else
+        else if (count == 0)
         {
-            connectionPool.handleReleaseSession(this) ;
+            releaseSession() ;
         }
     }
     
     protected synchronized void associate()
         throws JMSException
     {
-        if (!associated)
+        if (invalid)
         {
-            cleanupAction = Cleanup.none ;
+            throw new JMSException("Session no longer valid") ;
+        }
+        if (!connectionPool.isAssociated(this))
+        {
             final TransactionStrategy transactionStrategy = TransactionStrategy.getTransactionStrategy(true) ;
             try
             {
+                if (!transactionStrategy.isActive())
+                {
+                    throw new JMSException("No active transaction") ;
+                }
+                
                 transactionStrategy.registerSynchronization(this) ;
                 setSuspect(true) ;
-                final XAResource resource = session.getXAResource() ;
+                final XAResource resource = xaSessionDelegate.getXAResource() ;
                 transactionStrategy.enlistResource(resource) ;
             }
             catch (final TransactionStrategyException tse)
@@ -192,7 +266,6 @@
             }
             
             setSuspect(false) ;
-            associated = true ;
         }
     }
     
@@ -203,22 +276,17 @@
     public synchronized void afterCompletion(final int result)
     {
         connectionPool.disassociateTransaction(this) ;
-        switch (cleanupAction)
+        if (refCount.get() == 0)
         {
-        case close:
-            if (result == Status.STATUS_COMMITTED)
+            if (cleanupAction == Cleanup.close)
             {
-                connectionPool.handleCloseSession(this) ;
-                break ;
+                closeSession() ;
             }
-            // fall through
-        case release:
-            connectionPool.handleReleaseSession(this) ;
-            break ;
-        case none:
-            // Reference held by caller
+            else
+            {
+                releaseSession() ;
+            }
         }
-        associated = false ;
     }
     
     /**
@@ -231,27 +299,55 @@
          * The target instance.
          */
         private final Object target ;
+        /**
+         * Flag representing validity of the handler.
+         */
+        private final AtomicBoolean handlerValid ;
         
         /**
          * Construct the handler using the specified target.
          * @param target The target instance.
+         * @param handlerValid The flag representing whether the handler is still valid.
          */
-        public AssociationHandler(final Object target)
+        public AssociationHandler(final Object target, final AtomicBoolean handlerValid)
         {
             this.target = target ;
+            this.handlerValid = handlerValid ;
         }
         
         public Object invoke(Object proxy, Method method, Object[] args) throws Throwable
         {
-            associate() ;
-            try
+            final String methodName = method.getName() ;
+            if ("hashCode".equals(methodName))
             {
-                return method.invoke(target, args);
+                return hashCode() ;
             }
-            catch (final InvocationTargetException ite)
+            else if ("equals".equals(methodName))
             {
-                throw ite.getCause() ;
+                return this.equals(args[0]) ;
             }
+            else
+            {
+                if (handlerValid.get())
+                {
+                    if (!"close".equals(methodName))
+                    {
+                        associate() ;
+                    }
+                    try
+                    {
+                        return method.invoke(target, args);
+                    }
+                    catch (final InvocationTargetException ite)
+                    {
+                        throw ite.getCause() ;
+                    }
+                }
+                else
+                {
+                    throw new JMSException("Instance is no longer valid") ;
+                }
+            }
         }
     }
 }

Modified: labs/jbossesb/branches/JBESB_4_9_CP/product/rosetta/tests/src/org/jboss/internal/soa/esb/rosetta/pooling/JmsConnectionPoolUnitTest.java
===================================================================
--- labs/jbossesb/branches/JBESB_4_9_CP/product/rosetta/tests/src/org/jboss/internal/soa/esb/rosetta/pooling/JmsConnectionPoolUnitTest.java	2010-10-08 12:13:15 UTC (rev 35462)
+++ labs/jbossesb/branches/JBESB_4_9_CP/product/rosetta/tests/src/org/jboss/internal/soa/esb/rosetta/pooling/JmsConnectionPoolUnitTest.java	2010-10-08 12:42:13 UTC (rev 35463)
@@ -24,6 +24,7 @@
 import java.lang.reflect.InvocationHandler;
 import java.lang.reflect.Method;
 import java.lang.reflect.Proxy;
+import java.util.ArrayList;
 import java.util.HashMap;
 import java.util.Map;
 
@@ -42,6 +43,7 @@
 import javax.jms.XAConnectionFactory;
 import javax.jms.XASession;
 import javax.naming.Context;
+import javax.transaction.Status;
 import javax.transaction.Synchronization;
 import javax.transaction.xa.XAResource;
 
@@ -136,7 +138,7 @@
         
         final JmsSession session2 = pool.getSession() ;
         Assert.assertEquals("Session class", JmsSession.class, session2.getClass()) ;
-        Assert.assertSame("Same session returned", session, session2) ;
+        Assert.assertSame("Same session returned", session.getSession(), session2.getSession()) ;
     }
     
     @Test
@@ -360,7 +362,7 @@
         {
             final JmsConnectionPool pool = new JmsConnectionPool(getPoolEnv()) ;
             // transactional sessions are requested with transacted acknowledge mode
-            final int acknowledgeMode = Session.SESSION_TRANSACTED ;
+            final int acknowledgeMode = JmsConnectionPool.XA_TRANSACTED ;
             Assert.assertEquals("current pool free count", 0, pool.getFreeSessionsInPool(acknowledgeMode)) ;
             Assert.assertEquals("current pool in use count", 0, pool.getInUseSessionsInPool(acknowledgeMode)) ;
             final JmsSession session = pool.getSession() ;
@@ -621,6 +623,98 @@
         }
     }
     
+    @Test
+    public void testXaSessionReuse()
+        throws Exception
+    {
+        final TransactionStrategy transactionStrategy = TransactionStrategy.getTransactionStrategy(true) ;
+        final MockJTATransactionStrategy jtaTransactionStrategy = new MockJTATransactionStrategy() ;
+        TransactionStrategy.setTransactionStrategy(jtaTransactionStrategy) ;
+        try
+        {
+            final int acknowledgeMode = JmsConnectionPool.XA_TRANSACTED ;
+            
+            final JmsConnectionPool pool = new JmsConnectionPool(getPoolEnv()) ;
+            
+            Assert.assertEquals("current pool free count", 0, pool.getFreeSessionsInPool(acknowledgeMode)) ;
+            Assert.assertEquals("current pool in use count", 0, pool.getInUseSessionsInPool(acknowledgeMode)) ;
+            
+            jtaTransactionStrategy.begin() ;
+            
+            final JmsSession firstSession = pool.getSession() ;
+            
+            Assert.assertEquals("current pool free count", 0, pool.getFreeSessionsInPool(acknowledgeMode)) ;
+            Assert.assertEquals("current pool in use count", 1, pool.getInUseSessionsInPool(acknowledgeMode)) ;
+
+            jtaTransactionStrategy.terminateTransaction(true) ;
+            
+            // make sure it is still in use
+            Assert.assertEquals("current pool free count", 0, pool.getFreeSessionsInPool(acknowledgeMode)) ;
+            Assert.assertEquals("current pool in use count", 1, pool.getInUseSessionsInPool(acknowledgeMode)) ;
+            
+            // should throw exception, no transaction
+            try
+            {
+                firstSession.createProducer(null) ;
+                Assert.fail("exception expected, no active transaction") ;
+            }
+            catch (final JMSException jmse) {}
+            
+            Assert.assertEquals("number of synchronizations", 0, jtaTransactionStrategy.numSynchronizations()) ;
+            Assert.assertEquals("number of resources", 0, jtaTransactionStrategy.numResources()) ;
+            
+            jtaTransactionStrategy.begin() ;
+            
+            // no exception as it should enlist
+            final MessageProducer firstProducer = firstSession.createProducer(null) ;
+            
+            Assert.assertEquals("number of synchronizations", 1, jtaTransactionStrategy.numSynchronizations()) ;
+            Assert.assertEquals("number of resources", 1, jtaTransactionStrategy.numResources()) ;
+            
+            final JmsSession secondSession = pool.getSession() ;
+            
+            Assert.assertEquals("current pool free count", 0, pool.getFreeSessionsInPool(acknowledgeMode)) ;
+            Assert.assertEquals("current pool in use count", 1, pool.getInUseSessionsInPool(acknowledgeMode)) ;
+            
+            final MessageProducer secondProducer = secondSession.createProducer(null) ;
+            
+            Assert.assertEquals("number of synchronizations", 1, jtaTransactionStrategy.numSynchronizations()) ;
+            Assert.assertEquals("number of resources", 1, jtaTransactionStrategy.numResources()) ;
+            
+            pool.closeSession(secondSession) ;
+            
+            Assert.assertEquals("current pool free count", 0, pool.getFreeSessionsInPool(acknowledgeMode)) ;
+            Assert.assertEquals("current pool in use count", 1, pool.getInUseSessionsInPool(acknowledgeMode)) ;
+            
+            secondProducer.close() ;
+            
+            jtaTransactionStrategy.terminateTransaction(true) ;
+            // Shouldn't be back in the pool as there is still one reference open
+            Assert.assertEquals("current pool free count", 0, pool.getFreeSessionsInPool(acknowledgeMode)) ;
+            Assert.assertEquals("current pool in use count", 1, pool.getInUseSessionsInPool(acknowledgeMode)) ;
+            
+            // should throw exception, no transaction
+            try
+            {
+                firstProducer.getDestination() ;
+                Assert.fail("exception expected, no active transaction") ;
+            }
+            catch (final JMSException jmse) {}
+            
+            // close should be okay
+            firstProducer.close() ;
+            
+            pool.closeSession(firstSession) ;
+
+            Assert.assertEquals("current pool free count", 1, pool.getFreeSessionsInPool(acknowledgeMode)) ;
+            Assert.assertEquals("current pool in use count", 0, pool.getInUseSessionsInPool(acknowledgeMode)) ;
+        }
+        finally
+        {
+            TransactionStrategy.setTransactionStrategy(transactionStrategy) ;
+        }
+    }
+    
     private Map<String, String> getPoolEnv()
     {
         final Map<String, String> env = new HashMap<String, String>() ;
@@ -704,6 +798,18 @@
                 return Proxy.newProxyInstance(MockConnectionInvocationHandler.class.getClassLoader(), new Class[] {XASession.class},
                         new MockSessionInvocationHandler(Session.SESSION_TRANSACTED)) ;
             }
+            else if ("toString".equals(methodName))
+            {
+                return toString() ;
+            }
+            else if ("hashCode".equals(methodName))
+            {
+                return hashCode() ;
+            }
+            else if ("equals".equals(methodName))
+            {
+                return equals(args[0]) ;
+            }
             else
             {
                 System.out.println("Connection method " + method.getName() + " called") ;
@@ -784,6 +890,18 @@
                 return Proxy.newProxyInstance(MockSessionInvocationHandler.class.getClassLoader(), new Class[] {MessageProducer.class},
                         new MockNullInvocationHandler()) ;
             }
+            else if ("toString".equals(methodName))
+            {
+                return toString() ;
+            }
+            else if ("hashCode".equals(methodName))
+            {
+                return hashCode() ;
+            }
+            else if ("equals".equals(methodName))
+            {
+                return equals(args[0]) ;
+            }
             else
             {
                 System.out.println("Session method " + method.getName() + " called") ;
@@ -859,6 +977,77 @@
         }
     }
     
+    private static final class MockJTATransactionStrategy extends NullTransactionStrategy
+    {
+        private Object tx ;
+        private ArrayList<XAResource> resources = new ArrayList<XAResource>() ;
+        private ArrayList<Synchronization> synchronizations = new ArrayList<Synchronization>() ;
+
+        public void begin()
+            throws TransactionStrategyException
+        {
+            tx = new Object() ;
+        }
+
+        @Override
+        public boolean isActive()
+            throws TransactionStrategyException
+        {
+            return (tx != null) ;
+        }
+
+        @Override
+        public void registerSynchronization(final Synchronization sync)
+                throws TransactionStrategyException
+        {
+            synchronizations.add(sync) ;
+        }
+
+        @Override
+        public void enlistResource(final XAResource resource)
+                throws TransactionStrategyException
+        {
+            resources.add(resource) ;
+        }
+
+        @Override
+        public Object getTransaction()
+            throws TransactionStrategyException
+        {
+            return tx ;
+        }
+
+        public void terminateTransaction(final boolean commit)
+        {
+            final int status = (commit ? Status.STATUS_COMMITTED : Status.STATUS_ROLLEDBACK) ;
+            
+            for(Synchronization sync: synchronizations)
+            {
+                sync.beforeCompletion() ;
+            }
+            
+            tx = null ;
+            resources.clear() ;
+            
+            for(Synchronization sync: synchronizations)
+            {
+                sync.afterCompletion(status) ;
+            }
+            
+            synchronizations.clear();
+        }
+        
+        public int numSynchronizations()
+        {
+            return synchronizations.size() ;
+        }
+        
+        public int numResources()
+        {
+            return resources.size() ;
+        }
+    }
+    
     public static junit.framework.Test suite()
     {
         return new JUnit4TestAdapter(JmsConnectionPoolUnitTest.class);

Modified: labs/jbossesb/branches/JBESB_4_9_CP/product/rosetta/tests/src/org/jboss/internal/soa/esb/rosetta/pooling/MaxSessionsPerConnectionUnitTest.java
===================================================================
--- labs/jbossesb/branches/JBESB_4_9_CP/product/rosetta/tests/src/org/jboss/internal/soa/esb/rosetta/pooling/MaxSessionsPerConnectionUnitTest.java	2010-10-08 12:13:15 UTC (rev 35462)
+++ labs/jbossesb/branches/JBESB_4_9_CP/product/rosetta/tests/src/org/jboss/internal/soa/esb/rosetta/pooling/MaxSessionsPerConnectionUnitTest.java	2010-10-08 12:42:13 UTC (rev 35463)
@@ -83,7 +83,7 @@
 
             // Now try another session.  Should be same session instance as last time.
             // Make sure it's returned to the pool on closing...
-            assertTrue(session1 == session2);
+            assertSame("Underlying session", session1.getSession(), session2.getSession());
             assertEquals(1, sessPools.size());
             assertEquals(0, sessPools.get(0).getFreeSessionsInPool(Session.AUTO_ACKNOWLEDGE));
             assertEquals(1, sessPools.get(0).getInUseSessionsInPool(Session.AUTO_ACKNOWLEDGE));



More information about the jboss-svn-commits mailing list