[jboss-cvs] JBoss Messaging SVN: r3586 - in branches/Branch_Stable: tests/src/org/jboss/test/messaging/jms and 1 other directory.
jboss-cvs-commits at lists.jboss.org
jboss-cvs-commits at lists.jboss.org
Thu Jan 17 11:32:40 EST 2008
Author: ataylor
Date: 2008-01-17 11:32:40 -0500 (Thu, 17 Jan 2008)
New Revision: 3586
Modified:
branches/Branch_Stable/src/main/org/jboss/jms/tx/ClientTransaction.java
branches/Branch_Stable/src/main/org/jboss/jms/tx/MessagingXAResource.java
branches/Branch_Stable/src/main/org/jboss/jms/tx/ResourceManager.java
branches/Branch_Stable/tests/src/org/jboss/test/messaging/jms/XATest.java
Log:
http://jira.jboss.com/jira/browse/JBMESSAGING-1221 - fix for memry leak when pooled connections from JCA are used
Modified: branches/Branch_Stable/src/main/org/jboss/jms/tx/ClientTransaction.java
===================================================================
--- branches/Branch_Stable/src/main/org/jboss/jms/tx/ClientTransaction.java 2008-01-17 15:14:58 UTC (rev 3585)
+++ branches/Branch_Stable/src/main/org/jboss/jms/tx/ClientTransaction.java 2008-01-17 16:32:40 UTC (rev 3586)
@@ -21,15 +21,6 @@
*/
package org.jboss.jms.tx;
-import java.io.DataInputStream;
-import java.io.DataOutputStream;
-import java.util.ArrayList;
-import java.util.Collections;
-import java.util.Iterator;
-import java.util.LinkedHashMap;
-import java.util.List;
-import java.util.Map;
-
import org.jboss.jms.delegate.Ack;
import org.jboss.jms.delegate.DefaultAck;
import org.jboss.jms.delegate.DeliveryInfo;
@@ -37,6 +28,10 @@
import org.jboss.logging.Logger;
import org.jboss.messaging.core.impl.message.MessageFactory;
+import java.io.DataInputStream;
+import java.io.DataOutputStream;
+import java.util.*;
+
/**
* Holds the state of a transaction on the client side
*
@@ -375,6 +370,71 @@
return sessionTxState;
}
+ /**
+ * merges in the state of another transaction
+ * @param toMerge the clientTransaction to merge
+ */
+ public void mergeIn(ClientTransaction toMerge)
+ {
+ state = toMerge.state;
+ hasPersistentAcks = toMerge.hasPersistentAcks;
+ removeAcks = toMerge.removeAcks;
+ failedOver = toMerge.failedOver;
+ //if there is anything to merge in merge them
+ if(toMerge.sessionStatesMap != null)
+ {
+ Iterator it = toMerge.sessionStatesMap.keySet().iterator();
+ while(it.hasNext())
+ {
+ Object key = it.next();
+ SessionTxState ss = (SessionTxState) toMerge.sessionStatesMap.get(key);
+ //if the sessionstate doesnt exist add it
+ if(sessionStatesMap.get(key) != null)
+ {
+ sessionStatesMap.put(key, ss);
+ }
+ //otherwise it already exist so we need to merge in the messages and acks
+ else
+ {
+ //we have to merge in the messages as well
+ SessionTxState orig = (SessionTxState) sessionStatesMap.get(key);
+ List messagesToMerge = ss.getMsgs();
+ if(messagesToMerge != null)
+ {
+ Iterator msgsIt = messagesToMerge.iterator();
+ while(msgsIt.hasNext())
+ {
+ //if the msg isnt there add it
+ JBossMessage msg = (JBossMessage) msgsIt.next();
+ if(!orig.getMsgs().contains(msg))
+ {
+ orig.getMsgs().add(msg);
+ }
+ }
+ }
+ List acksToMerge = ss.getAcks();
+ if(acksToMerge != null)
+ {
+ Iterator acksIt = acksToMerge.iterator();
+ while(acksIt.hasNext())
+ {
+ //if the ack isnt there add it
+ Ack ack = (Ack) acksIt.next();
+ if(!orig.getAcks().contains(ack))
+ {
+ orig.getAcks().add(ack);
+ }
+ }
+ }
+ }
+
+ }
+ sessionStatesList = sessionStatesMap == null ?
+ Collections.EMPTY_LIST : new ArrayList(sessionStatesMap.values());
+ }
+
+ }
+
// Inner Classes -------------------------------------------------
public class SessionTxState
Modified: branches/Branch_Stable/src/main/org/jboss/jms/tx/MessagingXAResource.java
===================================================================
--- branches/Branch_Stable/src/main/org/jboss/jms/tx/MessagingXAResource.java 2008-01-17 15:14:58 UTC (rev 3585)
+++ branches/Branch_Stable/src/main/org/jboss/jms/tx/MessagingXAResource.java 2008-01-17 16:32:40 UTC (rev 3586)
@@ -136,7 +136,7 @@
throw new MessagingXAException(XAException.XAER_RMFAIL, "Current xid is not set");
}
- if (flags == TMNOFLAGS && sessionState.getCurrentTxId() instanceof LocalTx)
+ if (sessionState.getCurrentTxId() instanceof LocalTx)
{
convertTx = true;
@@ -166,18 +166,25 @@
}
break;
case TMJOIN :
- if(!xid.equals(currentXid))
+ if(convertTx)
{
- rm.removeTx(currentXid);
+ setCurrentTransactionId(rm.convertOnJoinTx((LocalTx)sessionState.getCurrentTxId(), xid));
}
- setCurrentTransactionId(rm.joinTx(xid));
+ else
+ {
+ setCurrentTransactionId(rm.joinTx(xid));
+ }
break;
case TMRESUME :
- if(!xid.equals(currentXid))
+ if(convertTx)
{
- rm.removeTx(currentXid);
+ setCurrentTransactionId(rm.convertOnJoinTx((LocalTx)sessionState.getCurrentTxId(), xid));
}
- setCurrentTransactionId(rm.resumeTx(xid));
+ else
+ {
+ setCurrentTransactionId(rm.resumeTx(xid));
+ }
+
break;
default:
throw new MessagingXAException(XAException.XAER_PROTO, "Invalid flags: " + flags);
Modified: branches/Branch_Stable/src/main/org/jboss/jms/tx/ResourceManager.java
===================================================================
--- branches/Branch_Stable/src/main/org/jboss/jms/tx/ResourceManager.java 2008-01-17 15:14:58 UTC (rev 3585)
+++ branches/Branch_Stable/src/main/org/jboss/jms/tx/ResourceManager.java 2008-01-17 16:32:40 UTC (rev 3586)
@@ -21,19 +21,7 @@
*/
package org.jboss.jms.tx;
-import java.util.ArrayList;
-import java.util.Collections;
-import java.util.Iterator;
-import java.util.List;
-import java.util.Map;
-
-import javax.jms.IllegalStateException;
-import javax.jms.JMSException;
-import javax.jms.JMSSecurityException;
-import javax.transaction.xa.XAException;
-import javax.transaction.xa.XAResource;
-import javax.transaction.xa.Xid;
-
+import EDU.oswego.cs.dl.util.concurrent.ConcurrentHashMap;
import org.jboss.jms.delegate.ConnectionDelegate;
import org.jboss.jms.delegate.DeliveryInfo;
import org.jboss.jms.delegate.SessionDelegate;
@@ -44,7 +32,13 @@
import org.jboss.jms.tx.ClientTransaction.SessionTxState;
import org.jboss.logging.Logger;
-import EDU.oswego.cs.dl.util.concurrent.ConcurrentHashMap;
+import javax.jms.IllegalStateException;
+import javax.jms.JMSException;
+import javax.jms.JMSSecurityException;
+import javax.transaction.xa.XAException;
+import javax.transaction.xa.XAResource;
+import javax.transaction.xa.Xid;
+import java.util.*;
/**
* The ResourceManager manages work done in both local and global (XA) transactions.
@@ -70,8 +64,10 @@
private boolean trace = log.isTraceEnabled();
private ConcurrentHashMap transactions = new ConcurrentHashMap();
+
+ private ConcurrentHashMap convertedIds = new ConcurrentHashMap();
- private int serverID;
+ private int serverID;
// Static ---------------------------------------------------------------------------------------
@@ -537,8 +533,36 @@
return xid;
}
-
+
+
+
+ Xid convertOnJoinTx(LocalTx localTx, Xid xid) throws XAException
+ {
+ if (trace) { log.trace("converting " + localTx + " to " + xid); }
+
+ //Sanity check
+
+ ClientTransaction newTx = getTxInternal(xid);
+
+ if (newTx == null)
+ {
+ throw new MessagingXAException(XAException.XAER_DUPID, "Transaction doesnt exist:" + xid);
+ }
+
+ //Remove the local tx
+
+ ClientTransaction local = removeTxInternal(localTx);
+
+ if (local == null)
+ {
+ throw new MessagingXAException(XAException.XAER_NOTA, "Cannot find transaction with xid:" + localTx);
+ }
+
+ newTx.mergeIn(local);
+ return xid;
+ }
+
Xid[] recover(int flags, ConnectionDelegate conn) throws XAException
{
if (trace) { log.trace("calling recover with flags: " + flags); }
Modified: branches/Branch_Stable/tests/src/org/jboss/test/messaging/jms/XATest.java
===================================================================
--- branches/Branch_Stable/tests/src/org/jboss/test/messaging/jms/XATest.java 2008-01-17 15:14:58 UTC (rev 3585)
+++ branches/Branch_Stable/tests/src/org/jboss/test/messaging/jms/XATest.java 2008-01-17 16:32:40 UTC (rev 3586)
@@ -143,8 +143,56 @@
}
// Public --------------------------------------------------------
+ //http://jira.jboss.com/jira/browse/JBMESSAGING-1221
+ public void testMemoryLeakForLocalTXsOnJoinOnePhase() throws Exception
+ {
+ XAConnection conn = null;
- public void testMemoryLeakForLocalTXs() throws Exception
+ Transaction tx1 = null;
+ try
+ {
+
+ conn = cf.createXAConnection();
+
+ JBossConnection jbConn = (JBossConnection)conn;
+
+ ClientConnectionDelegate del = (ClientConnectionDelegate)jbConn.getDelegate();
+
+ ConnectionState state = (ConnectionState)del.getState();
+
+ ResourceManager rm = state.getResourceManager();
+
+ //Create a session
+ XASession sess1 = conn.createXASession();
+ MessagingXAResource res1 = (MessagingXAResource) sess1.getXAResource();
+ byte[] branchQualifier = new byte[] { 1, 2, 3, 4, 5, 6, 0, 0, 0, 0 };
+ byte[] globalTxId = new byte[] { 6, 5, 4, 3, 2, 1, 0, 0, 0, 0 };
+ int rmSizeBeforeStart = rm.size();
+ Xid xid = new MessagingXid(branchQualifier, 12435, globalTxId);
+ res1.start(xid, XAResource.TMNOFLAGS);
+ res1.end(xid, XAResource.TMSUCCESS);
+ int rmAfter = rm.size();
+ assertTrue(rmSizeBeforeStart + 1 == rmAfter);
+ res1.start(xid, XAResource.TMJOIN);
+ res1.end(xid, XAResource.TMSUCCESS);
+ rmAfter = rm.size();
+ assertTrue(rmSizeBeforeStart + 1 == rmAfter);
+ res1.commit(xid, true);
+ rmAfter = rm.size();
+ assertTrue(rmSizeBeforeStart == rmAfter);
+ }
+ finally
+ {
+ if (conn != null)
+ {
+ conn.close();
+ }
+
+ }
+
+ }
+ //http://jira.jboss.com/jira/browse/JBMESSAGING-1221
+ public void testMemoryLeakForLocalTXsOnJoinTwoPhase() throws Exception
{
XAConnection conn = null;
@@ -162,26 +210,122 @@
ResourceManager rm = state.getResourceManager();
- XASession xaSession = conn.createXASession();
+ //Create a session
+ XASession sess1 = conn.createXASession();
+ MessagingXAResource res1 = (MessagingXAResource) sess1.getXAResource();
+ byte[] branchQualifier = new byte[] { 1, 2, 3, 4, 5, 6, 0, 0, 0, 0 };
+ byte[] globalTxId = new byte[] { 6, 5, 4, 3, 2, 1, 0, 0, 0, 0 };
+ int rmSizeBeforeStart = rm.size();
+ Xid xid = new MessagingXid(branchQualifier, 12435, globalTxId);
+ res1.start(xid, XAResource.TMNOFLAGS);
+ res1.end(xid, XAResource.TMSUCCESS);
+ int rmAfter = rm.size();
+ assertTrue(rmSizeBeforeStart + 1 == rmAfter);
+ res1.start(xid, XAResource.TMJOIN);
+ res1.end(xid, XAResource.TMSUCCESS);
+ rmAfter = rm.size();
+ assertTrue(rmSizeBeforeStart + 1 == rmAfter);
+ res1.prepare(xid);
+ res1.commit(xid, false);
+ rmAfter = rm.size();
+ assertTrue(rmSizeBeforeStart == rmAfter);
+ }
+ finally
+ {
+ if (conn != null)
+ {
+ conn.close();
+ }
+ }
+
+ }
+ //http://jira.jboss.com/jira/browse/JBMESSAGING-1221
+ public void testMemoryLeakForLocalTXsOnResumeOnePhase() throws Exception
+ {
+ XAConnection conn = null;
+
+ Transaction tx1 = null;
+ try
+ {
+
+ conn = cf.createXAConnection();
+
+ JBossConnection jbConn = (JBossConnection)conn;
+
+ ClientConnectionDelegate del = (ClientConnectionDelegate)jbConn.getDelegate();
+
+ ConnectionState state = (ConnectionState)del.getState();
+
+ ResourceManager rm = state.getResourceManager();
+
//Create a session
XASession sess1 = conn.createXASession();
- XAResource res1 = sess1.getXAResource();
+ MessagingXAResource res1 = (MessagingXAResource) sess1.getXAResource();
+ byte[] branchQualifier = new byte[] { 1, 2, 3, 4, 5, 6, 0, 0, 0, 0 };
+ byte[] globalTxId = new byte[] { 6, 5, 4, 3, 2, 1, 0, 0, 0, 0 };
+ int rmSizeBeforeStart = rm.size();
+ Xid xid = new MessagingXid(branchQualifier, 12435, globalTxId);
+ res1.start(xid, XAResource.TMNOFLAGS);
+ res1.end(xid, XAResource.TMSUCCESS);
+ int rmAfter = rm.size();
+ assertTrue(rmSizeBeforeStart + 1 == rmAfter);
+ res1.start(xid, XAResource.TMRESUME);
+ res1.end(xid, XAResource.TMSUCCESS);
+ rmAfter = rm.size();
+ assertTrue(rmSizeBeforeStart + 1 == rmAfter);
+ res1.commit(xid, true);
+ rmAfter = rm.size();
+ assertTrue(rmSizeBeforeStart == rmAfter);
+ }
+ finally
+ {
+ if (conn != null)
+ {
+ conn.close();
+ }
+ }
- tm.begin();
+ }
+ //http://jira.jboss.com/jira/browse/JBMESSAGING-1221
+ public void testMemoryLeakForLocalTXsOnJResumeTwoPhase() throws Exception
+ {
+ XAConnection conn = null;
- tx1 = tm.getTransaction();
+ Transaction tx1 = null;
+ try
+ {
- tx1.enlistResource(res1);
- int sizeBefore = rm.size();
- tx1.delistResource(res1, XAResource.TMSUCCESS);
+ conn = cf.createXAConnection();
- tx1.enlistResource(res1);
- int sizeAfter = rm.size();
- assertTrue(sizeBefore == sizeAfter);
- tx1.commit();
+ JBossConnection jbConn = (JBossConnection)conn;
+ ClientConnectionDelegate del = (ClientConnectionDelegate)jbConn.getDelegate();
+
+ ConnectionState state = (ConnectionState)del.getState();
+
+ ResourceManager rm = state.getResourceManager();
+
+ //Create a session
+ XASession sess1 = conn.createXASession();
+ MessagingXAResource res1 = (MessagingXAResource) sess1.getXAResource();
+ byte[] branchQualifier = new byte[] { 1, 2, 3, 4, 5, 6, 0, 0, 0, 0 };
+ byte[] globalTxId = new byte[] { 6, 5, 4, 3, 2, 1, 0, 0, 0, 0 };
+ int rmSizeBeforeStart = rm.size();
+ Xid xid = new MessagingXid(branchQualifier, 12435, globalTxId);
+ res1.start(xid, XAResource.TMNOFLAGS);
+ res1.end(xid, XAResource.TMSUCCESS);
+ int rmAfter = rm.size();
+ assertTrue(rmSizeBeforeStart + 1 == rmAfter);
+ res1.start(xid, XAResource.TMRESUME);
+ res1.end(xid, XAResource.TMSUCCESS);
+ rmAfter = rm.size();
+ assertTrue(rmSizeBeforeStart + 1 == rmAfter);
+ res1.prepare(xid);
+ res1.commit(xid, false);
+ rmAfter = rm.size();
+ assertTrue(rmSizeBeforeStart == rmAfter);
}
finally
{
@@ -193,8 +337,6 @@
}
}
-
-
/* If there is no global tx present the send must behave as non transacted.
* See http://www.jboss.com/index.html?module=bb&op=viewtopic&t=98577&postdays=0&postorder=asc&start=0
* http://jira.jboss.com/jira/browse/JBMESSAGING-410
More information about the jboss-cvs-commits
mailing list