[hornetq-commits] JBoss hornetq SVN: r8556 - in trunk: examples/jms/transaction-failover/src/org/hornetq/jms/example and 2 other directories.

do-not-reply at jboss.org do-not-reply at jboss.org
Fri Dec 4 10:46:19 EST 2009


Author: jmesnil
Date: 2009-12-04 10:46:18 -0500 (Fri, 04 Dec 2009)
New Revision: 8556

Modified:
   trunk/examples/jms/transaction-failover/readme.html
   trunk/examples/jms/transaction-failover/src/org/hornetq/jms/example/TransactionFailoverExample.java
   trunk/src/main/org/hornetq/core/client/impl/ClientSessionImpl.java
   trunk/tests/src/org/hornetq/tests/integration/cluster/failover/FailoverTest.java
Log:
modified transacted session behaviour after failover

* when a session commits after being flagged as rollback only during failover, the session is
  automatically rolled back before HornetQException.TRANSACTION_ROLLED_BACK is thrown
* updated transaction-failover example
* added test in FailoverTest

Modified: trunk/examples/jms/transaction-failover/readme.html
===================================================================
--- trunk/examples/jms/transaction-failover/readme.html	2009-12-04 15:46:13 UTC (rev 8555)
+++ trunk/examples/jms/transaction-failover/readme.html	2009-12-04 15:46:18 UTC (rev 8556)
@@ -14,7 +14,7 @@
      <p>When a <em>transacted</em> JMS session is used, once-and-only once delivery is guaranteed.</p>
      <ul>
         <li>if the failover occurs while there is an in-flight transaction, the transaction will be flagged as <em>rollback only</em>. In that case, the JMS client 
-           will need to rollback the session and retry the transaction work.</li>
+           will need to retry the transaction work.</li>
         <li>if the failover occurs while there is <em>no</em> in-flight transaction, the failover will be transparent to the user.</li>
      </ul>
      <p>HornetQ also provides an example for <a href="../non-transactional-failover/readme.html">non-transaction failover</a>.</p>
@@ -80,15 +80,10 @@
               session.commit();
            } catch (TransactionRolledBackException e)
            {
-              ...
+              System.err.println("transaction has been rolled back: " + e.getMessage());
            }
         </pre>
         
-        <li>In the <code>catch (TransactionRolledBackException e)</code> block, we rollback the session
-        <pre class="prettyprint">
-           session.rollback();
-        </pre>
-        
         <li>We resend all the messages</li>
         <pre class="prettyprint">
            sendMessages(session, producer, numMessages, false);

Modified: trunk/examples/jms/transaction-failover/src/org/hornetq/jms/example/TransactionFailoverExample.java
===================================================================
--- trunk/examples/jms/transaction-failover/src/org/hornetq/jms/example/TransactionFailoverExample.java	2009-12-04 15:46:13 UTC (rev 8555)
+++ trunk/examples/jms/transaction-failover/src/org/hornetq/jms/example/TransactionFailoverExample.java	2009-12-04 15:46:18 UTC (rev 8556)
@@ -72,23 +72,22 @@
          // Step 8. We send half of the messages, kill the live server and send the remaining messages
          sendMessages(session, producer, numMessages, true);
 
+         // Step 9. As failover occurred during transaction, the session has been marked for rollback only
          try
          {
-            // Step 9. As failover occured during transaction, the session has been marked for rollback only
             session.commit();
          } catch (TransactionRolledBackException e)
          {
-            // Step 10. We rollback the transaction
-            session.rollback();
+            System.err.println("transaction has been rolled back: " + e.getMessage());
          }
          
-         // Step 11. We resend all the messages
+         // Step 10. We resend all the messages
          sendMessages(session, producer, numMessages, false);
-         // Step 12. We commit the session succesfully: the messages will be all delivered to the activated backup server
+         // Step 11. We commit the session successfully: the messages will all be delivered to the activated backup server
          session.commit();
 
 
-         // Step 13. We are now transparently reconnected to server #0, the backup server.
+         // Step 12. We are now transparently reconnected to server #0, the backup server.
          // We consume the messages sent before the crash of the live server and commit the session.
          for (int i = 0; i < numMessages; i++)
          {
@@ -102,7 +101,7 @@
       }
       finally
       {
-         // Step 14. Be sure to close our resources!
+         // Step 13. Be sure to close our resources!
 
          if (connection != null)
          {

Modified: trunk/src/main/org/hornetq/core/client/impl/ClientSessionImpl.java
===================================================================
--- trunk/src/main/org/hornetq/core/client/impl/ClientSessionImpl.java	2009-12-04 15:46:13 UTC (rev 8555)
+++ trunk/src/main/org/hornetq/core/client/impl/ClientSessionImpl.java	2009-12-04 15:46:18 UTC (rev 8556)
@@ -461,6 +461,8 @@
 
       if (rollbackOnly)
       {
+         rollback(false);
+         
          throw new HornetQException(TRANSACTION_ROLLED_BACK,
                                     "The transaction was rolled back on failover to a backup server");
       }

Modified: trunk/tests/src/org/hornetq/tests/integration/cluster/failover/FailoverTest.java
===================================================================
--- trunk/tests/src/org/hornetq/tests/integration/cluster/failover/FailoverTest.java	2009-12-04 15:46:13 UTC (rev 8555)
+++ trunk/tests/src/org/hornetq/tests/integration/cluster/failover/FailoverTest.java	2009-12-04 15:46:18 UTC (rev 8556)
@@ -13,6 +13,8 @@
 
 package org.hornetq.tests.integration.cluster.failover;
 
+import static org.hornetq.tests.util.RandomUtil.randomInt;
+
 import java.util.ArrayList;
 import java.util.HashMap;
 import java.util.List;
@@ -42,6 +44,7 @@
 import org.hornetq.core.remoting.impl.invm.TransportConstants;
 import org.hornetq.core.transaction.impl.XidImpl;
 import org.hornetq.jms.client.HornetQTextMessage;
+import org.hornetq.tests.util.RandomUtil;
 import org.hornetq.utils.SimpleString;
 
 /**
@@ -328,6 +331,90 @@
       assertEquals(0, sf.numConnections());
    }
 
+   /**
+    * Test that once the transacted session has thrown a TRANSACTION_ROLLED_BACK exception,
+    * it can be reused again
+    */
+   public void testTransactedMessagesSentSoRollbackAndContinueWork() throws Exception
+   {
+      ClientSessionFactoryInternal sf = getSessionFactory();
+
+      sf.setBlockOnNonPersistentSend(true);
+      sf.setBlockOnPersistentSend(true);
+
+      ClientSession session = sf.createSession(false, false);
+
+      session.createQueue(ADDRESS, ADDRESS, null, true);
+
+      final CountDownLatch latch = new CountDownLatch(1);
+
+      class MyListener extends BaseListener
+      {
+         public void connectionFailed(HornetQException me)
+         {
+            latch.countDown();
+         }
+
+      }
+
+      session.addFailureListener(new MyListener());
+
+      ClientProducer producer = session.createProducer(ADDRESS);
+
+      final int numMessages = 100;
+
+      for (int i = 0; i < numMessages; i++)
+      {
+         ClientMessage message = session.createClientMessage(i % 2 == 0);
+
+         setBody(i, message);
+
+         message.putIntProperty("counter", i);
+
+         producer.send(message);
+      }
+
+      fail(session, latch);
+
+      assertTrue(session.isRollbackOnly());
+
+      try
+      {
+         session.commit();
+
+         fail("Should throw exception");
+      }
+      catch (HornetQException e)
+      {
+         assertEquals(HornetQException.TRANSACTION_ROLLED_BACK, e.getCode());
+      }
+      
+      ClientMessage message = session.createClientMessage(false);
+      int counter = randomInt();
+      message.putIntProperty("counter", counter);
+
+      producer.send(message);
+
+      // session is working again
+      session.commit();
+      
+      session.start();
+
+      ClientConsumer consumer = session.createConsumer(ADDRESS);
+
+      session.start();
+
+      message = consumer.receiveImmediate();
+
+      assertNotNull(message);
+      assertEquals(counter, message.getIntProperty("counter").intValue());
+      
+      session.close();
+
+      assertEquals(0, sf.numSessions());
+
+      assertEquals(0, sf.numConnections());
+   }
    public void testTransactedMessagesNotSentSoNoRollback() throws Exception
    {
       ClientSessionFactoryInternal sf = getSessionFactory();
@@ -409,7 +496,7 @@
       assertEquals(0, sf.numConnections());
    }
 
-   public void testTransactedMessagesWithConsumerStartedBedoreFailover() throws Exception
+   public void testTransactedMessagesWithConsumerStartedBeforeFailover() throws Exception
    {
       ClientSessionFactoryInternal sf = getSessionFactory();
 



More information about the hornetq-commits mailing list