[rhmessaging-commits] rhmessaging commits: r2195 - in store/branches/mrg-1.0/cpp: lib/jrnl and 1 other directories.

rhmessaging-commits at lists.jboss.org rhmessaging-commits at lists.jboss.org
Mon Jul 14 10:23:12 EDT 2008


Author: kpvdr
Date: 2008-07-14 10:23:12 -0400 (Mon, 14 Jul 2008)
New Revision: 2195

Modified:
   store/branches/mrg-1.0/cpp/lib/BdbMessageStore.cpp
   store/branches/mrg-1.0/cpp/lib/BdbMessageStore.h
   store/branches/mrg-1.0/cpp/lib/TxnCtxt.h
   store/branches/mrg-1.0/cpp/lib/jrnl/data_tok.cpp
   store/branches/mrg-1.0/cpp/lib/jrnl/data_tok.hpp
   store/branches/mrg-1.0/cpp/lib/jrnl/rrfc.cpp
   store/branches/mrg-1.0/cpp/lib/jrnl/rrfc.hpp
   store/branches/mrg-1.0/cpp/lib/jrnl/txn_map.cpp
   store/branches/mrg-1.0/cpp/lib/jrnl/wrfc.cpp
   store/branches/mrg-1.0/cpp/lib/jrnl/wrfc.hpp
   store/branches/mrg-1.0/cpp/tests/TwoPhaseCommitTest.cpp
   store/branches/mrg-1.0/cpp/tests/persistence.py
Log:
Backport of trunk r.2177: Fixed problems with transaction recover: DataToken in TxnCtxt was not being restored; also highest rid found during restore was not taking account of the new preparedXid instance which shares the messageIdSequence. Other minor bugfixes and tidy-ups.

Modified: store/branches/mrg-1.0/cpp/lib/BdbMessageStore.cpp
===================================================================
--- store/branches/mrg-1.0/cpp/lib/BdbMessageStore.cpp	2008-07-14 13:59:22 UTC (rev 2194)
+++ store/branches/mrg-1.0/cpp/lib/BdbMessageStore.cpp	2008-07-14 14:23:12 UTC (rev 2195)
@@ -29,6 +29,9 @@
 #include "qpid/log/Statement.h"
 #include "qpid/management/PackageMrgstore.h"
 
+#define MAX_AIO_SLEEPS 1000 // ~1 second
+#define AIO_SLEEP_TIME 1000 // 1 milisecond
+
 using namespace rhm::bdbstore;
 using namespace qpid::broker;
 using boost::static_pointer_cast;
@@ -58,6 +61,7 @@
                                                         jrnlFsizePgs(defJrnlFileSizePgs),
                                                         wcache_pgsize_sblks(JRNL_WMGR_DEF_PAGE_SIZE),
                                                         wcache_num_pages(JRNL_WMGR_DEF_PAGES),
+                                                        highestRid(0),
                                                         isInit(false),
                                                         envPath(envpath)
 
@@ -460,10 +464,18 @@
         txn.abort();
         THROW_STORE_EXCEPTION_2("Error on recovery", e);
     }
+
     //recover transactions:
     for (txn_list::iterator i = prepared.begin(); i != prepared.end(); i++) {        
         
         TPCTxnCtxt* tpcc = new TPCTxnCtxt(i->xid, &messageIdSequence);
+
+        // Restore data token state in TxnCtxt
+        xid_rid_map_citr citr = preparedMap.find(i->xid);
+        if (citr == preparedMap.end()) THROW_STORE_EXCEPTION("XID not found in preparedMap");
+        tpcc->recoverDtok(citr->second, i->xid);
+        tpcc->addXidRecord(preparedXidStorePtr.get());
+
         RecoverableTransaction::shared_ptr dtx = registry.recoverTransaction(i->xid, std::auto_ptr<TPCTransactionContext>(tpcc));
         if (i->enqueues.get()) {
             for (LockedMappings::iterator j = i->enqueues->begin(); j != i->enqueues->end(); j++) {
@@ -492,7 +504,6 @@
     IdDbt key;
     Dbt value;
     //read all queues
-    u_int64_t highestRid = 0;
     while (queues.next(key, value)) {
         Buffer buffer(reinterpret_cast<char*>(value.get_data()), value.get_size());
         //create a Queue instance
@@ -524,7 +535,11 @@
         queue_index[key.id] = queue;
         maxQueueId = max(key.id, maxQueueId);
     }
+
+    // NOTE: highestRid is set by both recoverQueues() and collectPreparedXids() as
+    // the messageIdSequence is used for both queue journals and the preparedXid journal.
     messageIdSequence.reset(highestRid + 1);
+
     queueIdSequence.reset(maxQueueId + 1);
 }
 
@@ -605,9 +620,6 @@
 }
 
 
-#define MAX_AIO_SLEEPS 1000 // ~1 second
-#define AIO_SLEEP_TIME 1000 // 1 milisecond
-
 void BdbMessageStore::recoverMessages(TxnCtxt& /*txn*/, qpid::broker::RecoveryManager& recovery, 
                                       qpid::broker::RecoverableQueue::shared_ptr& queue, txn_list& locked, message_index& prepared)
 {
@@ -768,19 +780,6 @@
     }
 }
 
-void BdbMessageStore::readXids(Db& db, std::set<string>& xids)
-{
-    Cursor c;
-    c.open(db, 0);
-
-    Dbt key;
-    Dbt ignore;
-    while (c.next(key, ignore)) {
-        std::string xid(reinterpret_cast<char*>(key.get_data()), key.get_size());
-        xids.insert(xid);
-    }
-}
-
 void BdbMessageStore::readLockedMappings(Db& db, txn_lock_map& mappings)
 {
     Cursor c;
@@ -798,15 +797,50 @@
 {
     if (journal::jdir::exists(storeDir + "/rhm/pxid/prepared_xid.jinf"))
     {
-        u_int64_t highest_rid;
+        u_int64_t thisHighestRid;
         preparedXidStorePtr->recover(defXidStoreNumJrnlFiles, defXidStoreJrnlFileSizePgs * JRNL_RMGR_PAGE_SIZE,
                 JRNL_WMGR_DEF_PAGE_SIZE * JRNL_WMGR_DEF_PAGES / wcache_pgsize_sblks, defXidStoreWCachePageSize,
-                0, highest_rid, 0);
-    
-        std::vector<std::string> xv;
-        preparedXidStorePtr->get_open_txn_list(xv);
-        for (std::vector<std::string>::const_iterator itr = xv.begin(); itr < xv.end(); itr++)
-            xids.insert(*itr);
+                0, thisHighestRid, 0);
+        if (thisHighestRid > highestRid)
+            highestRid = thisHighestRid;
+        try {
+            void* dbuff = NULL; size_t dbuffSize = 0;
+            void* xidbuff = NULL; size_t xidbuffSize = 0;
+            bool transientFlag = false;
+            bool externalFlag = false;
+            DataTokenImpl dtokp;
+            bool done = false;
+            long aio_sleep_cnt = 0;
+            while (!done) {
+                dtokp.reset();
+                dtokp.set_wstate(DataTokenImpl::ENQ);
+                rhm::journal::iores res = preparedXidStorePtr->read_data_record(&dbuff, dbuffSize, &xidbuff, xidbuffSize, transientFlag, externalFlag, &dtokp);
+                switch (res) {
+                    case rhm::journal::RHM_IORES_SUCCESS:
+                        if (xidbuffSize > 0) {
+                            xids.insert(std::string((const char*)xidbuff, xidbuffSize));
+                            preparedMap[std::string((const char*)xidbuff, xidbuffSize)] = dtokp.rid();
+                            ::free(xidbuff);
+                        } else {
+                            THROW_STORE_EXCEPTION("No XID found in BdbMessageStore::collectPreparedXids()");
+                        }
+                        aio_sleep_cnt = 0;
+                        break;
+                    case rhm::journal::RHM_IORES_PAGE_AIOWAIT:
+                        if (++aio_sleep_cnt > MAX_AIO_SLEEPS)
+                            THROW_STORE_EXCEPTION("Timeout waiting for AIO in BdbMessageStore::collectPreparedXids()");
+                        ::usleep(AIO_SLEEP_TIME);
+                        break;
+                    case rhm::journal::RHM_IORES_EMPTY:
+                        done = true;
+                        break;
+                    default:
+                        assert( "Store Error: Unexpected msg state");
+                }
+            }
+        } catch (const journal::jexception& e) {
+            THROW_STORE_EXCEPTION(std::string("Prepared XID journal: collectPreparedXids() failed: ") + e.what());
+        }
 
         preparedXidStorePtr->recover_complete(); // start journal.
     }
@@ -1146,9 +1180,13 @@
     try {
         // Nothing to do if not prepared
         chkInitPreparedXidStore();
-        if (txn.getDtok().is_enqueued())
-            preparedXidStorePtr->dequeue_txn_data_record(&txn.getDtok(), txn.getXid());
-
+        if (txn.getDtok()->is_enqueued()) {
+            txn.incrDtokRef();
+            DataTokenImpl* dtokp = txn.getDtok();
+            dtokp->set_dequeue_rid(dtokp->rid());
+            dtokp->set_rid(messageIdSequence.next());
+            preparedXidStorePtr->dequeue_txn_data_record(txn.getDtok(), txn.getXid());
+        }
         txn.complete(commit);
     } catch (const std::exception& e) {
         QPID_LOG(error, "Error completing xid " << txn.getXid() << ": " << e.what());
@@ -1160,19 +1198,15 @@
 {
     checkInit();
     // pass sequence number for c/a
-    TxnCtxt* txn(new TxnCtxt(&messageIdSequence ));
-    return auto_ptr<TransactionContext>(txn);
+    return auto_ptr<TransactionContext>(new TxnCtxt(&messageIdSequence));
 }
 
 std::auto_ptr<qpid::broker::TPCTransactionContext> BdbMessageStore::begin(const std::string& xid)
 {
     checkInit();
-    IdSequence* jtx = NULL;
-    jtx = &messageIdSequence;
-
+    IdSequence* jtx = &messageIdSequence;
     // pass sequence number for c/a
-    TPCTxnCtxt* txn(new TPCTxnCtxt(xid, jtx));
-    return auto_ptr<TPCTransactionContext>(txn);
+    return auto_ptr<TPCTransactionContext>(new TPCTxnCtxt(xid, jtx));
 }
 
 void BdbMessageStore::prepare(qpid::broker::TPCTransactionContext& ctxt)
@@ -1183,7 +1217,11 @@
     
     try {
         chkInitPreparedXidStore();
-        preparedXidStorePtr->enqueue_txn_data_record(0, 0, 0, &txn->getDtok(), txn->getXid(), false);
+        txn->incrDtokRef();
+        DataTokenImpl* dtokp = txn->getDtok();
+        dtokp->set_external_rid(true);
+        dtokp->set_rid(messageIdSequence.next());
+        preparedXidStorePtr->enqueue_txn_data_record(0, 0, 0, dtokp, txn->getXid(), false);
         txn->addXidRecord(preparedXidStorePtr.get());
 
         // make sure all the data is written to disk before returning

Modified: store/branches/mrg-1.0/cpp/lib/BdbMessageStore.h
===================================================================
--- store/branches/mrg-1.0/cpp/lib/BdbMessageStore.h	2008-07-14 13:59:22 UTC (rev 2194)
+++ store/branches/mrg-1.0/cpp/lib/BdbMessageStore.h	2008-07-14 14:23:12 UTC (rev 2195)
@@ -25,29 +25,19 @@
 #define _BdbMessageStore_
 
 #include <string>
+
 #include "db-inc.h"
-//#include "BufferValue.h"
 #include "Cursor.h"
 #include "IdDbt.h"
-//#include "IdSequence.h"
+#include "IdSequence.h"
+#include "JournalImpl.h"
+#include "jrnl/jcfg.hpp"
 #include "PreparedTransaction.h"
-//#include "StoreException.h"
-#include "TxnCtxt.h"
 #include "qpid/broker/Broker.h"
 #include "qpid/broker/MessageStore.h"
 #include "qpid/management/Manageable.h"
-//#include <qpid/sys/Monitor.h>
-//#include <qpid/sys/Time.h>
-//#include <map>
-//#include <set>
-//#include <iostream>
-//#include <boost/format.hpp>
-//#include <boost/intrusive_ptr.hpp>
-//#include <boost/ptr_container/ptr_list.hpp>
 #include "qpid/management/Store.h"
-#include "jrnl/jcfg.hpp"
-#include "JournalImpl.h"
-#include "IdSequence.h"
+#include "TxnCtxt.h"
 
 // Assume DB_VERSION_MAJOR == 4
 #if (DB_VERSION_MINOR == 2)
@@ -57,7 +47,6 @@
 
 namespace rhm {
     namespace bdbstore {
-        using std::string;
 
         /**
          * An implementation of the MessageStore interface based on Berkeley DB
@@ -71,6 +60,9 @@
             typedef LockedMappings::map txn_lock_map;
             typedef boost::ptr_list<PreparedTransaction> txn_list;
 
+            typedef std::map<std::string, u_int64_t> xid_rid_map;
+            typedef xid_rid_map::const_iterator xid_rid_map_citr;
+
             // Default store settings
             static const u_int16_t defNumJrnlFiles = 8;
             static const u_int32_t defJrnlFileSizePgs = 24;
@@ -98,6 +90,8 @@
             u_int32_t jrnlFsizePgs;
             u_int32_t wcache_pgsize_sblks;
             u_int16_t wcache_num_pages;
+            xid_rid_map preparedMap;
+            u_int64_t highestRid;
 			bool isInit;
 			const char* envPath;
             static qpid::sys::Duration defJournalGetEventsTimeout;
@@ -121,7 +115,6 @@
             int enqueueMessage(TxnCtxt& txn, IdDbt& msgId, qpid::broker::RecoverableMessage::shared_ptr& msg, 
                     queue_index& index, txn_list& locked, message_index& prepared);
             void recoverXids(txn_list& txns);
-            void readXids(Db& db, std::set<string>& xids);
             void readLockedMappings(Db& db, txn_lock_map& mappings);
             TxnCtxt* check(qpid::broker::TransactionContext* ctxt);
             void store(const qpid::broker::PersistableQueue* queue, TxnCtxt* txn, 
@@ -152,21 +145,34 @@
 	    
 	    	// journal functions
 	  	  	void createJrnlQueue(const qpid::broker::PersistableQueue& queue);
-	  	  	string getJrnlDir(const qpid::broker::PersistableQueue& queue); //for exmaple /var/rhm/ + queueDir/
-	  	  	string getJrnlDir(const char* queueName);
-	  	  	string getJrnlBaseDir(); 
-            string getBdbBaseDir(); 
-            string getPxidBaseDir(); 
+            std::string getJrnlDir(const qpid::broker::PersistableQueue& queue); //for exmaple /var/rhm/ + queueDir/
+            std::string getJrnlDir(const char* queueName);
+            std::string getJrnlBaseDir(); 
+            std::string getBdbBaseDir(); 
+            std::string getPxidBaseDir(); 
 			inline void checkInit() {
                 if (!isInit) init("/var", defNumJrnlFiles, defJrnlFileSizePgs, defWCachePageSize); isInit = true;
             }
             void chkInitPreparedXidStore();
+  
+            // debug aid for printing XIDs that may contain non-printable chars
+            static std::string xid2str(const std::string xid) {
+                std::ostringstream oss;
+                oss << std::hex << std::setfill('0');
+                for (unsigned i=0; i<xid.size(); i++) {
+                    if (isprint(xid[i]))
+                        oss << xid[i];
+                    else
+                        oss << "/" << std::setw(2) << (int)((char)xid[i]);
+                }
+                return oss.str();
+            }
 
         public:
             struct Options : public qpid::Options {
                 Options(const std::string& name="Store Options");
-                string clusterName;
-                string storeDir;
+                std::string clusterName;
+                std::string storeDir;
                 bool storeAsync;
                 bool storeForce;
                 uint16_t numJrnlFiles;
@@ -222,7 +228,7 @@
     	    u_int32_t outstandingQueueAIO(const qpid::broker::PersistableQueue& queue);
 
 
-            void collectPreparedXids(std::set<string>& xids);
+            void collectPreparedXids(std::set<std::string>& xids);
 
             std::auto_ptr<qpid::broker::TransactionContext> begin();
             std::auto_ptr<qpid::broker::TPCTransactionContext> begin(const std::string& xid);

Modified: store/branches/mrg-1.0/cpp/lib/TxnCtxt.h
===================================================================
--- store/branches/mrg-1.0/cpp/lib/TxnCtxt.h	2008-07-14 13:59:22 UTC (rev 2194)
+++ store/branches/mrg-1.0/cpp/lib/TxnCtxt.h	2008-07-14 14:23:12 UTC (rev 2195)
@@ -57,7 +57,7 @@
     ipqdef impactedQueues; // list of Queues used in the txn
     mutable qpid::sys::Mutex Lock;
     IdSequence* loggedtx;
-    DataTokenImpl dtok;
+    boost::intrusive_ptr<DataTokenImpl> dtokp;
     AutoScopedLock globalHolder;
 
     /**
@@ -83,7 +83,6 @@
                         jc->txn_abort(dtokp.get(), getXid());
                     }
                 } catch (const journal::jexception& e) {
-                    //std::cout << "Error commit" << e << std::endl;
                     THROW_STORE_EXCEPTION(std::string("Error commit") + e.what());
                 }
             }
@@ -93,7 +92,7 @@
 
 public:
 
-    TxnCtxt(IdSequence* _loggedtx=NULL) : loggedtx(_loggedtx), txn(0) {
+    TxnCtxt(IdSequence* _loggedtx=NULL) : loggedtx(_loggedtx), dtokp(new DataTokenImpl), txn(0) {
         if (loggedtx) {
             std::stringstream s;
             s << "rhm-tid" << this;
@@ -129,7 +128,6 @@
                         jc->get_wr_events();
                     }
                 } catch (const journal::jexception& e) {
-                    //std::cout << " TxnCtxt: Error sync 3" << e << std::flush;
                     THROW_STORE_EXCEPTION(std::string("Error sync") + e.what());
                 }
             }
@@ -165,7 +163,14 @@
     void deleteXidRecord() { impactedQueues.clear(); }
     void addXidRecord(qpid::broker::ExternalQueueStore* queue) { impactedQueues.insert(queue); }
     void complete(bool commit) { completeTXN(commit); }
-    DataTokenImpl& getDtok() { return dtok; }
+    DataTokenImpl* getDtok() { return dtokp.get(); }
+    void incrDtokRef() { dtokp->addRef(); }
+    void recoverDtok(const u_int64_t rid, const std::string xid) {
+        dtokp->set_rid(rid);
+        dtokp->set_wstate(DataTokenImpl::ENQ);
+        dtokp->set_xid(xid);
+        dtokp->set_external_rid(true);
+    }
 };
 
 class TPCTxnCtxt : public TxnCtxt, public qpid::broker::TPCTransactionContext

Modified: store/branches/mrg-1.0/cpp/lib/jrnl/data_tok.cpp
===================================================================
--- store/branches/mrg-1.0/cpp/lib/jrnl/data_tok.cpp	2008-07-14 13:59:22 UTC (rev 2194)
+++ store/branches/mrg-1.0/cpp/lib/jrnl/data_tok.cpp	2008-07-14 14:23:12 UTC (rev 2195)
@@ -30,6 +30,7 @@
 
 #include "jrnl/data_tok.hpp"
 
+#include <iomanip>
 #include "jrnl/jerrno.hpp"
 #include "jrnl/jexception.hpp"
 #include <sstream>
@@ -50,6 +51,7 @@
     _dblks_written(0),
     _dblks_read(0),
     _pg_cnt(0),
+    _fid(0),
     _rid(0),
     _xid(),
     _dequeue_rid(0),
@@ -163,9 +165,31 @@
     _dblks_written = 0;
     _dblks_read = 0;
     _pg_cnt = 0;
+    _fid = 0;
     _rid = 0;
     _xid.clear();
 }
 
+// debug aid
+std::string
+data_tok::status_str() const
+{
+    std::ostringstream oss;
+    oss << std::hex << std::setfill('0');
+    oss << "dtok id=0x" << _cnt << "; ws=" << wstate_str() << "; rs=" << rstate_str();
+    oss << "; fid=0x" << _fid << "; rid=0x" << _rid << "; xid=";
+    for (unsigned i=0; i<_xid.size(); i++)
+    {
+        if (isprint(_xid[i]))
+            oss << _xid[i];
+        else
+            oss << "/" << std::setw(2) << (int)((char)_xid[i]);
+    }
+    oss << "; drid=0x" << _dequeue_rid << " extrid=" << (_external_rid?"T":"F");
+    oss << "; ds=0x" << _dsize << "; dw=0x" << _dblks_written << "; dr=0x" << _dblks_read;
+    oss << " pc=0x" << _pg_cnt;
+    return oss.str();
+}
+
 } // namespace journal
 } // namespace rhm

Modified: store/branches/mrg-1.0/cpp/lib/jrnl/data_tok.hpp
===================================================================
--- store/branches/mrg-1.0/cpp/lib/jrnl/data_tok.hpp	2008-07-14 13:59:22 UTC (rev 2194)
+++ store/branches/mrg-1.0/cpp/lib/jrnl/data_tok.hpp	2008-07-14 14:23:12 UTC (rev 2195)
@@ -160,6 +160,9 @@
                 { _xid.assign((const char*)xidp, xid_len); }
 
         void reset();
+        
+        // debug aid
+        std::string status_str() const;
     };
 
 } // namespace journal

Modified: store/branches/mrg-1.0/cpp/lib/jrnl/rrfc.cpp
===================================================================
--- store/branches/mrg-1.0/cpp/lib/jrnl/rrfc.cpp	2008-07-14 13:59:22 UTC (rev 2194)
+++ store/branches/mrg-1.0/cpp/lib/jrnl/rrfc.cpp	2008-07-14 14:23:12 UTC (rev 2195)
@@ -97,7 +97,7 @@
     return _fh_arr[pg_index];
 }
 
-const std::string
+std::string
 rrfc::status_str() const
 {
     std::ostringstream oss;

Modified: store/branches/mrg-1.0/cpp/lib/jrnl/rrfc.hpp
===================================================================
--- store/branches/mrg-1.0/cpp/lib/jrnl/rrfc.hpp	2008-07-14 13:59:22 UTC (rev 2194)
+++ store/branches/mrg-1.0/cpp/lib/jrnl/rrfc.hpp	2008-07-14 14:23:12 UTC (rev 2195)
@@ -136,7 +136,7 @@
                 { return _curr_fh->wr_aio_outstanding_dblks() > 0; }
         
         // Debug aid
-        const std::string status_str() const;
+        std::string status_str() const;
     }; // class rrfc
 
 } // namespace journal

Modified: store/branches/mrg-1.0/cpp/lib/jrnl/txn_map.cpp
===================================================================
--- store/branches/mrg-1.0/cpp/lib/jrnl/txn_map.cpp	2008-07-14 13:59:22 UTC (rev 2194)
+++ store/branches/mrg-1.0/cpp/lib/jrnl/txn_map.cpp	2008-07-14 14:23:12 UTC (rev 2195)
@@ -91,11 +91,8 @@
 const txn_data_list
 txn_map::get_tdata_list(const std::string& xid)
 {
-    xmap_itr itr;
-    {
-        slock s(&_mutex);
-        itr = _map.find(xid);
-    }
+    slock s(&_mutex);
+    xmap_itr itr = _map.find(xid);
     if (itr == _map.end()) // not found in map
     {
         std::ostringstream oss;
@@ -127,11 +124,8 @@
 bool
 txn_map::in_map(const std::string& xid)
 {
-    xmap_itr itr;
-    {
-        slock s(&_mutex);
-        itr = _map.find(xid);
-    }
+    slock s(&_mutex);
+    xmap_itr itr= _map.find(xid);
     if (itr == _map.end()) // not found in map
         return false;
     return true;
@@ -140,11 +134,8 @@
 u_int32_t
 txn_map::get_rid_count(const std::string& xid)
 {
-    xmap_itr itr;
-    {
-        slock s(&_mutex);
-        itr = _map.find(xid);
-    }
+    slock s(&_mutex);
+    xmap_itr itr = _map.find(xid);
     if (itr == _map.end()) // not found in map
     {
         std::ostringstream oss;

Modified: store/branches/mrg-1.0/cpp/lib/jrnl/wrfc.cpp
===================================================================
--- store/branches/mrg-1.0/cpp/lib/jrnl/wrfc.cpp	2008-07-14 13:59:22 UTC (rev 2194)
+++ store/branches/mrg-1.0/cpp/lib/jrnl/wrfc.cpp	2008-07-14 14:23:12 UTC (rev 2195)
@@ -176,7 +176,7 @@
     return findex != _fh_index && in_use;
 }
 
-const std::string
+std::string
 wrfc::status_str() const
 {
     std::ostringstream oss;

Modified: store/branches/mrg-1.0/cpp/lib/jrnl/wrfc.hpp
===================================================================
--- store/branches/mrg-1.0/cpp/lib/jrnl/wrfc.hpp	2008-07-14 13:59:22 UTC (rev 2194)
+++ store/branches/mrg-1.0/cpp/lib/jrnl/wrfc.hpp	2008-07-14 14:23:12 UTC (rev 2195)
@@ -128,7 +128,7 @@
         bool enq_threshold(const u_int32_t enq_dsize_dblks) const;
         
         // Debug aid
-        const std::string status_str() const;
+        std::string status_str() const;
     }; // class wrfc
 
 } // namespace journal

Modified: store/branches/mrg-1.0/cpp/tests/TwoPhaseCommitTest.cpp
===================================================================
--- store/branches/mrg-1.0/cpp/tests/TwoPhaseCommitTest.cpp	2008-07-14 13:59:22 UTC (rev 2194)
+++ store/branches/mrg-1.0/cpp/tests/TwoPhaseCommitTest.cpp	2008-07-14 14:23:12 UTC (rev 2195)
@@ -160,9 +160,6 @@
         swap.check(commit);        
         restart();
         swap.check(commit);
-        
-        // this test leaves xids in the store
-        store->truncate();
     }
 
     void commit(Strategy& strategy)

Modified: store/branches/mrg-1.0/cpp/tests/persistence.py
===================================================================
--- store/branches/mrg-1.0/cpp/tests/persistence.py	2008-07-14 13:59:22 UTC (rev 2194)
+++ store/branches/mrg-1.0/cpp/tests/persistence.py	2008-07-14 14:23:12 UTC (rev 2195)
@@ -262,6 +262,19 @@
         session = self.session
         session.synchronous = False
 
+        # check xids from phase 6 are gone
+        txc = self.xid('c')
+        txd = self.xid('d')
+
+        xids = session.dtx_recover().in_doubt
+        ids = [x.global_id for x in xids] #TODO: come up with nicer way to test these
+        
+        if txc.global_id in ids:    
+            self.fail("Xid still present : %s" % (txc))
+        if txd.global_id in ids:    
+            self.fail("Xid still present : %s" % (txc))
+        self.assertEqual(0, len(xids))    
+
         #test deletion of queue after publish
         #create queue
         session.queue_declare(queue = "q", auto_delete=True, durable=True)




More information about the rhmessaging-commits mailing list