[rhmessaging-commits] rhmessaging commits: r1387 - store/trunk/cpp/lib.

rhmessaging-commits at lists.jboss.org rhmessaging-commits at lists.jboss.org
Thu Nov 29 07:38:22 EST 2007


Author: gordonsim
Date: 2007-11-29 07:38:22 -0500 (Thu, 29 Nov 2007)
New Revision: 1387

Modified:
   store/trunk/cpp/lib/BdbMessageStore.cpp
   store/trunk/cpp/lib/BdbMessageStore.h
Log:
Slight improvement to the previous fix for deadlock avoidance: first position a cursor on the message, then remove the mapping, and then, if needed, delete the message at the position of that first cursor.



Modified: store/trunk/cpp/lib/BdbMessageStore.cpp
===================================================================
--- store/trunk/cpp/lib/BdbMessageStore.cpp	2007-11-29 10:19:28 UTC (rev 1386)
+++ store/trunk/cpp/lib/BdbMessageStore.cpp	2007-11-29 12:38:22 UTC (rev 1387)
@@ -1130,22 +1130,27 @@
 
 bool BdbMessageStore::dequeue(DbTxn* txn, Dbt& messageId, Dbt& queueId)
 {
-    //need to get a lock on the messageDb in case we want to delete
-    //(to avoid deadlocks with enqueue that acquires locks in this
-    //order)
-    Dbt peek;
-    peek.set_flags(DB_DBT_USERMEM);
-    peek.set_ulen(0);
+    //First look up the message, this gets a lock on that table in
+    //case we need to delete it (avoiding deadlocks with enqueue where
+    //the locking order is messageDb then mappingDb)
+    Cursor msgCursor;
+    msgCursor.open(messageDb, txn);
+
     try {
-        int status = messageDb.get(txn, &messageId, &peek, DB_RMW);
-        if (status != DB_BUFFER_SMALL) {
-            THROW_STORE_EXCEPTION("Unexpected status code when peeking at message: " + string(DbEnv::strerror(status)));
+        Dbt peek;
+        peek.set_flags(DB_DBT_USERMEM | DB_DBT_PARTIAL);
+        peek.set_ulen(0);
+        int status = msgCursor->get(&messageId, &peek, DB_SET | DB_RMW);
+        if (status == DB_NOTFOUND ) {
+            THROW_STORE_EXCEPTION("Can't find record for message");
+        } else if (status != 0 && status != DB_BUFFER_SMALL) {
+            string e = "Dequeue failed (while seeking message) with unexpected status = ";
+            e += DbEnv::strerror(status);
+            THROW_STORE_EXCEPTION(e);
         }
     } catch (DbMemoryException& expected) {
-        //api doc indicates may throw exception instead of status = DB_BUFFER_SMALL;
     }
 
-
     Cursor cursor;
     cursor.open(mappingDb, txn);
 
@@ -1158,7 +1163,12 @@
         THROW_STORE_EXCEPTION("Dequeue failed with status = " + status);
     }
 
-    return deleteIfUnused(cursor, txn, messageId);
+    if (isUnused(cursor, messageId)) {
+        msgCursor->del(0);
+        return true;
+    } else {
+        return false;
+    }
 }
 
 u_int32_t BdbMessageStore::outstandingQueueAIO(const qpid::broker::PersistableQueue& /*queue*/)
@@ -1177,15 +1187,24 @@
 
 bool BdbMessageStore::deleteIfUnused(Cursor& cursor, DbTxn* txn, Dbt& messageId)
 {
+    if (isUnused(cursor, messageId)) {
+        messageDb.del(txn, &messageId, 0);
+        return true;
+    } else {
+        return false;
+    }
+}
+
+bool BdbMessageStore::isUnused(Cursor& cursor, Dbt& messageId)
+{
     Dbt empty;
     int status = cursor->get(&messageId, &empty, DB_SET);
     if (status == DB_NOTFOUND) {
-        messageDb.del(txn, &messageId, 0);
         return true;
     } else if (status == 0) {
         return false;
     } else {
-        THROW_STORE_EXCEPTION("Dequeue failed with status = " + status);
+        THROW_STORE_EXCEPTION("Dequeue failed (in isUnused()) with status = " + status);
     }
 }
 

Modified: store/trunk/cpp/lib/BdbMessageStore.h
===================================================================
--- store/trunk/cpp/lib/BdbMessageStore.h	2007-11-29 10:19:28 UTC (rev 1386)
+++ store/trunk/cpp/lib/BdbMessageStore.h	2007-11-29 12:38:22 UTC (rev 1387)
@@ -110,6 +110,7 @@
 			        const qpid::broker::PersistableQueue& queue);
             bool deleteIfUnused(Cursor& cursor, DbTxn* txn, Dbt& messageId);
             bool deleteIfUnused(DbTxn* txn, Dbt& messageId);
+            bool isUnused(Cursor& cursor, Dbt& messageId);
             void destroy(Db& db, const qpid::broker::Persistable& p);
             bool create(Db& db, IdSequence& seq, const qpid::broker::Persistable& p);
             void completed(TPCTxnCtxt& txn, Db& discard, Db& apply, bool commit);




More information about the rhmessaging-commits mailing list