[rhmessaging-commits] rhmessaging commits: r2206 - in store/branches/mrg-1.0/cpp: tests and 1 other directory.

rhmessaging-commits at lists.jboss.org rhmessaging-commits at lists.jboss.org
Fri Jul 18 09:27:59 EDT 2008


Author: kpvdr
Date: 2008-07-18 09:27:59 -0400 (Fri, 18 Jul 2008)
New Revision: 2206

Modified:
   store/branches/mrg-1.0/cpp/lib/BdbMessageStore.cpp
   store/branches/mrg-1.0/cpp/lib/BdbMessageStore.h
   store/branches/mrg-1.0/cpp/tests/TwoPhaseCommitTest.cpp
Log:
Additional tidy-up on new code. Renamed some functions and variables to keep naming consistent around the Transaction Prepared List (TPL) name. Deleted an old BDB function that escaped detection from a previous BDB cleanup.

Modified: store/branches/mrg-1.0/cpp/lib/BdbMessageStore.cpp
===================================================================
--- store/branches/mrg-1.0/cpp/lib/BdbMessageStore.cpp	2008-07-17 19:49:11 UTC (rev 2205)
+++ store/branches/mrg-1.0/cpp/lib/BdbMessageStore.cpp	2008-07-18 13:27:59 UTC (rev 2206)
@@ -50,7 +50,7 @@
 qpid::sys::Duration BdbMessageStore::defJournalFlushTimeout(500 * qpid::sys::TIME_MSEC); // 0.5s
 qpid::sys::Mutex TxnCtxt::globalSerialiser;
 
-BdbMessageStore::PreparedRecoverStruct::PreparedRecoverStruct(const u_int64_t _rid,
+BdbMessageStore::TplRecoverStruct::TplRecoverStruct(const u_int64_t _rid,
                                                               const bool _deq_flag,
                                                               const bool _commit_flag) :
                                                               rid(_rid),
@@ -241,11 +241,11 @@
         open(mappingDb, txn.get(), "mappings.db", true);
         open(bindingDb, txn.get(), "bindings.db", true);
         open(generalDb, txn.get(), "general.db",  false);
-        preparedXidStorePtr.reset(new JournalImpl("PreparedXidStore", getPxidBaseDir(), "prepared_xid", defJournalGetEventsTimeout, defJournalFlushTimeout));
+        tplStorePtr.reset(new JournalImpl("TplStore", getTplBaseDir(), "tpl", defJournalGetEventsTimeout, defJournalFlushTimeout));
         txn.commit();
     } catch (const journal::jexception& e) {
         txn.abort();
-        THROW_STORE_EXCEPTION_2("Error opening preparedXidStore instance", e.what());
+        THROW_STORE_EXCEPTION_2("Error opening tplStore instance", e.what());
     } catch (const DbException& e) {
         txn.abort();
         THROW_STORE_EXCEPTION_2("Error opening databases", e);
@@ -260,12 +260,12 @@
     return true;
 }
 
-void BdbMessageStore::chkInitPreparedXidStore()
+void BdbMessageStore::chkTplStoreInit()
 {
     qpid::sys::Mutex::ScopedLock sl(jrnlCreateLock);
-    journal::jdir::create_dir(getPxidBaseDir());
-    if (!preparedXidStorePtr->is_ready()) {
-        preparedXidStorePtr->initialize(tplNumJrnlFiles, tplJrnlFsizeSblks, tplWCacheNumPages, tplWCachePgSizeSblks);
+    journal::jdir::create_dir(getTplBaseDir());
+    if (!tplStorePtr->is_ready()) {
+        tplStorePtr->initialize(tplNumJrnlFiles, tplJrnlFsizeSblks, tplWCacheNumPages, tplWCachePgSizeSblks);
     }
 }
 
@@ -285,7 +285,7 @@
         for (std::list<Db*>::iterator i = dbs.begin(); i != dbs.end(); i++) {
             (*i)->close(0);
         }
-        if (preparedXidStorePtr->is_ready()) preparedXidStorePtr->stop(true);
+        if (tplStorePtr->is_ready()) tplStorePtr->stop(true);
     } catch (const DbException& e) {
         QPID_LOG(error, "Error closing BDB databases: " <<  e.what());
     } catch (const journal::jexception& e) {
@@ -313,7 +313,7 @@
     txn->commit(0);
     try {
         journal::jdir::delete_dir(getJrnlBaseDir(),true);
-        journal::jdir::delete_dir(getPxidBaseDir(),true);
+        journal::jdir::delete_dir(getTplBaseDir(),true);
     }
     catch (const journal::jexception& e) {
         THROW_STORE_EXCEPTION(std::string("truncate() failed: ") + e.what() );
@@ -474,7 +474,7 @@
 {
     checkInit();
     txn_list prepared;
-    recoverXids(prepared);
+    recoverLockedMappings(prepared);
 
     queue_index queues;//id->queue
     exchange_index exchanges;//id->exchange
@@ -505,14 +505,14 @@
         TPCTxnCtxt* tpcc = new TPCTxnCtxt(i->xid, &messageIdSequence);
         std::auto_ptr<TPCTransactionContext> txn(tpcc);
 
-        tpcc->prepare(preparedXidStorePtr.get());
+        tpcc->prepare(tplStorePtr.get());
 
         // Restore data token state in TxnCtxt
-        PreparedRecoverMapCitr citr = preparedXidStoreRecoverMap.find(i->xid);
-        if (citr == preparedXidStoreRecoverMap.end()) THROW_STORE_EXCEPTION("XID not found in preparedXidStoreRecoverMap");
+        TplRecoverMapCitr citr = tplRecoverMap.find(i->xid);
+        if (citr == tplRecoverMap.end()) THROW_STORE_EXCEPTION("XID not found in tplRecoverMap");
         tpcc->recoverDtok(citr->second.rid, i->xid);
 
-        // If a record is found that is dequeued but not committed/aborted from preparedXidStore, then a complete() call
+        // If a record is found that is dequeued but not committed/aborted from tplStore, then a complete() call
         // was interrupted part way through committing/aborting the impacted queues. Complete this process.
         bool incomplTxnFlag = citr->second.deq_flag;
 
@@ -581,8 +581,8 @@
         maxQueueId = max(key.id, maxQueueId);
     }
 
-    // NOTE: highestRid is set by both recoverQueues() and collectPreparedXids() as
-    // the messageIdSequence is used for both queue journals and the preparedXid journal.
+    // NOTE: highestRid is set by both recoverQueues() and recoverTplStore() as
+    // the messageIdSequence is used for both queue journals and the tpl journal.
     messageIdSequence.reset(highestRid + 1);
 
     queueIdSequence.reset(maxQueueId + 1);
@@ -669,7 +669,6 @@
     generalIdSequence.reset(maxGeneralId + 1);
 }
 
-
 void BdbMessageStore::recoverMessages(TxnCtxt& /*txn*/,
                                       qpid::broker::RecoveryManager& recovery,
                                       qpid::broker::RecoverableQueue::shared_ptr& queue,
@@ -730,8 +729,8 @@
 
                 if (PreparedTransaction::isLocked(prepared, queue->getPersistenceId(), dtokp.rid()) ) {
                     std::string xid((char*)xidbuff, xidbuffSize);
-                    PreparedRecoverMapCitr citr = preparedXidStoreRecoverMap.find(xid);
-                    if (xid.size() > 0 && citr == preparedXidStoreRecoverMap.end()) THROW_STORE_EXCEPTION("XID not found in preparedXidStoreRecoverMap");
+                    TplRecoverMapCitr citr = tplRecoverMap.find(xid);
+                    if (xid.size() > 0 && citr == tplRecoverMap.end()) THROW_STORE_EXCEPTION("XID not found in tplRecoverMap");
                     if (xid.size() > 0 && citr->second.deq_flag) { // deq present in prepared list, this xid is part of incomplete txn commit/abort
                         if (citr->second.commit_flag)
                             queue->recover(msg);
@@ -793,8 +792,6 @@
     return recovery.recoverMessage(header.buffer);
 }
 
-
-
 int BdbMessageStore::enqueueMessage(TxnCtxt& txn,
                                     IdDbt& msgId,
                                     RecoverableMessage::shared_ptr& msg,
@@ -826,26 +823,26 @@
     return count;
 }
 
-void BdbMessageStore::recoverPreparedXidJournal()
+void BdbMessageStore::recoverTplStore()
 {
-    if (journal::jdir::exists(storeDir + "/rhm/pxid/prepared_xid.jinf")) {
+    if (journal::jdir::exists(tplStorePtr->jrnl_dir() + tplStorePtr->base_filename() + ".jinf")) {
         u_int64_t thisHighestRid;
-        preparedXidStorePtr->recover(tplNumJrnlFiles, tplJrnlFsizeSblks, tplWCachePgSizeSblks, tplWCacheNumPages, 0, thisHighestRid, 0);
+        tplStorePtr->recover(tplNumJrnlFiles, tplJrnlFsizeSblks, tplWCachePgSizeSblks, tplWCacheNumPages, 0, thisHighestRid, 0);
         if (thisHighestRid > highestRid)
             highestRid = thisHighestRid;
-        preparedXidStorePtr->recover_complete(); // start journal.
+        tplStorePtr->recover_complete(); // start journal.
     }
 }
 
-void BdbMessageStore::getPreparedXidMap(PreparedRecoverMap& prepXidMap)
+void BdbMessageStore::getTplRecoverMap(TplRecoverMap& tplMap)
 {
-    if (preparedXidStorePtr.get()) {
-        if (!preparedXidStorePtr->is_ready())
-            recoverPreparedXidJournal();
+    if (tplStorePtr.get()) {
+        if (!tplStorePtr->is_ready())
+            recoverTplStore();
 
         // TODO: The journal will return a const txn_map and the txn_map will support
         // const operations at some point. Using non-const txn_map this way is ugly...
-        journal::txn_map& tmap = preparedXidStorePtr->get_txn_map();
+        journal::txn_map& tmap = tplStorePtr->get_txn_map();
         std::vector<std::string> xidList;
         tmap.xid_list(xidList);
         for (std::vector<std::string>::const_iterator i = xidList.begin(); i<xidList.end(); i++) {
@@ -865,18 +862,18 @@
             }
             assert(enqCnt == 1);
             assert(deqCnt <= 1);
-            prepXidMap.insert(PreparedRecoverMapPair(*i, PreparedRecoverStruct(rid, deqCnt == 1, commitFlag)));
+            tplMap.insert(TplRecoverMapPair(*i, TplRecoverStruct(rid, deqCnt == 1, commitFlag)));
         }
     }
 }
 
-void BdbMessageStore::recoverXids(txn_list& txns)
+void BdbMessageStore::recoverLockedMappings(txn_list& txns)
 {
-    if (!preparedXidStorePtr->is_ready())
-        getPreparedXidMap(preparedXidStoreRecoverMap);
+    if (!tplStorePtr->is_ready())
+        getTplRecoverMap(tplRecoverMap);
 
     // Abort unprepaired xids and populate the locked maps
-    for (PreparedRecoverMapCitr i = preparedXidStoreRecoverMap.begin(); i != preparedXidStoreRecoverMap.end(); i++) {
+    for (TplRecoverMapCitr i = tplRecoverMap.begin(); i != tplRecoverMap.end(); i++) {
         LockedMappings::shared_ptr enq_ptr;
         enq_ptr.reset(new LockedMappings);
         LockedMappings::shared_ptr deq_ptr;
@@ -885,25 +882,11 @@
     }
 }
 
-void BdbMessageStore::readLockedMappings(Db& db,
-                                         txn_lock_map& mappings)
-{
-    Cursor c;
-    c.open(db, 0);
-
-    Dbt key;
-    IdPairDbt value;
-    while (c.next(key, value)) {
-        std::string xid(reinterpret_cast<char*>(key.get_data()), key.get_size());
-        LockedMappings::add(mappings, xid, value.queueId(), value.messageId());
-    }
-}
-
 void BdbMessageStore::collectPreparedXids(std::set<string>& xids)
 {
-    if (!preparedXidStorePtr->is_ready())
-        getPreparedXidMap(preparedXidStoreRecoverMap);
-    for (PreparedRecoverMapCitr i = preparedXidStoreRecoverMap.begin(); i != preparedXidStoreRecoverMap.end(); i++) {
+    if (!tplStorePtr->is_ready())
+        getTplRecoverMap(tplRecoverMap);
+    for (TplRecoverMapCitr i = tplRecoverMap.begin(); i != tplRecoverMap.end(); i++) {
         xids.insert(i->first);
     }
 }
@@ -1250,14 +1233,15 @@
                                 bool commit)
 {
     try {
+        chkTplStoreInit(); // Late initialize (if needed)
+
         // Nothing to do if not prepared
-        chkInitPreparedXidStore();
         if (txn.getDtok()->is_enqueued()) {
             txn.incrDtokRef();
             DataTokenImpl* dtokp = txn.getDtok();
             dtokp->set_dequeue_rid(dtokp->rid());
             dtokp->set_rid(messageIdSequence.next());
-            preparedXidStorePtr->dequeue_txn_data_record(txn.getDtok(), txn.getXid(), commit);
+            tplStorePtr->dequeue_txn_data_record(txn.getDtok(), txn.getXid(), commit);
         }
         txn.complete(commit);
     } catch (const std::exception& e) {
@@ -1292,13 +1276,14 @@
 void BdbMessageStore::localPrepare(TxnCtxt* ctxt)
 {
     try {
-        chkInitPreparedXidStore();
+        chkTplStoreInit(); // Late initialize (if needed)
+
         ctxt->incrDtokRef();
         DataTokenImpl* dtokp = ctxt->getDtok();
         dtokp->set_external_rid(true);
         dtokp->set_rid(messageIdSequence.next());
-        preparedXidStorePtr->enqueue_txn_data_record(0, 0, 0, dtokp, ctxt->getXid(), false);
-        ctxt->prepare(preparedXidStorePtr.get());
+        tplStorePtr->enqueue_txn_data_record(0, 0, 0, dtokp, ctxt->getXid(), false);
+        ctxt->prepare(tplStorePtr.get());
         // make sure all the data is written to disk before returning
         ctxt->sync();
     } catch (const std::exception& e) {
@@ -1449,10 +1434,10 @@
     return dir.str();
 }
 
-string BdbMessageStore::getPxidBaseDir()
+string BdbMessageStore::getTplBaseDir()
 {
     std::stringstream dir;
-    dir << storeDir << "/rhm/pxid/" ;
+    dir << storeDir << "/rhm/tpl/" ;
     return dir.str();
 }
 

Modified: store/branches/mrg-1.0/cpp/lib/BdbMessageStore.h
===================================================================
--- store/branches/mrg-1.0/cpp/lib/BdbMessageStore.h	2008-07-17 19:49:11 UTC (rev 2205)
+++ store/branches/mrg-1.0/cpp/lib/BdbMessageStore.h	2008-07-18 13:27:59 UTC (rev 2206)
@@ -61,17 +61,17 @@
     typedef LockedMappings::map txn_lock_map;
     typedef boost::ptr_list<PreparedTransaction> txn_list;
 
-    // Structs for preparedXidStore recover state
-    struct PreparedRecoverStruct {
+    // Structs for Transaction Prepared List (TPL) recover state
+    struct TplRecoverStruct {
         u_int64_t rid;
         bool deq_flag;
         bool commit_flag;
-        PreparedRecoverStruct(const u_int64_t _rid, const bool _deq_flag, const bool _commit_flag);
+        TplRecoverStruct(const u_int64_t _rid, const bool _deq_flag, const bool _commit_flag);
     };
-    typedef PreparedRecoverStruct PreparedRecover;
-    typedef std::map<std::string, PreparedRecover> PreparedRecoverMap;
-    typedef PreparedRecoverMap::const_iterator PreparedRecoverMapCitr;
-    typedef std::pair<std::string, PreparedRecoverStruct> PreparedRecoverMapPair;
+    typedef TplRecoverStruct TplRecover;
+    typedef std::pair<std::string, TplRecover> TplRecoverMapPair;
+    typedef std::map<std::string, TplRecover> TplRecoverMap;
+    typedef TplRecoverMap::const_iterator TplRecoverMapCitr;
 
     // Default store settings
     static const u_int16_t defNumJrnlFiles = 8;
@@ -91,9 +91,9 @@
     Db bindingDb;
     Db generalDb;
 
-    // Pointer to prepared XID journal instance
-    boost::shared_ptr<JournalImpl> preparedXidStorePtr;
-    PreparedRecoverMap preparedXidStoreRecoverMap;
+    // Pointer to Transaction Prepared List (TPL) journal instance
+    boost::shared_ptr<JournalImpl> tplStorePtr;
+    TplRecoverMap tplRecoverMap;
 
     IdSequence queueIdSequence;
     IdSequence exchangeIdSequence;
@@ -157,11 +157,9 @@
                        queue_index& index,
                        txn_list& locked,
                        message_index& prepared);
-    void recoverPreparedXidJournal();
-    void getPreparedXidMap(PreparedRecoverMap& prepXidMap);
-    void recoverXids(txn_list& txns);
-    void readLockedMappings(Db& db,
-                            txn_lock_map& mappings);
+    void recoverTplStore();
+    void getTplRecoverMap(TplRecoverMap& tplMap);
+    void recoverLockedMappings(txn_list& txns);
     TxnCtxt* check(qpid::broker::TransactionContext* ctxt);
     void store(const qpid::broker::PersistableQueue* queue,
                TxnCtxt* txn,
@@ -218,12 +216,12 @@
     std::string getJrnlDir(const char* queueName);
     std::string getJrnlBaseDir();
     std::string getBdbBaseDir();
-    std::string getPxidBaseDir();
+    std::string getTplBaseDir();
     inline void checkInit() {
         // TODO: change the default dir to ~/.qpidd
         if (!isInit) init("/tmp", defNumJrnlFiles, defJrnlFileSizePgs, defWCachePageSize); isInit = true;
     }
-    void chkInitPreparedXidStore();
+    void chkTplStoreInit();
 
     // debug aid for printing XIDs that may contain non-printable chars
     static std::string xid2str(const std::string xid) {

Modified: store/branches/mrg-1.0/cpp/tests/TwoPhaseCommitTest.cpp
===================================================================
--- store/branches/mrg-1.0/cpp/tests/TwoPhaseCommitTest.cpp	2008-07-17 19:49:11 UTC (rev 2205)
+++ store/branches/mrg-1.0/cpp/tests/TwoPhaseCommitTest.cpp	2008-07-18 13:27:59 UTC (rev 2206)
@@ -189,7 +189,7 @@
             return static_cast<JournalImpl*>(queue.getExternalQueueStore())->get_open_txn_cnt();
         }
         u_int32_t getRemainingPreparedListTxns() {
-            return preparedXidStorePtr->get_open_txn_cnt();
+            return tplStorePtr->get_open_txn_cnt();
         }
     };
 




More information about the rhmessaging-commits mailing list