Author: clebert.suconic(a)jboss.com
Date: 2011-12-01 13:03:10 -0500 (Thu, 01 Dec 2011)
New Revision: 11806
Modified:
branches/Branch_2_2_EAP/src/main/org/hornetq/core/server/impl/QueueImpl.java
branches/Branch_2_2_EAP/src/main/org/hornetq/core/server/impl/ServerConsumerImpl.java
branches/Branch_2_2_EAP/tests/src/org/hornetq/tests/integration/client/AcknowledgeTest.java
Log:
JBPAPP-7606 - proper exception handling on ACK
Modified: branches/Branch_2_2_EAP/src/main/org/hornetq/core/server/impl/QueueImpl.java
===================================================================
---
branches/Branch_2_2_EAP/src/main/org/hornetq/core/server/impl/QueueImpl.java 2011-12-01
15:59:22 UTC (rev 11805)
+++
branches/Branch_2_2_EAP/src/main/org/hornetq/core/server/impl/QueueImpl.java 2011-12-01
18:03:10 UTC (rev 11806)
@@ -2328,6 +2328,10 @@
for (MessageReference ref : refsToAck)
{
+ if (log.isTraceEnabled())
+ {
+ log.trace("rolling back " + ref);
+ }
try
{
if (ref.getQueue().checkRedelivery(ref, timeBase))
Modified:
branches/Branch_2_2_EAP/src/main/org/hornetq/core/server/impl/ServerConsumerImpl.java
===================================================================
---
branches/Branch_2_2_EAP/src/main/org/hornetq/core/server/impl/ServerConsumerImpl.java 2011-12-01
15:59:22 UTC (rev 11805)
+++
branches/Branch_2_2_EAP/src/main/org/hornetq/core/server/impl/ServerConsumerImpl.java 2011-12-01
18:03:10 UTC (rev 11806)
@@ -615,6 +615,11 @@
do
{
ref = deliveringRefs.poll();
+
+ if (log.isTraceEnabled())
+ {
+ log.trace("ACKing ref " + ref + " on " + this);
+ }
if (ref == null)
{
Modified:
branches/Branch_2_2_EAP/tests/src/org/hornetq/tests/integration/client/AcknowledgeTest.java
===================================================================
---
branches/Branch_2_2_EAP/tests/src/org/hornetq/tests/integration/client/AcknowledgeTest.java 2011-12-01
15:59:22 UTC (rev 11805)
+++
branches/Branch_2_2_EAP/tests/src/org/hornetq/tests/integration/client/AcknowledgeTest.java 2011-12-01
18:03:10 UTC (rev 11806)
@@ -19,7 +19,14 @@
import org.hornetq.api.core.HornetQException;
import org.hornetq.api.core.SimpleString;
-import org.hornetq.api.core.client.*;
+import org.hornetq.api.core.client.ClientConsumer;
+import org.hornetq.api.core.client.ClientMessage;
+import org.hornetq.api.core.client.ClientProducer;
+import org.hornetq.api.core.client.ClientSession;
+import org.hornetq.api.core.client.ClientSessionFactory;
+import org.hornetq.api.core.client.MessageHandler;
+import org.hornetq.api.core.client.ServerLocator;
+import org.hornetq.core.client.impl.ClientSessionInternal;
import org.hornetq.core.logging.Logger;
import org.hornetq.core.server.HornetQServer;
import org.hornetq.core.server.Queue;
@@ -82,7 +89,109 @@
}
}
}
+
+
+ /**
+ * This is validating a case where a consumer will try to ack a message right after
failover, but the consumer at the target server didn't
+ * receive the message yet.
+ * on that case the system should rollback any acks done and redeliver any messages
+ */
+ public void testInvalidACK() throws Exception
+ {
+ HornetQServer server = createServer(false);
+ try
+ {
+ server.start();
+
+ ServerLocator locator = createInVMNonHALocator();
+
+ locator.setAckBatchSize(0);
+
+ locator.setBlockOnAcknowledge(true);
+
+ ClientSessionFactory cf = locator.createSessionFactory();
+
+
+ int numMessages = 100;
+
+ ClientSession sessionConsumer = cf.createSession(true, true, 0);
+
+ sessionConsumer.start();
+
+ sessionConsumer.createQueue(addressA, queueA, true);
+
+ ClientConsumer consumer = sessionConsumer.createConsumer(queueA);
+ // sending message
+ {
+ ClientSession sendSession = cf.createSession(false, true, true);
+
+ ClientProducer cp = sendSession.createProducer(addressA);
+
+ for (int i = 0; i < numMessages; i++)
+ {
+ ClientMessage msg = sendSession.createMessage(true);
+ msg.putIntProperty("seq", i);
+ cp.send(msg);
+ }
+
+ sendSession.close();
+ }
+
+ {
+
+ ClientMessage msg = consumer.receive(5000);
+
+ // need to way some time before all the possible references are sent to the
consumer
+ // as we need to guarantee the order on cancellation on this test
+ Thread.sleep(1000);
+
+ try
+ {
+ // pretending to be an unbehaved client doing an invalid ack right after
failover
+ ((ClientSessionInternal)sessionConsumer).acknowledge(0, 12343);
+ fail("supposed to throw an exception here");
+ }
+ catch (Exception e)
+ {
+ }
+
+ try
+ {
+ // pretending to be an unbehaved client doing an invalid ack right after
failover
+ ((ClientSessionInternal)sessionConsumer).acknowledge(3, 12343);
+ fail("supposed to throw an exception here");
+ }
+ catch (Exception e)
+ {
+ e.printStackTrace();
+ }
+
+ consumer.close();
+
+ consumer = sessionConsumer.createConsumer(queueA);
+
+
+ for (int i = 0 ; i < numMessages; i++)
+ {
+ msg = consumer.receive(5000);
+ assertNotNull(msg);
+ assertEquals(i, msg.getIntProperty("seq").intValue());
+ msg.acknowledge();
+ }
+ }
+ }
+ finally
+ {
+ if (server.isStarted())
+ {
+ server.stop();
+ }
+ }
+ }
+
+
+
public void testAsyncConsumerNoAck() throws Exception
{
HornetQServer server = createServer(false);