[rhmessaging-commits] rhmessaging commits: r1079 - in store/trunk/cpp: lib/jrnl and 1 other directories.

rhmessaging-commits at lists.jboss.org rhmessaging-commits at lists.jboss.org
Tue Oct 16 08:29:00 EDT 2007


Author: cctrieloff
Date: 2007-10-16 08:29:00 -0400 (Tue, 16 Oct 2007)
New Revision: 1079

Modified:
   store/trunk/cpp/lib/BdbMessageStore.cpp
   store/trunk/cpp/lib/BdbMessageStore.h
   store/trunk/cpp/lib/JournalImpl.cpp
   store/trunk/cpp/lib/TxnCtxt.h
   store/trunk/cpp/lib/jrnl/jcntl.cpp
   store/trunk/cpp/lib/jrnl/rmgr.cpp
   store/trunk/cpp/lib/jrnl/txn_map.cpp
   store/trunk/cpp/lib/jrnl/txn_map.hpp
   store/trunk/cpp/lib/jrnl/wmgr.cpp
   store/trunk/cpp/tests/TwoPhaseCommitTest.cpp
Log:
 - Txn fixes
 - TPC fixes
 - All unit tests now pass
 - System tests still have issues, so the journal (jrnl) is not enabled yet
 


Modified: store/trunk/cpp/lib/BdbMessageStore.cpp
===================================================================
--- store/trunk/cpp/lib/BdbMessageStore.cpp	2007-10-15 21:13:51 UTC (rev 1078)
+++ store/trunk/cpp/lib/BdbMessageStore.cpp	2007-10-16 12:29:00 UTC (rev 1079)
@@ -262,15 +262,18 @@
     }
     //recover transactions:
     for (txn_list::iterator i = prepared.begin(); i != prepared.end(); i++) {        
-        RecoverableTransaction::shared_ptr dtx =
-            registry.recoverTransaction(i->xid, std::auto_ptr<TPCTransactionContext>(new TPCTxnCtxt(i->xid, &messageIdSequence)));
+        
+		TPCTxnCtxt* tpcc = new TPCTxnCtxt(i->xid, &messageIdSequence);
+		RecoverableTransaction::shared_ptr dtx = registry.recoverTransaction(i->xid, std::auto_ptr<TPCTransactionContext>(tpcc));
         if (i->enqueues.get()) {
             for (LockedMappings::iterator j = i->enqueues->begin(); j != i->enqueues->end(); j++) {
+				tpcc->addXidRecord(queues[j->first]->getExternalQueueStore());
                 dtx->enqueue(queues[j->first], messages[j->second]);
             }
         }
         if (i->dequeues.get()) {
             for (LockedMappings::iterator j = i->dequeues->begin(); j != i->dequeues->end(); j++) {
+				tpcc->addXidRecord(queues[j->first]->getExternalQueueStore());
                 dtx->dequeue(queues[j->first], messages[j->second]);
             }
         }
@@ -427,7 +430,8 @@
 		    			 Buffer contentBuff(data + contentOffset, contentSize);
                          msg->decodeContent(contentBuff);
                     }
-                 	if (xidbuffSize > 0 && PreparedTransaction::isLocked(locked, queue->getPersistenceId(), dtokp.rid()) ) {
+
+                 	if (PreparedTransaction::isLocked(locked, queue->getPersistenceId(), dtokp.rid()) ) {
                         prepared[dtokp.rid()] = msg;
                     } else {
                          queue->recover(msg);
@@ -450,8 +454,6 @@
                     break;
                 case rhm::journal::RHM_IORES_EMPTY:
                     read = false;
-				    // inline const u_int32_t get_enq_cnt() const { return _emap.size(); }
-				    assert (jc->get_enq_cnt() == msg_count);
 				    break; // done with all messages. ((add call in jrnl to test that _emap is empty. 
                 default:
 				    assert( "Store Error: Unexpected msg state");
@@ -558,7 +560,7 @@
     	for (std::set<string>::iterator i = known.begin(); i != known.end(); i++) {
 	        if (prepared.find(*i) == prepared.end()) {
             	TPCTxnCtxt txn(*i, NULL);
-        	    completed(txn, dequeueXidDb, enqueueXidDb);
+        	    completed(txn, dequeueXidDb, enqueueXidDb, false);
         	}
     	}
 	    readLockedMappings(enqueueXidDb, enqueues);
@@ -758,7 +760,7 @@
 
 	    if (usingJrnl()){
 			// add queue* to the txn map..
-			if (ctxt) txn->addXidRecord(queue);
+			if (ctxt) txn->addXidRecord(queue.getExternalQueueStore());
 		}else{
        	    msg.enqueueComplete();  // set enqueued for ack
             put(mappingDb, txn->get(), key, value);
@@ -876,7 +878,7 @@
         
 		if (usingJrnl()){
 			// add queue* to the txn map..
-			if (ctxt) txn->addXidRecord(queue);
+			if (ctxt) txn->addXidRecord(queue.getExternalQueueStore());
 			async_dequeue(ctxt, msg, queue); 
 			
 		} else if (txn->isTPC()) {
@@ -997,7 +999,7 @@
     }
 }
 
-void BdbMessageStore::completed(TPCTxnCtxt& txn, Db& discard, Db& apply)
+void BdbMessageStore::completed(TPCTxnCtxt& txn, Db& discard, Db& apply, bool commit)
 {
     if (!txn.get()) txn.begin(env);
 
@@ -1022,7 +1024,7 @@
 		}
         prepareXidDb.del(txn.get(), &key, 0);
 
-        txn.commit();
+        txn.complete(commit);
     } catch (std::exception& e) {
         std::cout << "Error completing xid " << txn.getXid() << ": " << e.what() << std::endl;
         txn.abort();
@@ -1061,7 +1063,7 @@
         Dbt value(&dummy, sizeof(dummy));
 
 		// make sure all the data is written to disk before returning
-		txn->sync();
+ 		txn->sync();
         prepareXidDb.put(txn->get(), &key, &value, 0);
 
         txn->commit();
@@ -1069,14 +1071,14 @@
         txn->abort();
         throw e;
     }
-
 }
 
 void BdbMessageStore::commit(TransactionContext& ctxt) 
 {
-    TxnCtxt* txn(check(&ctxt));
+std::cout << " commit1" << std::flush;
+     TxnCtxt* txn(check(&ctxt));
     if (txn->isTPC()) {
-        completed(*dynamic_cast<TPCTxnCtxt*>(txn), enqueueXidDb, dequeueXidDb);        
+        completed(*dynamic_cast<TPCTxnCtxt*>(txn), enqueueXidDb, dequeueXidDb, true);        
     } else {
         txn->commit();
     }
@@ -1086,7 +1088,7 @@
 {
     TxnCtxt* txn(check(&ctxt));
     if (txn->isTPC()) {
-        completed(*dynamic_cast<TPCTxnCtxt*>(txn), dequeueXidDb, enqueueXidDb);
+        completed(*dynamic_cast<TPCTxnCtxt*>(txn), dequeueXidDb, enqueueXidDb, false);
     } else {
         txn->abort();
     }

Modified: store/trunk/cpp/lib/BdbMessageStore.h
===================================================================
--- store/trunk/cpp/lib/BdbMessageStore.h	2007-10-15 21:13:51 UTC (rev 1078)
+++ store/trunk/cpp/lib/BdbMessageStore.h	2007-10-16 12:29:00 UTC (rev 1079)
@@ -100,7 +100,7 @@
             bool deleteIfUnused(DbTxn* txn, Dbt& messageId);
             void destroy(Db& db, const qpid::broker::Persistable& p);
             bool create(Db& db, IdSequence& seq, const qpid::broker::Persistable& p);
-            void completed(TPCTxnCtxt& txn, Db& discard, Db& apply);
+            void completed(TPCTxnCtxt& txn, Db& discard, Db& apply, bool commit);
             void record2pcOp(Db& db, TPCTxnCtxt& txn, u_int64_t messageId, u_int64_t queueId);
 
             u_int64_t getRecordSize(Db& db, Dbt& key);

Modified: store/trunk/cpp/lib/JournalImpl.cpp
===================================================================
--- store/trunk/cpp/lib/JournalImpl.cpp	2007-10-15 21:13:51 UTC (rev 1078)
+++ store/trunk/cpp/lib/JournalImpl.cpp	2007-10-16 12:29:00 UTC (rev 1079)
@@ -58,7 +58,7 @@
             if (tdl_itr->_enq_flag) { // enqueue op
                 i->enqueues->add(queue_id, tdl_itr->_rid);
             } else { // dequeue op
-                i->dequeues->add(queue_id, tdl_itr->_rid);
+                i->dequeues->add(queue_id, tdl_itr->_drid);
             }
         }
     }

Modified: store/trunk/cpp/lib/TxnCtxt.h
===================================================================
--- store/trunk/cpp/lib/TxnCtxt.h	2007-10-15 21:13:51 UTC (rev 1078)
+++ store/trunk/cpp/lib/TxnCtxt.h	2007-10-16 12:29:00 UTC (rev 1079)
@@ -43,8 +43,8 @@
 
 class TxnCtxt : public qpid::broker::TransactionContext
 {
-private:
-	typedef std::set<const qpid::broker::PersistableQueue*> ipqdef;
+protected:
+	typedef std::set<qpid::broker::ExternalQueueStore*> ipqdef;
 	ipqdef impactedQueues; // list of Queues used in the txn
     static unsigned int count;
     mutable qpid::sys::Mutex Lock;
@@ -62,7 +62,7 @@
 	
 	void completeTXN(bool commit){
 		for (TxnCtxt::ipqdef::iterator i = impactedQueues.begin(); i != impactedQueues.end(); i++) { 
-	   		JournalImpl* jc = static_cast<JournalImpl*>((*i)->getExternalQueueStore());
+	   		JournalImpl* jc = static_cast<JournalImpl*>(*i);
 			if (jc && loggedtx) { /* if using journal */
                 DataTokenImpl* dtokp = new DataTokenImpl;
 				dtokp->set_rid(loggedtx->next());
@@ -70,11 +70,12 @@
 					if (commit)
 						jc->txn_commit(dtokp, getXid());
 					else
+					{
 						jc->txn_abort(dtokp, getXid());
-                	
+                	}
 				} catch (rhm::journal::jexception& e) { 
 		      		std::string str;
-std::cout << "Error commit" << e << std::endl;
+//std::cout << "Error commit" << e << std::endl;
 			  		delete dtokp;
 			  		THROW_STORE_EXCEPTION("Error commit" + e.to_string(str));
 	     		}
@@ -103,7 +104,7 @@
             if (!firstloop) ::usleep(AIO_SLEEP_TIME); // move this into the get events call aiolib..
 			allWritten = true;
 			for (TxnCtxt::ipqdef::iterator i = impactedQueues.begin(); i != impactedQueues.end(); i++) { 
-	           	JournalImpl* jc = static_cast<JournalImpl*>((*i)->getExternalQueueStore());
+	           	JournalImpl* jc = static_cast<JournalImpl*>(*i);
 				
 				try
 				{
@@ -116,8 +117,7 @@
 					}
 				}catch (rhm::journal::jexception& e) { 
 		      		std::string str;
-std::cout << "Error sync" << e << std::endl;
-					
+//std::cout << " TxnCtxt: Error sync 3" << e << std::flush;
 			  		THROW_STORE_EXCEPTION("Error sync" + e.to_string(str));
 	     		}
 			}
@@ -134,8 +134,8 @@
 	virtual const std::string& getXid() { return tid; }
 
 	void deleteXidRecord(){ impactedQueues.clear(); }
-	void addXidRecord(const qpid::broker::PersistableQueue& queue){
-		impactedQueues.insert(&queue); }
+	void addXidRecord(qpid::broker::ExternalQueueStore* queue){
+		impactedQueues.insert(queue); }
 
 };
 
@@ -146,6 +146,11 @@
     TPCTxnCtxt(const std::string& _xid, IdSequence* _loggedtx) : TxnCtxt(_loggedtx), xid(_xid) {}
     virtual bool isTPC() { return true; }
     virtual const std::string& getXid() { return xid; }
+	// complete() always commits the BDB txn, then commits or aborts the jrnl txn per the flag
+    void commit(){ txn->commit(0); txn = 0; }
+    void abort(){ txn->abort(); txn = 0; }
+    void complete(bool commit){ 
+	txn->commit(0); completeTXN(commit); txn = 0; } 
 };
 
 }}

Modified: store/trunk/cpp/lib/jrnl/jcntl.cpp
===================================================================
--- store/trunk/cpp/lib/jrnl/jcntl.cpp	2007-10-15 21:13:51 UTC (rev 1078)
+++ store/trunk/cpp/lib/jrnl/jcntl.cpp	2007-10-16 12:29:00 UTC (rev 1079)
@@ -375,7 +375,7 @@
 		    std::vector<std::string>::const_iterator pitr = std::find(prep_txn_list.begin(), prep_txn_list.end(), *itr);
 			if (pitr == prep_txn_list.end())
 				_tmap.get_remove_tdata_list(*itr);
-		}
+		} 
     }
 }
 
@@ -392,22 +392,19 @@
     {
         case RHM_JDAT_ENQ_MAGIC:
             {
-std::cout << " e" << h._rid << std::flush;
                 enq_rec er;
                 while (!done)
                 {
-std::cout << "*" << std::flush;
                     done = er.rcv_decode(h, ifsp, cum_size_read);
                     jfile_cycle(fid, ifsp, rd, false);
                 }
                 rd._enq_cnt_list[fid]++;
                 if (er.xid_size())
                 {
-std::cout << "$" << std::flush;
                     er.get_xid(&xidp);
                     assert(xidp != NULL);
                     std::string xid((char*)xidp, er.xid_size());
-                    _tmap.insert_txn_data(xid, txn_data(h._rid, fid, true));
+                    _tmap.insert_txn_data(xid, txn_data(h._rid, 0, fid, true));
                     free(xidp);
                 }
                 else
@@ -418,24 +415,21 @@
             break;
         case RHM_JDAT_DEQ_MAGIC:
             {
-std::cout << " d" << h._rid << std::flush;
                 deq_rec dr;
                 while (!done)
                 {
-std::cout << "*" << std::flush;
                     done = dr.rcv_decode(h, ifsp, cum_size_read);
                     jfile_cycle(fid, ifsp, rd, false);
                 }
                 if (dr.xid_size())
                 {
-std::cout << "$" << std::flush;
                     // If the enqueue is part of a pending txn, it will not yet be in emap
-                    try { _emap.lock(h._rid); }
+                    try { _emap.lock(dr.deq_rid()); }
                     catch(jexception e) { if (e.err_code() != jerrno::JERR_MAP_NOTFOUND) throw e; }
                     dr.get_xid(&xidp);
                     assert(xidp != NULL);
                     std::string xid((char*)xidp, dr.xid_size());
-                    _tmap.insert_txn_data(xid, txn_data(dr.deq_rid(), fid, false));
+                    _tmap.insert_txn_data(xid, txn_data(dr.rid(), dr.deq_rid(), fid, false));
                     free(xidp);
                 }
                 else
@@ -457,12 +451,10 @@
             break;
         case RHM_JDAT_TXA_MAGIC:
             {
-std::cout << " a" << h._rid << std::flush;
                 txn_rec ar;
                 while (!done)
                 {
-std::cout << "*" << std::flush;
-                    done = ar.rcv_decode(h, ifsp, cum_size_read);
+                   done = ar.rcv_decode(h, ifsp, cum_size_read);
                     jfile_cycle(fid, ifsp, rd, false);
                 }
                 // Delete this txn from tmap, unlock any locked records in emap
@@ -472,14 +464,18 @@
                 txn_data_list tdl = _tmap.get_remove_tdata_list(xid);
                 for (tdl_itr itr = tdl.begin(); itr != tdl.end(); itr++)
                 {
-                    try { _emap.unlock(itr->_rid); }
+                    try 
+					{ 
+						if (!itr->_enq_flag)
+							_emap.unlock(itr->_drid);
+					}
                     catch(jexception e)
                     {
                         if (e.err_code() != jerrno::JERR_MAP_NOTFOUND)
                             throw e;
                     }
                     if (itr->_enq_flag)
-                        _wrfc.decr_enqcnt(itr->_fid);
+                        rd._enq_cnt_list[fid]--;
                 }
                 free(xidp);
                 if (rd._h_rid < h._rid)
@@ -488,11 +484,9 @@
             break;
         case RHM_JDAT_TXC_MAGIC:
             {
-std::cout << " c" << h._rid << std::flush;
                 txn_rec cr;
                 while (!done)
                 {
-std::cout << "*" << std::flush;
                     done = cr.rcv_decode(h, ifsp, cum_size_read);
                     jfile_cycle(fid, ifsp, rd, false);
                 }
@@ -500,17 +494,15 @@
                 cr.get_xid(&xidp);
                 assert(xidp != NULL);
                 std::string xid((char*)xidp, cr.xid_size());
-std::cout << "@"  << std::flush;
                 txn_data_list tdl = _tmap.get_remove_tdata_list(xid);
                 for (tdl_itr itr = tdl.begin(); itr != tdl.end(); itr++)
                 {
-std::cout << " enq_flag=" << itr->_enq_flag << std::flush;
                     if (itr->_enq_flag) // txn enqueue
                         _emap.insert_fid(itr->_rid, itr->_fid);
                     else // txn dequeue
                     {
-                        u_int16_t fid = _emap.get_remove_fid(itr->_rid, true);
-                        _wrfc.decr_enqcnt(fid);
+                        u_int16_t fid = _emap.get_remove_fid(itr->_drid, true);
+                        rd._enq_cnt_list[fid]--;
                     }
                 }
 				
@@ -521,18 +513,15 @@
             break;
         case RHM_JDAT_EMPTY_MAGIC:
             {
-std::cout << " x" << std::flush;
                 u_int32_t rec_dblks = jrec::size_dblks(sizeof(hdr));
                 ifsp->ignore(rec_dblks * JRNL_DBLK_SIZE - sizeof(hdr));
             }
         break;
         case 0:
-std::cout << " z" << std::flush;
             rd._lfid = fid;
             rd._eo = ifsp->tellg();
             return false;
         default:
-std::cout << " ?" << std::flush;
             std::stringstream ss;
             ss << std::hex << std::setfill('0') << "Magic=0x" << std::setw(8) << h._magic;
             throw jexception(jerrno::JERR_JCNTL_UNKNOWNMAGIC, ss.str(), "jcntl",
@@ -589,7 +578,6 @@
 void
 jcntl::aio_wr_callback(jcntl*  journal, u_int32_t num_dtoks)
 {
-
 //kpvdr TODO -- this list needs to be mutexed...???
 // need to delete the dtok's
     std::deque<rhm::journal::data_tok*> this_dtok_list(journal->_aio_wr_cmpl_dtok_list.begin(),
@@ -601,28 +589,23 @@
         data_tok*& dtokp = this_dtok_list.front();
 		if (!journal->is_stopped() && dtokp->getSourceMessage())
 		{
-        	data_tok::write_state st = dtokp->wstate();
-     	    if (st == data_tok::ENQ)
-    	    {
-//std::cout << "----- enqueueComplete rid=" << dtokp->rid() << std::endl;
-		
-    	         dtokp->getSourceMessage()->enqueueComplete();
-		      /// cct  --- if TPC work out what to do !!!
-    	    }
-    	    else if (dtokp->wstate() == data_tok::DEQ)
+			switch (dtokp->wstate())
 			{
-//std::cout << "----- dequeueComplete rid=" << dtokp->rid() << std::endl;
-
-     	        dtokp->getSourceMessage()->dequeueComplete();
-	     
-		     	if ( dtokp->getSourceMessage()->isDequeueComplete()  ) // clear id after last dequeue
-		         	dtokp->getSourceMessage()->setPersistenceId(0);
+ 				case data_tok::ENQ:
+     	         	dtokp->getSourceMessage()->enqueueComplete();
+ 					break;
+				case data_tok::DEQ:
+     	        	dtokp->getSourceMessage()->dequeueComplete();
+		     		if ( dtokp->getSourceMessage()->isDequeueComplete()  ) // clear id after last dequeue
+		         		dtokp->getSourceMessage()->setPersistenceId(0);
+					break;
+				default:
+					;
 			}
 		}
         this_dtok_list.pop_front();
 		delete dtokp;
     }
-    
 }
 
 void

Modified: store/trunk/cpp/lib/jrnl/rmgr.cpp
===================================================================
--- store/trunk/cpp/lib/jrnl/rmgr.cpp	2007-10-15 21:13:51 UTC (rev 1078)
+++ store/trunk/cpp/lib/jrnl/rmgr.cpp	2007-10-16 12:29:00 UTC (rev 1079)
@@ -32,6 +32,7 @@
 
 #include <jrnl/rmgr.hpp>
 
+#include <jrnl/jcntl.hpp>
 #include <assert.h>
 #include <cerrno>
 #include <sstream>
@@ -287,8 +288,24 @@
                 }
                 catch (jexception& e)
                 {
-                    if (e.err_code() != jerrno::JERR_MAP_NOTFOUND)
+                    if (e.err_code() == jerrno::JERR_MAP_LOCKED && !_jc->is_read_only())
                         throw e;
+						
+					// Not resolved via emap; if the journal is read-only (recover), search tmap for this rid
+					if (_jc->is_read_only())
+					{
+						std::vector<std::string> xid_list;
+						_tmap.xid_list(xid_list);
+						for (std::vector<std::string>::iterator itr = xid_list.begin(); itr != xid_list.end() && !is_enq; itr++)
+						{
+							txn_data_list tx_list = _tmap.get_tdata_list(*itr);
+							for (tdl_itr ditr = tx_list.begin(); ditr != tx_list.end() && !is_enq; ditr++)
+							{
+								if (ditr->_rid == _hdr._rid)
+									is_enq = true;
+							}
+						}
+					}
 //std::cout << "-nf" << std::flush;
                 }
 #endif
@@ -299,7 +316,7 @@
                     // Is this locked by a pending dequeue transaction?
                     try
                     {
-                        if (_emap.is_locked(_hdr._rid))
+                        if (_emap.is_locked(_hdr._rid) && !_jc->is_read_only())
                             return RHM_IORES_TXPENDING;
                     }
                     catch (jexception e)

Modified: store/trunk/cpp/lib/jrnl/txn_map.cpp
===================================================================
--- store/trunk/cpp/lib/jrnl/txn_map.cpp	2007-10-15 21:13:51 UTC (rev 1078)
+++ store/trunk/cpp/lib/jrnl/txn_map.cpp	2007-10-16 12:29:00 UTC (rev 1079)
@@ -36,13 +36,17 @@
 #include <sstream>
 #include <jrnl/jerrno.hpp>
 
+#include <iostream> // for debug
+
 namespace rhm
 {
 namespace journal
 {
 
-txn_data_struct::txn_data_struct(const u_int64_t rid, const u_int16_t fid, const bool enq_flag):
+txn_data_struct::txn_data_struct(const u_int64_t rid, const u_int64_t drid, const u_int16_t fid,
+		const bool enq_flag):
         _rid(rid),
+        _drid(drid),
         _fid(fid),
         _enq_flag(enq_flag),
         _aio_compl(false)
@@ -139,10 +143,11 @@
         ss << std::hex << "xid=\"" << xid << "\"";
         throw jexception(jerrno::JERR_MAP_NOTFOUND, ss.str(), "txn_map", "is_txn_synced");
     }
-    txn_data_list list = itr->second;
+//std::cout << " its: found XID" << std::flush;
     bool is_synced = true;
-    for (tdl_itr litr = list.begin(); litr < list.end(); litr++)
+    for (tdl_itr litr = itr->second.begin(); litr < itr->second.end(); litr++)
     {
+//std::cout << " rid=" << litr->_rid << " aioc=" << litr->_aio_compl << std::flush;
         if (!litr->_aio_compl)
         {
             is_synced = false;
@@ -154,7 +159,7 @@
 }
 
 const bool
-txn_map::set_aio_compl(const std::string& xid, const u_int64_t rid)
+txn_map::set_aio_compl(const std::string& xid, const u_int64_t rid) throw (jexception)
 {
     bool ok = true;
     bool found = false;
@@ -164,13 +169,19 @@
         ok = false;
     else
     {
-        txn_data_list list = itr->second;
-        for (tdl_itr litr = list.begin(); litr < list.end(); litr++)
+//std::cout << " sac: found XID" << std::flush;
+//        txn_data_list list = itr->second;
+        for (tdl_itr litr = itr->second.begin(); litr < itr->second.end(); litr++)
         {
             if (litr->_rid == rid)
             {
+// txn_data_struct t(litr->_rid, litr->_drid, litr->_fid, litr->_enq_flag);
+// t._aio_compl = true;
+// itr->second.erase(litr);
+// itr->second.push_back(t);
                 found = true;
                 litr->_aio_compl = true;
+//std::cout << " rid=" << rid << " aioc=" << litr->_aio_compl << " ptr=" << std::flush;
                 break;
             }
         }

Modified: store/trunk/cpp/lib/jrnl/txn_map.hpp
===================================================================
--- store/trunk/cpp/lib/jrnl/txn_map.hpp	2007-10-15 21:13:51 UTC (rev 1078)
+++ store/trunk/cpp/lib/jrnl/txn_map.hpp	2007-10-16 12:29:00 UTC (rev 1079)
@@ -54,10 +54,11 @@
     struct txn_data_struct
     {
         u_int64_t _rid;     ///< Record id for this operation
+        u_int64_t _drid;    ///< Dequeue record id for this operation
         u_int16_t _fid;     ///< File id, to be used when transferring to emap on commit
         bool _enq_flag;     ///< If true, enq op, otherwise deq op
         bool _aio_compl;    ///< Initially false, set to true when AIO returns
-        txn_data_struct(const u_int64_t rid, const u_int16_t fid, const bool enq_flag);
+        txn_data_struct(const u_int64_t rid, const u_int64_t drid, const u_int16_t fid, const bool enq_flag);
     };
     typedef txn_data_struct txn_data;
     typedef std::vector<txn_data> txn_data_list;
@@ -82,7 +83,7 @@
         const txn_data_list get_remove_tdata_list(const std::string& xid) throw (jexception);
         const u_int32_t get_rid_count(const std::string& xid) throw (jexception);
         const bool is_txn_synced(const std::string& xid) throw (jexception);
-        const bool set_aio_compl(const std::string& xid, const u_int64_t rid);
+        const bool set_aio_compl(const std::string& xid, const u_int64_t rid) throw (jexception);
         inline void clear() { _map.clear(); }
         inline const bool empty() const { return _map.empty(); }
         inline const u_int16_t size() const { return (u_int16_t)_map.size(); }

Modified: store/trunk/cpp/lib/jrnl/wmgr.cpp
===================================================================
--- store/trunk/cpp/lib/jrnl/wmgr.cpp	2007-10-15 21:13:51 UTC (rev 1078)
+++ store/trunk/cpp/lib/jrnl/wmgr.cpp	2007-10-16 12:29:00 UTC (rev 1079)
@@ -167,7 +167,7 @@
             if (xid_len) // If part of transaction, add to transaction map
             {
                 std::string xid((char*)xid_ptr, xid_len);
-                _tmap.insert_txn_data(xid, txn_data(rid, dtokp->fid(), true));
+                _tmap.insert_txn_data(xid, txn_data(rid, 0, dtokp->fid(), true));
             }
             else
                 _emap.insert_fid(rid, dtokp->fid());
@@ -307,10 +307,10 @@
             if (xid_len) // If part of transaction, add to transaction map
             {
                 // If the enqueue is part of a pending txn, it will not yet be in emap
-                try { _emap.lock(rid); }
+                try { _emap.lock(dequeue_rid); }
                 catch(jexception e) { if (e.err_code() != jerrno::JERR_MAP_NOTFOUND) throw e; }
                 std::string xid((char*)xid_ptr, xid_len);
-                _tmap.insert_txn_data(xid, txn_data(dequeue_rid, dtokp->fid(), false));
+                _tmap.insert_txn_data(xid, txn_data(rid, dequeue_rid, dtokp->fid(), false));
             }
             else
             {
@@ -444,7 +444,11 @@
             txn_data_list tdl = _tmap.get_remove_tdata_list(xid);
             for (tdl_itr itr = tdl.begin(); itr != tdl.end(); itr++)
             {
-                try { _emap.unlock(itr->_rid); }
+                try
+				{
+					if (!itr->_enq_flag)
+						_emap.unlock(itr->_drid);
+				}
                 catch(jexception e) { if (e.err_code() != jerrno::JERR_MAP_NOTFOUND) throw e; }
                 if (itr->_enq_flag)
                     _wrfc.decr_enqcnt(itr->_fid);
@@ -587,7 +591,7 @@
                     _emap.insert_fid(itr->_rid, itr->_fid);
                 else // txn dequeue
                 {
-                    u_int16_t fid = _emap.get_remove_fid(itr->_rid, true);
+                    u_int16_t fid = _emap.get_remove_fid(itr->_drid, true);
                     _wrfc.decr_enqcnt(fid);
                 }
             }

Modified: store/trunk/cpp/tests/TwoPhaseCommitTest.cpp
===================================================================
--- store/trunk/cpp/tests/TwoPhaseCommitTest.cpp	2007-10-15 21:13:51 UTC (rev 1078)
+++ store/trunk/cpp/tests/TwoPhaseCommitTest.cpp	2007-10-16 12:29:00 UTC (rev 1079)
@@ -41,7 +41,6 @@
 class TwoPhaseCommitTest : public CppUnit::TestCase  
 {
     CPPUNIT_TEST_SUITE(TwoPhaseCommitTest);
-
     CPPUNIT_TEST(testCommitSwap);
     CPPUNIT_TEST(testPrepareAndAbortSwap);
     CPPUNIT_TEST(testAbortNoPrepareSwap);
@@ -72,9 +71,10 @@
     {
         TwoPhaseCommitTest* const test;
         const string messageId;
+		Message::shared_ptr msg;
     public:
         Swap(TwoPhaseCommitTest* const test_, const string& messageId_): test(test_), messageId(messageId_) {}
-        void init(){ test->deliver(messageId, test->queueA); }
+        void init(){ msg = test->deliver(messageId, test->queueA); }
         void run(TPCTransactionContext* txn) { test->swap(txn); }
         void check(bool committed) { test->swapCheck(committed, messageId); }
     };
@@ -82,13 +82,16 @@
     class Enqueue : public Strategy
     {
         TwoPhaseCommitTest* const test;
+		Message::shared_ptr msg1;
+		Message::shared_ptr msg2;
+		Message::shared_ptr msg3;
     public:
         Enqueue(TwoPhaseCommitTest* const test_): test(test_) {}
         void init() {}
         void run(TPCTransactionContext* txn) { 
-            test->enqueue(txn, "Enqueue1"); 
-            test->enqueue(txn, "Enqueue2"); 
-            test->enqueue(txn, "Enqueue3"); 
+            msg1 = test->enqueue(txn, "Enqueue1"); 
+            msg2 = test->enqueue(txn, "Enqueue2"); 
+            msg3 = test->enqueue(txn, "Enqueue3"); 
         }
         void check(bool committed) { 
             if (committed) {
@@ -103,12 +106,15 @@
     class Dequeue : public Strategy
     {
         TwoPhaseCommitTest* const test;
+		Message::shared_ptr msg1;
+		Message::shared_ptr msg2;
+		Message::shared_ptr msg3;
     public:
         Dequeue(TwoPhaseCommitTest* const test_): test(test_) {}
         void init() {
-            test->deliver("Dequeue1", test->queueA); 
-            test->deliver("Dequeue2", test->queueA); 
-            test->deliver("Dequeue3", test->queueA); 
+            msg1 = test->deliver("Dequeue1", test->queueA); 
+            msg2 = test->deliver("Dequeue2", test->queueA); 
+            msg3 = test->deliver("Dequeue3", test->queueA); 
         }
         void run(TPCTransactionContext* txn) { 
             test->dequeue(txn); 
@@ -132,7 +138,10 @@
     QueueRegistry queues;
     Queue::shared_ptr queueA;
     Queue::shared_ptr queueB;
-
+	Message::shared_ptr msg1;
+	Message::shared_ptr msg2;
+	Message::shared_ptr msg4;
+	
 public:
     TwoPhaseCommitTest() : nameA("queueA"), nameB("queueB") {}
 
@@ -214,7 +223,6 @@
         std::auto_ptr<TPCTransactionContext> txn(store->begin("my-xid"));
         swap.run(txn.get());
         store->prepare(*txn);
-
         restart();
 
         //check that the message is not available from either queue
@@ -261,29 +269,31 @@
 
     void swap(TPCTransactionContext* txn)
     {
-        Message::shared_ptr msg = queueA->dequeue().payload;//just dequeues in memory
+        msg1 = queueA->dequeue().payload;//just dequeues in memory
         //move the message from one queue to the other as part of a
         //distributed transaction
-        queueB->enqueue(txn, msg);//note: need to enqueue it first to avoid message being deleted
-        queueA->dequeue(txn, msg);
+        queueB->enqueue(txn, msg1);//note: need to enqueue it first to avoid message being deleted
+        queueA->dequeue(txn, msg1);
     }
 
     void dequeue(TPCTransactionContext* txn)
     {
-        Message::shared_ptr msg = queueA->dequeue().payload;//just dequeues in memory
-        queueA->dequeue(txn, msg);
+        msg2 = queueA->dequeue().payload;//just dequeues in memory
+        queueA->dequeue(txn, msg2);
     }
 
-    void enqueue(TPCTransactionContext* txn, const string& msgid)
+    Message::shared_ptr enqueue(TPCTransactionContext* txn, const string& msgid)
     {
         Message::shared_ptr msg = createMessage(msgid);        
         queueA->enqueue(txn, msg);
+		return msg;
     }
 
-    void deliver(const string& msgid, Queue::shared_ptr& queue)
+    Message::shared_ptr deliver(const string& msgid, Queue::shared_ptr& queue)
     {
-        Message::shared_ptr msg = createMessage(msgid);        
-        queue->deliver(msg);
+        msg4 = createMessage(msgid);        
+        queue->deliver(msg4);
+		return msg4;
     }
 
     void setup()




More information about the rhmessaging-commits mailing list