Author: kpvdr
Date: 2008-03-17 17:11:19 -0400 (Mon, 17 Mar 2008)
New Revision: 1771
Modified:
store/trunk/cpp/lib/BdbMessageStore.cpp
store/trunk/cpp/lib/JournalImpl.cpp
store/trunk/cpp/lib/jrnl/enums.hpp
store/trunk/cpp/lib/jrnl/jcntl.cpp
store/trunk/cpp/lib/jrnl/jcntl.hpp
store/trunk/cpp/lib/jrnl/nlfh.cpp
store/trunk/cpp/lib/jrnl/nlfh.hpp
store/trunk/cpp/lib/jrnl/rmgr.cpp
store/trunk/cpp/lib/jrnl/wmgr.cpp
store/trunk/cpp/lib/jrnl/wmgr.hpp
store/trunk/cpp/lib/jrnl/wrfc.cpp
store/trunk/cpp/lib/jrnl/wrfc.hpp
store/trunk/cpp/tests/jrnl/_st_basic.cpp
store/trunk/cpp/tests/jrnl/_st_helper_fns.hpp
store/trunk/cpp/tests/jrnl/jtt/jrnl_instance.cpp
store/trunk/cpp/tests/jrnl/run-journal-tests
Log:
Bugfix for BZ437880: Small total message journal capacity can cause premature file
overwrite. Added an outstanding aio count to class nlfh and an additional aio return
enumeration to cover this case. Modified the return handler logic in class jcntl to handle
this new return value.
Modified: store/trunk/cpp/lib/BdbMessageStore.cpp
===================================================================
--- store/trunk/cpp/lib/BdbMessageStore.cpp 2008-03-07 16:53:30 UTC (rev 1770)
+++ store/trunk/cpp/lib/BdbMessageStore.cpp 2008-03-17 21:11:19 UTC (rev 1771)
@@ -579,16 +579,16 @@
aio_sleep_cnt = 0;
break;
}
- case rhm::journal::RHM_IORES_AIO_WAIT:
- if (++aio_sleep_cnt > MAX_AIO_SLEEPS)
- THROW_STORE_EXCEPTION("Timeout waiting for AIO");
- ::usleep(AIO_SLEEP_TIME);
- break;
+ case rhm::journal::RHM_IORES_PAGE_AIOWAIT:
+ if (++aio_sleep_cnt > MAX_AIO_SLEEPS)
+ THROW_STORE_EXCEPTION("Timeout waiting for AIO");
+ ::usleep(AIO_SLEEP_TIME);
+ break;
case rhm::journal::RHM_IORES_EMPTY:
- read = false;
- break; // done with all messages. ((add call in jrnl to test that _emap
is empty.
+ read = false;
+ break; // done with all messages. ((add call in jrnl to test that _emap
is empty.
default:
- assert( "Store Error: Unexpected msg state");
+ assert( "Store Error: Unexpected msg state");
} // switch
} // while
} catch (const journal::jexception& e) {
Modified: store/trunk/cpp/lib/JournalImpl.cpp
===================================================================
--- store/trunk/cpp/lib/JournalImpl.cpp 2008-03-07 16:53:30 UTC (rev 1770)
+++ store/trunk/cpp/lib/JournalImpl.cpp 2008-03-17 21:11:19 UTC (rev 1771)
@@ -147,7 +147,7 @@
&_dtok);
if (res == journal::RHM_IORES_SUCCESS) {
done = true;
- } else if (res == journal::RHM_IORES_AIO_WAIT) {
+ } else if (res == journal::RHM_IORES_PAGE_AIOWAIT) {
if (++aio_sleep_cnt <= MAX_AIO_SLEEPS) {
get_wr_events();
usleep(AIO_SLEEP_TIME);
Modified: store/trunk/cpp/lib/jrnl/enums.hpp
===================================================================
--- store/trunk/cpp/lib/jrnl/enums.hpp 2008-03-07 16:53:30 UTC (rev 1770)
+++ store/trunk/cpp/lib/jrnl/enums.hpp 2008-03-17 21:11:19 UTC (rev 1771)
@@ -44,7 +44,8 @@
enum _iores
{
RHM_IORES_SUCCESS = 0, ///< Success: IO operation completed normally.
- RHM_IORES_AIO_WAIT, ///< IO operation suspended - all pages are waiting
for AIO.
+ RHM_IORES_PAGE_AIOWAIT, ///< IO operation suspended - next page is waiting for
AIO.
+ RHM_IORES_FILE_AIOWAIT, ///< IO operation suspended - next file is waiting for
AIO.
RHM_IORES_EMPTY, ///< During read operations, nothing further is
available to read.
RHM_IORES_ENQCAPTHRESH, ///< Enqueue capacity threshold (limit) reached.
RHM_IORES_FULL, ///< During write operations, the journal files are
full.
@@ -59,7 +60,8 @@
switch (res)
{
case RHM_IORES_SUCCESS: return "RHM_IORES_SUCCESS";
- case RHM_IORES_AIO_WAIT: return "RHM_IORES_AIO_WAIT";
+ case RHM_IORES_PAGE_AIOWAIT: return "RHM_IORES_PAGE_AIOWAIT";
+ case RHM_IORES_FILE_AIOWAIT: return "RHM_IORES_FILE_AIOWAIT";
case RHM_IORES_EMPTY: return "RHM_IORES_EMPTY";
case RHM_IORES_ENQCAPTHRESH: return "RHM_IORES_ENQCAPTHRESH";
case RHM_IORES_FULL: return "RHM_IORES_FULL";
Modified: store/trunk/cpp/lib/jrnl/jcntl.cpp
===================================================================
--- store/trunk/cpp/lib/jrnl/jcntl.cpp 2008-03-07 16:53:30 UTC (rev 1770)
+++ store/trunk/cpp/lib/jrnl/jcntl.cpp 2008-03-17 21:11:19 UTC (rev 1771)
@@ -217,7 +217,7 @@
slock s(&_wr_mutex);
iores r;
while (handle_aio_wait(_wmgr.enqueue(data_buff, tot_data_len, this_data_len,
dtokp, 0,
- 0, transient, false), r));
+ 0, transient, false), r, dtokp));
return r;
}
}
@@ -230,7 +230,7 @@
slock s(&_wr_mutex);
iores r;
while (handle_aio_wait(_wmgr.enqueue(0, tot_data_len, 0, dtokp, 0, 0, transient,
- true), r));
+ true), r, dtokp));
return r;
}
}
@@ -245,7 +245,7 @@
slock s(&_wr_mutex);
iores r;
while (handle_aio_wait(_wmgr.enqueue(data_buff, tot_data_len, this_data_len,
dtokp,
- xid.data(), xid.size(), transient, false), r));
+ xid.data(), xid.size(), transient, false), r, dtokp));
return r;
}
}
@@ -259,7 +259,7 @@
slock s(&_wr_mutex);
iores r;
while (handle_aio_wait(_wmgr.enqueue(0, tot_data_len, 0, dtokp, xid.data(),
xid.size(),
- transient, true), r));
+ transient, true), r, dtokp));
return r;
}
}
@@ -294,7 +294,7 @@
{
slock s(&_wr_mutex);
iores r;
- while (handle_aio_wait(_wmgr.dequeue(dtokp, 0, 0), r));
+ while (handle_aio_wait(_wmgr.dequeue(dtokp, 0, 0), r, dtokp));
return r;
}
}
@@ -306,7 +306,7 @@
{
slock s(&_wr_mutex);
iores r;
- while (handle_aio_wait(_wmgr.dequeue(dtokp, xid.data(), xid.size()), r));
+ while (handle_aio_wait(_wmgr.dequeue(dtokp, xid.data(), xid.size()), r, dtokp));
return r;
}
}
@@ -318,7 +318,7 @@
{
slock s(&_wr_mutex);
iores r;
- while (handle_aio_wait(_wmgr.abort(dtokp, xid.data(), xid.size()), r));
+ while (handle_aio_wait(_wmgr.abort(dtokp, xid.data(), xid.size()), r, dtokp));
return r;
}
}
@@ -330,7 +330,7 @@
{
slock s(&_wr_mutex);
iores r;
- while (handle_aio_wait(_wmgr.commit(dtokp, xid.data(), xid.size()), r));
+ while (handle_aio_wait(_wmgr.commit(dtokp, xid.data(), xid.size()), r, dtokp));
return r;
}
}
@@ -438,12 +438,13 @@
}
const bool
-jcntl::handle_aio_wait(const iores res, iores& resout)
+jcntl::handle_aio_wait(const iores res, iores& resout, const data_tok* dtp)
{
+ // TODO: factor out the common while loops below into a common fn
+ u_int32_t cnt = 0;
resout = res;
- if (res == RHM_IORES_AIO_WAIT)
+ if (res == RHM_IORES_PAGE_AIOWAIT)
{
- u_int32_t cnt = 0;
while (_wmgr.curr_pg_blocked())
{
_wmgr.get_events(pmgr::UNUSED);
@@ -457,6 +458,25 @@
}
return true;
}
+ else if (res == RHM_IORES_FILE_AIOWAIT)
+ {
+ while (_wmgr.curr_file_blocked())
+ {
+ _wmgr.get_events(pmgr::UNUSED);
+ if (cnt++ > MAX_AIO_CMPL_SLEEPS)
+ {
+ // TODO: Log this!
+ std::cout << "**** JERR_JCNTL_AIOCMPLWAIT *** " <<
_wmgr.status_str() << std::endl;
+ throw jexception(jerrno::JERR_JCNTL_AIOCMPLWAIT, "jcntl",
"handle_aio_wait");
+ }
+ ::usleep(AIO_CMPL_SLEEP);
+ }
+ _wrfc.reset();
+ resout = RHM_IORES_SUCCESS;
+ data_tok::write_state ws = dtp->wstate();
+ return ws == data_tok::ENQ_PART || ws == data_tok::DEQ_PART || ws ==
data_tok::ABORT_PART ||
+ ws == data_tok::COMMIT_PART;
+ }
return false;
}
Modified: store/trunk/cpp/lib/jrnl/jcntl.hpp
===================================================================
--- store/trunk/cpp/lib/jrnl/jcntl.hpp 2008-03-07 16:53:30 UTC (rev 1770)
+++ store/trunk/cpp/lib/jrnl/jcntl.hpp 2008-03-17 21:11:19 UTC (rev 1771)
@@ -605,7 +605,7 @@
* \brief Call that blocks until at least one message returns; used to wait for
* AIO wait conditions to clear.
*/
- const bool handle_aio_wait(const iores res, iores& resout);
+ const bool handle_aio_wait(const iores res, iores& resout, const data_tok*
dtp);
/**
* \brief Analyze journal for recovery.
Modified: store/trunk/cpp/lib/jrnl/nlfh.cpp
===================================================================
--- store/trunk/cpp/lib/jrnl/nlfh.cpp 2008-03-07 16:53:30 UTC (rev 1770)
+++ store/trunk/cpp/lib/jrnl/nlfh.cpp 2008-03-17 21:11:19 UTC (rev 1771)
@@ -55,11 +55,12 @@
_rd_cmpl_cnt_dblks(0),
#ifdef RHM_RDONLY
_wr_subm_cnt_dblks(JRNL_SBLK_SIZE * (jfsize_dblks + 1)),
- _wr_cmpl_cnt_dblks(JRNL_SBLK_SIZE * (jfsize_dblks + 1))
+ _wr_cmpl_cnt_dblks(JRNL_SBLK_SIZE * (jfsize_dblks + 1)),
#else
_wr_subm_cnt_dblks(0),
- _wr_cmpl_cnt_dblks(0)
+ _wr_cmpl_cnt_dblks(0),
#endif
+ _aio_cnt(0)
{}
nlfh::nlfh(const std::string& fbasename, const u_int16_t fid, const u_int32_t
jfsize_sblks,
@@ -74,11 +75,12 @@
_rd_cmpl_cnt_dblks(0),
#ifdef RHM_RDONLY
_wr_subm_cnt_dblks(JRNL_SBLK_SIZE * (jfsize_dblks + 1)),
- _wr_cmpl_cnt_dblks(JRNL_SBLK_SIZE * (jfsize_dblks + 1))
+ _wr_cmpl_cnt_dblks(JRNL_SBLK_SIZE * (jfsize_dblks + 1)),
#else
_wr_subm_cnt_dblks(0),
- _wr_cmpl_cnt_dblks(0)
+ _wr_cmpl_cnt_dblks(0),
#endif
+ _aio_cnt(0)
{
initialize(fbasename, fid, jfsize_sblks, ro);
open_fh();
@@ -221,7 +223,8 @@
}
#ifndef RHM_WRONLY
// Journal overflow test - checks if the file to be reset still contains enqueued
records
- if (_rec_enqcnt)
+ // or outstanding aios
+ if (_rec_enqcnt || _aio_cnt)
return false;
#endif
#ifndef RHM_RDONLY
@@ -320,6 +323,15 @@
return _wr_cmpl_cnt_dblks;
}
+const u_int16_t
+nlfh::decr_aio_cnt()
+{
+ if(_aio_cnt == 0)
+ throw jexception(jerrno::JERR__UNDERFLOW, "Decremented _aio_cnt when already
zero", "nlfh",
+ "decr_aio_cnt");
+ return --_aio_cnt;
+}
+
// Debug function
const std::string
nlfh::status_str() const
@@ -327,6 +339,7 @@
std::ostringstream oss;
oss << "fid=" << _fid << " ws=" <<
_wr_subm_cnt_dblks << " wc=" << _wr_cmpl_cnt_dblks;
oss << " rs=" << _rd_subm_cnt_dblks << " rc="
<< _rd_cmpl_cnt_dblks;
+ oss << " ec=" << _rec_enqcnt << " ac=" <<
_aio_cnt;
return oss.str();
}
Modified: store/trunk/cpp/lib/jrnl/nlfh.hpp
===================================================================
--- store/trunk/cpp/lib/jrnl/nlfh.hpp 2008-03-07 16:53:30 UTC (rev 1770)
+++ store/trunk/cpp/lib/jrnl/nlfh.hpp 2008-03-17 21:11:19 UTC (rev 1771)
@@ -66,6 +66,7 @@
u_int32_t _rd_cmpl_cnt_dblks; ///< Read file count (data blocks) for
completed AIO
u_int32_t _wr_subm_cnt_dblks; ///< Write file count (data blocks) for
submitted AIO
u_int32_t _wr_cmpl_cnt_dblks; ///< Write file count (data blocks) for
completed AIO
+ u_int16_t _aio_cnt; ///< Outstanding AIO operations on this file
public:
nlfh(const u_int32_t jfsize_sblks);
@@ -104,6 +105,10 @@
inline const size_t wr_cmpl_offs() const { return _wr_cmpl_cnt_dblks *
JRNL_DBLK_SIZE; }
const u_int32_t add_wr_cmpl_cnt_dblks(u_int32_t a);
+ inline const u_int16_t aio_cnt() const { return _aio_cnt; }
+ inline const u_int16_t incr_aio_cnt() { return ++_aio_cnt; }
+ const u_int16_t decr_aio_cnt();
+
// Derived helper functions
inline const bool rd_empty() const { return _wr_cmpl_cnt_dblks == 0; }
Modified: store/trunk/cpp/lib/jrnl/rmgr.cpp
===================================================================
--- store/trunk/cpp/lib/jrnl/rmgr.cpp 2008-03-07 16:53:30 UTC (rev 1770)
+++ store/trunk/cpp/lib/jrnl/rmgr.cpp 2008-03-17 21:11:19 UTC (rev 1771)
@@ -91,7 +91,7 @@
if (_page_cb_arr[_pg_index]._state != AIO_COMPLETE)
{
aio_cycle();
- return RHM_IORES_AIO_WAIT;
+ return RHM_IORES_PAGE_AIOWAIT;
}
void* rptr = (void*)((char*)_page_ptr_arr[_pg_index] +
(_pg_offset_dblks * JRNL_DBLK_SIZE));
@@ -185,7 +185,7 @@
if (_page_cb_arr[_pg_index]._state != AIO_COMPLETE)
{
aio_cycle();
- return RHM_IORES_AIO_WAIT;
+ return RHM_IORES_PAGE_AIOWAIT;
}
void* rptr = (void*)((char*)_page_ptr_arr[_pg_index] +
(_pg_offset_dblks * JRNL_DBLK_SIZE));
@@ -225,7 +225,7 @@
if (dtokp->rstate() == data_tok::SKIP_PART)
{
if (_page_cb_arr[_pg_index]._state != AIO_COMPLETE)
- return RHM_IORES_AIO_WAIT;
+ return RHM_IORES_PAGE_AIOWAIT;
const iores res = skip(dtokp);
if (res != RHM_IORES_SUCCESS)
{
@@ -256,7 +256,7 @@
if (_page_cb_arr[_pg_index]._state != AIO_COMPLETE)
{
aio_cycle();
- return RHM_IORES_AIO_WAIT;
+ return RHM_IORES_PAGE_AIOWAIT;
}
void* rptr = (void*)((char*)_page_ptr_arr[_pg_index] +
(_pg_offset_dblks * JRNL_DBLK_SIZE));
@@ -476,7 +476,7 @@
if (_page_cb_arr[_pg_index]._state != AIO_COMPLETE)
{
aio_cycle(); // check if any AIOs have returned
- return RHM_IORES_AIO_WAIT;
+ return RHM_IORES_PAGE_AIOWAIT;
}
// Read data from this page, first block will have header and data size.
@@ -493,7 +493,7 @@
{
dtokp->set_rstate(data_tok::READ_PART);
dtokp->set_dsize(_enq_rec.data_size());
- return RHM_IORES_AIO_WAIT;
+ return RHM_IORES_PAGE_AIOWAIT;
}
rptr = (void*)((char*)_page_ptr_arr[_pg_index]);
@@ -584,7 +584,7 @@
if (_page_cb_arr[_pg_index]._state != AIO_COMPLETE)
{
dtokp->set_rstate(data_tok::SKIP_PART);
- return RHM_IORES_AIO_WAIT;
+ return RHM_IORES_PAGE_AIOWAIT;
}
}
else
Modified: store/trunk/cpp/lib/jrnl/wmgr.cpp
===================================================================
--- store/trunk/cpp/lib/jrnl/wmgr.cpp 2008-03-07 16:53:30 UTC (rev 1770)
+++ store/trunk/cpp/lib/jrnl/wmgr.cpp 2008-03-17 21:11:19 UTC (rev 1771)
@@ -128,7 +128,7 @@
return res;
bool cont = false;
- if (_enq_busy) // If enqueue() exited last time with RHM_IORES_FULL or
RHM_IORES_AIO_WAIT
+ if (_enq_busy) // If enqueue() exited last time with RHM_IORES_FULL or
RHM_IORES_PAGE_AIOWAIT
{
if (dtokp->wstate() == data_tok::ENQ_PART)
cont = true;
@@ -223,7 +223,7 @@
if (_page_cb_arr[_pg_index]._state == AIO_PENDING && !done)
{
- res = RHM_IORES_AIO_WAIT;
+ res = RHM_IORES_PAGE_AIOWAIT;
done = true;
}
@@ -262,7 +262,7 @@
return res;
bool cont = false;
- if (_deq_busy) // If dequeue() exited last time with RHM_IORES_FULL or
RHM_IORES_AIO_WAIT
+ if (_deq_busy) // If dequeue() exited last time with RHM_IORES_FULL or
RHM_IORES_PAGE_AIOWAIT
{
if (dtokp->wstate() == data_tok::DEQ_PART)
cont = true;
@@ -366,7 +366,7 @@
if (_page_cb_arr[_pg_index]._state == AIO_PENDING && !done)
{
- res = RHM_IORES_AIO_WAIT;
+ res = RHM_IORES_PAGE_AIOWAIT;
done = true;
}
@@ -405,7 +405,7 @@
return res;
bool cont = false;
- if (_abort_busy) // If abort() exited last time with RHM_IORES_FULL or
RHM_IORES_AIO_WAIT
+ if (_abort_busy) // If abort() exited last time with RHM_IORES_FULL or
RHM_IORES_PAGE_AIOWAIT
{
if (dtokp->wstate() == data_tok::ABORT_PART)
cont = true;
@@ -507,7 +507,7 @@
if (_page_cb_arr[_pg_index]._state == AIO_PENDING && !done)
{
- res = RHM_IORES_AIO_WAIT;
+ res = RHM_IORES_PAGE_AIOWAIT;
done = true;
}
@@ -546,7 +546,7 @@
return res;
bool cont = false;
- if (_commit_busy) // If commit() exited last time with RHM_IORES_FULL or
RHM_IORES_AIO_WAIT
+ if (_commit_busy) // If commit() exited last time with RHM_IORES_FULL or
RHM_IORES_PAGE_AIOWAIT
{
if (dtokp->wstate() == data_tok::COMMIT_PART)
cont = true;
@@ -644,7 +644,7 @@
if (_page_cb_arr[_pg_index]._state == AIO_PENDING && !done)
{
- res = RHM_IORES_AIO_WAIT;
+ res = RHM_IORES_PAGE_AIOWAIT;
done = true;
}
@@ -690,7 +690,7 @@
if (_cached_offset_dblks)
{
if (_page_cb_arr[_pg_index]._state == AIO_PENDING)
- res = RHM_IORES_AIO_WAIT;
+ res = RHM_IORES_PAGE_AIOWAIT;
else
{
if (_page_cb_arr[_pg_index]._state != IN_USE)
@@ -718,6 +718,7 @@
if (::io_submit(_ioctx, 1, &this_iocb_ptr) < 0)
throw jexception(jerrno::JERR__AIO, "wmgr",
"write_flush");
_wrfc.add_subm_cnt_dblks(_cached_offset_dblks);
+ _wrfc.incr_aio_cnt();
_aio_evt_rem++;
_cached_offset_dblks = 0;
@@ -735,10 +736,8 @@
const iores
wmgr::rotate_file()
{
- if (!_wrfc.rotate())
- return RHM_IORES_FULL;
_pg_cntr = 0;
- return RHM_IORES_SUCCESS;
+ return _wrfc.rotate();
}
const u_int32_t
@@ -861,6 +860,7 @@
// NOTE: We cannot use _wrfc here, as it may have rotated since submitting
count.
// Use stored pointer to nlfh in the pcb instead.
pcbp->_wfh->add_wr_cmpl_cnt_dblks(pcbp->_wdblks);
+ pcbp->_wfh->decr_aio_cnt();
// Clean up this pcb's data_tok list
pcbp->_pdtokl->clear();
@@ -951,7 +951,7 @@
if (_page_cb_arr[_pg_index]._state == UNUSED)
_page_cb_arr[_pg_index]._state = IN_USE;
else if (_page_cb_arr[_pg_index]._state == AIO_PENDING)
- return RHM_IORES_AIO_WAIT;
+ return RHM_IORES_PAGE_AIOWAIT;
else
{
std::ostringstream oss;
Modified: store/trunk/cpp/lib/jrnl/wmgr.hpp
===================================================================
--- store/trunk/cpp/lib/jrnl/wmgr.hpp 2008-03-07 16:53:30 UTC (rev 1770)
+++ store/trunk/cpp/lib/jrnl/wmgr.hpp 2008-03-17 21:11:19 UTC (rev 1771)
@@ -113,7 +113,9 @@
const iores flush();
const u_int32_t get_events(page_state state);
const bool is_txn_synced(const std::string& xid);
- inline const bool curr_pg_blocked() const { return _page_cb_arr[_pg_index]._state
!= UNUSED; }
+ inline const bool curr_pg_blocked() const
+ { return _page_cb_arr[_pg_index]._state != UNUSED; }
+ inline const bool curr_file_blocked() const { return _wrfc.aio_cnt() > 0; }
// Debug aid
const std::string status_str() const;
Modified: store/trunk/cpp/lib/jrnl/wrfc.cpp
===================================================================
--- store/trunk/cpp/lib/jrnl/wrfc.cpp 2008-03-07 16:53:30 UTC (rev 1770)
+++ store/trunk/cpp/lib/jrnl/wrfc.cpp 2008-03-17 21:11:19 UTC (rev 1771)
@@ -84,7 +84,7 @@
_enq_cap_offs_dblks = _fsize_dblks;
}
-bool
+const iores
wrfc::rotate()
{
if (!_nfiles)
@@ -96,7 +96,11 @@
_owi = !_owi;
}
_curr_fh = _fh_arr[_fh_index];
- return reset(); //Checks if file is still in use (ie not fully dequeued yet)
+ if (_curr_fh->aio_cnt())
+ return RHM_IORES_FILE_AIOWAIT;
+ if (!reset()) //Checks if file is still in use (ie not fully dequeued yet)
+ return RHM_IORES_FULL;
+ return RHM_IORES_SUCCESS;
}
const bool
Modified: store/trunk/cpp/lib/jrnl/wrfc.hpp
===================================================================
--- store/trunk/cpp/lib/jrnl/wrfc.hpp 2008-03-07 16:53:30 UTC (rev 1770)
+++ store/trunk/cpp/lib/jrnl/wrfc.hpp 2008-03-17 21:11:19 UTC (rev 1771)
@@ -39,6 +39,7 @@
}
}
+#include <jrnl/enums.hpp>
#include <jrnl/rrfc.hpp>
namespace rhm
@@ -80,7 +81,7 @@
* \brief Rotate active file handle to next file in rotating file group.
* \exception jerrno::JERR__NINIT if called before calling initialize().
*/
- bool rotate();
+ const iores rotate();
inline const u_int64_t rid() const { return _rid; }
inline const u_int64_t get_incr_rid() { return _rid++; }
@@ -102,6 +103,10 @@
inline const u_int32_t add_cmpl_cnt_dblks(u_int32_t a)
{ return _curr_fh->add_wr_cmpl_cnt_dblks(a); }
+ inline const u_int16_t aio_cnt() const { return _curr_fh->aio_cnt(); }
+ inline const u_int16_t incr_aio_cnt() { return _curr_fh->incr_aio_cnt(); }
+ inline const u_int16_t decr_aio_cnt() { return _curr_fh->decr_aio_cnt(); }
+
inline const bool empty() const { return _curr_fh->wr_empty(); }
inline const u_int32_t remaining_dblks() const { return
_curr_fh->wr_remaining_dblks(); }
inline const bool is_full() const { return _curr_fh->is_wr_full(); };
Modified: store/trunk/cpp/tests/jrnl/_st_basic.cpp
===================================================================
--- store/trunk/cpp/tests/jrnl/_st_basic.cpp 2008-03-07 16:53:30 UTC (rev 1770)
+++ store/trunk/cpp/tests/jrnl/_st_basic.cpp 2008-03-17 21:11:19 UTC (rev 1771)
@@ -119,12 +119,13 @@
jcntl jc(test_name, JRNL_DIR, test_name, NUM_TEST_JFILES, TEST_JFSIZE_SBLKS);
jrnl_init(jc);
unsigned n = num_msgs_to_full(NUM_TEST_JFILES, TEST_JFSIZE_SBLKS *
JRNL_SBLK_SIZE,
- MSG_REC_SIZE_DBLKS);
+ MSG_REC_SIZE_DBLKS, true);
for (unsigned m=0; m<3*2*n; m+=2) // overwrite files 3 times
{
enq_msg(jc, m, create_msg(msg, m, MSG_SIZE), false);
deq_msg(jc, m);
}
+ jc.stop(true);
}
catch(const exception& e) { BOOST_FAIL(e.what()); }
cout << "ok" << endl;
Modified: store/trunk/cpp/tests/jrnl/_st_helper_fns.hpp
===================================================================
--- store/trunk/cpp/tests/jrnl/_st_helper_fns.hpp 2008-03-07 16:53:30 UTC (rev 1770)
+++ store/trunk/cpp/tests/jrnl/_st_helper_fns.hpp 2008-03-17 21:11:19 UTC (rev 1771)
@@ -35,7 +35,7 @@
#define NUM_TEST_JFILES 4
#define NUM_DEFAULT_JFILES 8
#define JRNL_DEFAULT_FSIZE 24 // Multiples of JRNL_RMGR_PAGE_SIZE
-#define TEST_JFSIZE_SBLKS 96
+#define TEST_JFSIZE_SBLKS 128
#define DEFAULT_JFSIZE_SBLKS (JRNL_DEFAULT_FSIZE * JRNL_RMGR_PAGE_SIZE)
#define NUM_MSGS 5
#define MSG_REC_SIZE_DBLKS 2
@@ -67,7 +67,7 @@
handle_jcntl_response(const iores res, jcntl& jc, unsigned& aio_sleep_cnt, const
std::string& ctxt,
const iores exp_ret)
{
- if (res == RHM_IORES_AIO_WAIT)
+ if (res == RHM_IORES_PAGE_AIOWAIT)
{
if (++aio_sleep_cnt <= MAX_AIO_SLEEPS)
{
@@ -102,9 +102,8 @@
ctxt << "enq_msg(" << rid << ")";
data_tok* dtp = new data_tok;
BOOST_CHECK_MESSAGE(dtp != 0, "Data token allocation failed (dtp == 0).");
- unsigned aio_sleep_cnt = 0;
- while (handle_jcntl_response(jc.enqueue_data_record(msg.c_str(), msg.size(),
msg.size(),
- dtp, transient), jc, aio_sleep_cnt, ctxt.str(), exp_ret));
+ check_iores(ctxt.str(), jc.enqueue_data_record(msg.c_str(), msg.size(), msg.size(),
dtp,
+ transient), exp_ret);
return dtp->rid();
}
@@ -116,9 +115,7 @@
ctxt << "enq_extern_msg(" << rid << ")";
data_tok* dtp = new data_tok;
BOOST_CHECK_MESSAGE(dtp != 0, "Data token allocation failed (dtp == 0).");
- unsigned aio_sleep_cnt = 0;
- while (handle_jcntl_response(jc.enqueue_extern_data_record(msg_size, dtp, transient),
jc,
- aio_sleep_cnt, ctxt.str(), exp_ret));
+ check_iores(ctxt.str(), jc.enqueue_extern_data_record(msg_size, dtp, transient),
exp_ret);
return dtp->rid();
}
@@ -130,9 +127,8 @@
ctxt << "enq_txn_msg(" << rid << ")";
data_tok* dtp = new data_tok;
BOOST_CHECK_MESSAGE(dtp != 0, "Data token allocation failed (dtp == 0).");
- unsigned aio_sleep_cnt = 0;
- while (handle_jcntl_response(jc.enqueue_txn_data_record(msg.c_str(), msg.size(),
msg.size(),
- dtp, xid, transient), jc, aio_sleep_cnt, ctxt.str(), exp_ret));
+ check_iores(ctxt.str(), jc.enqueue_txn_data_record(msg.c_str(), msg.size(),
msg.size(), dtp,
+ xid, transient), exp_ret);
return dtp->rid();
}
@@ -144,9 +140,8 @@
ctxt << "enq_extern_txn_msg(" << rid << ")";
data_tok* dtp = new data_tok;
BOOST_CHECK_MESSAGE(dtp != 0, "Data token allocation failed (dtp == 0).");
- unsigned aio_sleep_cnt = 0;
- while (handle_jcntl_response(jc.enqueue_extern_txn_data_record(msg_size, dtp, xid,
transient),
- jc, aio_sleep_cnt, ctxt.str(), exp_ret));
+ check_iores(ctxt.str(), jc.enqueue_extern_txn_data_record(msg_size, dtp, xid,
transient),
+ exp_ret);
return dtp->rid();
}
@@ -159,9 +154,7 @@
BOOST_CHECK_MESSAGE(dtp != 0, "Data token allocation failed (dtp == 0).");
dtp->set_wstate(data_tok::ENQ);
dtp->set_rid(drid);
- unsigned aio_sleep_cnt = 0;
- while (handle_jcntl_response(jc.dequeue_data_record(dtp), jc, aio_sleep_cnt,
ctxt.str(),
- exp_ret));
+ check_iores(ctxt.str(), jc.dequeue_data_record(dtp), exp_ret);
return dtp->rid();
}
@@ -175,9 +168,7 @@
BOOST_CHECK_MESSAGE(dtp != 0, "Data token allocation failed (dtp == 0).");
dtp->set_wstate(data_tok::ENQ);
dtp->set_rid(drid);
- unsigned aio_sleep_cnt = 0;
- while (handle_jcntl_response(jc.dequeue_txn_data_record(dtp, xid), jc,
aio_sleep_cnt,
- ctxt.str(), exp_ret));
+ check_iores(ctxt.str(), jc.dequeue_txn_data_record(dtp, xid), exp_ret);
return dtp->rid();
}
@@ -186,9 +177,7 @@
{
data_tok* dtp = new data_tok;
BOOST_CHECK_MESSAGE(dtp != 0, "Data token allocation failed (dtp == 0).");
-
- unsigned aio_sleep_cnt = 0;
- while (handle_jcntl_response(jc.txn_abort(dtp, xid), jc, aio_sleep_cnt,
"txn_abort", exp_ret));
+ check_iores("txn_abort", jc.txn_abort(dtp, xid), exp_ret);
return dtp->rid();
}
@@ -197,10 +186,7 @@
{
data_tok* dtp = new data_tok;
BOOST_CHECK_MESSAGE(dtp != 0, "Data token allocation failed (dtp == 0).");
-
- unsigned aio_sleep_cnt = 0;
- while (handle_jcntl_response(jc.txn_commit(dtp, xid), jc, aio_sleep_cnt,
"txn_commit",
- exp_ret));
+ check_iores("txn_commit", jc.txn_commit(dtp, xid), exp_ret);
return dtp->rid();
}
@@ -237,23 +223,26 @@
const u_int32_t
num_msgs_to_full(const u_int16_t num_files, const u_int32_t file_size_dblks,
- const u_int32_t msg_rec_size_dblks)
+ const u_int32_t msg_rec_size_dblks, bool include_deq)
{
- return u_int32_t(::floor(1.0 * num_files * file_size_dblks / msg_rec_size_dblks));
+ u_int32_t rec_size_dblks = msg_rec_size_dblks;
+ if (include_deq)
+ rec_size_dblks++;
+ return u_int32_t(::floor(1.0 * num_files * file_size_dblks / rec_size_dblks));
}
const u_int32_t
num_msgs_to_threshold(const u_int16_t num_files, const u_int32_t file_size_dblks,
const u_int32_t msg_rec_size_dblks)
{
- return u_int32_t(::floor(1.0 * num_files * file_size_dblks * JRNL_ENQ_THRESHOLD /
+ return u_int32_t(::floor(1.0 * num_files * file_size_dblks * JRNL_ENQ_THRESHOLD /
msg_rec_size_dblks / 100));
}
const u_int32_t
num_dequeues_rem(const u_int16_t num_files, const u_int32_t file_size_sblks)
{
- return u_int32_t(::ceil(num_files * file_size_sblks * (1 - (1.0 * JRNL_ENQ_THRESHOLD /
100))));
+ return u_int32_t(::ceil(num_files * file_size_sblks * (1 - (1.0 * JRNL_ENQ_THRESHOLD
/ 100))));
}
const string&
Modified: store/trunk/cpp/tests/jrnl/jtt/jrnl_instance.cpp
===================================================================
--- store/trunk/cpp/tests/jrnl/jtt/jrnl_instance.cpp 2008-03-07 16:53:30 UTC (rev 1770)
+++ store/trunk/cpp/tests/jrnl/jtt/jrnl_instance.cpp 2008-03-17 21:11:19 UTC (rev 1771)
@@ -279,7 +279,7 @@
dptr = 0;
xptr = 0;
break;
- case rhm::journal::RHM_IORES_AIO_WAIT:
+ case rhm::journal::RHM_IORES_PAGE_AIOWAIT:
if (get_rd_events() == 0)
{
rhm::journal::slock sl(&_rd_aio_mutex);
Modified: store/trunk/cpp/tests/jrnl/run-journal-tests
===================================================================
--- store/trunk/cpp/tests/jrnl/run-journal-tests 2008-03-07 16:53:30 UTC (rev 1770)
+++ store/trunk/cpp/tests/jrnl/run-journal-tests 2008-03-17 21:11:19 UTC (rev 1771)
@@ -3,23 +3,23 @@
. ../setup
fail=0
-num_jrnls=3
+num_jrnls=1
# Run jtt using default test set
echo
-echo "***** Mode 1: New journal instance, no recover *****"
+echo "===== Mode 1: New journal instance, no recover ====="
rm -rf /tmp/test_0*
$pwd/jtt/jtt --csv $pwd/jtt/jtt.csv --format-chk --num-jrnls $num_jrnls || fail = 1
echo
-echo "***** Mode 2: Re-use journal instance, no recover *****"
+echo "===== Mode 2: Re-use journal instance, no recover ====="
rm -rf /tmp/test_0*
$pwd/jtt/jtt --csv $pwd/jtt/jtt.csv --reuse-instance --format-chk --num-jrnls $num_jrnls
|| fail = 1
echo
-echo "***** Mode 3: New journal instance, recover previous test journal *****"
+echo "===== Mode 3: New journal instance, recover previous test journal ====="
rm -rf /tmp/test_0*
$pwd/jtt/jtt --csv $pwd/jtt/jtt.csv --recover-mode --format-chk --num-jrnls $num_jrnls ||
fail = 1
echo
-echo "***** Mode 4: Re-use journal instance, recover previous test journal
*****"
+echo "===== Mode 4: Re-use journal instance, recover previous test journal
====="
rm -rf /tmp/test_0*
$pwd/jtt/jtt --csv $pwd/jtt/jtt.csv --reuse-instance --recover-mode --format-chk
--num-jrnls $num_jrnls || fail = 1
echo