[rhmessaging-commits] rhmessaging commits: r3735 - in store/trunk/cpp: lib/jrnl and 1 other directories.

rhmessaging-commits at lists.jboss.org rhmessaging-commits at lists.jboss.org
Thu Dec 3 16:01:31 EST 2009


Author: kpvdr
Date: 2009-12-03 16:01:30 -0500 (Thu, 03 Dec 2009)
New Revision: 3735

Modified:
   store/trunk/cpp/lib/MessageStoreImpl.cpp
   store/trunk/cpp/lib/MessageStoreImpl.h
   store/trunk/cpp/lib/jrnl/jcntl.cpp
   store/trunk/cpp/lib/jrnl/jdir.cpp
   store/trunk/cpp/lib/jrnl/jdir.hpp
   store/trunk/cpp/tools/resize
Log:
Updates to the persistence id recovery logic. Also adds a new push-down function (in jdir this time) which creates a serially numbered backup dir instead of deleting the previous bak dir.

Modified: store/trunk/cpp/lib/MessageStoreImpl.cpp
===================================================================
--- store/trunk/cpp/lib/MessageStoreImpl.cpp	2009-12-03 15:42:14 UTC (rev 3734)
+++ store/trunk/cpp/lib/MessageStoreImpl.cpp	2009-12-03 21:01:30 UTC (rev 3735)
@@ -373,33 +373,6 @@
     }
 }
 
-void MessageStoreImpl::pushDown(const std::string& dirName, const std::string& targetDir, const std::string& bakDirName)
-{
-    DIR* dir = ::opendir(dirName.c_str());
-    if (dir)
-    {
-        std::ostringstream oss;
-        oss << dirName << "/" << bakDirName;
-        // Delete bak dir if it exists
-        mrg::journal::jdir::delete_dir(oss.str());
-        // Create new bak dir
-        mrg::journal::jdir::create_dir(oss.str());
-
-        // Copy contents of targetDir into bak dir
-        struct dirent* entry;
-        while ((entry = ::readdir(dir)) != 0) {
-            // Search for targetDir in dirName
-            if (std::strcmp(entry->d_name, targetDir.c_str()) == 0) {
-                std::ostringstream oldname;
-                oldname << dirName << "/" << targetDir;
-                std::ostringstream newname;
-                newname << oss.str() << "/" << targetDir;
-                ::rename(oldname.str().c_str(), newname.str().c_str());
-            }
-        }
-    }
-}
-
 void MessageStoreImpl::truncateInit(const bool pushDownStoreFiles)
 {
     if (isInit) {
@@ -415,12 +388,12 @@
         if (tplStorePtr->is_ready()) tplStorePtr->stop(true);
         dbenv->close(0);
     }
-    if (pushDownStoreFiles)
-        pushDown(storeDir, storeTopLevelDir, "cluster_bak");
-    else {
-        std::ostringstream oss;
-        oss << storeDir << "/" << storeTopLevelDir;
-        QPID_LOG(notice, "Store in " << oss.str() << " truncated.");
+    std::ostringstream oss;
+    oss << storeDir << "/" << storeTopLevelDir;
+    if (pushDownStoreFiles) {
+        QPID_LOG(notice, "Store directory " << oss.str() << " was pushed down into directory " << mrg::journal::jdir::push_down(storeDir, storeTopLevelDir, "cluster") << ".");
+    } else {
+        QPID_LOG(notice, "Store directory " << oss.str() << " was truncated.");
         mrg::journal::jdir::delete_dir(oss.str().c_str());
     }
     init();
@@ -793,11 +766,12 @@
         {
             long rcnt = 0L;     // recovered msg count
             long idcnt = 0L;    // in-doubt msg count
-            u_int64_t thisHighestRid = 0;
+            u_int64_t thisHighestRid = 0ULL;
             jQueue->recover(numJrnlFiles, autoJrnlExpand, autoJrnlExpandMaxFiles, jrnlFsizeSblks, wCacheNumPages, wCachePgSizeSblks, &prepared, thisHighestRid, key.id); // start recovery
-            // RFC 1982 comparison for unsigned 64-bit
-            if (thisHighestRid - highestRid < 0x8000000000000000ULL)
+            if (highestRid == 0ULL)
                 highestRid = thisHighestRid;
+            else if (thisHighestRid - highestRid < 0x8000000000000000ULL) // RFC 1982 comparison for unsigned 64-bit
+                highestRid = thisHighestRid;
             recoverMessages(txn, registry, queue, prepared, messages, rcnt, idcnt);
             QPID_LOG(info, "Recovered queue \"" << queueName << "\": " << rcnt << " messages recovered; " << idcnt << " messages in-doubt.");
             jQueue->recover_complete(); // start journal.
@@ -1188,11 +1162,12 @@
 void MessageStoreImpl::recoverTplStore()
 {
     if (journal::jdir::exists(tplStorePtr->jrnl_dir() + tplStorePtr->base_filename() + ".jinf")) {
-        u_int64_t thisHighestRid;
+        u_int64_t thisHighestRid = 0ULL;
         tplStorePtr->recover(tplNumJrnlFiles, false, 0, tplJrnlFsizeSblks, tplWCachePgSizeSblks, tplWCacheNumPages, 0, thisHighestRid, 0);
-        // RFC 1982 comparison for unsigned 64-bit
-        if (thisHighestRid - highestRid  < 0x8000000000000000ULL)
+        if (highestRid == 0ULL)
             highestRid = thisHighestRid;
+        else if (thisHighestRid - highestRid  < 0x8000000000000000ULL) // RFC 1982 comparison for unsigned 64-bit
+            highestRid = thisHighestRid;
 
         // Load tplRecoverMap by reading the TPL store
         readTplStore();

Modified: store/trunk/cpp/lib/MessageStoreImpl.h
===================================================================
--- store/trunk/cpp/lib/MessageStoreImpl.h	2009-12-03 15:42:14 UTC (rev 3734)
+++ store/trunk/cpp/lib/MessageStoreImpl.h	2009-12-03 21:01:30 UTC (rev 3735)
@@ -169,8 +169,6 @@
 
     void init();
 
-    void pushDown(const std::string& dir, const std::string& targetDir, const std::string& bakDirName = "bak");
-
     void recoverQueues(TxnCtxt& txn,
                        qpid::broker::RecoveryManager& recovery,
                        queue_index& index,

Modified: store/trunk/cpp/lib/jrnl/jcntl.cpp
===================================================================
--- store/trunk/cpp/lib/jrnl/jcntl.cpp	2009-12-03 15:42:14 UTC (rev 3734)
+++ store/trunk/cpp/lib/jrnl/jcntl.cpp	2009-12-03 21:01:30 UTC (rev 3735)
@@ -932,9 +932,10 @@
         throw jexception(jerrno::JERR_JCNTL_OWIMISMATCH, oss.str(), "jcntl",
                 "check_owi");
     }
-    // RFC 1982 comparison for unsigned 64-bit
-    if (h._rid - rd._h_rid < 0x8000000000000000ULL)
+    if (rd._h_rid == 0)
         rd._h_rid = h._rid;
+    else if (h._rid - rd._h_rid < 0x8000000000000000ULL) // RFC 1982 comparison for unsigned 64-bit
+        rd._h_rid = h._rid;
     return true;
 }
 

Modified: store/trunk/cpp/lib/jrnl/jdir.cpp
===================================================================
--- store/trunk/cpp/lib/jrnl/jdir.cpp	2009-12-03 15:42:14 UTC (rev 3734)
+++ store/trunk/cpp/lib/jrnl/jdir.cpp	2009-12-03 21:01:30 UTC (rev 3735)
@@ -168,7 +168,43 @@
     close_dir(dir, dirname, "clear_dir");
 }
 
+// === push_down ===
 
+std::string
+jdir::push_down(const std::string& dirname, const std::string& target_dir, const std::string& bak_dir_base)
+{
+    std::string bak_dir_name = create_bak_dir(dirname, bak_dir_base);
+
+    DIR* dir = ::opendir(dirname.c_str());
+    if (!dir)
+    {
+        std::ostringstream oss;
+        oss << "dir=\"" << dirname << "\"" << FORMAT_SYSERR(errno);
+        throw jexception(jerrno::JERR_JDIR_OPENDIR, oss.str(), "jdir", "push_down");
+    }
+    // Move target_dir (if present in dirname) into the bak dir
+    struct dirent* entry;
+    while ((entry = ::readdir(dir)) != 0)
+    {
+        // Search for target_dir in dirname
+        if (std::strcmp(entry->d_name, target_dir.c_str()) == 0)
+        {
+            std::ostringstream oldname;
+            oldname << dirname << "/" << target_dir;
+            std::ostringstream newname;
+            newname << bak_dir_name << "/" << target_dir;
+            if (::rename(oldname.str().c_str(), newname.str().c_str()))
+            {
+                std::ostringstream oss;
+                oss << "file=\"" << oldname.str() << "\" dest=\"" <<  newname.str() << "\"" << FORMAT_SYSERR(errno);
+                throw jexception(jerrno::JERR_JDIR_FMOVE, oss.str(), "jdir", "push_down");
+            }
+            break;
+        }
+    }
+    return bak_dir_name;
+}
+
 // === verify_dir ===
 
 void

Modified: store/trunk/cpp/lib/jrnl/jdir.hpp
===================================================================
--- store/trunk/cpp/lib/jrnl/jdir.hpp	2009-12-03 15:42:14 UTC (rev 3734)
+++ store/trunk/cpp/lib/jrnl/jdir.hpp	2009-12-03 21:01:30 UTC (rev 3735)
@@ -153,7 +153,21 @@
                 const bool create_flag = true);
 
 
+
         /**
+         * \brief Move (push down) the directory target_dir located in directory dirname into a backup directory
+         * named _bak_dir_base.XXXX (note prepended underscore), where XXXX is an increasing hex serial number
+         * starting at 0000.
+         *
+         * \param dirname Full path to directory containing directory to be pushed down.
+         * \param target_dir Name of directory in dirname to be pushed down.
+         * \param bak_dir_base Base name for backup directory to be created in dirname, into which target_dir will be moved.
+         * \return Name of backup dir into which target_dir was pushed.
+         */
+        static std::string push_down(const std::string& dirname, const std::string& target_dir, const std::string& bak_dir_base);
+
+
+        /**
         * \brief Verify that dirname is a valid %journal directory.
         *
         * The validation reads the .%jinf file, and using this information verifies that all the expected %journal

Modified: store/trunk/cpp/tools/resize
===================================================================
--- store/trunk/cpp/tools/resize	2009-12-03 15:42:14 UTC (rev 3734)
+++ store/trunk/cpp/tools/resize	2009-12-03 21:01:30 UTC (rev 3735)
@@ -193,7 +193,7 @@
         elif self._fNum == self._jrnlInfo.getNumJrnlFiles() - 1: return False
         else: self._fNum += 1
         self._fileRecWrCnt = 0
-        self._fName = os.path.join(self._jrnlInfo.getJrnlDir(), "%s.%04d.jdat" % (self._jrnlInfo.getJrnlBaseName(), self._fNum))
+        self._fName = os.path.join(self._jrnlInfo.getJrnlDir(), "%s.%04x.jdat" % (self._jrnlInfo.getJrnlBaseName(), self._fNum))
         if self._opts.vflag: print "* Opening file %s" % self._fName,
         self._file = open(self._fName, "w")
         if rid == None or fro == None:



More information about the rhmessaging-commits mailing list