Author: kpvdr
Date: 2009-12-03 16:01:30 -0500 (Thu, 03 Dec 2009)
New Revision: 3735
Modified:
store/trunk/cpp/lib/MessageStoreImpl.cpp
store/trunk/cpp/lib/MessageStoreImpl.h
store/trunk/cpp/lib/jrnl/jcntl.cpp
store/trunk/cpp/lib/jrnl/jdir.cpp
store/trunk/cpp/lib/jrnl/jdir.hpp
store/trunk/cpp/tools/resize
Log:
Updates to the persistence ID recovery logic. Also adds a new push-down function (in jdir this
time) which creates a serially numbered backup dir instead of deleting the previous bak dir.
Modified: store/trunk/cpp/lib/MessageStoreImpl.cpp
===================================================================
--- store/trunk/cpp/lib/MessageStoreImpl.cpp 2009-12-03 15:42:14 UTC (rev 3734)
+++ store/trunk/cpp/lib/MessageStoreImpl.cpp 2009-12-03 21:01:30 UTC (rev 3735)
@@ -373,33 +373,6 @@
}
}
-void MessageStoreImpl::pushDown(const std::string& dirName, const std::string&
targetDir, const std::string& bakDirName)
-{
- DIR* dir = ::opendir(dirName.c_str());
- if (dir)
- {
- std::ostringstream oss;
- oss << dirName << "/" << bakDirName;
- // Delete bak dir if it exists
- mrg::journal::jdir::delete_dir(oss.str());
- // Create new bak dir
- mrg::journal::jdir::create_dir(oss.str());
-
- // Copy contents of targetDir into bak dir
- struct dirent* entry;
- while ((entry = ::readdir(dir)) != 0) {
- // Search for targetDir in dirName
- if (std::strcmp(entry->d_name, targetDir.c_str()) == 0) {
- std::ostringstream oldname;
- oldname << dirName << "/" << targetDir;
- std::ostringstream newname;
- newname << oss.str() << "/" << targetDir;
- ::rename(oldname.str().c_str(), newname.str().c_str());
- }
- }
- }
-}
-
void MessageStoreImpl::truncateInit(const bool pushDownStoreFiles)
{
if (isInit) {
@@ -415,12 +388,12 @@
if (tplStorePtr->is_ready()) tplStorePtr->stop(true);
dbenv->close(0);
}
- if (pushDownStoreFiles)
- pushDown(storeDir, storeTopLevelDir, "cluster_bak");
- else {
- std::ostringstream oss;
- oss << storeDir << "/" << storeTopLevelDir;
- QPID_LOG(notice, "Store in " << oss.str() << "
truncated.");
+ std::ostringstream oss;
+ oss << storeDir << "/" << storeTopLevelDir;
+ if (pushDownStoreFiles) {
+ QPID_LOG(notice, "Store directory " << oss.str() << "
was pushed down into directory " << mrg::journal::jdir::push_down(storeDir,
storeTopLevelDir, "cluster") << ".");
+ } else {
+ QPID_LOG(notice, "Store directory " << oss.str() << "
was truncated.");
mrg::journal::jdir::delete_dir(oss.str().c_str());
}
init();
@@ -793,11 +766,12 @@
{
long rcnt = 0L; // recovered msg count
long idcnt = 0L; // in-doubt msg count
- u_int64_t thisHighestRid = 0;
+ u_int64_t thisHighestRid = 0ULL;
jQueue->recover(numJrnlFiles, autoJrnlExpand, autoJrnlExpandMaxFiles,
jrnlFsizeSblks, wCacheNumPages, wCachePgSizeSblks, &prepared, thisHighestRid, key.id);
// start recovery
- // RFC 1982 comparison for unsigned 64-bit
- if (thisHighestRid - highestRid < 0x8000000000000000ULL)
+ if (highestRid == 0ULL)
highestRid = thisHighestRid;
+ else if (thisHighestRid - highestRid < 0x8000000000000000ULL) // RFC 1982
comparison for unsigned 64-bit
+ highestRid = thisHighestRid;
recoverMessages(txn, registry, queue, prepared, messages, rcnt, idcnt);
QPID_LOG(info, "Recovered queue \"" << queueName
<< "\": " << rcnt << " messages recovered; "
<< idcnt << " messages in-doubt.");
jQueue->recover_complete(); // start journal.
@@ -1188,11 +1162,12 @@
void MessageStoreImpl::recoverTplStore()
{
if (journal::jdir::exists(tplStorePtr->jrnl_dir() +
tplStorePtr->base_filename() + ".jinf")) {
- u_int64_t thisHighestRid;
+ u_int64_t thisHighestRid = 0ULL;
tplStorePtr->recover(tplNumJrnlFiles, false, 0, tplJrnlFsizeSblks,
tplWCachePgSizeSblks, tplWCacheNumPages, 0, thisHighestRid, 0);
- // RFC 1982 comparison for unsigned 64-bit
- if (thisHighestRid - highestRid < 0x8000000000000000ULL)
+ if (highestRid == 0ULL)
highestRid = thisHighestRid;
+ else if (thisHighestRid - highestRid < 0x8000000000000000ULL) // RFC 1982
comparison for unsigned 64-bit
+ highestRid = thisHighestRid;
// Load tplRecoverMap by reading the TPL store
readTplStore();
Modified: store/trunk/cpp/lib/MessageStoreImpl.h
===================================================================
--- store/trunk/cpp/lib/MessageStoreImpl.h 2009-12-03 15:42:14 UTC (rev 3734)
+++ store/trunk/cpp/lib/MessageStoreImpl.h 2009-12-03 21:01:30 UTC (rev 3735)
@@ -169,8 +169,6 @@
void init();
- void pushDown(const std::string& dir, const std::string& targetDir, const
std::string& bakDirName = "bak");
-
void recoverQueues(TxnCtxt& txn,
qpid::broker::RecoveryManager& recovery,
queue_index& index,
Modified: store/trunk/cpp/lib/jrnl/jcntl.cpp
===================================================================
--- store/trunk/cpp/lib/jrnl/jcntl.cpp 2009-12-03 15:42:14 UTC (rev 3734)
+++ store/trunk/cpp/lib/jrnl/jcntl.cpp 2009-12-03 21:01:30 UTC (rev 3735)
@@ -932,9 +932,10 @@
throw jexception(jerrno::JERR_JCNTL_OWIMISMATCH, oss.str(), "jcntl",
"check_owi");
}
- // RFC 1982 comparison for unsigned 64-bit
- if (h._rid - rd._h_rid < 0x8000000000000000ULL)
+ if (rd._h_rid == 0)
rd._h_rid = h._rid;
+ else if (h._rid - rd._h_rid < 0x8000000000000000ULL) // RFC 1982 comparison for
unsigned 64-bit
+ rd._h_rid = h._rid;
return true;
}
Modified: store/trunk/cpp/lib/jrnl/jdir.cpp
===================================================================
--- store/trunk/cpp/lib/jrnl/jdir.cpp 2009-12-03 15:42:14 UTC (rev 3734)
+++ store/trunk/cpp/lib/jrnl/jdir.cpp 2009-12-03 21:01:30 UTC (rev 3735)
@@ -168,7 +168,53 @@
close_dir(dir, dirname, "clear_dir");
}
+// === push_down ===
+//
+// Moves <dirname>/<target_dir> into the backup directory whose name is returned by
+// create_bak_dir(dirname, bak_dir_base), and returns that backup directory name. If dirname has
+// no entry called target_dir, nothing is moved and the backup directory name is still returned.
+//
+// Throws jexception(JERR_JDIR_OPENDIR) if dirname cannot be opened and jexception(JERR_JDIR_FMOVE)
+// if the rename fails. Once opened, the directory handle is closed on every exit path.
+std::string
+jdir::push_down(const std::string& dirname, const std::string& target_dir, const std::string& bak_dir_base)
+{
+    std::string bak_dir_name = create_bak_dir(dirname, bak_dir_base);
+
+    DIR* dir = ::opendir(dirname.c_str());
+    if (!dir)
+    {
+        std::ostringstream oss;
+        oss << "dir=\"" << dirname << "\"" << FORMAT_SYSERR(errno);
+        throw jexception(jerrno::JERR_JDIR_OPENDIR, oss.str(), "jdir", "push_down");
+    }
+    // Move target_dir into the bak dir
+    struct dirent* entry;
+    while ((entry = ::readdir(dir)) != 0)
+    {
+        // Search for target_dir in dirname
+        if (std::strcmp(entry->d_name, target_dir.c_str()) == 0)
+        {
+            std::ostringstream oldname;
+            oldname << dirname << "/" << target_dir;
+            std::ostringstream newname;
+            newname << bak_dir_name << "/" << target_dir;
+            if (::rename(oldname.str().c_str(), newname.str().c_str()))
+            {
+                std::ostringstream oss;
+                // Build the message first: errno is consumed here, before closedir can overwrite it
+                oss << "file=\"" << oldname.str() << "\" dest=\"" << newname.str() << "\"" << FORMAT_SYSERR(errno);
+                ::closedir(dir); // best effort; the rename failure is the error being reported
+                throw jexception(jerrno::JERR_JDIR_FMOVE, oss.str(), "jdir", "push_down");
+            }
+            break;
+        }
+    }
+    close_dir(dir, dirname, "push_down"); // the handle from opendir was previously never closed
+    return bak_dir_name;
+}
+
// === verify_dir ===
void
Modified: store/trunk/cpp/lib/jrnl/jdir.hpp
===================================================================
--- store/trunk/cpp/lib/jrnl/jdir.hpp 2009-12-03 15:42:14 UTC (rev 3734)
+++ store/trunk/cpp/lib/jrnl/jdir.hpp 2009-12-03 21:01:30 UTC (rev 3735)
@@ -153,7 +153,21 @@
const bool create_flag = true);
+
/**
+ * \brief Move (push down) the directory target_dir located in directory dirname
into a backup directory
+ * named _bak_dir_base.XXXX (note prepended underscore), where XXXX is an
increasing hex serial number
+ * starting at 0000.
+ *
+ * \param dirname Full path to directory containing directory to be pushed down.
+ * \param target_dir Name of directory in dirname to be pushed down.
+ * \param bak_dir_base Base name for backup directory to be created in dirname,
into which target_dir will be moved.
+ * \return Name of backup dir into which target_dir was pushed.
+ */
+ static std::string push_down(const std::string& dirname, const
std::string& target_dir, const std::string& bak_dir_base);
+
+
+ /**
* \brief Verify that dirname is a valid %journal directory.
*
* The validation reads the .%jinf file, and using this information verifies that
all the expected %journal
Modified: store/trunk/cpp/tools/resize
===================================================================
--- store/trunk/cpp/tools/resize 2009-12-03 15:42:14 UTC (rev 3734)
+++ store/trunk/cpp/tools/resize 2009-12-03 21:01:30 UTC (rev 3735)
@@ -193,7 +193,7 @@
elif self._fNum == self._jrnlInfo.getNumJrnlFiles() - 1: return False
else: self._fNum += 1
self._fileRecWrCnt = 0
- self._fName = os.path.join(self._jrnlInfo.getJrnlDir(), "%s.%04d.jdat"
% (self._jrnlInfo.getJrnlBaseName(), self._fNum))
+ self._fName = os.path.join(self._jrnlInfo.getJrnlDir(), "%s.%04x.jdat"
% (self._jrnlInfo.getJrnlBaseName(), self._fNum))
if self._opts.vflag: print "* Opening file %s" % self._fName,
self._file = open(self._fName, "w")
if rid == None or fro == None: