Author: kpvdr
Date: 2010-07-26 15:48:20 -0400 (Mon, 26 Jul 2010)
New Revision: 4148
Modified:
store/trunk/cpp/lib/JournalImpl.cpp
store/trunk/cpp/lib/JournalImpl.h
store/trunk/cpp/lib/TxnCtxt.cpp
store/trunk/cpp/lib/TxnCtxt.h
store/trunk/cpp/lib/jrnl/aio.hpp
store/trunk/cpp/lib/jrnl/jcntl.cpp
store/trunk/cpp/lib/jrnl/jcntl.hpp
store/trunk/cpp/lib/jrnl/pmgr.hpp
store/trunk/cpp/lib/jrnl/rmgr.cpp
store/trunk/cpp/lib/jrnl/rmgr.hpp
store/trunk/cpp/lib/jrnl/wmgr.cpp
store/trunk/cpp/lib/jrnl/wmgr.hpp
store/trunk/cpp/tests/Makefile.am
store/trunk/cpp/tests/jrnl/_st_helper_fns.hpp
store/trunk/cpp/tests/jrnl/jtt/jrnl_instance.cpp
Log:
Performance and efficiency improvement - removed loops using usleep for blocking calls
waiting on AIO getevents() and used built-in timeout instead.
Modified: store/trunk/cpp/lib/JournalImpl.cpp
===================================================================
--- store/trunk/cpp/lib/JournalImpl.cpp 2010-07-26 15:58:14 UTC (rev 4147)
+++ store/trunk/cpp/lib/JournalImpl.cpp 2010-07-26 19:48:20 UTC (rev 4148)
@@ -237,8 +237,8 @@
_emap.size(), _tmap.size(), _tmap.enq_cnt(), _tmap.deq_cnt()),
qpid::management::ManagementAgent::SEV_NOTE);
}
-#define MAX_AIO_SLEEPS 1000000 // tot: ~10 sec
-#define AIO_SLEEP_TIME_US 10 // 0.01 ms
+//#define MAX_AIO_SLEEPS 1000000 // tot: ~10 sec
+//#define AIO_SLEEP_TIME_US 10 // 0.01 ms
// Return true if content is recovered from store; false if content is external and must be recovered from an external store.
// Throw exception for all errors.
bool
@@ -272,7 +272,6 @@
bool transient = false;
bool done = false;
bool rid_found = false;
- unsigned aio_sleep_cnt = 0;
oooRidList.clear();
while (!done) {
iores res = read_data_record(&_datap, _dlen, &_xidp, xlen, transient,
_external, &_dtok);
@@ -295,13 +294,10 @@
}
break;
case mrg::journal::RHM_IORES_PAGE_AIOWAIT:
- if (++aio_sleep_cnt <= MAX_AIO_SLEEPS) {
- get_wr_events();
- usleep(AIO_SLEEP_TIME_US);
- } else {
+ if (get_wr_events(&_aio_cmpl_timeout) == AIO_TIMEOUT) {
std::stringstream ss;
ss << "read_data_record() returned " <<
mrg::journal::iores_str(res);
- ss << "; exceeded maximum wait time";
+ ss << "; timed out waiting for page to be processed.";
throw jexception(mrg::journal::jerrno::JERR__TIMEOUT,
ss.str().c_str(), "JournalImpl",
"loadMsgContent");
}
@@ -425,30 +421,6 @@
}
}
-#define MAX_INVALID_RETRYS 5 // This will yield a total wait time of 10 sec with 5 log messages at 2 sec intervals
-iores
-JournalImpl::read_data_record(void** const data_buff, size_t& tot_data_len, void**
const xid_buff, size_t& xid_len,
- bool& transient, bool& external, mrg::journal::data_tok* const dtokp,
bool ignore_pending_txns)
-{
- int retry_cnt = 0;
- iores res;
- do {
- res = jcntl::read_data_record(data_buff, tot_data_len, xid_buff, xid_len,
transient, external, dtokp, ignore_pending_txns);
- if (res == mrg::journal::RHM_IORES_RCINVALID) {
- retry_cnt++;
- std::ostringstream oss;
- if (retry_cnt < MAX_INVALID_RETRYS) {
- oss << "Store read pipeline on queue " << _jid
<< " timed out waiting for journal header file read, retrying...";
- log(LOG_WARN, oss.str());
- } else {
- oss << "Store read pipeline on queue " << _jid << " timed out waiting for journal header file read, aborting read with RHM_IORES_RCINVALID";
- log(LOG_ERROR, oss.str());
- }
- }
- } while (res == mrg::journal::RHM_IORES_RCINVALID && retry_cnt <
MAX_INVALID_RETRYS);
- return res;
-}
-
void
JournalImpl::txn_abort(data_tok* const dtokp, const std::string& xid)
{
@@ -524,7 +496,7 @@
{
qpid::sys::Mutex::ScopedLock sl(_getf_lock);
getEventsTimerSetFlag = false;
- if (_wmgr.get_aio_evt_rem()) { jcntl::get_wr_events(); }
+ if (_wmgr.get_aio_evt_rem()) { jcntl::get_wr_events(0); }
if (_wmgr.get_aio_evt_rem()) { setGetEventTimer(); }
}
Modified: store/trunk/cpp/lib/JournalImpl.h
===================================================================
--- store/trunk/cpp/lib/JournalImpl.h 2010-07-26 15:58:14 UTC (rev 4147)
+++ store/trunk/cpp/lib/JournalImpl.h 2010-07-26 19:48:20 UTC (rev 4148)
@@ -185,10 +185,6 @@
void dequeue_txn_data_record(mrg::journal::data_tok* const dtokp, const
std::string& xid, const bool txn_coml_commit = false);
- mrg::journal::iores read_data_record(void** const data_buff, size_t&
tot_data_len, void** const xid_buff,
- size_t& xid_len, bool& transient,
bool& external, mrg::journal::data_tok* const dtokp,
- bool ignore_pending_txns = false);
-
void txn_abort(mrg::journal::data_tok* const dtokp, const std::string& xid);
void txn_commit(mrg::journal::data_tok* const dtokp, const std::string& xid);
Modified: store/trunk/cpp/lib/TxnCtxt.cpp
===================================================================
--- store/trunk/cpp/lib/TxnCtxt.cpp 2010-07-26 15:58:14 UTC (rev 4147)
+++ store/trunk/cpp/lib/TxnCtxt.cpp 2010-07-26 19:48:20 UTC (rev 4148)
@@ -1,7 +1,6 @@
#include "TxnCtxt.h"
#include <sstream>
-#include <unistd.h> // ::usleep()
#include "jrnl/jexception.hpp"
#include "StoreException.h"
@@ -76,42 +75,35 @@
TxnCtxt::~TxnCtxt() { if(txn) abort(); }
-#define MAX_SYNC_SLEEPS 100000 // tot: ~1 sec
-#define SYNC_SLEEP_TIME_US 10 // 0.01 ms
-
void TxnCtxt::sync() {
- bool allWritten = false;
- bool firstloop = true;
- long sleep_cnt = 0L;
- while (loggedtx && !allWritten) {
- if (sleep_cnt > MAX_SYNC_SLEEPS) THROW_STORE_EXCEPTION(std::string("Error: timeout waiting for TxnCtxt::sync()"));
- if (!firstloop) {
- ::usleep(SYNC_SLEEP_TIME_US);
- sleep_cnt++;
- } // move this into the get events call aiolib..
- allWritten = true;
- for (ipqItr i = impactedQueues.begin(); i != impactedQueues.end(); i++) {
- sync_jrnl(static_cast<JournalImpl*>(*i), firstloop, allWritten);
+ if (loggedtx) {
+ try {
+ for (ipqItr i = impactedQueues.begin(); i != impactedQueues.end(); i++)
+ jrnl_flush(static_cast<JournalImpl*>(*i));
+ if (preparedXidStorePtr)
+ jrnl_flush(preparedXidStorePtr);
+ for (ipqItr i = impactedQueues.begin(); i != impactedQueues.end(); i++)
+ jrnl_sync(static_cast<JournalImpl*>(*i),
&journal::jcntl::_aio_cmpl_timeout);
+ if (preparedXidStorePtr)
+ jrnl_sync(preparedXidStorePtr, &journal::jcntl::_aio_cmpl_timeout);
+ } catch (const journal::jexception& e) {
+ THROW_STORE_EXCEPTION(std::string("Error during txn sync: ") +
e.what());
}
- if (preparedXidStorePtr)
- sync_jrnl(preparedXidStorePtr, firstloop, allWritten);
- firstloop = false;
}
}
-void TxnCtxt::sync_jrnl(JournalImpl* jc, bool firstloop, bool& allWritten) {
- try {
- if (jc && !(jc->is_txn_synced(getXid()))) {
- if (firstloop)
- jc->flush();
- allWritten = false;
- jc->get_wr_events();
- }
- } catch (const journal::jexception& e) {
- THROW_STORE_EXCEPTION(std::string("Error sync") + e.what());
- }
+void TxnCtxt::jrnl_flush(JournalImpl* jc) {
+ if (jc && !(jc->is_txn_synced(getXid())))
+ jc->flush();
}
+void TxnCtxt::jrnl_sync(JournalImpl* jc, timespec* timeout) {
+ if (!jc || jc->is_txn_synced(getXid()))
+ return;
+ if (jc->get_wr_events(timeout) == journal::jcntl::AIO_TIMEOUT && timeout)
+ THROW_STORE_EXCEPTION(std::string("Error: timeout waiting for TxnCtxt::jrnl_sync()"));
+}
+
void TxnCtxt::begin(DbEnv* env, bool sync) {
env->txn_begin(0, &txn, 0);
if (sync)
Modified: store/trunk/cpp/lib/TxnCtxt.h
===================================================================
--- store/trunk/cpp/lib/TxnCtxt.h 2010-07-26 15:58:14 UTC (rev 4147)
+++ store/trunk/cpp/lib/TxnCtxt.h 2010-07-26 19:48:20 UTC (rev 4148)
@@ -69,6 +69,8 @@
virtual void completeTxn(bool commit);
void commitTxn(JournalImpl* jc, bool commit);
+ void jrnl_flush(JournalImpl* jc);
+ void jrnl_sync(JournalImpl* jc, timespec* timeout);
public:
TxnCtxt(IdSequence* _loggedtx=NULL);
@@ -81,7 +83,6 @@
*@return if the data successfully synced.
*/
void sync();
- void sync_jrnl(JournalImpl* jc, bool firstloop, bool& allWritten);
void begin(DbEnv* env, bool sync = false);
void commit();
void abort();
Modified: store/trunk/cpp/lib/jrnl/aio.hpp
===================================================================
--- store/trunk/cpp/lib/jrnl/aio.hpp 2010-07-26 15:58:14 UTC (rev 4147)
+++ store/trunk/cpp/lib/jrnl/aio.hpp 2010-07-26 19:48:20 UTC (rev 4148)
@@ -68,7 +68,7 @@
return ::io_submit(ctx, nr, aios);
}
- static inline int getevents(io_context_t ctx, long min_nr, long nr, aio_event*
events, timespec* timeout)
+ static inline int getevents(io_context_t ctx, long min_nr, long nr, aio_event*
events, timespec* const timeout)
{
return ::io_getevents(ctx, min_nr, nr, events, timeout);
}
Modified: store/trunk/cpp/lib/jrnl/jcntl.cpp
===================================================================
--- store/trunk/cpp/lib/jrnl/jcntl.cpp 2010-07-26 15:58:14 UTC (rev 4147)
+++ store/trunk/cpp/lib/jrnl/jcntl.cpp 2010-07-26 19:48:20 UTC (rev 4148)
@@ -54,6 +54,27 @@
namespace journal
{
+#define AIO_CMPL_TIMEOUT_SEC 5
+#define AIO_CMPL_TIMEOUT_NSEC 0
+#define FINAL_AIO_CMPL_TIMEOUT_SEC 15
+#define FINAL_AIO_CMPL_TIMEOUT_NSEC 0
+
+// Static
+timespec jcntl::_aio_cmpl_timeout; ///< Timeout for blocking libaio returns
+timespec jcntl::_final_aio_cmpl_timeout; ///< Timeout for blocking libaio returns when stopping or finalizing
+bool jcntl::_init = init_statics();
+bool jcntl::init_statics()
+{
+ _aio_cmpl_timeout.tv_sec = AIO_CMPL_TIMEOUT_SEC;
+ _aio_cmpl_timeout.tv_nsec = AIO_CMPL_TIMEOUT_NSEC;
+ _final_aio_cmpl_timeout.tv_sec = FINAL_AIO_CMPL_TIMEOUT_SEC;
+ _final_aio_cmpl_timeout.tv_nsec = FINAL_AIO_CMPL_TIMEOUT_NSEC;
+ return true;
+}
+int32_t jcntl::AIO_TIMEOUT = int32_t(-1);
+int32_t jcntl::THREAD_BLOCKED = int32_t(-2);
+
+
// Functions
jcntl::jcntl(const std::string& jid, const std::string& jdir, const
std::string& base_filename):
@@ -263,31 +284,21 @@
return _rmgr.discard(dtokp);
} */
-// These two combined make a wait time of approx. 2 sec.
-#define MAX_RCINVALID_CNT 20000 // tot: ~ 2 sec
-#define RCINVALID_SLEEP_TIME_US 100 // 0.1 ms
iores
jcntl::read_data_record(void** const datapp, std::size_t& dsize, void** const xidpp,
std::size_t& xidsize,
bool& transient, bool& external, data_tok* const dtokp, bool
ignore_pending_txns)
{
check_rstatus("read_data");
- unsigned cnt = 0;
- iores res;
- do
+ iores res = _rmgr.read(datapp, dsize, xidpp, xidsize, transient, external, dtokp,
ignore_pending_txns);
+ if (res == RHM_IORES_RCINVALID)
{
+ get_wr_events(0); // check for outstanding write events
+ iores sres = _rmgr.synchronize(); // flushes all outstanding read events
+ if (sres != RHM_IORES_SUCCESS)
+ return sres;
+ _rmgr.wait_for_validity(&_aio_cmpl_timeout, true); // throw if timeout occurs
res = _rmgr.read(datapp, dsize, xidpp, xidsize, transient, external, dtokp,
ignore_pending_txns);
- if (res == RHM_IORES_RCINVALID)
- {
- get_wr_events(); // check for outstanding write events
- iores sres = _rmgr.synchronize();
- if (sres != RHM_IORES_SUCCESS)
- return sres;
- if (cnt > 0)
- ::usleep(RCINVALID_SLEEP_TIME_US);
- }
- cnt++;
}
- while (cnt < MAX_RCINVALID_CNT && res == RHM_IORES_RCINVALID);
return res;
}
@@ -346,19 +357,19 @@
return _wmgr.is_txn_synced(xid);
}
-u_int32_t
-jcntl::get_wr_events()
+int32_t
+jcntl::get_wr_events(timespec* const timeout)
{
stlock t(_wr_mutex);
- if (t.locked())
- return _wmgr.get_events(pmgr::UNUSED);
- return 0;
+ if (!t.locked())
+ return THREAD_BLOCKED;
+ return _wmgr.get_events(pmgr::UNUSED, timeout);
}
-u_int32_t
-jcntl::get_rd_events()
+int32_t
+jcntl::get_rd_events(timespec* const timeout)
{
- return _rmgr.get_events();
+ return _rmgr.get_events(pmgr::AIO_COMPLETE, timeout);
}
void
@@ -429,21 +440,14 @@
_rmgr.invalidate();
}
-#define MAX_AIO_CMPL_SLEEPS 1000000 // tot: ~10 sec
-#define AIO_CMPL_SLEEP_US 10 // 0.01 ms
-
void
jcntl::fhdr_wr_sync(const u_int16_t lid)
{
- long cnt = 0;
fcntl* fcntlp = _lpmgr.get_fcntlp(lid);
- get_wr_events();
while (fcntlp->wr_fhdr_aio_outstanding())
{
- if (++cnt > MAX_AIO_CMPL_SLEEPS)
+ if (get_wr_events(&_aio_cmpl_timeout) == AIO_TIMEOUT)
throw jexception(jerrno::JERR_JCNTL_AIOCMPLWAIT, "jcntl",
"fhdr_wr_sync");
- ::usleep(AIO_CMPL_SLEEP_US);
- get_wr_events();
}
}
@@ -496,36 +500,28 @@
void
jcntl::aio_cmpl_wait()
{
- u_int32_t cnt = 0;
while (_wmgr.get_aio_evt_rem())
{
- get_wr_events();
- if (cnt++ > MAX_AIO_CMPL_SLEEPS)
+ if (get_wr_events(&_aio_cmpl_timeout) == AIO_TIMEOUT)
throw jexception(jerrno::JERR_JCNTL_AIOCMPLWAIT, "jcntl",
"aio_cmpl_wait");
- ::usleep(AIO_CMPL_SLEEP_US);
}
}
bool
jcntl::handle_aio_wait(const iores res, iores& resout, const data_tok* dtp)
{
- // TODO: factor out the common while loops below into a common fn
- u_int32_t cnt = 0;
resout = res;
if (res == RHM_IORES_PAGE_AIOWAIT)
{
while (_wmgr.curr_pg_blocked())
{
- _wmgr.get_events(pmgr::UNUSED);
- if (cnt++ > MAX_AIO_CMPL_SLEEPS)
+ if (_wmgr.get_events(pmgr::UNUSED, &_aio_cmpl_timeout) == AIO_TIMEOUT)
{
std::ostringstream oss;
- oss << "get_events() returned JERR_JCNTL_AIOCMPLWAIT; wmgr_status: ";
- oss << _wmgr.status_str();
+ oss << "get_events() returned JERR_JCNTL_AIOCMPLWAIT; wmgr_status: " << _wmgr.status_str();
this->log(LOG_CRITICAL, oss.str());
throw jexception(jerrno::JERR_JCNTL_AIOCMPLWAIT, "jcntl",
"handle_aio_wait");
}
- ::usleep(AIO_CMPL_SLEEP_US);
}
return true;
}
@@ -533,16 +529,13 @@
{
while (_wmgr.curr_file_blocked())
{
- _wmgr.get_events(pmgr::UNUSED);
- if (cnt++ > MAX_AIO_CMPL_SLEEPS)
+ if (_wmgr.get_events(pmgr::UNUSED, &_aio_cmpl_timeout) == AIO_TIMEOUT)
{
std::ostringstream oss;
- oss << "get_events() returned JERR_JCNTL_AIOCMPLWAIT; wmgr_status: ";
- oss << _wmgr.status_str();
+ oss << "get_events() returned JERR_JCNTL_AIOCMPLWAIT; wmgr_status: " << _wmgr.status_str();
this->log(LOG_CRITICAL, oss.str());
throw jexception(jerrno::JERR_JCNTL_AIOCMPLWAIT, "jcntl",
"handle_aio_wait");
}
- ::usleep(AIO_CMPL_SLEEP_US);
}
_wrfc.wr_reset();
resout = RHM_IORES_SUCCESS;
Modified: store/trunk/cpp/lib/jrnl/jcntl.hpp
===================================================================
--- store/trunk/cpp/lib/jrnl/jcntl.hpp 2010-07-26 15:58:14 UTC (rev 4147)
+++ store/trunk/cpp/lib/jrnl/jcntl.hpp 2010-07-26 19:48:20 UTC (rev 4148)
@@ -147,6 +147,11 @@
smutex _wr_mutex; ///< Mutex for journal writes
public:
+ static timespec _aio_cmpl_timeout; ///< Timeout for blocking libaio returns
+ static timespec _final_aio_cmpl_timeout; ///< Timeout for blocking libaio returns when stopping or finalizing
+ static int32_t AIO_TIMEOUT;
+ static int32_t THREAD_BLOCKED;
+
/**
* \brief Journal constructor.
*
@@ -527,7 +532,7 @@
* dequeue() operations, but if these operations cease, then this call needs to be made to
* force the processing of any outstanding AIO operations.
*/
- u_int32_t get_wr_events();
+ int32_t get_wr_events(timespec* const timeout);
/**
* \brief Forces a check for returned AIO read events.
@@ -536,7 +541,7 @@
* operations, but if these operations cease, then this call needs to be made to force the
* processing of any outstanding AIO operations.
*/
- u_int32_t get_rd_events();
+ int32_t get_rd_events(timespec* const timeout);
/**
* \brief Stop the journal from accepting any further requests to read or write data.
@@ -659,6 +664,9 @@
static fcntl* new_fcntl(jcntl* const jcp, const u_int16_t lid, const u_int16_t
fid, const rcvdat* const rdp);
protected:
+ static bool _init;
+ static bool init_statics();
+
/**
* \brief Check status of journal before allowing write operations.
*/
Modified: store/trunk/cpp/lib/jrnl/pmgr.hpp
===================================================================
--- store/trunk/cpp/lib/jrnl/pmgr.hpp 2010-07-26 15:58:14 UTC (rev 4147)
+++ store/trunk/cpp/lib/jrnl/pmgr.hpp 2010-07-26 19:48:20 UTC (rev 4148)
@@ -123,7 +123,7 @@
pmgr(jcntl* jc, enq_map& emap, txn_map& tmap);
virtual ~pmgr();
- virtual u_int32_t get_events(page_state state) = 0;
+ virtual int32_t get_events(page_state state, timespec* const timeout, bool flush
= false) = 0;
inline u_int32_t get_aio_evt_rem() const { return _aio_evt_rem; }
static const char* page_state_str(page_state ps);
inline u_int32_t cache_pgsize_sblks() const { return _cache_pgsize_sblks; }
Modified: store/trunk/cpp/lib/jrnl/rmgr.cpp
===================================================================
--- store/trunk/cpp/lib/jrnl/rmgr.cpp 2010-07-26 15:58:14 UTC (rev 4147)
+++ store/trunk/cpp/lib/jrnl/rmgr.cpp 2010-07-26 19:48:20 UTC (rev 4148)
@@ -229,18 +229,23 @@
}
}
-u_int32_t
-rmgr::get_events(page_state state)
+int32_t
+rmgr::get_events(page_state state, timespec* const timeout, bool flush)
{
- int ret = 0;
- if ((ret = aio::getevents(_ioctx, 0, _cache_num_pages + _jc->num_jfiles(),
_aio_event_arr, 0)) < 0)
+ if (_aio_evt_rem == 0) // no events to get
+ return 0;
+
+ int32_t ret;
+ if ((ret = aio::getevents(_ioctx, flush ? _aio_evt_rem : 1,
_aio_evt_rem/*_cache_num_pages + _jc->num_jfiles()*/, _aio_event_arr, timeout)) <
0)
{
- if (ret == -EINTR) // No events
+ if (ret == -EINTR) // Interrupted by signal
return 0;
std::ostringstream oss;
oss << "io_getevents() failed: " << std::strerror(-ret)
<< " (" << ret << ")";
throw jexception(jerrno::JERR__AIO, oss.str(), "rmgr",
"get_events");
}
+ if (ret == 0 && timeout)
+ return _jc->AIO_TIMEOUT;
std::vector<u_int16_t> pil;
pil.reserve(ret);
@@ -315,26 +320,16 @@
_rrfc.set_invalid();
}
-#define MAX_AIO_SLEEPS 1000 // 10 sec
-#define AIO_SLEEP_TIME 10000 // 10 ms
void
-rmgr::init_validation()
+rmgr::flush(timespec* timeout)
{
// Wait for any outstanding AIO read operations to complete before synchronizing
- int aio_sleep_cnt = 0;
while (_aio_evt_rem)
{
- get_events();
- if (_aio_evt_rem)
+ if (get_events(AIO_COMPLETE, timeout) == _jc->AIO_TIMEOUT) // timed out, nothing returned
{
- if (++aio_sleep_cnt <= MAX_AIO_SLEEPS)
- {
- get_events();
- usleep(AIO_SLEEP_TIME);
- }
- else
- throw jexception(jerrno::JERR__TIMEOUT,
- "Invalidate timed out waiting for outstanding read aio to return", "rmgr", "invalidate");
+ throw jexception(jerrno::JERR__TIMEOUT,
+ "Timed out waiting for outstanding read aio to return", "rmgr", "init_validation");
}
}
@@ -346,11 +341,24 @@
_pg_offset_dblks = 0;
}
+bool
+rmgr::wait_for_validity(timespec* timeout, const bool throw_on_timeout)
+{
+ bool timed_out = false;
+ while (!_rrfc.is_valid() && !timed_out)
+ {
+ timed_out = get_events(AIO_COMPLETE, timeout) == _jc->AIO_TIMEOUT;
+ if (timed_out && throw_on_timeout)
+ throw jexception(jerrno::JERR__TIMEOUT, "Timed out waiting for read validity", "rmgr", "wait_for_validity");
+ }
+ return _rrfc.is_valid();
+}
+
iores
rmgr::pre_read_check(data_tok* dtokp)
{
if (_aio_evt_rem)
- get_events();
+ get_events(AIO_COMPLETE, 0);
if (!_rrfc.is_valid())
return RHM_IORES_RCINVALID;
@@ -524,11 +532,13 @@
rmgr::aio_cycle()
{
// Perform validity checks
- if (_fhdr_rd_outstanding)
+ if (_fhdr_rd_outstanding) // read of file header still outstanding in aio
return RHM_IORES_SUCCESS;
if (!_rrfc.is_valid())
{
- init_validation(); // flush outstanding read aio ops (if any), set all pages to UNUSED state, reset counters.
+ // Flush and reset all read states and pointers
+ flush(&jcntl::_aio_cmpl_timeout);
+
_jc->get_earliest_fid(); // determine initial file to read; calls _rrfc.set_findex() to set value
// If this file has not yet been written to, return RHM_IORES_EMPTY
if (_rrfc.is_void() && !_rrfc.is_wr_aio_outstanding())
@@ -570,7 +580,7 @@
else if (num_compl == _cache_num_pages) // This condition exists after invalidation
res = init_aio_reads(0, _cache_num_pages);
if (outstanding)
- get_events();
+ get_events(AIO_COMPLETE, 0);
return res;
}
Modified: store/trunk/cpp/lib/jrnl/rmgr.hpp
===================================================================
--- store/trunk/cpp/lib/jrnl/rmgr.hpp 2010-07-26 15:58:14 UTC (rev 4147)
+++ store/trunk/cpp/lib/jrnl/rmgr.hpp 2010-07-26 19:48:20 UTC (rev 4148)
@@ -78,10 +78,11 @@
iores read(void** const datapp, std::size_t& dsize, void** const xidpp,
std::size_t& xidsize, bool& transient, bool& external,
data_tok* dtokp,
bool ignore_pending_txns);
- u_int32_t get_events(page_state state = AIO_COMPLETE);
+ int32_t get_events(page_state state, timespec* const timeout, bool flush =
false);
void recover_complete();
inline iores synchronize() { if (_rrfc.is_valid()) return RHM_IORES_SUCCESS;
return aio_cycle(); }
void invalidate();
+ bool wait_for_validity(timespec* const timeout, const bool throw_on_timeout =
false);
/* TODO (if required)
const iores get(const u_int64_t& rid, const std::size_t& dsize, const
std::size_t& dsize_avail,
@@ -91,7 +92,7 @@
private:
void clean();
- void init_validation();
+ void flush(timespec* timeout);
iores pre_read_check(data_tok* dtokp);
iores read_enq(rec_hdr& h, void* rptr, data_tok* dtokp);
void consume_xid_rec(rec_hdr& h, void* rptr, data_tok* dtokp);
Modified: store/trunk/cpp/lib/jrnl/wmgr.cpp
===================================================================
--- store/trunk/cpp/lib/jrnl/wmgr.cpp 2010-07-26 15:58:14 UTC (rev 4147)
+++ store/trunk/cpp/lib/jrnl/wmgr.cpp 2010-07-26 19:48:20 UTC (rev 4148)
@@ -647,7 +647,7 @@
_page_cb_arr[_pg_index]._state = IN_USE;
}
}
- get_events(UNUSED);
+ get_events(UNUSED, 0);
if (_page_cb_arr[_pg_index]._state == UNUSED)
_page_cb_arr[_pg_index]._state = IN_USE;
return res;
@@ -662,18 +662,24 @@
return res;
}
-u_int32_t
-wmgr::get_events(page_state state)
+int32_t
+wmgr::get_events(page_state state, timespec* const timeout, bool flush)
{
+ if (_aio_evt_rem == 0) // no events to get
+ return 0;
+
int ret = 0;
- if ((ret = aio::getevents(_ioctx, 0, _cache_num_pages + _jc->num_jfiles(),
_aio_event_arr, 0)) < 0)
+ if ((ret = aio::getevents(_ioctx, flush ? _aio_evt_rem : 1,
_aio_evt_rem/*_cache_num_pages + _jc->num_jfiles()*/, _aio_event_arr, timeout)) <
0)
{
std::ostringstream oss;
oss << "io_getevents() failed: " << std::strerror(-ret)
<< " (" << ret << ")";
throw jexception(jerrno::JERR__AIO, oss.str(), "wmgr",
"get_events");
}
- u_int32_t tot_data_toks = 0;
+ if (ret == 0 && timeout)
+ return _jc->AIO_TIMEOUT;
+
+ int32_t tot_data_toks = 0;
for (int i=0; i<ret; i++) // Index of returned AIOs
{
if (_aio_evt_rem == 0)
Modified: store/trunk/cpp/lib/jrnl/wmgr.hpp
===================================================================
--- store/trunk/cpp/lib/jrnl/wmgr.hpp 2010-07-26 15:58:14 UTC (rev 4147)
+++ store/trunk/cpp/lib/jrnl/wmgr.hpp 2010-07-26 19:48:20 UTC (rev 4148)
@@ -115,7 +115,7 @@
iores abort(data_tok* dtokp, const void* const xid_ptr, const std::size_t
xid_len);
iores commit(data_tok* dtokp, const void* const xid_ptr, const std::size_t
xid_len);
iores flush();
- u_int32_t get_events(page_state state);
+ int32_t get_events(page_state state, timespec* const timeout, bool flush =
false);
bool is_txn_synced(const std::string& xid);
inline bool curr_pg_blocked() const { return _page_cb_arr[_pg_index]._state !=
UNUSED; }
inline bool curr_file_blocked() const { return _wrfc.aio_cnt() > 0; }
Modified: store/trunk/cpp/tests/Makefile.am
===================================================================
--- store/trunk/cpp/tests/Makefile.am 2010-07-26 15:58:14 UTC (rev 4147)
+++ store/trunk/cpp/tests/Makefile.am 2010-07-26 19:48:20 UTC (rev 4148)
@@ -29,11 +29,7 @@
TMP_DATA_DIR=$(abs_srcdir)/tmp_data_dir
TMP_PYTHON_TEST_DIR=$(abs_srcdir)/python_tests.tmp
-if DO_CLUSTER_TESTS
-SUBDIRS = jrnl . cluster
-else
SUBDIRS = jrnl .
-endif
TESTS = \
SimpleTest \
Modified: store/trunk/cpp/tests/jrnl/_st_helper_fns.hpp
===================================================================
--- store/trunk/cpp/tests/jrnl/_st_helper_fns.hpp 2010-07-26 15:58:14 UTC (rev 4147)
+++ store/trunk/cpp/tests/jrnl/_st_helper_fns.hpp 2010-07-26 19:48:20 UTC (rev 4148)
@@ -547,7 +547,7 @@
{
if (++aio_sleep_cnt <= MAX_AIO_SLEEPS)
{
- jc.get_wr_events();
+ jc.get_wr_events(0); // *** GEV2
usleep(AIO_SLEEP_TIME);
}
else
Modified: store/trunk/cpp/tests/jrnl/jtt/jrnl_instance.cpp
===================================================================
--- store/trunk/cpp/tests/jrnl/jtt/jrnl_instance.cpp 2010-07-26 15:58:14 UTC (rev 4147)
+++ store/trunk/cpp/tests/jrnl/jtt/jrnl_instance.cpp 2010-07-26 19:48:20 UTC (rev 4148)
@@ -191,7 +191,7 @@
_tcrp->add_exception("Timeout waiting for RHM_IORES_ENQCAPTHRESH to clear.");
panic();
}
- else if (get_wr_events() == 0)
+ else if (get_wr_events(0) == 0) // *** GEV2
{
mrg::journal::slock sl(_wr_full_mutex);
_wr_full_cv.waitintvl(MAX_WR_WAIT * 1000000); // MAX_WR_WAIT in ms
@@ -273,7 +273,7 @@
xptr = 0;
break;
case mrg::journal::RHM_IORES_PAGE_AIOWAIT:
- if (get_rd_events() == 0)
+ if (get_rd_events(0) == 0)
{
mrg::journal::slock sl(_rd_aio_mutex);
_rd_aio_cv.waitintvl(MAX_RD_WAIT * 1000000); // MAX_RD_WAIT in ms