[rhmessaging-commits] rhmessaging commits: r1523 - store/trunk/cpp/lib/jrnl.

rhmessaging-commits at lists.jboss.org rhmessaging-commits at lists.jboss.org
Wed Dec 19 15:36:48 EST 2007


Author: kpvdr
Date: 2007-12-19 15:36:48 -0500 (Wed, 19 Dec 2007)
New Revision: 1523

Modified:
   store/trunk/cpp/lib/jrnl/jcntl.cpp
   store/trunk/cpp/lib/jrnl/nlfh.cpp
   store/trunk/cpp/lib/jrnl/rmgr.cpp
   store/trunk/cpp/lib/jrnl/wrfc.cpp
Log:
Fix for BZ 425211. Large files exposed incorrect initialization of read manager counters.

Modified: store/trunk/cpp/lib/jrnl/jcntl.cpp
===================================================================
--- store/trunk/cpp/lib/jrnl/jcntl.cpp	2007-12-19 14:32:22 UTC (rev 1522)
+++ store/trunk/cpp/lib/jrnl/jcntl.cpp	2007-12-19 20:36:48 UTC (rev 1523)
@@ -150,8 +150,8 @@
     if (_rcvdat._full)
         throw jexception(jerrno::JERR_JCNTL_RECOVERJFULL, "jcntl", "recover_complete");
 
-    // Debug info; may be useful to print with a flag
-    // std::cout << _rcvdat.to_string(_jid) << std::endl;
+    // Debug info; should be sent to log file
+    // std::cout << _rcvdat.to_string(_jid) << std::endl << std::flush;
 
     if (_datafh)
     {
@@ -519,14 +519,14 @@
     rec_hdr h;
     if (!jfile_cycle(fid, ifsp, lowi, rd, true))
         return false;
-    std::streampos read_pos = ifsp->tellg();
+    std::streampos file_pos = ifsp->tellg();
     ifsp->read((char*)&h, sizeof(rec_hdr));
     switch(h._magic)
     {
         case RHM_JDAT_ENQ_MAGIC:
             {
                 enq_rec er;
-                if (!decode(er, fid, ifsp, cum_size_read, h, lowi, rd, read_pos))
+                if (!decode(er, fid, ifsp, cum_size_read, h, lowi, rd, file_pos))
                     return false;
                 if (!er.is_transient()) // Ignore transient msgs
                 {
@@ -547,7 +547,7 @@
         case RHM_JDAT_DEQ_MAGIC:
             {
                 deq_rec dr;
-                if (!decode(dr, fid, ifsp, cum_size_read, h, lowi, rd, read_pos))
+                if (!decode(dr, fid, ifsp, cum_size_read, h, lowi, rd, file_pos))
                     return false;
                 if (dr.xid_size())
                 {
@@ -580,7 +580,7 @@
         case RHM_JDAT_TXA_MAGIC:
             {
                 txn_rec ar;
-                if (!decode(ar, fid, ifsp, cum_size_read, h, lowi, rd, read_pos))
+                if (!decode(ar, fid, ifsp, cum_size_read, h, lowi, rd, file_pos))
                     return false;
                 // Delete this txn from tmap, unlock any locked records in emap
                 ar.get_xid(&xidp);
@@ -607,7 +607,7 @@
         case RHM_JDAT_TXC_MAGIC:
             {
                 txn_rec cr;
-                if (!decode(cr, fid, ifsp, cum_size_read, h, lowi, rd, read_pos))
+                if (!decode(cr, fid, ifsp, cum_size_read, h, lowi, rd, file_pos))
                     return false;
                 // Delete this txn from tmap, process records into emap
                 cr.get_xid(&xidp);
@@ -638,27 +638,21 @@
             rd._eo = ifsp->tellg();
             return false;
         default:
-            // Is this the last file, if so, stop as this is the overwrite boundary.
-            if (fid == (rd._ffid ? rd._ffid - 1 : _num_jfiles - 1))
-            {
-                rd._lfid = fid;
-                rd._eo = read_pos;
-                return false;
-            }
-            std::ostringstream oss;
-            oss << std::hex << std::setfill('0') << "Magic=0x" << std::setw(8) << h._magic;
-            oss << " fid=" << fid << " foffs=0x" << std::setw(8) << read_pos;
-            throw jexception(jerrno::JERR_JCNTL_UNKNOWNMAGIC, oss.str(), "jcntl",
-                    "rcvr_get_next_record");
+            // Stop as this is the overwrite boundary.
+            rd._lfid = fid;
+            rd._eo = file_pos;
+            return false;
     }
     return true;
 }
 
 const bool
 jcntl::decode(jrec& rec, u_int16_t& fid, std::ifstream* ifsp, size_t& cum_size_read, rec_hdr& h,
-        bool& lowi, rcvdat& rd, std::streampos& rec_offset)
+        bool& lowi, rcvdat& rd, std::streampos& file_offs)
 {
-    if (!check_owi(fid, h, lowi, rd, rec_offset))
+    u_int16_t start_fid = fid;
+    std::streampos start_file_offs = file_offs;
+    if (!check_owi(fid, h, lowi, rd, file_offs))
         return false;
     bool done = false;
     while (!done)
@@ -668,9 +662,9 @@
         {
             if (e.err_code() != jerrno::JERR_JREC_BADRECTAIL ||
                     fid != (rd._ffid ? rd._ffid - 1 : _num_jfiles - 1)) throw;
-            check_journal_alignment(fid, rec_offset);
-            rd._lfid = fid;
-            rd._eo = rec_offset;
+            check_journal_alignment(start_fid, start_file_offs);
+            rd._lfid = start_fid;
+            rd._eo = start_file_offs;
             return false;
         }
         if (!jfile_cycle(fid, ifsp, lowi, rd, false))
@@ -728,22 +722,22 @@
 }
         
 const bool
-jcntl::check_owi(const u_int16_t fid, rec_hdr& h, bool& lowi, rcvdat& rd, std::streampos& read_pos)
+jcntl::check_owi(const u_int16_t fid, rec_hdr& h, bool& lowi, rcvdat& rd, std::streampos& file_pos)
 {
     if (rd._ffid ? h.get_owi() == lowi : h.get_owi() != lowi) // Overwrite indicator changed
     {
         u_int16_t expected_fid = rd._ffid ? rd._ffid - 1 : _num_jfiles - 1;
         if (fid == expected_fid)
         {
-            check_journal_alignment(fid, read_pos);
+            check_journal_alignment(fid, file_pos);
             rd._lfid = fid;
-            rd._eo = read_pos;
+            rd._eo = file_pos;
             return false;
         }
         std::ostringstream oss;
         oss << std::hex << std::setfill('0') << "Magic=0x" << std::setw(8) << h._magic;
         oss << " fid=0x" << std::setw(4) << fid << " rid=0x" << std::setw(8) << h._rid;
-        oss << " foffs=0x" << std::setw(8) << read_pos;
+        oss << " foffs=0x" << std::setw(8) << file_pos;
         oss << " expected_fid=0x" << std::setw(4) << expected_fid;
         throw jexception(jerrno::JERR_JCNTL_OWIMISMATCH, oss.str(), "jcntl",
                 "check_owi");
@@ -755,14 +749,14 @@
         
 
 void
-jcntl::check_journal_alignment(const u_int16_t fid, std::streampos& rec_offset)
+jcntl::check_journal_alignment(const u_int16_t fid, std::streampos& file_pos)
 {
-    unsigned sblk_offs = rec_offset % (JRNL_DBLK_SIZE * JRNL_SBLK_SIZE);
+    unsigned sblk_offs = file_pos % (JRNL_DBLK_SIZE * JRNL_SBLK_SIZE);
     if (sblk_offs)
     {
         // TODO: Connect the following with logger:
         std::cout << std::hex  << "INFO: Bad record alignment found at fid=0x" << fid <<
-                " offs=0x" << rec_offset << " (likely journal overwrite boundary); " <<
+                " offs=0x" << file_pos << " (likely journal overwrite boundary); " <<
                 (JRNL_SBLK_SIZE - (sblk_offs/JRNL_DBLK_SIZE)) <<
                 " filler record(s) required." << std::endl;
         const u_int32_t xmagic = RHM_JDAT_EMPTY_MAGIC;
@@ -773,7 +767,7 @@
                 std::ios_base::in | std::ios_base::out | std::ios_base::binary);
         if (!ofsp.good())
             throw jexception(jerrno::JERR__FILEIO, oss.str(), "jcntl", "check_journal_alignment");
-        ofsp.seekp(rec_offset);
+        ofsp.seekp(file_pos);
         void* buff = ::malloc(JRNL_DBLK_SIZE);
         assert(buff != NULL);
         ::memcpy(buff, (void*)&xmagic, sizeof(xmagic));
@@ -782,13 +776,13 @@
         // clear should inspection of the file be required.
         ::memset((char*)buff + sizeof(xmagic), RHM_CLEAN_CHAR, JRNL_DBLK_SIZE - sizeof(xmagic));
 
-        while (rec_offset % (JRNL_DBLK_SIZE * JRNL_SBLK_SIZE))
+        while (file_pos % (JRNL_DBLK_SIZE * JRNL_SBLK_SIZE))
         {
             ofsp.write((const char*)buff, JRNL_DBLK_SIZE);
             assert(!ofsp.fail());
             // TODO: Connect the following with logger:
-            std::cout << "INFO: * Wrote filler record at offs=0x" << rec_offset << std::endl;
-            rec_offset = ofsp.tellp();
+            std::cout << "INFO: * Wrote filler record at offs=0x" << file_pos << std::endl;
+            file_pos = ofsp.tellp();
         }
         ofsp.close();
         ::free(buff);

Modified: store/trunk/cpp/lib/jrnl/nlfh.cpp
===================================================================
--- store/trunk/cpp/lib/jrnl/nlfh.cpp	2007-12-19 14:32:22 UTC (rev 1522)
+++ store/trunk/cpp/lib/jrnl/nlfh.cpp	2007-12-19 20:36:48 UTC (rev 1523)
@@ -108,15 +108,19 @@
     if (::stat(_fname.c_str(), &s))
     {
 #endif
-        if (ro)
+        if (ro) // Recovery initialization: set counters only
         {
             if (!ro->_empty)
             {
+                // For first file only, set read counters to ahead of file header
                 if (ro->_ffid == _fid)
                 {
                     _rd_subm_cnt_dblks = JRNL_SBLK_SIZE;
                     _rd_cmpl_cnt_dblks = JRNL_SBLK_SIZE;
                 }
+                
+                // For last file only, set write counters to end of last record (the
+                // continuation point); for all others, set to eof.
                 if (ro->_lfid == _fid)
                 {
                     _wr_subm_cnt_dblks = ro->_eo/JRNL_DBLK_SIZE;
@@ -127,19 +131,20 @@
                     _wr_subm_cnt_dblks = _ffull_dblks;
                     _wr_cmpl_cnt_dblks = _ffull_dblks;
                 }
+                // Set the number of enqueued records for this file.
                 _rec_enqcnt = ro->_enq_cnt_list[_fid];
             }
         }
-        else
+        else // Normal initialization: create empty journal files
         {
             const size_t sblksize = JRNL_DBLK_SIZE * JRNL_SBLK_SIZE;
             const size_t writesize = sblksize * JRNL_WMGR_PAGE_SIZE;
 
             // NOTE: The journal file size is always one sblock bigger than the specified journal
-            // file size, which is the data content size. The extra
-            // block is for the journal file header which precedes all data on each file and is
-            // exactly one softblock in size.
+            // file size, which is the data content size. The extra block is for the journal file
+            // header which precedes all data on each file and is exactly one sblock in size.
             u_int32_t nsblks = jfsize_sblks + 1;
+
             void* nullbuf = NULL;
             if (::posix_memalign(&nullbuf, sblksize, writesize))
             {

Modified: store/trunk/cpp/lib/jrnl/rmgr.cpp
===================================================================
--- store/trunk/cpp/lib/jrnl/rmgr.cpp	2007-12-19 14:32:22 UTC (rev 1522)
+++ store/trunk/cpp/lib/jrnl/rmgr.cpp	2007-12-19 20:36:48 UTC (rev 1523)
@@ -66,7 +66,15 @@
     _cb = rd_cb;
     initialize();
     if (fro)
-        _pg_offset_dblks = (fro / JRNL_DBLK_SIZE) - JRNL_SBLK_SIZE;
+    {
+        u_int32_t fro_dblks = (fro / JRNL_DBLK_SIZE) - JRNL_SBLK_SIZE;
+        _pg_cntr = fro_dblks / (JRNL_RMGR_PAGE_SIZE * JRNL_SBLK_SIZE);
+        u_int32_t tot_pg_offs_dblks = _pg_cntr * JRNL_RMGR_PAGE_SIZE * JRNL_SBLK_SIZE;
+        _pg_index = _pg_cntr % JRNL_RMGR_PAGES;
+        _pg_offset_dblks = fro_dblks - tot_pg_offs_dblks;
+        _rrfc.add_subm_cnt_dblks(tot_pg_offs_dblks);
+        _rrfc.add_cmpl_cnt_dblks(tot_pg_offs_dblks);
+    }
 }
 
 const iores
@@ -444,9 +452,16 @@
 void
 rmgr::recover_complete(size_t fro)
 {
-    _pg_index = 0;
-    _pg_cntr = 0;
-    _pg_offset_dblks = (fro / JRNL_DBLK_SIZE) - JRNL_SBLK_SIZE;
+    if (fro)
+    {
+        u_int32_t fro_dblks = (fro / JRNL_DBLK_SIZE) - JRNL_SBLK_SIZE;
+        _pg_cntr = fro_dblks / (JRNL_RMGR_PAGE_SIZE * JRNL_SBLK_SIZE);
+        u_int32_t tot_pg_offs_dblks = _pg_cntr * JRNL_RMGR_PAGE_SIZE * JRNL_SBLK_SIZE;
+        _pg_index = _pg_cntr % JRNL_RMGR_PAGES;
+        _pg_offset_dblks = fro_dblks - tot_pg_offs_dblks;
+        _rrfc.add_subm_cnt_dblks(tot_pg_offs_dblks);
+        _rrfc.add_cmpl_cnt_dblks(tot_pg_offs_dblks);
+    }
 }
 
 void

Modified: store/trunk/cpp/lib/jrnl/wrfc.cpp
===================================================================
--- store/trunk/cpp/lib/jrnl/wrfc.cpp	2007-12-19 14:32:22 UTC (rev 1522)
+++ store/trunk/cpp/lib/jrnl/wrfc.cpp	2007-12-19 20:36:48 UTC (rev 1523)
@@ -74,7 +74,7 @@
 #ifdef DRHM_TESTVALS
         _rid = u_int64_t(0xffeeddcc) << 32;
 #else
-        _rid = 0;
+        _rid = 0ULL;
 #endif
         _reset_ok = false;
     }




More information about the rhmessaging-commits mailing list