[rhmessaging-commits] rhmessaging commits: r2193 - in store/branches/mrg-1.0/cpp: lib/jrnl and 3 other directories.

rhmessaging-commits at lists.jboss.org rhmessaging-commits at lists.jboss.org
Mon Jul 14 09:50:10 EDT 2008


Author: kpvdr
Date: 2008-07-14 09:50:09 -0400 (Mon, 14 Jul 2008)
New Revision: 2193

Modified:
   store/branches/mrg-1.0/cpp/lib/BdbMessageStore.cpp
   store/branches/mrg-1.0/cpp/lib/BdbMessageStore.h
   store/branches/mrg-1.0/cpp/lib/JournalImpl.cpp
   store/branches/mrg-1.0/cpp/lib/JournalImpl.h
   store/branches/mrg-1.0/cpp/lib/TxnCtxt.h
   store/branches/mrg-1.0/cpp/lib/jrnl/jcntl.cpp
   store/branches/mrg-1.0/cpp/lib/jrnl/jcntl.hpp
   store/branches/mrg-1.0/cpp/lib/jrnl/wmgr.cpp
   store/branches/mrg-1.0/cpp/tests/TwoPhaseCommitTest.cpp
   store/branches/mrg-1.0/cpp/tests/jrnl/_st_basic.cpp
   store/branches/mrg-1.0/cpp/tests/jrnl/_st_helper_fns.hpp
   store/branches/mrg-1.0/cpp/tests/jrnl/_st_read.cpp
   store/branches/mrg-1.0/cpp/tests/jrnl/jtt/jrnl_instance.cpp
Log:
Backport of trunk r.2173: Moved prepared XID list from BDB to a journal instance in BdbMessageStore.

Modified: store/branches/mrg-1.0/cpp/lib/BdbMessageStore.cpp
===================================================================
--- store/branches/mrg-1.0/cpp/lib/BdbMessageStore.cpp	2008-07-14 13:05:07 UTC (rev 2192)
+++ store/branches/mrg-1.0/cpp/lib/BdbMessageStore.cpp	2008-07-14 13:50:09 UTC (rev 2193)
@@ -22,20 +22,11 @@
 */
 
 #include "BdbMessageStore.h"
-#include <qpid/broker/RecoveryManager.h>
-#include <qpid/broker/Message.h>
-#include <qpid/framing/Buffer.h>
-#include <qpid/log/Statement.h>
-#include <qpid/sys/Mutex.h>
-#include <algorithm>
-#include <iomanip>
-#include <sstream>
+
 #include "BindingDbt.h"
+#include "BufferValue.h"
 #include "IdPairDbt.h"
-#include "StringDbt.h"
-#include "JournalImpl.h"
-#include "DataTokenImpl.h"
-#include "qpid/management/ManagementAgent.h"
+#include "qpid/log/Statement.h"
 #include "qpid/management/PackageMrgstore.h"
 
 using namespace rhm::bdbstore;
@@ -63,9 +54,6 @@
                                                         mappingDb(&env, 0), 
                                                         bindingDb(&env, 0), 
                                                         generalDb(&env, 0),
-                                                        enqueueXidDb(&env, 0), 
-                                                        dequeueXidDb(&env, 0), 
-                                                        prepareXidDb(&env, 0),
                                                         numJrnlFiles(defNumJrnlFiles),
                                                         jrnlFsizePgs(defJrnlFileSizePgs),
                                                         wcache_pgsize_sblks(JRNL_WMGR_DEF_PAGE_SIZE),
@@ -125,12 +113,11 @@
     
     if (dir.size()>0) storeDir = dir;
 
-    string bdbdir = storeDir + "/rhm/dat/";
-    journal::jdir::create_dir(bdbdir);
+    journal::jdir::create_dir(getBdbBaseDir());
+    journal::jdir::create_dir(getPxidBaseDir());
 
-
     try {
-        env.open(bdbdir.c_str(), DB_THREAD | DB_CREATE | DB_INIT_TXN | DB_INIT_LOCK | DB_INIT_LOG | DB_INIT_MPOOL | DB_USE_ENVIRON, 0);
+        env.open(getBdbBaseDir().c_str(), DB_THREAD | DB_CREATE | DB_INIT_TXN | DB_INIT_LOCK | DB_INIT_LOG | DB_INIT_MPOOL | DB_USE_ENVIRON, 0);
     } catch (const DbException& e) {
         if (e.get_errno() == DB_VERSION_MISMATCH)
             THROW_STORE_EXCEPTION_2("Database environment mismatch: This version of bd4 does not match that which created the store database. "
@@ -150,10 +137,11 @@
         open(mappingDb, txn.get(), "mappings.db", true);
         open(bindingDb, txn.get(), "bindings.db", true);
         open(generalDb, txn.get(), "general.db",  false);
-        open(enqueueXidDb, txn.get(), "enqueue_xid.db", true);
-        open(dequeueXidDb, txn.get(), "dequeue_xid.db", true);
-        open(prepareXidDb, txn.get(), "prepare_xid.db", false);
+        preparedXidStorePtr.reset(new JournalImpl("PreparedXidStore", getPxidBaseDir(), "prepared_xid", defJournalGetEventsTimeout, defJournalFlushTimeout));
         txn.commit();
+    } catch (const journal::jexception& e) {
+        txn.abort();
+        THROW_STORE_EXCEPTION_2("Error opening preparedXidStore instance", e.what());
     } catch (const DbException& e) {
         txn.abort();
         THROW_STORE_EXCEPTION_2("Error opening databases", e);
@@ -167,6 +155,15 @@
     return true;
 }
 
+void BdbMessageStore::chkInitPreparedXidStore()
+{
+    if (!preparedXidStorePtr->is_init())
+    {
+        u_int32_t defTotWCacheSize = JRNL_WMGR_DEF_PAGE_SIZE * JRNL_WMGR_DEF_PAGES; // in sblks. Currently 2048 sblks (1 MiB).
+        preparedXidStorePtr->initialize(defXidStoreNumJrnlFiles, defXidStoreJrnlFileSizePgs * JRNL_RMGR_PAGE_SIZE, defTotWCacheSize / wcache_pgsize_sblks, defXidStoreWCachePageSize);
+    }
+}
+
 bool BdbMessageStore::init(const qpid::Options* options) 
 {
     const Options* opts = static_cast<const Options*>(options);
@@ -253,11 +250,16 @@
         for (std::list<Db*>::iterator i = dbs.begin(); i != dbs.end(); i++) {
             (*i)->close(0);
         }
+        if (preparedXidStorePtr->is_ready()) preparedXidStorePtr->stop(true);
     } catch (const DbException& e) {
-        QPID_LOG(error, "Error closing databases: " <<  e.what());
+        QPID_LOG(error, "Error closing BDB databases: " <<  e.what());
+    } catch (const journal::jexception& e) {
+        QPID_LOG(error, "Error: " << e.what());
     } catch (const std::exception& e) {
-        QPID_LOG(error, e.what());
-    } catch (...) {}
+        QPID_LOG(error, "Error: " << e.what());
+    } catch (...) {
+        QPID_LOG(error, "Unknown error in BdbMessageStore::~BdbMessageStore()");
+    }
 
     if (mgmtObject.get() != 0)
         mgmtObject->resourceDestroy();
@@ -276,6 +278,7 @@
     txn->commit(0); 
     try{    
         journal::jdir::delete_dir(getJrnlBaseDir(),true);
+        journal::jdir::delete_dir(getPxidBaseDir(),true);
     }    
     catch (const journal::jexception& e) {
         THROW_STORE_EXCEPTION(std::string("truncate() failed: ") + e.what() );
@@ -508,7 +511,7 @@
         try
         {
             u_int64_t thisHighestRid = 0;
-            jQueue->recover(numJrnlFiles, jrnlFsizePgs * JRNL_RMGR_PAGE_SIZE, wcache_num_pages, wcache_pgsize_sblks, prepared, thisHighestRid, key.id); // start recovery
+            jQueue->recover(numJrnlFiles, jrnlFsizePgs * JRNL_RMGR_PAGE_SIZE, wcache_num_pages, wcache_pgsize_sblks, &prepared, thisHighestRid, key.id); // start recovery
             if (thisHighestRid > highestRid)
                 highestRid = thisHighestRid;
             recoverMessages(txn, registry, queue, prepared, messages); 
@@ -601,6 +604,10 @@
     generalIdSequence.reset(maxGeneralId + 1);
 }
 
+
+#define MAX_AIO_SLEEPS 1000 // ~1 second
+#define AIO_SLEEP_TIME 1000 // 1 millisecond
+
 void BdbMessageStore::recoverMessages(TxnCtxt& /*txn*/, qpid::broker::RecoveryManager& recovery, 
                                       qpid::broker::RecoverableQueue::shared_ptr& queue, txn_list& locked, message_index& prepared)
 {
@@ -678,7 +685,7 @@
               }
               case rhm::journal::RHM_IORES_PAGE_AIOWAIT:
                   if (++aio_sleep_cnt > MAX_AIO_SLEEPS)
-                      THROW_STORE_EXCEPTION("Timeout waiting for AIO");
+                      THROW_STORE_EXCEPTION("Timeout waiting for AIO in BdbMessageStore::recoverMessages()");
                   ::usleep(AIO_SLEEP_TIME);
                   break;
               case rhm::journal::RHM_IORES_EMPTY:
@@ -748,11 +755,11 @@
 
 void BdbMessageStore::recoverXids(txn_list& txns)
 {
-    std::set<string> prepared;
-    collectPreparedXids(prepared);
+    std::set<string> preparedXidSet;
+    collectPreparedXids(preparedXidSet);
 
-    //when using the async journal, it will abort unprepaired xids and populate the locked maps
-    for (std::set<string>::iterator i = prepared.begin(); i != prepared.end(); i++) {
+    // Abort unprepared xids and populate the locked maps
+    for (std::set<string>::iterator i = preparedXidSet.begin(); i != preparedXidSet.end(); i++) {
         LockedMappings::shared_ptr enq_ptr;
         enq_ptr.reset(new LockedMappings);
         LockedMappings::shared_ptr deq_ptr;
@@ -789,7 +796,20 @@
 
 void BdbMessageStore::collectPreparedXids(std::set<string>& xids)
 {
-    readXids(prepareXidDb, xids);    
+    if (journal::jdir::exists(storeDir + "/rhm/pxid/prepared_xid.jinf"))
+    {
+        u_int64_t highest_rid;
+        preparedXidStorePtr->recover(defXidStoreNumJrnlFiles, defXidStoreJrnlFileSizePgs * JRNL_RMGR_PAGE_SIZE,
+                JRNL_WMGR_DEF_PAGE_SIZE * JRNL_WMGR_DEF_PAGES / wcache_pgsize_sblks, defXidStoreWCachePageSize,
+                0, highest_rid, 0);
+    
+        std::vector<std::string> xv;
+        preparedXidStorePtr->get_open_txn_list(xv);
+        for (std::vector<std::string>::const_iterator itr = xv.begin(); itr < xv.end(); itr++)
+            xids.insert(*itr);
+
+        preparedXidStorePtr->recover_complete(); // start journal.
+    }
 }
 
 void BdbMessageStore::stage( intrusive_ptr<PersistableMessage>& msg)
@@ -961,26 +981,18 @@
         txn = check(ctxt);
     } else {
         txn = &implicit;
-        txn->begin(env);
     }
 
-    try {
-        bool newId = false;
-        if (messageId == 0) {
-            messageId = messageIdSequence.next();
-            msg->setPersistenceId(messageId);
-            newId = true;
-        }
-        store(&queue, txn, key, msg, newId);
+    bool newId = false;
+    if (messageId == 0) {
+        messageId = messageIdSequence.next();
+        msg->setPersistenceId(messageId);
+        newId = true;
+    }
+    store(&queue, txn, key, msg, newId);
 
-        // add queue* to the txn map..
-        if (ctxt) txn->addXidRecord(queue.getExternalQueueStore());
-        
-        if (!ctxt) txn->commit();
-    } catch (const std::exception& e) {
-        if (!ctxt) txn->abort();
-        throw;
-    }
+    // add queue* to the txn map..
+    if (ctxt) txn->addXidRecord(queue.getExternalQueueStore());
 }
 
 void BdbMessageStore::store(const PersistableQueue* queue, 
@@ -1054,25 +1066,13 @@
         txn = check(ctxt);
     } else {
         txn = &implicit;
-        txn->begin(env);
     }
     
-    try {
-        
-        // add queue* to the txn map..
-        if (ctxt) txn->addXidRecord(queue.getExternalQueueStore());
-        async_dequeue(ctxt, msg, queue); 
+    // add queue* to the txn map..
+    if (ctxt) txn->addXidRecord(queue.getExternalQueueStore());
+    async_dequeue(ctxt, msg, queue); 
 			
-        msg->dequeueComplete();
-        // 		    if ( msg->isDequeueComplete()  ) // clear id after last dequeue
-        // 		         msg->setPersistenceId(0);
-			
-        if (!ctxt) txn->commit();
-        
-    } catch (const std::exception& e) {
-        if (!ctxt) txn->abort();
-        throw;
-    }   
+    msg->dequeueComplete();
 }
 
 void BdbMessageStore::async_dequeue(
@@ -1143,17 +1143,15 @@
 
 void BdbMessageStore::completed(TPCTxnCtxt& txn, bool commit)
 {
-    if (!txn.get()) txn.begin(env);
-
     try {
+        // Nothing to do if not prepared
+        chkInitPreparedXidStore();
+        if (txn.getDtok().is_enqueued())
+            preparedXidStorePtr->dequeue_txn_data_record(&txn.getDtok(), txn.getXid());
 
-        StringDbt key(txn.getXid());
-        prepareXidDb.del(txn.get(), &key, 0);
-
         txn.complete(commit);
     } catch (const std::exception& e) {
         QPID_LOG(error, "Error completing xid " << txn.getXid() << ": " << e.what());
-        txn.abort();
         throw;
     }
 }
@@ -1161,9 +1159,8 @@
 auto_ptr<TransactionContext> BdbMessageStore::begin() 
 {
     checkInit();
-    // pass sequence number for c/a when using jrnl
+    // pass sequence number for c/a
     TxnCtxt* txn(new TxnCtxt(&messageIdSequence ));
-    txn->begin(env);
     return auto_ptr<TransactionContext>(txn);
 }
 
@@ -1173,9 +1170,8 @@
     IdSequence* jtx = NULL;
     jtx = &messageIdSequence;
 
-    // pass sequence number for c/a when using jrnl
+    // pass sequence number for c/a
     TPCTxnCtxt* txn(new TPCTxnCtxt(xid, jtx));
-    txn->begin(env);
     return auto_ptr<TPCTransactionContext>(txn);
 }
 
@@ -1186,18 +1182,14 @@
     if(!txn) throw InvalidTransactionContextException();
     
     try {
-        u_int8_t dummy(1);
-        string xid(txn->getXid());
-        Dbt key ((void*) xid.data(), xid.length());
-        Dbt value(&dummy, sizeof(dummy));
+        chkInitPreparedXidStore();
+        preparedXidStorePtr->enqueue_txn_data_record(0, 0, 0, &txn->getDtok(), txn->getXid(), false);
+        txn->addXidRecord(preparedXidStorePtr.get());
 
         // make sure all the data is written to disk before returning
         txn->sync();
-        prepareXidDb.put(txn->get(), &key, &value, 0);
-
-        txn->commit();
     } catch (const std::exception& e) {
-        txn->abort();
+        QPID_LOG(error, "Error preparing xid " << txn->getXid() << ": " << e.what());
         throw;
     }
 }
@@ -1209,7 +1201,7 @@
     if (txn->isTPC()) {
         completed(*dynamic_cast<TPCTxnCtxt*>(txn), true);        
     } else {
-        txn->commit();
+        txn->complete(true);
     }
 }
 
@@ -1220,7 +1212,7 @@
     if (txn->isTPC()) {
         completed(*dynamic_cast<TPCTxnCtxt*>(txn), false);
     } else {
-        txn->abort();
+        txn->complete(false);
     }
 }
 
@@ -1326,10 +1318,24 @@
 string BdbMessageStore::getJrnlBaseDir() 
 {
     std::stringstream dir;
-    dir << storeDir<< "/rhm/jrnl/" ;
+    dir << storeDir << "/rhm/jrnl/" ;
     return dir.str();
 }
 
+string BdbMessageStore::getBdbBaseDir() 
+{
+    std::stringstream dir;
+    dir << storeDir << "/rhm/dat/" ;
+    return dir.str();
+}
+
+string BdbMessageStore::getPxidBaseDir() 
+{
+    std::stringstream dir;
+    dir << storeDir << "/rhm/pxid/" ;
+    return dir.str();
+}
+
 string BdbMessageStore::getJrnlDir(const qpid::broker::PersistableQueue& queue) //for exmaple /var/rhm/ + queueDir/
 {
     return getJrnlDir(queue.getName().c_str());
@@ -1370,5 +1376,3 @@
          "Lower values decrease latency at the expense of throughput.")
         ;
 }
-
-

Modified: store/branches/mrg-1.0/cpp/lib/BdbMessageStore.h
===================================================================
--- store/branches/mrg-1.0/cpp/lib/BdbMessageStore.h	2008-07-14 13:05:07 UTC (rev 2192)
+++ store/branches/mrg-1.0/cpp/lib/BdbMessageStore.h	2008-07-14 13:50:09 UTC (rev 2193)
@@ -24,26 +24,30 @@
 #ifndef _BdbMessageStore_
 #define _BdbMessageStore_
 
+#include <string>
 #include "db-inc.h"
-#include "BufferValue.h"
+//#include "BufferValue.h"
 #include "Cursor.h"
 #include "IdDbt.h"
-#include "IdSequence.h"
+//#include "IdSequence.h"
 #include "PreparedTransaction.h"
-#include "StoreException.h"
+//#include "StoreException.h"
 #include "TxnCtxt.h"
-#include <qpid/broker/Broker.h>
-#include <qpid/broker/MessageStore.h>
-#include <qpid/management/Manageable.h>
-#include <qpid/sys/Monitor.h>
-#include <qpid/sys/Time.h>
-#include <map>
-#include <set>
-#include <iostream>
-#include <boost/format.hpp>
-#include <boost/intrusive_ptr.hpp>
-#include <boost/ptr_container/ptr_list.hpp>
+#include "qpid/broker/Broker.h"
+#include "qpid/broker/MessageStore.h"
+#include "qpid/management/Manageable.h"
+//#include <qpid/sys/Monitor.h>
+//#include <qpid/sys/Time.h>
+//#include <map>
+//#include <set>
+//#include <iostream>
+//#include <boost/format.hpp>
+//#include <boost/intrusive_ptr.hpp>
+//#include <boost/ptr_container/ptr_list.hpp>
 #include "qpid/management/Store.h"
+#include "jrnl/jcfg.hpp"
+#include "JournalImpl.h"
+#include "IdSequence.h"
 
 // Assume DB_VERSION_MAJOR == 4
 #if (DB_VERSION_MINOR == 2)
@@ -68,9 +72,12 @@
             typedef boost::ptr_list<PreparedTransaction> txn_list;
 
             // Default store settings
-            static const u_int16_t defNumJrnlFiles = 8;      // TODO: make configurable
-            static const u_int32_t defJrnlFileSizePgs = 24;  // TODO: make configurable
-            static const u_int32_t defWCachePageSize = JRNL_WMGR_DEF_PAGE_SIZE * JRNL_DBLK_SIZE * JRNL_SBLK_SIZE / 1024; // TODO: make configurable
+            static const u_int16_t defNumJrnlFiles = 8;
+            static const u_int32_t defJrnlFileSizePgs = 24;
+            static const u_int32_t defWCachePageSize = JRNL_WMGR_DEF_PAGE_SIZE * JRNL_DBLK_SIZE * JRNL_SBLK_SIZE / 1024;
+            static const u_int16_t defXidStoreNumJrnlFiles = 8;
+            static const u_int32_t defXidStoreJrnlFileSizePgs = 24;
+            static const u_int32_t defXidStoreWCachePageSize = JRNL_WMGR_DEF_PAGE_SIZE * JRNL_DBLK_SIZE * JRNL_SBLK_SIZE / 1024;
 
             std::list<Db*> dbs;
             DbEnv env;
@@ -81,9 +88,7 @@
             Db mappingDb;
             Db bindingDb;
             Db generalDb;
-            Db enqueueXidDb;
-            Db dequeueXidDb;
-            Db prepareXidDb;
+            boost::shared_ptr<JournalImpl> preparedXidStorePtr;
             IdSequence queueIdSequence;
             IdSequence exchangeIdSequence;
             IdSequence generalIdSequence;
@@ -150,9 +155,12 @@
 	  	  	string getJrnlDir(const qpid::broker::PersistableQueue& queue); //for exmaple /var/rhm/ + queueDir/
 	  	  	string getJrnlDir(const char* queueName);
 	  	  	string getJrnlBaseDir(); 
+            string getBdbBaseDir(); 
+            string getPxidBaseDir(); 
 			inline void checkInit() {
                 if (!isInit) init("/var", defNumJrnlFiles, defJrnlFileSizePgs, defWCachePageSize); isInit = true;
             }
+            void chkInitPreparedXidStore();
 
         public:
             struct Options : public qpid::Options {

Modified: store/branches/mrg-1.0/cpp/lib/JournalImpl.cpp
===================================================================
--- store/branches/mrg-1.0/cpp/lib/JournalImpl.cpp	2008-07-14 13:05:07 UTC (rev 2192)
+++ store/branches/mrg-1.0/cpp/lib/JournalImpl.cpp	2008-07-14 13:50:09 UTC (rev 2193)
@@ -152,7 +152,7 @@
                      const u_int32_t wcache_pgsize_sblks,
                      const journal::rd_aio_cb rd_cb,
                      const journal::wr_aio_cb wr_cb,
-                     boost::ptr_list<bdbstore::PreparedTransaction>& prep_tx_list,
+                     boost::ptr_list<bdbstore::PreparedTransaction>* prep_tx_list_ptr,
                      u_int64_t& highest_rid,
                      u_int64_t queue_id)
 {
@@ -162,34 +162,42 @@
     oss1 << " wcache_pgsize_sblks=" << wcache_pgsize_sblks;
     oss1 << " wcache_num_pages=" << wcache_num_pages;
     log(LOG_DEBUG, oss1.str());
-    // Create list of prepared xids
-    std::vector<std::string> prep_xid_list;
-    for (bdbstore::PreparedTransaction::list::iterator i = prep_tx_list.begin();
-            i != prep_tx_list.end(); i++) {
-        prep_xid_list.push_back(i->xid);
+
+    if (prep_tx_list_ptr) {
+        // Create list of prepared xids
+        std::vector<std::string> prep_xid_list;
+        for (bdbstore::PreparedTransaction::list::iterator i = prep_tx_list_ptr->begin();
+                i != prep_tx_list_ptr->end(); i++) {
+            prep_xid_list.push_back(i->xid);
+        }
+
+        jcntl::recover(num_jfiles, jfsize_sblks, wcache_num_pages, wcache_pgsize_sblks, rd_cb, wr_cb,
+                &prep_xid_list, highest_rid);
+    } else {
+        jcntl::recover(num_jfiles, jfsize_sblks, wcache_num_pages, wcache_pgsize_sblks, rd_cb, wr_cb,
+                0, highest_rid);
     }
-
-    jcntl::recover(num_jfiles, jfsize_sblks, wcache_num_pages, wcache_pgsize_sblks, rd_cb, wr_cb,
-            prep_xid_list, highest_rid);
         
     // Populate PreparedTransaction lists from _tmap
-    for (bdbstore::PreparedTransaction::list::iterator i = prep_tx_list.begin();
-            i != prep_tx_list.end(); i++) {
-        try {
-            txn_data_list tdl = _tmap.get_tdata_list(i->xid);
-            assert(tdl.size()); // should never be empty
-            for (tdl_itr tdl_itr = tdl.begin(); tdl_itr < tdl.end(); tdl_itr++) {
-                if (tdl_itr->_enq_flag) { // enqueue op
-                    i->enqueues->add(queue_id, tdl_itr->_rid);
-                } else { // dequeue op
-                    i->dequeues->add(queue_id, tdl_itr->_drid);
+    if (prep_tx_list_ptr)
+    {
+        for (bdbstore::PreparedTransaction::list::iterator i = prep_tx_list_ptr->begin(); i != prep_tx_list_ptr->end(); i++) {
+            try {
+                txn_data_list tdl = _tmap.get_tdata_list(i->xid);
+                assert(tdl.size()); // should never be empty
+                for (tdl_itr tdl_itr = tdl.begin(); tdl_itr < tdl.end(); tdl_itr++) {
+                    if (tdl_itr->_enq_flag) { // enqueue op
+                        i->enqueues->add(queue_id, tdl_itr->_rid);
+                    } else { // dequeue op
+                        i->dequeues->add(queue_id, tdl_itr->_drid);
+                    }
                 }
             }
+            catch (const jexception& e) {
+                if (e.err_code() != jerrno::JERR_MAP_NOTFOUND)
+                    throw;
+            }
         }
-        catch (const jexception& e) {
-            if (e.err_code() != jerrno::JERR_MAP_NOTFOUND)
-                throw;
-        }
     }
     std::ostringstream oss2;
     oss2 << "Recover phase I complete; highest rid found = 0x" << std::hex << highest_rid;

Modified: store/branches/mrg-1.0/cpp/lib/JournalImpl.h
===================================================================
--- store/branches/mrg-1.0/cpp/lib/JournalImpl.h	2008-07-14 13:05:07 UTC (rev 2192)
+++ store/branches/mrg-1.0/cpp/lib/JournalImpl.h	2008-07-14 13:50:09 UTC (rev 2193)
@@ -119,7 +119,7 @@
                          const u_int32_t wcache_pgsize_sblks,
                          const journal::rd_aio_cb rd_cb,
                          const journal::wr_aio_cb wr_cb,
-                         boost::ptr_list<bdbstore::PreparedTransaction>& prep_tx_list,
+                         boost::ptr_list<bdbstore::PreparedTransaction>* prep_tx_list_ptr,
                          u_int64_t& highest_rid,
                          u_int64_t queue_id);
 
@@ -127,11 +127,11 @@
                                 const u_int32_t jfsize_sblks,
                                 const u_int16_t wcache_num_pages,
                                 const u_int32_t wcache_pgsize_sblks,
-                                boost::ptr_list<bdbstore::PreparedTransaction>& prep_tx_list,
+                                boost::ptr_list<bdbstore::PreparedTransaction>* prep_tx_list_ptr,
                                 u_int64_t& highest_rid,
                                 u_int64_t queue_id) {
                 recover(num_jfiles, jfsize_sblks, wcache_num_pages, wcache_pgsize_sblks, 0,
-                        &aio_wr_callback, prep_tx_list, highest_rid, queue_id);
+                        &aio_wr_callback, prep_tx_list_ptr, highest_rid, queue_id);
             }
 
             void recover_complete();

Modified: store/branches/mrg-1.0/cpp/lib/TxnCtxt.h
===================================================================
--- store/branches/mrg-1.0/cpp/lib/TxnCtxt.h	2008-07-14 13:05:07 UTC (rev 2192)
+++ store/branches/mrg-1.0/cpp/lib/TxnCtxt.h	2008-07-14 13:50:09 UTC (rev 2193)
@@ -2,7 +2,7 @@
     Copyright (C) 2007 Red Hat Software
 
     This file is part of Red Hat Messaging.
-    
+
     Red Hat Messaging is free software; you can redistribute it and/or
     modify it under the terms of the GNU Lesser General Public
     License as published by the Free Software Foundation; either
@@ -24,38 +24,40 @@
 #ifndef _TxnCtxt_
 #define _TxnCtxt_
 
-#include "db-inc.h"
-#include <qpid/broker/MessageStore.h>
-#include <qpid/sys/Mutex.h>
-#include <boost/shared_ptr.hpp>
-#include <sstream>
-#include <memory>
-#include <vector>
-#include "JournalImpl.h"
-#include "DataTokenImpl.h"
 #include <boost/format.hpp>
 #include <boost/intrusive_ptr.hpp>
-#include <jrnl/jexception.hpp>
+#include <db-inc.h>
+#include <memory>
+#include <set>
+#include <sstream>
+#include <string>
+#include <unistd.h> // ::usleep()
 
+#include "DataTokenImpl.h"
+#include "IdSequence.h"
+#include "JournalImpl.h"
+#include "jrnl/jexception.hpp"
+#include "qpid/broker/PersistableQueue.h"
+#include "qpid/broker/TransactionalStore.h"
+#include "qpid/sys/Mutex.h"
+#include "StoreException.h"
+
 namespace rhm{
 namespace bdbstore{
 
-// find a better place to put these
-#define MAX_AIO_SLEEPS 1000
-#define AIO_SLEEP_TIME 1000
-
-
 class TxnCtxt : public qpid::broker::TransactionContext
 {
 protected:
+
+    static qpid::sys::Mutex globalSerialiser;
+
     typedef std::set<qpid::broker::ExternalQueueStore*> ipqdef;
     typedef std::auto_ptr<qpid::sys::Mutex::ScopedLock> AutoScopedLock;
 
-    static qpid::sys::Mutex globalSerialiser;
-
     ipqdef impactedQueues; // list of Queues used in the txn
     mutable qpid::sys::Mutex Lock;
     IdSequence* loggedtx;
+    DataTokenImpl dtok;
     AutoScopedLock globalHolder;
 
     /**
@@ -63,67 +65,70 @@
      */
     std::string tid;
     DbTxn* txn;
-    
-    void completeTXN(bool commit){
+
+    void completeTXN(bool commit) {
         sync();
-        for (TxnCtxt::ipqdef::iterator i = impactedQueues.begin(); i != impactedQueues.end(); i++) { 
+        for (TxnCtxt::ipqdef::iterator i = impactedQueues.begin(); i != impactedQueues.end(); i++) {
             JournalImpl* jc = static_cast<JournalImpl*>(*i);
             if (jc && loggedtx) { /* if using journal */
                 boost::intrusive_ptr<DataTokenImpl> dtokp(new DataTokenImpl);
                 dtokp->addRef();
                 dtokp->set_external_rid(true);
                 dtokp->set_rid(loggedtx->next());
-                try{
+                try {
                     if (commit) {
                         jc->txn_commit(dtokp.get(), getXid());
                         jc->flush(true);
                     } else {
                         jc->txn_abort(dtokp.get(), getXid());
                     }
-                } catch (const journal::jexception& e) { 
+                } catch (const journal::jexception& e) {
                     //std::cout << "Error commit" << e << std::endl;
                     THROW_STORE_EXCEPTION(std::string("Error commit") + e.what());
                 }
-		
             }
-        }	
+        }
         deleteXidRecord();
     }
-    
+
 public:
-	
-    TxnCtxt(IdSequence* _loggedtx=NULL) : loggedtx(_loggedtx), txn(0)  {
-        if (loggedtx){ 
-	  std::stringstream s;
-	  s << "rhm-tid" << this;
-	  tid.assign(s.str());
-	}
+
+    TxnCtxt(IdSequence* _loggedtx=NULL) : loggedtx(_loggedtx), txn(0) {
+        if (loggedtx) {
+            std::stringstream s;
+            s << "rhm-tid" << this;
+            tid.assign(s.str());
+        }
     }
-	
+
     /**
      * Call to make sure all the data for this txn is written to safe store
      *
      *@return if the data sucessfully synced.
-     */	
-    void sync(){
+     */
+
+    virtual ~TxnCtxt() { if(txn) abort(); }
+
+#define MAX_SYNC_SLEEPS 1000 // ~1 second
+#define SYNC_SLEEP_TIME 1000 // 1 millisecond
+
+    void sync() {
         bool allWritten = false;
         bool firstloop = true;
-        while (loggedtx && !allWritten){
-            if (!firstloop) ::usleep(AIO_SLEEP_TIME); // move this into the get events call aiolib..
+        long sleep_cnt = 0L;
+        while (loggedtx && !allWritten) {
+            if (sleep_cnt > MAX_SYNC_SLEEPS) THROW_STORE_EXCEPTION(std::string("Error: timeout waiting for TxnCtxt::sync()"));
+            if (!firstloop) { ::usleep(SYNC_SLEEP_TIME); sleep_cnt++; } // move this into the get events call aiolib..
             allWritten = true;
-            for (TxnCtxt::ipqdef::iterator i = impactedQueues.begin(); i != impactedQueues.end(); i++) { 
+            for (TxnCtxt::ipqdef::iterator i = impactedQueues.begin(); i != impactedQueues.end(); i++) {
                 JournalImpl* jc = static_cast<JournalImpl*>(*i);
-		
-                try
-                    {
-                        if (jc && !(jc->is_txn_synced(getXid())))
-                            {
-                                if (firstloop)
-                                    jc->flush();
-                                allWritten = false;
-                                jc->get_wr_events();
-                            }
-                    } catch (const journal::jexception& e) { 
+                try {
+                    if (jc && !(jc->is_txn_synced(getXid()))) {
+                        if (firstloop) jc->flush();
+                        allWritten = false;
+                        jc->get_wr_events();
+                    }
+                } catch (const journal::jexception& e) {
                     //std::cout << " TxnCtxt: Error sync 3" << e << std::flush;
                     THROW_STORE_EXCEPTION(std::string("Error sync") + e.what());
                 }
@@ -131,34 +136,36 @@
             firstloop = false;
         }
     }
-    
-    virtual ~TxnCtxt() { if(txn) abort(); }
-    void begin(DbEnv& env, bool sync = false){ 
-        env.txn_begin(0, &txn, 0);  
-        if (sync) globalHolder = AutoScopedLock(new qpid::sys::Mutex::ScopedLock(globalSerialiser)); 
+
+    void begin(DbEnv& env, bool sync = false) {
+        env.txn_begin(0, &txn, 0);
+        if (sync) globalHolder = AutoScopedLock(new qpid::sys::Mutex::ScopedLock(globalSerialiser));
     }
-    void commit(){ 
-        txn->commit(0); 
-        txn = 0; 
-        completeTXN(true); 
-        globalHolder.reset(); 
+
+    void commit() {
+        if (txn) {
+            txn->commit(0);
+            txn = 0;
+            globalHolder.reset();
+        }
     }
-    void abort(){ 
+
+    void abort(){
         if (txn) {
-	    txn->abort();
-	    txn = 0; 
-	    completeTXN(false); 
-	    globalHolder.reset(); 
-	}
+            txn->abort();
+            txn = 0;
+            globalHolder.reset();
+        }
     }
-    DbTxn* get(){ return txn; }
+
+    DbTxn* get() { return txn; }
     virtual bool isTPC() { return false; }
     virtual const std::string& getXid() { return tid; }
     
-    void deleteXidRecord(){ impactedQueues.clear(); }
-    void addXidRecord(qpid::broker::ExternalQueueStore* queue) {
-        impactedQueues.insert(queue); }
-    
+    void deleteXidRecord() { impactedQueues.clear(); }
+    void addXidRecord(qpid::broker::ExternalQueueStore* queue) { impactedQueues.insert(queue); }
+    void complete(bool commit) { completeTXN(commit); }
+    DataTokenImpl& getDtok() { return dtok; }
 };
 
 class TPCTxnCtxt : public TxnCtxt, public qpid::broker::TPCTransactionContext
@@ -168,12 +175,6 @@
     TPCTxnCtxt(const std::string& _xid, IdSequence* _loggedtx) : TxnCtxt(_loggedtx), xid(_xid) {}
     virtual bool isTPC() { return true; }
     virtual const std::string& getXid() { return xid; }
-	// commit the BDB abort, abort commit the jnrl
-    void commit(){ txn->commit(0); txn = 0; globalHolder.reset(); }
-    void abort(){ txn->abort(); txn = 0; globalHolder.reset(); }
-    void complete(bool commit){ 
-	txn->commit(0); completeTXN(commit); txn = 0; 
-    } 
 };
 
 }}

Modified: store/branches/mrg-1.0/cpp/lib/jrnl/jcntl.cpp
===================================================================
--- store/branches/mrg-1.0/cpp/lib/jrnl/jcntl.cpp	2008-07-14 13:05:07 UTC (rev 2192)
+++ store/branches/mrg-1.0/cpp/lib/jrnl/jcntl.cpp	2008-07-14 13:50:09 UTC (rev 2193)
@@ -155,7 +155,7 @@
 jcntl::recover(const u_int16_t num_jfiles, const u_int32_t jfsize_sblks,
         const u_int16_t wcache_num_pages, const u_int32_t wcache_pgsize_sblks,
         const rd_aio_cb rd_cb, const wr_aio_cb wr_cb,
-        const std::vector<std::string>& prep_txn_list, u_int64_t& highest_rid)
+        const std::vector<std::string>* prep_txn_list_ptr, u_int64_t& highest_rid)
 {
     _init_flag = false;
     _stop_flag = false;
@@ -187,7 +187,7 @@
     _jdir.verify_dir();
     _rcvdat.reset(_num_jfiles);
 
-    rcvr_janalyze(_rcvdat, prep_txn_list);
+    rcvr_janalyze(_rcvdat, prep_txn_list_ptr);
     highest_rid = _rcvdat._h_rid;
     if (_rcvdat._full)
         throw jexception(jerrno::JERR_JCNTL_RECOVERJFULL, "jcntl", "recover");
@@ -574,7 +574,7 @@
 }
 
 void
-jcntl::rcvr_janalyze(rcvdat& rd, const std::vector<std::string>& prep_txn_list)
+jcntl::rcvr_janalyze(rcvdat& rd, const std::vector<std::string>* prep_txn_list_ptr)
 {
     jinf ji(_jdir.dirname() + "/" + _base_filename + "." + JRNL_INFO_EXTENSION, true);
 
@@ -633,16 +633,16 @@
         if (rd._ffid == next_wr_fid && rd._enq_cnt_list[next_wr_fid])
             rd._full = true;
         
-        // Remove all transactions not in prep_txn_list      
-        std::vector<std::string> xid_list;
-        _tmap.xid_list(xid_list);
-        for (std::vector<std::string>::iterator itr = xid_list.begin(); itr != xid_list.end();
-                itr++)
+        if (!rd._empty && prep_txn_list_ptr)
         {
-            std::vector<std::string>::const_iterator pitr = std::find(prep_txn_list.begin(),
-                    prep_txn_list.end(), *itr);
-            if (pitr == prep_txn_list.end())
-                _tmap.get_remove_tdata_list(*itr);
+            std::vector<std::string> xid_list;
+            _tmap.xid_list(xid_list);
+            for (std::vector<std::string>::iterator itr = xid_list.begin(); itr != xid_list.end(); itr++)
+            {
+                std::vector<std::string>::const_iterator pitr = std::find(prep_txn_list_ptr->begin(), prep_txn_list_ptr->end(), *itr);
+                if (pitr == prep_txn_list_ptr->end())
+                    _tmap.get_remove_tdata_list(*itr);
+            }
         }
     }
 }

Modified: store/branches/mrg-1.0/cpp/lib/jrnl/jcntl.hpp
===================================================================
--- store/branches/mrg-1.0/cpp/lib/jrnl/jcntl.hpp	2008-07-14 13:05:07 UTC (rev 2192)
+++ store/branches/mrg-1.0/cpp/lib/jrnl/jcntl.hpp	2008-07-14 13:50:09 UTC (rev 2193)
@@ -210,7 +210,7 @@
         * \param wcache_pgsize_sblks The size in sblks of each write cache page.
         * \param rd_cb Function pointer to callback function for read operations. May be 0 (NULL).
         * \param wr_cb Function pointer to callback function for write operations. May be 0 (NULL).
-        * \param prep_txn_list
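+        * \param prep_txn_list_ptr Pointer to the list of prepared transaction XIDs; when non-null, recovered transactions whose XIDs are not in this list are removed. May be 0 (NULL), in which case no transactions are removed.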
         * \param highest_rid Returns the highest rid found in the journal during recover
         *
         * \exception TODO
@@ -218,7 +218,7 @@
         void recover(const u_int16_t num_jfiles, const u_int32_t jfsize_sblks,
                 const u_int16_t wcache_num_pages, const u_int32_t wcache_pgsize_sblks,
                 const rd_aio_cb rd_cb, const wr_aio_cb wr_cb,
-                const std::vector<std::string>& prep_txn_list, u_int64_t& highest_rid);
+                const std::vector<std::string>* prep_txn_list_ptr, u_int64_t& highest_rid);
 
         /**
         * \brief Notification to the journal that recovery is complete and that normal operation
@@ -575,6 +575,7 @@
         *     <b><i>false</i></b> otherwise.
         */
         inline bool is_ready() const { return _init_flag and not _stop_flag; }
+        inline bool is_init() const { return _init_flag; }
 
         inline bool is_read_only() const { return _readonly_flag; }
 
@@ -599,6 +600,9 @@
         inline u_int16_t num_jfiles() const { return _num_jfiles; }
 
         inline u_int32_t jfsize_sblks() const { return _jfsize_sblks; }
+        
+        inline u_int32_t get_open_txn_cnt() const { return _tmap.size(); }
+        void get_open_txn_list(std::vector<std::string>& xv) { _tmap.xid_list(xv); }
 
         // Logging
         virtual void log(log_level level, const std::string& log_stmt) const;
@@ -644,7 +648,7 @@
         /**
         * \brief Analyze journal for recovery.
         */
-        void rcvr_janalyze(rcvdat& rd, const std::vector<std::string>& prep_txn_list);
+        void rcvr_janalyze(rcvdat& rd, const std::vector<std::string>* prep_txn_list_ptr);
 
         bool rcvr_get_next_record(u_int16_t& fid, std::ifstream* ifsp, bool& lowi, rcvdat& rd);
 

Modified: store/branches/mrg-1.0/cpp/lib/jrnl/wmgr.cpp
===================================================================
--- store/branches/mrg-1.0/cpp/lib/jrnl/wmgr.cpp	2008-07-14 13:05:07 UTC (rev 2192)
+++ store/branches/mrg-1.0/cpp/lib/jrnl/wmgr.cpp	2008-07-14 13:50:09 UTC (rev 2193)
@@ -967,8 +967,8 @@
         else
         {
             std::ostringstream oss;
-            oss << "op=" << _op_str[op] << " index=" << _pg_index << " state=";
-            oss << _page_cb_arr[_pg_index].state_str();
+            oss << "jrnl=" << _jc->id()  << " op=" << _op_str[op];
+            oss << " index=" << _pg_index << " pg_state=" << _page_cb_arr[_pg_index].state_str();
             throw jexception(jerrno::JERR_WMGR_BADPGSTATE, oss.str(), "wmgr", "pre_write_check");
         }
     }
@@ -988,8 +988,8 @@
                 if (!dtokp->is_writable())
                 {
                     std::ostringstream oss;
-                    oss << "op=" << _op_str[op] << " dtok_id=" << dtokp->id();
-                    oss << " dtok_state=" << dtokp->wstate_str();
+                    oss << "jrnl=" << _jc->id() << " op=" << _op_str[op];
+                    oss << " dtok_id=" << dtokp->id() << " dtok_state=" << dtokp->wstate_str();
                     throw jexception(jerrno::JERR_WMGR_BADDTOKSTATE, oss.str(), "wmgr",
                         "pre_write_check");
                 }
@@ -999,8 +999,8 @@
             if (!dtokp->is_dequeueable())
             {
                 std::ostringstream oss;
-                oss << "op=" << _op_str[op] << " dtok_id=" << dtokp->id();
-                oss << " dtok_state=" << dtokp->wstate_str();
+                oss << "jrnl=" << _jc->id()  << " op=" << _op_str[op];
+                oss << " dtok_id=" << dtokp->id() << " dtok_state=" << dtokp->wstate_str();
                 throw jexception(jerrno::JERR_WMGR_BADDTOKSTATE, oss.str(), "wmgr",
                         "pre_write_check");
             }

Modified: store/branches/mrg-1.0/cpp/tests/TwoPhaseCommitTest.cpp
===================================================================
--- store/branches/mrg-1.0/cpp/tests/TwoPhaseCommitTest.cpp	2008-07-14 13:05:07 UTC (rev 2192)
+++ store/branches/mrg-1.0/cpp/tests/TwoPhaseCommitTest.cpp	2008-07-14 13:50:09 UTC (rev 2193)
@@ -160,6 +160,9 @@
         swap.check(commit);        
         restart();
         swap.check(commit);
+        
+        // this test leaves xids in the store
+        store->truncate();
     }
 
     void commit(Strategy& strategy)
@@ -294,52 +297,46 @@
 public:
     TwoPhaseCommitTest() : nameA("queueA"), nameB("queueB"), links(0) {}
 
-    void testCommitSwap()
+    void testCommitEnqueue()
     {
-        Swap swap(this, "SwapMessageId");
-        commit(swap);
+        Enqueue enqueue(this);
+        commit(enqueue);
     }
 
-    void testPrepareAndAbortSwap()
+    void testCommitDequeue()
     {
-        Swap swap(this, "SwapMessageId");
-        abort(swap, true);
+        Dequeue dequeue(this);
+        commit(dequeue);
     }
 
-    void testAbortNoPrepareSwap()
+    void testCommitSwap()
     {
         Swap swap(this, "SwapMessageId");
-        abort(swap, false);
+        commit(swap);
     }
 
-    void testCommitEnqueue()
-    {
-        Enqueue enqueue(this);
-        commit(enqueue);
-    }
-
     void testPrepareAndAbortEnqueue()
     {
         Enqueue enqueue(this);
         abort(enqueue, true);
     }
 
-    void testAbortNoPrepareEnqueue()
+    void testPrepareAndAbortDequeue()
     {
-        Enqueue enqueue(this);
-        abort(enqueue, false);
+        Dequeue dequeue(this);
+        abort(dequeue, true);
     }
 
-    void testCommitDequeue()
+    void testPrepareAndAbortSwap()
     {
-        Dequeue dequeue(this);
-        commit(dequeue);
+        Swap swap(this, "SwapMessageId");
+        abort(swap, true);
     }
 
-    void testPrepareAndAbortDequeue()
+    void testAbortNoPrepareEnqueue()
     {
-        Dequeue dequeue(this);
-        abort(dequeue, true);
+        Enqueue enqueue(this);
+        abort(enqueue, false);
     }
 
     void testAbortNoPrepareDequeue()
@@ -348,6 +345,12 @@
         abort(dequeue, false);
     }
 
+    void testAbortNoPrepareSwap()
+    {
+        Swap swap(this, "SwapMessageId");
+        abort(swap, false);
+    }
+
     void testRecoverPreparedThenCommitted()
     {
         recoverPrepared(true);
@@ -363,73 +366,73 @@
 
 // === Test suite ===
 
-QPID_AUTO_TEST_CASE(PrepareAndAbortSwap)
+QPID_AUTO_TEST_CASE(CommitEnqueue)
 {
-    cout << test_filename << ".PrepareAndAbortSwap: " << flush;
-    tpct.testPrepareAndAbortSwap();
+    cout << test_filename << ".CommitEnqueue: " << flush;
+    tpct.testCommitEnqueue();
     cout << "ok" << endl;
 }
 
-QPID_AUTO_TEST_CASE(CommitEnqueue)
+QPID_AUTO_TEST_CASE(CommitDequeue)
 {
-    cout << test_filename << ".CommitEnqueue: " << flush;
-    tpct.testCommitEnqueue();
+    cout << test_filename << ".CommitDequeue: " << flush;
+    tpct.testCommitDequeue();
     cout << "ok" << endl;
 }
 
-QPID_AUTO_TEST_CASE(AbortNoPrepareEnqueue)
+QPID_AUTO_TEST_CASE(CommitSwap)
 {
-    cout << test_filename << ".AbortNoPrepareEnqueue: " << flush;
-    tpct.testAbortNoPrepareEnqueue();
+    cout << test_filename << ".CommitSwap: " << flush;
+    tpct.testCommitSwap();
     cout << "ok" << endl;
 }
 
-QPID_AUTO_TEST_CASE(PrepareAndAbortDequeue)
+QPID_AUTO_TEST_CASE(PrepareAndAbortEnqueue)
 {
-    cout << test_filename << ".PrepareAndAbortDequeue: " << flush;
-    tpct.testPrepareAndAbortDequeue();
+    cout << test_filename << ".PrepareAndAbortEnqueue: " << flush;
+    tpct.testPrepareAndAbortEnqueue();
     cout << "ok" << endl;
 }
 
-QPID_AUTO_TEST_CASE(RecoverPreparedThenCommitted)
+QPID_AUTO_TEST_CASE(PrepareAndAbortDequeue)
 {
-    cout << test_filename << ".RecoverPreparedThenCommitted: " << flush;
-    tpct.testRecoverPreparedThenCommitted();
+    cout << test_filename << ".PrepareAndAbortDequeue: " << flush;
+    tpct.testPrepareAndAbortDequeue();
     cout << "ok" << endl;
 }
 
-QPID_AUTO_TEST_CASE(CommitSwap)
+QPID_AUTO_TEST_CASE(PrepareAndAbortSwap)
 {
-    cout << test_filename << ".CommitSwap: " << flush;
-    tpct.testCommitSwap();
+    cout << test_filename << ".PrepareAndAbortSwap: " << flush;
+    tpct.testPrepareAndAbortSwap();
     cout << "ok" << endl;
 }
 
-QPID_AUTO_TEST_CASE(AbortNoPrepareSwap)
+QPID_AUTO_TEST_CASE(AbortNoPrepareEnqueue)
 {
-    cout << test_filename << ".AbortNoPrepareSwap: " << flush;
-    tpct.testAbortNoPrepareSwap();
+    cout << test_filename << ".AbortNoPrepareEnqueue: " << flush;
+    tpct.testAbortNoPrepareEnqueue();
     cout << "ok" << endl;
 }
 
-QPID_AUTO_TEST_CASE(PrepareAndAbortEnqueue)
+QPID_AUTO_TEST_CASE(AbortNoPrepareDequeue)
 {
-    cout << test_filename << ".PrepareAndAbortEnqueue: " << flush;
-    tpct.testPrepareAndAbortEnqueue();
+    cout << test_filename << ".AbortNoPrepareDequeue: " << flush;
+    tpct.testAbortNoPrepareDequeue();
     cout << "ok" << endl;
 }
 
-QPID_AUTO_TEST_CASE(CommitDequeue)
+QPID_AUTO_TEST_CASE(AbortNoPrepareSwap)
 {
-    cout << test_filename << ".CommitDequeue: " << flush;
-    tpct.testCommitDequeue();
+    cout << test_filename << ".AbortNoPrepareSwap: " << flush;
+    tpct.testAbortNoPrepareSwap();
     cout << "ok" << endl;
 }
 
-QPID_AUTO_TEST_CASE(AbortNoPrepareDequeue)
+QPID_AUTO_TEST_CASE(RecoverPreparedThenCommitted)
 {
-    cout << test_filename << ".AbortNoPrepareDequeue: " << flush;
-    tpct.testAbortNoPrepareDequeue();
+    cout << test_filename << ".RecoverPreparedThenCommitted: " << flush;
+    tpct.testRecoverPreparedThenCommitted();
     cout << "ok" << endl;
 }
 

Modified: store/branches/mrg-1.0/cpp/tests/jrnl/_st_basic.cpp
===================================================================
--- store/branches/mrg-1.0/cpp/tests/jrnl/_st_basic.cpp	2008-07-14 13:05:07 UTC (rev 2192)
+++ store/branches/mrg-1.0/cpp/tests/jrnl/_st_basic.cpp	2008-07-14 13:50:09 UTC (rev 2193)
@@ -144,25 +144,23 @@
             BOOST_CHECK_EQUAL(jc.is_read_only(), false);
         }
         {
-            vector<string> txn_list;
             u_int64_t hrid;
 
             test_jrnl jc(test_name, test_dir, test_name);
             BOOST_CHECK_EQUAL(jc.is_ready(), false);
             BOOST_CHECK_EQUAL(jc.is_read_only(), false);
-            jc.recover(NUM_TEST_JFILES, TEST_JFSIZE_SBLKS, txn_list, hrid);
+            jc.recover(NUM_TEST_JFILES, TEST_JFSIZE_SBLKS, 0, hrid);
             BOOST_CHECK_EQUAL(jc.is_ready(), true);
             BOOST_CHECK_EQUAL(jc.is_read_only(), true);
             BOOST_CHECK_EQUAL(hrid, u_int64_t(0));
         }
         {
-            vector<string> txn_list;
             u_int64_t hrid;
 
             test_jrnl jc(test_name, test_dir, test_name);
             BOOST_CHECK_EQUAL(jc.is_ready(), false);
             BOOST_CHECK_EQUAL(jc.is_read_only(), false);
-            jc.recover(NUM_TEST_JFILES, TEST_JFSIZE_SBLKS, txn_list, hrid);
+            jc.recover(NUM_TEST_JFILES, TEST_JFSIZE_SBLKS, 0, hrid);
             BOOST_CHECK_EQUAL(jc.is_ready(), true);
             BOOST_CHECK_EQUAL(jc.is_read_only(), true);
             BOOST_CHECK_EQUAL(hrid, u_int64_t(0));
@@ -189,11 +187,10 @@
                 enq_msg(jc, m, create_msg(msg, m, MSG_SIZE), false);
         }
         {
-            vector<string> txn_list;
             u_int64_t hrid;
 
             test_jrnl jc(test_name, test_dir, test_name);
-            jc.recover(NUM_TEST_JFILES, TEST_JFSIZE_SBLKS, txn_list, hrid);
+            jc.recover(NUM_TEST_JFILES, TEST_JFSIZE_SBLKS, 0, hrid);
             BOOST_CHECK_EQUAL(hrid, u_int64_t(NUM_MSGS - 1));
             jc.recover_complete();
             for (int m=0; m<NUM_MSGS; m++)
@@ -210,7 +207,6 @@
     try
     {
         string msg;
-        vector<string> txn_list;
         u_int64_t hrid;
 
         for (int m=0; m<2*NUM_MSGS; m+=2)
@@ -221,7 +217,7 @@
                     jc.initialize(NUM_TEST_JFILES, TEST_JFSIZE_SBLKS); // First time only
                 else
                 {
-                    jc.recover(NUM_TEST_JFILES, TEST_JFSIZE_SBLKS, txn_list, hrid);
+                    jc.recover(NUM_TEST_JFILES, TEST_JFSIZE_SBLKS, 0, hrid);
                     BOOST_CHECK_EQUAL(hrid, u_int64_t(m - 1));
                     jc.recover_complete();
                 }
@@ -229,7 +225,7 @@
             }
             {
                 test_jrnl jc(test_name, test_dir, test_name);
-                jc.recover(NUM_TEST_JFILES, TEST_JFSIZE_SBLKS, txn_list, hrid);
+                jc.recover(NUM_TEST_JFILES, TEST_JFSIZE_SBLKS, 0, hrid);
                 BOOST_CHECK_EQUAL(hrid, u_int64_t(m));
                 jc.recover_complete();
                 deq_msg(jc, m);
@@ -265,11 +261,10 @@
         }
         {
             string msg;
-            vector<string> txn_list;
             u_int64_t hrid;
 
             test_jrnl jc(test_name, test_dir, test_name);
-            jc.recover(NUM_TEST_JFILES, TEST_JFSIZE_SBLKS, txn_list, hrid);
+            jc.recover(NUM_TEST_JFILES, TEST_JFSIZE_SBLKS, 0, hrid);
             // Recover non-transient msgs
             for (int m=NUM_MSGS; m<NUM_MSGS*2; m++)
             {

Modified: store/branches/mrg-1.0/cpp/tests/jrnl/_st_helper_fns.hpp
===================================================================
--- store/branches/mrg-1.0/cpp/tests/jrnl/_st_helper_fns.hpp	2008-07-14 13:05:07 UTC (rev 2192)
+++ store/branches/mrg-1.0/cpp/tests/jrnl/_st_helper_fns.hpp	2008-07-14 13:50:09 UTC (rev 2193)
@@ -70,7 +70,7 @@
     void initialize(const u_int16_t num_jfiles, const u_int32_t jfsize_sblks)
         { jcntl::initialize(num_jfiles, jfsize_sblks, JRNL_WMGR_DEF_PAGES, JRNL_WMGR_DEF_PAGE_SIZE,
             0, &aio_wr_callback); }
-    void recover(const u_int16_t num_jfiles, const u_int32_t jfsize_sblks, vector<string>& txn_list,
+    void recover(const u_int16_t num_jfiles, const u_int32_t jfsize_sblks, vector<string>* txn_list,
             u_int64_t& highest_rid)
     { jcntl::recover(num_jfiles, jfsize_sblks, JRNL_WMGR_DEF_PAGES, JRNL_WMGR_DEF_PAGE_SIZE, 0,
             &aio_wr_callback, txn_list, highest_rid); }

Modified: store/branches/mrg-1.0/cpp/tests/jrnl/_st_read.cpp
===================================================================
--- store/branches/mrg-1.0/cpp/tests/jrnl/_st_read.cpp	2008-07-14 13:05:07 UTC (rev 2192)
+++ store/branches/mrg-1.0/cpp/tests/jrnl/_st_read.cpp	2008-07-14 13:50:09 UTC (rev 2193)
@@ -144,7 +144,6 @@
         }
         {
             string msg;
-            vector<string> txn_list;
             u_int64_t hrid;
             string rmsg;
             string xid;
@@ -152,7 +151,7 @@
             bool externalFlag;
 
             test_jrnl jc(test_name, test_dir, test_name);
-            jc.recover(NUM_TEST_JFILES, TEST_JFSIZE_SBLKS, txn_list, hrid);
+            jc.recover(NUM_TEST_JFILES, TEST_JFSIZE_SBLKS, 0, hrid);
             BOOST_CHECK_EQUAL(hrid, u_int64_t(NUM_MSGS - 1));
             jc.recover_complete();
             for (int m=0; m<NUM_MSGS; m++)
@@ -187,7 +186,6 @@
         }
         {
             string msg;
-            vector<string> txn_list;
             u_int64_t hrid;
             string rmsg;
             string xid;
@@ -195,7 +193,7 @@
             bool externalFlag;
 
             test_jrnl jc(test_name, test_dir, test_name);
-            jc.recover(NUM_TEST_JFILES, TEST_JFSIZE_SBLKS, txn_list, hrid);
+            jc.recover(NUM_TEST_JFILES, TEST_JFSIZE_SBLKS, 0, hrid);
             BOOST_CHECK_EQUAL(hrid, u_int64_t(NUM_MSGS - 1));
             for (int m=0; m<NUM_MSGS; m++)
             {
@@ -209,7 +207,6 @@
         }
         {
             string msg;
-            vector<string> txn_list;
             u_int64_t hrid;
             string rmsg;
             string xid;
@@ -217,7 +214,7 @@
             bool externalFlag;
 
             test_jrnl jc(test_name, test_dir, test_name);
-            jc.recover(NUM_TEST_JFILES, TEST_JFSIZE_SBLKS, txn_list, hrid);
+            jc.recover(NUM_TEST_JFILES, TEST_JFSIZE_SBLKS, 0, hrid);
             BOOST_CHECK_EQUAL(hrid, u_int64_t(NUM_MSGS - 1));
             for (int m=0; m<NUM_MSGS; m++)
             {

Modified: store/branches/mrg-1.0/cpp/tests/jrnl/jtt/jrnl_instance.cpp
===================================================================
--- store/branches/mrg-1.0/cpp/tests/jrnl/jtt/jrnl_instance.cpp	2008-07-14 13:05:07 UTC (rev 2192)
+++ store/branches/mrg-1.0/cpp/tests/jrnl/jtt/jrnl_instance.cpp	2008-07-14 13:50:09 UTC (rev 2193)
@@ -110,11 +110,10 @@
         {
             try
             {
-            std::vector<std::string> prep_txn_list;
             u_int64_t highest_rid;
             recover(_jpp->num_jfiles(), _jpp->jfsize_sblks(), _jpp->wcache_num_pages(),
                     _jpp->wcache_pgsize_sblks(), aio_rd_callback, aio_wr_callback,
-                    prep_txn_list, highest_rid);
+                    0, highest_rid);
             recover_complete();
             }
             catch (const rhm::journal::jexception& e)




More information about the rhmessaging-commits mailing list