[rhmessaging-commits] rhmessaging commits: r3955 - in store/trunk/cpp: lib/jrnl and 1 other directories.

rhmessaging-commits at lists.jboss.org rhmessaging-commits at lists.jboss.org
Mon May 10 14:11:42 EDT 2010


Author: kpvdr
Date: 2010-05-10 14:11:41 -0400 (Mon, 10 May 2010)
New Revision: 3955

Added:
   store/trunk/cpp/lib/jrnl/smutex.cpp
   store/trunk/cpp/lib/jrnl/smutex.hpp
Modified:
   store/trunk/cpp/lib/JournalImpl.cpp
   store/trunk/cpp/lib/JournalImpl.h
   store/trunk/cpp/lib/Makefile.am
   store/trunk/cpp/lib/jrnl/arr_cnt.hpp
   store/trunk/cpp/lib/jrnl/cvar.hpp
   store/trunk/cpp/lib/jrnl/data_tok.cpp
   store/trunk/cpp/lib/jrnl/data_tok.hpp
   store/trunk/cpp/lib/jrnl/enq_map.cpp
   store/trunk/cpp/lib/jrnl/enq_map.hpp
   store/trunk/cpp/lib/jrnl/jcntl.cpp
   store/trunk/cpp/lib/jrnl/jcntl.hpp
   store/trunk/cpp/lib/jrnl/jexception.hpp
   store/trunk/cpp/lib/jrnl/slock.hpp
   store/trunk/cpp/lib/jrnl/txn_map.cpp
   store/trunk/cpp/lib/jrnl/txn_map.hpp
   store/trunk/cpp/tests/jrnl/jtt/data_src.cpp
   store/trunk/cpp/tests/jrnl/jtt/data_src.hpp
   store/trunk/cpp/tests/jrnl/jtt/jrnl_instance.cpp
   store/trunk/cpp/tests/jrnl/jtt/jrnl_instance.hpp
Log:
Thread lock reorganization to solve static thread protection problem

Modified: store/trunk/cpp/lib/JournalImpl.cpp
===================================================================
--- store/trunk/cpp/lib/JournalImpl.cpp	2010-05-10 17:48:27 UTC (rev 3954)
+++ store/trunk/cpp/lib/JournalImpl.cpp	2010-05-10 18:11:41 UTC (rev 3955)
@@ -43,9 +43,9 @@
 qpid::sys::Timer* JournalImpl::journalTimerPtr = 0;
 u_int32_t JournalImpl::cnt = 0;
 
-void InactivityFireEvent::fire() { slock s(&_ife_mutex); if (parent) parent->flushFire(); }
+void InactivityFireEvent::fire() { slock s(_ife_mutex); if (_parent) _parent->flushFire(); }
 
-void GetEventsFireEvent::fire() { slock s(&_gefe_mutex); if (parent) parent->getEventsFire(); }
+void GetEventsFireEvent::fire() { slock s(_gefe_mutex); if (_parent) _parent->getEventsFire(); }
 
 JournalImpl::JournalImpl(const std::string& journalId,
                          const std::string& journalDirectory,
@@ -66,7 +66,6 @@
                          _agent(a),
                          _mgmtObject(0)
 {
-    ::pthread_mutex_init(&_getf_mutex, 0);
     getEventsFireEventsPtr = new GetEventsFireEvent(this, getEventsTimeout);
     inactivityFireEventPtr = new InactivityFireEvent(this, flushTimeout);
     if (journalTimerPtr == 0)
@@ -125,7 +124,6 @@
         _mgmtObject = 0;
     }
 
-    ::pthread_mutex_destroy(&_getf_mutex);
     log(LOG_DEBUG, "Destroyed");
 }
 
@@ -505,7 +503,7 @@
 {
     const iores res = jcntl::flush(block_till_aio_cmpl);
     {
-        slock s(&_getf_mutex);
+        slock s(_getf_mutex);
         if (_wmgr.get_aio_evt_rem() && !getEventsTimerSetFlag) { setGetEventTimer(); }
     }
     return res;
@@ -535,7 +533,7 @@
 void
 JournalImpl::getEventsFire()
 {
-    slock s(&_getf_mutex);
+    slock s(_getf_mutex);
     getEventsTimerSetFlag = false;
     if (_wmgr.get_aio_evt_rem()) { jcntl::get_wr_events(); }
     if (_wmgr.get_aio_evt_rem()) { setGetEventTimer(); }

Modified: store/trunk/cpp/lib/JournalImpl.h
===================================================================
--- store/trunk/cpp/lib/JournalImpl.h	2010-05-10 17:48:27 UTC (rev 3954)
+++ store/trunk/cpp/lib/JournalImpl.h	2010-05-10 18:11:41 UTC (rev 3955)
@@ -28,6 +28,7 @@
 #include "jrnl/enums.hpp"
 #include "jrnl/jcntl.hpp"
 #include "jrnl/slock.hpp"
+#include "jrnl/smutex.hpp"
 #include "DataTokenImpl.h"
 #include "PreparedTransaction.h"
 #include <qpid/broker/PersistableQueue.h>
@@ -45,28 +46,28 @@
 
         class InactivityFireEvent : public qpid::sys::TimerTask
         {
-            JournalImpl*    parent;
-            pthread_mutex_t _ife_mutex;
+            JournalImpl* _parent;
+            mrg::journal::smutex _ife_mutex;
 
         public:
 	        InactivityFireEvent(JournalImpl* p, const qpid::sys::Duration timeout):
-                qpid::sys::TimerTask(timeout), parent(p) { ::pthread_mutex_init(&_ife_mutex, 0); }
-            virtual ~InactivityFireEvent() { ::pthread_mutex_destroy(&_ife_mutex); }
+                qpid::sys::TimerTask(timeout), _parent(p) {}
+            virtual ~InactivityFireEvent() {}
             void fire();
-            inline void cancel() { mrg::journal::slock s(&_ife_mutex); parent = 0; }
+            inline void cancel() { mrg::journal::slock s(_ife_mutex); _parent = 0; }
         };
 
         class GetEventsFireEvent : public qpid::sys::TimerTask
         {
-            JournalImpl*    parent;
-            pthread_mutex_t _gefe_mutex;
+            JournalImpl* _parent;
+            mrg::journal::smutex _gefe_mutex;
 
         public:
 	        GetEventsFireEvent(JournalImpl* p, const qpid::sys::Duration timeout):
-                qpid::sys::TimerTask(timeout), parent(p) { ::pthread_mutex_init(&_gefe_mutex, 0); }
-            virtual ~GetEventsFireEvent() { ::pthread_mutex_destroy(&_gefe_mutex); }
+                qpid::sys::TimerTask(timeout), _parent(p) {}
+            virtual ~GetEventsFireEvent() {}
             void fire();
-            inline void cancel() { mrg::journal::slock s(&_gefe_mutex); parent = 0; }
+            inline void cancel() { mrg::journal::slock s(_gefe_mutex); _parent = 0; }
         };
 
         class JournalImpl : public qpid::broker::ExternalQueueStore, public mrg::journal::jcntl, public mrg::journal::aio_callback
@@ -77,7 +78,7 @@
 
             bool getEventsTimerSetFlag;
             boost::intrusive_ptr<qpid::sys::TimerTask> getEventsFireEventsPtr;
-            pthread_mutex_t _getf_mutex;
+            mrg::journal::smutex _getf_mutex;
 
             u_int64_t lastReadRid; // rid of last read msg for loadMsgContent() - detects out-of-order read requests
             std::vector<u_int64_t> oooRidList; // list of out-of-order rids (greater than current rid) encountered during read sequence

Modified: store/trunk/cpp/lib/Makefile.am
===================================================================
--- store/trunk/cpp/lib/Makefile.am	2010-05-10 17:48:27 UTC (rev 3954)
+++ store/trunk/cpp/lib/Makefile.am	2010-05-10 18:11:41 UTC (rev 3955)
@@ -80,6 +80,7 @@
   jrnl/rfc.cpp                  \
   jrnl/rrfc.cpp                 \
   jrnl/slock.cpp                \
+  jrnl/smutex.cpp               \
   jrnl/time_ns.cpp              \
   jrnl/txn_map.cpp              \
   jrnl/txn_rec.cpp              \
@@ -115,6 +116,7 @@
   jrnl/rfc.hpp                  \
   jrnl/rrfc.hpp                 \
   jrnl/slock.hpp                \
+  jrnl/smutex.hpp               \
   jrnl/time_ns.hpp              \
   jrnl/txn_hdr.hpp              \
   jrnl/txn_map.hpp              \

Modified: store/trunk/cpp/lib/jrnl/arr_cnt.hpp
===================================================================
--- store/trunk/cpp/lib/jrnl/arr_cnt.hpp	2010-05-10 17:48:27 UTC (rev 3954)
+++ store/trunk/cpp/lib/jrnl/arr_cnt.hpp	2010-05-10 18:11:41 UTC (rev 3955)
@@ -47,6 +47,8 @@
     *     for which the number of elements is unknown, but for which the efficiency of a static
     *     array is required. None of the counts may go below zero.
     */
+
+    // TODO: Replace this class with instance of std::vector<u_int32_t>
     class arr_cnt
     {
     private:

Modified: store/trunk/cpp/lib/jrnl/cvar.hpp
===================================================================
--- store/trunk/cpp/lib/jrnl/cvar.hpp	2010-05-10 17:48:27 UTC (rev 3954)
+++ store/trunk/cpp/lib/jrnl/cvar.hpp	2010-05-10 18:11:41 UTC (rev 3955)
@@ -35,6 +35,7 @@
 #include <cstring>
 #include "jrnl/jerrno.hpp"
 #include "jrnl/jexception.hpp"
+#include "jrnl/smutex.hpp"
 #include "jrnl/time_ns.hpp"
 #include <pthread.h>
 #include <sstream>
@@ -48,37 +49,35 @@
     class cvar
     {
     private:
-        pthread_mutex_t& _m;
+        const smutex& _sm;
         pthread_cond_t _c;
     public:
-        inline cvar(pthread_mutex_t& m) : _m(m) { ::pthread_cond_init(&_c, 0); }
+        inline cvar(const smutex& sm) : _sm(sm) { ::pthread_cond_init(&_c, 0); }
         inline ~cvar() { ::pthread_cond_destroy(&_c); }
         inline void wait()
         {
-            PTHREAD_CHK(::pthread_cond_wait(&_c, &_m), "pthread_cond_wait", "cvar", "wait");
+            PTHREAD_CHK(::pthread_cond_wait(&_c, _sm.get()), "::pthread_cond_wait", "cvar", "wait");
         }
         inline void timedwait(timespec& ts)
         {
-            PTHREAD_CHK(::pthread_cond_timedwait(&_c, &_m, &ts), "pthread_cond_timedwait", "cvar",
-                    "timedwait");
+            PTHREAD_CHK(::pthread_cond_timedwait(&_c, _sm.get(), &ts), "::pthread_cond_timedwait", "cvar", "timedwait");
         }
         inline bool waitintvl(const long intvl_ns)
         {
             time_ns t; t.now(); t+=intvl_ns;
-            int ret = ::pthread_cond_timedwait(&_c, &_m, &t);
+            int ret = ::pthread_cond_timedwait(&_c, _sm.get(), &t);
             if (ret == ETIMEDOUT)
                 return true;
-            PTHREAD_CHK(ret, "pthread_cond_timedwait", "cvar", "waitintvl");
+            PTHREAD_CHK(ret, "::pthread_cond_timedwait", "cvar", "waitintvl");
             return false;
         }
         inline void signal()
         {
-            PTHREAD_CHK(::pthread_cond_signal(&_c), "pthread_cond_signal", "cvar", "notify");
+            PTHREAD_CHK(::pthread_cond_signal(&_c), "::pthread_cond_signal", "cvar", "notify");
         }
         inline void broadcast()
         {
-            PTHREAD_CHK(::pthread_cond_broadcast(&_c), "pthread_cond_broadcast", "cvar",
-                    "broadcast");
+            PTHREAD_CHK(::pthread_cond_broadcast(&_c), "::pthread_cond_broadcast", "cvar", "broadcast");
         }
     };
 

Modified: store/trunk/cpp/lib/jrnl/data_tok.cpp
===================================================================
--- store/trunk/cpp/lib/jrnl/data_tok.cpp	2010-05-10 17:48:27 UTC (rev 3954)
+++ store/trunk/cpp/lib/jrnl/data_tok.cpp	2010-05-10 18:11:41 UTC (rev 3955)
@@ -35,6 +35,7 @@
 #include <iomanip>
 #include "jrnl/jerrno.hpp"
 #include "jrnl/jexception.hpp"
+#include "jrnl/slock.hpp"
 #include <sstream>
 
 namespace mrg
@@ -45,6 +46,7 @@
 // Static members
 
 u_int64_t data_tok::_cnt = 0;
+smutex data_tok::_mutex;
 
 data_tok::data_tok():
     _wstate(NONE),
@@ -59,16 +61,11 @@
     _dequeue_rid(0),
     _external_rid(false)
 {
-    pthread_mutex_init(&_mutex, 0);
-    pthread_mutex_lock(&_mutex);
+    slock s(_mutex);
     _icnt = _cnt++;
-    pthread_mutex_unlock(&_mutex);
 }
 
-data_tok::~data_tok()
-{
-    pthread_mutex_destroy(&_mutex);
-}
+data_tok::~data_tok() {}
 
 const char*
 data_tok::wstate_str() const

Modified: store/trunk/cpp/lib/jrnl/data_tok.hpp
===================================================================
--- store/trunk/cpp/lib/jrnl/data_tok.hpp	2010-05-10 17:48:27 UTC (rev 3954)
+++ store/trunk/cpp/lib/jrnl/data_tok.hpp	2010-05-10 18:11:41 UTC (rev 3955)
@@ -43,6 +43,7 @@
 
 #include <cassert>
 #include <cstddef>
+#include "jrnl/smutex.hpp"
 #include <pthread.h>
 #include <string>
 #include <sys/types.h>
@@ -94,7 +95,7 @@
         };
 
     protected:
-        pthread_mutex_t _mutex;
+        static smutex _mutex;
         static u_int64_t _cnt;
         u_int64_t   _icnt;
         write_state _wstate;        ///< Enqueued / dequeued state of data

Modified: store/trunk/cpp/lib/jrnl/enq_map.cpp
===================================================================
--- store/trunk/cpp/lib/jrnl/enq_map.cpp	2010-05-10 17:48:27 UTC (rev 3954)
+++ store/trunk/cpp/lib/jrnl/enq_map.cpp	2010-05-10 18:11:41 UTC (rev 3955)
@@ -46,14 +46,9 @@
 enq_map::enq_map():
         _map(),
         _pfid_enq_cnt()
-{
-    pthread_mutex_init(&_mutex, 0);
-}
+{}
 
-enq_map::~enq_map()
-{
-    pthread_mutex_destroy(&_mutex);
-}
+enq_map::~enq_map() {}
 
 void
 enq_map::set_num_jfiles(const u_int16_t num_jfiles)
@@ -73,7 +68,7 @@
     std::pair<emap_itr, bool> ret;
     emap_data_struct rec(pfid, locked);
     {
-        slock s(&_mutex);
+        slock s(_mutex);
         ret = _map.insert(emap_param(rid, rec));
     }
     if (ret.second == false)
@@ -88,11 +83,8 @@
 u_int16_t
 enq_map::get_pfid(const u_int64_t rid)
 {
-    emap_itr itr;
-    {
-        slock s(&_mutex);
-        itr = _map.find(rid);
-    }
+    slock s(_mutex);
+    emap_itr itr = _map.find(rid);
     if (itr == _map.end()) // not found in map
     {
         std::ostringstream oss;
@@ -111,7 +103,7 @@
 u_int16_t
 enq_map::get_remove_pfid(const u_int64_t rid, const bool txn_flag)
 {
-    slock s(&_mutex);
+    slock s(_mutex);
     emap_itr itr = _map.find(rid);
     if (itr == _map.end()) // not found in map
     {
@@ -134,7 +126,7 @@
 bool
 enq_map::is_enqueued(const u_int64_t rid, bool ignore_lock)
 {
-    slock s(&_mutex);
+    slock s(_mutex);
     emap_itr itr = _map.find(rid);
     if (itr == _map.end()) // not found in map
         return false;
@@ -146,7 +138,7 @@
 void
 enq_map::lock(const u_int64_t rid)
 {
-    slock s(&_mutex);
+    slock s(_mutex);
     emap_itr itr = _map.find(rid);
     if (itr == _map.end()) // not found in map
     {
@@ -160,7 +152,7 @@
 void
 enq_map::unlock(const u_int64_t rid)
 {
-    slock s(&_mutex);
+    slock s(_mutex);
     emap_itr itr = _map.find(rid);
     if (itr == _map.end()) // not found in map
     {
@@ -174,11 +166,8 @@
 bool
 enq_map::is_locked(const u_int64_t rid)
 {
-    emap_itr itr;
-    {
-        slock s(&_mutex);
-        itr = _map.find(rid);
-    }
+    slock s(_mutex);
+    emap_itr itr = _map.find(rid);
     if (itr == _map.end()) // not found in map
     {
         std::ostringstream oss;
@@ -193,7 +182,7 @@
 {
     rv.clear();
     {
-        slock s(&_mutex);
+        slock s(_mutex);
         for (emap_itr itr = _map.begin(); itr != _map.end(); itr++)
             rv.push_back(itr->first);
     }
@@ -204,7 +193,7 @@
 {
     fv.clear();
     {
-        slock s(&_mutex);
+        slock s(_mutex);
         for (emap_itr itr = _map.begin(); itr != _map.end(); itr++)
             fv.push_back(itr->second._pfid);
     }

Modified: store/trunk/cpp/lib/jrnl/enq_map.hpp
===================================================================
--- store/trunk/cpp/lib/jrnl/enq_map.hpp	2010-05-10 17:48:27 UTC (rev 3954)
+++ store/trunk/cpp/lib/jrnl/enq_map.hpp	2010-05-10 18:11:41 UTC (rev 3955)
@@ -43,6 +43,7 @@
 
 #include "jrnl/arr_cnt.hpp"
 #include "jrnl/jexception.hpp"
+#include "jrnl/smutex.hpp"
 #include <map>
 #include <pthread.h>
 #include <vector>
@@ -87,7 +88,7 @@
         typedef emap::iterator emap_itr;
 
         emap _map;
-        pthread_mutex_t _mutex;
+        smutex _mutex;
         arr_cnt _pfid_enq_cnt;
 
     public:

Modified: store/trunk/cpp/lib/jrnl/jcntl.cpp
===================================================================
--- store/trunk/cpp/lib/jrnl/jcntl.cpp	2010-05-10 17:48:27 UTC (rev 3954)
+++ store/trunk/cpp/lib/jrnl/jcntl.cpp	2010-05-10 18:11:41 UTC (rev 3955)
@@ -73,10 +73,7 @@
     _rmgr(this, _emap, _tmap, _rrfc),
     _wmgr(this, _emap, _tmap, _wrfc),
     _rcvdat()
-{
-    ::pthread_mutex_init(&_wr_mutex, 0);
-    ::pthread_mutex_init(&_gev_mutex, 0);
-}
+{}
 
 jcntl::~jcntl()
 {
@@ -84,8 +81,6 @@
         try { stop(true); }
         catch (const jexception& e) { std::cerr << e << std::endl; }
     _lpmgr.finalize();
-    ::pthread_mutex_destroy(&_gev_mutex);
-    ::pthread_mutex_destroy(&_wr_mutex);
 }
 
 void
@@ -201,7 +196,7 @@
 {
     check_wstatus("enqueue_data_record");
     {
-        slock s(&_wr_mutex);
+        slock s(_wr_mutex);
         iores r;
         while (handle_aio_wait(_wmgr.enqueue(data_buff, tot_data_len, this_data_len, dtokp, 0,
                 0, transient, false), r, dtokp)) ;
@@ -214,7 +209,7 @@
 {
     check_wstatus("enqueue_extern_data_record");
     {
-        slock s(&_wr_mutex);
+        slock s(_wr_mutex);
         iores r;
         while (handle_aio_wait(_wmgr.enqueue(0, tot_data_len, 0, dtokp, 0, 0, transient,
                 true), r, dtokp)) ;
@@ -229,7 +224,7 @@
 {
     check_wstatus("enqueue_tx_data_record");
     {
-        slock s(&_wr_mutex);
+        slock s(_wr_mutex);
         iores r;
         while (handle_aio_wait(_wmgr.enqueue(data_buff, tot_data_len, this_data_len, dtokp,
                 xid.data(), xid.size(), transient, false), r, dtokp)) ;
@@ -243,7 +238,7 @@
 {
     check_wstatus("enqueue_extern_txn_data_record");
     {
-        slock s(&_wr_mutex);
+        slock s(_wr_mutex);
         iores r;
         while (handle_aio_wait(_wmgr.enqueue(0, tot_data_len, 0, dtokp, xid.data(), xid.size(),
                 transient, true), r, dtokp)) ;
@@ -301,7 +296,7 @@
 {
     check_wstatus("dequeue_data");
     {
-        slock s(&_wr_mutex);
+        slock s(_wr_mutex);
         iores r;
         while (handle_aio_wait(_wmgr.dequeue(dtokp, 0, 0, txn_coml_commit), r, dtokp)) ;
         return r;
@@ -313,7 +308,7 @@
 {
     check_wstatus("dequeue_data");
     {
-        slock s(&_wr_mutex);
+        slock s(_wr_mutex);
         iores r;
         while (handle_aio_wait(_wmgr.dequeue(dtokp, xid.data(), xid.size(), txn_coml_commit), r, dtokp)) ;
         return r;
@@ -325,7 +320,7 @@
 {
     check_wstatus("txn_abort");
     {
-        slock s(&_wr_mutex);
+        slock s(_wr_mutex);
         iores r;
         while (handle_aio_wait(_wmgr.abort(dtokp, xid.data(), xid.size()), r, dtokp)) ;
         return r;
@@ -337,7 +332,7 @@
 {
     check_wstatus("txn_commit");
     {
-        slock s(&_wr_mutex);
+        slock s(_wr_mutex);
         iores r;
         while (handle_aio_wait(_wmgr.commit(dtokp, xid.data(), xid.size()), r, dtokp)) ;
         return r;
@@ -347,14 +342,14 @@
 bool
 jcntl::is_txn_synced(const std::string& xid)
 {
-    slock s(&_wr_mutex);
+    slock s(_wr_mutex);
     return _wmgr.is_txn_synced(xid);
 }
 
 u_int32_t
 jcntl::get_wr_events()
 {
-    stlock t(&_wr_mutex);
+    stlock t(_wr_mutex);
     if (t.locked())
         return _wmgr.get_events(pmgr::UNUSED);
     return 0;
@@ -404,7 +399,7 @@
         throw jexception(jerrno::JERR_JCNTL_READONLY, "jcntl", "flush");
     iores res;
     {
-        slock s(&_wr_mutex);
+        slock s(_wr_mutex);
         res = _wmgr.flush();
     }
     if (block_till_aio_cmpl)

Modified: store/trunk/cpp/lib/jrnl/jcntl.hpp
===================================================================
--- store/trunk/cpp/lib/jrnl/jcntl.hpp	2010-05-10 17:48:27 UTC (rev 3954)
+++ store/trunk/cpp/lib/jrnl/jcntl.hpp	2010-05-10 18:11:41 UTC (rev 3955)
@@ -47,6 +47,7 @@
 #include "jrnl/fcntl.hpp"
 #include "jrnl/lpmgr.hpp"
 #include "jrnl/rcvdat.hpp"
+#include "jrnl/smutex.hpp"
 #include "jrnl/rmgr.hpp"
 #include "jrnl/wmgr.hpp"
 #include "jrnl/wrfc.hpp"
@@ -143,8 +144,7 @@
         rmgr _rmgr;                 ///< Read page manager which manages AIO
         wmgr _wmgr;                 ///< Write page manager which manages AIO
         rcvdat _rcvdat;             ///< Recovery data used for recovery
-        pthread_mutex_t _wr_mutex;  ///< Mutex for journal writes
-        pthread_mutex_t _gev_mutex; ///< Mutex for get_events
+        smutex _wr_mutex;           ///< Mutex for journal writes
 
     public:
         /**

Modified: store/trunk/cpp/lib/jrnl/jexception.hpp
===================================================================
--- store/trunk/cpp/lib/jrnl/jexception.hpp	2010-05-10 17:48:27 UTC (rev 3954)
+++ store/trunk/cpp/lib/jrnl/jexception.hpp	2010-05-10 18:11:41 UTC (rev 3955)
@@ -43,10 +43,13 @@
 }
 }
 
+#include <cerrno>
 #include <cstdio>
 #include <cstdlib>
 #include <cstring>
 #include <exception>
+#include "jrnl/jerrno.hpp"
+#include <sstream>
 #include <string>
 #include <sys/types.h>
 

Modified: store/trunk/cpp/lib/jrnl/slock.hpp
===================================================================
--- store/trunk/cpp/lib/jrnl/slock.hpp	2010-05-10 17:48:27 UTC (rev 3954)
+++ store/trunk/cpp/lib/jrnl/slock.hpp	2010-05-10 18:11:41 UTC (rev 3955)
@@ -33,12 +33,9 @@
 #ifndef mrg_journal_slock_hpp
 #define mrg_journal_slock_hpp
 
-#include <cerrno>
-#include <cstring>
-#include "jrnl/jerrno.hpp"
 #include "jrnl/jexception.hpp"
+#include "jrnl/smutex.hpp"
 #include <pthread.h>
-#include <sstream>
 
 namespace mrg
 {
@@ -49,15 +46,15 @@
     class slock
     {
     protected:
-        pthread_mutex_t* _m;
+        const smutex& _sm;
     public:
-        inline slock(pthread_mutex_t* m) : _m(m)
+        inline slock(const smutex& sm) : _sm(sm)
         {
-            PTHREAD_CHK(::pthread_mutex_lock(_m), "pthread_mutex_lock", "slock", "slock");
+            PTHREAD_CHK(::pthread_mutex_lock(_sm.get()), "::pthread_mutex_lock", "slock", "slock");
         }
         inline ~slock()
         {
-            PTHREAD_CHK(::pthread_mutex_unlock(_m), "pthread_mutex_unlock", "slock", "~slock");
+            PTHREAD_CHK(::pthread_mutex_unlock(_sm.get()), "::pthread_mutex_unlock", "slock", "~slock");
         }
     };
 
@@ -65,21 +62,19 @@
     class stlock
     {
     protected:
-        pthread_mutex_t* _m;
+        const smutex& _sm;
         bool _locked;
     public:
-        inline stlock(pthread_mutex_t* m) : _m(m), _locked(false)
+        inline stlock(const smutex& sm) : _sm(sm), _locked(false)
         {
-            int ret = ::pthread_mutex_trylock(_m);
+            int ret = ::pthread_mutex_trylock(_sm.get());
             _locked = (ret == 0); // check if lock obtained
-            if (!_locked && ret != EBUSY) PTHREAD_CHK(ret, "pthread_mutex_trylock", "stlock",
-                    "stlock");
+            if (!_locked && ret != EBUSY) PTHREAD_CHK(ret, "::pthread_mutex_trylock", "stlock", "stlock");
         }
         inline ~stlock()
         {
             if (_locked)
-                PTHREAD_CHK(::pthread_mutex_unlock(_m), "pthread_mutex_unlock", "stlock",
-                        "~stlock");
+                PTHREAD_CHK(::pthread_mutex_unlock(_sm.get()), "::pthread_mutex_unlock", "stlock", "~stlock");
         }
         inline bool locked() const { return _locked; }
     };

Added: store/trunk/cpp/lib/jrnl/smutex.cpp
===================================================================
--- store/trunk/cpp/lib/jrnl/smutex.cpp	                        (rev 0)
+++ store/trunk/cpp/lib/jrnl/smutex.cpp	2010-05-10 18:11:41 UTC (rev 3955)
@@ -0,0 +1,33 @@
+/**
+ * \file smutex.cpp
+ *
+ * Qpid asynchronous store plugin library
+ *
+ * File containing code for class mrg::journal::smutex (scoped mutex). See
+ * comments in file smutex.hpp for details.
+ *
+ * \author Kim van der Riet
+ *
+ * Copyright (c) 2010 Red Hat Inc.
+ *
+ * This file is part of the Qpid async store library msgstore.so.
+ *
+ * This library is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU Lesser General Public
+ * License as published by the Free Software Foundation; either
+ * version 2.1 of the License, or (at your option) any later version.
+ *
+ * This library is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public
+ * License along with this library; if not, write to the Free Software
+ * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA  02110-1301
+ * USA
+ *
+ * The GNU Lesser General Public License is available in the file COPYING.
+ */
+
+#include "jrnl/smutex.hpp"

Added: store/trunk/cpp/lib/jrnl/smutex.hpp
===================================================================
--- store/trunk/cpp/lib/jrnl/smutex.hpp	                        (rev 0)
+++ store/trunk/cpp/lib/jrnl/smutex.hpp	2010-05-10 18:11:41 UTC (rev 3955)
@@ -0,0 +1,64 @@
+/**
+ * \file smutex.hpp
+ *
+ * Qpid asynchronous store plugin library
+ *
+ * Messaging journal scoped mutex class mrg::journal::smutex.
+ *
+ * \author Kim van der Riet
+ *
+ * Copyright (c) 2010 Red Hat, Inc.
+ *
+ * This file is part of the Qpid async store library msgstore.so.
+ *
+ * This library is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU Lesser General Public
+ * License as published by the Free Software Foundation; either
+ * version 2.1 of the License, or (at your option) any later version.
+ *
+ * This library is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public
+ * License along with this library; if not, write to the Free Software
+ * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA  02110-1301
+ * USA
+ *
+ * The GNU Lesser General Public License is available in the file COPYING.
+ */
+
+
+#ifndef mrg_journal_smutex_hpp
+#define mrg_journal_smutex_hpp
+
+#include "jrnl/jexception.hpp"
+#include <pthread.h>
+
+namespace mrg
+{
+namespace journal
+{
+
+    // Ultra-simple scoped mutex class that allows a posix mutex to be initialized and destroyed with error checks
+    class smutex
+    {
+    protected:
+        mutable pthread_mutex_t _m;
+    public:
+        inline smutex()
+        {
+            PTHREAD_CHK(::pthread_mutex_init(&_m, 0), "::pthread_mutex_init", "smutex", "smutex");
+        }
+        inline virtual ~smutex()
+        {
+            PTHREAD_CHK(::pthread_mutex_destroy(&_m), "::pthread_mutex_destroy", "smutex", "~smutex");
+        }
+        inline pthread_mutex_t* get() const { return &_m; }
+    };
+
+} // namespace journal
+} // namespace mrg
+
+#endif // ifndef mrg_journal_smutex_hpp

Modified: store/trunk/cpp/lib/jrnl/txn_map.cpp
===================================================================
--- store/trunk/cpp/lib/jrnl/txn_map.cpp	2010-05-10 17:48:27 UTC (rev 3954)
+++ store/trunk/cpp/lib/jrnl/txn_map.cpp	2010-05-10 18:11:41 UTC (rev 3955)
@@ -56,14 +56,9 @@
 txn_map::txn_map():
         _map(),
         _pfid_txn_cnt()
-{
-    pthread_mutex_init(&_mutex, 0);
-}
+{}
 
-txn_map::~txn_map()
-{
-    pthread_mutex_destroy(&_mutex);
-}
+txn_map::~txn_map() {}
 
 void
 txn_map::set_num_jfiles(const u_int16_t num_jfiles)
@@ -75,7 +70,7 @@
 txn_map::insert_txn_data(const std::string& xid, const txn_data& td)
 {
     bool ok = true;
-    slock s(&_mutex);
+    slock s(_mutex);
     xmap_itr itr = _map.find(xid);
     if (itr == _map.end()) // not found in map
     {
@@ -94,7 +89,7 @@
 const txn_data_list
 txn_map::get_tdata_list(const std::string& xid)
 {
-    slock s(&_mutex);
+    slock s(_mutex);
     return get_tdata_list_nolock(xid);
 }
 
@@ -114,7 +109,7 @@
 const txn_data_list
 txn_map::get_remove_tdata_list(const std::string& xid)
 {
-    slock s(&_mutex);
+    slock s(_mutex);
     xmap_itr itr = _map.find(xid);
     if (itr == _map.end()) // not found in map
     {
@@ -132,7 +127,7 @@
 bool
 txn_map::in_map(const std::string& xid)
 {
-    slock s(&_mutex);
+    slock s(_mutex);
     xmap_itr itr= _map.find(xid);
     if (itr == _map.end()) // not found in map
         return false;
@@ -142,7 +137,7 @@
 u_int32_t
 txn_map::get_rid_count(const std::string& xid)
 {
-    slock s(&_mutex);
+    slock s(_mutex);
     xmap_itr itr = _map.find(xid);
     if (itr == _map.end()) // not found in map
     {
@@ -156,7 +151,7 @@
 u_int32_t
 txn_map::cnt(const bool enq_flag)
 {
-    slock s(&_mutex);
+    slock s(_mutex);
     u_int32_t c = 0;
     for (xmap_itr i = _map.begin(); i != _map.end(); i++)
     {
@@ -172,7 +167,7 @@
 u_int32_t
 txn_map::cnt(const std::string& xid, const bool enq_flag)
 {
-    slock s(&_mutex);
+    slock s(_mutex);
     u_int32_t c = 0;
     xmap_itr i = _map.find(xid);
     if (i == _map.end()) // not found in map
@@ -188,7 +183,7 @@
 bool
 txn_map::is_txn_synced(const std::string& xid)
 {
-    slock s(&_mutex);
+    slock s(_mutex);
     xmap_itr itr = _map.find(xid);
     if (itr == _map.end()) // not found in map
     {
@@ -214,7 +209,7 @@
     bool ok = true;
     bool found = false;
     {
-        slock s(&_mutex);
+        slock s(_mutex);
         xmap_itr itr = _map.find(xid);
         if (itr == _map.end()) // not found in map
             ok = false;
@@ -245,7 +240,7 @@
 {
     bool found = false;
     {
-        slock s(&_mutex);
+        slock s(_mutex);
         txn_data_list tdl = get_tdata_list_nolock(xid);
         tdl_itr itr = tdl.begin();
         while (itr != tdl.end() && !found)
@@ -268,7 +263,7 @@
 {
     bool found = false;
     {
-        slock s(&_mutex);
+        slock s(_mutex);
         for (xmap_itr i = _map.begin(); i != _map.end() && !found; i++)
         {
             txn_data_list list = i->second;
@@ -289,7 +284,7 @@
 {
     xv.clear();
     {
-        slock s(&_mutex);
+        slock s(_mutex);
         for (xmap_itr itr = _map.begin(); itr != _map.end(); itr++)
             xv.push_back(itr->first);
     }

Modified: store/trunk/cpp/lib/jrnl/txn_map.hpp
===================================================================
--- store/trunk/cpp/lib/jrnl/txn_map.hpp	2010-05-10 17:48:27 UTC (rev 3954)
+++ store/trunk/cpp/lib/jrnl/txn_map.hpp	2010-05-10 18:11:41 UTC (rev 3955)
@@ -42,6 +42,7 @@
 }
 
 #include "jrnl/arr_cnt.hpp"
+#include "jrnl/smutex.hpp"
 #include <map>
 #include <pthread.h>
 #include <string>
@@ -116,7 +117,7 @@
         typedef xmap::iterator xmap_itr;
 
         xmap _map;
-        pthread_mutex_t _mutex;
+        smutex _mutex;
         arr_cnt _pfid_txn_cnt;
 
     public:

Modified: store/trunk/cpp/tests/jrnl/jtt/data_src.cpp
===================================================================
--- store/trunk/cpp/tests/jrnl/jtt/data_src.cpp	2010-05-10 17:48:27 UTC (rev 3954)
+++ store/trunk/cpp/tests/jrnl/jtt/data_src.cpp	2010-05-10 18:11:41 UTC (rev 3955)
@@ -36,7 +36,7 @@
 char data_src::_xid_src[data_src::max_xsize];
 bool data_src::_initialized = data_src::__init();
 u_int64_t data_src::_xid_cnt = 0ULL;
-data_src::mutex data_src::_m;
+mrg::journal::smutex data_src::_sm;
 
 data_src::data_src()
 {}

Modified: store/trunk/cpp/tests/jrnl/jtt/data_src.hpp
===================================================================
--- store/trunk/cpp/tests/jrnl/jtt/data_src.hpp	2010-05-10 17:48:27 UTC (rev 3954)
+++ store/trunk/cpp/tests/jrnl/jtt/data_src.hpp	2010-05-10 18:11:41 UTC (rev 3955)
@@ -26,6 +26,7 @@
 
 #include <cstddef>
 #include "jrnl/slock.hpp"
+#include "jrnl/smutex.hpp"
 #include <pthread.h>
 #include <string>
 #include <sys/types.h>
@@ -48,16 +49,7 @@
         static char _xid_src[];
         static u_int64_t _xid_cnt;
         static bool _initialized;
-        class mutex
-        {
-        private:
-            pthread_mutex_t _mutex;
-        public:
-            inline mutex() { pthread_mutex_init(&_mutex, 0); }
-            inline ~mutex() { pthread_mutex_destroy(&_mutex); }
-            inline pthread_mutex_t* m() { return &_mutex; }
-        };
-        static mutex _m;
+        static mrg::journal::smutex _sm;
 
     public:
         static const char* get_data(const std::size_t offs);
@@ -65,7 +57,7 @@
 
     private:
         data_src();
-        static u_int64_t get_xid_cnt() { mrg::journal::slock s(_m.m()); return _xid_cnt++; }
+        static u_int64_t get_xid_cnt() { mrg::journal::slock s(_sm); return _xid_cnt++; }
         static const char* get_xid_content(const std::size_t offs);
         static bool __init();
     };

Modified: store/trunk/cpp/tests/jrnl/jtt/jrnl_instance.cpp
===================================================================
--- store/trunk/cpp/tests/jrnl/jtt/jrnl_instance.cpp	2010-05-10 17:48:27 UTC (rev 3954)
+++ store/trunk/cpp/tests/jrnl/jtt/jrnl_instance.cpp	2010-05-10 18:11:41 UTC (rev 3955)
@@ -55,12 +55,7 @@
         _deq_list_cv(_deq_list_mutex),
         _tcp(),
         _tcrp()
-{
-    pthread_mutex_init(&_rd_aio_mutex, 0);
-    pthread_mutex_init(&_wr_full_mutex, 0);
-    pthread_mutex_init(&_rd_list_mutex, 0);
-    pthread_mutex_init(&_deq_list_mutex, 0);
-}
+{}
 
 jrnl_instance::jrnl_instance(const jrnl_init_params::shared_ptr& p):
         mrg::journal::jcntl(p->jid(), p->jdir(), p->base_filename()),
@@ -76,20 +71,9 @@
         _deq_list_cv(_deq_list_mutex),
         _tcp(),
         _tcrp()
-{
-    pthread_mutex_init(&_rd_aio_mutex, 0);
-    pthread_mutex_init(&_wr_full_mutex, 0);
-    pthread_mutex_init(&_rd_list_mutex, 0);
-    pthread_mutex_init(&_deq_list_mutex, 0);
-}
+{}
 
-jrnl_instance::~jrnl_instance()
-{
-    pthread_mutex_destroy(&_rd_aio_mutex);
-    pthread_mutex_destroy(&_wr_full_mutex);
-    pthread_mutex_destroy(&_rd_list_mutex);
-    pthread_mutex_destroy(&_deq_list_mutex);
-}
+jrnl_instance::~jrnl_instance() {}
 
 
 void
@@ -209,7 +193,7 @@
                 }
                 else if (get_wr_events() == 0)
                 {
-                    mrg::journal::slock sl(&_wr_full_mutex);
+                    mrg::journal::slock sl(_wr_full_mutex);
                     _wr_full_cv.waitintvl(MAX_WR_WAIT * 1000000); // MAX_WR_WAIT in ms
                 }
                 break;
@@ -239,7 +223,7 @@
             {
                 journal::data_tok* dtokp = 0;
                 {
-                    mrg::journal::slock sl(&_rd_list_mutex);
+                    mrg::journal::slock sl(_rd_list_mutex);
                     if (_dtok_rd_list.empty())
                         _rd_list_cv.wait();
                     if (!_dtok_rd_list.empty())
@@ -273,7 +257,7 @@
                         {
                         case mrg::journal::RHM_IORES_SUCCESS:
                             {
-                                mrg::journal::slock sl(&_deq_list_mutex);
+                                mrg::journal::slock sl(_deq_list_mutex);
                                 _dtok_deq_list.push_back(dtokp);
                                 _deq_list_cv.broadcast();
                             }
@@ -291,7 +275,7 @@
                         case mrg::journal::RHM_IORES_PAGE_AIOWAIT:
                             if (get_rd_events() == 0)
                             {
-                                mrg::journal::slock sl(&_rd_aio_mutex);
+                                mrg::journal::slock sl(_rd_aio_mutex);
                                 _rd_aio_cv.waitintvl(MAX_RD_WAIT * 1000000); // MAX_RD_WAIT in ms
                             }
                             break;
@@ -301,7 +285,7 @@
                             oss << "\" returned " << mrg::journal::iores_str(res) << ".";
                             _tcrp->add_exception(oss.str());
                             {
-                                mrg::journal::slock sl(&_deq_list_mutex);
+                                mrg::journal::slock sl(_deq_list_mutex);
                                 _deq_list_cv.broadcast(); // wake up deq thread
                             }
                         }
@@ -326,7 +310,7 @@
             {
                 journal::data_tok* dtokp = 0;
                 {
-                    mrg::journal::slock sl(&_deq_list_mutex);
+                    mrg::journal::slock sl(_deq_list_mutex);
                     if (_dtok_deq_list.empty())
                         _deq_list_cv.wait();
                     if (!_dtok_deq_list.empty())
@@ -426,20 +410,20 @@
             {
                 if (_args_ptr->read_mode.val() == read_arg::NONE)
                 {
-                    mrg::journal::slock sl(&_deq_list_mutex);
+                    mrg::journal::slock sl(_deq_list_mutex);
                     _dtok_deq_list.push_back(dtokp);
                     _deq_list_cv.broadcast();
                 }
                 else
                 {
-                    mrg::journal::slock sl(&_rd_list_mutex);
+                    mrg::journal::slock sl(_rd_list_mutex);
                     _dtok_rd_list.push_back(dtokp);
                     _rd_list_cv.broadcast();
                 }
             }
             else // DEQ
             {
-                mrg::journal::slock sl(&_wr_full_mutex);
+                mrg::journal::slock sl(_wr_full_mutex);
                 _wr_full_cv.broadcast();
             }
         }
@@ -449,7 +433,7 @@
 void
 jrnl_instance::rd_aio_cb(std::vector<u_int16_t>& /*pil*/)
 {
-    mrg::journal::slock sl(&_rd_aio_mutex);
+    mrg::journal::slock sl(_rd_aio_mutex);
     _rd_aio_cv.broadcast();
 }
 

Modified: store/trunk/cpp/tests/jrnl/jtt/jrnl_instance.hpp
===================================================================
--- store/trunk/cpp/tests/jrnl/jtt/jrnl_instance.hpp	2010-05-10 17:48:27 UTC (rev 3954)
+++ store/trunk/cpp/tests/jrnl/jtt/jrnl_instance.hpp	2010-05-10 18:11:41 UTC (rev 3955)
@@ -33,6 +33,7 @@
 #include "jrnl/data_tok.hpp"
 #include "jrnl/jcntl.hpp"
 #include "jrnl/slock.hpp"
+#include "jrnl/smutex.hpp"
 #include <list>
 #include <vector>
 
@@ -54,14 +55,14 @@
         std::vector<dtok_ptr> _dtok_master_txn_list;
         std::list<journal::data_tok*> _dtok_rd_list;
         std::list<journal::data_tok*> _dtok_deq_list;
-        pthread_mutex_t _rd_aio_mutex;      ///< Mutex for read aio wait conditions
-        mrg::journal::cvar _rd_aio_cv;      ///< Condition var for read aio wait conditions
-        pthread_mutex_t _wr_full_mutex;     ///< Mutex for write full conditions
-        mrg::journal::cvar _wr_full_cv;     ///< Condition var for write full conditions
-        pthread_mutex_t _rd_list_mutex;     ///< Mutex for _dtok_rd_list
-        mrg::journal::cvar _rd_list_cv;     ///< Condition var for _dtok_rd_list
-        pthread_mutex_t _deq_list_mutex;    ///< Mutex for _dtok_deq_list
-        mrg::journal::cvar _deq_list_cv;    ///< Condition var for _dtok_deq_list
+        mrg::journal::smutex _rd_aio_mutex;     ///< Mutex for read aio wait conditions
+        mrg::journal::cvar _rd_aio_cv;          ///< Condition var for read aio wait conditions
+        mrg::journal::smutex _wr_full_mutex;    ///< Mutex for write full conditions
+        mrg::journal::cvar _wr_full_cv;         ///< Condition var for write full conditions
+        mrg::journal::smutex _rd_list_mutex;    ///< Mutex for _dtok_rd_list
+        mrg::journal::cvar _rd_list_cv;         ///< Condition var for _dtok_rd_list
+        mrg::journal::smutex _deq_list_mutex;   ///< Mutex for _dtok_deq_list
+        mrg::journal::cvar _deq_list_cv;        ///< Condition var for _dtok_deq_list
         pthread_t _enq_thread;
         pthread_t _deq_thread;
         pthread_t _read_thread;



More information about the rhmessaging-commits mailing list