[rhmessaging-commits] rhmessaging commits: r2342 - in store/branches/mrg-1.0/cpp: tests/jrnl/jtt and 1 other directory.
rhmessaging-commits at lists.jboss.org
rhmessaging-commits at lists.jboss.org
Thu Aug 21 16:24:57 EDT 2008
Author: kpvdr
Date: 2008-08-21 16:24:57 -0400 (Thu, 21 Aug 2008)
New Revision: 2342
Modified:
store/branches/mrg-1.0/cpp/lib/jrnl/deq_rec.cpp
store/branches/mrg-1.0/cpp/lib/jrnl/enq_rec.cpp
store/branches/mrg-1.0/cpp/lib/jrnl/jcntl.cpp
store/branches/mrg-1.0/cpp/lib/jrnl/jcntl.hpp
store/branches/mrg-1.0/cpp/lib/jrnl/rmgr.cpp
store/branches/mrg-1.0/cpp/lib/jrnl/rrfc.cpp
store/branches/mrg-1.0/cpp/lib/jrnl/rrfc.hpp
store/branches/mrg-1.0/cpp/lib/jrnl/txn_rec.cpp
store/branches/mrg-1.0/cpp/tests/jrnl/jtt/jfile_chk.py
Log:
Fix for BZ459627 "Journal recover fails if last record of last file coincides with eof". Also removed the non-TPL transaction abort code, and corrected jcntl logic shortcomings for handling these without explicit aborts.
Modified: store/branches/mrg-1.0/cpp/lib/jrnl/deq_rec.cpp
===================================================================
--- store/branches/mrg-1.0/cpp/lib/jrnl/deq_rec.cpp 2008-08-21 20:24:28 UTC (rev 2341)
+++ store/branches/mrg-1.0/cpp/lib/jrnl/deq_rec.cpp 2008-08-21 20:24:57 UTC (rev 2342)
@@ -351,6 +351,8 @@
if (size_read < _deq_hdr._xidsize - offs)
{
assert(ifsp->eof());
+ // As we may have read past eof, turn off fail bit
+ ifsp->clear(ifsp->rdstate()&(~std::ifstream::failbit));
return false;
}
}
@@ -365,6 +367,8 @@
if (size_read < sizeof(rec_tail) - offs)
{
assert(ifsp->eof());
+ // As we may have read past eof, turn off fail bit
+ ifsp->clear(ifsp->rdstate()&(~std::ifstream::failbit));
return false;
}
}
Modified: store/branches/mrg-1.0/cpp/lib/jrnl/enq_rec.cpp
===================================================================
--- store/branches/mrg-1.0/cpp/lib/jrnl/enq_rec.cpp 2008-08-21 20:24:28 UTC (rev 2341)
+++ store/branches/mrg-1.0/cpp/lib/jrnl/enq_rec.cpp 2008-08-21 20:24:57 UTC (rev 2342)
@@ -469,6 +469,8 @@
if (size_read < _enq_hdr._xidsize - offs)
{
assert(ifsp->eof());
+ // As we may have read past eof, turn off fail bit
+ ifsp->clear(ifsp->rdstate()&(~std::ifstream::failbit));
return false;
}
}
@@ -484,6 +486,8 @@
if (size_read < _enq_hdr._dsize - offs)
{
assert(ifsp->eof());
+ // As we may have read past eof, turn off fail bit
+ ifsp->clear(ifsp->rdstate()&(~std::ifstream::failbit));
return false;
}
}
@@ -501,6 +505,8 @@
if (size_read < sizeof(rec_tail) - offs)
{
assert(ifsp->eof());
+ // As we may have read past eof, turn off fail bit
+ ifsp->clear(ifsp->rdstate()&(~std::ifstream::failbit));
return false;
}
}
Modified: store/branches/mrg-1.0/cpp/lib/jrnl/jcntl.cpp
===================================================================
--- store/branches/mrg-1.0/cpp/lib/jrnl/jcntl.cpp 2008-08-21 20:24:28 UTC (rev 2341)
+++ store/branches/mrg-1.0/cpp/lib/jrnl/jcntl.cpp 2008-08-21 20:24:57 UTC (rev 2342)
@@ -209,7 +209,7 @@
// (lfh) counters and pointers for both read and write, since write activity
// constrains read activity (i.e. one can't read what has not yet been written).
_wrfc.initialize(_num_jfiles, _jfsize_sblks, (nlfh**)_datafh, &_rcvdat);
- _rrfc.initialize(_num_jfiles, (nlfh**)_datafh, _rcvdat._ffid);
+ _rrfc.initialize(_num_jfiles, (nlfh**)_datafh, &_rcvdat);
_rmgr.initialize(rd_cb);
_wmgr.initialize(wr_cb, wcache_pgsize_sblks, wcache_num_pages, JRNL_WMGR_MAXDTOKPP,
JRNL_WMGR_MAXWAITUS, _rcvdat._eo);
@@ -226,7 +226,7 @@
for (u_int16_t i=0; i<_num_jfiles; i++)
_datafh[i]->reset(&_rcvdat);
_wrfc.initialize(_num_jfiles, _jfsize_sblks, (nlfh**)_datafh, &_rcvdat);
- _rrfc.initialize(_num_jfiles, (nlfh**)_datafh, _rcvdat._ffid);
+ _rrfc.initialize(_num_jfiles, (nlfh**)_datafh, &_rcvdat);
_rmgr.recover_complete();
_readonly_flag = false;
}
@@ -627,12 +627,7 @@
std::ifstream ifs;
bool lowi = rd._owi; // local copy of owi to be used during analysis
while (rcvr_get_next_record(fid, &ifs, lowi, rd)) ;
-
- // Check for journal full condition
- u_int16_t next_wr_fid = (rd._lfid + 1) % _num_jfiles;
- if (rd._ffid == next_wr_fid && rd._enq_cnt_list[next_wr_fid])
- rd._full = true;
-
+
if (!rd._empty && prep_txn_list_ptr)
{
std::vector<std::string> xid_list;
@@ -651,11 +646,14 @@
else if (_emap.is_enqueued(i->_drid, true)) // deq op - unlock enq record
_emap.unlock(i->_drid);
}
- // Write abort record to disk
- rcvr_write_abort(rd, *itr);
}
}
}
+
+ // Check for journal full condition
+ u_int16_t next_wr_fid = (rd._lfid + 1) % _num_jfiles;
+ if (rd._ffid == next_wr_fid && rd._enq_cnt_list[next_wr_fid])
+ rd._full = true;
}
}
@@ -732,7 +730,7 @@
{
try
{
- u_int16_t enq_fid = _emap.get_remove_fid(dr.deq_rid());
+ u_int16_t enq_fid = _emap.get_remove_fid(dr.deq_rid(), true);
rd._enq_cnt_list[enq_fid]--;
}
catch(const jexception& e)
@@ -816,6 +814,8 @@
{
u_int32_t rec_dblks = jrec::size_dblks(sizeof(rec_hdr));
ifsp->ignore(rec_dblks * JRNL_DBLK_SIZE - sizeof(rec_hdr));
+ if (!jfile_cycle(fid, ifsp, lowi, rd, false))
+ return false;
}
break;
case 0:
@@ -868,6 +868,7 @@
if (ifsp->eof() || !ifsp->good())
{
rd._eo = ifsp->tellg(); // remember file offset before closing
+ assert(rd._eo != 0xffffffffffffffff); // Check for error code -1
ifsp->close();
if (++fid >= _num_jfiles)
{
@@ -979,107 +980,5 @@
}
}
-// TODO - FIXME - Unify the recover and normal aio write methods.
-// Normally, writes are not performed during recover mode (journal is in read-only
-// mode) so initialization of the aio write controllers is deferred until recover
-// is complete. Currenlty because journal is still in recover mode when
-// rcvr_write_abort() is called, normal writes are not possible, so std::ofstream
-// writes are used instead. Lots of logic duplication!
-void
-jcntl::rcvr_write_abort(rcvdat& rd, std::string xid)
-{
- const u_int32_t sblk_size = JRNL_DBLK_SIZE * JRNL_SBLK_SIZE;
- const u_int32_t amagic = RHM_JDAT_TXA_MAGIC;
- const u_int32_t xmagic = RHM_JDAT_EMPTY_MAGIC;
-
- // Check last record ends on sblk boundary
- assert(rd._eo % sblk_size == 0);
-
- // Find fid and posn to write
- u_int16_t fid = rd._lfid;
- std::streampos file_pos = rd._eo;
- if (rd._eo/sblk_size >= _jfsize_sblks) // file full, use next file
- {
- if (++fid >= _num_jfiles)
- {
- fid = 0;
- rd._owi = !rd._owi;
- }
- file_pos = 0;
- }
-
- // Prepare a buffer of at least 1 sblock
- u_int32_t abort_dblks = txn_rec::size_dblks(sizeof(txn_hdr) + xid.size());
- std::size_t buffsize = abort_dblks < JRNL_SBLK_SIZE ? JRNL_SBLK_SIZE : abort_dblks;
- void* buff = std::malloc(buffsize * JRNL_DBLK_SIZE);
- assert(buff != 0);
-
- // Initialize file stream
- std::ostringstream fn;
- fn << _jdir.dirname() << "/" << _base_filename << ".";
- fn << std::hex << std::setfill('0') << std::setw(4) << fid << "." << JRNL_DATA_EXTENSION;
- std::ofstream ofs(fn.str().c_str(),
- std::ios_base::in | std::ios_base::out | std::ios_base::binary);
- if (!ofs.good())
- throw jexception(jerrno::JERR__FILEIO, fn.str(), "jcntl", "rcvr_write_abort");
- if (file_pos)
- ofs.seekp(file_pos);
- else
- {
- // New file, write new file header
- file_hdr fh(RHM_JDAT_FILE_MAGIC, RHM_JDAT_VERSION, ++rd._h_rid, fid, sblk_size, rd._owi, true);
- ofs.write((const char*)&fh, sizeof(file_hdr));
- assert(!ofs.fail());
- // fill remainder of sblk with fill char
- std::memset((char*)buff, RHM_CLEAN_CHAR, sblk_size - sizeof(file_hdr));
- ofs.write((const char*)buff, sblk_size - sizeof(file_hdr));
- assert(!ofs.fail());
- // log write action
- std::ostringstream oss;
- oss << "Recover phase write: File header in fid=" << fid << " at offs=0x0 for txn abort record";
- this->log(LOG_NOTICE, oss.str());
- file_pos = ofs.tellp();
- }
-
- // Write abort record
- txn_rec ar(amagic, ++rd._h_rid, xid.data(), xid.size(), rd._owi);
- ar.encode(buff, 0, abort_dblks);
- ofs.write((const char*)buff, JRNL_DBLK_SIZE);
- assert(!ofs.fail());
- // log write action
- std::ostringstream oss;
- oss << std::hex << std::setfill('0') << "Recover phase write: Aborted unprepared transaction xid=\"";
- for (std::size_t i=0; i<xid.size(); i++)
- {
- int c = xid.at(i);
- if (std::isprint(c))
- oss << (char)c;
- else
- oss << "\\x" << std::setw(2) << c;
- }
- oss << "\" at offs=0x" << file_pos;
- this->log(LOG_NOTICE, oss.str());
- file_pos = ofs.tellp();
-
- // Prepare filler record
- std::memcpy(buff, (void*)&xmagic, sizeof(xmagic));
- std::memset((char*)buff + sizeof(xmagic), RHM_CLEAN_CHAR, JRNL_DBLK_SIZE - sizeof(xmagic));
- // Write filler as many times as needed to get to next sblk boundary
- while (file_pos % sblk_size)
- {
- ofs.write((const char*)buff, JRNL_DBLK_SIZE);
- assert(!ofs.fail());
- std::ostringstream oss;
- oss << "Recover phase write: Wrote filler record at offs=0x" << std::hex << file_pos << std::dec;
- this->log(LOG_NOTICE, oss.str());
- file_pos = ofs.tellp();
- }
- rd._eo = file_pos;
-
- // Clean up
- ofs.close();
- std::free(buff);
-}
-
} // namespace journal
} // namespace rhm
Modified: store/branches/mrg-1.0/cpp/lib/jrnl/jcntl.hpp
===================================================================
--- store/branches/mrg-1.0/cpp/lib/jrnl/jcntl.hpp 2008-08-21 20:24:28 UTC (rev 2341)
+++ store/branches/mrg-1.0/cpp/lib/jrnl/jcntl.hpp 2008-08-21 20:24:57 UTC (rev 2342)
@@ -679,8 +679,6 @@
std::streampos& read_pos);
void check_journal_alignment(const u_int16_t fid, std::streampos& rec_offset);
-
- void rcvr_write_abort(rcvdat& rd, std::string xid);
};
} // namespace journal
Modified: store/branches/mrg-1.0/cpp/lib/jrnl/rmgr.cpp
===================================================================
--- store/branches/mrg-1.0/cpp/lib/jrnl/rmgr.cpp 2008-08-21 20:24:28 UTC (rev 2341)
+++ store/branches/mrg-1.0/cpp/lib/jrnl/rmgr.cpp 2008-08-21 20:24:57 UTC (rev 2342)
@@ -374,7 +374,7 @@
if (_jc->unflushed_dblks() > 0)
_jc->flush();
else
- return RHM_IORES_EMPTY;
+ return RHM_IORES_EMPTY;
}
// Check write state of this token is ENQ - required for read
Modified: store/branches/mrg-1.0/cpp/lib/jrnl/rrfc.cpp
===================================================================
--- store/branches/mrg-1.0/cpp/lib/jrnl/rrfc.cpp 2008-08-21 20:24:28 UTC (rev 2341)
+++ store/branches/mrg-1.0/cpp/lib/jrnl/rrfc.cpp 2008-08-21 20:24:57 UTC (rev 2342)
@@ -50,15 +50,27 @@
rrfc::~rrfc() {}
void
-rrfc::initialize(const u_int16_t nfiles, nlfh** fh_arr, u_int32_t fh_index)
+rrfc::initialize(const u_int16_t nfiles, nlfh** fh_arr, rcvdat* rdp)
{
_nfiles = nfiles;
_fh_arr = fh_arr;
- reset(fh_index);
+ if (rdp)
+ {
+ // Find first file with enqueued records
+ u_int16_t index = rdp->_ffid;
+ while (index != rdp->_lfid && rdp->_enq_cnt_list[index] == 0)
+ {
+ if (++index >= _nfiles)
+ index = 0;
+ }
+ reset(index);
+ }
+ else
+ reset(0);
}
void
-rrfc::reset(u_int32_t fh_index)
+rrfc::reset(u_int16_t fh_index)
{
_fh_index = fh_index;
_curr_fh = _fh_arr[_fh_index];
Modified: store/branches/mrg-1.0/cpp/lib/jrnl/rrfc.hpp
===================================================================
--- store/branches/mrg-1.0/cpp/lib/jrnl/rrfc.hpp 2008-08-21 20:24:28 UTC (rev 2341)
+++ store/branches/mrg-1.0/cpp/lib/jrnl/rrfc.hpp 2008-08-21 20:24:57 UTC (rev 2342)
@@ -70,11 +70,12 @@
* \param nfiles Number of files in the rotating file group.
* \param fh_arr Pointer to an array of file handles (nlogging_fh or subclasses),
* each of which correspond to one of the physical files.
- * \param fh_index Initial index of journal file. Default = 0.
+ * \param rdp Struct carrying restore information. Optional for non-restore use, defaults to
+ * 0 (NULL).
*/
- virtual void initialize(const u_int16_t nfiles, nlfh** fh_arr, u_int32_t fh_index = 0);
+ virtual void initialize(const u_int16_t nfiles, nlfh** fh_arr, rcvdat* rdp = 0);
- void reset(u_int32_t fh_index = 0);
+ void reset(u_int16_t fh_index = 0);
/**
* \brief Rotate active file handle to next file in rotating file group.
Modified: store/branches/mrg-1.0/cpp/lib/jrnl/txn_rec.cpp
===================================================================
--- store/branches/mrg-1.0/cpp/lib/jrnl/txn_rec.cpp 2008-08-21 20:24:28 UTC (rev 2341)
+++ store/branches/mrg-1.0/cpp/lib/jrnl/txn_rec.cpp 2008-08-21 20:24:57 UTC (rev 2342)
@@ -339,6 +339,8 @@
if (size_read < _txn_hdr._xidsize - offs)
{
assert(ifsp->eof());
+ // As we may have read past eof, turn off fail bit
+ ifsp->clear(ifsp->rdstate()&(~std::ifstream::failbit));
return false;
}
}
@@ -352,6 +354,8 @@
if (size_read < sizeof(rec_tail) - offs)
{
assert(ifsp->eof());
+ // As we may have read past eof, turn off fail bit
+ ifsp->clear(ifsp->rdstate()&(~std::ifstream::failbit));
return false;
}
}
Modified: store/branches/mrg-1.0/cpp/tests/jrnl/jtt/jfile_chk.py
===================================================================
--- store/branches/mrg-1.0/cpp/tests/jrnl/jtt/jfile_chk.py 2008-08-21 20:24:28 UTC (rev 2341)
+++ store/branches/mrg-1.0/cpp/tests/jrnl/jtt/jfile_chk.py 2008-08-21 20:24:57 UTC (rev 2342)
@@ -104,19 +104,20 @@
raise Exception('Inconsistent data size: dsize=%d, data(%d)=\"%s\"' % (dsize, len(data), datastr))
return 'data(%d)=\"%s\" ' % (dsize, datastr)
-def hex_split_str(s):
- hex_str = ''
- if len(s) > 10:
- for i in range(0, 5):
- hex_str += '%02x ' % ord(s[i])
- hex_str += '... '
- for i in range(len(s)-5, len(s)):
- hex_str += '%02x ' % ord(s[i])
- else:
- for i in range(0, len(s)):
- hex_str += '%02x ' % ord(s[i])
- return hex_str
+def hex_split_str(s, split_size = 50):
+ if len(s) <= split_size:
+ return hex_str(s, 0, len(s))
+ return hex_str(s, 0, 10) + ' ... ' + hex_str(s, len(s)-10, len(s))
+def hex_str(s, b, e):
+ o = ''
+ for i in range(b, e):
+ if isprintable(s[i]):
+ o += s[i]
+ else:
+ o += '\\%02x' % ord(s[i])
+ return o
+
def split_str(s):
if len(s) > 25:
return s[:12] + ' ... ' + s[-10:]
@@ -540,7 +541,7 @@
if hdr.xid == None:
if hdr.deq_rid in self.emap:
if self.emap[hdr.deq_rid][2]:
- warn = ' (WARNING: dequeue rid %d dequeues locked enqueue records %d)' % (hdr.rid, hdr.deq_rid)
+ warn = ' (WARNING: dequeue rid 0x%x dequeues locked enqueue record 0x%x)' % (hdr.rid, hdr.deq_rid)
del self.emap[hdr.deq_rid]
else:
warn = ' (WARNING: rid being dequeued %d not found in enqueued records)' % hdr.deq_rid
More information about the rhmessaging-commits
mailing list