Author: kpvdr
Date: 2010-05-10 14:11:41 -0400 (Mon, 10 May 2010)
New Revision: 3955
Added:
store/trunk/cpp/lib/jrnl/smutex.cpp
store/trunk/cpp/lib/jrnl/smutex.hpp
Modified:
store/trunk/cpp/lib/JournalImpl.cpp
store/trunk/cpp/lib/JournalImpl.h
store/trunk/cpp/lib/Makefile.am
store/trunk/cpp/lib/jrnl/arr_cnt.hpp
store/trunk/cpp/lib/jrnl/cvar.hpp
store/trunk/cpp/lib/jrnl/data_tok.cpp
store/trunk/cpp/lib/jrnl/data_tok.hpp
store/trunk/cpp/lib/jrnl/enq_map.cpp
store/trunk/cpp/lib/jrnl/enq_map.hpp
store/trunk/cpp/lib/jrnl/jcntl.cpp
store/trunk/cpp/lib/jrnl/jcntl.hpp
store/trunk/cpp/lib/jrnl/jexception.hpp
store/trunk/cpp/lib/jrnl/slock.hpp
store/trunk/cpp/lib/jrnl/txn_map.cpp
store/trunk/cpp/lib/jrnl/txn_map.hpp
store/trunk/cpp/tests/jrnl/jtt/data_src.cpp
store/trunk/cpp/tests/jrnl/jtt/data_src.hpp
store/trunk/cpp/tests/jrnl/jtt/jrnl_instance.cpp
store/trunk/cpp/tests/jrnl/jtt/jrnl_instance.hpp
Log:
Thread lock reorganization to solve static thread protection problem
Modified: store/trunk/cpp/lib/JournalImpl.cpp
===================================================================
--- store/trunk/cpp/lib/JournalImpl.cpp 2010-05-10 17:48:27 UTC (rev 3954)
+++ store/trunk/cpp/lib/JournalImpl.cpp 2010-05-10 18:11:41 UTC (rev 3955)
@@ -43,9 +43,9 @@
qpid::sys::Timer* JournalImpl::journalTimerPtr = 0;
u_int32_t JournalImpl::cnt = 0;
-void InactivityFireEvent::fire() { slock s(&_ife_mutex); if (parent)
parent->flushFire(); }
+void InactivityFireEvent::fire() { slock s(_ife_mutex); if (_parent)
_parent->flushFire(); }
-void GetEventsFireEvent::fire() { slock s(&_gefe_mutex); if (parent)
parent->getEventsFire(); }
+void GetEventsFireEvent::fire() { slock s(_gefe_mutex); if (_parent)
_parent->getEventsFire(); }
JournalImpl::JournalImpl(const std::string& journalId,
const std::string& journalDirectory,
@@ -66,7 +66,6 @@
_agent(a),
_mgmtObject(0)
{
- ::pthread_mutex_init(&_getf_mutex, 0);
getEventsFireEventsPtr = new GetEventsFireEvent(this, getEventsTimeout);
inactivityFireEventPtr = new InactivityFireEvent(this, flushTimeout);
if (journalTimerPtr == 0)
@@ -125,7 +124,6 @@
_mgmtObject = 0;
}
- ::pthread_mutex_destroy(&_getf_mutex);
log(LOG_DEBUG, "Destroyed");
}
@@ -505,7 +503,7 @@
{
const iores res = jcntl::flush(block_till_aio_cmpl);
{
- slock s(&_getf_mutex);
+ slock s(_getf_mutex);
if (_wmgr.get_aio_evt_rem() && !getEventsTimerSetFlag) {
setGetEventTimer(); }
}
return res;
@@ -535,7 +533,7 @@
void
JournalImpl::getEventsFire()
{
- slock s(&_getf_mutex);
+ slock s(_getf_mutex);
getEventsTimerSetFlag = false;
if (_wmgr.get_aio_evt_rem()) { jcntl::get_wr_events(); }
if (_wmgr.get_aio_evt_rem()) { setGetEventTimer(); }
Modified: store/trunk/cpp/lib/JournalImpl.h
===================================================================
--- store/trunk/cpp/lib/JournalImpl.h 2010-05-10 17:48:27 UTC (rev 3954)
+++ store/trunk/cpp/lib/JournalImpl.h 2010-05-10 18:11:41 UTC (rev 3955)
@@ -28,6 +28,7 @@
#include "jrnl/enums.hpp"
#include "jrnl/jcntl.hpp"
#include "jrnl/slock.hpp"
+#include "jrnl/smutex.hpp"
#include "DataTokenImpl.h"
#include "PreparedTransaction.h"
#include <qpid/broker/PersistableQueue.h>
@@ -45,28 +46,28 @@
class InactivityFireEvent : public qpid::sys::TimerTask
{
- JournalImpl* parent;
- pthread_mutex_t _ife_mutex;
+ JournalImpl* _parent;
+ mrg::journal::smutex _ife_mutex;
public:
InactivityFireEvent(JournalImpl* p, const qpid::sys::Duration timeout):
- qpid::sys::TimerTask(timeout), parent(p) {
::pthread_mutex_init(&_ife_mutex, 0); }
- virtual ~InactivityFireEvent() { ::pthread_mutex_destroy(&_ife_mutex); }
+ qpid::sys::TimerTask(timeout), _parent(p) {}
+ virtual ~InactivityFireEvent() {}
void fire();
- inline void cancel() { mrg::journal::slock s(&_ife_mutex); parent = 0; }
+ inline void cancel() { mrg::journal::slock s(_ife_mutex); _parent = 0; }
};
class GetEventsFireEvent : public qpid::sys::TimerTask
{
- JournalImpl* parent;
- pthread_mutex_t _gefe_mutex;
+ JournalImpl* _parent;
+ mrg::journal::smutex _gefe_mutex;
public:
GetEventsFireEvent(JournalImpl* p, const qpid::sys::Duration timeout):
- qpid::sys::TimerTask(timeout), parent(p) {
::pthread_mutex_init(&_gefe_mutex, 0); }
- virtual ~GetEventsFireEvent() { ::pthread_mutex_destroy(&_gefe_mutex); }
+ qpid::sys::TimerTask(timeout), _parent(p) {}
+ virtual ~GetEventsFireEvent() {}
void fire();
- inline void cancel() { mrg::journal::slock s(&_gefe_mutex); parent = 0;
}
+ inline void cancel() { mrg::journal::slock s(_gefe_mutex); _parent = 0; }
};
class JournalImpl : public qpid::broker::ExternalQueueStore, public
mrg::journal::jcntl, public mrg::journal::aio_callback
@@ -77,7 +78,7 @@
bool getEventsTimerSetFlag;
boost::intrusive_ptr<qpid::sys::TimerTask> getEventsFireEventsPtr;
- pthread_mutex_t _getf_mutex;
+ mrg::journal::smutex _getf_mutex;
u_int64_t lastReadRid; // rid of last read msg for loadMsgContent() - detects out-of-order read requests
std::vector<u_int64_t> oooRidList; // list of out-of-order rids (greater than current rid) encountered during read sequence
Modified: store/trunk/cpp/lib/Makefile.am
===================================================================
--- store/trunk/cpp/lib/Makefile.am 2010-05-10 17:48:27 UTC (rev 3954)
+++ store/trunk/cpp/lib/Makefile.am 2010-05-10 18:11:41 UTC (rev 3955)
@@ -80,6 +80,7 @@
jrnl/rfc.cpp \
jrnl/rrfc.cpp \
jrnl/slock.cpp \
+ jrnl/smutex.cpp \
jrnl/time_ns.cpp \
jrnl/txn_map.cpp \
jrnl/txn_rec.cpp \
@@ -115,6 +116,7 @@
jrnl/rfc.hpp \
jrnl/rrfc.hpp \
jrnl/slock.hpp \
+ jrnl/smutex.hpp \
jrnl/time_ns.hpp \
jrnl/txn_hdr.hpp \
jrnl/txn_map.hpp \
Modified: store/trunk/cpp/lib/jrnl/arr_cnt.hpp
===================================================================
--- store/trunk/cpp/lib/jrnl/arr_cnt.hpp 2010-05-10 17:48:27 UTC (rev 3954)
+++ store/trunk/cpp/lib/jrnl/arr_cnt.hpp 2010-05-10 18:11:41 UTC (rev 3955)
@@ -47,6 +47,8 @@
* for which the number of elements is unknown, but for which the efficiency of a static
* array is required. None of the counts may go below zero.
*/
+
+ // TODO: Replace this class with instance of std::vector<u_int32_t>
class arr_cnt
{
private:
Modified: store/trunk/cpp/lib/jrnl/cvar.hpp
===================================================================
--- store/trunk/cpp/lib/jrnl/cvar.hpp 2010-05-10 17:48:27 UTC (rev 3954)
+++ store/trunk/cpp/lib/jrnl/cvar.hpp 2010-05-10 18:11:41 UTC (rev 3955)
@@ -35,6 +35,7 @@
#include <cstring>
#include "jrnl/jerrno.hpp"
#include "jrnl/jexception.hpp"
+#include "jrnl/smutex.hpp"
#include "jrnl/time_ns.hpp"
#include <pthread.h>
#include <sstream>
@@ -48,37 +49,35 @@
class cvar
{
private:
- pthread_mutex_t& _m;
+ const smutex& _sm;
pthread_cond_t _c;
public:
- inline cvar(pthread_mutex_t& m) : _m(m) { ::pthread_cond_init(&_c, 0); }
+ inline cvar(const smutex& sm) : _sm(sm) { ::pthread_cond_init(&_c, 0); }
inline ~cvar() { ::pthread_cond_destroy(&_c); }
inline void wait()
{
- PTHREAD_CHK(::pthread_cond_wait(&_c, &_m),
"pthread_cond_wait", "cvar", "wait");
+ PTHREAD_CHK(::pthread_cond_wait(&_c, _sm.get()),
"::pthread_cond_wait", "cvar", "wait");
}
inline void timedwait(timespec& ts)
{
- PTHREAD_CHK(::pthread_cond_timedwait(&_c, &_m, &ts),
"pthread_cond_timedwait", "cvar",
- "timedwait");
+ PTHREAD_CHK(::pthread_cond_timedwait(&_c, _sm.get(), &ts),
"::pthread_cond_timedwait", "cvar", "timedwait");
}
inline bool waitintvl(const long intvl_ns)
{
time_ns t; t.now(); t+=intvl_ns;
- int ret = ::pthread_cond_timedwait(&_c, &_m, &t);
+ int ret = ::pthread_cond_timedwait(&_c, _sm.get(), &t);
if (ret == ETIMEDOUT)
return true;
- PTHREAD_CHK(ret, "pthread_cond_timedwait", "cvar",
"waitintvl");
+ PTHREAD_CHK(ret, "::pthread_cond_timedwait", "cvar",
"waitintvl");
return false;
}
inline void signal()
{
- PTHREAD_CHK(::pthread_cond_signal(&_c), "pthread_cond_signal",
"cvar", "notify");
+ PTHREAD_CHK(::pthread_cond_signal(&_c),
"::pthread_cond_signal", "cvar", "notify");
}
inline void broadcast()
{
- PTHREAD_CHK(::pthread_cond_broadcast(&_c),
"pthread_cond_broadcast", "cvar",
- "broadcast");
+ PTHREAD_CHK(::pthread_cond_broadcast(&_c),
"::pthread_cond_broadcast", "cvar", "broadcast");
}
};
Modified: store/trunk/cpp/lib/jrnl/data_tok.cpp
===================================================================
--- store/trunk/cpp/lib/jrnl/data_tok.cpp 2010-05-10 17:48:27 UTC (rev 3954)
+++ store/trunk/cpp/lib/jrnl/data_tok.cpp 2010-05-10 18:11:41 UTC (rev 3955)
@@ -35,6 +35,7 @@
#include <iomanip>
#include "jrnl/jerrno.hpp"
#include "jrnl/jexception.hpp"
+#include "jrnl/slock.hpp"
#include <sstream>
namespace mrg
@@ -45,6 +46,7 @@
// Static members
u_int64_t data_tok::_cnt = 0;
+smutex data_tok::_mutex;
data_tok::data_tok():
_wstate(NONE),
@@ -59,16 +61,11 @@
_dequeue_rid(0),
_external_rid(false)
{
- pthread_mutex_init(&_mutex, 0);
- pthread_mutex_lock(&_mutex);
+ slock s(_mutex);
_icnt = _cnt++;
- pthread_mutex_unlock(&_mutex);
}
-data_tok::~data_tok()
-{
- pthread_mutex_destroy(&_mutex);
-}
+data_tok::~data_tok() {}
const char*
data_tok::wstate_str() const
Modified: store/trunk/cpp/lib/jrnl/data_tok.hpp
===================================================================
--- store/trunk/cpp/lib/jrnl/data_tok.hpp 2010-05-10 17:48:27 UTC (rev 3954)
+++ store/trunk/cpp/lib/jrnl/data_tok.hpp 2010-05-10 18:11:41 UTC (rev 3955)
@@ -43,6 +43,7 @@
#include <cassert>
#include <cstddef>
+#include "jrnl/smutex.hpp"
#include <pthread.h>
#include <string>
#include <sys/types.h>
@@ -94,7 +95,7 @@
};
protected:
- pthread_mutex_t _mutex;
+ static smutex _mutex;
static u_int64_t _cnt;
u_int64_t _icnt;
write_state _wstate; ///< Enqueued / dequeued state of data
Modified: store/trunk/cpp/lib/jrnl/enq_map.cpp
===================================================================
--- store/trunk/cpp/lib/jrnl/enq_map.cpp 2010-05-10 17:48:27 UTC (rev 3954)
+++ store/trunk/cpp/lib/jrnl/enq_map.cpp 2010-05-10 18:11:41 UTC (rev 3955)
@@ -46,14 +46,9 @@
enq_map::enq_map():
_map(),
_pfid_enq_cnt()
-{
- pthread_mutex_init(&_mutex, 0);
-}
+{}
-enq_map::~enq_map()
-{
- pthread_mutex_destroy(&_mutex);
-}
+enq_map::~enq_map() {}
void
enq_map::set_num_jfiles(const u_int16_t num_jfiles)
@@ -73,7 +68,7 @@
std::pair<emap_itr, bool> ret;
emap_data_struct rec(pfid, locked);
{
- slock s(&_mutex);
+ slock s(_mutex);
ret = _map.insert(emap_param(rid, rec));
}
if (ret.second == false)
@@ -88,11 +83,8 @@
u_int16_t
enq_map::get_pfid(const u_int64_t rid)
{
- emap_itr itr;
- {
- slock s(&_mutex);
- itr = _map.find(rid);
- }
+ slock s(_mutex);
+ emap_itr itr = _map.find(rid);
if (itr == _map.end()) // not found in map
{
std::ostringstream oss;
@@ -111,7 +103,7 @@
u_int16_t
enq_map::get_remove_pfid(const u_int64_t rid, const bool txn_flag)
{
- slock s(&_mutex);
+ slock s(_mutex);
emap_itr itr = _map.find(rid);
if (itr == _map.end()) // not found in map
{
@@ -134,7 +126,7 @@
bool
enq_map::is_enqueued(const u_int64_t rid, bool ignore_lock)
{
- slock s(&_mutex);
+ slock s(_mutex);
emap_itr itr = _map.find(rid);
if (itr == _map.end()) // not found in map
return false;
@@ -146,7 +138,7 @@
void
enq_map::lock(const u_int64_t rid)
{
- slock s(&_mutex);
+ slock s(_mutex);
emap_itr itr = _map.find(rid);
if (itr == _map.end()) // not found in map
{
@@ -160,7 +152,7 @@
void
enq_map::unlock(const u_int64_t rid)
{
- slock s(&_mutex);
+ slock s(_mutex);
emap_itr itr = _map.find(rid);
if (itr == _map.end()) // not found in map
{
@@ -174,11 +166,8 @@
bool
enq_map::is_locked(const u_int64_t rid)
{
- emap_itr itr;
- {
- slock s(&_mutex);
- itr = _map.find(rid);
- }
+ slock s(_mutex);
+ emap_itr itr = _map.find(rid);
if (itr == _map.end()) // not found in map
{
std::ostringstream oss;
@@ -193,7 +182,7 @@
{
rv.clear();
{
- slock s(&_mutex);
+ slock s(_mutex);
for (emap_itr itr = _map.begin(); itr != _map.end(); itr++)
rv.push_back(itr->first);
}
@@ -204,7 +193,7 @@
{
fv.clear();
{
- slock s(&_mutex);
+ slock s(_mutex);
for (emap_itr itr = _map.begin(); itr != _map.end(); itr++)
fv.push_back(itr->second._pfid);
}
Modified: store/trunk/cpp/lib/jrnl/enq_map.hpp
===================================================================
--- store/trunk/cpp/lib/jrnl/enq_map.hpp 2010-05-10 17:48:27 UTC (rev 3954)
+++ store/trunk/cpp/lib/jrnl/enq_map.hpp 2010-05-10 18:11:41 UTC (rev 3955)
@@ -43,6 +43,7 @@
#include "jrnl/arr_cnt.hpp"
#include "jrnl/jexception.hpp"
+#include "jrnl/smutex.hpp"
#include <map>
#include <pthread.h>
#include <vector>
@@ -87,7 +88,7 @@
typedef emap::iterator emap_itr;
emap _map;
- pthread_mutex_t _mutex;
+ smutex _mutex;
arr_cnt _pfid_enq_cnt;
public:
Modified: store/trunk/cpp/lib/jrnl/jcntl.cpp
===================================================================
--- store/trunk/cpp/lib/jrnl/jcntl.cpp 2010-05-10 17:48:27 UTC (rev 3954)
+++ store/trunk/cpp/lib/jrnl/jcntl.cpp 2010-05-10 18:11:41 UTC (rev 3955)
@@ -73,10 +73,7 @@
_rmgr(this, _emap, _tmap, _rrfc),
_wmgr(this, _emap, _tmap, _wrfc),
_rcvdat()
-{
- ::pthread_mutex_init(&_wr_mutex, 0);
- ::pthread_mutex_init(&_gev_mutex, 0);
-}
+{}
jcntl::~jcntl()
{
@@ -84,8 +81,6 @@
try { stop(true); }
catch (const jexception& e) { std::cerr << e << std::endl; }
_lpmgr.finalize();
- ::pthread_mutex_destroy(&_gev_mutex);
- ::pthread_mutex_destroy(&_wr_mutex);
}
void
@@ -201,7 +196,7 @@
{
check_wstatus("enqueue_data_record");
{
- slock s(&_wr_mutex);
+ slock s(_wr_mutex);
iores r;
while (handle_aio_wait(_wmgr.enqueue(data_buff, tot_data_len, this_data_len,
dtokp, 0,
0, transient, false), r, dtokp)) ;
@@ -214,7 +209,7 @@
{
check_wstatus("enqueue_extern_data_record");
{
- slock s(&_wr_mutex);
+ slock s(_wr_mutex);
iores r;
while (handle_aio_wait(_wmgr.enqueue(0, tot_data_len, 0, dtokp, 0, 0, transient,
true), r, dtokp)) ;
@@ -229,7 +224,7 @@
{
check_wstatus("enqueue_tx_data_record");
{
- slock s(&_wr_mutex);
+ slock s(_wr_mutex);
iores r;
while (handle_aio_wait(_wmgr.enqueue(data_buff, tot_data_len, this_data_len,
dtokp,
xid.data(), xid.size(), transient, false), r, dtokp)) ;
@@ -243,7 +238,7 @@
{
check_wstatus("enqueue_extern_txn_data_record");
{
- slock s(&_wr_mutex);
+ slock s(_wr_mutex);
iores r;
while (handle_aio_wait(_wmgr.enqueue(0, tot_data_len, 0, dtokp, xid.data(),
xid.size(),
transient, true), r, dtokp)) ;
@@ -301,7 +296,7 @@
{
check_wstatus("dequeue_data");
{
- slock s(&_wr_mutex);
+ slock s(_wr_mutex);
iores r;
while (handle_aio_wait(_wmgr.dequeue(dtokp, 0, 0, txn_coml_commit), r, dtokp)) ;
return r;
@@ -313,7 +308,7 @@
{
check_wstatus("dequeue_data");
{
- slock s(&_wr_mutex);
+ slock s(_wr_mutex);
iores r;
while (handle_aio_wait(_wmgr.dequeue(dtokp, xid.data(), xid.size(),
txn_coml_commit), r, dtokp)) ;
return r;
@@ -325,7 +320,7 @@
{
check_wstatus("txn_abort");
{
- slock s(&_wr_mutex);
+ slock s(_wr_mutex);
iores r;
while (handle_aio_wait(_wmgr.abort(dtokp, xid.data(), xid.size()), r, dtokp)) ;
return r;
@@ -337,7 +332,7 @@
{
check_wstatus("txn_commit");
{
- slock s(&_wr_mutex);
+ slock s(_wr_mutex);
iores r;
while (handle_aio_wait(_wmgr.commit(dtokp, xid.data(), xid.size()), r, dtokp)) ;
return r;
@@ -347,14 +342,14 @@
bool
jcntl::is_txn_synced(const std::string& xid)
{
- slock s(&_wr_mutex);
+ slock s(_wr_mutex);
return _wmgr.is_txn_synced(xid);
}
u_int32_t
jcntl::get_wr_events()
{
- stlock t(&_wr_mutex);
+ stlock t(_wr_mutex);
if (t.locked())
return _wmgr.get_events(pmgr::UNUSED);
return 0;
@@ -404,7 +399,7 @@
throw jexception(jerrno::JERR_JCNTL_READONLY, "jcntl",
"flush");
iores res;
{
- slock s(&_wr_mutex);
+ slock s(_wr_mutex);
res = _wmgr.flush();
}
if (block_till_aio_cmpl)
Modified: store/trunk/cpp/lib/jrnl/jcntl.hpp
===================================================================
--- store/trunk/cpp/lib/jrnl/jcntl.hpp 2010-05-10 17:48:27 UTC (rev 3954)
+++ store/trunk/cpp/lib/jrnl/jcntl.hpp 2010-05-10 18:11:41 UTC (rev 3955)
@@ -47,6 +47,7 @@
#include "jrnl/fcntl.hpp"
#include "jrnl/lpmgr.hpp"
#include "jrnl/rcvdat.hpp"
+#include "jrnl/smutex.hpp"
#include "jrnl/rmgr.hpp"
#include "jrnl/wmgr.hpp"
#include "jrnl/wrfc.hpp"
@@ -143,8 +144,7 @@
rmgr _rmgr; ///< Read page manager which manages AIO
wmgr _wmgr; ///< Write page manager which manages AIO
rcvdat _rcvdat; ///< Recovery data used for recovery
- pthread_mutex_t _wr_mutex; ///< Mutex for journal writes
- pthread_mutex_t _gev_mutex; ///< Mutex for get_events
+ smutex _wr_mutex; ///< Mutex for journal writes
public:
/**
Modified: store/trunk/cpp/lib/jrnl/jexception.hpp
===================================================================
--- store/trunk/cpp/lib/jrnl/jexception.hpp 2010-05-10 17:48:27 UTC (rev 3954)
+++ store/trunk/cpp/lib/jrnl/jexception.hpp 2010-05-10 18:11:41 UTC (rev 3955)
@@ -43,10 +43,13 @@
}
}
+#include <cerrno>
#include <cstdio>
#include <cstdlib>
#include <cstring>
#include <exception>
+#include "jrnl/jerrno.hpp"
+#include <sstream>
#include <string>
#include <sys/types.h>
Modified: store/trunk/cpp/lib/jrnl/slock.hpp
===================================================================
--- store/trunk/cpp/lib/jrnl/slock.hpp 2010-05-10 17:48:27 UTC (rev 3954)
+++ store/trunk/cpp/lib/jrnl/slock.hpp 2010-05-10 18:11:41 UTC (rev 3955)
@@ -33,12 +33,9 @@
#ifndef mrg_journal_slock_hpp
#define mrg_journal_slock_hpp
-#include <cerrno>
-#include <cstring>
-#include "jrnl/jerrno.hpp"
#include "jrnl/jexception.hpp"
+#include "jrnl/smutex.hpp"
#include <pthread.h>
-#include <sstream>
namespace mrg
{
@@ -49,15 +46,15 @@
class slock
{
protected:
- pthread_mutex_t* _m;
+ const smutex& _sm;
public:
- inline slock(pthread_mutex_t* m) : _m(m)
+ inline slock(const smutex& sm) : _sm(sm)
{
- PTHREAD_CHK(::pthread_mutex_lock(_m), "pthread_mutex_lock",
"slock", "slock");
+ PTHREAD_CHK(::pthread_mutex_lock(_sm.get()),
"::pthread_mutex_lock", "slock", "slock");
}
inline ~slock()
{
- PTHREAD_CHK(::pthread_mutex_unlock(_m), "pthread_mutex_unlock",
"slock", "~slock");
+ PTHREAD_CHK(::pthread_mutex_unlock(_sm.get()),
"::pthread_mutex_unlock", "slock", "~slock");
}
};
@@ -65,21 +62,19 @@
class stlock
{
protected:
- pthread_mutex_t* _m;
+ const smutex& _sm;
bool _locked;
public:
- inline stlock(pthread_mutex_t* m) : _m(m), _locked(false)
+ inline stlock(const smutex& sm) : _sm(sm), _locked(false)
{
- int ret = ::pthread_mutex_trylock(_m);
+ int ret = ::pthread_mutex_trylock(_sm.get());
_locked = (ret == 0); // check if lock obtained
- if (!_locked && ret != EBUSY) PTHREAD_CHK(ret,
"pthread_mutex_trylock", "stlock",
- "stlock");
+ if (!_locked && ret != EBUSY) PTHREAD_CHK(ret,
"::pthread_mutex_trylock", "stlock", "stlock");
}
inline ~stlock()
{
if (_locked)
- PTHREAD_CHK(::pthread_mutex_unlock(_m), "pthread_mutex_unlock",
"stlock",
- "~stlock");
+ PTHREAD_CHK(::pthread_mutex_unlock(_sm.get()),
"::pthread_mutex_unlock", "stlock", "~stlock");
}
inline bool locked() const { return _locked; }
};
Added: store/trunk/cpp/lib/jrnl/smutex.cpp
===================================================================
--- store/trunk/cpp/lib/jrnl/smutex.cpp (rev 0)
+++ store/trunk/cpp/lib/jrnl/smutex.cpp 2010-05-10 18:11:41 UTC (rev 3955)
@@ -0,0 +1,33 @@
+/**
+ * \file smutex.cpp
+ *
+ * Qpid asynchronous store plugin library
+ *
+ * File containing code for class mrg::journal::smutex (scoped mutex). See
+ * comments in file smutex.hpp for details.
+ *
+ * \author Kim van der Riet
+ *
+ * Copyright (c) 2010 Red Hat Inc.
+ *
+ * This file is part of the Qpid async store library msgstore.so.
+ *
+ * This library is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU Lesser General Public
+ * License as published by the Free Software Foundation; either
+ * version 2.1 of the License, or (at your option) any later version.
+ *
+ * This library is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public
+ * License along with this library; if not, write to the Free Software
+ * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301
+ * USA
+ *
+ * The GNU Lesser General Public License is available in the file COPYING.
+ */
+
+#include "jrnl/smutex.hpp"
Added: store/trunk/cpp/lib/jrnl/smutex.hpp
===================================================================
--- store/trunk/cpp/lib/jrnl/smutex.hpp (rev 0)
+++ store/trunk/cpp/lib/jrnl/smutex.hpp 2010-05-10 18:11:41 UTC (rev 3955)
@@ -0,0 +1,64 @@
+/**
+ * \file smutex.hpp
+ *
+ * Qpid asynchronous store plugin library
+ *
+ * Messaging journal scoped mutex class mrg::journal::smutex.
+ *
+ * \author Kim van der Riet
+ *
+ * Copyright (c) 2010 Red Hat, Inc.
+ *
+ * This file is part of the Qpid async store library msgstore.so.
+ *
+ * This library is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU Lesser General Public
+ * License as published by the Free Software Foundation; either
+ * version 2.1 of the License, or (at your option) any later version.
+ *
+ * This library is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public
+ * License along with this library; if not, write to the Free Software
+ * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301
+ * USA
+ *
+ * The GNU Lesser General Public License is available in the file COPYING.
+ */
+
+
+#ifndef mrg_journal_smutex_hpp
+#define mrg_journal_smutex_hpp
+
+#include "jrnl/jexception.hpp"
+#include <pthread.h>
+
+namespace mrg
+{
+namespace journal
+{
+
+ // Ultra-simple scoped mutex class that allows a posix mutex to be initialized and destroyed with error checks
+ class smutex
+ {
+ protected:
+ mutable pthread_mutex_t _m;
+ public:
+ inline smutex()
+ {
+ PTHREAD_CHK(::pthread_mutex_init(&_m, 0),
"::pthread_mutex_init", "smutex", "smutex");
+ }
+ inline virtual ~smutex()
+ {
+ PTHREAD_CHK(::pthread_mutex_destroy(&_m),
"::pthread_mutex_destroy", "smutex", "~smutex");
+ }
+ inline pthread_mutex_t* get() const { return &_m; }
+ };
+
+} // namespace journal
+} // namespace mrg
+
+#endif // ifndef mrg_journal_smutex_hpp
Modified: store/trunk/cpp/lib/jrnl/txn_map.cpp
===================================================================
--- store/trunk/cpp/lib/jrnl/txn_map.cpp 2010-05-10 17:48:27 UTC (rev 3954)
+++ store/trunk/cpp/lib/jrnl/txn_map.cpp 2010-05-10 18:11:41 UTC (rev 3955)
@@ -56,14 +56,9 @@
txn_map::txn_map():
_map(),
_pfid_txn_cnt()
-{
- pthread_mutex_init(&_mutex, 0);
-}
+{}
-txn_map::~txn_map()
-{
- pthread_mutex_destroy(&_mutex);
-}
+txn_map::~txn_map() {}
void
txn_map::set_num_jfiles(const u_int16_t num_jfiles)
@@ -75,7 +70,7 @@
txn_map::insert_txn_data(const std::string& xid, const txn_data& td)
{
bool ok = true;
- slock s(&_mutex);
+ slock s(_mutex);
xmap_itr itr = _map.find(xid);
if (itr == _map.end()) // not found in map
{
@@ -94,7 +89,7 @@
const txn_data_list
txn_map::get_tdata_list(const std::string& xid)
{
- slock s(&_mutex);
+ slock s(_mutex);
return get_tdata_list_nolock(xid);
}
@@ -114,7 +109,7 @@
const txn_data_list
txn_map::get_remove_tdata_list(const std::string& xid)
{
- slock s(&_mutex);
+ slock s(_mutex);
xmap_itr itr = _map.find(xid);
if (itr == _map.end()) // not found in map
{
@@ -132,7 +127,7 @@
bool
txn_map::in_map(const std::string& xid)
{
- slock s(&_mutex);
+ slock s(_mutex);
xmap_itr itr= _map.find(xid);
if (itr == _map.end()) // not found in map
return false;
@@ -142,7 +137,7 @@
u_int32_t
txn_map::get_rid_count(const std::string& xid)
{
- slock s(&_mutex);
+ slock s(_mutex);
xmap_itr itr = _map.find(xid);
if (itr == _map.end()) // not found in map
{
@@ -156,7 +151,7 @@
u_int32_t
txn_map::cnt(const bool enq_flag)
{
- slock s(&_mutex);
+ slock s(_mutex);
u_int32_t c = 0;
for (xmap_itr i = _map.begin(); i != _map.end(); i++)
{
@@ -172,7 +167,7 @@
u_int32_t
txn_map::cnt(const std::string& xid, const bool enq_flag)
{
- slock s(&_mutex);
+ slock s(_mutex);
u_int32_t c = 0;
xmap_itr i = _map.find(xid);
if (i == _map.end()) // not found in map
@@ -188,7 +183,7 @@
bool
txn_map::is_txn_synced(const std::string& xid)
{
- slock s(&_mutex);
+ slock s(_mutex);
xmap_itr itr = _map.find(xid);
if (itr == _map.end()) // not found in map
{
@@ -214,7 +209,7 @@
bool ok = true;
bool found = false;
{
- slock s(&_mutex);
+ slock s(_mutex);
xmap_itr itr = _map.find(xid);
if (itr == _map.end()) // not found in map
ok = false;
@@ -245,7 +240,7 @@
{
bool found = false;
{
- slock s(&_mutex);
+ slock s(_mutex);
txn_data_list tdl = get_tdata_list_nolock(xid);
tdl_itr itr = tdl.begin();
while (itr != tdl.end() && !found)
@@ -268,7 +263,7 @@
{
bool found = false;
{
- slock s(&_mutex);
+ slock s(_mutex);
for (xmap_itr i = _map.begin(); i != _map.end() && !found; i++)
{
txn_data_list list = i->second;
@@ -289,7 +284,7 @@
{
xv.clear();
{
- slock s(&_mutex);
+ slock s(_mutex);
for (xmap_itr itr = _map.begin(); itr != _map.end(); itr++)
xv.push_back(itr->first);
}
Modified: store/trunk/cpp/lib/jrnl/txn_map.hpp
===================================================================
--- store/trunk/cpp/lib/jrnl/txn_map.hpp 2010-05-10 17:48:27 UTC (rev 3954)
+++ store/trunk/cpp/lib/jrnl/txn_map.hpp 2010-05-10 18:11:41 UTC (rev 3955)
@@ -42,6 +42,7 @@
}
#include "jrnl/arr_cnt.hpp"
+#include "jrnl/smutex.hpp"
#include <map>
#include <pthread.h>
#include <string>
@@ -116,7 +117,7 @@
typedef xmap::iterator xmap_itr;
xmap _map;
- pthread_mutex_t _mutex;
+ smutex _mutex;
arr_cnt _pfid_txn_cnt;
public:
Modified: store/trunk/cpp/tests/jrnl/jtt/data_src.cpp
===================================================================
--- store/trunk/cpp/tests/jrnl/jtt/data_src.cpp 2010-05-10 17:48:27 UTC (rev 3954)
+++ store/trunk/cpp/tests/jrnl/jtt/data_src.cpp 2010-05-10 18:11:41 UTC (rev 3955)
@@ -36,7 +36,7 @@
char data_src::_xid_src[data_src::max_xsize];
bool data_src::_initialized = data_src::__init();
u_int64_t data_src::_xid_cnt = 0ULL;
-data_src::mutex data_src::_m;
+mrg::journal::smutex data_src::_sm;
data_src::data_src()
{}
Modified: store/trunk/cpp/tests/jrnl/jtt/data_src.hpp
===================================================================
--- store/trunk/cpp/tests/jrnl/jtt/data_src.hpp 2010-05-10 17:48:27 UTC (rev 3954)
+++ store/trunk/cpp/tests/jrnl/jtt/data_src.hpp 2010-05-10 18:11:41 UTC (rev 3955)
@@ -26,6 +26,7 @@
#include <cstddef>
#include "jrnl/slock.hpp"
+#include "jrnl/smutex.hpp"
#include <pthread.h>
#include <string>
#include <sys/types.h>
@@ -48,16 +49,7 @@
static char _xid_src[];
static u_int64_t _xid_cnt;
static bool _initialized;
- class mutex
- {
- private:
- pthread_mutex_t _mutex;
- public:
- inline mutex() { pthread_mutex_init(&_mutex, 0); }
- inline ~mutex() { pthread_mutex_destroy(&_mutex); }
- inline pthread_mutex_t* m() { return &_mutex; }
- };
- static mutex _m;
+ static mrg::journal::smutex _sm;
public:
static const char* get_data(const std::size_t offs);
@@ -65,7 +57,7 @@
private:
data_src();
- static u_int64_t get_xid_cnt() { mrg::journal::slock s(_m.m()); return
_xid_cnt++; }
+ static u_int64_t get_xid_cnt() { mrg::journal::slock s(_sm); return _xid_cnt++;
}
static const char* get_xid_content(const std::size_t offs);
static bool __init();
};
Modified: store/trunk/cpp/tests/jrnl/jtt/jrnl_instance.cpp
===================================================================
--- store/trunk/cpp/tests/jrnl/jtt/jrnl_instance.cpp 2010-05-10 17:48:27 UTC (rev 3954)
+++ store/trunk/cpp/tests/jrnl/jtt/jrnl_instance.cpp 2010-05-10 18:11:41 UTC (rev 3955)
@@ -55,12 +55,7 @@
_deq_list_cv(_deq_list_mutex),
_tcp(),
_tcrp()
-{
- pthread_mutex_init(&_rd_aio_mutex, 0);
- pthread_mutex_init(&_wr_full_mutex, 0);
- pthread_mutex_init(&_rd_list_mutex, 0);
- pthread_mutex_init(&_deq_list_mutex, 0);
-}
+{}
jrnl_instance::jrnl_instance(const jrnl_init_params::shared_ptr& p):
mrg::journal::jcntl(p->jid(), p->jdir(), p->base_filename()),
@@ -76,20 +71,9 @@
_deq_list_cv(_deq_list_mutex),
_tcp(),
_tcrp()
-{
- pthread_mutex_init(&_rd_aio_mutex, 0);
- pthread_mutex_init(&_wr_full_mutex, 0);
- pthread_mutex_init(&_rd_list_mutex, 0);
- pthread_mutex_init(&_deq_list_mutex, 0);
-}
+{}
-jrnl_instance::~jrnl_instance()
-{
- pthread_mutex_destroy(&_rd_aio_mutex);
- pthread_mutex_destroy(&_wr_full_mutex);
- pthread_mutex_destroy(&_rd_list_mutex);
- pthread_mutex_destroy(&_deq_list_mutex);
-}
+jrnl_instance::~jrnl_instance() {}
void
@@ -209,7 +193,7 @@
}
else if (get_wr_events() == 0)
{
- mrg::journal::slock sl(&_wr_full_mutex);
+ mrg::journal::slock sl(_wr_full_mutex);
_wr_full_cv.waitintvl(MAX_WR_WAIT * 1000000); // MAX_WR_WAIT in ms
}
break;
@@ -239,7 +223,7 @@
{
journal::data_tok* dtokp = 0;
{
- mrg::journal::slock sl(&_rd_list_mutex);
+ mrg::journal::slock sl(_rd_list_mutex);
if (_dtok_rd_list.empty())
_rd_list_cv.wait();
if (!_dtok_rd_list.empty())
@@ -273,7 +257,7 @@
{
case mrg::journal::RHM_IORES_SUCCESS:
{
- mrg::journal::slock sl(&_deq_list_mutex);
+ mrg::journal::slock sl(_deq_list_mutex);
_dtok_deq_list.push_back(dtokp);
_deq_list_cv.broadcast();
}
@@ -291,7 +275,7 @@
case mrg::journal::RHM_IORES_PAGE_AIOWAIT:
if (get_rd_events() == 0)
{
- mrg::journal::slock sl(&_rd_aio_mutex);
+ mrg::journal::slock sl(_rd_aio_mutex);
_rd_aio_cv.waitintvl(MAX_RD_WAIT * 1000000); // MAX_RD_WAIT in ms
}
break;
@@ -301,7 +285,7 @@
oss << "\" returned " <<
mrg::journal::iores_str(res) << ".";
_tcrp->add_exception(oss.str());
{
- mrg::journal::slock sl(&_deq_list_mutex);
+ mrg::journal::slock sl(_deq_list_mutex);
_deq_list_cv.broadcast(); // wake up deq thread
}
}
@@ -326,7 +310,7 @@
{
journal::data_tok* dtokp = 0;
{
- mrg::journal::slock sl(&_deq_list_mutex);
+ mrg::journal::slock sl(_deq_list_mutex);
if (_dtok_deq_list.empty())
_deq_list_cv.wait();
if (!_dtok_deq_list.empty())
@@ -426,20 +410,20 @@
{
if (_args_ptr->read_mode.val() == read_arg::NONE)
{
- mrg::journal::slock sl(&_deq_list_mutex);
+ mrg::journal::slock sl(_deq_list_mutex);
_dtok_deq_list.push_back(dtokp);
_deq_list_cv.broadcast();
}
else
{
- mrg::journal::slock sl(&_rd_list_mutex);
+ mrg::journal::slock sl(_rd_list_mutex);
_dtok_rd_list.push_back(dtokp);
_rd_list_cv.broadcast();
}
}
else // DEQ
{
- mrg::journal::slock sl(&_wr_full_mutex);
+ mrg::journal::slock sl(_wr_full_mutex);
_wr_full_cv.broadcast();
}
}
@@ -449,7 +433,7 @@
void
jrnl_instance::rd_aio_cb(std::vector<u_int16_t>& /*pil*/)
{
- mrg::journal::slock sl(&_rd_aio_mutex);
+ mrg::journal::slock sl(_rd_aio_mutex);
_rd_aio_cv.broadcast();
}
Modified: store/trunk/cpp/tests/jrnl/jtt/jrnl_instance.hpp
===================================================================
--- store/trunk/cpp/tests/jrnl/jtt/jrnl_instance.hpp 2010-05-10 17:48:27 UTC (rev 3954)
+++ store/trunk/cpp/tests/jrnl/jtt/jrnl_instance.hpp 2010-05-10 18:11:41 UTC (rev 3955)
@@ -33,6 +33,7 @@
#include "jrnl/data_tok.hpp"
#include "jrnl/jcntl.hpp"
#include "jrnl/slock.hpp"
+#include "jrnl/smutex.hpp"
#include <list>
#include <vector>
@@ -54,14 +55,14 @@
std::vector<dtok_ptr> _dtok_master_txn_list;
std::list<journal::data_tok*> _dtok_rd_list;
std::list<journal::data_tok*> _dtok_deq_list;
- pthread_mutex_t _rd_aio_mutex; ///< Mutex for read aio wait conditions
- mrg::journal::cvar _rd_aio_cv; ///< Condition var for read aio wait conditions
- pthread_mutex_t _wr_full_mutex; ///< Mutex for write full conditions
- mrg::journal::cvar _wr_full_cv; ///< Condition var for write full conditions
- pthread_mutex_t _rd_list_mutex; ///< Mutex for _dtok_rd_list
- mrg::journal::cvar _rd_list_cv; ///< Condition var for _dtok_rd_list
- pthread_mutex_t _deq_list_mutex; ///< Mutex for _dtok_deq_list
- mrg::journal::cvar _deq_list_cv; ///< Condition var for _dtok_deq_list
+ mrg::journal::smutex _rd_aio_mutex; ///< Mutex for read aio wait conditions
+ mrg::journal::cvar _rd_aio_cv; ///< Condition var for read aio wait conditions
+ mrg::journal::smutex _wr_full_mutex; ///< Mutex for write full conditions
+ mrg::journal::cvar _wr_full_cv; ///< Condition var for write full conditions
+ mrg::journal::smutex _rd_list_mutex; ///< Mutex for _dtok_rd_list
+ mrg::journal::cvar _rd_list_cv; ///< Condition var for _dtok_rd_list
+ mrg::journal::smutex _deq_list_mutex; ///< Mutex for _dtok_deq_list
+ mrg::journal::cvar _deq_list_cv; ///< Condition var for _dtok_deq_list
pthread_t _enq_thread;
pthread_t _deq_thread;
pthread_t _read_thread;