[rhmessaging-commits] rhmessaging commits: r3980 - store/trunk/cpp/lib.

rhmessaging-commits at lists.jboss.org rhmessaging-commits at lists.jboss.org
Fri May 21 08:49:22 EDT 2010


Author: kpvdr
Date: 2010-05-21 08:49:22 -0400 (Fri, 21 May 2010)
New Revision: 3980

Modified:
   store/trunk/cpp/lib/JournalImpl.cpp
   store/trunk/cpp/lib/JournalImpl.h
   store/trunk/cpp/lib/MessageStoreImpl.cpp
   store/trunk/cpp/lib/MessageStoreImpl.h
Log:
Added a lock to protect journalList in MessageStoreImpl and the static variables in JournalImpl; switched all locks at this level to qpid::sys::Mutex and qpid::sys::ScopedLock for consistency.

Modified: store/trunk/cpp/lib/JournalImpl.cpp
===================================================================
--- store/trunk/cpp/lib/JournalImpl.cpp	2010-05-20 15:22:29 UTC (rev 3979)
+++ store/trunk/cpp/lib/JournalImpl.cpp	2010-05-21 12:49:22 UTC (rev 3980)
@@ -40,12 +40,13 @@
 using qpid::management::ManagementAgent;
 namespace _qmf = qmf::com::redhat::rhm::store;
 
+qpid::sys::Mutex JournalImpl::_static_lock;
 qpid::sys::Timer* JournalImpl::journalTimerPtr = 0;
 u_int32_t JournalImpl::cnt = 0;
 
-void InactivityFireEvent::fire() { slock s(_ife_mutex); if (_parent) _parent->flushFire(); }
+void InactivityFireEvent::fire() { qpid::sys::Mutex::ScopedLock sl(_ife_lock); if (_parent) _parent->flushFire(); }
 
-void GetEventsFireEvent::fire() { slock s(_gefe_mutex); if (_parent) _parent->getEventsFire(); }
+void GetEventsFireEvent::fire() { qpid::sys::Mutex::ScopedLock sl(_gefe_lock); if (_parent) _parent->getEventsFire(); }
 
 JournalImpl::JournalImpl(const std::string& journalId,
                          const std::string& journalDirectory,
@@ -68,12 +69,15 @@
 {
     getEventsFireEventsPtr = new GetEventsFireEvent(this, getEventsTimeout);
     inactivityFireEventPtr = new InactivityFireEvent(this, flushTimeout);
-    if (journalTimerPtr == 0)
-        journalTimerPtr = new qpid::sys::Timer;
-    assert (journalTimerPtr != 0);
-    cnt++;
-    journalTimerPtr->start();
-    journalTimerPtr->add(inactivityFireEventPtr);
+    {
+        qpid::sys::Mutex::ScopedLock sl(_static_lock);
+        if (journalTimerPtr == 0)
+            journalTimerPtr = new qpid::sys::Timer;
+        assert (journalTimerPtr != 0);
+        cnt++;
+        journalTimerPtr->start();
+        journalTimerPtr->add(inactivityFireEventPtr);
+    }
 
     if (_agent != 0)
     {
@@ -112,11 +116,13 @@
     inactivityFireEventPtr->cancel();
     free_read_buffers();
 
-    // TODO: Make this if() thread-safe
-    if (journalTimerPtr && --cnt == 0)
     {
-        delete journalTimerPtr;
-        journalTimerPtr = 0;
+        qpid::sys::Mutex::ScopedLock sl(_static_lock);
+        if (journalTimerPtr && --cnt == 0)
+        {
+            delete journalTimerPtr;
+            journalTimerPtr = 0;
+        }
     }
 
     if (_mgmtObject != 0) {
@@ -503,7 +509,7 @@
 {
     const iores res = jcntl::flush(block_till_aio_cmpl);
     {
-        slock s(_getf_mutex);
+        qpid::sys::Mutex::ScopedLock sl(_getf_lock);
         if (_wmgr.get_aio_evt_rem() && !getEventsTimerSetFlag) { setGetEventTimer(); }
     }
     return res;
@@ -533,7 +539,7 @@
 void
 JournalImpl::getEventsFire()
 {
-    slock s(_getf_mutex);
+    qpid::sys::Mutex::ScopedLock sl(_getf_lock);
     getEventsTimerSetFlag = false;
     if (_wmgr.get_aio_evt_rem()) { jcntl::get_wr_events(); }
     if (_wmgr.get_aio_evt_rem()) { setGetEventTimer(); }
@@ -552,8 +558,11 @@
         }
     }
     inactivityFireEventPtr->setupNextFire();
-    assert(journalTimerPtr != 0);
-    journalTimerPtr->add(inactivityFireEventPtr);
+    {
+        qpid::sys::Mutex::ScopedLock sl(_static_lock);
+        assert(journalTimerPtr != 0);
+        journalTimerPtr->add(inactivityFireEventPtr);
+    }
 }
 
 void

Modified: store/trunk/cpp/lib/JournalImpl.h
===================================================================
--- store/trunk/cpp/lib/JournalImpl.h	2010-05-20 15:22:29 UTC (rev 3979)
+++ store/trunk/cpp/lib/JournalImpl.h	2010-05-21 12:49:22 UTC (rev 3980)
@@ -27,8 +27,6 @@
 #include <set>
 #include "jrnl/enums.hpp"
 #include "jrnl/jcntl.hpp"
-#include "jrnl/slock.hpp"
-#include "jrnl/smutex.hpp"
 #include "DataTokenImpl.h"
 #include "PreparedTransaction.h"
 #include <qpid/broker/PersistableQueue.h>
@@ -47,38 +45,39 @@
         class InactivityFireEvent : public qpid::sys::TimerTask
         {
             JournalImpl* _parent;
-            mrg::journal::smutex _ife_mutex;
+            qpid::sys::Mutex _ife_lock;
 
         public:
 	        InactivityFireEvent(JournalImpl* p, const qpid::sys::Duration timeout):
                 qpid::sys::TimerTask(timeout), _parent(p) {}
             virtual ~InactivityFireEvent() {}
             void fire();
-            inline void cancel() { mrg::journal::slock s(_ife_mutex); _parent = 0; }
+            inline void cancel() { qpid::sys::Mutex::ScopedLock sl(_ife_lock); _parent = 0; }
         };
 
         class GetEventsFireEvent : public qpid::sys::TimerTask
         {
             JournalImpl* _parent;
-            mrg::journal::smutex _gefe_mutex;
+            qpid::sys::Mutex _gefe_lock;
 
         public:
 	        GetEventsFireEvent(JournalImpl* p, const qpid::sys::Duration timeout):
                 qpid::sys::TimerTask(timeout), _parent(p) {}
             virtual ~GetEventsFireEvent() {}
             void fire();
-            inline void cancel() { mrg::journal::slock s(_gefe_mutex); _parent = 0; }
+            inline void cancel() { qpid::sys::Mutex::ScopedLock sl(_gefe_lock); _parent = 0; }
         };
 
         class JournalImpl : public qpid::broker::ExternalQueueStore, public mrg::journal::jcntl, public mrg::journal::aio_callback
         {
         private:
+            static qpid::sys::Mutex _static_lock;
             static qpid::sys::Timer* journalTimerPtr;
             static u_int32_t cnt;
 
             bool getEventsTimerSetFlag;
             boost::intrusive_ptr<qpid::sys::TimerTask> getEventsFireEventsPtr;
-            mrg::journal::smutex _getf_mutex;
+            qpid::sys::Mutex _getf_lock;
 
             u_int64_t lastReadRid; // rid of last read msg for loadMsgContent() - detects out-of-order read requests
             std::vector<u_int64_t> oooRidList; // list of out-of-order rids (greater than current rid) encountered during read sequence

Modified: store/trunk/cpp/lib/MessageStoreImpl.cpp
===================================================================
--- store/trunk/cpp/lib/MessageStoreImpl.cpp	2010-05-20 15:22:29 UTC (rev 3979)
+++ store/trunk/cpp/lib/MessageStoreImpl.cpp	2010-05-21 12:49:22 UTC (rev 3980)
@@ -362,10 +362,13 @@
 void MessageStoreImpl::finalize()
 {
     if (tplStorePtr->is_ready()) tplStorePtr->stop(true);
-    for (JournalListMapItr i = journalList.begin(); i != journalList.end(); i++)
     {
-        JournalImpl* jQueue = i->second;
-        if (jQueue->is_ready()) jQueue->stop(true);
+        qpid::sys::Mutex::ScopedLock sl(journalListLock);
+        for (JournalListMapItr i = journalList.begin(); i != journalList.end(); i++)
+        {
+            JournalImpl* jQueue = i->second;
+            if (jQueue->is_ready()) jQueue->stop(true);
+        }
     }
 
     if (mgmtObject != 0) {
@@ -377,10 +380,13 @@
 void MessageStoreImpl::truncateInit(const bool pushDownStoreFiles)
 {
     if (isInit) {
-        if (journalList.size()) { // check no queues exist
-            std::ostringstream oss;
-            oss << "truncateInit() called with " << journalList.size() << " queues still in existence";
-            THROW_STORE_EXCEPTION(oss.str());
+        {
+            qpid::sys::Mutex::ScopedLock sl(journalListLock);
+            if (journalList.size()) { // check no queues exist
+                std::ostringstream oss;
+                oss << "truncateInit() called with " << journalList.size() << " queues still in existence";
+                THROW_STORE_EXCEPTION(oss.str());
+            }
         }
         for (std::list<db_ptr >::iterator i = dbs.begin(); i != dbs.end(); i++) {
             (*i)->close(0);
@@ -402,6 +408,7 @@
 
 void MessageStoreImpl::chkTplStoreInit()
 {
+    // Don't take lock unless necessary
     if (!tplStorePtr->is_ready()) {
         qpid::sys::Mutex::ScopedLock sl(jrnlCreateLock);
         if (!tplStorePtr->is_ready()) {
@@ -480,6 +487,9 @@
         jQueue = new JournalImpl(queue.getName(), getJrnlDir(queue),
                                  std::string("JournalData"), defJournalGetEventsTimeout,
                                  defJournalFlushTimeout, agent);
+    }
+    {
+        qpid::sys::Mutex::ScopedLock sl(journalListLock);
         journalList[queue.getName()]=jQueue;
     }
 
@@ -517,7 +527,10 @@
         JournalImpl* jQueue = static_cast<JournalImpl*>(eqs);
         jQueue->delete_jrnl_files();
         queue.setExternalQueueStore(0); // will delete the journal if exists
-        journalList.erase(journalList.find(queue.getName()));
+        {
+            qpid::sys::Mutex::ScopedLock sl(journalListLock);
+            journalList.erase(journalList.find(queue.getName()));
+        }
     }
 }
 
@@ -759,6 +772,9 @@
         {
             qpid::sys::Mutex::ScopedLock sl(jrnlCreateLock);
             jQueue = new JournalImpl(queueName, getJrnlHashDir(queueName), std::string("JournalData"), defJournalGetEventsTimeout, defJournalFlushTimeout, agent);
+        }
+        {
+            qpid::sys::Mutex::ScopedLock sl(journalListLock);
             journalList[queueName] = jQueue;
         }
         queue->setExternalQueueStore(dynamic_cast<ExternalQueueStore*>(jQueue));

Modified: store/trunk/cpp/lib/MessageStoreImpl.h
===================================================================
--- store/trunk/cpp/lib/MessageStoreImpl.h	2010-05-20 15:22:29 UTC (rev 3979)
+++ store/trunk/cpp/lib/MessageStoreImpl.h	2010-05-21 12:49:22 UTC (rev 3980)
@@ -126,6 +126,7 @@
     boost::shared_ptr<TplJournalImpl> tplStorePtr;
     TplRecoverMap tplRecoverMap;
     JournalListMap journalList;
+    qpid::sys::Mutex journalListLock;
 
     IdSequence queueIdSequence;
     IdSequence exchangeIdSequence;



More information about the rhmessaging-commits mailing list