[jboss-cvs] JBoss Messaging SVN: r3600 - in branches/Branch_Stable: tests/src/org/jboss/test/messaging/jms and 1 other directory.
jboss-cvs-commits at lists.jboss.org
jboss-cvs-commits at lists.jboss.org
Mon Jan 21 10:51:42 EST 2008
Author: ataylor
Date: 2008-01-21 10:51:41 -0500 (Mon, 21 Jan 2008)
New Revision: 3600
Modified:
branches/Branch_Stable/src/main/org/jboss/jms/tx/ClientTransaction.java
branches/Branch_Stable/src/main/org/jboss/jms/tx/MessagingXAResource.java
branches/Branch_Stable/src/main/org/jboss/jms/tx/ResourceManager.java
branches/Branch_Stable/tests/src/org/jboss/test/messaging/jms/XATest.java
Log:
http://jira.jboss.com/jira/browse/JBMESSAGING-1221 - fix for memory leak when pooled connections from JCA are used; we now merge the transactions to make sure all the acks are caught
Modified: branches/Branch_Stable/src/main/org/jboss/jms/tx/ClientTransaction.java
===================================================================
--- branches/Branch_Stable/src/main/org/jboss/jms/tx/ClientTransaction.java 2008-01-21 11:28:38 UTC (rev 3599)
+++ branches/Branch_Stable/src/main/org/jboss/jms/tx/ClientTransaction.java 2008-01-21 15:51:41 UTC (rev 3600)
@@ -34,8 +34,9 @@
/**
* Holds the state of a transaction on the client side
- *
+ *
* @author <a href="mailto:tim.fox at jboss.com>Tim Fox </a>
+ * @author <a href="mailto:ataylor at redhat.com">Andy Taylor</a>
*/
public class ClientTransaction
{
@@ -62,7 +63,7 @@
private List sessionStatesList;
private boolean clientSide;
-
+
// Static --------------------------------------------------------
// Constructors --------------------------------------------------
@@ -89,7 +90,7 @@
sessionTxState.addMessage(msg);
}
-
+
public void addAck(String sessionId, DeliveryInfo info)
{
if (!clientSide)
@@ -98,9 +99,9 @@
}
SessionTxState sessionTxState = getSessionTxState(sessionId);
- sessionTxState.addAck(info);
+ sessionTxState.addAck(info);
}
-
+
public void clearMessages()
{
if (!clientSide)
@@ -112,9 +113,9 @@
{
// This can be null if the tx was recreated on the client side due to recovery
- for(Iterator i = sessionStatesMap.values().iterator(); i.hasNext(); )
+ for (Iterator i = sessionStatesMap.values().iterator(); i.hasNext();)
{
- SessionTxState sessionTxState = (SessionTxState)i.next();
+ SessionTxState sessionTxState = (SessionTxState) i.next();
sessionTxState.clearMessages();
}
}
@@ -138,7 +139,7 @@
else
{
return sessionStatesMap == null ?
- Collections.EMPTY_LIST : new ArrayList(sessionStatesMap.values());
+ Collections.EMPTY_LIST : new ArrayList(sessionStatesMap.values());
}
}
@@ -151,7 +152,7 @@
{
throw new IllegalStateException("Cannot call this method on the server side");
}
-
+
// Note we have to do this in one go since there may be overlap between old and new session
// IDs and we don't want to overwrite keys in the map.
@@ -159,19 +160,19 @@
if (sessionStatesMap != null)
{
- for(Iterator i = sessionStatesMap.values().iterator(); i.hasNext();)
+ for (Iterator i = sessionStatesMap.values().iterator(); i.hasNext();)
{
- SessionTxState state = (SessionTxState)i.next();
-
+ SessionTxState state = (SessionTxState) i.next();
+
boolean handled = state.handleFailover(newServerID, oldSessionID, newSessionID);
if (handled)
{
- if (tmpMap == null)
- {
- tmpMap = new LinkedHashMap();
- }
- tmpMap.put(newSessionID, state);
+ if (tmpMap == null)
+ {
+ tmpMap = new LinkedHashMap();
+ }
+ tmpMap.put(newSessionID, state);
}
}
}
@@ -187,7 +188,7 @@
* May return an empty list, but never null.
*/
public List getDeliveriesForSession(String sessionID)
- {
+ {
if (!clientSide)
{
throw new IllegalStateException("Cannot call this method on the server side");
@@ -198,24 +199,24 @@
return Collections.EMPTY_LIST;
}
else
- {
- SessionTxState state = (SessionTxState)sessionStatesMap.get(sessionID);
-
+ {
+ SessionTxState state = (SessionTxState) sessionStatesMap.get(sessionID);
+
if (state != null)
{
List list = state.getAcks();
-
+
return list;
}
else
{
return Collections.EMPTY_LIST;
}
- }
+ }
}
// Streamable implementation ---------------------------------
-
+
public void write(DataOutputStream out) throws Exception
{
out.writeByte(state);
@@ -232,7 +233,7 @@
while (iter.hasNext())
{
- SessionTxState state = (SessionTxState)iter.next();
+ SessionTxState state = (SessionTxState) iter.next();
out.writeUTF(state.getSessionId());
@@ -244,10 +245,10 @@
while (iter2.hasNext())
{
- JBossMessage m = (JBossMessage)iter2.next();
+ JBossMessage m = (JBossMessage) iter2.next();
out.writeByte(m.getType());
-
+
m.write(out);
}
@@ -257,16 +258,16 @@
while (iter2.hasNext())
{
- DeliveryInfo ack = (DeliveryInfo)iter2.next();
-
+ DeliveryInfo ack = (DeliveryInfo) iter2.next();
+
//We don't want to send acks for things like non durable subs which will have been already acked
if (ack.isShouldAck())
{
- //We only need the delivery id written
- out.writeLong(ack.getMessageProxy().getDeliveryId());
+ //We only need the delivery id written
+ out.writeLong(ack.getMessageProxy().getDeliveryId());
}
}
-
+
//Marker for end of acks
out.writeLong(Long.MIN_VALUE);
}
@@ -281,7 +282,7 @@
state = in.readByte();
int numSessions = in.readInt();
-
+
//Read in as a list since we don't want the extra overhead of putting into a map
//which won't be used on the server side
sessionStatesList = new ArrayList(numSessions);
@@ -300,7 +301,7 @@
{
byte type = in.readByte();
- JBossMessage msg = (JBossMessage)MessageFactory.createMessage(type);
+ JBossMessage msg = (JBossMessage) MessageFactory.createMessage(type);
msg.read(in);
@@ -308,10 +309,10 @@
}
long l;
-
+
while ((l = in.readLong()) != Long.MIN_VALUE)
{
- sessionState.addAck(new DefaultAck(l));
+ sessionState.addAck(new DefaultAck(l));
}
}
}
@@ -329,12 +330,12 @@
sessionStatesMap = new LinkedHashMap();
}
- SessionTxState sessionTxState = (SessionTxState)sessionStatesMap.get(sessionID);
+ SessionTxState sessionTxState = (SessionTxState) sessionStatesMap.get(sessionID);
if (sessionTxState == null)
{
sessionTxState = new SessionTxState(sessionID);
-
+
sessionStatesMap.put(sessionID, sessionTxState);
}
@@ -343,6 +344,7 @@
/**
* merges in the state of another transaction
+ *
* @param toMerge the clientTransaction to merge
*/
public void mergeIn(ClientTransaction toMerge)
@@ -350,46 +352,35 @@
state = toMerge.state;
//if there is anything to merge in merge them
- if(toMerge.sessionStatesMap != null)
+ if (toMerge.sessionStatesMap != null)
{
+ if(sessionStatesMap == null)
+ {
+ sessionStatesMap = new LinkedHashMap();
+ }
Iterator it = toMerge.sessionStatesMap.keySet().iterator();
- while(it.hasNext())
+ while (it.hasNext())
{
Object key = it.next();
SessionTxState ss = (SessionTxState) toMerge.sessionStatesMap.get(key);
//if the sessionstate doesnt exist add it
- if(sessionStatesMap.get(key) != null)
+ if (sessionStatesMap.get(key) == null)
{
sessionStatesMap.put(key, ss);
}
- //otherwise it already exist so we need to merge in the messages and acks
+ //otherwise it already exist so we need to merge in the acks
else
{
- //we have to merge in the messages as well
SessionTxState orig = (SessionTxState) sessionStatesMap.get(key);
- List messagesToMerge = ss.getMsgs();
- if(messagesToMerge != null)
- {
- Iterator msgsIt = messagesToMerge.iterator();
- while(msgsIt.hasNext())
- {
- //if the msg isnt there add it
- JBossMessage msg = (JBossMessage) msgsIt.next();
- if(!orig.getMsgs().contains(msg))
- {
- orig.getMsgs().add(msg);
- }
- }
- }
List acksToMerge = ss.getAcks();
- if(acksToMerge != null)
+ if (acksToMerge != null)
{
Iterator acksIt = acksToMerge.iterator();
- while(acksIt.hasNext())
+ while (acksIt.hasNext())
{
//if the ack isnt there add it
Ack ack = (Ack) acksIt.next();
- if(!orig.getAcks().contains(ack))
+ if (!orig.getAcks().contains(ack))
{
orig.getAcks().add(ack);
}
@@ -399,7 +390,7 @@
}
sessionStatesList = sessionStatesMap == null ?
- Collections.EMPTY_LIST : new ArrayList(sessionStatesMap.values());
+ Collections.EMPTY_LIST : new ArrayList(sessionStatesMap.values());
}
}
@@ -447,10 +438,10 @@
{
return sessionID;
}
-
+
public void setAcks(List acks)
{
- this.acks = acks;
+ this.acks = acks;
}
boolean handleFailover(int newServerID, String oldSessionID, String newSessionID)
@@ -461,13 +452,16 @@
serverID = newServerID;
// Remove any non persistent acks
- for(Iterator i = acks.iterator(); i.hasNext(); )
+ for (Iterator i = acks.iterator(); i.hasNext();)
{
- DeliveryInfo di = (DeliveryInfo)i.next();
+ DeliveryInfo di = (DeliveryInfo) i.next();
if (!di.getMessageProxy().getMessage().isReliable())
{
- if (trace) { log.trace(this + " discarded non-persistent " + di + " on failover"); }
+ if (trace)
+ {
+ log.trace(this + " discarded non-persistent " + di + " on failover");
+ }
i.remove();
}
}
@@ -475,7 +469,7 @@
}
else
{
- return false;
+ return false;
}
}
Modified: branches/Branch_Stable/src/main/org/jboss/jms/tx/MessagingXAResource.java
===================================================================
--- branches/Branch_Stable/src/main/org/jboss/jms/tx/MessagingXAResource.java 2008-01-21 11:28:38 UTC (rev 3599)
+++ branches/Branch_Stable/src/main/org/jboss/jms/tx/MessagingXAResource.java 2008-01-21 15:51:41 UTC (rev 3600)
@@ -45,6 +45,7 @@
*
* @author Hiram Chirino (Cojonudo14 at hotmail.com)
* @author <a href="mailto:adrian at jboss.org">Adrian Brock</a>
+ * @author <a href="mailto:ataylor at redhat.com">Andy Taylor</a>
*
* @version <tt>$Revision$</tt>
*
Modified: branches/Branch_Stable/src/main/org/jboss/jms/tx/ResourceManager.java
===================================================================
--- branches/Branch_Stable/src/main/org/jboss/jms/tx/ResourceManager.java 2008-01-21 11:28:38 UTC (rev 3599)
+++ branches/Branch_Stable/src/main/org/jboss/jms/tx/ResourceManager.java 2008-01-21 15:51:41 UTC (rev 3600)
@@ -51,6 +51,7 @@
* @author <a href="mailto:juha at jboss.org">Juha Lindfors</a>
* @author <a href="mailto:Cojonudo14 at hotmail.com">Hiram Chirino</a>
* @author <a href="mailto:adrian at jboss.org">Adrian Brock</a>
+ * @author <a href="mailto:ataylor at redhat.com">Andy Taylor</a>
* @version $Revision$
*
* $Id$
Modified: branches/Branch_Stable/tests/src/org/jboss/test/messaging/jms/XATest.java
===================================================================
--- branches/Branch_Stable/tests/src/org/jboss/test/messaging/jms/XATest.java 2008-01-21 11:28:38 UTC (rev 3599)
+++ branches/Branch_Stable/tests/src/org/jboss/test/messaging/jms/XATest.java 2008-01-21 15:51:41 UTC (rev 3600)
@@ -52,6 +52,7 @@
*
* @author <a href="mailto:tim.fox at jboss.com">Tim Fox</a>
* @author <a href="mailto:clebert.suconic at jboss.com">Clebert Suconic</a>
+ * @author <a href="mailto:ataylor at redhat.com">Andy Taylor</a>
* @version <tt>$Revision: 1.1 $</tt>
*
* $Id$
@@ -143,7 +144,107 @@
}
// Public --------------------------------------------------------
+ /*
+ resource is not enlisted in tx
+
+ do some work with tx W1
+
+ enlist resource in tx
+
+ do some more work W2
+
+ delist resource from tx
+
+ prepare tx
+
+ commit tx
+
+ validate that both sets of work W1 and W2 get applied.
+ */
//http://jira.jboss.com/jira/browse/JBMESSAGING-1221
+ public void testMemoryLeakForLocalTXsWithWork() throws Exception
+ {
+ XAConnection conn = null;
+
+ Transaction tx1 = null;
+ try
+ {
+
+ conn = cf.createXAConnection();
+
+ JBossConnection jbConn = (JBossConnection)conn;
+
+ ClientConnectionDelegate del = (ClientConnectionDelegate)jbConn.getDelegate();
+
+ ConnectionState state = (ConnectionState)del.getState();
+
+ ResourceManager rm = state.getResourceManager();
+
+ //Create a session
+ JBossSession sess1 = (JBossSession) conn.createXASession();
+ DummyListener listener = new DummyListener();
+ sess1.setMessageListener(listener);
+ conn.start();
+ MessagingXAResource res1 = (MessagingXAResource) sess1.getXAResource();
+ tm.begin();
+
+ Transaction trans = tm.getTransaction();
+ trans.enlistResource(res1);
+ trans.delistResource(res1, XAResource.TMSUCCESS);
+ MessageProducer p = sess1.createProducer(queue1);
+ MessageConsumer cons = sess1.createConsumer(queue1);
+ conn.start();
+ //send 10 messages
+ for(int i = 0; i < 10; i++)
+ {
+ TextMessage message = sess1.createTextMessage("delistedwork" + i);
+ p.send(message);
+ }
+ //now receive 5
+ for(int i = 0; i < 5; i++)
+ {
+ TextMessage textMessage = (TextMessage) cons.receive();
+ assertEquals("delistedwork" + i, textMessage.getText());
+ }
+ //once we enlist ensure that the 5 acks are merged ok, the first timne we do this there is nothing to merge in the global tx
+ //so all acks are just copied
+ trans.enlistResource(res1);
+ SessionState sstate = (SessionState)((DelegateSupport)sess1.getDelegate()).getState();
+ ClientTransaction clientTransaction = rm.getTx(sstate.getCurrentTxId());
+ assertEquals("to many session states", clientTransaction.getSessionStates().size(), 1 );
+ ClientTransaction.SessionTxState sessionTxState = (ClientTransaction.SessionTxState) clientTransaction.getSessionStates().get(0);
+ assertEquals("wrong number of acks",5, sessionTxState.getAcks().size());
+
+ trans.delistResource(res1, XAResource.TMSUCCESS);
+ for(int i = 5; i < 10; i++)
+ {
+ TextMessage textMessage = (TextMessage) cons.receive();
+ assertEquals("delistedwork" + i, textMessage.getText());
+ }
+ //now reenlist and make sure that there are now 10 acks, this time around a merge will be done with the first 5 acks
+ //
+ trans.enlistResource(res1);
+
+ clientTransaction = rm.getTx(sstate.getCurrentTxId());
+ assertEquals("to many session states", clientTransaction.getSessionStates().size(), 1 );
+ sessionTxState = (ClientTransaction.SessionTxState) clientTransaction.getSessionStates().get(0);
+ assertEquals("wrong number of acks",10, sessionTxState.getAcks().size());
+
+ tm.commit();
+
+ }
+ finally
+ {
+ if (conn != null)
+ {
+ conn.close();
+ }
+
+ }
+
+ }
+
+ //http://jira.jboss.com/jira/browse/JBMESSAGING-1221
public void testMemoryLeakForLocalTXsOnJoinOnePhase() throws Exception
{
XAConnection conn = null;
More information about the jboss-cvs-commits
mailing list