[rhmessaging-commits] rhmessaging commits: r4148 - in store/trunk/cpp: lib/jrnl and 3 other directories.

rhmessaging-commits at lists.jboss.org rhmessaging-commits at lists.jboss.org
Mon Jul 26 15:48:22 EDT 2010


Author: kpvdr
Date: 2010-07-26 15:48:20 -0400 (Mon, 26 Jul 2010)
New Revision: 4148

Modified:
   store/trunk/cpp/lib/JournalImpl.cpp
   store/trunk/cpp/lib/JournalImpl.h
   store/trunk/cpp/lib/TxnCtxt.cpp
   store/trunk/cpp/lib/TxnCtxt.h
   store/trunk/cpp/lib/jrnl/aio.hpp
   store/trunk/cpp/lib/jrnl/jcntl.cpp
   store/trunk/cpp/lib/jrnl/jcntl.hpp
   store/trunk/cpp/lib/jrnl/pmgr.hpp
   store/trunk/cpp/lib/jrnl/rmgr.cpp
   store/trunk/cpp/lib/jrnl/rmgr.hpp
   store/trunk/cpp/lib/jrnl/wmgr.cpp
   store/trunk/cpp/lib/jrnl/wmgr.hpp
   store/trunk/cpp/tests/Makefile.am
   store/trunk/cpp/tests/jrnl/_st_helper_fns.hpp
   store/trunk/cpp/tests/jrnl/jtt/jrnl_instance.cpp
Log:
Performance and efficiency improvement: removed the usleep()-based polling loops from blocking calls that wait on AIO getevents(), and used the built-in io_getevents() timeout instead.

Modified: store/trunk/cpp/lib/JournalImpl.cpp
===================================================================
--- store/trunk/cpp/lib/JournalImpl.cpp	2010-07-26 15:58:14 UTC (rev 4147)
+++ store/trunk/cpp/lib/JournalImpl.cpp	2010-07-26 19:48:20 UTC (rev 4148)
@@ -237,8 +237,8 @@
                         _emap.size(), _tmap.size(), _tmap.enq_cnt(), _tmap.deq_cnt()), qpid::management::ManagementAgent::SEV_NOTE);
 }
 
-#define MAX_AIO_SLEEPS 1000000 // tot: ~10 sec
-#define AIO_SLEEP_TIME_US   10 // 0.01 ms
+//#define MAX_AIO_SLEEPS 1000000 // tot: ~10 sec
+//#define AIO_SLEEP_TIME_US   10 // 0.01 ms
 // Return true if content is recovered from store; false if content is external and must be recovered from an external store.
 // Throw exception for all errors.
 bool
@@ -272,7 +272,6 @@
         bool transient = false;
         bool done = false;
         bool rid_found = false;
-        unsigned aio_sleep_cnt = 0;
         oooRidList.clear();
         while (!done) {
             iores res = read_data_record(&_datap, _dlen, &_xidp, xlen, transient, _external, &_dtok);
@@ -295,13 +294,10 @@
                     }
                     break;
                 case mrg::journal::RHM_IORES_PAGE_AIOWAIT:
-                    if (++aio_sleep_cnt <= MAX_AIO_SLEEPS) {
-                        get_wr_events();
-                        usleep(AIO_SLEEP_TIME_US);
-                    } else {
+                    if (get_wr_events(&_aio_cmpl_timeout) == AIO_TIMEOUT) {
                         std::stringstream ss;
                         ss << "read_data_record() returned " << mrg::journal::iores_str(res);
-                        ss << "; exceeded maximum wait time";
+                        ss << "; timed out waiting for page to be processed.";
                         throw jexception(mrg::journal::jerrno::JERR__TIMEOUT, ss.str().c_str(), "JournalImpl",
                             "loadMsgContent");
                     }
@@ -425,30 +421,6 @@
     }
 }
 
-#define MAX_INVALID_RETRYS 5 // This will yield a total wait time of 10 sec with 5 log messages at 2 sec intervals
-iores
-JournalImpl::read_data_record(void** const data_buff, size_t& tot_data_len, void** const xid_buff, size_t& xid_len,
-        bool& transient, bool& external, mrg::journal::data_tok* const dtokp, bool ignore_pending_txns)
-{
-    int retry_cnt = 0;
-    iores res;
-    do {
-        res = jcntl::read_data_record(data_buff, tot_data_len, xid_buff, xid_len, transient, external, dtokp, ignore_pending_txns);
-        if (res == mrg::journal::RHM_IORES_RCINVALID) {
-            retry_cnt++;
-            std::ostringstream oss;
-            if (retry_cnt < MAX_INVALID_RETRYS) {
-                oss << "Store read pipeline on queue " << _jid << " timed out waiting for journal header file read, retrying...";
-                log(LOG_WARN, oss.str());
-            } else {
-                oss << "Store read pipeline on queue " << _jid << " timed out waiting for journal header file read, aborting read with RHM_IORES_RCINVALID";
-                log(LOG_ERROR, oss.str());
-            }
-        }
-    } while (res == mrg::journal::RHM_IORES_RCINVALID && retry_cnt < MAX_INVALID_RETRYS);
-    return res;
-}
-
 void
 JournalImpl::txn_abort(data_tok* const dtokp, const std::string& xid)
 {
@@ -524,7 +496,7 @@
 {
     qpid::sys::Mutex::ScopedLock sl(_getf_lock);
     getEventsTimerSetFlag = false;
-    if (_wmgr.get_aio_evt_rem()) { jcntl::get_wr_events(); }
+    if (_wmgr.get_aio_evt_rem()) { jcntl::get_wr_events(0); }
     if (_wmgr.get_aio_evt_rem()) { setGetEventTimer(); }
 }
 

Modified: store/trunk/cpp/lib/JournalImpl.h
===================================================================
--- store/trunk/cpp/lib/JournalImpl.h	2010-07-26 15:58:14 UTC (rev 4147)
+++ store/trunk/cpp/lib/JournalImpl.h	2010-07-26 19:48:20 UTC (rev 4148)
@@ -185,10 +185,6 @@
 
     void dequeue_txn_data_record(mrg::journal::data_tok* const dtokp, const std::string& xid, const bool txn_coml_commit = false);
 
-    mrg::journal::iores read_data_record(void** const data_buff, size_t& tot_data_len, void** const xid_buff,
-                                         size_t& xid_len, bool& transient, bool& external, mrg::journal::data_tok* const dtokp,
-                                         bool ignore_pending_txns = false);
-
     void txn_abort(mrg::journal::data_tok* const dtokp, const std::string& xid);
 
     void txn_commit(mrg::journal::data_tok* const dtokp, const std::string& xid);

Modified: store/trunk/cpp/lib/TxnCtxt.cpp
===================================================================
--- store/trunk/cpp/lib/TxnCtxt.cpp	2010-07-26 15:58:14 UTC (rev 4147)
+++ store/trunk/cpp/lib/TxnCtxt.cpp	2010-07-26 19:48:20 UTC (rev 4148)
@@ -1,7 +1,6 @@
 #include "TxnCtxt.h"
 
 #include <sstream>
-#include <unistd.h> // ::usleep()
 
 #include "jrnl/jexception.hpp"
 #include "StoreException.h"
@@ -76,42 +75,35 @@
 
 TxnCtxt::~TxnCtxt() { if(txn) abort(); }
 
-#define MAX_SYNC_SLEEPS 100000 // tot: ~1 sec
-#define SYNC_SLEEP_TIME_US  10 // 0.01 ms
-
 void TxnCtxt::sync() {
-    bool allWritten = false;
-    bool firstloop = true;
-    long sleep_cnt = 0L;
-    while (loggedtx && !allWritten) {
-        if (sleep_cnt > MAX_SYNC_SLEEPS) THROW_STORE_EXCEPTION(std::string("Error: timeout waiting for TxnCtxt::sync()"));
-        if (!firstloop) {
-            ::usleep(SYNC_SLEEP_TIME_US);
-            sleep_cnt++;
-        } // move this into the get events call aiolib..
-        allWritten = true;
-        for (ipqItr i = impactedQueues.begin(); i != impactedQueues.end(); i++) {
-            sync_jrnl(static_cast<JournalImpl*>(*i), firstloop, allWritten);
+    if (loggedtx) {
+        try {
+            for (ipqItr i = impactedQueues.begin(); i != impactedQueues.end(); i++)
+                jrnl_flush(static_cast<JournalImpl*>(*i));
+            if (preparedXidStorePtr)
+                jrnl_flush(preparedXidStorePtr);
+            for (ipqItr i = impactedQueues.begin(); i != impactedQueues.end(); i++)
+                jrnl_sync(static_cast<JournalImpl*>(*i), &journal::jcntl::_aio_cmpl_timeout);
+            if (preparedXidStorePtr)
+                jrnl_sync(preparedXidStorePtr, &journal::jcntl::_aio_cmpl_timeout);
+        } catch (const journal::jexception& e) {
+            THROW_STORE_EXCEPTION(std::string("Error during txn sync: ") + e.what());
         }
-        if (preparedXidStorePtr)
-            sync_jrnl(preparedXidStorePtr, firstloop, allWritten);
-        firstloop = false;
     }
 }
 
-void TxnCtxt::sync_jrnl(JournalImpl* jc, bool firstloop, bool& allWritten) {
-    try {
-        if (jc && !(jc->is_txn_synced(getXid()))) {
-            if (firstloop)
-                jc->flush();
-            allWritten = false;
-            jc->get_wr_events();
-        }
-    } catch (const journal::jexception& e) {
-        THROW_STORE_EXCEPTION(std::string("Error sync") + e.what());
-    }
+void TxnCtxt::jrnl_flush(JournalImpl* jc) {
+    if (jc && !(jc->is_txn_synced(getXid())))
+        jc->flush();
 }
 
+void TxnCtxt::jrnl_sync(JournalImpl* jc, timespec* timeout) {
+    if (!jc || jc->is_txn_synced(getXid()))
+        return;
+    if (jc->get_wr_events(timeout) == journal::jcntl::AIO_TIMEOUT && timeout)
+        THROW_STORE_EXCEPTION(std::string("Error: timeout waiting for TxnCtxt::jrnl_sync()"));
+}
+
 void TxnCtxt::begin(DbEnv* env, bool sync) {
     env->txn_begin(0, &txn, 0);
     if (sync)

Modified: store/trunk/cpp/lib/TxnCtxt.h
===================================================================
--- store/trunk/cpp/lib/TxnCtxt.h	2010-07-26 15:58:14 UTC (rev 4147)
+++ store/trunk/cpp/lib/TxnCtxt.h	2010-07-26 19:48:20 UTC (rev 4148)
@@ -69,6 +69,8 @@
 
     virtual void completeTxn(bool commit);
     void commitTxn(JournalImpl* jc, bool commit);
+    void jrnl_flush(JournalImpl* jc);
+    void jrnl_sync(JournalImpl* jc, timespec* timeout);
 
   public:
     TxnCtxt(IdSequence* _loggedtx=NULL);
@@ -81,7 +83,6 @@
      *@return if the data successfully synced.
      */
     void sync();
-    void sync_jrnl(JournalImpl* jc, bool firstloop, bool& allWritten);
     void begin(DbEnv* env, bool sync = false);
     void commit();
     void abort();

Modified: store/trunk/cpp/lib/jrnl/aio.hpp
===================================================================
--- store/trunk/cpp/lib/jrnl/aio.hpp	2010-07-26 15:58:14 UTC (rev 4147)
+++ store/trunk/cpp/lib/jrnl/aio.hpp	2010-07-26 19:48:20 UTC (rev 4148)
@@ -68,7 +68,7 @@
         return ::io_submit(ctx, nr, aios);
     }
 
-    static inline int getevents(io_context_t ctx, long min_nr, long nr, aio_event* events, timespec* timeout)
+    static inline int getevents(io_context_t ctx, long min_nr, long nr, aio_event* events, timespec* const timeout)
     {
         return ::io_getevents(ctx, min_nr, nr, events, timeout);
     }

Modified: store/trunk/cpp/lib/jrnl/jcntl.cpp
===================================================================
--- store/trunk/cpp/lib/jrnl/jcntl.cpp	2010-07-26 15:58:14 UTC (rev 4147)
+++ store/trunk/cpp/lib/jrnl/jcntl.cpp	2010-07-26 19:48:20 UTC (rev 4148)
@@ -54,6 +54,27 @@
 namespace journal
 {
 
+#define AIO_CMPL_TIMEOUT_SEC   5
+#define AIO_CMPL_TIMEOUT_NSEC  0
+#define FINAL_AIO_CMPL_TIMEOUT_SEC   15
+#define FINAL_AIO_CMPL_TIMEOUT_NSEC  0
+
+// Static
+timespec jcntl::_aio_cmpl_timeout; ///< Timeout for blocking libaio returns
+timespec jcntl::_final_aio_cmpl_timeout; ///< Timeout for blocking libaio returns when stopping or finalizing
+bool jcntl::_init = init_statics();
+bool jcntl::init_statics()
+{
+    _aio_cmpl_timeout.tv_sec = AIO_CMPL_TIMEOUT_SEC;
+    _aio_cmpl_timeout.tv_nsec = AIO_CMPL_TIMEOUT_NSEC;
+    _final_aio_cmpl_timeout.tv_sec = FINAL_AIO_CMPL_TIMEOUT_SEC;
+    _final_aio_cmpl_timeout.tv_nsec = FINAL_AIO_CMPL_TIMEOUT_NSEC;
+    return true;
+}
+int32_t jcntl::AIO_TIMEOUT = int32_t(-1);
+int32_t jcntl::THREAD_BLOCKED = int32_t(-2);
+
+
 // Functions
 
 jcntl::jcntl(const std::string& jid, const std::string& jdir, const std::string& base_filename):
@@ -263,31 +284,21 @@
     return _rmgr.discard(dtokp);
 } */
 
-// These two combined make a wait time of approx. 2 sec.
-#define MAX_RCINVALID_CNT     20000 // tot: ~ 2 sec
-#define RCINVALID_SLEEP_TIME_US 100 // 0.1 ms
 iores
 jcntl::read_data_record(void** const datapp, std::size_t& dsize, void** const xidpp, std::size_t& xidsize,
         bool& transient, bool& external, data_tok* const dtokp, bool ignore_pending_txns)
 {
     check_rstatus("read_data");
-    unsigned cnt = 0;
-    iores res;
-    do
+    iores res = _rmgr.read(datapp, dsize, xidpp, xidsize, transient, external, dtokp, ignore_pending_txns);
+    if (res == RHM_IORES_RCINVALID)
     {
+        get_wr_events(0); // check for outstanding write events
+        iores sres = _rmgr.synchronize(); // flushes all outstanding read events
+        if (sres != RHM_IORES_SUCCESS)
+            return sres;
+        _rmgr.wait_for_validity(&_aio_cmpl_timeout, true); // throw if timeout occurs
         res = _rmgr.read(datapp, dsize, xidpp, xidsize, transient, external, dtokp, ignore_pending_txns);
-        if (res == RHM_IORES_RCINVALID)
-        {
-            get_wr_events(); // check for outstanding write events
-            iores sres = _rmgr.synchronize();
-            if (sres != RHM_IORES_SUCCESS)
-                return sres;
-            if (cnt > 0)
-                ::usleep(RCINVALID_SLEEP_TIME_US);
-        }
-        cnt++;
     }
-    while (cnt < MAX_RCINVALID_CNT && res == RHM_IORES_RCINVALID);
     return res;
 }
 
@@ -346,19 +357,19 @@
     return _wmgr.is_txn_synced(xid);
 }
 
-u_int32_t
-jcntl::get_wr_events()
+int32_t
+jcntl::get_wr_events(timespec* const timeout)
 {
     stlock t(_wr_mutex);
-    if (t.locked())
-        return _wmgr.get_events(pmgr::UNUSED);
-    return 0;
+    if (!t.locked())
+        return THREAD_BLOCKED;
+    return _wmgr.get_events(pmgr::UNUSED, timeout);
 }
 
-u_int32_t
-jcntl::get_rd_events()
+int32_t
+jcntl::get_rd_events(timespec* const timeout)
 {
-    return _rmgr.get_events();
+    return _rmgr.get_events(pmgr::AIO_COMPLETE, timeout);
 }
 
 void
@@ -429,21 +440,14 @@
         _rmgr.invalidate();
 }
 
-#define MAX_AIO_CMPL_SLEEPS 1000000 // tot: ~10 sec
-#define AIO_CMPL_SLEEP_US        10 // 0.01 ms
-
 void
 jcntl::fhdr_wr_sync(const u_int16_t lid)
 {
-    long cnt = 0;
     fcntl* fcntlp = _lpmgr.get_fcntlp(lid);
-    get_wr_events();
     while (fcntlp->wr_fhdr_aio_outstanding())
     {
-        if (++cnt > MAX_AIO_CMPL_SLEEPS)
+        if (get_wr_events(&_aio_cmpl_timeout) == AIO_TIMEOUT)
             throw jexception(jerrno::JERR_JCNTL_AIOCMPLWAIT, "jcntl", "fhdr_wr_sync");
-        ::usleep(AIO_CMPL_SLEEP_US);
-        get_wr_events();
     }
 }
 
@@ -496,36 +500,28 @@
 void
 jcntl::aio_cmpl_wait()
 {
-    u_int32_t cnt = 0;
     while (_wmgr.get_aio_evt_rem())
     {
-        get_wr_events();
-        if (cnt++ > MAX_AIO_CMPL_SLEEPS)
+        if (get_wr_events(&_aio_cmpl_timeout) == AIO_TIMEOUT)
             throw jexception(jerrno::JERR_JCNTL_AIOCMPLWAIT, "jcntl", "aio_cmpl_wait");
-        ::usleep(AIO_CMPL_SLEEP_US);
     }
 }
 
 bool
 jcntl::handle_aio_wait(const iores res, iores& resout, const data_tok* dtp)
 {
-    // TODO: factor out the common while loops below into a common fn
-    u_int32_t cnt = 0;
     resout = res;
     if (res == RHM_IORES_PAGE_AIOWAIT)
     {
         while (_wmgr.curr_pg_blocked())
         {
-            _wmgr.get_events(pmgr::UNUSED);
-            if (cnt++ > MAX_AIO_CMPL_SLEEPS)
+            if (_wmgr.get_events(pmgr::UNUSED, &_aio_cmpl_timeout) == AIO_TIMEOUT)
             {
                 std::ostringstream oss;
-                oss << "get_events() returned JERR_JCNTL_AIOCMPLWAIT; wmgr_status: ";
-                oss << _wmgr.status_str();
+                oss << "get_events() returned JERR_JCNTL_AIOCMPLWAIT; wmgr_status: " << _wmgr.status_str();
                 this->log(LOG_CRITICAL, oss.str());
                 throw jexception(jerrno::JERR_JCNTL_AIOCMPLWAIT, "jcntl", "handle_aio_wait");
             }
-            ::usleep(AIO_CMPL_SLEEP_US);
         }
         return true;
     }
@@ -533,16 +529,13 @@
     {
         while (_wmgr.curr_file_blocked())
         {
-            _wmgr.get_events(pmgr::UNUSED);
-            if (cnt++ > MAX_AIO_CMPL_SLEEPS)
+            if (_wmgr.get_events(pmgr::UNUSED, &_aio_cmpl_timeout) == AIO_TIMEOUT)
             {
                 std::ostringstream oss;
-                oss << "get_events() returned JERR_JCNTL_AIOCMPLWAIT; wmgr_status: ";
-                oss << _wmgr.status_str();
+                oss << "get_events() returned JERR_JCNTL_AIOCMPLWAIT; wmgr_status: " << _wmgr.status_str();
                 this->log(LOG_CRITICAL, oss.str());
                 throw jexception(jerrno::JERR_JCNTL_AIOCMPLWAIT, "jcntl", "handle_aio_wait");
             }
-            ::usleep(AIO_CMPL_SLEEP_US);
         }
         _wrfc.wr_reset();
         resout = RHM_IORES_SUCCESS;

Modified: store/trunk/cpp/lib/jrnl/jcntl.hpp
===================================================================
--- store/trunk/cpp/lib/jrnl/jcntl.hpp	2010-07-26 15:58:14 UTC (rev 4147)
+++ store/trunk/cpp/lib/jrnl/jcntl.hpp	2010-07-26 19:48:20 UTC (rev 4148)
@@ -147,6 +147,11 @@
         smutex _wr_mutex;           ///< Mutex for journal writes
 
     public:
+        static timespec _aio_cmpl_timeout; ///< Timeout for blocking libaio returns
+        static timespec _final_aio_cmpl_timeout; ///< Timeout for blocking libaio returns when stopping or finalizing
+        static int32_t AIO_TIMEOUT;
+        static int32_t THREAD_BLOCKED;
+
         /**
         * \brief Journal constructor.
         *
@@ -527,7 +532,7 @@
         * dequeue() operations, but if these operations cease, then this call needs to be made to
         * force the processing of any outstanding AIO operations.
         */
-        u_int32_t get_wr_events();
+        int32_t get_wr_events(timespec* const timeout);
 
         /**
         * \brief Forces a check for returned AIO read events.
@@ -536,7 +541,7 @@
         * operations, but if these operations cease, then this call needs to be made to force the
         * processing of any outstanding AIO operations.
         */
-        u_int32_t get_rd_events();
+        int32_t get_rd_events(timespec* const timeout);
 
         /**
         * \brief Stop the journal from accepting any further requests to read or write data.
@@ -659,6 +664,9 @@
         static fcntl* new_fcntl(jcntl* const jcp, const u_int16_t lid, const u_int16_t fid, const rcvdat* const rdp);
 
     protected:
+        static bool _init;
+        static bool init_statics();
+
         /**
         * \brief Check status of journal before allowing write operations.
         */

Modified: store/trunk/cpp/lib/jrnl/pmgr.hpp
===================================================================
--- store/trunk/cpp/lib/jrnl/pmgr.hpp	2010-07-26 15:58:14 UTC (rev 4147)
+++ store/trunk/cpp/lib/jrnl/pmgr.hpp	2010-07-26 19:48:20 UTC (rev 4148)
@@ -123,7 +123,7 @@
         pmgr(jcntl* jc, enq_map& emap, txn_map& tmap);
         virtual ~pmgr();
 
-        virtual u_int32_t get_events(page_state state) = 0;
+        virtual int32_t get_events(page_state state, timespec* const timeout, bool flush = false) = 0;
         inline u_int32_t get_aio_evt_rem() const { return _aio_evt_rem; }
         static const char* page_state_str(page_state ps);
         inline u_int32_t cache_pgsize_sblks() const { return _cache_pgsize_sblks; }

Modified: store/trunk/cpp/lib/jrnl/rmgr.cpp
===================================================================
--- store/trunk/cpp/lib/jrnl/rmgr.cpp	2010-07-26 15:58:14 UTC (rev 4147)
+++ store/trunk/cpp/lib/jrnl/rmgr.cpp	2010-07-26 19:48:20 UTC (rev 4148)
@@ -229,18 +229,23 @@
     }
 }
 
-u_int32_t
-rmgr::get_events(page_state state)
+int32_t
+rmgr::get_events(page_state state, timespec* const timeout, bool flush)
 {
-    int ret = 0;
-    if ((ret = aio::getevents(_ioctx, 0, _cache_num_pages + _jc->num_jfiles(), _aio_event_arr, 0)) < 0)
+    if (_aio_evt_rem == 0) // no events to get
+        return 0;
+
+    int32_t ret;
+    if ((ret = aio::getevents(_ioctx, flush ? _aio_evt_rem : 1, _aio_evt_rem/*_cache_num_pages + _jc->num_jfiles()*/, _aio_event_arr, timeout)) < 0)
     {
-        if (ret == -EINTR) // No events
+        if (ret == -EINTR) // Interrupted by signal
             return 0;
         std::ostringstream oss;
         oss << "io_getevents() failed: " << std::strerror(-ret) << " (" << ret << ")";
         throw jexception(jerrno::JERR__AIO, oss.str(), "rmgr", "get_events");
     }
+    if (ret == 0 && timeout)
+        return _jc->AIO_TIMEOUT;
 
     std::vector<u_int16_t> pil;
     pil.reserve(ret);
@@ -315,26 +320,16 @@
         _rrfc.set_invalid();
 }
 
-#define MAX_AIO_SLEEPS 1000  // 10 sec
-#define AIO_SLEEP_TIME 10000 // 10 ms
 void
-rmgr::init_validation()
+rmgr::flush(timespec* timeout)
 {
     // Wait for any outstanding AIO read operations to complete before synchronizing
-    int aio_sleep_cnt = 0;
     while (_aio_evt_rem)
     {
-        get_events();
-        if (_aio_evt_rem)
+        if (get_events(AIO_COMPLETE, timeout) == _jc->AIO_TIMEOUT) // timed out, nothing returned
         {
-            if (++aio_sleep_cnt <= MAX_AIO_SLEEPS)
-            {
-                get_events();
-                usleep(AIO_SLEEP_TIME);
-            }
-            else
-                throw jexception(jerrno::JERR__TIMEOUT,
-                    "Invalidate timed out waiting for outstanding read aio to return", "rmgr", "invalidate");
+            throw jexception(jerrno::JERR__TIMEOUT,
+                            "Timed out waiting for outstanding read aio to return", "rmgr", "init_validation");
         }
     }
 
@@ -346,11 +341,24 @@
     _pg_offset_dblks = 0;
 }
 
+bool
+rmgr::wait_for_validity(timespec* timeout, const bool throw_on_timeout)
+{
+    bool timed_out = false;
+    while (!_rrfc.is_valid() && !timed_out)
+    {
+        timed_out = get_events(AIO_COMPLETE, timeout) == _jc->AIO_TIMEOUT;
+        if (timed_out && throw_on_timeout)
+            throw jexception(jerrno::JERR__TIMEOUT, "Timed out waiting for read validity", "rmgr", "wait_for_validity");
+    }
+    return _rrfc.is_valid();
+}
+
 iores
 rmgr::pre_read_check(data_tok* dtokp)
 {
     if (_aio_evt_rem)
-        get_events();
+        get_events(AIO_COMPLETE, 0);
 
     if (!_rrfc.is_valid())
         return RHM_IORES_RCINVALID;
@@ -524,11 +532,13 @@
 rmgr::aio_cycle()
 {
     // Perform validity checks
-    if (_fhdr_rd_outstanding)
+    if (_fhdr_rd_outstanding) // read of file header still outstanding in aio
         return RHM_IORES_SUCCESS;
     if (!_rrfc.is_valid())
     {
-        init_validation(); // flush outstanding read aio ops (if any), set all pages to UNUSED state, reset counters.
+        // Flush and reset all read states and pointers
+        flush(&jcntl::_aio_cmpl_timeout);
+
         _jc->get_earliest_fid(); // determine initial file to read; calls _rrfc.set_findex() to set value
         // If this file has not yet been written to, return RHM_IORES_EMPTY
         if (_rrfc.is_void() && !_rrfc.is_wr_aio_outstanding())
@@ -570,7 +580,7 @@
     else if (num_compl == _cache_num_pages) // This condition exists after invalidation
         res = init_aio_reads(0, _cache_num_pages);
     if (outstanding)
-        get_events();
+        get_events(AIO_COMPLETE, 0);
     return res;
 }
 

Modified: store/trunk/cpp/lib/jrnl/rmgr.hpp
===================================================================
--- store/trunk/cpp/lib/jrnl/rmgr.hpp	2010-07-26 15:58:14 UTC (rev 4147)
+++ store/trunk/cpp/lib/jrnl/rmgr.hpp	2010-07-26 19:48:20 UTC (rev 4148)
@@ -78,10 +78,11 @@
         iores read(void** const datapp, std::size_t& dsize, void** const xidpp,
                 std::size_t& xidsize, bool& transient, bool& external, data_tok* dtokp,
                 bool ignore_pending_txns);
-        u_int32_t get_events(page_state state = AIO_COMPLETE);
+        int32_t get_events(page_state state, timespec* const timeout, bool flush = false);
         void recover_complete();
         inline iores synchronize() { if (_rrfc.is_valid()) return RHM_IORES_SUCCESS; return aio_cycle(); }
         void invalidate();
+        bool wait_for_validity(timespec* const timeout, const bool throw_on_timeout = false);
 
         /* TODO (if required)
         const iores get(const u_int64_t& rid, const std::size_t& dsize, const std::size_t& dsize_avail,
@@ -91,7 +92,7 @@
 
     private:
         void clean();
-        void init_validation();
+        void flush(timespec* timeout);
         iores pre_read_check(data_tok* dtokp);
         iores read_enq(rec_hdr& h, void* rptr, data_tok* dtokp);
         void consume_xid_rec(rec_hdr& h, void* rptr, data_tok* dtokp);

Modified: store/trunk/cpp/lib/jrnl/wmgr.cpp
===================================================================
--- store/trunk/cpp/lib/jrnl/wmgr.cpp	2010-07-26 15:58:14 UTC (rev 4147)
+++ store/trunk/cpp/lib/jrnl/wmgr.cpp	2010-07-26 19:48:20 UTC (rev 4148)
@@ -647,7 +647,7 @@
                _page_cb_arr[_pg_index]._state = IN_USE;
         }
     }
-    get_events(UNUSED);
+    get_events(UNUSED, 0);
     if (_page_cb_arr[_pg_index]._state == UNUSED)
         _page_cb_arr[_pg_index]._state = IN_USE;
     return res;
@@ -662,18 +662,24 @@
     return res;
 }
 
-u_int32_t
-wmgr::get_events(page_state state)
+int32_t
+wmgr::get_events(page_state state, timespec* const timeout, bool flush)
 {
+    if (_aio_evt_rem == 0) // no events to get
+        return 0;
+
     int ret = 0;
-    if ((ret = aio::getevents(_ioctx, 0, _cache_num_pages + _jc->num_jfiles(), _aio_event_arr, 0)) < 0)
+    if ((ret = aio::getevents(_ioctx, flush ? _aio_evt_rem : 1, _aio_evt_rem/*_cache_num_pages + _jc->num_jfiles()*/, _aio_event_arr, timeout)) < 0)
     {
         std::ostringstream oss;
         oss << "io_getevents() failed: " << std::strerror(-ret) << " (" << ret << ")";
         throw jexception(jerrno::JERR__AIO, oss.str(), "wmgr", "get_events");
     }
 
-    u_int32_t tot_data_toks = 0;
+    if (ret == 0 && timeout)
+        return _jc->AIO_TIMEOUT;
+
+    int32_t tot_data_toks = 0;
     for (int i=0; i<ret; i++) // Index of returned AIOs
     {
         if (_aio_evt_rem == 0)

Modified: store/trunk/cpp/lib/jrnl/wmgr.hpp
===================================================================
--- store/trunk/cpp/lib/jrnl/wmgr.hpp	2010-07-26 15:58:14 UTC (rev 4147)
+++ store/trunk/cpp/lib/jrnl/wmgr.hpp	2010-07-26 19:48:20 UTC (rev 4148)
@@ -115,7 +115,7 @@
         iores abort(data_tok* dtokp, const void* const xid_ptr, const std::size_t xid_len);
         iores commit(data_tok* dtokp, const void* const xid_ptr, const std::size_t xid_len);
         iores flush();
-        u_int32_t get_events(page_state state);
+        int32_t get_events(page_state state, timespec* const timeout, bool flush = false);
         bool is_txn_synced(const std::string& xid);
         inline bool curr_pg_blocked() const { return _page_cb_arr[_pg_index]._state != UNUSED; }
         inline bool curr_file_blocked() const { return _wrfc.aio_cnt() > 0; }

Modified: store/trunk/cpp/tests/Makefile.am
===================================================================
--- store/trunk/cpp/tests/Makefile.am	2010-07-26 15:58:14 UTC (rev 4147)
+++ store/trunk/cpp/tests/Makefile.am	2010-07-26 19:48:20 UTC (rev 4148)
@@ -29,11 +29,7 @@
 TMP_DATA_DIR=$(abs_srcdir)/tmp_data_dir
 TMP_PYTHON_TEST_DIR=$(abs_srcdir)/python_tests.tmp
  
-if DO_CLUSTER_TESTS
-SUBDIRS = jrnl . cluster
-else
 SUBDIRS = jrnl .
-endif
 
 TESTS =						    \
   SimpleTest                    \

Modified: store/trunk/cpp/tests/jrnl/_st_helper_fns.hpp
===================================================================
--- store/trunk/cpp/tests/jrnl/_st_helper_fns.hpp	2010-07-26 15:58:14 UTC (rev 4147)
+++ store/trunk/cpp/tests/jrnl/_st_helper_fns.hpp	2010-07-26 19:48:20 UTC (rev 4148)
@@ -547,7 +547,7 @@
     {
         if (++aio_sleep_cnt <= MAX_AIO_SLEEPS)
         {
-            jc.get_wr_events();
+            jc.get_wr_events(0); // *** GEV2
             usleep(AIO_SLEEP_TIME);
         }
         else

Modified: store/trunk/cpp/tests/jrnl/jtt/jrnl_instance.cpp
===================================================================
--- store/trunk/cpp/tests/jrnl/jtt/jrnl_instance.cpp	2010-07-26 15:58:14 UTC (rev 4147)
+++ store/trunk/cpp/tests/jrnl/jtt/jrnl_instance.cpp	2010-07-26 19:48:20 UTC (rev 4148)
@@ -191,7 +191,7 @@
                     _tcrp->add_exception("Timeout waiting for RHM_IORES_ENQCAPTHRESH to clear.");
                     panic();
                 }
-                else if (get_wr_events() == 0)
+                else if (get_wr_events(0) == 0) // *** GEV2
                 {
                     mrg::journal::slock sl(_wr_full_mutex);
                     _wr_full_cv.waitintvl(MAX_WR_WAIT * 1000000); // MAX_WR_WAIT in ms
@@ -273,7 +273,7 @@
                             xptr = 0;
                             break;
                         case mrg::journal::RHM_IORES_PAGE_AIOWAIT:
-                            if (get_rd_events() == 0)
+                            if (get_rd_events(0) == 0)
                             {
                                 mrg::journal::slock sl(_rd_aio_mutex);
                                 _rd_aio_cv.waitintvl(MAX_RD_WAIT * 1000000); // MAX_RD_WAIT in ms



More information about the rhmessaging-commits mailing list