[rhmessaging-commits] rhmessaging commits: r1501 - in store/trunk/cpp/lib: jrnl and 1 other directory.

rhmessaging-commits at lists.jboss.org rhmessaging-commits at lists.jboss.org
Mon Dec 17 11:21:15 EST 2007


Author: kpvdr
Date: 2007-12-17 11:21:15 -0500 (Mon, 17 Dec 2007)
New Revision: 1501

Modified:
   store/trunk/cpp/lib/JournalImpl.cpp
   store/trunk/cpp/lib/JournalImpl.h
   store/trunk/cpp/lib/jrnl/jcntl.cpp
   store/trunk/cpp/lib/jrnl/jcntl.hpp
   store/trunk/cpp/lib/jrnl/slock.hpp
Log:
Removed locks and monitors from class JournalImpl, returned to simpler approach of waiting for AIO to return by sleeping and periodically cheching for aio returns on the thread that had its write interrupted, while all other write threads remain locked by jcntl's write mutex.

Modified: store/trunk/cpp/lib/JournalImpl.cpp
===================================================================
--- store/trunk/cpp/lib/JournalImpl.cpp	2007-12-17 14:32:28 UTC (rev 1500)
+++ store/trunk/cpp/lib/JournalImpl.cpp	2007-12-17 16:21:15 UTC (rev 1501)
@@ -50,8 +50,6 @@
                          getEventsTimerSetFlag(false),
                          writeActivityFlag(false),
                          flushTriggeredFlag(true),
-                         aioWait(false),
-						 aioWaitLock(false),
                          _xidp(0),
                          _datap(0),
                          _dlen(0),
@@ -179,63 +177,52 @@
 JournalImpl::enqueue_data_record(const void* const data_buff, const size_t tot_data_len,
         const size_t this_data_len, data_tok* dtokp, const bool transient)
 {
-    writeLockCheck();
-    while(handleIoResult(jcntl::enqueue_data_record(data_buff, tot_data_len, this_data_len, dtokp,
-            transient)));
+    handleIoResult(jcntl::enqueue_data_record(data_buff, tot_data_len, this_data_len, dtokp, transient));
 }
 
 void
 JournalImpl::enqueue_extern_data_record(const size_t tot_data_len, data_tok* dtokp,
         const bool transient)
 {
-    writeLockCheck();
-    while(handleIoResult(jcntl::enqueue_extern_data_record(tot_data_len, dtokp, transient)));
+   handleIoResult(jcntl::enqueue_extern_data_record(tot_data_len, dtokp, transient));
 }
 
 void
 JournalImpl::enqueue_txn_data_record(const void* const data_buff, const size_t tot_data_len,
         const size_t this_data_len, data_tok* dtokp, const std::string& xid, const bool transient)
 {
-    writeLockCheck();
-    while(handleIoResult(jcntl::enqueue_txn_data_record(data_buff, tot_data_len, this_data_len,
-            dtokp, xid, transient)));
+    handleIoResult(jcntl::enqueue_txn_data_record(data_buff, tot_data_len, this_data_len, dtokp, xid, transient));
 }
 
 void
 JournalImpl::enqueue_extern_txn_data_record(const size_t tot_data_len, data_tok* dtokp,
         const std::string& xid, const bool transient)
 {
-    writeLockCheck();
-    while(handleIoResult(jcntl::enqueue_extern_txn_data_record(tot_data_len, dtokp, xid,
-            transient)));
+    handleIoResult(jcntl::enqueue_extern_txn_data_record(tot_data_len, dtokp, xid, transient));
 }
 
 void
 JournalImpl::dequeue_data_record(data_tok* const dtokp)
 {
-    writeLockCheck();
-    while(handleIoResult(jcntl::dequeue_data_record(dtokp)));
+    handleIoResult(jcntl::dequeue_data_record(dtokp));
 }
 
 void
 JournalImpl::dequeue_txn_data_record(data_tok* const dtokp, const std::string& xid)
 {
-    writeLockCheck();
-    while(handleIoResult(jcntl::dequeue_txn_data_record(dtokp, xid)));
+    handleIoResult(jcntl::dequeue_txn_data_record(dtokp, xid));
 }
 
 void
 JournalImpl::txn_abort(data_tok* const dtokp, const std::string& xid)
 {
-    writeLockCheck();
-    while(handleIoResult(jcntl::txn_abort(dtokp, xid)));
+   handleIoResult(jcntl::txn_abort(dtokp, xid));
 }
 
 void
 JournalImpl::txn_commit(data_tok* const dtokp, const std::string& xid)
 {
-    writeLockCheck();
-    while(handleIoResult(jcntl::txn_commit(dtokp, xid)));
+    handleIoResult(jcntl::txn_commit(dtokp, xid));
 }
 
 void
@@ -295,36 +282,13 @@
 }
 
 void
-JournalImpl::writeLockCheck()
-{
-    qpid::sys::ScopedLock<qpid::sys::Monitor> s(writeLock);
-    if (aioWait)
-    {
-        aioWaitLock = true;
-        writeLock.wait();
-    }   
-}
-
-const bool
 JournalImpl::handleIoResult(const iores r)
 {
     writeActivityFlag = true;
     switch (r)
     {
         case rhm::journal::RHM_IORES_SUCCESS:
-            {
-			    qpid::sys::ScopedLock<qpid::sys::Monitor> s(writeLock);
-                aioWait = false;
-			}
-            return false;
-        case rhm::journal::RHM_IORES_AIO_WAIT:
-            {
-                 qpid::sys::ScopedLock<qpid::sys::Monitor> s(writeLock);
-                 aioWait = true;
-            }	 
-            usleep(1000); // TODO: add counter here to limit the time spent in this loop?
-            get_wr_events();
-            return true;
+            return;
         case rhm::journal::RHM_IORES_ENQCAPTHRESH:
             {
                 std::ostringstream oss;
@@ -382,7 +346,6 @@
 		dtokp->release();
         this_dtok_list.pop_front();
     }
-    jip->notifyWriteMonitor();
 }
 
 void

Modified: store/trunk/cpp/lib/JournalImpl.h
===================================================================
--- store/trunk/cpp/lib/JournalImpl.h	2007-12-17 14:32:28 UTC (rev 1500)
+++ store/trunk/cpp/lib/JournalImpl.h	2007-12-17 16:21:15 UTC (rev 1501)
@@ -74,10 +74,6 @@
             bool writeActivityFlag;
             bool flushTriggeredFlag;
             qpid::intrusive_ptr<qpid::broker::TimerTask> inactivityFireEventPtr;
-
-            qpid::sys::Monitor writeLock;
-            bool aioWait;
-            bool aioWaitLock; // monitor lock taken
             
             // temp local vars for loadMsgContent below
             void* _xidp;
@@ -152,18 +148,10 @@
             // TimerTask callback
             void getEventsFire();
             void flushFire();
-            
-            // Notify write monitor
-            inline void notifyWriteMonitor()
-                { 
-				    qpid::sys::ScopedLock<qpid::sys::Monitor> s(writeLock);
-				    if (aioWaitLock) { aioWaitLock = false; writeLock.notify(); } 
-				}
 
         private:
-            void writeLockCheck();
-            const bool handleIoResult(const journal::iores r);
-            static void  aio_wr_callback(jcntl* journal, u_int32_t num_dtoks);
+            void handleIoResult(const journal::iores r);
+            static void aio_wr_callback(jcntl* journal, u_int32_t num_dtoks);
             static void aio_rd_callback(jcntl* journal, u_int32_t num_dtoks);
         }; // class JournalImpl
 

Modified: store/trunk/cpp/lib/jrnl/jcntl.cpp
===================================================================
--- store/trunk/cpp/lib/jrnl/jcntl.cpp	2007-12-17 14:32:28 UTC (rev 1500)
+++ store/trunk/cpp/lib/jrnl/jcntl.cpp	2007-12-17 16:21:15 UTC (rev 1501)
@@ -42,6 +42,7 @@
 #include <jrnl/file_hdr.hpp>
 #include <jrnl/jerrno.hpp>
 #include <jrnl/jinf.hpp>
+#include <jrnl/slock.hpp>
 #include <sstream>
 #include <unistd.h>
 
@@ -207,16 +208,26 @@
         const size_t this_data_len, data_tok* dtokp, const bool transient)
 {
     check_wstatus("enqueue_data_record");
-    slock s(&_mutex);
-    return _wmgr.enqueue(data_buff, tot_data_len, this_data_len, dtokp, NULL, 0, transient, false);
+    {
+        slock s(&_mutex);
+        iores r;
+        while (handle_aio_wait(_wmgr.enqueue(data_buff, tot_data_len, this_data_len, dtokp, NULL,
+                0, transient, false), r));
+        return r;
+    }
 }
 
 const iores
 jcntl::enqueue_extern_data_record(const size_t tot_data_len, data_tok* dtokp, const bool transient)
 {
     check_wstatus("enqueue_extern_data_record");
-    slock s(&_mutex);
-    return _wmgr.enqueue(NULL, tot_data_len, 0, dtokp, NULL, 0, transient, true);
+    {
+        slock s(&_mutex);
+        iores r;
+        while (handle_aio_wait(_wmgr.enqueue(NULL, tot_data_len, 0, dtokp, NULL, 0, transient,
+                true), r));
+        return r;
+    }
 }
 
 const iores
@@ -225,9 +236,13 @@
         const bool transient)
 {
     check_wstatus("enqueue_tx_data_record");
-    slock s(&_mutex);
-    return _wmgr.enqueue(data_buff, tot_data_len, this_data_len, dtokp, xid.data(), xid.size(),
-            transient, false);
+    {
+        slock s(&_mutex);
+        iores r;
+        while (handle_aio_wait(_wmgr.enqueue(data_buff, tot_data_len, this_data_len, dtokp,
+                xid.data(), xid.size(), transient, false), r));
+        return r;
+    }
 }
 
 const iores
@@ -235,8 +250,13 @@
         const std::string& xid, const bool transient)
 {
     check_wstatus("enqueue_extern_txn_data_record");
-    slock s(&_mutex);
-    return _wmgr.enqueue(NULL, tot_data_len, 0, dtokp, xid.data(), xid.size(), transient, true);
+    {
+        slock s(&_mutex);
+        iores r;
+        while (handle_aio_wait(_wmgr.enqueue(NULL, tot_data_len, 0, dtokp, xid.data(), xid.size(),
+                transient, true), r));
+        return r;
+    }
 }
 
 const iores
@@ -266,32 +286,48 @@
 jcntl::dequeue_data_record(data_tok* const dtokp)
 {
     check_wstatus("dequeue_data");
-    slock s(&_mutex);
-    return _wmgr.dequeue(dtokp, NULL, 0);
+    {
+        slock s(&_mutex);
+        iores r;
+        while (handle_aio_wait(_wmgr.dequeue(dtokp, NULL, 0), r));
+        return r;
+    }
 }
 
 const iores
 jcntl::dequeue_txn_data_record(data_tok* const dtokp, const std::string& xid)
 {
     check_wstatus("dequeue_data");
-    slock s(&_mutex);
-    return _wmgr.dequeue(dtokp, xid.data(), xid.size());
+    {
+        slock s(&_mutex);
+        iores r;
+        while (handle_aio_wait(_wmgr.dequeue(dtokp, xid.data(), xid.size()), r));
+        return r;
+    }
 }
 
 const iores
 jcntl::txn_abort(data_tok* const dtokp, const std::string& xid)
 {
     check_wstatus("txn_abort");
-    slock s(&_mutex);
-    return _wmgr.abort(dtokp, xid.data(), xid.size());
+    {
+        slock s(&_mutex);
+        iores r;
+        while (handle_aio_wait(_wmgr.abort(dtokp, xid.data(), xid.size()), r));
+        return r;
+    }
 }
 
 const iores
 jcntl::txn_commit(data_tok* const dtokp, const std::string& xid)
 {
     check_wstatus("txn_commit");
-    slock s(&_mutex);
-    return _wmgr.commit(dtokp, xid.data(), xid.size());
+    {
+        slock s(&_mutex);
+        iores r;
+        while (handle_aio_wait(_wmgr.commit(dtokp, xid.data(), xid.size()), r));
+        return r;
+    }
 }
 
 const bool
@@ -338,8 +374,10 @@
         return RHM_IORES_SUCCESS;
     if (_readonly_flag)
         throw jexception(jerrno::JERR_JCNTL_READONLY, "jcntl", "flush");
-    slock s(&_mutex);
-    return _wmgr.flush();
+    {
+        slock s(&_mutex);
+        return _wmgr.flush();
+    }
 }
 
 // Private functions
@@ -394,6 +432,24 @@
     }
 }
 
+const bool
+jcntl::handle_aio_wait(const iores res, iores& resout)
+{
+    resout = res;
+    if (res == RHM_IORES_AIO_WAIT)
+    {
+        u_int32_t cnt = 0;
+        while (get_wr_events() == 0)
+        {
+            if (cnt++ > MAX_AIO_CMPL_SLEEPS)
+                throw jexception(jerrno::JERR_JCNTL_AIOCMPLWAIT, "jcntl", "aio_cmpl_wait");
+            usleep(AIO_CMPL_SLEEP);
+        }
+        return true;
+    }
+    return false;
+}
+
 void
 jcntl::rcvr_janalyze(rcvdat& rd, const std::vector<std::string>& prep_txn_list)
 {

Modified: store/trunk/cpp/lib/jrnl/jcntl.hpp
===================================================================
--- store/trunk/cpp/lib/jrnl/jcntl.hpp	2007-12-17 14:32:28 UTC (rev 1500)
+++ store/trunk/cpp/lib/jrnl/jcntl.hpp	2007-12-17 16:21:15 UTC (rev 1501)
@@ -46,7 +46,6 @@
 #include <jrnl/lfh.hpp>
 #include <jrnl/rcvdat.hpp>
 #include <jrnl/rmgr.hpp>
-#include <jrnl/slock.hpp>
 #include <jrnl/wmgr.hpp>
 #include <jrnl/wrfc.hpp>
 
@@ -609,6 +608,12 @@
         void aio_cmpl_wait();
 
         /**
+        * \brief Call that blocks until at least one message returns; used to wait for
+        *     AIO wait conditions to clear.
+        */
+        const bool handle_aio_wait(const iores res, iores& resout);
+
+        /**
         * \brief Analyze journal for recovery.
         */
         void rcvr_janalyze(rcvdat& rd, const std::vector<std::string>& prep_txn_list);

Modified: store/trunk/cpp/lib/jrnl/slock.hpp
===================================================================
--- store/trunk/cpp/lib/jrnl/slock.hpp	2007-12-17 14:32:28 UTC (rev 1500)
+++ store/trunk/cpp/lib/jrnl/slock.hpp	2007-12-17 16:21:15 UTC (rev 1501)
@@ -3,8 +3,8 @@
 *
 * Red Hat Messaging - Message Journal
 *
-* Messaging journal top-level control and interface class
-* rhm::journal::slock. See class documentation for details.
+* Messaging journal scoped lock class rhm::journal::slock and scoped try-clock
+* class rhm::journal::stlock.
 *
 * \author Kim van der Riet
 *




More information about the rhmessaging-commits mailing list