[rhmessaging-commits] rhmessaging commits: r2198 - in store/branches/mrg-1.0/cpp: lib/jrnl and 1 other directories.

rhmessaging-commits at lists.jboss.org rhmessaging-commits at lists.jboss.org
Mon Jul 14 13:05:32 EDT 2008


Author: kpvdr
Date: 2008-07-14 13:05:31 -0400 (Mon, 14 Jul 2008)
New Revision: 2198

Modified:
   store/branches/mrg-1.0/cpp/lib/BdbMessageStore.cpp
   store/branches/mrg-1.0/cpp/lib/BdbMessageStore.h
   store/branches/mrg-1.0/cpp/lib/JournalImpl.cpp
   store/branches/mrg-1.0/cpp/lib/JournalImpl.h
   store/branches/mrg-1.0/cpp/lib/StorePlugin.cpp
   store/branches/mrg-1.0/cpp/lib/TxnCtxt.h
   store/branches/mrg-1.0/cpp/lib/jrnl/deq_hdr.hpp
   store/branches/mrg-1.0/cpp/lib/jrnl/deq_rec.cpp
   store/branches/mrg-1.0/cpp/lib/jrnl/deq_rec.hpp
   store/branches/mrg-1.0/cpp/lib/jrnl/jcntl.cpp
   store/branches/mrg-1.0/cpp/lib/jrnl/jcntl.hpp
   store/branches/mrg-1.0/cpp/lib/jrnl/txn_map.cpp
   store/branches/mrg-1.0/cpp/lib/jrnl/txn_map.hpp
   store/branches/mrg-1.0/cpp/lib/jrnl/wmgr.cpp
   store/branches/mrg-1.0/cpp/lib/jrnl/wmgr.hpp
   store/branches/mrg-1.0/cpp/tests/TransactionalTest.cpp
   store/branches/mrg-1.0/cpp/tests/TwoPhaseCommitTest.cpp
Log:
Backport of trunk r.2187-2188: Fixed 2PC multi-queue transaction atomicity problem. Any multi-queue 2PC commit/abort which is interrupted by failure will now be completed on recover for all queues which did not get processed. Local txns still have this problem, however. Some code tidy-up is also included. Removed old error messages from StorePlugin which are no longer possible owing to the removal of sync journal. 2188: orrected signed/unsigned comparison (which does not show up on F9)

Modified: store/branches/mrg-1.0/cpp/lib/BdbMessageStore.cpp
===================================================================
--- store/branches/mrg-1.0/cpp/lib/BdbMessageStore.cpp	2008-07-14 14:53:45 UTC (rev 2197)
+++ store/branches/mrg-1.0/cpp/lib/BdbMessageStore.cpp	2008-07-14 17:05:31 UTC (rev 2198)
@@ -2,7 +2,7 @@
   Copyright (C) 2007 Red Hat Software
 
   This file is part of Red Hat Messaging.
-    
+
   Red Hat Messaging is free software; you can redistribute it and/or
   modify it under the terms of the GNU Lesser General Public
   License as published by the Free Software Foundation; either
@@ -26,6 +26,7 @@
 #include "BindingDbt.h"
 #include "BufferValue.h"
 #include "IdPairDbt.h"
+#include "jrnl/txn_map.hpp"
 #include "qpid/log/Statement.h"
 #include "qpid/management/PackageMrgstore.h"
 
@@ -49,28 +50,35 @@
 qpid::sys::Duration BdbMessageStore::defJournalFlushTimeout(500 * qpid::sys::TIME_MSEC); // 0.5s
 qpid::sys::Mutex TxnCtxt::globalSerialiser;
 
-BdbMessageStore::BdbMessageStore(const char* envpath) : env(0), 
-                                                        queueDb(&env, 0), 
-                                                        configDb(&env, 0), 
-                                                        exchangeDb(&env, 0), 
-                                                        messageDb(&env, 0), 
-                                                        mappingDb(&env, 0), 
-                                                        bindingDb(&env, 0), 
-                                                        generalDb(&env, 0),
-                                                        numJrnlFiles(defNumJrnlFiles),
-                                                        jrnlFsizePgs(defJrnlFileSizePgs),
-                                                        wcache_pgsize_sblks(JRNL_WMGR_DEF_PAGE_SIZE),
-                                                        wcache_num_pages(JRNL_WMGR_DEF_PAGES),
-                                                        highestRid(0),
-                                                        isInit(false),
-                                                        envPath(envpath)
+BdbMessageStore::PreparedRecoverStruct::PreparedRecoverStruct(const u_int64_t _rid,
+                                                              const bool _deq_flag,
+                                                              const bool _commit_flag) :
+                                                              rid(_rid),
+                                                              deq_flag(_deq_flag),
+                                                              commit_flag(_commit_flag)
+{}
 
+BdbMessageStore::BdbMessageStore(const char* envpath) :
+                                 env(0),
+                                 queueDb(&env, 0),
+                                 configDb(&env, 0),
+                                 exchangeDb(&env, 0),
+                                 messageDb(&env, 0),
+                                 mappingDb(&env, 0),
+                                 bindingDb(&env, 0),
+                                 generalDb(&env, 0),
+                                 numJrnlFiles(defNumJrnlFiles),
+                                 jrnlFsizePgs(defJrnlFileSizePgs),
+                                 wcache_pgsize_sblks(JRNL_WMGR_DEF_PAGE_SIZE),
+                                 wcache_num_pages(JRNL_WMGR_DEF_PAGES),
+                                 highestRid(0),
+                                 isInit(false),
+                                 envPath(envpath)
 {}
- 
+
 void BdbMessageStore::initManagement (Broker* broker)
 {
-    if (broker != 0)
-    {
+    if (broker != 0) {
         ManagementAgent::shared_ptr agent = ManagementAgent::getAgent ();
 
         if (agent.get () != 0)
@@ -87,13 +95,16 @@
     }
 }
 
-bool BdbMessageStore::init(const std::string& dir, u_int16_t jfiles, u_int32_t jfileSizePgs, uint32_t wCachePageSize) 
-{ 
+bool BdbMessageStore::init(const std::string& dir,
+                           u_int16_t jfiles,
+                           u_int32_t jfileSizePgs,
+                           uint32_t wCachePageSize)
+{
     if (isInit) return true;
 
     numJrnlFiles = jfiles;
     jrnlFsizePgs = jfileSizePgs;
-    
+
     // set wcache_pgsize_sblks and wcache_num_pages from wCachePageSize
     wcache_pgsize_sblks = wCachePageSize * 1024 / JRNL_DBLK_SIZE / JRNL_SBLK_SIZE; // convert from KiB to number sblks
     u_int32_t defTotWCacheSize = JRNL_WMGR_DEF_PAGE_SIZE * JRNL_WMGR_DEF_PAGES; // in sblks. Currently 2014 sblks (1 MiB).
@@ -114,7 +125,7 @@
             // 1 MiB total cache
             wcache_num_pages = defTotWCacheSize / wcache_pgsize_sblks;
     }
-    
+
     if (dir.size()>0) storeDir = dir;
 
     journal::jdir::create_dir(getBdbBaseDir());
@@ -132,12 +143,11 @@
 
     TxnCtxt txn;
     try {
-
         txn.begin(env, false);
         open(queueDb, txn.get(), "queues.db", false);
         open(configDb, txn.get(), "config.db", false);
         open(exchangeDb, txn.get(), "exchanges.db", false);
-        open(messageDb, txn.get(), "messages.db", false);       
+        open(messageDb, txn.get(), "messages.db", false);
         open(mappingDb, txn.get(), "mappings.db", true);
         open(bindingDb, txn.get(), "bindings.db", true);
         open(generalDb, txn.get(), "general.db",  false);
@@ -153,7 +163,7 @@
         txn.abort();
         throw;
     }
-    
+
     isInit = true;
     QPID_LOG(info, "BdbMessageStore module initialized: dir=" << dir << "; jfiles=" << jfiles << "; jfileSizePgs=" << jfileSizePgs << "; wCachePageSize=" << wCachePageSize);
     return true;
@@ -161,25 +171,21 @@
 
 void BdbMessageStore::chkInitPreparedXidStore()
 {
-    if (!preparedXidStorePtr->is_init())
-    {
+    if (!preparedXidStorePtr->is_ready()) {
         u_int32_t defTotWCacheSize = JRNL_WMGR_DEF_PAGE_SIZE * JRNL_WMGR_DEF_PAGES; // in sblks. Currently 2014 sblks (1 MiB).
         preparedXidStorePtr->initialize(defXidStoreNumJrnlFiles, defXidStoreJrnlFileSizePgs * JRNL_RMGR_PAGE_SIZE, defTotWCacheSize / wcache_pgsize_sblks, defXidStoreWCachePageSize);
     }
 }
 
-bool BdbMessageStore::init(const qpid::Options* options) 
+bool BdbMessageStore::init(const qpid::Options* options)
 {
     const Options* opts = static_cast<const Options*>(options);
-    
+
     u_int16_t numJrnlFiles = opts->numJrnlFiles;
-    if (numJrnlFiles < JRNL_MIN_NUM_FILES)
-    {
+    if (numJrnlFiles < JRNL_MIN_NUM_FILES) {
         numJrnlFiles = JRNL_MIN_NUM_FILES;
         QPID_LOG(warning, "parameter num-jfiles (" << opts->numJrnlFiles << ") below allowable minimum (" << numJrnlFiles << "); changing this parameter to minimum value.");
-    }
-    else if (numJrnlFiles > JRNL_MAX_NUM_FILES)
-    {
+    } else if (numJrnlFiles > JRNL_MAX_NUM_FILES) {
         numJrnlFiles = JRNL_MAX_NUM_FILES;
         QPID_LOG(warning, "parameter num-jfiles (" << opts->numJrnlFiles << ") above allowable maximum (" << numJrnlFiles << "); changing this parameter to maximum value.");
     }
@@ -187,17 +193,14 @@
     u_int32_t jrnlFsizePgs = opts->jrnlFsizePgs;
     u_int32_t jrnlMinFsizePgs = JRNL_MIN_FILE_SIZE / JRNL_RMGR_PAGE_SIZE;
     u_int32_t jrnlMaxFsizePgs = JRNL_MAX_FILE_SIZE / JRNL_RMGR_PAGE_SIZE;
-    if (jrnlFsizePgs < jrnlMinFsizePgs)
-    {
+    if (jrnlFsizePgs < jrnlMinFsizePgs) {
         jrnlFsizePgs = jrnlMinFsizePgs;
         QPID_LOG(warning, "parameter jfile-size-pgs (" << opts->jrnlFsizePgs << ") below allowable minimum (" << jrnlFsizePgs << "); changing this parameter to minimum value.");
-    }
-    else if (jrnlFsizePgs > jrnlMaxFsizePgs)
-    {
+    } else if (jrnlFsizePgs > jrnlMaxFsizePgs) {
         jrnlFsizePgs = jrnlMaxFsizePgs;
         QPID_LOG(warning, "parameter jfile-size-pgs (" << opts->jrnlFsizePgs << ") above allowable maximum (" << jrnlFsizePgs << "); changing this parameter to maximum value.");
     }
-    
+
     u_int32_t jrnlWrCachePageSize = opts->wCachePageSize;
     switch (jrnlWrCachePageSize)
     {
@@ -212,14 +215,11 @@
             break;
         default:
             u_int32_t oldJrnlWrCachePageSize = jrnlWrCachePageSize;
-            if (oldJrnlWrCachePageSize == 0)
-            {
+            if (oldJrnlWrCachePageSize == 0) {
                 // For zero value, use default
                 jrnlWrCachePageSize = JRNL_WMGR_DEF_PAGE_SIZE * JRNL_DBLK_SIZE * JRNL_SBLK_SIZE / 1024;
                 QPID_LOG(warning, "parameter wcache-page-size (" << oldJrnlWrCachePageSize << ") must be a power of 2 between 1 and 128; changing this parameter to default value (" << jrnlWrCachePageSize << ")");
-            }
-            else
-            {
+            } else {
                 // For any positive value, use closest value
                 if (oldJrnlWrCachePageSize < 6)
                     jrnlWrCachePageSize = 4;
@@ -240,17 +240,19 @@
     return init(opts->storeDir, numJrnlFiles, jrnlFsizePgs, jrnlWrCachePageSize);
 }
 
-void BdbMessageStore::open(Db& db, DbTxn* txn, const char* file, bool dupKey)
+void BdbMessageStore::open(Db& db,
+                           DbTxn* txn,
+                           const char* file,
+                           bool dupKey)
 {
     if(dupKey) db.set_flags(DB_DUPSORT);
     db.open(txn, file, 0, DB_BTREE, DB_CREATE | DB_THREAD, 0);
     dbs.push_back(&db);
 }
 
-BdbMessageStore::~BdbMessageStore() 
+BdbMessageStore::~BdbMessageStore()
 {
     try {
-
         for (std::list<Db*>::iterator i = dbs.begin(); i != dbs.end(); i++) {
             (*i)->close(0);
         }
@@ -279,17 +281,18 @@
         (*i)->truncate(txn, &count, 0);
     }
 
-    txn->commit(0); 
-    try{    
+    txn->commit(0);
+    try {
         journal::jdir::delete_dir(getJrnlBaseDir(),true);
         journal::jdir::delete_dir(getPxidBaseDir(),true);
-    }    
+    }
     catch (const journal::jexception& e) {
         THROW_STORE_EXCEPTION(std::string("truncate() failed: ") + e.what() );
-    } 
+    }
 }
 
-void BdbMessageStore::create(PersistableQueue& queue, const FieldTable& args)
+void BdbMessageStore::create(PersistableQueue& queue,
+                             const FieldTable& args)
 {
     checkInit();
     if (queue.getPersistenceId()) {
@@ -315,15 +318,15 @@
                                  string("JournalData"), defJournalGetEventsTimeout,
                                  defJournalFlushTimeout);
     }
+
     queue.setExternalQueueStore(dynamic_cast<ExternalQueueStore*>(jQueue));
-    try	{
+    try {
         // init will create the deque's for the init...
         jQueue->initialize(localFileCount, localFileSize * JRNL_RMGR_PAGE_SIZE, wcache_num_pages, wcache_pgsize_sblks);
     } catch (const journal::jexception& e) {
         THROW_STORE_EXCEPTION(std::string("Queue ") + queue.getName() +
                               ": create() failed: " + e.what());
     }
-
     try {
         if (!create(queueDb, queueIdSequence, queue)) {
             THROW_STORE_EXCEPTION("Queue already exists: " + queue.getName());
@@ -339,21 +342,20 @@
     destroy(queueDb, queue);
     deleteBindingsForQueue(queue);
     qpid::broker::ExternalQueueStore* eqs = queue.getExternalQueueStore();
-    if (eqs)
-    {
+    if (eqs) {
         JournalImpl* jQueue = static_cast<JournalImpl*>(eqs);
-    	jQueue->delete_jrnl_files();
+        jQueue->delete_jrnl_files();
         queue.setExternalQueueStore(0); // will delete the journal if exists
     }
 }
 
-void BdbMessageStore::create(const PersistableExchange& exchange, const FieldTable& /*args*/)
+void BdbMessageStore::create(const PersistableExchange& exchange,
+                             const FieldTable& /*args*/)
 {
     checkInit();
     if (exchange.getPersistenceId()) {
         THROW_STORE_EXCEPTION("Exchange already created: " + exchange.getName());
     }
-
     try {
         if (!create(exchangeDb, exchangeIdSequence, exchange)) {
             THROW_STORE_EXCEPTION("Exchange already exists: " + exchange.getName());
@@ -378,7 +380,6 @@
     if (general.getPersistenceId()) {
         THROW_STORE_EXCEPTION("General configuration item already created");
     }
-
     try {
         if (!create(generalDb, generalIdSequence, general)) {
             THROW_STORE_EXCEPTION("General configuration already exists");
@@ -394,7 +395,9 @@
     destroy(generalDb, general);
 }
 
-bool BdbMessageStore::create(Db& db, IdSequence& seq, const Persistable& p)
+bool BdbMessageStore::create(Db& db,
+                             IdSequence& seq,
+                             const Persistable& p)
 {
     u_int64_t id (seq.next());
     Dbt key(&id, sizeof(id));
@@ -416,11 +419,13 @@
 }
 
 
-void BdbMessageStore::bind(const PersistableExchange& e, const PersistableQueue& q, 
-                           const std::string& k, const FieldTable& a)
+void BdbMessageStore::bind(const PersistableExchange& e,
+                           const PersistableQueue& q,
+                           const std::string& k,
+                           const FieldTable& a)
 {
     checkInit();
-    IdDbt key(e.getPersistenceId());    
+    IdDbt key(e.getPersistenceId());
     BindingDbt value(e, q, k, a);
     TxnCtxt txn;
     txn.begin(env, true);
@@ -428,8 +433,10 @@
     txn.commit();
 }
 
-void BdbMessageStore::unbind(const PersistableExchange& e, const PersistableQueue& q, 
-                             const std::string& k, const FieldTable&)
+void BdbMessageStore::unbind(const PersistableExchange& e,
+                             const PersistableQueue& q,
+                             const std::string& k,
+                             const FieldTable&)
 {
     checkInit();
     deleteBinding(e, q, k);
@@ -450,7 +457,7 @@
     try {
         //read all queues, calls recoversMessages
         recoverQueues(txn, registry, queues, prepared, messages);
-        
+
         //recover exchange & bindings:
         recoverExchanges(txn, registry, exchanges);
         recoverBindings(txn, exchanges, queues);
@@ -466,35 +473,45 @@
     }
 
     //recover transactions:
-    for (txn_list::iterator i = prepared.begin(); i != prepared.end(); i++) {        
-        
+    for (txn_list::iterator i = prepared.begin(); i != prepared.end(); i++) {
         TPCTxnCtxt* tpcc = new TPCTxnCtxt(i->xid, &messageIdSequence);
+        std::auto_ptr<TPCTransactionContext> txn(tpcc);
 
+        tpcc->prepare(preparedXidStorePtr.get());
+
         // Restore data token state in TxnCtxt
-        xid_rid_map_citr citr = preparedMap.find(i->xid);
-        if (citr == preparedMap.end()) THROW_STORE_EXCEPTION("XID not found in preparedMap");
-        tpcc->recoverDtok(citr->second, i->xid);
-        tpcc->addXidRecord(preparedXidStorePtr.get());
+        PreparedRecoverMapCitr citr = preparedXidStoreRecoverMap.find(i->xid);
+        if (citr == preparedXidStoreRecoverMap.end()) THROW_STORE_EXCEPTION("XID not found in preparedXidStoreRecoverMap");
+        tpcc->recoverDtok(citr->second.rid, i->xid);
 
-        RecoverableTransaction::shared_ptr dtx = registry.recoverTransaction(i->xid, std::auto_ptr<TPCTransactionContext>(tpcc));
+        // If a record is found that is dequeued but not committed/aborted from preparedXidStore, then a complete() call
+        // was interrupted part way through committing/aborting the impacted queues. Complete this process.
+        bool incomplTxnFlag = citr->second.deq_flag;
+
+        RecoverableTransaction::shared_ptr dtx = registry.recoverTransaction(i->xid, txn);
         if (i->enqueues.get()) {
             for (LockedMappings::iterator j = i->enqueues->begin(); j != i->enqueues->end(); j++) {
                 tpcc->addXidRecord(queues[j->first]->getExternalQueueStore());
-                dtx->enqueue(queues[j->first], messages[j->second]);
+                if (!incomplTxnFlag) dtx->enqueue(queues[j->first], messages[j->second]);
             }
         }
         if (i->dequeues.get()) {
             for (LockedMappings::iterator j = i->dequeues->begin(); j != i->dequeues->end(); j++) {
                 tpcc->addXidRecord(queues[j->first]->getExternalQueueStore());
-                dtx->dequeue(queues[j->first], messages[j->second]);
+                if (!incomplTxnFlag) dtx->dequeue(queues[j->first], messages[j->second]);
             }
         }
+
+        if (incomplTxnFlag) tpcc->complete(citr->second.commit_flag);
     }
     registry.recoveryComplete();
 }
 
-void BdbMessageStore::recoverQueues(TxnCtxt& txn, RecoveryManager& registry, queue_index& queue_index, txn_list&
-                                    prepared, message_index& messages)
+void BdbMessageStore::recoverQueues(TxnCtxt& txn,
+                                    RecoveryManager& registry,
+                                    queue_index& queue_index,
+                                    txn_list& prepared,
+                                    message_index& messages)
 {
     Cursor queues;
     queues.open(queueDb, txn.get());
@@ -510,7 +527,7 @@
         RecoverableQueue::shared_ptr queue = registry.recoverQueue(buffer);
         //set the persistenceId and update max as required
         queue->setPersistenceId(key.id);
-	
+
         const char* queueName = queue->getName().c_str();
         JournalImpl* jQueue = 0;
         {
@@ -518,14 +535,14 @@
             jQueue = new JournalImpl(queueName, getJrnlDir(queueName), string("JournalData"), defJournalGetEventsTimeout, defJournalFlushTimeout);
         }
         queue->setExternalQueueStore(dynamic_cast<ExternalQueueStore*>(jQueue));
-	
+
         try
         {
             u_int64_t thisHighestRid = 0;
             jQueue->recover(numJrnlFiles, jrnlFsizePgs * JRNL_RMGR_PAGE_SIZE, wcache_num_pages, wcache_pgsize_sblks, &prepared, thisHighestRid, key.id); // start recovery
             if (thisHighestRid > highestRid)
                 highestRid = thisHighestRid;
-            recoverMessages(txn, registry, queue, prepared, messages); 
+            recoverMessages(txn, registry, queue, prepared, messages);
             jQueue->recover_complete(); // start journal.
         } catch (const journal::jexception& e) {
             THROW_STORE_EXCEPTION(std::string("Queue ") + queueName + ": recoverQueues() failed: " + e.what());
@@ -544,7 +561,9 @@
 }
 
 
-void BdbMessageStore::recoverExchanges(TxnCtxt& txn, RecoveryManager& registry, exchange_index& index)
+void BdbMessageStore::recoverExchanges(TxnCtxt& txn,
+                                       RecoveryManager& registry,
+                                       exchange_index& index)
 {
     //TODO: this is a copy&paste from recoverQueues - refactor!
     Cursor exchanges;
@@ -566,13 +585,15 @@
     exchangeIdSequence.reset(maxExchangeId + 1);
 }
 
-void BdbMessageStore::recoverBindings(TxnCtxt& txn, exchange_index& exchanges, queue_index& queues)
+void BdbMessageStore::recoverBindings(TxnCtxt& txn,
+                                      exchange_index& exchanges,
+                                      queue_index& queues)
 {
     Cursor bindings;
     bindings.open(bindingDb, txn.get());
 
     IdDbt key;
-    Dbt value;        
+    Dbt value;
     while (bindings.next(key, value)) {
         Buffer buffer(reinterpret_cast<char*>(value.get_data()), value.get_size());
         if (buffer.available() < 8) {
@@ -582,7 +603,7 @@
         uint64_t queueId = buffer.getLongLong();
         string queueName;
         string routingkey;
-        FieldTable args;        
+        FieldTable args;
         buffer.getShortString(queueName);
         buffer.getShortString(routingkey);
         buffer.get(args);
@@ -599,7 +620,8 @@
     }
 }
 
-void BdbMessageStore::recoverGeneral(TxnCtxt& txn, RecoveryManager& registry)
+void BdbMessageStore::recoverGeneral(TxnCtxt& txn,
+                                     RecoveryManager& registry)
 {
     Cursor items;
     items.open(generalDb, txn.get());
@@ -620,16 +642,18 @@
 }
 
 
-void BdbMessageStore::recoverMessages(TxnCtxt& /*txn*/, qpid::broker::RecoveryManager& recovery, 
-                                      qpid::broker::RecoverableQueue::shared_ptr& queue, txn_list& locked, message_index& prepared)
+void BdbMessageStore::recoverMessages(TxnCtxt& /*txn*/,
+                                      qpid::broker::RecoveryManager& recovery,
+                                      qpid::broker::RecoverableQueue::shared_ptr& queue,
+                                      txn_list& prepared,
+                                      message_index& messages)
 {
+    size_t preambleLength = sizeof(u_int32_t)/*header size*/;
 
-    size_t preambleLength = sizeof(u_int32_t)/*header size*/;
- 
     JournalImpl* jc = static_cast<JournalImpl*>(queue->getExternalQueueStore());
     DataTokenImpl dtokp;
     size_t readSize = 0;
-    unsigned msg_count=0;
+    unsigned msg_count = 0;
 
     // TODO: This optimization to skip reading if there are no enqueued messages to read
     // breaks the python system test in phase 6 with "Exception: Cannot write lock file"
@@ -642,58 +666,63 @@
     bool transientFlag = false;
     bool externalFlag = false;
 
+    dtokp.set_wstate(DataTokenImpl::ENQ);
 
-    dtokp.set_wstate(DataTokenImpl::ENQ);
-    // read the message from the Journal.
+    // Read the message from the Journal.
     try {
-        
-        //std::cout << jc->dirname() <<"-queueName:" << queue->getName() << "-enq count:" << jc->get_enq_cnt() << std::endl;
-		
         unsigned aio_sleep_cnt = 0;
         while (read) {
             rhm::journal::iores res = jc->read_data_record(&dbuff, dbuffSize, &xidbuff, xidbuffSize, transientFlag, externalFlag, &dtokp);
             readSize = dtokp.dsize();
-		
+
             switch (res)
             {
-              case rhm::journal::RHM_IORES_SUCCESS:{
-                  msg_count++;
-                  RecoverableMessage::shared_ptr msg;
-                  char* data = (char*)dbuff;
-					
-                  unsigned headerSize;
-                  if (externalFlag){
-                      msg = getExternMessage(recovery, dtokp.rid(), headerSize); // large message external to jrnl 
-                  } else {
-                      headerSize = Buffer(data, preambleLength).getLong();
-                      Buffer headerBuff(data+ preambleLength, headerSize); /// do we want read size or header size ????
-                      msg = recovery.recoverMessage(headerBuff);
-                  }
-                  msg->setPersistenceId(dtokp.rid());
-				 
-                  u_int32_t contentOffset = headerSize + preambleLength;
-                  u_int64_t contentSize = readSize - contentOffset;
-                  if (msg->loadContent(contentSize) && !externalFlag) {
-                      //now read the content
-                      Buffer contentBuff(data + contentOffset, contentSize);
-                      msg->decodeContent(contentBuff);
-                  }
+              case rhm::journal::RHM_IORES_SUCCESS: {
+                msg_count++;
+                RecoverableMessage::shared_ptr msg;
+                char* data = (char*)dbuff;
 
-                  if (PreparedTransaction::isLocked(locked, queue->getPersistenceId(), dtokp.rid()) ) {
-                      prepared[dtokp.rid()] = msg;
-                  } else {
-                      queue->recover(msg);
-                  }
-    
-                  dtokp.reset();
-                  dtokp.set_wstate(DataTokenImpl::ENQ);
-					
-                  if (xidbuff)
-                      ::free(xidbuff);
-                  else if (dbuff)
-                      ::free(dbuff);
-                  aio_sleep_cnt = 0;
-                  break;
+                unsigned headerSize;
+                if (externalFlag) {
+                    msg = getExternMessage(recovery, dtokp.rid(), headerSize); // large message external to jrnl
+                } else {
+                    headerSize = Buffer(data, preambleLength).getLong();
+                    Buffer headerBuff(data+ preambleLength, headerSize); /// do we want read size or header size ????
+                    msg = recovery.recoverMessage(headerBuff);
+                }
+                msg->setPersistenceId(dtokp.rid());
+
+                u_int32_t contentOffset = headerSize + preambleLength;
+                u_int64_t contentSize = readSize - contentOffset;
+                if (msg->loadContent(contentSize) && !externalFlag) {
+                    //now read the content
+                    Buffer contentBuff(data + contentOffset, contentSize);
+                    msg->decodeContent(contentBuff);
+                }
+
+                if (PreparedTransaction::isLocked(prepared, queue->getPersistenceId(), dtokp.rid()) ) {
+                    std::string xid((char*)xidbuff, xidbuffSize);
+                    PreparedRecoverMapCitr citr = preparedXidStoreRecoverMap.find(xid);
+                    if (xid.size() > 0 && citr == preparedXidStoreRecoverMap.end()) THROW_STORE_EXCEPTION("XID not found in preparedXidStoreRecoverMap");
+                    if (xid.size() > 0 && citr->second.deq_flag) { // deq present in prepared list, this xid is part of incomplete txn commit/abort
+                        if (citr->second.commit_flag)
+                            queue->recover(msg);
+                    } else {
+                        messages[dtokp.rid()] = msg;
+                    }
+                } else {
+                    queue->recover(msg);
+                }
+
+                dtokp.reset();
+                dtokp.set_wstate(DataTokenImpl::ENQ);
+
+                if (xidbuff)
+                    ::free(xidbuff);
+                else if (dbuff)
+                    ::free(dbuff);
+                aio_sleep_cnt = 0;
+                break;
               }
               case rhm::journal::RHM_IORES_PAGE_AIOWAIT:
                   if (++aio_sleep_cnt > MAX_AIO_SLEEPS)
@@ -702,7 +731,7 @@
                   break;
               case rhm::journal::RHM_IORES_EMPTY:
                   read = false;
-                  break; // done with all messages. ((add call in jrnl to test that _emap is empty. 
+                  break; // done with all messages. (add call in jrnl to test that _emap is empty.)
               default:
                   assert( "Store Error: Unexpected msg state");
             } // switch
@@ -713,33 +742,37 @@
     }
 }
 
-RecoverableMessage::shared_ptr  BdbMessageStore::getExternMessage(qpid::broker::RecoveryManager& recovery, 
-                                                                  uint64_t messageId, unsigned& headerSize)
+RecoverableMessage::shared_ptr BdbMessageStore::getExternMessage(qpid::broker::RecoveryManager& recovery,
+                                                                 uint64_t messageId,
+                                                                 unsigned& headerSize)
 {
     Dbt key (&messageId, sizeof(messageId));
-    size_t preamble_length = sizeof(u_int32_t)/*header size*/;
+    size_t preamble_length = sizeof(u_int32_t); /*header size*/
 
     BufferValue value(preamble_length, 0);
     value.buffer.record();
     if (messageDb.get(0, &key, &value, 0) == DB_NOTFOUND) {
-        THROW_STORE_EXCEPTION("Cannot load content. Message not known to store!");                
+        THROW_STORE_EXCEPTION("Cannot load content. Message not known to store!");
     }
     //read header only to begin with
     headerSize = value.buffer.getLong();
 
     BufferValue header(headerSize, preamble_length);
     if (messageDb.get(0, &key, &header, 0) == DB_NOTFOUND) {
-        THROW_STORE_EXCEPTION("Cannot load content. Message not known to store!");                
+        THROW_STORE_EXCEPTION("Cannot load content. Message not known to store!");
     }
 
     return recovery.recoverMessage(header.buffer);
-} 
+}
 
 
 
-int BdbMessageStore::enqueueMessage(TxnCtxt& txn, IdDbt& msgId, RecoverableMessage::shared_ptr& msg, 
-                                    queue_index& index, txn_list& locked, 
-                                    message_index& prepared)
+int BdbMessageStore::enqueueMessage(TxnCtxt& txn,
+                                    IdDbt& msgId,
+                                    RecoverableMessage::shared_ptr& msg,
+                                    queue_index& index,
+                                    txn_list& prepared,
+                                    message_index& messages)
 {
     Cursor mappings;
     mappings.open(mappingDb, txn.get());
@@ -751,10 +784,10 @@
         if (index.find(value.id) == index.end()) {
             QPID_LOG(warning, "Recovered message for queue that no longer exists");
             mappings->del(0);
-        } else {            
+        } else {
             RecoverableQueue::shared_ptr queue = index[value.id];
-            if (PreparedTransaction::isLocked(locked, value.id, msgId.id)) {
-                prepared[msgId.id] = msg;
+            if (PreparedTransaction::isLocked(prepared, value.id, msgId.id)) {
+                messages[msgId.id] = msg;
             } else {
                 queue->recover(msg);
             }
@@ -765,22 +798,69 @@
     return count;
 }
 
+void BdbMessageStore::recoverPreparedXidJournal()
+{
+    if (journal::jdir::exists(storeDir + "/rhm/pxid/prepared_xid.jinf")) {
+        u_int64_t thisHighestRid;
+        preparedXidStorePtr->recover(defXidStoreNumJrnlFiles, defXidStoreJrnlFileSizePgs * JRNL_RMGR_PAGE_SIZE,
+                JRNL_WMGR_DEF_PAGE_SIZE * JRNL_WMGR_DEF_PAGES / wcache_pgsize_sblks, defXidStoreWCachePageSize,
+                0, thisHighestRid, 0);
+        if (thisHighestRid > highestRid)
+            highestRid = thisHighestRid;
+        preparedXidStorePtr->recover_complete(); // start journal.
+    }
+}
+
+void BdbMessageStore::getPreparedXidMap(PreparedRecoverMap& prepXidMap)
+{
+    if (preparedXidStorePtr.get()) {
+        if (!preparedXidStorePtr->is_ready())
+            recoverPreparedXidJournal();
+
+        // TODO: The journal will return a const txn_map and the txn_map will support
+        // const operations at some point. Using non-const txn_map this way is ugly...
+        journal::txn_map& tmap = preparedXidStorePtr->get_txn_map();
+        std::vector<std::string> xidList;
+        tmap.xid_list(xidList);
+        for (std::vector<std::string>::const_iterator i = xidList.begin(); i<xidList.end(); i++) {
+            journal::txn_data_list txnList = tmap.get_tdata_list(*i);
+            unsigned enqCnt = 0;
+            unsigned deqCnt = 0;
+            u_int64_t rid = 0;
+            bool commitFlag = false;
+            for (journal::tdl_itr j = txnList.begin(); j<txnList.end(); j++) {
+                if (j->_enq_flag) {
+                    rid = j->_rid;
+                    enqCnt++;
+                } else {
+                    commitFlag = j->_commit_flag;
+                    deqCnt++;
+                }
+            }
+            assert(enqCnt == 1);
+            assert(deqCnt <= 1);
+            prepXidMap.insert(PreparedRecoverMapPair(*i, PreparedRecoverStruct(rid, deqCnt == 1, commitFlag)));
+        }
+    }
+}
+
 void BdbMessageStore::recoverXids(txn_list& txns)
 {
-    std::set<string> preparedXidSet;
-    collectPreparedXids(preparedXidSet);
+    if (!preparedXidStorePtr->is_ready())
+        getPreparedXidMap(preparedXidStoreRecoverMap);
 
     // Abort unprepaired xids and populate the locked maps
-    for (std::set<string>::iterator i = preparedXidSet.begin(); i != preparedXidSet.end(); i++) {
+    for (PreparedRecoverMapCitr i = preparedXidStoreRecoverMap.begin(); i != preparedXidStoreRecoverMap.end(); i++) {
         LockedMappings::shared_ptr enq_ptr;
         enq_ptr.reset(new LockedMappings);
         LockedMappings::shared_ptr deq_ptr;
         deq_ptr.reset(new LockedMappings);
-        txns.push_back(new PreparedTransaction(*i, enq_ptr, deq_ptr));
+        txns.push_back(new PreparedTransaction(i->first, enq_ptr, deq_ptr));
     }
 }
 
-void BdbMessageStore::readLockedMappings(Db& db, txn_lock_map& mappings)
+void BdbMessageStore::readLockedMappings(Db& db,
+                                         txn_lock_map& mappings)
 {
     Cursor c;
     c.open(db, 0);
@@ -795,54 +875,10 @@
 
 void BdbMessageStore::collectPreparedXids(std::set<string>& xids)
 {
-    if (journal::jdir::exists(storeDir + "/rhm/pxid/prepared_xid.jinf"))
-    {
-        u_int64_t thisHighestRid;
-        preparedXidStorePtr->recover(defXidStoreNumJrnlFiles, defXidStoreJrnlFileSizePgs * JRNL_RMGR_PAGE_SIZE,
-                JRNL_WMGR_DEF_PAGE_SIZE * JRNL_WMGR_DEF_PAGES / wcache_pgsize_sblks, defXidStoreWCachePageSize,
-                0, thisHighestRid, 0);
-        if (thisHighestRid > highestRid)
-            highestRid = thisHighestRid;
-        try {
-            void* dbuff = NULL; size_t dbuffSize = 0;
-            void* xidbuff = NULL; size_t xidbuffSize = 0;
-            bool transientFlag = false;
-            bool externalFlag = false;
-            DataTokenImpl dtokp;
-            bool done = false;
-            long aio_sleep_cnt = 0;
-            while (!done) {
-                dtokp.reset();
-                dtokp.set_wstate(DataTokenImpl::ENQ);
-                rhm::journal::iores res = preparedXidStorePtr->read_data_record(&dbuff, dbuffSize, &xidbuff, xidbuffSize, transientFlag, externalFlag, &dtokp);
-                switch (res) {
-                    case rhm::journal::RHM_IORES_SUCCESS:
-                        if (xidbuffSize > 0) {
-                            xids.insert(std::string((const char*)xidbuff, xidbuffSize));
-                            preparedMap[std::string((const char*)xidbuff, xidbuffSize)] = dtokp.rid();
-                            ::free(xidbuff);
-                        } else {
-                            THROW_STORE_EXCEPTION("No XID found in BdbMessageStore::collectPreparedXids()");
-                        }
-                        aio_sleep_cnt = 0;
-                        break;
-                    case rhm::journal::RHM_IORES_PAGE_AIOWAIT:
-                        if (++aio_sleep_cnt > MAX_AIO_SLEEPS)
-                            THROW_STORE_EXCEPTION("Timeout waiting for AIO in BdbMessageStore::collectPreparedXids()");
-                        ::usleep(AIO_SLEEP_TIME);
-                        break;
-                    case rhm::journal::RHM_IORES_EMPTY:
-                        done = true;
-                        break;
-                    default:
-                        assert( "Store Error: Unexpected msg state");
-                }
-            }
-        } catch (const journal::jexception& e) {
-            THROW_STORE_EXCEPTION(std::string("Prepared XID journal: collectPreparedXids() failed: ") + e.what());
-        }
-
-        preparedXidStorePtr->recover_complete(); // start journal.
+    if (!preparedXidStorePtr->is_ready())
+        getPreparedXidMap(preparedXidStoreRecoverMap);
+    for (PreparedRecoverMapCitr i = preparedXidStoreRecoverMap.begin(); i != preparedXidStoreRecoverMap.end(); i++) {
+        xids.insert(i->first);
     }
 }
 
@@ -864,8 +900,9 @@
             txn.abort();
             throw;
         }
-    }        
+    }
 }
+
 void BdbMessageStore::destroy(PersistableMessage& msg)
 {
     checkInit();
@@ -880,17 +917,19 @@
         } catch (const DbException& e) {
             txn.abort();
             THROW_STORE_EXCEPTION_2("Error destroying message", e);
-        }            
+        }
     }
 }
 
-
-u_int64_t BdbMessageStore::getRecordSize(Db& db, Dbt& key)
+u_int64_t BdbMessageStore::getRecordSize(Db& db,
+                                         Dbt& key)
 {
     return getRecordSize(0, db, key);
 }
 
-u_int64_t BdbMessageStore::getRecordSize(DbTxn* txn, Db& db, Dbt& key)
+u_int64_t BdbMessageStore::getRecordSize(DbTxn* txn,
+                                         Db& db,
+                                         Dbt& key)
 {
     Dbt peek;
     peek.set_flags(DB_DBT_USERMEM);
@@ -906,7 +945,8 @@
     return peek.get_size();
 }
 
-void BdbMessageStore::appendContent(intrusive_ptr<const PersistableMessage>& msg, const std::string& data)
+void BdbMessageStore::appendContent(intrusive_ptr<const PersistableMessage>& msg,
+                                    const std::string& data)
 {
     checkInit();
     u_int64_t messageId (msg->getPersistenceId());
@@ -932,28 +972,31 @@
         }
     } else {
         THROW_STORE_EXCEPTION("Cannot append content. Message not known to store!");
-    }    
+    }
 }
 
 void BdbMessageStore::loadContent(const qpid::broker::PersistableQueue& queue,
-                                  intrusive_ptr<const PersistableMessage>& msg, std::string& data, u_int64_t offset, u_int32_t length)
+                                  intrusive_ptr<const PersistableMessage>& msg,
+                                  std::string& data,
+                                  u_int64_t offset,
+                                  u_int32_t length)
 {
     checkInit();
-    u_int64_t realOffset = offset + sizeof(u_int32_t)/*header length*/+ msg->encodedHeaderSize();
+    u_int64_t realOffset = offset + sizeof(u_int32_t)/*header length*/ + msg->encodedHeaderSize();
     u_int64_t messageId (msg->getPersistenceId());
-	
+
     if (messageId != 0) {
         try {
             JournalImpl* jc = static_cast<JournalImpl*>(queue.getExternalQueueStore());
-            if (jc && jc->is_enqueued(messageId) ){
-                if (jc->loadMsgContent(messageId, data, realOffset, length)){
+            if (jc && jc->is_enqueued(messageId) ) {
+                if (jc->loadMsgContent(messageId, data, realOffset, length)) {
                     return;
                 }
             }
         } catch (const journal::jexception& e) {
             THROW_STORE_EXCEPTION(std::string("Queue ") + queue.getName() +
                                   ": loadContent() failed: " + e.what());
-        }		
+        }
         TxnCtxt txn;
         txn.begin(env, true);
         try {
@@ -964,11 +1007,11 @@
             value.set_ulen(length);
             value.set_doff(realOffset);
             value.set_dlen(length);
-            int status = messageDb.get(txn.get(), &key, &value, 0); 
+            int status = messageDb.get(txn.get(), &key, &value, 0);
             if (status == DB_NOTFOUND) {
                 txn.abort();
                 delete [] buffer;
-                THROW_STORE_EXCEPTION("Cannot load content. Message not known to store!");                
+                THROW_STORE_EXCEPTION("Cannot load content. Message not known to store!");
             } else {
                 txn.commit();
                 data.assign(buffer, value.get_size());
@@ -989,11 +1032,11 @@
     std::string qn = queue.getName();
     try {
         JournalImpl* jc = static_cast<JournalImpl*>(queue.getExternalQueueStore());
-        if (jc){
+        if (jc) {
             // TODO: check if this result should be used...
             /*rhm::journal::iores res =*/ jc->flush();
         }
-    }catch (const journal::jexception& e) {
+    } catch (const journal::jexception& e) {
         THROW_STORE_EXCEPTION(std::string("Queue ") + qn + ": flush() failed: " + e.what() );
     }
 }
@@ -1029,14 +1072,14 @@
     if (ctxt) txn->addXidRecord(queue.getExternalQueueStore());
 }
 
-void BdbMessageStore::store(const PersistableQueue* queue, 
-                            TxnCtxt* txn, Dbt& messageId, 
-                            intrusive_ptr<PersistableMessage>& message, 
+void BdbMessageStore::store(const PersistableQueue* queue,
+                            TxnCtxt* txn, Dbt& messageId,
+                            intrusive_ptr<PersistableMessage>& message,
                             bool newId)
 {
     u_int32_t headerSize = message->encodedHeaderSize();
     u_int64_t size = message->encodedSize() + sizeof(u_int32_t);
-    char* buff= 0;
+    char* buff = 0;
     if (!message->isContentReleased() )
     {
         buff = static_cast<char*>(::alloca(size)); // long + headers + content
@@ -1046,23 +1089,22 @@
     }
 
     try {
-
-        if ( queue ) {
+        if (queue) {
             boost::intrusive_ptr<DataTokenImpl> dtokp(new DataTokenImpl);
             dtokp->addRef();
 	        dtokp->setSourceMessage(message);
             dtokp->set_external_rid(true);
 	        dtokp->set_rid(message->getPersistenceId()); // set the messageID into the Journal header (record-id)
-            
+
             JournalImpl* jc = static_cast<JournalImpl*>(queue->getExternalQueueStore());
-            if (txn->getXid().empty()){
-                if (message->isContentReleased()){
+            if (txn->getXid().empty()) {
+                if (message->isContentReleased()) {
                     jc->enqueue_extern_data_record(size, dtokp.get(), false);
                 } else {
                     jc->enqueue_data_record(buff, size, size, dtokp.get(), false);
                 }
-            }else {
-                if (message->isContentReleased()){
+            } else {
+                if (message->isContentReleased()) {
                     jc->enqueue_extern_txn_data_record(size, dtokp.get(), txn->getXid(), false);
                 } else {
                     jc->enqueue_txn_data_record(buff, size, size, dtokp.get(), txn->getXid(), false);
@@ -1083,7 +1125,7 @@
 
 void BdbMessageStore::dequeue(TransactionContext* ctxt, intrusive_ptr<PersistableMessage>& msg,
                               const PersistableQueue& queue)
-{    
+{
     checkInit();
     u_int64_t queueId (queue.getPersistenceId());
     u_int64_t messageId (msg->getPersistenceId());
@@ -1101,28 +1143,27 @@
     } else {
         txn = &implicit;
     }
-    
+
     // add queue* to the txn map..
     if (ctxt) txn->addXidRecord(queue.getExternalQueueStore());
-    async_dequeue(ctxt, msg, queue); 
-			
+    async_dequeue(ctxt, msg, queue);
+
     msg->dequeueComplete();
 }
 
-void BdbMessageStore::async_dequeue(
-    TransactionContext* ctxt,
-    intrusive_ptr<PersistableMessage>& msg,
-    const PersistableQueue& queue)
+void BdbMessageStore::async_dequeue(TransactionContext* ctxt,
+                                    intrusive_ptr<PersistableMessage>& msg,
+                                    const PersistableQueue& queue)
 {
     boost::intrusive_ptr<DataTokenImpl> ddtokp(new DataTokenImpl);
     ddtokp->addRef();
     ddtokp->setSourceMessage(msg);
     ddtokp->set_external_rid(true);
-    ddtokp->set_rid(messageIdSequence.next()); 
+    ddtokp->set_rid(messageIdSequence.next());
     ddtokp->set_dequeue_rid(msg->getPersistenceId());
     ddtokp->set_wstate(DataTokenImpl::ENQ);
     string tid;
-    if (ctxt){
+    if (ctxt) {
         TxnCtxt* txn = check(ctxt);
         tid = txn->getXid();
     }
@@ -1133,7 +1174,7 @@
         } else {
             jc->dequeue_txn_data_record(ddtokp.get(), tid);
         }
-    } catch (const journal::jexception& e) { 
+    } catch (const journal::jexception& e) {
         THROW_STORE_EXCEPTION(std::string("Queue ") + queue.getName() + ": async_dequeue() failed: " + e.what());
     }
 }
@@ -1144,15 +1185,17 @@
     return 0;
 }
 
-bool BdbMessageStore::deleteIfUnused(DbTxn* txn, Dbt& messageId)
+bool BdbMessageStore::deleteIfUnused(DbTxn* txn,
+                                     Dbt& messageId)
 {
     Cursor cursor;
     cursor.open(mappingDb, txn);
-
     return deleteIfUnused(cursor, txn, messageId);
 }
 
-bool BdbMessageStore::deleteIfUnused(Cursor& cursor, DbTxn* txn, Dbt& messageId)
+bool BdbMessageStore::deleteIfUnused(Cursor& cursor,
+                                     DbTxn* txn,
+                                     Dbt& messageId)
 {
     if (isUnused(cursor, messageId)) {
         messageDb.del(txn, &messageId, 0);
@@ -1175,7 +1218,8 @@
     }
 }
 
-void BdbMessageStore::completed(TPCTxnCtxt& txn, bool commit)
+void BdbMessageStore::completed(TPCTxnCtxt& txn,
+                                bool commit)
 {
     try {
         // Nothing to do if not prepared
@@ -1185,7 +1229,7 @@
             DataTokenImpl* dtokp = txn.getDtok();
             dtokp->set_dequeue_rid(dtokp->rid());
             dtokp->set_rid(messageIdSequence.next());
-            preparedXidStorePtr->dequeue_txn_data_record(txn.getDtok(), txn.getXid());
+            preparedXidStorePtr->dequeue_txn_data_record(txn.getDtok(), txn.getXid(), commit);
         }
         txn.complete(commit);
     } catch (const std::exception& e) {
@@ -1194,7 +1238,7 @@
     }
 }
 
-auto_ptr<TransactionContext> BdbMessageStore::begin() 
+auto_ptr<TransactionContext> BdbMessageStore::begin()
 {
     checkInit();
     // pass sequence number for c/a
@@ -1214,7 +1258,7 @@
     checkInit();
     TPCTxnCtxt* txn = dynamic_cast<TPCTxnCtxt*>(&ctxt);
     if(!txn) throw InvalidTransactionContextException();
-    
+
     try {
         chkInitPreparedXidStore();
         txn->incrDtokRef();
@@ -1222,8 +1266,7 @@
         dtokp->set_external_rid(true);
         dtokp->set_rid(messageIdSequence.next());
         preparedXidStorePtr->enqueue_txn_data_record(0, 0, 0, dtokp, txn->getXid(), false);
-        txn->addXidRecord(preparedXidStorePtr.get());
-
+        txn->prepare(preparedXidStorePtr.get());
         // make sure all the data is written to disk before returning
         txn->sync();
     } catch (const std::exception& e) {
@@ -1232,18 +1275,18 @@
     }
 }
 
-void BdbMessageStore::commit(TransactionContext& ctxt) 
+void BdbMessageStore::commit(TransactionContext& ctxt)
 {
     checkInit();
     TxnCtxt* txn(check(&ctxt));
     if (txn->isTPC()) {
-        completed(*dynamic_cast<TPCTxnCtxt*>(txn), true);        
+        completed(*dynamic_cast<TPCTxnCtxt*>(txn), true);
     } else {
         txn->complete(true);
     }
 }
 
-void BdbMessageStore::abort(TransactionContext& ctxt) 
+void BdbMessageStore::abort(TransactionContext& ctxt)
 {
     checkInit();
     TxnCtxt* txn(check(&ctxt));
@@ -1261,7 +1304,10 @@
     return txn;
 }
 
-void BdbMessageStore::put(Db& db, DbTxn* txn, Dbt& key, Dbt& value)
+void BdbMessageStore::put(Db& db,
+                          DbTxn* txn,
+                          Dbt& key,
+                          Dbt& value)
 {
     try {
         int status = db.put(txn, &key, &value, DB_NODUPDATA);
@@ -1275,8 +1321,10 @@
     }
 }
 
- 
-bool BdbMessageStore::deleteKeyValuePair(Db& db, DbTxn* txn, Dbt& key, Dbt& value)
+bool BdbMessageStore::deleteKeyValuePair(Db& db,
+                                         DbTxn* txn,
+                                         Dbt& key,
+                                         Dbt& value)
 {
     Cursor cursor;
     cursor.open(db, txn);
@@ -1298,9 +1346,9 @@
     try {
         Cursor bindings;
         bindings.open(bindingDb, txn.get());
-        
+
         IdDbt key;
-        Dbt value;        
+        Dbt value;
         while (bindings.next(key, value)) {
             Buffer buffer(reinterpret_cast<char*>(value.get_data()), value.get_size());
             if (buffer.available() < 8) {
@@ -1319,16 +1367,18 @@
     QPID_LOG(debug, "Deleted all bindings for " << queue.getName() << ":" << queue.getPersistenceId());
 }
 
-void BdbMessageStore::deleteBinding(const PersistableExchange& exchange, const PersistableQueue& queue, const std::string& bkey)
+void BdbMessageStore::deleteBinding(const PersistableExchange& exchange,
+                                    const PersistableQueue& queue,
+                                    const std::string& bkey)
 {
     TxnCtxt txn;
     txn.begin(env, true);
     try {
         Cursor bindings;
         bindings.open(bindingDb, txn.get());
-        
+
         IdDbt key(exchange.getPersistenceId());
-        Dbt value;        
+        Dbt value;
 
         for (int status = bindings->get(&key, &value, DB_SET); status == 0; status = bindings->get(&key, &value, DB_NEXT_DUP)) {
             Buffer buffer(reinterpret_cast<char*>(value.get_data()), value.get_size());
@@ -1353,21 +1403,21 @@
     txn.commit();
 }
 
-string BdbMessageStore::getJrnlBaseDir() 
+string BdbMessageStore::getJrnlBaseDir()
 {
     std::stringstream dir;
     dir << storeDir << "/rhm/jrnl/" ;
     return dir.str();
 }
 
-string BdbMessageStore::getBdbBaseDir() 
+string BdbMessageStore::getBdbBaseDir()
 {
     std::stringstream dir;
     dir << storeDir << "/rhm/dat/" ;
     return dir.str();
 }
 
-string BdbMessageStore::getPxidBaseDir() 
+string BdbMessageStore::getPxidBaseDir()
 {
     std::stringstream dir;
     dir << storeDir << "/rhm/pxid/" ;
@@ -1386,19 +1436,19 @@
     dir << std::setw(4);
     dir << std::setfill('0');
     u_int32_t count = 0;
-    for (u_int32_t i=0; i < strlen(queueName); i++)
-    	count += queueName[i];
-    
-    dir << (count%20);
+    for (u_int32_t i = 0; i < strlen(queueName); i++) {
+        count += queueName[i];
+    }
+    dir << (count % 20);
     dir << "/" << queueName << "/";
     return dir.str();
 }
 
 BdbMessageStore::Options::Options(const std::string& name) :
-    qpid::Options(name),
-    numJrnlFiles(8),
-    jrnlFsizePgs(24),
-    wCachePageSize(JRNL_WMGR_DEF_PAGE_SIZE * JRNL_DBLK_SIZE * JRNL_SBLK_SIZE / 1024)
+                                  qpid::Options(name),
+                                  numJrnlFiles(8),
+                                  jrnlFsizePgs(24),
+                                  wCachePageSize(JRNL_WMGR_DEF_PAGE_SIZE * JRNL_DBLK_SIZE * JRNL_SBLK_SIZE / 1024)
 {
     addOptions()
         ("store-dir", qpid::optValue(storeDir, "DIR"),

Modified: store/branches/mrg-1.0/cpp/lib/BdbMessageStore.h
===================================================================
--- store/branches/mrg-1.0/cpp/lib/BdbMessageStore.h	2008-07-14 14:53:45 UTC (rev 2197)
+++ store/branches/mrg-1.0/cpp/lib/BdbMessageStore.h	2008-07-14 17:05:31 UTC (rev 2198)
@@ -2,7 +2,7 @@
     Copyright (C) 2007 Red Hat Software
 
     This file is part of Red Hat Messaging.
-    
+
     Red Hat Messaging is free software; you can redistribute it and/or
     modify it under the terms of the GNU Lesser General Public
     License as published by the Free Software Foundation; either
@@ -53,15 +53,25 @@
          */
         class BdbMessageStore : public qpid::broker::MessageStore, public qpid::management::Manageable
         {
+          protected:
             typedef std::map<u_int64_t, qpid::broker::RecoverableQueue::shared_ptr> queue_index;
             typedef std::map<u_int64_t, qpid::broker::RecoverableExchange::shared_ptr> exchange_index;
             typedef std::map<u_int64_t, qpid::broker::RecoverableMessage::shared_ptr> message_index;
-            
+
             typedef LockedMappings::map txn_lock_map;
             typedef boost::ptr_list<PreparedTransaction> txn_list;
 
-            typedef std::map<std::string, u_int64_t> xid_rid_map;
-            typedef xid_rid_map::const_iterator xid_rid_map_citr;
+            // Structs for preparedXidStore recover state
+            struct PreparedRecoverStruct {
+                u_int64_t rid;
+                bool deq_flag;
+                bool commit_flag;
+                PreparedRecoverStruct(const u_int64_t _rid, const bool _deq_flag, const bool _commit_flag);
+            };
+            typedef PreparedRecoverStruct PreparedRecover;
+            typedef std::map<std::string, PreparedRecover> PreparedRecoverMap;
+            typedef PreparedRecoverMap::const_iterator PreparedRecoverMapCitr;
+            typedef std::pair<std::string, PreparedRecoverStruct> PreparedRecoverMapPair;
 
             // Default store settings
             static const u_int16_t defNumJrnlFiles = 8;
@@ -80,7 +90,11 @@
             Db mappingDb;
             Db bindingDb;
             Db generalDb;
+
+            // Pointer to prepared XID journal instance
             boost::shared_ptr<JournalImpl> preparedXidStorePtr;
+            PreparedRecoverMap preparedXidStoreRecoverMap;
+
             IdSequence queueIdSequence;
             IdSequence exchangeIdSequence;
             IdSequence generalIdSequence;
@@ -90,7 +104,6 @@
             u_int32_t jrnlFsizePgs;
             u_int32_t wcache_pgsize_sblks;
             u_int16_t wcache_num_pages;
-            xid_rid_map preparedMap;
             u_int64_t highestRid;
 			bool isInit;
 			const char* envPath;
@@ -99,57 +112,99 @@
             qpid::management::Store::shared_ptr mgmtObject;
             qpid::sys::Mutex jrnlCreateLock;
 
-            bool mode(const bool mode, const bool force);
-            void recoverQueues(TxnCtxt& txn, qpid::broker::RecoveryManager& recovery, queue_index& index,
-	    	        txn_list& locked, message_index& messages);
-            void recoverMessages(TxnCtxt& txn, qpid::broker::RecoveryManager& recovery, queue_index& index, 
-                    txn_list& locked, message_index& prepared);
-            void recoverMessages(TxnCtxt& txn, qpid::broker::RecoveryManager& recovery, 
-	    	        qpid::broker::RecoverableQueue::shared_ptr& queue, 
-                    txn_list& locked, message_index& prepared);
-            qpid::broker::RecoverableMessage::shared_ptr  getExternMessage(qpid::broker::RecoveryManager& recovery, 
-			        uint64_t mId, unsigned& headerSize);
-            void recoverExchanges(TxnCtxt& txn, qpid::broker::RecoveryManager& recovery, exchange_index& index);
-            void recoverBindings(TxnCtxt& txn, exchange_index& exchanges, queue_index& queues);
-            void recoverGeneral(TxnCtxt& txn, qpid::broker::RecoveryManager& recovery);
-            int enqueueMessage(TxnCtxt& txn, IdDbt& msgId, qpid::broker::RecoverableMessage::shared_ptr& msg, 
-                    queue_index& index, txn_list& locked, message_index& prepared);
+            void recoverQueues(TxnCtxt& txn,
+                               qpid::broker::RecoveryManager& recovery,
+                               queue_index& index,
+                               txn_list& locked,
+                               message_index& messages);
+            void recoverMessages(TxnCtxt& txn,
+                                 qpid::broker::RecoveryManager& recovery,
+                                 queue_index& index,
+                                 txn_list& locked,
+                                 message_index& prepared);
+            void recoverMessages(TxnCtxt& txn,
+                                 qpid::broker::RecoveryManager& recovery,
+                                 qpid::broker::RecoverableQueue::shared_ptr& queue,
+                                 txn_list& locked,
+                                 message_index& prepared);
+            qpid::broker::RecoverableMessage::shared_ptr getExternMessage(qpid::broker::RecoveryManager& recovery,
+                                                                          uint64_t mId, unsigned& headerSize);
+            void recoverExchanges(TxnCtxt& txn,
+                                  qpid::broker::RecoveryManager& recovery,
+                                  exchange_index& index);
+            void recoverBindings(TxnCtxt& txn,
+                                 exchange_index& exchanges,
+                                 queue_index& queues);
+            void recoverGeneral(TxnCtxt& txn,
+                                qpid::broker::RecoveryManager& recovery);
+            int enqueueMessage(TxnCtxt& txn,
+                               IdDbt& msgId,
+                               qpid::broker::RecoverableMessage::shared_ptr& msg,
+                               queue_index& index,
+                               txn_list& locked,
+                               message_index& prepared);
+            void recoverPreparedXidJournal();
+            void getPreparedXidMap(PreparedRecoverMap& prepXidMap);
             void recoverXids(txn_list& txns);
-            void readLockedMappings(Db& db, txn_lock_map& mappings);
+            void readLockedMappings(Db& db,
+                                    txn_lock_map& mappings);
             TxnCtxt* check(qpid::broker::TransactionContext* ctxt);
-            void store(const qpid::broker::PersistableQueue* queue, TxnCtxt* txn, 
-	    	        Dbt& messageId, 
-			        boost::intrusive_ptr<qpid::broker::PersistableMessage>& message,
-			        bool newId);
-            void async_dequeue(qpid::broker::TransactionContext* ctxt, 
-	    	        boost::intrusive_ptr<qpid::broker::PersistableMessage>& msg, 
-			        const qpid::broker::PersistableQueue& queue);
-            bool deleteIfUnused(Cursor& cursor, DbTxn* txn, Dbt& messageId);
-            bool deleteIfUnused(DbTxn* txn, Dbt& messageId);
-            bool isUnused(Cursor& cursor, Dbt& messageId);
-            void destroy(Db& db, const qpid::broker::Persistable& p);
-            bool create(Db& db, IdSequence& seq, const qpid::broker::Persistable& p);
-            void completed(TPCTxnCtxt& txn, bool commit);
-            void record2pcOp(Db& db, TPCTxnCtxt& txn, u_int64_t messageId, u_int64_t queueId);
+            void store(const qpid::broker::PersistableQueue* queue,
+                       TxnCtxt* txn,
+                       Dbt& messageId,
+                       boost::intrusive_ptr<qpid::broker::PersistableMessage>& message,
+                       bool newId);
+            void async_dequeue(qpid::broker::TransactionContext* ctxt,
+                               boost::intrusive_ptr<qpid::broker::PersistableMessage>& msg,
+                               const qpid::broker::PersistableQueue& queue);
+            bool deleteIfUnused(Cursor& cursor,
+                                DbTxn* txn,
+                                Dbt& messageId);
+            bool deleteIfUnused(DbTxn* txn,
+                                Dbt& messageId);
+            bool isUnused(Cursor& cursor,
+                          Dbt& messageId);
+            void destroy(Db& db,
+                         const qpid::broker::Persistable& p);
+            bool create(Db& db,
+                        IdSequence& seq,
+                        const qpid::broker::Persistable& p);
+            void completed(TPCTxnCtxt& txn,
+                           bool commit);
+            void record2pcOp(Db& db,
+                             TPCTxnCtxt& txn,
+                             u_int64_t messageId,
+                             u_int64_t queueId);
             void deleteBindingsForQueue(const qpid::broker::PersistableQueue& queue);
-            void deleteBinding(const qpid::broker::PersistableExchange& exchange, 
-                               const qpid::broker::PersistableQueue& queue, 
+            void deleteBinding(const qpid::broker::PersistableExchange& exchange,
+                               const qpid::broker::PersistableQueue& queue,
                                const std::string& key);
 
-            u_int64_t getRecordSize(Db& db, Dbt& key);
-            u_int64_t getRecordSize(DbTxn* txn, Db& db, Dbt& key);
-            void put(Db& db, DbTxn* txn, Dbt& key, Dbt& value);
-            bool deleteKeyValuePair(Db& db, DbTxn* txn, Dbt& key, Dbt& value);
-            void open(Db& db, DbTxn* txn, const char* file, bool dupKey);
-	    
-	    
+            u_int64_t getRecordSize(Db& db,
+                                    Dbt& key);
+            u_int64_t getRecordSize(DbTxn* txn,
+                                    Db& db,
+                                    Dbt& key);
+            void put(Db& db,
+                     DbTxn* txn,
+                     Dbt& key,
+                     Dbt& value);
+            bool deleteKeyValuePair(Db& db,
+                                    DbTxn* txn,
+                                    Dbt& key,
+                                    Dbt& value);
+            void open(Db& db,
+                      DbTxn* txn,
+                      const char* file,
+                      bool dupKey);
+
 	    	// journal functions
 	  	  	void createJrnlQueue(const qpid::broker::PersistableQueue& queue);
             std::string getJrnlDir(const qpid::broker::PersistableQueue& queue); //for exmaple /var/rhm/ + queueDir/
             std::string getJrnlDir(const char* queueName);
-            std::string getJrnlBaseDir(); 
-            std::string getBdbBaseDir(); 
-            std::string getPxidBaseDir(); 
+            std::string getJrnlBaseDir();
+            std::string getBdbBaseDir();
+            std::string getPxidBaseDir();
 			inline void checkInit() {
                 if (!isInit) init("/var", defNumJrnlFiles, defJrnlFileSizePgs, defWCachePageSize); isInit = true;
             }
@@ -183,51 +238,71 @@
             typedef boost::shared_ptr<BdbMessageStore> shared_ptr;
 
             BdbMessageStore(const char* envpath = 0);
+
             virtual ~BdbMessageStore();
+
 			bool init(const qpid::Options* options);
-            bool init(const std::string& dir, u_int16_t jfiles, u_int32_t jfileSizePgs, uint32_t wCachePageSize);
+
+            bool init(const std::string& dir,
+                      u_int16_t jfiles,
+                      u_int32_t jfileSizePgs,
+                      uint32_t wCachePageSize);
+
             void initManagement (qpid::broker::Broker* broker);
 
             void truncate();
 
             void create(qpid::broker::PersistableQueue& queue,
                         const qpid::framing::FieldTable& args);
+
             void destroy(qpid::broker::PersistableQueue& queue);
 
             void create(const qpid::broker::PersistableExchange& queue,
                         const qpid::framing::FieldTable& args);
+
             void destroy(const qpid::broker::PersistableExchange& queue);
 
-            void bind(const qpid::broker::PersistableExchange& exchange, 
-                    const qpid::broker::PersistableQueue& queue, 
-                    const std::string& key, const qpid::framing::FieldTable& args);
-            void unbind(const qpid::broker::PersistableExchange& exchange, 
-                    const qpid::broker::PersistableQueue& queue, 
-                    const std::string& key, const qpid::framing::FieldTable& args);
+            void bind(const qpid::broker::PersistableExchange& exchange,
+                      const qpid::broker::PersistableQueue& queue,
+                      const std::string& key,
+                      const qpid::framing::FieldTable& args);
 
+            void unbind(const qpid::broker::PersistableExchange& exchange,
+                        const qpid::broker::PersistableQueue& queue,
+                        const std::string& key,
+                        const qpid::framing::FieldTable& args);
+
             void create(const qpid::broker::PersistableConfig& config);
+
             void destroy(const qpid::broker::PersistableConfig& config);
 
             void recover(qpid::broker::RecoveryManager& queues);
 
             void stage(boost::intrusive_ptr<qpid::broker::PersistableMessage>& msg);
+
             void destroy(qpid::broker::PersistableMessage& msg);
-            void appendContent(boost::intrusive_ptr<const qpid::broker::PersistableMessage>& msg, const std::string& data);
+
+            void appendContent(boost::intrusive_ptr<const qpid::broker::PersistableMessage>& msg,
+                               const std::string& data);
+
             void loadContent(const qpid::broker::PersistableQueue& queue,
-                    boost::intrusive_ptr<const qpid::broker::PersistableMessage>& msg, 
-			        std::string& data, u_int64_t offset, u_int32_t length);
+                             boost::intrusive_ptr<const qpid::broker::PersistableMessage>& msg,
+                             std::string& data,
+                             u_int64_t offset,
+                             u_int32_t length);
 
-            void enqueue(qpid::broker::TransactionContext* ctxt, 
-                    boost::intrusive_ptr<qpid::broker::PersistableMessage>& msg, 
-                    const qpid::broker::PersistableQueue& queue);
-            void dequeue(qpid::broker::TransactionContext* ctxt, 
-                    boost::intrusive_ptr<qpid::broker::PersistableMessage>& msg, 
-                    const qpid::broker::PersistableQueue& queue);
+            void enqueue(qpid::broker::TransactionContext* ctxt,
+                         boost::intrusive_ptr<qpid::broker::PersistableMessage>& msg,
+                         const qpid::broker::PersistableQueue& queue);
+
+            void dequeue(qpid::broker::TransactionContext* ctxt,
+                         boost::intrusive_ptr<qpid::broker::PersistableMessage>& msg,
+                         const qpid::broker::PersistableQueue& queue);
+
             void flush(const qpid::broker::PersistableQueue& queue);
 
     	    u_int32_t outstandingQueueAIO(const qpid::broker::PersistableQueue& queue);
 
-
             void collectPreparedXids(std::set<std::string>& xids);
 
             std::auto_ptr<qpid::broker::TransactionContext> begin();
@@ -239,10 +314,12 @@
             qpid::management::ManagementObject::shared_ptr GetManagementObject (void) const
             { return mgmtObject; }
 
-            qpid::management::Manageable::status_t ManagementMethod (uint32_t, qpid::management::Args&)
+            inline qpid::management::Manageable::status_t ManagementMethod (uint32_t, qpid::management::Args&)
             { return qpid::management::Manageable::STATUS_OK; }
-        };
-    }
-}
+        }; // class BdbMessageStore
+  
+    } // namespace bdbstore
+} // namespace rhm
 
+
 #endif

Modified: store/branches/mrg-1.0/cpp/lib/JournalImpl.cpp
===================================================================
--- store/branches/mrg-1.0/cpp/lib/JournalImpl.cpp	2008-07-14 14:53:45 UTC (rev 2197)
+++ store/branches/mrg-1.0/cpp/lib/JournalImpl.cpp	2008-07-14 17:05:31 UTC (rev 2198)
@@ -166,8 +166,7 @@
     if (prep_tx_list_ptr) {
         // Create list of prepared xids
         std::vector<std::string> prep_xid_list;
-        for (bdbstore::PreparedTransaction::list::iterator i = prep_tx_list_ptr->begin();
-                i != prep_tx_list_ptr->end(); i++) {
+        for (bdbstore::PreparedTransaction::list::iterator i = prep_tx_list_ptr->begin(); i != prep_tx_list_ptr->end(); i++) {
             prep_xid_list.push_back(i->xid);
         }
 
@@ -347,9 +346,9 @@
 }
 
 void
-JournalImpl::dequeue_data_record(data_tok* const dtokp)
+JournalImpl::dequeue_data_record(data_tok* const dtokp, const bool txn_coml_commit)
 {
-    handleIoResult(jcntl::dequeue_data_record(dtokp));
+    handleIoResult(jcntl::dequeue_data_record(dtokp, txn_coml_commit));
     
     if (_mgmtObject.get() != 0)
     {
@@ -359,9 +358,9 @@
 }
 
 void
-JournalImpl::dequeue_txn_data_record(data_tok* const dtokp, const std::string& xid)
+JournalImpl::dequeue_txn_data_record(data_tok* const dtokp, const std::string& xid, const bool txn_coml_commit)
 {
-    handleIoResult(jcntl::dequeue_txn_data_record(dtokp, xid));
+    handleIoResult(jcntl::dequeue_txn_data_record(dtokp, xid, txn_coml_commit));
     
     if (_mgmtObject.get() != 0)
     {

Modified: store/branches/mrg-1.0/cpp/lib/JournalImpl.h
===================================================================
--- store/branches/mrg-1.0/cpp/lib/JournalImpl.h	2008-07-14 14:53:45 UTC (rev 2197)
+++ store/branches/mrg-1.0/cpp/lib/JournalImpl.h	2008-07-14 17:05:31 UTC (rev 2198)
@@ -156,9 +156,9 @@
             void enqueue_extern_txn_data_record(const size_t tot_data_len, journal::data_tok* dtokp,
                     const std::string& xid, const bool transient = false);
 
-            void dequeue_data_record(journal::data_tok* const dtokp);
+            void dequeue_data_record(journal::data_tok* const dtokp, const bool txn_coml_commit = false);
 
-            void dequeue_txn_data_record(journal::data_tok* const dtokp, const std::string& xid);
+            void dequeue_txn_data_record(journal::data_tok* const dtokp, const std::string& xid, const bool txn_coml_commit = false);
 
             void txn_abort(journal::data_tok* const dtokp, const std::string& xid);
 

Modified: store/branches/mrg-1.0/cpp/lib/StorePlugin.cpp
===================================================================
--- store/branches/mrg-1.0/cpp/lib/StorePlugin.cpp	2008-07-14 14:53:45 UTC (rev 2197)
+++ store/branches/mrg-1.0/cpp/lib/StorePlugin.cpp	2008-07-14 17:05:31 UTC (rev 2198)
@@ -49,19 +49,12 @@
         if (options.storeDir.empty ())
         {
             if (!dataDir.isEnabled ())
-                throw Exception ("If --data-dir is blank or --no-data-dir is specified, "
-                                 "--store-directory must be present.");
+                throw Exception ("If --data-dir is blank or --no-data-dir is specified, --store-dir must be present.");
 
             options.storeDir = dataDir.getPath ();
         }
 
-        if (!store->init (&options))
-        {
-            throw Exception("Existing journal found in different bdb/async mode. "
-                            "Move or delete existing data files before changing modes, or use "
-                            "'--store-force yes' to discard existing data.");
-        }
-
+        store->init(&options);
         broker->setStore (store);
     }
 

Modified: store/branches/mrg-1.0/cpp/lib/TxnCtxt.h
===================================================================
--- store/branches/mrg-1.0/cpp/lib/TxnCtxt.h	2008-07-14 14:53:45 UTC (rev 2197)
+++ store/branches/mrg-1.0/cpp/lib/TxnCtxt.h	2008-07-14 17:05:31 UTC (rev 2198)
@@ -47,11 +47,11 @@
 
 class TxnCtxt : public qpid::broker::TransactionContext
 {
-protected:
-
+  protected:
     static qpid::sys::Mutex globalSerialiser;
 
     typedef std::set<qpid::broker::ExternalQueueStore*> ipqdef;
+    typedef ipqdef::iterator ipqItr;
     typedef std::auto_ptr<qpid::sys::Mutex::ScopedLock> AutoScopedLock;
 
     ipqdef impactedQueues; // list of Queues used in the txn
@@ -66,31 +66,34 @@
     std::string tid;
     DbTxn* txn;
 
-    void completeTXN(bool commit) {
+    virtual void completeTxn(bool commit) {
         sync();
-        for (TxnCtxt::ipqdef::iterator i = impactedQueues.begin(); i != impactedQueues.end(); i++) {
-            JournalImpl* jc = static_cast<JournalImpl*>(*i);
-            if (jc && loggedtx) { /* if using journal */
-                boost::intrusive_ptr<DataTokenImpl> dtokp(new DataTokenImpl);
-                dtokp->addRef();
-                dtokp->set_external_rid(true);
-                dtokp->set_rid(loggedtx->next());
-                try {
-                    if (commit) {
-                        jc->txn_commit(dtokp.get(), getXid());
-                        jc->flush(true);
-                    } else {
-                        jc->txn_abort(dtokp.get(), getXid());
-                    }
-                } catch (const journal::jexception& e) {
-                    THROW_STORE_EXCEPTION(std::string("Error commit") + e.what());
+        for (ipqItr i = impactedQueues.begin(); i != impactedQueues.end(); i++) {
+            commitTxn(static_cast<JournalImpl*>(*i), commit);
+        }
+        impactedQueues.clear();
+    }
+
+    void commitTxn(JournalImpl* jc, bool commit) {
+        if (jc && loggedtx) { /* if using journal */
+            boost::intrusive_ptr<DataTokenImpl> dtokp(new DataTokenImpl);
+            dtokp->addRef();
+            dtokp->set_external_rid(true);
+            dtokp->set_rid(loggedtx->next());
+            try {
+                if (commit) {
+                    jc->txn_commit(dtokp.get(), getXid());
+                    jc->flush(true);
+                } else {
+                    jc->txn_abort(dtokp.get(), getXid());
                 }
+            } catch (const journal::jexception& e) {
+                THROW_STORE_EXCEPTION(std::string("Error commit") + e.what());
             }
         }
-        deleteXidRecord();
     }
 
-public:
+  public:
 
     TxnCtxt(IdSequence* _loggedtx=NULL) : loggedtx(_loggedtx), dtokp(new DataTokenImpl), txn(0) {
         if (loggedtx) {
@@ -119,22 +122,25 @@
             if (sleep_cnt > MAX_SYNC_SLEEPS) THROW_STORE_EXCEPTION(std::string("Error: timeout waiting for TxnCtxt::sync()"));
             if (!firstloop) { ::usleep(SYNC_SLEEP_TIME); sleep_cnt++; } // move this into the get events call aiolib..
             allWritten = true;
-            for (TxnCtxt::ipqdef::iterator i = impactedQueues.begin(); i != impactedQueues.end(); i++) {
-                JournalImpl* jc = static_cast<JournalImpl*>(*i);
-                try {
-                    if (jc && !(jc->is_txn_synced(getXid()))) {
-                        if (firstloop) jc->flush();
-                        allWritten = false;
-                        jc->get_wr_events();
-                    }
-                } catch (const journal::jexception& e) {
-                    THROW_STORE_EXCEPTION(std::string("Error sync") + e.what());
-                }
+            for (ipqItr i = impactedQueues.begin(); i != impactedQueues.end(); i++) {
+                sync_jrnl(static_cast<JournalImpl*>(*i), firstloop, allWritten);
             }
             firstloop = false;
         }
     }
 
+    void sync_jrnl(JournalImpl* jc, bool firstloop, bool& allWritten) {
+        try {
+            if (jc && !(jc->is_txn_synced(getXid()))) {
+                if (firstloop) jc->flush();
+                allWritten = false;
+                jc->get_wr_events();
+            }
+        } catch (const journal::jexception& e) {
+            THROW_STORE_EXCEPTION(std::string("Error sync") + e.what());
+        }
+    }
+
     void begin(DbEnv& env, bool sync = false) {
         env.txn_begin(0, &txn, 0);
         if (sync) globalHolder = AutoScopedLock(new qpid::sys::Mutex::ScopedLock(globalSerialiser));
@@ -160,9 +166,8 @@
     virtual bool isTPC() { return false; }
     virtual const std::string& getXid() { return tid; }
     
-    void deleteXidRecord() { impactedQueues.clear(); }
     void addXidRecord(qpid::broker::ExternalQueueStore* queue) { impactedQueues.insert(queue); }
-    void complete(bool commit) { completeTXN(commit); }
+    void complete(bool commit) { completeTxn(commit); }
     DataTokenImpl* getDtok() { return dtokp.get(); }
     void incrDtokRef() { dtokp->addRef(); }
     void recoverDtok(const u_int64_t rid, const std::string xid) {
@@ -175,11 +180,29 @@
 
 class TPCTxnCtxt : public TxnCtxt, public qpid::broker::TPCTransactionContext
 {
+  protected:
     const std::string xid;
-public:
-    TPCTxnCtxt(const std::string& _xid, IdSequence* _loggedtx) : TxnCtxt(_loggedtx), xid(_xid) {}
-    virtual bool isTPC() { return true; }
-    virtual const std::string& getXid() { return xid; }
+    JournalImpl* preparedXidStorePtr;
+    virtual void completeTxn(bool commit) {
+        TxnCtxt::completeTxn(commit);
+        if (preparedXidStorePtr) commitTxn(preparedXidStorePtr, commit);
+    }
+
+  public:
+    TPCTxnCtxt(const std::string& _xid, IdSequence* _loggedtx) : TxnCtxt(_loggedtx), xid(_xid), preparedXidStorePtr(0) {}
+    void sync() {
+        TxnCtxt::sync();
+        bool allWritten = false;
+        if (preparedXidStorePtr) {
+            while (!allWritten) {
+                allWritten = true;
+                sync_jrnl(preparedXidStorePtr, true, allWritten);
+            }
+        }
+    }
+    inline void prepare(JournalImpl* _preparedXidStorePtr) { preparedXidStorePtr = _preparedXidStorePtr; }
+    inline virtual bool isTPC() { return true; }
+    inline virtual const std::string& getXid() { return xid; }
 };
 
 }}

Modified: store/branches/mrg-1.0/cpp/lib/jrnl/deq_hdr.hpp
===================================================================
--- store/branches/mrg-1.0/cpp/lib/jrnl/deq_hdr.hpp	2008-07-14 14:53:45 UTC (rev 2197)
+++ store/branches/mrg-1.0/cpp/lib/jrnl/deq_hdr.hpp	2008-07-14 17:05:31 UTC (rev 2198)
@@ -87,6 +87,7 @@
 #if defined(JRNL_LITTLE_ENDIAN) && defined(JRNL_32_BIT)
         u_int32_t _filler0;     ///< Little-endian filler for 32-bit size_t
 #endif
+        static const u_int16_t DEQ_HDR_TXNCMPLCOMMIT_MASK = 0x10;
 
         /**
         * \brief Default constructor, which sets all values to 0.
@@ -105,7 +106,8 @@
         * \brief Convenience constructor which initializes values during construction.
         */
         inline deq_hdr(const u_int32_t magic, const u_int8_t version, const u_int64_t rid,
-                const u_int64_t deq_rid, const std::size_t xidsize, const bool owi):
+                const u_int64_t deq_rid, const std::size_t xidsize, const bool owi,
+                const bool txn_coml_commit = false):
                 rec_hdr(magic, version, rid, owi), _deq_rid(deq_rid),
 #if defined(JRNL_BIG_ENDIAN) && defined(JRNL_32_BIT)
             _filler0(0),
@@ -114,8 +116,17 @@
 #if defined(JRNL_LITTLE_ENDIAN) && defined(JRNL_32_BIT)
             , _filler0(0)
 #endif
-        {}
+        { set_txn_coml_commit(txn_coml_commit); }
 
+
+        inline bool is_txn_coml_commit() const { return _uflag & DEQ_HDR_TXNCMPLCOMMIT_MASK; }
+
+        inline void set_txn_coml_commit(const bool commit)
+        {
+            _uflag = commit ? _uflag | DEQ_HDR_TXNCMPLCOMMIT_MASK :
+                    _uflag & (~DEQ_HDR_TXNCMPLCOMMIT_MASK);
+        }
+
         /**
         * \brief Returns the size of the header in bytes.
         */

Modified: store/branches/mrg-1.0/cpp/lib/jrnl/deq_rec.cpp
===================================================================
--- store/branches/mrg-1.0/cpp/lib/jrnl/deq_rec.cpp	2008-07-14 14:53:45 UTC (rev 2197)
+++ store/branches/mrg-1.0/cpp/lib/jrnl/deq_rec.cpp	2008-07-14 17:05:31 UTC (rev 2198)
@@ -52,8 +52,8 @@
 {}
 
 deq_rec::deq_rec(const u_int64_t rid, const u_int64_t drid, const void* const xidp,
-        const std::size_t xidlen, const bool owi):
-        _deq_hdr(RHM_JDAT_DEQ_MAGIC, RHM_JDAT_VERSION, rid, drid, xidlen, owi),
+        const std::size_t xidlen, const bool owi, const bool txn_coml_commit):
+        _deq_hdr(RHM_JDAT_DEQ_MAGIC, RHM_JDAT_VERSION, rid, drid, xidlen, owi, txn_coml_commit),
         _xidp(xidp),
         _buff(0),
         _deq_tail(_deq_hdr)
@@ -68,6 +68,8 @@
 deq_rec::reset()
 {
     _deq_hdr._rid = 0;
+    _deq_hdr.set_owi(false);
+    _deq_hdr.set_txn_coml_commit(false);
     _deq_hdr._deq_rid = 0;
     _deq_hdr._xidsize = 0;
     _deq_tail._rid = 0;
@@ -77,10 +79,11 @@
 
 void
 deq_rec::reset(const  u_int64_t rid, const  u_int64_t drid, const void* const xidp,
-        const std::size_t xidlen, const bool owi)
+        const std::size_t xidlen, const bool owi, const bool txn_coml_commit)
 {
     _deq_hdr._rid = rid;
     _deq_hdr.set_owi(owi);
+    _deq_hdr.set_txn_coml_commit(txn_coml_commit);
     _deq_hdr._deq_rid = drid;
     _deq_hdr._xidsize = xidlen;
     _deq_tail._rid = rid;

Modified: store/branches/mrg-1.0/cpp/lib/jrnl/deq_rec.hpp
===================================================================
--- store/branches/mrg-1.0/cpp/lib/jrnl/deq_rec.hpp	2008-07-14 14:53:45 UTC (rev 2197)
+++ store/branches/mrg-1.0/cpp/lib/jrnl/deq_rec.hpp	2008-07-14 17:05:31 UTC (rev 2198)
@@ -67,20 +67,21 @@
         deq_rec();
         // constructor used for write operations, where xid already exists
         deq_rec(const u_int64_t rid, const u_int64_t drid, const void* const xidp,
-                const std::size_t xidlen, const bool owi);
+                const std::size_t xidlen, const bool owi, const bool txn_coml_commit);
         virtual ~deq_rec();
 
         // Prepare instance for use in reading data from journal
         void reset();
         // Prepare instance for use in writing data to journal
         void reset(const  u_int64_t rid, const  u_int64_t drid, const void* const xidp,
-                const std::size_t xidlen, const bool owi);
+                const std::size_t xidlen, const bool owi, const bool txn_coml_commit);
         u_int32_t encode(void* wptr, u_int32_t rec_offs_dblks, u_int32_t max_size_dblks);
         u_int32_t decode(rec_hdr& h, void* rptr, u_int32_t rec_offs_dblks,
                 u_int32_t max_size_dblks);
         // Decode used for recover
         bool rcv_decode(rec_hdr h, std::ifstream* ifsp, std::size_t& rec_offs);
 
+        inline bool is_txn_coml_commit() const { return _deq_hdr.is_txn_coml_commit(); }
         inline u_int64_t rid() const { return _deq_hdr._rid; }
         inline u_int64_t deq_rid() const { return _deq_hdr._deq_rid; }
         std::size_t get_xid(void** const xidpp);

Modified: store/branches/mrg-1.0/cpp/lib/jrnl/jcntl.cpp
===================================================================
--- store/branches/mrg-1.0/cpp/lib/jrnl/jcntl.cpp	2008-07-14 14:53:45 UTC (rev 2197)
+++ store/branches/mrg-1.0/cpp/lib/jrnl/jcntl.cpp	2008-07-14 17:05:31 UTC (rev 2198)
@@ -338,25 +338,25 @@
 }
 
 iores
-jcntl::dequeue_data_record(data_tok* const dtokp)
+jcntl::dequeue_data_record(data_tok* const dtokp, const bool txn_coml_commit)
 {
     check_wstatus("dequeue_data");
     {
         slock s(&_wr_mutex);
         iores r;
-        while (handle_aio_wait(_wmgr.dequeue(dtokp, 0, 0), r, dtokp)) ;
+        while (handle_aio_wait(_wmgr.dequeue(dtokp, 0, 0, txn_coml_commit), r, dtokp)) ;
         return r;
     }
 }
 
 iores
-jcntl::dequeue_txn_data_record(data_tok* const dtokp, const std::string& xid)
+jcntl::dequeue_txn_data_record(data_tok* const dtokp, const std::string& xid, const bool txn_coml_commit)
 {
     check_wstatus("dequeue_data");
     {
         slock s(&_wr_mutex);
         iores r;
-        while (handle_aio_wait(_wmgr.dequeue(dtokp, xid.data(), xid.size()), r, dtokp)) ;
+        while (handle_aio_wait(_wmgr.dequeue(dtokp, xid.data(), xid.size(), txn_coml_commit), r, dtokp)) ;
         return r;
     }
 }
@@ -711,7 +711,8 @@
                     dr.get_xid(&xidp);
                     assert(xidp != 0);
                     std::string xid((char*)xidp, dr.xid_size());
-                    _tmap.insert_txn_data(xid, txn_data(dr.rid(), dr.deq_rid(), fid, false));
+                    _tmap.insert_txn_data(xid, txn_data(dr.rid(), dr.deq_rid(), fid, false,
+                            dr.is_txn_coml_commit()));
                     _tmap.set_aio_compl(xid, dr.rid());
                     std::free(xidp);
                 }

Modified: store/branches/mrg-1.0/cpp/lib/jrnl/jcntl.hpp
===================================================================
--- store/branches/mrg-1.0/cpp/lib/jrnl/jcntl.hpp	2008-07-14 14:53:45 UTC (rev 2197)
+++ store/branches/mrg-1.0/cpp/lib/jrnl/jcntl.hpp	2008-07-14 17:05:31 UTC (rev 2198)
@@ -428,10 +428,13 @@
         *
         * \param dtokp Pointer to data_tok instance for this data, used to track state of data
         *     through journal.
+        * \param txn_coml_commit Only used for preparedXID journal. When used for dequeueing
+        *     prepared XID list items, sets whether the complete() was called in commit or abort
+        *     mode.
         *
         * \exception TODO
         */
-        iores dequeue_data_record(data_tok* const dtokp);
+        iores dequeue_data_record(data_tok* const dtokp, const bool txn_coml_commit = false);
 
         /**
         * \brief Dequeues (marks as no longer needed) data record in journal.
@@ -446,10 +449,13 @@
         *     through journal.
         * \param xid String containing xid. An empty string (i.e. length=0) will be considered
         *     non-transactional.
+        * \param txn_coml_commit Only used for preparedXID journal. When used for dequeueing
+        *     prepared XID list items, sets whether the complete() was called in commit or abort
+        *     mode.
         *
         * \exception TODO
         */
-        iores dequeue_txn_data_record(data_tok* const dtokp, const std::string& xid);
+        iores dequeue_txn_data_record(data_tok* const dtokp, const std::string& xid, const bool txn_coml_commit = false);
 
         /**
         * \brief Abort the transaction for all records enqueued or dequeued with the matching xid.
@@ -574,8 +580,7 @@
         * \return <b><i>true</i></b> if the jouranl is ready to read and write data;
         *     <b><i>false</i></b> otherwise.
         */
-        inline bool is_ready() const { return _init_flag and not _stop_flag; }
-        inline bool is_init() const { return _init_flag; }
+        inline bool is_ready() const { return _init_flag && !_stop_flag; }
 
         inline bool is_read_only() const { return _readonly_flag; }
 
@@ -602,7 +607,8 @@
         inline u_int32_t jfsize_sblks() const { return _jfsize_sblks; }
         
         inline u_int32_t get_open_txn_cnt() const { return _tmap.size(); }
-        void get_open_txn_list(std::vector<std::string>& xv) { _tmap.xid_list(xv); }
+        // TODO Make this a const, but txn_map must support const first.
+        inline txn_map& get_txn_map() { return _tmap; }
 
         // Logging
         virtual void log(log_level level, const std::string& log_stmt) const;

Modified: store/branches/mrg-1.0/cpp/lib/jrnl/txn_map.cpp
===================================================================
--- store/branches/mrg-1.0/cpp/lib/jrnl/txn_map.cpp	2008-07-14 14:53:45 UTC (rev 2197)
+++ store/branches/mrg-1.0/cpp/lib/jrnl/txn_map.cpp	2008-07-14 17:05:31 UTC (rev 2198)
@@ -42,11 +42,12 @@
 {
 
 txn_data_struct::txn_data_struct(const u_int64_t rid, const u_int64_t drid, const u_int16_t fid,
-		const bool enq_flag):
+		const bool enq_flag, const bool commit_flag):
         _rid(rid),
         _drid(drid),
         _fid(fid),
         _enq_flag(enq_flag),
+        _commit_flag(commit_flag),
         _aio_compl(false)
 {}
 

Modified: store/branches/mrg-1.0/cpp/lib/jrnl/txn_map.hpp
===================================================================
--- store/branches/mrg-1.0/cpp/lib/jrnl/txn_map.hpp	2008-07-14 14:53:45 UTC (rev 2197)
+++ store/branches/mrg-1.0/cpp/lib/jrnl/txn_map.hpp	2008-07-14 17:05:31 UTC (rev 2198)
@@ -64,9 +64,10 @@
         u_int64_t _drid;    ///< Dequeue record id for this operation
         u_int16_t _fid;     ///< File id, to be used when transferring to emap on commit
         bool _enq_flag;     ///< If true, enq op, otherwise deq op
+        bool _commit_flag;  ///< (2PC transactions) Records 2PC complete c/a mode
         bool _aio_compl;    ///< Initially false, set to true when record AIO returns
         txn_data_struct(const u_int64_t rid, const u_int64_t drid, const u_int16_t fid,
-                const bool enq_flag);
+                const bool enq_flag, const bool commit_flag = false);
     };
     typedef txn_data_struct txn_data;
     typedef std::vector<txn_data> txn_data_list;

Modified: store/branches/mrg-1.0/cpp/lib/jrnl/wmgr.cpp
===================================================================
--- store/branches/mrg-1.0/cpp/lib/jrnl/wmgr.cpp	2008-07-14 14:53:45 UTC (rev 2197)
+++ store/branches/mrg-1.0/cpp/lib/jrnl/wmgr.cpp	2008-07-14 17:05:31 UTC (rev 2198)
@@ -256,7 +256,7 @@
 }
 
 iores
-wmgr::dequeue(data_tok* dtokp, const void* const xid_ptr, const std::size_t xid_len)
+wmgr::dequeue(data_tok* dtokp, const void* const xid_ptr, const std::size_t xid_len, const bool txn_coml_commit)
 {
     if (xid_len)
         assert(xid_ptr != 0);
@@ -284,7 +284,7 @@
     const bool ext_rid = dtokp->external_rid();
     u_int64_t rid = (ext_rid | cont) ? dtokp->rid() : _wrfc.get_incr_rid();
     u_int64_t dequeue_rid = (ext_rid | cont) ? dtokp->dequeue_rid() : dtokp->rid();
-    _deq_rec.reset(rid, dequeue_rid, xid_ptr, xid_len, _wrfc.owi());
+    _deq_rec.reset(rid, dequeue_rid, xid_ptr, xid_len, _wrfc.owi(), txn_coml_commit);
     if (!cont)
     {
 	    if (!ext_rid)

Modified: store/branches/mrg-1.0/cpp/lib/jrnl/wmgr.hpp
===================================================================
--- store/branches/mrg-1.0/cpp/lib/jrnl/wmgr.hpp	2008-07-14 14:53:45 UTC (rev 2197)
+++ store/branches/mrg-1.0/cpp/lib/jrnl/wmgr.hpp	2008-07-14 17:05:31 UTC (rev 2198)
@@ -110,7 +110,8 @@
         iores enqueue(const void* const data_buff, const std::size_t tot_data_len,
                 const std::size_t this_data_len, data_tok* dtokp, const void* const xid_ptr,
                 const std::size_t xid_len, const bool transient, const bool external);
-        iores dequeue(data_tok* dtokp, const void* const xid_ptr, const std::size_t xid_len);
+        iores dequeue(data_tok* dtokp, const void* const xid_ptr, const std::size_t xid_len,
+                const bool txn_coml_commit);
         iores abort(data_tok* dtokp, const void* const xid_ptr, const std::size_t xid_len);
         iores commit(data_tok* dtokp, const void* const xid_ptr, const std::size_t xid_len);
         iores flush();

Modified: store/branches/mrg-1.0/cpp/tests/TransactionalTest.cpp
===================================================================
--- store/branches/mrg-1.0/cpp/tests/TransactionalTest.cpp	2008-07-14 14:53:45 UTC (rev 2197)
+++ store/branches/mrg-1.0/cpp/tests/TransactionalTest.cpp	2008-07-14 17:05:31 UTC (rev 2198)
@@ -30,9 +30,9 @@
 #include "BdbMessageStore.h"
 #include <iostream>
 #include "MessageUtils.h"
-#include <qpid/framing/AMQHeaderBody.h>
-#include <qpid/broker/Queue.h>
-#include <qpid/broker/RecoveryManagerImpl.h>
+#include "qpid/broker/Queue.h"
+#include "qpid/broker/RecoveryManagerImpl.h"
+#include "qpid/framing/AMQHeaderBody.h"
 
 using namespace rhm::bdbstore;
 using namespace qpid::broker;
@@ -45,99 +45,171 @@
 const char* tdp = getenv("TMPDIR");
 const string test_dir(tdp && strlen(tdp) > 0 ? tdp : "/tmp/TransactionalTest");
 
+// Test txn context which has special setCompleteFailure() method which prevents entire "txn complete" process from hapenning
+class TestTxnCtxt : public TxnCtxt
+{
+  public:
+    TestTxnCtxt(IdSequence* _loggedtx) : TxnCtxt(_loggedtx) {}
+    void setCompleteFailure(const unsigned num_queues_rem) {
+        // Remove queue members from back of impactedQueues until queues_rem reamin.
+        // to end to simulate multi-queue txn complete failure.
+        while (impactedQueues.size() > num_queues_rem) impactedQueues.erase(impactedQueues.begin());
+    }
+};
+
+// Test store which has sepcial begin() which returns a TestTPCTxnCtxt, and a method to check for
+// reamining open transactions
+class TestMessageStore: public BdbMessageStore
+{
+  public:
+    TestMessageStore(const char* envpath = 0) : BdbMessageStore(envpath) {}
+    std::auto_ptr<qpid::broker::TransactionContext> begin() {
+        checkInit();
+        // pass sequence number for c/a
+        return auto_ptr<TransactionContext>(new TestTxnCtxt(&messageIdSequence));
+    }
+};
+
 // === Helper fns ===
 
 const string nameA("queueA");
 const string nameB("queueB");
-const Uuid messageId(true);
+//const Uuid messageId(true);
 std::auto_ptr<BdbMessageStore> store;
-QueueRegistry queues;
+std::auto_ptr<QueueRegistry> queues;
 Queue::shared_ptr queueA;
 Queue::shared_ptr queueB;
 
+template <class T>
 void setup()
 {
-    store = std::auto_ptr<BdbMessageStore>(new BdbMessageStore());
+    store = std::auto_ptr<T>(new T());
     store->init(test_dir, 4, 1, 8);
     store->truncate();
 
     //create two queues:
+    FieldTable settings;
     queueA = Queue::shared_ptr(new Queue(nameA, 0, store.get(), 0));
-    FieldTable settings;
     queueA->create(settings);
     queueB = Queue::shared_ptr(new Queue(nameB, 0, store.get(), 0));        
     queueB->create(settings);
-
-    //create message and enqueue it onto first queue:
-    boost::intrusive_ptr<Message> msg = MessageUtils::createMessage("exchange", "routing_key", messageId, 0);
-    msg->getProperties<DeliveryProperties>()->setDeliveryMode(PERSISTENT);
-    
-    queueA->deliver(msg);
 }
 
+template <class T>
 void restart()
 {
     queueA.reset();
     queueB.reset();
+    queues.reset();
     store.reset();
 
-    store = std::auto_ptr<BdbMessageStore>(new BdbMessageStore());
+    store = std::auto_ptr<T>(new T());
     store->init(test_dir, 4, 1, 8);
+    queues = std::auto_ptr<QueueRegistry>(new QueueRegistry);
     ExchangeRegistry exchanges;
     LinkRegistry links(0);
     DtxManager mgr;
     mgr.setStore (store.get());
-    RecoveryManagerImpl recovery(queues, exchanges, links, mgr, 0);
+    RecoveryManagerImpl recovery(*queues, exchanges, links, mgr, 0);
     store->recover(recovery);
 
-    queueA = queues.find(nameA);
-    queueB = queues.find(nameB);
+    queueA = queues->find(nameA);
+    queueB = queues->find(nameB);
 }
 
-void check(bool swapped)
+boost::intrusive_ptr<Message> createMessage(const string& id, const string& exchange="exchange", const string& key="routing_key")
+{
+    boost::intrusive_ptr<Message> msg = MessageUtils::createMessage(exchange, key);
+    msg->getProperties<MessageProperties>()->setCorrelationId(id);        
+    msg->getProperties<DeliveryProperties>()->setDeliveryMode(PERSISTENT);
+    return msg;
+}
+
+void checkMsg(Queue::shared_ptr& queue, u_int32_t size, const string& msgid = "<none>")
 {        
+    BOOST_REQUIRE(queue);
+    BOOST_CHECK_EQUAL(size, queue->getMessageCount());
+    if (size > 0) {
+        boost::intrusive_ptr<Message> msg = queue->dequeue().payload;
+        BOOST_REQUIRE(msg);
+        BOOST_CHECK_EQUAL(msgid, msg->getProperties<MessageProperties>()->getCorrelationId());
+    }
+}
+
+void swap(bool commit)
+{
+    setup<BdbMessageStore>();
+
+    //create message and enqueue it onto first queue:
+    boost::intrusive_ptr<Message> msgA = createMessage("Message", "exchange", "routing_key");
+    queueA->deliver(msgA);
+
+    boost::intrusive_ptr<Message> msgB = queueA->dequeue().payload;
+    BOOST_REQUIRE(msgB);
+    //move the message from one queue to the other as a transaction
+    std::auto_ptr<TransactionContext> txn = store->begin();
+    queueB->enqueue(txn.get(), msgB);//note: need to enqueue it first to avoid message being deleted
+    queueA->dequeue(txn.get(), msgB);
+    if (commit) {
+        store->commit(*txn);
+    } else {
+        store->abort(*txn);
+    }
+
+    restart<BdbMessageStore>();
+
+    // Check outcome
     BOOST_REQUIRE(queueA);
     BOOST_REQUIRE(queueB);
     
-    Queue::shared_ptr x;//the other queue
+    Queue::shared_ptr x;//the queue from which the message was swapped
     Queue::shared_ptr y;//the queue on which the message is expected to be
     
-    if (swapped) {
+    if (commit) {
         x = queueA;
         y = queueB;
     } else {
         x = queueB;
         y = queueA;
     }
-
-    BOOST_CHECK_EQUAL((u_int32_t) 0, x->getMessageCount());
-    BOOST_CHECK_EQUAL((u_int32_t) 1, y->getMessageCount());
-    boost::intrusive_ptr<Message> msg = y->dequeue().payload;
-    BOOST_REQUIRE(msg);
-    BOOST_CHECK_EQUAL(messageId, msg->getProperties<MessageProperties>()->getMessageId());
+    
+    checkMsg(x, 0);
+    checkMsg(y, 1, "Message");
+    checkMsg(y, 0);
 }
 
-void swap(bool commit)
+void testMultiQueueTxn(const unsigned num_queues_rem, const bool commit)
 {
-    setup();
+    setup<TestMessageStore>();
+    std::auto_ptr<TransactionContext> txn(static_cast<TestMessageStore*>(store.get())->begin());
 
-    boost::intrusive_ptr<Message> msg = queueA->dequeue().payload;
-    BOOST_REQUIRE(msg);
-    //move the message from one queue to the other as a transaction
-    std::auto_ptr<TransactionContext> txn = store->begin();
-    queueB->enqueue(txn.get(), msg);//note: need to enqueue it first to avoid message being deleted
-    queueA->dequeue(txn.get(), msg);
-    if (commit) {
-       store->commit(*txn);
-    } else {
+    //create two messages and enqueue them onto both queues:
+    boost::intrusive_ptr<Message> msgA = createMessage("MessageA", "exchange", "routing_key");
+    queueA->enqueue(txn.get(), msgA);
+    queueB->enqueue(txn.get(), msgA);
+    boost::intrusive_ptr<Message> msgB = createMessage("MessageB", "exchange", "routing_key");
+    queueA->enqueue(txn.get(), msgB);
+    queueB->enqueue(txn.get(), msgB);
+
+    static_cast<TestTxnCtxt*>(txn.get())->setCompleteFailure(num_queues_rem);
+    if (commit)
+        store->commit(*txn);
+    else
         store->abort(*txn);
+    restart<TestMessageStore>();
+
+    // Check outcome
+    if (commit)
+    {
+        checkMsg(queueA, 2, "MessageA");
+        checkMsg(queueB, 2, "MessageA");
+        checkMsg(queueA, 1, "MessageB");
+        checkMsg(queueB, 1, "MessageB");
     }
-
-    restart();
-    check(commit);
+    checkMsg(queueA, 0);
+    checkMsg(queueB, 0);
 }
 
-
 // === Test suite ===
 
 QPID_AUTO_TEST_CASE(Commit)
@@ -153,5 +225,47 @@
     swap(false);
     cout << "ok" << endl;
 }
+/*
+QPID_AUTO_TEST_CASE(MultiQueueNoQueueCommitRecover)
+{
+    cout << test_filename << ".MultiQueueNoQueueCommitRecover: " << flush;
+    testMultiQueueTxn(0, true);
+    cout << "ok" << endl;
+}
 
+QPID_AUTO_TEST_CASE(MultiQueueNoQueueAbortRecover)
+{
+    cout << test_filename << ".MultiQueueNoQueueAbortRecover: " << flush;
+    testMultiQueueTxn(0, false);
+    cout << "ok" << endl;
+}
+
+QPID_AUTO_TEST_CASE(MultiQueueSomeQueueCommitRecover)
+{
+    cout << test_filename << ".MultiQueueSomeQueueCommitRecover: " << flush;
+    testMultiQueueTxn(1, true);
+    cout << "ok" << endl;
+}
+
+QPID_AUTO_TEST_CASE(MultiQueueSomeQueueAbortRecover)
+{
+    cout << test_filename << ".MultiQueueSomeQueueAbortRecover: " << flush;
+    testMultiQueueTxn(1, false);
+    cout << "ok" << endl;
+}
+
+QPID_AUTO_TEST_CASE(MultiQueueAllQueueCommitRecover)
+{
+    cout << test_filename << ".MultiQueueAllQueueCommitRecover: " << flush;
+    testMultiQueueTxn(2, true);
+    cout << "ok" << endl;
+}
+
+QPID_AUTO_TEST_CASE(MultiQueueAllQueueAbortRecover)
+{
+    cout << test_filename << ".MultiQueueAllQueueAbortRecover: " << flush;
+    testMultiQueueTxn(2, false);
+    cout << "ok" << endl;
+}
+*/
 QPID_AUTO_TEST_SUITE_END()

Modified: store/branches/mrg-1.0/cpp/tests/TwoPhaseCommitTest.cpp
===================================================================
--- store/branches/mrg-1.0/cpp/tests/TwoPhaseCommitTest.cpp	2008-07-14 14:53:45 UTC (rev 2197)
+++ store/branches/mrg-1.0/cpp/tests/TwoPhaseCommitTest.cpp	2008-07-14 17:05:31 UTC (rev 2198)
@@ -30,9 +30,11 @@
 #include "BdbMessageStore.h"
 #include <iostream>
 #include "MessageUtils.h"
-#include <qpid/framing/AMQHeaderBody.h>
-#include <qpid/broker/Queue.h>
-#include <qpid/broker/RecoveryManagerImpl.h>
+#include "qpid/broker/Queue.h"
+#include "qpid/broker/RecoveryManagerImpl.h"
+#include "qpid/framing/AMQHeaderBody.h"
+#include "qpid/log/Statement.h"
+#include "TxnCtxt.h"
 
 using namespace rhm::bdbstore;
 using namespace qpid::broker;
@@ -67,8 +69,8 @@
     public:
         Swap(TwoPhaseCommitTest* const test_, const string& messageId_): test(test_), messageId(messageId_) {}
         void init(){ msg = test->deliver(messageId, test->queueA); }
-        void run(TPCTransactionContext* txn) { test->swap(txn); }
-        void check(bool committed) { test->swapCheck(committed, messageId); }
+        void run(TPCTransactionContext* txn) { test->swap(txn, test->queueA, test->queueB); }
+        void check(bool committed) { test->swapCheck(committed, messageId,  test->queueA, test->queueB); }
     };
 
     class Enqueue : public Strategy
@@ -81,17 +83,17 @@
         Enqueue(TwoPhaseCommitTest* const test_): test(test_) {}
         void init() {}
         void run(TPCTransactionContext* txn) { 
-            msg1 = test->enqueue(txn, "Enqueue1"); 
-            msg2 = test->enqueue(txn, "Enqueue2"); 
-            msg3 = test->enqueue(txn, "Enqueue3"); 
+            msg1 = test->enqueue(txn, "Enqueue1", test->queueA); 
+            msg2 = test->enqueue(txn, "Enqueue2", test->queueA); 
+            msg3 = test->enqueue(txn, "Enqueue3", test->queueA); 
         }
         void check(bool committed) { 
             if (committed) {
-                test->checkA(3, "Enqueue1");
-                test->checkA(2, "Enqueue2");
-                test->checkA(1, "Enqueue3");
+                test->checkMsg(test->queueA, 3, "Enqueue1");
+                test->checkMsg(test->queueA, 2, "Enqueue2");
+                test->checkMsg(test->queueA, 1, "Enqueue3");
             } 
-            test->checkA(0);
+            test->checkMsg(test->queueA, 0);
         }
     };
 
@@ -109,26 +111,94 @@
             msg3 = test->deliver("Dequeue3", test->queueA); 
         }
         void run(TPCTransactionContext* txn) { 
-            test->dequeue(txn); 
-            test->dequeue(txn); 
-            test->dequeue(txn); 
+            test->dequeue(txn, test->queueA); 
+            test->dequeue(txn, test->queueA); 
+            test->dequeue(txn, test->queueA); 
         }
         void check(bool committed) { 
             if (!committed) {
-                test->checkA(3, "Dequeue1");
-                test->checkA(2, "Dequeue2");
-                test->checkA(1, "Dequeue3");
+                test->checkMsg(test->queueA, 3, "Dequeue1");
+                test->checkMsg(test->queueA, 2, "Dequeue2");
+                test->checkMsg(test->queueA, 1, "Dequeue3");
             } 
-            test->checkA(0);
+            test->checkMsg(test->queueA, 0);
         }
     };
 
+    class MultiQueueTxn : public Strategy
+    {
+        TwoPhaseCommitTest* const test;
+	    boost::intrusive_ptr<Message> msg1;
+	    boost::intrusive_ptr<Message> msg2;
+        std::set<Queue::shared_ptr> queueset;
+    public:
+        MultiQueueTxn(TwoPhaseCommitTest* const test_): test(test_) {}
+        virtual void init() {}
+        virtual void run(TPCTransactionContext* txn) {
+            queueset.insert(test->queueA);
+            queueset.insert(test->queueB);
+            msg1 = test->enqueue(txn, "Message1", queueset);
+            msg2 = test->enqueue(txn, "Message2", queueset);
+            queueset.clear();
+        }
+        virtual void check(bool committed) {
+            TestMessageStore* sptr = static_cast<TestMessageStore*>(test->store.get());
+            if (committed)
+            {
+                test->checkMsg(test->queueA, 2, "Message1");
+                test->checkMsg(test->queueB, 2, "Message1");
+                test->checkMsg(test->queueA, 1, "Message2");
+                test->checkMsg(test->queueB, 1, "Message2");
+            }
+            test->checkMsg(test->queueA, 0);
+            test->checkMsg(test->queueB, 0);
+            // Check there are no remaining open txns in store
+            BOOST_CHECK_EQUAL(u_int32_t(0), sptr->getRemainingTxns(*(test->queueA)));
+            BOOST_CHECK_EQUAL(u_int32_t(0), sptr->getRemainingTxns(*(test->queueB)));
+            BOOST_CHECK_EQUAL(u_int32_t(0), sptr->getRemainingPreparedListTxns());
+        }
+    };
+
+    // Test txn context which has special setCompleteFailure() method which prevents entire "txn complete" process from hapenning
+    class TestTPCTxnCtxt : public TPCTxnCtxt
+    {
+      public:
+        TestTPCTxnCtxt(const std::string& _xid, IdSequence* _loggedtx) : TPCTxnCtxt(_xid, _loggedtx) {}
+        void setCompleteFailure(const unsigned num_queues_rem, const bool complete_prepared_list) {
+            // Remove queue members from back of impactedQueues until queues_rem reamin.
+            // to end to simulate multi-queue txn complete failure.
+            while (impactedQueues.size() > num_queues_rem) impactedQueues.erase(impactedQueues.begin());
+            // If prepared list is not to be committed, set pointer to 0
+            if (!complete_prepared_list) preparedXidStorePtr = 0;
+        }
+    };
+
+    // Test store which has sepcial begin() which returns a TestTPCTxnCtxt, and a method to check for
+    // reamining open transactions
+    class TestMessageStore: public BdbMessageStore
+    {
+      public:
+        TestMessageStore(const char* envpath = 0) : BdbMessageStore(envpath) {}
+        std::auto_ptr<qpid::broker::TPCTransactionContext> begin(const std::string& xid) {
+            checkInit();
+            IdSequence* jtx = &messageIdSequence;
+            // pass sequence number for c/a
+            return auto_ptr<TPCTransactionContext>(new TestTPCTxnCtxt(xid, jtx));
+        }
+        u_int32_t getRemainingTxns(const PersistableQueue& queue) {
+            return static_cast<JournalImpl*>(queue.getExternalQueueStore())->get_open_txn_cnt();
+        }
+        u_int32_t getRemainingPreparedListTxns() {
+            return preparedXidStorePtr->get_open_txn_cnt();
+        }
+    };
+
     const string nameA;
     const string nameB;
     std::auto_ptr<BdbMessageStore> store;
     std::auto_ptr<DtxManager> dtxmgr;
-    QueueRegistry queues;
-    LinkRegistry links;
+    std::auto_ptr<QueueRegistry> queues;
+    std::auto_ptr<LinkRegistry> links;
     Queue::shared_ptr queueA;
     Queue::shared_ptr queueB;
     boost::intrusive_ptr<Message> msg1;
@@ -137,14 +207,14 @@
 
     void recoverPrepared(bool commit)
     {
-        setup();
+        setup<BdbMessageStore>();
 
         Swap swap(this, "RecoverPrepared");
         swap.init();
         std::auto_ptr<TPCTransactionContext> txn(store->begin("my-xid"));
         swap.run(txn.get());
         store->prepare(*txn);
-        restart();
+        restart<BdbMessageStore>();
 
         //check that the message is not available from either queue
         BOOST_CHECK_EQUAL((u_int32_t) 0, queueA->getMessageCount());
@@ -158,58 +228,89 @@
         }
 
         swap.check(commit);        
-        restart();
+        restart<BdbMessageStore>();
         swap.check(commit);
     }
+    
+    void testMultiQueueTxn(const unsigned num_queues_rem, const bool complete_prepared_list, const bool commit)
+    {
+        setup<TestMessageStore>();
+        MultiQueueTxn mqtTest(this);
+        mqtTest.init();
+        std::auto_ptr<TPCTransactionContext> txn(static_cast<TestMessageStore*>(store.get())->begin("my-xid"));
+        mqtTest.run(txn.get());
+        store->prepare(*txn);
 
+        // As the commits and aborts should happen through DtxManager, and it is too complex to
+        // pass all these test params through, we bypass DtxManager and use the store directly.
+        // This will prevent the queues from seeing committed txns, however. To test the success
+        // or failure of 
+        static_cast<TestTPCTxnCtxt*>(txn.get())->setCompleteFailure(num_queues_rem, complete_prepared_list);
+        if (commit)
+            store->commit(*txn);
+        else
+            store->abort(*txn);
+        restart<TestMessageStore>();
+        mqtTest.check(commit);
+    }
+
     void commit(Strategy& strategy)
     {
-        setup();
+        setup<BdbMessageStore>();
         strategy.init();
 
         std::auto_ptr<TPCTransactionContext> txn(store->begin("my-xid"));
         strategy.run(txn.get());
         store->prepare(*txn);
-        store->commit(*txn);        
-        restart();
+        store->commit(*txn);
+        restart<BdbMessageStore>();
         strategy.check(true);
     }
 
     void abort(Strategy& strategy, bool prepare)
     {
-        setup();
+        setup<BdbMessageStore>();
         strategy.init();
 
         std::auto_ptr<TPCTransactionContext> txn(store->begin("my-xid"));
         strategy.run(txn.get());
         if (prepare) store->prepare(*txn);
         store->abort(*txn);
-        restart();
+        restart<BdbMessageStore>();
         strategy.check(false);
     }
 
-    void swap(TPCTransactionContext* txn)
+    void swap(TPCTransactionContext* txn, Queue::shared_ptr& from, Queue::shared_ptr& to)
     {
-        msg1 = queueA->dequeue().payload;//just dequeues in memory
+        msg1 = from->dequeue().payload;//just dequeues in memory
         //move the message from one queue to the other as part of a
         //distributed transaction
-        queueB->enqueue(txn, msg1);//note: need to enqueue it first to avoid message being deleted
-        queueA->dequeue(txn, msg1);
+        to->enqueue(txn, msg1);//note: need to enqueue it first to avoid message being deleted
+        from->dequeue(txn, msg1);
     }
 
-    void dequeue(TPCTransactionContext* txn)
+    void dequeue(TPCTransactionContext* txn, Queue::shared_ptr& queue)
     {
-        msg2 = queueA->dequeue().payload;//just dequeues in memory
-        queueA->dequeue(txn, msg2);
+        msg2 = queue->dequeue().payload;//just dequeues in memory
+        queue->dequeue(txn, msg2);
     }
 
-    boost::intrusive_ptr<Message> enqueue(TPCTransactionContext* txn, const string& msgid)
+    boost::intrusive_ptr<Message> enqueue(TPCTransactionContext* txn, const string& msgid, Queue::shared_ptr& queue)
     {
-        boost::intrusive_ptr<Message> msg = createMessage(msgid);        
-        queueA->enqueue(txn, msg);
+        boost::intrusive_ptr<Message> msg = createMessage(msgid);
+        queue->enqueue(txn, msg);
 	    return msg;
     }
 
+    boost::intrusive_ptr<Message> enqueue(TPCTransactionContext* txn, const string& msgid, std::set<Queue::shared_ptr>& queueset)
+    {
+        boost::intrusive_ptr<Message> msg = createMessage(msgid);
+        for (std::set<Queue::shared_ptr>::iterator i = queueset.begin(); i != queueset.end(); i++) {
+            (*i)->enqueue(txn, msg);
+        }
+	    return msg;
+    }
+
     boost::intrusive_ptr<Message> deliver(const string& msgid, Queue::shared_ptr& queue)
     {
         msg4 = createMessage(msgid);        
@@ -217,9 +318,10 @@
 	    return msg4;
     }
 
+    template <class T>
     void setup()
     {
-        store = std::auto_ptr<BdbMessageStore>(new BdbMessageStore());
+        store = std::auto_ptr<T>(new T());
         store->init(test_dir, 4, 1, 8);
         store->truncate();
 
@@ -239,60 +341,63 @@
         return msg;
     }
 
+    template <class T>
     void restart()
     {
         queueA.reset();
         queueB.reset();
         store.reset();
+        queues.reset();
+        links.reset();
 
-        store = std::auto_ptr<BdbMessageStore>(new BdbMessageStore());
+        store = std::auto_ptr<T>(new T());
         store->init(test_dir, 4, 1, 8);
         ExchangeRegistry exchanges;
+        queues = std::auto_ptr<QueueRegistry>(new QueueRegistry);
+        links = std::auto_ptr<LinkRegistry>(new LinkRegistry(0));
         dtxmgr = std::auto_ptr<DtxManager>(new DtxManager);
         dtxmgr->setStore (store.get());
-        RecoveryManagerImpl recovery(queues, exchanges, links, *dtxmgr, 0);
+        RecoveryManagerImpl recovery(*queues, exchanges, *links, *dtxmgr, 0);
         store->recover(recovery);
 
-        queueA = queues.find(nameA);
-        queueB = queues.find(nameB);
+        queueA = queues->find(nameA);
+        queueB = queues->find(nameB);
     }
 
-    void swapCheck(bool swapped, const string& msgid)
+    void checkMsg(Queue::shared_ptr& queue, u_int32_t size, const string& msgid = "<none>")
     {        
-        BOOST_REQUIRE(queueA);
-        BOOST_REQUIRE(queueB);
+        BOOST_REQUIRE(queue);
+        BOOST_CHECK_EQUAL(size, queue->getMessageCount());
+        if (size > 0) {
+            boost::intrusive_ptr<Message> msg = queue->dequeue().payload;
+            BOOST_REQUIRE(msg);
+            BOOST_CHECK_EQUAL(msgid, msg->getProperties<MessageProperties>()->getCorrelationId());
+        }
+    }
+
+    void swapCheck(bool swapped, const string& msgid, Queue::shared_ptr& from, Queue::shared_ptr& to)
+    {        
+        BOOST_REQUIRE(from);
+        BOOST_REQUIRE(to);
     
-        Queue::shared_ptr x;//the other queue
-        Queue::shared_ptr y;//the queue on which the message is expected to be
+        Queue::shared_ptr x; //the queue from which the message was swapped
+        Queue::shared_ptr y; //the queue on which the message is expected to be
     
         if (swapped) {
-            x = queueA;
-            y = queueB;
+            x = from;
+            y = to;
         } else {
-            x = queueB;
-            y = queueA;
+            x = to;
+            y = from;
         }
 
-        BOOST_CHECK_EQUAL((u_int32_t) 0, x->getMessageCount());
-        BOOST_CHECK_EQUAL((u_int32_t) 1, y->getMessageCount());
-        boost::intrusive_ptr<Message> msg = y->dequeue().payload;
-        BOOST_REQUIRE(msg);
-        BOOST_CHECK_EQUAL(msgid, msg->getProperties<MessageProperties>()->getCorrelationId());        
+        checkMsg(x, 0);
+        checkMsg(y, 1, msgid);
+        checkMsg(y, 0);
     }
 
-    void checkA(u_int32_t size, const string& msgid = "<none>")
-    {        
-        BOOST_REQUIRE(queueA);
-        BOOST_CHECK_EQUAL(size, queueA->getMessageCount());
-        if (size > 0) {
-            boost::intrusive_ptr<Message> msg = queueA->dequeue().payload;
-            BOOST_REQUIRE(msg);
-            BOOST_CHECK_EQUAL(msgid, msg->getProperties<MessageProperties>()->getCorrelationId());
-        }
-    }
-
 public:
-    TwoPhaseCommitTest() : nameA("queueA"), nameB("queueB"), links(0) {}
+    TwoPhaseCommitTest() : nameA("queueA"), nameB("queueB") {}
 
     void testCommitEnqueue()
     {
@@ -357,6 +462,46 @@
     {
         recoverPrepared(false);
     }
+    
+    void testMultiQueueCommit()
+    {
+        testMultiQueueTxn(2, true, true);
+    }
+    
+    void testMultiQueueAbort()
+    {
+        testMultiQueueTxn(2, true, false);
+    }
+    
+    void testMultiQueueNoQueueCommitRecover()
+    {
+        testMultiQueueTxn(0, false, true);
+    }
+    
+    void testMultiQueueNoQueueAbortRecover()
+    {
+        testMultiQueueTxn(0, false, false);
+    }
+    
+    void testMultiQueueSomeQueueCommitRecover()
+    {
+        testMultiQueueTxn(1, false, true);
+    }
+    
+    void testMultiQueueSomeQueueAbortRecover()
+    {
+        testMultiQueueTxn(1, false, false);
+    }
+    
+    void testMultiQueueAllQueueCommitRecover()
+    {
+        testMultiQueueTxn(2, false, true);
+    }
+    
+    void testMultiQueueAllQueueAbortRecover()
+    {
+        testMultiQueueTxn(2, false, false);
+    }
 };
 
 TwoPhaseCommitTest tpct;
@@ -440,4 +585,60 @@
     cout << "ok" << endl;
 }
 
+QPID_AUTO_TEST_CASE(MultiQueueCommit)
+{
+    cout << test_filename << ".MultiQueueCommit: " << flush;
+    tpct.testMultiQueueCommit();
+    cout << "ok" << endl;
+}
+
+QPID_AUTO_TEST_CASE(MultiQueueAbort)
+{
+    cout << test_filename << ".MultiQueueAbort: " << flush;
+    tpct.testMultiQueueAbort();
+    cout << "ok" << endl;
+}
+
+QPID_AUTO_TEST_CASE(MultiQueueNoQueueCommitRecover)
+{
+    cout << test_filename << ".MultiQueueNoQueueCommitRecover: " << flush;
+    tpct.testMultiQueueNoQueueCommitRecover();
+    cout << "ok" << endl;
+}
+
+QPID_AUTO_TEST_CASE(MultiQueueNoQueueAbortRecover)
+{
+    cout << test_filename << ".MultiQueueNoQueueAbortRecover: " << flush;
+    tpct.testMultiQueueNoQueueAbortRecover();
+    cout << "ok" << endl;
+}
+
+QPID_AUTO_TEST_CASE(MultiQueueSomeQueueCommitRecover)
+{
+    cout << test_filename << ".MultiQueueSomeQueueCommitRecover: " << flush;
+    tpct.testMultiQueueSomeQueueCommitRecover();
+    cout << "ok" << endl;
+}
+
+QPID_AUTO_TEST_CASE(MultiQueueSomeQueueAbortRecover)
+{
+    cout << test_filename << ".MultiQueueSomeQueueAbortRecover: " << flush;
+    tpct.testMultiQueueSomeQueueAbortRecover();
+    cout << "ok" << endl;
+}
+
+QPID_AUTO_TEST_CASE(MultiQueueAllQueueCommitRecover)
+{
+    cout << test_filename << ".MultiQueueAllQueueCommitRecover: " << flush;
+    tpct.testMultiQueueAllQueueCommitRecover();
+    cout << "ok" << endl;
+}
+
+QPID_AUTO_TEST_CASE(MultiQueueAllQueueAbortRecover)
+{
+    cout << test_filename << ".MultiQueueAllQueueAbortRecover: " << flush;
+    tpct.testMultiQueueAllQueueAbortRecover();
+    cout << "ok" << endl;
+}
+
 QPID_AUTO_TEST_SUITE_END()




More information about the rhmessaging-commits mailing list