Author: kpvdr
Date: 2007-11-07 14:25:50 -0500 (Wed, 07 Nov 2007)
New Revision: 1262
Modified:
store/trunk/cpp/lib/BdbMessageStore.cpp
store/trunk/cpp/lib/jrnl/enq_rec.cpp
store/trunk/cpp/lib/jrnl/jcntl.cpp
store/trunk/cpp/lib/jrnl/pmgr.cpp
store/trunk/cpp/lib/jrnl/rmgr.cpp
store/trunk/cpp/lib/jrnl/rmgr.hpp
store/trunk/cpp/lib/jrnl/wmgr.cpp
store/trunk/cpp/lib/jrnl/wrfc.cpp
store/trunk/cpp/tests/jrnl/JournalSystemTests.cpp
Log:
Bugfix: transient and external header flags were not being handled correctly. Also drastic
simplification to JournalSystemTests.
Modified: store/trunk/cpp/lib/BdbMessageStore.cpp
===================================================================
--- store/trunk/cpp/lib/BdbMessageStore.cpp 2007-11-07 17:19:21 UTC (rev 1261)
+++ store/trunk/cpp/lib/BdbMessageStore.cpp 2007-11-07 19:25:50 UTC (rev 1262)
@@ -931,13 +931,13 @@
rhm::journal::iores eres;
if (txn->getXid().empty()){
if (message.isContentReleased()){
- eres = jc->enqueue_extern_data_record(0, dtokp.get(), false);
+ eres = jc->enqueue_extern_data_record(size, dtokp.get(), false);
}else {
eres = jc->enqueue_data_record(buff, size, size, dtokp.get(), false);
}
}else {
if (message.isContentReleased()){
- eres = jc->enqueue_extern_txn_data_record(0, dtokp.get(), txn->getXid(), false);
+ eres = jc->enqueue_extern_txn_data_record(size, dtokp.get(), txn->getXid(), false);
} else {
eres = jc->enqueue_txn_data_record(buff, size, size, dtokp.get(), txn->getXid(), false);
}
Modified: store/trunk/cpp/lib/jrnl/enq_rec.cpp
===================================================================
--- store/trunk/cpp/lib/jrnl/enq_rec.cpp 2007-11-07 17:19:21 UTC (rev 1261)
+++ store/trunk/cpp/lib/jrnl/enq_rec.cpp 2007-11-07 19:25:50 UTC (rev 1262)
@@ -506,7 +506,10 @@
*datapp = NULL;
return 0;
}
- *datapp = (void*)((char*)_buff + _enq_hdr._xidsize);
+ if (_enq_hdr.is_external())
+ *datapp = NULL;
+ else
+ *datapp = (void*)((char*)_buff + _enq_hdr._xidsize);
return _enq_hdr._dsize;
}
@@ -539,6 +542,8 @@
const size_t
enq_rec::rec_size() const
{
+ if (_enq_hdr.is_external())
+ return enq_hdr::size() + _enq_hdr._xidsize + rec_tail::size();
return enq_hdr::size() + _enq_hdr._xidsize + _enq_hdr._dsize + rec_tail::size();
}
Modified: store/trunk/cpp/lib/jrnl/jcntl.cpp
===================================================================
--- store/trunk/cpp/lib/jrnl/jcntl.cpp 2007-11-07 17:19:21 UTC (rev 1261)
+++ store/trunk/cpp/lib/jrnl/jcntl.cpp 2007-11-07 19:25:50 UTC (rev 1262)
@@ -176,6 +176,7 @@
_datafh[i]->reset(&_rcvdat);
_wrfc.initialize(JRNL_NUM_FILES, (nlfh**)_datafh, &_rcvdat);
_rrfc.initialize(JRNL_NUM_FILES, (nlfh**)_datafh, _rcvdat._ffid);
+ _rmgr.recover_complete();
_readonly_flag = false;
}
@@ -440,17 +441,22 @@
done = er.rcv_decode(h, ifsp, cum_size_read);
jfile_cycle(fid, ifsp, rd, false);
}
- rd._enq_cnt_list[fid]++;
- if (er.xid_size())
+//std::cout << " E";
+ if (!er.is_transient()) // Ignore transient msgs
{
- er.get_xid(&xidp);
- assert(xidp != NULL);
- std::string xid((char*)xidp, er.xid_size());
- _tmap.insert_txn_data(xid, txn_data(h._rid, 0, fid, true));
- free(xidp);
+ rd._enq_cnt_list[fid]++;
+ if (er.xid_size())
+ {
+ er.get_xid(&xidp);
+ assert(xidp != NULL);
+ std::string xid((char*)xidp, er.xid_size());
+ _tmap.insert_txn_data(xid, txn_data(h._rid, 0, fid, true));
+ free(xidp);
+ }
+ else
+ _emap.insert_fid(h._rid, fid);
}
- else
- _emap.insert_fid(h._rid, fid);
+//else std::cout << "t";
if (rd._h_rid < h._rid)
rd._h_rid = h._rid;
}
@@ -463,6 +469,7 @@
done = dr.rcv_decode(h, ifsp, cum_size_read);
jfile_cycle(fid, ifsp, rd, false);
}
+//std::cout << " D";
if (dr.xid_size())
{
// If the enqueue is part of a pending txn, it will not yet be in emap
@@ -503,6 +510,7 @@
done = ar.rcv_decode(h, ifsp, cum_size_read);
jfile_cycle(fid, ifsp, rd, false);
}
+//std::cout << " A";
// Delete this txn from tmap, unlock any locked records in emap
ar.get_xid(&xidp);
assert(xidp != NULL);
@@ -536,6 +544,7 @@
done = cr.rcv_decode(h, ifsp, cum_size_read);
jfile_cycle(fid, ifsp, rd, false);
}
+//std::cout << " C";
// Delete this txn from tmap, process records into emap
cr.get_xid(&xidp);
assert(xidp != NULL);
@@ -559,11 +568,13 @@
break;
case RHM_JDAT_EMPTY_MAGIC:
{
+//std::cout << " X";
u_int32_t rec_dblks = jrec::size_dblks(sizeof(hdr));
ifsp->ignore(rec_dblks * JRNL_DBLK_SIZE - sizeof(hdr));
}
break;
case 0:
+//std::cout << " 0";
rd._lfid = fid;
rd._eo = ifsp->tellg();
return false;
Modified: store/trunk/cpp/lib/jrnl/pmgr.cpp
===================================================================
--- store/trunk/cpp/lib/jrnl/pmgr.cpp 2007-11-07 17:19:21 UTC (rev 1261)
+++ store/trunk/cpp/lib/jrnl/pmgr.cpp 2007-11-07 19:25:50 UTC (rev 1262)
@@ -139,7 +139,6 @@
_pg_offset_dblks = 0;
_aio_evt_rem = 0;
clean();
-// _emap.clear();
// 1. Allocate page memory (as a single block)
size_t pagesize = _pages * _pagesize * _sblksize;
Modified: store/trunk/cpp/lib/jrnl/rmgr.cpp
===================================================================
--- store/trunk/cpp/lib/jrnl/rmgr.cpp 2007-11-07 17:19:21 UTC (rev 1261)
+++ store/trunk/cpp/lib/jrnl/rmgr.cpp 2007-11-07 19:25:50 UTC (rev 1262)
@@ -215,7 +215,7 @@
rmgr::read(void** const datapp, size_t& dsize, void** const xidpp, size_t&
xidsize, bool& transient,
bool& external, data_tok* dtokp) throw (jexception)
{
-//std::cout << " rmgr::read() ro=" << (_jc->is_read_only()?"T":"F") << std::flush;
+//std::cout << " rmgr::read() ro=" << (_jc->is_read_only()?"T":"F") << " po=" << _pg_offset_dblks << " ems=" << _emap.size() << std::flush;
iores res = pre_read_check(dtokp);
if (res != RHM_IORES_SUCCESS)
@@ -465,6 +465,14 @@
}
void
+rmgr::recover_complete()
+{
+ _pg_index = 0;
+ _pg_cntr = 0;
+ _pg_offset_dblks = 0;
+}
+
+void
rmgr::initialize() throw (jexception)
{
pmgr::initialize();
@@ -554,7 +562,10 @@
{
enq_hdr ehdr;
::memcpy(&ehdr, rptr, sizeof(enq_hdr));
- dtokp->set_dsize(ehdr._xidsize + ehdr._dsize + sizeof(enq_hdr) + sizeof(rec_tail));
+ if (ehdr.is_external())
+ dtokp->set_dsize(ehdr._xidsize + sizeof(enq_hdr) + sizeof(rec_tail));
+ else
+ dtokp->set_dsize(ehdr._xidsize + ehdr._dsize + sizeof(enq_hdr) + sizeof(rec_tail));
}
else if (h._magic == RHM_JDAT_DEQ_MAGIC)
{
Modified: store/trunk/cpp/lib/jrnl/rmgr.hpp
===================================================================
--- store/trunk/cpp/lib/jrnl/rmgr.hpp 2007-11-07 17:19:21 UTC (rev 1261)
+++ store/trunk/cpp/lib/jrnl/rmgr.hpp 2007-11-07 19:25:50 UTC (rev 1262)
@@ -74,6 +74,7 @@
const iores read(void** const datapp, size_t& dsize, void** const xidpp,
size_t& xidsize,
bool& transient, bool& external, data_tok* dtokp) throw
(jexception);
const u_int32_t get_events(page_state state = AIO_COMPLETE) throw (jexception);
+ void recover_complete();
private:
void initialize() throw (jexception);
Modified: store/trunk/cpp/lib/jrnl/wmgr.cpp
===================================================================
--- store/trunk/cpp/lib/jrnl/wmgr.cpp 2007-11-07 17:19:21 UTC (rev 1261)
+++ store/trunk/cpp/lib/jrnl/wmgr.cpp 2007-11-07 19:25:50 UTC (rev 1262)
@@ -96,6 +96,7 @@
data_tok* dtokp, const void* const xid_ptr, const size_t xid_len, const bool
transient,
const bool external) throw (jexception)
{
+//std::cout << "wmgr::enqueue() dl=" << tot_data_len << " xl=" << xid_len << " t=" << (transient?"T":"F") << " e=" << (external?"T":"F") << " msg=" << (data_buff?std::string((const char*)data_buff, tot_data_len):"<null>") << std::endl;
if (xid_len)
assert(xid_ptr != NULL);
Modified: store/trunk/cpp/lib/jrnl/wrfc.cpp
===================================================================
--- store/trunk/cpp/lib/jrnl/wrfc.cpp 2007-11-07 17:19:21 UTC (rev 1261)
+++ store/trunk/cpp/lib/jrnl/wrfc.cpp 2007-11-07 19:25:50 UTC (rev 1262)
@@ -52,13 +52,6 @@
wrfc::~wrfc() {}
-// void
-// wrfc::initialize(u_int32_t nfiles, nlfh** fh_arr, u_int32_t fh_index, u_int64_t rid) throw (jexception)
-// {
-// rrfc::initialize(nfiles, fh_arr, fh_index);
-// _rid = rid;
-// _reset_ok = false;
-// }
void
wrfc::initialize(u_int32_t nfiles, nlfh** fh_arr, rcvdat* rdp) throw (jexception)
{
Modified: store/trunk/cpp/tests/jrnl/JournalSystemTests.cpp
===================================================================
--- store/trunk/cpp/tests/jrnl/JournalSystemTests.cpp 2007-11-07 17:19:21 UTC (rev 1261)
+++ store/trunk/cpp/tests/jrnl/JournalSystemTests.cpp 2007-11-07 19:25:50 UTC (rev 1262)
@@ -29,10 +29,9 @@
* The GNU Lesser General Public License is available in the file COPYING.
*/
-#include "../test_plugin.h"
-#include "msg_producer.hpp"
-#include "msg_consumer.hpp"
-#include "jtest.hpp"
+#include "JournalSystemTests.hpp"
+//#include "msg_producer.hpp"
+//#include "msg_consumer.hpp"
#include <vector>
#define NUM_MSGS 5
@@ -41,1419 +40,784 @@
#define MSG_SIZE 100
#define XID_SIZE 64
-class JournalSystemTests : public CppUnit::TestCase
+using namespace std;
+
+void
+JournalSystemTests::InstantiationTest()
{
- CPPUNIT_TEST_SUITE(JournalSystemTests);
- CPPUNIT_TEST(InstantiationTest);
- CPPUNIT_TEST(InitializationTest);
- CPPUNIT_TEST(EmptyRecoverTest);
- CPPUNIT_TEST(EnqueueTest);
- CPPUNIT_TEST(TxnEnqueueTest);
- CPPUNIT_TEST(RecoverReadTest);
- CPPUNIT_TEST(TxnRecoverReadTest);
- CPPUNIT_TEST(RecoveredReadTest);
- CPPUNIT_TEST(TxnRecoveredReadTest);
- CPPUNIT_TEST(RecoveredDequeueTest);
- CPPUNIT_TEST(TxnRecoveredDequeueTest);
- CPPUNIT_TEST(ComplexRecoveryTest1);
- CPPUNIT_TEST(TxnComplexRecoveryTest1);
- CPPUNIT_TEST(EncodeTest_000);
- CPPUNIT_TEST(EncodeTest_001);
- CPPUNIT_TEST(EncodeTest_002);
- CPPUNIT_TEST(EncodeTest_003);
- CPPUNIT_TEST(EncodeTest_004);
- CPPUNIT_TEST(EncodeTest_005);
- CPPUNIT_TEST(EncodeTest_006);
- CPPUNIT_TEST(EncodeTest_007);
- CPPUNIT_TEST(EncodeTest_008);
- CPPUNIT_TEST(EncodeTest_009);
- CPPUNIT_TEST(EncodeTest_010);
- CPPUNIT_TEST(EncodeTest_011);
- CPPUNIT_TEST(EncodeTest_012);
- CPPUNIT_TEST(EncodeTest_013);
- CPPUNIT_TEST(EncodeTest_014);
- CPPUNIT_TEST(EncodeTest_015);
- CPPUNIT_TEST(EncodeTest_016);
- CPPUNIT_TEST(EncodeTest_017);
- CPPUNIT_TEST(EncodeTest_018);
- CPPUNIT_TEST(EncodeTest_019);
- CPPUNIT_TEST(EncodeTest_020);
- CPPUNIT_TEST(EncodeTest_021);
- CPPUNIT_TEST(EncodeTest_022);
- CPPUNIT_TEST(EncodeTest_023);
- CPPUNIT_TEST(EncodeTest_024);
- CPPUNIT_TEST(EncodeTest_025);
- CPPUNIT_TEST(EncodeTest_026);
-// CPPUNIT_TEST(EncodeTest_027); // Until race condition fixed
-// CPPUNIT_TEST(EncodeTest_028); // Until race condition fixed
- CPPUNIT_TEST_SUITE_END();
+ try
+ {
+ char* test_name = "InstantiationTest";
+ rhm::journal::jcntl jc(test_name, "jdata", test_name);
+ }
+ catch (const rhm::journal::jexception& e)
+ {
+ stringstream ss;
+ ss << e;
+ CPPUNIT_FAIL(ss.str());
+ }
+}
- jtest t;
- std::string msg;
- std::string xid;
- void* mbuff;
- size_t msize;
- void* xidbuff;
- size_t xidsize;
- bool transientFlag;
- bool externalFlag;
-
-public:
-
- void InstantiationTest()
+void
+JournalSystemTests::InitializationTest()
+{
+ try
{
- //Stack
- char* test_name = "InstantiationTest_Stack";
- try
- {
- rhm::journal::jcntl jc(test_name, "jdata", test_name);
- }
- catch (const rhm::journal::jexception& e)
- {
- std::stringstream ss;
- ss << e;
- CPPUNIT_FAIL(ss.str());
- }
- // Heap
- test_name = "InstantiationTest_Heap";
- rhm::journal::jcntl* jcp = NULL;
- try
- {
- jcp = new rhm::journal::jcntl(test_name, "jdata", test_name);
- CPPUNIT_ASSERT_MESSAGE("Journal heap instantiation failed.", jcp !=
NULL);
- delete jcp;
- }
- catch (const rhm::journal::jexception& e)
- {
- std::stringstream ss;
- ss << e;
- CPPUNIT_FAIL(ss.str());
- }
+ char* test_name = "InitializationTest";
+ rhm::journal::jcntl jc(test_name, "jdata", test_name);
+ jc.initialize();
}
+ catch (const rhm::journal::jexception& e)
+ {
+ stringstream ss;
+ ss << e;
+ CPPUNIT_FAIL(ss.str());
+ }
+}
- void InitializationTest()
+void
+JournalSystemTests::EmptyRecoverTest()
+{
+ try
{
- //Stack
- char* test_name = "InitializationTest_Stack";
- try
+ vector<string> txn_list;
+ char* test_name = "EmptyRecoverTest";
{
rhm::journal::jcntl jc(test_name, "jdata", test_name);
jc.initialize();
}
- catch (const rhm::journal::jexception& e)
{
- std::stringstream ss;
- ss << e;
- CPPUNIT_FAIL(ss.str());
+ rhm::journal::jcntl jc(test_name, "jdata", test_name);
+ jc.recover(txn_list);
}
- // Heap
- test_name = "InitializationTest_Heap";
- rhm::journal::jcntl* jcp = NULL;
- try
{
- jcp = new rhm::journal::jcntl(test_name, "jdata", test_name);
- CPPUNIT_ASSERT_MESSAGE("Journal heap instantiation failed.", jcp !=
NULL);
- jcp->initialize();
- delete jcp;
+ rhm::journal::jcntl jc(test_name, "jdata", test_name);
+ jc.recover(txn_list);
+ jc.recover_complete();
}
- catch (const rhm::journal::jexception& e)
- {
- if (jcp)
- delete jcp;
- std::stringstream ss;
- ss << e;
- CPPUNIT_FAIL(ss.str());
- }
}
+ catch (const rhm::journal::jexception& e)
+ {
+ stringstream ss;
+ ss << e;
+ CPPUNIT_FAIL(ss.str());
+ }
+}
- void EmptyRecoverTest()
+void
+JournalSystemTests::EnqueueTest()
+{
+ try
{
- std::vector<std::string> txn_list;
- //Stack
- char* test_name = "EmptyRecoverTest_Stack";
- try
- {
- {
- rhm::journal::jcntl jc(test_name, "jdata", test_name);
- jc.initialize();
- }
- {
- rhm::journal::jcntl jc(test_name, "jdata", test_name);
- jc.recover(txn_list);
- }
- {
- rhm::journal::jcntl jc(test_name, "jdata", test_name);
- jc.recover(txn_list);
- jc.recover_complete();
- }
- }
- catch (const rhm::journal::jexception& e)
- {
- std::stringstream ss;
- ss << e;
- CPPUNIT_FAIL(ss.str());
- }
- // Heap
- test_name = "EmptyRecoverTest_Heap";
- rhm::journal::jcntl* jcp = NULL;
- try
- {
- {
- rhm::journal::jcntl* jcp = new rhm::journal::jcntl(test_name,
"jdata", test_name);
- CPPUNIT_ASSERT_MESSAGE("Journal heap instantiation failed.",
jcp != NULL);
- jcp->initialize();
- delete jcp;
- jcp = NULL;
- }
- {
- rhm::journal::jcntl* jcp = new rhm::journal::jcntl(test_name,
"jdata", test_name);
- CPPUNIT_ASSERT_MESSAGE("Journal heap instantiation failed.",
jcp != NULL);
- jcp->recover(txn_list);
- delete jcp;
- jcp = NULL;
- }
- {
- rhm::journal::jcntl* jcp = new rhm::journal::jcntl(test_name,
"jdata", test_name);
- CPPUNIT_ASSERT_MESSAGE("Journal heap instantiation failed.",
jcp != NULL);
- jcp->recover(txn_list);
- jcp->recover_complete();
- delete jcp;
- }
- }
- catch (const rhm::journal::jexception& e)
- {
- if (jcp)
- delete jcp;
- std::stringstream ss;
- ss << e;
- CPPUNIT_FAIL(ss.str());
- }
+ char* test_name = "EnqueueTest";
+ rhm::journal::jcntl jc(test_name, "jdata", test_name);
+ jc.initialize();
+
+ // Non-txn
+ for (int m=0; m<NUM_MSGS; m++)
+ enq_msg(&jc, create_msg(msg, m, MSG_SIZE), false);
+
+ // Txn
+ create_xid(xid, 0, XID_SIZE);
+ for (int m=NUM_MSGS; m<NUM_MSGS*2; m++)
+ enq_txn_msg(&jc, create_msg(msg, m, MSG_SIZE), xid, false);
+ txn_commit(&jc, xid);
}
+ catch (const rhm::journal::jexception& e)
+ {
+ stringstream ss;
+ ss << e;
+ CPPUNIT_FAIL(ss.str());
+ }
+}
- void EnqueueTest()
+void
+JournalSystemTests::RecoverReadTest()
+{
+ vector<string> txn_list;
+ try
{
- //Stack
- char* test_name = "EnqueueTest_Stack";
- try
+ // Non-txn
+ char* test_name = "RecoverReadTest";
{
rhm::journal::jcntl jc(test_name, "jdata", test_name);
jc.initialize();
for (int m=0; m<NUM_MSGS; m++)
enq_msg(&jc, create_msg(msg, m, MSG_SIZE), false);
}
- catch (const rhm::journal::jexception& e)
{
- std::stringstream ss;
- ss << e;
- CPPUNIT_FAIL(ss.str());
- }
- // Heap
- test_name = "EnqueueTest_Heap";
- rhm::journal::jcntl* jcp = NULL;
- try
- {
- jcp = new rhm::journal::jcntl(test_name, "jdata", test_name);
- CPPUNIT_ASSERT_MESSAGE("Journal heap instantiation failed.", jcp !=
NULL);
- jcp->initialize();
+ rhm::journal::jcntl jc(test_name, "jdata", test_name);
+ jc.recover(txn_list);
for (int m=0; m<NUM_MSGS; m++)
- enq_msg(jcp, create_msg(msg, m, MSG_SIZE), false);
- delete jcp;
+ {
+ read_msg(&jc);
+ CPPUNIT_ASSERT_MESSAGE("Message corrupt during recover.",
+ create_msg(msg, m, MSG_SIZE).compare(string((char*)mbuff, msize))
== 0);
+ cleanMessage();
+ }
}
- catch (const rhm::journal::jexception& e)
- {
- if (jcp)
- delete jcp;
- std::stringstream ss;
- ss << e;
- CPPUNIT_FAIL(ss.str());
- }
- }
- void TxnEnqueueTest()
- {
- //Stack
- char* test_name = "TxnEnqueueTest_Stack";
- try
+ // Txn
+ test_name = "TxnRecoverReadTest";
{
rhm::journal::jcntl jc(test_name, "jdata", test_name);
jc.initialize();
- create_xid(xid, 0, XID_SIZE);
+ create_xid(xid, 1, XID_SIZE);
+ txn_list.push_back(xid);
for (int m=0; m<NUM_MSGS; m++)
enq_txn_msg(&jc, create_msg(msg, m, MSG_SIZE), xid, false);
txn_commit(&jc, xid);
}
- catch (const rhm::journal::jexception& e)
{
- std::stringstream ss;
- ss << e;
- CPPUNIT_FAIL(ss.str());
- }
- // Heap
- test_name = "TxnEnqueueTest_Heap";
- rhm::journal::jcntl* jcp = NULL;
- try
- {
- jcp = new rhm::journal::jcntl(test_name, "jdata", test_name);
- CPPUNIT_ASSERT_MESSAGE("Journal heap instantiation failed.", jcp !=
NULL);
- jcp->initialize();
+ rhm::journal::jcntl jc(test_name, "jdata", test_name);
+ jc.recover(txn_list);
for (int m=0; m<NUM_MSGS; m++)
- enq_txn_msg(jcp, create_msg(msg, m, MSG_SIZE), xid, false);
- txn_commit(jcp, xid);
- delete jcp;
+ {
+ read_msg(&jc);
+ CPPUNIT_ASSERT_MESSAGE("Message corrupt during recover.",
+ create_msg(msg, m, MSG_SIZE).compare(string((char*)mbuff, msize))
== 0);
+ cleanMessage();
+ }
}
- catch (const rhm::journal::jexception& e)
- {
- if (jcp)
- delete jcp;
- std::stringstream ss;
- ss << e;
- CPPUNIT_FAIL(ss.str());
- }
}
+ catch (const rhm::journal::jexception& e)
+ {
+ stringstream ss;
+ ss << e;
+ CPPUNIT_FAIL(ss.str());
+ }
+}
- void RecoverReadTest()
+void
+JournalSystemTests::RecoveredReadTest()
+{
+ vector<string> txn_list;
+ try
{
- std::vector<std::string> txn_list;
- //Stack
- char* test_name = "RecoverReadTest_Stack";
- try
+ // Non-txn
+ char* test_name = "RecoveredReadTest";
{
+ rhm::journal::jcntl jc(test_name, "jdata", test_name);
+ jc.initialize();
+ for (int m=0; m<NUM_MSGS; m++)
+ enq_msg(&jc, create_msg(msg, m, MSG_SIZE), false);
+ }
+ {
+ rhm::journal::jcntl jc(test_name, "jdata", test_name);
+ jc.recover(txn_list);
+ for (int m=0; m<NUM_MSGS; m++)
{
- rhm::journal::jcntl jc(test_name, "jdata", test_name);
- jc.initialize();
- for (int m=0; m<NUM_MSGS; m++)
- enq_msg(&jc, create_msg(msg, m, MSG_SIZE), false);
+ read_msg(&jc);
+ CPPUNIT_ASSERT_MESSAGE("Message corrupt during recover.",
+ create_msg(msg, m, MSG_SIZE).compare(string((char*)mbuff, msize))
== 0);
+ cleanMessage();
}
+ jc.recover_complete();
+ for (int m=0; m<NUM_MSGS; m++)
{
- rhm::journal::jcntl jc(test_name, "jdata", test_name);
- jc.recover(txn_list);
- for (int m=0; m<NUM_MSGS; m++)
- {
- read_msg(&jc);
- CPPUNIT_ASSERT_MESSAGE("Message corrupt during recover.",
- create_msg(msg, m, MSG_SIZE).compare(string((char*)mbuff,
msize)) == 0);
- cleanMessage();
- }
+ read_msg(&jc);
+ CPPUNIT_ASSERT_MESSAGE("Message corrupt after recovery.",
+ create_msg(msg, m, MSG_SIZE).compare(string((char*)mbuff, msize))
== 0);
+ cleanMessage();
}
}
- catch (const rhm::journal::jexception& e)
+
+ // Txn
+ test_name = "TxnRecoveredReadTest";
{
- std::stringstream ss;
- ss << e;
- CPPUNIT_FAIL(ss.str());
+ rhm::journal::jcntl jc(test_name, "jdata", test_name);
+ jc.initialize();
+ create_xid(xid, 2, XID_SIZE);
+ txn_list.push_back(xid);
+ for (int m=0; m<NUM_MSGS; m++)
+ enq_txn_msg(&jc, create_msg(msg, m, MSG_SIZE), xid, false);
+ txn_commit(&jc, xid);
}
- // Heap
- test_name = "RecoverReadTest_Heap";
- rhm::journal::jcntl* jcp = NULL;
- try
{
+ rhm::journal::jcntl jc(test_name, "jdata", test_name);
+ jc.recover(txn_list);
+ for (int m=0; m<NUM_MSGS; m++)
{
- jcp = new rhm::journal::jcntl(test_name, "jdata", test_name);
- CPPUNIT_ASSERT_MESSAGE("Journal heap instantiation failed.",
jcp != NULL);
- jcp->initialize();
- for (int m=0; m<NUM_MSGS; m++)
- enq_msg(jcp, create_msg(msg, m, MSG_SIZE), false);
- delete jcp;
- jcp = NULL;
+ read_msg(&jc);
+ CPPUNIT_ASSERT_MESSAGE("Message corrupt during recover.",
+ create_msg(msg, m, MSG_SIZE).compare(string((char*)mbuff, msize))
== 0);
+ cleanMessage();
}
+ jc.recover_complete();
+ for (int m=0; m<NUM_MSGS; m++)
{
- rhm::journal::jcntl* jcp = new rhm::journal::jcntl(test_name,
"jdata", test_name);
- CPPUNIT_ASSERT_MESSAGE("Journal heap instantiation failed.",
jcp != NULL);
- jcp->recover(txn_list);
- for (int m=0; m<NUM_MSGS; m++)
- {
- read_msg(jcp);
- std::string msg((char*)mbuff, msize);
- CPPUNIT_ASSERT_MESSAGE("Message corrupt during recover.",
- create_msg(msg, m,
MSG_SIZE).compare(std::string((char*)mbuff, msize)) == 0);
- cleanMessage();
- }
- delete jcp;
+ read_msg(&jc);
+ CPPUNIT_ASSERT_MESSAGE("Message corrupt after recovery.",
+ create_msg(msg, m, MSG_SIZE).compare(string((char*)mbuff, msize))
== 0);
+ cleanMessage();
}
}
- catch (const rhm::journal::jexception& e)
- {
- if (jcp)
- delete jcp;
- std::stringstream ss;
- ss << e;
- CPPUNIT_FAIL(ss.str());
- }
}
+ catch (const rhm::journal::jexception& e)
+ {
+ stringstream ss;
+ ss << e;
+ CPPUNIT_FAIL(ss.str());
+ }
+}
- void TxnRecoverReadTest()
+void
+JournalSystemTests::RecoveredDequeueTest()
+{
+ vector<string> txn_list;
+ try
{
- std::vector<std::string> txn_list;
- //Stack
- char* test_name = "TxnRecoverReadTest_Stack";
- try
+ // Non-txn
+ char* test_name = "RecoveredDequeueTest";
{
- {
- rhm::journal::jcntl jc(test_name, "jdata", test_name);
- jc.initialize();
- create_xid(xid, 1, XID_SIZE);
- txn_list.push_back(xid);
- for (int m=0; m<NUM_MSGS; m++)
- enq_txn_msg(&jc, create_msg(msg, m, MSG_SIZE), xid, false);
- txn_commit(&jc, xid);
- }
- {
- rhm::journal::jcntl jc(test_name, "jdata", test_name);
- jc.recover(txn_list);
- for (int m=0; m<NUM_MSGS; m++)
- {
- read_msg(&jc);
- CPPUNIT_ASSERT_MESSAGE("Message corrupt during recover.",
- create_msg(msg, m, MSG_SIZE).compare(string((char*)mbuff,
msize)) == 0);
- cleanMessage();
- }
- }
+ rhm::journal::jcntl jc(test_name, "jdata", test_name);
+ jc.initialize();
+ for (int m=0; m<NUM_MSGS; m++)
+ enq_msg(&jc, create_msg(msg, m, MSG_SIZE), false);
}
- catch (const rhm::journal::jexception& e)
{
- std::stringstream ss;
- ss << e;
- CPPUNIT_FAIL(ss.str());
- }
- // Heap
- test_name = "TxnRecoverReadTest_Heap";
- rhm::journal::jcntl* jcp = NULL;
- try
- {
+ rhm::journal::jcntl jc(test_name, "jdata", test_name);
+ jc.recover(txn_list);
+ for (int m=0; m<NUM_MSGS; m++)
{
- jcp = new rhm::journal::jcntl(test_name, "jdata", test_name);
- CPPUNIT_ASSERT_MESSAGE("Journal heap instantiation failed.",
jcp != NULL);
- jcp->initialize();
- for (int m=0; m<NUM_MSGS; m++)
- enq_txn_msg(jcp, create_msg(msg, m, MSG_SIZE), xid, false);
- txn_commit(jcp, xid);
- delete jcp;
- jcp = NULL;
+ read_msg(&jc);
+ CPPUNIT_ASSERT_MESSAGE("Message corrupt during recover.",
+ create_msg(msg, m, MSG_SIZE).compare(string((char*)mbuff, msize))
== 0);
+ cleanMessage();
}
+ jc.recover_complete();
+ for (int m=0; m<NUM_MSGS; m++)
{
- rhm::journal::jcntl* jcp = new rhm::journal::jcntl(test_name,
"jdata", test_name);
- CPPUNIT_ASSERT_MESSAGE("Journal heap instantiation failed.",
jcp != NULL);
- jcp->recover(txn_list);
- for (int m=0; m<NUM_MSGS; m++)
- {
- read_msg(jcp);
- std::string msg((char*)mbuff, msize);
- CPPUNIT_ASSERT_MESSAGE("Message corrupt during recover.",
- create_msg(msg, m,
MSG_SIZE).compare(std::string((char*)mbuff, msize)) == 0);
- cleanMessage();
- }
- delete jcp;
+ read_msg(&jc);
+ CPPUNIT_ASSERT_MESSAGE("Message corrupt after recovery.",
+ create_msg(msg, m, MSG_SIZE).compare(string((char*)mbuff, msize))
== 0);
+ cleanMessage();
}
+ for (int m=0; m<NUM_MSGS; m++)
+ deq_msg(&jc, m);
}
- catch (const rhm::journal::jexception& e)
- {
- if (jcp)
- delete jcp;
- std::stringstream ss;
- ss << e;
- CPPUNIT_FAIL(ss.str());
- }
- }
- void RecoveredReadTest()
- {
- std::vector<std::string> txn_list;
- //Stack
- char* test_name = "RecoveredReadTest_Stack";
- try
+ // Txn
+ test_name = "TxnRecoveredDequeueTest";
{
- {
- rhm::journal::jcntl jc(test_name, "jdata", test_name);
- jc.initialize();
- for (int m=0; m<NUM_MSGS; m++)
- enq_msg(&jc, create_msg(msg, m, MSG_SIZE), false);
- }
- {
- rhm::journal::jcntl jc(test_name, "jdata", test_name);
- jc.recover(txn_list);
- for (int m=0; m<NUM_MSGS; m++)
- {
- read_msg(&jc);
- CPPUNIT_ASSERT_MESSAGE("Message corrupt during recover.",
- create_msg(msg, m,
MSG_SIZE).compare(std::string((char*)mbuff, msize)) == 0);
- cleanMessage();
- }
- jc.recover_complete();
- for (int m=0; m<NUM_MSGS; m++)
- {
- read_msg(&jc);
- CPPUNIT_ASSERT_MESSAGE("Message corrupt after recovery.",
- create_msg(msg, m,
MSG_SIZE).compare(std::string((char*)mbuff, msize)) == 0);
- cleanMessage();
- }
- }
+ rhm::journal::jcntl jc(test_name, "jdata", test_name);
+ jc.initialize();
+ create_xid(xid, 3, XID_SIZE);
+ txn_list.push_back(xid);
+ for (int m=0; m<NUM_MSGS; m++)
+ enq_txn_msg(&jc, create_msg(msg, m, MSG_SIZE), xid, false);
+ txn_commit(&jc, xid);
}
- catch (const rhm::journal::jexception& e)
{
- std::stringstream ss;
- ss << e;
- CPPUNIT_FAIL(ss.str());
- }
- // Heap
- test_name = "RecoveredReadTest_Heap";
- rhm::journal::jcntl* jcp = NULL;
- try
- {
+ rhm::journal::jcntl jc(test_name, "jdata", test_name);
+ jc.recover(txn_list);
+ for (int m=0; m<NUM_MSGS; m++)
{
- jcp = new rhm::journal::jcntl(test_name, "jdata", test_name);
- CPPUNIT_ASSERT_MESSAGE("Journal heap instantiation failed.",
jcp != NULL);
- jcp->initialize();
- for (int m=0; m<NUM_MSGS; m++)
- enq_msg(jcp, create_msg(msg, m, MSG_SIZE), false);
- delete jcp;
- jcp = NULL;
+ read_msg(&jc);
+ CPPUNIT_ASSERT_MESSAGE("Message corrupt during recover.",
+ create_msg(msg, m, MSG_SIZE).compare(string((char*)mbuff, msize))
== 0);
+ cleanMessage();
}
+ jc.recover_complete();
+ for (int m=0; m<NUM_MSGS; m++)
{
- rhm::journal::jcntl* jcp = new rhm::journal::jcntl(test_name,
"jdata", test_name);
- CPPUNIT_ASSERT_MESSAGE("Journal heap instantiation failed.",
jcp != NULL);
- jcp->recover(txn_list);
- for (int m=0; m<NUM_MSGS; m++)
- {
- read_msg(jcp);
- CPPUNIT_ASSERT_MESSAGE("Message corrupt during recover.",
- create_msg(msg, m,
MSG_SIZE).compare(std::string((char*)mbuff, msize)) == 0);
- cleanMessage();
- }
- jcp->recover_complete();
- for (int m=0; m<NUM_MSGS; m++)
- {
- read_msg(jcp);
- CPPUNIT_ASSERT_MESSAGE("Message corrupt after recovery.",
- create_msg(msg, m,
MSG_SIZE).compare(std::string((char*)mbuff, msize)) == 0);
- cleanMessage();
- }
- delete jcp;
+ read_msg(&jc);
+ CPPUNIT_ASSERT_MESSAGE("Message corrupt after recovery.",
+ create_msg(msg, m, MSG_SIZE).compare(string((char*)mbuff, msize))
== 0);
+ cleanMessage();
}
+ for (int m=0; m<NUM_MSGS; m++)
+ deq_msg(&jc, m);
}
- catch (const rhm::journal::jexception& e)
- {
- if (jcp)
- delete jcp;
- std::stringstream ss;
- ss << e;
- CPPUNIT_FAIL(ss.str());
- }
}
+ catch (const rhm::journal::jexception& e)
+ {
+ stringstream ss;
+ ss << e;
+ CPPUNIT_FAIL(ss.str());
+ }
+}
- void TxnRecoveredReadTest()
+void
+JournalSystemTests::HeaderFlagsTest()
+{
+ vector<string> txn_list;
+ try
{
- std::vector<std::string> txn_list;
- //Stack
- char* test_name = "TxnRecoveredReadTest_Stack";
- try
+ // Non-txn
+ char* test_name = "FlagsRecoverdTest";
{
+ rhm::journal::jcntl jc(test_name, "jdata", test_name);
+ jc.initialize();
+ // Transient msgs - should not recover
+ for (int m=0; m<NUM_MSGS; m++)
+ enq_msg(&jc, create_msg(msg, m, MSG_SIZE), true);
+ // Persistent msgs
+ for (int m=NUM_MSGS; m<NUM_MSGS*2; m++)
+ enq_msg(&jc, create_msg(msg, m, MSG_SIZE), false);
+ // Transient extern msgs - should not recover
+ for (int m=NUM_MSGS*2; m<NUM_MSGS*3; m++)
+ enq_extern_msg(&jc, true);
+ // Persistnet extern msgs
+ for (int m=NUM_MSGS*3; m<NUM_MSGS*4; m++)
+ enq_extern_msg(&jc, false);
+ }
+ {
+ rhm::journal::jcntl jc(test_name, "jdata", test_name);
+ jc.recover(txn_list);
+ // Recover non-transient msgs
+ for (int m=NUM_MSGS; m<NUM_MSGS*2; m++)
{
- rhm::journal::jcntl jc(test_name, "jdata", test_name);
- jc.initialize();
- create_xid(xid, 2, XID_SIZE);
- txn_list.push_back(xid);
- for (int m=0; m<NUM_MSGS; m++)
- enq_txn_msg(&jc, create_msg(msg, m, MSG_SIZE), xid, false);
- txn_commit(&jc, xid);
+ read_msg(&jc);
+ CPPUNIT_ASSERT_MESSAGE("Transient message recovered.", transientFlag == false);
+ CPPUNIT_ASSERT_MESSAGE("External flag incorrect.", externalFlag == false);
+ CPPUNIT_ASSERT_MESSAGE("Non-transient message corrupt during recover.",
+ create_msg(msg, m, MSG_SIZE).compare(string((char*)mbuff, msize)) == 0);
+ cleanMessage();
}
+ // Recover non-transient extern msgs
+ for (int m=NUM_MSGS*3; m<NUM_MSGS*4; m++)
{
- rhm::journal::jcntl jc(test_name, "jdata", test_name);
- jc.recover(txn_list);
- for (int m=0; m<NUM_MSGS; m++)
- {
- read_msg(&jc);
- CPPUNIT_ASSERT_MESSAGE("Message corrupt during recover.",
- create_msg(msg, m,
MSG_SIZE).compare(std::string((char*)mbuff, msize)) == 0);
- cleanMessage();
- }
- jc.recover_complete();
- for (int m=0; m<NUM_MSGS; m++)
- {
- read_msg(&jc);
- CPPUNIT_ASSERT_MESSAGE("Message corrupt after recovery.",
- create_msg(msg, m,
MSG_SIZE).compare(std::string((char*)mbuff, msize)) == 0);
- cleanMessage();
- }
+ read_msg(&jc);
+ CPPUNIT_ASSERT_MESSAGE("Transient message recovered.", transientFlag == false);
+ CPPUNIT_ASSERT_MESSAGE("External flag incorrect.", externalFlag == true);
+ CPPUNIT_ASSERT_MESSAGE("External message returned non-null pointer.",
+ mbuff == NULL);
+ cleanMessage();
}
- }
- catch (const rhm::journal::jexception& e)
- {
- std::stringstream ss;
- ss << e;
- CPPUNIT_FAIL(ss.str());
- }
- // Heap
- test_name = "TxnRecoveredReadTest_Heap";
- rhm::journal::jcntl* jcp = NULL;
- try
- {
+ jc.recover_complete();
+ // Read recovered non-transient msgs
+ for (int m=NUM_MSGS; m<NUM_MSGS*2; m++)
{
- jcp = new rhm::journal::jcntl(test_name, "jdata", test_name);
- CPPUNIT_ASSERT_MESSAGE("Journal heap instantiation failed.",
jcp != NULL);
- jcp->initialize();
- for (int m=0; m<NUM_MSGS; m++)
- enq_txn_msg(jcp, create_msg(msg, m, MSG_SIZE), xid, false);
- txn_commit(jcp, xid);
- delete jcp;
- jcp = NULL;
+ read_msg(&jc);
+ CPPUNIT_ASSERT_MESSAGE("Transient message recovered.", transientFlag == false);
+ CPPUNIT_ASSERT_MESSAGE("External flag incorrect.", externalFlag == false);
+ CPPUNIT_ASSERT_MESSAGE("Non-transient message corrupt during recover.",
+ create_msg(msg, m, MSG_SIZE).compare(string((char*)mbuff, msize)) == 0);
+ cleanMessage();
}
+ // Read recovered non-transient extern msgs
+ for (int m=NUM_MSGS*3; m<NUM_MSGS*4; m++)
{
- rhm::journal::jcntl* jcp = new rhm::journal::jcntl(test_name,
"jdata", test_name);
- CPPUNIT_ASSERT_MESSAGE("Journal heap instantiation failed.",
jcp != NULL);
- jcp->recover(txn_list);
- for (int m=0; m<NUM_MSGS; m++)
- {
- read_msg(jcp);
- CPPUNIT_ASSERT_MESSAGE("Message corrupt during recover.",
- create_msg(msg, m,
MSG_SIZE).compare(std::string((char*)mbuff, msize)) == 0);
- cleanMessage();
- }
- jcp->recover_complete();
- for (int m=0; m<NUM_MSGS; m++)
- {
- read_msg(jcp);
- CPPUNIT_ASSERT_MESSAGE("Message corrupt after recovery.",
- create_msg(msg, m,
MSG_SIZE).compare(std::string((char*)mbuff, msize)) == 0);
- cleanMessage();
- }
- delete jcp;
+ read_msg(&jc);
+ CPPUNIT_ASSERT_MESSAGE("Transient message recovered.", transientFlag == false);
+ CPPUNIT_ASSERT_MESSAGE("External flag incorrect.", externalFlag == true);
+ CPPUNIT_ASSERT_MESSAGE("External message returned non-null pointer.",
+ mbuff == NULL);
+ cleanMessage();
}
+ // Dequeue recovered messages
+ for (int m=NUM_MSGS; m<NUM_MSGS*2; m++)
+ deq_msg(&jc, m);
+ for (int m=NUM_MSGS*3; m<NUM_MSGS*4; m++)
+ deq_msg(&jc, m);
}
- catch (const rhm::journal::jexception& e)
+
+ // Txn
+ test_name = "TxnFlagsRecoverdTest";
{
- if (jcp)
- delete jcp;
- std::stringstream ss;
- ss << e;
- CPPUNIT_FAIL(ss.str());
+ rhm::journal::jcntl jc(test_name, "jdata", test_name);
+ jc.initialize();
+ create_xid(xid, 4, XID_SIZE);
+ txn_list.push_back(xid);
+ // Transient msgs - should not recover
+ for (int m=0; m<NUM_MSGS; m++)
+ enq_txn_msg(&jc, create_msg(msg, m, MSG_SIZE), xid, true);
+ // Persistent msgs
+ for (int m=NUM_MSGS; m<NUM_MSGS*2; m++)
+ enq_txn_msg(&jc, create_msg(msg, m, MSG_SIZE), xid, false);
+ // Transient extern msgs - should not recover
+ for (int m=NUM_MSGS*2; m<NUM_MSGS*3; m++)
+ enq_extern_txn_msg(&jc, xid, true);
+ // Persistent extern msgs
+ for (int m=NUM_MSGS*3; m<NUM_MSGS*4; m++)
+ enq_extern_txn_msg(&jc, xid, false);
+ txn_commit(&jc, xid);
}
- }
-
- void RecoveredDequeueTest()
- {
- std::vector<std::string> txn_list;
- //Stack
- char* test_name = "RecoveredDequeueTest_Stack";
- try
{
+ rhm::journal::jcntl jc(test_name, "jdata", test_name);
+ jc.recover(txn_list);
+ // Recover non-transient msgs
+ for (int m=NUM_MSGS; m<NUM_MSGS*2; m++)
{
- rhm::journal::jcntl jc(test_name, "jdata", test_name);
- jc.initialize();
- for (int m=0; m<NUM_MSGS; m++)
- enq_msg(&jc, create_msg(msg, m, MSG_SIZE), false);
+ read_msg(&jc);
+ CPPUNIT_ASSERT_MESSAGE("Transient message recovered.",
transientFlag == false);
+ CPPUNIT_ASSERT_MESSAGE("External flag incorrect.", externalFlag
== false);
+ CPPUNIT_ASSERT_MESSAGE("Non-transient message corrupt during
recover.",
+ create_msg(msg, m, MSG_SIZE).compare(string((char*)mbuff, msize))
== 0);
+ cleanMessage();
}
+ // Recover non-transient extern msgs
+ for (int m=NUM_MSGS*3; m<NUM_MSGS*4; m++)
{
- rhm::journal::jcntl jc(test_name, "jdata", test_name);
- jc.recover(txn_list);
- for (int m=0; m<NUM_MSGS; m++)
- {
- read_msg(&jc);
- CPPUNIT_ASSERT_MESSAGE("Message corrupt during recover.",
- create_msg(msg, m,
MSG_SIZE).compare(std::string((char*)mbuff, msize)) == 0);
- cleanMessage();
- }
- jc.recover_complete();
- for (int m=0; m<NUM_MSGS; m++)
- {
- read_msg(&jc);
- CPPUNIT_ASSERT_MESSAGE("Message corrupt after recovery.",
- create_msg(msg, m,
MSG_SIZE).compare(std::string((char*)mbuff, msize)) == 0);
- cleanMessage();
- }
- for (int m=0; m<NUM_MSGS; m++)
- deq_msg(&jc, m);
+ read_msg(&jc);
+ CPPUNIT_ASSERT_MESSAGE("Transient message recovered.",
transientFlag == false);
+ CPPUNIT_ASSERT_MESSAGE("External flag incorrect.", externalFlag
== true);
+ CPPUNIT_ASSERT_MESSAGE("External message returned non-null
pointer.",
+ mbuff == NULL);
+ cleanMessage();
}
- }
- catch (const rhm::journal::jexception& e)
- {
- std::stringstream ss;
- ss << e;
- CPPUNIT_FAIL(ss.str());
- }
- // Heap
- test_name = "RecoveredDequeueTest_Heap";
- rhm::journal::jcntl* jcp = NULL;
- try
- {
+ jc.recover_complete();
+ // Read recovered non-transient msgs
+ for (int m=NUM_MSGS; m<NUM_MSGS*2; m++)
{
- jcp = new rhm::journal::jcntl(test_name, "jdata", test_name);
- CPPUNIT_ASSERT_MESSAGE("Journal heap instantiation failed.",
jcp != NULL);
- jcp->initialize();
- for (int m=0; m<NUM_MSGS; m++)
- enq_msg(jcp, create_msg(msg, m, MSG_SIZE), false);
- delete jcp;
- jcp = NULL;
+ read_msg(&jc);
+ CPPUNIT_ASSERT_MESSAGE("Transient message recovered.",
transientFlag == false);
+ CPPUNIT_ASSERT_MESSAGE("External flag incorrect.", externalFlag
== false);
+ CPPUNIT_ASSERT_MESSAGE("Non-transient message corrupt during
recover.",
+ create_msg(msg, m, MSG_SIZE).compare(string((char*)mbuff, msize))
== 0);
+ cleanMessage();
}
+ // Read recovered non-transient extern msgs
+ for (int m=NUM_MSGS*3; m<NUM_MSGS*4; m++)
{
- rhm::journal::jcntl* jcp = new rhm::journal::jcntl(test_name,
"jdata", test_name);
- CPPUNIT_ASSERT_MESSAGE("Journal heap instantiation failed.",
jcp != NULL);
- jcp->recover(txn_list);
- for (int m=0; m<NUM_MSGS; m++)
- {
- read_msg(jcp);
- CPPUNIT_ASSERT_MESSAGE("Message corrupt during recover.",
- create_msg(msg, m,
MSG_SIZE).compare(std::string((char*)mbuff, msize)) == 0);
- cleanMessage();
- }
- jcp->recover_complete();
- for (int m=0; m<NUM_MSGS; m++)
- {
- read_msg(jcp);
- CPPUNIT_ASSERT_MESSAGE("Message corrupt after recovery.",
- create_msg(msg, m,
MSG_SIZE).compare(std::string((char*)mbuff, msize)) == 0);
- cleanMessage();
- }
- for (int m=0; m<NUM_MSGS; m++)
- deq_msg(jcp, m);
- delete jcp;
+ read_msg(&jc);
+ CPPUNIT_ASSERT_MESSAGE("Transient message recovered.",
transientFlag == false);
+ CPPUNIT_ASSERT_MESSAGE("External flag incorrect.", externalFlag
== true);
+ CPPUNIT_ASSERT_MESSAGE("External message returned non-null
pointer.",
+ mbuff == NULL);
+ cleanMessage();
}
+ // Dequeue recovered messages
+ for (int m=NUM_MSGS; m<NUM_MSGS*2; m++)
+ deq_msg(&jc, m);
+ for (int m=NUM_MSGS*3; m<NUM_MSGS*4; m++)
+ deq_msg(&jc, m);
}
- catch (const rhm::journal::jexception& e)
- {
- if (jcp)
- delete jcp;
- std::stringstream ss;
- ss << e;
- CPPUNIT_FAIL(ss.str());
- }
}
+ catch (const rhm::journal::jexception& e)
+ {
+ stringstream ss;
+ ss << e;
+ CPPUNIT_FAIL(ss.str());
+ }
+}
- void TxnRecoveredDequeueTest()
+// ComplexRecoveryTest1: recovery of a journal that holds a mix of dequeued and
+// still-enqueued records, first without and then with a transaction. Each half
+// writes a journal, lets that jcntl instance go out of scope, recovers the
+// files into a fresh jcntl instance, checks the surviving messages during and
+// after recovery, enqueues a further batch and finally dequeues everything
+// that is still enqueued. The "rids:" comments track the record ids consumed
+// by each step (dequeue and commit records consume rids too). Any jexception
+// is turned into a CppUnit failure.
+void
+JournalSystemTests::ComplexRecoveryTest1()
+{
+ vector<string> txn_list;
+ try
{
- std::vector<std::string> txn_list;
- //Stack
- char* test_name = "TxnRecoveredDequeueTest_Stack";
- try
+ // Non-txn
+ const char* test_name = "ComplexRecoveryTest1";
{
+ rhm::journal::jcntl jc(test_name, "jdata", test_name);
+ jc.initialize();
+
+ // Enqueue 2n, then dequeue first n msgs; check that only last n readable
+ // rids: 0 to NUM_MSGS*2 - 1
+ for (int m=0; m<NUM_MSGS*2; m++)
+ enq_msg(&jc, create_msg(msg, m, MSG_SIZE), false);
+ // rids: NUM_MSGS*2 to NUM_MSGS*3 - 1
+ for (int m=0; m<NUM_MSGS; m++)
+ deq_msg(&jc, m);
+ jc.flush();
+ for (int m=NUM_MSGS; m<NUM_MSGS*2; m++)
{
- rhm::journal::jcntl jc(test_name, "jdata", test_name);
- jc.initialize();
- create_xid(xid, 3, XID_SIZE);
- txn_list.push_back(xid);
- for (int m=0; m<NUM_MSGS; m++)
- enq_txn_msg(&jc, create_msg(msg, m, MSG_SIZE), xid, false);
- txn_commit(&jc, xid);
+ read_msg(&jc);
+ CPPUNIT_ASSERT_MESSAGE("Message corrupt before recover.",
+ create_msg(msg, m, MSG_SIZE).compare(string((char*)mbuff, msize))
== 0);
+ cleanMessage();
}
- {
- rhm::journal::jcntl jc(test_name, "jdata", test_name);
- jc.recover(txn_list);
- for (int m=0; m<NUM_MSGS; m++)
- {
- read_msg(&jc);
- CPPUNIT_ASSERT_MESSAGE("Message corrupt during recover.",
- create_msg(msg, m,
MSG_SIZE).compare(std::string((char*)mbuff, msize)) == 0);
- cleanMessage();
- }
- jc.recover_complete();
- for (int m=0; m<NUM_MSGS; m++)
- {
- read_msg(&jc);
- CPPUNIT_ASSERT_MESSAGE("Message corrupt after recovery.",
- create_msg(msg, m,
MSG_SIZE).compare(std::string((char*)mbuff, msize)) == 0);
- cleanMessage();
- }
- for (int m=0; m<NUM_MSGS; m++)
- deq_msg(&jc, m);
- }
}
- catch (const rhm::journal::jexception& e)
{
- std::stringstream ss;
- ss << e;
- CPPUNIT_FAIL(ss.str());
- }
- // Heap
- test_name = "TxnRecoveredDequeueTest_Heap";
- rhm::journal::jcntl* jcp = NULL;
- try
- {
+ rhm::journal::jcntl jc(test_name, "jdata", test_name);
+ jc.recover(txn_list);
+
+ // Check that only last n readable (as before)
+ for (int m=NUM_MSGS; m<NUM_MSGS*2; m++)
{
- jcp = new rhm::journal::jcntl(test_name, "jdata", test_name);
- CPPUNIT_ASSERT_MESSAGE("Journal heap instantiation failed.",
jcp != NULL);
- jcp->initialize();
- for (int m=0; m<NUM_MSGS; m++)
- enq_txn_msg(jcp, create_msg(msg, m, MSG_SIZE), xid, false);
- txn_commit(jcp, xid);
- delete jcp;
- jcp = NULL;
+ read_msg(&jc);
+ CPPUNIT_ASSERT_MESSAGE("Message corrupt during recover.",
+ create_msg(msg, m, MSG_SIZE).compare(string((char*)mbuff, msize))
== 0);
+ cleanMessage();
}
+ jc.recover_complete();
+
+ // Enqueue another n msgs
+ // rids: NUM_MSGS*3 to NUM_MSGS*4 - 1
+ for (int m=NUM_MSGS*2; m<NUM_MSGS*3; m++)
+ enq_msg(&jc, create_msg(msg, m, MSG_SIZE), false);
+ jc.flush();
+
+ // Check that 2n messages are now readable
+ for (int m=NUM_MSGS; m<NUM_MSGS*3; m++)
{
- rhm::journal::jcntl* jcp = new rhm::journal::jcntl(test_name,
"jdata", test_name);
- CPPUNIT_ASSERT_MESSAGE("Journal heap instantiation failed.",
jcp != NULL);
- jcp->recover(txn_list);
- for (int m=0; m<NUM_MSGS; m++)
- {
- read_msg(jcp);
- CPPUNIT_ASSERT_MESSAGE("Message corrupt during recover.",
- create_msg(msg, m,
MSG_SIZE).compare(std::string((char*)mbuff, msize)) == 0);
- cleanMessage();
- }
- jcp->recover_complete();
- for (int m=0; m<NUM_MSGS; m++)
- {
- read_msg(jcp);
- CPPUNIT_ASSERT_MESSAGE("Message corrupt after recovery.",
- create_msg(msg, m,
MSG_SIZE).compare(std::string((char*)mbuff, msize)) == 0);
- cleanMessage();
- }
- for (int m=0; m<NUM_MSGS; m++)
- deq_msg(jcp, m);
- delete jcp;
+ read_msg(&jc);
+ CPPUNIT_ASSERT_MESSAGE("Message corrupt after recover.",
+ create_msg(msg, m, MSG_SIZE).compare(string((char*)mbuff, msize))
== 0);
+ cleanMessage();
}
+
+ // Dequeue all remaining messages. deq_msg() takes the rid of the enqueue
+ // record, not the message number: the survivors of the first batch have
+ // rids NUM_MSGS to NUM_MSGS*2 - 1 and the post-recovery batch has rids
+ // NUM_MSGS*3 to NUM_MSGS*4 - 1. Rids NUM_MSGS*2 to NUM_MSGS*3 - 1 belong
+ // to the dequeue records written before recovery and must be skipped.
+ // rids: NUM_MSGS*4 to NUM_MSGS*6 - 1
+ for (int m=NUM_MSGS; m<NUM_MSGS*2; m++)
+ deq_msg(&jc, m);
+ for (int m=NUM_MSGS*3; m<NUM_MSGS*4; m++)
+ deq_msg(&jc, m);
}
- catch (const rhm::journal::jexception& e)
- {
- if (jcp)
- delete jcp;
- std::stringstream ss;
- ss << e;
- CPPUNIT_FAIL(ss.str());
- }
- }
- void ComplexRecoveryTest1()
- {
- std::vector<std::string> txn_list;
- //Stack
- char* test_name = "ComplexRecoveryTest1_Stack";
- try
+ // Txn
+ test_name = "TxnComplexRecoveryTest1";
{
+ rhm::journal::jcntl jc(test_name, "jdata", test_name);
+ jc.initialize();
+
+ // Enqueue 2n, then dequeue first n msgs; check that only last n readable
+ // rids: 0 to NUM_MSGS - 1
+ for (int m=0; m<NUM_MSGS; m++)
+ enq_msg(&jc, create_msg(msg, m, MSG_SIZE), false);
+ // rids: NUM_MSGS to NUM_MSGS*2 - 1
+ create_xid(xid, 5, XID_SIZE);
+ txn_list.push_back(xid);
+ for (int m=NUM_MSGS; m<NUM_MSGS*2; m++)
+ enq_txn_msg(&jc, create_msg(msg, m, MSG_SIZE), xid, false);
+ // rids: NUM_MSGS*2 to NUM_MSGS*3 - 1
+ for (int m=0; m<NUM_MSGS; m++)
+ deq_msg(&jc, m);
+ // rid: NUM_MSGS*3
+ txn_commit(&jc, xid);
+ jc.flush();
+ for (int m=NUM_MSGS; m<NUM_MSGS*2; m++)
{
- rhm::journal::jcntl jc(test_name, "jdata", test_name);
- jc.initialize();
- // rids: 0 to NUM_MSGS*2 - 1
- for (int m=0; m<NUM_MSGS*2; m++)
- enq_msg(&jc, create_msg(msg, m, MSG_SIZE), false);
- // rids: NUM_MSGS*2 to NUM_MSGS*3 - 1
- for (int m=0; m<NUM_MSGS; m++)
- deq_msg(&jc, m);
- jc.flush();
- for (int m=NUM_MSGS; m<NUM_MSGS*2; m++)
- {
- read_msg(&jc);
- CPPUNIT_ASSERT_MESSAGE("Message corrupt before recover.",
- create_msg(msg, m,
MSG_SIZE).compare(std::string((char*)mbuff, msize)) == 0);
- cleanMessage();
- }
+ read_msg(&jc);
+ CPPUNIT_ASSERT_MESSAGE("Message corrupt before recover.",
+ create_msg(msg, m, MSG_SIZE).compare(string((char*)mbuff, msize))
== 0);
+ cleanMessage();
}
- {
- rhm::journal::jcntl jc(test_name, "jdata", test_name);
- jc.recover(txn_list);
- for (int m=NUM_MSGS; m<NUM_MSGS*2; m++)
- {
- read_msg(&jc);
- CPPUNIT_ASSERT_MESSAGE("Message corrupt during recover.",
- create_msg(msg, m,
MSG_SIZE).compare(std::string((char*)mbuff, msize)) == 0);
- cleanMessage();
- }
- jc.recover_complete();
- // rids: NUM_MSGS*3 to NUM_MSGS*4 - 1
- for (int m=NUM_MSGS*3; m<NUM_MSGS*4; m++)
- enq_msg(&jc, create_msg(msg, m, MSG_SIZE), false);
- jc.flush();
- for (int m=NUM_MSGS; m<NUM_MSGS*2; m++)
- {
- read_msg(&jc);
- CPPUNIT_ASSERT_MESSAGE("Message corrupt after recover.",
- create_msg(msg, m,
MSG_SIZE).compare(std::string((char*)mbuff, msize)) == 0);
- cleanMessage();
- }
- for (int m=NUM_MSGS*3; m<NUM_MSGS*4; m++)
- {
- read_msg(&jc);
- CPPUNIT_ASSERT_MESSAGE("Message corrupt after recover.",
- create_msg(msg, m,
MSG_SIZE).compare(std::string((char*)mbuff, msize)) == 0);
- cleanMessage();
- }
- // rids: NUM_MSGS*4 to NUM_MSGS*6 - 1
- for (int m=NUM_MSGS; m<NUM_MSGS*2; m++)
- deq_msg(&jc, m);
- for (int m=NUM_MSGS*3; m<NUM_MSGS*4; m++)
- deq_msg(&jc, m);
- }
}
- catch (const rhm::journal::jexception& e)
{
- std::stringstream ss;
- ss << e;
- CPPUNIT_FAIL(ss.str());
- }
- // Heap
- test_name = "RecoveredDequeueTest_Heap";
- rhm::journal::jcntl* jcp = NULL;
- try
- {
+ rhm::journal::jcntl jc(test_name, "jdata", test_name);
+ jc.recover(txn_list);
+
+ // Check that only last n readable (as before)
+ for (int m=NUM_MSGS; m<NUM_MSGS*2; m++)
{
- jcp = new rhm::journal::jcntl(test_name, "jdata", test_name);
- CPPUNIT_ASSERT_MESSAGE("Journal heap instantiation failed.",
jcp != NULL);
- jcp->initialize();
- // rids: 0 to NUM_MSGS*2 - 1
- for (int m=0; m<NUM_MSGS*2; m++)
- enq_msg(jcp, create_msg(msg, m, MSG_SIZE), false);
- // rids: NUM_MSGS*2 to NUM_MSGS*3 - 1
- for (int m=0; m<NUM_MSGS; m++)
- deq_msg(jcp, m);
- jcp->flush();
- for (int m=NUM_MSGS; m<NUM_MSGS*2; m++)
- {
- read_msg(jcp);
- CPPUNIT_ASSERT_MESSAGE("Message corrupt before recover.",
- create_msg(msg, m,
MSG_SIZE).compare(std::string((char*)mbuff, msize)) == 0);
- cleanMessage();
- }
- delete jcp;
- jcp = NULL;
+ read_msg(&jc);
+ CPPUNIT_ASSERT_MESSAGE("Message corrupt during recover.",
+ create_msg(msg, m, MSG_SIZE).compare(string((char*)mbuff, msize))
== 0);
+ cleanMessage();
}
- {
- rhm::journal::jcntl* jcp = new rhm::journal::jcntl(test_name,
"jdata", test_name);
- CPPUNIT_ASSERT_MESSAGE("Journal heap instantiation failed.",
jcp != NULL);
- jcp->recover(txn_list);
- for (int m=NUM_MSGS; m<NUM_MSGS*2; m++)
- {
- read_msg(jcp);
- CPPUNIT_ASSERT_MESSAGE("Message corrupt during recover.",
- create_msg(msg, m,
MSG_SIZE).compare(std::string((char*)mbuff, msize)) == 0);
- cleanMessage();
- }
- jcp->recover_complete();
- // rids: NUM_MSGS*3 to NUM_MSGS*4 - 1
- for (int m=NUM_MSGS*3; m<NUM_MSGS*4; m++)
- enq_msg(jcp, create_msg(msg, m, MSG_SIZE), false);
- jcp->flush();
- for (int m=NUM_MSGS; m<NUM_MSGS*2; m++)
- {
- read_msg(jcp);
- CPPUNIT_ASSERT_MESSAGE("Message corrupt after recovery.",
- create_msg(msg, m,
MSG_SIZE).compare(std::string((char*)mbuff, msize)) == 0);
- cleanMessage();
- }
- for (int m=NUM_MSGS*3; m<NUM_MSGS*4; m++)
- {
- read_msg(jcp);
- CPPUNIT_ASSERT_MESSAGE("Message corrupt after recovery.",
- create_msg(msg, m,
MSG_SIZE).compare(std::string((char*)mbuff, msize)) == 0);
- cleanMessage();
- }
- // rids: NUM_MSGS*4 to NUM_MSGS*6 - 1
- for (int m=NUM_MSGS; m<NUM_MSGS*2; m++)
- deq_msg(jcp, m);
- for (int m=NUM_MSGS*3; m<NUM_MSGS*4; m++)
- deq_msg(jcp, m);
- delete jcp;
- }
- }
- catch (const rhm::journal::jexception& e)
- {
- if (jcp)
- delete jcp;
- std::stringstream ss;
- ss << e;
- CPPUNIT_FAIL(ss.str());
- }
- }
+ jc.recover_complete();
- void TxnComplexRecoveryTest1()
- {
- std::vector<std::string> txn_list;
- //Stack
- char* test_name = "TxnComplexRecoveryTest1_Stack";
- try
- {
+ // Enqueue another n msgs
+ // rids: NUM_MSGS*3+1 to NUM_MSGS*4
+ for (int m=NUM_MSGS*3+1; m<NUM_MSGS*4+1; m++)
+ enq_msg(&jc, create_msg(msg, m, MSG_SIZE), false);
+ jc.flush();
+
+ // Check that 2n messages are now readable
+ for (int m=NUM_MSGS; m<NUM_MSGS*2; m++)
{
- rhm::journal::jcntl jc(test_name, "jdata", test_name);
- jc.initialize();
- // rids: 0 to NUM_MSGS - 1
- for (int m=0; m<NUM_MSGS; m++)
- enq_msg(&jc, create_msg(msg, m, MSG_SIZE), false);
- // rids: NUM_MSGS to NUM_MSGS*2 - 1
- create_xid(xid, 4, XID_SIZE);
- txn_list.push_back(xid);
- for (int m=NUM_MSGS; m<NUM_MSGS*2; m++)
- enq_txn_msg(&jc, create_msg(msg, m, MSG_SIZE), xid, false);
- // rids: NUM_MSGS*2 to NUM_MSGS*3 - 1
- for (int m=0; m<NUM_MSGS; m++)
- deq_msg(&jc, m);
- // rid: NUM_MSGS*3
- txn_commit(&jc, xid);
- jc.flush();
- for (int m=NUM_MSGS; m<NUM_MSGS*2; m++)
- {
- read_msg(&jc);
- CPPUNIT_ASSERT_MESSAGE("Message corrupt before recover.",
- create_msg(msg, m,
MSG_SIZE).compare(std::string((char*)mbuff, msize)) == 0);
- cleanMessage();
- }
+ read_msg(&jc);
+ CPPUNIT_ASSERT_MESSAGE("Message corrupt after recover.",
+ create_msg(msg, m, MSG_SIZE).compare(string((char*)mbuff, msize))
== 0);
+ cleanMessage();
}
+ for (int m=NUM_MSGS*3+1; m<NUM_MSGS*4+1; m++)
{
- rhm::journal::jcntl jc(test_name, "jdata", test_name);
- jc.recover(txn_list);
- for (int m=NUM_MSGS; m<NUM_MSGS*2; m++)
- {
- read_msg(&jc);
- CPPUNIT_ASSERT_MESSAGE("Message corrupt during recover.",
- create_msg(msg, m,
MSG_SIZE).compare(std::string((char*)mbuff, msize)) == 0);
- cleanMessage();
- }
- jc.recover_complete();
- // rids: NUM_MSGS*3+1 to NUM_MSGS*4
- for (int m=NUM_MSGS*3+1; m<NUM_MSGS*4+1; m++)
- enq_msg(&jc, create_msg(msg, m, MSG_SIZE), false);
- jc.flush();
- for (int m=NUM_MSGS; m<NUM_MSGS*2; m++)
- {
- read_msg(&jc);
- CPPUNIT_ASSERT_MESSAGE("Message corrupt after recover.",
- create_msg(msg, m,
MSG_SIZE).compare(std::string((char*)mbuff, msize)) == 0);
- cleanMessage();
- }
- for (int m=NUM_MSGS*3+1; m<NUM_MSGS*4+1; m++)
- {
- read_msg(&jc);
- CPPUNIT_ASSERT_MESSAGE("Message corrupt after recover.",
- create_msg(msg, m,
MSG_SIZE).compare(std::string((char*)mbuff, msize)) == 0);
- cleanMessage();
- }
- // rids: NUM_MSGS*4+1 to NUM_MSGS*6
- for (int m=NUM_MSGS; m<NUM_MSGS*2; m++)
- deq_msg(&jc, m);
- for (int m=NUM_MSGS*3+1; m<NUM_MSGS*4+1; m++)
- deq_msg(&jc, m);
+ read_msg(&jc);
+ CPPUNIT_ASSERT_MESSAGE("Message corrupt after recover.",
+ create_msg(msg, m, MSG_SIZE).compare(string((char*)mbuff, msize))
== 0);
+ cleanMessage();
}
+
+ // Dequeue all remaining messages
+ // rids: NUM_MSGS*4+1 to NUM_MSGS*6
+ for (int m=NUM_MSGS; m<NUM_MSGS*2; m++)
+ deq_msg(&jc, m);
+ for (int m=NUM_MSGS*3+1; m<NUM_MSGS*4+1; m++)
+ deq_msg(&jc, m);
}
- catch (const rhm::journal::jexception& e)
- {
- std::stringstream ss;
- ss << e;
- CPPUNIT_FAIL(ss.str());
- }
- // Heap
- test_name = "TxnComplexRecoveryTest1_Heap";
- rhm::journal::jcntl* jcp = NULL;
- try
- {
- {
- jcp = new rhm::journal::jcntl(test_name, "jdata", test_name);
- CPPUNIT_ASSERT_MESSAGE("Journal heap instantiation failed.",
jcp != NULL);
- jcp->initialize();
- // rids: 0 to NUM_MSGS*2 - 1
- for (int m=0; m<NUM_MSGS; m++)
- enq_msg(jcp, create_msg(msg, m, MSG_SIZE), false);
- create_xid(xid, 4, XID_SIZE);
- // rids: NUM_MSGS to NUM_MSGS*2 - 1
- for (int m=NUM_MSGS; m<NUM_MSGS*2; m++)
- enq_txn_msg(jcp, create_msg(msg, m, MSG_SIZE), xid, false);
- // rids: NUM_MSGS*2 to NUM_MSGS*3 - 1
- for (int m=0; m<NUM_MSGS; m++)
- deq_msg(jcp, m);
- // rid: NUM_MSGS*3
- txn_commit(jcp, xid);
- jcp->flush();
- for (int m=NUM_MSGS; m<NUM_MSGS*2; m++)
- {
- read_msg(jcp);
- CPPUNIT_ASSERT_MESSAGE("Message corrupt before recover.",
- create_msg(msg, m,
MSG_SIZE).compare(std::string((char*)mbuff, msize)) == 0);
- cleanMessage();
- }
- delete jcp;
- jcp = NULL;
- }
- {
- rhm::journal::jcntl* jcp = new rhm::journal::jcntl(test_name,
"jdata", test_name);
- CPPUNIT_ASSERT_MESSAGE("Journal heap instantiation failed.",
jcp != NULL);
- jcp->recover(txn_list);
- for (int m=NUM_MSGS; m<NUM_MSGS*2; m++)
- {
- read_msg(jcp);
- CPPUNIT_ASSERT_MESSAGE("Message corrupt during recover.",
- create_msg(msg, m,
MSG_SIZE).compare(std::string((char*)mbuff, msize)) == 0);
- cleanMessage();
- }
- jcp->recover_complete();
- // rids: NUM_MSGS*3+1 to NUM_MSGS*4
- for (int m=NUM_MSGS*3+1; m<NUM_MSGS*4+1; m++)
- enq_msg(jcp, create_msg(msg, m, MSG_SIZE), false);
- jcp->flush();
- for (int m=NUM_MSGS; m<NUM_MSGS*2; m++)
- {
- read_msg(jcp);
- CPPUNIT_ASSERT_MESSAGE("Message corrupt after recovery.",
- create_msg(msg, m,
MSG_SIZE).compare(std::string((char*)mbuff, msize)) == 0);
- cleanMessage();
- }
- for (int m=NUM_MSGS*3+1; m<NUM_MSGS*4+1; m++)
- {
- read_msg(jcp);
- CPPUNIT_ASSERT_MESSAGE("Message corrupt after recovery.",
- create_msg(msg, m,
MSG_SIZE).compare(std::string((char*)mbuff, msize)) == 0);
- cleanMessage();
- }
- // rids: NUM_MSGS*4+1 to NUM_MSGS*6
- for (int m=NUM_MSGS; m<NUM_MSGS*2; m++)
- deq_msg(jcp, m);
- for (int m=NUM_MSGS*3+1; m<NUM_MSGS*4+1; m++)
- deq_msg(jcp, m);
- delete jcp;
- }
- }
- catch (const rhm::journal::jexception& e)
- {
- if (jcp)
- delete jcp;
- std::stringstream ss;
- ss << e;
- CPPUNIT_FAIL(ss.str());
- }
}
-
- void EncodeTest_000()
+ catch (const rhm::journal::jexception& e)
{
- runEncodeTest(0, 0, 0, false, 0, 0, false, false, 2, "Empty journal");
+ stringstream ss;
+ ss << e;
+ CPPUNIT_FAIL(ss.str());
}
+}
- void EncodeTest_001()
- {
- runEncodeTest(1, 10, 10, false, 0, 0, false, false, 2, "1*(10
bytes)");
- }
+// === Private helper functions ===
- void EncodeTest_002()
- {
- runEncodeTest(1, 10, 10, true, 0, 0, false, false, 2, "1*(10 bytes),
auto-deq");
- }
+// Enqueue msg as a non-transactional data record on journal jc, re-issuing
+// the call for as long as handle_jcntl_response() asks for a retry.
+// transient is passed straight through to enqueue_data_record(). The data
+// token allocated here is only deleted on the failure paths inside
+// handle_jcntl_response(); who owns it after a successful enqueue is not
+// visible from this function.
+void
+JournalSystemTests::enq_msg(rhm::journal::jcntl* jc, const string msg, const bool transient)
+{
+    rhm::journal::data_tok* const tok = new rhm::journal::data_tok;
+    CPPUNIT_ASSERT_MESSAGE("Data Token heap intantiation failed.", NULL != tok);
- void EncodeTest_003()
- {
- runEncodeTest(10, 10, 10, false, 0, 0, false, false, 2, "10*(10
bytes)");
- }
+    unsigned sleeps = 0;
+    bool retry = true;
+    while (retry)
+    {
+        const rhm::journal::iores res =
+            jc->enqueue_data_record(msg.c_str(), msg.size(), msg.size(), tok, transient);
+        retry = handle_jcntl_response(res, jc, sleeps, tok);
+    }
+}
- void EncodeTest_004()
- {
- runEncodeTest(10, 10, 10, true, 0, 0, false, false, 2, "10*(10 bytes),
auto-deq");
- }
+// Enqueue a non-transactional record whose payload is held externally, so
+// only a size is handed to the journal. The size is msg.size(); msg is not a
+// parameter here - presumably the fixture's scratch string, so the value
+// depends on the last create_msg() call (verify against the class header).
+// The call is re-issued for as long as handle_jcntl_response() asks for a retry.
+void
+JournalSystemTests::enq_extern_msg(rhm::journal::jcntl* jc, const bool transient)
+{
+    rhm::journal::data_tok* const tok = new rhm::journal::data_tok;
+    CPPUNIT_ASSERT_MESSAGE("Data Token heap intantiation failed.", NULL != tok);
- void EncodeTest_005()
- {
- runEncodeTest(10, 92, 92, false, 0, 0, false, false, 2, "10*(1 dblk exact
fit)");
- }
+    unsigned sleeps = 0;
+    bool retry = true;
+    while (retry)
+    {
+        const rhm::journal::iores res =
+            jc->enqueue_extern_data_record(msg.size(), tok, transient);
+        retry = handle_jcntl_response(res, jc, sleeps, tok);
+    }
+}
- void EncodeTest_006()
- {
- runEncodeTest(10, 92, 92, true, 0, 0, false, false, 2, "10*(1 dblk exact
fit), auto-deq");
- }
+// Enqueue msg as a data record under transaction xid on journal jc,
+// re-issuing the call for as long as handle_jcntl_response() asks for a
+// retry. transient is passed straight through to enqueue_txn_data_record().
+void
+JournalSystemTests::enq_txn_msg(rhm::journal::jcntl* jc, const string msg,
+        const string xid, const bool transient)
+{
+    rhm::journal::data_tok* const tok = new rhm::journal::data_tok;
+    CPPUNIT_ASSERT_MESSAGE("Data Token heap intantiation failed.", NULL != tok);
- void EncodeTest_007()
- {
- runEncodeTest(10, 93, 93, false, 0, 0, false, false, 2, "10*(1 dblk + 1
byte)");
- }
+    unsigned sleeps = 0;
+    bool retry = true;
+    while (retry)
+    {
+        const rhm::journal::iores res =
+            jc->enqueue_txn_data_record(msg.c_str(), msg.size(), msg.size(), tok, xid, transient);
+        retry = handle_jcntl_response(res, jc, sleeps, tok);
+    }
+}
- void EncodeTest_008()
- {
- runEncodeTest(10, 93, 93, true, 0, 0, false, false, 2, "10*(1 dblk + 1
byte), auto-deq");
- }
+// Enqueue, under transaction xid, a record whose payload is held externally,
+// so only a size is handed to the journal. The size is msg.size(); msg is not
+// a parameter here - presumably the fixture's scratch string (verify against
+// the class header). The call is re-issued for as long as
+// handle_jcntl_response() asks for a retry.
+void
+JournalSystemTests::enq_extern_txn_msg(rhm::journal::jcntl* jc, const string xid,
+        const bool transient)
+{
+    rhm::journal::data_tok* const tok = new rhm::journal::data_tok;
+    CPPUNIT_ASSERT_MESSAGE("Data Token heap intantiation failed.", NULL != tok);
- void EncodeTest_009()
- {
- runEncodeTest(10, 476, 476, false, 0, 0, false, false, 2, "10*(1 sblk exact
fit)");
- }
+    unsigned sleeps = 0;
+    bool retry = true;
+    while (retry)
+    {
+        const rhm::journal::iores res =
+            jc->enqueue_extern_txn_data_record(msg.size(), tok, xid, transient);
+        retry = handle_jcntl_response(res, jc, sleeps, tok);
+    }
+}
- void EncodeTest_010()
- {
- runEncodeTest(10, 476, 476, true, 0, 0, false, false, 2, "10*(1 sblk exact
fit), auto-deq");
- }
+// Dequeue the record with record id rid from journal jc. A fresh data token
+// is put into the ENQ write state and given that rid before being handed to
+// dequeue_data_record(); the call is re-issued for as long as
+// handle_jcntl_response() asks for a retry.
+void
+JournalSystemTests::deq_msg(rhm::journal::jcntl* jc, u_int64_t rid)
+{
+    rhm::journal::data_tok* const tok = new rhm::journal::data_tok;
+    CPPUNIT_ASSERT_MESSAGE("Data Token heap intantiation failed.", NULL != tok);
+    tok->set_wstate(rhm::journal::data_tok::ENQ);
+    tok->set_rid(rid);
- void EncodeTest_011()
- {
- runEncodeTest(10, 477, 477, false, 0, 0, false, false, 2, "10*(1 sblk + 1
byte)");
- }
+    unsigned sleeps = 0;
+    bool retry = true;
+    while (retry)
+    {
+        const rhm::journal::iores res = jc->dequeue_data_record(tok);
+        retry = handle_jcntl_response(res, jc, sleeps, tok);
+    }
+}
- void EncodeTest_012()
- {
- runEncodeTest(10, 477, 477, true, 0, 0, false, false, 2, "10*(1 sblk + 1
byte), auto-deq");
- }
+// Dequeue the record with record id rid from journal jc under transaction
+// xid. A fresh data token is put into the ENQ write state and given that rid
+// before being handed to dequeue_txn_data_record(); the call is re-issued for
+// as long as handle_jcntl_response() asks for a retry.
+void
+JournalSystemTests::deq_txn_msg(rhm::journal::jcntl* jc, u_int64_t rid, const string xid)
+{
+    rhm::journal::data_tok* const tok = new rhm::journal::data_tok;
+    CPPUNIT_ASSERT_MESSAGE("Data Token heap intantiation failed.", NULL != tok);
+    tok->set_wstate(rhm::journal::data_tok::ENQ);
+    tok->set_rid(rid);
- void EncodeTest_013()
- {
- runEncodeTest(8, 4060, 4060, false, 0, 0, false, false, 2, "8*(1/8
page)");
- }
+    unsigned sleeps = 0;
+    bool retry = true;
+    while (retry)
+    {
+        const rhm::journal::iores res = jc->dequeue_txn_data_record(tok, xid);
+        retry = handle_jcntl_response(res, jc, sleeps, tok);
+    }
+}
- void EncodeTest_014()
- {
- runEncodeTest(9, 4060, 4060, false, 0, 0, false, false, 2, "9*(1/8
page)");
- }
+// Write an abort for transaction xid to journal jc, re-issuing the call for
+// as long as handle_jcntl_response() asks for a retry.
+void
+JournalSystemTests::txn_abort(rhm::journal::jcntl* jc, const string xid)
+{
+    rhm::journal::data_tok* const tok = new rhm::journal::data_tok;
+    CPPUNIT_ASSERT_MESSAGE("Data Token heap intantiation failed.", NULL != tok);
- void EncodeTest_015()
- {
- runEncodeTest(8, 4061, 4061, false, 0, 0, false, false, 2, "8*(1/8 page + 1
byte)");
- }
+    unsigned sleeps = 0;
+    bool retry = true;
+    while (retry)
+    {
+        const rhm::journal::iores res = jc->txn_abort(tok, xid);
+        retry = handle_jcntl_response(res, jc, sleeps, tok);
+    }
+}
- void EncodeTest_016()
- {
- runEncodeTest(8, 3932, 3932, true, 0, 0, false, false, 2,
- "8*(1/8 page - 1 dblk for deq record), auto-deq");
- }
+// Write a commit for transaction xid to journal jc, re-issuing the call for
+// as long as handle_jcntl_response() asks for a retry.
+void
+JournalSystemTests::txn_commit(rhm::journal::jcntl* jc, const string xid)
+{
+    rhm::journal::data_tok* const tok = new rhm::journal::data_tok;
+    CPPUNIT_ASSERT_MESSAGE("Data Token heap intantiation failed.", NULL != tok);
- void EncodeTest_017()
- {
- runEncodeTest(9, 3932, 3932, true, 0, 0, false, false, 2,
- "9*(1/8 page - 1 dblk for deq record), auto-deq");
- }
+    unsigned sleeps = 0;
+    bool retry = true;
+    while (retry)
+    {
+        const rhm::journal::iores res = jc->txn_commit(tok, xid);
+        retry = handle_jcntl_response(res, jc, sleeps, tok);
+    }
+}
- void EncodeTest_018()
- {
- runEncodeTest(8, 3933, 3933, true, 0, 0, false, false, 2,
- "8*(1/8 page - 1 dblk for deq record + 1 byte), auto-deq");
- }
+// Read the next record from journal jc into the fixture's mbuff/msize,
+// xidbuff/xidsize, transientFlag and externalFlag, re-issuing the read for as
+// long as handle_jcntl_response() asks for a retry. Returns mbuff cast to
+// char*. The buffers are released by the caller via cleanMessage().
+char*
+JournalSystemTests::read_msg(rhm::journal::jcntl* jc)
+{
+    rhm::journal::data_tok* const tok = new rhm::journal::data_tok;
+    CPPUNIT_ASSERT_MESSAGE("Data Token heap intantiation failed.", NULL != tok);
+    tok->set_wstate(rhm::journal::data_tok::ENQ);
- void EncodeTest_019()
- {
- runEncodeTest(32, 32732, 32732, false, 0, 0, false, false, 2, "32*(1 page
exact fit)");
- }
+    unsigned sleeps = 0;
+    bool retry = true;
+    while (retry)
+    {
+        const rhm::journal::iores res = jc->read_data_record(&mbuff, msize, &xidbuff,
+                xidsize, transientFlag, externalFlag, tok);
+        retry = handle_jcntl_response(res, jc, sleeps, tok);
+    }
+    return (char*)mbuff;
+}
- void EncodeTest_020()
+// Interpret the result of one journal call for the retry loops of the helper
+// functions. Returns false on RHM_IORES_SUCCESS (stop retrying). On
+// RHM_IORES_AIO_WAIT it counts the attempt in aio_sleep_cnt and, while the
+// count is within MAX_AIO_SLEEPS, polls jc->get_wr_events(), sleeps for
+// AIO_SLEEP_TIME and returns true (retry). A wait timeout or any other result
+// deletes dtp and fails the test through CPPUNIT_FAIL.
+bool
+JournalSystemTests::handle_jcntl_response(rhm::journal::iores res, rhm::journal::jcntl* jc,
+        unsigned& aio_sleep_cnt, rhm::journal::data_tok* dtp)
+{
+    // Set to the failure text when the result must abort the test.
+    const char* failure = NULL;
+    switch (res)
{
- runEncodeTest(33, 32732, 32732, false, 0, 0, false, false, 2, "33*(1 page
exact fit)");
+    case rhm::journal::RHM_IORES_SUCCESS:
+        return false;
+    case rhm::journal::RHM_IORES_AIO_WAIT:
+        if (++aio_sleep_cnt > MAX_AIO_SLEEPS)
+            failure = "Timeout on RHM_IORES_AIO_WAIT.";
+        else
+        {
+            jc->get_wr_events();
+            usleep(AIO_SLEEP_TIME);
+        }
+        break;
+    case rhm::journal::RHM_IORES_EMPTY:
+        failure = "RHM_IORES_EMPTY";
+        break;
+    case rhm::journal::RHM_IORES_FULL:
+        failure = "RHM_IORES_FULL";
+        break;
+    case rhm::journal::RHM_IORES_BUSY:
+        failure = "RHM_IORES_BUSY";
+        break;
+    case rhm::journal::RHM_IORES_TXPENDING:
+        failure = "RHM_IORES_TXPENDING";
+        break;
+    default:
+        failure = "unknown return value";
+        break;
}
+    if (failure != NULL)
+    {
+        delete dtp;
+        CPPUNIT_FAIL(failure);
+    }
+    return true;
+}
- void EncodeTest_021()
- {
- runEncodeTest(22, 49116, 49116, false, 0, 0, false, false, 2, "22*(1.5
pages)");
- }
+// static fn
+// Fill s with test message number msg_num and return a reference to it. The
+// text is "Message_" + msg_num zero-padded to at least 4 digits + "_",
+// followed by one digit character ('0' + position % 10) for every position
+// from 14 up to and including len.
+string&
+JournalSystemTests::create_msg(string& s, int msg_num, int len)
+{
+    stringstream header;
+    header << "Message_" << setfill('0') << setw(4) << msg_num << "_";
+    s = header.str();
+    for (int pos = 14; pos <= len; ++pos)
+        s.push_back((char)('0' + pos % 10));
+    return s;
+}
- void EncodeTest_022()
- {
- runEncodeTest(22, 48988, 48988, true, 0, 0, false, false, 2,
- "22*(1.5 pages - 1 dblk for deq record), auto-deq");
- }
+// static fn
+// Fill s with test transaction id number msg_num and return a reference to
+// it. The text is "XID_" + msg_num zero-padded to at least 4 digits + "_",
+// followed by one lower-case letter ('a' + position % 26) for every position
+// from 9 up to but excluding len.
+string&
+JournalSystemTests::create_xid(string& s, int msg_num, int len)
+{
+    stringstream header;
+    header << "XID_" << setfill('0') << setw(4) << msg_num << "_";
+    s = header.str();
+    for (int pos = 9; pos < len; ++pos)
+        s.push_back((char)('a' + pos % 26));
+    return s;
+}
- void EncodeTest_023()
+void JournalSystemTests::cleanMessage()
+{
+ if (xidbuff)
{
- runEncodeTest(48, 32732, 32732, false, 0, 0, false, false, 2, "48*(1 page
exact fit)");
+ free(xidbuff);
+ xidbuff = NULL;
+ mbuff = NULL;
}
-
- void EncodeTest_024()
+ else if (mbuff)
{
- runEncodeTest(49, 32732, 32732, false, 0, 0, false, false, 2, "49*(1 page
exact fit)");
+ free (mbuff);
+ mbuff = NULL;
}
-
- void EncodeTest_025()
- {
- runEncodeTest(20, 81884, 81884, false, 0, 0, false, false, 2, "20*(2.5
pages)");
- }
-
- void EncodeTest_026()
- {
- runEncodeTest(20, 81756, 81756, true, 0, 0, false, false, 2,
- "20*(2.5 pages - 1 dblk for deq record), auto-deq");
- }
-
- void EncodeTest_027()
- {
- runEncodeTest(16, 786268, 786268, true, 0, 0, false, false, 2,
- "16*(24 pages = 1/2 file); Total = 8 files exactly (full journal
filespace)");
- }
-
- void EncodeTest_028()
- {
- runEncodeTest(17, 786268, 786268, true, 0, 0, false, false, 2,
- "17*(24 pages = 1/2 file); Total = 8 files + file 0 overwritten by
1/2 file");
- }
-
-private:
-
- void enq_msg(rhm::journal::jcntl* jc, const std::string msg, const bool transient)
- {
- rhm::journal::data_tok* dtp = new rhm::journal::data_tok;
- CPPUNIT_ASSERT_MESSAGE("Data Token heap intantiation failed.", dtp !=
NULL);
-
- unsigned aio_sleep_cnt = 0;
- while (handle_jcntl_response(jc->enqueue_data_record(msg.c_str(), msg.size(),
msg.size(),
- dtp, transient), jc, aio_sleep_cnt, dtp));
- }
-
- void enq_extern_msg(rhm::journal::jcntl* jc, const bool transient)
- {
- rhm::journal::data_tok* dtp = new rhm::journal::data_tok;
- CPPUNIT_ASSERT_MESSAGE("Data Token heap intantiation failed.", dtp !=
NULL);
-
- unsigned aio_sleep_cnt = 0;
- while (handle_jcntl_response(jc->enqueue_extern_data_record(msg.size(),
- dtp, transient), jc, aio_sleep_cnt, dtp));
- }
-
- void enq_txn_msg(rhm::journal::jcntl* jc, const std::string msg, const std::string
xid,
- const bool transient)
- {
- rhm::journal::data_tok* dtp = new rhm::journal::data_tok;
- CPPUNIT_ASSERT_MESSAGE("Data Token heap intantiation failed.", dtp !=
NULL);
-
- unsigned aio_sleep_cnt = 0;
- while (handle_jcntl_response(jc->enqueue_txn_data_record(msg.c_str(),
msg.size(),
- msg.size(), dtp, xid, transient), jc, aio_sleep_cnt, dtp));
- }
-
- void enq_extern_txn_msg(rhm::journal::jcntl* jc, const std::string xid, const bool
transient)
- {
- rhm::journal::data_tok* dtp = new rhm::journal::data_tok;
- CPPUNIT_ASSERT_MESSAGE("Data Token heap intantiation failed.", dtp !=
NULL);
-
- unsigned aio_sleep_cnt = 0;
- while (handle_jcntl_response(jc->enqueue_extern_txn_data_record(msg.size(),
dtp, xid,
- transient), jc, aio_sleep_cnt, dtp));
- }
-
- void deq_msg(rhm::journal::jcntl* jc, u_int64_t rid)
- {
- rhm::journal::data_tok* dtp = new rhm::journal::data_tok;
- CPPUNIT_ASSERT_MESSAGE("Data Token heap intantiation failed.", dtp !=
NULL);
- dtp->set_wstate(rhm::journal::data_tok::ENQ);
- dtp->set_rid(rid);
-
- unsigned aio_sleep_cnt = 0;
- while (handle_jcntl_response(jc->dequeue_data_record(dtp), jc, aio_sleep_cnt,
dtp));
- }
-
- void deq_txn_msg(rhm::journal::jcntl* jc, u_int64_t rid, const std::string xid)
- {
- rhm::journal::data_tok* dtp = new rhm::journal::data_tok;
- CPPUNIT_ASSERT_MESSAGE("Data Token heap intantiation failed.", dtp !=
NULL);
- dtp->set_wstate(rhm::journal::data_tok::ENQ);
- dtp->set_rid(rid);
-
- unsigned aio_sleep_cnt = 0;
- while (handle_jcntl_response(jc->dequeue_txn_data_record(dtp, xid),
- jc, aio_sleep_cnt, dtp));
- }
-
- void txn_abort(rhm::journal::jcntl* jc, const std::string xid)
- {
- rhm::journal::data_tok* dtp = new rhm::journal::data_tok;
- CPPUNIT_ASSERT_MESSAGE("Data Token heap intantiation failed.", dtp !=
NULL);
-
- unsigned aio_sleep_cnt = 0;
- while (handle_jcntl_response(jc->txn_abort(dtp, xid), jc, aio_sleep_cnt,
dtp));
- }
-
- void txn_commit(rhm::journal::jcntl* jc, const std::string xid)
- {
- rhm::journal::data_tok* dtp = new rhm::journal::data_tok;
- CPPUNIT_ASSERT_MESSAGE("Data Token heap intantiation failed.", dtp !=
NULL);
-
- unsigned aio_sleep_cnt = 0;
- while (handle_jcntl_response(jc->txn_commit(dtp, xid), jc, aio_sleep_cnt,
dtp));
- }
-
- char* read_msg(rhm::journal::jcntl* jc)
- {
- rhm::journal::data_tok* dtp = new rhm::journal::data_tok;
- CPPUNIT_ASSERT_MESSAGE("Data Token heap intantiation failed.", dtp !=
NULL);
- dtp->set_wstate(rhm::journal::data_tok::ENQ);
-
- unsigned aio_sleep_cnt = 0;
- while (handle_jcntl_response(jc->read_data_record(&mbuff, msize,
&xidbuff, xidsize,
- transientFlag, externalFlag, dtp), jc, aio_sleep_cnt, dtp));
- return (char*)mbuff;
- }
-
- bool handle_jcntl_response(rhm::journal::iores res, rhm::journal::jcntl* jc,
- unsigned& aio_sleep_cnt, rhm::journal::data_tok* dtp)
- {
- switch (res)
- {
- case rhm::journal::RHM_IORES_SUCCESS:
- //((char*)mbuff)[msize] = '\0';
- return false;
- case rhm::journal::RHM_IORES_AIO_WAIT:
- if (++aio_sleep_cnt <= MAX_AIO_SLEEPS)
- {
- jc->get_wr_events();
- usleep(AIO_SLEEP_TIME);
- }
- else
- {
- delete dtp;
- CPPUNIT_FAIL("Timeout on RHM_IORES_AIO_WAIT.");
- }
- break;
- case rhm::journal::RHM_IORES_EMPTY:
- delete dtp;
- CPPUNIT_FAIL("RHM_IORES_EMPTY");
- case rhm::journal::RHM_IORES_FULL:
- delete dtp;
- CPPUNIT_FAIL("RHM_IORES_FULL");
- case rhm::journal::RHM_IORES_BUSY:
- delete dtp;
- CPPUNIT_FAIL("RHM_IORES_BUSY");
- case rhm::journal::RHM_IORES_TXPENDING:
- delete dtp;
- CPPUNIT_FAIL("RHM_IORES_TXPENDING");
- default:
- delete dtp;
- CPPUNIT_FAIL("unknown return value");
- }
- return true;
- }
-
- static std::string& create_msg(std::string& s, int msg_num, int len)
- {
- std::stringstream ss;
- ss << "Message_" << std::setfill('0') <<
std::setw(4) << msg_num << "_";
- for (int i=14; i<=len; i++)
- ss << (char)('0' + i%10);
- s.assign(ss.str());
- return s;
- }
-
- static std::string& create_xid(std::string& s, int msg_num, int len)
- {
- std::stringstream ss;
- ss << "XID_" << std::setfill('0') <<
std::setw(4) << msg_num << "_";
- for (int i=9; i<len; i++)
- ss << (char)('a' + i%26);
- s.assign(ss.str());
- return s;
- }
-
- void runEncodeTest(const unsigned num_msgs, const unsigned min_msg_size,
- const unsigned max_msg_szie, const bool auto_deq, const unsigned
min_xid_size,
- const unsigned max_xid_size, const bool transient, const bool external,
- const unsigned iterations, char* test_descr)
- {
- std::cout << " [" << test_descr << "] "
<< std::flush;
- jtest::targs ta(num_msgs, min_msg_size, max_msg_szie, auto_deq, min_xid_size,
- max_xid_size, transient, external, test_descr);
- for (unsigned i=0; i<iterations; i++)
- {
- std::cout << "." << std::flush;
- try
- {
- t.initialize(ta);
- t.run();
- }
- catch (rhm::journal::jexception e)
- {
- t.finalize();
- CPPUNIT_FAIL(e.to_string());
- }
- t.finalize();
- }
- }
-
- void cleanMessage()
- {
- if (xidbuff)
- {
- free(xidbuff);
- xidbuff = NULL;
- mbuff = NULL;
- }
- else if (mbuff)
- {
- free (mbuff);
- mbuff = NULL;
- }
- }
-};
-
-// Make this test suite a plugin.
-CPPUNIT_PLUGIN_IMPLEMENT();
-CPPUNIT_TEST_SUITE_REGISTRATION(JournalSystemTests);
+}