[rhmessaging-commits] rhmessaging commits: r3368 - in store/trunk/cpp: lib/jrnl and 2 other directories.

rhmessaging-commits at lists.jboss.org rhmessaging-commits at lists.jboss.org
Fri May 8 10:27:05 EDT 2009


Author: kpvdr
Date: 2009-05-08 10:27:05 -0400 (Fri, 08 May 2009)
New Revision: 3368

Modified:
   store/trunk/cpp/lib/MessageStoreImpl.cpp
   store/trunk/cpp/lib/MessageStoreImpl.h
   store/trunk/cpp/lib/TxnCtxt.cpp
   store/trunk/cpp/lib/TxnCtxt.h
   store/trunk/cpp/lib/jrnl/jdir.cpp
   store/trunk/cpp/tests/python_tests/cluster_basic.py
   store/trunk/cpp/tests/run_python_tests
Log:
Fix for BZ 483807 - "resolve join state for store recover in cluster for joining nodes" and its dups. This checkin syncs with qpid checkin 773004.

Modified: store/trunk/cpp/lib/MessageStoreImpl.cpp
===================================================================
--- store/trunk/cpp/lib/MessageStoreImpl.cpp	2009-05-08 13:26:35 UTC (rev 3367)
+++ store/trunk/cpp/lib/MessageStoreImpl.cpp	2009-05-08 14:27:05 UTC (rev 3368)
@@ -30,6 +30,7 @@
 #include "qpid/log/Statement.h"
 #include "qmf/com/redhat/rhm/store/Package.h"
 #include "StoreException.h"
+#include <dirent.h>
 
 #define MAX_AIO_SLEEPS 1000 // ~1 second
 #define AIO_SLEEP_TIME 1000 // 1 milisecond
@@ -63,7 +64,6 @@
 {}
 
 MessageStoreImpl::MessageStoreImpl(const char* envpath) :
-                                 env(0),
                                  numJrnlFiles(0),
                                  autoJrnlExpand(false),
                                  autoJrnlExpandMaxFiles(0),
@@ -273,15 +273,35 @@
     tplWCacheNumPages = getJrnlWrNumPages(tplWCachePageSizeKib);
     autoJrnlExpand = autoJExpand;
     autoJrnlExpandMaxFiles = autoJExpandMaxFiles;
-
     if (dir.size()>0) storeDir = dir;
 
+    init();
+
+    QPID_LOG(notice, "Store module initialized; dir=" << dir);
+    QPID_LOG(info,   "> Default files per journal: " << jfiles);
+// TODO: Uncomment these lines when auto-expand is enabled.
+//     QPID_LOG(info,   "> Auto-expand " << (autoJrnlExpand ? "enabled" : "disabled"));
+//     if (autoJrnlExpand) QPID_LOG(info,   "> Max auto-expand journal files: " << autoJrnlExpandMaxFiles);
+    QPID_LOG(info,   "> Default journal file size: " << jfileSizePgs << " (wpgs)");
+    QPID_LOG(info,   "> Default write cache page size: " << wCachePageSizeKib << " (Kib)");
+    QPID_LOG(info,   "> Default number of write cache pages: " << wCacheNumPages);
+    QPID_LOG(info,   "> TPL files per journal: " << tplNumJrnlFiles);
+    QPID_LOG(info,   "> TPL journal file size: " << tplJfileSizePgs << " (wpgs)");
+    QPID_LOG(info,   "> TPL write cache page size: " << tplWCachePageSizeKib << " (Kib)");
+    QPID_LOG(info,   "> TPL number of write cache pages: " << tplWCacheNumPages);
+
+    return isInit;
+}
+
+void MessageStoreImpl::init()
+{
     journal::jdir::create_dir(getBdbBaseDir());
 
     try {
-        env.set_errpfx("msgstore");
-        env.set_lg_regionmax(256000); // default = 65000
-        env.open(getBdbBaseDir().c_str(), DB_THREAD | DB_CREATE | DB_INIT_TXN | DB_INIT_LOCK | DB_INIT_LOG | DB_INIT_MPOOL | DB_USE_ENVIRON | DB_RECOVER, 0);
+        dbenv.reset(new DbEnv(0));
+        dbenv->set_errpfx("msgstore");
+        dbenv->set_lg_regionmax(256000); // default = 65000
+        dbenv->open(getBdbBaseDir().c_str(), DB_THREAD | DB_CREATE | DB_INIT_TXN | DB_INIT_LOCK | DB_INIT_LOG | DB_INIT_MPOOL | DB_USE_ENVIRON | DB_RECOVER, 0);
     } catch (const DbException& e) {
         if (e.get_errno() == DB_VERSION_MISMATCH)
             THROW_STORE_EXCEPTION_2("Database environment mismatch: This version of bd4 does not match that which created the store database. "
@@ -295,15 +315,15 @@
         // Databases are constructed here instead of the constructor so that the DB_RECOVER flag can be used
         // against the database environment. Recover can only be performed if no databases have been created
         // against the environment at the time of recovery, as recovery invalidates the environment.
-        queueDb.reset(new Db(&env, 0));
-        configDb.reset(new Db(&env, 0));
-        exchangeDb.reset(new Db(&env, 0));
-        messageDb.reset(new Db(&env, 0));
-        mappingDb.reset(new Db(&env, 0));
-        bindingDb.reset(new Db(&env, 0));
-        generalDb.reset(new Db(&env, 0));
+        queueDb.reset(new Db(dbenv.get(), 0));
+        configDb.reset(new Db(dbenv.get(), 0));
+        exchangeDb.reset(new Db(dbenv.get(), 0));
+        messageDb.reset(new Db(dbenv.get(), 0));
+        mappingDb.reset(new Db(dbenv.get(), 0));
+        bindingDb.reset(new Db(dbenv.get(), 0));
+        generalDb.reset(new Db(dbenv.get(), 0));
 
-        txn.begin(env, false);
+        txn.begin(dbenv.get(), false);
         open(queueDb, txn.get(), "queues.db", false);
         open(configDb, txn.get(), "config.db", false);
         open(exchangeDb, txn.get(), "exchanges.db", false);
@@ -325,21 +345,52 @@
     }
 
     isInit = true;
-    QPID_LOG(notice, "Store module initialized; dir=" << dir);
-    QPID_LOG(info,   "> Default files per journal: " << jfiles);
-// TODO: Uncomment these lines when auto-expand is enabled.
-//     QPID_LOG(info,   "> Auto-expand " << (autoJrnlExpand ? "enabled" : "disabled"));
-//     if (autoJrnlExpand) QPID_LOG(info,   "> Max auto-expand journal files: " << autoJrnlExpandMaxFiles);
-    QPID_LOG(info,   "> Default journal file size: " << jfileSizePgs << " (wpgs)");
-    QPID_LOG(info,   "> Default write cache page size: " << wCachePageSizeKib << " (Kib)");
-    QPID_LOG(info,   "> Default number of write cache pages: " << wCacheNumPages);
-    QPID_LOG(info,   "> TPL files per journal: " << tplNumJrnlFiles);
-    QPID_LOG(info,   "> TPL journal file size: " << tplJfileSizePgs << " (wpgs)");
-    QPID_LOG(info,   "> TPL write cache page size: " << tplWCachePageSizeKib << " (Kib)");
-    QPID_LOG(info,   "> TPL number of write cache pages: " << tplWCacheNumPages);
-    return true;
 }
 
+void MessageStoreImpl::pushDown(const char* dirName, const char* bakDirName)
+{
+    DIR* dir = ::opendir(dirName);
+    if (dir)
+    {
+        std::ostringstream oss;
+        oss << dirName << "/" << bakDirName;
+        // Delete bak dir if it exists
+        mrg::journal::jdir::delete_dir(oss.str());
+        // Create new bak dir
+        mrg::journal::jdir::create_dir(oss.str());
+
+        // Copy contents of current dir into bak dir
+        struct dirent* entry;
+        while ((entry = ::readdir(dir)) != 0) {
+            // Ignore . and .. and backup dir
+            if (std::strcmp(entry->d_name, ".") != 0 && std::strcmp(entry->d_name, "..") != 0 && std::strcmp(entry->d_name, bakDirName)) {
+                std::ostringstream oldname;
+                oldname << dirName << "/" << entry->d_name;
+                std::ostringstream newname;
+                newname << oss.str() << "/" << entry->d_name;
+                ::rename(oldname.str().c_str(), newname.str().c_str());
+            }
+        }
+    }
+}
+
+void MessageStoreImpl::discardInit(const bool pushDownStoreFiles)
+{
+    if (isInit) {
+        for (std::list<db_ptr >::iterator i = dbs.begin(); i != dbs.end(); i++) {
+            (*i)->close(0);
+        }
+        dbs.clear();
+        if (tplStorePtr->is_ready()) tplStorePtr->stop(true);
+        dbenv->close(0);
+        if (pushDownStoreFiles)
+            pushDown(storeDir.c_str(), "cluster_bak");
+        else
+            mrg::journal::jdir::delete_dir(storeDir.c_str());
+        init();
+    }
+}
+
 void MessageStoreImpl::chkTplStoreInit()
 {
     if (!tplStorePtr->is_ready()) {
@@ -386,7 +437,7 @@
 void MessageStoreImpl::truncate()
 {
     DbTxn* txn;
-    env.txn_begin(0, &txn, 0);
+    dbenv->txn_begin(0, &txn, 0);
     u_int32_t count;
 
     for (std::list<db_ptr >::iterator i = dbs.begin(); i != dbs.end(); i++) {
@@ -533,7 +584,7 @@
 
     int status;
     TxnCtxt txn;
-    txn.begin(env, true);
+    txn.begin(dbenv.get(), true);
     try {
         status = db->put(txn.get(), &key, &value, DB_NOOVERWRITE);
         txn.commit();
@@ -565,7 +616,7 @@
     IdDbt key(e.getPersistenceId());
     BindingDbt value(e, q, k, a);
     TxnCtxt txn;
-    txn.begin(env, true);
+    txn.begin(dbenv.get(), true);
     try {
         put(bindingDb, txn.get(), key, value);
         txn.commit();
@@ -595,7 +646,7 @@
     message_index messages;//id->message
 
     TxnCtxt txn;
-    txn.begin(env, false);
+    txn.begin(dbenv.get(), false);
     try {
         //read all queues, calls recoversMessages
         recoverQueues(txn, registry, queues, prepared, messages);
@@ -969,7 +1020,7 @@
     value.buffer.record();
 
     TxnCtxt txn;
-    txn.begin(env, true);
+    txn.begin(dbenv.get(), true);
     try {
         if (messageDb->get(txn.get(), &key, &value, 0) == DB_NOTFOUND) {
             txn.abort();
@@ -1159,7 +1210,7 @@
 {
     checkInit();
     TxnCtxt txn;
-    txn.begin(env, true);
+    txn.begin(dbenv.get(), true);
 
     u_int64_t messageId (msg->getPersistenceId());
     if (messageId == 0 || !msg->isContentReleased()) {
@@ -1183,7 +1234,7 @@
     if (messageId) {
         Dbt key (&messageId, sizeof(messageId));
         TxnCtxt txn;
-        txn.begin(env, true);
+        txn.begin(dbenv.get(), true);
         try {
             deleteIfUnused(txn.get(), key);
             txn.commit();
@@ -1202,7 +1253,7 @@
 {
     u_int64_t ret = 0;
     TxnCtxt txn;
-    txn.begin(env, true);
+    txn.begin(dbenv.get(), true);
     try {
         ret = getRecordSize(txn.get(), db, key);
         txn.commit();
@@ -1238,7 +1289,7 @@
     u_int64_t messageId (msg->getPersistenceId());
     if (messageId != 0) {
         TxnCtxt txn;
-        txn.begin(env, true);
+        txn.begin(dbenv.get(), true);
         try {
             Dbt key (&messageId, sizeof(messageId));
             u_int64_t offset = getRecordSize(messageDb, key);
@@ -1289,7 +1340,7 @@
             THROW_STORE_EXCEPTION(std::string("Queue ") + queue.getName() + ": loadContent() failed: " + e.what());
         }
         TxnCtxt txn;
-        txn.begin(env, true);
+        txn.begin(dbenv.get(), true);
         try {
             Dbt key (&messageId, sizeof(messageId));
             char *buffer = new char[length];
@@ -1674,7 +1725,7 @@
 void MessageStoreImpl::deleteBindingsForQueue(const PersistableQueue& queue)
 {
     TxnCtxt txn;
-    txn.begin(env, true);
+    txn.begin(dbenv.get(), true);
     try {
         {
             Cursor bindings;
@@ -1710,7 +1761,7 @@
                                     const std::string& bkey)
 {
     TxnCtxt txn;
-    txn.begin(env, true);
+    txn.begin(dbenv.get(), true);
     try {
         {
             Cursor bindings;

Modified: store/trunk/cpp/lib/MessageStoreImpl.h
===================================================================
--- store/trunk/cpp/lib/MessageStoreImpl.h	2009-05-08 13:26:35 UTC (rev 3367)
+++ store/trunk/cpp/lib/MessageStoreImpl.h	2009-05-08 14:27:05 UTC (rev 3368)
@@ -55,6 +55,8 @@
 {
   public:
     typedef boost::shared_ptr<Db> db_ptr;
+    typedef boost::shared_ptr<DbEnv> dbEnv_ptr;
+
     struct StoreOptions : public qpid::Options {
         StoreOptions(const std::string& name="Store Options");
         std::string clusterName;
@@ -101,7 +103,7 @@
     static const u_int16_t defAutoJrnlExpandMaxFiles = 16;
 
     std::list<db_ptr> dbs;
-    DbEnv env;
+    dbEnv_ptr dbenv;
     db_ptr queueDb;
     db_ptr configDb;
     db_ptr exchangeDb;
@@ -152,6 +154,10 @@
                                   const u_int16_t numJrnlFiles,
                                   const std::string& numJrnlFilesParamName);
 
+    void init();
+
+    void pushDown(const char* dir, const char* bakDirName = "bak");
+
     void recoverQueues(TxnCtxt& txn,
                        qpid::broker::RecoveryManager& recovery,
                        queue_index& index,
@@ -249,7 +255,7 @@
     std::string getTplBaseDir();
     inline void checkInit() {
         // TODO: change the default dir to ~/.qpidd
-        if (!isInit) init("/tmp", defNumJrnlFiles, defJrnlFileSizePgs, defWCachePageSize); isInit = true;
+        if (!isInit) { init("/tmp"); isInit = true; }
     }
     void chkTplStoreInit();
 
@@ -285,6 +291,8 @@
               bool      autoJExpand = defAutoJrnlExpand,
               u_int16_t autoJExpandMaxFiles = defAutoJrnlExpandMaxFiles);
 
+    void discardInit(const bool pushDownStoreFiles = false);
+
     void initManagement (qpid::broker::Broker* broker);
 
     void truncate();

Modified: store/trunk/cpp/lib/TxnCtxt.cpp
===================================================================
--- store/trunk/cpp/lib/TxnCtxt.cpp	2009-05-08 13:26:35 UTC (rev 3367)
+++ store/trunk/cpp/lib/TxnCtxt.cpp	2009-05-08 14:27:05 UTC (rev 3368)
@@ -112,8 +112,8 @@
     }
 }
 
-void TxnCtxt::begin(DbEnv& env, bool sync) {
-    env.txn_begin(0, &txn, 0);
+void TxnCtxt::begin(DbEnv* env, bool sync) {
+    env->txn_begin(0, &txn, 0);
     if (sync)
         globalHolder = AutoScopedLock(new qpid::sys::Mutex::ScopedLock(globalSerialiser));
 }

Modified: store/trunk/cpp/lib/TxnCtxt.h
===================================================================
--- store/trunk/cpp/lib/TxnCtxt.h	2009-05-08 13:26:35 UTC (rev 3367)
+++ store/trunk/cpp/lib/TxnCtxt.h	2009-05-08 14:27:05 UTC (rev 3368)
@@ -29,7 +29,7 @@
 #include <memory>
 #include <set>
 #include <string>
- 
+
 #include "DataTokenImpl.h"
 #include "IdSequence.h"
 #include "JournalImpl.h"
@@ -83,7 +83,7 @@
      */
     void sync();
     void sync_jrnl(JournalImpl* jc, bool firstloop, bool& allWritten);
-    void begin(DbEnv& env, bool sync = false);
+    void begin(DbEnv* env, bool sync = false);
     void commit();
     void abort();
     DbTxn* get();

Modified: store/trunk/cpp/lib/jrnl/jdir.cpp
===================================================================
--- store/trunk/cpp/lib/jrnl/jdir.cpp	2009-05-08 13:26:35 UTC (rev 3367)
+++ store/trunk/cpp/lib/jrnl/jdir.cpp	2009-05-08 14:27:05 UTC (rev 3368)
@@ -242,12 +242,12 @@
     DIR* dir = ::opendir(dirname.c_str());
     if (!dir)
     {
-    	if (errno == ENOENT) // dir does not exist.
-	    return;
+        if (errno == ENOENT) // dir does not exist.
+            return;
 
-         std::ostringstream oss;
-         oss << "dir=\"" << dirname << "\"" << FORMAT_SYSERR(errno);
-         throw jexception(jerrno::JERR_JDIR_OPENDIR, oss.str(), "jdir", "delete_dir");
+        std::ostringstream oss;
+        oss << "dir=\"" << dirname << "\"" << FORMAT_SYSERR(errno);
+        throw jexception(jerrno::JERR_JDIR_OPENDIR, oss.str(), "jdir", "delete_dir");
     }
     else
     {

Modified: store/trunk/cpp/tests/python_tests/cluster_basic.py
===================================================================
--- store/trunk/cpp/tests/python_tests/cluster_basic.py	2009-05-08 13:26:35 UTC (rev 3367)
+++ store/trunk/cpp/tests/python_tests/cluster_basic.py	2009-05-08 14:27:05 UTC (rev 3368)
@@ -79,33 +79,32 @@
             self.killAllClusters()
             raise
 
-# TODO: Un-comment this when the "Exchange already exists: amq.direct" error is fixed
-#    def test_Cluster_04_SingleClusterRemoveRestoreNodes(self):
-#        if self._runClusterTests == None:
-#            print "skipped"
-#            return
-#        try:
-#            clusterName = "test_Cluster_04_SingleClusterRemoveRestoreNodes"
-#            self.createCheckCluster(clusterName, 6)
-#            self.checkNumBrokers(6)
-#            self.killNode(1, clusterName)
-#            self.killNode(3, clusterName)
-#            self.killNode(4, clusterName)
-#            self.checkNumBrokers(3)
-#            self.createClusterNode(1, clusterName)
-#            self.createClusterNode(3, clusterName)
-#            self.createClusterNode(4, clusterName)
-#            self.checkNumClusterBrokers(clusterName, 6)
-#            self.killNode(2, clusterName)
-#            self.killNode(3, clusterName)
-#            self.killNode(4, clusterName)
-#            self.checkNumBrokers(3)
-#            self.createClusterNode(2, clusterName)
-#            self.createClusterNode(3, clusterName)
-#            self.createClusterNode(4, clusterName)
-#            self.checkNumClusterBrokers(clusterName, 6)
-#            self.stopCheckAll()
-#        except:
-#            self.killAllClusters()
-#            raise
+    def test_Cluster_04_SingleClusterRemoveRestoreNodes(self):
+        if self._runClusterTests == None:
+            print "skipped"
+            return
+        try:
+            clusterName = "test_Cluster_04_SingleClusterRemoveRestoreNodes"
+            self.createCheckCluster(clusterName, 6)
+            self.checkNumBrokers(6)
+            self.killNode(1, clusterName)
+            self.killNode(3, clusterName)
+            self.killNode(4, clusterName)
+            self.checkNumBrokers(3)
+            self.createClusterNode(1, clusterName)
+            self.createClusterNode(3, clusterName)
+            self.createClusterNode(4, clusterName)
+            self.checkNumClusterBrokers(clusterName, 6)
+            self.killNode(2, clusterName)
+            self.killNode(3, clusterName)
+            self.killNode(4, clusterName)
+            self.checkNumBrokers(3)
+            self.createClusterNode(2, clusterName)
+            self.createClusterNode(3, clusterName)
+            self.createClusterNode(4, clusterName)
+            self.checkNumClusterBrokers(clusterName, 6)
+            self.stopCheckAll()
+        except:
+            self.killAllClusters()
+            raise
         

Modified: store/trunk/cpp/tests/run_python_tests
===================================================================
--- store/trunk/cpp/tests/run_python_tests	2009-05-08 13:26:35 UTC (rev 3367)
+++ store/trunk/cpp/tests/run_python_tests	2009-05-08 14:27:05 UTC (rev 3368)
@@ -22,7 +22,15 @@
 # The GNU Lesser General Public License is available in the file COPYING.
 
 if test -z ${QPID_DIR} ; then
-	echo "WARNING: QPID_DIR not set, skipping python tests."
+    cat <<EOF
+
+	===========  WARNING: PYTHON TESTS DISABLED ==============
+	
+	QPID_DIR not set.
+
+	===========================================================
+
+EOF
 	exit
 fi
 
@@ -33,9 +41,18 @@
 	PYTHON_TESTS=python_tests
 	FAILING_PYTHON_TESTS=${abs_srcdir}/failing_python_tests.txt
 else
-    echo "WARNING: Unable to load python qpid module - skipping python tests."
-    echo "         QPID_DIR=${QPID_DIR}"
-    echo "         PYTHONPATH=${PYTHONPATH}"
+    cat <<EOF
+
+	===========  WARNING: PYTHON TESTS DISABLED ==============
+	
+	Unable to load python qpid module - skipping python tests.
+	
+    QPID_DIR=${QPID_DIR}"
+    PYTHONPATH=${PYTHONPATH}"
+
+	===========================================================
+
+EOF
     exit
 fi
 
@@ -66,7 +83,7 @@
     $NOAISEXEC
 
     ===========================================================
-    
+
 EOF
 else
 	export RUN_CLUSTER_TESTS=1




More information about the rhmessaging-commits mailing list