[rhmessaging-commits] rhmessaging commits: r1645 - in store/trunk/cpp: lib/jrnl and 2 other directories.

rhmessaging-commits at lists.jboss.org rhmessaging-commits at lists.jboss.org
Wed Feb 6 10:39:52 EST 2008


Author: kpvdr
Date: 2008-02-06 10:39:52 -0500 (Wed, 06 Feb 2008)
New Revision: 1645

Added:
   store/trunk/cpp/lib/jrnl/cvar.cpp
   store/trunk/cpp/lib/jrnl/cvar.hpp
   store/trunk/cpp/lib/jrnl/slock.cpp
   store/trunk/cpp/lib/jrnl/time_ns.cpp
   store/trunk/cpp/lib/jrnl/time_ns.hpp
   store/trunk/cpp/tests/jrnl/_ut_time_ns.cpp
Modified:
   store/trunk/cpp/lib/JournalImpl.cpp
   store/trunk/cpp/lib/JournalImpl.h
   store/trunk/cpp/lib/Makefile.am
   store/trunk/cpp/lib/jrnl/aio_cb.hpp
   store/trunk/cpp/lib/jrnl/jcntl.cpp
   store/trunk/cpp/lib/jrnl/jcntl.hpp
   store/trunk/cpp/lib/jrnl/pmgr.hpp
   store/trunk/cpp/lib/jrnl/rmgr.cpp
   store/trunk/cpp/lib/jrnl/rmgr.hpp
   store/trunk/cpp/lib/jrnl/slock.hpp
   store/trunk/cpp/lib/jrnl/wmgr.cpp
   store/trunk/cpp/lib/jrnl/wmgr.hpp
   store/trunk/cpp/tests/jrnl/
   store/trunk/cpp/tests/jrnl/JournalSystemTests.cpp
   store/trunk/cpp/tests/jrnl/Makefile.am
   store/trunk/cpp/tests/jrnl/jtt/Makefile.am
   store/trunk/cpp/tests/jrnl/jtt/_ut_test_case_result.cpp
   store/trunk/cpp/tests/jrnl/jtt/_ut_test_case_result_agregation.cpp
   store/trunk/cpp/tests/jrnl/jtt/jrnl_instance.cpp
   store/trunk/cpp/tests/jrnl/jtt/jrnl_instance.hpp
   store/trunk/cpp/tests/jrnl/jtt/test_case_result.cpp
   store/trunk/cpp/tests/jrnl/jtt/test_case_result.hpp
   store/trunk/cpp/tests/jrnl/jtt/test_case_result_agregation.cpp
   store/trunk/cpp/tests/jrnl/jtt/test_case_result_agregation.hpp
Log:
Bugfixes to read pipeline in journal; updates to journal test tool (jtt) to test read. Also changed usleep() calls in jtt to use pthread condition variables instead.

Modified: store/trunk/cpp/lib/JournalImpl.cpp
===================================================================
--- store/trunk/cpp/lib/JournalImpl.cpp	2008-02-06 14:58:42 UTC (rev 1644)
+++ store/trunk/cpp/lib/JournalImpl.cpp	2008-02-06 15:39:52 UTC (rev 1645)
@@ -80,7 +80,7 @@
 }
 
 void
-JournalImpl::recover(const aio_cb wr_cb,
+JournalImpl::recover(const journal::rd_aio_cb rd_cb, const wr_aio_cb wr_cb,
         boost::ptr_list<bdbstore::PreparedTransaction>& prep_tx_list, u_int64_t& highest_rid,
         u_int64_t queue_id)
 {
@@ -91,7 +91,7 @@
         prep_xid_list.push_back(i->xid);
     }
 
-    jcntl::recover(wr_cb, prep_xid_list, highest_rid);
+    jcntl::recover(rd_cb, wr_cb, prep_xid_list, highest_rid);
         
     // Populate PreparedTransaction lists from _tmap
     for (bdbstore::PreparedTransaction::list::iterator i = prep_tx_list.begin();
@@ -341,3 +341,7 @@
 	    dtokp->release();
     }
 }
+
+// void
+// JournalImpl::aio_rd_callback(jcntl* /*journal*/, std::vector<u_int16_t>& /*pil*/)
+// {}

Modified: store/trunk/cpp/lib/JournalImpl.h
===================================================================
--- store/trunk/cpp/lib/JournalImpl.h	2008-02-06 14:58:42 UTC (rev 1644)
+++ store/trunk/cpp/lib/JournalImpl.h	2008-02-06 15:39:52 UTC (rev 1645)
@@ -92,15 +92,15 @@
                         const qpid::sys::Duration flushTimeout);
             virtual ~JournalImpl();
 
-            inline void initialize() { jcntl::initialize(&aio_wr_callback); } 
+            inline void initialize() { jcntl::initialize(0, &aio_wr_callback); } 
 
-            void recover(const journal::aio_cb wr_cb,
+            void recover(const journal::rd_aio_cb rd_cb, const journal::wr_aio_cb wr_cb,
                     boost::ptr_list<bdbstore::PreparedTransaction>& prep_tx_list,
                     u_int64_t& highest_rid, u_int64_t queue_id);
 
             inline void recover(boost::ptr_list<bdbstore::PreparedTransaction>& prep_tx_list,
                     u_int64_t& highest_rid, u_int64_t queue_id) {
-                recover(&aio_wr_callback, prep_tx_list, highest_rid, queue_id);
+                recover(0, &aio_wr_callback, prep_tx_list, highest_rid, queue_id);
             }
             
             // Temporary fn to read and save last msg read from journal so it can be assigned
@@ -144,6 +144,7 @@
         private:
             void handleIoResult(const journal::iores r);
             static void aio_wr_callback(jcntl* journal, std::vector<journal::data_tok*>& dtokl);
+            // static void aio_rd_callback(jcntl* journal, std::vector<u_int16_t>& pil);
         }; // class JournalImpl
 
     } // namespace bdbstore

Modified: store/trunk/cpp/lib/Makefile.am
===================================================================
--- store/trunk/cpp/lib/Makefile.am	2008-02-06 14:58:42 UTC (rev 1644)
+++ store/trunk/cpp/lib/Makefile.am	2008-02-06 15:39:52 UTC (rev 1645)
@@ -38,6 +38,7 @@
   StoreException.h		\
   StringDbt.h			\
   TxnCtxt.h				\
+  jrnl/cvar.cpp             \
   jrnl/data_tok.cpp			\
   jrnl/deq_rec.cpp			\
   jrnl/enq_map.cpp			\
@@ -52,12 +53,15 @@
   jrnl/nlfh.cpp			\
   jrnl/pmgr.cpp			\
   jrnl/rmgr.cpp			\
+  jrnl/slock.cpp        \
   jrnl/rrfc.cpp			\
+  jrnl/time_ns.cpp      \
   jrnl/txn_map.cpp			\
   jrnl/txn_rec.cpp			\
   jrnl/wmgr.cpp			\
   jrnl/wrfc.cpp			\
   jrnl/aio_cb.hpp			\
+  jrnl/cvar.hpp             \
   jrnl/data_tok.hpp			\
   jrnl/deq_hdr.hpp			\
   jrnl/deq_rec.hpp			\
@@ -82,6 +86,7 @@
   jrnl/rmgr.hpp			\
   jrnl/rrfc.hpp			\
   jrnl/slock.hpp        \
+  jrnl/time_ns.hpp      \
   jrnl/txn_hdr.hpp			\
   jrnl/txn_map.hpp			\
   jrnl/txn_rec.hpp			\

Modified: store/trunk/cpp/lib/jrnl/aio_cb.hpp
===================================================================
--- store/trunk/cpp/lib/jrnl/aio_cb.hpp	2008-02-06 14:58:42 UTC (rev 1644)
+++ store/trunk/cpp/lib/jrnl/aio_cb.hpp	2008-02-06 15:39:52 UTC (rev 1645)
@@ -44,7 +44,8 @@
     /**
     * \brief Callback function pointer to be called when AIO events arrive.
     */
-    typedef void (*aio_cb)(jcntl* journal, std::vector<data_tok*>& dtokl);
+    typedef void (*wr_aio_cb)(jcntl* journal, std::vector<data_tok*>& dtokl);
+    typedef void (*rd_aio_cb)(jcntl* journal, std::vector<u_int16_t>& pil);
 }
 }
 

Added: store/trunk/cpp/lib/jrnl/cvar.cpp
===================================================================
--- store/trunk/cpp/lib/jrnl/cvar.cpp	                        (rev 0)
+++ store/trunk/cpp/lib/jrnl/cvar.cpp	2008-02-06 15:39:52 UTC (rev 1645)
@@ -0,0 +1,31 @@
+/**
+* \file cvar.cpp
+*
+* Red Hat Messaging - Message Journal
+*
+* File containing code for class rhm::journal::cvar (condition variable). See
+* comments in file cvar.hpp for details.
+*
+* Copyright (C) 2007, 2008 Red Hat Inc.
+*
+* This file is part of Red Hat Messaging.
+*
+* Red Hat Messaging is free software; you can redistribute it and/or
+* modify it under the terms of the GNU Lesser General Public
+* License as published by the Free Software Foundation; either
+* version 2.1 of the License, or (at your option) any later version.
+*
+* This library is distributed in the hope that it will be useful,
+* but WITHOUT ANY WARRANTY; without even the implied warranty of
+* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
+* Lesser General Public License for more details.
+*
+* You should have received a copy of the GNU Lesser General Public
+* License along with this library; if not, write to the Free Software
+* Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA  02110-1301
+* USA
+*
+* The GNU Lesser General Public License is available in the file COPYING.
+*/
+
+#include <jrnl/cvar.hpp>

Added: store/trunk/cpp/lib/jrnl/cvar.hpp
===================================================================
--- store/trunk/cpp/lib/jrnl/cvar.hpp	                        (rev 0)
+++ store/trunk/cpp/lib/jrnl/cvar.hpp	2008-02-06 15:39:52 UTC (rev 1645)
@@ -0,0 +1,85 @@
+/**
+* \file cvar.hpp
+*
+* Red Hat Messaging - Message Journal
+*
+* This file contains a posix condition variable class.
+*
+* Copyright 2007, 2008 Red Hat, Inc.
+*
+* This file is part of Red Hat Messaging.
+*
+* Red Hat Messaging is free software; you can redistribute it and/or
+* modify it under the terms of the GNU Lesser General Public
+* License as published by the Free Software Foundation; either
+* version 2.1 of the License, or (at your option) any later version.
+*
+* This library is distributed in the hope that it will be useful,
+* but WITHOUT ANY WARRANTY; without even the implied warranty of
+* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
+* Lesser General Public License for more details.
+*
+* You should have received a copy of the GNU Lesser General Public
+* License along with this library; if not, write to the Free Software
+* Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA  02110-1301
+* USA
+*
+* The GNU Lesser General Public License is available in the file COPYING.
+*/
+
+#ifndef rhm_journal_cvar_hpp
+#define rhm_journal_cvar_hpp
+
+#include <jrnl/jerrno.hpp>
+#include <jrnl/jexception.hpp>
+#include <jrnl/time_ns.hpp>
+#include <pthread.h>
+#include <sstream>
+
+namespace rhm
+{
+namespace journal
+{
+
+    // Ultra-simple thread condition variable class
+    class cvar
+    {
+    private:
+        pthread_mutex_t& _m;
+        pthread_cond_t _c;
+    public:
+        inline cvar(pthread_mutex_t& m) : _m(m) { ::pthread_cond_init(&_c, 0); }
+        inline ~cvar() { ::pthread_cond_destroy(&_c); }
+        inline void wait()
+        {
+            PTHREAD_CHK(::pthread_cond_wait(&_c, &_m), "pthread_cond_wait", "cvar", "wait");
+        }
+        inline void timedwait(timespec& ts)
+        {
+            PTHREAD_CHK(::pthread_cond_timedwait(&_c, &_m, &ts), "pthread_cond_timedwait", "cvar",
+                    "timedwait");
+        }
+        inline const bool waitintvl(const long intvl_ns)
+        {
+            time_ns t; t.now(); t+=intvl_ns;
+            int ret = ::pthread_cond_timedwait(&_c, &_m, &t);
+            if (ret == ETIMEDOUT)
+                return true;
+            PTHREAD_CHK(ret, "pthread_cond_timedwait", "cvar", "waitintvl");
+            return false;
+        }
+        inline void signal()
+        {
+            PTHREAD_CHK(::pthread_cond_signal(&_c), "pthread_cond_signal", "cvar", "notify");
+        }
+        inline void broadcast()
+        {
+            PTHREAD_CHK(::pthread_cond_broadcast(&_c), "pthread_cond_broadcast", "cvar",
+                    "broadcast");
+        }
+    };
+
+}
+}
+
+#endif

Modified: store/trunk/cpp/lib/jrnl/jcntl.cpp
===================================================================
--- store/trunk/cpp/lib/jrnl/jcntl.cpp	2008-02-06 14:58:42 UTC (rev 1644)
+++ store/trunk/cpp/lib/jrnl/jcntl.cpp	2008-02-06 15:39:52 UTC (rev 1645)
@@ -71,8 +71,8 @@
     _wmgr(this, _emap, _tmap, _wrfc),
     _rcvdat(_num_jfiles)
 {
-    pthread_mutex_init(&_wr_mutex, 0);
-    pthread_mutex_init(&_gev_mutex, 0);
+    ::pthread_mutex_init(&_wr_mutex, 0);
+    ::pthread_mutex_init(&_gev_mutex, 0);
 }
 
 jcntl::~jcntl()
@@ -86,12 +86,12 @@
             delete _datafh[i];
         delete[] _datafh;
     }
-    pthread_mutex_destroy(&_gev_mutex);
-    pthread_mutex_destroy(&_wr_mutex);
+    ::pthread_mutex_destroy(&_gev_mutex);
+    ::pthread_mutex_destroy(&_wr_mutex);
 }
 
 void
-jcntl::initialize(const aio_cb wr_cb)
+jcntl::initialize(const rd_aio_cb rd_cb, const wr_aio_cb wr_cb)
 {
     _init_flag = false;
     _stop_flag = false;
@@ -125,7 +125,7 @@
     // constrains read activity (i.e. one can't read what has not yet been written).
     _wrfc.initialize(_num_jfiles, _jfsize_sblks, (nlfh**)_datafh);
     _rrfc.initialize(_num_jfiles, (nlfh**)_datafh);
-    _rmgr.initialize(0);
+    _rmgr.initialize(rd_cb, 0);
     _wmgr.initialize(wr_cb, JRNL_WMGR_MAXDTOKPP, JRNL_WMGR_MAXWAITUS);
 
     // Write info file (<basename>.jinf) to disk
@@ -135,8 +135,8 @@
 }
 
 void
-jcntl::recover(const aio_cb wr_cb, const std::vector<std::string>& prep_txn_list,
-        u_int64_t& highest_rid)
+jcntl::recover(const rd_aio_cb rd_cb, const wr_aio_cb wr_cb,
+        const std::vector<std::string>& prep_txn_list, u_int64_t& highest_rid)
 {
     _init_flag = false;
     _stop_flag = false;
@@ -180,7 +180,7 @@
     // constrains read activity (i.e. one can't read what has not yet been written).
     _wrfc.initialize(_num_jfiles, _jfsize_sblks, (nlfh**)_datafh, &_rcvdat);
     _rrfc.initialize(_num_jfiles, (nlfh**)_datafh, _rcvdat._ffid);
-    _rmgr.initialize(_rcvdat._fro);
+    _rmgr.initialize(rd_cb, _rcvdat._fro);
     _wmgr.initialize(wr_cb, JRNL_WMGR_MAXDTOKPP, JRNL_WMGR_MAXWAITUS, _rcvdat._eo);
     
     _readonly_flag = true;
@@ -430,7 +430,7 @@
     u_int32_t cnt = 0;
     while (_wmgr.get_aio_evt_rem())
     {
-        _wmgr.get_events(pmgr::UNUSED);
+        get_wr_events();
         if (cnt++ > MAX_AIO_CMPL_SLEEPS)
             throw jexception(jerrno::JERR_JCNTL_AIOCMPLWAIT, "jcntl", "aio_cmpl_wait");
         ::usleep(AIO_CMPL_SLEEP);

Modified: store/trunk/cpp/lib/jrnl/jcntl.hpp
===================================================================
--- store/trunk/cpp/lib/jrnl/jcntl.hpp	2008-02-06 14:58:42 UTC (rev 1644)
+++ store/trunk/cpp/lib/jrnl/jcntl.hpp	2008-02-06 15:39:52 UTC (rev 1645)
@@ -178,11 +178,12 @@
         * <b>NOTE: If <i>NULL</i> is passed to the callbacks, internal default callbacks will be
         * used.</b>
         *
+        * \param rd_cb Function pointer to callback function for read operations. May be 0 (NULL).
         * \param wr_cb Function pointer to callback function for write operations. May be 0 (NULL).
         *
         * \exception TODO
         */
-        void initialize(const aio_cb wr_cb);
+        void initialize(const rd_aio_cb rd_cb, const wr_aio_cb wr_cb);
 
         /**
         * /brief Initialize journal by recovering state from previously written journal.
@@ -199,14 +200,15 @@
         * <b>NOTE: If <i>NULL</i> is passed to the callbacks, internal default callbacks will be
         * used.</b>
         *
+        * \param rd_cb Function pointer to callback function for read operations. May be 0 (NULL).
         * \param wr_cb Function pointer to callback function for write operations. May be 0 (NULL).
         * \param prep_txn_list
         * \param highest_rid Returns the highest rid found in the journal during recover
         *
         * \exception TODO
         */
-        void recover(const aio_cb wr_cb, const std::vector<std::string>& prep_txn_list,
-                u_int64_t& highest_rid);
+        void recover(const rd_aio_cb rd_cb, const wr_aio_cb wr_cb,
+                const std::vector<std::string>& prep_txn_list, u_int64_t& highest_rid);
 
         /**
         * \brief Notification to the journal that recovery is complete and that normal operation

Modified: store/trunk/cpp/lib/jrnl/pmgr.hpp
===================================================================
--- store/trunk/cpp/lib/jrnl/pmgr.hpp	2008-02-06 14:58:42 UTC (rev 1644)
+++ store/trunk/cpp/lib/jrnl/pmgr.hpp	2008-02-06 15:39:52 UTC (rev 1645)
@@ -42,7 +42,6 @@
 
 #include <deque>
 #include <libaio.h>
-#include <jrnl/aio_cb.hpp>
 #include <jrnl/data_tok.hpp>
 #include <jrnl/deq_rec.hpp>
 #include <jrnl/enq_map.hpp>

Modified: store/trunk/cpp/lib/jrnl/rmgr.cpp
===================================================================
--- store/trunk/cpp/lib/jrnl/rmgr.cpp	2008-02-06 14:58:42 UTC (rev 1644)
+++ store/trunk/cpp/lib/jrnl/rmgr.cpp	2008-02-06 15:39:52 UTC (rev 1645)
@@ -51,8 +51,9 @@
 {}
 
 void
-rmgr::initialize(size_t fro)
+rmgr::initialize(const rd_aio_cb rd_cb, const size_t fro)
 {
+    _cb = rd_cb;
     initialize();
     if (fro)
     {
@@ -390,6 +391,8 @@
         throw jexception(jerrno::JERR__AIO, oss.str(), "rmgr", "get_events");
     }
 
+    std::vector<u_int16_t> pil;
+    pil.reserve(ret);
     for (int i=0; i<ret; i++) // Index of returned AIOs
     {
         if (_aio_evt_rem == 0)
@@ -419,8 +422,12 @@
         pcbp->_rdblks = iocbp->u.c.nbytes / JRNL_DBLK_SIZE;
         pcbp->_rfh->add_rd_cmpl_cnt_dblks(pcbp->_rdblks);
         pcbp->_state = state;
+        pil[i] = pcbp->_index;
     }
 
+    // Perform AIO return callback
+    if (_cb && ret)
+        (_cb)(_jc, pil);
     return ret;
 }
 
@@ -583,7 +590,7 @@
         // If skip still incomplete, move to next page and decode again
         if (tot_dblk_cnt < dsize_dblks)
         {
-            if (_pg_offset_dblks == JRNL_SBLK_SIZE * JRNL_RMGR_PAGE_SIZE)
+            if (dblks_rem() == 0)
                 rotate_page();
             if (_page_cb_arr[_pg_index]._state != AIO_COMPLETE)
             {

Modified: store/trunk/cpp/lib/jrnl/rmgr.hpp
===================================================================
--- store/trunk/cpp/lib/jrnl/rmgr.hpp	2008-02-06 14:58:42 UTC (rev 1644)
+++ store/trunk/cpp/lib/jrnl/rmgr.hpp	2008-02-06 15:39:52 UTC (rev 1645)
@@ -39,6 +39,7 @@
 }
 }
 
+#include <jrnl/aio_cb.hpp>
 #include <jrnl/enums.hpp>
 #include <jrnl/pmgr.hpp>
 #include <jrnl/rec_hdr.hpp>
@@ -58,14 +59,15 @@
     class rmgr : public pmgr
     {
     private:
-        rrfc& _rrfc;        ///< Ref to read rotating file controller
+        rrfc& _rrfc;            ///< Ref to read rotating file controller
         rec_hdr _hdr;           ///< Header used to determind record type
+        rd_aio_cb _cb;          ///< Callback function pointer for AIO events
 
     public:
         rmgr(jcntl* jc, enq_map& emap, txn_map& tmap, rrfc& rrfc);
         virtual ~rmgr();
 
-        void initialize(size_t fro);
+        void initialize(const rd_aio_cb rd_cb, const size_t fro);
         const iores get(const u_int64_t& rid, const size_t& dsize, const size_t& dsize_avail,
                 const void** const data, bool auto_discard);
         const iores discard(data_tok* dtok);

Added: store/trunk/cpp/lib/jrnl/slock.cpp
===================================================================
--- store/trunk/cpp/lib/jrnl/slock.cpp	                        (rev 0)
+++ store/trunk/cpp/lib/jrnl/slock.cpp	2008-02-06 15:39:52 UTC (rev 1645)
@@ -0,0 +1,31 @@
+/**
+* \file slock.cpp
+*
+* Red Hat Messaging - Message Journal
+*
+* File containing code for class rhm::journal::slock (scoped lock). See
+* comments in file slock.hpp for details.
+*
+* Copyright (C) 2007, 2008 Red Hat Inc.
+*
+* This file is part of Red Hat Messaging.
+*
+* Red Hat Messaging is free software; you can redistribute it and/or
+* modify it under the terms of the GNU Lesser General Public
+* License as published by the Free Software Foundation; either
+* version 2.1 of the License, or (at your option) any later version.
+*
+* This library is distributed in the hope that it will be useful,
+* but WITHOUT ANY WARRANTY; without even the implied warranty of
+* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
+* Lesser General Public License for more details.
+*
+* You should have received a copy of the GNU Lesser General Public
+* License along with this library; if not, write to the Free Software
+* Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA  02110-1301
+* USA
+*
+* The GNU Lesser General Public License is available in the file COPYING.
+*/
+
+#include <jrnl/slock.hpp>

Modified: store/trunk/cpp/lib/jrnl/slock.hpp
===================================================================
--- store/trunk/cpp/lib/jrnl/slock.hpp	2008-02-06 14:58:42 UTC (rev 1644)
+++ store/trunk/cpp/lib/jrnl/slock.hpp	2008-02-06 15:39:52 UTC (rev 1645)
@@ -31,11 +31,11 @@
 #ifndef rhm_journal_slock_hpp
 #define rhm_journal_slock_hpp
 
-#include <sstream>
-#include <pthread.h>
 #include <errno.h>
 #include <jrnl/jerrno.hpp>
 #include <jrnl/jexception.hpp>
+#include <pthread.h>
+#include <sstream>
 
 namespace rhm
 {
@@ -50,11 +50,11 @@
     public:
         inline slock(pthread_mutex_t* m) : _m(m)
         {
-            PTHREAD_CHK(pthread_mutex_lock(_m), "pthread_mutex_lock", "slock", "slock");
+            PTHREAD_CHK(::pthread_mutex_lock(_m), "pthread_mutex_lock", "slock", "slock");
         }
         inline ~slock()
         {
-            PTHREAD_CHK(pthread_mutex_unlock(_m), "pthread_mutex_unlock", "slock", "~slock");
+            PTHREAD_CHK(::pthread_mutex_unlock(_m), "pthread_mutex_unlock", "slock", "~slock");
         }
     };
 
@@ -67,7 +67,7 @@
     public:
         inline stlock(pthread_mutex_t* m) : _m(m), _locked(false)
         {
-            int ret = pthread_mutex_trylock(_m);
+            int ret = ::pthread_mutex_trylock(_m);
             _locked = (ret == 0); // check if lock obtained
             if (!_locked && ret != EBUSY) PTHREAD_CHK(ret, "pthread_mutex_trylock", "stlock",
                     "stlock");
@@ -75,7 +75,8 @@
         inline ~stlock()
         {
             if (_locked)
-                PTHREAD_CHK(pthread_mutex_unlock(_m), "pthread_mutex_unlock", "stlock", "~stlock");
+                PTHREAD_CHK(::pthread_mutex_unlock(_m), "pthread_mutex_unlock", "stlock",
+                        "~stlock");
         }
         inline const bool locked() const { return _locked; }
     };

Added: store/trunk/cpp/lib/jrnl/time_ns.cpp
===================================================================
--- store/trunk/cpp/lib/jrnl/time_ns.cpp	                        (rev 0)
+++ store/trunk/cpp/lib/jrnl/time_ns.cpp	2008-02-06 15:39:52 UTC (rev 1645)
@@ -0,0 +1,47 @@
+/**
+* \file time_ns.cpp
+*
+* Copyright 2008 Red Hat, Inc.
+*
+* This file is part of Red Hat Messaging.
+*
+* Red Hat Messaging is free software; you can redistribute it and/or
+* modify it under the terms of the GNU Lesser General Public
+* License as published by the Free Software Foundation; either
+* version 2.1 of the License, or (at your option) any later version.
+*
+* This library is distributed in the hope that it will be useful,
+* but WITHOUT ANY WARRANTY; without even the implied warranty of
+* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
+* Lesser General Public License for more details.
+*
+* You should have received a copy of the GNU Lesser General Public
+* License along with this library; if not, write to the Free Software
+* Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA  02110-1301
+* USA
+*
+* The GNU Lesser General Public License is available in the file COPYING.
+*/
+
+#include "time_ns.hpp"
+#include <sstream>
+
+namespace rhm
+{
+namespace journal
+{
+
+const std::string
+time_ns::str(int precision) const
+{
+    const double t = tv_sec + (tv_nsec/1e9);
+    std::ostringstream oss;
+    oss.setf(std::ios::fixed, std::ios::floatfield);
+    oss.precision(precision);
+    oss << t;
+    return oss.str();
+}
+
+
+} // namespace jtt
+} // namespace rhm

Added: store/trunk/cpp/lib/jrnl/time_ns.hpp
===================================================================
--- store/trunk/cpp/lib/jrnl/time_ns.hpp	                        (rev 0)
+++ store/trunk/cpp/lib/jrnl/time_ns.hpp	2008-02-06 15:39:52 UTC (rev 1645)
@@ -0,0 +1,97 @@
+/**
+* \file time_ns.hpp
+*
+* Copyright 2008 Red Hat, Inc.
+*
+* This file is part of Red Hat Messaging.
+*
+* Red Hat Messaging is free software; you can redistribute it and/or
+* modify it under the terms of the GNU Lesser General Public
+* License as published by the Free Software Foundation; either
+* version 2.1 of the License, or (at your option) any later version.
+*
+* This library is distributed in the hope that it will be useful,
+* but WITHOUT ANY WARRANTY; without even the implied warranty of
+* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
+* Lesser General Public License for more details.
+*
+* You should have received a copy of the GNU Lesser General Public
+* License along with this library; if not, write to the Free Software
+* Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA  02110-1301
+* USA
+*
+* The GNU Lesser General Public License is available in the file COPYING.
+*/
+#ifndef rhm_jtt_time_ns_hpp
+#define rhm_jtt_time_ns_hpp
+
+#include <errno.h>
+#include <string>
+#include <time.h>
+
+namespace rhm
+{
+namespace journal
+{
+
+struct time_ns : public timespec
+{
+    inline time_ns() { tv_sec = 0; tv_nsec = 0; }
+    inline time_ns(const time_t sec, const long nsec = 0) { tv_sec = sec; tv_nsec = nsec; }
+    inline time_ns(const time_ns& t) { tv_sec = t.tv_sec; tv_nsec = t.tv_nsec; }
+
+    inline void set_zero() { tv_sec = 0; tv_nsec = 0; }
+    inline bool is_zero() const { return tv_sec == 0 && tv_nsec == 0; }
+    inline int now() { if(::clock_gettime(CLOCK_REALTIME, this)) return errno; return 0; }
+    const std::string str(int precision = 6) const;
+
+    inline time_ns& operator=(const time_ns& rhs)
+        { tv_sec = rhs.tv_sec; tv_nsec = rhs.tv_nsec; return *this; }
+    inline time_ns& operator+=(const time_ns& rhs)
+    {
+        tv_nsec += rhs.tv_nsec;
+        if (tv_nsec >= 1000000000L) { tv_sec++; tv_nsec -= 1000000000L; }
+        tv_sec += rhs.tv_sec;
+        return *this;
+    }
+    inline time_ns& operator+=(const long ns)
+    {
+        tv_nsec += ns;
+        if (tv_nsec >= 1000000000L) { tv_sec++; tv_nsec -= 1000000000L; }
+        return *this;
+    }
+    inline time_ns& operator-=(const long ns)
+    {
+        tv_nsec -= ns;
+        if (tv_nsec < 0) { tv_sec--; tv_nsec += 1000000000L; }
+        return *this;
+    }
+    inline time_ns& operator-=(const time_ns& rhs)
+    {
+        tv_nsec -= rhs.tv_nsec;
+        if (tv_nsec < 0) { tv_sec--; tv_nsec += 1000000000L; }
+        tv_sec -= rhs.tv_sec;
+        return *this;
+    }
+    inline const time_ns operator+(const time_ns& rhs)
+        { time_ns t(*this); t += rhs; return t; }
+    inline const time_ns operator-(const time_ns& rhs)
+        { time_ns t(*this); t -= rhs; return t; }
+    inline bool operator==(const time_ns& rhs)
+       { return tv_sec == rhs.tv_sec && tv_nsec == rhs.tv_nsec; }
+    inline bool operator!=(const time_ns& rhs)
+       { return tv_sec != rhs.tv_sec || tv_nsec != rhs.tv_nsec; }
+    inline bool operator>(const time_ns& rhs)
+       { if(tv_sec == rhs.tv_sec) return tv_nsec > rhs.tv_nsec; return tv_sec > rhs.tv_sec; }
+    inline bool operator>=(const time_ns& rhs)
+       { if(tv_sec == rhs.tv_sec) return tv_nsec >= rhs.tv_nsec; return tv_sec >= rhs.tv_sec; }
+    inline bool operator<(const time_ns& rhs)
+       { if(tv_sec == rhs.tv_sec) return tv_nsec < rhs.tv_nsec; return tv_sec < rhs.tv_sec; }
+    inline bool operator<=(const time_ns& rhs)
+       { if(tv_sec == rhs.tv_sec) return tv_nsec <= rhs.tv_nsec; return tv_sec <= rhs.tv_sec; }
+};
+    
+} // namespace jtt
+} // namespace rhm
+
+#endif // ifndef rhm_jtt_time_ns_hpp

Modified: store/trunk/cpp/lib/jrnl/wmgr.cpp
===================================================================
--- store/trunk/cpp/lib/jrnl/wmgr.cpp	2008-02-06 14:58:42 UTC (rev 1644)
+++ store/trunk/cpp/lib/jrnl/wmgr.cpp	2008-02-06 15:39:52 UTC (rev 1645)
@@ -37,8 +37,6 @@
 #include <jrnl/jcntl.hpp>
 #include <jrnl/jerrno.hpp>
 
-#include <iostream> // debug
-
 namespace rhm
 {
 namespace journal
@@ -91,7 +89,7 @@
 }
 
 void
-wmgr::initialize(const aio_cb wr_cb, const u_int32_t max_dtokpp, const u_int32_t max_iowait_us,
+wmgr::initialize(const wr_aio_cb wr_cb, const u_int32_t max_dtokpp, const u_int32_t max_iowait_us,
         size_t eo)
 {
     _enq_busy = false;
@@ -758,8 +756,6 @@
     int ret = 0;
     if ((ret = ::io_getevents(_ioctx, 0, JRNL_RMGR_PAGES + JRNL_WMGR_PAGES, _ioevt_arr, 0)) < 0)
     {
-        if (ret == -EINTR) // No events
-            return 0;
         std::ostringstream oss;
         oss << "io_getevents() failed: " << strerror(-ret) << " (" << ret << ")";
         throw jexception(jerrno::JERR__AIO, oss.str(), "wmgr", "get_events");
@@ -781,8 +777,15 @@
         if (aioret < 0)
         {
             std::ostringstream oss;
-            oss << "AIO write operation failed: " << strerror(-aioret) << " (" << aioret << ")";
-            oss << " [pg=" << pcbp->_index << " size=" << iocbp->u.c.nbytes;
+            oss << "AIO write operation failed: " << strerror(-aioret) << " (" << aioret << ") [";
+            if (pcbp)
+                oss << "pg=" << pcbp->_index;
+            else
+            {
+                file_hdr* fhp = (file_hdr*)iocbp->u.c.buf;
+                oss << "fid=" << fhp->_fid;
+            }
+            oss << " size=" << iocbp->u.c.nbytes;
             oss << " offset=" << iocbp->u.c.offset << " fh=" << iocbp->aio_fildes << "]";
             throw jexception(jerrno::JERR__AIO, oss.str(), "wmgr", "get_events");
         }

Modified: store/trunk/cpp/lib/jrnl/wmgr.hpp
===================================================================
--- store/trunk/cpp/lib/jrnl/wmgr.hpp	2008-02-06 14:58:42 UTC (rev 1644)
+++ store/trunk/cpp/lib/jrnl/wmgr.hpp	2008-02-06 15:39:52 UTC (rev 1645)
@@ -39,6 +39,7 @@
 }
 }
 
+#include <jrnl/aio_cb.hpp>
 #include <jrnl/enums.hpp>
 #include <jrnl/pmgr.hpp>
 #include <jrnl/wrfc.hpp>
@@ -93,7 +94,7 @@
         deq_rec _deq_rec;               ///< Dequeue record used for encoding/decoding
         txn_rec _txn_rec;               ///< Transaction record used for encoding/decoding
         std::set<std::string> _txn_pending_set; ///< Set containing xids of pending commits/aborts
-        aio_cb _cb;                  ///< Callback function pointer for AIO events
+        wr_aio_cb _cb;                  ///< Callback function pointer for AIO events
 
     public:
         wmgr(jcntl* jc, enq_map& emap, txn_map& tmap, wrfc& wrfc);
@@ -101,7 +102,7 @@
                 const u_int32_t max_iowait_us);
         virtual ~wmgr();
 
-        void initialize(aio_cb wr_cb, const u_int32_t max_dtokpp, const u_int32_t max_iowait_us,
+        void initialize(wr_aio_cb wr_cb, const u_int32_t max_dtokpp, const u_int32_t max_iowait_us,
                 size_t eo = 0);
         const iores enqueue(const void* const data_buff, const size_t tot_data_len,
                 const size_t this_data_len, data_tok* dtokp, const void* const xid_ptr,


Property changes on: store/trunk/cpp/tests/jrnl
___________________________________________________________________
Name: svn:ignore
   - .deps
.libs
Makefile
Makefile.in
jtest
_ut_enq_map
_ut_jdir
_ut_jerrno
_ut_jexception
_ut_jinf
_ut_rec_hdr
_ut_txn_map

   + .deps
.libs
Makefile
Makefile.in
jtest
_ut_enq_map
_ut_jdir
_ut_jerrno
_ut_jexception
_ut_jinf
_ut_rec_hdr
_ut_time_ns
_ut_txn_map


Modified: store/trunk/cpp/tests/jrnl/JournalSystemTests.cpp
===================================================================
--- store/trunk/cpp/tests/jrnl/JournalSystemTests.cpp	2008-02-06 14:58:42 UTC (rev 1644)
+++ store/trunk/cpp/tests/jrnl/JournalSystemTests.cpp	2008-02-06 15:39:52 UTC (rev 1645)
@@ -647,14 +647,14 @@
 void
 JournalSystemTests::jrnl_init(rhm::journal::jcntl* jc)
 {
-    jc->initialize(0);
+    jc->initialize(0, 0);
 }
 
 void
 JournalSystemTests::jrnl_recover(rhm::journal::jcntl* jc, vector<string> txn_list,
         u_int64_t& highest_rid)
 {
-    jc->recover(0, txn_list, highest_rid);
+    jc->recover(0, 0, txn_list, highest_rid);
 }
 
 void

Modified: store/trunk/cpp/tests/jrnl/Makefile.am
===================================================================
--- store/trunk/cpp/tests/jrnl/Makefile.am	2008-02-06 14:58:42 UTC (rev 1644)
+++ store/trunk/cpp/tests/jrnl/Makefile.am	2008-02-06 15:39:52 UTC (rev 1645)
@@ -28,6 +28,7 @@
 # SUBDIRS = jtt
 
 TESTS = \
+  _ut_time_ns \
   _ut_jexception \
   _ut_jerrno \
   _ut_rec_hdr \
@@ -42,6 +43,7 @@
   libdlclose_noop.la
 
 check_PROGRAMS = \
+  _ut_time_ns \
   _ut_jexception \
   _ut_jerrno \
   _ut_rec_hdr \
@@ -53,6 +55,9 @@
 UNIT_TEST_SRCS = ../unit_test.cpp
 UNIT_TEST_LDADD = -lboost_unit_test_framework -lbdbstore -L../../lib/.libs 
 
+_ut_time_ns_SOURCES = _ut_time_ns.cpp $(UNIT_TEST_SRCS)
+_ut_time_ns_LDFLAGS = $(UNIT_TEST_LDADD)
+
 _ut_jexception_SOURCES = _ut_jexception.cpp $(UNIT_TEST_SRCS)
 _ut_jexception_LDFLAGS = $(UNIT_TEST_LDADD)
 

Added: store/trunk/cpp/tests/jrnl/_ut_time_ns.cpp
===================================================================
--- store/trunk/cpp/tests/jrnl/_ut_time_ns.cpp	                        (rev 0)
+++ store/trunk/cpp/tests/jrnl/_ut_time_ns.cpp	2008-02-06 15:39:52 UTC (rev 1645)
@@ -0,0 +1,151 @@
+/**
+* Copyright 2008 Red Hat, Inc.
+*
+* This file is part of Red Hat Messaging.
+*
+* Red Hat Messaging is free software; you can redistribute it and/or
+* modify it under the terms of the GNU Lesser General Public
+* License as published by the Free Software Foundation; either
+* version 2.1 of the License, or (at your option) any later version.
+*
+* This library is distributed in the hope that it will be useful,
+* but WITHOUT ANY WARRANTY; without even the implied warranty of
+* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
+* Lesser General Public License for more details.
+*
+* You should have received a copy of the GNU Lesser General Public
+* License along with this library; if not, write to the Free Software
+* Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA  02110-1301
+* USA
+*
+* The GNU Lesser General Public License is available in the file COPYING.
+*/
+
+#include "../unit_test.h"
+#include <jrnl/time_ns.hpp>
+
+using namespace boost::unit_test;
+using namespace rhm::journal;
+
+QPID_AUTO_TEST_SUITE(jtt_time_ns)
+
+BOOST_AUTO_TEST_CASE(time_ns_constructors)
+{
+    const time_t sec = 123;
+    const long nsec = 123456789;
+
+    time_ns t1;
+    BOOST_CHECK_EQUAL(t1.tv_sec, 0);
+    BOOST_CHECK_EQUAL(t1.tv_nsec, 0);
+    BOOST_CHECK_EQUAL(t1.is_zero(), true);
+    time_ns t2(sec, nsec);
+    BOOST_CHECK_EQUAL(t2.tv_sec, sec);
+    BOOST_CHECK_EQUAL(t2.tv_nsec, nsec);
+    BOOST_CHECK_EQUAL(t2.is_zero(), false);
+    time_ns t3(t1);
+    BOOST_CHECK_EQUAL(t3.tv_sec, 0);
+    BOOST_CHECK_EQUAL(t3.tv_nsec, 0);
+    BOOST_CHECK_EQUAL(t3.is_zero(), true);
+    time_ns t4(t2);
+    BOOST_CHECK_EQUAL(t4.tv_sec, sec);
+    BOOST_CHECK_EQUAL(t4.tv_nsec, nsec);
+    BOOST_CHECK_EQUAL(t4.is_zero(), false);
+    t4.set_zero();
+    BOOST_CHECK_EQUAL(t4.tv_sec, 0);
+    BOOST_CHECK_EQUAL(t4.tv_nsec, 0);
+    BOOST_CHECK_EQUAL(t4.is_zero(), true);
+}
+
+BOOST_AUTO_TEST_CASE(time_ns_operators)
+{
+    const time_t sec1 = 123;
+    const long nsec1 = 123456789;
+    const time_t sec2 = 1;
+    const long nsec2 = 999999999;
+    const time_t sec_sum = sec1 + sec2 + 1;
+    const long nsec_sum = nsec1 + nsec2 - 1000000000;
+    const time_t sec_1_minus_2 = sec1 - sec2 - 1;
+    const long nsec_1_minus_2 = nsec1 - nsec2 + 1000000000;
+    const time_t sec_2_minus_1 = sec2 - sec1;
+    const long nsec_2_minus_1 = nsec2 - nsec1;
+    time_ns z;
+    time_ns t1(sec1, nsec1);
+    time_ns t2(sec2, nsec2);
+
+    time_ns t3 = z;
+    BOOST_CHECK_EQUAL(t3.tv_sec, 0);
+    BOOST_CHECK_EQUAL(t3.tv_nsec, 0);
+    BOOST_CHECK_EQUAL(t3 == z, true);
+    BOOST_CHECK_EQUAL(t3 != z, false);
+    BOOST_CHECK_EQUAL(t3 > z, false);
+    BOOST_CHECK_EQUAL(t3 >= z, true);
+    BOOST_CHECK_EQUAL(t3 < z, false);
+    BOOST_CHECK_EQUAL(t3 <= z, true);
+
+    t3 = t1;
+    BOOST_CHECK_EQUAL(t3.tv_sec, sec1);
+    BOOST_CHECK_EQUAL(t3.tv_nsec, nsec1);
+    BOOST_CHECK_EQUAL(t3 == t1, true);
+    BOOST_CHECK_EQUAL(t3 != t1, false);
+    BOOST_CHECK_EQUAL(t3 > t1, false);
+    BOOST_CHECK_EQUAL(t3 >= t1, true);
+    BOOST_CHECK_EQUAL(t3 < t1, false);
+    BOOST_CHECK_EQUAL(t3 <= t1, true);
+    
+    t3 += z;
+    BOOST_CHECK_EQUAL(t3.tv_sec, sec1);
+    BOOST_CHECK_EQUAL(t3.tv_nsec, nsec1);
+
+    t3 = t2;
+    BOOST_CHECK_EQUAL(t3.tv_sec, sec2);
+    BOOST_CHECK_EQUAL(t3.tv_nsec, nsec2);
+    BOOST_CHECK_EQUAL(t3 == t2, true);
+    BOOST_CHECK_EQUAL(t3 != t2, false);
+    BOOST_CHECK_EQUAL(t3 > t2, false);
+    BOOST_CHECK_EQUAL(t3 >= t2, true);
+    BOOST_CHECK_EQUAL(t3 < t2, false);
+    BOOST_CHECK_EQUAL(t3 <= t2, true);
+    
+    t3 += z;
+    BOOST_CHECK_EQUAL(t3.tv_sec, sec2);
+    BOOST_CHECK_EQUAL(t3.tv_nsec, nsec2);
+    
+    t3 = t1;
+    t3 += t2;
+    BOOST_CHECK_EQUAL(t3.tv_sec, sec_sum);
+    BOOST_CHECK_EQUAL(t3.tv_nsec, nsec_sum);
+
+    t3 = t1;
+    t3 -= t2;
+    BOOST_CHECK_EQUAL(t3.tv_sec, sec_1_minus_2);
+    BOOST_CHECK_EQUAL(t3.tv_nsec, nsec_1_minus_2);
+
+    t3 = t2;
+    t3 -= t1;
+    BOOST_CHECK_EQUAL(t3.tv_sec, sec_2_minus_1);
+    BOOST_CHECK_EQUAL(t3.tv_nsec, nsec_2_minus_1);
+    
+    t3 = t1 + t2;
+    BOOST_CHECK_EQUAL(t3.tv_sec, sec_sum);
+    BOOST_CHECK_EQUAL(t3.tv_nsec, nsec_sum);
+
+    t3 = t1 - t2;
+    BOOST_CHECK_EQUAL(t3.tv_sec, sec_1_minus_2);
+    BOOST_CHECK_EQUAL(t3.tv_nsec, nsec_1_minus_2);
+
+    t3 = t2 - t1;
+    BOOST_CHECK_EQUAL(t3.tv_sec, sec_2_minus_1);
+    BOOST_CHECK_EQUAL(t3.tv_nsec, nsec_2_minus_1);
+}
+
+BOOST_AUTO_TEST_CASE(time_ns_str)
+{
+    time_ns t1(123, 123456789);
+    BOOST_CHECK_EQUAL(t1.str(), "123.123457");
+    BOOST_CHECK_EQUAL(t1.str(9), "123.123456789");
+    BOOST_CHECK_EQUAL(t1.str(0), "123");
+    time_ns t2(1, 1);
+    BOOST_CHECK_EQUAL(t2.str(9), "1.000000001");
+    time_ns t3(-12, 345);
+    BOOST_CHECK_EQUAL(t3.str(9), "-11.999999655");
+}

Modified: store/trunk/cpp/tests/jrnl/jtt/Makefile.am
===================================================================
--- store/trunk/cpp/tests/jrnl/jtt/Makefile.am	2008-02-06 14:58:42 UTC (rev 1644)
+++ store/trunk/cpp/tests/jrnl/jtt/Makefile.am	2008-02-06 15:39:52 UTC (rev 1645)
@@ -76,6 +76,7 @@
     ${LIBOBJDIR}/pmgr.o \
     ${LIBOBJDIR}/rmgr.o \
     ${LIBOBJDIR}/rrfc.o \
+    ${LIBOBJDIR}/time_ns.o \
     ${LIBOBJDIR}/txn_map.o \
     ${LIBOBJDIR}/txn_rec.o \
     ${LIBOBJDIR}/wmgr.o \
@@ -122,6 +123,7 @@
     ${LIBOBJDIR}/pmgr.o \
     ${LIBOBJDIR}/rmgr.o \
     ${LIBOBJDIR}/rrfc.o \
+    ${LIBOBJDIR}/time_ns.o \
     ${LIBOBJDIR}/txn_map.o \
     ${LIBOBJDIR}/txn_rec.o \
     ${LIBOBJDIR}/wmgr.o \
@@ -129,23 +131,32 @@
 
 _ut_test_case_SOURCES = _ut_test_case.cpp ../../unit_test.cpp
 _ut_test_case_LDFLAGS = -lboost_unit_test_framework -lrt
-_ut_test_case_LDADD = test_case.o test_case_result.o test_case_result_agregation.o
+_ut_test_case_LDADD = test_case.o test_case_result.o test_case_result_agregation.o \
+    ${LIBOBJDIR}/time_ns.o
 
+
 _ut_test_case_result_SOURCES = _ut_test_case_result.cpp ../../unit_test.cpp
 _ut_test_case_result_LDFLAGS = -lboost_unit_test_framework -lrt
 _ut_test_case_result_LDADD = test_case_result.o \
     ${LIBOBJDIR}/jerrno.o \
-    ${LIBOBJDIR}/jexception.o
+    ${LIBOBJDIR}/jexception.o \
+    ${LIBOBJDIR}/time_ns.o
 
 _ut_test_case_result_agregation_SOURCES = _ut_test_case_result_agregation.cpp ../../unit_test.cpp
 _ut_test_case_result_agregation_LDFLAGS = -lboost_unit_test_framework -lrt
 _ut_test_case_result_agregation_LDADD = test_case_result.o test_case_result_agregation.o \
     ${LIBOBJDIR}/jerrno.o \
-    ${LIBOBJDIR}/jexception.o
+    ${LIBOBJDIR}/jexception.o \
+    ${LIBOBJDIR}/time_ns.o
 
 _ut_test_case_set_SOURCES = _ut_test_case_set.cpp ../../unit_test.cpp
 _ut_test_case_set_LDFLAGS = -lboost_unit_test_framework -lrt
-_ut_test_case_set_LDADD = test_case.o test_case_set.o test_case_result.o test_case_result_agregation.o
+_ut_test_case_set_LDADD = \
+    test_case.o \
+    test_case_set.o \
+    test_case_result.o \
+    test_case_result_agregation.o \
+    ${LIBOBJDIR}/time_ns.o
 
 EXTRA_DIST = \
     jtt.csv \

Modified: store/trunk/cpp/tests/jrnl/jtt/_ut_test_case_result.cpp
===================================================================
--- store/trunk/cpp/tests/jrnl/jtt/_ut_test_case_result.cpp	2008-02-06 14:58:42 UTC (rev 1644)
+++ store/trunk/cpp/tests/jrnl/jtt/_ut_test_case_result.cpp	2008-02-06 15:39:52 UTC (rev 1645)
@@ -38,15 +38,12 @@
     BOOST_CHECK_EQUAL(tcr.jid(), jid);
     BOOST_CHECK_EQUAL(tcr.exception(), false);
     BOOST_CHECK_EQUAL(tcr.exception_count(), 0U);
-    const timespec& ts1 = tcr.start_time();
-    BOOST_CHECK_EQUAL(ts1.tv_sec, 0);
-    BOOST_CHECK_EQUAL(ts1.tv_nsec, 0);
-    const timespec& ts2 = tcr.stop_time();
-    BOOST_CHECK_EQUAL(ts2.tv_sec, 0);
-    BOOST_CHECK_EQUAL(ts2.tv_nsec, 0);
-    const timespec& ts3 = tcr.test_time();
-    BOOST_CHECK_EQUAL(ts3.tv_sec, 0);
-    BOOST_CHECK_EQUAL(ts3.tv_nsec, 0);
+    const time_ns& ts1 = tcr.start_time();
+    BOOST_CHECK(ts1.is_zero());
+    const time_ns& ts2 = tcr.stop_time();
+    BOOST_CHECK(ts2.is_zero());
+    const time_ns& ts3 = tcr.test_time();
+    BOOST_CHECK(ts3.is_zero());
 }
 
 BOOST_AUTO_TEST_CASE(test_case_result_start_stop)
@@ -55,37 +52,30 @@
     test_case_result tcr(jid);
     BOOST_CHECK_EQUAL(tcr.exception(), false);
     BOOST_CHECK_EQUAL(tcr.exception_count(), 0U);
-    const timespec& ts1 = tcr.start_time();
-    BOOST_CHECK_EQUAL(ts1.tv_sec, 0);
-    BOOST_CHECK_EQUAL(ts1.tv_nsec, 0);
-    const timespec& ts2 = tcr.stop_time();
-    BOOST_CHECK_EQUAL(ts2.tv_sec, 0);
-    BOOST_CHECK_EQUAL(ts2.tv_nsec, 0);
-    const timespec& ts3 = tcr.test_time();
-    BOOST_CHECK_EQUAL(ts3.tv_sec, 0);
-    BOOST_CHECK_EQUAL(ts3.tv_nsec, 0);
+    const time_ns& ts1 = tcr.start_time();
+    BOOST_CHECK(ts1.is_zero());
+    const time_ns& ts2 = tcr.stop_time();
+    BOOST_CHECK(ts2.is_zero());
+    const time_ns& ts3 = tcr.test_time();
+    BOOST_CHECK(ts3.is_zero());
 
     tcr.set_start_time();
     BOOST_CHECK_EQUAL(tcr.exception(), false);
     BOOST_CHECK_EQUAL(tcr.exception_count(), 0U);
-    const timespec& ts4 = tcr.start_time();
-    BOOST_CHECK(ts4.tv_sec > 0);
-    BOOST_CHECK(ts4.tv_nsec > 0);
-    const timespec& ts5 = tcr.stop_time();
-    BOOST_CHECK_EQUAL(ts5.tv_sec, 0);
-    BOOST_CHECK_EQUAL(ts5.tv_nsec, 0);
-    const timespec& ts6 = tcr.test_time();
-    BOOST_CHECK_EQUAL(ts6.tv_sec, 0);
-    BOOST_CHECK_EQUAL(ts6.tv_nsec, 0);
+    const time_ns& ts4 = tcr.start_time();
+    BOOST_CHECK(!ts4.is_zero());
+    const time_ns& ts5 = tcr.stop_time();
+    BOOST_CHECK(ts5.is_zero());
+    const time_ns& ts6 = tcr.test_time();
+    BOOST_CHECK(ts6.is_zero());
     
     ::usleep(1100000); // 1.1 sec in microseconds
     tcr.set_stop_time();
     BOOST_CHECK_EQUAL(tcr.exception(), false);
     BOOST_CHECK_EQUAL(tcr.exception_count(), 0U);
-    const timespec& ts7 = tcr.stop_time();
-    BOOST_CHECK(ts7.tv_sec > 0);
-    BOOST_CHECK(ts7.tv_nsec > 0);
-    const timespec& ts8 = tcr.test_time();
+    const time_ns& ts7 = tcr.stop_time();
+    BOOST_CHECK(!ts7.is_zero());
+    const time_ns& ts8 = tcr.test_time();
     BOOST_CHECK(ts8.tv_sec == 1);
     BOOST_CHECK(ts8.tv_nsec > 100000000); // 0.1 sec in nanoseconds
     BOOST_CHECK(ts8.tv_nsec < 200000000); // 0.2 sec in nanoseconds
@@ -104,10 +94,9 @@
     BOOST_CHECK_EQUAL(tcr.exception(), true);
     BOOST_CHECK_EQUAL(tcr.exception_count(), 1U);
     BOOST_CHECK_EQUAL(tcr[0], e.what());
-    const timespec& ts1 = tcr.stop_time();
-    BOOST_CHECK(ts1.tv_sec > 0);
-    BOOST_CHECK(ts1.tv_nsec > 0);
-    const timespec& ts2 = tcr.test_time();
+    const time_ns& ts1 = tcr.stop_time();
+    BOOST_CHECK(!ts1.is_zero());
+    const time_ns& ts2 = tcr.test_time();
     BOOST_CHECK(ts2.tv_sec == 1);
     BOOST_CHECK(ts2.tv_nsec > 100000000); // 0.1 sec in nanoseconds
     BOOST_CHECK(ts2.tv_nsec < 200000000); // 0.2 sec in nanoseconds
@@ -124,10 +113,9 @@
     BOOST_CHECK_EQUAL(tcr.exception(), true);
     BOOST_CHECK_EQUAL(tcr.exception_count(), 1U);
     BOOST_CHECK_EQUAL(tcr[0], err_msg);
-    const timespec& ts1 = tcr.stop_time();
-    BOOST_CHECK(ts1.tv_sec > 0);
-    BOOST_CHECK(ts1.tv_nsec > 0);
-    const timespec& ts2 = tcr.test_time();
+    const time_ns& ts1 = tcr.stop_time();
+    BOOST_CHECK(!ts1.is_zero());
+    const time_ns& ts2 = tcr.test_time();
     BOOST_CHECK(ts2.tv_sec == 1);
     BOOST_CHECK(ts2.tv_nsec > 100000000); // 0.1 sec in nanoseconds
     BOOST_CHECK(ts2.tv_nsec < 200000000); // 0.2 sec in nanoseconds
@@ -144,10 +132,9 @@
     BOOST_CHECK_EQUAL(tcr.exception(), true);
     BOOST_CHECK_EQUAL(tcr.exception_count(), 1U);
     BOOST_CHECK_EQUAL(tcr[0], err_msg);
-    const timespec& ts1 = tcr.stop_time();
-    BOOST_CHECK(ts1.tv_sec > 0);
-    BOOST_CHECK(ts1.tv_nsec > 0);
-    const timespec& ts2 = tcr.test_time();
+    const time_ns& ts1 = tcr.stop_time();
+    BOOST_CHECK(!ts1.is_zero());
+    const time_ns& ts2 = tcr.test_time();
     BOOST_CHECK(ts2.tv_sec == 1);
     BOOST_CHECK(ts2.tv_nsec > 100000000); // 0.1 sec in nanoseconds
     BOOST_CHECK(ts2.tv_nsec < 200000000); // 0.2 sec in nanoseconds
@@ -166,12 +153,10 @@
     BOOST_CHECK_EQUAL(tcr.exception(), true);
     BOOST_CHECK_EQUAL(tcr.exception_count(), 1U);
     BOOST_CHECK_EQUAL(tcr[0], e.what());
-    const timespec& ts1 = tcr.stop_time();
-    BOOST_CHECK_EQUAL(ts1.tv_sec, 0);
-    BOOST_CHECK_EQUAL(ts1.tv_nsec, 0);
-    const timespec& ts2 = tcr.test_time();
-    BOOST_CHECK_EQUAL(ts2.tv_sec, 0);
-    BOOST_CHECK_EQUAL(ts2.tv_nsec, 0);
+    const time_ns& ts1 = tcr.stop_time();
+    BOOST_CHECK(ts1.is_zero());
+    const time_ns& ts2 = tcr.test_time();
+    BOOST_CHECK(ts2.is_zero());
 }
 
 BOOST_AUTO_TEST_CASE(test_case_counters)

Modified: store/trunk/cpp/tests/jrnl/jtt/_ut_test_case_result_agregation.cpp
===================================================================
--- store/trunk/cpp/tests/jrnl/jtt/_ut_test_case_result_agregation.cpp	2008-02-06 14:58:42 UTC (rev 1644)
+++ store/trunk/cpp/tests/jrnl/jtt/_ut_test_case_result_agregation.cpp	2008-02-06 15:39:52 UTC (rev 1645)
@@ -46,15 +46,12 @@
     BOOST_CHECK_EQUAL(tcra.jid(), "Average");
     BOOST_CHECK_EQUAL(tcra.exception(), false);
     BOOST_CHECK_EQUAL(tcra.exception_count(), 0U);
-    const timespec& ts1 = tcra.start_time();
-    BOOST_CHECK_EQUAL(ts1.tv_sec, 0);
-    BOOST_CHECK_EQUAL(ts1.tv_nsec, 0);
-    const timespec& ts2 = tcra.stop_time();
-    BOOST_CHECK_EQUAL(ts2.tv_sec, 0);
-    BOOST_CHECK_EQUAL(ts2.tv_nsec, 0);
-    const timespec& ts3 = tcra.test_time();
-    BOOST_CHECK_EQUAL(ts3.tv_sec, 0);
-    BOOST_CHECK_EQUAL(ts3.tv_nsec, 0);
+    const time_ns& ts1 = tcra.start_time();
+    BOOST_CHECK(ts1.is_zero());
+    const time_ns& ts2 = tcra.stop_time();
+    BOOST_CHECK(ts2.is_zero());
+    const time_ns& ts3 = tcra.test_time();
+    BOOST_CHECK(ts3.is_zero());
 }
 
 BOOST_AUTO_TEST_CASE(test_case_result_agregation_constructor_2)
@@ -65,15 +62,12 @@
     BOOST_CHECK_EQUAL(tcra.jid(), jid);
     BOOST_CHECK_EQUAL(tcra.exception(), false);
     BOOST_CHECK_EQUAL(tcra.exception_count(), 0U);
-    const timespec& ts1 = tcra.start_time();
-    BOOST_CHECK_EQUAL(ts1.tv_sec, 0);
-    BOOST_CHECK_EQUAL(ts1.tv_nsec, 0);
-    const timespec& ts2 = tcra.stop_time();
-    BOOST_CHECK_EQUAL(ts2.tv_sec, 0);
-    BOOST_CHECK_EQUAL(ts2.tv_nsec, 0);
-    const timespec& ts3 = tcra.test_time();
-    BOOST_CHECK_EQUAL(ts3.tv_sec, 0);
-    BOOST_CHECK_EQUAL(ts3.tv_nsec, 0);
+    const time_ns& ts1 = tcra.start_time();
+    BOOST_CHECK(ts1.is_zero());
+    const time_ns& ts2 = tcra.stop_time();
+    BOOST_CHECK(ts2.is_zero());
+    const time_ns& ts3 = tcra.test_time();
+    BOOST_CHECK(ts3.is_zero());
 }
 
 BOOST_AUTO_TEST_CASE(test_case_result_agregation_add_test_case)
@@ -158,7 +152,7 @@
     BOOST_CHECK_EQUAL(tcra.num_results(), num_results);
     BOOST_CHECK_EQUAL(tcra.exception_count(), num_exceptions);
     BOOST_CHECK_EQUAL(tcra.exception(), num_exceptions > 0);
-    const timespec& ts1 = tcra.test_time();
+    const time_ns& ts1 = tcra.test_time();
     BOOST_CHECK_EQUAL(ts1.tv_sec, secs);
     BOOST_CHECK_EQUAL(ts1.tv_nsec, nsec);
 }
@@ -173,9 +167,7 @@
         tcrp->incr_num_deq();
     for (unsigned i=0; i<num_reads; i++)
         tcrp->incr_num_read();
-    timespec ts;
-    ts.tv_sec = secs;
-    ts.tv_nsec = nsec;
+    time_ns ts(secs, nsec);
     tcrp->set_test_time(ts);
     return tcrp;
 }

Modified: store/trunk/cpp/tests/jrnl/jtt/jrnl_instance.cpp
===================================================================
--- store/trunk/cpp/tests/jrnl/jtt/jrnl_instance.cpp	2008-02-06 14:58:42 UTC (rev 1644)
+++ store/trunk/cpp/tests/jrnl/jtt/jrnl_instance.cpp	2008-02-06 15:39:52 UTC (rev 1645)
@@ -28,10 +28,10 @@
 #include <jrnl/data_tok.hpp>
 #include <jrnl/jerrno.hpp>
 
-#include <iostream>
+#include <iostream> // debug
 
-#define SLEEP_US        1000 // 1 ms
-#define MAX_SLEEP_CNT   2000 // 2 sec max
+#define MAX_WR_WAIT     10 // in ms
+#define MAX_RD_WAIT     100 // in ms
 
 namespace rhm
 {
@@ -47,11 +47,17 @@
         _dtok_master_txn_list(),
         _dtok_rd_list(),
         _dtok_deq_list(),
+        _rd_aio_cv(_rd_aio_mutex),
+        _wr_full_cv(_wr_full_mutex),
+        _rd_list_cv(_rd_list_mutex),
+        _deq_list_cv(_deq_list_mutex),
         _tcp(),
         _tcrp()
 {
-    pthread_mutex_init(&_rd_mutex, 0);
-    pthread_mutex_init(&_deq_mutex, 0);
+    pthread_mutex_init(&_rd_aio_mutex, 0);
+    pthread_mutex_init(&_wr_full_mutex, 0);
+    pthread_mutex_init(&_rd_list_mutex, 0);
+    pthread_mutex_init(&_deq_list_mutex, 0);
 }
         
 jrnl_instance::jrnl_instance(const jrnl_init_params::shared_ptr& p):
@@ -63,17 +69,25 @@
         _dtok_master_txn_list(),
         _dtok_rd_list(),
         _dtok_deq_list(),
+        _rd_aio_cv(_rd_aio_mutex),
+        _wr_full_cv(_wr_full_mutex),
+        _rd_list_cv(_rd_list_mutex),
+        _deq_list_cv(_deq_list_mutex),
         _tcp(),
         _tcrp()
 {
-    pthread_mutex_init(&_rd_mutex, 0);
-    pthread_mutex_init(&_deq_mutex, 0);
+    pthread_mutex_init(&_rd_aio_mutex, 0);
+    pthread_mutex_init(&_wr_full_mutex, 0);
+    pthread_mutex_init(&_rd_list_mutex, 0);
+    pthread_mutex_init(&_deq_list_mutex, 0);
 }
 
 jrnl_instance::~jrnl_instance()
 {
-    pthread_mutex_destroy(&_rd_mutex);
-    pthread_mutex_destroy(&_deq_mutex);
+    pthread_mutex_destroy(&_rd_aio_mutex);
+    pthread_mutex_destroy(&_wr_full_mutex);
+    pthread_mutex_destroy(&_rd_list_mutex);
+    pthread_mutex_destroy(&_deq_list_mutex);
 }
 
 
@@ -97,19 +111,19 @@
             {
             std::vector<std::string> prep_txn_list;
             u_int64_t highest_rid;
-            recover(aio_wr_callback, prep_txn_list, highest_rid);
+            recover(aio_rd_callback, aio_wr_callback, prep_txn_list, highest_rid);
             recover_complete();
             }
             catch (const rhm::journal::jexception& e)
             {
                 if (e.err_code() == rhm::journal::jerrno::JERR_JDIR_STAT)
-                    initialize(aio_wr_callback);
+                    initialize(aio_rd_callback, aio_wr_callback);
                 else
                     throw;
             }
         }
         else
-            initialize(aio_wr_callback);
+            initialize(aio_rd_callback, aio_wr_callback);
     }
     catch (const rhm::journal::jexception& e) { _tcrp->add_exception(e); }
     catch (const std::exception& e) { _tcrp->add_exception(e.what()); }
@@ -175,33 +189,29 @@
                 else
                     res = enqueue_data_record(msgp, msg_size, msg_size, p.get(), transient);
             }
-            if (res == rhm::journal::RHM_IORES_SUCCESS)
+            switch (res)
             {
+            case rhm::journal::RHM_IORES_SUCCESS:
                 sleep_cnt = 0U;
                 _tcrp->incr_num_enq();
-            }
-            else if (res == rhm::journal::RHM_IORES_ENQCAPTHRESH)
-            {
-                if (sleep_cnt++ > MAX_SLEEP_CNT)
+                if (p->has_xid() && !_tcp->auto_deq())
+                    commit(p.get());
+                break;
+            case rhm::journal::RHM_IORES_ENQCAPTHRESH:
+                if (get_wr_events() == 0)
                 {
-                    std::ostringstream oss;
-                    oss << "ERROR: Timeout waiting for journal \"" << _jid;
-                    oss << "\" to empty. (RHM_IORES_ENQCAPTHRESH)";
-                    _tcrp->add_exception(oss.str());
+                    rhm::journal::slock sl(&_wr_full_mutex);
+                    _wr_full_cv.waitintvl(MAX_WR_WAIT * 1000000); // MAX_WR_WAIT in ms
                 }
-                else
-                    ::usleep(SLEEP_US); // 1ms
-            }
-            else
-            {
+                break;
+            default:
                 std::ostringstream oss;
                 oss << "ERROR: enqueue operation in journal \"" << _jid << "\" returned ";
                 oss << rhm::journal::iores_str(res) << ".";
                 _tcrp->add_exception(oss.str());
             }
-            
         }
-        flush();
+        flush(true);
     }
     catch (const rhm::journal::jexception& e) { _tcrp->add_exception(e); }
     catch (const std::exception& e) { _tcrp->add_exception(e.what()); }
@@ -216,31 +226,21 @@
         read_arg::read_mode_t rd_mode = _args_ptr->read_mode.val();
         if (rd_mode != read_arg::NONE)
         {
-            unsigned sleep_cnt = 0U;
             while (_tcrp->num_rproc() < _tcp->num_msgs() && !_tcrp->exception())
             {
-                if (_dtok_rd_list.empty())
+                journal::data_tok* dtokp = 0;
                 {
-                    if (sleep_cnt++ > MAX_SLEEP_CNT)
+                    rhm::journal::slock sl(&_rd_list_mutex);
+                    if (_dtok_rd_list.empty())
+                        _rd_list_cv.wait();
+                    if (!_dtok_rd_list.empty())
                     {
-                        std::ostringstream oss;
-                        oss << "ERROR: Timeout waiting for enqueue AIO in journal \"" << _jid;
-                        oss << "\": num_enq=" << _tcp->num_msgs();
-                        oss << " num_deq=" << _tcrp->num_deq();
-                        _tcrp->add_exception(oss.str());
-                    }
-                    if (get_wr_events() == 0)
-                        ::usleep(SLEEP_US); // 1ms
-                }
-                else
-                {
-                    sleep_cnt = 0U;
-                    journal::data_tok* dtokp;
-                    {
-                        rhm::journal::slock sl(&_rd_mutex);
                         dtokp = _dtok_rd_list.front();
                         _dtok_rd_list.pop_front();
                     }
+                }
+                if (dtokp)
+                {
                     _tcrp->incr_num_rproc();
                     
                     bool do_read = true;
@@ -260,15 +260,16 @@
                         bool ext = false;
                         rhm::journal::iores res = read_data_record(&dptr, dsize, &xptr, xsize, tr,
                                 ext, dtokp);
-                        if (res == rhm::journal::RHM_IORES_SUCCESS)
+                        switch (res)
                         {
+                        case rhm::journal::RHM_IORES_SUCCESS:
                             {
-                                rhm::journal::slock sl(&_deq_mutex);
+                                rhm::journal::slock sl(&_deq_list_mutex);
                                 _dtok_deq_list.push_back(dtokp);
+                                _deq_list_cv.broadcast();
                             }
                             read_compl = true;
                             _tcrp->incr_num_read();
-                            sleep_cnt = 0;
                             
                             // clean up
                             if (xsize)
@@ -277,26 +278,23 @@
                                 ::free(dptr);
                             dptr = 0;
                             xptr = 0;
-                        }
-                        else if (res == rhm::journal::RHM_IORES_AIO_WAIT)
-                        {
-                            if (sleep_cnt++ > MAX_SLEEP_CNT)
+                            break;
+                        case rhm::journal::RHM_IORES_AIO_WAIT:
+                            if (get_rd_events() == 0)
                             {
-                                std::ostringstream oss;
-                                oss << "ERROR: Timeout waiting for read AIO in journal \"" << _jid;
-                                oss << "\": num_enq=" << _tcp->num_msgs();
-                                oss << " num_deq=" << _tcrp->num_deq();
-                                _tcrp->add_exception(oss.str());
+                                rhm::journal::slock sl(&_rd_aio_mutex);
+                                _rd_aio_cv.waitintvl(MAX_RD_WAIT * 1000000); // MAX_RD_WAIT in ms
                             }
-                            if (get_rd_events() == 0)
-                                ::usleep(SLEEP_US); // 1ms
-                        }
-                        else
-                        {
+                            break;
+                        default:
                             std::ostringstream oss;
                             oss << "ERROR: read operation in journal \"" << _jid;
                             oss << "\" returned " << rhm::journal::iores_str(res) << ".";
                             _tcrp->add_exception(oss.str());
+                            {
+                                rhm::journal::slock sl(&_deq_list_mutex);
+                                _deq_list_cv.broadcast(); // wake up deq thread
+                            }
                         }
                     }
                 }
@@ -315,31 +313,21 @@
     {
         if (_tcp->auto_deq())
         {
-            unsigned sleep_cnt = 0U;
             while(_tcrp->num_deq() < _tcp->num_msgs() && !_tcrp->exception())
             {
-                if (_dtok_deq_list.empty())
+                journal::data_tok* dtokp = 0;
                 {
-                    if (sleep_cnt++ > MAX_SLEEP_CNT)
+                    rhm::journal::slock sl(&_deq_list_mutex);
+                    if (_dtok_deq_list.empty())
+                        _deq_list_cv.wait();
+                    if (!_dtok_deq_list.empty())
                     {
-                        std::ostringstream oss;
-                        oss << "ERROR: Timeout waiting for AIO in journal \"" << _jid;
-                        oss << "\": num_enq=" << _tcp->num_msgs();
-                        oss << " num_deq=" << _tcrp->num_deq();
-                        _tcrp->add_exception(oss.str());
-                    }
-                    if (get_wr_events() == 0)
-                        ::usleep(SLEEP_US); // 1ms
-                }
-                else
-                {
-                    journal::data_tok* dtokp;
-                    {
-                        rhm::journal::slock sl(&_deq_mutex);
                         dtokp = _dtok_deq_list.front();
                         _dtok_deq_list.pop_front();
                     }
-
+                }
+                if (dtokp)
+                {
                     rhm::journal::iores res;
                     if (dtokp->has_xid())
                         res = dequeue_txn_data_record(dtokp, dtokp->xid());
@@ -357,10 +345,9 @@
                         oss << "\" returned " << rhm::journal::iores_str(res) << ".";
                         _tcrp->add_exception(oss.str());
                     }
-                    sleep_cnt = 0U;
                 }
             }
-            flush();
+            flush(true);
         }
     }
     catch (const rhm::journal::jexception& e) { _tcrp->add_exception(e); }
@@ -409,11 +396,17 @@
 // static AIO callback fns
 
 void
+jrnl_instance::aio_rd_callback(jcntl* journal, std::vector<u_int16_t>& /*pil*/)
+{
+    jrnl_instance::handle_rd_callback(journal);
+}
+
+void
 jrnl_instance::aio_wr_callback(jcntl* journal, std::vector<journal::data_tok*>& dtokl)
 {
     for (std::vector<journal::data_tok*>::const_iterator i=dtokl.begin(); i!=dtokl.end(); i++)
-        if ((*i)->wstate() == journal::data_tok::ENQ)
-            jrnl_instance::push_dtokp(journal, *i);
+        if ((*i)->wstate() == journal::data_tok::ENQ || (*i)->wstate() == journal::data_tok::DEQ)
+            jrnl_instance::handle_wr_callback(journal, *i);
 }
 
 } // namespace jtt

Modified: store/trunk/cpp/tests/jrnl/jtt/jrnl_instance.hpp
===================================================================
--- store/trunk/cpp/tests/jrnl/jtt/jrnl_instance.hpp	2008-02-06 14:58:42 UTC (rev 1644)
+++ store/trunk/cpp/tests/jrnl/jtt/jrnl_instance.hpp	2008-02-06 15:39:52 UTC (rev 1645)
@@ -29,14 +29,13 @@
 #include "test_case.hpp"
 
 #include <boost/shared_ptr.hpp>
-#include <deque>
+#include <jrnl/cvar.hpp>
 #include <jrnl/data_tok.hpp>
 #include <jrnl/jcntl.hpp>
 #include <jrnl/slock.hpp>
+#include <list>
 #include <vector>
 
-#include <iostream> // debug
-
 namespace rhm
 {
 namespace jtt
@@ -53,10 +52,16 @@
         const args* _args_ptr;
         std::vector<dtok_ptr> _dtok_master_enq_list;
         std::vector<dtok_ptr> _dtok_master_txn_list;
-        std::deque<journal::data_tok*> _dtok_rd_list;
-        std::deque<journal::data_tok*> _dtok_deq_list;
-        pthread_mutex_t _rd_mutex;  ///< Mutex for _dtok_rd_list
-        pthread_mutex_t _deq_mutex;  ///< Mutex for _dtok_deq_list
+        std::list<journal::data_tok*> _dtok_rd_list;
+        std::list<journal::data_tok*> _dtok_deq_list;
+        pthread_mutex_t _rd_aio_mutex;      ///< Mutex for read aio wait conditions
+        rhm::journal::cvar _rd_aio_cv;      ///< Condition var for read aio wait conditions
+        pthread_mutex_t _wr_full_mutex;     ///< Mutex for write full conditions
+        rhm::journal::cvar _wr_full_cv;     ///< Condition var for write full conditions
+        pthread_mutex_t _rd_list_mutex;     ///< Mutex for _dtok_rd_list
+        rhm::journal::cvar _rd_list_cv;     ///< Condition var for _dtok_rd_list
+        pthread_mutex_t _deq_list_mutex;    ///< Mutex for _dtok_deq_list
+        rhm::journal::cvar _deq_list_cv;    ///< Condition var for _dtok_deq_list
         pthread_t _enq_thread;
         pthread_t _deq_thread;
         pthread_t _read_thread;
@@ -79,16 +84,40 @@
         void tc_wait_compl() throw ();
 
     private:
-        inline void push_dtokp(rhm::journal::data_tok* dtokp)
+        inline void handle_rd_callback()
         {
-            if (_args_ptr->read_mode.val() == read_arg::NONE)
-                { rhm::journal::slock sl(&_deq_mutex); _dtok_deq_list.push_back(dtokp); }
-            else
-                { rhm::journal::slock sl(&_rd_mutex); _dtok_rd_list.push_back(dtokp); }
+            rhm::journal::slock sl(&_rd_aio_mutex);
+            _rd_aio_cv.broadcast();
         }
-        static inline void push_dtokp(jcntl* jp, rhm::journal::data_tok* dtokp)
-                { static_cast<jrnl_instance*>(jp)->push_dtokp(dtokp); }
+        static inline void handle_rd_callback(jcntl* jp)
+                { static_cast<jrnl_instance*>(jp)->handle_rd_callback(); }
 
+        inline void handle_wr_callback(rhm::journal::data_tok* dtokp)
+        {
+            if (dtokp->wstate() == journal::data_tok::ENQ)
+            {
+                if (_args_ptr->read_mode.val() == read_arg::NONE)
+                {
+                    rhm::journal::slock sl(&_deq_list_mutex);
+                    _dtok_deq_list.push_back(dtokp);
+                    _deq_list_cv.broadcast();
+                }
+                else
+                {
+                    rhm::journal::slock sl(&_rd_list_mutex);
+                    _dtok_rd_list.push_back(dtokp);
+                    _rd_list_cv.broadcast();
+                }
+            }
+            else // DEQ
+            {
+                rhm::journal::slock sl(&_wr_full_mutex);
+                _wr_full_cv.broadcast();
+            }
+        }
+        static inline void handle_wr_callback(jcntl* jp, rhm::journal::data_tok* dtokp)
+                { static_cast<jrnl_instance*>(jp)->handle_wr_callback(dtokp); }
+
         void run_enq() throw ();
         inline static void* run_enq(void* p)
                 { static_cast<jrnl_instance*>(p)->run_enq(); return 0; }
@@ -107,6 +136,7 @@
         rhm::journal::data_tok* prep_txn_dtok(const rhm::journal::data_tok* dtokp);
 
         // static callbacks
+        static void aio_rd_callback(jcntl* journal, std::vector<u_int16_t>& pil);
         static void aio_wr_callback(jcntl* journal, std::vector<journal::data_tok*>& dtokl);
     };
     

Modified: store/trunk/cpp/tests/jrnl/jtt/test_case_result.cpp
===================================================================
--- store/trunk/cpp/tests/jrnl/jtt/test_case_result.cpp	2008-02-06 14:58:42 UTC (rev 1644)
+++ store/trunk/cpp/tests/jrnl/jtt/test_case_result.cpp	2008-02-06 15:39:52 UTC (rev 1645)
@@ -37,16 +37,12 @@
         _num_deq(0),
         _num_read(0),
         _num_rproc(0),
+        _start_time(),
+        _stop_time(),
         _stopped(false),
+        _test_time(),
         _exception_list()
-{
-    _start_time.tv_sec = 0;
-    _start_time.tv_nsec = 0;
-    _stop_time.tv_sec = 0;
-    _stop_time.tv_nsec = 0;
-    _test_time.tv_sec = 0;
-    _test_time.tv_nsec = 0;
-}
+{}
 
 test_case_result::~test_case_result()
 {}
@@ -54,12 +50,7 @@
 const std::string
 test_case_result::test_time_str() const
 {
-    if (_test_time.tv_nsec == 0)
-        return "0.0";
-    std::ostringstream oss;
-    oss << _test_time.tv_sec << ".";
-    oss << std::setfill('0') << std::setw(9) << _test_time.tv_nsec;
-    return oss.str();
+    return _test_time.str(9);
 }
 
 void
@@ -101,12 +92,9 @@
     _num_enq = 0;
     _num_deq = 0;
     _num_read = 0;
-    _start_time.tv_sec = 0;
-    _start_time.tv_nsec = 0;
-    _stop_time.tv_sec = 0;
-    _stop_time.tv_nsec = 0;
-    _test_time.tv_sec = 0;
-    _test_time.tv_nsec = 0;
+    _start_time.set_zero();
+    _stop_time.set_zero();
+    _test_time.set_zero();
     _exception_list.clear();
 }
 
@@ -207,16 +195,8 @@
 void
 test_case_result::calc_test_time()
 {
-    if (_start_time.tv_sec > 0 && _stop_time.tv_sec >= _start_time.tv_sec)
-    {
-        _test_time.tv_sec = _stop_time.tv_sec - _start_time.tv_sec;
-        _test_time.tv_nsec = _stop_time.tv_nsec - _start_time.tv_nsec;
-        if (_test_time.tv_nsec < 0)
-        {
-            _test_time.tv_nsec += 1000000000;
-            _test_time.tv_sec--;
-        }
-    }
+    if (!_start_time.is_zero() && _stop_time >= _start_time)
+        _test_time = _stop_time - _start_time;
 }
     
 } // namespace jtt

Modified: store/trunk/cpp/tests/jrnl/jtt/test_case_result.hpp
===================================================================
--- store/trunk/cpp/tests/jrnl/jtt/test_case_result.hpp	2008-02-06 14:58:42 UTC (rev 1644)
+++ store/trunk/cpp/tests/jrnl/jtt/test_case_result.hpp	2008-02-06 15:39:52 UTC (rev 1645)
@@ -27,6 +27,7 @@
 #include <boost/shared_ptr.hpp>
 #include <deque>
 #include <jrnl/jexception.hpp>
+#include <jrnl/time_ns.hpp>
 #include <string>
 
 namespace rhm
@@ -48,10 +49,10 @@
         u_int32_t _num_deq;
         u_int32_t _num_read;  // Messages actually read
         u_int32_t _num_rproc; // Messages handled by read thread (not all are read)
-        timespec _start_time;
-        timespec _stop_time;
+        journal::time_ns _start_time;
+        journal::time_ns _stop_time;
         bool _stopped;
-        timespec _test_time;
+        journal::time_ns _test_time;
         elist _exception_list;
 
     public:
@@ -68,14 +69,13 @@
         inline const u_int32_t num_rproc() const { return _num_rproc; }
         inline const u_int32_t incr_num_rproc() { return ++_num_rproc; }
 
-        inline const timespec& start_time() const { return _start_time; }
+        inline const journal::time_ns& start_time() const { return _start_time; }
         inline void set_start_time() { ::clock_gettime(CLOCK_REALTIME, &_start_time); }
-        inline const timespec& stop_time() const { return _stop_time; }
+        inline const journal::time_ns& stop_time() const { return _stop_time; }
         inline void set_stop_time()
                 { ::clock_gettime(CLOCK_REALTIME, &_stop_time); calc_test_time(); }
-        inline void set_test_time(const timespec& ts)
-                { _test_time.tv_sec = ts.tv_sec; _test_time.tv_nsec = ts.tv_nsec; }
-        inline const timespec& test_time() const { return _test_time; }
+        inline void set_test_time(const journal::time_ns& ts) { _test_time = ts; }
+        inline const journal::time_ns& test_time() const { return _test_time; }
         const std::string test_time_str() const;
 
         void add_exception(const journal::jexception& e, const bool set_stop_time_flag = true);

Modified: store/trunk/cpp/tests/jrnl/jtt/test_case_result_agregation.cpp
===================================================================
--- store/trunk/cpp/tests/jrnl/jtt/test_case_result_agregation.cpp	2008-02-06 14:58:42 UTC (rev 1644)
+++ store/trunk/cpp/tests/jrnl/jtt/test_case_result_agregation.cpp	2008-02-06 15:39:52 UTC (rev 1645)
@@ -158,16 +158,10 @@
     return oss.str();
 }
 
-const timespec&
-test_case_result_agregation::add_test_time(const timespec& t)
+const journal::time_ns&
+test_case_result_agregation::add_test_time(const journal::time_ns& t)
 {
-    _test_time.tv_sec += t.tv_sec;
-    _test_time.tv_nsec += t.tv_nsec;
-    if (_test_time.tv_nsec > 999999999)
-    {
-        _test_time.tv_nsec -= 1000000000;
-        _test_time.tv_sec++;
-    }
+    _test_time += t;
     return _test_time;
 }
     

Modified: store/trunk/cpp/tests/jrnl/jtt/test_case_result_agregation.hpp
===================================================================
--- store/trunk/cpp/tests/jrnl/jtt/test_case_result_agregation.hpp	2008-02-06 14:58:42 UTC (rev 1644)
+++ store/trunk/cpp/tests/jrnl/jtt/test_case_result_agregation.hpp	2008-02-06 15:39:52 UTC (rev 1645)
@@ -71,7 +71,7 @@
     private:
         const std::string str_full(const bool last_only) const;
         const std::string str_summary(const bool last_only) const;
-        const timespec& add_test_time(const timespec& t);
+        const journal::time_ns& add_test_time(const journal::time_ns& t);
     };
 
 } // namespace jtt




More information about the rhmessaging-commits mailing list