[rhmessaging-commits] rhmessaging commits: r1994 - in store/trunk/cpp: lib and 2 other directories.

rhmessaging-commits at lists.jboss.org rhmessaging-commits at lists.jboss.org
Mon May 5 10:58:54 EDT 2008


Author: kpvdr
Date: 2008-05-05 10:58:54 -0400 (Mon, 05 May 2008)
New Revision: 1994

Modified:
   store/trunk/cpp/configure.ac
   store/trunk/cpp/lib/BdbMessageStore.cpp
   store/trunk/cpp/lib/JournalImpl.cpp
   store/trunk/cpp/lib/JournalImpl.h
   store/trunk/cpp/lib/jrnl/arr_cnt.hpp
   store/trunk/cpp/lib/jrnl/jcntl.cpp
   store/trunk/cpp/lib/jrnl/jcntl.hpp
   store/trunk/cpp/lib/jrnl/nlfh.cpp
   store/trunk/cpp/lib/jrnl/nlfh.hpp
   store/trunk/cpp/lib/jrnl/rcvdat.hpp
   store/trunk/cpp/lib/jrnl/rmgr.cpp
   store/trunk/cpp/lib/jrnl/rmgr.hpp
   store/trunk/cpp/lib/jrnl/rrfc.cpp
   store/trunk/cpp/lib/jrnl/rrfc.hpp
   store/trunk/cpp/lib/jrnl/wrfc.cpp
   store/trunk/cpp/tests/jrnl/_ut_jinf.cpp
Log:
Bugfix for 444592: "Store fails to recover large dequeued journals owing to RHM_IORES_PAGE_AIOWAIT". Some tidy-up too.

Modified: store/trunk/cpp/configure.ac
===================================================================
--- store/trunk/cpp/configure.ac	2008-05-01 18:01:33 UTC (rev 1993)
+++ store/trunk/cpp/configure.ac	2008-05-05 14:58:54 UTC (rev 1994)
@@ -127,7 +127,7 @@
 AC_SEARCH_LIBS([__db_open], [db_cxx-4.6 db_cxx-4.5 db_cxx-4.4 db_cxx-4.3],
 		 [test "$ac_cv_search___db_open" = "none required" ||
 		  LIB_BERKELEY_DB=$ac_cv_search___db_open],
-		  AC_MSG_ERROR([couldn't find required library: db_cxx-4.3]))
+		  AC_MSG_ERROR([Couldn't find required library in range db_cxx-4.3 through db_cxx-4.6]))
 AC_SUBST([LIB_BERKELEY_DB])
 LIBS=$gl_saved_libs
 

Modified: store/trunk/cpp/lib/BdbMessageStore.cpp
===================================================================
--- store/trunk/cpp/lib/BdbMessageStore.cpp	2008-05-01 18:01:33 UTC (rev 1993)
+++ store/trunk/cpp/lib/BdbMessageStore.cpp	2008-05-05 14:58:54 UTC (rev 1994)
@@ -561,6 +561,11 @@
     DataTokenImpl dtokp;
     size_t readSize = 0;
     unsigned msg_count=0;
+
+    // TODO: This optimization to skip reading if there are no enqueued messages to read
+    // breaks the python system test in phase 6 with "Exception: Cannot write lock file"
+    // Figure out what is breaking.
+    //bool read = jc->get_enq_cnt() > 0;
     bool read = true;
 
     void* dbuff = NULL; size_t dbuffSize = 0;

Modified: store/trunk/cpp/lib/JournalImpl.cpp
===================================================================
--- store/trunk/cpp/lib/JournalImpl.cpp	2008-05-01 18:01:33 UTC (rev 1993)
+++ store/trunk/cpp/lib/JournalImpl.cpp	2008-05-05 14:58:54 UTC (rev 1994)
@@ -183,6 +183,7 @@
     std::ostringstream oss2;
     oss2 << "Recover phase I complete; highest rid found = 0x" << std::hex << highest_rid;
     oss2 << std::dec << "; emap.size=" << _emap.size() << "; tmap.size=" << _tmap.size();
+    oss2 << "; journal now read-only.";
     log(LOG_DEBUG, oss2.str());
 
     if (_mgmtObject.get() != 0)
@@ -194,6 +195,13 @@
     }
 }
 
+void
+JournalImpl::recover_complete()
+{
+    jcntl::recover_complete();
+    log(LOG_DEBUG, "Recover phase II complete; journal now writable.");
+}
+
 #define MAX_AIO_SLEEPS 500
 #define AIO_SLEEP_TIME 1000000
 bool

Modified: store/trunk/cpp/lib/JournalImpl.h
===================================================================
--- store/trunk/cpp/lib/JournalImpl.h	2008-05-01 18:01:33 UTC (rev 1993)
+++ store/trunk/cpp/lib/JournalImpl.h	2008-05-05 14:58:54 UTC (rev 1994)
@@ -113,6 +113,8 @@
                 recover(num_jfiles, jfsize_sblks, 0, &aio_wr_callback, prep_tx_list, highest_rid,
                         queue_id);
             }
+
+            void recover_complete();
             
             // Temporary fn to read and save last msg read from journal so it can be assigned
             // in chunks. To be replaced when coding to do this direct from the journal is ready.

Modified: store/trunk/cpp/lib/jrnl/arr_cnt.hpp
===================================================================
--- store/trunk/cpp/lib/jrnl/arr_cnt.hpp	2008-05-01 18:01:33 UTC (rev 1993)
+++ store/trunk/cpp/lib/jrnl/arr_cnt.hpp	2008-05-05 14:58:54 UTC (rev 1994)
@@ -43,7 +43,7 @@
     /**
     * \class arr_cnt
     * \brief Class which implements a dynamically allocated array of u_int32_t counters.
-    *     This is ideal where it is necessary to increment and decrement counts for an entuty
+    *     This is ideal where it is necessary to increment and decrement counts for an entity
     *     for which the number of elements is unknown, but for which the efficiency of a static
     *     array is required. None of the counts may go below zero.
     */

Modified: store/trunk/cpp/lib/jrnl/jcntl.cpp
===================================================================
--- store/trunk/cpp/lib/jrnl/jcntl.cpp	2008-05-01 18:01:33 UTC (rev 1993)
+++ store/trunk/cpp/lib/jrnl/jcntl.cpp	2008-05-05 14:58:54 UTC (rev 1994)
@@ -139,7 +139,7 @@
     // constrains read activity (i.e. one can't read what has not yet been written).
     _wrfc.initialize(_num_jfiles, _jfsize_sblks, (nlfh**)_datafh);
     _rrfc.initialize(_num_jfiles, (nlfh**)_datafh);
-    _rmgr.initialize(rd_cb, 0);
+    _rmgr.initialize(rd_cb);
     _wmgr.initialize(wr_cb, JRNL_WMGR_MAXDTOKPP, JRNL_WMGR_MAXWAITUS);
 
     // Write info file (<basename>.jinf) to disk
@@ -187,7 +187,7 @@
     if (_rcvdat._full)
         throw jexception(jerrno::JERR_JCNTL_RECOVERJFULL, "jcntl", "recover");
 
-    this->log(LOG_DEBUG, _rcvdat.to_string(_jid));
+    this->log(LOG_DEBUG, _rcvdat.to_log(_jid));
     
     _datafh = new lfh*[_num_jfiles];
     // 0 the pointer array first because new() can throw exceptions
@@ -205,7 +205,7 @@
     // constrains read activity (i.e. one can't read what has not yet been written).
     _wrfc.initialize(_num_jfiles, _jfsize_sblks, (nlfh**)_datafh, &_rcvdat);
     _rrfc.initialize(_num_jfiles, (nlfh**)_datafh, _rcvdat._ffid);
-    _rmgr.initialize(rd_cb, _rcvdat._fro);
+    _rmgr.initialize(rd_cb);
     _wmgr.initialize(wr_cb, JRNL_WMGR_MAXDTOKPP, JRNL_WMGR_MAXWAITUS, _rcvdat._eo);
     
     _readonly_flag = true;
@@ -221,9 +221,8 @@
         _datafh[i]->reset(&_rcvdat);
     _wrfc.initialize(_num_jfiles, _jfsize_sblks, (nlfh**)_datafh, &_rcvdat);
     _rrfc.initialize(_num_jfiles, (nlfh**)_datafh, _rcvdat._ffid);
-    _rmgr.recover_complete(_rcvdat._fro);
+    _rmgr.recover_complete();
     _readonly_flag = false;
-    this->log(LOG_DEBUG, "Recover phase II complete; journal now writable.");
 }
 
 void 
@@ -413,6 +412,21 @@
         flush(block_till_aio_cmpl);
 }
 
+u_int16_t
+jcntl::get_earliest_fid()
+{
+    u_int16_t ffid = _wrfc.earliest_index();
+    u_int16_t fid = _wrfc.index();
+    while ( _emap.get_enq_cnt(ffid) == 0 && _tmap.get_txn_fid_cnt(ffid) == 0 && ffid != fid)
+    {
+        if (++ffid >= _num_jfiles)
+            ffid = 0; 
+    }
+    if (ffid != _rrfc.fid())
+        _rrfc.reset(ffid);
+    return ffid;
+}
+
 iores
 jcntl::flush(const bool block_till_aio_cmpl)
 {
@@ -602,7 +616,7 @@
         u_int16_t next_wr_fid = (rd._lfid + 1) % _num_jfiles;
         if (rd._ffid == next_wr_fid && rd._enq_cnt_list[next_wr_fid])
             rd._full = true;
-
+        
         // Remove all transactions not in prep_txn_list      
         std::vector<std::string> xid_list;
         _tmap.xid_list(xid_list);
@@ -760,7 +774,7 @@
             }
             break;
         case 0:
-            rd._eo = ifsp->tellg();
+            rd._eo = file_pos;
             return false;
         default:
             // Stop as this is the overwrite boundary.

Modified: store/trunk/cpp/lib/jrnl/jcntl.hpp
===================================================================
--- store/trunk/cpp/lib/jrnl/jcntl.hpp	2008-05-01 18:01:33 UTC (rev 1993)
+++ store/trunk/cpp/lib/jrnl/jcntl.hpp	2008-05-05 14:58:54 UTC (rev 1994)
@@ -534,7 +534,7 @@
 
         inline u_int16_t get_rd_fid() const { return _rrfc.index(); }
         inline u_int16_t get_wr_fid() const { return _wrfc.index(); }
-        inline u_int16_t get_earliest_fid() const { return _wrfc.earliest_index(); }
+        u_int16_t get_earliest_fid();
 
         /**
         * \brief Check if a particular rid is enqueued. Note that this function will return

Modified: store/trunk/cpp/lib/jrnl/nlfh.cpp
===================================================================
--- store/trunk/cpp/lib/jrnl/nlfh.cpp	2008-05-01 18:01:33 UTC (rev 1993)
+++ store/trunk/cpp/lib/jrnl/nlfh.cpp	2008-05-05 14:58:54 UTC (rev 1994)
@@ -113,13 +113,6 @@
         {
             if (!ro->_empty)
             {
-                // For first file only, set read counters to ahead of file header
-                if (ro->_ffid == _fid)
-                {
-                    _rd_subm_cnt_dblks = JRNL_SBLK_SIZE;
-                    _rd_cmpl_cnt_dblks = JRNL_SBLK_SIZE;
-                }
-                
                 // For last file only, set write counters to end of last record (the
                 // continuation point); for all others, set to eof.
                 if (ro->_lfid == _fid)
@@ -199,15 +192,24 @@
 bool
 nlfh::reset(const rcvdat* const ro)
 {
+    rd_reset();
+    return wr_reset(ro);
+}
+
+void
+nlfh::rd_reset()
+{
+    _rd_subm_cnt_dblks = 0;
+    _rd_cmpl_cnt_dblks = 0;
+}
+
+bool
+nlfh::wr_reset(const rcvdat* const ro)
+{
     if (ro)
     {
         if (!ro->_empty)
         {
-            if (ro->_ffid == _fid)
-            {
-                _rd_subm_cnt_dblks = JRNL_SBLK_SIZE;
-                _rd_cmpl_cnt_dblks = JRNL_SBLK_SIZE;
-            }
             if (ro->_lfid == _fid)
             {
                 _wr_subm_cnt_dblks = ro->_eo/JRNL_DBLK_SIZE;
@@ -232,8 +234,6 @@
     _wr_subm_cnt_dblks = 0;
     _wr_cmpl_cnt_dblks = 0;
 #endif
-    _rd_subm_cnt_dblks = 0;
-    _rd_cmpl_cnt_dblks = 0;
     return true;
 }
 

Modified: store/trunk/cpp/lib/jrnl/nlfh.hpp
===================================================================
--- store/trunk/cpp/lib/jrnl/nlfh.hpp	2008-05-01 18:01:33 UTC (rev 1993)
+++ store/trunk/cpp/lib/jrnl/nlfh.hpp	2008-05-05 14:58:54 UTC (rev 1994)
@@ -79,6 +79,8 @@
         virtual void initialize(const std::string& fbasename, const u_int16_t fid,
                 const u_int32_t jfsize_sblks, const rcvdat* const ro);
         virtual bool reset(const rcvdat* const ro = 0);
+        virtual void rd_reset();
+        virtual bool wr_reset(const rcvdat* const ro = 0);
 
         inline const std::string& fname() const { return _fname; }
         inline u_int16_t fid() const { return _fid; }

Modified: store/trunk/cpp/lib/jrnl/rcvdat.hpp
===================================================================
--- store/trunk/cpp/lib/jrnl/rcvdat.hpp	2008-05-01 18:01:33 UTC (rev 1993)
+++ store/trunk/cpp/lib/jrnl/rcvdat.hpp	2008-05-05 14:58:54 UTC (rev 1994)
@@ -107,6 +107,31 @@
                             std::endl;
                 return oss.str();
             }
+
+            std::string to_log(std::string& jid)
+            {
+                std::ostringstream oss;
+                oss << "Recover file analysis (jid=\"" << jid << "\"):";
+                oss << " owi=" << (_owi ? "TRUE" : "FALSE");
+                oss << " frot=" << (_frot ? "TRUE" : "FALSE");
+                oss << " empty=" << (_empty ? "TRUE" : "FALSE");
+                oss << " ffid=" << _ffid;
+                oss << " fro=0x" << std::hex << _fro << std::dec << " (" <<
+                        (_fro/JRNL_DBLK_SIZE) << " dblks)";
+                oss << " lfid=" << _lfid;
+                oss << " eo=0x" << std::hex << _eo << std::dec << " ("  <<
+                        (_eo/JRNL_DBLK_SIZE) << " dblks)";
+                oss << " h_rid=0x" << std::hex << _h_rid << std::dec;
+                oss << " full=" << (_full ? "TRUE" : "FALSE");
+                oss << " Enqueued records (txn & non-txn): [ ";
+                for (unsigned i=0; i<_enq_cnt_list.size(); i++)
+                {
+                    if (i) oss << " ";
+                    oss << "fid_" << std::setw(2) << std::setfill('0') << i << "=" << _enq_cnt_list[i];
+                }
+                oss << " ]";
+                return oss.str();
+            }
         };
 }
 }

Modified: store/trunk/cpp/lib/jrnl/rmgr.cpp
===================================================================
--- store/trunk/cpp/lib/jrnl/rmgr.cpp	2008-05-01 18:01:33 UTC (rev 1993)
+++ store/trunk/cpp/lib/jrnl/rmgr.cpp	2008-05-05 14:58:54 UTC (rev 1994)
@@ -58,21 +58,10 @@
 }
 
 void
-rmgr::initialize(const rd_aio_cb rd_cb, const std::size_t fro)
+rmgr::initialize(const rd_aio_cb rd_cb)
 {
     _cb = rd_cb;
     initialize();
-    if (fro)
-    {
-        u_int32_t fro_dblks = (fro / JRNL_DBLK_SIZE) - JRNL_SBLK_SIZE;
-        _pg_cntr = fro_dblks / (JRNL_RMGR_PAGE_SIZE * JRNL_SBLK_SIZE);
-        u_int32_t tot_pg_offs_dblks = _pg_cntr * JRNL_RMGR_PAGE_SIZE * JRNL_SBLK_SIZE;
-        _pg_index = _pg_cntr % JRNL_RMGR_PAGES;
-        _pg_offset_dblks = fro_dblks - tot_pg_offs_dblks;
-        _rrfc.add_subm_cnt_dblks(tot_pg_offs_dblks);
-        _rrfc.add_cmpl_cnt_dblks(tot_pg_offs_dblks);
-    }
-
     clean();
     // Allocate memory for reading file header
     if (::posix_memalign(&_fhdr_buffer, _sblksize, _sblksize))
@@ -313,8 +302,17 @@
         {
             std::memcpy(&_fhdr, _fhdr_buffer, sizeof(file_hdr));
             _rrfc.add_cmpl_cnt_dblks(JRNL_SBLK_SIZE);
-            std::size_t fro_dblks = _fhdr._fro / JRNL_DBLK_SIZE;
-            _pg_offset_dblks += fro_dblks - JRNL_SBLK_SIZE;
+
+//             std::size_t fro_dblks = _fhdr._fro / JRNL_DBLK_SIZE;
+//             _pg_offset_dblks += fro_dblks - JRNL_SBLK_SIZE;
+            u_int32_t fro_dblks = (_fhdr._fro / JRNL_DBLK_SIZE) - JRNL_SBLK_SIZE;
+            _pg_cntr = fro_dblks / (JRNL_RMGR_PAGE_SIZE * JRNL_SBLK_SIZE);
+            u_int32_t tot_pg_offs_dblks = _pg_cntr * JRNL_RMGR_PAGE_SIZE * JRNL_SBLK_SIZE;
+            _pg_index = _pg_cntr % JRNL_RMGR_PAGES;
+            _pg_offset_dblks = fro_dblks - tot_pg_offs_dblks;
+            _rrfc.add_subm_cnt_dblks(tot_pg_offs_dblks);
+            _rrfc.add_cmpl_cnt_dblks(tot_pg_offs_dblks);
+
             _fhdr_rd_outstanding = false;
             _valid = true;
         }
@@ -327,19 +325,8 @@
 }
 
 void
-rmgr::recover_complete(std::size_t fro)
-{
-    if (fro)
-    {
-        u_int32_t fro_dblks = (fro / JRNL_DBLK_SIZE) - JRNL_SBLK_SIZE;
-        _pg_cntr = fro_dblks / (JRNL_RMGR_PAGE_SIZE * JRNL_SBLK_SIZE);
-        u_int32_t tot_pg_offs_dblks = _pg_cntr * JRNL_RMGR_PAGE_SIZE * JRNL_SBLK_SIZE;
-        _pg_index = _pg_cntr % JRNL_RMGR_PAGES;
-        _pg_offset_dblks = fro_dblks - tot_pg_offs_dblks;
-        _rrfc.add_subm_cnt_dblks(tot_pg_offs_dblks);
-        _rrfc.add_cmpl_cnt_dblks(tot_pg_offs_dblks);
-    }
-}
+rmgr::recover_complete()
+{}
 
 void
 rmgr::invalidate()
@@ -571,26 +558,25 @@
 void
 rmgr::init_aio_reads(const int16_t first_uninit, const u_int16_t num_uninit)
 {
+    if (_fhdr_rd_outstanding)
+        return;
     for (int16_t i=0; i<num_uninit; i++)
     {
         if (_rrfc.is_void()) // Nothing to do; this file not yet written to
             break;
-            
+
+        if (!_valid)
+        {
+            u_int16_t fid = _jc->get_earliest_fid();
+            init_file_header_read(fid);
+            break;
+        }
+        
         if (_rrfc.subm_offs() == 0)
         {
-            if (_valid)
-            {
-                _rrfc.add_subm_cnt_dblks(JRNL_SBLK_SIZE);
-                _rrfc.add_cmpl_cnt_dblks(JRNL_SBLK_SIZE);
-            }
-            else
-            {
-                u_int16_t fid = _jc->get_earliest_fid();
-                init_file_header_read(fid);
-            }
+            _rrfc.add_subm_cnt_dblks(JRNL_SBLK_SIZE);
+            _rrfc.add_cmpl_cnt_dblks(JRNL_SBLK_SIZE);
         }
-        else
-            _valid = true;
 
         // TODO: Future perf improvement: Do a single AIO read for all available file
         // space into all contiguous empty pages in one AIO operation.

Modified: store/trunk/cpp/lib/jrnl/rmgr.hpp
===================================================================
--- store/trunk/cpp/lib/jrnl/rmgr.hpp	2008-05-01 18:01:33 UTC (rev 1993)
+++ store/trunk/cpp/lib/jrnl/rmgr.hpp	2008-05-05 14:58:54 UTC (rev 1994)
@@ -75,11 +75,11 @@
         rmgr(jcntl* jc, enq_map& emap, txn_map& tmap, rrfc& rrfc);
         virtual ~rmgr();
 
-        void initialize(const rd_aio_cb rd_cb, const std::size_t fro);
-        iores read(void** const datapp, std::size_t& dsize, void** const xidpp, std::size_t& xidsize,
-                bool& transient, bool& external, data_tok* dtokp);
+        void initialize(const rd_aio_cb rd_cb);
+        iores read(void** const datapp, std::size_t& dsize, void** const xidpp,
+                std::size_t& xidsize, bool& transient, bool& external, data_tok* dtokp);
         u_int32_t get_events(page_state state = AIO_COMPLETE);
-        void recover_complete(std::size_t fro);
+        void recover_complete();
         inline bool is_valid() const {return _valid; }
         inline void synchronize() { if (!_valid) aio_cycle(); }
         void invalidate();

Modified: store/trunk/cpp/lib/jrnl/rrfc.cpp
===================================================================
--- store/trunk/cpp/lib/jrnl/rrfc.cpp	2008-05-01 18:01:33 UTC (rev 1993)
+++ store/trunk/cpp/lib/jrnl/rrfc.cpp	2008-05-05 14:58:54 UTC (rev 1994)
@@ -62,6 +62,7 @@
 {
     _fh_index = fh_index;
     _curr_fh = _fh_arr[_fh_index];
+    _curr_fh->rd_reset();
 }
 
 bool

Modified: store/trunk/cpp/lib/jrnl/rrfc.hpp
===================================================================
--- store/trunk/cpp/lib/jrnl/rrfc.hpp	2008-05-01 18:01:33 UTC (rev 1993)
+++ store/trunk/cpp/lib/jrnl/rrfc.hpp	2008-05-05 14:58:54 UTC (rev 1994)
@@ -72,7 +72,7 @@
         *     each of which correspond to one of the physical files.
         * \param fh_index Initial index of journal file. Default = 0.
         */
-        void initialize(const u_int16_t nfiles, nlfh** fh_arr, u_int32_t fh_index = 0);
+        virtual void initialize(const u_int16_t nfiles, nlfh** fh_arr, u_int32_t fh_index = 0);
 
         void reset(u_int32_t fh_index = 0);
 

Modified: store/trunk/cpp/lib/jrnl/wrfc.cpp
===================================================================
--- store/trunk/cpp/lib/jrnl/wrfc.cpp	2008-05-01 18:01:33 UTC (rev 1993)
+++ store/trunk/cpp/lib/jrnl/wrfc.cpp	2008-05-05 14:58:54 UTC (rev 1994)
@@ -62,7 +62,11 @@
 {
     if (rdp)
     {
-        rrfc::initialize(nfiles, fh_arr, rdp->_lfid);
+        _nfiles = nfiles;
+        _fh_arr = fh_arr;
+        _fh_index = rdp->_lfid;
+        _curr_fh = _fh_arr[rdp->_lfid];
+        _curr_fh->wr_reset(rdp);
         _rid = rdp->_h_rid + 1;
         _reset_ok = true;
         _owi = rdp->_owi;

Modified: store/trunk/cpp/tests/jrnl/_ut_jinf.cpp
===================================================================
--- store/trunk/cpp/tests/jrnl/_ut_jinf.cpp	2008-05-01 18:01:33 UTC (rev 1993)
+++ store/trunk/cpp/tests/jrnl/_ut_jinf.cpp	2008-05-05 14:58:54 UTC (rev 1994)
@@ -38,6 +38,7 @@
 #include "jrnl/jerrno.hpp"
 #include "jrnl/jexception.hpp"
 #include "jrnl/jinf.hpp"
+#include <vector>
 
 #define NUM_JFILES 4
 #define JFSIZE_SBLKS 128




More information about the rhmessaging-commits mailing list