[jboss-cvs] JBoss Messaging SVN: r3600 - in branches/Branch_Stable: tests/src/org/jboss/test/messaging/jms and 1 other directory.

jboss-cvs-commits at lists.jboss.org jboss-cvs-commits at lists.jboss.org
Mon Jan 21 10:51:42 EST 2008


Author: ataylor
Date: 2008-01-21 10:51:41 -0500 (Mon, 21 Jan 2008)
New Revision: 3600

Modified:
   branches/Branch_Stable/src/main/org/jboss/jms/tx/ClientTransaction.java
   branches/Branch_Stable/src/main/org/jboss/jms/tx/MessagingXAResource.java
   branches/Branch_Stable/src/main/org/jboss/jms/tx/ResourceManager.java
   branches/Branch_Stable/tests/src/org/jboss/test/messaging/jms/XATest.java
Log:
http://jira.jboss.com/jira/browse/JBMESSAGING-1221 - fix for memry leak when pooled connections from JCA are used, we now merge the transactions to make sure all the acks are caught

Modified: branches/Branch_Stable/src/main/org/jboss/jms/tx/ClientTransaction.java
===================================================================
--- branches/Branch_Stable/src/main/org/jboss/jms/tx/ClientTransaction.java	2008-01-21 11:28:38 UTC (rev 3599)
+++ branches/Branch_Stable/src/main/org/jboss/jms/tx/ClientTransaction.java	2008-01-21 15:51:41 UTC (rev 3600)
@@ -34,8 +34,9 @@
 
 /**
  * Holds the state of a transaction on the client side
- * 
+ *
  * @author <a href="mailto:tim.fox at jboss.com>Tim Fox </a>
+ * @author <a href="mailto:ataylor at redhat.com">Andy Taylor</a>
  */
 public class ClientTransaction
 {
@@ -62,7 +63,7 @@
    private List sessionStatesList;
 
    private boolean clientSide;
-    
+
    // Static --------------------------------------------------------
 
    // Constructors --------------------------------------------------
@@ -89,7 +90,7 @@
 
       sessionTxState.addMessage(msg);
    }
-   
+
    public void addAck(String sessionId, DeliveryInfo info)
    {
       if (!clientSide)
@@ -98,9 +99,9 @@
       }
       SessionTxState sessionTxState = getSessionTxState(sessionId);
 
-      sessionTxState.addAck(info);      
+      sessionTxState.addAck(info);
    }
-   
+
    public void clearMessages()
    {
       if (!clientSide)
@@ -112,9 +113,9 @@
       {
          // This can be null if the tx was recreated on the client side due to recovery
 
-         for(Iterator i = sessionStatesMap.values().iterator(); i.hasNext(); )
+         for (Iterator i = sessionStatesMap.values().iterator(); i.hasNext();)
          {
-            SessionTxState sessionTxState = (SessionTxState)i.next();
+            SessionTxState sessionTxState = (SessionTxState) i.next();
             sessionTxState.clearMessages();
          }
       }
@@ -138,7 +139,7 @@
       else
       {
          return sessionStatesMap == null ?
-            Collections.EMPTY_LIST : new ArrayList(sessionStatesMap.values());
+                 Collections.EMPTY_LIST : new ArrayList(sessionStatesMap.values());
       }
    }
 
@@ -151,7 +152,7 @@
       {
          throw new IllegalStateException("Cannot call this method on the server side");
       }
-      
+
       // Note we have to do this in one go since there may be overlap between old and new session
       // IDs and we don't want to overwrite keys in the map.
 
@@ -159,19 +160,19 @@
 
       if (sessionStatesMap != null)
       {
-         for(Iterator i = sessionStatesMap.values().iterator(); i.hasNext();)
+         for (Iterator i = sessionStatesMap.values().iterator(); i.hasNext();)
          {
-            SessionTxState state = (SessionTxState)i.next();
-            
+            SessionTxState state = (SessionTxState) i.next();
+
             boolean handled = state.handleFailover(newServerID, oldSessionID, newSessionID);
 
             if (handled)
             {
-	            if (tmpMap == null)
-	            {
-	               tmpMap = new LinkedHashMap();
-	            }
-	            tmpMap.put(newSessionID, state);
+               if (tmpMap == null)
+               {
+                  tmpMap = new LinkedHashMap();
+               }
+               tmpMap.put(newSessionID, state);
             }
          }
       }
@@ -187,7 +188,7 @@
     * May return an empty list, but never null.
     */
    public List getDeliveriesForSession(String sessionID)
-   {   	   	
+   {
       if (!clientSide)
       {
          throw new IllegalStateException("Cannot call this method on the server side");
@@ -198,24 +199,24 @@
          return Collections.EMPTY_LIST;
       }
       else
-      {         
-         SessionTxState state = (SessionTxState)sessionStatesMap.get(sessionID);
-   
+      {
+         SessionTxState state = (SessionTxState) sessionStatesMap.get(sessionID);
+
          if (state != null)
          {
             List list = state.getAcks();
-            
+
             return list;
          }
          else
          {
             return Collections.EMPTY_LIST;
          }
-      }            
+      }
    }
 
    // Streamable implementation ---------------------------------
-   
+
    public void write(DataOutputStream out) throws Exception
    {
       out.writeByte(state);
@@ -232,7 +233,7 @@
 
          while (iter.hasNext())
          {
-            SessionTxState state = (SessionTxState)iter.next();
+            SessionTxState state = (SessionTxState) iter.next();
 
             out.writeUTF(state.getSessionId());
 
@@ -244,10 +245,10 @@
 
             while (iter2.hasNext())
             {
-               JBossMessage m = (JBossMessage)iter2.next();
+               JBossMessage m = (JBossMessage) iter2.next();
 
                out.writeByte(m.getType());
-             
+
                m.write(out);
             }
 
@@ -257,16 +258,16 @@
 
             while (iter2.hasNext())
             {
-               DeliveryInfo ack = (DeliveryInfo)iter2.next();
-               
+               DeliveryInfo ack = (DeliveryInfo) iter2.next();
+
                //We don't want to send acks for things like non durable subs which will have been already acked
                if (ack.isShouldAck())
                {
-               	//We only need the delivery id written
-               	out.writeLong(ack.getMessageProxy().getDeliveryId());
+                  //We only need the delivery id written
+                  out.writeLong(ack.getMessageProxy().getDeliveryId());
                }
             }
-            
+
             //Marker for end of acks
             out.writeLong(Long.MIN_VALUE);
          }
@@ -281,7 +282,7 @@
       state = in.readByte();
 
       int numSessions = in.readInt();
-      
+
       //Read in as a list since we don't want the extra overhead of putting into a map
       //which won't be used on the server side
       sessionStatesList = new ArrayList(numSessions);
@@ -300,7 +301,7 @@
          {
             byte type = in.readByte();
 
-            JBossMessage msg = (JBossMessage)MessageFactory.createMessage(type);
+            JBossMessage msg = (JBossMessage) MessageFactory.createMessage(type);
 
             msg.read(in);
 
@@ -308,10 +309,10 @@
          }
 
          long l;
-         
+
          while ((l = in.readLong()) != Long.MIN_VALUE)
          {
-         	sessionState.addAck(new DefaultAck(l));
+            sessionState.addAck(new DefaultAck(l));
          }
       }
    }
@@ -329,12 +330,12 @@
          sessionStatesMap = new LinkedHashMap();
       }
 
-      SessionTxState sessionTxState = (SessionTxState)sessionStatesMap.get(sessionID);
+      SessionTxState sessionTxState = (SessionTxState) sessionStatesMap.get(sessionID);
 
       if (sessionTxState == null)
       {
          sessionTxState = new SessionTxState(sessionID);
-         
+
          sessionStatesMap.put(sessionID, sessionTxState);
       }
 
@@ -343,6 +344,7 @@
 
    /**
     * merges in the state of another transaction
+    *
     * @param toMerge the clientTransaction to merge
     */
    public void mergeIn(ClientTransaction toMerge)
@@ -350,46 +352,35 @@
       state = toMerge.state;
 
       //if there is anything to merge in merge them
-      if(toMerge.sessionStatesMap != null)
+      if (toMerge.sessionStatesMap != null)
       {
+         if(sessionStatesMap == null)
+         {
+            sessionStatesMap = new LinkedHashMap();
+         }
          Iterator it = toMerge.sessionStatesMap.keySet().iterator();
-         while(it.hasNext())
+         while (it.hasNext())
          {
             Object key = it.next();
             SessionTxState ss = (SessionTxState) toMerge.sessionStatesMap.get(key);
             //if the sessionstate doesnt exist add it
-            if(sessionStatesMap.get(key) != null)
+            if (sessionStatesMap.get(key) == null)
             {
                sessionStatesMap.put(key, ss);
             }
-            //otherwise it already exist so we need to merge in the messages and acks
+            //otherwise it already exist so we need to merge in the acks
             else
             {
-               //we have to merge in the messages as well
                SessionTxState orig = (SessionTxState) sessionStatesMap.get(key);
-               List messagesToMerge = ss.getMsgs();
-               if(messagesToMerge != null)
-               {
-                  Iterator msgsIt = messagesToMerge.iterator();
-                  while(msgsIt.hasNext())
-                  {
-                     //if the msg isnt there add it
-                     JBossMessage msg = (JBossMessage) msgsIt.next();
-                     if(!orig.getMsgs().contains(msg))
-                     {
-                        orig.getMsgs().add(msg);
-                     }
-                  }
-               }
                List acksToMerge = ss.getAcks();
-               if(acksToMerge != null)
+               if (acksToMerge != null)
                {
                   Iterator acksIt = acksToMerge.iterator();
-                  while(acksIt.hasNext())
+                  while (acksIt.hasNext())
                   {
                      //if the ack isnt there add it
                      Ack ack = (Ack) acksIt.next();
-                     if(!orig.getAcks().contains(ack))
+                     if (!orig.getAcks().contains(ack))
                      {
                         orig.getAcks().add(ack);
                      }
@@ -399,7 +390,7 @@
 
          }
          sessionStatesList = sessionStatesMap == null ?
-            Collections.EMPTY_LIST : new ArrayList(sessionStatesMap.values());
+                 Collections.EMPTY_LIST : new ArrayList(sessionStatesMap.values());
       }
 
    }
@@ -447,10 +438,10 @@
       {
          return sessionID;
       }
-      
+
       public void setAcks(List acks)
       {
-      	this.acks = acks;
+         this.acks = acks;
       }
 
       boolean handleFailover(int newServerID, String oldSessionID, String newSessionID)
@@ -461,13 +452,16 @@
             serverID = newServerID;
 
             // Remove any non persistent acks
-            for(Iterator i = acks.iterator(); i.hasNext(); )
+            for (Iterator i = acks.iterator(); i.hasNext();)
             {
-               DeliveryInfo di = (DeliveryInfo)i.next();
+               DeliveryInfo di = (DeliveryInfo) i.next();
 
                if (!di.getMessageProxy().getMessage().isReliable())
                {
-                  if (trace) { log.trace(this + " discarded non-persistent " + di + " on failover"); }
+                  if (trace)
+                  {
+                     log.trace(this + " discarded non-persistent " + di + " on failover");
+                  }
                   i.remove();
                }
             }
@@ -475,7 +469,7 @@
          }
          else
          {
-         	return false;
+            return false;
          }
       }
 

Modified: branches/Branch_Stable/src/main/org/jboss/jms/tx/MessagingXAResource.java
===================================================================
--- branches/Branch_Stable/src/main/org/jboss/jms/tx/MessagingXAResource.java	2008-01-21 11:28:38 UTC (rev 3599)
+++ branches/Branch_Stable/src/main/org/jboss/jms/tx/MessagingXAResource.java	2008-01-21 15:51:41 UTC (rev 3600)
@@ -45,6 +45,7 @@
  * 
  * @author Hiram Chirino (Cojonudo14 at hotmail.com)
  * @author <a href="mailto:adrian at jboss.org">Adrian Brock</a>
+ * @author <a href="mailto:ataylor at redhat.com">Andy Taylor</a>
  * 
  * @version <tt>$Revision$</tt>
  *

Modified: branches/Branch_Stable/src/main/org/jboss/jms/tx/ResourceManager.java
===================================================================
--- branches/Branch_Stable/src/main/org/jboss/jms/tx/ResourceManager.java	2008-01-21 11:28:38 UTC (rev 3599)
+++ branches/Branch_Stable/src/main/org/jboss/jms/tx/ResourceManager.java	2008-01-21 15:51:41 UTC (rev 3600)
@@ -51,6 +51,7 @@
  * @author <a href="mailto:juha at jboss.org">Juha Lindfors</a>
  * @author <a href="mailto:Cojonudo14 at hotmail.com">Hiram Chirino</a>
  * @author <a href="mailto:adrian at jboss.org">Adrian Brock</a>
+ * @author <a href="mailto:ataylor at redhat.com">Andy Taylor</a>
  * @version $Revision$
  *
  * $Id$

Modified: branches/Branch_Stable/tests/src/org/jboss/test/messaging/jms/XATest.java
===================================================================
--- branches/Branch_Stable/tests/src/org/jboss/test/messaging/jms/XATest.java	2008-01-21 11:28:38 UTC (rev 3599)
+++ branches/Branch_Stable/tests/src/org/jboss/test/messaging/jms/XATest.java	2008-01-21 15:51:41 UTC (rev 3600)
@@ -52,6 +52,7 @@
  *
  * @author <a href="mailto:tim.fox at jboss.com">Tim Fox</a>
  * @author <a href="mailto:clebert.suconic at jboss.com">Clebert Suconic</a>
+ * @author <a href="mailto:ataylor at redhat.com">Andy Taylor</a>
  * @version <tt>$Revision: 1.1 $</tt>
  *
  * $Id$
@@ -143,7 +144,107 @@
    }
 
    // Public --------------------------------------------------------
+   /*
+   resource is not enlisted in tx
+
+   do some work with tx W1
+
+   enlist resource in tx
+
+   do some more work W2
+
+   delist resource from tx
+
+   prepare tx
+
+   commit tx
+
+   validate that both sets of work W1 and W2 get applied.
+    */
    //http://jira.jboss.com/jira/browse/JBMESSAGING-1221
+   public void testMemoryLeakForLocalTXsWithWork() throws Exception
+   {
+      XAConnection conn = null;
+
+      Transaction tx1 = null;
+      try
+      {
+
+         conn = cf.createXAConnection();
+
+         JBossConnection jbConn = (JBossConnection)conn;
+
+         ClientConnectionDelegate del = (ClientConnectionDelegate)jbConn.getDelegate();
+
+         ConnectionState state = (ConnectionState)del.getState();
+
+         ResourceManager rm = state.getResourceManager();
+
+         //Create a session
+         JBossSession sess1 = (JBossSession) conn.createXASession();
+         DummyListener listener = new DummyListener();
+         sess1.setMessageListener(listener);
+         conn.start();
+         MessagingXAResource res1 = (MessagingXAResource) sess1.getXAResource();
+         tm.begin();
+
+         Transaction trans = tm.getTransaction();
+         trans.enlistResource(res1);
+         trans.delistResource(res1, XAResource.TMSUCCESS);
+         MessageProducer p = sess1.createProducer(queue1);
+         MessageConsumer cons = sess1.createConsumer(queue1);
+         conn.start();
+         //send 10 messages
+         for(int i = 0; i < 10; i++)
+         {
+            TextMessage message = sess1.createTextMessage("delistedwork" + i);
+            p.send(message);
+         }
+         //now receive 5
+         for(int i = 0; i < 5; i++)
+         {
+            TextMessage textMessage = (TextMessage) cons.receive();
+            assertEquals("delistedwork" + i, textMessage.getText());
+         }
+         //once we enlist ensure that the 5 acks are merged ok, the first timne we do this there is nothing to merge in the global tx
+         //so all acks are just copied
+         trans.enlistResource(res1);
+         SessionState sstate = (SessionState)((DelegateSupport)sess1.getDelegate()).getState();
+         ClientTransaction clientTransaction = rm.getTx(sstate.getCurrentTxId());
+         assertEquals("to many session states", clientTransaction.getSessionStates().size(), 1 );
+         ClientTransaction.SessionTxState sessionTxState = (ClientTransaction.SessionTxState) clientTransaction.getSessionStates().get(0);
+         assertEquals("wrong number of acks",5, sessionTxState.getAcks().size());
+
+         trans.delistResource(res1, XAResource.TMSUCCESS);
+         for(int i = 5; i < 10; i++)
+         {
+            TextMessage textMessage = (TextMessage) cons.receive();
+            assertEquals("delistedwork" + i, textMessage.getText());
+         }
+         //now reenlist and make sure that there are now 10 acks, this time around a merge will be done with the first 5 acks
+         //
+         trans.enlistResource(res1);
+
+         clientTransaction = rm.getTx(sstate.getCurrentTxId());
+         assertEquals("to many session states", clientTransaction.getSessionStates().size(), 1 );
+         sessionTxState = (ClientTransaction.SessionTxState) clientTransaction.getSessionStates().get(0);
+         assertEquals("wrong number of acks",10, sessionTxState.getAcks().size());
+
+         tm.commit();
+
+      }
+      finally
+      {
+         if (conn != null)
+         {
+            conn.close();
+         }
+
+      }
+
+   }
+
+   //http://jira.jboss.com/jira/browse/JBMESSAGING-1221
    public void testMemoryLeakForLocalTXsOnJoinOnePhase() throws Exception
    {
       XAConnection conn = null;




More information about the jboss-cvs-commits mailing list