Author: cctrieloff
Date: 2007-10-16 08:29:00 -0400 (Tue, 16 Oct 2007)
New Revision: 1079
Modified:
store/trunk/cpp/lib/BdbMessageStore.cpp
store/trunk/cpp/lib/BdbMessageStore.h
store/trunk/cpp/lib/JournalImpl.cpp
store/trunk/cpp/lib/TxnCtxt.h
store/trunk/cpp/lib/jrnl/jcntl.cpp
store/trunk/cpp/lib/jrnl/rmgr.cpp
store/trunk/cpp/lib/jrnl/txn_map.cpp
store/trunk/cpp/lib/jrnl/txn_map.hpp
store/trunk/cpp/lib/jrnl/wmgr.cpp
store/trunk/cpp/tests/TwoPhaseCommitTest.cpp
Log:
- Txn fixes
- TPC fixes
- All unit tests now pass
- still has sys tests issues, so jrnl not enabled yet
Modified: store/trunk/cpp/lib/BdbMessageStore.cpp
===================================================================
--- store/trunk/cpp/lib/BdbMessageStore.cpp 2007-10-15 21:13:51 UTC (rev 1078)
+++ store/trunk/cpp/lib/BdbMessageStore.cpp 2007-10-16 12:29:00 UTC (rev 1079)
@@ -262,15 +262,18 @@
}
//recover transactions:
for (txn_list::iterator i = prepared.begin(); i != prepared.end(); i++) {
- RecoverableTransaction::shared_ptr dtx =
- registry.recoverTransaction(i->xid,
std::auto_ptr<TPCTransactionContext>(new TPCTxnCtxt(i->xid,
&messageIdSequence)));
+
+ TPCTxnCtxt* tpcc = new TPCTxnCtxt(i->xid, &messageIdSequence);
+ RecoverableTransaction::shared_ptr dtx = registry.recoverTransaction(i->xid,
std::auto_ptr<TPCTransactionContext>(tpcc));
if (i->enqueues.get()) {
for (LockedMappings::iterator j = i->enqueues->begin(); j !=
i->enqueues->end(); j++) {
+ tpcc->addXidRecord(queues[j->first]->getExternalQueueStore());
dtx->enqueue(queues[j->first], messages[j->second]);
}
}
if (i->dequeues.get()) {
for (LockedMappings::iterator j = i->dequeues->begin(); j !=
i->dequeues->end(); j++) {
+ tpcc->addXidRecord(queues[j->first]->getExternalQueueStore());
dtx->dequeue(queues[j->first], messages[j->second]);
}
}
@@ -427,7 +430,8 @@
Buffer contentBuff(data + contentOffset, contentSize);
msg->decodeContent(contentBuff);
}
- if (xidbuffSize > 0 && PreparedTransaction::isLocked(locked,
queue->getPersistenceId(), dtokp.rid()) ) {
+
+ if (PreparedTransaction::isLocked(locked, queue->getPersistenceId(),
dtokp.rid()) ) {
prepared[dtokp.rid()] = msg;
} else {
queue->recover(msg);
@@ -450,8 +454,6 @@
break;
case rhm::journal::RHM_IORES_EMPTY:
read = false;
- // inline const u_int32_t get_enq_cnt() const { return _emap.size(); }
- assert (jc->get_enq_cnt() == msg_count);
break; // done with all messages. ((add call in jrnl to test that _emap is empty.
default:
assert( "Store Error: Unexpected msg state");
@@ -558,7 +560,7 @@
for (std::set<string>::iterator i = known.begin(); i != known.end(); i++) {
if (prepared.find(*i) == prepared.end()) {
TPCTxnCtxt txn(*i, NULL);
- completed(txn, dequeueXidDb, enqueueXidDb);
+ completed(txn, dequeueXidDb, enqueueXidDb, false);
}
}
readLockedMappings(enqueueXidDb, enqueues);
@@ -758,7 +760,7 @@
if (usingJrnl()){
// add queue* to the txn map..
- if (ctxt) txn->addXidRecord(queue);
+ if (ctxt) txn->addXidRecord(queue.getExternalQueueStore());
}else{
msg.enqueueComplete(); // set enqueued for ack
put(mappingDb, txn->get(), key, value);
@@ -876,7 +878,7 @@
if (usingJrnl()){
// add queue* to the txn map..
- if (ctxt) txn->addXidRecord(queue);
+ if (ctxt) txn->addXidRecord(queue.getExternalQueueStore());
async_dequeue(ctxt, msg, queue);
} else if (txn->isTPC()) {
@@ -997,7 +999,7 @@
}
}
-void BdbMessageStore::completed(TPCTxnCtxt& txn, Db& discard, Db& apply)
+void BdbMessageStore::completed(TPCTxnCtxt& txn, Db& discard, Db& apply, bool
commit)
{
if (!txn.get()) txn.begin(env);
@@ -1022,7 +1024,7 @@
}
prepareXidDb.del(txn.get(), &key, 0);
- txn.commit();
+ txn.complete(commit);
} catch (std::exception& e) {
std::cout << "Error completing xid " << txn.getXid()
<< ": " << e.what() << std::endl;
txn.abort();
@@ -1061,7 +1063,7 @@
Dbt value(&dummy, sizeof(dummy));
// make sure all the data is written to disk before returning
- txn->sync();
+ txn->sync();
prepareXidDb.put(txn->get(), &key, &value, 0);
txn->commit();
@@ -1069,14 +1071,14 @@
txn->abort();
throw e;
}
-
}
void BdbMessageStore::commit(TransactionContext& ctxt)
{
- TxnCtxt* txn(check(&ctxt));
+std::cout << " commit1" << std::flush;
+ TxnCtxt* txn(check(&ctxt));
if (txn->isTPC()) {
- completed(*dynamic_cast<TPCTxnCtxt*>(txn), enqueueXidDb, dequeueXidDb);
+ completed(*dynamic_cast<TPCTxnCtxt*>(txn), enqueueXidDb, dequeueXidDb,
true);
} else {
txn->commit();
}
@@ -1086,7 +1088,7 @@
{
TxnCtxt* txn(check(&ctxt));
if (txn->isTPC()) {
- completed(*dynamic_cast<TPCTxnCtxt*>(txn), dequeueXidDb, enqueueXidDb);
+ completed(*dynamic_cast<TPCTxnCtxt*>(txn), dequeueXidDb, enqueueXidDb,
false);
} else {
txn->abort();
}
Modified: store/trunk/cpp/lib/BdbMessageStore.h
===================================================================
--- store/trunk/cpp/lib/BdbMessageStore.h 2007-10-15 21:13:51 UTC (rev 1078)
+++ store/trunk/cpp/lib/BdbMessageStore.h 2007-10-16 12:29:00 UTC (rev 1079)
@@ -100,7 +100,7 @@
bool deleteIfUnused(DbTxn* txn, Dbt& messageId);
void destroy(Db& db, const qpid::broker::Persistable& p);
bool create(Db& db, IdSequence& seq, const
qpid::broker::Persistable& p);
- void completed(TPCTxnCtxt& txn, Db& discard, Db& apply);
+ void completed(TPCTxnCtxt& txn, Db& discard, Db& apply, bool
commit);
void record2pcOp(Db& db, TPCTxnCtxt& txn, u_int64_t messageId,
u_int64_t queueId);
u_int64_t getRecordSize(Db& db, Dbt& key);
Modified: store/trunk/cpp/lib/JournalImpl.cpp
===================================================================
--- store/trunk/cpp/lib/JournalImpl.cpp 2007-10-15 21:13:51 UTC (rev 1078)
+++ store/trunk/cpp/lib/JournalImpl.cpp 2007-10-16 12:29:00 UTC (rev 1079)
@@ -58,7 +58,7 @@
if (tdl_itr->_enq_flag) { // enqueue op
i->enqueues->add(queue_id, tdl_itr->_rid);
} else { // dequeue op
- i->dequeues->add(queue_id, tdl_itr->_rid);
+ i->dequeues->add(queue_id, tdl_itr->_drid);
}
}
}
Modified: store/trunk/cpp/lib/TxnCtxt.h
===================================================================
--- store/trunk/cpp/lib/TxnCtxt.h 2007-10-15 21:13:51 UTC (rev 1078)
+++ store/trunk/cpp/lib/TxnCtxt.h 2007-10-16 12:29:00 UTC (rev 1079)
@@ -43,8 +43,8 @@
class TxnCtxt : public qpid::broker::TransactionContext
{
-private:
- typedef std::set<const qpid::broker::PersistableQueue*> ipqdef;
+protected:
+ typedef std::set<qpid::broker::ExternalQueueStore*> ipqdef;
ipqdef impactedQueues; // list of Queues used in the txn
static unsigned int count;
mutable qpid::sys::Mutex Lock;
@@ -62,7 +62,7 @@
void completeTXN(bool commit){
for (TxnCtxt::ipqdef::iterator i = impactedQueues.begin(); i != impactedQueues.end();
i++) {
- JournalImpl* jc =
static_cast<JournalImpl*>((*i)->getExternalQueueStore());
+ JournalImpl* jc = static_cast<JournalImpl*>(*i);
if (jc && loggedtx) { /* if using journal */
DataTokenImpl* dtokp = new DataTokenImpl;
dtokp->set_rid(loggedtx->next());
@@ -70,11 +70,12 @@
if (commit)
jc->txn_commit(dtokp, getXid());
else
+ {
jc->txn_abort(dtokp, getXid());
-
+ }
} catch (rhm::journal::jexception& e) {
std::string str;
-std::cout << "Error commit" << e << std::endl;
+//std::cout << "Error commit" << e << std::endl;
delete dtokp;
THROW_STORE_EXCEPTION("Error commit" + e.to_string(str));
}
@@ -103,7 +104,7 @@
if (!firstloop) ::usleep(AIO_SLEEP_TIME); // move this into the get events
call aiolib..
allWritten = true;
for (TxnCtxt::ipqdef::iterator i = impactedQueues.begin(); i != impactedQueues.end();
i++) {
- JournalImpl* jc =
static_cast<JournalImpl*>((*i)->getExternalQueueStore());
+ JournalImpl* jc = static_cast<JournalImpl*>(*i);
try
{
@@ -116,8 +117,7 @@
}
}catch (rhm::journal::jexception& e) {
std::string str;
-std::cout << "Error sync" << e << std::endl;
-
+//std::cout << " TxnCtxt: Error sync 3" << e << std::flush;
THROW_STORE_EXCEPTION("Error sync" + e.to_string(str));
}
}
@@ -134,8 +134,8 @@
virtual const std::string& getXid() { return tid; }
void deleteXidRecord(){ impactedQueues.clear(); }
- void addXidRecord(const qpid::broker::PersistableQueue& queue){
- impactedQueues.insert(&queue); }
+ void addXidRecord(qpid::broker::ExternalQueueStore* queue){
+ impactedQueues.insert(queue); }
};
@@ -146,6 +146,11 @@
TPCTxnCtxt(const std::string& _xid, IdSequence* _loggedtx) : TxnCtxt(_loggedtx),
xid(_xid) {}
virtual bool isTPC() { return true; }
virtual const std::string& getXid() { return xid; }
+ // commit the BDB abort, abort commit the jnrl
+ void commit(){ txn->commit(0); txn = 0; }
+ void abort(){ txn->abort(); txn = 0; }
+ void complete(bool commit){
+ txn->commit(0); completeTXN(commit); txn = 0; }
};
}}
Modified: store/trunk/cpp/lib/jrnl/jcntl.cpp
===================================================================
--- store/trunk/cpp/lib/jrnl/jcntl.cpp 2007-10-15 21:13:51 UTC (rev 1078)
+++ store/trunk/cpp/lib/jrnl/jcntl.cpp 2007-10-16 12:29:00 UTC (rev 1079)
@@ -375,7 +375,7 @@
std::vector<std::string>::const_iterator pitr =
std::find(prep_txn_list.begin(), prep_txn_list.end(), *itr);
if (pitr == prep_txn_list.end())
_tmap.get_remove_tdata_list(*itr);
- }
+ }
}
}
@@ -392,22 +392,19 @@
{
case RHM_JDAT_ENQ_MAGIC:
{
-std::cout << " e" << h._rid << std::flush;
enq_rec er;
while (!done)
{
-std::cout << "*" << std::flush;
done = er.rcv_decode(h, ifsp, cum_size_read);
jfile_cycle(fid, ifsp, rd, false);
}
rd._enq_cnt_list[fid]++;
if (er.xid_size())
{
-std::cout << "$" << std::flush;
er.get_xid(&xidp);
assert(xidp != NULL);
std::string xid((char*)xidp, er.xid_size());
- _tmap.insert_txn_data(xid, txn_data(h._rid, fid, true));
+ _tmap.insert_txn_data(xid, txn_data(h._rid, 0, fid, true));
free(xidp);
}
else
@@ -418,24 +415,21 @@
break;
case RHM_JDAT_DEQ_MAGIC:
{
-std::cout << " d" << h._rid << std::flush;
deq_rec dr;
while (!done)
{
-std::cout << "*" << std::flush;
done = dr.rcv_decode(h, ifsp, cum_size_read);
jfile_cycle(fid, ifsp, rd, false);
}
if (dr.xid_size())
{
-std::cout << "$" << std::flush;
// If the enqueue is part of a pending txn, it will not yet be in
emap
- try { _emap.lock(h._rid); }
+ try { _emap.lock(dr.deq_rid()); }
catch(jexception e) { if (e.err_code() != jerrno::JERR_MAP_NOTFOUND)
throw e; }
dr.get_xid(&xidp);
assert(xidp != NULL);
std::string xid((char*)xidp, dr.xid_size());
- _tmap.insert_txn_data(xid, txn_data(dr.deq_rid(), fid, false));
+ _tmap.insert_txn_data(xid, txn_data(dr.rid(), dr.deq_rid(), fid,
false));
free(xidp);
}
else
@@ -457,12 +451,10 @@
break;
case RHM_JDAT_TXA_MAGIC:
{
-std::cout << " a" << h._rid << std::flush;
txn_rec ar;
while (!done)
{
-std::cout << "*" << std::flush;
- done = ar.rcv_decode(h, ifsp, cum_size_read);
+ done = ar.rcv_decode(h, ifsp, cum_size_read);
jfile_cycle(fid, ifsp, rd, false);
}
// Delete this txn from tmap, unlock any locked records in emap
@@ -472,14 +464,18 @@
txn_data_list tdl = _tmap.get_remove_tdata_list(xid);
for (tdl_itr itr = tdl.begin(); itr != tdl.end(); itr++)
{
- try { _emap.unlock(itr->_rid); }
+ try
+ {
+ if (!itr->_enq_flag)
+ _emap.unlock(itr->_drid);
+ }
catch(jexception e)
{
if (e.err_code() != jerrno::JERR_MAP_NOTFOUND)
throw e;
}
if (itr->_enq_flag)
- _wrfc.decr_enqcnt(itr->_fid);
+ rd._enq_cnt_list[fid]--;
}
free(xidp);
if (rd._h_rid < h._rid)
@@ -488,11 +484,9 @@
break;
case RHM_JDAT_TXC_MAGIC:
{
-std::cout << " c" << h._rid << std::flush;
txn_rec cr;
while (!done)
{
-std::cout << "*" << std::flush;
done = cr.rcv_decode(h, ifsp, cum_size_read);
jfile_cycle(fid, ifsp, rd, false);
}
@@ -500,17 +494,15 @@
cr.get_xid(&xidp);
assert(xidp != NULL);
std::string xid((char*)xidp, cr.xid_size());
-std::cout << "@" << std::flush;
txn_data_list tdl = _tmap.get_remove_tdata_list(xid);
for (tdl_itr itr = tdl.begin(); itr != tdl.end(); itr++)
{
-std::cout << " enq_flag=" << itr->_enq_flag <<
std::flush;
if (itr->_enq_flag) // txn enqueue
_emap.insert_fid(itr->_rid, itr->_fid);
else // txn dequeue
{
- u_int16_t fid = _emap.get_remove_fid(itr->_rid, true);
- _wrfc.decr_enqcnt(fid);
+ u_int16_t fid = _emap.get_remove_fid(itr->_drid, true);
+ rd._enq_cnt_list[fid]--;
}
}
@@ -521,18 +513,15 @@
break;
case RHM_JDAT_EMPTY_MAGIC:
{
-std::cout << " x" << std::flush;
u_int32_t rec_dblks = jrec::size_dblks(sizeof(hdr));
ifsp->ignore(rec_dblks * JRNL_DBLK_SIZE - sizeof(hdr));
}
break;
case 0:
-std::cout << " z" << std::flush;
rd._lfid = fid;
rd._eo = ifsp->tellg();
return false;
default:
-std::cout << " ?" << std::flush;
std::stringstream ss;
ss << std::hex << std::setfill('0') <<
"Magic=0x" << std::setw(8) << h._magic;
throw jexception(jerrno::JERR_JCNTL_UNKNOWNMAGIC, ss.str(),
"jcntl",
@@ -589,7 +578,6 @@
void
jcntl::aio_wr_callback(jcntl* journal, u_int32_t num_dtoks)
{
-
//kpvdr TODO -- this list needs to be mutexed...???
// need to delete the dtok's
std::deque<rhm::journal::data_tok*>
this_dtok_list(journal->_aio_wr_cmpl_dtok_list.begin(),
@@ -601,28 +589,23 @@
data_tok*& dtokp = this_dtok_list.front();
if (!journal->is_stopped() && dtokp->getSourceMessage())
{
- data_tok::write_state st = dtokp->wstate();
- if (st == data_tok::ENQ)
- {
-//std::cout << "----- enqueueComplete rid=" << dtokp->rid()
<< std::endl;
-
- dtokp->getSourceMessage()->enqueueComplete();
- /// cct --- if TPC work out what to do !!!
- }
- else if (dtokp->wstate() == data_tok::DEQ)
+ switch (dtokp->wstate())
{
-//std::cout << "----- dequeueComplete rid=" << dtokp->rid()
<< std::endl;
-
- dtokp->getSourceMessage()->dequeueComplete();
-
- if ( dtokp->getSourceMessage()->isDequeueComplete() ) // clear id after
last dequeue
- dtokp->getSourceMessage()->setPersistenceId(0);
+ case data_tok::ENQ:
+ dtokp->getSourceMessage()->enqueueComplete();
+ break;
+ case data_tok::DEQ:
+ dtokp->getSourceMessage()->dequeueComplete();
+ if ( dtokp->getSourceMessage()->isDequeueComplete() ) // clear id after
last dequeue
+ dtokp->getSourceMessage()->setPersistenceId(0);
+ break;
+ default:
+ ;
}
}
this_dtok_list.pop_front();
delete dtokp;
}
-
}
void
Modified: store/trunk/cpp/lib/jrnl/rmgr.cpp
===================================================================
--- store/trunk/cpp/lib/jrnl/rmgr.cpp 2007-10-15 21:13:51 UTC (rev 1078)
+++ store/trunk/cpp/lib/jrnl/rmgr.cpp 2007-10-16 12:29:00 UTC (rev 1079)
@@ -32,6 +32,7 @@
#include <jrnl/rmgr.hpp>
+#include <jrnl/jcntl.hpp>
#include <assert.h>
#include <cerrno>
#include <sstream>
@@ -287,8 +288,24 @@
}
catch (jexception& e)
{
- if (e.err_code() != jerrno::JERR_MAP_NOTFOUND)
+ if (e.err_code() == jerrno::JERR_MAP_LOCKED &&
!_jc->is_read_only())
throw e;
+
+ // Ok, not in emap, now search tmap for recover
+ if (_jc->is_read_only())
+ {
+ std::vector<std::string> xid_list;
+ _tmap.xid_list(xid_list);
+ for (std::vector<std::string>::iterator itr = xid_list.begin(); itr !=
xid_list.end() && !is_enq; itr++)
+ {
+ txn_data_list tx_list = _tmap.get_tdata_list(*itr);
+ for (tdl_itr ditr = tx_list.begin(); ditr != tx_list.end() && !is_enq;
ditr++)
+ {
+ if (ditr->_rid == _hdr._rid)
+ is_enq = true;
+ }
+ }
+ }
//std::cout << "-nf" << std::flush;
}
#endif
@@ -299,7 +316,7 @@
// Is this locked by a pending dequeue transaction?
try
{
- if (_emap.is_locked(_hdr._rid))
+ if (_emap.is_locked(_hdr._rid) &&
!_jc->is_read_only())
return RHM_IORES_TXPENDING;
}
catch (jexception e)
Modified: store/trunk/cpp/lib/jrnl/txn_map.cpp
===================================================================
--- store/trunk/cpp/lib/jrnl/txn_map.cpp 2007-10-15 21:13:51 UTC (rev 1078)
+++ store/trunk/cpp/lib/jrnl/txn_map.cpp 2007-10-16 12:29:00 UTC (rev 1079)
@@ -36,13 +36,17 @@
#include <sstream>
#include <jrnl/jerrno.hpp>
+#include <iostream> // for debug
+
namespace rhm
{
namespace journal
{
-txn_data_struct::txn_data_struct(const u_int64_t rid, const u_int16_t fid, const bool
enq_flag):
+txn_data_struct::txn_data_struct(const u_int64_t rid, const u_int64_t drid, const
u_int16_t fid,
+ const bool enq_flag):
_rid(rid),
+ _drid(drid),
_fid(fid),
_enq_flag(enq_flag),
_aio_compl(false)
@@ -139,10 +143,11 @@
ss << std::hex << "xid=\"" << xid <<
"\"";
throw jexception(jerrno::JERR_MAP_NOTFOUND, ss.str(), "txn_map",
"is_txn_synced");
}
- txn_data_list list = itr->second;
+//std::cout << " its: found XID" << std::flush;
bool is_synced = true;
- for (tdl_itr litr = list.begin(); litr < list.end(); litr++)
+ for (tdl_itr litr = itr->second.begin(); litr < itr->second.end(); litr++)
{
+//std::cout << " rid=" << litr->_rid << " aioc="
<< litr->_aio_compl << std::flush;
if (!litr->_aio_compl)
{
is_synced = false;
@@ -154,7 +159,7 @@
}
const bool
-txn_map::set_aio_compl(const std::string& xid, const u_int64_t rid)
+txn_map::set_aio_compl(const std::string& xid, const u_int64_t rid) throw
(jexception)
{
bool ok = true;
bool found = false;
@@ -164,13 +169,19 @@
ok = false;
else
{
- txn_data_list list = itr->second;
- for (tdl_itr litr = list.begin(); litr < list.end(); litr++)
+//std::cout << " sac: found XID" << std::flush;
+// txn_data_list list = itr->second;
+ for (tdl_itr litr = itr->second.begin(); litr < itr->second.end();
litr++)
{
if (litr->_rid == rid)
{
+// txn_data_struct t(litr->_rid, litr->_drid, litr->_fid, litr->_enq_flag);
+// t._aio_compl = true;
+// itr->second.erase(litr);
+// itr->second.push_back(t);
found = true;
litr->_aio_compl = true;
+//std::cout << " rid=" << rid << " aioc=" <<
litr->_aio_compl << " ptr=" << std::flush;
break;
}
}
Modified: store/trunk/cpp/lib/jrnl/txn_map.hpp
===================================================================
--- store/trunk/cpp/lib/jrnl/txn_map.hpp 2007-10-15 21:13:51 UTC (rev 1078)
+++ store/trunk/cpp/lib/jrnl/txn_map.hpp 2007-10-16 12:29:00 UTC (rev 1079)
@@ -54,10 +54,11 @@
struct txn_data_struct
{
u_int64_t _rid; ///< Record id for this operation
+ u_int64_t _drid; ///< Dequeue record id for this operation
u_int16_t _fid; ///< File id, to be used when transferring to emap on
commit
bool _enq_flag; ///< If true, enq op, otherwise deq op
bool _aio_compl; ///< Initially false, set to true when AIO returns
- txn_data_struct(const u_int64_t rid, const u_int16_t fid, const bool enq_flag);
+ txn_data_struct(const u_int64_t rid, const u_int64_t drid, const u_int16_t fid,
const bool enq_flag);
};
typedef txn_data_struct txn_data;
typedef std::vector<txn_data> txn_data_list;
@@ -82,7 +83,7 @@
const txn_data_list get_remove_tdata_list(const std::string& xid) throw
(jexception);
const u_int32_t get_rid_count(const std::string& xid) throw (jexception);
const bool is_txn_synced(const std::string& xid) throw (jexception);
- const bool set_aio_compl(const std::string& xid, const u_int64_t rid);
+ const bool set_aio_compl(const std::string& xid, const u_int64_t rid) throw
(jexception);
inline void clear() { _map.clear(); }
inline const bool empty() const { return _map.empty(); }
inline const u_int16_t size() const { return (u_int16_t)_map.size(); }
Modified: store/trunk/cpp/lib/jrnl/wmgr.cpp
===================================================================
--- store/trunk/cpp/lib/jrnl/wmgr.cpp 2007-10-15 21:13:51 UTC (rev 1078)
+++ store/trunk/cpp/lib/jrnl/wmgr.cpp 2007-10-16 12:29:00 UTC (rev 1079)
@@ -167,7 +167,7 @@
if (xid_len) // If part of transaction, add to transaction map
{
std::string xid((char*)xid_ptr, xid_len);
- _tmap.insert_txn_data(xid, txn_data(rid, dtokp->fid(), true));
+ _tmap.insert_txn_data(xid, txn_data(rid, 0, dtokp->fid(), true));
}
else
_emap.insert_fid(rid, dtokp->fid());
@@ -307,10 +307,10 @@
if (xid_len) // If part of transaction, add to transaction map
{
// If the enqueue is part of a pending txn, it will not yet be in emap
- try { _emap.lock(rid); }
+ try { _emap.lock(dequeue_rid); }
catch(jexception e) { if (e.err_code() != jerrno::JERR_MAP_NOTFOUND)
throw e; }
std::string xid((char*)xid_ptr, xid_len);
- _tmap.insert_txn_data(xid, txn_data(dequeue_rid, dtokp->fid(),
false));
+ _tmap.insert_txn_data(xid, txn_data(rid, dequeue_rid, dtokp->fid(),
false));
}
else
{
@@ -444,7 +444,11 @@
txn_data_list tdl = _tmap.get_remove_tdata_list(xid);
for (tdl_itr itr = tdl.begin(); itr != tdl.end(); itr++)
{
- try { _emap.unlock(itr->_rid); }
+ try
+ {
+ if (!itr->_enq_flag)
+ _emap.unlock(itr->_drid);
+ }
catch(jexception e) { if (e.err_code() != jerrno::JERR_MAP_NOTFOUND)
throw e; }
if (itr->_enq_flag)
_wrfc.decr_enqcnt(itr->_fid);
@@ -587,7 +591,7 @@
_emap.insert_fid(itr->_rid, itr->_fid);
else // txn dequeue
{
- u_int16_t fid = _emap.get_remove_fid(itr->_rid, true);
+ u_int16_t fid = _emap.get_remove_fid(itr->_drid, true);
_wrfc.decr_enqcnt(fid);
}
}
Modified: store/trunk/cpp/tests/TwoPhaseCommitTest.cpp
===================================================================
--- store/trunk/cpp/tests/TwoPhaseCommitTest.cpp 2007-10-15 21:13:51 UTC (rev 1078)
+++ store/trunk/cpp/tests/TwoPhaseCommitTest.cpp 2007-10-16 12:29:00 UTC (rev 1079)
@@ -41,7 +41,6 @@
class TwoPhaseCommitTest : public CppUnit::TestCase
{
CPPUNIT_TEST_SUITE(TwoPhaseCommitTest);
-
CPPUNIT_TEST(testCommitSwap);
CPPUNIT_TEST(testPrepareAndAbortSwap);
CPPUNIT_TEST(testAbortNoPrepareSwap);
@@ -72,9 +71,10 @@
{
TwoPhaseCommitTest* const test;
const string messageId;
+ Message::shared_ptr msg;
public:
Swap(TwoPhaseCommitTest* const test_, const string& messageId_): test(test_),
messageId(messageId_) {}
- void init(){ test->deliver(messageId, test->queueA); }
+ void init(){ msg = test->deliver(messageId, test->queueA); }
void run(TPCTransactionContext* txn) { test->swap(txn); }
void check(bool committed) { test->swapCheck(committed, messageId); }
};
@@ -82,13 +82,16 @@
class Enqueue : public Strategy
{
TwoPhaseCommitTest* const test;
+ Message::shared_ptr msg1;
+ Message::shared_ptr msg2;
+ Message::shared_ptr msg3;
public:
Enqueue(TwoPhaseCommitTest* const test_): test(test_) {}
void init() {}
void run(TPCTransactionContext* txn) {
- test->enqueue(txn, "Enqueue1");
- test->enqueue(txn, "Enqueue2");
- test->enqueue(txn, "Enqueue3");
+ msg1 = test->enqueue(txn, "Enqueue1");
+ msg2 = test->enqueue(txn, "Enqueue2");
+ msg3 = test->enqueue(txn, "Enqueue3");
}
void check(bool committed) {
if (committed) {
@@ -103,12 +106,15 @@
class Dequeue : public Strategy
{
TwoPhaseCommitTest* const test;
+ Message::shared_ptr msg1;
+ Message::shared_ptr msg2;
+ Message::shared_ptr msg3;
public:
Dequeue(TwoPhaseCommitTest* const test_): test(test_) {}
void init() {
- test->deliver("Dequeue1", test->queueA);
- test->deliver("Dequeue2", test->queueA);
- test->deliver("Dequeue3", test->queueA);
+ msg1 = test->deliver("Dequeue1", test->queueA);
+ msg2 = test->deliver("Dequeue2", test->queueA);
+ msg3 = test->deliver("Dequeue3", test->queueA);
}
void run(TPCTransactionContext* txn) {
test->dequeue(txn);
@@ -132,7 +138,10 @@
QueueRegistry queues;
Queue::shared_ptr queueA;
Queue::shared_ptr queueB;
-
+ Message::shared_ptr msg1;
+ Message::shared_ptr msg2;
+ Message::shared_ptr msg4;
+
public:
TwoPhaseCommitTest() : nameA("queueA"), nameB("queueB") {}
@@ -214,7 +223,6 @@
std::auto_ptr<TPCTransactionContext>
txn(store->begin("my-xid"));
swap.run(txn.get());
store->prepare(*txn);
-
restart();
//check that the message is not available from either queue
@@ -261,29 +269,31 @@
void swap(TPCTransactionContext* txn)
{
- Message::shared_ptr msg = queueA->dequeue().payload;//just dequeues in memory
+ msg1 = queueA->dequeue().payload;//just dequeues in memory
//move the message from one queue to the other as part of a
//distributed transaction
- queueB->enqueue(txn, msg);//note: need to enqueue it first to avoid message
being deleted
- queueA->dequeue(txn, msg);
+ queueB->enqueue(txn, msg1);//note: need to enqueue it first to avoid message
being deleted
+ queueA->dequeue(txn, msg1);
}
void dequeue(TPCTransactionContext* txn)
{
- Message::shared_ptr msg = queueA->dequeue().payload;//just dequeues in memory
- queueA->dequeue(txn, msg);
+ msg2 = queueA->dequeue().payload;//just dequeues in memory
+ queueA->dequeue(txn, msg2);
}
- void enqueue(TPCTransactionContext* txn, const string& msgid)
+ Message::shared_ptr enqueue(TPCTransactionContext* txn, const string& msgid)
{
Message::shared_ptr msg = createMessage(msgid);
queueA->enqueue(txn, msg);
+ return msg;
}
- void deliver(const string& msgid, Queue::shared_ptr& queue)
+ Message::shared_ptr deliver(const string& msgid, Queue::shared_ptr& queue)
{
- Message::shared_ptr msg = createMessage(msgid);
- queue->deliver(msg);
+ msg4 = createMessage(msgid);
+ queue->deliver(msg4);
+ return msg4;
}
void setup()