[rhmessaging-commits] rhmessaging commits: r1002 - in store/trunk/cpp: tests/jrnl and 1 other directory.
rhmessaging-commits at lists.jboss.org
rhmessaging-commits at lists.jboss.org
Wed Oct 10 22:41:16 EDT 2007
Author: kpvdr
Date: 2007-10-10 22:41:16 -0400 (Wed, 10 Oct 2007)
New Revision: 1002
Modified:
store/trunk/cpp/lib/jrnl/jcntl.cpp
store/trunk/cpp/lib/jrnl/rcvdat.hpp
store/trunk/cpp/tests/jrnl/JournalSystemTests.cpp
Log:
Fixed bug where highest rid was not correctly identified after restore; added check for prepared xid list during resore. This more-or-less completes the transaction code for the journal. There still seems to be some memory leaks around, though, and transaction integration testing is still not complete.
Modified: store/trunk/cpp/lib/jrnl/jcntl.cpp
===================================================================
--- store/trunk/cpp/lib/jrnl/jcntl.cpp 2007-10-10 21:36:41 UTC (rev 1001)
+++ store/trunk/cpp/lib/jrnl/jcntl.cpp 2007-10-11 02:41:16 UTC (rev 1002)
@@ -33,6 +33,7 @@
#include <jrnl/jcntl.hpp>
+#include <algorithm>
#include <cerrno>
#include <fstream>
#include <iomanip>
@@ -372,7 +373,7 @@
const bool
jcntl::rcvr_get_next_record(u_int16_t& fid, std::ifstream* ifsp, rcvdat& rd,
- const std::vector<std::string>& /*prep_txn_list*/) throw (jexception)
+ const std::vector<std::string>& prep_txn_list) throw (jexception)
{
u_int32_t dblks_read = 0;
bool done = false;
@@ -391,22 +392,23 @@
done = er.rcv_decode(h, ifsp, dblks_read);
jfile_cycle(fid, ifsp, rd, false);
}
+ rd._enq_cnt_list[fid]++;
if (er.xid_size())
{
//std::cout << "$" << std::flush;
er.get_xid(&xidp);
assert(xidp != NULL);
std::string xid((char*)xidp, er.xid_size());
- _tmap.insert_txn_data(xid, txn_data(h._rid, fid, true));
+ std::vector<std::string>::const_iterator cit = std::find(prep_txn_list.begin(),
+ prep_txn_list.end(), xid);
+ if (cit != prep_txn_list.end())
+ _tmap.insert_txn_data(xid, txn_data(h._rid, fid, true));
free(xidp);
}
else
- {
_emap.insert_fid(h._rid, fid);
- rd._enq_cnt_list[fid]++;
- if (rd._h_rid < h._rid)
- rd._h_rid = h._rid;
- }
+ if (rd._h_rid < h._rid)
+ rd._h_rid = h._rid;
}
break;
case RHM_JDAT_DEQ_MAGIC:
@@ -427,7 +429,10 @@
dr.get_xid(&xidp);
assert(xidp != NULL);
std::string xid((char*)xidp, dr.xid_size());
- _tmap.insert_txn_data(xid, txn_data(h._rid, fid, false));
+ std::vector<std::string>::const_iterator cit = std::find(prep_txn_list.begin(),
+ prep_txn_list.end(), xid);
+ if (cit != prep_txn_list.end())
+ _tmap.insert_txn_data(xid, txn_data(h._rid, fid, false));
free(xidp);
}
else
@@ -442,9 +447,9 @@
if (e.err_code() != jerrno::JERR_MAP_NOTFOUND)
throw e;
}
- if (rd._h_rid < h._rid)
- rd._h_rid = h._rid;
}
+ if (rd._h_rid < h._rid)
+ rd._h_rid = h._rid;
}
break;
case RHM_JDAT_TXA_MAGIC:
@@ -460,15 +465,26 @@
ar.get_xid(&xidp);
assert(xidp != NULL);
std::string xid((char*)xidp, ar.xid_size());
- txn_data_list tdl = _tmap.get_remove_tdata_list(xid);
- for (tdl_itr itr = tdl.begin(); itr != tdl.end(); itr++)
+ std::vector<std::string>::const_iterator cit = std::find(prep_txn_list.begin(),
+ prep_txn_list.end(), xid);
+ if (cit != prep_txn_list.end())
{
- try { _emap.unlock(itr->_rid); }
- catch(jexception e) { if (e.err_code() != jerrno::JERR_MAP_NOTFOUND) throw e; }
- if (itr->_enq_flag)
- _wrfc.decr_enqcnt(itr->_fid);
+ txn_data_list tdl = _tmap.get_remove_tdata_list(xid);
+ for (tdl_itr itr = tdl.begin(); itr != tdl.end(); itr++)
+ {
+ try { _emap.unlock(itr->_rid); }
+ catch(jexception e)
+ {
+ if (e.err_code() != jerrno::JERR_MAP_NOTFOUND)
+ throw e;
+ }
+ if (itr->_enq_flag)
+ _wrfc.decr_enqcnt(itr->_fid);
+ }
}
free(xidp);
+ if (rd._h_rid < h._rid)
+ rd._h_rid = h._rid;
}
break;
case RHM_JDAT_TXC_MAGIC:
@@ -484,18 +500,25 @@
cr.get_xid(&xidp);
assert(xidp != NULL);
std::string xid((char*)xidp, cr.xid_size());
- txn_data_list tdl = _tmap.get_remove_tdata_list(xid);
- for (tdl_itr itr = tdl.begin(); itr != tdl.end(); itr++)
+ std::vector<std::string>::const_iterator cit = std::find(prep_txn_list.begin(),
+ prep_txn_list.end(), xid);
+ if (cit != prep_txn_list.end())
{
- if (itr->_enq_flag) // txn enqueue
- _emap.insert_fid(itr->_rid, itr->_fid);
- else // txn dequeue
+ txn_data_list tdl = _tmap.get_remove_tdata_list(xid);
+ for (tdl_itr itr = tdl.begin(); itr != tdl.end(); itr++)
{
- u_int16_t fid = _emap.get_remove_fid(h._rid);
- _wrfc.decr_enqcnt(fid);
+ if (itr->_enq_flag) // txn enqueue
+ _emap.insert_fid(itr->_rid, itr->_fid);
+ else // txn dequeue
+ {
+ u_int16_t fid = _emap.get_remove_fid(h._rid);
+ _wrfc.decr_enqcnt(fid);
+ }
}
}
free(xidp);
+ if (rd._h_rid < h._rid)
+ rd._h_rid = h._rid;
}
break;
case RHM_JDAT_EMPTY_MAGIC:
@@ -565,131 +588,6 @@
return true;
}
-// const bool
-// jcntl::rcvr_fanalyze(u_int16_t fid, rcvdat& rd, const std::vector<std::string>& /*prep_txn_list*/)
-// throw (jexception)
-// {
-// bool eoj = false;
-// std::stringstream ss;
-// ss << _jdir.dirname() << "/" << _base_filename << ".";
-// ss << std::setfill('0') << std::setw(4) << fid << "." << JRNL_DATA_EXTENSION;
-// //std::cout << "rcvr_fanalyze: " << ss.str() << ":";
-// std::ifstream jifs(ss.str().c_str());
-// if (!jifs.good())
-// throw jexception(jerrno::JERR__FILEIO, ss.str(), "jinf", "analyze");
-//
-// // 1. Read file header
-// file_hdr fhdr;
-// jifs.read((char*)&fhdr, sizeof(fhdr));
-// if (fhdr._hdr._magic == RHM_JDAT_FILE_MAGIC)
-// {
-// assert(fhdr._fid == fid);
-// if (!rd._fro)
-// rd._fro = fhdr._fro;
-// std::streamoff foffs = fhdr._fro;
-// jifs.seekg(foffs);
-//
-// // 2. Read file records
-// while (jifs.good() && !eoj)
-// {
-// hdr h;
-// jifs.read((char*)&h, sizeof(hdr));
-// switch(h._magic)
-// {
-// case RHM_JDAT_ENQ_MAGIC:
-// {
-// size_t xidsize = 0;
-// size_t recsize = 0;
-// #if defined(JRNL_BIG_ENDIAN) && defined(JRNL_32_BIT)
-// jifs.ignore(sizeof(u_int32_t));
-// #endif
-// jifs.read((char*)&xidsize, sizeof(size_t));
-// #if defined(JRNL_LITTLE_ENDIAN) && defined(JRNL_32_BIT)
-// jifs.ignore(sizeof(u_int32_t));
-// #endif
-// #if defined(JRNL_BIG_ENDIAN) && defined(JRNL_32_BIT)
-// jifs.ignore(sizeof(u_int32_t));
-// #endif
-// jifs.read((char*)&recsize, sizeof(size_t));
-// #if defined(JRNL_LITTLE_ENDIAN) && defined(JRNL_32_BIT)
-// jifs.ignore(sizeof(u_int32_t));
-// #endif
-// _emap.insert_fid(h._rid, fid);
-// rd._enq_cnt_list[fid]++;
-// if (rd._h_rid < h._rid)
-// rd._h_rid = h._rid;
-// //std::cout << " e" << h._rid;
-// u_int32_t rec_dblks = jrec::size_dblks((size_t)recsize + sizeof(enq_hdr) +
-// sizeof(rec_tail));
-// foffs += rec_dblks * JRNL_DBLK_SIZE;
-// jifs.seekg(foffs);
-// }
-// break;
-// case RHM_JDAT_DEQ_MAGIC:
-// {
-// u_int64_t drid = 0;
-// size_t xidsize = 0;
-// jifs.read((char*)&drid, sizeof(u_int64_t));
-// #if defined(JRNL_BIG_ENDIAN) && defined(JRNL_32_BIT)
-// jifs.ignore(sizeof(u_int32_t));
-// #endif
-// jifs.read((char*)&xidsize, sizeof(size_t));
-// #if defined(JRNL_LITTLE_ENDIAN) && defined(JRNL_32_BIT)
-// jifs.ignore(sizeof(u_int32_t));
-// #endif
-// try
-// {
-// _emap.get_remove_fid(drid);
-// rd._enq_cnt_list[fid]--;
-// }
-// catch (jexception& e) {} // ignore JERR_EMAP_NOTFOUND thrown here
-// if (rd._h_rid < h._rid)
-// rd._h_rid = h._rid;
-// //std::cout << " d" << drid << ")";
-// u_int32_t rec_dblks = jrec::size_dblks(sizeof(deq_hdr));
-// foffs += rec_dblks * JRNL_DBLK_SIZE;
-// jifs.seekg(foffs);
-// }
-// break;
-// case RHM_JDAT_TXA_MAGIC:
-// //std::cout << " a";
-// break;
-// case RHM_JDAT_TXC_MAGIC:
-// //std::cout << " c";
-// break;
-// case RHM_JDAT_EMPTY_MAGIC:
-// {
-// //std::cout << " x";
-// u_int32_t rec_dblks = jrec::size_dblks(sizeof(hdr));
-// foffs += rec_dblks * JRNL_DBLK_SIZE;
-// jifs.seekg(foffs);
-// }
-// break;
-// case 0:
-// rd._lfid = fid;
-// rd._eo = foffs;
-// if (!jifs.eof())
-// eoj = true;
-// //std::cout << (jifs.eof()?" <eof>":" <end>");
-// break;
-// default:
-// std::stringstream ss;
-// ss << std::hex << std::setfill('0') << "Magic=0x" << std::setw(8) << h._magic;
-// throw jexception(jerrno::JERR_JCNTL_UNKNOWNMAGIC, ss.str(), "jcntl",
-// "rcvr_fanalyze");
-// }
-// }
-// }
-// else
-// {
-// eoj = true;
-// //std::cout << " <empty>";
-// }
-// jifs.close();
-// //std::cout << std::endl;
-// return eoj;
-// }
-
void
jcntl::aio_wr_callback(jcntl* journal, u_int32_t num_dtoks)
{
Modified: store/trunk/cpp/lib/jrnl/rcvdat.hpp
===================================================================
--- store/trunk/cpp/lib/jrnl/rcvdat.hpp 2007-10-10 21:36:41 UTC (rev 1001)
+++ store/trunk/cpp/lib/jrnl/rcvdat.hpp 2007-10-11 02:41:16 UTC (rev 1002)
@@ -42,10 +42,6 @@
struct rcvdat
{
- typedef std::vector<u_int64_t> rid_list;
- typedef std::pair<rid_list, rid_list> enq_deq_rid_list;
- typedef std::map<std::string, enq_deq_rid_list> enq_deq_map;
-
bool _empty; ///< Journal data files empty
u_int16_t _ffid; ///< First file id
size_t _fro; ///< First record offset in ffid
@@ -53,7 +49,6 @@
size_t _eo; ///< End offset (first byte past last record)
u_int64_t _h_rid; ///< Highest rid found
std::vector<u_int32_t> _enq_cnt_list; ///< Number enqueued records found for each file
- enq_deq_map _edm; ///< Map of enqueue and dequeue rids for each xid
rcvdat():
_empty(true),
@@ -62,8 +57,7 @@
_lfid(0),
_eo(0),
_h_rid(0),
- _enq_cnt_list(JRNL_NUM_FILES, 0),
- _edm()
+ _enq_cnt_list(JRNL_NUM_FILES, 0)
{}
void reset()
{
@@ -75,8 +69,18 @@
_h_rid=0;
for (unsigned f=0; f<_enq_cnt_list.size(); f++)
_enq_cnt_list[f] = 0;
- _edm.clear();
}
+ void print()
+ {
+ std::cout << "_empty=" << (_empty?"T":"F") << std::endl;
+ std::cout << "_ffid=" << _ffid << std::endl;
+ std::cout << "_fro=" << _fro << std::endl;
+ std::cout << "_lfid=" << _lfid << std::endl;
+ std::cout << "_eo=" << _eo << std::endl;
+ std::cout << "_h_rid=" << _h_rid << std::endl;
+ for (unsigned i=0; i<_enq_cnt_list.size(); i++)
+ std::cout << "_enq_cnt_list[" << i << "]=" << _enq_cnt_list[i] << std::endl;
+ }
};
}
}
Modified: store/trunk/cpp/tests/jrnl/JournalSystemTests.cpp
===================================================================
--- store/trunk/cpp/tests/jrnl/JournalSystemTests.cpp 2007-10-10 21:36:41 UTC (rev 1001)
+++ store/trunk/cpp/tests/jrnl/JournalSystemTests.cpp 2007-10-11 02:41:16 UTC (rev 1002)
@@ -54,8 +54,9 @@
CPPUNIT_TEST(RecoveredReadTest);
CPPUNIT_TEST(TxnRecoveredReadTest);
CPPUNIT_TEST(RecoveredDequeueTest);
-// CPPUNIT_TEST(TxnRecoveredDequeueTest);
+ CPPUNIT_TEST(TxnRecoveredDequeueTest);
CPPUNIT_TEST(ComplexRecoveryTest1);
+ CPPUNIT_TEST(TxnComplexRecoveryTest1);
CPPUNIT_TEST(EncodeTest_000);
CPPUNIT_TEST(EncodeTest_001);
CPPUNIT_TEST(EncodeTest_002);
@@ -390,6 +391,7 @@
rhm::journal::jcntl jc(test_name, "jdata", test_name);
jc.initialize();
create_xid(xid, 1, XID_SIZE);
+ txn_list.push_back(xid);
for (int m=0; m<NUM_MSGS; m++)
enq_txn_msg(&jc, create_msg(msg, m, MSG_SIZE), xid);
txn_commit(&jc, xid);
@@ -548,6 +550,7 @@
rhm::journal::jcntl jc(test_name, "jdata", test_name);
jc.initialize();
create_xid(xid, 2, XID_SIZE);
+ txn_list.push_back(xid);
for (int m=0; m<NUM_MSGS; m++)
enq_txn_msg(&jc, create_msg(msg, m, MSG_SIZE), xid);
txn_commit(&jc, xid);
@@ -725,6 +728,7 @@
rhm::journal::jcntl jc(test_name, "jdata", test_name);
jc.initialize();
create_xid(xid, 3, XID_SIZE);
+ txn_list.push_back(xid);
for (int m=0; m<NUM_MSGS; m++)
enq_txn_msg(&jc, create_msg(msg, m, MSG_SIZE), xid);
txn_commit(&jc, xid);
@@ -947,6 +951,160 @@
}
}
+ void TxnComplexRecoveryTest1()
+ {
+ std::vector<std::string> txn_list;
+ //Stack
+ char* test_name = "TxnComplexRecoveryTest1_Stack";
+ try
+ {
+ {
+ rhm::journal::jcntl jc(test_name, "jdata", test_name);
+ jc.initialize();
+ // rids: 0 to NUM_MSGS - 1
+ for (int m=0; m<NUM_MSGS; m++)
+ enq_msg(&jc, create_msg(msg, m, MSG_SIZE));
+ // rids: NUM_MSGS to NUM_MSGS*2 - 1
+ create_xid(xid, 4, XID_SIZE);
+ txn_list.push_back(xid);
+ for (int m=NUM_MSGS; m<NUM_MSGS*2; m++)
+ enq_txn_msg(&jc, create_msg(msg, m, MSG_SIZE), xid);
+ // rids: NUM_MSGS*2 to NUM_MSGS*3 - 1
+ for (int m=0; m<NUM_MSGS; m++)
+ deq_msg(&jc, m);
+ // rid: NUM_MSGS*3
+ txn_commit(&jc, xid);
+ jc.flush();
+ for (int m=NUM_MSGS; m<NUM_MSGS*2; m++)
+ {
+ read_msg(&jc);
+ CPPUNIT_ASSERT_MESSAGE("Message corrupt before recover.",
+ create_msg(msg, m, MSG_SIZE).compare(std::string((char*)mbuff, msize)) == 0);
+ cleanMessage();
+ }
+ }
+ {
+ rhm::journal::jcntl jc(test_name, "jdata", test_name);
+ jc.recover(txn_list);
+ for (int m=NUM_MSGS; m<NUM_MSGS*2; m++)
+ {
+ read_msg(&jc);
+ CPPUNIT_ASSERT_MESSAGE("Message corrupt during recover.",
+ create_msg(msg, m, MSG_SIZE).compare(std::string((char*)mbuff, msize)) == 0);
+ cleanMessage();
+ }
+ jc.recover_complete();
+ // rids: NUM_MSGS*3+1 to NUM_MSGS*4
+ for (int m=NUM_MSGS*3+1; m<NUM_MSGS*4+1; m++)
+ enq_msg(&jc, create_msg(msg, m, MSG_SIZE));
+ jc.flush();
+ for (int m=NUM_MSGS; m<NUM_MSGS*2; m++)
+ {
+ read_msg(&jc);
+ CPPUNIT_ASSERT_MESSAGE("Message corrupt after recover.",
+ create_msg(msg, m, MSG_SIZE).compare(std::string((char*)mbuff, msize)) == 0);
+ cleanMessage();
+ }
+ for (int m=NUM_MSGS*3+1; m<NUM_MSGS*4+1; m++)
+ {
+ read_msg(&jc);
+ CPPUNIT_ASSERT_MESSAGE("Message corrupt after recover.",
+ create_msg(msg, m, MSG_SIZE).compare(std::string((char*)mbuff, msize)) == 0);
+ cleanMessage();
+ }
+ // rids: NUM_MSGS*4+1 to NUM_MSGS*6
+ for (int m=NUM_MSGS; m<NUM_MSGS*2; m++)
+ deq_msg(&jc, m);
+ for (int m=NUM_MSGS*3+1; m<NUM_MSGS*4+1; m++)
+ deq_msg(&jc, m);
+ }
+ }
+ catch (rhm::journal::jexception& e)
+ {
+ std::stringstream ss;
+ ss << e;
+ CPPUNIT_FAIL(ss.str());
+ }
+ // Heap
+ test_name = "TxnComplexRecoveryTest1_Heap";
+ rhm::journal::jcntl* jcp = NULL;
+ try
+ {
+ {
+ jcp = new rhm::journal::jcntl(test_name, "jdata", test_name);
+ CPPUNIT_ASSERT_MESSAGE("Journal heap instantiation failed.", jcp != NULL);
+ jcp->initialize();
+ // rids: 0 to NUM_MSGS*2 - 1
+ for (int m=0; m<NUM_MSGS; m++)
+ enq_msg(jcp, create_msg(msg, m, MSG_SIZE));
+ create_xid(xid, 4, XID_SIZE);
+ // rids: NUM_MSGS to NUM_MSGS*2 - 1
+ for (int m=NUM_MSGS; m<NUM_MSGS*2; m++)
+ enq_txn_msg(jcp, create_msg(msg, m, MSG_SIZE), xid);
+ // rids: NUM_MSGS*2 to NUM_MSGS*3 - 1
+ for (int m=0; m<NUM_MSGS; m++)
+ deq_msg(jcp, m);
+ // rid: NUM_MSGS*3
+ txn_commit(jcp, xid);
+ jcp->flush();
+ for (int m=NUM_MSGS; m<NUM_MSGS*2; m++)
+ {
+ read_msg(jcp);
+ CPPUNIT_ASSERT_MESSAGE("Message corrupt before recover.",
+ create_msg(msg, m, MSG_SIZE).compare(std::string((char*)mbuff, msize)) == 0);
+ cleanMessage();
+ }
+ delete jcp;
+ jcp = NULL;
+ }
+ {
+ rhm::journal::jcntl* jcp = new rhm::journal::jcntl(test_name, "jdata", test_name);
+ CPPUNIT_ASSERT_MESSAGE("Journal heap instantiation failed.", jcp != NULL);
+ jcp->recover(txn_list);
+ for (int m=NUM_MSGS; m<NUM_MSGS*2; m++)
+ {
+ read_msg(jcp);
+ CPPUNIT_ASSERT_MESSAGE("Message corrupt during recover.",
+ create_msg(msg, m, MSG_SIZE).compare(std::string((char*)mbuff, msize)) == 0);
+ cleanMessage();
+ }
+ jcp->recover_complete();
+ // rids: NUM_MSGS*3+1 to NUM_MSGS*4
+ for (int m=NUM_MSGS*3+1; m<NUM_MSGS*4+1; m++)
+ enq_msg(jcp, create_msg(msg, m, MSG_SIZE));
+ jcp->flush();
+ for (int m=NUM_MSGS; m<NUM_MSGS*2; m++)
+ {
+ read_msg(jcp);
+ CPPUNIT_ASSERT_MESSAGE("Message corrupt after recovery.",
+ create_msg(msg, m, MSG_SIZE).compare(std::string((char*)mbuff, msize)) == 0);
+ cleanMessage();
+ }
+ for (int m=NUM_MSGS*3+1; m<NUM_MSGS*4+1; m++)
+ {
+ read_msg(jcp);
+ CPPUNIT_ASSERT_MESSAGE("Message corrupt after recovery.",
+ create_msg(msg, m, MSG_SIZE).compare(std::string((char*)mbuff, msize)) == 0);
+ cleanMessage();
+ }
+ // rids: NUM_MSGS*4+1 to NUM_MSGS*6
+ for (int m=NUM_MSGS; m<NUM_MSGS*2; m++)
+ deq_msg(jcp, m);
+ for (int m=NUM_MSGS*3+1; m<NUM_MSGS*4+1; m++)
+ deq_msg(jcp, m);
+ delete jcp;
+ }
+ }
+ catch (rhm::journal::jexception& e)
+ {
+ if (jcp)
+ delete jcp;
+ std::stringstream ss;
+ ss << e;
+ CPPUNIT_FAIL(ss.str());
+ }
+ }
+
void EncodeTest_000()
{
runEncodeTest(0, 0, 0, false, 0, 0, false, 2, "Empty journal");
More information about the rhmessaging-commits
mailing list