[rhmessaging-commits] rhmessaging commits: r2693 - store/trunk/cpp/lib.

rhmessaging-commits at lists.jboss.org rhmessaging-commits at lists.jboss.org
Wed Oct 29 13:33:09 EDT 2008


Author: kpvdr
Date: 2008-10-29 13:33:09 -0400 (Wed, 29 Oct 2008)
New Revision: 2693

Modified:
   store/trunk/cpp/lib/Cursor.h
   store/trunk/cpp/lib/MessageStoreImpl.cpp
   store/trunk/cpp/lib/MessageStoreImpl.h
Log:
Fix for BZ462046 - "Automate bdb \'recover\' routine on retsrating crashed broker".

Modified: store/trunk/cpp/lib/Cursor.h
===================================================================
--- store/trunk/cpp/lib/Cursor.h	2008-10-28 16:16:20 UTC (rev 2692)
+++ store/trunk/cpp/lib/Cursor.h	2008-10-29 17:33:09 UTC (rev 2693)
@@ -24,6 +24,7 @@
 #ifndef _Cursor_
 #define _Cursor_
 
+#include <boost/shared_ptr.hpp>
 #include "db-inc.h"
 
 namespace mrg{
@@ -33,9 +34,12 @@
 {
     Dbc* cursor;
 public:
+    typedef boost::shared_ptr<Db> db_ptr;
+
     Cursor() : cursor(0) {}
     virtual ~Cursor() { if(cursor) cursor->close(); }
-    void open(Db& db, DbTxn* txn, u_int32_t flags = 0) { db.cursor(txn, &cursor, flags); }
+
+    void open(db_ptr db, DbTxn* txn, u_int32_t flags = 0) { db->cursor(txn, &cursor, flags); }
     void close() { if(cursor) cursor->close(); cursor = 0; }
     Dbc* get() { return cursor; }
     Dbc* operator->() { return cursor; }

Modified: store/trunk/cpp/lib/MessageStoreImpl.cpp
===================================================================
--- store/trunk/cpp/lib/MessageStoreImpl.cpp	2008-10-28 16:16:20 UTC (rev 2692)
+++ store/trunk/cpp/lib/MessageStoreImpl.cpp	2008-10-29 17:33:09 UTC (rev 2693)
@@ -63,13 +63,6 @@
 
 MessageStoreImpl::MessageStoreImpl(const char* envpath) :
                                  env(0),
-                                 queueDb(&env, 0),
-                                 configDb(&env, 0),
-                                 exchangeDb(&env, 0),
-                                 messageDb(&env, 0),
-                                 mappingDb(&env, 0),
-                                 bindingDb(&env, 0),
-                                 generalDb(&env, 0),
                                  numJrnlFiles(0),
                                  autoJrnlExpand(false),
                                  autoJrnlExpandMaxFiles(0),
@@ -186,14 +179,16 @@
         // num-jfiles at max; disable auto-expand
         autoJrnlExpand = false;
         autoJrnlExpandMaxFiles = 0;
-        QPID_LOG(warning, "parameter " << autoJrnlExpandMaxFilesParamName << " (" << p << ") must be higher than parameter " << numJrnlFilesParamName << " (" << numJrnlFiles << ") which is at the maximum allowable value; disabling auto-expand.");
+        QPID_LOG(warning, "parameter " << autoJrnlExpandMaxFilesParamName << " (" << p << ") must be higher than parameter "
+                << numJrnlFilesParamName << " (" << numJrnlFiles << ") which is at the maximum allowable value; disabling auto-expand.");
         return;
     }
     if (p > JRNL_MAX_NUM_FILES) {
         // auto-expand-max-jfiles higher than max allowable, adjust
         autoJrnlExpand = true;
         autoJrnlExpandMaxFiles = JRNL_MAX_NUM_FILES;
-        QPID_LOG(warning, "parameter " << autoJrnlExpandMaxFilesParamName << " (" << p << ") is above allowable maximum (" << JRNL_MAX_NUM_FILES << "); changing this parameter to maximum value.");
+        QPID_LOG(warning, "parameter " << autoJrnlExpandMaxFilesParamName << " (" << p << ") is above allowable maximum ("
+                << JRNL_MAX_NUM_FILES << "); changing this parameter to maximum value.");
         return;
     }
     if (p <= numJrnlFiles) {
@@ -201,7 +196,9 @@
         u_int16_t incr = JRNL_MAX_NUM_FILES - numJrnlFiles > 1 ? 2 : 1;
         autoJrnlExpand = true;
         autoJrnlExpandMaxFiles = numJrnlFiles + incr;
-        QPID_LOG(warning, "parameter " << autoJrnlExpandMaxFilesParamName << " (" << p << ") is not above that of parameter " << numJrnlFilesParamName << " (" << numJrnlFiles << "); changing this parameter to value of parameter " << numJrnlFilesParamName << " plus " << incr << " (" << autoJrnlExpandMaxFiles << ").");
+        QPID_LOG(warning, "parameter " << autoJrnlExpandMaxFilesParamName << " (" << p << ") is not above that of parameter "
+                << numJrnlFilesParamName << " (" << numJrnlFiles << "); changing this parameter to value of parameter " << numJrnlFilesParamName
+                << " plus " << incr << " (" << autoJrnlExpandMaxFiles << ").");
         return;
     }
     // No adjustments req'd, set values
@@ -281,7 +278,9 @@
     journal::jdir::create_dir(getBdbBaseDir());
 
     try {
-        env.open(getBdbBaseDir().c_str(), DB_THREAD | DB_CREATE | DB_INIT_TXN | DB_INIT_LOCK | DB_INIT_LOG | DB_INIT_MPOOL | DB_USE_ENVIRON, 0);
+        env.set_errpfx("msgstore");
+        env.set_lg_regionmax(256000); // default = 65000
+        env.open(getBdbBaseDir().c_str(), DB_THREAD | DB_CREATE | DB_INIT_TXN | DB_INIT_LOCK | DB_INIT_LOG | DB_INIT_MPOOL | DB_USE_ENVIRON | DB_RECOVER, 0);
     } catch (const DbException& e) {
         if (e.get_errno() == DB_VERSION_MISMATCH)
             THROW_STORE_EXCEPTION_2("Database environment mismatch: This version of bd4 does not match that which created the store database. "
@@ -292,6 +291,17 @@
 
     TxnCtxt txn;
     try {
+        // Databases are constructed here instead of the constructor so that the DB_RECOVER flag can be used
+        // against the database enviroment. Recover can only be performed if no databases have been created
+        // against the environmnet at the time of recovery, as recovery invalidates the environment.
+        queueDb.reset(new Db(&env, 0));
+        configDb.reset(new Db(&env, 0));
+        exchangeDb.reset(new Db(&env, 0));
+        messageDb.reset(new Db(&env, 0));
+        mappingDb.reset(new Db(&env, 0));
+        bindingDb.reset(new Db(&env, 0));
+        generalDb.reset(new Db(&env, 0));
+
         txn.begin(env, false);
         open(queueDb, txn.get(), "queues.db", false);
         open(configDb, txn.get(), "config.db", false);
@@ -340,20 +350,20 @@
     }
 }
 
-void MessageStoreImpl::open(Db& db,
+void MessageStoreImpl::open(db_ptr db,
                            DbTxn* txn,
                            const char* file,
                            bool dupKey)
 {
-    if(dupKey) db.set_flags(DB_DUPSORT);
-    db.open(txn, file, 0, DB_BTREE, DB_CREATE | DB_THREAD, 0);
-    dbs.push_back(&db);
+    if(dupKey) db->set_flags(DB_DUPSORT);
+    db->open(txn, file, 0, DB_BTREE, DB_CREATE | DB_THREAD, 0);
+    dbs.push_back(db);
 }
 
 MessageStoreImpl::~MessageStoreImpl()
 {
     try {
-        for (std::list<Db*>::iterator i = dbs.begin(); i != dbs.end(); i++) {
+        for (std::list<db_ptr >::iterator i = dbs.begin(); i != dbs.end(); i++) {
             (*i)->close(0);
         }
         if (tplStorePtr->is_ready()) tplStorePtr->stop(true);
@@ -377,7 +387,7 @@
     env.txn_begin(0, &txn, 0);
     u_int32_t count;
 
-    for (std::list<Db*>::iterator i = dbs.begin(); i != dbs.end(); i++) {
+    for (std::list<db_ptr >::iterator i = dbs.begin(); i != dbs.end(); i++) {
         (*i)->truncate(txn, &count, 0);
     }
 
@@ -481,7 +491,7 @@
     destroy(exchangeDb, exchange);
     //need to also delete bindings
     IdDbt key(exchange.getPersistenceId());
-    bindingDb.del(0, &key, DB_AUTO_COMMIT);
+    bindingDb->del(0, &key, DB_AUTO_COMMIT);
 }
 
 void MessageStoreImpl::create(const PersistableConfig& general)
@@ -505,7 +515,7 @@
     destroy(generalDb, general);
 }
 
-bool MessageStoreImpl::create(Db& db,
+bool MessageStoreImpl::create(db_ptr db,
                              IdSequence& seq,
                              const Persistable& p)
 {
@@ -517,7 +527,7 @@
     TxnCtxt txn;
     txn.begin(env, true);
     try {
-        status = db.put(txn.get(), &key, &value, DB_NOOVERWRITE);
+        status = db->put(txn.get(), &key, &value, DB_NOOVERWRITE);
         txn.commit();
     } catch (...) {
         txn.abort();
@@ -531,10 +541,10 @@
     }
 }
 
-void MessageStoreImpl::destroy(Db& db, const Persistable& p)
+void MessageStoreImpl::destroy(db_ptr db, const Persistable& p)
 {
     IdDbt key(p.getPersistenceId());
-    db.del(0, &key, DB_AUTO_COMMIT);
+    db->del(0, &key, DB_AUTO_COMMIT);
 }
 
 
@@ -946,7 +956,7 @@
     TxnCtxt txn;
     txn.begin(env, true);
     try {
-        if (messageDb.get(txn.get(), &key, &value, 0) == DB_NOTFOUND) {
+        if (messageDb->get(txn.get(), &key, &value, 0) == DB_NOTFOUND) {
             txn.abort();
             THROW_STORE_EXCEPTION("Cannot load content. Message not known to store!");
         }
@@ -955,7 +965,7 @@
         headerSize = value.buffer.getLong();
 
         BufferValue header(headerSize, preamble_length);
-        if (messageDb.get(txn.get(), &key, &header, 0) == DB_NOTFOUND) {
+        if (messageDb->get(txn.get(), &key, &header, 0) == DB_NOTFOUND) {
             txn.abort();
             THROW_STORE_EXCEPTION("Cannot load content. Message not known to store!");
         }
@@ -1156,7 +1166,7 @@
     }
 }
 
-u_int64_t MessageStoreImpl::getRecordSize(Db& db,
+u_int64_t MessageStoreImpl::getRecordSize(db_ptr db,
                                          Dbt& key)
 {
     u_int64_t ret = 0;
@@ -1173,14 +1183,14 @@
 }
 
 u_int64_t MessageStoreImpl::getRecordSize(DbTxn* txn,
-                                         Db& db,
+                                         db_ptr db,
                                          Dbt& key)
 {
     Dbt peek;
     peek.set_flags(DB_DBT_USERMEM);
     peek.set_ulen(0);
     try {
-        int status = db.get(txn, &key, &peek, 0);
+        int status = db->get(txn, &key, &peek, 0);
         if (status != DB_BUFFER_SMALL) {
             THROW_STORE_EXCEPTION("Unexpected status code when determining record length: " + std::string(DbEnv::strerror(status)));
         }
@@ -1213,7 +1223,7 @@
             value.set_flags(DB_DBT_USERMEM | DB_DBT_PARTIAL);
             value.set_doff(offset);
             value.set_dlen(size);
-            messageDb.put(txn.get(), &key, &value, 0);
+            messageDb->put(txn.get(), &key, &value, 0);
             txn.commit();
         } catch (const DbException& e) {
             txn.abort();
@@ -1259,7 +1269,7 @@
             value.set_ulen(length);
             value.set_doff(realOffset);
             value.set_dlen(length);
-            int status = messageDb.get(txn.get(), &key, &value, 0);
+            int status = messageDb->get(txn.get(), &key, &value, 0);
             if (status == DB_NOTFOUND) {
                 delete [] buffer;
                 THROW_STORE_EXCEPTION("Cannot load content. Message not known to store!");
@@ -1369,7 +1379,7 @@
             /// cct message db
             if (newId) {  // only store in Bd if first time message is stored
                 Dbt data(buff,size);
-                messageDb.put(txn->get(), &messageId, &data, DB_NOOVERWRITE);
+                messageDb->put(txn->get(), &messageId, &data, DB_NOOVERWRITE);
             }
         }
     } catch (const journal::jexception& e) {
@@ -1454,7 +1464,7 @@
                                      Dbt& messageId)
 {
     if (isUnused(cursor, messageId)) {
-        messageDb.del(txn, &messageId, 0);
+        messageDb->del(txn, &messageId, 0);
         return true;
     } else {
         return false;
@@ -1582,13 +1592,13 @@
     return txn;
 }
 
-void MessageStoreImpl::put(Db& db,
+void MessageStoreImpl::put(db_ptr db,
                           DbTxn* txn,
                           Dbt& key,
                           Dbt& value)
 {
     try {
-        int status = db.put(txn, &key, &value, DB_NODUPDATA);
+        int status = db->put(txn, &key, &value, DB_NODUPDATA);
         if (status == DB_KEYEXIST) {
             THROW_STORE_EXCEPTION("duplicate data");
         } else if (status) {
@@ -1599,7 +1609,7 @@
     }
 }
 
-bool MessageStoreImpl::deleteKeyValuePair(Db& db,
+bool MessageStoreImpl::deleteKeyValuePair(db_ptr db,
                                          DbTxn* txn,
                                          Dbt& key,
                                          Dbt& value)

Modified: store/trunk/cpp/lib/MessageStoreImpl.h
===================================================================
--- store/trunk/cpp/lib/MessageStoreImpl.h	2008-10-28 16:16:20 UTC (rev 2692)
+++ store/trunk/cpp/lib/MessageStoreImpl.h	2008-10-29 17:33:09 UTC (rev 2693)
@@ -54,6 +54,7 @@
 class MessageStoreImpl : public qpid::broker::MessageStore, public qpid::management::Manageable
 {
   public:
+    typedef boost::shared_ptr<Db> db_ptr;
     struct StoreOptions : public qpid::Options {
         StoreOptions(const std::string& name="Store Options");
         std::string clusterName;
@@ -99,15 +100,15 @@
     static const bool      defAutoJrnlExpand = true;
     static const u_int16_t defAutoJrnlExpandMaxFiles = 16;
 
-    std::list<Db*> dbs;
+    std::list<db_ptr> dbs;
     DbEnv env;
-    Db queueDb;
-    Db configDb;
-    Db exchangeDb;
-    Db messageDb;
-    Db mappingDb;
-    Db bindingDb;
-    Db generalDb;
+    db_ptr queueDb;
+    db_ptr configDb;
+    db_ptr exchangeDb;
+    db_ptr messageDb;
+    db_ptr mappingDb;
+    db_ptr bindingDb;
+    db_ptr generalDb;
 
     // Pointer to Transaction Prepared List (TPL) journal instance
     boost::shared_ptr<TplJournalImpl> tplStorePtr;
@@ -204,14 +205,14 @@
                         Dbt& messageId);
     bool isUnused(Cursor& cursor,
                   Dbt& messageId);
-    void destroy(Db& db,
+    void destroy(db_ptr db,
                  const qpid::broker::Persistable& p);
-    bool create(Db& db,
+    bool create(db_ptr db,
                 IdSequence& seq,
                 const qpid::broker::Persistable& p);
     void completed(TxnCtxt& txn,
                    bool commit);
-    void record2pcOp(Db& db,
+    void record2pcOp(db_ptr db,
                      TPCTxnCtxt& txn,
                      u_int64_t messageId,
                      u_int64_t queueId);
@@ -220,20 +221,20 @@
                        const qpid::broker::PersistableQueue& queue,
                        const std::string& key);
 
-    u_int64_t getRecordSize(Db& db,
+    u_int64_t getRecordSize(db_ptr db,
                             Dbt& key);
     u_int64_t getRecordSize(DbTxn* txn,
-                            Db& db,
+                            db_ptr db,
                             Dbt& key);
-    void put(Db& db,
+    void put(db_ptr db,
              DbTxn* txn,
              Dbt& key,
              Dbt& value);
-    bool deleteKeyValuePair(Db& db,
+    bool deleteKeyValuePair(db_ptr db,
                             DbTxn* txn,
                             Dbt& key,
                             Dbt& value);
-    void open(Db& db,
+    void open(db_ptr db,
               DbTxn* txn,
               const char* file,
               bool dupKey);




More information about the rhmessaging-commits mailing list