Author: gordonsim
Date: 2007-11-29 07:38:22 -0500 (Thu, 29 Nov 2007)
New Revision: 1387
Modified:
store/trunk/cpp/lib/BdbMessageStore.cpp
store/trunk/cpp/lib/BdbMessageStore.h
Log:
Slight improvement to the previous fix for deadlock avoidance: first position a cursor on
the message, then remove the mapping, and finally, if the message is now unused, delete it
through that first cursor.
Modified: store/trunk/cpp/lib/BdbMessageStore.cpp
===================================================================
--- store/trunk/cpp/lib/BdbMessageStore.cpp 2007-11-29 10:19:28 UTC (rev 1386)
+++ store/trunk/cpp/lib/BdbMessageStore.cpp 2007-11-29 12:38:22 UTC (rev 1387)
@@ -1130,22 +1130,27 @@
bool BdbMessageStore::dequeue(DbTxn* txn, Dbt& messageId, Dbt& queueId)
{
- //need to get a lock on the messageDb in case we want to delete
- //(to avoid deadlocks with enqueue that acquires locks in this
- //order)
- Dbt peek;
- peek.set_flags(DB_DBT_USERMEM);
- peek.set_ulen(0);
+ //First look up the message, this gets a lock on that table in
+ //case we need to delete it (avoiding deadlocks with enqueue where
+ //the locking order is messageDb then mappingDb)
+ Cursor msgCursor;
+ msgCursor.open(messageDb, txn);
+
try {
- int status = messageDb.get(txn, &messageId, &peek, DB_RMW);
- if (status != DB_BUFFER_SMALL) {
- THROW_STORE_EXCEPTION("Unexpected status code when peeking at message: " + string(DbEnv::strerror(status)));
+ Dbt peek;
+ peek.set_flags(DB_DBT_USERMEM | DB_DBT_PARTIAL);
+ peek.set_ulen(0);
+ int status = msgCursor->get(&messageId, &peek, DB_SET | DB_RMW);
+ if (status == DB_NOTFOUND ) {
+ THROW_STORE_EXCEPTION("Can't find record for message");
+ } else if (status != 0 && status != DB_BUFFER_SMALL) {
+ string e = "Dequeue failed (while seeking message) with unexpected status = ";
+ e += DbEnv::strerror(status);
+ THROW_STORE_EXCEPTION(e);
}
} catch (DbMemoryException& expected) {
- //api doc indicates may throw exception instead of status = DB_BUFFER_SMALL;
}
-
Cursor cursor;
cursor.open(mappingDb, txn);
@@ -1158,7 +1163,12 @@
THROW_STORE_EXCEPTION("Dequeue failed with status = " + status);
}
- return deleteIfUnused(cursor, txn, messageId);
+ if (isUnused(cursor, messageId)) {
+ msgCursor->del(0);
+ return true;
+ } else {
+ return false;
+ }
}
u_int32_t BdbMessageStore::outstandingQueueAIO(const qpid::broker::PersistableQueue& /*queue*/)
@@ -1177,15 +1187,24 @@
bool BdbMessageStore::deleteIfUnused(Cursor& cursor, DbTxn* txn, Dbt& messageId)
{
+ if (isUnused(cursor, messageId)) {
+ messageDb.del(txn, &messageId, 0);
+ return true;
+ } else {
+ return false;
+ }
+}
+
+bool BdbMessageStore::isUnused(Cursor& cursor, Dbt& messageId)
+{
Dbt empty;
int status = cursor->get(&messageId, &empty, DB_SET);
if (status == DB_NOTFOUND) {
- messageDb.del(txn, &messageId, 0);
return true;
} else if (status == 0) {
return false;
} else {
- THROW_STORE_EXCEPTION("Dequeue failed with status = " + status);
+ THROW_STORE_EXCEPTION("Dequeue failed (in isUnused()) with status = " + status);
}
}
Modified: store/trunk/cpp/lib/BdbMessageStore.h
===================================================================
--- store/trunk/cpp/lib/BdbMessageStore.h 2007-11-29 10:19:28 UTC (rev 1386)
+++ store/trunk/cpp/lib/BdbMessageStore.h 2007-11-29 12:38:22 UTC (rev 1387)
@@ -110,6 +110,7 @@
const qpid::broker::PersistableQueue& queue);
bool deleteIfUnused(Cursor& cursor, DbTxn* txn, Dbt& messageId);
bool deleteIfUnused(DbTxn* txn, Dbt& messageId);
+ bool isUnused(Cursor& cursor, Dbt& messageId);
void destroy(Db& db, const qpid::broker::Persistable& p);
bool create(Db& db, IdSequence& seq, const qpid::broker::Persistable& p);
void completed(TPCTxnCtxt& txn, Db& discard, Db& apply, bool commit);