Author: kpvdr
Date: 2008-07-30 11:00:18 -0400 (Wed, 30 Jul 2008)
New Revision: 2233
Modified:
store/trunk/cpp/lib/BdbMessageStore.cpp
store/trunk/cpp/lib/BdbMessageStore.h
Log:
Proposed fix for BZ457239: "Qpidd sometimes deadlocks on queue_declare from
Python". BDB is not always thread-safe for certain operations. Added a lock on queue
create (the cause of this bug), and added local transactions to
BdbMessageStore::getRecordSize() (used by BdbMessageStore::appendContent()) and
BdbMessageStore::getExternMessage(), which were found by inspection to be vulnerable
for similar reasons.
Modified: store/trunk/cpp/lib/BdbMessageStore.cpp
===================================================================
--- store/trunk/cpp/lib/BdbMessageStore.cpp 2008-07-29 20:13:17 UTC (rev 2232)
+++ store/trunk/cpp/lib/BdbMessageStore.cpp 2008-07-30 15:00:18 UTC (rev 2233)
@@ -373,6 +373,7 @@
THROW_STORE_EXCEPTION(std::string("Queue ") + queue.getName() + ":
create() failed: " + e.what());
}
try {
+ qpid::sys::Mutex::ScopedLock sl(bdbQueueLock);
if (!create(queueDb, queueIdSequence, queue)) {
THROW_STORE_EXCEPTION("Queue already exists: " + queue.getName());
}
@@ -802,23 +803,36 @@
uint64_t messageId,
unsigned&
headerSize)
{
+ RecoverableMessage::shared_ptr ret;
Dbt key (&messageId, sizeof(messageId));
size_t preamble_length = sizeof(u_int32_t); /*header size*/
BufferValue value(preamble_length, 0);
value.buffer.record();
- if (messageDb.get(0, &key, &value, 0) == DB_NOTFOUND) {
- THROW_STORE_EXCEPTION("Cannot load content. Message not known to
store!");
- }
- //read header only to begin with
- headerSize = value.buffer.getLong();
- BufferValue header(headerSize, preamble_length);
- if (messageDb.get(0, &key, &header, 0) == DB_NOTFOUND) {
- THROW_STORE_EXCEPTION("Cannot load content. Message not known to
store!");
+ TxnCtxt txn;
+ txn.begin(env);
+ try {
+ if (messageDb.get(txn.get(), &key, &value, 0) == DB_NOTFOUND) {
+ txn.abort();
+ THROW_STORE_EXCEPTION("Cannot load content. Message not known to
store!");
+ }
+
+ //read header only to begin with
+ headerSize = value.buffer.getLong();
+
+ BufferValue header(headerSize, preamble_length);
+ if (messageDb.get(txn.get(), &key, &header, 0) == DB_NOTFOUND) {
+ txn.abort();
+ THROW_STORE_EXCEPTION("Cannot load content. Message not known to
store!");
+ }
+ ret = recovery.recoverMessage(header.buffer);
+ txn.commit();
+ } catch (const DbException& e) {
+ txn.abort();
+ THROW_STORE_EXCEPTION("Unexpected BDB error in
BdbMessageStore::getExternMessage(): " + string(e.what()));
}
-
- return recovery.recoverMessage(header.buffer);
+ return ret;
}
int BdbMessageStore::enqueueMessage(TxnCtxt& txn,
@@ -963,7 +977,17 @@
u_int64_t BdbMessageStore::getRecordSize(Db& db,
Dbt& key)
{
- return getRecordSize(0, db, key);
+ u_int64_t ret = 0;
+ TxnCtxt txn;
+ txn.begin(env);
+ try {
+ ret = getRecordSize(txn.get(), db, key);
+ txn.commit();
+ } catch (const std::exception& e) {
+ txn.abort();
+ throw;
+ }
+ return ret;
}
u_int64_t BdbMessageStore::getRecordSize(DbTxn* txn,
Modified: store/trunk/cpp/lib/BdbMessageStore.h
===================================================================
--- store/trunk/cpp/lib/BdbMessageStore.h 2008-07-29 20:13:17 UTC (rev 2232)
+++ store/trunk/cpp/lib/BdbMessageStore.h 2008-07-30 15:00:18 UTC (rev 2233)
@@ -115,6 +115,7 @@
static qpid::sys::Duration defJournalFlushTimeout;
qpid::management::Store* mgmtObject;
qpid::sys::Mutex jrnlCreateLock;
+ qpid::sys::Mutex bdbQueueLock;
// Parameter validation and calculation
static u_int16_t chkJrnlNumFilesParam(const u_int16_t param,