[rhmessaging-commits] rhmessaging commits: r2279 - in store/branches/mrg-1.0/cpp: lib/jrnl and 3 other directories.
rhmessaging-commits at lists.jboss.org
rhmessaging-commits at lists.jboss.org
Tue Aug 12 11:27:18 EDT 2008
Author: kpvdr
Date: 2008-08-12 11:27:18 -0400 (Tue, 12 Aug 2008)
New Revision: 2279
Modified:
store/branches/mrg-1.0/cpp/lib/BdbMessageStore.cpp
store/branches/mrg-1.0/cpp/lib/BdbMessageStore.h
store/branches/mrg-1.0/cpp/lib/JournalImpl.cpp
store/branches/mrg-1.0/cpp/lib/TxnCtxt.h
store/branches/mrg-1.0/cpp/lib/jrnl/enq_map.cpp
store/branches/mrg-1.0/cpp/lib/jrnl/enq_map.hpp
store/branches/mrg-1.0/cpp/lib/jrnl/jcntl.hpp
store/branches/mrg-1.0/cpp/lib/jrnl/jerrno.cpp
store/branches/mrg-1.0/cpp/lib/jrnl/jerrno.hpp
store/branches/mrg-1.0/cpp/lib/jrnl/wmgr.cpp
store/branches/mrg-1.0/cpp/tests/jrnl/_st_basic.cpp
store/branches/mrg-1.0/cpp/tests/jrnl/_st_basic_txn.cpp
store/branches/mrg-1.0/cpp/tests/jrnl/jtt/jfile_chk.py
store/branches/mrg-1.0/cpp/tests/persistence.py
Log:
Fix for BZ458053: "txtest failures when broker killed during transfer phase". Modified message recovery to correctly predict outcome of to-be-rolled-forward/back transactions. Access to jcntl::_emap was required for this, so some accessers were added to class jcntl. Includes fix to python file check program jfile_chk.py which incorrectly detected owi on last-to-first file transition and message content overflowed.
Modified: store/branches/mrg-1.0/cpp/lib/BdbMessageStore.cpp
===================================================================
--- store/branches/mrg-1.0/cpp/lib/BdbMessageStore.cpp 2008-08-11 22:07:32 UTC (rev 2278)
+++ store/branches/mrg-1.0/cpp/lib/BdbMessageStore.cpp 2008-08-12 15:27:18 UTC (rev 2279)
@@ -52,10 +52,12 @@
BdbMessageStore::TplRecoverStruct::TplRecoverStruct(const u_int64_t _rid,
const bool _deq_flag,
- const bool _commit_flag) :
+ const bool _commit_flag,
+ const bool _tpc_flag) :
rid(_rid),
deq_flag(_deq_flag),
- commit_flag(_commit_flag)
+ commit_flag(_commit_flag),
+ tpc_flag(_tpc_flag)
{}
BdbMessageStore::BdbMessageStore(const char* envpath) :
@@ -347,7 +349,7 @@
{
qpid::sys::Mutex::ScopedLock sl(jrnlCreateLock);
jQueue = new JournalImpl(queue.getName(), getJrnlDir(queue),
- string("JournalData"), defJournalGetEventsTimeout,
+ std::string("JournalData"), defJournalGetEventsTimeout,
defJournalFlushTimeout);
}
@@ -521,36 +523,61 @@
//recover transactions:
for (txn_list::iterator i = prepared.begin(); i != prepared.end(); i++) {
- TPCTxnCtxt* tpcc = new TPCTxnCtxt(i->xid, &messageIdSequence);
- std::auto_ptr<TPCTransactionContext> txn(tpcc);
+ std::string xid = i->xid;
- tpcc->prepare(tplStorePtr.get());
-
// Restore data token state in TxnCtxt
- TplRecoverMapCitr citr = tplRecoverMap.find(i->xid);
+ TplRecoverMapCitr citr = tplRecoverMap.find(xid);
if (citr == tplRecoverMap.end()) THROW_STORE_EXCEPTION("XID not found in tplRecoverMap");
- tpcc->recoverDtok(citr->second.rid, i->xid);
// If a record is found that is dequeued but not committed/aborted from tplStore, then a complete() call
// was interrupted part way through committing/aborting the impacted queues. Complete this process.
- bool incomplTxnFlag = citr->second.deq_flag;
+ bool incomplTplTxnFlag = citr->second.deq_flag;
- RecoverableTransaction::shared_ptr dtx;
- if (!incomplTxnFlag) dtx = registry.recoverTransaction(i->xid, txn);
- if (i->enqueues.get()) {
- for (LockedMappings::iterator j = i->enqueues->begin(); j != i->enqueues->end(); j++) {
- tpcc->addXidRecord(queues[j->first]->getExternalQueueStore());
- if (!incomplTxnFlag) dtx->enqueue(queues[j->first], messages[j->second]);
+ if (citr->second.tpc_flag) {
+ // Dtx (2PC) transaction
+ TPCTxnCtxt* tpcc = new TPCTxnCtxt(xid, &messageIdSequence);
+ std::auto_ptr<TPCTransactionContext> txn(tpcc);
+ tpcc->recoverDtok(citr->second.rid, xid);
+ tpcc->prepare(tplStorePtr.get());
+
+ RecoverableTransaction::shared_ptr dtx;
+ if (!incomplTplTxnFlag) dtx = registry.recoverTransaction(xid, txn);
+ if (i->enqueues.get()) {
+ for (LockedMappings::iterator j = i->enqueues->begin(); j != i->enqueues->end(); j++) {
+ tpcc->addXidRecord(queues[j->first]->getExternalQueueStore());
+ if (!incomplTplTxnFlag) dtx->enqueue(queues[j->first], messages[j->second]);
+ }
}
- }
- if (i->dequeues.get()) {
- for (LockedMappings::iterator j = i->dequeues->begin(); j != i->dequeues->end(); j++) {
- tpcc->addXidRecord(queues[j->first]->getExternalQueueStore());
- if (!incomplTxnFlag) dtx->dequeue(queues[j->first], messages[j->second]);
+ if (i->dequeues.get()) {
+ for (LockedMappings::iterator j = i->dequeues->begin(); j != i->dequeues->end(); j++) {
+ tpcc->addXidRecord(queues[j->first]->getExternalQueueStore());
+ if (!incomplTplTxnFlag) dtx->dequeue(queues[j->first], messages[j->second]);
+ }
}
+
+ if (incomplTplTxnFlag) {
+ tpcc->complete(citr->second.commit_flag);
+ }
+ } else {
+ // Local (1PC) transaction
+ boost::shared_ptr<TxnCtxt> opcc(new TxnCtxt(xid, &messageIdSequence));
+ opcc->recoverDtok(citr->second.rid, xid);
+ opcc->prepare(tplStorePtr.get());
+
+ if (i->enqueues.get()) {
+ for (LockedMappings::iterator j = i->enqueues->begin(); j != i->enqueues->end(); j++) {
+ opcc->addXidRecord(queues[j->first]->getExternalQueueStore());
+ }
+ }
+ if (i->dequeues.get()) {
+ for (LockedMappings::iterator j = i->dequeues->begin(); j != i->dequeues->end(); j++) {
+ opcc->addXidRecord(queues[j->first]->getExternalQueueStore());
+ }
+ }
+ if (incomplTplTxnFlag) {
+ opcc->complete(citr->second.commit_flag);
+ }
}
-
- if (incomplTxnFlag) tpcc->complete(citr->second.commit_flag);
}
registry.recoveryComplete();
}
@@ -580,7 +607,7 @@
JournalImpl* jQueue = 0;
{
qpid::sys::Mutex::ScopedLock sl(jrnlCreateLock);
- jQueue = new JournalImpl(queueName, getJrnlDir(queueName), string("JournalData"), defJournalGetEventsTimeout, defJournalFlushTimeout);
+ jQueue = new JournalImpl(queueName, getJrnlDir(queueName), std::string("JournalData"), defJournalGetEventsTimeout, defJournalFlushTimeout);
}
queue->setExternalQueueStore(dynamic_cast<ExternalQueueStore*>(jQueue));
@@ -649,8 +676,8 @@
THROW_STORE_EXCEPTION("Not enough data for binding");
}
uint64_t queueId = buffer.getLongLong();
- string queueName;
- string routingkey;
+ std::string queueName;
+ std::string routingkey;
FieldTable args;
buffer.getShortString(queueName);
buffer.getShortString(routingkey);
@@ -695,6 +722,7 @@
txn_list& prepared,
message_index& messages)
{
+//std::cout << "***** recoverMessages(): queue=" << queue->getName() << std::endl;
size_t preambleLength = sizeof(u_int32_t)/*header size*/;
JournalImpl* jc = static_cast<JournalImpl*>(queue->getExternalQueueStore());
@@ -748,15 +776,36 @@
}
PreparedTransaction::list::iterator i = PreparedTransaction::getLockedPreparedTransaction(prepared, queue->getPersistenceId(), dtokp.rid());
- if (i == prepared.end()) { // not locked
+ if (i == prepared.end()) { // not in prepared list
queue->recover(msg);
} else {
- TplRecoverMapCitr citr = tplRecoverMap.find(i->xid);
+ u_int64_t rid = dtokp.rid();
+ std::string xid(i->xid);
+ TplRecoverMapCitr citr = tplRecoverMap.find(xid);
if (citr == tplRecoverMap.end()) THROW_STORE_EXCEPTION("XID not found in tplRecoverMap");
if (citr->second.deq_flag) { // deq present in prepared list, this xid is part of incomplete txn commit/abort
- if (citr->second.commit_flag) queue->recover(msg); // treat as non-tx, roll forward (else aborted, throw away)
+ if (jc->is_enqueued(rid, true)) {
+ // Enqueue is non-tx, dequeue tx
+ assert(jc->is_locked(rid)); // This record MUST be locked by a txn dequeue
+ if (!citr->second.commit_flag) {
+ queue->recover(msg); // recover message in abort case only
+ }
+ } else {
+ // Enqueue and/or dequeue tx
+ journal::txn_map& tmap = jc->get_txn_map();
+ journal::txn_data_list txnList = tmap.get_tdata_list(xid);
+ bool enq = false;
+ bool deq = false;
+ for (journal::tdl_itr j = txnList.begin(); j<txnList.end(); j++) {
+ if (j->_enq_flag && j->_rid == rid) enq = true;
+ else if (!j->_enq_flag && j->_drid == rid) deq = true;
+ }
+ if (enq && !deq && citr->second.commit_flag) {
+ queue->recover(msg); // recover txn message in commit case only
+ }
+ }
} else {
- messages[dtokp.rid()] = msg;
+ messages[rid] = msg;
}
}
@@ -818,7 +867,7 @@
txn.commit();
} catch (const DbException& e) {
txn.abort();
- THROW_STORE_EXCEPTION("Unexpected BDB error in BdbMessageStore::getExternMessage(): " + string(e.what()));
+ THROW_STORE_EXCEPTION("Unexpected BDB error in BdbMessageStore::getExternMessage(): " + std::string(e.what()));
} catch (...) {
txn.abort();
throw;
@@ -857,54 +906,81 @@
return count;
}
-void BdbMessageStore::recoverTplStore()
+void BdbMessageStore::recoverTplStore(TplRecoverMap& tplMap)
{
if (journal::jdir::exists(tplStorePtr->jrnl_dir() + tplStorePtr->base_filename() + ".jinf")) {
u_int64_t thisHighestRid;
tplStorePtr->recover(tplNumJrnlFiles, tplJrnlFsizeSblks, tplWCachePgSizeSblks, tplWCacheNumPages, 0, thisHighestRid, 0);
+ journal::txn_map& tmap = tplStorePtr->get_txn_map();
if (thisHighestRid > highestRid)
highestRid = thisHighestRid;
- tplStorePtr->recover_complete(); // start journal.
- }
-}
-void BdbMessageStore::getTplRecoverMap(TplRecoverMap& tplMap)
-{
- if (tplStorePtr.get()) {
- if (!tplStorePtr->is_ready())
- recoverTplStore();
+ DataTokenImpl dtokp;
+ void* dbuff = NULL; size_t dbuffSize = 0;
+ void* xidbuff = NULL; size_t xidbuffSize = 0;
+ bool transientFlag = false;
+ bool externalFlag = false;
+ bool done = false;
+ try {
+ unsigned aio_sleep_cnt = 0;
+ while (!done) {
+ dtokp.reset();
+ dtokp.set_wstate(DataTokenImpl::ENQ);
+ switch (tplStorePtr->read_data_record(&dbuff, dbuffSize, &xidbuff, xidbuffSize, transientFlag, externalFlag, &dtokp)) {
+ case rhm::journal::RHM_IORES_SUCCESS: {
+ // Every TPL record contains both data and an XID
+ assert(dbuffSize>0);
+ assert(xidbuffSize>0);
+ std::string xid(static_cast<const char*>(xidbuff), xidbuffSize);
+ bool is2PC = *(static_cast<char*>(dbuff)) != 0;
- // TODO: The journal will return a const txn_map and the txn_map will support
- // const operations at some point. Using non-const txn_map this way is ugly...
- journal::txn_map& tmap = tplStorePtr->get_txn_map();
- std::vector<std::string> xidList;
- tmap.xid_list(xidList);
- for (std::vector<std::string>::const_iterator i = xidList.begin(); i<xidList.end(); i++) {
- journal::txn_data_list txnList = tmap.get_tdata_list(*i);
- unsigned enqCnt = 0;
- unsigned deqCnt = 0;
- u_int64_t rid = 0;
- bool commitFlag = false;
- for (journal::tdl_itr j = txnList.begin(); j<txnList.end(); j++) {
- if (j->_enq_flag) {
- rid = j->_rid;
- enqCnt++;
- } else {
- commitFlag = j->_commit_flag;
- deqCnt++;
- }
+ // Check transaction details; add to recover map
+ journal::txn_data_list txnList = tmap.get_tdata_list(xid);
+ unsigned enqCnt = 0;
+ unsigned deqCnt = 0;
+ u_int64_t rid = 0;
+ bool commitFlag = false;
+ for (journal::tdl_itr j = txnList.begin(); j<txnList.end(); j++) {
+ if (j->_enq_flag) {
+ rid = j->_rid;
+ enqCnt++;
+ } else {
+ commitFlag = j->_commit_flag;
+ deqCnt++;
+ }
+ }
+ assert(enqCnt == 1);
+ assert(deqCnt <= 1);
+ tplMap.insert(TplRecoverMapPair(xid, TplRecoverStruct(rid, deqCnt == 1, commitFlag, is2PC)));
+
+ ::free(xidbuff);
+ aio_sleep_cnt = 0;
+ break;
+ }
+ case rhm::journal::RHM_IORES_PAGE_AIOWAIT:
+ if (++aio_sleep_cnt > MAX_AIO_SLEEPS)
+ THROW_STORE_EXCEPTION("Timeout waiting for AIO in BdbMessageStore::recoverTplStore()");
+ ::usleep(AIO_SLEEP_TIME);
+ break;
+ case rhm::journal::RHM_IORES_EMPTY:
+ done = true;
+ break; // done with all messages. (add call in jrnl to test that _emap is empty.)
+ default:
+ assert("Store Error: Unexpected msg state");
+ } // switch
}
- assert(enqCnt == 1);
- assert(deqCnt <= 1);
- tplMap.insert(TplRecoverMapPair(*i, TplRecoverStruct(rid, deqCnt == 1, commitFlag)));
+ } catch (const journal::jexception& e) {
+ THROW_STORE_EXCEPTION(std::string("TPL recoverTplStore() failed: ") + e.what());
}
+
+ tplStorePtr->recover_complete(); // start journal.
}
}
void BdbMessageStore::recoverLockedMappings(txn_list& txns)
{
if (!tplStorePtr->is_ready())
- getTplRecoverMap(tplRecoverMap);
+ recoverTplStore(tplRecoverMap);
// Abort unprepaired xids and populate the locked maps
for (TplRecoverMapCitr i = tplRecoverMap.begin(); i != tplRecoverMap.end(); i++) {
@@ -916,12 +992,13 @@
}
}
-void BdbMessageStore::collectPreparedXids(std::set<string>& xids)
+void BdbMessageStore::collectPreparedXids(std::set<std::string>& xids)
{
if (!tplStorePtr->is_ready())
- getTplRecoverMap(tplRecoverMap);
+ recoverTplStore(tplRecoverMap);
for (TplRecoverMapCitr i = tplRecoverMap.begin(); i != tplRecoverMap.end(); i++) {
- if (!i->second.deq_flag) // Discard all txns that are to be rolled forward/back
+ // Discard all txns that are to be rolled forward/back and 1PC transactions
+ if (!i->second.deq_flag && i->second.tpc_flag)
xids.insert(i->first);
}
}
@@ -994,7 +1071,7 @@
try {
int status = db.get(txn, &key, &peek, 0);
if (status != DB_BUFFER_SMALL) {
- THROW_STORE_EXCEPTION("Unexpected status code when determining record length: " + string(DbEnv::strerror(status)));
+ THROW_STORE_EXCEPTION("Unexpected status code when determining record length: " + std::string(DbEnv::strerror(status)));
}
} catch (const DbMemoryException& expected) {
//api doc indicates may throw exception instead of status = DB_BUFFER_SMALL;
@@ -1230,7 +1307,7 @@
ddtokp->set_rid(messageIdSequence.next());
ddtokp->set_dequeue_rid(msg->getPersistenceId());
ddtokp->set_wstate(DataTokenImpl::ENQ);
- string tid;
+ std::string tid;
if (ctxt) {
TxnCtxt* txn = check(ctxt);
tid = txn->getXid();
@@ -1339,7 +1416,8 @@
DataTokenImpl* dtokp = ctxt->getDtok();
dtokp->set_external_rid(true);
dtokp->set_rid(messageIdSequence.next());
- tplStorePtr->enqueue_txn_data_record(0, 0, 0, dtokp, ctxt->getXid(), false);
+ char tpcFlag = static_cast<char>(ctxt->isTPC());
+ tplStorePtr->enqueue_txn_data_record(&tpcFlag, sizeof(char), sizeof(char), dtokp, ctxt->getXid(), false);
ctxt->prepare(tplStorePtr.get());
// make sure all the data is written to disk before returning
ctxt->sync();
@@ -1409,7 +1487,7 @@
} else if (status == DB_NOTFOUND) {
return false;
} else {
- THROW_STORE_EXCEPTION("Deletion failed: " + string(DbEnv::strerror(status)));
+ THROW_STORE_EXCEPTION("Deletion failed: " + std::string(DbEnv::strerror(status)));
}
}
@@ -1489,33 +1567,33 @@
}
}
-string BdbMessageStore::getJrnlBaseDir()
+std::string BdbMessageStore::getJrnlBaseDir()
{
std::stringstream dir;
dir << storeDir << "/rhm/jrnl/" ;
return dir.str();
}
-string BdbMessageStore::getBdbBaseDir()
+std::string BdbMessageStore::getBdbBaseDir()
{
std::stringstream dir;
dir << storeDir << "/rhm/dat/" ;
return dir.str();
}
-string BdbMessageStore::getTplBaseDir()
+std::string BdbMessageStore::getTplBaseDir()
{
std::stringstream dir;
dir << storeDir << "/rhm/tpl/" ;
return dir.str();
}
-string BdbMessageStore::getJrnlDir(const qpid::broker::PersistableQueue& queue) //for exmaple /var/rhm/ + queueDir/
+std::string BdbMessageStore::getJrnlDir(const qpid::broker::PersistableQueue& queue) //for exmaple /var/rhm/ + queueDir/
{
return getJrnlDir(queue.getName().c_str());
}
-string BdbMessageStore::getJrnlDir(const char* queueName) //for exmaple /var/rhm/ + queueDir/
+std::string BdbMessageStore::getJrnlDir(const char* queueName) //for exmaple /var/rhm/ + queueDir/
{
std::stringstream dir;
dir << getJrnlBaseDir() << std::hex << std::setfill('0') << std::setw(4);
Modified: store/branches/mrg-1.0/cpp/lib/BdbMessageStore.h
===================================================================
--- store/branches/mrg-1.0/cpp/lib/BdbMessageStore.h 2008-08-11 22:07:32 UTC (rev 2278)
+++ store/branches/mrg-1.0/cpp/lib/BdbMessageStore.h 2008-08-12 15:27:18 UTC (rev 2279)
@@ -63,10 +63,11 @@
// Structs for Transaction Recover List (TPL) recover state
struct TplRecoverStruct {
- u_int64_t rid;
+ u_int64_t rid; // rid of TPL record
bool deq_flag;
bool commit_flag;
- TplRecoverStruct(const u_int64_t _rid, const bool _deq_flag, const bool _commit_flag);
+ bool tpc_flag;
+ TplRecoverStruct(const u_int64_t _rid, const bool _deq_flag, const bool _commit_flag, const bool _tpc_flag);
};
typedef TplRecoverStruct TplRecover;
typedef std::pair<std::string, TplRecover> TplRecoverMapPair;
@@ -157,8 +158,7 @@
queue_index& index,
txn_list& locked,
message_index& prepared);
- void recoverTplStore();
- void getTplRecoverMap(TplRecoverMap& tplMap);
+ void recoverTplStore(TplRecoverMap& tplMap);
void recoverLockedMappings(txn_list& txns);
TxnCtxt* check(qpid::broker::TransactionContext* ctxt);
void store(const qpid::broker::PersistableQueue* queue,
Modified: store/branches/mrg-1.0/cpp/lib/JournalImpl.cpp
===================================================================
--- store/branches/mrg-1.0/cpp/lib/JournalImpl.cpp 2008-08-11 22:07:32 UTC (rev 2278)
+++ store/branches/mrg-1.0/cpp/lib/JournalImpl.cpp 2008-08-12 15:27:18 UTC (rev 2279)
@@ -199,7 +199,7 @@
}
}
std::ostringstream oss2;
- oss2 << "Recover phase I complete; highest rid found = 0x" << std::hex << highest_rid;
+ oss2 << "Recover phase 1 complete; highest rid found = 0x" << std::hex << highest_rid;
oss2 << std::dec << "; emap.size=" << _emap.size() << "; tmap.size=" << _tmap.size();
oss2 << "; journal now read-only.";
log(LOG_DEBUG, oss2.str());
@@ -219,7 +219,7 @@
JournalImpl::recover_complete()
{
jcntl::recover_complete();
- log(LOG_DEBUG, "Recover phase II complete; journal now writable.");
+ log(LOG_DEBUG, "Recover phase 2 complete; journal now writable.");
}
#define MAX_AIO_SLEEPS 1000 // 10 sec
Modified: store/branches/mrg-1.0/cpp/lib/TxnCtxt.h
===================================================================
--- store/branches/mrg-1.0/cpp/lib/TxnCtxt.h 2008-08-11 22:07:32 UTC (rev 2278)
+++ store/branches/mrg-1.0/cpp/lib/TxnCtxt.h 2008-08-12 15:27:18 UTC (rev 2279)
@@ -86,7 +86,7 @@
try {
if (commit) {
jc->txn_commit(dtokp.get(), getXid());
- if (isTPC()) sync();
+ sync();
} else {
jc->txn_abort(dtokp.get(), getXid());
}
@@ -106,6 +106,8 @@
}
}
+ TxnCtxt(std::string _tid, IdSequence* _loggedtx) : loggedtx(_loggedtx), dtokp(new DataTokenImpl), preparedXidStorePtr(0), tid(_tid), txn(0) {}
+
/**
* Call to make sure all the data for this txn is written to safe store
*
Modified: store/branches/mrg-1.0/cpp/lib/jrnl/enq_map.cpp
===================================================================
--- store/branches/mrg-1.0/cpp/lib/jrnl/enq_map.cpp 2008-08-11 22:07:32 UTC (rev 2278)
+++ store/branches/mrg-1.0/cpp/lib/jrnl/enq_map.cpp 2008-08-12 15:27:18 UTC (rev 2279)
@@ -130,16 +130,13 @@
}
bool
-enq_map::is_enqueued(const u_int64_t rid)
+enq_map::is_enqueued(const u_int64_t rid, bool ignore_lock)
{
- emap_itr itr;
- {
- slock s(&_mutex);
- itr = _map.find(rid);
- }
+ slock s(&_mutex);
+ emap_itr itr = _map.find(rid);
if (itr == _map.end()) // not found in map
return false;
- if (itr->second.second) // locked
+ if (!ignore_lock && itr->second.second) // locked
return false;
return true;
}
Modified: store/branches/mrg-1.0/cpp/lib/jrnl/enq_map.hpp
===================================================================
--- store/branches/mrg-1.0/cpp/lib/jrnl/enq_map.hpp 2008-08-11 22:07:32 UTC (rev 2278)
+++ store/branches/mrg-1.0/cpp/lib/jrnl/enq_map.hpp 2008-08-12 15:27:18 UTC (rev 2279)
@@ -95,7 +95,7 @@
void insert_fid(const u_int64_t rid, const u_int16_t fid, const bool locked);
u_int16_t get_fid(const u_int64_t rid);
u_int16_t get_remove_fid(const u_int64_t rid, const bool txn_flag = false);
- bool is_enqueued(const u_int64_t rid);
+ bool is_enqueued(const u_int64_t rid, bool ignore_lock = false);
void lock(const u_int64_t rid);
void unlock(const u_int64_t rid);
bool is_locked(const u_int64_t rid);
Modified: store/branches/mrg-1.0/cpp/lib/jrnl/jcntl.hpp
===================================================================
--- store/branches/mrg-1.0/cpp/lib/jrnl/jcntl.hpp 2008-08-11 22:07:32 UTC (rev 2278)
+++ store/branches/mrg-1.0/cpp/lib/jrnl/jcntl.hpp 2008-08-12 15:27:18 UTC (rev 2279)
@@ -557,8 +557,15 @@
* false if the rid is transactionally enqueued and is not committed, or if it is
* locked (i.e. transactionally dequeued, but the dequeue has not been committed).
*/
- inline bool is_enqueued(const u_int64_t rid) { return _emap.is_enqueued(rid); }
-
+ inline bool is_enqueued(const u_int64_t rid, bool ignore_lock = false)
+ { return _emap.is_enqueued(rid, ignore_lock); }
+ inline bool is_locked(const u_int64_t rid)
+ { if (_emap.is_enqueued(rid, true)) return _emap.is_locked(rid); return false; }
+ inline void enq_rid_list(std::vector<u_int64_t>& rids) { _emap.rid_list(rids); }
+ inline void enq_xid_list(std::vector<std::string>& xids) { _tmap.xid_list(xids); }
+ // TODO Make this a const, but txn_map must support const first.
+ inline txn_map& get_txn_map() { return _tmap; }
+
/**
* \brief Check if the journal is stopped.
*
@@ -607,8 +614,6 @@
inline u_int32_t jfsize_sblks() const { return _jfsize_sblks; }
inline u_int32_t get_open_txn_cnt() const { return _tmap.size(); }
- // TODO Make this a const, but txn_map must support const first.
- inline txn_map& get_txn_map() { return _tmap; }
// Logging
virtual void log(log_level level, const std::string& log_stmt) const;
Modified: store/branches/mrg-1.0/cpp/lib/jrnl/jerrno.cpp
===================================================================
--- store/branches/mrg-1.0/cpp/lib/jrnl/jerrno.cpp 2008-08-11 22:07:32 UTC (rev 2278)
+++ store/branches/mrg-1.0/cpp/lib/jrnl/jerrno.cpp 2008-08-12 15:27:18 UTC (rev 2279)
@@ -94,6 +94,7 @@
const u_int32_t jerrno::JERR_WMGR_BADDTOKSTATE = 0x0802;
const u_int32_t jerrno::JERR_WMGR_ENQDISCONT = 0x0803;
const u_int32_t jerrno::JERR_WMGR_DEQDISCONT = 0x0804;
+const u_int32_t jerrno::JERR_WMGR_DEQRIDNOTENQ = 0x0805;
// class rmgr
const u_int32_t jerrno::JERR_RMGR_UNKNOWNMAGIC = 0x0900;
@@ -187,7 +188,9 @@
"Enqueued new dtok when previous enqueue returned partly completed (state ENQ_PART).";
_err_map[JERR_WMGR_DEQDISCONT] = "JERR_WMGR_DEQDISCONT: "
"Dequeued new dtok when previous dequeue returned partly completed (state DEQ_PART).";
+ _err_map[JERR_WMGR_DEQRIDNOTENQ] = "JERR_WMGR_DEQRIDNOTENQ: Dequeue rid is not enqueued.";
+
// class rmgr
_err_map[JERR_RMGR_UNKNOWNMAGIC] = "JERR_RMGR_UNKNOWNMAGIC: Found record with unknown magic.";
_err_map[JERR_RMGR_RIDMISMATCH] = "JERR_RMGR_RIDMISMATCH: "
Modified: store/branches/mrg-1.0/cpp/lib/jrnl/jerrno.hpp
===================================================================
--- store/branches/mrg-1.0/cpp/lib/jrnl/jerrno.hpp 2008-08-11 22:07:32 UTC (rev 2278)
+++ store/branches/mrg-1.0/cpp/lib/jrnl/jerrno.hpp 2008-08-12 15:27:18 UTC (rev 2279)
@@ -112,6 +112,7 @@
static const u_int32_t JERR_WMGR_BADDTOKSTATE; ///< Data token in illegal state.
static const u_int32_t JERR_WMGR_ENQDISCONT; ///< Enq. new dtok when previous part compl.
static const u_int32_t JERR_WMGR_DEQDISCONT; ///< Deq. new dtok when previous part compl.
+ static const u_int32_t JERR_WMGR_DEQRIDNOTENQ; ///< Deq. rid not enqueued
// class rmgr
static const u_int32_t JERR_RMGR_UNKNOWNMAGIC; ///< Found record with unknown magic
Modified: store/branches/mrg-1.0/cpp/lib/jrnl/wmgr.cpp
===================================================================
--- store/branches/mrg-1.0/cpp/lib/jrnl/wmgr.cpp 2008-08-11 22:07:32 UTC (rev 2278)
+++ store/branches/mrg-1.0/cpp/lib/jrnl/wmgr.cpp 2008-08-12 15:27:18 UTC (rev 2279)
@@ -39,6 +39,8 @@
#include "jrnl/jerrno.hpp"
#include <sstream>
+//#include <iostream> // debug
+
namespace rhm
{
namespace journal
@@ -575,6 +577,7 @@
dtokp->set_dblocks_written(0); // Reset dblks_written from previous op
_commit_busy = true;
}
+//std::cout << " * commit, queue=" << _jc->id() << " xid=" << dtokp->xid() << std::endl << std::flush;
bool done = false;
while (!done)
{
@@ -605,9 +608,12 @@
for (tdl_itr itr = tdl.begin(); itr != tdl.end(); itr++)
{
if (itr->_enq_flag) // txn enqueue
+//{ std::cout << " * commit enq - add enq rid=0x" << std::hex << itr->_rid << std::dec << std::endl << std::flush;
_emap.insert_fid(itr->_rid, itr->_fid);
+//}
else // txn dequeue
{
+//std::cout << " * commit deq - remove enq rid=0x" << std::hex << itr->_drid << std::dec << std::endl << std::flush;
u_int16_t fid = _emap.get_remove_fid(itr->_drid, true);
_wrfc.decr_enqcnt(fid);
}
@@ -834,6 +840,7 @@
#endif
break;
case data_tok::COMMIT_SUBM:
+//std::cout << " * commit-ret, queue=" << _jc->id() << " xid=" << dtokp->xid() << std::endl << std::flush;
dtokl.push_back(dtokp);
tot_data_toks++;
dtokp->set_wstate(data_tok::COMMITTED);
@@ -1018,13 +1025,34 @@
wmgr::dequeue_check(const std::string& xid, const u_int64_t drid)
{
// First check emap
- try { _emap.get_fid(drid); }
+ bool found = false;
+ try
+ {
+ _emap.get_fid(drid);
+ found = true;
+ }
catch(const jexception& e)
{
if (e.err_code() != jerrno::JERR_MAP_NOTFOUND)
throw;
- _tmap.get_data(xid, drid); // not in emap, try tmap
+ if (xid.size())
+ try
+ {
+ _tmap.get_data(xid, drid); // not in emap, try tmap
+ found = true;
+ }
+ catch (const jexception& e)
+ {
+ if (e.err_code() != jerrno::JERR_MAP_NOTFOUND)
+ throw;
+ }
}
+ if (!found)
+ {
+ std::ostringstream oss;
+ oss << "jrnl=" << _jc->id() << " drid=0x" << std::hex << drid;
+ throw jexception(jerrno::JERR_WMGR_DEQRIDNOTENQ, oss.str(), "wmgr", "dequeue_check");
+ }
}
void
Modified: store/branches/mrg-1.0/cpp/tests/jrnl/_st_basic.cpp
===================================================================
--- store/branches/mrg-1.0/cpp/tests/jrnl/_st_basic.cpp 2008-08-11 22:07:32 UTC (rev 2278)
+++ store/branches/mrg-1.0/cpp/tests/jrnl/_st_basic.cpp 2008-08-12 15:27:18 UTC (rev 2279)
@@ -408,7 +408,7 @@
u_int64_t rid = enq_msg(jc, 0, create_msg(msg, 0, MSG_SIZE), false);
deq_msg(jc, rid);
try{ deq_msg(jc, rid); BOOST_ERROR("Did not throw exception on second dequeue."); }
- catch (const jexception& e){ BOOST_CHECK_EQUAL(e.err_code(), jerrno::JERR_MAP_NOTFOUND); }
+ catch (const jexception& e){ BOOST_CHECK_EQUAL(e.err_code(), jerrno::JERR_WMGR_DEQRIDNOTENQ); }
rid = enq_msg(jc, 1, create_msg(msg, 1, MSG_SIZE), false);
deq_msg(jc, rid);
}
Modified: store/branches/mrg-1.0/cpp/tests/jrnl/_st_basic_txn.cpp
===================================================================
--- store/branches/mrg-1.0/cpp/tests/jrnl/_st_basic_txn.cpp 2008-08-11 22:07:32 UTC (rev 2278)
+++ store/branches/mrg-1.0/cpp/tests/jrnl/_st_basic_txn.cpp 2008-08-12 15:27:18 UTC (rev 2279)
@@ -87,9 +87,9 @@
try
{
deq_msg(jc, m);
- BOOST_ERROR("Expected dequeue to fail with exception JERR_MAP_NOTFOUND.");
+ BOOST_ERROR("Expected dequeue to fail with exception JERR_WMGR_DEQRIDNOTENQ.");
}
- catch (const jexception& e) { if (e.err_code() != jerrno::JERR_MAP_NOTFOUND) throw; }
+ catch (const jexception& e) { if (e.err_code() != jerrno::JERR_WMGR_DEQRIDNOTENQ) throw; }
}
}
catch(const exception& e) { BOOST_FAIL(e.what()); }
@@ -138,9 +138,9 @@
try
{
deq_msg(jc, 3*m);
- BOOST_ERROR("Expected dequeue to fail with exception JERR_MAP_NOTFOUND.");
+ BOOST_ERROR("Expected dequeue to fail with exception JERR_WMGR_DEQRIDNOTENQ.");
}
- catch (const jexception& e) { if (e.err_code() != jerrno::JERR_MAP_NOTFOUND) throw; }
+ catch (const jexception& e) { if (e.err_code() != jerrno::JERR_WMGR_DEQRIDNOTENQ) throw; }
}
}
catch(const exception& e) { BOOST_FAIL(e.what()); }
Modified: store/branches/mrg-1.0/cpp/tests/jrnl/jtt/jfile_chk.py
===================================================================
--- store/branches/mrg-1.0/cpp/tests/jrnl/jtt/jfile_chk.py 2008-08-11 22:07:32 UTC (rev 2278)
+++ store/branches/mrg-1.0/cpp/tests/jrnl/jtt/jfile_chk.py 2008-08-12 15:27:18 UTC (rev 2279)
@@ -445,6 +445,7 @@
self.first_rec = False
self.last_file = False
self.last_rid = -1
+ self.fhdr_owi_at_msg_start = None
self.proc_args(argv)
self.proc_csv()
@@ -474,6 +475,7 @@
stop = True;
else:
self.rec_cnt += 1
+ self.fhdr_owi_at_msg_start = self.fhdr.owi()
if self.first_rec:
if self.fhdr.fro != hdr.foffs:
raise Exception('File header first record offset mismatch: fro=0x%08x; rec_offs=0x%08x' % (self.fhdr.fro, hdr.foffs))
@@ -656,7 +658,7 @@
return self.file_num
def check_owi(self, hdr):
- return self.fhdr.owi() == hdr.owi()
+ return self.fhdr_owi_at_msg_start == hdr.owi()
def check_rid(self, hdr):
if self.last_rid != -1 and hdr.rid <= self.last_rid:
Modified: store/branches/mrg-1.0/cpp/tests/persistence.py
===================================================================
--- store/branches/mrg-1.0/cpp/tests/persistence.py 2008-08-11 22:07:32 UTC (rev 2278)
+++ store/branches/mrg-1.0/cpp/tests/persistence.py 2008-08-12 15:27:18 UTC (rev 2279)
@@ -231,7 +231,7 @@
if txc.global_id not in ids:
self.fail("Recovered xids not as expected. missing: %s" % (txc))
if txd.global_id not in ids:
- self.fail("Recovered xids not as expected. missing: %s" % (txc))
+ self.fail("Recovered xids not as expected. missing: %s" % (txd))
self.assertEqual(2, len(xids))
More information about the rhmessaging-commits
mailing list