[jboss-cvs] JBoss Messaging SVN: r3586 - in branches/Branch_Stable: tests/src/org/jboss/test/messaging/jms and 1 other directory.

jboss-cvs-commits at lists.jboss.org jboss-cvs-commits at lists.jboss.org
Thu Jan 17 11:32:40 EST 2008


Author: ataylor
Date: 2008-01-17 11:32:40 -0500 (Thu, 17 Jan 2008)
New Revision: 3586

Modified:
   branches/Branch_Stable/src/main/org/jboss/jms/tx/ClientTransaction.java
   branches/Branch_Stable/src/main/org/jboss/jms/tx/MessagingXAResource.java
   branches/Branch_Stable/src/main/org/jboss/jms/tx/ResourceManager.java
   branches/Branch_Stable/tests/src/org/jboss/test/messaging/jms/XATest.java
Log:
http://jira.jboss.com/jira/browse/JBMESSAGING-1221 - fix for memory leak when pooled connections from JCA are used

Modified: branches/Branch_Stable/src/main/org/jboss/jms/tx/ClientTransaction.java
===================================================================
--- branches/Branch_Stable/src/main/org/jboss/jms/tx/ClientTransaction.java	2008-01-17 15:14:58 UTC (rev 3585)
+++ branches/Branch_Stable/src/main/org/jboss/jms/tx/ClientTransaction.java	2008-01-17 16:32:40 UTC (rev 3586)
@@ -21,15 +21,6 @@
  */
 package org.jboss.jms.tx;
 
-import java.io.DataInputStream;
-import java.io.DataOutputStream;
-import java.util.ArrayList;
-import java.util.Collections;
-import java.util.Iterator;
-import java.util.LinkedHashMap;
-import java.util.List;
-import java.util.Map;
-
 import org.jboss.jms.delegate.Ack;
 import org.jboss.jms.delegate.DefaultAck;
 import org.jboss.jms.delegate.DeliveryInfo;
@@ -37,6 +28,10 @@
 import org.jboss.logging.Logger;
 import org.jboss.messaging.core.impl.message.MessageFactory;
 
+import java.io.DataInputStream;
+import java.io.DataOutputStream;
+import java.util.*;
+
 /**
  * Holds the state of a transaction on the client side
  * 
@@ -375,6 +370,71 @@
       return sessionTxState;
    }
 
+   /**
+    * Merges the state of another transaction into this one: its flags overwrite ours, and its
+    * per-session messages and acks are added wherever this transaction does not already hold them.
+    * @param toMerge the clientTransaction to merge
+    */
+   public void mergeIn(ClientTransaction toMerge)
+   {
+      state = toMerge.state;
+      hasPersistentAcks = toMerge.hasPersistentAcks;
+      removeAcks = toMerge.removeAcks;
+      failedOver = toMerge.failedOver;
+      //if there is anything to merge in merge them
+      if(toMerge.sessionStatesMap != null)
+      {
+         //our own map may still be null; NOTE(review): LinkedHashMap assumed to match how the map is created elsewhere - confirm
+         if(sessionStatesMap == null) { sessionStatesMap = new LinkedHashMap(); }
+         Iterator it = toMerge.sessionStatesMap.keySet().iterator();
+         while(it.hasNext())
+         {
+            Object key = it.next();
+            SessionTxState ss = (SessionTxState) toMerge.sessionStatesMap.get(key);
+            SessionTxState orig = (SessionTxState) sessionStatesMap.get(key);
+            //if the sessionstate doesnt exist add it
+            if(orig == null)
+            {
+               sessionStatesMap.put(key, ss);
+            }
+            //otherwise it already exist so we need to merge in the messages and acks
+            else
+            {
+               List messagesToMerge = ss.getMsgs();
+               if(messagesToMerge != null)
+               {
+                  Iterator msgsIt = messagesToMerge.iterator();
+                  while(msgsIt.hasNext())
+                  {
+                     //if the msg isnt there add it
+                     JBossMessage msg = (JBossMessage) msgsIt.next();
+                     if(!orig.getMsgs().contains(msg))
+                     {
+                        orig.getMsgs().add(msg);
+                     }
+                  }
+               }
+               List acksToMerge = ss.getAcks();
+               if(acksToMerge != null)
+               {
+                  Iterator acksIt = acksToMerge.iterator();
+                  while(acksIt.hasNext())
+                  {
+                     //if the ack isnt there add it
+                     Ack ack = (Ack) acksIt.next();
+                     if(!orig.getAcks().contains(ack))
+                     {
+                        orig.getAcks().add(ack);
+                     }
+                  }
+               }
+            }
+         }
+         //rebuild the list from the merged map so both hold the same session states
+         sessionStatesList = new ArrayList(sessionStatesMap.values());
+      }
+   }
+
    // Inner Classes -------------------------------------------------
 
    public class SessionTxState

Modified: branches/Branch_Stable/src/main/org/jboss/jms/tx/MessagingXAResource.java
===================================================================
--- branches/Branch_Stable/src/main/org/jboss/jms/tx/MessagingXAResource.java	2008-01-17 15:14:58 UTC (rev 3585)
+++ branches/Branch_Stable/src/main/org/jboss/jms/tx/MessagingXAResource.java	2008-01-17 16:32:40 UTC (rev 3586)
@@ -136,7 +136,7 @@
          throw new MessagingXAException(XAException.XAER_RMFAIL, "Current xid is not set");
       }
       
-      if (flags == TMNOFLAGS && sessionState.getCurrentTxId() instanceof LocalTx)
+      if (sessionState.getCurrentTxId() instanceof LocalTx)
       {
          convertTx = true;
          
@@ -166,18 +166,25 @@
                }
                break;
             case TMJOIN :
-               if(!xid.equals(currentXid))
+               if(convertTx)
                {
-                  rm.removeTx(currentXid);  
+                  setCurrentTransactionId(rm.convertOnJoinTx((LocalTx)sessionState.getCurrentTxId(), xid));
                }
-               setCurrentTransactionId(rm.joinTx(xid));
+               else
+               {
+                  setCurrentTransactionId(rm.joinTx(xid));
+               }
                break;
             case TMRESUME :
-               if(!xid.equals(currentXid))
+               if(convertTx)
                {
-                  rm.removeTx(currentXid);
+                  setCurrentTransactionId(rm.convertOnJoinTx((LocalTx)sessionState.getCurrentTxId(), xid));
                }
-               setCurrentTransactionId(rm.resumeTx(xid));
+               else
+               {
+                  setCurrentTransactionId(rm.resumeTx(xid));   
+               }
+
                break;
             default:
                throw new MessagingXAException(XAException.XAER_PROTO, "Invalid flags: " + flags);

Modified: branches/Branch_Stable/src/main/org/jboss/jms/tx/ResourceManager.java
===================================================================
--- branches/Branch_Stable/src/main/org/jboss/jms/tx/ResourceManager.java	2008-01-17 15:14:58 UTC (rev 3585)
+++ branches/Branch_Stable/src/main/org/jboss/jms/tx/ResourceManager.java	2008-01-17 16:32:40 UTC (rev 3586)
@@ -21,19 +21,7 @@
   */
 package org.jboss.jms.tx;
 
-import java.util.ArrayList;
-import java.util.Collections;
-import java.util.Iterator;
-import java.util.List;
-import java.util.Map;
-
-import javax.jms.IllegalStateException;
-import javax.jms.JMSException;
-import javax.jms.JMSSecurityException;
-import javax.transaction.xa.XAException;
-import javax.transaction.xa.XAResource;
-import javax.transaction.xa.Xid;
-
+import EDU.oswego.cs.dl.util.concurrent.ConcurrentHashMap;
 import org.jboss.jms.delegate.ConnectionDelegate;
 import org.jboss.jms.delegate.DeliveryInfo;
 import org.jboss.jms.delegate.SessionDelegate;
@@ -44,7 +32,13 @@
 import org.jboss.jms.tx.ClientTransaction.SessionTxState;
 import org.jboss.logging.Logger;
 
-import EDU.oswego.cs.dl.util.concurrent.ConcurrentHashMap;
+import javax.jms.IllegalStateException;
+import javax.jms.JMSException;
+import javax.jms.JMSSecurityException;
+import javax.transaction.xa.XAException;
+import javax.transaction.xa.XAResource;
+import javax.transaction.xa.Xid;
+import java.util.*;
 
 /**
  * The ResourceManager manages work done in both local and global (XA) transactions.
@@ -70,8 +64,10 @@
    private boolean trace = log.isTraceEnabled();
    
    private ConcurrentHashMap transactions = new ConcurrentHashMap();
+
+   private ConcurrentHashMap convertedIds = new ConcurrentHashMap();
    
-   private int serverID;   
+   private int serverID;
    
    // Static ---------------------------------------------------------------------------------------
    
@@ -537,8 +533,36 @@
       
       return xid;
    }
- 
+
    
+   
+
+   /**
+    * Folds the state held under a local tx into the XA tx registered for the given xid; used by
+    * MessagingXAResource for TMJOIN and TMRESUME while the session's current tx is a LocalTx.
+    */
+   Xid convertOnJoinTx(LocalTx localTx, Xid xid) throws XAException
+   {
+      if (trace) { log.trace("converting " + localTx + " to " + xid); }
+
+      //Sanity check - the tx being joined has to be known; XAER_NOTA is the code for an unknown xid
+      ClientTransaction newTx = getTxInternal(xid);
+      if (newTx == null)
+      {
+         throw new MessagingXAException(XAException.XAER_NOTA, "Transaction doesnt exist:" + xid);
+      }
+
+      //Remove the local tx
+      ClientTransaction local = removeTxInternal(localTx);
+      if (local == null)
+      {
+         throw new MessagingXAException(XAException.XAER_NOTA, "Cannot find transaction with xid:" + localTx);
+      }
+      //carry the local tx's state over to the XA tx; the caller sets the returned xid as current
+      newTx.mergeIn(local);
+      return xid;
+   }
+   
    Xid[] recover(int flags, ConnectionDelegate conn) throws XAException
    {
       if (trace) { log.trace("calling recover with flags: " + flags); }

Modified: branches/Branch_Stable/tests/src/org/jboss/test/messaging/jms/XATest.java
===================================================================
--- branches/Branch_Stable/tests/src/org/jboss/test/messaging/jms/XATest.java	2008-01-17 15:14:58 UTC (rev 3585)
+++ branches/Branch_Stable/tests/src/org/jboss/test/messaging/jms/XATest.java	2008-01-17 16:32:40 UTC (rev 3586)
@@ -143,8 +143,56 @@
    }
 
    // Public --------------------------------------------------------
+   //http://jira.jboss.com/jira/browse/JBMESSAGING-1221 - start, start again with TMJOIN, then one phase commit; checks rm.size() after each step
+   public void testMemoryLeakForLocalTXsOnJoinOnePhase() throws Exception
+   {
+      XAConnection conn = null;
 
-   public void testMemoryLeakForLocalTXs() throws Exception
+      Transaction tx1 = null; //NOTE(review): never read in this method
+      try
+      {
+
+         conn = cf.createXAConnection();
+
+         JBossConnection jbConn = (JBossConnection)conn;
+
+         ClientConnectionDelegate del = (ClientConnectionDelegate)jbConn.getDelegate();
+
+         ConnectionState state = (ConnectionState)del.getState();
+
+         ResourceManager rm = state.getResourceManager();
+
+         //Create a session and drive its XAResource by hand with a fixed xid
+         XASession sess1 = conn.createXASession();
+         MessagingXAResource res1 = (MessagingXAResource) sess1.getXAResource();
+         byte[] branchQualifier = new byte[] { 1, 2, 3, 4, 5, 6, 0, 0, 0, 0 };
+         byte[] globalTxId = new byte[] { 6, 5, 4, 3, 2, 1, 0, 0, 0, 0 };
+         int rmSizeBeforeStart = rm.size(); //baseline that every assert below compares against
+         Xid xid = new MessagingXid(branchQualifier, 12435, globalTxId);
+         res1.start(xid, XAResource.TMNOFLAGS);
+         res1.end(xid, XAResource.TMSUCCESS);
+         int rmAfter = rm.size();
+         assertTrue(rmSizeBeforeStart + 1 == rmAfter); //expects exactly one new entry after the first start/end
+         res1.start(xid, XAResource.TMJOIN);
+         res1.end(xid, XAResource.TMSUCCESS);
+         rmAfter = rm.size();
+         assertTrue(rmSizeBeforeStart + 1 == rmAfter); //expects the join not to add a second entry
+         res1.commit(xid, true); //second argument true = one phase commit
+         rmAfter = rm.size();
+         assertTrue(rmSizeBeforeStart == rmAfter); //expects the entry to be gone once committed
+      }
+      finally
+      {
+         if (conn != null)
+         {
+            conn.close();
+         }
+
+      }
+
+   }
+   //http://jira.jboss.com/jira/browse/JBMESSAGING-1221
+   public void testMemoryLeakForLocalTXsOnJoinTwoPhase() throws Exception
    {
       XAConnection conn = null;
 
@@ -162,26 +210,122 @@
 
          ResourceManager rm = state.getResourceManager();
 
-         XASession xaSession = conn.createXASession();
+         //Create a session
+         XASession sess1 = conn.createXASession();
+         MessagingXAResource res1 = (MessagingXAResource) sess1.getXAResource();
+         byte[] branchQualifier = new byte[] { 1, 2, 3, 4, 5, 6, 0, 0, 0, 0 };
+         byte[] globalTxId = new byte[] { 6, 5, 4, 3, 2, 1, 0, 0, 0, 0 };
+         int rmSizeBeforeStart = rm.size();
+         Xid xid = new MessagingXid(branchQualifier, 12435, globalTxId);
+         res1.start(xid, XAResource.TMNOFLAGS);
+         res1.end(xid, XAResource.TMSUCCESS);
+         int rmAfter = rm.size();
+         assertTrue(rmSizeBeforeStart + 1 == rmAfter);
+         res1.start(xid, XAResource.TMJOIN);
+         res1.end(xid, XAResource.TMSUCCESS);
+         rmAfter = rm.size();
+         assertTrue(rmSizeBeforeStart + 1 == rmAfter);
+         res1.prepare(xid);
+         res1.commit(xid, false);
+         rmAfter = rm.size();
+         assertTrue(rmSizeBeforeStart == rmAfter);
+      }
+      finally
+      {
+         if (conn != null)
+         {
+            conn.close();
+         }
 
+      }
+
+   }
+   //http://jira.jboss.com/jira/browse/JBMESSAGING-1221 - start, start again with TMRESUME, then one phase commit; checks rm.size() after each step
+   public void testMemoryLeakForLocalTXsOnResumeOnePhase() throws Exception
+   {
+      XAConnection conn = null;
+
+      Transaction tx1 = null; //NOTE(review): never read in this method
+      try
+      {
+
+         conn = cf.createXAConnection();
+
+         JBossConnection jbConn = (JBossConnection)conn;
+
+         ClientConnectionDelegate del = (ClientConnectionDelegate)jbConn.getDelegate();
+
+         ConnectionState state = (ConnectionState)del.getState();
+
+         ResourceManager rm = state.getResourceManager();
+
          //Create a session
          XASession sess1 = conn.createXASession();
-         XAResource res1 = sess1.getXAResource();
+         MessagingXAResource res1 = (MessagingXAResource) sess1.getXAResource();
+         byte[] branchQualifier = new byte[] { 1, 2, 3, 4, 5, 6, 0, 0, 0, 0 };
+         byte[] globalTxId = new byte[] { 6, 5, 4, 3, 2, 1, 0, 0, 0, 0 };
+         int rmSizeBeforeStart = rm.size(); //baseline that every assert below compares against
+         Xid xid = new MessagingXid(branchQualifier, 12435, globalTxId);
+         res1.start(xid, XAResource.TMNOFLAGS);
+         res1.end(xid, XAResource.TMSUCCESS);
+         int rmAfter = rm.size();
+         assertTrue(rmSizeBeforeStart + 1 == rmAfter); //expects exactly one new entry after the first start/end
+         res1.start(xid, XAResource.TMRESUME);
+         res1.end(xid, XAResource.TMSUCCESS);
+         rmAfter = rm.size();
+         assertTrue(rmSizeBeforeStart + 1 == rmAfter); //expects the resume not to add a second entry
+         res1.commit(xid, true); //second argument true = one phase commit
+         rmAfter = rm.size();
+         assertTrue(rmSizeBeforeStart == rmAfter); //expects the entry to be gone once committed
+      }
+      finally
+      {
+         if (conn != null)
+         {
+            conn.close();
+         }
 
+      }
 
-         tm.begin();
+   }
+   //http://jira.jboss.com/jira/browse/JBMESSAGING-1221
+   public void testMemoryLeakForLocalTXsOnJResumeTwoPhase() throws Exception
+   {
+      XAConnection conn = null;
 
-         tx1 = tm.getTransaction();
+      Transaction tx1 = null;
+      try
+      {
 
-         tx1.enlistResource(res1);
-         int sizeBefore = rm.size();
-         tx1.delistResource(res1, XAResource.TMSUCCESS);
+         conn = cf.createXAConnection();
 
-         tx1.enlistResource(res1);
-         int sizeAfter = rm.size();
-         assertTrue(sizeBefore == sizeAfter);
-         tx1.commit();
+         JBossConnection jbConn = (JBossConnection)conn;
 
+         ClientConnectionDelegate del = (ClientConnectionDelegate)jbConn.getDelegate();
+
+         ConnectionState state = (ConnectionState)del.getState();
+
+         ResourceManager rm = state.getResourceManager();
+
+         //Create a session
+         XASession sess1 = conn.createXASession();
+         MessagingXAResource res1 = (MessagingXAResource) sess1.getXAResource();
+         byte[] branchQualifier = new byte[] { 1, 2, 3, 4, 5, 6, 0, 0, 0, 0 };
+         byte[] globalTxId = new byte[] { 6, 5, 4, 3, 2, 1, 0, 0, 0, 0 };
+         int rmSizeBeforeStart = rm.size();
+         Xid xid = new MessagingXid(branchQualifier, 12435, globalTxId);
+         res1.start(xid, XAResource.TMNOFLAGS);
+         res1.end(xid, XAResource.TMSUCCESS);
+         int rmAfter = rm.size();
+         assertTrue(rmSizeBeforeStart + 1 == rmAfter);
+         res1.start(xid, XAResource.TMRESUME);
+         res1.end(xid, XAResource.TMSUCCESS);
+         rmAfter = rm.size();
+         assertTrue(rmSizeBeforeStart + 1 == rmAfter);
+         res1.prepare(xid);
+         res1.commit(xid, false);
+         rmAfter = rm.size();
+         assertTrue(rmSizeBeforeStart == rmAfter);
       }
       finally
       {
@@ -193,8 +337,6 @@
       }
 
    }
-
-   
    /* If there is no global tx present the send must behave as non transacted.
     * See http://www.jboss.com/index.html?module=bb&op=viewtopic&t=98577&postdays=0&postorder=asc&start=0
     * http://jira.jboss.com/jira/browse/JBMESSAGING-410




More information about the jboss-cvs-commits mailing list