[rhmessaging-commits] rhmessaging commits: r1012 - in store/trunk/cpp/lib: jrnl and 1 other directory.

rhmessaging-commits at lists.jboss.org rhmessaging-commits at lists.jboss.org
Thu Oct 11 17:46:52 EDT 2007


Author: kpvdr
Date: 2007-10-11 17:46:52 -0400 (Thu, 11 Oct 2007)
New Revision: 1012

Modified:
   store/trunk/cpp/lib/BdbMessageStore.cpp
   store/trunk/cpp/lib/BdbMessageStore.h
   store/trunk/cpp/lib/JournalImpl.cpp
   store/trunk/cpp/lib/JournalImpl.h
   store/trunk/cpp/lib/jrnl/deq_rec.cpp
   store/trunk/cpp/lib/jrnl/deq_rec.hpp
   store/trunk/cpp/lib/jrnl/enq_rec.cpp
   store/trunk/cpp/lib/jrnl/enq_rec.hpp
   store/trunk/cpp/lib/jrnl/jcntl.cpp
   store/trunk/cpp/lib/jrnl/jerrno.cpp
   store/trunk/cpp/lib/jrnl/jrec.hpp
   store/trunk/cpp/lib/jrnl/txn_map.cpp
   store/trunk/cpp/lib/jrnl/txn_map.hpp
   store/trunk/cpp/lib/jrnl/txn_rec.cpp
   store/trunk/cpp/lib/jrnl/txn_rec.hpp
   store/trunk/cpp/lib/jrnl/wmgr.cpp
Log:
Added is_txn_synced() calls, other tidy-ups and bugfixes

Modified: store/trunk/cpp/lib/BdbMessageStore.cpp
===================================================================
--- store/trunk/cpp/lib/BdbMessageStore.cpp	2007-10-11 21:09:42 UTC (rev 1011)
+++ store/trunk/cpp/lib/BdbMessageStore.cpp	2007-10-11 21:46:52 UTC (rev 1012)
@@ -304,7 +304,7 @@
 	
 	          try
 	          {
-                  jQueue->recover(prepared); // start recovery
+                  jQueue->recover(prepared, key.id); // start recovery
                   recoverMessages(txn, registry, queue, prepared, messages); 
 				  jQueue->recover_complete(); // start journal.
 	          } catch (journal::jexception& e) {
@@ -540,11 +540,11 @@
     std::set<string> prepared;
     collectPreparedXids(prepared);
 
-	txn_lock_map enqueues;
- 	txn_lock_map dequeues;
 	//when using the async journal, it will abort unprepaired xids and populate the locked maps
 	if (!usingJrnl()){
-    	std::set<string> known;
+	    txn_lock_map enqueues;
+ 	    txn_lock_map dequeues;
+        std::set<string> known;
     	readXids(enqueueXidDb, known);
     	readXids(dequeueXidDb, known);
 
@@ -557,12 +557,19 @@
     	}
 	    readLockedMappings(enqueueXidDb, enqueues);
  	    readLockedMappings(dequeueXidDb, dequeues);
-	}
-
-  	for (std::set<string>::iterator i = prepared.begin(); i != prepared.end(); i++) {
-  	    txns.push_back(new PreparedTransaction(*i, enqueues[*i], dequeues[*i]));
-  	}
-	
+  	    for (std::set<string>::iterator i = prepared.begin(); i != prepared.end(); i++) {
+  	        txns.push_back(new PreparedTransaction(*i, enqueues[*i], dequeues[*i]));
+  	    }
+	} else {
+  	    for (std::set<string>::iterator i = prepared.begin(); i != prepared.end(); i++) {
+            LockedMappings::shared_ptr enq_ptr;
+            enq_ptr.reset(new LockedMappings);
+            LockedMappings::shared_ptr deq_ptr;
+            deq_ptr.reset(new LockedMappings);
+  	        txns.push_back(new PreparedTransaction(*i, enq_ptr, deq_ptr));
+  	    }
+        
+    }
 }
 
 void BdbMessageStore::readXids(Db& db, std::set<string>& xids)

Modified: store/trunk/cpp/lib/BdbMessageStore.h
===================================================================
--- store/trunk/cpp/lib/BdbMessageStore.h	2007-10-11 21:09:42 UTC (rev 1011)
+++ store/trunk/cpp/lib/BdbMessageStore.h	2007-10-11 21:46:52 UTC (rev 1012)
@@ -113,7 +113,7 @@
 	  	  	void createJrnlQueue(const qpid::broker::PersistableQueue& queue);
 	  	  	string getJrnlDir(const qpid::broker::PersistableQueue& queue); //for exmaple /var/rhm/ + queueDir/
 	  	  	string getJrnlDir(const char* queueName);
-	  	  	static inline bool usingJrnl() {return false;} // make configurable
+	  	  	static inline bool usingJrnl() {return true;} // make configurable
 	  	  	string getJrnlBaseDir(); 
 
 

Modified: store/trunk/cpp/lib/JournalImpl.cpp
===================================================================
--- store/trunk/cpp/lib/JournalImpl.cpp	2007-10-11 21:09:42 UTC (rev 1011)
+++ store/trunk/cpp/lib/JournalImpl.cpp	2007-10-11 21:46:52 UTC (rev 1012)
@@ -37,12 +37,29 @@
 void
 JournalImpl::recover(std::deque<journal::data_tok*>* rd_dtokl, const journal::aio_cb rd_cb,
         std::deque<journal::data_tok*>* wr_dtokl, const journal::aio_cb wr_cb,
-        boost::ptr_list<bdbstore::PreparedTransaction>& prep_tx_list) throw (journal::jexception)
+        boost::ptr_list<bdbstore::PreparedTransaction>& prep_tx_list, u_int64_t queue_id)
+        throw (journal::jexception)
 {
     // Create list of prepared xids
     std::vector<std::string> prep_xid_list;
     for (bdbstore::PreparedTransaction::list::iterator i = prep_tx_list.begin();
-            i != prep_tx_list.end(); i++)
+            i != prep_tx_list.end(); i++) {
         prep_xid_list.push_back(i->xid);
+    }
+
     journal::jcntl::recover(rd_dtokl, rd_cb, wr_dtokl, wr_cb, prep_xid_list);
+        
+    // Populate PreparedTransaction lists from _tmap
+    for (bdbstore::PreparedTransaction::list::iterator i = prep_tx_list.begin();
+            i != prep_tx_list.end(); i++) {
+        journal::txn_data_list tdl = _tmap.get_tdata_list(i->xid);
+        assert(tdl.size()); // should never be empty
+        for (journal::tdl_itr tdl_itr = tdl.begin(); tdl_itr < tdl.end(); tdl_itr++) {
+            if (tdl_itr->_enq_flag) { // enqueue op
+                i->enqueues->add(queue_id, tdl_itr->_rid);
+            } else { // dequeue op
+                i->dequeues->add(queue_id, tdl_itr->_rid);
+            }
+        }
+    }
 }

Modified: store/trunk/cpp/lib/JournalImpl.h
===================================================================
--- store/trunk/cpp/lib/JournalImpl.h	2007-10-11 21:09:42 UTC (rev 1011)
+++ store/trunk/cpp/lib/JournalImpl.h	2007-10-11 21:46:52 UTC (rev 1012)
@@ -41,14 +41,14 @@
             ~JournalImpl();
             void recover(std::deque<journal::data_tok*>* rd_dtokl, const journal::aio_cb rd_cb,
 			        std::deque<journal::data_tok*>* wr_dtokl, const journal::aio_cb wr_cb,
-                    boost::ptr_list<bdbstore::PreparedTransaction>& prep_tx_list)
-                    throw (journal::jexception);
+                    boost::ptr_list<bdbstore::PreparedTransaction>& prep_tx_list,
+                    u_int64_t queue_id) throw (journal::jexception);
 
-            void recover(boost::ptr_list<bdbstore::PreparedTransaction>& prep_tx_list)
-                    throw (journal::jexception)
+            void recover(boost::ptr_list<bdbstore::PreparedTransaction>& prep_tx_list,
+                    u_int64_t queue_id) throw (journal::jexception)
             {
                 recover(&_aio_rd_cmpl_dtok_list, &aio_rd_callback, &_aio_wr_cmpl_dtok_list,
-                    &aio_wr_callback, prep_tx_list);
+                    &aio_wr_callback, prep_tx_list, queue_id);
             }
         };
 

Modified: store/trunk/cpp/lib/jrnl/deq_rec.cpp
===================================================================
--- store/trunk/cpp/lib/jrnl/deq_rec.cpp	2007-10-11 21:09:42 UTC (rev 1011)
+++ store/trunk/cpp/lib/jrnl/deq_rec.cpp	2007-10-11 21:46:52 UTC (rev 1012)
@@ -319,11 +319,18 @@
 }
 
 const bool 
-deq_rec::rcv_decode(hdr h, std::ifstream* ifsp, u_int32_t& rec_offs_dblks) throw (jexception)
+deq_rec::rcv_decode(hdr h, std::ifstream* ifsp, size_t& rec_offs) throw (jexception)
 {
-    if (rec_offs_dblks) // Contunue decoding xid from previous decode call
+    if (rec_offs) // Continue decoding xid from previous decode call
     {
-        // TODO
+        ifsp->read((char*)_buff + rec_offs, _deq_hdr._xidsize - rec_offs);
+        size_t size_read = ifsp->gcount();
+        if (size_read < _deq_hdr._xidsize - rec_offs)
+        {
+            assert(ifsp->eof());
+            rec_offs += size_read;
+            return false;
+        }
     }
     else // Start at beginning of record
     {
@@ -347,24 +354,17 @@
             }
             // Decode xid
             ifsp->read((char*)_buff, _deq_hdr._xidsize);
-            if ((size_t)ifsp->gcount() == _deq_hdr._xidsize)
+            size_t size_read = ifsp->gcount();
+            if (size_read < _deq_hdr._xidsize)
             {
-                ifsp->ignore(rec_size_dblks() * JRNL_DBLK_SIZE - sizeof(_deq_hdr) -
-                        _deq_hdr._xidsize);
-                return true;
+                assert(ifsp->eof());
+                rec_offs = size_read;
+                return false;
             }
-            else
-                ; // TODO
         }
-        else
-        {
-            // Igore rest of record
-            rec_offs_dblks = rec_size_dblks();
-            ifsp->ignore(rec_offs_dblks * JRNL_DBLK_SIZE - sizeof(_deq_hdr));
-            return true;
-        }
     }
-    return false;
+    ifsp->ignore(rec_size_dblks() * JRNL_DBLK_SIZE - sizeof(_deq_hdr) - _deq_hdr._xidsize);
+    return true;
 }
 
 const size_t

Modified: store/trunk/cpp/lib/jrnl/deq_rec.hpp
===================================================================
--- store/trunk/cpp/lib/jrnl/deq_rec.hpp	2007-10-11 21:09:42 UTC (rev 1011)
+++ store/trunk/cpp/lib/jrnl/deq_rec.hpp	2007-10-11 21:46:52 UTC (rev 1012)
@@ -78,8 +78,8 @@
         const u_int32_t decode(hdr& h, void* rptr, u_int32_t rec_offs_dblks,
                 u_int32_t max_size_dblks) throw (jexception);
         // Decode used for recover
-        const bool rcv_decode(hdr h, std::ifstream* ifsp, u_int32_t& rec_offs_dblks)
-                throw (jexception);
+        const bool rcv_decode(hdr h, std::ifstream* ifsp, size_t& rec_offs) throw (jexception);
+
         inline const u_int64_t deq_rid() const { return _deq_hdr._deq_rid; }
         const size_t get_xid(void** const xidpp);
         std::string& str(std::string& str) const;

Modified: store/trunk/cpp/lib/jrnl/enq_rec.cpp
===================================================================
--- store/trunk/cpp/lib/jrnl/enq_rec.cpp	2007-10-11 21:09:42 UTC (rev 1011)
+++ store/trunk/cpp/lib/jrnl/enq_rec.cpp	2007-10-11 21:46:52 UTC (rev 1012)
@@ -428,11 +428,18 @@
 }
 
 const bool
-enq_rec::rcv_decode(hdr h, std::ifstream* ifsp, u_int32_t& rec_offs_dblks) throw (jexception)
+enq_rec::rcv_decode(hdr h, std::ifstream* ifsp, size_t& rec_offs) throw (jexception)
 {
-    if (rec_offs_dblks) // Contunue decoding xid from previous decode call
+    if (rec_offs) // Continue decoding xid from previous decode call
     {
-        // TODO
+        ifsp->read((char*)_buff + rec_offs, _enq_hdr._xidsize - rec_offs);
+        size_t size_read = ifsp->gcount();
+        if (size_read < _enq_hdr._xidsize - rec_offs)
+        {
+            assert(ifsp->eof());
+            rec_offs += size_read;
+            return false;
+        }
     }
     else // Start at beginning of record
     {
@@ -462,23 +469,17 @@
             }
             // Decode xid
             ifsp->read((char*)_buff, _enq_hdr._xidsize);
-            if ((size_t)ifsp->gcount() == _enq_hdr._xidsize)
+            size_t size_read = ifsp->gcount();
+            if (size_read < _enq_hdr._xidsize)
             {
-                ifsp->ignore(rec_size_dblks() * JRNL_DBLK_SIZE - sizeof(_enq_hdr) -
-                        _enq_hdr._xidsize);
-                return true;
+                assert(ifsp->eof());
+                rec_offs = size_read;
+                return false;
             }
-            else
-                ; // TODO
         }
-        else
-        {
-            // Igore rest of record
-            ifsp->ignore(rec_size_dblks() * JRNL_DBLK_SIZE - sizeof(_enq_hdr));
-            return true;
-        }
     }
-    return false;
+    ifsp->ignore(rec_size_dblks() * JRNL_DBLK_SIZE - sizeof(_enq_hdr) - _enq_hdr._xidsize);
+    return true;
 }
 
 const size_t

Modified: store/trunk/cpp/lib/jrnl/enq_rec.hpp
===================================================================
--- store/trunk/cpp/lib/jrnl/enq_rec.hpp	2007-10-11 21:09:42 UTC (rev 1011)
+++ store/trunk/cpp/lib/jrnl/enq_rec.hpp	2007-10-11 21:46:52 UTC (rev 1012)
@@ -89,8 +89,8 @@
         const u_int32_t decode(hdr& h, void* rptr, u_int32_t rec_offs_dblks,
                 u_int32_t max_size_dblks) throw (jexception);
         // Decode used for recover
-        const bool rcv_decode(hdr h, std::ifstream* ifsp, u_int32_t& rec_offs_dblks)
-                throw (jexception);
+        const bool rcv_decode(hdr h, std::ifstream* ifsp, size_t& rec_offs) throw (jexception);
+
         const size_t get_xid(void** const xidpp);
         const size_t get_data(void** const datapp);
         inline const bool is_transient() const { return _enq_hdr.is_transient(); }

Modified: store/trunk/cpp/lib/jrnl/jcntl.cpp
===================================================================
--- store/trunk/cpp/lib/jrnl/jcntl.cpp	2007-10-11 21:09:42 UTC (rev 1011)
+++ store/trunk/cpp/lib/jrnl/jcntl.cpp	2007-10-11 21:46:52 UTC (rev 1012)
@@ -255,9 +255,9 @@
 }
 
 const bool
-jcntl::is_txn_synced(const std::string& /*xid*/) throw (jexception)
+jcntl::is_txn_synced(const std::string& xid) throw (jexception)
 {
-    return RHM_IORES_NOTIMPL;
+    return _tmap.is_txn_synced(xid);
 }
 
 const u_int32_t
@@ -375,7 +375,7 @@
 jcntl::rcvr_get_next_record(u_int16_t& fid, std::ifstream* ifsp, rcvdat& rd,
         const std::vector<std::string>& prep_txn_list) throw (jexception)
 {
-    u_int32_t dblks_read = 0;
+    size_t cum_size_read = 0;
     bool done = false;
     void* xidp = NULL;
     hdr h;
@@ -389,7 +389,7 @@
                 enq_rec er;
                 while (!done)
                 {
-                    done = er.rcv_decode(h, ifsp, dblks_read);
+                    done = er.rcv_decode(h, ifsp, cum_size_read);
                     jfile_cycle(fid, ifsp, rd, false);
                 }
                 rd._enq_cnt_list[fid]++;
@@ -417,7 +417,7 @@
                 deq_rec dr;
                 while (!done)
                 {
-                    done = dr.rcv_decode(h, ifsp, dblks_read);
+                    done = dr.rcv_decode(h, ifsp, cum_size_read);
                     jfile_cycle(fid, ifsp, rd, false);
                 }
                 if (dr.xid_size())
@@ -458,7 +458,7 @@
                 txn_rec ar;
                 while (!done)
                 {
-                    done = ar.rcv_decode(h, ifsp, dblks_read);
+                    done = ar.rcv_decode(h, ifsp, cum_size_read);
                     jfile_cycle(fid, ifsp, rd, false);
                 }
                 // Delete this txn from tmap, unlock any locked records in emap
@@ -493,7 +493,7 @@
                 txn_rec cr;
                 while (!done)
                 {
-                    done = cr.rcv_decode(h, ifsp, dblks_read);
+                    done = cr.rcv_decode(h, ifsp, cum_size_read);
                     jfile_cycle(fid, ifsp, rd, false);
                 }
                 // Delete this txn from tmap, process records into emap

Modified: store/trunk/cpp/lib/jrnl/jerrno.cpp
===================================================================
--- store/trunk/cpp/lib/jrnl/jerrno.cpp	2007-10-11 21:09:42 UTC (rev 1011)
+++ store/trunk/cpp/lib/jrnl/jerrno.cpp	2007-10-11 21:46:52 UTC (rev 1012)
@@ -204,8 +204,8 @@
 
     // class enq_map, txn_map
     _err_map[JERR_MAP_DUPLICATE] = std::string("JERR_MAP_DUPLICATE: "
-            "Attempted to insert enqueue record using duplicate key.");
-    _err_map[JERR_MAP_NOTFOUND] = std::string("JERR_MAP_NOTFOUND: Key not found in enqueue map.");
+            "Attempted to insert record into map using duplicate key.");
+    _err_map[JERR_MAP_NOTFOUND] = std::string("JERR_MAP_NOTFOUND: Key not found in map.");
     _err_map[JERR_MAP_LOCKED] = std::string("JERR_MAP_LOCKED: "
             "Record ID locked by a pending transaction.");
 

Modified: store/trunk/cpp/lib/jrnl/jrec.hpp
===================================================================
--- store/trunk/cpp/lib/jrnl/jrec.hpp	2007-10-11 21:09:42 UTC (rev 1011)
+++ store/trunk/cpp/lib/jrnl/jrec.hpp	2007-10-11 21:46:52 UTC (rev 1012)
@@ -150,7 +150,7 @@
         virtual const u_int32_t decode(hdr& h, void* rptr, u_int32_t rec_offs_dblks,
                 u_int32_t max_size_dblks) throw (jexception) = 0;
 
-        virtual const bool rcv_decode(hdr h, std::ifstream* ifsp, u_int32_t& rec_offs_dblks)
+        virtual const bool rcv_decode(hdr h, std::ifstream* ifsp, size_t& rec_offs)
                 throw (jexception) = 0;
 
         virtual std::string& str(std::string& str) const = 0;

Modified: store/trunk/cpp/lib/jrnl/txn_map.cpp
===================================================================
--- store/trunk/cpp/lib/jrnl/txn_map.cpp	2007-10-11 21:09:42 UTC (rev 1011)
+++ store/trunk/cpp/lib/jrnl/txn_map.cpp	2007-10-11 21:46:52 UTC (rev 1012)
@@ -44,7 +44,8 @@
 txn_data_struct::txn_data_struct(const u_int64_t rid, const u_int16_t fid, const bool enq_flag):
         _rid(rid),
         _fid(fid),
-        _enq_flag(enq_flag)
+        _enq_flag(enq_flag),
+        _aio_compl(false)
 {}
 
 txn_map::txn_map():
@@ -58,21 +59,24 @@
     pthread_mutex_destroy(&_mutex);
 }
 
-void
+const bool
 txn_map::insert_txn_data(const std::string& xid, const txn_data& td) throw (jexception)
 {
+    bool ok = true;
     pthread_mutex_lock(&_mutex);
     xmap_itr itr = _map.find(xid);
-    pthread_mutex_unlock(&_mutex);
     if (itr == _map.end()) // not found in map
     {
         txn_data_list list;
         list.push_back(td);
         std::pair<xmap_itr, bool> ret = _map.insert(xmap_param(xid, list));
-        // TODO: check for failure here?
+        if (!ret.second) // duplicate
+            ok = false;
     }
     else
         itr->second.push_back(td);
+    pthread_mutex_unlock(&_mutex);
+    return ok;
 }
 
 const txn_data_list
@@ -85,7 +89,7 @@
     {
         std::stringstream ss;
         ss << std::hex << "xid=\"" << xid << "\"";
-        throw jexception(jerrno::JERR_MAP_NOTFOUND, ss.str(), "txn_map", "get_fid");
+        throw jexception(jerrno::JERR_MAP_NOTFOUND, ss.str(), "txn_map", "txn_data_list");
     }
     return itr->second;
 }
@@ -100,7 +104,7 @@
         pthread_mutex_unlock(&_mutex);
         std::stringstream ss;
         ss << std::hex << "xid=\"" << xid << "\"";
-        throw jexception(jerrno::JERR_MAP_NOTFOUND, ss.str(), "txn_map", "get_remove_fid");
+        throw jexception(jerrno::JERR_MAP_NOTFOUND, ss.str(), "txn_map", "get_remove_tdata_list");
     }
     txn_data_list list = itr->second;
     _map.erase(itr);
@@ -118,11 +122,69 @@
     {
         std::stringstream ss;
         ss << std::hex << "xid=\"" << xid << "\"";
-        throw jexception(jerrno::JERR_MAP_NOTFOUND, ss.str(), "txn_map", "get_fid");
+        throw jexception(jerrno::JERR_MAP_NOTFOUND, ss.str(), "txn_map", "get_rid_count");
     }
     return itr->second.size();
 }
 
+const bool
+txn_map::is_txn_synced(const std::string& xid) throw (jexception)
+{
+    pthread_mutex_lock(&_mutex);
+    xmap_itr itr = _map.find(xid);
+    if (itr == _map.end()) // not found in map
+    {
+        pthread_mutex_unlock(&_mutex);
+        std::stringstream ss;
+        ss << std::hex << "xid=\"" << xid << "\"";
+        throw jexception(jerrno::JERR_MAP_NOTFOUND, ss.str(), "txn_map", "is_txn_synced");
+    }
+    txn_data_list list = itr->second;
+    bool is_synced = true;
+    for (tdl_itr litr = list.begin(); litr < list.end(); litr++)
+    {
+        if (!litr->_aio_compl)
+        {
+            is_synced = false;
+            break;
+        }
+    }
+    pthread_mutex_unlock(&_mutex);
+    return is_synced;
+}
+
+const bool
+txn_map::set_aio_compl(const std::string& xid, const u_int64_t rid)
+{
+    bool ok = true;
+    bool found = false;
+    pthread_mutex_lock(&_mutex);
+    xmap_itr itr = _map.find(xid);
+    if (itr == _map.end()) // not found in map
+        ok = false;
+    else
+    {
+        txn_data_list list = itr->second;
+        for (tdl_itr litr = list.begin(); litr < list.end(); litr++)
+        {
+            if (litr->_rid == rid)
+            {
+                found = true;
+                litr->_aio_compl = true;
+                break;
+            }
+        }
+    }
+    pthread_mutex_unlock(&_mutex);
+    if (ok && !found)
+    {
+        std::stringstream ss;
+        ss << std::hex << "xid=\"" << xid << "\" rid=" << rid;
+        throw jexception(jerrno::JERR_MAP_NOTFOUND, ss.str(), "txn_map", "set_aio_compl");
+    }
+    return ok;
+}
+
 void
 txn_map::xid_list(std::vector<std::string>& xv)
 {

Modified: store/trunk/cpp/lib/jrnl/txn_map.hpp
===================================================================
--- store/trunk/cpp/lib/jrnl/txn_map.hpp	2007-10-11 21:09:42 UTC (rev 1011)
+++ store/trunk/cpp/lib/jrnl/txn_map.hpp	2007-10-11 21:46:52 UTC (rev 1012)
@@ -53,9 +53,10 @@
 
     struct txn_data_struct
     {
-        u_int64_t _rid;
-        u_int16_t _fid;
-        bool _enq_flag;
+        u_int64_t _rid;     ///< Record id for this operation
+        u_int16_t _fid;     ///< File id, to be used when transferring to emap on commit
+        bool _enq_flag;     ///< If true, enq op, otherwise deq op
+        bool _aio_compl;    ///< Initially false, set to true when AIO returns
         txn_data_struct(const u_int64_t rid, const u_int16_t fid, const bool enq_flag);
     };
     typedef txn_data_struct txn_data;
@@ -76,10 +77,12 @@
         txn_map();
         ~txn_map();
 
-        void insert_txn_data(const std::string& xid, const txn_data& td) throw (jexception);
+        const bool insert_txn_data(const std::string& xid, const txn_data& td) throw (jexception);
         const txn_data_list get_tdata_list(const std::string& xid) throw (jexception);
         const txn_data_list get_remove_tdata_list(const std::string& xid) throw (jexception);
         const u_int32_t get_rid_count(const std::string& xid) throw (jexception);
+        const bool is_txn_synced(const std::string& xid) throw (jexception);
+        const bool set_aio_compl(const std::string& xid, const u_int64_t rid);
         inline void clear() { _map.clear(); }
         inline const bool empty() const { return _map.empty(); }
         inline const u_int16_t size() const { return (u_int16_t)_map.size(); }

Modified: store/trunk/cpp/lib/jrnl/txn_rec.cpp
===================================================================
--- store/trunk/cpp/lib/jrnl/txn_rec.cpp	2007-10-11 21:09:42 UTC (rev 1011)
+++ store/trunk/cpp/lib/jrnl/txn_rec.cpp	2007-10-11 21:46:52 UTC (rev 1012)
@@ -314,11 +314,18 @@
 }
 
 const bool 
-txn_rec::rcv_decode(hdr h, std::ifstream* ifsp, u_int32_t& rec_offs_dblks) throw (jexception)
+txn_rec::rcv_decode(hdr h, std::ifstream* ifsp, size_t& rec_offs) throw (jexception)
 {
-    if (rec_offs_dblks) // Contunue decoding xid from previous decode call
+    if (rec_offs) // Continue decoding xid from previous decode call
     {
-        // TODO
+        ifsp->read((char*)_buff + rec_offs, _txn_hdr._xidsize - rec_offs);
+        size_t size_read = ifsp->gcount();
+        if (size_read < _txn_hdr._xidsize - rec_offs)
+        {
+            assert(ifsp->eof());
+            rec_offs += size_read;
+            return false;
+        }
     }
     else // Start at beginning of record
     {
@@ -339,15 +346,16 @@
         }
         // Decode xid
         ifsp->read((char*)_buff, _txn_hdr._xidsize);
-        if ((size_t)ifsp->gcount() == _txn_hdr._xidsize)
+        size_t size_read = ifsp->gcount();
+        if (size_read < _txn_hdr._xidsize)
         {
-            ifsp->ignore(rec_size_dblks() * JRNL_DBLK_SIZE - sizeof(_txn_hdr) - _txn_hdr._xidsize);
-            return true;
+            assert(ifsp->eof());
+            rec_offs = size_read;
+            return false;
         }
-        else
-            ; // TODO
     }
-    return false;
+    ifsp->ignore(rec_size_dblks() * JRNL_DBLK_SIZE - sizeof(_txn_hdr) - _txn_hdr._xidsize);
+    return true;
 }
 
 const size_t

Modified: store/trunk/cpp/lib/jrnl/txn_rec.hpp
===================================================================
--- store/trunk/cpp/lib/jrnl/txn_rec.hpp	2007-10-11 21:09:42 UTC (rev 1011)
+++ store/trunk/cpp/lib/jrnl/txn_rec.hpp	2007-10-11 21:46:52 UTC (rev 1012)
@@ -78,8 +78,8 @@
         const u_int32_t decode(hdr& h, void* rptr, u_int32_t rec_offs_dblks,
                 u_int32_t max_size_dblks) throw (jexception);
         // Decode used for recover
-        const bool rcv_decode(hdr h, std::ifstream* ifsp, u_int32_t& rec_offs_dblks)
-                throw (jexception);
+        const bool rcv_decode(hdr h, std::ifstream* ifsp, size_t& rec_offs) throw (jexception);
+
         const size_t get_xid(void** const xidpp);
         std::string& str(std::string& str) const;
         inline const size_t data_size() const { return 0; } // This record never carries data

Modified: store/trunk/cpp/lib/jrnl/wmgr.cpp
===================================================================
--- store/trunk/cpp/lib/jrnl/wmgr.cpp	2007-10-11 21:09:42 UTC (rev 1011)
+++ store/trunk/cpp/lib/jrnl/wmgr.cpp	2007-10-11 21:46:52 UTC (rev 1012)
@@ -123,7 +123,6 @@
         _enq_busy = true;
 
     u_int64_t rid = initialize_rid(cont, dtokp);
-    
     _enq_rec.reset(rid, data_buff, tot_data_len, xid_ptr, xid_len, transient);
     if (!cont)
     {
@@ -262,8 +261,14 @@
     }
 
     u_int64_t rid = initialize_rid(cont, dtokp);
-    
     _deq_rec.reset(rid, dtokp->rid(), xid_ptr, xid_len);
+    if (!cont)
+    {
+        if (xid_len)
+            dtokp->set_xid(xid_ptr, xid_len);
+        else
+            dtokp->clear_xid();
+    }
     bool done = false;
     while (!done)
     {
@@ -392,6 +397,14 @@
 
     u_int64_t rid = initialize_rid(cont, dtokp);
     _txn_rec.reset(RHM_JDAT_TXA_MAGIC, rid, xid_ptr, xid_len);
+    if (!cont)
+    {
+        dtokp->set_rid(rid);
+        if (xid_len)
+            dtokp->set_xid(xid_ptr, xid_len);
+        else
+            dtokp->clear_xid();
+    }
     bool done = false;
     while (!done)
     {
@@ -516,6 +529,14 @@
 
     u_int64_t rid = initialize_rid(cont, dtokp);
     _txn_rec.reset(RHM_JDAT_TXC_MAGIC, rid, xid_ptr, xid_len);
+    if (!cont)
+    {
+        dtokp->set_rid(rid);
+        if (xid_len)
+            dtokp->set_xid(xid_ptr, xid_len);
+        else
+            dtokp->clear_xid();
+    }
     bool done = false;
     while (!done)
     {
@@ -750,6 +771,8 @@
                     throw jexception(jerrno::JERR_WMGR_BADDTOKSTATE, ss.str(), "wmgr",
                             "get_events");
                 }
+                if (dtp->has_xid())
+                    _tmap.set_aio_compl(dtp->xid(), dtp->rid());
                 _dtokl->push_back(dtp);
             }
             tot_data_toks += s;
@@ -766,6 +789,7 @@
         }
         else // File header writes have no pcb
         {
+            // get fid from original file header record, update pointers for that fid
             file_hdr* fhp = (file_hdr*)iocbp->u.c.buf;
             u_int32_t fid = fhp->_fid;
             nlfh* nlfhp = _wrfc.file_handle(fid);




More information about the rhmessaging-commits mailing list