[jboss-cvs] JBoss Messaging SVN: r3601 - in branches/Branch_JBossMessaging_1_4_0_SP3_CP: tests/src/org/jboss/test/messaging/jms and 1 other directory.
jboss-cvs-commits at lists.jboss.org
jboss-cvs-commits at lists.jboss.org
Mon Jan 21 10:52:25 EST 2008
Author: ataylor
Date: 2008-01-21 10:52:25 -0500 (Mon, 21 Jan 2008)
New Revision: 3601
Modified:
branches/Branch_JBossMessaging_1_4_0_SP3_CP/src/main/org/jboss/jms/tx/ClientTransaction.java
branches/Branch_JBossMessaging_1_4_0_SP3_CP/src/main/org/jboss/jms/tx/MessagingXAResource.java
branches/Branch_JBossMessaging_1_4_0_SP3_CP/src/main/org/jboss/jms/tx/ResourceManager.java
branches/Branch_JBossMessaging_1_4_0_SP3_CP/tests/src/org/jboss/test/messaging/jms/XATest.java
Log:
http://jira.jboss.com/jira/browse/JBMESSAGING-1221 - fix for a memory leak when pooled connections from JCA are used; we now merge the transactions to make sure all the acks are caught
Modified: branches/Branch_JBossMessaging_1_4_0_SP3_CP/src/main/org/jboss/jms/tx/ClientTransaction.java
===================================================================
--- branches/Branch_JBossMessaging_1_4_0_SP3_CP/src/main/org/jboss/jms/tx/ClientTransaction.java 2008-01-21 15:51:41 UTC (rev 3600)
+++ branches/Branch_JBossMessaging_1_4_0_SP3_CP/src/main/org/jboss/jms/tx/ClientTransaction.java 2008-01-21 15:52:25 UTC (rev 3601)
@@ -21,15 +21,6 @@
*/
package org.jboss.jms.tx;
-import java.io.DataInputStream;
-import java.io.DataOutputStream;
-import java.util.ArrayList;
-import java.util.Collections;
-import java.util.Iterator;
-import java.util.LinkedHashMap;
-import java.util.List;
-import java.util.Map;
-
import org.jboss.jms.delegate.Ack;
import org.jboss.jms.delegate.DefaultAck;
import org.jboss.jms.delegate.DeliveryInfo;
@@ -37,10 +28,15 @@
import org.jboss.logging.Logger;
import org.jboss.messaging.core.impl.message.MessageFactory;
+import java.io.DataInputStream;
+import java.io.DataOutputStream;
+import java.util.*;
+
/**
* Holds the state of a transaction on the client side
- *
+ *
* @author <a href="mailto:tim.fox at jboss.com>Tim Fox </a>
+ * @author <a href="mailto:ataylor at redhat.com">Andy Taylor</a>
*/
public class ClientTransaction
{
@@ -67,14 +63,7 @@
private List sessionStatesList;
private boolean clientSide;
-
- private boolean hasPersistentAcks;
-
- private boolean failedOver;
-
- private boolean removeAcks;
-
// Static --------------------------------------------------------
// Constructors --------------------------------------------------
@@ -101,7 +90,7 @@
sessionTxState.addMessage(msg);
}
-
+
public void addAck(String sessionId, DeliveryInfo info)
{
if (!clientSide)
@@ -111,28 +100,8 @@
SessionTxState sessionTxState = getSessionTxState(sessionId);
sessionTxState.addAck(info);
-
- if (info.getMessageProxy().getMessage().isReliable())
- {
- hasPersistentAcks = true;
- }
-
- if (!info.isShouldAck())
- {
- removeAcks = true;
- }
}
-
- public boolean hasPersistentAcks()
- {
- return hasPersistentAcks;
- }
-
- public boolean isFailedOver()
- {
- return failedOver;
- }
-
+
public void clearMessages()
{
if (!clientSide)
@@ -144,9 +113,9 @@
{
// This can be null if the tx was recreated on the client side due to recovery
- for(Iterator i = sessionStatesMap.values().iterator(); i.hasNext(); )
+ for (Iterator i = sessionStatesMap.values().iterator(); i.hasNext();)
{
- SessionTxState sessionTxState = (SessionTxState)i.next();
+ SessionTxState sessionTxState = (SessionTxState) i.next();
sessionTxState.clearMessages();
}
}
@@ -170,7 +139,7 @@
else
{
return sessionStatesMap == null ?
- Collections.EMPTY_LIST : new ArrayList(sessionStatesMap.values());
+ Collections.EMPTY_LIST : new ArrayList(sessionStatesMap.values());
}
}
@@ -183,7 +152,7 @@
{
throw new IllegalStateException("Cannot call this method on the server side");
}
-
+
// Note we have to do this in one go since there may be overlap between old and new session
// IDs and we don't want to overwrite keys in the map.
@@ -191,19 +160,19 @@
if (sessionStatesMap != null)
{
- for(Iterator i = sessionStatesMap.values().iterator(); i.hasNext();)
+ for (Iterator i = sessionStatesMap.values().iterator(); i.hasNext();)
{
- SessionTxState state = (SessionTxState)i.next();
-
+ SessionTxState state = (SessionTxState) i.next();
+
boolean handled = state.handleFailover(newServerID, oldSessionID, newSessionID);
if (handled)
{
- if (tmpMap == null)
- {
- tmpMap = new LinkedHashMap();
- }
- tmpMap.put(newSessionID, state);
+ if (tmpMap == null)
+ {
+ tmpMap = new LinkedHashMap();
+ }
+ tmpMap.put(newSessionID, state);
}
}
}
@@ -213,15 +182,13 @@
// swap
sessionStatesMap = tmpMap;
}
-
- failedOver = true;
}
/**
* May return an empty list, but never null.
*/
public List getDeliveriesForSession(String sessionID)
- {
+ {
if (!clientSide)
{
throw new IllegalStateException("Cannot call this method on the server side");
@@ -232,24 +199,24 @@
return Collections.EMPTY_LIST;
}
else
- {
- SessionTxState state = (SessionTxState)sessionStatesMap.get(sessionID);
-
+ {
+ SessionTxState state = (SessionTxState) sessionStatesMap.get(sessionID);
+
if (state != null)
{
List list = state.getAcks();
-
+
return list;
}
else
{
return Collections.EMPTY_LIST;
}
- }
+ }
}
// Streamable implementation ---------------------------------
-
+
public void write(DataOutputStream out) throws Exception
{
out.writeByte(state);
@@ -266,7 +233,7 @@
while (iter.hasNext())
{
- SessionTxState state = (SessionTxState)iter.next();
+ SessionTxState state = (SessionTxState) iter.next();
out.writeUTF(state.getSessionId());
@@ -278,10 +245,10 @@
while (iter2.hasNext())
{
- JBossMessage m = (JBossMessage)iter2.next();
+ JBossMessage m = (JBossMessage) iter2.next();
out.writeByte(m.getType());
-
+
m.write(out);
}
@@ -291,16 +258,16 @@
while (iter2.hasNext())
{
- DeliveryInfo ack = (DeliveryInfo)iter2.next();
-
+ DeliveryInfo ack = (DeliveryInfo) iter2.next();
+
//We don't want to send acks for things like non durable subs which will have been already acked
if (ack.isShouldAck())
{
- //We only need the delivery id written
- out.writeLong(ack.getMessageProxy().getDeliveryId());
+ //We only need the delivery id written
+ out.writeLong(ack.getMessageProxy().getDeliveryId());
}
}
-
+
//Marker for end of acks
out.writeLong(Long.MIN_VALUE);
}
@@ -315,7 +282,7 @@
state = in.readByte();
int numSessions = in.readInt();
-
+
//Read in as a list since we don't want the extra overhead of putting into a map
//which won't be used on the server side
sessionStatesList = new ArrayList(numSessions);
@@ -334,7 +301,7 @@
{
byte type = in.readByte();
- JBossMessage msg = (JBossMessage)MessageFactory.createMessage(type);
+ JBossMessage msg = (JBossMessage) MessageFactory.createMessage(type);
msg.read(in);
@@ -342,10 +309,10 @@
}
long l;
-
+
while ((l = in.readLong()) != Long.MIN_VALUE)
{
- sessionState.addAck(new DefaultAck(l));
+ sessionState.addAck(new DefaultAck(l));
}
}
}
@@ -363,18 +330,71 @@
sessionStatesMap = new LinkedHashMap();
}
- SessionTxState sessionTxState = (SessionTxState)sessionStatesMap.get(sessionID);
+ SessionTxState sessionTxState = (SessionTxState) sessionStatesMap.get(sessionID);
if (sessionTxState == null)
{
sessionTxState = new SessionTxState(sessionID);
-
+
sessionStatesMap.put(sessionID, sessionTxState);
}
return sessionTxState;
}
+ /**
+ * merges in the state of another transaction
+ *
+ * @param toMerge the clientTransaction to merge
+ */
+ public void mergeIn(ClientTransaction toMerge)
+ {
+ state = toMerge.state;
+
+ //if there is anything to merge in merge them
+ if (toMerge.sessionStatesMap != null)
+ {
+ if(sessionStatesMap == null)
+ {
+ sessionStatesMap = new LinkedHashMap();
+ }
+ Iterator it = toMerge.sessionStatesMap.keySet().iterator();
+ while (it.hasNext())
+ {
+ Object key = it.next();
+ SessionTxState ss = (SessionTxState) toMerge.sessionStatesMap.get(key);
+ //if the sessionstate doesnt exist add it
+ if (sessionStatesMap.get(key) == null)
+ {
+ sessionStatesMap.put(key, ss);
+ }
+ //otherwise it already exist so we need to merge in the acks
+ else
+ {
+ SessionTxState orig = (SessionTxState) sessionStatesMap.get(key);
+ List acksToMerge = ss.getAcks();
+ if (acksToMerge != null)
+ {
+ Iterator acksIt = acksToMerge.iterator();
+ while (acksIt.hasNext())
+ {
+ //if the ack isnt there add it
+ Ack ack = (Ack) acksIt.next();
+ if (!orig.getAcks().contains(ack))
+ {
+ orig.getAcks().add(ack);
+ }
+ }
+ }
+ }
+
+ }
+ sessionStatesList = sessionStatesMap == null ?
+ Collections.EMPTY_LIST : new ArrayList(sessionStatesMap.values());
+ }
+
+ }
+
// Inner Classes -------------------------------------------------
public class SessionTxState
@@ -418,10 +438,10 @@
{
return sessionID;
}
-
+
public void setAcks(List acks)
{
- this.acks = acks;
+ this.acks = acks;
}
boolean handleFailover(int newServerID, String oldSessionID, String newSessionID)
@@ -432,13 +452,16 @@
serverID = newServerID;
// Remove any non persistent acks
- for(Iterator i = acks.iterator(); i.hasNext(); )
+ for (Iterator i = acks.iterator(); i.hasNext();)
{
- DeliveryInfo di = (DeliveryInfo)i.next();
+ DeliveryInfo di = (DeliveryInfo) i.next();
if (!di.getMessageProxy().getMessage().isReliable())
{
- if (trace) { log.trace(this + " discarded non-persistent " + di + " on failover"); }
+ if (trace)
+ {
+ log.trace(this + " discarded non-persistent " + di + " on failover");
+ }
i.remove();
}
}
@@ -446,7 +469,7 @@
}
else
{
- return false;
+ return false;
}
}
Modified: branches/Branch_JBossMessaging_1_4_0_SP3_CP/src/main/org/jboss/jms/tx/MessagingXAResource.java
===================================================================
--- branches/Branch_JBossMessaging_1_4_0_SP3_CP/src/main/org/jboss/jms/tx/MessagingXAResource.java 2008-01-21 15:51:41 UTC (rev 3600)
+++ branches/Branch_JBossMessaging_1_4_0_SP3_CP/src/main/org/jboss/jms/tx/MessagingXAResource.java 2008-01-21 15:52:25 UTC (rev 3601)
@@ -32,20 +32,21 @@
/**
* An XAResource implementation.
- *
+ *
* This defines the contract for the application server to interact with the resource manager.
- *
+ *
* It mainly delegates to the resource manager.
- *
+ *
* @author <a href="mailto:tim.fox at jboss.com">Tim Fox</a>
* @author <a href="mailto:ovidiu at feodorov.com">Ovidiu Feodorov</a>
* @author <a href="mailto:juha at jboss.org">Juha Lindfors</a>
- *
+ *
* Parts based on JBoss MQ XAResource implementation by:
- *
+ *
* @author Hiram Chirino (Cojonudo14 at hotmail.com)
* @author <a href="mailto:adrian at jboss.org">Adrian Brock</a>
- *
+ * @author <a href="mailto:ataylor at redhat.com">Andy Taylor</a>
+ *
* @version <tt>$Revision$</tt>
*
* $Id$
@@ -55,33 +56,33 @@
// Constants ------------------------------------------------------------------------------------
private static final Logger log = Logger.getLogger(MessagingXAResource.class);
-
+
// Attributes -----------------------------------------------------------------------------------
-
+
private boolean trace = log.isTraceEnabled();
private ResourceManager rm;
-
+
private SessionState sessionState;
-
+
private ConnectionDelegate connection;
-
+
//For testing only
private boolean preventJoining;
// Static ---------------------------------------------------------------------------------------
-
+
// Constructors ---------------------------------------------------------------------------------
public MessagingXAResource(ResourceManager rm, SessionState sessionState)
- {
+ {
this.rm = rm;
-
+
this.sessionState = sessionState;
-
+
this.connection = (ConnectionDelegate)(sessionState.getParent()).getDelegate();
}
-
+
// XAResource implementation --------------------------------------------------------------------
public boolean setTransactionTimeout(int timeout) throws XAException
@@ -102,23 +103,23 @@
{
return false;
}
-
+
if (!(xaResource instanceof MessagingXAResource))
{
return false;
}
-
+
boolean same = ((MessagingXAResource)xaResource).rm.getServerID() == this.rm.getServerID();
-
+
if (trace) { log.trace("Calling isSameRM, result is " + same + " " + ((MessagingXAResource)xaResource).rm.getServerID() + " " + this.rm.getServerID()); }
-
+
return same;
}
-
+
public void start(Xid xid, int flags) throws XAException
{
if (trace) { log.trace(this + " starting " + xid + ", flags: " + flags); }
-
+
// Recreate Xid. See JBMESSAGING-661 [JPL]
if (!(xid instanceof MessagingXid))
@@ -127,21 +128,21 @@
}
boolean convertTx = false;
-
+
Object currentXid = sessionState.getCurrentTxId();
-
+
// Sanity check
if (currentXid == null)
{
throw new MessagingXAException(XAException.XAER_RMFAIL, "Current xid is not set");
}
-
- if (flags == TMNOFLAGS && sessionState.getCurrentTxId() instanceof LocalTx)
+
+ if (sessionState.getCurrentTxId() instanceof LocalTx)
{
convertTx = true;
-
+
if (trace) { log.trace("Converting local tx into global tx branch"); }
- }
+ }
//TODO why do we need this synchronized block?
synchronized (this)
@@ -150,7 +151,7 @@
{
case TMNOFLAGS :
if (convertTx)
- {
+ {
// If I commit/rollback the tx, then there is a short period of time between the
// AS (or whoever) calling commit on the tx and calling start to enrolling the
// session in a new tx. If the session has any listeners then in that period,
@@ -161,30 +162,37 @@
setCurrentTransactionId(rm.convertTx((LocalTx)sessionState.getCurrentTxId(), xid));
}
else
- {
- setCurrentTransactionId(rm.startTx(xid));
+ {
+ setCurrentTransactionId(rm.startTx(xid));
}
break;
case TMJOIN :
- if(!xid.equals(currentXid))
+ if(convertTx)
{
- rm.removeTx(currentXid);
+ setCurrentTransactionId(rm.convertOnJoinTx((LocalTx)sessionState.getCurrentTxId(), xid));
}
- setCurrentTransactionId(rm.joinTx(xid));
+ else
+ {
+ setCurrentTransactionId(rm.joinTx(xid));
+ }
break;
case TMRESUME :
- if(!xid.equals(currentXid))
+ if(convertTx)
{
- rm.removeTx(currentXid);
+ setCurrentTransactionId(rm.convertOnJoinTx((LocalTx)sessionState.getCurrentTxId(), xid));
}
- setCurrentTransactionId(rm.resumeTx(xid));
+ else
+ {
+ setCurrentTransactionId(rm.resumeTx(xid));
+ }
+
break;
default:
throw new MessagingXAException(XAException.XAER_PROTO, "Invalid flags: " + flags);
}
}
}
-
+
public void end(Xid xid, int flags) throws XAException
{
if (trace) { log.trace(this + " ending " + xid + ", flags: " + flags); }
@@ -198,12 +206,12 @@
//TODO - why do we need this synchronized block?
synchronized (this)
- {
- unsetCurrentTransactionId(xid);
-
+ {
+ unsetCurrentTransactionId(xid);
+
switch (flags)
{
- case TMSUSPEND :
+ case TMSUSPEND :
rm.suspendTx(xid);
break;
case TMFAIL :
@@ -213,11 +221,11 @@
rm.endTx(xid, true);
break;
default :
- throw new MessagingXAException(XAException.XAER_PROTO, "Invalid flags: " + flags);
- }
+ throw new MessagingXAException(XAException.XAER_PROTO, "Invalid flags: " + flags);
+ }
}
}
-
+
public int prepare(Xid xid) throws XAException
{
if (trace) { log.trace(this + " preparing " + xid); }
@@ -231,7 +239,7 @@
return rm.prepare(xid, connection);
}
-
+
public void commit(Xid xid, boolean onePhase) throws XAException
{
if (trace) { log.trace(this + " committing " + xid + (onePhase ? " (one phase)" : " (two phase)")); }
@@ -245,7 +253,7 @@
rm.commit(xid, onePhase, connection);
}
-
+
public void rollback(Xid xid) throws XAException
{
if (trace) { log.trace(this + " rolling back " + xid); }
@@ -259,7 +267,7 @@
rm.rollback(xid, connection);
}
-
+
public void forget(Xid xid) throws XAException
{
if (trace) { log.trace(this + " forgetting " + xid + " (currently an NOOP)"); }
@@ -270,19 +278,19 @@
if (trace) { log.trace(this + " recovering, flags: " + flags); }
Xid[] xids = rm.recover(flags, connection);
-
+
if (trace) { log.trace("Recovered txs: " + xids); }
-
+
return xids;
}
-
+
// Public ---------------------------------------------------------------------------------------
public String toString()
{
return "MessagingXAResource[" + sessionState.getDelegate().getID()+ "]";
}
-
+
/*
* This is used in testing to force isSameRM() to always return false
* This allows us to test 2PC properly - since otherwise the transaction manager
@@ -295,18 +303,18 @@
}
// Package protected ----------------------------------------------------------------------------
-
+
// Protected ------------------------------------------------------------------------------------
-
+
// Private --------------------------------------------------------------------------------------
-
+
private void setCurrentTransactionId(Object xid)
{
if (trace) { log.trace(this + " setting current xid to " + xid + ", previous " + sessionState.getCurrentTxId()); }
sessionState.setCurrentTxId(xid);
}
-
+
private void unsetCurrentTransactionId(Object xid)
{
if (xid == null)
@@ -328,10 +336,10 @@
// a full explanation
// So in other words - when the session is not enlisted in a global tx
// it will always have a local xid set
-
+
sessionState.setCurrentTxId(rm.createLocalTx());
}
}
-
+
// Inner classes --------------------------------------------------------------------------------
}
\ No newline at end of file
Modified: branches/Branch_JBossMessaging_1_4_0_SP3_CP/src/main/org/jboss/jms/tx/ResourceManager.java
===================================================================
--- branches/Branch_JBossMessaging_1_4_0_SP3_CP/src/main/org/jboss/jms/tx/ResourceManager.java 2008-01-21 15:51:41 UTC (rev 3600)
+++ branches/Branch_JBossMessaging_1_4_0_SP3_CP/src/main/org/jboss/jms/tx/ResourceManager.java 2008-01-21 15:52:25 UTC (rev 3601)
@@ -21,19 +21,7 @@
*/
package org.jboss.jms.tx;
-import java.util.ArrayList;
-import java.util.Collections;
-import java.util.Iterator;
-import java.util.List;
-import java.util.Map;
-
-import javax.jms.IllegalStateException;
-import javax.jms.JMSException;
-import javax.jms.JMSSecurityException;
-import javax.transaction.xa.XAException;
-import javax.transaction.xa.XAResource;
-import javax.transaction.xa.Xid;
-
+import EDU.oswego.cs.dl.util.concurrent.ConcurrentHashMap;
import org.jboss.jms.delegate.ConnectionDelegate;
import org.jboss.jms.delegate.DeliveryInfo;
import org.jboss.jms.delegate.SessionDelegate;
@@ -44,19 +32,26 @@
import org.jboss.jms.tx.ClientTransaction.SessionTxState;
import org.jboss.logging.Logger;
-import EDU.oswego.cs.dl.util.concurrent.ConcurrentHashMap;
+import javax.jms.IllegalStateException;
+import javax.jms.JMSException;
+import javax.jms.JMSSecurityException;
+import javax.transaction.xa.XAException;
+import javax.transaction.xa.XAResource;
+import javax.transaction.xa.Xid;
+import java.util.*;
/**
* The ResourceManager manages work done in both local and global (XA) transactions.
- *
+ *
* This is one instance of ResourceManager per JMS server. The ResourceManager instances are managed
* by ResourceManagerFactory.
- *
+ *
* @author <a href="mailto:tim.fox at jboss.com">Tim Fox</a>
* @author <a href="mailto:Konda.Madhu at uk.mizuho-sc.com">Madhu Konda</a>
* @author <a href="mailto:juha at jboss.org">Juha Lindfors</a>
* @author <a href="mailto:Cojonudo14 at hotmail.com">Hiram Chirino</a>
* @author <a href="mailto:adrian at jboss.org">Adrian Brock</a>
+ * @author <a href="mailto:ataylor at redhat.com">Andy Taylor</a>
* @version $Revision$
*
* $Id$
@@ -64,33 +59,35 @@
public class ResourceManager
{
// Constants ------------------------------------------------------------------------------------
-
+
// Attributes -----------------------------------------------------------------------------------
-
+
private boolean trace = log.isTraceEnabled();
-
+
private ConcurrentHashMap transactions = new ConcurrentHashMap();
-
- private int serverID;
-
+
+ private ConcurrentHashMap convertedIds = new ConcurrentHashMap();
+
+ private int serverID;
+
// Static ---------------------------------------------------------------------------------------
-
+
private static final Logger log = Logger.getLogger(ResourceManager.class);
-
+
// Constructors ---------------------------------------------------------------------------------
-
+
ResourceManager(int serverID)
- {
+ {
this.serverID = serverID;
}
-
+
// Public ---------------------------------------------------------------------------------------
-
+
public int getServerID()
{
return serverID;
}
-
+
/*
* Merge another resource manager into this one - used in failover
*/
@@ -98,7 +95,7 @@
{
transactions.putAll(other.transactions);
}
-
+
/**
* Remove a tx
*/
@@ -106,110 +103,110 @@
{
return removeTxInternal(xid);
}
-
+
/**
* Create a local tx.
*/
public LocalTx createLocalTx()
{
ClientTransaction tx = new ClientTransaction();
-
+
LocalTx xid = getNextTxId();
-
+
transactions.put(xid, tx);
-
+
return xid;
}
-
+
/**
* Add a message to a transaction
- *
+ *
* @param xid - The id of the transaction to add the message to
* @param m The message
*/
public void addMessage(Object xid, String sessionId, JBossMessage m)
{
if (trace) { log.trace("addding message " + m + " for xid " + xid); }
-
+
ClientTransaction tx = getTxInternal(xid);
-
+
tx.addMessage(sessionId, m);
}
-
+
/*
* Failover session from old session ID -> new session ID
*/
public void handleFailover(int newServerID, String oldSessionID, String newSessionID)
- {
+ {
for (Iterator i = transactions.values().iterator(); i.hasNext(); )
{
ClientTransaction tx = (ClientTransaction)i.next();
-
+
tx.handleFailover(newServerID, oldSessionID, newSessionID);
- }
- }
-
+ }
+ }
+
/*
* Get all the deliveries corresponding to the session ID
*/
public List getDeliveriesForSession(String sessionID)
{
List ackInfos = new ArrayList();
-
+
for (Iterator i = transactions.values().iterator(); i.hasNext(); )
{
ClientTransaction tx = (ClientTransaction)i.next();
-
+
List acks = tx.getDeliveriesForSession(sessionID);
-
+
ackInfos.addAll(acks);
}
-
+
return ackInfos;
}
-
-
+
+
/**
* Add an acknowledgement to the transaction
- *
+ *
* @param xid - The id of the transaction to add the message to
* @param ackInfo Information describing the acknowledgement
*/
public void addAck(Object xid, String sessionId, DeliveryInfo ackInfo) throws JMSException
{
if (trace) { log.trace("adding " + ackInfo + " to transaction " + xid); }
-
+
ClientTransaction tx = getTxInternal(xid);
-
+
if (tx == null)
{
throw new JMSException("There is no transaction with id " + xid);
}
-
+
tx.addAck(sessionId, ackInfo);
}
-
+
public void commitLocal(LocalTx xid, ConnectionDelegate connection) throws JMSException
{
if (trace) { log.trace("committing " + xid); }
-
+
ClientTransaction tx = this.getTxInternal(xid);
-
+
// Invalid xid
if (tx == null)
{
throw new IllegalStateException("Cannot find transaction " + xid);
}
-
+
TransactionRequest request =
new TransactionRequest(TransactionRequest.ONE_PHASE_COMMIT_REQUEST, null, tx);
-
+
try
{
connection.sendTransaction(request, false);
-
+
// If we get this far we can remove the transaction
-
+
if (this.removeTxInternal(xid) == null)
{
throw new IllegalStateException("Cannot find xid to remove " + xid);
@@ -224,149 +221,149 @@
{
// If a problem occurs during commit processing the session should be rolled back
rollbackLocal(xid);
-
+
JMSException e = new MessagingTransactionRolledBackException(t.getMessage());
e.initCause(t);
- throw e;
+ throw e;
}
}
-
+
public void rollbackLocal(Object xid) throws JMSException
{
if (trace) { log.trace("rolling back local xid " + xid); }
-
+
ClientTransaction ts = removeTxInternal(xid);
-
+
if (ts == null)
- {
- throw new IllegalStateException("Cannot find transaction with xid:" + xid);
+ {
+ throw new IllegalStateException("Cannot find transaction with xid:" + xid);
}
-
+
// don't need messages for rollback
// We don't clear the acks since we need to redeliver locally
ts.clearMessages();
-
+
// for one phase rollback there is nothing to do on the server
-
+
redeliverMessages(ts);
}
-
+
//Only used for testing
public ClientTransaction getTx(Object xid)
{
return getTxInternal(xid);
}
-
+
//Only used for testing
public int size()
{
return transactions.size();
}
-
-
+
+
public boolean checkForAcksInSession(String sessionId)
- {
+ {
Iterator iter = transactions.entrySet().iterator();
-
+
while (iter.hasNext())
- {
+ {
Map.Entry entry = (Map.Entry)iter.next();
-
+
ClientTransaction tx = (ClientTransaction)entry.getValue();
-
+
if (tx.getState() == ClientTransaction.TX_PREPARED)
- {
+ {
List dels = tx.getDeliveriesForSession(sessionId);
-
+
if (!dels.isEmpty())
{
// There are outstanding prepared acks in this session
-
+
return true;
}
}
}
return false;
}
-
+
// Protected ------------------------------------------------------------------------------------
-
+
// Package Private ------------------------------------------------------------------------------
-
+
Xid startTx(Xid xid) throws XAException
{
if (trace) { log.trace("starting " + xid); }
-
+
ClientTransaction state = getTxInternal(xid);
-
+
if (state != null)
{
throw new MessagingXAException(XAException.XAER_DUPID, "Transaction already exists with xid " + xid);
}
-
+
transactions.put(xid, new ClientTransaction());
-
+
return xid;
}
-
+
void endTx(Xid xid, boolean success) throws XAException
{
if (trace) { log.trace("ending " + xid + ", success=" + success); }
-
+
ClientTransaction state = getTxInternal(xid);
-
+
if (state == null)
- {
+ {
throw new MessagingXAException(XAException.XAER_NOTA, "Cannot find transaction with xid:" + xid);
- }
-
+ }
+
state.setState(ClientTransaction.TX_ENDED);
}
-
+
int prepare(Xid xid, ConnectionDelegate connection) throws XAException
{
if (trace) { log.trace("preparing " + xid); }
-
+
ClientTransaction state = getTxInternal(xid);
-
+
if (state == null)
- {
+ {
throw new MessagingXAException(XAException.XAER_NOTA, "Cannot find transaction with xid:" + xid);
- }
-
+ }
+
TransactionRequest request =
new TransactionRequest(TransactionRequest.TWO_PHASE_PREPARE_REQUEST, xid, state);
-
- sendTransactionXA(request, connection);
-
+
+ sendTransactionXA(request, connection);
+
state.setState(ClientTransaction.TX_PREPARED);
-
+
if (trace) { log.trace("State is now: " + state.getState()); }
-
+
return XAResource.XA_OK;
}
-
+
void commit(Xid xid, boolean onePhase, ConnectionDelegate connection) throws XAException
{
if (trace) { log.trace("commiting xid " + xid + ", onePhase=" + onePhase); }
-
+
ClientTransaction tx = removeTxInternal(xid);
-
+
if (trace) { log.trace("got tx: " + tx + " state " + tx.getState()); }
-
+
if (onePhase)
{
//Invalid xid
if (tx == null)
- {
+ {
throw new MessagingXAException(XAException.XAER_NOTA, "Cannot find transaction with xid:" + xid);
}
-
+
TransactionRequest request =
new TransactionRequest(TransactionRequest.ONE_PHASE_COMMIT_REQUEST, null, tx);
-
- request.state = tx;
-
+
+ request.state = tx;
+
sendTransactionXA(request, connection);
}
else
@@ -374,7 +371,7 @@
if (tx != null)
{
if (tx.getState() != ClientTransaction.TX_PREPARED)
- {
+ {
throw new MessagingXAException(XAException.XAER_PROTO, "commit called for transaction, but it is not prepared");
}
}
@@ -384,173 +381,201 @@
//may happen if we have recovered from failure and the transaction manager
//is calling commit on the transaction as part of the recovery process.
}
-
+
TransactionRequest request =
new TransactionRequest(TransactionRequest.TWO_PHASE_COMMIT_REQUEST, xid, null);
-
- request.xid = xid;
-
- sendTransactionXA(request, connection);
+
+ request.xid = xid;
+
+ sendTransactionXA(request, connection);
}
-
+
if (tx != null)
{
tx.setState(ClientTransaction.TX_COMMITED);
}
}
-
+
void rollback(Xid xid, ConnectionDelegate connection) throws XAException
{
if (trace) { log.trace("rolling back xid " + xid); }
-
+
ClientTransaction tx = removeTxInternal(xid);
-
+
if (tx == null)
{
throw new java.lang.IllegalStateException("Cannot find xid to remove " + xid);
}
-
+
//It's possible we don't actually have the prepared tx here locally - this
//may happen if we have recovered from failure and the transaction manager
//is calling rollback on the transaction as part of the recovery process.
-
+
TransactionRequest request = null;
-
+
//don't need the messages
if (tx != null)
{
tx.clearMessages();
}
-
+
if ((tx == null) || tx.getState() == ClientTransaction.TX_PREPARED)
{
//2PC rollback
-
+
request = new TransactionRequest(TransactionRequest.TWO_PHASE_ROLLBACK_REQUEST, xid, tx);
-
+
if (trace) { log.trace("Sending rollback to server, tx:" + tx); }
-
+
sendTransactionXA(request, connection);
- }
+ }
else
{
- //For one phase rollback there is nothing to do on the server
-
+ //For one phase rollback there is nothing to do on the server
+
if (tx == null)
- {
+ {
throw new MessagingXAException(XAException.XAER_NOTA, "Cannot find transaction with xid:" + xid);
}
}
-
+
//we redeliver the messages
//locally to their original consumers if they are still open or cancel them to the server
//if the original consumers have closed
-
+
if (trace) { log.trace("Redelivering messages, tx:" + tx); }
-
+
try
{
if (tx != null)
{
redeliverMessages(tx);
-
- tx.setState(ClientTransaction.TX_ROLLEDBACK);
+
+ tx.setState(ClientTransaction.TX_ROLLEDBACK);
}
-
+
}
catch (JMSException e)
{
log.error("Failed to redeliver", e);
- }
+ }
}
-
-
+
+
Xid joinTx(Xid xid) throws XAException
{
if (trace) { log.trace("joining " + xid); }
-
+
ClientTransaction state = getTxInternal(xid);
-
+
if (state == null)
- {
+ {
throw new MessagingXAException(XAException.XAER_NOTA, "Cannot find transaction with xid:" + xid);
- }
-
+ }
+
return xid;
}
-
-
-
+
+
+
Xid resumeTx(Xid xid) throws XAException
{
if (trace) { log.trace("resuming " + xid); }
-
+
ClientTransaction state = getTxInternal(xid);
-
+
if (state == null)
- {
+ {
throw new MessagingXAException(XAException.XAER_NOTA, "Cannot find transaction with xid:" + xid);
}
-
+
return xid;
}
-
+
Xid suspendTx(Xid xid) throws XAException
{
if (trace) { log.trace("suspending " + xid); }
ClientTransaction state = getTxInternal(xid);
-
+
if (state == null)
- {
+ {
throw new MessagingXAException(XAException.XAER_NOTA, "Cannot find transaction with xid:" + xid);
}
-
+
return xid;
}
Xid convertTx(LocalTx localTx, Xid xid) throws XAException
{
if (trace) { log.trace("converting " + localTx + " to " + xid); }
-
+
//Sanity check
-
+
ClientTransaction newTx = getTxInternal(xid);
if (newTx != null)
- {
+ {
throw new MessagingXAException(XAException.XAER_DUPID, "Transaction already exists:" + xid);
}
//Remove the local tx
-
+
ClientTransaction local = removeTxInternal(localTx);
if (local == null)
- {
+ {
throw new MessagingXAException(XAException.XAER_NOTA, "Cannot find transaction with xid:" + localTx);
}
-
+
// Add the local back in with the new xid
-
+
transactions.put(xid, local);
-
+
return xid;
}
-
-
+
+
+
+
+ Xid convertOnJoinTx(LocalTx localTx, Xid xid) throws XAException
+ {
+ if (trace) { log.trace("converting " + localTx + " to " + xid); }
+
+ //Sanity check
+
+ ClientTransaction newTx = getTxInternal(xid);
+
+ if (newTx == null)
+ {
+ throw new MessagingXAException(XAException.XAER_DUPID, "Transaction doesnt exist:" + xid);
+ }
+
+ //Remove the local tx
+
+ ClientTransaction local = removeTxInternal(localTx);
+
+ if (local == null)
+ {
+ throw new MessagingXAException(XAException.XAER_NOTA, "Cannot find transaction with xid:" + localTx);
+ }
+
+ newTx.mergeIn(local);
+ return xid;
+ }
+
Xid[] recover(int flags, ConnectionDelegate conn) throws XAException
{
if (trace) { log.trace("calling recover with flags: " + flags); }
-
+
if (flags == XAResource.TMSTARTRSCAN)
{
try
{
Xid[] txs = conn.getPreparedTransactions();
-
+
if (trace) { log.trace("Got " + txs.length + " transactions from server"); }
-
+
//populate with TxState --MK
for (int i = 0; i < txs.length;i++)
{
@@ -558,9 +583,9 @@
if (!transactions.containsKey(txs[i]))
{
ClientTransaction tx = new ClientTransaction();
-
+
tx.setState(ClientTransaction.TX_PREPARED);
-
+
transactions.put(txs[i], tx);
}
}
@@ -577,58 +602,58 @@
return new Xid[0];
}
}
-
+
// Private --------------------------------------------------------------------------------------
-
+
private ClientTransaction getTxInternal(Object xid)
{
if (trace) { log.trace("getting transaction for " + xid); }
-
+
return (ClientTransaction)transactions.get(xid);
}
-
+
private ClientTransaction removeTxInternal(Object xid)
{
return (ClientTransaction)transactions.remove(xid);
}
-
+
/*
* Rollback has occurred so we need to redeliver any unacked messages corresponding to the acks
* is in the transaction.
- *
+ *
*/
private void redeliverMessages(ClientTransaction ts) throws JMSException
{
List sessionStates = ts.getSessionStates();
-
+
//Need to do this in reverse order
-
+
Collections.reverse(sessionStates);
-
+
for (Iterator i = sessionStates.iterator(); i.hasNext();)
{
SessionTxState state = (SessionTxState)i.next();
-
+
List acks = state.getAcks();
-
+
if (!acks.isEmpty())
{
DeliveryInfo info = (DeliveryInfo)acks.get(0);
-
+
MessageProxy mp = info.getMessageProxy();
-
+
SessionDelegate del = mp.getSessionDelegate();
-
+
del.redeliver(acks);
}
}
}
-
+
private synchronized LocalTx getNextTxId()
{
return new LocalTx();
}
-
+
private void sendTransactionXA(TransactionRequest request, ConnectionDelegate connection)
throws XAException
{
@@ -640,26 +665,26 @@
{
MessagingXAException xaEx = new MessagingXAException(XAException.XA_RBROLLBACK, "A security exception happend!", security);
log.error(xaEx, xaEx);
- throw xaEx;
+ throw xaEx;
}
catch (Throwable t)
{
//Catch anything else
-
+
//We assume that any error is recoverable - and the recovery manager should retry again
//either after the network connection has been repaired (if that was the problem), or
//the server has been fixed.
-
+
//(In some cases it will not be possible to fix so the user will have to manually resolve the tx)
-
+
//Therefore we throw XA_RETRY
//Note we DO NOT throw XAER_RMFAIL or XAER_RMERR since both if these will cause the Arjuna
//tx mgr NOT to retry and the transaction will have to be resolve manually.
-
+
throw new MessagingXAException(XAException.XA_RETRY, "A Throwable was caught in sending the transaction", t);
}
}
-
+
// Inner Classes --------------------------------------------------------------------------------
-
+
}
Modified: branches/Branch_JBossMessaging_1_4_0_SP3_CP/tests/src/org/jboss/test/messaging/jms/XATest.java
===================================================================
--- branches/Branch_JBossMessaging_1_4_0_SP3_CP/tests/src/org/jboss/test/messaging/jms/XATest.java 2008-01-21 15:51:41 UTC (rev 3600)
+++ branches/Branch_JBossMessaging_1_4_0_SP3_CP/tests/src/org/jboss/test/messaging/jms/XATest.java 2008-01-21 15:52:25 UTC (rev 3601)
@@ -52,6 +52,7 @@
*
* @author <a href="mailto:tim.fox at jboss.com">Tim Fox</a>
* @author <a href="mailto:clebert.suconic at jboss.com">Clebert Suconic</a>
+ * @author <a href="mailto:ataylor at redhat.com">Andy Taylor</a>
* @version <tt>$Revision: 1.1 $</tt>
*
* $Id$
@@ -64,7 +65,7 @@
// Static --------------------------------------------------------
// Attributes ----------------------------------------------------
-
+
protected TransactionManager tm;
protected Transaction suspendedTx;
@@ -82,15 +83,15 @@
// TestCase overrides -------------------------------------------
public void setUp() throws Exception
- {
+ {
super.setUp();
-
- ResourceManagerFactory.instance.clear();
+ ResourceManagerFactory.instance.clear();
+
//Also need a local tx mgr if test is running remote
if (ServerManagement.isRemote())
{
- sc = new ServiceContainer("transaction");
+ sc = new ServiceContainer("transaction");
//Don't drop the tables again!
sc.start(false);
@@ -101,7 +102,7 @@
tm = (TransactionManager)localIc.lookup(ServiceContainer.TRANSACTION_MANAGER_JNDI_NAME);
assertTrue(tm instanceof TransactionManagerImple);
-
+
if (!ServerManagement.isRemote())
{
suspendedTx = tm.suspend();
@@ -109,7 +110,7 @@
}
public void tearDown() throws Exception
- {
+ {
if (TxUtils.isUncommitted(tm))
{
//roll it back
@@ -138,13 +139,30 @@
{
sc.stop();
}
-
+
super.tearDown();
}
// Public --------------------------------------------------------
+ /*
+ resource is not enlisted in tx
- public void testMemoryLeakForLocalTXs() throws Exception
+ do some work with tx W1
+
+ enlist resource in tx
+
+ do some more work W2
+
+ delist resource from tx
+
+ prepare tx
+
+ commit tx
+
+ validate that both sets of work W1 and W2 get applied.
+ */
+ //http://jira.jboss.com/jira/browse/JBMESSAGING-1221
+ public void testMemoryLeakForLocalTXsWithWork() throws Exception
{
XAConnection conn = null;
@@ -162,26 +180,156 @@
ResourceManager rm = state.getResourceManager();
- XASession xaSession = conn.createXASession();
+ //Create a session
+ JBossSession sess1 = (JBossSession) conn.createXASession();
+ DummyListener listener = new DummyListener();
+ sess1.setMessageListener(listener);
+ conn.start();
+ MessagingXAResource res1 = (MessagingXAResource) sess1.getXAResource();
+ tm.begin();
+ Transaction trans = tm.getTransaction();
+ trans.enlistResource(res1);
+ trans.delistResource(res1, XAResource.TMSUCCESS);
+ MessageProducer p = sess1.createProducer(queue1);
+ MessageConsumer cons = sess1.createConsumer(queue1);
+ conn.start();
+ //send 10 messages
+ for(int i = 0; i < 10; i++)
+ {
+ TextMessage message = sess1.createTextMessage("delistedwork" + i);
+ p.send(message);
+ }
+ //now receive 5
+ for(int i = 0; i < 5; i++)
+ {
+ TextMessage textMessage = (TextMessage) cons.receive();
+ assertEquals("delistedwork" + i, textMessage.getText());
+ }
+ //once we enlist, ensure that the 5 acks are merged ok; the first time we do this there is nothing to merge in the global tx
+ //so all acks are just copied
+ trans.enlistResource(res1);
+ SessionState sstate = (SessionState)((DelegateSupport)sess1.getDelegate()).getState();
+ ClientTransaction clientTransaction = rm.getTx(sstate.getCurrentTxId());
+ assertEquals("to many session states", clientTransaction.getSessionStates().size(), 1 );
+ ClientTransaction.SessionTxState sessionTxState = (ClientTransaction.SessionTxState) clientTransaction.getSessionStates().get(0);
+ assertEquals("wrong number of acks",5, sessionTxState.getAcks().size());
+
+ trans.delistResource(res1, XAResource.TMSUCCESS);
+ for(int i = 5; i < 10; i++)
+ {
+ TextMessage textMessage = (TextMessage) cons.receive();
+ assertEquals("delistedwork" + i, textMessage.getText());
+ }
+ //now re-enlist and make sure that there are now 10 acks; this time around a merge will be done with the first 5 acks
+ //
+ trans.enlistResource(res1);
+
+ clientTransaction = rm.getTx(sstate.getCurrentTxId());
+ assertEquals("to many session states", clientTransaction.getSessionStates().size(), 1 );
+ sessionTxState = (ClientTransaction.SessionTxState) clientTransaction.getSessionStates().get(0);
+ assertEquals("wrong number of acks",10, sessionTxState.getAcks().size());
+
+ tm.commit();
+
+ }
+ finally
+ {
+ if (conn != null)
+ {
+ conn.close();
+ }
+
+ }
+
+ }
+
+ //http://jira.jboss.com/jira/browse/JBMESSAGING-1221
+ public void testMemoryLeakForLocalTXsOnJoinOnePhase() throws Exception
+ {
+ XAConnection conn = null;
+
+ Transaction tx1 = null;
+ try
+ {
+
+ conn = cf.createXAConnection();
+
+ JBossConnection jbConn = (JBossConnection)conn;
+
+ ClientConnectionDelegate del = (ClientConnectionDelegate)jbConn.getDelegate();
+
+ ConnectionState state = (ConnectionState)del.getState();
+
+ ResourceManager rm = state.getResourceManager();
+
//Create a session
XASession sess1 = conn.createXASession();
- XAResource res1 = sess1.getXAResource();
+ MessagingXAResource res1 = (MessagingXAResource) sess1.getXAResource();
+ byte[] branchQualifier = new byte[] { 1, 2, 3, 4, 5, 6, 0, 0, 0, 0 };
+ byte[] globalTxId = new byte[] { 6, 5, 4, 3, 2, 1, 0, 0, 0, 0 };
+ int rmSizeBeforeStart = rm.size();
+ Xid xid = new MessagingXid(branchQualifier, 12435, globalTxId);
+ res1.start(xid, XAResource.TMNOFLAGS);
+ res1.end(xid, XAResource.TMSUCCESS);
+ int rmAfter = rm.size();
+ assertTrue(rmSizeBeforeStart + 1 == rmAfter);
+ res1.start(xid, XAResource.TMJOIN);
+ res1.end(xid, XAResource.TMSUCCESS);
+ rmAfter = rm.size();
+ assertTrue(rmSizeBeforeStart + 1 == rmAfter);
+ res1.commit(xid, true);
+ rmAfter = rm.size();
+ assertTrue(rmSizeBeforeStart == rmAfter);
+ }
+ finally
+ {
+ if (conn != null)
+ {
+ conn.close();
+ }
+ }
- tm.begin();
+ }
+ //http://jira.jboss.com/jira/browse/JBMESSAGING-1221
+ public void testMemoryLeakForLocalTXsOnJoinTwoPhase() throws Exception
+ {
+ XAConnection conn = null;
- tx1 = tm.getTransaction();
+ Transaction tx1 = null;
+ try
+ {
- tx1.enlistResource(res1);
- int sizeBefore = rm.size();
- tx1.delistResource(res1, XAResource.TMSUCCESS);
+ conn = cf.createXAConnection();
- tx1.enlistResource(res1);
- int sizeAfter = rm.size();
- assertTrue(sizeBefore == sizeAfter);
- tx1.commit();
+ JBossConnection jbConn = (JBossConnection)conn;
+ ClientConnectionDelegate del = (ClientConnectionDelegate)jbConn.getDelegate();
+
+ ConnectionState state = (ConnectionState)del.getState();
+
+ ResourceManager rm = state.getResourceManager();
+
+ //Create a session
+ XASession sess1 = conn.createXASession();
+ MessagingXAResource res1 = (MessagingXAResource) sess1.getXAResource();
+ byte[] branchQualifier = new byte[] { 1, 2, 3, 4, 5, 6, 0, 0, 0, 0 };
+ byte[] globalTxId = new byte[] { 6, 5, 4, 3, 2, 1, 0, 0, 0, 0 };
+ int rmSizeBeforeStart = rm.size();
+ Xid xid = new MessagingXid(branchQualifier, 12435, globalTxId);
+ res1.start(xid, XAResource.TMNOFLAGS);
+ res1.end(xid, XAResource.TMSUCCESS);
+ int rmAfter = rm.size();
+ assertTrue(rmSizeBeforeStart + 1 == rmAfter);
+ res1.start(xid, XAResource.TMJOIN);
+ res1.end(xid, XAResource.TMSUCCESS);
+ rmAfter = rm.size();
+ assertTrue(rmSizeBeforeStart + 1 == rmAfter);
+ res1.prepare(xid);
+ res1.commit(xid, false);
+ rmAfter = rm.size();
+ assertTrue(rmSizeBeforeStart == rmAfter);
}
finally
{
@@ -193,8 +341,103 @@
}
}
+ //http://jira.jboss.com/jira/browse/JBMESSAGING-1221
+ public void testMemoryLeakForLocalTXsOnResumeOnePhase() throws Exception
+ {
+ XAConnection conn = null;
-
+ Transaction tx1 = null;
+ try
+ {
+
+ conn = cf.createXAConnection();
+
+ JBossConnection jbConn = (JBossConnection)conn;
+
+ ClientConnectionDelegate del = (ClientConnectionDelegate)jbConn.getDelegate();
+
+ ConnectionState state = (ConnectionState)del.getState();
+
+ ResourceManager rm = state.getResourceManager();
+
+ //Create a session
+ XASession sess1 = conn.createXASession();
+ MessagingXAResource res1 = (MessagingXAResource) sess1.getXAResource();
+ byte[] branchQualifier = new byte[] { 1, 2, 3, 4, 5, 6, 0, 0, 0, 0 };
+ byte[] globalTxId = new byte[] { 6, 5, 4, 3, 2, 1, 0, 0, 0, 0 };
+ int rmSizeBeforeStart = rm.size();
+ Xid xid = new MessagingXid(branchQualifier, 12435, globalTxId);
+ res1.start(xid, XAResource.TMNOFLAGS);
+ res1.end(xid, XAResource.TMSUCCESS);
+ int rmAfter = rm.size();
+ assertTrue(rmSizeBeforeStart + 1 == rmAfter);
+ res1.start(xid, XAResource.TMRESUME);
+ res1.end(xid, XAResource.TMSUCCESS);
+ rmAfter = rm.size();
+ assertTrue(rmSizeBeforeStart + 1 == rmAfter);
+ res1.commit(xid, true);
+ rmAfter = rm.size();
+ assertTrue(rmSizeBeforeStart == rmAfter);
+ }
+ finally
+ {
+ if (conn != null)
+ {
+ conn.close();
+ }
+
+ }
+
+ }
+ //http://jira.jboss.com/jira/browse/JBMESSAGING-1221
+ public void testMemoryLeakForLocalTXsOnJResumeTwoPhase() throws Exception
+ {
+ XAConnection conn = null;
+
+ Transaction tx1 = null;
+ try
+ {
+
+ conn = cf.createXAConnection();
+
+ JBossConnection jbConn = (JBossConnection)conn;
+
+ ClientConnectionDelegate del = (ClientConnectionDelegate)jbConn.getDelegate();
+
+ ConnectionState state = (ConnectionState)del.getState();
+
+ ResourceManager rm = state.getResourceManager();
+
+ //Create a session
+ XASession sess1 = conn.createXASession();
+ MessagingXAResource res1 = (MessagingXAResource) sess1.getXAResource();
+ byte[] branchQualifier = new byte[] { 1, 2, 3, 4, 5, 6, 0, 0, 0, 0 };
+ byte[] globalTxId = new byte[] { 6, 5, 4, 3, 2, 1, 0, 0, 0, 0 };
+ int rmSizeBeforeStart = rm.size();
+ Xid xid = new MessagingXid(branchQualifier, 12435, globalTxId);
+ res1.start(xid, XAResource.TMNOFLAGS);
+ res1.end(xid, XAResource.TMSUCCESS);
+ int rmAfter = rm.size();
+ assertTrue(rmSizeBeforeStart + 1 == rmAfter);
+ res1.start(xid, XAResource.TMRESUME);
+ res1.end(xid, XAResource.TMSUCCESS);
+ rmAfter = rm.size();
+ assertTrue(rmSizeBeforeStart + 1 == rmAfter);
+ res1.prepare(xid);
+ res1.commit(xid, false);
+ rmAfter = rm.size();
+ assertTrue(rmSizeBeforeStart == rmAfter);
+ }
+ finally
+ {
+ if (conn != null)
+ {
+ conn.close();
+ }
+
+ }
+
+ }
/* If there is no global tx present the send must behave as non transacted.
* See http://www.jboss.com/index.html?module=bb&op=viewtopic&t=98577&postdays=0&postorder=asc&start=0
* http://jira.jboss.com/jira/browse/JBMESSAGING-410
@@ -218,7 +461,7 @@
XAConnection xconn = xcf.createXAConnection();
XASession xs = xconn.createXASession();
-
+
MessageProducer p = xs.createProducer(queue1);
Message m = xs.createTextMessage("one");
@@ -250,9 +493,9 @@
/*
* If messages are consumed using an XASession that is not enlisted in a transaction then the behaviour of the session
* falls back to being AUTO_ACK - i.e. the messages will get acked immediately.
- *
+ *
* There is one exception to this:
- *
+ *
* For transactional delivery of messages in an MDB using the old container invoker (non JCA 1.5 inflow) the message
* is received from the JMS provider *before* the MDB container has a chance to enlist the session in a transaction.
* (see page 199 (chapter 5 JMS and Transactions, section "Application Server Integration" of Mark Little's book Java Transaction
@@ -261,7 +504,7 @@
* Consequently, if we detect the session has a distinguised session listener (which it will if using ASF) then the behaviour
* is to fall back to being a local transacted session. Later on, when the session is enlisted the work done in the local tx
* is converted to the global tx brach.
- *
+ *
* We are testing the exceptional case here without a global tx here
*
* See http://www.jboss.com/index.html?module=bb&op=viewtopic&t=98577&postdays=0&postorder=asc&start=0
@@ -353,14 +596,14 @@
}
}
}
-
-
+
+
/*
* If messages are consumed using an XASession that is not enlisted in a transaction then the behaviour of the session
* falls back to being AUTO_ACK - i.e. the messages will get acked immediately.
- *
+ *
* There is one exception to this:
- *
+ *
* For transactional delivery of messages in an MDB using the old container invoker (non JCA 1.5 inflow) the message
* is received from the JMS provider *before* the MDB container has a chance to enlist the session in a transaction.
* (see page 199 (chapter 5 JMS and Transactions, section "Application Server Integration" of Mark Little's book Java Transaction
@@ -369,7 +612,7 @@
* Consequently, if we detect the session has a distinguised session listener (which it will if using ASF) then the behaviour
* is to fall back to being a local transacted session. Later on, when the session is enlisted the work done in the local tx
* is converted to the global tx brach.
- *
+ *
* We are testing the standard case without a global tx here
*
* See http://www.jboss.com/index.html?module=bb&op=viewtopic&t=98577&postdays=0&postorder=asc&start=0
@@ -414,11 +657,11 @@
TextMessage rm = (TextMessage)c.receive(1000);
assertEquals("one", rm.getText());
-
+
// messages should be acked
count = (Integer)ServerManagement.getAttribute(queueMBean, "MessageCount");
assertEquals(0, count.intValue());
-
+
xconn.close();
}
finally
@@ -430,14 +673,14 @@
}
}
}
-
+
/*
* If messages are consumed using an XASession that is not enlisted in a transaction then the behaviour of the session
* falls back to being AUTO_ACK - i.e. the messages will get acked immediately.
- *
+ *
* There is one exception to this:
- *
+ *
* For transactional delivery of messages in an MDB using the old container invoker (non JCA 1.5 inflow) the message
* is received from the JMS provider *before* the MDB container has a chance to enlist the session in a transaction.
* (see page 199 (chapter 5 JMS and Transactions, section "Application Server Integration" of Mark Little's book Java Transaction
@@ -446,7 +689,7 @@
* Consequently, if we detect the session has a distinguised session listener (which it will if using ASF) then the behaviour
* is to fall back to being a local transacted session. Later on, when the session is enlisted the work done in the local tx
* is converted to the global tx brach.
- *
+ *
* We are testing the case with a global tx here
*
* See http://www.jboss.com/index.html?module=bb&op=viewtopic&t=98577&postdays=0&postorder=asc&start=0
@@ -543,7 +786,7 @@
* - Receive one message over a consumer created used a XASession
* - Call Recover
* - Receive the second message
- * - The queue should be empty after that
+ * - The queue should be empty after that
* Verifies if messages are sent ok and ack properly when recovery is called
* NOTE: To accomodate TCK tests where Session/Consumers are being used without transaction enlisting
* we are processing those cases as nonTransactional/AutoACK, however if the session is being used
@@ -1140,9 +1383,9 @@
ServerManagement.startServerPeer();
deployAndLookupAdministeredObjects();
-
+
conn1.close();
-
+
conn1 = cf.createXAConnection();
XAResource res = conn1.createXASession().getXAResource();
@@ -1166,7 +1409,7 @@
finally
{
removeAllMessages(queue1.getQueueName(), true, 0);
-
+
if (conn1 != null)
{
try
More information about the jboss-cvs-commits
mailing list