[rhmessaging-commits] rhmessaging commits: r2497 - in store/trunk/cpp/lib: jrnl and 1 other directory.

rhmessaging-commits at lists.jboss.org rhmessaging-commits at lists.jboss.org
Thu Sep 18 13:32:03 EDT 2008


Author: kpvdr
Date: 2008-09-18 13:32:03 -0400 (Thu, 18 Sep 2008)
New Revision: 2497

Modified:
   store/trunk/cpp/lib/JournalImpl.cpp
   store/trunk/cpp/lib/MessageStoreImpl.cpp
   store/trunk/cpp/lib/MessageStoreImpl.h
   store/trunk/cpp/lib/jrnl/rmgr.cpp
Log:
Backport to trunk of r.2496: Fix for BZ458053: "txtest failures when broker killed during transfer phase". Fixed a problem with certain out-of-order libaio read returns in the read pipeline, which caused this bug.

Modified: store/trunk/cpp/lib/JournalImpl.cpp
===================================================================
--- store/trunk/cpp/lib/JournalImpl.cpp	2008-09-18 17:31:21 UTC (rev 2496)
+++ store/trunk/cpp/lib/JournalImpl.cpp	2008-09-18 17:32:03 UTC (rev 2497)
@@ -95,7 +95,7 @@
         agent->addObject(_mgmtObject);
     }
 
-    log(LOG_NOTICE, "Instantiation");
+    log(LOG_NOTICE, "Created");
     std::ostringstream oss;
     oss << "Journal directory = \"" << journalDirectory << "\"; Base file name = \"" << journalBaseFilename << "\"";
     log(LOG_DEBUG, oss.str());

Modified: store/trunk/cpp/lib/MessageStoreImpl.cpp
===================================================================
--- store/trunk/cpp/lib/MessageStoreImpl.cpp	2008-09-18 17:31:21 UTC (rev 2496)
+++ store/trunk/cpp/lib/MessageStoreImpl.cpp	2008-09-18 17:32:03 UTC (rev 2497)
@@ -635,11 +635,14 @@
 
         try
         {
+            long rcnt = 0L;     // recovered msg count
+            long idcnt = 0L;    // in-doubt msg count
             u_int64_t thisHighestRid = 0;
             jQueue->recover(numJrnlFiles, jrnlFsizeSblks, wCacheNumPages, wCachePgSizeSblks, &prepared, thisHighestRid, key.id); // start recovery
             if (thisHighestRid > highestRid)
                 highestRid = thisHighestRid;
-            recoverMessages(txn, registry, queue, prepared, messages);
+            recoverMessages(txn, registry, queue, prepared, messages, rcnt, idcnt);
+            QPID_LOG(info, "Recovered queue \"" << queueName << "\": " << rcnt << " messages recovered; " << idcnt << " messages in-doubt.");
             jQueue->recover_complete(); // start journal.
         } catch (const journal::jexception& e) {
             THROW_STORE_EXCEPTION(std::string("Queue ") + queueName + ": recoverQueues() failed: " + e.what());
@@ -742,7 +745,9 @@
                                       qpid::broker::RecoveryManager& recovery,
                                       qpid::broker::RecoverableQueue::shared_ptr& queue,
                                       txn_list& prepared,
-                                      message_index& messages)
+                                      message_index& messages,
+                                      long& rcnt,
+                                      long& idcnt)
 {
     size_t preambleLength = sizeof(u_int32_t)/*header size*/;
 
@@ -798,6 +803,7 @@
 
                 PreparedTransaction::list::iterator i = PreparedTransaction::getLockedPreparedTransaction(prepared, queue->getPersistenceId(), dtok.rid());
                 if (i == prepared.end()) { // not in prepared list
+                    rcnt++;
                     queue->recover(msg);
                 } else {
                     u_int64_t rid = dtok.rid();
@@ -805,13 +811,14 @@
                     TplRecoverMapCitr citr = tplRecoverMap.find(xid);
                     if (citr == tplRecoverMap.end()) THROW_STORE_EXCEPTION("XID not found in tplRecoverMap");
 
-                    // deq present in prepared list, this xid is part of incomplete txn commit/abort
+                    // deq present in prepared list: this xid is part of incomplete txn commit/abort
                     // or this is a 1PC txn that must be rolled forward
                     if (citr->second.deq_flag || !citr->second.tpc_flag) {
                         if (jc->is_enqueued(rid, true)) {
                             // Enqueue is non-tx, dequeue tx
                             assert(jc->is_locked(rid)); // This record MUST be locked by a txn dequeue
                             if (!citr->second.commit_flag) {
+                                rcnt++;
                                 queue->recover(msg); // recover message in abort case only
                             }
                         } else {
@@ -825,10 +832,12 @@
                                 else if (!j->_enq_flag && j->_drid == rid) deq = true;
                             }
                             if (enq && !deq && citr->second.commit_flag) {
+                                rcnt++;
                                 queue->recover(msg); // recover txn message in commit case only
                             }
                         }
                     } else {
+                        idcnt++;
                         messages[rid] = msg;
                     }
                 }

Modified: store/trunk/cpp/lib/MessageStoreImpl.h
===================================================================
--- store/trunk/cpp/lib/MessageStoreImpl.h	2008-09-18 17:31:21 UTC (rev 2496)
+++ store/trunk/cpp/lib/MessageStoreImpl.h	2008-09-18 17:32:03 UTC (rev 2497)
@@ -140,7 +140,9 @@
                          qpid::broker::RecoveryManager& recovery,
                          qpid::broker::RecoverableQueue::shared_ptr& queue,
                          txn_list& locked,
-                         message_index& prepared);
+                         message_index& prepared, 
+                         long& rcnt,
+                         long& idcnt);
     qpid::broker::RecoverableMessage::shared_ptr getExternMessage(qpid::broker::RecoveryManager& recovery,
                                                                   uint64_t mId,
                                                                   unsigned& headerSize);

Modified: store/trunk/cpp/lib/jrnl/rmgr.cpp
===================================================================
--- store/trunk/cpp/lib/jrnl/rmgr.cpp	2008-09-18 17:31:21 UTC (rev 2496)
+++ store/trunk/cpp/lib/jrnl/rmgr.cpp	2008-09-18 17:32:03 UTC (rev 2497)
@@ -135,7 +135,7 @@
             aio_cycle();   // check if rd AIOs returned; initiate new reads if possible
             if (_jc->unflushed_dblks() > 0)
                 _jc->flush();
-            else
+            else if (!_aio_evt_rem)
                 return RHM_IORES_EMPTY;
         }
         if (_page_cb_arr[_pg_index]._state != AIO_COMPLETE)
@@ -370,7 +370,7 @@
         aio_cycle();   // check if any AIOs have returned
         if (_jc->unflushed_dblks() > 0)
             _jc->flush();
-        else
+        else if (!_aio_evt_rem)
             return RHM_IORES_EMPTY;
     }
 




More information about the rhmessaging-commits mailing list