[rhmessaging-commits] rhmessaging commits: r2298 - in store/trunk/cpp: lib/jrnl and 3 other directories.
rhmessaging-commits at lists.jboss.org
rhmessaging-commits at lists.jboss.org
Wed Aug 13 16:27:31 EDT 2008
Author: kpvdr
Date: 2008-08-13 16:27:30 -0400 (Wed, 13 Aug 2008)
New Revision: 2298
Modified:
store/trunk/cpp/lib/BdbMessageStore.cpp
store/trunk/cpp/lib/BdbMessageStore.h
store/trunk/cpp/lib/JournalImpl.cpp
store/trunk/cpp/lib/PreparedTransaction.h
store/trunk/cpp/lib/TxnCtxt.h
store/trunk/cpp/lib/jrnl/enq_map.cpp
store/trunk/cpp/lib/jrnl/enq_map.hpp
store/trunk/cpp/lib/jrnl/jcntl.cpp
store/trunk/cpp/lib/jrnl/jcntl.hpp
store/trunk/cpp/lib/jrnl/jerrno.cpp
store/trunk/cpp/lib/jrnl/jerrno.hpp
store/trunk/cpp/lib/jrnl/wmgr.cpp
store/trunk/cpp/tests/jrnl/_st_basic.cpp
store/trunk/cpp/tests/jrnl/_st_basic_txn.cpp
store/trunk/cpp/tests/jrnl/jtt/jfile_chk.py
store/trunk/cpp/tests/persistence.py
Log:
Backport from store-1.0 branch: Fix for BZ458053: r.2279 "txtest failures when broker killed during transfer phase". Modified message recovery to correctly predict outcome of to-be-rolled-forward/back transactions. Access to jcntl::_emap was required for this, so some accessors were added to class jcntl. Includes fix to python file check program jfile_chk.py, which incorrectly detected owi on last-to-first file transition when message content overflowed. r.2281 (minor), r.2297 Additional fixes for BZ458053 "txtest failures when broker killed during transfer phase".
Modified: store/trunk/cpp/lib/BdbMessageStore.cpp
===================================================================
--- store/trunk/cpp/lib/BdbMessageStore.cpp 2008-08-13 19:24:20 UTC (rev 2297)
+++ store/trunk/cpp/lib/BdbMessageStore.cpp 2008-08-13 20:27:30 UTC (rev 2298)
@@ -52,10 +52,12 @@
BdbMessageStore::TplRecoverStruct::TplRecoverStruct(const u_int64_t _rid,
const bool _deq_flag,
- const bool _commit_flag) :
+ const bool _commit_flag,
+ const bool _tpc_flag) :
rid(_rid),
deq_flag(_deq_flag),
- commit_flag(_commit_flag)
+ commit_flag(_commit_flag),
+ tpc_flag(_tpc_flag)
{}
BdbMessageStore::BdbMessageStore(const char* envpath) :
@@ -361,7 +363,7 @@
// TODO: Is this mutex necessary?
qpid::sys::Mutex::ScopedLock sl(jrnlCreateLock);
jQueue = new JournalImpl(queue.getName(), getJrnlDir(queue),
- string("JournalData"), defJournalGetEventsTimeout,
+ std::string("JournalData"), defJournalGetEventsTimeout,
defJournalFlushTimeout);
}
@@ -535,49 +537,69 @@
//recover transactions:
for (txn_list::iterator i = prepared.begin(); i != prepared.end(); i++) {
- TPCTxnCtxt* tpcc = new TPCTxnCtxt(i->xid, &messageIdSequence);
- std::auto_ptr<TPCTransactionContext> txn(tpcc);
-
- tpcc->prepare(tplStorePtr.get());
if (mgmtObject != 0) {
mgmtObject->inc_tplTransactionDepth();
mgmtObject->inc_tplTxnPrepares();
- }
+ }
+ std::string xid = i->xid;
+
// Restore data token state in TxnCtxt
- TplRecoverMapCitr citr = tplRecoverMap.find(i->xid);
+ TplRecoverMapCitr citr = tplRecoverMap.find(xid);
if (citr == tplRecoverMap.end()) THROW_STORE_EXCEPTION("XID not found in tplRecoverMap");
- tpcc->recoverDtok(citr->second.rid, i->xid);
// If a record is found that is dequeued but not committed/aborted from tplStore, then a complete() call
// was interrupted part way through committing/aborting the impacted queues. Complete this process.
- bool incomplTxnFlag = citr->second.deq_flag;
+ bool incomplTplTxnFlag = citr->second.deq_flag;
- RecoverableTransaction::shared_ptr dtx;
- if (!incomplTxnFlag) dtx = registry.recoverTransaction(i->xid, txn);
- if (i->enqueues.get()) {
- for (LockedMappings::iterator j = i->enqueues->begin(); j != i->enqueues->end(); j++) {
- tpcc->addXidRecord(queues[j->first]->getExternalQueueStore());
- if (!incomplTxnFlag) dtx->enqueue(queues[j->first], messages[j->second]);
+ if (citr->second.tpc_flag) {
+ // Dtx (2PC) transaction
+ TPCTxnCtxt* tpcc = new TPCTxnCtxt(xid, &messageIdSequence);
+ std::auto_ptr<TPCTransactionContext> txn(tpcc);
+ tpcc->recoverDtok(citr->second.rid, xid);
+ tpcc->prepare(tplStorePtr.get());
+
+ RecoverableTransaction::shared_ptr dtx;
+ if (!incomplTplTxnFlag) dtx = registry.recoverTransaction(xid, txn);
+ if (i->enqueues.get()) {
+ for (LockedMappings::iterator j = i->enqueues->begin(); j != i->enqueues->end(); j++) {
+ tpcc->addXidRecord(queues[j->first]->getExternalQueueStore());
+ if (!incomplTplTxnFlag) dtx->enqueue(queues[j->first], messages[j->second]);
+ }
}
- }
- if (i->dequeues.get()) {
- for (LockedMappings::iterator j = i->dequeues->begin(); j != i->dequeues->end(); j++) {
- tpcc->addXidRecord(queues[j->first]->getExternalQueueStore());
- if (!incomplTxnFlag) dtx->dequeue(queues[j->first], messages[j->second]);
+ if (i->dequeues.get()) {
+ for (LockedMappings::iterator j = i->dequeues->begin(); j != i->dequeues->end(); j++) {
+ tpcc->addXidRecord(queues[j->first]->getExternalQueueStore());
+ if (!incomplTplTxnFlag) dtx->dequeue(queues[j->first], messages[j->second]);
+ }
}
- }
- if (incomplTxnFlag) {
- tpcc->complete(citr->second.commit_flag);
+ if (incomplTplTxnFlag) {
+ tpcc->complete(citr->second.commit_flag);
+ }
} else {
- if (mgmtObject != 0) {
- mgmtObject->inc_tplTransactionDepth();
- mgmtObject->inc_tplTxnPrepares();
- }
+ // Local (1PC) transaction
+ boost::shared_ptr<TxnCtxt> opcc(new TxnCtxt(xid, &messageIdSequence));
+ opcc->recoverDtok(citr->second.rid, xid);
+ opcc->prepare(tplStorePtr.get());
+
+ if (i->enqueues.get()) {
+ for (LockedMappings::iterator j = i->enqueues->begin(); j != i->enqueues->end(); j++) {
+ opcc->addXidRecord(queues[j->first]->getExternalQueueStore());
+ }
+ }
+ if (i->dequeues.get()) {
+ for (LockedMappings::iterator j = i->dequeues->begin(); j != i->dequeues->end(); j++) {
+ opcc->addXidRecord(queues[j->first]->getExternalQueueStore());
+ }
+ }
+ if (incomplTplTxnFlag) {
+ opcc->complete(citr->second.commit_flag);
+ } else {
+ completed(*opcc.get(), citr->second.commit_flag);
+ }
}
}
-
registry.recoveryComplete();
}
@@ -606,7 +628,7 @@
JournalImpl* jQueue = 0;
{
qpid::sys::Mutex::ScopedLock sl(jrnlCreateLock);
- jQueue = new JournalImpl(queueName, getJrnlDir(queueName), string("JournalData"), defJournalGetEventsTimeout, defJournalFlushTimeout);
+ jQueue = new JournalImpl(queueName, getJrnlDir(queueName), std::string("JournalData"), defJournalGetEventsTimeout, defJournalFlushTimeout);
}
queue->setExternalQueueStore(dynamic_cast<ExternalQueueStore*>(jQueue));
@@ -675,8 +697,8 @@
THROW_STORE_EXCEPTION("Not enough data for binding");
}
uint64_t queueId = buffer.getLongLong();
- string queueName;
- string routingkey;
+ std::string queueName;
+ std::string routingkey;
FieldTable args;
buffer.getShortString(queueName);
buffer.getShortString(routingkey);
@@ -774,15 +796,39 @@
}
PreparedTransaction::list::iterator i = PreparedTransaction::getLockedPreparedTransaction(prepared, queue->getPersistenceId(), dtokp.rid());
- if (i == prepared.end()) { // not locked
+ if (i == prepared.end()) { // not in prepared list
queue->recover(msg);
} else {
- TplRecoverMapCitr citr = tplRecoverMap.find(i->xid);
+ u_int64_t rid = dtokp.rid();
+ std::string xid(i->xid);
+ TplRecoverMapCitr citr = tplRecoverMap.find(xid);
if (citr == tplRecoverMap.end()) THROW_STORE_EXCEPTION("XID not found in tplRecoverMap");
- if (citr->second.deq_flag) { // deq present in prepared list, this xid is part of incomplete txn commit/abort
- if (citr->second.commit_flag) queue->recover(msg); // treat as non-tx, roll forward (else aborted, throw away)
+
+ // deq present in prepared list, this xid is part of incomplete txn commit/abort
+ // or this is a 1PC txn that must be rolled forward
+ if (citr->second.deq_flag || !citr->second.tpc_flag) {
+ if (jc->is_enqueued(rid, true)) {
+ // Enqueue is non-tx, dequeue tx
+ assert(jc->is_locked(rid)); // This record MUST be locked by a txn dequeue
+ if (!citr->second.commit_flag) {
+ queue->recover(msg); // recover message in abort case only
+ }
+ } else {
+ // Enqueue and/or dequeue tx
+ journal::txn_map& tmap = jc->get_txn_map();
+ journal::txn_data_list txnList = tmap.get_tdata_list(xid);
+ bool enq = false;
+ bool deq = false;
+ for (journal::tdl_itr j = txnList.begin(); j<txnList.end(); j++) {
+ if (j->_enq_flag && j->_rid == rid) enq = true;
+ else if (!j->_enq_flag && j->_drid == rid) deq = true;
+ }
+ if (enq && !deq && citr->second.commit_flag) {
+ queue->recover(msg); // recover txn message in commit case only
+ }
+ }
} else {
- messages[dtokp.rid()] = msg;
+ messages[rid] = msg;
}
}
@@ -845,7 +891,7 @@
txn.commit();
} catch (const DbException& e) {
txn.abort();
- THROW_STORE_EXCEPTION("Unexpected BDB error in BdbMessageStore::getExternMessage(): " + string(e.what()));
+ THROW_STORE_EXCEPTION("Unexpected BDB error in BdbMessageStore::getExternMessage(): " + std::string(e.what()));
} catch (...) {
txn.abort();
throw;
@@ -884,54 +930,85 @@
return count;
}
-void BdbMessageStore::recoverTplStore()
+void BdbMessageStore::recoverTplStore(TplRecoverMap& tplMap)
{
if (journal::jdir::exists(tplStorePtr->jrnl_dir() + tplStorePtr->base_filename() + ".jinf")) {
u_int64_t thisHighestRid;
tplStorePtr->recover(tplNumJrnlFiles, tplJrnlFsizeSblks, tplWCachePgSizeSblks, tplWCacheNumPages, 0, thisHighestRid, 0);
+ journal::txn_map& tmap = tplStorePtr->get_txn_map();
if (thisHighestRid > highestRid)
highestRid = thisHighestRid;
- tplStorePtr->recover_complete(); // start journal.
- }
-}
-void BdbMessageStore::getTplRecoverMap(TplRecoverMap& tplMap)
-{
- if (tplStorePtr.get()) {
- if (!tplStorePtr->is_ready())
- recoverTplStore();
+ DataTokenImpl dtokp;
+ void* dbuff = NULL; size_t dbuffSize = 0;
+ void* xidbuff = NULL; size_t xidbuffSize = 0;
+ bool transientFlag = false;
+ bool externalFlag = false;
+ bool done = false;
+ try {
+ unsigned aio_sleep_cnt = 0;
+ while (!done) {
+ dtokp.reset();
+ dtokp.set_wstate(DataTokenImpl::ENQ);
+ switch (tplStorePtr->read_data_record(&dbuff, dbuffSize, &xidbuff, xidbuffSize, transientFlag, externalFlag, &dtokp)) {
+ case rhm::journal::RHM_IORES_SUCCESS: {
+ // Every TPL record contains both data and an XID
+ assert(dbuffSize>0);
+ assert(xidbuffSize>0);
+ std::string xid(static_cast<const char*>(xidbuff), xidbuffSize);
+ bool is2PC = *(static_cast<char*>(dbuff)) != 0;
- // TODO: The journal will return a const txn_map and the txn_map will support
- // const operations at some point. Using non-const txn_map this way is ugly...
- journal::txn_map& tmap = tplStorePtr->get_txn_map();
- std::vector<std::string> xidList;
- tmap.xid_list(xidList);
- for (std::vector<std::string>::const_iterator i = xidList.begin(); i<xidList.end(); i++) {
- journal::txn_data_list txnList = tmap.get_tdata_list(*i);
- unsigned enqCnt = 0;
- unsigned deqCnt = 0;
- u_int64_t rid = 0;
- bool commitFlag = false;
- for (journal::tdl_itr j = txnList.begin(); j<txnList.end(); j++) {
- if (j->_enq_flag) {
- rid = j->_rid;
- enqCnt++;
- } else {
- commitFlag = j->_commit_flag;
- deqCnt++;
- }
+ // Check transaction details; add to recover map
+ journal::txn_data_list txnList = tmap.get_tdata_list(xid);
+ unsigned enqCnt = 0;
+ unsigned deqCnt = 0;
+ u_int64_t rid = 0;
+
+ // Assume commit (roll forward) in cases where only prepare has been called - ie only enqueue record exists.
+ // Note: will apply to both 1PC and 2PC transactions.
+ bool commitFlag = true;
+
+ for (journal::tdl_itr j = txnList.begin(); j<txnList.end(); j++) {
+ if (j->_enq_flag) {
+ rid = j->_rid;
+ enqCnt++;
+ } else {
+ commitFlag = j->_commit_flag;
+ deqCnt++;
+ }
+ }
+ assert(enqCnt == 1);
+ assert(deqCnt <= 1);
+ tplMap.insert(TplRecoverMapPair(xid, TplRecoverStruct(rid, deqCnt == 1, commitFlag, is2PC)));
+
+ ::free(xidbuff);
+ aio_sleep_cnt = 0;
+ break;
+ }
+ case rhm::journal::RHM_IORES_PAGE_AIOWAIT:
+ if (++aio_sleep_cnt > MAX_AIO_SLEEPS)
+ THROW_STORE_EXCEPTION("Timeout waiting for AIO in BdbMessageStore::recoverTplStore()");
+ ::usleep(AIO_SLEEP_TIME);
+ break;
+ case rhm::journal::RHM_IORES_EMPTY:
+ done = true;
+ break; // done with all messages. (add call in jrnl to test that _emap is empty.)
+ default:
+ assert("Store Error: Unexpected msg state");
+ } // switch
}
- assert(enqCnt == 1);
- assert(deqCnt <= 1);
- tplMap.insert(TplRecoverMapPair(*i, TplRecoverStruct(rid, deqCnt == 1, commitFlag)));
+ } catch (const journal::jexception& e) {
+ THROW_STORE_EXCEPTION(std::string("TPL recoverTplStore() failed: ") + e.what());
}
+
+ tplStorePtr->recover_complete(); // start journal.
}
}
void BdbMessageStore::recoverLockedMappings(txn_list& txns)
{
if (!tplStorePtr->is_ready())
- getTplRecoverMap(tplRecoverMap);
+ recoverTplStore(tplRecoverMap);
// Abort unprepaired xids and populate the locked maps
for (TplRecoverMapCitr i = tplRecoverMap.begin(); i != tplRecoverMap.end(); i++) {
@@ -943,12 +1020,13 @@
}
}
-void BdbMessageStore::collectPreparedXids(std::set<string>& xids)
+void BdbMessageStore::collectPreparedXids(std::set<std::string>& xids)
{
if (!tplStorePtr->is_ready())
- getTplRecoverMap(tplRecoverMap);
+ recoverTplStore(tplRecoverMap);
for (TplRecoverMapCitr i = tplRecoverMap.begin(); i != tplRecoverMap.end(); i++) {
- if (!i->second.deq_flag) // Discard all txns that are to be rolled forward/back
+ // Discard all txns that are to be rolled forward/back and 1PC transactions
+ if (!i->second.deq_flag && i->second.tpc_flag)
xids.insert(i->first);
}
}
@@ -1021,7 +1099,7 @@
try {
int status = db.get(txn, &key, &peek, 0);
if (status != DB_BUFFER_SMALL) {
- THROW_STORE_EXCEPTION("Unexpected status code when determining record length: " + string(DbEnv::strerror(status)));
+ THROW_STORE_EXCEPTION("Unexpected status code when determining record length: " + std::string(DbEnv::strerror(status)));
}
} catch (const DbMemoryException& expected) {
//api doc indicates may throw exception instead of status = DB_BUFFER_SMALL;
@@ -1257,7 +1335,7 @@
ddtokp->set_rid(messageIdSequence.next());
ddtokp->set_dequeue_rid(msg->getPersistenceId());
ddtokp->set_wstate(DataTokenImpl::ENQ);
- string tid;
+ std::string tid;
if (ctxt) {
TxnCtxt* txn = check(ctxt);
tid = txn->getXid();
@@ -1369,11 +1447,16 @@
try {
chkTplStoreInit(); // Late initialize (if needed)
+ // This sync is requred to ensure multi-queue atomicity - ie all txn data
+ // must hit the disk on *all* queues before the TPL prepare (enq) is written.
+ ctxt->sync();
+
ctxt->incrDtokRef();
DataTokenImpl* dtokp = ctxt->getDtok();
dtokp->set_external_rid(true);
dtokp->set_rid(messageIdSequence.next());
- tplStorePtr->enqueue_txn_data_record(0, 0, 0, dtokp, ctxt->getXid(), false);
+ char tpcFlag = static_cast<char>(ctxt->isTPC());
+ tplStorePtr->enqueue_txn_data_record(&tpcFlag, sizeof(char), sizeof(char), dtokp, ctxt->getXid(), false);
ctxt->prepare(tplStorePtr.get());
// make sure all the data is written to disk before returning
ctxt->sync();
@@ -1447,7 +1530,7 @@
} else if (status == DB_NOTFOUND) {
return false;
} else {
- THROW_STORE_EXCEPTION("Deletion failed: " + string(DbEnv::strerror(status)));
+ THROW_STORE_EXCEPTION("Deletion failed: " + std::string(DbEnv::strerror(status)));
}
}
@@ -1527,33 +1610,33 @@
}
}
-string BdbMessageStore::getJrnlBaseDir()
+std::string BdbMessageStore::getJrnlBaseDir()
{
std::stringstream dir;
dir << storeDir << "/rhm/jrnl/" ;
return dir.str();
}
-string BdbMessageStore::getBdbBaseDir()
+std::string BdbMessageStore::getBdbBaseDir()
{
std::stringstream dir;
dir << storeDir << "/rhm/dat/" ;
return dir.str();
}
-string BdbMessageStore::getTplBaseDir()
+std::string BdbMessageStore::getTplBaseDir()
{
std::stringstream dir;
dir << storeDir << "/rhm/tpl/" ;
return dir.str();
}
-string BdbMessageStore::getJrnlDir(const qpid::broker::PersistableQueue& queue) //for exmaple /var/rhm/ + queueDir/
+std::string BdbMessageStore::getJrnlDir(const qpid::broker::PersistableQueue& queue) //for exmaple /var/rhm/ + queueDir/
{
return getJrnlDir(queue.getName().c_str());
}
-string BdbMessageStore::getJrnlDir(const char* queueName) //for exmaple /var/rhm/ + queueDir/
+std::string BdbMessageStore::getJrnlDir(const char* queueName) //for exmaple /var/rhm/ + queueDir/
{
std::stringstream dir;
dir << getJrnlBaseDir() << std::hex << std::setfill('0') << std::setw(4);
Modified: store/trunk/cpp/lib/BdbMessageStore.h
===================================================================
--- store/trunk/cpp/lib/BdbMessageStore.h 2008-08-13 19:24:20 UTC (rev 2297)
+++ store/trunk/cpp/lib/BdbMessageStore.h 2008-08-13 20:27:30 UTC (rev 2298)
@@ -63,10 +63,11 @@
// Structs for Transaction Recover List (TPL) recover state
struct TplRecoverStruct {
- u_int64_t rid;
+ u_int64_t rid; // rid of TPL record
bool deq_flag;
bool commit_flag;
- TplRecoverStruct(const u_int64_t _rid, const bool _deq_flag, const bool _commit_flag);
+ bool tpc_flag;
+ TplRecoverStruct(const u_int64_t _rid, const bool _deq_flag, const bool _commit_flag, const bool _tpc_flag);
};
typedef TplRecoverStruct TplRecover;
typedef std::pair<std::string, TplRecover> TplRecoverMapPair;
@@ -157,8 +158,7 @@
queue_index& index,
txn_list& locked,
message_index& prepared);
- void recoverTplStore();
- void getTplRecoverMap(TplRecoverMap& tplMap);
+ void recoverTplStore(TplRecoverMap& tplMap);
void recoverLockedMappings(txn_list& txns);
TxnCtxt* check(qpid::broker::TransactionContext* ctxt);
void store(const qpid::broker::PersistableQueue* queue,
Modified: store/trunk/cpp/lib/JournalImpl.cpp
===================================================================
--- store/trunk/cpp/lib/JournalImpl.cpp 2008-08-13 19:24:20 UTC (rev 2297)
+++ store/trunk/cpp/lib/JournalImpl.cpp 2008-08-13 20:27:30 UTC (rev 2298)
@@ -214,7 +214,7 @@
}
}
std::ostringstream oss2;
- oss2 << "Recover phase I complete; highest rid found = 0x" << std::hex << highest_rid;
+ oss2 << "Recover phase 1 complete; highest rid found = 0x" << std::hex << highest_rid;
oss2 << std::dec << "; emap.size=" << _emap.size() << "; tmap.size=" << _tmap.size();
oss2 << "; journal now read-only.";
log(LOG_DEBUG, oss2.str());
@@ -232,7 +232,7 @@
JournalImpl::recover_complete()
{
jcntl::recover_complete();
- log(LOG_DEBUG, "Recover phase II complete; journal now writable.");
+ log(LOG_DEBUG, "Recover phase 2 complete; journal now writable.");
}
#define MAX_AIO_SLEEPS 1000 // 10 sec
Modified: store/trunk/cpp/lib/PreparedTransaction.h
===================================================================
--- store/trunk/cpp/lib/PreparedTransaction.h 2008-08-13 19:24:20 UTC (rev 2297)
+++ store/trunk/cpp/lib/PreparedTransaction.h 2008-08-13 20:27:30 UTC (rev 2298)
@@ -47,6 +47,7 @@
void add(queue_id queue, message_id message);
bool isLocked(queue_id queue, message_id message);
+ std::size_t size() { return locked.size(); }
iterator begin() { return locked.begin(); }
iterator end() { return locked.end(); }
Modified: store/trunk/cpp/lib/TxnCtxt.h
===================================================================
--- store/trunk/cpp/lib/TxnCtxt.h 2008-08-13 19:24:20 UTC (rev 2297)
+++ store/trunk/cpp/lib/TxnCtxt.h 2008-08-13 20:27:30 UTC (rev 2298)
@@ -86,7 +86,7 @@
try {
if (commit) {
jc->txn_commit(dtokp.get(), getXid());
- if (isTPC()) sync();
+ sync();
} else {
jc->txn_abort(dtokp.get(), getXid());
}
@@ -106,6 +106,8 @@
}
}
+ TxnCtxt(std::string _tid, IdSequence* _loggedtx) : loggedtx(_loggedtx), dtokp(new DataTokenImpl), preparedXidStorePtr(0), tid(_tid), txn(0) {}
+
/**
* Call to make sure all the data for this txn is written to safe store
*
Modified: store/trunk/cpp/lib/jrnl/enq_map.cpp
===================================================================
--- store/trunk/cpp/lib/jrnl/enq_map.cpp 2008-08-13 19:24:20 UTC (rev 2297)
+++ store/trunk/cpp/lib/jrnl/enq_map.cpp 2008-08-13 20:27:30 UTC (rev 2298)
@@ -130,16 +130,13 @@
}
bool
-enq_map::is_enqueued(const u_int64_t rid)
+enq_map::is_enqueued(const u_int64_t rid, bool ignore_lock)
{
- emap_itr itr;
- {
- slock s(&_mutex);
- itr = _map.find(rid);
- }
+ slock s(&_mutex);
+ emap_itr itr = _map.find(rid);
if (itr == _map.end()) // not found in map
return false;
- if (itr->second.second) // locked
+ if (!ignore_lock && itr->second.second) // locked
return false;
return true;
}
Modified: store/trunk/cpp/lib/jrnl/enq_map.hpp
===================================================================
--- store/trunk/cpp/lib/jrnl/enq_map.hpp 2008-08-13 19:24:20 UTC (rev 2297)
+++ store/trunk/cpp/lib/jrnl/enq_map.hpp 2008-08-13 20:27:30 UTC (rev 2298)
@@ -95,7 +95,7 @@
void insert_fid(const u_int64_t rid, const u_int16_t fid, const bool locked);
u_int16_t get_fid(const u_int64_t rid);
u_int16_t get_remove_fid(const u_int64_t rid, const bool txn_flag = false);
- bool is_enqueued(const u_int64_t rid);
+ bool is_enqueued(const u_int64_t rid, bool ignore_lock = false);
void lock(const u_int64_t rid);
void unlock(const u_int64_t rid);
bool is_locked(const u_int64_t rid);
Modified: store/trunk/cpp/lib/jrnl/jcntl.cpp
===================================================================
--- store/trunk/cpp/lib/jrnl/jcntl.cpp 2008-08-13 19:24:20 UTC (rev 2297)
+++ store/trunk/cpp/lib/jrnl/jcntl.cpp 2008-08-13 20:27:30 UTC (rev 2298)
@@ -634,8 +634,20 @@
for (std::vector<std::string>::iterator itr = xid_list.begin(); itr != xid_list.end(); itr++)
{
std::vector<std::string>::const_iterator pitr = std::find(prep_txn_list_ptr->begin(), prep_txn_list_ptr->end(), *itr);
- if (pitr == prep_txn_list_ptr->end())
- _tmap.get_remove_tdata_list(*itr);
+ if (pitr == prep_txn_list_ptr->end()) // not found in prepared list
+ {
+ txn_data_list tdl = _tmap.get_remove_tdata_list(*itr);
+ // Unlock any affected enqueues in emap
+ for (tdl_itr i=tdl.begin(); i<tdl.end(); i++)
+ {
+ if (i->_enq_flag) // enq op - decrement enqueue count
+ rd._enq_cnt_list[i->_fid]--;
+ else if (_emap.is_enqueued(i->_drid, true)) // deq op - unlock enq record
+ _emap.unlock(i->_drid);
+ }
+ // Write abort record to disk
+ rcvr_write_abort(rd, *itr);
+ }
}
}
}
@@ -951,7 +963,7 @@
ofsp.write((const char*)buff, JRNL_DBLK_SIZE);
assert(!ofsp.fail());
std::ostringstream oss;
- oss << "Wrote filler record at offs=0x" << std::hex << file_pos << std::dec;
+ oss << "Recover phase write: Wrote filler record at offs=0x" << std::hex << file_pos << std::dec;
this->log(LOG_NOTICE, oss.str());
file_pos = ofsp.tellp();
}
@@ -961,5 +973,99 @@
}
}
+// TODO - FIXME - Unify the recover and normal aio write methods.
+// Normally, writes are not performed during recover mode (journal is in read-only
+// mode) so initialization of the aio write controllers is deferred until recover
+// is complete. Currenlty because journal is still in recover mode when
+// rcvr_write_abort() is called, normal writes are not possible, so std::ofstream
+// writes are used instead. Lots of logic duplication!
+void
+jcntl::rcvr_write_abort(rcvdat& rd, std::string xid)
+{
+ const u_int32_t sblk_size = JRNL_DBLK_SIZE * JRNL_SBLK_SIZE;
+ const u_int32_t amagic = RHM_JDAT_TXA_MAGIC;
+ const u_int32_t xmagic = RHM_JDAT_EMPTY_MAGIC;
+
+ // Check last record ends on sblk boundary
+ assert(rd._eo % sblk_size == 0);
+
+ // Find fid and posn to write
+ u_int16_t fid = rd._lfid;
+ std::streampos file_pos = rd._eo;
+ if (rd._eo/sblk_size >= _jfsize_sblks) // file full, use next file
+ {
+ if (++fid >= _num_jfiles)
+ {
+ fid = 0;
+ rd._owi = !rd._owi;
+ }
+ file_pos = 0;
+ }
+
+ // Prepare a buffer of at least 1 sblock
+ u_int32_t abort_dblks = txn_rec::size_dblks(sizeof(txn_hdr) + xid.size());
+ std::size_t buffsize = abort_dblks < JRNL_SBLK_SIZE ? JRNL_SBLK_SIZE : abort_dblks;
+ void* buff = std::malloc(buffsize * JRNL_DBLK_SIZE);
+ assert(buff != 0);
+
+ // Initialize file stream
+ std::ostringstream fn;
+ fn << _jdir.dirname() << "/" << _base_filename << ".";
+ fn << std::hex << std::setfill('0') << std::setw(4) << fid << "." << JRNL_DATA_EXTENSION;
+ std::ofstream ofs(fn.str().c_str(),
+ std::ios_base::in | std::ios_base::out | std::ios_base::binary);
+ if (!ofs.good())
+ throw jexception(jerrno::JERR__FILEIO, fn.str(), "jcntl", "rcvr_write_abort");
+ if (file_pos)
+ ofs.seekp(file_pos);
+ else
+ {
+ // New file, write new file header
+ file_hdr fh(RHM_JDAT_FILE_MAGIC, RHM_JDAT_VERSION, ++rd._h_rid, fid, sblk_size, rd._owi, true);
+ ofs.write((const char*)&fh, sizeof(file_hdr));
+ assert(!ofs.fail());
+ // fill remainder of sblk with fill char
+ std::memset((char*)buff, RHM_CLEAN_CHAR, sblk_size - sizeof(file_hdr));
+ ofs.write((const char*)buff, sblk_size - sizeof(file_hdr));
+ assert(!ofs.fail());
+ // log write action
+ std::ostringstream oss;
+ oss << "Recover phase write: File header in fid=" << fid << " at offs=0x0 for txn abort record";
+ this->log(LOG_NOTICE, oss.str());
+ file_pos = ofs.tellp();
+ }
+
+ // Write abort record
+ txn_rec ar(amagic, ++rd._h_rid, xid.data(), xid.size(), rd._owi);
+ u_int32_t res = ar.encode(buff, 0, abort_dblks);
+ assert(res == abort_dblks);
+ ofs.write((const char*)buff, JRNL_DBLK_SIZE);
+ assert(!ofs.fail());
+ // log write action
+ std::ostringstream oss;
+ oss << "Recover phase write: Aborted unprepared transaction xid=" << xid << " at offs=0x" << std::hex << file_pos << std::dec;
+ this->log(LOG_NOTICE, oss.str());
+ file_pos = ofs.tellp();
+
+ // Prepare filler record
+ std::memcpy(buff, (void*)&xmagic, sizeof(xmagic));
+ std::memset((char*)buff + sizeof(xmagic), RHM_CLEAN_CHAR, JRNL_DBLK_SIZE - sizeof(xmagic));
+ // Write filler as many times as needed to get to next sblk boundary
+ while (file_pos % sblk_size)
+ {
+ ofs.write((const char*)buff, JRNL_DBLK_SIZE);
+ assert(!ofs.fail());
+ std::ostringstream oss;
+ oss << "Recover phase write: Wrote filler record at offs=0x" << std::hex << file_pos << std::dec;
+ this->log(LOG_NOTICE, oss.str());
+ file_pos = ofs.tellp();
+ }
+ rd._eo = file_pos;
+
+ // Clean up
+ ofs.close();
+ std::free(buff);
+}
+
} // namespace journal
} // namespace rhm
Modified: store/trunk/cpp/lib/jrnl/jcntl.hpp
===================================================================
--- store/trunk/cpp/lib/jrnl/jcntl.hpp 2008-08-13 19:24:20 UTC (rev 2297)
+++ store/trunk/cpp/lib/jrnl/jcntl.hpp 2008-08-13 20:27:30 UTC (rev 2298)
@@ -557,7 +557,15 @@
* false if the rid is transactionally enqueued and is not committed, or if it is
* locked (i.e. transactionally dequeued, but the dequeue has not been committed).
*/
- inline bool is_enqueued(const u_int64_t rid) { return _emap.is_enqueued(rid); }
+ inline bool is_enqueued(const u_int64_t rid, bool ignore_lock = false)
+ { return _emap.is_enqueued(rid, ignore_lock); }
+ inline bool is_locked(const u_int64_t rid)
+ { if (_emap.is_enqueued(rid, true)) return _emap.is_locked(rid); return false; }
+ inline void enq_rid_list(std::vector<u_int64_t>& rids) { _emap.rid_list(rids); }
+ inline void enq_xid_list(std::vector<std::string>& xids) { _tmap.xid_list(xids); }
+ inline u_int32_t get_open_txn_cnt() const { return _tmap.size(); }
+ // TODO Make this a const, but txn_map must support const first.
+ inline txn_map& get_txn_map() { return _tmap; }
/**
* \brief Check if the journal is stopped.
@@ -606,10 +614,6 @@
inline u_int32_t jfsize_sblks() const { return _jfsize_sblks; }
- inline u_int32_t get_open_txn_cnt() const { return _tmap.size(); }
- // TODO Make this a const, but txn_map must support const first.
- inline txn_map& get_txn_map() { return _tmap; }
-
// Logging
virtual void log(log_level level, const std::string& log_stmt) const;
virtual void log(log_level level, const char* const log_stmt) const;
@@ -668,6 +672,8 @@
std::streampos& read_pos);
void check_journal_alignment(const u_int16_t fid, std::streampos& rec_offset);
+
+ void rcvr_write_abort(rcvdat& rd, std::string xid);
};
} // namespace journal
Modified: store/trunk/cpp/lib/jrnl/jerrno.cpp
===================================================================
--- store/trunk/cpp/lib/jrnl/jerrno.cpp 2008-08-13 19:24:20 UTC (rev 2297)
+++ store/trunk/cpp/lib/jrnl/jerrno.cpp 2008-08-13 20:27:30 UTC (rev 2298)
@@ -94,6 +94,7 @@
const u_int32_t jerrno::JERR_WMGR_BADDTOKSTATE = 0x0802;
const u_int32_t jerrno::JERR_WMGR_ENQDISCONT = 0x0803;
const u_int32_t jerrno::JERR_WMGR_DEQDISCONT = 0x0804;
+const u_int32_t jerrno::JERR_WMGR_DEQRIDNOTENQ = 0x0805;
// class rmgr
const u_int32_t jerrno::JERR_RMGR_UNKNOWNMAGIC = 0x0900;
@@ -187,6 +188,7 @@
"Enqueued new dtok when previous enqueue returned partly completed (state ENQ_PART).";
_err_map[JERR_WMGR_DEQDISCONT] = "JERR_WMGR_DEQDISCONT: "
"Dequeued new dtok when previous dequeue returned partly completed (state DEQ_PART).";
+ _err_map[JERR_WMGR_DEQRIDNOTENQ] = "JERR_WMGR_DEQRIDNOTENQ: Dequeue rid is not enqueued.";
// class rmgr
_err_map[JERR_RMGR_UNKNOWNMAGIC] = "JERR_RMGR_UNKNOWNMAGIC: Found record with unknown magic.";
Modified: store/trunk/cpp/lib/jrnl/jerrno.hpp
===================================================================
--- store/trunk/cpp/lib/jrnl/jerrno.hpp 2008-08-13 19:24:20 UTC (rev 2297)
+++ store/trunk/cpp/lib/jrnl/jerrno.hpp 2008-08-13 20:27:30 UTC (rev 2298)
@@ -112,6 +112,7 @@
static const u_int32_t JERR_WMGR_BADDTOKSTATE; ///< Data token in illegal state.
static const u_int32_t JERR_WMGR_ENQDISCONT; ///< Enq. new dtok when previous part compl.
static const u_int32_t JERR_WMGR_DEQDISCONT; ///< Deq. new dtok when previous part compl.
+ static const u_int32_t JERR_WMGR_DEQRIDNOTENQ; ///< Deq. rid not enqueued
// class rmgr
static const u_int32_t JERR_RMGR_UNKNOWNMAGIC; ///< Found record with unknown magic
Modified: store/trunk/cpp/lib/jrnl/wmgr.cpp
===================================================================
--- store/trunk/cpp/lib/jrnl/wmgr.cpp 2008-08-13 19:24:20 UTC (rev 2297)
+++ store/trunk/cpp/lib/jrnl/wmgr.cpp 2008-08-13 20:27:30 UTC (rev 2298)
@@ -1000,13 +1000,34 @@
wmgr::dequeue_check(const std::string& xid, const u_int64_t drid)
{
// First check emap
- try { _emap.get_fid(drid); }
+ bool found = false;
+ try
+ {
+ _emap.get_fid(drid);
+ found = true;
+ }
catch(const jexception& e)
{
if (e.err_code() != jerrno::JERR_MAP_NOTFOUND)
throw;
- _tmap.get_data(xid, drid); // not in emap, try tmap
+ if (xid.size())
+ try
+ {
+ _tmap.get_data(xid, drid); // not in emap, try tmap
+ found = true;
+ }
+ catch (const jexception& e)
+ {
+ if (e.err_code() != jerrno::JERR_MAP_NOTFOUND)
+ throw;
+ }
}
+ if (!found)
+ {
+ std::ostringstream oss;
+ oss << "jrnl=" << _jc->id() << " drid=0x" << std::hex << drid;
+ throw jexception(jerrno::JERR_WMGR_DEQRIDNOTENQ, oss.str(), "wmgr", "dequeue_check");
+ }
}
void
Modified: store/trunk/cpp/tests/jrnl/_st_basic.cpp
===================================================================
--- store/trunk/cpp/tests/jrnl/_st_basic.cpp 2008-08-13 19:24:20 UTC (rev 2297)
+++ store/trunk/cpp/tests/jrnl/_st_basic.cpp 2008-08-13 20:27:30 UTC (rev 2298)
@@ -408,7 +408,7 @@
u_int64_t rid = enq_msg(jc, 0, create_msg(msg, 0, MSG_SIZE), false);
deq_msg(jc, rid);
try{ deq_msg(jc, rid); BOOST_ERROR("Did not throw exception on second dequeue."); }
- catch (const jexception& e){ BOOST_CHECK_EQUAL(e.err_code(), jerrno::JERR_MAP_NOTFOUND); }
+ catch (const jexception& e){ BOOST_CHECK_EQUAL(e.err_code(), jerrno::JERR_WMGR_DEQRIDNOTENQ); }
rid = enq_msg(jc, 1, create_msg(msg, 1, MSG_SIZE), false);
deq_msg(jc, rid);
}
Modified: store/trunk/cpp/tests/jrnl/_st_basic_txn.cpp
===================================================================
--- store/trunk/cpp/tests/jrnl/_st_basic_txn.cpp 2008-08-13 19:24:20 UTC (rev 2297)
+++ store/trunk/cpp/tests/jrnl/_st_basic_txn.cpp 2008-08-13 20:27:30 UTC (rev 2298)
@@ -87,9 +87,9 @@
try
{
deq_msg(jc, m);
- BOOST_ERROR("Expected dequeue to fail with exception JERR_MAP_NOTFOUND.");
+ BOOST_ERROR("Expected dequeue to fail with exception JERR_WMGR_DEQRIDNOTENQ.");
}
- catch (const jexception& e) { if (e.err_code() != jerrno::JERR_MAP_NOTFOUND) throw; }
+ catch (const jexception& e) { if (e.err_code() != jerrno::JERR_WMGR_DEQRIDNOTENQ) throw; }
}
}
catch(const exception& e) { BOOST_FAIL(e.what()); }
@@ -138,9 +138,9 @@
try
{
deq_msg(jc, 3*m);
- BOOST_ERROR("Expected dequeue to fail with exception JERR_MAP_NOTFOUND.");
+ BOOST_ERROR("Expected dequeue to fail with exception JERR_WMGR_DEQRIDNOTENQ.");
}
- catch (const jexception& e) { if (e.err_code() != jerrno::JERR_MAP_NOTFOUND) throw; }
+ catch (const jexception& e) { if (e.err_code() != jerrno::JERR_WMGR_DEQRIDNOTENQ) throw; }
}
}
catch(const exception& e) { BOOST_FAIL(e.what()); }
Modified: store/trunk/cpp/tests/jrnl/jtt/jfile_chk.py
===================================================================
--- store/trunk/cpp/tests/jrnl/jtt/jfile_chk.py 2008-08-13 19:24:20 UTC (rev 2297)
+++ store/trunk/cpp/tests/jrnl/jtt/jfile_chk.py 2008-08-13 20:27:30 UTC (rev 2298)
@@ -445,6 +445,7 @@
self.first_rec = False
self.last_file = False
self.last_rid = -1
+ self.fhdr_owi_at_msg_start = None
self.proc_args(argv)
self.proc_csv()
@@ -474,6 +475,7 @@
stop = True;
else:
self.rec_cnt += 1
+ self.fhdr_owi_at_msg_start = self.fhdr.owi()
if self.first_rec:
if self.fhdr.fro != hdr.foffs:
raise Exception('File header first record offset mismatch: fro=0x%08x; rec_offs=0x%08x' % (self.fhdr.fro, hdr.foffs))
@@ -577,7 +579,8 @@
for rec in self.tmap[hdr.xid]:
if isinstance(rec[1], DeqHdr):
if self.emap[rec[1].deq_rid] != None:
- self.emap[rec[1].deq_rid][2] = False # Unlock enq record
+ t = self.emap[rec[1].deq_rid]
+ self.emap[rec[1].deq_rid] = (t[0], t[1], False) # Unlock enq record
del self.tmap[hdr.xid]
if len(mismatched_rids) > 0:
warn = ' (WARNING: transactional dequeues not found in enqueue map; rids=%s)' % mismatched_rids
@@ -656,7 +659,7 @@
return self.file_num
def check_owi(self, hdr):
- return self.fhdr.owi() == hdr.owi()
+ return self.fhdr_owi_at_msg_start == hdr.owi()
def check_rid(self, hdr):
if self.last_rid != -1 and hdr.rid <= self.last_rid:
Modified: store/trunk/cpp/tests/persistence.py
===================================================================
--- store/trunk/cpp/tests/persistence.py 2008-08-13 19:24:20 UTC (rev 2297)
+++ store/trunk/cpp/tests/persistence.py 2008-08-13 20:27:30 UTC (rev 2298)
@@ -231,7 +231,7 @@
if txc.global_id not in ids:
self.fail("Recovered xids not as expected. missing: %s" % (txc))
if txd.global_id not in ids:
- self.fail("Recovered xids not as expected. missing: %s" % (txc))
+ self.fail("Recovered xids not as expected. missing: %s" % (txd))
self.assertEqual(2, len(xids))
More information about the rhmessaging-commits
mailing list