[rhmessaging-commits] rhmessaging commits: r1389 - in store/trunk/cpp: tests/jrnl and 1 other directory.

rhmessaging-commits at lists.jboss.org rhmessaging-commits at lists.jboss.org
Thu Nov 29 09:51:02 EST 2007


Author: kpvdr
Date: 2007-11-29 09:51:02 -0500 (Thu, 29 Nov 2007)
New Revision: 1389

Modified:
   store/trunk/cpp/lib/jrnl/deq_rec.cpp
   store/trunk/cpp/lib/jrnl/deq_rec.hpp
   store/trunk/cpp/lib/jrnl/enq_rec.cpp
   store/trunk/cpp/lib/jrnl/enq_rec.hpp
   store/trunk/cpp/lib/jrnl/file_hdr.cpp
   store/trunk/cpp/lib/jrnl/file_hdr.hpp
   store/trunk/cpp/lib/jrnl/jcntl.cpp
   store/trunk/cpp/lib/jrnl/jcntl.hpp
   store/trunk/cpp/lib/jrnl/jerrno.cpp
   store/trunk/cpp/lib/jrnl/jerrno.hpp
   store/trunk/cpp/lib/jrnl/jinf.cpp
   store/trunk/cpp/lib/jrnl/jinf.hpp
   store/trunk/cpp/lib/jrnl/pmgr.cpp
   store/trunk/cpp/lib/jrnl/pmgr.hpp
   store/trunk/cpp/lib/jrnl/rcvdat.hpp
   store/trunk/cpp/lib/jrnl/rmgr.cpp
   store/trunk/cpp/lib/jrnl/txn_rec.cpp
   store/trunk/cpp/lib/jrnl/txn_rec.hpp
   store/trunk/cpp/lib/jrnl/wmgr.cpp
   store/trunk/cpp/lib/jrnl/wmgr.hpp
   store/trunk/cpp/lib/jrnl/wrfc.cpp
   store/trunk/cpp/lib/jrnl/wrfc.hpp
   store/trunk/cpp/tests/jrnl/unit_test_file_hdr.cpp
   store/trunk/cpp/tests/jrnl/unit_test_jinf.cpp
Log:
BZ401151: Replaced the system of relying on RIDs for recovery with an odd/even flag (the overwrite indicator, OWI) in the headers, which is used to detect the correct starting point for recovery and the overwrite boundary. Also fixed a bug in reading already-dequeued records (via skip), which did not check for the eof condition.

Modified: store/trunk/cpp/lib/jrnl/deq_rec.cpp
===================================================================
--- store/trunk/cpp/lib/jrnl/deq_rec.cpp	2007-11-29 14:50:56 UTC (rev 1388)
+++ store/trunk/cpp/lib/jrnl/deq_rec.cpp	2007-11-29 14:51:02 UTC (rev 1389)
@@ -44,15 +44,15 @@
 {
 
 deq_rec::deq_rec():
-        _deq_hdr(RHM_JDAT_DEQ_MAGIC, RHM_JDAT_VERSION, 0, 0, 0, 0),
+        _deq_hdr(RHM_JDAT_DEQ_MAGIC, RHM_JDAT_VERSION, 0, 0, 0, false),
         _xidp(NULL),
         _buff(NULL),
         _deq_tail(_deq_hdr._hdr)
 {}
 
 deq_rec::deq_rec(const u_int64_t rid, const u_int64_t drid, const void* const xidp,
-        const size_t xidlen):
-        _deq_hdr(RHM_JDAT_DEQ_MAGIC, RHM_JDAT_VERSION, 0, rid, drid, xidlen),
+        const size_t xidlen, const bool owi):
+        _deq_hdr(RHM_JDAT_DEQ_MAGIC, RHM_JDAT_VERSION, rid, drid, xidlen, owi),
         _xidp(xidp),
         _buff(NULL),
         _deq_tail(_deq_hdr._hdr)
@@ -74,9 +74,10 @@
 
 void
 deq_rec::reset(const  u_int64_t rid, const  u_int64_t drid, const void* const xidp,
-        const size_t xidlen)
+        const size_t xidlen, const bool owi)
 {
     _deq_hdr._hdr._rid = rid;
+    _deq_hdr.set_owi(owi);
     _deq_hdr._deq_rid = drid;
     _deq_hdr._xidsize = xidlen;
     _deq_tail._rid = rid;

Modified: store/trunk/cpp/lib/jrnl/deq_rec.hpp
===================================================================
--- store/trunk/cpp/lib/jrnl/deq_rec.hpp	2007-11-29 14:50:56 UTC (rev 1388)
+++ store/trunk/cpp/lib/jrnl/deq_rec.hpp	2007-11-29 14:51:02 UTC (rev 1389)
@@ -65,14 +65,14 @@
         deq_rec();
         // constructor used for write operations, where xid already exists
         deq_rec(const u_int64_t rid, const u_int64_t drid, const void* const xidp,
-                const size_t xidlen);
+                const size_t xidlen, const bool owi);
         virtual ~deq_rec();
 
         // Prepare instance for use in reading data from journal
         void reset();
         // Prepare instance for use in writing data to journal
         void reset(const  u_int64_t rid, const  u_int64_t drid, const void* const xidp,
-                const size_t xidlen);
+                const size_t xidlen, const bool owi);
         const u_int32_t encode(void* wptr, u_int32_t rec_offs_dblks, u_int32_t max_size_dblks)
                 throw (jexception);
         const u_int32_t decode(hdr& h, void* rptr, u_int32_t rec_offs_dblks,

Modified: store/trunk/cpp/lib/jrnl/enq_rec.cpp
===================================================================
--- store/trunk/cpp/lib/jrnl/enq_rec.cpp	2007-11-29 14:50:56 UTC (rev 1388)
+++ store/trunk/cpp/lib/jrnl/enq_rec.cpp	2007-11-29 14:51:02 UTC (rev 1389)
@@ -46,7 +46,7 @@
 // Constructor used for read operations, where buf contains preallocated space to receive data.
 enq_rec::enq_rec():
         jrec(), // superclass
-        _enq_hdr(RHM_JDAT_ENQ_MAGIC, RHM_JDAT_VERSION, 0, 0, 0),
+        _enq_hdr(RHM_JDAT_ENQ_MAGIC, RHM_JDAT_VERSION, 0, 0, 0, false, false),
         _xidp(NULL),
         _data(NULL),
         _buff(NULL),
@@ -55,9 +55,9 @@
 
 // Constructor used for transactional write operations, where dbuf contains data to be written.
 enq_rec::enq_rec(const u_int64_t rid, const void* const dbuf, const size_t dlen,
-        const void* const xidp, const size_t xidlen, bool transient):
+        const void* const xidp, const size_t xidlen, const bool owi, const bool transient):
         jrec(), // superclass
-        _enq_hdr(RHM_JDAT_ENQ_MAGIC, RHM_JDAT_VERSION, rid, xidlen, dlen, transient),
+        _enq_hdr(RHM_JDAT_ENQ_MAGIC, RHM_JDAT_VERSION, rid, xidlen, dlen, owi, transient),
         _xidp(xidp),
         _data(dbuf),
         _buff(NULL),
@@ -73,6 +73,7 @@
 enq_rec::reset()
 {
     _enq_hdr._hdr._rid = 0;
+    _enq_hdr.set_owi(false);
     _enq_hdr.set_transient(false);
     _enq_hdr._xidsize = 0;
     _enq_hdr._dsize = 0;
@@ -86,9 +87,11 @@
 // be written.
 void
 enq_rec::reset(const u_int64_t rid, const void* const dbuf, const size_t dlen,
-        const void* const xidp, const size_t xidlen, const bool transient, const bool external)
+        const void* const xidp, const size_t xidlen, const bool owi, const bool transient,
+        const bool external)
 {
     _enq_hdr._hdr._rid = rid;
+    _enq_hdr.set_owi(owi);
     _enq_hdr.set_transient(transient);
     _enq_hdr.set_external(external);
     _enq_hdr._xidsize = xidlen;

Modified: store/trunk/cpp/lib/jrnl/enq_rec.hpp
===================================================================
--- store/trunk/cpp/lib/jrnl/enq_rec.hpp	2007-11-29 14:50:56 UTC (rev 1388)
+++ store/trunk/cpp/lib/jrnl/enq_rec.hpp	2007-11-29 14:51:02 UTC (rev 1389)
@@ -71,7 +71,7 @@
         * \brief Constructor used for write operations, where mbuf contains data to be written.
         */
         enq_rec(const u_int64_t rid, const void* const dbuf, const size_t dlen,
-                const void* const xidp, const size_t xidlen, bool transient);
+                const void* const xidp, const size_t xidlen, const bool owi, const bool transient);
 
         /**
         * \brief Destructor
@@ -82,7 +82,7 @@
         void reset();
         // Prepare instance for use in writing data to journal
         void reset(const u_int64_t rid, const void* const dbuf, const size_t dlen,
-                const void* const xidp, const size_t xidlen, const bool transient,
+                const void* const xidp, const size_t xidlen, const bool owi, const bool transient,
                 const bool external);
 
         const u_int32_t encode(void* wptr, u_int32_t rec_offs_dblks, u_int32_t max_size_dblks)

Modified: store/trunk/cpp/lib/jrnl/file_hdr.cpp
===================================================================
--- store/trunk/cpp/lib/jrnl/file_hdr.cpp	2007-11-29 14:50:56 UTC (rev 1388)
+++ store/trunk/cpp/lib/jrnl/file_hdr.cpp	2007-11-29 14:51:02 UTC (rev 1389)
@@ -52,7 +52,7 @@
         _rid(0)
 {}
 
-hdr::hdr(const u_int32_t magic, const u_int8_t version, const u_int16_t uflag, const u_int64_t rid):
+hdr::hdr(const u_int32_t magic, const u_int8_t version, const u_int64_t rid, const bool owi):
         _magic(magic),
         _version(version),
 #if defined(JRNL_BIG_ENDIAN)
@@ -60,7 +60,7 @@
 #else
         _eflag(RHM_LENDIAN_FLAG),
 #endif
-        _uflag(uflag),
+        _uflag(owi ? HDR_OVERWRITE_INDICATOR_MASK : 0),
         _rid(rid)
 {}
 
@@ -84,7 +84,18 @@
     _rid = 0;
 }
 
+void
+hdr::set_owi(const bool owi)
+{
+    if (owi)
+        _uflag |= HDR_OVERWRITE_INDICATOR_MASK;
+    else
+        _uflag &= (~HDR_OVERWRITE_INDICATOR_MASK);
+}
 
+const u_int16_t hdr::HDR_OVERWRITE_INDICATOR_MASK = 0x1;
+
+
 // ***** struct rec_tail *****
 
 rec_tail::rec_tail():
@@ -132,10 +143,10 @@
 #endif
 {}
 
-file_hdr::file_hdr(const u_int32_t magic, const u_int8_t version, const u_int16_t uflag,
-        const u_int64_t rid, const u_int32_t fid, const size_t fro, const bool settime)
+file_hdr::file_hdr(const u_int32_t magic, const u_int8_t version, const u_int64_t rid,
+        const u_int32_t fid, const size_t fro, const bool owi, const bool settime)
         throw (jexception):
-        _hdr(magic, version, uflag, rid),
+        _hdr(magic, version, rid, owi),
         _fid(fid),
         _res(0),
 #if defined(JRNL_BIG_ENDIAN) && defined(JRNL_32_BIT)
@@ -165,6 +176,15 @@
 }
 
 void
+file_hdr::set_owi(const bool owi)
+{
+    if (owi)
+        _hdr._uflag |= hdr::HDR_OVERWRITE_INDICATOR_MASK;
+    else
+        _hdr._uflag &= (~hdr::HDR_OVERWRITE_INDICATOR_MASK);
+}
+
+void
 file_hdr::set_time() throw (jexception)
 {
 // TODO: Standardize on a method for getting time that does not requrie a context switch.
@@ -208,8 +228,8 @@
 {}
 
 enq_hdr::enq_hdr(const u_int32_t magic, const u_int8_t version, const u_int64_t rid,
-        const size_t xidsize, const size_t dsize, const bool transient):
-        _hdr(magic, version, transient ? ENQ_HDR_TRANSIENT_MASK : 0, rid),
+        const size_t xidsize, const size_t dsize, const bool owi, const bool transient):
+        _hdr(magic, version, rid, owi),
 #if defined(JRNL_BIG_ENDIAN) && defined(JRNL_32_BIT)
         _filler0(0),
 #endif
@@ -224,9 +244,20 @@
 #if defined(JRNL_LITTLE_ENDIAN) && defined(JRNL_32_BIT)
         , _filler1(0)
 #endif
-{}
+{
+    set_transient(transient);
+}
 
 void
+enq_hdr::set_owi(const bool owi)
+{
+    if (owi)
+        _hdr._uflag |= hdr::HDR_OVERWRITE_INDICATOR_MASK;
+    else
+        _hdr._uflag &= (~hdr::HDR_OVERWRITE_INDICATOR_MASK);
+}
+
+void
 enq_hdr::set_transient(const bool transient)
 {
     if (transient)
@@ -244,8 +275,8 @@
         _hdr._uflag &= (~ENQ_HDR_EXTERNAL_MASK);
 }
 
-const u_int16_t enq_hdr::ENQ_HDR_TRANSIENT_MASK = 0x1;
-const u_int16_t enq_hdr::ENQ_HDR_EXTERNAL_MASK  = 0x2;
+const u_int16_t enq_hdr::ENQ_HDR_TRANSIENT_MASK = 0x10;
+const u_int16_t enq_hdr::ENQ_HDR_EXTERNAL_MASK  = 0x20;
 
 
 // ***** struct deq_hdr *****
@@ -262,9 +293,9 @@
 #endif
 {}
 
-deq_hdr::deq_hdr(const u_int32_t magic, const u_int8_t version, const u_int16_t uflag,
-        const u_int64_t rid, const u_int64_t deq_rid, const size_t xidsize):
-        _hdr(magic, version, uflag, rid),
+deq_hdr::deq_hdr(const u_int32_t magic, const u_int8_t version, const u_int64_t rid,
+        const u_int64_t deq_rid, const size_t xidsize, const bool owi):
+        _hdr(magic, version, rid, owi),
         _deq_rid(deq_rid),
 #if defined(JRNL_BIG_ENDIAN) && defined(JRNL_32_BIT)
         _filler0(0),
@@ -275,7 +306,16 @@
 #endif
 {}
 
+void
+deq_hdr::set_owi(const bool owi)
+{
+    if (owi)
+        _hdr._uflag |= hdr::HDR_OVERWRITE_INDICATOR_MASK;
+    else
+        _hdr._uflag &= (~hdr::HDR_OVERWRITE_INDICATOR_MASK);
+}
 
+
 // ***** struct txn_hdr *****
 
 
@@ -290,9 +330,9 @@
 #endif
 {}
 
-txn_hdr::txn_hdr(const u_int32_t magic, const u_int8_t version, const u_int16_t uflag,
-        const u_int64_t rid, const size_t xidsize):
-        _hdr(magic, version, uflag, rid),
+txn_hdr::txn_hdr(const u_int32_t magic, const u_int8_t version, const u_int64_t rid,
+        const size_t xidsize, const bool owi):
+        _hdr(magic, version, rid, owi),
 #if defined(JRNL_BIG_ENDIAN) && defined(JRNL_32_BIT)
         _filler0(0),
 #endif
@@ -302,5 +342,14 @@
 #endif
 {}
 
+void
+txn_hdr::set_owi(const bool owi)
+{
+    if (owi)
+        _hdr._uflag |= hdr::HDR_OVERWRITE_INDICATOR_MASK;
+    else
+        _hdr._uflag &= (~hdr::HDR_OVERWRITE_INDICATOR_MASK);
+}
+
 } // namespace journal
 } // namespace rhm

Modified: store/trunk/cpp/lib/jrnl/file_hdr.hpp
===================================================================
--- store/trunk/cpp/lib/jrnl/file_hdr.hpp	2007-11-29 14:50:56 UTC (rev 1388)
+++ store/trunk/cpp/lib/jrnl/file_hdr.hpp	2007-11-29 14:51:02 UTC (rev 1389)
@@ -82,6 +82,9 @@
         u_int8_t _eflag;        ///< Flag for determining endianness
         u_int16_t _uflag;       ///< User-defined flags
         u_int64_t _rid;         ///< Record ID (rotating 64-bit counter)
+        
+        // Global flags
+        static const u_int16_t HDR_OVERWRITE_INDICATOR_MASK;
 
         // Convenience constructors and methods
         /**
@@ -92,8 +95,7 @@
         /**
         * \brief Convenience constructor which initializes values during construction.
         */
-        hdr(const u_int32_t magic, const u_int8_t version, const u_int16_t uflag,
-                const u_int64_t rid);
+        hdr(const u_int32_t magic, const u_int8_t version, const u_int64_t rid, const bool owi);
 
         /**
         * \brief Convenience copy method.
@@ -105,6 +107,9 @@
         */
         void reset();
 
+        inline const bool get_owi() const { return _uflag & HDR_OVERWRITE_INDICATOR_MASK; }
+        void set_owi(const bool owi);
+
         /**
         * \brief Returns the size of the header in bytes.
         */
@@ -232,10 +237,13 @@
         /**
         * \brief Convenience constructor which initializes values during construction.
         */
-        file_hdr(const u_int32_t magic, const u_int8_t version, const u_int16_t uflag,
-                const u_int64_t rid, const u_int32_t fid, const size_t fro,
-                const bool settime = false) throw (jexception);
+        file_hdr(const u_int32_t magic, const u_int8_t version, const u_int64_t rid,
+                const u_int32_t fid, const size_t fro, const bool owi, const bool settime = false)
+                throw (jexception);
 
+        inline const bool get_owi() const { return _hdr._uflag & hdr::HDR_OVERWRITE_INDICATOR_MASK; }
+        void set_owi(const bool owi);
+
         /**
         * \brief Gets the current time from the system clock and sets the timestamp in the struct.
         */
@@ -313,8 +321,11 @@
         * \brief Convenience constructor which initializes values during construction.
         */
         enq_hdr(const u_int32_t magic, const u_int8_t version, const u_int64_t rid,
-                const size_t xidsize, const size_t dsize, const bool transient = false);
-        
+                const size_t xidsize, const size_t dsize, const bool owi,
+                const bool transient = false);
+
+        inline const bool get_owi() const { return _hdr._uflag & hdr::HDR_OVERWRITE_INDICATOR_MASK; }
+        void set_owi(const bool owi);
         inline const bool is_transient() const { return _hdr._uflag & ENQ_HDR_TRANSIENT_MASK; }
         void set_transient(const bool transient);
         inline const bool is_external() const { return _hdr._uflag & ENQ_HDR_EXTERNAL_MASK; }
@@ -381,9 +392,12 @@
         /**
         * \brief Convenience constructor which initializes values during construction.
         */
-        deq_hdr(const u_int32_t magic, const u_int8_t version, const u_int16_t uflag,
-                const u_int64_t rid, const u_int64_t deq_rid, const size_t xidsize);
+        deq_hdr(const u_int32_t magic, const u_int8_t version, const u_int64_t rid,
+                const u_int64_t deq_rid, const size_t xidsize, const bool owi);
 
+        inline const bool get_owi() const { return _hdr._uflag & hdr::HDR_OVERWRITE_INDICATOR_MASK; }
+        void set_owi(const bool owi);
+
         /**
         * \brief Returns the size of the header in bytes.
         */
@@ -441,9 +455,12 @@
         /**
         * \brief Convenience constructor which initializes values during construction.
         */
-        txn_hdr(const u_int32_t magic, const u_int8_t version, const u_int16_t uflag,
-                const u_int64_t rid, const size_t xidsize);
+        txn_hdr(const u_int32_t magic, const u_int8_t version, const u_int64_t rid,
+                const size_t xidsize, const bool owi);
 
+        inline const bool get_owi() const { return _hdr._uflag & hdr::HDR_OVERWRITE_INDICATOR_MASK; }
+        void set_owi(const bool owi);
+
         /**
         * \brief Returns the size of the header in bytes.
         */

Modified: store/trunk/cpp/lib/jrnl/jcntl.cpp
===================================================================
--- store/trunk/cpp/lib/jrnl/jcntl.cpp	2007-11-29 14:50:56 UTC (rev 1388)
+++ store/trunk/cpp/lib/jrnl/jcntl.cpp	2007-11-29 14:51:02 UTC (rev 1389)
@@ -456,6 +456,7 @@
     try
     {
         rd._ffid = ji.get_start_file();
+        rd._owi = ji.get_initial_owi();
         rd._empty = false;
     }
     catch (const jexception& e)
@@ -504,7 +505,7 @@
     {
         case RHM_JDAT_ENQ_MAGIC:
             {
-                if (!check_rid(fid, h, rd, read_pos))
+                if (!check_owi(fid, h, rd, read_pos))
                     return false;
                 enq_rec er;
                 while (!done)
@@ -533,7 +534,7 @@
             break;
         case RHM_JDAT_DEQ_MAGIC:
             {
-                if (!check_rid(fid, h, rd, read_pos))
+                if (!check_owi(fid, h, rd, read_pos))
                     return false;
                 deq_rec dr;
                 while (!done)
@@ -576,7 +577,7 @@
             break;
         case RHM_JDAT_TXA_MAGIC:
             {
-                if (!check_rid(fid, h, rd, read_pos))
+                if (!check_owi(fid, h, rd, read_pos))
                     return false;
                 txn_rec ar;
                 while (!done)
@@ -611,7 +612,7 @@
             break;
         case RHM_JDAT_TXC_MAGIC:
             {
-                if (!check_rid(fid, h, rd, read_pos))
+                if (!check_owi(fid, h, rd, read_pos))
                     return false;
                 txn_rec cr;
                 while (!done)
@@ -681,7 +682,10 @@
             rd._lfid = fid++;
             ifsp->close();
             if (fid >= JRNL_NUM_FILES)
+            {
                 fid = 0;
+                rd._owi = !rd._owi; // Flip owi
+            }
             if (fid == rd._ffid) // used up all journal files
                 return false;
         }
@@ -716,11 +720,12 @@
 }
         
 const bool
-jcntl::check_rid(u_int16_t fid, hdr& h, rcvdat& rd, std::streampos read_pos) throw (jexception)
+jcntl::check_owi(u_int16_t fid, hdr& h, rcvdat& rd, std::streampos read_pos) throw (jexception)
 {
-    if (rd._h_rid && rd._h_rid >= h._rid)
+    if (rd._ffid ? h.get_owi() == rd._owi : h.get_owi() != rd._owi) // Overwrite indicator changed
     {
-        if (fid == (rd._ffid ? rd._ffid - 1 : JRNL_NUM_FILES - 1))
+        u_int16_t expected_fid = rd._ffid ? rd._ffid - 1 : JRNL_NUM_FILES - 1;
+        if (fid == expected_fid)
         {
             rd._lfid = fid;
             rd._eo = read_pos;
@@ -728,12 +733,14 @@
         }
         std::stringstream ss;
         ss << std::hex << std::setfill('0') << "Magic=0x" << std::setw(8) << h._magic;
-        ss << " fid=" << fid << " rid=" << h._rid << " hrid=" << rd._h_rid;
+        ss << " fid=0x" << std::setw(4) << fid << " rid=0x" << std::setw(8) << h._rid;
         ss << " foffs=0x" << std::setw(8) << read_pos;
-        throw jexception(jerrno::JERR_JCNTL_RIDORDERBAD, ss.str().c_str(), "jcntl",
-                "check_rid");
-    }   
-    rd._h_rid = h._rid;
+        ss << " expected_fid=0x" << std::setw(4) << expected_fid;
+        throw jexception(jerrno::JERR_JCNTL_OWIMISMATCH, ss.str().c_str(), "jcntl",
+                "check_owi");
+    }
+    if (rd._h_rid < h._rid)
+        rd._h_rid = h._rid;
     return true;
 }
 

Modified: store/trunk/cpp/lib/jrnl/jcntl.hpp
===================================================================
--- store/trunk/cpp/lib/jrnl/jcntl.hpp	2007-11-29 14:50:56 UTC (rev 1388)
+++ store/trunk/cpp/lib/jrnl/jcntl.hpp	2007-11-29 14:51:02 UTC (rev 1389)
@@ -641,7 +641,7 @@
         const bool jfile_cycle(u_int16_t& fid, std::ifstream* ifsp, rcvdat& rd,
                 const bool jump_fro);
         
-        const bool check_rid(u_int16_t fid, hdr& h, rcvdat& rd, std::streampos read_pos)
+        const bool check_owi(u_int16_t fid, hdr& h, rcvdat& rd, std::streampos read_pos)
                 throw (jexception);
 
         /**

Modified: store/trunk/cpp/lib/jrnl/jerrno.cpp
===================================================================
--- store/trunk/cpp/lib/jrnl/jerrno.cpp	2007-11-29 14:50:56 UTC (rev 1388)
+++ store/trunk/cpp/lib/jrnl/jerrno.cpp	2007-11-29 14:51:02 UTC (rev 1389)
@@ -59,7 +59,7 @@
 const u_int32_t jerrno::JERR_JCNTL_UNKNOWNMAGIC = 0x0203;
 const u_int32_t jerrno::JERR_JCNTL_NOTRECOVERED = 0x0204;
 const u_int32_t jerrno::JERR_JCNTL_RECOVERJFULL = 0x0205;
-const u_int32_t jerrno::JERR_JCNTL_RIDORDERBAD  = 0x0206;
+const u_int32_t jerrno::JERR_JCNTL_OWIMISMATCH  = 0x0206;
 
 // class jdir
 const u_int32_t jerrno::JERR_JDIR_NOTDIR        = 0x0300;
@@ -143,8 +143,8 @@
             "Operation requires recover() to be run first.";
     _err_map[JERR_JCNTL_RECOVERJFULL] = "JERR_JCNTL_RECOVERJFULL: "
             "Journal data files full, cannot write.";
-    _err_map[JERR_JCNTL_RIDORDERBAD] = "JERR_JCNTL_RIDORDERBAD: "
-            "Record found with RID out-of-order.";
+    _err_map[JERR_JCNTL_OWIMISMATCH] = "JERR_JCNTL_OWIMISMATCH: "
+            "Overwrite Indecator (OWI) change found in unexpected location.";
 
     // class jdir
     _err_map[JERR_JDIR_NOTDIR] = "JERR_JDIR_NOTDIR: Directory name exists but is not a directory.";

Modified: store/trunk/cpp/lib/jrnl/jerrno.hpp
===================================================================
--- store/trunk/cpp/lib/jrnl/jerrno.hpp	2007-11-29 14:50:56 UTC (rev 1388)
+++ store/trunk/cpp/lib/jrnl/jerrno.hpp	2007-11-29 14:51:02 UTC (rev 1389)
@@ -76,7 +76,7 @@
         static const u_int32_t JERR_JCNTL_UNKNOWNMAGIC; ///< Found record with unknown magic
         static const u_int32_t JERR_JCNTL_NOTRECOVERED; ///< Req' recover() to be called first
         static const u_int32_t JERR_JCNTL_RECOVERJFULL; ///< Journal data files full, cannot write
-        static const u_int32_t JERR_JCNTL_RIDORDERBAD;  ///< RID out-of-order
+        static const u_int32_t JERR_JCNTL_OWIMISMATCH;  ///< OWI change found in unexpected location
 
         // class jdir
         static const u_int32_t JERR_JDIR_NOTDIR;        ///< Exists but is not a directory

Modified: store/trunk/cpp/lib/jrnl/jinf.cpp
===================================================================
--- store/trunk/cpp/lib/jrnl/jinf.cpp	2007-11-29 14:50:56 UTC (rev 1388)
+++ store/trunk/cpp/lib/jrnl/jinf.cpp	2007-11-29 14:51:02 UTC (rev 1389)
@@ -56,7 +56,8 @@
         _tm_ptr(NULL),
         _valid_flag(false),
         _analyzed_flag(false),
-        _start_file(0)
+        _start_file(0),
+        _initial_owi(false)
 {
     read(jinf_filename);
     if (validate_flag)
@@ -81,7 +82,8 @@
         _tm_ptr(::localtime(&ts.tv_sec)),
         _valid_flag(false),
         _analyzed_flag(false),
-        _start_file(0)
+        _start_file(0),
+        _initial_owi(false)
 {}
 
 jinf::~jinf()
@@ -155,7 +157,8 @@
 jinf::analyze() throw (jexception)
 {
     u_int16_t fid = 0xffff;
-    u_int64_t rid = (u_int64_t)-1; // TODO: check this, 64-bit literals are a problem!
+    bool owi = false;
+    bool found = false;
     
     if (!_valid_flag)
         validate();
@@ -172,16 +175,23 @@
         jifs.read((char*)&fhdr, sizeof(fhdr));
         if (fhdr._hdr._magic != RHM_JDAT_FILE_MAGIC)
             break;
-        if (fhdr._hdr._rid < rid)
+        if (!fnum) // First file only
         {
+            owi = fhdr.get_owi();
+            _initial_owi = owi;
+            fid = 0;
+        }
+        else if (fhdr.get_owi() != owi && !found)
+        {
             fid = fnum;
-            rid = fhdr._hdr._rid;
+            found = true;
         }
         jifs.close();
     }
-    if (fid == (u_int16_t)-1)
+    if (fid == 0xffff)
         throw jexception(jerrno::JERR_JINF_JDATEMPTY, "jinf", "analyze");
     _start_file = fid;
+    _analyzed_flag = true;
     return _start_file;
 }
 
@@ -205,6 +215,14 @@
     return _start_file;
 }
 
+const bool
+jinf::get_initial_owi() throw (jexception)
+{
+    if (!_analyzed_flag)
+        analyze();
+    return _initial_owi;
+}
+
 const std::string
 jinf::to_string() const
 {

Modified: store/trunk/cpp/lib/jrnl/jinf.hpp
===================================================================
--- store/trunk/cpp/lib/jrnl/jinf.hpp	2007-11-29 14:50:56 UTC (rev 1388)
+++ store/trunk/cpp/lib/jrnl/jinf.hpp	2007-11-29 14:51:02 UTC (rev 1389)
@@ -64,6 +64,7 @@
         bool _valid_flag;
         bool _analyzed_flag;
         u_int16_t _start_file;
+        bool _initial_owi;
 
     public:
         // constructor for reading existing jinf file
@@ -91,6 +92,7 @@
         inline const u_int32_t rmgr_page_size_dblks() const { return _rmgr_page_size_dblks; }
         inline const u_int32_t rmgr_num_pages() const { return _rmgr_num_pages; }
         const u_int16_t get_start_file() throw (jexception);
+        const bool get_initial_owi() throw (jexception);
 
         const std::string to_string() const;
         const std::string xml_str() const;

Modified: store/trunk/cpp/lib/jrnl/pmgr.cpp
===================================================================
--- store/trunk/cpp/lib/jrnl/pmgr.cpp	2007-11-29 14:50:56 UTC (rev 1388)
+++ store/trunk/cpp/lib/jrnl/pmgr.cpp	2007-11-29 14:51:02 UTC (rev 1389)
@@ -207,19 +207,19 @@
     }
 }
 
-bool
-pmgr::rotate_page(page_state state)
-{
-    _page_cb_arr[_pg_index]._state = state;
-    if (_pg_offset_dblks >= JRNL_WMGR_PAGE_SIZE * JRNL_SBLK_SIZE)
-    {
-        _pg_offset_dblks = 0;
-        _pg_cntr++;
-    }
-    if (++_pg_index >= _pages)
-        _pg_index = 0;
-    return false;
-}
+// bool
+// pmgr::rotate_page(page_state state)
+// {
+//     _page_cb_arr[_pg_index]._state = state;
+//     if (_pg_offset_dblks >= JRNL_WMGR_PAGE_SIZE * JRNL_SBLK_SIZE)
+//     {
+//         _pg_offset_dblks = 0;
+//         _pg_cntr++;
+//     }
+//     if (++_pg_index >= _pages)
+//         _pg_index = 0;
+//     return false;
+// }
 
 void
 pmgr::clean()

Modified: store/trunk/cpp/lib/jrnl/pmgr.hpp
===================================================================
--- store/trunk/cpp/lib/jrnl/pmgr.hpp	2007-11-29 14:50:56 UTC (rev 1388)
+++ store/trunk/cpp/lib/jrnl/pmgr.hpp	2007-11-29 14:51:02 UTC (rev 1389)
@@ -150,7 +150,8 @@
 
     protected:
         virtual void initialize() throw (jexception);
-        virtual bool rotate_page(page_state state = UNUSED);
+//         virtual bool rotate_page(page_state state = UNUSED);
+        virtual void rotate_page() = 0;
         virtual void clean();
     };
 

Modified: store/trunk/cpp/lib/jrnl/rcvdat.hpp
===================================================================
--- store/trunk/cpp/lib/jrnl/rcvdat.hpp	2007-11-29 14:50:56 UTC (rev 1388)
+++ store/trunk/cpp/lib/jrnl/rcvdat.hpp	2007-11-29 14:51:02 UTC (rev 1389)
@@ -42,6 +42,7 @@
 
         struct rcvdat
         {
+            bool _owi;
             bool _empty;        ///< Journal data files empty
             u_int16_t _ffid;    ///< First file id
             size_t _fro;        ///< First record offset in ffid
@@ -52,6 +53,7 @@
             std::vector<u_int32_t> _enq_cnt_list; ///< Number enqueued records found for each file
 
             rcvdat():
+                    _owi(false),
                     _empty(true),
                     _ffid(0),
                     _fro(0),
@@ -64,6 +66,7 @@
 
             void reset()
             {
+                _owi=false;
                 _empty=true;
                 _ffid=0;
                 _fro=0;
@@ -86,7 +89,8 @@
                 std::cout << "  Last fid (_lfid) = " << _lfid << std::endl;
                 std::cout << "  End offset (_eo) = 0x" << std::hex << _eo << std::dec << " ("  <<
                         (_eo/JRNL_DBLK_SIZE) << " dblks)" << std::endl;
-                std::cout << "  Highest rid (_h_rid) = " << _h_rid << std::endl;
+                std::cout << "  Highest rid (_h_rid) = 0x" << std::hex << _h_rid << std::dec <<
+                        std::endl;
                 std::cout << "  Journal full (_full) = " << (_full ? "TRUE" : "FALSE") << std::endl;
                 std::cout << "  Enqueued records (txn & non-txn):" << std::endl;
                 for (unsigned i=0; i<_enq_cnt_list.size(); i++)

Modified: store/trunk/cpp/lib/jrnl/rmgr.cpp
===================================================================
--- store/trunk/cpp/lib/jrnl/rmgr.cpp	2007-11-29 14:50:56 UTC (rev 1388)
+++ store/trunk/cpp/lib/jrnl/rmgr.cpp	2007-11-29 14:51:02 UTC (rev 1389)
@@ -284,6 +284,7 @@
                 bool is_enq = false;
                 try
                 {
+//std::cout << " rid=0x" << std::hex << _hdr._rid << std::dec << std::flush;
 //std::cout << " rid=" << _hdr._rid << std::flush;
                     fid = _emap.get_fid(_hdr._rid);
 //std::cout << ":ok" << std::flush;
@@ -629,6 +630,7 @@
             _pg_offset_dblks += this_dblk_cnt;
             tot_dblk_cnt += this_dblk_cnt;
         }
+        // If skip still incomplete, move to next page and decode again
         if (tot_dblk_cnt < dsize_dblks)
         {
             if (_pg_offset_dblks == JRNL_SBLK_SIZE * JRNL_RMGR_PAGE_SIZE)
@@ -650,6 +652,10 @@
             dtokp->set_dsize(0);
             dtokp->set_dblocks_read(0);
 //std::cout << "]" << std::flush;
+
+            // If we have finished with this page, rotate it
+            if (dblks_rem() == 0)
+                rotate_page();
             return RHM_IORES_SUCCESS;
         }
     }
@@ -747,7 +753,15 @@
 rmgr::rotate_page()
 {
     _page_cb_arr[_pg_index]._rdblks = 0;
-    pmgr::rotate_page();
+//     pmgr::rotate_page();
+    _page_cb_arr[_pg_index]._state = UNUSED;
+    if (_pg_offset_dblks >= JRNL_RMGR_PAGE_SIZE * JRNL_SBLK_SIZE)
+    {
+        _pg_offset_dblks = 0;
+        _pg_cntr++;
+    }
+    if (++_pg_index >= _pages)
+        _pg_index = 0;
     aio_cycle();
     _pg_offset_dblks = 0;
     // This counter is for bookkeeping only, page rotates are handled directly in init_aio_reads()

Modified: store/trunk/cpp/lib/jrnl/txn_rec.cpp
===================================================================
--- store/trunk/cpp/lib/jrnl/txn_rec.cpp	2007-11-29 14:50:56 UTC (rev 1388)
+++ store/trunk/cpp/lib/jrnl/txn_rec.cpp	2007-11-29 14:51:02 UTC (rev 1389)
@@ -53,8 +53,8 @@
 }
 
 txn_rec::txn_rec(const u_int32_t magic, const u_int64_t rid, const void* const xidp,
-        const size_t xidlen):
-        _txn_hdr(magic, RHM_JDAT_VERSION, 0, rid, xidlen),
+        const size_t xidlen, const bool owi):
+        _txn_hdr(magic, RHM_JDAT_VERSION, rid, xidlen, owi),
         _xidp(xidp),
         _buff(NULL),
         _txn_tail(_txn_hdr._hdr)
@@ -77,10 +77,11 @@
 
 void
 txn_rec::reset(const u_int32_t magic, const  u_int64_t rid, const void* const xidp,
-        const size_t xidlen)
+        const size_t xidlen, const bool owi)
 {
     _txn_hdr._hdr._magic = magic;
     _txn_hdr._hdr._rid = rid;
+    _txn_hdr.set_owi(owi);
     _txn_hdr._xidsize = xidlen;
     _xidp = xidp;
     _buff = NULL;

Modified: store/trunk/cpp/lib/jrnl/txn_rec.hpp
===================================================================
--- store/trunk/cpp/lib/jrnl/txn_rec.hpp	2007-11-29 14:50:56 UTC (rev 1388)
+++ store/trunk/cpp/lib/jrnl/txn_rec.hpp	2007-11-29 14:51:02 UTC (rev 1389)
@@ -65,14 +65,14 @@
         txn_rec();
         // constructor used for write operations, where xid already exists
         txn_rec(const u_int32_t magic, const u_int64_t rid, const void* const xidp,
-                const size_t xidlen);
+                const size_t xidlen, const bool owi);
         virtual ~txn_rec();
 
         // Prepare instance for use in reading data from journal
         void reset(const u_int32_t magic);
         // Prepare instance for use in writing data to journal
         void reset(const u_int32_t magic, const  u_int64_t rid, const void* const xidp,
-                const size_t xidlen);
+                const size_t xidlen, const bool owi);
         const u_int32_t encode(void* wptr, u_int32_t rec_offs_dblks, u_int32_t max_size_dblks)
                 throw (jexception);
         const u_int32_t decode(hdr& h, void* rptr, u_int32_t rec_offs_dblks,

Modified: store/trunk/cpp/lib/jrnl/wmgr.cpp
===================================================================
--- store/trunk/cpp/lib/jrnl/wmgr.cpp	2007-11-29 14:50:56 UTC (rev 1388)
+++ store/trunk/cpp/lib/jrnl/wmgr.cpp	2007-11-29 14:51:02 UTC (rev 1389)
@@ -126,7 +126,8 @@
         _enq_busy = true;
 
     u_int64_t rid = initialize_rid(cont, dtokp);
-    _enq_rec.reset(rid, data_buff, tot_data_len, xid_ptr, xid_len, transient, external);
+    _enq_rec.reset(rid, data_buff, tot_data_len, xid_ptr, xid_len, _wrfc.owi(), transient,
+            external);
     if (!cont)
     {
         dtokp->set_rid(rid);
@@ -274,7 +275,7 @@
 		dtokp->set_rid(rid);
 		dtokp->set_dequeue_rid(dequeue_rid);
 	}
-    _deq_rec.reset(rid, dequeue_rid, xid_ptr, xid_len);
+    _deq_rec.reset(rid, dequeue_rid, xid_ptr, xid_len, _wrfc.owi());
     if (!cont)
     {
         if (xid_len)
@@ -409,7 +410,7 @@
         _abort_busy = true;
 
     u_int64_t rid = dtokp->rid() ? dtokp->rid() : (cont ? _wrfc.rid() - 1 : _wrfc.get_incr_rid());
-    _txn_rec.reset(RHM_JDAT_TXA_MAGIC, rid, xid_ptr, xid_len);
+    _txn_rec.reset(RHM_JDAT_TXA_MAGIC, rid, xid_ptr, xid_len, _wrfc.owi());
     if (!cont)
     {
         dtokp->set_rid(rid);
@@ -552,7 +553,7 @@
         _commit_busy = true;
 
     u_int64_t rid = dtokp->rid() ? dtokp->rid() : (cont ? _wrfc.rid() - 1 : _wrfc.get_incr_rid());
-    _txn_rec.reset(RHM_JDAT_TXC_MAGIC, rid, xid_ptr, xid_len);
+    _txn_rec.reset(RHM_JDAT_TXC_MAGIC, rid, xid_ptr, xid_len, _wrfc.owi());
     if (!cont)
     {
         dtokp->set_rid(rid);
@@ -718,7 +719,8 @@
 //std::cout << "{w^" << _pg_index << "," << _cached_offset_dblks << "}" << std::flush;
             _cached_offset_dblks = 0;
 
-           rotate_page(AIO_PENDING); // increments _pg_index, resets _pg_offset_dblks if req'd
+//           rotate_page(AIO_PENDING); // increments _pg_index, resets _pg_offset_dblks if req'd
+           rotate_page(); // increments _pg_index, resets _pg_offset_dblks if req'd
            if (_page_cb_arr[_pg_index]._state == UNUSED)
                _page_cb_arr[_pg_index]._state = IN_USE;
         }
@@ -1007,7 +1009,7 @@
 void
 wmgr::write_fhdr(u_int64_t rid, u_int32_t fid, size_t fro) throw (jexception)
 {
-    file_hdr fhdr(RHM_JDAT_FILE_MAGIC, RHM_JDAT_VERSION, 0, rid, fid, fro, true);
+    file_hdr fhdr(RHM_JDAT_FILE_MAGIC, RHM_JDAT_VERSION, rid, fid, fro, _wrfc.owi(), true);
     ::memcpy(_fhdr_ptr_arr[fid], &fhdr, sizeof(fhdr));
 #ifdef RHM_CLEAN
     ::memset((char*)_fhdr_ptr_arr[fid] + sizeof(fhdr), RHM_CLEAN_CHAR, _sblksize - sizeof(fhdr));
@@ -1022,6 +1024,19 @@
 }
 
 void
+wmgr::rotate_page()
+{
+    _page_cb_arr[_pg_index]._state = AIO_PENDING;
+    if (_pg_offset_dblks >= JRNL_WMGR_PAGE_SIZE * JRNL_SBLK_SIZE)
+    {
+        _pg_offset_dblks = 0;
+        _pg_cntr++;
+    }
+    if (++_pg_index >= _pages)
+        _pg_index = 0;
+}
+
+void
 wmgr::clean()
 {
     if (_fhdr_base_ptr)

Modified: store/trunk/cpp/lib/jrnl/wmgr.hpp
===================================================================
--- store/trunk/cpp/lib/jrnl/wmgr.hpp	2007-11-29 14:50:56 UTC (rev 1388)
+++ store/trunk/cpp/lib/jrnl/wmgr.hpp	2007-11-29 14:51:02 UTC (rev 1389)
@@ -124,6 +124,7 @@
         const iores rotate_file();
         void dblk_roundup();
         void write_fhdr(u_int64_t rid, u_int32_t fid, size_t fro) throw (jexception);
+        void rotate_page();
         void clean();
 
         // Special version of libaio's io_prep_pwrite() which preserves the value of the data

Modified: store/trunk/cpp/lib/jrnl/wrfc.cpp
===================================================================
--- store/trunk/cpp/lib/jrnl/wrfc.cpp	2007-11-29 14:50:56 UTC (rev 1388)
+++ store/trunk/cpp/lib/jrnl/wrfc.cpp	2007-11-29 14:51:02 UTC (rev 1389)
@@ -47,7 +47,8 @@
 #else
         _rid(0),
 #endif
-        _reset_ok(false)
+        _reset_ok(false),
+        _owi(false)
 {}
 
 wrfc::~wrfc() {}
@@ -80,7 +81,10 @@
         throw jexception(jerrno::JERR__NINIT, "wrfc", "rotate");
     _fh_index++;
     if (_fh_index == _nfiles)
+    {
         _fh_index = 0;
+        _owi = !_owi; // flip owi
+    }
     _curr_fh = _fh_arr[_fh_index];
     return reset(); //Checks if file is still in use (ie not fully dequeued yet)
 }

Modified: store/trunk/cpp/lib/jrnl/wrfc.hpp
===================================================================
--- store/trunk/cpp/lib/jrnl/wrfc.hpp	2007-11-29 14:50:56 UTC (rev 1388)
+++ store/trunk/cpp/lib/jrnl/wrfc.hpp	2007-11-29 14:51:02 UTC (rev 1389)
@@ -57,6 +57,7 @@
     private:
         u_int64_t _rid;     ///< Master counter for record ID (rid)
         bool _reset_ok;     ///< Flag set when reset succeeds
+        bool _owi;          ///< Overwrite indicator
 
     public:
         wrfc();
@@ -82,6 +83,7 @@
         inline const u_int64_t get_incr_rid() { return _rid++; }
         const bool reset();
         inline const bool is_reset() const { return _reset_ok; }
+        inline const bool owi() const { return _owi; }
 
         // Convenience access methods to current file handle
 

Modified: store/trunk/cpp/tests/jrnl/unit_test_file_hdr.cpp
===================================================================
--- store/trunk/cpp/tests/jrnl/unit_test_file_hdr.cpp	2007-11-29 14:50:56 UTC (rev 1388)
+++ store/trunk/cpp/tests/jrnl/unit_test_file_hdr.cpp	2007-11-29 14:51:02 UTC (rev 1389)
@@ -49,13 +49,15 @@
     BOOST_CHECK_EQUAL(h1._eflag, 0);
     BOOST_CHECK_EQUAL(h1._uflag, 0);
     BOOST_CHECK_EQUAL(h1._rid, 0ULL);
+    BOOST_CHECK(!h1.get_owi());
 
     const u_int32_t magic = 0x89abcdefUL;    
+    const u_int16_t uflag = 0x5537;
     const u_int8_t version = 0xef;    
-    const u_int16_t uflag = 0xabcd;
     const u_int64_t rid = 0x123456789abcdef0ULL;
+    const bool owi = true;
 
-    hdr h2(magic, version, uflag, rid);
+    hdr h2(magic, version, rid, owi);
     BOOST_CHECK_EQUAL(h2._magic, magic);
     BOOST_CHECK_EQUAL(h2._version, version);
 #ifdef JRNL_LITTLE_ENDIAN
@@ -63,8 +65,20 @@
 #else
     BOOST_CHECK_EQUAL(h2._eflag, RHM_BENDIAN_FLAG);
 #endif
+    BOOST_CHECK_EQUAL(h2._uflag, hdr::HDR_OVERWRITE_INDICATOR_MASK);
+    BOOST_CHECK_EQUAL(h2._rid, rid);
+    BOOST_CHECK_EQUAL(h2.get_owi(), owi);
+    h2._uflag = uflag;
+    BOOST_CHECK(h2.get_owi());
+    h2.set_owi(true);
+    BOOST_CHECK(h2.get_owi());
     BOOST_CHECK_EQUAL(h2._uflag, uflag);
-    BOOST_CHECK_EQUAL(h2._rid, rid);
+    h2.set_owi(false);
+    BOOST_CHECK(!h2.get_owi());
+    BOOST_CHECK_EQUAL(h2._uflag, (uflag & ~hdr::HDR_OVERWRITE_INDICATOR_MASK));
+    h2.set_owi(true);
+    BOOST_CHECK(h2.get_owi());
+    BOOST_CHECK_EQUAL(h2._uflag, uflag);
 
     h1.copy(h2);    
     BOOST_CHECK_EQUAL(h1._magic, magic);
@@ -76,6 +90,8 @@
 #endif
     BOOST_CHECK_EQUAL(h1._uflag, uflag);
     BOOST_CHECK_EQUAL(h1._rid, rid);
+    BOOST_CHECK(h1.get_owi());
+    BOOST_CHECK_EQUAL(h1._uflag, uflag);
 
     h1.reset();
     BOOST_CHECK_EQUAL(h1._magic, 0UL);
@@ -83,6 +99,7 @@
     BOOST_CHECK_EQUAL(h1._eflag, 0);
     BOOST_CHECK_EQUAL(h1._uflag, 0);
     BOOST_CHECK_EQUAL(h1._rid, 0ULL);
+    BOOST_CHECK(!h1.get_owi());
 }
 
 void test_rec_tail()
@@ -104,7 +121,7 @@
     }
 
     {
-        hdr h(magic, RHM_JDAT_VERSION, 0, rid);
+        hdr h(magic, RHM_JDAT_VERSION, rid, true);
         rec_tail rt3(h);
         BOOST_CHECK_EQUAL(rt3._xmagic, xmagic);
         BOOST_CHECK_EQUAL(rt3._rid, rid);
@@ -115,7 +132,7 @@
 {
     const u_int32_t magic = 0xfedcba98UL;
     const u_int8_t version = 0xa5;
-    const u_int16_t uflag = 0x1234;
+    const u_int16_t uflag = 0x5537;
     const u_int64_t rid = 0xfedcba9876543210ULL;
     const u_int32_t fid = 0xfedcba98UL;
 #ifdef JRNL_32_BIT
@@ -124,6 +141,7 @@
     const size_t fro = 0xfedcba9876543210ULL;
 #endif
     timespec ts;
+    const bool owi = true;
 
     {
         file_hdr fh1;
@@ -139,13 +157,14 @@
         BOOST_CHECK_EQUAL(fh1._ts_nsec, 0UL);
 #else
         BOOST_CHECK_EQUAL(fh1._fro, 0ULL);
-        BOOST_CHECK_EQUAL(fh1._ts_sec, 0ULL);
+        BOOST_CHECK_EQUAL(fh1._ts_sec, 0LL);
         BOOST_CHECK_EQUAL(fh1._ts_nsec, 0ULL);
 #endif
+        BOOST_CHECK(!fh1.get_owi());
     }
 
     {
-        file_hdr fh2(magic, version, uflag, rid, fid, fro);
+        file_hdr fh2(magic, version, rid, fid, fro, owi, false);
         BOOST_CHECK_EQUAL(fh2._hdr._magic, magic);
         BOOST_CHECK_EQUAL(fh2._hdr._version, version);
 #ifdef JRNL_LITTLE_ENDIAN
@@ -153,7 +172,7 @@
 #else
         BOOST_CHECK_EQUAL(fh2._hdr._eflag, RHM_BENDIAN_FLAG);
 #endif
-        BOOST_CHECK_EQUAL(fh2._hdr._uflag, uflag);
+        BOOST_CHECK_EQUAL(fh2._hdr._uflag, hdr::HDR_OVERWRITE_INDICATOR_MASK);
         BOOST_CHECK_EQUAL(fh2._hdr._rid, rid);
         BOOST_CHECK_EQUAL(fh2._fid,fid );
         BOOST_CHECK_EQUAL(fh2._fro, fro);
@@ -168,10 +187,22 @@
         fh2.set_time(ts);
         BOOST_CHECK_EQUAL(fh2._ts_sec, ts.tv_sec);
         BOOST_CHECK_EQUAL(fh2._ts_nsec, (u_int32_t)ts.tv_nsec);
+        BOOST_CHECK(fh2.get_owi());
+
+        fh2._hdr._uflag = uflag;
+        BOOST_CHECK(fh2.get_owi());
+
+        fh2.set_owi(false);
+        BOOST_CHECK(!fh2.get_owi());
+        BOOST_CHECK_EQUAL(fh2._hdr._uflag, (uflag & ~hdr::HDR_OVERWRITE_INDICATOR_MASK));
+
+        fh2.set_owi(true);
+        BOOST_CHECK(fh2.get_owi());
+        BOOST_CHECK_EQUAL(fh2._hdr._uflag, uflag);
     }
 
     {
-        file_hdr fh3(magic, version, uflag, rid, fid, fro, true);
+        file_hdr fh3(magic, version, rid, fid, fro, owi, true);
         BOOST_CHECK_EQUAL(fh3._hdr._magic, magic);
         BOOST_CHECK_EQUAL(fh3._hdr._version, version);
 #ifdef JRNL_LITTLE_ENDIAN
@@ -179,7 +210,7 @@
 #else
         BOOST_CHECK_EQUAL(fh3._hdr._eflag, RHM_BENDIAN_FLAG);
 #endif
-        BOOST_CHECK_EQUAL(fh3._hdr._uflag, uflag);
+        BOOST_CHECK_EQUAL(fh3._hdr._uflag, hdr::HDR_OVERWRITE_INDICATOR_MASK);
         BOOST_CHECK_EQUAL(fh3._hdr._rid, rid);
         BOOST_CHECK_EQUAL(fh3._fid, fid);
         BOOST_CHECK_EQUAL(fh3._fro, fro);
@@ -192,6 +223,7 @@
     const u_int32_t magic = 0xfedcba98UL;
     const u_int8_t version = 0xa5;
     const u_int64_t rid = 0xfedcba9876543210ULL;
+    const u_int16_t uflag = 0x5537;
 #ifdef JRNL_32_BIT
     const size_t xidsize = 0xfedcba98UL;
     const size_t dsize = 0x76543210UL;
@@ -199,6 +231,7 @@
     const size_t xidsize = 0xfedcba9876543210ULL;
     const size_t dsize = 0x76543210fedcba98ULL;
 #endif
+    const bool owi = true;
 
     {
         enq_hdr eh1;
@@ -214,10 +247,11 @@
         BOOST_CHECK_EQUAL(eh1._xidsize, 0ULL);
         BOOST_CHECK_EQUAL(eh1._dsize, 0ULL);
 #endif
+        BOOST_CHECK(!eh1.get_owi());
     }
 
     {
-        enq_hdr eh2(magic, version, rid, xidsize, dsize);
+        enq_hdr eh2(magic, version, rid, xidsize, dsize, owi, false);
         BOOST_CHECK_EQUAL(eh2._hdr._magic, magic);
         BOOST_CHECK_EQUAL(eh2._hdr._version, version);
 #ifdef JRNL_LITTLE_ENDIAN
@@ -225,41 +259,58 @@
 #else
         BOOST_CHECK_EQUAL(eh2._hdr._eflag, RHM_BENDIAN_FLAG);
 #endif
-        BOOST_CHECK_EQUAL(eh2._hdr._uflag, 0);
+        BOOST_CHECK_EQUAL(eh2._hdr._uflag, hdr::HDR_OVERWRITE_INDICATOR_MASK);
         BOOST_CHECK_EQUAL(eh2._hdr._rid, rid);
         BOOST_CHECK_EQUAL(eh2._xidsize, xidsize);
         BOOST_CHECK_EQUAL(eh2._dsize, dsize);
+        BOOST_CHECK(eh2.get_owi());
         BOOST_CHECK(!eh2.is_transient());
         BOOST_CHECK(!eh2.is_external());
 
-        eh2.set_transient(true);
-        BOOST_CHECK_EQUAL(eh2._hdr._uflag, enq_hdr::ENQ_HDR_TRANSIENT_MASK);
+        eh2._hdr._uflag = uflag;
+        BOOST_CHECK(eh2.get_owi());
         BOOST_CHECK(eh2.is_transient());
-        BOOST_CHECK(!eh2.is_external());
+        BOOST_CHECK(eh2.is_external());
 
+        eh2.set_owi(false);
+        BOOST_CHECK(!eh2.get_owi());
+        BOOST_CHECK(eh2.is_transient());
+        BOOST_CHECK(eh2.is_external());
+        BOOST_CHECK_EQUAL(eh2._hdr._uflag, (uflag & ~hdr::HDR_OVERWRITE_INDICATOR_MASK));
+
+        eh2.set_owi(true);
+        BOOST_CHECK(eh2.get_owi());
+        BOOST_CHECK(eh2.is_transient());
+        BOOST_CHECK(eh2.is_external());
+        BOOST_CHECK_EQUAL(eh2._hdr._uflag, uflag);
+
         eh2.set_transient(false);
-        BOOST_CHECK_EQUAL(eh2._hdr._uflag, 0);
+        BOOST_CHECK(eh2.get_owi());
         BOOST_CHECK(!eh2.is_transient());
-        BOOST_CHECK(!eh2.is_external());
+        BOOST_CHECK(eh2.is_external());
+        BOOST_CHECK_EQUAL(eh2._hdr._uflag, uflag & ~enq_hdr::ENQ_HDR_TRANSIENT_MASK);
 
-        eh2.set_external(true);
-        BOOST_CHECK_EQUAL(eh2._hdr._uflag, enq_hdr::ENQ_HDR_EXTERNAL_MASK);
-        BOOST_CHECK(!eh2.is_transient());
+        eh2.set_transient(true);
+        BOOST_CHECK(eh2.get_owi());
+        BOOST_CHECK(eh2.is_transient());
         BOOST_CHECK(eh2.is_external());
+        BOOST_CHECK_EQUAL(eh2._hdr._uflag, uflag);
 
         eh2.set_external(false);
-        BOOST_CHECK_EQUAL(eh2._hdr._uflag, 0);
-        BOOST_CHECK(!eh2.is_transient());
+        BOOST_CHECK(eh2.get_owi());
+        BOOST_CHECK(eh2.is_transient());
         BOOST_CHECK(!eh2.is_external());
+        BOOST_CHECK_EQUAL(eh2._hdr._uflag, uflag & ~enq_hdr::ENQ_HDR_EXTERNAL_MASK);
 
-        eh2.set_transient(true);
         eh2.set_external(true);
-        BOOST_CHECK_EQUAL(eh2._hdr._uflag,
-                enq_hdr::ENQ_HDR_TRANSIENT_MASK | enq_hdr::ENQ_HDR_EXTERNAL_MASK);
+        BOOST_CHECK(eh2.get_owi());
+        BOOST_CHECK(eh2.is_transient());
+        BOOST_CHECK(eh2.is_external());
+        BOOST_CHECK_EQUAL(eh2._hdr._uflag, uflag);
     }
 
     {
-        enq_hdr eh3(magic, version, rid, xidsize, dsize, true);
+        enq_hdr eh3(magic, version, rid, xidsize, dsize, owi, true);
         BOOST_CHECK_EQUAL(eh3._hdr._magic, magic);
         BOOST_CHECK_EQUAL(eh3._hdr._version, version);
 #ifdef JRNL_LITTLE_ENDIAN
@@ -267,10 +318,12 @@
 #else
         BOOST_CHECK_EQUAL(eh3._hdr._eflag, RHM_BENDIAN_FLAG);
 #endif
-        BOOST_CHECK_EQUAL(eh3._hdr._uflag, enq_hdr::ENQ_HDR_TRANSIENT_MASK);
+        BOOST_CHECK_EQUAL(eh3._hdr._uflag,
+                enq_hdr::ENQ_HDR_TRANSIENT_MASK | hdr::HDR_OVERWRITE_INDICATOR_MASK);
         BOOST_CHECK_EQUAL(eh3._hdr._rid, rid);
         BOOST_CHECK_EQUAL(eh3._xidsize, xidsize);
         BOOST_CHECK_EQUAL(eh3._dsize, dsize);
+        BOOST_CHECK(eh3.get_owi());
         BOOST_CHECK(eh3.is_transient());
         BOOST_CHECK(!eh3.is_external());
     }
@@ -280,7 +333,7 @@
 {
     const u_int32_t magic = 0xfedcba98UL;
     const u_int8_t version = 0xa5;
-    const u_int16_t uflag = 0x1234;
+    const u_int16_t uflag = 0x5537;
     const u_int64_t rid = 0xfedcba9876543210ULL;
     const u_int64_t drid = 0x76543210fedcba98ULL;
 #ifdef JRNL_32_BIT
@@ -288,6 +341,7 @@
 #else
     const size_t xidsize = 0xfedcba9876543210ULL;
 #endif
+    const bool owi = true;
 
     {
         deq_hdr dh1;
@@ -302,10 +356,11 @@
 #else
         BOOST_CHECK_EQUAL(dh1._xidsize, 0ULL);
 #endif
+        BOOST_CHECK(!dh1.get_owi());
     }
 
     {
-        deq_hdr dh2(magic, version, uflag, rid, drid, xidsize);
+        deq_hdr dh2(magic, version, rid, drid, xidsize, owi);
         BOOST_CHECK_EQUAL(dh2._hdr._magic, magic);
         BOOST_CHECK_EQUAL(dh2._hdr._version, version);
 #ifdef JRNL_LITTLE_ENDIAN
@@ -313,10 +368,22 @@
 #else
         BOOST_CHECK_EQUAL(dh2._hdr._eflag, RHM_BENDIAN_FLAG);
 #endif
-        BOOST_CHECK_EQUAL(dh2._hdr._uflag, uflag);
+        BOOST_CHECK_EQUAL(dh2._hdr._uflag, hdr::HDR_OVERWRITE_INDICATOR_MASK);
         BOOST_CHECK_EQUAL(dh2._hdr._rid, rid);
         BOOST_CHECK_EQUAL(dh2._deq_rid, drid);
         BOOST_CHECK_EQUAL(dh2._xidsize, xidsize);
+        BOOST_CHECK(dh2.get_owi());
+
+        dh2._hdr._uflag = uflag;
+        BOOST_CHECK(dh2.get_owi());
+
+        dh2.set_owi(false);
+        BOOST_CHECK(!dh2.get_owi());
+        BOOST_CHECK_EQUAL(dh2._hdr._uflag, (uflag & ~hdr::HDR_OVERWRITE_INDICATOR_MASK));
+
+        dh2.set_owi(true);
+        BOOST_CHECK(dh2.get_owi());
+        BOOST_CHECK_EQUAL(dh2._hdr._uflag, uflag);
     }
 }
 
@@ -324,13 +391,14 @@
 {
     const u_int32_t magic = 0xfedcba98UL;
     const u_int8_t version = 0xa5;
-    const u_int16_t uflag = 0x1234;
+    const u_int16_t uflag = 0x5537;
     const u_int64_t rid = 0xfedcba9876543210ULL;
 #ifdef JRNL_32_BIT
     const size_t xidsize = 0xfedcba98UL;
 #else
     const size_t xidsize = 0xfedcba9876543210ULL;
 #endif
+    const bool owi = true;
 
     {
         txn_hdr th1;
@@ -344,10 +412,11 @@
 #else
         BOOST_CHECK_EQUAL(th1._xidsize, 0ULL);
 #endif
+        BOOST_CHECK(!th1.get_owi());
     }
 
     {
-        txn_hdr th2(magic, version, uflag, rid, xidsize);
+        txn_hdr th2(magic, version, rid, xidsize, owi);
         BOOST_CHECK_EQUAL(th2._hdr._magic, magic);
         BOOST_CHECK_EQUAL(th2._hdr._version, version);
 #ifdef JRNL_LITTLE_ENDIAN
@@ -355,9 +424,21 @@
 #else
         BOOST_CHECK_EQUAL(th2._hdr._eflag, RHM_BENDIAN_FLAG);
 #endif
-        BOOST_CHECK_EQUAL(th2._hdr._uflag, uflag);
+        BOOST_CHECK_EQUAL(th2._hdr._uflag, hdr::HDR_OVERWRITE_INDICATOR_MASK);
         BOOST_CHECK_EQUAL(th2._hdr._rid, rid);
         BOOST_CHECK_EQUAL(th2._xidsize, xidsize);
+        BOOST_CHECK(th2.get_owi());
+
+        th2._hdr._uflag = uflag;
+        BOOST_CHECK(th2.get_owi());
+
+        th2.set_owi(false);
+        BOOST_CHECK(!th2.get_owi());
+        BOOST_CHECK_EQUAL(th2._hdr._uflag, (uflag & ~hdr::HDR_OVERWRITE_INDICATOR_MASK));
+
+        th2.set_owi(true);
+        BOOST_CHECK(th2.get_owi());
+        BOOST_CHECK_EQUAL(th2._hdr._uflag, uflag);
     }
 }
 

Modified: store/trunk/cpp/tests/jrnl/unit_test_jinf.cpp
===================================================================
--- store/trunk/cpp/tests/jrnl/unit_test_jinf.cpp	2007-11-29 14:50:56 UTC (rev 1388)
+++ store/trunk/cpp/tests/jrnl/unit_test_jinf.cpp	2007-11-29 14:51:02 UTC (rev 1389)
@@ -55,7 +55,8 @@
 void create_journal_files(std::vector<std::string>& jfiles, rid_scheme scheme,
         u_int32_t min_fid_offs = 0, u_int64_t rid_offs = 0);
 void clean_journal_files(std::vector<std::string>& jfiles);
-void init_fhdr(file_hdr& fh, u_int32_t fid, u_int64_t rid, bool no_enq = false);
+void init_fhdr(file_hdr& fh, const u_int32_t fid, const u_int64_t rid, const bool owi,
+        const bool no_enq = false);
 void clean_journal_info_file();
 
 
@@ -187,7 +188,7 @@
         if (scheme == RID_NONE) // create file containing 0s
             ::memset(&fh, 0, sizeof(file_hdr));
         else
-            init_fhdr(fh, fid, rid);
+            init_fhdr(fh, fid, rid, fid >= min_fid_offs);
         
         // write file header
         int cnt = sizeof(file_hdr);
@@ -219,7 +220,8 @@
     jfiles.clear();
 }
 
-void init_fhdr(file_hdr& fh, u_int32_t fid, u_int64_t rid, bool no_enq)
+void init_fhdr(file_hdr& fh, const u_int32_t fid, const u_int64_t rid, const bool owi,
+        const bool no_enq)
 {
     fh._hdr._magic = RHM_JDAT_FILE_MAGIC;
     fh._hdr._version = RHM_JDAT_VERSION;
@@ -228,7 +230,7 @@
 #else
     fh._hdr._eflag = RHM_LENDIAN_FLAG;
 #endif
-    fh._hdr._uflag = 0;
+    fh._hdr._uflag = owi ? hdr::HDR_OVERWRITE_INDICATOR_MASK : 0;
     fh._hdr._rid = rid;
     fh._fid = fid;
     fh._fro = no_enq ? 0 : 0x200;




More information about the rhmessaging-commits mailing list