[rhmessaging-commits] rhmessaging commits: r1645 - in store/trunk/cpp: lib/jrnl and 2 other directories.
rhmessaging-commits at lists.jboss.org
rhmessaging-commits at lists.jboss.org
Wed Feb 6 10:39:52 EST 2008
Author: kpvdr
Date: 2008-02-06 10:39:52 -0500 (Wed, 06 Feb 2008)
New Revision: 1645
Added:
store/trunk/cpp/lib/jrnl/cvar.cpp
store/trunk/cpp/lib/jrnl/cvar.hpp
store/trunk/cpp/lib/jrnl/slock.cpp
store/trunk/cpp/lib/jrnl/time_ns.cpp
store/trunk/cpp/lib/jrnl/time_ns.hpp
store/trunk/cpp/tests/jrnl/_ut_time_ns.cpp
Modified:
store/trunk/cpp/lib/JournalImpl.cpp
store/trunk/cpp/lib/JournalImpl.h
store/trunk/cpp/lib/Makefile.am
store/trunk/cpp/lib/jrnl/aio_cb.hpp
store/trunk/cpp/lib/jrnl/jcntl.cpp
store/trunk/cpp/lib/jrnl/jcntl.hpp
store/trunk/cpp/lib/jrnl/pmgr.hpp
store/trunk/cpp/lib/jrnl/rmgr.cpp
store/trunk/cpp/lib/jrnl/rmgr.hpp
store/trunk/cpp/lib/jrnl/slock.hpp
store/trunk/cpp/lib/jrnl/wmgr.cpp
store/trunk/cpp/lib/jrnl/wmgr.hpp
store/trunk/cpp/tests/jrnl/
store/trunk/cpp/tests/jrnl/JournalSystemTests.cpp
store/trunk/cpp/tests/jrnl/Makefile.am
store/trunk/cpp/tests/jrnl/jtt/Makefile.am
store/trunk/cpp/tests/jrnl/jtt/_ut_test_case_result.cpp
store/trunk/cpp/tests/jrnl/jtt/_ut_test_case_result_agregation.cpp
store/trunk/cpp/tests/jrnl/jtt/jrnl_instance.cpp
store/trunk/cpp/tests/jrnl/jtt/jrnl_instance.hpp
store/trunk/cpp/tests/jrnl/jtt/test_case_result.cpp
store/trunk/cpp/tests/jrnl/jtt/test_case_result.hpp
store/trunk/cpp/tests/jrnl/jtt/test_case_result_agregation.cpp
store/trunk/cpp/tests/jrnl/jtt/test_case_result_agregation.hpp
Log:
Bugfixes to read pipeline in journal; updates to journal test tool (jtt) to test read. Also changed usleep() calls in jtt to use pthread condition variables instead.
Modified: store/trunk/cpp/lib/JournalImpl.cpp
===================================================================
--- store/trunk/cpp/lib/JournalImpl.cpp 2008-02-06 14:58:42 UTC (rev 1644)
+++ store/trunk/cpp/lib/JournalImpl.cpp 2008-02-06 15:39:52 UTC (rev 1645)
@@ -80,7 +80,7 @@
}
void
-JournalImpl::recover(const aio_cb wr_cb,
+JournalImpl::recover(const journal::rd_aio_cb rd_cb, const wr_aio_cb wr_cb,
boost::ptr_list<bdbstore::PreparedTransaction>& prep_tx_list, u_int64_t& highest_rid,
u_int64_t queue_id)
{
@@ -91,7 +91,7 @@
prep_xid_list.push_back(i->xid);
}
- jcntl::recover(wr_cb, prep_xid_list, highest_rid);
+ jcntl::recover(rd_cb, wr_cb, prep_xid_list, highest_rid);
// Populate PreparedTransaction lists from _tmap
for (bdbstore::PreparedTransaction::list::iterator i = prep_tx_list.begin();
@@ -341,3 +341,7 @@
dtokp->release();
}
}
+
+// void
+// JournalImpl::aio_rd_callback(jcntl* /*journal*/, std::vector<u_int16_t>& /*pil*/)
+// {}
Modified: store/trunk/cpp/lib/JournalImpl.h
===================================================================
--- store/trunk/cpp/lib/JournalImpl.h 2008-02-06 14:58:42 UTC (rev 1644)
+++ store/trunk/cpp/lib/JournalImpl.h 2008-02-06 15:39:52 UTC (rev 1645)
@@ -92,15 +92,15 @@
const qpid::sys::Duration flushTimeout);
virtual ~JournalImpl();
- inline void initialize() { jcntl::initialize(&aio_wr_callback); }
+ inline void initialize() { jcntl::initialize(0, &aio_wr_callback); }
- void recover(const journal::aio_cb wr_cb,
+ void recover(const journal::rd_aio_cb rd_cb, const journal::wr_aio_cb wr_cb,
boost::ptr_list<bdbstore::PreparedTransaction>& prep_tx_list,
u_int64_t& highest_rid, u_int64_t queue_id);
inline void recover(boost::ptr_list<bdbstore::PreparedTransaction>& prep_tx_list,
u_int64_t& highest_rid, u_int64_t queue_id) {
- recover(&aio_wr_callback, prep_tx_list, highest_rid, queue_id);
+ recover(0, &aio_wr_callback, prep_tx_list, highest_rid, queue_id);
}
// Temporary fn to read and save last msg read from journal so it can be assigned
@@ -144,6 +144,7 @@
private:
void handleIoResult(const journal::iores r);
static void aio_wr_callback(jcntl* journal, std::vector<journal::data_tok*>& dtokl);
+ // static void aio_rd_callback(jcntl* journal, std::vector<u_int16_t>& pil);
}; // class JournalImpl
} // namespace bdbstore
Modified: store/trunk/cpp/lib/Makefile.am
===================================================================
--- store/trunk/cpp/lib/Makefile.am 2008-02-06 14:58:42 UTC (rev 1644)
+++ store/trunk/cpp/lib/Makefile.am 2008-02-06 15:39:52 UTC (rev 1645)
@@ -38,6 +38,7 @@
StoreException.h \
StringDbt.h \
TxnCtxt.h \
+ jrnl/cvar.cpp \
jrnl/data_tok.cpp \
jrnl/deq_rec.cpp \
jrnl/enq_map.cpp \
@@ -52,12 +53,15 @@
jrnl/nlfh.cpp \
jrnl/pmgr.cpp \
jrnl/rmgr.cpp \
+ jrnl/slock.cpp \
jrnl/rrfc.cpp \
+ jrnl/time_ns.cpp \
jrnl/txn_map.cpp \
jrnl/txn_rec.cpp \
jrnl/wmgr.cpp \
jrnl/wrfc.cpp \
jrnl/aio_cb.hpp \
+ jrnl/cvar.hpp \
jrnl/data_tok.hpp \
jrnl/deq_hdr.hpp \
jrnl/deq_rec.hpp \
@@ -82,6 +86,7 @@
jrnl/rmgr.hpp \
jrnl/rrfc.hpp \
jrnl/slock.hpp \
+ jrnl/time_ns.hpp \
jrnl/txn_hdr.hpp \
jrnl/txn_map.hpp \
jrnl/txn_rec.hpp \
Modified: store/trunk/cpp/lib/jrnl/aio_cb.hpp
===================================================================
--- store/trunk/cpp/lib/jrnl/aio_cb.hpp 2008-02-06 14:58:42 UTC (rev 1644)
+++ store/trunk/cpp/lib/jrnl/aio_cb.hpp 2008-02-06 15:39:52 UTC (rev 1645)
@@ -44,7 +44,8 @@
/**
* \brief Callback function pointer to be called when AIO events arrive.
*/
- typedef void (*aio_cb)(jcntl* journal, std::vector<data_tok*>& dtokl);
+ typedef void (*wr_aio_cb)(jcntl* journal, std::vector<data_tok*>& dtokl);
+ typedef void (*rd_aio_cb)(jcntl* journal, std::vector<u_int16_t>& pil);
}
}
Added: store/trunk/cpp/lib/jrnl/cvar.cpp
===================================================================
--- store/trunk/cpp/lib/jrnl/cvar.cpp (rev 0)
+++ store/trunk/cpp/lib/jrnl/cvar.cpp 2008-02-06 15:39:52 UTC (rev 1645)
@@ -0,0 +1,31 @@
+/**
+* \file cvar.cpp
+*
+* Red Hat Messaging - Message Journal
+*
+* File containing code for class rhm::journal::cvar (condition variable). See
+* comments in file cvar.hpp for details.
+*
+* Copyright (C) 2007, 2008 Red Hat Inc.
+*
+* This file is part of Red Hat Messaging.
+*
+* Red Hat Messaging is free software; you can redistribute it and/or
+* modify it under the terms of the GNU Lesser General Public
+* License as published by the Free Software Foundation; either
+* version 2.1 of the License, or (at your option) any later version.
+*
+* This library is distributed in the hope that it will be useful,
+* but WITHOUT ANY WARRANTY; without even the implied warranty of
+* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+* Lesser General Public License for more details.
+*
+* You should have received a copy of the GNU Lesser General Public
+* License along with this library; if not, write to the Free Software
+* Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301
+* USA
+*
+* The GNU Lesser General Public License is available in the file COPYING.
+*/
+
+#include <jrnl/cvar.hpp>
Added: store/trunk/cpp/lib/jrnl/cvar.hpp
===================================================================
--- store/trunk/cpp/lib/jrnl/cvar.hpp (rev 0)
+++ store/trunk/cpp/lib/jrnl/cvar.hpp 2008-02-06 15:39:52 UTC (rev 1645)
@@ -0,0 +1,85 @@
+/**
+* \file cvar.hpp
+*
+* Red Hat Messaging - Message Journal
+*
+* This file contains a posix condition variable class.
+*
+* Copyright 2007, 2008 Red Hat, Inc.
+*
+* This file is part of Red Hat Messaging.
+*
+* Red Hat Messaging is free software; you can redistribute it and/or
+* modify it under the terms of the GNU Lesser General Public
+* License as published by the Free Software Foundation; either
+* version 2.1 of the License, or (at your option) any later version.
+*
+* This library is distributed in the hope that it will be useful,
+* but WITHOUT ANY WARRANTY; without even the implied warranty of
+* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+* Lesser General Public License for more details.
+*
+* You should have received a copy of the GNU Lesser General Public
+* License along with this library; if not, write to the Free Software
+* Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301
+* USA
+*
+* The GNU Lesser General Public License is available in the file COPYING.
+*/
+
+#ifndef rhm_journal_cvar_hpp
+#define rhm_journal_cvar_hpp
+
+#include <jrnl/jerrno.hpp>
+#include <jrnl/jexception.hpp>
+#include <jrnl/time_ns.hpp>
+#include <pthread.h>
+#include <sstream>
+
+namespace rhm
+{
+namespace journal
+{
+
+    /**
+    * \brief Thin wrapper around a POSIX condition variable (pthread_cond_t).
+    *
+    * The mutex supplied to the constructor is held by reference and is the mutex passed to
+    * every pthread_cond_wait() / pthread_cond_timedwait() call made by this class. As POSIX
+    * requires for those calls, the caller must hold that mutex locked when calling wait(),
+    * timedwait() or waitintvl(), and should re-test its predicate on return because POSIX
+    * permits spurious wakeups. Non-zero pthread return codes are handed to the PTHREAD_CHK
+    * macro, which is defined elsewhere.
+    */
+    class cvar
+    {
+    private:
+        pthread_mutex_t& _m;    ///< Mutex used for all waits on this condition variable
+        pthread_cond_t _c;      ///< Underlying pthread condition variable
+
+        // Copying an initialized pthread_cond_t has undefined results under POSIX, so copy
+        // construction and assignment are declared private and left unimplemented.
+        cvar(const cvar&);
+        cvar& operator=(const cvar&);
+
+    public:
+        /**
+        * \brief Initialize the condition variable with default attributes.
+        * \param m Mutex to be used with all subsequent waits; must outlive this object.
+        */
+        inline cvar(pthread_mutex_t& m) : _m(m)
+        {
+            PTHREAD_CHK(::pthread_cond_init(&_c, 0), "pthread_cond_init", "cvar", "cvar");
+        }
+        // Return code deliberately not checked: a destructor is no place to raise an error.
+        inline ~cvar() { ::pthread_cond_destroy(&_c); }
+
+        /// Block until the condition variable is signalled or broadcast.
+        inline void wait()
+        {
+            PTHREAD_CHK(::pthread_cond_wait(&_c, &_m), "pthread_cond_wait", "cvar", "wait");
+        }
+
+        /**
+        * \brief Block until signalled or until the absolute time \a ts is reached.
+        *
+        * Note that a timeout (ETIMEDOUT) is passed to PTHREAD_CHK like any other non-zero
+        * return code; use waitintvl() when a timeout is an expected outcome.
+        */
+        inline void timedwait(timespec& ts)
+        {
+            PTHREAD_CHK(::pthread_cond_timedwait(&_c, &_m, &ts), "pthread_cond_timedwait", "cvar",
+                    "timedwait");
+        }
+
+        /**
+        * \brief Block until signalled or until \a intvl_ns nanoseconds have elapsed.
+        * \param intvl_ns Relative timeout in nanoseconds. May exceed one second; values <= 0
+        *     use the current time as the deadline.
+        * \return \b true if the wait timed out, \b false if it returned for any other
+        *     non-error reason.
+        */
+        inline const bool waitintvl(const long intvl_ns)
+        {
+            time_ns t;
+            const int clk_err = t.now(); // errno from clock_gettime(), or 0
+            PTHREAD_CHK(clk_err, "clock_gettime", "cvar", "waitintvl");
+            if (intvl_ns > 0)
+            {
+                // Add whole seconds and the sub-second remainder separately so that tv_nsec
+                // stays below 1e9; pthread_cond_timedwait() rejects anything else (EINVAL).
+                t.tv_sec += intvl_ns / 1000000000L;
+                t += intvl_ns % 1000000000L;
+            }
+            int ret = ::pthread_cond_timedwait(&_c, &_m, &t);
+            if (ret == ETIMEDOUT)
+                return true;
+            PTHREAD_CHK(ret, "pthread_cond_timedwait", "cvar", "waitintvl");
+            return false;
+        }
+
+        /// Wake at least one thread currently waiting on this condition variable.
+        inline void signal()
+        {
+            PTHREAD_CHK(::pthread_cond_signal(&_c), "pthread_cond_signal", "cvar", "signal");
+        }
+
+        /// Wake all threads currently waiting on this condition variable.
+        inline void broadcast()
+        {
+            PTHREAD_CHK(::pthread_cond_broadcast(&_c), "pthread_cond_broadcast", "cvar",
+                    "broadcast");
+        }
+    };
+
+}
+}
+
+#endif
Modified: store/trunk/cpp/lib/jrnl/jcntl.cpp
===================================================================
--- store/trunk/cpp/lib/jrnl/jcntl.cpp 2008-02-06 14:58:42 UTC (rev 1644)
+++ store/trunk/cpp/lib/jrnl/jcntl.cpp 2008-02-06 15:39:52 UTC (rev 1645)
@@ -71,8 +71,8 @@
_wmgr(this, _emap, _tmap, _wrfc),
_rcvdat(_num_jfiles)
{
- pthread_mutex_init(&_wr_mutex, 0);
- pthread_mutex_init(&_gev_mutex, 0);
+ ::pthread_mutex_init(&_wr_mutex, 0);
+ ::pthread_mutex_init(&_gev_mutex, 0);
}
jcntl::~jcntl()
@@ -86,12 +86,12 @@
delete _datafh[i];
delete[] _datafh;
}
- pthread_mutex_destroy(&_gev_mutex);
- pthread_mutex_destroy(&_wr_mutex);
+ ::pthread_mutex_destroy(&_gev_mutex);
+ ::pthread_mutex_destroy(&_wr_mutex);
}
void
-jcntl::initialize(const aio_cb wr_cb)
+jcntl::initialize(const rd_aio_cb rd_cb, const wr_aio_cb wr_cb)
{
_init_flag = false;
_stop_flag = false;
@@ -125,7 +125,7 @@
// constrains read activity (i.e. one can't read what has not yet been written).
_wrfc.initialize(_num_jfiles, _jfsize_sblks, (nlfh**)_datafh);
_rrfc.initialize(_num_jfiles, (nlfh**)_datafh);
- _rmgr.initialize(0);
+ _rmgr.initialize(rd_cb, 0);
_wmgr.initialize(wr_cb, JRNL_WMGR_MAXDTOKPP, JRNL_WMGR_MAXWAITUS);
// Write info file (<basename>.jinf) to disk
@@ -135,8 +135,8 @@
}
void
-jcntl::recover(const aio_cb wr_cb, const std::vector<std::string>& prep_txn_list,
- u_int64_t& highest_rid)
+jcntl::recover(const rd_aio_cb rd_cb, const wr_aio_cb wr_cb,
+ const std::vector<std::string>& prep_txn_list, u_int64_t& highest_rid)
{
_init_flag = false;
_stop_flag = false;
@@ -180,7 +180,7 @@
// constrains read activity (i.e. one can't read what has not yet been written).
_wrfc.initialize(_num_jfiles, _jfsize_sblks, (nlfh**)_datafh, &_rcvdat);
_rrfc.initialize(_num_jfiles, (nlfh**)_datafh, _rcvdat._ffid);
- _rmgr.initialize(_rcvdat._fro);
+ _rmgr.initialize(rd_cb, _rcvdat._fro);
_wmgr.initialize(wr_cb, JRNL_WMGR_MAXDTOKPP, JRNL_WMGR_MAXWAITUS, _rcvdat._eo);
_readonly_flag = true;
@@ -430,7 +430,7 @@
u_int32_t cnt = 0;
while (_wmgr.get_aio_evt_rem())
{
- _wmgr.get_events(pmgr::UNUSED);
+ get_wr_events();
if (cnt++ > MAX_AIO_CMPL_SLEEPS)
throw jexception(jerrno::JERR_JCNTL_AIOCMPLWAIT, "jcntl", "aio_cmpl_wait");
::usleep(AIO_CMPL_SLEEP);
Modified: store/trunk/cpp/lib/jrnl/jcntl.hpp
===================================================================
--- store/trunk/cpp/lib/jrnl/jcntl.hpp 2008-02-06 14:58:42 UTC (rev 1644)
+++ store/trunk/cpp/lib/jrnl/jcntl.hpp 2008-02-06 15:39:52 UTC (rev 1645)
@@ -178,11 +178,12 @@
* <b>NOTE: If <i>NULL</i> is passed to the callbacks, internal default callbacks will be
* used.</b>
*
+ * \param rd_cb Function pointer to callback function for read operations. May be 0 (NULL).
* \param wr_cb Function pointer to callback function for write operations. May be 0 (NULL).
*
* \exception TODO
*/
- void initialize(const aio_cb wr_cb);
+ void initialize(const rd_aio_cb rd_cb, const wr_aio_cb wr_cb);
/**
* /brief Initialize journal by recovering state from previously written journal.
@@ -199,14 +200,15 @@
* <b>NOTE: If <i>NULL</i> is passed to the callbacks, internal default callbacks will be
* used.</b>
*
+ * \param rd_cb Function pointer to callback function for read operations. May be 0 (NULL).
* \param wr_cb Function pointer to callback function for write operations. May be 0 (NULL).
* \param prep_txn_list
* \param highest_rid Returns the highest rid found in the journal during recover
*
* \exception TODO
*/
- void recover(const aio_cb wr_cb, const std::vector<std::string>& prep_txn_list,
- u_int64_t& highest_rid);
+ void recover(const rd_aio_cb rd_cb, const wr_aio_cb wr_cb,
+ const std::vector<std::string>& prep_txn_list, u_int64_t& highest_rid);
/**
* \brief Notification to the journal that recovery is complete and that normal operation
Modified: store/trunk/cpp/lib/jrnl/pmgr.hpp
===================================================================
--- store/trunk/cpp/lib/jrnl/pmgr.hpp 2008-02-06 14:58:42 UTC (rev 1644)
+++ store/trunk/cpp/lib/jrnl/pmgr.hpp 2008-02-06 15:39:52 UTC (rev 1645)
@@ -42,7 +42,6 @@
#include <deque>
#include <libaio.h>
-#include <jrnl/aio_cb.hpp>
#include <jrnl/data_tok.hpp>
#include <jrnl/deq_rec.hpp>
#include <jrnl/enq_map.hpp>
Modified: store/trunk/cpp/lib/jrnl/rmgr.cpp
===================================================================
--- store/trunk/cpp/lib/jrnl/rmgr.cpp 2008-02-06 14:58:42 UTC (rev 1644)
+++ store/trunk/cpp/lib/jrnl/rmgr.cpp 2008-02-06 15:39:52 UTC (rev 1645)
@@ -51,8 +51,9 @@
{}
void
-rmgr::initialize(size_t fro)
+rmgr::initialize(const rd_aio_cb rd_cb, const size_t fro)
{
+ _cb = rd_cb;
initialize();
if (fro)
{
@@ -390,6 +391,8 @@
throw jexception(jerrno::JERR__AIO, oss.str(), "rmgr", "get_events");
}
+ std::vector<u_int16_t> pil;
+ pil.reserve(ret);
for (int i=0; i<ret; i++) // Index of returned AIOs
{
if (_aio_evt_rem == 0)
@@ -419,8 +422,12 @@
pcbp->_rdblks = iocbp->u.c.nbytes / JRNL_DBLK_SIZE;
pcbp->_rfh->add_rd_cmpl_cnt_dblks(pcbp->_rdblks);
pcbp->_state = state;
+        // pil was only reserve()d above, so its size is still 0: append rather than index,
+        // otherwise the write is out of bounds and the callback receives an empty list.
+        pil.push_back(pcbp->_index);
}
+ // Perform AIO return callback
+ if (_cb && ret)
+ (_cb)(_jc, pil);
return ret;
}
@@ -583,7 +590,7 @@
// If skip still incomplete, move to next page and decode again
if (tot_dblk_cnt < dsize_dblks)
{
- if (_pg_offset_dblks == JRNL_SBLK_SIZE * JRNL_RMGR_PAGE_SIZE)
+ if (dblks_rem() == 0)
rotate_page();
if (_page_cb_arr[_pg_index]._state != AIO_COMPLETE)
{
Modified: store/trunk/cpp/lib/jrnl/rmgr.hpp
===================================================================
--- store/trunk/cpp/lib/jrnl/rmgr.hpp 2008-02-06 14:58:42 UTC (rev 1644)
+++ store/trunk/cpp/lib/jrnl/rmgr.hpp 2008-02-06 15:39:52 UTC (rev 1645)
@@ -39,6 +39,7 @@
}
}
+#include <jrnl/aio_cb.hpp>
#include <jrnl/enums.hpp>
#include <jrnl/pmgr.hpp>
#include <jrnl/rec_hdr.hpp>
@@ -58,14 +59,15 @@
class rmgr : public pmgr
{
private:
- rrfc& _rrfc; ///< Ref to read rotating file controller
+ rrfc& _rrfc; ///< Ref to read rotating file controller
rec_hdr _hdr; ///< Header used to determind record type
+ rd_aio_cb _cb; ///< Callback function pointer for AIO events
public:
rmgr(jcntl* jc, enq_map& emap, txn_map& tmap, rrfc& rrfc);
virtual ~rmgr();
- void initialize(size_t fro);
+ void initialize(const rd_aio_cb rd_cb, const size_t fro);
const iores get(const u_int64_t& rid, const size_t& dsize, const size_t& dsize_avail,
const void** const data, bool auto_discard);
const iores discard(data_tok* dtok);
Added: store/trunk/cpp/lib/jrnl/slock.cpp
===================================================================
--- store/trunk/cpp/lib/jrnl/slock.cpp (rev 0)
+++ store/trunk/cpp/lib/jrnl/slock.cpp 2008-02-06 15:39:52 UTC (rev 1645)
@@ -0,0 +1,31 @@
+/**
+* \file slock.cpp
+*
+* Red Hat Messaging - Message Journal
+*
+* File containing code for class rhm::journal::slock (scoped lock). See
+* comments in file slock.hpp for details.
+*
+* Copyright (C) 2007, 2008 Red Hat Inc.
+*
+* This file is part of Red Hat Messaging.
+*
+* Red Hat Messaging is free software; you can redistribute it and/or
+* modify it under the terms of the GNU Lesser General Public
+* License as published by the Free Software Foundation; either
+* version 2.1 of the License, or (at your option) any later version.
+*
+* This library is distributed in the hope that it will be useful,
+* but WITHOUT ANY WARRANTY; without even the implied warranty of
+* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+* Lesser General Public License for more details.
+*
+* You should have received a copy of the GNU Lesser General Public
+* License along with this library; if not, write to the Free Software
+* Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301
+* USA
+*
+* The GNU Lesser General Public License is available in the file COPYING.
+*/
+
+#include <jrnl/slock.hpp>
Modified: store/trunk/cpp/lib/jrnl/slock.hpp
===================================================================
--- store/trunk/cpp/lib/jrnl/slock.hpp 2008-02-06 14:58:42 UTC (rev 1644)
+++ store/trunk/cpp/lib/jrnl/slock.hpp 2008-02-06 15:39:52 UTC (rev 1645)
@@ -31,11 +31,11 @@
#ifndef rhm_journal_slock_hpp
#define rhm_journal_slock_hpp
-#include <sstream>
-#include <pthread.h>
#include <errno.h>
#include <jrnl/jerrno.hpp>
#include <jrnl/jexception.hpp>
+#include <pthread.h>
+#include <sstream>
namespace rhm
{
@@ -50,11 +50,11 @@
public:
inline slock(pthread_mutex_t* m) : _m(m)
{
- PTHREAD_CHK(pthread_mutex_lock(_m), "pthread_mutex_lock", "slock", "slock");
+ PTHREAD_CHK(::pthread_mutex_lock(_m), "pthread_mutex_lock", "slock", "slock");
}
inline ~slock()
{
- PTHREAD_CHK(pthread_mutex_unlock(_m), "pthread_mutex_unlock", "slock", "~slock");
+ PTHREAD_CHK(::pthread_mutex_unlock(_m), "pthread_mutex_unlock", "slock", "~slock");
}
};
@@ -67,7 +67,7 @@
public:
inline stlock(pthread_mutex_t* m) : _m(m), _locked(false)
{
- int ret = pthread_mutex_trylock(_m);
+ int ret = ::pthread_mutex_trylock(_m);
_locked = (ret == 0); // check if lock obtained
if (!_locked && ret != EBUSY) PTHREAD_CHK(ret, "pthread_mutex_trylock", "stlock",
"stlock");
@@ -75,7 +75,8 @@
inline ~stlock()
{
if (_locked)
- PTHREAD_CHK(pthread_mutex_unlock(_m), "pthread_mutex_unlock", "stlock", "~stlock");
+ PTHREAD_CHK(::pthread_mutex_unlock(_m), "pthread_mutex_unlock", "stlock",
+ "~stlock");
}
inline const bool locked() const { return _locked; }
};
Added: store/trunk/cpp/lib/jrnl/time_ns.cpp
===================================================================
--- store/trunk/cpp/lib/jrnl/time_ns.cpp (rev 0)
+++ store/trunk/cpp/lib/jrnl/time_ns.cpp 2008-02-06 15:39:52 UTC (rev 1645)
@@ -0,0 +1,47 @@
+/**
+* \file time_ns.cpp
+*
+* Copyright 2008 Red Hat, Inc.
+*
+* This file is part of Red Hat Messaging.
+*
+* Red Hat Messaging is free software; you can redistribute it and/or
+* modify it under the terms of the GNU Lesser General Public
+* License as published by the Free Software Foundation; either
+* version 2.1 of the License, or (at your option) any later version.
+*
+* This library is distributed in the hope that it will be useful,
+* but WITHOUT ANY WARRANTY; without even the implied warranty of
+* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+* Lesser General Public License for more details.
+*
+* You should have received a copy of the GNU Lesser General Public
+* License along with this library; if not, write to the Free Software
+* Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301
+* USA
+*
+* The GNU Lesser General Public License is available in the file COPYING.
+*/
+
+#include "time_ns.hpp"
+#include <sstream>
+
+namespace rhm
+{
+namespace journal
+{
+
+/**
+* \brief Format this time as decimal seconds in fixed-point notation.
+*
+* The value is converted to a double (tv_sec + tv_nsec/1e9) before formatting, so the
+* output is rounded to \a precision digits after the decimal point.
+*
+* \param precision Number of digits to print after the decimal point.
+* \return String representation of the time in seconds.
+*/
+const std::string
+time_ns::str(int precision) const
+{
+    std::ostringstream text;
+    text.precision(precision);
+    text << std::fixed << (tv_sec + (tv_nsec/1e9));
+    return text.str();
+}
+
+
+} // namespace journal
+} // namespace rhm
Added: store/trunk/cpp/lib/jrnl/time_ns.hpp
===================================================================
--- store/trunk/cpp/lib/jrnl/time_ns.hpp (rev 0)
+++ store/trunk/cpp/lib/jrnl/time_ns.hpp 2008-02-06 15:39:52 UTC (rev 1645)
@@ -0,0 +1,97 @@
+/**
+* \file time_ns.hpp
+*
+* Copyright 2008 Red Hat, Inc.
+*
+* This file is part of Red Hat Messaging.
+*
+* Red Hat Messaging is free software; you can redistribute it and/or
+* modify it under the terms of the GNU Lesser General Public
+* License as published by the Free Software Foundation; either
+* version 2.1 of the License, or (at your option) any later version.
+*
+* This library is distributed in the hope that it will be useful,
+* but WITHOUT ANY WARRANTY; without even the implied warranty of
+* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+* Lesser General Public License for more details.
+*
+* You should have received a copy of the GNU Lesser General Public
+* License along with this library; if not, write to the Free Software
+* Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301
+* USA
+*
+* The GNU Lesser General Public License is available in the file COPYING.
+*/
+#ifndef rhm_jtt_time_ns_hpp
+#define rhm_jtt_time_ns_hpp
+
+#include <errno.h>
+#include <string>
+#include <time.h>
+
+namespace rhm
+{
+namespace journal
+{
+
+/**
+* \brief Nanosecond-resolution time value derived directly from the POSIX timespec struct,
+* so an instance can be passed wherever a timespec is expected.
+*
+* The arithmetic operators leave tv_nsec in the range [0, 1000000000) with the whole
+* seconds carried into tv_sec. The constructors store their arguments unchanged.
+*/
+struct time_ns : public timespec
+{
+    /// Construct a zero time.
+    inline time_ns() { tv_sec = 0; tv_nsec = 0; }
+    /// Construct from seconds and optional nanoseconds; values are stored as given.
+    inline time_ns(const time_t sec, const long nsec = 0) { tv_sec = sec; tv_nsec = nsec; }
+    inline time_ns(const time_ns& t) { tv_sec = t.tv_sec; tv_nsec = t.tv_nsec; }
+
+    inline void set_zero() { tv_sec = 0; tv_nsec = 0; }
+    inline bool is_zero() const { return tv_sec == 0 && tv_nsec == 0; }
+    /// Set to the current CLOCK_REALTIME time. Returns 0 on success, otherwise errno.
+    inline int now() { if(::clock_gettime(CLOCK_REALTIME, this)) return errno; return 0; }
+    const std::string str(int precision = 6) const;
+
+    inline time_ns& operator=(const time_ns& rhs)
+        { tv_sec = rhs.tv_sec; tv_nsec = rhs.tv_nsec; return *this; }
+    inline time_ns& operator+=(const time_ns& rhs)
+    {
+        // Both fields of rhs are read before any carry is applied, so t += t is correct.
+        tv_sec += rhs.tv_sec;
+        tv_nsec += rhs.tv_nsec;
+        normalize();
+        return *this;
+    }
+    /// Add \a ns nanoseconds; \a ns may be negative or span more than one second.
+    inline time_ns& operator+=(const long ns)
+    {
+        // Split off whole seconds first so the tv_nsec addition cannot overflow a long.
+        tv_sec += ns / NS_PER_SEC;
+        tv_nsec += ns % NS_PER_SEC;
+        normalize();
+        return *this;
+    }
+    /// Subtract \a ns nanoseconds; \a ns may be negative or span more than one second.
+    inline time_ns& operator-=(const long ns)
+    {
+        tv_sec -= ns / NS_PER_SEC;
+        tv_nsec -= ns % NS_PER_SEC;
+        normalize();
+        return *this;
+    }
+    inline time_ns& operator-=(const time_ns& rhs)
+    {
+        tv_sec -= rhs.tv_sec;
+        tv_nsec -= rhs.tv_nsec;
+        normalize();
+        return *this;
+    }
+    inline const time_ns operator+(const time_ns& rhs) const
+        { time_ns t(*this); t += rhs; return t; }
+    inline const time_ns operator-(const time_ns& rhs) const
+        { time_ns t(*this); t -= rhs; return t; }
+    inline bool operator==(const time_ns& rhs) const
+        { return tv_sec == rhs.tv_sec && tv_nsec == rhs.tv_nsec; }
+    inline bool operator!=(const time_ns& rhs) const
+        { return tv_sec != rhs.tv_sec || tv_nsec != rhs.tv_nsec; }
+    inline bool operator>(const time_ns& rhs) const
+        { if(tv_sec == rhs.tv_sec) return tv_nsec > rhs.tv_nsec; return tv_sec > rhs.tv_sec; }
+    inline bool operator>=(const time_ns& rhs) const
+        { if(tv_sec == rhs.tv_sec) return tv_nsec >= rhs.tv_nsec; return tv_sec >= rhs.tv_sec; }
+    inline bool operator<(const time_ns& rhs) const
+        { if(tv_sec == rhs.tv_sec) return tv_nsec < rhs.tv_nsec; return tv_sec < rhs.tv_sec; }
+    inline bool operator<=(const time_ns& rhs) const
+        { if(tv_sec == rhs.tv_sec) return tv_nsec <= rhs.tv_nsec; return tv_sec <= rhs.tv_sec; }
+
+private:
+    enum { NS_PER_SEC = 1000000000 }; ///< Nanoseconds in one second
+
+    /// Move whole seconds out of tv_nsec so that 0 <= tv_nsec < NS_PER_SEC.
+    inline void normalize()
+    {
+        if (tv_nsec >= NS_PER_SEC || tv_nsec < 0)
+        {
+            tv_sec += tv_nsec / NS_PER_SEC;
+            tv_nsec %= NS_PER_SEC;
+            // The remainder may be negative after integer division; borrow one second.
+            if (tv_nsec < 0) { tv_sec--; tv_nsec += NS_PER_SEC; }
+        }
+    }
+};
+
+} // namespace journal
+} // namespace rhm
+
+#endif // ifndef rhm_jtt_time_ns_hpp
Modified: store/trunk/cpp/lib/jrnl/wmgr.cpp
===================================================================
--- store/trunk/cpp/lib/jrnl/wmgr.cpp 2008-02-06 14:58:42 UTC (rev 1644)
+++ store/trunk/cpp/lib/jrnl/wmgr.cpp 2008-02-06 15:39:52 UTC (rev 1645)
@@ -37,8 +37,6 @@
#include <jrnl/jcntl.hpp>
#include <jrnl/jerrno.hpp>
-#include <iostream> // debug
-
namespace rhm
{
namespace journal
@@ -91,7 +89,7 @@
}
void
-wmgr::initialize(const aio_cb wr_cb, const u_int32_t max_dtokpp, const u_int32_t max_iowait_us,
+wmgr::initialize(const wr_aio_cb wr_cb, const u_int32_t max_dtokpp, const u_int32_t max_iowait_us,
size_t eo)
{
_enq_busy = false;
@@ -758,8 +756,6 @@
int ret = 0;
if ((ret = ::io_getevents(_ioctx, 0, JRNL_RMGR_PAGES + JRNL_WMGR_PAGES, _ioevt_arr, 0)) < 0)
{
- if (ret == -EINTR) // No events
- return 0;
std::ostringstream oss;
oss << "io_getevents() failed: " << strerror(-ret) << " (" << ret << ")";
throw jexception(jerrno::JERR__AIO, oss.str(), "wmgr", "get_events");
@@ -781,8 +777,15 @@
if (aioret < 0)
{
std::ostringstream oss;
- oss << "AIO write operation failed: " << strerror(-aioret) << " (" << aioret << ")";
- oss << " [pg=" << pcbp->_index << " size=" << iocbp->u.c.nbytes;
+ oss << "AIO write operation failed: " << strerror(-aioret) << " (" << aioret << ") [";
+ if (pcbp)
+ oss << "pg=" << pcbp->_index;
+ else
+ {
+ file_hdr* fhp = (file_hdr*)iocbp->u.c.buf;
+ oss << "fid=" << fhp->_fid;
+ }
+ oss << " size=" << iocbp->u.c.nbytes;
oss << " offset=" << iocbp->u.c.offset << " fh=" << iocbp->aio_fildes << "]";
throw jexception(jerrno::JERR__AIO, oss.str(), "wmgr", "get_events");
}
Modified: store/trunk/cpp/lib/jrnl/wmgr.hpp
===================================================================
--- store/trunk/cpp/lib/jrnl/wmgr.hpp 2008-02-06 14:58:42 UTC (rev 1644)
+++ store/trunk/cpp/lib/jrnl/wmgr.hpp 2008-02-06 15:39:52 UTC (rev 1645)
@@ -39,6 +39,7 @@
}
}
+#include <jrnl/aio_cb.hpp>
#include <jrnl/enums.hpp>
#include <jrnl/pmgr.hpp>
#include <jrnl/wrfc.hpp>
@@ -93,7 +94,7 @@
deq_rec _deq_rec; ///< Dequeue record used for encoding/decoding
txn_rec _txn_rec; ///< Transaction record used for encoding/decoding
std::set<std::string> _txn_pending_set; ///< Set containing xids of pending commits/aborts
- aio_cb _cb; ///< Callback function pointer for AIO events
+ wr_aio_cb _cb; ///< Callback function pointer for AIO events
public:
wmgr(jcntl* jc, enq_map& emap, txn_map& tmap, wrfc& wrfc);
@@ -101,7 +102,7 @@
const u_int32_t max_iowait_us);
virtual ~wmgr();
- void initialize(aio_cb wr_cb, const u_int32_t max_dtokpp, const u_int32_t max_iowait_us,
+ void initialize(wr_aio_cb wr_cb, const u_int32_t max_dtokpp, const u_int32_t max_iowait_us,
size_t eo = 0);
const iores enqueue(const void* const data_buff, const size_t tot_data_len,
const size_t this_data_len, data_tok* dtokp, const void* const xid_ptr,
Property changes on: store/trunk/cpp/tests/jrnl
___________________________________________________________________
Name: svn:ignore
- .deps
.libs
Makefile
Makefile.in
jtest
_ut_enq_map
_ut_jdir
_ut_jerrno
_ut_jexception
_ut_jinf
_ut_rec_hdr
_ut_txn_map
+ .deps
.libs
Makefile
Makefile.in
jtest
_ut_enq_map
_ut_jdir
_ut_jerrno
_ut_jexception
_ut_jinf
_ut_rec_hdr
_ut_time_ns
_ut_txn_map
Modified: store/trunk/cpp/tests/jrnl/JournalSystemTests.cpp
===================================================================
--- store/trunk/cpp/tests/jrnl/JournalSystemTests.cpp 2008-02-06 14:58:42 UTC (rev 1644)
+++ store/trunk/cpp/tests/jrnl/JournalSystemTests.cpp 2008-02-06 15:39:52 UTC (rev 1645)
@@ -647,14 +647,14 @@
void
JournalSystemTests::jrnl_init(rhm::journal::jcntl* jc)
{
- jc->initialize(0);
+ jc->initialize(0, 0);
}
void
JournalSystemTests::jrnl_recover(rhm::journal::jcntl* jc, vector<string> txn_list,
u_int64_t& highest_rid)
{
- jc->recover(0, txn_list, highest_rid);
+ jc->recover(0, 0, txn_list, highest_rid);
}
void
Modified: store/trunk/cpp/tests/jrnl/Makefile.am
===================================================================
--- store/trunk/cpp/tests/jrnl/Makefile.am 2008-02-06 14:58:42 UTC (rev 1644)
+++ store/trunk/cpp/tests/jrnl/Makefile.am 2008-02-06 15:39:52 UTC (rev 1645)
@@ -28,6 +28,7 @@
# SUBDIRS = jtt
TESTS = \
+ _ut_time_ns \
_ut_jexception \
_ut_jerrno \
_ut_rec_hdr \
@@ -42,6 +43,7 @@
libdlclose_noop.la
check_PROGRAMS = \
+ _ut_time_ns \
_ut_jexception \
_ut_jerrno \
_ut_rec_hdr \
@@ -53,6 +55,9 @@
UNIT_TEST_SRCS = ../unit_test.cpp
UNIT_TEST_LDADD = -lboost_unit_test_framework -lbdbstore -L../../lib/.libs
+_ut_time_ns_SOURCES = _ut_time_ns.cpp $(UNIT_TEST_SRCS)
+_ut_time_ns_LDFLAGS = $(UNIT_TEST_LDADD)
+
_ut_jexception_SOURCES = _ut_jexception.cpp $(UNIT_TEST_SRCS)
_ut_jexception_LDFLAGS = $(UNIT_TEST_LDADD)
Added: store/trunk/cpp/tests/jrnl/_ut_time_ns.cpp
===================================================================
--- store/trunk/cpp/tests/jrnl/_ut_time_ns.cpp (rev 0)
+++ store/trunk/cpp/tests/jrnl/_ut_time_ns.cpp 2008-02-06 15:39:52 UTC (rev 1645)
@@ -0,0 +1,151 @@
+/**
+* Copyright 2008 Red Hat, Inc.
+*
+* This file is part of Red Hat Messaging.
+*
+* Red Hat Messaging is free software; you can redistribute it and/or
+* modify it under the terms of the GNU Lesser General Public
+* License as published by the Free Software Foundation; either
+* version 2.1 of the License, or (at your option) any later version.
+*
+* This library is distributed in the hope that it will be useful,
+* but WITHOUT ANY WARRANTY; without even the implied warranty of
+* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+* Lesser General Public License for more details.
+*
+* You should have received a copy of the GNU Lesser General Public
+* License along with this library; if not, write to the Free Software
+* Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301
+* USA
+*
+* The GNU Lesser General Public License is available in the file COPYING.
+*/
+
+#include "../unit_test.h"
+#include <jrnl/time_ns.hpp>
+
+using namespace boost::unit_test;
+using namespace rhm::journal;
+
+QPID_AUTO_TEST_SUITE(jtt_time_ns)
+
+// Checks the default, (sec, nsec) and copy constructors, together with set_zero() and
+// is_zero().
+BOOST_AUTO_TEST_CASE(time_ns_constructors)
+{
+    const time_t secs = 123;
+    const long nanos = 123456789;
+
+    // Default construction gives a zero time
+    time_ns dflt;
+    BOOST_CHECK_EQUAL(dflt.tv_sec, 0);
+    BOOST_CHECK_EQUAL(dflt.tv_nsec, 0);
+    BOOST_CHECK_EQUAL(dflt.is_zero(), true);
+
+    // Explicit seconds/nanoseconds are stored as given
+    time_ns explicit_val(secs, nanos);
+    BOOST_CHECK_EQUAL(explicit_val.tv_sec, secs);
+    BOOST_CHECK_EQUAL(explicit_val.tv_nsec, nanos);
+    BOOST_CHECK_EQUAL(explicit_val.is_zero(), false);
+
+    // Copy of a zero time
+    time_ns zero_copy(dflt);
+    BOOST_CHECK_EQUAL(zero_copy.tv_sec, 0);
+    BOOST_CHECK_EQUAL(zero_copy.tv_nsec, 0);
+    BOOST_CHECK_EQUAL(zero_copy.is_zero(), true);
+
+    // Copy of a non-zero time
+    time_ns val_copy(explicit_val);
+    BOOST_CHECK_EQUAL(val_copy.tv_sec, secs);
+    BOOST_CHECK_EQUAL(val_copy.tv_nsec, nanos);
+    BOOST_CHECK_EQUAL(val_copy.is_zero(), false);
+
+    // set_zero() clears both fields
+    val_copy.set_zero();
+    BOOST_CHECK_EQUAL(val_copy.tv_sec, 0);
+    BOOST_CHECK_EQUAL(val_copy.tv_nsec, 0);
+    BOOST_CHECK_EQUAL(val_copy.is_zero(), true);
+}
+
+// Checks assignment, comparison and arithmetic operators, including the nanosecond
+// carry/borrow cases and the scalar (long nanosecond) overloads of += and -=.
+BOOST_AUTO_TEST_CASE(time_ns_operators)
+{
+    const time_t sec1 = 123;
+    const long nsec1 = 123456789;
+    const time_t sec2 = 1;
+    const long nsec2 = 999999999;
+    // nsec1 + nsec2 exceeds one second, so the sum carries into the seconds field
+    const time_t sec_sum = sec1 + sec2 + 1;
+    const long nsec_sum = nsec1 + nsec2 - 1000000000;
+    // nsec1 - nsec2 is negative, so the difference borrows from the seconds field
+    const time_t sec_1_minus_2 = sec1 - sec2 - 1;
+    const long nsec_1_minus_2 = nsec1 - nsec2 + 1000000000;
+    const time_t sec_2_minus_1 = sec2 - sec1;
+    const long nsec_2_minus_1 = nsec2 - nsec1;
+    time_ns z;
+    time_ns t1(sec1, nsec1);
+    time_ns t2(sec2, nsec2);
+
+    time_ns t3 = z;
+    BOOST_CHECK_EQUAL(t3.tv_sec, 0);
+    BOOST_CHECK_EQUAL(t3.tv_nsec, 0);
+    BOOST_CHECK_EQUAL(t3 == z, true);
+    BOOST_CHECK_EQUAL(t3 != z, false);
+    BOOST_CHECK_EQUAL(t3 > z, false);
+    BOOST_CHECK_EQUAL(t3 >= z, true);
+    BOOST_CHECK_EQUAL(t3 < z, false);
+    BOOST_CHECK_EQUAL(t3 <= z, true);
+
+    t3 = t1;
+    BOOST_CHECK_EQUAL(t3.tv_sec, sec1);
+    BOOST_CHECK_EQUAL(t3.tv_nsec, nsec1);
+    BOOST_CHECK_EQUAL(t3 == t1, true);
+    BOOST_CHECK_EQUAL(t3 != t1, false);
+    BOOST_CHECK_EQUAL(t3 > t1, false);
+    BOOST_CHECK_EQUAL(t3 >= t1, true);
+    BOOST_CHECK_EQUAL(t3 < t1, false);
+    BOOST_CHECK_EQUAL(t3 <= t1, true);
+
+    t3 += z;
+    BOOST_CHECK_EQUAL(t3.tv_sec, sec1);
+    BOOST_CHECK_EQUAL(t3.tv_nsec, nsec1);
+
+    t3 = t2;
+    BOOST_CHECK_EQUAL(t3.tv_sec, sec2);
+    BOOST_CHECK_EQUAL(t3.tv_nsec, nsec2);
+    BOOST_CHECK_EQUAL(t3 == t2, true);
+    BOOST_CHECK_EQUAL(t3 != t2, false);
+    BOOST_CHECK_EQUAL(t3 > t2, false);
+    BOOST_CHECK_EQUAL(t3 >= t2, true);
+    BOOST_CHECK_EQUAL(t3 < t2, false);
+    BOOST_CHECK_EQUAL(t3 <= t2, true);
+
+    t3 += z;
+    BOOST_CHECK_EQUAL(t3.tv_sec, sec2);
+    BOOST_CHECK_EQUAL(t3.tv_nsec, nsec2);
+
+    // Comparisons between unequal values: seconds differ
+    BOOST_CHECK_EQUAL(t1 == t2, false);
+    BOOST_CHECK_EQUAL(t1 != t2, true);
+    BOOST_CHECK_EQUAL(t1 > t2, true);
+    BOOST_CHECK_EQUAL(t1 >= t2, true);
+    BOOST_CHECK_EQUAL(t1 < t2, false);
+    BOOST_CHECK_EQUAL(t1 <= t2, false);
+
+    // Comparisons between unequal values: same seconds, nanoseconds differ
+    time_ns t5(sec2, 1);
+    BOOST_CHECK_EQUAL(t5 == t2, false);
+    BOOST_CHECK_EQUAL(t5 != t2, true);
+    BOOST_CHECK_EQUAL(t5 > t2, false);
+    BOOST_CHECK_EQUAL(t5 >= t2, false);
+    BOOST_CHECK_EQUAL(t5 < t2, true);
+    BOOST_CHECK_EQUAL(t5 <= t2, true);
+
+    t3 = t1;
+    t3 += t2;
+    BOOST_CHECK_EQUAL(t3.tv_sec, sec_sum);
+    BOOST_CHECK_EQUAL(t3.tv_nsec, nsec_sum);
+
+    t3 = t1;
+    t3 -= t2;
+    BOOST_CHECK_EQUAL(t3.tv_sec, sec_1_minus_2);
+    BOOST_CHECK_EQUAL(t3.tv_nsec, nsec_1_minus_2);
+
+    t3 = t2;
+    t3 -= t1;
+    BOOST_CHECK_EQUAL(t3.tv_sec, sec_2_minus_1);
+    BOOST_CHECK_EQUAL(t3.tv_nsec, nsec_2_minus_1);
+
+    t3 = t1 + t2;
+    BOOST_CHECK_EQUAL(t3.tv_sec, sec_sum);
+    BOOST_CHECK_EQUAL(t3.tv_nsec, nsec_sum);
+
+    t3 = t1 - t2;
+    BOOST_CHECK_EQUAL(t3.tv_sec, sec_1_minus_2);
+    BOOST_CHECK_EQUAL(t3.tv_nsec, nsec_1_minus_2);
+
+    t3 = t2 - t1;
+    BOOST_CHECK_EQUAL(t3.tv_sec, sec_2_minus_1);
+    BOOST_CHECK_EQUAL(t3.tv_nsec, nsec_2_minus_1);
+
+    // Scalar nanosecond overloads: adding nsec2 carries one second
+    t3 = t1;
+    t3 += nsec2;
+    BOOST_CHECK_EQUAL(t3.tv_sec, sec1 + 1);
+    BOOST_CHECK_EQUAL(t3.tv_nsec, nsec_sum);
+
+    // Scalar nanosecond overloads: subtracting nsec2 borrows one second
+    t3 = t1;
+    t3 -= nsec2;
+    BOOST_CHECK_EQUAL(t3.tv_sec, sec1 - 1);
+    BOOST_CHECK_EQUAL(t3.tv_nsec, nsec_1_minus_2);
+}
+
+// Checks str() output at the default and at explicit precisions, including rounding and a
+// negative seconds value.
+BOOST_AUTO_TEST_CASE(time_ns_str)
+{
+    // Default precision (6) rounds the nanosecond part; 9 shows it in full; 0 drops it
+    time_ns typical(123, 123456789);
+    BOOST_CHECK_EQUAL(typical.str(), "123.123457");
+    BOOST_CHECK_EQUAL(typical.str(9), "123.123456789");
+    BOOST_CHECK_EQUAL(typical.str(0), "123");
+
+    // A single nanosecond is left-padded with zeros
+    time_ns one_ns_past(1, 1);
+    BOOST_CHECK_EQUAL(one_ns_past.str(9), "1.000000001");
+
+    // Negative seconds with positive nanoseconds: -12s + 345ns
+    time_ns negative(-12, 345);
+    BOOST_CHECK_EQUAL(negative.str(9), "-11.999999655");
+}
Modified: store/trunk/cpp/tests/jrnl/jtt/Makefile.am
===================================================================
--- store/trunk/cpp/tests/jrnl/jtt/Makefile.am 2008-02-06 14:58:42 UTC (rev 1644)
+++ store/trunk/cpp/tests/jrnl/jtt/Makefile.am 2008-02-06 15:39:52 UTC (rev 1645)
@@ -76,6 +76,7 @@
${LIBOBJDIR}/pmgr.o \
${LIBOBJDIR}/rmgr.o \
${LIBOBJDIR}/rrfc.o \
+ ${LIBOBJDIR}/time_ns.o \
${LIBOBJDIR}/txn_map.o \
${LIBOBJDIR}/txn_rec.o \
${LIBOBJDIR}/wmgr.o \
@@ -122,6 +123,7 @@
${LIBOBJDIR}/pmgr.o \
${LIBOBJDIR}/rmgr.o \
${LIBOBJDIR}/rrfc.o \
+ ${LIBOBJDIR}/time_ns.o \
${LIBOBJDIR}/txn_map.o \
${LIBOBJDIR}/txn_rec.o \
${LIBOBJDIR}/wmgr.o \
@@ -129,23 +131,32 @@
_ut_test_case_SOURCES = _ut_test_case.cpp ../../unit_test.cpp
_ut_test_case_LDFLAGS = -lboost_unit_test_framework -lrt
-_ut_test_case_LDADD = test_case.o test_case_result.o test_case_result_agregation.o
+_ut_test_case_LDADD = test_case.o test_case_result.o test_case_result_agregation.o \
+ ${LIBOBJDIR}/time_ns.o
+
_ut_test_case_result_SOURCES = _ut_test_case_result.cpp ../../unit_test.cpp
_ut_test_case_result_LDFLAGS = -lboost_unit_test_framework -lrt
_ut_test_case_result_LDADD = test_case_result.o \
${LIBOBJDIR}/jerrno.o \
- ${LIBOBJDIR}/jexception.o
+ ${LIBOBJDIR}/jexception.o \
+ ${LIBOBJDIR}/time_ns.o
_ut_test_case_result_agregation_SOURCES = _ut_test_case_result_agregation.cpp ../../unit_test.cpp
_ut_test_case_result_agregation_LDFLAGS = -lboost_unit_test_framework -lrt
_ut_test_case_result_agregation_LDADD = test_case_result.o test_case_result_agregation.o \
${LIBOBJDIR}/jerrno.o \
- ${LIBOBJDIR}/jexception.o
+ ${LIBOBJDIR}/jexception.o \
+ ${LIBOBJDIR}/time_ns.o
_ut_test_case_set_SOURCES = _ut_test_case_set.cpp ../../unit_test.cpp
_ut_test_case_set_LDFLAGS = -lboost_unit_test_framework -lrt
-_ut_test_case_set_LDADD = test_case.o test_case_set.o test_case_result.o test_case_result_agregation.o
+_ut_test_case_set_LDADD = \
+ test_case.o \
+ test_case_set.o \
+ test_case_result.o \
+ test_case_result_agregation.o \
+ ${LIBOBJDIR}/time_ns.o
EXTRA_DIST = \
jtt.csv \
Modified: store/trunk/cpp/tests/jrnl/jtt/_ut_test_case_result.cpp
===================================================================
--- store/trunk/cpp/tests/jrnl/jtt/_ut_test_case_result.cpp 2008-02-06 14:58:42 UTC (rev 1644)
+++ store/trunk/cpp/tests/jrnl/jtt/_ut_test_case_result.cpp 2008-02-06 15:39:52 UTC (rev 1645)
@@ -38,15 +38,12 @@
BOOST_CHECK_EQUAL(tcr.jid(), jid);
BOOST_CHECK_EQUAL(tcr.exception(), false);
BOOST_CHECK_EQUAL(tcr.exception_count(), 0U);
- const timespec& ts1 = tcr.start_time();
- BOOST_CHECK_EQUAL(ts1.tv_sec, 0);
- BOOST_CHECK_EQUAL(ts1.tv_nsec, 0);
- const timespec& ts2 = tcr.stop_time();
- BOOST_CHECK_EQUAL(ts2.tv_sec, 0);
- BOOST_CHECK_EQUAL(ts2.tv_nsec, 0);
- const timespec& ts3 = tcr.test_time();
- BOOST_CHECK_EQUAL(ts3.tv_sec, 0);
- BOOST_CHECK_EQUAL(ts3.tv_nsec, 0);
+ const time_ns& ts1 = tcr.start_time();
+ BOOST_CHECK(ts1.is_zero());
+ const time_ns& ts2 = tcr.stop_time();
+ BOOST_CHECK(ts2.is_zero());
+ const time_ns& ts3 = tcr.test_time();
+ BOOST_CHECK(ts3.is_zero());
}
BOOST_AUTO_TEST_CASE(test_case_result_start_stop)
@@ -55,37 +52,30 @@
test_case_result tcr(jid);
BOOST_CHECK_EQUAL(tcr.exception(), false);
BOOST_CHECK_EQUAL(tcr.exception_count(), 0U);
- const timespec& ts1 = tcr.start_time();
- BOOST_CHECK_EQUAL(ts1.tv_sec, 0);
- BOOST_CHECK_EQUAL(ts1.tv_nsec, 0);
- const timespec& ts2 = tcr.stop_time();
- BOOST_CHECK_EQUAL(ts2.tv_sec, 0);
- BOOST_CHECK_EQUAL(ts2.tv_nsec, 0);
- const timespec& ts3 = tcr.test_time();
- BOOST_CHECK_EQUAL(ts3.tv_sec, 0);
- BOOST_CHECK_EQUAL(ts3.tv_nsec, 0);
+ const time_ns& ts1 = tcr.start_time();
+ BOOST_CHECK(ts1.is_zero());
+ const time_ns& ts2 = tcr.stop_time();
+ BOOST_CHECK(ts2.is_zero());
+ const time_ns& ts3 = tcr.test_time();
+ BOOST_CHECK(ts3.is_zero());
tcr.set_start_time();
BOOST_CHECK_EQUAL(tcr.exception(), false);
BOOST_CHECK_EQUAL(tcr.exception_count(), 0U);
- const timespec& ts4 = tcr.start_time();
- BOOST_CHECK(ts4.tv_sec > 0);
- BOOST_CHECK(ts4.tv_nsec > 0);
- const timespec& ts5 = tcr.stop_time();
- BOOST_CHECK_EQUAL(ts5.tv_sec, 0);
- BOOST_CHECK_EQUAL(ts5.tv_nsec, 0);
- const timespec& ts6 = tcr.test_time();
- BOOST_CHECK_EQUAL(ts6.tv_sec, 0);
- BOOST_CHECK_EQUAL(ts6.tv_nsec, 0);
+ const time_ns& ts4 = tcr.start_time();
+ BOOST_CHECK(!ts4.is_zero());
+ const time_ns& ts5 = tcr.stop_time();
+ BOOST_CHECK(ts5.is_zero());
+ const time_ns& ts6 = tcr.test_time();
+ BOOST_CHECK(ts6.is_zero());
::usleep(1100000); // 1.1 sec in microseconds
tcr.set_stop_time();
BOOST_CHECK_EQUAL(tcr.exception(), false);
BOOST_CHECK_EQUAL(tcr.exception_count(), 0U);
- const timespec& ts7 = tcr.stop_time();
- BOOST_CHECK(ts7.tv_sec > 0);
- BOOST_CHECK(ts7.tv_nsec > 0);
- const timespec& ts8 = tcr.test_time();
+ const time_ns& ts7 = tcr.stop_time();
+ BOOST_CHECK(!ts7.is_zero());
+ const time_ns& ts8 = tcr.test_time();
BOOST_CHECK(ts8.tv_sec == 1);
BOOST_CHECK(ts8.tv_nsec > 100000000); // 0.1 sec in nanoseconds
BOOST_CHECK(ts8.tv_nsec < 200000000); // 0.2 sec in nanoseconds
@@ -104,10 +94,9 @@
BOOST_CHECK_EQUAL(tcr.exception(), true);
BOOST_CHECK_EQUAL(tcr.exception_count(), 1U);
BOOST_CHECK_EQUAL(tcr[0], e.what());
- const timespec& ts1 = tcr.stop_time();
- BOOST_CHECK(ts1.tv_sec > 0);
- BOOST_CHECK(ts1.tv_nsec > 0);
- const timespec& ts2 = tcr.test_time();
+ const time_ns& ts1 = tcr.stop_time();
+ BOOST_CHECK(!ts1.is_zero());
+ const time_ns& ts2 = tcr.test_time();
BOOST_CHECK(ts2.tv_sec == 1);
BOOST_CHECK(ts2.tv_nsec > 100000000); // 0.1 sec in nanoseconds
BOOST_CHECK(ts2.tv_nsec < 200000000); // 0.2 sec in nanoseconds
@@ -124,10 +113,9 @@
BOOST_CHECK_EQUAL(tcr.exception(), true);
BOOST_CHECK_EQUAL(tcr.exception_count(), 1U);
BOOST_CHECK_EQUAL(tcr[0], err_msg);
- const timespec& ts1 = tcr.stop_time();
- BOOST_CHECK(ts1.tv_sec > 0);
- BOOST_CHECK(ts1.tv_nsec > 0);
- const timespec& ts2 = tcr.test_time();
+ const time_ns& ts1 = tcr.stop_time();
+ BOOST_CHECK(!ts1.is_zero());
+ const time_ns& ts2 = tcr.test_time();
BOOST_CHECK(ts2.tv_sec == 1);
BOOST_CHECK(ts2.tv_nsec > 100000000); // 0.1 sec in nanoseconds
BOOST_CHECK(ts2.tv_nsec < 200000000); // 0.2 sec in nanoseconds
@@ -144,10 +132,9 @@
BOOST_CHECK_EQUAL(tcr.exception(), true);
BOOST_CHECK_EQUAL(tcr.exception_count(), 1U);
BOOST_CHECK_EQUAL(tcr[0], err_msg);
- const timespec& ts1 = tcr.stop_time();
- BOOST_CHECK(ts1.tv_sec > 0);
- BOOST_CHECK(ts1.tv_nsec > 0);
- const timespec& ts2 = tcr.test_time();
+ const time_ns& ts1 = tcr.stop_time();
+ BOOST_CHECK(!ts1.is_zero());
+ const time_ns& ts2 = tcr.test_time();
BOOST_CHECK(ts2.tv_sec == 1);
BOOST_CHECK(ts2.tv_nsec > 100000000); // 0.1 sec in nanoseconds
BOOST_CHECK(ts2.tv_nsec < 200000000); // 0.2 sec in nanoseconds
@@ -166,12 +153,10 @@
BOOST_CHECK_EQUAL(tcr.exception(), true);
BOOST_CHECK_EQUAL(tcr.exception_count(), 1U);
BOOST_CHECK_EQUAL(tcr[0], e.what());
- const timespec& ts1 = tcr.stop_time();
- BOOST_CHECK_EQUAL(ts1.tv_sec, 0);
- BOOST_CHECK_EQUAL(ts1.tv_nsec, 0);
- const timespec& ts2 = tcr.test_time();
- BOOST_CHECK_EQUAL(ts2.tv_sec, 0);
- BOOST_CHECK_EQUAL(ts2.tv_nsec, 0);
+ const time_ns& ts1 = tcr.stop_time();
+ BOOST_CHECK(ts1.is_zero());
+ const time_ns& ts2 = tcr.test_time();
+ BOOST_CHECK(ts2.is_zero());
}
BOOST_AUTO_TEST_CASE(test_case_counters)
Modified: store/trunk/cpp/tests/jrnl/jtt/_ut_test_case_result_agregation.cpp
===================================================================
--- store/trunk/cpp/tests/jrnl/jtt/_ut_test_case_result_agregation.cpp 2008-02-06 14:58:42 UTC (rev 1644)
+++ store/trunk/cpp/tests/jrnl/jtt/_ut_test_case_result_agregation.cpp 2008-02-06 15:39:52 UTC (rev 1645)
@@ -46,15 +46,12 @@
BOOST_CHECK_EQUAL(tcra.jid(), "Average");
BOOST_CHECK_EQUAL(tcra.exception(), false);
BOOST_CHECK_EQUAL(tcra.exception_count(), 0U);
- const timespec& ts1 = tcra.start_time();
- BOOST_CHECK_EQUAL(ts1.tv_sec, 0);
- BOOST_CHECK_EQUAL(ts1.tv_nsec, 0);
- const timespec& ts2 = tcra.stop_time();
- BOOST_CHECK_EQUAL(ts2.tv_sec, 0);
- BOOST_CHECK_EQUAL(ts2.tv_nsec, 0);
- const timespec& ts3 = tcra.test_time();
- BOOST_CHECK_EQUAL(ts3.tv_sec, 0);
- BOOST_CHECK_EQUAL(ts3.tv_nsec, 0);
+ const time_ns& ts1 = tcra.start_time();
+ BOOST_CHECK(ts1.is_zero());
+ const time_ns& ts2 = tcra.stop_time();
+ BOOST_CHECK(ts2.is_zero());
+ const time_ns& ts3 = tcra.test_time();
+ BOOST_CHECK(ts3.is_zero());
}
BOOST_AUTO_TEST_CASE(test_case_result_agregation_constructor_2)
@@ -65,15 +62,12 @@
BOOST_CHECK_EQUAL(tcra.jid(), jid);
BOOST_CHECK_EQUAL(tcra.exception(), false);
BOOST_CHECK_EQUAL(tcra.exception_count(), 0U);
- const timespec& ts1 = tcra.start_time();
- BOOST_CHECK_EQUAL(ts1.tv_sec, 0);
- BOOST_CHECK_EQUAL(ts1.tv_nsec, 0);
- const timespec& ts2 = tcra.stop_time();
- BOOST_CHECK_EQUAL(ts2.tv_sec, 0);
- BOOST_CHECK_EQUAL(ts2.tv_nsec, 0);
- const timespec& ts3 = tcra.test_time();
- BOOST_CHECK_EQUAL(ts3.tv_sec, 0);
- BOOST_CHECK_EQUAL(ts3.tv_nsec, 0);
+ const time_ns& ts1 = tcra.start_time();
+ BOOST_CHECK(ts1.is_zero());
+ const time_ns& ts2 = tcra.stop_time();
+ BOOST_CHECK(ts2.is_zero());
+ const time_ns& ts3 = tcra.test_time();
+ BOOST_CHECK(ts3.is_zero());
}
BOOST_AUTO_TEST_CASE(test_case_result_agregation_add_test_case)
@@ -158,7 +152,7 @@
BOOST_CHECK_EQUAL(tcra.num_results(), num_results);
BOOST_CHECK_EQUAL(tcra.exception_count(), num_exceptions);
BOOST_CHECK_EQUAL(tcra.exception(), num_exceptions > 0);
- const timespec& ts1 = tcra.test_time();
+ const time_ns& ts1 = tcra.test_time();
BOOST_CHECK_EQUAL(ts1.tv_sec, secs);
BOOST_CHECK_EQUAL(ts1.tv_nsec, nsec);
}
@@ -173,9 +167,7 @@
tcrp->incr_num_deq();
for (unsigned i=0; i<num_reads; i++)
tcrp->incr_num_read();
- timespec ts;
- ts.tv_sec = secs;
- ts.tv_nsec = nsec;
+ time_ns ts(secs, nsec);
tcrp->set_test_time(ts);
return tcrp;
}
Modified: store/trunk/cpp/tests/jrnl/jtt/jrnl_instance.cpp
===================================================================
--- store/trunk/cpp/tests/jrnl/jtt/jrnl_instance.cpp 2008-02-06 14:58:42 UTC (rev 1644)
+++ store/trunk/cpp/tests/jrnl/jtt/jrnl_instance.cpp 2008-02-06 15:39:52 UTC (rev 1645)
@@ -28,10 +28,10 @@
#include <jrnl/data_tok.hpp>
#include <jrnl/jerrno.hpp>
-#include <iostream>
+#include <iostream> // debug
-#define SLEEP_US 1000 // 1 ms
-#define MAX_SLEEP_CNT 2000 // 2 sec max
+#define MAX_WR_WAIT 10 // in ms
+#define MAX_RD_WAIT 100 // in ms
namespace rhm
{
@@ -47,11 +47,17 @@
_dtok_master_txn_list(),
_dtok_rd_list(),
_dtok_deq_list(),
+ _rd_aio_cv(_rd_aio_mutex),
+ _wr_full_cv(_wr_full_mutex),
+ _rd_list_cv(_rd_list_mutex),
+ _deq_list_cv(_deq_list_mutex),
_tcp(),
_tcrp()
{
- pthread_mutex_init(&_rd_mutex, 0);
- pthread_mutex_init(&_deq_mutex, 0);
+ pthread_mutex_init(&_rd_aio_mutex, 0);
+ pthread_mutex_init(&_wr_full_mutex, 0);
+ pthread_mutex_init(&_rd_list_mutex, 0);
+ pthread_mutex_init(&_deq_list_mutex, 0);
}
jrnl_instance::jrnl_instance(const jrnl_init_params::shared_ptr& p):
@@ -63,17 +69,25 @@
_dtok_master_txn_list(),
_dtok_rd_list(),
_dtok_deq_list(),
+ _rd_aio_cv(_rd_aio_mutex),
+ _wr_full_cv(_wr_full_mutex),
+ _rd_list_cv(_rd_list_mutex),
+ _deq_list_cv(_deq_list_mutex),
_tcp(),
_tcrp()
{
- pthread_mutex_init(&_rd_mutex, 0);
- pthread_mutex_init(&_deq_mutex, 0);
+ pthread_mutex_init(&_rd_aio_mutex, 0);
+ pthread_mutex_init(&_wr_full_mutex, 0);
+ pthread_mutex_init(&_rd_list_mutex, 0);
+ pthread_mutex_init(&_deq_list_mutex, 0);
}
jrnl_instance::~jrnl_instance()
{
- pthread_mutex_destroy(&_rd_mutex);
- pthread_mutex_destroy(&_deq_mutex);
+ pthread_mutex_destroy(&_rd_aio_mutex);
+ pthread_mutex_destroy(&_wr_full_mutex);
+ pthread_mutex_destroy(&_rd_list_mutex);
+ pthread_mutex_destroy(&_deq_list_mutex);
}
@@ -97,19 +111,19 @@
{
std::vector<std::string> prep_txn_list;
u_int64_t highest_rid;
- recover(aio_wr_callback, prep_txn_list, highest_rid);
+ recover(aio_rd_callback, aio_wr_callback, prep_txn_list, highest_rid);
recover_complete();
}
catch (const rhm::journal::jexception& e)
{
if (e.err_code() == rhm::journal::jerrno::JERR_JDIR_STAT)
- initialize(aio_wr_callback);
+ initialize(aio_rd_callback, aio_wr_callback);
else
throw;
}
}
else
- initialize(aio_wr_callback);
+ initialize(aio_rd_callback, aio_wr_callback);
}
catch (const rhm::journal::jexception& e) { _tcrp->add_exception(e); }
catch (const std::exception& e) { _tcrp->add_exception(e.what()); }
@@ -175,33 +189,29 @@
else
res = enqueue_data_record(msgp, msg_size, msg_size, p.get(), transient);
}
- if (res == rhm::journal::RHM_IORES_SUCCESS)
+ switch (res)
{
+ case rhm::journal::RHM_IORES_SUCCESS:
sleep_cnt = 0U;
_tcrp->incr_num_enq();
- }
- else if (res == rhm::journal::RHM_IORES_ENQCAPTHRESH)
- {
- if (sleep_cnt++ > MAX_SLEEP_CNT)
+ if (p->has_xid() && !_tcp->auto_deq())
+ commit(p.get());
+ break;
+ case rhm::journal::RHM_IORES_ENQCAPTHRESH:
+ if (get_wr_events() == 0)
{
- std::ostringstream oss;
- oss << "ERROR: Timeout waiting for journal \"" << _jid;
- oss << "\" to empty. (RHM_IORES_ENQCAPTHRESH)";
- _tcrp->add_exception(oss.str());
+ rhm::journal::slock sl(&_wr_full_mutex);
+ _wr_full_cv.waitintvl(MAX_WR_WAIT * 1000000); // MAX_WR_WAIT in ms
}
- else
- ::usleep(SLEEP_US); // 1ms
- }
- else
- {
+ break;
+ default:
std::ostringstream oss;
oss << "ERROR: enqueue operation in journal \"" << _jid << "\" returned ";
oss << rhm::journal::iores_str(res) << ".";
_tcrp->add_exception(oss.str());
}
-
}
- flush();
+ flush(true);
}
catch (const rhm::journal::jexception& e) { _tcrp->add_exception(e); }
catch (const std::exception& e) { _tcrp->add_exception(e.what()); }
@@ -216,31 +226,21 @@
read_arg::read_mode_t rd_mode = _args_ptr->read_mode.val();
if (rd_mode != read_arg::NONE)
{
- unsigned sleep_cnt = 0U;
while (_tcrp->num_rproc() < _tcp->num_msgs() && !_tcrp->exception())
{
- if (_dtok_rd_list.empty())
+ journal::data_tok* dtokp = 0;
{
- if (sleep_cnt++ > MAX_SLEEP_CNT)
+ rhm::journal::slock sl(&_rd_list_mutex);
+ if (_dtok_rd_list.empty())
+ _rd_list_cv.wait();
+ if (!_dtok_rd_list.empty())
{
- std::ostringstream oss;
- oss << "ERROR: Timeout waiting for enqueue AIO in journal \"" << _jid;
- oss << "\": num_enq=" << _tcp->num_msgs();
- oss << " num_deq=" << _tcrp->num_deq();
- _tcrp->add_exception(oss.str());
- }
- if (get_wr_events() == 0)
- ::usleep(SLEEP_US); // 1ms
- }
- else
- {
- sleep_cnt = 0U;
- journal::data_tok* dtokp;
- {
- rhm::journal::slock sl(&_rd_mutex);
dtokp = _dtok_rd_list.front();
_dtok_rd_list.pop_front();
}
+ }
+ if (dtokp)
+ {
_tcrp->incr_num_rproc();
bool do_read = true;
@@ -260,15 +260,16 @@
bool ext = false;
rhm::journal::iores res = read_data_record(&dptr, dsize, &xptr, xsize, tr,
ext, dtokp);
- if (res == rhm::journal::RHM_IORES_SUCCESS)
+ switch (res)
{
+ case rhm::journal::RHM_IORES_SUCCESS:
{
- rhm::journal::slock sl(&_deq_mutex);
+ rhm::journal::slock sl(&_deq_list_mutex);
_dtok_deq_list.push_back(dtokp);
+ _deq_list_cv.broadcast();
}
read_compl = true;
_tcrp->incr_num_read();
- sleep_cnt = 0;
// clean up
if (xsize)
@@ -277,26 +278,23 @@
::free(dptr);
dptr = 0;
xptr = 0;
- }
- else if (res == rhm::journal::RHM_IORES_AIO_WAIT)
- {
- if (sleep_cnt++ > MAX_SLEEP_CNT)
+ break;
+ case rhm::journal::RHM_IORES_AIO_WAIT:
+ if (get_rd_events() == 0)
{
- std::ostringstream oss;
- oss << "ERROR: Timeout waiting for read AIO in journal \"" << _jid;
- oss << "\": num_enq=" << _tcp->num_msgs();
- oss << " num_deq=" << _tcrp->num_deq();
- _tcrp->add_exception(oss.str());
+ rhm::journal::slock sl(&_rd_aio_mutex);
+ _rd_aio_cv.waitintvl(MAX_RD_WAIT * 1000000); // MAX_RD_WAIT in ms
}
- if (get_rd_events() == 0)
- ::usleep(SLEEP_US); // 1ms
- }
- else
- {
+ break;
+ default:
std::ostringstream oss;
oss << "ERROR: read operation in journal \"" << _jid;
oss << "\" returned " << rhm::journal::iores_str(res) << ".";
_tcrp->add_exception(oss.str());
+ {
+ rhm::journal::slock sl(&_deq_list_mutex);
+ _deq_list_cv.broadcast(); // wake up deq thread
+ }
}
}
}
@@ -315,31 +313,21 @@
{
if (_tcp->auto_deq())
{
- unsigned sleep_cnt = 0U;
while(_tcrp->num_deq() < _tcp->num_msgs() && !_tcrp->exception())
{
- if (_dtok_deq_list.empty())
+ journal::data_tok* dtokp = 0;
{
- if (sleep_cnt++ > MAX_SLEEP_CNT)
+ rhm::journal::slock sl(&_deq_list_mutex);
+ if (_dtok_deq_list.empty())
+ _deq_list_cv.wait();
+ if (!_dtok_deq_list.empty())
{
- std::ostringstream oss;
- oss << "ERROR: Timeout waiting for AIO in journal \"" << _jid;
- oss << "\": num_enq=" << _tcp->num_msgs();
- oss << " num_deq=" << _tcrp->num_deq();
- _tcrp->add_exception(oss.str());
- }
- if (get_wr_events() == 0)
- ::usleep(SLEEP_US); // 1ms
- }
- else
- {
- journal::data_tok* dtokp;
- {
- rhm::journal::slock sl(&_deq_mutex);
dtokp = _dtok_deq_list.front();
_dtok_deq_list.pop_front();
}
-
+ }
+ if (dtokp)
+ {
rhm::journal::iores res;
if (dtokp->has_xid())
res = dequeue_txn_data_record(dtokp, dtokp->xid());
@@ -357,10 +345,9 @@
oss << "\" returned " << rhm::journal::iores_str(res) << ".";
_tcrp->add_exception(oss.str());
}
- sleep_cnt = 0U;
}
}
- flush();
+ flush(true);
}
}
catch (const rhm::journal::jexception& e) { _tcrp->add_exception(e); }
@@ -409,11 +396,17 @@
// static AIO callback fns
void
+jrnl_instance::aio_rd_callback(jcntl* journal, std::vector<u_int16_t>& /*pil*/) // Read-side AIO callback passed to recover()/initialize(); the vector argument is ignored.
+{
+ jrnl_instance::handle_rd_callback(journal); // forward to the static handle_rd_callback(jcntl*) helper declared in jrnl_instance.hpp
+}
+
+void
jrnl_instance::aio_wr_callback(jcntl* journal, std::vector<journal::data_tok*>& dtokl)
{
for (std::vector<journal::data_tok*>::const_iterator i=dtokl.begin(); i!=dtokl.end(); i++)
- if ((*i)->wstate() == journal::data_tok::ENQ)
- jrnl_instance::push_dtokp(journal, *i);
+ if ((*i)->wstate() == journal::data_tok::ENQ || (*i)->wstate() == journal::data_tok::DEQ)
+ jrnl_instance::handle_wr_callback(journal, *i);
}
} // namespace jtt
Modified: store/trunk/cpp/tests/jrnl/jtt/jrnl_instance.hpp
===================================================================
--- store/trunk/cpp/tests/jrnl/jtt/jrnl_instance.hpp 2008-02-06 14:58:42 UTC (rev 1644)
+++ store/trunk/cpp/tests/jrnl/jtt/jrnl_instance.hpp 2008-02-06 15:39:52 UTC (rev 1645)
@@ -29,14 +29,13 @@
#include "test_case.hpp"
#include <boost/shared_ptr.hpp>
-#include <deque>
+#include <jrnl/cvar.hpp>
#include <jrnl/data_tok.hpp>
#include <jrnl/jcntl.hpp>
#include <jrnl/slock.hpp>
+#include <list>
#include <vector>
-#include <iostream> // debug
-
namespace rhm
{
namespace jtt
@@ -53,10 +52,16 @@
const args* _args_ptr;
std::vector<dtok_ptr> _dtok_master_enq_list;
std::vector<dtok_ptr> _dtok_master_txn_list;
- std::deque<journal::data_tok*> _dtok_rd_list;
- std::deque<journal::data_tok*> _dtok_deq_list;
- pthread_mutex_t _rd_mutex; ///< Mutex for _dtok_rd_list
- pthread_mutex_t _deq_mutex; ///< Mutex for _dtok_deq_list
+ std::list<journal::data_tok*> _dtok_rd_list;
+ std::list<journal::data_tok*> _dtok_deq_list;
+ pthread_mutex_t _rd_aio_mutex; ///< Mutex for read aio wait conditions
+ rhm::journal::cvar _rd_aio_cv; ///< Condition var for read aio wait conditions
+ pthread_mutex_t _wr_full_mutex; ///< Mutex for write full conditions
+ rhm::journal::cvar _wr_full_cv; ///< Condition var for write full conditions
+ pthread_mutex_t _rd_list_mutex; ///< Mutex for _dtok_rd_list
+ rhm::journal::cvar _rd_list_cv; ///< Condition var for _dtok_rd_list
+ pthread_mutex_t _deq_list_mutex; ///< Mutex for _dtok_deq_list
+ rhm::journal::cvar _deq_list_cv; ///< Condition var for _dtok_deq_list
pthread_t _enq_thread;
pthread_t _deq_thread;
pthread_t _read_thread;
@@ -79,16 +84,40 @@
void tc_wait_compl() throw ();
private:
- inline void push_dtokp(rhm::journal::data_tok* dtokp)
+ inline void handle_rd_callback()
{
- if (_args_ptr->read_mode.val() == read_arg::NONE)
- { rhm::journal::slock sl(&_deq_mutex); _dtok_deq_list.push_back(dtokp); }
- else
- { rhm::journal::slock sl(&_rd_mutex); _dtok_rd_list.push_back(dtokp); }
+ rhm::journal::slock sl(&_rd_aio_mutex);
+ _rd_aio_cv.broadcast();
}
- static inline void push_dtokp(jcntl* jp, rhm::journal::data_tok* dtokp)
- { static_cast<jrnl_instance*>(jp)->push_dtokp(dtokp); }
+ static inline void handle_rd_callback(jcntl* jp) // Static entry point: casts jp to jrnl_instance* and calls the member handle_rd_callback().
+ { static_cast<jrnl_instance*>(jp)->handle_rd_callback(); }
+ inline void handle_wr_callback(rhm::journal::data_tok* dtokp) // Routes one write-callback token according to its wstate().
+ {
+ if (dtokp->wstate() == journal::data_tok::ENQ)
+ {
+ if (_args_ptr->read_mode.val() == read_arg::NONE)
+ {
+ rhm::journal::slock sl(&_deq_list_mutex);
+ _dtok_deq_list.push_back(dtokp); // read mode NONE: the token is queued directly on the dequeue list
+ _deq_list_cv.broadcast(); // signal waiters on _deq_list_cv that the list changed
+ }
+ else
+ {
+ rhm::journal::slock sl(&_rd_list_mutex);
+ _dtok_rd_list.push_back(dtokp); // any other read mode: the token is queued on the read list
+ _rd_list_cv.broadcast(); // signal waiters on _rd_list_cv that the list changed
+ }
+ }
+ else // DEQ
+ {
+ rhm::journal::slock sl(&_wr_full_mutex);
+ _wr_full_cv.broadcast(); // signal waiters on _wr_full_cv (the RHM_IORES_ENQCAPTHRESH branch in jrnl_instance.cpp waits on it)
+ }
+ }
+ static inline void handle_wr_callback(jcntl* jp, rhm::journal::data_tok* dtokp) // Static entry point: casts jp to jrnl_instance* and forwards dtokp to the member overload.
+ { static_cast<jrnl_instance*>(jp)->handle_wr_callback(dtokp); }
+
void run_enq() throw ();
inline static void* run_enq(void* p)
{ static_cast<jrnl_instance*>(p)->run_enq(); return 0; }
@@ -107,6 +136,7 @@
rhm::journal::data_tok* prep_txn_dtok(const rhm::journal::data_tok* dtokp);
// static callbacks
+ static void aio_rd_callback(jcntl* journal, std::vector<u_int16_t>& pil);
static void aio_wr_callback(jcntl* journal, std::vector<journal::data_tok*>& dtokl);
};
Modified: store/trunk/cpp/tests/jrnl/jtt/test_case_result.cpp
===================================================================
--- store/trunk/cpp/tests/jrnl/jtt/test_case_result.cpp 2008-02-06 14:58:42 UTC (rev 1644)
+++ store/trunk/cpp/tests/jrnl/jtt/test_case_result.cpp 2008-02-06 15:39:52 UTC (rev 1645)
@@ -37,16 +37,12 @@
_num_deq(0),
_num_read(0),
_num_rproc(0),
+ _start_time(),
+ _stop_time(),
_stopped(false),
+ _test_time(),
_exception_list()
-{
- _start_time.tv_sec = 0;
- _start_time.tv_nsec = 0;
- _stop_time.tv_sec = 0;
- _stop_time.tv_nsec = 0;
- _test_time.tv_sec = 0;
- _test_time.tv_nsec = 0;
-}
+{}
test_case_result::~test_case_result()
{}
@@ -54,12 +50,7 @@
const std::string
test_case_result::test_time_str() const
{
- if (_test_time.tv_nsec == 0)
- return "0.0";
- std::ostringstream oss;
- oss << _test_time.tv_sec << ".";
- oss << std::setfill('0') << std::setw(9) << _test_time.tv_nsec;
- return oss.str();
+ return _test_time.str(9);
}
void
@@ -101,12 +92,9 @@
_num_enq = 0;
_num_deq = 0;
_num_read = 0;
- _start_time.tv_sec = 0;
- _start_time.tv_nsec = 0;
- _stop_time.tv_sec = 0;
- _stop_time.tv_nsec = 0;
- _test_time.tv_sec = 0;
- _test_time.tv_nsec = 0;
+ _start_time.set_zero();
+ _stop_time.set_zero();
+ _test_time.set_zero();
_exception_list.clear();
}
@@ -207,16 +195,8 @@
void
test_case_result::calc_test_time()
{
- if (_start_time.tv_sec > 0 && _stop_time.tv_sec >= _start_time.tv_sec)
- {
- _test_time.tv_sec = _stop_time.tv_sec - _start_time.tv_sec;
- _test_time.tv_nsec = _stop_time.tv_nsec - _start_time.tv_nsec;
- if (_test_time.tv_nsec < 0)
- {
- _test_time.tv_nsec += 1000000000;
- _test_time.tv_sec--;
- }
- }
+ if (!_start_time.is_zero() && _stop_time >= _start_time)
+ _test_time = _stop_time - _start_time;
}
} // namespace jtt
Modified: store/trunk/cpp/tests/jrnl/jtt/test_case_result.hpp
===================================================================
--- store/trunk/cpp/tests/jrnl/jtt/test_case_result.hpp 2008-02-06 14:58:42 UTC (rev 1644)
+++ store/trunk/cpp/tests/jrnl/jtt/test_case_result.hpp 2008-02-06 15:39:52 UTC (rev 1645)
@@ -27,6 +27,7 @@
#include <boost/shared_ptr.hpp>
#include <deque>
#include <jrnl/jexception.hpp>
+#include <jrnl/time_ns.hpp>
#include <string>
namespace rhm
@@ -48,10 +49,10 @@
u_int32_t _num_deq;
u_int32_t _num_read; // Messages actually read
u_int32_t _num_rproc; // Messages handled by read thread (not all are read)
- timespec _start_time;
- timespec _stop_time;
+ journal::time_ns _start_time;
+ journal::time_ns _stop_time;
bool _stopped;
- timespec _test_time;
+ journal::time_ns _test_time;
elist _exception_list;
public:
@@ -68,14 +69,13 @@
inline const u_int32_t num_rproc() const { return _num_rproc; }
inline const u_int32_t incr_num_rproc() { return ++_num_rproc; }
- inline const timespec& start_time() const { return _start_time; }
+ inline const journal::time_ns& start_time() const { return _start_time; }
inline void set_start_time() { ::clock_gettime(CLOCK_REALTIME, &_start_time); }
- inline const timespec& stop_time() const { return _stop_time; }
+ inline const journal::time_ns& stop_time() const { return _stop_time; }
inline void set_stop_time()
{ ::clock_gettime(CLOCK_REALTIME, &_stop_time); calc_test_time(); }
- inline void set_test_time(const timespec& ts)
- { _test_time.tv_sec = ts.tv_sec; _test_time.tv_nsec = ts.tv_nsec; }
- inline const timespec& test_time() const { return _test_time; }
+ inline void set_test_time(const journal::time_ns& ts) { _test_time = ts; }
+ inline const journal::time_ns& test_time() const { return _test_time; }
const std::string test_time_str() const;
void add_exception(const journal::jexception& e, const bool set_stop_time_flag = true);
Modified: store/trunk/cpp/tests/jrnl/jtt/test_case_result_agregation.cpp
===================================================================
--- store/trunk/cpp/tests/jrnl/jtt/test_case_result_agregation.cpp 2008-02-06 14:58:42 UTC (rev 1644)
+++ store/trunk/cpp/tests/jrnl/jtt/test_case_result_agregation.cpp 2008-02-06 15:39:52 UTC (rev 1645)
@@ -158,16 +158,10 @@
return oss.str();
}
-const timespec&
-test_case_result_agregation::add_test_time(const timespec& t)
+const journal::time_ns&
+test_case_result_agregation::add_test_time(const journal::time_ns& t)
{
- _test_time.tv_sec += t.tv_sec;
- _test_time.tv_nsec += t.tv_nsec;
- if (_test_time.tv_nsec > 999999999)
- {
- _test_time.tv_nsec -= 1000000000;
- _test_time.tv_sec++;
- }
+ _test_time += t;
return _test_time;
}
Modified: store/trunk/cpp/tests/jrnl/jtt/test_case_result_agregation.hpp
===================================================================
--- store/trunk/cpp/tests/jrnl/jtt/test_case_result_agregation.hpp 2008-02-06 14:58:42 UTC (rev 1644)
+++ store/trunk/cpp/tests/jrnl/jtt/test_case_result_agregation.hpp 2008-02-06 15:39:52 UTC (rev 1645)
@@ -71,7 +71,7 @@
private:
const std::string str_full(const bool last_only) const;
const std::string str_summary(const bool last_only) const;
- const timespec& add_test_time(const timespec& t);
+ const journal::time_ns& add_test_time(const journal::time_ns& t);
};
} // namespace jtt
More information about the rhmessaging-commits
mailing list