[rhmessaging-commits] rhmessaging commits: r1442 - store/trunk/cpp/lib/jrnl.
rhmessaging-commits at lists.jboss.org
rhmessaging-commits at lists.jboss.org
Fri Dec 7 12:09:35 EST 2007
Author: kpvdr
Date: 2007-12-07 12:09:35 -0500 (Fri, 07 Dec 2007)
New Revision: 1442
Modified:
store/trunk/cpp/lib/jrnl/data_tok.cpp
store/trunk/cpp/lib/jrnl/data_tok.hpp
store/trunk/cpp/lib/jrnl/deq_rec.cpp
store/trunk/cpp/lib/jrnl/enq_rec.cpp
store/trunk/cpp/lib/jrnl/jcntl.cpp
store/trunk/cpp/lib/jrnl/jcntl.hpp
store/trunk/cpp/lib/jrnl/jrec.cpp
store/trunk/cpp/lib/jrnl/txn_rec.cpp
store/trunk/cpp/lib/jrnl/wmgr.cpp
Log:
Fix for BZ413021 - Journal recovery fails... This includes new decode functions that examine the record tails to detect incomplete and corrupt records, and a new check for sblk alignment of the last record. The recovery process now writes filler records to the journal to return it to a recoverable and usable state.
Modified: store/trunk/cpp/lib/jrnl/data_tok.cpp
===================================================================
--- store/trunk/cpp/lib/jrnl/data_tok.cpp 2007-12-07 16:34:34 UTC (rev 1441)
+++ store/trunk/cpp/lib/jrnl/data_tok.cpp 2007-12-07 17:09:35 UTC (rev 1442)
@@ -135,8 +135,8 @@
return "SKIP_PART";
case READ:
return "READ";
+ // Not using default: forces compiler to ensure all cases are covered.
}
- // Not using default: forces compiler to ensure all cases are covered.
return "<rstate unknown>";
}
Modified: store/trunk/cpp/lib/jrnl/data_tok.hpp
===================================================================
--- store/trunk/cpp/lib/jrnl/data_tok.hpp 2007-12-07 16:34:34 UTC (rev 1441)
+++ store/trunk/cpp/lib/jrnl/data_tok.hpp 2007-12-07 17:09:35 UTC (rev 1442)
@@ -33,8 +33,6 @@
#ifndef rhm_journal_data_tok_hpp
#define rhm_journal_data_tok_hpp
-#include <qpid/RefCounted.h>
-
namespace rhm
{
namespace journal
@@ -54,6 +52,7 @@
#include <boost/intrusive_ptr.hpp>
#include <pthread.h>
#include <qpid/broker/PersistableMessage.h>
+#include <qpid/RefCounted.h>
#include <sys/types.h>
#include <jrnl/jexception.hpp>
@@ -68,7 +67,7 @@
* \brief Data block token (data_tok) used to track wstate of a data block through asynchronous
* I/O process
*/
-class data_tok : public qpid::RefCounted
+ class data_tok : public qpid::RefCounted
{
public:
// TODO: Fix this, separate write state from operation
Modified: store/trunk/cpp/lib/jrnl/deq_rec.cpp
===================================================================
--- store/trunk/cpp/lib/jrnl/deq_rec.cpp 2007-12-07 16:34:34 UTC (rev 1441)
+++ store/trunk/cpp/lib/jrnl/deq_rec.cpp 2007-12-07 17:09:35 UTC (rev 1442)
@@ -315,23 +315,12 @@
}
return size_dblks(rd_cnt);
}
-
+
const bool
deq_rec::rcv_decode(hdr h, std::ifstream* ifsp, size_t& rec_offs) throw (jexception)
{
- if (rec_offs) // Contunue decoding xid from previous decode call
+ if (rec_offs == 0)
{
- ifsp->read((char*)_buff + rec_offs, _deq_hdr._xidsize - rec_offs);
- size_t size_read = ifsp->gcount();
- if (size_read < _deq_hdr._xidsize - rec_offs)
- {
- assert(ifsp->eof());
- rec_offs += size_read;
- return false;
- }
- }
- else // Start at beginning of record
- {
_deq_hdr._hdr.copy(h);
ifsp->read((char*)&_deq_hdr._deq_rid, sizeof(u_int64_t));
#if defined(JRNL_BIG_ENDIAN) && defined(JRNL_32_BIT)
@@ -341,22 +330,44 @@
#if defined(JRNL_LITTLE_ENDIAN) && defined(JRNL_32_BIT)
ifsp->ignore(sizeof(u_int32_t)); // _filler0
#endif
+ rec_offs = sizeof(_deq_hdr);
+ // Read header, allocate (if req'd) for xid
if (_deq_hdr._xidsize)
{
_buff = ::malloc(_deq_hdr._xidsize);
- MALLOC_CHK(_buff, "_buff", "deq_rec", "rcv_decode");
- // Decode xid
- ifsp->read((char*)_buff, _deq_hdr._xidsize);
- size_t size_read = ifsp->gcount();
- if (size_read < _deq_hdr._xidsize)
- {
- assert(ifsp->eof());
- rec_offs = size_read;
- return false;
- }
+ MALLOC_CHK(_buff, "_buff", "enq_rec", "rcv_decode");
}
}
- ifsp->ignore(rec_size_dblks() * JRNL_DBLK_SIZE - sizeof(_deq_hdr) - _deq_hdr._xidsize);
+ if (rec_offs < sizeof(_deq_hdr) + _deq_hdr._xidsize)
+ {
+ // Read xid (or continue reading xid)
+ size_t offs = rec_offs - sizeof(_deq_hdr);
+ ifsp->read((char*)_buff + offs, _deq_hdr._xidsize - offs);
+ size_t size_read = ifsp->gcount();
+ rec_offs += size_read;
+ if (size_read < _deq_hdr._xidsize - offs)
+ {
+ assert(ifsp->eof());
+ return false;
+ }
+ }
+ if (rec_offs < sizeof(_deq_hdr) +
+ (_deq_hdr._xidsize ? _deq_hdr._xidsize + sizeof(rec_tail) : 0))
+ {
+ // Read tail (or continue reading tail)
+ size_t offs = rec_offs - sizeof(_deq_hdr) - _deq_hdr._xidsize;
+ ifsp->read((char*)&_deq_tail + offs, sizeof(rec_tail) - offs);
+ size_t size_read = ifsp->gcount();
+ rec_offs += size_read;
+ if (size_read < sizeof(rec_tail) - offs)
+ {
+ assert(ifsp->eof());
+ return false;
+ }
+ }
+ ifsp->ignore(rec_size_dblks() * JRNL_DBLK_SIZE - rec_size());
+ if (_deq_hdr._xidsize)
+ chk_tail(); // Throws if tail invalid or record incomplete
return true;
}
Modified: store/trunk/cpp/lib/jrnl/enq_rec.cpp
===================================================================
--- store/trunk/cpp/lib/jrnl/enq_rec.cpp 2007-12-07 16:34:34 UTC (rev 1441)
+++ store/trunk/cpp/lib/jrnl/enq_rec.cpp 2007-12-07 17:09:35 UTC (rev 1442)
@@ -434,19 +434,9 @@
const bool
enq_rec::rcv_decode(hdr h, std::ifstream* ifsp, size_t& rec_offs) throw (jexception)
{
- if (rec_offs) // Contunue decoding xid from previous decode call
+ if (rec_offs == 0)
{
- ifsp->read((char*)_buff + rec_offs, _enq_hdr._xidsize - rec_offs);
- size_t size_read = ifsp->gcount();
- if (size_read < _enq_hdr._xidsize - rec_offs)
- {
- assert(ifsp->eof());
- rec_offs += size_read;
- return false;
- }
- }
- else // Start at beginning of record
- {
+ // Read header, allocate (if req'd) for xid
_enq_hdr._hdr.copy(h);
#if defined(JRNL_BIG_ENDIAN) && defined(JRNL_32_BIT)
ifsp->ignore(sizeof(u_int32_t)); // _filler0
@@ -462,22 +452,60 @@
#if defined(JRNL_LITTLE_ENDIAN) && defined(JRNL_32_BIT)
ifsp->ignore(sizeof(u_int32_t)); // _filler1
#endif
+ rec_offs = sizeof(_enq_hdr);
if (_enq_hdr._xidsize)
{
_buff = ::malloc(_enq_hdr._xidsize);
MALLOC_CHK(_buff, "_buff", "enq_rec", "rcv_decode");
- // Decode xid
- ifsp->read((char*)_buff, _enq_hdr._xidsize);
+ }
+ }
+ if (rec_offs < sizeof(_enq_hdr) + _enq_hdr._xidsize)
+ {
+ // Read xid (or continue reading xid)
+ size_t offs = rec_offs - sizeof(_enq_hdr);
+ ifsp->read((char*)_buff + offs, _enq_hdr._xidsize - offs);
+ size_t size_read = ifsp->gcount();
+ rec_offs += size_read;
+ if (size_read < _enq_hdr._xidsize - offs)
+ {
+ assert(ifsp->eof());
+ return false;
+ }
+ }
+ if (!_enq_hdr.is_external())
+ {
+ if (rec_offs < sizeof(hdr) + _enq_hdr._xidsize + _enq_hdr._dsize)
+ {
+ // Ignore data (or continue ignoring data)
+ size_t offs = rec_offs - sizeof(_enq_hdr) - _enq_hdr._xidsize;
+ ifsp->ignore(_enq_hdr._dsize - offs);
size_t size_read = ifsp->gcount();
- if (size_read < _enq_hdr._xidsize)
+ rec_offs += size_read;
+ if (size_read < _enq_hdr._dsize - offs)
{
assert(ifsp->eof());
- rec_offs = size_read;
return false;
}
}
}
- ifsp->ignore(rec_size_dblks() * JRNL_DBLK_SIZE - sizeof(_enq_hdr) - _enq_hdr._xidsize);
+ if (rec_offs < sizeof(_enq_hdr) + _enq_hdr._xidsize +
+ (_enq_hdr.is_external() ? 0 : _enq_hdr._dsize) + sizeof(rec_tail))
+ {
+ // Read tail (or continue reading tail)
+ size_t offs = rec_offs - sizeof(_enq_hdr) - _enq_hdr._xidsize;
+ if (!_enq_hdr.is_external())
+ offs -= _enq_hdr._dsize;
+ ifsp->read((char*)&_enq_tail + offs, sizeof(rec_tail) - offs);
+ size_t size_read = ifsp->gcount();
+ rec_offs += size_read;
+ if (size_read < sizeof(rec_tail) - offs)
+ {
+ assert(ifsp->eof());
+ return false;
+ }
+ }
+ ifsp->ignore(rec_size_dblks() * JRNL_DBLK_SIZE - rec_size());
+ chk_tail(); // Throws if tail invalid or record incomplete
return true;
}
Modified: store/trunk/cpp/lib/jrnl/jcntl.cpp
===================================================================
--- store/trunk/cpp/lib/jrnl/jcntl.cpp 2007-12-07 16:34:34 UTC (rev 1441)
+++ store/trunk/cpp/lib/jrnl/jcntl.cpp 2007-12-07 17:09:35 UTC (rev 1442)
@@ -1,34 +1,34 @@
/**
- * \file jcntl.cpp
- *
- * Red Hat Messaging - Message Journal
- *
- * Messaging journal top-level control and interface class
- * rhm::journal::jcntl. See comments in file jcntl.hpp for details.
- *
- * \author Kim van der Riet
- *
- * Copyright 2007 Red Hat, Inc.
- *
- * This file is part of Red Hat Messaging.
- *
- * Red Hat Messaging is free software; you can redistribute it and/or
- * modify it under the terms of the GNU Lesser General Public
- * License as published by the Free Software Foundation; either
- * version 2.1 of the License, or (at your option) any later version.
- *
- * This library is distributed in the hope that it will be useful,
- * but WITHOUT ANY WARRANTY; without even the implied warranty of
- * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
- * Lesser General Public License for more details.
- *
- * You should have received a copy of the GNU Lesser General Public
- * License along with this library; if not, write to the Free Software
- * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301
- * USA
- *
- * The GNU Lesser General Public License is available in the file COPYING.
- */
+* \file jcntl.cpp
+*
+* Red Hat Messaging - Message Journal
+*
+* Messaging journal top-level control and interface class
+* rhm::journal::jcntl. See comments in file jcntl.hpp for details.
+*
+* \author Kim van der Riet
+*
+* Copyright 2007 Red Hat, Inc.
+*
+* This file is part of Red Hat Messaging.
+*
+* Red Hat Messaging is free software; you can redistribute it and/or
+* modify it under the terms of the GNU Lesser General Public
+* License as published by the Free Software Foundation; either
+* version 2.1 of the License, or (at your option) any later version.
+*
+* This library is distributed in the hope that it will be useful,
+* but WITHOUT ANY WARRANTY; without even the implied warranty of
+* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+* Lesser General Public License for more details.
+*
+* You should have received a copy of the GNU Lesser General Public
+* License along with this library; if not, write to the Free Software
+* Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301
+* USA
+*
+* The GNU Lesser General Public License is available in the file COPYING.
+*/
#include <jrnl/jcntl.hpp>
@@ -51,7 +51,7 @@
// Functions
jcntl::jcntl(const std::string& jid, const std::string& jdir, const std::string& base_filename,
- const u_int16_t num_jfiles, const u_int32_t jfsize_sblks):
+ const u_int16_t num_jfiles, const u_int32_t jfsize_sblks):
_jid(jid),
_jdir(jdir, base_filename),
_base_filename(base_filename),
@@ -90,7 +90,7 @@
void
jcntl::initialize(std::deque<data_tok*>* rdtoklp, const aio_cb rd_cb,
- std::deque<data_tok*>* wdtoklp, const aio_cb wr_cb) throw (jexception)
+ std::deque<data_tok*>* wdtoklp, const aio_cb wr_cb) throw (jexception)
{
// Prepare journal dir, journal files and file handles
_jdir.clear_dir();
@@ -132,21 +132,22 @@
void
jcntl::recover(std::deque<data_tok*>* rdtoklp, const aio_cb rd_cb, std::deque<data_tok*>* wdtoklp,
- const aio_cb wr_cb, const std::vector<std::string>& prep_txn_list, u_int64_t& highest_rid)
- throw (jexception)
+ const aio_cb wr_cb, const std::vector<std::string>& prep_txn_list, u_int64_t& highest_rid)
+ throw (jexception)
{
// Verify journal dir and journal files
_jdir.verify_dir();
_rcvdat.reset();
_emap.clear();
_tmap.clear();
+//std::cout << "Starting journal analysis..." << std::endl;
rcvr_janalyze(_rcvdat, prep_txn_list);
highest_rid = _rcvdat._h_rid;
if (_rcvdat._full)
throw jexception(jerrno::JERR_JCNTL_RECOVERJFULL, "jcntl", "recover_complete");
- // Debug info, but may be useful to print with a flag
- //_rcvdat.print(_jid);
+// Debug info, but may be useful to print with a flag
+//_rcvdat.print(_jid);
if (_datafh)
{
@@ -177,6 +178,7 @@
_readonly_flag = true;
_init_flag = true;
+//std::cout << "Journal analysis complete." << std::endl;
}
void
@@ -190,7 +192,7 @@
_rrfc.initialize(_num_jfiles, (nlfh**)_datafh, _rcvdat._ffid);
_rmgr.recover_complete(_rcvdat._fro);
_readonly_flag = false;
- //std::cout << "Journal revovery complete." << std::endl;
+//std::cout << "Journal revovery complete." << std::endl;
}
void
@@ -203,8 +205,8 @@
const iores
jcntl::enqueue_data_record(const void* const data_buff, const size_t tot_data_len,
- const size_t this_data_len, data_tok* dtokp, const bool transient)
- throw (jexception)
+ const size_t this_data_len, data_tok* dtokp, const bool transient)
+ throw (jexception)
{
iores res;
check_wstatus("enqueue_data_record");
@@ -212,7 +214,7 @@
try
{
res = _wmgr.enqueue(data_buff, tot_data_len, this_data_len, dtokp, NULL, 0, transient,
- false);
+ false);
}
catch(const std::exception& e) { pthread_mutex_unlock(&_mutex); throw; }
pthread_mutex_unlock(&_mutex);
@@ -221,7 +223,7 @@
const iores
jcntl::enqueue_extern_data_record(const size_t tot_data_len, data_tok* dtokp, const bool transient)
- throw (jexception)
+ throw (jexception)
{
iores res;
check_wstatus("enqueue_extern_data_record");
@@ -237,8 +239,8 @@
const iores
jcntl::enqueue_txn_data_record(const void* const data_buff, const size_t tot_data_len,
- const size_t this_data_len, data_tok* dtokp, const std::string& xid,
- const bool transient) throw (jexception)
+ const size_t this_data_len, data_tok* dtokp, const std::string& xid,
+ const bool transient) throw (jexception)
{
iores res;
check_wstatus("enqueue_tx_data_record");
@@ -246,7 +248,7 @@
try
{
res = _wmgr.enqueue(data_buff, tot_data_len, this_data_len, dtokp, xid.data(), xid.size(),
- transient, false);
+ transient, false);
}
catch(const std::exception& e) { pthread_mutex_unlock(&_mutex); throw; }
pthread_mutex_unlock(&_mutex);
@@ -255,7 +257,7 @@
const iores
jcntl::enqueue_extern_txn_data_record(const size_t tot_data_len, data_tok* dtokp,
- const std::string& xid, const bool transient) throw (jexception)
+ const std::string& xid, const bool transient) throw (jexception)
{
iores res;
check_wstatus("enqueue_extern_txn_data_record");
@@ -271,7 +273,7 @@
const iores
jcntl::get_data_record(const u_int64_t& rid, const size_t& dsize, const size_t& dsize_avail,
- const void** const data, bool auto_discard) throw (jexception)
+ const void** const data, bool auto_discard) throw (jexception)
{
check_rstatus("get_data_record");
return _rmgr.get(rid, dsize, dsize_avail, data, auto_discard);
@@ -286,7 +288,7 @@
const iores
jcntl::read_data_record(void** const datapp, size_t& dsize, void** const xidpp, size_t& xidsize,
- bool& transient, bool& external, data_tok* const dtokp) throw (jexception)
+ bool& transient, bool& external, data_tok* const dtokp) throw (jexception)
{
check_rstatus("read_data");
return _rmgr.read(datapp, dsize, xidpp, xidsize, transient, external, dtokp);
@@ -466,14 +468,14 @@
_num_jfiles = ji.num_jfiles();
_rcvdat._enq_cnt_list.resize(_num_jfiles);
std::cout << "WARNING: Recovery found " << _num_jfiles <<
- " files (different from --num-jfiles parameter value)." << std::endl;
+ " files (different from --num-jfiles parameter value)." << std::endl;
}
if (_jfsize_sblks != ji.jfsize_sblks())
{
_jfsize_sblks = ji.jfsize_sblks();
std::cout << "WARNING: Recovery found file size = " <<
- (_jfsize_sblks / JRNL_RMGR_PAGE_SIZE) <<
- " (different from --jfile-size-pgs parameter value)." << std::endl;
+ (_jfsize_sblks / JRNL_RMGR_PAGE_SIZE) <<
+ " (different from --jfile-size-pgs parameter value)." << std::endl;
}
try
@@ -499,16 +501,16 @@
if (rd._ffid == next_wr_fid && rd._enq_cnt_list[next_wr_fid])
rd._full = true;
- std::vector<std::string> xid_list;
- _tmap.xid_list(xid_list);
- for (std::vector<std::string>::iterator itr = xid_list.begin(); itr != xid_list.end();
- itr++)
- {
- std::vector<std::string>::const_iterator pitr = std::find(prep_txn_list.begin(),
- prep_txn_list.end(), *itr);
- if (pitr == prep_txn_list.end())
- _tmap.get_remove_tdata_list(*itr);
- }
+ std::vector<std::string> xid_list;
+ _tmap.xid_list(xid_list);
+ for (std::vector<std::string>::iterator itr = xid_list.begin(); itr != xid_list.end();
+ itr++)
+ {
+ std::vector<std::string>::const_iterator pitr = std::find(prep_txn_list.begin(),
+ prep_txn_list.end(), *itr);
+ if (pitr == prep_txn_list.end())
+ _tmap.get_remove_tdata_list(*itr);
+ }
}
}
@@ -516,7 +518,6 @@
jcntl::rcvr_get_next_record(u_int16_t& fid, std::ifstream* ifsp, rcvdat& rd) throw (jexception)
{
size_t cum_size_read = 0;
- bool done = false;
void* xidp = NULL;
hdr h;
if (!jfile_cycle(fid, ifsp, rd, true))
@@ -525,159 +526,159 @@
ifsp->read((char*)&h, sizeof(hdr));
switch(h._magic)
{
- case RHM_JDAT_ENQ_MAGIC:
- {
- if (!check_owi(fid, h, rd, read_pos))
- return false;
- enq_rec er;
- while (!done)
+ case RHM_JDAT_ENQ_MAGIC:
{
- done = er.rcv_decode(h, ifsp, cum_size_read);
- if (!jfile_cycle(fid, ifsp, rd, true))
+ enq_rec er;
+ if (!decode(er, fid, ifsp, cum_size_read, h, rd, read_pos))
return false;
+ if (!er.is_transient()) // Ignore transient msgs
+ {
+ rd._enq_cnt_list[fid]++;
+ if (er.xid_size())
+ {
+ er.get_xid(&xidp);
+ assert(xidp != NULL);
+ std::string xid((char*)xidp, er.xid_size());
+ _tmap.insert_txn_data(xid, txn_data(h._rid, 0, fid, true));
+ ::free(xidp);
+ }
+ else
+ _emap.insert_fid(h._rid, fid);
+ }
}
- if (!er.is_transient()) // Ignore transient msgs
+ break;
+ case RHM_JDAT_DEQ_MAGIC:
{
- rd._enq_cnt_list[fid]++;
- if (er.xid_size())
+ deq_rec dr;
+ if (!decode(dr, fid, ifsp, cum_size_read, h, rd, read_pos))
+ return false;
+ if (dr.xid_size())
{
- er.get_xid(&xidp);
+ // If the enqueue is part of a pending txn, it will not yet be in emap
+ try { _emap.lock(dr.deq_rid()); }
+ catch(const jexception& e)
+ {
+ if (e.err_code() != jerrno::JERR_MAP_NOTFOUND) throw;
+ }
+ dr.get_xid(&xidp);
assert(xidp != NULL);
- std::string xid((char*)xidp, er.xid_size());
- _tmap.insert_txn_data(xid, txn_data(h._rid, 0, fid, true));
+ std::string xid((char*)xidp, dr.xid_size());
+ _tmap.insert_txn_data(xid, txn_data(dr.rid(), dr.deq_rid(), fid, false));
::free(xidp);
}
else
- _emap.insert_fid(h._rid, fid);
+ {
+ try
+ {
+ u_int16_t enq_fid = _emap.get_remove_fid(dr.deq_rid());
+ rd._enq_cnt_list[enq_fid]--;
+ }
+ catch(const jexception& e)
+ {
+ if (e.err_code() != jerrno::JERR_MAP_NOTFOUND) throw;
+ }
+ }
}
- }
- break;
- case RHM_JDAT_DEQ_MAGIC:
- {
- if (!check_owi(fid, h, rd, read_pos))
- return false;
- deq_rec dr;
- while (!done)
+ break;
+ case RHM_JDAT_TXA_MAGIC:
{
- done = dr.rcv_decode(h, ifsp, cum_size_read);
- if (!jfile_cycle(fid, ifsp, rd, true))
+ txn_rec ar;
+ if (!decode(ar, fid, ifsp, cum_size_read, h, rd, read_pos))
return false;
- }
- if (dr.xid_size())
- {
- // If the enqueue is part of a pending txn, it will not yet be in emap
- try { _emap.lock(dr.deq_rid()); }
- catch(const jexception& e)
+ // Delete this txn from tmap, unlock any locked records in emap
+ ar.get_xid(&xidp);
+ assert(xidp != NULL);
+ std::string xid((char*)xidp, ar.xid_size());
+ txn_data_list tdl = _tmap.get_remove_tdata_list(xid);
+ for (tdl_itr itr = tdl.begin(); itr != tdl.end(); itr++)
{
- if (e.err_code() != jerrno::JERR_MAP_NOTFOUND) throw;
+ try
+ {
+ if (!itr->_enq_flag)
+ _emap.unlock(itr->_drid);
+ }
+ catch(const jexception& e)
+ {
+ if (e.err_code() != jerrno::JERR_MAP_NOTFOUND) throw;
+ }
+ if (itr->_enq_flag)
+ rd._enq_cnt_list[itr->_fid]--;
}
- dr.get_xid(&xidp);
- assert(xidp != NULL);
- std::string xid((char*)xidp, dr.xid_size());
- _tmap.insert_txn_data(xid, txn_data(dr.rid(), dr.deq_rid(), fid, false));
::free(xidp);
}
- else
+ break;
+ case RHM_JDAT_TXC_MAGIC:
{
- try
+ txn_rec cr;
+ if (!decode(cr, fid, ifsp, cum_size_read, h, rd, read_pos))
+ return false;
+ // Delete this txn from tmap, process records into emap
+ cr.get_xid(&xidp);
+ assert(xidp != NULL);
+ std::string xid((char*)xidp, cr.xid_size());
+ txn_data_list tdl = _tmap.get_remove_tdata_list(xid);
+ for (tdl_itr itr = tdl.begin(); itr != tdl.end(); itr++)
{
- u_int16_t enq_fid = _emap.get_remove_fid(dr.deq_rid());
- rd._enq_cnt_list[enq_fid]--;
+ if (itr->_enq_flag) // txn enqueue
+ _emap.insert_fid(itr->_rid, itr->_fid);
+ else // txn dequeue
+ {
+ u_int16_t fid = _emap.get_remove_fid(itr->_drid, true);
+ rd._enq_cnt_list[fid]--;
+ }
}
- catch(const jexception& e)
- {
- if (e.err_code() != jerrno::JERR_MAP_NOTFOUND) throw;
- }
+ ::free(xidp);
}
- }
- break;
- case RHM_JDAT_TXA_MAGIC:
- {
- if (!check_owi(fid, h, rd, read_pos))
- return false;
- txn_rec ar;
- while (!done)
+ break;
+ case RHM_JDAT_EMPTY_MAGIC:
{
- done = ar.rcv_decode(h, ifsp, cum_size_read);
- if (!jfile_cycle(fid, ifsp, rd, true))
- return false;
+ u_int32_t rec_dblks = jrec::size_dblks(sizeof(hdr));
+ ifsp->ignore(rec_dblks * JRNL_DBLK_SIZE - sizeof(hdr));
}
- // Delete this txn from tmap, unlock any locked records in emap
- ar.get_xid(&xidp);
- assert(xidp != NULL);
- std::string xid((char*)xidp, ar.xid_size());
- txn_data_list tdl = _tmap.get_remove_tdata_list(xid);
- for (tdl_itr itr = tdl.begin(); itr != tdl.end(); itr++)
+ break;
+ case 0:
+ rd._lfid = fid;
+ rd._eo = ifsp->tellg();
+ return false;
+ default:
+ // Is this the last file, if so, stop as this is the overwrite boundary.
+ if (fid == (rd._ffid ? rd._ffid - 1 : _num_jfiles - 1))
{
- try
- {
- if (!itr->_enq_flag)
- _emap.unlock(itr->_drid);
- }
- catch(const jexception& e)
- {
- if (e.err_code() != jerrno::JERR_MAP_NOTFOUND) throw;
- }
- if (itr->_enq_flag)
- rd._enq_cnt_list[itr->_fid]--;
- }
- ::free(xidp);
- }
- break;
- case RHM_JDAT_TXC_MAGIC:
- {
- if (!check_owi(fid, h, rd, read_pos))
+ rd._lfid = fid;
+ rd._eo = read_pos;
return false;
- txn_rec cr;
- while (!done)
- {
- done = cr.rcv_decode(h, ifsp, cum_size_read);
- if (!jfile_cycle(fid, ifsp, rd, true))
- return false;
}
- // Delete this txn from tmap, process records into emap
- cr.get_xid(&xidp);
- assert(xidp != NULL);
- std::string xid((char*)xidp, cr.xid_size());
- txn_data_list tdl = _tmap.get_remove_tdata_list(xid);
- for (tdl_itr itr = tdl.begin(); itr != tdl.end(); itr++)
- {
- if (itr->_enq_flag) // txn enqueue
- _emap.insert_fid(itr->_rid, itr->_fid);
- else // txn dequeue
- {
- u_int16_t fid = _emap.get_remove_fid(itr->_drid, true);
- rd._enq_cnt_list[fid]--;
- }
- }
- ::free(xidp);
- }
- break;
- case RHM_JDAT_EMPTY_MAGIC:
- {
- u_int32_t rec_dblks = jrec::size_dblks(sizeof(hdr));
- ifsp->ignore(rec_dblks * JRNL_DBLK_SIZE - sizeof(hdr));
- }
- break;
- case 0:
- rd._lfid = fid;
- rd._eo = ifsp->tellg();
+ std::stringstream ss;
+ ss << std::hex << std::setfill('0') << "Magic=0x" << std::setw(8) << h._magic;
+ ss << " fid=" << fid << " foffs=0x" << std::setw(8) << read_pos;
+ throw jexception(jerrno::JERR_JCNTL_UNKNOWNMAGIC, ss.str().c_str(), "jcntl",
+ "rcvr_get_next_record");
+ }
+ return true;
+}
+
+const bool
+jcntl::decode(jrec& rec, u_int16_t& fid, std::ifstream* ifsp, size_t& cum_size_read, hdr& h,
+ rcvdat& rd, std::streampos& rec_offset)
+{
+ if (!check_owi(fid, h, rd, rec_offset))
return false;
- default:
- // Is this the last file, if so, stop as this is the overwrite boundary.
- if (fid == (rd._ffid ? rd._ffid - 1 : _num_jfiles - 1))
+ bool done = false;
+ while (!done)
+ {
+ try { done = rec.rcv_decode(h, ifsp, cum_size_read); }
+ catch (const jexception& e)
{
+ if (e.err_code() != jerrno::JERR_JREC_BADRECTAIL ||
+ fid != (rd._ffid ? rd._ffid - 1 : _num_jfiles - 1)) throw;
+ check_journal_alignment(fid, rec_offset);
rd._lfid = fid;
- rd._eo = read_pos;
+ rd._eo = rec_offset;
return false;
}
- std::stringstream ss;
- ss << std::hex << std::setfill('0') << "Magic=0x" << std::setw(8) << h._magic;
- ss << " fid=" << fid << " foffs=0x" << std::setw(8) << read_pos;
- throw jexception(jerrno::JERR_JCNTL_UNKNOWNMAGIC, ss.str().c_str(), "jcntl",
- "rcvr_get_next_record");
+ if (!jfile_cycle(fid, ifsp, rd, false))
+ return false;
}
-
return true;
}
@@ -705,7 +706,7 @@
std::stringstream ss;
ss << _jdir.dirname() << "/" << _base_filename << ".";
ss << std::hex << std::setfill('0') << std::setw(4) << fid << "." << JRNL_DATA_EXTENSION;
- ifsp->open(ss.str().c_str());
+ ifsp->open(ss.str().c_str(), std::ios_base::in | std::ios_base::binary);
if (!ifsp->good())
throw jexception(jerrno::JERR__FILEIO, ss.str().c_str(), "jcntl", "jfile_cycle");
@@ -730,13 +731,15 @@
}
const bool
-jcntl::check_owi(u_int16_t fid, hdr& h, rcvdat& rd, std::streampos read_pos) throw (jexception)
+jcntl::check_owi(const u_int16_t fid, hdr& h, rcvdat& rd, std::streampos& read_pos)
+ throw (jexception)
{
if (rd._ffid ? h.get_owi() == rd._owi : h.get_owi() != rd._owi) // Overwrite indicator changed
{
u_int16_t expected_fid = rd._ffid ? rd._ffid - 1 : _num_jfiles - 1;
if (fid == expected_fid)
{
+ check_journal_alignment(fid, read_pos);
rd._lfid = fid;
rd._eo = read_pos;
return false;
@@ -747,43 +750,90 @@
ss << " foffs=0x" << std::setw(8) << read_pos;
ss << " expected_fid=0x" << std::setw(4) << expected_fid;
throw jexception(jerrno::JERR_JCNTL_OWIMISMATCH, ss.str().c_str(), "jcntl",
- "check_owi");
+ "check_owi");
}
if (rd._h_rid < h._rid)
rd._h_rid = h._rid;
return true;
}
+
void
+jcntl::check_journal_alignment(const u_int16_t fid, std::streampos& rec_offset)
+ throw (jexception)
+{
+ unsigned sblk_offs = rec_offset % (JRNL_DBLK_SIZE * JRNL_SBLK_SIZE);
+ if (sblk_offs)
+ {
+ // TODO: Connect the following with logger:
+ std::cout << std::hex << "INFO: Bad record alignment found at fid=0x" << fid <<
+ " offs=0x" << rec_offset << " (likely journal overwrite boundary); " <<
+ (JRNL_SBLK_SIZE - (sblk_offs/JRNL_DBLK_SIZE)) <<
+ " filler record(s) required." << std::endl;
+ const u_int32_t xmagic = RHM_JDAT_EMPTY_MAGIC;
+ std::stringstream ss;
+ ss << _jdir.dirname() << "/" << _base_filename << ".";
+ ss << std::hex << std::setfill('0') << std::setw(4) << fid << "." << JRNL_DATA_EXTENSION;
+ std::ofstream ofsp(ss.str().c_str(),
+ std::ios_base::in | std::ios_base::out | std::ios_base::binary);
+ if (!ofsp.good())
+ throw jexception(jerrno::JERR__FILEIO, ss.str().c_str(), "jcntl",
+ "check_journal_alignment");
+ ofsp.seekp(rec_offset);
+ void* buff = ::malloc(JRNL_DBLK_SIZE);
+ assert(buff != NULL);
+ ::memcpy(buff, (void*)&xmagic, sizeof(xmagic));
+ // Normally, RHM_CLEAN must be set before these fills are done, but this is a recover
+ // situation (i.e. performance is not an issue), and it makes the location of the write
+ // clear should inspection of the file be required.
+ ::memset((char*)buff + sizeof(xmagic), RHM_CLEAN_CHAR, JRNL_DBLK_SIZE - sizeof(xmagic));
+
+ while (rec_offset % (JRNL_DBLK_SIZE * JRNL_SBLK_SIZE))
+ {
+ ofsp.write((const char*)buff, JRNL_DBLK_SIZE);
+ assert(!ofsp.fail());
+ // TODO: Connect the following with logger:
+ std::cout << "INFO: * Wrote filler record at offs=0x" << rec_offset << std::endl;
+ rec_offset = ofsp.tellp();
+ }
+ ofsp.close();
+ ::free(buff);
+ // TODO: Connect the following with logger:
+ std::cout << "INFO: Bad record alignment fixed." << std::endl;
+ }
+}
+
+
+void
jcntl::aio_wr_callback(jcntl* journal, u_int32_t num_dtoks)
{
- //kpvdr TODO -- this list needs to be mutexed...???
+//kpvdr TODO -- this list needs to be mutexed...???
std::deque<rhm::journal::data_tok*> this_dtok_list(journal->_aio_wr_cmpl_dtok_list.begin(),
- journal->_aio_wr_cmpl_dtok_list.end());
+ journal->_aio_wr_cmpl_dtok_list.end());
journal->_aio_wr_cmpl_dtok_list.clear();
for (u_int32_t i=0; i<num_dtoks; i++)
{
data_tok*& dtokp = this_dtok_list.front();
- if (!journal->is_stopped() && dtokp->getSourceMessage())
- {
- switch (dtokp->wstate())
- {
- case data_tok::ENQ:
- dtokp->getSourceMessage()->enqueueComplete();
- break;
- case data_tok::DEQ:
- /* Don't need to signal until we have a way to ack completion of dequeue in AMQP
- dtokp->getSourceMessage()->dequeueComplete();
- if ( dtokp->getSourceMessage()->isDequeueComplete() ) // clear id after last dequeue
- dtokp->getSourceMessage()->setPersistenceId(0);
- */
- break;
- default:
- ;
- }
- }
- dtokp->release();
+ if (!journal->is_stopped() && dtokp->getSourceMessage())
+ {
+ switch (dtokp->wstate())
+ {
+ case data_tok::ENQ:
+ dtokp->getSourceMessage()->enqueueComplete();
+ break;
+ case data_tok::DEQ:
+/* Don't need to signal until we have a way to ack completion of dequeue in AMQP
+ dtokp->getSourceMessage()->dequeueComplete();
+ if ( dtokp->getSourceMessage()->isDequeueComplete() ) // clear id after last dequeue
+ dtokp->getSourceMessage()->setPersistenceId(0);
+*/
+ break;
+ default:
+ ;
+ }
+ }
+ dtokp->release();
this_dtok_list.pop_front();
}
}
@@ -792,21 +842,21 @@
jcntl::aio_rd_callback(jcntl* journal, u_int32_t num_dtoks)
{
- //kpvdr TODO -- can we get rid of the copy???
+//kpvdr TODO -- can we get rid of the copy???
std::deque<rhm::journal::data_tok*> this_dtok_list(journal->_aio_rd_cmpl_dtok_list.begin(),
- journal->_aio_rd_cmpl_dtok_list.end());
+ journal->_aio_rd_cmpl_dtok_list.end());
journal->_aio_rd_cmpl_dtok_list.clear();
for (u_int32_t i=0; i<num_dtoks; i++)
{
data_tok*& dtokp = this_dtok_list.front();
- if (!journal->is_stopped() && dtokp->getSourceMessage())
- {
- if (dtokp->wstate() == data_tok::ENQ && dtokp->rstate() == data_tok::READ)
- {
+ if (!journal->is_stopped() && dtokp->getSourceMessage())
+ {
+ if (dtokp->wstate() == data_tok::ENQ && dtokp->rstate() == data_tok::READ)
+ {
// cct call the recovery manager. / lazyload..
- }
- }
- dtokp->release();
+ }
+ }
+ dtokp->release();
this_dtok_list.pop_front();
}
Modified: store/trunk/cpp/lib/jrnl/jcntl.hpp
===================================================================
--- store/trunk/cpp/lib/jrnl/jcntl.hpp 2007-12-07 16:34:34 UTC (rev 1441)
+++ store/trunk/cpp/lib/jrnl/jcntl.hpp 2007-12-07 17:09:35 UTC (rev 1442)
@@ -644,14 +644,22 @@
void rcvr_janalyze(rcvdat& rd, const std::vector<std::string>& prep_txn_list)
throw (jexception);
- const bool rcvr_get_next_record(u_int16_t& fid, std::ifstream* ifsp, rcvdat& rd) throw (jexception);
+ const bool rcvr_get_next_record(u_int16_t& fid, std::ifstream* ifsp, rcvdat& rd)
+ throw (jexception);
+ const bool decode(jrec& rec, u_int16_t& fid, std::ifstream* ifsp, size_t& cum_size_read,
+ hdr& h, rcvdat& rd, std::streampos& rec_offset);
+
const bool jfile_cycle(u_int16_t& fid, std::ifstream* ifsp, rcvdat& rd,
const bool jump_fro);
- const bool check_owi(u_int16_t fid, hdr& h, rcvdat& rd, std::streampos read_pos)
+ const bool check_owi(const u_int16_t fid, hdr& h, rcvdat& rd, std::streampos& read_pos)
throw (jexception);
+
+ void check_journal_alignment(const u_int16_t fid, std::streampos& rec_offset)
+ throw (jexception);
+
/**
* \brief Analyze a particular journal file for recovery.
*
Modified: store/trunk/cpp/lib/jrnl/jrec.cpp
===================================================================
--- store/trunk/cpp/lib/jrnl/jrec.cpp 2007-12-07 16:34:34 UTC (rev 1441)
+++ store/trunk/cpp/lib/jrnl/jrec.cpp 2007-12-07 17:09:35 UTC (rev 1442)
@@ -52,14 +52,14 @@
{
std::stringstream ss;
ss << std::hex << std::setfill('0');
- ss << "enq magic NULL: rid=0x" << std::setw(16) << hdr._rid;
+ ss << "enq magic NULL: rid=0x" << hdr._rid;
throw jexception(jerrno::JERR_JREC_BADRECHDR, ss.str().c_str(), "jrec", "chk_hdr");
}
if (hdr._version != RHM_JDAT_VERSION)
{
std::stringstream ss;
ss << std::hex << std::setfill('0');
- ss << "version: rid=0x" << std::setw(16) << hdr._rid;
+ ss << "version: rid=0x" << hdr._rid;
ss << ": expected=0x" << std::setw(2) << (int)RHM_JDAT_VERSION;
ss << " read=0x" << std::setw(2) << (int)hdr._version;
throw jexception(jerrno::JERR_JREC_BADRECHDR, ss.str().c_str(), "jrec", "chk_hdr");
@@ -73,7 +73,7 @@
{
std::stringstream ss;
ss << std::hex << std::setfill('0');
- ss << "endian_flag: rid=" << std::setw(16) << hdr._rid;
+ ss << "endian_flag: rid=" << hdr._rid;
ss << ": expected=0x" << std::setw(2) << (int)endian_flag;
ss << " read=0x" << std::setw(2) << (int)hdr._eflag;
throw jexception(jerrno::JERR_JREC_BADRECHDR, ss.str().c_str(), "jrec", "chk_hdr");
@@ -87,8 +87,8 @@
{
std::stringstream ss;
ss << std::hex << std::setfill('0');
- ss << "rid mismatch: expected=0x" << std::setw(16) << rid;
- ss << " read=0x" << std::setw(16) << hdr._rid;
+ ss << "rid mismatch: expected=0x" << rid;
+ ss << " read=0x" << hdr._rid;
throw jexception(jerrno::JERR_JREC_BADRECHDR, ss.str().c_str(), "jrec", "chk_hdr");
}
}
@@ -100,17 +100,17 @@
{
std::stringstream ss;
ss << std::hex << std::setfill('0');
- ss << "magic: rid=0x" << std::setw(16) << hdr._rid;
- ss << ": expected=0x" << std::setw(8) << ~hdr._magic;
- ss << " read=0x" << std::setw(8) << tail._xmagic;
+ ss << "magic: rid=0x" << hdr._rid;
+ ss << ": expected=0x" << ~hdr._magic;
+ ss << " read=0x" << tail._xmagic;
throw jexception(jerrno::JERR_JREC_BADRECTAIL, ss.str().c_str(), "jrec", "chk_tail");
}
if (tail._rid != hdr._rid)
{
std::stringstream ss;
ss << std::hex << std::setfill('0');
- ss << "rid: rid=0x" << std::setw(16) << hdr._rid;
- ss << ": read=0x" << std::setw(16) << tail._rid;
+ ss << "rid: rid=0x" << hdr._rid;
+ ss << ": read=0x" << tail._rid;
throw jexception(jerrno::JERR_JREC_BADRECTAIL, ss.str().c_str(), "jrec", "chk_tail");
}
}
Modified: store/trunk/cpp/lib/jrnl/txn_rec.cpp
===================================================================
--- store/trunk/cpp/lib/jrnl/txn_rec.cpp 2007-12-07 16:34:34 UTC (rev 1441)
+++ store/trunk/cpp/lib/jrnl/txn_rec.cpp 2007-12-07 17:09:35 UTC (rev 1442)
@@ -314,19 +314,9 @@
const bool
txn_rec::rcv_decode(hdr h, std::ifstream* ifsp, size_t& rec_offs) throw (jexception)
{
- if (rec_offs) // Contunue decoding xid from previous decode call
+ if (rec_offs == 0)
{
- ifsp->read((char*)_buff + rec_offs, _txn_hdr._xidsize - rec_offs);
- size_t size_read = ifsp->gcount();
- if (size_read < _txn_hdr._xidsize - rec_offs)
- {
- assert(ifsp->eof());
- rec_offs += size_read;
- return false;
- }
- }
- else // Start at beginning of record
- {
+ // Read header, allocate for xid
_txn_hdr._hdr.copy(h);
#if defined(JRNL_BIG_ENDIAN) && defined(JRNL_32_BIT)
ifsp->ignore(sizeof(u_int32_t)); // _filler0
@@ -335,19 +325,38 @@
#if defined(JRNL_LITTLE_ENDIAN) && defined(JRNL_32_BIT)
ifsp->ignore(sizeof(u_int32_t)); // _filler0
#endif
+ rec_offs = sizeof(_txn_hdr);
_buff = ::malloc(_txn_hdr._xidsize);
MALLOC_CHK(_buff, "_buff", "txn_rec", "rcv_decode");
- // Decode xid
- ifsp->read((char*)_buff, _txn_hdr._xidsize);
+ }
+ if (rec_offs < sizeof(_txn_hdr) + _txn_hdr._xidsize)
+ {
+ // Read xid (or continue reading xid)
+ size_t offs = rec_offs - sizeof(_txn_hdr);
+ ifsp->read((char*)_buff + offs, _txn_hdr._xidsize - offs);
size_t size_read = ifsp->gcount();
- if (size_read < _txn_hdr._xidsize)
+ rec_offs += size_read;
+ if (size_read < _txn_hdr._xidsize - offs)
{
assert(ifsp->eof());
- rec_offs = size_read;
return false;
}
}
- ifsp->ignore(rec_size_dblks() * JRNL_DBLK_SIZE - sizeof(_txn_hdr) - _txn_hdr._xidsize);
+ if (rec_offs < sizeof(_txn_hdr) + _txn_hdr._xidsize + sizeof(rec_tail))
+ {
+ // Read tail (or continue reading tail)
+ size_t offs = rec_offs - sizeof(_txn_hdr) - _txn_hdr._xidsize;
+ ifsp->read((char*)&_txn_tail + offs, sizeof(rec_tail) - offs);
+ size_t size_read = ifsp->gcount();
+ rec_offs += size_read;
+ if (size_read < sizeof(rec_tail) - offs)
+ {
+ assert(ifsp->eof());
+ return false;
+ }
+ }
+ ifsp->ignore(rec_size_dblks() * JRNL_DBLK_SIZE - rec_size());
+ chk_tail(); // Throws if tail invalid or record incomplete
return true;
}
@@ -387,7 +396,7 @@
const size_t
txn_rec::rec_size() const
{
- return deq_hdr::size() + _txn_hdr._xidsize + rec_tail::size();
+ return txn_hdr::size() + _txn_hdr._xidsize + rec_tail::size();
}
void
Modified: store/trunk/cpp/lib/jrnl/wmgr.cpp
===================================================================
--- store/trunk/cpp/lib/jrnl/wmgr.cpp 2007-12-07 16:34:34 UTC (rev 1441)
+++ store/trunk/cpp/lib/jrnl/wmgr.cpp 2007-12-07 17:09:35 UTC (rev 1442)
@@ -1003,9 +1003,9 @@
while (_cached_offset_dblks < wdblks)
{
void* wptr = (void*)((char*)_page_ptr_arr[_pg_index] + _pg_offset_dblks * JRNL_DBLK_SIZE);
- ::memcpy(wptr, (void*)&xmagic, 4);
+ ::memcpy(wptr, (void*)&xmagic, sizeof(xmagic));
#ifdef RHM_CLEAN
- ::memset((char*)wptr + 4, RHM_CLEAN_CHAR, JRNL_DBLK_SIZE - 4);
+ ::memset((char*)wptr + sizeof(xmagic), RHM_CLEAN_CHAR, JRNL_DBLK_SIZE - sizeof(xmagic));
#endif
_pg_offset_dblks++;
_cached_offset_dblks++;
More information about the rhmessaging-commits
mailing list