[rhmessaging-commits] rhmessaging commits: r1474 - in store/trunk/cpp: lib/jrnl and 1 other directories.
rhmessaging-commits at lists.jboss.org
rhmessaging-commits at lists.jboss.org
Wed Dec 12 17:22:31 EST 2007
Author: kpvdr
Date: 2007-12-12 17:22:30 -0500 (Wed, 12 Dec 2007)
New Revision: 1474
Added:
store/trunk/cpp/lib/jrnl/enums.hpp
Modified:
store/trunk/cpp/lib/BdbMessageStore.cpp
store/trunk/cpp/lib/JournalImpl.cpp
store/trunk/cpp/lib/Makefile.am
store/trunk/cpp/lib/jrnl/enq_rec.cpp
store/trunk/cpp/lib/jrnl/enq_rec.hpp
store/trunk/cpp/lib/jrnl/jcfg.hpp
store/trunk/cpp/lib/jrnl/jcntl.cpp
store/trunk/cpp/lib/jrnl/pmgr.cpp
store/trunk/cpp/lib/jrnl/pmgr.hpp
store/trunk/cpp/lib/jrnl/rmgr.hpp
store/trunk/cpp/lib/jrnl/rrfc.cpp
store/trunk/cpp/lib/jrnl/rrfc.hpp
store/trunk/cpp/lib/jrnl/wmgr.cpp
store/trunk/cpp/lib/jrnl/wmgr.hpp
store/trunk/cpp/lib/jrnl/wrfc.cpp
store/trunk/cpp/lib/jrnl/wrfc.hpp
store/trunk/cpp/tests/jrnl/JournalSystemTests.cpp
Log:
First go at a strategy to prevent the journal from filling up... it will close the producer connection when the proposed enqueue operation would take the journal to > 80% full. Consumers may continue to dequeue, however.
Modified: store/trunk/cpp/lib/BdbMessageStore.cpp
===================================================================
--- store/trunk/cpp/lib/BdbMessageStore.cpp 2007-12-12 21:47:23 UTC (rev 1473)
+++ store/trunk/cpp/lib/BdbMessageStore.cpp 2007-12-12 22:22:30 UTC (rev 1474)
@@ -977,12 +977,11 @@
try {
- if ( queue && usingJrnl()){
- //std::cout << "E" << std::flush;
+ if ( queue && usingJrnl()) {
boost::intrusive_ptr<DataTokenImpl> dtokp(new DataTokenImpl);
dtokp->addRef();
- dtokp->setSourceMessage(message);
- dtokp->set_rid(message->getPersistenceId()); // set the messageID into the Journal header (record-id)
+ dtokp->setSourceMessage(message);
+ dtokp->set_rid(message->getPersistenceId()); // set the messageID into the Journal header (record-id)
bool written = false;
unsigned aio_sleep_cnt = 0;
@@ -1026,12 +1025,15 @@
THROW_STORE_EXCEPTION("Timeout waiting for mutex: RHM_IORES_BUSY");
usleep(AIO_SLEEP_TIME); // TODO add sleep time to get events call as option
break;
+ case rhm::journal::RHM_IORES_ENQCAPTHRESH:
+ std::cerr << "Enqueue: Error storing record -- Enqueue capacity threshold exceeded on queue \"" << queue->getName() << "\"." << std::endl << std::flush;
+ THROW_STORE_FULL_EXCEPTION("Enqueue: Error storing record -- Enqueue capacity threshold exceeded on queue \"" + queue->getName() + "\".");
case rhm::journal::RHM_IORES_FULL:
std::cerr << "Enqueue: Error storing record -- Journal full on queue \"" << queue->getName() << "\"." << std::endl << std::flush;
THROW_STORE_FULL_EXCEPTION("Enqueue: Error storing record -- Journal full on queue \"" + queue->getName() + "\".");
break;
default:
- assert( "Store Error: Unexpected msg state");
+ assert("Store Error: Unexpected msg state");
}
}
@@ -1173,7 +1175,7 @@
THROW_STORE_FULL_EXCEPTION("Dequeue: Error storing record -- Journal full on queue \"" + queue.getName() + "\".");
break;
default:
- assert( "Store Error: Unexpected msg state");
+ assert("Store Error: Unexpected msg state");
}
}
}
Modified: store/trunk/cpp/lib/JournalImpl.cpp
===================================================================
--- store/trunk/cpp/lib/JournalImpl.cpp 2007-12-12 21:47:23 UTC (rev 1473)
+++ store/trunk/cpp/lib/JournalImpl.cpp 2007-12-12 22:22:30 UTC (rev 1474)
@@ -146,13 +146,13 @@
usleep(AIO_SLEEP_TIME);
} else {
std::stringstream ss;
- ss << "read_data_record() returned " << journal::pmgr::iores_str(res);
+ ss << "read_data_record() returned " << journal::iores_str(res);
ss << "; exceeded maximum wait time";
throw jexception(0, ss.str().c_str(), "JournalImpl", "loadMsgContent");
}
} else {
std::stringstream ss;
- ss << "read_data_record() returned " << journal::pmgr::iores_str(res);
+ ss << "read_data_record() returned " << journal::iores_str(res);
throw jexception(0, ss.str().c_str(), "JournalImpl", "loadMsgContent");
}
}
Modified: store/trunk/cpp/lib/Makefile.am
===================================================================
--- store/trunk/cpp/lib/Makefile.am 2007-12-12 21:47:23 UTC (rev 1473)
+++ store/trunk/cpp/lib/Makefile.am 2007-12-12 22:22:30 UTC (rev 1474)
@@ -63,6 +63,7 @@
jrnl/enq_hdr.hpp \
jrnl/enq_map.hpp \
jrnl/enq_rec.hpp \
+ jrnl/enums.hpp \
jrnl/file_hdr.hpp \
jrnl/jcfg.hpp \
jrnl/jcntl.hpp \
Modified: store/trunk/cpp/lib/jrnl/enq_rec.cpp
===================================================================
--- store/trunk/cpp/lib/jrnl/enq_rec.cpp 2007-12-12 21:47:23 UTC (rev 1473)
+++ store/trunk/cpp/lib/jrnl/enq_rec.cpp 2007-12-12 22:22:30 UTC (rev 1474)
@@ -565,11 +565,17 @@
const size_t
enq_rec::rec_size() const
{
- if (_enq_hdr.is_external())
- return enq_hdr::size() + _enq_hdr._xidsize + rec_tail::size();
- return enq_hdr::size() + _enq_hdr._xidsize + _enq_hdr._dsize + rec_tail::size();
+ return rec_size(_enq_hdr._xidsize, _enq_hdr._dsize, _enq_hdr.is_external());
}
+const size_t
+enq_rec::rec_size(const size_t xidsize, const size_t dsize, const bool external)
+{
+ if (external)
+ return enq_hdr::size() + xidsize + rec_tail::size();
+ return enq_hdr::size() + xidsize + dsize + rec_tail::size();
+}
+
void
enq_rec::set_rid(const u_int64_t rid)
{
Modified: store/trunk/cpp/lib/jrnl/enq_rec.hpp
===================================================================
--- store/trunk/cpp/lib/jrnl/enq_rec.hpp 2007-12-12 21:47:23 UTC (rev 1473)
+++ store/trunk/cpp/lib/jrnl/enq_rec.hpp 2007-12-12 22:22:30 UTC (rev 1474)
@@ -100,6 +100,7 @@
const size_t data_size() const;
const size_t xid_size() const;
const size_t rec_size() const;
+ static const size_t rec_size(const size_t xidsize, const size_t dsize, const bool external);
inline const u_int64_t rid() const { return _enq_hdr._rid; }
void set_rid(const u_int64_t rid);
Added: store/trunk/cpp/lib/jrnl/enums.hpp
===================================================================
--- store/trunk/cpp/lib/jrnl/enums.hpp (rev 0)
+++ store/trunk/cpp/lib/jrnl/enums.hpp 2007-12-12 22:22:30 UTC (rev 1474)
@@ -0,0 +1,76 @@
+/**
+* \file enums.hpp
+*
+* Red Hat Messaging - Message Journal
+*
+* File containing definitions for namespace rhm::journal enums.
+*
+* \author Kim van der Riet
+*
+* Copyright 2007 Red Hat, Inc.
+*
+* This file is part of Red Hat Messaging.
+*
+* Red Hat Messaging is free software; you can redistribute it and/or
+* modify it under the terms of the GNU Lesser General Public
+* License as published by the Free Software Foundation; either
+* version 2.1 of the License, or (at your option) any later version.
+*
+* This library is distributed in the hope that it will be useful,
+* but WITHOUT ANY WARRANTY; without even the implied warranty of
+* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+* Lesser General Public License for more details.
+*
+* You should have received a copy of the GNU Lesser General Public
+* License along with this library; if not, write to the Free Software
+* Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301
+* USA
+*
+* The GNU Lesser General Public License is available in the file COPYING.
+*/
+
+#ifndef rhm_journal_enums_hpp
+#define rhm_journal_enums_hpp
+
+namespace rhm
+{
+namespace journal
+{
+
+ // TODO: Change this to flags, as multiple of these conditions may exist simultaneously
+ /**
+ * \brief Enumeration of possilbe return states from journal read and write operations.
+ */
+ enum _iores
+ {
+ RHM_IORES_SUCCESS = 0, ///< Success: IO operation completed noramlly.
+ RHM_IORES_AIO_WAIT, ///< IO operation suspended - all pages are waiting for AIO.
+ RHM_IORES_EMPTY, ///< During read operations, nothing further is available to read.
+ RHM_IORES_ENQCAPTHRESH, ///< Enqueue capacity threshold (limit) reached.
+ RHM_IORES_FULL, ///< During write operations, the journal files are full.
+ RHM_IORES_BUSY, ///< Another blocking operation is in progress.
+ RHM_IORES_TXPENDING, ///< Operation blocked by pending transaction.
+ RHM_IORES_NOTIMPL ///< Function is not yet implemented.
+ };
+ typedef _iores iores;
+
+ static inline const char* iores_str(iores res)
+ {
+ switch (res)
+ {
+ case RHM_IORES_SUCCESS: return "RHM_IORES_SUCCESS";
+ case RHM_IORES_AIO_WAIT: return "RHM_IORES_AIO_WAIT";
+ case RHM_IORES_EMPTY: return "RHM_IORES_EMPTY";
+ case RHM_IORES_ENQCAPTHRESH: return "RHM_IORES_ENQCAPTHRESH";
+ case RHM_IORES_FULL: return "RHM_IORES_FULL";
+ case RHM_IORES_BUSY: return "RHM_IORES_BUSY";
+ case RHM_IORES_TXPENDING: return "RHM_IORES_TXPENDING";
+ case RHM_IORES_NOTIMPL: return "RHM_IORES_NOTIMPL";
+ }
+ return "<iores unknown>";
+ }
+
+} // namespace journal
+} // namespace rhm
+
+#endif // ifndef rhm_journal_enums_hpp
Modified: store/trunk/cpp/lib/jrnl/jcfg.hpp
===================================================================
--- store/trunk/cpp/lib/jrnl/jcfg.hpp 2007-12-12 21:47:23 UTC (rev 1473)
+++ store/trunk/cpp/lib/jrnl/jcfg.hpp 2007-12-12 22:22:30 UTC (rev 1474)
@@ -61,9 +61,7 @@
#define JRNL_SBLK_SIZE 4 ///< Disk softblock size in multiples of JRNL_DBLK_SIZE
#define JRNL_MIN_FILE_SIZE 128 ///< Min. jrnl file size in sblks (excl. file_hdr)
#define JRNL_MIN_NUM_FILES 4 ///< Min. number of journal files
-// The following are now parameterized in the jcntl constructor, and no longer needed here.
-//#define JRNL_FILE_SIZE 3072 ///< Journal file size in softblocks excl. file_hdr
-//#define JRNL_NUM_FILES 8 ///< Number of journal files
+#define JRNL_ENQ_THRESHOLD 80 ///< Percent full when enqueue connection will be closed
// NOTE: JRNL_RMGR_PAGE_SIZE must be a multiple of JRNL_WMGR_PAGE_SIZE.
#define JRNL_RMGR_PAGE_SIZE 128 ///< Journal page size in softblocks
Modified: store/trunk/cpp/lib/jrnl/jcntl.cpp
===================================================================
--- store/trunk/cpp/lib/jrnl/jcntl.cpp 2007-12-12 21:47:23 UTC (rev 1473)
+++ store/trunk/cpp/lib/jrnl/jcntl.cpp 2007-12-12 22:22:30 UTC (rev 1474)
@@ -120,7 +120,7 @@
// NOTE: The write RFC must initialize first. This sets all the file handle object
// (lfh) counters and pointers for both read and write, since write activity
// constrains read activity (i.e. one can't read what has not yet been written).
- _wrfc.initialize(_num_jfiles, (nlfh**)_datafh);
+ _wrfc.initialize(_num_jfiles, _jfsize_sblks, (nlfh**)_datafh);
_rrfc.initialize(_num_jfiles, (nlfh**)_datafh);
_rmgr.initialize(rdtoklp, rd_cb, 0);
_wmgr.initialize(wdtoklp, wr_cb, JRNL_WMGR_MAXDTOKPP, JRNL_WMGR_MAXWAITUS);
@@ -171,7 +171,7 @@
// NOTE: The write RFC must initialize first. This sets all the file handle object
// (lfh) counters and pointers for both read and write, since write activity
// constrains read activity (i.e. one can't read what has not yet been written).
- _wrfc.initialize(_num_jfiles, (nlfh**)_datafh, &_rcvdat);
+ _wrfc.initialize(_num_jfiles, _jfsize_sblks, (nlfh**)_datafh, &_rcvdat);
_rrfc.initialize(_num_jfiles, (nlfh**)_datafh, _rcvdat._ffid);
_rmgr.initialize(rdtoklp, rd_cb, _rcvdat._fro);
_wmgr.initialize(wdtoklp, wr_cb, JRNL_WMGR_MAXDTOKPP, JRNL_WMGR_MAXWAITUS, _rcvdat._eo);
@@ -187,7 +187,7 @@
throw jexception(jerrno::JERR_JCNTL_NOTRECOVERED, "jcntl", "recover_complete");
for (u_int16_t i=0; i<_num_jfiles; i++)
_datafh[i]->reset(&_rcvdat);
- _wrfc.initialize(_num_jfiles, (nlfh**)_datafh, &_rcvdat);
+ _wrfc.initialize(_num_jfiles, _jfsize_sblks, (nlfh**)_datafh, &_rcvdat);
_rrfc.initialize(_num_jfiles, (nlfh**)_datafh, _rcvdat._ffid);
_rmgr.recover_complete(_rcvdat._fro);
_readonly_flag = false;
Modified: store/trunk/cpp/lib/jrnl/pmgr.cpp
===================================================================
--- store/trunk/cpp/lib/jrnl/pmgr.cpp 2007-12-12 21:47:23 UTC (rev 1473)
+++ store/trunk/cpp/lib/jrnl/pmgr.cpp 2007-12-12 22:22:30 UTC (rev 1474)
@@ -239,28 +239,5 @@
return "<page_state unknown>";
}
-const char*
-pmgr::iores_str(iores res)
-{
- switch (res)
- {
- case RHM_IORES_SUCCESS:
- return "RHM_IORES_SUCCESS";
- case RHM_IORES_AIO_WAIT:
- return "RHM_IORES_AIO_WAIT";
- case RHM_IORES_EMPTY:
- return "RHM_IORES_EMPTY";
- case RHM_IORES_FULL:
- return "RHM_IORES_FULL";
- case RHM_IORES_BUSY:
- return "RHM_IORES_BUSY";
- case RHM_IORES_TXPENDING:
- return "RHM_IORES_TXPENDING";
- case RHM_IORES_NOTIMPL:
- return "RHM_IORES_NOTIMPL";
- }
- return "<iores unknown>";
-}
-
} // namespace journal
} // namespace rhm
Modified: store/trunk/cpp/lib/jrnl/pmgr.hpp
===================================================================
--- store/trunk/cpp/lib/jrnl/pmgr.hpp 2007-12-12 21:47:23 UTC (rev 1473)
+++ store/trunk/cpp/lib/jrnl/pmgr.hpp 2007-12-12 22:22:30 UTC (rev 1474)
@@ -57,21 +57,6 @@
{
namespace journal
{
- // TODO: Change this to flags, as multiple of these conditions may exist simultaneously
- /**
- * \brief Enumeration of possilbe return states from journal read and write operations.
- */
- enum _iores
- {
- RHM_IORES_SUCCESS = 0, ///< Success: IO operation completed noramlly.
- RHM_IORES_AIO_WAIT, ///< IO operation suspended - all pages are waiting for AIO.
- RHM_IORES_EMPTY, ///< During read operations, nothing further is available to read.
- RHM_IORES_FULL, ///< During write operations, the journal files are full.
- RHM_IORES_BUSY, ///< Another blocking operation is in progress.
- RHM_IORES_TXPENDING, ///< Operation blocked by pending transaction.
- RHM_IORES_NOTIMPL ///< Not yet implemented.
- };
- typedef _iores iores;
/**
* \brief Abstract class for managing either read or write page cache of arbitrary size and
@@ -146,7 +131,6 @@
virtual const u_int32_t get_events(page_state state) = 0;
inline const u_int32_t get_aio_evt_rem() const { return _aio_evt_rem; }
static const char* page_state_str(page_state ps);
- static const char* iores_str(iores res);
protected:
virtual void initialize();
Modified: store/trunk/cpp/lib/jrnl/rmgr.hpp
===================================================================
--- store/trunk/cpp/lib/jrnl/rmgr.hpp 2007-12-12 21:47:23 UTC (rev 1473)
+++ store/trunk/cpp/lib/jrnl/rmgr.hpp 2007-12-12 22:22:30 UTC (rev 1474)
@@ -41,6 +41,7 @@
}
}
+#include <jrnl/enums.hpp>
#include <jrnl/pmgr.hpp>
#include <jrnl/rec_hdr.hpp>
#include <jrnl/rrfc.hpp>
Modified: store/trunk/cpp/lib/jrnl/rrfc.cpp
===================================================================
--- store/trunk/cpp/lib/jrnl/rrfc.cpp 2007-12-12 21:47:23 UTC (rev 1473)
+++ store/trunk/cpp/lib/jrnl/rrfc.cpp 2007-12-12 22:22:30 UTC (rev 1474)
@@ -51,7 +51,7 @@
rrfc::~rrfc() {}
void
-rrfc::initialize(u_int32_t nfiles, nlfh** fh_arr, u_int32_t fh_index)
+rrfc::initialize(const u_int16_t nfiles, nlfh** fh_arr, u_int32_t fh_index)
{
_nfiles = nfiles;
_fh_arr = fh_arr;
Modified: store/trunk/cpp/lib/jrnl/rrfc.hpp
===================================================================
--- store/trunk/cpp/lib/jrnl/rrfc.hpp 2007-12-12 21:47:23 UTC (rev 1473)
+++ store/trunk/cpp/lib/jrnl/rrfc.hpp 2007-12-12 22:22:30 UTC (rev 1474)
@@ -55,7 +55,7 @@
class rrfc
{
protected:
- u_int32_t _nfiles; ///< Number of data files
+ u_int16_t _nfiles; ///< Number of data files
nlfh** _fh_arr; ///< Array of pointers to data file handles
u_int16_t _fh_index; ///< Index of current file handle
nlfh* _curr_fh; ///< Pointer to current file handle
@@ -71,7 +71,7 @@
* each of which correspond to one of the physical files.
* \param fh_index Initial index of journal file. Default = 0.
*/
- void initialize(u_int32_t nfiles, nlfh** fh_arr, u_int32_t fh_index = 0);
+ void initialize(const u_int16_t nfiles, nlfh** fh_arr, u_int32_t fh_index = 0);
/**
* \brief Rotate active file handle to next file in rotating file group.
Modified: store/trunk/cpp/lib/jrnl/wmgr.cpp
===================================================================
--- store/trunk/cpp/lib/jrnl/wmgr.cpp 2007-12-12 21:47:23 UTC (rev 1473)
+++ store/trunk/cpp/lib/jrnl/wmgr.cpp 2007-12-12 22:22:30 UTC (rev 1474)
@@ -119,7 +119,7 @@
if (this_data_len != tot_data_len && !external)
return RHM_IORES_NOTIMPL;
- iores res = pre_write_check(WMGR_ENQUEUE, dtokp);
+ iores res = pre_write_check(WMGR_ENQUEUE, dtokp, xid_len, tot_data_len, external);
if (res != RHM_IORES_SUCCESS)
return res;
@@ -926,7 +926,8 @@
}
const iores
-wmgr::pre_write_check(_op_type op, data_tok* dtokp)
+wmgr::pre_write_check(const _op_type op, const data_tok* const dtokp, const size_t xidsize,
+ const size_t dsize, const bool external) const
{
// Check status of current file
if (!_wrfc.is_reset())
@@ -955,13 +956,20 @@
switch (op)
{
case WMGR_ENQUEUE:
- if (!dtokp->is_writable())
{
- std::ostringstream oss;
- oss << "op=" << _op_str[op] << " dtok_id=" << dtokp->id();
- oss << " dtok_state=" << dtokp->wstate_str();
- throw jexception(jerrno::JERR_WMGR_BADDTOKSTATE, oss.str(), "wmgr",
+ // Check for enqueue reaching cutoff threshold
+ u_int32_t size_dblks = jrec::size_dblks(enq_rec::rec_size(xidsize, dsize,
+ external));
+ if (_wrfc.enq_threshold(_cached_offset_dblks + size_dblks))
+ return RHM_IORES_ENQCAPTHRESH;
+ if (!dtokp->is_writable())
+ {
+ std::ostringstream oss;
+ oss << "op=" << _op_str[op] << " dtok_id=" << dtokp->id();
+ oss << " dtok_state=" << dtokp->wstate_str();
+ throw jexception(jerrno::JERR_WMGR_BADDTOKSTATE, oss.str(), "wmgr",
"pre_write_check");
+ }
}
break;
case WMGR_DEQUEUE:
Modified: store/trunk/cpp/lib/jrnl/wmgr.hpp
===================================================================
--- store/trunk/cpp/lib/jrnl/wmgr.hpp 2007-12-12 21:47:23 UTC (rev 1473)
+++ store/trunk/cpp/lib/jrnl/wmgr.hpp 2007-12-12 22:22:30 UTC (rev 1474)
@@ -41,6 +41,7 @@
}
}
+#include <jrnl/enums.hpp>
#include <jrnl/pmgr.hpp>
#include <jrnl/wrfc.hpp>
#include <set>
@@ -117,7 +118,9 @@
private:
void initialize();
- const iores pre_write_check(_op_type op, data_tok* dtokp);
+ const iores pre_write_check(const _op_type op, const data_tok* const dtokp,
+ const size_t xidsize = 0, const size_t dsize = 0, const bool external = false)
+ const;
const u_int64_t initialize_rid(const bool cont, data_tok* dtokp);
const iores write_flush();
const iores rotate_file();
Modified: store/trunk/cpp/lib/jrnl/wrfc.cpp
===================================================================
--- store/trunk/cpp/lib/jrnl/wrfc.cpp 2007-12-12 21:47:23 UTC (rev 1473)
+++ store/trunk/cpp/lib/jrnl/wrfc.cpp 2007-12-12 22:22:30 UTC (rev 1474)
@@ -30,6 +30,7 @@
* The GNU Lesser General Public License is available in the file COPYING.
*/
+#include <iostream> // TODO: remove later, for debug only.
#include <jrnl/wrfc.hpp>
#include <jrnl/jerrno.hpp>
@@ -42,6 +43,9 @@
wrfc::wrfc():
rrfc(),
+ _fsize_sblks(0),
+ _fsize_dblks(0),
+ _enq_cap_offs_dblks(0),
#ifdef DRHM_TESTVALS
// TODO: Find method of specifying 64-bit literals under gcc with -pedantic option
_rid(u_int64_t(0xffeeddcc) << 32), // For testing high rids
@@ -55,7 +59,7 @@
wrfc::~wrfc() {}
void
-wrfc::initialize(u_int32_t nfiles, nlfh** fh_arr, rcvdat* rdp)
+wrfc::initialize(const u_int16_t nfiles, const u_int32_t fsize_sblks, nlfh** fh_arr, rcvdat* rdp)
{
if (rdp)
{
@@ -74,6 +78,12 @@
#endif
_reset_ok = false;
}
+ _fsize_sblks = fsize_sblks;
+ _fsize_dblks = fsize_sblks * JRNL_SBLK_SIZE;
+ _enq_cap_offs_dblks = _fsize_dblks * _nfiles * (100 - JRNL_ENQ_THRESHOLD) / 100;
+ // Check the offset is at least one file; if not, make it so
+ if (_enq_cap_offs_dblks < _fsize_dblks)
+ _enq_cap_offs_dblks = _fsize_dblks;
}
bool
@@ -85,7 +95,7 @@
if (_fh_index == _nfiles)
{
_fh_index = 0;
- _owi = !_owi; // flip owi
+ _owi = !_owi;
}
_curr_fh = _fh_arr[_fh_index];
return reset(); //Checks if file is still in use (ie not fully dequeued yet)
@@ -98,5 +108,29 @@
return _reset_ok;
}
+const bool
+wrfc::enq_threshold(const u_int32_t enq_dsize_dblks) const
+{
+ u_int32_t subm_dblks = subm_cnt_dblks();
+ // This compensates for new files which don't have their file headers written yet,
+ // as file header space cannot be included in this calculation.
+ if (subm_dblks == 0)
+ subm_dblks = 4;
+ u_int32_t fwd_dblks = subm_dblks + enq_dsize_dblks + _enq_cap_offs_dblks;
+ u_int16_t findex = _fh_index;
+ nlfh* fhp = _curr_fh;
+ while (fwd_dblks && !(findex != _fh_index && fhp->enqcnt()))
+ {
+ fwd_dblks -= fwd_dblks > _fsize_dblks ? _fsize_dblks : fwd_dblks;
+ if (fwd_dblks)
+ {
+ if (++findex == _nfiles)
+ findex = 0;
+ fhp = _fh_arr[findex];
+ }
+ }
+ return findex != _fh_index && fhp->enqcnt() > 0;
+}
+
} // namespace journal
} // namespace rhm
Modified: store/trunk/cpp/lib/jrnl/wrfc.hpp
===================================================================
--- store/trunk/cpp/lib/jrnl/wrfc.hpp 2007-12-12 21:47:23 UTC (rev 1473)
+++ store/trunk/cpp/lib/jrnl/wrfc.hpp 2007-12-12 22:22:30 UTC (rev 1474)
@@ -55,9 +55,12 @@
class wrfc : public rrfc
{
private:
- u_int64_t _rid; ///< Master counter for record ID (rid)
- bool _reset_ok; ///< Flag set when reset succeeds
- bool _owi; ///< Overwrite indicator
+ u_int32_t _fsize_sblks; ///< Size of journal files in sblks
+ u_int32_t _fsize_dblks; ///< Size of journal files in dblks
+ u_int32_t _enq_cap_offs_dblks; ///< Enqueue capacity offset
+ u_int64_t _rid; ///< Master counter for record ID (rid)
+ bool _reset_ok; ///< Flag set when reset succeeds
+ bool _owi; ///< Overwrite indicator
public:
wrfc();
@@ -66,12 +69,14 @@
/**
* \brief Initialize the controller.
* \param nfiles Number of files in the rotating file group.
+ * \param fsize_sblks Size of each journal file in sblks.
* \param fh_arr Pointer to an array of file handles (nlogging_fh or subclasses),
* each of which correspond to one of the physical files.
* \param rdp Struct carrying restore information. Optional for non-restore use, defaults to
* NULL.
*/
- void initialize(u_int32_t nfiles, nlfh** fh_arr, rcvdat* rdp = NULL);
+ void initialize(const u_int16_t nfiles, const u_int32_t fsize_sblks, nlfh** fh_arr,
+ rcvdat* rdp = NULL);
/**
* \brief Rotate active file handle to next file in rotating file group.
@@ -106,6 +111,7 @@
inline const u_int32_t aio_outstanding_dblks() const
{ return _curr_fh->wr_aio_outstanding_dblks(); }
inline const bool file_rotate() const { return _curr_fh->wr_file_rotate(); }
+ const bool enq_threshold(const u_int32_t enq_dsize_dblks) const;
}; // class wrfc
} // namespace journal
Modified: store/trunk/cpp/tests/jrnl/JournalSystemTests.cpp
===================================================================
--- store/trunk/cpp/tests/jrnl/JournalSystemTests.cpp 2007-12-12 21:47:23 UTC (rev 1473)
+++ store/trunk/cpp/tests/jrnl/JournalSystemTests.cpp 2007-12-12 22:22:30 UTC (rev 1474)
@@ -758,7 +758,6 @@
switch (res)
{
case rhm::journal::RHM_IORES_SUCCESS:
- //((char*)mbuff)[msize] = '\0';
return false;
case rhm::journal::RHM_IORES_AIO_WAIT:
if (++aio_sleep_cnt <= MAX_AIO_SLEEPS)
@@ -772,21 +771,9 @@
CPPUNIT_FAIL("Timeout on RHM_IORES_AIO_WAIT.");
}
break;
- case rhm::journal::RHM_IORES_EMPTY:
- delete dtp;
- CPPUNIT_FAIL("RHM_IORES_EMPTY");
- case rhm::journal::RHM_IORES_FULL:
- delete dtp;
- CPPUNIT_FAIL("RHM_IORES_FULL");
- case rhm::journal::RHM_IORES_BUSY:
- delete dtp;
- CPPUNIT_FAIL("RHM_IORES_BUSY");
- case rhm::journal::RHM_IORES_TXPENDING:
- delete dtp;
- CPPUNIT_FAIL("RHM_IORES_TXPENDING");
default:
delete dtp;
- CPPUNIT_FAIL("unknown return value");
+ CPPUNIT_FAIL(rhm::journal::iores_str(res));
}
return true;
}
More information about the rhmessaging-commits
mailing list