[rhmessaging-commits] rhmessaging commits: r4002 - store/trunk/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb.

rhmessaging-commits at lists.jboss.org rhmessaging-commits at lists.jboss.org
Wed Jun 2 08:35:44 EDT 2010


Author: rgemmell
Date: 2010-06-02 08:35:43 -0400 (Wed, 02 Jun 2010)
New Revision: 4002

Modified:
   store/trunk/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/BDBMessageStore.java
Log:
Advance the cursor before notifying the recovery handler of a new QueueEntry to avoid DeadlockException if the handler then attempts to remove the QueueEntry from the store and is unable to lock the record again. Signal QE recovery completion to prompt result logging.


Modified: store/trunk/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/BDBMessageStore.java
===================================================================
--- store/trunk/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/BDBMessageStore.java	2010-06-02 09:26:17 UTC (rev 4001)
+++ store/trunk/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/BDBMessageStore.java	2010-06-02 12:35:43 UTC (rev 4002)
@@ -704,9 +704,10 @@
             EntryBinding keyBinding = new QueueEntryTB();
 
             DatabaseEntry value = new DatabaseEntry();
-            EntryBinding valueBinding = TupleBinding.getPrimitiveBinding(Long.class);
 
-            while (cursor.getNext(key, value, LockMode.RMW) == OperationStatus.SUCCESS)
+            OperationStatus status = cursor.getNext(key, value, LockMode.RMW);
+            
+            while (status == OperationStatus.SUCCESS)
             {
 
                 QueueEntryKey dd = (QueueEntryKey) keyBinding.entryToObject(key);
@@ -714,13 +715,12 @@
                 AMQShortString queueName = dd.getQueueName();
                 long messageId = dd.getMessageId();
                 
+                //Advance the cursor BEFORE passing the previous entry to the
+                //recovery handler. This is required in order to release the
+                //lock on the record in case the handler decides to remove it.
+                status = cursor.getNext(key, value, LockMode.RMW);
+                
                 qerh.queueEntry(queueName.asString(),messageId);
-
-//                if (_log.isDebugEnabled())
-//                {
-//                    _log.debug("On recovery, delivering Message ID:" + message.getMessageId() + " to " + queue.getName());
-//                }
-
             }
         }
         catch (DatabaseException e)
@@ -736,17 +736,7 @@
             }
         }
 
-//        if (_log.isInfoEnabled())
-//        {
-//            _log.info("Recovered message counts: " + _queueRecoveries);
-//        }
-//
-//        for(Map.Entry<AMQShortString,Integer> entry : _queueRecoveries.entrySet())
-//        {
-//            CurrentActor.get().message(_logSubject, MessageStoreMessages.MST_RECOVERED(entry.getValue(), String.valueOf(entry.getKey())));
-//
-//            CurrentActor.get().message(_logSubject, MessageStoreMessages.MST_RECOVERY_COMPLETE(String.valueOf(entry.getKey()), true));
-//        }
+        qerh.completeQueueEntryRecovery();
     }
 
     /**



More information about the rhmessaging-commits mailing list