Author: kpvdr
Date: 2009-08-05 13:48:12 -0400 (Wed, 05 Aug 2009)
New Revision: 3545
Modified:
store/trunk/cpp/lib/MessageStoreImpl.cpp
store/trunk/cpp/lib/MessageStoreImpl.h
store/trunk/cpp/tests/Makefile.am
store/trunk/cpp/tests/OrderingTest.cpp
store/trunk/cpp/tests/SimpleTest.cpp
store/trunk/cpp/tests/TransactionalTest.cpp
store/trunk/cpp/tests/TwoPhaseCommitTest.cpp
Log:
Completed truncate tidy-up, and added a --truncate option to the store.
Modified: store/trunk/cpp/lib/MessageStoreImpl.cpp
===================================================================
--- store/trunk/cpp/lib/MessageStoreImpl.cpp 2009-08-05 13:53:56 UTC (rev 3544)
+++ store/trunk/cpp/lib/MessageStoreImpl.cpp 2009-08-05 17:48:12 UTC (rev 3545)
@@ -33,7 +33,7 @@
#include <dirent.h>
#define MAX_AIO_SLEEPS 1000 // ~1 second
-#define AIO_SLEEP_TIME 1000 // 1 milisecond
+#define AIO_SLEEP_TIME 1000 // 1 Millisecond
using namespace mrg::msgstore;
using namespace qpid::broker;
@@ -47,8 +47,7 @@
using qpid::management::ManagementAgent;
namespace _qmf = qmf::com::redhat::rhm::store;
-static const u_int8_t MESSAGE_MESSAGE = 1;
-static const u_int8_t BASIC_MESSAGE = 2;
+const std::string MessageStoreImpl::storeTopLevelDir("rhm"); // Sets the
top-level store dir name
qpid::sys::Duration MessageStoreImpl::defJournalGetEventsTimeout(10 *
qpid::sys::TIME_MSEC); // 10ms
qpid::sys::Duration MessageStoreImpl::defJournalFlushTimeout(500 * qpid::sys::TIME_MSEC);
// 0.5s
qpid::sys::Mutex TxnCtxt::globalSerialiser;
@@ -68,6 +67,7 @@
autoJrnlExpand(false),
autoJrnlExpandMaxFiles(0),
jrnlFsizeSblks(0),
+ truncateFlag(false),
wCachePgSizeSblks(0),
wCacheNumPages(0),
tplNumJrnlFiles(0),
@@ -253,13 +253,14 @@
chkJrnlAutoExpandOptions(opts, autoJrnlExpand, autoJrnlExpandMaxFiles,
"auto-expand-max-jfiles", numJrnlFiles, "num-jfiles");
// Pass option values to init(...)
- return init(opts->storeDir, numJrnlFiles, jrnlFsizePgs, jrnlWrCachePageSizeKib,
tplNumJrnlFiles, tplJrnlFSizePgs, tplJrnlWrCachePageSizeKib, autoJrnlExpand,
autoJrnlExpandMaxFiles);
+ return init(opts->storeDir, numJrnlFiles, jrnlFsizePgs, opts->truncateFlag,
jrnlWrCachePageSizeKib, tplNumJrnlFiles, tplJrnlFSizePgs, tplJrnlWrCachePageSizeKib,
autoJrnlExpand, autoJrnlExpandMaxFiles);
}
// These params, taken from options, are assumed to be correct and verified
bool MessageStoreImpl::init(const std::string& dir,
u_int16_t jfiles,
u_int32_t jfileSizePgs,
+ const bool truncateFlag,
u_int32_t wCachePageSizeKib,
u_int16_t tplJfiles,
u_int32_t tplJfileSizePgs,
@@ -282,7 +283,10 @@
autoJrnlExpandMaxFiles = autoJExpandMaxFiles;
if (dir.size()>0) storeDir = dir;
- init();
+ if (truncateFlag)
+ truncateInit(false);
+ else
+ init();
QPID_LOG(notice, "Store module initialized; dir=" << dir);
QPID_LOG(info, "> Default files per journal: " << jfiles);
@@ -339,7 +343,6 @@
open(bindingDb, txn.get(), "bindings.db", true);
open(generalDb, txn.get(), "general.db", false);
tplStorePtr.reset(new TplJournalImpl("TplStore", getTplBaseDir(),
"tpl", defJournalGetEventsTimeout, defJournalFlushTimeout, agent));
- journalList["TplStore"]=tplStorePtr.get();
txn.commit();
} catch (const journal::jexception& e) {
txn.abort();
@@ -357,6 +360,7 @@
void MessageStoreImpl::finalize()
{
+ if (tplStorePtr->is_ready()) tplStorePtr->stop(true);
for (JournalListMapItr i = journalList.begin(); i != journalList.end(); i++)
{
JournalImpl* jQueue = i->second;
@@ -391,21 +395,30 @@
}
}
-void MessageStoreImpl::discardInit(const bool pushDownStoreFiles)
+void MessageStoreImpl::truncateInit(const bool pushDownStoreFiles)
{
if (isInit) {
+ if (journalList.size()) { // check no queues exist
+ std::ostringstream oss;
+ oss << "truncateInit() called with " <<
journalList.size() << " queues still in existence";
+ THROW_STORE_EXCEPTION(oss.str());
+ }
for (std::list<db_ptr >::iterator i = dbs.begin(); i != dbs.end(); i++) {
(*i)->close(0);
}
dbs.clear();
if (tplStorePtr->is_ready()) tplStorePtr->stop(true);
dbenv->close(0);
- if (pushDownStoreFiles)
- pushDown(storeDir.c_str(), "cluster_bak");
- else
- mrg::journal::jdir::delete_dir(storeDir.c_str());
- init();
}
+ if (pushDownStoreFiles)
+ pushDown(storeDir.c_str(), "cluster_bak");
+ else {
+ QPID_LOG(notice, "Store in " << storeDir << "
truncated.");
+ std::ostringstream oss;
+ oss << storeDir << "/" << storeTopLevelDir;
+ mrg::journal::jdir::delete_dir(oss.str().c_str());
+ }
+ init();
}
void MessageStoreImpl::chkTplStoreInit()
@@ -451,19 +464,6 @@
mgmtObject->resourceDestroy();
}
-void MessageStoreImpl::truncate()
-{
- DbTxn* txn;
- dbenv->txn_begin(0, &txn, 0);
- u_int32_t count;
-
- for (std::list<db_ptr >::iterator i = dbs.begin(); i != dbs.end(); i++) {
- (*i)->truncate(txn, &count, 0);
- }
-
- txn->commit(0);
-}
-
void MessageStoreImpl::create(PersistableQueue& queue,
const FieldTable& args)
{
@@ -1815,22 +1815,22 @@
std::string MessageStoreImpl::getJrnlBaseDir()
{
- std::stringstream dir;
- dir << storeDir << "/rhm/jrnl/" ;
+ std::ostringstream dir;
+ dir << storeDir << "/" << storeTopLevelDir <<
"/jrnl/" ;
return dir.str();
}
std::string MessageStoreImpl::getBdbBaseDir()
{
- std::stringstream dir;
- dir << storeDir << "/rhm/dat/" ;
+ std::ostringstream dir;
+ dir << storeDir << "/" << storeTopLevelDir <<
"/dat/" ;
return dir.str();
}
std::string MessageStoreImpl::getTplBaseDir()
{
- std::stringstream dir;
- dir << storeDir << "/rhm/tpl/" ;
+ std::ostringstream dir;
+ dir << storeDir << "/" << storeTopLevelDir <<
"/tpl/" ;
return dir.str();
}
@@ -1863,6 +1863,7 @@
autoJrnlExpand(defAutoJrnlExpand),
autoJrnlExpandMaxFiles(defAutoJrnlExpandMaxFiles),
jrnlFsizePgs(defJrnlFileSizePgs),
+ truncateFlag(defTruncateFlag),
wCachePageSizeKib(defWCachePageSize),
tplNumJrnlFiles(defTplNumJrnlFiles),
tplJrnlFsizePgs(defTplJrnlFileSizePgs),
@@ -1882,6 +1883,9 @@
// "Maximum number of journal files allowed from auto-expanding;
must be greater than --num-jfiles parameter.")
("jfile-size-pgs", qpid::optValue(jrnlFsizePgs, "N"),
"Default size for each journal file in multiples of read pages (1
read page = 64kiB)")
+ ("truncate", qpid::optValue(truncateFlag, "yes|no"),
+ "If yes|true|1, will truncate the store (discard any existing
records). If no|false|0, will preserve "
+ "the existing store files for recovery.")
("wcache-page-size", qpid::optValue(wCachePageSizeKib, "N"),
"Size of the pages in the write page cache in KiB. "
"Allowable values - powers of 2: 1, 2, 4, ... , 128. "
Modified: store/trunk/cpp/lib/MessageStoreImpl.h
===================================================================
--- store/trunk/cpp/lib/MessageStoreImpl.h 2009-08-05 13:53:56 UTC (rev 3544)
+++ store/trunk/cpp/lib/MessageStoreImpl.h 2009-08-05 17:48:12 UTC (rev 3545)
@@ -65,6 +65,7 @@
bool autoJrnlExpand;
u_int16_t autoJrnlExpandMaxFiles;
u_int32_t jrnlFsizePgs;
+ bool truncateFlag;
u_int32_t wCachePageSizeKib;
u_int16_t tplNumJrnlFiles;
u_int32_t tplJrnlFsizePgs;
@@ -99,6 +100,7 @@
// Default store settings
static const u_int16_t defNumJrnlFiles = 8;
static const u_int32_t defJrnlFileSizePgs = 24;
+ static const bool defTruncateFlag = false;
static const u_int32_t defWCachePageSize = JRNL_WMGR_DEF_PAGE_SIZE * JRNL_DBLK_SIZE *
JRNL_SBLK_SIZE / 1024;
static const u_int16_t defTplNumJrnlFiles = 8;
static const u_int32_t defTplJrnlFileSizePgs = 24;
@@ -107,6 +109,10 @@
static const bool defAutoJrnlExpand = false;
static const u_int16_t defAutoJrnlExpandMaxFiles = 0;
+ static const std::string storeTopLevelDir;
+ static qpid::sys::Duration defJournalGetEventsTimeout;
+ static qpid::sys::Duration defJournalFlushTimeout;
+
std::list<db_ptr> dbs;
dbEnv_ptr dbenv;
db_ptr queueDb;
@@ -131,6 +137,7 @@
bool autoJrnlExpand;
u_int16_t autoJrnlExpandMaxFiles;
u_int32_t jrnlFsizeSblks;
+ bool truncateFlag;
u_int32_t wCachePgSizeSblks;
u_int16_t wCacheNumPages;
u_int16_t tplNumJrnlFiles;
@@ -140,8 +147,6 @@
u_int64_t highestRid;
bool isInit;
const char* envPath;
- static qpid::sys::Duration defJournalGetEventsTimeout;
- static qpid::sys::Duration defJournalFlushTimeout;
qmf::com::redhat::rhm::store::Store* mgmtObject;
qpid::sys::Mutex jrnlCreateLock;
@@ -292,6 +297,7 @@
bool init(const std::string& dir,
u_int16_t jfiles = defNumJrnlFiles,
u_int32_t jfileSizePgs = defJrnlFileSizePgs,
+ const bool truncateFlag = false,
u_int32_t wCachePageSize = defWCachePageSize,
u_int16_t tplJfiles = defTplNumJrnlFiles,
u_int32_t tplJfileSizePgs = defTplJrnlFileSizePgs,
@@ -299,14 +305,12 @@
bool autoJExpand = defAutoJrnlExpand,
u_int16_t autoJExpandMaxFiles = defAutoJrnlExpandMaxFiles);
- void discardInit(const bool pushDownStoreFiles = false);
+ void truncateInit(const bool pushDownStoreFiles = false);
void initManagement (qpid::broker::Broker* broker);
void finalize();
- void truncate();
-
void create(qpid::broker::PersistableQueue& queue,
const qpid::framing::FieldTable& args);
Modified: store/trunk/cpp/tests/Makefile.am
===================================================================
--- store/trunk/cpp/tests/Makefile.am 2009-08-05 13:53:56 UTC (rev 3544)
+++ store/trunk/cpp/tests/Makefile.am 2009-08-05 17:48:12 UTC (rev 3545)
@@ -29,7 +29,7 @@
TMP_DATA_DIR=$(abs_srcdir)/tmp_data_dir
if DO_CLUSTER_TESTS
-SUBDIRS = jrnl cluster .
+SUBDIRS = jrnl . cluster
else
SUBDIRS = jrnl .
endif
Modified: store/trunk/cpp/tests/OrderingTest.cpp
===================================================================
--- store/trunk/cpp/tests/OrderingTest.cpp 2009-08-05 13:53:56 UTC (rev 3544)
+++ store/trunk/cpp/tests/OrderingTest.cpp 2009-08-05 17:48:12 UTC (rev 3545)
@@ -60,8 +60,7 @@
void setup()
{
store = std::auto_ptr<MessageStoreImpl>(new MessageStoreImpl());
- store->init(test_dir, 4, 1, 8);
- store->truncate();
+ store->init(test_dir, 4, 1, true); // truncate store
queue = Queue::shared_ptr(new Queue(name, 0, store.get(), 0));
FieldTable settings;
@@ -100,7 +99,7 @@
store.reset();
store = std::auto_ptr<MessageStoreImpl>(new MessageStoreImpl());
- store->init(test_dir, 4, 1, 8);
+ store->init(test_dir, 4, 1);
ExchangeRegistry exchanges;
LinkRegistry links;
sys::Timer t;
Modified: store/trunk/cpp/tests/SimpleTest.cpp
===================================================================
--- store/trunk/cpp/tests/SimpleTest.cpp 2009-08-05 13:53:56 UTC (rev 3544)
+++ store/trunk/cpp/tests/SimpleTest.cpp 2009-08-05 17:48:12 UTC (rev 3545)
@@ -93,8 +93,7 @@
{
{
MessageStoreImpl store;
- store.init(test_dir, 4, 1, 8);
- store.truncate();//make sure it is empty to begin with
+ store.init(test_dir, 4, 1, true); // truncate store
Exchange::shared_ptr exchange(new DirectExchange(exchangeName, true, args));
Queue::shared_ptr queue(new Queue(queueName, 0, &store, 0));
store.create(*exchange, qpid::framing::FieldTable());
@@ -104,7 +103,7 @@
}//db will be closed
{
MessageStoreImpl store;
- store.init(test_dir, 4, 1, 8);
+ store.init(test_dir, 4, 1);
ExchangeRegistry exchanges;
QueueRegistry queues;
LinkRegistry links;
@@ -119,7 +118,7 @@
}
{
MessageStoreImpl store;
- store.init(test_dir, 4, 1, 8);
+ store.init(test_dir, 4, 1);
ExchangeRegistry exchanges;
QueueRegistry queues;
LinkRegistry links;
@@ -142,8 +141,7 @@
cout << test_filename << ".CreateDelete: " << flush;
MessageStoreImpl store;
- store.init(test_dir, 4, 1, 8);
- store.truncate();//make sure it is empty to begin with
+ store.init(test_dir, 4, 1, true); // truncate store
string name("CreateDeleteQueue");
Queue queue(name, 0, &store, 0);
store.create(queue, qpid::framing::FieldTable());
@@ -159,8 +157,7 @@
{
cout << test_filename << ".EmptyRecover: " << flush;
MessageStoreImpl store;
- store.init(test_dir, 4, 1, 8);
- store.truncate();//make sure it is empty to begin with
+ store.init(test_dir, 4, 1, true); // truncate store
QueueRegistry registry;
registry.setStore (&store);
recover(store, registry);
@@ -177,8 +174,7 @@
string name("MyDurableQueue");
{
MessageStoreImpl store;
- store.init(test_dir, 4, 1, 8);
- store.truncate();//make sure it is empty to begin with
+ store.init(test_dir, 4, 1, true); // truncate store
Queue queue(name, 0, &store, 0);
store.create(queue, qpid::framing::FieldTable());
BOOST_REQUIRE(queue.getPersistenceId());
@@ -186,7 +182,7 @@
}//db will be closed
{
MessageStoreImpl store;
- store.init(test_dir, 4, 1, 8);
+ store.init(test_dir, 4, 1);
QueueRegistry registry;
registry.setStore (&store);
recover(store, registry);
@@ -206,8 +202,7 @@
string name("MyDurableQueue");
{
MessageStoreImpl store;
- store.init(test_dir, 4, 1, 8);
- store.truncate();//make sure it is empty to begin with
+ store.init(test_dir, 4, 1, true); // truncate store
Queue queue(name, 0, &store, 0);
FieldTable settings;
policy->update(settings);
@@ -216,7 +211,7 @@
}//db will be closed
{
MessageStoreImpl store;
- store.init(test_dir, 4, 1, 8);
+ store.init(test_dir, 4, 1);
QueueRegistry registry;
registry.setStore (&store);
recover(store, registry);
@@ -237,15 +232,14 @@
string name("MyDurableQueue");
{
MessageStoreImpl store;
- store.init(test_dir, 4, 1, 8);
- store.truncate();//make sure it is empty to begin with
+ store.init(test_dir, 4, 1, true); // truncate store
Queue queue(name, 0, &store, 0);
store.create(queue, qpid::framing::FieldTable());
store.destroy(queue);
}//db will be closed
{
MessageStoreImpl store;
- store.init(test_dir, 4, 1, 8);
+ store.init(test_dir, 4, 1);
QueueRegistry registry;
registry.setStore (&store);
recover(store, registry);
@@ -271,8 +265,7 @@
string data2("hijklmn");
{
MessageStoreImpl store;
- store.init(test_dir, 4, 1, 8);
- store.truncate();//make sure it is empty to begin with
+ store.init(test_dir, 4, 1, true); // truncate store
Queue::shared_ptr queue(new Queue(name, 0, &store, 0));
FieldTable settings;
queue->create(settings);
@@ -290,7 +283,7 @@
}//db will be closed
{
MessageStoreImpl store;
- store.init(test_dir, 4, 1, 8);
+ store.init(test_dir, 4, 1);
QueueRegistry registry;
registry.setStore (&store);
recover(store, registry);
@@ -331,8 +324,7 @@
Uuid messageId(true);
string data("abcdefg");
MessageStoreImpl store;
- store.init(test_dir, 4, 1, 8);
- store.truncate();//make sure it is empty to begin with
+ store.init(test_dir, 4, 1, true); // truncate store
Queue::shared_ptr queue(new Queue(name, 0, &store, 0));
FieldTable settings;
queue->create(settings);
@@ -348,7 +340,7 @@
}//db will be closed
{
MessageStoreImpl store;
- store.init(test_dir, 4, 1, 8);
+ store.init(test_dir, 4, 1);
QueueRegistry registry;
registry.setStore (&store);
recover(store, registry);
@@ -372,8 +364,7 @@
const string data2("ABCDEFGHIJKLMNOPQRSTUVWXYZ");
{
MessageStoreImpl store;
- store.init(test_dir, 4, 1, 8);
- store.truncate();//make sure it is empty to begin with
+ store.init(test_dir, 4, 1, true); // truncate store
//create & stage a message
boost::intrusive_ptr<Message> msg = MessageUtils::createMessage(exchange,
routingKey, messageId, (data1.size() + data2.size()));
@@ -414,7 +405,7 @@
{
//recover
MessageStoreImpl store;
- store.init(test_dir, 4, 1, 8);
+ store.init(test_dir, 4, 1);
QueueRegistry registry;
registry.setStore (&store);
ExchangeRegistry exchanges;
@@ -468,8 +459,7 @@
cout << test_filename << ".DestroyStagedMessage: " <<
flush;
MessageStoreImpl store;
- store.init(test_dir, 4, 1, 8);
- store.truncate();//make sure it is empty to begin with
+ store.init(test_dir, 4, 1, true); // truncate store
const string data("abcdefg");
boost::intrusive_ptr<Message>
msg(MessageUtils::createMessage("my_exchange", "my_routing_key",
"my_message", data.length()));
@@ -496,8 +486,7 @@
cout << test_filename << ".DestroyEnqueuedMessage: " <<
flush;
MessageStoreImpl store;
- store.init(test_dir, 4, 1, 8);
- store.truncate();//make sure it is empty to begin with
+ store.init(test_dir, 4, 1, true); // truncate store
const string data("abcdefg");
boost::intrusive_ptr<Message>
msg(MessageUtils::createMessage("my_exchange", "my_routing_key",
"my_message", data.length()));
@@ -532,8 +521,7 @@
args.setString("a", "A");
{
MessageStoreImpl store;
- store.init(test_dir, 4, 1, 8);
- store.truncate();//make sure it is empty to begin with
+ store.init(test_dir, 4, 1, true); // truncate store
ExchangeRegistry registry;
Exchange::shared_ptr exchange = registry.declare(name, type, true, args).first;
store.create(*exchange, qpid::framing::FieldTable());
@@ -542,7 +530,7 @@
}//db will be closed
{
MessageStoreImpl store;
- store.init(test_dir, 4, 1, 8);
+ store.init(test_dir, 4, 1);
ExchangeRegistry registry;
recover(store, registry);
@@ -556,7 +544,7 @@
}
{
MessageStoreImpl store;
- store.init(test_dir, 4, 1, 8);
+ store.init(test_dir, 4, 1);
ExchangeRegistry registry;
recover(store, registry);
@@ -604,8 +592,7 @@
FieldTable args;
{
MessageStoreImpl store;
- store.init(test_dir, 4, 1, 8);
- store.truncate();//make sure it is empty to begin with
+ store.init(test_dir, 4, 1, true); // truncate store
Exchange::shared_ptr exchange(new DirectExchange(exchangeName, true, args));
Queue::shared_ptr queue1(new Queue(queueName1, 0, &store, 0));
Queue::shared_ptr queue2(new Queue(queueName2, 0, &store, 0));
@@ -619,7 +606,7 @@
}//db will be closed
{
MessageStoreImpl store;
- store.init(test_dir, 4, 1, 8);
+ store.init(test_dir, 4, 1);
ExchangeRegistry exchanges;
QueueRegistry queues;
LinkRegistry links;
@@ -636,7 +623,7 @@
}
{
MessageStoreImpl store;
- store.init(test_dir, 4, 1, 8);
+ store.init(test_dir, 4, 1);
ExchangeRegistry exchanges;
QueueRegistry queues;
LinkRegistry links;
Modified: store/trunk/cpp/tests/TransactionalTest.cpp
===================================================================
--- store/trunk/cpp/tests/TransactionalTest.cpp 2009-08-05 13:53:56 UTC (rev 3544)
+++ store/trunk/cpp/tests/TransactionalTest.cpp 2009-08-05 17:48:12 UTC (rev 3545)
@@ -109,8 +109,7 @@
void setup()
{
store = std::auto_ptr<T>(new T());
- store->init(test_dir, 4, 1, 8);
- store->truncate();
+ store->init(test_dir, 4, 1, true); // truncate store
//create two queues:
FieldTable settings;
@@ -129,7 +128,7 @@
store.reset();
store = std::auto_ptr<T>(new T());
- store->init(test_dir, 4, 1, 8);
+ store->init(test_dir, 4, 1);
queues = std::auto_ptr<QueueRegistry>(new QueueRegistry);
ExchangeRegistry exchanges;
LinkRegistry links;
Modified: store/trunk/cpp/tests/TwoPhaseCommitTest.cpp
===================================================================
--- store/trunk/cpp/tests/TwoPhaseCommitTest.cpp 2009-08-05 13:53:56 UTC (rev 3544)
+++ store/trunk/cpp/tests/TwoPhaseCommitTest.cpp 2009-08-05 17:48:12 UTC (rev 3545)
@@ -326,8 +326,7 @@
void setup()
{
store = std::auto_ptr<T>(new T());
- store->init(test_dir, 4, 1, 8);
- store->truncate();
+ store->init(test_dir, 4, 1, true); // truncate store
//create two queues:
FieldTable settings;
@@ -355,7 +354,7 @@
links.reset();
store = std::auto_ptr<T>(new T());
- store->init(test_dir, 4, 1, 8);
+ store->init(test_dir, 4, 1);
sys::Timer t;
ExchangeRegistry exchanges;
queues = std::auto_ptr<QueueRegistry>(new QueueRegistry);