[rhmessaging-commits] rhmessaging commits: r1629 - in store/trunk/cpp: lib/jrnl and 2 other directories.

rhmessaging-commits at lists.jboss.org rhmessaging-commits at lists.jboss.org
Thu Jan 31 10:28:06 EST 2008


Author: kpvdr
Date: 2008-01-31 10:28:06 -0500 (Thu, 31 Jan 2008)
New Revision: 1629

Added:
   store/trunk/cpp/tests/jrnl/jtt/_ut_read_arg.cpp
   store/trunk/cpp/tests/jrnl/jtt/read_arg.cpp
   store/trunk/cpp/tests/jrnl/jtt/read_arg.hpp
Modified:
   store/trunk/cpp/lib/JournalImpl.cpp
   store/trunk/cpp/lib/JournalImpl.h
   store/trunk/cpp/lib/jrnl/jcntl.cpp
   store/trunk/cpp/lib/jrnl/jcntl.hpp
   store/trunk/cpp/lib/jrnl/pmgr.cpp
   store/trunk/cpp/lib/jrnl/pmgr.hpp
   store/trunk/cpp/lib/jrnl/rmgr.cpp
   store/trunk/cpp/lib/jrnl/rmgr.hpp
   store/trunk/cpp/lib/jrnl/wmgr.hpp
   store/trunk/cpp/tests/jrnl/JournalSystemTests.cpp
   store/trunk/cpp/tests/jrnl/jtt/
   store/trunk/cpp/tests/jrnl/jtt/Makefile.am
   store/trunk/cpp/tests/jrnl/jtt/_ut_jrnl_instance.cpp
   store/trunk/cpp/tests/jrnl/jtt/args.cpp
   store/trunk/cpp/tests/jrnl/jtt/args.hpp
   store/trunk/cpp/tests/jrnl/jtt/jrnl_instance.cpp
   store/trunk/cpp/tests/jrnl/jtt/jrnl_instance.hpp
   store/trunk/cpp/tests/jrnl/jtt/test_case.cpp
   store/trunk/cpp/tests/jrnl/jtt/test_case_result.cpp
   store/trunk/cpp/tests/jrnl/jtt/test_case_result.hpp
   store/trunk/cpp/tests/jrnl/jtt/test_mgr.cpp
   store/trunk/cpp/tests/jrnl/jtt/test_mgr.hpp
Log:
Removed read callback - serves no purpose. Added read handling into journal test framework.

Modified: store/trunk/cpp/lib/JournalImpl.cpp
===================================================================
--- store/trunk/cpp/lib/JournalImpl.cpp	2008-01-30 15:45:16 UTC (rev 1628)
+++ store/trunk/cpp/lib/JournalImpl.cpp	2008-01-31 15:28:06 UTC (rev 1629)
@@ -80,7 +80,7 @@
 }
 
 void
-JournalImpl::recover(const aio_cb rd_cb, const aio_cb wr_cb,
+JournalImpl::recover(const aio_cb wr_cb,
         boost::ptr_list<bdbstore::PreparedTransaction>& prep_tx_list, u_int64_t& highest_rid,
         u_int64_t queue_id)
 {
@@ -91,7 +91,7 @@
         prep_xid_list.push_back(i->xid);
     }
 
-    jcntl::recover(rd_cb, wr_cb, prep_xid_list, highest_rid);
+    jcntl::recover(wr_cb, prep_xid_list, highest_rid);
         
     // Populate PreparedTransaction lists from _tmap
     for (bdbstore::PreparedTransaction::list::iterator i = prep_tx_list.begin();
@@ -341,20 +341,3 @@
 	    dtokp->release();
     }
 }
-
-void
-JournalImpl::aio_rd_callback(jcntl* journal, std::vector<data_tok*>& dtokl)
-{
-        for (std::vector<data_tok*>::const_iterator i=dtokl.begin(); i!=dtokl.end(); i++)
-        {
-            DataTokenImpl* dtokp = static_cast<DataTokenImpl*>(*i);
-		    if (!journal->is_stopped() && dtokp->getSourceMessage())
-		    {
-                if (dtokp->wstate() == data_tok::ENQ && dtokp->rstate() == data_tok::READ)
-                {
-                    // cct call the recovery manager. / lazyload.. 
-                }
-            }
-            dtokp->release();
-        }
-}

Modified: store/trunk/cpp/lib/JournalImpl.h
===================================================================
--- store/trunk/cpp/lib/JournalImpl.h	2008-01-30 15:45:16 UTC (rev 1628)
+++ store/trunk/cpp/lib/JournalImpl.h	2008-01-31 15:28:06 UTC (rev 1629)
@@ -92,15 +92,15 @@
                         const qpid::sys::Duration flushTimeout);
             virtual ~JournalImpl();
 
-            inline void initialize() { jcntl::initialize(&aio_rd_callback, &aio_wr_callback); } 
+            inline void initialize() { jcntl::initialize(&aio_wr_callback); } 
 
-            void recover(const journal::aio_cb rd_cb, const journal::aio_cb wr_cb,
+            void recover(const journal::aio_cb wr_cb,
                     boost::ptr_list<bdbstore::PreparedTransaction>& prep_tx_list,
                     u_int64_t& highest_rid, u_int64_t queue_id);
 
             inline void recover(boost::ptr_list<bdbstore::PreparedTransaction>& prep_tx_list,
                     u_int64_t& highest_rid, u_int64_t queue_id) {
-                recover(&aio_rd_callback,  &aio_wr_callback, prep_tx_list, highest_rid, queue_id);
+                recover(&aio_wr_callback, prep_tx_list, highest_rid, queue_id);
             }
             
             // Temporary fn to read and save last msg read from journal so it can be assigned
@@ -144,7 +144,6 @@
         private:
             void handleIoResult(const journal::iores r);
             static void aio_wr_callback(jcntl* journal, std::vector<journal::data_tok*>& dtokl);
-            static void aio_rd_callback(jcntl* journal, std::vector<journal::data_tok*>& dtokl);
         }; // class JournalImpl
 
     } // namespace bdbstore

Modified: store/trunk/cpp/lib/jrnl/jcntl.cpp
===================================================================
--- store/trunk/cpp/lib/jrnl/jcntl.cpp	2008-01-30 15:45:16 UTC (rev 1628)
+++ store/trunk/cpp/lib/jrnl/jcntl.cpp	2008-01-31 15:28:06 UTC (rev 1629)
@@ -91,7 +91,7 @@
 }
 
 void
-jcntl::initialize(const aio_cb rd_cb, const aio_cb wr_cb)
+jcntl::initialize(const aio_cb wr_cb)
 {
     _init_flag = false;
     _stop_flag = false;
@@ -125,7 +125,7 @@
     // constrains read activity (i.e. one can't read what has not yet been written).
     _wrfc.initialize(_num_jfiles, _jfsize_sblks, (nlfh**)_datafh);
     _rrfc.initialize(_num_jfiles, (nlfh**)_datafh);
-    _rmgr.initialize(rd_cb, 0);
+    _rmgr.initialize(0);
     _wmgr.initialize(wr_cb, JRNL_WMGR_MAXDTOKPP, JRNL_WMGR_MAXWAITUS);
 
     // Write info file (<basename>.jinf) to disk
@@ -135,8 +135,8 @@
 }
 
 void
-jcntl::recover(const aio_cb rd_cb, const aio_cb wr_cb,
-        const std::vector<std::string>& prep_txn_list, u_int64_t& highest_rid)
+jcntl::recover(const aio_cb wr_cb, const std::vector<std::string>& prep_txn_list,
+        u_int64_t& highest_rid)
 {
     _init_flag = false;
     _stop_flag = false;
@@ -180,7 +180,7 @@
     // constrains read activity (i.e. one can't read what has not yet been written).
     _wrfc.initialize(_num_jfiles, _jfsize_sblks, (nlfh**)_datafh, &_rcvdat);
     _rrfc.initialize(_num_jfiles, (nlfh**)_datafh, _rcvdat._ffid);
-    _rmgr.initialize(rd_cb, _rcvdat._fro);
+    _rmgr.initialize(_rcvdat._fro);
     _wmgr.initialize(wr_cb, JRNL_WMGR_MAXDTOKPP, JRNL_WMGR_MAXWAITUS, _rcvdat._eo);
     
     _readonly_flag = true;

Modified: store/trunk/cpp/lib/jrnl/jcntl.hpp
===================================================================
--- store/trunk/cpp/lib/jrnl/jcntl.hpp	2008-01-30 15:45:16 UTC (rev 1628)
+++ store/trunk/cpp/lib/jrnl/jcntl.hpp	2008-01-31 15:28:06 UTC (rev 1629)
@@ -178,12 +178,11 @@
         * <b>NOTE: If <i>NULL</i> is passed to the callbacks, internal default callbacks will be
         * used.</b>
         *
-        * \param rd_cb Function pointer to callback function for read operations. May be 0 (NULL).
         * \param wr_cb Function pointer to callback function for write operations. May be 0 (NULL).
         *
         * \exception TODO
         */
-        void initialize(const aio_cb rd_cb, const aio_cb wr_cb);
+        void initialize(const aio_cb wr_cb);
 
         /**
         * /brief Initialize journal by recovering state from previously written journal.
@@ -200,15 +199,14 @@
         * <b>NOTE: If <i>NULL</i> is passed to the callbacks, internal default callbacks will be
         * used.</b>
         *
-        * \param rd_cb Function pointer to callback function for read operations. May be 0 (NULL).
         * \param wr_cb Function pointer to callback function for write operations. May be 0 (NULL).
         * \param prep_txn_list
         * \param highest_rid Returns the highest rid found in the journal during recover
         *
         * \exception TODO
         */
-        void recover(const aio_cb rd_cb, const aio_cb wr_cb,
-                const std::vector<std::string>& prep_txn_list, u_int64_t& highest_rid);
+        void recover(const aio_cb wr_cb, const std::vector<std::string>& prep_txn_list,
+                u_int64_t& highest_rid);
 
         /**
         * \brief Notification to the journal that recovery is complete and that normal operation

Modified: store/trunk/cpp/lib/jrnl/pmgr.cpp
===================================================================
--- store/trunk/cpp/lib/jrnl/pmgr.cpp	2008-01-30 15:45:16 UTC (rev 1628)
+++ store/trunk/cpp/lib/jrnl/pmgr.cpp	2008-01-31 15:28:06 UTC (rev 1629)
@@ -90,8 +90,7 @@
         _aio_evt_rem(0),
         _enq_rec(),
         _deq_rec(),
-        _txn_rec(),
-        _cb(0)
+        _txn_rec()
 {}
 
 pmgr::~pmgr()

Modified: store/trunk/cpp/lib/jrnl/pmgr.hpp
===================================================================
--- store/trunk/cpp/lib/jrnl/pmgr.hpp	2008-01-30 15:45:16 UTC (rev 1628)
+++ store/trunk/cpp/lib/jrnl/pmgr.hpp	2008-01-31 15:28:06 UTC (rev 1629)
@@ -116,8 +116,6 @@
         deq_rec _deq_rec;               ///< Dequeue record used for encoding/decoding
         txn_rec _txn_rec;               ///< Transaction record used for encoding/decoding
 
-        aio_cb _cb;                     ///< Callback function pointer for AIO events
-
     public:
         pmgr(jcntl* jc, enq_map& emap, txn_map& tmap, const u_int32_t pagesize,
                 const u_int16_t pages);

Modified: store/trunk/cpp/lib/jrnl/rmgr.cpp
===================================================================
--- store/trunk/cpp/lib/jrnl/rmgr.cpp	2008-01-30 15:45:16 UTC (rev 1628)
+++ store/trunk/cpp/lib/jrnl/rmgr.cpp	2008-01-31 15:28:06 UTC (rev 1629)
@@ -51,9 +51,8 @@
 {}
 
 void
-rmgr::initialize(const aio_cb rd_cb, size_t fro)
+rmgr::initialize(size_t fro)
 {
-    _cb = rd_cb;
     initialize();
     if (fro)
     {
@@ -391,7 +390,6 @@
         throw jexception(jerrno::JERR__AIO, oss.str(), "rmgr", "get_events");
     }
 
-    u_int32_t tot_data_toks = 0;
     for (int i=0; i<ret; i++) // Index of returned AIOs
     {
         if (_aio_evt_rem == 0)
@@ -415,29 +413,15 @@
             throw jexception(jerrno::JERR__AIO, oss.str(), "rmgr", "get_events");
         }
 
-        // Transfer all data tokens
-        u_int32_t s = pcbp->_pdtokl->size();
-        std::vector<data_tok*> dtokl(s, 0);
-        for (u_int32_t k=0; k<s; k++)
-            dtokl[k] = pcbp->_pdtokl->at(k);
-        tot_data_toks += s;
-
         // Increment the completed read offset
         // NOTE: We cannot use _rrfc here, as it may have rotated since submitting count.
         // Use stored pointer to nlfh in the pcb instead.
         pcbp->_rdblks = iocbp->u.c.nbytes / JRNL_DBLK_SIZE;
         pcbp->_rfh->add_rd_cmpl_cnt_dblks(pcbp->_rdblks);
-
-        // Clean up this pcb's data_tok list
-        pcbp->_pdtokl->clear();
         pcbp->_state = state;
-
-        // Perform AIO return callback
-        if (_cb)
-            (_cb)(_jc, dtokl);
     }
 
-    return tot_data_toks;
+    return ret;
 }
 
 void

Modified: store/trunk/cpp/lib/jrnl/rmgr.hpp
===================================================================
--- store/trunk/cpp/lib/jrnl/rmgr.hpp	2008-01-30 15:45:16 UTC (rev 1628)
+++ store/trunk/cpp/lib/jrnl/rmgr.hpp	2008-01-31 15:28:06 UTC (rev 1629)
@@ -65,7 +65,7 @@
         rmgr(jcntl* jc, enq_map& emap, txn_map& tmap, rrfc& rrfc);
         virtual ~rmgr();
 
-        void initialize(const aio_cb rd_cb, size_t fro);
+        void initialize(size_t fro);
         const iores get(const u_int64_t& rid, const size_t& dsize, const size_t& dsize_avail,
                 const void** const data, bool auto_discard);
         const iores discard(data_tok* dtok);

Modified: store/trunk/cpp/lib/jrnl/wmgr.hpp
===================================================================
--- store/trunk/cpp/lib/jrnl/wmgr.hpp	2008-01-30 15:45:16 UTC (rev 1628)
+++ store/trunk/cpp/lib/jrnl/wmgr.hpp	2008-01-31 15:28:06 UTC (rev 1629)
@@ -93,7 +93,7 @@
         deq_rec _deq_rec;               ///< Dequeue record used for encoding/decoding
         txn_rec _txn_rec;               ///< Transaction record used for encoding/decoding
         std::set<std::string> _txn_pending_set; ///< Set containing xids of pending commits/aborts
-        aio_cb _cb;                     ///< Callback function pointer for AIO events
+        aio_cb _cb;                  ///< Callback function pointer for AIO events
 
     public:
         wmgr(jcntl* jc, enq_map& emap, txn_map& tmap, wrfc& wrfc);

Modified: store/trunk/cpp/tests/jrnl/JournalSystemTests.cpp
===================================================================
--- store/trunk/cpp/tests/jrnl/JournalSystemTests.cpp	2008-01-30 15:45:16 UTC (rev 1628)
+++ store/trunk/cpp/tests/jrnl/JournalSystemTests.cpp	2008-01-31 15:28:06 UTC (rev 1629)
@@ -647,14 +647,14 @@
 void
 JournalSystemTests::jrnl_init(rhm::journal::jcntl* jc)
 {
-    jc->initialize(0, 0);
+    jc->initialize(0);
 }
 
 void
 JournalSystemTests::jrnl_recover(rhm::journal::jcntl* jc, vector<string> txn_list,
         u_int64_t& highest_rid)
 {
-    jc->recover(0, 0, txn_list, highest_rid);
+    jc->recover(0, txn_list, highest_rid);
 }
 
 void


Property changes on: store/trunk/cpp/tests/jrnl/jtt
___________________________________________________________________
Name: svn:ignore
   - aclocal.m4
autom4te.cache
config.hpp
config.hpp.in
config.log
config.status
configure
depcomp
install-sh
jtt
Makefile
Makefile.in
missing
stamp-h1
_ut_data_src
_ut_jrnl_init_params
_ut_jrnl_instance
_ut_test_case
_ut_test_case_set
_ut_test_case_result
_ut_test_case_result_agregation
.deps

   + aclocal.m4
autom4te.cache
config.hpp
config.hpp.in
config.log
config.status
configure
depcomp
install-sh
jtt
Makefile
Makefile.in
missing
stamp-h1
_ut_data_src
_ut_jrnl_init_params
_ut_jrnl_instance
_ut_read_arg
_ut_test_case
_ut_test_case_set
_ut_test_case_result
_ut_test_case_result_agregation
.deps


Modified: store/trunk/cpp/tests/jrnl/jtt/Makefile.am
===================================================================
--- store/trunk/cpp/tests/jrnl/jtt/Makefile.am	2008-01-30 15:45:16 UTC (rev 1628)
+++ store/trunk/cpp/tests/jrnl/jtt/Makefile.am	2008-01-31 15:28:06 UTC (rev 1629)
@@ -29,6 +29,7 @@
 TESTS = \
     _ut_data_src \
     _ut_jrnl_init_params \
+    _ut_read_arg \
     _ut_test_case \
     _ut_test_case_result \
     _ut_test_case_result_agregation \
@@ -40,6 +41,7 @@
 check_PROGRAMS = \
     _ut_data_src \
     _ut_jrnl_init_params \
+    _ut_read_arg \
     _ut_test_case \
     _ut_test_case_result \
     _ut_test_case_result_agregation \
@@ -52,6 +54,7 @@
     jrnl_init_params.cpp \
     jrnl_instance.cpp \
     main.cpp \
+    read_arg.cpp \
     test_case.cpp \
     test_case_result.cpp \
     test_case_result_agregation.cpp \
@@ -89,12 +92,18 @@
 _ut_jrnl_init_params_LDFLAGS =  -lboost_unit_test_framework -lrt
 _ut_jrnl_init_params_LDADD = jrnl_init_params.o
 
+_ut_read_arg_SOURCES = _ut_read_arg.cpp ../../unit_test.cpp
+_ut_read_arg_LDFLAGS =  -lboost_unit_test_framework -lrt -lboost_program_options
+_ut_read_arg_LDADD = read_arg.o
+
 _ut_jrnl_instance_SOURCES = _ut_jrnl_instance.cpp ../../unit_test.cpp
-_ut_jrnl_instance_LDFLAGS =  -lboost_unit_test_framework -laio -lrt
+_ut_jrnl_instance_LDFLAGS =  -lboost_unit_test_framework -laio -lrt -lboost_program_options
 _ut_jrnl_instance_LDADD = \
+    args.o \
     data_src.o \
     jrnl_init_params.o \
     jrnl_instance.o \
+    read_arg.o \
     test_case.o \
     test_case_result.o \
     test_case_result_agregation.o \

Modified: store/trunk/cpp/tests/jrnl/jtt/_ut_jrnl_instance.cpp
===================================================================
--- store/trunk/cpp/tests/jrnl/jtt/_ut_jrnl_instance.cpp	2008-01-30 15:45:16 UTC (rev 1628)
+++ store/trunk/cpp/tests/jrnl/jtt/_ut_jrnl_instance.cpp	2008-01-31 15:28:06 UTC (rev 1629)
@@ -41,11 +41,12 @@
     const u_int16_t num_jfiles = 20;
     const u_int32_t jfsize_sblks = 128;
 
+    args a("a1");
     using rhm::jtt::test_case;
     test_case::shared_ptr p(new test_case(1, 0, 0, 0, false, 0, 0, test_case::JTT_PERSISTNET,
             test_case::JDL_INTERNAL, "t1"));
     jrnl_instance ji(jid, jdir, bfn, num_jfiles, jfsize_sblks);
-    ji.init_tc(p, false);
+    ji.init_tc(p, &a);
     ji.run_tc();
     ji.tc_wait_compl();
     try { jdir::verify_dir(jdir, bfn); }
@@ -61,13 +62,14 @@
     const u_int16_t num_jfiles = 20;
     const u_int32_t jfsize_sblks = 128;
 
+    args a("a2");
     using rhm::jtt::test_case;
     test_case::shared_ptr p(new test_case(2, 0, 0, 0, false, 0, 0, test_case::JTT_PERSISTNET,
             test_case::JDL_INTERNAL, "t2"));
     jrnl_init_params::shared_ptr jpp(new jrnl_init_params(jid, jdir, bfn, num_jfiles,
             jfsize_sblks));
     jrnl_instance ji(jpp);
-    ji.init_tc(p, false);
+    ji.init_tc(p, &a);
     ji.run_tc();
     ji.tc_wait_compl();
     try { jdir::verify_dir(jdir, bfn); }
@@ -83,13 +85,14 @@
     const u_int16_t num_jfiles = 20;
     const u_int32_t jfsize_sblks = 128;
 
+    args a("a3");
     using rhm::jtt::test_case;
     test_case::shared_ptr p(new test_case(3, 0, 0, 0, false, 0, 0, test_case::JTT_PERSISTNET,
             test_case::JDL_INTERNAL, "t3"));
     jrnl_init_params::shared_ptr jpp(new jrnl_init_params(jid, jdir, bfn, num_jfiles,
             jfsize_sblks));
     jrnl_instance ji(jpp);
-    ji.init_tc(p, false);
+    ji.init_tc(p, &a);
     ji.run_tc();
     ji.tc_wait_compl();
     try { jdir::verify_dir(jdir, bfn); }
@@ -105,18 +108,20 @@
     const u_int16_t num_jfiles = 20;
     const u_int32_t jfsize_sblks = 128;
 
+    args a("a4");
     using rhm::jtt::test_case;
     test_case::shared_ptr p(new test_case(5, 0, 0, 0, false, 0, 0, test_case::JTT_PERSISTNET,
             test_case::JDL_INTERNAL, "t5"));
     jrnl_init_params::shared_ptr jpp(new jrnl_init_params(jid, jdir, bfn, num_jfiles,
             jfsize_sblks));
     jrnl_instance ji(jpp);
-    ji.init_tc(p, false);
+    ji.init_tc(p, &a);
     ji.run_tc();
     ji.tc_wait_compl();
     try { jdir::verify_dir(jdir, bfn); }
     catch (const jexception& e) { BOOST_ERROR(e.what()); }
-    ji.init_tc(p, true);
+    a.recover_mode = true;
+    ji.init_tc(p, &a);
     ji.run_tc();
     ji.tc_wait_compl();
     try { jdir::verify_dir(jdir, bfn); }
@@ -132,13 +137,15 @@
     const u_int16_t num_jfiles = 20;
     const u_int32_t jfsize_sblks = 128;
 
+    args a("a5");
+    a.recover_mode = true;
     using rhm::jtt::test_case;
     test_case::shared_ptr p(new test_case(6, 0, 0, 0, false, 0, 0, test_case::JTT_PERSISTNET,
             test_case::JDL_INTERNAL, "t6"));
     jrnl_init_params::shared_ptr jpp(new jrnl_init_params(jid, jdir, bfn, num_jfiles,
             jfsize_sblks));
     jrnl_instance ji(jpp);
-    ji.init_tc(p, true);
+    ji.init_tc(p, &a);
     ji.run_tc();
     ji.tc_wait_compl();
     try { jdir::verify_dir(jdir, bfn); }

Added: store/trunk/cpp/tests/jrnl/jtt/_ut_read_arg.cpp
===================================================================
--- store/trunk/cpp/tests/jrnl/jtt/_ut_read_arg.cpp	                        (rev 0)
+++ store/trunk/cpp/tests/jrnl/jtt/_ut_read_arg.cpp	2008-01-31 15:28:06 UTC (rev 1629)
@@ -0,0 +1,133 @@
+/**
+* Copyright 2008 Red Hat, Inc.
+*
+* This file is part of Red Hat Messaging.
+*
+* Red Hat Messaging is free software; you can redistribute it and/or
+* modify it under the terms of the GNU Lesser General Public
+* License as published by the Free Software Foundation; either
+* version 2.1 of the License, or (at your option) any later version.
+*
+* This library is distributed in the hope that it will be useful,
+* but WITHOUT ANY WARRANTY; without even the implied warranty of
+* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
+* Lesser General Public License for more details.
+*
+* You should have received a copy of the GNU Lesser General Public
+* License along with this library; if not, write to the Free Software
+* Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA  02110-1301
+* USA
+*
+* The GNU Lesser General Public License is available in the file COPYING.
+*/
+
+#include "../../unit_test.h"
+#include <boost/test/unit_test_log.hpp>
+#include "read_arg.hpp"
+
+#include <boost/program_options.hpp>
+namespace po = boost::program_options;
+using namespace rhm::jtt;
+
+using namespace boost::unit_test;
+
+QPID_AUTO_TEST_SUITE(jtt_read_arg)
+
+BOOST_AUTO_TEST_CASE(read_arg_constructor)
+{
+    read_arg ra1;
+    BOOST_CHECK_EQUAL(ra1.val(), read_arg::NONE);
+    BOOST_CHECK_EQUAL(ra1.str(), "NONE");
+    read_arg ra2(read_arg::NONE);
+    BOOST_CHECK_EQUAL(ra2.val(), read_arg::NONE);
+    BOOST_CHECK_EQUAL(ra2.str(), "NONE");
+    read_arg ra3(read_arg::ALL);
+    BOOST_CHECK_EQUAL(ra3.val(), read_arg::ALL);
+    BOOST_CHECK_EQUAL(ra3.str(), "ALL");
+    read_arg ra4(read_arg::RANDOM);
+    BOOST_CHECK_EQUAL(ra4.val(), read_arg::RANDOM);
+    BOOST_CHECK_EQUAL(ra4.str(), "RANDOM");
+    read_arg ra5(read_arg::LAZYLOAD);
+    BOOST_CHECK_EQUAL(ra5.val(), read_arg::LAZYLOAD);
+    BOOST_CHECK_EQUAL(ra5.str(), "LAZYLOAD");
+}
+
+BOOST_AUTO_TEST_CASE(read_arg_set_val)
+{
+    read_arg ra;
+    BOOST_CHECK_EQUAL(ra.val(), read_arg::NONE);
+    BOOST_CHECK_EQUAL(ra.str(), "NONE");
+    ra.set_val(read_arg::ALL);
+    BOOST_CHECK_EQUAL(ra.val(), read_arg::ALL);
+    BOOST_CHECK_EQUAL(ra.str(), "ALL");
+    ra.set_val(read_arg::RANDOM);
+    BOOST_CHECK_EQUAL(ra.val(), read_arg::RANDOM);
+    BOOST_CHECK_EQUAL(ra.str(), "RANDOM");
+    ra.set_val(read_arg::LAZYLOAD);
+    BOOST_CHECK_EQUAL(ra.val(), read_arg::LAZYLOAD);
+    BOOST_CHECK_EQUAL(ra.str(), "LAZYLOAD");
+}
+
+BOOST_AUTO_TEST_CASE(read_arg_parse)
+{
+    read_arg ra;
+    ra.parse("LAZYLOAD");
+    BOOST_CHECK_EQUAL(ra.val(), read_arg::LAZYLOAD);
+    BOOST_CHECK_EQUAL(ra.str(), "LAZYLOAD");
+    ra.parse("ALL");
+    BOOST_CHECK_EQUAL(ra.val(), read_arg::ALL);
+    BOOST_CHECK_EQUAL(ra.str(), "ALL");
+    BOOST_CHECK_THROW(ra.parse(""), po::invalid_option_value)
+    BOOST_CHECK_EQUAL(ra.val(), read_arg::ALL);
+    BOOST_CHECK_EQUAL(ra.str(), "ALL");
+    BOOST_CHECK_THROW(ra.parse("abc123"), po::invalid_option_value)
+    BOOST_CHECK_EQUAL(ra.val(), read_arg::ALL);
+    BOOST_CHECK_EQUAL(ra.str(), "ALL");
+    ra.parse("NONE");
+    BOOST_CHECK_EQUAL(ra.val(), read_arg::NONE);
+    BOOST_CHECK_EQUAL(ra.str(), "NONE");
+    ra.parse("RANDOM");
+    BOOST_CHECK_EQUAL(ra.val(), read_arg::RANDOM);
+    BOOST_CHECK_EQUAL(ra.str(), "RANDOM");
+}
+
+BOOST_AUTO_TEST_CASE(read_arg_istream)
+{
+    read_arg ra;
+    std::istringstream ss1("LAZYLOAD", std::ios::in);
+    ss1 >> ra;
+    BOOST_CHECK_EQUAL(ra.val(), read_arg::LAZYLOAD);
+    BOOST_CHECK_EQUAL(ra.str(), "LAZYLOAD");
+    std::istringstream ss2("ALL", std::ios::in);
+    ss2 >> ra;
+    BOOST_CHECK_EQUAL(ra.val(), read_arg::ALL);
+    BOOST_CHECK_EQUAL(ra.str(), "ALL");
+    std::istringstream ss3("NONE", std::ios::in);
+    ss3 >> ra;
+    BOOST_CHECK_EQUAL(ra.val(), read_arg::NONE);
+    BOOST_CHECK_EQUAL(ra.str(), "NONE");
+    std::istringstream ss4("RANDOM", std::ios::in);
+    ss4 >> ra;
+    BOOST_CHECK_EQUAL(ra.val(), read_arg::RANDOM);
+    BOOST_CHECK_EQUAL(ra.str(), "RANDOM");
+}
+
+BOOST_AUTO_TEST_CASE(read_arg_ostream)
+{
+    std::ostringstream s1;
+    read_arg ra(read_arg::LAZYLOAD);
+    s1 << ra;
+    BOOST_CHECK_EQUAL(s1.str(), "LAZYLOAD");
+    ra.set_val(read_arg::ALL);
+    std::ostringstream s2;
+    s2 << ra;
+    BOOST_CHECK_EQUAL(s2.str(), "ALL");
+    ra.set_val(read_arg::NONE);
+    std::ostringstream s3;
+    s3 << ra;
+    BOOST_CHECK_EQUAL(s3.str(), "NONE");
+    ra.set_val(read_arg::RANDOM);
+    std::ostringstream s4;
+    s4 << ra;
+    BOOST_CHECK_EQUAL(s4.str(), "RANDOM");
+}

Modified: store/trunk/cpp/tests/jrnl/jtt/args.cpp
===================================================================
--- store/trunk/cpp/tests/jrnl/jtt/args.cpp	2008-01-30 15:45:16 UTC (rev 1628)
+++ store/trunk/cpp/tests/jrnl/jtt/args.cpp	2008-01-31 15:28:06 UTC (rev 1629)
@@ -33,12 +33,16 @@
 {
 
 args::args(std::string opt_title):
-    _options_descr(opt_title),
-    jrnl_format_chk(false),
+    _options_descr(opt_title, 100),
+    format_chk(false),
     keep_jrnls(false),
+    lld_rd_num(10),
+    lld_skip_num(100),
     num_jrnls(1),
     pause_secs(0),
     randomize(false),
+    read_mode(),
+    read_prob(50),
     recover_mode(false),
     repeat_flag(false),
     reuse_instance(false),
@@ -47,26 +51,36 @@
     _options_descr.add_options()
             ("csv-file,c",
                     po::value<std::string>(&test_case_csv_file_name)->default_value("jtt.csv"),
-                    "CSV file containing test cases")
-            ("help,h", "This help message")
-            ("jrnl-format-chk", po::value<bool>(&jrnl_format_chk)->zero_tokens(),
-                    "If true, will check the format of each journal file")
+                    "CSV file containing test cases.")
+            ("format-chk", po::value<bool>(&format_chk)->zero_tokens(),
+                    "Check the format of each journal file.")
+            ("help,h", "This help message.")
             ("keep-jrnls", po::value<bool>(&keep_jrnls)->zero_tokens(),
-                    "If true, will keep all test journals in backup dirs")
+                    "Keep all test journals.")
+            ("lld-rd-num", po::value<unsigned>(&lld_rd_num)->default_value(10),
+                    "Number of consecutive messages to read after only dequeueing <lld-skip-num> "
+                    "messages during lazy-loading. Ignored if read-mode is not set to LAZYLOAD.")
+            ("lld-skip-num", po::value<unsigned>(&lld_skip_num)->default_value(100),
+                    "Number of consecutive messages to dequeue only (without reading) prior to "
+                    "reading <lld-rd-num> messages. Ignored if read-mode is not set to LAZYLOAD.")
             ("num-jrnls", po::value<unsigned>(&num_jrnls)->default_value(1),
-                    "Number of simultaneous journal instances to test")
+                    "Number of simultaneous journal instances to test.")
             ("pause", po::value<unsigned>(&pause_secs)->default_value(0),
-                    "Pause in seconds between test cases (allows disk to catch up)")
+                    "Pause in seconds between test cases (allows disk to catch up).")
             ("randomize", po::value<bool>(&randomize)->zero_tokens(),
-                    "Randomize the order of the test case execution")
+                    "Randomize the order of the test case execution.")
+            ("read-mode", po::value<read_arg>(&read_mode)->default_value(read_arg::NONE),
+                    read_arg::descr().c_str())
+            ("read-prob", po::value<unsigned>(&read_prob)->default_value(50),
+                    "Read probability (percent) for each message when read-mode is set to RANDOM.")
             ("recover-mode", po::value<bool>(&recover_mode)->zero_tokens(),
-                    "If true, will cause the journal from the previous test to be recovered")
+                    "Recover journal from the previous test for each test case.")
             ("repeat", po::value<bool>(&repeat_flag)->zero_tokens(),
-                    "If true, will repeat tests in csv file indefinitely")
+                    "Repeat all test cases in CSV file indefinitely.")
             ("reuse-instance", po::value<bool>(&reuse_instance)->zero_tokens(),
-                    "If true, will cause first journal instance to be reused for all test cases")
+                    "Reuse journal instance for all test cases.")
             ("seed", po::value<unsigned>(&seed)->default_value(0),
-                    "Seed for use in random number generator");
+                    "Seed for use in random number generator.");
 }
 
 const bool
@@ -86,9 +100,14 @@
         return usage();
     if (num_jrnls == 0)
     {
-        std::cout << "ERROR: num_jrnls must be 1 or more." << std::endl;
+        std::cout << "ERROR: num-jrnls must be 1 or more." << std::endl;
         return usage();
     }
+    if (read_prob > 100) // read_prob is unsigned, so no need to check < 0
+    {
+        std::cout << "ERROR: read-prob must be between 0 and 100 inclusive." << std::endl;
+        return usage();
+    }
     if (repeat_flag && keep_jrnls)
     {
         std::string resp;
@@ -123,16 +142,35 @@
     return true;
 }
 
-const void
+void
+args::print_args() const
+{
+    std::cout << "Number of journals: " << num_jrnls << std::endl;
+    std::cout << "Read mode: " << read_mode << std::endl;
+    if (read_mode.val() == read_arg::RANDOM)
+        std::cout << "Read probability: " << read_prob << " %" << std::endl;
+    if (read_mode.val() == read_arg::LAZYLOAD)
+    {
+        std::cout << "Lazy-load skips: " << lld_skip_num << std::endl;
+        std::cout << "Lazy-load reads: " << lld_rd_num << std::endl;
+    }
+    if (pause_secs)
+        std::cout << "Pause between test cases: " << pause_secs << " sec." << std::endl;
+    if (seed)
+        std::cout << "Randomize seed: " << seed << std::endl;
+    print_flags();
+}
+
+void
 args::print_flags() const
 {
-    if (jrnl_format_chk || keep_jrnls || randomize || recover_mode || repeat_flag ||
+    if (format_chk || keep_jrnls || randomize || recover_mode || repeat_flag ||
             reuse_instance)
     {
-        std::cout << "Options:";
+        std::cout << "Flag options:";
         // TODO: Get flag args and their strings directly from _options_descr.
-        if (jrnl_format_chk)
-            std::cout << " jrnl-format-chk";
+        if (format_chk)
+            std::cout << " format-chk";
         if (keep_jrnls)
             std::cout << " keep-jrnls";
         if (randomize)

Modified: store/trunk/cpp/tests/jrnl/jtt/args.hpp
===================================================================
--- store/trunk/cpp/tests/jrnl/jtt/args.hpp	2008-01-30 15:45:16 UTC (rev 1628)
+++ store/trunk/cpp/tests/jrnl/jtt/args.hpp	2008-01-31 15:28:06 UTC (rev 1629)
@@ -25,6 +25,7 @@
 #define rhm_jtt_args_hpp
 
 #include <boost/program_options.hpp>
+#include "read_arg.hpp"
 
 namespace rhm
 {
@@ -38,11 +39,15 @@
 
         // Add args here
         std::string test_case_csv_file_name;
-        bool jrnl_format_chk;
+        bool format_chk;
         bool keep_jrnls;
+        unsigned lld_rd_num;
+        unsigned lld_skip_num;
         unsigned num_jrnls;
         unsigned pause_secs;
         bool randomize;
+        read_arg read_mode;
+        unsigned read_prob;
     	bool recover_mode;
         bool repeat_flag;
         bool reuse_instance;
@@ -51,7 +56,8 @@
         args(std::string opt_title);
         const bool parse(int argc, char** argv); // return true if error, false if ok
         const bool usage() const; // return true
-        const void print_flags() const;
+        void print_args() const;
+        void print_flags() const;
     };
 
 } // namespace jtt

Modified: store/trunk/cpp/tests/jrnl/jtt/jrnl_instance.cpp
===================================================================
--- store/trunk/cpp/tests/jrnl/jtt/jrnl_instance.cpp	2008-01-30 15:45:16 UTC (rev 1628)
+++ store/trunk/cpp/tests/jrnl/jtt/jrnl_instance.cpp	2008-01-31 15:28:06 UTC (rev 1629)
@@ -42,66 +42,74 @@
         const std::string& base_filename, const u_int16_t num_jfiles, const u_int32_t jfsize_sblks):
         rhm::journal::jcntl(jid, jdir, base_filename, num_jfiles, jfsize_sblks),
         _jpp(new jrnl_init_params(jid, jdir, base_filename, num_jfiles, jfsize_sblks)),
+        _args_ptr(0),
         _dtok_master_enq_list(),
         _dtok_master_txn_list(),
+        _dtok_rd_list(),
         _dtok_deq_list(),
         _tcp(),
         _tcrp()
 {
-    pthread_mutex_init(&_ddl_mutex, 0);
+    pthread_mutex_init(&_rd_mutex, 0);
+    pthread_mutex_init(&_deq_mutex, 0);
 }
         
 jrnl_instance::jrnl_instance(const jrnl_init_params::shared_ptr& p):
         rhm::journal::jcntl(p->jid(), p->jdir(), p->base_filename(), p->num_jfiles(),
             p->jfsize_sblks()),
         _jpp(p),
+        _args_ptr(0),
         _dtok_master_enq_list(),
         _dtok_master_txn_list(),
+        _dtok_rd_list(),
         _dtok_deq_list(),
         _tcp(),
         _tcrp()
 {
-    pthread_mutex_init(&_ddl_mutex, 0);
+    pthread_mutex_init(&_rd_mutex, 0);
+    pthread_mutex_init(&_deq_mutex, 0);
 }
 
 jrnl_instance::~jrnl_instance()
 {
-    pthread_mutex_destroy(&_ddl_mutex);
+    pthread_mutex_destroy(&_rd_mutex);
+    pthread_mutex_destroy(&_deq_mutex);
 }
 
 
 void
-jrnl_instance::init_tc(test_case::shared_ptr& tcp, const bool recover_mode)
-        throw ()
+jrnl_instance::init_tc(test_case::shared_ptr& tcp, const args* const args_ptr) throw ()
 {
     test_case_result::shared_ptr p(new test_case_result(_jpp->jid()));
     _tcrp = p;
+    _args_ptr = args_ptr;
     try
     {
         _tcp = tcp;
         _dtok_master_enq_list.clear();
         _dtok_master_txn_list.clear();
+        _dtok_rd_list.clear();
         _dtok_deq_list.clear();
 
-        if (recover_mode)
+        if (_args_ptr->recover_mode)
         {
             try
             {
             std::vector<std::string> prep_txn_list;
             u_int64_t highest_rid;
-            recover(aio_rd_callback, aio_wr_callback, prep_txn_list, highest_rid);
+            recover(aio_wr_callback, prep_txn_list, highest_rid);
             recover_complete();
             }
             catch (const rhm::journal::jexception& e)
             {
                 if (e.err_code() == rhm::journal::jerrno::JERR_JDIR_STAT)
-                    initialize(aio_rd_callback, aio_wr_callback);
+                    initialize(aio_wr_callback);
                 else
                     throw;
             }
         }
         else
-            initialize(aio_rd_callback, aio_wr_callback);
+            initialize(aio_wr_callback);
     }
     catch (const rhm::journal::jexception& e) { _tcrp->add_exception(e); }
     catch (const std::exception& e) { _tcrp->add_exception(e.what()); }
@@ -113,8 +121,8 @@
 {
     _tcrp->set_start_time();
     ::pthread_create(&_enq_thread, 0, run_enq, this);
+    ::pthread_create(&_read_thread, 0, run_read, this);
     ::pthread_create(&_deq_thread, 0, run_deq, this);
-    ::pthread_create(&_read_thread, 0, run_read, this);
 }
 
 void
@@ -122,8 +130,8 @@
 {
     try
     {
+        ::pthread_join(_deq_thread, 0);
         ::pthread_join(_read_thread, 0);
-        ::pthread_join(_deq_thread, 0);
         ::pthread_join(_enq_thread, 0);
         stop(true);
         _tcrp->set_stop_time();
@@ -201,6 +209,106 @@
 }
 
 void
+jrnl_instance::run_read() throw ()
+{
+    try
+    {
+        read_arg::read_mode_t rd_mode = _args_ptr->read_mode.val();
+        if (rd_mode != read_arg::NONE)
+        {
+            unsigned sleep_cnt = 0U;
+            while (_tcrp->num_rproc() < _tcp->num_msgs() && !_tcrp->exception())
+            {
+                if (_dtok_rd_list.empty())
+                {
+                    if (sleep_cnt++ > MAX_SLEEP_CNT)
+                    {
+                        std::ostringstream oss;
+                        oss << "ERROR: Timeout waiting for enqueue AIO in journal \"" << _jid;
+                        oss << "\": num_enq=" << _tcp->num_msgs();
+                        oss << " num_deq=" << _tcrp->num_deq();
+                        _tcrp->add_exception(oss.str());
+                    }
+                    if (get_wr_events() == 0)
+                        ::usleep(SLEEP_US); // 1ms
+                }
+                else
+                {
+                    sleep_cnt = 0U;
+                    journal::data_tok* dtokp;
+                    {
+                        rhm::journal::slock sl(&_rd_mutex);
+                        dtokp = _dtok_rd_list.front();
+                        _dtok_rd_list.pop_front();
+                    }
+                    _tcrp->incr_num_rproc();
+                    
+                    bool do_read = true;
+                    if (rd_mode == read_arg::RANDOM)
+                        do_read = 1.0 * ::rand() / RAND_MAX <  _args_ptr->read_prob / 100.0;
+                    else if (rd_mode == read_arg::LAZYLOAD)
+                        do_read = _tcrp->num_rproc() >= _args_ptr->lld_skip_num &&
+                                        _tcrp->num_read() < _args_ptr->lld_rd_num;
+                    bool read_compl = false;
+                    while (do_read && !read_compl && !_tcrp->exception())
+                    {
+                        void* dptr = 0;
+                        size_t dsize = 0;
+                        void* xptr = 0;
+                        size_t xsize = 0;
+                        bool tr = false;
+                        bool ext = false;
+                        rhm::journal::iores res = read_data_record(&dptr, dsize, &xptr, xsize, tr,
+                                ext, dtokp);
+                        if (res == rhm::journal::RHM_IORES_SUCCESS)
+                        {
+                            {
+                                rhm::journal::slock sl(&_deq_mutex);
+                                _dtok_deq_list.push_back(dtokp);
+                            }
+                            read_compl = true;
+                            _tcrp->incr_num_read();
+                            sleep_cnt = 0;
+                            
+                            // clean up
+                            if (xsize)
+                                ::free(xptr);
+                            else if (dsize)
+                                ::free(dptr);
+                            dptr = 0;
+                            xptr = 0;
+                        }
+                        else if (res == rhm::journal::RHM_IORES_AIO_WAIT)
+                        {
+                            if (sleep_cnt++ > MAX_SLEEP_CNT)
+                            {
+                                std::ostringstream oss;
+                                oss << "ERROR: Timeout waiting for read AIO in journal \"" << _jid;
+                                oss << "\": num_enq=" << _tcp->num_msgs();
+                                oss << " num_deq=" << _tcrp->num_deq();
+                                _tcrp->add_exception(oss.str());
+                            }
+                            if (get_rd_events() == 0)
+                                ::usleep(SLEEP_US); // 1ms
+                        }
+                        else
+                        {
+                            std::ostringstream oss;
+                            oss << "ERROR: read operation in journal \"" << _jid;
+                            oss << "\" returned " << rhm::journal::iores_str(res) << ".";
+                            _tcrp->add_exception(oss.str());
+                        }
+                    }
+                }
+            }
+        }
+    }
+    catch (const rhm::journal::jexception& e) { _tcrp->add_exception(e); }
+    catch (const std::exception& e) { _tcrp->add_exception(e.what()); }
+    catch (...) { _tcrp->add_exception("Unknown exception"); }
+}
+
+void
 jrnl_instance::run_deq() throw ()
 {
     try
@@ -220,14 +328,14 @@
                         oss << " num_deq=" << _tcrp->num_deq();
                         _tcrp->add_exception(oss.str());
                     }
-                    get_wr_events();
-                    ::usleep(SLEEP_US); // 1ms
+                    if (get_wr_events() == 0)
+                        ::usleep(SLEEP_US); // 1ms
                 }
                 else
                 {
                     journal::data_tok* dtokp;
                     {
-                        rhm::journal::slock sl(&_ddl_mutex);
+                        rhm::journal::slock sl(&_deq_mutex);
                         dtokp = _dtok_deq_list.front();
                         _dtok_deq_list.pop_front();
                     }
@@ -239,7 +347,6 @@
                         res = dequeue_data_record(dtokp);
                     if (res == rhm::journal::RHM_IORES_SUCCESS)
                     {
-                        sleep_cnt = 0U;
                         _tcrp->incr_num_deq();
                         commit(dtokp);
                     }
@@ -262,10 +369,6 @@
 }
 
 void
-jrnl_instance::run_read() throw ()
-{}
-
-void
 jrnl_instance::abort(const rhm::journal::data_tok* dtokp)
 {
     txn(dtokp, false);
@@ -313,9 +416,5 @@
             jrnl_instance::push_dtokp(journal, *i);
 }
 
-void
-jrnl_instance::aio_rd_callback(jcntl* /* journal */, std::vector<journal::data_tok*>& /* dtokl */)
-{}
-
 } // namespace jtt
 } // namespace rhm

Modified: store/trunk/cpp/tests/jrnl/jtt/jrnl_instance.hpp
===================================================================
--- store/trunk/cpp/tests/jrnl/jtt/jrnl_instance.hpp	2008-01-30 15:45:16 UTC (rev 1628)
+++ store/trunk/cpp/tests/jrnl/jtt/jrnl_instance.hpp	2008-01-31 15:28:06 UTC (rev 1629)
@@ -24,6 +24,7 @@
 #ifndef rhm_jtt_jrnl_instance_hpp
 #define rhm_jtt_jrnl_instance_hpp
 
+#include "args.hpp"
 #include "jrnl_init_params.hpp"
 #include "test_case.hpp"
 
@@ -34,6 +35,8 @@
 #include <jrnl/slock.hpp>
 #include <vector>
 
+#include <iostream> // debug
+
 namespace rhm
 {
 namespace jtt
@@ -47,10 +50,13 @@
 
     private:
         jrnl_init_params::shared_ptr _jpp;
+        const args* _args_ptr;
         std::vector<dtok_ptr> _dtok_master_enq_list;
         std::vector<dtok_ptr> _dtok_master_txn_list;
+        std::deque<journal::data_tok*> _dtok_rd_list;
         std::deque<journal::data_tok*> _dtok_deq_list;
-        pthread_mutex_t _ddl_mutex;  ///< Mutex for _dtok_deq_list
+        pthread_mutex_t _rd_mutex;  ///< Mutex for _dtok_rd_list
+        pthread_mutex_t _deq_mutex;  ///< Mutex for _dtok_deq_list
         pthread_t _enq_thread;
         pthread_t _deq_thread;
         pthread_t _read_thread;
@@ -68,14 +74,18 @@
         inline const jrnl_init_params::shared_ptr& params() const { return _jpp; }
         inline const std::string& jid() const { return _jpp->jid(); }
 
-        void init_tc(test_case::shared_ptr& tcp, const bool recover_mode)
-                throw ();
+        void init_tc(test_case::shared_ptr& tcp, const args* const args_ptr) throw ();
         void run_tc() throw ();
         void tc_wait_compl() throw ();
 
     private:
         inline void push_dtokp(rhm::journal::data_tok* dtokp)
-                { rhm::journal::slock sl(&_ddl_mutex); _dtok_deq_list.push_back(dtokp); }
+        {
+            if (_args_ptr->read_mode.val() == read_arg::NONE)
+                { rhm::journal::slock sl(&_deq_mutex); _dtok_deq_list.push_back(dtokp); }
+            else
+                { rhm::journal::slock sl(&_rd_mutex); _dtok_rd_list.push_back(dtokp); }
+        }
         static inline void push_dtokp(jcntl* jp, rhm::journal::data_tok* dtokp)
                 { static_cast<jrnl_instance*>(jp)->push_dtokp(dtokp); }
 
@@ -83,13 +93,13 @@
         inline static void* run_enq(void* p)
                 { static_cast<jrnl_instance*>(p)->run_enq(); return 0; }
 
+        void run_read() throw ();
+        inline static void* run_read(void* p)
+                { static_cast<jrnl_instance*>(p)->run_read(); return 0; }
+
         void run_deq() throw ();
         inline static void* run_deq(void* p)
                 { static_cast<jrnl_instance*>(p)->run_deq(); return 0; }
-
-        void run_read() throw ();
-        inline static void* run_read(void* p)
-                { static_cast<jrnl_instance*>(p)->run_read(); return 0; }
         
         void abort(const rhm::journal::data_tok* dtokp);
         void commit(const rhm::journal::data_tok* dtokp);
@@ -98,7 +108,6 @@
 
         // static callbacks
         static void aio_wr_callback(jcntl* journal, std::vector<journal::data_tok*>& dtokl);
-        static void aio_rd_callback(jcntl* journal, std::vector<journal::data_tok*>& dtokl);
     };
     
 } // namespace jtt

Added: store/trunk/cpp/tests/jrnl/jtt/read_arg.cpp
===================================================================
--- store/trunk/cpp/tests/jrnl/jtt/read_arg.cpp	                        (rev 0)
+++ store/trunk/cpp/tests/jrnl/jtt/read_arg.cpp	2008-01-31 15:28:06 UTC (rev 1629)
@@ -0,0 +1,97 @@
+/**
+* Copyright 2008 Red Hat, Inc.
+*
+* This file is part of Red Hat Messaging.
+*
+* Red Hat Messaging is free software; you can redistribute it and/or
+* modify it under the terms of the GNU Lesser General Public
+* License as published by the Free Software Foundation; either
+* version 2.1 of the License, or (at your option) any later version.
+*
+* This library is distributed in the hope that it will be useful,
+* but WITHOUT ANY WARRANTY; without even the implied warranty of
+* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
+* Lesser General Public License for more details.
+*
+* You should have received a copy of the GNU Lesser General Public
+* License along with this library; if not, write to the Free Software
+* Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA  02110-1301
+* USA
+*
+* The GNU Lesser General Public License is available in the file COPYING.
+*/
+
+#include "read_arg.hpp"
+
+#include <assert.h>
+#include <boost/program_options.hpp>
+namespace po = boost::program_options;
+
+namespace rhm
+{
+namespace jtt
+{
+std::map<std::string, read_arg::read_mode_t> read_arg::_map;
+std::string read_arg::_description;
+const bool read_arg::init = __init();
+
+// static init fn
+const bool
+read_arg::__init()
+{
+    // Set string versions of each enum option here
+    _map["NONE"] = NONE;
+    _map["ALL"] = ALL;
+    _map["RANDOM"] = RANDOM;
+    _map["LAZYLOAD"] = LAZYLOAD;
+    _description = "Determines if and when messages will be read prior to dequeueing. "
+            "Values: (NONE | ALL | RANDOM | LAZYLOAD)";
+    return true;
+}
+
+void
+read_arg::parse(const std::string& str)
+{
+    std::map<std::string, read_arg::read_mode_t>::const_iterator i = _map.find(str);
+    if (i == _map.end())
+        throw po::invalid_option_value(str);
+    _rm = i->second;
+}
+
+// static fn
+const std::string&
+read_arg::str(const read_mode_t rm)
+{
+    for (std::map<std::string, read_mode_t>::const_iterator i = _map.begin(); i != _map.end(); i++)
+    {
+        if (i->second == rm)
+            return i->first;
+    }
+    assert(!"map - enum mismatch");
+}
+
+// static fn
+const std::string&
+read_arg::descr()
+{
+    return _description;
+}
+
+std::ostream&
+operator<<(std::ostream& os, const read_arg& ra)
+{
+    os << ra.str();
+    return os;
+}
+
+std::istream&
+operator>>(std::istream& is, read_arg& ra)
+{
+    std::string s;
+    is >> s;
+    ra.parse(s);
+    return is;
+}
+    
+} // namespace jtt
+} // namespace rhm

Added: store/trunk/cpp/tests/jrnl/jtt/read_arg.hpp
===================================================================
--- store/trunk/cpp/tests/jrnl/jtt/read_arg.hpp	                        (rev 0)
+++ store/trunk/cpp/tests/jrnl/jtt/read_arg.hpp	2008-01-31 15:28:06 UTC (rev 1629)
@@ -0,0 +1,64 @@
+/**
+* Copyright 2008 Red Hat, Inc.
+*
+* This file is part of Red Hat Messaging.
+*
+* Red Hat Messaging is free software; you can redistribute it and/or
+* modify it under the terms of the GNU Lesser General Public
+* License as published by the Free Software Foundation; either
+* version 2.1 of the License, or (at your option) any later version.
+*
+* This library is distributed in the hope that it will be useful,
+* but WITHOUT ANY WARRANTY; without even the implied warranty of
+* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
+* Lesser General Public License for more details.
+*
+* You should have received a copy of the GNU Lesser General Public
+* License along with this library; if not, write to the Free Software
+* Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA  02110-1301
+* USA
+*
+* The GNU Lesser General Public License is available in the file COPYING.
+*/
+
+#ifndef rhm_jtt_read_arg_hpp
+#define rhm_jtt_read_arg_hpp
+
+#include <string>
+#include <map>
+
+namespace rhm
+{
+namespace jtt
+{
+
+class read_arg
+{
+    public:
+        enum read_mode_t { NONE, ALL, RANDOM, LAZYLOAD};
+    private:
+        static std::map<std::string, read_mode_t> _map;
+        static std::string _description;
+        static const bool init;
+        static const bool __init();
+        read_mode_t _rm;
+    public:
+        inline read_arg() : _rm(NONE) {}
+        inline read_arg(read_mode_t rm) : _rm(rm) {}
+
+        inline const read_mode_t val() const { return _rm; }
+        inline void set_val(const read_mode_t rm) { _rm = rm; }
+        void parse(const std::string& str);
+
+        inline const std::string& str() const { return str(_rm); }
+        static const std::string& str(const read_mode_t rm);
+        static const std::string& descr();
+
+        friend std::ostream& operator<<(std::ostream& os, const read_arg& ra);
+        friend std::istream& operator>>(std::istream& is, read_arg& ra);
+};
+    
+} // namespace jtt
+} // namespace rhm
+
+#endif // ifndef rhm_jtt_read_arg_hpp

Modified: store/trunk/cpp/tests/jrnl/jtt/test_case.cpp
===================================================================
--- store/trunk/cpp/tests/jrnl/jtt/test_case.cpp	2008-01-30 15:45:16 UTC (rev 1628)
+++ store/trunk/cpp/tests/jrnl/jtt/test_case.cpp	2008-01-31 15:28:06 UTC (rev 1629)
@@ -58,7 +58,7 @@
     if (_min_data_size == _max_data_size)
         return _max_data_size;
     size_t size_diff = _max_data_size - _min_data_size;
-    return _min_data_size + size_t(1.0 * rand() * size_diff/(RAND_MAX + 1.0));
+    return _min_data_size + size_t(1.0 * ::rand() * size_diff/(RAND_MAX + 1.0));
 }
 
 const size_t
@@ -69,11 +69,11 @@
         return size_t(0);
     if (_min_xid_size == 0)
     {
-        if (1.0 * rand() / RAND_MAX < 0.5)
+        if (1.0 * ::rand() / RAND_MAX < 0.5)
             return size_t(0);
     }
     size_t size_diff = _max_xid_size - _min_xid_size;
-    return _min_xid_size + size_t(1.0 * rand() * size_diff/(RAND_MAX + 1.0));
+    return _min_xid_size + size_t(1.0 * ::rand() * size_diff/(RAND_MAX + 1.0));
 }
 
 const bool
@@ -84,7 +84,7 @@
         return false;
     if (_transient == JTT_PERSISTNET)
         return true;
-    return 1.0 * rand() / RAND_MAX < 0.5;
+    return 1.0 * ::rand() / RAND_MAX < 0.5;
 }
 
 const bool
@@ -95,7 +95,7 @@
         return false;
     if (_external == JDL_EXTERNAL)
         return true;
-    return 1.0 * rand() / RAND_MAX < 0.5;
+    return 1.0 * ::rand() / RAND_MAX < 0.5;
 }
 
 void

Modified: store/trunk/cpp/tests/jrnl/jtt/test_case_result.cpp
===================================================================
--- store/trunk/cpp/tests/jrnl/jtt/test_case_result.cpp	2008-01-30 15:45:16 UTC (rev 1628)
+++ store/trunk/cpp/tests/jrnl/jtt/test_case_result.cpp	2008-01-31 15:28:06 UTC (rev 1629)
@@ -36,6 +36,7 @@
         _num_enq(0),
         _num_deq(0),
         _num_read(0),
+        _num_rproc(0),
         _stopped(false),
         _exception_list()
 {

Modified: store/trunk/cpp/tests/jrnl/jtt/test_case_result.hpp
===================================================================
--- store/trunk/cpp/tests/jrnl/jtt/test_case_result.hpp	2008-01-30 15:45:16 UTC (rev 1628)
+++ store/trunk/cpp/tests/jrnl/jtt/test_case_result.hpp	2008-01-31 15:28:06 UTC (rev 1629)
@@ -46,7 +46,8 @@
         std::string _jid;
         u_int32_t _num_enq;
         u_int32_t _num_deq;
-        u_int32_t _num_read;
+        u_int32_t _num_read;  // Messages actually read
+        u_int32_t _num_rproc; // Messages handled by read thread (not all are read)
         timespec _start_time;
         timespec _stop_time;
         bool _stopped;
@@ -64,6 +65,8 @@
         inline const u_int32_t incr_num_deq() { return ++_num_deq; }
         inline const u_int32_t num_read() const { return _num_read; }
         inline const u_int32_t incr_num_read() { return ++_num_read; }
+        inline const u_int32_t num_rproc() const { return _num_rproc; }
+        inline const u_int32_t incr_num_rproc() { return ++_num_rproc; }
 
         inline const timespec& start_time() const { return _start_time; }
         inline void set_start_time() { ::clock_gettime(CLOCK_REALTIME, &_start_time); }

Modified: store/trunk/cpp/tests/jrnl/jtt/test_mgr.cpp
===================================================================
--- store/trunk/cpp/tests/jrnl/jtt/test_mgr.cpp	2008-01-30 15:45:16 UTC (rev 1628)
+++ store/trunk/cpp/tests/jrnl/jtt/test_mgr.cpp	2008-01-31 15:28:06 UTC (rev 1629)
@@ -36,9 +36,8 @@
         _args(args),
         _random_fn_ptr(random_fn)
 {
-    std::cout << "Number of journals: " << _args.num_jrnls << std::endl;
-    if (args.seed)
-        ::srandom(args.seed);
+    if (_args.seed)
+        ::srand(_args.seed);
 }
 
 test_mgr::~test_mgr()
@@ -64,7 +63,7 @@
         if (tcs.ignored())
             std::cout << "WARNING: " << tcs.ignored() << " test cases were ignored. "
                     "(recover-mode selected and test has no auto-dequeue.)" << std::endl;
-        _args.print_flags();
+        _args.print_args();
     }
     else
         std::cout << " (WARNING: This CSV file is empty or does not exist.)" << std::endl;
@@ -84,12 +83,12 @@
             if (!_args.reuse_instance || _ji_list.empty())
                 initialize_jrnls();
             for (ji_list_citr jii=_ji_list.begin(); jii!=_ji_list.end(); jii++)
-                (*jii)->init_tc(*tci, _args.recover_mode);
+                (*jii)->init_tc(*tci, &_args);
             for (ji_list_citr jii=_ji_list.begin(); jii!=_ji_list.end(); jii++)
                 (*jii)->run_tc();
             for (ji_list_citr jii=_ji_list.begin(); jii!=_ji_list.end(); jii++)
                 (*jii)->tc_wait_compl();
-            if (_args.jrnl_format_chk)
+            if (_args.format_chk)
             {
                 for (ji_list_citr jii=_ji_list.begin(); jii!=_ji_list.end(); jii++)
                 {

Modified: store/trunk/cpp/tests/jrnl/jtt/test_mgr.hpp
===================================================================
--- store/trunk/cpp/tests/jrnl/jtt/test_mgr.hpp	2008-01-30 15:45:16 UTC (rev 1628)
+++ store/trunk/cpp/tests/jrnl/jtt/test_mgr.hpp	2008-01-31 15:28:06 UTC (rev 1629)
@@ -57,7 +57,7 @@
         void initialize_jrnls();
         void print_results(test_case::shared_ptr tcp, const bool summary);
         inline static ptrdiff_t random_fn(const ptrdiff_t i)
-                { return static_cast<ptrdiff_t>(1.0 * i * random() / RAND_MAX); }
+                { return static_cast<ptrdiff_t>(1.0 * i * ::rand() / RAND_MAX); }
     };
     
 } // namespace jtt




More information about the rhmessaging-commits mailing list