Author: cctrieloff
Date: 2007-10-12 18:20:26 -0400 (Fri, 12 Oct 2007)
New Revision: 1033
Modified:
store/trunk/cpp/lib/BdbMessageStore.cpp
store/trunk/cpp/lib/TxnCtxt.h
store/trunk/cpp/lib/jrnl/deq_rec.hpp
store/trunk/cpp/lib/jrnl/jcntl.cpp
store/trunk/cpp/lib/jrnl/jcntl.hpp
store/trunk/cpp/lib/jrnl/wmgr.cpp
Log:
lots of small fixes for txns
Modified: store/trunk/cpp/lib/BdbMessageStore.cpp
===================================================================
--- store/trunk/cpp/lib/BdbMessageStore.cpp 2007-10-12 21:58:53 UTC (rev 1032)
+++ store/trunk/cpp/lib/BdbMessageStore.cpp 2007-10-12 22:20:26 UTC (rev 1033)
@@ -263,7 +263,7 @@
//recover transactions:
for (txn_list::iterator i = prepared.begin(); i != prepared.end(); i++) {
RecoverableTransaction::shared_ptr dtx =
- registry.recoverTransaction(i->xid,
std::auto_ptr<TPCTransactionContext>(new TPCTxnCtxt(i->xid)));
+ registry.recoverTransaction(i->xid,
std::auto_ptr<TPCTransactionContext>(new TPCTxnCtxt(i->xid,
&messageIdSequence)));
if (i->enqueues.get()) {
for (LockedMappings::iterator j = i->enqueues->begin(); j !=
i->enqueues->end(); j++) {
dtx->enqueue(queues[j->first], messages[j->second]);
@@ -401,7 +401,10 @@
dtokp.set_wstate(DataTokenImpl::ENQ);
// read the message from the Journal.
try {
- while (read) {
+
+//std::cout << jc->dirname() <<"-queueName:" <<
queue->getName() << "-enq count:" << jc->get_enq_cnt()
<< std::endl;
+
+ while (read) {
rhm::journal::iores res = jc->read_data_record(&dbuff, dbuffSize,
&xidbuff, xidbuffSize, transientFlag, &dtokp);
readSize = dtokp.dsize();
@@ -540,6 +543,9 @@
std::set<string> prepared;
collectPreparedXids(prepared);
+//std::cout << "prep size:" << prepared.size() << std::endl;
+
+
//when using the async journal, it will abort unprepaired xids and populate the locked
maps
if (!usingJrnl()){
txn_lock_map enqueues;
@@ -551,7 +557,7 @@
//abort all known but unprepared xids:
for (std::set<string>::iterator i = known.begin(); i != known.end(); i++) {
if (prepared.find(*i) == prepared.end()) {
- TPCTxnCtxt txn(*i);
+ TPCTxnCtxt txn(*i, NULL);
completed(txn, dequeueXidDb, enqueueXidDb);
}
}
@@ -562,6 +568,8 @@
}
} else {
for (std::set<string>::iterator i = prepared.begin(); i != prepared.end();
i++) {
+
+//std::cout << "prep:" << *i << std::endl;
LockedMappings::shared_ptr enq_ptr;
enq_ptr.reset(new LockedMappings);
LockedMappings::shared_ptr deq_ptr;
@@ -900,8 +908,8 @@
bool written = false;
DataTokenImpl* ddtokp = new DataTokenImpl;
ddtokp->setSourceMessage (&msg);
- ddtokp->set_rid(msg.getPersistenceId()); // message id to be dequeued
- ddtokp->set_dequeue_rid(messageIdSequence.next());
+ ddtokp->set_rid(messageIdSequence.next());
+ ddtokp->set_dequeue_rid(msg.getPersistenceId());
ddtokp->set_wstate(DataTokenImpl::ENQ);
JournalImpl* jc = static_cast<JournalImpl*>(queue.getExternalQueueStore());
string tid;
@@ -921,7 +929,7 @@
}
} catch (rhm::journal::jexception& e) {
std::string str;
- delete ddtokp;
+ //delete ddtokp;
THROW_STORE_EXCEPTION("Error dequeuing message" + e.to_string(str));
}
switch (dres)
@@ -1024,14 +1032,19 @@
auto_ptr<TransactionContext> BdbMessageStore::begin()
{
- TxnCtxt* txn(new TxnCtxt(true));
+ // pass sequence number for c/a when using jrnl
+ TxnCtxt* txn(new TxnCtxt(&messageIdSequence ));
txn->begin(env);
return auto_ptr<TransactionContext>(txn);
}
std::auto_ptr<qpid::broker::TPCTransactionContext> BdbMessageStore::begin(const
std::string& xid)
{
- TPCTxnCtxt* txn(new TPCTxnCtxt(xid));
+ IdSequence* jtx = NULL;
+ if (usingJrnl()) jtx = &messageIdSequence;
+
+ // pass sequence number for c/a when using jrnl
+ TPCTxnCtxt* txn(new TPCTxnCtxt(xid, jtx));
txn->begin(env);
return auto_ptr<TPCTransactionContext>(txn);
}
Modified: store/trunk/cpp/lib/TxnCtxt.h
===================================================================
--- store/trunk/cpp/lib/TxnCtxt.h 2007-10-12 21:58:53 UTC (rev 1032)
+++ store/trunk/cpp/lib/TxnCtxt.h 2007-10-12 22:20:26 UTC (rev 1033)
@@ -48,7 +48,7 @@
ipqdef impactedQueues; // list of Queues used in the txn
static unsigned int count;
mutable qpid::sys::Mutex Lock;
- bool loggedtx;
+ IdSequence* loggedtx;
unsigned int getCount() {
qpid::sys::Mutex::ScopedLock locker(Lock);
@@ -63,21 +63,31 @@
void completeTXN(bool commit){
for (TxnCtxt::ipqdef::iterator i = impactedQueues.begin(); i != impactedQueues.end();
i++) {
JournalImpl* jc =
static_cast<JournalImpl*>((*i)->getExternalQueueStore());
- if (jc) { /* if using journal */
+ if (jc && loggedtx) { /* if using journal */
DataTokenImpl* dtokp = new DataTokenImpl;
- if (commit)
- jc->txn_commit(dtokp, getXid());
- else
- jc->txn_abort(dtokp, getXid());
- }
+ dtokp->set_rid(loggedtx->next());
+ try{
+ if (commit)
+ jc->txn_commit(dtokp, getXid());
+ else
+ jc->txn_abort(dtokp, getXid());
+
+ } catch (rhm::journal::jexception& e) {
+ std::string str;
+std::cout << "Error commit" << e << std::endl;
+ delete dtokp;
+ THROW_STORE_EXCEPTION("Error commit" + e.to_string(str));
+ }
+
}
+ }
+ sync();
deleteXidRecord();
- sync();
}
public:
- TxnCtxt(bool _loggedtx=false) : loggedtx(_loggedtx), txn(0) {
+ TxnCtxt(IdSequence* _loggedtx=NULL) : loggedtx(_loggedtx), txn(0) {
if (loggedtx) tid = "rhm-tid" + getCount();
}
@@ -94,13 +104,22 @@
allWritten = true;
for (TxnCtxt::ipqdef::iterator i = impactedQueues.begin(); i != impactedQueues.end();
i++) {
JournalImpl* jc =
static_cast<JournalImpl*>((*i)->getExternalQueueStore());
- if (jc && !(jc->is_txn_synced(getXid())))
+
+ try
{
- if (firstloop)
- jc->flush();
- allWritten = false;
- jc->get_wr_events();
- }
+ if (jc && !(jc->is_txn_synced(getXid())))
+ {
+ if (firstloop)
+ jc->flush();
+ allWritten = false;
+ jc->get_wr_events();
+ }
+ }catch (rhm::journal::jexception& e) {
+ std::string str;
+std::cout << "Error sync" << e << std::endl;
+
+ THROW_STORE_EXCEPTION("Error sync" + e.to_string(str));
+ }
}
firstloop = false;
}
@@ -108,8 +127,8 @@
virtual ~TxnCtxt() { if(txn) abort(); }
void begin(DbEnv& env){ env.txn_begin(0, &txn, 0); }
- void commit(){ txn->commit(0); completeTXN(true); txn = 0; sync();}
- void abort(){ txn->abort(); completeTXN(false); txn = 0; sync();}
+ void commit(){ txn->commit(0); completeTXN(true); txn = 0; }
+ void abort(){ txn->abort(); completeTXN(false); txn = 0; }
DbTxn* get(){ return txn; }
virtual bool isTPC() { return false; }
virtual const std::string& getXid() { return tid; }
@@ -124,7 +143,7 @@
{
const std::string xid;
public:
- TPCTxnCtxt(const std::string& _xid) : xid(_xid) {}
+ TPCTxnCtxt(const std::string& _xid, IdSequence* _loggedtx) : TxnCtxt(_loggedtx),
xid(_xid) {}
virtual bool isTPC() { return true; }
virtual const std::string& getXid() { return xid; }
};
Modified: store/trunk/cpp/lib/jrnl/deq_rec.hpp
===================================================================
--- store/trunk/cpp/lib/jrnl/deq_rec.hpp 2007-10-12 21:58:53 UTC (rev 1032)
+++ store/trunk/cpp/lib/jrnl/deq_rec.hpp 2007-10-12 22:20:26 UTC (rev 1033)
@@ -80,6 +80,7 @@
// Decode used for recover
const bool rcv_decode(hdr h, std::ifstream* ifsp, size_t& rec_offs) throw
(jexception);
+ inline const u_int64_t rid() const { return _deq_hdr._hdr._rid; }
inline const u_int64_t deq_rid() const { return _deq_hdr._deq_rid; }
const size_t get_xid(void** const xidpp);
std::string& str(std::string& str) const;
Modified: store/trunk/cpp/lib/jrnl/jcntl.cpp
===================================================================
--- store/trunk/cpp/lib/jrnl/jcntl.cpp 2007-10-12 21:58:53 UTC (rev 1032)
+++ store/trunk/cpp/lib/jrnl/jcntl.cpp 2007-10-12 22:20:26 UTC (rev 1033)
@@ -367,13 +367,20 @@
{
u_int16_t fid = rd._ffid;
std::ifstream ifs;
- while (rcvr_get_next_record(fid, &ifs, rd, prep_txn_list));
+ while (rcvr_get_next_record(fid, &ifs, rd));
+ std::vector<std::string> xid_list;
+ _tmap.xid_list(xid_list);
+ for (std::vector<std::string>::iterator itr = xid_list.begin(); itr !=
xid_list.end(); itr++)
+ {
+ std::vector<std::string>::const_iterator pitr =
std::find(prep_txn_list.begin(), prep_txn_list.end(), *itr);
+ if (pitr == prep_txn_list.end())
+ _tmap.get_remove_tdata_list(*itr);
+ }
}
}
const bool
-jcntl::rcvr_get_next_record(u_int16_t& fid, std::ifstream* ifsp, rcvdat& rd,
- const std::vector<std::string>& prep_txn_list) throw (jexception)
+jcntl::rcvr_get_next_record(u_int16_t& fid, std::ifstream* ifsp, rcvdat& rd)
throw (jexception)
{
size_t cum_size_read = 0;
bool done = false;
@@ -385,24 +392,22 @@
{
case RHM_JDAT_ENQ_MAGIC:
{
-//std::cout << " e" << h._rid << std::flush;
+std::cout << " e" << h._rid << std::flush;
enq_rec er;
while (!done)
{
+std::cout << "*" << std::flush;
done = er.rcv_decode(h, ifsp, cum_size_read);
jfile_cycle(fid, ifsp, rd, false);
}
rd._enq_cnt_list[fid]++;
if (er.xid_size())
{
-//std::cout << "$" << std::flush;
+std::cout << "$" << std::flush;
er.get_xid(&xidp);
assert(xidp != NULL);
std::string xid((char*)xidp, er.xid_size());
- std::vector<std::string>::const_iterator cit =
std::find(prep_txn_list.begin(),
- prep_txn_list.end(), xid);
- if (cit != prep_txn_list.end())
- _tmap.insert_txn_data(xid, txn_data(h._rid, fid, true));
+ _tmap.insert_txn_data(xid, txn_data(h._rid, fid, true));
free(xidp);
}
else
@@ -413,26 +418,24 @@
break;
case RHM_JDAT_DEQ_MAGIC:
{
-//std::cout << " d" << h._rid << std::flush;
+std::cout << " d" << h._rid << std::flush;
deq_rec dr;
while (!done)
{
+std::cout << "*" << std::flush;
done = dr.rcv_decode(h, ifsp, cum_size_read);
jfile_cycle(fid, ifsp, rd, false);
}
if (dr.xid_size())
{
-//std::cout << "$" << std::flush;
+std::cout << "$" << std::flush;
// If the enqueue is part of a pending txn, it will not yet be in
emap
try { _emap.lock(h._rid); }
catch(jexception e) { if (e.err_code() != jerrno::JERR_MAP_NOTFOUND)
throw e; }
dr.get_xid(&xidp);
assert(xidp != NULL);
std::string xid((char*)xidp, dr.xid_size());
- std::vector<std::string>::const_iterator cit =
std::find(prep_txn_list.begin(),
- prep_txn_list.end(), xid);
- if (cit != prep_txn_list.end())
- _tmap.insert_txn_data(xid, txn_data(h._rid, fid, false));
+ _tmap.insert_txn_data(xid, txn_data(dr.deq_rid(), fid, false));
free(xidp);
}
else
@@ -454,10 +457,11 @@
break;
case RHM_JDAT_TXA_MAGIC:
{
-//std::cout << " a" << h._rid << std::flush;
+std::cout << " a" << h._rid << std::flush;
txn_rec ar;
while (!done)
{
+std::cout << "*" << std::flush;
done = ar.rcv_decode(h, ifsp, cum_size_read);
jfile_cycle(fid, ifsp, rd, false);
}
@@ -465,22 +469,17 @@
ar.get_xid(&xidp);
assert(xidp != NULL);
std::string xid((char*)xidp, ar.xid_size());
- std::vector<std::string>::const_iterator cit =
std::find(prep_txn_list.begin(),
- prep_txn_list.end(), xid);
- if (cit != prep_txn_list.end())
+ txn_data_list tdl = _tmap.get_remove_tdata_list(xid);
+ for (tdl_itr itr = tdl.begin(); itr != tdl.end(); itr++)
{
- txn_data_list tdl = _tmap.get_remove_tdata_list(xid);
- for (tdl_itr itr = tdl.begin(); itr != tdl.end(); itr++)
+ try { _emap.unlock(itr->_rid); }
+ catch(jexception e)
{
- try { _emap.unlock(itr->_rid); }
- catch(jexception e)
- {
- if (e.err_code() != jerrno::JERR_MAP_NOTFOUND)
- throw e;
- }
- if (itr->_enq_flag)
- _wrfc.decr_enqcnt(itr->_fid);
+ if (e.err_code() != jerrno::JERR_MAP_NOTFOUND)
+ throw e;
}
+ if (itr->_enq_flag)
+ _wrfc.decr_enqcnt(itr->_fid);
}
free(xidp);
if (rd._h_rid < h._rid)
@@ -489,10 +488,11 @@
break;
case RHM_JDAT_TXC_MAGIC:
{
-//std::cout << " c" << h._rid << std::flush;
+std::cout << " c" << h._rid << std::flush;
txn_rec cr;
while (!done)
{
+std::cout << "*" << std::flush;
done = cr.rcv_decode(h, ifsp, cum_size_read);
jfile_cycle(fid, ifsp, rd, false);
}
@@ -500,22 +500,20 @@
cr.get_xid(&xidp);
assert(xidp != NULL);
std::string xid((char*)xidp, cr.xid_size());
- std::vector<std::string>::const_iterator cit =
std::find(prep_txn_list.begin(),
- prep_txn_list.end(), xid);
- if (cit != prep_txn_list.end())
+std::cout << "@" << std::flush;
+ txn_data_list tdl = _tmap.get_remove_tdata_list(xid);
+ for (tdl_itr itr = tdl.begin(); itr != tdl.end(); itr++)
{
- txn_data_list tdl = _tmap.get_remove_tdata_list(xid);
- for (tdl_itr itr = tdl.begin(); itr != tdl.end(); itr++)
+std::cout << " enq_flag=" << itr->_enq_flag <<
std::flush;
+ if (itr->_enq_flag) // txn enqueue
+ _emap.insert_fid(itr->_rid, itr->_fid);
+ else // txn dequeue
{
- if (itr->_enq_flag) // txn enqueue
- _emap.insert_fid(itr->_rid, itr->_fid);
- else // txn dequeue
- {
- u_int16_t fid = _emap.get_remove_fid(h._rid, true);
- _wrfc.decr_enqcnt(fid);
- }
+ u_int16_t fid = _emap.get_remove_fid(itr->_rid, true);
+ _wrfc.decr_enqcnt(fid);
}
}
+
free(xidp);
if (rd._h_rid < h._rid)
rd._h_rid = h._rid;
@@ -523,18 +521,18 @@
break;
case RHM_JDAT_EMPTY_MAGIC:
{
-//std::cout << " x" << std::flush;
+std::cout << " x" << std::flush;
u_int32_t rec_dblks = jrec::size_dblks(sizeof(hdr));
ifsp->ignore(rec_dblks * JRNL_DBLK_SIZE - sizeof(hdr));
}
break;
case 0:
-//std::cout << " z" << std::flush;
+std::cout << " z" << std::flush;
rd._lfid = fid;
rd._eo = ifsp->tellg();
return false;
default:
-//std::cout << " ?" << std::flush;
+std::cout << " ?" << std::flush;
std::stringstream ss;
ss << std::hex << std::setfill('0') <<
"Magic=0x" << std::setw(8) << h._magic;
throw jexception(jerrno::JERR_JCNTL_UNKNOWNMAGIC, ss.str(),
"jcntl",
Modified: store/trunk/cpp/lib/jrnl/jcntl.hpp
===================================================================
--- store/trunk/cpp/lib/jrnl/jcntl.hpp 2007-10-12 21:58:53 UTC (rev 1032)
+++ store/trunk/cpp/lib/jrnl/jcntl.hpp 2007-10-12 22:20:26 UTC (rev 1033)
@@ -614,8 +614,7 @@
void rcvr_janalyze(rcvdat& rd, const std::vector<std::string>&
prep_txn_list)
throw (jexception);
- const bool rcvr_get_next_record(u_int16_t& fid, std::ifstream* ifsp,
rcvdat& rd,
- const std::vector<std::string>& prep_txn_list) throw
(jexception);
+ const bool rcvr_get_next_record(u_int16_t& fid, std::ifstream* ifsp,
rcvdat& rd) throw (jexception);
const bool jfile_cycle(u_int16_t& fid, std::ifstream* ifsp, rcvdat& rd,
const bool jump_fro);
Modified: store/trunk/cpp/lib/jrnl/wmgr.cpp
===================================================================
--- store/trunk/cpp/lib/jrnl/wmgr.cpp 2007-10-12 21:58:53 UTC (rev 1032)
+++ store/trunk/cpp/lib/jrnl/wmgr.cpp 2007-10-12 22:20:26 UTC (rev 1033)
@@ -263,8 +263,15 @@
}
// TODO: Tidy this up!
- u_int64_t rid = initialize_rid(cont, dtokp);
+// u_int64_t rid = initialize_rid(cont, dtokp);
+// _deq_rec.reset(rid, dtokp->rid(), xid_ptr, xid_len);
+ u_int64_t rid = dtokp->getSourceMessage() ? dtokp->rid() : (cont ? _wrfc.rid()
- 1 : _wrfc.get_incr_rid());
u_int64_t dequeue_rid = dtokp->getSourceMessage() ? dtokp->dequeue_rid() :
dtokp->rid();
+ if (!dtokp->getSourceMessage())
+ {
+ dtokp->set_rid(rid);
+ dtokp->set_dequeue_rid(dequeue_rid);
+ }
_deq_rec.reset(rid, dequeue_rid, xid_ptr, xid_len);
if (!cont)
{
@@ -303,11 +310,11 @@
try { _emap.lock(rid); }
catch(jexception e) { if (e.err_code() != jerrno::JERR_MAP_NOTFOUND)
throw e; }
std::string xid((char*)xid_ptr, xid_len);
- _tmap.insert_txn_data(xid, txn_data(rid, dtokp->fid(), false));
+ _tmap.insert_txn_data(xid, txn_data(dequeue_rid, dtokp->fid(),
false));
}
else
{
- u_int16_t fid = _emap.get_remove_fid(dtokp->rid());
+ u_int16_t fid = _emap.get_remove_fid(dtokp->dequeue_rid());
_wrfc.decr_enqcnt(fid);
}
#endif
@@ -399,7 +406,7 @@
else
_abort_busy = true;
- u_int64_t rid = initialize_rid(cont, dtokp);
+ u_int64_t rid = dtokp->rid() ? dtokp->rid() : (cont ? _wrfc.rid() - 1 :
_wrfc.get_incr_rid());
_txn_rec.reset(RHM_JDAT_TXA_MAGIC, rid, xid_ptr, xid_len);
if (!cont)
{
@@ -538,7 +545,7 @@
else
_commit_busy = true;
- u_int64_t rid = initialize_rid(cont, dtokp);
+ u_int64_t rid = dtokp->rid() ? dtokp->rid() : (cont ? _wrfc.rid() - 1 :
_wrfc.get_incr_rid());
_txn_rec.reset(RHM_JDAT_TXC_MAGIC, rid, xid_ptr, xid_len);
if (!cont)
{
@@ -806,7 +813,7 @@
break;
default:
std::stringstream ss;
- ss << "dtok_state=" << dtp->wstate_str();
+ ss << "dtok_id=" << dtp->id() <<
"dtok_state=" << dtp->wstate_str();
throw jexception(jerrno::JERR_WMGR_BADDTOKSTATE, ss.str(),
"wmgr",
"get_events");
}