[hornetq-commits] JBoss hornetq SVN: r9169 - in trunk: tests/src/org/hornetq/tests/integration/client and 1 other directory.

do-not-reply at jboss.org do-not-reply at jboss.org
Mon Apr 26 11:46:29 EDT 2010


Author: ataylor
Date: 2010-04-26 11:46:28 -0400 (Mon, 26 Apr 2010)
New Revision: 9169

Modified:
   trunk/src/main/org/hornetq/ra/inflow/HornetQMessageHandler.java
   trunk/tests/src/org/hornetq/tests/integration/client/DeadLetterAddressTest.java
Log:
fixed ra to rollback with last message as delivered

Modified: trunk/src/main/org/hornetq/ra/inflow/HornetQMessageHandler.java
===================================================================
--- trunk/src/main/org/hornetq/ra/inflow/HornetQMessageHandler.java	2010-04-26 14:23:46 UTC (rev 9168)
+++ trunk/src/main/org/hornetq/ra/inflow/HornetQMessageHandler.java	2010-04-26 15:46:28 UTC (rev 9169)
@@ -291,7 +291,7 @@
          {
             try
             {
-               session.rollback();
+               session.rollback(true);
             }
             catch (HornetQException e1)
             {

Modified: trunk/tests/src/org/hornetq/tests/integration/client/DeadLetterAddressTest.java
===================================================================
--- trunk/tests/src/org/hornetq/tests/integration/client/DeadLetterAddressTest.java	2010-04-26 14:23:46 UTC (rev 9168)
+++ trunk/tests/src/org/hornetq/tests/integration/client/DeadLetterAddressTest.java	2010-04-26 15:46:28 UTC (rev 9169)
@@ -14,6 +14,8 @@
 
 import java.util.HashMap;
 import java.util.Map;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
 
 import junit.framework.Assert;
 
@@ -72,6 +74,98 @@
       Assert.assertEquals(m.getBodyBuffer().readString(), "heyho!");
    }
 
+   public void testBasicSend2times() throws Exception
+   {
+      SimpleString dla = new SimpleString("DLA");
+      SimpleString qName = new SimpleString("q1");
+      AddressSettings addressSettings = new AddressSettings();
+      addressSettings.setMaxDeliveryAttempts(2);
+      addressSettings.setDeadLetterAddress(dla);
+      server.getAddressSettingsRepository().addMatch(qName.toString(), addressSettings);
+      SimpleString dlq = new SimpleString("DLQ1");
+      clientSession.createQueue(dla, dlq, null, false);
+      clientSession.createQueue(qName, qName, null, false);
+      ClientProducer producer = clientSession.createProducer(qName);
+      producer.send(createTextMessage("heyho!", clientSession));
+      clientSession.start();
+      ClientConsumer clientConsumer = clientSession.createConsumer(qName);
+      ClientMessage m = clientConsumer.receive(5000);
+      m.acknowledge();
+      Assert.assertNotNull(m);
+      Assert.assertEquals(m.getBodyBuffer().readString(), "heyho!");
+      // force a cancel
+      clientSession.rollback();
+      clientSession.start();
+      m = clientConsumer.receive(5000);
+      m.acknowledge();
+      Assert.assertNotNull(m);
+      Assert.assertEquals(m.getBodyBuffer().readString(), "heyho!");
+      // force a cancel
+      clientSession.rollback();
+      m = clientConsumer.receiveImmediate();
+      Assert.assertNull(m);
+      clientConsumer.close();
+      clientConsumer = clientSession.createConsumer(dlq);
+      m = clientConsumer.receive(5000);
+      Assert.assertNotNull(m);
+      Assert.assertEquals(m.getBodyBuffer().readString(), "heyho!");
+   }
+
+   public void testReceiveWithListeners() throws Exception
+   {
+      SimpleString dla = new SimpleString("DLA");
+      SimpleString qName = new SimpleString("q1");
+      AddressSettings addressSettings = new AddressSettings();
+      addressSettings.setMaxDeliveryAttempts(2);
+      addressSettings.setDeadLetterAddress(dla);
+      server.getAddressSettingsRepository().addMatch(qName.toString(), addressSettings);
+      SimpleString dlq = new SimpleString("DLQ1");
+      clientSession.createQueue(dla, dlq, null, false);
+      clientSession.createQueue(qName, qName, null, false);
+      ClientProducer producer = clientSession.createProducer(qName);
+      producer.send(createTextMessage("heyho!", clientSession));
+      ClientConsumer clientConsumer = clientSession.createConsumer(qName);
+      final CountDownLatch latch = new CountDownLatch(2);
+      TestHandler handler = new TestHandler(latch, clientSession);
+      clientConsumer.setMessageHandler(handler);
+      clientSession.start();
+      assertTrue(latch.await(5, TimeUnit.SECONDS));
+      assertEquals(handler.count, 2);
+      clientConsumer = clientSession.createConsumer(dlq);
+      Message m = clientConsumer.receiveImmediate();
+      Assert.assertNotNull(m);
+      Assert.assertEquals(m.getBodyBuffer().readString(), "heyho!");
+   }
+
+   class  TestHandler implements MessageHandler
+   {
+      private CountDownLatch latch;
+      int count = 0;
+
+      private ClientSession clientSession;
+
+      public TestHandler(CountDownLatch latch, ClientSession clientSession)
+      {
+         this.latch = latch;
+         this.clientSession = clientSession;
+      }
+
+      public void onMessage(ClientMessage message)
+      {
+         count++;
+         latch.countDown();
+         try
+         {
+            clientSession.rollback(true);
+         }
+         catch (HornetQException e)
+         {
+            e.printStackTrace();  //To change body of catch statement use File | Settings | File Templates.
+         }
+         throw new RuntimeException();
+      }
+   }
+
    public void testBasicSendToMultipleQueues() throws Exception
    {
       SimpleString dla = new SimpleString("DLA");



More information about the hornetq-commits mailing list