[jboss-cvs] JBoss Messaging SVN: r2143 - in branches/Branch_1_0_1_SP: src/main/org/jboss/jms/client/container and 5 other directories.
jboss-cvs-commits at lists.jboss.org
jboss-cvs-commits at lists.jboss.org
Fri Feb 2 10:21:32 EST 2007
Author: timfox
Date: 2007-02-02 10:21:32 -0500 (Fri, 02 Feb 2007)
New Revision: 2143
Modified:
branches/Branch_1_0_1_SP/src/main/org/jboss/jms/client/JBossConnectionFactory.java
branches/Branch_1_0_1_SP/src/main/org/jboss/jms/client/container/SessionAspect.java
branches/Branch_1_0_1_SP/src/main/org/jboss/jms/client/state/SessionState.java
branches/Branch_1_0_1_SP/src/main/org/jboss/jms/server/connectionfactory/ConnectionFactoryJNDIMapper.java
branches/Branch_1_0_1_SP/src/main/org/jboss/jms/server/endpoint/ServerConnectionEndpoint.java
branches/Branch_1_0_1_SP/src/main/org/jboss/jms/server/endpoint/ServerConnectionFactoryEndpoint.java
branches/Branch_1_0_1_SP/src/main/org/jboss/jms/server/endpoint/ServerSessionEndpoint.java
branches/Branch_1_0_1_SP/src/main/org/jboss/jms/tx/MessagingXAResource.java
branches/Branch_1_0_1_SP/tests/src/org/jboss/test/messaging/jms/JCAWrapperTest.java
branches/Branch_1_0_1_SP/tests/src/org/jboss/test/messaging/jms/XATest.java
Log:
Backported http://jira.jboss.com/jira/browse/JBMESSAGING-797 http://jira.jboss.com/jira/browse/JBMESSAGING-638 http://jira.jboss.com/jira/browse/JBMESSAGING-721
Modified: branches/Branch_1_0_1_SP/src/main/org/jboss/jms/client/JBossConnectionFactory.java
===================================================================
--- branches/Branch_1_0_1_SP/src/main/org/jboss/jms/client/JBossConnectionFactory.java 2007-02-02 11:24:06 UTC (rev 2142)
+++ branches/Branch_1_0_1_SP/src/main/org/jboss/jms/client/JBossConnectionFactory.java 2007-02-02 15:21:32 UTC (rev 2143)
@@ -40,6 +40,7 @@
import javax.naming.Reference;
import org.jboss.aop.Advised;
+import org.jboss.aop.AspectManager;
import org.jboss.jms.client.container.JmsClientAspectXMLLoader;
import org.jboss.jms.client.delegate.ClientConnectionFactoryDelegate;
import org.jboss.jms.delegate.ConnectionDelegate;
@@ -226,20 +227,24 @@
{
if (!configLoaded)
{
- // Load the client side aspect stack configuration from the server and apply it
-
- delegate.init();
-
- byte[] clientAOPConfig = delegate.getClientAOPConfig();
-
- // Remove interceptor since we don't want it on the front of the stack
- ((Advised)delegate)._getInstanceAdvisor().removeInterceptor(delegate.getName());
-
- JmsClientAspectXMLLoader loader = new JmsClientAspectXMLLoader();
-
- loader.deployXML(clientAOPConfig);
-
- configLoaded = true;
+ // Need to synchronize due to http://jira.jboss.com/jira/browse/JBMESSAGING-797
+ synchronized (AspectManager.instance())
+ {
+ // Load the client side aspect stack configuration from the server and apply it
+
+ delegate.init();
+
+ byte[] clientAOPConfig = delegate.getClientAOPConfig();
+
+ // Remove interceptor since we don't want it on the front of the stack
+ ((Advised)delegate)._getInstanceAdvisor().removeInterceptor(delegate.getName());
+
+ JmsClientAspectXMLLoader loader = new JmsClientAspectXMLLoader();
+
+ loader.deployXML(clientAOPConfig);
+
+ configLoaded = true;
+ }
}
}
}
Modified: branches/Branch_1_0_1_SP/src/main/org/jboss/jms/client/container/SessionAspect.java
===================================================================
--- branches/Branch_1_0_1_SP/src/main/org/jboss/jms/client/container/SessionAspect.java 2007-02-02 11:24:06 UTC (rev 2142)
+++ branches/Branch_1_0_1_SP/src/main/org/jboss/jms/client/container/SessionAspect.java 2007-02-02 15:21:32 UTC (rev 2143)
@@ -132,11 +132,9 @@
if (ackMode == Session.CLIENT_ACKNOWLEDGE ||
ackMode == Session.AUTO_ACKNOWLEDGE ||
- ackMode == Session.DUPS_OK_ACKNOWLEDGE ||
- state.getCurrentTxId() == null)
+ ackMode == Session.DUPS_OK_ACKNOWLEDGE)
{
- // We collect acknowledgments (and not transact them) for CLIENT, AUTO and DUPS_OK, and
- // also for XA sessions not enrolled in a global transaction.
+ // We collect acknowledgments (and not transact them) for CLIENT, AUTO and DUPS_OK
SessionDelegate del = (SessionDelegate)mi.getTargetObject();
@@ -186,11 +184,10 @@
}
if (ackMode == Session.AUTO_ACKNOWLEDGE ||
- ackMode == Session.DUPS_OK_ACKNOWLEDGE ||
- ackMode != Session.CLIENT_ACKNOWLEDGE && state.getCurrentTxId() == null)
+ ackMode == Session.DUPS_OK_ACKNOWLEDGE)
{
// We acknowledge immediately on a non-transacted session that does not want to
- // CLIENT_ACKNOWLEDGE, or an XA session not enrolled in a global transaction.
+ // CLIENT_ACKNOWLEDGE
SessionDelegate sd = (SessionDelegate)mi.getTargetObject();
Modified: branches/Branch_1_0_1_SP/src/main/org/jboss/jms/client/state/SessionState.java
===================================================================
--- branches/Branch_1_0_1_SP/src/main/org/jboss/jms/client/state/SessionState.java 2007-02-02 11:24:06 UTC (rev 2142)
+++ branches/Branch_1_0_1_SP/src/main/org/jboss/jms/client/state/SessionState.java 2007-02-02 15:21:32 UTC (rev 2143)
@@ -80,13 +80,13 @@
xaResource = new MessagingXAResource(parent.getResourceManager(), this);
}
- // If session is transacted and XA, the currentTxId will be updated when the XAResource will
- // be enrolled with a global transaction.
+ // Note we create the transaction even if XA - XA transactions must behave like
+ // local tx when not enlisted in a global tx
- if (transacted & !xa)
+ if (transacted)
{
// Create a local tx
- currentTxId = parent.getResourceManager().createLocalTx();
+ currentTxId = parent.getResourceManager().createLocalTx();
}
executor = new QueuedExecutor(new LinkedQueue());
Modified: branches/Branch_1_0_1_SP/src/main/org/jboss/jms/server/connectionfactory/ConnectionFactoryJNDIMapper.java
===================================================================
--- branches/Branch_1_0_1_SP/src/main/org/jboss/jms/server/connectionfactory/ConnectionFactoryJNDIMapper.java 2007-02-02 11:24:06 UTC (rev 2142)
+++ branches/Branch_1_0_1_SP/src/main/org/jboss/jms/server/connectionfactory/ConnectionFactoryJNDIMapper.java 2007-02-02 15:21:32 UTC (rev 2143)
@@ -29,6 +29,7 @@
import javax.naming.Context;
import javax.naming.InitialContext;
+import org.jboss.aop.AspectManager;
import org.jboss.jms.client.JBossConnectionFactory;
import org.jboss.jms.client.delegate.ClientConnectionFactoryDelegate;
import org.jboss.jms.server.ConnectionFactoryManager;
@@ -93,8 +94,14 @@
new ClientConnectionFactoryDelegate(id, locatorURI, serverPeer.getVersion(),
serverPeer.getServerPeerID(), clientPing);
- ConnectionFactoryAdvised connFactoryAdvised = new ConnectionFactoryAdvised(endpoint);
+ ConnectionFactoryAdvised connFactoryAdvised;
+ // Need to synchronize due to http://jira.jboss.com/jira/browse/JBMESSAGING-797
+ synchronized (AspectManager.instance())
+ {
+ connFactoryAdvised = new ConnectionFactoryAdvised(endpoint);
+ }
+
JMSDispatcher.instance.registerTarget(new Integer(id), connFactoryAdvised);
endpoints.put(new Integer(id), endpoint);
Modified: branches/Branch_1_0_1_SP/src/main/org/jboss/jms/server/endpoint/ServerConnectionEndpoint.java
===================================================================
--- branches/Branch_1_0_1_SP/src/main/org/jboss/jms/server/endpoint/ServerConnectionEndpoint.java 2007-02-02 11:24:06 UTC (rev 2142)
+++ branches/Branch_1_0_1_SP/src/main/org/jboss/jms/server/endpoint/ServerConnectionEndpoint.java 2007-02-02 15:21:32 UTC (rev 2143)
@@ -33,6 +33,7 @@
import javax.jms.InvalidClientIDException;
import javax.transaction.xa.Xid;
+import org.jboss.aop.AspectManager;
import org.jboss.jms.client.delegate.ClientSessionDelegate;
import org.jboss.jms.client.remoting.CallbackServerFactory;
import org.jboss.jms.delegate.SessionDelegate;
@@ -41,6 +42,7 @@
import org.jboss.jms.server.ConnectionManager;
import org.jboss.jms.server.SecurityManager;
import org.jboss.jms.server.ServerPeer;
+import org.jboss.jms.server.endpoint.advised.ConsumerAdvised;
import org.jboss.jms.server.endpoint.advised.SessionAdvised;
import org.jboss.jms.server.plugin.contract.ChannelMapper;
import org.jboss.jms.server.remoting.JMSDispatcher;
@@ -186,8 +188,16 @@
// connection endpoint instance
ServerSessionEndpoint ep = new ServerSessionEndpoint(sessionID, this);
putSessionDelegate(sessionID, ep);
- SessionAdvised sessionAdvised = new SessionAdvised(ep);
- JMSDispatcher.instance.registerTarget(new Integer(sessionID), sessionAdvised);
+
+ SessionAdvised advised;
+
+ // Need to synchronize due to http://jira.jboss.com/jira/browse/JBMESSAGING-797
+ synchronized (AspectManager.instance())
+ {
+ advised = new SessionAdvised(ep);
+ }
+
+ JMSDispatcher.instance.registerTarget(new Integer(sessionID), advised);
ClientSessionDelegate d = new ClientSessionDelegate(sessionID);
Modified: branches/Branch_1_0_1_SP/src/main/org/jboss/jms/server/endpoint/ServerConnectionFactoryEndpoint.java
===================================================================
--- branches/Branch_1_0_1_SP/src/main/org/jboss/jms/server/endpoint/ServerConnectionFactoryEndpoint.java 2007-02-02 11:24:06 UTC (rev 2142)
+++ branches/Branch_1_0_1_SP/src/main/org/jboss/jms/server/endpoint/ServerConnectionFactoryEndpoint.java 2007-02-02 15:21:32 UTC (rev 2143)
@@ -23,10 +23,12 @@
import javax.jms.JMSException;
+import org.jboss.aop.AspectManager;
import org.jboss.jms.client.delegate.ClientConnectionDelegate;
import org.jboss.jms.delegate.ConnectionDelegate;
import org.jboss.jms.server.ServerPeer;
import org.jboss.jms.server.connectionfactory.JNDIBindings;
+import org.jboss.jms.server.endpoint.advised.BrowserAdvised;
import org.jboss.jms.server.endpoint.advised.ConnectionAdvised;
import org.jboss.jms.server.remoting.JMSDispatcher;
import org.jboss.jms.util.ExceptionUtil;
@@ -124,9 +126,16 @@
defaultTempQueueFullSize, defaultTempQueuePageSize, defaultTempQueueDownCacheSize);
int connectionID = endpoint.getConnectionID();
+
+ ConnectionAdvised advised;
+
+ // Need to synchronize due to http://jira.jboss.com/jira/browse/JBMESSAGING-797
+ synchronized (AspectManager.instance())
+ {
+ advised = new ConnectionAdvised(endpoint);
+ }
- ConnectionAdvised connAdvised = new ConnectionAdvised(endpoint);
- JMSDispatcher.instance.registerTarget(new Integer(connectionID), connAdvised);
+ JMSDispatcher.instance.registerTarget(new Integer(connectionID), advised);
log.debug("created and registered " + endpoint);
Modified: branches/Branch_1_0_1_SP/src/main/org/jboss/jms/server/endpoint/ServerSessionEndpoint.java
===================================================================
--- branches/Branch_1_0_1_SP/src/main/org/jboss/jms/server/endpoint/ServerSessionEndpoint.java 2007-02-02 11:24:06 UTC (rev 2142)
+++ branches/Branch_1_0_1_SP/src/main/org/jboss/jms/server/endpoint/ServerSessionEndpoint.java 2007-02-02 15:21:32 UTC (rev 2143)
@@ -33,6 +33,7 @@
import javax.jms.InvalidDestinationException;
import javax.jms.JMSException;
+import org.jboss.aop.AspectManager;
import org.jboss.jms.client.delegate.ClientBrowserDelegate;
import org.jboss.jms.client.delegate.ClientConsumerDelegate;
import org.jboss.jms.delegate.BrowserDelegate;
@@ -43,6 +44,7 @@
import org.jboss.jms.message.JBossMessage;
import org.jboss.jms.server.ServerPeer;
import org.jboss.jms.server.endpoint.advised.BrowserAdvised;
+import org.jboss.jms.server.endpoint.advised.ConnectionFactoryAdvised;
import org.jboss.jms.server.endpoint.advised.ConsumerAdvised;
import org.jboss.jms.server.plugin.contract.ChannelMapper;
import org.jboss.jms.server.remoting.JMSDispatcher;
@@ -265,9 +267,16 @@
subscription == null ? (Channel)coreDestination : subscription,
this, selector, noLocal, jmsDestination, prefetchSize, dlq);
- JMSDispatcher.instance.registerTarget(new Integer(consumerID), new ConsumerAdvised(ep));
-
+ ConsumerAdvised advised;
+ // Need to synchronize due to http://jira.jboss.com/jira/browse/JBMESSAGING-797
+ synchronized (AspectManager.instance())
+ {
+ advised = new ConsumerAdvised(ep);
+ }
+
+ JMSDispatcher.instance.registerTarget(new Integer(consumerID), advised);
+
ClientConsumerDelegate stub = new ClientConsumerDelegate(consumerID, prefetchSize, maxDeliveryAttempts);
if (subscription != null)
@@ -321,8 +330,16 @@
new ServerBrowserEndpoint(this, browserID, (Channel)destination, messageSelector);
putBrowserDelegate(browserID, ep);
+
+ BrowserAdvised advised;
+
+ // Need to synchronize due to http://jira.jboss.com/jira/browse/JBMESSAGING-797
+ synchronized (AspectManager.instance())
+ {
+ advised = new BrowserAdvised(ep);
+ }
- JMSDispatcher.instance.registerTarget(new Integer(browserID), new BrowserAdvised(ep));
+ JMSDispatcher.instance.registerTarget(new Integer(browserID), advised);
ClientBrowserDelegate stub = new ClientBrowserDelegate(browserID);
Modified: branches/Branch_1_0_1_SP/src/main/org/jboss/jms/tx/MessagingXAResource.java
===================================================================
--- branches/Branch_1_0_1_SP/src/main/org/jboss/jms/tx/MessagingXAResource.java 2007-02-02 11:24:06 UTC (rev 2142)
+++ branches/Branch_1_0_1_SP/src/main/org/jboss/jms/tx/MessagingXAResource.java 2007-02-02 15:21:32 UTC (rev 2143)
@@ -27,6 +27,7 @@
import org.jboss.jms.client.state.SessionState;
import org.jboss.jms.delegate.ConnectionDelegate;
+import org.jboss.jms.util.MessagingXAException;
import org.jboss.logging.Logger;
/**
@@ -114,34 +115,30 @@
if (trace) { log.trace(this + " committing " + xid + (onePhase ? " (one phase)" : " (two phase)")); }
rm.commit(xid, onePhase, connection);
-
- // leave the session in a 'clean' state, the currentTxId will be set when the XAResource will
- // be enrolled with a new transaction.
-
- setCurrentTransactionId(null);
}
public void end(Xid xid, int flags) throws XAException
{
if (trace) { log.trace(this + " ending " + xid + ", flags: " + flags); }
+ unsetCurrentTransactionId(xid);
+
synchronized (this)
- {
+ {
switch (flags)
{
- case TMSUSPEND :
- unsetCurrentTransactionId(xid);
+ case TMSUSPEND :
rm.suspendTx(xid);
break;
case TMFAIL :
- unsetCurrentTransactionId(xid);
rm.endTx(xid, false);
break;
case TMSUCCESS :
- unsetCurrentTransactionId(xid);
rm.endTx(xid, true);
break;
- }
+ default :
+ throw new MessagingXAException(XAException.XAER_PROTO, "Invalid flags: " + flags);
+ }
}
}
@@ -179,17 +176,23 @@
boolean convertTx = false;
- if (sessionState.getCurrentTxId() != null)
+ Object currentXid = sessionState.getCurrentTxId();
+
+ // Sanity check
+ if (currentXid == null)
{
- if (flags == TMNOFLAGS && sessionState.getCurrentTxId() instanceof LocalTx)
- {
- convertTx = true;
- }
+ throw new MessagingXAException(XAException.XAER_RMFAIL, "Current xid is not set");
}
+
+ if (flags == TMNOFLAGS && sessionState.getCurrentTxId() instanceof LocalTx)
+ {
+ convertTx = true;
+
+ if (trace) { log.trace("Converting local tx into global tx branch"); }
+ }
synchronized (this)
{
-
switch (flags)
{
case TMNOFLAGS :
@@ -200,6 +203,8 @@
// session in a new tx. If the session has any listeners then in that period,
// messages can be received asychronously but we want them to be received in the
// context of a tx, so we convert.
+ // Also for an transacted delivery in a MDB we need to do this as discussed
+ // in fallbackToLocalTx()
setCurrentTransactionId(rm.convertTx((LocalTx)sessionState.getCurrentTxId(), xid));
}
else
@@ -213,6 +218,8 @@
case TMRESUME :
setCurrentTransactionId(rm.resumeTx(xid));
break;
+ default:
+ throw new MessagingXAException(XAException.XAER_PROTO, "Invalid flags: " + flags);
}
}
}
@@ -241,14 +248,14 @@
// Private -------------------------------------------------------
- private void setCurrentTransactionId(final Xid xid)
+ private void setCurrentTransactionId(Object xid)
{
if (trace) { log.trace(this + " setting current xid to " + xid + ", previous " + sessionState.getCurrentTxId()); }
sessionState.setCurrentTxId(xid);
}
- private void unsetCurrentTransactionId(final Xid xid)
+ private void unsetCurrentTransactionId(Object xid)
{
if (xid == null)
{
@@ -261,6 +268,15 @@
// recycled
if (xid.equals(sessionState.getCurrentTxId()))
{
+ // When a transaction association ends we fall back to acting as if in a local tx
+ // This is because for MDBs, the message is received before the global tx
+ // has started. Therefore we receive it in a local tx, then convert the work
+ // done into the global tx branch when the resource is enlisted.
+ // See Mark Little's book "Java Transaction Processing" Chapter 5 for
+ // a full explanation
+ // So in other words - when the session is not enlisted in a global tx
+ // it will always have a local xid set
+
sessionState.setCurrentTxId(rm.createLocalTx());
}
}
Modified: branches/Branch_1_0_1_SP/tests/src/org/jboss/test/messaging/jms/JCAWrapperTest.java
===================================================================
--- branches/Branch_1_0_1_SP/tests/src/org/jboss/test/messaging/jms/JCAWrapperTest.java 2007-02-02 11:24:06 UTC (rev 2142)
+++ branches/Branch_1_0_1_SP/tests/src/org/jboss/test/messaging/jms/JCAWrapperTest.java 2007-02-02 15:21:32 UTC (rev 2143)
@@ -158,209 +158,8 @@
}
}
- /**
- * Test case for http://jira.jboss.org/jira/browse/JBMESSAGING-410.
- */
- public void testSendNoGlobalTransaction() throws Exception
- {
- Transaction suspended = null;
+
- try
- {
- ServerManagement.deployQueue("MyQueue");
-
- // make sure there's no active JTA transaction
-
- suspended = TransactionManagerLocator.getInstance().locate().suspend();
-
- // send a message to the queue, using a JCA wrapper
-
- Queue queue = (Queue)ic.lookup("queue/MyQueue");
-
- ConnectionFactory mcf = (ConnectionFactory)ic.lookup("java:/JCAConnectionFactory");
-
- Connection conn = mcf.createConnection();
-
- Session s = conn.createSession(false, Session.AUTO_ACKNOWLEDGE);
- MessageProducer p = s.createProducer(queue);
- p.setDeliveryMode(DeliveryMode.PERSISTENT);
- Message m = s.createTextMessage("one");
-
- p.send(m);
-
- log.debug("message sent");
-
- conn.close();
-
- // receive the message
- ConnectionFactory cf = (ConnectionFactory)ic.lookup("/ConnectionFactory");
- conn = cf.createConnection();
- conn.start();
- s = conn.createSession(false, Session.AUTO_ACKNOWLEDGE);
- MessageConsumer c = s.createConsumer(queue);
- TextMessage rm = (TextMessage)c.receive(1000);
-
- assertEquals("one", rm.getText());
-
- conn.close();
- }
- finally
- {
- ServerManagement.undeployQueue("MyQueue");
-
- if (suspended != null)
- {
- TransactionManagerLocator.getInstance().locate().resume(suspended);
- }
- }
- }
-
- /**
- * Test case for http://jira.jboss.org/jira/browse/JBMESSAGING-410. Use a cached connection that
- * was initally enroled in a global transaction.
- */
- public void testSendNoGlobalTransaction2() throws Exception
- {
-
- Transaction suspended = TransactionManagerLocator.getInstance().locate().suspend();
-
- try
- {
-
- ConnectionFactory mcf = (ConnectionFactory)ic.lookup("java:/JCAConnectionFactory");
- Connection conn = mcf.createConnection();
- conn.start();
-
- UserTransaction ut = ServerManagement.getUserTransaction();
-
- ut.begin();
-
- Session s = conn.createSession(false, Session.AUTO_ACKNOWLEDGE);
- MessageProducer p = s.createProducer(queue);
- Message m = s.createTextMessage("one");
-
- p.send(m);
-
- ut.commit();
-
- conn.close();
-
- ConnectionFactory cf = (ConnectionFactory)ic.lookup("ConnectionFactory");
- conn = cf.createConnection();
- s = conn.createSession(false, Session.AUTO_ACKNOWLEDGE);
- conn.start();
-
- TextMessage rm = (TextMessage)s.createConsumer(queue).receive(500);
-
- assertEquals("one", rm.getText());
-
- conn.close();
-
- // make sure there's no active JTA transaction
-
- assertNull(TransactionManagerLocator.getInstance().locate().getTransaction());
-
- // send a message to the queue, using a JCA wrapper
-
- mcf = (ConnectionFactory)ic.lookup("java:/JCAConnectionFactory");
-
- conn = mcf.createConnection();
-
- s = conn.createSession(false, Session.AUTO_ACKNOWLEDGE);
- p = s.createProducer(queue);
- p.setDeliveryMode(DeliveryMode.PERSISTENT);
- m = s.createTextMessage("one");
-
- p.send(m);
-
- conn.close();
-
- // receive the message
- cf = (ConnectionFactory)ic.lookup("/ConnectionFactory");
- conn = cf.createConnection();
- conn.start();
- s = conn.createSession(false, Session.AUTO_ACKNOWLEDGE);
- MessageConsumer c = s.createConsumer(queue);
- rm = (TextMessage)c.receive(1000);
-
- assertEquals("one", rm.getText());
-
- conn.close();
- }
- finally
- {
- if (suspended != null)
- {
- TransactionManagerLocator.getInstance().locate().resume(suspended);
- }
- }
- }
-
- /**
- * Test case for http://jira.jboss.org/jira/browse/JBMESSAGING-520.
- */
- public void testReceiveNoGlobalTransaction() throws Exception
- {
- try
- {
- ServerManagement.deployQueue("MyQueue2");
-
- // send a message to the queue
-
- ConnectionFactory cf = (ConnectionFactory)ic.lookup("/ConnectionFactory");
- Queue queue = (Queue)ic.lookup("queue/MyQueue2");
- Connection conn = cf.createConnection();
- Session s = conn.createSession(false, Session.AUTO_ACKNOWLEDGE);
- MessageProducer p = s.createProducer(queue);
- p.setDeliveryMode(DeliveryMode.PERSISTENT);
- Message m = s.createTextMessage("one");
- p.send(m);
- conn.close();
-
- // make sure there's no active JTA transaction
-
- Transaction suspended = TransactionManagerLocator.getInstance().locate().suspend();
-
- try
- {
- // using a JCA wrapper
-
- ConnectionFactory mcf = (ConnectionFactory)ic.lookup("java:/JCAConnectionFactory");
- conn = mcf.createConnection();
- conn.start();
-
- // no active JTA transaction here
-
- s = conn.createSession(false, Session.AUTO_ACKNOWLEDGE);
- MessageConsumer c = s.createConsumer(queue);
-
- // this method should send an untransacted acknowledgment that should clear the delivery
- TextMessage rm = (TextMessage)c.receive(1000);
-
- assertEquals("one", rm.getText());
-
- conn.close();
-
- // now the queue should be empty
- ObjectName on = new ObjectName("jboss.messaging.destination:service=Queue,name=MyQueue2");
- Integer count = (Integer)ServerManagement.getAttribute(on, "MessageCount");
- assertEquals(0, count.intValue());
- }
- finally
- {
-
- if (suspended != null)
- {
- TransactionManagerLocator.getInstance().locate().resume(suspended);
- }
- }
- }
- finally
- {
- ServerManagement.undeployQueue("MyQueue2");
- }
- }
-
// Package protected ---------------------------------------------
// Protected -----------------------------------------------------
Modified: branches/Branch_1_0_1_SP/tests/src/org/jboss/test/messaging/jms/XATest.java
===================================================================
--- branches/Branch_1_0_1_SP/tests/src/org/jboss/test/messaging/jms/XATest.java 2007-02-02 11:24:06 UTC (rev 2142)
+++ branches/Branch_1_0_1_SP/tests/src/org/jboss/test/messaging/jms/XATest.java 2007-02-02 15:21:32 UTC (rev 2143)
@@ -38,8 +38,12 @@
import javax.transaction.xa.XAResource;
import javax.transaction.xa.Xid;
+import org.jboss.jms.client.JBossConnection;
import org.jboss.jms.client.JBossConnectionFactory;
+import org.jboss.jms.client.delegate.ClientConnectionDelegate;
+import org.jboss.jms.client.state.ConnectionState;
import org.jboss.jms.tx.MessagingXAResource;
+import org.jboss.jms.tx.ResourceManager;
import org.jboss.test.messaging.MessagingTestCase;
import org.jboss.test.messaging.tools.ServerManagement;
import org.jboss.tm.TransactionManagerLocator;
@@ -126,224 +130,348 @@
// Public ---------------------------------------------------------------------------------------
- //http://jira.jboss.com/jira/browse/JBMESSAGING-721
- public void testConvertFromLocalTx() throws Exception
+ // See http://jira.jboss.com/jira/browse/JBMESSAGING-638
+ public void testResourceManagerMemoryLeakOnCommit() throws Exception
{
- if (ServerManagement.isRemote())
+
+ XAConnection xaConn = null;
+
+ try
{
- return;
+ xaConn = cf.createXAConnection();
+
+ JBossConnection jbConn = (JBossConnection)xaConn;
+
+ ClientConnectionDelegate del = (ClientConnectionDelegate)jbConn.getDelegate();
+
+ ConnectionState state = (ConnectionState)del.getState();
+
+ ResourceManager rm = state.getResourceManager();
+
+ XASession xaSession = xaConn.createXASession();
+
+ xaConn.start();
+
+ XAResource res = xaSession.getXAResource();
+
+ XAResource dummy = new DummyXAResource();
+
+ for (int i = 0; i < 100; i++)
+ {
+
+ tm.begin();
+
+ Transaction tx = tm.getTransaction();
+
+ tx.enlistResource(res);
+
+ tx.enlistResource(dummy);
+
+ assertEquals(1, rm.size());
+
+ tx.delistResource(res, XAResource.TMSUCCESS);
+
+ tx.delistResource(dummy, XAResource.TMSUCCESS);
+
+ tm.commit();
+ }
+
+ assertEquals(1, rm.size());
+
+ xaConn.close();
+
+ xaConn = null;
+
+ assertEquals(0, rm.size());
+
}
+ finally
+ {
+ if (xaConn != null)
+ {
+ xaConn.close();
+ }
+ }
+ }
+
+ //See http://jira.jboss.com/jira/browse/JBMESSAGING-638
+ public void testResourceManagerMemoryLeakOnRollback() throws Exception
+ {
+ XAConnection xaConn = null;
+
+ try
+ {
+ xaConn = cf.createXAConnection();
+
+ JBossConnection jbConn = (JBossConnection)xaConn;
+
+ ClientConnectionDelegate del = (ClientConnectionDelegate)jbConn.getDelegate();
+
+ ConnectionState state = (ConnectionState)del.getState();
+
+ ResourceManager rm = state.getResourceManager();
+
+ XASession xaSession = xaConn.createXASession();
+
+ xaConn.start();
+
+ XAResource res = xaSession.getXAResource();
+
+ XAResource dummy = new DummyXAResource();
+
+ for (int i = 0; i < 100; i++)
+ {
+ tm.begin();
+
+ Transaction tx = tm.getTransaction();
+
+ tx.enlistResource(res);
+
+ tx.enlistResource(dummy);
+
+ assertEquals(1, rm.size());
+
+ tx.delistResource(res, XAResource.TMSUCCESS);
+
+ tx.delistResource(dummy, XAResource.TMSUCCESS);
+
+ tm.rollback();
+ }
+
+ assertEquals(1, rm.size());
+
+ xaConn.close();
+
+ xaConn = null;
+
+ assertEquals(0, rm.size());
+ }
+ finally
+ {
+ if (xaConn != null)
+ {
+ xaConn.close();
+ }
+ }
+ }
+
+
+ //http://jira.jboss.com/jira/browse/JBMESSAGING-721
+ public void testConvertFromLocalTx() throws Exception
+ {
Connection conn = null;
-
+
XAConnection xaConn = null;
-
+
try
{
-
- // First send some messages to a queue
-
+
+ //First send some messages to a queue
+
conn = cf.createConnection();
-
+
Session sessSend = conn.createSession(false, Session.AUTO_ACKNOWLEDGE);
-
+
MessageProducer prod = sessSend.createProducer(queue);
-
+
TextMessage tm1 = sessSend.createTextMessage("message1");
-
+
TextMessage tm2 = sessSend.createTextMessage("message2");
-
+
prod.send(tm1);
-
+
prod.send(tm2);
-
-
+
+
xaConn = cf.createXAConnection();
-
+
XASession xaSession = xaConn.createXASession();
-
+
xaConn.start();
-
+
MessageConsumer cons = xaSession.createConsumer(queue);
-
- // Receive the two messages outside of a transaction
-
+
+ //Receive the two messages outside of a transaction
+
TextMessage rm1 = (TextMessage)cons.receive(1000);
-
+
assertNotNull(rm1);
-
+
assertEquals("message1", rm1.getText());
-
+
TextMessage rm2 = (TextMessage)cons.receive(1000);
-
+
assertNotNull(rm2);
-
+
assertEquals("message2", rm2.getText());
-
+
Message rm3 = cons.receive(1000);
-
+
assertNull(rm3);
-
- // Now we enlist the session in an xa transaction
-
- log.info("enlisting");
+
+ //Now we enlist the session in an xa transaction
+
XAResource res = xaSession.getXAResource();
-
+
tm.begin();
-
+
Transaction tx = tm.getTransaction();
tx.enlistResource(res);
-
- // This should cause the work done previously to be converted into work done in the XA
- // transaction. This is what an MDB does. There is a difficulty in transactional delivery
- // with an MDB. The message is received from the destination and then sent to the MDB
- // container so it can call onMessage().
- // For transactional delivery the receipt of the message should be in a transaction but by
- // the time the MDB container is invoked the message has already been received it is too
- // late - the message has already been received and passed on (see page 199 (chapter 5 JMS
- // and Transactions, section "Application Server Integration" of Mark Little's book Java
- // Transaction processing for a discussion of how different app serves deal with this).
- // The way JBoss Messaging (and JBossMQ) deals with this is to convert any work done prior
- // to when the XASession is enlisted in the transaction, into work done in the XA
- // transaction
-
- // Now rollback the tx - this should cause redelivery of the two messages
+
+ //This should cause the work done previously to be converted into work done in the xa transaction
+ //this is what an MDB does
+ //There is a difficulty in transactional delivery with an MDB.
+ //The message is received from the destination and then sent to the mdb container so
+ //it can call onMessage.
+ //For transactional delivery the receipt of the message should be in a transaction but by the time
+ //the mdb container is invoked the message has already been received it is too late - the message
+ //has already been received and passed on (see page 199 (chapter 5 JMS and Transactions, section "Application Server Integration"
+ //of Mark Little's book Java Transaction processing
+ //for a discussion of how different app serves deal with this)
+ //The way jboss messaging (and jboss mq) deals with this is to convert any work done
+ //prior to when the xasession is enlisted in the tx, into work done in the xa tx
+
+ tx.delistResource(res, XAResource.TMSUCCESS);
+
+ //Now rollback the tx - this should cause redelivery of the two messages
tx.rollback();
-
+
rm1 = (TextMessage)cons.receive(1000);
-
+
assertNotNull(rm1);
-
+
assertEquals("message1", rm1.getText());
-
+
rm2 = (TextMessage)cons.receive(1000);
-
+
assertNotNull(rm2);
-
+
assertEquals("message2", rm2.getText());
-
+
rm3 = cons.receive(1000);
-
+
assertNull(rm3);
}
finally
- {
+ {
if (conn != null)
{
conn.close();
}
-
+
if (xaConn != null)
{
xaConn.close();
}
}
}
-
+
//http://jira.jboss.com/jira/browse/JBMESSAGING-721
public void testTransactionIdSetAfterCommit() throws Exception
{
- if (ServerManagement.isRemote()) return;
-
Connection conn = null;
-
+
XAConnection xaConn = null;
-
+
try
{
-
+
//First send some messages to a queue
-
+
conn = cf.createConnection();
-
+
Session sessSend = conn.createSession(false, Session.AUTO_ACKNOWLEDGE);
-
+
MessageProducer prod = sessSend.createProducer(queue);
-
+
TextMessage tm1 = sessSend.createTextMessage("message1");
-
+
TextMessage tm2 = sessSend.createTextMessage("message2");
-
+
prod.send(tm1);
-
+
prod.send(tm2);
-
-
+
+
xaConn = cf.createXAConnection();
-
+
XASession xaSession = xaConn.createXASession();
-
+
xaConn.start();
-
+
MessageConsumer cons = xaSession.createConsumer(queue);
-
+
//Now we enlist the session in an xa transaction
-
- log.info("enlisting");
+
XAResource res = xaSession.getXAResource();
-
+
tm.begin();
-
+
Transaction tx = tm.getTransaction();
tx.enlistResource(res);
-
+
+ tx.delistResource(res, XAResource.TMSUCCESS);
+
//Then we do a commit
tm.commit();
-
- //And enlist again
-
- tx = tm.getTransaction();
-
-
- tm.begin();
-
- tx = tm.getTransaction();
- tx.enlistResource(res);
-
- //Then we receive the messages
-
+
+ //Then we receive the messages outside the tx
+
TextMessage rm1 = (TextMessage)cons.receive(1000);
-
+
assertNotNull(rm1);
-
+
assertEquals("message1", rm1.getText());
-
+
TextMessage rm2 = (TextMessage)cons.receive(1000);
-
+
assertNotNull(rm2);
-
+
assertEquals("message2", rm2.getText());
-
+
Message rm3 = cons.receive(1000);
-
+
assertNull(rm3);
-
+
+ //And enlist again - this should convert the work done in the local tx
+ //into the global branch
+
+ tx = tm.getTransaction();
+
+ tm.begin();
+
+ tx = tm.getTransaction();
+ tx.enlistResource(res);
+
+ tx.delistResource(res, XAResource.TMSUCCESS);
+
//Now rollback the tx - this should cause redelivery of the two messages
tx.rollback();
-
+
rm1 = (TextMessage)cons.receive(1000);
-
+
assertNotNull(rm1);
-
+
assertEquals("message1", rm1.getText());
-
+
rm2 = (TextMessage)cons.receive(1000);
-
+
assertNotNull(rm2);
-
+
assertEquals("message2", rm2.getText());
-
+
rm3 = cons.receive(1000);
-
+
assertNull(rm3);
}
finally
- {
+ {
if (conn != null)
{
conn.close();
}
-
+
if (xaConn != null)
{
xaConn.close();
@@ -351,108 +479,109 @@
}
}
-
+
//http://jira.jboss.com/jira/browse/JBMESSAGING-721
public void testTransactionIdSetAfterRollback() throws Exception
{
- if (ServerManagement.isRemote()) return;
-
Connection conn = null;
-
+
XAConnection xaConn = null;
-
+
try
{
-
+
//First send some messages to a queue
-
+
conn = cf.createConnection();
-
+
Session sessSend = conn.createSession(false, Session.AUTO_ACKNOWLEDGE);
-
+
MessageProducer prod = sessSend.createProducer(queue);
-
+
TextMessage tm1 = sessSend.createTextMessage("message1");
-
+
TextMessage tm2 = sessSend.createTextMessage("message2");
-
+
prod.send(tm1);
-
+
prod.send(tm2);
-
-
+
+
xaConn = cf.createXAConnection();
-
+
XASession xaSession = xaConn.createXASession();
-
+
xaConn.start();
-
+
MessageConsumer cons = xaSession.createConsumer(queue);
-
+
//Now we enlist the session in an xa transaction
-
- log.info("enlisting");
+
XAResource res = xaSession.getXAResource();
-
+
tm.begin();
-
+
Transaction tx = tm.getTransaction();
tx.enlistResource(res);
-
+ tx.delistResource(res, XAResource.TMSUCCESS);
+
//Then we do a rollback
- tm.rollback();
-
- tm.begin();
-
- //And enlist again
-
- tx = tm.getTransaction();
- tx.enlistResource(res);
-
- //Then we receive the messages
-
+ tm.rollback();
+
+ //Then we receive the messages outside the global tx
+
TextMessage rm1 = (TextMessage)cons.receive(1000);
-
+
assertNotNull(rm1);
-
+
assertEquals("message1", rm1.getText());
-
+
TextMessage rm2 = (TextMessage)cons.receive(1000);
-
+
assertNotNull(rm2);
-
+
assertEquals("message2", rm2.getText());
-
+
Message rm3 = cons.receive(1000);
-
+
assertNull(rm3);
-
+
+ tm.begin();
+
+ //And enlist again - the work should then be converted into the global tx branch
+
+ tx = tm.getTransaction();
+
+ tx.enlistResource(res);
+
+ tx.delistResource(res, XAResource.TMSUCCESS);
+
//Now rollback the tx - this should cause redelivery of the two messages
tx.rollback();
-
+
rm1 = (TextMessage)cons.receive(1000);
-
+
assertNotNull(rm1);
-
+
assertEquals("message1", rm1.getText());
-
+
rm2 = (TextMessage)cons.receive(1000);
-
+
assertNotNull(rm2);
-
+
assertEquals("message2", rm2.getText());
-
+
rm3 = cons.receive(1000);
-
+
assertNull(rm3);
}
finally
- {
+ {
if (conn != null)
{
conn.close();
}
-
+
if (xaConn != null)
{
xaConn.close();
More information about the jboss-cvs-commits
mailing list