Author: kpvdr
Date: 2008-03-27 11:24:41 -0400 (Thu, 27 Mar 2008)
New Revision: 1804
Modified:
store/trunk/cpp/lib/jrnl/enums.hpp
store/trunk/cpp/lib/jrnl/jcntl.cpp
store/trunk/cpp/lib/jrnl/jcntl.hpp
store/trunk/cpp/lib/jrnl/jinf.cpp
store/trunk/cpp/lib/jrnl/jinf.hpp
store/trunk/cpp/lib/jrnl/nlfh.hpp
store/trunk/cpp/lib/jrnl/rcvdat.hpp
store/trunk/cpp/lib/jrnl/rmgr.cpp
store/trunk/cpp/lib/jrnl/rmgr.hpp
store/trunk/cpp/lib/jrnl/rrfc.cpp
store/trunk/cpp/lib/jrnl/rrfc.hpp
store/trunk/cpp/lib/jrnl/wmgr.cpp
store/trunk/cpp/lib/jrnl/wmgr.hpp
store/trunk/cpp/lib/jrnl/wrfc.cpp
store/trunk/cpp/lib/jrnl/wrfc.hpp
store/trunk/cpp/tests/jrnl/_st_read.cpp
Log:
Added read sync to journal. Fixes BZ430479. This fix should enable lazy-load to work.
Modified: store/trunk/cpp/lib/jrnl/enums.hpp
===================================================================
--- store/trunk/cpp/lib/jrnl/enums.hpp 2008-03-27 15:24:30 UTC (rev 1803)
+++ store/trunk/cpp/lib/jrnl/enums.hpp 2008-03-27 15:24:41 UTC (rev 1804)
@@ -47,6 +47,7 @@
RHM_IORES_PAGE_AIOWAIT, ///< IO operation suspended - next page is waiting for
AIO.
RHM_IORES_FILE_AIOWAIT, ///< IO operation suspended - next file is waiting for
AIO.
RHM_IORES_EMPTY, ///< During read operations, nothing further is
available to read.
+ RHM_IORES_RCINVALID, ///< Read page cache is invalid (ie obsolete or
uninitialized)
RHM_IORES_ENQCAPTHRESH, ///< Enqueue capacity threshold (limit) reached.
RHM_IORES_FULL, ///< During write operations, the journal files are
full.
RHM_IORES_BUSY, ///< Another blocking operation is in progress.
@@ -63,6 +64,7 @@
case RHM_IORES_PAGE_AIOWAIT: return "RHM_IORES_PAGE_AIOWAIT";
case RHM_IORES_FILE_AIOWAIT: return "RHM_IORES_FILE_AIOWAIT";
case RHM_IORES_EMPTY: return "RHM_IORES_EMPTY";
+ case RHM_IORES_RCINVALID: return "RHM_IORES_RCINVALID";
case RHM_IORES_ENQCAPTHRESH: return "RHM_IORES_ENQCAPTHRESH";
case RHM_IORES_FULL: return "RHM_IORES_FULL";
case RHM_IORES_BUSY: return "RHM_IORES_BUSY";
Modified: store/trunk/cpp/lib/jrnl/jcntl.cpp
===================================================================
--- store/trunk/cpp/lib/jrnl/jcntl.cpp 2008-03-27 15:24:30 UTC (rev 1803)
+++ store/trunk/cpp/lib/jrnl/jcntl.cpp 2008-03-27 15:24:41 UTC (rev 1804)
@@ -151,7 +151,7 @@
rcvr_janalyze(_rcvdat, prep_txn_list);
highest_rid = _rcvdat._h_rid;
if (_rcvdat._full)
- throw jexception(jerrno::JERR_JCNTL_RECOVERJFULL, "jcntl",
"recover_complete");
+ throw jexception(jerrno::JERR_JCNTL_RECOVERJFULL, "jcntl",
"recover");
// Debug info; should be sent to log file
// std::cout << _rcvdat.to_string(_jid) << std::endl <<
std::flush;
@@ -264,27 +264,46 @@
}
}
+/* TODO
const iores
jcntl::get_data_record(const u_int64_t& rid, const size_t& dsize, const
size_t& dsize_avail,
const void** const data, bool auto_discard)
{
check_rstatus("get_data_record");
return _rmgr.get(rid, dsize, dsize_avail, data, auto_discard);
-}
+} */
+/* TODO
const iores
jcntl::discard_data_record(data_tok* const dtokp)
{
check_rstatus("discard_data_record");
return _rmgr.discard(dtokp);
-}
+} */
+#define MAX_RCINVALID_CNT 50
+#define RCINVALID_SLEEP_TIME_MS 2
const iores
jcntl::read_data_record(void** const datapp, size_t& dsize, void** const xidpp,
size_t& xidsize,
bool& transient, bool& external, data_tok* const dtokp)
{
check_rstatus("read_data");
- return _rmgr.read(datapp, dsize, xidpp, xidsize, transient, external, dtokp);
+ unsigned cnt = 0;
+ iores res;
+ do
+ {
+ res = _rmgr.read(datapp, dsize, xidpp, xidsize, transient, external, dtokp);
+ if (res == RHM_IORES_RCINVALID)
+ {
+ get_wr_events(); // check for outstanding write events
+ _rmgr.synchronize();
+ if (cnt > 0)
+ ::usleep(RCINVALID_SLEEP_TIME_MS * 1000);
+ }
+ cnt++;
+ }
+ while (cnt < MAX_RCINVALID_CNT && res == RHM_IORES_RCINVALID);
+ return res;
}
const iores
@@ -385,8 +404,15 @@
return res;
}
-// Private functions
+void
+jcntl::chk_wr_frot()
+{
+ if (_wrfc.index() == _rrfc.index())
+ _rmgr.invalidate();
+}
+// Protected/Private functions
+
void
jcntl::check_wstatus(const char* fn_name) const
{
@@ -507,6 +533,7 @@
rd._ffid = ji.get_start_file();
rd._lfid = ji.get_end_file();
rd._owi = ji.get_initial_owi();
+ rd._frot = ji.get_frot();
rd._empty = false;
}
catch (const jexception& e)
Modified: store/trunk/cpp/lib/jrnl/jcntl.hpp
===================================================================
--- store/trunk/cpp/lib/jrnl/jcntl.hpp 2008-03-27 15:24:30 UTC (rev 1803)
+++ store/trunk/cpp/lib/jrnl/jcntl.hpp 2008-03-27 15:24:41 UTC (rev 1804)
@@ -294,7 +294,8 @@
const iores enqueue_extern_txn_data_record(const size_t tot_data_len, data_tok*
dtokp,
const std::string& xid, const bool transient = false);
- /**
+ /* TODO
+ **
* \brief Retrieve details of next record to be read without consuming the
record.
*
* Retrieve information about current read record. A pointer to the data is
returned, along
@@ -350,18 +351,21 @@
* discard_next_data_record() must be explicitly called.
*
* \exception TODO
- */
+ *
// *** NOT YET IMPLEMENTED ***
const iores get_data_record(const u_int64_t& rid, const size_t& dsize,
const size_t& dsize_avail, const void** const data, bool auto_discard
= false);
+ */
- /**
+ /* TODO
+ **
* \brief Discard (skip) next record to be read without reading or retrieving it.
*
* \exception TODO
- */
+ *
// *** NOT YET IMPLEMENTED ***
const iores discard_data_record(data_tok* const dtokp);
+ */
/**
* \brief Reads data from the journal. It is the responsibility of the reader to
free
@@ -524,6 +528,10 @@
inline const u_int32_t get_rd_outstanding_aio_dblks(u_int16_t pi) const
{ return _rrfc.file_handle(pi)->rd_aio_outstanding_dblks(); }
+ inline const u_int16_t get_rd_fid() const { return _rrfc.index(); }
+ inline const u_int16_t get_wr_fid() const { return _wrfc.index(); }
+ inline const u_int16_t get_earliest_fid() const { return _wrfc.earliest_index();
}
+
/**
* \brief Check if a particular rid is enqueued. Note that this function will
return
* false if the rid is transactionally enqueued and is not committed, or if it
is
@@ -578,8 +586,12 @@
inline const u_int32_t jfsize_sblks() const { return _jfsize_sblks; }
+ // these are _rmgr to _wmgr interactions, remove when _rmgr contains ref to
_wmgr:
+ void chk_wr_frot();
+ inline const u_int32_t unflushed_dblks() { return _wmgr.unflushed_dblks(); }
+
protected:
/**
* \brief Check status of journal before allowing write operations.
Modified: store/trunk/cpp/lib/jrnl/jinf.cpp
===================================================================
--- store/trunk/cpp/lib/jrnl/jinf.cpp 2008-03-27 15:24:30 UTC (rev 1803)
+++ store/trunk/cpp/lib/jrnl/jinf.cpp 2008-03-27 15:24:41 UTC (rev 1804)
@@ -57,7 +57,8 @@
_analyzed_flag(false),
_start_file(0),
_end_file(0),
- _initial_owi(false)
+ _initial_owi(false),
+ _frot(false)
{
read(jinf_filename);
if (validate_flag)
@@ -179,6 +180,7 @@
{
if (!fnum)
throw jexception(jerrno::JERR_JINF_JDATEMPTY, "jinf",
"analyze");
+ _frot = true;
done = true;
}
else if (fnum == 0) // First file only
@@ -238,6 +240,14 @@
return _initial_owi;
}
+const bool
+jinf::get_frot()
+{
+ if (!_analyzed_flag)
+ analyze();
+ return _frot;
+}
+
const std::string
jinf::to_string() const
{
Modified: store/trunk/cpp/lib/jrnl/jinf.hpp
===================================================================
--- store/trunk/cpp/lib/jrnl/jinf.hpp 2008-03-27 15:24:30 UTC (rev 1803)
+++ store/trunk/cpp/lib/jrnl/jinf.hpp 2008-03-27 15:24:41 UTC (rev 1804)
@@ -64,6 +64,7 @@
u_int16_t _start_file;
u_int16_t _end_file;
bool _initial_owi;
+ bool _frot;
public:
// constructor for reading existing jinf file
@@ -93,6 +94,7 @@
const u_int16_t get_start_file();
const u_int16_t get_end_file();
const bool get_initial_owi();
+ const bool get_frot();
const std::string to_string() const;
const std::string xml_str() const;
Modified: store/trunk/cpp/lib/jrnl/nlfh.hpp
===================================================================
--- store/trunk/cpp/lib/jrnl/nlfh.hpp 2008-03-27 15:24:30 UTC (rev 1803)
+++ store/trunk/cpp/lib/jrnl/nlfh.hpp 2008-03-27 15:24:41 UTC (rev 1804)
@@ -111,7 +111,8 @@
// Derived helper functions
- inline const bool rd_empty() const { return _wr_cmpl_cnt_dblks == 0; }
+ inline const bool rd_void() const { return _wr_cmpl_cnt_dblks == 0; }
+ inline const bool rd_empty() const { return _wr_cmpl_cnt_dblks <=
JRNL_SBLK_SIZE; }
inline const u_int32_t rd_remaining_dblks() const
{ return _wr_cmpl_cnt_dblks - _rd_subm_cnt_dblks; }
inline const bool is_rd_full() const { return _wr_cmpl_cnt_dblks ==
_rd_subm_cnt_dblks; }
@@ -121,7 +122,8 @@
{ return _rd_subm_cnt_dblks - _rd_cmpl_cnt_dblks; }
inline const bool rd_file_rotate() const { return is_rd_full() &&
is_wr_compl(); }
- inline const bool wr_empty() const { return _wr_subm_cnt_dblks == 0; }
+ inline const bool wr_void() const { return _wr_subm_cnt_dblks == 0; }
+ inline const bool wr_empty() const { return _wr_subm_cnt_dblks <=
JRNL_SBLK_SIZE; }
inline const u_int32_t wr_remaining_dblks() const
{ return _ffull_dblks - _wr_subm_cnt_dblks; }
inline const bool is_wr_full() const { return _ffull_dblks == _wr_subm_cnt_dblks;
}
Modified: store/trunk/cpp/lib/jrnl/rcvdat.hpp
===================================================================
--- store/trunk/cpp/lib/jrnl/rcvdat.hpp 2008-03-27 15:24:30 UTC (rev 1803)
+++ store/trunk/cpp/lib/jrnl/rcvdat.hpp 2008-03-27 15:24:41 UTC (rev 1804)
@@ -45,7 +45,8 @@
struct rcvdat
{
- bool _owi;
+ bool _owi; ///< Overwrite indicator
+ bool _frot; ///< First rotation flag
bool _empty; ///< Journal data files empty
u_int16_t _ffid; ///< First file id
size_t _fro; ///< First record offset in ffid
@@ -57,6 +58,7 @@
rcvdat(u_int16_t num_jfiles):
_owi(false),
+ _frot(false),
_empty(true),
_ffid(0),
_fro(0),
@@ -70,6 +72,7 @@
void reset()
{
_owi=false;
+ _frot = false;
_empty=true;
_ffid=0;
_fro=0;
@@ -86,6 +89,7 @@
std::ostringstream oss;
oss << "Jorunal file analysis (jid=\"" << jid
<< "\"):" << std::endl;
oss << " Overwrite indicator (_owi) = " << (_owi ?
"TRUE" : "FALSE") << std::endl;
+ oss << " First rotation (_frot) = " << (_frot ?
"TRUE" : "FALSE") << std::endl;
oss << " Journal empty (_empty) = " << (_empty ?
"TRUE" : "FALSE") << std::endl;
oss << " First fid (_ffid) = " << _ffid <<
std::endl;
oss << " First record offset in first fid (_fro) = 0x"
<< std::hex << _fro <<
Modified: store/trunk/cpp/lib/jrnl/rmgr.cpp
===================================================================
--- store/trunk/cpp/lib/jrnl/rmgr.cpp 2008-03-27 15:24:30 UTC (rev 1803)
+++ store/trunk/cpp/lib/jrnl/rmgr.cpp 2008-03-27 15:24:41 UTC (rev 1804)
@@ -44,11 +44,17 @@
rmgr::rmgr(jcntl* jc, enq_map& emap, txn_map& tmap, rrfc& rrfc):
pmgr(jc, emap, tmap, JRNL_RMGR_PAGE_SIZE, JRNL_RMGR_PAGES),
_rrfc(rrfc),
- _hdr()
+ _hdr(),
+ _valid(false),
+ _fhdr_buffer(0),
+ _iocbp(0),
+ _fhdr_rd_outstanding(false)
{}
rmgr::~rmgr()
-{}
+{
+ clean();
+}
void
rmgr::initialize(const rd_aio_cb rd_cb, const size_t fro)
@@ -65,150 +71,30 @@
_rrfc.add_subm_cnt_dblks(tot_pg_offs_dblks);
_rrfc.add_cmpl_cnt_dblks(tot_pg_offs_dblks);
}
-}
-const iores
-rmgr::get(const u_int64_t& /*rid*/, const size_t& /*dsize*/, const size_t&
/*dsize_avail*/,
- const void** const /*data*/, bool /*auto_discard*/)
-{
-//std::cout << " rmgr::get()" << std::flush;
- iores res = pre_read_check(0);
- if (res != RHM_IORES_SUCCESS)
- return res;
-
- return RHM_IORES_NOTIMPL;
-
-/* TODO...
- _hdr.reset();
- // Read header, determine next record type
- while (true)
+ // Allocate memory for reading file header
+ if (::posix_memalign(&_fhdr_buffer, _sblksize, _sblksize))
{
- if(dblks_rem() == 0 && _rrfc.is_compl() &&
!_rrfc.is_wr_aio_outstanding())
- {
- aio_cycle(); // check if any AIOs have returned
- return RHM_IORES_EMPTY;
- }
- if (_page_cb_arr[_pg_index]._state != AIO_COMPLETE)
- {
- aio_cycle();
- return RHM_IORES_PAGE_AIOWAIT;
- }
- void* rptr = (void*)((char*)_page_ptr_arr[_pg_index] +
- (_pg_offset_dblks * JRNL_DBLK_SIZE));
- ::memcpy(&_hdr, rptr, sizeof(hdr));
- switch (_hdr._magic)
- {
- case RHM_JDAT_ENQ_MAGIC:
- {
- size_t xid_size = *((size_t*)((char*)rptr + sizeof(hdr)
-#if defined(JRNL_BIG_ENDIAN) && defined(JRNL_32_BIT)
- + sizeof(u_int32_t) // filler0
-#endif
- ));
- size_t data_size = *((size_t*)((char*)rptr + sizeof(hdr) +
sizeof(u_int64_t)
-#if defined(JRNL_BIG_ENDIAN) && defined(JRNL_32_BIT)
- + sizeof(u_int32_t) // filler1
-#endif
- ));
- // TODO: Check if transaction is still in transaction map. If so,
block read
- // (unless in recovery, in whcih case return info normally
-// std::string xid = ?? (decode xid here)
-// if (xid_size && !readonly && tx_map.exists(xid))
-// return RHM_IORES_TXPENDING;
- rid = _hdr._rid;
- dsize = data_size;
-
- // Analyze how much of message is available
- void* data_ptr = (char*)rptr + sizeof(enq_hdr) + xid_size;
- void* page_end_ptr = (char*)_page_base_ptr + _pagesize * _sblksize *
_pages;
- u_int16_t data_start_pg_index = _pg_index;
- u_int16_t data_start_pg_index = _pg_index;
- for (u_int16_t i=0; i<_pages; i++)
- {
- pi = (i + _pg_index) % _pages;
- if (data_ptr >= _page_ptr_arr[pi] &&
- data_ptr < (char*)_page_ptr_arr[pi] + _pagesize *
_sblksize)
- data_end_pg_index = pi; // found start page index
-
- }
- u_int16_t data_end_pg_index;
- u_int16_t last_pg_avail_index;
-
- void* data_ptr = (char*)rptr + sizeof(enq_hdr) + xid_size;
- void* page_end_ptr = (char*)_page_base_ptr + _pagesize * _sblksize *
_pages;
- if (data_ptr >= page_end_ptr) // folded, go back to first page...
- data_ptr = (char*)_page_base_ptr + data_ptr - page_end_ptr;
- void* data_end_ptr = (char*)data_ptr + data_size;
- if (data_end_ptr >= page_end_ptr) // folded, go back to first
page...
- data_end_ptr = (char*)_page_base_ptr + data_end_ptr -
page_end_ptr;
- dsize_avail = ??;
- if(data_ptr folded)
- else
- *data = data_ptr;
- }
- break;
- case RHM_JDAT_DEQ_MAGIC:
- consume_deq();
- break;
- case RHM_JDAT_EMPTY_MAGIC:
- consume_filler();
- break;
- default:
- std::ostringstream oss;
- oss << std::hex << std::setfill('0');
- oss << "Magic=0x" << std::setw(8) <<
_hdr._magic << std::dec;
- throw jexception(jerrno::JERR_RMGR_UNKNOWNMAGIC, oss.str(),
"rmgr", "get");
- } // switch(_hdr._magic)
- } // while */
+ clean();
+ std::ostringstream oss;
+ oss << "posix_memalign(): blksize=" << _sblksize <<
" size=" << _sblksize;
+ oss << " errno=" << errno;
+ throw jexception(jerrno::JERR__MALLOC, oss.str(), "rmgr",
"initialize");
+ }
+ _iocbp = new iocb;
+ ::memset(_iocbp, 0, sizeof(iocb*));
}
-const iores
-rmgr::discard(data_tok* dtokp)
+void
+rmgr::clean()
{
-//std::cout << " rmgr::get()" << std::flush;
- iores res = pre_read_check(dtokp);
- if (res != RHM_IORES_SUCCESS)
- return res;
-
- return RHM_IORES_NOTIMPL;
-
-/* TODO...
- _hdr.reset();
- // Read header, determine next record type
- while (true)
+ ::free(_fhdr_buffer);
+ _fhdr_buffer = 0;
+ if (_iocbp)
{
- if(dblks_rem() == 0 && _rrfc.is_compl() &&
!_rrfc.is_wr_aio_outstanding())
- {
- aio_cycle(); // check if any AIOs have returned
- return RHM_IORES_EMPTY;
- }
- if (_page_cb_arr[_pg_index]._state != AIO_COMPLETE)
- {
- aio_cycle();
- return RHM_IORES_PAGE_AIOWAIT;
- }
- void* rptr = (void*)((char*)_page_ptr_arr[_pg_index] +
- (_pg_offset_dblks * JRNL_DBLK_SIZE));
- ::memcpy(&_hdr, rptr, sizeof(hdr));
- switch (_hdr._magic)
- {
- case RHM_JDAT_ENQ_MAGIC:
- {
- }
- break;
- case RHM_JDAT_DEQ_MAGIC:
- consume_deq();
- break;
- case RHM_JDAT_EMPTY_MAGIC:
- consume_filler();
- break;
- default:
- std::ostringstream oss;
- oss << std::hex << std::setfill('0');
- oss << "Magic=0x" << std::setw(8) <<
_hdr._magic << std::dec;
- throw jexception(jerrno::JERR_RMGR_UNKNOWNMAGIC, oss.str(),
"rmgr", "discard");
- } // switch
- } // while */
+ delete _iocbp;
+ _iocbp = 0;
+ }
}
const iores
@@ -225,7 +111,10 @@
if (dtokp->rstate() == data_tok::SKIP_PART)
{
if (_page_cb_arr[_pg_index]._state != AIO_COMPLETE)
+ {
+ aio_cycle(); // check if rd AIOs returned; initiate new reads if possible
return RHM_IORES_PAGE_AIOWAIT;
+ }
const iores res = skip(dtokp);
if (res != RHM_IORES_SUCCESS)
{
@@ -250,8 +139,11 @@
{
if(dblks_rem() == 0 && _rrfc.is_compl() &&
!_rrfc.is_wr_aio_outstanding())
{
- aio_cycle(); // check if any AIOs have returned
- return RHM_IORES_EMPTY;
+ aio_cycle(); // check if rd AIOs returned; initiate new reads if possible
+ if (_jc->unflushed_dblks() > 0)
+ _jc->flush();
+ else
+ return RHM_IORES_EMPTY;
}
if (_page_cb_arr[_pg_index]._state != AIO_COMPLETE)
{
@@ -405,13 +297,25 @@
throw jexception(jerrno::JERR__AIO, oss.str(), "rmgr",
"get_events");
}
- // Increment the completed read offset
- // NOTE: We cannot use _rrfc here, as it may have rotated since submitting
count.
- // Use stored pointer to nlfh in the pcb instead.
- pcbp->_rdblks = iocbp->u.c.nbytes / JRNL_DBLK_SIZE;
- pcbp->_rfh->add_rd_cmpl_cnt_dblks(pcbp->_rdblks);
- pcbp->_state = state;
- pil[i] = pcbp->_index;
+ if (pcbp) // Page reads have pcb
+ {
+ // Increment the completed read offset
+ // NOTE: We cannot use _rrfc here, as it may have rotated since submitting
count.
+ // Use stored pointer to nlfh in the pcb instead.
+ pcbp->_rdblks = iocbp->u.c.nbytes / JRNL_DBLK_SIZE;
+ pcbp->_rfh->add_rd_cmpl_cnt_dblks(pcbp->_rdblks);
+ pcbp->_state = state;
+ pil[i] = pcbp->_index;
+ }
+ else // File header reads have no pcb
+ {
+ ::memcpy(&_fhdr, _fhdr_buffer, sizeof(file_hdr));
+ _rrfc.add_cmpl_cnt_dblks(JRNL_SBLK_SIZE);
+ size_t fro_dblks = _fhdr._fro / JRNL_DBLK_SIZE;
+ _pg_offset_dblks += fro_dblks - JRNL_SBLK_SIZE;
+ _fhdr_rd_outstanding = false;
+ _valid = true;
+ }
}
// Perform AIO return callback
@@ -436,6 +340,19 @@
}
void
+rmgr::invalidate()
+{
+ if (_valid)
+ {
+ _valid = false;
+ for (int i=0; i<_pages; i++)
+ _page_cb_arr[i]._state = UNUSED;
+ _rrfc.reset();
+ _pg_offset_dblks = 0;
+ }
+}
+
+void
rmgr::initialize()
{
pmgr::initialize();
@@ -447,9 +364,24 @@
if (_aio_evt_rem)
get_events();
+ if (!_valid)
+ {
+ if (_rrfc.is_void() && !_rrfc.is_wr_aio_outstanding())
+ return RHM_IORES_EMPTY;
+ else
+ return RHM_IORES_RCINVALID;
+ }
+
+ // block reads until outstanding file header read completes as fro is needed to read
+ if (_fhdr_rd_outstanding)
+ return RHM_IORES_PAGE_AIOWAIT;
+
if(dblks_rem() == 0 && _rrfc.is_compl() &&
!_rrfc.is_wr_aio_outstanding())
{
aio_cycle(); // check if any AIOs have returned
+ if (_jc->unflushed_dblks() > 0)
+ _jc->flush();
+ else
return RHM_IORES_EMPTY;
}
@@ -639,16 +571,24 @@
{
for (int16_t i=0; i<num_uninit; i++)
{
- if (_rrfc.empty()) // Nothing to do; this file not yet written to
+ if (_rrfc.is_void()) // Nothing to do; this file not yet written to
break;
- // If this is the first read from a file, increase the read pointers to beyond
fhdr
- // or consume fhdr here for analysis (not req'd at present)
if (_rrfc.subm_offs() == 0)
{
- _rrfc.add_subm_cnt_dblks(JRNL_SBLK_SIZE);
- _rrfc.add_cmpl_cnt_dblks(JRNL_SBLK_SIZE);
+ if (_valid)
+ {
+ _rrfc.add_subm_cnt_dblks(JRNL_SBLK_SIZE);
+ _rrfc.add_cmpl_cnt_dblks(JRNL_SBLK_SIZE);
+ }
+ else
+ {
+ u_int16_t fid = _jc->get_earliest_fid();
+ init_file_header_read(fid);
+ }
}
+ else
+ _valid = true;
// TODO: Future perf improvement: Do a single AIO read for all available file
// space into all contiguous empty pages in one AIO operation.
@@ -681,9 +621,6 @@
void
rmgr::consume_fhdr()
{
- // If in the future it should become necessary to read each file header, this is
where it would
- // happen.
-
// Set read pointers to first dblk after file header
_rrfc.add_subm_cnt_dblks(JRNL_SBLK_SIZE);
_rrfc.add_cmpl_cnt_dblks(JRNL_SBLK_SIZE);
@@ -725,5 +662,155 @@
xidsize = 0;
}
+void
+rmgr::init_file_header_read(u_int16_t fid)
+{
+ int rfh = _rrfc.file_handle(fid)->rd_fh();
+ rhm_prep_pread(_iocbp, rfh, _fhdr_buffer, _sblksize, 0);
+ if (::io_submit(_ioctx, 1, &_iocbp) < 0)
+ throw jexception(jerrno::JERR__AIO, "rmgr",
"init_file_header_read");
+ _aio_evt_rem++;
+ _rrfc.add_subm_cnt_dblks(JRNL_SBLK_SIZE);
+ _fhdr_rd_outstanding = true;
+}
+
+/* TODO (sometime in the future)
+const iores
+rmgr::get(const u_int64_t& rid, const size_t& dsize, const size_t&
dsize_avail,
+ const void** const data, bool auto_discard)
+{
+ iores res = pre_read_check(0);
+ if (res != RHM_IORES_SUCCESS)
+ return res;
+
+ _hdr.reset();
+ // Read header, determine next record type
+ while (true)
+ {
+ if(dblks_rem() == 0 && _rrfc.is_compl() &&
!_rrfc.is_wr_aio_outstanding())
+ {
+ aio_cycle(); // check if any AIOs have returned
+ return RHM_IORES_EMPTY;
+ }
+ if (_page_cb_arr[_pg_index]._state != AIO_COMPLETE)
+ {
+ aio_cycle();
+ return RHM_IORES_PAGE_AIOWAIT;
+ }
+ void* rptr = (void*)((char*)_page_ptr_arr[_pg_index] +
+ (_pg_offset_dblks * JRNL_DBLK_SIZE));
+ ::memcpy(&_hdr, rptr, sizeof(hdr));
+ switch (_hdr._magic)
+ {
+ case RHM_JDAT_ENQ_MAGIC:
+ {
+ size_t xid_size = *((size_t*)((char*)rptr + sizeof(hdr)
+#if defined(JRNL_BIG_ENDIAN) && defined(JRNL_32_BIT)
+ + sizeof(u_int32_t) // filler0
+#endif
+ ));
+ size_t data_size = *((size_t*)((char*)rptr + sizeof(hdr) +
sizeof(u_int64_t)
+#if defined(JRNL_BIG_ENDIAN) && defined(JRNL_32_BIT)
+ + sizeof(u_int32_t) // filler1
+#endif
+ ));
+ // TODO: Check if transaction is still in transaction map. If so,
block read
+ // (unless in recovery, in whcih case return info normally
+// std::string xid = ?? (decode xid here)
+// if (xid_size && !readonly && tx_map.exists(xid))
+// return RHM_IORES_TXPENDING;
+ rid = _hdr._rid;
+ dsize = data_size;
+
+ // Analyze how much of message is available
+ void* data_ptr = (char*)rptr + sizeof(enq_hdr) + xid_size;
+ void* page_end_ptr = (char*)_page_base_ptr + _pagesize * _sblksize *
_pages;
+ u_int16_t data_start_pg_index = _pg_index;
+ u_int16_t data_start_pg_index = _pg_index;
+ for (u_int16_t i=0; i<_pages; i++)
+ {
+ pi = (i + _pg_index) % _pages;
+ if (data_ptr >= _page_ptr_arr[pi] &&
+ data_ptr < (char*)_page_ptr_arr[pi] + _pagesize *
_sblksize)
+ data_end_pg_index = pi; // found start page index
+
+ }
+ u_int16_t data_end_pg_index;
+ u_int16_t last_pg_avail_index;
+
+ void* data_ptr = (char*)rptr + sizeof(enq_hdr) + xid_size;
+ void* page_end_ptr = (char*)_page_base_ptr + _pagesize * _sblksize *
_pages;
+ if (data_ptr >= page_end_ptr) // folded, go back to first page...
+ data_ptr = (char*)_page_base_ptr + data_ptr - page_end_ptr;
+ void* data_end_ptr = (char*)data_ptr + data_size;
+ if (data_end_ptr >= page_end_ptr) // folded, go back to first
page...
+ data_end_ptr = (char*)_page_base_ptr + data_end_ptr -
page_end_ptr;
+ dsize_avail = ??;
+ if(data_ptr folded)
+ else
+ *data = data_ptr;
+ }
+ break;
+ case RHM_JDAT_DEQ_MAGIC:
+ consume_deq();
+ break;
+ case RHM_JDAT_EMPTY_MAGIC:
+ consume_filler();
+ break;
+ default:
+ std::ostringstream oss;
+ oss << std::hex << std::setfill('0');
+ oss << "Magic=0x" << std::setw(8) <<
_hdr._magic << std::dec;
+ throw jexception(jerrno::JERR_RMGR_UNKNOWNMAGIC, oss.str(),
"rmgr", "get");
+ } // switch(_hdr._magic)
+ } // while
+}
+
+const iores
+rmgr::discard(data_tok* dtokp)
+{
+ iores res = pre_read_check(dtokp);
+ if (res != RHM_IORES_SUCCESS)
+ return res;
+
+ _hdr.reset();
+ // Read header, determine next record type
+ while (true)
+ {
+ if(dblks_rem() == 0 && _rrfc.is_compl() &&
!_rrfc.is_wr_aio_outstanding())
+ {
+ aio_cycle(); // check if any AIOs have returned
+ return RHM_IORES_EMPTY;
+ }
+ if (_page_cb_arr[_pg_index]._state != AIO_COMPLETE)
+ {
+ aio_cycle();
+ return RHM_IORES_PAGE_AIOWAIT;
+ }
+ void* rptr = (void*)((char*)_page_ptr_arr[_pg_index] +
+ (_pg_offset_dblks * JRNL_DBLK_SIZE));
+ ::memcpy(&_hdr, rptr, sizeof(hdr));
+ switch (_hdr._magic)
+ {
+ case RHM_JDAT_ENQ_MAGIC:
+ {
+ }
+ break;
+ case RHM_JDAT_DEQ_MAGIC:
+ consume_deq();
+ break;
+ case RHM_JDAT_EMPTY_MAGIC:
+ consume_filler();
+ break;
+ default:
+ std::ostringstream oss;
+ oss << std::hex << std::setfill('0');
+ oss << "Magic=0x" << std::setw(8) <<
_hdr._magic << std::dec;
+ throw jexception(jerrno::JERR_RMGR_UNKNOWNMAGIC, oss.str(),
"rmgr", "discard");
+ } // switch
+ } // while
+}
+*/
+
} // namespace journal
} // namespace rhm
Modified: store/trunk/cpp/lib/jrnl/rmgr.hpp
===================================================================
--- store/trunk/cpp/lib/jrnl/rmgr.hpp 2008-03-27 15:24:30 UTC (rev 1803)
+++ store/trunk/cpp/lib/jrnl/rmgr.hpp 2008-03-27 15:24:41 UTC (rev 1804)
@@ -41,6 +41,7 @@
#include <jrnl/aio_cb.hpp>
#include <jrnl/enums.hpp>
+#include <jrnl/file_hdr.hpp>
#include <jrnl/pmgr.hpp>
#include <jrnl/rec_hdr.hpp>
#include <jrnl/rrfc.hpp>
@@ -63,21 +64,34 @@
rec_hdr _hdr; ///< Header used to determind record type
rd_aio_cb _cb; ///< Callback function pointer for AIO events
+ bool _valid; ///< Flag is true when read pages contain vailid data
+ void* _fhdr_buffer; ///< Buffer used for fhdr reads
+ iocb* _iocbp; ///< iocb pointer for fhdr reads
+ file_hdr _fhdr; ///< file header instance for reading file headers
+ bool _fhdr_rd_outstanding; ///< true if a fhdr read is outstanding
+
public:
rmgr(jcntl* jc, enq_map& emap, txn_map& tmap, rrfc& rrfc);
virtual ~rmgr();
void initialize(const rd_aio_cb rd_cb, const size_t fro);
- const iores get(const u_int64_t& rid, const size_t& dsize, const
size_t& dsize_avail,
- const void** const data, bool auto_discard);
- const iores discard(data_tok* dtok);
const iores read(void** const datapp, size_t& dsize, void** const xidpp,
size_t& xidsize,
bool& transient, bool& external, data_tok* dtokp);
const u_int32_t get_events(page_state state = AIO_COMPLETE);
void recover_complete(size_t fro);
+ inline const bool is_valid() const {return _valid; }
+ inline void synchronize() { if (!_valid) aio_cycle(); }
+ void invalidate();
+
+ /* TODO (if required)
+ const iores get(const u_int64_t& rid, const size_t& dsize, const
size_t& dsize_avail,
+ const void** const data, bool auto_discard);
+ const iores discard(data_tok* dtok);
+ */
private:
void initialize();
+ void clean();
const iores pre_read_check(data_tok* dtokp);
const iores read_enq(rec_hdr& h, void* rptr, data_tok* dtokp);
void consume_xid_rec(rec_hdr& h, void* rptr, data_tok* dtokp);
@@ -90,6 +104,7 @@
const u_int32_t dblks_rem() const;
void set_params_null(void** const datapp, size_t& dsize, void** const xidpp,
size_t& xidsize);
+ void init_file_header_read(u_int16_t fid);
// Special version of libaio's io_prep_pread() which preserves the value of
the data
Modified: store/trunk/cpp/lib/jrnl/rrfc.cpp
===================================================================
--- store/trunk/cpp/lib/jrnl/rrfc.cpp 2008-03-27 15:24:30 UTC (rev 1803)
+++ store/trunk/cpp/lib/jrnl/rrfc.cpp 2008-03-27 15:24:41 UTC (rev 1804)
@@ -53,6 +53,12 @@
{
_nfiles = nfiles;
_fh_arr = fh_arr;
+ reset(fh_index);
+}
+
+void
+rrfc::reset(u_int32_t fh_index)
+{
_fh_index = fh_index;
_curr_fh = _fh_arr[_fh_index];
}
Modified: store/trunk/cpp/lib/jrnl/rrfc.hpp
===================================================================
--- store/trunk/cpp/lib/jrnl/rrfc.hpp 2008-03-27 15:24:30 UTC (rev 1803)
+++ store/trunk/cpp/lib/jrnl/rrfc.hpp 2008-03-27 15:24:41 UTC (rev 1804)
@@ -73,6 +73,8 @@
*/
void initialize(const u_int16_t nfiles, nlfh** fh_arr, u_int32_t fh_index = 0);
+ void reset(u_int32_t fh_index = 0);
+
/**
* \brief Rotate active file handle to next file in rotating file group.
* \exception jerrno::JERR__NINIT if called before calling initialize().
@@ -99,7 +101,7 @@
// Convenience access methods to current file handle
- inline const u_int32_t fid() const { return _curr_fh->fid(); }
+ inline const u_int16_t fid() const { return _curr_fh->fid(); }
inline const int fh() const { return _curr_fh->rd_fh(); }
inline const u_int32_t enqcnt() const { return _curr_fh->enqcnt(); }
inline const u_int32_t incr_enqcnt() { return _curr_fh->incr_enqcnt(); }
@@ -121,7 +123,8 @@
inline const u_int32_t add_cmpl_cnt_dblks(u_int32_t a)
{ return _curr_fh->add_rd_cmpl_cnt_dblks(a); }
- inline const bool empty() const { return _curr_fh->rd_empty(); }
+ inline const bool is_void() const { return _curr_fh->rd_void(); }
+ inline const bool is_empty() const { return _curr_fh->rd_empty(); }
inline const u_int32_t remaining_dblks() { return
_curr_fh->rd_remaining_dblks(); }
inline const bool is_full() const { return _curr_fh->is_rd_full(); }
inline const bool is_compl() const { return _curr_fh->is_rd_compl(); }
Modified: store/trunk/cpp/lib/jrnl/wmgr.cpp
===================================================================
--- store/trunk/cpp/lib/jrnl/wmgr.cpp 2008-03-27 15:24:30 UTC (rev 1803)
+++ store/trunk/cpp/lib/jrnl/wmgr.cpp 2008-03-27 15:24:41 UTC (rev 1804)
@@ -199,7 +199,7 @@
dtokp->set_wstate(data_tok::ENQ_PART);
// Has the file header been written (i.e. write pointers still at 0)?
- if (_wrfc.empty())
+ if (_wrfc.is_void())
{
u_int32_t rec_dblks_rem = _enq_rec.rec_size_dblks() - data_offs_dblks;
bool file_fit = rec_dblks_rem <= _jfsize_dblks;
@@ -342,7 +342,7 @@
dtokp->set_wstate(data_tok::DEQ_PART);
// Has the file header been written (i.e. write pointers still at 0)?
- if (_wrfc.empty())
+ if (_wrfc.is_void())
{
u_int32_t rec_dblks_rem = _deq_rec.rec_size_dblks() - data_offs_dblks;
bool file_fit = rec_dblks_rem <= _jfsize_dblks;
@@ -483,7 +483,7 @@
dtokp->set_wstate(data_tok::ABORT_PART);
// Has the file header been written (i.e. write pointers still at 0)?
- if (_wrfc.empty())
+ if (_wrfc.is_void())
{
u_int32_t rec_dblks_rem = _txn_rec.rec_size_dblks() - data_offs_dblks;
bool file_fit = rec_dblks_rem <= _jfsize_dblks;
@@ -620,7 +620,7 @@
dtokp->set_wstate(data_tok::COMMIT_PART);
// Has the file header been written (i.e. write pointers still at 0)?
- if (_wrfc.empty())
+ if (_wrfc.is_void())
{
u_int32_t rec_dblks_rem = _txn_rec.rec_size_dblks() - data_offs_dblks;
bool file_fit = rec_dblks_rem <= _jfsize_dblks;
@@ -737,7 +737,9 @@
wmgr::rotate_file()
{
_pg_cntr = 0;
- return _wrfc.rotate();
+ iores res = _wrfc.rotate();
+ _jc->chk_wr_frot();
+ return res;
}
const u_int32_t
Modified: store/trunk/cpp/lib/jrnl/wmgr.hpp
===================================================================
--- store/trunk/cpp/lib/jrnl/wmgr.hpp 2008-03-27 15:24:30 UTC (rev 1803)
+++ store/trunk/cpp/lib/jrnl/wmgr.hpp 2008-03-27 15:24:41 UTC (rev 1804)
@@ -116,6 +116,7 @@
inline const bool curr_pg_blocked() const
{ return _page_cb_arr[_pg_index]._state != UNUSED; }
inline const bool curr_file_blocked() const { return _wrfc.aio_cnt() > 0; }
+ inline const u_int32_t unflushed_dblks() { return _cached_offset_dblks; }
// Debug aid
const std::string status_str() const;
Modified: store/trunk/cpp/lib/jrnl/wrfc.cpp
===================================================================
--- store/trunk/cpp/lib/jrnl/wrfc.cpp 2008-03-27 15:24:30 UTC (rev 1803)
+++ store/trunk/cpp/lib/jrnl/wrfc.cpp 2008-03-27 15:24:41 UTC (rev 1804)
@@ -50,7 +50,8 @@
_rid(0),
#endif
_reset_ok(false),
- _owi(false)
+ _owi(false),
+ _frot(true)
{}
wrfc::~wrfc() {}
@@ -64,6 +65,7 @@
_rid = rdp->_h_rid + 1;
_reset_ok = true;
_owi = rdp->_owi;
+ _frot = rdp->_frot;
}
else
{
@@ -94,6 +96,7 @@
{
_fh_index = 0;
_owi = !_owi;
+ _frot = false;
}
_curr_fh = _fh_arr[_fh_index];
if (_curr_fh->aio_cnt())
@@ -103,6 +106,17 @@
return RHM_IORES_SUCCESS;
}
+const u_int16_t
+wrfc::earliest_index() const
+{
+ if (_frot)
+ return 0;
+ u_int16_t next_index = _fh_index + 1;
+ if (next_index >= _nfiles)
+ next_index = 0;
+ return next_index;
+}
+
const bool
wrfc::reset()
{
Modified: store/trunk/cpp/lib/jrnl/wrfc.hpp
===================================================================
--- store/trunk/cpp/lib/jrnl/wrfc.hpp 2008-03-27 15:24:30 UTC (rev 1803)
+++ store/trunk/cpp/lib/jrnl/wrfc.hpp 2008-03-27 15:24:41 UTC (rev 1804)
@@ -60,6 +60,7 @@
u_int64_t _rid; ///< Master counter for record ID (rid)
bool _reset_ok; ///< Flag set when reset succeeds
bool _owi; ///< Overwrite indicator
+ bool _frot; ///< Flag is true for first rotation, false at all
other times
public:
wrfc();
@@ -83,11 +84,19 @@
*/
const iores rotate();
+ /**
+ * \brief Returns the index of the earliest complete file within the rotating
+ * file group. Unwritten files are excluded. The currently active file is
+ * excluded unless it is the only written file.
+ */
+ const u_int16_t earliest_index() const;
+
inline const u_int64_t rid() const { return _rid; }
inline const u_int64_t get_incr_rid() { return _rid++; }
const bool reset();
inline const bool is_reset() const { return _reset_ok; }
inline const bool owi() const { return _owi; }
+ inline const bool frot() const { return _frot; }
// Convenience access methods to current file handle
@@ -107,7 +116,8 @@
inline const u_int16_t incr_aio_cnt() { return _curr_fh->incr_aio_cnt(); }
inline const u_int16_t decr_aio_cnt() { return _curr_fh->decr_aio_cnt(); }
- inline const bool empty() const { return _curr_fh->wr_empty(); }
+ inline const bool is_void() const { return _curr_fh->wr_void(); }
+ inline const bool is_empty() const { return _curr_fh->wr_empty(); }
inline const u_int32_t remaining_dblks() const { return
_curr_fh->wr_remaining_dblks(); }
inline const bool is_full() const { return _curr_fh->is_wr_full(); };
inline const bool is_compl() const { return _curr_fh->is_wr_compl(); };
Modified: store/trunk/cpp/tests/jrnl/_st_read.cpp
===================================================================
--- store/trunk/cpp/tests/jrnl/_st_read.cpp 2008-03-27 15:24:30 UTC (rev 1803)
+++ store/trunk/cpp/tests/jrnl/_st_read.cpp 2008-03-27 15:24:41 UTC (rev 1804)
@@ -46,6 +46,25 @@
// === Test suite ===
+QPID_AUTO_TEST_CASE(empty_read)
+{
+ string test_name = get_test_name(test_filename, "empty_read");
+ try
+ {
+ string msg;
+ string rmsg;
+ string xid;
+ bool transientFlag;
+ bool externalFlag;
+
+ jcntl jc(test_name, JRNL_DIR, test_name, NUM_TEST_JFILES, TEST_JFSIZE_SBLKS);
+ jrnl_init(jc);
+ read_msg(jc, rmsg, xid, transientFlag, externalFlag, RHM_IORES_EMPTY);
+ }
+ catch(const exception& e) { BOOST_FAIL(e.what()); }
+ cout << "ok" << endl;
+}
+
QPID_AUTO_TEST_CASE(enqueue_read_dequeue_block)
{
string test_name = get_test_name(test_filename,
"enqueue_read_dequeue_block");
@@ -232,24 +251,24 @@
string test_name = get_test_name(test_filename, "delayed_read");
try
{
- cout << "[NOTE: Disabled until outstanding issue(s) resolved.]
";
-// string msg;
-// string rmsg;
-// string xid;
-// bool transientFlag;
-// bool externalFlag;
-//
-// jcntl jc(test_name, JRNL_DIR, test_name, NUM_TEST_JFILES, TEST_JFSIZE_SBLKS);
-// jrnl_init(jc);
-// unsigned m;
-// for (m=0; m<2*NUM_MSGS; m+=2)
-// {
-// enq_msg(jc, m, create_msg(msg, m, MSG_SIZE), false);
-// deq_msg(jc, m);
-// }
-// enq_msg(jc, m, create_msg(msg, m, MSG_SIZE), false);
-// read_msg(jc, rmsg, xid, transientFlag, externalFlag, RHM_IORES_EMPTY);
-// BOOST_CHECK_EQUAL(msg, rmsg);
+ string msg;
+ string rmsg;
+ string xid;
+ bool transientFlag;
+ bool externalFlag;
+
+ jcntl jc(test_name, JRNL_DIR, test_name, NUM_TEST_JFILES, TEST_JFSIZE_SBLKS);
+ jrnl_init(jc);
+ unsigned m;
+ for (m=0; m<2*NUM_MSGS; m+=2)
+ {
+ enq_msg(jc, m, create_msg(msg, m, MSG_SIZE), false);
+ deq_msg(jc, m);
+ }
+ enq_msg(jc, m, create_msg(msg, m, MSG_SIZE), false);
+ jc.flush();
+ read_msg(jc, rmsg, xid, transientFlag, externalFlag);
+ BOOST_CHECK_EQUAL(msg, rmsg);
}
catch(const exception& e) { BOOST_FAIL(e.what()); }
cout << "ok" << endl;
@@ -260,29 +279,77 @@
string test_name = get_test_name(test_filename,
"cache_cycled_delayed_read");
try
{
- cout << "[NOTE: Disabled until outstanding issue(s) resolved.]
";
-// string msg;
-// string rmsg;
-// string xid;
-// bool transientFlag;
-// bool externalFlag;
-//
-// jcntl jc(test_name, JRNL_DIR, test_name, NUM_TEST_JFILES, TEST_JFSIZE_SBLKS);
-// jrnl_init(jc);
-// unsigned m;
-// unsigned n = num_msgs_to_full(NUM_TEST_JFILES, TEST_JFSIZE_SBLKS *
JRNL_SBLK_SIZE,
-// MSG_REC_SIZE_DBLKS);
-// for (m=0; m<12*2*n; m+=2) // 12 file cycles
-// {
-// enq_msg(jc, m, create_msg(msg, m, MSG_SIZE), false);
-// deq_msg(jc, m);
-// }
-// enq_msg(jc, m, create_msg(msg, m, MSG_SIZE), false);
-// read_msg(jc, rmsg, xid, transientFlag, externalFlag, RHM_IORES_EMPTY);
-// BOOST_CHECK_EQUAL(msg, rmsg);
+ string msg;
+ string rmsg;
+ string xid;
+ bool transientFlag;
+ bool externalFlag;
+
+ jcntl jc(test_name, JRNL_DIR, test_name, NUM_TEST_JFILES, TEST_JFSIZE_SBLKS);
+ jrnl_init(jc);
+ unsigned m;
+ unsigned read_buffer_size_dblks = JRNL_RMGR_PAGES * JRNL_RMGR_PAGE_SIZE *
JRNL_SBLK_SIZE;
+ unsigned n = num_msgs_to_full(1, read_buffer_size_dblks, MSG_REC_SIZE_DBLKS,
true);
+ for (m=0; m<2*2*n + 20; m+=2) // fill read buffer twice + 10 msgs
+ {
+ enq_msg(jc, m, create_msg(msg, m, MSG_SIZE), false);
+ deq_msg(jc, m);
+ }
+ enq_msg(jc, m, create_msg(msg, m, MSG_SIZE), false);
+ jc.flush();
+ read_msg(jc, rmsg, xid, transientFlag, externalFlag);
+ BOOST_CHECK_EQUAL(msg, rmsg);
}
catch(const exception& e) { BOOST_FAIL(e.what()); }
cout << "ok" << endl;
}
+QPID_AUTO_TEST_CASE(increasing_interval_delayed_read)
+{
+ string test_name = get_test_name(test_filename,
"increasing_interval_delayed_read");
+ try
+ {
+ string msg;
+ string rmsg;
+ string xid;
+ bool transientFlag;
+ bool externalFlag;
+
+ jcntl jc(test_name, JRNL_DIR, test_name, NUM_TEST_JFILES, TEST_JFSIZE_SBLKS);
+ jrnl_init(jc);
+ unsigned read_buffer_size_dblks = JRNL_RMGR_PAGES * JRNL_RMGR_PAGE_SIZE *
JRNL_SBLK_SIZE;
+ unsigned n = num_msgs_to_full(1, read_buffer_size_dblks, MSG_REC_SIZE_DBLKS,
true);
+ unsigned m = 0;
+
+ // Validate read pipeline
+ enq_msg(jc, m, create_msg(msg, m, MSG_SIZE), false);
+ jc.flush();
+ read_msg(jc, rmsg, xid, transientFlag, externalFlag);
+ deq_msg(jc, m);
+ m += 2;
+
+ // repeat the following multiple times...
+ for (int i=0; i<10; i++)
+ {
+ // Invalidate read pipeline with large write
+ unsigned t = m + (i*n) + 25;
+ for (; m<t; m+=2)
+ {
+ enq_msg(jc, m, create_msg(msg, m, MSG_SIZE), false);
+ deq_msg(jc, m);
+ }
+
+ // Revalidate read pipeline
+ enq_msg(jc, m, create_msg(msg, m, MSG_SIZE), false);
+ jc.flush();
+ read_msg(jc, rmsg, xid, transientFlag, externalFlag);
+ BOOST_CHECK_EQUAL(msg, rmsg);
+ deq_msg(jc, m);
+ m += 2;
+ }
+ }
+ catch(const exception& e) { BOOST_FAIL(e.what()); }
+ cout << "ok" << endl;
+}
+
QPID_AUTO_TEST_SUITE_END()