[rhmessaging-commits] rhmessaging commits: r3368 - in store/trunk/cpp: lib/jrnl and 2 other directories.
rhmessaging-commits at lists.jboss.org
rhmessaging-commits at lists.jboss.org
Fri May 8 10:27:05 EDT 2009
Author: kpvdr
Date: 2009-05-08 10:27:05 -0400 (Fri, 08 May 2009)
New Revision: 3368
Modified:
store/trunk/cpp/lib/MessageStoreImpl.cpp
store/trunk/cpp/lib/MessageStoreImpl.h
store/trunk/cpp/lib/TxnCtxt.cpp
store/trunk/cpp/lib/TxnCtxt.h
store/trunk/cpp/lib/jrnl/jdir.cpp
store/trunk/cpp/tests/python_tests/cluster_basic.py
store/trunk/cpp/tests/run_python_tests
Log:
Fix for BZ 483807 - "resolve join state for store recover in cluster for joining nodes" and its dups. This checkin syncs with qpid checkin 773004.
Modified: store/trunk/cpp/lib/MessageStoreImpl.cpp
===================================================================
--- store/trunk/cpp/lib/MessageStoreImpl.cpp 2009-05-08 13:26:35 UTC (rev 3367)
+++ store/trunk/cpp/lib/MessageStoreImpl.cpp 2009-05-08 14:27:05 UTC (rev 3368)
@@ -30,6 +30,7 @@
#include "qpid/log/Statement.h"
#include "qmf/com/redhat/rhm/store/Package.h"
#include "StoreException.h"
+#include <dirent.h>
#define MAX_AIO_SLEEPS 1000 // ~1 second
#define AIO_SLEEP_TIME 1000 // 1 milisecond
@@ -63,7 +64,6 @@
{}
MessageStoreImpl::MessageStoreImpl(const char* envpath) :
- env(0),
numJrnlFiles(0),
autoJrnlExpand(false),
autoJrnlExpandMaxFiles(0),
@@ -273,15 +273,35 @@
tplWCacheNumPages = getJrnlWrNumPages(tplWCachePageSizeKib);
autoJrnlExpand = autoJExpand;
autoJrnlExpandMaxFiles = autoJExpandMaxFiles;
-
if (dir.size()>0) storeDir = dir;
+ init();
+
+ QPID_LOG(notice, "Store module initialized; dir=" << dir);
+ QPID_LOG(info, "> Default files per journal: " << jfiles);
+// TODO: Uncomment these lines when auto-expand is enabled.
+// QPID_LOG(info, "> Auto-expand " << (autoJrnlExpand ? "enabled" : "disabled"));
+// if (autoJrnlExpand) QPID_LOG(info, "> Max auto-expand journal files: " << autoJrnlExpandMaxFiles);
+ QPID_LOG(info, "> Default journal file size: " << jfileSizePgs << " (wpgs)");
+ QPID_LOG(info, "> Default write cache page size: " << wCachePageSizeKib << " (Kib)");
+ QPID_LOG(info, "> Default number of write cache pages: " << wCacheNumPages);
+ QPID_LOG(info, "> TPL files per journal: " << tplNumJrnlFiles);
+ QPID_LOG(info, "> TPL journal file size: " << tplJfileSizePgs << " (wpgs)");
+ QPID_LOG(info, "> TPL write cache page size: " << tplWCachePageSizeKib << " (Kib)");
+ QPID_LOG(info, "> TPL number of write cache pages: " << tplWCacheNumPages);
+
+ return isInit;
+}
+
+void MessageStoreImpl::init()
+{
journal::jdir::create_dir(getBdbBaseDir());
try {
- env.set_errpfx("msgstore");
- env.set_lg_regionmax(256000); // default = 65000
- env.open(getBdbBaseDir().c_str(), DB_THREAD | DB_CREATE | DB_INIT_TXN | DB_INIT_LOCK | DB_INIT_LOG | DB_INIT_MPOOL | DB_USE_ENVIRON | DB_RECOVER, 0);
+ dbenv.reset(new DbEnv(0));
+ dbenv->set_errpfx("msgstore");
+ dbenv->set_lg_regionmax(256000); // default = 65000
+ dbenv->open(getBdbBaseDir().c_str(), DB_THREAD | DB_CREATE | DB_INIT_TXN | DB_INIT_LOCK | DB_INIT_LOG | DB_INIT_MPOOL | DB_USE_ENVIRON | DB_RECOVER, 0);
} catch (const DbException& e) {
if (e.get_errno() == DB_VERSION_MISMATCH)
THROW_STORE_EXCEPTION_2("Database environment mismatch: This version of bd4 does not match that which created the store database. "
@@ -295,15 +315,15 @@
// Databases are constructed here instead of the constructor so that the DB_RECOVER flag can be used
// against the database environment. Recover can only be performed if no databases have been created
// against the environment at the time of recovery, as recovery invalidates the environment.
- queueDb.reset(new Db(&env, 0));
- configDb.reset(new Db(&env, 0));
- exchangeDb.reset(new Db(&env, 0));
- messageDb.reset(new Db(&env, 0));
- mappingDb.reset(new Db(&env, 0));
- bindingDb.reset(new Db(&env, 0));
- generalDb.reset(new Db(&env, 0));
+ queueDb.reset(new Db(dbenv.get(), 0));
+ configDb.reset(new Db(dbenv.get(), 0));
+ exchangeDb.reset(new Db(dbenv.get(), 0));
+ messageDb.reset(new Db(dbenv.get(), 0));
+ mappingDb.reset(new Db(dbenv.get(), 0));
+ bindingDb.reset(new Db(dbenv.get(), 0));
+ generalDb.reset(new Db(dbenv.get(), 0));
- txn.begin(env, false);
+ txn.begin(dbenv.get(), false);
open(queueDb, txn.get(), "queues.db", false);
open(configDb, txn.get(), "config.db", false);
open(exchangeDb, txn.get(), "exchanges.db", false);
@@ -325,21 +345,52 @@
}
isInit = true;
- QPID_LOG(notice, "Store module initialized; dir=" << dir);
- QPID_LOG(info, "> Default files per journal: " << jfiles);
-// TODO: Uncomment these lines when auto-expand is enabled.
-// QPID_LOG(info, "> Auto-expand " << (autoJrnlExpand ? "enabled" : "disabled"));
-// if (autoJrnlExpand) QPID_LOG(info, "> Max auto-expand journal files: " << autoJrnlExpandMaxFiles);
- QPID_LOG(info, "> Default journal file size: " << jfileSizePgs << " (wpgs)");
- QPID_LOG(info, "> Default write cache page size: " << wCachePageSizeKib << " (Kib)");
- QPID_LOG(info, "> Default number of write cache pages: " << wCacheNumPages);
- QPID_LOG(info, "> TPL files per journal: " << tplNumJrnlFiles);
- QPID_LOG(info, "> TPL journal file size: " << tplJfileSizePgs << " (wpgs)");
- QPID_LOG(info, "> TPL write cache page size: " << tplWCachePageSizeKib << " (Kib)");
- QPID_LOG(info, "> TPL number of write cache pages: " << tplWCacheNumPages);
- return true;
}
+// Move every entry of directory dirName into the subdirectory dirName/bakDirName.
+//
+// The backup subdirectory is deleted (if present) and recreated first, so only a single
+// generation of backup is kept. If dirName cannot be opened, nothing is done. Exceptions
+// thrown while deleting or creating the backup directory propagate to the caller.
+//
+// dirName:    directory whose contents are pushed down.
+// bakDirName: name of the backup subdirectory, relative to dirName.
+void MessageStoreImpl::pushDown(const char* dirName, const char* bakDirName)
+{
+    DIR* dir = ::opendir(dirName);
+    if (!dir)
+        return; // Directory could not be opened; nothing to push down
+
+    try {
+        std::ostringstream oss;
+        oss << dirName << "/" << bakDirName;
+        // Delete bak dir if it exists
+        mrg::journal::jdir::delete_dir(oss.str());
+        // Create new bak dir
+        mrg::journal::jdir::create_dir(oss.str());
+
+        // Move contents of current dir into bak dir
+        struct dirent* entry;
+        while ((entry = ::readdir(dir)) != 0) {
+            // Ignore . and .. and backup dir
+            if (std::strcmp(entry->d_name, ".") == 0 ||
+                std::strcmp(entry->d_name, "..") == 0 ||
+                std::strcmp(entry->d_name, bakDirName) == 0) {
+                continue;
+            }
+            std::ostringstream oldname;
+            oldname << dirName << "/" << entry->d_name;
+            std::ostringstream newname;
+            newname << oss.str() << "/" << entry->d_name;
+            // Keep going on failure (best effort), but do not hide it from the operator
+            if (::rename(oldname.str().c_str(), newname.str().c_str()) != 0) {
+                QPID_LOG(warning, "Unable to move \"" << oldname.str() << "\" to \"" << newname.str() << "\"");
+            }
+        }
+    } catch (...) {
+        // Release the directory stream before propagating the failure
+        ::closedir(dir);
+        throw;
+    }
+    // Release the directory stream opened above
+    ::closedir(dir);
+}
+
+// Throw away the current store state and initialize again from scratch.
+//
+// Does nothing unless the store is already initialized. Otherwise every open database,
+// the TPL journal (when ready) and the database environment are closed, the store files
+// are disposed of, and init() is called again.
+//
+// pushDownStoreFiles: true moves the existing store files into the "cluster_bak"
+//                     subdirectory of the store directory; false deletes the store directory.
+void MessageStoreImpl::discardInit(const bool pushDownStoreFiles)
+{
+    if (!isInit) {
+        return;
+    }
+
+    // Close all open databases, then drop the references held in the list
+    for (std::list<db_ptr >::iterator dbItr = dbs.begin(); dbItr != dbs.end(); ++dbItr) {
+        (*dbItr)->close(0);
+    }
+    dbs.clear();
+
+    // Stop the TPL journal only if it was ever started
+    if (tplStorePtr->is_ready()) {
+        tplStorePtr->stop(true);
+    }
+    dbenv->close(0);
+
+    // Dispose of the old store files: either keep them as a backup or remove them
+    if (pushDownStoreFiles) {
+        pushDown(storeDir.c_str(), "cluster_bak");
+    } else {
+        mrg::journal::jdir::delete_dir(storeDir.c_str());
+    }
+
+    init();
+}
+
void MessageStoreImpl::chkTplStoreInit()
{
if (!tplStorePtr->is_ready()) {
@@ -386,7 +437,7 @@
void MessageStoreImpl::truncate()
{
DbTxn* txn;
- env.txn_begin(0, &txn, 0);
+ dbenv->txn_begin(0, &txn, 0);
u_int32_t count;
for (std::list<db_ptr >::iterator i = dbs.begin(); i != dbs.end(); i++) {
@@ -533,7 +584,7 @@
int status;
TxnCtxt txn;
- txn.begin(env, true);
+ txn.begin(dbenv.get(), true);
try {
status = db->put(txn.get(), &key, &value, DB_NOOVERWRITE);
txn.commit();
@@ -565,7 +616,7 @@
IdDbt key(e.getPersistenceId());
BindingDbt value(e, q, k, a);
TxnCtxt txn;
- txn.begin(env, true);
+ txn.begin(dbenv.get(), true);
try {
put(bindingDb, txn.get(), key, value);
txn.commit();
@@ -595,7 +646,7 @@
message_index messages;//id->message
TxnCtxt txn;
- txn.begin(env, false);
+ txn.begin(dbenv.get(), false);
try {
//read all queues, calls recoversMessages
recoverQueues(txn, registry, queues, prepared, messages);
@@ -969,7 +1020,7 @@
value.buffer.record();
TxnCtxt txn;
- txn.begin(env, true);
+ txn.begin(dbenv.get(), true);
try {
if (messageDb->get(txn.get(), &key, &value, 0) == DB_NOTFOUND) {
txn.abort();
@@ -1159,7 +1210,7 @@
{
checkInit();
TxnCtxt txn;
- txn.begin(env, true);
+ txn.begin(dbenv.get(), true);
u_int64_t messageId (msg->getPersistenceId());
if (messageId == 0 || !msg->isContentReleased()) {
@@ -1183,7 +1234,7 @@
if (messageId) {
Dbt key (&messageId, sizeof(messageId));
TxnCtxt txn;
- txn.begin(env, true);
+ txn.begin(dbenv.get(), true);
try {
deleteIfUnused(txn.get(), key);
txn.commit();
@@ -1202,7 +1253,7 @@
{
u_int64_t ret = 0;
TxnCtxt txn;
- txn.begin(env, true);
+ txn.begin(dbenv.get(), true);
try {
ret = getRecordSize(txn.get(), db, key);
txn.commit();
@@ -1238,7 +1289,7 @@
u_int64_t messageId (msg->getPersistenceId());
if (messageId != 0) {
TxnCtxt txn;
- txn.begin(env, true);
+ txn.begin(dbenv.get(), true);
try {
Dbt key (&messageId, sizeof(messageId));
u_int64_t offset = getRecordSize(messageDb, key);
@@ -1289,7 +1340,7 @@
THROW_STORE_EXCEPTION(std::string("Queue ") + queue.getName() + ": loadContent() failed: " + e.what());
}
TxnCtxt txn;
- txn.begin(env, true);
+ txn.begin(dbenv.get(), true);
try {
Dbt key (&messageId, sizeof(messageId));
char *buffer = new char[length];
@@ -1674,7 +1725,7 @@
void MessageStoreImpl::deleteBindingsForQueue(const PersistableQueue& queue)
{
TxnCtxt txn;
- txn.begin(env, true);
+ txn.begin(dbenv.get(), true);
try {
{
Cursor bindings;
@@ -1710,7 +1761,7 @@
const std::string& bkey)
{
TxnCtxt txn;
- txn.begin(env, true);
+ txn.begin(dbenv.get(), true);
try {
{
Cursor bindings;
Modified: store/trunk/cpp/lib/MessageStoreImpl.h
===================================================================
--- store/trunk/cpp/lib/MessageStoreImpl.h 2009-05-08 13:26:35 UTC (rev 3367)
+++ store/trunk/cpp/lib/MessageStoreImpl.h 2009-05-08 14:27:05 UTC (rev 3368)
@@ -55,6 +55,8 @@
{
public:
typedef boost::shared_ptr<Db> db_ptr;
+ typedef boost::shared_ptr<DbEnv> dbEnv_ptr;
+
struct StoreOptions : public qpid::Options {
StoreOptions(const std::string& name="Store Options");
std::string clusterName;
@@ -101,7 +103,7 @@
static const u_int16_t defAutoJrnlExpandMaxFiles = 16;
std::list<db_ptr> dbs;
- DbEnv env;
+ dbEnv_ptr dbenv;
db_ptr queueDb;
db_ptr configDb;
db_ptr exchangeDb;
@@ -152,6 +154,10 @@
const u_int16_t numJrnlFiles,
const std::string& numJrnlFilesParamName);
+ void init();
+
+ void pushDown(const char* dir, const char* bakDirName = "bak");
+
void recoverQueues(TxnCtxt& txn,
qpid::broker::RecoveryManager& recovery,
queue_index& index,
@@ -249,7 +255,7 @@
std::string getTplBaseDir();
inline void checkInit() {
// TODO: change the default dir to ~/.qpidd
- if (!isInit) init("/tmp", defNumJrnlFiles, defJrnlFileSizePgs, defWCachePageSize); isInit = true;
+ if (!isInit) { init("/tmp"); isInit = true; }
}
void chkTplStoreInit();
@@ -285,6 +291,8 @@
bool autoJExpand = defAutoJrnlExpand,
u_int16_t autoJExpandMaxFiles = defAutoJrnlExpandMaxFiles);
+ void discardInit(const bool pushDownStoreFiles = false);
+
void initManagement (qpid::broker::Broker* broker);
void truncate();
Modified: store/trunk/cpp/lib/TxnCtxt.cpp
===================================================================
--- store/trunk/cpp/lib/TxnCtxt.cpp 2009-05-08 13:26:35 UTC (rev 3367)
+++ store/trunk/cpp/lib/TxnCtxt.cpp 2009-05-08 14:27:05 UTC (rev 3368)
@@ -112,8 +112,8 @@
}
}
-void TxnCtxt::begin(DbEnv& env, bool sync) {
- env.txn_begin(0, &txn, 0);
+void TxnCtxt::begin(DbEnv* env, bool sync) {
+ env->txn_begin(0, &txn, 0);
if (sync)
globalHolder = AutoScopedLock(new qpid::sys::Mutex::ScopedLock(globalSerialiser));
}
Modified: store/trunk/cpp/lib/TxnCtxt.h
===================================================================
--- store/trunk/cpp/lib/TxnCtxt.h 2009-05-08 13:26:35 UTC (rev 3367)
+++ store/trunk/cpp/lib/TxnCtxt.h 2009-05-08 14:27:05 UTC (rev 3368)
@@ -29,7 +29,7 @@
#include <memory>
#include <set>
#include <string>
-
+
#include "DataTokenImpl.h"
#include "IdSequence.h"
#include "JournalImpl.h"
@@ -83,7 +83,7 @@
*/
void sync();
void sync_jrnl(JournalImpl* jc, bool firstloop, bool& allWritten);
- void begin(DbEnv& env, bool sync = false);
+ void begin(DbEnv* env, bool sync = false);
void commit();
void abort();
DbTxn* get();
Modified: store/trunk/cpp/lib/jrnl/jdir.cpp
===================================================================
--- store/trunk/cpp/lib/jrnl/jdir.cpp 2009-05-08 13:26:35 UTC (rev 3367)
+++ store/trunk/cpp/lib/jrnl/jdir.cpp 2009-05-08 14:27:05 UTC (rev 3368)
@@ -242,12 +242,12 @@
DIR* dir = ::opendir(dirname.c_str());
if (!dir)
{
- if (errno == ENOENT) // dir does not exist.
- return;
+ if (errno == ENOENT) // dir does not exist.
+ return;
- std::ostringstream oss;
- oss << "dir=\"" << dirname << "\"" << FORMAT_SYSERR(errno);
- throw jexception(jerrno::JERR_JDIR_OPENDIR, oss.str(), "jdir", "delete_dir");
+ std::ostringstream oss;
+ oss << "dir=\"" << dirname << "\"" << FORMAT_SYSERR(errno);
+ throw jexception(jerrno::JERR_JDIR_OPENDIR, oss.str(), "jdir", "delete_dir");
}
else
{
Modified: store/trunk/cpp/tests/python_tests/cluster_basic.py
===================================================================
--- store/trunk/cpp/tests/python_tests/cluster_basic.py 2009-05-08 13:26:35 UTC (rev 3367)
+++ store/trunk/cpp/tests/python_tests/cluster_basic.py 2009-05-08 14:27:05 UTC (rev 3368)
@@ -79,33 +79,32 @@
self.killAllClusters()
raise
-# TODO: Un-comment this when the "Exchange already exists: amq.direct" error is fixed
-# def test_Cluster_04_SingleClusterRemoveRestoreNodes(self):
-# if self._runClusterTests == None:
-# print "skipped"
-# return
-# try:
-# clusterName = "test_Cluster_04_SingleClusterRemoveRestoreNodes"
-# self.createCheckCluster(clusterName, 6)
-# self.checkNumBrokers(6)
-# self.killNode(1, clusterName)
-# self.killNode(3, clusterName)
-# self.killNode(4, clusterName)
-# self.checkNumBrokers(3)
-# self.createClusterNode(1, clusterName)
-# self.createClusterNode(3, clusterName)
-# self.createClusterNode(4, clusterName)
-# self.checkNumClusterBrokers(clusterName, 6)
-# self.killNode(2, clusterName)
-# self.killNode(3, clusterName)
-# self.killNode(4, clusterName)
-# self.checkNumBrokers(3)
-# self.createClusterNode(2, clusterName)
-# self.createClusterNode(3, clusterName)
-# self.createClusterNode(4, clusterName)
-# self.checkNumClusterBrokers(clusterName, 6)
-# self.stopCheckAll()
-# except:
-# self.killAllClusters()
-# raise
+ # Remove and restore nodes of a single six-node cluster, twice over.
+ # Prints "skipped" and returns when self._runClusterTests is None.
+ # On any failure all clusters are killed and the exception is re-raised.
+ def test_Cluster_04_SingleClusterRemoveRestoreNodes(self):
+ if self._runClusterTests == None:
+ print "skipped"
+ return
+ try:
+ clusterName = "test_Cluster_04_SingleClusterRemoveRestoreNodes"
+ self.createCheckCluster(clusterName, 6)
+ self.checkNumBrokers(6)
+ # First round: kill nodes 1, 3 and 4, expect 3 brokers left, then restore them
+ self.killNode(1, clusterName)
+ self.killNode(3, clusterName)
+ self.killNode(4, clusterName)
+ self.checkNumBrokers(3)
+ self.createClusterNode(1, clusterName)
+ self.createClusterNode(3, clusterName)
+ self.createClusterNode(4, clusterName)
+ self.checkNumClusterBrokers(clusterName, 6)
+ # Second round: kill nodes 2, 3 and 4 (3 and 4 for a second time), then restore them
+ self.killNode(2, clusterName)
+ self.killNode(3, clusterName)
+ self.killNode(4, clusterName)
+ self.checkNumBrokers(3)
+ self.createClusterNode(2, clusterName)
+ self.createClusterNode(3, clusterName)
+ self.createClusterNode(4, clusterName)
+ self.checkNumClusterBrokers(clusterName, 6)
+ self.stopCheckAll()
+ except:
+ self.killAllClusters()
+ raise
Modified: store/trunk/cpp/tests/run_python_tests
===================================================================
--- store/trunk/cpp/tests/run_python_tests 2009-05-08 13:26:35 UTC (rev 3367)
+++ store/trunk/cpp/tests/run_python_tests 2009-05-08 14:27:05 UTC (rev 3368)
@@ -22,7 +22,15 @@
# The GNU Lesser General Public License is available in the file COPYING.
if test -z ${QPID_DIR} ; then
- echo "WARNING: QPID_DIR not set, skipping python tests."
+ cat <<EOF
+
+ =========== WARNING: PYTHON TESTS DISABLED ==============
+
+ QPID_DIR not set.
+
+ ===========================================================
+
+EOF
exit
fi
@@ -33,9 +41,18 @@
PYTHON_TESTS=python_tests
FAILING_PYTHON_TESTS=${abs_srcdir}/failing_python_tests.txt
else
- echo "WARNING: Unable to load python qpid module - skipping python tests."
- echo " QPID_DIR=${QPID_DIR}"
- echo " PYTHONPATH=${PYTHONPATH}"
+ # Explain why the python tests are skipped and show the QPID_DIR / PYTHONPATH values in effect.
+ cat <<EOF
+
+ =========== WARNING: PYTHON TESTS DISABLED ==============
+
+ Unable to load python qpid module - skipping python tests.
+
+ QPID_DIR=${QPID_DIR}
+ PYTHONPATH=${PYTHONPATH}
+
+ ===========================================================
+
+EOF
exit
fi
@@ -66,7 +83,7 @@
$NOAISEXEC
===========================================================
-
+
EOF
else
export RUN_CLUSTER_TESTS=1
More information about the rhmessaging-commits
mailing list