[jboss-cvs] JBoss Messaging SVN: r2618 - in trunk: tests/src/org/jboss/test/messaging/jms and 1 other directory.

jboss-cvs-commits at lists.jboss.org jboss-cvs-commits at lists.jboss.org
Wed May 2 00:36:53 EDT 2007


Author: clebert.suconic at jboss.com
Date: 2007-05-02 00:36:53 -0400 (Wed, 02 May 2007)
New Revision: 2618

Modified:
   trunk/src/main/org/jboss/jms/client/container/SessionAspect.java
   trunk/tests/src/org/jboss/test/messaging/jms/XATestBase.java
Log:
http://jira.jboss.org/jira/browse/JBMESSAGING-946 - fixing session.recover behavior when non globalTransaction

Modified: trunk/src/main/org/jboss/jms/client/container/SessionAspect.java
===================================================================
--- trunk/src/main/org/jboss/jms/client/container/SessionAspect.java	2007-05-02 01:01:33 UTC (rev 2617)
+++ trunk/src/main/org/jboss/jms/client/container/SessionAspect.java	2007-05-02 04:36:53 UTC (rev 2618)
@@ -98,7 +98,7 @@
       if (trace) { log.trace("handleClosing()"); }
 
       //Sanity check
-      if (state.isXA())
+      if (state.isXA() && !isConsideredNonTransacted(state))
       {
          if (trace) { log.trace("Session is XA"); }
          
@@ -128,7 +128,7 @@
       //any deliveries - this is because the message listener might have closed
       //before on message had finished executing
       
-      if (ackMode == Session.AUTO_ACKNOWLEDGE)
+      if (ackMode == Session.AUTO_ACKNOWLEDGE || isConsideredNonTransacted(state))
       {
          //Acknowledge or cancel any outstanding auto ack
          
@@ -250,9 +250,7 @@
       }
       // if XA and there is no transaction enlisted on XA we will act as AutoAcknowledge
       // However if it's a MDB (if there is a DistinguishedListener) we should behaved as transacted
-      else if (ackMode == Session.AUTO_ACKNOWLEDGE ||
-               (state.isXA() && (state.getCurrentTxId() instanceof LocalTx) &&
-                  (state.getDistinguishedListener() == null)))
+      else if (ackMode == Session.AUTO_ACKNOWLEDGE || isConsideredNonTransacted(state))
       {
          // We collect the single acknowledgement in the state. 
                            
@@ -309,9 +307,7 @@
 
       // if XA and there is no transaction enlisted on XA we will act as AutoAcknowledge
       // However if it's a MDB (if there is a DistinguishedListener) we should behaved as transacted
-      if (ackMode == Session.AUTO_ACKNOWLEDGE ||
-         (state.isXA() && (state.getCurrentTxId() instanceof LocalTx) &&
-                  (state.getDistinguishedListener() == null)))
+      if (ackMode == Session.AUTO_ACKNOWLEDGE || isConsideredNonTransacted(state))
       {
          // We auto acknowledge.
 
@@ -424,7 +420,7 @@
             
       SessionState state = getState(invocation);
       
-      if (state.isTransacted())
+      if (state.isTransacted() && !isConsideredNonTransacted(state))
       {
          throw new IllegalStateException("Cannot recover a transacted session");
       }
@@ -443,8 +439,10 @@
          state.setClientAckList(new ArrayList());
          
          del.redeliver(dels);
+
+         state.setRecoverCalled(true);
       }
-      else if (ackMode == Session.AUTO_ACKNOWLEDGE || ackMode == Session.DUPS_OK_ACKNOWLEDGE)
+      else if (ackMode == Session.AUTO_ACKNOWLEDGE || ackMode == Session.DUPS_OK_ACKNOWLEDGE || isConsideredNonTransacted(state))
       {
          DeliveryInfo info = state.getAutoAckInfo();
          
@@ -459,11 +457,12 @@
             del.redeliver(redels);
             
             state.setAutoAckInfo(null);            
+
+            state.setRecoverCalled(true);
          }
       }   
         
-      state.setRecoverCalled(true);
-      
+
       return null;  
    }
    
@@ -606,6 +605,11 @@
       SessionState state = getState(invocation);
       Object txID = state.getCurrentTxId();
 
+      // If there is no GlobalTransaction we run it as local transacted
+      // as discussed at http://www.jboss.com/index.html?module=bb&op=viewtopic&t=98577
+      // http://jira.jboss.org/jira/browse/JBMESSAGING-946
+      // and
+      // http://jira.jboss.org/jira/browse/JBMESSAGING-410
       if ((!state.isXA() && state.isTransacted()) || (state.isXA() && !(txID instanceof LocalTx)))
       {
          // the session is non-XA and transacted, or XA and enrolled in a global transaction, so
@@ -873,6 +877,17 @@
       }
    }
 
+   /** http://jira.jboss.org/jira/browse/JBMESSAGING-946 - To accommodate the TCK and the MQ behavior
+    *    we should behave as non-transacted (AUTO_ACK) when there is no transaction enlisted.
+    *    However, when the Session is being used by ASF we should consider the case where
+    *    we will convert LocalTX to GlobalTransactions.
+    *    This helper centralizes the condition that needs to be tested throughout this aspect.
+    * */
+   private boolean isConsideredNonTransacted(SessionState state)
+   {
+      return state.isXA() && (state.getCurrentTxId() instanceof LocalTx) && (state.getDistinguishedListener() == null);
+   }
+
    // Inner Classes -------------------------------------------------
    
    private static class AsfMessageHolder

Modified: trunk/tests/src/org/jboss/test/messaging/jms/XATestBase.java
===================================================================
--- trunk/tests/src/org/jboss/test/messaging/jms/XATestBase.java	2007-05-02 01:01:33 UTC (rev 2617)
+++ trunk/tests/src/org/jboss/test/messaging/jms/XATestBase.java	2007-05-02 04:36:53 UTC (rev 2618)
@@ -620,6 +620,89 @@
    }
 
    /*
+    *   This test will:
+    *     - Send two messages over a producer
+    *     - Receive one message over a consumer created using an XASession
+    *     - Call Recover
+    *     - Receive the second message
+    *     - The queue should be empty after that 
+    *   Verifies that messages are sent correctly and acknowledged properly when recover is called
+    *      NOTE: To accommodate TCK tests where Session/Consumers are being used without transaction enlisting
+    *            we are processing those cases as nonTransactional/AutoACK, however if the session is being used
+    *            to process MDBs we will consider the LocalTransaction conversion and process those as the comment above
+    *            This was done as per: http://jira.jboss.org/jira/browse/JBMESSAGING-946
+    *
+    */
+   public void testRecoverOnXA() throws Exception
+   {
+      XAConnection xaconn = null;
+
+      try
+      {
+         ServerManagement.deployQueue("MyQueue2");
+
+         // send a message to the queue
+
+         ConnectionFactory cf = (ConnectionFactory)initialContext.lookup("/ConnectionFactory");
+         Queue queue = (Queue)initialContext.lookup("queue/MyQueue2");
+         Connection conn = cf.createConnection();
+         Session s = conn.createSession(false, Session.AUTO_ACKNOWLEDGE);
+         MessageProducer p = s.createProducer(queue);
+         p.setDeliveryMode(DeliveryMode.PERSISTENT);
+         Message m = s.createTextMessage("one");
+         p.send(m);
+         m = s.createTextMessage("two");
+         p.send(m);
+         conn.close();
+
+         ObjectName queueMBean = new ObjectName("jboss.messaging.destination:service=Queue,name=MyQueue2");
+         Integer count = (Integer)ServerManagement.getAttribute(queueMBean, "MessageCount");
+         assertEquals(2, count.intValue());
+
+         XAConnectionFactory xacf = (XAConnectionFactory)cf;
+
+         xaconn = xacf.createXAConnection();
+
+         xaconn.start();
+
+         XASession xasession = xaconn.createXASession();
+
+         MessageConsumer consumer = xasession.createConsumer(queue);
+
+         TextMessage messageReceived = (TextMessage)consumer.receive(1000);
+
+         assertNotNull(messageReceived);
+
+         assertEquals("one", messageReceived.getText());
+
+         xasession.recover();
+
+         messageReceived = (TextMessage)consumer.receive(1000);
+
+         assertEquals("two", messageReceived.getText());
+
+         consumer.close();
+
+         // I can't call xasession.close for this test as JCA layer would cache the session
+         // So.. keep this close commented!
+         //xasession.close();
+
+         count = (Integer)ServerManagement.getAttribute(queueMBean, "MessageCount");
+         assertEquals(0, count.intValue());
+
+
+      }
+      finally
+      {
+         if (xaconn != null)
+         {
+            xaconn.close();
+         }
+         ServerManagement.undeployQueue("MyQueue2");
+      }
+   }
+
+   /*
     * If there is no global tx present messages consumed must consumed as if they were in a
     * local tx. Note this behaviour differs from messages sent
     * This is so we can support transacted delivery of messags in an MDB as mentioned




More information about the jboss-cvs-commits mailing list