[rhmessaging-commits] rhmessaging commits: r1442 - store/trunk/cpp/lib/jrnl.

rhmessaging-commits at lists.jboss.org rhmessaging-commits at lists.jboss.org
Fri Dec 7 12:09:35 EST 2007


Author: kpvdr
Date: 2007-12-07 12:09:35 -0500 (Fri, 07 Dec 2007)
New Revision: 1442

Modified:
   store/trunk/cpp/lib/jrnl/data_tok.cpp
   store/trunk/cpp/lib/jrnl/data_tok.hpp
   store/trunk/cpp/lib/jrnl/deq_rec.cpp
   store/trunk/cpp/lib/jrnl/enq_rec.cpp
   store/trunk/cpp/lib/jrnl/jcntl.cpp
   store/trunk/cpp/lib/jrnl/jcntl.hpp
   store/trunk/cpp/lib/jrnl/jrec.cpp
   store/trunk/cpp/lib/jrnl/txn_rec.cpp
   store/trunk/cpp/lib/jrnl/wmgr.cpp
Log:
Fix for BZ413021 - Journal recovery fails...  This includes new decode functions that examine the record tails for incomplete and corrupt records, and a new check for sblk alignment of the last record. The recover process now writes filler records to the journal to get it back to a recoverable and usable state.

Modified: store/trunk/cpp/lib/jrnl/data_tok.cpp
===================================================================
--- store/trunk/cpp/lib/jrnl/data_tok.cpp	2007-12-07 16:34:34 UTC (rev 1441)
+++ store/trunk/cpp/lib/jrnl/data_tok.cpp	2007-12-07 17:09:35 UTC (rev 1442)
@@ -135,8 +135,8 @@
             return "SKIP_PART";
         case READ:
             return "READ";
+    // Not using default: forces compiler to ensure all cases are covered.
     }
-    // Not using default: forces compiler to ensure all cases are covered.
     return "<rstate unknown>";
 }
 

Modified: store/trunk/cpp/lib/jrnl/data_tok.hpp
===================================================================
--- store/trunk/cpp/lib/jrnl/data_tok.hpp	2007-12-07 16:34:34 UTC (rev 1441)
+++ store/trunk/cpp/lib/jrnl/data_tok.hpp	2007-12-07 17:09:35 UTC (rev 1442)
@@ -33,8 +33,6 @@
 #ifndef rhm_journal_data_tok_hpp
 #define rhm_journal_data_tok_hpp
 
-#include <qpid/RefCounted.h>
-
 namespace rhm
 {
 namespace journal
@@ -54,6 +52,7 @@
 #include <boost/intrusive_ptr.hpp>
 #include <pthread.h>
 #include <qpid/broker/PersistableMessage.h>
+#include <qpid/RefCounted.h>
 #include <sys/types.h>
 #include <jrnl/jexception.hpp>
 
@@ -68,7 +67,7 @@
     * \brief Data block token (data_tok) used to track wstate of a data block through asynchronous
     *     I/O process
     */
-class data_tok : public qpid::RefCounted
+    class data_tok : public qpid::RefCounted
     {
     public:
         // TODO: Fix this, separate write state from operation

Modified: store/trunk/cpp/lib/jrnl/deq_rec.cpp
===================================================================
--- store/trunk/cpp/lib/jrnl/deq_rec.cpp	2007-12-07 16:34:34 UTC (rev 1441)
+++ store/trunk/cpp/lib/jrnl/deq_rec.cpp	2007-12-07 17:09:35 UTC (rev 1442)
@@ -315,23 +315,12 @@
     }
     return size_dblks(rd_cnt);
 }
-
+        
 const bool 
 deq_rec::rcv_decode(hdr h, std::ifstream* ifsp, size_t& rec_offs) throw (jexception)
 {
-    if (rec_offs) // Contunue decoding xid from previous decode call
+    if (rec_offs == 0)
     {
-        ifsp->read((char*)_buff + rec_offs, _deq_hdr._xidsize - rec_offs);
-        size_t size_read = ifsp->gcount();
-        if (size_read < _deq_hdr._xidsize - rec_offs)
-        {
-            assert(ifsp->eof());
-            rec_offs += size_read;
-            return false;
-        }
-    }
-    else // Start at beginning of record
-    {
         _deq_hdr._hdr.copy(h);
         ifsp->read((char*)&_deq_hdr._deq_rid, sizeof(u_int64_t));
 #if defined(JRNL_BIG_ENDIAN) && defined(JRNL_32_BIT)
@@ -341,22 +330,44 @@
 #if defined(JRNL_LITTLE_ENDIAN) && defined(JRNL_32_BIT)
         ifsp->ignore(sizeof(u_int32_t)); // _filler0
 #endif
+        rec_offs = sizeof(_deq_hdr);
+        // Read header, allocate (if req'd) for xid
         if (_deq_hdr._xidsize)
         {
             _buff = ::malloc(_deq_hdr._xidsize);
-            MALLOC_CHK(_buff, "_buff", "deq_rec", "rcv_decode");
-            // Decode xid
-            ifsp->read((char*)_buff, _deq_hdr._xidsize);
-            size_t size_read = ifsp->gcount();
-            if (size_read < _deq_hdr._xidsize)
-            {
-                assert(ifsp->eof());
-                rec_offs = size_read;
-                return false;
-            }
+            MALLOC_CHK(_buff, "_buff", "enq_rec", "rcv_decode");
         }
     }
-    ifsp->ignore(rec_size_dblks() * JRNL_DBLK_SIZE - sizeof(_deq_hdr) - _deq_hdr._xidsize);
+    if (rec_offs < sizeof(_deq_hdr) + _deq_hdr._xidsize)
+    {
+        // Read xid (or continue reading xid)
+        size_t offs = rec_offs - sizeof(_deq_hdr);
+        ifsp->read((char*)_buff + offs, _deq_hdr._xidsize - offs);
+        size_t size_read = ifsp->gcount();
+        rec_offs += size_read;
+        if (size_read < _deq_hdr._xidsize - offs)
+        {
+            assert(ifsp->eof());
+            return false;
+        }
+    }
+    if (rec_offs < sizeof(_deq_hdr) +
+            (_deq_hdr._xidsize ? _deq_hdr._xidsize + sizeof(rec_tail) : 0))
+    {
+        // Read tail (or continue reading tail)
+        size_t offs = rec_offs - sizeof(_deq_hdr) - _deq_hdr._xidsize;
+        ifsp->read((char*)&_deq_tail + offs, sizeof(rec_tail) - offs);
+        size_t size_read = ifsp->gcount();
+        rec_offs += size_read;
+        if (size_read < sizeof(rec_tail) - offs)
+        {
+            assert(ifsp->eof());
+            return false;
+        }
+    }
+    ifsp->ignore(rec_size_dblks() * JRNL_DBLK_SIZE - rec_size());
+    if (_deq_hdr._xidsize)
+        chk_tail(); // Throws if tail invalid or record incomplete
     return true;
 }
 

Modified: store/trunk/cpp/lib/jrnl/enq_rec.cpp
===================================================================
--- store/trunk/cpp/lib/jrnl/enq_rec.cpp	2007-12-07 16:34:34 UTC (rev 1441)
+++ store/trunk/cpp/lib/jrnl/enq_rec.cpp	2007-12-07 17:09:35 UTC (rev 1442)
@@ -434,19 +434,9 @@
 const bool
 enq_rec::rcv_decode(hdr h, std::ifstream* ifsp, size_t& rec_offs) throw (jexception)
 {
-    if (rec_offs) // Contunue decoding xid from previous decode call
+    if (rec_offs == 0)
     {
-        ifsp->read((char*)_buff + rec_offs, _enq_hdr._xidsize - rec_offs);
-        size_t size_read = ifsp->gcount();
-        if (size_read < _enq_hdr._xidsize - rec_offs)
-        {
-            assert(ifsp->eof());
-            rec_offs += size_read;
-            return false;
-        }
-    }
-    else // Start at beginning of record
-    {
+        // Read header, allocate (if req'd) for xid
         _enq_hdr._hdr.copy(h);
 #if defined(JRNL_BIG_ENDIAN) && defined(JRNL_32_BIT)
         ifsp->ignore(sizeof(u_int32_t)); // _filler0
@@ -462,22 +452,60 @@
 #if defined(JRNL_LITTLE_ENDIAN) && defined(JRNL_32_BIT)
         ifsp->ignore(sizeof(u_int32_t)); // _filler1
 #endif
+        rec_offs = sizeof(_enq_hdr);
         if (_enq_hdr._xidsize)
         {
             _buff = ::malloc(_enq_hdr._xidsize);
             MALLOC_CHK(_buff, "_buff", "enq_rec", "rcv_decode");
-            // Decode xid
-            ifsp->read((char*)_buff, _enq_hdr._xidsize);
+        }
+    }
+    if (rec_offs < sizeof(_enq_hdr) + _enq_hdr._xidsize)
+    {
+        // Read xid (or continue reading xid)
+        size_t offs = rec_offs - sizeof(_enq_hdr);
+        ifsp->read((char*)_buff + offs, _enq_hdr._xidsize - offs);
+        size_t size_read = ifsp->gcount();
+        rec_offs += size_read;
+        if (size_read < _enq_hdr._xidsize - offs)
+        {
+            assert(ifsp->eof());
+            return false;
+        }
+    }
+    if (!_enq_hdr.is_external())
+    {
+        if (rec_offs < sizeof(hdr) + _enq_hdr._xidsize +  _enq_hdr._dsize)
+        {
+            // Ignore data (or continue ignoring data)
+            size_t offs = rec_offs - sizeof(_enq_hdr) - _enq_hdr._xidsize;
+            ifsp->ignore(_enq_hdr._dsize - offs);
             size_t size_read = ifsp->gcount();
-            if (size_read < _enq_hdr._xidsize)
+            rec_offs += size_read;
+            if (size_read < _enq_hdr._dsize - offs)
             {
                 assert(ifsp->eof());
-                rec_offs = size_read;
                 return false;
             }
         }
     }
-    ifsp->ignore(rec_size_dblks() * JRNL_DBLK_SIZE - sizeof(_enq_hdr) - _enq_hdr._xidsize);
+    if (rec_offs < sizeof(_enq_hdr) + _enq_hdr._xidsize +
+            (_enq_hdr.is_external() ? 0 : _enq_hdr._dsize) + sizeof(rec_tail))
+    {
+        // Read tail (or continue reading tail)
+        size_t offs = rec_offs - sizeof(_enq_hdr) - _enq_hdr._xidsize;
+        if (!_enq_hdr.is_external())
+            offs -= _enq_hdr._dsize;
+        ifsp->read((char*)&_enq_tail + offs, sizeof(rec_tail) - offs);
+        size_t size_read = ifsp->gcount();
+        rec_offs += size_read;
+        if (size_read < sizeof(rec_tail) - offs)
+        {
+            assert(ifsp->eof());
+            return false;
+        }
+    }
+    ifsp->ignore(rec_size_dblks() * JRNL_DBLK_SIZE - rec_size());
+    chk_tail(); // Throws if tail invalid or record incomplete
     return true;
 }
 

Modified: store/trunk/cpp/lib/jrnl/jcntl.cpp
===================================================================
--- store/trunk/cpp/lib/jrnl/jcntl.cpp	2007-12-07 16:34:34 UTC (rev 1441)
+++ store/trunk/cpp/lib/jrnl/jcntl.cpp	2007-12-07 17:09:35 UTC (rev 1442)
@@ -1,34 +1,34 @@
 /**
- * \file jcntl.cpp
- *
- * Red Hat Messaging - Message Journal
- *
- * Messaging journal top-level control and interface class
- * rhm::journal::jcntl.  See comments in file jcntl.hpp for details.
- *
- * \author Kim van der Riet
- *
- * Copyright 2007 Red Hat, Inc.
- *
- * This file is part of Red Hat Messaging.
- *
- * Red Hat Messaging is free software; you can redistribute it and/or
- * modify it under the terms of the GNU Lesser General Public
- * License as published by the Free Software Foundation; either
- * version 2.1 of the License, or (at your option) any later version.
- *
- * This library is distributed in the hope that it will be useful,
- * but WITHOUT ANY WARRANTY; without even the implied warranty of
- * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
- * Lesser General Public License for more details.
- *
- * You should have received a copy of the GNU Lesser General Public
- * License along with this library; if not, write to the Free Software
- * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA  02110-1301
- * USA
- *
- * The GNU Lesser General Public License is available in the file COPYING.
- */
+* \file jcntl.cpp
+*
+* Red Hat Messaging - Message Journal
+*
+* Messaging journal top-level control and interface class
+* rhm::journal::jcntl.  See comments in file jcntl.hpp for details.
+*
+* \author Kim van der Riet
+*
+* Copyright 2007 Red Hat, Inc.
+*
+* This file is part of Red Hat Messaging.
+*
+* Red Hat Messaging is free software; you can redistribute it and/or
+* modify it under the terms of the GNU Lesser General Public
+* License as published by the Free Software Foundation; either
+* version 2.1 of the License, or (at your option) any later version.
+*
+* This library is distributed in the hope that it will be useful,
+* but WITHOUT ANY WARRANTY; without even the implied warranty of
+* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
+* Lesser General Public License for more details.
+*
+* You should have received a copy of the GNU Lesser General Public
+* License along with this library; if not, write to the Free Software
+* Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA  02110-1301
+* USA
+*
+* The GNU Lesser General Public License is available in the file COPYING.
+*/
 
 
 #include <jrnl/jcntl.hpp>
@@ -51,7 +51,7 @@
 // Functions
 
 jcntl::jcntl(const std::string& jid, const std::string& jdir, const std::string& base_filename,
-             const u_int16_t num_jfiles, const u_int32_t jfsize_sblks):
+        const u_int16_t num_jfiles, const u_int32_t jfsize_sblks):
     _jid(jid),
     _jdir(jdir, base_filename),
     _base_filename(base_filename),
@@ -90,7 +90,7 @@
 
 void
 jcntl::initialize(std::deque<data_tok*>* rdtoklp, const aio_cb rd_cb,
-                  std::deque<data_tok*>* wdtoklp, const aio_cb wr_cb) throw (jexception)
+        std::deque<data_tok*>* wdtoklp, const aio_cb wr_cb) throw (jexception)
 {
     // Prepare journal dir, journal files and file handles
     _jdir.clear_dir();
@@ -132,21 +132,22 @@
 
 void
 jcntl::recover(std::deque<data_tok*>* rdtoklp, const aio_cb rd_cb, std::deque<data_tok*>* wdtoklp,
-               const aio_cb wr_cb, const std::vector<std::string>& prep_txn_list, u_int64_t& highest_rid)
-    throw (jexception)
+        const aio_cb wr_cb, const std::vector<std::string>& prep_txn_list, u_int64_t& highest_rid)
+        throw (jexception)
 {
     // Verify journal dir and journal files
     _jdir.verify_dir();
     _rcvdat.reset();
     _emap.clear();
     _tmap.clear();
+//std::cout << "Starting journal analysis..." << std::endl;
     rcvr_janalyze(_rcvdat, prep_txn_list);
     highest_rid = _rcvdat._h_rid;
     if (_rcvdat._full)
         throw jexception(jerrno::JERR_JCNTL_RECOVERJFULL, "jcntl", "recover_complete");
 
-    // Debug info, but may be useful to print with a flag
-    //_rcvdat.print(_jid);
+// Debug info, but may be useful to print with a flag
+//_rcvdat.print(_jid);
 
     if (_datafh)
     {
@@ -177,6 +178,7 @@
     
     _readonly_flag = true;
     _init_flag = true;
+//std::cout << "Journal analysis complete." << std::endl;
 }
 
 void
@@ -190,7 +192,7 @@
     _rrfc.initialize(_num_jfiles, (nlfh**)_datafh, _rcvdat._ffid);
     _rmgr.recover_complete(_rcvdat._fro);
     _readonly_flag = false;
-    //std::cout << "Journal revovery complete." << std::endl;
+//std::cout << "Journal revovery complete." << std::endl;
 }
 
 void 
@@ -203,8 +205,8 @@
 
 const iores
 jcntl::enqueue_data_record(const void* const data_buff, const size_t tot_data_len,
-                           const size_t this_data_len, data_tok* dtokp, const bool transient)
-    throw (jexception)
+        const size_t this_data_len, data_tok* dtokp, const bool transient)
+        throw (jexception)
 {
     iores res;
     check_wstatus("enqueue_data_record");
@@ -212,7 +214,7 @@
     try
     {
         res = _wmgr.enqueue(data_buff, tot_data_len, this_data_len, dtokp, NULL, 0, transient,
-                            false);
+                false);
     }
     catch(const std::exception& e) { pthread_mutex_unlock(&_mutex); throw; }
     pthread_mutex_unlock(&_mutex);
@@ -221,7 +223,7 @@
 
 const iores
 jcntl::enqueue_extern_data_record(const size_t tot_data_len, data_tok* dtokp, const bool transient)
-    throw (jexception)
+        throw (jexception)
 {
     iores res;
     check_wstatus("enqueue_extern_data_record");
@@ -237,8 +239,8 @@
 
 const iores
 jcntl::enqueue_txn_data_record(const void* const data_buff, const size_t tot_data_len,
-                               const size_t this_data_len, data_tok* dtokp, const std::string& xid,
-                               const bool transient) throw (jexception)
+        const size_t this_data_len, data_tok* dtokp, const std::string& xid,
+        const bool transient) throw (jexception)
 {
     iores res;
     check_wstatus("enqueue_tx_data_record");
@@ -246,7 +248,7 @@
     try
     {
         res = _wmgr.enqueue(data_buff, tot_data_len, this_data_len, dtokp, xid.data(), xid.size(),
-                            transient, false);
+                transient, false);
     }
     catch(const std::exception& e) { pthread_mutex_unlock(&_mutex); throw; }
     pthread_mutex_unlock(&_mutex);
@@ -255,7 +257,7 @@
 
 const iores
 jcntl::enqueue_extern_txn_data_record(const size_t tot_data_len, data_tok* dtokp,
-                                      const std::string& xid, const bool transient) throw (jexception)
+        const std::string& xid, const bool transient) throw (jexception)
 {
     iores res;
     check_wstatus("enqueue_extern_txn_data_record");
@@ -271,7 +273,7 @@
 
 const iores
 jcntl::get_data_record(const u_int64_t& rid, const size_t& dsize, const size_t& dsize_avail,
-                       const void** const data, bool auto_discard) throw (jexception)
+        const void** const data, bool auto_discard) throw (jexception)
 {
     check_rstatus("get_data_record");
     return _rmgr.get(rid, dsize, dsize_avail, data, auto_discard);
@@ -286,7 +288,7 @@
 
 const iores
 jcntl::read_data_record(void** const datapp, size_t& dsize, void** const xidpp, size_t& xidsize,
-                        bool& transient, bool& external, data_tok* const dtokp) throw (jexception)
+        bool& transient, bool& external, data_tok* const dtokp) throw (jexception)
 {
     check_rstatus("read_data");
     return _rmgr.read(datapp, dsize, xidpp, xidsize, transient, external, dtokp);
@@ -466,14 +468,14 @@
         _num_jfiles = ji.num_jfiles();
         _rcvdat._enq_cnt_list.resize(_num_jfiles);
         std::cout << "WARNING: Recovery found " << _num_jfiles <<
-            " files (different from --num-jfiles parameter value)." << std::endl;
+                " files (different from --num-jfiles parameter value)." << std::endl;
     }
     if (_jfsize_sblks != ji.jfsize_sblks())
     {
         _jfsize_sblks = ji.jfsize_sblks();
         std::cout << "WARNING: Recovery found file size = " <<
-            (_jfsize_sblks / JRNL_RMGR_PAGE_SIZE) <<
-            " (different from --jfile-size-pgs parameter value)." << std::endl;
+                (_jfsize_sblks / JRNL_RMGR_PAGE_SIZE) <<
+                " (different from --jfile-size-pgs parameter value)." << std::endl;
     }
 
     try
@@ -499,16 +501,16 @@
         if (rd._ffid == next_wr_fid && rd._enq_cnt_list[next_wr_fid])
             rd._full = true;
         
-        std::vector<std::string> xid_list;
-        _tmap.xid_list(xid_list);
-        for (std::vector<std::string>::iterator itr = xid_list.begin(); itr != xid_list.end();
-             itr++)
-        {
-            std::vector<std::string>::const_iterator pitr = std::find(prep_txn_list.begin(),
-                                                                      prep_txn_list.end(), *itr);
-            if (pitr == prep_txn_list.end())
-                _tmap.get_remove_tdata_list(*itr);
-        } 
+		std::vector<std::string> xid_list;
+		_tmap.xid_list(xid_list);
+		for (std::vector<std::string>::iterator itr = xid_list.begin(); itr != xid_list.end();
+                itr++)
+		{
+		    std::vector<std::string>::const_iterator pitr = std::find(prep_txn_list.begin(),
+                    prep_txn_list.end(), *itr);
+			if (pitr == prep_txn_list.end())
+				_tmap.get_remove_tdata_list(*itr);
+		} 
     }
 }
 
@@ -516,7 +518,6 @@
 jcntl::rcvr_get_next_record(u_int16_t& fid, std::ifstream* ifsp, rcvdat& rd) throw (jexception)
 {
     size_t cum_size_read = 0;
-    bool done = false;
     void* xidp = NULL;
     hdr h;
     if (!jfile_cycle(fid, ifsp, rd, true))
@@ -525,159 +526,159 @@
     ifsp->read((char*)&h, sizeof(hdr));
     switch(h._magic)
     {
-      case RHM_JDAT_ENQ_MAGIC:
-        {
-            if (!check_owi(fid, h, rd, read_pos))
-                return false;
-            enq_rec er;
-            while (!done)
+        case RHM_JDAT_ENQ_MAGIC:
             {
-                done = er.rcv_decode(h, ifsp, cum_size_read);
-                if (!jfile_cycle(fid, ifsp, rd, true))
+                enq_rec er;
+                if (!decode(er, fid, ifsp, cum_size_read, h, rd, read_pos))
                     return false;
+                if (!er.is_transient()) // Ignore transient msgs
+                {
+                    rd._enq_cnt_list[fid]++;
+                    if (er.xid_size())
+                    {
+                        er.get_xid(&xidp);
+                        assert(xidp != NULL);
+                        std::string xid((char*)xidp, er.xid_size());
+                        _tmap.insert_txn_data(xid, txn_data(h._rid, 0, fid, true));
+                        ::free(xidp);
+                    }
+                    else
+                        _emap.insert_fid(h._rid, fid);
+                }
             }
-            if (!er.is_transient()) // Ignore transient msgs
+            break;
+        case RHM_JDAT_DEQ_MAGIC:
             {
-                rd._enq_cnt_list[fid]++;
-                if (er.xid_size())
+                deq_rec dr;
+                if (!decode(dr, fid, ifsp, cum_size_read, h, rd, read_pos))
+                    return false;
+                if (dr.xid_size())
                 {
-                    er.get_xid(&xidp);
+                    // If the enqueue is part of a pending txn, it will not yet be in emap
+                    try { _emap.lock(dr.deq_rid()); }
+                    catch(const jexception& e)
+                    {
+                        if (e.err_code() != jerrno::JERR_MAP_NOTFOUND) throw;
+                    }
+                    dr.get_xid(&xidp);
                     assert(xidp != NULL);
-                    std::string xid((char*)xidp, er.xid_size());
-                    _tmap.insert_txn_data(xid, txn_data(h._rid, 0, fid, true));
+                    std::string xid((char*)xidp, dr.xid_size());
+                    _tmap.insert_txn_data(xid, txn_data(dr.rid(), dr.deq_rid(), fid, false));
                     ::free(xidp);
                 }
                 else
-                    _emap.insert_fid(h._rid, fid);
+                {
+                    try
+                    {
+                        u_int16_t enq_fid = _emap.get_remove_fid(dr.deq_rid());
+                        rd._enq_cnt_list[enq_fid]--;
+                    }
+                    catch(const jexception& e)
+                    {
+                        if (e.err_code() != jerrno::JERR_MAP_NOTFOUND) throw;
+                    }
+                }
             }
-        }
-        break;
-      case RHM_JDAT_DEQ_MAGIC:
-        {
-            if (!check_owi(fid, h, rd, read_pos))
-                return false;
-            deq_rec dr;
-            while (!done)
+            break;
+        case RHM_JDAT_TXA_MAGIC:
             {
-                done = dr.rcv_decode(h, ifsp, cum_size_read);
-                if (!jfile_cycle(fid, ifsp, rd, true))
+                txn_rec ar;
+                if (!decode(ar, fid, ifsp, cum_size_read, h, rd, read_pos))
                     return false;
-            }
-            if (dr.xid_size())
-            {
-                // If the enqueue is part of a pending txn, it will not yet be in emap
-                try { _emap.lock(dr.deq_rid()); }
-                catch(const jexception& e)
+                // Delete this txn from tmap, unlock any locked records in emap
+                ar.get_xid(&xidp);
+                assert(xidp != NULL);
+                std::string xid((char*)xidp, ar.xid_size());
+                txn_data_list tdl = _tmap.get_remove_tdata_list(xid);
+                for (tdl_itr itr = tdl.begin(); itr != tdl.end(); itr++)
                 {
-                    if (e.err_code() != jerrno::JERR_MAP_NOTFOUND) throw;
+                    try 
+					{ 
+						if (!itr->_enq_flag)
+							_emap.unlock(itr->_drid);
+					}
+                    catch(const jexception& e)
+                    {
+                        if (e.err_code() != jerrno::JERR_MAP_NOTFOUND) throw;
+                    }
+                    if (itr->_enq_flag)
+                        rd._enq_cnt_list[itr->_fid]--;
                 }
-                dr.get_xid(&xidp);
-                assert(xidp != NULL);
-                std::string xid((char*)xidp, dr.xid_size());
-                _tmap.insert_txn_data(xid, txn_data(dr.rid(), dr.deq_rid(), fid, false));
                 ::free(xidp);
             }
-            else
+            break;
+        case RHM_JDAT_TXC_MAGIC:
             {
-                try
+                txn_rec cr;
+                if (!decode(cr, fid, ifsp, cum_size_read, h, rd, read_pos))
+                    return false;
+                // Delete this txn from tmap, process records into emap
+                cr.get_xid(&xidp);
+                assert(xidp != NULL);
+                std::string xid((char*)xidp, cr.xid_size());
+                txn_data_list tdl = _tmap.get_remove_tdata_list(xid);
+                for (tdl_itr itr = tdl.begin(); itr != tdl.end(); itr++)
                 {
-                    u_int16_t enq_fid = _emap.get_remove_fid(dr.deq_rid());
-                    rd._enq_cnt_list[enq_fid]--;
+                    if (itr->_enq_flag) // txn enqueue
+                        _emap.insert_fid(itr->_rid, itr->_fid);
+                    else // txn dequeue
+                    {
+                        u_int16_t fid = _emap.get_remove_fid(itr->_drid, true);
+                        rd._enq_cnt_list[fid]--;
+                    }
                 }
-                catch(const jexception& e)
-                {
-                    if (e.err_code() != jerrno::JERR_MAP_NOTFOUND) throw;
-                }
+                ::free(xidp);
             }
-        }
-        break;
-      case RHM_JDAT_TXA_MAGIC:
-        {
-            if (!check_owi(fid, h, rd, read_pos))
-                return false;
-            txn_rec ar;
-            while (!done)
+            break;
+        case RHM_JDAT_EMPTY_MAGIC:
             {
-                done = ar.rcv_decode(h, ifsp, cum_size_read);
-                if (!jfile_cycle(fid, ifsp, rd, true))
-                    return false;
+                u_int32_t rec_dblks = jrec::size_dblks(sizeof(hdr));
+                ifsp->ignore(rec_dblks * JRNL_DBLK_SIZE - sizeof(hdr));
             }
-            // Delete this txn from tmap, unlock any locked records in emap
-            ar.get_xid(&xidp);
-            assert(xidp != NULL);
-            std::string xid((char*)xidp, ar.xid_size());
-            txn_data_list tdl = _tmap.get_remove_tdata_list(xid);
-            for (tdl_itr itr = tdl.begin(); itr != tdl.end(); itr++)
+            break;
+        case 0:
+            rd._lfid = fid;
+            rd._eo = ifsp->tellg();
+            return false;
+        default:
+            // Is this the last file, if so, stop as this is the overwrite boundary.
+            if (fid == (rd._ffid ? rd._ffid - 1 : _num_jfiles - 1))
             {
-                try 
-                { 
-                    if (!itr->_enq_flag)
-                        _emap.unlock(itr->_drid);
-                }
-                catch(const jexception& e)
-                {
-                    if (e.err_code() != jerrno::JERR_MAP_NOTFOUND) throw;
-                }
-                if (itr->_enq_flag)
-                    rd._enq_cnt_list[itr->_fid]--;
-            }
-            ::free(xidp);
-        }
-        break;
-      case RHM_JDAT_TXC_MAGIC:
-        {
-            if (!check_owi(fid, h, rd, read_pos))
+                rd._lfid = fid;
+                rd._eo = read_pos;
                 return false;
-            txn_rec cr;
-            while (!done)
-            {
-                done = cr.rcv_decode(h, ifsp, cum_size_read);
-                if (!jfile_cycle(fid, ifsp, rd, true))
-                    return false;
             }
-            // Delete this txn from tmap, process records into emap
-            cr.get_xid(&xidp);
-            assert(xidp != NULL);
-            std::string xid((char*)xidp, cr.xid_size());
-            txn_data_list tdl = _tmap.get_remove_tdata_list(xid);
-            for (tdl_itr itr = tdl.begin(); itr != tdl.end(); itr++)
-            {
-                if (itr->_enq_flag) // txn enqueue
-                    _emap.insert_fid(itr->_rid, itr->_fid);
-                else // txn dequeue
-                {
-                    u_int16_t fid = _emap.get_remove_fid(itr->_drid, true);
-                    rd._enq_cnt_list[fid]--;
-                }
-            }
-            ::free(xidp);
-        }
-        break;
-      case RHM_JDAT_EMPTY_MAGIC:
-        {
-            u_int32_t rec_dblks = jrec::size_dblks(sizeof(hdr));
-            ifsp->ignore(rec_dblks * JRNL_DBLK_SIZE - sizeof(hdr));
-        }
-        break;
-      case 0:
-        rd._lfid = fid;
-        rd._eo = ifsp->tellg();
+            std::stringstream ss;
+            ss << std::hex << std::setfill('0') << "Magic=0x" << std::setw(8) << h._magic;
+            ss << " fid=" << fid << " foffs=0x" << std::setw(8) << read_pos;
+            throw jexception(jerrno::JERR_JCNTL_UNKNOWNMAGIC, ss.str().c_str(), "jcntl",
+                    "rcvr_get_next_record");
+    }
+    return true;
+}
+
+const bool
+jcntl::decode(jrec& rec, u_int16_t& fid, std::ifstream* ifsp, size_t& cum_size_read, hdr& h,
+        rcvdat& rd, std::streampos& rec_offset)
+{
+    if (!check_owi(fid, h, rd, rec_offset))
         return false;
-      default:
-        // Is this the last file, if so, stop as this is the overwrite boundary.
-        if (fid == (rd._ffid ? rd._ffid - 1 : _num_jfiles - 1))
+    bool done = false;
+    while (!done)
+    {
+        try { done = rec.rcv_decode(h, ifsp, cum_size_read); }
+        catch (const jexception& e)
         {
+            if (e.err_code() != jerrno::JERR_JREC_BADRECTAIL ||
+                    fid != (rd._ffid ? rd._ffid - 1 : _num_jfiles - 1)) throw;
+            check_journal_alignment(fid, rec_offset);
             rd._lfid = fid;
-            rd._eo = read_pos;
+            rd._eo = rec_offset;
             return false;
         }
-        std::stringstream ss;
-        ss << std::hex << std::setfill('0') << "Magic=0x" << std::setw(8) << h._magic;
-        ss << " fid=" << fid << " foffs=0x" << std::setw(8) << read_pos;
-        throw jexception(jerrno::JERR_JCNTL_UNKNOWNMAGIC, ss.str().c_str(), "jcntl",
-                         "rcvr_get_next_record");
+        if (!jfile_cycle(fid, ifsp, rd, false))
+            return false;
     }
-
     return true;
 }
 
@@ -705,7 +706,7 @@
         std::stringstream ss;
         ss << _jdir.dirname() << "/" << _base_filename << ".";
         ss << std::hex << std::setfill('0') << std::setw(4) << fid << "." << JRNL_DATA_EXTENSION;
-        ifsp->open(ss.str().c_str());
+        ifsp->open(ss.str().c_str(), std::ios_base::in | std::ios_base::binary);
         if (!ifsp->good())
             throw jexception(jerrno::JERR__FILEIO, ss.str().c_str(), "jcntl", "jfile_cycle");
 
@@ -730,13 +731,15 @@
 }
         
 const bool
-jcntl::check_owi(u_int16_t fid, hdr& h, rcvdat& rd, std::streampos read_pos) throw (jexception)
+jcntl::check_owi(const u_int16_t fid, hdr& h, rcvdat& rd, std::streampos& read_pos)
+        throw (jexception)
 {
     if (rd._ffid ? h.get_owi() == rd._owi : h.get_owi() != rd._owi) // Overwrite indicator changed
     {
         u_int16_t expected_fid = rd._ffid ? rd._ffid - 1 : _num_jfiles - 1;
         if (fid == expected_fid)
         {
+            check_journal_alignment(fid, read_pos);
             rd._lfid = fid;
             rd._eo = read_pos;
             return false;
@@ -747,43 +750,90 @@
         ss << " foffs=0x" << std::setw(8) << read_pos;
         ss << " expected_fid=0x" << std::setw(4) << expected_fid;
         throw jexception(jerrno::JERR_JCNTL_OWIMISMATCH, ss.str().c_str(), "jcntl",
-                         "check_owi");
+                "check_owi");
     }
     if (rd._h_rid < h._rid)
         rd._h_rid = h._rid;
     return true;
 }
+        
 
 void
+jcntl::check_journal_alignment(const u_int16_t fid, std::streampos& rec_offset)
+        throw (jexception)
+{
+    unsigned sblk_offs = rec_offset % (JRNL_DBLK_SIZE * JRNL_SBLK_SIZE);
+    if (sblk_offs)
+    {
+        // TODO: Connect the following with logger:
+        std::cout << std::hex  << "INFO: Bad record alignment found at fid=0x" << fid <<
+                " offs=0x" << rec_offset << " (likely journal overwrite boundary); " <<
+                (JRNL_SBLK_SIZE - (sblk_offs/JRNL_DBLK_SIZE)) <<
+                " filler record(s) required." << std::endl;
+        const u_int32_t xmagic = RHM_JDAT_EMPTY_MAGIC;
+        std::stringstream ss;
+        ss << _jdir.dirname() << "/" << _base_filename << ".";
+        ss << std::hex << std::setfill('0') << std::setw(4) << fid << "." << JRNL_DATA_EXTENSION;
+        std::ofstream ofsp(ss.str().c_str(),
+                std::ios_base::in | std::ios_base::out | std::ios_base::binary);
+        if (!ofsp.good())
+            throw jexception(jerrno::JERR__FILEIO, ss.str().c_str(), "jcntl",
+                    "check_journal_alignment");
+        ofsp.seekp(rec_offset);
+        void* buff = ::malloc(JRNL_DBLK_SIZE);
+        assert(buff != NULL);
+        ::memcpy(buff, (void*)&xmagic, sizeof(xmagic));
+        // Normally, RHM_CLEAN must be set before these fills are done, but this is a recover
+        // situation (i.e. performance is not an issue), and it makes the location of the write
+        // clear should inspection of the file be required.
+        ::memset((char*)buff + sizeof(xmagic), RHM_CLEAN_CHAR, JRNL_DBLK_SIZE - sizeof(xmagic));
+
+        while (rec_offset % (JRNL_DBLK_SIZE * JRNL_SBLK_SIZE))
+        {
+            ofsp.write((const char*)buff, JRNL_DBLK_SIZE);
+            assert(!ofsp.fail());
+            // TODO: Connect the following with logger:
+            std::cout << "INFO: * Wrote filler record at offs=0x" << rec_offset << std::endl;
+            rec_offset = ofsp.tellp();
+        }
+        ofsp.close();
+        ::free(buff);
+        // TODO: Connect the following with logger:
+        std::cout << "INFO: Bad record alignment fixed." << std::endl;
+    }
+}
+
+
+void
 jcntl::aio_wr_callback(jcntl*  journal, u_int32_t num_dtoks)
 {
-    //kpvdr TODO -- this list needs to be mutexed...???
+//kpvdr TODO -- this list needs to be mutexed...???
     std::deque<rhm::journal::data_tok*> this_dtok_list(journal->_aio_wr_cmpl_dtok_list.begin(),
-                                                       journal->_aio_wr_cmpl_dtok_list.end());
+            journal->_aio_wr_cmpl_dtok_list.end());
 	    
     journal->_aio_wr_cmpl_dtok_list.clear();
     for (u_int32_t i=0; i<num_dtoks; i++)
     {
         data_tok*& dtokp = this_dtok_list.front();
-        if (!journal->is_stopped() && dtokp->getSourceMessage())
-        {
-            switch (dtokp->wstate())
-            {
-              case data_tok::ENQ:
-                dtokp->getSourceMessage()->enqueueComplete();
-                break;
-              case data_tok::DEQ:
-                /* Don't need to signal until we have a way to ack completion of dequeue in AMQP
-                   dtokp->getSourceMessage()->dequeueComplete();
-                   if ( dtokp->getSourceMessage()->isDequeueComplete()  ) // clear id after last dequeue
-                   dtokp->getSourceMessage()->setPersistenceId(0);
-                */
-                break;
-              default:
-                ;
-            }
-        }
-        dtokp->release();
+		if (!journal->is_stopped() && dtokp->getSourceMessage())
+		{
+			switch (dtokp->wstate())
+			{
+ 				case data_tok::ENQ:
+     	         	dtokp->getSourceMessage()->enqueueComplete();
+ 					break;
+				case data_tok::DEQ:
+/* Don't need to signal until we have a way to ack completion of dequeue in AMQP
+     	        	dtokp->getSourceMessage()->dequeueComplete();
+		     		if ( dtokp->getSourceMessage()->isDequeueComplete()  ) // clear id after last dequeue
+		         		dtokp->getSourceMessage()->setPersistenceId(0);
+*/
+					break;
+				default:
+					;
+			}
+		}
+		dtokp->release();
         this_dtok_list.pop_front();
     }
 }
@@ -792,21 +842,21 @@
 jcntl::aio_rd_callback(jcntl*  journal, u_int32_t num_dtoks)
 {
 
-    //kpvdr TODO -- can we get rid of the copy???
+//kpvdr TODO -- can we get rid of the copy???
     std::deque<rhm::journal::data_tok*> this_dtok_list(journal->_aio_rd_cmpl_dtok_list.begin(),
-                                                       journal->_aio_rd_cmpl_dtok_list.end());
+            journal->_aio_rd_cmpl_dtok_list.end());
     journal->_aio_rd_cmpl_dtok_list.clear();
     for (u_int32_t i=0; i<num_dtoks; i++)
     {
         data_tok*& dtokp = this_dtok_list.front();
-        if (!journal->is_stopped() && dtokp->getSourceMessage())
-        {
-            if (dtokp->wstate() == data_tok::ENQ && dtokp->rstate() == data_tok::READ)
-            {
+		if (!journal->is_stopped() && dtokp->getSourceMessage())
+		{
+        	if (dtokp->wstate() == data_tok::ENQ && dtokp->rstate() == data_tok::READ)
+         	{
                 // cct call the recovery manager. / lazyload.. 
-            }
-        }
-        dtokp->release();
+        	}
+		}
+	    dtokp->release();
         this_dtok_list.pop_front();
     }
     

Modified: store/trunk/cpp/lib/jrnl/jcntl.hpp
===================================================================
--- store/trunk/cpp/lib/jrnl/jcntl.hpp	2007-12-07 16:34:34 UTC (rev 1441)
+++ store/trunk/cpp/lib/jrnl/jcntl.hpp	2007-12-07 17:09:35 UTC (rev 1442)
@@ -644,14 +644,22 @@
         void rcvr_janalyze(rcvdat& rd, const std::vector<std::string>& prep_txn_list)
                 throw (jexception);
 
-        const bool rcvr_get_next_record(u_int16_t& fid, std::ifstream* ifsp, rcvdat& rd) throw (jexception);
+        const bool rcvr_get_next_record(u_int16_t& fid, std::ifstream* ifsp, rcvdat& rd)
+                throw (jexception);
 
+        const bool decode(jrec& rec, u_int16_t& fid, std::ifstream* ifsp, size_t& cum_size_read,
+                hdr& h, rcvdat& rd, std::streampos& rec_offset);
+
         const bool jfile_cycle(u_int16_t& fid, std::ifstream* ifsp, rcvdat& rd,
                 const bool jump_fro);
         
-        const bool check_owi(u_int16_t fid, hdr& h, rcvdat& rd, std::streampos read_pos)
+        const bool check_owi(const u_int16_t fid, hdr& h, rcvdat& rd, std::streampos& read_pos)
                 throw (jexception);
+        
+        void check_journal_alignment(const u_int16_t fid, std::streampos& rec_offset)
+                throw (jexception);
 
+
         /**
         * \brief Analyze a particular journal file for recovery.
         *

Modified: store/trunk/cpp/lib/jrnl/jrec.cpp
===================================================================
--- store/trunk/cpp/lib/jrnl/jrec.cpp	2007-12-07 16:34:34 UTC (rev 1441)
+++ store/trunk/cpp/lib/jrnl/jrec.cpp	2007-12-07 17:09:35 UTC (rev 1442)
@@ -52,14 +52,14 @@
     {
         std::stringstream ss;
         ss << std::hex << std::setfill('0');
-        ss << "enq magic NULL: rid=0x" << std::setw(16) << hdr._rid;
+        ss << "enq magic NULL: rid=0x" << hdr._rid;
         throw jexception(jerrno::JERR_JREC_BADRECHDR, ss.str().c_str(), "jrec", "chk_hdr");
     }
     if (hdr._version != RHM_JDAT_VERSION)
     {
         std::stringstream ss;
         ss << std::hex << std::setfill('0');
-        ss << "version: rid=0x" << std::setw(16) << hdr._rid;
+        ss << "version: rid=0x" << hdr._rid;
         ss << ": expected=0x" << std::setw(2) << (int)RHM_JDAT_VERSION;
         ss << " read=0x" << std::setw(2) << (int)hdr._version;
         throw jexception(jerrno::JERR_JREC_BADRECHDR, ss.str().c_str(), "jrec", "chk_hdr");
@@ -73,7 +73,7 @@
     {
         std::stringstream ss;
         ss << std::hex << std::setfill('0');
-        ss << "endian_flag: rid=" << std::setw(16) << hdr._rid;
+        ss << "endian_flag: rid=" << hdr._rid;
         ss << ": expected=0x" << std::setw(2) << (int)endian_flag;
         ss << " read=0x" << std::setw(2) << (int)hdr._eflag;
         throw jexception(jerrno::JERR_JREC_BADRECHDR, ss.str().c_str(), "jrec", "chk_hdr");
@@ -87,8 +87,8 @@
     {
         std::stringstream ss;
         ss << std::hex << std::setfill('0');
-        ss << "rid mismatch: expected=0x" << std::setw(16) << rid;
-        ss << " read=0x" << std::setw(16) << hdr._rid;
+        ss << "rid mismatch: expected=0x" << rid;
+        ss << " read=0x" << hdr._rid;
         throw jexception(jerrno::JERR_JREC_BADRECHDR, ss.str().c_str(), "jrec", "chk_hdr");
     }
 }
@@ -100,17 +100,17 @@
     {
         std::stringstream ss;
         ss << std::hex << std::setfill('0');
-        ss << "magic: rid=0x" << std::setw(16) << hdr._rid;
-        ss << ": expected=0x" << std::setw(8) << ~hdr._magic;
-        ss << " read=0x" << std::setw(8) << tail._xmagic;
+        ss << "magic: rid=0x" << hdr._rid;
+        ss << ": expected=0x" << ~hdr._magic;
+        ss << " read=0x" << tail._xmagic;
         throw jexception(jerrno::JERR_JREC_BADRECTAIL, ss.str().c_str(), "jrec", "chk_tail");
     }
     if (tail._rid != hdr._rid)
     {
         std::stringstream ss;
         ss << std::hex << std::setfill('0');
-        ss << "rid: rid=0x" << std::setw(16) << hdr._rid;
-        ss << ": read=0x" << std::setw(16) << tail._rid;
+        ss << "rid: rid=0x" << hdr._rid;
+        ss << ": read=0x" << tail._rid;
         throw jexception(jerrno::JERR_JREC_BADRECTAIL, ss.str().c_str(), "jrec", "chk_tail");
     }
 }

Modified: store/trunk/cpp/lib/jrnl/txn_rec.cpp
===================================================================
--- store/trunk/cpp/lib/jrnl/txn_rec.cpp	2007-12-07 16:34:34 UTC (rev 1441)
+++ store/trunk/cpp/lib/jrnl/txn_rec.cpp	2007-12-07 17:09:35 UTC (rev 1442)
@@ -314,19 +314,9 @@
 const bool 
 txn_rec::rcv_decode(hdr h, std::ifstream* ifsp, size_t& rec_offs) throw (jexception)
 {
-    if (rec_offs) // Contunue decoding xid from previous decode call
+    if (rec_offs == 0)
     {
-        ifsp->read((char*)_buff + rec_offs, _txn_hdr._xidsize - rec_offs);
-        size_t size_read = ifsp->gcount();
-        if (size_read < _txn_hdr._xidsize - rec_offs)
-        {
-            assert(ifsp->eof());
-            rec_offs += size_read;
-            return false;
-        }
-    }
-    else // Start at beginning of record
-    {
+        // Read header, allocate for xid
         _txn_hdr._hdr.copy(h);
 #if defined(JRNL_BIG_ENDIAN) && defined(JRNL_32_BIT)
         ifsp->ignore(sizeof(u_int32_t)); // _filler0
@@ -335,19 +325,38 @@
 #if defined(JRNL_LITTLE_ENDIAN) && defined(JRNL_32_BIT)
         ifsp->ignore(sizeof(u_int32_t)); // _filler0
 #endif
+        rec_offs = sizeof(_txn_hdr);
         _buff = ::malloc(_txn_hdr._xidsize);
         MALLOC_CHK(_buff, "_buff", "txn_rec", "rcv_decode");
-        // Decode xid
-        ifsp->read((char*)_buff, _txn_hdr._xidsize);
+    }
+    if (rec_offs < sizeof(_txn_hdr) + _txn_hdr._xidsize)
+    {
+        // Read xid (or continue reading xid)
+        size_t offs = rec_offs - sizeof(_txn_hdr);
+        ifsp->read((char*)_buff + offs, _txn_hdr._xidsize - offs);
         size_t size_read = ifsp->gcount();
-        if (size_read < _txn_hdr._xidsize)
+        rec_offs += size_read;
+        if (size_read < _txn_hdr._xidsize - offs)
         {
             assert(ifsp->eof());
-            rec_offs = size_read;
             return false;
         }
     }
-    ifsp->ignore(rec_size_dblks() * JRNL_DBLK_SIZE - sizeof(_txn_hdr) - _txn_hdr._xidsize);
+    if (rec_offs < sizeof(_txn_hdr) + _txn_hdr._xidsize + sizeof(rec_tail))
+    {
+        // Read tail (or continue reading tail)
+        size_t offs = rec_offs - sizeof(_txn_hdr) - _txn_hdr._xidsize;
+        ifsp->read((char*)&_txn_tail + offs, sizeof(rec_tail) - offs);
+        size_t size_read = ifsp->gcount();
+        rec_offs += size_read;
+        if (size_read < sizeof(rec_tail) - offs)
+        {
+            assert(ifsp->eof());
+            return false;
+        }
+    }
+    ifsp->ignore(rec_size_dblks() * JRNL_DBLK_SIZE - rec_size());
+    chk_tail(); // Throws if tail invalid or record incomplete
     return true;
 }
 
@@ -387,7 +396,7 @@
 const size_t
 txn_rec::rec_size() const
 {
-    return deq_hdr::size() + _txn_hdr._xidsize + rec_tail::size();
+    return txn_hdr::size() + _txn_hdr._xidsize + rec_tail::size();
 }
 
 void

Modified: store/trunk/cpp/lib/jrnl/wmgr.cpp
===================================================================
--- store/trunk/cpp/lib/jrnl/wmgr.cpp	2007-12-07 16:34:34 UTC (rev 1441)
+++ store/trunk/cpp/lib/jrnl/wmgr.cpp	2007-12-07 17:09:35 UTC (rev 1442)
@@ -1003,9 +1003,9 @@
     while (_cached_offset_dblks < wdblks)
     {
         void* wptr = (void*)((char*)_page_ptr_arr[_pg_index] + _pg_offset_dblks * JRNL_DBLK_SIZE);
-        ::memcpy(wptr, (void*)&xmagic, 4);
+        ::memcpy(wptr, (void*)&xmagic, sizeof(xmagic));
 #ifdef RHM_CLEAN
-        ::memset((char*)wptr + 4, RHM_CLEAN_CHAR, JRNL_DBLK_SIZE - 4);
+        ::memset((char*)wptr + sizeof(xmagic), RHM_CLEAN_CHAR, JRNL_DBLK_SIZE - sizeof(xmagic));
 #endif
         _pg_offset_dblks++;
         _cached_offset_dblks++;




More information about the rhmessaging-commits mailing list