[rhmessaging-commits] rhmessaging commits: r2561 - in store/trunk/cpp: lib/jrnl and 1 other directories.

rhmessaging-commits at lists.jboss.org rhmessaging-commits at lists.jboss.org
Wed Oct 1 15:42:31 EDT 2008


Author: kpvdr
Date: 2008-10-01 15:42:31 -0400 (Wed, 01 Oct 2008)
New Revision: 2561

Added:
   store/trunk/cpp/lib/jrnl/rfc.cpp
   store/trunk/cpp/lib/jrnl/rfc.hpp
Modified:
   store/trunk/cpp/lib/Makefile.am
   store/trunk/cpp/lib/jrnl/fcntl.cpp
   store/trunk/cpp/lib/jrnl/fcntl.hpp
   store/trunk/cpp/lib/jrnl/jcntl.cpp
   store/trunk/cpp/lib/jrnl/jerrno.cpp
   store/trunk/cpp/lib/jrnl/jerrno.hpp
   store/trunk/cpp/lib/jrnl/rcvdat.hpp
   store/trunk/cpp/lib/jrnl/rmgr.cpp
   store/trunk/cpp/lib/jrnl/rmgr.hpp
   store/trunk/cpp/lib/jrnl/rrfc.cpp
   store/trunk/cpp/lib/jrnl/rrfc.hpp
   store/trunk/cpp/lib/jrnl/wmgr.cpp
   store/trunk/cpp/lib/jrnl/wrfc.cpp
   store/trunk/cpp/lib/jrnl/wrfc.hpp
   store/trunk/cpp/tests/jrnl/_st_read.cpp
Log:
Removed read file handles from class fcntl, moved a single read file to class rrfc where it is opened and closed as needed. This reduces the number of consumed file handles to a half of the previous number; now only one file handle plus one when lazy-load is active is consumed per journal file instead of two per file. Additional code tidy-up also performed, including adding class rfc as a superclass to both rrfc and wrfc.

Modified: store/trunk/cpp/lib/Makefile.am
===================================================================
--- store/trunk/cpp/lib/Makefile.am	2008-10-01 16:07:22 UTC (rev 2560)
+++ store/trunk/cpp/lib/Makefile.am	2008-10-01 19:42:31 UTC (rev 2561)
@@ -58,6 +58,7 @@
   jrnl/lf_map.cpp               \
   jrnl/pmgr.cpp                 \
   jrnl/rmgr.cpp                 \
+  jrnl/rfc.cpp                  \
   jrnl/rrfc.cpp                 \
   jrnl/slock.cpp                \
   jrnl/time_ns.cpp              \
@@ -91,6 +92,7 @@
   jrnl/rec_hdr.hpp              \
   jrnl/rec_tail.hpp             \
   jrnl/rmgr.hpp                 \
+  jrnl/rfc.hpp                  \
   jrnl/rrfc.hpp                 \
   jrnl/slock.hpp                \
   jrnl/time_ns.hpp              \

Modified: store/trunk/cpp/lib/jrnl/fcntl.cpp
===================================================================
--- store/trunk/cpp/lib/jrnl/fcntl.cpp	2008-10-01 16:07:22 UTC (rev 2560)
+++ store/trunk/cpp/lib/jrnl/fcntl.cpp	2008-10-01 19:42:31 UTC (rev 2561)
@@ -51,7 +51,6 @@
         _fid(fid),
         _lid(lid),
         _ffull_dblks(JRNL_SBLK_SIZE * (jfsize_sblks + 1)),
-        _rd_fh(-1),
         _wr_fh(-1),
         _rec_enqcnt(0),
         _rd_subm_cnt_dblks(0),
@@ -61,12 +60,12 @@
         _aio_cnt(0)
 {
     initialize(fbasename, fid, lid, jfsize_sblks, ro);
-    open_fh();
+    open_wr_fh();
 }
 
 fcntl::~fcntl()
 {
-    close_fh();
+    close_wr_fh();
 }
 
 bool
@@ -113,6 +112,32 @@
     return true;
 }
 
+int
+fcntl::open_wr_fh()
+{
+    if (_wr_fh < 0)
+    {
+        _wr_fh = ::open(_fname.c_str(), O_WRONLY | O_DIRECT, S_IRUSR | S_IWUSR | S_IRGRP | S_IROTH); // 0644 -rw-r--r--
+        if (_wr_fh < 0)
+        {
+            std::ostringstream oss;
+            oss << "file=\"" << _fname << "\"" << FORMAT_SYSERR(errno);
+            throw jexception(jerrno::JERR_FCNTL_OPENWR, oss.str(), "fcntl", "open_fh");
+        }
+    }
+    return _wr_fh;
+}
+
+void
+fcntl::close_wr_fh()
+{
+    if (_wr_fh >= 0)
+    {
+        ::close(_wr_fh);
+        _wr_fh = -1;
+    }
+}
+
 u_int32_t
 fcntl::add_enqcnt(u_int32_t a)
 {
@@ -263,41 +288,6 @@
 #endif
 }
 
-void
-fcntl::open_fh()
-{
-    _rd_fh = ::open(_fname.c_str(), O_RDONLY | O_DIRECT);
-    if (_rd_fh < 0)
-    {
-        std::ostringstream oss;
-        oss << "file=\"" << _fname << "\"" << FORMAT_SYSERR(errno);
-        throw jexception(jerrno::JERR_FCNTL_OPENRD, oss.str(), "fcntl", "open_fh");
-    }
-    _wr_fh = ::open(_fname.c_str(), O_WRONLY | O_DIRECT,
-            S_IRUSR | S_IWUSR | S_IRGRP | S_IROTH); // 0644 -rw-r--r--
-    if (_wr_fh < 0)
-    {
-        std::ostringstream oss;
-        oss << "file=\"" << _fname << "\"" << FORMAT_SYSERR(errno);
-        throw jexception(jerrno::JERR_FCNTL_OPENWR, oss.str(), "fcntl", "open_fh");
-    }
-}
-
-void
-fcntl::close_fh()
-{
-    if (_rd_fh >= 0)
-    {
-        ::close(_rd_fh);
-        _rd_fh = -1;
-    }
-    if (_wr_fh >= 0)
-    {
-        ::close(_wr_fh);
-        _wr_fh = -1;
-    }
-}
-
 std::string
 fcntl::filename(const std::string& fbasename, const u_int16_t fid)
 {

Modified: store/trunk/cpp/lib/jrnl/fcntl.hpp
===================================================================
--- store/trunk/cpp/lib/jrnl/fcntl.hpp	2008-10-01 16:07:22 UTC (rev 2560)
+++ store/trunk/cpp/lib/jrnl/fcntl.hpp	2008-10-01 19:42:31 UTC (rev 2561)
@@ -61,7 +61,6 @@
         u_int16_t _fid;                 ///< File ID (file number in order of creation)
         u_int16_t _lid;                 ///< Logical ID (ordinal number in ring store)
         const u_int32_t _ffull_dblks;   ///< File size in dblks (incl. file header)
-        int _rd_fh;                     ///< Read file handle
         int _wr_fh;                     ///< Write file handle
         u_int32_t _rec_enqcnt;          ///< Count of enqueued records
         u_int32_t _rd_subm_cnt_dblks;   ///< Read file count (data blocks) for submitted AIO
@@ -80,11 +79,14 @@
         virtual void rd_reset();
         virtual bool wr_reset(const rcvdat* const ro = 0);
 
+        virtual int open_wr_fh();
+        virtual void close_wr_fh();
+        inline bool is_wr_fh_open() const { return _wr_fh >= 0; }
+
         inline const std::string& fname() const { return _fname; }
         inline u_int16_t fid() const { return _fid; }
         inline u_int16_t lid() const { return _lid; }
         inline void set_lid(const u_int16_t lid) { _lid = lid; }
-        inline int rd_fh() const { return _rd_fh; }
         inline int wr_fh() const { return _wr_fh; }
         inline u_int32_t enqcnt() const { return _rec_enqcnt; }
         inline u_int32_t incr_enqcnt() { return ++_rec_enqcnt; }
@@ -137,8 +139,6 @@
         virtual void initialize(const std::string& fbasename, const u_int16_t fid, const u_int16_t lid,
                 const u_int32_t jfsize_sblks, const rcvdat* const ro);
 
-        virtual void open_fh();
-        virtual void close_fh();
         static std::string filename(const std::string& fbasename, const u_int16_t fid);
         void clean_file(const u_int32_t jfsize_sblks);
         void create_jfile(const u_int32_t jfsize_sblks);

Modified: store/trunk/cpp/lib/jrnl/jcntl.cpp
===================================================================
--- store/trunk/cpp/lib/jrnl/jcntl.cpp	2008-10-01 16:07:22 UTC (rev 2560)
+++ store/trunk/cpp/lib/jrnl/jcntl.cpp	2008-10-01 19:42:31 UTC (rev 2561)
@@ -140,6 +140,7 @@
 
     _wrfc.initialize(_num_jfiles, _jfsize_sblks, _fc_arr);
     _rrfc.initialize(_num_jfiles, _fc_arr);
+    _rrfc.set_findex(0);
     _rmgr.initialize(rd_cb);
     _wmgr.initialize(wr_cb, wcache_pgsize_sblks, wcache_num_pages, JRNL_WMGR_MAXDTOKPP,
             JRNL_WMGR_MAXWAITUS);
@@ -206,7 +207,8 @@
     }
 
     _wrfc.initialize(_num_jfiles, _jfsize_sblks, _fc_arr, &_rcvdat);
-    _rrfc.initialize(_num_jfiles, _fc_arr, &_rcvdat);
+    _rrfc.initialize(_num_jfiles, _fc_arr);
+    _rrfc.set_findex(_rcvdat.ffid(_num_jfiles));
     _rmgr.initialize(rd_cb);
     _wmgr.initialize(wr_cb, wcache_pgsize_sblks, wcache_num_pages, JRNL_WMGR_MAXDTOKPP,
             JRNL_WMGR_MAXWAITUS, (_rcvdat._lffull ? 0 : _rcvdat._eo));
@@ -223,7 +225,8 @@
     for (u_int16_t i=0; i<_num_jfiles; i++)
         _fc_arr[i]->reset(&_rcvdat);
     _wrfc.initialize(_num_jfiles, _jfsize_sblks, _fc_arr, &_rcvdat);
-    _rrfc.initialize(_num_jfiles, _fc_arr, &_rcvdat);
+    _rrfc.initialize(_num_jfiles, _fc_arr);
+    _rrfc.set_findex(_rcvdat.ffid(_num_jfiles));
     _rmgr.recover_complete();
     _readonly_flag = false;
 }
@@ -416,6 +419,7 @@
     _stop_flag = true;
     if (!_readonly_flag)
         flush(block_till_aio_cmpl);
+    _rrfc.reset();
 }
 
 u_int16_t
@@ -428,8 +432,8 @@
         if (++ffid >= _num_jfiles)
             ffid = 0; 
     }
-    if (ffid != _rrfc.fid())
-        _rrfc.reset(ffid);
+    if (!_rrfc.is_active())
+        _rrfc.set_findex(ffid);
     return ffid;
 }
 

Modified: store/trunk/cpp/lib/jrnl/jerrno.cpp
===================================================================
--- store/trunk/cpp/lib/jrnl/jerrno.cpp	2008-10-01 16:07:22 UTC (rev 2560)
+++ store/trunk/cpp/lib/jrnl/jerrno.cpp	2008-10-01 19:42:31 UTC (rev 2561)
@@ -75,15 +75,15 @@
 const u_int32_t jerrno::JERR_JDIR_BADFTYPE      = 0x030a;
 
 // class fcntl
-const u_int32_t jerrno::JERR_FCNTL_OPENRD       = 0x0400;
-const u_int32_t jerrno::JERR_FCNTL_OPENWR       = 0x0401;
-const u_int32_t jerrno::JERR_FCNTL_WRITE        = 0x0402;
-const u_int32_t jerrno::JERR_FCNTL_CLOSE        = 0x0403;
-const u_int32_t jerrno::JERR_FCNTL_FILEOFFSOVFL = 0x0404;
-const u_int32_t jerrno::JERR_FCNTL_CMPLOFFSOVFL = 0x0405;
-const u_int32_t jerrno::JERR_FCNTL_RDOFFSOVFL   = 0x0406;
+const u_int32_t jerrno::JERR_FCNTL_OPENWR       = 0x0400;
+const u_int32_t jerrno::JERR_FCNTL_WRITE        = 0x0401;
+const u_int32_t jerrno::JERR_FCNTL_CLOSE        = 0x0402;
+const u_int32_t jerrno::JERR_FCNTL_FILEOFFSOVFL = 0x0403;
+const u_int32_t jerrno::JERR_FCNTL_CMPLOFFSOVFL = 0x0404;
+const u_int32_t jerrno::JERR_FCNTL_RDOFFSOVFL   = 0x0405;
 
-// class file_hdr
+// class rrfc
+const u_int32_t jerrno::JERR_RRFC_OPENRD        = 0x0500;
 
 // class jrec, enq_rec, deq_rec, txn_rec
 const u_int32_t jerrno::JERR_JREC_BADRECHDR     = 0x0600;
@@ -162,7 +162,6 @@
     _err_map[JERR_JDIR_BADFTYPE] = "JERR_JDIR_BADFTYPE: Bad or unknown file type (stat mode).";
 
     // class fcntl
-    _err_map[JERR_FCNTL_OPENRD] = "JERR_FCNTL_OPENRD: Unable to open file for read.";
     _err_map[JERR_FCNTL_OPENWR] = "JERR_FCNTL_OPENWR: Unable to open file for write.";
     _err_map[JERR_FCNTL_WRITE] = "JERR_FCNTL_WRITE: Unable to write to file.";
     _err_map[JERR_FCNTL_CLOSE] = "JERR_FCNTL_CLOSE: File close failed.";
@@ -174,7 +173,8 @@
             "Attempted increase read offset past write offset.";
 
 
-    // class file_hdr
+    // class rrfc
+    _err_map[JERR_RRFC_OPENRD] = "JERR_RRFC_OPENRD: Unable to open file for read.";
 
     // class jrec, enq_rec, deq_rec, txn_rec
     _err_map[JERR_JREC_BADRECHDR] = "JERR_JREC_BADRECHDR: Invalid data record header.";

Modified: store/trunk/cpp/lib/jrnl/jerrno.hpp
===================================================================
--- store/trunk/cpp/lib/jrnl/jerrno.hpp	2008-10-01 16:07:22 UTC (rev 2560)
+++ store/trunk/cpp/lib/jrnl/jerrno.hpp	2008-10-01 19:42:31 UTC (rev 2561)
@@ -93,7 +93,6 @@
         static const u_int32_t JERR_JDIR_BADFTYPE;      ///< Bad or unknown file type (stat mode)
 
         // class fcntl
-        static const u_int32_t JERR_FCNTL_OPENRD;       ///< Unable to open file for read
         static const u_int32_t JERR_FCNTL_OPENWR;       ///< Unable to open file for write
         static const u_int32_t JERR_FCNTL_WRITE;        ///< Unable to write to file
         static const u_int32_t JERR_FCNTL_CLOSE;        ///< File close failed
@@ -101,7 +100,8 @@
         static const u_int32_t JERR_FCNTL_CMPLOFFSOVFL; ///< Increased cmpl offs past subm offs
         static const u_int32_t JERR_FCNTL_RDOFFSOVFL;   ///< Increased read offs past write offs
 
-        // class file_hdr
+        // class rrfc
+        static const u_int32_t JERR_RRFC_OPENRD;        ///< Unable to open file for read
 
         // class jrec, enq_rec, deq_rec, txn_rec
         static const u_int32_t JERR_JREC_BADRECHDR;       ///< Invalid data record header

Modified: store/trunk/cpp/lib/jrnl/rcvdat.hpp
===================================================================
--- store/trunk/cpp/lib/jrnl/rcvdat.hpp	2008-10-01 16:07:22 UTC (rev 2560)
+++ store/trunk/cpp/lib/jrnl/rcvdat.hpp	2008-10-01 19:42:31 UTC (rev 2561)
@@ -89,6 +89,18 @@
                 _enq_cnt_list.resize(num_jfiles, 0);
             }
 
+            // Find first fid with enqueued records
+            u_int16_t ffid(const u_int16_t num_jfiles)
+            {
+                u_int16_t index = _ffid;
+                while (index != _lfid && _enq_cnt_list[index] == 0)
+                {
+                    if (++index >= num_jfiles)
+                        index = 0;
+                }
+                return index;
+            }
+
             std::string to_string(std::string& jid)
             {
                 std::ostringstream oss;

Added: store/trunk/cpp/lib/jrnl/rfc.cpp
===================================================================
--- store/trunk/cpp/lib/jrnl/rfc.cpp	                        (rev 0)
+++ store/trunk/cpp/lib/jrnl/rfc.cpp	2008-10-01 19:42:31 UTC (rev 2561)
@@ -0,0 +1,84 @@
+/**
+* \file rfc.cpp
+*
+* Red Hat Messaging - Message Journal
+*
+* File containing code for class mrg::journal::rfc (rotating
+* file controller). See comments in file rfc.hpp for details.
+*
+* Copyright 2008 Red Hat, Inc.
+*
+* This file is part of Red Hat Messaging.
+*
+* Red Hat Messaging is free software; you can redistribute it and/or
+* modify it under the terms of the GNU Lesser General Public
+* License as published by the Free Software Foundation; either
+* version 2.1 of the License, or (at your option) any later version.
+*
+* This library is distributed in the hope that it will be useful,
+* but WITHOUT ANY WARRANTY; without even the implied warranty of
+* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
+* Lesser General Public License for more details.
+*
+* You should have received a copy of the GNU Lesser General Public
+* License along with this library; if not, write to the Free Software
+* Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA  02110-1301
+* USA
+*
+* The GNU Lesser General Public License is available in the file COPYING.
+*/
+
+#include "jrnl/rfc.hpp"
+
+#include <cassert>
+
+namespace mrg
+{
+namespace journal
+{
+
+rfc::rfc(): _nfiles(0), _fc_arr(0), _fc_index(0), _curr_fc(0)
+{}
+
+rfc::~rfc()
+{}
+
+void
+rfc::initialize(const u_int16_t nfiles, fcntl** fc_arr)
+{
+    _nfiles = nfiles;
+    _fc_arr = fc_arr;
+}
+
+void
+rfc::reset()
+{
+    unset_findex();
+    _nfiles = 0;
+    _fc_arr = 0;
+}
+
+void
+rfc::set_findex(const u_int16_t fc_index)
+{
+    _fc_index = fc_index;
+    _curr_fc = _fc_arr[_fc_index];
+    _curr_fc->rd_reset();
+}
+
+void
+rfc::unset_findex()
+{
+    _fc_index = 0;
+    _curr_fc = 0;
+}
+
+fcntl*
+rfc::file_controller(const u_int16_t pg_index) const
+{
+    assert(pg_index < _nfiles);
+    return _fc_arr[pg_index];
+}
+
+} // namespace journal
+} // namespace mrg

Added: store/trunk/cpp/lib/jrnl/rfc.hpp
===================================================================
--- store/trunk/cpp/lib/jrnl/rfc.hpp	                        (rev 0)
+++ store/trunk/cpp/lib/jrnl/rfc.hpp	2008-10-01 19:42:31 UTC (rev 2561)
@@ -0,0 +1,199 @@
+/**
+* \file rfc.hpp
+*
+* Red Hat Messaging - Message Journal
+*
+* File containing code for class mrg::journal::rfc (rotating
+* file controller). See class documentation for details.
+*
+* \author Kim van der Riet
+*
+* Copyright 2008 Red Hat, Inc.
+*
+* This file is part of Red Hat Messaging.
+*
+* Red Hat Messaging is free software; you can redistribute it and/or
+* modify it under the terms of the GNU Lesser General Public
+* License as published by the Free Software Foundation; either
+* version 2.1 of the License, or (at your option) any later version.
+*
+* This library is distributed in the hope that it will be useful,
+* but WITHOUT ANY WARRANTY; without even the implied warranty of
+* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
+* Lesser General Public License for more details.
+*
+* You should have received a copy of the GNU Lesser General Public
+* License along with this library; if not, write to the Free Software
+* Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA  02110-1301
+* USA
+*
+* The GNU Lesser General Public License is available in the file COPYING.
+*/
+
+#ifndef mrg_journal_rfc_hpp
+#define mrg_journal_rfc_hpp
+
+namespace mrg
+{
+namespace journal
+{
+class rfc;
+}
+}
+
+#include "jrnl/fcntl.hpp"
+#include "jrnl/enums.hpp"
+
+namespace mrg
+{
+namespace journal
+{
+
+    /**
+    * \class rfc
+    * \brief Rotating File Controller (rfc) - Class to handle the manangement of an array of file controllers (fcntl)
+    *     objects for use in a circular disk buffer (journal). Each fcntl object corresponds to a file in the journal.
+    *
+    * The following states exist in this class:
+    *
+    * <pre>
+    *                                                          is_initialized()  is_active()
+    *                  +===+                    _nfiles == 0
+    *      +---------->|   |     Uninitialized: _fc_arr == 0          F              F
+    *      |       +-->+===+ --+                _curr_fc == 0
+    *      |       |           |
+    *      |       |           |
+    *      |     reset()   initialize()
+    *      |       |           |
+    *      |       |           |
+    *      |       +-- +===+<--+                _nfiles > 0
+    *    reset()       |   |     Inactive:      _fc_arr != 0          T              F
+    *      |       +-->+===+ --+                _curr_fc == 0
+    *      |       |           |
+    *      |       |           |
+    *      | unset_findex() set_findex()
+    *      |       |           |
+    *      |       |           |
+    *      |       +-- +===+<--+                _nfiles > 0
+    *      +---------- |   |     Active:        _fc_arr != 0          T              T
+    *                  +===+                    _curr_fc != 0
+    * </pre>
+    *
+    * The Uninitialized state is where the class starts after construction. Once the number of files is known and
+    * the array of file controllers allocated, then initialize() is called to set these, causing the state to move
+    * to Inactive.
+    *
+    * Once the index of the active file is known, then calling activate() will set the index and internal pointer
+    * to the currently active file controller. This moves the state to Active.
+    *
+    * Note that if the number of files change, then the object will have to be reset and reinitialized with a new array
+    * of fcntl objects of the appropriate size.
+    */
+    class rfc
+    {
+    protected:
+        u_int16_t _nfiles;      ///< Number of data files
+        fcntl**   _fc_arr;      ///< Array of pointers to data file controllers
+        u_int16_t _fc_index;    ///< Index of current file controller
+        fcntl*    _curr_fc;     ///< Pointer to current file controller
+
+    public:
+        rfc();
+        virtual ~rfc();
+
+        /**
+        * \brief Initialize the controller, moving from state Uninitialized to Inactive. The main function of
+        *     initialize() is to set the number of files and the pointer to the fcntl array.
+        * \param nfiles Number of files in the rotating file group.
+        * \param fc_arr Pointer to an array of file controller, each of which correspond to one of
+        *     the physical journal files.
+        */
+        virtual void initialize(const u_int16_t nfiles, fcntl** fc_arr);
+
+        /**
+        * \brief Reset the controller to Uninitialized state, usually called when the journal is stopped. Once called,
+        *     initialize() must be called to reuse an instance.
+        */
+        virtual void reset();
+
+        /**
+        * /brief Check initialization state: true = Not Uninitialized, ie Initialized or Active; false = Uninitialized.
+        */
+        virtual inline bool is_initialized() const { return _nfiles == 0 || _fc_arr == 0; }
+
+        /**
+        * /brief Sets the current file index and active fcntl object. Moves to state Active.
+        */
+        virtual void set_findex(const u_int16_t fc_index);
+
+        /**
+        * /brief Nulls the current file index and active fcntl pointer, moves to state Inactive.
+        */
+        virtual void unset_findex();
+
+        /**
+        * /brief Check the file handle state: true = Active; false = Not Active, ie Inactive or Uninitialized.
+        */
+        virtual inline bool is_active() const { return _curr_fc != 0; }
+
+        /**
+        * \brief Rotate active file controller to next file in rotating file group.
+        * \exception jerrno::JERR__NINIT if called before calling initialize().
+        */
+        virtual iores rotate() = 0;
+
+        /**
+        * \brief Returns the index of the currently active file within the rotating file group.
+        */
+        inline u_int16_t index() const { return _fc_index; }
+
+        /**
+        * \brief Returns the currently active journal file controller within the rotating file group.
+        */
+        inline fcntl* file_controller() const { return _curr_fc; }
+
+        /**
+        * \brief Returns the journal file controller for the given page index within the rotating file group.
+        */
+        fcntl* file_controller(const u_int16_t pg_index) const;
+
+        /**
+        * \brief Returns the currently active file id (fid)
+        */
+        inline u_int16_t fid() const { return _curr_fc->fid(); }
+
+        // Convenience access methods to current file controller
+        // Note: Call only when in open state
+
+        virtual inline u_int32_t enqcnt() const { return _curr_fc->enqcnt(); }
+        virtual inline u_int32_t incr_enqcnt() { return _curr_fc->incr_enqcnt(); }
+        virtual inline u_int32_t incr_enqcnt(u_int16_t fid) { return _fc_arr[fid]->incr_enqcnt(); }
+        virtual inline u_int32_t add_enqcnt(u_int32_t a) { return _curr_fc->add_enqcnt(a); }
+        virtual inline u_int32_t add_enqcnt(u_int16_t fid, u_int32_t a) { return _fc_arr[fid]->add_enqcnt(a); }
+        virtual inline u_int32_t decr_enqcnt(u_int16_t fid) { return _fc_arr[fid]->decr_enqcnt(); }
+        virtual inline u_int32_t subtr_enqcnt(u_int16_t fid, u_int32_t s) { return _fc_arr[fid]->subtr_enqcnt(s); }
+
+        virtual inline u_int32_t subm_cnt_dblks() const = 0;
+        virtual inline std::size_t subm_offs() const = 0;
+        virtual inline u_int32_t add_subm_cnt_dblks(u_int32_t a) = 0;
+
+        virtual inline u_int32_t cmpl_cnt_dblks() const = 0;
+        virtual inline std::size_t cmpl_offs() const = 0;
+        virtual inline u_int32_t add_cmpl_cnt_dblks(u_int32_t a) = 0;
+
+        virtual inline bool is_void() const = 0;
+        virtual inline bool is_empty() const = 0;
+        virtual inline u_int32_t remaining_dblks() const = 0;
+        virtual inline bool is_full() const = 0;
+        virtual inline bool is_compl() const = 0;
+        virtual inline u_int32_t aio_outstanding_dblks() const = 0;
+        virtual inline bool file_rotate() const = 0;
+        
+        // Debug aid
+        virtual std::string status_str() const = 0;
+    }; // class rfc
+
+} // namespace journal
+} // namespace mrg
+
+#endif // ifndef mrg_journal_rfc_hpp

Modified: store/trunk/cpp/lib/jrnl/rmgr.cpp
===================================================================
--- store/trunk/cpp/lib/jrnl/rmgr.cpp	2008-10-01 16:07:22 UTC (rev 2560)
+++ store/trunk/cpp/lib/jrnl/rmgr.cpp	2008-10-01 19:42:31 UTC (rev 2561)
@@ -46,7 +46,6 @@
         pmgr(jc, emap, tmap),
         _rrfc(rrfc),
         _hdr(),
-        _valid(false),
         _fhdr_buffer(0),
         _fhdr_aio_cb_ptr(0),
         _fhdr_rd_outstanding(false)
@@ -303,8 +302,6 @@
             std::memcpy(&_fhdr, _fhdr_buffer, sizeof(file_hdr));
             _rrfc.add_cmpl_cnt_dblks(JRNL_SBLK_SIZE);
 
-//             std::size_t fro_dblks = _fhdr._fro / JRNL_DBLK_SIZE;
-//             _pg_offset_dblks += fro_dblks - JRNL_SBLK_SIZE;
             u_int32_t fro_dblks = (_fhdr._fro / JRNL_DBLK_SIZE) - JRNL_SBLK_SIZE;
             _pg_cntr = fro_dblks / (JRNL_RMGR_PAGE_SIZE * JRNL_SBLK_SIZE);
             u_int32_t tot_pg_offs_dblks = _pg_cntr * JRNL_RMGR_PAGE_SIZE * JRNL_SBLK_SIZE;
@@ -314,7 +311,7 @@
             _rrfc.add_cmpl_cnt_dblks(tot_pg_offs_dblks);
 
             _fhdr_rd_outstanding = false;
-            _valid = true;
+            _rrfc.set_valid();
         }
     }
 
@@ -331,12 +328,11 @@
 void
 rmgr::invalidate()
 {
-    if (_valid)
+    if (_rrfc.is_valid())
     {
-        _valid = false;
         for (int i=0; i<_cache_num_pages; i++)
             _page_cb_arr[i]._state = UNUSED;
-        _rrfc.reset();
+        _rrfc.unset_findex();
     	_pg_offset_dblks = 0;
     }
 }
@@ -353,13 +349,8 @@
     if (_aio_evt_rem)
         get_events();
 
-    if (!_valid)
-    {
-        if (_rrfc.is_void() && !_rrfc.is_wr_aio_outstanding())
-            return RHM_IORES_EMPTY;
-        else
+    if (!_rrfc.is_valid())
             return RHM_IORES_RCINVALID;
-    }
 
     // block reads until outstanding file header read completes as fro is needed to read
     if (_fhdr_rd_outstanding)
@@ -564,18 +555,18 @@
         return RHM_IORES_SUCCESS;
     for (int16_t i=0; i<num_uninit; i++)
     {
-        if (_rrfc.is_void()) // Nothing to do; this file not yet written to
-            break;
 
-        if (!_valid)
+        if (!_rrfc.is_valid())
         {
-            u_int16_t fid = _jc->get_earliest_fid();
+            _jc->get_earliest_fid(); // calls _rrfc.set_findex()
             // If this file has not yet been written to, return RHM_IORES_EMPTY
-            if (_rrfc.file_controller(fid)->rd_void())
+            if (_rrfc.is_void() && !_rrfc.is_wr_aio_outstanding())
                 return RHM_IORES_EMPTY;
-            init_file_header_read(fid);
+            init_file_header_read();
             break;
         }
+        if (_rrfc.is_void()) // Nothing to do; this file not yet written to
+            break;
         
         if (_rrfc.subm_offs() == 0)
         {
@@ -657,9 +648,9 @@
 }
 
 void
-rmgr::init_file_header_read(u_int16_t fid)
+rmgr::init_file_header_read()
 {
-    int rfh = _rrfc.file_controller(fid)->rd_fh();
+    int rfh = _rrfc.fh();
     aio::prep_pread_2(_fhdr_aio_cb_ptr, rfh, _fhdr_buffer, _sblksize, 0);
     if (aio::submit(_ioctx, 1, &_fhdr_aio_cb_ptr) < 0)
         throw jexception(jerrno::JERR__AIO, "rmgr", "init_file_header_read");
@@ -673,136 +664,13 @@
 rmgr::get(const u_int64_t& rid, const std::size_t& dsize, const std::size_t& dsize_avail,
         const void** const data, bool auto_discard)
 {
-    iores res = pre_read_check(0);
-    if (res != RHM_IORES_SUCCESS)
-        return res;
-
-    _hdr.reset();
-    // Read header, determine next record type
-    while (true)
-    {
-        if(dblks_rem() == 0 && _rrfc.is_compl() && !_rrfc.is_wr_aio_outstanding())
-        {
-            aio_cycle();   // check if any AIOs have returned
-            return RHM_IORES_EMPTY;
-        }
-        if (_page_cb_arr[_pg_index]._state != AIO_COMPLETE)
-        {
-            aio_cycle();
-            return RHM_IORES_PAGE_AIOWAIT;
-        }
-        void* rptr = (void*)((char*)_page_ptr_arr[_pg_index] +
-                (_pg_offset_dblks * JRNL_DBLK_SIZE));
-        std::memcpy(&_hdr, rptr, sizeof(hdr));
-        switch (_hdr._magic)
-        {
-            case RHM_JDAT_ENQ_MAGIC:
-                {
-                    std::size_t xid_size = *((std::size_t*)((char*)rptr + sizeof(hdr) 
-#if defined(JRNL_BIG_ENDIAN) && defined(JRNL_32_BIT)
-                            + sizeof(u_int32_t) // filler0
-#endif
-                            ));
-                    std::size_t data_size = *((std::size_t*)((char*)rptr + sizeof(hdr) + sizeof(u_int64_t)
-#if defined(JRNL_BIG_ENDIAN) && defined(JRNL_32_BIT)
-                            + sizeof(u_int32_t) // filler1
-#endif
-                            ));
-                    // TODO: Check if transaction is still in transaction map. If so, block read
-                    // (unless in recovery, in whcih case return info normally
-//                     std::string xid = ?? (decode xid here)
-//                     if (xid_size && !readonly && tx_map.exists(xid))
-//                         return RHM_IORES_TXPENDING;
-                    rid = _hdr._rid;
-                    dsize = data_size;
-
-                    // Analyze how much of message is available
-                    void* data_ptr = (char*)rptr + sizeof(enq_hdr) + xid_size;
-                    void* page_end_ptr = (char*)_page_base_ptr + _cache_pgsize_sblks * _sblksize * _cache_num_pages;
-                    u_int16_t data_start_pg_index = _pg_index;
-                    u_int16_t data_start_pg_index = _pg_index;
-                    for (u_int16_t i=0; i<_cache_num_pages; i++)
-                    {
-                        pi = (i + _pg_index) % _cache_num_pages;
-                        if (data_ptr >= _page_ptr_arr[pi] &&
-                                data_ptr < (char*)_page_ptr_arr[pi] + _cache_pgsize_sblks * _sblksize)
-                            data_end_pg_index = pi; // found start page index
-                        
-                    }
-                    u_int16_t data_end_pg_index;
-                    u_int16_t last_pg_avail_index;
-                    
-                    void* data_ptr = (char*)rptr + sizeof(enq_hdr) + xid_size;
-                    void* page_end_ptr = (char*)_page_base_ptr + _cache_pgsize_sblks * _sblksize * _cache_num_pages;
-                    if (data_ptr >= page_end_ptr) // folded, go back to first page...
-                        data_ptr = (char*)_page_base_ptr + data_ptr - page_end_ptr;
-                    void* data_end_ptr = (char*)data_ptr + data_size;
-                    if (data_end_ptr >= page_end_ptr) // folded, go back to first page...
-                        data_end_ptr = (char*)_page_base_ptr + data_end_ptr - page_end_ptr;
-                    dsize_avail = ??;
-                    if(data_ptr folded)
-                    else
-                        *data = data_ptr;
-                }
-                break;
-            case RHM_JDAT_DEQ_MAGIC:
-                consume_deq();
-                break;
-            case RHM_JDAT_EMPTY_MAGIC:
-                consume_filler();
-                break;
-            default:
-                std::ostringstream oss;
-                oss << std::hex << std::setfill('0');
-                oss << "Magic=0x" << std::setw(8) << _hdr._magic << std::dec;
-                throw jexception(jerrno::JERR_RMGR_UNKNOWNMAGIC, oss.str(), "rmgr", "get");
-        } // switch(_hdr._magic)
-    } // while
+    return RHM_IORES_SUCCESS;
 }
 
 const iores
 rmgr::discard(data_tok* dtokp)
 {
-    iores res = pre_read_check(dtokp);
-    if (res != RHM_IORES_SUCCESS)
-        return res;
-
-    _hdr.reset();
-    // Read header, determine next record type
-    while (true)
-    {
-        if(dblks_rem() == 0 && _rrfc.is_compl() && !_rrfc.is_wr_aio_outstanding())
-        {
-            aio_cycle();   // check if any AIOs have returned
-            return RHM_IORES_EMPTY;
-        }
-        if (_page_cb_arr[_pg_index]._state != AIO_COMPLETE)
-        {
-            aio_cycle();
-            return RHM_IORES_PAGE_AIOWAIT;
-        }
-        void* rptr = (void*)((char*)_page_ptr_arr[_pg_index] +
-                (_pg_offset_dblks * JRNL_DBLK_SIZE));
-        std::memcpy(&_hdr, rptr, sizeof(hdr));
-        switch (_hdr._magic)
-        {
-            case RHM_JDAT_ENQ_MAGIC:
-                {
-                }
-                break;
-            case RHM_JDAT_DEQ_MAGIC:
-                consume_deq();
-                break;
-            case RHM_JDAT_EMPTY_MAGIC:
-                consume_filler();
-                break;
-            default:
-                std::ostringstream oss;
-                oss << std::hex << std::setfill('0');
-                oss << "Magic=0x" << std::setw(8) << _hdr._magic << std::dec;
-                throw jexception(jerrno::JERR_RMGR_UNKNOWNMAGIC, oss.str(), "rmgr", "discard");
-        } // switch
-    } // while
+    return RHM_IORES_SUCCESS;
 }
 */
 

Modified: store/trunk/cpp/lib/jrnl/rmgr.hpp
===================================================================
--- store/trunk/cpp/lib/jrnl/rmgr.hpp	2008-10-01 16:07:22 UTC (rev 2560)
+++ store/trunk/cpp/lib/jrnl/rmgr.hpp	2008-10-01 19:42:31 UTC (rev 2561)
@@ -65,7 +65,6 @@
         rec_hdr _hdr;               ///< Header used to determind record type
         rd_aio_cb _cb;              ///< Callback function pointer for AIO events
 
-        bool _valid;                ///< Flag is true when read pages contain vailid data
         void* _fhdr_buffer;         ///< Buffer used for fhdr reads
         aio_cb* _fhdr_aio_cb_ptr;   ///< iocb pointer for fhdr reads
         file_hdr _fhdr;             ///< file header instance for reading file headers
@@ -81,8 +80,7 @@
                 bool ignore_pending_txns);
         u_int32_t get_events(page_state state = AIO_COMPLETE);
         void recover_complete();
-        inline bool is_valid() const {return _valid; }
-        inline iores synchronize() { if (!_valid) return aio_cycle(); return RHM_IORES_SUCCESS; }
+        inline iores synchronize() { if (_rrfc.is_valid()) return RHM_IORES_SUCCESS; return aio_cycle(); }
         void invalidate();
         
         /* TODO (if required)
@@ -106,7 +104,7 @@
         u_int32_t dblks_rem() const;
         void set_params_null(void** const datapp, std::size_t& dsize, void** const xidpp,
                 std::size_t& xidsize);
-        void init_file_header_read(u_int16_t fid);
+        void init_file_header_read();
     };
 
 } // namespace journal

Modified: store/trunk/cpp/lib/jrnl/rrfc.cpp
===================================================================
--- store/trunk/cpp/lib/jrnl/rrfc.cpp	2008-10-01 16:07:22 UTC (rev 2560)
+++ store/trunk/cpp/lib/jrnl/rrfc.cpp	2008-10-01 19:42:31 UTC (rev 2561)
@@ -31,7 +31,8 @@
 
 #include "jrnl/rrfc.hpp"
 
-#include <cassert>
+#include <cerrno>
+#include <fcntl.h>
 #include "jrnl/jerrno.hpp"
 #include "jrnl/jexception.hpp"
 
@@ -40,44 +41,44 @@
 namespace journal
 {
 
-rrfc::rrfc():
-        _nfiles(0),
-        _fc_arr(0),
-        _fc_index(0),
-        _curr_fc(0)
+rrfc::rrfc(): rfc(), _fh(-1), _valid(false)
 {}
 
-rrfc::~rrfc() {}
+rrfc::~rrfc()
+{
+    close_fh();
+}
 
 void
-rrfc::initialize(const u_int16_t nfiles, fcntl** fc_arr, rcvdat* rdp)
+rrfc::initialize(const u_int16_t nfiles, fcntl** fc_arr)
 {
-    _nfiles = nfiles;
-    _fc_arr = fc_arr;
-    if (rdp)
-    {
-        // Find first file with enqueued records
-        u_int16_t index = rdp->_ffid;
-        while (index != rdp->_lfid && rdp->_enq_cnt_list[index] == 0)
-        {
-            if (++index >= _nfiles)
-                index = 0;
-        }
-        reset(index);
-    }
-    else
-        reset(0);
+    rfc::initialize(nfiles, fc_arr);
+    _valid = false;    
 }
 
 void
-rrfc::reset(u_int16_t fc_index)
+rrfc::reset()
 {
-    _fc_index = fc_index;
-    _curr_fc = _fc_arr[_fc_index];
-    _curr_fc->rd_reset();
+    unset_findex();
+    rfc::reset();
 }
 
-bool
+void
+rrfc::set_findex(const u_int16_t fc_index)
+{
+    rfc::set_findex(fc_index);
+    open_fh(_curr_fc->fname());
+}
+
+void
+rrfc::unset_findex()
+{
+    set_invalid();
+    close_fh();
+    rfc::unset_findex();
+}
+
+iores
 rrfc::rotate()
 {
     if (!_nfiles)
@@ -88,16 +89,11 @@
     fcntl* next_fc = _fc_arr[next_fc_index];
     _fc_index = next_fc_index;
     _curr_fc = next_fc;
-    return true;
+    open_fh(_curr_fc->fname());
+    return RHM_IORES_SUCCESS;
 }
 
-fcntl*
-rrfc::file_controller(u_int16_t pg_index) const
-{
-    assert(pg_index < _nfiles);
-    return _fc_arr[pg_index];
-}
-
+// TODO: update this to reflect all status data
 std::string
 rrfc::status_str() const
 {
@@ -106,5 +102,30 @@
     return oss.str();
 }
 
+// === protected functions ===
+
+void
+rrfc::open_fh(const std::string& fn)
+{
+    close_fh();
+    _fh = ::open(fn.c_str(), O_RDONLY | O_DIRECT);
+    if (_fh < 0)
+    {
+        std::ostringstream oss;
+        oss << "file=\"" << fn << "\"" << FORMAT_SYSERR(errno);
+        throw jexception(jerrno::JERR_RRFC_OPENRD, oss.str(), "rrfc", "open_fh");
+    }
+}
+
+void
+rrfc::close_fh()
+{
+    if (_fh >= 0)
+    {
+        ::close(_fh);
+        _fh = -1;
+    }
+}
+
 } // namespace journal
 } // namespace mrg

Modified: store/trunk/cpp/lib/jrnl/rrfc.hpp
===================================================================
--- store/trunk/cpp/lib/jrnl/rrfc.hpp	2008-10-01 16:07:22 UTC (rev 2560)
+++ store/trunk/cpp/lib/jrnl/rrfc.hpp	2008-10-01 19:42:31 UTC (rev 2561)
@@ -8,7 +8,7 @@
 *
 * \author Kim van der Riet
 *
-* Copyright 2007 Red Hat, Inc.
+* Copyright 2007, 2008 Red Hat, Inc.
 *
 * This file is part of Red Hat Messaging.
 *
@@ -41,8 +41,8 @@
 }
 }
 
-#include <cstddef>
 #include "jrnl/fcntl.hpp"
+#include "jrnl/rfc.hpp"
 
 namespace mrg
 {
@@ -51,93 +51,130 @@
 
     /**
     * \class rrfc
-    * \brief Class to handle read manangement of a journal rotating file controller.
+    * \brief Read Rotating File Controller (rrfc) - Subclassed from pure virtual class rfc. Used to control the read
+    *     pipeline in a rotating file buffer or journal. See class rfc for further details.
+    *
+    * The states that exist in this class are identical to class rfc from which it inherits, but in addition, the value
+    * of the read file handle _fh is also considered. The calls to set_findex also opens the file handle _fh to the
+    * active file for reading. Similarly, unset_findex() closes this file handle.
+    *
+    * <pre>
+    *                                                          is_initialized()  is_active()
+    *                  +===+                    _nfiles == 0
+    *      +---------->|   |     Uninitialized: _fc_arr == 0          F              F
+    *      |       +-->+===+ --+                _curr_fc == 0
+    *      |       |           |                _fh == -1
+    *      |       |           |
+    *      |     reset()   initialize()
+    *      |       |           |
+    *      |       |           |
+    *      |       +-- +===+<--+                _nfiles > 0
+    *    reset()       |   |     Inactive:      _fc_arr != 0          T              F
+    *      |       +-->+===+ --+                _curr_fc == 0
+    *      |       |           |                _fh == -1
+    *      |       |           |
+    *      | unset_findex() set_findex()
+    *      |       |           |
+    *      |       |           |
+    *      |       +-- +===+<--+                _nfiles > 0
+    *      +---------- |   |     Active:        _fc_arr != 0          T              T
+    *                  +===+                    _curr_fc != 0
+    *                                           _fh >= 0
+    * </pre>
+    *
+    * In adition to the states above, class rrfc contains a validity flag. This is operated indepenedently of the state
+    * machine. This flag (_valid) indicates when the read buffers are valid for reading. This is not strictly a state,
+    * but simply a flag used to keep track of the status, and is set/unset with calls to set_valid() and set_invalid()
+    * respectively.
     */
-    class rrfc
+    class rrfc : public rfc
     {
     protected:
-        u_int16_t _nfiles;          ///< Number of data files
-        fcntl** _fc_arr;            ///< Array of pointers to data file controllers
-        u_int16_t _fc_index;        ///< Index of current file controller
-        fcntl* _curr_fc;            ///< Pointer to current file controller
+        int _fh;                ///< Read file handle
+        bool _valid;            ///< Flag is true when read pages contain vailid data
 
     public:
         rrfc();
         virtual ~rrfc();
 
         /**
-        * \brief Initialize the controller.
+        * \brief Initialize the controller, moving from state Uninitialized to Initialized. The main function of
+        *     initialize() is to set the number of files and the pointer to the fcntl array.
         * \param nfiles Number of files in the rotating file group.
         * \param fc_arr Pointer to an array of file controller, each of which correspond to one of
         *     the physical journal files.
-        * \param rdp Struct carrying restore information. Optional for non-restore use, defaults to
-        *     0 (NULL).
         */
-        virtual void initialize(const u_int16_t nfiles, fcntl** fc_arr, rcvdat* rdp = 0);
+        virtual void initialize(const u_int16_t nfiles, fcntl** fc_arr);
 
-        void reset(u_int16_t fc_index = 0);
+        /**
+        * \brief Reset the controller to Uninitialized state, usually called when the journal is stopped. Once called,
+        *     initialize() must be called to reuse an instance.
+        */
+        void reset();
 
         /**
-        * \brief Rotate active file controller to next file in rotating file group.
-        * \exception jerrno::JERR__NINIT if called before calling initialize().
+        * /brief Opens the file handle for reading a particular fid. Moves to state open.
         */
-        bool rotate();
+        virtual void set_findex(const u_int16_t fc_index);
 
         /**
-        * \brief Returns the index of the currently active file within the rotating
-        *     file group.
+        * /brief Closes the read file handle and nulls the active fcntl pointer. Moves to state closed.
         */
-        inline u_int16_t index() const { return _fc_index; }
+        virtual void unset_findex();
 
         /**
-        * \brief Returns the currently active journal file controller within the rotating
-        *     file group.
+        * /brief Check the state: true = open; false = closed.
         */
-        inline fcntl* file_controller() const { return _curr_fc; }
+        virtual inline bool is_active() const { return _curr_fc != 0 && _fh >= 0; }
 
         /**
-        * \brief Returns the journal file controller for the given page index within the rotating
-        *     file group.
+        * /brief Sets the validity flag which indicates that the read buffers contain valid data for reading.
         */
-        fcntl* file_controller(u_int16_t pg_index) const;
+        inline void set_invalid() { _valid = false; }
 
-        // Convenience access methods to current file controller
+        /**
+        * /brief Resets the validity flag wich indicates that the read buffers are no longer synchronized and cannot
+        *     be read whithout resynchronization.
+        */
+        inline void set_valid() { _valid = true; }
 
-        inline u_int16_t fid() const { return _curr_fc->fid(); }
-        inline int fh() const { return _curr_fc->rd_fh(); }
-        inline u_int32_t enqcnt() const { return _curr_fc->enqcnt(); }
-        inline u_int32_t incr_enqcnt() { return _curr_fc->incr_enqcnt(); }
-        inline u_int32_t incr_enqcnt(u_int16_t fid) { return _fc_arr[fid]->incr_enqcnt(); }
-        inline u_int32_t add_enqcnt(u_int32_t a) { return _curr_fc->add_enqcnt(a); }
-        inline u_int32_t add_enqcnt(u_int16_t fid, u_int32_t a)
-                { return _fc_arr[fid]->add_enqcnt(a); }
-        inline u_int32_t decr_enqcnt(u_int16_t fid) { return _fc_arr[fid]->decr_enqcnt(); }
-        inline u_int32_t subtr_enqcnt(u_int16_t fid, u_int32_t s)
-                { return _fc_arr[fid]->subtr_enqcnt(s); }
+        /**
+        * /brief Checks the read buffer validity status: true = valid, can be read; false = invalid, synchronization
+        *     required.
+        */
+        inline bool is_valid() const { return _valid; }
 
+        /**
+        * \brief Rotate active file controller to next file in rotating file group.
+        * \exception jerrno::JERR__NINIT if called before calling initialize().
+        */
+        iores rotate();
+
+        inline int fh() const { return _fh; }
+
         inline u_int32_t subm_cnt_dblks() const { return _curr_fc->rd_subm_cnt_dblks(); }
         inline std::size_t subm_offs() const { return _curr_fc->rd_subm_offs(); }
-        inline u_int32_t add_subm_cnt_dblks(u_int32_t a)
-                { return _curr_fc->add_rd_subm_cnt_dblks(a); }
+        inline u_int32_t add_subm_cnt_dblks(u_int32_t a) { return _curr_fc->add_rd_subm_cnt_dblks(a); }
 
         inline u_int32_t cmpl_cnt_dblks() const { return _curr_fc->rd_cmpl_cnt_dblks(); }
         inline std::size_t cmpl_offs() const { return _curr_fc->rd_cmpl_offs(); }
-        inline u_int32_t add_cmpl_cnt_dblks(u_int32_t a)
-                { return _curr_fc->add_rd_cmpl_cnt_dblks(a); }
+        inline u_int32_t add_cmpl_cnt_dblks(u_int32_t a) { return _curr_fc->add_rd_cmpl_cnt_dblks(a); }
 
         inline bool is_void() const { return _curr_fc->rd_void(); }
         inline bool is_empty() const { return _curr_fc->rd_empty(); }
-        inline u_int32_t remaining_dblks() { return _curr_fc->rd_remaining_dblks(); }
+        inline u_int32_t remaining_dblks() const { return _curr_fc->rd_remaining_dblks(); }
         inline bool is_full() const { return _curr_fc->is_rd_full(); }
         inline bool is_compl() const { return _curr_fc->is_rd_compl(); }
-        inline u_int32_t aio_outstanding_dblks() const
-                { return _curr_fc->rd_aio_outstanding_dblks(); }
+        inline u_int32_t aio_outstanding_dblks() const { return _curr_fc->rd_aio_outstanding_dblks(); }
         inline bool file_rotate() const { return _curr_fc->rd_file_rotate(); }
-        inline bool is_wr_aio_outstanding() const
-                { return _curr_fc->wr_aio_outstanding_dblks() > 0; }
+        inline bool is_wr_aio_outstanding() const { return _curr_fc->wr_aio_outstanding_dblks() > 0; }
         
         // Debug aid
         std::string status_str() const;
+
+    protected:
+        void open_fh(const std::string& fn);
+        void close_fh();
     }; // class rrfc
 
 } // namespace journal

Modified: store/trunk/cpp/lib/jrnl/wmgr.cpp
===================================================================
--- store/trunk/cpp/lib/jrnl/wmgr.cpp	2008-10-01 16:07:22 UTC (rev 2560)
+++ store/trunk/cpp/lib/jrnl/wmgr.cpp	2008-10-01 19:42:31 UTC (rev 2561)
@@ -935,9 +935,9 @@
         ) const
 {
     // Check status of current file
-    if (!_wrfc.is_reset())
+    if (!_wrfc.is_wr_reset())
     {
-        if (!_wrfc.reset())
+        if (!_wrfc.wr_reset())
             return RHM_IORES_FULL;
     }
 

Modified: store/trunk/cpp/lib/jrnl/wrfc.cpp
===================================================================
--- store/trunk/cpp/lib/jrnl/wrfc.cpp	2008-10-01 16:07:22 UTC (rev 2560)
+++ store/trunk/cpp/lib/jrnl/wrfc.cpp	2008-10-01 19:42:31 UTC (rev 2561)
@@ -40,7 +40,7 @@
 {
 
 wrfc::wrfc():
-        rrfc(),
+        rfc(),
         _fsize_sblks(0),
         _fsize_dblks(0),
         _enq_cap_offs_dblks(0),
@@ -55,7 +55,8 @@
         _frot(true)
 {}
 
-wrfc::~wrfc() {}
+wrfc::~wrfc()
+{}
 
 void
 wrfc::initialize(const u_int16_t nfiles, const u_int32_t fsize_sblks, fcntl** fc_arr, rcvdat* rdp)
@@ -76,7 +77,8 @@
     }
     else
     {
-        rrfc::initialize(nfiles, fc_arr);
+        rfc::initialize(nfiles, fc_arr);
+        rfc::set_findex(0);
 #ifdef DRHM_TESTVALS
         _rid = u_int64_t(0xffeeddcc) << 32;
 #else
@@ -93,8 +95,7 @@
         _enq_cap_offs_dblks = _fsize_dblks;
 }
 
-iores
-wrfc::rotate()
+iores wrfc::rotate()
 {
     if (!_nfiles)
         throw jexception(jerrno::JERR__NINIT, "wrfc", "rotate");
@@ -108,13 +109,12 @@
     _curr_fc = _fc_arr[_fc_index];
     if (_curr_fc->aio_cnt())
         return RHM_IORES_FILE_AIOWAIT;
-    if (!reset()) //Checks if file is still in use (ie not fully dequeued yet)
+    if (!wr_reset()) //Checks if file is still in use (ie not fully dequeued yet)
         return RHM_IORES_FULL;
     return RHM_IORES_SUCCESS;
 }
 
-u_int16_t
-wrfc::earliest_index() const
+u_int16_t wrfc::earliest_index() const
 {
     if (_frot)
         return 0;
@@ -125,31 +125,6 @@
 }
 
 bool
-wrfc::reset()
-{
-    _reset_ok = _curr_fc->reset(); // returns false if full (ie file still contains enqueued recs)
-    return _reset_ok;
-}
-
-
-/**
-* The following routine finds whether the next write will take the write pointer to beyond the
-* enqueue limit threshold. The following illustrates how this is achieved.
-*
-* Current file index: 4                         +---+----------+
-* X's mark still-enqueued records               |msg| 1-thresh |
-* msg = current msg size + unwritten cache      +---+----------+
-* thresh = JRNL_ENQ_THRESHOLD as a fraction     ^              V
-*            +-------+-------+-------+-------+--+----+-------+-+-----+-------+
-* file num ->|   0   |   1   |   2   |   3   |   4   |   5   |   6   |   7   |
-* enq recs ->| X  XX |XX XXX |XX XXXX|XXXXXXX|XX     |       |       |     X |
-*            +-------+-------+-------+-------+--+----+-------+-+-----+-------+
-*                                               ^        ^       ^
-*                                  subm_dblks --+        |       |
-*                                                      These files must be free of enqueues
-*                                                      If not, return true.
-*/
-bool
 wrfc::enq_threshold(const u_int32_t enq_dsize_dblks) const
 {
     u_int32_t subm_dblks = subm_cnt_dblks(); // includes file hdr if > 0
@@ -176,6 +151,13 @@
     return findex != _fc_index && in_use;
 }
 
+bool wrfc::wr_reset()
+{
+    _reset_ok = _curr_fc->reset(); // returns false if full (ie file still contains enqueued recs)
+    return _reset_ok;
+}
+
+// TODO: update this to reflect all status data
 std::string
 wrfc::status_str() const
 {

Modified: store/trunk/cpp/lib/jrnl/wrfc.hpp
===================================================================
--- store/trunk/cpp/lib/jrnl/wrfc.hpp	2008-10-01 16:07:22 UTC (rev 2560)
+++ store/trunk/cpp/lib/jrnl/wrfc.hpp	2008-10-01 19:42:31 UTC (rev 2561)
@@ -28,8 +28,8 @@
 * The GNU Lesser General Public License is available in the file COPYING.
 */
 
-#ifndef mrg_journal_rfc_hpp
-#define mrg_journal_rfc_hpp
+#ifndef mrg_journal_wrfc_hpp
+#define mrg_journal_wrfc_hpp
 
 namespace mrg
 {
@@ -52,7 +52,7 @@
     * \class wrfc
     * \brief Class to handle write manangement of a journal rotating file controller.
     */
-    class wrfc : public rrfc
+    class wrfc : public rfc
     {
     private:
         u_int32_t _fsize_sblks;         ///< Size of journal files in sblks
@@ -62,6 +62,7 @@
         bool _reset_ok;                 ///< Flag set when reset succeeds
         bool _owi;                      ///< Overwrite indicator
         bool _frot;                     ///< Flag is true for first rotation, false otherwise
+
     public:
         wrfc();
         virtual ~wrfc();
@@ -91,10 +92,33 @@
         */
         u_int16_t earliest_index() const;
 
+        /**
+        * \brief Determines if a paroposed write would cause the enqueue threshold to be exceeded.
+        *
+        * The following routine finds whether the next write will take the write pointer to beyond the
+        * enqueue limit threshold. The following illustrates how this is achieved.
+        * <pre>
+        * Current file index: 4                         +---+----------+
+        * X's mark still-enqueued records               |msg| 1-thresh |
+        * msg = current msg size + unwritten cache      +---+----------+
+        * thresh = JRNL_ENQ_THRESHOLD as a fraction     ^              V
+        *            +-------+-------+-------+-------+--+----+-------+-+-----+-------+
+        * file num ->|   0   |   1   |   2   |   3   |   4   |   5   |   6   |   7   |
+        * enq recs ->| X  XX |XX XXX |XX XXXX|XXXXXXX|XX     |       |       |     X |
+        *            +-------+-------+-------+-------+--+----+-------+-+-----+-------+
+        *                                               ^        ^       ^
+        *                                  subm_dblks --+        |       |
+        *                                                      These files must be free of enqueues
+        *                                                      If not, return true.
+        * </pre>
+        * \param enq_dsize_dblks Proposed size of write in dblocks
+        */
+        bool enq_threshold(const u_int32_t enq_dsize_dblks) const;
+
         inline u_int64_t rid() const { return _rid; }
         inline u_int64_t get_incr_rid() { return _rid++; }
-        bool reset();
-        inline bool is_reset() const { return _reset_ok; }
+        bool wr_reset();
+        inline bool is_wr_reset() const { return _reset_ok; }
         inline bool owi() const { return _owi; }
         inline bool frot() const { return _frot; }
 
@@ -104,13 +128,11 @@
 
         inline u_int32_t subm_cnt_dblks() const { return _curr_fc->wr_subm_cnt_dblks(); }
         inline std::size_t subm_offs() const { return _curr_fc->wr_subm_offs(); }
-        inline u_int32_t add_subm_cnt_dblks(u_int32_t a)
-                { return _curr_fc->add_wr_subm_cnt_dblks(a); }
+        inline u_int32_t add_subm_cnt_dblks(u_int32_t a) { return _curr_fc->add_wr_subm_cnt_dblks(a); }
 
         inline u_int32_t cmpl_cnt_dblks() const { return _curr_fc->wr_cmpl_cnt_dblks(); }
         inline std::size_t cmpl_offs() const { return _curr_fc->wr_cmpl_offs(); }
-        inline u_int32_t add_cmpl_cnt_dblks(u_int32_t a)
-                { return _curr_fc->add_wr_cmpl_cnt_dblks(a); }
+        inline u_int32_t add_cmpl_cnt_dblks(u_int32_t a) { return _curr_fc->add_wr_cmpl_cnt_dblks(a); }
 
         inline u_int16_t aio_cnt() const { return _curr_fc->aio_cnt(); }
         inline u_int16_t incr_aio_cnt() { return _curr_fc->incr_aio_cnt(); }
@@ -121,16 +143,14 @@
         inline u_int32_t remaining_dblks() const { return _curr_fc->wr_remaining_dblks(); }
         inline bool is_full() const { return _curr_fc->is_wr_full(); };
         inline bool is_compl() const { return _curr_fc->is_wr_compl(); };
-        inline u_int32_t aio_outstanding_dblks() const
-                { return _curr_fc->wr_aio_outstanding_dblks(); }
+        inline u_int32_t aio_outstanding_dblks() const { return _curr_fc->wr_aio_outstanding_dblks(); }
         inline bool file_rotate() const { return _curr_fc->wr_file_rotate(); }
-        bool enq_threshold(const u_int32_t enq_dsize_dblks) const;
-        
+       
         // Debug aid
         std::string status_str() const;
-    }; // class wrfc
+    };
 
 } // namespace journal
 } // namespace mrg
 
-#endif // ifndef mrg_journal_rfc_hpp
+#endif // ifndef mrg_journal_wrfc_hpp

Modified: store/trunk/cpp/tests/jrnl/_st_read.cpp
===================================================================
--- store/trunk/cpp/tests/jrnl/_st_read.cpp	2008-10-01 16:07:22 UTC (rev 2560)
+++ store/trunk/cpp/tests/jrnl/_st_read.cpp	2008-10-01 19:42:31 UTC (rev 2561)
@@ -98,6 +98,39 @@
     cout << "ok" << endl;
 }
 
+QPID_AUTO_TEST_CASE(multi_page_enqueue_read_dequeue_block)
+{
+    string test_name = get_test_name(test_filename, "multi_page_read");
+    try
+    {
+        string msg;
+        string rmsg;
+        string xid;
+        bool transientFlag;
+        bool externalFlag;
+
+        test_jrnl jc(test_name, test_dir, test_name);
+        jc.initialize(2*NUM_TEST_JFILES, 10*TEST_JFSIZE_SBLKS);
+        for (int m=0; m<NUM_MSGS*2000; m++)
+            enq_msg(jc, m, create_msg(msg, m, MSG_SIZE), false);
+        jc.flush();
+        for (int m=0; m<NUM_MSGS*2000; m++)
+        {
+            read_msg(jc, rmsg, xid, transientFlag, externalFlag);
+            BOOST_CHECK_EQUAL(create_msg(msg, m, MSG_SIZE), rmsg);
+            BOOST_CHECK_EQUAL(xid.size(), std::size_t(0));
+            BOOST_CHECK_EQUAL(transientFlag, false);
+            BOOST_CHECK_EQUAL(externalFlag, false);
+        }
+        read_msg(jc, rmsg, xid, transientFlag, externalFlag, RHM_IORES_EMPTY);
+        for (int m=0; m<NUM_MSGS*2000; m++)
+            deq_msg(jc, m);
+        read_msg(jc, rmsg, xid, transientFlag, externalFlag, RHM_IORES_EMPTY);
+    }
+    catch(const exception& e) { BOOST_FAIL(e.what()); }
+    cout << "ok" << endl;
+}
+
 QPID_AUTO_TEST_CASE(enqueue_read_dequeue_interleaved)
 {
     string test_name = get_test_name(test_filename, "enqueue_read_dequeue_interleaved");
@@ -171,6 +204,48 @@
     cout << "ok" << endl;
 }
 
+QPID_AUTO_TEST_CASE(multi_page_enqueue_recovered_read_dequeue)
+{
+    string test_name = get_test_name(test_filename, "multi_page_enqueue_recovered_read_dequeue");
+    try
+    {
+        {
+            string msg;
+
+            test_jrnl jc(test_name, test_dir, test_name);
+            jc.initialize(2*NUM_TEST_JFILES, 10*TEST_JFSIZE_SBLKS);
+            for (int m=0; m<NUM_MSGS*2000; m++)
+                enq_msg(jc, m, create_msg(msg, m, MSG_SIZE), false);
+        }
+        {
+            string msg;
+            u_int64_t hrid;
+            string rmsg;
+            string xid;
+            bool transientFlag;
+            bool externalFlag;
+
+            test_jrnl jc(test_name, test_dir, test_name);
+            jc.recover(2*NUM_TEST_JFILES, 10*TEST_JFSIZE_SBLKS, 0, hrid);
+            BOOST_CHECK_EQUAL(hrid, u_int64_t(NUM_MSGS*2000 - 1));
+            jc.recover_complete();
+            for (int m=0; m<NUM_MSGS*2000; m++)
+            {
+                read_msg(jc, rmsg, xid, transientFlag, externalFlag);
+                BOOST_CHECK_EQUAL(create_msg(msg, m, MSG_SIZE), rmsg);
+                BOOST_CHECK_EQUAL(xid.size(), std::size_t(0));
+                BOOST_CHECK_EQUAL(transientFlag, false);
+                BOOST_CHECK_EQUAL(externalFlag, false);
+            }
+            read_msg(jc, rmsg, xid, transientFlag, externalFlag, RHM_IORES_EMPTY);
+            for (int m=0; m<NUM_MSGS*2000; m++)
+                deq_msg(jc, m);
+        }
+    }
+    catch(const exception& e) { BOOST_FAIL(e.what()); }
+    cout << "ok" << endl;
+}
+
 QPID_AUTO_TEST_CASE(enqueue_recover_read_recovered_read_dequeue)
 {
     string test_name = get_test_name(test_filename, "enqueue_recover_read_recovered_read_dequeue");




More information about the rhmessaging-commits mailing list