[rhmessaging-commits] rhmessaging commits: r1804 - in store/trunk/cpp: tests/jrnl and 1 other directory.

rhmessaging-commits at lists.jboss.org rhmessaging-commits at lists.jboss.org
Thu Mar 27 11:24:43 EDT 2008


Author: kpvdr
Date: 2008-03-27 11:24:41 -0400 (Thu, 27 Mar 2008)
New Revision: 1804

Modified:
   store/trunk/cpp/lib/jrnl/enums.hpp
   store/trunk/cpp/lib/jrnl/jcntl.cpp
   store/trunk/cpp/lib/jrnl/jcntl.hpp
   store/trunk/cpp/lib/jrnl/jinf.cpp
   store/trunk/cpp/lib/jrnl/jinf.hpp
   store/trunk/cpp/lib/jrnl/nlfh.hpp
   store/trunk/cpp/lib/jrnl/rcvdat.hpp
   store/trunk/cpp/lib/jrnl/rmgr.cpp
   store/trunk/cpp/lib/jrnl/rmgr.hpp
   store/trunk/cpp/lib/jrnl/rrfc.cpp
   store/trunk/cpp/lib/jrnl/rrfc.hpp
   store/trunk/cpp/lib/jrnl/wmgr.cpp
   store/trunk/cpp/lib/jrnl/wmgr.hpp
   store/trunk/cpp/lib/jrnl/wrfc.cpp
   store/trunk/cpp/lib/jrnl/wrfc.hpp
   store/trunk/cpp/tests/jrnl/_st_read.cpp
Log:
Added read sync to journal. Fixes BZ430479. This fix should enable lazy-load to work.

Modified: store/trunk/cpp/lib/jrnl/enums.hpp
===================================================================
--- store/trunk/cpp/lib/jrnl/enums.hpp	2008-03-27 15:24:30 UTC (rev 1803)
+++ store/trunk/cpp/lib/jrnl/enums.hpp	2008-03-27 15:24:41 UTC (rev 1804)
@@ -47,6 +47,7 @@
         RHM_IORES_PAGE_AIOWAIT, ///< IO operation suspended - next page is waiting for AIO.
         RHM_IORES_FILE_AIOWAIT, ///< IO operation suspended - next file is waiting for AIO.
         RHM_IORES_EMPTY,        ///< During read operations, nothing further is available to read.
+        RHM_IORES_RCINVALID,    ///< Read page cache is invalid (ie obsolete or uninitialized)
         RHM_IORES_ENQCAPTHRESH, ///< Enqueue capacity threshold (limit) reached.
         RHM_IORES_FULL,         ///< During write operations, the journal files are full.
         RHM_IORES_BUSY,         ///< Another blocking operation is in progress.
@@ -63,6 +64,7 @@
             case RHM_IORES_PAGE_AIOWAIT: return "RHM_IORES_PAGE_AIOWAIT";
             case RHM_IORES_FILE_AIOWAIT: return "RHM_IORES_FILE_AIOWAIT";
             case RHM_IORES_EMPTY: return "RHM_IORES_EMPTY";
+            case RHM_IORES_RCINVALID: return "RHM_IORES_RCINVALID";
             case RHM_IORES_ENQCAPTHRESH: return "RHM_IORES_ENQCAPTHRESH";
             case RHM_IORES_FULL: return "RHM_IORES_FULL";
             case RHM_IORES_BUSY: return "RHM_IORES_BUSY";

Modified: store/trunk/cpp/lib/jrnl/jcntl.cpp
===================================================================
--- store/trunk/cpp/lib/jrnl/jcntl.cpp	2008-03-27 15:24:30 UTC (rev 1803)
+++ store/trunk/cpp/lib/jrnl/jcntl.cpp	2008-03-27 15:24:41 UTC (rev 1804)
@@ -151,7 +151,7 @@
     rcvr_janalyze(_rcvdat, prep_txn_list);
     highest_rid = _rcvdat._h_rid;
     if (_rcvdat._full)
-        throw jexception(jerrno::JERR_JCNTL_RECOVERJFULL, "jcntl", "recover_complete");
+        throw jexception(jerrno::JERR_JCNTL_RECOVERJFULL, "jcntl", "recover");
 
     // Debug info; should be sent to log file
     // std::cout << _rcvdat.to_string(_jid) << std::endl << std::flush;
@@ -264,27 +264,46 @@
     }
 }
 
+/* TODO
 const iores
 jcntl::get_data_record(const u_int64_t& rid, const size_t& dsize, const size_t& dsize_avail,
         const void** const data, bool auto_discard)
 {
     check_rstatus("get_data_record");
     return _rmgr.get(rid, dsize, dsize_avail, data, auto_discard);
-}
+} */
 
+/* TODO
 const iores
 jcntl::discard_data_record(data_tok* const dtokp)
 {
     check_rstatus("discard_data_record");
     return _rmgr.discard(dtokp);
-}
+} */
 
+#define MAX_RCINVALID_CNT 50
+#define RCINVALID_SLEEP_TIME_MS 2
 const iores
 jcntl::read_data_record(void** const datapp, size_t& dsize, void** const xidpp, size_t& xidsize,
         bool& transient, bool& external, data_tok* const dtokp)
 {
     check_rstatus("read_data");
-    return _rmgr.read(datapp, dsize, xidpp, xidsize, transient, external, dtokp);
+    unsigned cnt = 0;
+    iores res;
+    do
+    {
+        res = _rmgr.read(datapp, dsize, xidpp, xidsize, transient, external, dtokp);
+        if (res == RHM_IORES_RCINVALID)
+        {
+            get_wr_events(); // check for outstanding write events
+            _rmgr.synchronize();
+            if (cnt > 0)
+                ::usleep(RCINVALID_SLEEP_TIME_MS * 1000);
+        }
+        cnt++;
+    }
+    while (cnt < MAX_RCINVALID_CNT && res == RHM_IORES_RCINVALID);
+    return res;
 }
 
 const iores
@@ -385,8 +404,15 @@
     return res;
 }
 
-// Private functions
+void
+jcntl::chk_wr_frot()
+{
+    if (_wrfc.index() == _rrfc.index())
+        _rmgr.invalidate();
+}
 
+// Protected/Private functions
+
 void
 jcntl::check_wstatus(const char* fn_name) const
 {
@@ -507,6 +533,7 @@
         rd._ffid = ji.get_start_file();
         rd._lfid = ji.get_end_file();
         rd._owi = ji.get_initial_owi();
+        rd._frot = ji.get_frot();
         rd._empty = false;
     }
     catch (const jexception& e)

Modified: store/trunk/cpp/lib/jrnl/jcntl.hpp
===================================================================
--- store/trunk/cpp/lib/jrnl/jcntl.hpp	2008-03-27 15:24:30 UTC (rev 1803)
+++ store/trunk/cpp/lib/jrnl/jcntl.hpp	2008-03-27 15:24:41 UTC (rev 1804)
@@ -294,7 +294,8 @@
         const iores enqueue_extern_txn_data_record(const size_t tot_data_len, data_tok* dtokp,
                 const std::string& xid, const bool transient = false);
 
-        /**
+        /* TODO
+        **
         * \brief Retrieve details of next record to be read without consuming the record.
         *
         * Retrieve information about current read record. A pointer to the data is returned, along
@@ -350,18 +351,21 @@
         *     discard_next_data_record() must be explicitly called.
         *
         * \exception TODO
-        */
+        *
         // *** NOT YET IMPLEMENTED ***
         const iores get_data_record(const u_int64_t& rid, const size_t& dsize,
                 const size_t& dsize_avail, const void** const data, bool auto_discard = false);
+        */
 
-        /**
+        /* TODO
+        **
         * \brief Discard (skip) next record to be read without reading or retrieving it.
         *
         * \exception TODO
-        */
+        *
         // *** NOT YET IMPLEMENTED ***
         const iores discard_data_record(data_tok* const dtokp);
+        */
 
         /**
         * \brief Reads data from the journal. It is the responsibility of the reader to free
@@ -524,6 +528,10 @@
         inline const u_int32_t get_rd_outstanding_aio_dblks(u_int16_t pi) const
                 { return _rrfc.file_handle(pi)->rd_aio_outstanding_dblks(); }
 
+        inline const u_int16_t get_rd_fid() const { return _rrfc.index(); }
+        inline const u_int16_t get_wr_fid() const { return _wrfc.index(); }
+        inline const u_int16_t get_earliest_fid() const { return _wrfc.earliest_index(); }
+
         /**
         * \brief Check if a particular rid is enqueued. Note that this function will return
         *     false if the rid is transactionally enqueued and is not committed, or if it is
@@ -578,8 +586,12 @@
 
         inline const u_int32_t jfsize_sblks() const { return _jfsize_sblks; }
 
+        // these are _rmgr to _wmgr interactions, remove when _rmgr contains ref to _wmgr:        
+        void chk_wr_frot();
+        inline const u_int32_t unflushed_dblks() { return _wmgr.unflushed_dblks(); }
 
 
+
     protected:
         /**
         * \brief Check status of journal before allowing write operations.

Modified: store/trunk/cpp/lib/jrnl/jinf.cpp
===================================================================
--- store/trunk/cpp/lib/jrnl/jinf.cpp	2008-03-27 15:24:30 UTC (rev 1803)
+++ store/trunk/cpp/lib/jrnl/jinf.cpp	2008-03-27 15:24:41 UTC (rev 1804)
@@ -57,7 +57,8 @@
         _analyzed_flag(false),
         _start_file(0),
         _end_file(0),
-        _initial_owi(false)
+        _initial_owi(false),
+        _frot(false)
 {
     read(jinf_filename);
     if (validate_flag)
@@ -179,6 +180,7 @@
         {
             if (!fnum)
                 throw jexception(jerrno::JERR_JINF_JDATEMPTY, "jinf", "analyze");
+            _frot = true;
             done = true;
         }
         else if (fnum == 0) // First file only
@@ -238,6 +240,14 @@
     return _initial_owi;
 }
 
+const bool
+jinf::get_frot()
+{
+    if (!_analyzed_flag)
+        analyze();
+    return _frot;
+}
+
 const std::string
 jinf::to_string() const
 {

Modified: store/trunk/cpp/lib/jrnl/jinf.hpp
===================================================================
--- store/trunk/cpp/lib/jrnl/jinf.hpp	2008-03-27 15:24:30 UTC (rev 1803)
+++ store/trunk/cpp/lib/jrnl/jinf.hpp	2008-03-27 15:24:41 UTC (rev 1804)
@@ -64,6 +64,7 @@
         u_int16_t _start_file;
         u_int16_t _end_file;
         bool _initial_owi;
+        bool _frot;
 
     public:
         // constructor for reading existing jinf file
@@ -93,6 +94,7 @@
         const u_int16_t get_start_file();
         const u_int16_t get_end_file();
         const bool get_initial_owi();
+        const bool get_frot();
 
         const std::string to_string() const;
         const std::string xml_str() const;

Modified: store/trunk/cpp/lib/jrnl/nlfh.hpp
===================================================================
--- store/trunk/cpp/lib/jrnl/nlfh.hpp	2008-03-27 15:24:30 UTC (rev 1803)
+++ store/trunk/cpp/lib/jrnl/nlfh.hpp	2008-03-27 15:24:41 UTC (rev 1804)
@@ -111,7 +111,8 @@
 
         // Derived helper functions
 
-        inline const bool rd_empty() const { return _wr_cmpl_cnt_dblks == 0; }
+        inline const bool rd_void() const { return _wr_cmpl_cnt_dblks == 0; }
+        inline const bool rd_empty() const { return _wr_cmpl_cnt_dblks <= JRNL_SBLK_SIZE; }
         inline const u_int32_t rd_remaining_dblks() const
                 { return _wr_cmpl_cnt_dblks - _rd_subm_cnt_dblks; }
         inline const bool is_rd_full() const { return _wr_cmpl_cnt_dblks == _rd_subm_cnt_dblks; }
@@ -121,7 +122,8 @@
                 { return _rd_subm_cnt_dblks - _rd_cmpl_cnt_dblks; }
         inline const bool rd_file_rotate() const { return is_rd_full() && is_wr_compl(); }
 
-        inline const bool wr_empty() const { return _wr_subm_cnt_dblks == 0; }
+        inline const bool wr_void() const { return _wr_subm_cnt_dblks == 0; }
+        inline const bool wr_empty() const { return _wr_subm_cnt_dblks <= JRNL_SBLK_SIZE; }
         inline const u_int32_t wr_remaining_dblks() const
                 { return _ffull_dblks - _wr_subm_cnt_dblks; }
         inline const bool is_wr_full() const { return _ffull_dblks == _wr_subm_cnt_dblks; }

Modified: store/trunk/cpp/lib/jrnl/rcvdat.hpp
===================================================================
--- store/trunk/cpp/lib/jrnl/rcvdat.hpp	2008-03-27 15:24:30 UTC (rev 1803)
+++ store/trunk/cpp/lib/jrnl/rcvdat.hpp	2008-03-27 15:24:41 UTC (rev 1804)
@@ -45,7 +45,8 @@
 
         struct rcvdat
         {
-            bool _owi;
+            bool _owi;          ///< Overwrite indicator
+            bool _frot;         ///< First rotation flag
             bool _empty;        ///< Journal data files empty
             u_int16_t _ffid;    ///< First file id
             size_t _fro;        ///< First record offset in ffid
@@ -57,6 +58,7 @@
 
             rcvdat(u_int16_t num_jfiles):
                     _owi(false),
+                    _frot(false),
                     _empty(true),
                     _ffid(0),
                     _fro(0),
@@ -70,6 +72,7 @@
             void reset()
             {
                 _owi=false;
+                _frot = false;
                 _empty=true;
                 _ffid=0;
                 _fro=0;
@@ -86,6 +89,7 @@
                 std::ostringstream oss;
                 oss << "Jorunal file analysis (jid=\"" << jid << "\"):" << std::endl;
                 oss << "  Overwrite indicator (_owi) = " << (_owi ? "TRUE" : "FALSE") << std::endl;
+                oss << "  First rotation (_frot) = " << (_frot ? "TRUE" : "FALSE") << std::endl;
                 oss << "  Journal empty (_empty) = " << (_empty ? "TRUE" : "FALSE") << std::endl;
                 oss << "  First fid (_ffid) = " << _ffid << std::endl;
                 oss << "  First record offset in first fid (_fro) = 0x" << std::hex << _fro <<

Modified: store/trunk/cpp/lib/jrnl/rmgr.cpp
===================================================================
--- store/trunk/cpp/lib/jrnl/rmgr.cpp	2008-03-27 15:24:30 UTC (rev 1803)
+++ store/trunk/cpp/lib/jrnl/rmgr.cpp	2008-03-27 15:24:41 UTC (rev 1804)
@@ -44,11 +44,17 @@
 rmgr::rmgr(jcntl* jc, enq_map& emap, txn_map& tmap, rrfc& rrfc):
         pmgr(jc, emap, tmap, JRNL_RMGR_PAGE_SIZE, JRNL_RMGR_PAGES),
         _rrfc(rrfc),
-        _hdr()
+        _hdr(),
+        _valid(false),
+        _fhdr_buffer(0),
+        _iocbp(0),
+        _fhdr_rd_outstanding(false)
 {}
 
 rmgr::~rmgr()
-{}
+{
+    clean();
+}
 
 void
 rmgr::initialize(const rd_aio_cb rd_cb, const size_t fro)
@@ -65,150 +71,30 @@
         _rrfc.add_subm_cnt_dblks(tot_pg_offs_dblks);
         _rrfc.add_cmpl_cnt_dblks(tot_pg_offs_dblks);
     }
-}
 
-const iores
-rmgr::get(const u_int64_t& /*rid*/, const size_t& /*dsize*/, const size_t& /*dsize_avail*/,
-        const void** const /*data*/, bool /*auto_discard*/)
-{
-//std::cout << " rmgr::get()" << std::flush;
-    iores res = pre_read_check(0);
-    if (res != RHM_IORES_SUCCESS)
-        return res;
-
-    return RHM_IORES_NOTIMPL;
-
-/* TODO...
-    _hdr.reset();
-    // Read header, determine next record type
-    while (true)
+    // Allocate memory for reading file header
+    if (::posix_memalign(&_fhdr_buffer, _sblksize, _sblksize))
     {
-        if(dblks_rem() == 0 && _rrfc.is_compl() && !_rrfc.is_wr_aio_outstanding())
-        {
-            aio_cycle();   // check if any AIOs have returned
-            return RHM_IORES_EMPTY;
-        }
-        if (_page_cb_arr[_pg_index]._state != AIO_COMPLETE)
-        {
-            aio_cycle();
-            return RHM_IORES_PAGE_AIOWAIT;
-        }
-        void* rptr = (void*)((char*)_page_ptr_arr[_pg_index] +
-                (_pg_offset_dblks * JRNL_DBLK_SIZE));
-        ::memcpy(&_hdr, rptr, sizeof(hdr));
-        switch (_hdr._magic)
-        {
-            case RHM_JDAT_ENQ_MAGIC:
-                {
-                    size_t xid_size = *((size_t*)((char*)rptr + sizeof(hdr) 
-#if defined(JRNL_BIG_ENDIAN) && defined(JRNL_32_BIT)
-                            + sizeof(u_int32_t) // filler0
-#endif
-                            ));
-                    size_t data_size = *((size_t*)((char*)rptr + sizeof(hdr) + sizeof(u_int64_t)
-#if defined(JRNL_BIG_ENDIAN) && defined(JRNL_32_BIT)
-                            + sizeof(u_int32_t) // filler1
-#endif
-                            ));
-                    // TODO: Check if transaction is still in transaction map. If so, block read
-                    // (unless in recovery, in whcih case return info normally
-//                     std::string xid = ?? (decode xid here)
-//                     if (xid_size && !readonly && tx_map.exists(xid))
-//                         return RHM_IORES_TXPENDING;
-                    rid = _hdr._rid;
-                    dsize = data_size;
-
-                    // Analyze how much of message is available
-                    void* data_ptr = (char*)rptr + sizeof(enq_hdr) + xid_size;
-                    void* page_end_ptr = (char*)_page_base_ptr + _pagesize * _sblksize * _pages;
-                    u_int16_t data_start_pg_index = _pg_index;
-                    u_int16_t data_start_pg_index = _pg_index;
-                    for (u_int16_t i=0; i<_pages; i++)
-                    {
-                        pi = (i + _pg_index) % _pages;
-                        if (data_ptr >= _page_ptr_arr[pi] &&
-                                data_ptr < (char*)_page_ptr_arr[pi] + _pagesize * _sblksize)
-                            data_end_pg_index = pi; // found start page index
-                        
-                    }
-                    u_int16_t data_end_pg_index;
-                    u_int16_t last_pg_avail_index;
-                    
-                    void* data_ptr = (char*)rptr + sizeof(enq_hdr) + xid_size;
-                    void* page_end_ptr = (char*)_page_base_ptr + _pagesize * _sblksize * _pages;
-                    if (data_ptr >= page_end_ptr) // folded, go back to first page...
-                        data_ptr = (char*)_page_base_ptr + data_ptr - page_end_ptr;
-                    void* data_end_ptr = (char*)data_ptr + data_size;
-                    if (data_end_ptr >= page_end_ptr) // folded, go back to first page...
-                        data_end_ptr = (char*)_page_base_ptr + data_end_ptr - page_end_ptr;
-                    dsize_avail = ??;
-                    if(data_ptr folded)
-                    else
-                        *data = data_ptr;
-                }
-                break;
-            case RHM_JDAT_DEQ_MAGIC:
-                consume_deq();
-                break;
-            case RHM_JDAT_EMPTY_MAGIC:
-                consume_filler();
-                break;
-            default:
-                std::ostringstream oss;
-                oss << std::hex << std::setfill('0');
-                oss << "Magic=0x" << std::setw(8) << _hdr._magic << std::dec;
-                throw jexception(jerrno::JERR_RMGR_UNKNOWNMAGIC, oss.str(), "rmgr", "get");
-        } // switch(_hdr._magic)
-    } // while */
+        clean();
+        std::ostringstream oss;
+        oss << "posix_memalign(): blksize=" << _sblksize << " size=" << _sblksize;
+        oss << " errno=" << errno;
+        throw jexception(jerrno::JERR__MALLOC, oss.str(), "rmgr", "initialize");
+    }
+    _iocbp = new iocb;
+    ::memset(_iocbp, 0, sizeof(iocb*));
 }
 
-const iores
-rmgr::discard(data_tok* dtokp)
+void
+rmgr::clean()
 {
-//std::cout << " rmgr::get()" << std::flush;
-    iores res = pre_read_check(dtokp);
-    if (res != RHM_IORES_SUCCESS)
-        return res;
-
-    return RHM_IORES_NOTIMPL;
-
-/* TODO...
-    _hdr.reset();
-    // Read header, determine next record type
-    while (true)
+    ::free(_fhdr_buffer);
+    _fhdr_buffer = 0;
+    if (_iocbp)
     {
-        if(dblks_rem() == 0 && _rrfc.is_compl() && !_rrfc.is_wr_aio_outstanding())
-        {
-            aio_cycle();   // check if any AIOs have returned
-            return RHM_IORES_EMPTY;
-        }
-        if (_page_cb_arr[_pg_index]._state != AIO_COMPLETE)
-        {
-            aio_cycle();
-            return RHM_IORES_PAGE_AIOWAIT;
-        }
-        void* rptr = (void*)((char*)_page_ptr_arr[_pg_index] +
-                (_pg_offset_dblks * JRNL_DBLK_SIZE));
-        ::memcpy(&_hdr, rptr, sizeof(hdr));
-        switch (_hdr._magic)
-        {
-            case RHM_JDAT_ENQ_MAGIC:
-                {
-                }
-                break;
-            case RHM_JDAT_DEQ_MAGIC:
-                consume_deq();
-                break;
-            case RHM_JDAT_EMPTY_MAGIC:
-                consume_filler();
-                break;
-            default:
-                std::ostringstream oss;
-                oss << std::hex << std::setfill('0');
-                oss << "Magic=0x" << std::setw(8) << _hdr._magic << std::dec;
-                throw jexception(jerrno::JERR_RMGR_UNKNOWNMAGIC, oss.str(), "rmgr", "discard");
-        } // switch
-    } // while */
+        delete _iocbp;
+        _iocbp = 0;
+    }
 }
 
 const iores
@@ -225,7 +111,10 @@
     if (dtokp->rstate() == data_tok::SKIP_PART)
     {
         if (_page_cb_arr[_pg_index]._state != AIO_COMPLETE)
+        {
+            aio_cycle();   // check if rd AIOs returned; initiate new reads if possible
             return RHM_IORES_PAGE_AIOWAIT;
+        }        
         const iores res = skip(dtokp);
         if (res != RHM_IORES_SUCCESS)
         {
@@ -250,8 +139,11 @@
     {
         if(dblks_rem() == 0 && _rrfc.is_compl() && !_rrfc.is_wr_aio_outstanding())
         {
-            aio_cycle();   // check if any AIOs have returned
-            return RHM_IORES_EMPTY;
+            aio_cycle();   // check if rd AIOs returned; initiate new reads if possible
+            if (_jc->unflushed_dblks() > 0)
+                _jc->flush();
+            else
+                return RHM_IORES_EMPTY;
         }
         if (_page_cb_arr[_pg_index]._state != AIO_COMPLETE)
         {
@@ -405,13 +297,25 @@
             throw jexception(jerrno::JERR__AIO, oss.str(), "rmgr", "get_events");
         }
 
-        // Increment the completed read offset
-        // NOTE: We cannot use _rrfc here, as it may have rotated since submitting count.
-        // Use stored pointer to nlfh in the pcb instead.
-        pcbp->_rdblks = iocbp->u.c.nbytes / JRNL_DBLK_SIZE;
-        pcbp->_rfh->add_rd_cmpl_cnt_dblks(pcbp->_rdblks);
-        pcbp->_state = state;
-        pil[i] = pcbp->_index;
+        if (pcbp) // Page reads have pcb
+        {
+            // Increment the completed read offset
+            // NOTE: We cannot use _rrfc here, as it may have rotated since submitting count.
+            // Use stored pointer to nlfh in the pcb instead.
+            pcbp->_rdblks = iocbp->u.c.nbytes / JRNL_DBLK_SIZE;
+            pcbp->_rfh->add_rd_cmpl_cnt_dblks(pcbp->_rdblks);
+            pcbp->_state = state;
+            pil[i] = pcbp->_index;
+        }
+        else // File header reads have no pcb
+        {
+            ::memcpy(&_fhdr, _fhdr_buffer, sizeof(file_hdr));
+            _rrfc.add_cmpl_cnt_dblks(JRNL_SBLK_SIZE);
+            size_t fro_dblks = _fhdr._fro / JRNL_DBLK_SIZE;
+            _pg_offset_dblks += fro_dblks - JRNL_SBLK_SIZE;
+            _fhdr_rd_outstanding = false;
+            _valid = true;
+        }
     }
 
     // Perform AIO return callback
@@ -436,6 +340,19 @@
 }
 
 void
+rmgr::invalidate()
+{
+    if (_valid)
+    {
+        _valid = false;
+        for (int i=0; i<_pages; i++)
+            _page_cb_arr[i]._state = UNUSED;
+        _rrfc.reset();
+    	_pg_offset_dblks = 0;
+    }
+}
+
+void
 rmgr::initialize()
 {
     pmgr::initialize();
@@ -447,9 +364,24 @@
     if (_aio_evt_rem)
         get_events();
 
+    if (!_valid)
+    {
+        if (_rrfc.is_void() && !_rrfc.is_wr_aio_outstanding())
+            return RHM_IORES_EMPTY;
+        else
+            return RHM_IORES_RCINVALID;
+    }
+
+    // block reads until outstanding file header read completes as fro is needed to read
+    if (_fhdr_rd_outstanding)
+        return RHM_IORES_PAGE_AIOWAIT;
+
     if(dblks_rem() == 0 && _rrfc.is_compl() && !_rrfc.is_wr_aio_outstanding())
     {
         aio_cycle();   // check if any AIOs have returned
+        if (_jc->unflushed_dblks() > 0)
+            _jc->flush();
+        else
         return RHM_IORES_EMPTY;
     }
 
@@ -639,16 +571,24 @@
 {
     for (int16_t i=0; i<num_uninit; i++)
     {
-        if (_rrfc.empty()) // Nothing to do; this file not yet written to
+        if (_rrfc.is_void()) // Nothing to do; this file not yet written to
             break;
             
-        // If this is the first read from a file, increase the read pointers to beyond fhdr
-        // or consume fhdr here for analysis (not req'd at present)
         if (_rrfc.subm_offs() == 0)
         {
-            _rrfc.add_subm_cnt_dblks(JRNL_SBLK_SIZE);
-            _rrfc.add_cmpl_cnt_dblks(JRNL_SBLK_SIZE);
+            if (_valid)
+            {
+                _rrfc.add_subm_cnt_dblks(JRNL_SBLK_SIZE);
+                _rrfc.add_cmpl_cnt_dblks(JRNL_SBLK_SIZE);
+            }
+            else
+            {
+                u_int16_t fid = _jc->get_earliest_fid();
+                init_file_header_read(fid);
+            }
         }
+        else
+            _valid = true;
 
         // TODO: Future perf improvement: Do a single AIO read for all available file
         // space into all contiguous empty pages in one AIO operation.
@@ -681,9 +621,6 @@
 void
 rmgr::consume_fhdr()
 {
-    // If in the future it should become necessary to read each file header, this is where it would
-    // happen.
-
     // Set read pointers to first dblk after file header
     _rrfc.add_subm_cnt_dblks(JRNL_SBLK_SIZE);
     _rrfc.add_cmpl_cnt_dblks(JRNL_SBLK_SIZE);
@@ -725,5 +662,155 @@
     xidsize = 0;
 }
 
+void
+rmgr::init_file_header_read(u_int16_t fid)
+{
+    int rfh = _rrfc.file_handle(fid)->rd_fh();
+    rhm_prep_pread(_iocbp, rfh, _fhdr_buffer, _sblksize, 0);
+    if (::io_submit(_ioctx, 1, &_iocbp) < 0)
+        throw jexception(jerrno::JERR__AIO, "rmgr", "init_file_header_read");
+    _aio_evt_rem++;
+    _rrfc.add_subm_cnt_dblks(JRNL_SBLK_SIZE);
+    _fhdr_rd_outstanding = true;
+}
+
+/* TODO (sometime in the future)
+const iores
+rmgr::get(const u_int64_t& rid, const size_t& dsize, const size_t& dsize_avail,
+        const void** const data, bool auto_discard)
+{
+    iores res = pre_read_check(0);
+    if (res != RHM_IORES_SUCCESS)
+        return res;
+
+    _hdr.reset();
+    // Read header, determine next record type
+    while (true)
+    {
+        if(dblks_rem() == 0 && _rrfc.is_compl() && !_rrfc.is_wr_aio_outstanding())
+        {
+            aio_cycle();   // check if any AIOs have returned
+            return RHM_IORES_EMPTY;
+        }
+        if (_page_cb_arr[_pg_index]._state != AIO_COMPLETE)
+        {
+            aio_cycle();
+            return RHM_IORES_PAGE_AIOWAIT;
+        }
+        void* rptr = (void*)((char*)_page_ptr_arr[_pg_index] +
+                (_pg_offset_dblks * JRNL_DBLK_SIZE));
+        ::memcpy(&_hdr, rptr, sizeof(hdr));
+        switch (_hdr._magic)
+        {
+            case RHM_JDAT_ENQ_MAGIC:
+                {
+                    size_t xid_size = *((size_t*)((char*)rptr + sizeof(hdr) 
+#if defined(JRNL_BIG_ENDIAN) && defined(JRNL_32_BIT)
+                            + sizeof(u_int32_t) // filler0
+#endif
+                            ));
+                    size_t data_size = *((size_t*)((char*)rptr + sizeof(hdr) + sizeof(u_int64_t)
+#if defined(JRNL_BIG_ENDIAN) && defined(JRNL_32_BIT)
+                            + sizeof(u_int32_t) // filler1
+#endif
+                            ));
+                    // TODO: Check if transaction is still in transaction map. If so, block read
+                    // (unless in recovery, in whcih case return info normally
+//                     std::string xid = ?? (decode xid here)
+//                     if (xid_size && !readonly && tx_map.exists(xid))
+//                         return RHM_IORES_TXPENDING;
+                    rid = _hdr._rid;
+                    dsize = data_size;
+
+                    // Analyze how much of message is available
+                    void* data_ptr = (char*)rptr + sizeof(enq_hdr) + xid_size;
+                    void* page_end_ptr = (char*)_page_base_ptr + _pagesize * _sblksize * _pages;
+                    u_int16_t data_start_pg_index = _pg_index;
+                    u_int16_t data_start_pg_index = _pg_index;
+                    for (u_int16_t i=0; i<_pages; i++)
+                    {
+                        pi = (i + _pg_index) % _pages;
+                        if (data_ptr >= _page_ptr_arr[pi] &&
+                                data_ptr < (char*)_page_ptr_arr[pi] + _pagesize * _sblksize)
+                            data_end_pg_index = pi; // found start page index
+                        
+                    }
+                    u_int16_t data_end_pg_index;
+                    u_int16_t last_pg_avail_index;
+                    
+                    void* data_ptr = (char*)rptr + sizeof(enq_hdr) + xid_size;
+                    void* page_end_ptr = (char*)_page_base_ptr + _pagesize * _sblksize * _pages;
+                    if (data_ptr >= page_end_ptr) // folded, go back to first page...
+                        data_ptr = (char*)_page_base_ptr + data_ptr - page_end_ptr;
+                    void* data_end_ptr = (char*)data_ptr + data_size;
+                    if (data_end_ptr >= page_end_ptr) // folded, go back to first page...
+                        data_end_ptr = (char*)_page_base_ptr + data_end_ptr - page_end_ptr;
+                    dsize_avail = ??;
+                    if(data_ptr folded)
+                    else
+                        *data = data_ptr;
+                }
+                break;
+            case RHM_JDAT_DEQ_MAGIC:
+                consume_deq();
+                break;
+            case RHM_JDAT_EMPTY_MAGIC:
+                consume_filler();
+                break;
+            default:
+                std::ostringstream oss;
+                oss << std::hex << std::setfill('0');
+                oss << "Magic=0x" << std::setw(8) << _hdr._magic << std::dec;
+                throw jexception(jerrno::JERR_RMGR_UNKNOWNMAGIC, oss.str(), "rmgr", "get");
+        } // switch(_hdr._magic)
+    } // while
+}
+
+const iores
+rmgr::discard(data_tok* dtokp)
+{
+    iores res = pre_read_check(dtokp);
+    if (res != RHM_IORES_SUCCESS)
+        return res;
+
+    _hdr.reset();
+    // Read header, determine next record type
+    while (true)
+    {
+        if(dblks_rem() == 0 && _rrfc.is_compl() && !_rrfc.is_wr_aio_outstanding())
+        {
+            aio_cycle();   // check if any AIOs have returned
+            return RHM_IORES_EMPTY;
+        }
+        if (_page_cb_arr[_pg_index]._state != AIO_COMPLETE)
+        {
+            aio_cycle();
+            return RHM_IORES_PAGE_AIOWAIT;
+        }
+        void* rptr = (void*)((char*)_page_ptr_arr[_pg_index] +
+                (_pg_offset_dblks * JRNL_DBLK_SIZE));
+        ::memcpy(&_hdr, rptr, sizeof(hdr));
+        switch (_hdr._magic)
+        {
+            case RHM_JDAT_ENQ_MAGIC:
+                {
+                }
+                break;
+            case RHM_JDAT_DEQ_MAGIC:
+                consume_deq();
+                break;
+            case RHM_JDAT_EMPTY_MAGIC:
+                consume_filler();
+                break;
+            default:
+                std::ostringstream oss;
+                oss << std::hex << std::setfill('0');
+                oss << "Magic=0x" << std::setw(8) << _hdr._magic << std::dec;
+                throw jexception(jerrno::JERR_RMGR_UNKNOWNMAGIC, oss.str(), "rmgr", "discard");
+        } // switch
+    } // while
+}
+*/
+
 } // namespace journal
 } // namespace rhm

Modified: store/trunk/cpp/lib/jrnl/rmgr.hpp
===================================================================
--- store/trunk/cpp/lib/jrnl/rmgr.hpp	2008-03-27 15:24:30 UTC (rev 1803)
+++ store/trunk/cpp/lib/jrnl/rmgr.hpp	2008-03-27 15:24:41 UTC (rev 1804)
@@ -41,6 +41,7 @@
 
 #include <jrnl/aio_cb.hpp>
 #include <jrnl/enums.hpp>
+#include <jrnl/file_hdr.hpp>
 #include <jrnl/pmgr.hpp>
 #include <jrnl/rec_hdr.hpp>
 #include <jrnl/rrfc.hpp>
@@ -63,21 +64,34 @@
         rec_hdr _hdr;           ///< Header used to determind record type
         rd_aio_cb _cb;          ///< Callback function pointer for AIO events
 
+        bool _valid;            ///< Flag is true when read pages contain vailid data
+        void* _fhdr_buffer;     ///< Buffer used for fhdr reads
+        iocb* _iocbp;           ///< iocb pointer for fhdr reads
+        file_hdr _fhdr;         ///< file header instance for reading file headers
+        bool _fhdr_rd_outstanding; ///< true if a fhdr read is outstanding
+
     public:
         rmgr(jcntl* jc, enq_map& emap, txn_map& tmap, rrfc& rrfc);
         virtual ~rmgr();
 
         void initialize(const rd_aio_cb rd_cb, const size_t fro);
-        const iores get(const u_int64_t& rid, const size_t& dsize, const size_t& dsize_avail,
-                const void** const data, bool auto_discard);
-        const iores discard(data_tok* dtok);
         const iores read(void** const datapp, size_t& dsize, void** const xidpp, size_t& xidsize,
                 bool& transient, bool& external, data_tok* dtokp);
         const u_int32_t get_events(page_state state = AIO_COMPLETE);
         void recover_complete(size_t fro);
+        inline const bool is_valid() const {return _valid; }
+        inline void synchronize() { if (!_valid) aio_cycle(); }
+        void invalidate();
+        
+        /* TODO (if required)
+        const iores get(const u_int64_t& rid, const size_t& dsize, const size_t& dsize_avail,
+                const void** const data, bool auto_discard);
+        const iores discard(data_tok* dtok);
+        */
 
     private:
         void initialize();
+        void clean();
         const iores pre_read_check(data_tok* dtokp);
         const iores read_enq(rec_hdr& h, void* rptr, data_tok* dtokp);
         void consume_xid_rec(rec_hdr& h, void* rptr, data_tok* dtokp);
@@ -90,6 +104,7 @@
         const u_int32_t dblks_rem() const;
         void set_params_null(void** const datapp, size_t& dsize, void** const xidpp,
                 size_t& xidsize);
+        void init_file_header_read(u_int16_t fid);
 
 
         // Special version of libaio's io_prep_pread() which preserves the value of the data

Modified: store/trunk/cpp/lib/jrnl/rrfc.cpp
===================================================================
--- store/trunk/cpp/lib/jrnl/rrfc.cpp	2008-03-27 15:24:30 UTC (rev 1803)
+++ store/trunk/cpp/lib/jrnl/rrfc.cpp	2008-03-27 15:24:41 UTC (rev 1804)
@@ -53,6 +53,12 @@
 {
     _nfiles = nfiles;
     _fh_arr = fh_arr;
+    reset(fh_index);
+}
+
+void
+rrfc::reset(u_int32_t fh_index)
+{
     _fh_index = fh_index;
     _curr_fh = _fh_arr[_fh_index];
 }

Modified: store/trunk/cpp/lib/jrnl/rrfc.hpp
===================================================================
--- store/trunk/cpp/lib/jrnl/rrfc.hpp	2008-03-27 15:24:30 UTC (rev 1803)
+++ store/trunk/cpp/lib/jrnl/rrfc.hpp	2008-03-27 15:24:41 UTC (rev 1804)
@@ -73,6 +73,8 @@
         */
         void initialize(const u_int16_t nfiles, nlfh** fh_arr, u_int32_t fh_index = 0);
 
+        void reset(u_int32_t fh_index = 0);
+
         /**
         * \brief Rotate active file handle to next file in rotating file group.
         * \exception jerrno::JERR__NINIT if called before calling initialize().
@@ -99,7 +101,7 @@
 
         // Convenience access methods to current file handle
 
-        inline const u_int32_t fid() const { return _curr_fh->fid(); }
+        inline const u_int16_t fid() const { return _curr_fh->fid(); }
         inline const int fh() const { return _curr_fh->rd_fh(); }
         inline const u_int32_t enqcnt() const { return _curr_fh->enqcnt(); }
         inline const u_int32_t incr_enqcnt() { return _curr_fh->incr_enqcnt(); }
@@ -121,7 +123,8 @@
         inline const u_int32_t add_cmpl_cnt_dblks(u_int32_t a)
                 { return _curr_fh->add_rd_cmpl_cnt_dblks(a); }
 
-        inline const bool empty() const { return _curr_fh->rd_empty(); }
+        inline const bool is_void() const { return _curr_fh->rd_void(); }
+        inline const bool is_empty() const { return _curr_fh->rd_empty(); }
         inline const u_int32_t remaining_dblks() { return _curr_fh->rd_remaining_dblks(); }
         inline const bool is_full() const { return _curr_fh->is_rd_full(); }
         inline const bool is_compl() const { return _curr_fh->is_rd_compl(); }

Modified: store/trunk/cpp/lib/jrnl/wmgr.cpp
===================================================================
--- store/trunk/cpp/lib/jrnl/wmgr.cpp	2008-03-27 15:24:30 UTC (rev 1803)
+++ store/trunk/cpp/lib/jrnl/wmgr.cpp	2008-03-27 15:24:41 UTC (rev 1804)
@@ -199,7 +199,7 @@
             dtokp->set_wstate(data_tok::ENQ_PART);
 
         // Has the file header been written (i.e. write pointers still at 0)?
-        if (_wrfc.empty())
+        if (_wrfc.is_void())
         {
             u_int32_t rec_dblks_rem = _enq_rec.rec_size_dblks() - data_offs_dblks;
             bool file_fit = rec_dblks_rem <= _jfsize_dblks;
@@ -342,7 +342,7 @@
             dtokp->set_wstate(data_tok::DEQ_PART);
 
         // Has the file header been written (i.e. write pointers still at 0)?
-        if (_wrfc.empty())
+        if (_wrfc.is_void())
         {
             u_int32_t rec_dblks_rem = _deq_rec.rec_size_dblks() - data_offs_dblks;
             bool file_fit = rec_dblks_rem <= _jfsize_dblks;
@@ -483,7 +483,7 @@
             dtokp->set_wstate(data_tok::ABORT_PART);
 
         // Has the file header been written (i.e. write pointers still at 0)?
-        if (_wrfc.empty())
+        if (_wrfc.is_void())
         {
             u_int32_t rec_dblks_rem = _txn_rec.rec_size_dblks() - data_offs_dblks;
             bool file_fit = rec_dblks_rem <= _jfsize_dblks;
@@ -620,7 +620,7 @@
             dtokp->set_wstate(data_tok::COMMIT_PART);
 
         // Has the file header been written (i.e. write pointers still at 0)?
-        if (_wrfc.empty())
+        if (_wrfc.is_void())
         {
             u_int32_t rec_dblks_rem = _txn_rec.rec_size_dblks() - data_offs_dblks;
             bool file_fit = rec_dblks_rem <= _jfsize_dblks;
@@ -737,7 +737,9 @@
 wmgr::rotate_file()
 {
     _pg_cntr = 0;
-    return _wrfc.rotate();
+    iores res = _wrfc.rotate();
+    _jc->chk_wr_frot();
+    return res;
 }
 
 const u_int32_t

Modified: store/trunk/cpp/lib/jrnl/wmgr.hpp
===================================================================
--- store/trunk/cpp/lib/jrnl/wmgr.hpp	2008-03-27 15:24:30 UTC (rev 1803)
+++ store/trunk/cpp/lib/jrnl/wmgr.hpp	2008-03-27 15:24:41 UTC (rev 1804)
@@ -116,6 +116,7 @@
         inline const bool curr_pg_blocked() const
                 { return _page_cb_arr[_pg_index]._state != UNUSED; }
         inline const bool curr_file_blocked() const { return _wrfc.aio_cnt() > 0; }
+        inline const u_int32_t unflushed_dblks() { return _cached_offset_dblks; }
         
         // Debug aid
         const std::string status_str() const;

Modified: store/trunk/cpp/lib/jrnl/wrfc.cpp
===================================================================
--- store/trunk/cpp/lib/jrnl/wrfc.cpp	2008-03-27 15:24:30 UTC (rev 1803)
+++ store/trunk/cpp/lib/jrnl/wrfc.cpp	2008-03-27 15:24:41 UTC (rev 1804)
@@ -50,7 +50,8 @@
         _rid(0),
 #endif
         _reset_ok(false),
-        _owi(false)
+        _owi(false),
+        _frot(true)
 {}
 
 wrfc::~wrfc() {}
@@ -64,6 +65,7 @@
         _rid = rdp->_h_rid + 1;
         _reset_ok = true;
         _owi = rdp->_owi;
+        _frot = rdp->_frot;
     }
     else
     {
@@ -94,6 +96,7 @@
     {
         _fh_index = 0;
         _owi = !_owi;
+        _frot = false;
     }
     _curr_fh = _fh_arr[_fh_index];
     if (_curr_fh->aio_cnt())
@@ -103,6 +106,17 @@
     return RHM_IORES_SUCCESS;
 }
 
+const u_int16_t
+wrfc::earliest_index() const
+{
+    if (_frot)
+        return 0;
+    u_int16_t next_index = _fh_index + 1;
+    if (next_index >= _nfiles)
+        next_index = 0;
+    return next_index;
+}
+
 const bool
 wrfc::reset()
 {

Modified: store/trunk/cpp/lib/jrnl/wrfc.hpp
===================================================================
--- store/trunk/cpp/lib/jrnl/wrfc.hpp	2008-03-27 15:24:30 UTC (rev 1803)
+++ store/trunk/cpp/lib/jrnl/wrfc.hpp	2008-03-27 15:24:41 UTC (rev 1804)
@@ -60,6 +60,7 @@
         u_int64_t _rid;         ///< Master counter for record ID (rid)
         bool _reset_ok;         ///< Flag set when reset succeeds
         bool _owi;              ///< Overwrite indicator
+        bool _frot;             ///< Flag is true for first rotation, false at all other times
 
     public:
         wrfc();
@@ -83,11 +84,19 @@
         */
         const iores rotate();
 
+        /**
+        * \brief Returns the index of the earliest complete file within the rotating
+        *     file group. Unwritten files are excluded. The currently active file is
+        *     excluded unless it is the only written file.
+        */
+        const u_int16_t earliest_index() const;
+
         inline const u_int64_t rid() const { return _rid; }
         inline const u_int64_t get_incr_rid() { return _rid++; }
         const bool reset();
         inline const bool is_reset() const { return _reset_ok; }
         inline const bool owi() const { return _owi; }
+        inline const bool frot() const { return _frot; }
 
         // Convenience access methods to current file handle
 
@@ -107,7 +116,8 @@
         inline const u_int16_t incr_aio_cnt() { return _curr_fh->incr_aio_cnt(); }
         inline const u_int16_t decr_aio_cnt() { return _curr_fh->decr_aio_cnt(); }
 
-        inline const bool empty() const { return _curr_fh->wr_empty(); }
+        inline const bool is_void() const { return _curr_fh->wr_void(); }
+        inline const bool is_empty() const { return _curr_fh->wr_empty(); }
         inline const u_int32_t remaining_dblks() const { return _curr_fh->wr_remaining_dblks(); }
         inline const bool is_full() const { return _curr_fh->is_wr_full(); };
         inline const bool is_compl() const { return _curr_fh->is_wr_compl(); };

Modified: store/trunk/cpp/tests/jrnl/_st_read.cpp
===================================================================
--- store/trunk/cpp/tests/jrnl/_st_read.cpp	2008-03-27 15:24:30 UTC (rev 1803)
+++ store/trunk/cpp/tests/jrnl/_st_read.cpp	2008-03-27 15:24:41 UTC (rev 1804)
@@ -46,6 +46,25 @@
 
 // === Test suite ===
 
+QPID_AUTO_TEST_CASE(empty_read)
+{
+    string test_name = get_test_name(test_filename, "empty_read");
+    try
+    {
+        string msg;
+        string rmsg;
+        string xid;
+        bool transientFlag;
+        bool externalFlag;
+
+        jcntl jc(test_name, JRNL_DIR, test_name, NUM_TEST_JFILES, TEST_JFSIZE_SBLKS);
+        jrnl_init(jc);
+        read_msg(jc, rmsg, xid, transientFlag, externalFlag, RHM_IORES_EMPTY);
+    }
+    catch(const exception& e) { BOOST_FAIL(e.what()); }
+    cout << "ok" << endl;
+}
+
 QPID_AUTO_TEST_CASE(enqueue_read_dequeue_block)
 {
     string test_name = get_test_name(test_filename, "enqueue_read_dequeue_block");
@@ -232,24 +251,24 @@
     string test_name = get_test_name(test_filename, "delayed_read");
     try
     {
-        cout << "[NOTE: Disabled until outstanding issue(s) resolved.] ";
-//         string msg;
-//         string rmsg;
-//         string xid;
-//         bool transientFlag;
-//         bool externalFlag;
-// 
-//         jcntl jc(test_name, JRNL_DIR, test_name, NUM_TEST_JFILES, TEST_JFSIZE_SBLKS);
-//         jrnl_init(jc);
-//         unsigned m;
-//         for (m=0; m<2*NUM_MSGS; m+=2)
-//         {
-//             enq_msg(jc, m, create_msg(msg, m, MSG_SIZE), false);
-//             deq_msg(jc, m);
-//         }
-//         enq_msg(jc, m, create_msg(msg, m, MSG_SIZE), false);
-//         read_msg(jc, rmsg, xid, transientFlag, externalFlag, RHM_IORES_EMPTY);
-//         BOOST_CHECK_EQUAL(msg, rmsg);
+        string msg;
+        string rmsg;
+        string xid;
+        bool transientFlag;
+        bool externalFlag;
+
+        jcntl jc(test_name, JRNL_DIR, test_name, NUM_TEST_JFILES, TEST_JFSIZE_SBLKS);
+        jrnl_init(jc);
+        unsigned m;
+        for (m=0; m<2*NUM_MSGS; m+=2)
+        {
+            enq_msg(jc, m, create_msg(msg, m, MSG_SIZE), false);
+            deq_msg(jc, m);
+        }
+        enq_msg(jc, m, create_msg(msg, m, MSG_SIZE), false);
+        jc.flush();
+        read_msg(jc, rmsg, xid, transientFlag, externalFlag);
+        BOOST_CHECK_EQUAL(msg, rmsg);
     }
     catch(const exception& e) { BOOST_FAIL(e.what()); }
     cout << "ok" << endl;
@@ -260,29 +279,77 @@
     string test_name = get_test_name(test_filename, "cache_cycled_delayed_read");
     try
     {
-        cout << "[NOTE: Disabled until outstanding issue(s) resolved.] ";
-//         string msg;
-//         string rmsg;
-//         string xid;
-//         bool transientFlag;
-//         bool externalFlag;
-// 
-//         jcntl jc(test_name, JRNL_DIR, test_name, NUM_TEST_JFILES, TEST_JFSIZE_SBLKS);
-//         jrnl_init(jc);
-//         unsigned m;
-//         unsigned n = num_msgs_to_full(NUM_TEST_JFILES, TEST_JFSIZE_SBLKS * JRNL_SBLK_SIZE,
-//                 MSG_REC_SIZE_DBLKS);
-//         for (m=0; m<12*2*n; m+=2) // 12 file cycles
-//         {
-//             enq_msg(jc, m, create_msg(msg, m, MSG_SIZE), false);
-//             deq_msg(jc, m);
-//         }
-//         enq_msg(jc, m, create_msg(msg, m, MSG_SIZE), false);
-//         read_msg(jc, rmsg, xid, transientFlag, externalFlag, RHM_IORES_EMPTY);
-//         BOOST_CHECK_EQUAL(msg, rmsg);
+        string msg;
+        string rmsg;
+        string xid;
+        bool transientFlag;
+        bool externalFlag;
+
+        jcntl jc(test_name, JRNL_DIR, test_name, NUM_TEST_JFILES, TEST_JFSIZE_SBLKS);
+        jrnl_init(jc);
+        unsigned m;
+        unsigned read_buffer_size_dblks = JRNL_RMGR_PAGES * JRNL_RMGR_PAGE_SIZE * JRNL_SBLK_SIZE;
+        unsigned n = num_msgs_to_full(1, read_buffer_size_dblks, MSG_REC_SIZE_DBLKS, true);
+        for (m=0; m<2*2*n + 20; m+=2) // fill read buffer twice + 10 msgs
+        {
+            enq_msg(jc, m, create_msg(msg, m, MSG_SIZE), false);
+            deq_msg(jc, m);
+        }
+        enq_msg(jc, m, create_msg(msg, m, MSG_SIZE), false);
+        jc.flush();
+        read_msg(jc, rmsg, xid, transientFlag, externalFlag);
+        BOOST_CHECK_EQUAL(msg, rmsg);
     }
     catch(const exception& e) { BOOST_FAIL(e.what()); }
     cout << "ok" << endl;
 }
 
+QPID_AUTO_TEST_CASE(increasing_interval_delayed_read)
+{
+    string test_name = get_test_name(test_filename, "increasing_interval_delayed_read");
+    try
+    {
+        string msg;
+        string rmsg;
+        string xid;
+        bool transientFlag;
+        bool externalFlag;
+
+        jcntl jc(test_name, JRNL_DIR, test_name, NUM_TEST_JFILES, TEST_JFSIZE_SBLKS);
+        jrnl_init(jc);
+        unsigned read_buffer_size_dblks = JRNL_RMGR_PAGES * JRNL_RMGR_PAGE_SIZE * JRNL_SBLK_SIZE;
+        unsigned n = num_msgs_to_full(1, read_buffer_size_dblks, MSG_REC_SIZE_DBLKS, true);
+        unsigned m = 0;
+
+        // Validate read pipeline
+        enq_msg(jc, m, create_msg(msg, m, MSG_SIZE), false);
+        jc.flush();
+        read_msg(jc, rmsg, xid, transientFlag, externalFlag);
+        deq_msg(jc, m);
+        m += 2;
+
+        // repeat the following multiple times...
+        for (int i=0; i<10; i++)
+        {
+            // Invalidate read pipeline with large write
+            unsigned t = m + (i*n) + 25;
+            for (; m<t; m+=2)
+            {
+                enq_msg(jc, m, create_msg(msg, m, MSG_SIZE), false);
+                deq_msg(jc, m);
+            }
+
+            // Revalidate read pipeline
+            enq_msg(jc, m, create_msg(msg, m, MSG_SIZE), false);
+            jc.flush();
+            read_msg(jc, rmsg, xid, transientFlag, externalFlag);
+            BOOST_CHECK_EQUAL(msg, rmsg);
+            deq_msg(jc, m);
+            m += 2;
+        }
+    }
+    catch(const exception& e) { BOOST_FAIL(e.what()); }
+    cout << "ok" << endl;
+}
+
 QPID_AUTO_TEST_SUITE_END()




More information about the rhmessaging-commits mailing list