[rhmessaging-commits] rhmessaging commits: r1523 - store/trunk/cpp/lib/jrnl.
rhmessaging-commits at lists.jboss.org
rhmessaging-commits at lists.jboss.org
Wed Dec 19 15:36:48 EST 2007
Author: kpvdr
Date: 2007-12-19 15:36:48 -0500 (Wed, 19 Dec 2007)
New Revision: 1523
Modified:
store/trunk/cpp/lib/jrnl/jcntl.cpp
store/trunk/cpp/lib/jrnl/nlfh.cpp
store/trunk/cpp/lib/jrnl/rmgr.cpp
store/trunk/cpp/lib/jrnl/wrfc.cpp
Log:
Fix for BZ 425211. Large files exposed incorrect initialization of read manger counters.
Modified: store/trunk/cpp/lib/jrnl/jcntl.cpp
===================================================================
--- store/trunk/cpp/lib/jrnl/jcntl.cpp 2007-12-19 14:32:22 UTC (rev 1522)
+++ store/trunk/cpp/lib/jrnl/jcntl.cpp 2007-12-19 20:36:48 UTC (rev 1523)
@@ -150,8 +150,8 @@
if (_rcvdat._full)
throw jexception(jerrno::JERR_JCNTL_RECOVERJFULL, "jcntl", "recover_complete");
- // Debug info; may be useful to print with a flag
- // std::cout << _rcvdat.to_string(_jid) << std::endl;
+ // Debug info; should be sent to log file
+ // std::cout << _rcvdat.to_string(_jid) << std::endl << std::flush;
if (_datafh)
{
@@ -519,14 +519,14 @@
rec_hdr h;
if (!jfile_cycle(fid, ifsp, lowi, rd, true))
return false;
- std::streampos read_pos = ifsp->tellg();
+ std::streampos file_pos = ifsp->tellg();
ifsp->read((char*)&h, sizeof(rec_hdr));
switch(h._magic)
{
case RHM_JDAT_ENQ_MAGIC:
{
enq_rec er;
- if (!decode(er, fid, ifsp, cum_size_read, h, lowi, rd, read_pos))
+ if (!decode(er, fid, ifsp, cum_size_read, h, lowi, rd, file_pos))
return false;
if (!er.is_transient()) // Ignore transient msgs
{
@@ -547,7 +547,7 @@
case RHM_JDAT_DEQ_MAGIC:
{
deq_rec dr;
- if (!decode(dr, fid, ifsp, cum_size_read, h, lowi, rd, read_pos))
+ if (!decode(dr, fid, ifsp, cum_size_read, h, lowi, rd, file_pos))
return false;
if (dr.xid_size())
{
@@ -580,7 +580,7 @@
case RHM_JDAT_TXA_MAGIC:
{
txn_rec ar;
- if (!decode(ar, fid, ifsp, cum_size_read, h, lowi, rd, read_pos))
+ if (!decode(ar, fid, ifsp, cum_size_read, h, lowi, rd, file_pos))
return false;
// Delete this txn from tmap, unlock any locked records in emap
ar.get_xid(&xidp);
@@ -607,7 +607,7 @@
case RHM_JDAT_TXC_MAGIC:
{
txn_rec cr;
- if (!decode(cr, fid, ifsp, cum_size_read, h, lowi, rd, read_pos))
+ if (!decode(cr, fid, ifsp, cum_size_read, h, lowi, rd, file_pos))
return false;
// Delete this txn from tmap, process records into emap
cr.get_xid(&xidp);
@@ -638,27 +638,21 @@
rd._eo = ifsp->tellg();
return false;
default:
- // Is this the last file, if so, stop as this is the overwrite boundary.
- if (fid == (rd._ffid ? rd._ffid - 1 : _num_jfiles - 1))
- {
- rd._lfid = fid;
- rd._eo = read_pos;
- return false;
- }
- std::ostringstream oss;
- oss << std::hex << std::setfill('0') << "Magic=0x" << std::setw(8) << h._magic;
- oss << " fid=" << fid << " foffs=0x" << std::setw(8) << read_pos;
- throw jexception(jerrno::JERR_JCNTL_UNKNOWNMAGIC, oss.str(), "jcntl",
- "rcvr_get_next_record");
+ // Stop as this is the overwrite boundary.
+ rd._lfid = fid;
+ rd._eo = file_pos;
+ return false;
}
return true;
}
const bool
jcntl::decode(jrec& rec, u_int16_t& fid, std::ifstream* ifsp, size_t& cum_size_read, rec_hdr& h,
- bool& lowi, rcvdat& rd, std::streampos& rec_offset)
+ bool& lowi, rcvdat& rd, std::streampos& file_offs)
{
- if (!check_owi(fid, h, lowi, rd, rec_offset))
+ u_int16_t start_fid = fid;
+ std::streampos start_file_offs = file_offs;
+ if (!check_owi(fid, h, lowi, rd, file_offs))
return false;
bool done = false;
while (!done)
@@ -668,9 +662,9 @@
{
if (e.err_code() != jerrno::JERR_JREC_BADRECTAIL ||
fid != (rd._ffid ? rd._ffid - 1 : _num_jfiles - 1)) throw;
- check_journal_alignment(fid, rec_offset);
- rd._lfid = fid;
- rd._eo = rec_offset;
+ check_journal_alignment(start_fid, start_file_offs);
+ rd._lfid = start_fid;
+ rd._eo = start_file_offs;
return false;
}
if (!jfile_cycle(fid, ifsp, lowi, rd, false))
@@ -728,22 +722,22 @@
}
const bool
-jcntl::check_owi(const u_int16_t fid, rec_hdr& h, bool& lowi, rcvdat& rd, std::streampos& read_pos)
+jcntl::check_owi(const u_int16_t fid, rec_hdr& h, bool& lowi, rcvdat& rd, std::streampos& file_pos)
{
if (rd._ffid ? h.get_owi() == lowi : h.get_owi() != lowi) // Overwrite indicator changed
{
u_int16_t expected_fid = rd._ffid ? rd._ffid - 1 : _num_jfiles - 1;
if (fid == expected_fid)
{
- check_journal_alignment(fid, read_pos);
+ check_journal_alignment(fid, file_pos);
rd._lfid = fid;
- rd._eo = read_pos;
+ rd._eo = file_pos;
return false;
}
std::ostringstream oss;
oss << std::hex << std::setfill('0') << "Magic=0x" << std::setw(8) << h._magic;
oss << " fid=0x" << std::setw(4) << fid << " rid=0x" << std::setw(8) << h._rid;
- oss << " foffs=0x" << std::setw(8) << read_pos;
+ oss << " foffs=0x" << std::setw(8) << file_pos;
oss << " expected_fid=0x" << std::setw(4) << expected_fid;
throw jexception(jerrno::JERR_JCNTL_OWIMISMATCH, oss.str(), "jcntl",
"check_owi");
@@ -755,14 +749,14 @@
void
-jcntl::check_journal_alignment(const u_int16_t fid, std::streampos& rec_offset)
+jcntl::check_journal_alignment(const u_int16_t fid, std::streampos& file_pos)
{
- unsigned sblk_offs = rec_offset % (JRNL_DBLK_SIZE * JRNL_SBLK_SIZE);
+ unsigned sblk_offs = file_pos % (JRNL_DBLK_SIZE * JRNL_SBLK_SIZE);
if (sblk_offs)
{
// TODO: Connect the following with logger:
std::cout << std::hex << "INFO: Bad record alignment found at fid=0x" << fid <<
- " offs=0x" << rec_offset << " (likely journal overwrite boundary); " <<
+ " offs=0x" << file_pos << " (likely journal overwrite boundary); " <<
(JRNL_SBLK_SIZE - (sblk_offs/JRNL_DBLK_SIZE)) <<
" filler record(s) required." << std::endl;
const u_int32_t xmagic = RHM_JDAT_EMPTY_MAGIC;
@@ -773,7 +767,7 @@
std::ios_base::in | std::ios_base::out | std::ios_base::binary);
if (!ofsp.good())
throw jexception(jerrno::JERR__FILEIO, oss.str(), "jcntl", "check_journal_alignment");
- ofsp.seekp(rec_offset);
+ ofsp.seekp(file_pos);
void* buff = ::malloc(JRNL_DBLK_SIZE);
assert(buff != NULL);
::memcpy(buff, (void*)&xmagic, sizeof(xmagic));
@@ -782,13 +776,13 @@
// clear should inspection of the file be required.
::memset((char*)buff + sizeof(xmagic), RHM_CLEAN_CHAR, JRNL_DBLK_SIZE - sizeof(xmagic));
- while (rec_offset % (JRNL_DBLK_SIZE * JRNL_SBLK_SIZE))
+ while (file_pos % (JRNL_DBLK_SIZE * JRNL_SBLK_SIZE))
{
ofsp.write((const char*)buff, JRNL_DBLK_SIZE);
assert(!ofsp.fail());
// TODO: Connect the following with logger:
- std::cout << "INFO: * Wrote filler record at offs=0x" << rec_offset << std::endl;
- rec_offset = ofsp.tellp();
+ std::cout << "INFO: * Wrote filler record at offs=0x" << file_pos << std::endl;
+ file_pos = ofsp.tellp();
}
ofsp.close();
::free(buff);
Modified: store/trunk/cpp/lib/jrnl/nlfh.cpp
===================================================================
--- store/trunk/cpp/lib/jrnl/nlfh.cpp 2007-12-19 14:32:22 UTC (rev 1522)
+++ store/trunk/cpp/lib/jrnl/nlfh.cpp 2007-12-19 20:36:48 UTC (rev 1523)
@@ -108,15 +108,19 @@
if (::stat(_fname.c_str(), &s))
{
#endif
- if (ro)
+ if (ro) // Recovery initialization: set counters only
{
if (!ro->_empty)
{
+ // For first file only, set read counters to ahead of file header
if (ro->_ffid == _fid)
{
_rd_subm_cnt_dblks = JRNL_SBLK_SIZE;
_rd_cmpl_cnt_dblks = JRNL_SBLK_SIZE;
}
+
+ // For last file only, set write counters to end of last record (the
+ // continuation point); for all others, set to eof.
if (ro->_lfid == _fid)
{
_wr_subm_cnt_dblks = ro->_eo/JRNL_DBLK_SIZE;
@@ -127,19 +131,20 @@
_wr_subm_cnt_dblks = _ffull_dblks;
_wr_cmpl_cnt_dblks = _ffull_dblks;
}
+ // Set the number of enqueued records for this file.
_rec_enqcnt = ro->_enq_cnt_list[_fid];
}
}
- else
+ else // Normal initialization: create empty journal files
{
const size_t sblksize = JRNL_DBLK_SIZE * JRNL_SBLK_SIZE;
const size_t writesize = sblksize * JRNL_WMGR_PAGE_SIZE;
// NOTE: The journal file size is always one sblock bigger than the specified journal
- // file size, which is the data content size. The extra
- // block is for the journal file header which precedes all data on each file and is
- // exactly one softblock in size.
+ // file size, which is the data content size. The extra block is for the journal file
+ // header which precedes all data on each file and is exactly one sblock in size.
u_int32_t nsblks = jfsize_sblks + 1;
+
void* nullbuf = NULL;
if (::posix_memalign(&nullbuf, sblksize, writesize))
{
Modified: store/trunk/cpp/lib/jrnl/rmgr.cpp
===================================================================
--- store/trunk/cpp/lib/jrnl/rmgr.cpp 2007-12-19 14:32:22 UTC (rev 1522)
+++ store/trunk/cpp/lib/jrnl/rmgr.cpp 2007-12-19 20:36:48 UTC (rev 1523)
@@ -66,7 +66,15 @@
_cb = rd_cb;
initialize();
if (fro)
- _pg_offset_dblks = (fro / JRNL_DBLK_SIZE) - JRNL_SBLK_SIZE;
+ {
+ u_int32_t fro_dblks = (fro / JRNL_DBLK_SIZE) - JRNL_SBLK_SIZE;
+ _pg_cntr = fro_dblks / (JRNL_RMGR_PAGE_SIZE * JRNL_SBLK_SIZE);
+ u_int32_t tot_pg_offs_dblks = _pg_cntr * JRNL_RMGR_PAGE_SIZE * JRNL_SBLK_SIZE;
+ _pg_index = _pg_cntr % JRNL_RMGR_PAGES;
+ _pg_offset_dblks = fro_dblks - tot_pg_offs_dblks;
+ _rrfc.add_subm_cnt_dblks(tot_pg_offs_dblks);
+ _rrfc.add_cmpl_cnt_dblks(tot_pg_offs_dblks);
+ }
}
const iores
@@ -444,9 +452,16 @@
void
rmgr::recover_complete(size_t fro)
{
- _pg_index = 0;
- _pg_cntr = 0;
- _pg_offset_dblks = (fro / JRNL_DBLK_SIZE) - JRNL_SBLK_SIZE;
+ if (fro)
+ {
+ u_int32_t fro_dblks = (fro / JRNL_DBLK_SIZE) - JRNL_SBLK_SIZE;
+ _pg_cntr = fro_dblks / (JRNL_RMGR_PAGE_SIZE * JRNL_SBLK_SIZE);
+ u_int32_t tot_pg_offs_dblks = _pg_cntr * JRNL_RMGR_PAGE_SIZE * JRNL_SBLK_SIZE;
+ _pg_index = _pg_cntr % JRNL_RMGR_PAGES;
+ _pg_offset_dblks = fro_dblks - tot_pg_offs_dblks;
+ _rrfc.add_subm_cnt_dblks(tot_pg_offs_dblks);
+ _rrfc.add_cmpl_cnt_dblks(tot_pg_offs_dblks);
+ }
}
void
Modified: store/trunk/cpp/lib/jrnl/wrfc.cpp
===================================================================
--- store/trunk/cpp/lib/jrnl/wrfc.cpp 2007-12-19 14:32:22 UTC (rev 1522)
+++ store/trunk/cpp/lib/jrnl/wrfc.cpp 2007-12-19 20:36:48 UTC (rev 1523)
@@ -74,7 +74,7 @@
#ifdef DRHM_TESTVALS
_rid = u_int64_t(0xffeeddcc) << 32;
#else
- _rid = 0;
+ _rid = 0ULL;
#endif
_reset_ok = false;
}
More information about the rhmessaging-commits
mailing list