Author: kpvdr
Date: 2008-07-30 11:20:43 -0400 (Wed, 30 Jul 2008)
New Revision: 2234
Modified:
store/branches/mrg-1.0/cpp/lib/BdbMessageStore.cpp
store/branches/mrg-1.0/cpp/lib/BdbMessageStore.h
Log:
Backport of trunk r.2233: Proposed fix for BZ457239: "Qpidd sometimes deadlocks on
queue_declare from Python". BDB is not always thread safe for certain operations.
Added a lock on queue create (the cause of this bug), added local transaction to
BdbMessageStore::getRecordSize() (used by BdbMessageStore::appendContent()) and
BdbMessageStore::getExternMessage() which were found by inspection to be vulnerable for
similar reasons.
Modified: store/branches/mrg-1.0/cpp/lib/BdbMessageStore.cpp
===================================================================
--- store/branches/mrg-1.0/cpp/lib/BdbMessageStore.cpp 2008-07-30 15:00:18 UTC (rev 2233)
+++ store/branches/mrg-1.0/cpp/lib/BdbMessageStore.cpp 2008-07-30 15:20:43 UTC (rev 2234)
@@ -359,6 +359,7 @@
THROW_STORE_EXCEPTION(std::string("Queue ") + queue.getName() + ":
create() failed: " + e.what());
}
try {
+ qpid::sys::Mutex::ScopedLock sl(bdbQueueLock);
if (!create(queueDb, queueIdSequence, queue)) {
THROW_STORE_EXCEPTION("Queue already exists: " + queue.getName());
}
@@ -776,23 +777,35 @@
uint64_t messageId,
unsigned&
headerSize)
{
+ RecoverableMessage::shared_ptr ret;
Dbt key (&messageId, sizeof(messageId));
size_t preamble_length = sizeof(u_int32_t); /*header size*/
BufferValue value(preamble_length, 0);
value.buffer.record();
- if (messageDb.get(0, &key, &value, 0) == DB_NOTFOUND) {
- THROW_STORE_EXCEPTION("Cannot load content. Message not known to
store!");
- }
- //read header only to begin with
- headerSize = value.buffer.getLong();
- BufferValue header(headerSize, preamble_length);
- if (messageDb.get(0, &key, &header, 0) == DB_NOTFOUND) {
- THROW_STORE_EXCEPTION("Cannot load content. Message not known to
store!");
+ TxnCtxt txn;
+ txn.begin(env);
+ try {
+ if (messageDb.get(txn.get(), &key, &value, 0) == DB_NOTFOUND) {
+ txn.abort();
+ THROW_STORE_EXCEPTION("Cannot load content. Message not known to
store!");
+ }
+ //read header only to begin with
+ headerSize = value.buffer.getLong();
+
+ BufferValue header(headerSize, preamble_length);
+ if (messageDb.get(txn.get(), &key, &header, 0) == DB_NOTFOUND) {
+ txn.abort();
+ THROW_STORE_EXCEPTION("Cannot load content. Message not known to
store!");
+ }
+ ret = recovery.recoverMessage(header.buffer);
+ txn.commit();
+ } catch (const DbException& e) {
+ txn.abort();
+ THROW_STORE_EXCEPTION("Unexpected BDB error in
BdbMessageStore::getExternMessage(): " + string(e.what()));
}
-
- return recovery.recoverMessage(header.buffer);
+ return ret;
}
int BdbMessageStore::enqueueMessage(TxnCtxt& txn,
Modified: store/branches/mrg-1.0/cpp/lib/BdbMessageStore.h
===================================================================
--- store/branches/mrg-1.0/cpp/lib/BdbMessageStore.h 2008-07-30 15:00:18 UTC (rev 2233)
+++ store/branches/mrg-1.0/cpp/lib/BdbMessageStore.h 2008-07-30 15:20:43 UTC (rev 2234)
@@ -115,6 +115,7 @@
static qpid::sys::Duration defJournalFlushTimeout;
qpid::management::Store::shared_ptr mgmtObject;
qpid::sys::Mutex jrnlCreateLock;
+ qpid::sys::Mutex bdbQueueLock;
// Parameter validation and calculation
static u_int16_t chkJrnlNumFilesParam(const u_int16_t param,
Show replies by date