[rhmessaging-commits] rhmessaging commits: r2801 - in store/trunk/cpp: lib/jrnl and 1 other directories.

rhmessaging-commits at lists.jboss.org rhmessaging-commits at lists.jboss.org
Fri Nov 14 11:23:43 EST 2008


Author: kpvdr
Date: 2008-11-14 11:23:43 -0500 (Fri, 14 Nov 2008)
New Revision: 2801

Modified:
   store/trunk/cpp/lib/JournalImpl.cpp
   store/trunk/cpp/lib/JournalImpl.h
   store/trunk/cpp/lib/jrnl/jcntl.hpp
   store/trunk/cpp/lib/jrnl/rmgr.cpp
   store/trunk/cpp/tests/jrnl/_st_read.cpp
Log:
Fix for BZ471601 "Read tests may fail with RHM_IORES_EMPTY" and BZ471606 "TwoPhaseCommitTest fails intermittently with valgrind illegal read/write error"

Modified: store/trunk/cpp/lib/JournalImpl.cpp
===================================================================
--- store/trunk/cpp/lib/JournalImpl.cpp	2008-11-13 15:56:34 UTC (rev 2800)
+++ store/trunk/cpp/lib/JournalImpl.cpp	2008-11-14 16:23:43 UTC (rev 2801)
@@ -25,7 +25,6 @@
 
 #include "jrnl/jerrno.hpp"
 #include "jrnl/jexception.hpp"
-#include "jrnl/slock.hpp"
 #include "qpid/log/Statement.h"
 #include "qpid/agent/ManagementAgent.h"
 #include "qmf/com/redhat/rhm/store/ArgsJournalExpand.h"
@@ -40,12 +39,10 @@
 qpid::broker::Timer* JournalImpl::journalTimerPtr = 0;
 u_int32_t JournalImpl::cnt = 0;
 
-void InactivityFireEvent::fire() { if (parent) parent->flushFire(); }
-void GetEventsFireEvent::fire() {
-    if (parent) parent->getEventsFire();
-    release();
-}
+void InactivityFireEvent::fire() { slock s(&_ife_mutex); if (parent) parent->flushFire(); }
 
+void GetEventsFireEvent::fire() { slock s(&_gefe_mutex); if (parent) parent->getEventsFire(); }
+
 JournalImpl::JournalImpl(const std::string& journalId,
                          const std::string& journalDirectory,
                          const std::string& journalBaseFilename,
@@ -453,9 +450,7 @@
 {
     slock s(&_getf_mutex);
     getEventsTimerSetFlag = false;
-    if (_wmgr.get_aio_evt_rem()) {
-        jcntl::get_wr_events();
-    }
+    if (_wmgr.get_aio_evt_rem()) { jcntl::get_wr_events(); }
     if (_wmgr.get_aio_evt_rem()) { setGetEventTimer(); }
 }
 

Modified: store/trunk/cpp/lib/JournalImpl.h
===================================================================
--- store/trunk/cpp/lib/JournalImpl.h	2008-11-13 15:56:34 UTC (rev 2800)
+++ store/trunk/cpp/lib/JournalImpl.h	2008-11-14 16:23:43 UTC (rev 2801)
@@ -26,6 +26,7 @@
 
 #include <set>
 #include "jrnl/jcntl.hpp"
+#include "jrnl/slock.hpp"
 #include "DataTokenImpl.h"
 #include "PreparedTransaction.h"
 #include <qpid/broker/Timer.h>
@@ -44,25 +45,27 @@
         class InactivityFireEvent : public virtual qpid::broker::TimerTask
         {
             JournalImpl*    parent;
+            pthread_mutex_t _ife_mutex;
 
         public:
 	        InactivityFireEvent(JournalImpl* p, const qpid::sys::Duration timeout):
-                qpid::broker::TimerTask(timeout), parent(p) {}
-            virtual ~InactivityFireEvent() {}
+                qpid::broker::TimerTask(timeout), parent(p) { ::pthread_mutex_init(&_ife_mutex, 0); }
+            virtual ~InactivityFireEvent() { ::pthread_mutex_destroy(&_ife_mutex); }
             void fire();
-		    inline void cancel() { parent=0; }
+            inline void cancel() { mrg::journal::slock s(&_ife_mutex); parent = 0; }
         };
 
         class GetEventsFireEvent : public virtual qpid::broker::TimerTask
         {
             JournalImpl*    parent;
+            pthread_mutex_t _gefe_mutex;
 
         public:
 	        GetEventsFireEvent(JournalImpl* p, const qpid::sys::Duration timeout):
-                qpid::broker::TimerTask(timeout), parent(p) {}
-            virtual ~GetEventsFireEvent() {}
+                qpid::broker::TimerTask(timeout), parent(p) { ::pthread_mutex_init(&_gefe_mutex, 0); }
+            virtual ~GetEventsFireEvent() { ::pthread_mutex_destroy(&_gefe_mutex); }
             void fire();
-		    inline void cancel() { parent=0; }
+            inline void cancel() { mrg::journal::slock s(&_gefe_mutex); parent = 0; }
         };
 
         class JournalImpl : public qpid::broker::ExternalQueueStore, public journal::jcntl
@@ -73,7 +76,7 @@
 
             bool getEventsTimerSetFlag;
             boost::intrusive_ptr<qpid::broker::TimerTask> getEventsFireEventsPtr;
-            pthread_mutex_t _getf_mutex; // getEventsTimerSetFlag mutex
+            pthread_mutex_t _getf_mutex;
             
             u_int64_t lastReadRid; // rid of last read msg for loadMsgContent()
 
@@ -197,7 +200,6 @@
 
             inline void setGetEventTimer()
             {
-                getEventsFireEventsPtr->addRef();
                 assert(journalTimerPtr != 0);
                 journalTimerPtr->add(getEventsFireEventsPtr);
                 getEventsTimerSetFlag = true;
@@ -236,7 +238,7 @@
                 return JournalImpl::read_data_record(datapp, dsize, xidpp, xidsize, transient, external, dtokp, true);
             }
             inline void read_reset() { _rmgr.invalidate(); }
-        };
+        }; // class TplJournalImpl
 
     } // namespace msgstore
 } // namespace mrg

Modified: store/trunk/cpp/lib/jrnl/jcntl.hpp
===================================================================
--- store/trunk/cpp/lib/jrnl/jcntl.hpp	2008-11-13 15:56:34 UTC (rev 2800)
+++ store/trunk/cpp/lib/jrnl/jcntl.hpp	2008-11-14 16:23:43 UTC (rev 2801)
@@ -110,6 +110,9 @@
         * journal will no longer accept messages until either initialize() or recover() is called.
         * There is no way other than through initialization to reset this flag.
         */
+        // TODO: It would be helpful to distinguish between the states "stopping" and "stopped". If stop(true) is
+        // called, then we are stopping, but must wait for all outstanding AIOs to return before being finally stopped.
+        // During this period, however, no new enqueue/dequeue/read requests may be accepted.
         bool _stop_flag;
 
         /**

Modified: store/trunk/cpp/lib/jrnl/rmgr.cpp
===================================================================
--- store/trunk/cpp/lib/jrnl/rmgr.cpp	2008-11-13 15:56:34 UTC (rev 2800)
+++ store/trunk/cpp/lib/jrnl/rmgr.cpp	2008-11-14 16:23:43 UTC (rev 2801)
@@ -132,10 +132,13 @@
         if(dblks_rem() == 0 && _rrfc.is_compl() && !_rrfc.is_wr_aio_outstanding())
         {
             aio_cycle();   // check if rd AIOs returned; initiate new reads if possible
-            if (_jc->unflushed_dblks() > 0)
-                _jc->flush();
-            else if (!_aio_evt_rem)
-                return RHM_IORES_EMPTY;
+            if(dblks_rem() == 0 && _rrfc.is_compl() && !_rrfc.is_wr_aio_outstanding())
+            {
+                if (_jc->unflushed_dblks() > 0)
+                    _jc->flush();
+                else if (!_aio_evt_rem)
+                    return RHM_IORES_EMPTY;
+            }
         }
         if (_page_cb_arr[_pg_index]._state != AIO_COMPLETE)
         {
@@ -359,10 +362,13 @@
     if(dblks_rem() == 0 && _rrfc.is_compl() && !_rrfc.is_wr_aio_outstanding())
     {
         aio_cycle();   // check if any AIOs have returned
-        if (_jc->unflushed_dblks() > 0)
-            _jc->flush();
-        else if (!_aio_evt_rem)
-            return RHM_IORES_EMPTY;
+        if(dblks_rem() == 0 && _rrfc.is_compl() && !_rrfc.is_wr_aio_outstanding())
+        {
+            if (_jc->unflushed_dblks() > 0)
+                _jc->flush();
+            else if (!_aio_evt_rem)
+                return RHM_IORES_EMPTY;
+        }
     }
 
     // Check write state of this token is ENQ - required for read

Modified: store/trunk/cpp/tests/jrnl/_st_read.cpp
===================================================================
--- store/trunk/cpp/tests/jrnl/_st_read.cpp	2008-11-13 15:56:34 UTC (rev 2800)
+++ store/trunk/cpp/tests/jrnl/_st_read.cpp	2008-11-14 16:23:43 UTC (rev 2801)
@@ -107,21 +107,24 @@
 
         test_jrnl jc(test_name, test_dir, test_name);
         jc.initialize(2*NUM_TEST_JFILES, false, 0, 10*TEST_JFSIZE_SBLKS);
-        for (int m=0; m<NUM_MSGS*125; m++)
-            enq_msg(jc, m, create_msg(msg, m, 16*MSG_SIZE), false);
-        jc.flush();
-        for (int m=0; m<NUM_MSGS*125; m++)
+        for (int i=0; i<10; i++)
         {
-            read_msg(jc, rmsg, xid, transientFlag, externalFlag);
-            BOOST_CHECK_EQUAL(create_msg(msg, m, 16*MSG_SIZE), rmsg);
-            BOOST_CHECK_EQUAL(xid.size(), std::size_t(0));
-            BOOST_CHECK_EQUAL(transientFlag, false);
-            BOOST_CHECK_EQUAL(externalFlag, false);
+            for (int m=0; m<NUM_MSGS*125; m++)
+                enq_msg(jc, m, create_msg(msg, m, 16*MSG_SIZE), false);
+            jc.flush();
+            for (int m=0; m<NUM_MSGS*125; m++)
+            {
+                read_msg(jc, rmsg, xid, transientFlag, externalFlag);
+                BOOST_CHECK_EQUAL(create_msg(msg, m, 16*MSG_SIZE), rmsg);
+                BOOST_CHECK_EQUAL(xid.size(), std::size_t(0));
+                BOOST_CHECK_EQUAL(transientFlag, false);
+                BOOST_CHECK_EQUAL(externalFlag, false);
+            }
+            read_msg(jc, rmsg, xid, transientFlag, externalFlag, RHM_IORES_EMPTY);
+            for (int m=0; m<NUM_MSGS*125; m++)
+                deq_msg(jc, m, m+NUM_MSGS*125);
+            read_msg(jc, rmsg, xid, transientFlag, externalFlag, RHM_IORES_EMPTY);
         }
-        read_msg(jc, rmsg, xid, transientFlag, externalFlag, RHM_IORES_EMPTY);
-        for (int m=0; m<NUM_MSGS*125; m++)
-            deq_msg(jc, m, m+NUM_MSGS*125);
-        read_msg(jc, rmsg, xid, transientFlag, externalFlag, RHM_IORES_EMPTY);
     }
     catch(const exception& e) { BOOST_FAIL(e.what()); }
     cout << "ok" << endl;
@@ -140,7 +143,7 @@
 
         test_jrnl jc(test_name, test_dir, test_name);
         jc.initialize(NUM_TEST_JFILES, false, 0, TEST_JFSIZE_SBLKS);
-        for (int m=0; m<2*NUM_MSGS; m+=2)
+        for (int m=0; m<500*NUM_MSGS; m+=2)
         {
             enq_msg(jc, m, create_msg(msg, m, MSG_SIZE), false);
             jc.flush();




More information about the rhmessaging-commits mailing list