[rhmessaging-commits] rhmessaging commits: r1021 - store/trunk/cpp/lib/jrnl.

rhmessaging-commits at lists.jboss.org rhmessaging-commits at lists.jboss.org
Fri Oct 12 11:08:10 EDT 2007


Author: kpvdr
Date: 2007-10-12 11:08:10 -0400 (Fri, 12 Oct 2007)
New Revision: 1021

Modified:
   store/trunk/cpp/lib/jrnl/enq_map.cpp
   store/trunk/cpp/lib/jrnl/enq_map.hpp
   store/trunk/cpp/lib/jrnl/jcntl.cpp
   store/trunk/cpp/lib/jrnl/wmgr.cpp
   store/trunk/cpp/lib/jrnl/wmgr.hpp
Log:
Corrected is_txn_synced() to take into account the commit/abort record as well as the enqueues and dequeues

Modified: store/trunk/cpp/lib/jrnl/enq_map.cpp
===================================================================
--- store/trunk/cpp/lib/jrnl/enq_map.cpp	2007-10-12 14:07:24 UTC (rev 1020)
+++ store/trunk/cpp/lib/jrnl/enq_map.cpp	2007-10-12 15:08:10 UTC (rev 1021)
@@ -59,7 +59,7 @@
 }
 
 void
-enq_map::insert_fid(const u_int64_t rid, const u_int16_t fid, bool locked) throw (jexception)
+enq_map::insert_fid(const u_int64_t rid, const u_int16_t fid, const bool locked) throw (jexception)
 {
     fid_lock_pair rec(fid, locked);
     pthread_mutex_lock(&_mutex);
@@ -95,7 +95,7 @@
 }
 
 const u_int16_t
-enq_map::get_remove_fid(const u_int64_t rid, const bool tx_flag) throw (jexception)
+enq_map::get_remove_fid(const u_int64_t rid, const bool txn_flag) throw (jexception)
 {
     pthread_mutex_lock(&_mutex);
     emap_itr itr = _map.find(rid);
@@ -106,7 +106,7 @@
         ss << std::hex << "rid=0x" << rid;
         throw jexception(jerrno::JERR_MAP_NOTFOUND, ss.str(), "enq_map", "get_remove_fid");
     }
-    if (itr->second.second && !tx_flag) // locked
+    if (itr->second.second && !txn_flag) // locked, but not a commit/abort
     {
         pthread_mutex_unlock(&_mutex);
         std::stringstream ss;

Modified: store/trunk/cpp/lib/jrnl/enq_map.hpp
===================================================================
--- store/trunk/cpp/lib/jrnl/enq_map.hpp	2007-10-12 14:07:24 UTC (rev 1020)
+++ store/trunk/cpp/lib/jrnl/enq_map.hpp	2007-10-12 15:08:10 UTC (rev 1021)
@@ -72,9 +72,11 @@
         ~enq_map();
 
         void insert_fid(const u_int64_t rid, const u_int16_t fid) throw (jexception);
-        void insert_fid(const u_int64_t rid, const u_int16_t fid, bool locked) throw (jexception);
+        void insert_fid(const u_int64_t rid, const u_int16_t fid, const bool locked)
+                throw (jexception);
         const u_int16_t get_fid(const u_int64_t rid) throw (jexception);
-        const u_int16_t get_remove_fid(const u_int64_t rid, const bool tx_flag=false) throw (jexception);
+        const u_int16_t get_remove_fid(const u_int64_t rid, const bool txn_flag = false)
+                throw (jexception);
         void lock(const u_int64_t rid) throw (jexception);
         void unlock(const u_int64_t rid) throw (jexception);
         const bool is_locked(const u_int64_t rid) throw (jexception);

Modified: store/trunk/cpp/lib/jrnl/jcntl.cpp
===================================================================
--- store/trunk/cpp/lib/jrnl/jcntl.cpp	2007-10-12 14:07:24 UTC (rev 1020)
+++ store/trunk/cpp/lib/jrnl/jcntl.cpp	2007-10-12 15:08:10 UTC (rev 1021)
@@ -257,7 +257,7 @@
 const bool
 jcntl::is_txn_synced(const std::string& xid) throw (jexception)
 {
-    return _tmap.is_txn_synced(xid);
+    return _wmgr.is_txn_synced(xid);
 }
 
 const u_int32_t

Modified: store/trunk/cpp/lib/jrnl/wmgr.cpp
===================================================================
--- store/trunk/cpp/lib/jrnl/wmgr.cpp	2007-10-12 14:07:24 UTC (rev 1020)
+++ store/trunk/cpp/lib/jrnl/wmgr.cpp	2007-10-12 15:08:10 UTC (rev 1021)
@@ -54,7 +54,8 @@
         _enq_busy(false),
         _deq_busy(false),
         _abort_busy(false),
-        _commit_busy(false)
+        _commit_busy(false),
+        _txn_pending_set()
 {}
 
 wmgr::wmgr(jcntl* jc, enq_map& emap, txn_map& tmap, wrfc& wrfc, std::deque<data_tok*>* const dtokl,
@@ -70,7 +71,8 @@
         _enq_busy(false),
         _deq_busy(false),
         _abort_busy(false),
-        _commit_busy(false)
+        _commit_busy(false),
+        _txn_pending_set()
 {}
 
 wmgr::~wmgr()
@@ -438,6 +440,13 @@
                 if (itr->_enq_flag)
                     _wrfc.decr_enqcnt(itr->_fid);
             }
+            std::pair<std::set<std::string>::iterator, bool> res = _txn_pending_set.insert(xid);
+            if (!res.second)
+            {
+                std::stringstream ss;
+                ss << std::hex << "_txn_pending_set: xid=\"" << xid << "\"";
+                throw jexception(jerrno::JERR_MAP_DUPLICATE, ss.str(), "wmgr", "abort");
+            }
 #endif
 
             done = true;
@@ -573,6 +582,13 @@
                     _wrfc.decr_enqcnt(fid);
                 }
             }
+            std::pair<std::set<std::string>::iterator, bool> res = _txn_pending_set.insert(xid);
+            if (!res.second)
+            {
+                std::stringstream ss;
+                ss << std::hex << "_txn_pending_set: xid=\"" << xid << "\"";
+                throw jexception(jerrno::JERR_MAP_DUPLICATE, ss.str(), "wmgr", "commit");
+            }
 #endif
 
             done = true;
@@ -747,23 +763,44 @@
             for (u_int32_t k=0; k<s; k++)
             {
                 data_tok* dtp = pcbp->_pdtokl->at(k);
+                std::set<std::string>::iterator it;
                 switch (dtp->wstate())
                 {
                 case data_tok::ENQ_SUBM:
                     assert(dtp->wstate() == data_tok::ENQ_SUBM);
                     dtp->set_wstate(data_tok::ENQ);
+                    if (dtp->has_xid())
+                        _tmap.set_aio_compl(dtp->xid(), dtp->rid());
                     break;
                 case data_tok::DEQ_SUBM:
                     assert(dtp->wstate() == data_tok::DEQ_SUBM);
                     dtp->set_wstate(data_tok::DEQ);
+                    if (dtp->has_xid())
+                        _tmap.set_aio_compl(dtp->xid(), dtp->rid());
                     break;
                 case data_tok::ABORT_SUBM:
                     assert(dtp->wstate() == data_tok::ABORT_SUBM);
                     dtp->set_wstate(data_tok::ABORTED);
+                    it = _txn_pending_set.find(dtp->xid());
+                    if (it == _txn_pending_set.end())
+                    {
+                        std::stringstream ss;
+                        ss << std::hex << "_txn_pending_set: abort xid=\"" << dtp->xid() << "\"";
+                        throw jexception(jerrno::JERR_MAP_NOTFOUND, ss.str(), "wmgr", "get_events");
+                    }
+                    _txn_pending_set.erase(it);
                     break;
                 case data_tok::COMMIT_SUBM:
                     assert(dtp->wstate() == data_tok::COMMIT_SUBM);
                     dtp->set_wstate(data_tok::COMMITTED);
+                    it = _txn_pending_set.find(dtp->xid());
+                    if (it == _txn_pending_set.end())
+                    {
+                        std::stringstream ss;
+                        ss << std::hex << "_txn_pending_set: commit xid=\"" << dtp->xid() << "\"";
+                        throw jexception(jerrno::JERR_MAP_NOTFOUND, ss.str(), "wmgr", "get_events");
+                    }
+                    _txn_pending_set.erase(it);
                     break;
                 default:
                     std::stringstream ss;
@@ -771,8 +808,6 @@
                     throw jexception(jerrno::JERR_WMGR_BADDTOKSTATE, ss.str(), "wmgr",
                             "get_events");
                 }
-                if (dtp->has_xid())
-                    _tmap.set_aio_compl(dtp->xid(), dtp->rid());
                 _dtokl->push_back(dtp);
             }
             tot_data_toks += s;
@@ -805,6 +840,24 @@
     return tot_data_toks;
 }
 
+const bool
+wmgr::is_txn_synced(const std::string& xid) throw (jexception)
+{
+    bool is_synced = true;
+    // Check for outstanding enqueues/dequeues
+    try { is_synced = _tmap.is_txn_synced(xid); }
+    catch (jexception e)
+    {
+        if (e.err_code() != jerrno::JERR_MAP_NOTFOUND)
+            throw e;
+    }
+    if (!is_synced)
+        return false;
+    // Check for outstanding commit/aborts
+    std::set<std::string>::iterator it = _txn_pending_set.find(xid);
+    return it != _txn_pending_set.end();
+}
+
 void
 wmgr::initialize() throw (jexception)
 {

Modified: store/trunk/cpp/lib/jrnl/wmgr.hpp
===================================================================
--- store/trunk/cpp/lib/jrnl/wmgr.hpp	2007-10-12 14:07:24 UTC (rev 1020)
+++ store/trunk/cpp/lib/jrnl/wmgr.hpp	2007-10-12 15:08:10 UTC (rev 1021)
@@ -43,6 +43,7 @@
 
 #include <jrnl/pmgr.hpp>
 #include <jrnl/wrfc.hpp>
+#include <set>
 
 namespace rhm
 {
@@ -90,6 +91,7 @@
         enq_rec _enq_rec;               ///< Enqueue record used for encoding/decoding
         deq_rec _deq_rec;               ///< Dequeue record used for encoding/decoding
         txn_rec _txn_rec;               ///< Transaction record used for encoding/decoding
+        std::set<std::string> _txn_pending_set; ///< Set containing xids of pending commits/aborts
         aio_cb _cb;                     ///< Callback function pointer for AIO events
 
     public:
@@ -112,6 +114,7 @@
                 throw (jexception);
         const iores flush();
         const u_int32_t get_events(page_state state) throw (jexception);
+        const bool is_txn_synced(const std::string& xid) throw (jexception);
 
     private:
         void initialize() throw (jexception);




More information about the rhmessaging-commits mailing list