rhmessaging commits: r2211 - in store/trunk/cpp/lib: jrnl and 1 other directory.
by rhmessaging-commits@lists.jboss.org
Author: kpvdr
Date: 2008-07-23 15:00:33 -0400 (Wed, 23 Jul 2008)
New Revision: 2211
Modified:
store/trunk/cpp/lib/BdbMessageStore.cpp
store/trunk/cpp/lib/TxnCtxt.h
store/trunk/cpp/lib/jrnl/jcntl.cpp
Log:
Small enhancements and optimizations for transactional performance
Modified: store/trunk/cpp/lib/BdbMessageStore.cpp
===================================================================
--- store/trunk/cpp/lib/BdbMessageStore.cpp 2008-07-23 18:46:28 UTC (rev 2210)
+++ store/trunk/cpp/lib/BdbMessageStore.cpp 2008-07-23 19:00:33 UTC (rev 2211)
@@ -536,7 +536,8 @@
// was interrupted part way through committing/aborting the impacted queues. Complete this process.
bool incomplTxnFlag = citr->second.deq_flag;
- RecoverableTransaction::shared_ptr dtx = registry.recoverTransaction(i->xid, txn);
+ RecoverableTransaction::shared_ptr dtx;
+ if (!incomplTxnFlag) dtx = registry.recoverTransaction(i->xid, txn);
if (i->enqueues.get()) {
for (LockedMappings::iterator j = i->enqueues->begin(); j != i->enqueues->end(); j++) {
tpcc->addXidRecord(queues[j->first]->getExternalQueueStore());
@@ -915,7 +916,8 @@
if (!tplStorePtr->is_ready())
getTplRecoverMap(tplRecoverMap);
for (TplRecoverMapCitr i = tplRecoverMap.begin(); i != tplRecoverMap.end(); i++) {
- xids.insert(i->first);
+ if (!i->second.deq_flag) // Discard all txns that are to be rolled forward/back
+ xids.insert(i->first);
}
}
Modified: store/trunk/cpp/lib/TxnCtxt.h
===================================================================
--- store/trunk/cpp/lib/TxnCtxt.h 2008-07-23 18:46:28 UTC (rev 2210)
+++ store/trunk/cpp/lib/TxnCtxt.h 2008-07-23 19:00:33 UTC (rev 2211)
@@ -86,7 +86,7 @@
try {
if (commit) {
jc->txn_commit(dtokp.get(), getXid());
- jc->flush(true);
+ if (isTPC()) sync();
} else {
jc->txn_abort(dtokp.get(), getXid());
}
@@ -114,8 +114,8 @@
virtual ~TxnCtxt() { if(txn) abort(); }
-#define MAX_SYNC_SLEEPS 1000 // ~1 second
-#define SYNC_SLEEP_TIME 1000 // 1 milisecond
+#define MAX_SYNC_SLEEPS 5000 // ~1 second
+#define SYNC_SLEEP_TIME 200 // 0.2 ms
void sync() {
bool allWritten = false;
Modified: store/trunk/cpp/lib/jrnl/jcntl.cpp
===================================================================
--- store/trunk/cpp/lib/jrnl/jcntl.cpp 2008-07-23 18:46:28 UTC (rev 2210)
+++ store/trunk/cpp/lib/jrnl/jcntl.cpp 2008-07-23 19:00:33 UTC (rev 2211)
@@ -510,8 +510,8 @@
ji.write();
}
-#define AIO_CMPL_SLEEP 1000 // 1 ms
-#define MAX_AIO_CMPL_SLEEPS 10000 // Total: 10 sec
+#define AIO_CMPL_SLEEP 200 // 0.2 ms
+#define MAX_AIO_CMPL_SLEEPS 50000 // Total: 10 sec
void
jcntl::aio_cmpl_wait()
16 years, 5 months
rhmessaging commits: r2210 - in store/branches/mrg-1.0/cpp/lib: jrnl and 1 other directory.
by rhmessaging-commits@lists.jboss.org
Author: kpvdr
Date: 2008-07-23 14:46:28 -0400 (Wed, 23 Jul 2008)
New Revision: 2210
Modified:
store/branches/mrg-1.0/cpp/lib/BdbMessageStore.cpp
store/branches/mrg-1.0/cpp/lib/TxnCtxt.h
store/branches/mrg-1.0/cpp/lib/jrnl/jcntl.cpp
Log:
Small enhancements and optimizations for transactional performance
Modified: store/branches/mrg-1.0/cpp/lib/BdbMessageStore.cpp
===================================================================
--- store/branches/mrg-1.0/cpp/lib/BdbMessageStore.cpp 2008-07-21 11:44:45 UTC (rev 2209)
+++ store/branches/mrg-1.0/cpp/lib/BdbMessageStore.cpp 2008-07-23 18:46:28 UTC (rev 2210)
@@ -519,7 +519,8 @@
// was interrupted part way through committing/aborting the impacted queues. Complete this process.
bool incomplTxnFlag = citr->second.deq_flag;
- RecoverableTransaction::shared_ptr dtx = registry.recoverTransaction(i->xid, txn);
+ RecoverableTransaction::shared_ptr dtx;
+ if (!incomplTxnFlag) dtx = registry.recoverTransaction(i->xid, txn);
if (i->enqueues.get()) {
for (LockedMappings::iterator j = i->enqueues->begin(); j != i->enqueues->end(); j++) {
tpcc->addXidRecord(queues[j->first]->getExternalQueueStore());
@@ -890,7 +891,8 @@
if (!tplStorePtr->is_ready())
getTplRecoverMap(tplRecoverMap);
for (TplRecoverMapCitr i = tplRecoverMap.begin(); i != tplRecoverMap.end(); i++) {
- xids.insert(i->first);
+ if (!i->second.deq_flag) // Discard all txns that are to be rolled forward/back
+ xids.insert(i->first);
}
}
Modified: store/branches/mrg-1.0/cpp/lib/TxnCtxt.h
===================================================================
--- store/branches/mrg-1.0/cpp/lib/TxnCtxt.h 2008-07-21 11:44:45 UTC (rev 2209)
+++ store/branches/mrg-1.0/cpp/lib/TxnCtxt.h 2008-07-23 18:46:28 UTC (rev 2210)
@@ -86,7 +86,7 @@
try {
if (commit) {
jc->txn_commit(dtokp.get(), getXid());
- jc->flush(true);
+ if (isTPC()) sync();
} else {
jc->txn_abort(dtokp.get(), getXid());
}
@@ -114,8 +114,8 @@
virtual ~TxnCtxt() { if(txn) abort(); }
-#define MAX_SYNC_SLEEPS 1000 // ~1 second
-#define SYNC_SLEEP_TIME 1000 // 1 milisecond
+#define MAX_SYNC_SLEEPS 5000 // ~1 second
+#define SYNC_SLEEP_TIME 200 // 0.2 ms
void sync() {
bool allWritten = false;
Modified: store/branches/mrg-1.0/cpp/lib/jrnl/jcntl.cpp
===================================================================
--- store/branches/mrg-1.0/cpp/lib/jrnl/jcntl.cpp 2008-07-21 11:44:45 UTC (rev 2209)
+++ store/branches/mrg-1.0/cpp/lib/jrnl/jcntl.cpp 2008-07-23 18:46:28 UTC (rev 2210)
@@ -510,8 +510,8 @@
ji.write();
}
-#define AIO_CMPL_SLEEP 1000 // 1 ms
-#define MAX_AIO_CMPL_SLEEPS 10000 // Total: 10 sec
+#define AIO_CMPL_SLEEP 200 // 0.2 ms
+#define MAX_AIO_CMPL_SLEEPS 50000 // Total: 10 sec
void
jcntl::aio_cmpl_wait()
16 years, 5 months
rhmessaging commits: r2209 - store/branches/mrg-1.0/cpp/lib.
by rhmessaging-commits@lists.jboss.org
Author: kpvdr
Date: 2008-07-21 07:44:45 -0400 (Mon, 21 Jul 2008)
New Revision: 2209
Modified:
store/branches/mrg-1.0/cpp/lib/BdbMessageStore.cpp
Log:
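Backport of trunk r2208: Corrected the placement of the lock in the TPL store late-initialization check. The lock was previously taken unnecessarily on every call, which effectively single-threaded all transactions; it is now taken only when the TPL store is not yet ready.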
Modified: store/branches/mrg-1.0/cpp/lib/BdbMessageStore.cpp
===================================================================
--- store/branches/mrg-1.0/cpp/lib/BdbMessageStore.cpp 2008-07-21 11:32:45 UTC (rev 2208)
+++ store/branches/mrg-1.0/cpp/lib/BdbMessageStore.cpp 2008-07-21 11:44:45 UTC (rev 2209)
@@ -262,10 +262,13 @@
void BdbMessageStore::chkTplStoreInit()
{
- qpid::sys::Mutex::ScopedLock sl(jrnlCreateLock);
- journal::jdir::create_dir(getTplBaseDir());
- if (!tplStorePtr->is_ready()) {
- tplStorePtr->initialize(tplNumJrnlFiles, tplJrnlFsizeSblks, tplWCacheNumPages, tplWCachePgSizeSblks);
+ if (!tplStorePtr->is_ready())
+ {
+ qpid::sys::Mutex::ScopedLock sl(jrnlCreateLock);
+ if (!tplStorePtr->is_ready()) {
+ journal::jdir::create_dir(getTplBaseDir());
+ tplStorePtr->initialize(tplNumJrnlFiles, tplJrnlFsizeSblks, tplWCacheNumPages, tplWCachePgSizeSblks);
+ }
}
}
16 years, 5 months
rhmessaging commits: r2208 - store/trunk/cpp/lib.
by rhmessaging-commits@lists.jboss.org
Author: kpvdr
Date: 2008-07-21 07:32:45 -0400 (Mon, 21 Jul 2008)
New Revision: 2208
Modified:
store/trunk/cpp/lib/BdbMessageStore.cpp
Log:
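Corrected the placement of the lock in the TPL store late-initialization check. The lock was previously taken unnecessarily on every call, which effectively single-threaded all transactions; it is now taken only when the TPL store is not yet ready.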
Modified: store/trunk/cpp/lib/BdbMessageStore.cpp
===================================================================
--- store/trunk/cpp/lib/BdbMessageStore.cpp 2008-07-18 13:30:50 UTC (rev 2207)
+++ store/trunk/cpp/lib/BdbMessageStore.cpp 2008-07-21 11:32:45 UTC (rev 2208)
@@ -275,11 +275,13 @@
void BdbMessageStore::chkTplStoreInit()
{
- qpid::sys::Mutex::ScopedLock sl(jrnlCreateLock);
if (!tplStorePtr->is_ready()) {
- journal::jdir::create_dir(getTplBaseDir());
- tplStorePtr->initialize(tplNumJrnlFiles, tplJrnlFsizeSblks, tplWCacheNumPages, tplWCachePgSizeSblks);
- if (mgmtObject != 0) mgmtObject->set_tplIsInitialized(true);
+ qpid::sys::Mutex::ScopedLock sl(jrnlCreateLock);
+ if (!tplStorePtr->is_ready()) {
+ journal::jdir::create_dir(getTplBaseDir());
+ tplStorePtr->initialize(tplNumJrnlFiles, tplJrnlFsizeSblks, tplWCacheNumPages, tplWCachePgSizeSblks);
+ if (mgmtObject != 0) mgmtObject->set_tplIsInitialized(true);
+ }
}
}
16 years, 5 months
rhmessaging commits: r2207 - in store/trunk: cpp/lib/gen/qpid/management and 3 other directories.
by rhmessaging-commits@lists.jboss.org
Author: kpvdr
Date: 2008-07-18 09:30:50 -0400 (Fri, 18 Jul 2008)
New Revision: 2207
Modified:
store/trunk/cpp/lib/BdbMessageStore.cpp
store/trunk/cpp/lib/BdbMessageStore.h
store/trunk/cpp/lib/JournalImpl.cpp
store/trunk/cpp/lib/gen/qpid/management/Journal.cpp
store/trunk/cpp/lib/gen/qpid/management/Journal.h
store/trunk/cpp/lib/gen/qpid/management/PackageMrgstore.cpp
store/trunk/cpp/lib/gen/qpid/management/Store.cpp
store/trunk/cpp/lib/gen/qpid/management/Store.h
store/trunk/cpp/lib/jrnl/txn_map.cpp
store/trunk/cpp/lib/jrnl/txn_map.hpp
store/trunk/cpp/tests/TwoPhaseCommitTest.cpp
store/trunk/specs/management-schema.xml
Log:
Added new management functionality for the Transaction Prepared List (TPL) instance in BdbMessageStore. Journal instances now keep track of transactional records. Also performed additional tidy-up of the new TPL code: renamed some functions and variables to keep naming consistent around the Transaction Prepared List (TPL) name, and deleted an old BDB function that escaped detection in a previous BDB cleanup.
Modified: store/trunk/cpp/lib/BdbMessageStore.cpp
===================================================================
--- store/trunk/cpp/lib/BdbMessageStore.cpp 2008-07-18 13:27:59 UTC (rev 2206)
+++ store/trunk/cpp/lib/BdbMessageStore.cpp 2008-07-18 13:30:50 UTC (rev 2207)
@@ -50,7 +50,7 @@
qpid::sys::Duration BdbMessageStore::defJournalFlushTimeout(500 * qpid::sys::TIME_MSEC); // 0.5s
qpid::sys::Mutex TxnCtxt::globalSerialiser;
-BdbMessageStore::PreparedRecoverStruct::PreparedRecoverStruct(const u_int64_t _rid,
+BdbMessageStore::TplRecoverStruct::TplRecoverStruct(const u_int64_t _rid,
const bool _deq_flag,
const bool _commit_flag) :
rid(_rid),
@@ -167,15 +167,20 @@
{
if (broker != 0) {
ManagementAgent* agent = ManagementAgent::Singleton::getInstance();
-
- if (agent != 0)
- {
+ if (agent != 0) {
qpid::management::PackageMrgstore packageInitializer(agent);
- mgmtObject = new qpid::management::Store (agent, this, broker);
+ mgmtObject = new qpid::management::Store(agent, this, broker);
mgmtObject->set_location(storeDir);
mgmtObject->set_defaultInitialFileCount(numJrnlFiles);
mgmtObject->set_defaultDataFileSize(jrnlFsizeSblks / JRNL_RMGR_PAGE_SIZE);
+ mgmtObject->set_tplIsInitialized(false);
+ mgmtObject->set_tplDirectory(getTplBaseDir());
+ mgmtObject->set_tplWritePageSize(tplWCachePgSizeSblks * JRNL_SBLK_SIZE * JRNL_DBLK_SIZE);
+ mgmtObject->set_tplWritePages(tplWCacheNumPages);
+ mgmtObject->set_tplInitialFileCount(tplNumJrnlFiles);
+ mgmtObject->set_tplDataFileSize(tplJrnlFsizeSblks * JRNL_SBLK_SIZE * JRNL_DBLK_SIZE);
+ mgmtObject->set_tplCurrentFileCount(tplNumJrnlFiles);
agent->addObject(mgmtObject, 50, 1);
}
@@ -221,7 +226,6 @@
if (dir.size()>0) storeDir = dir;
journal::jdir::create_dir(getBdbBaseDir());
- journal::jdir::create_dir(getPxidBaseDir());
try {
env.open(getBdbBaseDir().c_str(), DB_THREAD | DB_CREATE | DB_INIT_TXN | DB_INIT_LOCK | DB_INIT_LOG | DB_INIT_MPOOL | DB_USE_ENVIRON, 0);
@@ -243,11 +247,11 @@
open(mappingDb, txn.get(), "mappings.db", true);
open(bindingDb, txn.get(), "bindings.db", true);
open(generalDb, txn.get(), "general.db", false);
- preparedXidStorePtr.reset(new JournalImpl("PreparedXidStore", getPxidBaseDir(), "prepared_xid", defJournalGetEventsTimeout, defJournalFlushTimeout));
+ tplStorePtr.reset(new JournalImpl("TplStore", getTplBaseDir(), "tpl", defJournalGetEventsTimeout, defJournalFlushTimeout));
txn.commit();
} catch (const journal::jexception& e) {
txn.abort();
- THROW_STORE_EXCEPTION_2("Error opening preparedXidStore instance", e.what());
+ THROW_STORE_EXCEPTION_2("Error opening tplStore instance", e.what());
} catch (const DbException& e) {
txn.abort();
THROW_STORE_EXCEPTION_2("Error opening databases", e);
@@ -257,16 +261,25 @@
}
isInit = true;
- QPID_LOG(info, "BdbMessageStore module initialized: store-dir=" << dir << "; Default files per journal: " << jfiles <<
- "; Default jrournal file size: " << jfileSizePgs << "(wpgs); Default write cache page size: " << wCachePageSizeKib << "(Kib)");
+ QPID_LOG(notice, "Store module initialized; dir=" << dir);
+ QPID_LOG(info, "> Default files per journal: " << jfiles);
+ QPID_LOG(info, "> Default jrournal file size: " << jfileSizePgs << " (wpgs)");
+ QPID_LOG(info, "> Default write cache page size: " << wCachePageSizeKib << " (Kib)");
+ QPID_LOG(info, "> Default number of write cache pages: " << wCacheNumPages);
+ QPID_LOG(info, "> TPL files per journal: " << tplNumJrnlFiles);
+ QPID_LOG(info, "> TPL jrournal file size: " << tplJfileSizePgs << " (wpgs)");
+ QPID_LOG(info, "> TPL write cache page size: " << tplWCachePageSizeKib << " (Kib)");
+ QPID_LOG(info, "> TPL number of write cache pages: " << tplWCacheNumPages);
return true;
}
-void BdbMessageStore::chkInitPreparedXidStore()
+void BdbMessageStore::chkTplStoreInit()
{
qpid::sys::Mutex::ScopedLock sl(jrnlCreateLock);
- if (!preparedXidStorePtr->is_ready()) {
- preparedXidStorePtr->initialize(tplNumJrnlFiles, tplJrnlFsizeSblks, tplWCachePgSizeSblks, tplWCacheNumPages);
+ if (!tplStorePtr->is_ready()) {
+ journal::jdir::create_dir(getTplBaseDir());
+ tplStorePtr->initialize(tplNumJrnlFiles, tplJrnlFsizeSblks, tplWCacheNumPages, tplWCachePgSizeSblks);
+ if (mgmtObject != 0) mgmtObject->set_tplIsInitialized(true);
}
}
@@ -286,7 +299,7 @@
for (std::list<Db*>::iterator i = dbs.begin(); i != dbs.end(); i++) {
(*i)->close(0);
}
- if (preparedXidStorePtr->is_ready()) preparedXidStorePtr->stop(true);
+ if (tplStorePtr->is_ready()) tplStorePtr->stop(true);
} catch (const DbException& e) {
QPID_LOG(error, "Error closing BDB databases: " << e.what());
} catch (const journal::jexception& e) {
@@ -314,7 +327,7 @@
txn->commit(0);
try {
journal::jdir::delete_dir(getJrnlBaseDir(),true);
- journal::jdir::delete_dir(getPxidBaseDir(),true);
+ journal::jdir::delete_dir(getTplBaseDir(),true);
}
catch (const journal::jexception& e) {
THROW_STORE_EXCEPTION(std::string("truncate() failed: ") + e.what() );
@@ -475,7 +488,7 @@
{
checkInit();
txn_list prepared;
- recoverXids(prepared);
+ recoverLockedMappings(prepared);
queue_index queues;//id->queue
exchange_index exchanges;//id->exchange
@@ -506,14 +519,18 @@
TPCTxnCtxt* tpcc = new TPCTxnCtxt(i->xid, &messageIdSequence);
std::auto_ptr<TPCTransactionContext> txn(tpcc);
- tpcc->prepare(preparedXidStorePtr.get());
+ tpcc->prepare(tplStorePtr.get());
+ if (mgmtObject != 0) {
+ mgmtObject->inc_tplTransactionDepth();
+ mgmtObject->inc_tplTxnPrepares();
+ }
// Restore data token state in TxnCtxt
- PreparedRecoverMapCitr citr = preparedXidStoreRecoverMap.find(i->xid);
- if (citr == preparedXidStoreRecoverMap.end()) THROW_STORE_EXCEPTION("XID not found in preparedXidStoreRecoverMap");
+ TplRecoverMapCitr citr = tplRecoverMap.find(i->xid);
+ if (citr == tplRecoverMap.end()) THROW_STORE_EXCEPTION("XID not found in tplRecoverMap");
tpcc->recoverDtok(citr->second.rid, i->xid);
- // If a record is found that is dequeued but not committed/aborted from preparedXidStore, then a complete() call
+ // If a record is found that is dequeued but not committed/aborted from tplStore, then a complete() call
// was interrupted part way through committing/aborting the impacted queues. Complete this process.
bool incomplTxnFlag = citr->second.deq_flag;
@@ -531,8 +548,16 @@
}
}
- if (incomplTxnFlag) tpcc->complete(citr->second.commit_flag);
+ if (incomplTxnFlag) {
+ tpcc->complete(citr->second.commit_flag);
+ } else {
+ if (mgmtObject != 0) {
+ mgmtObject->inc_tplTransactionDepth();
+ mgmtObject->inc_tplTxnPrepares();
+ }
+ }
}
+
registry.recoveryComplete();
}
@@ -582,8 +607,8 @@
maxQueueId = max(key.id, maxQueueId);
}
- // NOTE: highestRid is set by both recoverQueues() and collectPreparedXids() as
- // the messageIdSequence is used for both queue journals and the preparedXid journal.
+ // NOTE: highestRid is set by both recoverQueues() and recoverTplStore() as
+ // the messageIdSequence is used for both queue journals and the tpl journal.
messageIdSequence.reset(highestRid + 1);
queueIdSequence.reset(maxQueueId + 1);
@@ -670,7 +695,6 @@
generalIdSequence.reset(maxGeneralId + 1);
}
-
void BdbMessageStore::recoverMessages(TxnCtxt& /*txn*/,
qpid::broker::RecoveryManager& recovery,
qpid::broker::RecoverableQueue::shared_ptr& queue,
@@ -731,8 +755,8 @@
if (PreparedTransaction::isLocked(prepared, queue->getPersistenceId(), dtokp.rid()) ) {
std::string xid((char*)xidbuff, xidbuffSize);
- PreparedRecoverMapCitr citr = preparedXidStoreRecoverMap.find(xid);
- if (xid.size() > 0 && citr == preparedXidStoreRecoverMap.end()) THROW_STORE_EXCEPTION("XID not found in preparedXidStoreRecoverMap");
+ TplRecoverMapCitr citr = tplRecoverMap.find(xid);
+ if (xid.size() > 0 && citr == tplRecoverMap.end()) THROW_STORE_EXCEPTION("XID not found in tplRecoverMap");
if (xid.size() > 0 && citr->second.deq_flag) { // deq present in prepared list, this xid is part of incomplete txn commit/abort
if (citr->second.commit_flag)
queue->recover(msg);
@@ -794,8 +818,6 @@
return recovery.recoverMessage(header.buffer);
}
-
-
int BdbMessageStore::enqueueMessage(TxnCtxt& txn,
IdDbt& msgId,
RecoverableMessage::shared_ptr& msg,
@@ -827,26 +849,26 @@
return count;
}
-void BdbMessageStore::recoverPreparedXidJournal()
+void BdbMessageStore::recoverTplStore()
{
- if (journal::jdir::exists(storeDir + "/rhm/pxid/prepared_xid.jinf")) {
+ if (journal::jdir::exists(tplStorePtr->jrnl_dir() + tplStorePtr->base_filename() + ".jinf")) {
u_int64_t thisHighestRid;
- preparedXidStorePtr->recover(tplNumJrnlFiles, tplJrnlFsizeSblks, tplWCachePgSizeSblks, tplWCacheNumPages, 0, thisHighestRid, 0);
+ tplStorePtr->recover(tplNumJrnlFiles, tplJrnlFsizeSblks, tplWCachePgSizeSblks, tplWCacheNumPages, 0, thisHighestRid, 0);
if (thisHighestRid > highestRid)
highestRid = thisHighestRid;
- preparedXidStorePtr->recover_complete(); // start journal.
+ tplStorePtr->recover_complete(); // start journal.
}
}
-void BdbMessageStore::getPreparedXidMap(PreparedRecoverMap& prepXidMap)
+void BdbMessageStore::getTplRecoverMap(TplRecoverMap& tplMap)
{
- if (preparedXidStorePtr.get()) {
- if (!preparedXidStorePtr->is_ready())
- recoverPreparedXidJournal();
+ if (tplStorePtr.get()) {
+ if (!tplStorePtr->is_ready())
+ recoverTplStore();
// TODO: The journal will return a const txn_map and the txn_map will support
// const operations at some point. Using non-const txn_map this way is ugly...
- journal::txn_map& tmap = preparedXidStorePtr->get_txn_map();
+ journal::txn_map& tmap = tplStorePtr->get_txn_map();
std::vector<std::string> xidList;
tmap.xid_list(xidList);
for (std::vector<std::string>::const_iterator i = xidList.begin(); i<xidList.end(); i++) {
@@ -866,18 +888,18 @@
}
assert(enqCnt == 1);
assert(deqCnt <= 1);
- prepXidMap.insert(PreparedRecoverMapPair(*i, PreparedRecoverStruct(rid, deqCnt == 1, commitFlag)));
+ tplMap.insert(TplRecoverMapPair(*i, TplRecoverStruct(rid, deqCnt == 1, commitFlag)));
}
}
}
-void BdbMessageStore::recoverXids(txn_list& txns)
+void BdbMessageStore::recoverLockedMappings(txn_list& txns)
{
- if (!preparedXidStorePtr->is_ready())
- getPreparedXidMap(preparedXidStoreRecoverMap);
+ if (!tplStorePtr->is_ready())
+ getTplRecoverMap(tplRecoverMap);
// Abort unprepaired xids and populate the locked maps
- for (PreparedRecoverMapCitr i = preparedXidStoreRecoverMap.begin(); i != preparedXidStoreRecoverMap.end(); i++) {
+ for (TplRecoverMapCitr i = tplRecoverMap.begin(); i != tplRecoverMap.end(); i++) {
LockedMappings::shared_ptr enq_ptr;
enq_ptr.reset(new LockedMappings);
LockedMappings::shared_ptr deq_ptr;
@@ -886,25 +908,11 @@
}
}
-void BdbMessageStore::readLockedMappings(Db& db,
- txn_lock_map& mappings)
-{
- Cursor c;
- c.open(db, 0);
-
- Dbt key;
- IdPairDbt value;
- while (c.next(key, value)) {
- std::string xid(reinterpret_cast<char*>(key.get_data()), key.get_size());
- LockedMappings::add(mappings, xid, value.queueId(), value.messageId());
- }
-}
-
void BdbMessageStore::collectPreparedXids(std::set<string>& xids)
{
- if (!preparedXidStorePtr->is_ready())
- getPreparedXidMap(preparedXidStoreRecoverMap);
- for (PreparedRecoverMapCitr i = preparedXidStoreRecoverMap.begin(); i != preparedXidStoreRecoverMap.end(); i++) {
+ if (!tplStorePtr->is_ready())
+ getTplRecoverMap(tplRecoverMap);
+ for (TplRecoverMapCitr i = tplRecoverMap.begin(); i != tplRecoverMap.end(); i++) {
xids.insert(i->first);
}
}
@@ -1251,16 +1259,24 @@
bool commit)
{
try {
+ chkTplStoreInit(); // Late initialize (if needed)
+
// Nothing to do if not prepared
- chkInitPreparedXidStore();
if (txn.getDtok()->is_enqueued()) {
txn.incrDtokRef();
DataTokenImpl* dtokp = txn.getDtok();
dtokp->set_dequeue_rid(dtokp->rid());
dtokp->set_rid(messageIdSequence.next());
- preparedXidStorePtr->dequeue_txn_data_record(txn.getDtok(), txn.getXid(), commit);
+ tplStorePtr->dequeue_txn_data_record(txn.getDtok(), txn.getXid(), commit);
}
txn.complete(commit);
+ if (mgmtObject != 0) {
+ mgmtObject->dec_tplTransactionDepth();
+ if (commit)
+ mgmtObject->inc_tplTxnCommits();
+ else
+ mgmtObject->inc_tplTxnAborts();
+ }
} catch (const std::exception& e) {
QPID_LOG(error, "Error completing xid " << txn.getXid() << ": " << e.what());
throw;
@@ -1293,15 +1309,20 @@
void BdbMessageStore::localPrepare(TxnCtxt* ctxt)
{
try {
- chkInitPreparedXidStore();
+ chkTplStoreInit(); // Late initialize (if needed)
+
ctxt->incrDtokRef();
DataTokenImpl* dtokp = ctxt->getDtok();
dtokp->set_external_rid(true);
dtokp->set_rid(messageIdSequence.next());
- preparedXidStorePtr->enqueue_txn_data_record(0, 0, 0, dtokp, ctxt->getXid(), false);
- ctxt->prepare(preparedXidStorePtr.get());
+ tplStorePtr->enqueue_txn_data_record(0, 0, 0, dtokp, ctxt->getXid(), false);
+ ctxt->prepare(tplStorePtr.get());
// make sure all the data is written to disk before returning
ctxt->sync();
+ if (mgmtObject != 0) {
+ mgmtObject->inc_tplTransactionDepth();
+ mgmtObject->inc_tplTxnPrepares();
+ }
} catch (const std::exception& e) {
QPID_LOG(error, "Error preparing xid " << ctxt->getXid() << ": " << e.what());
throw;
@@ -1450,10 +1471,10 @@
return dir.str();
}
-string BdbMessageStore::getPxidBaseDir()
+string BdbMessageStore::getTplBaseDir()
{
std::stringstream dir;
- dir << storeDir << "/rhm/pxid/" ;
+ dir << storeDir << "/rhm/tpl/" ;
return dir.str();
}
Modified: store/trunk/cpp/lib/BdbMessageStore.h
===================================================================
--- store/trunk/cpp/lib/BdbMessageStore.h 2008-07-18 13:27:59 UTC (rev 2206)
+++ store/trunk/cpp/lib/BdbMessageStore.h 2008-07-18 13:30:50 UTC (rev 2207)
@@ -61,17 +61,17 @@
typedef LockedMappings::map txn_lock_map;
typedef boost::ptr_list<PreparedTransaction> txn_list;
- // Structs for preparedXidStore recover state
- struct PreparedRecoverStruct {
+ // Structs for Transaction Recover List (TPL) recover state
+ struct TplRecoverStruct {
u_int64_t rid;
bool deq_flag;
bool commit_flag;
- PreparedRecoverStruct(const u_int64_t _rid, const bool _deq_flag, const bool _commit_flag);
+ TplRecoverStruct(const u_int64_t _rid, const bool _deq_flag, const bool _commit_flag);
};
- typedef PreparedRecoverStruct PreparedRecover;
- typedef std::map<std::string, PreparedRecover> PreparedRecoverMap;
- typedef PreparedRecoverMap::const_iterator PreparedRecoverMapCitr;
- typedef std::pair<std::string, PreparedRecoverStruct> PreparedRecoverMapPair;
+ typedef TplRecoverStruct TplRecover;
+ typedef std::pair<std::string, TplRecover> TplRecoverMapPair;
+ typedef std::map<std::string, TplRecover> TplRecoverMap;
+ typedef TplRecoverMap::const_iterator TplRecoverMapCitr;
// Default store settings
static const u_int16_t defNumJrnlFiles = 8;
@@ -91,9 +91,9 @@
Db bindingDb;
Db generalDb;
- // Pointer to prepared XID journal instance
- boost::shared_ptr<JournalImpl> preparedXidStorePtr;
- PreparedRecoverMap preparedXidStoreRecoverMap;
+ // Pointer to Transaction Prepared List (TPL) journal instance
+ boost::shared_ptr<JournalImpl> tplStorePtr;
+ TplRecoverMap tplRecoverMap;
IdSequence queueIdSequence;
IdSequence exchangeIdSequence;
@@ -157,11 +157,9 @@
queue_index& index,
txn_list& locked,
message_index& prepared);
- void recoverPreparedXidJournal();
- void getPreparedXidMap(PreparedRecoverMap& prepXidMap);
- void recoverXids(txn_list& txns);
- void readLockedMappings(Db& db,
- txn_lock_map& mappings);
+ void recoverTplStore();
+ void getTplRecoverMap(TplRecoverMap& tplMap);
+ void recoverLockedMappings(txn_list& txns);
TxnCtxt* check(qpid::broker::TransactionContext* ctxt);
void store(const qpid::broker::PersistableQueue* queue,
TxnCtxt* txn,
@@ -218,12 +216,12 @@
std::string getJrnlDir(const char* queueName);
std::string getJrnlBaseDir();
std::string getBdbBaseDir();
- std::string getPxidBaseDir();
+ std::string getTplBaseDir();
inline void checkInit() {
// TODO: change the default dir to ~/.qpidd
if (!isInit) init("/tmp", defNumJrnlFiles, defJrnlFileSizePgs, defWCachePageSize); isInit = true;
}
- void chkInitPreparedXidStore();
+ void chkTplStoreInit();
// debug aid for printing XIDs that may contain non-printable chars
static std::string xid2str(const std::string xid) {
Modified: store/trunk/cpp/lib/JournalImpl.cpp
===================================================================
--- store/trunk/cpp/lib/JournalImpl.cpp 2008-07-18 13:27:59 UTC (rev 2206)
+++ store/trunk/cpp/lib/JournalImpl.cpp 2008-07-18 13:30:50 UTC (rev 2207)
@@ -74,7 +74,6 @@
journalTimerPtr->add(inactivityFireEventPtr);
ManagementAgent* agent = ManagementAgent::Singleton::getInstance();
-
if (agent != 0)
{
_mgmtObject = new qpid::management::Journal(agent, (qpid::management::Manageable*) this);
@@ -84,6 +83,13 @@
_mgmtObject->set_baseFileName(journalBaseFilename);
_mgmtObject->set_readPageSize(JRNL_RMGR_PAGE_SIZE * JRNL_SBLK_SIZE * JRNL_DBLK_SIZE);
_mgmtObject->set_readPages(JRNL_RMGR_PAGES);
+
+ // The following will be set on initialize(), but being properties, these must be set to 0 in the meantime
+ _mgmtObject->set_initialFileCount(0);
+ _mgmtObject->set_dataFileSize(0);
+ _mgmtObject->set_currentFileCount(0);
+ _mgmtObject->set_writePageSize(0);
+ _mgmtObject->set_writePages(0);
agent->addObject(_mgmtObject);
}
@@ -163,6 +169,15 @@
oss1 << " wcache_num_pages=" << wcache_num_pages;
log(LOG_DEBUG, oss1.str());
+ if (_mgmtObject != 0)
+ {
+ _mgmtObject->set_initialFileCount(_num_jfiles);
+ _mgmtObject->set_dataFileSize(_jfsize_sblks * JRNL_SBLK_SIZE * JRNL_DBLK_SIZE);
+ _mgmtObject->set_currentFileCount(_num_jfiles);
+ _mgmtObject->set_writePageSize(wcache_pgsize_sblks * JRNL_SBLK_SIZE * JRNL_DBLK_SIZE);
+ _mgmtObject->set_writePages(wcache_num_pages);
+ }
+
if (prep_tx_list_ptr) {
// Create list of prepared xids
std::vector<std::string> prep_xid_list;
@@ -206,12 +221,10 @@
if (_mgmtObject != 0)
{
- _mgmtObject->set_initialFileCount(_num_jfiles);
- _mgmtObject->set_dataFileSize(_jfsize_sblks * JRNL_SBLK_SIZE * JRNL_DBLK_SIZE);
- _mgmtObject->set_currentFileCount(_num_jfiles);
_mgmtObject->inc_recordDepth(_emap.size());
- _mgmtObject->set_writePageSize(wcache_pgsize_sblks * JRNL_SBLK_SIZE * JRNL_DBLK_SIZE);
- _mgmtObject->set_writePages(wcache_num_pages);
+ _mgmtObject->inc_enqueues(_emap.size());
+ _mgmtObject->inc_txnEnqueues(_tmap.enq_cnt());
+ _mgmtObject->inc_txnDequeues(_tmap.deq_cnt());
}
}
@@ -301,7 +314,7 @@
if (_mgmtObject != 0)
{
- _mgmtObject->inc_recordEnqueues();
+ _mgmtObject->inc_enqueues();
_mgmtObject->inc_recordDepth();
}
}
@@ -314,7 +327,7 @@
if (_mgmtObject != 0)
{
- _mgmtObject->inc_recordEnqueues();
+ _mgmtObject->inc_enqueues();
_mgmtObject->inc_recordDepth();
}
}
@@ -327,7 +340,8 @@
if (_mgmtObject != 0)
{
- _mgmtObject->inc_recordEnqueues();
+ _mgmtObject->inc_enqueues();
+ _mgmtObject->inc_txnEnqueues();
_mgmtObject->inc_recordDepth();
}
}
@@ -340,7 +354,8 @@
if (_mgmtObject != 0)
{
- _mgmtObject->inc_recordEnqueues();
+ _mgmtObject->inc_enqueues();
+ _mgmtObject->inc_txnEnqueues();
_mgmtObject->inc_recordDepth();
}
}
@@ -352,7 +367,8 @@
if (_mgmtObject != 0)
{
- _mgmtObject->inc_recordDequeues();
+ _mgmtObject->inc_dequeues();
+ _mgmtObject->inc_txnDequeues();
_mgmtObject->dec_recordDepth();
}
}
@@ -364,7 +380,8 @@
if (_mgmtObject != 0)
{
- _mgmtObject->inc_recordDequeues();
+ _mgmtObject->inc_dequeues();
+ _mgmtObject->inc_txnDequeues();
_mgmtObject->dec_recordDepth();
}
}
@@ -373,12 +390,18 @@
JournalImpl::txn_abort(data_tok* const dtokp, const std::string& xid)
{
handleIoResult(jcntl::txn_abort(dtokp, xid));
+
+ if (_mgmtObject != 0)
+ _mgmtObject->inc_txnAborts();
}
void
JournalImpl::txn_commit(data_tok* const dtokp, const std::string& xid)
{
handleIoResult(jcntl::txn_commit(dtokp, xid));
+
+ if (_mgmtObject != 0)
+ _mgmtObject->inc_txnCommits();
}
void
Modified: store/trunk/cpp/lib/gen/qpid/management/Journal.cpp
===================================================================
--- store/trunk/cpp/lib/gen/qpid/management/Journal.cpp 2008-07-18 13:27:59 UTC (rev 2206)
+++ store/trunk/cpp/lib/gen/qpid/management/Journal.cpp 2008-07-18 13:30:50 UTC (rev 2207)
@@ -26,7 +26,7 @@
#include "qpid/management/Manageable.h"
#include "qpid/agent/ManagementAgent.h"
#include "Journal.h"
-#include "qpid/management/ArgsJournalExpand.h"
+#include "ArgsJournalExpand.h"
using namespace qpid::management;
@@ -37,15 +37,12 @@
string Journal::packageName = string ("mrgstore");
string Journal::className = string ("journal");
uint8_t Journal::md5Sum[16] =
- {0x0,0xdf,0x9a,0xf7,0x4,0x98,0x29,0x54,0xde,0x42,0xc5,0xf5,0xf5,0x13,0xab,0xa5};
+ {0x1e,0x63,0xa4,0x3d,0xa3,0x1b,0xc0,0x1,0x1,0x70,0x5a,0x2a,0xb4,0xa,0x1b,0x4e};
Journal::Journal (ManagementAgent* _agent, Manageable* _core) :
ManagementObject(_agent, _core)
{
- initialFileCount = 0;
- dataFileSize = 0;
- currentFileCount = 0;
recordDepth = 0;
recordDepthHigh = 0;
recordDepthLow = 0;
@@ -107,24 +104,24 @@
buf.putShortString (packageName); // Package Name
buf.putShortString (className); // Class Name
buf.putBin128 (md5Sum); // Schema Hash
- buf.putShort (8); // Config Element Count
- buf.putShort (27); // Inst Element Count
+ buf.putShort (11); // Config Element Count
+ buf.putShort (28); // Inst Element Count
buf.putShort (1); // Method Count
buf.putShort (0); // Event Count
// Properties
ft = FieldTable ();
- ft.setString (NAME, "name");
- ft.setInt (TYPE, TYPE_SSTR);
+ ft.setString (NAME, "queueRef");
+ ft.setInt (TYPE, TYPE_REF);
ft.setInt (ACCESS, ACCESS_RO);
- ft.setInt (INDEX, 1);
+ ft.setInt (INDEX, 0);
buf.put (ft);
ft = FieldTable ();
- ft.setString (NAME, "queueRef");
- ft.setInt (TYPE, TYPE_REF);
+ ft.setString (NAME, "name");
+ ft.setInt (TYPE, TYPE_SSTR);
ft.setInt (ACCESS, ACCESS_RO);
- ft.setInt (INDEX, 0);
+ ft.setInt (INDEX, 1);
buf.put (ft);
ft = FieldTable ();
@@ -179,11 +176,11 @@
ft.setString (DESC, "Number of pages in read-page-cache");
buf.put (ft);
-
- // Statistics
ft = FieldTable ();
ft.setString (NAME, "initialFileCount");
ft.setInt (TYPE, TYPE_U16);
+ ft.setInt (ACCESS, ACCESS_RO);
+ ft.setInt (INDEX, 0);
ft.setString (UNIT, "file");
ft.setString (DESC, "Number of files initially allocated to this journal");
buf.put (ft);
@@ -191,6 +188,8 @@
ft = FieldTable ();
ft.setString (NAME, "dataFileSize");
ft.setInt (TYPE, TYPE_U32);
+ ft.setInt (ACCESS, ACCESS_RO);
+ ft.setInt (INDEX, 0);
ft.setString (UNIT, "byte");
ft.setString (DESC, "Size of each journal data file");
buf.put (ft);
@@ -198,64 +197,96 @@
ft = FieldTable ();
ft.setString (NAME, "currentFileCount");
ft.setInt (TYPE, TYPE_U32);
+ ft.setInt (ACCESS, ACCESS_RO);
+ ft.setInt (INDEX, 0);
ft.setString (UNIT, "file");
ft.setString (DESC, "Number of files currently allocated to this journal");
buf.put (ft);
+
+ // Statistics
ft = FieldTable ();
ft.setString (NAME, "recordDepth");
ft.setInt (TYPE, TYPE_U32);
ft.setString (UNIT, "record");
- ft.setString (DESC, "Number of enqueued records (durable messages)");
+ ft.setString (DESC, "Number of currently enqueued records (durable messages)");
buf.put (ft);
ft = FieldTable ();
ft.setString (NAME, "recordDepthHigh");
ft.setInt (TYPE, TYPE_U32);
ft.setString (UNIT, "record");
- ft.setString (DESC, "Number of enqueued records (durable messages) (High)");
+ ft.setString (DESC, "Number of currently enqueued records (durable messages) (High)");
buf.put (ft);
ft = FieldTable ();
ft.setString (NAME, "recordDepthLow");
ft.setInt (TYPE, TYPE_U32);
ft.setString (UNIT, "record");
- ft.setString (DESC, "Number of enqueued records (durable messages) (Low)");
+ ft.setString (DESC, "Number of currently enqueued records (durable messages) (Low)");
buf.put (ft);
ft = FieldTable ();
- ft.setString (NAME, "recordEnqueues");
+ ft.setString (NAME, "enqueues");
ft.setInt (TYPE, TYPE_U64);
ft.setString (UNIT, "record");
ft.setString (DESC, "Total enqueued records on journal");
buf.put (ft);
ft = FieldTable ();
- ft.setString (NAME, "recordDequeues");
+ ft.setString (NAME, "dequeues");
ft.setInt (TYPE, TYPE_U64);
ft.setString (UNIT, "record");
ft.setString (DESC, "Total dequeued records on journal");
buf.put (ft);
ft = FieldTable ();
+ ft.setString (NAME, "txnEnqueues");
+ ft.setInt (TYPE, TYPE_U64);
+ ft.setString (UNIT, "record");
+ ft.setString (DESC, "Total transactional enqueued records on journal");
+ buf.put (ft);
+
+ ft = FieldTable ();
+ ft.setString (NAME, "txnDequeues");
+ ft.setInt (TYPE, TYPE_U64);
+ ft.setString (UNIT, "record");
+ ft.setString (DESC, "Total transactional dequeued records on journal");
+ buf.put (ft);
+
+ ft = FieldTable ();
+ ft.setString (NAME, "txnCommits");
+ ft.setInt (TYPE, TYPE_U64);
+ ft.setString (UNIT, "record");
+ ft.setString (DESC, "Total transactional commit records on journal");
+ buf.put (ft);
+
+ ft = FieldTable ();
+ ft.setString (NAME, "txnAborts");
+ ft.setInt (TYPE, TYPE_U64);
+ ft.setString (UNIT, "record");
+ ft.setString (DESC, "Total transactional abort records on journal");
+ buf.put (ft);
+
+ ft = FieldTable ();
ft.setString (NAME, "outstandingAIOs");
ft.setInt (TYPE, TYPE_U32);
ft.setString (UNIT, "aio_op");
- ft.setString (DESC, "Number of outstanding AIO requests in Async IO system");
+ ft.setString (DESC, "Number of currently outstanding AIO requests in Async IO system");
buf.put (ft);
ft = FieldTable ();
ft.setString (NAME, "outstandingAIOsHigh");
ft.setInt (TYPE, TYPE_U32);
ft.setString (UNIT, "aio_op");
- ft.setString (DESC, "Number of outstanding AIO requests in Async IO system (High)");
+ ft.setString (DESC, "Number of currently outstanding AIO requests in Async IO system (High)");
buf.put (ft);
ft = FieldTable ();
ft.setString (NAME, "outstandingAIOsLow");
ft.setInt (TYPE, TYPE_U32);
ft.setString (UNIT, "aio_op");
- ft.setString (DESC, "Number of outstanding AIO requests in Async IO system (Low)");
+ ft.setString (DESC, "Number of currently outstanding AIO requests in Async IO system (Low)");
buf.put (ft);
ft = FieldTable ();
@@ -393,8 +424,12 @@
void Journal::aggregatePerThreadStats(struct PerThreadStats* totals)
{
- totals->recordEnqueues = 0;
- totals->recordDequeues = 0;
+ totals->enqueues = 0;
+ totals->dequeues = 0;
+ totals->txnEnqueues = 0;
+ totals->txnDequeues = 0;
+ totals->txnCommits = 0;
+ totals->txnAborts = 0;
totals->writeWaitFailures = 0;
totals->writeBusyFailures = 0;
totals->readRecordCount = 0;
@@ -403,8 +438,12 @@
for (int idx = 0; idx < maxThreads; idx++) {
struct PerThreadStats* threadStats = perThreadStatsArray[idx];
if (threadStats != 0) {
- totals->recordEnqueues += threadStats->recordEnqueues;
- totals->recordDequeues += threadStats->recordDequeues;
+ totals->enqueues += threadStats->enqueues;
+ totals->dequeues += threadStats->dequeues;
+ totals->txnEnqueues += threadStats->txnEnqueues;
+ totals->txnDequeues += threadStats->txnDequeues;
+ totals->txnCommits += threadStats->txnCommits;
+ totals->txnAborts += threadStats->txnAborts;
totals->writeWaitFailures += threadStats->writeWaitFailures;
totals->writeBusyFailures += threadStats->writeBusyFailures;
totals->readRecordCount += threadStats->readRecordCount;
@@ -421,14 +460,17 @@
configChanged = false;
writeTimestamps (buf);
+ buf.putLongLong (queueRef);
buf.putShortString (name);
- buf.putLongLong (queueRef);
buf.putShortString (directory);
buf.putShortString (baseFileName);
buf.putLong (writePageSize);
buf.putLong (writePages);
buf.putLong (readPageSize);
buf.putLong (readPages);
+ buf.putShort (initialFileCount);
+ buf.putLong (dataFileSize);
+ buf.putLong (currentFileCount);
}
@@ -444,14 +486,15 @@
if (!skipHeaders)
writeTimestamps (buf);
- buf.putShort (initialFileCount);
- buf.putLong (dataFileSize);
- buf.putLong (currentFileCount);
buf.putLong (recordDepth);
buf.putLong (recordDepthHigh);
buf.putLong (recordDepthLow);
- buf.putLongLong (totals.recordEnqueues);
- buf.putLongLong (totals.recordDequeues);
+ buf.putLongLong (totals.enqueues);
+ buf.putLongLong (totals.dequeues);
+ buf.putLongLong (totals.txnEnqueues);
+ buf.putLongLong (totals.txnDequeues);
+ buf.putLongLong (totals.txnCommits);
+ buf.putLongLong (totals.txnAborts);
buf.putLong (outstandingAIOs);
buf.putLong (outstandingAIOsHigh);
buf.putLong (outstandingAIOsLow);
Modified: store/trunk/cpp/lib/gen/qpid/management/Journal.h
===================================================================
--- store/trunk/cpp/lib/gen/qpid/management/Journal.h 2008-07-18 13:27:59 UTC (rev 2206)
+++ store/trunk/cpp/lib/gen/qpid/management/Journal.h 2008-07-18 13:30:50 UTC (rev 2207)
@@ -40,19 +40,19 @@
static uint8_t md5Sum[16];
// Properties
+ uint64_t queueRef;
std::string name;
- uint64_t queueRef;
std::string directory;
std::string baseFileName;
uint32_t writePageSize;
uint32_t writePages;
uint32_t readPageSize;
uint32_t readPages;
+ uint16_t initialFileCount;
+ uint32_t dataFileSize;
+ uint32_t currentFileCount;
// Statistics
- uint16_t initialFileCount;
- uint32_t dataFileSize;
- uint32_t currentFileCount;
uint32_t recordDepth;
uint32_t recordDepthHigh;
uint32_t recordDepthLow;
@@ -75,8 +75,12 @@
// Per-Thread Statistics
struct PerThreadStats {
- uint64_t recordEnqueues;
- uint64_t recordDequeues;
+ uint64_t enqueues;
+ uint64_t dequeues;
+ uint64_t txnEnqueues;
+ uint64_t txnDequeues;
+ uint64_t txnCommits;
+ uint64_t txnAborts;
uint64_t writeWaitFailures;
uint64_t writeBusyFailures;
uint64_t readRecordCount;
@@ -92,8 +96,12 @@
if (threadStats == 0) {
threadStats = new(PerThreadStats);
perThreadStatsArray[index] = threadStats;
- threadStats->recordEnqueues = 0;
- threadStats->recordDequeues = 0;
+ threadStats->enqueues = 0;
+ threadStats->dequeues = 0;
+ threadStats->txnEnqueues = 0;
+ threadStats->txnDequeues = 0;
+ threadStats->txnCommits = 0;
+ threadStats->txnAborts = 0;
threadStats->writeWaitFailures = 0;
threadStats->writeBusyFailures = 0;
threadStats->readRecordCount = 0;
@@ -134,14 +142,14 @@
static const uint32_t METHOD_EXPAND = 1;
// Accessor Methods
- inline void set_name (std::string val){
+ inline void set_queueRef (uint64_t val){
sys::Mutex::ScopedLock mutex(accessLock);
- name = val;
+ queueRef = val;
configChanged = true;
}
- inline void set_queueRef (uint64_t val){
+ inline void set_name (std::string val){
sys::Mutex::ScopedLock mutex(accessLock);
- queueRef = val;
+ name = val;
configChanged = true;
}
inline void set_directory (std::string val){
@@ -177,17 +185,17 @@
inline void set_initialFileCount (uint16_t val){
sys::Mutex::ScopedLock mutex(accessLock);
initialFileCount = val;
- instChanged = true;
+ configChanged = true;
}
inline void set_dataFileSize (uint32_t val){
sys::Mutex::ScopedLock mutex(accessLock);
dataFileSize = val;
- instChanged = true;
+ configChanged = true;
}
inline void set_currentFileCount (uint32_t val){
sys::Mutex::ScopedLock mutex(accessLock);
currentFileCount = val;
- instChanged = true;
+ configChanged = true;
}
inline void inc_recordDepth (uint32_t by = 1){
sys::Mutex::ScopedLock mutex(accessLock);
@@ -203,22 +211,54 @@
recordDepthLow = recordDepth;
instChanged = true;
}
- inline void inc_recordEnqueues (uint64_t by = 1){
- getThreadStats()->recordEnqueues += by;
+ inline void inc_enqueues (uint64_t by = 1){
+ getThreadStats()->enqueues += by;
instChanged = true;
}
- inline void dec_recordEnqueues (uint64_t by = 1){
- getThreadStats()->recordEnqueues -= by;
+ inline void dec_enqueues (uint64_t by = 1){
+ getThreadStats()->enqueues -= by;
instChanged = true;
}
- inline void inc_recordDequeues (uint64_t by = 1){
- getThreadStats()->recordDequeues += by;
+ inline void inc_dequeues (uint64_t by = 1){
+ getThreadStats()->dequeues += by;
instChanged = true;
}
- inline void dec_recordDequeues (uint64_t by = 1){
- getThreadStats()->recordDequeues -= by;
+ inline void dec_dequeues (uint64_t by = 1){
+ getThreadStats()->dequeues -= by;
instChanged = true;
}
+ inline void inc_txnEnqueues (uint64_t by = 1){
+ getThreadStats()->txnEnqueues += by;
+ instChanged = true;
+ }
+ inline void dec_txnEnqueues (uint64_t by = 1){
+ getThreadStats()->txnEnqueues -= by;
+ instChanged = true;
+ }
+ inline void inc_txnDequeues (uint64_t by = 1){
+ getThreadStats()->txnDequeues += by;
+ instChanged = true;
+ }
+ inline void dec_txnDequeues (uint64_t by = 1){
+ getThreadStats()->txnDequeues -= by;
+ instChanged = true;
+ }
+ inline void inc_txnCommits (uint64_t by = 1){
+ getThreadStats()->txnCommits += by;
+ instChanged = true;
+ }
+ inline void dec_txnCommits (uint64_t by = 1){
+ getThreadStats()->txnCommits -= by;
+ instChanged = true;
+ }
+ inline void inc_txnAborts (uint64_t by = 1){
+ getThreadStats()->txnAborts += by;
+ instChanged = true;
+ }
+ inline void dec_txnAborts (uint64_t by = 1){
+ getThreadStats()->txnAborts -= by;
+ instChanged = true;
+ }
inline void inc_outstandingAIOs (uint32_t by = 1){
sys::Mutex::ScopedLock mutex(accessLock);
outstandingAIOs += by;
Modified: store/trunk/cpp/lib/gen/qpid/management/PackageMrgstore.cpp
===================================================================
--- store/trunk/cpp/lib/gen/qpid/management/PackageMrgstore.cpp 2008-07-18 13:27:59 UTC (rev 2206)
+++ store/trunk/cpp/lib/gen/qpid/management/PackageMrgstore.cpp 2008-07-18 13:30:50 UTC (rev 2207)
@@ -21,9 +21,9 @@
// This source file was created by a code generator.
// Please do not edit.
-#include "qpid/management/PackageMrgstore.h"
-#include "qpid/management/Store.h"
-#include "qpid/management/Journal.h"
+#include "PackageMrgstore.h"
+#include "Store.h"
+#include "Journal.h"
using namespace qpid::management;
Modified: store/trunk/cpp/lib/gen/qpid/management/Store.cpp
===================================================================
--- store/trunk/cpp/lib/gen/qpid/management/Store.cpp 2008-07-18 13:27:59 UTC (rev 2206)
+++ store/trunk/cpp/lib/gen/qpid/management/Store.cpp 2008-07-18 13:30:50 UTC (rev 2207)
@@ -36,19 +36,35 @@
string Store::packageName = string ("mrgstore");
string Store::className = string ("store");
uint8_t Store::md5Sum[16] =
- {0x63,0xc5,0x1a,0x81,0x18,0x8a,0x8d,0x9b,0x3e,0x96,0xf7,0x6d,0x3b,0xd0,0x51,0x14};
+ {0x18,0xd,0xd4,0x15,0xd3,0x9a,0xf,0xbe,0x3a,0x40,0xe1,0x1b,0x9e,0x5b,0x7e,0x86};
Store::Store (ManagementAgent* _agent, Manageable* _core, Manageable* _parent) :
ManagementObject(_agent, _core)
{
brokerRef = _parent->GetManagementObject ()->getObjectId ();
+ tplTransactionDepth = 0;
+ tplTransactionDepthHigh = 0;
+ tplTransactionDepthLow = 0;
+ tplOutstandingAIOs = 0;
+ tplOutstandingAIOsHigh = 0;
+ tplOutstandingAIOsLow = 0;
+ maxThreads = agent->getMaxThreads();
+ perThreadStatsArray = new struct PerThreadStats*[maxThreads];
+ for (int idx = 0; idx < maxThreads; idx++)
+ perThreadStatsArray[idx] = 0;
+
}
Store::~Store ()
{
+ for (int idx = 0; idx < maxThreads; idx++)
+ if (perThreadStatsArray[idx] != 0)
+ delete perThreadStatsArray[idx];
+ delete[] perThreadStatsArray;
+
}
namespace {
@@ -75,8 +91,8 @@
buf.putShortString (packageName); // Package Name
buf.putShortString (className); // Class Name
buf.putBin128 (md5Sum); // Schema Hash
- buf.putShort (4); // Config Element Count
- buf.putShort (0); // Inst Element Count
+ buf.putShort (11); // Config Element Count
+ buf.putShort (9); // Inst Element Count
buf.putShort (0); // Method Count
buf.putShort (0); // Event Count
@@ -114,9 +130,133 @@
ft.setString (DESC, "Default size of each journal data file");
buf.put (ft);
+ ft = FieldTable ();
+ ft.setString (NAME, "tplIsInitialized");
+ ft.setInt (TYPE, TYPE_BOOL);
+ ft.setInt (ACCESS, ACCESS_RO);
+ ft.setInt (INDEX, 0);
+ ft.setString (DESC, "Transaction prepared list has been initialized by a transactional prepare");
+ buf.put (ft);
+ ft = FieldTable ();
+ ft.setString (NAME, "tplDirectory");
+ ft.setInt (TYPE, TYPE_SSTR);
+ ft.setInt (ACCESS, ACCESS_RO);
+ ft.setInt (INDEX, 0);
+ ft.setString (DESC, "Transaction prepared list directory");
+ buf.put (ft);
+
+ ft = FieldTable ();
+ ft.setString (NAME, "tplWritePageSize");
+ ft.setInt (TYPE, TYPE_U32);
+ ft.setInt (ACCESS, ACCESS_RO);
+ ft.setInt (INDEX, 0);
+ ft.setString (UNIT, "byte");
+ ft.setString (DESC, "Page size in transaction prepared list write-page-cache");
+ buf.put (ft);
+
+ ft = FieldTable ();
+ ft.setString (NAME, "tplWritePages");
+ ft.setInt (TYPE, TYPE_U32);
+ ft.setInt (ACCESS, ACCESS_RO);
+ ft.setInt (INDEX, 0);
+ ft.setString (UNIT, "wpage");
+ ft.setString (DESC, "Number of pages in transaction prepared list write-page-cache");
+ buf.put (ft);
+
+ ft = FieldTable ();
+ ft.setString (NAME, "tplInitialFileCount");
+ ft.setInt (TYPE, TYPE_U16);
+ ft.setInt (ACCESS, ACCESS_RO);
+ ft.setInt (INDEX, 0);
+ ft.setString (UNIT, "file");
+ ft.setString (DESC, "Number of files initially allocated to transaction prepared list journal");
+ buf.put (ft);
+
+ ft = FieldTable ();
+ ft.setString (NAME, "tplDataFileSize");
+ ft.setInt (TYPE, TYPE_U32);
+ ft.setInt (ACCESS, ACCESS_RO);
+ ft.setInt (INDEX, 0);
+ ft.setString (UNIT, "byte");
+ ft.setString (DESC, "Size of each journal data file in transaction prepared list journal");
+ buf.put (ft);
+
+ ft = FieldTable ();
+ ft.setString (NAME, "tplCurrentFileCount");
+ ft.setInt (TYPE, TYPE_U32);
+ ft.setInt (ACCESS, ACCESS_RO);
+ ft.setInt (INDEX, 0);
+ ft.setString (UNIT, "file");
+ ft.setString (DESC, "Number of files currently allocated to transaction prepared list journal");
+ buf.put (ft);
+
+
// Statistics
+ ft = FieldTable ();
+ ft.setString (NAME, "tplTransactionDepth");
+ ft.setInt (TYPE, TYPE_U32);
+ ft.setString (UNIT, "txn");
+ ft.setString (DESC, "Number of currently enqueued prepared transactions");
+ buf.put (ft);
+ ft = FieldTable ();
+ ft.setString (NAME, "tplTransactionDepthHigh");
+ ft.setInt (TYPE, TYPE_U32);
+ ft.setString (UNIT, "txn");
+ ft.setString (DESC, "Number of currently enqueued prepared transactions (High)");
+ buf.put (ft);
+
+ ft = FieldTable ();
+ ft.setString (NAME, "tplTransactionDepthLow");
+ ft.setInt (TYPE, TYPE_U32);
+ ft.setString (UNIT, "txn");
+ ft.setString (DESC, "Number of currently enqueued prepared transactions (Low)");
+ buf.put (ft);
+
+ ft = FieldTable ();
+ ft.setString (NAME, "tplTxnPrepares");
+ ft.setInt (TYPE, TYPE_U64);
+ ft.setString (UNIT, "record");
+ ft.setString (DESC, "Total transaction prepares on transaction prepared list");
+ buf.put (ft);
+
+ ft = FieldTable ();
+ ft.setString (NAME, "tplTxnCommits");
+ ft.setInt (TYPE, TYPE_U64);
+ ft.setString (UNIT, "record");
+ ft.setString (DESC, "Total transaction commits on transaction prepared list");
+ buf.put (ft);
+
+ ft = FieldTable ();
+ ft.setString (NAME, "tplTxnAborts");
+ ft.setInt (TYPE, TYPE_U64);
+ ft.setString (UNIT, "record");
+ ft.setString (DESC, "Total transaction aborts on transaction prepared list");
+ buf.put (ft);
+
+ ft = FieldTable ();
+ ft.setString (NAME, "tplOutstandingAIOs");
+ ft.setInt (TYPE, TYPE_U32);
+ ft.setString (UNIT, "aio_op");
+ ft.setString (DESC, "Number of currently outstanding AIO requests in Async IO system");
+ buf.put (ft);
+
+ ft = FieldTable ();
+ ft.setString (NAME, "tplOutstandingAIOsHigh");
+ ft.setInt (TYPE, TYPE_U32);
+ ft.setString (UNIT, "aio_op");
+ ft.setString (DESC, "Number of currently outstanding AIO requests in Async IO system (High)");
+ buf.put (ft);
+
+ ft = FieldTable ();
+ ft.setString (NAME, "tplOutstandingAIOsLow");
+ ft.setInt (TYPE, TYPE_U32);
+ ft.setString (UNIT, "aio_op");
+ ft.setString (DESC, "Number of currently outstanding AIO requests in Async IO system (Low)");
+ buf.put (ft);
+
+
// Methods
// Events
@@ -124,7 +264,24 @@
}
+void Store::aggregatePerThreadStats(struct PerThreadStats* totals)
+{
+ totals->tplTxnPrepares = 0;
+ totals->tplTxnCommits = 0;
+ totals->tplTxnAborts = 0;
+ for (int idx = 0; idx < maxThreads; idx++) {
+ struct PerThreadStats* threadStats = perThreadStatsArray[idx];
+ if (threadStats != 0) {
+ totals->tplTxnPrepares += threadStats->tplTxnPrepares;
+ totals->tplTxnCommits += threadStats->tplTxnCommits;
+ totals->tplTxnAborts += threadStats->tplTxnAborts;
+
+ }
+ }
+}
+
+
void Store::writeProperties (Buffer& buf)
{
sys::Mutex::ScopedLock mutex(accessLock);
@@ -135,6 +292,13 @@
buf.putShortString (location);
buf.putShort (defaultInitialFileCount);
buf.putLong (defaultDataFileSize);
+ buf.putOctet (tplIsInitialized?1:0);
+ buf.putShortString (tplDirectory);
+ buf.putLong (tplWritePageSize);
+ buf.putLong (tplWritePages);
+ buf.putShort (tplInitialFileCount);
+ buf.putLong (tplDataFileSize);
+ buf.putLong (tplCurrentFileCount);
}
@@ -144,12 +308,28 @@
instChanged = false;
+ struct PerThreadStats totals;
+ aggregatePerThreadStats(&totals);
+
if (!skipHeaders)
writeTimestamps (buf);
+ buf.putLong (tplTransactionDepth);
+ buf.putLong (tplTransactionDepthHigh);
+ buf.putLong (tplTransactionDepthLow);
+ buf.putLongLong (totals.tplTxnPrepares);
+ buf.putLongLong (totals.tplTxnCommits);
+ buf.putLongLong (totals.tplTxnAborts);
+ buf.putLong (tplOutstandingAIOs);
+ buf.putLong (tplOutstandingAIOsHigh);
+ buf.putLong (tplOutstandingAIOsLow);
// Maintenance of hi-lo statistics
+ tplTransactionDepthHigh = tplTransactionDepth;
+ tplTransactionDepthLow = tplTransactionDepth;
+ tplOutstandingAIOsHigh = tplOutstandingAIOs;
+ tplOutstandingAIOsLow = tplOutstandingAIOs;
}
Modified: store/trunk/cpp/lib/gen/qpid/management/Store.h
===================================================================
--- store/trunk/cpp/lib/gen/qpid/management/Store.h 2008-07-18 13:27:59 UTC (rev 2206)
+++ store/trunk/cpp/lib/gen/qpid/management/Store.h 2008-07-18 13:30:50 UTC (rev 2207)
@@ -44,10 +44,49 @@
std::string location;
uint16_t defaultInitialFileCount;
uint32_t defaultDataFileSize;
+ uint8_t tplIsInitialized;
+ std::string tplDirectory;
+ uint32_t tplWritePageSize;
+ uint32_t tplWritePages;
+ uint16_t tplInitialFileCount;
+ uint32_t tplDataFileSize;
+ uint32_t tplCurrentFileCount;
// Statistics
+ uint32_t tplTransactionDepth;
+ uint32_t tplTransactionDepthHigh;
+ uint32_t tplTransactionDepthLow;
+ uint32_t tplOutstandingAIOs;
+ uint32_t tplOutstandingAIOsHigh;
+ uint32_t tplOutstandingAIOsLow;
+ // Per-Thread Statistics
+ struct PerThreadStats {
+ uint64_t tplTxnPrepares;
+ uint64_t tplTxnCommits;
+ uint64_t tplTxnAborts;
+
+ };
+
+ struct PerThreadStats** perThreadStatsArray;
+
+ inline struct PerThreadStats* getThreadStats() {
+ int index = getThreadIndex();
+ struct PerThreadStats* threadStats = perThreadStatsArray[index];
+ if (threadStats == 0) {
+ threadStats = new(PerThreadStats);
+ perThreadStatsArray[index] = threadStats;
+ threadStats->tplTxnPrepares = 0;
+ threadStats->tplTxnCommits = 0;
+ threadStats->tplTxnAborts = 0;
+
+ }
+ return threadStats;
+ }
+
+ void aggregatePerThreadStats(struct PerThreadStats*);
+
// Private Methods
static void writeSchema (qpid::framing::Buffer& buf);
void writeProperties (qpid::framing::Buffer& buf);
@@ -58,9 +97,6 @@
qpid::framing::Buffer& outBuf);
writeSchemaCall_t getWriteSchemaCall(void) { return writeSchema; }
- // Stub for getInstChanged. There are no statistics in this class.
- bool getInstChanged (void) { return false; }
-
public:
friend class PackageMrgstore;
@@ -98,6 +134,93 @@
defaultDataFileSize = val;
configChanged = true;
}
+ inline void set_tplIsInitialized (uint8_t val){
+ sys::Mutex::ScopedLock mutex(accessLock);
+ tplIsInitialized = val;
+ configChanged = true;
+ }
+ inline void set_tplDirectory (std::string val){
+ sys::Mutex::ScopedLock mutex(accessLock);
+ tplDirectory = val;
+ configChanged = true;
+ }
+ inline void set_tplWritePageSize (uint32_t val){
+ sys::Mutex::ScopedLock mutex(accessLock);
+ tplWritePageSize = val;
+ configChanged = true;
+ }
+ inline void set_tplWritePages (uint32_t val){
+ sys::Mutex::ScopedLock mutex(accessLock);
+ tplWritePages = val;
+ configChanged = true;
+ }
+ inline void set_tplInitialFileCount (uint16_t val){
+ sys::Mutex::ScopedLock mutex(accessLock);
+ tplInitialFileCount = val;
+ configChanged = true;
+ }
+ inline void set_tplDataFileSize (uint32_t val){
+ sys::Mutex::ScopedLock mutex(accessLock);
+ tplDataFileSize = val;
+ configChanged = true;
+ }
+ inline void set_tplCurrentFileCount (uint32_t val){
+ sys::Mutex::ScopedLock mutex(accessLock);
+ tplCurrentFileCount = val;
+ configChanged = true;
+ }
+ inline void inc_tplTransactionDepth (uint32_t by = 1){
+ sys::Mutex::ScopedLock mutex(accessLock);
+ tplTransactionDepth += by;
+ if (tplTransactionDepthHigh < tplTransactionDepth)
+ tplTransactionDepthHigh = tplTransactionDepth;
+ instChanged = true;
+ }
+ inline void dec_tplTransactionDepth (uint32_t by = 1){
+ sys::Mutex::ScopedLock mutex(accessLock);
+ tplTransactionDepth -= by;
+ if (tplTransactionDepthLow > tplTransactionDepth)
+ tplTransactionDepthLow = tplTransactionDepth;
+ instChanged = true;
+ }
+ inline void inc_tplTxnPrepares (uint64_t by = 1){
+ getThreadStats()->tplTxnPrepares += by;
+ instChanged = true;
+ }
+ inline void dec_tplTxnPrepares (uint64_t by = 1){
+ getThreadStats()->tplTxnPrepares -= by;
+ instChanged = true;
+ }
+ inline void inc_tplTxnCommits (uint64_t by = 1){
+ getThreadStats()->tplTxnCommits += by;
+ instChanged = true;
+ }
+ inline void dec_tplTxnCommits (uint64_t by = 1){
+ getThreadStats()->tplTxnCommits -= by;
+ instChanged = true;
+ }
+ inline void inc_tplTxnAborts (uint64_t by = 1){
+ getThreadStats()->tplTxnAborts += by;
+ instChanged = true;
+ }
+ inline void dec_tplTxnAborts (uint64_t by = 1){
+ getThreadStats()->tplTxnAborts -= by;
+ instChanged = true;
+ }
+ inline void inc_tplOutstandingAIOs (uint32_t by = 1){
+ sys::Mutex::ScopedLock mutex(accessLock);
+ tplOutstandingAIOs += by;
+ if (tplOutstandingAIOsHigh < tplOutstandingAIOs)
+ tplOutstandingAIOsHigh = tplOutstandingAIOs;
+ instChanged = true;
+ }
+ inline void dec_tplOutstandingAIOs (uint32_t by = 1){
+ sys::Mutex::ScopedLock mutex(accessLock);
+ tplOutstandingAIOs -= by;
+ if (tplOutstandingAIOsLow > tplOutstandingAIOs)
+ tplOutstandingAIOsLow = tplOutstandingAIOs;
+ instChanged = true;
+ }
};
Modified: store/trunk/cpp/lib/jrnl/txn_map.cpp
===================================================================
--- store/trunk/cpp/lib/jrnl/txn_map.cpp 2008-07-18 13:27:59 UTC (rev 2206)
+++ store/trunk/cpp/lib/jrnl/txn_map.cpp 2008-07-18 13:30:50 UTC (rev 2207)
@@ -146,6 +146,38 @@
return itr->second.size();
}
+u_int32_t
+txn_map::cnt(const bool enq_flag) // Counts the records, over every xid in _map, whose _enq_flag equals enq_flag.
+{
+ slock s(&_mutex); // NOTE(review): slock looks like a scoped lock on _mutex held for the whole scan -- confirm
+ u_int32_t c = 0;
+ for (xmap_itr i = _map.begin(); i != _map.end(); i++) // outer loop: one map entry per xid
+ {
+ for (tdl_itr j = i->second.begin(); j < i->second.end(); j++) // inner loop: records listed under this xid; '<' presumably relies on tdl_itr being random-access -- TODO confirm
+ {
+ if (j->_enq_flag == enq_flag)
+ c++;
+ }
+ }
+ return c; // total number of matching records
+}
+
+u_int32_t
+txn_map::cnt(const std::string& xid, const bool enq_flag) // Counts the records listed under one xid whose _enq_flag equals enq_flag.
+{
+ slock s(&_mutex); // NOTE(review): slock looks like a scoped lock on _mutex held until return -- confirm
+ u_int32_t c = 0;
+ xmap_itr i = _map.find(xid);
+ if (i == _map.end()) // not found in map
+ return 0; // an unknown xid yields a count of zero
+ for (tdl_itr j = i->second.begin(); j < i->second.end(); j++) // '<' presumably relies on tdl_itr being random-access -- TODO confirm
+ {
+ if (j->_enq_flag == enq_flag)
+ c++;
+ }
+ return c; // number of matching records for this xid
+}
+
bool
txn_map::is_txn_synced(const std::string& xid)
{
Modified: store/trunk/cpp/lib/jrnl/txn_map.hpp
===================================================================
--- store/trunk/cpp/lib/jrnl/txn_map.hpp 2008-07-18 13:27:59 UTC (rev 2206)
+++ store/trunk/cpp/lib/jrnl/txn_map.hpp 2008-07-18 13:30:50 UTC (rev 2207)
@@ -132,6 +132,10 @@
const txn_data_list get_remove_tdata_list(const std::string& xid);
bool in_map(const std::string& xid);
u_int32_t get_rid_count(const std::string& xid);
+ inline u_int32_t enq_cnt() { return cnt(true); }
+ inline u_int32_t enq_cnt(const std::string& xid) { return cnt(xid, true); }
+ inline u_int32_t deq_cnt() { return cnt(false); } // dequeue count over all xids: enq_flag must be false (cnt(true) would repeat enq_cnt())
+ inline u_int32_t deq_cnt(const std::string& xid) { return cnt(xid, false); }
bool is_txn_synced(const std::string& xid);
bool set_aio_compl(const std::string& xid, const u_int64_t rid);
const txn_data& get_data(const std::string& xid, const u_int64_t rid);
@@ -140,6 +144,8 @@
inline u_int32_t size() const { return u_int32_t(_map.size()); }
void xid_list(std::vector<std::string>& xv);
private:
+ u_int32_t cnt(const bool enq_flag);
+ u_int32_t cnt(const std::string& xid, const bool enq_flag);
static std::string xid_format(const std::string& xid);
};
Modified: store/trunk/cpp/tests/TwoPhaseCommitTest.cpp
===================================================================
--- store/trunk/cpp/tests/TwoPhaseCommitTest.cpp 2008-07-18 13:27:59 UTC (rev 2206)
+++ store/trunk/cpp/tests/TwoPhaseCommitTest.cpp 2008-07-18 13:30:50 UTC (rev 2207)
@@ -189,7 +189,7 @@
return static_cast<JournalImpl*>(queue.getExternalQueueStore())->get_open_txn_cnt();
}
u_int32_t getRemainingPreparedListTxns() {
- return preparedXidStorePtr->get_open_txn_cnt();
+ return tplStorePtr->get_open_txn_cnt();
}
};
Modified: store/trunk/specs/management-schema.xml
===================================================================
--- store/trunk/specs/management-schema.xml 2008-07-18 13:27:59 UTC (rev 2206)
+++ store/trunk/specs/management-schema.xml 2008-07-18 13:30:50 UTC (rev 2207)
@@ -4,30 +4,47 @@
License Text
-->
<class name="Store">
- <property name="brokerRef" type="objId" references="qpid.Broker" access="RO" index="y" parentRef="y"/>
+ <property name="brokerRef" type="objId" access="RO" references="qpid.Broker" index="y" parentRef="y"/>
<property name="location" type="sstr" access="RO" desc="Logical directory on disk"/>
<property name="defaultInitialFileCount" type="uint16" access="RO" unit="file" desc="Default number of files initially allocated to each journal"/>
<property name="defaultDataFileSize" type="uint32" access="RO" unit="RdPg" desc="Default size of each journal data file"/>
+ <property name="tplIsInitialized" type="bool" access="RO" desc="Transaction prepared list has been initialized by a transactional prepare"/>
+ <property name="tplDirectory" type="sstr" access="RO" desc="Transaction prepared list directory"/>
+ <property name="tplWritePageSize" type="uint32" access="RO" unit="byte" desc="Page size in transaction prepared list write-page-cache"/>
+ <property name="tplWritePages" type="uint32" access="RO" unit="wpage" desc="Number of pages in transaction prepared list write-page-cache"/>
+ <property name="tplInitialFileCount" type="uint16" access="RO" unit="file" desc="Number of files initially allocated to transaction prepared list journal"/>
+ <property name="tplDataFileSize" type="uint32" access="RO" unit="byte" desc="Size of each journal data file in transaction prepared list journal"/>
+ <property name="tplCurrentFileCount" type="uint32" access="RO" unit="file" desc="Number of files currently allocated to transaction prepared list journal"/>
+
+ <statistic name="tplTransactionDepth" type="hilo32" unit="txn" desc="Number of currently enqueued prepared transactions"/>
+ <statistic name="tplTxnPrepares" type="count64" unit="record" desc="Total transaction prepares on transaction prepared list"/>
+ <statistic name="tplTxnCommits" type="count64" unit="record" desc="Total transaction commits on transaction prepared list"/>
+ <statistic name="tplTxnAborts" type="count64" unit="record" desc="Total transaction aborts on transaction prepared list"/>
+ <statistic name="tplOutstandingAIOs" type="hilo32" unit="aio_op" desc="Number of currently outstanding AIO requests in Async IO system"/>
</class>
<class name="Journal">
- <property name="name" type="sstr" access="RO" index="y"/>
- <property name="queueRef" type="objId" access="RO" references="qpid.Queue" isGeneralReference="y"/>
- <property name="directory" type="sstr" access="RO" desc="Directory containing journal files"/>
- <property name="baseFileName" type="sstr" access="RO" desc="Base filename prefix for journal"/>
- <property name="writePageSize" type="uint32" access="RO" unit="byte" desc="Page size in write-page-cache"/>
- <property name="writePages" type="uint32" access="RO" unit="wpage" desc="Number of pages in write-page-cache"/>
- <property name="readPageSize" type="uint32" access="RO" unit="byte" desc="Page size in read-page-cache"/>
- <property name="readPages" type="uint32" access="RO" unit="rpage" desc="Number of pages in read-page-cache"/>
+ <property name="queueRef" type="objId" access="RO" references="qpid.Queue" isGeneralReference="y"/>
+ <property name="name" type="sstr" access="RO" index="y"/>
+ <property name="directory" type="sstr" access="RO" desc="Directory containing journal files"/>
+ <property name="baseFileName" type="sstr" access="RO" desc="Base filename prefix for journal"/>
+ <property name="writePageSize" type="uint32" access="RO" unit="byte" desc="Page size in write-page-cache"/>
+ <property name="writePages" type="uint32" access="RO" unit="wpage" desc="Number of pages in write-page-cache"/>
+ <property name="readPageSize" type="uint32" access="RO" unit="byte" desc="Page size in read-page-cache"/>
+ <property name="readPages" type="uint32" access="RO" unit="rpage" desc="Number of pages in read-page-cache"/>
+ <property name="initialFileCount" type="uint16" access="RO" unit="file" desc="Number of files initially allocated to this journal"/>
+ <property name="dataFileSize" type="uint32" access="RO" unit="byte" desc="Size of each journal data file"/>
+ <property name="currentFileCount" type="uint32" access="RO" unit="file" desc="Number of files currently allocated to this journal"/>
+
+ <statistic name="recordDepth" type="hilo32" unit="record" desc="Number of currently enqueued records (durable messages)"/>
+ <statistic name="enqueues" type="count64" unit="record" desc="Total enqueued records on journal"/>
+ <statistic name="dequeues" type="count64" unit="record" desc="Total dequeued records on journal"/>
+ <statistic name="txnEnqueues" type="count64" unit="record" desc="Total transactional enqueued records on journal"/>
+ <statistic name="txnDequeues" type="count64" unit="record" desc="Total transactional dequeued records on journal"/>
+ <statistic name="txnCommits" type="count64" unit="record" desc="Total transactional commit records on journal"/>
+ <statistic name="txnAborts" type="count64" unit="record" desc="Total transactional abort records on journal"/>
+ <statistic name="outstandingAIOs" type="hilo32" unit="aio_op" desc="Number of currently outstanding AIO requests in Async IO system"/>
- <statistic name="initialFileCount" type="uint16" unit="file" desc="Number of files initially allocated to this journal"/>
- <statistic name="dataFileSize" type="uint32" unit="byte" desc="Size of each journal data file"/>
- <statistic name="currentFileCount" type="uint32" unit="file" desc="Number of files currently allocated to this journal"/>
- <statistic name="recordDepth" type="hilo32" unit="record" desc="Number of enqueued records (durable messages)"/>
- <statistic name="recordEnqueues" type="count64" unit="record" desc="Total enqueued records on journal"/>
- <statistic name="recordDequeues" type="count64" unit="record" desc="Total dequeued records on journal"/>
- <statistic name="outstandingAIOs" type="hilo32" unit="aio_op" desc="Number of outstanding AIO requests in Async IO system"/>
-
<!--
The following are not yet "wired up" in JournalImpl.cpp
-->
16 years, 5 months
rhmessaging commits: r2206 - in store/branches/mrg-1.0/cpp: tests and 1 other directory.
by rhmessaging-commits@lists.jboss.org
Author: kpvdr
Date: 2008-07-18 09:27:59 -0400 (Fri, 18 Jul 2008)
New Revision: 2206
Modified:
store/branches/mrg-1.0/cpp/lib/BdbMessageStore.cpp
store/branches/mrg-1.0/cpp/lib/BdbMessageStore.h
store/branches/mrg-1.0/cpp/tests/TwoPhaseCommitTest.cpp
Log:
Additional tidy-up on new code. Renamed some functions and variables to keep naming consistent around the Transaction Prepared List (TPL) name. Deleted an old BDB function that escaped detection from a previous BDB cleanup.
Modified: store/branches/mrg-1.0/cpp/lib/BdbMessageStore.cpp
===================================================================
--- store/branches/mrg-1.0/cpp/lib/BdbMessageStore.cpp 2008-07-17 19:49:11 UTC (rev 2205)
+++ store/branches/mrg-1.0/cpp/lib/BdbMessageStore.cpp 2008-07-18 13:27:59 UTC (rev 2206)
@@ -50,7 +50,7 @@
qpid::sys::Duration BdbMessageStore::defJournalFlushTimeout(500 * qpid::sys::TIME_MSEC); // 0.5s
qpid::sys::Mutex TxnCtxt::globalSerialiser;
-BdbMessageStore::PreparedRecoverStruct::PreparedRecoverStruct(const u_int64_t _rid,
+BdbMessageStore::TplRecoverStruct::TplRecoverStruct(const u_int64_t _rid,
const bool _deq_flag,
const bool _commit_flag) :
rid(_rid),
@@ -241,11 +241,11 @@
open(mappingDb, txn.get(), "mappings.db", true);
open(bindingDb, txn.get(), "bindings.db", true);
open(generalDb, txn.get(), "general.db", false);
- preparedXidStorePtr.reset(new JournalImpl("PreparedXidStore", getPxidBaseDir(), "prepared_xid", defJournalGetEventsTimeout, defJournalFlushTimeout));
+ tplStorePtr.reset(new JournalImpl("TplStore", getTplBaseDir(), "tpl", defJournalGetEventsTimeout, defJournalFlushTimeout));
txn.commit();
} catch (const journal::jexception& e) {
txn.abort();
- THROW_STORE_EXCEPTION_2("Error opening preparedXidStore instance", e.what());
+ THROW_STORE_EXCEPTION_2("Error opening tplStore instance", e.what());
} catch (const DbException& e) {
txn.abort();
THROW_STORE_EXCEPTION_2("Error opening databases", e);
@@ -260,12 +260,12 @@
return true;
}
-void BdbMessageStore::chkInitPreparedXidStore()
+void BdbMessageStore::chkTplStoreInit()
{
qpid::sys::Mutex::ScopedLock sl(jrnlCreateLock);
- journal::jdir::create_dir(getPxidBaseDir());
- if (!preparedXidStorePtr->is_ready()) {
- preparedXidStorePtr->initialize(tplNumJrnlFiles, tplJrnlFsizeSblks, tplWCacheNumPages, tplWCachePgSizeSblks);
+ journal::jdir::create_dir(getTplBaseDir());
+ if (!tplStorePtr->is_ready()) {
+ tplStorePtr->initialize(tplNumJrnlFiles, tplJrnlFsizeSblks, tplWCacheNumPages, tplWCachePgSizeSblks);
}
}
@@ -285,7 +285,7 @@
for (std::list<Db*>::iterator i = dbs.begin(); i != dbs.end(); i++) {
(*i)->close(0);
}
- if (preparedXidStorePtr->is_ready()) preparedXidStorePtr->stop(true);
+ if (tplStorePtr->is_ready()) tplStorePtr->stop(true);
} catch (const DbException& e) {
QPID_LOG(error, "Error closing BDB databases: " << e.what());
} catch (const journal::jexception& e) {
@@ -313,7 +313,7 @@
txn->commit(0);
try {
journal::jdir::delete_dir(getJrnlBaseDir(),true);
- journal::jdir::delete_dir(getPxidBaseDir(),true);
+ journal::jdir::delete_dir(getTplBaseDir(),true);
}
catch (const journal::jexception& e) {
THROW_STORE_EXCEPTION(std::string("truncate() failed: ") + e.what() );
@@ -474,7 +474,7 @@
{
checkInit();
txn_list prepared;
- recoverXids(prepared);
+ recoverLockedMappings(prepared);
queue_index queues;//id->queue
exchange_index exchanges;//id->exchange
@@ -505,14 +505,14 @@
TPCTxnCtxt* tpcc = new TPCTxnCtxt(i->xid, &messageIdSequence);
std::auto_ptr<TPCTransactionContext> txn(tpcc);
- tpcc->prepare(preparedXidStorePtr.get());
+ tpcc->prepare(tplStorePtr.get());
// Restore data token state in TxnCtxt
- PreparedRecoverMapCitr citr = preparedXidStoreRecoverMap.find(i->xid);
- if (citr == preparedXidStoreRecoverMap.end()) THROW_STORE_EXCEPTION("XID not found in preparedXidStoreRecoverMap");
+ TplRecoverMapCitr citr = tplRecoverMap.find(i->xid);
+ if (citr == tplRecoverMap.end()) THROW_STORE_EXCEPTION("XID not found in tplRecoverMap");
tpcc->recoverDtok(citr->second.rid, i->xid);
- // If a record is found that is dequeued but not committed/aborted from preparedXidStore, then a complete() call
+ // If a record is found that is dequeued but not committed/aborted from tplStore, then a complete() call
// was interrupted part way through committing/aborting the impacted queues. Complete this process.
bool incomplTxnFlag = citr->second.deq_flag;
@@ -581,8 +581,8 @@
maxQueueId = max(key.id, maxQueueId);
}
- // NOTE: highestRid is set by both recoverQueues() and collectPreparedXids() as
- // the messageIdSequence is used for both queue journals and the preparedXid journal.
+ // NOTE: highestRid is set by both recoverQueues() and recoverTplStore() as
+ // the messageIdSequence is used for both queue journals and the tpl journal.
messageIdSequence.reset(highestRid + 1);
queueIdSequence.reset(maxQueueId + 1);
@@ -669,7 +669,6 @@
generalIdSequence.reset(maxGeneralId + 1);
}
-
void BdbMessageStore::recoverMessages(TxnCtxt& /*txn*/,
qpid::broker::RecoveryManager& recovery,
qpid::broker::RecoverableQueue::shared_ptr& queue,
@@ -730,8 +729,8 @@
if (PreparedTransaction::isLocked(prepared, queue->getPersistenceId(), dtokp.rid()) ) {
std::string xid((char*)xidbuff, xidbuffSize);
- PreparedRecoverMapCitr citr = preparedXidStoreRecoverMap.find(xid);
- if (xid.size() > 0 && citr == preparedXidStoreRecoverMap.end()) THROW_STORE_EXCEPTION("XID not found in preparedXidStoreRecoverMap");
+ TplRecoverMapCitr citr = tplRecoverMap.find(xid);
+ if (xid.size() > 0 && citr == tplRecoverMap.end()) THROW_STORE_EXCEPTION("XID not found in tplRecoverMap");
if (xid.size() > 0 && citr->second.deq_flag) { // deq present in prepared list, this xid is part of incomplete txn commit/abort
if (citr->second.commit_flag)
queue->recover(msg);
@@ -793,8 +792,6 @@
return recovery.recoverMessage(header.buffer);
}
-
-
int BdbMessageStore::enqueueMessage(TxnCtxt& txn,
IdDbt& msgId,
RecoverableMessage::shared_ptr& msg,
@@ -826,26 +823,26 @@
return count;
}
-void BdbMessageStore::recoverPreparedXidJournal()
+void BdbMessageStore::recoverTplStore()
{
- if (journal::jdir::exists(storeDir + "/rhm/pxid/prepared_xid.jinf")) {
+ if (journal::jdir::exists(tplStorePtr->jrnl_dir() + tplStorePtr->base_filename() + ".jinf")) {
u_int64_t thisHighestRid;
- preparedXidStorePtr->recover(tplNumJrnlFiles, tplJrnlFsizeSblks, tplWCachePgSizeSblks, tplWCacheNumPages, 0, thisHighestRid, 0);
+ tplStorePtr->recover(tplNumJrnlFiles, tplJrnlFsizeSblks, tplWCachePgSizeSblks, tplWCacheNumPages, 0, thisHighestRid, 0);
if (thisHighestRid > highestRid)
highestRid = thisHighestRid;
- preparedXidStorePtr->recover_complete(); // start journal.
+ tplStorePtr->recover_complete(); // start journal.
}
}
-void BdbMessageStore::getPreparedXidMap(PreparedRecoverMap& prepXidMap)
+void BdbMessageStore::getTplRecoverMap(TplRecoverMap& tplMap)
{
- if (preparedXidStorePtr.get()) {
- if (!preparedXidStorePtr->is_ready())
- recoverPreparedXidJournal();
+ if (tplStorePtr.get()) {
+ if (!tplStorePtr->is_ready())
+ recoverTplStore();
// TODO: The journal will return a const txn_map and the txn_map will support
// const operations at some point. Using non-const txn_map this way is ugly...
- journal::txn_map& tmap = preparedXidStorePtr->get_txn_map();
+ journal::txn_map& tmap = tplStorePtr->get_txn_map();
std::vector<std::string> xidList;
tmap.xid_list(xidList);
for (std::vector<std::string>::const_iterator i = xidList.begin(); i<xidList.end(); i++) {
@@ -865,18 +862,18 @@
}
assert(enqCnt == 1);
assert(deqCnt <= 1);
- prepXidMap.insert(PreparedRecoverMapPair(*i, PreparedRecoverStruct(rid, deqCnt == 1, commitFlag)));
+ tplMap.insert(TplRecoverMapPair(*i, TplRecoverStruct(rid, deqCnt == 1, commitFlag)));
}
}
}
-void BdbMessageStore::recoverXids(txn_list& txns)
+void BdbMessageStore::recoverLockedMappings(txn_list& txns)
{
- if (!preparedXidStorePtr->is_ready())
- getPreparedXidMap(preparedXidStoreRecoverMap);
+ if (!tplStorePtr->is_ready())
+ getTplRecoverMap(tplRecoverMap);
// Abort unprepaired xids and populate the locked maps
- for (PreparedRecoverMapCitr i = preparedXidStoreRecoverMap.begin(); i != preparedXidStoreRecoverMap.end(); i++) {
+ for (TplRecoverMapCitr i = tplRecoverMap.begin(); i != tplRecoverMap.end(); i++) {
LockedMappings::shared_ptr enq_ptr;
enq_ptr.reset(new LockedMappings);
LockedMappings::shared_ptr deq_ptr;
@@ -885,25 +882,11 @@
}
}
-void BdbMessageStore::readLockedMappings(Db& db,
- txn_lock_map& mappings)
-{
- Cursor c;
- c.open(db, 0);
-
- Dbt key;
- IdPairDbt value;
- while (c.next(key, value)) {
- std::string xid(reinterpret_cast<char*>(key.get_data()), key.get_size());
- LockedMappings::add(mappings, xid, value.queueId(), value.messageId());
- }
-}
-
void BdbMessageStore::collectPreparedXids(std::set<string>& xids)
{
- if (!preparedXidStorePtr->is_ready())
- getPreparedXidMap(preparedXidStoreRecoverMap);
- for (PreparedRecoverMapCitr i = preparedXidStoreRecoverMap.begin(); i != preparedXidStoreRecoverMap.end(); i++) {
+ if (!tplStorePtr->is_ready())
+ getTplRecoverMap(tplRecoverMap);
+ for (TplRecoverMapCitr i = tplRecoverMap.begin(); i != tplRecoverMap.end(); i++) {
xids.insert(i->first);
}
}
@@ -1250,14 +1233,15 @@
bool commit)
{
try {
+ chkTplStoreInit(); // Late initialize (if needed)
+
// Nothing to do if not prepared
- chkInitPreparedXidStore();
if (txn.getDtok()->is_enqueued()) {
txn.incrDtokRef();
DataTokenImpl* dtokp = txn.getDtok();
dtokp->set_dequeue_rid(dtokp->rid());
dtokp->set_rid(messageIdSequence.next());
- preparedXidStorePtr->dequeue_txn_data_record(txn.getDtok(), txn.getXid(), commit);
+ tplStorePtr->dequeue_txn_data_record(txn.getDtok(), txn.getXid(), commit);
}
txn.complete(commit);
} catch (const std::exception& e) {
@@ -1292,13 +1276,14 @@
void BdbMessageStore::localPrepare(TxnCtxt* ctxt)
{
try {
- chkInitPreparedXidStore();
+ chkTplStoreInit(); // Late initialize (if needed)
+
ctxt->incrDtokRef();
DataTokenImpl* dtokp = ctxt->getDtok();
dtokp->set_external_rid(true);
dtokp->set_rid(messageIdSequence.next());
- preparedXidStorePtr->enqueue_txn_data_record(0, 0, 0, dtokp, ctxt->getXid(), false);
- ctxt->prepare(preparedXidStorePtr.get());
+ tplStorePtr->enqueue_txn_data_record(0, 0, 0, dtokp, ctxt->getXid(), false);
+ ctxt->prepare(tplStorePtr.get());
// make sure all the data is written to disk before returning
ctxt->sync();
} catch (const std::exception& e) {
@@ -1449,10 +1434,10 @@
return dir.str();
}
-string BdbMessageStore::getPxidBaseDir()
+string BdbMessageStore::getTplBaseDir()
{
std::stringstream dir;
- dir << storeDir << "/rhm/pxid/" ;
+ dir << storeDir << "/rhm/tpl/" ;
return dir.str();
}
Modified: store/branches/mrg-1.0/cpp/lib/BdbMessageStore.h
===================================================================
--- store/branches/mrg-1.0/cpp/lib/BdbMessageStore.h 2008-07-17 19:49:11 UTC (rev 2205)
+++ store/branches/mrg-1.0/cpp/lib/BdbMessageStore.h 2008-07-18 13:27:59 UTC (rev 2206)
@@ -61,17 +61,17 @@
typedef LockedMappings::map txn_lock_map;
typedef boost::ptr_list<PreparedTransaction> txn_list;
- // Structs for preparedXidStore recover state
- struct PreparedRecoverStruct {
+ // Structs for Transaction Recover List (TPL) recover state
+ struct TplRecoverStruct {
u_int64_t rid;
bool deq_flag;
bool commit_flag;
- PreparedRecoverStruct(const u_int64_t _rid, const bool _deq_flag, const bool _commit_flag);
+ TplRecoverStruct(const u_int64_t _rid, const bool _deq_flag, const bool _commit_flag);
};
- typedef PreparedRecoverStruct PreparedRecover;
- typedef std::map<std::string, PreparedRecover> PreparedRecoverMap;
- typedef PreparedRecoverMap::const_iterator PreparedRecoverMapCitr;
- typedef std::pair<std::string, PreparedRecoverStruct> PreparedRecoverMapPair;
+ typedef TplRecoverStruct TplRecover;
+ typedef std::pair<std::string, TplRecover> TplRecoverMapPair;
+ typedef std::map<std::string, TplRecover> TplRecoverMap;
+ typedef TplRecoverMap::const_iterator TplRecoverMapCitr;
// Default store settings
static const u_int16_t defNumJrnlFiles = 8;
@@ -91,9 +91,9 @@
Db bindingDb;
Db generalDb;
- // Pointer to prepared XID journal instance
- boost::shared_ptr<JournalImpl> preparedXidStorePtr;
- PreparedRecoverMap preparedXidStoreRecoverMap;
+ // Pointer to Transaction Prepared List (TPL) journal instance
+ boost::shared_ptr<JournalImpl> tplStorePtr;
+ TplRecoverMap tplRecoverMap;
IdSequence queueIdSequence;
IdSequence exchangeIdSequence;
@@ -157,11 +157,9 @@
queue_index& index,
txn_list& locked,
message_index& prepared);
- void recoverPreparedXidJournal();
- void getPreparedXidMap(PreparedRecoverMap& prepXidMap);
- void recoverXids(txn_list& txns);
- void readLockedMappings(Db& db,
- txn_lock_map& mappings);
+ void recoverTplStore();
+ void getTplRecoverMap(TplRecoverMap& tplMap);
+ void recoverLockedMappings(txn_list& txns);
TxnCtxt* check(qpid::broker::TransactionContext* ctxt);
void store(const qpid::broker::PersistableQueue* queue,
TxnCtxt* txn,
@@ -218,12 +216,12 @@
std::string getJrnlDir(const char* queueName);
std::string getJrnlBaseDir();
std::string getBdbBaseDir();
- std::string getPxidBaseDir();
+ std::string getTplBaseDir();
inline void checkInit() {
// TODO: change the default dir to ~/.qpidd
if (!isInit) init("/tmp", defNumJrnlFiles, defJrnlFileSizePgs, defWCachePageSize); isInit = true;
}
- void chkInitPreparedXidStore();
+ void chkTplStoreInit();
// debug aid for printing XIDs that may contain non-printable chars
static std::string xid2str(const std::string xid) {
Modified: store/branches/mrg-1.0/cpp/tests/TwoPhaseCommitTest.cpp
===================================================================
--- store/branches/mrg-1.0/cpp/tests/TwoPhaseCommitTest.cpp 2008-07-17 19:49:11 UTC (rev 2205)
+++ store/branches/mrg-1.0/cpp/tests/TwoPhaseCommitTest.cpp 2008-07-18 13:27:59 UTC (rev 2206)
@@ -189,7 +189,7 @@
return static_cast<JournalImpl*>(queue.getExternalQueueStore())->get_open_txn_cnt();
}
u_int32_t getRemainingPreparedListTxns() {
- return preparedXidStorePtr->get_open_txn_cnt();
+ return tplStorePtr->get_open_txn_cnt();
}
};
16 years, 5 months
rhmessaging commits: r2205 - store/branches/mrg-1.0/cpp/lib.
by rhmessaging-commits@lists.jboss.org
Author: kpvdr
Date: 2008-07-17 15:49:11 -0400 (Thu, 17 Jul 2008)
New Revision: 2205
Modified:
store/branches/mrg-1.0/cpp/lib/BdbMessageStore.cpp
Log:
Bugfix from r2193: two params reversed on JournalImpl::initialize(). Also moved creation of TPL dir to time of TPL initialize.
Modified: store/branches/mrg-1.0/cpp/lib/BdbMessageStore.cpp
===================================================================
--- store/branches/mrg-1.0/cpp/lib/BdbMessageStore.cpp 2008-07-16 20:25:52 UTC (rev 2204)
+++ store/branches/mrg-1.0/cpp/lib/BdbMessageStore.cpp 2008-07-17 19:49:11 UTC (rev 2205)
@@ -220,7 +220,6 @@
if (dir.size()>0) storeDir = dir;
journal::jdir::create_dir(getBdbBaseDir());
- journal::jdir::create_dir(getPxidBaseDir());
try {
env.open(getBdbBaseDir().c_str(), DB_THREAD | DB_CREATE | DB_INIT_TXN | DB_INIT_LOCK | DB_INIT_LOG | DB_INIT_MPOOL | DB_USE_ENVIRON, 0);
@@ -264,8 +263,9 @@
void BdbMessageStore::chkInitPreparedXidStore()
{
qpid::sys::Mutex::ScopedLock sl(jrnlCreateLock);
+ journal::jdir::create_dir(getPxidBaseDir());
if (!preparedXidStorePtr->is_ready()) {
- preparedXidStorePtr->initialize(tplNumJrnlFiles, tplJrnlFsizeSblks, tplWCachePgSizeSblks, tplWCacheNumPages);
+ preparedXidStorePtr->initialize(tplNumJrnlFiles, tplJrnlFsizeSblks, tplWCacheNumPages, tplWCachePgSizeSblks);
}
}
16 years, 5 months
rhmessaging commits: r2204 - store/trunk/cpp/lib.
by rhmessaging-commits@lists.jboss.org
Author: tedross
Date: 2008-07-16 16:25:52 -0400 (Wed, 16 Jul 2008)
New Revision: 2204
Modified:
store/trunk/cpp/lib/BdbMessageStore.cpp
store/trunk/cpp/lib/JournalImpl.cpp
Log:
Updated with change to the Qpid management-agent factory interface
Modified: store/trunk/cpp/lib/BdbMessageStore.cpp
===================================================================
--- store/trunk/cpp/lib/BdbMessageStore.cpp 2008-07-16 19:58:43 UTC (rev 2203)
+++ store/trunk/cpp/lib/BdbMessageStore.cpp 2008-07-16 20:25:52 UTC (rev 2204)
@@ -166,7 +166,7 @@
void BdbMessageStore::initManagement (Broker* broker)
{
if (broker != 0) {
- ManagementAgent* agent = ManagementAgent::getAgent ();
+ ManagementAgent* agent = ManagementAgent::Singleton::getInstance();
if (agent != 0)
{
Modified: store/trunk/cpp/lib/JournalImpl.cpp
===================================================================
--- store/trunk/cpp/lib/JournalImpl.cpp 2008-07-16 19:58:43 UTC (rev 2203)
+++ store/trunk/cpp/lib/JournalImpl.cpp 2008-07-16 20:25:52 UTC (rev 2204)
@@ -73,7 +73,7 @@
journalTimerPtr->start();
journalTimerPtr->add(inactivityFireEventPtr);
- ManagementAgent* agent = ManagementAgent::getAgent ();
+ ManagementAgent* agent = ManagementAgent::Singleton::getInstance();
if (agent != 0)
{
16 years, 5 months
rhmessaging commits: r2203 - store/branches/mrg-1.0/cpp/lib.
by rhmessaging-commits@lists.jboss.org
Author: kpvdr
Date: 2008-07-16 15:58:43 -0400 (Wed, 16 Jul 2008)
New Revision: 2203
Modified:
store/branches/mrg-1.0/cpp/lib/BdbMessageStore.cpp
store/branches/mrg-1.0/cpp/lib/BdbMessageStore.h
Log:
Backport of trunk r.2202: Added command-line params for prepared list journal geometry to store module. Tidied up handling of command-line params.
Modified: store/branches/mrg-1.0/cpp/lib/BdbMessageStore.cpp
===================================================================
--- store/branches/mrg-1.0/cpp/lib/BdbMessageStore.cpp 2008-07-16 19:20:44 UTC (rev 2202)
+++ store/branches/mrg-1.0/cpp/lib/BdbMessageStore.cpp 2008-07-16 19:58:43 UTC (rev 2203)
@@ -67,15 +67,101 @@
mappingDb(&env, 0),
bindingDb(&env, 0),
generalDb(&env, 0),
- numJrnlFiles(defNumJrnlFiles),
- jrnlFsizePgs(defJrnlFileSizePgs),
- wcache_pgsize_sblks(JRNL_WMGR_DEF_PAGE_SIZE),
- wcache_num_pages(JRNL_WMGR_DEF_PAGES),
+ numJrnlFiles(0),
+ jrnlFsizeSblks(0),
+ wCachePgSizeSblks(0),
+ wCacheNumPages(0),
+ tplNumJrnlFiles(0),
+ tplJrnlFsizeSblks(0),
+ tplWCachePgSizeSblks(0),
+ tplWCacheNumPages(0),
highestRid(0),
isInit(false),
envPath(envpath)
{}
+u_int16_t BdbMessageStore::chkJrnlNumFilesParam(const u_int16_t param, const std::string paramName)
+{
+ u_int16_t p = param;
+ if (p < JRNL_MIN_NUM_FILES) {
+ p = JRNL_MIN_NUM_FILES;
+ QPID_LOG(warning, "parameter " << paramName << " (" << param << ") is below allowable minimum (" << JRNL_MIN_NUM_FILES << "); changing this parameter to minimum value.");
+ } else if (p > JRNL_MAX_NUM_FILES) {
+ p = JRNL_MAX_NUM_FILES;
+ QPID_LOG(warning, "parameter " << paramName << " (" << param << ") is above allowable maximum (" << JRNL_MAX_NUM_FILES << "); changing this parameter to maximum value.");
+ }
+ return p;
+}
+
+u_int32_t BdbMessageStore::chkJrnlFileSizeParam(const u_int32_t param, const std::string paramName)
+{
+ u_int32_t p = param;
+ u_int32_t min = JRNL_MIN_FILE_SIZE / JRNL_RMGR_PAGE_SIZE;
+ u_int32_t max = JRNL_MAX_FILE_SIZE / JRNL_RMGR_PAGE_SIZE;
+ if (p < min) {
+ p = min;
+ QPID_LOG(warning, "parameter " << paramName << " (" << param << ") is below allowable minimum (" << min << "); changing this parameter to minimum value.");
+ } else if (p > max) {
+ p = max;
+ QPID_LOG(warning, "parameter " << paramName << " (" << param << ") is above allowable maximum (" << max << "); changing this parameter to maximum value.");
+ }
+ return p;
+}
+
+u_int32_t BdbMessageStore::chkJrnlWrPageCacheSize(const u_int32_t param, const std::string paramName)
+{
+ u_int32_t p = param;
+ switch (p)
+ {
+ case 1:
+ case 2:
+ case 4:
+ case 8:
+ case 16:
+ case 32:
+ case 64:
+ case 128:
+ break;
+ default:
+ if (p == 0) {
+ // For zero value, use default
+ p = JRNL_WMGR_DEF_PAGE_SIZE * JRNL_DBLK_SIZE * JRNL_SBLK_SIZE / 1024;
+ QPID_LOG(warning, "parameter " << paramName << " (" << param << ") must be a power of 2 between 1 and 128; changing this parameter to default value (" << p << ")");
+ } else {
+ // For any positive value, use closest value
+ if (p < 6) p = 4;
+ else if (p < 12) p = 8;
+ else if (p < 24) p = 16;
+ else if (p < 48) p = 32;
+ else if (p < 96) p = 64;
+ else if (p > 128) p = 128;
+ QPID_LOG(warning, "parameter " << paramName << " (" << param << ") must be a power of 2 between 1 and 128; changing this parameter to closest allowable value (" << p << ")");
+ }
+ }
+ return p;
+}
+
+u_int16_t BdbMessageStore::getJrnlWrNumPages(const u_int32_t wrPageSizeKib)
+{
+ u_int32_t wrPageSizeSblks = wrPageSizeKib * 1024 / JRNL_DBLK_SIZE / JRNL_SBLK_SIZE; // convert from KiB to number sblks
+ u_int32_t defTotWCacheSize = JRNL_WMGR_DEF_PAGE_SIZE * JRNL_WMGR_DEF_PAGES; // in sblks. Currently 2014 sblks (1 MiB).
+ switch (wrPageSizeKib)
+ {
+ case 1:
+ case 2:
+ case 4:
+ // 256 KiB total cache
+ return defTotWCacheSize / wrPageSizeSblks / 4;
+ case 8:
+ case 16:
+ // 512 KiB total cache
+ return defTotWCacheSize / wrPageSizeSblks / 2;
+ default: // 32, 64, 128
+ // 1 MiB total cache
+ return defTotWCacheSize / wrPageSizeSblks;
+ }
+}
+
void BdbMessageStore::initManagement (Broker* broker)
{
if (broker != 0) {
@@ -88,44 +174,49 @@
mgmtObject->set_location(storeDir);
mgmtObject->set_defaultInitialFileCount(numJrnlFiles);
- mgmtObject->set_defaultDataFileSize(jrnlFsizePgs);
+ mgmtObject->set_defaultDataFileSize(jrnlFsizeSblks / JRNL_RMGR_PAGE_SIZE);
agent->addObject(mgmtObject, 50, 1);
}
}
}
+bool BdbMessageStore::init(const qpid::Options* options)
+{
+ // Extract and check options
+ const Options* opts = static_cast<const Options*>(options);
+ u_int16_t numJrnlFiles = chkJrnlNumFilesParam(opts->numJrnlFiles, "num-jfiles");
+ u_int32_t jrnlFsizePgs = chkJrnlFileSizeParam(opts->jrnlFsizePgs, "jfile-size-pgs");
+ u_int32_t jrnlWrCachePageSizeKib = chkJrnlWrPageCacheSize(opts->wCachePageSizeKib, "wcache-page-size");
+ u_int16_t tplNumJrnlFiles = chkJrnlNumFilesParam(opts->tplNumJrnlFiles, "tpl-num-jfiles");
+ u_int32_t tplJrnlFSizePgs = chkJrnlFileSizeParam(opts->tplJrnlFsizePgs, "tpl-jfile-size-pgs");
+ u_int32_t tplJrnlWrCachePageSizeKib = chkJrnlWrPageCacheSize(opts->tplWCachePageSizeKib, "tpl-wcache-page-size");
+
+ // Pass option values to init(...)
+ return init(opts->storeDir, numJrnlFiles, jrnlFsizePgs, jrnlWrCachePageSizeKib, tplNumJrnlFiles, tplJrnlFSizePgs, tplJrnlWrCachePageSizeKib);
+}
+
+// These params, taken from options, are assumed to be correct and verified
bool BdbMessageStore::init(const std::string& dir,
u_int16_t jfiles,
u_int32_t jfileSizePgs,
- uint32_t wCachePageSize)
+ u_int32_t wCachePageSizeKib,
+ u_int16_t tplJfiles,
+ u_int32_t tplJfileSizePgs,
+ u_int32_t tplWCachePageSizeKib)
{
if (isInit) return true;
+ // Set geometry members (converting to correct units where req'd)
numJrnlFiles = jfiles;
- jrnlFsizePgs = jfileSizePgs;
+ jrnlFsizeSblks = jfileSizePgs * JRNL_RMGR_PAGE_SIZE;
+ wCachePgSizeSblks = wCachePageSizeKib * 1024 / JRNL_DBLK_SIZE / JRNL_SBLK_SIZE; // convert from KiB to number sblks
+ wCacheNumPages = getJrnlWrNumPages(wCachePageSizeKib);
+ tplNumJrnlFiles = tplJfiles;
+ tplJrnlFsizeSblks = tplJfileSizePgs * JRNL_RMGR_PAGE_SIZE;
+ tplWCachePgSizeSblks = tplWCachePageSizeKib * 1024 / JRNL_DBLK_SIZE / JRNL_SBLK_SIZE; // convert from KiB to number sblks
+ tplWCacheNumPages = getJrnlWrNumPages(tplWCachePageSizeKib);
- // set wcache_pgsize_sblks and wcache_num_pages from wCachePageSize
- wcache_pgsize_sblks = wCachePageSize * 1024 / JRNL_DBLK_SIZE / JRNL_SBLK_SIZE; // convert from KiB to number sblks
- u_int32_t defTotWCacheSize = JRNL_WMGR_DEF_PAGE_SIZE * JRNL_WMGR_DEF_PAGES; // in sblks. Currently 2014 sblks (1 MiB).
- switch (wCachePageSize)
- {
- case 1:
- case 2:
- case 4:
- // 256 KiB total cache
- wcache_num_pages = defTotWCacheSize / wcache_pgsize_sblks / 4;
- break;
- case 8:
- case 16:
- // 512 KiB total cache
- wcache_num_pages = defTotWCacheSize / wcache_pgsize_sblks / 2;
- break;
- default: // 32, 64, 128
- // 1 MiB total cache
- wcache_num_pages = defTotWCacheSize / wcache_pgsize_sblks;
- }
-
if (dir.size()>0) storeDir = dir;
journal::jdir::create_dir(getBdbBaseDir());
@@ -165,7 +256,8 @@
}
isInit = true;
- QPID_LOG(info, "BdbMessageStore module initialized: dir=" << dir << "; jfiles=" << jfiles << "; jfileSizePgs=" << jfileSizePgs << "; wCachePageSize=" << wCachePageSize);
+ QPID_LOG(info, "BdbMessageStore module initialized: Journal dir: " << dir << "; Default files per journal: " << jfiles <<
+ "; Default jrournal file size: " << jfileSizePgs << "(wpgs); Default write cache page size: " << wCachePageSizeKib << "(Kib)");
return true;
}
@@ -173,74 +265,10 @@
{
qpid::sys::Mutex::ScopedLock sl(jrnlCreateLock);
if (!preparedXidStorePtr->is_ready()) {
- u_int32_t defTotWCacheSize = JRNL_WMGR_DEF_PAGE_SIZE * JRNL_WMGR_DEF_PAGES; // in sblks. Currently 2014 sblks (1 MiB).
- preparedXidStorePtr->initialize(defXidStoreNumJrnlFiles, defXidStoreJrnlFileSizePgs * JRNL_RMGR_PAGE_SIZE, defTotWCacheSize / wcache_pgsize_sblks, defXidStoreWCachePageSize);
+ preparedXidStorePtr->initialize(tplNumJrnlFiles, tplJrnlFsizeSblks, tplWCachePgSizeSblks, tplWCacheNumPages);
}
}
-bool BdbMessageStore::init(const qpid::Options* options)
-{
- const Options* opts = static_cast<const Options*>(options);
-
- u_int16_t numJrnlFiles = opts->numJrnlFiles;
- if (numJrnlFiles < JRNL_MIN_NUM_FILES) {
- numJrnlFiles = JRNL_MIN_NUM_FILES;
- QPID_LOG(warning, "parameter num-jfiles (" << opts->numJrnlFiles << ") below allowable minimum (" << numJrnlFiles << "); changing this parameter to minimum value.");
- } else if (numJrnlFiles > JRNL_MAX_NUM_FILES) {
- numJrnlFiles = JRNL_MAX_NUM_FILES;
- QPID_LOG(warning, "parameter num-jfiles (" << opts->numJrnlFiles << ") above allowable maximum (" << numJrnlFiles << "); changing this parameter to maximum value.");
- }
-
- u_int32_t jrnlFsizePgs = opts->jrnlFsizePgs;
- u_int32_t jrnlMinFsizePgs = JRNL_MIN_FILE_SIZE / JRNL_RMGR_PAGE_SIZE;
- u_int32_t jrnlMaxFsizePgs = JRNL_MAX_FILE_SIZE / JRNL_RMGR_PAGE_SIZE;
- if (jrnlFsizePgs < jrnlMinFsizePgs) {
- jrnlFsizePgs = jrnlMinFsizePgs;
- QPID_LOG(warning, "parameter jfile-size-pgs (" << opts->jrnlFsizePgs << ") below allowable minimum (" << jrnlFsizePgs << "); changing this parameter to minimum value.");
- } else if (jrnlFsizePgs > jrnlMaxFsizePgs) {
- jrnlFsizePgs = jrnlMaxFsizePgs;
- QPID_LOG(warning, "parameter jfile-size-pgs (" << opts->jrnlFsizePgs << ") above allowable maximum (" << jrnlFsizePgs << "); changing this parameter to maximum value.");
- }
-
- u_int32_t jrnlWrCachePageSize = opts->wCachePageSize;
- switch (jrnlWrCachePageSize)
- {
- case 1:
- case 2:
- case 4:
- case 8:
- case 16:
- case 32:
- case 64:
- case 128:
- break;
- default:
- u_int32_t oldJrnlWrCachePageSize = jrnlWrCachePageSize;
- if (oldJrnlWrCachePageSize == 0) {
- // For zero value, use default
- jrnlWrCachePageSize = JRNL_WMGR_DEF_PAGE_SIZE * JRNL_DBLK_SIZE * JRNL_SBLK_SIZE / 1024;
- QPID_LOG(warning, "parameter wcache-page-size (" << oldJrnlWrCachePageSize << ") must be a power of 2 between 1 and 128; changing this parameter to default value (" << jrnlWrCachePageSize << ")");
- } else {
- // For any positive value, use closest value
- if (oldJrnlWrCachePageSize < 6)
- jrnlWrCachePageSize = 4;
- else if (oldJrnlWrCachePageSize < 12)
- jrnlWrCachePageSize = 8;
- else if (oldJrnlWrCachePageSize < 24)
- jrnlWrCachePageSize = 16;
- else if (oldJrnlWrCachePageSize < 48)
- jrnlWrCachePageSize = 32;
- else if (oldJrnlWrCachePageSize < 96)
- jrnlWrCachePageSize = 64;
- else if (oldJrnlWrCachePageSize > 128)
- jrnlWrCachePageSize = 128;
- QPID_LOG(warning, "parameter wcache-page-size (" << oldJrnlWrCachePageSize << ") must be a power of 2 between 1 and 128; changing this parameter to closest allowable value (" << jrnlWrCachePageSize << ")");
- }
- }
-
- return init(opts->storeDir, numJrnlFiles, jrnlFsizePgs, jrnlWrCachePageSize);
-}
-
void BdbMessageStore::open(Db& db,
DbTxn* txn,
const char* file,
@@ -302,16 +330,16 @@
JournalImpl* jQueue = 0;
FieldTable::ValuePtr value;
- uint16_t localFileCount = numJrnlFiles;
- uint32_t localFileSize = jrnlFsizePgs;
+ u_int16_t localFileCount = numJrnlFiles;
+ u_int32_t localFileSizeSblks = jrnlFsizeSblks;
value = args.get ("qpid.file_count");
if (value.get() != 0 && !value->empty() && value->convertsTo<int>())
- localFileCount = (uint16_t) value->get<int>();
+ localFileCount = (u_int16_t) value->get<int>();
value = args.get ("qpid.file_size");
if (value.get() != 0 && !value->empty() && value->convertsTo<int>())
- localFileSize = (uint32_t) value->get<int>();
+ localFileSizeSblks = (u_int32_t) value->get<int>() * JRNL_RMGR_PAGE_SIZE;
{
qpid::sys::Mutex::ScopedLock sl(jrnlCreateLock);
@@ -323,10 +351,9 @@
queue.setExternalQueueStore(dynamic_cast<ExternalQueueStore*>(jQueue));
try {
// init will create the deque's for the init...
- jQueue->initialize(localFileCount, localFileSize * JRNL_RMGR_PAGE_SIZE, wcache_num_pages, wcache_pgsize_sblks);
+ jQueue->initialize(localFileCount, localFileSizeSblks, wCacheNumPages, wCachePgSizeSblks);
} catch (const journal::jexception& e) {
- THROW_STORE_EXCEPTION(std::string("Queue ") + queue.getName() +
- ": create() failed: " + e.what());
+ THROW_STORE_EXCEPTION(std::string("Queue ") + queue.getName() + ": create() failed: " + e.what());
}
try {
if (!create(queueDb, queueIdSequence, queue)) {
@@ -540,7 +567,7 @@
try
{
u_int64_t thisHighestRid = 0;
- jQueue->recover(numJrnlFiles, jrnlFsizePgs * JRNL_RMGR_PAGE_SIZE, wcache_num_pages, wcache_pgsize_sblks, &prepared, thisHighestRid, key.id); // start recovery
+ jQueue->recover(numJrnlFiles, jrnlFsizeSblks, wCacheNumPages, wCachePgSizeSblks, &prepared, thisHighestRid, key.id); // start recovery
if (thisHighestRid > highestRid)
highestRid = thisHighestRid;
recoverMessages(txn, registry, queue, prepared, messages);
@@ -803,9 +830,7 @@
{
if (journal::jdir::exists(storeDir + "/rhm/pxid/prepared_xid.jinf")) {
u_int64_t thisHighestRid;
- preparedXidStorePtr->recover(defXidStoreNumJrnlFiles, defXidStoreJrnlFileSizePgs * JRNL_RMGR_PAGE_SIZE,
- JRNL_WMGR_DEF_PAGE_SIZE * JRNL_WMGR_DEF_PAGES / wcache_pgsize_sblks, defXidStoreWCachePageSize,
- 0, thisHighestRid, 0);
+ preparedXidStorePtr->recover(tplNumJrnlFiles, tplJrnlFsizeSblks, tplWCachePgSizeSblks, tplWCacheNumPages, 0, thisHighestRid, 0);
if (thisHighestRid > highestRid)
highestRid = thisHighestRid;
preparedXidStorePtr->recover_complete(); // start journal.
@@ -883,7 +908,7 @@
}
}
-void BdbMessageStore::stage( intrusive_ptr<PersistableMessage>& msg)
+void BdbMessageStore::stage(intrusive_ptr<PersistableMessage>& msg)
{
checkInit();
TxnCtxt txn;
@@ -1453,21 +1478,32 @@
BdbMessageStore::Options::Options(const std::string& name) :
qpid::Options(name),
- numJrnlFiles(8),
- jrnlFsizePgs(24),
- wCachePageSize(JRNL_WMGR_DEF_PAGE_SIZE * JRNL_DBLK_SIZE * JRNL_SBLK_SIZE / 1024)
+ numJrnlFiles(defNumJrnlFiles),
+ jrnlFsizePgs(defJrnlFileSizePgs),
+ wCachePageSizeKib(defWCachePageSize),
+ tplNumJrnlFiles(defTplNumJrnlFiles),
+ tplJrnlFsizePgs(defTplJrnlFileSizePgs),
+ tplWCachePageSizeKib(defTplWCachePageSize)
{
addOptions()
("store-dir", qpid::optValue(storeDir, "DIR"),
"Store directory location for persistence (instead of using --data-dir value). "
"Must be supplied if --no-data-dir is also used.")
("num-jfiles", qpid::optValue(numJrnlFiles, "N"),
- "Number of files in persistence journal")
+ "Default number of files for each journal instance")
("jfile-size-pgs", qpid::optValue(jrnlFsizePgs, "N"),
- "Size of each journal file in multiples of read pages (1 read page = 64kiB)")
- ("wcache-page-size", qpid::optValue(wCachePageSize, "N"),
+ "Default size for each journal file in multiples of read pages (1 read page = 64kiB)")
+ ("wcache-page-size", qpid::optValue(wCachePageSizeKib, "N"),
"Size of the pages in the write page cache in KiB. "
"Allowable values - powers of 2: 1, 2, 4, ... , 128. "
"Lower values decrease latency at the expense of throughput.")
+ ("tpl-num-jfiles", qpid::optValue(tplNumJrnlFiles, "N"),
+ "Number of files for transaction prepared list journal instance")
+ ("tpl-jfile-size-pgs", qpid::optValue(tplJrnlFsizePgs, "N"),
+ "Size of each transaction prepared list journal file in multiples of read pages (1 read page = 64kiB)")
+ ("tpl-wcache-page-size", qpid::optValue(tplWCachePageSizeKib, "N"),
+ "Size of the pages in the transaction prepared list write page cache in KiB. "
+ "Allowable values - powers of 2: 1, 2, 4, ... , 128. "
+ "Lower values decrease latency at the expense of throughput.")
;
}
Modified: store/branches/mrg-1.0/cpp/lib/BdbMessageStore.h
===================================================================
--- store/branches/mrg-1.0/cpp/lib/BdbMessageStore.h 2008-07-16 19:20:44 UTC (rev 2202)
+++ store/branches/mrg-1.0/cpp/lib/BdbMessageStore.h 2008-07-16 19:58:43 UTC (rev 2203)
@@ -77,9 +77,9 @@
static const u_int16_t defNumJrnlFiles = 8;
static const u_int32_t defJrnlFileSizePgs = 24;
static const u_int32_t defWCachePageSize = JRNL_WMGR_DEF_PAGE_SIZE * JRNL_DBLK_SIZE * JRNL_SBLK_SIZE / 1024;
- static const u_int16_t defXidStoreNumJrnlFiles = 8;
- static const u_int32_t defXidStoreJrnlFileSizePgs = 24;
- static const u_int32_t defXidStoreWCachePageSize = JRNL_WMGR_DEF_PAGE_SIZE * JRNL_DBLK_SIZE * JRNL_SBLK_SIZE / 1024;
+ static const u_int16_t defTplNumJrnlFiles = 8;
+ static const u_int32_t defTplJrnlFileSizePgs = 24;
+ static const u_int32_t defTplWCachePageSize = defWCachePageSize / 8;
std::list<Db*> dbs;
DbEnv env;
@@ -101,9 +101,13 @@
IdSequence messageIdSequence;
std::string storeDir;
u_int16_t numJrnlFiles;
- u_int32_t jrnlFsizePgs;
- u_int32_t wcache_pgsize_sblks;
- u_int16_t wcache_num_pages;
+ u_int32_t jrnlFsizeSblks;
+ u_int32_t wCachePgSizeSblks;
+ u_int16_t wCacheNumPages;
+ u_int16_t tplNumJrnlFiles;
+ u_int32_t tplJrnlFsizeSblks;
+ u_int32_t tplWCachePgSizeSblks;
+ u_int16_t tplWCacheNumPages;
u_int64_t highestRid;
bool isInit;
const char* envPath;
@@ -112,6 +116,15 @@
qpid::management::Store::shared_ptr mgmtObject;
qpid::sys::Mutex jrnlCreateLock;
+ // Parameter validation and calculation
+ static u_int16_t chkJrnlNumFilesParam(const u_int16_t param,
+ const std::string paramName);
+ static u_int32_t chkJrnlFileSizeParam(const u_int32_t param,
+ const std::string paramName);
+ static u_int32_t chkJrnlWrPageCacheSize(const u_int32_t param,
+ const std::string paramName);
+ static u_int16_t getJrnlWrNumPages(const u_int32_t wrPageSizeKib);
+
void recoverQueues(TxnCtxt& txn,
qpid::broker::RecoveryManager& recovery,
queue_index& index,
@@ -207,7 +220,8 @@
std::string getBdbBaseDir();
std::string getPxidBaseDir();
inline void checkInit() {
- if (!isInit) init("/var", defNumJrnlFiles, defJrnlFileSizePgs, defWCachePageSize); isInit = true;
+ // TODO: change the default dir to ~/.qpidd
+ if (!isInit) init("/tmp", defNumJrnlFiles, defJrnlFileSizePgs, defWCachePageSize); isInit = true;
}
void chkInitPreparedXidStore();
@@ -229,11 +243,12 @@
Options(const std::string& name="Store Options");
std::string clusterName;
std::string storeDir;
- bool storeAsync;
- bool storeForce;
- uint16_t numJrnlFiles;
- uint32_t jrnlFsizePgs;
- uint32_t wCachePageSize;
+ u_int16_t numJrnlFiles;
+ u_int32_t jrnlFsizePgs;
+ u_int32_t wCachePageSizeKib;
+ u_int16_t tplNumJrnlFiles;
+ u_int32_t tplJrnlFsizePgs;
+ u_int32_t tplWCachePageSizeKib;
};
typedef boost::shared_ptr<BdbMessageStore> shared_ptr;
@@ -245,9 +260,12 @@
bool init(const qpid::Options* options);
bool init(const std::string& dir,
- u_int16_t jfiles,
- u_int32_t jfileSizePgs,
- uint32_t wCachePageSize);
+ u_int16_t jfiles = defNumJrnlFiles,
+ u_int32_t jfileSizePgs = defJrnlFileSizePgs,
+ u_int32_t wCachePageSize = defWCachePageSize,
+ u_int16_t tplJfiles = defTplNumJrnlFiles,
+ u_int32_t tplJfileSizePgs = defTplJrnlFileSizePgs,
+ u_int32_t tplWCachePageSize = defTplWCachePageSize);
void initManagement (qpid::broker::Broker* broker);
16 years, 5 months
rhmessaging commits: r2202 - in store/trunk/cpp: lib and 1 other directory.
by rhmessaging-commits@lists.jboss.org
Author: kpvdr
Date: 2008-07-16 15:20:44 -0400 (Wed, 16 Jul 2008)
New Revision: 2202
Modified:
store/trunk/cpp/Makefile.am
store/trunk/cpp/configure.ac
store/trunk/cpp/lib/BdbMessageStore.cpp
store/trunk/cpp/lib/BdbMessageStore.h
store/trunk/cpp/lib/TxnCtxt.h
Log:
Fixed thread sync problem for prepared list initialization; added command-line params for prepared list journal geometry to store module. Tidied up handling of command-line params.
Modified: store/trunk/cpp/Makefile.am
===================================================================
--- store/trunk/cpp/Makefile.am 2008-07-16 19:12:28 UTC (rev 2201)
+++ store/trunk/cpp/Makefile.am 2008-07-16 19:20:44 UTC (rev 2202)
@@ -27,4 +27,8 @@
rpmbuild: $(SPEC) dist-gzip
mkdir -p $(RPMDIRS)
rpmbuild $(RPMMACROS) $(RPMOPTS) rhm.spec
+if HAS_RPMLINT
rpmlint `find rpm -name '*.rpm'`
+else
+ @echo "WARNING: rpmlint not found, could not validate RPMs."
+endif
Modified: store/trunk/cpp/configure.ac
===================================================================
--- store/trunk/cpp/configure.ac 2008-07-16 19:12:28 UTC (rev 2201)
+++ store/trunk/cpp/configure.ac 2008-07-16 19:20:44 UTC (rev 2202)
@@ -175,6 +175,10 @@
# We use valgrind for the tests. See if it's available.
AC_CHECK_PROG([VALGRIND], [valgrind], [valgrind])
+# If rpmlint is available we'll run it when building RPMs.
+AC_CHECK_PROG([RPMLINT], [rpmlint], [rpmlint])
+AM_CONDITIONAL([HAS_RPMLINT], [test -n "$RPMLINT"])
+
# Also doxygen for documentation...
AC_CHECK_PROG([do_doxygen], [doxygen], [yes])
AM_CONDITIONAL([DOXYGEN], [test x$do_doxygen = xyes])
Modified: store/trunk/cpp/lib/BdbMessageStore.cpp
===================================================================
--- store/trunk/cpp/lib/BdbMessageStore.cpp 2008-07-16 19:12:28 UTC (rev 2201)
+++ store/trunk/cpp/lib/BdbMessageStore.cpp 2008-07-16 19:20:44 UTC (rev 2202)
@@ -67,17 +67,102 @@
mappingDb(&env, 0),
bindingDb(&env, 0),
generalDb(&env, 0),
- numJrnlFiles(defNumJrnlFiles),
- jrnlFsizePgs(defJrnlFileSizePgs),
- wcache_pgsize_sblks(JRNL_WMGR_DEF_PAGE_SIZE),
- wcache_num_pages(JRNL_WMGR_DEF_PAGES),
+ numJrnlFiles(0),
+ jrnlFsizeSblks(0),
+ wCachePgSizeSblks(0),
+ wCacheNumPages(0),
+ tplNumJrnlFiles(0),
+ tplJrnlFsizeSblks(0),
+ tplWCachePgSizeSblks(0),
+ tplWCacheNumPages(0),
highestRid(0),
isInit(false),
envPath(envpath),
mgmtObject(0)
-
{}
+u_int16_t BdbMessageStore::chkJrnlNumFilesParam(const u_int16_t param, const std::string paramName)
+{
+ u_int16_t p = param;
+ if (p < JRNL_MIN_NUM_FILES) {
+ p = JRNL_MIN_NUM_FILES;
+ QPID_LOG(warning, "parameter " << paramName << " (" << param << ") is below allowable minimum (" << JRNL_MIN_NUM_FILES << "); changing this parameter to minimum value.");
+ } else if (p > JRNL_MAX_NUM_FILES) {
+ p = JRNL_MAX_NUM_FILES;
+ QPID_LOG(warning, "parameter " << paramName << " (" << param << ") is above allowable maximum (" << JRNL_MAX_NUM_FILES << "); changing this parameter to maximum value.");
+ }
+ return p;
+}
+
+u_int32_t BdbMessageStore::chkJrnlFileSizeParam(const u_int32_t param, const std::string paramName)
+{
+ u_int32_t p = param;
+ u_int32_t min = JRNL_MIN_FILE_SIZE / JRNL_RMGR_PAGE_SIZE;
+ u_int32_t max = JRNL_MAX_FILE_SIZE / JRNL_RMGR_PAGE_SIZE;
+ if (p < min) {
+ p = min;
+ QPID_LOG(warning, "parameter " << paramName << " (" << param << ") is below allowable minimum (" << min << "); changing this parameter to minimum value.");
+ } else if (p > max) {
+ p = max;
+ QPID_LOG(warning, "parameter " << paramName << " (" << param << ") is above allowable maximum (" << max << "); changing this parameter to maximum value.");
+ }
+ return p;
+}
+
+u_int32_t BdbMessageStore::chkJrnlWrPageCacheSize(const u_int32_t param, const std::string paramName)
+{
+ u_int32_t p = param;
+ switch (p)
+ {
+ case 1:
+ case 2:
+ case 4:
+ case 8:
+ case 16:
+ case 32:
+ case 64:
+ case 128:
+ break;
+ default:
+ if (p == 0) {
+ // For zero value, use default
+ p = JRNL_WMGR_DEF_PAGE_SIZE * JRNL_DBLK_SIZE * JRNL_SBLK_SIZE / 1024;
+ QPID_LOG(warning, "parameter " << paramName << " (" << param << ") must be a power of 2 between 1 and 128; changing this parameter to default value (" << p << ")");
+ } else {
+ // For any positive value, use closest value
+ if (p < 6) p = 4;
+ else if (p < 12) p = 8;
+ else if (p < 24) p = 16;
+ else if (p < 48) p = 32;
+ else if (p < 96) p = 64;
+ else if (p > 128) p = 128;
+ QPID_LOG(warning, "parameter " << paramName << " (" << param << ") must be a power of 2 between 1 and 128; changing this parameter to closest allowable value (" << p << ")");
+ }
+ }
+ return p;
+}
+
+u_int16_t BdbMessageStore::getJrnlWrNumPages(const u_int32_t wrPageSizeKib)
+{
+ u_int32_t wrPageSizeSblks = wrPageSizeKib * 1024 / JRNL_DBLK_SIZE / JRNL_SBLK_SIZE; // convert from KiB to number sblks
+ u_int32_t defTotWCacheSize = JRNL_WMGR_DEF_PAGE_SIZE * JRNL_WMGR_DEF_PAGES; // in sblks. Currently 2048 sblks (1 MiB).
+ switch (wrPageSizeKib)
+ {
+ case 1:
+ case 2:
+ case 4:
+ // 256 KiB total cache
+ return defTotWCacheSize / wrPageSizeSblks / 4;
+ case 8:
+ case 16:
+ // 512 KiB total cache
+ return defTotWCacheSize / wrPageSizeSblks / 2;
+ default: // 32, 64, 128
+ // 1 MiB total cache
+ return defTotWCacheSize / wrPageSizeSblks;
+ }
+}
+
void BdbMessageStore::initManagement (Broker* broker)
{
if (broker != 0) {
@@ -90,44 +175,49 @@
mgmtObject->set_location(storeDir);
mgmtObject->set_defaultInitialFileCount(numJrnlFiles);
- mgmtObject->set_defaultDataFileSize(jrnlFsizePgs);
+ mgmtObject->set_defaultDataFileSize(jrnlFsizeSblks / JRNL_RMGR_PAGE_SIZE);
agent->addObject(mgmtObject, 50, 1);
}
}
}
+bool BdbMessageStore::init(const qpid::Options* options)
+{
+ // Extract and check options
+ const Options* opts = static_cast<const Options*>(options);
+ u_int16_t numJrnlFiles = chkJrnlNumFilesParam(opts->numJrnlFiles, "num-jfiles");
+ u_int32_t jrnlFsizePgs = chkJrnlFileSizeParam(opts->jrnlFsizePgs, "jfile-size-pgs");
+ u_int32_t jrnlWrCachePageSizeKib = chkJrnlWrPageCacheSize(opts->wCachePageSizeKib, "wcache-page-size");
+ u_int16_t tplNumJrnlFiles = chkJrnlNumFilesParam(opts->tplNumJrnlFiles, "tpl-num-jfiles");
+ u_int32_t tplJrnlFSizePgs = chkJrnlFileSizeParam(opts->tplJrnlFsizePgs, "tpl-jfile-size-pgs");
+ u_int32_t tplJrnlWrCachePageSizeKib = chkJrnlWrPageCacheSize(opts->tplWCachePageSizeKib, "tpl-wcache-page-size");
+
+ // Pass option values to init(...)
+ return init(opts->storeDir, numJrnlFiles, jrnlFsizePgs, jrnlWrCachePageSizeKib, tplNumJrnlFiles, tplJrnlFSizePgs, tplJrnlWrCachePageSizeKib);
+}
+
+// These params, taken from options, are assumed to be correct and verified
bool BdbMessageStore::init(const std::string& dir,
u_int16_t jfiles,
u_int32_t jfileSizePgs,
- uint32_t wCachePageSize)
+ u_int32_t wCachePageSizeKib,
+ u_int16_t tplJfiles,
+ u_int32_t tplJfileSizePgs,
+ u_int32_t tplWCachePageSizeKib)
{
if (isInit) return true;
+ // Set geometry members (converting to correct units where req'd)
numJrnlFiles = jfiles;
- jrnlFsizePgs = jfileSizePgs;
+ jrnlFsizeSblks = jfileSizePgs * JRNL_RMGR_PAGE_SIZE;
+ wCachePgSizeSblks = wCachePageSizeKib * 1024 / JRNL_DBLK_SIZE / JRNL_SBLK_SIZE; // convert from KiB to number sblks
+ wCacheNumPages = getJrnlWrNumPages(wCachePageSizeKib);
+ tplNumJrnlFiles = tplJfiles;
+ tplJrnlFsizeSblks = tplJfileSizePgs * JRNL_RMGR_PAGE_SIZE;
+ tplWCachePgSizeSblks = tplWCachePageSizeKib * 1024 / JRNL_DBLK_SIZE / JRNL_SBLK_SIZE; // convert from KiB to number sblks
+ tplWCacheNumPages = getJrnlWrNumPages(tplWCachePageSizeKib);
- // set wcache_pgsize_sblks and wcache_num_pages from wCachePageSize
- wcache_pgsize_sblks = wCachePageSize * 1024 / JRNL_DBLK_SIZE / JRNL_SBLK_SIZE; // convert from KiB to number sblks
- u_int32_t defTotWCacheSize = JRNL_WMGR_DEF_PAGE_SIZE * JRNL_WMGR_DEF_PAGES; // in sblks. Currently 2014 sblks (1 MiB).
- switch (wCachePageSize)
- {
- case 1:
- case 2:
- case 4:
- // 256 KiB total cache
- wcache_num_pages = defTotWCacheSize / wcache_pgsize_sblks / 4;
- break;
- case 8:
- case 16:
- // 512 KiB total cache
- wcache_num_pages = defTotWCacheSize / wcache_pgsize_sblks / 2;
- break;
- default: // 32, 64, 128
- // 1 MiB total cache
- wcache_num_pages = defTotWCacheSize / wcache_pgsize_sblks;
- }
-
if (dir.size()>0) storeDir = dir;
journal::jdir::create_dir(getBdbBaseDir());
@@ -167,81 +257,19 @@
}
isInit = true;
- QPID_LOG(info, "BdbMessageStore module initialized: dir=" << dir << "; jfiles=" << jfiles << "; jfileSizePgs=" << jfileSizePgs << "; wCachePageSize=" << wCachePageSize);
+ QPID_LOG(info, "BdbMessageStore module initialized: store-dir=" << dir << "; Default files per journal: " << jfiles <<
+ "; Default jrournal file size: " << jfileSizePgs << "(wpgs); Default write cache page size: " << wCachePageSizeKib << "(Kib)");
return true;
}
void BdbMessageStore::chkInitPreparedXidStore()
{
+ qpid::sys::Mutex::ScopedLock sl(jrnlCreateLock);
if (!preparedXidStorePtr->is_ready()) {
- u_int32_t defTotWCacheSize = JRNL_WMGR_DEF_PAGE_SIZE * JRNL_WMGR_DEF_PAGES; // in sblks. Currently 2014 sblks (1 MiB).
- preparedXidStorePtr->initialize(defXidStoreNumJrnlFiles, defXidStoreJrnlFileSizePgs * JRNL_RMGR_PAGE_SIZE, defTotWCacheSize / wcache_pgsize_sblks, defXidStoreWCachePageSize);
+ preparedXidStorePtr->initialize(tplNumJrnlFiles, tplJrnlFsizeSblks, tplWCachePgSizeSblks, tplWCacheNumPages);
}
}
-bool BdbMessageStore::init(const qpid::Options* options)
-{
- const Options* opts = static_cast<const Options*>(options);
-
- u_int16_t numJrnlFiles = opts->numJrnlFiles;
- if (numJrnlFiles < JRNL_MIN_NUM_FILES) {
- numJrnlFiles = JRNL_MIN_NUM_FILES;
- QPID_LOG(warning, "parameter num-jfiles (" << opts->numJrnlFiles << ") below allowable minimum (" << numJrnlFiles << "); changing this parameter to minimum value.");
- } else if (numJrnlFiles > JRNL_MAX_NUM_FILES) {
- numJrnlFiles = JRNL_MAX_NUM_FILES;
- QPID_LOG(warning, "parameter num-jfiles (" << opts->numJrnlFiles << ") above allowable maximum (" << numJrnlFiles << "); changing this parameter to maximum value.");
- }
-
- u_int32_t jrnlFsizePgs = opts->jrnlFsizePgs;
- u_int32_t jrnlMinFsizePgs = JRNL_MIN_FILE_SIZE / JRNL_RMGR_PAGE_SIZE;
- u_int32_t jrnlMaxFsizePgs = JRNL_MAX_FILE_SIZE / JRNL_RMGR_PAGE_SIZE;
- if (jrnlFsizePgs < jrnlMinFsizePgs) {
- jrnlFsizePgs = jrnlMinFsizePgs;
- QPID_LOG(warning, "parameter jfile-size-pgs (" << opts->jrnlFsizePgs << ") below allowable minimum (" << jrnlFsizePgs << "); changing this parameter to minimum value.");
- } else if (jrnlFsizePgs > jrnlMaxFsizePgs) {
- jrnlFsizePgs = jrnlMaxFsizePgs;
- QPID_LOG(warning, "parameter jfile-size-pgs (" << opts->jrnlFsizePgs << ") above allowable maximum (" << jrnlFsizePgs << "); changing this parameter to maximum value.");
- }
-
- u_int32_t jrnlWrCachePageSize = opts->wCachePageSize;
- switch (jrnlWrCachePageSize)
- {
- case 1:
- case 2:
- case 4:
- case 8:
- case 16:
- case 32:
- case 64:
- case 128:
- break;
- default:
- u_int32_t oldJrnlWrCachePageSize = jrnlWrCachePageSize;
- if (oldJrnlWrCachePageSize == 0) {
- // For zero value, use default
- jrnlWrCachePageSize = JRNL_WMGR_DEF_PAGE_SIZE * JRNL_DBLK_SIZE * JRNL_SBLK_SIZE / 1024;
- QPID_LOG(warning, "parameter wcache-page-size (" << oldJrnlWrCachePageSize << ") must be a power of 2 between 1 and 128; changing this parameter to default value (" << jrnlWrCachePageSize << ")");
- } else {
- // For any positive value, use closest value
- if (oldJrnlWrCachePageSize < 6)
- jrnlWrCachePageSize = 4;
- else if (oldJrnlWrCachePageSize < 12)
- jrnlWrCachePageSize = 8;
- else if (oldJrnlWrCachePageSize < 24)
- jrnlWrCachePageSize = 16;
- else if (oldJrnlWrCachePageSize < 48)
- jrnlWrCachePageSize = 32;
- else if (oldJrnlWrCachePageSize < 96)
- jrnlWrCachePageSize = 64;
- else if (oldJrnlWrCachePageSize > 128)
- jrnlWrCachePageSize = 128;
- QPID_LOG(warning, "parameter wcache-page-size (" << oldJrnlWrCachePageSize << ") must be a power of 2 between 1 and 128; changing this parameter to closest allowable value (" << jrnlWrCachePageSize << ")");
- }
- }
-
- return init(opts->storeDir, numJrnlFiles, jrnlFsizePgs, jrnlWrCachePageSize);
-}
-
void BdbMessageStore::open(Db& db,
DbTxn* txn,
const char* file,
@@ -303,16 +331,16 @@
JournalImpl* jQueue = 0;
FieldTable::ValuePtr value;
- uint16_t localFileCount = numJrnlFiles;
- uint32_t localFileSize = jrnlFsizePgs;
+ u_int16_t localFileCount = numJrnlFiles;
+ u_int32_t localFileSizeSblks = jrnlFsizeSblks;
value = args.get ("qpid.file_count");
if (value.get() != 0 && !value->empty() && value->convertsTo<int>())
- localFileCount = (uint16_t) value->get<int>();
+ localFileCount = (u_int16_t) value->get<int>();
value = args.get ("qpid.file_size");
if (value.get() != 0 && !value->empty() && value->convertsTo<int>())
- localFileSize = (uint32_t) value->get<int>();
+ localFileSizeSblks = (u_int32_t) value->get<int>() * JRNL_RMGR_PAGE_SIZE;
{
qpid::sys::Mutex::ScopedLock sl(jrnlCreateLock);
@@ -324,10 +352,9 @@
queue.setExternalQueueStore(dynamic_cast<ExternalQueueStore*>(jQueue));
try {
// init will create the deque's for the init...
- jQueue->initialize(localFileCount, localFileSize * JRNL_RMGR_PAGE_SIZE, wcache_num_pages, wcache_pgsize_sblks);
+ jQueue->initialize(localFileCount, localFileSizeSblks, wCacheNumPages, wCachePgSizeSblks);
} catch (const journal::jexception& e) {
- THROW_STORE_EXCEPTION(std::string("Queue ") + queue.getName() +
- ": create() failed: " + e.what());
+ THROW_STORE_EXCEPTION(std::string("Queue ") + queue.getName() + ": create() failed: " + e.what());
}
try {
if (!create(queueDb, queueIdSequence, queue)) {
@@ -541,7 +568,7 @@
try
{
u_int64_t thisHighestRid = 0;
- jQueue->recover(numJrnlFiles, jrnlFsizePgs * JRNL_RMGR_PAGE_SIZE, wcache_num_pages, wcache_pgsize_sblks, &prepared, thisHighestRid, key.id); // start recovery
+ jQueue->recover(numJrnlFiles, jrnlFsizeSblks, wCacheNumPages, wCachePgSizeSblks, &prepared, thisHighestRid, key.id); // start recovery
if (thisHighestRid > highestRid)
highestRid = thisHighestRid;
recoverMessages(txn, registry, queue, prepared, messages);
@@ -735,7 +762,7 @@
read = false;
break; // done with all messages. (add call in jrnl to test that _emap is empty.)
default:
- assert( "Store Error: Unexpected msg state");
+ assert("Store Error: Unexpected msg state");
} // switch
} // while
} catch (const journal::jexception& e) {
@@ -804,9 +831,7 @@
{
if (journal::jdir::exists(storeDir + "/rhm/pxid/prepared_xid.jinf")) {
u_int64_t thisHighestRid;
- preparedXidStorePtr->recover(defXidStoreNumJrnlFiles, defXidStoreJrnlFileSizePgs * JRNL_RMGR_PAGE_SIZE,
- JRNL_WMGR_DEF_PAGE_SIZE * JRNL_WMGR_DEF_PAGES / wcache_pgsize_sblks, defXidStoreWCachePageSize,
- 0, thisHighestRid, 0);
+ preparedXidStorePtr->recover(tplNumJrnlFiles, tplJrnlFsizeSblks, tplWCachePgSizeSblks, tplWCacheNumPages, 0, thisHighestRid, 0);
if (thisHighestRid > highestRid)
highestRid = thisHighestRid;
preparedXidStorePtr->recover_complete(); // start journal.
@@ -1454,21 +1479,32 @@
BdbMessageStore::Options::Options(const std::string& name) :
qpid::Options(name),
- numJrnlFiles(8),
- jrnlFsizePgs(24),
- wCachePageSize(JRNL_WMGR_DEF_PAGE_SIZE * JRNL_DBLK_SIZE * JRNL_SBLK_SIZE / 1024)
+ numJrnlFiles(defNumJrnlFiles),
+ jrnlFsizePgs(defJrnlFileSizePgs),
+ wCachePageSizeKib(defWCachePageSize),
+ tplNumJrnlFiles(defTplNumJrnlFiles),
+ tplJrnlFsizePgs(defTplJrnlFileSizePgs),
+ tplWCachePageSizeKib(defTplWCachePageSize)
{
addOptions()
("store-dir", qpid::optValue(storeDir, "DIR"),
"Store directory location for persistence (instead of using --data-dir value). "
"Must be supplied if --no-data-dir is also used.")
("num-jfiles", qpid::optValue(numJrnlFiles, "N"),
- "Number of files in persistence journal")
+ "Default number of files for each journal instance")
("jfile-size-pgs", qpid::optValue(jrnlFsizePgs, "N"),
- "Size of each journal file in multiples of read pages (1 read page = 64kiB)")
- ("wcache-page-size", qpid::optValue(wCachePageSize, "N"),
+ "Default size for each journal file in multiples of read pages (1 read page = 64kiB)")
+ ("wcache-page-size", qpid::optValue(wCachePageSizeKib, "N"),
"Size of the pages in the write page cache in KiB. "
"Allowable values - powers of 2: 1, 2, 4, ... , 128. "
"Lower values decrease latency at the expense of throughput.")
+ ("tpl-num-jfiles", qpid::optValue(tplNumJrnlFiles, "N"),
+ "Number of files for transaction prepared list journal instance")
+ ("tpl-jfile-size-pgs", qpid::optValue(tplJrnlFsizePgs, "N"),
+ "Size of each transaction prepared list journal file in multiples of read pages (1 read page = 64kiB)")
+ ("tpl-wcache-page-size", qpid::optValue(tplWCachePageSizeKib, "N"),
+ "Size of the pages in the transaction prepared list write page cache in KiB. "
+ "Allowable values - powers of 2: 1, 2, 4, ... , 128. "
+ "Lower values decrease latency at the expense of throughput.")
;
}
Modified: store/trunk/cpp/lib/BdbMessageStore.h
===================================================================
--- store/trunk/cpp/lib/BdbMessageStore.h 2008-07-16 19:12:28 UTC (rev 2201)
+++ store/trunk/cpp/lib/BdbMessageStore.h 2008-07-16 19:20:44 UTC (rev 2202)
@@ -77,9 +77,9 @@
static const u_int16_t defNumJrnlFiles = 8;
static const u_int32_t defJrnlFileSizePgs = 24;
static const u_int32_t defWCachePageSize = JRNL_WMGR_DEF_PAGE_SIZE * JRNL_DBLK_SIZE * JRNL_SBLK_SIZE / 1024;
- static const u_int16_t defXidStoreNumJrnlFiles = 8;
- static const u_int32_t defXidStoreJrnlFileSizePgs = 24;
- static const u_int32_t defXidStoreWCachePageSize = JRNL_WMGR_DEF_PAGE_SIZE * JRNL_DBLK_SIZE * JRNL_SBLK_SIZE / 1024;
+ static const u_int16_t defTplNumJrnlFiles = 8;
+ static const u_int32_t defTplJrnlFileSizePgs = 24;
+ static const u_int32_t defTplWCachePageSize = defWCachePageSize / 8;
std::list<Db*> dbs;
DbEnv env;
@@ -101,9 +101,13 @@
IdSequence messageIdSequence;
std::string storeDir;
u_int16_t numJrnlFiles;
- u_int32_t jrnlFsizePgs;
- u_int32_t wcache_pgsize_sblks;
- u_int16_t wcache_num_pages;
+ u_int32_t jrnlFsizeSblks;
+ u_int32_t wCachePgSizeSblks;
+ u_int16_t wCacheNumPages;
+ u_int16_t tplNumJrnlFiles;
+ u_int32_t tplJrnlFsizeSblks;
+ u_int32_t tplWCachePgSizeSblks;
+ u_int16_t tplWCacheNumPages;
u_int64_t highestRid;
bool isInit;
const char* envPath;
@@ -112,6 +116,15 @@
qpid::management::Store* mgmtObject;
qpid::sys::Mutex jrnlCreateLock;
+ // Parameter validation and calculation
+ static u_int16_t chkJrnlNumFilesParam(const u_int16_t param,
+ const std::string paramName);
+ static u_int32_t chkJrnlFileSizeParam(const u_int32_t param,
+ const std::string paramName);
+ static u_int32_t chkJrnlWrPageCacheSize(const u_int32_t param,
+ const std::string paramName);
+ static u_int16_t getJrnlWrNumPages(const u_int32_t wrPageSizeKib);
+
void recoverQueues(TxnCtxt& txn,
qpid::broker::RecoveryManager& recovery,
queue_index& index,
@@ -128,7 +141,8 @@
txn_list& locked,
message_index& prepared);
qpid::broker::RecoverableMessage::shared_ptr getExternMessage(qpid::broker::RecoveryManager& recovery,
- uint64_t mId, unsigned& headerSize);
+ uint64_t mId,
+ unsigned& headerSize);
void recoverExchanges(TxnCtxt& txn,
qpid::broker::RecoveryManager& recovery,
exchange_index& index);
@@ -206,7 +220,8 @@
std::string getBdbBaseDir();
std::string getPxidBaseDir();
inline void checkInit() {
- if (!isInit) init("/var", defNumJrnlFiles, defJrnlFileSizePgs, defWCachePageSize); isInit = true;
+ // TODO: change the default dir to ~/.qpidd
+ if (!isInit) init("/tmp", defNumJrnlFiles, defJrnlFileSizePgs, defWCachePageSize); isInit = true;
}
void chkInitPreparedXidStore();
@@ -228,11 +243,12 @@
Options(const std::string& name="Store Options");
std::string clusterName;
std::string storeDir;
- bool storeAsync;
- bool storeForce;
- uint16_t numJrnlFiles;
- uint32_t jrnlFsizePgs;
- uint32_t wCachePageSize;
+ u_int16_t numJrnlFiles;
+ u_int32_t jrnlFsizePgs;
+ u_int32_t wCachePageSizeKib;
+ u_int16_t tplNumJrnlFiles;
+ u_int32_t tplJrnlFsizePgs;
+ u_int32_t tplWCachePageSizeKib;
};
typedef boost::shared_ptr<BdbMessageStore> shared_ptr;
@@ -244,9 +260,12 @@
bool init(const qpid::Options* options);
bool init(const std::string& dir,
- u_int16_t jfiles,
- u_int32_t jfileSizePgs,
- uint32_t wCachePageSize);
+ u_int16_t jfiles = defNumJrnlFiles,
+ u_int32_t jfileSizePgs = defJrnlFileSizePgs,
+ u_int32_t wCachePageSize = defWCachePageSize,
+ u_int16_t tplJfiles = defTplNumJrnlFiles,
+ u_int32_t tplJfileSizePgs = defTplJrnlFileSizePgs,
+ u_int32_t tplWCachePageSize = defTplWCachePageSize);
void initManagement (qpid::broker::Broker* broker);
@@ -318,9 +337,9 @@
void abort(qpid::broker::TransactionContext& ctxt);
qpid::management::ManagementObject* GetManagementObject (void) const
- { return mgmtObject; }
+ { return mgmtObject; }
- inline qpid::management::Manageable::status_t ManagementMethod (uint32_t, qpid::management::Args&)
+ inline qpid::management::Manageable::status_t ManagementMethod (u_int32_t, qpid::management::Args&)
{ return qpid::management::Manageable::STATUS_OK; }
}; // class BdbMessageStore
Modified: store/trunk/cpp/lib/TxnCtxt.h
===================================================================
--- store/trunk/cpp/lib/TxnCtxt.h 2008-07-16 19:12:28 UTC (rev 2201)
+++ store/trunk/cpp/lib/TxnCtxt.h 2008-07-16 19:20:44 UTC (rev 2202)
@@ -1,24 +1,24 @@
/*
- Copyright (C) 2007 Red Hat Software
+ Copyright (C) 2007 Red Hat Software
- This file is part of Red Hat Messaging.
+ This file is part of Red Hat Messaging.
- Red Hat Messaging is free software; you can redistribute it and/or
- modify it under the terms of the GNU Lesser General Public
- License as published by the Free Software Foundation; either
- version 2.1 of the License, or (at your option) any later version.
+ Red Hat Messaging is free software; you can redistribute it and/or
+ modify it under the terms of the GNU Lesser General Public
+ License as published by the Free Software Foundation; either
+ version 2.1 of the License, or (at your option) any later version.
- This library is distributed in the hope that it will be useful,
- but WITHOUT ANY WARRANTY; without even the implied warranty of
- MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
- Lesser General Public License for more details.
+ This library is distributed in the hope that it will be useful,
+ but WITHOUT ANY WARRANTY; without even the implied warranty of
+ MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ Lesser General Public License for more details.
- You should have received a copy of the GNU Lesser General Public
- License along with this library; if not, write to the Free Software
- Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301
- USA
+ You should have received a copy of the GNU Lesser General Public
+ License along with this library; if not, write to the Free Software
+ Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301
+ USA
- The GNU Lesser General Public License is available in the file COPYING.
+ The GNU Lesser General Public License is available in the file COPYING.
*/
#ifndef _TxnCtxt_
@@ -170,7 +170,7 @@
DbTxn* get() { return txn; }
virtual bool isTPC() { return false; }
virtual const std::string& getXid() { return tid; }
-
+
void addXidRecord(qpid::broker::ExternalQueueStore* queue) { impactedQueues.insert(queue); }
inline void prepare(JournalImpl* _preparedXidStorePtr) { preparedXidStorePtr = _preparedXidStorePtr; }
void complete(bool commit) { completeTxn(commit); }
16 years, 5 months