[rhmessaging-commits] rhmessaging commits: r2207 - in store/trunk: cpp/lib/gen/qpid/management and 3 other directories.
rhmessaging-commits at lists.jboss.org
rhmessaging-commits at lists.jboss.org
Fri Jul 18 09:30:50 EDT 2008
Author: kpvdr
Date: 2008-07-18 09:30:50 -0400 (Fri, 18 Jul 2008)
New Revision: 2207
Modified:
store/trunk/cpp/lib/BdbMessageStore.cpp
store/trunk/cpp/lib/BdbMessageStore.h
store/trunk/cpp/lib/JournalImpl.cpp
store/trunk/cpp/lib/gen/qpid/management/Journal.cpp
store/trunk/cpp/lib/gen/qpid/management/Journal.h
store/trunk/cpp/lib/gen/qpid/management/PackageMrgstore.cpp
store/trunk/cpp/lib/gen/qpid/management/Store.cpp
store/trunk/cpp/lib/gen/qpid/management/Store.h
store/trunk/cpp/lib/jrnl/txn_map.cpp
store/trunk/cpp/lib/jrnl/txn_map.hpp
store/trunk/cpp/tests/TwoPhaseCommitTest.cpp
store/trunk/specs/management-schema.xml
Log:
Added new management functionality for the Transaction Prepared List (TPL) instance in BdbMessageStore. Journal instances now keep track of transactional records. Also performed additional tidy-up of the new TPL code. Renamed some functions and variables to keep naming consistent around the Transaction Prepared List (TPL) name. Deleted an old BDB function that escaped detection in a previous BDB cleanup.
Modified: store/trunk/cpp/lib/BdbMessageStore.cpp
===================================================================
--- store/trunk/cpp/lib/BdbMessageStore.cpp 2008-07-18 13:27:59 UTC (rev 2206)
+++ store/trunk/cpp/lib/BdbMessageStore.cpp 2008-07-18 13:30:50 UTC (rev 2207)
@@ -50,7 +50,7 @@
qpid::sys::Duration BdbMessageStore::defJournalFlushTimeout(500 * qpid::sys::TIME_MSEC); // 0.5s
qpid::sys::Mutex TxnCtxt::globalSerialiser;
-BdbMessageStore::PreparedRecoverStruct::PreparedRecoverStruct(const u_int64_t _rid,
+BdbMessageStore::TplRecoverStruct::TplRecoverStruct(const u_int64_t _rid,
const bool _deq_flag,
const bool _commit_flag) :
rid(_rid),
@@ -167,15 +167,20 @@
{
if (broker != 0) {
ManagementAgent* agent = ManagementAgent::Singleton::getInstance();
-
- if (agent != 0)
- {
+ if (agent != 0) {
qpid::management::PackageMrgstore packageInitializer(agent);
- mgmtObject = new qpid::management::Store (agent, this, broker);
+ mgmtObject = new qpid::management::Store(agent, this, broker);
mgmtObject->set_location(storeDir);
mgmtObject->set_defaultInitialFileCount(numJrnlFiles);
mgmtObject->set_defaultDataFileSize(jrnlFsizeSblks / JRNL_RMGR_PAGE_SIZE);
+ mgmtObject->set_tplIsInitialized(false);
+ mgmtObject->set_tplDirectory(getTplBaseDir());
+ mgmtObject->set_tplWritePageSize(tplWCachePgSizeSblks * JRNL_SBLK_SIZE * JRNL_DBLK_SIZE);
+ mgmtObject->set_tplWritePages(tplWCacheNumPages);
+ mgmtObject->set_tplInitialFileCount(tplNumJrnlFiles);
+ mgmtObject->set_tplDataFileSize(tplJrnlFsizeSblks * JRNL_SBLK_SIZE * JRNL_DBLK_SIZE);
+ mgmtObject->set_tplCurrentFileCount(tplNumJrnlFiles);
agent->addObject(mgmtObject, 50, 1);
}
@@ -221,7 +226,6 @@
if (dir.size()>0) storeDir = dir;
journal::jdir::create_dir(getBdbBaseDir());
- journal::jdir::create_dir(getPxidBaseDir());
try {
env.open(getBdbBaseDir().c_str(), DB_THREAD | DB_CREATE | DB_INIT_TXN | DB_INIT_LOCK | DB_INIT_LOG | DB_INIT_MPOOL | DB_USE_ENVIRON, 0);
@@ -243,11 +247,11 @@
open(mappingDb, txn.get(), "mappings.db", true);
open(bindingDb, txn.get(), "bindings.db", true);
open(generalDb, txn.get(), "general.db", false);
- preparedXidStorePtr.reset(new JournalImpl("PreparedXidStore", getPxidBaseDir(), "prepared_xid", defJournalGetEventsTimeout, defJournalFlushTimeout));
+ tplStorePtr.reset(new JournalImpl("TplStore", getTplBaseDir(), "tpl", defJournalGetEventsTimeout, defJournalFlushTimeout));
txn.commit();
} catch (const journal::jexception& e) {
txn.abort();
- THROW_STORE_EXCEPTION_2("Error opening preparedXidStore instance", e.what());
+ THROW_STORE_EXCEPTION_2("Error opening tplStore instance", e.what());
} catch (const DbException& e) {
txn.abort();
THROW_STORE_EXCEPTION_2("Error opening databases", e);
@@ -257,16 +261,25 @@
}
isInit = true;
- QPID_LOG(info, "BdbMessageStore module initialized: store-dir=" << dir << "; Default files per journal: " << jfiles <<
- "; Default jrournal file size: " << jfileSizePgs << "(wpgs); Default write cache page size: " << wCachePageSizeKib << "(Kib)");
+ QPID_LOG(notice, "Store module initialized; dir=" << dir);
+ QPID_LOG(info, "> Default files per journal: " << jfiles);
+ QPID_LOG(info, "> Default jrournal file size: " << jfileSizePgs << " (wpgs)");
+ QPID_LOG(info, "> Default write cache page size: " << wCachePageSizeKib << " (Kib)");
+ QPID_LOG(info, "> Default number of write cache pages: " << wCacheNumPages);
+ QPID_LOG(info, "> TPL files per journal: " << tplNumJrnlFiles);
+ QPID_LOG(info, "> TPL jrournal file size: " << tplJfileSizePgs << " (wpgs)");
+ QPID_LOG(info, "> TPL write cache page size: " << tplWCachePageSizeKib << " (Kib)");
+ QPID_LOG(info, "> TPL number of write cache pages: " << tplWCacheNumPages);
return true;
}
-void BdbMessageStore::chkInitPreparedXidStore()
+void BdbMessageStore::chkTplStoreInit()
{
qpid::sys::Mutex::ScopedLock sl(jrnlCreateLock);
- if (!preparedXidStorePtr->is_ready()) {
- preparedXidStorePtr->initialize(tplNumJrnlFiles, tplJrnlFsizeSblks, tplWCachePgSizeSblks, tplWCacheNumPages);
+ if (!tplStorePtr->is_ready()) {
+ journal::jdir::create_dir(getTplBaseDir());
+ tplStorePtr->initialize(tplNumJrnlFiles, tplJrnlFsizeSblks, tplWCacheNumPages, tplWCachePgSizeSblks);
+ if (mgmtObject != 0) mgmtObject->set_tplIsInitialized(true);
}
}
@@ -286,7 +299,7 @@
for (std::list<Db*>::iterator i = dbs.begin(); i != dbs.end(); i++) {
(*i)->close(0);
}
- if (preparedXidStorePtr->is_ready()) preparedXidStorePtr->stop(true);
+ if (tplStorePtr->is_ready()) tplStorePtr->stop(true);
} catch (const DbException& e) {
QPID_LOG(error, "Error closing BDB databases: " << e.what());
} catch (const journal::jexception& e) {
@@ -314,7 +327,7 @@
txn->commit(0);
try {
journal::jdir::delete_dir(getJrnlBaseDir(),true);
- journal::jdir::delete_dir(getPxidBaseDir(),true);
+ journal::jdir::delete_dir(getTplBaseDir(),true);
}
catch (const journal::jexception& e) {
THROW_STORE_EXCEPTION(std::string("truncate() failed: ") + e.what() );
@@ -475,7 +488,7 @@
{
checkInit();
txn_list prepared;
- recoverXids(prepared);
+ recoverLockedMappings(prepared);
queue_index queues;//id->queue
exchange_index exchanges;//id->exchange
@@ -506,14 +519,18 @@
TPCTxnCtxt* tpcc = new TPCTxnCtxt(i->xid, &messageIdSequence);
std::auto_ptr<TPCTransactionContext> txn(tpcc);
- tpcc->prepare(preparedXidStorePtr.get());
+ tpcc->prepare(tplStorePtr.get());
+ if (mgmtObject != 0) {
+ mgmtObject->inc_tplTransactionDepth();
+ mgmtObject->inc_tplTxnPrepares();
+ }
// Restore data token state in TxnCtxt
- PreparedRecoverMapCitr citr = preparedXidStoreRecoverMap.find(i->xid);
- if (citr == preparedXidStoreRecoverMap.end()) THROW_STORE_EXCEPTION("XID not found in preparedXidStoreRecoverMap");
+ TplRecoverMapCitr citr = tplRecoverMap.find(i->xid);
+ if (citr == tplRecoverMap.end()) THROW_STORE_EXCEPTION("XID not found in tplRecoverMap");
tpcc->recoverDtok(citr->second.rid, i->xid);
- // If a record is found that is dequeued but not committed/aborted from preparedXidStore, then a complete() call
+ // If a record is found that is dequeued but not committed/aborted from tplStore, then a complete() call
// was interrupted part way through committing/aborting the impacted queues. Complete this process.
bool incomplTxnFlag = citr->second.deq_flag;
@@ -531,8 +548,16 @@
}
}
- if (incomplTxnFlag) tpcc->complete(citr->second.commit_flag);
+ if (incomplTxnFlag) {
+ tpcc->complete(citr->second.commit_flag);
+ } else {
+ if (mgmtObject != 0) {
+ mgmtObject->inc_tplTransactionDepth();
+ mgmtObject->inc_tplTxnPrepares();
+ }
+ }
}
+
registry.recoveryComplete();
}
@@ -582,8 +607,8 @@
maxQueueId = max(key.id, maxQueueId);
}
- // NOTE: highestRid is set by both recoverQueues() and collectPreparedXids() as
- // the messageIdSequence is used for both queue journals and the preparedXid journal.
+ // NOTE: highestRid is set by both recoverQueues() and recoverTplStore() as
+ // the messageIdSequence is used for both queue journals and the tpl journal.
messageIdSequence.reset(highestRid + 1);
queueIdSequence.reset(maxQueueId + 1);
@@ -670,7 +695,6 @@
generalIdSequence.reset(maxGeneralId + 1);
}
-
void BdbMessageStore::recoverMessages(TxnCtxt& /*txn*/,
qpid::broker::RecoveryManager& recovery,
qpid::broker::RecoverableQueue::shared_ptr& queue,
@@ -731,8 +755,8 @@
if (PreparedTransaction::isLocked(prepared, queue->getPersistenceId(), dtokp.rid()) ) {
std::string xid((char*)xidbuff, xidbuffSize);
- PreparedRecoverMapCitr citr = preparedXidStoreRecoverMap.find(xid);
- if (xid.size() > 0 && citr == preparedXidStoreRecoverMap.end()) THROW_STORE_EXCEPTION("XID not found in preparedXidStoreRecoverMap");
+ TplRecoverMapCitr citr = tplRecoverMap.find(xid);
+ if (xid.size() > 0 && citr == tplRecoverMap.end()) THROW_STORE_EXCEPTION("XID not found in tplRecoverMap");
if (xid.size() > 0 && citr->second.deq_flag) { // deq present in prepared list, this xid is part of incomplete txn commit/abort
if (citr->second.commit_flag)
queue->recover(msg);
@@ -794,8 +818,6 @@
return recovery.recoverMessage(header.buffer);
}
-
-
int BdbMessageStore::enqueueMessage(TxnCtxt& txn,
IdDbt& msgId,
RecoverableMessage::shared_ptr& msg,
@@ -827,26 +849,26 @@
return count;
}
-void BdbMessageStore::recoverPreparedXidJournal()
+void BdbMessageStore::recoverTplStore()
{
- if (journal::jdir::exists(storeDir + "/rhm/pxid/prepared_xid.jinf")) {
+ if (journal::jdir::exists(tplStorePtr->jrnl_dir() + tplStorePtr->base_filename() + ".jinf")) {
u_int64_t thisHighestRid;
- preparedXidStorePtr->recover(tplNumJrnlFiles, tplJrnlFsizeSblks, tplWCachePgSizeSblks, tplWCacheNumPages, 0, thisHighestRid, 0);
+ tplStorePtr->recover(tplNumJrnlFiles, tplJrnlFsizeSblks, tplWCachePgSizeSblks, tplWCacheNumPages, 0, thisHighestRid, 0);
if (thisHighestRid > highestRid)
highestRid = thisHighestRid;
- preparedXidStorePtr->recover_complete(); // start journal.
+ tplStorePtr->recover_complete(); // start journal.
}
}
-void BdbMessageStore::getPreparedXidMap(PreparedRecoverMap& prepXidMap)
+void BdbMessageStore::getTplRecoverMap(TplRecoverMap& tplMap)
{
- if (preparedXidStorePtr.get()) {
- if (!preparedXidStorePtr->is_ready())
- recoverPreparedXidJournal();
+ if (tplStorePtr.get()) {
+ if (!tplStorePtr->is_ready())
+ recoverTplStore();
// TODO: The journal will return a const txn_map and the txn_map will support
// const operations at some point. Using non-const txn_map this way is ugly...
- journal::txn_map& tmap = preparedXidStorePtr->get_txn_map();
+ journal::txn_map& tmap = tplStorePtr->get_txn_map();
std::vector<std::string> xidList;
tmap.xid_list(xidList);
for (std::vector<std::string>::const_iterator i = xidList.begin(); i<xidList.end(); i++) {
@@ -866,18 +888,18 @@
}
assert(enqCnt == 1);
assert(deqCnt <= 1);
- prepXidMap.insert(PreparedRecoverMapPair(*i, PreparedRecoverStruct(rid, deqCnt == 1, commitFlag)));
+ tplMap.insert(TplRecoverMapPair(*i, TplRecoverStruct(rid, deqCnt == 1, commitFlag)));
}
}
}
-void BdbMessageStore::recoverXids(txn_list& txns)
+void BdbMessageStore::recoverLockedMappings(txn_list& txns)
{
- if (!preparedXidStorePtr->is_ready())
- getPreparedXidMap(preparedXidStoreRecoverMap);
+ if (!tplStorePtr->is_ready())
+ getTplRecoverMap(tplRecoverMap);
// Abort unprepaired xids and populate the locked maps
- for (PreparedRecoverMapCitr i = preparedXidStoreRecoverMap.begin(); i != preparedXidStoreRecoverMap.end(); i++) {
+ for (TplRecoverMapCitr i = tplRecoverMap.begin(); i != tplRecoverMap.end(); i++) {
LockedMappings::shared_ptr enq_ptr;
enq_ptr.reset(new LockedMappings);
LockedMappings::shared_ptr deq_ptr;
@@ -886,25 +908,11 @@
}
}
-void BdbMessageStore::readLockedMappings(Db& db,
- txn_lock_map& mappings)
-{
- Cursor c;
- c.open(db, 0);
-
- Dbt key;
- IdPairDbt value;
- while (c.next(key, value)) {
- std::string xid(reinterpret_cast<char*>(key.get_data()), key.get_size());
- LockedMappings::add(mappings, xid, value.queueId(), value.messageId());
- }
-}
-
void BdbMessageStore::collectPreparedXids(std::set<string>& xids)
{
- if (!preparedXidStorePtr->is_ready())
- getPreparedXidMap(preparedXidStoreRecoverMap);
- for (PreparedRecoverMapCitr i = preparedXidStoreRecoverMap.begin(); i != preparedXidStoreRecoverMap.end(); i++) {
+ if (!tplStorePtr->is_ready())
+ getTplRecoverMap(tplRecoverMap);
+ for (TplRecoverMapCitr i = tplRecoverMap.begin(); i != tplRecoverMap.end(); i++) {
xids.insert(i->first);
}
}
@@ -1251,16 +1259,24 @@
bool commit)
{
try {
+ chkTplStoreInit(); // Late initialize (if needed)
+
// Nothing to do if not prepared
- chkInitPreparedXidStore();
if (txn.getDtok()->is_enqueued()) {
txn.incrDtokRef();
DataTokenImpl* dtokp = txn.getDtok();
dtokp->set_dequeue_rid(dtokp->rid());
dtokp->set_rid(messageIdSequence.next());
- preparedXidStorePtr->dequeue_txn_data_record(txn.getDtok(), txn.getXid(), commit);
+ tplStorePtr->dequeue_txn_data_record(txn.getDtok(), txn.getXid(), commit);
}
txn.complete(commit);
+ if (mgmtObject != 0) {
+ mgmtObject->dec_tplTransactionDepth();
+ if (commit)
+ mgmtObject->inc_tplTxnCommits();
+ else
+ mgmtObject->inc_tplTxnAborts();
+ }
} catch (const std::exception& e) {
QPID_LOG(error, "Error completing xid " << txn.getXid() << ": " << e.what());
throw;
@@ -1293,15 +1309,20 @@
void BdbMessageStore::localPrepare(TxnCtxt* ctxt)
{
try {
- chkInitPreparedXidStore();
+ chkTplStoreInit(); // Late initialize (if needed)
+
ctxt->incrDtokRef();
DataTokenImpl* dtokp = ctxt->getDtok();
dtokp->set_external_rid(true);
dtokp->set_rid(messageIdSequence.next());
- preparedXidStorePtr->enqueue_txn_data_record(0, 0, 0, dtokp, ctxt->getXid(), false);
- ctxt->prepare(preparedXidStorePtr.get());
+ tplStorePtr->enqueue_txn_data_record(0, 0, 0, dtokp, ctxt->getXid(), false);
+ ctxt->prepare(tplStorePtr.get());
// make sure all the data is written to disk before returning
ctxt->sync();
+ if (mgmtObject != 0) {
+ mgmtObject->inc_tplTransactionDepth();
+ mgmtObject->inc_tplTxnPrepares();
+ }
} catch (const std::exception& e) {
QPID_LOG(error, "Error preparing xid " << ctxt->getXid() << ": " << e.what());
throw;
@@ -1450,10 +1471,10 @@
return dir.str();
}
-string BdbMessageStore::getPxidBaseDir()
+string BdbMessageStore::getTplBaseDir()
{
std::stringstream dir;
- dir << storeDir << "/rhm/pxid/" ;
+ dir << storeDir << "/rhm/tpl/" ;
return dir.str();
}
Modified: store/trunk/cpp/lib/BdbMessageStore.h
===================================================================
--- store/trunk/cpp/lib/BdbMessageStore.h 2008-07-18 13:27:59 UTC (rev 2206)
+++ store/trunk/cpp/lib/BdbMessageStore.h 2008-07-18 13:30:50 UTC (rev 2207)
@@ -61,17 +61,17 @@
typedef LockedMappings::map txn_lock_map;
typedef boost::ptr_list<PreparedTransaction> txn_list;
- // Structs for preparedXidStore recover state
- struct PreparedRecoverStruct {
+ // Structs for Transaction Recover List (TPL) recover state
+ struct TplRecoverStruct {
u_int64_t rid;
bool deq_flag;
bool commit_flag;
- PreparedRecoverStruct(const u_int64_t _rid, const bool _deq_flag, const bool _commit_flag);
+ TplRecoverStruct(const u_int64_t _rid, const bool _deq_flag, const bool _commit_flag);
};
- typedef PreparedRecoverStruct PreparedRecover;
- typedef std::map<std::string, PreparedRecover> PreparedRecoverMap;
- typedef PreparedRecoverMap::const_iterator PreparedRecoverMapCitr;
- typedef std::pair<std::string, PreparedRecoverStruct> PreparedRecoverMapPair;
+ typedef TplRecoverStruct TplRecover;
+ typedef std::pair<std::string, TplRecover> TplRecoverMapPair;
+ typedef std::map<std::string, TplRecover> TplRecoverMap;
+ typedef TplRecoverMap::const_iterator TplRecoverMapCitr;
// Default store settings
static const u_int16_t defNumJrnlFiles = 8;
@@ -91,9 +91,9 @@
Db bindingDb;
Db generalDb;
- // Pointer to prepared XID journal instance
- boost::shared_ptr<JournalImpl> preparedXidStorePtr;
- PreparedRecoverMap preparedXidStoreRecoverMap;
+ // Pointer to Transaction Prepared List (TPL) journal instance
+ boost::shared_ptr<JournalImpl> tplStorePtr;
+ TplRecoverMap tplRecoverMap;
IdSequence queueIdSequence;
IdSequence exchangeIdSequence;
@@ -157,11 +157,9 @@
queue_index& index,
txn_list& locked,
message_index& prepared);
- void recoverPreparedXidJournal();
- void getPreparedXidMap(PreparedRecoverMap& prepXidMap);
- void recoverXids(txn_list& txns);
- void readLockedMappings(Db& db,
- txn_lock_map& mappings);
+ void recoverTplStore();
+ void getTplRecoverMap(TplRecoverMap& tplMap);
+ void recoverLockedMappings(txn_list& txns);
TxnCtxt* check(qpid::broker::TransactionContext* ctxt);
void store(const qpid::broker::PersistableQueue* queue,
TxnCtxt* txn,
@@ -218,12 +216,12 @@
std::string getJrnlDir(const char* queueName);
std::string getJrnlBaseDir();
std::string getBdbBaseDir();
- std::string getPxidBaseDir();
+ std::string getTplBaseDir();
inline void checkInit() {
// TODO: change the default dir to ~/.qpidd
if (!isInit) init("/tmp", defNumJrnlFiles, defJrnlFileSizePgs, defWCachePageSize); isInit = true;
}
- void chkInitPreparedXidStore();
+ void chkTplStoreInit();
// debug aid for printing XIDs that may contain non-printable chars
static std::string xid2str(const std::string xid) {
Modified: store/trunk/cpp/lib/JournalImpl.cpp
===================================================================
--- store/trunk/cpp/lib/JournalImpl.cpp 2008-07-18 13:27:59 UTC (rev 2206)
+++ store/trunk/cpp/lib/JournalImpl.cpp 2008-07-18 13:30:50 UTC (rev 2207)
@@ -74,7 +74,6 @@
journalTimerPtr->add(inactivityFireEventPtr);
ManagementAgent* agent = ManagementAgent::Singleton::getInstance();
-
if (agent != 0)
{
_mgmtObject = new qpid::management::Journal(agent, (qpid::management::Manageable*) this);
@@ -84,6 +83,13 @@
_mgmtObject->set_baseFileName(journalBaseFilename);
_mgmtObject->set_readPageSize(JRNL_RMGR_PAGE_SIZE * JRNL_SBLK_SIZE * JRNL_DBLK_SIZE);
_mgmtObject->set_readPages(JRNL_RMGR_PAGES);
+
+ // The following will be set on initialize(), but being properties, these must be set to 0 in the meantime
+ _mgmtObject->set_initialFileCount(0);
+ _mgmtObject->set_dataFileSize(0);
+ _mgmtObject->set_currentFileCount(0);
+ _mgmtObject->set_writePageSize(0);
+ _mgmtObject->set_writePages(0);
agent->addObject(_mgmtObject);
}
@@ -163,6 +169,15 @@
oss1 << " wcache_num_pages=" << wcache_num_pages;
log(LOG_DEBUG, oss1.str());
+ if (_mgmtObject != 0)
+ {
+ _mgmtObject->set_initialFileCount(_num_jfiles);
+ _mgmtObject->set_dataFileSize(_jfsize_sblks * JRNL_SBLK_SIZE * JRNL_DBLK_SIZE);
+ _mgmtObject->set_currentFileCount(_num_jfiles);
+ _mgmtObject->set_writePageSize(wcache_pgsize_sblks * JRNL_SBLK_SIZE * JRNL_DBLK_SIZE);
+ _mgmtObject->set_writePages(wcache_num_pages);
+ }
+
if (prep_tx_list_ptr) {
// Create list of prepared xids
std::vector<std::string> prep_xid_list;
@@ -206,12 +221,10 @@
if (_mgmtObject != 0)
{
- _mgmtObject->set_initialFileCount(_num_jfiles);
- _mgmtObject->set_dataFileSize(_jfsize_sblks * JRNL_SBLK_SIZE * JRNL_DBLK_SIZE);
- _mgmtObject->set_currentFileCount(_num_jfiles);
_mgmtObject->inc_recordDepth(_emap.size());
- _mgmtObject->set_writePageSize(wcache_pgsize_sblks * JRNL_SBLK_SIZE * JRNL_DBLK_SIZE);
- _mgmtObject->set_writePages(wcache_num_pages);
+ _mgmtObject->inc_enqueues(_emap.size());
+ _mgmtObject->inc_txnEnqueues(_tmap.enq_cnt());
+ _mgmtObject->inc_txnDequeues(_tmap.deq_cnt());
}
}
@@ -301,7 +314,7 @@
if (_mgmtObject != 0)
{
- _mgmtObject->inc_recordEnqueues();
+ _mgmtObject->inc_enqueues();
_mgmtObject->inc_recordDepth();
}
}
@@ -314,7 +327,7 @@
if (_mgmtObject != 0)
{
- _mgmtObject->inc_recordEnqueues();
+ _mgmtObject->inc_enqueues();
_mgmtObject->inc_recordDepth();
}
}
@@ -327,7 +340,8 @@
if (_mgmtObject != 0)
{
- _mgmtObject->inc_recordEnqueues();
+ _mgmtObject->inc_enqueues();
+ _mgmtObject->inc_txnEnqueues();
_mgmtObject->inc_recordDepth();
}
}
@@ -340,7 +354,8 @@
if (_mgmtObject != 0)
{
- _mgmtObject->inc_recordEnqueues();
+ _mgmtObject->inc_enqueues();
+ _mgmtObject->inc_txnEnqueues();
_mgmtObject->inc_recordDepth();
}
}
@@ -352,7 +367,8 @@
if (_mgmtObject != 0)
{
- _mgmtObject->inc_recordDequeues();
+ _mgmtObject->inc_dequeues();
+ _mgmtObject->inc_txnDequeues();
_mgmtObject->dec_recordDepth();
}
}
@@ -364,7 +380,8 @@
if (_mgmtObject != 0)
{
- _mgmtObject->inc_recordDequeues();
+ _mgmtObject->inc_dequeues();
+ _mgmtObject->inc_txnDequeues();
_mgmtObject->dec_recordDepth();
}
}
@@ -373,12 +390,18 @@
JournalImpl::txn_abort(data_tok* const dtokp, const std::string& xid)
{
handleIoResult(jcntl::txn_abort(dtokp, xid));
+
+ if (_mgmtObject != 0)
+ _mgmtObject->inc_txnAborts();
}
void
JournalImpl::txn_commit(data_tok* const dtokp, const std::string& xid)
{
handleIoResult(jcntl::txn_commit(dtokp, xid));
+
+ if (_mgmtObject != 0)
+ _mgmtObject->inc_txnCommits();
}
void
Modified: store/trunk/cpp/lib/gen/qpid/management/Journal.cpp
===================================================================
--- store/trunk/cpp/lib/gen/qpid/management/Journal.cpp 2008-07-18 13:27:59 UTC (rev 2206)
+++ store/trunk/cpp/lib/gen/qpid/management/Journal.cpp 2008-07-18 13:30:50 UTC (rev 2207)
@@ -26,7 +26,7 @@
#include "qpid/management/Manageable.h"
#include "qpid/agent/ManagementAgent.h"
#include "Journal.h"
-#include "qpid/management/ArgsJournalExpand.h"
+#include "ArgsJournalExpand.h"
using namespace qpid::management;
@@ -37,15 +37,12 @@
string Journal::packageName = string ("mrgstore");
string Journal::className = string ("journal");
uint8_t Journal::md5Sum[16] =
- {0x0,0xdf,0x9a,0xf7,0x4,0x98,0x29,0x54,0xde,0x42,0xc5,0xf5,0xf5,0x13,0xab,0xa5};
+ {0x1e,0x63,0xa4,0x3d,0xa3,0x1b,0xc0,0x1,0x1,0x70,0x5a,0x2a,0xb4,0xa,0x1b,0x4e};
Journal::Journal (ManagementAgent* _agent, Manageable* _core) :
ManagementObject(_agent, _core)
{
- initialFileCount = 0;
- dataFileSize = 0;
- currentFileCount = 0;
recordDepth = 0;
recordDepthHigh = 0;
recordDepthLow = 0;
@@ -107,24 +104,24 @@
buf.putShortString (packageName); // Package Name
buf.putShortString (className); // Class Name
buf.putBin128 (md5Sum); // Schema Hash
- buf.putShort (8); // Config Element Count
- buf.putShort (27); // Inst Element Count
+ buf.putShort (11); // Config Element Count
+ buf.putShort (28); // Inst Element Count
buf.putShort (1); // Method Count
buf.putShort (0); // Event Count
// Properties
ft = FieldTable ();
- ft.setString (NAME, "name");
- ft.setInt (TYPE, TYPE_SSTR);
+ ft.setString (NAME, "queueRef");
+ ft.setInt (TYPE, TYPE_REF);
ft.setInt (ACCESS, ACCESS_RO);
- ft.setInt (INDEX, 1);
+ ft.setInt (INDEX, 0);
buf.put (ft);
ft = FieldTable ();
- ft.setString (NAME, "queueRef");
- ft.setInt (TYPE, TYPE_REF);
+ ft.setString (NAME, "name");
+ ft.setInt (TYPE, TYPE_SSTR);
ft.setInt (ACCESS, ACCESS_RO);
- ft.setInt (INDEX, 0);
+ ft.setInt (INDEX, 1);
buf.put (ft);
ft = FieldTable ();
@@ -179,11 +176,11 @@
ft.setString (DESC, "Number of pages in read-page-cache");
buf.put (ft);
-
- // Statistics
ft = FieldTable ();
ft.setString (NAME, "initialFileCount");
ft.setInt (TYPE, TYPE_U16);
+ ft.setInt (ACCESS, ACCESS_RO);
+ ft.setInt (INDEX, 0);
ft.setString (UNIT, "file");
ft.setString (DESC, "Number of files initially allocated to this journal");
buf.put (ft);
@@ -191,6 +188,8 @@
ft = FieldTable ();
ft.setString (NAME, "dataFileSize");
ft.setInt (TYPE, TYPE_U32);
+ ft.setInt (ACCESS, ACCESS_RO);
+ ft.setInt (INDEX, 0);
ft.setString (UNIT, "byte");
ft.setString (DESC, "Size of each journal data file");
buf.put (ft);
@@ -198,64 +197,96 @@
ft = FieldTable ();
ft.setString (NAME, "currentFileCount");
ft.setInt (TYPE, TYPE_U32);
+ ft.setInt (ACCESS, ACCESS_RO);
+ ft.setInt (INDEX, 0);
ft.setString (UNIT, "file");
ft.setString (DESC, "Number of files currently allocated to this journal");
buf.put (ft);
+
+ // Statistics
ft = FieldTable ();
ft.setString (NAME, "recordDepth");
ft.setInt (TYPE, TYPE_U32);
ft.setString (UNIT, "record");
- ft.setString (DESC, "Number of enqueued records (durable messages)");
+ ft.setString (DESC, "Number of currently enqueued records (durable messages)");
buf.put (ft);
ft = FieldTable ();
ft.setString (NAME, "recordDepthHigh");
ft.setInt (TYPE, TYPE_U32);
ft.setString (UNIT, "record");
- ft.setString (DESC, "Number of enqueued records (durable messages) (High)");
+ ft.setString (DESC, "Number of currently enqueued records (durable messages) (High)");
buf.put (ft);
ft = FieldTable ();
ft.setString (NAME, "recordDepthLow");
ft.setInt (TYPE, TYPE_U32);
ft.setString (UNIT, "record");
- ft.setString (DESC, "Number of enqueued records (durable messages) (Low)");
+ ft.setString (DESC, "Number of currently enqueued records (durable messages) (Low)");
buf.put (ft);
ft = FieldTable ();
- ft.setString (NAME, "recordEnqueues");
+ ft.setString (NAME, "enqueues");
ft.setInt (TYPE, TYPE_U64);
ft.setString (UNIT, "record");
ft.setString (DESC, "Total enqueued records on journal");
buf.put (ft);
ft = FieldTable ();
- ft.setString (NAME, "recordDequeues");
+ ft.setString (NAME, "dequeues");
ft.setInt (TYPE, TYPE_U64);
ft.setString (UNIT, "record");
ft.setString (DESC, "Total dequeued records on journal");
buf.put (ft);
ft = FieldTable ();
+ ft.setString (NAME, "txnEnqueues");
+ ft.setInt (TYPE, TYPE_U64);
+ ft.setString (UNIT, "record");
+ ft.setString (DESC, "Total transactional enqueued records on journal");
+ buf.put (ft);
+
+ ft = FieldTable ();
+ ft.setString (NAME, "txnDequeues");
+ ft.setInt (TYPE, TYPE_U64);
+ ft.setString (UNIT, "record");
+ ft.setString (DESC, "Total transactional dequeued records on journal");
+ buf.put (ft);
+
+ ft = FieldTable ();
+ ft.setString (NAME, "txnCommits");
+ ft.setInt (TYPE, TYPE_U64);
+ ft.setString (UNIT, "record");
+ ft.setString (DESC, "Total transactional commit records on journal");
+ buf.put (ft);
+
+ ft = FieldTable ();
+ ft.setString (NAME, "txnAborts");
+ ft.setInt (TYPE, TYPE_U64);
+ ft.setString (UNIT, "record");
+ ft.setString (DESC, "Total transactional abort records on journal");
+ buf.put (ft);
+
+ ft = FieldTable ();
ft.setString (NAME, "outstandingAIOs");
ft.setInt (TYPE, TYPE_U32);
ft.setString (UNIT, "aio_op");
- ft.setString (DESC, "Number of outstanding AIO requests in Async IO system");
+ ft.setString (DESC, "Number of currently outstanding AIO requests in Async IO system");
buf.put (ft);
ft = FieldTable ();
ft.setString (NAME, "outstandingAIOsHigh");
ft.setInt (TYPE, TYPE_U32);
ft.setString (UNIT, "aio_op");
- ft.setString (DESC, "Number of outstanding AIO requests in Async IO system (High)");
+ ft.setString (DESC, "Number of currently outstanding AIO requests in Async IO system (High)");
buf.put (ft);
ft = FieldTable ();
ft.setString (NAME, "outstandingAIOsLow");
ft.setInt (TYPE, TYPE_U32);
ft.setString (UNIT, "aio_op");
- ft.setString (DESC, "Number of outstanding AIO requests in Async IO system (Low)");
+ ft.setString (DESC, "Number of currently outstanding AIO requests in Async IO system (Low)");
buf.put (ft);
ft = FieldTable ();
@@ -393,8 +424,12 @@
void Journal::aggregatePerThreadStats(struct PerThreadStats* totals)
{
- totals->recordEnqueues = 0;
- totals->recordDequeues = 0;
+ totals->enqueues = 0;
+ totals->dequeues = 0;
+ totals->txnEnqueues = 0;
+ totals->txnDequeues = 0;
+ totals->txnCommits = 0;
+ totals->txnAborts = 0;
totals->writeWaitFailures = 0;
totals->writeBusyFailures = 0;
totals->readRecordCount = 0;
@@ -403,8 +438,12 @@
for (int idx = 0; idx < maxThreads; idx++) {
struct PerThreadStats* threadStats = perThreadStatsArray[idx];
if (threadStats != 0) {
- totals->recordEnqueues += threadStats->recordEnqueues;
- totals->recordDequeues += threadStats->recordDequeues;
+ totals->enqueues += threadStats->enqueues;
+ totals->dequeues += threadStats->dequeues;
+ totals->txnEnqueues += threadStats->txnEnqueues;
+ totals->txnDequeues += threadStats->txnDequeues;
+ totals->txnCommits += threadStats->txnCommits;
+ totals->txnAborts += threadStats->txnAborts;
totals->writeWaitFailures += threadStats->writeWaitFailures;
totals->writeBusyFailures += threadStats->writeBusyFailures;
totals->readRecordCount += threadStats->readRecordCount;
@@ -421,14 +460,17 @@
configChanged = false;
writeTimestamps (buf);
+ buf.putLongLong (queueRef);
buf.putShortString (name);
- buf.putLongLong (queueRef);
buf.putShortString (directory);
buf.putShortString (baseFileName);
buf.putLong (writePageSize);
buf.putLong (writePages);
buf.putLong (readPageSize);
buf.putLong (readPages);
+ buf.putShort (initialFileCount);
+ buf.putLong (dataFileSize);
+ buf.putLong (currentFileCount);
}
@@ -444,14 +486,15 @@
if (!skipHeaders)
writeTimestamps (buf);
- buf.putShort (initialFileCount);
- buf.putLong (dataFileSize);
- buf.putLong (currentFileCount);
buf.putLong (recordDepth);
buf.putLong (recordDepthHigh);
buf.putLong (recordDepthLow);
- buf.putLongLong (totals.recordEnqueues);
- buf.putLongLong (totals.recordDequeues);
+ buf.putLongLong (totals.enqueues);
+ buf.putLongLong (totals.dequeues);
+ buf.putLongLong (totals.txnEnqueues);
+ buf.putLongLong (totals.txnDequeues);
+ buf.putLongLong (totals.txnCommits);
+ buf.putLongLong (totals.txnAborts);
buf.putLong (outstandingAIOs);
buf.putLong (outstandingAIOsHigh);
buf.putLong (outstandingAIOsLow);
Modified: store/trunk/cpp/lib/gen/qpid/management/Journal.h
===================================================================
--- store/trunk/cpp/lib/gen/qpid/management/Journal.h 2008-07-18 13:27:59 UTC (rev 2206)
+++ store/trunk/cpp/lib/gen/qpid/management/Journal.h 2008-07-18 13:30:50 UTC (rev 2207)
@@ -40,19 +40,19 @@
static uint8_t md5Sum[16];
// Properties
+ uint64_t queueRef;
std::string name;
- uint64_t queueRef;
std::string directory;
std::string baseFileName;
uint32_t writePageSize;
uint32_t writePages;
uint32_t readPageSize;
uint32_t readPages;
+ uint16_t initialFileCount;
+ uint32_t dataFileSize;
+ uint32_t currentFileCount;
// Statistics
- uint16_t initialFileCount;
- uint32_t dataFileSize;
- uint32_t currentFileCount;
uint32_t recordDepth;
uint32_t recordDepthHigh;
uint32_t recordDepthLow;
@@ -75,8 +75,12 @@
// Per-Thread Statistics
struct PerThreadStats {
- uint64_t recordEnqueues;
- uint64_t recordDequeues;
+ uint64_t enqueues;
+ uint64_t dequeues;
+ uint64_t txnEnqueues;
+ uint64_t txnDequeues;
+ uint64_t txnCommits;
+ uint64_t txnAborts;
uint64_t writeWaitFailures;
uint64_t writeBusyFailures;
uint64_t readRecordCount;
@@ -92,8 +96,12 @@
if (threadStats == 0) {
threadStats = new(PerThreadStats);
perThreadStatsArray[index] = threadStats;
- threadStats->recordEnqueues = 0;
- threadStats->recordDequeues = 0;
+ threadStats->enqueues = 0;
+ threadStats->dequeues = 0;
+ threadStats->txnEnqueues = 0;
+ threadStats->txnDequeues = 0;
+ threadStats->txnCommits = 0;
+ threadStats->txnAborts = 0;
threadStats->writeWaitFailures = 0;
threadStats->writeBusyFailures = 0;
threadStats->readRecordCount = 0;
@@ -134,14 +142,14 @@
static const uint32_t METHOD_EXPAND = 1;
// Accessor Methods
- inline void set_name (std::string val){
+ inline void set_queueRef (uint64_t val){
sys::Mutex::ScopedLock mutex(accessLock);
- name = val;
+ queueRef = val;
configChanged = true;
}
- inline void set_queueRef (uint64_t val){
+ inline void set_name (std::string val){
sys::Mutex::ScopedLock mutex(accessLock);
- queueRef = val;
+ name = val;
configChanged = true;
}
inline void set_directory (std::string val){
@@ -177,17 +185,17 @@
inline void set_initialFileCount (uint16_t val){
sys::Mutex::ScopedLock mutex(accessLock);
initialFileCount = val;
- instChanged = true;
+ configChanged = true;
}
inline void set_dataFileSize (uint32_t val){
sys::Mutex::ScopedLock mutex(accessLock);
dataFileSize = val;
- instChanged = true;
+ configChanged = true;
}
inline void set_currentFileCount (uint32_t val){
sys::Mutex::ScopedLock mutex(accessLock);
currentFileCount = val;
- instChanged = true;
+ configChanged = true;
}
inline void inc_recordDepth (uint32_t by = 1){
sys::Mutex::ScopedLock mutex(accessLock);
@@ -203,22 +211,54 @@
recordDepthLow = recordDepth;
instChanged = true;
}
- inline void inc_recordEnqueues (uint64_t by = 1){
- getThreadStats()->recordEnqueues += by;
+ inline void inc_enqueues (uint64_t by = 1){
+ getThreadStats()->enqueues += by;
instChanged = true;
}
- inline void dec_recordEnqueues (uint64_t by = 1){
- getThreadStats()->recordEnqueues -= by;
+ inline void dec_enqueues (uint64_t by = 1){
+ getThreadStats()->enqueues -= by;
instChanged = true;
}
- inline void inc_recordDequeues (uint64_t by = 1){
- getThreadStats()->recordDequeues += by;
+ inline void inc_dequeues (uint64_t by = 1){
+ getThreadStats()->dequeues += by;
instChanged = true;
}
- inline void dec_recordDequeues (uint64_t by = 1){
- getThreadStats()->recordDequeues -= by;
+ inline void dec_dequeues (uint64_t by = 1){
+ getThreadStats()->dequeues -= by;
instChanged = true;
}
+ inline void inc_txnEnqueues (uint64_t by = 1){
+ getThreadStats()->txnEnqueues += by;
+ instChanged = true;
+ }
+ inline void dec_txnEnqueues (uint64_t by = 1){
+ getThreadStats()->txnEnqueues -= by;
+ instChanged = true;
+ }
+ inline void inc_txnDequeues (uint64_t by = 1){
+ getThreadStats()->txnDequeues += by;
+ instChanged = true;
+ }
+ inline void dec_txnDequeues (uint64_t by = 1){
+ getThreadStats()->txnDequeues -= by;
+ instChanged = true;
+ }
+ inline void inc_txnCommits (uint64_t by = 1){
+ getThreadStats()->txnCommits += by;
+ instChanged = true;
+ }
+ inline void dec_txnCommits (uint64_t by = 1){
+ getThreadStats()->txnCommits -= by;
+ instChanged = true;
+ }
+ inline void inc_txnAborts (uint64_t by = 1){
+ getThreadStats()->txnAborts += by;
+ instChanged = true;
+ }
+ inline void dec_txnAborts (uint64_t by = 1){
+ getThreadStats()->txnAborts -= by;
+ instChanged = true;
+ }
inline void inc_outstandingAIOs (uint32_t by = 1){
sys::Mutex::ScopedLock mutex(accessLock);
outstandingAIOs += by;
Modified: store/trunk/cpp/lib/gen/qpid/management/PackageMrgstore.cpp
===================================================================
--- store/trunk/cpp/lib/gen/qpid/management/PackageMrgstore.cpp 2008-07-18 13:27:59 UTC (rev 2206)
+++ store/trunk/cpp/lib/gen/qpid/management/PackageMrgstore.cpp 2008-07-18 13:30:50 UTC (rev 2207)
@@ -21,9 +21,9 @@
// This source file was created by a code generator.
// Please do not edit.
-#include "qpid/management/PackageMrgstore.h"
-#include "qpid/management/Store.h"
-#include "qpid/management/Journal.h"
+#include "PackageMrgstore.h"
+#include "Store.h"
+#include "Journal.h"
using namespace qpid::management;
Modified: store/trunk/cpp/lib/gen/qpid/management/Store.cpp
===================================================================
--- store/trunk/cpp/lib/gen/qpid/management/Store.cpp 2008-07-18 13:27:59 UTC (rev 2206)
+++ store/trunk/cpp/lib/gen/qpid/management/Store.cpp 2008-07-18 13:30:50 UTC (rev 2207)
@@ -36,19 +36,35 @@
string Store::packageName = string ("mrgstore");
string Store::className = string ("store");
uint8_t Store::md5Sum[16] =
- {0x63,0xc5,0x1a,0x81,0x18,0x8a,0x8d,0x9b,0x3e,0x96,0xf7,0x6d,0x3b,0xd0,0x51,0x14};
+ {0x18,0xd,0xd4,0x15,0xd3,0x9a,0xf,0xbe,0x3a,0x40,0xe1,0x1b,0x9e,0x5b,0x7e,0x86};
Store::Store (ManagementAgent* _agent, Manageable* _core, Manageable* _parent) :
ManagementObject(_agent, _core)
{
brokerRef = _parent->GetManagementObject ()->getObjectId ();
+ tplTransactionDepth = 0;
+ tplTransactionDepthHigh = 0;
+ tplTransactionDepthLow = 0;
+ tplOutstandingAIOs = 0;
+ tplOutstandingAIOsHigh = 0;
+ tplOutstandingAIOsLow = 0;
+ maxThreads = agent->getMaxThreads();
+ perThreadStatsArray = new struct PerThreadStats*[maxThreads];
+ for (int idx = 0; idx < maxThreads; idx++)
+ perThreadStatsArray[idx] = 0;
+
}
Store::~Store ()
{
+ for (int idx = 0; idx < maxThreads; idx++)
+ if (perThreadStatsArray[idx] != 0)
+ delete perThreadStatsArray[idx];
+ delete[] perThreadStatsArray;
+
}
namespace {
@@ -75,8 +91,8 @@
buf.putShortString (packageName); // Package Name
buf.putShortString (className); // Class Name
buf.putBin128 (md5Sum); // Schema Hash
- buf.putShort (4); // Config Element Count
- buf.putShort (0); // Inst Element Count
+ buf.putShort (11); // Config Element Count
+ buf.putShort (9); // Inst Element Count
buf.putShort (0); // Method Count
buf.putShort (0); // Event Count
@@ -114,9 +130,133 @@
ft.setString (DESC, "Default size of each journal data file");
buf.put (ft);
+ ft = FieldTable ();
+ ft.setString (NAME, "tplIsInitialized");
+ ft.setInt (TYPE, TYPE_BOOL);
+ ft.setInt (ACCESS, ACCESS_RO);
+ ft.setInt (INDEX, 0);
+ ft.setString (DESC, "Transaction prepared list has been initialized by a transactional prepare");
+ buf.put (ft);
+ ft = FieldTable ();
+ ft.setString (NAME, "tplDirectory");
+ ft.setInt (TYPE, TYPE_SSTR);
+ ft.setInt (ACCESS, ACCESS_RO);
+ ft.setInt (INDEX, 0);
+ ft.setString (DESC, "Transaction prepared list directory");
+ buf.put (ft);
+
+ ft = FieldTable ();
+ ft.setString (NAME, "tplWritePageSize");
+ ft.setInt (TYPE, TYPE_U32);
+ ft.setInt (ACCESS, ACCESS_RO);
+ ft.setInt (INDEX, 0);
+ ft.setString (UNIT, "byte");
+ ft.setString (DESC, "Page size in transaction prepared list write-page-cache");
+ buf.put (ft);
+
+ ft = FieldTable ();
+ ft.setString (NAME, "tplWritePages");
+ ft.setInt (TYPE, TYPE_U32);
+ ft.setInt (ACCESS, ACCESS_RO);
+ ft.setInt (INDEX, 0);
+ ft.setString (UNIT, "wpage");
+ ft.setString (DESC, "Number of pages in transaction prepared list write-page-cache");
+ buf.put (ft);
+
+ ft = FieldTable ();
+ ft.setString (NAME, "tplInitialFileCount");
+ ft.setInt (TYPE, TYPE_U16);
+ ft.setInt (ACCESS, ACCESS_RO);
+ ft.setInt (INDEX, 0);
+ ft.setString (UNIT, "file");
+ ft.setString (DESC, "Number of files initially allocated to transaction prepared list journal");
+ buf.put (ft);
+
+ ft = FieldTable ();
+ ft.setString (NAME, "tplDataFileSize");
+ ft.setInt (TYPE, TYPE_U32);
+ ft.setInt (ACCESS, ACCESS_RO);
+ ft.setInt (INDEX, 0);
+ ft.setString (UNIT, "byte");
+ ft.setString (DESC, "Size of each journal data file in transaction prepared list journal");
+ buf.put (ft);
+
+ ft = FieldTable ();
+ ft.setString (NAME, "tplCurrentFileCount");
+ ft.setInt (TYPE, TYPE_U32);
+ ft.setInt (ACCESS, ACCESS_RO);
+ ft.setInt (INDEX, 0);
+ ft.setString (UNIT, "file");
+ ft.setString (DESC, "Number of files currently allocated to transaction prepared list journal");
+ buf.put (ft);
+
+
// Statistics
+ ft = FieldTable ();
+ ft.setString (NAME, "tplTransactionDepth");
+ ft.setInt (TYPE, TYPE_U32);
+ ft.setString (UNIT, "txn");
+ ft.setString (DESC, "Number of currently enqueued prepared transactions");
+ buf.put (ft);
+ ft = FieldTable ();
+ ft.setString (NAME, "tplTransactionDepthHigh");
+ ft.setInt (TYPE, TYPE_U32);
+ ft.setString (UNIT, "txn");
+ ft.setString (DESC, "Number of currently enqueued prepared transactions (High)");
+ buf.put (ft);
+
+ ft = FieldTable ();
+ ft.setString (NAME, "tplTransactionDepthLow");
+ ft.setInt (TYPE, TYPE_U32);
+ ft.setString (UNIT, "txn");
+ ft.setString (DESC, "Number of currently enqueued prepared transactions (Low)");
+ buf.put (ft);
+
+ ft = FieldTable ();
+ ft.setString (NAME, "tplTxnPrepares");
+ ft.setInt (TYPE, TYPE_U64);
+ ft.setString (UNIT, "record");
+ ft.setString (DESC, "Total transaction prepares on transaction prepared list");
+ buf.put (ft);
+
+ ft = FieldTable ();
+ ft.setString (NAME, "tplTxnCommits");
+ ft.setInt (TYPE, TYPE_U64);
+ ft.setString (UNIT, "record");
+ ft.setString (DESC, "Total transaction commits on transaction prepared list");
+ buf.put (ft);
+
+ ft = FieldTable ();
+ ft.setString (NAME, "tplTxnAborts");
+ ft.setInt (TYPE, TYPE_U64);
+ ft.setString (UNIT, "record");
+ ft.setString (DESC, "Total transaction aborts on transaction prepared list");
+ buf.put (ft);
+
+ ft = FieldTable ();
+ ft.setString (NAME, "tplOutstandingAIOs");
+ ft.setInt (TYPE, TYPE_U32);
+ ft.setString (UNIT, "aio_op");
+ ft.setString (DESC, "Number of currently outstanding AIO requests in Async IO system");
+ buf.put (ft);
+
+ ft = FieldTable ();
+ ft.setString (NAME, "tplOutstandingAIOsHigh");
+ ft.setInt (TYPE, TYPE_U32);
+ ft.setString (UNIT, "aio_op");
+ ft.setString (DESC, "Number of currently outstanding AIO requests in Async IO system (High)");
+ buf.put (ft);
+
+ ft = FieldTable ();
+ ft.setString (NAME, "tplOutstandingAIOsLow");
+ ft.setInt (TYPE, TYPE_U32);
+ ft.setString (UNIT, "aio_op");
+ ft.setString (DESC, "Number of currently outstanding AIO requests in Async IO system (Low)");
+ buf.put (ft);
+
+
// Methods
// Events
@@ -124,7 +264,24 @@
}
+void Store::aggregatePerThreadStats(struct PerThreadStats* totals)
+{
+ totals->tplTxnPrepares = 0;
+ totals->tplTxnCommits = 0;
+ totals->tplTxnAborts = 0;
+ for (int idx = 0; idx < maxThreads; idx++) {
+ struct PerThreadStats* threadStats = perThreadStatsArray[idx];
+ if (threadStats != 0) {
+ totals->tplTxnPrepares += threadStats->tplTxnPrepares;
+ totals->tplTxnCommits += threadStats->tplTxnCommits;
+ totals->tplTxnAborts += threadStats->tplTxnAborts;
+
+ }
+ }
+}
+
+
void Store::writeProperties (Buffer& buf)
{
sys::Mutex::ScopedLock mutex(accessLock);
@@ -135,6 +292,13 @@
buf.putShortString (location);
buf.putShort (defaultInitialFileCount);
buf.putLong (defaultDataFileSize);
+ buf.putOctet (tplIsInitialized?1:0);
+ buf.putShortString (tplDirectory);
+ buf.putLong (tplWritePageSize);
+ buf.putLong (tplWritePages);
+ buf.putShort (tplInitialFileCount);
+ buf.putLong (tplDataFileSize);
+ buf.putLong (tplCurrentFileCount);
}
@@ -144,12 +308,28 @@
instChanged = false;
+ struct PerThreadStats totals;
+ aggregatePerThreadStats(&totals);
+
if (!skipHeaders)
writeTimestamps (buf);
+ buf.putLong (tplTransactionDepth);
+ buf.putLong (tplTransactionDepthHigh);
+ buf.putLong (tplTransactionDepthLow);
+ buf.putLongLong (totals.tplTxnPrepares);
+ buf.putLongLong (totals.tplTxnCommits);
+ buf.putLongLong (totals.tplTxnAborts);
+ buf.putLong (tplOutstandingAIOs);
+ buf.putLong (tplOutstandingAIOsHigh);
+ buf.putLong (tplOutstandingAIOsLow);
// Maintenance of hi-lo statistics
+ tplTransactionDepthHigh = tplTransactionDepth;
+ tplTransactionDepthLow = tplTransactionDepth;
+ tplOutstandingAIOsHigh = tplOutstandingAIOs;
+ tplOutstandingAIOsLow = tplOutstandingAIOs;
}
Modified: store/trunk/cpp/lib/gen/qpid/management/Store.h
===================================================================
--- store/trunk/cpp/lib/gen/qpid/management/Store.h 2008-07-18 13:27:59 UTC (rev 2206)
+++ store/trunk/cpp/lib/gen/qpid/management/Store.h 2008-07-18 13:30:50 UTC (rev 2207)
@@ -44,10 +44,49 @@
std::string location;
uint16_t defaultInitialFileCount;
uint32_t defaultDataFileSize;
+ uint8_t tplIsInitialized;
+ std::string tplDirectory;
+ uint32_t tplWritePageSize;
+ uint32_t tplWritePages;
+ uint16_t tplInitialFileCount;
+ uint32_t tplDataFileSize;
+ uint32_t tplCurrentFileCount;
// Statistics
+ uint32_t tplTransactionDepth;
+ uint32_t tplTransactionDepthHigh;
+ uint32_t tplTransactionDepthLow;
+ uint32_t tplOutstandingAIOs;
+ uint32_t tplOutstandingAIOsHigh;
+ uint32_t tplOutstandingAIOsLow;
+ // Per-Thread Statistics
+ struct PerThreadStats {
+ uint64_t tplTxnPrepares;
+ uint64_t tplTxnCommits;
+ uint64_t tplTxnAborts;
+
+ };
+
+ struct PerThreadStats** perThreadStatsArray;
+
+ inline struct PerThreadStats* getThreadStats() {
+ int index = getThreadIndex();
+ struct PerThreadStats* threadStats = perThreadStatsArray[index];
+ if (threadStats == 0) {
+ threadStats = new(PerThreadStats);
+ perThreadStatsArray[index] = threadStats;
+ threadStats->tplTxnPrepares = 0;
+ threadStats->tplTxnCommits = 0;
+ threadStats->tplTxnAborts = 0;
+
+ }
+ return threadStats;
+ }
+
+ void aggregatePerThreadStats(struct PerThreadStats*);
+
// Private Methods
static void writeSchema (qpid::framing::Buffer& buf);
void writeProperties (qpid::framing::Buffer& buf);
@@ -58,9 +97,6 @@
qpid::framing::Buffer& outBuf);
writeSchemaCall_t getWriteSchemaCall(void) { return writeSchema; }
- // Stub for getInstChanged. There are no statistics in this class.
- bool getInstChanged (void) { return false; }
-
public:
friend class PackageMrgstore;
@@ -98,6 +134,93 @@
defaultDataFileSize = val;
configChanged = true;
}
+ inline void set_tplIsInitialized (uint8_t val){
+ sys::Mutex::ScopedLock mutex(accessLock);
+ tplIsInitialized = val;
+ configChanged = true;
+ }
+ inline void set_tplDirectory (std::string val){
+ sys::Mutex::ScopedLock mutex(accessLock);
+ tplDirectory = val;
+ configChanged = true;
+ }
+ inline void set_tplWritePageSize (uint32_t val){
+ sys::Mutex::ScopedLock mutex(accessLock);
+ tplWritePageSize = val;
+ configChanged = true;
+ }
+ inline void set_tplWritePages (uint32_t val){
+ sys::Mutex::ScopedLock mutex(accessLock);
+ tplWritePages = val;
+ configChanged = true;
+ }
+ inline void set_tplInitialFileCount (uint16_t val){
+ sys::Mutex::ScopedLock mutex(accessLock);
+ tplInitialFileCount = val;
+ configChanged = true;
+ }
+ inline void set_tplDataFileSize (uint32_t val){
+ sys::Mutex::ScopedLock mutex(accessLock);
+ tplDataFileSize = val;
+ configChanged = true;
+ }
+ inline void set_tplCurrentFileCount (uint32_t val){
+ sys::Mutex::ScopedLock mutex(accessLock);
+ tplCurrentFileCount = val;
+ configChanged = true;
+ }
+ inline void inc_tplTransactionDepth (uint32_t by = 1){
+ sys::Mutex::ScopedLock mutex(accessLock);
+ tplTransactionDepth += by;
+ if (tplTransactionDepthHigh < tplTransactionDepth)
+ tplTransactionDepthHigh = tplTransactionDepth;
+ instChanged = true;
+ }
+ inline void dec_tplTransactionDepth (uint32_t by = 1){
+ sys::Mutex::ScopedLock mutex(accessLock);
+ tplTransactionDepth -= by;
+ if (tplTransactionDepthLow > tplTransactionDepth)
+ tplTransactionDepthLow = tplTransactionDepth;
+ instChanged = true;
+ }
+ inline void inc_tplTxnPrepares (uint64_t by = 1){
+ getThreadStats()->tplTxnPrepares += by;
+ instChanged = true;
+ }
+ inline void dec_tplTxnPrepares (uint64_t by = 1){
+ getThreadStats()->tplTxnPrepares -= by;
+ instChanged = true;
+ }
+ inline void inc_tplTxnCommits (uint64_t by = 1){
+ getThreadStats()->tplTxnCommits += by;
+ instChanged = true;
+ }
+ inline void dec_tplTxnCommits (uint64_t by = 1){
+ getThreadStats()->tplTxnCommits -= by;
+ instChanged = true;
+ }
+ inline void inc_tplTxnAborts (uint64_t by = 1){
+ getThreadStats()->tplTxnAborts += by;
+ instChanged = true;
+ }
+ inline void dec_tplTxnAborts (uint64_t by = 1){
+ getThreadStats()->tplTxnAborts -= by;
+ instChanged = true;
+ }
+ inline void inc_tplOutstandingAIOs (uint32_t by = 1){
+ sys::Mutex::ScopedLock mutex(accessLock);
+ tplOutstandingAIOs += by;
+ if (tplOutstandingAIOsHigh < tplOutstandingAIOs)
+ tplOutstandingAIOsHigh = tplOutstandingAIOs;
+ instChanged = true;
+ }
+ inline void dec_tplOutstandingAIOs (uint32_t by = 1){
+ sys::Mutex::ScopedLock mutex(accessLock);
+ tplOutstandingAIOs -= by;
+ if (tplOutstandingAIOsLow > tplOutstandingAIOs)
+ tplOutstandingAIOsLow = tplOutstandingAIOs;
+ instChanged = true;
+ }
};
Modified: store/trunk/cpp/lib/jrnl/txn_map.cpp
===================================================================
--- store/trunk/cpp/lib/jrnl/txn_map.cpp 2008-07-18 13:27:59 UTC (rev 2206)
+++ store/trunk/cpp/lib/jrnl/txn_map.cpp 2008-07-18 13:30:50 UTC (rev 2207)
@@ -146,6 +146,38 @@
return itr->second.size();
}
+u_int32_t
+txn_map::cnt(const bool enq_flag) // total records, across every xid in _map, whose _enq_flag equals enq_flag
+{
+ slock s(&_mutex); // lock _mutex for the scan (presumably scoped; released when s goes out of scope)
+ u_int32_t c = 0;
+ for (xmap_itr i = _map.begin(); i != _map.end(); i++) // each xid entry in the map
+ {
+ for (tdl_itr j = i->second.begin(); j < i->second.end(); j++) // each record in that xid's list
+ {
+ if (j->_enq_flag == enq_flag)
+ c++;
+ }
+ }
+ return c;
+}
+
+u_int32_t
+txn_map::cnt(const std::string& xid, const bool enq_flag) // records under one xid whose _enq_flag equals enq_flag
+{
+ slock s(&_mutex); // lock _mutex for the lookup and scan (presumably scoped; released on return)
+ u_int32_t c = 0;
+ xmap_itr i = _map.find(xid);
+ if (i == _map.end()) // not found in map
+ return 0; // an unknown xid is reported as a zero count, not as an error
+ for (tdl_itr j = i->second.begin(); j < i->second.end(); j++) // each record in this xid's list
+ {
+ if (j->_enq_flag == enq_flag)
+ c++;
+ }
+ return c;
+}
+
bool
txn_map::is_txn_synced(const std::string& xid)
{
Modified: store/trunk/cpp/lib/jrnl/txn_map.hpp
===================================================================
--- store/trunk/cpp/lib/jrnl/txn_map.hpp 2008-07-18 13:27:59 UTC (rev 2206)
+++ store/trunk/cpp/lib/jrnl/txn_map.hpp 2008-07-18 13:30:50 UTC (rev 2207)
@@ -132,6 +132,10 @@
const txn_data_list get_remove_tdata_list(const std::string& xid);
bool in_map(const std::string& xid);
u_int32_t get_rid_count(const std::string& xid);
+ inline u_int32_t enq_cnt() { return cnt(true); }
+ inline u_int32_t enq_cnt(const std::string& xid) { return cnt(xid, true); }
+ inline u_int32_t deq_cnt() { return cnt(false); } // all-xid count of records with _enq_flag == false (dequeues), consistent with deq_cnt(xid)
+ inline u_int32_t deq_cnt(const std::string& xid) { return cnt(xid, false); }
bool is_txn_synced(const std::string& xid);
bool set_aio_compl(const std::string& xid, const u_int64_t rid);
const txn_data& get_data(const std::string& xid, const u_int64_t rid);
@@ -140,6 +144,8 @@
inline u_int32_t size() const { return u_int32_t(_map.size()); }
void xid_list(std::vector<std::string>& xv);
private:
+ u_int32_t cnt(const bool enq_flag);
+ u_int32_t cnt(const std::string& xid, const bool enq_flag);
static std::string xid_format(const std::string& xid);
};
Modified: store/trunk/cpp/tests/TwoPhaseCommitTest.cpp
===================================================================
--- store/trunk/cpp/tests/TwoPhaseCommitTest.cpp 2008-07-18 13:27:59 UTC (rev 2206)
+++ store/trunk/cpp/tests/TwoPhaseCommitTest.cpp 2008-07-18 13:30:50 UTC (rev 2207)
@@ -189,7 +189,7 @@
return static_cast<JournalImpl*>(queue.getExternalQueueStore())->get_open_txn_cnt();
}
u_int32_t getRemainingPreparedListTxns() {
- return preparedXidStorePtr->get_open_txn_cnt();
+ return tplStorePtr->get_open_txn_cnt();
}
};
Modified: store/trunk/specs/management-schema.xml
===================================================================
--- store/trunk/specs/management-schema.xml 2008-07-18 13:27:59 UTC (rev 2206)
+++ store/trunk/specs/management-schema.xml 2008-07-18 13:30:50 UTC (rev 2207)
@@ -4,30 +4,47 @@
License Text
-->
<class name="Store">
- <property name="brokerRef" type="objId" references="qpid.Broker" access="RO" index="y" parentRef="y"/>
+ <property name="brokerRef" type="objId" access="RO" references="qpid.Broker" index="y" parentRef="y"/>
<property name="location" type="sstr" access="RO" desc="Logical directory on disk"/>
<property name="defaultInitialFileCount" type="uint16" access="RO" unit="file" desc="Default number of files initially allocated to each journal"/>
<property name="defaultDataFileSize" type="uint32" access="RO" unit="RdPg" desc="Default size of each journal data file"/>
+ <property name="tplIsInitialized" type="bool" access="RO" desc="Transaction prepared list has been initialized by a transactional prepare"/>
+ <property name="tplDirectory" type="sstr" access="RO" desc="Transaction prepared list directory"/>
+ <property name="tplWritePageSize" type="uint32" access="RO" unit="byte" desc="Page size in transaction prepared list write-page-cache"/>
+ <property name="tplWritePages" type="uint32" access="RO" unit="wpage" desc="Number of pages in transaction prepared list write-page-cache"/>
+ <property name="tplInitialFileCount" type="uint16" access="RO" unit="file" desc="Number of files initially allocated to transaction prepared list journal"/>
+ <property name="tplDataFileSize" type="uint32" access="RO" unit="byte" desc="Size of each journal data file in transaction prepared list journal"/>
+ <property name="tplCurrentFileCount" type="uint32" access="RO" unit="file" desc="Number of files currently allocated to transaction prepared list journal"/>
+
+ <statistic name="tplTransactionDepth" type="hilo32" unit="txn" desc="Number of currently enqueued prepared transactions"/>
+ <statistic name="tplTxnPrepares" type="count64" unit="record" desc="Total transaction prepares on transaction prepared list"/>
+ <statistic name="tplTxnCommits" type="count64" unit="record" desc="Total transaction commits on transaction prepared list"/>
+ <statistic name="tplTxnAborts" type="count64" unit="record" desc="Total transaction aborts on transaction prepared list"/>
+ <statistic name="tplOutstandingAIOs" type="hilo32" unit="aio_op" desc="Number of currently outstanding AIO requests in Async IO system"/>
</class>
<class name="Journal">
- <property name="name" type="sstr" access="RO" index="y"/>
- <property name="queueRef" type="objId" access="RO" references="qpid.Queue" isGeneralReference="y"/>
- <property name="directory" type="sstr" access="RO" desc="Directory containing journal files"/>
- <property name="baseFileName" type="sstr" access="RO" desc="Base filename prefix for journal"/>
- <property name="writePageSize" type="uint32" access="RO" unit="byte" desc="Page size in write-page-cache"/>
- <property name="writePages" type="uint32" access="RO" unit="wpage" desc="Number of pages in write-page-cache"/>
- <property name="readPageSize" type="uint32" access="RO" unit="byte" desc="Page size in read-page-cache"/>
- <property name="readPages" type="uint32" access="RO" unit="rpage" desc="Number of pages in read-page-cache"/>
+ <property name="queueRef" type="objId" access="RO" references="qpid.Queue" isGeneralReference="y"/>
+ <property name="name" type="sstr" access="RO" index="y"/>
+ <property name="directory" type="sstr" access="RO" desc="Directory containing journal files"/>
+ <property name="baseFileName" type="sstr" access="RO" desc="Base filename prefix for journal"/>
+ <property name="writePageSize" type="uint32" access="RO" unit="byte" desc="Page size in write-page-cache"/>
+ <property name="writePages" type="uint32" access="RO" unit="wpage" desc="Number of pages in write-page-cache"/>
+ <property name="readPageSize" type="uint32" access="RO" unit="byte" desc="Page size in read-page-cache"/>
+ <property name="readPages" type="uint32" access="RO" unit="rpage" desc="Number of pages in read-page-cache"/>
+ <property name="initialFileCount" type="uint16" access="RO" unit="file" desc="Number of files initially allocated to this journal"/>
+ <property name="dataFileSize" type="uint32" access="RO" unit="byte" desc="Size of each journal data file"/>
+ <property name="currentFileCount" type="uint32" access="RO" unit="file" desc="Number of files currently allocated to this journal"/>
+
+ <statistic name="recordDepth" type="hilo32" unit="record" desc="Number of currently enqueued records (durable messages)"/>
+ <statistic name="enqueues" type="count64" unit="record" desc="Total enqueued records on journal"/>
+ <statistic name="dequeues" type="count64" unit="record" desc="Total dequeued records on journal"/>
+ <statistic name="txnEnqueues" type="count64" unit="record" desc="Total transactional enqueued records on journal"/>
+ <statistic name="txnDequeues" type="count64" unit="record" desc="Total transactional dequeued records on journal"/>
+ <statistic name="txnCommits" type="count64" unit="record" desc="Total transactional commit records on journal"/>
+ <statistic name="txnAborts" type="count64" unit="record" desc="Total transactional abort records on journal"/>
+ <statistic name="outstandingAIOs" type="hilo32" unit="aio_op" desc="Number of currently outstanding AIO requests in Async IO system"/>
- <statistic name="initialFileCount" type="uint16" unit="file" desc="Number of files initially allocated to this journal"/>
- <statistic name="dataFileSize" type="uint32" unit="byte" desc="Size of each journal data file"/>
- <statistic name="currentFileCount" type="uint32" unit="file" desc="Number of files currently allocated to this journal"/>
- <statistic name="recordDepth" type="hilo32" unit="record" desc="Number of enqueued records (durable messages)"/>
- <statistic name="recordEnqueues" type="count64" unit="record" desc="Total enqueued records on journal"/>
- <statistic name="recordDequeues" type="count64" unit="record" desc="Total dequeued records on journal"/>
- <statistic name="outstandingAIOs" type="hilo32" unit="aio_op" desc="Number of outstanding AIO requests in Async IO system"/>
-
<!--
The following are not yet "wired up" in JournalImpl.cpp
-->
More information about the rhmessaging-commits
mailing list