[rhmessaging-commits] rhmessaging commits: r2497 - in store/trunk/cpp/lib: jrnl and 1 other directory.
rhmessaging-commits at lists.jboss.org
rhmessaging-commits at lists.jboss.org
Thu Sep 18 13:32:03 EDT 2008
Author: kpvdr
Date: 2008-09-18 13:32:03 -0400 (Thu, 18 Sep 2008)
New Revision: 2497
Modified:
store/trunk/cpp/lib/JournalImpl.cpp
store/trunk/cpp/lib/MessageStoreImpl.cpp
store/trunk/cpp/lib/MessageStoreImpl.h
store/trunk/cpp/lib/jrnl/rmgr.cpp
Log:
Backport to trunk of r2496: Fix for BZ458053: "txtest failures when broker killed during transfer phase". Fixed a problem with certain out-of-order libaio read returns in the read pipeline, which caused this bug.
Modified: store/trunk/cpp/lib/JournalImpl.cpp
===================================================================
--- store/trunk/cpp/lib/JournalImpl.cpp 2008-09-18 17:31:21 UTC (rev 2496)
+++ store/trunk/cpp/lib/JournalImpl.cpp 2008-09-18 17:32:03 UTC (rev 2497)
@@ -95,7 +95,7 @@
agent->addObject(_mgmtObject);
}
- log(LOG_NOTICE, "Instantiation");
+ log(LOG_NOTICE, "Created");
std::ostringstream oss;
oss << "Journal directory = \"" << journalDirectory << "\"; Base file name = \"" << journalBaseFilename << "\"";
log(LOG_DEBUG, oss.str());
Modified: store/trunk/cpp/lib/MessageStoreImpl.cpp
===================================================================
--- store/trunk/cpp/lib/MessageStoreImpl.cpp 2008-09-18 17:31:21 UTC (rev 2496)
+++ store/trunk/cpp/lib/MessageStoreImpl.cpp 2008-09-18 17:32:03 UTC (rev 2497)
@@ -635,11 +635,14 @@
try
{
+ long rcnt = 0L; // recovered msg count
+ long idcnt = 0L; // in-doubt msg count
u_int64_t thisHighestRid = 0;
jQueue->recover(numJrnlFiles, jrnlFsizeSblks, wCacheNumPages, wCachePgSizeSblks, &prepared, thisHighestRid, key.id); // start recovery
if (thisHighestRid > highestRid)
highestRid = thisHighestRid;
- recoverMessages(txn, registry, queue, prepared, messages);
+ recoverMessages(txn, registry, queue, prepared, messages, rcnt, idcnt);
+ QPID_LOG(info, "Recovered queue \"" << queueName << "\": " << rcnt << " messages recovered; " << idcnt << " messages in-doubt.");
jQueue->recover_complete(); // start journal.
} catch (const journal::jexception& e) {
THROW_STORE_EXCEPTION(std::string("Queue ") + queueName + ": recoverQueues() failed: " + e.what());
@@ -742,7 +745,9 @@
qpid::broker::RecoveryManager& recovery,
qpid::broker::RecoverableQueue::shared_ptr& queue,
txn_list& prepared,
- message_index& messages)
+ message_index& messages,
+ long& rcnt,
+ long& idcnt)
{
size_t preambleLength = sizeof(u_int32_t)/*header size*/;
@@ -798,6 +803,7 @@
PreparedTransaction::list::iterator i = PreparedTransaction::getLockedPreparedTransaction(prepared, queue->getPersistenceId(), dtok.rid());
if (i == prepared.end()) { // not in prepared list
+ rcnt++;
queue->recover(msg);
} else {
u_int64_t rid = dtok.rid();
@@ -805,13 +811,14 @@
TplRecoverMapCitr citr = tplRecoverMap.find(xid);
if (citr == tplRecoverMap.end()) THROW_STORE_EXCEPTION("XID not found in tplRecoverMap");
- // deq present in prepared list, this xid is part of incomplete txn commit/abort
+ // deq present in prepared list: this xid is part of incomplete txn commit/abort
// or this is a 1PC txn that must be rolled forward
if (citr->second.deq_flag || !citr->second.tpc_flag) {
if (jc->is_enqueued(rid, true)) {
// Enqueue is non-tx, dequeue tx
assert(jc->is_locked(rid)); // This record MUST be locked by a txn dequeue
if (!citr->second.commit_flag) {
+ rcnt++;
queue->recover(msg); // recover message in abort case only
}
} else {
@@ -825,10 +832,12 @@
else if (!j->_enq_flag && j->_drid == rid) deq = true;
}
if (enq && !deq && citr->second.commit_flag) {
+ rcnt++;
queue->recover(msg); // recover txn message in commit case only
}
}
} else {
+ idcnt++;
messages[rid] = msg;
}
}
Modified: store/trunk/cpp/lib/MessageStoreImpl.h
===================================================================
--- store/trunk/cpp/lib/MessageStoreImpl.h 2008-09-18 17:31:21 UTC (rev 2496)
+++ store/trunk/cpp/lib/MessageStoreImpl.h 2008-09-18 17:32:03 UTC (rev 2497)
@@ -140,7 +140,9 @@
qpid::broker::RecoveryManager& recovery,
qpid::broker::RecoverableQueue::shared_ptr& queue,
txn_list& locked,
- message_index& prepared);
+ message_index& prepared,
+ long& rcnt,
+ long& idcnt);
qpid::broker::RecoverableMessage::shared_ptr getExternMessage(qpid::broker::RecoveryManager& recovery,
uint64_t mId,
unsigned& headerSize);
Modified: store/trunk/cpp/lib/jrnl/rmgr.cpp
===================================================================
--- store/trunk/cpp/lib/jrnl/rmgr.cpp 2008-09-18 17:31:21 UTC (rev 2496)
+++ store/trunk/cpp/lib/jrnl/rmgr.cpp 2008-09-18 17:32:03 UTC (rev 2497)
@@ -135,7 +135,7 @@
aio_cycle(); // check if rd AIOs returned; initiate new reads if possible
if (_jc->unflushed_dblks() > 0)
_jc->flush();
- else
+ else if (!_aio_evt_rem)
return RHM_IORES_EMPTY;
}
if (_page_cb_arr[_pg_index]._state != AIO_COMPLETE)
@@ -370,7 +370,7 @@
aio_cycle(); // check if any AIOs have returned
if (_jc->unflushed_dblks() > 0)
_jc->flush();
- else
+ else if (!_aio_evt_rem)
return RHM_IORES_EMPTY;
}
More information about the rhmessaging-commits mailing list