[rhmessaging-commits] rhmessaging commits: r4489 - store/trunk/cpp/lib.

rhmessaging-commits at lists.jboss.org
Wed Jan 4 14:28:33 EST 2012


Author: kpvdr
Date: 2012-01-04 14:28:32 -0500 (Wed, 04 Jan 2012)
New Revision: 4489

Modified:
   store/trunk/cpp/lib/JournalImpl.cpp
   store/trunk/cpp/lib/JournalImpl.h
Log:
BZ 769828: Added a high-level lock on the JournalImpl::loadMsgContent() method and reworked the read-invalidation logic. Tests show that this works correctly under multi-threaded conditions, but it is VERY SLOW. The read path was not designed with multiple read contexts in mind, so there is a lot of thrashing and re-reading, but it should now be thread-safe. Ultimately, the flow-to-disk facility that uses this "feature" needs a redesign.
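
Illustrative sketch only (not part of this commit): the pattern described in the log is a single coarse lock around the whole read method, with the read position invalidated whenever a request is flagged out-of-order or asks for a rid lower than the last one read. The class below is hypothetical. It uses std::mutex and std::lock_guard as stand-ins for qpid::sys::Mutex and its ScopedLock, takes the out-of-order flag as a parameter instead of deriving it from oooRidList, and only counts invalidations where the real code calls _rmgr.invalidate().

```cpp
#include <cstdint>
#include <mutex>

// Hypothetical stand-in for the journal read path; NOT the real JournalImpl.
class SketchReader {
public:
    // Returns true if this request forced the read position to be invalidated.
    // oooFlag is an assumed input; the real code computes it internally.
    bool load(std::uint64_t rid, bool oooFlag = false) {
        // Coarse lock: the entire read is serialized, one caller at a time.
        std::lock_guard<std::mutex> guard(readLock_);
        bool invalidated = false;
        if (rid != currentRid_) {
            // A backward jump (rid < lastReadRid_) is what a browsing
            // consumer produces, so it must also trigger a re-read.
            if (oooFlag || rid < lastReadRid_) {
                ++invalidations_;   // stands in for _rmgr.invalidate()
                invalidated = true;
            }
            currentRid_ = rid;
            lastReadRid_ = rid;
        }
        return invalidated;
    }

    // Number of invalidations so far; a rough proxy for re-read cost.
    unsigned invalidations() const {
        std::lock_guard<std::mutex> guard(readLock_);
        return invalidations_;
    }

private:
    mutable std::mutex readLock_;
    std::uint64_t currentRid_ = 0;   // rid currently held by the reader
    std::uint64_t lastReadRid_ = 0;  // rid of the last completed read
    unsigned invalidations_ = 0;
};
```

In this sketch, two interleaved readers requesting rids 10, 3, 11, 4 cause an invalidation on every backward jump (3 and 4), which is one plausible way to picture the thrashing and re-reading mentioned in the log.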

Modified: store/trunk/cpp/lib/JournalImpl.cpp
===================================================================
--- store/trunk/cpp/lib/JournalImpl.cpp	2012-01-04 17:00:47 UTC (rev 4488)
+++ store/trunk/cpp/lib/JournalImpl.cpp	2012-01-04 19:28:32 UTC (rev 4489)
@@ -257,6 +257,7 @@
 bool
 JournalImpl::loadMsgContent(u_int64_t rid, std::string& data, size_t length, size_t offset)
 {
+    qpid::sys::Mutex::ScopedLock sl(_read_lock);
     if (_dtok.rid() != rid)
     {
         // Free any previous msg
@@ -274,8 +275,11 @@
         // jumpover points and allow the read to jump back to the first known jumpover point - but this needs
         // a mechanism in rrfc to accomplish it. Also helpful is a struct containing a journal address - a
         // combination of lid/offset.
-        if (oooFlag || rid < lastReadRid)
+        // NOTE: The second part of the if stmt (rid < lastReadRid) is required to handle browsing.
+        if (oooFlag || rid < lastReadRid) {
             _rmgr.invalidate();
+            oooRidList.clear();
+        }
         _dlen = 0;
         _dtok.reset();
         _dtok.set_wstate(DataTokenImpl::ENQ);
@@ -285,7 +289,6 @@
         bool transient = false;
         bool done = false;
         bool rid_found = false;
-        oooRidList.clear();
         while (!done) {
             iores res = read_data_record(&_datap, _dlen, &_xidp, xlen, transient, _external, &_dtok);
             switch (res) {

Modified: store/trunk/cpp/lib/JournalImpl.h
===================================================================
--- store/trunk/cpp/lib/JournalImpl.h	2012-01-04 17:00:47 UTC (rev 4488)
+++ store/trunk/cpp/lib/JournalImpl.h	2012-01-04 19:28:32 UTC (rev 4489)
@@ -83,6 +83,7 @@
     bool getEventsTimerSetFlag;
     boost::intrusive_ptr<qpid::sys::TimerTask> getEventsFireEventsPtr;
     qpid::sys::Mutex _getf_lock;
+    qpid::sys::Mutex _read_lock;
 
     u_int64_t lastReadRid; // rid of last read msg for loadMsgContent() - detects out-of-order read requests
     std::vector<u_int64_t> oooRidList; // list of out-of-order rids (greater than current rid) encountered during read sequence


