[jboss-cvs] JBoss Messaging SVN: r2655 - in branches/Branch_1_0_1_SP: src/main/org/jboss/jms/client/container and 1 other directories.
jboss-cvs-commits at lists.jboss.org
jboss-cvs-commits at lists.jboss.org
Tue May 8 11:57:03 EDT 2007
Author: timfox
Date: 2007-05-08 11:57:02 -0400 (Tue, 08 May 2007)
New Revision: 2655
Modified:
branches/Branch_1_0_1_SP/build-messaging.xml
branches/Branch_1_0_1_SP/src/main/org/jboss/jms/client/container/SessionAspect.java
branches/Branch_1_0_1_SP/src/main/org/jboss/jms/client/container/TransactionAspect.java
branches/Branch_1_0_1_SP/tests/src/org/jboss/test/messaging/jms/WireFormatTest.java
branches/Branch_1_0_1_SP/tests/src/org/jboss/test/messaging/jms/XATest.java
Log:
Reverted http://jira.jboss.com/jira/browse/JBMESSAGING-946 and updated version numbers
Modified: branches/Branch_1_0_1_SP/build-messaging.xml
===================================================================
--- branches/Branch_1_0_1_SP/build-messaging.xml 2007-05-04 16:26:12 UTC (rev 2654)
+++ branches/Branch_1_0_1_SP/build-messaging.xml 2007-05-08 15:57:02 UTC (rev 2655)
@@ -48,7 +48,7 @@
<property name="messaging.version.major" value="1"/>
<property name="messaging.version.minor" value="0"/>
<property name="messaging.version.revision" value="1"/>
- <property name="messaging.version.incrementing" value="6"/>
+ <property name="messaging.version.incrementing" value="13"/>
<property name="messaging.version.tag" value="SP5"/>
<property name="messaging.version.name" value=""/>
<property name="messaging.version.cvstag" value="JBossMessaging_1_0_1_SP5"/>
Modified: branches/Branch_1_0_1_SP/src/main/org/jboss/jms/client/container/SessionAspect.java
===================================================================
--- branches/Branch_1_0_1_SP/src/main/org/jboss/jms/client/container/SessionAspect.java 2007-05-04 16:26:12 UTC (rev 2654)
+++ branches/Branch_1_0_1_SP/src/main/org/jboss/jms/client/container/SessionAspect.java 2007-05-08 15:57:02 UTC (rev 2655)
@@ -37,7 +37,6 @@
import org.jboss.jms.delegate.SessionDelegate;
import org.jboss.jms.message.MessageProxy;
import org.jboss.jms.tx.AckInfo;
-import org.jboss.jms.tx.LocalTx;
import org.jboss.logging.Logger;
import org.jboss.messaging.util.Util;
@@ -67,6 +66,7 @@
// Public --------------------------------------------------------
+
public Object handleClosing(Invocation invocation) throws Throwable
{
MethodInvocation mi = (MethodInvocation)invocation;
@@ -82,7 +82,7 @@
{
AckInfo ack = (AckInfo)i.next();
if (ackMode == Session.AUTO_ACKNOWLEDGE ||
- ackMode == Session.DUPS_OK_ACKNOWLEDGE || isXAAndConsideredNonTransacted(state))
+ ackMode == Session.DUPS_OK_ACKNOWLEDGE)
{
acks.add(ack);
}
@@ -133,7 +133,7 @@
if (ackMode == Session.CLIENT_ACKNOWLEDGE ||
ackMode == Session.AUTO_ACKNOWLEDGE ||
- ackMode == Session.DUPS_OK_ACKNOWLEDGE || isXAAndConsideredNonTransacted(state))
+ ackMode == Session.DUPS_OK_ACKNOWLEDGE)
{
// We collect acknowledgments (and not transact them) for CLIENT, AUTO and DUPS_OK
@@ -185,7 +185,7 @@
}
if (ackMode == Session.AUTO_ACKNOWLEDGE ||
- ackMode == Session.DUPS_OK_ACKNOWLEDGE || isXAAndConsideredNonTransacted(state))
+ ackMode == Session.DUPS_OK_ACKNOWLEDGE)
{
// We acknowledge immediately on a non-transacted session that does not want to
// CLIENT_ACKNOWLEDGE
@@ -249,7 +249,9 @@
SessionState state = getState(invocation);
- if (state.isTransacted() && !isXAAndConsideredNonTransacted(state))
+ int ackMode = state.getAcknowledgeMode();
+
+ if (ackMode == Session.SESSION_TRANSACTED)
{
throw new IllegalStateException("Cannot recover a transacted session");
}
@@ -258,18 +260,13 @@
//Call redeliver
SessionDelegate del = (SessionDelegate)mi.getTargetObject();
-
- List toAck = state.getToAck();
-
- if (!toAck.isEmpty())
- {
- del.redeliver(state.getToAck());
-
- state.getToAck().clear();
-
- state.setRecoverCalled(true);
- }
+ del.redeliver(state.getToAck());
+
+ state.getToAck().clear();
+
+ state.setRecoverCalled(true);
+
return null;
}
@@ -377,17 +374,6 @@
return (SessionState)((DelegateSupport)inv.getTargetObject()).getState();
}
- /** http://jira.jboss.org/jira/browse/JBMESSAGING-946 - To accomodate TCK and the MQ behavior
- * we should behave as non transacted, AUTO_ACK when there is no transaction enlisted
- * However when the Session is being used by ASF we should consider the case where
- * we will convert LocalTX to GlobalTransactions.
- * This function helper will ensure the condition that needs to be tested on this aspect
- * */
- private boolean isXAAndConsideredNonTransacted(SessionState state)
- {
- return state.isXA() && (state.getCurrentTxId() instanceof LocalTx) && (state.getDistinguishedListener() == null);
- }
-
// Inner Classes -------------------------------------------------
}
Modified: branches/Branch_1_0_1_SP/src/main/org/jboss/jms/client/container/TransactionAspect.java
===================================================================
--- branches/Branch_1_0_1_SP/src/main/org/jboss/jms/client/container/TransactionAspect.java 2007-05-04 16:26:12 UTC (rev 2654)
+++ branches/Branch_1_0_1_SP/src/main/org/jboss/jms/client/container/TransactionAspect.java 2007-05-08 15:57:02 UTC (rev 2655)
@@ -64,6 +64,7 @@
// Public --------------------------------------------------------
+
public Object handleClose(Invocation invocation) throws Throwable
{
Object res = invocation.invokeNext();
@@ -152,7 +153,7 @@
SessionState state = (SessionState)getState(invocation);
Object txID = state.getCurrentTxId();
- if ((!state.isXA() && state.isTransacted()) || (state.isXA() && !(txID instanceof LocalTx)))
+ if (txID != null)
{
// the session is non-XA and transacted, or XA and enrolled in a global transaction, so
// we add the message to a transaction instead of sending it now. An XA session that has
Modified: branches/Branch_1_0_1_SP/tests/src/org/jboss/test/messaging/jms/WireFormatTest.java
===================================================================
--- branches/Branch_1_0_1_SP/tests/src/org/jboss/test/messaging/jms/WireFormatTest.java 2007-05-04 16:26:12 UTC (rev 2654)
+++ branches/Branch_1_0_1_SP/tests/src/org/jboss/test/messaging/jms/WireFormatTest.java 2007-05-08 15:57:02 UTC (rev 2655)
@@ -72,7 +72,7 @@
private static final Logger log = Logger.getLogger(WireFormatTest.class);
// TODO - replace with a dynamic value
- private static final byte CURRENT_VERSION = 6;
+ private static final byte CURRENT_VERSION = 13;
// Static --------------------------------------------------------
Modified: branches/Branch_1_0_1_SP/tests/src/org/jboss/test/messaging/jms/XATest.java
===================================================================
--- branches/Branch_1_0_1_SP/tests/src/org/jboss/test/messaging/jms/XATest.java 2007-05-04 16:26:12 UTC (rev 2654)
+++ branches/Branch_1_0_1_SP/tests/src/org/jboss/test/messaging/jms/XATest.java 2007-05-08 15:57:02 UTC (rev 2655)
@@ -21,26 +21,16 @@
*/
package org.jboss.test.messaging.jms;
-import java.util.ArrayList;
-
import javax.jms.Connection;
-import javax.jms.ConnectionFactory;
import javax.jms.DeliveryMode;
import javax.jms.Destination;
-import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageConsumer;
-import javax.jms.MessageListener;
import javax.jms.MessageProducer;
-import javax.jms.Queue;
-import javax.jms.ServerSession;
-import javax.jms.ServerSessionPool;
import javax.jms.Session;
import javax.jms.TextMessage;
import javax.jms.XAConnection;
-import javax.jms.XAConnectionFactory;
import javax.jms.XASession;
-import javax.management.ObjectName;
import javax.naming.InitialContext;
import javax.transaction.Transaction;
import javax.transaction.TransactionManager;
@@ -50,18 +40,12 @@
import org.jboss.jms.client.JBossConnection;
import org.jboss.jms.client.JBossConnectionFactory;
-import org.jboss.jms.client.JBossSession;
import org.jboss.jms.client.delegate.ClientConnectionDelegate;
-import org.jboss.jms.client.delegate.DelegateSupport;
import org.jboss.jms.client.state.ConnectionState;
-import org.jboss.jms.client.state.SessionState;
-import org.jboss.jms.tx.LocalTx;
import org.jboss.jms.tx.MessagingXAResource;
import org.jboss.jms.tx.ResourceManager;
-import org.jboss.logging.Logger;
import org.jboss.test.messaging.MessagingTestCase;
import org.jboss.test.messaging.tools.ServerManagement;
-import org.jboss.test.messaging.tools.jmx.ServiceContainer;
import org.jboss.tm.TransactionManagerLocator;
/**
@@ -92,43 +76,48 @@
super(name);
}
+
// TestCase overrides --------------------------------------------------------------------------
public void setUp() throws Exception
{
- if (ServerManagement.isRemote())
- {
- throw new IllegalStateException("This test should only be run in local mode");
- }
-
super.setUp();
-
ServerManagement.start("all");
-
+ initialContext = new InitialContext();
+
initialContext = new InitialContext(ServerManagement.getJNDIEnvironment());
cf = (JBossConnectionFactory)initialContext.lookup("/ConnectionFactory");
-
- tm = (TransactionManager)initialContext.lookup(ServiceContainer.TRANSACTION_MANAGER_JNDI_NAME);
+ if (!ServerManagement.isRemote())
+ {
+ tm = TransactionManagerLocator.getInstance().locate();
+ }
+
ServerManagement.undeployQueue("Queue");
ServerManagement.deployQueue("Queue");
queue = (Destination)initialContext.lookup("/queue/Queue");
this.drainDestination(cf, queue);
- suspendedTx = tm.suspend();
+ if (!ServerManagement.isRemote())
+ {
+ suspendedTx = tm.suspend();
+ }
}
public void tearDown() throws Exception
{
ServerManagement.undeployQueue("Queue");
- if (tm.getTransaction() != null)
+ if (!ServerManagement.isRemote())
{
- //roll it back
- tm.rollback();
+ if (tm.getTransaction() != null)
+ {
+ //roll it back
+ tm.rollback();
+ }
}
-
+
if (suspendedTx != null)
{
tm.resume(suspendedTx);
@@ -139,529 +128,62 @@
+
// Public ---------------------------------------------------------------------------------------
-
- /* If there is no global tx present the send must behave as non transacted.
- * See http://www.jboss.com/index.html?module=bb&op=viewtopic&t=98577&postdays=0&postorder=asc&start=0
- * http://jira.jboss.com/jira/browse/JBMESSAGING-410
- * http://jira.jboss.com/jira/browse/JBMESSAGING-721
- * http://jira.jboss.org/jira/browse/JBMESSAGING-946
- */
- public void testSendNoGlobalTransaction() throws Exception
- {
- Transaction suspended = null;
-
- try
- {
- ServerManagement.deployQueue("MyQueue");
-
- // make sure there's no active JTA transaction
-
- suspended = tm.suspend();
-
- // send a message to the queue using an XASession that's not enlisted in a global tx
-
- Queue queue = (Queue)initialContext.lookup("queue/MyQueue");
-
- XAConnectionFactory xcf =
- (XAConnectionFactory)initialContext.lookup("java:/XAConnectionFactory");
-
- XAConnection xconn = xcf.createXAConnection();
-
- XASession xs = xconn.createXASession();
-
- MessageProducer p = xs.createProducer(queue);
- Message m = xs.createTextMessage("one");
-
- p.send(m);
-
- log.debug("message sent");
-
- xconn.close();
-
- // receive the message
- ConnectionFactory cf = (ConnectionFactory)initialContext.lookup("/ConnectionFactory");
- Connection conn = cf.createConnection();
- conn.start();
- Session s = conn.createSession(false, Session.AUTO_ACKNOWLEDGE);
- MessageConsumer c = s.createConsumer(queue);
- TextMessage rm = (TextMessage)c.receive(1000);
- assertNotNull(rm);
-
- assertEquals("one", rm.getText());
-
- conn.close();
- }
- finally
- {
- ServerManagement.undeployQueue("MyQueue");
-
- if (suspended != null)
- {
- TransactionManagerLocator.getInstance().locate().resume(suspended);
- }
- }
- }
-
-
- /*
- * If messages are consumed using an XASession that is not enlisted in a transaction then the behaviour of the session
- * falls back to being AUTO_ACK - i.e. the messages will get acked immediately.
- *
- * There is one exception to this:
- *
- * For transactional delivery of messages in an MDB using the old container invoker (non JCA 1.5 inflow) the message
- * is received from the JMS provider *before* the MDB container has a chance to enlist the session in a transaction.
- * (see page 199 (chapter 5 JMS and Transactions, section "Application Server Integration" of Mark Little's book Java Transaction
- * processing for a discussion of how different app servers deal with this)
- * This is not a problem specific to JBoss and was solved with JCA 1.5 message inflow.
- * Consequently, if we detect the session has a distinguised session listener (which it will if using ASF) then the behaviour
- * is to fall back to being a local transacted session. Later on, when the session is enlisted the work done in the local tx
- * is converted to the global tx brach.
- *
- * We are testing the exceptional case here without a global tx here
- *
- * See http://www.jboss.com/index.html?module=bb&op=viewtopic&t=98577&postdays=0&postorder=asc&start=0
- * http://jira.jboss.com/jira/browse/JBMESSAGING-410
- * http://jira.jboss.com/jira/browse/JBMESSAGING-721
- * http://jira.jboss.org/jira/browse/JBMESSAGING-946
- *
- */
- public void testConsumeWithConnectionConsumerNoGlobalTransaction() throws Exception
- {
- ServerManagement.deployQueue("MyQueue2");
-
- try
- {
- // send a message to the queue
-
- ConnectionFactory cf = (ConnectionFactory) initialContext.lookup("/ConnectionFactory");
- Queue queue = (Queue) initialContext.lookup("queue/MyQueue2");
- Connection conn = cf.createConnection();
- Session s = conn.createSession(false, Session.AUTO_ACKNOWLEDGE);
- MessageProducer p = s.createProducer(queue);
- p.setDeliveryMode(DeliveryMode.PERSISTENT);
- Message m = s.createTextMessage("one");
- p.send(m);
- conn.close();
-
- // make sure there's no active JTA transaction
-
- Transaction suspended = tm.suspend();
-
- XAConnection xaconn = null;
- try
- {
-
- ObjectName queueMBean = new ObjectName("jboss.messaging.destination:service=Queue,name=MyQueue2");
- Integer count = (Integer) ServerManagement.getAttribute(queueMBean, "MessageCount");
- assertEquals(1, count.intValue());
-
- // using XA with a ConnectionConsumer (testing the transaction behavior under MDBs)
-
- XAConnectionFactory xacf = (XAConnectionFactory) initialContext.lookup("/XAConnectionFactory");
- xaconn = xacf.createXAConnection();
- xaconn.start();
- XASession xasession = xaconn.createXASession();
- DummyListener listener = new DummyListener();
- xasession.setMessageListener(listener);
-
- ServerSessionPool pool = new MockServerSessionPool(xasession);
-
- xaconn.createConnectionConsumer(queue, null, pool, 1);
-
- Thread.sleep(1000);
- assertEquals(1, listener.messages.size());
-
- // Message should still be on server
- count = (Integer) ServerManagement.getAttribute(queueMBean, "MessageCount");
- assertEquals(1, count.intValue());
-
- XAResource resource = xasession.getXAResource();
-
- // Starts a new transaction
- tm.begin();
-
- Transaction trans = tm.getTransaction();
-
- JBossSession session = (JBossSession)xasession;
- SessionState state = (SessionState)((DelegateSupport)session.getDelegate()).getState();
-
- // Validates TX convertion
- assertTrue(state.getCurrentTxId() instanceof LocalTx);
-
- // Enlist the transaction... as supposed to be happening on JBossAS with the
- // default listener (enlist happening after message is received)
- trans.enlistResource(resource);
-
- // Validates TX convertion
- assertFalse(state.getCurrentTxId() instanceof LocalTx);
-
- trans.delistResource(resource, XAResource.TMSUCCESS);
-
- trans.commit();
-
-
- // After commit the message should be consumed
- count = (Integer) ServerManagement.getAttribute(queueMBean, "MessageCount");
- assertEquals(0, count.intValue());
- }
- finally
- {
- if (xaconn != null)
- {
- xaconn.close();
- }
- if (suspended != null)
- {
- TransactionManagerLocator.getInstance().locate().resume(suspended);
- }
- }
- }
- finally
- {
- ServerManagement.undeployQueue("MyQueue2");
- }
-
-
- }
-
-
- /*
- * If messages are consumed using an XASession that is not enlisted in a transaction then the behaviour of the session
- * falls back to being AUTO_ACK - i.e. the messages will get acked immediately.
- *
- * There is one exception to this:
- *
- * For transactional delivery of messages in an MDB using the old container invoker (non JCA 1.5 inflow) the message
- * is received from the JMS provider *before* the MDB container has a chance to enlist the session in a transaction.
- * (see page 199 (chapter 5 JMS and Transactions, section "Application Server Integration" of Mark Little's book Java Transaction
- * processing for a discussion of how different app servers deal with this)
- * This is not a problem specific to JBoss and was solved with JCA 1.5 message inflow.
- * Consequently, if we detect the session has a distinguised session listener (which it will if using ASF) then the behaviour
- * is to fall back to being a local transacted session. Later on, when the session is enlisted the work done in the local tx
- * is converted to the global tx brach.
- *
- * We are testing the standard case without a global tx here
- *
- * See http://www.jboss.com/index.html?module=bb&op=viewtopic&t=98577&postdays=0&postorder=asc&start=0
- * http://jira.jboss.com/jira/browse/JBMESSAGING-410
- * http://jira.jboss.com/jira/browse/JBMESSAGING-721
- * http://jira.jboss.org/jira/browse/JBMESSAGING-946
- *
- */
- public void testConsumeWithoutConnectionConsumerNoGlobalTransaction() throws Exception
- {
- try
- {
- ServerManagement.deployQueue("MyQueue2");
-
- // send a message to the queue
-
- ConnectionFactory cf = (ConnectionFactory)initialContext.lookup("/ConnectionFactory");
- Queue queue = (Queue)initialContext.lookup("queue/MyQueue2");
- Connection conn = cf.createConnection();
- Session s = conn.createSession(false, Session.AUTO_ACKNOWLEDGE);
- MessageProducer p = s.createProducer(queue);
- p.setDeliveryMode(DeliveryMode.PERSISTENT);
- Message m = s.createTextMessage("one");
- p.send(m);
- conn.close();
-
- // make sure there's no active JTA transaction
-
- Transaction suspended = tm.suspend();
-
- try
- {
-
- ObjectName queueMBean = new ObjectName("jboss.messaging.destination:service=Queue,name=MyQueue2");
- Integer count = (Integer)ServerManagement.getAttribute(queueMBean, "MessageCount");
- assertEquals(1, count.intValue());
-
- XAConnectionFactory xcf =
- (XAConnectionFactory)initialContext.lookup("java:/XAConnectionFactory");
- XAConnection xconn = xcf.createXAConnection();
- xconn.start();
-
- // no active JTA transaction here
-
- XASession xs = xconn.createXASession();
-
- MessageConsumer c = xs.createConsumer(queue);
-
- // the message should be store unacked in the local session
- TextMessage rm = (TextMessage)c.receive(1000);
-
- assertEquals("one", rm.getText());
-
- // messages should be acked
- count = (Integer)ServerManagement.getAttribute(queueMBean, "MessageCount");
- assertEquals(0, count.intValue());
-
- xconn.close();
- }
- finally
- {
-
- if (suspended != null)
- {
- TransactionManagerLocator.getInstance().locate().resume(suspended);
- }
- }
- }
- finally
- {
- ServerManagement.undeployQueue("MyQueue2");
- }
- }
-
-
- /*
- * If messages are consumed using an XASession that is not enlisted in a transaction then the behaviour of the session
- * falls back to being AUTO_ACK - i.e. the messages will get acked immediately.
- *
- * There is one exception to this:
- *
- * For transactional delivery of messages in an MDB using the old container invoker (non JCA 1.5 inflow) the message
- * is received from the JMS provider *before* the MDB container has a chance to enlist the session in a transaction.
- * (see page 199 (chapter 5 JMS and Transactions, section "Application Server Integration" of Mark Little's book Java Transaction
- * processing for a discussion of how different app servers deal with this)
- * This is not a problem specific to JBoss and was solved with JCA 1.5 message inflow.
- * Consequently, if we detect the session has a distinguised session listener (which it will if using ASF) then the behaviour
- * is to fall back to being a local transacted session. Later on, when the session is enlisted the work done in the local tx
- * is converted to the global tx brach.
- *
- * We are testing the case with a global tx here
- *
- * See http://www.jboss.com/index.html?module=bb&op=viewtopic&t=98577&postdays=0&postorder=asc&start=0
- * http://jira.jboss.com/jira/browse/JBMESSAGING-410
- * http://jira.jboss.com/jira/browse/JBMESSAGING-721
- * http://jira.jboss.org/jira/browse/JBMESSAGING-946
- *
- */
- public void testConsumeGlobalTransaction() throws Exception
- {
- XAConnection xaconn = null;
-
- try
- {
- ServerManagement.deployQueue("MyQueue2");
-
- // send a message to the queue
-
- ConnectionFactory cf = (ConnectionFactory)initialContext.lookup("/ConnectionFactory");
- Queue queue = (Queue)initialContext.lookup("queue/MyQueue2");
- Connection conn = cf.createConnection();
- Session s = conn.createSession(false, Session.AUTO_ACKNOWLEDGE);
- MessageProducer p = s.createProducer(queue);
- p.setDeliveryMode(DeliveryMode.PERSISTENT);
- Message m = s.createTextMessage("one");
- p.send(m);
- conn.close();
-
- ObjectName queueMBean = new ObjectName("jboss.messaging.destination:service=Queue,name=MyQueue2");
- Integer count = (Integer)ServerManagement.getAttribute(queueMBean, "MessageCount");
- assertEquals(1, count.intValue());
-
- tm.begin();
-
- Transaction trans = tm.getTransaction();
-
- XAConnectionFactory xacf = (XAConnectionFactory)cf;
-
- xaconn = xacf.createXAConnection();
-
- xaconn.start();
-
- XASession xasession = xaconn.createXASession();
-
- XAResource resouce = xasession.getXAResource();
-
- trans.enlistResource(resouce);
-
- MessageConsumer consumer = xasession.createConsumer(queue);
-
- TextMessage messageReceived = (TextMessage)consumer.receive(1000);
-
- assertNotNull(messageReceived);
-
- assertEquals("one", messageReceived.getText());
-
- assertNull(consumer.receive(1000));
-
- count = (Integer)ServerManagement.getAttribute(queueMBean, "MessageCount");
-
- assertEquals(1, count.intValue());
-
- trans.delistResource(resouce, XAResource.TMSUCCESS);
-
- tm.rollback();
-
- tm.begin();
- trans = tm.getTransaction();
- trans.enlistResource(resouce);
-
- messageReceived = (TextMessage)consumer.receive(1000);
-
- assertNotNull(messageReceived);
-
- assertEquals("one", messageReceived.getText());
-
- count = (Integer)ServerManagement.getAttribute(queueMBean, "MessageCount");
- assertEquals(1, count.intValue());
-
- trans.commit();
-
- count = (Integer)ServerManagement.getAttribute(queueMBean, "MessageCount");
- assertEquals(0, count.intValue());
-
- }
- finally
- {
- if (xaconn != null)
- {
- xaconn.close();
- }
- ServerManagement.undeployQueue("MyQueue2");
- }
- }
-
- /*
- * This test will:
- * - Send two messages over a producer
- * - Receive one message over a consumer created used a XASession
- * - Call Recover
- * - Receive the second message
- * - The queue should be empty after that
- * Verifies if messages are sent ok and ack properly when recovery is called
- * NOTE: To accomodate TCK tests where Session/Consumers are being used without transaction enlisting
- * we are processing those cases as nonTransactional/AutoACK, however if the session is being used
- * to process MDBs we will consider the LocalTransaction convertion and process those as the comment above
- * This was done as per: http://jira.jboss.org/jira/browse/JBMESSAGING-946
- *
- */
- public void testRecoverOnXA() throws Exception
- {
- XAConnection xaconn = null;
-
- try
- {
- ServerManagement.deployQueue("MyQueue2");
-
- // send two messages to the queue
-
- ConnectionFactory cf = (ConnectionFactory)initialContext.lookup("/ConnectionFactory");
- Queue queue = (Queue)initialContext.lookup("queue/MyQueue2");
- Connection conn = cf.createConnection();
- Session s = conn.createSession(false, Session.AUTO_ACKNOWLEDGE);
- MessageProducer p = s.createProducer(queue);
- p.setDeliveryMode(DeliveryMode.PERSISTENT);
- Message m = s.createTextMessage("one");
- p.send(m);
- m = s.createTextMessage("two");
- p.send(m);
- conn.close();
-
- ObjectName queueMBean = new ObjectName("jboss.messaging.destination:service=Queue,name=MyQueue2");
- Integer count = (Integer)ServerManagement.getAttribute(queueMBean, "MessageCount");
- assertEquals(2, count.intValue());
-
- XAConnectionFactory xacf = (XAConnectionFactory)cf;
-
- xaconn = xacf.createXAConnection();
-
- xaconn.start();
-
- XASession xasession = xaconn.createXASession();
-
- MessageConsumer consumer = xasession.createConsumer(queue);
-
- TextMessage messageReceived = (TextMessage)consumer.receive(1000);
-
- assertNotNull(messageReceived);
-
- assertEquals("one", messageReceived.getText());
-
- xasession.recover();
-
- messageReceived = (TextMessage)consumer.receive(1000);
-
- assertEquals("two", messageReceived.getText());
-
- consumer.close();
-
- // I can't call xasession.close for this test as JCA layer would cache the session
- // So.. keep this close commented!
- //xasession.close();
-
- count = (Integer)ServerManagement.getAttribute(queueMBean, "MessageCount");
- assertEquals(0, count.intValue());
-
-
- }
- finally
- {
- if (xaconn != null)
- {
- xaconn.close();
- }
- ServerManagement.undeployQueue("MyQueue2");
- }
- }
-
- //See http://jira.jboss.com/jira/browse/JBMESSAGING-638
+ // See http://jira.jboss.com/jira/browse/JBMESSAGING-638
public void testResourceManagerMemoryLeakOnCommit() throws Exception
{
+ if (ServerManagement.isRemote()) return;
+
XAConnection xaConn = null;
-
+
try
{
xaConn = cf.createXAConnection();
-
+
JBossConnection jbConn = (JBossConnection)xaConn;
-
+
ClientConnectionDelegate del = (ClientConnectionDelegate)jbConn.getDelegate();
-
+
ConnectionState state = (ConnectionState)del.getState();
-
+
ResourceManager rm = state.getResourceManager();
-
+
XASession xaSession = xaConn.createXASession();
-
+
xaConn.start();
-
+
XAResource res = xaSession.getXAResource();
-
+
XAResource dummy = new DummyXAResource();
-
+
for (int i = 0; i < 100; i++)
{
-
+
tm.begin();
-
+
Transaction tx = tm.getTransaction();
-
+
tx.enlistResource(res);
-
+
tx.enlistResource(dummy);
-
+
assertEquals(1, rm.size());
-
+
tx.delistResource(res, XAResource.TMSUCCESS);
-
+
tx.delistResource(dummy, XAResource.TMSUCCESS);
-
+
tm.commit();
- }
-
+ }
+
assertEquals(1, rm.size());
-
+
xaConn.close();
-
+
xaConn = null;
-
+
assertEquals(0, rm.size());
}
@@ -673,57 +195,60 @@
}
}
}
-
+
//See http://jira.jboss.com/jira/browse/JBMESSAGING-638
public void testResourceManagerMemoryLeakOnRollback() throws Exception
- {
+ {
+ if (ServerManagement.isRemote()) return;
+
+
XAConnection xaConn = null;
-
+
try
{
xaConn = cf.createXAConnection();
-
+
JBossConnection jbConn = (JBossConnection)xaConn;
-
+
ClientConnectionDelegate del = (ClientConnectionDelegate)jbConn.getDelegate();
-
+
ConnectionState state = (ConnectionState)del.getState();
-
+
ResourceManager rm = state.getResourceManager();
-
+
XASession xaSession = xaConn.createXASession();
-
+
xaConn.start();
-
+
XAResource res = xaSession.getXAResource();
-
+
XAResource dummy = new DummyXAResource();
-
+
for (int i = 0; i < 100; i++)
- {
+ {
tm.begin();
-
+
Transaction tx = tm.getTransaction();
-
+
tx.enlistResource(res);
-
+
tx.enlistResource(dummy);
-
+
assertEquals(1, rm.size());
-
+
tx.delistResource(res, XAResource.TMSUCCESS);
-
+
tx.delistResource(dummy, XAResource.TMSUCCESS);
-
+
tm.rollback();
- }
-
+ }
+
assertEquals(1, rm.size());
-
+
xaConn.close();
-
+
xaConn = null;
-
+
assertEquals(0, rm.size());
}
@@ -735,250 +260,229 @@
}
}
}
+
- // http://jira.jboss.com/jira/browse/JBMESSAGING-721
+ //http://jira.jboss.com/jira/browse/JBMESSAGING-721
public void testConvertFromLocalTx() throws Exception
{
+ if (ServerManagement.isRemote()) return;
+
+
Connection conn = null;
-
+
XAConnection xaConn = null;
-
+
try
{
-
+
//First send some messages to a queue
-
- ObjectName queueMBean = new ObjectName("jboss.messaging.destination:service=Queue,name=Queue");
- Integer count = (Integer) ServerManagement.getAttribute(queueMBean, "MessageCount");
- assertEquals(0, count.intValue());
-
+
conn = cf.createConnection();
-
+
Session sessSend = conn.createSession(false, Session.AUTO_ACKNOWLEDGE);
-
+
MessageProducer prod = sessSend.createProducer(queue);
-
+
TextMessage tm1 = sessSend.createTextMessage("message1");
-
+
TextMessage tm2 = sessSend.createTextMessage("message2");
-
+
prod.send(tm1);
-
+
prod.send(tm2);
-
- count = (Integer) ServerManagement.getAttribute(queueMBean, "MessageCount");
- assertEquals(2, count.intValue());
-
+
+
xaConn = cf.createXAConnection();
-
+
XASession xaSession = xaConn.createXASession();
-
+
xaConn.start();
-
- DummyListener listener = new DummyListener();
-
- xaSession.setMessageListener(listener);
-
- ServerSessionPool pool = new MockServerSessionPool(xaSession);
-
- xaConn.createConnectionConsumer(queue, null, pool, 1);
-
- Thread.sleep(1000);
-
- assertEquals(2, listener.messages.size());
-
- assertEquals("message1", ((TextMessage)(listener.messages.get(0))).getText());
- assertEquals("message2", ((TextMessage)(listener.messages.get(1))).getText());
-
- count = (Integer) ServerManagement.getAttribute(queueMBean, "MessageCount");
- assertEquals(2, count.intValue());
-
- listener.messages.clear();
-
+
+ MessageConsumer cons = xaSession.createConsumer(queue);
+
+ //Receive the two messages outside of a transaction
+
+ TextMessage rm1 = (TextMessage)cons.receive(1000);
+
+ assertNotNull(rm1);
+
+ assertEquals("message1", rm1.getText());
+
+ TextMessage rm2 = (TextMessage)cons.receive(1000);
+
+ assertNotNull(rm2);
+
+ assertEquals("message2", rm2.getText());
+
+ Message rm3 = cons.receive(1000);
+
+ assertNull(rm3);
+
//Now we enlist the session in an xa transaction
-
+
XAResource res = xaSession.getXAResource();
-
+
tm.begin();
-
+
Transaction tx = tm.getTransaction();
tx.enlistResource(res);
+
+ // This should cause the work done previously to be converted into work done in the XA
+ // transaction. This is what an MDB does. There is a difficulty in transactional delivery
+ // with an MDB. The message is received from the destination and then sent to the MDB
+ // container so it can call onMessage().
+ // For transactional delivery the receipt of the message should be in a transaction, but by
+ // the time the MDB container is invoked it is too late - the message has already been
+ // received and passed on (see page 199, chapter 5 "JMS and Transactions", section
+ // "Application Server Integration", of Mark Little's book Java Transaction Processing
+ // for a discussion of how different app servers deal with this).
+ // The way JBoss Messaging (and JBossMQ) deals with this is to convert any work done prior
+ // to when the XASession is enlisted in the transaction, into work done in the XA
+ // transaction
- //This should cause the work done previously to be converted into work done in the xa transaction
- //this is what an MDB does
- //There is a difficulty in transactional delivery with an MDB.
- //The message is received from the destination and then sent to the mdb container so
- //it can call onMessage.
- //For transactional delivery the receipt of the message should be in a transaction but by the time
- //the mdb container is invoked the message has already been received it is too late - the message
- //has already been received and passed on (see page 199 (chapter 5 JMS and Transactions, section "Application Server Integration"
- //of Mark Little's book Java Transaction processing
- //for a discussion of how different app serves deal with this)
- //The way jboss messaging (and jboss mq) deals with this is to convert any work done
- //prior to when the xasession is enlisted in the tx, into work done in the xa tx
-
tx.delistResource(res, XAResource.TMSUCCESS);
-
+
//Now rollback the tx - this should cause redelivery of the two messages
- tm.rollback();
-
- count = (Integer) ServerManagement.getAttribute(queueMBean, "MessageCount");
- assertEquals(2, count.intValue());
-
- Thread.sleep(1000);
-
- assertEquals(2, listener.messages.size());
-
- listener.messages.clear();
-
- tm.begin();
-
- tx = tm.getTransaction();
- tx.enlistResource(res);
-
- tm.commit();
-
- Thread.sleep(1000);
-
- assertEquals(0, listener.messages.size());
-
- count = (Integer) ServerManagement.getAttribute(queueMBean, "MessageCount");
- assertEquals(0, count.intValue());
-
- assertNull(tm.getTransaction());
-
-
+ tx.rollback();
+
+ rm1 = (TextMessage)cons.receive(1000);
+
+ assertNotNull(rm1);
+
+ assertEquals("message1", rm1.getText());
+
+ rm2 = (TextMessage)cons.receive(1000);
+
+ assertNotNull(rm2);
+
+ assertEquals("message2", rm2.getText());
+
+ rm3 = cons.receive(1000);
+
+ assertNull(rm3);
}
finally
- {
+ {
if (conn != null)
{
conn.close();
}
-
+
if (xaConn != null)
{
xaConn.close();
}
-
- /* if (suspended != null)
- {
- tm.resume(suspended);
- }*/
}
}
-
+
//http://jira.jboss.com/jira/browse/JBMESSAGING-721
- // Note: The behavior of this test was changed after http://jira.jboss.com/jira/browse/JBMESSAGING-946
- // When you have a XASession without a transaction enlisted we will behave the same way as non transactedSession, AutoAck
public void testTransactionIdSetAfterCommit() throws Exception
{
+ if (ServerManagement.isRemote()) return;
+
+
Connection conn = null;
-
+
XAConnection xaConn = null;
-
+
try
{
-
+
//First send some messages to a queue
-
+
conn = cf.createConnection();
-
+
Session sessSend = conn.createSession(false, Session.AUTO_ACKNOWLEDGE);
-
+
MessageProducer prod = sessSend.createProducer(queue);
-
+
TextMessage tm1 = sessSend.createTextMessage("message1");
-
+
TextMessage tm2 = sessSend.createTextMessage("message2");
-
+
prod.send(tm1);
-
+
prod.send(tm2);
-
-
+
+
xaConn = cf.createXAConnection();
-
+
XASession xaSession = xaConn.createXASession();
-
+
xaConn.start();
-
+
MessageConsumer cons = xaSession.createConsumer(queue);
-
+
//Now we enlist the session in an xa transaction
-
+
XAResource res = xaSession.getXAResource();
-
+
tm.begin();
-
+
Transaction tx = tm.getTransaction();
tx.enlistResource(res);
-
+
tx.delistResource(res, XAResource.TMSUCCESS);
-
+
//Then we do a commit
tm.commit();
-
- // I have changed where this begin was originally set
- // as when you don't have a resource enlisted, XASessions will act as
- // non transacted + AutoAck
-
- //And enlist again - this should convert the work done in the local tx
- //into the global branch
-
- tx = tm.getTransaction();
-
- tm.begin();
-
- tx = tm.getTransaction();
- tx.enlistResource(res);
-
+
//Then we receive the messages outside the tx
-
+
TextMessage rm1 = (TextMessage)cons.receive(1000);
-
+
assertNotNull(rm1);
-
+
assertEquals("message1", rm1.getText());
-
+
TextMessage rm2 = (TextMessage)cons.receive(1000);
-
+
assertNotNull(rm2);
-
+
assertEquals("message2", rm2.getText());
-
+
Message rm3 = cons.receive(1000);
-
+
assertNull(rm3);
-
- tx.delistResource(res, XAResource.TMSUCCESS);
-
+
+ //And enlist again - this should convert the work done in the local tx
+ //into the global branch
+
+ tx = tm.getTransaction();
+
+ tm.begin();
+
+ tx = tm.getTransaction();
+ tx.enlistResource(res);
+
+ tx.delistResource(res, XAResource.TMSUCCESS);
+
//Now rollback the tx - this should cause redelivery of the two messages
tx.rollback();
-
+
rm1 = (TextMessage)cons.receive(1000);
-
+
assertNotNull(rm1);
-
+
assertEquals("message1", rm1.getText());
-
+
rm2 = (TextMessage)cons.receive(1000);
-
+
assertNotNull(rm2);
-
+
assertEquals("message2", rm2.getText());
-
+
rm3 = cons.receive(1000);
-
+
assertNull(rm3);
}
finally
- {
+ {
if (conn != null)
{
conn.close();
}
-
+
if (xaConn != null)
{
xaConn.close();
@@ -986,114 +490,112 @@
}
}
-
+
//http://jira.jboss.com/jira/browse/JBMESSAGING-721
public void testTransactionIdSetAfterRollback() throws Exception
- {
+ {
+ if (ServerManagement.isRemote()) return;
+
+
Connection conn = null;
-
+
XAConnection xaConn = null;
-
+
try
{
-
+
//First send some messages to a queue
-
+
conn = cf.createConnection();
-
+
Session sessSend = conn.createSession(false, Session.AUTO_ACKNOWLEDGE);
-
+
MessageProducer prod = sessSend.createProducer(queue);
-
+
TextMessage tm1 = sessSend.createTextMessage("message1");
-
+
TextMessage tm2 = sessSend.createTextMessage("message2");
-
+
prod.send(tm1);
-
+
prod.send(tm2);
-
-
+
+
xaConn = cf.createXAConnection();
-
+
XASession xaSession = xaConn.createXASession();
-
+
xaConn.start();
-
+
MessageConsumer cons = xaSession.createConsumer(queue);
-
+
//Now we enlist the session in an xa transaction
-
+
XAResource res = xaSession.getXAResource();
-
+
tm.begin();
-
+
Transaction tx = tm.getTransaction();
tx.enlistResource(res);
tx.delistResource(res, XAResource.TMSUCCESS);
-
+
//Then we do a rollback
- tm.rollback();
-
-
- tm.begin();
-
- //And enlist again - the work should then be converted into the global tx branch
-
-
- // I have changed where this begin was originally set
- // as when you don't have a resource enlisted, XASessions will act as
- // non transacted + AutoAck
-
- tx = tm.getTransaction();
-
- tx.enlistResource(res);
-
+ tm.rollback();
+
//Then we receive the messages outside the global tx
-
+
TextMessage rm1 = (TextMessage)cons.receive(1000);
-
+
assertNotNull(rm1);
-
+
assertEquals("message1", rm1.getText());
-
+
TextMessage rm2 = (TextMessage)cons.receive(1000);
-
+
assertNotNull(rm2);
-
+
assertEquals("message2", rm2.getText());
-
+
Message rm3 = cons.receive(1000);
-
+
assertNull(rm3);
+
+ tm.begin();
+
+ //And enlist again - the work should then be converted into the global tx branch
+
+ tx = tm.getTransaction();
+
+ tx.enlistResource(res);
+
tx.delistResource(res, XAResource.TMSUCCESS);
-
+
//Now rollback the tx - this should cause redelivery of the two messages
tx.rollback();
-
+
rm1 = (TextMessage)cons.receive(1000);
-
+
assertNotNull(rm1);
-
+
assertEquals("message1", rm1.getText());
-
+
rm2 = (TextMessage)cons.receive(1000);
-
+
assertNotNull(rm2);
-
+
assertEquals("message2", rm2.getText());
-
+
rm3 = cons.receive(1000);
-
+
assertNull(rm3);
}
finally
- {
+ {
if (conn != null)
{
conn.close();
}
-
+
if (xaConn != null)
{
xaConn.close();
@@ -1102,8 +604,11 @@
}
+
public void test2PCSendCommit1PCOptimization() throws Exception
- {
+ {
+ if (ServerManagement.isRemote()) return;
+
//Since both resources have the same RM, TM will probably use 1PC optimization
XAConnection conn = null;
@@ -1161,6 +666,8 @@
public void test2PCSendCommit() throws Exception
{
+ if (ServerManagement.isRemote()) return;
+
XAConnection conn = null;
Connection conn2 = null;
@@ -1220,6 +727,8 @@
public void test2PCSendRollback1PCOptimization() throws Exception
{
+ if (ServerManagement.isRemote()) return;
+
//Since both resources have the same RM, TM will probably use 1PC optimization
XAConnection conn = null;
@@ -1272,6 +781,8 @@
public void test2PCSendFailOnPrepare() throws Exception
{
+ if (ServerManagement.isRemote()) return;
+
XAConnection conn = null;
Connection conn2 = null;
try
@@ -1335,6 +846,8 @@
public void test2PCSendRollback() throws Exception
{
+ if (ServerManagement.isRemote()) return;
+
XAConnection conn = null;
Connection conn2 = null;
try
@@ -1387,6 +900,8 @@
public void test2PCReceiveCommit1PCOptimization() throws Exception
{
+ if (ServerManagement.isRemote()) return;
+
//Since both resources have the same RM, TM will probably use 1PC optimization
XAConnection conn = null;
@@ -1462,6 +977,8 @@
public void test2PCReceiveCommit() throws Exception
{
+ if (ServerManagement.isRemote()) return;
+
XAConnection conn = null;
Connection conn2 = null;
@@ -1536,6 +1053,8 @@
public void test2PCReceiveRollback1PCOptimization() throws Exception
{
+ if (ServerManagement.isRemote()) return;
+
//Since both resources have the same RM, TM will probably use 1PC optimization
XAConnection conn = null;
@@ -1614,7 +1133,9 @@
}
public void test2PCReceiveRollback() throws Exception
- {
+ {
+ if (ServerManagement.isRemote()) return;
+
XAConnection conn = null;
Connection conn2 = null;
@@ -1694,6 +1215,8 @@
public void test1PCSendCommit() throws Exception
{
+ if (ServerManagement.isRemote()) return;
+
XAConnection conn = null;
Connection conn2 = null;
@@ -1749,6 +1272,8 @@
public void test1PCSendRollback() throws Exception
{
+ if (ServerManagement.isRemote()) return;
+
XAConnection conn = null;
Connection conn2 = null;
try
@@ -1795,6 +1320,8 @@
public void test1PCReceiveCommit() throws Exception
{
+ if (ServerManagement.isRemote()) return;
+
XAConnection conn = null;
Connection conn2 = null;
@@ -1863,6 +1390,8 @@
public void test1PCReceiveRollback() throws Exception
{
+ if (ServerManagement.isRemote()) return;
+
XAConnection conn = null;
Connection conn2 = null;
@@ -1941,6 +1470,8 @@
public void testMultipleSessionsOneTxCommitAcknowledge1PCOptimization() throws Exception
{
+ if (ServerManagement.isRemote()) return;
+
XAConnection conn = null;
Connection conn2 = null;
@@ -2017,6 +1548,8 @@
public void testMultipleSessionsOneTxCommitAcknowledge() throws Exception
{
+ if (ServerManagement.isRemote()) return;
+
XAConnection conn = null;
Connection conn2 = null;
@@ -2092,6 +1625,8 @@
public void testMultipleSessionsOneTxRollbackAcknowledge1PCOptimization() throws Exception
{
+ if (ServerManagement.isRemote()) return;
+
XAConnection conn = null;
Connection conn2 = null;
@@ -2197,6 +1732,8 @@
public void testMultipleSessionsOneTxRollbackAcknowledge() throws Exception
{
+ if (ServerManagement.isRemote()) return;
+
XAConnection conn = null;
Connection conn2 = null;
@@ -2308,6 +1845,8 @@
public void testMultipleSessionsOneTxRollbackAcknowledgeForceFailureInCommit() throws Exception
{
+ if (ServerManagement.isRemote()) return;
+
XAConnection conn = null;
Connection conn2 = null;
@@ -2415,6 +1954,8 @@
public void testMultipleSessionsOneTxCommitSend1PCOptimization() throws Exception
{
+ if (ServerManagement.isRemote()) return;
+
//Since both resources have the same RM, TM will probably use 1PC optimization
XAConnection conn = null;
@@ -2483,6 +2024,8 @@
public void testMultipleSessionsOneTxCommitSend() throws Exception
{
+ if (ServerManagement.isRemote()) return;
+
//Since both resources have the same RM, TM will probably use 1PC optimization
XAConnection conn = null;
@@ -2554,6 +2097,8 @@
public void testMultipleSessionsOneTxRollbackSend1PCOptimization() throws Exception
{
+ if (ServerManagement.isRemote()) return;
+
//Since both resources have the same RM, TM will probably use 1PC optimization
XAConnection conn = null;
@@ -2618,6 +2163,8 @@
public void testMultipleSessionsOneTxRollbackSend() throws Exception
{
+ if (ServerManagement.isRemote()) return;
+
XAConnection conn = null;
Connection conn2 = null;
@@ -2683,6 +2230,8 @@
public void testOneSessionTwoTransactionsCommitAcknowledge() throws Exception
{
+ if (ServerManagement.isRemote()) return;
+
XAConnection conn = null;
Connection conn2 = null;
@@ -2766,6 +2315,8 @@
public void testOneSessionTwoTransactionsRollbackAcknowledge() throws Exception
{
+ if (ServerManagement.isRemote()) return;
+
XAConnection conn = null;
Connection conn2 = null;
@@ -2863,6 +2414,8 @@
public void testOneSessionTwoTransactionsCommitSend() throws Exception
{
+ if (ServerManagement.isRemote()) return;
+
XAConnection conn = null;
Connection conn2 = null;
@@ -2939,7 +2492,9 @@
public void testOneSessionTwoTransactionsRollbackSend() throws Exception
- {
+ {
+ if (ServerManagement.isRemote()) return;
+
XAConnection conn = null;
Connection conn2 = null;
@@ -3020,57 +2575,6 @@
// Inner classes --------------------------------------------------------------------------------
- static class DummyListener implements MessageListener
- {
-
- protected Logger log = Logger.getLogger(getClass());
-
- public ArrayList messages = new ArrayList();
-
- public void onMessage(Message message)
- {
- log.info("Message received on DummyListener " + message);
- messages.add(message);
- }
- }
-
- static class MockServerSessionPool implements ServerSessionPool
- {
- private ServerSession serverSession;
-
- MockServerSessionPool(Session sess)
- {
- serverSession = new MockServerSession(sess);
- }
-
- public ServerSession getServerSession() throws JMSException
- {
- return serverSession;
- }
- }
-
- static class MockServerSession implements ServerSession
- {
- Session session;
-
- MockServerSession(Session sess)
- {
- this.session = sess;
- }
-
-
- public Session getSession() throws JMSException
- {
- return session;
- }
-
- public void start() throws JMSException
- {
- session.run();
- }
-
- }
-
static class DummyXAResource implements XAResource
{
boolean failOnPrepare;
More information about the jboss-cvs-commits
mailing list