[rhmessaging-commits] rhmessaging commits: r4027 - in store/trunk/cpp/lib: jrnl and 1 other directory.

rhmessaging-commits at lists.jboss.org rhmessaging-commits at lists.jboss.org
Tue Jun 15 14:47:09 EDT 2010


Author: kpvdr
Date: 2010-06-15 14:47:09 -0400 (Tue, 15 Jun 2010)
New Revision: 4027

Modified:
   store/trunk/cpp/lib/JournalImpl.cpp
   store/trunk/cpp/lib/MessageStoreImpl.cpp
   store/trunk/cpp/lib/jrnl/jcntl.cpp
   store/trunk/cpp/lib/jrnl/txn_map.cpp
   store/trunk/cpp/lib/jrnl/txn_map.hpp
   store/trunk/cpp/lib/jrnl/wmgr.cpp
Log:
Refactor to remove exceptions from tmap execution path

Modified: store/trunk/cpp/lib/JournalImpl.cpp
===================================================================
--- store/trunk/cpp/lib/JournalImpl.cpp	2010-06-14 21:51:43 UTC (rev 4026)
+++ store/trunk/cpp/lib/JournalImpl.cpp	2010-06-15 18:47:09 UTC (rev 4027)
@@ -1,5 +1,5 @@
 /*
- Copyright (c) 2007, 2008, 2009 Red Hat, Inc.
+ Copyright (c) 2007, 2008, 2009, 2010 Red Hat, Inc.
 
  This file is part of the Qpid async store library msgstore.so.
 
@@ -216,21 +216,14 @@
     if (prep_tx_list_ptr)
     {
         for (msgstore::PreparedTransaction::list::iterator i = prep_tx_list_ptr->begin(); i != prep_tx_list_ptr->end(); i++) {
-            try {
-                txn_data_list tdl = _tmap.get_tdata_list(i->xid);
-                assert(tdl.size()); // should never be empty
-                for (tdl_itr tdl_itr = tdl.begin(); tdl_itr < tdl.end(); tdl_itr++) {
-                    if (tdl_itr->_enq_flag) { // enqueue op
-                        i->enqueues->add(queue_id, tdl_itr->_rid);
-                    } else { // dequeue op
-                        i->dequeues->add(queue_id, tdl_itr->_drid);
-                    }
+            txn_data_list tdl = _tmap.get_tdata_list(i->xid); // tdl will be empty if xid not found
+            for (tdl_itr tdl_itr = tdl.begin(); tdl_itr < tdl.end(); tdl_itr++) {
+                if (tdl_itr->_enq_flag) { // enqueue op
+                    i->enqueues->add(queue_id, tdl_itr->_rid);
+                } else { // dequeue op
+                    i->dequeues->add(queue_id, tdl_itr->_drid);
                 }
             }
-            catch (const jexception& e) {
-                if (e.err_code() != jerrno::JERR_MAP_NOTFOUND)
-                    throw;
-            }
         }
     }
     std::ostringstream oss2;

Modified: store/trunk/cpp/lib/MessageStoreImpl.cpp
===================================================================
--- store/trunk/cpp/lib/MessageStoreImpl.cpp	2010-06-14 21:51:43 UTC (rev 4026)
+++ store/trunk/cpp/lib/MessageStoreImpl.cpp	2010-06-15 18:47:09 UTC (rev 4027)
@@ -1,5 +1,5 @@
 /*
- Copyright (c) 2007, 2008, 2009 Red Hat, Inc.
+ Copyright (c) 2007, 2008, 2009, 2010 Red Hat, Inc.
 
  This file is part of the Qpid async store library msgstore.so.
 
@@ -971,7 +971,7 @@
                         } else {
                             // Enqueue and/or dequeue tx
                             journal::txn_map& tmap = jc->get_txn_map();
-                            journal::txn_data_list txnList = tmap.get_tdata_list(xid);
+                            journal::txn_data_list txnList = tmap.get_tdata_list(xid); // txnList will be empty if xid not found
                             bool enq = false;
                             bool deq = false;
                             for (journal::tdl_itr j = txnList.begin(); j<txnList.end(); j++) {
@@ -1081,13 +1081,8 @@
                 bool is2PC = *(static_cast<char*>(dbuff)) != 0;
 
                 // Check transaction details; add to recover map
-                // NOTE: There is a small but finite probability that the xid read above may have been removed by
-                // another thread on one of the active queues by the time the get_tdata_list() call below is made.
-                // Since reading the TPL is not considered a high-speed operation and is used for recovery and other
-                // infrequent uses, the following try-catch will work as well as attempting to lock down the
-                // entire transaction map for this operation - but with less complexity.
-                try {
-                    journal::txn_data_list txnList = tmap.get_tdata_list(xid);
+                journal::txn_data_list txnList = tmap.get_tdata_list(xid); //  txnList will be empty if xid not found
+                if (!txnList.empty()) { // xid found in tmap
                     unsigned enqCnt = 0;
                     unsigned deqCnt = 0;
                     u_int64_t rid = 0;
@@ -1109,12 +1104,6 @@
                     assert(deqCnt <= 1);
                     tplRecoverMap.insert(TplRecoverMapPair(xid, TplRecoverStruct(rid, deqCnt == 1, commitFlag, is2PC)));
                 }
-                catch (const journal::jexception& e) {
-                    ::free(xidbuff);
-                    aio_sleep_cnt = 0;
-                    if (e.err_code() == journal::jerrno::JERR_MAP_NOTFOUND) break; // ignore this xid; move on
-                    throw;
-                }
 
                 ::free(xidbuff);
                 aio_sleep_cnt = 0;

Modified: store/trunk/cpp/lib/jrnl/jcntl.cpp
===================================================================
--- store/trunk/cpp/lib/jrnl/jcntl.cpp	2010-06-14 21:51:43 UTC (rev 4026)
+++ store/trunk/cpp/lib/jrnl/jcntl.cpp	2010-06-15 18:47:09 UTC (rev 4027)
@@ -8,7 +8,7 @@
  *
  * \author Kim van der Riet
  *
- * Copyright (c) 2007, 2008, 2009 Red Hat, Inc.
+ * Copyright (c) 2007, 2008, 2009, 2010 Red Hat, Inc.
  *
  * This file is part of the Qpid async store library msgstore.so.
  *
@@ -623,7 +623,7 @@
                         std::find(prep_txn_list_ptr->begin(), prep_txn_list_ptr->end(), *itr);
                 if (pitr == prep_txn_list_ptr->end()) // not found in prepared list
                 {
-                    txn_data_list tdl = _tmap.get_remove_tdata_list(*itr);
+                    txn_data_list tdl = _tmap.get_remove_tdata_list(*itr); // tdl will be empty if xid not found
                     // Unlock any affected enqueues in emap
                     for (tdl_itr i=tdl.begin(); i<tdl.end(); i++)
                     {
@@ -691,7 +691,13 @@
                         assert(xidp != 0);
                         std::string xid((char*)xidp, er.xid_size());
                         _tmap.insert_txn_data(xid, txn_data(h._rid, 0, start_fid, true));
-                        _tmap.set_aio_compl(xid, h._rid);
+                        if (_tmap.set_aio_compl(xid, h._rid)) // xid or rid not found
+                        {
+                            std::ostringstream oss;
+                            oss << std::hex << "_tmap.set_aio_compl: txn_enq xid=\"" << xid;
+                            oss << "\" rid=0x" << h._rid;
+                            throw jexception(jerrno::JERR_MAP_NOTFOUND, oss.str(), "jcntl", "rcvr_get_next_record");
+                        }
                         std::free(xidp);
                     }
                     else
@@ -718,7 +724,13 @@
                     std::string xid((char*)xidp, dr.xid_size());
                     _tmap.insert_txn_data(xid, txn_data(dr.rid(), dr.deq_rid(), start_fid, false,
                             dr.is_txn_coml_commit()));
-                    _tmap.set_aio_compl(xid, dr.rid());
+                    if (_tmap.set_aio_compl(xid, dr.rid())) // xid or rid not found
+                    {
+                        std::ostringstream oss;
+                        oss << std::hex << "_tmap.set_aio_compl: txn_deq xid=\"" << xid;
+                        oss << "\" rid=0x" << dr.rid();
+                        throw jexception(jerrno::JERR_MAP_NOTFOUND, oss.str(), "jcntl", "rcvr_get_next_record");
+                    }
                     std::free(xidp);
                 }
                 else
@@ -746,7 +758,7 @@
                 std::string xid((char*)xidp, ar.xid_size());
                 try
                 {
-                    txn_data_list tdl = _tmap.get_remove_tdata_list(xid);
+                    txn_data_list tdl = _tmap.get_remove_tdata_list(xid); // tdl will be empty if xid not found
                     for (tdl_itr itr = tdl.begin(); itr != tdl.end(); itr++)
                     {
                         if (itr->_enq_flag)
@@ -779,7 +791,7 @@
                 std::string xid((char*)xidp, cr.xid_size());
                 try
                 {
-                    txn_data_list tdl = _tmap.get_remove_tdata_list(xid);
+                    txn_data_list tdl = _tmap.get_remove_tdata_list(xid); // tdl will be empty if xid not found
                     for (tdl_itr itr = tdl.begin(); itr != tdl.end(); itr++)
                     {
                         if (itr->_enq_flag) // txn enqueue

Modified: store/trunk/cpp/lib/jrnl/txn_map.cpp
===================================================================
--- store/trunk/cpp/lib/jrnl/txn_map.cpp	2010-06-14 21:51:43 UTC (rev 4026)
+++ store/trunk/cpp/lib/jrnl/txn_map.cpp	2010-06-15 18:47:09 UTC (rev 4027)
@@ -8,7 +8,7 @@
  *
  * \author Kim van der Riet
  *
- * Copyright (c) 2007, 2008, 2009 Red Hat Inc.
+ * Copyright (c) 2007, 2008, 2009, 2010 Red Hat Inc.
  *
  * This file is part of the Qpid async store library msgstore.so.
  *
@@ -66,6 +66,12 @@
     _pfid_txn_cnt.set_size(num_jfiles);
 }
 
+u_int32_t
+txn_map::get_txn_pfid_cnt(const u_int16_t pfid) const
+{
+    return _pfid_txn_cnt.cnt(pfid); // forwards to the arr_cnt member; out-of-line copy of the former inline header definition
+}
+
 bool
 txn_map::insert_txn_data(const std::string& xid, const txn_data& td)
 {
@@ -98,11 +104,7 @@
 {
     xmap_itr itr = _map.find(xid);
     if (itr == _map.end()) // not found in map
-    {
-        std::ostringstream oss;
-        oss << std::hex << "xid=" << xid_format(xid);
-        throw jexception(jerrno::JERR_MAP_NOTFOUND, oss.str(), "txn_map", "get_tdata_list_nolock");
-    }
+        return _empty_data_list;
     return itr->second;
 }
 
@@ -112,11 +114,7 @@
     slock s(_mutex);
     xmap_itr itr = _map.find(xid);
     if (itr == _map.end()) // not found in map
-    {
-        std::ostringstream oss;
-        oss << std::hex << "xid=" << xid_format(xid);
-        throw jexception(jerrno::JERR_MAP_NOTFOUND, oss.str(), "txn_map", "get_remove_tdata_list");
-    }
+        return _empty_data_list;
     txn_data_list list = itr->second;
     _map.erase(itr);
     for (tdl_itr i=list.begin(); i!=list.end(); i++)
@@ -129,26 +127,22 @@
 {
     slock s(_mutex);
     xmap_itr itr= _map.find(xid);
-    if (itr == _map.end()) // not found in map
-        return false;
-    return true;
+    return itr != _map.end();
 }
 
 u_int32_t
-txn_map::get_rid_count(const std::string& xid)
+txn_map::enq_cnt()
 {
-    slock s(_mutex);
-    xmap_itr itr = _map.find(xid);
-    if (itr == _map.end()) // not found in map
-    {
-        std::ostringstream oss;
-        oss << std::hex << "xid=" << xid_format(xid);
-        throw jexception(jerrno::JERR_MAP_NOTFOUND, oss.str(), "txn_map", "get_rid_count");
-    }
-    return itr->second.size();
+    return cnt(true); // true selects enqueue records (_enq_flag set); cnt() body is not fully shown in this diff - TODO confirm
 }
 
 u_int32_t
+txn_map::deq_cnt()
+{
+    return cnt(false); // dequeue records have _enq_flag == false; cnt(true) would just duplicate enq_cnt()
+}
+
+u_int32_t
 txn_map::cnt(const bool enq_flag)
 {
     slock s(_mutex);
@@ -164,33 +158,13 @@
     return c;
 }
 
-u_int32_t
-txn_map::cnt(const std::string& xid, const bool enq_flag)
-{
-    slock s(_mutex);
-    u_int32_t c = 0;
-    xmap_itr i = _map.find(xid);
-    if (i == _map.end()) // not found in map
-        return 0;
-    for (tdl_itr j = i->second.begin(); j < i->second.end(); j++)
-    {
-        if (j->_enq_flag == enq_flag)
-            c++;
-    }
-    return c;
-}
-
-bool
+int8_t
 txn_map::is_txn_synced(const std::string& xid)
 {
     slock s(_mutex);
     xmap_itr itr = _map.find(xid);
     if (itr == _map.end()) // not found in map
-    {
-        std::ostringstream oss;
-        oss << std::hex << "xid=" << xid_format(xid);
-        throw jexception(jerrno::JERR_MAP_NOTFOUND, oss.str(), "txn_map", "is_txn_synced");
-    }
+        return -1;
     bool is_synced = true;
     for (tdl_itr litr = itr->second.begin(); litr < itr->second.end(); litr++)
     {
@@ -200,43 +174,30 @@
             break;
         }
     }
-    return is_synced;
+    return is_synced ? 1 : 0;
 }
 
-bool
+int8_t // 0 = _aio_compl flag set; -1 = xid not found; -2 = xid found but rid not found
 txn_map::set_aio_compl(const std::string& xid, const u_int64_t rid)
 {
-    bool ok = true;
-    bool found = false;
+    slock s(_mutex); // lock object now declared at function scope instead of inside an inner block
+    xmap_itr itr = _map.find(xid);
+    if (itr == _map.end()) // xid not found in map
+        return -1; // reported by return code; nothing is thrown
+    for (tdl_itr litr = itr->second.begin(); litr < itr->second.end(); litr++)
     {
-        slock s(_mutex);
-        xmap_itr itr = _map.find(xid);
-        if (itr == _map.end()) // not found in map
-            ok = false;
-        else
+        if (litr->_rid == rid)
         {
-            for (tdl_itr litr = itr->second.begin(); litr < itr->second.end(); litr++)
-            {
-                if (litr->_rid == rid)
-                {
-                    found = true;
-                    litr->_aio_compl = true;
-                    break;
-                }
-            }
+            litr->_aio_compl = true;
+            return 0; // rid found
         }
     }
-    if (ok && !found)
-    {
-        std::ostringstream oss;
-        oss << std::hex << "xid=" << xid_format(xid) << " rid=0x" << rid;
-        throw jexception(jerrno::JERR_MAP_NOTFOUND, oss.str(), "txn_map", "set_aio_compl");
-    }
-    return ok;
+    // xid present, but rid not found
+    return -2; // replaces the jexception(JERR_MAP_NOTFOUND) the old code threw for this case
 }
 
-const txn_data&
-txn_map::get_data(const std::string& xid, const u_int64_t rid)
+bool
+txn_map::data_exists(const std::string& xid, const u_int64_t rid)
 {
     bool found = false;
     {
@@ -248,14 +209,8 @@
             found = itr->_rid == rid;
             itr++;
         }
-        if (!found)
-        {
-            std::ostringstream oss;
-            oss << std::hex << "xid=" << xid_format(xid) << " rid=0x" << rid;
-            throw jexception(jerrno::JERR_MAP_NOTFOUND, oss.str(), "txn_map", "get_data");
-        }
-        return *itr;
     }
+    return found;
 }
 
 bool
@@ -290,17 +245,5 @@
     }
 }
 
-// static fn
-std::string
-txn_map::xid_format(const std::string& xid)
-{
-    if (xid.size() < 100)
-        return xid;
-    std::ostringstream oss;
-    oss << "\"" << xid.substr(0, 20) << " ... " << xid.substr(xid.size() - 20, 20);
-    oss << "\" [size: " << xid.size() << "]";
-    return oss.str();
-}
-
 } // namespace journal
 } // namespace mrg

Modified: store/trunk/cpp/lib/jrnl/txn_map.hpp
===================================================================
--- store/trunk/cpp/lib/jrnl/txn_map.hpp	2010-06-14 21:51:43 UTC (rev 4026)
+++ store/trunk/cpp/lib/jrnl/txn_map.hpp	2010-06-15 18:47:09 UTC (rev 4027)
@@ -8,7 +8,7 @@
  *
  * \author Kim van der Riet
  *
- * Copyright (c) 2007, 2008, 2009 Red Hat Inc.
+ * Copyright (c) 2007, 2008, 2009, 2010 Red Hat Inc.
  *
  * This file is part of the Qpid async store library msgstore.so.
  *
@@ -119,37 +119,30 @@
         xmap _map;
         smutex _mutex;
         arr_cnt _pfid_txn_cnt;
+        const txn_data_list _empty_data_list;
 
     public:
         txn_map();
         virtual ~txn_map();
 
         void set_num_jfiles(const u_int16_t num_jfiles);
-        inline u_int32_t get_txn_pfid_cnt(const u_int16_t pfid) const
-                { return _pfid_txn_cnt.cnt(pfid); };
-
+        u_int32_t get_txn_pfid_cnt(const u_int16_t pfid) const;
         bool insert_txn_data(const std::string& xid, const txn_data& td);
         const txn_data_list get_tdata_list(const std::string& xid);
         const txn_data_list get_remove_tdata_list(const std::string& xid);
         bool in_map(const std::string& xid);
-        u_int32_t get_rid_count(const std::string& xid);
-        inline u_int32_t enq_cnt() { return cnt(true); }
-        inline u_int32_t enq_cnt(const std::string& xid) { return cnt(xid, true); }
-        inline u_int32_t deq_cnt() { return cnt(true); }
-        inline u_int32_t deq_cnt(const std::string& xid) { return cnt(xid, false); }
-        bool is_txn_synced(const std::string& xid);
-        bool set_aio_compl(const std::string& xid, const u_int64_t rid);
-        const txn_data& get_data(const std::string& xid, const u_int64_t rid);
+        u_int32_t enq_cnt();
+        u_int32_t deq_cnt();
+        int8_t is_txn_synced(const std::string& xid); // -1=xid not found; 0=not synced; 1=synced
+        int8_t set_aio_compl(const std::string& xid, const u_int64_t rid); // -2=rid not found; -1=xid not found; 0=done
+        bool data_exists(const std::string& xid, const u_int64_t rid);
         bool is_enq(const u_int64_t rid);
         inline void clear() { _map.clear(); }
         inline bool empty() const { return _map.empty(); }
-        inline u_int32_t size() const { return u_int32_t(_map.size()); }
+        inline size_t size() const { return _map.size(); }
         void xid_list(std::vector<std::string>& xv);
     private:
         u_int32_t cnt(const bool enq_flag);
-        u_int32_t cnt(const std::string& xid, const bool enq_flag);
-        static std::string xid_format(const std::string& xid);
-
         const txn_data_list get_tdata_list_nolock(const std::string& xid);
     };
 

Modified: store/trunk/cpp/lib/jrnl/wmgr.cpp
===================================================================
--- store/trunk/cpp/lib/jrnl/wmgr.cpp	2010-06-14 21:51:43 UTC (rev 4026)
+++ store/trunk/cpp/lib/jrnl/wmgr.cpp	2010-06-15 18:47:09 UTC (rev 4027)
@@ -8,7 +8,7 @@
  *
  * \author Kim van der Riet
  *
- * Copyright (c) 2007, 2008, 2009 Red Hat, Inc.
+ * Copyright (c) 2007, 2008, 2009, 2010 Red Hat, Inc.
  *
  * This file is part of the Qpid async store library msgstore.so.
  *
@@ -372,7 +372,7 @@
 
             // Delete this txn from tmap, unlock any locked records in emap
             std::string xid((char*)xid_ptr, xid_len);
-            txn_data_list tdl = _tmap.get_remove_tdata_list(xid);
+            txn_data_list tdl = _tmap.get_remove_tdata_list(xid); // tdl will be empty if xid not found
             for (tdl_itr itr = tdl.begin(); itr != tdl.end(); itr++)
             {
                 try
@@ -469,7 +469,7 @@
 
             // Delete this txn from tmap, process records into emap
             std::string xid((char*)xid_ptr, xid_len);
-            txn_data_list tdl = _tmap.get_remove_tdata_list(xid);
+            txn_data_list tdl = _tmap.get_remove_tdata_list(xid); // tdl will be empty if xid not found
             for (tdl_itr itr = tdl.begin(); itr != tdl.end(); itr++)
             {
                 if (itr->_enq_flag) // txn enqueue
@@ -684,6 +684,8 @@
                         tot_data_toks++;
                         dtokp->set_wstate(data_tok::ENQ);
                         if (dtokp->has_xid())
+                            // Ignoring return value here. A non-zero return can signify that the transaction
+                            // was committed or aborted, and that this completed before the aio returned.
                             _tmap.set_aio_compl(dtokp->xid(), dtokp->rid());
                         break;
                     case data_tok::DEQ_SUBM:
@@ -691,6 +693,7 @@
                         tot_data_toks++;
                         dtokp->set_wstate(data_tok::DEQ);
                         if (dtokp->has_xid())
+                            // Ignoring return value - see note above.
                             _tmap.set_aio_compl(dtokp->xid(), dtokp->rid());
                         break;
                     case data_tok::ABORT_SUBM:
@@ -772,14 +775,7 @@
 bool
 wmgr::is_txn_synced(const std::string& xid)
 {
-    bool is_synced = true;
-    // Check for outstanding enqueues/dequeues
-    try { is_synced = _tmap.is_txn_synced(xid); }
-    catch (const jexception& e)
-    {
-        if (e.err_code() != jerrno::JERR_MAP_NOTFOUND) throw;
-    }
-    if (!is_synced)
+    if (_tmap.is_txn_synced(xid) == 0) // not synced
         return false;
     // Check for outstanding commit/aborts
     std::set<std::string>::iterator it = _txn_pending_set.find(xid);
@@ -898,16 +894,7 @@
         if (e.err_code() != jerrno::JERR_MAP_NOTFOUND)
             throw;
         if (xid.size())
-            try
-            {
-                _tmap.get_data(xid, drid); // not in emap, try tmap
-                found = true;
-            }
-            catch (const jexception& e)
-            {
-                if (e.err_code() != jerrno::JERR_MAP_NOTFOUND)
-                    throw;
-            }
+            found = _tmap.data_exists(xid, drid);
     }
     if (!found)
     {



More information about the rhmessaging-commits mailing list