Author: kpvdr
Date: 2008-10-01 15:42:31 -0400 (Wed, 01 Oct 2008)
New Revision: 2561
Added:
store/trunk/cpp/lib/jrnl/rfc.cpp
store/trunk/cpp/lib/jrnl/rfc.hpp
Modified:
store/trunk/cpp/lib/Makefile.am
store/trunk/cpp/lib/jrnl/fcntl.cpp
store/trunk/cpp/lib/jrnl/fcntl.hpp
store/trunk/cpp/lib/jrnl/jcntl.cpp
store/trunk/cpp/lib/jrnl/jerrno.cpp
store/trunk/cpp/lib/jrnl/jerrno.hpp
store/trunk/cpp/lib/jrnl/rcvdat.hpp
store/trunk/cpp/lib/jrnl/rmgr.cpp
store/trunk/cpp/lib/jrnl/rmgr.hpp
store/trunk/cpp/lib/jrnl/rrfc.cpp
store/trunk/cpp/lib/jrnl/rrfc.hpp
store/trunk/cpp/lib/jrnl/wmgr.cpp
store/trunk/cpp/lib/jrnl/wrfc.cpp
store/trunk/cpp/lib/jrnl/wrfc.hpp
store/trunk/cpp/tests/jrnl/_st_read.cpp
Log:
Removed read file handles from class fcntl, moved a single read file to class rrfc where
it is opened and closed as needed. This reduces the number of consumed file handles to a
half of the previous number; now only one file handle plus one when lazy-load is active is
consumed per journal file instead of two per file. Additional code tidy-up also performed,
including adding class rfc as a superclass to both rrfc and wrfc.
Modified: store/trunk/cpp/lib/Makefile.am
===================================================================
--- store/trunk/cpp/lib/Makefile.am 2008-10-01 16:07:22 UTC (rev 2560)
+++ store/trunk/cpp/lib/Makefile.am 2008-10-01 19:42:31 UTC (rev 2561)
@@ -58,6 +58,7 @@
jrnl/lf_map.cpp \
jrnl/pmgr.cpp \
jrnl/rmgr.cpp \
+ jrnl/rfc.cpp \
jrnl/rrfc.cpp \
jrnl/slock.cpp \
jrnl/time_ns.cpp \
@@ -91,6 +92,7 @@
jrnl/rec_hdr.hpp \
jrnl/rec_tail.hpp \
jrnl/rmgr.hpp \
+ jrnl/rfc.hpp \
jrnl/rrfc.hpp \
jrnl/slock.hpp \
jrnl/time_ns.hpp \
Modified: store/trunk/cpp/lib/jrnl/fcntl.cpp
===================================================================
--- store/trunk/cpp/lib/jrnl/fcntl.cpp 2008-10-01 16:07:22 UTC (rev 2560)
+++ store/trunk/cpp/lib/jrnl/fcntl.cpp 2008-10-01 19:42:31 UTC (rev 2561)
@@ -51,7 +51,6 @@
_fid(fid),
_lid(lid),
_ffull_dblks(JRNL_SBLK_SIZE * (jfsize_sblks + 1)),
- _rd_fh(-1),
_wr_fh(-1),
_rec_enqcnt(0),
_rd_subm_cnt_dblks(0),
@@ -61,12 +60,12 @@
_aio_cnt(0)
{
initialize(fbasename, fid, lid, jfsize_sblks, ro);
- open_fh();
+ open_wr_fh();
}
fcntl::~fcntl()
{
- close_fh();
+ close_wr_fh();
}
bool
@@ -113,6 +112,32 @@
return true;
}
+int
+fcntl::open_wr_fh()
+{
+ if (_wr_fh < 0)
+ {
+ _wr_fh = ::open(_fname.c_str(), O_WRONLY | O_DIRECT, S_IRUSR | S_IWUSR | S_IRGRP
| S_IROTH); // 0644 -rw-r--r--
+ if (_wr_fh < 0)
+ {
+ std::ostringstream oss;
+ oss << "file=\"" << _fname <<
"\"" << FORMAT_SYSERR(errno);
+ throw jexception(jerrno::JERR_FCNTL_OPENWR, oss.str(), "fcntl",
"open_fh");
+ }
+ }
+ return _wr_fh;
+}
+
+void
+fcntl::close_wr_fh()
+{
+ if (_wr_fh >= 0)
+ {
+ ::close(_wr_fh);
+ _wr_fh = -1;
+ }
+}
+
u_int32_t
fcntl::add_enqcnt(u_int32_t a)
{
@@ -263,41 +288,6 @@
#endif
}
-void
-fcntl::open_fh()
-{
- _rd_fh = ::open(_fname.c_str(), O_RDONLY | O_DIRECT);
- if (_rd_fh < 0)
- {
- std::ostringstream oss;
- oss << "file=\"" << _fname <<
"\"" << FORMAT_SYSERR(errno);
- throw jexception(jerrno::JERR_FCNTL_OPENRD, oss.str(), "fcntl",
"open_fh");
- }
- _wr_fh = ::open(_fname.c_str(), O_WRONLY | O_DIRECT,
- S_IRUSR | S_IWUSR | S_IRGRP | S_IROTH); // 0644 -rw-r--r--
- if (_wr_fh < 0)
- {
- std::ostringstream oss;
- oss << "file=\"" << _fname <<
"\"" << FORMAT_SYSERR(errno);
- throw jexception(jerrno::JERR_FCNTL_OPENWR, oss.str(), "fcntl",
"open_fh");
- }
-}
-
-void
-fcntl::close_fh()
-{
- if (_rd_fh >= 0)
- {
- ::close(_rd_fh);
- _rd_fh = -1;
- }
- if (_wr_fh >= 0)
- {
- ::close(_wr_fh);
- _wr_fh = -1;
- }
-}
-
std::string
fcntl::filename(const std::string& fbasename, const u_int16_t fid)
{
Modified: store/trunk/cpp/lib/jrnl/fcntl.hpp
===================================================================
--- store/trunk/cpp/lib/jrnl/fcntl.hpp 2008-10-01 16:07:22 UTC (rev 2560)
+++ store/trunk/cpp/lib/jrnl/fcntl.hpp 2008-10-01 19:42:31 UTC (rev 2561)
@@ -61,7 +61,6 @@
u_int16_t _fid; ///< File ID (file number in order of
creation)
u_int16_t _lid; ///< Logical ID (ordinal number in ring
store)
const u_int32_t _ffull_dblks; ///< File size in dblks (incl. file header)
- int _rd_fh; ///< Read file handle
int _wr_fh; ///< Write file handle
u_int32_t _rec_enqcnt; ///< Count of enqueued records
u_int32_t _rd_subm_cnt_dblks; ///< Read file count (data blocks) for
submitted AIO
@@ -80,11 +79,14 @@
virtual void rd_reset();
virtual bool wr_reset(const rcvdat* const ro = 0);
+ virtual int open_wr_fh();
+ virtual void close_wr_fh();
+ inline bool is_wr_fh_open() const { return _wr_fh >= 0; }
+
inline const std::string& fname() const { return _fname; }
inline u_int16_t fid() const { return _fid; }
inline u_int16_t lid() const { return _lid; }
inline void set_lid(const u_int16_t lid) { _lid = lid; }
- inline int rd_fh() const { return _rd_fh; }
inline int wr_fh() const { return _wr_fh; }
inline u_int32_t enqcnt() const { return _rec_enqcnt; }
inline u_int32_t incr_enqcnt() { return ++_rec_enqcnt; }
@@ -137,8 +139,6 @@
virtual void initialize(const std::string& fbasename, const u_int16_t fid,
const u_int16_t lid,
const u_int32_t jfsize_sblks, const rcvdat* const ro);
- virtual void open_fh();
- virtual void close_fh();
static std::string filename(const std::string& fbasename, const u_int16_t
fid);
void clean_file(const u_int32_t jfsize_sblks);
void create_jfile(const u_int32_t jfsize_sblks);
Modified: store/trunk/cpp/lib/jrnl/jcntl.cpp
===================================================================
--- store/trunk/cpp/lib/jrnl/jcntl.cpp 2008-10-01 16:07:22 UTC (rev 2560)
+++ store/trunk/cpp/lib/jrnl/jcntl.cpp 2008-10-01 19:42:31 UTC (rev 2561)
@@ -140,6 +140,7 @@
_wrfc.initialize(_num_jfiles, _jfsize_sblks, _fc_arr);
_rrfc.initialize(_num_jfiles, _fc_arr);
+ _rrfc.set_findex(0);
_rmgr.initialize(rd_cb);
_wmgr.initialize(wr_cb, wcache_pgsize_sblks, wcache_num_pages, JRNL_WMGR_MAXDTOKPP,
JRNL_WMGR_MAXWAITUS);
@@ -206,7 +207,8 @@
}
_wrfc.initialize(_num_jfiles, _jfsize_sblks, _fc_arr, &_rcvdat);
- _rrfc.initialize(_num_jfiles, _fc_arr, &_rcvdat);
+ _rrfc.initialize(_num_jfiles, _fc_arr);
+ _rrfc.set_findex(_rcvdat.ffid(_num_jfiles));
_rmgr.initialize(rd_cb);
_wmgr.initialize(wr_cb, wcache_pgsize_sblks, wcache_num_pages, JRNL_WMGR_MAXDTOKPP,
JRNL_WMGR_MAXWAITUS, (_rcvdat._lffull ? 0 : _rcvdat._eo));
@@ -223,7 +225,8 @@
for (u_int16_t i=0; i<_num_jfiles; i++)
_fc_arr[i]->reset(&_rcvdat);
_wrfc.initialize(_num_jfiles, _jfsize_sblks, _fc_arr, &_rcvdat);
- _rrfc.initialize(_num_jfiles, _fc_arr, &_rcvdat);
+ _rrfc.initialize(_num_jfiles, _fc_arr);
+ _rrfc.set_findex(_rcvdat.ffid(_num_jfiles));
_rmgr.recover_complete();
_readonly_flag = false;
}
@@ -416,6 +419,7 @@
_stop_flag = true;
if (!_readonly_flag)
flush(block_till_aio_cmpl);
+ _rrfc.reset();
}
u_int16_t
@@ -428,8 +432,8 @@
if (++ffid >= _num_jfiles)
ffid = 0;
}
- if (ffid != _rrfc.fid())
- _rrfc.reset(ffid);
+ if (!_rrfc.is_active())
+ _rrfc.set_findex(ffid);
return ffid;
}
Modified: store/trunk/cpp/lib/jrnl/jerrno.cpp
===================================================================
--- store/trunk/cpp/lib/jrnl/jerrno.cpp 2008-10-01 16:07:22 UTC (rev 2560)
+++ store/trunk/cpp/lib/jrnl/jerrno.cpp 2008-10-01 19:42:31 UTC (rev 2561)
@@ -75,15 +75,15 @@
const u_int32_t jerrno::JERR_JDIR_BADFTYPE = 0x030a;
// class fcntl
-const u_int32_t jerrno::JERR_FCNTL_OPENRD = 0x0400;
-const u_int32_t jerrno::JERR_FCNTL_OPENWR = 0x0401;
-const u_int32_t jerrno::JERR_FCNTL_WRITE = 0x0402;
-const u_int32_t jerrno::JERR_FCNTL_CLOSE = 0x0403;
-const u_int32_t jerrno::JERR_FCNTL_FILEOFFSOVFL = 0x0404;
-const u_int32_t jerrno::JERR_FCNTL_CMPLOFFSOVFL = 0x0405;
-const u_int32_t jerrno::JERR_FCNTL_RDOFFSOVFL = 0x0406;
+const u_int32_t jerrno::JERR_FCNTL_OPENWR = 0x0400;
+const u_int32_t jerrno::JERR_FCNTL_WRITE = 0x0401;
+const u_int32_t jerrno::JERR_FCNTL_CLOSE = 0x0402;
+const u_int32_t jerrno::JERR_FCNTL_FILEOFFSOVFL = 0x0403;
+const u_int32_t jerrno::JERR_FCNTL_CMPLOFFSOVFL = 0x0404;
+const u_int32_t jerrno::JERR_FCNTL_RDOFFSOVFL = 0x0405;
-// class file_hdr
+// class rrfc
+const u_int32_t jerrno::JERR_RRFC_OPENRD = 0x0500;
// class jrec, enq_rec, deq_rec, txn_rec
const u_int32_t jerrno::JERR_JREC_BADRECHDR = 0x0600;
@@ -162,7 +162,6 @@
_err_map[JERR_JDIR_BADFTYPE] = "JERR_JDIR_BADFTYPE: Bad or unknown file type
(stat mode).";
// class fcntl
- _err_map[JERR_FCNTL_OPENRD] = "JERR_FCNTL_OPENRD: Unable to open file for
read.";
_err_map[JERR_FCNTL_OPENWR] = "JERR_FCNTL_OPENWR: Unable to open file for
write.";
_err_map[JERR_FCNTL_WRITE] = "JERR_FCNTL_WRITE: Unable to write to file.";
_err_map[JERR_FCNTL_CLOSE] = "JERR_FCNTL_CLOSE: File close failed.";
@@ -174,7 +173,8 @@
"Attempted increase read offset past write offset.";
- // class file_hdr
+ // class rrfc
+ _err_map[JERR_RRFC_OPENRD] = "JERR_RRFC_OPENRD: Unable to open file for
read.";
// class jrec, enq_rec, deq_rec, txn_rec
_err_map[JERR_JREC_BADRECHDR] = "JERR_JREC_BADRECHDR: Invalid data record
header.";
Modified: store/trunk/cpp/lib/jrnl/jerrno.hpp
===================================================================
--- store/trunk/cpp/lib/jrnl/jerrno.hpp 2008-10-01 16:07:22 UTC (rev 2560)
+++ store/trunk/cpp/lib/jrnl/jerrno.hpp 2008-10-01 19:42:31 UTC (rev 2561)
@@ -93,7 +93,6 @@
static const u_int32_t JERR_JDIR_BADFTYPE; ///< Bad or unknown file type
(stat mode)
// class fcntl
- static const u_int32_t JERR_FCNTL_OPENRD; ///< Unable to open file for
read
static const u_int32_t JERR_FCNTL_OPENWR; ///< Unable to open file for
write
static const u_int32_t JERR_FCNTL_WRITE; ///< Unable to write to file
static const u_int32_t JERR_FCNTL_CLOSE; ///< File close failed
@@ -101,7 +100,8 @@
static const u_int32_t JERR_FCNTL_CMPLOFFSOVFL; ///< Increased cmpl offs past
subm offs
static const u_int32_t JERR_FCNTL_RDOFFSOVFL; ///< Increased read offs past
write offs
- // class file_hdr
+ // class rrfc
+ static const u_int32_t JERR_RRFC_OPENRD; ///< Unable to open file for
read
// class jrec, enq_rec, deq_rec, txn_rec
static const u_int32_t JERR_JREC_BADRECHDR; ///< Invalid data record
header
Modified: store/trunk/cpp/lib/jrnl/rcvdat.hpp
===================================================================
--- store/trunk/cpp/lib/jrnl/rcvdat.hpp 2008-10-01 16:07:22 UTC (rev 2560)
+++ store/trunk/cpp/lib/jrnl/rcvdat.hpp 2008-10-01 19:42:31 UTC (rev 2561)
@@ -89,6 +89,18 @@
_enq_cnt_list.resize(num_jfiles, 0);
}
+ // Find first fid with enqueued records
+ u_int16_t ffid(const u_int16_t num_jfiles)
+ {
+ u_int16_t index = _ffid;
+ while (index != _lfid && _enq_cnt_list[index] == 0)
+ {
+ if (++index >= num_jfiles)
+ index = 0;
+ }
+ return index;
+ }
+
std::string to_string(std::string& jid)
{
std::ostringstream oss;
Added: store/trunk/cpp/lib/jrnl/rfc.cpp
===================================================================
--- store/trunk/cpp/lib/jrnl/rfc.cpp (rev 0)
+++ store/trunk/cpp/lib/jrnl/rfc.cpp 2008-10-01 19:42:31 UTC (rev 2561)
@@ -0,0 +1,84 @@
+/**
+* \file rfc.cpp
+*
+* Red Hat Messaging - Message Journal
+*
+* File containing code for class mrg::journal::rfc (rotating
+* file controller). See comments in file rfc.hpp for details.
+*
+* Copyright 2008 Red Hat, Inc.
+*
+* This file is part of Red Hat Messaging.
+*
+* Red Hat Messaging is free software; you can redistribute it and/or
+* modify it under the terms of the GNU Lesser General Public
+* License as published by the Free Software Foundation; either
+* version 2.1 of the License, or (at your option) any later version.
+*
+* This library is distributed in the hope that it will be useful,
+* but WITHOUT ANY WARRANTY; without even the implied warranty of
+* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+* Lesser General Public License for more details.
+*
+* You should have received a copy of the GNU Lesser General Public
+* License along with this library; if not, write to the Free Software
+* Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301
+* USA
+*
+* The GNU Lesser General Public License is available in the file COPYING.
+*/
+
+#include "jrnl/rfc.hpp"
+
+#include <cassert>
+
+namespace mrg
+{
+namespace journal
+{
+
+rfc::rfc(): _nfiles(0), _fc_arr(0), _fc_index(0), _curr_fc(0)
+{}
+
+rfc::~rfc()
+{}
+
+void
+rfc::initialize(const u_int16_t nfiles, fcntl** fc_arr)
+{
+ _nfiles = nfiles;
+ _fc_arr = fc_arr;
+}
+
+void
+rfc::reset()
+{
+ unset_findex();
+ _nfiles = 0;
+ _fc_arr = 0;
+}
+
+void
+rfc::set_findex(const u_int16_t fc_index)
+{
+ _fc_index = fc_index;
+ _curr_fc = _fc_arr[_fc_index];
+ _curr_fc->rd_reset();
+}
+
+void
+rfc::unset_findex()
+{
+ _fc_index = 0;
+ _curr_fc = 0;
+}
+
+fcntl*
+rfc::file_controller(const u_int16_t pg_index) const
+{
+ assert(pg_index < _nfiles);
+ return _fc_arr[pg_index];
+}
+
+} // namespace journal
+} // namespace mrg
Added: store/trunk/cpp/lib/jrnl/rfc.hpp
===================================================================
--- store/trunk/cpp/lib/jrnl/rfc.hpp (rev 0)
+++ store/trunk/cpp/lib/jrnl/rfc.hpp 2008-10-01 19:42:31 UTC (rev 2561)
@@ -0,0 +1,199 @@
+/**
+* \file rfc.hpp
+*
+* Red Hat Messaging - Message Journal
+*
+* File containing code for class mrg::journal::rfc (rotating
+* file controller). See class documentation for details.
+*
+* \author Kim van der Riet
+*
+* Copyright 2008 Red Hat, Inc.
+*
+* This file is part of Red Hat Messaging.
+*
+* Red Hat Messaging is free software; you can redistribute it and/or
+* modify it under the terms of the GNU Lesser General Public
+* License as published by the Free Software Foundation; either
+* version 2.1 of the License, or (at your option) any later version.
+*
+* This library is distributed in the hope that it will be useful,
+* but WITHOUT ANY WARRANTY; without even the implied warranty of
+* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+* Lesser General Public License for more details.
+*
+* You should have received a copy of the GNU Lesser General Public
+* License along with this library; if not, write to the Free Software
+* Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301
+* USA
+*
+* The GNU Lesser General Public License is available in the file COPYING.
+*/
+
+#ifndef mrg_journal_rfc_hpp
+#define mrg_journal_rfc_hpp
+
+namespace mrg
+{
+namespace journal
+{
+class rfc;
+}
+}
+
+#include "jrnl/fcntl.hpp"
+#include "jrnl/enums.hpp"
+
+namespace mrg
+{
+namespace journal
+{
+
+ /**
+ * \class rfc
+ * \brief Rotating File Controller (rfc) - Class to handle the manangement of an array
of file controllers (fcntl)
+ * objects for use in a circular disk buffer (journal). Each fcntl object
corresponds to a file in the journal.
+ *
+ * The following states exist in this class:
+ *
+ * <pre>
+ * is_initialized()
is_active()
+ * +===+ _nfiles == 0
+ * +---------->| | Uninitialized: _fc_arr == 0 F
F
+ * | +-->+===+ --+ _curr_fc == 0
+ * | | |
+ * | | |
+ * | reset() initialize()
+ * | | |
+ * | | |
+ * | +-- +===+<--+ _nfiles > 0
+ * reset() | | Inactive: _fc_arr != 0 T F
+ * | +-->+===+ --+ _curr_fc == 0
+ * | | |
+ * | | |
+ * | unset_findex() set_findex()
+ * | | |
+ * | | |
+ * | +-- +===+<--+ _nfiles > 0
+ * +---------- | | Active: _fc_arr != 0 T T
+ * +===+ _curr_fc != 0
+ * </pre>
+ *
+ * The Uninitialized state is where the class starts after construction. Once the
number of files is known and
+ * the array of file controllers allocated, then initialize() is called to set these,
causing the state to move
+ * to Inactive.
+ *
+ * Once the index of the active file is known, then calling activate() will set the
index and internal pointer
+ * to the currently active file controller. This moves the state to Active.
+ *
+ * Note that if the number of files change, then the object will have to be reset and
reinitialized with a new array
+ * of fcntl objects of the appropriate size.
+ */
+ class rfc
+ {
+ protected:
+ u_int16_t _nfiles; ///< Number of data files
+ fcntl** _fc_arr; ///< Array of pointers to data file controllers
+ u_int16_t _fc_index; ///< Index of current file controller
+ fcntl* _curr_fc; ///< Pointer to current file controller
+
+ public:
+ rfc();
+ virtual ~rfc();
+
+ /**
+ * \brief Initialize the controller, moving from state Uninitialized to Inactive.
The main function of
+ * initialize() is to set the number of files and the pointer to the fcntl
array.
+ * \param nfiles Number of files in the rotating file group.
+ * \param fc_arr Pointer to an array of file controller, each of which correspond
to one of
+ * the physical journal files.
+ */
+ virtual void initialize(const u_int16_t nfiles, fcntl** fc_arr);
+
+ /**
+ * \brief Reset the controller to Uninitialized state, usually called when the
journal is stopped. Once called,
+ * initialize() must be called to reuse an instance.
+ */
+ virtual void reset();
+
+ /**
+ * /brief Check initialization state: true = Not Uninitialized, ie Initialized or
Active; false = Uninitialized.
+ */
+ virtual inline bool is_initialized() const { return _nfiles == 0 || _fc_arr == 0;
}
+
+ /**
+ * /brief Sets the current file index and active fcntl object. Moves to state
Active.
+ */
+ virtual void set_findex(const u_int16_t fc_index);
+
+ /**
+ * /brief Nulls the current file index and active fcntl pointer, moves to state
Inactive.
+ */
+ virtual void unset_findex();
+
+ /**
+ * /brief Check the file handle state: true = Active; false = Not Active, ie
Inactive or Uninitialized.
+ */
+ virtual inline bool is_active() const { return _curr_fc != 0; }
+
+ /**
+ * \brief Rotate active file controller to next file in rotating file group.
+ * \exception jerrno::JERR__NINIT if called before calling initialize().
+ */
+ virtual iores rotate() = 0;
+
+ /**
+ * \brief Returns the index of the currently active file within the rotating file
group.
+ */
+ inline u_int16_t index() const { return _fc_index; }
+
+ /**
+ * \brief Returns the currently active journal file controller within the rotating
file group.
+ */
+ inline fcntl* file_controller() const { return _curr_fc; }
+
+ /**
+ * \brief Returns the journal file controller for the given page index within the
rotating file group.
+ */
+ fcntl* file_controller(const u_int16_t pg_index) const;
+
+ /**
+ * \brief Returns the currently active file id (fid)
+ */
+ inline u_int16_t fid() const { return _curr_fc->fid(); }
+
+ // Convenience access methods to current file controller
+ // Note: Call only when in open state
+
+ virtual inline u_int32_t enqcnt() const { return _curr_fc->enqcnt(); }
+ virtual inline u_int32_t incr_enqcnt() { return _curr_fc->incr_enqcnt(); }
+ virtual inline u_int32_t incr_enqcnt(u_int16_t fid) { return
_fc_arr[fid]->incr_enqcnt(); }
+ virtual inline u_int32_t add_enqcnt(u_int32_t a) { return
_curr_fc->add_enqcnt(a); }
+ virtual inline u_int32_t add_enqcnt(u_int16_t fid, u_int32_t a) { return
_fc_arr[fid]->add_enqcnt(a); }
+ virtual inline u_int32_t decr_enqcnt(u_int16_t fid) { return
_fc_arr[fid]->decr_enqcnt(); }
+ virtual inline u_int32_t subtr_enqcnt(u_int16_t fid, u_int32_t s) { return
_fc_arr[fid]->subtr_enqcnt(s); }
+
+ virtual inline u_int32_t subm_cnt_dblks() const = 0;
+ virtual inline std::size_t subm_offs() const = 0;
+ virtual inline u_int32_t add_subm_cnt_dblks(u_int32_t a) = 0;
+
+ virtual inline u_int32_t cmpl_cnt_dblks() const = 0;
+ virtual inline std::size_t cmpl_offs() const = 0;
+ virtual inline u_int32_t add_cmpl_cnt_dblks(u_int32_t a) = 0;
+
+ virtual inline bool is_void() const = 0;
+ virtual inline bool is_empty() const = 0;
+ virtual inline u_int32_t remaining_dblks() const = 0;
+ virtual inline bool is_full() const = 0;
+ virtual inline bool is_compl() const = 0;
+ virtual inline u_int32_t aio_outstanding_dblks() const = 0;
+ virtual inline bool file_rotate() const = 0;
+
+ // Debug aid
+ virtual std::string status_str() const = 0;
+ }; // class rfc
+
+} // namespace journal
+} // namespace mrg
+
+#endif // ifndef mrg_journal_rfc_hpp
Modified: store/trunk/cpp/lib/jrnl/rmgr.cpp
===================================================================
--- store/trunk/cpp/lib/jrnl/rmgr.cpp 2008-10-01 16:07:22 UTC (rev 2560)
+++ store/trunk/cpp/lib/jrnl/rmgr.cpp 2008-10-01 19:42:31 UTC (rev 2561)
@@ -46,7 +46,6 @@
pmgr(jc, emap, tmap),
_rrfc(rrfc),
_hdr(),
- _valid(false),
_fhdr_buffer(0),
_fhdr_aio_cb_ptr(0),
_fhdr_rd_outstanding(false)
@@ -303,8 +302,6 @@
std::memcpy(&_fhdr, _fhdr_buffer, sizeof(file_hdr));
_rrfc.add_cmpl_cnt_dblks(JRNL_SBLK_SIZE);
-// std::size_t fro_dblks = _fhdr._fro / JRNL_DBLK_SIZE;
-// _pg_offset_dblks += fro_dblks - JRNL_SBLK_SIZE;
u_int32_t fro_dblks = (_fhdr._fro / JRNL_DBLK_SIZE) - JRNL_SBLK_SIZE;
_pg_cntr = fro_dblks / (JRNL_RMGR_PAGE_SIZE * JRNL_SBLK_SIZE);
u_int32_t tot_pg_offs_dblks = _pg_cntr * JRNL_RMGR_PAGE_SIZE *
JRNL_SBLK_SIZE;
@@ -314,7 +311,7 @@
_rrfc.add_cmpl_cnt_dblks(tot_pg_offs_dblks);
_fhdr_rd_outstanding = false;
- _valid = true;
+ _rrfc.set_valid();
}
}
@@ -331,12 +328,11 @@
void
rmgr::invalidate()
{
- if (_valid)
+ if (_rrfc.is_valid())
{
- _valid = false;
for (int i=0; i<_cache_num_pages; i++)
_page_cb_arr[i]._state = UNUSED;
- _rrfc.reset();
+ _rrfc.unset_findex();
_pg_offset_dblks = 0;
}
}
@@ -353,13 +349,8 @@
if (_aio_evt_rem)
get_events();
- if (!_valid)
- {
- if (_rrfc.is_void() && !_rrfc.is_wr_aio_outstanding())
- return RHM_IORES_EMPTY;
- else
+ if (!_rrfc.is_valid())
return RHM_IORES_RCINVALID;
- }
// block reads until outstanding file header read completes as fro is needed to read
if (_fhdr_rd_outstanding)
@@ -564,18 +555,18 @@
return RHM_IORES_SUCCESS;
for (int16_t i=0; i<num_uninit; i++)
{
- if (_rrfc.is_void()) // Nothing to do; this file not yet written to
- break;
- if (!_valid)
+ if (!_rrfc.is_valid())
{
- u_int16_t fid = _jc->get_earliest_fid();
+ _jc->get_earliest_fid(); // calls _rrfc.set_findex()
// If this file has not yet been written to, return RHM_IORES_EMPTY
- if (_rrfc.file_controller(fid)->rd_void())
+ if (_rrfc.is_void() && !_rrfc.is_wr_aio_outstanding())
return RHM_IORES_EMPTY;
- init_file_header_read(fid);
+ init_file_header_read();
break;
}
+ if (_rrfc.is_void()) // Nothing to do; this file not yet written to
+ break;
if (_rrfc.subm_offs() == 0)
{
@@ -657,9 +648,9 @@
}
void
-rmgr::init_file_header_read(u_int16_t fid)
+rmgr::init_file_header_read()
{
- int rfh = _rrfc.file_controller(fid)->rd_fh();
+ int rfh = _rrfc.fh();
aio::prep_pread_2(_fhdr_aio_cb_ptr, rfh, _fhdr_buffer, _sblksize, 0);
if (aio::submit(_ioctx, 1, &_fhdr_aio_cb_ptr) < 0)
throw jexception(jerrno::JERR__AIO, "rmgr",
"init_file_header_read");
@@ -673,136 +664,13 @@
rmgr::get(const u_int64_t& rid, const std::size_t& dsize, const std::size_t&
dsize_avail,
const void** const data, bool auto_discard)
{
- iores res = pre_read_check(0);
- if (res != RHM_IORES_SUCCESS)
- return res;
-
- _hdr.reset();
- // Read header, determine next record type
- while (true)
- {
- if(dblks_rem() == 0 && _rrfc.is_compl() &&
!_rrfc.is_wr_aio_outstanding())
- {
- aio_cycle(); // check if any AIOs have returned
- return RHM_IORES_EMPTY;
- }
- if (_page_cb_arr[_pg_index]._state != AIO_COMPLETE)
- {
- aio_cycle();
- return RHM_IORES_PAGE_AIOWAIT;
- }
- void* rptr = (void*)((char*)_page_ptr_arr[_pg_index] +
- (_pg_offset_dblks * JRNL_DBLK_SIZE));
- std::memcpy(&_hdr, rptr, sizeof(hdr));
- switch (_hdr._magic)
- {
- case RHM_JDAT_ENQ_MAGIC:
- {
- std::size_t xid_size = *((std::size_t*)((char*)rptr + sizeof(hdr)
-#if defined(JRNL_BIG_ENDIAN) && defined(JRNL_32_BIT)
- + sizeof(u_int32_t) // filler0
-#endif
- ));
- std::size_t data_size = *((std::size_t*)((char*)rptr + sizeof(hdr) +
sizeof(u_int64_t)
-#if defined(JRNL_BIG_ENDIAN) && defined(JRNL_32_BIT)
- + sizeof(u_int32_t) // filler1
-#endif
- ));
- // TODO: Check if transaction is still in transaction map. If so,
block read
- // (unless in recovery, in whcih case return info normally
-// std::string xid = ?? (decode xid here)
-// if (xid_size && !readonly && tx_map.exists(xid))
-// return RHM_IORES_TXPENDING;
- rid = _hdr._rid;
- dsize = data_size;
-
- // Analyze how much of message is available
- void* data_ptr = (char*)rptr + sizeof(enq_hdr) + xid_size;
- void* page_end_ptr = (char*)_page_base_ptr + _cache_pgsize_sblks *
_sblksize * _cache_num_pages;
- u_int16_t data_start_pg_index = _pg_index;
- u_int16_t data_start_pg_index = _pg_index;
- for (u_int16_t i=0; i<_cache_num_pages; i++)
- {
- pi = (i + _pg_index) % _cache_num_pages;
- if (data_ptr >= _page_ptr_arr[pi] &&
- data_ptr < (char*)_page_ptr_arr[pi] +
_cache_pgsize_sblks * _sblksize)
- data_end_pg_index = pi; // found start page index
-
- }
- u_int16_t data_end_pg_index;
- u_int16_t last_pg_avail_index;
-
- void* data_ptr = (char*)rptr + sizeof(enq_hdr) + xid_size;
- void* page_end_ptr = (char*)_page_base_ptr + _cache_pgsize_sblks *
_sblksize * _cache_num_pages;
- if (data_ptr >= page_end_ptr) // folded, go back to first page...
- data_ptr = (char*)_page_base_ptr + data_ptr - page_end_ptr;
- void* data_end_ptr = (char*)data_ptr + data_size;
- if (data_end_ptr >= page_end_ptr) // folded, go back to first
page...
- data_end_ptr = (char*)_page_base_ptr + data_end_ptr -
page_end_ptr;
- dsize_avail = ??;
- if(data_ptr folded)
- else
- *data = data_ptr;
- }
- break;
- case RHM_JDAT_DEQ_MAGIC:
- consume_deq();
- break;
- case RHM_JDAT_EMPTY_MAGIC:
- consume_filler();
- break;
- default:
- std::ostringstream oss;
- oss << std::hex << std::setfill('0');
- oss << "Magic=0x" << std::setw(8) <<
_hdr._magic << std::dec;
- throw jexception(jerrno::JERR_RMGR_UNKNOWNMAGIC, oss.str(),
"rmgr", "get");
- } // switch(_hdr._magic)
- } // while
+ return RHM_IORES_SUCCESS;
}
const iores
rmgr::discard(data_tok* dtokp)
{
- iores res = pre_read_check(dtokp);
- if (res != RHM_IORES_SUCCESS)
- return res;
-
- _hdr.reset();
- // Read header, determine next record type
- while (true)
- {
- if(dblks_rem() == 0 && _rrfc.is_compl() &&
!_rrfc.is_wr_aio_outstanding())
- {
- aio_cycle(); // check if any AIOs have returned
- return RHM_IORES_EMPTY;
- }
- if (_page_cb_arr[_pg_index]._state != AIO_COMPLETE)
- {
- aio_cycle();
- return RHM_IORES_PAGE_AIOWAIT;
- }
- void* rptr = (void*)((char*)_page_ptr_arr[_pg_index] +
- (_pg_offset_dblks * JRNL_DBLK_SIZE));
- std::memcpy(&_hdr, rptr, sizeof(hdr));
- switch (_hdr._magic)
- {
- case RHM_JDAT_ENQ_MAGIC:
- {
- }
- break;
- case RHM_JDAT_DEQ_MAGIC:
- consume_deq();
- break;
- case RHM_JDAT_EMPTY_MAGIC:
- consume_filler();
- break;
- default:
- std::ostringstream oss;
- oss << std::hex << std::setfill('0');
- oss << "Magic=0x" << std::setw(8) <<
_hdr._magic << std::dec;
- throw jexception(jerrno::JERR_RMGR_UNKNOWNMAGIC, oss.str(),
"rmgr", "discard");
- } // switch
- } // while
+ return RHM_IORES_SUCCESS;
}
*/
Modified: store/trunk/cpp/lib/jrnl/rmgr.hpp
===================================================================
--- store/trunk/cpp/lib/jrnl/rmgr.hpp 2008-10-01 16:07:22 UTC (rev 2560)
+++ store/trunk/cpp/lib/jrnl/rmgr.hpp 2008-10-01 19:42:31 UTC (rev 2561)
@@ -65,7 +65,6 @@
rec_hdr _hdr; ///< Header used to determind record type
rd_aio_cb _cb; ///< Callback function pointer for AIO events
- bool _valid; ///< Flag is true when read pages contain vailid
data
void* _fhdr_buffer; ///< Buffer used for fhdr reads
aio_cb* _fhdr_aio_cb_ptr; ///< iocb pointer for fhdr reads
file_hdr _fhdr; ///< file header instance for reading file
headers
@@ -81,8 +80,7 @@
bool ignore_pending_txns);
u_int32_t get_events(page_state state = AIO_COMPLETE);
void recover_complete();
- inline bool is_valid() const {return _valid; }
- inline iores synchronize() { if (!_valid) return aio_cycle(); return
RHM_IORES_SUCCESS; }
+ inline iores synchronize() { if (_rrfc.is_valid()) return RHM_IORES_SUCCESS;
return aio_cycle(); }
void invalidate();
/* TODO (if required)
@@ -106,7 +104,7 @@
u_int32_t dblks_rem() const;
void set_params_null(void** const datapp, std::size_t& dsize, void** const
xidpp,
std::size_t& xidsize);
- void init_file_header_read(u_int16_t fid);
+ void init_file_header_read();
};
} // namespace journal
Modified: store/trunk/cpp/lib/jrnl/rrfc.cpp
===================================================================
--- store/trunk/cpp/lib/jrnl/rrfc.cpp 2008-10-01 16:07:22 UTC (rev 2560)
+++ store/trunk/cpp/lib/jrnl/rrfc.cpp 2008-10-01 19:42:31 UTC (rev 2561)
@@ -31,7 +31,8 @@
#include "jrnl/rrfc.hpp"
-#include <cassert>
+#include <cerrno>
+#include <fcntl.h>
#include "jrnl/jerrno.hpp"
#include "jrnl/jexception.hpp"
@@ -40,44 +41,44 @@
namespace journal
{
-rrfc::rrfc():
- _nfiles(0),
- _fc_arr(0),
- _fc_index(0),
- _curr_fc(0)
+rrfc::rrfc(): rfc(), _fh(-1), _valid(false)
{}
-rrfc::~rrfc() {}
+rrfc::~rrfc()
+{
+ close_fh();
+}
void
-rrfc::initialize(const u_int16_t nfiles, fcntl** fc_arr, rcvdat* rdp)
+rrfc::initialize(const u_int16_t nfiles, fcntl** fc_arr)
{
- _nfiles = nfiles;
- _fc_arr = fc_arr;
- if (rdp)
- {
- // Find first file with enqueued records
- u_int16_t index = rdp->_ffid;
- while (index != rdp->_lfid && rdp->_enq_cnt_list[index] == 0)
- {
- if (++index >= _nfiles)
- index = 0;
- }
- reset(index);
- }
- else
- reset(0);
+ rfc::initialize(nfiles, fc_arr);
+ _valid = false;
}
void
-rrfc::reset(u_int16_t fc_index)
+rrfc::reset()
{
- _fc_index = fc_index;
- _curr_fc = _fc_arr[_fc_index];
- _curr_fc->rd_reset();
+ unset_findex();
+ rfc::reset();
}
-bool
+void
+rrfc::set_findex(const u_int16_t fc_index)
+{
+ rfc::set_findex(fc_index);
+ open_fh(_curr_fc->fname());
+}
+
+void
+rrfc::unset_findex()
+{
+ set_invalid();
+ close_fh();
+ rfc::unset_findex();
+}
+
+iores
rrfc::rotate()
{
if (!_nfiles)
@@ -88,16 +89,11 @@
fcntl* next_fc = _fc_arr[next_fc_index];
_fc_index = next_fc_index;
_curr_fc = next_fc;
- return true;
+ open_fh(_curr_fc->fname());
+ return RHM_IORES_SUCCESS;
}
-fcntl*
-rrfc::file_controller(u_int16_t pg_index) const
-{
- assert(pg_index < _nfiles);
- return _fc_arr[pg_index];
-}
-
+// TODO: update this to reflect all status data
std::string
rrfc::status_str() const
{
@@ -106,5 +102,30 @@
return oss.str();
}
+// === protected functions ===
+
+void
+rrfc::open_fh(const std::string& fn)
+{
+ close_fh();
+ _fh = ::open(fn.c_str(), O_RDONLY | O_DIRECT);
+ if (_fh < 0)
+ {
+ std::ostringstream oss;
+ oss << "file=\"" << fn << "\""
<< FORMAT_SYSERR(errno);
+ throw jexception(jerrno::JERR_RRFC_OPENRD, oss.str(), "rrfc",
"open_fh");
+ }
+}
+
+void
+rrfc::close_fh()
+{
+ if (_fh >= 0)
+ {
+ ::close(_fh);
+ _fh = -1;
+ }
+}
+
} // namespace journal
} // namespace mrg
Modified: store/trunk/cpp/lib/jrnl/rrfc.hpp
===================================================================
--- store/trunk/cpp/lib/jrnl/rrfc.hpp 2008-10-01 16:07:22 UTC (rev 2560)
+++ store/trunk/cpp/lib/jrnl/rrfc.hpp 2008-10-01 19:42:31 UTC (rev 2561)
@@ -8,7 +8,7 @@
*
* \author Kim van der Riet
*
-* Copyright 2007 Red Hat, Inc.
+* Copyright 2007, 2008 Red Hat, Inc.
*
* This file is part of Red Hat Messaging.
*
@@ -41,8 +41,8 @@
}
}
-#include <cstddef>
#include "jrnl/fcntl.hpp"
+#include "jrnl/rfc.hpp"
namespace mrg
{
@@ -51,93 +51,130 @@
/**
* \class rrfc
- * \brief Class to handle read manangement of a journal rotating file controller.
+ * \brief Read Rotating File Controller (rrfc) - Subclassed from pure virtual class
rfc. Used to control the read
+ * pipeline in a rotating file buffer or journal. See class rfc for further
details.
+ *
+ * The states that exist in this class are identical to class rfc from which it
inherits, but in addition, the value
+ * of the read file handle _fh is also considered. The calls to set_findex also opens
the file handle _fh to the
+ * active file for reading. Similarly, unset_findex() closes this file handle.
+ *
+ * <pre>
+ * is_initialized()
is_active()
+ * +===+ _nfiles == 0
+ * +---------->| | Uninitialized: _fc_arr == 0 F
F
+ * | +-->+===+ --+ _curr_fc == 0
+ * | | | _fh == -1
+ * | | |
+ * | reset() initialize()
+ * | | |
+ * | | |
+ * | +-- +===+<--+ _nfiles > 0
+ * reset() | | Inactive: _fc_arr != 0 T F
+ * | +-->+===+ --+ _curr_fc == 0
+ * | | | _fh == -1
+ * | | |
+ * | unset_findex() set_findex()
+ * | | |
+ * | | |
+ * | +-- +===+<--+ _nfiles > 0
+ * +---------- | | Active: _fc_arr != 0 T T
+ * +===+ _curr_fc != 0
+ * _fh >= 0
+ * </pre>
+ *
+ * In adition to the states above, class rrfc contains a validity flag. This is
operated indepenedently of the state
+ * machine. This flag (_valid) indicates when the read buffers are valid for reading.
This is not strictly a state,
+ * but simply a flag used to keep track of the status, and is set/unset with calls to
set_valid() and set_invalid()
+ * respectively.
*/
- class rrfc
+ class rrfc : public rfc
{
protected:
- u_int16_t _nfiles; ///< Number of data files
- fcntl** _fc_arr; ///< Array of pointers to data file controllers
- u_int16_t _fc_index; ///< Index of current file controller
- fcntl* _curr_fc; ///< Pointer to current file controller
+ int _fh; ///< Read file handle
+ bool _valid; ///< Flag is true when read pages contain vailid data
public:
rrfc();
virtual ~rrfc();
/**
- * \brief Initialize the controller.
+ * \brief Initialize the controller, moving from state Uninitialized to
Initialized. The main function of
+ * initialize() is to set the number of files and the pointer to the fcntl
array.
* \param nfiles Number of files in the rotating file group.
* \param fc_arr Pointer to an array of file controller, each of which correspond
to one of
* the physical journal files.
- * \param rdp Struct carrying restore information. Optional for non-restore use,
defaults to
- * 0 (NULL).
*/
- virtual void initialize(const u_int16_t nfiles, fcntl** fc_arr, rcvdat* rdp =
0);
+ virtual void initialize(const u_int16_t nfiles, fcntl** fc_arr);
- void reset(u_int16_t fc_index = 0);
+ /**
+ * \brief Reset the controller to Uninitialized state, usually called when the
journal is stopped. Once called,
+ * initialize() must be called to reuse an instance.
+ */
+ void reset();
/**
- * \brief Rotate active file controller to next file in rotating file group.
- * \exception jerrno::JERR__NINIT if called before calling initialize().
+ * /brief Opens the file handle for reading a particular fid. Moves to state
open.
*/
- bool rotate();
+ virtual void set_findex(const u_int16_t fc_index);
/**
- * \brief Returns the index of the currently active file within the rotating
- * file group.
+ * /brief Closes the read file handle and nulls the active fcntl pointer. Moves to
state closed.
*/
- inline u_int16_t index() const { return _fc_index; }
+ virtual void unset_findex();
/**
- * \brief Returns the currently active journal file controller within the
rotating
- * file group.
+ * /brief Check the state: true = open; false = closed.
*/
- inline fcntl* file_controller() const { return _curr_fc; }
+ virtual inline bool is_active() const { return _curr_fc != 0 && _fh >=
0; }
/**
- * \brief Returns the journal file controller for the given page index within the
rotating
- * file group.
+ * /brief Sets the validity flag which indicates that the read buffers contain
valid data for reading.
*/
- fcntl* file_controller(u_int16_t pg_index) const;
+ inline void set_invalid() { _valid = false; }
- // Convenience access methods to current file controller
+ /**
+ * /brief Resets the validity flag wich indicates that the read buffers are no
longer synchronized and cannot
+ * be read whithout resynchronization.
+ */
+ inline void set_valid() { _valid = true; }
- inline u_int16_t fid() const { return _curr_fc->fid(); }
- inline int fh() const { return _curr_fc->rd_fh(); }
- inline u_int32_t enqcnt() const { return _curr_fc->enqcnt(); }
- inline u_int32_t incr_enqcnt() { return _curr_fc->incr_enqcnt(); }
- inline u_int32_t incr_enqcnt(u_int16_t fid) { return
_fc_arr[fid]->incr_enqcnt(); }
- inline u_int32_t add_enqcnt(u_int32_t a) { return _curr_fc->add_enqcnt(a); }
- inline u_int32_t add_enqcnt(u_int16_t fid, u_int32_t a)
- { return _fc_arr[fid]->add_enqcnt(a); }
- inline u_int32_t decr_enqcnt(u_int16_t fid) { return
_fc_arr[fid]->decr_enqcnt(); }
- inline u_int32_t subtr_enqcnt(u_int16_t fid, u_int32_t s)
- { return _fc_arr[fid]->subtr_enqcnt(s); }
+ /**
+ * /brief Checks the read buffer validity status: true = valid, can be read; false
= invalid, synchronization
+ * required.
+ */
+ inline bool is_valid() const { return _valid; }
+ /**
+ * \brief Rotate active file controller to next file in rotating file group.
+ * \exception jerrno::JERR__NINIT if called before calling initialize().
+ */
+ iores rotate();
+
+ inline int fh() const { return _fh; }
+
inline u_int32_t subm_cnt_dblks() const { return
_curr_fc->rd_subm_cnt_dblks(); }
inline std::size_t subm_offs() const { return _curr_fc->rd_subm_offs(); }
- inline u_int32_t add_subm_cnt_dblks(u_int32_t a)
- { return _curr_fc->add_rd_subm_cnt_dblks(a); }
+ inline u_int32_t add_subm_cnt_dblks(u_int32_t a) { return
_curr_fc->add_rd_subm_cnt_dblks(a); }
inline u_int32_t cmpl_cnt_dblks() const { return
_curr_fc->rd_cmpl_cnt_dblks(); }
inline std::size_t cmpl_offs() const { return _curr_fc->rd_cmpl_offs(); }
- inline u_int32_t add_cmpl_cnt_dblks(u_int32_t a)
- { return _curr_fc->add_rd_cmpl_cnt_dblks(a); }
+ inline u_int32_t add_cmpl_cnt_dblks(u_int32_t a) { return
_curr_fc->add_rd_cmpl_cnt_dblks(a); }
inline bool is_void() const { return _curr_fc->rd_void(); }
inline bool is_empty() const { return _curr_fc->rd_empty(); }
- inline u_int32_t remaining_dblks() { return _curr_fc->rd_remaining_dblks(); }
+ inline u_int32_t remaining_dblks() const { return
_curr_fc->rd_remaining_dblks(); }
inline bool is_full() const { return _curr_fc->is_rd_full(); }
inline bool is_compl() const { return _curr_fc->is_rd_compl(); }
- inline u_int32_t aio_outstanding_dblks() const
- { return _curr_fc->rd_aio_outstanding_dblks(); }
+ inline u_int32_t aio_outstanding_dblks() const { return
_curr_fc->rd_aio_outstanding_dblks(); }
inline bool file_rotate() const { return _curr_fc->rd_file_rotate(); }
- inline bool is_wr_aio_outstanding() const
- { return _curr_fc->wr_aio_outstanding_dblks() > 0; }
+ inline bool is_wr_aio_outstanding() const { return
_curr_fc->wr_aio_outstanding_dblks() > 0; }
// Debug aid
std::string status_str() const;
+
+ protected:
+ void open_fh(const std::string& fn);
+ void close_fh();
}; // class rrfc
} // namespace journal
Modified: store/trunk/cpp/lib/jrnl/wmgr.cpp
===================================================================
--- store/trunk/cpp/lib/jrnl/wmgr.cpp 2008-10-01 16:07:22 UTC (rev 2560)
+++ store/trunk/cpp/lib/jrnl/wmgr.cpp 2008-10-01 19:42:31 UTC (rev 2561)
@@ -935,9 +935,9 @@
) const
{
// Check status of current file
- if (!_wrfc.is_reset())
+ if (!_wrfc.is_wr_reset())
{
- if (!_wrfc.reset())
+ if (!_wrfc.wr_reset())
return RHM_IORES_FULL;
}
Modified: store/trunk/cpp/lib/jrnl/wrfc.cpp
===================================================================
--- store/trunk/cpp/lib/jrnl/wrfc.cpp 2008-10-01 16:07:22 UTC (rev 2560)
+++ store/trunk/cpp/lib/jrnl/wrfc.cpp 2008-10-01 19:42:31 UTC (rev 2561)
@@ -40,7 +40,7 @@
{
wrfc::wrfc():
- rrfc(),
+ rfc(),
_fsize_sblks(0),
_fsize_dblks(0),
_enq_cap_offs_dblks(0),
@@ -55,7 +55,8 @@
_frot(true)
{}
-wrfc::~wrfc() {}
+wrfc::~wrfc()
+{}
void
wrfc::initialize(const u_int16_t nfiles, const u_int32_t fsize_sblks, fcntl** fc_arr,
rcvdat* rdp)
@@ -76,7 +77,8 @@
}
else
{
- rrfc::initialize(nfiles, fc_arr);
+ rfc::initialize(nfiles, fc_arr);
+ rfc::set_findex(0);
#ifdef DRHM_TESTVALS
_rid = u_int64_t(0xffeeddcc) << 32;
#else
@@ -93,8 +95,7 @@
_enq_cap_offs_dblks = _fsize_dblks;
}
-iores
-wrfc::rotate()
+iores wrfc::rotate()
{
if (!_nfiles)
throw jexception(jerrno::JERR__NINIT, "wrfc", "rotate");
@@ -108,13 +109,12 @@
_curr_fc = _fc_arr[_fc_index];
if (_curr_fc->aio_cnt())
return RHM_IORES_FILE_AIOWAIT;
- if (!reset()) //Checks if file is still in use (ie not fully dequeued yet)
+ if (!wr_reset()) //Checks if file is still in use (ie not fully dequeued yet)
return RHM_IORES_FULL;
return RHM_IORES_SUCCESS;
}
-u_int16_t
-wrfc::earliest_index() const
+u_int16_t wrfc::earliest_index() const
{
if (_frot)
return 0;
@@ -125,31 +125,6 @@
}
bool
-wrfc::reset()
-{
- _reset_ok = _curr_fc->reset(); // returns false if full (ie file still contains
enqueued recs)
- return _reset_ok;
-}
-
-
-/**
-* The following routine finds whether the next write will take the write pointer to
beyond the
-* enqueue limit threshold. The following illustrates how this is achieved.
-*
-* Current file index: 4 +---+----------+
-* X's mark still-enqueued records |msg| 1-thresh |
-* msg = current msg size + unwritten cache +---+----------+
-* thresh = JRNL_ENQ_THRESHOLD as a fraction ^ V
-* +-------+-------+-------+-------+--+----+-------+-+-----+-------+
-* file num ->| 0 | 1 | 2 | 3 | 4 | 5 | 6 | 7 |
-* enq recs ->| X XX |XX XXX |XX XXXX|XXXXXXX|XX | | | X |
-* +-------+-------+-------+-------+--+----+-------+-+-----+-------+
-* ^ ^ ^
-* subm_dblks --+ | |
-* These files must be free of
enqueues
-* If not, return true.
-*/
-bool
wrfc::enq_threshold(const u_int32_t enq_dsize_dblks) const
{
u_int32_t subm_dblks = subm_cnt_dblks(); // includes file hdr if > 0
@@ -176,6 +151,13 @@
return findex != _fc_index && in_use;
}
+bool wrfc::wr_reset()
+{
+ _reset_ok = _curr_fc->reset(); // returns false if full (ie file still contains
enqueued recs)
+ return _reset_ok;
+}
+
+// TODO: update this to reflect all status data
std::string
wrfc::status_str() const
{
Modified: store/trunk/cpp/lib/jrnl/wrfc.hpp
===================================================================
--- store/trunk/cpp/lib/jrnl/wrfc.hpp 2008-10-01 16:07:22 UTC (rev 2560)
+++ store/trunk/cpp/lib/jrnl/wrfc.hpp 2008-10-01 19:42:31 UTC (rev 2561)
@@ -28,8 +28,8 @@
* The GNU Lesser General Public License is available in the file COPYING.
*/
-#ifndef mrg_journal_rfc_hpp
-#define mrg_journal_rfc_hpp
+#ifndef mrg_journal_wrfc_hpp
+#define mrg_journal_wrfc_hpp
namespace mrg
{
@@ -52,7 +52,7 @@
* \class wrfc
* \brief Class to handle write manangement of a journal rotating file controller.
*/
- class wrfc : public rrfc
+ class wrfc : public rfc
{
private:
u_int32_t _fsize_sblks; ///< Size of journal files in sblks
@@ -62,6 +62,7 @@
bool _reset_ok; ///< Flag set when reset succeeds
bool _owi; ///< Overwrite indicator
bool _frot; ///< Flag is true for first rotation, false
otherwise
+
public:
wrfc();
virtual ~wrfc();
@@ -91,10 +92,33 @@
*/
u_int16_t earliest_index() const;
+ /**
+ * \brief Determines if a paroposed write would cause the enqueue threshold to be
exceeded.
+ *
+ * The following routine finds whether the next write will take the write pointer
to beyond the
+ * enqueue limit threshold. The following illustrates how this is achieved.
+ * <pre>
+ * Current file index: 4 +---+----------+
+ * X's mark still-enqueued records |msg| 1-thresh |
+ * msg = current msg size + unwritten cache +---+----------+
+ * thresh = JRNL_ENQ_THRESHOLD as a fraction ^ V
+ * +-------+-------+-------+-------+--+----+-------+-+-----+-------+
+ * file num ->| 0 | 1 | 2 | 3 | 4 | 5 | 6 | 7
|
+ * enq recs ->| X XX |XX XXX |XX XXXX|XXXXXXX|XX | | | X
|
+ * +-------+-------+-------+-------+--+----+-------+-+-----+-------+
+ * ^ ^ ^
+ * subm_dblks --+ | |
+ * These files must be free
of enqueues
+ * If not, return true.
+ * </pre>
+ * \param enq_dsize_dblks Proposed size of write in dblocks
+ */
+ bool enq_threshold(const u_int32_t enq_dsize_dblks) const;
+
inline u_int64_t rid() const { return _rid; }
inline u_int64_t get_incr_rid() { return _rid++; }
- bool reset();
- inline bool is_reset() const { return _reset_ok; }
+ bool wr_reset();
+ inline bool is_wr_reset() const { return _reset_ok; }
inline bool owi() const { return _owi; }
inline bool frot() const { return _frot; }
@@ -104,13 +128,11 @@
inline u_int32_t subm_cnt_dblks() const { return
_curr_fc->wr_subm_cnt_dblks(); }
inline std::size_t subm_offs() const { return _curr_fc->wr_subm_offs(); }
- inline u_int32_t add_subm_cnt_dblks(u_int32_t a)
- { return _curr_fc->add_wr_subm_cnt_dblks(a); }
+ inline u_int32_t add_subm_cnt_dblks(u_int32_t a) { return
_curr_fc->add_wr_subm_cnt_dblks(a); }
inline u_int32_t cmpl_cnt_dblks() const { return
_curr_fc->wr_cmpl_cnt_dblks(); }
inline std::size_t cmpl_offs() const { return _curr_fc->wr_cmpl_offs(); }
- inline u_int32_t add_cmpl_cnt_dblks(u_int32_t a)
- { return _curr_fc->add_wr_cmpl_cnt_dblks(a); }
+ inline u_int32_t add_cmpl_cnt_dblks(u_int32_t a) { return
_curr_fc->add_wr_cmpl_cnt_dblks(a); }
inline u_int16_t aio_cnt() const { return _curr_fc->aio_cnt(); }
inline u_int16_t incr_aio_cnt() { return _curr_fc->incr_aio_cnt(); }
@@ -121,16 +143,14 @@
inline u_int32_t remaining_dblks() const { return
_curr_fc->wr_remaining_dblks(); }
inline bool is_full() const { return _curr_fc->is_wr_full(); };
inline bool is_compl() const { return _curr_fc->is_wr_compl(); };
- inline u_int32_t aio_outstanding_dblks() const
- { return _curr_fc->wr_aio_outstanding_dblks(); }
+ inline u_int32_t aio_outstanding_dblks() const { return
_curr_fc->wr_aio_outstanding_dblks(); }
inline bool file_rotate() const { return _curr_fc->wr_file_rotate(); }
- bool enq_threshold(const u_int32_t enq_dsize_dblks) const;
-
+
// Debug aid
std::string status_str() const;
- }; // class wrfc
+ };
} // namespace journal
} // namespace mrg
-#endif // ifndef mrg_journal_rfc_hpp
+#endif // ifndef mrg_journal_wrfc_hpp
Modified: store/trunk/cpp/tests/jrnl/_st_read.cpp
===================================================================
--- store/trunk/cpp/tests/jrnl/_st_read.cpp 2008-10-01 16:07:22 UTC (rev 2560)
+++ store/trunk/cpp/tests/jrnl/_st_read.cpp 2008-10-01 19:42:31 UTC (rev 2561)
@@ -98,6 +98,39 @@
cout << "ok" << endl;
}
+QPID_AUTO_TEST_CASE(multi_page_enqueue_read_dequeue_block)
+{
+ string test_name = get_test_name(test_filename, "multi_page_read");
+ try
+ {
+ string msg;
+ string rmsg;
+ string xid;
+ bool transientFlag;
+ bool externalFlag;
+
+ test_jrnl jc(test_name, test_dir, test_name);
+ jc.initialize(2*NUM_TEST_JFILES, 10*TEST_JFSIZE_SBLKS);
+ for (int m=0; m<NUM_MSGS*2000; m++)
+ enq_msg(jc, m, create_msg(msg, m, MSG_SIZE), false);
+ jc.flush();
+ for (int m=0; m<NUM_MSGS*2000; m++)
+ {
+ read_msg(jc, rmsg, xid, transientFlag, externalFlag);
+ BOOST_CHECK_EQUAL(create_msg(msg, m, MSG_SIZE), rmsg);
+ BOOST_CHECK_EQUAL(xid.size(), std::size_t(0));
+ BOOST_CHECK_EQUAL(transientFlag, false);
+ BOOST_CHECK_EQUAL(externalFlag, false);
+ }
+ read_msg(jc, rmsg, xid, transientFlag, externalFlag, RHM_IORES_EMPTY);
+ for (int m=0; m<NUM_MSGS*2000; m++)
+ deq_msg(jc, m);
+ read_msg(jc, rmsg, xid, transientFlag, externalFlag, RHM_IORES_EMPTY);
+ }
+ catch(const exception& e) { BOOST_FAIL(e.what()); }
+ cout << "ok" << endl;
+}
+
QPID_AUTO_TEST_CASE(enqueue_read_dequeue_interleaved)
{
string test_name = get_test_name(test_filename,
"enqueue_read_dequeue_interleaved");
@@ -171,6 +204,48 @@
cout << "ok" << endl;
}
+QPID_AUTO_TEST_CASE(multi_page_enqueue_recovered_read_dequeue)
+{
+ string test_name = get_test_name(test_filename,
"multi_page_enqueue_recovered_read_dequeue");
+ try
+ {
+ {
+ string msg;
+
+ test_jrnl jc(test_name, test_dir, test_name);
+ jc.initialize(2*NUM_TEST_JFILES, 10*TEST_JFSIZE_SBLKS);
+ for (int m=0; m<NUM_MSGS*2000; m++)
+ enq_msg(jc, m, create_msg(msg, m, MSG_SIZE), false);
+ }
+ {
+ string msg;
+ u_int64_t hrid;
+ string rmsg;
+ string xid;
+ bool transientFlag;
+ bool externalFlag;
+
+ test_jrnl jc(test_name, test_dir, test_name);
+ jc.recover(2*NUM_TEST_JFILES, 10*TEST_JFSIZE_SBLKS, 0, hrid);
+ BOOST_CHECK_EQUAL(hrid, u_int64_t(NUM_MSGS*2000 - 1));
+ jc.recover_complete();
+ for (int m=0; m<NUM_MSGS*2000; m++)
+ {
+ read_msg(jc, rmsg, xid, transientFlag, externalFlag);
+ BOOST_CHECK_EQUAL(create_msg(msg, m, MSG_SIZE), rmsg);
+ BOOST_CHECK_EQUAL(xid.size(), std::size_t(0));
+ BOOST_CHECK_EQUAL(transientFlag, false);
+ BOOST_CHECK_EQUAL(externalFlag, false);
+ }
+ read_msg(jc, rmsg, xid, transientFlag, externalFlag, RHM_IORES_EMPTY);
+ for (int m=0; m<NUM_MSGS*2000; m++)
+ deq_msg(jc, m);
+ }
+ }
+ catch(const exception& e) { BOOST_FAIL(e.what()); }
+ cout << "ok" << endl;
+}
+
QPID_AUTO_TEST_CASE(enqueue_recover_read_recovered_read_dequeue)
{
string test_name = get_test_name(test_filename,
"enqueue_recover_read_recovered_read_dequeue");