[rhmessaging-commits] rhmessaging commits: r2238 - store/branches/mrg-1.0/cpp/lib.

rhmessaging-commits at lists.jboss.org rhmessaging-commits at lists.jboss.org
Wed Jul 30 16:28:39 EDT 2008


Author: kpvdr
Date: 2008-07-30 16:28:39 -0400 (Wed, 30 Jul 2008)
New Revision: 2238

Modified:
   store/branches/mrg-1.0/cpp/lib/BdbMessageStore.cpp
   store/branches/mrg-1.0/cpp/lib/BdbMessageStore.h
Log:
Backports of trunk r.2236,2237: Improvements on fix for BZ457239, also tightening up on exception handling around transactions in BdbMessageStore. Also added txn to one addtional call omitted on previouis fixes.

Modified: store/branches/mrg-1.0/cpp/lib/BdbMessageStore.cpp
===================================================================
--- store/branches/mrg-1.0/cpp/lib/BdbMessageStore.cpp	2008-07-30 20:09:46 UTC (rev 2237)
+++ store/branches/mrg-1.0/cpp/lib/BdbMessageStore.cpp	2008-07-30 20:28:39 UTC (rev 2238)
@@ -249,7 +249,7 @@
     } catch (const DbException& e) {
         txn.abort();
         THROW_STORE_EXCEPTION_2("Error opening databases", e);
-    } catch (const std::exception& e) {
+    } catch (...) {
         txn.abort();
         throw;
     }
@@ -359,7 +359,6 @@
         THROW_STORE_EXCEPTION(std::string("Queue ") + queue.getName() + ": create() failed: " + e.what());
     }
     try {
-        qpid::sys::Mutex::ScopedLock sl(bdbQueueLock);
         if (!create(queueDb, queueIdSequence, queue)) {
             THROW_STORE_EXCEPTION("Queue already exists: " + queue.getName());
         }
@@ -435,7 +434,16 @@
     Dbt key(&id, sizeof(id));
     BufferValue value (p);
 
-    int status = db.put(0, &key, &value, DB_NOOVERWRITE | DB_AUTO_COMMIT);
+    int status;
+    TxnCtxt txn;
+    txn.begin(env, true);
+    try {
+        status = db.put(txn.get(), &key, &value, DB_NOOVERWRITE);
+        txn.commit();
+    } catch (...) {
+        txn.abort();
+        throw;
+    }
     if (status == DB_KEYEXIST) {
         return false;
     } else {
@@ -461,8 +469,13 @@
     BindingDbt value(e, q, k, a);
     TxnCtxt txn;
     txn.begin(env, true);
-    put(bindingDb, txn.get(), key, value);
-    txn.commit();
+    try {
+        put(bindingDb, txn.get(), key, value);
+        txn.commit();
+    } catch (...) {
+        txn.abort();
+        throw;
+    }
 }
 
 void BdbMessageStore::unbind(const PersistableExchange& e,
@@ -498,10 +511,12 @@
         recoverGeneral(txn, registry);
 
         txn.commit();
-
     } catch (const DbException& e) {
         txn.abort();
         THROW_STORE_EXCEPTION_2("Error on recovery", e);
+    } catch (...) {
+        txn.abort();
+        throw;
     }
 
     //recover transactions:
@@ -785,7 +800,7 @@
     value.buffer.record();
 
     TxnCtxt txn;
-    txn.begin(env);
+    txn.begin(env, true);
     try {
         if (messageDb.get(txn.get(), &key, &value, 0) == DB_NOTFOUND) {
             txn.abort();
@@ -804,6 +819,9 @@
     } catch (const DbException& e) {
         txn.abort();
         THROW_STORE_EXCEPTION("Unexpected BDB error in BdbMessageStore::getExternMessage(): " + string(e.what()));
+    } catch (...) {
+        txn.abort();
+        throw;
     }
     return ret;
 }
@@ -922,7 +940,7 @@
             store(NULL, &txn, key, msg, true);
             msg->setPersistenceId(messageId);
             txn.commit();
-        } catch (const std::exception& e) {
+        } catch (...) {
             txn.abort();
             throw;
         }
@@ -943,6 +961,9 @@
         } catch (const DbException& e) {
             txn.abort();
             THROW_STORE_EXCEPTION_2("Error destroying message", e);
+        } catch (...) {
+            txn.abort();
+            throw;
         }
     }
 }
@@ -950,7 +971,17 @@
 u_int64_t BdbMessageStore::getRecordSize(Db& db,
                                          Dbt& key)
 {
-    return getRecordSize(0, db, key);
+    u_int64_t ret = 0;
+    TxnCtxt txn;
+    txn.begin(env, true);
+    try {
+        ret = getRecordSize(txn.get(), db, key);
+        txn.commit();
+    } catch (...) {
+        txn.abort();
+        throw;
+    }
+    return ret;
 }
 
 u_int64_t BdbMessageStore::getRecordSize(DbTxn* txn,
@@ -977,6 +1008,8 @@
     checkInit();
     u_int64_t messageId (msg->getPersistenceId());
     if (messageId != 0) {
+        TxnCtxt txn;
+        txn.begin(env, true);
         try {
             Dbt key (&messageId, sizeof(messageId));
             u_int64_t offset = getRecordSize(messageDb, key);
@@ -992,9 +1025,14 @@
             value.set_flags(DB_DBT_USERMEM | DB_DBT_PARTIAL);
             value.set_doff(offset);
             value.set_dlen(size);
-            messageDb.put(0, &key, &value, DB_AUTO_COMMIT);
+            messageDb.put(txn.get(), &key, &value, 0);
+            txn.commit();
         } catch (const DbException& e) {
+            txn.abort();
             THROW_STORE_EXCEPTION_2("Error appending content", e);
+        } catch (...) {
+            txn.abort();
+            throw;
         }
     } else {
         THROW_STORE_EXCEPTION("Cannot append content. Message not known to store!");
@@ -1035,16 +1073,18 @@
             value.set_dlen(length);
             int status = messageDb.get(txn.get(), &key, &value, 0);
             if (status == DB_NOTFOUND) {
-                txn.abort();
                 delete [] buffer;
                 THROW_STORE_EXCEPTION("Cannot load content. Message not known to store!");
-            } else {
-                txn.commit();
-                data.assign(buffer, value.get_size());
-                delete [] buffer;
             }
+            data.assign(buffer, value.get_size());
+            delete [] buffer;
+            txn.commit();
         } catch (const DbException& e) {
+            txn.abort();
             THROW_STORE_EXCEPTION_2("Error loading content", e);
+        } catch (...) {
+            txn.abort();
+            throw;
         }
     } else {
         THROW_STORE_EXCEPTION("Cannot load content. Message not known to store!");
@@ -1378,26 +1418,32 @@
     TxnCtxt txn;
     txn.begin(env, true);
     try {
-        Cursor bindings;
-        bindings.open(bindingDb, txn.get());
+        {
+            Cursor bindings;
+            bindings.open(bindingDb, txn.get());
 
-        IdDbt key;
-        Dbt value;
-        while (bindings.next(key, value)) {
-            Buffer buffer(reinterpret_cast<char*>(value.get_data()), value.get_size());
-            if (buffer.available() < 8) {
-                THROW_STORE_EXCEPTION("Not enough data for binding");
+            IdDbt key;
+            Dbt value;
+            while (bindings.next(key, value)) {
+                Buffer buffer(reinterpret_cast<char*>(value.get_data()), value.get_size());
+                if (buffer.available() < 8) {
+                    THROW_STORE_EXCEPTION("Not enough data for binding");
+                }
+                uint64_t queueId = buffer.getLongLong();
+                if (queue.getPersistenceId() == queueId) {
+                    bindings->del(0);
+                    QPID_LOG(debug, "Deleting binding for " << queue.getName() << " " << key.id << "->" << queueId);
+                }
             }
-            uint64_t queueId = buffer.getLongLong();
-            if (queue.getPersistenceId() == queueId) {
-                bindings->del(0);
-                QPID_LOG(debug, "Deleting binding for " << queue.getName() << " " << key.id << "->" << queueId);
-            }
         }
+        txn.commit();
     } catch (const std::exception& e) {
+        txn.abort();
         THROW_STORE_EXCEPTION_2("Error deleting bindings", e.what());
+    } catch (...) {
+        txn.abort();
+        throw;
     }
-    txn.commit();
     QPID_LOG(debug, "Deleted all bindings for " << queue.getName() << ":" << queue.getPersistenceId());
 }
 
@@ -1408,33 +1454,39 @@
     TxnCtxt txn;
     txn.begin(env, true);
     try {
-        Cursor bindings;
-        bindings.open(bindingDb, txn.get());
+        {
+            Cursor bindings;
+            bindings.open(bindingDb, txn.get());
 
-        IdDbt key(exchange.getPersistenceId());
-        Dbt value;
+            IdDbt key(exchange.getPersistenceId());
+            Dbt value;
 
-        for (int status = bindings->get(&key, &value, DB_SET); status == 0; status = bindings->get(&key, &value, DB_NEXT_DUP)) {
-            Buffer buffer(reinterpret_cast<char*>(value.get_data()), value.get_size());
-            if (buffer.available() < 8) {
-                THROW_STORE_EXCEPTION("Not enough data for binding");
-            }
-            uint64_t queueId = buffer.getLongLong();
-            if (queue.getPersistenceId() == queueId) {
-                std::string q;
-                std::string k;
-                buffer.getShortString(q);
-                buffer.getShortString(k);
-                if (bkey == k) {
-                    bindings->del(0);
-                    QPID_LOG(debug, "Deleting binding for " << queue.getName() << " " << key.id << "->" << queueId);
+            for (int status = bindings->get(&key, &value, DB_SET); status == 0; status = bindings->get(&key, &value, DB_NEXT_DUP)) {
+                Buffer buffer(reinterpret_cast<char*>(value.get_data()), value.get_size());
+                if (buffer.available() < 8) {
+                    THROW_STORE_EXCEPTION("Not enough data for binding");
                 }
+                uint64_t queueId = buffer.getLongLong();
+                if (queue.getPersistenceId() == queueId) {
+                    std::string q;
+                    std::string k;
+                    buffer.getShortString(q);
+                    buffer.getShortString(k);
+                    if (bkey == k) {
+                        bindings->del(0);
+                        QPID_LOG(debug, "Deleting binding for " << queue.getName() << " " << key.id << "->" << queueId);
+                    }
+                }
             }
         }
+        txn.commit();
     } catch (const std::exception& e) {
+        txn.abort();
         THROW_STORE_EXCEPTION_2("Error deleting bindings", e.what());
+    } catch (...) {
+        txn.abort();
+        throw;
     }
-    txn.commit();
 }
 
 string BdbMessageStore::getJrnlBaseDir()

Modified: store/branches/mrg-1.0/cpp/lib/BdbMessageStore.h
===================================================================
--- store/branches/mrg-1.0/cpp/lib/BdbMessageStore.h	2008-07-30 20:09:46 UTC (rev 2237)
+++ store/branches/mrg-1.0/cpp/lib/BdbMessageStore.h	2008-07-30 20:28:39 UTC (rev 2238)
@@ -115,7 +115,6 @@
     static qpid::sys::Duration defJournalFlushTimeout;
     qpid::management::Store::shared_ptr mgmtObject;
     qpid::sys::Mutex jrnlCreateLock;
-    qpid::sys::Mutex bdbQueueLock;
 
     // Parameter validation and calculation
     static u_int16_t chkJrnlNumFilesParam(const u_int16_t param,




More information about the rhmessaging-commits mailing list