Author: kpvdr
Date: 2008-05-05 10:58:54 -0400 (Mon, 05 May 2008)
New Revision: 1994
Modified:
store/trunk/cpp/configure.ac
store/trunk/cpp/lib/BdbMessageStore.cpp
store/trunk/cpp/lib/JournalImpl.cpp
store/trunk/cpp/lib/JournalImpl.h
store/trunk/cpp/lib/jrnl/arr_cnt.hpp
store/trunk/cpp/lib/jrnl/jcntl.cpp
store/trunk/cpp/lib/jrnl/jcntl.hpp
store/trunk/cpp/lib/jrnl/nlfh.cpp
store/trunk/cpp/lib/jrnl/nlfh.hpp
store/trunk/cpp/lib/jrnl/rcvdat.hpp
store/trunk/cpp/lib/jrnl/rmgr.cpp
store/trunk/cpp/lib/jrnl/rmgr.hpp
store/trunk/cpp/lib/jrnl/rrfc.cpp
store/trunk/cpp/lib/jrnl/rrfc.hpp
store/trunk/cpp/lib/jrnl/wrfc.cpp
store/trunk/cpp/tests/jrnl/_ut_jinf.cpp
Log:
Bugfix for 444592: "Store fails to recover large dequeued journals owing to
RHM_IORES_PAGE_AIOWAIT". Some tidy-up too.
Modified: store/trunk/cpp/configure.ac
===================================================================
--- store/trunk/cpp/configure.ac 2008-05-01 18:01:33 UTC (rev 1993)
+++ store/trunk/cpp/configure.ac 2008-05-05 14:58:54 UTC (rev 1994)
@@ -127,7 +127,7 @@
AC_SEARCH_LIBS([__db_open], [db_cxx-4.6 db_cxx-4.5 db_cxx-4.4 db_cxx-4.3],
[test "$ac_cv_search___db_open" = "none required" ||
LIB_BERKELEY_DB=$ac_cv_search___db_open],
- AC_MSG_ERROR([couldn't find required library: db_cxx-4.3]))
+ AC_MSG_ERROR([Couldn't find required library in range db_cxx-4.3 through
db_cxx-4.6]))
AC_SUBST([LIB_BERKELEY_DB])
LIBS=$gl_saved_libs
Modified: store/trunk/cpp/lib/BdbMessageStore.cpp
===================================================================
--- store/trunk/cpp/lib/BdbMessageStore.cpp 2008-05-01 18:01:33 UTC (rev 1993)
+++ store/trunk/cpp/lib/BdbMessageStore.cpp 2008-05-05 14:58:54 UTC (rev 1994)
@@ -561,6 +561,11 @@
DataTokenImpl dtokp;
size_t readSize = 0;
unsigned msg_count=0;
+
+ // TODO: This optimization to skip reading if there are no enqueued messages to read
+ // breaks the python system test in phase 6 with "Exception: Cannot write lock file"
+ // Figure out what is breaking.
+ //bool read = jc->get_enq_cnt() > 0;
bool read = true;
void* dbuff = NULL; size_t dbuffSize = 0;
Modified: store/trunk/cpp/lib/JournalImpl.cpp
===================================================================
--- store/trunk/cpp/lib/JournalImpl.cpp 2008-05-01 18:01:33 UTC (rev 1993)
+++ store/trunk/cpp/lib/JournalImpl.cpp 2008-05-05 14:58:54 UTC (rev 1994)
@@ -183,6 +183,7 @@
std::ostringstream oss2;
oss2 << "Recover phase I complete; highest rid found = 0x" <<
std::hex << highest_rid;
oss2 << std::dec << "; emap.size=" << _emap.size()
<< "; tmap.size=" << _tmap.size();
+ oss2 << "; journal now read-only.";
log(LOG_DEBUG, oss2.str());
if (_mgmtObject.get() != 0)
@@ -194,6 +195,13 @@
}
}
+void
+JournalImpl::recover_complete()
+{
+ jcntl::recover_complete();
+ log(LOG_DEBUG, "Recover phase II complete; journal now writable.");
+}
+
#define MAX_AIO_SLEEPS 500
#define AIO_SLEEP_TIME 1000000
bool
Modified: store/trunk/cpp/lib/JournalImpl.h
===================================================================
--- store/trunk/cpp/lib/JournalImpl.h 2008-05-01 18:01:33 UTC (rev 1993)
+++ store/trunk/cpp/lib/JournalImpl.h 2008-05-05 14:58:54 UTC (rev 1994)
@@ -113,6 +113,8 @@
recover(num_jfiles, jfsize_sblks, 0, &aio_wr_callback, prep_tx_list,
highest_rid,
queue_id);
}
+
+ void recover_complete();
// Temporary fn to read and save last msg read from journal so it can be assigned
// in chunks. To be replaced when coding to do this direct from the journal is ready.
Modified: store/trunk/cpp/lib/jrnl/arr_cnt.hpp
===================================================================
--- store/trunk/cpp/lib/jrnl/arr_cnt.hpp 2008-05-01 18:01:33 UTC (rev 1993)
+++ store/trunk/cpp/lib/jrnl/arr_cnt.hpp 2008-05-05 14:58:54 UTC (rev 1994)
@@ -43,7 +43,7 @@
/**
* \class arr_cnt
* \brief Class which implements a dynamically allocated array of u_int32_t counters.
- * This is ideal where it is necessary to increment and decrement counts for an entuty
+ * This is ideal where it is necessary to increment and decrement counts for an entity
* for which the number of elements is unknown, but for which the efficiency of a static
* array is required. None of the counts may go below zero.
*/
Modified: store/trunk/cpp/lib/jrnl/jcntl.cpp
===================================================================
--- store/trunk/cpp/lib/jrnl/jcntl.cpp 2008-05-01 18:01:33 UTC (rev 1993)
+++ store/trunk/cpp/lib/jrnl/jcntl.cpp 2008-05-05 14:58:54 UTC (rev 1994)
@@ -139,7 +139,7 @@
// constrains read activity (i.e. one can't read what has not yet been written).
_wrfc.initialize(_num_jfiles, _jfsize_sblks, (nlfh**)_datafh);
_rrfc.initialize(_num_jfiles, (nlfh**)_datafh);
- _rmgr.initialize(rd_cb, 0);
+ _rmgr.initialize(rd_cb);
_wmgr.initialize(wr_cb, JRNL_WMGR_MAXDTOKPP, JRNL_WMGR_MAXWAITUS);
// Write info file (<basename>.jinf) to disk
@@ -187,7 +187,7 @@
if (_rcvdat._full)
throw jexception(jerrno::JERR_JCNTL_RECOVERJFULL, "jcntl",
"recover");
- this->log(LOG_DEBUG, _rcvdat.to_string(_jid));
+ this->log(LOG_DEBUG, _rcvdat.to_log(_jid));
_datafh = new lfh*[_num_jfiles];
// 0 the pointer array first because new() can throw exceptions
@@ -205,7 +205,7 @@
// constrains read activity (i.e. one can't read what has not yet been written).
_wrfc.initialize(_num_jfiles, _jfsize_sblks, (nlfh**)_datafh, &_rcvdat);
_rrfc.initialize(_num_jfiles, (nlfh**)_datafh, _rcvdat._ffid);
- _rmgr.initialize(rd_cb, _rcvdat._fro);
+ _rmgr.initialize(rd_cb);
_wmgr.initialize(wr_cb, JRNL_WMGR_MAXDTOKPP, JRNL_WMGR_MAXWAITUS, _rcvdat._eo);
_readonly_flag = true;
@@ -221,9 +221,8 @@
_datafh[i]->reset(&_rcvdat);
_wrfc.initialize(_num_jfiles, _jfsize_sblks, (nlfh**)_datafh, &_rcvdat);
_rrfc.initialize(_num_jfiles, (nlfh**)_datafh, _rcvdat._ffid);
- _rmgr.recover_complete(_rcvdat._fro);
+ _rmgr.recover_complete();
_readonly_flag = false;
- this->log(LOG_DEBUG, "Recover phase II complete; journal now
writable.");
}
void
@@ -413,6 +412,21 @@
flush(block_till_aio_cmpl);
}
+u_int16_t
+jcntl::get_earliest_fid()
+{
+ u_int16_t ffid = _wrfc.earliest_index();
+ u_int16_t fid = _wrfc.index();
+ while ( _emap.get_enq_cnt(ffid) == 0 && _tmap.get_txn_fid_cnt(ffid) == 0
&& ffid != fid)
+ {
+ if (++ffid >= _num_jfiles)
+ ffid = 0;
+ }
+ if (ffid != _rrfc.fid())
+ _rrfc.reset(ffid);
+ return ffid;
+}
+
iores
jcntl::flush(const bool block_till_aio_cmpl)
{
@@ -602,7 +616,7 @@
u_int16_t next_wr_fid = (rd._lfid + 1) % _num_jfiles;
if (rd._ffid == next_wr_fid && rd._enq_cnt_list[next_wr_fid])
rd._full = true;
-
+
// Remove all transactions not in prep_txn_list
std::vector<std::string> xid_list;
_tmap.xid_list(xid_list);
@@ -760,7 +774,7 @@
}
break;
case 0:
- rd._eo = ifsp->tellg();
+ rd._eo = file_pos;
return false;
default:
// Stop as this is the overwrite boundary.
Modified: store/trunk/cpp/lib/jrnl/jcntl.hpp
===================================================================
--- store/trunk/cpp/lib/jrnl/jcntl.hpp 2008-05-01 18:01:33 UTC (rev 1993)
+++ store/trunk/cpp/lib/jrnl/jcntl.hpp 2008-05-05 14:58:54 UTC (rev 1994)
@@ -534,7 +534,7 @@
inline u_int16_t get_rd_fid() const { return _rrfc.index(); }
inline u_int16_t get_wr_fid() const { return _wrfc.index(); }
- inline u_int16_t get_earliest_fid() const { return _wrfc.earliest_index(); }
+ u_int16_t get_earliest_fid();
/**
* \brief Check if a particular rid is enqueued. Note that this function will return
Modified: store/trunk/cpp/lib/jrnl/nlfh.cpp
===================================================================
--- store/trunk/cpp/lib/jrnl/nlfh.cpp 2008-05-01 18:01:33 UTC (rev 1993)
+++ store/trunk/cpp/lib/jrnl/nlfh.cpp 2008-05-05 14:58:54 UTC (rev 1994)
@@ -113,13 +113,6 @@
{
if (!ro->_empty)
{
- // For first file only, set read counters to ahead of file header
- if (ro->_ffid == _fid)
- {
- _rd_subm_cnt_dblks = JRNL_SBLK_SIZE;
- _rd_cmpl_cnt_dblks = JRNL_SBLK_SIZE;
- }
-
// For last file only, set write counters to end of last record (the
// continuation point); for all others, set to eof.
if (ro->_lfid == _fid)
@@ -199,15 +192,24 @@
bool
nlfh::reset(const rcvdat* const ro)
{
+ rd_reset();
+ return wr_reset(ro);
+}
+
+void
+nlfh::rd_reset()
+{
+ _rd_subm_cnt_dblks = 0;
+ _rd_cmpl_cnt_dblks = 0;
+}
+
+bool
+nlfh::wr_reset(const rcvdat* const ro)
+{
if (ro)
{
if (!ro->_empty)
{
- if (ro->_ffid == _fid)
- {
- _rd_subm_cnt_dblks = JRNL_SBLK_SIZE;
- _rd_cmpl_cnt_dblks = JRNL_SBLK_SIZE;
- }
if (ro->_lfid == _fid)
{
_wr_subm_cnt_dblks = ro->_eo/JRNL_DBLK_SIZE;
@@ -232,8 +234,6 @@
_wr_subm_cnt_dblks = 0;
_wr_cmpl_cnt_dblks = 0;
#endif
- _rd_subm_cnt_dblks = 0;
- _rd_cmpl_cnt_dblks = 0;
return true;
}
Modified: store/trunk/cpp/lib/jrnl/nlfh.hpp
===================================================================
--- store/trunk/cpp/lib/jrnl/nlfh.hpp 2008-05-01 18:01:33 UTC (rev 1993)
+++ store/trunk/cpp/lib/jrnl/nlfh.hpp 2008-05-05 14:58:54 UTC (rev 1994)
@@ -79,6 +79,8 @@
virtual void initialize(const std::string& fbasename, const u_int16_t fid,
const u_int32_t jfsize_sblks, const rcvdat* const ro);
virtual bool reset(const rcvdat* const ro = 0);
+ virtual void rd_reset();
+ virtual bool wr_reset(const rcvdat* const ro = 0);
inline const std::string& fname() const { return _fname; }
inline u_int16_t fid() const { return _fid; }
Modified: store/trunk/cpp/lib/jrnl/rcvdat.hpp
===================================================================
--- store/trunk/cpp/lib/jrnl/rcvdat.hpp 2008-05-01 18:01:33 UTC (rev 1993)
+++ store/trunk/cpp/lib/jrnl/rcvdat.hpp 2008-05-05 14:58:54 UTC (rev 1994)
@@ -107,6 +107,31 @@
std::endl;
return oss.str();
}
+
+ std::string to_log(std::string& jid)
+ {
+ std::ostringstream oss;
+ oss << "Recover file analysis (jid=\"" << jid
<< "\"):";
+ oss << " owi=" << (_owi ? "TRUE" :
"FALSE");
+ oss << " frot=" << (_frot ? "TRUE" :
"FALSE");
+ oss << " empty=" << (_empty ? "TRUE" :
"FALSE");
+ oss << " ffid=" << _ffid;
+ oss << " fro=0x" << std::hex << _fro <<
std::dec << " (" <<
+ (_fro/JRNL_DBLK_SIZE) << " dblks)";
+ oss << " lfid=" << _lfid;
+ oss << " eo=0x" << std::hex << _eo <<
std::dec << " (" <<
+ (_eo/JRNL_DBLK_SIZE) << " dblks)";
+ oss << " h_rid=0x" << std::hex << _h_rid
<< std::dec;
+ oss << " full=" << (_full ? "TRUE" :
"FALSE");
+ oss << " Enqueued records (txn & non-txn): [ ";
+ for (unsigned i=0; i<_enq_cnt_list.size(); i++)
+ {
+ if (i) oss << " ";
+ oss << "fid_" << std::setw(2) <<
std::setfill('0') << i << "=" << _enq_cnt_list[i];
+ }
+ oss << " ]";
+ return oss.str();
+ }
};
}
}
Modified: store/trunk/cpp/lib/jrnl/rmgr.cpp
===================================================================
--- store/trunk/cpp/lib/jrnl/rmgr.cpp 2008-05-01 18:01:33 UTC (rev 1993)
+++ store/trunk/cpp/lib/jrnl/rmgr.cpp 2008-05-05 14:58:54 UTC (rev 1994)
@@ -58,21 +58,10 @@
}
void
-rmgr::initialize(const rd_aio_cb rd_cb, const std::size_t fro)
+rmgr::initialize(const rd_aio_cb rd_cb)
{
_cb = rd_cb;
initialize();
- if (fro)
- {
- u_int32_t fro_dblks = (fro / JRNL_DBLK_SIZE) - JRNL_SBLK_SIZE;
- _pg_cntr = fro_dblks / (JRNL_RMGR_PAGE_SIZE * JRNL_SBLK_SIZE);
- u_int32_t tot_pg_offs_dblks = _pg_cntr * JRNL_RMGR_PAGE_SIZE * JRNL_SBLK_SIZE;
- _pg_index = _pg_cntr % JRNL_RMGR_PAGES;
- _pg_offset_dblks = fro_dblks - tot_pg_offs_dblks;
- _rrfc.add_subm_cnt_dblks(tot_pg_offs_dblks);
- _rrfc.add_cmpl_cnt_dblks(tot_pg_offs_dblks);
- }
-
clean();
// Allocate memory for reading file header
if (::posix_memalign(&_fhdr_buffer, _sblksize, _sblksize))
@@ -313,8 +302,17 @@
{
std::memcpy(&_fhdr, _fhdr_buffer, sizeof(file_hdr));
_rrfc.add_cmpl_cnt_dblks(JRNL_SBLK_SIZE);
- std::size_t fro_dblks = _fhdr._fro / JRNL_DBLK_SIZE;
- _pg_offset_dblks += fro_dblks - JRNL_SBLK_SIZE;
+
+// std::size_t fro_dblks = _fhdr._fro / JRNL_DBLK_SIZE;
+// _pg_offset_dblks += fro_dblks - JRNL_SBLK_SIZE;
+ u_int32_t fro_dblks = (_fhdr._fro / JRNL_DBLK_SIZE) - JRNL_SBLK_SIZE;
+ _pg_cntr = fro_dblks / (JRNL_RMGR_PAGE_SIZE * JRNL_SBLK_SIZE);
+ u_int32_t tot_pg_offs_dblks = _pg_cntr * JRNL_RMGR_PAGE_SIZE *
JRNL_SBLK_SIZE;
+ _pg_index = _pg_cntr % JRNL_RMGR_PAGES;
+ _pg_offset_dblks = fro_dblks - tot_pg_offs_dblks;
+ _rrfc.add_subm_cnt_dblks(tot_pg_offs_dblks);
+ _rrfc.add_cmpl_cnt_dblks(tot_pg_offs_dblks);
+
_fhdr_rd_outstanding = false;
_valid = true;
}
@@ -327,19 +325,8 @@
}
void
-rmgr::recover_complete(std::size_t fro)
-{
- if (fro)
- {
- u_int32_t fro_dblks = (fro / JRNL_DBLK_SIZE) - JRNL_SBLK_SIZE;
- _pg_cntr = fro_dblks / (JRNL_RMGR_PAGE_SIZE * JRNL_SBLK_SIZE);
- u_int32_t tot_pg_offs_dblks = _pg_cntr * JRNL_RMGR_PAGE_SIZE * JRNL_SBLK_SIZE;
- _pg_index = _pg_cntr % JRNL_RMGR_PAGES;
- _pg_offset_dblks = fro_dblks - tot_pg_offs_dblks;
- _rrfc.add_subm_cnt_dblks(tot_pg_offs_dblks);
- _rrfc.add_cmpl_cnt_dblks(tot_pg_offs_dblks);
- }
-}
+rmgr::recover_complete()
+{}
void
rmgr::invalidate()
@@ -571,26 +558,25 @@
void
rmgr::init_aio_reads(const int16_t first_uninit, const u_int16_t num_uninit)
{
+ if (_fhdr_rd_outstanding)
+ return;
for (int16_t i=0; i<num_uninit; i++)
{
if (_rrfc.is_void()) // Nothing to do; this file not yet written to
break;
-
+
+ if (!_valid)
+ {
+ u_int16_t fid = _jc->get_earliest_fid();
+ init_file_header_read(fid);
+ break;
+ }
+
if (_rrfc.subm_offs() == 0)
{
- if (_valid)
- {
- _rrfc.add_subm_cnt_dblks(JRNL_SBLK_SIZE);
- _rrfc.add_cmpl_cnt_dblks(JRNL_SBLK_SIZE);
- }
- else
- {
- u_int16_t fid = _jc->get_earliest_fid();
- init_file_header_read(fid);
- }
+ _rrfc.add_subm_cnt_dblks(JRNL_SBLK_SIZE);
+ _rrfc.add_cmpl_cnt_dblks(JRNL_SBLK_SIZE);
}
- else
- _valid = true;
// TODO: Future perf improvement: Do a single AIO read for all available file
// space into all contiguous empty pages in one AIO operation.
Modified: store/trunk/cpp/lib/jrnl/rmgr.hpp
===================================================================
--- store/trunk/cpp/lib/jrnl/rmgr.hpp 2008-05-01 18:01:33 UTC (rev 1993)
+++ store/trunk/cpp/lib/jrnl/rmgr.hpp 2008-05-05 14:58:54 UTC (rev 1994)
@@ -75,11 +75,11 @@
rmgr(jcntl* jc, enq_map& emap, txn_map& tmap, rrfc& rrfc);
virtual ~rmgr();
- void initialize(const rd_aio_cb rd_cb, const std::size_t fro);
- iores read(void** const datapp, std::size_t& dsize, void** const xidpp,
std::size_t& xidsize,
- bool& transient, bool& external, data_tok* dtokp);
+ void initialize(const rd_aio_cb rd_cb);
+ iores read(void** const datapp, std::size_t& dsize, void** const xidpp,
+ std::size_t& xidsize, bool& transient, bool& external,
data_tok* dtokp);
u_int32_t get_events(page_state state = AIO_COMPLETE);
- void recover_complete(std::size_t fro);
+ void recover_complete();
inline bool is_valid() const {return _valid; }
inline void synchronize() { if (!_valid) aio_cycle(); }
void invalidate();
Modified: store/trunk/cpp/lib/jrnl/rrfc.cpp
===================================================================
--- store/trunk/cpp/lib/jrnl/rrfc.cpp 2008-05-01 18:01:33 UTC (rev 1993)
+++ store/trunk/cpp/lib/jrnl/rrfc.cpp 2008-05-05 14:58:54 UTC (rev 1994)
@@ -62,6 +62,7 @@
{
_fh_index = fh_index;
_curr_fh = _fh_arr[_fh_index];
+ _curr_fh->rd_reset();
}
bool
Modified: store/trunk/cpp/lib/jrnl/rrfc.hpp
===================================================================
--- store/trunk/cpp/lib/jrnl/rrfc.hpp 2008-05-01 18:01:33 UTC (rev 1993)
+++ store/trunk/cpp/lib/jrnl/rrfc.hpp 2008-05-05 14:58:54 UTC (rev 1994)
@@ -72,7 +72,7 @@
* each of which correspond to one of the physical files.
* \param fh_index Initial index of journal file. Default = 0.
*/
- void initialize(const u_int16_t nfiles, nlfh** fh_arr, u_int32_t fh_index = 0);
+ virtual void initialize(const u_int16_t nfiles, nlfh** fh_arr, u_int32_t fh_index
= 0);
void reset(u_int32_t fh_index = 0);
Modified: store/trunk/cpp/lib/jrnl/wrfc.cpp
===================================================================
--- store/trunk/cpp/lib/jrnl/wrfc.cpp 2008-05-01 18:01:33 UTC (rev 1993)
+++ store/trunk/cpp/lib/jrnl/wrfc.cpp 2008-05-05 14:58:54 UTC (rev 1994)
@@ -62,7 +62,11 @@
{
if (rdp)
{
- rrfc::initialize(nfiles, fh_arr, rdp->_lfid);
+ _nfiles = nfiles;
+ _fh_arr = fh_arr;
+ _fh_index = rdp->_lfid;
+ _curr_fh = _fh_arr[rdp->_lfid];
+ _curr_fh->wr_reset(rdp);
_rid = rdp->_h_rid + 1;
_reset_ok = true;
_owi = rdp->_owi;
Modified: store/trunk/cpp/tests/jrnl/_ut_jinf.cpp
===================================================================
--- store/trunk/cpp/tests/jrnl/_ut_jinf.cpp 2008-05-01 18:01:33 UTC (rev 1993)
+++ store/trunk/cpp/tests/jrnl/_ut_jinf.cpp 2008-05-05 14:58:54 UTC (rev 1994)
@@ -38,6 +38,7 @@
#include "jrnl/jerrno.hpp"
#include "jrnl/jexception.hpp"
#include "jrnl/jinf.hpp"
+#include <vector>
#define NUM_JFILES 4
#define JFSIZE_SBLKS 128