[rhmessaging-commits] rhmessaging commits: r2342 - in store/branches/mrg-1.0/cpp: tests/jrnl/jtt and 1 other directory.

rhmessaging-commits at lists.jboss.org rhmessaging-commits at lists.jboss.org
Thu Aug 21 16:24:57 EDT 2008


Author: kpvdr
Date: 2008-08-21 16:24:57 -0400 (Thu, 21 Aug 2008)
New Revision: 2342

Modified:
   store/branches/mrg-1.0/cpp/lib/jrnl/deq_rec.cpp
   store/branches/mrg-1.0/cpp/lib/jrnl/enq_rec.cpp
   store/branches/mrg-1.0/cpp/lib/jrnl/jcntl.cpp
   store/branches/mrg-1.0/cpp/lib/jrnl/jcntl.hpp
   store/branches/mrg-1.0/cpp/lib/jrnl/rmgr.cpp
   store/branches/mrg-1.0/cpp/lib/jrnl/rrfc.cpp
   store/branches/mrg-1.0/cpp/lib/jrnl/rrfc.hpp
   store/branches/mrg-1.0/cpp/lib/jrnl/txn_rec.cpp
   store/branches/mrg-1.0/cpp/tests/jrnl/jtt/jfile_chk.py
Log:
Fix for BZ459627 "Journal recover fails if last record of last file coincides with eof". Also removed the non-TPL transaction abort code, and corrected jcntl logic shortcomings for handling these without explicit aborts.

Modified: store/branches/mrg-1.0/cpp/lib/jrnl/deq_rec.cpp
===================================================================
--- store/branches/mrg-1.0/cpp/lib/jrnl/deq_rec.cpp	2008-08-21 20:24:28 UTC (rev 2341)
+++ store/branches/mrg-1.0/cpp/lib/jrnl/deq_rec.cpp	2008-08-21 20:24:57 UTC (rev 2342)
@@ -351,6 +351,8 @@
         if (size_read < _deq_hdr._xidsize - offs)
         {
             assert(ifsp->eof());
+            // As we may have read past eof, turn off fail bit
+            ifsp->clear(ifsp->rdstate()&(~std::ifstream::failbit));
             return false;
         }
     }
@@ -365,6 +367,8 @@
         if (size_read < sizeof(rec_tail) - offs)
         {
             assert(ifsp->eof());
+            // As we may have read past eof, turn off fail bit
+            ifsp->clear(ifsp->rdstate()&(~std::ifstream::failbit));
             return false;
         }
     }

Modified: store/branches/mrg-1.0/cpp/lib/jrnl/enq_rec.cpp
===================================================================
--- store/branches/mrg-1.0/cpp/lib/jrnl/enq_rec.cpp	2008-08-21 20:24:28 UTC (rev 2341)
+++ store/branches/mrg-1.0/cpp/lib/jrnl/enq_rec.cpp	2008-08-21 20:24:57 UTC (rev 2342)
@@ -469,6 +469,8 @@
         if (size_read < _enq_hdr._xidsize - offs)
         {
             assert(ifsp->eof());
+            // As we may have read past eof, turn off fail bit
+            ifsp->clear(ifsp->rdstate()&(~std::ifstream::failbit));
             return false;
         }
     }
@@ -484,6 +486,8 @@
             if (size_read < _enq_hdr._dsize - offs)
             {
                 assert(ifsp->eof());
+                // As we may have read past eof, turn off fail bit
+                ifsp->clear(ifsp->rdstate()&(~std::ifstream::failbit));
                 return false;
             }
         }
@@ -501,6 +505,8 @@
         if (size_read < sizeof(rec_tail) - offs)
         {
             assert(ifsp->eof());
+            // As we may have read past eof, turn off fail bit
+            ifsp->clear(ifsp->rdstate()&(~std::ifstream::failbit));
             return false;
         }
     }

Modified: store/branches/mrg-1.0/cpp/lib/jrnl/jcntl.cpp
===================================================================
--- store/branches/mrg-1.0/cpp/lib/jrnl/jcntl.cpp	2008-08-21 20:24:28 UTC (rev 2341)
+++ store/branches/mrg-1.0/cpp/lib/jrnl/jcntl.cpp	2008-08-21 20:24:57 UTC (rev 2342)
@@ -209,7 +209,7 @@
     // (lfh) counters and pointers for both read and write, since write activity
     // constrains read activity (i.e. one can't read what has not yet been written).
     _wrfc.initialize(_num_jfiles, _jfsize_sblks, (nlfh**)_datafh, &_rcvdat);
-    _rrfc.initialize(_num_jfiles, (nlfh**)_datafh, _rcvdat._ffid);
+    _rrfc.initialize(_num_jfiles, (nlfh**)_datafh, &_rcvdat);
     _rmgr.initialize(rd_cb);
     _wmgr.initialize(wr_cb, wcache_pgsize_sblks, wcache_num_pages, JRNL_WMGR_MAXDTOKPP,
             JRNL_WMGR_MAXWAITUS, _rcvdat._eo);
@@ -226,7 +226,7 @@
     for (u_int16_t i=0; i<_num_jfiles; i++)
         _datafh[i]->reset(&_rcvdat);
     _wrfc.initialize(_num_jfiles, _jfsize_sblks, (nlfh**)_datafh, &_rcvdat);
-    _rrfc.initialize(_num_jfiles, (nlfh**)_datafh, _rcvdat._ffid);
+    _rrfc.initialize(_num_jfiles, (nlfh**)_datafh, &_rcvdat);
     _rmgr.recover_complete();
     _readonly_flag = false;
 }
@@ -627,12 +627,7 @@
         std::ifstream ifs;
         bool lowi = rd._owi; // local copy of owi to be used during analysis
         while (rcvr_get_next_record(fid, &ifs, lowi, rd)) ;
-        
-        // Check for journal full condition
-        u_int16_t next_wr_fid = (rd._lfid + 1) % _num_jfiles;
-        if (rd._ffid == next_wr_fid && rd._enq_cnt_list[next_wr_fid])
-            rd._full = true;
-        
+                
         if (!rd._empty && prep_txn_list_ptr)
         {
             std::vector<std::string> xid_list;
@@ -651,11 +646,14 @@
                         else if (_emap.is_enqueued(i->_drid, true)) // deq op - unlock enq record
                             _emap.unlock(i->_drid);
                     }
-                    // Write abort record to disk
-                    rcvr_write_abort(rd, *itr);
                 }
             }
         }
+
+        // Check for journal full condition
+        u_int16_t next_wr_fid = (rd._lfid + 1) % _num_jfiles;
+        if (rd._ffid == next_wr_fid && rd._enq_cnt_list[next_wr_fid])
+            rd._full = true;
     }
 }
 
@@ -732,7 +730,7 @@
                 {
                     try
                     {
-                        u_int16_t enq_fid = _emap.get_remove_fid(dr.deq_rid());
+                        u_int16_t enq_fid = _emap.get_remove_fid(dr.deq_rid(), true);
                         rd._enq_cnt_list[enq_fid]--;
                     }
                     catch(const jexception& e)
@@ -816,6 +814,8 @@
             {
                 u_int32_t rec_dblks = jrec::size_dblks(sizeof(rec_hdr));
                 ifsp->ignore(rec_dblks * JRNL_DBLK_SIZE - sizeof(rec_hdr));
+                if (!jfile_cycle(fid, ifsp, lowi, rd, false))
+                    return false;
             }
             break;
         case 0:
@@ -868,6 +868,7 @@
         if (ifsp->eof() || !ifsp->good())
         {
             rd._eo = ifsp->tellg(); // remember file offset before closing
+            assert(rd._eo != 0xffffffffffffffff); // Check for error code -1
             ifsp->close();
             if (++fid >= _num_jfiles)
             {
@@ -979,107 +980,5 @@
     }
 }
 
-// TODO - FIXME - Unify the recover and normal aio write methods.
-// Normally, writes are not performed during recover mode (journal is in read-only
-// mode) so initialization of the aio write controllers is deferred until recover
-// is complete. Currenlty because journal is still in recover mode when
-// rcvr_write_abort() is called, normal writes are not possible, so std::ofstream
-// writes are used instead. Lots of logic duplication!
-void
-jcntl::rcvr_write_abort(rcvdat& rd, std::string xid)
-{
-    const u_int32_t sblk_size = JRNL_DBLK_SIZE * JRNL_SBLK_SIZE;
-    const u_int32_t amagic = RHM_JDAT_TXA_MAGIC;
-    const u_int32_t xmagic = RHM_JDAT_EMPTY_MAGIC;
-
-    // Check last record ends on sblk boundary
-    assert(rd._eo % sblk_size == 0);
-    
-    // Find fid and posn to write
-    u_int16_t fid = rd._lfid;
-    std::streampos file_pos = rd._eo;
-    if (rd._eo/sblk_size >= _jfsize_sblks) // file full, use next file
-    {
-        if (++fid >= _num_jfiles)
-        {
-            fid = 0;
-            rd._owi = !rd._owi;
-        }
-        file_pos = 0;
-    }
-
-    // Prepare a buffer of at least 1 sblock
-    u_int32_t abort_dblks = txn_rec::size_dblks(sizeof(txn_hdr) + xid.size());
-    std::size_t buffsize = abort_dblks < JRNL_SBLK_SIZE ? JRNL_SBLK_SIZE : abort_dblks;
-    void* buff = std::malloc(buffsize * JRNL_DBLK_SIZE);
-    assert(buff != 0);
-
-    // Initialize file stream
-    std::ostringstream fn;
-    fn << _jdir.dirname() << "/" << _base_filename << ".";
-    fn << std::hex << std::setfill('0') << std::setw(4) << fid << "." << JRNL_DATA_EXTENSION;
-    std::ofstream ofs(fn.str().c_str(),
-            std::ios_base::in | std::ios_base::out | std::ios_base::binary);
-    if (!ofs.good())
-        throw jexception(jerrno::JERR__FILEIO, fn.str(), "jcntl", "rcvr_write_abort");
-    if (file_pos)
-        ofs.seekp(file_pos);
-    else
-    {
-        // New file, write new file header
-        file_hdr fh(RHM_JDAT_FILE_MAGIC, RHM_JDAT_VERSION, ++rd._h_rid, fid, sblk_size, rd._owi, true);
-        ofs.write((const char*)&fh, sizeof(file_hdr));
-        assert(!ofs.fail());
-        // fill remainder of sblk with fill char
-        std::memset((char*)buff, RHM_CLEAN_CHAR, sblk_size - sizeof(file_hdr));
-        ofs.write((const char*)buff, sblk_size - sizeof(file_hdr));
-        assert(!ofs.fail());
-        // log write action
-        std::ostringstream oss;
-        oss << "Recover phase write: File header in fid=" << fid << " at offs=0x0 for txn abort record";
-        this->log(LOG_NOTICE, oss.str());
-        file_pos = ofs.tellp();
-    }
-    
-    // Write abort record
-    txn_rec ar(amagic, ++rd._h_rid, xid.data(), xid.size(), rd._owi);
-    ar.encode(buff, 0, abort_dblks);
-    ofs.write((const char*)buff, JRNL_DBLK_SIZE);
-    assert(!ofs.fail());
-    // log write action
-    std::ostringstream oss;
-    oss << std::hex << std::setfill('0') << "Recover phase write: Aborted unprepared transaction xid=\"";
-    for (std::size_t i=0; i<xid.size(); i++)
-    {
-        int c = xid.at(i);
-        if (std::isprint(c))
-            oss << (char)c;
-        else 
-            oss << "\\x" << std::setw(2) << c;
-    }
-    oss << "\" at offs=0x" << file_pos;
-    this->log(LOG_NOTICE, oss.str());
-    file_pos = ofs.tellp();
-
-    // Prepare filler record
-    std::memcpy(buff, (void*)&xmagic, sizeof(xmagic));
-    std::memset((char*)buff + sizeof(xmagic), RHM_CLEAN_CHAR, JRNL_DBLK_SIZE - sizeof(xmagic));
-    // Write filler as many times as needed to get to next sblk boundary
-    while (file_pos % sblk_size)
-    {
-        ofs.write((const char*)buff, JRNL_DBLK_SIZE);
-        assert(!ofs.fail());
-        std::ostringstream oss;
-        oss << "Recover phase write: Wrote filler record at offs=0x" << std::hex << file_pos << std::dec;
-        this->log(LOG_NOTICE, oss.str());
-        file_pos = ofs.tellp();
-    }
-    rd._eo = file_pos;
-
-    // Clean up
-    ofs.close();
-    std::free(buff);
-}
-
 } // namespace journal
 } // namespace rhm

Modified: store/branches/mrg-1.0/cpp/lib/jrnl/jcntl.hpp
===================================================================
--- store/branches/mrg-1.0/cpp/lib/jrnl/jcntl.hpp	2008-08-21 20:24:28 UTC (rev 2341)
+++ store/branches/mrg-1.0/cpp/lib/jrnl/jcntl.hpp	2008-08-21 20:24:57 UTC (rev 2342)
@@ -679,8 +679,6 @@
                 std::streampos& read_pos);
         
         void check_journal_alignment(const u_int16_t fid, std::streampos& rec_offset);
-
-        void rcvr_write_abort(rcvdat& rd, std::string xid);
     };
 
 } // namespace journal

Modified: store/branches/mrg-1.0/cpp/lib/jrnl/rmgr.cpp
===================================================================
--- store/branches/mrg-1.0/cpp/lib/jrnl/rmgr.cpp	2008-08-21 20:24:28 UTC (rev 2341)
+++ store/branches/mrg-1.0/cpp/lib/jrnl/rmgr.cpp	2008-08-21 20:24:57 UTC (rev 2342)
@@ -374,7 +374,7 @@
         if (_jc->unflushed_dblks() > 0)
             _jc->flush();
         else
-        return RHM_IORES_EMPTY;
+            return RHM_IORES_EMPTY;
     }
 
     // Check write state of this token is ENQ - required for read

Modified: store/branches/mrg-1.0/cpp/lib/jrnl/rrfc.cpp
===================================================================
--- store/branches/mrg-1.0/cpp/lib/jrnl/rrfc.cpp	2008-08-21 20:24:28 UTC (rev 2341)
+++ store/branches/mrg-1.0/cpp/lib/jrnl/rrfc.cpp	2008-08-21 20:24:57 UTC (rev 2342)
@@ -50,15 +50,27 @@
 rrfc::~rrfc() {}
 
 void
-rrfc::initialize(const u_int16_t nfiles, nlfh** fh_arr, u_int32_t fh_index)
+rrfc::initialize(const u_int16_t nfiles, nlfh** fh_arr, rcvdat* rdp)
 {
     _nfiles = nfiles;
     _fh_arr = fh_arr;
-    reset(fh_index);
+    if (rdp)
+    {
+        // Find first file with enqueued records
+        u_int16_t index = rdp->_ffid;
+        while (index != rdp->_lfid && rdp->_enq_cnt_list[index] == 0)
+        {
+            if (++index >= _nfiles)
+                index = 0;
+        }
+        reset(index);
+    }
+    else
+        reset(0);
 }
 
 void
-rrfc::reset(u_int32_t fh_index)
+rrfc::reset(u_int16_t fh_index)
 {
     _fh_index = fh_index;
     _curr_fh = _fh_arr[_fh_index];

Modified: store/branches/mrg-1.0/cpp/lib/jrnl/rrfc.hpp
===================================================================
--- store/branches/mrg-1.0/cpp/lib/jrnl/rrfc.hpp	2008-08-21 20:24:28 UTC (rev 2341)
+++ store/branches/mrg-1.0/cpp/lib/jrnl/rrfc.hpp	2008-08-21 20:24:57 UTC (rev 2342)
@@ -70,11 +70,12 @@
         * \param nfiles Number of files in the rotating file group.
         * \param fh_arr Pointer to an array of file handles (nlogging_fh or subclasses),
         *     each of which correspond to one of the physical files.
-        * \param fh_index Initial index of journal file. Default = 0.
+        * \param rdp Struct carrying restore information. Optional for non-restore use, defaults to
+        *     0 (NULL).
         */
-        virtual void initialize(const u_int16_t nfiles, nlfh** fh_arr, u_int32_t fh_index = 0);
+        virtual void initialize(const u_int16_t nfiles, nlfh** fh_arr, rcvdat* rdp = 0);
 
-        void reset(u_int32_t fh_index = 0);
+        void reset(u_int16_t fh_index = 0);
 
         /**
         * \brief Rotate active file handle to next file in rotating file group.

Modified: store/branches/mrg-1.0/cpp/lib/jrnl/txn_rec.cpp
===================================================================
--- store/branches/mrg-1.0/cpp/lib/jrnl/txn_rec.cpp	2008-08-21 20:24:28 UTC (rev 2341)
+++ store/branches/mrg-1.0/cpp/lib/jrnl/txn_rec.cpp	2008-08-21 20:24:57 UTC (rev 2342)
@@ -339,6 +339,8 @@
         if (size_read < _txn_hdr._xidsize - offs)
         {
             assert(ifsp->eof());
+            // As we may have read past eof, turn off fail bit
+            ifsp->clear(ifsp->rdstate()&(~std::ifstream::failbit));
             return false;
         }
     }
@@ -352,6 +354,8 @@
         if (size_read < sizeof(rec_tail) - offs)
         {
             assert(ifsp->eof());
+            // As we may have read past eof, turn off fail bit
+            ifsp->clear(ifsp->rdstate()&(~std::ifstream::failbit));
             return false;
         }
     }

Modified: store/branches/mrg-1.0/cpp/tests/jrnl/jtt/jfile_chk.py
===================================================================
--- store/branches/mrg-1.0/cpp/tests/jrnl/jtt/jfile_chk.py	2008-08-21 20:24:28 UTC (rev 2341)
+++ store/branches/mrg-1.0/cpp/tests/jrnl/jtt/jfile_chk.py	2008-08-21 20:24:57 UTC (rev 2342)
@@ -104,19 +104,20 @@
         raise Exception('Inconsistent data size: dsize=%d, data(%d)=\"%s\"' % (dsize, len(data), datastr))
     return 'data(%d)=\"%s\" ' % (dsize, datastr)
 
-def hex_split_str(s):
-    hex_str = ''
-    if len(s) > 10:
-        for i in range(0, 5):
-            hex_str += '%02x ' % ord(s[i])
-        hex_str += '... '
-        for i in range(len(s)-5, len(s)):
-            hex_str += '%02x ' % ord(s[i])
-    else:
-        for i in range(0, len(s)):
-            hex_str += '%02x ' % ord(s[i])
-    return hex_str
+def hex_split_str(s, split_size = 50):
+    if len(s) <= split_size:
+        return hex_str(s, 0, len(s))
+    return hex_str(s, 0, 10) + ' ... ' + hex_str(s, len(s)-10, len(s))
 
+def hex_str(s, b, e):
+    o = ''
+    for i in range(b, e):
+        if isprintable(s[i]):
+            o += s[i]
+        else:
+            o += '\\%02x' % ord(s[i])
+    return o
+
 def split_str(s):
     if len(s) > 25:
         return s[:12] + ' ... ' + s[-10:]
@@ -540,7 +541,7 @@
                             if hdr.xid == None:
                                 if hdr.deq_rid in self.emap:
                                     if self.emap[hdr.deq_rid][2]:
-                                        warn = ' (WARNING: dequeue rid %d dequeues locked enqueue records %d)' % (hdr.rid, hdr.deq_rid)
+                                        warn = ' (WARNING: dequeue rid 0x%x dequeues locked enqueue record 0x%x)' % (hdr.rid, hdr.deq_rid)
                                     del self.emap[hdr.deq_rid]
                                 else:
                                     warn = ' (WARNING: rid being dequeued %d not found in enqueued records)' % hdr.deq_rid




More information about the rhmessaging-commits mailing list