Author: clebert.suconic(a)jboss.com
Date: 2011-12-01 10:59:22 -0500 (Thu, 01 Dec 2011)
New Revision: 11805
Modified:
branches/Branch_2_2_EAP/src/main/org/hornetq/core/server/impl/ServerConsumerImpl.java
branches/Branch_2_2_EAP/src/main/org/hornetq/core/server/impl/ServerSessionImpl.java
Log:
JBPAPP-7606 - proper exception handling on ACK
Modified:
branches/Branch_2_2_EAP/src/main/org/hornetq/core/server/impl/ServerConsumerImpl.java
===================================================================
---
branches/Branch_2_2_EAP/src/main/org/hornetq/core/server/impl/ServerConsumerImpl.java 2011-12-01
10:18:48 UTC (rev 11804)
+++
branches/Branch_2_2_EAP/src/main/org/hornetq/core/server/impl/ServerConsumerImpl.java 2011-12-01
15:59:22 UTC (rev 11805)
@@ -21,6 +21,7 @@
import org.hornetq.api.core.HornetQBuffer;
import org.hornetq.api.core.HornetQBuffers;
+import org.hornetq.api.core.HornetQException;
import org.hornetq.api.core.management.ManagementHelper;
import org.hornetq.api.core.management.NotificationType;
import org.hornetq.core.client.impl.ClientConsumerImpl;
@@ -586,7 +587,7 @@
return messageQueue;
}
- public void acknowledge(final boolean autoCommitAcks, final Transaction tx, final long
messageID) throws Exception
+ public void acknowledge(final boolean autoCommitAcks, Transaction tx, final long
messageID) throws Exception
{
if (browseOnly)
{
@@ -595,34 +596,73 @@
// Acknowledge acknowledges all refs delivered by the consumer up to and including
the one explicitly
// acknowledged
-
- MessageReference ref;
- do
+
+ // We use a transaction here as if the message is not found, we should rollback
anything done
+ // This could eventually happen on retries during transactions, and we need to make
sure we don't ACK things we are not supposed to acknowledge
+
+ boolean startedTransaction = false;
+
+ if (tx == null || autoCommitAcks)
{
- ref = deliveringRefs.poll();
-
- if (ref == null)
+ startedTransaction = true;
+ tx = new TransactionImpl(storageManager);
+ }
+
+ try
+ {
+
+ MessageReference ref;
+ do
{
- throw new IllegalStateException(System.identityHashCode(this) + " Could
not find reference on consumerID=" +
- id +
- ", messageId = " +
- messageID +
- " queue = " +
- messageQueue.getName() +
- " closed = " +
- closed);
+ ref = deliveringRefs.poll();
+
+ if (ref == null)
+ {
+
+ HornetQException e = new HornetQException(HornetQException.ILLEGAL_STATE,
"Could not find reference on consumerID=" +
+ id +
+ ", messageId = " +
+ messageID +
+ " queue = " +
+ messageQueue.getName());
+ throw e;
+ }
+
+ ref.getQueue().acknowledge(tx, ref);
}
-
- if (autoCommitAcks || tx == null)
+ while (ref.getMessage().getMessageID() != messageID);
+
+ if (startedTransaction)
{
- ref.getQueue().acknowledge(ref);
+ tx.commit();
}
+ }
+ catch (HornetQException e)
+ {
+ if (startedTransaction)
+ {
+ tx.rollback();
+ }
else
{
- ref.getQueue().acknowledge(tx, ref);
+ tx.markAsRollbackOnly(e);
}
+ throw e;
}
- while (ref.getMessage().getMessageID() != messageID);
+ catch (Throwable e)
+ {
+ log.error(e.getMessage(), e);
+ HornetQException hqex = new HornetQException(HornetQException.ILLEGAL_STATE,
e.getMessage());
+ if (startedTransaction)
+ {
+ tx.rollback();
+ }
+ else
+ {
+ tx.markAsRollbackOnly(hqex);
+ }
+ throw hqex;
+ }
}
public void individualAcknowledge(final boolean autoCommitAcks, final Transaction tx,
final long messageID) throws Exception
Modified:
branches/Branch_2_2_EAP/src/main/org/hornetq/core/server/impl/ServerSessionImpl.java
===================================================================
---
branches/Branch_2_2_EAP/src/main/org/hornetq/core/server/impl/ServerSessionImpl.java 2011-12-01
10:18:48 UTC (rev 11804)
+++
branches/Branch_2_2_EAP/src/main/org/hornetq/core/server/impl/ServerSessionImpl.java 2011-12-01
15:59:22 UTC (rev 11805)
@@ -586,6 +586,11 @@
public void acknowledge(final long consumerID, final long messageID) throws Exception
{
ServerConsumer consumer = consumers.get(consumerID);
+
+ if (consumer == null)
+ {
+ throw new HornetQException(HornetQException.ILLEGAL_STATE, "Consumer "
+ consumerID + " wasn't created on the server");
+ }
consumer.acknowledge(autoCommitAcks, tx, messageID);
}
Show replies by date