[rhmessaging-commits] rhmessaging commits: r2305 - in store/trunk/cpp/lib: jrnl and 1 other directory.

rhmessaging-commits at lists.jboss.org rhmessaging-commits at lists.jboss.org
Thu Aug 14 13:49:28 EDT 2008


Author: kpvdr
Date: 2008-08-14 13:49:28 -0400 (Thu, 14 Aug 2008)
New Revision: 2305

Modified:
   store/trunk/cpp/lib/BdbMessageStore.cpp
   store/trunk/cpp/lib/BdbMessageStore.h
   store/trunk/cpp/lib/JournalImpl.h
   store/trunk/cpp/lib/jrnl/jcntl.cpp
   store/trunk/cpp/lib/jrnl/jcntl.hpp
   store/trunk/cpp/lib/jrnl/rmgr.cpp
   store/trunk/cpp/lib/jrnl/rmgr.hpp
   store/trunk/cpp/lib/jrnl/txn_map.hpp
Log:
Fix for 458656 "List of xids from dtx.recover is not as expected". BdbMessageStore::collectPreparedXids() was returning the prepared list that was obtained at recover time, but it needs to return a current list no matter when it is called. Added the ability to ignore pending transactions when reading the journal, and created a subclass of JournalImpl (called TplJournalImpl) whose read_data_record() always ignores them. This allows records that belong to still-pending transactions to be read from the TPL. BdbMessageStore::collectPreparedXids() now re-reads the TPL before returning the XID list.

Modified: store/trunk/cpp/lib/BdbMessageStore.cpp
===================================================================
--- store/trunk/cpp/lib/BdbMessageStore.cpp	2008-08-14 13:44:04 UTC (rev 2304)
+++ store/trunk/cpp/lib/BdbMessageStore.cpp	2008-08-14 17:49:28 UTC (rev 2305)
@@ -249,7 +249,7 @@
         open(mappingDb, txn.get(), "mappings.db", true);
         open(bindingDb, txn.get(), "bindings.db", true);
         open(generalDb, txn.get(), "general.db",  false);
-        tplStorePtr.reset(new JournalImpl("TplStore", getTplBaseDir(), "tpl", defJournalGetEventsTimeout, defJournalFlushTimeout));
+        tplStorePtr.reset(new TplJournalImpl("TplStore", getTplBaseDir(), "tpl", defJournalGetEventsTimeout, defJournalFlushTimeout));
         txn.commit();
     } catch (const journal::jexception& e) {
         txn.abort();
@@ -930,77 +930,84 @@
     return count;
 }
 
-void BdbMessageStore::recoverTplStore(TplRecoverMap& tplMap)
+void BdbMessageStore::readTplStore()
 {
+    tplRecoverMap.clear();
+    journal::txn_map& tmap = tplStorePtr->get_txn_map();
+    DataTokenImpl dtokp;
+    void* dbuff = NULL; size_t dbuffSize = 0;
+    void* xidbuff = NULL; size_t xidbuffSize = 0;
+    bool transientFlag = false;
+    bool externalFlag = false;
+    bool done = false;
+    try {
+        unsigned aio_sleep_cnt = 0;
+        while (!done) {
+            dtokp.reset();
+            dtokp.set_wstate(DataTokenImpl::ENQ);
+            switch (tplStorePtr->read_data_record(&dbuff, dbuffSize, &xidbuff, xidbuffSize, transientFlag, externalFlag, &dtokp)) {
+              case rhm::journal::RHM_IORES_SUCCESS: {
+                // Every TPL record contains both data and an XID
+                assert(dbuffSize>0);
+                assert(xidbuffSize>0);
+                std::string xid(static_cast<const char*>(xidbuff), xidbuffSize);
+                bool is2PC = *(static_cast<char*>(dbuff)) != 0;
+
+                // Check transaction details; add to recover map
+                journal::txn_data_list txnList = tmap.get_tdata_list(xid);
+                unsigned enqCnt = 0;
+                unsigned deqCnt = 0;
+                u_int64_t rid = 0;
+
+                // Assume commit (roll forward) in cases where only prepare has been called - ie only enqueue record exists.
+                // Note: will apply to both 1PC and 2PC transactions.
+                bool commitFlag = true;
+
+                for (journal::tdl_itr j = txnList.begin(); j<txnList.end(); j++) {
+                    if (j->_enq_flag) {
+                        rid = j->_rid;
+                        enqCnt++;
+                    } else {
+                        commitFlag = j->_commit_flag;
+                        deqCnt++;
+                    }
+                }
+                assert(enqCnt == 1);
+                assert(deqCnt <= 1);
+                tplRecoverMap.insert(TplRecoverMapPair(xid, TplRecoverStruct(rid, deqCnt == 1, commitFlag, is2PC)));
+
+                ::free(xidbuff);
+                aio_sleep_cnt = 0;
+                break;
+                }
+              case rhm::journal::RHM_IORES_PAGE_AIOWAIT:
+                if (++aio_sleep_cnt > MAX_AIO_SLEEPS)
+                    THROW_STORE_EXCEPTION("Timeout waiting for AIO in BdbMessageStore::recoverTplStore()");
+                ::usleep(AIO_SLEEP_TIME);
+                break;
+              case rhm::journal::RHM_IORES_EMPTY:
+                done = true;
+                break; // done with all messages. (add call in jrnl to test that _emap is empty.)
+              default:
+                assert("Store Error: Unexpected msg state");
+            } // switch
+        }
+    } catch (const journal::jexception& e) {
+        THROW_STORE_EXCEPTION(std::string("TPL recoverTplStore() failed: ") + e.what());
+    }
+}
+
+void BdbMessageStore::recoverTplStore()
+{
     if (journal::jdir::exists(tplStorePtr->jrnl_dir() + tplStorePtr->base_filename() + ".jinf")) {
         u_int64_t thisHighestRid;
         tplStorePtr->recover(tplNumJrnlFiles, tplJrnlFsizeSblks, tplWCachePgSizeSblks, tplWCacheNumPages, 0, thisHighestRid, 0);
-        journal::txn_map& tmap = tplStorePtr->get_txn_map();
         if (thisHighestRid > highestRid)
             highestRid = thisHighestRid;
 
-        DataTokenImpl dtokp;
-        void* dbuff = NULL; size_t dbuffSize = 0;
-        void* xidbuff = NULL; size_t xidbuffSize = 0;
-        bool transientFlag = false;
-        bool externalFlag = false;
-        bool done = false;
-        try {
-            unsigned aio_sleep_cnt = 0;
-            while (!done) {
-                dtokp.reset();
-                dtokp.set_wstate(DataTokenImpl::ENQ);
-                switch (tplStorePtr->read_data_record(&dbuff, dbuffSize, &xidbuff, xidbuffSize, transientFlag, externalFlag, &dtokp)) {
-                  case rhm::journal::RHM_IORES_SUCCESS: {
-                    // Every TPL record contains both data and an XID
-                    assert(dbuffSize>0);
-                    assert(xidbuffSize>0);
-                    std::string xid(static_cast<const char*>(xidbuff), xidbuffSize);
-                    bool is2PC = *(static_cast<char*>(dbuff)) != 0;
+        // Load tplRecoverMap by reading the TPL store
+        readTplStore();
 
-                    // Check transaction details; add to recover map
-                    journal::txn_data_list txnList = tmap.get_tdata_list(xid);
-                    unsigned enqCnt = 0;
-                    unsigned deqCnt = 0;
-                    u_int64_t rid = 0;
-
-                    // Assume commit (roll forward) in cases where only prepare has been called - ie only enqueue record exists.
-                    // Note: will apply to both 1PC and 2PC transactions.
-                    bool commitFlag = true;
-
-                    for (journal::tdl_itr j = txnList.begin(); j<txnList.end(); j++) {
-                        if (j->_enq_flag) {
-                            rid = j->_rid;
-                            enqCnt++;
-                        } else {
-                            commitFlag = j->_commit_flag;
-                            deqCnt++;
-                        }
-                    }
-                    assert(enqCnt == 1);
-                    assert(deqCnt <= 1);
-                    tplMap.insert(TplRecoverMapPair(xid, TplRecoverStruct(rid, deqCnt == 1, commitFlag, is2PC)));
-
-                    ::free(xidbuff);
-                    aio_sleep_cnt = 0;
-                    break;
-                    }
-                  case rhm::journal::RHM_IORES_PAGE_AIOWAIT:
-                    if (++aio_sleep_cnt > MAX_AIO_SLEEPS)
-                        THROW_STORE_EXCEPTION("Timeout waiting for AIO in BdbMessageStore::recoverTplStore()");
-                    ::usleep(AIO_SLEEP_TIME);
-                    break;
-                  case rhm::journal::RHM_IORES_EMPTY:
-                    done = true;
-                    break; // done with all messages. (add call in jrnl to test that _emap is empty.)
-                  default:
-                    assert("Store Error: Unexpected msg state");
-                } // switch
-            }
-        } catch (const journal::jexception& e) {
-            THROW_STORE_EXCEPTION(std::string("TPL recoverTplStore() failed: ") + e.what());
-        }
-
         tplStorePtr->recover_complete(); // start journal.
     }
 }
@@ -1008,7 +1015,7 @@
 void BdbMessageStore::recoverLockedMappings(txn_list& txns)
 {
     if (!tplStorePtr->is_ready())
-        recoverTplStore(tplRecoverMap);
+        recoverTplStore();
 
     // Abort unprepaired xids and populate the locked maps
     for (TplRecoverMapCitr i = tplRecoverMap.begin(); i != tplRecoverMap.end(); i++) {
@@ -1022,8 +1029,12 @@
 
 void BdbMessageStore::collectPreparedXids(std::set<std::string>& xids)
 {
-    if (!tplStorePtr->is_ready())
-        recoverTplStore(tplRecoverMap);
+    if (tplStorePtr->is_ready()) {
+        tplStorePtr->read_reset();
+        readTplStore();
+    } else {
+        recoverTplStore();
+    }
     for (TplRecoverMapCitr i = tplRecoverMap.begin(); i != tplRecoverMap.end(); i++) {
         // Discard all txns that are to be rolled forward/back and 1PC transactions
         if (!i->second.deq_flag && i->second.tpc_flag)

Modified: store/trunk/cpp/lib/BdbMessageStore.h
===================================================================
--- store/trunk/cpp/lib/BdbMessageStore.h	2008-08-14 13:44:04 UTC (rev 2304)
+++ store/trunk/cpp/lib/BdbMessageStore.h	2008-08-14 17:49:28 UTC (rev 2305)
@@ -93,7 +93,7 @@
     Db generalDb;
 
     // Pointer to Transaction Prepared List (TPL) journal instance
-    boost::shared_ptr<JournalImpl> tplStorePtr;
+    boost::shared_ptr<TplJournalImpl> tplStorePtr;
     TplRecoverMap tplRecoverMap;
 
     IdSequence queueIdSequence;
@@ -158,7 +158,8 @@
                        queue_index& index,
                        txn_list& locked,
                        message_index& prepared);
-    void recoverTplStore(TplRecoverMap& tplMap);
+    void readTplStore();
+    void recoverTplStore();
     void recoverLockedMappings(txn_list& txns);
     TxnCtxt* check(qpid::broker::TransactionContext* ctxt);
     void store(const qpid::broker::PersistableQueue* queue,

Modified: store/trunk/cpp/lib/JournalImpl.h
===================================================================
--- store/trunk/cpp/lib/JournalImpl.h	2008-08-14 13:44:04 UTC (rev 2304)
+++ store/trunk/cpp/lib/JournalImpl.h	2008-08-14 17:49:28 UTC (rev 2305)
@@ -75,7 +75,7 @@
             boost::intrusive_ptr<qpid::broker::TimerTask> getEventsFireEventsPtr;
             pthread_mutex_t _getf_mutex; // getEventsTimerSetFlag mutex
             
-            u_int64_t lastReadRid; // rid of last read msg
+            u_int64_t lastReadRid; // rid of last read msg for loadMsgContent()
 
             bool writeActivityFlag;
             bool flushTriggeredFlag;
@@ -207,6 +207,28 @@
             }
         }; // class JournalImpl
 
+        class TplJournalImpl : public JournalImpl
+        {
+        public:
+            TplJournalImpl(const std::string& journalId,
+                           const std::string& journalDirectory,
+                           const std::string& journalBaseFilename,
+                           const qpid::sys::Duration getEventsTimeout,
+                           const qpid::sys::Duration flushTimeout) :
+                           JournalImpl(journalId, journalDirectory, journalBaseFilename, getEventsTimeout, flushTimeout)
+            {}
+
+            virtual ~TplJournalImpl() {}
+
+            // Special version of read_data_record that ignores transactions - needed when reading the TPL
+            inline journal::iores read_data_record(void** const datapp, std::size_t& dsize,
+                    void** const xidpp, std::size_t& xidsize, bool& transient, bool& external,
+                    journal::data_tok* const dtokp) {
+                return JournalImpl::read_data_record(datapp, dsize, xidpp, xidsize, transient, external, dtokp, true);
+            }
+            inline void read_reset() { _rmgr.invalidate(); }
+        };
+
     } // namespace bdbstore
 } // namespace rhm
 

Modified: store/trunk/cpp/lib/jrnl/jcntl.cpp
===================================================================
--- store/trunk/cpp/lib/jrnl/jcntl.cpp	2008-08-14 13:44:04 UTC (rev 2304)
+++ store/trunk/cpp/lib/jrnl/jcntl.cpp	2008-08-14 17:49:28 UTC (rev 2305)
@@ -310,14 +310,14 @@
 #define RCINVALID_SLEEP_TIME_MS 5
 iores
 jcntl::read_data_record(void** const datapp, std::size_t& dsize, void** const xidpp, std::size_t& xidsize,
-        bool& transient, bool& external, data_tok* const dtokp)
+        bool& transient, bool& external, data_tok* const dtokp, bool ignore_pending_txns)
 {
     check_rstatus("read_data");
     unsigned cnt = 0;
     iores res;
     do
     {
-        res = _rmgr.read(datapp, dsize, xidpp, xidsize, transient, external, dtokp);
+        res = _rmgr.read(datapp, dsize, xidpp, xidsize, transient, external, dtokp, ignore_pending_txns);
         if (res == RHM_IORES_RCINVALID)
         {
             get_wr_events(); // check for outstanding write events

Modified: store/trunk/cpp/lib/jrnl/jcntl.hpp
===================================================================
--- store/trunk/cpp/lib/jrnl/jcntl.hpp	2008-08-14 13:44:04 UTC (rev 2304)
+++ store/trunk/cpp/lib/jrnl/jcntl.hpp	2008-08-14 17:49:28 UTC (rev 2305)
@@ -415,7 +415,8 @@
         * \exception TODO
         */
         iores read_data_record(void** const datapp, std::size_t& dsize, void** const xidpp,
-                std::size_t& xidsize, bool& transient, bool& external, data_tok* const dtokp);
+                std::size_t& xidsize, bool& transient, bool& external, data_tok* const dtokp,
+                bool ignore_pending_txns = false);
 
         /**
         * \brief Dequeues (marks as no longer needed) data record in journal.

Modified: store/trunk/cpp/lib/jrnl/rmgr.cpp
===================================================================
--- store/trunk/cpp/lib/jrnl/rmgr.cpp	2008-08-14 13:44:04 UTC (rev 2304)
+++ store/trunk/cpp/lib/jrnl/rmgr.cpp	2008-08-14 17:49:28 UTC (rev 2305)
@@ -90,7 +90,7 @@
 
 iores
 rmgr::read(void** const datapp, std::size_t& dsize, void** const xidpp, std::size_t& xidsize,
-        bool& transient, bool& external, data_tok* dtokp)
+        bool& transient, bool& external, data_tok* dtokp,  bool ignore_pending_txns)
 {
     iores res = pre_read_check(dtokp);
     if (res != RHM_IORES_SUCCESS)
@@ -161,8 +161,9 @@
                 }
                 catch (const jexception& e)
                 {
+                    bool enforce_txns = !_jc->is_read_only() && !ignore_pending_txns;
                     // Block read for transactionally locked record (only when not recovering)
-                    if (e.err_code() == jerrno::JERR_MAP_LOCKED && !_jc->is_read_only())
+                    if (e.err_code() == jerrno::JERR_MAP_LOCKED && enforce_txns)
                         return RHM_IORES_TXPENDING;
 
                     // (Recover mode only) Ok, not in emap - now search tmap, if present then read
@@ -181,7 +182,7 @@
                                 is_enq = ditr->_drid == _hdr._rid;
                         }
                     }
-                    if (!_jc->is_read_only() && is_enq)
+                    if (enforce_txns && is_enq)
                         return RHM_IORES_TXPENDING;
                 }
 

Modified: store/trunk/cpp/lib/jrnl/rmgr.hpp
===================================================================
--- store/trunk/cpp/lib/jrnl/rmgr.hpp	2008-08-14 13:44:04 UTC (rev 2304)
+++ store/trunk/cpp/lib/jrnl/rmgr.hpp	2008-08-14 17:49:28 UTC (rev 2305)
@@ -77,7 +77,8 @@
 
         void initialize(const rd_aio_cb rd_cb);
         iores read(void** const datapp, std::size_t& dsize, void** const xidpp,
-                std::size_t& xidsize, bool& transient, bool& external, data_tok* dtokp);
+                std::size_t& xidsize, bool& transient, bool& external, data_tok* dtokp,
+                bool ignore_pending_txns);
         u_int32_t get_events(page_state state = AIO_COMPLETE);
         void recover_complete();
         inline bool is_valid() const {return _valid; }

Modified: store/trunk/cpp/lib/jrnl/txn_map.hpp
===================================================================
--- store/trunk/cpp/lib/jrnl/txn_map.hpp	2008-08-14 13:44:04 UTC (rev 2304)
+++ store/trunk/cpp/lib/jrnl/txn_map.hpp	2008-08-14 17:49:28 UTC (rev 2305)
@@ -102,9 +102,9 @@
     * <pre>
     *   key      data
     *
-    *   xid1 --- vector< [ rid, drid, fid, enq_flag, aio_compl ] >
-    *   xid2 --- vector< [ rid, drid, fid, enq_flag, aio_compl ] >
-    *   xid3 --- vector< [ rid, drid, fid, enq_flag, aio_compl ] >
+    *   xid1 --- vector< [ rid, drid, fid, enq_flag, commit_flag, aio_compl ] >
+    *   xid2 --- vector< [ rid, drid, fid, enq_flag, commit_flag, aio_compl ] >
+    *   xid3 --- vector< [ rid, drid, fid, enq_flag, commit_flag, aio_compl ] >
     *   ...
     * </pre>
     */




More information about the rhmessaging-commits mailing list