[hornetq-commits] JBoss hornetq SVN: r11805 - branches/Branch_2_2_EAP/src/main/org/hornetq/core/server/impl.

do-not-reply at jboss.org do-not-reply at jboss.org
Thu Dec 1 10:59:23 EST 2011


Author: clebert.suconic at jboss.com
Date: 2011-12-01 10:59:22 -0500 (Thu, 01 Dec 2011)
New Revision: 11805

Modified:
   branches/Branch_2_2_EAP/src/main/org/hornetq/core/server/impl/ServerConsumerImpl.java
   branches/Branch_2_2_EAP/src/main/org/hornetq/core/server/impl/ServerSessionImpl.java
Log:
JBPAPP-7606 - proper exception handling on ACK

Modified: branches/Branch_2_2_EAP/src/main/org/hornetq/core/server/impl/ServerConsumerImpl.java
===================================================================
--- branches/Branch_2_2_EAP/src/main/org/hornetq/core/server/impl/ServerConsumerImpl.java	2011-12-01 10:18:48 UTC (rev 11804)
+++ branches/Branch_2_2_EAP/src/main/org/hornetq/core/server/impl/ServerConsumerImpl.java	2011-12-01 15:59:22 UTC (rev 11805)
@@ -21,6 +21,7 @@
 
 import org.hornetq.api.core.HornetQBuffer;
 import org.hornetq.api.core.HornetQBuffers;
+import org.hornetq.api.core.HornetQException;
 import org.hornetq.api.core.management.ManagementHelper;
 import org.hornetq.api.core.management.NotificationType;
 import org.hornetq.core.client.impl.ClientConsumerImpl;
@@ -586,7 +587,7 @@
       return messageQueue;
    }
 
-   public void acknowledge(final boolean autoCommitAcks, final Transaction tx, final long messageID) throws Exception
+   public void acknowledge(final boolean autoCommitAcks, Transaction tx, final long messageID) throws Exception
    {
       if (browseOnly)
       {
@@ -595,34 +596,73 @@
 
       // Acknowledge acknowledges all refs delivered by the consumer up to and including the one explicitly
       // acknowledged
-
-      MessageReference ref;
-      do
+      
+      // We use a transaction here as if the message is not found, we should rollback anything done
+      // This could eventually happen on retries during transactions, and we need to make sure we don't ACK things we are not supposed to acknowledge
+      
+      boolean startedTransaction = false;
+      
+      if (tx == null || autoCommitAcks)
       {
-         ref = deliveringRefs.poll();
-
-         if (ref == null)
+         startedTransaction = true;
+         tx = new TransactionImpl(storageManager);
+      }
+      
+      try
+      {
+   
+         MessageReference ref;
+         do
          {
-            throw new IllegalStateException(System.identityHashCode(this) + " Could not find reference on consumerID=" +
-                                            id +
-                                            ", messageId = " +
-                                            messageID +
-                                            " queue = " +
-                                            messageQueue.getName() +
-                                            " closed = " +
-                                            closed);
+            ref = deliveringRefs.poll();
+   
+            if (ref == null)
+            {
+               
+               HornetQException e = new HornetQException(HornetQException.ILLEGAL_STATE, "Could not find reference on consumerID=" +
+                                id +
+                                ", messageId = " +
+                                messageID +
+                                " queue = " +
+                                messageQueue.getName());
+               throw e;
+            }
+   
+            ref.getQueue().acknowledge(tx, ref);
          }
-
-         if (autoCommitAcks || tx == null)
+         while (ref.getMessage().getMessageID() != messageID);
+         
+         if (startedTransaction)
          {
-            ref.getQueue().acknowledge(ref);
+            tx.commit();
          }
+      }
+      catch (HornetQException e)
+      {
+         if (startedTransaction)
+         {
+            tx.rollback();
+         }
          else
          {
-            ref.getQueue().acknowledge(tx, ref);
+            tx.markAsRollbackOnly(e);
          }
+         throw e;
       }
-      while (ref.getMessage().getMessageID() != messageID);
+      catch (Throwable e)
+      {
+         log.error(e.getMessage(), e);
+         HornetQException hqex = new HornetQException(HornetQException.ILLEGAL_STATE, e.getMessage());
+         if (startedTransaction)
+         {
+            tx.rollback();
+         }
+         else
+         {
+            tx.markAsRollbackOnly(hqex);
+         }
+         throw hqex;
+      }
    }
 
    public void individualAcknowledge(final boolean autoCommitAcks, final Transaction tx, final long messageID) throws Exception

Modified: branches/Branch_2_2_EAP/src/main/org/hornetq/core/server/impl/ServerSessionImpl.java
===================================================================
--- branches/Branch_2_2_EAP/src/main/org/hornetq/core/server/impl/ServerSessionImpl.java	2011-12-01 10:18:48 UTC (rev 11804)
+++ branches/Branch_2_2_EAP/src/main/org/hornetq/core/server/impl/ServerSessionImpl.java	2011-12-01 15:59:22 UTC (rev 11805)
@@ -586,6 +586,11 @@
    public void acknowledge(final long consumerID, final long messageID) throws Exception
    {
       ServerConsumer consumer = consumers.get(consumerID);
+      
+      if (consumer == null)
+      {
+         throw new HornetQException(HornetQException.ILLEGAL_STATE, "Consumer " + consumerID + " wasn't created on the server");
+      }
 
       consumer.acknowledge(autoCommitAcks, tx, messageID);
    }



More information about the hornetq-commits mailing list