[rhmessaging-commits] rhmessaging commits: r1765 - in store/trunk/cpp: tests/jrnl and 1 other directory.
rhmessaging-commits at lists.jboss.org
rhmessaging-commits at lists.jboss.org
Thu Mar 6 16:14:08 EST 2008
Author: kpvdr
Date: 2008-03-06 16:14:07 -0500 (Thu, 06 Mar 2008)
New Revision: 1765
Modified:
store/trunk/cpp/lib/jrnl/rmgr.cpp
store/trunk/cpp/tests/jrnl/_st_read_txn.cpp
Log:
Bugfix for transactional reads; also some new transactional read tests
Modified: store/trunk/cpp/lib/jrnl/rmgr.cpp
===================================================================
--- store/trunk/cpp/lib/jrnl/rmgr.cpp 2008-03-06 18:25:07 UTC (rev 1764)
+++ store/trunk/cpp/lib/jrnl/rmgr.cpp 2008-03-06 21:14:07 UTC (rev 1765)
@@ -274,50 +274,37 @@
bool is_enq = false;
try
{
- fid = _emap.get_fid(_hdr._rid);
+ fid = _emap.get_fid(_hdr._rid); // If locked, will throw JERR_MAP_LOCKED
is_enq = true;
}
catch (const jexception& e)
{
// Block read for transactionally locked record (only when not recovering)
if (e.err_code() == jerrno::JERR_MAP_LOCKED && !_jc->is_read_only())
- throw;
-
- // (Recover mode only) Ok, not in emap - now search tmap, if present then read
- if (_jc->is_read_only())
- {
- std::vector<std::string> xid_list;
- _tmap.xid_list(xid_list);
- for (std::vector<std::string>::iterator itr = xid_list.begin();
- itr != xid_list.end() && !is_enq; itr++)
- {
- txn_data_list tx_list = _tmap.get_tdata_list(*itr);
- for (tdl_itr ditr = tx_list.begin(); ditr != tx_list.end() && !is_enq;
- ditr++)
- {
- if (ditr->_enq_flag)
- is_enq = ditr->_rid == _hdr._rid;
- else
- is_enq = ditr->_drid == _hdr._rid;
- }
- }
+ return RHM_IORES_TXPENDING;
+
+ // (Recover mode only) Ok, not in emap - now search tmap, if present then read
+ std::vector<std::string> xid_list;
+ _tmap.xid_list(xid_list);
+ for (std::vector<std::string>::iterator itr = xid_list.begin();
+ itr != xid_list.end() && !is_enq; itr++)
+ {
+ txn_data_list tx_list = _tmap.get_tdata_list(*itr);
+ for (tdl_itr ditr = tx_list.begin(); ditr != tx_list.end() && !is_enq;
+ ditr++)
+ {
+ if (ditr->_enq_flag)
+ is_enq = ditr->_rid == _hdr._rid;
+ else
+ is_enq = ditr->_drid == _hdr._rid;
+ }
}
+ if (!_jc->is_read_only() && is_enq)
+ return RHM_IORES_TXPENDING;
}
#endif
if (is_enq) // ok, this record is enqueued, check it, then read it...
{
-#if !(defined(RHM_WRONLY) || defined(RHM_RDONLY))
- // Is this locked by a pending dequeue transaction?
- try
- {
- if (_emap.is_locked(_hdr._rid) && !_jc->is_read_only())
- return RHM_IORES_TXPENDING;
- }
- catch (const jexception& e)
- {
- if (e.err_code() != jerrno::JERR_MAP_NOTFOUND) throw;
- }
-#endif
if (dtokp->rid())
{
if (_hdr._rid != dtokp->rid())
Modified: store/trunk/cpp/tests/jrnl/_st_read_txn.cpp
===================================================================
--- store/trunk/cpp/tests/jrnl/_st_read_txn.cpp 2008-03-06 18:25:07 UTC (rev 1764)
+++ store/trunk/cpp/tests/jrnl/_st_read_txn.cpp 2008-03-06 21:14:07 UTC (rev 1765)
@@ -46,9 +46,9 @@
// === Test suite ===
-QPID_AUTO_TEST_CASE(enqueue_commit_block)
+QPID_AUTO_TEST_CASE(tx_enqueue_commit_block)
{
- string test_name = get_test_name(test_filename, "enqueue_commit_block");
+ string test_name = get_test_name(test_filename, "tx_enqueue_commit_block");
try
{
string msg;
@@ -57,33 +57,32 @@
string rxid;
bool transientFlag;
bool externalFlag;
-
jcntl jc(test_name, JRNL_DIR, test_name, NUM_TEST_JFILES, TEST_JFSIZE_SBLKS);
jrnl_init(jc);
create_xid(xid, 0, XID_SIZE);
for (int m=0; m<NUM_MSGS; m++)
enq_txn_msg(jc, m, create_msg(msg, m, MSG_SIZE), xid, false);
jc.flush();
- read_msg(jc, rmsg, rxid, transientFlag, externalFlag, RHM_IORES_EMPTY);
+ read_msg(jc, rmsg, rxid, transientFlag, externalFlag, RHM_IORES_TXPENDING);
txn_commit(jc, xid);
jc.flush();
- cout << "[NOTE: Partially disabled until outstanding issue(s) resolved.] ";
-// for (int m=0; m<NUM_MSGS; m++)
-// {
-// read_msg(jc, rmsg, rxid, transientFlag, externalFlag);
-// BOOST_CHECK_EQUAL(create_msg(msg, m, MSG_SIZE), rmsg);
-// BOOST_CHECK_EQUAL(rxid, xid);
-// BOOST_CHECK_EQUAL(transientFlag, false);
-// BOOST_CHECK_EQUAL(externalFlag, false);
-// }
+ for (int m=0; m<NUM_MSGS; m++)
+ {
+ read_msg(jc, rmsg, rxid, transientFlag, externalFlag);
+ BOOST_CHECK_EQUAL(create_msg(msg, m, MSG_SIZE), rmsg);
+ BOOST_CHECK_EQUAL(rxid, xid);
+ BOOST_CHECK_EQUAL(transientFlag, false);
+ BOOST_CHECK_EQUAL(externalFlag, false);
+ }
+ read_msg(jc, rmsg, rxid, transientFlag, externalFlag, RHM_IORES_EMPTY);
}
catch(const exception& e) { BOOST_FAIL(e.what()); }
cout << "ok" << endl;
}
-QPID_AUTO_TEST_CASE(enqueue_commit_dequeue_block)
+QPID_AUTO_TEST_CASE(tx_enqueue_commit_interleaved)
{
- string test_name = get_test_name(test_filename, "enqueue_commit_dequeue_block");
+ string test_name = get_test_name(test_filename, "tx_enqueue_commit_interleaved");
try
{
string msg;
@@ -95,22 +94,28 @@
jcntl jc(test_name, JRNL_DIR, test_name, NUM_TEST_JFILES, TEST_JFSIZE_SBLKS);
jrnl_init(jc);
- create_xid(xid, 0, XID_SIZE);
for (int m=0; m<NUM_MSGS; m++)
- enq_txn_msg(jc, m, create_msg(msg, m, MSG_SIZE), xid, false);
- txn_commit(jc, xid);
- for (int m=0; m<NUM_MSGS; m++)
- deq_msg(jc, m);
- jc.flush();
- read_msg(jc, rmsg, rxid, transientFlag, externalFlag, RHM_IORES_EMPTY);
+ {
+ create_xid(xid, m, XID_SIZE);
+ enq_txn_msg(jc, 3*m, create_msg(msg, m, MSG_SIZE), xid, false);
+ jc.flush();
+ read_msg(jc, rmsg, rxid, transientFlag, externalFlag, RHM_IORES_TXPENDING);
+ txn_commit(jc, xid);
+ jc.flush();
+ read_msg(jc, rmsg, rxid, transientFlag, externalFlag);
+ BOOST_CHECK_EQUAL(create_msg(msg, m, MSG_SIZE), rmsg);
+ BOOST_CHECK_EQUAL(rxid, xid);
+ BOOST_CHECK_EQUAL(transientFlag, false);
+ BOOST_CHECK_EQUAL(externalFlag, false);
+ }
}
catch(const exception& e) { BOOST_FAIL(e.what()); }
cout << "ok" << endl;
}
-QPID_AUTO_TEST_CASE(enqueue_abort_block)
+QPID_AUTO_TEST_CASE(tx_enqueue_abort_block)
{
- string test_name = get_test_name(test_filename, "enqueue_abort_block");
+ string test_name = get_test_name(test_filename, "tx_enqueue_abort_block");
try
{
string msg;
@@ -119,14 +124,13 @@
string rxid;
bool transientFlag;
bool externalFlag;
-
jcntl jc(test_name, JRNL_DIR, test_name, NUM_TEST_JFILES, TEST_JFSIZE_SBLKS);
jrnl_init(jc);
- create_xid(xid, 0, XID_SIZE);
+ create_xid(xid, 1, XID_SIZE);
for (int m=0; m<NUM_MSGS; m++)
enq_txn_msg(jc, m, create_msg(msg, m, MSG_SIZE), xid, false);
jc.flush();
- read_msg(jc, rmsg, rxid, transientFlag, externalFlag, RHM_IORES_EMPTY);
+ read_msg(jc, rmsg, rxid, transientFlag, externalFlag, RHM_IORES_TXPENDING);
txn_abort(jc, xid);
jc.flush();
read_msg(jc, rmsg, rxid, transientFlag, externalFlag, RHM_IORES_EMPTY);
@@ -135,9 +139,9 @@
cout << "ok" << endl;
}
-QPID_AUTO_TEST_CASE(enqueue_commit_interleaved)
+QPID_AUTO_TEST_CASE(tx_enqueue_abort_interleaved)
{
- string test_name = get_test_name(test_filename, "enqueue_commit_interleaved");
+ string test_name = get_test_name(test_filename, "tx_enqueue_abort_interleaved");
try
{
string msg;
@@ -149,29 +153,24 @@
jcntl jc(test_name, JRNL_DIR, test_name, NUM_TEST_JFILES, TEST_JFSIZE_SBLKS);
jrnl_init(jc);
- cout << "[NOTE: Partially disabled until outstanding issue(s) resolved.] ";
for (int m=0; m<NUM_MSGS; m++)
{
create_xid(xid, m, XID_SIZE);
enq_txn_msg(jc, 3*m, create_msg(msg, m, MSG_SIZE), xid, false);
jc.flush();
+ read_msg(jc, rmsg, rxid, transientFlag, externalFlag, RHM_IORES_TXPENDING);
+ txn_abort(jc, xid);
+ jc.flush();
read_msg(jc, rmsg, rxid, transientFlag, externalFlag, RHM_IORES_EMPTY);
- txn_commit(jc, xid);
-// jc.flush();
-// read_msg(jc, rmsg, rxid, transientFlag, externalFlag);
-// BOOST_CHECK_EQUAL(create_msg(msg, m, MSG_SIZE), rmsg);
-// BOOST_CHECK_EQUAL(rxid, xid);
-// BOOST_CHECK_EQUAL(transientFlag, false);
-// BOOST_CHECK_EQUAL(externalFlag, false);
}
}
catch(const exception& e) { BOOST_FAIL(e.what()); }
cout << "ok" << endl;
}
-QPID_AUTO_TEST_CASE(enqueue_commit_dequeue_interleaved)
+QPID_AUTO_TEST_CASE(tx_enqueue_commit_dequeue_block)
{
- string test_name = get_test_name(test_filename, "enqueue_commit_dequeue_interleaved");
+ string test_name = get_test_name(test_filename, "tx_enqueue_commit_dequeue_block");
try
{
string msg;
@@ -183,7 +182,34 @@
jcntl jc(test_name, JRNL_DIR, test_name, NUM_TEST_JFILES, TEST_JFSIZE_SBLKS);
jrnl_init(jc);
+ create_xid(xid, 2, XID_SIZE);
for (int m=0; m<NUM_MSGS; m++)
+ enq_txn_msg(jc, m, create_msg(msg, m, MSG_SIZE), xid, false);
+ txn_commit(jc, xid);
+ for (int m=0; m<NUM_MSGS; m++)
+ deq_msg(jc, m);
+ jc.flush();
+ read_msg(jc, rmsg, rxid, transientFlag, externalFlag, RHM_IORES_EMPTY);
+ }
+ catch(const exception& e) { BOOST_FAIL(e.what()); }
+ cout << "ok" << endl;
+}
+
+QPID_AUTO_TEST_CASE(tx_enqueue_commit_dequeue_interleaved)
+{
+ string test_name = get_test_name(test_filename, "tx_enqueue_commit_dequeue_interleaved");
+ try
+ {
+ string msg;
+ string xid;
+ string rmsg;
+ string rxid;
+ bool transientFlag;
+ bool externalFlag;
+
+ jcntl jc(test_name, JRNL_DIR, test_name, NUM_TEST_JFILES, TEST_JFSIZE_SBLKS);
+ jrnl_init(jc);
+ for (int m=0; m<NUM_MSGS; m++)
{
create_xid(xid, m, XID_SIZE);
enq_txn_msg(jc, 3*m, create_msg(msg, m, MSG_SIZE), xid, false);
@@ -197,9 +223,9 @@
cout << "ok" << endl;
}
-QPID_AUTO_TEST_CASE(enqueue_abort_interleaved)
+QPID_AUTO_TEST_CASE(enqueue_tx_dequeue_commit_block)
{
- string test_name = get_test_name(test_filename, "enqueue_abort_interleaved");
+ string test_name = get_test_name(test_filename, "enqueue_tx_dequeue_commit_block");
try
{
string msg;
@@ -209,16 +235,114 @@
bool transientFlag;
bool externalFlag;
+ create_xid(xid, 3, XID_SIZE);
jcntl jc(test_name, JRNL_DIR, test_name, NUM_TEST_JFILES, TEST_JFSIZE_SBLKS);
jrnl_init(jc);
for (int m=0; m<NUM_MSGS; m++)
+ enq_msg(jc, m, create_msg(msg, m, MSG_SIZE), false);
+ for (int m=0; m<NUM_MSGS; m++)
+ deq_txn_msg(jc, m, xid);
+ jc.flush();
+ read_msg(jc, rmsg, rxid, transientFlag, externalFlag, RHM_IORES_TXPENDING);
+ txn_commit(jc, xid);
+ jc.flush();
+ read_msg(jc, rmsg, rxid, transientFlag, externalFlag, RHM_IORES_EMPTY);
+ }
+ catch(const exception& e) { BOOST_FAIL(e.what()); }
+ cout << "ok" << endl;
+}
+
+QPID_AUTO_TEST_CASE(enqueue_tx_dequeue_commit_interleaved)
+{
+ string test_name = get_test_name(test_filename, "enqueue_tx_dequeue_commit_interleaved");
+ try
+ {
+ string msg;
+ string xid;
+ string rmsg;
+ string rxid;
+ bool transientFlag;
+ bool externalFlag;
+
+ jcntl jc(test_name, JRNL_DIR, test_name, NUM_TEST_JFILES, TEST_JFSIZE_SBLKS);
+ jrnl_init(jc);
+ for (int m=0; m<3*NUM_MSGS; m+=3)
{
+ enq_msg(jc, m, create_msg(msg, m, MSG_SIZE), false);
create_xid(xid, m, XID_SIZE);
- enq_txn_msg(jc, 3*m, create_msg(msg, m, MSG_SIZE), xid, false);
+ deq_txn_msg(jc, m, xid);
jc.flush();
+ read_msg(jc, rmsg, rxid, transientFlag, externalFlag, RHM_IORES_TXPENDING);
+ txn_commit(jc, xid);
+ jc.flush();
read_msg(jc, rmsg, rxid, transientFlag, externalFlag, RHM_IORES_EMPTY);
+ }
+ }
+ catch(const exception& e) { BOOST_FAIL(e.what()); }
+ cout << "ok" << endl;
+}
+
+QPID_AUTO_TEST_CASE(enqueue_tx_dequeue_abort_block)
+{
+ string test_name = get_test_name(test_filename, "enqueue_tx_dequeue_abort_block");
+ try
+ {
+ string msg;
+ string xid;
+ string rmsg;
+ string rxid;
+ bool transientFlag;
+ bool externalFlag;
+
+ create_xid(xid, 4, XID_SIZE);
+ jcntl jc(test_name, JRNL_DIR, test_name, NUM_TEST_JFILES, TEST_JFSIZE_SBLKS);
+ jrnl_init(jc);
+ for (int m=0; m<NUM_MSGS; m++)
+ enq_msg(jc, m, create_msg(msg, m, MSG_SIZE), false);
+ for (int m=0; m<NUM_MSGS; m++)
+ deq_txn_msg(jc, m, xid);
+ jc.flush();
+ read_msg(jc, rmsg, rxid, transientFlag, externalFlag, RHM_IORES_TXPENDING);
+ txn_abort(jc, xid);
+ jc.flush();
+ for (int m=0; m<NUM_MSGS; m++)
+ {
+ read_msg(jc, rmsg, rxid, transientFlag, externalFlag);
+ BOOST_CHECK_EQUAL(create_msg(msg, m, MSG_SIZE), rmsg);
+ BOOST_CHECK_EQUAL(rxid.length(), size_t(0));
+ BOOST_CHECK_EQUAL(transientFlag, false);
+ BOOST_CHECK_EQUAL(externalFlag, false);
+ }
+ read_msg(jc, rmsg, rxid, transientFlag, externalFlag, RHM_IORES_EMPTY);
+ }
+ catch(const exception& e) { BOOST_FAIL(e.what()); }
+ cout << "ok" << endl;
+}
+
+QPID_AUTO_TEST_CASE(enqueue_tx_dequeue_abort_interleaved)
+{
+ string test_name = get_test_name(test_filename, "enqueue_tx_dequeue_abort_interleaved");
+ try
+ {
+ string msg;
+ string xid;
+ string rmsg;
+ string rxid;
+ bool transientFlag;
+ bool externalFlag;
+
+ jcntl jc(test_name, JRNL_DIR, test_name, NUM_TEST_JFILES, TEST_JFSIZE_SBLKS);
+ jrnl_init(jc);
+ for (int m=0; m<3*NUM_MSGS; m+=3)
+ {
+ enq_msg(jc, m, create_msg(msg, m, MSG_SIZE), false);
+ create_xid(xid, m, XID_SIZE);
+ deq_txn_msg(jc, m, xid);
+ jc.flush();
+ read_msg(jc, rmsg, rxid, transientFlag, externalFlag, RHM_IORES_TXPENDING);
txn_abort(jc, xid);
jc.flush();
+ read_msg(jc, rmsg, rxid, transientFlag, externalFlag);
read_msg(jc, rmsg, rxid, transientFlag, externalFlag, RHM_IORES_EMPTY);
}
}
More information about the rhmessaging-commits
mailing list