[rhmessaging-commits] rhmessaging commits: r2341 - in store/trunk/cpp: lib/jrnl and 1 other directories.

rhmessaging-commits at lists.jboss.org rhmessaging-commits at lists.jboss.org
Thu Aug 21 16:24:28 EDT 2008


Author: kpvdr
Date: 2008-08-21 16:24:28 -0400 (Thu, 21 Aug 2008)
New Revision: 2341

Modified:
   store/trunk/cpp/lib/BdbMessageStore.cpp
   store/trunk/cpp/lib/jrnl/deq_rec.cpp
   store/trunk/cpp/lib/jrnl/enq_rec.cpp
   store/trunk/cpp/lib/jrnl/jcntl.cpp
   store/trunk/cpp/lib/jrnl/jcntl.hpp
   store/trunk/cpp/lib/jrnl/rmgr.cpp
   store/trunk/cpp/lib/jrnl/rrfc.cpp
   store/trunk/cpp/lib/jrnl/rrfc.hpp
   store/trunk/cpp/lib/jrnl/txn_rec.cpp
   store/trunk/cpp/tests/jrnl/jtt/jfile_chk.py
Log:
Fix for BZ459627 "Journal recover fails if last record of last file coincides with eof". Also removed the non-TPL transaction abort code, and corrected jcntl logic shortcomings for handling these without explicit aborts.

Modified: store/trunk/cpp/lib/BdbMessageStore.cpp
===================================================================
--- store/trunk/cpp/lib/BdbMessageStore.cpp	2008-08-21 19:15:52 UTC (rev 2340)
+++ store/trunk/cpp/lib/BdbMessageStore.cpp	2008-08-21 20:24:28 UTC (rev 2341)
@@ -746,7 +746,7 @@
     size_t preambleLength = sizeof(u_int32_t)/*header size*/;
 
     JournalImpl* jc = static_cast<JournalImpl*>(queue->getExternalQueueStore());
-    DataTokenImpl dtokp;
+    DataTokenImpl dtok;
     size_t readSize = 0;
     unsigned msg_count = 0;
 
@@ -761,14 +761,14 @@
     bool transientFlag = false;
     bool externalFlag = false;
 
-    dtokp.set_wstate(DataTokenImpl::ENQ);
+    dtok.set_wstate(DataTokenImpl::ENQ);
 
     // Read the message from the Journal.
     try {
         unsigned aio_sleep_cnt = 0;
         while (read) {
-            rhm::journal::iores res = jc->read_data_record(&dbuff, dbuffSize, &xidbuff, xidbuffSize, transientFlag, externalFlag, &dtokp);
-            readSize = dtokp.dsize();
+            rhm::journal::iores res = jc->read_data_record(&dbuff, dbuffSize, &xidbuff, xidbuffSize, transientFlag, externalFlag, &dtok);
+            readSize = dtok.dsize();
 
             switch (res)
             {
@@ -779,13 +779,13 @@
 
                 unsigned headerSize;
                 if (externalFlag) {
-                    msg = getExternMessage(recovery, dtokp.rid(), headerSize); // large message external to jrnl
+                    msg = getExternMessage(recovery, dtok.rid(), headerSize); // large message external to jrnl
                 } else {
                     headerSize = Buffer(data, preambleLength).getLong();
                     Buffer headerBuff(data+ preambleLength, headerSize); /// do we want read size or header size ????
                     msg = recovery.recoverMessage(headerBuff);
                 }
-                msg->setPersistenceId(dtokp.rid());
+                msg->setPersistenceId(dtok.rid());
 
                 u_int32_t contentOffset = headerSize + preambleLength;
                 u_int64_t contentSize = readSize - contentOffset;
@@ -795,11 +795,11 @@
                     msg->decodeContent(contentBuff);
                 }
 
-                PreparedTransaction::list::iterator i = PreparedTransaction::getLockedPreparedTransaction(prepared, queue->getPersistenceId(), dtokp.rid());
+                PreparedTransaction::list::iterator i = PreparedTransaction::getLockedPreparedTransaction(prepared, queue->getPersistenceId(), dtok.rid());
                 if (i == prepared.end()) { // not in prepared list
                     queue->recover(msg);
                 } else {
-                    u_int64_t rid = dtokp.rid();
+                    u_int64_t rid = dtok.rid();
                     std::string xid(i->xid);
                     TplRecoverMapCitr citr = tplRecoverMap.find(xid);
                     if (citr == tplRecoverMap.end()) THROW_STORE_EXCEPTION("XID not found in tplRecoverMap");
@@ -832,8 +832,8 @@
                     }
                 }
 
-                dtokp.reset();
-                dtokp.set_wstate(DataTokenImpl::ENQ);
+                dtok.reset();
+                dtok.set_wstate(DataTokenImpl::ENQ);
 
                 if (xidbuff)
                     ::free(xidbuff);
@@ -934,7 +934,7 @@
 {
     tplRecoverMap.clear();
     journal::txn_map& tmap = tplStorePtr->get_txn_map();
-    DataTokenImpl dtokp;
+    DataTokenImpl dtok;
     void* dbuff = NULL; size_t dbuffSize = 0;
     void* xidbuff = NULL; size_t xidbuffSize = 0;
     bool transientFlag = false;
@@ -943,9 +943,9 @@
     try {
         unsigned aio_sleep_cnt = 0;
         while (!done) {
-            dtokp.reset();
-            dtokp.set_wstate(DataTokenImpl::ENQ);
-            switch (tplStorePtr->read_data_record(&dbuff, dbuffSize, &xidbuff, xidbuffSize, transientFlag, externalFlag, &dtokp)) {
+            dtok.reset();
+            dtok.set_wstate(DataTokenImpl::ENQ);
+            switch (tplStorePtr->read_data_record(&dbuff, dbuffSize, &xidbuff, xidbuffSize, transientFlag, externalFlag, &dtok)) {
               case rhm::journal::RHM_IORES_SUCCESS: {
                 // Every TPL record contains both data and an XID
                 assert(dbuffSize>0);

Modified: store/trunk/cpp/lib/jrnl/deq_rec.cpp
===================================================================
--- store/trunk/cpp/lib/jrnl/deq_rec.cpp	2008-08-21 19:15:52 UTC (rev 2340)
+++ store/trunk/cpp/lib/jrnl/deq_rec.cpp	2008-08-21 20:24:28 UTC (rev 2341)
@@ -351,6 +351,8 @@
         if (size_read < _deq_hdr._xidsize - offs)
         {
             assert(ifsp->eof());
+            // As we may have read past eof, turn off fail bit
+            ifsp->clear(ifsp->rdstate()&(~std::ifstream::failbit));
             return false;
         }
     }
@@ -365,6 +367,8 @@
         if (size_read < sizeof(rec_tail) - offs)
         {
             assert(ifsp->eof());
+            // As we may have read past eof, turn off fail bit
+            ifsp->clear(ifsp->rdstate()&(~std::ifstream::failbit));
             return false;
         }
     }

Modified: store/trunk/cpp/lib/jrnl/enq_rec.cpp
===================================================================
--- store/trunk/cpp/lib/jrnl/enq_rec.cpp	2008-08-21 19:15:52 UTC (rev 2340)
+++ store/trunk/cpp/lib/jrnl/enq_rec.cpp	2008-08-21 20:24:28 UTC (rev 2341)
@@ -469,6 +469,8 @@
         if (size_read < _enq_hdr._xidsize - offs)
         {
             assert(ifsp->eof());
+            // As we may have read past eof, turn off fail bit
+            ifsp->clear(ifsp->rdstate()&(~std::ifstream::failbit));
             return false;
         }
     }
@@ -484,6 +486,8 @@
             if (size_read < _enq_hdr._dsize - offs)
             {
                 assert(ifsp->eof());
+                // As we may have read past eof, turn off fail bit
+                ifsp->clear(ifsp->rdstate()&(~std::ifstream::failbit));
                 return false;
             }
         }
@@ -501,6 +505,8 @@
         if (size_read < sizeof(rec_tail) - offs)
         {
             assert(ifsp->eof());
+            // As we may have read past eof, turn off fail bit
+            ifsp->clear(ifsp->rdstate()&(~std::ifstream::failbit));
             return false;
         }
     }

Modified: store/trunk/cpp/lib/jrnl/jcntl.cpp
===================================================================
--- store/trunk/cpp/lib/jrnl/jcntl.cpp	2008-08-21 19:15:52 UTC (rev 2340)
+++ store/trunk/cpp/lib/jrnl/jcntl.cpp	2008-08-21 20:24:28 UTC (rev 2341)
@@ -203,7 +203,7 @@
     }
 
     _wrfc.initialize(_num_jfiles, _jfsize_sblks, _fc_arr, &_rcvdat);
-    _rrfc.initialize(_num_jfiles, _fc_arr, _rcvdat._ffid);
+    _rrfc.initialize(_num_jfiles, _fc_arr, &_rcvdat);
     _rmgr.initialize(rd_cb);
     _wmgr.initialize(wr_cb, wcache_pgsize_sblks, wcache_num_pages, JRNL_WMGR_MAXDTOKPP,
             JRNL_WMGR_MAXWAITUS, _rcvdat._eo);
@@ -220,7 +220,7 @@
     for (u_int16_t i=0; i<_num_jfiles; i++)
         _fc_arr[i]->reset(&_rcvdat);
     _wrfc.initialize(_num_jfiles, _jfsize_sblks, _fc_arr, &_rcvdat);
-    _rrfc.initialize(_num_jfiles, _fc_arr, _rcvdat._ffid);
+    _rrfc.initialize(_num_jfiles, _fc_arr, &_rcvdat);
     _rmgr.recover_complete();
     _readonly_flag = false;
 }
@@ -622,11 +622,7 @@
         bool lowi = rd._owi; // local copy of owi to be used during analysis
         while (rcvr_get_next_record(fid, &ifs, lowi, rd)) ;
         
-        // Check for journal full condition
-        u_int16_t next_wr_fid = (rd._lfid + 1) % _num_jfiles;
-        if (rd._ffid == next_wr_fid && rd._enq_cnt_list[next_wr_fid])
-            rd._full = true;
-        
+        // Remove all txns from tmap that are not in the prepared list
         if (!rd._empty && prep_txn_list_ptr)
         {
             std::vector<std::string> xid_list;
@@ -645,11 +641,14 @@
                         else if (_emap.is_enqueued(i->_drid, true)) // deq op - unlock enq record
                             _emap.unlock(i->_drid);
                     }
-                    // Write abort record to disk
-                    rcvr_write_abort(rd, *itr);
                 }
             }
         }
+
+        // Check for journal full condition
+        u_int16_t next_wr_fid = (rd._lfid + 1) % _num_jfiles;
+        if (rd._ffid == next_wr_fid && rd._enq_cnt_list[next_wr_fid])
+            rd._full = true;
     }
 }
 
@@ -726,7 +725,7 @@
                 {
                     try
                     {
-                        u_int16_t enq_fid = _emap.get_remove_fid(dr.deq_rid());
+                        u_int16_t enq_fid = _emap.get_remove_fid(dr.deq_rid(), true);
                         rd._enq_cnt_list[enq_fid]--;
                     }
                     catch(const jexception& e)
@@ -810,6 +809,8 @@
             {
                 u_int32_t rec_dblks = jrec::size_dblks(sizeof(rec_hdr));
                 ifsp->ignore(rec_dblks * JRNL_DBLK_SIZE - sizeof(rec_hdr));
+                if (!jfile_cycle(fid, ifsp, lowi, rd, false))
+                    return false;
             }
             break;
         case 0:
@@ -862,6 +863,7 @@
         if (ifsp->eof() || !ifsp->good())
         {
             rd._eo = ifsp->tellg(); // remember file offset before closing
+            assert(rd._eo != 0xffffffffffffffff); // Check for error code -1
             ifsp->close();
             if (++fid >= _num_jfiles)
             {
@@ -973,107 +975,5 @@
     }
 }
 
-// TODO - FIXME - Unify the recover and normal aio write methods.
-// Normally, writes are not performed during recover mode (journal is in read-only
-// mode) so initialization of the aio write controllers is deferred until recover
-// is complete. Currenlty because journal is still in recover mode when
-// rcvr_write_abort() is called, normal writes are not possible, so std::ofstream
-// writes are used instead. Lots of logic duplication!
-void
-jcntl::rcvr_write_abort(rcvdat& rd, std::string xid)
-{
-    const u_int32_t sblk_size = JRNL_DBLK_SIZE * JRNL_SBLK_SIZE;
-    const u_int32_t amagic = RHM_JDAT_TXA_MAGIC;
-    const u_int32_t xmagic = RHM_JDAT_EMPTY_MAGIC;
-
-    // Check last record ends on sblk boundary
-    assert(rd._eo % sblk_size == 0);
-    
-    // Find fid and posn to write
-    u_int16_t fid = rd._lfid;
-    std::streampos file_pos = rd._eo;
-    if (rd._eo/sblk_size >= _jfsize_sblks) // file full, use next file
-    {
-        if (++fid >= _num_jfiles)
-        {
-            fid = 0;
-            rd._owi = !rd._owi;
-        }
-        file_pos = 0;
-    }
-
-    // Prepare a buffer of at least 1 sblock
-    u_int32_t abort_dblks = txn_rec::size_dblks(sizeof(txn_hdr) + xid.size());
-    std::size_t buffsize = abort_dblks < JRNL_SBLK_SIZE ? JRNL_SBLK_SIZE : abort_dblks;
-    void* buff = std::malloc(buffsize * JRNL_DBLK_SIZE);
-    assert(buff != 0);
-
-    // Initialize file stream
-    std::ostringstream fn;
-    fn << _jdir.dirname() << "/" << _base_filename << ".";
-    fn << std::hex << std::setfill('0') << std::setw(4) << fid << "." << JRNL_DATA_EXTENSION;
-    std::ofstream ofs(fn.str().c_str(),
-            std::ios_base::in | std::ios_base::out | std::ios_base::binary);
-    if (!ofs.good())
-        throw jexception(jerrno::JERR__FILEIO, fn.str(), "jcntl", "rcvr_write_abort");
-    if (file_pos)
-        ofs.seekp(file_pos);
-    else
-    {
-        // New file, write new file header
-        file_hdr fh(RHM_JDAT_FILE_MAGIC, RHM_JDAT_VERSION, ++rd._h_rid, fid, sblk_size, rd._owi, true);
-        ofs.write((const char*)&fh, sizeof(file_hdr));
-        assert(!ofs.fail());
-        // fill remainder of sblk with fill char
-        std::memset((char*)buff, RHM_CLEAN_CHAR, sblk_size - sizeof(file_hdr));
-        ofs.write((const char*)buff, sblk_size - sizeof(file_hdr));
-        assert(!ofs.fail());
-        // log write action
-        std::ostringstream oss;
-        oss << "Recover phase write: File header in fid=" << fid << " at offs=0x0 for txn abort record";
-        this->log(LOG_NOTICE, oss.str());
-        file_pos = ofs.tellp();
-    }
-    
-    // Write abort record
-    txn_rec ar(amagic, ++rd._h_rid, xid.data(), xid.size(), rd._owi);
-    ar.encode(buff, 0, abort_dblks);
-    ofs.write((const char*)buff, JRNL_DBLK_SIZE);
-    assert(!ofs.fail());
-    // log write action
-    std::ostringstream oss;
-    oss << std::hex << std::setfill('0') << "Recover phase write: Aborted unprepared transaction xid=\"";
-    for (std::size_t i=0; i<xid.size(); i++)
-    {
-        int c = xid.at(i);
-        if (std::isprint(c))
-            oss << (char)c;
-        else 
-            oss << "\\x" << std::setw(2) << c;
-    }
-    oss << "\" at offs=0x" << file_pos;
-    this->log(LOG_NOTICE, oss.str());
-    file_pos = ofs.tellp();
-
-    // Prepare filler record
-    std::memcpy(buff, (void*)&xmagic, sizeof(xmagic));
-    std::memset((char*)buff + sizeof(xmagic), RHM_CLEAN_CHAR, JRNL_DBLK_SIZE - sizeof(xmagic));
-    // Write filler as many times as needed to get to next sblk boundary
-    while (file_pos % sblk_size)
-    {
-        ofs.write((const char*)buff, JRNL_DBLK_SIZE);
-        assert(!ofs.fail());
-        std::ostringstream oss;
-        oss << "Recover phase write: Wrote filler record at offs=0x" << std::hex << file_pos << std::dec;
-        this->log(LOG_NOTICE, oss.str());
-        file_pos = ofs.tellp();
-    }
-    rd._eo = file_pos;
-
-    // Clean up
-    ofs.close();
-    std::free(buff);
-}
-
 } // namespace journal
 } // namespace rhm

Modified: store/trunk/cpp/lib/jrnl/jcntl.hpp
===================================================================
--- store/trunk/cpp/lib/jrnl/jcntl.hpp	2008-08-21 19:15:52 UTC (rev 2340)
+++ store/trunk/cpp/lib/jrnl/jcntl.hpp	2008-08-21 20:24:28 UTC (rev 2341)
@@ -624,7 +624,7 @@
         virtual void log(log_level level, const std::string& log_stmt) const;
         virtual void log(log_level level, const char* const log_stmt) const;
 
-        // these are _rmgr to _wmgr interactions, remove when _rmgr contains ref to _wmgr:        
+        // FIXME these are _rmgr to _wmgr interactions, remove when _rmgr contains ref to _wmgr:        
         void chk_wr_frot();
         inline u_int32_t unflushed_dblks() { return _wmgr.unflushed_dblks(); }
 
@@ -678,8 +678,6 @@
                 std::streampos& read_pos);
         
         void check_journal_alignment(const u_int16_t fid, std::streampos& rec_offset);
-
-        void rcvr_write_abort(rcvdat& rd, std::string xid);
     };
 
 } // namespace journal

Modified: store/trunk/cpp/lib/jrnl/rmgr.cpp
===================================================================
--- store/trunk/cpp/lib/jrnl/rmgr.cpp	2008-08-21 19:15:52 UTC (rev 2340)
+++ store/trunk/cpp/lib/jrnl/rmgr.cpp	2008-08-21 20:24:28 UTC (rev 2341)
@@ -371,7 +371,7 @@
         if (_jc->unflushed_dblks() > 0)
             _jc->flush();
         else
-        return RHM_IORES_EMPTY;
+            return RHM_IORES_EMPTY;
     }
 
     // Check write state of this token is ENQ - required for read

Modified: store/trunk/cpp/lib/jrnl/rrfc.cpp
===================================================================
--- store/trunk/cpp/lib/jrnl/rrfc.cpp	2008-08-21 19:15:52 UTC (rev 2340)
+++ store/trunk/cpp/lib/jrnl/rrfc.cpp	2008-08-21 20:24:28 UTC (rev 2341)
@@ -50,15 +50,27 @@
 rrfc::~rrfc() {}
 
 void
-rrfc::initialize(const u_int16_t nfiles, fcntl** fc_arr, u_int32_t fc_index)
+rrfc::initialize(const u_int16_t nfiles, fcntl** fc_arr, rcvdat* rdp)
 {
     _nfiles = nfiles;
     _fc_arr = fc_arr;
-    reset(fc_index);
+    if (rdp)
+    {
+        // Find first file with enqueued records
+        u_int16_t index = rdp->_ffid;
+        while (index != rdp->_lfid && rdp->_enq_cnt_list[index] == 0)
+        {
+            if (++index >= _nfiles)
+                index = 0;
+        }
+        reset(index);
+    }
+    else
+        reset(0);
 }
 
 void
-rrfc::reset(u_int32_t fc_index)
+rrfc::reset(u_int16_t fc_index)
 {
     _fc_index = fc_index;
     _curr_fc = _fc_arr[_fc_index];

Modified: store/trunk/cpp/lib/jrnl/rrfc.hpp
===================================================================
--- store/trunk/cpp/lib/jrnl/rrfc.hpp	2008-08-21 19:15:52 UTC (rev 2340)
+++ store/trunk/cpp/lib/jrnl/rrfc.hpp	2008-08-21 20:24:28 UTC (rev 2341)
@@ -70,11 +70,12 @@
         * \param nfiles Number of files in the rotating file group.
         * \param fc_arr Pointer to an array of file controller, each of which correspond to one of
         *     the physical journal files.
-        * \param fc_index Initial index of journal file. Default = 0.
+        * \param rdp Struct carrying restore information. Optional for non-restore use, defaults to
+        *     0 (NULL).
         */
-        virtual void initialize(const u_int16_t nfiles, fcntl** fc_arr, u_int32_t fc_index = 0);
+        virtual void initialize(const u_int16_t nfiles, fcntl** fc_arr, rcvdat* rdp = 0);
 
-        void reset(u_int32_t fc_index = 0);
+        void reset(u_int16_t fc_index = 0);
 
         /**
         * \brief Rotate active file controller to next file in rotating file group.

Modified: store/trunk/cpp/lib/jrnl/txn_rec.cpp
===================================================================
--- store/trunk/cpp/lib/jrnl/txn_rec.cpp	2008-08-21 19:15:52 UTC (rev 2340)
+++ store/trunk/cpp/lib/jrnl/txn_rec.cpp	2008-08-21 20:24:28 UTC (rev 2341)
@@ -339,6 +339,8 @@
         if (size_read < _txn_hdr._xidsize - offs)
         {
             assert(ifsp->eof());
+            // As we may have read past eof, turn off fail bit
+            ifsp->clear(ifsp->rdstate()&(~std::ifstream::failbit));
             return false;
         }
     }
@@ -352,6 +354,8 @@
         if (size_read < sizeof(rec_tail) - offs)
         {
             assert(ifsp->eof());
+            // As we may have read past eof, turn off fail bit
+            ifsp->clear(ifsp->rdstate()&(~std::ifstream::failbit));
             return false;
         }
     }

Modified: store/trunk/cpp/tests/jrnl/jtt/jfile_chk.py
===================================================================
--- store/trunk/cpp/tests/jrnl/jtt/jfile_chk.py	2008-08-21 19:15:52 UTC (rev 2340)
+++ store/trunk/cpp/tests/jrnl/jtt/jfile_chk.py	2008-08-21 20:24:28 UTC (rev 2341)
@@ -104,19 +104,20 @@
         raise Exception('Inconsistent data size: dsize=%d, data(%d)=\"%s\"' % (dsize, len(data), datastr))
     return 'data(%d)=\"%s\" ' % (dsize, datastr)
 
-def hex_split_str(s):
-    hex_str = ''
-    if len(s) > 10:
-        for i in range(0, 5):
-            hex_str += '%02x ' % ord(s[i])
-        hex_str += '... '
-        for i in range(len(s)-5, len(s)):
-            hex_str += '%02x ' % ord(s[i])
-    else:
-        for i in range(0, len(s)):
-            hex_str += '%02x ' % ord(s[i])
-    return hex_str
+def hex_split_str(s, split_size = 50):
+    if len(s) <= split_size:
+        return hex_str(s, 0, len(s))
+    return hex_str(s, 0, 10) + ' ... ' + hex_str(s, len(s)-10, len(s))
 
+def hex_str(s, b, e):
+    o = ''
+    for i in range(b, e):
+        if isprintable(s[i]):
+            o += s[i]
+        else:
+            o += '\\%02x' % ord(s[i])
+    return o
+
 def split_str(s):
     if len(s) > 25:
         return s[:12] + ' ... ' + s[-10:]
@@ -540,7 +541,7 @@
                             if hdr.xid == None:
                                 if hdr.deq_rid in self.emap:
                                     if self.emap[hdr.deq_rid][2]:
-                                        warn = ' (WARNING: dequeue rid %d dequeues locked enqueue records %d)' % (hdr.rid, hdr.deq_rid)
+                                        warn = ' (WARNING: dequeue rid 0x%x dequeues locked enqueue record 0x%x)' % (hdr.rid, hdr.deq_rid)
                                     del self.emap[hdr.deq_rid]
                                 else:
                                     warn = ' (WARNING: rid being dequeued %d not found in enqueued records)' % hdr.deq_rid




More information about the rhmessaging-commits mailing list