[rhmessaging-commits] rhmessaging commits: r1474 - in store/trunk/cpp: lib/jrnl and 1 other directories.

rhmessaging-commits at lists.jboss.org rhmessaging-commits at lists.jboss.org
Wed Dec 12 17:22:31 EST 2007


Author: kpvdr
Date: 2007-12-12 17:22:30 -0500 (Wed, 12 Dec 2007)
New Revision: 1474

Added:
   store/trunk/cpp/lib/jrnl/enums.hpp
Modified:
   store/trunk/cpp/lib/BdbMessageStore.cpp
   store/trunk/cpp/lib/JournalImpl.cpp
   store/trunk/cpp/lib/Makefile.am
   store/trunk/cpp/lib/jrnl/enq_rec.cpp
   store/trunk/cpp/lib/jrnl/enq_rec.hpp
   store/trunk/cpp/lib/jrnl/jcfg.hpp
   store/trunk/cpp/lib/jrnl/jcntl.cpp
   store/trunk/cpp/lib/jrnl/pmgr.cpp
   store/trunk/cpp/lib/jrnl/pmgr.hpp
   store/trunk/cpp/lib/jrnl/rmgr.hpp
   store/trunk/cpp/lib/jrnl/rrfc.cpp
   store/trunk/cpp/lib/jrnl/rrfc.hpp
   store/trunk/cpp/lib/jrnl/wmgr.cpp
   store/trunk/cpp/lib/jrnl/wmgr.hpp
   store/trunk/cpp/lib/jrnl/wrfc.cpp
   store/trunk/cpp/lib/jrnl/wrfc.hpp
   store/trunk/cpp/tests/jrnl/JournalSystemTests.cpp
Log:
First go at a strategy to prevent the journal from filling up... it will close the producer connection when the proposed enqueue operation would take the journal to > 80% full. Consumers may contunue to dequeue, however.

Modified: store/trunk/cpp/lib/BdbMessageStore.cpp
===================================================================
--- store/trunk/cpp/lib/BdbMessageStore.cpp	2007-12-12 21:47:23 UTC (rev 1473)
+++ store/trunk/cpp/lib/BdbMessageStore.cpp	2007-12-12 22:22:30 UTC (rev 1474)
@@ -977,12 +977,11 @@
 
     try {
 
-        if ( queue && usingJrnl()){
-            //std::cout << "E" << std::flush;
+        if ( queue && usingJrnl()) {
             boost::intrusive_ptr<DataTokenImpl> dtokp(new DataTokenImpl);
             dtokp->addRef();
-	    dtokp->setSourceMessage(message);
-	    dtokp->set_rid(message->getPersistenceId()); // set the messageID into the Journal header (record-id)
+	        dtokp->setSourceMessage(message);
+	        dtokp->set_rid(message->getPersistenceId()); // set the messageID into the Journal header (record-id)
 
             bool written = false;
             unsigned aio_sleep_cnt = 0;
@@ -1026,12 +1025,15 @@
                         THROW_STORE_EXCEPTION("Timeout waiting for mutex: RHM_IORES_BUSY");
                     usleep(AIO_SLEEP_TIME); // TODO add sleep time to get events call as option
                     break;
+                  case rhm::journal::RHM_IORES_ENQCAPTHRESH:
+                    std::cerr << "Enqueue: Error storing record -- Enqueue capacity threshold exceeded on queue \"" << queue->getName() << "\"." << std::endl << std::flush;
+                    THROW_STORE_FULL_EXCEPTION("Enqueue: Error storing record -- Enqueue capacity threshold exceeded on queue \"" + queue->getName() + "\".");
                   case rhm::journal::RHM_IORES_FULL:
                     std::cerr << "Enqueue: Error storing record -- Journal full on queue \"" << queue->getName() << "\"." << std::endl << std::flush;
                     THROW_STORE_FULL_EXCEPTION("Enqueue: Error storing record -- Journal full on queue \"" + queue->getName() + "\".");
                     break;
                   default:
-                    assert( "Store Error: Unexpected msg state");
+                    assert("Store Error: Unexpected msg state");
                 }
             }
 	 
@@ -1173,7 +1175,7 @@
             THROW_STORE_FULL_EXCEPTION("Dequeue: Error storing record -- Journal full on queue \"" + queue.getName() + "\".");
             break;
           default:
-            assert( "Store Error: Unexpected msg state");
+            assert("Store Error: Unexpected msg state");
         }
     }
 }

Modified: store/trunk/cpp/lib/JournalImpl.cpp
===================================================================
--- store/trunk/cpp/lib/JournalImpl.cpp	2007-12-12 21:47:23 UTC (rev 1473)
+++ store/trunk/cpp/lib/JournalImpl.cpp	2007-12-12 22:22:30 UTC (rev 1474)
@@ -146,13 +146,13 @@
                     usleep(AIO_SLEEP_TIME);
                 } else {
                     std::stringstream ss;
-                    ss << "read_data_record() returned " << journal::pmgr::iores_str(res);
+                    ss << "read_data_record() returned " << journal::iores_str(res);
                     ss << "; exceeded maximum wait time";
                     throw jexception(0, ss.str().c_str(), "JournalImpl", "loadMsgContent");
                 }
             } else {
                 std::stringstream ss;
-                ss << "read_data_record() returned " << journal::pmgr::iores_str(res);
+                ss << "read_data_record() returned " << journal::iores_str(res);
                 throw jexception(0, ss.str().c_str(), "JournalImpl", "loadMsgContent");
             }
         }

Modified: store/trunk/cpp/lib/Makefile.am
===================================================================
--- store/trunk/cpp/lib/Makefile.am	2007-12-12 21:47:23 UTC (rev 1473)
+++ store/trunk/cpp/lib/Makefile.am	2007-12-12 22:22:30 UTC (rev 1474)
@@ -63,6 +63,7 @@
   jrnl/enq_hdr.hpp			\
   jrnl/enq_map.hpp			\
   jrnl/enq_rec.hpp			\
+  jrnl/enums.hpp            \
   jrnl/file_hdr.hpp			\
   jrnl/jcfg.hpp			\
   jrnl/jcntl.hpp			\

Modified: store/trunk/cpp/lib/jrnl/enq_rec.cpp
===================================================================
--- store/trunk/cpp/lib/jrnl/enq_rec.cpp	2007-12-12 21:47:23 UTC (rev 1473)
+++ store/trunk/cpp/lib/jrnl/enq_rec.cpp	2007-12-12 22:22:30 UTC (rev 1474)
@@ -565,11 +565,17 @@
 const size_t
 enq_rec::rec_size() const
 {
-    if (_enq_hdr.is_external())
-        return enq_hdr::size() + _enq_hdr._xidsize + rec_tail::size();
-    return enq_hdr::size() + _enq_hdr._xidsize + _enq_hdr._dsize + rec_tail::size();
+    return rec_size(_enq_hdr._xidsize, _enq_hdr._dsize, _enq_hdr.is_external());
 }
 
+const size_t
+enq_rec::rec_size(const size_t xidsize, const size_t dsize, const bool external)
+{
+    if (external)
+        return enq_hdr::size() + xidsize + rec_tail::size();
+    return enq_hdr::size() + xidsize + dsize + rec_tail::size();
+}
+
 void
 enq_rec::set_rid(const u_int64_t rid)
 {

Modified: store/trunk/cpp/lib/jrnl/enq_rec.hpp
===================================================================
--- store/trunk/cpp/lib/jrnl/enq_rec.hpp	2007-12-12 21:47:23 UTC (rev 1473)
+++ store/trunk/cpp/lib/jrnl/enq_rec.hpp	2007-12-12 22:22:30 UTC (rev 1474)
@@ -100,6 +100,7 @@
         const size_t data_size() const;
         const size_t xid_size() const;
         const size_t rec_size() const;
+        static const size_t rec_size(const size_t xidsize, const size_t dsize, const bool external);
         inline const u_int64_t rid() const { return _enq_hdr._rid; }
         void set_rid(const u_int64_t rid);
 

Added: store/trunk/cpp/lib/jrnl/enums.hpp
===================================================================
--- store/trunk/cpp/lib/jrnl/enums.hpp	                        (rev 0)
+++ store/trunk/cpp/lib/jrnl/enums.hpp	2007-12-12 22:22:30 UTC (rev 1474)
@@ -0,0 +1,76 @@
+/**
+* \file enums.hpp
+*
+* Red Hat Messaging - Message Journal
+*
+* File containing definitions for namespace rhm::journal enums.
+*
+* \author Kim van der Riet
+*
+* Copyright 2007 Red Hat, Inc.
+*
+* This file is part of Red Hat Messaging.
+*
+* Red Hat Messaging is free software; you can redistribute it and/or
+* modify it under the terms of the GNU Lesser General Public
+* License as published by the Free Software Foundation; either
+* version 2.1 of the License, or (at your option) any later version.
+*
+* This library is distributed in the hope that it will be useful,
+* but WITHOUT ANY WARRANTY; without even the implied warranty of
+* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
+* Lesser General Public License for more details.
+*
+* You should have received a copy of the GNU Lesser General Public
+* License along with this library; if not, write to the Free Software
+* Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA  02110-1301
+* USA
+*
+* The GNU Lesser General Public License is available in the file COPYING.
+*/
+
+#ifndef rhm_journal_enums_hpp
+#define rhm_journal_enums_hpp
+
+namespace rhm
+{
+namespace journal
+{
+
+    // TODO: Change this to flags, as multiple of these conditions may exist simultaneously
+    /**
+    * \brief Enumeration of possilbe return states from journal read and write operations.
+    */
+    enum _iores
+    {
+        RHM_IORES_SUCCESS = 0,  ///< Success: IO operation completed noramlly.
+        RHM_IORES_AIO_WAIT,     ///< IO operation suspended - all pages are waiting for AIO.
+        RHM_IORES_EMPTY,        ///< During read operations, nothing further is available to read.
+        RHM_IORES_ENQCAPTHRESH, ///< Enqueue capacity threshold (limit) reached.
+        RHM_IORES_FULL,         ///< During write operations, the journal files are full.
+        RHM_IORES_BUSY,         ///< Another blocking operation is in progress.
+        RHM_IORES_TXPENDING,    ///< Operation blocked by pending transaction.
+        RHM_IORES_NOTIMPL       ///< Function is not yet implemented.
+    };
+    typedef _iores iores;
+
+    static inline const char* iores_str(iores res)
+    {
+        switch (res)
+        {
+            case RHM_IORES_SUCCESS: return "RHM_IORES_SUCCESS";
+            case RHM_IORES_AIO_WAIT: return "RHM_IORES_AIO_WAIT";
+            case RHM_IORES_EMPTY: return "RHM_IORES_EMPTY";
+            case RHM_IORES_ENQCAPTHRESH: return "RHM_IORES_ENQCAPTHRESH";
+            case RHM_IORES_FULL: return "RHM_IORES_FULL";
+            case RHM_IORES_BUSY: return "RHM_IORES_BUSY";
+            case RHM_IORES_TXPENDING: return "RHM_IORES_TXPENDING";
+            case RHM_IORES_NOTIMPL: return "RHM_IORES_NOTIMPL";
+        }
+        return "<iores unknown>";
+    }
+
+} // namespace journal
+} // namespace rhm
+
+#endif // ifndef rhm_journal_enums_hpp

Modified: store/trunk/cpp/lib/jrnl/jcfg.hpp
===================================================================
--- store/trunk/cpp/lib/jrnl/jcfg.hpp	2007-12-12 21:47:23 UTC (rev 1473)
+++ store/trunk/cpp/lib/jrnl/jcfg.hpp	2007-12-12 22:22:30 UTC (rev 1474)
@@ -61,9 +61,7 @@
 #define JRNL_SBLK_SIZE          4           ///< Disk softblock size in multiples of JRNL_DBLK_SIZE
 #define JRNL_MIN_FILE_SIZE      128         ///< Min. jrnl file size in sblks (excl. file_hdr)
 #define JRNL_MIN_NUM_FILES      4           ///< Min. number of journal files
-// The following are now parameterized in the jcntl constructor, and no longer needed here.
-//#define JRNL_FILE_SIZE          3072        ///< Journal file size in softblocks excl. file_hdr
-//#define JRNL_NUM_FILES          8           ///< Number of journal files
+#define JRNL_ENQ_THRESHOLD      80          ///< Percent full when enqueue connection will be closed
 
 // NOTE: JRNL_RMGR_PAGE_SIZE must be a multiple of JRNL_WMGR_PAGE_SIZE.
 #define JRNL_RMGR_PAGE_SIZE     128         ///< Journal page size in softblocks

Modified: store/trunk/cpp/lib/jrnl/jcntl.cpp
===================================================================
--- store/trunk/cpp/lib/jrnl/jcntl.cpp	2007-12-12 21:47:23 UTC (rev 1473)
+++ store/trunk/cpp/lib/jrnl/jcntl.cpp	2007-12-12 22:22:30 UTC (rev 1474)
@@ -120,7 +120,7 @@
     // NOTE: The write RFC must initialize first. This sets all the file handle object
     // (lfh) counters and pointers for both read and write, since write activity
     // constrains read activity (i.e. one can't read what has not yet been written).
-    _wrfc.initialize(_num_jfiles, (nlfh**)_datafh);
+    _wrfc.initialize(_num_jfiles, _jfsize_sblks, (nlfh**)_datafh);
     _rrfc.initialize(_num_jfiles, (nlfh**)_datafh);
     _rmgr.initialize(rdtoklp, rd_cb, 0);
     _wmgr.initialize(wdtoklp, wr_cb, JRNL_WMGR_MAXDTOKPP, JRNL_WMGR_MAXWAITUS);
@@ -171,7 +171,7 @@
     // NOTE: The write RFC must initialize first. This sets all the file handle object
     // (lfh) counters and pointers for both read and write, since write activity
     // constrains read activity (i.e. one can't read what has not yet been written).
-    _wrfc.initialize(_num_jfiles, (nlfh**)_datafh, &_rcvdat);
+    _wrfc.initialize(_num_jfiles, _jfsize_sblks, (nlfh**)_datafh, &_rcvdat);
     _rrfc.initialize(_num_jfiles, (nlfh**)_datafh, _rcvdat._ffid);
     _rmgr.initialize(rdtoklp, rd_cb, _rcvdat._fro);
     _wmgr.initialize(wdtoklp, wr_cb, JRNL_WMGR_MAXDTOKPP, JRNL_WMGR_MAXWAITUS, _rcvdat._eo);
@@ -187,7 +187,7 @@
         throw jexception(jerrno::JERR_JCNTL_NOTRECOVERED, "jcntl", "recover_complete");
     for (u_int16_t i=0; i<_num_jfiles; i++)
         _datafh[i]->reset(&_rcvdat);
-    _wrfc.initialize(_num_jfiles, (nlfh**)_datafh, &_rcvdat);
+    _wrfc.initialize(_num_jfiles, _jfsize_sblks, (nlfh**)_datafh, &_rcvdat);
     _rrfc.initialize(_num_jfiles, (nlfh**)_datafh, _rcvdat._ffid);
     _rmgr.recover_complete(_rcvdat._fro);
     _readonly_flag = false;

Modified: store/trunk/cpp/lib/jrnl/pmgr.cpp
===================================================================
--- store/trunk/cpp/lib/jrnl/pmgr.cpp	2007-12-12 21:47:23 UTC (rev 1473)
+++ store/trunk/cpp/lib/jrnl/pmgr.cpp	2007-12-12 22:22:30 UTC (rev 1474)
@@ -239,28 +239,5 @@
     return "<page_state unknown>";
 }
 
-const char*
-pmgr::iores_str(iores res)
-{
-    switch (res)
-    {
-        case RHM_IORES_SUCCESS:
-            return "RHM_IORES_SUCCESS";
-        case RHM_IORES_AIO_WAIT:
-            return "RHM_IORES_AIO_WAIT";
-        case RHM_IORES_EMPTY:
-            return "RHM_IORES_EMPTY";
-        case RHM_IORES_FULL:
-            return "RHM_IORES_FULL";
-        case RHM_IORES_BUSY:
-            return "RHM_IORES_BUSY";
-        case RHM_IORES_TXPENDING:
-            return "RHM_IORES_TXPENDING";
-        case RHM_IORES_NOTIMPL:
-            return "RHM_IORES_NOTIMPL";
-    }
-    return "<iores unknown>";
-}
-
 } // namespace journal
 } // namespace rhm

Modified: store/trunk/cpp/lib/jrnl/pmgr.hpp
===================================================================
--- store/trunk/cpp/lib/jrnl/pmgr.hpp	2007-12-12 21:47:23 UTC (rev 1473)
+++ store/trunk/cpp/lib/jrnl/pmgr.hpp	2007-12-12 22:22:30 UTC (rev 1474)
@@ -57,21 +57,6 @@
 {
 namespace journal
 {
-    // TODO: Change this to flags, as multiple of these conditions may exist simultaneously
-    /**
-    * \brief Enumeration of possilbe return states from journal read and write operations.
-    */
-    enum _iores
-    {
-        RHM_IORES_SUCCESS = 0,  ///< Success: IO operation completed noramlly.
-        RHM_IORES_AIO_WAIT,     ///< IO operation suspended - all pages are waiting for AIO.
-        RHM_IORES_EMPTY,        ///< During read operations, nothing further is available to read.
-        RHM_IORES_FULL,         ///< During write operations, the journal files are full.
-        RHM_IORES_BUSY,         ///< Another blocking operation is in progress.
-        RHM_IORES_TXPENDING,    ///< Operation blocked by pending transaction.
-        RHM_IORES_NOTIMPL       ///< Not yet implemented.
-    };
-    typedef _iores iores;
 
     /**
     * \brief Abstract class for managing either read or write page cache of arbitrary size and
@@ -146,7 +131,6 @@
         virtual const u_int32_t get_events(page_state state) = 0;
         inline const u_int32_t get_aio_evt_rem() const { return _aio_evt_rem; }
         static const char* page_state_str(page_state ps);
-        static const char* iores_str(iores res);
 
     protected:
         virtual void initialize();

Modified: store/trunk/cpp/lib/jrnl/rmgr.hpp
===================================================================
--- store/trunk/cpp/lib/jrnl/rmgr.hpp	2007-12-12 21:47:23 UTC (rev 1473)
+++ store/trunk/cpp/lib/jrnl/rmgr.hpp	2007-12-12 22:22:30 UTC (rev 1474)
@@ -41,6 +41,7 @@
 }
 }
 
+#include <jrnl/enums.hpp>
 #include <jrnl/pmgr.hpp>
 #include <jrnl/rec_hdr.hpp>
 #include <jrnl/rrfc.hpp>

Modified: store/trunk/cpp/lib/jrnl/rrfc.cpp
===================================================================
--- store/trunk/cpp/lib/jrnl/rrfc.cpp	2007-12-12 21:47:23 UTC (rev 1473)
+++ store/trunk/cpp/lib/jrnl/rrfc.cpp	2007-12-12 22:22:30 UTC (rev 1474)
@@ -51,7 +51,7 @@
 rrfc::~rrfc() {}
 
 void
-rrfc::initialize(u_int32_t nfiles, nlfh** fh_arr, u_int32_t fh_index)
+rrfc::initialize(const u_int16_t nfiles, nlfh** fh_arr, u_int32_t fh_index)
 {
     _nfiles = nfiles;
     _fh_arr = fh_arr;

Modified: store/trunk/cpp/lib/jrnl/rrfc.hpp
===================================================================
--- store/trunk/cpp/lib/jrnl/rrfc.hpp	2007-12-12 21:47:23 UTC (rev 1473)
+++ store/trunk/cpp/lib/jrnl/rrfc.hpp	2007-12-12 22:22:30 UTC (rev 1474)
@@ -55,7 +55,7 @@
     class rrfc
     {
     protected:
-        u_int32_t _nfiles;          ///< Number of data files
+        u_int16_t _nfiles;          ///< Number of data files
         nlfh** _fh_arr;             ///< Array of pointers to data file handles
         u_int16_t _fh_index;        ///< Index of current file handle
         nlfh* _curr_fh;             ///< Pointer to current file handle
@@ -71,7 +71,7 @@
         *     each of which correspond to one of the physical files.
         * \param fh_index Initial index of journal file. Default = 0.
         */
-        void initialize(u_int32_t nfiles, nlfh** fh_arr, u_int32_t fh_index = 0);
+        void initialize(const u_int16_t nfiles, nlfh** fh_arr, u_int32_t fh_index = 0);
 
         /**
         * \brief Rotate active file handle to next file in rotating file group.

Modified: store/trunk/cpp/lib/jrnl/wmgr.cpp
===================================================================
--- store/trunk/cpp/lib/jrnl/wmgr.cpp	2007-12-12 21:47:23 UTC (rev 1473)
+++ store/trunk/cpp/lib/jrnl/wmgr.cpp	2007-12-12 22:22:30 UTC (rev 1474)
@@ -119,7 +119,7 @@
     if (this_data_len != tot_data_len && !external)
         return RHM_IORES_NOTIMPL;
 
-    iores res = pre_write_check(WMGR_ENQUEUE, dtokp);
+    iores res = pre_write_check(WMGR_ENQUEUE, dtokp, xid_len, tot_data_len, external);
     if (res != RHM_IORES_SUCCESS)
         return res;
 
@@ -926,7 +926,8 @@
 }
 
 const iores
-wmgr::pre_write_check(_op_type op, data_tok* dtokp)
+wmgr::pre_write_check(const _op_type op, const data_tok* const dtokp, const size_t xidsize,
+        const size_t dsize, const bool external) const
 {
     // Check status of current file
     if (!_wrfc.is_reset())
@@ -955,13 +956,20 @@
     switch (op)
     {
         case WMGR_ENQUEUE:
-            if (!dtokp->is_writable())
             {
-                std::ostringstream oss;
-                oss << "op=" << _op_str[op] << " dtok_id=" << dtokp->id();
-                oss << " dtok_state=" << dtokp->wstate_str();
-                throw jexception(jerrno::JERR_WMGR_BADDTOKSTATE, oss.str(), "wmgr",
+                // Check for enqueue reaching cutoff threshold
+                u_int32_t size_dblks = jrec::size_dblks(enq_rec::rec_size(xidsize, dsize,
+                        external));
+                if (_wrfc.enq_threshold(_cached_offset_dblks + size_dblks))
+                    return RHM_IORES_ENQCAPTHRESH;
+                if (!dtokp->is_writable())
+                {
+                    std::ostringstream oss;
+                    oss << "op=" << _op_str[op] << " dtok_id=" << dtokp->id();
+                    oss << " dtok_state=" << dtokp->wstate_str();
+                    throw jexception(jerrno::JERR_WMGR_BADDTOKSTATE, oss.str(), "wmgr",
                         "pre_write_check");
+                }
             }
             break;
         case WMGR_DEQUEUE:

Modified: store/trunk/cpp/lib/jrnl/wmgr.hpp
===================================================================
--- store/trunk/cpp/lib/jrnl/wmgr.hpp	2007-12-12 21:47:23 UTC (rev 1473)
+++ store/trunk/cpp/lib/jrnl/wmgr.hpp	2007-12-12 22:22:30 UTC (rev 1474)
@@ -41,6 +41,7 @@
 }
 }
 
+#include <jrnl/enums.hpp>
 #include <jrnl/pmgr.hpp>
 #include <jrnl/wrfc.hpp>
 #include <set>
@@ -117,7 +118,9 @@
 
     private:
         void initialize();
-        const iores pre_write_check(_op_type op, data_tok* dtokp);
+        const iores pre_write_check(const _op_type op, const data_tok* const dtokp,
+                const size_t xidsize = 0, const size_t dsize = 0, const bool external = false)
+                const;
         const u_int64_t initialize_rid(const bool cont, data_tok* dtokp);
         const iores write_flush();
         const iores rotate_file();

Modified: store/trunk/cpp/lib/jrnl/wrfc.cpp
===================================================================
--- store/trunk/cpp/lib/jrnl/wrfc.cpp	2007-12-12 21:47:23 UTC (rev 1473)
+++ store/trunk/cpp/lib/jrnl/wrfc.cpp	2007-12-12 22:22:30 UTC (rev 1474)
@@ -30,6 +30,7 @@
 * The GNU Lesser General Public License is available in the file COPYING.
 */
 
+#include <iostream> // TODO: remove later, for debug only.
 
 #include <jrnl/wrfc.hpp>
 #include <jrnl/jerrno.hpp>
@@ -42,6 +43,9 @@
 
 wrfc::wrfc():
         rrfc(),
+        _fsize_sblks(0),
+        _fsize_dblks(0),
+        _enq_cap_offs_dblks(0),
 #ifdef DRHM_TESTVALS
         // TODO: Find method of specifying 64-bit literals under gcc with -pedantic option
         _rid(u_int64_t(0xffeeddcc) << 32), // For testing high rids
@@ -55,7 +59,7 @@
 wrfc::~wrfc() {}
 
 void
-wrfc::initialize(u_int32_t nfiles, nlfh** fh_arr, rcvdat* rdp)
+wrfc::initialize(const u_int16_t nfiles, const u_int32_t fsize_sblks, nlfh** fh_arr, rcvdat* rdp)
 {
     if (rdp)
     {
@@ -74,6 +78,12 @@
 #endif
         _reset_ok = false;
     }
+    _fsize_sblks = fsize_sblks;
+    _fsize_dblks = fsize_sblks * JRNL_SBLK_SIZE;
+    _enq_cap_offs_dblks = _fsize_dblks * _nfiles * (100 - JRNL_ENQ_THRESHOLD) / 100;
+    // Check the offset is at least one file; if not, make it so
+    if (_enq_cap_offs_dblks < _fsize_dblks)
+        _enq_cap_offs_dblks = _fsize_dblks;
 }
 
 bool
@@ -85,7 +95,7 @@
     if (_fh_index == _nfiles)
     {
         _fh_index = 0;
-        _owi = !_owi; // flip owi
+        _owi = !_owi;
     }
     _curr_fh = _fh_arr[_fh_index];
     return reset(); //Checks if file is still in use (ie not fully dequeued yet)
@@ -98,5 +108,29 @@
     return _reset_ok;
 }
 
+const bool
+wrfc::enq_threshold(const u_int32_t enq_dsize_dblks) const
+{
+    u_int32_t subm_dblks = subm_cnt_dblks();
+    // This compensates for new files which don't have their file headers written yet,
+    // as file header space cannot be included in this calculation.
+    if (subm_dblks == 0)
+        subm_dblks = 4;
+    u_int32_t fwd_dblks = subm_dblks + enq_dsize_dblks + _enq_cap_offs_dblks;
+    u_int16_t findex = _fh_index;
+    nlfh* fhp = _curr_fh;
+    while (fwd_dblks && !(findex != _fh_index && fhp->enqcnt()))
+    {
+        fwd_dblks -= fwd_dblks > _fsize_dblks ? _fsize_dblks : fwd_dblks;
+        if (fwd_dblks)
+        {
+            if (++findex == _nfiles)
+                findex = 0;
+            fhp = _fh_arr[findex];
+        }
+    }
+    return findex != _fh_index && fhp->enqcnt() > 0;
+}
+
 } // namespace journal
 } // namespace rhm

Modified: store/trunk/cpp/lib/jrnl/wrfc.hpp
===================================================================
--- store/trunk/cpp/lib/jrnl/wrfc.hpp	2007-12-12 21:47:23 UTC (rev 1473)
+++ store/trunk/cpp/lib/jrnl/wrfc.hpp	2007-12-12 22:22:30 UTC (rev 1474)
@@ -55,9 +55,12 @@
     class wrfc : public rrfc
     {
     private:
-        u_int64_t _rid;     ///< Master counter for record ID (rid)
-        bool _reset_ok;     ///< Flag set when reset succeeds
-        bool _owi;          ///< Overwrite indicator
+        u_int32_t _fsize_sblks; ///< Size of journal files in sblks
+        u_int32_t _fsize_dblks; ///< Size of journal files in dblks
+        u_int32_t _enq_cap_offs_dblks;  ///< Enqueue capacity offset
+        u_int64_t _rid;         ///< Master counter for record ID (rid)
+        bool _reset_ok;         ///< Flag set when reset succeeds
+        bool _owi;              ///< Overwrite indicator
 
     public:
         wrfc();
@@ -66,12 +69,14 @@
         /**
         * \brief Initialize the controller.
         * \param nfiles Number of files in the rotating file group.
+        * \param fsize_sblks Size of each journal file in sblks.
         * \param fh_arr Pointer to an array of file handles (nlogging_fh or subclasses),
         *     each of which correspond to one of the physical files.
         * \param rdp Struct carrying restore information. Optional for non-restore use, defaults to
         *     NULL.
         */
-        void initialize(u_int32_t nfiles, nlfh** fh_arr, rcvdat* rdp = NULL);
+        void initialize(const u_int16_t nfiles, const u_int32_t fsize_sblks, nlfh** fh_arr,
+                rcvdat* rdp = NULL);
 
         /**
         * \brief Rotate active file handle to next file in rotating file group.
@@ -106,6 +111,7 @@
         inline const u_int32_t aio_outstanding_dblks() const
                 { return _curr_fh->wr_aio_outstanding_dblks(); }
         inline const bool file_rotate() const { return _curr_fh->wr_file_rotate(); }
+        const bool enq_threshold(const u_int32_t enq_dsize_dblks) const;
     }; // class wrfc
 
 } // namespace journal

Modified: store/trunk/cpp/tests/jrnl/JournalSystemTests.cpp
===================================================================
--- store/trunk/cpp/tests/jrnl/JournalSystemTests.cpp	2007-12-12 21:47:23 UTC (rev 1473)
+++ store/trunk/cpp/tests/jrnl/JournalSystemTests.cpp	2007-12-12 22:22:30 UTC (rev 1474)
@@ -758,7 +758,6 @@
     switch (res)
     {
         case rhm::journal::RHM_IORES_SUCCESS:
-            //((char*)mbuff)[msize] = '\0';
             return false;
         case rhm::journal::RHM_IORES_AIO_WAIT:
             if (++aio_sleep_cnt <= MAX_AIO_SLEEPS)
@@ -772,21 +771,9 @@
                 CPPUNIT_FAIL("Timeout on RHM_IORES_AIO_WAIT.");
             }
             break;
-        case rhm::journal::RHM_IORES_EMPTY:
-            delete dtp;
-            CPPUNIT_FAIL("RHM_IORES_EMPTY");
-        case rhm::journal::RHM_IORES_FULL:
-            delete dtp;
-            CPPUNIT_FAIL("RHM_IORES_FULL");
-        case rhm::journal::RHM_IORES_BUSY:
-            delete dtp;
-            CPPUNIT_FAIL("RHM_IORES_BUSY");
-        case rhm::journal::RHM_IORES_TXPENDING:
-            delete dtp;
-            CPPUNIT_FAIL("RHM_IORES_TXPENDING");
         default:
             delete dtp;
-            CPPUNIT_FAIL("unknown return value");
+            CPPUNIT_FAIL(rhm::journal::iores_str(res));
     }
     return true;
 }




More information about the rhmessaging-commits mailing list