Author: rgemmell
Date: 2010-06-02 08:35:43 -0400 (Wed, 02 Jun 2010)
New Revision: 4002
Modified:
store/trunk/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/BDBMessageStore.java
Log:
Advance the cursor before notifying the recovery handler of a new QueueEntry to avoid
DeadlockException if the handler then attempts to remove the QueueEntry from the store and
is unable to lock the record again. Signal QE recovery completion to prompt result
logging.
Modified:
store/trunk/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/BDBMessageStore.java
===================================================================
---
store/trunk/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/BDBMessageStore.java 2010-06-02
09:26:17 UTC (rev 4001)
+++
store/trunk/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/BDBMessageStore.java 2010-06-02
12:35:43 UTC (rev 4002)
@@ -704,9 +704,10 @@
EntryBinding keyBinding = new QueueEntryTB();
DatabaseEntry value = new DatabaseEntry();
- EntryBinding valueBinding = TupleBinding.getPrimitiveBinding(Long.class);
- while (cursor.getNext(key, value, LockMode.RMW) == OperationStatus.SUCCESS)
+ OperationStatus status = cursor.getNext(key, value, LockMode.RMW);
+
+ while (status == OperationStatus.SUCCESS)
{
QueueEntryKey dd = (QueueEntryKey) keyBinding.entryToObject(key);
@@ -714,13 +715,12 @@
AMQShortString queueName = dd.getQueueName();
long messageId = dd.getMessageId();
+ //Advance the cursor BEFORE passing the previous entry to the
+ //recovery handler. This is required in order to release the
+ //lock on the record in case the handler decides to remove it.
+ status = cursor.getNext(key, value, LockMode.RMW);
+
qerh.queueEntry(queueName.asString(),messageId);
-
-// if (_log.isDebugEnabled())
-// {
-// _log.debug("On recovery, delivering Message ID:" +
message.getMessageId() + " to " + queue.getName());
-// }
-
}
}
catch (DatabaseException e)
@@ -736,17 +736,7 @@
}
}
-// if (_log.isInfoEnabled())
-// {
-// _log.info("Recovered message counts: " + _queueRecoveries);
-// }
-//
-// for(Map.Entry<AMQShortString,Integer> entry :
_queueRecoveries.entrySet())
-// {
-// CurrentActor.get().message(_logSubject,
MessageStoreMessages.MST_RECOVERED(entry.getValue(), String.valueOf(entry.getKey())));
-//
-// CurrentActor.get().message(_logSubject,
MessageStoreMessages.MST_RECOVERY_COMPLETE(String.valueOf(entry.getKey()), true));
-// }
+ qerh.completeQueueEntryRecovery();
}
/**