[rhmessaging-commits] rhmessaging commits: r1033 - in store/trunk/cpp/lib: jrnl and 1 other directory.

rhmessaging-commits at lists.jboss.org rhmessaging-commits at lists.jboss.org
Fri Oct 12 18:20:27 EDT 2007


Author: cctrieloff
Date: 2007-10-12 18:20:26 -0400 (Fri, 12 Oct 2007)
New Revision: 1033

Modified:
   store/trunk/cpp/lib/BdbMessageStore.cpp
   store/trunk/cpp/lib/TxnCtxt.h
   store/trunk/cpp/lib/jrnl/deq_rec.hpp
   store/trunk/cpp/lib/jrnl/jcntl.cpp
   store/trunk/cpp/lib/jrnl/jcntl.hpp
   store/trunk/cpp/lib/jrnl/wmgr.cpp
Log:
lots of small fixes for txns

Modified: store/trunk/cpp/lib/BdbMessageStore.cpp
===================================================================
--- store/trunk/cpp/lib/BdbMessageStore.cpp	2007-10-12 21:58:53 UTC (rev 1032)
+++ store/trunk/cpp/lib/BdbMessageStore.cpp	2007-10-12 22:20:26 UTC (rev 1033)
@@ -263,7 +263,7 @@
     //recover transactions:
     for (txn_list::iterator i = prepared.begin(); i != prepared.end(); i++) {        
         RecoverableTransaction::shared_ptr dtx =
-            registry.recoverTransaction(i->xid, std::auto_ptr<TPCTransactionContext>(new TPCTxnCtxt(i->xid)));
+            registry.recoverTransaction(i->xid, std::auto_ptr<TPCTransactionContext>(new TPCTxnCtxt(i->xid, &messageIdSequence)));
         if (i->enqueues.get()) {
             for (LockedMappings::iterator j = i->enqueues->begin(); j != i->enqueues->end(); j++) {
                 dtx->enqueue(queues[j->first], messages[j->second]);
@@ -401,7 +401,10 @@
     dtokp.set_wstate(DataTokenImpl::ENQ);
 	// read the message from the Journal.
     try {
-        while (read) {
+        
+//std::cout << jc->dirname() <<"-queueName:" << queue->getName() << "-enq count:" << jc->get_enq_cnt() << std::endl;
+		
+		while (read) {
 
             rhm::journal::iores res = jc->read_data_record(&dbuff, dbuffSize, &xidbuff, xidbuffSize, transientFlag, &dtokp);
     		readSize = dtokp.dsize();
@@ -540,6 +543,9 @@
     std::set<string> prepared;
     collectPreparedXids(prepared);
 
+//std::cout << "prep size:" << prepared.size() << std::endl;
+
+
 	//when using the async journal, it will abort unprepaired xids and populate the locked maps
 	if (!usingJrnl()){
 	    txn_lock_map enqueues;
@@ -551,7 +557,7 @@
     	//abort all known but unprepared xids:
     	for (std::set<string>::iterator i = known.begin(); i != known.end(); i++) {
 	        if (prepared.find(*i) == prepared.end()) {
-            	TPCTxnCtxt txn(*i);
+            	TPCTxnCtxt txn(*i, NULL);
         	    completed(txn, dequeueXidDb, enqueueXidDb);
         	}
     	}
@@ -562,6 +568,8 @@
   	    }
 	} else {
   	    for (std::set<string>::iterator i = prepared.begin(); i != prepared.end(); i++) {
+
+//std::cout << "prep:" << *i << std::endl;
             LockedMappings::shared_ptr enq_ptr;
             enq_ptr.reset(new LockedMappings);
             LockedMappings::shared_ptr deq_ptr;
@@ -900,8 +908,8 @@
     bool written = false;
 	DataTokenImpl* ddtokp =  new DataTokenImpl;
  	ddtokp->setSourceMessage (&msg);
-	ddtokp->set_rid(msg.getPersistenceId()); // message id to be dequeued
-	ddtokp->set_dequeue_rid(messageIdSequence.next());
+	ddtokp->set_rid(messageIdSequence.next()); 
+	ddtokp->set_dequeue_rid(msg.getPersistenceId());
 	ddtokp->set_wstate(DataTokenImpl::ENQ);
 	JournalImpl* jc = static_cast<JournalImpl*>(queue.getExternalQueueStore());
 	string tid;
@@ -921,7 +929,7 @@
 			  }
          } catch (rhm::journal::jexception& e) { 
 		      std::string str;
-			  delete ddtokp;
+			  //delete ddtokp;
 			  THROW_STORE_EXCEPTION("Error dequeuing message" + e.to_string(str));
 	     }
          switch (dres)
@@ -1024,14 +1032,19 @@
 
 auto_ptr<TransactionContext> BdbMessageStore::begin() 
 {
-    TxnCtxt* txn(new TxnCtxt(true));
+	// pass the id sequence so commit/abort (c/a) records get a record id when using the journal
+    TxnCtxt* txn(new TxnCtxt(&messageIdSequence ));
     txn->begin(env);
     return auto_ptr<TransactionContext>(txn);
 }
 
 std::auto_ptr<qpid::broker::TPCTransactionContext> BdbMessageStore::begin(const std::string& xid)
 {
-    TPCTxnCtxt* txn(new TPCTxnCtxt(xid));
+	IdSequence* jtx = NULL;
+	if (usingJrnl()) jtx = &messageIdSequence;
+
+	// pass the id sequence (NULL unless the journal is in use) so commit/abort (c/a) records get a record id
+    TPCTxnCtxt* txn(new TPCTxnCtxt(xid, jtx));
     txn->begin(env);
     return auto_ptr<TPCTransactionContext>(txn);
 }

Modified: store/trunk/cpp/lib/TxnCtxt.h
===================================================================
--- store/trunk/cpp/lib/TxnCtxt.h	2007-10-12 21:58:53 UTC (rev 1032)
+++ store/trunk/cpp/lib/TxnCtxt.h	2007-10-12 22:20:26 UTC (rev 1033)
@@ -48,7 +48,7 @@
 	ipqdef impactedQueues; // list of Queues used in the txn
     static unsigned int count;
     mutable qpid::sys::Mutex Lock;
-	bool loggedtx;
+	IdSequence* loggedtx;
 	
 	unsigned int getCount() {
     	qpid::sys::Mutex::ScopedLock locker(Lock);
@@ -63,21 +63,31 @@
 	void completeTXN(bool commit){
 		for (TxnCtxt::ipqdef::iterator i = impactedQueues.begin(); i != impactedQueues.end(); i++) { 
 	   		JournalImpl* jc = static_cast<JournalImpl*>((*i)->getExternalQueueStore());
-			if (jc) { /* if using journal */
+			if (jc && loggedtx) { /* if using journal */
                 DataTokenImpl* dtokp = new DataTokenImpl;
-				if (commit)
-					jc->txn_commit(dtokp, getXid());
-				else
-					jc->txn_abort(dtokp, getXid());
-                }
+				dtokp->set_rid(loggedtx->next());
+				try{
+					if (commit)
+						jc->txn_commit(dtokp, getXid());
+					else
+						jc->txn_abort(dtokp, getXid());
+                	
+				} catch (rhm::journal::jexception& e) { 
+		      		std::string str;
+std::cout << "Error commit" << e << std::endl;
+			  		delete dtokp;
+			  		THROW_STORE_EXCEPTION("Error commit" + e.to_string(str));
+	     		}
+			
 			}
+		}	
+		sync();
 		deleteXidRecord();
-		sync();
 	}
 	
 public:
 	
-    TxnCtxt(bool _loggedtx=false) : loggedtx(_loggedtx), txn(0)  {
+    TxnCtxt(IdSequence* _loggedtx=NULL) : loggedtx(_loggedtx), txn(0)  {
 		if (loggedtx) tid = "rhm-tid" + getCount();
 	}
 	
@@ -94,13 +104,22 @@
 			allWritten = true;
 			for (TxnCtxt::ipqdef::iterator i = impactedQueues.begin(); i != impactedQueues.end(); i++) { 
 	           	JournalImpl* jc = static_cast<JournalImpl*>((*i)->getExternalQueueStore());
-				if (jc && !(jc->is_txn_synced(getXid())))
+				
+				try
 				{
-					if (firstloop)
-						jc->flush();
-					allWritten = false;
-					jc->get_wr_events();
-				}
+					if (jc && !(jc->is_txn_synced(getXid())))
+					{
+						if (firstloop)
+							jc->flush();
+						allWritten = false;
+						jc->get_wr_events();
+					}
+				}catch (rhm::journal::jexception& e) { 
+		      		std::string str;
+std::cout << "Error sync" << e << std::endl;
+					
+			  		THROW_STORE_EXCEPTION("Error sync" + e.to_string(str));
+	     		}
 			}
 			firstloop = false;
 		}
@@ -108,8 +127,8 @@
 	
     virtual ~TxnCtxt() { if(txn) abort(); }
     void begin(DbEnv& env){ env.txn_begin(0, &txn, 0); }
-    void commit(){ txn->commit(0); completeTXN(true); txn = 0; sync();}
-    void abort(){ txn->abort(); completeTXN(false); txn = 0; sync();}
+    void commit(){ txn->commit(0); completeTXN(true); txn = 0; }
+    void abort(){ txn->abort(); completeTXN(false); txn = 0; }
     DbTxn* get(){ return txn; }
     virtual bool isTPC() { return false; }
 	virtual const std::string& getXid() { return tid; }
@@ -124,7 +143,7 @@
 {
     const std::string xid;
 public:
-    TPCTxnCtxt(const std::string& _xid) : xid(_xid) {}
+    TPCTxnCtxt(const std::string& _xid, IdSequence* _loggedtx) : TxnCtxt(_loggedtx), xid(_xid) {}
     virtual bool isTPC() { return true; }
     virtual const std::string& getXid() { return xid; }
 };

Modified: store/trunk/cpp/lib/jrnl/deq_rec.hpp
===================================================================
--- store/trunk/cpp/lib/jrnl/deq_rec.hpp	2007-10-12 21:58:53 UTC (rev 1032)
+++ store/trunk/cpp/lib/jrnl/deq_rec.hpp	2007-10-12 22:20:26 UTC (rev 1033)
@@ -80,6 +80,7 @@
         // Decode used for recover
         const bool rcv_decode(hdr h, std::ifstream* ifsp, size_t& rec_offs) throw (jexception);
 
+        inline const u_int64_t rid() const { return _deq_hdr._hdr._rid; }
         inline const u_int64_t deq_rid() const { return _deq_hdr._deq_rid; }
         const size_t get_xid(void** const xidpp);
         std::string& str(std::string& str) const;

Modified: store/trunk/cpp/lib/jrnl/jcntl.cpp
===================================================================
--- store/trunk/cpp/lib/jrnl/jcntl.cpp	2007-10-12 21:58:53 UTC (rev 1032)
+++ store/trunk/cpp/lib/jrnl/jcntl.cpp	2007-10-12 22:20:26 UTC (rev 1033)
@@ -367,13 +367,20 @@
     {
         u_int16_t fid = rd._ffid;
         std::ifstream ifs;
-        while (rcvr_get_next_record(fid, &ifs, rd, prep_txn_list));
+        while (rcvr_get_next_record(fid, &ifs, rd));
+		std::vector<std::string> xid_list;
+		_tmap.xid_list(xid_list);
+		for (std::vector<std::string>::iterator itr = xid_list.begin(); itr != xid_list.end(); itr++)
+		{
+		    std::vector<std::string>::const_iterator pitr = std::find(prep_txn_list.begin(), prep_txn_list.end(), *itr);
+			if (pitr == prep_txn_list.end())
+				_tmap.get_remove_tdata_list(*itr);
+		}
     }
 }
 
 const bool
-jcntl::rcvr_get_next_record(u_int16_t& fid, std::ifstream* ifsp, rcvdat& rd,
-        const std::vector<std::string>& prep_txn_list) throw (jexception)
+jcntl::rcvr_get_next_record(u_int16_t& fid, std::ifstream* ifsp, rcvdat& rd) throw (jexception)
 {
     size_t cum_size_read = 0;
     bool done = false;
@@ -385,24 +392,22 @@
     {
         case RHM_JDAT_ENQ_MAGIC:
             {
-//std::cout << " e" << h._rid << std::flush;
+std::cout << " e" << h._rid << std::flush;
                 enq_rec er;
                 while (!done)
                 {
+std::cout << "*" << std::flush;
                     done = er.rcv_decode(h, ifsp, cum_size_read);
                     jfile_cycle(fid, ifsp, rd, false);
                 }
                 rd._enq_cnt_list[fid]++;
                 if (er.xid_size())
                 {
-//std::cout << "$" << std::flush;
+std::cout << "$" << std::flush;
                     er.get_xid(&xidp);
                     assert(xidp != NULL);
                     std::string xid((char*)xidp, er.xid_size());
-                    std::vector<std::string>::const_iterator cit = std::find(prep_txn_list.begin(),
-                            prep_txn_list.end(), xid);
-                    if (cit != prep_txn_list.end())
-                        _tmap.insert_txn_data(xid, txn_data(h._rid, fid, true));
+                    _tmap.insert_txn_data(xid, txn_data(h._rid, fid, true));
                     free(xidp);
                 }
                 else
@@ -413,26 +418,24 @@
             break;
         case RHM_JDAT_DEQ_MAGIC:
             {
-//std::cout << " d" << h._rid << std::flush;
+std::cout << " d" << h._rid << std::flush;
                 deq_rec dr;
                 while (!done)
                 {
+std::cout << "*" << std::flush;
                     done = dr.rcv_decode(h, ifsp, cum_size_read);
                     jfile_cycle(fid, ifsp, rd, false);
                 }
                 if (dr.xid_size())
                 {
-//std::cout << "$" << std::flush;
+std::cout << "$" << std::flush;
                     // If the enqueue is part of a pending txn, it will not yet be in emap
                     try { _emap.lock(h._rid); }
                     catch(jexception e) { if (e.err_code() != jerrno::JERR_MAP_NOTFOUND) throw e; }
                     dr.get_xid(&xidp);
                     assert(xidp != NULL);
                     std::string xid((char*)xidp, dr.xid_size());
-                    std::vector<std::string>::const_iterator cit = std::find(prep_txn_list.begin(),
-                            prep_txn_list.end(), xid);
-                    if (cit != prep_txn_list.end())
-                        _tmap.insert_txn_data(xid, txn_data(h._rid, fid, false));
+                    _tmap.insert_txn_data(xid, txn_data(dr.deq_rid(), fid, false));
                     free(xidp);
                 }
                 else
@@ -454,10 +457,11 @@
             break;
         case RHM_JDAT_TXA_MAGIC:
             {
-//std::cout << " a" << h._rid << std::flush;
+std::cout << " a" << h._rid << std::flush;
                 txn_rec ar;
                 while (!done)
                 {
+std::cout << "*" << std::flush;
                     done = ar.rcv_decode(h, ifsp, cum_size_read);
                     jfile_cycle(fid, ifsp, rd, false);
                 }
@@ -465,22 +469,17 @@
                 ar.get_xid(&xidp);
                 assert(xidp != NULL);
                 std::string xid((char*)xidp, ar.xid_size());
-                std::vector<std::string>::const_iterator cit = std::find(prep_txn_list.begin(),
-                        prep_txn_list.end(), xid);
-                if (cit != prep_txn_list.end())
+                txn_data_list tdl = _tmap.get_remove_tdata_list(xid);
+                for (tdl_itr itr = tdl.begin(); itr != tdl.end(); itr++)
                 {
-                    txn_data_list tdl = _tmap.get_remove_tdata_list(xid);
-                    for (tdl_itr itr = tdl.begin(); itr != tdl.end(); itr++)
+                    try { _emap.unlock(itr->_rid); }
+                    catch(jexception e)
                     {
-                        try { _emap.unlock(itr->_rid); }
-                        catch(jexception e)
-                        {
-                            if (e.err_code() != jerrno::JERR_MAP_NOTFOUND)
-                                throw e;
-                        }
-                        if (itr->_enq_flag)
-                            _wrfc.decr_enqcnt(itr->_fid);
+                        if (e.err_code() != jerrno::JERR_MAP_NOTFOUND)
+                            throw e;
                     }
+                    if (itr->_enq_flag)
+                        _wrfc.decr_enqcnt(itr->_fid);
                 }
                 free(xidp);
                 if (rd._h_rid < h._rid)
@@ -489,10 +488,11 @@
             break;
         case RHM_JDAT_TXC_MAGIC:
             {
-//std::cout << " c" << h._rid << std::flush;
+std::cout << " c" << h._rid << std::flush;
                 txn_rec cr;
                 while (!done)
                 {
+std::cout << "*" << std::flush;
                     done = cr.rcv_decode(h, ifsp, cum_size_read);
                     jfile_cycle(fid, ifsp, rd, false);
                 }
@@ -500,22 +500,20 @@
                 cr.get_xid(&xidp);
                 assert(xidp != NULL);
                 std::string xid((char*)xidp, cr.xid_size());
-                std::vector<std::string>::const_iterator cit = std::find(prep_txn_list.begin(),
-                        prep_txn_list.end(), xid);
-                if (cit != prep_txn_list.end())
+std::cout << "@"  << std::flush;
+                txn_data_list tdl = _tmap.get_remove_tdata_list(xid);
+                for (tdl_itr itr = tdl.begin(); itr != tdl.end(); itr++)
                 {
-                    txn_data_list tdl = _tmap.get_remove_tdata_list(xid);
-                    for (tdl_itr itr = tdl.begin(); itr != tdl.end(); itr++)
+std::cout << " enq_flag=" << itr->_enq_flag << std::flush;
+                    if (itr->_enq_flag) // txn enqueue
+                        _emap.insert_fid(itr->_rid, itr->_fid);
+                    else // txn dequeue
                     {
-                        if (itr->_enq_flag) // txn enqueue
-                            _emap.insert_fid(itr->_rid, itr->_fid);
-                        else // txn dequeue
-                        {
-                            u_int16_t fid = _emap.get_remove_fid(h._rid, true);
-                            _wrfc.decr_enqcnt(fid);
-                        }
+                        u_int16_t fid = _emap.get_remove_fid(itr->_rid, true);
+                        _wrfc.decr_enqcnt(fid);
                     }
                 }
+				
                 free(xidp);
                 if (rd._h_rid < h._rid)
                     rd._h_rid = h._rid;
@@ -523,18 +521,18 @@
             break;
         case RHM_JDAT_EMPTY_MAGIC:
             {
-//std::cout << " x" << std::flush;
+std::cout << " x" << std::flush;
                 u_int32_t rec_dblks = jrec::size_dblks(sizeof(hdr));
                 ifsp->ignore(rec_dblks * JRNL_DBLK_SIZE - sizeof(hdr));
             }
         break;
         case 0:
-//std::cout << " z" << std::flush;
+std::cout << " z" << std::flush;
             rd._lfid = fid;
             rd._eo = ifsp->tellg();
             return false;
         default:
-//std::cout << " ?" << std::flush;
+std::cout << " ?" << std::flush;
             std::stringstream ss;
             ss << std::hex << std::setfill('0') << "Magic=0x" << std::setw(8) << h._magic;
             throw jexception(jerrno::JERR_JCNTL_UNKNOWNMAGIC, ss.str(), "jcntl",

Modified: store/trunk/cpp/lib/jrnl/jcntl.hpp
===================================================================
--- store/trunk/cpp/lib/jrnl/jcntl.hpp	2007-10-12 21:58:53 UTC (rev 1032)
+++ store/trunk/cpp/lib/jrnl/jcntl.hpp	2007-10-12 22:20:26 UTC (rev 1033)
@@ -614,8 +614,7 @@
         void rcvr_janalyze(rcvdat& rd, const std::vector<std::string>& prep_txn_list)
                 throw (jexception);
 
-        const bool rcvr_get_next_record(u_int16_t& fid, std::ifstream* ifsp, rcvdat& rd,
-                const std::vector<std::string>& prep_txn_list) throw (jexception);
+        const bool rcvr_get_next_record(u_int16_t& fid, std::ifstream* ifsp, rcvdat& rd) throw (jexception);
 
         const bool jfile_cycle(u_int16_t& fid, std::ifstream* ifsp, rcvdat& rd,
                 const bool jump_fro);

Modified: store/trunk/cpp/lib/jrnl/wmgr.cpp
===================================================================
--- store/trunk/cpp/lib/jrnl/wmgr.cpp	2007-10-12 21:58:53 UTC (rev 1032)
+++ store/trunk/cpp/lib/jrnl/wmgr.cpp	2007-10-12 22:20:26 UTC (rev 1033)
@@ -263,8 +263,15 @@
     }
 
     // TODO: Tidy this up!
-    u_int64_t rid = initialize_rid(cont, dtokp);
+//    u_int64_t rid = initialize_rid(cont, dtokp);
+//    _deq_rec.reset(rid, dtokp->rid(), xid_ptr, xid_len);
+    u_int64_t rid = dtokp->getSourceMessage() ? dtokp->rid() : (cont ? _wrfc.rid() - 1 : _wrfc.get_incr_rid());
     u_int64_t dequeue_rid = dtokp->getSourceMessage() ? dtokp->dequeue_rid() : dtokp->rid();
+	if (!dtokp->getSourceMessage())
+	{
+		dtokp->set_rid(rid);
+		dtokp->set_dequeue_rid(dequeue_rid);
+	}
     _deq_rec.reset(rid, dequeue_rid, xid_ptr, xid_len);
     if (!cont)
     {
@@ -303,11 +310,11 @@
                 try { _emap.lock(rid); }
                 catch(jexception e) { if (e.err_code() != jerrno::JERR_MAP_NOTFOUND) throw e; }
                 std::string xid((char*)xid_ptr, xid_len);
-                _tmap.insert_txn_data(xid, txn_data(rid, dtokp->fid(), false));
+                _tmap.insert_txn_data(xid, txn_data(dequeue_rid, dtokp->fid(), false));
             }
             else
             {
-                u_int16_t fid = _emap.get_remove_fid(dtokp->rid());
+                u_int16_t fid = _emap.get_remove_fid(dtokp->dequeue_rid());
                 _wrfc.decr_enqcnt(fid);
             }
 #endif
@@ -399,7 +406,7 @@
     else
         _abort_busy = true;
 
-    u_int64_t rid = initialize_rid(cont, dtokp);
+    u_int64_t rid = dtokp->rid() ? dtokp->rid() : (cont ? _wrfc.rid() - 1 : _wrfc.get_incr_rid());
     _txn_rec.reset(RHM_JDAT_TXA_MAGIC, rid, xid_ptr, xid_len);
     if (!cont)
     {
@@ -538,7 +545,7 @@
     else
         _commit_busy = true;
 
-    u_int64_t rid = initialize_rid(cont, dtokp);
+    u_int64_t rid = dtokp->rid() ? dtokp->rid() : (cont ? _wrfc.rid() - 1 : _wrfc.get_incr_rid());
     _txn_rec.reset(RHM_JDAT_TXC_MAGIC, rid, xid_ptr, xid_len);
     if (!cont)
     {
@@ -806,7 +813,7 @@
                     break;
                 default:
                     std::stringstream ss;
-                    ss << "dtok_state=" << dtp->wstate_str();
+                    ss << "dtok_id=" << dtp->id() << "dtok_state=" << dtp->wstate_str();
                     throw jexception(jerrno::JERR_WMGR_BADDTOKSTATE, ss.str(), "wmgr",
                             "get_events");
                 }




More information about the rhmessaging-commits mailing list