Author: kpvdr
Date: 2010-05-21 08:49:22 -0400 (Fri, 21 May 2010)
New Revision: 3980
Modified:
store/trunk/cpp/lib/JournalImpl.cpp
store/trunk/cpp/lib/JournalImpl.h
store/trunk/cpp/lib/MessageStoreImpl.cpp
store/trunk/cpp/lib/MessageStoreImpl.h
Log:
Added a lock (journalListLock) to protect journalList in MessageStoreImpl and a
lock (_static_lock) to protect the static variables in JournalImpl; switched all
locks at this level to qpid::sys::Mutex and qpid::sys::Mutex::ScopedLock for
consistency.
Modified: store/trunk/cpp/lib/JournalImpl.cpp
===================================================================
--- store/trunk/cpp/lib/JournalImpl.cpp 2010-05-20 15:22:29 UTC (rev 3979)
+++ store/trunk/cpp/lib/JournalImpl.cpp 2010-05-21 12:49:22 UTC (rev 3980)
@@ -40,12 +40,13 @@
using qpid::management::ManagementAgent;
namespace _qmf = qmf::com::redhat::rhm::store;
+qpid::sys::Mutex JournalImpl::_static_lock;
qpid::sys::Timer* JournalImpl::journalTimerPtr = 0;
u_int32_t JournalImpl::cnt = 0;
-void InactivityFireEvent::fire() { slock s(_ife_mutex); if (_parent)
_parent->flushFire(); }
+void InactivityFireEvent::fire() { qpid::sys::Mutex::ScopedLock sl(_ife_lock); if
(_parent) _parent->flushFire(); }
-void GetEventsFireEvent::fire() { slock s(_gefe_mutex); if (_parent)
_parent->getEventsFire(); }
+void GetEventsFireEvent::fire() { qpid::sys::Mutex::ScopedLock sl(_gefe_lock); if
(_parent) _parent->getEventsFire(); }
JournalImpl::JournalImpl(const std::string& journalId,
const std::string& journalDirectory,
@@ -68,12 +69,15 @@
{
getEventsFireEventsPtr = new GetEventsFireEvent(this, getEventsTimeout);
inactivityFireEventPtr = new InactivityFireEvent(this, flushTimeout);
- if (journalTimerPtr == 0)
- journalTimerPtr = new qpid::sys::Timer;
- assert (journalTimerPtr != 0);
- cnt++;
- journalTimerPtr->start();
- journalTimerPtr->add(inactivityFireEventPtr);
+ {
+ qpid::sys::Mutex::ScopedLock sl(_static_lock);
+ if (journalTimerPtr == 0)
+ journalTimerPtr = new qpid::sys::Timer;
+ assert (journalTimerPtr != 0);
+ cnt++;
+ journalTimerPtr->start();
+ journalTimerPtr->add(inactivityFireEventPtr);
+ }
if (_agent != 0)
{
@@ -112,11 +116,13 @@
inactivityFireEventPtr->cancel();
free_read_buffers();
- // TODO: Make this if() thread-safe
- if (journalTimerPtr && --cnt == 0)
{
- delete journalTimerPtr;
- journalTimerPtr = 0;
+ qpid::sys::Mutex::ScopedLock sl(_static_lock);
+ if (journalTimerPtr && --cnt == 0)
+ {
+ delete journalTimerPtr;
+ journalTimerPtr = 0;
+ }
}
if (_mgmtObject != 0) {
@@ -503,7 +509,7 @@
{
const iores res = jcntl::flush(block_till_aio_cmpl);
{
- slock s(_getf_mutex);
+ qpid::sys::Mutex::ScopedLock sl(_getf_lock);
if (_wmgr.get_aio_evt_rem() && !getEventsTimerSetFlag) {
setGetEventTimer(); }
}
return res;
@@ -533,7 +539,7 @@
void
JournalImpl::getEventsFire()
{
- slock s(_getf_mutex);
+ qpid::sys::Mutex::ScopedLock sl(_getf_lock);
getEventsTimerSetFlag = false;
if (_wmgr.get_aio_evt_rem()) { jcntl::get_wr_events(); }
if (_wmgr.get_aio_evt_rem()) { setGetEventTimer(); }
@@ -552,8 +558,11 @@
}
}
inactivityFireEventPtr->setupNextFire();
- assert(journalTimerPtr != 0);
- journalTimerPtr->add(inactivityFireEventPtr);
+ {
+ qpid::sys::Mutex::ScopedLock sl(_static_lock);
+ assert(journalTimerPtr != 0);
+ journalTimerPtr->add(inactivityFireEventPtr);
+ }
}
void
Modified: store/trunk/cpp/lib/JournalImpl.h
===================================================================
--- store/trunk/cpp/lib/JournalImpl.h 2010-05-20 15:22:29 UTC (rev 3979)
+++ store/trunk/cpp/lib/JournalImpl.h 2010-05-21 12:49:22 UTC (rev 3980)
@@ -27,8 +27,6 @@
#include <set>
#include "jrnl/enums.hpp"
#include "jrnl/jcntl.hpp"
-#include "jrnl/slock.hpp"
-#include "jrnl/smutex.hpp"
#include "DataTokenImpl.h"
#include "PreparedTransaction.h"
#include <qpid/broker/PersistableQueue.h>
@@ -47,38 +45,39 @@
class InactivityFireEvent : public qpid::sys::TimerTask
{
JournalImpl* _parent;
- mrg::journal::smutex _ife_mutex;
+ qpid::sys::Mutex _ife_lock;
public:
InactivityFireEvent(JournalImpl* p, const qpid::sys::Duration timeout):
qpid::sys::TimerTask(timeout), _parent(p) {}
virtual ~InactivityFireEvent() {}
void fire();
- inline void cancel() { mrg::journal::slock s(_ife_mutex); _parent = 0; }
+ inline void cancel() { qpid::sys::Mutex::ScopedLock sl(_ife_lock); _parent =
0; }
};
class GetEventsFireEvent : public qpid::sys::TimerTask
{
JournalImpl* _parent;
- mrg::journal::smutex _gefe_mutex;
+ qpid::sys::Mutex _gefe_lock;
public:
GetEventsFireEvent(JournalImpl* p, const qpid::sys::Duration timeout):
qpid::sys::TimerTask(timeout), _parent(p) {}
virtual ~GetEventsFireEvent() {}
void fire();
- inline void cancel() { mrg::journal::slock s(_gefe_mutex); _parent = 0; }
+ inline void cancel() { qpid::sys::Mutex::ScopedLock sl(_gefe_lock); _parent =
0; }
};
class JournalImpl : public qpid::broker::ExternalQueueStore, public
mrg::journal::jcntl, public mrg::journal::aio_callback
{
private:
+ static qpid::sys::Mutex _static_lock;
static qpid::sys::Timer* journalTimerPtr;
static u_int32_t cnt;
bool getEventsTimerSetFlag;
boost::intrusive_ptr<qpid::sys::TimerTask> getEventsFireEventsPtr;
- mrg::journal::smutex _getf_mutex;
+ qpid::sys::Mutex _getf_lock;
u_int64_t lastReadRid; // rid of last read msg for loadMsgContent() - detects
out-of-order read requests
std::vector<u_int64_t> oooRidList; // list of out-of-order rids
(greater than current rid) encountered during read sequence
Modified: store/trunk/cpp/lib/MessageStoreImpl.cpp
===================================================================
--- store/trunk/cpp/lib/MessageStoreImpl.cpp 2010-05-20 15:22:29 UTC (rev 3979)
+++ store/trunk/cpp/lib/MessageStoreImpl.cpp 2010-05-21 12:49:22 UTC (rev 3980)
@@ -362,10 +362,13 @@
void MessageStoreImpl::finalize()
{
if (tplStorePtr->is_ready()) tplStorePtr->stop(true);
- for (JournalListMapItr i = journalList.begin(); i != journalList.end(); i++)
{
- JournalImpl* jQueue = i->second;
- if (jQueue->is_ready()) jQueue->stop(true);
+ qpid::sys::Mutex::ScopedLock sl(journalListLock);
+ for (JournalListMapItr i = journalList.begin(); i != journalList.end(); i++)
+ {
+ JournalImpl* jQueue = i->second;
+ if (jQueue->is_ready()) jQueue->stop(true);
+ }
}
if (mgmtObject != 0) {
@@ -377,10 +380,13 @@
void MessageStoreImpl::truncateInit(const bool pushDownStoreFiles)
{
if (isInit) {
- if (journalList.size()) { // check no queues exist
- std::ostringstream oss;
- oss << "truncateInit() called with " <<
journalList.size() << " queues still in existence";
- THROW_STORE_EXCEPTION(oss.str());
+ {
+ qpid::sys::Mutex::ScopedLock sl(journalListLock);
+ if (journalList.size()) { // check no queues exist
+ std::ostringstream oss;
+ oss << "truncateInit() called with " <<
journalList.size() << " queues still in existence";
+ THROW_STORE_EXCEPTION(oss.str());
+ }
}
for (std::list<db_ptr >::iterator i = dbs.begin(); i != dbs.end(); i++) {
(*i)->close(0);
@@ -402,6 +408,7 @@
void MessageStoreImpl::chkTplStoreInit()
{
+ // Don't take lock unless necessary
if (!tplStorePtr->is_ready()) {
qpid::sys::Mutex::ScopedLock sl(jrnlCreateLock);
if (!tplStorePtr->is_ready()) {
@@ -480,6 +487,9 @@
jQueue = new JournalImpl(queue.getName(), getJrnlDir(queue),
std::string("JournalData"),
defJournalGetEventsTimeout,
defJournalFlushTimeout, agent);
+ }
+ {
+ qpid::sys::Mutex::ScopedLock sl(journalListLock);
journalList[queue.getName()]=jQueue;
}
@@ -517,7 +527,10 @@
JournalImpl* jQueue = static_cast<JournalImpl*>(eqs);
jQueue->delete_jrnl_files();
queue.setExternalQueueStore(0); // will delete the journal if exists
- journalList.erase(journalList.find(queue.getName()));
+ {
+ qpid::sys::Mutex::ScopedLock sl(journalListLock);
+ journalList.erase(journalList.find(queue.getName()));
+ }
}
}
@@ -759,6 +772,9 @@
{
qpid::sys::Mutex::ScopedLock sl(jrnlCreateLock);
jQueue = new JournalImpl(queueName, getJrnlHashDir(queueName),
std::string("JournalData"), defJournalGetEventsTimeout, defJournalFlushTimeout,
agent);
+ }
+ {
+ qpid::sys::Mutex::ScopedLock sl(journalListLock);
journalList[queueName] = jQueue;
}
queue->setExternalQueueStore(dynamic_cast<ExternalQueueStore*>(jQueue));
Modified: store/trunk/cpp/lib/MessageStoreImpl.h
===================================================================
--- store/trunk/cpp/lib/MessageStoreImpl.h 2010-05-20 15:22:29 UTC (rev 3979)
+++ store/trunk/cpp/lib/MessageStoreImpl.h 2010-05-21 12:49:22 UTC (rev 3980)
@@ -126,6 +126,7 @@
boost::shared_ptr<TplJournalImpl> tplStorePtr;
TplRecoverMap tplRecoverMap;
JournalListMap journalList;
+ qpid::sys::Mutex journalListLock;
IdSequence queueIdSequence;
IdSequence exchangeIdSequence;