[rhmessaging-commits] rhmessaging commits: r2207 - in store/trunk: cpp/lib/gen/qpid/management and 3 other directories.

rhmessaging-commits at lists.jboss.org rhmessaging-commits at lists.jboss.org
Fri Jul 18 09:30:50 EDT 2008


Author: kpvdr
Date: 2008-07-18 09:30:50 -0400 (Fri, 18 Jul 2008)
New Revision: 2207

Modified:
   store/trunk/cpp/lib/BdbMessageStore.cpp
   store/trunk/cpp/lib/BdbMessageStore.h
   store/trunk/cpp/lib/JournalImpl.cpp
   store/trunk/cpp/lib/gen/qpid/management/Journal.cpp
   store/trunk/cpp/lib/gen/qpid/management/Journal.h
   store/trunk/cpp/lib/gen/qpid/management/PackageMrgstore.cpp
   store/trunk/cpp/lib/gen/qpid/management/Store.cpp
   store/trunk/cpp/lib/gen/qpid/management/Store.h
   store/trunk/cpp/lib/jrnl/txn_map.cpp
   store/trunk/cpp/lib/jrnl/txn_map.hpp
   store/trunk/cpp/tests/TwoPhaseCommitTest.cpp
   store/trunk/specs/management-schema.xml
Log:
Added new management functionality for the Transaction Prepared List (TPL) instance in BdbMessageStore. Journal instances now keep track of transactional records. Also performed additional tidy-up of the new TPL code. Renamed some functions and variables to keep naming consistent around the Transaction Prepared List (TPL) name. Deleted an old BDB function that escaped detection from a previous BDB cleanup.

Modified: store/trunk/cpp/lib/BdbMessageStore.cpp
===================================================================
--- store/trunk/cpp/lib/BdbMessageStore.cpp	2008-07-18 13:27:59 UTC (rev 2206)
+++ store/trunk/cpp/lib/BdbMessageStore.cpp	2008-07-18 13:30:50 UTC (rev 2207)
@@ -50,7 +50,7 @@
 qpid::sys::Duration BdbMessageStore::defJournalFlushTimeout(500 * qpid::sys::TIME_MSEC); // 0.5s
 qpid::sys::Mutex TxnCtxt::globalSerialiser;
 
-BdbMessageStore::PreparedRecoverStruct::PreparedRecoverStruct(const u_int64_t _rid,
+BdbMessageStore::TplRecoverStruct::TplRecoverStruct(const u_int64_t _rid,
                                                               const bool _deq_flag,
                                                               const bool _commit_flag) :
                                                               rid(_rid),
@@ -167,15 +167,20 @@
 {
     if (broker != 0) {
         ManagementAgent* agent = ManagementAgent::Singleton::getInstance();
-
-        if (agent != 0)
-        {
+        if (agent != 0) {
             qpid::management::PackageMrgstore packageInitializer(agent);
-            mgmtObject = new qpid::management::Store (agent, this, broker);
+            mgmtObject = new qpid::management::Store(agent, this, broker);
 
             mgmtObject->set_location(storeDir);
             mgmtObject->set_defaultInitialFileCount(numJrnlFiles);
             mgmtObject->set_defaultDataFileSize(jrnlFsizeSblks / JRNL_RMGR_PAGE_SIZE);
+            mgmtObject->set_tplIsInitialized(false);
+            mgmtObject->set_tplDirectory(getTplBaseDir());
+            mgmtObject->set_tplWritePageSize(tplWCachePgSizeSblks * JRNL_SBLK_SIZE * JRNL_DBLK_SIZE);
+            mgmtObject->set_tplWritePages(tplWCacheNumPages);
+            mgmtObject->set_tplInitialFileCount(tplNumJrnlFiles);
+            mgmtObject->set_tplDataFileSize(tplJrnlFsizeSblks * JRNL_SBLK_SIZE * JRNL_DBLK_SIZE);
+            mgmtObject->set_tplCurrentFileCount(tplNumJrnlFiles);
 
             agent->addObject(mgmtObject, 50, 1);
         }
@@ -221,7 +226,6 @@
     if (dir.size()>0) storeDir = dir;
 
     journal::jdir::create_dir(getBdbBaseDir());
-    journal::jdir::create_dir(getPxidBaseDir());
 
     try {
         env.open(getBdbBaseDir().c_str(), DB_THREAD | DB_CREATE | DB_INIT_TXN | DB_INIT_LOCK | DB_INIT_LOG | DB_INIT_MPOOL | DB_USE_ENVIRON, 0);
@@ -243,11 +247,11 @@
         open(mappingDb, txn.get(), "mappings.db", true);
         open(bindingDb, txn.get(), "bindings.db", true);
         open(generalDb, txn.get(), "general.db",  false);
-        preparedXidStorePtr.reset(new JournalImpl("PreparedXidStore", getPxidBaseDir(), "prepared_xid", defJournalGetEventsTimeout, defJournalFlushTimeout));
+        tplStorePtr.reset(new JournalImpl("TplStore", getTplBaseDir(), "tpl", defJournalGetEventsTimeout, defJournalFlushTimeout));
         txn.commit();
     } catch (const journal::jexception& e) {
         txn.abort();
-        THROW_STORE_EXCEPTION_2("Error opening preparedXidStore instance", e.what());
+        THROW_STORE_EXCEPTION_2("Error opening tplStore instance", e.what());
     } catch (const DbException& e) {
         txn.abort();
         THROW_STORE_EXCEPTION_2("Error opening databases", e);
@@ -257,16 +261,25 @@
     }
 
     isInit = true;
-    QPID_LOG(info, "BdbMessageStore module initialized: store-dir=" << dir << "; Default files per journal: " << jfiles <<
-                   "; Default jrournal file size: " << jfileSizePgs << "(wpgs); Default write cache page size: " << wCachePageSizeKib << "(Kib)");
+    QPID_LOG(notice, "Store module initialized; dir=" << dir);
+    QPID_LOG(info,   "> Default files per journal: " << jfiles);
+    QPID_LOG(info,   "> Default jrournal file size: " << jfileSizePgs << " (wpgs)");
+    QPID_LOG(info,   "> Default write cache page size: " << wCachePageSizeKib << " (Kib)");
+    QPID_LOG(info,   "> Default number of write cache pages: " << wCacheNumPages);
+    QPID_LOG(info,   "> TPL files per journal: " << tplNumJrnlFiles);
+    QPID_LOG(info,   "> TPL jrournal file size: " << tplJfileSizePgs << " (wpgs)");
+    QPID_LOG(info,   "> TPL write cache page size: " << tplWCachePageSizeKib << " (Kib)");
+    QPID_LOG(info,   "> TPL number of write cache pages: " << tplWCacheNumPages);
     return true;
 }
 
-void BdbMessageStore::chkInitPreparedXidStore()
+void BdbMessageStore::chkTplStoreInit()
 {
     qpid::sys::Mutex::ScopedLock sl(jrnlCreateLock);
-    if (!preparedXidStorePtr->is_ready()) {
-        preparedXidStorePtr->initialize(tplNumJrnlFiles, tplJrnlFsizeSblks, tplWCachePgSizeSblks, tplWCacheNumPages);
+    if (!tplStorePtr->is_ready()) {
+        journal::jdir::create_dir(getTplBaseDir());
+        tplStorePtr->initialize(tplNumJrnlFiles, tplJrnlFsizeSblks, tplWCacheNumPages, tplWCachePgSizeSblks);
+        if (mgmtObject != 0) mgmtObject->set_tplIsInitialized(true);
     }
 }
 
@@ -286,7 +299,7 @@
         for (std::list<Db*>::iterator i = dbs.begin(); i != dbs.end(); i++) {
             (*i)->close(0);
         }
-        if (preparedXidStorePtr->is_ready()) preparedXidStorePtr->stop(true);
+        if (tplStorePtr->is_ready()) tplStorePtr->stop(true);
     } catch (const DbException& e) {
         QPID_LOG(error, "Error closing BDB databases: " <<  e.what());
     } catch (const journal::jexception& e) {
@@ -314,7 +327,7 @@
     txn->commit(0);
     try {
         journal::jdir::delete_dir(getJrnlBaseDir(),true);
-        journal::jdir::delete_dir(getPxidBaseDir(),true);
+        journal::jdir::delete_dir(getTplBaseDir(),true);
     }
     catch (const journal::jexception& e) {
         THROW_STORE_EXCEPTION(std::string("truncate() failed: ") + e.what() );
@@ -475,7 +488,7 @@
 {
     checkInit();
     txn_list prepared;
-    recoverXids(prepared);
+    recoverLockedMappings(prepared);
 
     queue_index queues;//id->queue
     exchange_index exchanges;//id->exchange
@@ -506,14 +519,18 @@
         TPCTxnCtxt* tpcc = new TPCTxnCtxt(i->xid, &messageIdSequence);
         std::auto_ptr<TPCTransactionContext> txn(tpcc);
 
-        tpcc->prepare(preparedXidStorePtr.get());
+        tpcc->prepare(tplStorePtr.get());
+        if (mgmtObject != 0) {
+            mgmtObject->inc_tplTransactionDepth();
+            mgmtObject->inc_tplTxnPrepares();
+        } 
 
         // Restore data token state in TxnCtxt
-        PreparedRecoverMapCitr citr = preparedXidStoreRecoverMap.find(i->xid);
-        if (citr == preparedXidStoreRecoverMap.end()) THROW_STORE_EXCEPTION("XID not found in preparedXidStoreRecoverMap");
+        TplRecoverMapCitr citr = tplRecoverMap.find(i->xid);
+        if (citr == tplRecoverMap.end()) THROW_STORE_EXCEPTION("XID not found in tplRecoverMap");
         tpcc->recoverDtok(citr->second.rid, i->xid);
 
-        // If a record is found that is dequeued but not committed/aborted from preparedXidStore, then a complete() call
+        // If a record is found that is dequeued but not committed/aborted from tplStore, then a complete() call
         // was interrupted part way through committing/aborting the impacted queues. Complete this process.
         bool incomplTxnFlag = citr->second.deq_flag;
 
@@ -531,8 +548,16 @@
             }
         }
 
-        if (incomplTxnFlag) tpcc->complete(citr->second.commit_flag);
+        if (incomplTxnFlag) {
+            tpcc->complete(citr->second.commit_flag);
+        } else {
+            if (mgmtObject != 0) {
+                mgmtObject->inc_tplTransactionDepth();
+                mgmtObject->inc_tplTxnPrepares();
+            } 
+        }
     }
+
     registry.recoveryComplete();
 }
 
@@ -582,8 +607,8 @@
         maxQueueId = max(key.id, maxQueueId);
     }
 
-    // NOTE: highestRid is set by both recoverQueues() and collectPreparedXids() as
-    // the messageIdSequence is used for both queue journals and the preparedXid journal.
+    // NOTE: highestRid is set by both recoverQueues() and recoverTplStore() as
+    // the messageIdSequence is used for both queue journals and the tpl journal.
     messageIdSequence.reset(highestRid + 1);
 
     queueIdSequence.reset(maxQueueId + 1);
@@ -670,7 +695,6 @@
     generalIdSequence.reset(maxGeneralId + 1);
 }
 
-
 void BdbMessageStore::recoverMessages(TxnCtxt& /*txn*/,
                                       qpid::broker::RecoveryManager& recovery,
                                       qpid::broker::RecoverableQueue::shared_ptr& queue,
@@ -731,8 +755,8 @@
 
                 if (PreparedTransaction::isLocked(prepared, queue->getPersistenceId(), dtokp.rid()) ) {
                     std::string xid((char*)xidbuff, xidbuffSize);
-                    PreparedRecoverMapCitr citr = preparedXidStoreRecoverMap.find(xid);
-                    if (xid.size() > 0 && citr == preparedXidStoreRecoverMap.end()) THROW_STORE_EXCEPTION("XID not found in preparedXidStoreRecoverMap");
+                    TplRecoverMapCitr citr = tplRecoverMap.find(xid);
+                    if (xid.size() > 0 && citr == tplRecoverMap.end()) THROW_STORE_EXCEPTION("XID not found in tplRecoverMap");
                     if (xid.size() > 0 && citr->second.deq_flag) { // deq present in prepared list, this xid is part of incomplete txn commit/abort
                         if (citr->second.commit_flag)
                             queue->recover(msg);
@@ -794,8 +818,6 @@
     return recovery.recoverMessage(header.buffer);
 }
 
-
-
 int BdbMessageStore::enqueueMessage(TxnCtxt& txn,
                                     IdDbt& msgId,
                                     RecoverableMessage::shared_ptr& msg,
@@ -827,26 +849,26 @@
     return count;
 }
 
-void BdbMessageStore::recoverPreparedXidJournal()
+void BdbMessageStore::recoverTplStore()
 {
-    if (journal::jdir::exists(storeDir + "/rhm/pxid/prepared_xid.jinf")) {
+    if (journal::jdir::exists(tplStorePtr->jrnl_dir() + tplStorePtr->base_filename() + ".jinf")) {
         u_int64_t thisHighestRid;
-        preparedXidStorePtr->recover(tplNumJrnlFiles, tplJrnlFsizeSblks, tplWCachePgSizeSblks, tplWCacheNumPages, 0, thisHighestRid, 0);
+        tplStorePtr->recover(tplNumJrnlFiles, tplJrnlFsizeSblks, tplWCachePgSizeSblks, tplWCacheNumPages, 0, thisHighestRid, 0);
         if (thisHighestRid > highestRid)
             highestRid = thisHighestRid;
-        preparedXidStorePtr->recover_complete(); // start journal.
+        tplStorePtr->recover_complete(); // start journal.
     }
 }
 
-void BdbMessageStore::getPreparedXidMap(PreparedRecoverMap& prepXidMap)
+void BdbMessageStore::getTplRecoverMap(TplRecoverMap& tplMap)
 {
-    if (preparedXidStorePtr.get()) {
-        if (!preparedXidStorePtr->is_ready())
-            recoverPreparedXidJournal();
+    if (tplStorePtr.get()) {
+        if (!tplStorePtr->is_ready())
+            recoverTplStore();
 
         // TODO: The journal will return a const txn_map and the txn_map will support
         // const operations at some point. Using non-const txn_map this way is ugly...
-        journal::txn_map& tmap = preparedXidStorePtr->get_txn_map();
+        journal::txn_map& tmap = tplStorePtr->get_txn_map();
         std::vector<std::string> xidList;
         tmap.xid_list(xidList);
         for (std::vector<std::string>::const_iterator i = xidList.begin(); i<xidList.end(); i++) {
@@ -866,18 +888,18 @@
             }
             assert(enqCnt == 1);
             assert(deqCnt <= 1);
-            prepXidMap.insert(PreparedRecoverMapPair(*i, PreparedRecoverStruct(rid, deqCnt == 1, commitFlag)));
+            tplMap.insert(TplRecoverMapPair(*i, TplRecoverStruct(rid, deqCnt == 1, commitFlag)));
         }
     }
 }
 
-void BdbMessageStore::recoverXids(txn_list& txns)
+void BdbMessageStore::recoverLockedMappings(txn_list& txns)
 {
-    if (!preparedXidStorePtr->is_ready())
-        getPreparedXidMap(preparedXidStoreRecoverMap);
+    if (!tplStorePtr->is_ready())
+        getTplRecoverMap(tplRecoverMap);
 
     // Abort unprepaired xids and populate the locked maps
-    for (PreparedRecoverMapCitr i = preparedXidStoreRecoverMap.begin(); i != preparedXidStoreRecoverMap.end(); i++) {
+    for (TplRecoverMapCitr i = tplRecoverMap.begin(); i != tplRecoverMap.end(); i++) {
         LockedMappings::shared_ptr enq_ptr;
         enq_ptr.reset(new LockedMappings);
         LockedMappings::shared_ptr deq_ptr;
@@ -886,25 +908,11 @@
     }
 }
 
-void BdbMessageStore::readLockedMappings(Db& db,
-                                         txn_lock_map& mappings)
-{
-    Cursor c;
-    c.open(db, 0);
-
-    Dbt key;
-    IdPairDbt value;
-    while (c.next(key, value)) {
-        std::string xid(reinterpret_cast<char*>(key.get_data()), key.get_size());
-        LockedMappings::add(mappings, xid, value.queueId(), value.messageId());
-    }
-}
-
 void BdbMessageStore::collectPreparedXids(std::set<string>& xids)
 {
-    if (!preparedXidStorePtr->is_ready())
-        getPreparedXidMap(preparedXidStoreRecoverMap);
-    for (PreparedRecoverMapCitr i = preparedXidStoreRecoverMap.begin(); i != preparedXidStoreRecoverMap.end(); i++) {
+    if (!tplStorePtr->is_ready())
+        getTplRecoverMap(tplRecoverMap);
+    for (TplRecoverMapCitr i = tplRecoverMap.begin(); i != tplRecoverMap.end(); i++) {
         xids.insert(i->first);
     }
 }
@@ -1251,16 +1259,24 @@
                                 bool commit)
 {
     try {
+        chkTplStoreInit(); // Late initialize (if needed)
+
         // Nothing to do if not prepared
-        chkInitPreparedXidStore();
         if (txn.getDtok()->is_enqueued()) {
             txn.incrDtokRef();
             DataTokenImpl* dtokp = txn.getDtok();
             dtokp->set_dequeue_rid(dtokp->rid());
             dtokp->set_rid(messageIdSequence.next());
-            preparedXidStorePtr->dequeue_txn_data_record(txn.getDtok(), txn.getXid(), commit);
+            tplStorePtr->dequeue_txn_data_record(txn.getDtok(), txn.getXid(), commit);
         }
         txn.complete(commit);
+        if (mgmtObject != 0) {
+            mgmtObject->dec_tplTransactionDepth();
+            if (commit)
+                mgmtObject->inc_tplTxnCommits();
+            else
+                mgmtObject->inc_tplTxnAborts();
+        }
     } catch (const std::exception& e) {
         QPID_LOG(error, "Error completing xid " << txn.getXid() << ": " << e.what());
         throw;
@@ -1293,15 +1309,20 @@
 void BdbMessageStore::localPrepare(TxnCtxt* ctxt)
 {
     try {
-        chkInitPreparedXidStore();
+        chkTplStoreInit(); // Late initialize (if needed)
+
         ctxt->incrDtokRef();
         DataTokenImpl* dtokp = ctxt->getDtok();
         dtokp->set_external_rid(true);
         dtokp->set_rid(messageIdSequence.next());
-        preparedXidStorePtr->enqueue_txn_data_record(0, 0, 0, dtokp, ctxt->getXid(), false);
-        ctxt->prepare(preparedXidStorePtr.get());
+        tplStorePtr->enqueue_txn_data_record(0, 0, 0, dtokp, ctxt->getXid(), false);
+        ctxt->prepare(tplStorePtr.get());
         // make sure all the data is written to disk before returning
         ctxt->sync();
+        if (mgmtObject != 0) {
+            mgmtObject->inc_tplTransactionDepth();
+            mgmtObject->inc_tplTxnPrepares();
+        }
     } catch (const std::exception& e) {
         QPID_LOG(error, "Error preparing xid " << ctxt->getXid() << ": " << e.what());
         throw;
@@ -1450,10 +1471,10 @@
     return dir.str();
 }
 
-string BdbMessageStore::getPxidBaseDir()
+string BdbMessageStore::getTplBaseDir()
 {
     std::stringstream dir;
-    dir << storeDir << "/rhm/pxid/" ;
+    dir << storeDir << "/rhm/tpl/" ;
     return dir.str();
 }
 

Modified: store/trunk/cpp/lib/BdbMessageStore.h
===================================================================
--- store/trunk/cpp/lib/BdbMessageStore.h	2008-07-18 13:27:59 UTC (rev 2206)
+++ store/trunk/cpp/lib/BdbMessageStore.h	2008-07-18 13:30:50 UTC (rev 2207)
@@ -61,17 +61,17 @@
     typedef LockedMappings::map txn_lock_map;
     typedef boost::ptr_list<PreparedTransaction> txn_list;
 
-    // Structs for preparedXidStore recover state
-    struct PreparedRecoverStruct {
+    // Structs for Transaction Prepared List (TPL) recover state
+    struct TplRecoverStruct {
         u_int64_t rid;
         bool deq_flag;
         bool commit_flag;
-        PreparedRecoverStruct(const u_int64_t _rid, const bool _deq_flag, const bool _commit_flag);
+        TplRecoverStruct(const u_int64_t _rid, const bool _deq_flag, const bool _commit_flag);
     };
-    typedef PreparedRecoverStruct PreparedRecover;
-    typedef std::map<std::string, PreparedRecover> PreparedRecoverMap;
-    typedef PreparedRecoverMap::const_iterator PreparedRecoverMapCitr;
-    typedef std::pair<std::string, PreparedRecoverStruct> PreparedRecoverMapPair;
+    typedef TplRecoverStruct TplRecover;
+    typedef std::pair<std::string, TplRecover> TplRecoverMapPair;
+    typedef std::map<std::string, TplRecover> TplRecoverMap;
+    typedef TplRecoverMap::const_iterator TplRecoverMapCitr;
 
     // Default store settings
     static const u_int16_t defNumJrnlFiles = 8;
@@ -91,9 +91,9 @@
     Db bindingDb;
     Db generalDb;
 
-    // Pointer to prepared XID journal instance
-    boost::shared_ptr<JournalImpl> preparedXidStorePtr;
-    PreparedRecoverMap preparedXidStoreRecoverMap;
+    // Pointer to Transaction Prepared List (TPL) journal instance
+    boost::shared_ptr<JournalImpl> tplStorePtr;
+    TplRecoverMap tplRecoverMap;
 
     IdSequence queueIdSequence;
     IdSequence exchangeIdSequence;
@@ -157,11 +157,9 @@
                        queue_index& index,
                        txn_list& locked,
                        message_index& prepared);
-    void recoverPreparedXidJournal();
-    void getPreparedXidMap(PreparedRecoverMap& prepXidMap);
-    void recoverXids(txn_list& txns);
-    void readLockedMappings(Db& db,
-                            txn_lock_map& mappings);
+    void recoverTplStore();
+    void getTplRecoverMap(TplRecoverMap& tplMap);
+    void recoverLockedMappings(txn_list& txns);
     TxnCtxt* check(qpid::broker::TransactionContext* ctxt);
     void store(const qpid::broker::PersistableQueue* queue,
                TxnCtxt* txn,
@@ -218,12 +216,12 @@
     std::string getJrnlDir(const char* queueName);
     std::string getJrnlBaseDir();
     std::string getBdbBaseDir();
-    std::string getPxidBaseDir();
+    std::string getTplBaseDir();
     inline void checkInit() {
         // TODO: change the default dir to ~/.qpidd
         if (!isInit) init("/tmp", defNumJrnlFiles, defJrnlFileSizePgs, defWCachePageSize); isInit = true;
     }
-    void chkInitPreparedXidStore();
+    void chkTplStoreInit();
 
     // debug aid for printing XIDs that may contain non-printable chars
     static std::string xid2str(const std::string xid) {

Modified: store/trunk/cpp/lib/JournalImpl.cpp
===================================================================
--- store/trunk/cpp/lib/JournalImpl.cpp	2008-07-18 13:27:59 UTC (rev 2206)
+++ store/trunk/cpp/lib/JournalImpl.cpp	2008-07-18 13:30:50 UTC (rev 2207)
@@ -74,7 +74,6 @@
     journalTimerPtr->add(inactivityFireEventPtr);
 
     ManagementAgent* agent = ManagementAgent::Singleton::getInstance();
-
     if (agent != 0)
     {
         _mgmtObject = new qpid::management::Journal(agent, (qpid::management::Manageable*) this);
@@ -84,6 +83,13 @@
         _mgmtObject->set_baseFileName(journalBaseFilename);
         _mgmtObject->set_readPageSize(JRNL_RMGR_PAGE_SIZE * JRNL_SBLK_SIZE * JRNL_DBLK_SIZE);
         _mgmtObject->set_readPages(JRNL_RMGR_PAGES);
+        
+        // The following will be set on initialize(), but being properties, these must be set to 0 in the meantime
+        _mgmtObject->set_initialFileCount(0);
+        _mgmtObject->set_dataFileSize(0);
+        _mgmtObject->set_currentFileCount(0);
+        _mgmtObject->set_writePageSize(0);
+        _mgmtObject->set_writePages(0);
 
         agent->addObject(_mgmtObject);
     }
@@ -163,6 +169,15 @@
     oss1 << " wcache_num_pages=" << wcache_num_pages;
     log(LOG_DEBUG, oss1.str());
 
+    if (_mgmtObject != 0)
+    {
+        _mgmtObject->set_initialFileCount(_num_jfiles);
+        _mgmtObject->set_dataFileSize(_jfsize_sblks * JRNL_SBLK_SIZE * JRNL_DBLK_SIZE);
+        _mgmtObject->set_currentFileCount(_num_jfiles);
+        _mgmtObject->set_writePageSize(wcache_pgsize_sblks * JRNL_SBLK_SIZE * JRNL_DBLK_SIZE);
+        _mgmtObject->set_writePages(wcache_num_pages);
+    }
+
     if (prep_tx_list_ptr) {
         // Create list of prepared xids
         std::vector<std::string> prep_xid_list;
@@ -206,12 +221,10 @@
 
     if (_mgmtObject != 0)
     {
-        _mgmtObject->set_initialFileCount(_num_jfiles);
-        _mgmtObject->set_dataFileSize(_jfsize_sblks * JRNL_SBLK_SIZE * JRNL_DBLK_SIZE);
-        _mgmtObject->set_currentFileCount(_num_jfiles);
         _mgmtObject->inc_recordDepth(_emap.size());
-        _mgmtObject->set_writePageSize(wcache_pgsize_sblks * JRNL_SBLK_SIZE * JRNL_DBLK_SIZE);
-        _mgmtObject->set_writePages(wcache_num_pages);
+        _mgmtObject->inc_enqueues(_emap.size());
+        _mgmtObject->inc_txnEnqueues(_tmap.enq_cnt());
+        _mgmtObject->inc_txnDequeues(_tmap.deq_cnt());
     }
 }
 
@@ -301,7 +314,7 @@
     
     if (_mgmtObject != 0)
     {
-        _mgmtObject->inc_recordEnqueues();
+        _mgmtObject->inc_enqueues();
         _mgmtObject->inc_recordDepth();
     }
 }
@@ -314,7 +327,7 @@
     
     if (_mgmtObject != 0)
     {
-        _mgmtObject->inc_recordEnqueues();
+        _mgmtObject->inc_enqueues();
         _mgmtObject->inc_recordDepth();
     }
 }
@@ -327,7 +340,8 @@
     
     if (_mgmtObject != 0)
     {
-        _mgmtObject->inc_recordEnqueues();
+        _mgmtObject->inc_enqueues();
+        _mgmtObject->inc_txnEnqueues();
         _mgmtObject->inc_recordDepth();
     }
 }
@@ -340,7 +354,8 @@
     
     if (_mgmtObject != 0)
     {
-        _mgmtObject->inc_recordEnqueues();
+        _mgmtObject->inc_enqueues();
+        _mgmtObject->inc_txnEnqueues();
         _mgmtObject->inc_recordDepth();
     }
 }
@@ -352,7 +367,8 @@
     
     if (_mgmtObject != 0)
     {
-        _mgmtObject->inc_recordDequeues();
+        _mgmtObject->inc_dequeues();
+        _mgmtObject->inc_txnDequeues();
         _mgmtObject->dec_recordDepth();
     }
 }
@@ -364,7 +380,8 @@
     
     if (_mgmtObject != 0)
     {
-        _mgmtObject->inc_recordDequeues();
+        _mgmtObject->inc_dequeues();
+        _mgmtObject->inc_txnDequeues();
         _mgmtObject->dec_recordDepth();
     }
 }
@@ -373,12 +390,18 @@
 JournalImpl::txn_abort(data_tok* const dtokp, const std::string& xid)
 {
     handleIoResult(jcntl::txn_abort(dtokp, xid));
+    
+    if (_mgmtObject != 0)
+        _mgmtObject->inc_txnAborts();
 }
 
 void
 JournalImpl::txn_commit(data_tok* const dtokp, const std::string& xid)
 {
     handleIoResult(jcntl::txn_commit(dtokp, xid));
+    
+    if (_mgmtObject != 0)
+        _mgmtObject->inc_txnCommits();
 }
 
 void

Modified: store/trunk/cpp/lib/gen/qpid/management/Journal.cpp
===================================================================
--- store/trunk/cpp/lib/gen/qpid/management/Journal.cpp	2008-07-18 13:27:59 UTC (rev 2206)
+++ store/trunk/cpp/lib/gen/qpid/management/Journal.cpp	2008-07-18 13:30:50 UTC (rev 2207)
@@ -26,7 +26,7 @@
 #include "qpid/management/Manageable.h" 
 #include "qpid/agent/ManagementAgent.h"
 #include "Journal.h"
-#include "qpid/management/ArgsJournalExpand.h"
+#include "ArgsJournalExpand.h"
 
 
 using namespace qpid::management;
@@ -37,15 +37,12 @@
 string  Journal::packageName  = string ("mrgstore");
 string  Journal::className    = string ("journal");
 uint8_t Journal::md5Sum[16]   =
-    {0x0,0xdf,0x9a,0xf7,0x4,0x98,0x29,0x54,0xde,0x42,0xc5,0xf5,0xf5,0x13,0xab,0xa5};
+    {0x1e,0x63,0xa4,0x3d,0xa3,0x1b,0xc0,0x1,0x1,0x70,0x5a,0x2a,0xb4,0xa,0x1b,0x4e};
 
 Journal::Journal (ManagementAgent* _agent, Manageable* _core) :
     ManagementObject(_agent, _core)
 {
     
-    initialFileCount = 0;
-    dataFileSize = 0;
-    currentFileCount = 0;
     recordDepth = 0;
     recordDepthHigh = 0;
     recordDepthLow  = 0;
@@ -107,24 +104,24 @@
     buf.putShortString (packageName); // Package Name
     buf.putShortString (className);   // Class Name
     buf.putBin128      (md5Sum);      // Schema Hash
-    buf.putShort       (8); // Config Element Count
-    buf.putShort       (27); // Inst Element Count
+    buf.putShort       (11); // Config Element Count
+    buf.putShort       (28); // Inst Element Count
     buf.putShort       (1); // Method Count
     buf.putShort       (0); // Event Count
 
     // Properties
     ft = FieldTable ();
-    ft.setString (NAME,   "name");
-    ft.setInt    (TYPE,   TYPE_SSTR);
+    ft.setString (NAME,   "queueRef");
+    ft.setInt    (TYPE,   TYPE_REF);
     ft.setInt    (ACCESS, ACCESS_RO);
-    ft.setInt    (INDEX,  1);
+    ft.setInt    (INDEX,  0);
     buf.put (ft);
 
     ft = FieldTable ();
-    ft.setString (NAME,   "queueRef");
-    ft.setInt    (TYPE,   TYPE_REF);
+    ft.setString (NAME,   "name");
+    ft.setInt    (TYPE,   TYPE_SSTR);
     ft.setInt    (ACCESS, ACCESS_RO);
-    ft.setInt    (INDEX,  0);
+    ft.setInt    (INDEX,  1);
     buf.put (ft);
 
     ft = FieldTable ();
@@ -179,11 +176,11 @@
     ft.setString (DESC,   "Number of pages in read-page-cache");
     buf.put (ft);
 
-
-    // Statistics
     ft = FieldTable ();
     ft.setString (NAME,   "initialFileCount");
     ft.setInt    (TYPE,   TYPE_U16);
+    ft.setInt    (ACCESS, ACCESS_RO);
+    ft.setInt    (INDEX,  0);
     ft.setString (UNIT,   "file");
     ft.setString (DESC,   "Number of files initially allocated to this journal");
     buf.put (ft);
@@ -191,6 +188,8 @@
     ft = FieldTable ();
     ft.setString (NAME,   "dataFileSize");
     ft.setInt    (TYPE,   TYPE_U32);
+    ft.setInt    (ACCESS, ACCESS_RO);
+    ft.setInt    (INDEX,  0);
     ft.setString (UNIT,   "byte");
     ft.setString (DESC,   "Size of each journal data file");
     buf.put (ft);
@@ -198,64 +197,96 @@
     ft = FieldTable ();
     ft.setString (NAME,   "currentFileCount");
     ft.setInt    (TYPE,   TYPE_U32);
+    ft.setInt    (ACCESS, ACCESS_RO);
+    ft.setInt    (INDEX,  0);
     ft.setString (UNIT,   "file");
     ft.setString (DESC,   "Number of files currently allocated to this journal");
     buf.put (ft);
 
+
+    // Statistics
     ft = FieldTable ();
     ft.setString (NAME,   "recordDepth");
     ft.setInt    (TYPE,   TYPE_U32);
     ft.setString (UNIT,   "record");
-    ft.setString (DESC,   "Number of enqueued records (durable messages)");
+    ft.setString (DESC,   "Number of currently enqueued records (durable messages)");
     buf.put (ft);
 
     ft = FieldTable ();
     ft.setString (NAME,   "recordDepthHigh");
     ft.setInt    (TYPE,   TYPE_U32);
     ft.setString (UNIT,   "record");
-    ft.setString (DESC,   "Number of enqueued records (durable messages) (High)");
+    ft.setString (DESC,   "Number of currently enqueued records (durable messages) (High)");
     buf.put (ft);
 
     ft = FieldTable ();
     ft.setString (NAME,   "recordDepthLow");
     ft.setInt    (TYPE,   TYPE_U32);
     ft.setString (UNIT,   "record");
-    ft.setString (DESC,   "Number of enqueued records (durable messages) (Low)");
+    ft.setString (DESC,   "Number of currently enqueued records (durable messages) (Low)");
     buf.put (ft);
 
     ft = FieldTable ();
-    ft.setString (NAME,   "recordEnqueues");
+    ft.setString (NAME,   "enqueues");
     ft.setInt    (TYPE,   TYPE_U64);
     ft.setString (UNIT,   "record");
     ft.setString (DESC,   "Total enqueued records on journal");
     buf.put (ft);
 
     ft = FieldTable ();
-    ft.setString (NAME,   "recordDequeues");
+    ft.setString (NAME,   "dequeues");
     ft.setInt    (TYPE,   TYPE_U64);
     ft.setString (UNIT,   "record");
     ft.setString (DESC,   "Total dequeued records on journal");
     buf.put (ft);
 
     ft = FieldTable ();
+    ft.setString (NAME,   "txnEnqueues");
+    ft.setInt    (TYPE,   TYPE_U64);
+    ft.setString (UNIT,   "record");
+    ft.setString (DESC,   "Total transactional enqueued records on journal");
+    buf.put (ft);
+
+    ft = FieldTable ();
+    ft.setString (NAME,   "txnDequeues");
+    ft.setInt    (TYPE,   TYPE_U64);
+    ft.setString (UNIT,   "record");
+    ft.setString (DESC,   "Total transactional dequeued records on journal");
+    buf.put (ft);
+
+    ft = FieldTable ();
+    ft.setString (NAME,   "txnCommits");
+    ft.setInt    (TYPE,   TYPE_U64);
+    ft.setString (UNIT,   "record");
+    ft.setString (DESC,   "Total transactional commit records on journal");
+    buf.put (ft);
+
+    ft = FieldTable ();
+    ft.setString (NAME,   "txnAborts");
+    ft.setInt    (TYPE,   TYPE_U64);
+    ft.setString (UNIT,   "record");
+    ft.setString (DESC,   "Total transactional abort records on journal");
+    buf.put (ft);
+
+    ft = FieldTable ();
     ft.setString (NAME,   "outstandingAIOs");
     ft.setInt    (TYPE,   TYPE_U32);
     ft.setString (UNIT,   "aio_op");
-    ft.setString (DESC,   "Number of outstanding AIO requests in Async IO system");
+    ft.setString (DESC,   "Number of currently outstanding AIO requests in Async IO system");
     buf.put (ft);
 
     ft = FieldTable ();
     ft.setString (NAME,   "outstandingAIOsHigh");
     ft.setInt    (TYPE,   TYPE_U32);
     ft.setString (UNIT,   "aio_op");
-    ft.setString (DESC,   "Number of outstanding AIO requests in Async IO system (High)");
+    ft.setString (DESC,   "Number of currently outstanding AIO requests in Async IO system (High)");
     buf.put (ft);
 
     ft = FieldTable ();
     ft.setString (NAME,   "outstandingAIOsLow");
     ft.setInt    (TYPE,   TYPE_U32);
     ft.setString (UNIT,   "aio_op");
-    ft.setString (DESC,   "Number of outstanding AIO requests in Async IO system (Low)");
+    ft.setString (DESC,   "Number of currently outstanding AIO requests in Async IO system (Low)");
     buf.put (ft);
 
     ft = FieldTable ();
@@ -393,8 +424,12 @@
 
 void Journal::aggregatePerThreadStats(struct PerThreadStats* totals)
 {
-    totals->recordEnqueues = 0;
-    totals->recordDequeues = 0;
+    totals->enqueues = 0;
+    totals->dequeues = 0;
+    totals->txnEnqueues = 0;
+    totals->txnDequeues = 0;
+    totals->txnCommits = 0;
+    totals->txnAborts = 0;
     totals->writeWaitFailures = 0;
     totals->writeBusyFailures = 0;
     totals->readRecordCount = 0;
@@ -403,8 +438,12 @@
     for (int idx = 0; idx < maxThreads; idx++) {
         struct PerThreadStats* threadStats = perThreadStatsArray[idx];
         if (threadStats != 0) {
-            totals->recordEnqueues += threadStats->recordEnqueues;
-            totals->recordDequeues += threadStats->recordDequeues;
+            totals->enqueues += threadStats->enqueues;
+            totals->dequeues += threadStats->dequeues;
+            totals->txnEnqueues += threadStats->txnEnqueues;
+            totals->txnDequeues += threadStats->txnDequeues;
+            totals->txnCommits += threadStats->txnCommits;
+            totals->txnAborts += threadStats->txnAborts;
             totals->writeWaitFailures += threadStats->writeWaitFailures;
             totals->writeBusyFailures += threadStats->writeBusyFailures;
             totals->readRecordCount += threadStats->readRecordCount;
@@ -421,14 +460,17 @@
     configChanged = false;
 
     writeTimestamps (buf);
+    buf.putLongLong (queueRef);
     buf.putShortString (name);
-    buf.putLongLong (queueRef);
     buf.putShortString (directory);
     buf.putShortString (baseFileName);
     buf.putLong (writePageSize);
     buf.putLong (writePages);
     buf.putLong (readPageSize);
     buf.putLong (readPages);
+    buf.putShort (initialFileCount);
+    buf.putLong (dataFileSize);
+    buf.putLong (currentFileCount);
 
 }
 
@@ -444,14 +486,15 @@
 
     if (!skipHeaders)
         writeTimestamps (buf);
-    buf.putShort (initialFileCount);
-    buf.putLong (dataFileSize);
-    buf.putLong (currentFileCount);
     buf.putLong (recordDepth);
     buf.putLong (recordDepthHigh);
     buf.putLong (recordDepthLow);
-    buf.putLongLong (totals.recordEnqueues);
-    buf.putLongLong (totals.recordDequeues);
+    buf.putLongLong (totals.enqueues);
+    buf.putLongLong (totals.dequeues);
+    buf.putLongLong (totals.txnEnqueues);
+    buf.putLongLong (totals.txnDequeues);
+    buf.putLongLong (totals.txnCommits);
+    buf.putLongLong (totals.txnAborts);
     buf.putLong (outstandingAIOs);
     buf.putLong (outstandingAIOsHigh);
     buf.putLong (outstandingAIOsLow);

Modified: store/trunk/cpp/lib/gen/qpid/management/Journal.h
===================================================================
--- store/trunk/cpp/lib/gen/qpid/management/Journal.h	2008-07-18 13:27:59 UTC (rev 2206)
+++ store/trunk/cpp/lib/gen/qpid/management/Journal.h	2008-07-18 13:30:50 UTC (rev 2207)
@@ -40,19 +40,19 @@
     static uint8_t     md5Sum[16];
 
     // Properties
+    uint64_t queueRef;
     std::string name;
-    uint64_t queueRef;
     std::string directory;
     std::string baseFileName;
     uint32_t writePageSize;
     uint32_t writePages;
     uint32_t readPageSize;
     uint32_t readPages;
+    uint16_t initialFileCount;
+    uint32_t dataFileSize;
+    uint32_t currentFileCount;
 
     // Statistics
-    uint16_t  initialFileCount;
-    uint32_t  dataFileSize;
-    uint32_t  currentFileCount;
     uint32_t  recordDepth;
     uint32_t  recordDepthHigh;
     uint32_t  recordDepthLow;
@@ -75,8 +75,12 @@
 
     // Per-Thread Statistics
     struct PerThreadStats {
-        uint64_t  recordEnqueues;
-        uint64_t  recordDequeues;
+        uint64_t  enqueues;
+        uint64_t  dequeues;
+        uint64_t  txnEnqueues;
+        uint64_t  txnDequeues;
+        uint64_t  txnCommits;
+        uint64_t  txnAborts;
         uint64_t  writeWaitFailures;
         uint64_t  writeBusyFailures;
         uint64_t  readRecordCount;
@@ -92,8 +96,12 @@
         if (threadStats == 0) {
             threadStats = new(PerThreadStats);
             perThreadStatsArray[index] = threadStats;
-            threadStats->recordEnqueues = 0;
-            threadStats->recordDequeues = 0;
+            threadStats->enqueues = 0;
+            threadStats->dequeues = 0;
+            threadStats->txnEnqueues = 0;
+            threadStats->txnDequeues = 0;
+            threadStats->txnCommits = 0;
+            threadStats->txnAborts = 0;
             threadStats->writeWaitFailures = 0;
             threadStats->writeBusyFailures = 0;
             threadStats->readRecordCount = 0;
@@ -134,14 +142,14 @@
     static const uint32_t METHOD_EXPAND = 1;
 
     // Accessor Methods
-    inline void set_name (std::string val){
+    inline void set_queueRef (uint64_t val){
         sys::Mutex::ScopedLock mutex(accessLock);
-        name = val;
+        queueRef = val;
         configChanged = true;
     }
-    inline void set_queueRef (uint64_t val){
+    inline void set_name (std::string val){
         sys::Mutex::ScopedLock mutex(accessLock);
-        queueRef = val;
+        name = val;
         configChanged = true;
     }
     inline void set_directory (std::string val){
@@ -177,17 +185,17 @@
     inline void set_initialFileCount (uint16_t val){
         sys::Mutex::ScopedLock mutex(accessLock);
         initialFileCount = val;
-        instChanged = true;
+        configChanged = true;
     }
     inline void set_dataFileSize (uint32_t val){
         sys::Mutex::ScopedLock mutex(accessLock);
         dataFileSize = val;
-        instChanged = true;
+        configChanged = true;
     }
     inline void set_currentFileCount (uint32_t val){
         sys::Mutex::ScopedLock mutex(accessLock);
         currentFileCount = val;
-        instChanged = true;
+        configChanged = true;
     }
     inline void inc_recordDepth (uint32_t by = 1){
         sys::Mutex::ScopedLock mutex(accessLock);
@@ -203,22 +211,54 @@
             recordDepthLow = recordDepth;
         instChanged = true;
     }
-    inline void inc_recordEnqueues (uint64_t by = 1){
-        getThreadStats()->recordEnqueues += by;
+    inline void inc_enqueues (uint64_t by = 1){
+        getThreadStats()->enqueues += by;
         instChanged = true;
     }
-    inline void dec_recordEnqueues (uint64_t by = 1){
-        getThreadStats()->recordEnqueues -= by;
+    inline void dec_enqueues (uint64_t by = 1){
+        getThreadStats()->enqueues -= by;
         instChanged = true;
     }
-    inline void inc_recordDequeues (uint64_t by = 1){
-        getThreadStats()->recordDequeues += by;
+    inline void inc_dequeues (uint64_t by = 1){
+        getThreadStats()->dequeues += by;
         instChanged = true;
     }
-    inline void dec_recordDequeues (uint64_t by = 1){
-        getThreadStats()->recordDequeues -= by;
+    inline void dec_dequeues (uint64_t by = 1){
+        getThreadStats()->dequeues -= by;
         instChanged = true;
     }
+    inline void inc_txnEnqueues (uint64_t by = 1){
+        getThreadStats()->txnEnqueues += by;
+        instChanged = true;
+    }
+    inline void dec_txnEnqueues (uint64_t by = 1){
+        getThreadStats()->txnEnqueues -= by;
+        instChanged = true;
+    }
+    inline void inc_txnDequeues (uint64_t by = 1){
+        getThreadStats()->txnDequeues += by;
+        instChanged = true;
+    }
+    inline void dec_txnDequeues (uint64_t by = 1){
+        getThreadStats()->txnDequeues -= by;
+        instChanged = true;
+    }
+    inline void inc_txnCommits (uint64_t by = 1){
+        getThreadStats()->txnCommits += by;
+        instChanged = true;
+    }
+    inline void dec_txnCommits (uint64_t by = 1){
+        getThreadStats()->txnCommits -= by;
+        instChanged = true;
+    }
+    inline void inc_txnAborts (uint64_t by = 1){
+        getThreadStats()->txnAborts += by;
+        instChanged = true;
+    }
+    inline void dec_txnAborts (uint64_t by = 1){
+        getThreadStats()->txnAborts -= by;
+        instChanged = true;
+    }
     inline void inc_outstandingAIOs (uint32_t by = 1){
         sys::Mutex::ScopedLock mutex(accessLock);
         outstandingAIOs += by;

Modified: store/trunk/cpp/lib/gen/qpid/management/PackageMrgstore.cpp
===================================================================
--- store/trunk/cpp/lib/gen/qpid/management/PackageMrgstore.cpp	2008-07-18 13:27:59 UTC (rev 2206)
+++ store/trunk/cpp/lib/gen/qpid/management/PackageMrgstore.cpp	2008-07-18 13:30:50 UTC (rev 2207)
@@ -21,9 +21,9 @@
 // This source file was created by a code generator.
 // Please do not edit.
 
-#include "qpid/management/PackageMrgstore.h"
-#include "qpid/management/Store.h"
-#include "qpid/management/Journal.h"
+#include "PackageMrgstore.h"
+#include "Store.h"
+#include "Journal.h"
 
 
 using namespace qpid::management;

Modified: store/trunk/cpp/lib/gen/qpid/management/Store.cpp
===================================================================
--- store/trunk/cpp/lib/gen/qpid/management/Store.cpp	2008-07-18 13:27:59 UTC (rev 2206)
+++ store/trunk/cpp/lib/gen/qpid/management/Store.cpp	2008-07-18 13:30:50 UTC (rev 2207)
@@ -36,19 +36,35 @@
 string  Store::packageName  = string ("mrgstore");
 string  Store::className    = string ("store");
 uint8_t Store::md5Sum[16]   =
-    {0x63,0xc5,0x1a,0x81,0x18,0x8a,0x8d,0x9b,0x3e,0x96,0xf7,0x6d,0x3b,0xd0,0x51,0x14};
+    {0x18,0xd,0xd4,0x15,0xd3,0x9a,0xf,0xbe,0x3a,0x40,0xe1,0x1b,0x9e,0x5b,0x7e,0x86};
 
 Store::Store (ManagementAgent* _agent, Manageable* _core, Manageable* _parent) :
     ManagementObject(_agent, _core)
 {
     brokerRef = _parent->GetManagementObject ()->getObjectId ();
+    tplTransactionDepth = 0;
+    tplTransactionDepthHigh = 0;
+    tplTransactionDepthLow  = 0;
+    tplOutstandingAIOs = 0;
+    tplOutstandingAIOsHigh = 0;
+    tplOutstandingAIOsLow  = 0;
 
 
+    maxThreads = agent->getMaxThreads();
+    perThreadStatsArray = new struct PerThreadStats*[maxThreads];
+    for (int idx = 0; idx < maxThreads; idx++)
+        perThreadStatsArray[idx] = 0;
+
 }
 
 Store::~Store ()
 {
 
+    for (int idx = 0; idx < maxThreads; idx++)
+        if (perThreadStatsArray[idx] != 0)
+            delete perThreadStatsArray[idx];
+    delete[] perThreadStatsArray;
+
 }
 
 namespace {
@@ -75,8 +91,8 @@
     buf.putShortString (packageName); // Package Name
     buf.putShortString (className);   // Class Name
     buf.putBin128      (md5Sum);      // Schema Hash
-    buf.putShort       (4); // Config Element Count
-    buf.putShort       (0); // Inst Element Count
+    buf.putShort       (11); // Config Element Count
+    buf.putShort       (9); // Inst Element Count
     buf.putShort       (0); // Method Count
     buf.putShort       (0); // Event Count
 
@@ -114,9 +130,133 @@
     ft.setString (DESC,   "Default size of each journal data file");
     buf.put (ft);
 
+    ft = FieldTable ();
+    ft.setString (NAME,   "tplIsInitialized");
+    ft.setInt    (TYPE,   TYPE_BOOL);
+    ft.setInt    (ACCESS, ACCESS_RO);
+    ft.setInt    (INDEX,  0);
+    ft.setString (DESC,   "Transaction prepared list has been initialized by a transactional prepare");
+    buf.put (ft);
 
+    ft = FieldTable ();
+    ft.setString (NAME,   "tplDirectory");
+    ft.setInt    (TYPE,   TYPE_SSTR);
+    ft.setInt    (ACCESS, ACCESS_RO);
+    ft.setInt    (INDEX,  0);
+    ft.setString (DESC,   "Transaction prepared list directory");
+    buf.put (ft);
+
+    ft = FieldTable ();
+    ft.setString (NAME,   "tplWritePageSize");
+    ft.setInt    (TYPE,   TYPE_U32);
+    ft.setInt    (ACCESS, ACCESS_RO);
+    ft.setInt    (INDEX,  0);
+    ft.setString (UNIT,   "byte");
+    ft.setString (DESC,   "Page size in transaction prepared list write-page-cache");
+    buf.put (ft);
+
+    ft = FieldTable ();
+    ft.setString (NAME,   "tplWritePages");
+    ft.setInt    (TYPE,   TYPE_U32);
+    ft.setInt    (ACCESS, ACCESS_RO);
+    ft.setInt    (INDEX,  0);
+    ft.setString (UNIT,   "wpage");
+    ft.setString (DESC,   "Number of pages in transaction prepared list write-page-cache");
+    buf.put (ft);
+
+    ft = FieldTable ();
+    ft.setString (NAME,   "tplInitialFileCount");
+    ft.setInt    (TYPE,   TYPE_U16);
+    ft.setInt    (ACCESS, ACCESS_RO);
+    ft.setInt    (INDEX,  0);
+    ft.setString (UNIT,   "file");
+    ft.setString (DESC,   "Number of files initially allocated to transaction prepared list journal");
+    buf.put (ft);
+
+    ft = FieldTable ();
+    ft.setString (NAME,   "tplDataFileSize");
+    ft.setInt    (TYPE,   TYPE_U32);
+    ft.setInt    (ACCESS, ACCESS_RO);
+    ft.setInt    (INDEX,  0);
+    ft.setString (UNIT,   "byte");
+    ft.setString (DESC,   "Size of each journal data file in transaction prepared list journal");
+    buf.put (ft);
+
+    ft = FieldTable ();
+    ft.setString (NAME,   "tplCurrentFileCount");
+    ft.setInt    (TYPE,   TYPE_U32);
+    ft.setInt    (ACCESS, ACCESS_RO);
+    ft.setInt    (INDEX,  0);
+    ft.setString (UNIT,   "file");
+    ft.setString (DESC,   "Number of files currently allocated to transaction prepared list journal");
+    buf.put (ft);
+
+
     // Statistics
+    ft = FieldTable ();
+    ft.setString (NAME,   "tplTransactionDepth");
+    ft.setInt    (TYPE,   TYPE_U32);
+    ft.setString (UNIT,   "txn");
+    ft.setString (DESC,   "Number of currently enqueued prepared transactions");
+    buf.put (ft);
 
+    ft = FieldTable ();
+    ft.setString (NAME,   "tplTransactionDepthHigh");
+    ft.setInt    (TYPE,   TYPE_U32);
+    ft.setString (UNIT,   "txn");
+    ft.setString (DESC,   "Number of currently enqueued prepared transactions (High)");
+    buf.put (ft);
+
+    ft = FieldTable ();
+    ft.setString (NAME,   "tplTransactionDepthLow");
+    ft.setInt    (TYPE,   TYPE_U32);
+    ft.setString (UNIT,   "txn");
+    ft.setString (DESC,   "Number of currently enqueued prepared transactions (Low)");
+    buf.put (ft);
+
+    ft = FieldTable ();
+    ft.setString (NAME,   "tplTxnPrepares");
+    ft.setInt    (TYPE,   TYPE_U64);
+    ft.setString (UNIT,   "record");
+    ft.setString (DESC,   "Total transaction prepares on transaction prepared list");
+    buf.put (ft);
+
+    ft = FieldTable ();
+    ft.setString (NAME,   "tplTxnCommits");
+    ft.setInt    (TYPE,   TYPE_U64);
+    ft.setString (UNIT,   "record");
+    ft.setString (DESC,   "Total transaction commits on transaction prepared list");
+    buf.put (ft);
+
+    ft = FieldTable ();
+    ft.setString (NAME,   "tplTxnAborts");
+    ft.setInt    (TYPE,   TYPE_U64);
+    ft.setString (UNIT,   "record");
+    ft.setString (DESC,   "Total transaction aborts on transaction prepared list");
+    buf.put (ft);
+
+    ft = FieldTable ();
+    ft.setString (NAME,   "tplOutstandingAIOs");
+    ft.setInt    (TYPE,   TYPE_U32);
+    ft.setString (UNIT,   "aio_op");
+    ft.setString (DESC,   "Number of currently outstanding AIO requests in Async IO system");
+    buf.put (ft);
+
+    ft = FieldTable ();
+    ft.setString (NAME,   "tplOutstandingAIOsHigh");
+    ft.setInt    (TYPE,   TYPE_U32);
+    ft.setString (UNIT,   "aio_op");
+    ft.setString (DESC,   "Number of currently outstanding AIO requests in Async IO system (High)");
+    buf.put (ft);
+
+    ft = FieldTable ();
+    ft.setString (NAME,   "tplOutstandingAIOsLow");
+    ft.setInt    (TYPE,   TYPE_U32);
+    ft.setString (UNIT,   "aio_op");
+    ft.setString (DESC,   "Number of currently outstanding AIO requests in Async IO system (Low)");
+    buf.put (ft);
+
+
     // Methods
 
     // Events
@@ -124,7 +264,24 @@
 }
 
 
+void Store::aggregatePerThreadStats(struct PerThreadStats* totals)
+{
+    totals->tplTxnPrepares = 0;
+    totals->tplTxnCommits = 0;
+    totals->tplTxnAborts = 0;
 
+    for (int idx = 0; idx < maxThreads; idx++) {
+        struct PerThreadStats* threadStats = perThreadStatsArray[idx];
+        if (threadStats != 0) {
+            totals->tplTxnPrepares += threadStats->tplTxnPrepares;
+            totals->tplTxnCommits += threadStats->tplTxnCommits;
+            totals->tplTxnAborts += threadStats->tplTxnAborts;
+
+        }
+    }
+}
+
+
 void Store::writeProperties (Buffer& buf)
 {
     sys::Mutex::ScopedLock mutex(accessLock);
@@ -135,6 +292,13 @@
     buf.putShortString (location);
     buf.putShort (defaultInitialFileCount);
     buf.putLong (defaultDataFileSize);
+    buf.putOctet (tplIsInitialized?1:0);
+    buf.putShortString (tplDirectory);
+    buf.putLong (tplWritePageSize);
+    buf.putLong (tplWritePages);
+    buf.putShort (tplInitialFileCount);
+    buf.putLong (tplDataFileSize);
+    buf.putLong (tplCurrentFileCount);
 
 }
 
@@ -144,12 +308,28 @@
     instChanged = false;
 
 
+    struct PerThreadStats totals;
+    aggregatePerThreadStats(&totals);
 
+
     if (!skipHeaders)
         writeTimestamps (buf);
+    buf.putLong (tplTransactionDepth);
+    buf.putLong (tplTransactionDepthHigh);
+    buf.putLong (tplTransactionDepthLow);
+    buf.putLongLong (totals.tplTxnPrepares);
+    buf.putLongLong (totals.tplTxnCommits);
+    buf.putLongLong (totals.tplTxnAborts);
+    buf.putLong (tplOutstandingAIOs);
+    buf.putLong (tplOutstandingAIOsHigh);
+    buf.putLong (tplOutstandingAIOsLow);
 
 
     // Maintenance of hi-lo statistics
+    tplTransactionDepthHigh = tplTransactionDepth;
+    tplTransactionDepthLow  = tplTransactionDepth;
+    tplOutstandingAIOsHigh = tplOutstandingAIOs;
+    tplOutstandingAIOsLow  = tplOutstandingAIOs;
 
 
 }

Modified: store/trunk/cpp/lib/gen/qpid/management/Store.h
===================================================================
--- store/trunk/cpp/lib/gen/qpid/management/Store.h	2008-07-18 13:27:59 UTC (rev 2206)
+++ store/trunk/cpp/lib/gen/qpid/management/Store.h	2008-07-18 13:30:50 UTC (rev 2207)
@@ -44,10 +44,49 @@
     std::string location;
     uint16_t defaultInitialFileCount;
     uint32_t defaultDataFileSize;
+    uint8_t tplIsInitialized;
+    std::string tplDirectory;
+    uint32_t tplWritePageSize;
+    uint32_t tplWritePages;
+    uint16_t tplInitialFileCount;
+    uint32_t tplDataFileSize;
+    uint32_t tplCurrentFileCount;
 
     // Statistics
+    uint32_t  tplTransactionDepth;
+    uint32_t  tplTransactionDepthHigh;
+    uint32_t  tplTransactionDepthLow;
+    uint32_t  tplOutstandingAIOs;
+    uint32_t  tplOutstandingAIOsHigh;
+    uint32_t  tplOutstandingAIOsLow;
 
 
+    // Per-Thread Statistics
+    struct PerThreadStats {
+        uint64_t  tplTxnPrepares;
+        uint64_t  tplTxnCommits;
+        uint64_t  tplTxnAborts;
+
+    };
+
+    struct PerThreadStats** perThreadStatsArray;
+
+    inline struct PerThreadStats* getThreadStats() {
+        int index = getThreadIndex();
+        struct PerThreadStats* threadStats = perThreadStatsArray[index];
+        if (threadStats == 0) {
+            threadStats = new(PerThreadStats);
+            perThreadStatsArray[index] = threadStats;
+            threadStats->tplTxnPrepares = 0;
+            threadStats->tplTxnCommits = 0;
+            threadStats->tplTxnAborts = 0;
+
+        }
+        return threadStats;
+    }
+
+    void aggregatePerThreadStats(struct PerThreadStats*);
+
     // Private Methods
     static void writeSchema (qpid::framing::Buffer& buf);
     void writeProperties    (qpid::framing::Buffer& buf);
@@ -58,9 +97,6 @@
                              qpid::framing::Buffer& outBuf);
     writeSchemaCall_t getWriteSchemaCall(void) { return writeSchema; }
 
-    // Stub for getInstChanged.  There are no statistics in this class.
-    bool getInstChanged (void) { return false; }
-
   public:
 
     friend class PackageMrgstore;
@@ -98,6 +134,93 @@
         defaultDataFileSize = val;
         configChanged = true;
     }
+    inline void set_tplIsInitialized (uint8_t val){
+        sys::Mutex::ScopedLock mutex(accessLock);
+        tplIsInitialized = val;
+        configChanged = true;
+    }
+    inline void set_tplDirectory (std::string val){
+        sys::Mutex::ScopedLock mutex(accessLock);
+        tplDirectory = val;
+        configChanged = true;
+    }
+    inline void set_tplWritePageSize (uint32_t val){
+        sys::Mutex::ScopedLock mutex(accessLock);
+        tplWritePageSize = val;
+        configChanged = true;
+    }
+    inline void set_tplWritePages (uint32_t val){
+        sys::Mutex::ScopedLock mutex(accessLock);
+        tplWritePages = val;
+        configChanged = true;
+    }
+    inline void set_tplInitialFileCount (uint16_t val){
+        sys::Mutex::ScopedLock mutex(accessLock);
+        tplInitialFileCount = val;
+        configChanged = true;
+    }
+    inline void set_tplDataFileSize (uint32_t val){
+        sys::Mutex::ScopedLock mutex(accessLock);
+        tplDataFileSize = val;
+        configChanged = true;
+    }
+    inline void set_tplCurrentFileCount (uint32_t val){
+        sys::Mutex::ScopedLock mutex(accessLock);
+        tplCurrentFileCount = val;
+        configChanged = true;
+    }
+    inline void inc_tplTransactionDepth (uint32_t by = 1){
+        sys::Mutex::ScopedLock mutex(accessLock);
+        tplTransactionDepth += by;
+        if (tplTransactionDepthHigh < tplTransactionDepth)
+            tplTransactionDepthHigh = tplTransactionDepth;
+        instChanged = true;
+    }
+    inline void dec_tplTransactionDepth (uint32_t by = 1){
+        sys::Mutex::ScopedLock mutex(accessLock);
+        tplTransactionDepth -= by;
+        if (tplTransactionDepthLow > tplTransactionDepth)
+            tplTransactionDepthLow = tplTransactionDepth;
+        instChanged = true;
+    }
+    inline void inc_tplTxnPrepares (uint64_t by = 1){
+        getThreadStats()->tplTxnPrepares += by;
+        instChanged = true;
+    }
+    inline void dec_tplTxnPrepares (uint64_t by = 1){
+        getThreadStats()->tplTxnPrepares -= by;
+        instChanged = true;
+    }
+    inline void inc_tplTxnCommits (uint64_t by = 1){
+        getThreadStats()->tplTxnCommits += by;
+        instChanged = true;
+    }
+    inline void dec_tplTxnCommits (uint64_t by = 1){
+        getThreadStats()->tplTxnCommits -= by;
+        instChanged = true;
+    }
+    inline void inc_tplTxnAborts (uint64_t by = 1){
+        getThreadStats()->tplTxnAborts += by;
+        instChanged = true;
+    }
+    inline void dec_tplTxnAborts (uint64_t by = 1){
+        getThreadStats()->tplTxnAborts -= by;
+        instChanged = true;
+    }
+    inline void inc_tplOutstandingAIOs (uint32_t by = 1){
+        sys::Mutex::ScopedLock mutex(accessLock);
+        tplOutstandingAIOs += by;
+        if (tplOutstandingAIOsHigh < tplOutstandingAIOs)
+            tplOutstandingAIOsHigh = tplOutstandingAIOs;
+        instChanged = true;
+    }
+    inline void dec_tplOutstandingAIOs (uint32_t by = 1){
+        sys::Mutex::ScopedLock mutex(accessLock);
+        tplOutstandingAIOs -= by;
+        if (tplOutstandingAIOsLow > tplOutstandingAIOs)
+            tplOutstandingAIOsLow = tplOutstandingAIOs;
+        instChanged = true;
+    }
 
 };
 

Modified: store/trunk/cpp/lib/jrnl/txn_map.cpp
===================================================================
--- store/trunk/cpp/lib/jrnl/txn_map.cpp	2008-07-18 13:27:59 UTC (rev 2206)
+++ store/trunk/cpp/lib/jrnl/txn_map.cpp	2008-07-18 13:30:50 UTC (rev 2207)
@@ -146,6 +146,38 @@
     return itr->second.size();
 }
 
+u_int32_t
+txn_map::cnt(const bool enq_flag) // tally, over every xid in _map, the list entries whose _enq_flag equals enq_flag
+{
+    slock s(&_mutex); // presumably a scoped lock on _mutex for the duration of the scan -- confirm against slock
+    u_int32_t c = 0;
+    for (xmap_itr i = _map.begin(); i != _map.end(); i++)
+    {
+        for (tdl_itr j = i->second.begin(); j < i->second.end(); j++) // '<' on iterators: assumes tdl_itr is random-access -- TODO confirm
+        {
+            if (j->_enq_flag == enq_flag)
+                c++;
+        }
+    }
+    return c; // 0 when the map is empty or no entry matches
+}
+
+u_int32_t
+txn_map::cnt(const std::string& xid, const bool enq_flag) // tally the entries stored under one xid whose _enq_flag equals enq_flag
+{
+    slock s(&_mutex); // presumably a scoped lock on _mutex for the duration of the lookup and scan -- confirm against slock
+    u_int32_t c = 0;
+    xmap_itr i = _map.find(xid);
+    if (i == _map.end()) // not found in map
+        return 0;
+    for (tdl_itr j = i->second.begin(); j < i->second.end(); j++) // '<' on iterators: assumes tdl_itr is random-access -- TODO confirm
+    {
+        if (j->_enq_flag == enq_flag)
+            c++;
+    }
+    return c; // 0 when the xid's list holds no matching entry
+}
+
 bool
 txn_map::is_txn_synced(const std::string& xid)
 {

Modified: store/trunk/cpp/lib/jrnl/txn_map.hpp
===================================================================
--- store/trunk/cpp/lib/jrnl/txn_map.hpp	2008-07-18 13:27:59 UTC (rev 2206)
+++ store/trunk/cpp/lib/jrnl/txn_map.hpp	2008-07-18 13:30:50 UTC (rev 2207)
@@ -132,6 +132,10 @@
         const txn_data_list get_remove_tdata_list(const std::string& xid);
         bool in_map(const std::string& xid);
         u_int32_t get_rid_count(const std::string& xid);
+        inline u_int32_t enq_cnt() { return cnt(true); }
+        inline u_int32_t enq_cnt(const std::string& xid) { return cnt(xid, true); }
+        inline u_int32_t deq_cnt() { return cnt(false); } // entries with _enq_flag == false across all xids (was cnt(true), duplicating enq_cnt())
+        inline u_int32_t deq_cnt(const std::string& xid) { return cnt(xid, false); }
         bool is_txn_synced(const std::string& xid);
         bool set_aio_compl(const std::string& xid, const u_int64_t rid);
         const txn_data& get_data(const std::string& xid, const u_int64_t rid);
@@ -140,6 +144,8 @@
         inline u_int32_t size() const { return u_int32_t(_map.size()); }
         void xid_list(std::vector<std::string>& xv);
     private:
+        u_int32_t cnt(const bool enq_flag);
+        u_int32_t cnt(const std::string& xid, const bool enq_flag);
         static std::string xid_format(const std::string& xid);
     };
 

Modified: store/trunk/cpp/tests/TwoPhaseCommitTest.cpp
===================================================================
--- store/trunk/cpp/tests/TwoPhaseCommitTest.cpp	2008-07-18 13:27:59 UTC (rev 2206)
+++ store/trunk/cpp/tests/TwoPhaseCommitTest.cpp	2008-07-18 13:30:50 UTC (rev 2207)
@@ -189,7 +189,7 @@
             return static_cast<JournalImpl*>(queue.getExternalQueueStore())->get_open_txn_cnt();
         }
         u_int32_t getRemainingPreparedListTxns() {
-            return preparedXidStorePtr->get_open_txn_cnt();
+            return tplStorePtr->get_open_txn_cnt();
         }
     };
 

Modified: store/trunk/specs/management-schema.xml
===================================================================
--- store/trunk/specs/management-schema.xml	2008-07-18 13:27:59 UTC (rev 2206)
+++ store/trunk/specs/management-schema.xml	2008-07-18 13:30:50 UTC (rev 2207)
@@ -4,30 +4,47 @@
     License Text
 -->
   <class name="Store">
-    <property name="brokerRef"               type="objId"  references="qpid.Broker" access="RO" index="y" parentRef="y"/>
+    <property name="brokerRef"               type="objId"  access="RO" references="qpid.Broker" index="y" parentRef="y"/>
     <property name="location"                type="sstr"   access="RO"              desc="Logical directory on disk"/>
     <property name="defaultInitialFileCount" type="uint16" access="RO" unit="file"  desc="Default number of files initially allocated to each journal"/>
     <property name="defaultDataFileSize"     type="uint32" access="RO" unit="RdPg"  desc="Default size of each journal data file"/>
+    <property name="tplIsInitialized"        type="bool"   access="RO"              desc="Transaction prepared list has been initialized by a transactional prepare"/>
+    <property name="tplDirectory"            type="sstr"   access="RO"              desc="Transaction prepared list directory"/>
+    <property name="tplWritePageSize"        type="uint32" access="RO" unit="byte"  desc="Page size in transaction prepared list write-page-cache"/>
+    <property name="tplWritePages"           type="uint32" access="RO" unit="wpage" desc="Number of pages in transaction prepared list write-page-cache"/>
+    <property name="tplInitialFileCount"     type="uint16" access="RO" unit="file"  desc="Number of files initially allocated to transaction prepared list journal"/>
+    <property name="tplDataFileSize"         type="uint32" access="RO" unit="byte"  desc="Size of each journal data file in transaction prepared list journal"/>
+    <property name="tplCurrentFileCount"     type="uint32" access="RO" unit="file"  desc="Number of files currently allocated to transaction prepared list journal"/>
+    
+    <statistic name="tplTransactionDepth"    type="hilo32"  unit="txn"    desc="Number of currently enqueued prepared transactions"/>
+    <statistic name="tplTxnPrepares"         type="count64" unit="record" desc="Total transaction prepares on transaction prepared list"/>
+    <statistic name="tplTxnCommits"          type="count64" unit="record" desc="Total transaction commits on transaction prepared list"/>
+    <statistic name="tplTxnAborts"           type="count64" unit="record" desc="Total transaction aborts on transaction prepared list"/>
+    <statistic name="tplOutstandingAIOs"     type="hilo32"  unit="aio_op" desc="Number of currently outstanding AIO requests in Async IO system"/>
   </class>
 
   <class name="Journal">
-    <property name="name"          type="sstr"   access="RO" index="y"/>
-    <property name="queueRef"      type="objId"  access="RO" references="qpid.Queue" isGeneralReference="y"/>
-    <property name="directory"     type="sstr"   access="RO"              desc="Directory containing journal files"/>
-    <property name="baseFileName"  type="sstr"   access="RO"              desc="Base filename prefix for journal"/>
-    <property name="writePageSize" type="uint32" access="RO" unit="byte"  desc="Page size in write-page-cache"/>
-    <property name="writePages"    type="uint32" access="RO" unit="wpage" desc="Number of pages in write-page-cache"/>
-    <property name="readPageSize"  type="uint32" access="RO" unit="byte"  desc="Page size in read-page-cache"/>
-    <property name="readPages"     type="uint32" access="RO" unit="rpage" desc="Number of pages in read-page-cache"/>
+    <property name="queueRef"           type="objId"  access="RO" references="qpid.Queue" isGeneralReference="y"/>
+    <property name="name"               type="sstr"   access="RO" index="y"/>
+    <property name="directory"          type="sstr"   access="RO"              desc="Directory containing journal files"/>
+    <property name="baseFileName"       type="sstr"   access="RO"              desc="Base filename prefix for journal"/>
+    <property name="writePageSize"      type="uint32" access="RO" unit="byte"  desc="Page size in write-page-cache"/>
+    <property name="writePages"         type="uint32" access="RO" unit="wpage" desc="Number of pages in write-page-cache"/>
+    <property name="readPageSize"       type="uint32" access="RO" unit="byte"  desc="Page size in read-page-cache"/>
+    <property name="readPages"          type="uint32" access="RO" unit="rpage" desc="Number of pages in read-page-cache"/>
+    <property name="initialFileCount"   type="uint16" access="RO" unit="file"  desc="Number of files initially allocated to this journal"/>
+    <property name="dataFileSize"       type="uint32" access="RO" unit="byte"  desc="Size of each journal data file"/>
+    <property name="currentFileCount"   type="uint32" access="RO" unit="file"  desc="Number of files currently allocated to this journal"/>
+    
+    <statistic name="recordDepth"       type="hilo32"  unit="record" desc="Number of currently enqueued records (durable messages)"/>
+    <statistic name="enqueues"          type="count64" unit="record" desc="Total enqueued records on journal"/>
+    <statistic name="dequeues"          type="count64" unit="record" desc="Total dequeued records on journal"/>
+    <statistic name="txnEnqueues"       type="count64" unit="record" desc="Total transactional enqueued records on journal"/>
+    <statistic name="txnDequeues"       type="count64" unit="record" desc="Total transactional dequeued records on journal"/>
+    <statistic name="txnCommits"        type="count64" unit="record" desc="Total transactional commit records on journal"/>
+    <statistic name="txnAborts"         type="count64" unit="record" desc="Total transactional abort records on journal"/>
+    <statistic name="outstandingAIOs"   type="hilo32"  unit="aio_op" desc="Number of currently outstanding AIO requests in Async IO system"/>
 
-    <statistic name="initialFileCount"    type="uint16"  unit="file"   desc="Number of files initially allocated to this journal"/>
-    <statistic name="dataFileSize"        type="uint32"  unit="byte"   desc="Size of each journal data file"/>
-    <statistic name="currentFileCount"    type="uint32"  unit="file"   desc="Number of files currently allocated to this journal"/>
-    <statistic name="recordDepth"         type="hilo32"  unit="record" desc="Number of enqueued records (durable messages)"/>
-    <statistic name="recordEnqueues"      type="count64" unit="record" desc="Total enqueued records on journal"/>
-    <statistic name="recordDequeues"      type="count64" unit="record" desc="Total dequeued records on journal"/>
-    <statistic name="outstandingAIOs"     type="hilo32"  unit="aio_op" desc="Number of outstanding AIO requests in Async IO system"/>
-
 <!--
     The following are not yet "wired up" in JournalImpl.cpp
 -->




More information about the rhmessaging-commits mailing list