Author: kpvdr
Date: 2008-09-18 13:31:21 -0400 (Thu, 18 Sep 2008)
New Revision: 2496
Modified:
store/branches/mrg-1.0/cpp/lib/BdbMessageStore.cpp
store/branches/mrg-1.0/cpp/lib/BdbMessageStore.h
store/branches/mrg-1.0/cpp/lib/JournalImpl.cpp
store/branches/mrg-1.0/cpp/lib/jrnl/rmgr.cpp
Log:
Fix for BZ458053: "txtest failures when broker killed during transfer phase".
Fixed a problem with certain out-of-order libaio read returns in the read pipeline, which
caused this bug.
Modified: store/branches/mrg-1.0/cpp/lib/BdbMessageStore.cpp
===================================================================
--- store/branches/mrg-1.0/cpp/lib/BdbMessageStore.cpp 2008-09-18 14:49:51 UTC (rev 2495)
+++ store/branches/mrg-1.0/cpp/lib/BdbMessageStore.cpp 2008-09-18 17:31:21 UTC (rev 2496)
@@ -615,11 +615,14 @@
try
{
+ long rcnt = 0L; // recovered msg count
+ long idcnt = 0L; // in-doubt msg count
u_int64_t thisHighestRid = 0;
jQueue->recover(numJrnlFiles, jrnlFsizeSblks, wCacheNumPages,
wCachePgSizeSblks, &prepared, thisHighestRid, key.id); // start recovery
if (thisHighestRid > highestRid)
highestRid = thisHighestRid;
- recoverMessages(txn, registry, queue, prepared, messages);
+ recoverMessages(txn, registry, queue, prepared, messages, rcnt, idcnt);
+ QPID_LOG(info, "Recovered queue \"" << queueName
<< "\": " << rcnt << " messages recovered; "
<< idcnt << " messages in-doubt.");
jQueue->recover_complete(); // start journal.
} catch (const journal::jexception& e) {
THROW_STORE_EXCEPTION(std::string("Queue ") + queueName + ":
recoverQueues() failed: " + e.what());
@@ -722,7 +725,9 @@
qpid::broker::RecoveryManager& recovery,
qpid::broker::RecoverableQueue::shared_ptr&
queue,
txn_list& prepared,
- message_index& messages)
+ message_index& messages,
+ long& rcnt,
+ long& idcnt)
{
size_t preambleLength = sizeof(u_int32_t)/*header size*/;
@@ -778,6 +783,7 @@
PreparedTransaction::list::iterator i =
PreparedTransaction::getLockedPreparedTransaction(prepared, queue->getPersistenceId(),
dtokp.rid());
if (i == prepared.end()) { // not in prepared list
+ rcnt++;
queue->recover(msg);
} else {
u_int64_t rid = dtokp.rid();
@@ -785,13 +791,14 @@
TplRecoverMapCitr citr = tplRecoverMap.find(xid);
if (citr == tplRecoverMap.end()) THROW_STORE_EXCEPTION("XID not
found in tplRecoverMap");
- // deq present in prepared list, this xid is part of incomplete txn
commit/abort
+ // deq present in prepared list: this xid is part of incomplete txn
commit/abort
// or this is a 1PC txn that must be rolled forward
if (citr->second.deq_flag || !citr->second.tpc_flag) {
if (jc->is_enqueued(rid, true)) {
// Enqueue is non-tx, dequeue tx
assert(jc->is_locked(rid)); // This record MUST be locked
by a txn dequeue
if (!citr->second.commit_flag) {
+ rcnt++;
queue->recover(msg); // recover message in abort case
only
}
} else {
@@ -805,10 +812,12 @@
else if (!j->_enq_flag && j->_drid == rid)
deq = true;
}
if (enq && !deq &&
citr->second.commit_flag) {
+ rcnt++;
queue->recover(msg); // recover txn message in commit
case only
}
}
} else {
+ idcnt++;
messages[rid] = msg;
}
}
Modified: store/branches/mrg-1.0/cpp/lib/BdbMessageStore.h
===================================================================
--- store/branches/mrg-1.0/cpp/lib/BdbMessageStore.h 2008-09-18 14:49:51 UTC (rev 2495)
+++ store/branches/mrg-1.0/cpp/lib/BdbMessageStore.h 2008-09-18 17:31:21 UTC (rev 2496)
@@ -140,7 +140,9 @@
qpid::broker::RecoveryManager& recovery,
qpid::broker::RecoverableQueue::shared_ptr& queue,
txn_list& locked,
- message_index& prepared);
+ message_index& prepared,
+ long& rcnt,
+ long& idcnt);
qpid::broker::RecoverableMessage::shared_ptr
getExternMessage(qpid::broker::RecoveryManager& recovery,
uint64_t mId,
unsigned&
headerSize);
Modified: store/branches/mrg-1.0/cpp/lib/JournalImpl.cpp
===================================================================
--- store/branches/mrg-1.0/cpp/lib/JournalImpl.cpp 2008-09-18 14:49:51 UTC (rev 2495)
+++ store/branches/mrg-1.0/cpp/lib/JournalImpl.cpp 2008-09-18 17:31:21 UTC (rev 2496)
@@ -88,7 +88,7 @@
agent->addObject(_mgmtObject);
}
- log(LOG_NOTICE, "Instantiation");
+ log(LOG_NOTICE, "Created");
std::ostringstream oss;
oss << "Journal directory = \"" << journalDirectory
<< "\"; Base file name = \"" << journalBaseFilename
<< "\"";
log(LOG_DEBUG, oss.str());
Modified: store/branches/mrg-1.0/cpp/lib/jrnl/rmgr.cpp
===================================================================
--- store/branches/mrg-1.0/cpp/lib/jrnl/rmgr.cpp 2008-09-18 14:49:51 UTC (rev 2495)
+++ store/branches/mrg-1.0/cpp/lib/jrnl/rmgr.cpp 2008-09-18 17:31:21 UTC (rev 2496)
@@ -135,7 +135,7 @@
aio_cycle(); // check if rd AIOs returned; initiate new reads if possible
if (_jc->unflushed_dblks() > 0)
_jc->flush();
- else
+ else if (!_aio_evt_rem)
return RHM_IORES_EMPTY;
}
if (_page_cb_arr[_pg_index]._state != AIO_COMPLETE)
@@ -373,7 +373,7 @@
aio_cycle(); // check if any AIOs have returned
if (_jc->unflushed_dblks() > 0)
_jc->flush();
- else
+ else if (!_aio_evt_rem)
return RHM_IORES_EMPTY;
}