[rhmessaging-commits] rhmessaging commits: r2306 - in store/branches/mrg-1.0/cpp/lib: jrnl and 1 other directory.

rhmessaging-commits at lists.jboss.org rhmessaging-commits at lists.jboss.org
Thu Aug 14 14:36:13 EDT 2008


Author: kpvdr
Date: 2008-08-14 14:36:13 -0400 (Thu, 14 Aug 2008)
New Revision: 2306

Modified:
   store/branches/mrg-1.0/cpp/lib/BdbMessageStore.cpp
   store/branches/mrg-1.0/cpp/lib/BdbMessageStore.h
   store/branches/mrg-1.0/cpp/lib/JournalImpl.h
   store/branches/mrg-1.0/cpp/lib/jrnl/jcntl.cpp
   store/branches/mrg-1.0/cpp/lib/jrnl/jcntl.hpp
   store/branches/mrg-1.0/cpp/lib/jrnl/rmgr.cpp
   store/branches/mrg-1.0/cpp/lib/jrnl/rmgr.hpp
   store/branches/mrg-1.0/cpp/lib/jrnl/wmgr.cpp
Log:
Backport of trunk checkin r.2305: Fix for BZ458656 "List of xids from dtx.recover is not as expected". BdbMessageStore::collectPreparedXids() was using the prepared list obtained at recover time, but it needs to provide a current list no matter when it is called. Added the ability to ignore pending transactions when reading, and created a subclass of JournalImpl (called TplJournalImpl) whose read_data_record() always ignores them. This enables records belonging to pending transactions to be read from the Transaction Prepared List (TPL). BdbMessageStore::collectPreparedXids() now re-reads the TPL before returning the XID list.

Modified: store/branches/mrg-1.0/cpp/lib/BdbMessageStore.cpp
===================================================================
--- store/branches/mrg-1.0/cpp/lib/BdbMessageStore.cpp	2008-08-14 17:49:28 UTC (rev 2305)
+++ store/branches/mrg-1.0/cpp/lib/BdbMessageStore.cpp	2008-08-14 18:36:13 UTC (rev 2306)
@@ -243,7 +243,7 @@
         open(mappingDb, txn.get(), "mappings.db", true);
         open(bindingDb, txn.get(), "bindings.db", true);
         open(generalDb, txn.get(), "general.db",  false);
-        tplStorePtr.reset(new JournalImpl("TplStore", getTplBaseDir(), "tpl", defJournalGetEventsTimeout, defJournalFlushTimeout));
+        tplStorePtr.reset(new TplJournalImpl("TplStore", getTplBaseDir(), "tpl", defJournalGetEventsTimeout, defJournalFlushTimeout));
         txn.commit();
     } catch (const journal::jexception& e) {
         txn.abort();
@@ -910,77 +910,84 @@
     return count;
 }
 
-void BdbMessageStore::recoverTplStore(TplRecoverMap& tplMap)
+void BdbMessageStore::readTplStore()
 {
+    tplRecoverMap.clear();
+    journal::txn_map& tmap = tplStorePtr->get_txn_map();
+    DataTokenImpl dtokp;
+    void* dbuff = NULL; size_t dbuffSize = 0;
+    void* xidbuff = NULL; size_t xidbuffSize = 0;
+    bool transientFlag = false;
+    bool externalFlag = false;
+    bool done = false;
+    try {
+        unsigned aio_sleep_cnt = 0;
+        while (!done) {
+            dtokp.reset();
+            dtokp.set_wstate(DataTokenImpl::ENQ);
+            switch (tplStorePtr->read_data_record(&dbuff, dbuffSize, &xidbuff, xidbuffSize, transientFlag, externalFlag, &dtokp)) {
+              case rhm::journal::RHM_IORES_SUCCESS: {
+                // Every TPL record contains both data and an XID
+                assert(dbuffSize>0);
+                assert(xidbuffSize>0);
+                std::string xid(static_cast<const char*>(xidbuff), xidbuffSize);
+                bool is2PC = *(static_cast<char*>(dbuff)) != 0;
+
+                // Check transaction details; add to recover map
+                journal::txn_data_list txnList = tmap.get_tdata_list(xid);
+                unsigned enqCnt = 0;
+                unsigned deqCnt = 0;
+                u_int64_t rid = 0;
+
+                // Assume commit (roll forward) in cases where only prepare has been called - ie only enqueue record exists.
+                // Note: will apply to both 1PC and 2PC transactions.
+                bool commitFlag = true;
+
+                for (journal::tdl_itr j = txnList.begin(); j<txnList.end(); j++) {
+                    if (j->_enq_flag) {
+                        rid = j->_rid;
+                        enqCnt++;
+                    } else {
+                        commitFlag = j->_commit_flag;
+                        deqCnt++;
+                    }
+                }
+                assert(enqCnt == 1);
+                assert(deqCnt <= 1);
+                tplRecoverMap.insert(TplRecoverMapPair(xid, TplRecoverStruct(rid, deqCnt == 1, commitFlag, is2PC)));
+
+                ::free(xidbuff);
+                aio_sleep_cnt = 0;
+                break;
+                }
+              case rhm::journal::RHM_IORES_PAGE_AIOWAIT:
+                if (++aio_sleep_cnt > MAX_AIO_SLEEPS)
+                    THROW_STORE_EXCEPTION("Timeout waiting for AIO in BdbMessageStore::recoverTplStore()");
+                ::usleep(AIO_SLEEP_TIME);
+                break;
+              case rhm::journal::RHM_IORES_EMPTY:
+                done = true;
+                break; // done with all messages. (add call in jrnl to test that _emap is empty.)
+              default:
+                assert("Store Error: Unexpected msg state");
+            } // switch
+        }
+    } catch (const journal::jexception& e) {
+        THROW_STORE_EXCEPTION(std::string("TPL recoverTplStore() failed: ") + e.what());
+    }
+}
+
+void BdbMessageStore::recoverTplStore()
+{
     if (journal::jdir::exists(tplStorePtr->jrnl_dir() + tplStorePtr->base_filename() + ".jinf")) {
         u_int64_t thisHighestRid;
         tplStorePtr->recover(tplNumJrnlFiles, tplJrnlFsizeSblks, tplWCachePgSizeSblks, tplWCacheNumPages, 0, thisHighestRid, 0);
-        journal::txn_map& tmap = tplStorePtr->get_txn_map();
         if (thisHighestRid > highestRid)
             highestRid = thisHighestRid;
 
-        DataTokenImpl dtokp;
-        void* dbuff = NULL; size_t dbuffSize = 0;
-        void* xidbuff = NULL; size_t xidbuffSize = 0;
-        bool transientFlag = false;
-        bool externalFlag = false;
-        bool done = false;
-        try {
-            unsigned aio_sleep_cnt = 0;
-            while (!done) {
-                dtokp.reset();
-                dtokp.set_wstate(DataTokenImpl::ENQ);
-                switch (tplStorePtr->read_data_record(&dbuff, dbuffSize, &xidbuff, xidbuffSize, transientFlag, externalFlag, &dtokp)) {
-                  case rhm::journal::RHM_IORES_SUCCESS: {
-                    // Every TPL record contains both data and an XID
-                    assert(dbuffSize>0);
-                    assert(xidbuffSize>0);
-                    std::string xid(static_cast<const char*>(xidbuff), xidbuffSize);
-                    bool is2PC = *(static_cast<char*>(dbuff)) != 0;
+        // Load tplRecoverMap by reading the TPL store
+        readTplStore();
 
-                    // Check transaction details; add to recover map
-                    journal::txn_data_list txnList = tmap.get_tdata_list(xid);
-                    unsigned enqCnt = 0;
-                    unsigned deqCnt = 0;
-                    u_int64_t rid = 0;
-
-                    // Assume commit (roll forward) in cases where only prepare has been called - ie only enqueue record exists.
-                    // Note: will apply to both 1PC and 2PC transactions.
-                    bool commitFlag = true;
-
-                    for (journal::tdl_itr j = txnList.begin(); j<txnList.end(); j++) {
-                        if (j->_enq_flag) {
-                            rid = j->_rid;
-                            enqCnt++;
-                        } else {
-                            commitFlag = j->_commit_flag;
-                            deqCnt++;
-                        }
-                    }
-                    assert(enqCnt == 1);
-                    assert(deqCnt <= 1);
-                    tplMap.insert(TplRecoverMapPair(xid, TplRecoverStruct(rid, deqCnt == 1, commitFlag, is2PC)));
-
-                    ::free(xidbuff);
-                    aio_sleep_cnt = 0;
-                    break;
-                    }
-                  case rhm::journal::RHM_IORES_PAGE_AIOWAIT:
-                    if (++aio_sleep_cnt > MAX_AIO_SLEEPS)
-                        THROW_STORE_EXCEPTION("Timeout waiting for AIO in BdbMessageStore::recoverTplStore()");
-                    ::usleep(AIO_SLEEP_TIME);
-                    break;
-                  case rhm::journal::RHM_IORES_EMPTY:
-                    done = true;
-                    break; // done with all messages. (add call in jrnl to test that _emap is empty.)
-                  default:
-                    assert("Store Error: Unexpected msg state");
-                } // switch
-            }
-        } catch (const journal::jexception& e) {
-            THROW_STORE_EXCEPTION(std::string("TPL recoverTplStore() failed: ") + e.what());
-        }
-
         tplStorePtr->recover_complete(); // start journal.
     }
 }
@@ -988,7 +995,7 @@
 void BdbMessageStore::recoverLockedMappings(txn_list& txns)
 {
     if (!tplStorePtr->is_ready())
-        recoverTplStore(tplRecoverMap);
+        recoverTplStore();
 
     // Abort unprepaired xids and populate the locked maps
     for (TplRecoverMapCitr i = tplRecoverMap.begin(); i != tplRecoverMap.end(); i++) {
@@ -1002,8 +1009,12 @@
 
 void BdbMessageStore::collectPreparedXids(std::set<std::string>& xids)
 {
-    if (!tplStorePtr->is_ready())
-        recoverTplStore(tplRecoverMap);
+    if (tplStorePtr->is_ready()) {
+        tplStorePtr->read_reset();
+        readTplStore();
+    } else {
+        recoverTplStore();
+    }
     for (TplRecoverMapCitr i = tplRecoverMap.begin(); i != tplRecoverMap.end(); i++) {
         // Discard all txns that are to be rolled forward/back and 1PC transactions
         if (!i->second.deq_flag && i->second.tpc_flag)

Modified: store/branches/mrg-1.0/cpp/lib/BdbMessageStore.h
===================================================================
--- store/branches/mrg-1.0/cpp/lib/BdbMessageStore.h	2008-08-14 17:49:28 UTC (rev 2305)
+++ store/branches/mrg-1.0/cpp/lib/BdbMessageStore.h	2008-08-14 18:36:13 UTC (rev 2306)
@@ -93,7 +93,7 @@
     Db generalDb;
 
     // Pointer to Transaction Prepared List (TPL) journal instance
-    boost::shared_ptr<JournalImpl> tplStorePtr;
+    boost::shared_ptr<TplJournalImpl> tplStorePtr;
     TplRecoverMap tplRecoverMap;
 
     IdSequence queueIdSequence;
@@ -158,7 +158,8 @@
                        queue_index& index,
                        txn_list& locked,
                        message_index& prepared);
-    void recoverTplStore(TplRecoverMap& tplMap);
+    void readTplStore();
+    void recoverTplStore();
     void recoverLockedMappings(txn_list& txns);
     TxnCtxt* check(qpid::broker::TransactionContext* ctxt);
     void store(const qpid::broker::PersistableQueue* queue,

Modified: store/branches/mrg-1.0/cpp/lib/JournalImpl.h
===================================================================
--- store/branches/mrg-1.0/cpp/lib/JournalImpl.h	2008-08-14 17:49:28 UTC (rev 2305)
+++ store/branches/mrg-1.0/cpp/lib/JournalImpl.h	2008-08-14 18:36:13 UTC (rev 2306)
@@ -207,6 +207,28 @@
             }
         }; // class JournalImpl
 
+        class TplJournalImpl : public JournalImpl
+        {
+        public:
+            TplJournalImpl(const std::string& journalId,
+                           const std::string& journalDirectory,
+                           const std::string& journalBaseFilename,
+                           const qpid::sys::Duration getEventsTimeout,
+                           const qpid::sys::Duration flushTimeout) :
+                           JournalImpl(journalId, journalDirectory, journalBaseFilename, getEventsTimeout, flushTimeout)
+            {}
+
+            virtual ~TplJournalImpl() {}
+
+            // Special version of read_data_record that ignores transactions - needed when reading the TPL
+            inline journal::iores read_data_record(void** const datapp, std::size_t& dsize,
+                    void** const xidpp, std::size_t& xidsize, bool& transient, bool& external,
+                    journal::data_tok* const dtokp) {
+                return JournalImpl::read_data_record(datapp, dsize, xidpp, xidsize, transient, external, dtokp, true);
+            }
+            inline void read_reset() { _rmgr.invalidate(); }
+        };
+
     } // namespace bdbstore
 } // namespace rhm
 

Modified: store/branches/mrg-1.0/cpp/lib/jrnl/jcntl.cpp
===================================================================
--- store/branches/mrg-1.0/cpp/lib/jrnl/jcntl.cpp	2008-08-14 17:49:28 UTC (rev 2305)
+++ store/branches/mrg-1.0/cpp/lib/jrnl/jcntl.cpp	2008-08-14 18:36:13 UTC (rev 2306)
@@ -316,14 +316,14 @@
 #define RCINVALID_SLEEP_TIME_MS 5
 iores
 jcntl::read_data_record(void** const datapp, std::size_t& dsize, void** const xidpp, std::size_t& xidsize,
-        bool& transient, bool& external, data_tok* const dtokp)
+        bool& transient, bool& external, data_tok* const dtokp, bool ignore_pending_txns)
 {
     check_rstatus("read_data");
     unsigned cnt = 0;
     iores res;
     do
     {
-        res = _rmgr.read(datapp, dsize, xidpp, xidsize, transient, external, dtokp);
+        res = _rmgr.read(datapp, dsize, xidpp, xidsize, transient, external, dtokp, ignore_pending_txns);
         if (res == RHM_IORES_RCINVALID)
         {
             get_wr_events(); // check for outstanding write events

Modified: store/branches/mrg-1.0/cpp/lib/jrnl/jcntl.hpp
===================================================================
--- store/branches/mrg-1.0/cpp/lib/jrnl/jcntl.hpp	2008-08-14 17:49:28 UTC (rev 2305)
+++ store/branches/mrg-1.0/cpp/lib/jrnl/jcntl.hpp	2008-08-14 18:36:13 UTC (rev 2306)
@@ -415,7 +415,8 @@
         * \exception TODO
         */
         iores read_data_record(void** const datapp, std::size_t& dsize, void** const xidpp,
-                std::size_t& xidsize, bool& transient, bool& external, data_tok* const dtokp);
+                std::size_t& xidsize, bool& transient, bool& external, data_tok* const dtokp,
+                bool ignore_pending_txns = false);
 
         /**
         * \brief Dequeues (marks as no longer needed) data record in journal.

Modified: store/branches/mrg-1.0/cpp/lib/jrnl/rmgr.cpp
===================================================================
--- store/branches/mrg-1.0/cpp/lib/jrnl/rmgr.cpp	2008-08-14 17:49:28 UTC (rev 2305)
+++ store/branches/mrg-1.0/cpp/lib/jrnl/rmgr.cpp	2008-08-14 18:36:13 UTC (rev 2306)
@@ -90,7 +90,7 @@
 
 iores
 rmgr::read(void** const datapp, std::size_t& dsize, void** const xidpp, std::size_t& xidsize,
-        bool& transient, bool& external, data_tok* dtokp)
+        bool& transient, bool& external, data_tok* dtokp,  bool ignore_pending_txns)
 {
     iores res = pre_read_check(dtokp);
     if (res != RHM_IORES_SUCCESS)
@@ -164,8 +164,9 @@
                 }
                 catch (const jexception& e)
                 {
+                    bool enforce_txns = !_jc->is_read_only() && !ignore_pending_txns;
                     // Block read for transactionally locked record (only when not recovering)
-                    if (e.err_code() == jerrno::JERR_MAP_LOCKED && !_jc->is_read_only())
+                    if (e.err_code() == jerrno::JERR_MAP_LOCKED && enforce_txns)
                         return RHM_IORES_TXPENDING;
 
                     // (Recover mode only) Ok, not in emap - now search tmap, if present then read
@@ -184,7 +185,7 @@
                                 is_enq = ditr->_drid == _hdr._rid;
                         }
                     }
-                    if (!_jc->is_read_only() && is_enq)
+                    if (enforce_txns && is_enq)
                         return RHM_IORES_TXPENDING;
                 }
 #endif

Modified: store/branches/mrg-1.0/cpp/lib/jrnl/rmgr.hpp
===================================================================
--- store/branches/mrg-1.0/cpp/lib/jrnl/rmgr.hpp	2008-08-14 17:49:28 UTC (rev 2305)
+++ store/branches/mrg-1.0/cpp/lib/jrnl/rmgr.hpp	2008-08-14 18:36:13 UTC (rev 2306)
@@ -77,7 +77,8 @@
 
         void initialize(const rd_aio_cb rd_cb);
         iores read(void** const datapp, std::size_t& dsize, void** const xidpp,
-                std::size_t& xidsize, bool& transient, bool& external, data_tok* dtokp);
+                std::size_t& xidsize, bool& transient, bool& external, data_tok* dtokp,
+                bool ignore_pending_txns);
         u_int32_t get_events(page_state state = AIO_COMPLETE);
         void recover_complete();
         inline bool is_valid() const {return _valid; }

Modified: store/branches/mrg-1.0/cpp/lib/jrnl/wmgr.cpp
===================================================================
--- store/branches/mrg-1.0/cpp/lib/jrnl/wmgr.cpp	2008-08-14 17:49:28 UTC (rev 2305)
+++ store/branches/mrg-1.0/cpp/lib/jrnl/wmgr.cpp	2008-08-14 18:36:13 UTC (rev 2306)
@@ -834,7 +834,6 @@
 #endif
                         break;
                     case data_tok::COMMIT_SUBM:
-//std::cout << " * commit-ret, queue=" << _jc->id() << " xid=" << dtokp->xid() << std::endl << std::flush;
                         dtokl.push_back(dtokp);
                         tot_data_toks++;
                         dtokp->set_wstate(data_tok::COMMITTED);




More information about the rhmessaging-commits mailing list