[rhmessaging-commits] rhmessaging commits: r1611 - in store/trunk/cpp: tests/jrnl/jtt and 1 other directory.
rhmessaging-commits at lists.jboss.org
rhmessaging-commits at lists.jboss.org
Fri Jan 25 14:39:33 EST 2008
Author: kpvdr
Date: 2008-01-25 14:39:32 -0500 (Fri, 25 Jan 2008)
New Revision: 1611
Modified:
store/trunk/cpp/lib/jrnl/jcntl.cpp
store/trunk/cpp/lib/jrnl/txn_map.cpp
store/trunk/cpp/lib/jrnl/txn_map.hpp
store/trunk/cpp/tests/jrnl/jtt/_ut_test_case_set.cpp
store/trunk/cpp/tests/jrnl/jtt/jfile_chk.py
store/trunk/cpp/tests/jrnl/jtt/jrnl_instance.cpp
store/trunk/cpp/tests/jrnl/jtt/test_case_set.cpp
store/trunk/cpp/tests/jrnl/jtt/test_case_set.hpp
store/trunk/cpp/tests/jrnl/jtt/test_mgr.cpp
Log:
Various fixes for recover, especially transactional recover, both in the journal and the test tool.
Modified: store/trunk/cpp/lib/jrnl/jcntl.cpp
===================================================================
--- store/trunk/cpp/lib/jrnl/jcntl.cpp 2008-01-25 16:50:04 UTC (rev 1610)
+++ store/trunk/cpp/lib/jrnl/jcntl.cpp 2008-01-25 19:39:32 UTC (rev 1611)
@@ -448,7 +448,11 @@
{
_wmgr.get_events(pmgr::UNUSED);
if (cnt++ > MAX_AIO_CMPL_SLEEPS)
+ {
+ // TODO: Log this!
+ std::cout << "**** JERR_JCNTL_AIOCMPLWAIT *** " << _wmgr.status_str() << std::endl;
throw jexception(jerrno::JERR_JCNTL_AIOCMPLWAIT, "jcntl", "handle_aio_wait");
+ }
::usleep(AIO_CMPL_SLEEP);
}
return true;
@@ -501,7 +505,8 @@
u_int16_t next_wr_fid = (rd._lfid + 1) % _num_jfiles;
if (rd._ffid == next_wr_fid && rd._enq_cnt_list[next_wr_fid])
rd._full = true;
-
+
+ // Remove all transactions not in prep_txn_list
std::vector<std::string> xid_list;
_tmap.xid_list(xid_list);
for (std::vector<std::string>::iterator itr = xid_list.begin(); itr != xid_list.end();
@@ -590,21 +595,27 @@
ar.get_xid(&xidp);
assert(xidp != 0);
std::string xid((char*)xidp, ar.xid_size());
- txn_data_list tdl = _tmap.get_remove_tdata_list(xid);
- for (tdl_itr itr = tdl.begin(); itr != tdl.end(); itr++)
+ try
{
- try
- {
- if (!itr->_enq_flag)
- _emap.unlock(itr->_drid);
- }
- catch(const jexception& e)
+ txn_data_list tdl = _tmap.get_remove_tdata_list(xid);
+ for (tdl_itr itr = tdl.begin(); itr != tdl.end(); itr++)
{
- if (e.err_code() != jerrno::JERR_MAP_NOTFOUND) throw;
+ if (itr->_enq_flag)
+ rd._enq_cnt_list[itr->_fid]--;
+ else
+ {
+ try { _emap.unlock(itr->_drid); }
+ catch(const jexception& e)
+ {
+ if (e.err_code() != jerrno::JERR_MAP_NOTFOUND) throw;
+ }
+ }
}
- if (itr->_enq_flag)
- rd._enq_cnt_list[itr->_fid]--;
}
+ catch (const jexception& e)
+ {
+ if (e.err_code() != jerrno::JERR_MAP_NOTFOUND) throw;
+ }
::free(xidp);
}
break;
@@ -617,17 +628,31 @@
cr.get_xid(&xidp);
assert(xidp != 0);
std::string xid((char*)xidp, cr.xid_size());
- txn_data_list tdl = _tmap.get_remove_tdata_list(xid);
- for (tdl_itr itr = tdl.begin(); itr != tdl.end(); itr++)
+ try
{
- if (itr->_enq_flag) // txn enqueue
- _emap.insert_fid(itr->_rid, itr->_fid);
- else // txn dequeue
+ txn_data_list tdl = _tmap.get_remove_tdata_list(xid);
+ for (tdl_itr itr = tdl.begin(); itr != tdl.end(); itr++)
{
- u_int16_t fid = _emap.get_remove_fid(itr->_drid, true);
- rd._enq_cnt_list[fid]--;
+ if (itr->_enq_flag) // txn enqueue
+ _emap.insert_fid(itr->_rid, itr->_fid);
+ else // txn dequeue
+ {
+ try
+ {
+ u_int16_t fid = _emap.get_remove_fid(itr->_drid, true);
+ rd._enq_cnt_list[fid]--;
+ }
+ catch (const jexception& e)
+ {
+ if (e.err_code() != jerrno::JERR_MAP_NOTFOUND) throw;
+ }
+ }
}
}
+ catch (const jexception& e)
+ {
+ if (e.err_code() != jerrno::JERR_MAP_NOTFOUND) throw;
+ }
::free(xidp);
}
break;
Modified: store/trunk/cpp/lib/jrnl/txn_map.cpp
===================================================================
--- store/trunk/cpp/lib/jrnl/txn_map.cpp 2008-01-25 16:50:04 UTC (rev 1610)
+++ store/trunk/cpp/lib/jrnl/txn_map.cpp 2008-01-25 19:39:32 UTC (rev 1611)
@@ -89,7 +89,7 @@
if (itr == _map.end()) // not found in map
{
std::ostringstream oss;
- oss << std::hex << "xid=\"" << xid << "\"";
+ oss << std::hex << "xid=" << xid_format(xid);
throw jexception(jerrno::JERR_MAP_NOTFOUND, oss.str(), "txn_map", "get_tdata_list");
}
return itr->second;
@@ -104,7 +104,7 @@
{
pthread_mutex_unlock(&_mutex);
std::ostringstream oss;
- oss << std::hex << "xid=\"" << xid << "\"";
+ oss << std::hex << "xid=" << xid_format(xid);
throw jexception(jerrno::JERR_MAP_NOTFOUND, oss.str(), "txn_map",
"get_remove_tdata_list");
}
@@ -134,7 +134,7 @@
if (itr == _map.end()) // not found in map
{
std::ostringstream oss;
- oss << std::hex << "xid=\"" << xid << "\"";
+ oss << std::hex << "xid=" << xid_format(xid);
throw jexception(jerrno::JERR_MAP_NOTFOUND, oss.str(), "txn_map", "get_rid_count");
}
return itr->second.size();
@@ -149,7 +149,7 @@
{
pthread_mutex_unlock(&_mutex);
std::ostringstream oss;
- oss << std::hex << "xid=\"" << xid << "\"";
+ oss << std::hex << "xid=" << xid_format(xid);
throw jexception(jerrno::JERR_MAP_NOTFOUND, oss.str(), "txn_map", "is_txn_synced");
}
bool is_synced = true;
@@ -190,7 +190,7 @@
if (ok && !found)
{
std::ostringstream oss;
- oss << std::hex << "xid=\"" << xid << "\" rid=" << rid;
+ oss << std::hex << "xid=" << xid_format(xid) << " rid=" << rid;
throw jexception(jerrno::JERR_MAP_NOTFOUND, oss.str(), "txn_map", "set_aio_compl");
}
return ok;
@@ -206,5 +206,17 @@
pthread_mutex_unlock(&_mutex);
}
+// static fn
+std::string
+txn_map::xid_format(const std::string& xid)
+{
+ if (xid.size() < 100)
+ return xid;
+ std::ostringstream oss;
+ oss << "\"" << xid.substr(0, 20) << " ... " << xid.substr(xid.size() - 20, 20);
+ oss << "\" (size: " << xid.size() << ")";
+ return oss.str();
+}
+
} // namespace journal
} // namespace rhm
Modified: store/trunk/cpp/lib/jrnl/txn_map.hpp
===================================================================
--- store/trunk/cpp/lib/jrnl/txn_map.hpp 2008-01-25 16:50:04 UTC (rev 1610)
+++ store/trunk/cpp/lib/jrnl/txn_map.hpp 2008-01-25 19:39:32 UTC (rev 1611)
@@ -129,6 +129,8 @@
inline const bool empty() const { return _map.empty(); }
inline const u_int16_t size() const { return (u_int16_t)_map.size(); }
void xid_list(std::vector<std::string>& xv);
+ private:
+ static std::string xid_format(const std::string& xid);
};
} // namespace journal
Modified: store/trunk/cpp/tests/jrnl/jtt/_ut_test_case_set.cpp
===================================================================
--- store/trunk/cpp/tests/jrnl/jtt/_ut_test_case_set.cpp 2008-01-25 16:50:04 UTC (rev 1610)
+++ store/trunk/cpp/tests/jrnl/jtt/_ut_test_case_set.cpp 2008-01-25 19:39:32 UTC (rev 1611)
@@ -111,10 +111,16 @@
test_case_set tcs;
BOOST_REQUIRE_MESSAGE(check_csv_file(csv_file.c_str()), "Test CSV file \"" << csv_file <<
"\" is missing.");
- unsigned num_tcs = tcs.append_from_csv(csv_file);
+ tcs.append_from_csv(csv_file, false);
BOOST_CHECK(!tcs.empty());
- BOOST_CHECK_EQUAL(num_tcs, unsigned(44));
BOOST_CHECK_EQUAL(tcs.size(), unsigned(44));
+ BOOST_CHECK_EQUAL(tcs.ignored(), unsigned(0));
+ tcs.clear();
+ BOOST_CHECK(tcs.empty());
+ tcs.append_from_csv(csv_file, true);
+ BOOST_CHECK(!tcs.empty());
+ BOOST_CHECK_EQUAL(tcs.size(), unsigned(18));
+ BOOST_CHECK_EQUAL(tcs.ignored(), unsigned(26));
}
Modified: store/trunk/cpp/tests/jrnl/jtt/jfile_chk.py
===================================================================
--- store/trunk/cpp/tests/jrnl/jtt/jfile_chk.py 2008-01-25 16:50:04 UTC (rev 1610)
+++ store/trunk/cpp/tests/jrnl/jtt/jfile_chk.py 2008-01-25 19:39:32 UTC (rev 1611)
@@ -119,7 +119,7 @@
def split_str(s):
if len(s) > 25:
- return s[:10] + ' ... ' + s[-10:]
+ return s[:12] + ' ... ' + s[-10:]
else:
return s
@@ -435,6 +435,7 @@
self.file_num = 0
self.fro = 0x200
self.enqueued = {}
+ self.rec_cnt = 0
self.msg_cnt = 0
self.fhdr = None
self.f = None
@@ -468,6 +469,8 @@
break
if hdr.check():
stop = True;
+ else:
+ self.rec_cnt += 1
if self.first_rec:
if self.fhdr.fro != hdr.foffs:
raise Exception('File header first record offset mismatch: fro=0x%08x; rec_offs=0x%08x' % (self.fhdr.fro, hdr.foffs))
@@ -569,7 +572,7 @@
tss = fhdr.timestamp_str()
owi_found = True
if not self.qflag:
- print ' %s: owi=%s rid=%d, fro=0x%08x ts=%s' % (jfn, fhdr.owi(), fhdr.rid, fhdr.fro, fhdr.timestamp_str())
+ print ' %s: owi=%s rid=0x%x, fro=0x%08x ts=%s' % (jfn, fhdr.owi(), fhdr.rid, fhdr.fro, fhdr.timestamp_str())
if fnum < 0 or rid < 0 or fro < 0:
raise Exception('All journal files empty')
if not self.qflag: print ' Oldest complete file: %s: rid=%d, fro=0x%08x ts=%s' % (fname, rid, fro, tss)
@@ -736,7 +739,7 @@
for h in self.enqueued:
print self.enqueued[h]
print 'WARNING: Enqueue-Dequeue mismatch, %d enqueued records remain.' % len(self.enqueued)
- print 'Test passed; %d records processed.' % self.msg_cnt
+ print '%d enqueues, %d journal records processed.' % (self.msg_cnt, self.rec_cnt)
#===============================================================================
Modified: store/trunk/cpp/tests/jrnl/jtt/jrnl_instance.cpp
===================================================================
--- store/trunk/cpp/tests/jrnl/jtt/jrnl_instance.cpp 2008-01-25 16:50:04 UTC (rev 1610)
+++ store/trunk/cpp/tests/jrnl/jtt/jrnl_instance.cpp 2008-01-25 19:39:32 UTC (rev 1611)
@@ -78,6 +78,8 @@
jrnl_instance::init_tc(test_case::shared_ptr& tcp, const bool recover_mode, const bool keep_jrnls)
throw ()
{
+ test_case_result::shared_ptr p(new test_case_result(_jpp->jid()));
+ _tcrp = p;
try
{
_tcp = tcp;
@@ -93,22 +95,19 @@
{
std::vector<std::string> prep_txn_list;
u_int64_t highest_rid;
- recover(0, 0, prep_txn_list, highest_rid);
+ recover(aio_rd_callback, aio_wr_callback, prep_txn_list, highest_rid);
recover_complete();
}
catch (const rhm::journal::jexception& e)
{
if (e.err_code() == rhm::journal::jerrno::JERR_JDIR_STAT)
- initialize(0, 0);
+ initialize(aio_rd_callback, aio_wr_callback);
else
throw;
}
}
else
initialize(aio_rd_callback, aio_wr_callback);
-
- test_case_result::shared_ptr p(new test_case_result(_jpp->jid()));
- _tcrp = p;
}
catch (const rhm::journal::jexception& e) { _tcrp->add_exception(e); }
catch (const std::exception& e) { _tcrp->add_exception(e.what()); }
@@ -189,7 +188,7 @@
{
std::ostringstream oss;
oss << "ERROR: Timeout waiting for journal \"" << _jid;
- oss << "\" to empty.";
+ oss << "\" to empty. (RHM_IORES_ENQCAPTHRESH)";
_tcrp->add_exception(oss.str());
}
else
Modified: store/trunk/cpp/tests/jrnl/jtt/test_case_set.cpp
===================================================================
--- store/trunk/cpp/tests/jrnl/jtt/test_case_set.cpp 2008-01-25 16:50:04 UTC (rev 1610)
+++ store/trunk/cpp/tests/jrnl/jtt/test_case_set.cpp 2008-01-25 19:39:32 UTC (rev 1611)
@@ -24,6 +24,7 @@
#include "test_case_set.hpp"
#include <fstream>
+#include <iostream>
namespace rhm
{
@@ -31,13 +32,16 @@
{
test_case_set::test_case_set():
- _tc_list()
+ _tc_list(),
+ _csv_ignored(0)
{}
-test_case_set::test_case_set(const std::string& csv_filename, const csv_map& cols):
- _tc_list()
+test_case_set::test_case_set(const std::string& csv_filename, const bool recover_mode,
+ const csv_map& cols):
+ _tc_list(),
+ _csv_ignored(0)
{
- append_from_csv(csv_filename, cols);
+ append_from_csv(csv_filename, recover_mode, cols);
}
test_case_set::~test_case_set()
@@ -57,10 +61,10 @@
#define CSV_BUFF_SIZE 2048
-unsigned
-test_case_set::append_from_csv(const std::string& csv_filename, const csv_map& cols)
+void
+test_case_set::append_from_csv(const std::string& csv_filename, const bool recover_mode,
+ const csv_map& cols)
{
- unsigned num_tc = 0;
char buff[CSV_BUFF_SIZE];
std::ifstream ifs(csv_filename.c_str());
while (ifs.good())
@@ -71,12 +75,13 @@
test_case::shared_ptr tcp = get_tc_from_csv(buff, cols);
if (tcp.get())
{
- append(tcp);
- num_tc++;
+ if (!recover_mode || tcp->auto_deq())
+ append(tcp);
+ else
+ _csv_ignored++;
}
}
}
- return num_tc;
}
test_case::shared_ptr
Modified: store/trunk/cpp/tests/jrnl/jtt/test_case_set.hpp
===================================================================
--- store/trunk/cpp/tests/jrnl/jtt/test_case_set.hpp 2008-01-25 16:50:04 UTC (rev 1610)
+++ store/trunk/cpp/tests/jrnl/jtt/test_case_set.hpp 2008-01-25 19:39:32 UTC (rev 1611)
@@ -64,13 +64,16 @@
private:
tcl _tc_list;
static bool _map_init;
+ unsigned _csv_ignored;
public:
test_case_set();
- test_case_set(const std::string& csv_filename, const csv_map& cols = std_csv_map);
+ test_case_set(const std::string& csv_filename, const bool recover_mode,
+ const csv_map& cols = std_csv_map);
virtual ~test_case_set();
inline const unsigned size() const { return _tc_list.size(); }
+ inline const unsigned ignored() const { return _csv_ignored; }
inline const bool empty() const { return _tc_list.empty(); }
inline void append(const test_case::shared_ptr& tc) { _tc_list.push_back(tc); }
@@ -79,10 +82,12 @@
const size_t min_xid_size, const size_t max_xid_size,
const test_case::transient_t transient,
const test_case::external_t external, const std::string& comment);
- unsigned append_from_csv(const std::string& csv_filename, const csv_map& cols = std_csv_map);
+ void append_from_csv(const std::string& csv_filename, const bool recover_mode,
+ const csv_map& cols = std_csv_map);
inline tcl_itr begin() { return _tc_list.begin(); }
inline tcl_itr end() { return _tc_list.end(); }
inline const test_case::shared_ptr& operator[](unsigned i) { return _tc_list[i]; }
+ inline void clear() { _tc_list.clear(); }
private:
test_case::shared_ptr get_tc_from_csv(const std::string& csv_line, const csv_map& cols);
Modified: store/trunk/cpp/tests/jrnl/jtt/test_mgr.cpp
===================================================================
--- store/trunk/cpp/tests/jrnl/jtt/test_mgr.cpp 2008-01-25 16:50:04 UTC (rev 1610)
+++ store/trunk/cpp/tests/jrnl/jtt/test_mgr.cpp 2008-01-25 19:39:32 UTC (rev 1611)
@@ -52,11 +52,15 @@
const bool summary = true;
std::cout << "CSV file: \"" << _opts.test_case_csv_file_name << "\"";
- test_case_set tcs(_opts.test_case_csv_file_name);
+ test_case_set tcs(_opts.test_case_csv_file_name, _opts.recover_mode);
const unsigned num_test_cases = tcs.size();
if (num_test_cases)
{
- std::cout << " (containing " << tcs.size() << " test cases)" << std::endl;
+ std::cout << " (found " << tcs.size() << " test case" << (tcs.size() != 1 ? "s" : "") <<
+ ")" << std::endl;
+ if (tcs.ignored())
+ std::cout << "WARNING: " << tcs.ignored() << " test cases were ignored. "
+ "(recover-mode selected and test has no auto-dequeue.)" << std::endl;
_opts.print_flags();
}
else
@@ -132,7 +136,7 @@
{
std::ostringstream jid;
jid << std::hex << std::setfill('0');
- jid << "test_" << std::setw(2) << std::hex << i;
+ jid << "test_" << std::setw(4) << std::hex << i;
std::ostringstream jdir;
jdir << "/tmp/" << jid.str();
jrnl_init_params::shared_ptr jpp(new jrnl_init_params(jid.str(), jdir.str(), jid.str()));
More information about the rhmessaging-commits
mailing list