[jboss-cvs] JBoss Messaging SVN: r3601 - in branches/Branch_JBossMessaging_1_4_0_SP3_CP: tests/src/org/jboss/test/messaging/jms and 1 other directory.

jboss-cvs-commits at lists.jboss.org jboss-cvs-commits at lists.jboss.org
Mon Jan 21 10:52:25 EST 2008


Author: ataylor
Date: 2008-01-21 10:52:25 -0500 (Mon, 21 Jan 2008)
New Revision: 3601

Modified:
   branches/Branch_JBossMessaging_1_4_0_SP3_CP/src/main/org/jboss/jms/tx/ClientTransaction.java
   branches/Branch_JBossMessaging_1_4_0_SP3_CP/src/main/org/jboss/jms/tx/MessagingXAResource.java
   branches/Branch_JBossMessaging_1_4_0_SP3_CP/src/main/org/jboss/jms/tx/ResourceManager.java
   branches/Branch_JBossMessaging_1_4_0_SP3_CP/tests/src/org/jboss/test/messaging/jms/XATest.java
Log:
http://jira.jboss.com/jira/browse/JBMESSAGING-1221 - fix for memory leak when pooled connections from JCA are used; we now merge the transactions to make sure all the acks are caught

Modified: branches/Branch_JBossMessaging_1_4_0_SP3_CP/src/main/org/jboss/jms/tx/ClientTransaction.java
===================================================================
--- branches/Branch_JBossMessaging_1_4_0_SP3_CP/src/main/org/jboss/jms/tx/ClientTransaction.java	2008-01-21 15:51:41 UTC (rev 3600)
+++ branches/Branch_JBossMessaging_1_4_0_SP3_CP/src/main/org/jboss/jms/tx/ClientTransaction.java	2008-01-21 15:52:25 UTC (rev 3601)
@@ -21,15 +21,6 @@
  */
 package org.jboss.jms.tx;
 
-import java.io.DataInputStream;
-import java.io.DataOutputStream;
-import java.util.ArrayList;
-import java.util.Collections;
-import java.util.Iterator;
-import java.util.LinkedHashMap;
-import java.util.List;
-import java.util.Map;
-
 import org.jboss.jms.delegate.Ack;
 import org.jboss.jms.delegate.DefaultAck;
 import org.jboss.jms.delegate.DeliveryInfo;
@@ -37,10 +28,15 @@
 import org.jboss.logging.Logger;
 import org.jboss.messaging.core.impl.message.MessageFactory;
 
+import java.io.DataInputStream;
+import java.io.DataOutputStream;
+import java.util.*;
+
 /**
  * Holds the state of a transaction on the client side
- * 
+ *
  * @author <a href="mailto:tim.fox at jboss.com>Tim Fox </a>
+ * @author <a href="mailto:ataylor at redhat.com">Andy Taylor</a>
  */
 public class ClientTransaction
 {
@@ -67,14 +63,7 @@
    private List sessionStatesList;
 
    private boolean clientSide;
-   
-   private boolean hasPersistentAcks;
-   
-   private boolean failedOver;
-   
-   private boolean removeAcks;
 
-
    // Static --------------------------------------------------------
 
    // Constructors --------------------------------------------------
@@ -101,7 +90,7 @@
 
       sessionTxState.addMessage(msg);
    }
-   
+
    public void addAck(String sessionId, DeliveryInfo info)
    {
       if (!clientSide)
@@ -111,28 +100,8 @@
       SessionTxState sessionTxState = getSessionTxState(sessionId);
 
       sessionTxState.addAck(info);
-      
-      if (info.getMessageProxy().getMessage().isReliable())
-      {
-         hasPersistentAcks = true;
-      }
-      
-      if (!info.isShouldAck())
-      {
-      	removeAcks = true;
-      }
    }
-   
-   public boolean hasPersistentAcks()
-   {
-      return hasPersistentAcks;
-   }
-   
-   public boolean isFailedOver()
-   {
-      return failedOver;
-   }
-   
+
    public void clearMessages()
    {
       if (!clientSide)
@@ -144,9 +113,9 @@
       {
          // This can be null if the tx was recreated on the client side due to recovery
 
-         for(Iterator i = sessionStatesMap.values().iterator(); i.hasNext(); )
+         for (Iterator i = sessionStatesMap.values().iterator(); i.hasNext();)
          {
-            SessionTxState sessionTxState = (SessionTxState)i.next();
+            SessionTxState sessionTxState = (SessionTxState) i.next();
             sessionTxState.clearMessages();
          }
       }
@@ -170,7 +139,7 @@
       else
       {
          return sessionStatesMap == null ?
-            Collections.EMPTY_LIST : new ArrayList(sessionStatesMap.values());
+                 Collections.EMPTY_LIST : new ArrayList(sessionStatesMap.values());
       }
    }
 
@@ -183,7 +152,7 @@
       {
          throw new IllegalStateException("Cannot call this method on the server side");
       }
-      
+
       // Note we have to do this in one go since there may be overlap between old and new session
       // IDs and we don't want to overwrite keys in the map.
 
@@ -191,19 +160,19 @@
 
       if (sessionStatesMap != null)
       {
-         for(Iterator i = sessionStatesMap.values().iterator(); i.hasNext();)
+         for (Iterator i = sessionStatesMap.values().iterator(); i.hasNext();)
          {
-            SessionTxState state = (SessionTxState)i.next();
-            
+            SessionTxState state = (SessionTxState) i.next();
+
             boolean handled = state.handleFailover(newServerID, oldSessionID, newSessionID);
 
             if (handled)
             {
-	            if (tmpMap == null)
-	            {
-	               tmpMap = new LinkedHashMap();
-	            }
-	            tmpMap.put(newSessionID, state);
+               if (tmpMap == null)
+               {
+                  tmpMap = new LinkedHashMap();
+               }
+               tmpMap.put(newSessionID, state);
             }
          }
       }
@@ -213,15 +182,13 @@
          // swap
          sessionStatesMap = tmpMap;
       }
-      
-      failedOver = true;
    }
 
    /**
     * May return an empty list, but never null.
     */
    public List getDeliveriesForSession(String sessionID)
-   {   	   	
+   {
       if (!clientSide)
       {
          throw new IllegalStateException("Cannot call this method on the server side");
@@ -232,24 +199,24 @@
          return Collections.EMPTY_LIST;
       }
       else
-      {         
-         SessionTxState state = (SessionTxState)sessionStatesMap.get(sessionID);
-   
+      {
+         SessionTxState state = (SessionTxState) sessionStatesMap.get(sessionID);
+
          if (state != null)
          {
             List list = state.getAcks();
-            
+
             return list;
          }
          else
          {
             return Collections.EMPTY_LIST;
          }
-      }            
+      }
    }
 
    // Streamable implementation ---------------------------------
-   
+
    public void write(DataOutputStream out) throws Exception
    {
       out.writeByte(state);
@@ -266,7 +233,7 @@
 
          while (iter.hasNext())
          {
-            SessionTxState state = (SessionTxState)iter.next();
+            SessionTxState state = (SessionTxState) iter.next();
 
             out.writeUTF(state.getSessionId());
 
@@ -278,10 +245,10 @@
 
             while (iter2.hasNext())
             {
-               JBossMessage m = (JBossMessage)iter2.next();
+               JBossMessage m = (JBossMessage) iter2.next();
 
                out.writeByte(m.getType());
-             
+
                m.write(out);
             }
 
@@ -291,16 +258,16 @@
 
             while (iter2.hasNext())
             {
-               DeliveryInfo ack = (DeliveryInfo)iter2.next();
-               
+               DeliveryInfo ack = (DeliveryInfo) iter2.next();
+
                //We don't want to send acks for things like non durable subs which will have been already acked
                if (ack.isShouldAck())
                {
-               	//We only need the delivery id written
-               	out.writeLong(ack.getMessageProxy().getDeliveryId());
+                  //We only need the delivery id written
+                  out.writeLong(ack.getMessageProxy().getDeliveryId());
                }
             }
-            
+
             //Marker for end of acks
             out.writeLong(Long.MIN_VALUE);
          }
@@ -315,7 +282,7 @@
       state = in.readByte();
 
       int numSessions = in.readInt();
-      
+
       //Read in as a list since we don't want the extra overhead of putting into a map
       //which won't be used on the server side
       sessionStatesList = new ArrayList(numSessions);
@@ -334,7 +301,7 @@
          {
             byte type = in.readByte();
 
-            JBossMessage msg = (JBossMessage)MessageFactory.createMessage(type);
+            JBossMessage msg = (JBossMessage) MessageFactory.createMessage(type);
 
             msg.read(in);
 
@@ -342,10 +309,10 @@
          }
 
          long l;
-         
+
          while ((l = in.readLong()) != Long.MIN_VALUE)
          {
-         	sessionState.addAck(new DefaultAck(l));
+            sessionState.addAck(new DefaultAck(l));
          }
       }
    }
@@ -363,18 +330,71 @@
          sessionStatesMap = new LinkedHashMap();
       }
 
-      SessionTxState sessionTxState = (SessionTxState)sessionStatesMap.get(sessionID);
+      SessionTxState sessionTxState = (SessionTxState) sessionStatesMap.get(sessionID);
 
       if (sessionTxState == null)
       {
          sessionTxState = new SessionTxState(sessionID);
-         
+
          sessionStatesMap.put(sessionID, sessionTxState);
       }
 
       return sessionTxState;
    }
 
+   /**
+    * merges in the state of another transaction
+    *
+    * @param toMerge the clientTransaction to merge
+    */
+   public void mergeIn(ClientTransaction toMerge)
+   {
+      state = toMerge.state;
+
+      //if there is anything to merge in merge them
+      if (toMerge.sessionStatesMap != null)
+      {
+         if(sessionStatesMap == null)
+         {
+            sessionStatesMap = new LinkedHashMap();
+         }
+         Iterator it = toMerge.sessionStatesMap.keySet().iterator();
+         while (it.hasNext())
+         {
+            Object key = it.next();
+            SessionTxState ss = (SessionTxState) toMerge.sessionStatesMap.get(key);
+            //if the sessionstate doesnt exist add it
+            if (sessionStatesMap.get(key) == null)
+            {
+               sessionStatesMap.put(key, ss);
+            }
+            //otherwise it already exist so we need to merge in the acks
+            else
+            {
+               SessionTxState orig = (SessionTxState) sessionStatesMap.get(key);
+               List acksToMerge = ss.getAcks();
+               if (acksToMerge != null)
+               {
+                  Iterator acksIt = acksToMerge.iterator();
+                  while (acksIt.hasNext())
+                  {
+                     //if the ack isnt there add it
+                     Ack ack = (Ack) acksIt.next();
+                     if (!orig.getAcks().contains(ack))
+                     {
+                        orig.getAcks().add(ack);
+                     }
+                  }
+               }
+            }
+
+         }
+         sessionStatesList = sessionStatesMap == null ?
+                 Collections.EMPTY_LIST : new ArrayList(sessionStatesMap.values());
+      }
+
+   }
+
    // Inner Classes -------------------------------------------------
 
    public class SessionTxState
@@ -418,10 +438,10 @@
       {
          return sessionID;
       }
-      
+
       public void setAcks(List acks)
       {
-      	this.acks = acks;
+         this.acks = acks;
       }
 
       boolean handleFailover(int newServerID, String oldSessionID, String newSessionID)
@@ -432,13 +452,16 @@
             serverID = newServerID;
 
             // Remove any non persistent acks
-            for(Iterator i = acks.iterator(); i.hasNext(); )
+            for (Iterator i = acks.iterator(); i.hasNext();)
             {
-               DeliveryInfo di = (DeliveryInfo)i.next();
+               DeliveryInfo di = (DeliveryInfo) i.next();
 
                if (!di.getMessageProxy().getMessage().isReliable())
                {
-                  if (trace) { log.trace(this + " discarded non-persistent " + di + " on failover"); }
+                  if (trace)
+                  {
+                     log.trace(this + " discarded non-persistent " + di + " on failover");
+                  }
                   i.remove();
                }
             }
@@ -446,7 +469,7 @@
          }
          else
          {
-         	return false;
+            return false;
          }
       }
 

Modified: branches/Branch_JBossMessaging_1_4_0_SP3_CP/src/main/org/jboss/jms/tx/MessagingXAResource.java
===================================================================
--- branches/Branch_JBossMessaging_1_4_0_SP3_CP/src/main/org/jboss/jms/tx/MessagingXAResource.java	2008-01-21 15:51:41 UTC (rev 3600)
+++ branches/Branch_JBossMessaging_1_4_0_SP3_CP/src/main/org/jboss/jms/tx/MessagingXAResource.java	2008-01-21 15:52:25 UTC (rev 3601)
@@ -32,20 +32,21 @@
 
 /**
  * An XAResource implementation.
- * 
+ *
  * This defines the contract for the application server to interact with the resource manager.
- * 
+ *
  * It mainly delegates to the resource manager.
- * 
+ *
  * @author <a href="mailto:tim.fox at jboss.com">Tim Fox</a>
  * @author <a href="mailto:ovidiu at feodorov.com">Ovidiu Feodorov</a>
  * @author <a href="mailto:juha at jboss.org">Juha Lindfors</a>
- * 
+ *
  * Parts based on JBoss MQ XAResource implementation by:
- * 
+ *
  * @author Hiram Chirino (Cojonudo14 at hotmail.com)
  * @author <a href="mailto:adrian at jboss.org">Adrian Brock</a>
- * 
+ * @author <a href="mailto:ataylor at redhat.com">Andy Taylor</a>
+ *
  * @version <tt>$Revision$</tt>
  *
  * $Id$
@@ -55,33 +56,33 @@
    // Constants ------------------------------------------------------------------------------------
 
    private static final Logger log = Logger.getLogger(MessagingXAResource.class);
-   
+
    // Attributes -----------------------------------------------------------------------------------
-   
+
    private boolean trace = log.isTraceEnabled();
 
    private ResourceManager rm;
-   
+
    private SessionState sessionState;
-   
+
    private ConnectionDelegate connection;
-   
+
    //For testing only
    private boolean preventJoining;
 
    // Static ---------------------------------------------------------------------------------------
-   
+
    // Constructors ---------------------------------------------------------------------------------
 
    public MessagingXAResource(ResourceManager rm, SessionState sessionState)
-   { 
+   {
       this.rm = rm;
-      
+
       this.sessionState = sessionState;
-      
+
       this.connection = (ConnectionDelegate)(sessionState.getParent()).getDelegate();
    }
-   
+
    // XAResource implementation --------------------------------------------------------------------
 
    public boolean setTransactionTimeout(int timeout) throws XAException
@@ -102,23 +103,23 @@
       {
          return false;
       }
-      
+
       if (!(xaResource instanceof MessagingXAResource))
       {
          return false;
       }
-      
+
       boolean same = ((MessagingXAResource)xaResource).rm.getServerID() == this.rm.getServerID();
-      
+
       if (trace) { log.trace("Calling isSameRM, result is " + same + " " + ((MessagingXAResource)xaResource).rm.getServerID() + " " + this.rm.getServerID()); }
-            
+
       return same;
    }
-   
+
    public void start(Xid xid, int flags) throws XAException
    {
       if (trace) { log.trace(this + " starting " + xid + ", flags: " + flags); }
-      
+
       // Recreate Xid. See JBMESSAGING-661 [JPL]
 
       if (!(xid instanceof MessagingXid))
@@ -127,21 +128,21 @@
       }
 
       boolean convertTx = false;
-      
+
       Object currentXid = sessionState.getCurrentTxId();
-      
+
       // Sanity check
       if (currentXid == null)
       {
          throw new MessagingXAException(XAException.XAER_RMFAIL, "Current xid is not set");
       }
-      
-      if (flags == TMNOFLAGS && sessionState.getCurrentTxId() instanceof LocalTx)
+
+      if (sessionState.getCurrentTxId() instanceof LocalTx)
       {
          convertTx = true;
-         
+
          if (trace) { log.trace("Converting local tx into global tx branch"); }
-      }      
+      }
 
       //TODO why do we need this synchronized block?
       synchronized (this)
@@ -150,7 +151,7 @@
          {
             case TMNOFLAGS :
                if (convertTx)
-               {    
+               {
                   // If I commit/rollback the tx, then there is a short period of time between the
                   // AS (or whoever) calling commit on the tx and calling start to enrolling the
                   // session in a new tx. If the session has any listeners then in that period,
@@ -161,30 +162,37 @@
                   setCurrentTransactionId(rm.convertTx((LocalTx)sessionState.getCurrentTxId(), xid));
                }
                else
-               {                  
-                  setCurrentTransactionId(rm.startTx(xid));                 
+               {
+                  setCurrentTransactionId(rm.startTx(xid));
                }
                break;
             case TMJOIN :
-               if(!xid.equals(currentXid))
+               if(convertTx)
                {
-                  rm.removeTx(currentXid);
+                  setCurrentTransactionId(rm.convertOnJoinTx((LocalTx)sessionState.getCurrentTxId(), xid));
                }
-               setCurrentTransactionId(rm.joinTx(xid));
+               else
+               {
+                  setCurrentTransactionId(rm.joinTx(xid));
+               }
                break;
             case TMRESUME :
-               if(!xid.equals(currentXid))
+               if(convertTx)
                {
-                  rm.removeTx(currentXid);
+                  setCurrentTransactionId(rm.convertOnJoinTx((LocalTx)sessionState.getCurrentTxId(), xid));
                }
-               setCurrentTransactionId(rm.resumeTx(xid));
+               else
+               {
+                  setCurrentTransactionId(rm.resumeTx(xid));
+               }
+
                break;
             default:
                throw new MessagingXAException(XAException.XAER_PROTO, "Invalid flags: " + flags);
          }
       }
    }
-   
+
    public void end(Xid xid, int flags) throws XAException
    {
       if (trace) { log.trace(this + " ending " + xid + ", flags: " + flags); }
@@ -198,12 +206,12 @@
 
       //TODO - why do we need this synchronized block?
       synchronized (this)
-      {         
-         unsetCurrentTransactionId(xid);    
-         
+      {
+         unsetCurrentTransactionId(xid);
+
          switch (flags)
          {
-            case TMSUSPEND :                                    
+            case TMSUSPEND :
                rm.suspendTx(xid);
                break;
             case TMFAIL :
@@ -213,11 +221,11 @@
                rm.endTx(xid, true);
                break;
             default :
-               throw new MessagingXAException(XAException.XAER_PROTO, "Invalid flags: " + flags);         
-         }      
+               throw new MessagingXAException(XAException.XAER_PROTO, "Invalid flags: " + flags);
+         }
       }
    }
-   
+
    public int prepare(Xid xid) throws XAException
    {
       if (trace) { log.trace(this + " preparing " + xid); }
@@ -231,7 +239,7 @@
 
       return rm.prepare(xid, connection);
    }
-   
+
    public void commit(Xid xid, boolean onePhase) throws XAException
    {
       if (trace) { log.trace(this + " committing " + xid + (onePhase ? " (one phase)" : " (two phase)")); }
@@ -245,7 +253,7 @@
 
       rm.commit(xid, onePhase, connection);
    }
-   
+
    public void rollback(Xid xid) throws XAException
    {
       if (trace) { log.trace(this + " rolling back " + xid); }
@@ -259,7 +267,7 @@
 
       rm.rollback(xid, connection);
    }
-   
+
    public void forget(Xid xid) throws XAException
    {
       if (trace) { log.trace(this + " forgetting " + xid + " (currently an NOOP)"); }
@@ -270,19 +278,19 @@
       if (trace) { log.trace(this + " recovering, flags: " + flags); }
 
       Xid[] xids = rm.recover(flags, connection);
-      
+
       if (trace) { log.trace("Recovered txs: " + xids); }
-      
+
       return xids;
    }
-   
+
    // Public ---------------------------------------------------------------------------------------
 
    public String toString()
    {
       return "MessagingXAResource[" + sessionState.getDelegate().getID()+ "]";
    }
-   
+
    /*
     * This is used in testing to force isSameRM() to always return false
     * This allows us to test 2PC properly - since otherwise the transaction manager
@@ -295,18 +303,18 @@
    }
 
    // Package protected ----------------------------------------------------------------------------
-   
+
    // Protected ------------------------------------------------------------------------------------
-   
+
    // Private --------------------------------------------------------------------------------------
-   
+
    private void setCurrentTransactionId(Object xid)
    {
       if (trace) { log.trace(this + " setting current xid to " + xid + ",  previous " + sessionState.getCurrentTxId()); }
 
       sessionState.setCurrentTxId(xid);
    }
-   
+
    private void unsetCurrentTransactionId(Object xid)
    {
       if (xid == null)
@@ -328,10 +336,10 @@
          // a full explanation
          // So in other words - when the session is not enlisted in a global tx
          // it will always have a local xid set
-         
+
          sessionState.setCurrentTxId(rm.createLocalTx());
       }
    }
-   
+
    // Inner classes --------------------------------------------------------------------------------
 }
\ No newline at end of file

Modified: branches/Branch_JBossMessaging_1_4_0_SP3_CP/src/main/org/jboss/jms/tx/ResourceManager.java
===================================================================
--- branches/Branch_JBossMessaging_1_4_0_SP3_CP/src/main/org/jboss/jms/tx/ResourceManager.java	2008-01-21 15:51:41 UTC (rev 3600)
+++ branches/Branch_JBossMessaging_1_4_0_SP3_CP/src/main/org/jboss/jms/tx/ResourceManager.java	2008-01-21 15:52:25 UTC (rev 3601)
@@ -21,19 +21,7 @@
   */
 package org.jboss.jms.tx;
 
-import java.util.ArrayList;
-import java.util.Collections;
-import java.util.Iterator;
-import java.util.List;
-import java.util.Map;
-
-import javax.jms.IllegalStateException;
-import javax.jms.JMSException;
-import javax.jms.JMSSecurityException;
-import javax.transaction.xa.XAException;
-import javax.transaction.xa.XAResource;
-import javax.transaction.xa.Xid;
-
+import EDU.oswego.cs.dl.util.concurrent.ConcurrentHashMap;
 import org.jboss.jms.delegate.ConnectionDelegate;
 import org.jboss.jms.delegate.DeliveryInfo;
 import org.jboss.jms.delegate.SessionDelegate;
@@ -44,19 +32,26 @@
 import org.jboss.jms.tx.ClientTransaction.SessionTxState;
 import org.jboss.logging.Logger;
 
-import EDU.oswego.cs.dl.util.concurrent.ConcurrentHashMap;
+import javax.jms.IllegalStateException;
+import javax.jms.JMSException;
+import javax.jms.JMSSecurityException;
+import javax.transaction.xa.XAException;
+import javax.transaction.xa.XAResource;
+import javax.transaction.xa.Xid;
+import java.util.*;
 
 /**
  * The ResourceManager manages work done in both local and global (XA) transactions.
- * 
+ *
  * This is one instance of ResourceManager per JMS server. The ResourceManager instances are managed
  * by ResourceManagerFactory.
- * 
+ *
  * @author <a href="mailto:tim.fox at jboss.com">Tim Fox</a>
  * @author <a href="mailto:Konda.Madhu at uk.mizuho-sc.com">Madhu Konda</a>
  * @author <a href="mailto:juha at jboss.org">Juha Lindfors</a>
  * @author <a href="mailto:Cojonudo14 at hotmail.com">Hiram Chirino</a>
  * @author <a href="mailto:adrian at jboss.org">Adrian Brock</a>
+ * @author <a href="mailto:ataylor at redhat.com">Andy Taylor</a> 
  * @version $Revision$
  *
  * $Id$
@@ -64,33 +59,35 @@
 public class ResourceManager
 {
    // Constants ------------------------------------------------------------------------------------
-   
+
    // Attributes -----------------------------------------------------------------------------------
-   
+
    private boolean trace = log.isTraceEnabled();
-   
+
    private ConcurrentHashMap transactions = new ConcurrentHashMap();
-   
-   private int serverID;   
-   
+
+   private ConcurrentHashMap convertedIds = new ConcurrentHashMap();
+
+   private int serverID;
+
    // Static ---------------------------------------------------------------------------------------
-   
+
    private static final Logger log = Logger.getLogger(ResourceManager.class);
-   
+
    // Constructors ---------------------------------------------------------------------------------
-   
+
    ResourceManager(int serverID)
-   {      
+   {
    	this.serverID = serverID;
    }
-    
+
    // Public ---------------------------------------------------------------------------------------
-   
+
    public int getServerID()
    {
      	return serverID;
    }
-   
+
    /*
     * Merge another resource manager into this one - used in failover
     */
@@ -98,7 +95,7 @@
    {
       transactions.putAll(other.transactions);
    }
-   
+
    /**
     * Remove a tx
     */
@@ -106,110 +103,110 @@
    {
       return removeTxInternal(xid);
    }
-            
+
    /**
     * Create a local tx.
     */
    public LocalTx createLocalTx()
    {
       ClientTransaction tx = new ClientTransaction();
-      
+
       LocalTx xid = getNextTxId();
-      
+
       transactions.put(xid, tx);
-      
+
       return xid;
    }
-   
+
    /**
     * Add a message to a transaction
-    * 
+    *
     * @param xid - The id of the transaction to add the message to
     * @param m The message
     */
    public void addMessage(Object xid, String sessionId, JBossMessage m)
    {
       if (trace) { log.trace("addding message " + m + " for xid " + xid); }
-      
+
       ClientTransaction tx = getTxInternal(xid);
-      
+
       tx.addMessage(sessionId, m);
    }
-   
+
    /*
     * Failover session from old session ID -> new session ID
     */
    public void handleFailover(int newServerID, String oldSessionID, String newSessionID)
-   {	
+   {
       for (Iterator i = transactions.values().iterator(); i.hasNext(); )
       {
          ClientTransaction tx = (ClientTransaction)i.next();
-         
+
          tx.handleFailover(newServerID, oldSessionID, newSessionID);
-      }                
-   }   
-   
+      }
+   }
+
    /*
     * Get all the deliveries corresponding to the session ID
     */
    public List getDeliveriesForSession(String sessionID)
    {
       List ackInfos = new ArrayList();
-           
+
       for (Iterator i = transactions.values().iterator(); i.hasNext(); )
       {
          ClientTransaction tx = (ClientTransaction)i.next();
-                 
+
          List acks = tx.getDeliveriesForSession(sessionID);
-         
+
          ackInfos.addAll(acks);
       }
-      
+
       return ackInfos;
    }
-   
-   
+
+
    /**
     * Add an acknowledgement to the transaction
-    * 
+    *
     * @param xid - The id of the transaction to add the message to
     * @param ackInfo Information describing the acknowledgement
     */
    public void addAck(Object xid, String sessionId, DeliveryInfo ackInfo) throws JMSException
    {
       if (trace) { log.trace("adding " + ackInfo + " to transaction " + xid); }
-      
+
       ClientTransaction tx = getTxInternal(xid);
-      
+
       if (tx == null)
       {
          throw new JMSException("There is no transaction with id " + xid);
       }
-      
+
       tx.addAck(sessionId, ackInfo);
    }
-         
+
    public void commitLocal(LocalTx xid, ConnectionDelegate connection) throws JMSException
    {
       if (trace) { log.trace("committing " + xid); }
-      
+
       ClientTransaction tx = this.getTxInternal(xid);
-      
+
       // Invalid xid
       if (tx == null)
       {
          throw new IllegalStateException("Cannot find transaction " + xid);
       }
-                  
+
       TransactionRequest request =
          new TransactionRequest(TransactionRequest.ONE_PHASE_COMMIT_REQUEST, null, tx);
-      
+
       try
       {
          connection.sendTransaction(request, false);
-         
+
          // If we get this far we can remove the transaction
-         
+
          if (this.removeTxInternal(xid) == null)
          {
             throw new IllegalStateException("Cannot find xid to remove " + xid);
@@ -224,149 +221,149 @@
       {
          // If a problem occurs during commit processing the session should be rolled back
          rollbackLocal(xid);
-         
+
          JMSException e = new MessagingTransactionRolledBackException(t.getMessage());
          e.initCause(t);
-         throw e;         
+         throw e;
       }
    }
-   
+
    public void rollbackLocal(Object xid) throws JMSException
    {
       if (trace) { log.trace("rolling back local xid " + xid); }
-      
+
       ClientTransaction ts = removeTxInternal(xid);
-      
+
       if (ts == null)
-      {      
-         throw new IllegalStateException("Cannot find transaction with xid:" + xid);         
+      {
+         throw new IllegalStateException("Cannot find transaction with xid:" + xid);
       }
-      
+
       // don't need messages for rollback
       // We don't clear the acks since we need to redeliver locally
       ts.clearMessages();
-      
+
       // for one phase rollback there is nothing to do on the server
-      
+
       redeliverMessages(ts);
    }
-   
+
    //Only used for testing
    public ClientTransaction getTx(Object xid)
    {
       return getTxInternal(xid);
    }
-   
+
    //Only used for testing
    public int size()
    {
       return transactions.size();
    }
-      
-   
+
+
    public boolean checkForAcksInSession(String sessionId)
-   {         
+   {
       Iterator iter = transactions.entrySet().iterator();
-      
+
       while (iter.hasNext())
-      {         
+      {
          Map.Entry entry = (Map.Entry)iter.next();
-      
+
          ClientTransaction tx = (ClientTransaction)entry.getValue();
-                  
+
          if (tx.getState() == ClientTransaction.TX_PREPARED)
-         {            
+         {
             List dels = tx.getDeliveriesForSession(sessionId);
-            
+
             if (!dels.isEmpty())
             {
                // There are outstanding prepared acks in this session
-               
+
                return true;
             }
          }
       }
       return false;
    }
-    
+
    // Protected ------------------------------------------------------------------------------------
-   
+
    // Package Private ------------------------------------------------------------------------------
-   
+
    Xid startTx(Xid xid) throws XAException
    {
       if (trace) { log.trace("starting " + xid); }
-      
+
       ClientTransaction state = getTxInternal(xid);
-      
+
       if (state != null)
       {
          throw new MessagingXAException(XAException.XAER_DUPID, "Transaction already exists with xid " + xid);
       }
-            
+
       transactions.put(xid, new ClientTransaction());
-      
+
       return xid;
    }
-   
+
    void endTx(Xid xid, boolean success) throws XAException
    {
       if (trace) { log.trace("ending " + xid + ", success=" + success); }
-        
+
       ClientTransaction state = getTxInternal(xid);
-      
+
       if (state == null)
-      {  
+      {
          throw new MessagingXAException(XAException.XAER_NOTA, "Cannot find transaction with xid:" + xid);
-      }        
-      
+      }
+
       state.setState(ClientTransaction.TX_ENDED);
    }
-   
+
    int prepare(Xid xid, ConnectionDelegate connection) throws XAException
    {
       if (trace) { log.trace("preparing " + xid); }
-      
+
       ClientTransaction state = getTxInternal(xid);
-      
+
       if (state == null)
-      { 
+      {
          throw new MessagingXAException(XAException.XAER_NOTA, "Cannot find transaction with xid:" + xid);
-      } 
-      
+      }
+
       TransactionRequest request =
          new TransactionRequest(TransactionRequest.TWO_PHASE_PREPARE_REQUEST, xid, state);
-      
-      sendTransactionXA(request, connection);      
-      
+
+      sendTransactionXA(request, connection);
+
       state.setState(ClientTransaction.TX_PREPARED);
-      
+
       if (trace) { log.trace("State is now: " + state.getState()); }
-      
+
       return XAResource.XA_OK;
    }
-   
+
    void commit(Xid xid, boolean onePhase, ConnectionDelegate connection) throws XAException
    {
       if (trace) { log.trace("commiting xid " + xid + ", onePhase=" + onePhase); }
-      
+
       ClientTransaction tx = removeTxInternal(xid);
-      
+
       if (trace) { log.trace("got tx: " + tx + " state " + tx.getState()); }
-          
+
       if (onePhase)
       {
          //Invalid xid
          if (tx == null)
-         {       
+         {
             throw new MessagingXAException(XAException.XAER_NOTA, "Cannot find transaction with xid:" + xid);
          }
-         
+
          TransactionRequest request =
             new TransactionRequest(TransactionRequest.ONE_PHASE_COMMIT_REQUEST, null, tx);
-         
-         request.state = tx;    
-         
+
+         request.state = tx;
+
          sendTransactionXA(request, connection);
       }
       else
@@ -374,7 +371,7 @@
          if (tx != null)
          {
             if (tx.getState() != ClientTransaction.TX_PREPARED)
-            {    
+            {
                throw new MessagingXAException(XAException.XAER_PROTO, "commit called for transaction, but it is not prepared");
             }
          }
@@ -384,173 +381,201 @@
             //may happen if we have recovered from failure and the transaction manager
             //is calling commit on the transaction as part of the recovery process.
          }
-                           
+
          TransactionRequest request =
             new TransactionRequest(TransactionRequest.TWO_PHASE_COMMIT_REQUEST, xid, null);
-         
-         request.xid = xid;      
-         
-         sendTransactionXA(request, connection);        
+
+         request.xid = xid;
+
+         sendTransactionXA(request, connection);
       }
-      
+
       if (tx != null)
       {
          tx.setState(ClientTransaction.TX_COMMITED);
       }
    }
-      
+
    void rollback(Xid xid, ConnectionDelegate connection) throws XAException
    {
       if (trace) { log.trace("rolling back xid " + xid); }
-      
+
       ClientTransaction tx = removeTxInternal(xid);
-      
+
       if (tx == null)
       {
          throw new java.lang.IllegalStateException("Cannot find xid to remove " + xid);
       }
-      
+
       //It's possible we don't actually have the prepared tx here locally - this
       //may happen if we have recovered from failure and the transaction manager
       //is calling rollback on the transaction as part of the recovery process.
-      
+
       TransactionRequest request = null;
-      
+
       //don't need the messages
       if (tx != null)
       {
          tx.clearMessages();
       }
-             
+
       if ((tx == null) || tx.getState() == ClientTransaction.TX_PREPARED)
       {
          //2PC rollback
-         
+
          request = new TransactionRequest(TransactionRequest.TWO_PHASE_ROLLBACK_REQUEST, xid, tx);
-         
+
          if (trace) { log.trace("Sending rollback to server, tx:" + tx); }
-                                  
+
          sendTransactionXA(request, connection);
-      } 
+      }
       else
       {
-         //For one phase rollback there is nothing to do on the server 
-         
+         //For one phase rollback there is nothing to do on the server
+
          if (tx == null)
-         {     
+         {
             throw new MessagingXAException(XAException.XAER_NOTA, "Cannot find transaction with xid:" + xid);
          }
       }
-                  
+
       //we redeliver the messages
       //locally to their original consumers if they are still open or cancel them to the server
       //if the original consumers have closed
-      
+
       if (trace) { log.trace("Redelivering messages, tx:" + tx); }
-      
+
       try
       {
          if (tx != null)
          {
             redeliverMessages(tx);
-            
-            tx.setState(ClientTransaction.TX_ROLLEDBACK);  
+
+            tx.setState(ClientTransaction.TX_ROLLEDBACK);
          }
-         
+
       }
       catch (JMSException e)
       {
          log.error("Failed to redeliver", e);
-      }                               
+      }
    }
-  
-   
+
+
    Xid joinTx(Xid xid) throws XAException
    {
       if (trace) { log.trace("joining  " + xid); }
-      
+
       ClientTransaction state = getTxInternal(xid);
-      
+
       if (state == null)
-      {         
+      {
          throw new MessagingXAException(XAException.XAER_NOTA, "Cannot find transaction with xid:" + xid);
-      } 
-      
+      }
+
       return xid;
    }
-   
-   
-   
+
+
+
    Xid resumeTx(Xid xid) throws XAException
    {
       if (trace) { log.trace("resuming " + xid); }
-      
+
       ClientTransaction state = getTxInternal(xid);
-      
+
       if (state == null)
-      {       
+      {
          throw new MessagingXAException(XAException.XAER_NOTA, "Cannot find transaction with xid:" + xid);
       }
-      
+
       return xid;
    }
-   
+
    Xid suspendTx(Xid xid) throws XAException
    {
       if (trace) { log.trace("suspending " + xid); }
 
       ClientTransaction state = getTxInternal(xid);
-      
+
       if (state == null)
-      {       
+      {
          throw new MessagingXAException(XAException.XAER_NOTA, "Cannot find transaction with xid:" + xid);
       }
-      
+
       return xid;
    }
 
    Xid convertTx(LocalTx localTx, Xid xid) throws XAException
    {
       if (trace) { log.trace("converting " + localTx + " to " + xid); }
-      
+
       //Sanity check
-      
+
       ClientTransaction newTx = getTxInternal(xid);
 
       if (newTx != null)
-      {        
+      {
          throw new MessagingXAException(XAException.XAER_DUPID, "Transaction already exists:" + xid);
       }
 
       //Remove the local tx
-      
+
       ClientTransaction local = removeTxInternal(localTx);
 
       if (local == null)
-      {        
+      {
          throw new MessagingXAException(XAException.XAER_NOTA, "Cannot find transaction with xid:" + localTx);
       }
-      
+
       // Add the local back in with the new xid
-      
+
       transactions.put(xid, local);
-      
+
       return xid;
    }
- 
-   
+
+
+   // Hands the work held under localTx to the transaction already registered for xid (via
+   // mergeIn) and returns xid; throws XAER_NOTA if either transaction cannot be found.
+   Xid convertOnJoinTx(LocalTx localTx, Xid xid) throws XAException
+   {
+      if (trace) { log.trace("converting " + localTx + " to " + xid); }
+
+      //Sanity check - unlike convertTx, the target xid must already be registered here
+
+      ClientTransaction newTx = getTxInternal(xid);
+
+      if (newTx == null)
+      {
+         throw new MessagingXAException(XAException.XAER_NOTA, "Cannot find transaction with xid:" + xid);
+      }
+
+      //Remove the local tx
+
+      ClientTransaction local = removeTxInternal(localTx);
+
+      if (local == null)
+      {
+         throw new MessagingXAException(XAException.XAER_NOTA, "Cannot find transaction with xid:" + localTx);
+      }
+      //Merge the local tx into the existing tx for this xid rather than re-registering it
+      newTx.mergeIn(local);
+      return xid;
+   }
+
    Xid[] recover(int flags, ConnectionDelegate conn) throws XAException
    {
       if (trace) { log.trace("calling recover with flags: " + flags); }
-      
+
       if (flags == XAResource.TMSTARTRSCAN)
       {
          try
          {
             Xid[] txs = conn.getPreparedTransactions();
-            
+
             if (trace) { log.trace("Got " + txs.length + " transactions from server"); }
-            
+
             //populate with TxState --MK
             for (int i = 0; i < txs.length;i++)
             {
@@ -558,9 +583,9 @@
                if (!transactions.containsKey(txs[i]))
                {
                   ClientTransaction tx = new ClientTransaction();
-   
+
                   tx.setState(ClientTransaction.TX_PREPARED);
-   
+
                   transactions.put(txs[i], tx);
                }
             }
@@ -577,58 +602,58 @@
          return new Xid[0];
       }
    }
-   
+
    // Private --------------------------------------------------------------------------------------
-   
+
    private ClientTransaction getTxInternal(Object xid)
    {
       if (trace) { log.trace("getting transaction for " + xid); }
-      
+
       return (ClientTransaction)transactions.get(xid);
    }
-   
+
    private ClientTransaction removeTxInternal(Object xid)
    {
       return (ClientTransaction)transactions.remove(xid);
    }
-   
+
    /*
     * Rollback has occurred so we need to redeliver any unacked messages corresponding to the acks
     * is in the transaction.
-    * 
+    *
     */
    private void redeliverMessages(ClientTransaction ts) throws JMSException
    {
       List sessionStates = ts.getSessionStates();
-      
+
       //Need to do this in reverse order
-      
+
       Collections.reverse(sessionStates);
-      
+
       for (Iterator i = sessionStates.iterator(); i.hasNext();)
       {
          SessionTxState state = (SessionTxState)i.next();
-         
+
          List acks = state.getAcks();
-         
+
          if (!acks.isEmpty())
          {
             DeliveryInfo info = (DeliveryInfo)acks.get(0);
-            
+
             MessageProxy mp = info.getMessageProxy();
-            
+
             SessionDelegate del = mp.getSessionDelegate();
-            
+
             del.redeliver(acks);
          }
       }
    }
-   
+
    private synchronized LocalTx getNextTxId()
    {
       return new LocalTx();
    }
-     
+
    private void sendTransactionXA(TransactionRequest request, ConnectionDelegate connection)
       throws XAException
    {
@@ -640,26 +665,26 @@
       {
          MessagingXAException xaEx = new MessagingXAException(XAException.XA_RBROLLBACK, "A security exception happend!", security);
          log.error(xaEx, xaEx);
-         throw xaEx; 
+         throw xaEx;
       }
       catch (Throwable t)
       {
          //Catch anything else
-         
+
          //We assume that any error is recoverable - and the recovery manager should retry again
          //either after the network connection has been repaired (if that was the problem), or
          //the server has been fixed.
-         
+
          //(In some cases it will not be possible to fix so the user will have to manually resolve the tx)
-         
+
          //Therefore we throw XA_RETRY
          //Note we DO NOT throw XAER_RMFAIL or XAER_RMERR since both if these will cause the Arjuna
          //tx mgr NOT to retry and the transaction will have to be resolve manually.
-         
+
          throw new MessagingXAException(XAException.XA_RETRY, "A Throwable was caught in sending the transaction", t);
       }
    }
-   
+
    // Inner Classes --------------------------------------------------------------------------------
-  
+
 }

Modified: branches/Branch_JBossMessaging_1_4_0_SP3_CP/tests/src/org/jboss/test/messaging/jms/XATest.java
===================================================================
--- branches/Branch_JBossMessaging_1_4_0_SP3_CP/tests/src/org/jboss/test/messaging/jms/XATest.java	2008-01-21 15:51:41 UTC (rev 3600)
+++ branches/Branch_JBossMessaging_1_4_0_SP3_CP/tests/src/org/jboss/test/messaging/jms/XATest.java	2008-01-21 15:52:25 UTC (rev 3601)
@@ -52,6 +52,7 @@
  *
  * @author <a href="mailto:tim.fox at jboss.com">Tim Fox</a>
  * @author <a href="mailto:clebert.suconic at jboss.com">Clebert Suconic</a>
+ * @author <a href="mailto:ataylor at redhat.com">Andy Taylor</a>
  * @version <tt>$Revision: 1.1 $</tt>
  *
  * $Id$
@@ -64,7 +65,7 @@
    // Static --------------------------------------------------------
 
    // Attributes ----------------------------------------------------
-  
+
    protected TransactionManager tm;
 
    protected Transaction suspendedTx;
@@ -82,15 +83,15 @@
    // TestCase overrides -------------------------------------------
 
    public void setUp() throws Exception
-   { 
+   {
       super.setUp();
-      
-      ResourceManagerFactory.instance.clear();      
 
+      ResourceManagerFactory.instance.clear();
+
       //Also need a local tx mgr if test is running remote
       if (ServerManagement.isRemote())
       {
-         sc = new ServiceContainer("transaction");         
+         sc = new ServiceContainer("transaction");
 
          //Don't drop the tables again!
          sc.start(false);
@@ -101,7 +102,7 @@
       tm = (TransactionManager)localIc.lookup(ServiceContainer.TRANSACTION_MANAGER_JNDI_NAME);
 
       assertTrue(tm instanceof TransactionManagerImple);
-     
+
       if (!ServerManagement.isRemote())
       {
          suspendedTx = tm.suspend();
@@ -109,7 +110,7 @@
    }
 
    public void tearDown() throws Exception
-   {      
+   {
       if (TxUtils.isUncommitted(tm))
       {
          //roll it back
@@ -138,13 +139,30 @@
       {
          sc.stop();
       }
-      
+
       super.tearDown();
    }
 
    // Public --------------------------------------------------------
+   /*
+   resource is not enlisted in tx
 
-   public void testMemoryLeakForLocalTXs() throws Exception
+   do some work with tx W1
+
+   enlist resource in tx
+
+   do some more work W2
+
+   delist resource from tx
+
+   prepare tx
+
+   commit tx
+
+   validate that both sets of work W1 and W2 get applied.
+    */
+   //http://jira.jboss.com/jira/browse/JBMESSAGING-1221
+   public void testMemoryLeakForLocalTXsWithWork() throws Exception
    {
       XAConnection conn = null;
 
@@ -162,26 +180,156 @@
 
          ResourceManager rm = state.getResourceManager();
 
-         XASession xaSession = conn.createXASession();
+         //Create a session
+         JBossSession sess1 = (JBossSession) conn.createXASession();
+         DummyListener listener = new DummyListener();
+         sess1.setMessageListener(listener);
+         conn.start();
+         MessagingXAResource res1 = (MessagingXAResource) sess1.getXAResource();
+         tm.begin();
 
+         Transaction trans = tm.getTransaction();
+         trans.enlistResource(res1);
+         trans.delistResource(res1, XAResource.TMSUCCESS);
+         MessageProducer p = sess1.createProducer(queue1);
+         MessageConsumer cons = sess1.createConsumer(queue1);
+         conn.start();
+         //send 10 messages
+         for(int i = 0; i < 10; i++)
+         {
+            TextMessage message = sess1.createTextMessage("delistedwork" + i);
+            p.send(message);
+         }
+         //now receive 5
+         for(int i = 0; i < 5; i++)
+         {
+            TextMessage textMessage = (TextMessage) cons.receive();
+            assertEquals("delistedwork" + i, textMessage.getText());
+         }
+         //once we enlist, ensure that the 5 acks are merged ok; the first time we do this there is nothing to merge in the global tx
+         //so all acks are just copied
+         trans.enlistResource(res1);
+         SessionState sstate = (SessionState)((DelegateSupport)sess1.getDelegate()).getState();
+         ClientTransaction clientTransaction = rm.getTx(sstate.getCurrentTxId());
+         assertEquals("to many session states", clientTransaction.getSessionStates().size(), 1 );
+         ClientTransaction.SessionTxState sessionTxState = (ClientTransaction.SessionTxState) clientTransaction.getSessionStates().get(0);
+         assertEquals("wrong number of acks",5, sessionTxState.getAcks().size());
+
+         trans.delistResource(res1, XAResource.TMSUCCESS);
+         for(int i = 5; i < 10; i++)
+         {
+            TextMessage textMessage = (TextMessage) cons.receive();
+            assertEquals("delistedwork" + i, textMessage.getText());
+         }
+         //now reenlist and make sure that there are now 10 acks, this time around a merge will be done with the first 5 acks
+         //
+         trans.enlistResource(res1);
+
+         clientTransaction = rm.getTx(sstate.getCurrentTxId());
+         assertEquals("to many session states", clientTransaction.getSessionStates().size(), 1 );
+         sessionTxState = (ClientTransaction.SessionTxState) clientTransaction.getSessionStates().get(0);
+         assertEquals("wrong number of acks",10, sessionTxState.getAcks().size());
+
+         tm.commit();
+
+      }
+      finally
+      {
+         if (conn != null)
+         {
+            conn.close();
+         }
+
+      }
+
+   }
+
+   //http://jira.jboss.com/jira/browse/JBMESSAGING-1221
+   public void testMemoryLeakForLocalTXsOnJoinOnePhase() throws Exception
+   {
+      XAConnection conn = null;
+
+      Transaction tx1 = null;
+      try
+      {
+
+         conn = cf.createXAConnection();
+
+         JBossConnection jbConn = (JBossConnection)conn;
+
+         ClientConnectionDelegate del = (ClientConnectionDelegate)jbConn.getDelegate();
+
+         ConnectionState state = (ConnectionState)del.getState();
+
+         ResourceManager rm = state.getResourceManager();
+
          //Create a session
          XASession sess1 = conn.createXASession();
-         XAResource res1 = sess1.getXAResource();
+         MessagingXAResource res1 = (MessagingXAResource) sess1.getXAResource();
+         byte[] branchQualifier = new byte[] { 1, 2, 3, 4, 5, 6, 0, 0, 0, 0 };
+         byte[] globalTxId = new byte[] { 6, 5, 4, 3, 2, 1, 0, 0, 0, 0 };
+         int rmSizeBeforeStart = rm.size();
+         Xid xid = new MessagingXid(branchQualifier, 12435, globalTxId);
+         res1.start(xid, XAResource.TMNOFLAGS);
+         res1.end(xid, XAResource.TMSUCCESS);
+         int rmAfter = rm.size();
+         assertTrue(rmSizeBeforeStart + 1 == rmAfter);
+         res1.start(xid, XAResource.TMJOIN);
+         res1.end(xid, XAResource.TMSUCCESS);
+         rmAfter = rm.size();
+         assertTrue(rmSizeBeforeStart + 1 == rmAfter);
+         res1.commit(xid, true);
+         rmAfter = rm.size();
+         assertTrue(rmSizeBeforeStart == rmAfter);
+      }
+      finally
+      {
+         if (conn != null)
+         {
+            conn.close();
+         }
 
+      }
 
-         tm.begin();
+   }
+   //http://jira.jboss.com/jira/browse/JBMESSAGING-1221
+   public void testMemoryLeakForLocalTXsOnJoinTwoPhase() throws Exception
+   {
+      XAConnection conn = null;
 
-         tx1 = tm.getTransaction();
+      Transaction tx1 = null;
+      try
+      {
 
-         tx1.enlistResource(res1);
-         int sizeBefore = rm.size();
-         tx1.delistResource(res1, XAResource.TMSUCCESS);
+         conn = cf.createXAConnection();
 
-         tx1.enlistResource(res1);
-         int sizeAfter = rm.size();
-         assertTrue(sizeBefore == sizeAfter);
-         tx1.commit();
+         JBossConnection jbConn = (JBossConnection)conn;
 
+         ClientConnectionDelegate del = (ClientConnectionDelegate)jbConn.getDelegate();
+
+         ConnectionState state = (ConnectionState)del.getState();
+
+         ResourceManager rm = state.getResourceManager();
+
+         //Create a session
+         XASession sess1 = conn.createXASession();
+         MessagingXAResource res1 = (MessagingXAResource) sess1.getXAResource();
+         byte[] branchQualifier = new byte[] { 1, 2, 3, 4, 5, 6, 0, 0, 0, 0 };
+         byte[] globalTxId = new byte[] { 6, 5, 4, 3, 2, 1, 0, 0, 0, 0 };
+         int rmSizeBeforeStart = rm.size();
+         Xid xid = new MessagingXid(branchQualifier, 12435, globalTxId);
+         res1.start(xid, XAResource.TMNOFLAGS);
+         res1.end(xid, XAResource.TMSUCCESS);
+         int rmAfter = rm.size();
+         assertTrue(rmSizeBeforeStart + 1 == rmAfter);
+         res1.start(xid, XAResource.TMJOIN);
+         res1.end(xid, XAResource.TMSUCCESS);
+         rmAfter = rm.size();
+         assertTrue(rmSizeBeforeStart + 1 == rmAfter);
+         res1.prepare(xid);
+         res1.commit(xid, false);
+         rmAfter = rm.size();
+         assertTrue(rmSizeBeforeStart == rmAfter);
       }
       finally
       {
@@ -193,8 +341,103 @@
       }
 
    }
+   //http://jira.jboss.com/jira/browse/JBMESSAGING-1221
+   public void testMemoryLeakForLocalTXsOnResumeOnePhase() throws Exception
+   {
+      XAConnection conn = null;
 
-   
+      Transaction tx1 = null;
+      try
+      {
+
+         conn = cf.createXAConnection();
+
+         JBossConnection jbConn = (JBossConnection)conn;
+
+         ClientConnectionDelegate del = (ClientConnectionDelegate)jbConn.getDelegate();
+
+         ConnectionState state = (ConnectionState)del.getState();
+
+         ResourceManager rm = state.getResourceManager();
+
+         //Create a session
+         XASession sess1 = conn.createXASession();
+         MessagingXAResource res1 = (MessagingXAResource) sess1.getXAResource();
+         byte[] branchQualifier = new byte[] { 1, 2, 3, 4, 5, 6, 0, 0, 0, 0 };
+         byte[] globalTxId = new byte[] { 6, 5, 4, 3, 2, 1, 0, 0, 0, 0 };
+         int rmSizeBeforeStart = rm.size();
+         Xid xid = new MessagingXid(branchQualifier, 12435, globalTxId);
+         res1.start(xid, XAResource.TMNOFLAGS);
+         res1.end(xid, XAResource.TMSUCCESS);
+         int rmAfter = rm.size();
+         assertTrue(rmSizeBeforeStart + 1 == rmAfter);
+         res1.start(xid, XAResource.TMRESUME);
+         res1.end(xid, XAResource.TMSUCCESS);
+         rmAfter = rm.size();
+         assertTrue(rmSizeBeforeStart + 1 == rmAfter);
+         res1.commit(xid, true);
+         rmAfter = rm.size();
+         assertTrue(rmSizeBeforeStart == rmAfter);
+      }
+      finally
+      {
+         if (conn != null)
+         {
+            conn.close();
+         }
+
+      }
+
+   }
+   //http://jira.jboss.com/jira/browse/JBMESSAGING-1221
+   public void testMemoryLeakForLocalTXsOnJResumeTwoPhase() throws Exception
+   {
+      XAConnection conn = null;
+
+      Transaction tx1 = null;
+      try
+      {
+
+         conn = cf.createXAConnection();
+
+         JBossConnection jbConn = (JBossConnection)conn;
+
+         ClientConnectionDelegate del = (ClientConnectionDelegate)jbConn.getDelegate();
+
+         ConnectionState state = (ConnectionState)del.getState();
+
+         ResourceManager rm = state.getResourceManager();
+
+         //Create a session
+         XASession sess1 = conn.createXASession();
+         MessagingXAResource res1 = (MessagingXAResource) sess1.getXAResource();
+         byte[] branchQualifier = new byte[] { 1, 2, 3, 4, 5, 6, 0, 0, 0, 0 };
+         byte[] globalTxId = new byte[] { 6, 5, 4, 3, 2, 1, 0, 0, 0, 0 };
+         int rmSizeBeforeStart = rm.size();
+         Xid xid = new MessagingXid(branchQualifier, 12435, globalTxId);
+         res1.start(xid, XAResource.TMNOFLAGS);
+         res1.end(xid, XAResource.TMSUCCESS);
+         int rmAfter = rm.size();
+         assertTrue(rmSizeBeforeStart + 1 == rmAfter);
+         res1.start(xid, XAResource.TMRESUME);
+         res1.end(xid, XAResource.TMSUCCESS);
+         rmAfter = rm.size();
+         assertTrue(rmSizeBeforeStart + 1 == rmAfter);
+         res1.prepare(xid);
+         res1.commit(xid, false);
+         rmAfter = rm.size();
+         assertTrue(rmSizeBeforeStart == rmAfter);
+      }
+      finally
+      {
+         if (conn != null)
+         {
+            conn.close();
+         }
+
+      }
+
+   }
    /* If there is no global tx present the send must behave as non transacted.
     * See http://www.jboss.com/index.html?module=bb&op=viewtopic&t=98577&postdays=0&postorder=asc&start=0
     * http://jira.jboss.com/jira/browse/JBMESSAGING-410
@@ -218,7 +461,7 @@
          XAConnection xconn = xcf.createXAConnection();
 
          XASession xs = xconn.createXASession();
-         
+
          MessageProducer p = xs.createProducer(queue1);
          Message m = xs.createTextMessage("one");
 
@@ -250,9 +493,9 @@
    /*
     * If messages are consumed using an XASession that is not enlisted in a transaction then the behaviour of the session
     * falls back to being AUTO_ACK - i.e. the messages will get acked immediately.
-    * 
+    *
     * There is one exception to this:
-    * 
+    *
     * For transactional delivery of messages in an MDB using the old container invoker (non JCA 1.5 inflow) the message
     * is received from the JMS provider *before* the MDB container has a chance to enlist the session in a transaction.
     * (see page 199 (chapter 5 JMS and Transactions, section "Application Server Integration" of Mark Little's book Java Transaction
@@ -261,7 +504,7 @@
     * Consequently, if we detect the session has a distinguised session listener (which it will if using ASF) then the behaviour
     * is to fall back to being a local transacted session. Later on, when the session is enlisted the work done in the local tx
     * is converted to the global tx brach.
-    * 
+    *
     * We are testing the exceptional case here without a global tx here
     *
     * See http://www.jboss.com/index.html?module=bb&op=viewtopic&t=98577&postdays=0&postorder=asc&start=0
@@ -353,14 +596,14 @@
          }
       }
    }
-   
-   
+
+
    /*
     * If messages are consumed using an XASession that is not enlisted in a transaction then the behaviour of the session
     * falls back to being AUTO_ACK - i.e. the messages will get acked immediately.
-    * 
+    *
     * There is one exception to this:
-    * 
+    *
     * For transactional delivery of messages in an MDB using the old container invoker (non JCA 1.5 inflow) the message
     * is received from the JMS provider *before* the MDB container has a chance to enlist the session in a transaction.
     * (see page 199 (chapter 5 JMS and Transactions, section "Application Server Integration" of Mark Little's book Java Transaction
@@ -369,7 +612,7 @@
     * Consequently, if we detect the session has a distinguised session listener (which it will if using ASF) then the behaviour
     * is to fall back to being a local transacted session. Later on, when the session is enlisted the work done in the local tx
     * is converted to the global tx brach.
-    * 
+    *
     * We are testing the standard case without a global tx here
     *
     * See http://www.jboss.com/index.html?module=bb&op=viewtopic&t=98577&postdays=0&postorder=asc&start=0
@@ -414,11 +657,11 @@
          TextMessage rm = (TextMessage)c.receive(1000);
 
          assertEquals("one", rm.getText());
-         
+
          // messages should be acked
          count = (Integer)ServerManagement.getAttribute(queueMBean, "MessageCount");
          assertEquals(0, count.intValue());
-         
+
          xconn.close();
       }
       finally
@@ -430,14 +673,14 @@
          }
       }
    }
-   
 
+
    /*
     * If messages are consumed using an XASession that is not enlisted in a transaction then the behaviour of the session
     * falls back to being AUTO_ACK - i.e. the messages will get acked immediately.
-    * 
+    *
     * There is one exception to this:
-    * 
+    *
     * For transactional delivery of messages in an MDB using the old container invoker (non JCA 1.5 inflow) the message
     * is received from the JMS provider *before* the MDB container has a chance to enlist the session in a transaction.
     * (see page 199 (chapter 5 JMS and Transactions, section "Application Server Integration" of Mark Little's book Java Transaction
@@ -446,7 +689,7 @@
     * Consequently, if we detect the session has a distinguised session listener (which it will if using ASF) then the behaviour
     * is to fall back to being a local transacted session. Later on, when the session is enlisted the work done in the local tx
     * is converted to the global tx brach.
-    * 
+    *
     * We are testing the case with a global tx here
     *
     * See http://www.jboss.com/index.html?module=bb&op=viewtopic&t=98577&postdays=0&postorder=asc&start=0
@@ -543,7 +786,7 @@
     *     - Receive one message over a consumer created used a XASession
     *     - Call Recover
     *     - Receive the second message
-    *     - The queue should be empty after that 
+    *     - The queue should be empty after that
     *   Verifies if messages are sent ok and ack properly when recovery is called
     *      NOTE: To accomodate TCK tests where Session/Consumers are being used without transaction enlisting
     *            we are processing those cases as nonTransactional/AutoACK, however if the session is being used
@@ -1140,9 +1383,9 @@
          ServerManagement.startServerPeer();
 
          deployAndLookupAdministeredObjects();
-         
+
          conn1.close();
-         
+
          conn1 = cf.createXAConnection();
 
          XAResource res = conn1.createXASession().getXAResource();
@@ -1166,7 +1409,7 @@
       finally
       {
          removeAllMessages(queue1.getQueueName(), true, 0);
-      	
+
          if (conn1 != null)
          {
             try




More information about the jboss-cvs-commits mailing list