[jboss-cvs] JBoss Messaging SVN: r2143 - in branches/Branch_1_0_1_SP: src/main/org/jboss/jms/client/container and 5 other directories.

jboss-cvs-commits at lists.jboss.org jboss-cvs-commits at lists.jboss.org
Fri Feb 2 10:21:32 EST 2007


Author: timfox
Date: 2007-02-02 10:21:32 -0500 (Fri, 02 Feb 2007)
New Revision: 2143

Modified:
   branches/Branch_1_0_1_SP/src/main/org/jboss/jms/client/JBossConnectionFactory.java
   branches/Branch_1_0_1_SP/src/main/org/jboss/jms/client/container/SessionAspect.java
   branches/Branch_1_0_1_SP/src/main/org/jboss/jms/client/state/SessionState.java
   branches/Branch_1_0_1_SP/src/main/org/jboss/jms/server/connectionfactory/ConnectionFactoryJNDIMapper.java
   branches/Branch_1_0_1_SP/src/main/org/jboss/jms/server/endpoint/ServerConnectionEndpoint.java
   branches/Branch_1_0_1_SP/src/main/org/jboss/jms/server/endpoint/ServerConnectionFactoryEndpoint.java
   branches/Branch_1_0_1_SP/src/main/org/jboss/jms/server/endpoint/ServerSessionEndpoint.java
   branches/Branch_1_0_1_SP/src/main/org/jboss/jms/tx/MessagingXAResource.java
   branches/Branch_1_0_1_SP/tests/src/org/jboss/test/messaging/jms/JCAWrapperTest.java
   branches/Branch_1_0_1_SP/tests/src/org/jboss/test/messaging/jms/XATest.java
Log:
Backported http://jira.jboss.com/jira/browse/JBMESSAGING-797 http://jira.jboss.com/jira/browse/JBMESSAGING-638 http://jira.jboss.com/jira/browse/JBMESSAGING-721



Modified: branches/Branch_1_0_1_SP/src/main/org/jboss/jms/client/JBossConnectionFactory.java
===================================================================
--- branches/Branch_1_0_1_SP/src/main/org/jboss/jms/client/JBossConnectionFactory.java	2007-02-02 11:24:06 UTC (rev 2142)
+++ branches/Branch_1_0_1_SP/src/main/org/jboss/jms/client/JBossConnectionFactory.java	2007-02-02 15:21:32 UTC (rev 2143)
@@ -40,6 +40,7 @@
 import javax.naming.Reference;
 
 import org.jboss.aop.Advised;
+import org.jboss.aop.AspectManager;
 import org.jboss.jms.client.container.JmsClientAspectXMLLoader;
 import org.jboss.jms.client.delegate.ClientConnectionFactoryDelegate;
 import org.jboss.jms.delegate.ConnectionDelegate;
@@ -226,20 +227,24 @@
          {
             if (!configLoaded)
             {
-               // Load the client side aspect stack configuration from the server and apply it
-               
-               delegate.init();
-                           
-               byte[] clientAOPConfig = delegate.getClientAOPConfig();
-               
-               // Remove interceptor since we don't want it on the front of the stack
-               ((Advised)delegate)._getInstanceAdvisor().removeInterceptor(delegate.getName());
-               
-               JmsClientAspectXMLLoader loader = new JmsClientAspectXMLLoader();
-               
-               loader.deployXML(clientAOPConfig);
-               
-               configLoaded = true;               
+               // Need to synchronize due to http://jira.jboss.com/jira/browse/JBMESSAGING-797               
+               synchronized (AspectManager.instance())
+               {                  
+                  // Load the client side aspect stack configuration from the server and apply it
+                  
+                  delegate.init();
+                              
+                  byte[] clientAOPConfig = delegate.getClientAOPConfig();
+                  
+                  // Remove interceptor since we don't want it on the front of the stack
+                  ((Advised)delegate)._getInstanceAdvisor().removeInterceptor(delegate.getName());
+                  
+                  JmsClientAspectXMLLoader loader = new JmsClientAspectXMLLoader();
+                  
+                  loader.deployXML(clientAOPConfig);
+                  
+                  configLoaded = true; 
+               }
             }
          }   
       }

Modified: branches/Branch_1_0_1_SP/src/main/org/jboss/jms/client/container/SessionAspect.java
===================================================================
--- branches/Branch_1_0_1_SP/src/main/org/jboss/jms/client/container/SessionAspect.java	2007-02-02 11:24:06 UTC (rev 2142)
+++ branches/Branch_1_0_1_SP/src/main/org/jboss/jms/client/container/SessionAspect.java	2007-02-02 15:21:32 UTC (rev 2143)
@@ -132,11 +132,9 @@
       
       if (ackMode == Session.CLIENT_ACKNOWLEDGE ||
           ackMode == Session.AUTO_ACKNOWLEDGE ||
-          ackMode == Session.DUPS_OK_ACKNOWLEDGE ||
-          state.getCurrentTxId() == null)
+          ackMode == Session.DUPS_OK_ACKNOWLEDGE)
       {
-         // We collect acknowledgments (and not transact them) for CLIENT, AUTO and DUPS_OK, and
-         // also for XA sessions not enrolled in a global transaction.
+         // We collect acknowledgments (and not transact them) for CLIENT, AUTO and DUPS_OK
 
          SessionDelegate del = (SessionDelegate)mi.getTargetObject();
          
@@ -186,11 +184,10 @@
       }
       
       if (ackMode == Session.AUTO_ACKNOWLEDGE ||
-          ackMode == Session.DUPS_OK_ACKNOWLEDGE ||
-          ackMode != Session.CLIENT_ACKNOWLEDGE && state.getCurrentTxId() == null)
+          ackMode == Session.DUPS_OK_ACKNOWLEDGE)
       {
          // We acknowledge immediately on a non-transacted session that does not want to
-         // CLIENT_ACKNOWLEDGE, or an XA session not enrolled in a global transaction.
+         // CLIENT_ACKNOWLEDGE
 
          SessionDelegate sd = (SessionDelegate)mi.getTargetObject();
 

Modified: branches/Branch_1_0_1_SP/src/main/org/jboss/jms/client/state/SessionState.java
===================================================================
--- branches/Branch_1_0_1_SP/src/main/org/jboss/jms/client/state/SessionState.java	2007-02-02 11:24:06 UTC (rev 2142)
+++ branches/Branch_1_0_1_SP/src/main/org/jboss/jms/client/state/SessionState.java	2007-02-02 15:21:32 UTC (rev 2143)
@@ -80,13 +80,13 @@
          xaResource = new MessagingXAResource(parent.getResourceManager(), this);                            
       }
 
-      // If session is transacted and XA, the currentTxId will be updated when the XAResource will
-      // be enrolled with a global transaction.
+      // Note we create the transaction even if XA - XA transactions must behave like
+      // local tx when not enlisted in a global tx
 
-      if (transacted & !xa)
+      if (transacted)
       {
          // Create a local tx
-         currentTxId = parent.getResourceManager().createLocalTx();        
+         currentTxId = parent.getResourceManager().createLocalTx();
       }
 
       executor = new QueuedExecutor(new LinkedQueue());

Modified: branches/Branch_1_0_1_SP/src/main/org/jboss/jms/server/connectionfactory/ConnectionFactoryJNDIMapper.java
===================================================================
--- branches/Branch_1_0_1_SP/src/main/org/jboss/jms/server/connectionfactory/ConnectionFactoryJNDIMapper.java	2007-02-02 11:24:06 UTC (rev 2142)
+++ branches/Branch_1_0_1_SP/src/main/org/jboss/jms/server/connectionfactory/ConnectionFactoryJNDIMapper.java	2007-02-02 15:21:32 UTC (rev 2143)
@@ -29,6 +29,7 @@
 import javax.naming.Context;
 import javax.naming.InitialContext;
 
+import org.jboss.aop.AspectManager;
 import org.jboss.jms.client.JBossConnectionFactory;
 import org.jboss.jms.client.delegate.ClientConnectionFactoryDelegate;
 import org.jboss.jms.server.ConnectionFactoryManager;
@@ -93,8 +94,14 @@
          new ClientConnectionFactoryDelegate(id, locatorURI, serverPeer.getVersion(),
                                              serverPeer.getServerPeerID(), clientPing);
 
-      ConnectionFactoryAdvised connFactoryAdvised = new ConnectionFactoryAdvised(endpoint);
+      ConnectionFactoryAdvised connFactoryAdvised;
       
+      // Need to synchronize due to http://jira.jboss.com/jira/browse/JBMESSAGING-797               
+      synchronized (AspectManager.instance())
+      {
+         connFactoryAdvised = new ConnectionFactoryAdvised(endpoint);
+      }
+      
       JMSDispatcher.instance.registerTarget(new Integer(id), connFactoryAdvised);
 
       endpoints.put(new Integer(id), endpoint);

Modified: branches/Branch_1_0_1_SP/src/main/org/jboss/jms/server/endpoint/ServerConnectionEndpoint.java
===================================================================
--- branches/Branch_1_0_1_SP/src/main/org/jboss/jms/server/endpoint/ServerConnectionEndpoint.java	2007-02-02 11:24:06 UTC (rev 2142)
+++ branches/Branch_1_0_1_SP/src/main/org/jboss/jms/server/endpoint/ServerConnectionEndpoint.java	2007-02-02 15:21:32 UTC (rev 2143)
@@ -33,6 +33,7 @@
 import javax.jms.InvalidClientIDException;
 import javax.transaction.xa.Xid;
 
+import org.jboss.aop.AspectManager;
 import org.jboss.jms.client.delegate.ClientSessionDelegate;
 import org.jboss.jms.client.remoting.CallbackServerFactory;
 import org.jboss.jms.delegate.SessionDelegate;
@@ -41,6 +42,7 @@
 import org.jboss.jms.server.ConnectionManager;
 import org.jboss.jms.server.SecurityManager;
 import org.jboss.jms.server.ServerPeer;
+import org.jboss.jms.server.endpoint.advised.ConsumerAdvised;
 import org.jboss.jms.server.endpoint.advised.SessionAdvised;
 import org.jboss.jms.server.plugin.contract.ChannelMapper;
 import org.jboss.jms.server.remoting.JMSDispatcher;
@@ -186,8 +188,16 @@
          // connection endpoint instance
          ServerSessionEndpoint ep = new ServerSessionEndpoint(sessionID, this);
          putSessionDelegate(sessionID, ep);
-         SessionAdvised sessionAdvised = new SessionAdvised(ep);
-         JMSDispatcher.instance.registerTarget(new Integer(sessionID), sessionAdvised);
+         
+         SessionAdvised advised;
+         
+         // Need to synchronize due to http://jira.jboss.com/jira/browse/JBMESSAGING-797               
+         synchronized (AspectManager.instance())
+         {
+            advised = new SessionAdvised(ep);
+         }
+         
+         JMSDispatcher.instance.registerTarget(new Integer(sessionID), advised);
 
          ClientSessionDelegate d = new ClientSessionDelegate(sessionID);
                  

Modified: branches/Branch_1_0_1_SP/src/main/org/jboss/jms/server/endpoint/ServerConnectionFactoryEndpoint.java
===================================================================
--- branches/Branch_1_0_1_SP/src/main/org/jboss/jms/server/endpoint/ServerConnectionFactoryEndpoint.java	2007-02-02 11:24:06 UTC (rev 2142)
+++ branches/Branch_1_0_1_SP/src/main/org/jboss/jms/server/endpoint/ServerConnectionFactoryEndpoint.java	2007-02-02 15:21:32 UTC (rev 2143)
@@ -23,10 +23,12 @@
 
 import javax.jms.JMSException;
 
+import org.jboss.aop.AspectManager;
 import org.jboss.jms.client.delegate.ClientConnectionDelegate;
 import org.jboss.jms.delegate.ConnectionDelegate;
 import org.jboss.jms.server.ServerPeer;
 import org.jboss.jms.server.connectionfactory.JNDIBindings;
+import org.jboss.jms.server.endpoint.advised.BrowserAdvised;
 import org.jboss.jms.server.endpoint.advised.ConnectionAdvised;
 import org.jboss.jms.server.remoting.JMSDispatcher;
 import org.jboss.jms.util.ExceptionUtil;
@@ -124,9 +126,16 @@
                      defaultTempQueueFullSize, defaultTempQueuePageSize, defaultTempQueueDownCacheSize);
    
          int connectionID = endpoint.getConnectionID();
+         
+         ConnectionAdvised advised;
+         
+         // Need to synchronize due to http://jira.jboss.com/jira/browse/JBMESSAGING-797               
+         synchronized (AspectManager.instance())
+         { 
+            advised = new ConnectionAdvised(endpoint);
+         }         
    
-         ConnectionAdvised connAdvised = new ConnectionAdvised(endpoint);
-         JMSDispatcher.instance.registerTarget(new Integer(connectionID), connAdvised);
+         JMSDispatcher.instance.registerTarget(new Integer(connectionID), advised);
          
          log.debug("created and registered " + endpoint);
    

Modified: branches/Branch_1_0_1_SP/src/main/org/jboss/jms/server/endpoint/ServerSessionEndpoint.java
===================================================================
--- branches/Branch_1_0_1_SP/src/main/org/jboss/jms/server/endpoint/ServerSessionEndpoint.java	2007-02-02 11:24:06 UTC (rev 2142)
+++ branches/Branch_1_0_1_SP/src/main/org/jboss/jms/server/endpoint/ServerSessionEndpoint.java	2007-02-02 15:21:32 UTC (rev 2143)
@@ -33,6 +33,7 @@
 import javax.jms.InvalidDestinationException;
 import javax.jms.JMSException;
 
+import org.jboss.aop.AspectManager;
 import org.jboss.jms.client.delegate.ClientBrowserDelegate;
 import org.jboss.jms.client.delegate.ClientConsumerDelegate;
 import org.jboss.jms.delegate.BrowserDelegate;
@@ -43,6 +44,7 @@
 import org.jboss.jms.message.JBossMessage;
 import org.jboss.jms.server.ServerPeer;
 import org.jboss.jms.server.endpoint.advised.BrowserAdvised;
+import org.jboss.jms.server.endpoint.advised.ConnectionFactoryAdvised;
 import org.jboss.jms.server.endpoint.advised.ConsumerAdvised;
 import org.jboss.jms.server.plugin.contract.ChannelMapper;
 import org.jboss.jms.server.remoting.JMSDispatcher;
@@ -265,9 +267,16 @@
                                        subscription == null ? (Channel)coreDestination : subscription,
                                        this, selector, noLocal, jmsDestination, prefetchSize, dlq);
           
-         JMSDispatcher.instance.registerTarget(new Integer(consumerID), new ConsumerAdvised(ep));
-            
+         ConsumerAdvised advised;
          
+         // Need to synchronize due to http://jira.jboss.com/jira/browse/JBMESSAGING-797               
+         synchronized (AspectManager.instance())
+         {
+            advised = new ConsumerAdvised(ep);
+         }                  
+         
+         JMSDispatcher.instance.registerTarget(new Integer(consumerID), advised);            
+         
          ClientConsumerDelegate stub = new ClientConsumerDelegate(consumerID, prefetchSize, maxDeliveryAttempts);
          
          if (subscription != null)
@@ -321,8 +330,16 @@
    	      new ServerBrowserEndpoint(this, browserID, (Channel)destination, messageSelector);
    	   
    	   putBrowserDelegate(browserID, ep);
+         
+         BrowserAdvised advised;
+         
+         // Need to synchronize due to http://jira.jboss.com/jira/browse/JBMESSAGING-797               
+         synchronized (AspectManager.instance())
+         { 
+            advised = new BrowserAdvised(ep);
+         }
    	   
-         JMSDispatcher.instance.registerTarget(new Integer(browserID), new BrowserAdvised(ep));
+         JMSDispatcher.instance.registerTarget(new Integer(browserID), advised);
    	   
    	   ClientBrowserDelegate stub = new ClientBrowserDelegate(browserID);
    	   

Modified: branches/Branch_1_0_1_SP/src/main/org/jboss/jms/tx/MessagingXAResource.java
===================================================================
--- branches/Branch_1_0_1_SP/src/main/org/jboss/jms/tx/MessagingXAResource.java	2007-02-02 11:24:06 UTC (rev 2142)
+++ branches/Branch_1_0_1_SP/src/main/org/jboss/jms/tx/MessagingXAResource.java	2007-02-02 15:21:32 UTC (rev 2143)
@@ -27,6 +27,7 @@
 
 import org.jboss.jms.client.state.SessionState;
 import org.jboss.jms.delegate.ConnectionDelegate;
+import org.jboss.jms.util.MessagingXAException;
 import org.jboss.logging.Logger;
 
 /**
@@ -114,34 +115,30 @@
       if (trace) { log.trace(this + " committing " + xid + (onePhase ? " (one phase)" : " (two phase)")); }
       
       rm.commit(xid, onePhase, connection);
-
-      // leave the session in a 'clean' state, the currentTxId will be set when the XAResource will
-      // be enrolled with a new transaction.
-
-      setCurrentTransactionId(null);
    }
 
    public void end(Xid xid, int flags) throws XAException
    {
       if (trace) { log.trace(this + " ending " + xid + ", flags: " + flags); }
 
+      unsetCurrentTransactionId(xid);    
+      
       synchronized (this)
-      {
+      {         
          switch (flags)
          {
-            case TMSUSPEND :
-               unsetCurrentTransactionId(xid);                             
+            case TMSUSPEND :                                    
                rm.suspendTx(xid);
                break;
             case TMFAIL :
-               unsetCurrentTransactionId(xid);
                rm.endTx(xid, false);
                break;
             case TMSUCCESS :
-               unsetCurrentTransactionId(xid);
                rm.endTx(xid, true);
                break;
-         }
+            default :
+               throw new MessagingXAException(XAException.XAER_PROTO, "Invalid flags: " + flags);         
+         } 
       }
    }
    
@@ -179,17 +176,23 @@
       
       boolean convertTx = false;
       
-      if (sessionState.getCurrentTxId() != null)
+      Object currentXid = sessionState.getCurrentTxId();
+      
+      // Sanity check
+      if (currentXid == null)
       {
-         if (flags == TMNOFLAGS && sessionState.getCurrentTxId() instanceof LocalTx)
-         {
-            convertTx = true;
-         }
+         throw new MessagingXAException(XAException.XAER_RMFAIL, "Current xid is not set");
       }
+      
+      if (flags == TMNOFLAGS && sessionState.getCurrentTxId() instanceof LocalTx)
+      {
+         convertTx = true;
+         
+         if (trace) { log.trace("Converting local tx into global tx branch"); }
+      }      
 
       synchronized (this)
       {
-
          switch (flags)
          {
             case TMNOFLAGS :
@@ -200,6 +203,8 @@
                   // session in a new tx. If the session has any listeners then in that period,
                   // messages can be received asychronously but we want them to be received in the
                   // context of a tx, so we convert.
+                  // Also for an transacted delivery in a MDB we need to do this as discussed
+                  // in fallbackToLocalTx()
                   setCurrentTransactionId(rm.convertTx((LocalTx)sessionState.getCurrentTxId(), xid));
                }
                else
@@ -213,6 +218,8 @@
             case TMRESUME :
                setCurrentTransactionId(rm.resumeTx(xid));
                break;
+            default:
+               throw new MessagingXAException(XAException.XAER_PROTO, "Invalid flags: " + flags);
          }
       }
    }
@@ -241,14 +248,14 @@
    
    // Private -------------------------------------------------------
    
-   private void setCurrentTransactionId(final Xid xid)
+   private void setCurrentTransactionId(Object xid)
    {
       if (trace) { log.trace(this + " setting current xid to " + xid + ",  previous " + sessionState.getCurrentTxId()); }
 
       sessionState.setCurrentTxId(xid);
    }
    
-   private void unsetCurrentTransactionId(final Xid xid)
+   private void unsetCurrentTransactionId(Object xid)
    {
       if (xid == null)
       {
@@ -261,6 +268,15 @@
       // recycled
       if (xid.equals(sessionState.getCurrentTxId()))
       {
+         // When a transaction association ends we fall back to acting as if in a local tx
+         // This is because for MDBs, the message is received before the global tx
+         // has started. Therefore we receive it in a local tx, then convert the work
+         // done into the global tx branch when the resource is enlisted.
+         // See Mark Little's book "Java Transaction Processing" Chapter 5 for
+         // a full explanation
+         // So in other words - when the session is not enlisted in a global tx
+         // it will always have a local xid set
+         
          sessionState.setCurrentTxId(rm.createLocalTx());
       }
    }

Modified: branches/Branch_1_0_1_SP/tests/src/org/jboss/test/messaging/jms/JCAWrapperTest.java
===================================================================
--- branches/Branch_1_0_1_SP/tests/src/org/jboss/test/messaging/jms/JCAWrapperTest.java	2007-02-02 11:24:06 UTC (rev 2142)
+++ branches/Branch_1_0_1_SP/tests/src/org/jboss/test/messaging/jms/JCAWrapperTest.java	2007-02-02 15:21:32 UTC (rev 2143)
@@ -158,209 +158,8 @@
       }
    }
 
-   /**
-    * Test case for http://jira.jboss.org/jira/browse/JBMESSAGING-410.
-    */
-   public void testSendNoGlobalTransaction() throws Exception
-   {
-      Transaction suspended = null;
+   
 
-      try
-      {
-         ServerManagement.deployQueue("MyQueue");
-
-         // make sure there's no active JTA transaction
-
-         suspended = TransactionManagerLocator.getInstance().locate().suspend();
-
-         // send a message to the queue, using a JCA wrapper
-
-         Queue queue = (Queue)ic.lookup("queue/MyQueue");
-
-         ConnectionFactory mcf = (ConnectionFactory)ic.lookup("java:/JCAConnectionFactory");
-
-         Connection conn = mcf.createConnection();
-
-         Session s = conn.createSession(false, Session.AUTO_ACKNOWLEDGE);
-         MessageProducer p = s.createProducer(queue);
-         p.setDeliveryMode(DeliveryMode.PERSISTENT);
-         Message m = s.createTextMessage("one");
-
-         p.send(m);
-
-         log.debug("message sent");
-
-         conn.close();
-
-         // receive the message
-         ConnectionFactory cf = (ConnectionFactory)ic.lookup("/ConnectionFactory");
-         conn = cf.createConnection();
-         conn.start();
-         s = conn.createSession(false, Session.AUTO_ACKNOWLEDGE);
-         MessageConsumer c = s.createConsumer(queue);
-         TextMessage rm = (TextMessage)c.receive(1000);
-
-         assertEquals("one", rm.getText());
-
-         conn.close();
-      }
-      finally
-      {
-         ServerManagement.undeployQueue("MyQueue");
-
-         if (suspended != null)
-         {
-            TransactionManagerLocator.getInstance().locate().resume(suspended);
-         }
-      }
-   }
-
-   /**
-    * Test case for http://jira.jboss.org/jira/browse/JBMESSAGING-410. Use a cached connection that
-    * was initally enroled in a global transaction.
-    */
-   public void testSendNoGlobalTransaction2() throws Exception
-   {
-
-      Transaction suspended = TransactionManagerLocator.getInstance().locate().suspend();
-
-      try
-      {
-
-         ConnectionFactory mcf = (ConnectionFactory)ic.lookup("java:/JCAConnectionFactory");
-         Connection conn = mcf.createConnection();
-         conn.start();
-
-         UserTransaction ut = ServerManagement.getUserTransaction();
-
-         ut.begin();
-
-         Session s = conn.createSession(false, Session.AUTO_ACKNOWLEDGE);
-         MessageProducer p = s.createProducer(queue);
-         Message m = s.createTextMessage("one");
-
-         p.send(m);
-
-         ut.commit();
-
-         conn.close();
-
-         ConnectionFactory cf = (ConnectionFactory)ic.lookup("ConnectionFactory");
-         conn = cf.createConnection();
-         s = conn.createSession(false, Session.AUTO_ACKNOWLEDGE);
-         conn.start();
-
-         TextMessage rm = (TextMessage)s.createConsumer(queue).receive(500);
-
-         assertEquals("one", rm.getText());
-
-         conn.close();
-
-         // make sure there's no active JTA transaction
-
-         assertNull(TransactionManagerLocator.getInstance().locate().getTransaction());
-
-         // send a message to the queue, using a JCA wrapper
-
-         mcf = (ConnectionFactory)ic.lookup("java:/JCAConnectionFactory");
-
-         conn = mcf.createConnection();
-
-         s = conn.createSession(false, Session.AUTO_ACKNOWLEDGE);
-         p = s.createProducer(queue);
-         p.setDeliveryMode(DeliveryMode.PERSISTENT);
-         m = s.createTextMessage("one");
-
-         p.send(m);
-
-         conn.close();
-
-         // receive the message
-         cf = (ConnectionFactory)ic.lookup("/ConnectionFactory");
-         conn = cf.createConnection();
-         conn.start();
-         s = conn.createSession(false, Session.AUTO_ACKNOWLEDGE);
-         MessageConsumer c = s.createConsumer(queue);
-         rm = (TextMessage)c.receive(1000);
-
-         assertEquals("one", rm.getText());
-
-         conn.close();
-      }
-      finally
-      {
-         if (suspended != null)
-         {
-            TransactionManagerLocator.getInstance().locate().resume(suspended);
-         }
-      }
-   }
-
-   /**
-    * Test case for http://jira.jboss.org/jira/browse/JBMESSAGING-520.
-    */
-   public void testReceiveNoGlobalTransaction() throws Exception
-   {
-      try
-      {
-         ServerManagement.deployQueue("MyQueue2");
-
-         // send a message to the queue
-
-         ConnectionFactory cf = (ConnectionFactory)ic.lookup("/ConnectionFactory");
-         Queue queue = (Queue)ic.lookup("queue/MyQueue2");
-         Connection conn = cf.createConnection();
-         Session s = conn.createSession(false, Session.AUTO_ACKNOWLEDGE);
-         MessageProducer p = s.createProducer(queue);
-         p.setDeliveryMode(DeliveryMode.PERSISTENT);
-         Message m = s.createTextMessage("one");
-         p.send(m);
-         conn.close();
-
-         // make sure there's no active JTA transaction
-
-         Transaction suspended = TransactionManagerLocator.getInstance().locate().suspend();
-
-         try
-         {
-            // using a JCA wrapper
-
-            ConnectionFactory mcf = (ConnectionFactory)ic.lookup("java:/JCAConnectionFactory");
-            conn = mcf.createConnection();
-            conn.start();
-
-            // no active JTA transaction here
-
-            s = conn.createSession(false, Session.AUTO_ACKNOWLEDGE);
-            MessageConsumer c = s.createConsumer(queue);
-
-            // this method should send an untransacted acknowledgment that should clear the delivery
-            TextMessage rm = (TextMessage)c.receive(1000);
-
-            assertEquals("one", rm.getText());
-
-            conn.close();
-
-            // now the queue should be empty
-            ObjectName on = new ObjectName("jboss.messaging.destination:service=Queue,name=MyQueue2");
-            Integer count = (Integer)ServerManagement.getAttribute(on, "MessageCount");
-            assertEquals(0, count.intValue());
-         }
-         finally
-         {
-
-            if (suspended != null)
-            {
-               TransactionManagerLocator.getInstance().locate().resume(suspended);
-            }
-         }
-      }
-      finally
-      {
-         ServerManagement.undeployQueue("MyQueue2");
-      }
-   }
-
    // Package protected ---------------------------------------------
    
    // Protected -----------------------------------------------------

Modified: branches/Branch_1_0_1_SP/tests/src/org/jboss/test/messaging/jms/XATest.java
===================================================================
--- branches/Branch_1_0_1_SP/tests/src/org/jboss/test/messaging/jms/XATest.java	2007-02-02 11:24:06 UTC (rev 2142)
+++ branches/Branch_1_0_1_SP/tests/src/org/jboss/test/messaging/jms/XATest.java	2007-02-02 15:21:32 UTC (rev 2143)
@@ -38,8 +38,12 @@
 import javax.transaction.xa.XAResource;
 import javax.transaction.xa.Xid;
 
+import org.jboss.jms.client.JBossConnection;
 import org.jboss.jms.client.JBossConnectionFactory;
+import org.jboss.jms.client.delegate.ClientConnectionDelegate;
+import org.jboss.jms.client.state.ConnectionState;
 import org.jboss.jms.tx.MessagingXAResource;
+import org.jboss.jms.tx.ResourceManager;
 import org.jboss.test.messaging.MessagingTestCase;
 import org.jboss.test.messaging.tools.ServerManagement;
 import org.jboss.tm.TransactionManagerLocator;
@@ -126,224 +130,348 @@
 
    // Public ---------------------------------------------------------------------------------------
 
-   //http://jira.jboss.com/jira/browse/JBMESSAGING-721
-   public void testConvertFromLocalTx() throws Exception
+   // See http://jira.jboss.com/jira/browse/JBMESSAGING-638
+   public void testResourceManagerMemoryLeakOnCommit() throws Exception
    {
-      if (ServerManagement.isRemote())
+
+      XAConnection xaConn = null;
+      
+      try
       {
-         return;
+         xaConn = cf.createXAConnection();
+         
+         JBossConnection jbConn = (JBossConnection)xaConn;
+         
+         ClientConnectionDelegate del = (ClientConnectionDelegate)jbConn.getDelegate();
+         
+         ConnectionState state = (ConnectionState)del.getState();
+         
+         ResourceManager rm = state.getResourceManager();
+         
+         XASession xaSession = xaConn.createXASession();
+         
+         xaConn.start();
+         
+         XAResource res = xaSession.getXAResource();
+         
+         XAResource dummy = new DummyXAResource();
+         
+         for (int i = 0; i < 100; i++)
+         {
+            
+            tm.begin();
+                     
+            Transaction tx = tm.getTransaction();
+            
+            tx.enlistResource(res);
+            
+            tx.enlistResource(dummy);
+            
+            assertEquals(1, rm.size());
+            
+            tx.delistResource(res, XAResource.TMSUCCESS);
+            
+            tx.delistResource(dummy, XAResource.TMSUCCESS);
+            
+            tm.commit();
+         }                  
+         
+         assertEquals(1, rm.size());
+         
+         xaConn.close();
+         
+         xaConn = null;
+         
+         assertEquals(0, rm.size());
+
       }
+      finally
+      {
+         if (xaConn != null)
+         {
+            xaConn.close();
+         }
+      }
+   }
+   
+   //See http://jira.jboss.com/jira/browse/JBMESSAGING-638
+   public void testResourceManagerMemoryLeakOnRollback() throws Exception
+   { 
+      XAConnection xaConn = null;
+      
+      try
+      {
+         xaConn = cf.createXAConnection();
+         
+         JBossConnection jbConn = (JBossConnection)xaConn;
+         
+         ClientConnectionDelegate del = (ClientConnectionDelegate)jbConn.getDelegate();
+         
+         ConnectionState state = (ConnectionState)del.getState();
+         
+         ResourceManager rm = state.getResourceManager();
+         
+         XASession xaSession = xaConn.createXASession();
+         
+         xaConn.start();
+         
+         XAResource res = xaSession.getXAResource();
+         
+         XAResource dummy = new DummyXAResource();
+         
+         for (int i = 0; i < 100; i++)
+         {            
+            tm.begin();
+                     
+            Transaction tx = tm.getTransaction();
+            
+            tx.enlistResource(res);
+            
+            tx.enlistResource(dummy);
+            
+            assertEquals(1, rm.size());
+            
+            tx.delistResource(res, XAResource.TMSUCCESS);
+            
+            tx.delistResource(dummy, XAResource.TMSUCCESS);
+            
+            tm.rollback();
+         }                  
+         
+         assertEquals(1, rm.size());
+         
+         xaConn.close();
+         
+         xaConn = null;
+         
+         assertEquals(0, rm.size());
 
+      }
+      finally
+      {
+         if (xaConn != null)
+         {
+            xaConn.close();
+         }
+      }
+   }
+   
+
+   //http://jira.jboss.com/jira/browse/JBMESSAGING-721
+   public void testConvertFromLocalTx() throws Exception
+   {
       Connection conn = null;
-
+      
       XAConnection xaConn = null;
-
+      
       try
       {
-
-         // First send some messages to a queue
-
+      
+         //First send some messages to a queue
+         
          conn = cf.createConnection();
-
+         
          Session sessSend = conn.createSession(false, Session.AUTO_ACKNOWLEDGE);
-
+         
          MessageProducer prod = sessSend.createProducer(queue);
-
+         
          TextMessage tm1 = sessSend.createTextMessage("message1");
-
+         
          TextMessage tm2 = sessSend.createTextMessage("message2");
-
+         
          prod.send(tm1);
-
+         
          prod.send(tm2);
-
-
+         
+         
          xaConn = cf.createXAConnection();
-
+         
          XASession xaSession = xaConn.createXASession();
-
+         
          xaConn.start();
-
+         
          MessageConsumer cons = xaSession.createConsumer(queue);
-
-         // Receive the two messages outside of a transaction
-
+         
+         //Receive the two messages outside of a transaction
+         
          TextMessage rm1 = (TextMessage)cons.receive(1000);
-
+         
          assertNotNull(rm1);
-
+         
          assertEquals("message1", rm1.getText());
-
+         
          TextMessage rm2 = (TextMessage)cons.receive(1000);
-
+         
          assertNotNull(rm2);
-
+         
          assertEquals("message2", rm2.getText());
-
+         
          Message rm3 = cons.receive(1000);
-
+         
          assertNull(rm3);
-
-         // Now we enlist the session in an xa transaction
-
-         log.info("enlisting");
+         
+         //Now we enlist the session in an xa transaction
+         
          XAResource res = xaSession.getXAResource();
-
+         
          tm.begin();
-
+         
          Transaction tx = tm.getTransaction();
          tx.enlistResource(res);
-
-         // This should cause the work done previously to be converted into work done in the XA
-         // transaction. This is what an MDB does. There is a difficulty in transactional delivery
-         // with an MDB. The message is received from the destination and then sent to the MDB
-         // container so it can call onMessage().
-         // For transactional delivery the receipt of the message should be in a transaction but by
-         // the time the MDB container is invoked the message has already been received it is too
-         // late - the message has already been received and passed on (see page 199 (chapter 5 JMS
-         // and Transactions, section "Application Server Integration" of Mark Little's book Java
-         // Transaction processing for a discussion of how different app serves deal with this).
-         // The way JBoss Messaging (and JBossMQ) deals with this is to convert any work done prior
-         // to when the XASession is enlisted in the transaction, into work done in the XA
-         // transaction
-
-         // Now rollback the tx - this should cause redelivery of the two messages
+         
+         //This should cause the work done previously to be converted into work done in the xa transaction
+         //this is what an MDB does
+         //There is a difficulty in transactional delivery with an MDB.
+         //The message is received from the destination and then sent to the mdb container so
+         //it can call onMessage.
+         //For transactional delivery the receipt of the message should be in a transaction but by the time
+         //the mdb container is invoked the message has already been received it is too late - the message
+         //has already been received and passed on (see page 199 (chapter 5 JMS and Transactions, section "Application Server Integration"
+         //of Mark Little's book Java Transaction processing
+         //for a discussion of how different app serves deal with this)
+         //The way jboss messaging (and jboss mq) deals with this is to convert any work done
+         //prior to when the xasession is enlisted in the tx, into work done in the xa tx
+         
+         tx.delistResource(res, XAResource.TMSUCCESS);
+         
+         //Now rollback the tx - this should cause redelivery of the two messages
          tx.rollback();
-
+         
          rm1 = (TextMessage)cons.receive(1000);
-
+         
          assertNotNull(rm1);
-
+         
          assertEquals("message1", rm1.getText());
-
+         
          rm2 = (TextMessage)cons.receive(1000);
-
+         
          assertNotNull(rm2);
-
+         
          assertEquals("message2", rm2.getText());
-
+         
          rm3 = cons.receive(1000);
-
+         
          assertNull(rm3);
       }
       finally
-      {
+      {         
          if (conn != null)
          {
             conn.close();
          }
-
+         
          if (xaConn != null)
          {
             xaConn.close();
          }
       }
    }
-
+   
    //http://jira.jboss.com/jira/browse/JBMESSAGING-721
    public void testTransactionIdSetAfterCommit() throws Exception
    {
-      if (ServerManagement.isRemote()) return;
-
       Connection conn = null;
-
+      
       XAConnection xaConn = null;
-
+      
       try
       {
-
+      
          //First send some messages to a queue
-
+         
          conn = cf.createConnection();
-
+         
          Session sessSend = conn.createSession(false, Session.AUTO_ACKNOWLEDGE);
-
+         
          MessageProducer prod = sessSend.createProducer(queue);
-
+         
          TextMessage tm1 = sessSend.createTextMessage("message1");
-
+         
          TextMessage tm2 = sessSend.createTextMessage("message2");
-
+         
          prod.send(tm1);
-
+         
          prod.send(tm2);
-
-
+         
+         
          xaConn = cf.createXAConnection();
-
+         
          XASession xaSession = xaConn.createXASession();
-
+         
          xaConn.start();
-
+         
          MessageConsumer cons = xaSession.createConsumer(queue);
-
+         
          //Now we enlist the session in an xa transaction
-
-         log.info("enlisting");
+         
          XAResource res = xaSession.getXAResource();
-
+         
          tm.begin();
-
+         
          Transaction tx = tm.getTransaction();
          tx.enlistResource(res);
-
+         
+         tx.delistResource(res, XAResource.TMSUCCESS);
+         
          //Then we do a commit
          tm.commit();
-
-         //And enlist again
-
-         tx = tm.getTransaction();
-
-
-         tm.begin();
-
-         tx = tm.getTransaction();
-         tx.enlistResource(res);
-
-         //Then we receive the messages
-
+                              
+         //Then we receive the messages outside the tx
+         
          TextMessage rm1 = (TextMessage)cons.receive(1000);
-
+         
          assertNotNull(rm1);
-
+         
          assertEquals("message1", rm1.getText());
-
+         
          TextMessage rm2 = (TextMessage)cons.receive(1000);
-
+         
          assertNotNull(rm2);
-
+         
          assertEquals("message2", rm2.getText());
-
+         
          Message rm3 = cons.receive(1000);
-
+         
          assertNull(rm3);
-
+         
+         //And enlist again - this should convert the work done in the local tx
+         //into the global branch
+         
+         tx = tm.getTransaction();
+         
+         tm.begin();
+         
+         tx = tm.getTransaction();
+         tx.enlistResource(res);
+         
+         tx.delistResource(res, XAResource.TMSUCCESS);         
+               
          //Now rollback the tx - this should cause redelivery of the two messages
          tx.rollback();
-
+         
          rm1 = (TextMessage)cons.receive(1000);
-
+         
          assertNotNull(rm1);
-
+         
          assertEquals("message1", rm1.getText());
-
+         
          rm2 = (TextMessage)cons.receive(1000);
-
+         
          assertNotNull(rm2);
-
+         
          assertEquals("message2", rm2.getText());
-
+         
          rm3 = cons.receive(1000);
-
+         
          assertNull(rm3);
       }
       finally
-      {
+      {         
          if (conn != null)
          {
             conn.close();
          }
-
+         
          if (xaConn != null)
          {
             xaConn.close();
@@ -351,108 +479,109 @@
       }
 
    }
-
+   
    //http://jira.jboss.com/jira/browse/JBMESSAGING-721
    public void testTransactionIdSetAfterRollback() throws Exception
    {
-      if (ServerManagement.isRemote()) return;
-
       Connection conn = null;
-
+      
       XAConnection xaConn = null;
-
+      
       try
       {
-
+      
          //First send some messages to a queue
-
+         
          conn = cf.createConnection();
-
+         
          Session sessSend = conn.createSession(false, Session.AUTO_ACKNOWLEDGE);
-
+         
          MessageProducer prod = sessSend.createProducer(queue);
-
+         
          TextMessage tm1 = sessSend.createTextMessage("message1");
-
+         
          TextMessage tm2 = sessSend.createTextMessage("message2");
-
+         
          prod.send(tm1);
-
+         
          prod.send(tm2);
-
-
+         
+         
          xaConn = cf.createXAConnection();
-
+         
          XASession xaSession = xaConn.createXASession();
-
+         
          xaConn.start();
-
+         
          MessageConsumer cons = xaSession.createConsumer(queue);
-
+         
          //Now we enlist the session in an xa transaction
-
-         log.info("enlisting");
+         
          XAResource res = xaSession.getXAResource();
-
+         
          tm.begin();
-
+         
          Transaction tx = tm.getTransaction();
          tx.enlistResource(res);
-
+         tx.delistResource(res, XAResource.TMSUCCESS);
+         
          //Then we do a rollback
-         tm.rollback();
-
-         tm.begin();
-
-         //And enlist again
-
-         tx = tm.getTransaction();
-         tx.enlistResource(res);
-
-         //Then we receive the messages
-
+         tm.rollback();                 
+         
+         //Then we receive the messages outside the global tx
+         
          TextMessage rm1 = (TextMessage)cons.receive(1000);
-
+         
          assertNotNull(rm1);
-
+         
          assertEquals("message1", rm1.getText());
-
+         
          TextMessage rm2 = (TextMessage)cons.receive(1000);
-
+         
          assertNotNull(rm2);
-
+         
          assertEquals("message2", rm2.getText());
-
+         
          Message rm3 = cons.receive(1000);
-
+         
          assertNull(rm3);
-
+         
+         tm.begin();
+         
+         //And enlist again - the work should then be converted into the global tx branch
+         
+         tx = tm.getTransaction();
+         
+         tx.enlistResource(res);
+         
+         tx.delistResource(res, XAResource.TMSUCCESS);
+               
          //Now rollback the tx - this should cause redelivery of the two messages
          tx.rollback();
-
+         
          rm1 = (TextMessage)cons.receive(1000);
-
+         
          assertNotNull(rm1);
-
+         
          assertEquals("message1", rm1.getText());
-
+         
          rm2 = (TextMessage)cons.receive(1000);
-
+         
          assertNotNull(rm2);
-
+         
          assertEquals("message2", rm2.getText());
-
+         
          rm3 = cons.receive(1000);
-
+         
          assertNull(rm3);
       }
       finally
-      {
+      {         
          if (conn != null)
          {
             conn.close();
          }
-
+         
          if (xaConn != null)
          {
             xaConn.close();




More information about the jboss-cvs-commits mailing list