[hornetq-commits] JBoss hornetq SVN: r9169 - in trunk: tests/src/org/hornetq/tests/integration/client and 1 other directory.
do-not-reply at jboss.org
do-not-reply at jboss.org
Mon Apr 26 11:46:29 EDT 2010
Author: ataylor
Date: 2010-04-26 11:46:28 -0400 (Mon, 26 Apr 2010)
New Revision: 9169
Modified:
trunk/src/main/org/hornetq/ra/inflow/HornetQMessageHandler.java
trunk/tests/src/org/hornetq/tests/integration/client/DeadLetterAddressTest.java
Log:
Fixed the resource adapter (RA) to roll back with the last message marked as delivered
Modified: trunk/src/main/org/hornetq/ra/inflow/HornetQMessageHandler.java
===================================================================
--- trunk/src/main/org/hornetq/ra/inflow/HornetQMessageHandler.java 2010-04-26 14:23:46 UTC (rev 9168)
+++ trunk/src/main/org/hornetq/ra/inflow/HornetQMessageHandler.java 2010-04-26 15:46:28 UTC (rev 9169)
@@ -291,7 +291,7 @@
{
try
{
- session.rollback();
+ session.rollback(true);
}
catch (HornetQException e1)
{
Modified: trunk/tests/src/org/hornetq/tests/integration/client/DeadLetterAddressTest.java
===================================================================
--- trunk/tests/src/org/hornetq/tests/integration/client/DeadLetterAddressTest.java 2010-04-26 14:23:46 UTC (rev 9168)
+++ trunk/tests/src/org/hornetq/tests/integration/client/DeadLetterAddressTest.java 2010-04-26 15:46:28 UTC (rev 9169)
@@ -14,6 +14,8 @@
import java.util.HashMap;
import java.util.Map;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
import junit.framework.Assert;
@@ -72,6 +74,98 @@
Assert.assertEquals(m.getBodyBuffer().readString(), "heyho!");
}
+ /**
+ * Sends one message to a queue whose address settings allow two delivery attempts and name
+ * a dead letter address, rolls the session back after each of the two deliveries, and then
+ * checks that the message is no longer available on the original queue but can be consumed
+ * from the queue bound to the dead letter address.
+ *
+ * @throws Exception if any session, producer or consumer operation fails
+ */
+ public void testBasicSend2times() throws Exception
+ {
+ SimpleString dla = new SimpleString("DLA");
+ SimpleString qName = new SimpleString("q1");
+ AddressSettings addressSettings = new AddressSettings();
+ addressSettings.setMaxDeliveryAttempts(2);
+ addressSettings.setDeadLetterAddress(dla);
+ server.getAddressSettingsRepository().addMatch(qName.toString(), addressSettings);
+ SimpleString dlq = new SimpleString("DLQ1");
+ clientSession.createQueue(dla, dlq, null, false);
+ clientSession.createQueue(qName, qName, null, false);
+ ClientProducer producer = clientSession.createProducer(qName);
+ producer.send(createTextMessage("heyho!", clientSession));
+ clientSession.start();
+ ClientConsumer clientConsumer = clientSession.createConsumer(qName);
+ // first delivery attempt
+ ClientMessage m = clientConsumer.receive(5000);
+ // check for null before touching the message so a missing delivery is reported as an
+ // assertion failure rather than a NullPointerException
+ Assert.assertNotNull(m);
+ m.acknowledge();
+ Assert.assertEquals(m.getBodyBuffer().readString(), "heyho!");
+ // force a cancel
+ clientSession.rollback();
+ clientSession.start();
+ // second delivery attempt
+ m = clientConsumer.receive(5000);
+ Assert.assertNotNull(m);
+ m.acknowledge();
+ Assert.assertEquals(m.getBodyBuffer().readString(), "heyho!");
+ // force a cancel
+ clientSession.rollback();
+ // after the second rollback nothing should be left on the original queue
+ m = clientConsumer.receiveImmediate();
+ Assert.assertNull(m);
+ clientConsumer.close();
+ // the message is expected on the queue bound to the dead letter address
+ clientConsumer = clientSession.createConsumer(dlq);
+ m = clientConsumer.receive(5000);
+ Assert.assertNotNull(m);
+ Assert.assertEquals(m.getBodyBuffer().readString(), "heyho!");
+ }
+
+ /**
+ * Sends one message to a queue whose address settings allow two delivery attempts and name
+ * a dead letter address, consumes it through a {@code TestHandler} that rolls the session
+ * back on every delivery, waits for two deliveries, and then checks that the message can be
+ * consumed from the queue bound to the dead letter address.
+ *
+ * @throws Exception if any session, producer or consumer operation fails, or the wait on
+ * the latch is interrupted
+ */
+ public void testReceiveWithListeners() throws Exception
+ {
+ SimpleString dla = new SimpleString("DLA");
+ SimpleString qName = new SimpleString("q1");
+ AddressSettings addressSettings = new AddressSettings();
+ addressSettings.setMaxDeliveryAttempts(2);
+ addressSettings.setDeadLetterAddress(dla);
+ server.getAddressSettingsRepository().addMatch(qName.toString(), addressSettings);
+ SimpleString dlq = new SimpleString("DLQ1");
+ clientSession.createQueue(dla, dlq, null, false);
+ clientSession.createQueue(qName, qName, null, false);
+ ClientProducer producer = clientSession.createProducer(qName);
+ producer.send(createTextMessage("heyho!", clientSession));
+ ClientConsumer clientConsumer = clientSession.createConsumer(qName);
+ // one count per expected delivery attempt
+ final CountDownLatch latch = new CountDownLatch(2);
+ TestHandler handler = new TestHandler(latch, clientSession);
+ clientConsumer.setMessageHandler(handler);
+ clientSession.start();
+ assertTrue(latch.await(5, TimeUnit.SECONDS));
+ assertEquals(handler.count, 2);
+ clientConsumer = clientSession.createConsumer(dlq);
+ // The latch can be released while the handler's second rollback is still in progress, so
+ // wait a bounded time for the message instead of polling once with receiveImmediate().
+ Message m = clientConsumer.receive(5000);
+ Assert.assertNotNull(m);
+ Assert.assertEquals(m.getBodyBuffer().readString(), "heyho!");
+ }
+
+ /**
+ * Message handler used by the listener-based dead letter test: every delivery is counted,
+ * the session is rolled back with {@code rollback(true)}, the latch is counted down, and a
+ * {@link RuntimeException} is thrown out of {@code onMessage}.
+ */
+ class TestHandler implements MessageHandler
+ {
+ private final CountDownLatch latch;
+ // number of times onMessage has been invoked; read by the test after the latch is released
+ int count = 0;
+
+ private final ClientSession clientSession;
+
+ /**
+ * @param latch latch counted down once per delivery
+ * @param clientSession session that is rolled back on each delivery
+ */
+ public TestHandler(CountDownLatch latch, ClientSession clientSession)
+ {
+ this.latch = latch;
+ this.clientSession = clientSession;
+ }
+
+ /**
+ * Counts the delivery, rolls the session back and always fails with a RuntimeException.
+ *
+ * @param message the delivered message (not inspected)
+ */
+ public void onMessage(ClientMessage message)
+ {
+ count++;
+ try
+ {
+ clientSession.rollback(true);
+ }
+ catch (HornetQException e)
+ {
+ // best effort: report the failure and still release the latch so the test does not hang
+ e.printStackTrace();
+ }
+ finally
+ {
+ // release waiters only after the rollback attempt, so they do not observe the
+ // state from before the rollback
+ latch.countDown();
+ }
+ // NOTE(review): presumably simulates a listener that fails after delivery - confirm
+ throw new RuntimeException();
+ }
+ }
+
public void testBasicSendToMultipleQueues() throws Exception
{
SimpleString dla = new SimpleString("DLA");
More information about the hornetq-commits
mailing list