Author: kpvdr
Date: 2007-10-30 16:55:56 -0400 (Tue, 30 Oct 2007)
New Revision: 1187
Modified:
store/trunk/cpp/lib/BdbMessageStore.cpp
store/trunk/cpp/lib/JournalImpl.cpp
store/trunk/cpp/lib/TxnCtxt.h
store/trunk/cpp/lib/jrnl/jcntl.cpp
store/trunk/cpp/lib/jrnl/rmgr.cpp
store/trunk/cpp/lib/jrnl/txn_map.cpp
store/trunk/cpp/lib/jrnl/txn_map.hpp
store/trunk/cpp/tests/persistence.py
store/trunk/cpp/tests/system_test.sh
Log:
Added journal and bdb modes to system tests, both now pass.
Modified: store/trunk/cpp/lib/BdbMessageStore.cpp
===================================================================
--- store/trunk/cpp/lib/BdbMessageStore.cpp 2007-10-30 20:54:29 UTC (rev 1186)
+++ store/trunk/cpp/lib/BdbMessageStore.cpp 2007-10-30 20:55:56 UTC (rev 1187)
@@ -335,7 +335,8 @@
jQueue->recover(prepared, key.id); // start recovery
recoverMessages(txn, registry, queue, prepared, messages);
jQueue->recover_complete(); // start journal.
- } catch (journal::jexception& e) {
+ } catch (const journal::jexception& e) {
+std::cout << e << std::flush;
std::string s;
THROW_STORE_EXCEPTION(e.to_string(s) + queueName);
}
@@ -349,7 +350,6 @@
if (!usingJrnl()) //read all messages:
recoverMessages(txn, registry, queue_index, prepared, messages);
-
}
Modified: store/trunk/cpp/lib/JournalImpl.cpp
===================================================================
--- store/trunk/cpp/lib/JournalImpl.cpp 2007-10-30 20:54:29 UTC (rev 1186)
+++ store/trunk/cpp/lib/JournalImpl.cpp 2007-10-30 20:55:56 UTC (rev 1187)
@@ -22,6 +22,7 @@
*/
#include "JournalImpl.h"
+#include "jrnl/jerrno.hpp"
using namespace rhm::bdbstore;
@@ -52,14 +53,20 @@
// Populate PreparedTransaction lists from _tmap
for (bdbstore::PreparedTransaction::list::iterator i = prep_tx_list.begin();
i != prep_tx_list.end(); i++) {
- journal::txn_data_list tdl = _tmap.get_tdata_list(i->xid);
- assert(tdl.size()); // should never be empty
- for (journal::tdl_itr tdl_itr = tdl.begin(); tdl_itr < tdl.end(); tdl_itr++)
{
- if (tdl_itr->_enq_flag) { // enqueue op
- i->enqueues->add(queue_id, tdl_itr->_rid);
- } else { // dequeue op
- i->dequeues->add(queue_id, tdl_itr->_drid);
+ try {
+ journal::txn_data_list tdl = _tmap.get_tdata_list(i->xid);
+ assert(tdl.size()); // should never be empty
+ for (journal::tdl_itr tdl_itr = tdl.begin(); tdl_itr < tdl.end();
tdl_itr++) {
+ if (tdl_itr->_enq_flag) { // enqueue op
+ i->enqueues->add(queue_id, tdl_itr->_rid);
+ } else { // dequeue op
+ i->dequeues->add(queue_id, tdl_itr->_drid);
+ }
}
}
+ catch (const journal::jexception& e) {
+ if (e.err_code() != journal::jerrno::JERR_MAP_NOTFOUND)
+ throw e;
+ }
}
}
Modified: store/trunk/cpp/lib/TxnCtxt.h
===================================================================
--- store/trunk/cpp/lib/TxnCtxt.h 2007-10-30 20:54:29 UTC (rev 1186)
+++ store/trunk/cpp/lib/TxnCtxt.h 2007-10-30 20:55:56 UTC (rev 1187)
@@ -90,7 +90,7 @@
public:
TxnCtxt(IdSequence* _loggedtx=NULL) : loggedtx(_loggedtx), txn(0) {
- if (loggedtx) tid = "rhm-tid" + getCount();
+ if (loggedtx){ tid.assign( "rhm-tid"); tid+=getCount(); }
}
/**
Modified: store/trunk/cpp/lib/jrnl/jcntl.cpp
===================================================================
--- store/trunk/cpp/lib/jrnl/jcntl.cpp 2007-10-30 20:54:29 UTC (rev 1186)
+++ store/trunk/cpp/lib/jrnl/jcntl.cpp 2007-10-30 20:55:56 UTC (rev 1187)
@@ -199,7 +199,7 @@
const bool transient) throw (jexception)
{
check_wstatus("enqueue_tx_data_record");
- return _wmgr.enqueue(data_buff, tot_data_len, this_data_len, dtokp, xid.c_str(),
xid.size(),
+ return _wmgr.enqueue(data_buff, tot_data_len, this_data_len, dtokp, xid.data(),
xid.size(),
transient);
}
@@ -237,21 +237,21 @@
jcntl::dequeue_txn_data_record(data_tok* const dtokp, const std::string& xid) throw
(jexception)
{
check_wstatus("dequeue_data");
- return _wmgr.dequeue(dtokp, xid.c_str(), xid.size());
+ return _wmgr.dequeue(dtokp, xid.data(), xid.size());
}
const iores
jcntl::txn_abort(data_tok* const dtokp, const std::string& xid) throw (jexception)
{
check_wstatus("txn_abort");
- return _wmgr.abort(dtokp, xid.c_str(), xid.size());
+ return _wmgr.abort(dtokp, xid.data(), xid.size());
}
const iores
jcntl::txn_commit(data_tok* const dtokp, const std::string& xid) throw (jexception)
{
check_wstatus("txn_commit");
- return _wmgr.commit(dtokp, xid.c_str(), xid.size());
+ return _wmgr.commit(dtokp, xid.data(), xid.size());
}
const bool
Modified: store/trunk/cpp/lib/jrnl/rmgr.cpp
===================================================================
--- store/trunk/cpp/lib/jrnl/rmgr.cpp 2007-10-30 20:54:29 UTC (rev 1186)
+++ store/trunk/cpp/lib/jrnl/rmgr.cpp 2007-10-30 20:55:56 UTC (rev 1187)
@@ -215,7 +215,7 @@
rmgr::read(void** const datapp, size_t& dsize, void** const xidpp, size_t&
xidsize, bool& transient,
data_tok* dtokp) throw (jexception)
{
-//std::cout << " rmgr::read()" << std::flush;
+//std::cout << " rmgr::read() ro=" <<
(_jc->is_read_only()?"T":"F") << std::flush;
iores res = pre_read_check(dtokp);
if (res != RHM_IORES_SUCCESS)
@@ -281,34 +281,44 @@
bool is_enq = false;
try
{
-//std::cout << " ?" << _hdr._rid << std::flush;
+//std::cout << " rid=" << _hdr._rid << std::flush;
fid = _emap.get_fid(_hdr._rid);
-//std::cout << "-ok" << std::flush;
+//std::cout << ":ok" << std::flush;
is_enq = true;
}
catch (jexception& e)
{
+ // Block read for transactionally locked record (only when not
recovering)
if (e.err_code() == jerrno::JERR_MAP_LOCKED &&
!_jc->is_read_only())
throw e;
+//if (e.err_code() == jerrno::JERR_MAP_LOCKED) std::cout << ":locked"
<< std::flush;
- // Ok, not in emap, now search tmap for recover
+ // (Recover mode only) Ok, not in emap - now search tmap, if present then read
if (_jc->is_read_only())
{
std::vector<std::string> xid_list;
_tmap.xid_list(xid_list);
- for (std::vector<std::string>::iterator itr = xid_list.begin(); itr !=
xid_list.end() && !is_enq; itr++)
+ for (std::vector<std::string>::iterator itr = xid_list.begin();
+ itr != xid_list.end() && !is_enq; itr++)
{
+//std::cout << "[xid=" << *itr << ":" <<
std::flush;
txn_data_list tx_list = _tmap.get_tdata_list(*itr);
- for (tdl_itr ditr = tx_list.begin(); ditr != tx_list.end() && !is_enq;
ditr++)
+ for (tdl_itr ditr = tx_list.begin(); ditr != tx_list.end() && !is_enq;
+ ditr++)
{
- if (ditr->_rid == _hdr._rid)
- is_enq = true;
+//std::cout << (ditr->_enq_flag?"enq":"deq") <<
",rid=" << ditr->_rid << std::flush;
+//if (!ditr->_enq_flag) std::cout << ",drid=" << ditr->_drid
<< std::flush;
+ if (ditr->_enq_flag)
+ is_enq = ditr->_rid == _hdr._rid;
+ else
+ is_enq = ditr->_drid == _hdr._rid;
}
+//std::cout << "]" << std::flush;
}
- }
-//std::cout << "-nf" << std::flush;
+ }
}
#endif
+//std::cout << (is_enq?":enq":":not-enq") << std::flush;
if (is_enq) // ok, this record is enqueued, check it, then read it...
{
//std::cout << "e" << std::flush;
Modified: store/trunk/cpp/lib/jrnl/txn_map.cpp
===================================================================
--- store/trunk/cpp/lib/jrnl/txn_map.cpp 2007-10-30 20:54:29 UTC (rev 1186)
+++ store/trunk/cpp/lib/jrnl/txn_map.cpp 2007-10-30 20:55:56 UTC (rev 1187)
@@ -93,7 +93,7 @@
{
std::stringstream ss;
ss << std::hex << "xid=\"" << xid <<
"\"";
- throw jexception(jerrno::JERR_MAP_NOTFOUND, ss.str(), "txn_map",
"txn_data_list");
+ throw jexception(jerrno::JERR_MAP_NOTFOUND, ss.str(), "txn_map",
"get_tdata_list");
}
return itr->second;
}
Modified: store/trunk/cpp/lib/jrnl/txn_map.hpp
===================================================================
--- store/trunk/cpp/lib/jrnl/txn_map.hpp 2007-10-30 20:54:29 UTC (rev 1186)
+++ store/trunk/cpp/lib/jrnl/txn_map.hpp 2007-10-30 20:55:56 UTC (rev 1187)
@@ -58,7 +58,8 @@
u_int16_t _fid; ///< File id, to be used when transferring to emap on
commit
bool _enq_flag; ///< If true, enq op, otherwise deq op
bool _aio_compl; ///< Initially false, set to true when AIO returns
- txn_data_struct(const u_int64_t rid, const u_int64_t drid, const u_int16_t fid,
const bool enq_flag);
+ txn_data_struct(const u_int64_t rid, const u_int64_t drid, const u_int16_t fid,
+ const bool enq_flag);
};
typedef txn_data_struct txn_data;
typedef std::vector<txn_data> txn_data_list;
Modified: store/trunk/cpp/tests/persistence.py
===================================================================
--- store/trunk/cpp/tests/persistence.py 2007-10-30 20:54:29 UTC (rev 1186)
+++ store/trunk/cpp/tests/persistence.py 2007-10-30 20:55:56 UTC (rev 1187)
@@ -251,7 +251,7 @@
def xid(self, txid, branchqual = ''):
- return pack('LBB', 0, len(txid), len(branchqual)) + txid + branchqual
+ return pack('!LBB', 0, len(txid), len(branchqual)) + txid + branchqual
def txswap(self, src, dest, tx):
self.assertEqual(self.XA_OK, self.channel.dtx_demarcation_start(xid=tx).status)
Modified: store/trunk/cpp/tests/system_test.sh
===================================================================
--- store/trunk/cpp/tests/system_test.sh 2007-10-30 20:54:29 UTC (rev 1186)
+++ store/trunk/cpp/tests/system_test.sh 2007-10-30 20:55:56 UTC (rev 1187)
@@ -33,6 +33,7 @@
exit
fi
+WORKING_DIR=/var/rhm
QPIDD=$QPID_DIR/cpp/src/qpidd
if test "$VERBOSE" = yes; then
@@ -58,19 +59,36 @@
fail=0
-for p in `seq 1 6`; do
- log="$abs_srcdir/vg-log.$p"
- $vg $QPIDD -s $LIBBDBSTORE >> "$abs_srcdir/qpid.log" 2> $log &
pid=$!
- sleep 5
+let sync=0
+JRNLFLAGS=''
+while ((sync <= 1)); do
+ echo
+ echo '*** FIXME: Journal cannot be started when previous mode was BDB and
database exists.'
+ rm -rf $WORKING_DIR/*
+ if ((sync == 1)); then
+ JRNLFLAGS='--store-async yes'
+ mode='jrnl'
+ echo 'Journal (AIO) persistence...'
+ else
+ mode='bdb'
+ echo 'BDB persistence...'
+ fi
+ for p in `seq 1 6`; do
+ log="$abs_srcdir/vg-log.$mode.$p"
+ echo "$vg $QPIDD -s $LIBBDBSTORE $JRNLFLAGS"
+ $vg $QPIDD -s $LIBBDBSTORE $JRNLFLAGS >> "$abs_srcdir/qpid.log"
2> $log & pid=$!
+ sleep 5
- echo phase $p...
- pwd
- python "$abs_srcdir/persistence.py" -s "$xml_spec" -p $p -r 3 ||
fail=1
- sleep 1
- kill -SIGINT $pid || { echo process already died; cat qpid.log ; fail=1; }
- wait $pid #temporarily disable failure until broker can shutdown cleanly|| fail=1
+ echo phase $p...
+ pwd
+ python "$abs_srcdir/persistence.py" -s "$xml_spec" -p $p -r 3
|| fail=1
+ sleep 1
+ kill -SIGINT $pid || { echo process already died; cat qpid.log ; fail=1; }
+ wait $pid #temporarily disable failure until broker can shutdown cleanly||
fail=1
- vg_check $log || fail=1
+ vg_check $log || fail=1
+ done
+ ((sync++))
done
pid=0