[jboss-cvs] JBoss Messaging SVN: r2625 - in branches/Branch_1_0_1_SP: src/main/org/jboss/jms/client/container and 2 other directories.
jboss-cvs-commits at lists.jboss.org
jboss-cvs-commits at lists.jboss.org
Wed May 2 14:24:10 EDT 2007
Author: timfox
Date: 2007-05-02 14:24:09 -0400 (Wed, 02 May 2007)
New Revision: 2625
Modified:
branches/Branch_1_0_1_SP/.classpath
branches/Branch_1_0_1_SP/src/main/org/jboss/jms/client/container/SessionAspect.java
branches/Branch_1_0_1_SP/src/main/org/jboss/jms/client/container/TransactionAspect.java
branches/Branch_1_0_1_SP/tests/build.xml
branches/Branch_1_0_1_SP/tests/src/org/jboss/test/messaging/jms/XATest.java
Log:
http://jira.jboss.com/jira/browse/JBMESSAGING-946
Modified: branches/Branch_1_0_1_SP/.classpath
===================================================================
--- branches/Branch_1_0_1_SP/.classpath 2007-05-02 16:42:17 UTC (rev 2624)
+++ branches/Branch_1_0_1_SP/.classpath 2007-05-02 18:24:09 UTC (rev 2625)
@@ -21,7 +21,6 @@
<classpathentry kind="lib" path="lib/jboss-system.jar"/>
<classpathentry kind="lib" path="lib/jboss-transaction.jar"/>
<classpathentry kind="lib" path="lib/jnp-client.jar"/>
- <classpathentry kind="lib" path="C:/dev/messaging/trunk/thirdparty/jboss/common-core/lib/jboss-common-core.jar"/>
<classpathentry kind="lib" path="thirdparty/oswego-concurrent/lib/concurrent.jar"/>
<classpathentry kind="lib" path="tests/lib/jboss-common-jdbc-wrapper.jar"/>
<classpathentry kind="lib" path="tests/lib/jboss-jca.jar"/>
@@ -30,16 +29,13 @@
<classpathentry kind="lib" path="tests/lib/jms-ra.jar"/>
<classpathentry kind="lib" path="tests/lib/mysql-connector-java-3.1.13-bin.jar"/>
<classpathentry kind="lib" path="thirdparty/jgroups/lib/jgroups.jar"/>
- <classpathentry kind="lib" path="perf/resources/jcommon-1.0.0-rc1.jar"/>
- <classpathentry kind="lib" path="perf/resources/jfreechart-1.0.0-rc1.jar"/>
<classpathentry kind="lib" path="thirdparty/apache-log4j/lib/log4j.jar"/>
- <classpathentry sourcepath="/home/clebert/workspaces/jboss-head/aop/src/main" kind="lib" path="thirdparty/jboss/aop/lib/jboss-aop.jar"/>
+ <classpathentry kind="lib" path="thirdparty/jboss/aop/lib/jboss-aop.jar" sourcepath="/home/clebert/workspaces/jboss-head/aop/src/main"/>
<classpathentry kind="lib" path="thirdparty/junit/lib/junit.jar"/>
<classpathentry kind="lib" path="thirdparty/jboss/profiler/jvmti/lib/jboss-profiler-jvmti.jar"/>
<classpathentry kind="lib" path="thirdparty/hsqldb/lib/hsqldb.jar"/>
<classpathentry kind="lib" path="thirdparty/apache-logging/lib/commons-logging.jar"/>
- <classpathentry kind="var" path="ANT_HOME/ant-junit.jar"/>
- <classpathentry sourcepath="/JBossRemoting" kind="lib" path="thirdparty/jboss/remoting/lib/jboss-remoting.jar"/>
+ <classpathentry kind="lib" path="thirdparty/jboss/remoting/lib/jboss-remoting.jar" sourcepath="/JBossRemoting"/>
<classpathentry kind="lib" path="thirdparty/jboss/serialization/lib/jboss-serialization.jar"/>
<classpathentry kind="con" path="org.eclipse.jdt.launching.JRE_CONTAINER"/>
<classpathentry kind="lib" path="thirdparty/sun-javacc/lib/javacc.jar"/>
@@ -56,12 +52,14 @@
<classpathentry kind="lib" path="thirdparty/jboss/aop/lib/jdk14-pluggable-instrumentor.jar"/>
<classpathentry kind="lib" path="thirdparty/jboss/aop/lib/jrockit-pluggable-instrumentor.jar"/>
<classpathentry kind="lib" path="thirdparty/jboss/aop/lib/pluggable-instrumentor.jar"/>
- <classpathentry kind="lib" path="thirdparty/jboss/common/lib/namespace.jar"/>
<classpathentry kind="lib" path="thirdparty/jboss/jbossxb/lib/jboss-xml-binding.jar"/>
<classpathentry kind="lib" path="thirdparty/retrotranslator/lib/backport-util-concurrent.jar"/>
<classpathentry kind="lib" path="thirdparty/retrotranslator/lib/retrotranslator-runtime.jar"/>
<classpathentry kind="lib" path="thirdparty/retrotranslator/lib/retrotranslator-transformer.jar"/>
<classpathentry kind="lib" path="thirdparty/trove/lib/trove.jar"/>
<classpathentry kind="lib" path="thirdparty/jboss/common-logging-spi/lib/jboss-logging-spi.jar"/>
+ <classpathentry kind="lib" path="thirdparty/jboss/common-core/lib/jboss-common-core.jar"/>
+ <classpathentry kind="var" path="ANT_HOME/lib/ant.jar"/>
+ <classpathentry kind="var" path="ANT_HOME/lib/ant-junit.jar"/>
<classpathentry kind="output" path="bin"/>
</classpath>
Modified: branches/Branch_1_0_1_SP/src/main/org/jboss/jms/client/container/SessionAspect.java
===================================================================
--- branches/Branch_1_0_1_SP/src/main/org/jboss/jms/client/container/SessionAspect.java 2007-05-02 16:42:17 UTC (rev 2624)
+++ branches/Branch_1_0_1_SP/src/main/org/jboss/jms/client/container/SessionAspect.java 2007-05-02 18:24:09 UTC (rev 2625)
@@ -37,6 +37,7 @@
import org.jboss.jms.delegate.SessionDelegate;
import org.jboss.jms.message.MessageProxy;
import org.jboss.jms.tx.AckInfo;
+import org.jboss.jms.tx.LocalTx;
import org.jboss.logging.Logger;
import org.jboss.messaging.util.Util;
@@ -81,7 +82,7 @@
{
AckInfo ack = (AckInfo)i.next();
if (ackMode == Session.AUTO_ACKNOWLEDGE ||
- ackMode == Session.DUPS_OK_ACKNOWLEDGE)
+ ackMode == Session.DUPS_OK_ACKNOWLEDGE || isXAAndConsideredNonTransacted(state))
{
acks.add(ack);
}
@@ -132,7 +133,7 @@
if (ackMode == Session.CLIENT_ACKNOWLEDGE ||
ackMode == Session.AUTO_ACKNOWLEDGE ||
- ackMode == Session.DUPS_OK_ACKNOWLEDGE)
+ ackMode == Session.DUPS_OK_ACKNOWLEDGE || isXAAndConsideredNonTransacted(state))
{
// We collect acknowledgments (and not transact them) for CLIENT, AUTO and DUPS_OK
@@ -184,7 +185,7 @@
}
if (ackMode == Session.AUTO_ACKNOWLEDGE ||
- ackMode == Session.DUPS_OK_ACKNOWLEDGE)
+ ackMode == Session.DUPS_OK_ACKNOWLEDGE || isXAAndConsideredNonTransacted(state))
{
// We acknowledge immediately on a non-transacted session that does not want to
// CLIENT_ACKNOWLEDGE
@@ -248,9 +249,7 @@
SessionState state = getState(invocation);
- int ackMode = state.getAcknowledgeMode();
-
- if (ackMode == Session.SESSION_TRANSACTED)
+ if (state.isTransacted() && !isXAAndConsideredNonTransacted(state))
{
throw new IllegalStateException("Cannot recover a transacted session");
}
@@ -259,13 +258,18 @@
//Call redeliver
SessionDelegate del = (SessionDelegate)mi.getTargetObject();
+
+ List toAck = state.getToAck();
+
+ if (!toAck.isEmpty())
+ {
+ del.redeliver(state.getToAck());
+
+ state.getToAck().clear();
+
+ state.setRecoverCalled(true);
+ }
- del.redeliver(state.getToAck());
-
- state.getToAck().clear();
-
- state.setRecoverCalled(true);
-
return null;
}
@@ -373,6 +377,17 @@
return (SessionState)((DelegateSupport)inv.getTargetObject()).getState();
}
+ /** http://jira.jboss.org/jira/browse/JBMESSAGING-946 - To accommodate the TCK and the MQ behavior,
+ * we should behave as non-transacted, AUTO_ACK when there is no transaction enlisted.
+ * However, when the Session is being used by ASF we should consider the case where
+ * we will convert a LocalTx into a global transaction.
+ * This helper function checks the condition that needs to be tested in this aspect.
+ * */
+ private boolean isXAAndConsideredNonTransacted(SessionState state)
+ {
+ return state.isXA() && (state.getCurrentTxId() instanceof LocalTx) && (state.getDistinguishedListener() == null);
+ }
+
// Inner Classes -------------------------------------------------
}
Modified: branches/Branch_1_0_1_SP/src/main/org/jboss/jms/client/container/TransactionAspect.java
===================================================================
--- branches/Branch_1_0_1_SP/src/main/org/jboss/jms/client/container/TransactionAspect.java 2007-05-02 16:42:17 UTC (rev 2624)
+++ branches/Branch_1_0_1_SP/src/main/org/jboss/jms/client/container/TransactionAspect.java 2007-05-02 18:24:09 UTC (rev 2625)
@@ -152,7 +152,7 @@
SessionState state = (SessionState)getState(invocation);
Object txID = state.getCurrentTxId();
- if (txID != null)
+ if ((!state.isXA() && state.isTransacted()) || (state.isXA() && !(txID instanceof LocalTx)))
{
// the session is non-XA and transacted, or XA and enrolled in a global transaction, so
// we add the message to a transaction instead of sending it now. An XA session that has
Modified: branches/Branch_1_0_1_SP/tests/build.xml
===================================================================
--- branches/Branch_1_0_1_SP/tests/build.xml 2007-05-02 16:42:17 UTC (rev 2624)
+++ branches/Branch_1_0_1_SP/tests/build.xml 2007-05-02 18:24:09 UTC (rev 2625)
@@ -548,6 +548,7 @@
<exclude name="org/jboss/test/messaging/jms/MemLeakTest.class"/>
<exclude name="org/jboss/test/messaging/jms/ManifestTest.class"/>
<exclude name="org/jboss/test/messaging/jms/JCAWrapperTest.class"/>
+ <exclude name="org/jboss/test/messaging/jms/XATest.class"/>
<exclude name="org/jboss/test/thirdparty/jbosssx/SecurityAssociationTest.class"/>
</fileset>
</batchtest>
Modified: branches/Branch_1_0_1_SP/tests/src/org/jboss/test/messaging/jms/XATest.java
===================================================================
--- branches/Branch_1_0_1_SP/tests/src/org/jboss/test/messaging/jms/XATest.java 2007-05-02 16:42:17 UTC (rev 2624)
+++ branches/Branch_1_0_1_SP/tests/src/org/jboss/test/messaging/jms/XATest.java 2007-05-02 18:24:09 UTC (rev 2625)
@@ -21,16 +21,26 @@
*/
package org.jboss.test.messaging.jms;
+import java.util.ArrayList;
+
import javax.jms.Connection;
+import javax.jms.ConnectionFactory;
import javax.jms.DeliveryMode;
import javax.jms.Destination;
+import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageConsumer;
+import javax.jms.MessageListener;
import javax.jms.MessageProducer;
+import javax.jms.Queue;
+import javax.jms.ServerSession;
+import javax.jms.ServerSessionPool;
import javax.jms.Session;
import javax.jms.TextMessage;
import javax.jms.XAConnection;
+import javax.jms.XAConnectionFactory;
import javax.jms.XASession;
+import javax.management.ObjectName;
import javax.naming.InitialContext;
import javax.transaction.Transaction;
import javax.transaction.TransactionManager;
@@ -40,12 +50,18 @@
import org.jboss.jms.client.JBossConnection;
import org.jboss.jms.client.JBossConnectionFactory;
+import org.jboss.jms.client.JBossSession;
import org.jboss.jms.client.delegate.ClientConnectionDelegate;
+import org.jboss.jms.client.delegate.DelegateSupport;
import org.jboss.jms.client.state.ConnectionState;
+import org.jboss.jms.client.state.SessionState;
+import org.jboss.jms.tx.LocalTx;
import org.jboss.jms.tx.MessagingXAResource;
import org.jboss.jms.tx.ResourceManager;
+import org.jboss.logging.Logger;
import org.jboss.test.messaging.MessagingTestCase;
import org.jboss.test.messaging.tools.ServerManagement;
+import org.jboss.test.messaging.tools.jmx.ServiceContainer;
import org.jboss.tm.TransactionManagerLocator;
/**
@@ -80,43 +96,39 @@
public void setUp() throws Exception
{
+ if (ServerManagement.isRemote())
+ {
+ throw new IllegalStateException("This test should only be run in local mode");
+ }
+
super.setUp();
+
ServerManagement.start("all");
- initialContext = new InitialContext();
-
+
initialContext = new InitialContext(ServerManagement.getJNDIEnvironment());
cf = (JBossConnectionFactory)initialContext.lookup("/ConnectionFactory");
+
+ tm = (TransactionManager)initialContext.lookup(ServiceContainer.TRANSACTION_MANAGER_JNDI_NAME);
- if (!ServerManagement.isRemote())
- {
- tm = TransactionManagerLocator.getInstance().locate();
- }
-
ServerManagement.undeployQueue("Queue");
ServerManagement.deployQueue("Queue");
queue = (Destination)initialContext.lookup("/queue/Queue");
this.drainDestination(cf, queue);
- if (!ServerManagement.isRemote())
- {
- suspendedTx = tm.suspend();
- }
+ suspendedTx = tm.suspend();
}
public void tearDown() throws Exception
{
ServerManagement.undeployQueue("Queue");
- if (!ServerManagement.isRemote())
+ if (tm.getTransaction() != null)
{
- if (tm.getTransaction() != null)
- {
- //roll it back
- tm.rollback();
- }
+ //roll it back
+ tm.rollback();
}
-
+
if (suspendedTx != null)
{
tm.resume(suspendedTx);
@@ -127,62 +139,529 @@
-
// Public ---------------------------------------------------------------------------------------
- // See http://jira.jboss.com/jira/browse/JBMESSAGING-638
+
+ /* If there is no global tx present the send must behave as non transacted.
+ * See http://www.jboss.com/index.html?module=bb&op=viewtopic&t=98577&postdays=0&postorder=asc&start=0
+ * http://jira.jboss.com/jira/browse/JBMESSAGING-410
+ * http://jira.jboss.com/jira/browse/JBMESSAGING-721
+ * http://jira.jboss.org/jira/browse/JBMESSAGING-946
+ */
+ public void testSendNoGlobalTransaction() throws Exception
+ {
+ Transaction suspended = null;
+
+ try
+ {
+ ServerManagement.deployQueue("MyQueue");
+
+ // make sure there's no active JTA transaction
+
+ suspended = tm.suspend();
+
+ // send a message to the queue using an XASession that's not enlisted in a global tx
+
+ Queue queue = (Queue)initialContext.lookup("queue/MyQueue");
+
+ XAConnectionFactory xcf =
+ (XAConnectionFactory)initialContext.lookup("java:/XAConnectionFactory");
+
+ XAConnection xconn = xcf.createXAConnection();
+
+ XASession xs = xconn.createXASession();
+
+ MessageProducer p = xs.createProducer(queue);
+ Message m = xs.createTextMessage("one");
+
+ p.send(m);
+
+ log.debug("message sent");
+
+ xconn.close();
+
+ // receive the message
+ ConnectionFactory cf = (ConnectionFactory)initialContext.lookup("/ConnectionFactory");
+ Connection conn = cf.createConnection();
+ conn.start();
+ Session s = conn.createSession(false, Session.AUTO_ACKNOWLEDGE);
+ MessageConsumer c = s.createConsumer(queue);
+ TextMessage rm = (TextMessage)c.receive(1000);
+ assertNotNull(rm);
+
+ assertEquals("one", rm.getText());
+
+ conn.close();
+ }
+ finally
+ {
+ ServerManagement.undeployQueue("MyQueue");
+
+ if (suspended != null)
+ {
+ TransactionManagerLocator.getInstance().locate().resume(suspended);
+ }
+ }
+ }
+
+
+ /*
+ * If messages are consumed using an XASession that is not enlisted in a transaction then the behaviour of the session
+ * falls back to being AUTO_ACK - i.e. the messages will get acked immediately.
+ *
+ * There is one exception to this:
+ *
+ * For transactional delivery of messages in an MDB using the old container invoker (non JCA 1.5 inflow) the message
+ * is received from the JMS provider *before* the MDB container has a chance to enlist the session in a transaction.
+ * (see page 199 (chapter 5 JMS and Transactions, section "Application Server Integration" of Mark Little's book Java Transaction
+ * processing for a discussion of how different app servers deal with this)
+ * This is not a problem specific to JBoss and was solved with JCA 1.5 message inflow.
+ * Consequently, if we detect the session has a distinguished session listener (which it will if using ASF) then the behaviour
+ * is to fall back to being a local transacted session. Later on, when the session is enlisted, the work done in the local tx
+ * is converted to the global tx branch.
+ *
+ * We are testing the exceptional case without a global tx here
+ *
+ * See http://www.jboss.com/index.html?module=bb&op=viewtopic&t=98577&postdays=0&postorder=asc&start=0
+ * http://jira.jboss.com/jira/browse/JBMESSAGING-410
+ * http://jira.jboss.com/jira/browse/JBMESSAGING-721
+ * http://jira.jboss.org/jira/browse/JBMESSAGING-946
+ *
+ */
+ public void testConsumeWithConnectionConsumerNoGlobalTransaction() throws Exception
+ {
+ ServerManagement.deployQueue("MyQueue2");
+
+ try
+ {
+ // send a message to the queue
+
+ ConnectionFactory cf = (ConnectionFactory) initialContext.lookup("/ConnectionFactory");
+ Queue queue = (Queue) initialContext.lookup("queue/MyQueue2");
+ Connection conn = cf.createConnection();
+ Session s = conn.createSession(false, Session.AUTO_ACKNOWLEDGE);
+ MessageProducer p = s.createProducer(queue);
+ p.setDeliveryMode(DeliveryMode.PERSISTENT);
+ Message m = s.createTextMessage("one");
+ p.send(m);
+ conn.close();
+
+ // make sure there's no active JTA transaction
+
+ Transaction suspended = tm.suspend();
+
+ XAConnection xaconn = null;
+ try
+ {
+
+ ObjectName queueMBean = new ObjectName("jboss.messaging.destination:service=Queue,name=MyQueue2");
+ Integer count = (Integer) ServerManagement.getAttribute(queueMBean, "MessageCount");
+ assertEquals(1, count.intValue());
+
+ // using XA with a ConnectionConsumer (testing the transaction behavior under MDBs)
+
+ XAConnectionFactory xacf = (XAConnectionFactory) initialContext.lookup("/XAConnectionFactory");
+ xaconn = xacf.createXAConnection();
+ xaconn.start();
+ XASession xasession = xaconn.createXASession();
+ DummyListener listener = new DummyListener();
+ xasession.setMessageListener(listener);
+
+ ServerSessionPool pool = new MockServerSessionPool(xasession);
+
+ xaconn.createConnectionConsumer(queue, null, pool, 1);
+
+ Thread.sleep(1000);
+ assertEquals(1, listener.messages.size());
+
+ // Message should still be on server
+ count = (Integer) ServerManagement.getAttribute(queueMBean, "MessageCount");
+ assertEquals(1, count.intValue());
+
+ XAResource resource = xasession.getXAResource();
+
+ // Starts a new transaction
+ tm.begin();
+
+ Transaction trans = tm.getTransaction();
+
+ JBossSession session = (JBossSession)xasession;
+ SessionState state = (SessionState)((DelegateSupport)session.getDelegate()).getState();
+
+ // Validates TX conversion: before enlistment the session is still using a LocalTx
+ assertTrue(state.getCurrentTxId() instanceof LocalTx);
+
+ // Enlist the transaction... as supposed to be happening on JBossAS with the
+ // default listener (enlist happening after message is received)
+ trans.enlistResource(resource);
+
+ // Validates TX conversion: after enlistment the current tx id is no longer a LocalTx
+ assertFalse(state.getCurrentTxId() instanceof LocalTx);
+
+ trans.delistResource(resource, XAResource.TMSUCCESS);
+
+ trans.commit();
+
+
+ // After commit the message should be consumed
+ count = (Integer) ServerManagement.getAttribute(queueMBean, "MessageCount");
+ assertEquals(0, count.intValue());
+ }
+ finally
+ {
+ if (xaconn != null)
+ {
+ xaconn.close();
+ }
+ if (suspended != null)
+ {
+ TransactionManagerLocator.getInstance().locate().resume(suspended);
+ }
+ }
+ }
+ finally
+ {
+ ServerManagement.undeployQueue("MyQueue2");
+ }
+
+
+ }
+
+
+ /*
+ * If messages are consumed using an XASession that is not enlisted in a transaction then the behaviour of the session
+ * falls back to being AUTO_ACK - i.e. the messages will get acked immediately.
+ *
+ * There is one exception to this:
+ *
+ * For transactional delivery of messages in an MDB using the old container invoker (non JCA 1.5 inflow) the message
+ * is received from the JMS provider *before* the MDB container has a chance to enlist the session in a transaction.
+ * (see page 199 (chapter 5 JMS and Transactions, section "Application Server Integration" of Mark Little's book Java Transaction
+ * processing for a discussion of how different app servers deal with this)
+ * This is not a problem specific to JBoss and was solved with JCA 1.5 message inflow.
+ * Consequently, if we detect the session has a distinguished session listener (which it will if using ASF) then the behaviour
+ * is to fall back to being a local transacted session. Later on, when the session is enlisted, the work done in the local tx
+ * is converted to the global tx branch.
+ *
+ * We are testing the standard case without a global tx here
+ *
+ * See http://www.jboss.com/index.html?module=bb&op=viewtopic&t=98577&postdays=0&postorder=asc&start=0
+ * http://jira.jboss.com/jira/browse/JBMESSAGING-410
+ * http://jira.jboss.com/jira/browse/JBMESSAGING-721
+ * http://jira.jboss.org/jira/browse/JBMESSAGING-946
+ *
+ */
+ public void testConsumeWithoutConnectionConsumerNoGlobalTransaction() throws Exception
+ {
+ try
+ {
+ ServerManagement.deployQueue("MyQueue2");
+
+ // send a message to the queue
+
+ ConnectionFactory cf = (ConnectionFactory)initialContext.lookup("/ConnectionFactory");
+ Queue queue = (Queue)initialContext.lookup("queue/MyQueue2");
+ Connection conn = cf.createConnection();
+ Session s = conn.createSession(false, Session.AUTO_ACKNOWLEDGE);
+ MessageProducer p = s.createProducer(queue);
+ p.setDeliveryMode(DeliveryMode.PERSISTENT);
+ Message m = s.createTextMessage("one");
+ p.send(m);
+ conn.close();
+
+ // make sure there's no active JTA transaction
+
+ Transaction suspended = tm.suspend();
+
+ try
+ {
+
+ ObjectName queueMBean = new ObjectName("jboss.messaging.destination:service=Queue,name=MyQueue2");
+ Integer count = (Integer)ServerManagement.getAttribute(queueMBean, "MessageCount");
+ assertEquals(1, count.intValue());
+
+ XAConnectionFactory xcf =
+ (XAConnectionFactory)initialContext.lookup("java:/XAConnectionFactory");
+ XAConnection xconn = xcf.createXAConnection();
+ xconn.start();
+
+ // no active JTA transaction here
+
+ XASession xs = xconn.createXASession();
+
+ MessageConsumer c = xs.createConsumer(queue);
+
+ // no tx is enlisted and there is no ConnectionConsumer, so the session falls back to AUTO_ACK on receipt
+ TextMessage rm = (TextMessage)c.receive(1000);
+
+ assertEquals("one", rm.getText());
+
+ // messages should be acked
+ count = (Integer)ServerManagement.getAttribute(queueMBean, "MessageCount");
+ assertEquals(0, count.intValue());
+
+ xconn.close();
+ }
+ finally
+ {
+
+ if (suspended != null)
+ {
+ TransactionManagerLocator.getInstance().locate().resume(suspended);
+ }
+ }
+ }
+ finally
+ {
+ ServerManagement.undeployQueue("MyQueue2");
+ }
+ }
+
+
+ /*
+ * If messages are consumed using an XASession that is not enlisted in a transaction then the behaviour of the session
+ * falls back to being AUTO_ACK - i.e. the messages will get acked immediately.
+ *
+ * There is one exception to this:
+ *
+ * For transactional delivery of messages in an MDB using the old container invoker (non JCA 1.5 inflow) the message
+ * is received from the JMS provider *before* the MDB container has a chance to enlist the session in a transaction.
+ * (see page 199 (chapter 5 JMS and Transactions, section "Application Server Integration" of Mark Little's book Java Transaction
+ * processing for a discussion of how different app servers deal with this)
+ * This is not a problem specific to JBoss and was solved with JCA 1.5 message inflow.
+ * Consequently, if we detect the session has a distinguished session listener (which it will if using ASF) then the behaviour
+ * is to fall back to being a local transacted session. Later on, when the session is enlisted, the work done in the local tx
+ * is converted to the global tx branch.
+ *
+ * We are testing the case with a global tx here
+ *
+ * See http://www.jboss.com/index.html?module=bb&op=viewtopic&t=98577&postdays=0&postorder=asc&start=0
+ * http://jira.jboss.com/jira/browse/JBMESSAGING-410
+ * http://jira.jboss.com/jira/browse/JBMESSAGING-721
+ * http://jira.jboss.org/jira/browse/JBMESSAGING-946
+ *
+ */
+ public void testConsumeGlobalTransaction() throws Exception
+ {
+ XAConnection xaconn = null;
+
+ try
+ {
+ ServerManagement.deployQueue("MyQueue2");
+
+ // send a message to the queue
+
+ ConnectionFactory cf = (ConnectionFactory)initialContext.lookup("/ConnectionFactory");
+ Queue queue = (Queue)initialContext.lookup("queue/MyQueue2");
+ Connection conn = cf.createConnection();
+ Session s = conn.createSession(false, Session.AUTO_ACKNOWLEDGE);
+ MessageProducer p = s.createProducer(queue);
+ p.setDeliveryMode(DeliveryMode.PERSISTENT);
+ Message m = s.createTextMessage("one");
+ p.send(m);
+ conn.close();
+
+ ObjectName queueMBean = new ObjectName("jboss.messaging.destination:service=Queue,name=MyQueue2");
+ Integer count = (Integer)ServerManagement.getAttribute(queueMBean, "MessageCount");
+ assertEquals(1, count.intValue());
+
+ tm.begin();
+
+ Transaction trans = tm.getTransaction();
+
+ XAConnectionFactory xacf = (XAConnectionFactory)cf;
+
+ xaconn = xacf.createXAConnection();
+
+ xaconn.start();
+
+ XASession xasession = xaconn.createXASession();
+
+ XAResource resouce = xasession.getXAResource();
+
+ trans.enlistResource(resouce);
+
+ MessageConsumer consumer = xasession.createConsumer(queue);
+
+ TextMessage messageReceived = (TextMessage)consumer.receive(1000);
+
+ assertNotNull(messageReceived);
+
+ assertEquals("one", messageReceived.getText());
+
+ assertNull(consumer.receive(1000));
+
+ count = (Integer)ServerManagement.getAttribute(queueMBean, "MessageCount");
+
+ assertEquals(1, count.intValue());
+
+ trans.delistResource(resouce, XAResource.TMSUCCESS);
+
+ tm.rollback();
+
+ tm.begin();
+ trans = tm.getTransaction();
+ trans.enlistResource(resouce);
+
+ messageReceived = (TextMessage)consumer.receive(1000);
+
+ assertNotNull(messageReceived);
+
+ assertEquals("one", messageReceived.getText());
+
+ count = (Integer)ServerManagement.getAttribute(queueMBean, "MessageCount");
+ assertEquals(1, count.intValue());
+
+ trans.commit();
+
+ count = (Integer)ServerManagement.getAttribute(queueMBean, "MessageCount");
+ assertEquals(0, count.intValue());
+
+ }
+ finally
+ {
+ if (xaconn != null)
+ {
+ xaconn.close();
+ }
+ ServerManagement.undeployQueue("MyQueue2");
+ }
+ }
+
+ /*
+ * This test will:
+ * - Send two messages over a producer
+ * - Receive one message over a consumer created using an XASession
+ * - Call Recover
+ * - Receive the second message
+ * - The queue should be empty after that
+ * Verifies that messages are sent and acknowledged properly when recover() is called.
+ * NOTE: To accommodate TCK tests where Sessions/Consumers are used without being enlisted in a transaction,
+ * we process those cases as non-transacted/AUTO_ACK; however, if the session is being used
+ * to process MDBs we apply the LocalTx conversion and process those as described in the comment above.
+ * This was done as per: http://jira.jboss.org/jira/browse/JBMESSAGING-946
+ *
+ */
+ public void testRecoverOnXA() throws Exception
+ {
+ XAConnection xaconn = null;
+
+ try
+ {
+ ServerManagement.deployQueue("MyQueue2");
+
+ // send two messages to the queue
+
+ ConnectionFactory cf = (ConnectionFactory)initialContext.lookup("/ConnectionFactory");
+ Queue queue = (Queue)initialContext.lookup("queue/MyQueue2");
+ Connection conn = cf.createConnection();
+ Session s = conn.createSession(false, Session.AUTO_ACKNOWLEDGE);
+ MessageProducer p = s.createProducer(queue);
+ p.setDeliveryMode(DeliveryMode.PERSISTENT);
+ Message m = s.createTextMessage("one");
+ p.send(m);
+ m = s.createTextMessage("two");
+ p.send(m);
+ conn.close();
+
+ ObjectName queueMBean = new ObjectName("jboss.messaging.destination:service=Queue,name=MyQueue2");
+ Integer count = (Integer)ServerManagement.getAttribute(queueMBean, "MessageCount");
+ assertEquals(2, count.intValue());
+
+ XAConnectionFactory xacf = (XAConnectionFactory)cf;
+
+ xaconn = xacf.createXAConnection();
+
+ xaconn.start();
+
+ XASession xasession = xaconn.createXASession();
+
+ MessageConsumer consumer = xasession.createConsumer(queue);
+
+ TextMessage messageReceived = (TextMessage)consumer.receive(1000);
+
+ assertNotNull(messageReceived);
+
+ assertEquals("one", messageReceived.getText());
+
+ xasession.recover();
+
+ messageReceived = (TextMessage)consumer.receive(1000);
+
+ assertEquals("two", messageReceived.getText());
+
+ consumer.close();
+
+ // I can't call xasession.close for this test as JCA layer would cache the session
+ // So.. keep this close commented!
+ //xasession.close();
+
+ count = (Integer)ServerManagement.getAttribute(queueMBean, "MessageCount");
+ assertEquals(0, count.intValue());
+
+
+ }
+ finally
+ {
+ if (xaconn != null)
+ {
+ xaconn.close();
+ }
+ ServerManagement.undeployQueue("MyQueue2");
+ }
+ }
+
+ //See http://jira.jboss.com/jira/browse/JBMESSAGING-638
public void testResourceManagerMemoryLeakOnCommit() throws Exception
{
- if (ServerManagement.isRemote()) return;
-
XAConnection xaConn = null;
-
+
try
{
xaConn = cf.createXAConnection();
-
+
JBossConnection jbConn = (JBossConnection)xaConn;
-
+
ClientConnectionDelegate del = (ClientConnectionDelegate)jbConn.getDelegate();
-
+
ConnectionState state = (ConnectionState)del.getState();
-
+
ResourceManager rm = state.getResourceManager();
-
+
XASession xaSession = xaConn.createXASession();
-
+
xaConn.start();
-
+
XAResource res = xaSession.getXAResource();
-
+
XAResource dummy = new DummyXAResource();
-
+
for (int i = 0; i < 100; i++)
{
-
+
tm.begin();
-
+
Transaction tx = tm.getTransaction();
-
+
tx.enlistResource(res);
-
+
tx.enlistResource(dummy);
-
+
assertEquals(1, rm.size());
-
+
tx.delistResource(res, XAResource.TMSUCCESS);
-
+
tx.delistResource(dummy, XAResource.TMSUCCESS);
-
+
tm.commit();
- }
-
+ }
+
assertEquals(1, rm.size());
-
+
xaConn.close();
-
+
xaConn = null;
-
+
assertEquals(0, rm.size());
}
@@ -194,60 +673,57 @@
}
}
}
-
+
//See http://jira.jboss.com/jira/browse/JBMESSAGING-638
public void testResourceManagerMemoryLeakOnRollback() throws Exception
- {
- if (ServerManagement.isRemote()) return;
-
-
+ {
XAConnection xaConn = null;
-
+
try
{
xaConn = cf.createXAConnection();
-
+
JBossConnection jbConn = (JBossConnection)xaConn;
-
+
ClientConnectionDelegate del = (ClientConnectionDelegate)jbConn.getDelegate();
-
+
ConnectionState state = (ConnectionState)del.getState();
-
+
ResourceManager rm = state.getResourceManager();
-
+
XASession xaSession = xaConn.createXASession();
-
+
xaConn.start();
-
+
XAResource res = xaSession.getXAResource();
-
+
XAResource dummy = new DummyXAResource();
-
+
for (int i = 0; i < 100; i++)
- {
+ {
tm.begin();
-
+
Transaction tx = tm.getTransaction();
-
+
tx.enlistResource(res);
-
+
tx.enlistResource(dummy);
-
+
assertEquals(1, rm.size());
-
+
tx.delistResource(res, XAResource.TMSUCCESS);
-
+
tx.delistResource(dummy, XAResource.TMSUCCESS);
-
+
tm.rollback();
- }
-
+ }
+
assertEquals(1, rm.size());
-
+
xaConn.close();
-
+
xaConn = null;
-
+
assertEquals(0, rm.size());
}
@@ -259,229 +735,250 @@
}
}
}
-
- //http://jira.jboss.com/jira/browse/JBMESSAGING-721
+ // http://jira.jboss.com/jira/browse/JBMESSAGING-721
public void testConvertFromLocalTx() throws Exception
{
- if (ServerManagement.isRemote()) return;
-
-
Connection conn = null;
-
+
XAConnection xaConn = null;
-
+
try
{
-
+
//First send some messages to a queue
-
+
+ ObjectName queueMBean = new ObjectName("jboss.messaging.destination:service=Queue,name=Queue");
+ Integer count = (Integer) ServerManagement.getAttribute(queueMBean, "MessageCount");
+ assertEquals(0, count.intValue());
+
conn = cf.createConnection();
-
+
Session sessSend = conn.createSession(false, Session.AUTO_ACKNOWLEDGE);
-
+
MessageProducer prod = sessSend.createProducer(queue);
-
+
TextMessage tm1 = sessSend.createTextMessage("message1");
-
+
TextMessage tm2 = sessSend.createTextMessage("message2");
-
+
prod.send(tm1);
-
+
prod.send(tm2);
-
-
+
+ count = (Integer) ServerManagement.getAttribute(queueMBean, "MessageCount");
+ assertEquals(2, count.intValue());
+
xaConn = cf.createXAConnection();
-
+
XASession xaSession = xaConn.createXASession();
-
+
xaConn.start();
-
- MessageConsumer cons = xaSession.createConsumer(queue);
-
- //Receive the two messages outside of a transaction
-
- TextMessage rm1 = (TextMessage)cons.receive(1000);
-
- assertNotNull(rm1);
-
- assertEquals("message1", rm1.getText());
-
- TextMessage rm2 = (TextMessage)cons.receive(1000);
-
- assertNotNull(rm2);
-
- assertEquals("message2", rm2.getText());
-
- Message rm3 = cons.receive(1000);
-
- assertNull(rm3);
-
+
+ DummyListener listener = new DummyListener();
+
+ xaSession.setMessageListener(listener);
+
+ ServerSessionPool pool = new MockServerSessionPool(xaSession);
+
+ xaConn.createConnectionConsumer(queue, null, pool, 1);
+
+ Thread.sleep(1000);
+
+ assertEquals(2, listener.messages.size());
+
+ assertEquals("message1", ((TextMessage)(listener.messages.get(0))).getText());
+ assertEquals("message2", ((TextMessage)(listener.messages.get(1))).getText());
+
+ count = (Integer) ServerManagement.getAttribute(queueMBean, "MessageCount");
+ assertEquals(2, count.intValue());
+
+ listener.messages.clear();
+
//Now we enlist the session in an xa transaction
-
+
XAResource res = xaSession.getXAResource();
-
+
tm.begin();
-
+
Transaction tx = tm.getTransaction();
tx.enlistResource(res);
-
- // This should cause the work done previously to be converted into work done in the XA
- // transaction. This is what an MDB does. There is a difficulty in transactional delivery
- // with an MDB. The message is received from the destination and then sent to the MDB
- // container so it can call onMessage().
- // For transactional delivery the receipt of the message should be in a transaction but by
- // the time the MDB container is invoked the message has already been received it is too
- // late - the message has already been received and passed on (see page 199 (chapter 5 JMS
- // and Transactions, section "Application Server Integration" of Mark Little's book Java
- // Transaction processing for a discussion of how different app serves deal with this).
- // The way JBoss Messaging (and JBossMQ) deals with this is to convert any work done prior
- // to when the XASession is enlisted in the transaction, into work done in the XA
- // transaction
+ //This should cause the work done previously to be converted into work done in the XA transaction.
+ //This is what an MDB does.
+ //There is a difficulty in transactional delivery with an MDB.
+ //The message is received from the destination and then sent to the MDB container so
+ //it can call onMessage.
+ //For transactional delivery the receipt of the message should be in a transaction, but by the time
+ //the MDB container is invoked it is too late - the message
+ //has already been received and passed on (see page 199, chapter 5 "JMS and Transactions", section "Application Server Integration",
+ //of Mark Little's book Java Transaction Processing
+ //for a discussion of how different app servers deal with this).
+ //The way JBoss Messaging (and JBossMQ) deals with this is to convert any work done
+ //prior to when the XASession is enlisted in the tx into work done in the XA tx.
+
tx.delistResource(res, XAResource.TMSUCCESS);
-
+
//Now rollback the tx - this should cause redelivery of the two messages
- tx.rollback();
-
- rm1 = (TextMessage)cons.receive(1000);
-
- assertNotNull(rm1);
-
- assertEquals("message1", rm1.getText());
-
- rm2 = (TextMessage)cons.receive(1000);
-
- assertNotNull(rm2);
-
- assertEquals("message2", rm2.getText());
-
- rm3 = cons.receive(1000);
-
- assertNull(rm3);
+ tm.rollback();
+
+ count = (Integer) ServerManagement.getAttribute(queueMBean, "MessageCount");
+ assertEquals(2, count.intValue());
+
+ Thread.sleep(1000);
+
+ assertEquals(2, listener.messages.size());
+
+ listener.messages.clear();
+
+ tm.begin();
+
+ tx = tm.getTransaction();
+ tx.enlistResource(res);
+
+ tm.commit();
+
+ Thread.sleep(1000);
+
+ assertEquals(0, listener.messages.size());
+
+ count = (Integer) ServerManagement.getAttribute(queueMBean, "MessageCount");
+ assertEquals(0, count.intValue());
+
+ assertNull(tm.getTransaction());
+
+
}
finally
- {
+ {
if (conn != null)
{
conn.close();
}
-
+
if (xaConn != null)
{
xaConn.close();
}
+
+ /* if (suspended != null)
+ {
+ tm.resume(suspended);
+ }*/
}
}
-
+
//http://jira.jboss.com/jira/browse/JBMESSAGING-721
+ // Note: The behavior of this test was changed after http://jira.jboss.com/jira/browse/JBMESSAGING-946
+ // When an XASession has no transaction enlisted, it behaves the same way as a non-transacted session with auto-acknowledge
public void testTransactionIdSetAfterCommit() throws Exception
{
- if (ServerManagement.isRemote()) return;
-
-
Connection conn = null;
-
+
XAConnection xaConn = null;
-
+
try
{
-
+
//First send some messages to a queue
-
+
conn = cf.createConnection();
-
+
Session sessSend = conn.createSession(false, Session.AUTO_ACKNOWLEDGE);
-
+
MessageProducer prod = sessSend.createProducer(queue);
-
+
TextMessage tm1 = sessSend.createTextMessage("message1");
-
+
TextMessage tm2 = sessSend.createTextMessage("message2");
-
+
prod.send(tm1);
-
+
prod.send(tm2);
-
-
+
+
xaConn = cf.createXAConnection();
-
+
XASession xaSession = xaConn.createXASession();
-
+
xaConn.start();
-
+
MessageConsumer cons = xaSession.createConsumer(queue);
-
+
//Now we enlist the session in an xa transaction
-
+
XAResource res = xaSession.getXAResource();
-
+
tm.begin();
-
+
Transaction tx = tm.getTransaction();
tx.enlistResource(res);
-
+
tx.delistResource(res, XAResource.TMSUCCESS);
-
+
//Then we do a commit
tm.commit();
-
+
+ // The begin/enlist below was moved ahead of the receives (it originally came after them),
+ // because an XASession with no resource enlisted now acts as a
+ // non-transacted, auto-acknowledge session.
+
+ //And enlist again - this should convert the work done in the local tx
+ //into the global branch
+
+ tx = tm.getTransaction();
+
+ tm.begin();
+
+ tx = tm.getTransaction();
+ tx.enlistResource(res);
+
//Then we receive the messages outside the tx
-
+
TextMessage rm1 = (TextMessage)cons.receive(1000);
-
+
assertNotNull(rm1);
-
+
assertEquals("message1", rm1.getText());
-
+
TextMessage rm2 = (TextMessage)cons.receive(1000);
-
+
assertNotNull(rm2);
-
+
assertEquals("message2", rm2.getText());
-
+
Message rm3 = cons.receive(1000);
-
+
assertNull(rm3);
-
- //And enlist again - this should convert the work done in the local tx
- //into the global branch
-
- tx = tm.getTransaction();
-
- tm.begin();
-
- tx = tm.getTransaction();
- tx.enlistResource(res);
-
- tx.delistResource(res, XAResource.TMSUCCESS);
-
+
+ tx.delistResource(res, XAResource.TMSUCCESS);
+
//Now rollback the tx - this should cause redelivery of the two messages
tx.rollback();
-
+
rm1 = (TextMessage)cons.receive(1000);
-
+
assertNotNull(rm1);
-
+
assertEquals("message1", rm1.getText());
-
+
rm2 = (TextMessage)cons.receive(1000);
-
+
assertNotNull(rm2);
-
+
assertEquals("message2", rm2.getText());
-
+
rm3 = cons.receive(1000);
-
+
assertNull(rm3);
}
finally
- {
+ {
if (conn != null)
{
conn.close();
}
-
+
if (xaConn != null)
{
xaConn.close();
@@ -489,112 +986,114 @@
}
}
-
+
//http://jira.jboss.com/jira/browse/JBMESSAGING-721
public void testTransactionIdSetAfterRollback() throws Exception
- {
- if (ServerManagement.isRemote()) return;
-
-
+ {
Connection conn = null;
-
+
XAConnection xaConn = null;
-
+
try
{
-
+
//First send some messages to a queue
-
+
conn = cf.createConnection();
-
+
Session sessSend = conn.createSession(false, Session.AUTO_ACKNOWLEDGE);
-
+
MessageProducer prod = sessSend.createProducer(queue);
-
+
TextMessage tm1 = sessSend.createTextMessage("message1");
-
+
TextMessage tm2 = sessSend.createTextMessage("message2");
-
+
prod.send(tm1);
-
+
prod.send(tm2);
-
-
+
+
xaConn = cf.createXAConnection();
-
+
XASession xaSession = xaConn.createXASession();
-
+
xaConn.start();
-
+
MessageConsumer cons = xaSession.createConsumer(queue);
-
+
//Now we enlist the session in an xa transaction
-
+
XAResource res = xaSession.getXAResource();
-
+
tm.begin();
-
+
Transaction tx = tm.getTransaction();
tx.enlistResource(res);
tx.delistResource(res, XAResource.TMSUCCESS);
-
+
//Then we do a rollback
- tm.rollback();
-
+ tm.rollback();
+
+
+ tm.begin();
+
+ //And enlist again - the work should then be converted into the global tx branch
+
+
+ // The begin/enlist here was moved ahead of the receives (it originally came after them),
+ // because an XASession with no resource enlisted now acts as a
+ // non-transacted, auto-acknowledge session.
+
+ tx = tm.getTransaction();
+
+ tx.enlistResource(res);
+
//Then we receive the messages outside the global tx
-
+
TextMessage rm1 = (TextMessage)cons.receive(1000);
-
+
assertNotNull(rm1);
-
+
assertEquals("message1", rm1.getText());
-
+
TextMessage rm2 = (TextMessage)cons.receive(1000);
-
+
assertNotNull(rm2);
-
+
assertEquals("message2", rm2.getText());
-
+
Message rm3 = cons.receive(1000);
-
+
assertNull(rm3);
-
- tm.begin();
-
- //And enlist again - the work should then be converted into the global tx branch
-
- tx = tm.getTransaction();
-
- tx.enlistResource(res);
-
tx.delistResource(res, XAResource.TMSUCCESS);
-
+
//Now rollback the tx - this should cause redelivery of the two messages
tx.rollback();
-
+
rm1 = (TextMessage)cons.receive(1000);
-
+
assertNotNull(rm1);
-
+
assertEquals("message1", rm1.getText());
-
+
rm2 = (TextMessage)cons.receive(1000);
-
+
assertNotNull(rm2);
-
+
assertEquals("message2", rm2.getText());
-
+
rm3 = cons.receive(1000);
-
+
assertNull(rm3);
}
finally
- {
+ {
if (conn != null)
{
conn.close();
}
-
+
if (xaConn != null)
{
xaConn.close();
@@ -603,11 +1102,8 @@
}
-
public void test2PCSendCommit1PCOptimization() throws Exception
- {
- if (ServerManagement.isRemote()) return;
-
+ {
//Since both resources have some RM, TM will probably use 1PC optimization
XAConnection conn = null;
@@ -665,8 +1161,6 @@
public void test2PCSendCommit() throws Exception
{
- if (ServerManagement.isRemote()) return;
-
XAConnection conn = null;
Connection conn2 = null;
@@ -726,8 +1220,6 @@
public void test2PCSendRollback1PCOptimization() throws Exception
{
- if (ServerManagement.isRemote()) return;
-
//Since both resources have some RM, TM will probably use 1PC optimization
XAConnection conn = null;
@@ -780,8 +1272,6 @@
public void test2PCSendFailOnPrepare() throws Exception
{
- if (ServerManagement.isRemote()) return;
-
XAConnection conn = null;
Connection conn2 = null;
try
@@ -845,8 +1335,6 @@
public void test2PCSendRollback() throws Exception
{
- if (ServerManagement.isRemote()) return;
-
XAConnection conn = null;
Connection conn2 = null;
try
@@ -899,8 +1387,6 @@
public void test2PCReceiveCommit1PCOptimization() throws Exception
{
- if (ServerManagement.isRemote()) return;
-
//Since both resources have some RM, TM will probably use 1PC optimization
XAConnection conn = null;
@@ -976,8 +1462,6 @@
public void test2PCReceiveCommit() throws Exception
{
- if (ServerManagement.isRemote()) return;
-
XAConnection conn = null;
Connection conn2 = null;
@@ -1052,8 +1536,6 @@
public void test2PCReceiveRollback1PCOptimization() throws Exception
{
- if (ServerManagement.isRemote()) return;
-
//Since both resources have some RM, TM will probably use 1PC optimization
XAConnection conn = null;
@@ -1132,9 +1614,7 @@
}
public void test2PCReceiveRollback() throws Exception
- {
- if (ServerManagement.isRemote()) return;
-
+ {
XAConnection conn = null;
Connection conn2 = null;
@@ -1214,8 +1694,6 @@
public void test1PCSendCommit() throws Exception
{
- if (ServerManagement.isRemote()) return;
-
XAConnection conn = null;
Connection conn2 = null;
@@ -1271,8 +1749,6 @@
public void test1PCSendRollback() throws Exception
{
- if (ServerManagement.isRemote()) return;
-
XAConnection conn = null;
Connection conn2 = null;
try
@@ -1319,8 +1795,6 @@
public void test1PCReceiveCommit() throws Exception
{
- if (ServerManagement.isRemote()) return;
-
XAConnection conn = null;
Connection conn2 = null;
@@ -1389,8 +1863,6 @@
public void test1PCReceiveRollback() throws Exception
{
- if (ServerManagement.isRemote()) return;
-
XAConnection conn = null;
Connection conn2 = null;
@@ -1469,8 +1941,6 @@
public void testMultipleSessionsOneTxCommitAcknowledge1PCOptimization() throws Exception
{
- if (ServerManagement.isRemote()) return;
-
XAConnection conn = null;
Connection conn2 = null;
@@ -1547,8 +2017,6 @@
public void testMultipleSessionsOneTxCommitAcknowledge() throws Exception
{
- if (ServerManagement.isRemote()) return;
-
XAConnection conn = null;
Connection conn2 = null;
@@ -1624,8 +2092,6 @@
public void testMultipleSessionsOneTxRollbackAcknowledge1PCOptimization() throws Exception
{
- if (ServerManagement.isRemote()) return;
-
XAConnection conn = null;
Connection conn2 = null;
@@ -1731,8 +2197,6 @@
public void testMultipleSessionsOneTxRollbackAcknowledge() throws Exception
{
- if (ServerManagement.isRemote()) return;
-
XAConnection conn = null;
Connection conn2 = null;
@@ -1844,8 +2308,6 @@
public void testMultipleSessionsOneTxRollbackAcknowledgeForceFailureInCommit() throws Exception
{
- if (ServerManagement.isRemote()) return;
-
XAConnection conn = null;
Connection conn2 = null;
@@ -1953,8 +2415,6 @@
public void testMultipleSessionsOneTxCommitSend1PCOptimization() throws Exception
{
- if (ServerManagement.isRemote()) return;
-
//Since both resources have some RM, TM will probably use 1PC optimization
XAConnection conn = null;
@@ -2023,8 +2483,6 @@
public void testMultipleSessionsOneTxCommitSend() throws Exception
{
- if (ServerManagement.isRemote()) return;
-
//Since both resources have some RM, TM will probably use 1PC optimization
XAConnection conn = null;
@@ -2096,8 +2554,6 @@
public void testMultipleSessionsOneTxRollbackSend1PCOptimization() throws Exception
{
- if (ServerManagement.isRemote()) return;
-
//Since both resources have some RM, TM will probably use 1PC optimization
XAConnection conn = null;
@@ -2162,8 +2618,6 @@
public void testMultipleSessionsOneTxRollbackSend() throws Exception
{
- if (ServerManagement.isRemote()) return;
-
XAConnection conn = null;
Connection conn2 = null;
@@ -2229,8 +2683,6 @@
public void testOneSessionTwoTransactionsCommitAcknowledge() throws Exception
{
- if (ServerManagement.isRemote()) return;
-
XAConnection conn = null;
Connection conn2 = null;
@@ -2314,8 +2766,6 @@
public void testOneSessionTwoTransactionsRollbackAcknowledge() throws Exception
{
- if (ServerManagement.isRemote()) return;
-
XAConnection conn = null;
Connection conn2 = null;
@@ -2413,8 +2863,6 @@
public void testOneSessionTwoTransactionsCommitSend() throws Exception
{
- if (ServerManagement.isRemote()) return;
-
XAConnection conn = null;
Connection conn2 = null;
@@ -2491,9 +2939,7 @@
public void testOneSessionTwoTransactionsRollbackSend() throws Exception
- {
- if (ServerManagement.isRemote()) return;
-
+ {
XAConnection conn = null;
Connection conn2 = null;
@@ -2574,6 +3020,57 @@
// Inner classes --------------------------------------------------------------------------------
+ static class DummyListener implements MessageListener
+ {
+
+ protected Logger log = Logger.getLogger(getClass());
+
+ public ArrayList messages = new ArrayList();
+
+ public void onMessage(Message message)
+ {
+ log.info("Message received on DummyListener " + message);
+ messages.add(message);
+ }
+ }
+
+ static class MockServerSessionPool implements ServerSessionPool
+ {
+ private ServerSession serverSession;
+
+ MockServerSessionPool(Session sess)
+ {
+ serverSession = new MockServerSession(sess);
+ }
+
+ public ServerSession getServerSession() throws JMSException
+ {
+ return serverSession;
+ }
+ }
+
+ static class MockServerSession implements ServerSession
+ {
+ Session session;
+
+ MockServerSession(Session sess)
+ {
+ this.session = sess;
+ }
+
+
+ public Session getSession() throws JMSException
+ {
+ return session;
+ }
+
+ public void start() throws JMSException
+ {
+ session.run();
+ }
+
+ }
+
static class DummyXAResource implements XAResource
{
boolean failOnPrepare;
More information about the jboss-cvs-commits
mailing list