[rhmessaging-commits] rhmessaging commits: r1001 - in store/trunk/cpp: tests/jrnl and 1 other directory.
rhmessaging-commits at lists.jboss.org
rhmessaging-commits at lists.jboss.org
Wed Oct 10 17:36:41 EDT 2007
Author: kpvdr
Date: 2007-10-10 17:36:41 -0400 (Wed, 10 Oct 2007)
New Revision: 1001
Modified:
store/trunk/cpp/lib/jrnl/deq_rec.cpp
store/trunk/cpp/lib/jrnl/deq_rec.hpp
store/trunk/cpp/lib/jrnl/enq_rec.cpp
store/trunk/cpp/lib/jrnl/enq_rec.hpp
store/trunk/cpp/lib/jrnl/jcntl.cpp
store/trunk/cpp/lib/jrnl/jcntl.hpp
store/trunk/cpp/lib/jrnl/jrec.hpp
store/trunk/cpp/lib/jrnl/rcvdat.hpp
store/trunk/cpp/lib/jrnl/rmgr.cpp
store/trunk/cpp/lib/jrnl/txn_rec.cpp
store/trunk/cpp/lib/jrnl/txn_rec.hpp
store/trunk/cpp/tests/jrnl/JournalSystemTests.cpp
store/trunk/cpp/tests/jrnl/rtest
Log:
Transactional recovery is now almost complete, but some testing remains to be done. Correlation with the prepared xid list is still missing.
Modified: store/trunk/cpp/lib/jrnl/deq_rec.cpp
===================================================================
--- store/trunk/cpp/lib/jrnl/deq_rec.cpp 2007-10-10 18:39:30 UTC (rev 1000)
+++ store/trunk/cpp/lib/jrnl/deq_rec.cpp 2007-10-10 21:36:41 UTC (rev 1001)
@@ -84,7 +84,7 @@
_buff = NULL;
}
-u_int32_t
+const u_int32_t
deq_rec::encode(void* wptr, u_int32_t rec_offs_dblks, u_int32_t max_size_dblks) throw (jexception)
{
assert(wptr != NULL);
@@ -196,7 +196,7 @@
return size_dblks(wr_cnt);
}
-u_int32_t
+const u_int32_t
deq_rec::decode(hdr& h, void* rptr, u_int32_t rec_offs_dblks, u_int32_t max_size_dblks)
throw (jexception)
{
@@ -318,8 +318,57 @@
return size_dblks(rd_cnt);
}
+const bool
+deq_rec::rcv_decode(hdr h, std::ifstream* ifsp, u_int32_t& rec_offs_dblks) throw (jexception)
+{
+ if (rec_offs_dblks) // Continue decoding xid from previous decode call
+ {
+ // TODO
+ }
+ else // Start at beginning of record
+ {
+ _deq_hdr._hdr.copy(h);
+ ifsp->read((char*)&_deq_hdr._deq_rid, sizeof(u_int64_t));
+#if defined(JRNL_BIG_ENDIAN) && defined(JRNL_32_BIT)
+ ifsp->ignore(sizeof(u_int32_t)); // _filler0
+#endif
+ ifsp->read((char*)&_deq_hdr._xidsize, sizeof(size_t));
+#if defined(JRNL_LITTLE_ENDIAN) && defined(JRNL_32_BIT)
+ ifsp->ignore(sizeof(u_int32_t)); // _filler0
+#endif
+ if (_deq_hdr._xidsize)
+ {
+ _buff = ::malloc(_deq_hdr._xidsize);
+ if (_buff == NULL)
+ {
+ std::stringstream ss;
+ ss << "_buff malloc(): errno=" << errno;
+ throw jexception(jerrno::JERR__MALLOC, ss.str(), "deq_rec", "decode");
+ }
+ // Decode xid
+ ifsp->read((char*)_buff, _deq_hdr._xidsize);
+ if ((size_t)ifsp->gcount() == _deq_hdr._xidsize)
+ {
+ ifsp->ignore(rec_size_dblks() * JRNL_DBLK_SIZE - sizeof(_deq_hdr) -
+ _deq_hdr._xidsize);
+ return true;
+ }
+ else
+ ; // TODO
+ }
+ else
+ {
+ // Ignore rest of record
+ rec_offs_dblks = rec_size_dblks();
+ ifsp->ignore(rec_offs_dblks * JRNL_DBLK_SIZE - sizeof(_deq_hdr));
+ return true;
+ }
+ }
+ return false;
+}
+
const size_t
-deq_rec::get_xid(const void** const xidpp)
+deq_rec::get_xid(void** const xidpp)
{
if (!_buff)
{
Modified: store/trunk/cpp/lib/jrnl/deq_rec.hpp
===================================================================
--- store/trunk/cpp/lib/jrnl/deq_rec.hpp 2007-10-10 18:39:30 UTC (rev 1000)
+++ store/trunk/cpp/lib/jrnl/deq_rec.hpp 2007-10-10 21:36:41 UTC (rev 1001)
@@ -73,11 +73,15 @@
// Prepare instance for use in writing data to journal
void reset(const u_int64_t rid, const u_int64_t drid, const void* const xidp,
const size_t xidlen);
- u_int32_t encode(void* wptr, u_int32_t rec_offs_dblks, u_int32_t max_size_dblks)
+ const u_int32_t encode(void* wptr, u_int32_t rec_offs_dblks, u_int32_t max_size_dblks)
throw (jexception);
- u_int32_t decode(hdr& h, void* rptr, u_int32_t rec_offs_dblks, u_int32_t max_size_dblks)
+ const u_int32_t decode(hdr& h, void* rptr, u_int32_t rec_offs_dblks,
+ u_int32_t max_size_dblks) throw (jexception);
+ // Decode used for recover
+ const bool rcv_decode(hdr h, std::ifstream* ifsp, u_int32_t& rec_offs_dblks)
throw (jexception);
- const size_t get_xid(const void** const xidpp);
+ inline const u_int64_t deq_rid() const { return _deq_hdr._deq_rid; }
+ const size_t get_xid(void** const xidpp);
std::string& str(std::string& str) const;
inline const size_t data_size() const { return 0; } // This record never carries data
const size_t xid_size() const;
Modified: store/trunk/cpp/lib/jrnl/enq_rec.cpp
===================================================================
--- store/trunk/cpp/lib/jrnl/enq_rec.cpp 2007-10-10 18:39:30 UTC (rev 1000)
+++ store/trunk/cpp/lib/jrnl/enq_rec.cpp 2007-10-10 21:36:41 UTC (rev 1001)
@@ -98,7 +98,7 @@
_enq_tail._rid = rid;
}
-u_int32_t
+const u_int32_t
enq_rec::encode(void* wptr, u_int32_t rec_offs_dblks, u_int32_t max_size_dblks) throw (jexception)
{
assert(wptr != NULL);
@@ -240,7 +240,7 @@
return size_dblks(wr_cnt);
}
-u_int32_t
+const u_int32_t
enq_rec::decode(hdr& h, void* rptr, u_int32_t rec_offs_dblks, u_int32_t max_size_dblks)
throw (jexception)
{
@@ -427,6 +427,60 @@
return size_dblks(rd_cnt);
}
+const bool
+enq_rec::rcv_decode(hdr h, std::ifstream* ifsp, u_int32_t& rec_offs_dblks) throw (jexception)
+{
+ if (rec_offs_dblks) // Continue decoding xid from previous decode call
+ {
+ // TODO
+ }
+ else // Start at beginning of record
+ {
+ _enq_hdr._hdr.copy(h);
+#if defined(JRNL_BIG_ENDIAN) && defined(JRNL_32_BIT)
+ ifsp->ignore(sizeof(u_int32_t)); // _filler0
+#endif
+ ifsp->read((char*)&_enq_hdr._xidsize, sizeof(size_t));
+#if defined(JRNL_LITTLE_ENDIAN) && defined(JRNL_32_BIT)
+ ifsp->ignore(sizeof(u_int32_t)); // _filler0
+#endif
+#if defined(JRNL_BIG_ENDIAN) && defined(JRNL_32_BIT)
+ ifsp->ignore(sizeof(u_int32_t)); // _filler1
+#endif
+ ifsp->read((char*)&_enq_hdr._dsize, sizeof(size_t));
+#if defined(JRNL_LITTLE_ENDIAN) && defined(JRNL_32_BIT)
+ ifsp->ignore(sizeof(u_int32_t)); // _filler1
+#endif
+ if (_enq_hdr._xidsize)
+ {
+ _buff = ::malloc(_enq_hdr._xidsize);
+ if (_buff == NULL)
+ {
+ std::stringstream ss;
+ ss << "_buff malloc(): errno=" << errno;
+ throw jexception(jerrno::JERR__MALLOC, ss.str(), "deq_rec", "decode");
+ }
+ // Decode xid
+ ifsp->read((char*)_buff, _enq_hdr._xidsize);
+ if ((size_t)ifsp->gcount() == _enq_hdr._xidsize)
+ {
+ ifsp->ignore(rec_size_dblks() * JRNL_DBLK_SIZE - sizeof(_enq_hdr) -
+ _enq_hdr._xidsize);
+ return true;
+ }
+ else
+ ; // TODO
+ }
+ else
+ {
+ // Ignore rest of record
+ ifsp->ignore(rec_size_dblks() * JRNL_DBLK_SIZE - sizeof(_enq_hdr));
+ return true;
+ }
+ }
+ return false;
+}
+
const size_t
enq_rec::get_xid(void** const xidpp)
{
Modified: store/trunk/cpp/lib/jrnl/enq_rec.hpp
===================================================================
--- store/trunk/cpp/lib/jrnl/enq_rec.hpp 2007-10-10 18:39:30 UTC (rev 1000)
+++ store/trunk/cpp/lib/jrnl/enq_rec.hpp 2007-10-10 21:36:41 UTC (rev 1001)
@@ -84,9 +84,12 @@
void reset(const u_int64_t rid, const void* const dbuf, const size_t dlen,
const void* const xidp, const size_t xidlen, bool transient);
- u_int32_t encode(void* wptr, u_int32_t rec_offs_dblks, u_int32_t max_size_dblks)
+ const u_int32_t encode(void* wptr, u_int32_t rec_offs_dblks, u_int32_t max_size_dblks)
throw (jexception);
- u_int32_t decode(hdr& h, void* rptr, u_int32_t rec_offs_dblks, u_int32_t max_size_dblks)
+ const u_int32_t decode(hdr& h, void* rptr, u_int32_t rec_offs_dblks,
+ u_int32_t max_size_dblks) throw (jexception);
+ // Decode used for recover
+ const bool rcv_decode(hdr h, std::ifstream* ifsp, u_int32_t& rec_offs_dblks)
throw (jexception);
const size_t get_xid(void** const xidpp);
const size_t get_data(void** const datapp);
Modified: store/trunk/cpp/lib/jrnl/jcntl.cpp
===================================================================
--- store/trunk/cpp/lib/jrnl/jcntl.cpp 2007-10-10 18:39:30 UTC (rev 1000)
+++ store/trunk/cpp/lib/jrnl/jcntl.cpp 2007-10-10 21:36:41 UTC (rev 1001)
@@ -361,143 +361,335 @@
throw e;
}
- // Restore all read and write pointers
+ // Restore all read and write pointers and transactions
if (!rd._empty)
{
- bool eoj = false;
- for (u_int16_t fnum=0; fnum<JRNL_NUM_FILES && !eoj; fnum++)
- {
- u_int16_t fid = (fnum + rd._ffid) % JRNL_NUM_FILES;
- eoj = rcvr_fanalyze(fid, rd, prep_txn_list);
- }
+ u_int16_t fid = rd._ffid;
+ std::ifstream ifs;
+ while (rcvr_get_next_record(fid, &ifs, rd, prep_txn_list));
}
}
const bool
-jcntl::rcvr_fanalyze(u_int16_t fid, rcvdat& rd, const std::vector<std::string>& /*prep_txn_list*/)
- throw (jexception)
+jcntl::rcvr_get_next_record(u_int16_t& fid, std::ifstream* ifsp, rcvdat& rd,
+ const std::vector<std::string>& /*prep_txn_list*/) throw (jexception)
{
- bool eoj = false;
- std::stringstream ss;
- ss << _jdir.dirname() << "/" << _base_filename << ".";
- ss << std::setfill('0') << std::setw(4) << fid << "." << JRNL_DATA_EXTENSION;
-//std::cout << "rcvr_fanalyze: " << ss.str() << ":";
- std::ifstream jifs(ss.str().c_str());
- if (!jifs.good())
- throw jexception(jerrno::JERR__FILEIO, ss.str(), "jinf", "analyze");
-
- // 1. Read file header
- file_hdr fhdr;
- jifs.read((char*)&fhdr, sizeof(fhdr));
- if (fhdr._hdr._magic == RHM_JDAT_FILE_MAGIC)
+ u_int32_t dblks_read = 0;
+ bool done = false;
+ void* xidp = NULL;
+ hdr h;
+ jfile_cycle(fid, ifsp, rd, true);
+ ifsp->read((char*)&h, sizeof(hdr));
+ switch(h._magic)
{
- assert(fhdr._fid == fid);
- if (!rd._fro)
- rd._fro = fhdr._fro;
- std::streamoff foffs = fhdr._fro;
- jifs.seekg(foffs);
-
- // 2. Read file records
- while (jifs.good() && !eoj)
- {
- hdr h;
- jifs.read((char*)&h, sizeof(hdr));
- switch(h._magic)
+ case RHM_JDAT_ENQ_MAGIC:
{
- case RHM_JDAT_ENQ_MAGIC:
+//std::cout << " e" << h._rid << std::flush;
+ enq_rec er;
+ while (!done)
{
- size_t xidsize = 0;
- size_t recsize = 0;
-#if defined(JRNL_BIG_ENDIAN) && defined(JRNL_32_BIT)
- jifs.ignore(sizeof(u_int32_t));
-#endif
- jifs.read((char*)&xidsize, sizeof(size_t));
-#if defined(JRNL_LITTLE_ENDIAN) && defined(JRNL_32_BIT)
- jifs.ignore(sizeof(u_int32_t));
-#endif
-#if defined(JRNL_BIG_ENDIAN) && defined(JRNL_32_BIT)
- jifs.ignore(sizeof(u_int32_t));
-#endif
- jifs.read((char*)&recsize, sizeof(size_t));
-#if defined(JRNL_LITTLE_ENDIAN) && defined(JRNL_32_BIT)
- jifs.ignore(sizeof(u_int32_t));
-#endif
+ done = er.rcv_decode(h, ifsp, dblks_read);
+ jfile_cycle(fid, ifsp, rd, false);
+ }
+ if (er.xid_size())
+ {
+//std::cout << "$" << std::flush;
+ er.get_xid(&xidp);
+ assert(xidp != NULL);
+ std::string xid((char*)xidp, er.xid_size());
+ _tmap.insert_txn_data(xid, txn_data(h._rid, fid, true));
+ free(xidp);
+ }
+ else
+ {
_emap.insert_fid(h._rid, fid);
rd._enq_cnt_list[fid]++;
if (rd._h_rid < h._rid)
rd._h_rid = h._rid;
-//std::cout << " e" << h._rid;
- u_int32_t rec_dblks = jrec::size_dblks((size_t)recsize + sizeof(enq_hdr) +
- sizeof(rec_tail));
- foffs += rec_dblks * JRNL_DBLK_SIZE;
- jifs.seekg(foffs);
}
- break;
- case RHM_JDAT_DEQ_MAGIC:
+ }
+ break;
+ case RHM_JDAT_DEQ_MAGIC:
+ {
+//std::cout << " d" << h._rid << std::flush;
+ deq_rec dr;
+ while (!done)
{
- u_int64_t drid = 0;
- size_t xidsize = 0;
- jifs.read((char*)&drid, sizeof(u_int64_t));
-#if defined(JRNL_BIG_ENDIAN) && defined(JRNL_32_BIT)
- jifs.ignore(sizeof(u_int32_t));
-#endif
- jifs.read((char*)&xidsize, sizeof(size_t));
-#if defined(JRNL_LITTLE_ENDIAN) && defined(JRNL_32_BIT)
- jifs.ignore(sizeof(u_int32_t));
-#endif
+ done = dr.rcv_decode(h, ifsp, dblks_read);
+ jfile_cycle(fid, ifsp, rd, false);
+ }
+ if (dr.xid_size())
+ {
+//std::cout << "$" << std::flush;
+ // If the enqueue is part of a pending txn, it will not yet be in emap
+ try { _emap.lock(h._rid); }
+ catch(jexception e) { if (e.err_code() != jerrno::JERR_MAP_NOTFOUND) throw e; }
+ dr.get_xid(&xidp);
+ assert(xidp != NULL);
+ std::string xid((char*)xidp, dr.xid_size());
+ _tmap.insert_txn_data(xid, txn_data(h._rid, fid, false));
+ free(xidp);
+ }
+ else
+ {
try
{
- _emap.get_remove_fid(drid);
+ _emap.get_remove_fid(dr.deq_rid());
rd._enq_cnt_list[fid]--;
}
- catch (jexception& e) {} // ignore JERR_EMAP_NOTFOUND thrown here
+ catch (jexception& e)
+ {
+ if (e.err_code() != jerrno::JERR_MAP_NOTFOUND)
+ throw e;
+ }
if (rd._h_rid < h._rid)
rd._h_rid = h._rid;
-//std::cout << " d" << drid << ")";
- u_int32_t rec_dblks = jrec::size_dblks(sizeof(deq_hdr));
- foffs += rec_dblks * JRNL_DBLK_SIZE;
- jifs.seekg(foffs);
}
- break;
- case RHM_JDAT_TXA_MAGIC:
-//std::cout << " a";
- break;
- case RHM_JDAT_TXC_MAGIC:
-//std::cout << " c";
- break;
- case RHM_JDAT_EMPTY_MAGIC:
+ }
+ break;
+ case RHM_JDAT_TXA_MAGIC:
+ {
+//std::cout << " a" << h._rid << std::flush;
+ txn_rec ar;
+ while (!done)
{
-//std::cout << " x";
- u_int32_t rec_dblks = jrec::size_dblks(sizeof(hdr));
- foffs += rec_dblks * JRNL_DBLK_SIZE;
- jifs.seekg(foffs);
+ done = ar.rcv_decode(h, ifsp, dblks_read);
+ jfile_cycle(fid, ifsp, rd, false);
}
- break;
- case 0:
- rd._lfid = fid;
- rd._eo = foffs;
- if (!jifs.eof())
- eoj = true;
-//std::cout << (jifs.eof()?" <eof>":" <end>");
- break;
- default:
- std::stringstream ss;
- ss << std::hex << std::setfill('0') << "Magic=0x" << std::setw(8) << h._magic;
- throw jexception(jerrno::JERR_JCNTL_UNKNOWNMAGIC, ss.str(), "jcntl",
- "rcvr_fanalyze");
+ // Delete this txn from tmap, unlock any locked records in emap
+ ar.get_xid(&xidp);
+ assert(xidp != NULL);
+ std::string xid((char*)xidp, ar.xid_size());
+ txn_data_list tdl = _tmap.get_remove_tdata_list(xid);
+ for (tdl_itr itr = tdl.begin(); itr != tdl.end(); itr++)
+ {
+ try { _emap.unlock(itr->_rid); }
+ catch(jexception e) { if (e.err_code() != jerrno::JERR_MAP_NOTFOUND) throw e; }
+ if (itr->_enq_flag)
+ _wrfc.decr_enqcnt(itr->_fid);
+ }
+ free(xidp);
}
+ break;
+ case RHM_JDAT_TXC_MAGIC:
+ {
+//std::cout << " c" << h._rid << std::flush;
+ txn_rec cr;
+ while (!done)
+ {
+ done = cr.rcv_decode(h, ifsp, dblks_read);
+ jfile_cycle(fid, ifsp, rd, false);
+ }
+ // Delete this txn from tmap, process records into emap
+ cr.get_xid(&xidp);
+ assert(xidp != NULL);
+ std::string xid((char*)xidp, cr.xid_size());
+ txn_data_list tdl = _tmap.get_remove_tdata_list(xid);
+ for (tdl_itr itr = tdl.begin(); itr != tdl.end(); itr++)
+ {
+ if (itr->_enq_flag) // txn enqueue
+ _emap.insert_fid(itr->_rid, itr->_fid);
+ else // txn dequeue
+ {
+ u_int16_t fid = _emap.get_remove_fid(h._rid);
+ _wrfc.decr_enqcnt(fid);
+ }
+ }
+ free(xidp);
+ }
+ break;
+ case RHM_JDAT_EMPTY_MAGIC:
+ {
+//std::cout << " x" << std::flush;
+ u_int32_t rec_dblks = jrec::size_dblks(sizeof(hdr));
+ ifsp->ignore(rec_dblks * JRNL_DBLK_SIZE - sizeof(hdr));
+ }
+ break;
+ case 0:
+//std::cout << " z" << std::flush;
+ rd._lfid = fid;
+ rd._eo = ifsp->tellg();
+ return false;
+ default:
+//std::cout << " ?" << std::flush;
+ std::stringstream ss;
+ ss << std::hex << std::setfill('0') << "Magic=0x" << std::setw(8) << h._magic;
+ throw jexception(jerrno::JERR_JCNTL_UNKNOWNMAGIC, ss.str(), "jcntl",
+ "rcvr_get_next_record");
+ }
+
+ return true;
+}
+
+const bool
+jcntl::jfile_cycle(u_int16_t& fid, std::ifstream* ifsp, rcvdat& rd, const bool jump_fro)
+{
+ if (ifsp->is_open())
+ {
+ if (ifsp->eof() || !ifsp->good())
+ {
+ ifsp->close();
+ fid++;
+ if (fid >= JRNL_NUM_FILES)
+ fid = 0;
+ if (fid == rd._ffid) // used up all journal files
+ return false;
}
}
- else
+ if (!ifsp->is_open())
{
- eoj = true;
-//std::cout << " <empty>";
+ std::stringstream ss;
+ ss << _jdir.dirname() << "/" << _base_filename << ".";
+ ss << std::setfill('0') << std::setw(4) << fid << "." << JRNL_DATA_EXTENSION;
+ ifsp->open(ss.str().c_str());
+ if (!ifsp->good())
+ throw jexception(jerrno::JERR__FILEIO, ss.str(), "jcntl", "rcvr_get_next_record");
+
+ // Read file header
+ file_hdr fhdr;
+ ifsp->read((char*)&fhdr, sizeof(fhdr));
+ if (fhdr._hdr._magic == RHM_JDAT_FILE_MAGIC)
+ {
+ assert(fhdr._fid == fid);
+ if (!rd._fro)
+ rd._fro = fhdr._fro;
+ std::streamoff foffs = jump_fro ? fhdr._fro : JRNL_DBLK_SIZE * JRNL_SBLK_SIZE;
+ ifsp->seekg(foffs);
+ }
+ else
+ {
+ ifsp->close();
+ return false;
+ }
}
- jifs.close();
-//std::cout << std::endl;
- return eoj;
+ return true;
}
+// const bool
+// jcntl::rcvr_fanalyze(u_int16_t fid, rcvdat& rd, const std::vector<std::string>& /*prep_txn_list*/)
+// throw (jexception)
+// {
+// bool eoj = false;
+// std::stringstream ss;
+// ss << _jdir.dirname() << "/" << _base_filename << ".";
+// ss << std::setfill('0') << std::setw(4) << fid << "." << JRNL_DATA_EXTENSION;
+// //std::cout << "rcvr_fanalyze: " << ss.str() << ":";
+// std::ifstream jifs(ss.str().c_str());
+// if (!jifs.good())
+// throw jexception(jerrno::JERR__FILEIO, ss.str(), "jinf", "analyze");
+//
+// // 1. Read file header
+// file_hdr fhdr;
+// jifs.read((char*)&fhdr, sizeof(fhdr));
+// if (fhdr._hdr._magic == RHM_JDAT_FILE_MAGIC)
+// {
+// assert(fhdr._fid == fid);
+// if (!rd._fro)
+// rd._fro = fhdr._fro;
+// std::streamoff foffs = fhdr._fro;
+// jifs.seekg(foffs);
+//
+// // 2. Read file records
+// while (jifs.good() && !eoj)
+// {
+// hdr h;
+// jifs.read((char*)&h, sizeof(hdr));
+// switch(h._magic)
+// {
+// case RHM_JDAT_ENQ_MAGIC:
+// {
+// size_t xidsize = 0;
+// size_t recsize = 0;
+// #if defined(JRNL_BIG_ENDIAN) && defined(JRNL_32_BIT)
+// jifs.ignore(sizeof(u_int32_t));
+// #endif
+// jifs.read((char*)&xidsize, sizeof(size_t));
+// #if defined(JRNL_LITTLE_ENDIAN) && defined(JRNL_32_BIT)
+// jifs.ignore(sizeof(u_int32_t));
+// #endif
+// #if defined(JRNL_BIG_ENDIAN) && defined(JRNL_32_BIT)
+// jifs.ignore(sizeof(u_int32_t));
+// #endif
+// jifs.read((char*)&recsize, sizeof(size_t));
+// #if defined(JRNL_LITTLE_ENDIAN) && defined(JRNL_32_BIT)
+// jifs.ignore(sizeof(u_int32_t));
+// #endif
+// _emap.insert_fid(h._rid, fid);
+// rd._enq_cnt_list[fid]++;
+// if (rd._h_rid < h._rid)
+// rd._h_rid = h._rid;
+// //std::cout << " e" << h._rid;
+// u_int32_t rec_dblks = jrec::size_dblks((size_t)recsize + sizeof(enq_hdr) +
+// sizeof(rec_tail));
+// foffs += rec_dblks * JRNL_DBLK_SIZE;
+// jifs.seekg(foffs);
+// }
+// break;
+// case RHM_JDAT_DEQ_MAGIC:
+// {
+// u_int64_t drid = 0;
+// size_t xidsize = 0;
+// jifs.read((char*)&drid, sizeof(u_int64_t));
+// #if defined(JRNL_BIG_ENDIAN) && defined(JRNL_32_BIT)
+// jifs.ignore(sizeof(u_int32_t));
+// #endif
+// jifs.read((char*)&xidsize, sizeof(size_t));
+// #if defined(JRNL_LITTLE_ENDIAN) && defined(JRNL_32_BIT)
+// jifs.ignore(sizeof(u_int32_t));
+// #endif
+// try
+// {
+// _emap.get_remove_fid(drid);
+// rd._enq_cnt_list[fid]--;
+// }
+// catch (jexception& e) {} // ignore JERR_EMAP_NOTFOUND thrown here
+// if (rd._h_rid < h._rid)
+// rd._h_rid = h._rid;
+// //std::cout << " d" << drid << ")";
+// u_int32_t rec_dblks = jrec::size_dblks(sizeof(deq_hdr));
+// foffs += rec_dblks * JRNL_DBLK_SIZE;
+// jifs.seekg(foffs);
+// }
+// break;
+// case RHM_JDAT_TXA_MAGIC:
+// //std::cout << " a";
+// break;
+// case RHM_JDAT_TXC_MAGIC:
+// //std::cout << " c";
+// break;
+// case RHM_JDAT_EMPTY_MAGIC:
+// {
+// //std::cout << " x";
+// u_int32_t rec_dblks = jrec::size_dblks(sizeof(hdr));
+// foffs += rec_dblks * JRNL_DBLK_SIZE;
+// jifs.seekg(foffs);
+// }
+// break;
+// case 0:
+// rd._lfid = fid;
+// rd._eo = foffs;
+// if (!jifs.eof())
+// eoj = true;
+// //std::cout << (jifs.eof()?" <eof>":" <end>");
+// break;
+// default:
+// std::stringstream ss;
+// ss << std::hex << std::setfill('0') << "Magic=0x" << std::setw(8) << h._magic;
+// throw jexception(jerrno::JERR_JCNTL_UNKNOWNMAGIC, ss.str(), "jcntl",
+// "rcvr_fanalyze");
+// }
+// }
+// }
+// else
+// {
+// eoj = true;
+// //std::cout << " <empty>";
+// }
+// jifs.close();
+// //std::cout << std::endl;
+// return eoj;
+// }
+
void
jcntl::aio_wr_callback(jcntl* journal, u_int32_t num_dtoks)
{
Modified: store/trunk/cpp/lib/jrnl/jcntl.hpp
===================================================================
--- store/trunk/cpp/lib/jrnl/jcntl.hpp 2007-10-10 18:39:30 UTC (rev 1000)
+++ store/trunk/cpp/lib/jrnl/jcntl.hpp 2007-10-10 21:36:41 UTC (rev 1001)
@@ -611,16 +611,21 @@
/**
* \brief Analyze journal for recovery.
*/
- void rcvr_janalyze(rcvdat& jrs, const std::vector<std::string>& prep_txn_list)
+ void rcvr_janalyze(rcvdat& rd, const std::vector<std::string>& prep_txn_list)
throw (jexception);
+ const bool rcvr_get_next_record(u_int16_t& fid, std::ifstream* ifsp, rcvdat& rd,
+ const std::vector<std::string>& prep_txn_list) throw (jexception);
+
+ const bool jfile_cycle(u_int16_t& fid, std::ifstream* ifsp, rcvdat& rd,
+ const bool jump_fro);
/**
* \brief Analyze a particular journal file for recovery.
*
* \return <b><i>true</i></b> if end of journal (eoj) found; <b><i>false</i></b> otherwise.
*/
- const bool rcvr_fanalyze(u_int16_t fid, rcvdat& jrs,
- const std::vector<std::string>& prep_txn_list) throw (jexception);
+// const bool rcvr_fanalyze(u_int16_t fid, rcvdat& jrs,
+// const std::vector<std::string>& prep_txn_list) throw (jexception);
/**
* Intenal callback write
Modified: store/trunk/cpp/lib/jrnl/jrec.hpp
===================================================================
--- store/trunk/cpp/lib/jrnl/jrec.hpp 2007-10-10 18:39:30 UTC (rev 1000)
+++ store/trunk/cpp/lib/jrnl/jrec.hpp 2007-10-10 21:36:41 UTC (rev 1001)
@@ -41,6 +41,7 @@
}
}
+#include <fstream>
#include <string>
#include <sys/types.h>
#include <jrnl/file_hdr.hpp>
@@ -113,8 +114,8 @@
* \param max_size_dblks Maximum number of data-blocks to write to pointer wptr.
* \returns Number of data-blocks encoded.
*/
- virtual u_int32_t encode(void* wptr, u_int32_t rec_offs_dblks, u_int32_t max_size_dblks)
- throw (jexception) = 0;
+ virtual const u_int32_t encode(void* wptr, u_int32_t rec_offs_dblks,
+ u_int32_t max_size_dblks) throw (jexception) = 0;
/**
* \brief Decode into this instance of jrec from the read buffer at the disk-block-aligned
@@ -146,9 +147,12 @@
* \param max_size_dblks Maximum number of data-blocks to read from pointer rptr.
* \returns Number of data-blocks read (consumed).
*/
- virtual u_int32_t decode(hdr& h, void* rptr, u_int32_t rec_offs_dblks,
+ virtual const u_int32_t decode(hdr& h, void* rptr, u_int32_t rec_offs_dblks,
u_int32_t max_size_dblks) throw (jexception) = 0;
+ virtual const bool rcv_decode(hdr h, std::ifstream* ifsp, u_int32_t& rec_offs_dblks)
+ throw (jexception) = 0;
+
virtual std::string& str(std::string& str) const = 0;
virtual const size_t data_size() const = 0;
virtual const size_t xid_size() const = 0;
Modified: store/trunk/cpp/lib/jrnl/rcvdat.hpp
===================================================================
--- store/trunk/cpp/lib/jrnl/rcvdat.hpp 2007-10-10 18:39:30 UTC (rev 1000)
+++ store/trunk/cpp/lib/jrnl/rcvdat.hpp 2007-10-10 21:36:41 UTC (rev 1001)
@@ -62,7 +62,7 @@
_lfid(0),
_eo(0),
_h_rid(0),
- _enq_cnt_list(JRNL_NUM_FILES),
+ _enq_cnt_list(JRNL_NUM_FILES, 0),
_edm()
{}
void reset()
@@ -73,7 +73,8 @@
_lfid=0;
_eo=0;
_h_rid=0;
- _enq_cnt_list.clear();
+ for (unsigned f=0; f<_enq_cnt_list.size(); f++)
+ _enq_cnt_list[f] = 0;
_edm.clear();
}
};
Modified: store/trunk/cpp/lib/jrnl/rmgr.cpp
===================================================================
--- store/trunk/cpp/lib/jrnl/rmgr.cpp 2007-10-10 18:39:30 UTC (rev 1000)
+++ store/trunk/cpp/lib/jrnl/rmgr.cpp 2007-10-10 21:36:41 UTC (rev 1001)
@@ -250,7 +250,7 @@
while (true)
{
//std::string s;
-//std::cout << " [f pi=" << _pg_index << " d=" << dblks_rem() << " f=" << (_rrfc.empty()?"T":"F") << " status:" << _rrfc.file_handle()->status_str(s) << "]" << std::flush;
+//std::cout << " [f pi=" << _pg_index << " d=" << dblks_rem() << " c=" << (_rrfc.is_compl()?"T":"F") << " o=" << (_rrfc.is_wr_aio_outstanding()?"T":"F") << " status:" << _rrfc.file_handle()->status_str(s) << "]" << std::flush;
if(dblks_rem() == 0 && _rrfc.is_compl() && !_rrfc.is_wr_aio_outstanding())
{
aio_cycle(); // check if any AIOs have returned
Modified: store/trunk/cpp/lib/jrnl/txn_rec.cpp
===================================================================
--- store/trunk/cpp/lib/jrnl/txn_rec.cpp 2007-10-10 18:39:30 UTC (rev 1000)
+++ store/trunk/cpp/lib/jrnl/txn_rec.cpp 2007-10-10 21:36:41 UTC (rev 1001)
@@ -88,7 +88,7 @@
_txn_tail._rid = rid;
}
-u_int32_t
+const u_int32_t
txn_rec::encode(void* wptr, u_int32_t rec_offs_dblks, u_int32_t max_size_dblks) throw (jexception)
{
assert(wptr != NULL);
@@ -196,7 +196,7 @@
return size_dblks(wr_cnt);
}
-u_int32_t
+const u_int32_t
txn_rec::decode(hdr& h, void* rptr, u_int32_t rec_offs_dblks, u_int32_t max_size_dblks)
throw (jexception)
{
@@ -313,8 +313,45 @@
return size_dblks(rd_cnt);
}
+const bool
+txn_rec::rcv_decode(hdr h, std::ifstream* ifsp, u_int32_t& rec_offs_dblks) throw (jexception)
+{
+ if (rec_offs_dblks) // Continue decoding xid from previous decode call
+ {
+ // TODO
+ }
+ else // Start at beginning of record
+ {
+ _txn_hdr._hdr.copy(h);
+#if defined(JRNL_BIG_ENDIAN) && defined(JRNL_32_BIT)
+ ifsp->ignore(sizeof(u_int32_t)); // _filler0
+#endif
+ ifsp->read((char*)&_txn_hdr._xidsize, sizeof(size_t));
+#if defined(JRNL_LITTLE_ENDIAN) && defined(JRNL_32_BIT)
+ ifsp->ignore(sizeof(u_int32_t)); // _filler0
+#endif
+ _buff = ::malloc(_txn_hdr._xidsize);
+ if (_buff == NULL)
+ {
+ std::stringstream ss;
+ ss << "_buff malloc(): errno=" << errno;
+ throw jexception(jerrno::JERR__MALLOC, ss.str(), "deq_rec", "decode");
+ }
+ // Decode xid
+ ifsp->read((char*)_buff, _txn_hdr._xidsize);
+ if ((size_t)ifsp->gcount() == _txn_hdr._xidsize)
+ {
+ ifsp->ignore(rec_size_dblks() * JRNL_DBLK_SIZE - sizeof(_txn_hdr) - _txn_hdr._xidsize);
+ return true;
+ }
+ else
+ ; // TODO
+ }
+ return false;
+}
+
const size_t
-txn_rec::get_xid(const void** const xidpp)
+txn_rec::get_xid(void** const xidpp)
{
if (!_buff)
{
Modified: store/trunk/cpp/lib/jrnl/txn_rec.hpp
===================================================================
--- store/trunk/cpp/lib/jrnl/txn_rec.hpp 2007-10-10 18:39:30 UTC (rev 1000)
+++ store/trunk/cpp/lib/jrnl/txn_rec.hpp 2007-10-10 21:36:41 UTC (rev 1001)
@@ -73,11 +73,14 @@
// Prepare instance for use in writing data to journal
void reset(const u_int32_t magic, const u_int64_t rid, const void* const xidp,
const size_t xidlen);
- u_int32_t encode(void* wptr, u_int32_t rec_offs_dblks, u_int32_t max_size_dblks)
+ const u_int32_t encode(void* wptr, u_int32_t rec_offs_dblks, u_int32_t max_size_dblks)
throw (jexception);
- u_int32_t decode(hdr& h, void* rptr, u_int32_t rec_offs_dblks, u_int32_t max_size_dblks)
+ const u_int32_t decode(hdr& h, void* rptr, u_int32_t rec_offs_dblks,
+ u_int32_t max_size_dblks) throw (jexception);
+ // Decode used for recover
+ const bool rcv_decode(hdr h, std::ifstream* ifsp, u_int32_t& rec_offs_dblks)
throw (jexception);
- const size_t get_xid(const void** const xidpp);
+ const size_t get_xid(void** const xidpp);
std::string& str(std::string& str) const;
inline const size_t data_size() const { return 0; } // This record never carries data
const size_t xid_size() const;
Modified: store/trunk/cpp/tests/jrnl/JournalSystemTests.cpp
===================================================================
--- store/trunk/cpp/tests/jrnl/JournalSystemTests.cpp 2007-10-10 18:39:30 UTC (rev 1000)
+++ store/trunk/cpp/tests/jrnl/JournalSystemTests.cpp 2007-10-10 21:36:41 UTC (rev 1001)
@@ -38,6 +38,8 @@
#define NUM_MSGS 5
#define MAX_AIO_SLEEPS 500
#define AIO_SLEEP_TIME 1000
+#define MSG_SIZE 100
+#define XID_SIZE 64
class JournalSystemTests : public CppUnit::TestCase
{
@@ -46,9 +48,13 @@
CPPUNIT_TEST(InitializationTest);
CPPUNIT_TEST(EmptyRecoverTest);
CPPUNIT_TEST(EnqueueTest);
+ CPPUNIT_TEST(TxnEnqueueTest);
CPPUNIT_TEST(RecoverReadTest);
+ CPPUNIT_TEST(TxnRecoverReadTest);
CPPUNIT_TEST(RecoveredReadTest);
+ CPPUNIT_TEST(TxnRecoveredReadTest);
CPPUNIT_TEST(RecoveredDequeueTest);
+// CPPUNIT_TEST(TxnRecoveredDequeueTest);
CPPUNIT_TEST(ComplexRecoveryTest1);
CPPUNIT_TEST(EncodeTest_000);
CPPUNIT_TEST(EncodeTest_001);
@@ -83,6 +89,7 @@
jtest t;
std::string msg;
+ std::string xid;
void* mbuff;
size_t msize;
void* xidbuff;
@@ -230,7 +237,7 @@
rhm::journal::jcntl jc(test_name, "jdata", test_name);
jc.initialize();
for (int m=0; m<NUM_MSGS; m++)
- enq_msg(&jc, create_msg(msg, m));
+ enq_msg(&jc, create_msg(msg, m, MSG_SIZE));
}
catch (rhm::journal::jexception& e)
{
@@ -247,7 +254,7 @@
CPPUNIT_ASSERT_MESSAGE("Journal heap instantiation failed.", jcp != NULL);
jcp->initialize();
for (int m=0; m<NUM_MSGS; m++)
- enq_msg(jcp, create_msg(msg, m));
+ enq_msg(jcp, create_msg(msg, m, MSG_SIZE));
delete jcp;
}
catch (rhm::journal::jexception& e)
@@ -260,6 +267,48 @@
}
}
+ void TxnEnqueueTest()
+ {
+ //Stack
+ char* test_name = "TxnEnqueueTest_Stack";
+ try
+ {
+ rhm::journal::jcntl jc(test_name, "jdata", test_name);
+ jc.initialize();
+ create_xid(xid, 0, XID_SIZE);
+ for (int m=0; m<NUM_MSGS; m++)
+ enq_txn_msg(&jc, create_msg(msg, m, MSG_SIZE), xid);
+ txn_commit(&jc, xid);
+ }
+ catch (rhm::journal::jexception& e)
+ {
+ std::stringstream ss;
+ ss << e;
+ CPPUNIT_FAIL(ss.str());
+ }
+ // Heap
+ test_name = "TxnEnqueueTest_Heap";
+ rhm::journal::jcntl* jcp = NULL;
+ try
+ {
+ jcp = new rhm::journal::jcntl(test_name, "jdata", test_name);
+ CPPUNIT_ASSERT_MESSAGE("Journal heap instantiation failed.", jcp != NULL);
+ jcp->initialize();
+ for (int m=0; m<NUM_MSGS; m++)
+ enq_txn_msg(jcp, create_msg(msg, m, MSG_SIZE), xid);
+ txn_commit(jcp, xid);
+ delete jcp;
+ }
+ catch (rhm::journal::jexception& e)
+ {
+ if (jcp)
+ delete jcp;
+ std::stringstream ss;
+ ss << e;
+ CPPUNIT_FAIL(ss.str());
+ }
+ }
+
void RecoverReadTest()
{
std::vector<std::string> txn_list;
@@ -271,7 +320,7 @@
rhm::journal::jcntl jc(test_name, "jdata", test_name);
jc.initialize();
for (int m=0; m<NUM_MSGS; m++)
- enq_msg(&jc, create_msg(msg, m));
+ enq_msg(&jc, create_msg(msg, m, MSG_SIZE));
}
{
rhm::journal::jcntl jc(test_name, "jdata", test_name);
@@ -280,7 +329,7 @@
{
read_msg(&jc);
CPPUNIT_ASSERT_MESSAGE("Message corrupt during recover.",
- create_msg(msg, m).compare(string((char*)mbuff, msize)) == 0);
+ create_msg(msg, m, MSG_SIZE).compare(string((char*)mbuff, msize)) == 0);
cleanMessage();
}
}
@@ -301,7 +350,7 @@
CPPUNIT_ASSERT_MESSAGE("Journal heap instantiation failed.", jcp != NULL);
jcp->initialize();
for (int m=0; m<NUM_MSGS; m++)
- enq_msg(jcp, create_msg(msg, m));
+ enq_msg(jcp, create_msg(msg, m, MSG_SIZE));
delete jcp;
jcp = NULL;
}
@@ -314,7 +363,7 @@
read_msg(jcp);
std::string msg((char*)mbuff, msize);
CPPUNIT_ASSERT_MESSAGE("Message corrupt during recover.",
- create_msg(msg, m).compare(std::string((char*)mbuff, msize)) == 0);
+ create_msg(msg, m, MSG_SIZE).compare(std::string((char*)mbuff, msize)) == 0);
cleanMessage();
}
delete jcp;
@@ -330,6 +379,79 @@
}
}
+ void TxnRecoverReadTest()
+ {
+ std::vector<std::string> txn_list;
+ //Stack
+ char* test_name = "TxnRecoverReadTest_Stack";
+ try
+ {
+ {
+ rhm::journal::jcntl jc(test_name, "jdata", test_name);
+ jc.initialize();
+ create_xid(xid, 1, XID_SIZE);
+ for (int m=0; m<NUM_MSGS; m++)
+ enq_txn_msg(&jc, create_msg(msg, m, MSG_SIZE), xid);
+ txn_commit(&jc, xid);
+ }
+ {
+ rhm::journal::jcntl jc(test_name, "jdata", test_name);
+ jc.recover(txn_list);
+ for (int m=0; m<NUM_MSGS; m++)
+ {
+ read_msg(&jc);
+ CPPUNIT_ASSERT_MESSAGE("Message corrupt during recover.",
+ create_msg(msg, m, MSG_SIZE).compare(string((char*)mbuff, msize)) == 0);
+ cleanMessage();
+ }
+ }
+ }
+ catch (rhm::journal::jexception& e)
+ {
+ std::stringstream ss;
+ ss << e;
+ CPPUNIT_FAIL(ss.str());
+ }
+ // Heap
+ test_name = "TxnRecoverReadTest_Heap";
+ rhm::journal::jcntl* jcp = NULL;
+ try
+ {
+ {
+ jcp = new rhm::journal::jcntl(test_name, "jdata", test_name);
+ CPPUNIT_ASSERT_MESSAGE("Journal heap instantiation failed.", jcp != NULL);
+ jcp->initialize();
+ for (int m=0; m<NUM_MSGS; m++)
+ enq_txn_msg(jcp, create_msg(msg, m, MSG_SIZE), xid);
+ txn_commit(jcp, xid);
+ delete jcp;
+ jcp = NULL;
+ }
+ {
+ rhm::journal::jcntl* jcp = new rhm::journal::jcntl(test_name, "jdata", test_name);
+ CPPUNIT_ASSERT_MESSAGE("Journal heap instantiation failed.", jcp != NULL);
+ jcp->recover(txn_list);
+ for (int m=0; m<NUM_MSGS; m++)
+ {
+ read_msg(jcp);
+ std::string msg((char*)mbuff, msize);
+ CPPUNIT_ASSERT_MESSAGE("Message corrupt during recover.",
+ create_msg(msg, m, MSG_SIZE).compare(std::string((char*)mbuff, msize)) == 0);
+ cleanMessage();
+ }
+ delete jcp;
+ }
+ }
+ catch (rhm::journal::jexception& e)
+ {
+ if (jcp)
+ delete jcp;
+ std::stringstream ss;
+ ss << e;
+ CPPUNIT_FAIL(ss.str());
+ }
+ }
+
void RecoveredReadTest()
{
std::vector<std::string> txn_list;
@@ -341,7 +463,7 @@
rhm::journal::jcntl jc(test_name, "jdata", test_name);
jc.initialize();
for (int m=0; m<NUM_MSGS; m++)
- enq_msg(&jc, create_msg(msg, m));
+ enq_msg(&jc, create_msg(msg, m, MSG_SIZE));
}
{
rhm::journal::jcntl jc(test_name, "jdata", test_name);
@@ -350,7 +472,7 @@
{
read_msg(&jc);
CPPUNIT_ASSERT_MESSAGE("Message corrupt during recover.",
- create_msg(msg, m).compare(std::string((char*)mbuff, msize)) == 0);
+ create_msg(msg, m, MSG_SIZE).compare(std::string((char*)mbuff, msize)) == 0);
cleanMessage();
}
jc.recover_complete();
@@ -358,7 +480,7 @@
{
read_msg(&jc);
CPPUNIT_ASSERT_MESSAGE("Message corrupt after recovery.",
- create_msg(msg, m).compare(std::string((char*)mbuff, msize)) == 0);
+ create_msg(msg, m, MSG_SIZE).compare(std::string((char*)mbuff, msize)) == 0);
cleanMessage();
}
}
@@ -379,7 +501,7 @@
CPPUNIT_ASSERT_MESSAGE("Journal heap instantiation failed.", jcp != NULL);
jcp->initialize();
for (int m=0; m<NUM_MSGS; m++)
- enq_msg(jcp, create_msg(msg, m));
+ enq_msg(jcp, create_msg(msg, m, MSG_SIZE));
delete jcp;
jcp = NULL;
}
@@ -391,7 +513,7 @@
{
read_msg(jcp);
CPPUNIT_ASSERT_MESSAGE("Message corrupt during recover.",
- create_msg(msg, m).compare(std::string((char*)mbuff, msize)) == 0);
+ create_msg(msg, m, MSG_SIZE).compare(std::string((char*)mbuff, msize)) == 0);
cleanMessage();
}
jcp->recover_complete();
@@ -399,7 +521,7 @@
{
read_msg(jcp);
CPPUNIT_ASSERT_MESSAGE("Message corrupt after recovery.",
- create_msg(msg, m).compare(std::string((char*)mbuff, msize)) == 0);
+ create_msg(msg, m, MSG_SIZE).compare(std::string((char*)mbuff, msize)) == 0);
cleanMessage();
}
delete jcp;
@@ -415,6 +537,94 @@
}
}
+ void TxnRecoveredReadTest() // Commit NUM_MSGS transactional enqueues, recover(), then read all messages before and after recover_complete().
+ {
+ std::vector<std::string> txn_list;
+ // Stack: both journal instances are automatic objects.
+ char* test_name = "TxnRecoveredReadTest_Stack";
+ try
+ {
+ {
+ rhm::journal::jcntl jc(test_name, "jdata", test_name);
+ jc.initialize();
+ create_xid(xid, 2, XID_SIZE);
+ for (int m=0; m<NUM_MSGS; m++)
+ enq_txn_msg(&jc, create_msg(msg, m, MSG_SIZE), xid);
+ txn_commit(&jc, xid);
+ }
+ {
+ rhm::journal::jcntl jc(test_name, "jdata", test_name);
+ jc.recover(txn_list);
+ for (int m=0; m<NUM_MSGS; m++)
+ {
+ read_msg(&jc);
+ CPPUNIT_ASSERT_MESSAGE("Message corrupt during recover.",
+ create_msg(msg, m, MSG_SIZE).compare(std::string((char*)mbuff, msize)) == 0);
+ cleanMessage();
+ }
+ jc.recover_complete();
+ for (int m=0; m<NUM_MSGS; m++)
+ {
+ read_msg(&jc);
+ CPPUNIT_ASSERT_MESSAGE("Message corrupt after recovery.",
+ create_msg(msg, m, MSG_SIZE).compare(std::string((char*)mbuff, msize)) == 0);
+ cleanMessage();
+ }
+ }
+ }
+ catch (rhm::journal::jexception& e)
+ {
+ std::stringstream ss;
+ ss << e;
+ CPPUNIT_FAIL(ss.str());
+ }
+ // Heap: same scenario with heap-allocated journals; xid still holds the value set in the stack run.
+ test_name = "TxnRecoveredReadTest_Heap";
+ rhm::journal::jcntl* jcp = NULL;
+ try
+ {
+ {
+ jcp = new rhm::journal::jcntl(test_name, "jdata", test_name);
+ CPPUNIT_ASSERT_MESSAGE("Journal heap instantiation failed.", jcp != NULL);
+ jcp->initialize();
+ for (int m=0; m<NUM_MSGS; m++)
+ enq_txn_msg(jcp, create_msg(msg, m, MSG_SIZE), xid);
+ txn_commit(jcp, xid);
+ delete jcp;
+ jcp = NULL;
+ }
+ {
+ jcp = new rhm::journal::jcntl(test_name, "jdata", test_name); // assign the outer jcp (no shadowing) so the catch below can free it
+ CPPUNIT_ASSERT_MESSAGE("Journal heap instantiation failed.", jcp != NULL);
+ jcp->recover(txn_list);
+ for (int m=0; m<NUM_MSGS; m++)
+ {
+ read_msg(jcp);
+ CPPUNIT_ASSERT_MESSAGE("Message corrupt during recover.",
+ create_msg(msg, m, MSG_SIZE).compare(std::string((char*)mbuff, msize)) == 0);
+ cleanMessage();
+ }
+ jcp->recover_complete();
+ for (int m=0; m<NUM_MSGS; m++)
+ {
+ read_msg(jcp);
+ CPPUNIT_ASSERT_MESSAGE("Message corrupt after recovery.",
+ create_msg(msg, m, MSG_SIZE).compare(std::string((char*)mbuff, msize)) == 0);
+ cleanMessage();
+ }
+ delete jcp; // last statement of the try block, so the catch never sees this freed pointer
+ }
+ }
+ catch (rhm::journal::jexception& e)
+ {
+ if (jcp)
+ delete jcp;
+ std::stringstream ss;
+ ss << e;
+ CPPUNIT_FAIL(ss.str());
+ }
+ }
+
void RecoveredDequeueTest()
{
std::vector<std::string> txn_list;
@@ -426,7 +636,7 @@
rhm::journal::jcntl jc(test_name, "jdata", test_name);
jc.initialize();
for (int m=0; m<NUM_MSGS; m++)
- enq_msg(&jc, create_msg(msg, m));
+ enq_msg(&jc, create_msg(msg, m, MSG_SIZE));
}
{
rhm::journal::jcntl jc(test_name, "jdata", test_name);
@@ -435,7 +645,7 @@
{
read_msg(&jc);
CPPUNIT_ASSERT_MESSAGE("Message corrupt during recover.",
- create_msg(msg, m).compare(std::string((char*)mbuff, msize)) == 0);
+ create_msg(msg, m, MSG_SIZE).compare(std::string((char*)mbuff, msize)) == 0);
cleanMessage();
}
jc.recover_complete();
@@ -443,7 +653,7 @@
{
read_msg(&jc);
CPPUNIT_ASSERT_MESSAGE("Message corrupt after recovery.",
- create_msg(msg, m).compare(std::string((char*)mbuff, msize)) == 0);
+ create_msg(msg, m, MSG_SIZE).compare(std::string((char*)mbuff, msize)) == 0);
cleanMessage();
}
for (int m=0; m<NUM_MSGS; m++)
@@ -466,7 +676,7 @@
CPPUNIT_ASSERT_MESSAGE("Journal heap instantiation failed.", jcp != NULL);
jcp->initialize();
for (int m=0; m<NUM_MSGS; m++)
- enq_msg(jcp, create_msg(msg, m));
+ enq_msg(jcp, create_msg(msg, m, MSG_SIZE));
delete jcp;
jcp = NULL;
}
@@ -478,7 +688,7 @@
{
read_msg(jcp);
CPPUNIT_ASSERT_MESSAGE("Message corrupt during recover.",
- create_msg(msg, m).compare(std::string((char*)mbuff, msize)) == 0);
+ create_msg(msg, m, MSG_SIZE).compare(std::string((char*)mbuff, msize)) == 0);
cleanMessage();
}
jcp->recover_complete();
@@ -486,7 +696,7 @@
{
read_msg(jcp);
CPPUNIT_ASSERT_MESSAGE("Message corrupt after recovery.",
- create_msg(msg, m).compare(std::string((char*)mbuff, msize)) == 0);
+ create_msg(msg, m, MSG_SIZE).compare(std::string((char*)mbuff, msize)) == 0);
cleanMessage();
}
for (int m=0; m<NUM_MSGS; m++)
@@ -504,6 +714,98 @@
}
}
+ void TxnRecoveredDequeueTest() // Commit NUM_MSGS transactional enqueues, recover(), read before and after recover_complete(), then dequeue rids 0..NUM_MSGS-1.
+ {
+ std::vector<std::string> txn_list;
+ // Stack: both journal instances are automatic objects.
+ char* test_name = "TxnRecoveredDequeueTest_Stack";
+ try
+ {
+ {
+ rhm::journal::jcntl jc(test_name, "jdata", test_name);
+ jc.initialize();
+ create_xid(xid, 3, XID_SIZE);
+ for (int m=0; m<NUM_MSGS; m++)
+ enq_txn_msg(&jc, create_msg(msg, m, MSG_SIZE), xid);
+ txn_commit(&jc, xid);
+ }
+ {
+ rhm::journal::jcntl jc(test_name, "jdata", test_name);
+ jc.recover(txn_list);
+ for (int m=0; m<NUM_MSGS; m++)
+ {
+ read_msg(&jc);
+ CPPUNIT_ASSERT_MESSAGE("Message corrupt during recover.",
+ create_msg(msg, m, MSG_SIZE).compare(std::string((char*)mbuff, msize)) == 0);
+ cleanMessage();
+ }
+ jc.recover_complete();
+ for (int m=0; m<NUM_MSGS; m++)
+ {
+ read_msg(&jc);
+ CPPUNIT_ASSERT_MESSAGE("Message corrupt after recovery.",
+ create_msg(msg, m, MSG_SIZE).compare(std::string((char*)mbuff, msize)) == 0);
+ cleanMessage();
+ }
+ for (int m=0; m<NUM_MSGS; m++)
+ deq_msg(&jc, m);
+ }
+ }
+ catch (rhm::journal::jexception& e)
+ {
+ std::stringstream ss;
+ ss << e;
+ CPPUNIT_FAIL(ss.str());
+ }
+ // Heap: same scenario with heap-allocated journals; xid still holds the value set in the stack run.
+ test_name = "TxnRecoveredDequeueTest_Heap";
+ rhm::journal::jcntl* jcp = NULL;
+ try
+ {
+ {
+ jcp = new rhm::journal::jcntl(test_name, "jdata", test_name);
+ CPPUNIT_ASSERT_MESSAGE("Journal heap instantiation failed.", jcp != NULL);
+ jcp->initialize();
+ for (int m=0; m<NUM_MSGS; m++)
+ enq_txn_msg(jcp, create_msg(msg, m, MSG_SIZE), xid);
+ txn_commit(jcp, xid);
+ delete jcp;
+ jcp = NULL;
+ }
+ {
+ jcp = new rhm::journal::jcntl(test_name, "jdata", test_name); // assign the outer jcp (no shadowing) so the catch below can free it
+ CPPUNIT_ASSERT_MESSAGE("Journal heap instantiation failed.", jcp != NULL);
+ jcp->recover(txn_list);
+ for (int m=0; m<NUM_MSGS; m++)
+ {
+ read_msg(jcp);
+ CPPUNIT_ASSERT_MESSAGE("Message corrupt during recover.",
+ create_msg(msg, m, MSG_SIZE).compare(std::string((char*)mbuff, msize)) == 0);
+ cleanMessage();
+ }
+ jcp->recover_complete();
+ for (int m=0; m<NUM_MSGS; m++)
+ {
+ read_msg(jcp);
+ CPPUNIT_ASSERT_MESSAGE("Message corrupt after recovery.",
+ create_msg(msg, m, MSG_SIZE).compare(std::string((char*)mbuff, msize)) == 0);
+ cleanMessage();
+ }
+ for (int m=0; m<NUM_MSGS; m++)
+ deq_msg(jcp, m);
+ delete jcp; // last statement of the try block, so the catch never sees this freed pointer
+ }
+ }
+ catch (rhm::journal::jexception& e)
+ {
+ if (jcp)
+ delete jcp;
+ std::stringstream ss;
+ ss << e;
+ CPPUNIT_FAIL(ss.str());
+ }
+ }
+
void ComplexRecoveryTest1()
{
std::vector<std::string> txn_list;
@@ -516,7 +818,7 @@
jc.initialize();
// rids: 0 to NUM_MSGS*2 - 1
for (int m=0; m<NUM_MSGS*2; m++)
- enq_msg(&jc, create_msg(msg, m));
+ enq_msg(&jc, create_msg(msg, m, MSG_SIZE));
// rids: NUM_MSGS*2 to NUM_MSGS*3 - 1
for (int m=0; m<NUM_MSGS; m++)
deq_msg(&jc, m);
@@ -525,7 +827,7 @@
{
read_msg(&jc);
CPPUNIT_ASSERT_MESSAGE("Message corrupt before recover.",
- create_msg(msg, m).compare(std::string((char*)mbuff, msize)) == 0);
+ create_msg(msg, m, MSG_SIZE).compare(std::string((char*)mbuff, msize)) == 0);
cleanMessage();
}
}
@@ -536,26 +838,26 @@
{
read_msg(&jc);
CPPUNIT_ASSERT_MESSAGE("Message corrupt during recover.",
- create_msg(msg, m).compare(std::string((char*)mbuff, msize)) == 0);
+ create_msg(msg, m, MSG_SIZE).compare(std::string((char*)mbuff, msize)) == 0);
cleanMessage();
}
jc.recover_complete();
// rids: NUM_MSGS*3 to NUM_MSGS*4 - 1
for (int m=NUM_MSGS*3; m<NUM_MSGS*4; m++)
- enq_msg(&jc, create_msg(msg, m));
+ enq_msg(&jc, create_msg(msg, m, MSG_SIZE));
jc.flush();
for (int m=NUM_MSGS; m<NUM_MSGS*2; m++)
{
read_msg(&jc);
CPPUNIT_ASSERT_MESSAGE("Message corrupt after recover.",
- create_msg(msg, m).compare(std::string((char*)mbuff, msize)) == 0);
+ create_msg(msg, m, MSG_SIZE).compare(std::string((char*)mbuff, msize)) == 0);
cleanMessage();
}
for (int m=NUM_MSGS*3; m<NUM_MSGS*4; m++)
{
read_msg(&jc);
CPPUNIT_ASSERT_MESSAGE("Message corrupt after recover.",
- create_msg(msg, m).compare(std::string((char*)mbuff, msize)) == 0);
+ create_msg(msg, m, MSG_SIZE).compare(std::string((char*)mbuff, msize)) == 0);
cleanMessage();
}
// rids: NUM_MSGS*4 to NUM_MSGS*6 - 1
@@ -582,7 +884,7 @@
jcp->initialize();
// rids: 0 to NUM_MSGS*2 - 1
for (int m=0; m<NUM_MSGS*2; m++)
- enq_msg(jcp, create_msg(msg, m));
+ enq_msg(jcp, create_msg(msg, m, MSG_SIZE));
// rids: NUM_MSGS*2 to NUM_MSGS*3 - 1
for (int m=0; m<NUM_MSGS; m++)
deq_msg(jcp, m);
@@ -591,7 +893,7 @@
{
read_msg(jcp);
CPPUNIT_ASSERT_MESSAGE("Message corrupt before recover.",
- create_msg(msg, m).compare(std::string((char*)mbuff, msize)) == 0);
+ create_msg(msg, m, MSG_SIZE).compare(std::string((char*)mbuff, msize)) == 0);
cleanMessage();
}
delete jcp;
@@ -605,26 +907,26 @@
{
read_msg(jcp);
CPPUNIT_ASSERT_MESSAGE("Message corrupt during recover.",
- create_msg(msg, m).compare(std::string((char*)mbuff, msize)) == 0);
+ create_msg(msg, m, MSG_SIZE).compare(std::string((char*)mbuff, msize)) == 0);
cleanMessage();
}
jcp->recover_complete();
// rids: NUM_MSGS*3 to NUM_MSGS*4 - 1
for (int m=NUM_MSGS*3; m<NUM_MSGS*4; m++)
- enq_msg(jcp, create_msg(msg, m));
+ enq_msg(jcp, create_msg(msg, m, MSG_SIZE));
jcp->flush();
for (int m=NUM_MSGS; m<NUM_MSGS*2; m++)
{
read_msg(jcp);
CPPUNIT_ASSERT_MESSAGE("Message corrupt after recovery.",
- create_msg(msg, m).compare(std::string((char*)mbuff, msize)) == 0);
+ create_msg(msg, m, MSG_SIZE).compare(std::string((char*)mbuff, msize)) == 0);
cleanMessage();
}
for (int m=NUM_MSGS*3; m<NUM_MSGS*4; m++)
{
read_msg(jcp);
CPPUNIT_ASSERT_MESSAGE("Message corrupt after recovery.",
- create_msg(msg, m).compare(std::string((char*)mbuff, msize)) == 0);
+ create_msg(msg, m, MSG_SIZE).compare(std::string((char*)mbuff, msize)) == 0);
cleanMessage();
}
// rids: NUM_MSGS*4 to NUM_MSGS*6 - 1
@@ -809,6 +1111,16 @@
dtp, false), jc, aio_sleep_cnt, dtp));
}
+ void enq_txn_msg(rhm::journal::jcntl* jc, const std::string msg, const std::string xid)
+ { // Test helper: enqueue msg on journal jc under transaction xid, using a freshly allocated data token.
+ rhm::journal::data_tok* dtp = new rhm::journal::data_tok; // not freed in this function
+ CPPUNIT_ASSERT_MESSAGE("Data Token heap intantiation failed.", dtp != NULL);
+
+ unsigned aio_sleep_cnt = 0;
+ while (handle_jcntl_response(jc->enqueue_txn_data_record(msg.c_str(), msg.size(), msg.size(),
+ dtp, xid, false), jc, aio_sleep_cnt, dtp)); // empty body: re-issue the enqueue while the handler returns true
+ }
+
void deq_msg(rhm::journal::jcntl* jc, u_int64_t rid)
{
rhm::journal::data_tok* dtp = new rhm::journal::data_tok;
@@ -820,6 +1132,36 @@
while (handle_jcntl_response(jc->dequeue_data_record(dtp), jc, aio_sleep_cnt, dtp));
}
+ void deq_txn_msg(rhm::journal::jcntl* jc, u_int64_t rid, const std::string xid)
+ { // Test helper: dequeue record rid from journal jc under transaction xid.
+ rhm::journal::data_tok* dtp = new rhm::journal::data_tok; // not freed in this function
+ CPPUNIT_ASSERT_MESSAGE("Data Token heap intantiation failed.", dtp != NULL);
+ dtp->set_wstate(rhm::journal::data_tok::ENQ); // NOTE(review): presumably marks the token as already enqueued so the dequeue is accepted - confirm
+ dtp->set_rid(rid);
+
+ unsigned aio_sleep_cnt = 0;
+ while (handle_jcntl_response(jc->dequeue_txn_data_record(dtp, xid),
+ jc, aio_sleep_cnt, dtp)); // empty body: re-issue the dequeue while the handler returns true
+ }
+
+ void txn_abort(rhm::journal::jcntl* jc, const std::string xid)
+ { // Test helper: call jc->txn_abort() for transaction xid with a freshly allocated data token.
+ rhm::journal::data_tok* dtp = new rhm::journal::data_tok; // not freed in this function
+ CPPUNIT_ASSERT_MESSAGE("Data Token heap intantiation failed.", dtp != NULL);
+
+ unsigned aio_sleep_cnt = 0;
+ while (handle_jcntl_response(jc->txn_abort(dtp, xid), jc, aio_sleep_cnt, dtp)); // empty body: retry while the handler returns true
+ }
+
+ void txn_commit(rhm::journal::jcntl* jc, const std::string xid)
+ { // Test helper: call jc->txn_commit() for transaction xid with a freshly allocated data token.
+ rhm::journal::data_tok* dtp = new rhm::journal::data_tok; // not freed in this function
+ CPPUNIT_ASSERT_MESSAGE("Data Token heap intantiation failed.", dtp != NULL);
+
+ unsigned aio_sleep_cnt = 0;
+ while (handle_jcntl_response(jc->txn_commit(dtp, xid), jc, aio_sleep_cnt, dtp)); // empty body: retry while the handler returns true
+ }
+
char* read_msg(rhm::journal::jcntl* jc)
{
rhm::journal::data_tok* dtp = new rhm::journal::data_tok;
@@ -849,34 +1191,47 @@
else
{
delete dtp;
- CPPUNIT_FAIL("dequeue_data(): timeout on RHM_IORES_AIO_WAIT.");
+ CPPUNIT_FAIL("Timeout on RHM_IORES_AIO_WAIT.");
}
break;
case rhm::journal::RHM_IORES_EMPTY:
delete dtp;
- CPPUNIT_FAIL("dequeue_data(): RHM_IORES_EMPTY.");
+ CPPUNIT_FAIL("RHM_IORES_EMPTY");
case rhm::journal::RHM_IORES_FULL:
delete dtp;
- CPPUNIT_FAIL("dequeue_data(): RHM_IORES_FULL.");
+ CPPUNIT_FAIL("RHM_IORES_FULL");
case rhm::journal::RHM_IORES_BUSY:
delete dtp;
- CPPUNIT_FAIL("dequeue_data(): RHM_IORES_BUSY.");
+ CPPUNIT_FAIL("RHM_IORES_BUSY");
+ case rhm::journal::RHM_IORES_TXPENDING:
+ delete dtp;
+ CPPUNIT_FAIL("RHM_IORES_TXPENDING");
default:
delete dtp;
- CPPUNIT_FAIL("dequeue_data(): unknown return value.");
+ CPPUNIT_FAIL("unknown return value");
}
return true;
}
- static std::string& create_msg(std::string& s, int msg_num)
+ static std::string& create_msg(std::string& s, int msg_num, int len) // Fills s with a deterministic message for msg_num and returns a reference to s.
{
std::stringstream ss;
- ss << "Message_" << std::setfill('0') << std::setw(4) << msg_num;
- ss << "_4567890123456789012345678901234567890";
- ss << "12345678901234567890123456789012345678901234567890"; // 100 chars long (2 dblks)
+ ss << "Message_" << std::setfill('0') << std::setw(4) << msg_num << "_"; // 13-char header while msg_num fits in 4 digits
+ for (int i=14; i<=len; i++) // i is the 1-based position of the next char, so the total is len chars when len >= 13
+ ss << (char)('0' + i%10); // filler digit is the position modulo 10
s.assign(ss.str());
return s;
}
+
+ static std::string& create_xid(std::string& s, int msg_num, int len) // Fills s with a deterministic XID string for msg_num and returns a reference to s.
+ {
+ std::stringstream ss;
+ ss << "XID_" << std::setfill('0') << std::setw(4) << msg_num << "_"; // 9-char header while msg_num fits in 4 digits
+ for (int i=9; i<len; i++) // i is the 0-based index of the next char, so the total is len chars when len >= 9
+ ss << (char)('a' + i%26); // filler cycles through the lowercase letters
+ s.assign(ss.str());
+ return s;
+ }
void runEncodeTest(const unsigned num_msgs, const unsigned min_msg_size,
const unsigned max_msg_szie, const bool auto_deq, const unsigned min_xid_size,
Modified: store/trunk/cpp/tests/jrnl/rtest
===================================================================
--- store/trunk/cpp/tests/jrnl/rtest 2007-10-10 18:39:30 UTC (rev 1000)
+++ store/trunk/cpp/tests/jrnl/rtest 2007-10-10 21:36:41 UTC (rev 1001)
@@ -30,8 +30,8 @@
NUM_JFILES=8
VG_ITERATIONS=1
-#VG_NORM_FILESIZE=11
-VG_NORM_FILESIZE=18 # RHEL5 triggers extra valgrind messages when pthreads are in use
+VG_NORM_FILESIZE=11
+#VG_NORM_FILESIZE=18 # RHEL5 triggers extra valgrind messages when pthreads are in use
# Write test
W_DO_TEST=T
@@ -58,8 +58,8 @@
RM_DIR="${RM} -rf"
TEST_PROG="./jtest"
CHK_PROG="./janalyze.py"
-VALGRIND="valgrind -q --track-fds=yes --leak-check=full --leak-resolution=high --show-reachable=yes"
-#VALGRIND="valgrind -q --track-fds=yes --leak-check=full --leak-resolution=high --show-reachable=yes --suppressions=/usr/lib/valgrind/glibc-2.5.supp"
+#VALGRIND="valgrind -q --track-fds=yes --leak-check=full --leak-resolution=high --show-reachable=yes"
+VALGRIND="valgrind -q --track-fds=yes --leak-check=full --leak-resolution=high --show-reachable=yes --suppressions=/usr/lib/valgrind/glibc-2.5.supp"
MAKE="make -f Makefile.rtest"
More information about the rhmessaging-commits
mailing list