[jboss-cvs] JBoss Messaging SVN: r2618 - in trunk: tests/src/org/jboss/test/messaging/jms and 1 other directory.
jboss-cvs-commits at lists.jboss.org
jboss-cvs-commits at lists.jboss.org
Wed May 2 00:36:53 EDT 2007
Author: clebert.suconic at jboss.com
Date: 2007-05-02 00:36:53 -0400 (Wed, 02 May 2007)
New Revision: 2618
Modified:
trunk/src/main/org/jboss/jms/client/container/SessionAspect.java
trunk/tests/src/org/jboss/test/messaging/jms/XATestBase.java
Log:
http://jira.jboss.org/jira/browse/JBMESSAGING-946 - fixing session.recover behavior when non globalTransaction
Modified: trunk/src/main/org/jboss/jms/client/container/SessionAspect.java
===================================================================
--- trunk/src/main/org/jboss/jms/client/container/SessionAspect.java 2007-05-02 01:01:33 UTC (rev 2617)
+++ trunk/src/main/org/jboss/jms/client/container/SessionAspect.java 2007-05-02 04:36:53 UTC (rev 2618)
@@ -98,7 +98,7 @@
if (trace) { log.trace("handleClosing()"); }
//Sanity check
- if (state.isXA())
+ if (state.isXA() && !isConsideredNonTransacted(state))
{
if (trace) { log.trace("Session is XA"); }
@@ -128,7 +128,7 @@
//any deliveries - this is because the message listener might have closed
//before on message had finished executing
- if (ackMode == Session.AUTO_ACKNOWLEDGE)
+ if (ackMode == Session.AUTO_ACKNOWLEDGE || isConsideredNonTransacted(state))
{
//Acknowledge or cancel any outstanding auto ack
@@ -250,9 +250,7 @@
}
// if XA and there is no transaction enlisted on XA we will act as AutoAcknowledge
// However if it's a MDB (if there is a DistinguishedListener) we should behaved as transacted
- else if (ackMode == Session.AUTO_ACKNOWLEDGE ||
- (state.isXA() && (state.getCurrentTxId() instanceof LocalTx) &&
- (state.getDistinguishedListener() == null)))
+ else if (ackMode == Session.AUTO_ACKNOWLEDGE || isConsideredNonTransacted(state))
{
// We collect the single acknowledgement in the state.
@@ -309,9 +307,7 @@
// if XA and there is no transaction enlisted on XA we will act as AutoAcknowledge
// However if it's a MDB (if there is a DistinguishedListener) we should behaved as transacted
- if (ackMode == Session.AUTO_ACKNOWLEDGE ||
- (state.isXA() && (state.getCurrentTxId() instanceof LocalTx) &&
- (state.getDistinguishedListener() == null)))
+ if (ackMode == Session.AUTO_ACKNOWLEDGE || isConsideredNonTransacted(state))
{
// We auto acknowledge.
@@ -424,7 +420,7 @@
SessionState state = getState(invocation);
- if (state.isTransacted())
+ if (state.isTransacted() && !isConsideredNonTransacted(state))
{
throw new IllegalStateException("Cannot recover a transacted session");
}
@@ -443,8 +439,10 @@
state.setClientAckList(new ArrayList());
del.redeliver(dels);
+
+ state.setRecoverCalled(true);
}
- else if (ackMode == Session.AUTO_ACKNOWLEDGE || ackMode == Session.DUPS_OK_ACKNOWLEDGE)
+ else if (ackMode == Session.AUTO_ACKNOWLEDGE || ackMode == Session.DUPS_OK_ACKNOWLEDGE || isConsideredNonTransacted(state))
{
DeliveryInfo info = state.getAutoAckInfo();
@@ -459,11 +457,12 @@
del.redeliver(redels);
state.setAutoAckInfo(null);
+
+ state.setRecoverCalled(true);
}
}
- state.setRecoverCalled(true);
-
+
return null;
}
@@ -606,6 +605,11 @@
SessionState state = getState(invocation);
Object txID = state.getCurrentTxId();
+ // If there is no GlobalTransaction we run it as local transacted
+ // as discussed at http://www.jboss.com/index.html?module=bb&op=viewtopic&t=98577
+ // http://jira.jboss.org/jira/browse/JBMESSAGING-946
+ // and
+ // http://jira.jboss.org/jira/browse/JBMESSAGING-410
if ((!state.isXA() && state.isTransacted()) || (state.isXA() && !(txID instanceof LocalTx)))
{
// the session is non-XA and transacted, or XA and enrolled in a global transaction, so
@@ -873,6 +877,17 @@
}
}
+ /** http://jira.jboss.org/jira/browse/JBMESSAGING-946 - To accommodate the TCK and the MQ behavior
+ * we should behave as non-transacted (AUTO_ACK) when there is no transaction enlisted.
+ * However, when the Session is being used by ASF we should consider the case where
+ * we will convert LocalTX to GlobalTransactions.
+ * This helper function encapsulates the condition that needs to be tested in this aspect.
+ * */
+ private boolean isConsideredNonTransacted(SessionState state)
+ {
+ return state.isXA() && (state.getCurrentTxId() instanceof LocalTx) && (state.getDistinguishedListener() == null);
+ }
+
// Inner Classes -------------------------------------------------
private static class AsfMessageHolder
Modified: trunk/tests/src/org/jboss/test/messaging/jms/XATestBase.java
===================================================================
--- trunk/tests/src/org/jboss/test/messaging/jms/XATestBase.java 2007-05-02 01:01:33 UTC (rev 2617)
+++ trunk/tests/src/org/jboss/test/messaging/jms/XATestBase.java 2007-05-02 04:36:53 UTC (rev 2618)
@@ -620,6 +620,89 @@
}
/*
+ * This test will:
+ * - Send two messages over a producer
+ * - Receive one message over a consumer created using an XASession
+ * - Call Recover
+ * - Receive the second message
+ * - The queue should be empty after that
+ * Verifies that messages are sent and acknowledged properly when recover is called
+ * NOTE: To accommodate TCK tests where Sessions/Consumers are being used without transaction enlisting
+ * we are processing those cases as nonTransactional/AutoACK, however if the session is being used
+ * to process MDBs we will consider the LocalTransaction conversion and process those as the comment above
+ * This was done as per: http://jira.jboss.org/jira/browse/JBMESSAGING-946
+ *
+ */
+ public void testRecoverOnXA() throws Exception
+ {
+ XAConnection xaconn = null;
+
+ try
+ {
+ ServerManagement.deployQueue("MyQueue2");
+
+ // send two messages to the queue
+
+ ConnectionFactory cf = (ConnectionFactory)initialContext.lookup("/ConnectionFactory");
+ Queue queue = (Queue)initialContext.lookup("queue/MyQueue2");
+ Connection conn = cf.createConnection();
+ Session s = conn.createSession(false, Session.AUTO_ACKNOWLEDGE);
+ MessageProducer p = s.createProducer(queue);
+ p.setDeliveryMode(DeliveryMode.PERSISTENT);
+ Message m = s.createTextMessage("one");
+ p.send(m);
+ m = s.createTextMessage("two");
+ p.send(m);
+ conn.close();
+
+ ObjectName queueMBean = new ObjectName("jboss.messaging.destination:service=Queue,name=MyQueue2");
+ Integer count = (Integer)ServerManagement.getAttribute(queueMBean, "MessageCount");
+ assertEquals(2, count.intValue());
+
+ XAConnectionFactory xacf = (XAConnectionFactory)cf;
+
+ xaconn = xacf.createXAConnection();
+
+ xaconn.start();
+
+ XASession xasession = xaconn.createXASession();
+
+ MessageConsumer consumer = xasession.createConsumer(queue);
+
+ TextMessage messageReceived = (TextMessage)consumer.receive(1000);
+
+ assertNotNull(messageReceived);
+
+ assertEquals("one", messageReceived.getText());
+
+ xasession.recover();
+
+ messageReceived = (TextMessage)consumer.receive(1000);
+
+ assertEquals("two", messageReceived.getText());
+
+ consumer.close();
+
+ // I can't call xasession.close for this test as JCA layer would cache the session
+ // So.. keep this close commented!
+ //xasession.close();
+
+ count = (Integer)ServerManagement.getAttribute(queueMBean, "MessageCount");
+ assertEquals(0, count.intValue());
+
+
+ }
+ finally
+ {
+ if (xaconn != null)
+ {
+ xaconn.close();
+ }
+ ServerManagement.undeployQueue("MyQueue2");
+ }
+ }
+
+ /*
* If there is no global tx present messages consumed must consumed as if they were in a
* local tx. Note this behaviour differs from messages sent
* This is so we can support transacted delivery of messags in an MDB as mentioned
More information about the jboss-cvs-commits
mailing list