rhmessaging commits: r2562 - mgmt/trunk/cumin/python/cumin.
by rhmessaging-commits@lists.jboss.org
Author: eallen
Date: 2008-10-01 16:51:12 -0400 (Wed, 01 Oct 2008)
New Revision: 2562
Modified:
mgmt/trunk/cumin/python/cumin/submitter.py
Log:
Fix stats javascript error
Modified: mgmt/trunk/cumin/python/cumin/submitter.py
===================================================================
--- mgmt/trunk/cumin/python/cumin/submitter.py 2008-10-01 19:42:31 UTC (rev 2561)
+++ mgmt/trunk/cumin/python/cumin/submitter.py 2008-10-01 20:51:12 UTC (rev 2562)
@@ -70,7 +70,7 @@
self.__tabs.add_tab(details)
def render_script(self, session, sched):
- data = "model.xml?class=scheduler;id=%i" % sched.id
+ data = "model.xml?class=submitter;id=%i" % sched.id
return "wooly.setIntervalUpdate('%s', updateSubmitter, 3000)" % data
class SubmitterStatus(CuminStatus):
16 years, 2 months
rhmessaging commits: r2561 - in store/trunk/cpp: lib/jrnl and 1 other directories.
by rhmessaging-commits@lists.jboss.org
Author: kpvdr
Date: 2008-10-01 15:42:31 -0400 (Wed, 01 Oct 2008)
New Revision: 2561
Added:
store/trunk/cpp/lib/jrnl/rfc.cpp
store/trunk/cpp/lib/jrnl/rfc.hpp
Modified:
store/trunk/cpp/lib/Makefile.am
store/trunk/cpp/lib/jrnl/fcntl.cpp
store/trunk/cpp/lib/jrnl/fcntl.hpp
store/trunk/cpp/lib/jrnl/jcntl.cpp
store/trunk/cpp/lib/jrnl/jerrno.cpp
store/trunk/cpp/lib/jrnl/jerrno.hpp
store/trunk/cpp/lib/jrnl/rcvdat.hpp
store/trunk/cpp/lib/jrnl/rmgr.cpp
store/trunk/cpp/lib/jrnl/rmgr.hpp
store/trunk/cpp/lib/jrnl/rrfc.cpp
store/trunk/cpp/lib/jrnl/rrfc.hpp
store/trunk/cpp/lib/jrnl/wmgr.cpp
store/trunk/cpp/lib/jrnl/wrfc.cpp
store/trunk/cpp/lib/jrnl/wrfc.hpp
store/trunk/cpp/tests/jrnl/_st_read.cpp
Log:
Removed read file handles from class fcntl, moved a single read file to class rrfc where it is opened and closed as needed. This reduces the number of consumed file handles to a half of the previous number; now only one file handle plus one when lazy-load is active is consumed per journal file instead of two per file. Additional code tidy-up also performed, including adding class rfc as a superclass to both rrfc and wrfc.
Modified: store/trunk/cpp/lib/Makefile.am
===================================================================
--- store/trunk/cpp/lib/Makefile.am 2008-10-01 16:07:22 UTC (rev 2560)
+++ store/trunk/cpp/lib/Makefile.am 2008-10-01 19:42:31 UTC (rev 2561)
@@ -58,6 +58,7 @@
jrnl/lf_map.cpp \
jrnl/pmgr.cpp \
jrnl/rmgr.cpp \
+ jrnl/rfc.cpp \
jrnl/rrfc.cpp \
jrnl/slock.cpp \
jrnl/time_ns.cpp \
@@ -91,6 +92,7 @@
jrnl/rec_hdr.hpp \
jrnl/rec_tail.hpp \
jrnl/rmgr.hpp \
+ jrnl/rfc.hpp \
jrnl/rrfc.hpp \
jrnl/slock.hpp \
jrnl/time_ns.hpp \
Modified: store/trunk/cpp/lib/jrnl/fcntl.cpp
===================================================================
--- store/trunk/cpp/lib/jrnl/fcntl.cpp 2008-10-01 16:07:22 UTC (rev 2560)
+++ store/trunk/cpp/lib/jrnl/fcntl.cpp 2008-10-01 19:42:31 UTC (rev 2561)
@@ -51,7 +51,6 @@
_fid(fid),
_lid(lid),
_ffull_dblks(JRNL_SBLK_SIZE * (jfsize_sblks + 1)),
- _rd_fh(-1),
_wr_fh(-1),
_rec_enqcnt(0),
_rd_subm_cnt_dblks(0),
@@ -61,12 +60,12 @@
_aio_cnt(0)
{
initialize(fbasename, fid, lid, jfsize_sblks, ro);
- open_fh();
+ open_wr_fh();
}
fcntl::~fcntl()
{
- close_fh();
+ close_wr_fh();
}
bool
@@ -113,6 +112,32 @@
return true;
}
+int
+fcntl::open_wr_fh()
+{
+ if (_wr_fh < 0)
+ {
+ _wr_fh = ::open(_fname.c_str(), O_WRONLY | O_DIRECT, S_IRUSR | S_IWUSR | S_IRGRP | S_IROTH); // 0644 -rw-r--r--
+ if (_wr_fh < 0)
+ {
+ std::ostringstream oss;
+ oss << "file=\"" << _fname << "\"" << FORMAT_SYSERR(errno);
+ throw jexception(jerrno::JERR_FCNTL_OPENWR, oss.str(), "fcntl", "open_fh");
+ }
+ }
+ return _wr_fh;
+}
+
+void
+fcntl::close_wr_fh()
+{
+ if (_wr_fh >= 0)
+ {
+ ::close(_wr_fh);
+ _wr_fh = -1;
+ }
+}
+
u_int32_t
fcntl::add_enqcnt(u_int32_t a)
{
@@ -263,41 +288,6 @@
#endif
}
-void
-fcntl::open_fh()
-{
- _rd_fh = ::open(_fname.c_str(), O_RDONLY | O_DIRECT);
- if (_rd_fh < 0)
- {
- std::ostringstream oss;
- oss << "file=\"" << _fname << "\"" << FORMAT_SYSERR(errno);
- throw jexception(jerrno::JERR_FCNTL_OPENRD, oss.str(), "fcntl", "open_fh");
- }
- _wr_fh = ::open(_fname.c_str(), O_WRONLY | O_DIRECT,
- S_IRUSR | S_IWUSR | S_IRGRP | S_IROTH); // 0644 -rw-r--r--
- if (_wr_fh < 0)
- {
- std::ostringstream oss;
- oss << "file=\"" << _fname << "\"" << FORMAT_SYSERR(errno);
- throw jexception(jerrno::JERR_FCNTL_OPENWR, oss.str(), "fcntl", "open_fh");
- }
-}
-
-void
-fcntl::close_fh()
-{
- if (_rd_fh >= 0)
- {
- ::close(_rd_fh);
- _rd_fh = -1;
- }
- if (_wr_fh >= 0)
- {
- ::close(_wr_fh);
- _wr_fh = -1;
- }
-}
-
std::string
fcntl::filename(const std::string& fbasename, const u_int16_t fid)
{
Modified: store/trunk/cpp/lib/jrnl/fcntl.hpp
===================================================================
--- store/trunk/cpp/lib/jrnl/fcntl.hpp 2008-10-01 16:07:22 UTC (rev 2560)
+++ store/trunk/cpp/lib/jrnl/fcntl.hpp 2008-10-01 19:42:31 UTC (rev 2561)
@@ -61,7 +61,6 @@
u_int16_t _fid; ///< File ID (file number in order of creation)
u_int16_t _lid; ///< Logical ID (ordinal number in ring store)
const u_int32_t _ffull_dblks; ///< File size in dblks (incl. file header)
- int _rd_fh; ///< Read file handle
int _wr_fh; ///< Write file handle
u_int32_t _rec_enqcnt; ///< Count of enqueued records
u_int32_t _rd_subm_cnt_dblks; ///< Read file count (data blocks) for submitted AIO
@@ -80,11 +79,14 @@
virtual void rd_reset();
virtual bool wr_reset(const rcvdat* const ro = 0);
+ virtual int open_wr_fh();
+ virtual void close_wr_fh();
+ inline bool is_wr_fh_open() const { return _wr_fh >= 0; }
+
inline const std::string& fname() const { return _fname; }
inline u_int16_t fid() const { return _fid; }
inline u_int16_t lid() const { return _lid; }
inline void set_lid(const u_int16_t lid) { _lid = lid; }
- inline int rd_fh() const { return _rd_fh; }
inline int wr_fh() const { return _wr_fh; }
inline u_int32_t enqcnt() const { return _rec_enqcnt; }
inline u_int32_t incr_enqcnt() { return ++_rec_enqcnt; }
@@ -137,8 +139,6 @@
virtual void initialize(const std::string& fbasename, const u_int16_t fid, const u_int16_t lid,
const u_int32_t jfsize_sblks, const rcvdat* const ro);
- virtual void open_fh();
- virtual void close_fh();
static std::string filename(const std::string& fbasename, const u_int16_t fid);
void clean_file(const u_int32_t jfsize_sblks);
void create_jfile(const u_int32_t jfsize_sblks);
Modified: store/trunk/cpp/lib/jrnl/jcntl.cpp
===================================================================
--- store/trunk/cpp/lib/jrnl/jcntl.cpp 2008-10-01 16:07:22 UTC (rev 2560)
+++ store/trunk/cpp/lib/jrnl/jcntl.cpp 2008-10-01 19:42:31 UTC (rev 2561)
@@ -140,6 +140,7 @@
_wrfc.initialize(_num_jfiles, _jfsize_sblks, _fc_arr);
_rrfc.initialize(_num_jfiles, _fc_arr);
+ _rrfc.set_findex(0);
_rmgr.initialize(rd_cb);
_wmgr.initialize(wr_cb, wcache_pgsize_sblks, wcache_num_pages, JRNL_WMGR_MAXDTOKPP,
JRNL_WMGR_MAXWAITUS);
@@ -206,7 +207,8 @@
}
_wrfc.initialize(_num_jfiles, _jfsize_sblks, _fc_arr, &_rcvdat);
- _rrfc.initialize(_num_jfiles, _fc_arr, &_rcvdat);
+ _rrfc.initialize(_num_jfiles, _fc_arr);
+ _rrfc.set_findex(_rcvdat.ffid(_num_jfiles));
_rmgr.initialize(rd_cb);
_wmgr.initialize(wr_cb, wcache_pgsize_sblks, wcache_num_pages, JRNL_WMGR_MAXDTOKPP,
JRNL_WMGR_MAXWAITUS, (_rcvdat._lffull ? 0 : _rcvdat._eo));
@@ -223,7 +225,8 @@
for (u_int16_t i=0; i<_num_jfiles; i++)
_fc_arr[i]->reset(&_rcvdat);
_wrfc.initialize(_num_jfiles, _jfsize_sblks, _fc_arr, &_rcvdat);
- _rrfc.initialize(_num_jfiles, _fc_arr, &_rcvdat);
+ _rrfc.initialize(_num_jfiles, _fc_arr);
+ _rrfc.set_findex(_rcvdat.ffid(_num_jfiles));
_rmgr.recover_complete();
_readonly_flag = false;
}
@@ -416,6 +419,7 @@
_stop_flag = true;
if (!_readonly_flag)
flush(block_till_aio_cmpl);
+ _rrfc.reset();
}
u_int16_t
@@ -428,8 +432,8 @@
if (++ffid >= _num_jfiles)
ffid = 0;
}
- if (ffid != _rrfc.fid())
- _rrfc.reset(ffid);
+ if (!_rrfc.is_active())
+ _rrfc.set_findex(ffid);
return ffid;
}
Modified: store/trunk/cpp/lib/jrnl/jerrno.cpp
===================================================================
--- store/trunk/cpp/lib/jrnl/jerrno.cpp 2008-10-01 16:07:22 UTC (rev 2560)
+++ store/trunk/cpp/lib/jrnl/jerrno.cpp 2008-10-01 19:42:31 UTC (rev 2561)
@@ -75,15 +75,15 @@
const u_int32_t jerrno::JERR_JDIR_BADFTYPE = 0x030a;
// class fcntl
-const u_int32_t jerrno::JERR_FCNTL_OPENRD = 0x0400;
-const u_int32_t jerrno::JERR_FCNTL_OPENWR = 0x0401;
-const u_int32_t jerrno::JERR_FCNTL_WRITE = 0x0402;
-const u_int32_t jerrno::JERR_FCNTL_CLOSE = 0x0403;
-const u_int32_t jerrno::JERR_FCNTL_FILEOFFSOVFL = 0x0404;
-const u_int32_t jerrno::JERR_FCNTL_CMPLOFFSOVFL = 0x0405;
-const u_int32_t jerrno::JERR_FCNTL_RDOFFSOVFL = 0x0406;
+const u_int32_t jerrno::JERR_FCNTL_OPENWR = 0x0400;
+const u_int32_t jerrno::JERR_FCNTL_WRITE = 0x0401;
+const u_int32_t jerrno::JERR_FCNTL_CLOSE = 0x0402;
+const u_int32_t jerrno::JERR_FCNTL_FILEOFFSOVFL = 0x0403;
+const u_int32_t jerrno::JERR_FCNTL_CMPLOFFSOVFL = 0x0404;
+const u_int32_t jerrno::JERR_FCNTL_RDOFFSOVFL = 0x0405;
-// class file_hdr
+// class rrfc
+const u_int32_t jerrno::JERR_RRFC_OPENRD = 0x0500;
// class jrec, enq_rec, deq_rec, txn_rec
const u_int32_t jerrno::JERR_JREC_BADRECHDR = 0x0600;
@@ -162,7 +162,6 @@
_err_map[JERR_JDIR_BADFTYPE] = "JERR_JDIR_BADFTYPE: Bad or unknown file type (stat mode).";
// class fcntl
- _err_map[JERR_FCNTL_OPENRD] = "JERR_FCNTL_OPENRD: Unable to open file for read.";
_err_map[JERR_FCNTL_OPENWR] = "JERR_FCNTL_OPENWR: Unable to open file for write.";
_err_map[JERR_FCNTL_WRITE] = "JERR_FCNTL_WRITE: Unable to write to file.";
_err_map[JERR_FCNTL_CLOSE] = "JERR_FCNTL_CLOSE: File close failed.";
@@ -174,7 +173,8 @@
"Attempted increase read offset past write offset.";
- // class file_hdr
+ // class rrfc
+ _err_map[JERR_RRFC_OPENRD] = "JERR_RRFC_OPENRD: Unable to open file for read.";
// class jrec, enq_rec, deq_rec, txn_rec
_err_map[JERR_JREC_BADRECHDR] = "JERR_JREC_BADRECHDR: Invalid data record header.";
Modified: store/trunk/cpp/lib/jrnl/jerrno.hpp
===================================================================
--- store/trunk/cpp/lib/jrnl/jerrno.hpp 2008-10-01 16:07:22 UTC (rev 2560)
+++ store/trunk/cpp/lib/jrnl/jerrno.hpp 2008-10-01 19:42:31 UTC (rev 2561)
@@ -93,7 +93,6 @@
static const u_int32_t JERR_JDIR_BADFTYPE; ///< Bad or unknown file type (stat mode)
// class fcntl
- static const u_int32_t JERR_FCNTL_OPENRD; ///< Unable to open file for read
static const u_int32_t JERR_FCNTL_OPENWR; ///< Unable to open file for write
static const u_int32_t JERR_FCNTL_WRITE; ///< Unable to write to file
static const u_int32_t JERR_FCNTL_CLOSE; ///< File close failed
@@ -101,7 +100,8 @@
static const u_int32_t JERR_FCNTL_CMPLOFFSOVFL; ///< Increased cmpl offs past subm offs
static const u_int32_t JERR_FCNTL_RDOFFSOVFL; ///< Increased read offs past write offs
- // class file_hdr
+ // class rrfc
+ static const u_int32_t JERR_RRFC_OPENRD; ///< Unable to open file for read
// class jrec, enq_rec, deq_rec, txn_rec
static const u_int32_t JERR_JREC_BADRECHDR; ///< Invalid data record header
Modified: store/trunk/cpp/lib/jrnl/rcvdat.hpp
===================================================================
--- store/trunk/cpp/lib/jrnl/rcvdat.hpp 2008-10-01 16:07:22 UTC (rev 2560)
+++ store/trunk/cpp/lib/jrnl/rcvdat.hpp 2008-10-01 19:42:31 UTC (rev 2561)
@@ -89,6 +89,18 @@
_enq_cnt_list.resize(num_jfiles, 0);
}
+ // Find first fid with enqueued records
+ u_int16_t ffid(const u_int16_t num_jfiles)
+ {
+ u_int16_t index = _ffid;
+ while (index != _lfid && _enq_cnt_list[index] == 0)
+ {
+ if (++index >= num_jfiles)
+ index = 0;
+ }
+ return index;
+ }
+
std::string to_string(std::string& jid)
{
std::ostringstream oss;
Added: store/trunk/cpp/lib/jrnl/rfc.cpp
===================================================================
--- store/trunk/cpp/lib/jrnl/rfc.cpp (rev 0)
+++ store/trunk/cpp/lib/jrnl/rfc.cpp 2008-10-01 19:42:31 UTC (rev 2561)
@@ -0,0 +1,84 @@
+/**
+* \file rfc.cpp
+*
+* Red Hat Messaging - Message Journal
+*
+* File containing code for class mrg::journal::rfc (rotating
+* file controller). See comments in file rfc.hpp for details.
+*
+* Copyright 2008 Red Hat, Inc.
+*
+* This file is part of Red Hat Messaging.
+*
+* Red Hat Messaging is free software; you can redistribute it and/or
+* modify it under the terms of the GNU Lesser General Public
+* License as published by the Free Software Foundation; either
+* version 2.1 of the License, or (at your option) any later version.
+*
+* This library is distributed in the hope that it will be useful,
+* but WITHOUT ANY WARRANTY; without even the implied warranty of
+* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+* Lesser General Public License for more details.
+*
+* You should have received a copy of the GNU Lesser General Public
+* License along with this library; if not, write to the Free Software
+* Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301
+* USA
+*
+* The GNU Lesser General Public License is available in the file COPYING.
+*/
+
+#include "jrnl/rfc.hpp"
+
+#include <cassert>
+
+namespace mrg
+{
+namespace journal
+{
+
+rfc::rfc(): _nfiles(0), _fc_arr(0), _fc_index(0), _curr_fc(0)
+{}
+
+rfc::~rfc()
+{}
+
+void
+rfc::initialize(const u_int16_t nfiles, fcntl** fc_arr)
+{
+ _nfiles = nfiles;
+ _fc_arr = fc_arr;
+}
+
+void
+rfc::reset()
+{
+ unset_findex();
+ _nfiles = 0;
+ _fc_arr = 0;
+}
+
+void
+rfc::set_findex(const u_int16_t fc_index)
+{
+ _fc_index = fc_index;
+ _curr_fc = _fc_arr[_fc_index];
+ _curr_fc->rd_reset();
+}
+
+void
+rfc::unset_findex()
+{
+ _fc_index = 0;
+ _curr_fc = 0;
+}
+
+fcntl*
+rfc::file_controller(const u_int16_t pg_index) const
+{
+ assert(pg_index < _nfiles);
+ return _fc_arr[pg_index];
+}
+
+} // namespace journal
+} // namespace mrg
Added: store/trunk/cpp/lib/jrnl/rfc.hpp
===================================================================
--- store/trunk/cpp/lib/jrnl/rfc.hpp (rev 0)
+++ store/trunk/cpp/lib/jrnl/rfc.hpp 2008-10-01 19:42:31 UTC (rev 2561)
@@ -0,0 +1,199 @@
+/**
+* \file rfc.hpp
+*
+* Red Hat Messaging - Message Journal
+*
+* File containing code for class mrg::journal::rfc (rotating
+* file controller). See class documentation for details.
+*
+* \author Kim van der Riet
+*
+* Copyright 2008 Red Hat, Inc.
+*
+* This file is part of Red Hat Messaging.
+*
+* Red Hat Messaging is free software; you can redistribute it and/or
+* modify it under the terms of the GNU Lesser General Public
+* License as published by the Free Software Foundation; either
+* version 2.1 of the License, or (at your option) any later version.
+*
+* This library is distributed in the hope that it will be useful,
+* but WITHOUT ANY WARRANTY; without even the implied warranty of
+* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+* Lesser General Public License for more details.
+*
+* You should have received a copy of the GNU Lesser General Public
+* License along with this library; if not, write to the Free Software
+* Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301
+* USA
+*
+* The GNU Lesser General Public License is available in the file COPYING.
+*/
+
+#ifndef mrg_journal_rfc_hpp
+#define mrg_journal_rfc_hpp
+
+namespace mrg
+{
+namespace journal
+{
+class rfc;
+}
+}
+
+#include "jrnl/fcntl.hpp"
+#include "jrnl/enums.hpp"
+
+namespace mrg
+{
+namespace journal
+{
+
+ /**
+ * \class rfc
+ * \brief Rotating File Controller (rfc) - Class to handle the manangement of an array of file controllers (fcntl)
+ * objects for use in a circular disk buffer (journal). Each fcntl object corresponds to a file in the journal.
+ *
+ * The following states exist in this class:
+ *
+ * <pre>
+ * is_initialized() is_active()
+ * +===+ _nfiles == 0
+ * +---------->| | Uninitialized: _fc_arr == 0 F F
+ * | +-->+===+ --+ _curr_fc == 0
+ * | | |
+ * | | |
+ * | reset() initialize()
+ * | | |
+ * | | |
+ * | +-- +===+<--+ _nfiles > 0
+ * reset() | | Inactive: _fc_arr != 0 T F
+ * | +-->+===+ --+ _curr_fc == 0
+ * | | |
+ * | | |
+ * | unset_findex() set_findex()
+ * | | |
+ * | | |
+ * | +-- +===+<--+ _nfiles > 0
+ * +---------- | | Active: _fc_arr != 0 T T
+ * +===+ _curr_fc != 0
+ * </pre>
+ *
+ * The Uninitialized state is where the class starts after construction. Once the number of files is known and
+ * the array of file controllers allocated, then initialize() is called to set these, causing the state to move
+ * to Inactive.
+ *
+ * Once the index of the active file is known, then calling activate() will set the index and internal pointer
+ * to the currently active file controller. This moves the state to Active.
+ *
+ * Note that if the number of files change, then the object will have to be reset and reinitialized with a new array
+ * of fcntl objects of the appropriate size.
+ */
+ class rfc
+ {
+ protected:
+ u_int16_t _nfiles; ///< Number of data files
+ fcntl** _fc_arr; ///< Array of pointers to data file controllers
+ u_int16_t _fc_index; ///< Index of current file controller
+ fcntl* _curr_fc; ///< Pointer to current file controller
+
+ public:
+ rfc();
+ virtual ~rfc();
+
+ /**
+ * \brief Initialize the controller, moving from state Uninitialized to Inactive. The main function of
+ * initialize() is to set the number of files and the pointer to the fcntl array.
+ * \param nfiles Number of files in the rotating file group.
+ * \param fc_arr Pointer to an array of file controller, each of which correspond to one of
+ * the physical journal files.
+ */
+ virtual void initialize(const u_int16_t nfiles, fcntl** fc_arr);
+
+ /**
+ * \brief Reset the controller to Uninitialized state, usually called when the journal is stopped. Once called,
+ * initialize() must be called to reuse an instance.
+ */
+ virtual void reset();
+
+ /**
+ * /brief Check initialization state: true = Not Uninitialized, ie Initialized or Active; false = Uninitialized.
+ */
+ virtual inline bool is_initialized() const { return _nfiles == 0 || _fc_arr == 0; }
+
+ /**
+ * /brief Sets the current file index and active fcntl object. Moves to state Active.
+ */
+ virtual void set_findex(const u_int16_t fc_index);
+
+ /**
+ * /brief Nulls the current file index and active fcntl pointer, moves to state Inactive.
+ */
+ virtual void unset_findex();
+
+ /**
+ * /brief Check the file handle state: true = Active; false = Not Active, ie Inactive or Uninitialized.
+ */
+ virtual inline bool is_active() const { return _curr_fc != 0; }
+
+ /**
+ * \brief Rotate active file controller to next file in rotating file group.
+ * \exception jerrno::JERR__NINIT if called before calling initialize().
+ */
+ virtual iores rotate() = 0;
+
+ /**
+ * \brief Returns the index of the currently active file within the rotating file group.
+ */
+ inline u_int16_t index() const { return _fc_index; }
+
+ /**
+ * \brief Returns the currently active journal file controller within the rotating file group.
+ */
+ inline fcntl* file_controller() const { return _curr_fc; }
+
+ /**
+ * \brief Returns the journal file controller for the given page index within the rotating file group.
+ */
+ fcntl* file_controller(const u_int16_t pg_index) const;
+
+ /**
+ * \brief Returns the currently active file id (fid)
+ */
+ inline u_int16_t fid() const { return _curr_fc->fid(); }
+
+ // Convenience access methods to current file controller
+ // Note: Call only when in open state
+
+ virtual inline u_int32_t enqcnt() const { return _curr_fc->enqcnt(); }
+ virtual inline u_int32_t incr_enqcnt() { return _curr_fc->incr_enqcnt(); }
+ virtual inline u_int32_t incr_enqcnt(u_int16_t fid) { return _fc_arr[fid]->incr_enqcnt(); }
+ virtual inline u_int32_t add_enqcnt(u_int32_t a) { return _curr_fc->add_enqcnt(a); }
+ virtual inline u_int32_t add_enqcnt(u_int16_t fid, u_int32_t a) { return _fc_arr[fid]->add_enqcnt(a); }
+ virtual inline u_int32_t decr_enqcnt(u_int16_t fid) { return _fc_arr[fid]->decr_enqcnt(); }
+ virtual inline u_int32_t subtr_enqcnt(u_int16_t fid, u_int32_t s) { return _fc_arr[fid]->subtr_enqcnt(s); }
+
+ virtual inline u_int32_t subm_cnt_dblks() const = 0;
+ virtual inline std::size_t subm_offs() const = 0;
+ virtual inline u_int32_t add_subm_cnt_dblks(u_int32_t a) = 0;
+
+ virtual inline u_int32_t cmpl_cnt_dblks() const = 0;
+ virtual inline std::size_t cmpl_offs() const = 0;
+ virtual inline u_int32_t add_cmpl_cnt_dblks(u_int32_t a) = 0;
+
+ virtual inline bool is_void() const = 0;
+ virtual inline bool is_empty() const = 0;
+ virtual inline u_int32_t remaining_dblks() const = 0;
+ virtual inline bool is_full() const = 0;
+ virtual inline bool is_compl() const = 0;
+ virtual inline u_int32_t aio_outstanding_dblks() const = 0;
+ virtual inline bool file_rotate() const = 0;
+
+ // Debug aid
+ virtual std::string status_str() const = 0;
+ }; // class rfc
+
+} // namespace journal
+} // namespace mrg
+
+#endif // ifndef mrg_journal_rfc_hpp
Modified: store/trunk/cpp/lib/jrnl/rmgr.cpp
===================================================================
--- store/trunk/cpp/lib/jrnl/rmgr.cpp 2008-10-01 16:07:22 UTC (rev 2560)
+++ store/trunk/cpp/lib/jrnl/rmgr.cpp 2008-10-01 19:42:31 UTC (rev 2561)
@@ -46,7 +46,6 @@
pmgr(jc, emap, tmap),
_rrfc(rrfc),
_hdr(),
- _valid(false),
_fhdr_buffer(0),
_fhdr_aio_cb_ptr(0),
_fhdr_rd_outstanding(false)
@@ -303,8 +302,6 @@
std::memcpy(&_fhdr, _fhdr_buffer, sizeof(file_hdr));
_rrfc.add_cmpl_cnt_dblks(JRNL_SBLK_SIZE);
-// std::size_t fro_dblks = _fhdr._fro / JRNL_DBLK_SIZE;
-// _pg_offset_dblks += fro_dblks - JRNL_SBLK_SIZE;
u_int32_t fro_dblks = (_fhdr._fro / JRNL_DBLK_SIZE) - JRNL_SBLK_SIZE;
_pg_cntr = fro_dblks / (JRNL_RMGR_PAGE_SIZE * JRNL_SBLK_SIZE);
u_int32_t tot_pg_offs_dblks = _pg_cntr * JRNL_RMGR_PAGE_SIZE * JRNL_SBLK_SIZE;
@@ -314,7 +311,7 @@
_rrfc.add_cmpl_cnt_dblks(tot_pg_offs_dblks);
_fhdr_rd_outstanding = false;
- _valid = true;
+ _rrfc.set_valid();
}
}
@@ -331,12 +328,11 @@
void
rmgr::invalidate()
{
- if (_valid)
+ if (_rrfc.is_valid())
{
- _valid = false;
for (int i=0; i<_cache_num_pages; i++)
_page_cb_arr[i]._state = UNUSED;
- _rrfc.reset();
+ _rrfc.unset_findex();
_pg_offset_dblks = 0;
}
}
@@ -353,13 +349,8 @@
if (_aio_evt_rem)
get_events();
- if (!_valid)
- {
- if (_rrfc.is_void() && !_rrfc.is_wr_aio_outstanding())
- return RHM_IORES_EMPTY;
- else
+ if (!_rrfc.is_valid())
return RHM_IORES_RCINVALID;
- }
// block reads until outstanding file header read completes as fro is needed to read
if (_fhdr_rd_outstanding)
@@ -564,18 +555,18 @@
return RHM_IORES_SUCCESS;
for (int16_t i=0; i<num_uninit; i++)
{
- if (_rrfc.is_void()) // Nothing to do; this file not yet written to
- break;
- if (!_valid)
+ if (!_rrfc.is_valid())
{
- u_int16_t fid = _jc->get_earliest_fid();
+ _jc->get_earliest_fid(); // calls _rrfc.set_findex()
// If this file has not yet been written to, return RHM_IORES_EMPTY
- if (_rrfc.file_controller(fid)->rd_void())
+ if (_rrfc.is_void() && !_rrfc.is_wr_aio_outstanding())
return RHM_IORES_EMPTY;
- init_file_header_read(fid);
+ init_file_header_read();
break;
}
+ if (_rrfc.is_void()) // Nothing to do; this file not yet written to
+ break;
if (_rrfc.subm_offs() == 0)
{
@@ -657,9 +648,9 @@
}
void
-rmgr::init_file_header_read(u_int16_t fid)
+rmgr::init_file_header_read()
{
- int rfh = _rrfc.file_controller(fid)->rd_fh();
+ int rfh = _rrfc.fh();
aio::prep_pread_2(_fhdr_aio_cb_ptr, rfh, _fhdr_buffer, _sblksize, 0);
if (aio::submit(_ioctx, 1, &_fhdr_aio_cb_ptr) < 0)
throw jexception(jerrno::JERR__AIO, "rmgr", "init_file_header_read");
@@ -673,136 +664,13 @@
rmgr::get(const u_int64_t& rid, const std::size_t& dsize, const std::size_t& dsize_avail,
const void** const data, bool auto_discard)
{
- iores res = pre_read_check(0);
- if (res != RHM_IORES_SUCCESS)
- return res;
-
- _hdr.reset();
- // Read header, determine next record type
- while (true)
- {
- if(dblks_rem() == 0 && _rrfc.is_compl() && !_rrfc.is_wr_aio_outstanding())
- {
- aio_cycle(); // check if any AIOs have returned
- return RHM_IORES_EMPTY;
- }
- if (_page_cb_arr[_pg_index]._state != AIO_COMPLETE)
- {
- aio_cycle();
- return RHM_IORES_PAGE_AIOWAIT;
- }
- void* rptr = (void*)((char*)_page_ptr_arr[_pg_index] +
- (_pg_offset_dblks * JRNL_DBLK_SIZE));
- std::memcpy(&_hdr, rptr, sizeof(hdr));
- switch (_hdr._magic)
- {
- case RHM_JDAT_ENQ_MAGIC:
- {
- std::size_t xid_size = *((std::size_t*)((char*)rptr + sizeof(hdr)
-#if defined(JRNL_BIG_ENDIAN) && defined(JRNL_32_BIT)
- + sizeof(u_int32_t) // filler0
-#endif
- ));
- std::size_t data_size = *((std::size_t*)((char*)rptr + sizeof(hdr) + sizeof(u_int64_t)
-#if defined(JRNL_BIG_ENDIAN) && defined(JRNL_32_BIT)
- + sizeof(u_int32_t) // filler1
-#endif
- ));
- // TODO: Check if transaction is still in transaction map. If so, block read
- // (unless in recovery, in whcih case return info normally
-// std::string xid = ?? (decode xid here)
-// if (xid_size && !readonly && tx_map.exists(xid))
-// return RHM_IORES_TXPENDING;
- rid = _hdr._rid;
- dsize = data_size;
-
- // Analyze how much of message is available
- void* data_ptr = (char*)rptr + sizeof(enq_hdr) + xid_size;
- void* page_end_ptr = (char*)_page_base_ptr + _cache_pgsize_sblks * _sblksize * _cache_num_pages;
- u_int16_t data_start_pg_index = _pg_index;
- u_int16_t data_start_pg_index = _pg_index;
- for (u_int16_t i=0; i<_cache_num_pages; i++)
- {
- pi = (i + _pg_index) % _cache_num_pages;
- if (data_ptr >= _page_ptr_arr[pi] &&
- data_ptr < (char*)_page_ptr_arr[pi] + _cache_pgsize_sblks * _sblksize)
- data_end_pg_index = pi; // found start page index
-
- }
- u_int16_t data_end_pg_index;
- u_int16_t last_pg_avail_index;
-
- void* data_ptr = (char*)rptr + sizeof(enq_hdr) + xid_size;
- void* page_end_ptr = (char*)_page_base_ptr + _cache_pgsize_sblks * _sblksize * _cache_num_pages;
- if (data_ptr >= page_end_ptr) // folded, go back to first page...
- data_ptr = (char*)_page_base_ptr + data_ptr - page_end_ptr;
- void* data_end_ptr = (char*)data_ptr + data_size;
- if (data_end_ptr >= page_end_ptr) // folded, go back to first page...
- data_end_ptr = (char*)_page_base_ptr + data_end_ptr - page_end_ptr;
- dsize_avail = ??;
- if(data_ptr folded)
- else
- *data = data_ptr;
- }
- break;
- case RHM_JDAT_DEQ_MAGIC:
- consume_deq();
- break;
- case RHM_JDAT_EMPTY_MAGIC:
- consume_filler();
- break;
- default:
- std::ostringstream oss;
- oss << std::hex << std::setfill('0');
- oss << "Magic=0x" << std::setw(8) << _hdr._magic << std::dec;
- throw jexception(jerrno::JERR_RMGR_UNKNOWNMAGIC, oss.str(), "rmgr", "get");
- } // switch(_hdr._magic)
- } // while
+ return RHM_IORES_SUCCESS;
}
const iores
rmgr::discard(data_tok* dtokp)
{
- iores res = pre_read_check(dtokp);
- if (res != RHM_IORES_SUCCESS)
- return res;
-
- _hdr.reset();
- // Read header, determine next record type
- while (true)
- {
- if(dblks_rem() == 0 && _rrfc.is_compl() && !_rrfc.is_wr_aio_outstanding())
- {
- aio_cycle(); // check if any AIOs have returned
- return RHM_IORES_EMPTY;
- }
- if (_page_cb_arr[_pg_index]._state != AIO_COMPLETE)
- {
- aio_cycle();
- return RHM_IORES_PAGE_AIOWAIT;
- }
- void* rptr = (void*)((char*)_page_ptr_arr[_pg_index] +
- (_pg_offset_dblks * JRNL_DBLK_SIZE));
- std::memcpy(&_hdr, rptr, sizeof(hdr));
- switch (_hdr._magic)
- {
- case RHM_JDAT_ENQ_MAGIC:
- {
- }
- break;
- case RHM_JDAT_DEQ_MAGIC:
- consume_deq();
- break;
- case RHM_JDAT_EMPTY_MAGIC:
- consume_filler();
- break;
- default:
- std::ostringstream oss;
- oss << std::hex << std::setfill('0');
- oss << "Magic=0x" << std::setw(8) << _hdr._magic << std::dec;
- throw jexception(jerrno::JERR_RMGR_UNKNOWNMAGIC, oss.str(), "rmgr", "discard");
- } // switch
- } // while
+ return RHM_IORES_SUCCESS;
}
*/
Modified: store/trunk/cpp/lib/jrnl/rmgr.hpp
===================================================================
--- store/trunk/cpp/lib/jrnl/rmgr.hpp 2008-10-01 16:07:22 UTC (rev 2560)
+++ store/trunk/cpp/lib/jrnl/rmgr.hpp 2008-10-01 19:42:31 UTC (rev 2561)
@@ -65,7 +65,6 @@
rec_hdr _hdr; ///< Header used to determind record type
rd_aio_cb _cb; ///< Callback function pointer for AIO events
- bool _valid; ///< Flag is true when read pages contain vailid data
void* _fhdr_buffer; ///< Buffer used for fhdr reads
aio_cb* _fhdr_aio_cb_ptr; ///< iocb pointer for fhdr reads
file_hdr _fhdr; ///< file header instance for reading file headers
@@ -81,8 +80,7 @@
bool ignore_pending_txns);
u_int32_t get_events(page_state state = AIO_COMPLETE);
void recover_complete();
- inline bool is_valid() const {return _valid; }
- inline iores synchronize() { if (!_valid) return aio_cycle(); return RHM_IORES_SUCCESS; }
+ inline iores synchronize() { if (_rrfc.is_valid()) return RHM_IORES_SUCCESS; return aio_cycle(); }
void invalidate();
/* TODO (if required)
@@ -106,7 +104,7 @@
u_int32_t dblks_rem() const;
void set_params_null(void** const datapp, std::size_t& dsize, void** const xidpp,
std::size_t& xidsize);
- void init_file_header_read(u_int16_t fid);
+ void init_file_header_read();
};
} // namespace journal
Modified: store/trunk/cpp/lib/jrnl/rrfc.cpp
===================================================================
--- store/trunk/cpp/lib/jrnl/rrfc.cpp 2008-10-01 16:07:22 UTC (rev 2560)
+++ store/trunk/cpp/lib/jrnl/rrfc.cpp 2008-10-01 19:42:31 UTC (rev 2561)
@@ -31,7 +31,8 @@
#include "jrnl/rrfc.hpp"
-#include <cassert>
+#include <cerrno>
+#include <fcntl.h>
#include "jrnl/jerrno.hpp"
#include "jrnl/jexception.hpp"
@@ -40,44 +41,44 @@
namespace journal
{
-rrfc::rrfc():
- _nfiles(0),
- _fc_arr(0),
- _fc_index(0),
- _curr_fc(0)
+rrfc::rrfc(): rfc(), _fh(-1), _valid(false)
{}
-rrfc::~rrfc() {}
+rrfc::~rrfc()
+{
+ close_fh();
+}
void
-rrfc::initialize(const u_int16_t nfiles, fcntl** fc_arr, rcvdat* rdp)
+rrfc::initialize(const u_int16_t nfiles, fcntl** fc_arr)
{
- _nfiles = nfiles;
- _fc_arr = fc_arr;
- if (rdp)
- {
- // Find first file with enqueued records
- u_int16_t index = rdp->_ffid;
- while (index != rdp->_lfid && rdp->_enq_cnt_list[index] == 0)
- {
- if (++index >= _nfiles)
- index = 0;
- }
- reset(index);
- }
- else
- reset(0);
+ rfc::initialize(nfiles, fc_arr);
+ _valid = false;
}
void
-rrfc::reset(u_int16_t fc_index)
+rrfc::reset()
{
- _fc_index = fc_index;
- _curr_fc = _fc_arr[_fc_index];
- _curr_fc->rd_reset();
+ unset_findex();
+ rfc::reset();
}
-bool
+void
+rrfc::set_findex(const u_int16_t fc_index)
+{
+ rfc::set_findex(fc_index);
+ open_fh(_curr_fc->fname());
+}
+
+void
+rrfc::unset_findex()
+{
+ set_invalid();
+ close_fh();
+ rfc::unset_findex();
+}
+
+iores
rrfc::rotate()
{
if (!_nfiles)
@@ -88,16 +89,11 @@
fcntl* next_fc = _fc_arr[next_fc_index];
_fc_index = next_fc_index;
_curr_fc = next_fc;
- return true;
+ open_fh(_curr_fc->fname());
+ return RHM_IORES_SUCCESS;
}
-fcntl*
-rrfc::file_controller(u_int16_t pg_index) const
-{
- assert(pg_index < _nfiles);
- return _fc_arr[pg_index];
-}
-
+// TODO: update this to reflect all status data
std::string
rrfc::status_str() const
{
@@ -106,5 +102,30 @@
return oss.str();
}
+// === protected functions ===
+
+void
+rrfc::open_fh(const std::string& fn)
+{
+ close_fh();
+ _fh = ::open(fn.c_str(), O_RDONLY | O_DIRECT);
+ if (_fh < 0)
+ {
+ std::ostringstream oss;
+ oss << "file=\"" << fn << "\"" << FORMAT_SYSERR(errno);
+ throw jexception(jerrno::JERR_RRFC_OPENRD, oss.str(), "rrfc", "open_fh");
+ }
+}
+
+void
+rrfc::close_fh()
+{
+ if (_fh >= 0)
+ {
+ ::close(_fh);
+ _fh = -1;
+ }
+}
+
} // namespace journal
} // namespace mrg
Modified: store/trunk/cpp/lib/jrnl/rrfc.hpp
===================================================================
--- store/trunk/cpp/lib/jrnl/rrfc.hpp 2008-10-01 16:07:22 UTC (rev 2560)
+++ store/trunk/cpp/lib/jrnl/rrfc.hpp 2008-10-01 19:42:31 UTC (rev 2561)
@@ -8,7 +8,7 @@
*
* \author Kim van der Riet
*
-* Copyright 2007 Red Hat, Inc.
+* Copyright 2007, 2008 Red Hat, Inc.
*
* This file is part of Red Hat Messaging.
*
@@ -41,8 +41,8 @@
}
}
-#include <cstddef>
#include "jrnl/fcntl.hpp"
+#include "jrnl/rfc.hpp"
namespace mrg
{
@@ -51,93 +51,130 @@
/**
* \class rrfc
- * \brief Class to handle read manangement of a journal rotating file controller.
+ * \brief Read Rotating File Controller (rrfc) - Subclassed from pure virtual class rfc. Used to control the read
+ * pipeline in a rotating file buffer or journal. See class rfc for further details.
+ *
+ * The states that exist in this class are identical to class rfc from which it inherits, but in addition, the value
+ * of the read file handle _fh is also considered. The calls to set_findex also opens the file handle _fh to the
+ * active file for reading. Similarly, unset_findex() closes this file handle.
+ *
+ * <pre>
+ * is_initialized() is_active()
+ * +===+ _nfiles == 0
+ * +---------->| | Uninitialized: _fc_arr == 0 F F
+ * | +-->+===+ --+ _curr_fc == 0
+ * | | | _fh == -1
+ * | | |
+ * | reset() initialize()
+ * | | |
+ * | | |
+ * | +-- +===+<--+ _nfiles > 0
+ * reset() | | Inactive: _fc_arr != 0 T F
+ * | +-->+===+ --+ _curr_fc == 0
+ * | | | _fh == -1
+ * | | |
+ * | unset_findex() set_findex()
+ * | | |
+ * | | |
+ * | +-- +===+<--+ _nfiles > 0
+ * +---------- | | Active: _fc_arr != 0 T T
+ * +===+ _curr_fc != 0
+ * _fh >= 0
+ * </pre>
+ *
+ * In adition to the states above, class rrfc contains a validity flag. This is operated indepenedently of the state
+ * machine. This flag (_valid) indicates when the read buffers are valid for reading. This is not strictly a state,
+ * but simply a flag used to keep track of the status, and is set/unset with calls to set_valid() and set_invalid()
+ * respectively.
*/
- class rrfc
+ class rrfc : public rfc
{
protected:
- u_int16_t _nfiles; ///< Number of data files
- fcntl** _fc_arr; ///< Array of pointers to data file controllers
- u_int16_t _fc_index; ///< Index of current file controller
- fcntl* _curr_fc; ///< Pointer to current file controller
+ int _fh; ///< Read file handle
+ bool _valid; ///< Flag is true when read pages contain vailid data
public:
rrfc();
virtual ~rrfc();
/**
- * \brief Initialize the controller.
+ * \brief Initialize the controller, moving from state Uninitialized to Initialized. The main function of
+ * initialize() is to set the number of files and the pointer to the fcntl array.
* \param nfiles Number of files in the rotating file group.
* \param fc_arr Pointer to an array of file controller, each of which correspond to one of
* the physical journal files.
- * \param rdp Struct carrying restore information. Optional for non-restore use, defaults to
- * 0 (NULL).
*/
- virtual void initialize(const u_int16_t nfiles, fcntl** fc_arr, rcvdat* rdp = 0);
+ virtual void initialize(const u_int16_t nfiles, fcntl** fc_arr);
- void reset(u_int16_t fc_index = 0);
+ /**
+ * \brief Reset the controller to Uninitialized state, usually called when the journal is stopped. Once called,
+ * initialize() must be called to reuse an instance.
+ */
+ void reset();
/**
- * \brief Rotate active file controller to next file in rotating file group.
- * \exception jerrno::JERR__NINIT if called before calling initialize().
+ * /brief Opens the file handle for reading a particular fid. Moves to state open.
*/
- bool rotate();
+ virtual void set_findex(const u_int16_t fc_index);
/**
- * \brief Returns the index of the currently active file within the rotating
- * file group.
+ * /brief Closes the read file handle and nulls the active fcntl pointer. Moves to state closed.
*/
- inline u_int16_t index() const { return _fc_index; }
+ virtual void unset_findex();
/**
- * \brief Returns the currently active journal file controller within the rotating
- * file group.
+ * /brief Check the state: true = open; false = closed.
*/
- inline fcntl* file_controller() const { return _curr_fc; }
+ virtual inline bool is_active() const { return _curr_fc != 0 && _fh >= 0; }
/**
- * \brief Returns the journal file controller for the given page index within the rotating
- * file group.
+ * /brief Sets the validity flag which indicates that the read buffers contain valid data for reading.
*/
- fcntl* file_controller(u_int16_t pg_index) const;
+ inline void set_invalid() { _valid = false; }
- // Convenience access methods to current file controller
+ /**
+ * /brief Resets the validity flag wich indicates that the read buffers are no longer synchronized and cannot
+ * be read whithout resynchronization.
+ */
+ inline void set_valid() { _valid = true; }
- inline u_int16_t fid() const { return _curr_fc->fid(); }
- inline int fh() const { return _curr_fc->rd_fh(); }
- inline u_int32_t enqcnt() const { return _curr_fc->enqcnt(); }
- inline u_int32_t incr_enqcnt() { return _curr_fc->incr_enqcnt(); }
- inline u_int32_t incr_enqcnt(u_int16_t fid) { return _fc_arr[fid]->incr_enqcnt(); }
- inline u_int32_t add_enqcnt(u_int32_t a) { return _curr_fc->add_enqcnt(a); }
- inline u_int32_t add_enqcnt(u_int16_t fid, u_int32_t a)
- { return _fc_arr[fid]->add_enqcnt(a); }
- inline u_int32_t decr_enqcnt(u_int16_t fid) { return _fc_arr[fid]->decr_enqcnt(); }
- inline u_int32_t subtr_enqcnt(u_int16_t fid, u_int32_t s)
- { return _fc_arr[fid]->subtr_enqcnt(s); }
+ /**
+ * /brief Checks the read buffer validity status: true = valid, can be read; false = invalid, synchronization
+ * required.
+ */
+ inline bool is_valid() const { return _valid; }
+ /**
+ * \brief Rotate active file controller to next file in rotating file group.
+ * \exception jerrno::JERR__NINIT if called before calling initialize().
+ */
+ iores rotate();
+
+ inline int fh() const { return _fh; }
+
inline u_int32_t subm_cnt_dblks() const { return _curr_fc->rd_subm_cnt_dblks(); }
inline std::size_t subm_offs() const { return _curr_fc->rd_subm_offs(); }
- inline u_int32_t add_subm_cnt_dblks(u_int32_t a)
- { return _curr_fc->add_rd_subm_cnt_dblks(a); }
+ inline u_int32_t add_subm_cnt_dblks(u_int32_t a) { return _curr_fc->add_rd_subm_cnt_dblks(a); }
inline u_int32_t cmpl_cnt_dblks() const { return _curr_fc->rd_cmpl_cnt_dblks(); }
inline std::size_t cmpl_offs() const { return _curr_fc->rd_cmpl_offs(); }
- inline u_int32_t add_cmpl_cnt_dblks(u_int32_t a)
- { return _curr_fc->add_rd_cmpl_cnt_dblks(a); }
+ inline u_int32_t add_cmpl_cnt_dblks(u_int32_t a) { return _curr_fc->add_rd_cmpl_cnt_dblks(a); }
inline bool is_void() const { return _curr_fc->rd_void(); }
inline bool is_empty() const { return _curr_fc->rd_empty(); }
- inline u_int32_t remaining_dblks() { return _curr_fc->rd_remaining_dblks(); }
+ inline u_int32_t remaining_dblks() const { return _curr_fc->rd_remaining_dblks(); }
inline bool is_full() const { return _curr_fc->is_rd_full(); }
inline bool is_compl() const { return _curr_fc->is_rd_compl(); }
- inline u_int32_t aio_outstanding_dblks() const
- { return _curr_fc->rd_aio_outstanding_dblks(); }
+ inline u_int32_t aio_outstanding_dblks() const { return _curr_fc->rd_aio_outstanding_dblks(); }
inline bool file_rotate() const { return _curr_fc->rd_file_rotate(); }
- inline bool is_wr_aio_outstanding() const
- { return _curr_fc->wr_aio_outstanding_dblks() > 0; }
+ inline bool is_wr_aio_outstanding() const { return _curr_fc->wr_aio_outstanding_dblks() > 0; }
// Debug aid
std::string status_str() const;
+
+ protected:
+ void open_fh(const std::string& fn);
+ void close_fh();
}; // class rrfc
} // namespace journal
Modified: store/trunk/cpp/lib/jrnl/wmgr.cpp
===================================================================
--- store/trunk/cpp/lib/jrnl/wmgr.cpp 2008-10-01 16:07:22 UTC (rev 2560)
+++ store/trunk/cpp/lib/jrnl/wmgr.cpp 2008-10-01 19:42:31 UTC (rev 2561)
@@ -935,9 +935,9 @@
) const
{
// Check status of current file
- if (!_wrfc.is_reset())
+ if (!_wrfc.is_wr_reset())
{
- if (!_wrfc.reset())
+ if (!_wrfc.wr_reset())
return RHM_IORES_FULL;
}
Modified: store/trunk/cpp/lib/jrnl/wrfc.cpp
===================================================================
--- store/trunk/cpp/lib/jrnl/wrfc.cpp 2008-10-01 16:07:22 UTC (rev 2560)
+++ store/trunk/cpp/lib/jrnl/wrfc.cpp 2008-10-01 19:42:31 UTC (rev 2561)
@@ -40,7 +40,7 @@
{
wrfc::wrfc():
- rrfc(),
+ rfc(),
_fsize_sblks(0),
_fsize_dblks(0),
_enq_cap_offs_dblks(0),
@@ -55,7 +55,8 @@
_frot(true)
{}
-wrfc::~wrfc() {}
+wrfc::~wrfc()
+{}
void
wrfc::initialize(const u_int16_t nfiles, const u_int32_t fsize_sblks, fcntl** fc_arr, rcvdat* rdp)
@@ -76,7 +77,8 @@
}
else
{
- rrfc::initialize(nfiles, fc_arr);
+ rfc::initialize(nfiles, fc_arr);
+ rfc::set_findex(0);
#ifdef DRHM_TESTVALS
_rid = u_int64_t(0xffeeddcc) << 32;
#else
@@ -93,8 +95,7 @@
_enq_cap_offs_dblks = _fsize_dblks;
}
-iores
-wrfc::rotate()
+iores wrfc::rotate()
{
if (!_nfiles)
throw jexception(jerrno::JERR__NINIT, "wrfc", "rotate");
@@ -108,13 +109,12 @@
_curr_fc = _fc_arr[_fc_index];
if (_curr_fc->aio_cnt())
return RHM_IORES_FILE_AIOWAIT;
- if (!reset()) //Checks if file is still in use (ie not fully dequeued yet)
+ if (!wr_reset()) //Checks if file is still in use (ie not fully dequeued yet)
return RHM_IORES_FULL;
return RHM_IORES_SUCCESS;
}
-u_int16_t
-wrfc::earliest_index() const
+u_int16_t wrfc::earliest_index() const
{
if (_frot)
return 0;
@@ -125,31 +125,6 @@
}
bool
-wrfc::reset()
-{
- _reset_ok = _curr_fc->reset(); // returns false if full (ie file still contains enqueued recs)
- return _reset_ok;
-}
-
-
-/**
-* The following routine finds whether the next write will take the write pointer to beyond the
-* enqueue limit threshold. The following illustrates how this is achieved.
-*
-* Current file index: 4 +---+----------+
-* X's mark still-enqueued records |msg| 1-thresh |
-* msg = current msg size + unwritten cache +---+----------+
-* thresh = JRNL_ENQ_THRESHOLD as a fraction ^ V
-* +-------+-------+-------+-------+--+----+-------+-+-----+-------+
-* file num ->| 0 | 1 | 2 | 3 | 4 | 5 | 6 | 7 |
-* enq recs ->| X XX |XX XXX |XX XXXX|XXXXXXX|XX | | | X |
-* +-------+-------+-------+-------+--+----+-------+-+-----+-------+
-* ^ ^ ^
-* subm_dblks --+ | |
-* These files must be free of enqueues
-* If not, return true.
-*/
-bool
wrfc::enq_threshold(const u_int32_t enq_dsize_dblks) const
{
u_int32_t subm_dblks = subm_cnt_dblks(); // includes file hdr if > 0
@@ -176,6 +151,13 @@
return findex != _fc_index && in_use;
}
+bool wrfc::wr_reset()
+{
+ _reset_ok = _curr_fc->reset(); // returns false if full (ie file still contains enqueued recs)
+ return _reset_ok;
+}
+
+// TODO: update this to reflect all status data
std::string
wrfc::status_str() const
{
Modified: store/trunk/cpp/lib/jrnl/wrfc.hpp
===================================================================
--- store/trunk/cpp/lib/jrnl/wrfc.hpp 2008-10-01 16:07:22 UTC (rev 2560)
+++ store/trunk/cpp/lib/jrnl/wrfc.hpp 2008-10-01 19:42:31 UTC (rev 2561)
@@ -28,8 +28,8 @@
* The GNU Lesser General Public License is available in the file COPYING.
*/
-#ifndef mrg_journal_rfc_hpp
-#define mrg_journal_rfc_hpp
+#ifndef mrg_journal_wrfc_hpp
+#define mrg_journal_wrfc_hpp
namespace mrg
{
@@ -52,7 +52,7 @@
* \class wrfc
* \brief Class to handle write manangement of a journal rotating file controller.
*/
- class wrfc : public rrfc
+ class wrfc : public rfc
{
private:
u_int32_t _fsize_sblks; ///< Size of journal files in sblks
@@ -62,6 +62,7 @@
bool _reset_ok; ///< Flag set when reset succeeds
bool _owi; ///< Overwrite indicator
bool _frot; ///< Flag is true for first rotation, false otherwise
+
public:
wrfc();
virtual ~wrfc();
@@ -91,10 +92,33 @@
*/
u_int16_t earliest_index() const;
+ /**
+ * \brief Determines if a paroposed write would cause the enqueue threshold to be exceeded.
+ *
+ * The following routine finds whether the next write will take the write pointer to beyond the
+ * enqueue limit threshold. The following illustrates how this is achieved.
+ * <pre>
+ * Current file index: 4 +---+----------+
+ * X's mark still-enqueued records |msg| 1-thresh |
+ * msg = current msg size + unwritten cache +---+----------+
+ * thresh = JRNL_ENQ_THRESHOLD as a fraction ^ V
+ * +-------+-------+-------+-------+--+----+-------+-+-----+-------+
+ * file num ->| 0 | 1 | 2 | 3 | 4 | 5 | 6 | 7 |
+ * enq recs ->| X XX |XX XXX |XX XXXX|XXXXXXX|XX | | | X |
+ * +-------+-------+-------+-------+--+----+-------+-+-----+-------+
+ * ^ ^ ^
+ * subm_dblks --+ | |
+ * These files must be free of enqueues
+ * If not, return true.
+ * </pre>
+ * \param enq_dsize_dblks Proposed size of write in dblocks
+ */
+ bool enq_threshold(const u_int32_t enq_dsize_dblks) const;
+
inline u_int64_t rid() const { return _rid; }
inline u_int64_t get_incr_rid() { return _rid++; }
- bool reset();
- inline bool is_reset() const { return _reset_ok; }
+ bool wr_reset();
+ inline bool is_wr_reset() const { return _reset_ok; }
inline bool owi() const { return _owi; }
inline bool frot() const { return _frot; }
@@ -104,13 +128,11 @@
inline u_int32_t subm_cnt_dblks() const { return _curr_fc->wr_subm_cnt_dblks(); }
inline std::size_t subm_offs() const { return _curr_fc->wr_subm_offs(); }
- inline u_int32_t add_subm_cnt_dblks(u_int32_t a)
- { return _curr_fc->add_wr_subm_cnt_dblks(a); }
+ inline u_int32_t add_subm_cnt_dblks(u_int32_t a) { return _curr_fc->add_wr_subm_cnt_dblks(a); }
inline u_int32_t cmpl_cnt_dblks() const { return _curr_fc->wr_cmpl_cnt_dblks(); }
inline std::size_t cmpl_offs() const { return _curr_fc->wr_cmpl_offs(); }
- inline u_int32_t add_cmpl_cnt_dblks(u_int32_t a)
- { return _curr_fc->add_wr_cmpl_cnt_dblks(a); }
+ inline u_int32_t add_cmpl_cnt_dblks(u_int32_t a) { return _curr_fc->add_wr_cmpl_cnt_dblks(a); }
inline u_int16_t aio_cnt() const { return _curr_fc->aio_cnt(); }
inline u_int16_t incr_aio_cnt() { return _curr_fc->incr_aio_cnt(); }
@@ -121,16 +143,14 @@
inline u_int32_t remaining_dblks() const { return _curr_fc->wr_remaining_dblks(); }
inline bool is_full() const { return _curr_fc->is_wr_full(); };
inline bool is_compl() const { return _curr_fc->is_wr_compl(); };
- inline u_int32_t aio_outstanding_dblks() const
- { return _curr_fc->wr_aio_outstanding_dblks(); }
+ inline u_int32_t aio_outstanding_dblks() const { return _curr_fc->wr_aio_outstanding_dblks(); }
inline bool file_rotate() const { return _curr_fc->wr_file_rotate(); }
- bool enq_threshold(const u_int32_t enq_dsize_dblks) const;
-
+
// Debug aid
std::string status_str() const;
- }; // class wrfc
+ };
} // namespace journal
} // namespace mrg
-#endif // ifndef mrg_journal_rfc_hpp
+#endif // ifndef mrg_journal_wrfc_hpp
Modified: store/trunk/cpp/tests/jrnl/_st_read.cpp
===================================================================
--- store/trunk/cpp/tests/jrnl/_st_read.cpp 2008-10-01 16:07:22 UTC (rev 2560)
+++ store/trunk/cpp/tests/jrnl/_st_read.cpp 2008-10-01 19:42:31 UTC (rev 2561)
@@ -98,6 +98,39 @@
cout << "ok" << endl;
}
+QPID_AUTO_TEST_CASE(multi_page_enqueue_read_dequeue_block)
+{
+ string test_name = get_test_name(test_filename, "multi_page_read");
+ try
+ {
+ string msg;
+ string rmsg;
+ string xid;
+ bool transientFlag;
+ bool externalFlag;
+
+ test_jrnl jc(test_name, test_dir, test_name);
+ jc.initialize(2*NUM_TEST_JFILES, 10*TEST_JFSIZE_SBLKS);
+ for (int m=0; m<NUM_MSGS*2000; m++)
+ enq_msg(jc, m, create_msg(msg, m, MSG_SIZE), false);
+ jc.flush();
+ for (int m=0; m<NUM_MSGS*2000; m++)
+ {
+ read_msg(jc, rmsg, xid, transientFlag, externalFlag);
+ BOOST_CHECK_EQUAL(create_msg(msg, m, MSG_SIZE), rmsg);
+ BOOST_CHECK_EQUAL(xid.size(), std::size_t(0));
+ BOOST_CHECK_EQUAL(transientFlag, false);
+ BOOST_CHECK_EQUAL(externalFlag, false);
+ }
+ read_msg(jc, rmsg, xid, transientFlag, externalFlag, RHM_IORES_EMPTY);
+ for (int m=0; m<NUM_MSGS*2000; m++)
+ deq_msg(jc, m);
+ read_msg(jc, rmsg, xid, transientFlag, externalFlag, RHM_IORES_EMPTY);
+ }
+ catch(const exception& e) { BOOST_FAIL(e.what()); }
+ cout << "ok" << endl;
+}
+
QPID_AUTO_TEST_CASE(enqueue_read_dequeue_interleaved)
{
string test_name = get_test_name(test_filename, "enqueue_read_dequeue_interleaved");
@@ -171,6 +204,48 @@
cout << "ok" << endl;
}
+QPID_AUTO_TEST_CASE(multi_page_enqueue_recovered_read_dequeue)
+{
+ string test_name = get_test_name(test_filename, "multi_page_enqueue_recovered_read_dequeue");
+ try
+ {
+ {
+ string msg;
+
+ test_jrnl jc(test_name, test_dir, test_name);
+ jc.initialize(2*NUM_TEST_JFILES, 10*TEST_JFSIZE_SBLKS);
+ for (int m=0; m<NUM_MSGS*2000; m++)
+ enq_msg(jc, m, create_msg(msg, m, MSG_SIZE), false);
+ }
+ {
+ string msg;
+ u_int64_t hrid;
+ string rmsg;
+ string xid;
+ bool transientFlag;
+ bool externalFlag;
+
+ test_jrnl jc(test_name, test_dir, test_name);
+ jc.recover(2*NUM_TEST_JFILES, 10*TEST_JFSIZE_SBLKS, 0, hrid);
+ BOOST_CHECK_EQUAL(hrid, u_int64_t(NUM_MSGS*2000 - 1));
+ jc.recover_complete();
+ for (int m=0; m<NUM_MSGS*2000; m++)
+ {
+ read_msg(jc, rmsg, xid, transientFlag, externalFlag);
+ BOOST_CHECK_EQUAL(create_msg(msg, m, MSG_SIZE), rmsg);
+ BOOST_CHECK_EQUAL(xid.size(), std::size_t(0));
+ BOOST_CHECK_EQUAL(transientFlag, false);
+ BOOST_CHECK_EQUAL(externalFlag, false);
+ }
+ read_msg(jc, rmsg, xid, transientFlag, externalFlag, RHM_IORES_EMPTY);
+ for (int m=0; m<NUM_MSGS*2000; m++)
+ deq_msg(jc, m);
+ }
+ }
+ catch(const exception& e) { BOOST_FAIL(e.what()); }
+ cout << "ok" << endl;
+}
+
QPID_AUTO_TEST_CASE(enqueue_recover_read_recovered_read_dequeue)
{
string test_name = get_test_name(test_filename, "enqueue_recover_read_recovered_read_dequeue");
16 years, 2 months
rhmessaging commits: r2560 - mgmt/trunk/cumin/python/cumin.
by rhmessaging-commits@lists.jboss.org
Author: justi9
Date: 2008-10-01 12:07:22 -0400 (Wed, 01 Oct 2008)
New Revision: 2560
Modified:
mgmt/trunk/cumin/python/cumin/__init__.py
Log:
Fix default spec file name
Modified: mgmt/trunk/cumin/python/cumin/__init__.py
===================================================================
--- mgmt/trunk/cumin/python/cumin/__init__.py 2008-10-01 15:13:42 UTC (rev 2559)
+++ mgmt/trunk/cumin/python/cumin/__init__.py 2008-10-01 16:07:22 UTC (rev 2560)
@@ -172,7 +172,7 @@
hdef = os.path.normpath("/var/lib/cumin")
self.home = os.environ.get("CUMIN_HOME", hdef)
- sdef = os.path.normpath("/usr/share/amqp/amqp-0.10-qpid-errata.xml")
+ sdef = os.path.normpath("/usr/share/amqp/amqp.0-10-qpid-errata.xml")
self.spec = os.environ.get("AMQP_SPEC", sdef)
if not os.path.isdir(self.home):
16 years, 2 months
rhmessaging commits: r2559 - in mgmt/trunk/cumin: python/cumin and 1 other directory.
by rhmessaging-commits@lists.jboss.org
Author: justi9
Date: 2008-10-01 11:13:42 -0400 (Wed, 01 Oct 2008)
New Revision: 2559
Modified:
mgmt/trunk/cumin/bin/cumin
mgmt/trunk/cumin/bin/cumin-bench
mgmt/trunk/cumin/bin/cumin-test
mgmt/trunk/cumin/python/cumin/__init__.py
mgmt/trunk/cumin/python/cumin/tools.py
mgmt/trunk/cumin/python/cumin/util.py
Log:
Move commands over to the common parsley code.
Remove util.Config in favor of the Config classes in parsley.
Modified: mgmt/trunk/cumin/bin/cumin
===================================================================
--- mgmt/trunk/cumin/bin/cumin 2008-10-01 15:11:54 UTC (rev 2558)
+++ mgmt/trunk/cumin/bin/cumin 2008-10-01 15:13:42 UTC (rev 2559)
@@ -1,79 +1,12 @@
#!/usr/bin/python
-import sys, os, socket
+import sys, os
-from logging import getLogger
+from cumin.tools import CuminServerTool
-from cumin import *
-from cumin.util import *
-
-log = getLogger("cumin")
-
-def do_main(config):
- app = Cumin(config.home, config.data, config.spec)
-
- if config.debug:
- app.enable_debug()
-
- try:
- app.check()
- except Exception, e:
- log.exception(e)
- sys.exit(1)
-
- app.init()
-
- server = CuminServer(app, config.addr, config.port)
-
- if config.ssl:
- cpath = os.path.join(config.home, "etc", "cumin.crt")
- kpath = os.path.join(config.home, "etc", "cumin.key")
-
- if os.path.isfile(cpath):
- server.set_ssl_cert_path(cpath)
- else:
- log.error("SSL certificate file '%s' not found" % cpath)
- sys.exit(1)
-
- if os.path.isfile(kpath):
- server.set_ssl_key_path(kpath)
- else:
- log.error("SSL key file '%s' not found" % kpath)
- sys.exit(1)
-
- try:
- app.start()
-
- try:
- server.start()
- finally:
- server.stop()
- finally:
- app.stop()
-
def main():
- config = CuminConfig()
+ CuminServerTool("cumin").main()
- summ = ("ADDR", "Run web server at address ADDR")
- config.add_param("addr", str, "localhost", summ)
-
- summ = ("PORT", "Run web server on port PORT")
- config.add_param("port", int, 45672, summ)
-
- summ = "Serve web pages using SSL"
- config.add_param("ssl", bool, False, summ)
-
- config.init()
-
- if "-h" in sys.argv or "--help" in sys.argv:
- config.print_usage("cumin")
- sys.exit(0)
-
- if config.debug:
- config.prt()
-
- do_main(config)
-
if __name__ == "__main__":
if "--profile" in sys.argv:
from profile import Profile
Modified: mgmt/trunk/cumin/bin/cumin-bench
===================================================================
--- mgmt/trunk/cumin/bin/cumin-bench 2008-10-01 15:11:54 UTC (rev 2558)
+++ mgmt/trunk/cumin/bin/cumin-bench 2008-10-01 15:13:42 UTC (rev 2559)
@@ -1,69 +1,14 @@
#!/usr/bin/python
import sys, os
-from time import time
-from wooly.devel import BenchmarkHarness
-from cumin import *
-from cumin.test import *
-from cumin.util import *
+from cumin.tools import CuminBenchTool
-def do_main(home, data, spec, hits, check_xml):
- app = Cumin(home, data, spec)
+def do_main():
+ CuminBenchTool("cumin-bench").main()
- app.enable_debug()
-
- try:
- app.check()
- except Exception, e:
- if hasattr(e, "message"):
- print e.message
-
- sys.exit(1)
-
- app.init()
-
- harness = BenchmarkHarness(app, check_xml)
-
- try:
- try:
- app.start()
-
- harness.run(hits)
- finally:
- app.stop()
- except KeyboardInterrupt:
- pass
-
def main():
- config = CuminConfig()
-
- summ = ("COUNT", "Stop after COUNT page hits")
- config.add_param("hits", int, 1000, summ)
-
- summ = "Enable profiling"
- config.add_param("profile", bool, False, summ)
-
- summ = "Check that page output is well-formed XML"
- config.add_param("check-xml", bool, False, summ)
-
- config.init()
-
- if "-h" in sys.argv or "--help" in sys.argv:
- config.print_usage("cumin-bench")
- sys.exit(0)
-
- if config.debug:
- config.prt()
-
- home = config.home
- data = config.data
- spec = config.spec
- hits = config.hits
- profile = config.profile
- check_xml = config.check_xml
-
- if config.profile:
+ if "--profile" in sys.argv:
from profile import Profile
from pstats import Stats
@@ -83,9 +28,7 @@
print "Using bias %f" % prof.bias
try:
- statement = "do_main('%s', '%s', '%s', %i, %r)" % \
- (home, data, spec, hits, check_xml)
-
+ statement = "do_main()"
prof.run(statement)
raise KeyboardInterrupt()
@@ -106,7 +49,10 @@
stats.strip_dirs()
else:
- do_main(home, data, spec, hits, check_xml)
+ do_main()
if __name__ == "__main__":
- main()
+ try:
+ main()
+ except KeyboardInterrupt:
+ pass
Modified: mgmt/trunk/cumin/bin/cumin-test
===================================================================
--- mgmt/trunk/cumin/bin/cumin-test 2008-10-01 15:11:54 UTC (rev 2558)
+++ mgmt/trunk/cumin/bin/cumin-test 2008-10-01 15:13:42 UTC (rev 2559)
@@ -1,68 +1,11 @@
#!/usr/bin/python
import sys, os
-from time import time, sleep
-from cumin import *
-from cumin.test import *
-from cumin.util import *
+from cumin.tools import CuminTestTool
-def do_main(config):
- app = Cumin(config.home, config.data, config.spec)
-
- if config.debug:
- app.enable_debug()
-
- try:
- app.check()
- except Exception, e:
- if hasattr(e, "message"):
- print e.message
-
- sys.exit(1)
-
- usr_sbin = "/usr/sbin"
- if usr_sbin not in sys.path:
- sys.path.append(usr_sbin)
-
- app.init()
-
- if config.broker:
- host, port = parse_broker_addr(config.broker)
- else:
- # XXX change this to 49152..65535 when the underlying datatype
- # supports it
- host, port = "localhost", randint(16384, 32767)
-
- broker = TestBroker("qpidd", port)
- broker.start()
-
- env = TestEnvironment(app, host, port, config.spec)
- env.init();
-
- app.start()
- try:
- session = env.run_test(MainTest(env))
- session.report(sys.stdout)
- finally:
- app.stop()
-
-def main():
- config = CuminConfig()
-
- summ = ("ADDR", "Use existing broker at ADDR")
- config.add_param("broker", str, None, summ)
-
- config.init()
-
- if "-h" in sys.argv or "--help" in sys.argv:
- config.print_usage("cumin-test")
- sys.exit(0)
-
- if config.debug:
- config.prt()
-
- do_main(config)
-
if __name__ == "__main__":
- main()
+ try:
+ CuminTestTool("cumin-test").main()
+ except KeyboardInterrupt:
+ pass
Modified: mgmt/trunk/cumin/python/cumin/__init__.py
===================================================================
--- mgmt/trunk/cumin/python/cumin/__init__.py 2008-10-01 15:11:54 UTC (rev 2558)
+++ mgmt/trunk/cumin/python/cumin/__init__.py 2008-10-01 15:13:42 UTC (rev 2559)
@@ -1,6 +1,7 @@
import sys, os, logging
from random import randint
+from parsley.config import Config, ConfigParameter
from wooly import Application, Session, Page
from wooly.pages import CssPage, JavascriptPage, ResourcePage
from wooly.server import WebServer
@@ -17,7 +18,6 @@
from stat import StatChartPage
from action import ActionPage
from user import LoginPage
-from util import Config
from wooly import Session
@@ -172,43 +172,36 @@
hdef = os.path.normpath("/var/lib/cumin")
self.home = os.environ.get("CUMIN_HOME", hdef)
+ sdef = os.path.normpath("/usr/share/amqp/amqp-0.10-qpid-errata.xml")
+ self.spec = os.environ.get("AMQP_SPEC", sdef)
+
if not os.path.isdir(self.home):
raise Exception("Home path '%s' is not a directory")
- sdef = os.path.normpath("/usr/share/amqp/amqp.0-10.xml")
- spec = os.environ.get("AMQP_SPEC", sdef)
+ param = ConfigParameter(self, "data", str)
+ param.default = "postgresql://cumin@localhost/cumin"
- summ = "Print this message"
- self.add_param("help", bool, False, summ)
+ param = ConfigParameter(self, "log-file", str)
+ param.default = os.path.join(self.home, "log", "cumin.log")
- summ = ("URI", "Connect to database at URI")
- self.add_param("data", str, "postgresql://cumin@localhost/cumin", summ)
+ param = ConfigParameter(self, "log-level", str)
+ param.default = "warning"
- summ = ("PATH", "Use AMQP spec file at PATH")
- self.add_param("spec", str, spec, summ)
+ param = ConfigParameter(self, "debug", bool)
+ param.default = False
- lpath = os.path.join(self.home, "log", "cumin.log")
- summ = ("PATH", "Log to file at PATH")
- self.add_param("log-file", str, lpath, summ)
-
- summ = ("LEVEL", "Log messages at or above LEVEL " +
- "('debug', 'info', 'warning', 'error')")
- self.add_param("log-level", str, "warning", summ)
-
- summ = "Enable debug mode; print debug logging to console"
- self.add_param("debug", bool, False, summ)
-
def init(self):
- self.load_defaults()
- self.load_args(sys.argv)
+ super(CuminConfig, self).init()
handler = logging.StreamHandler()
log.addHandler(handler)
self.load_file(os.path.join(self.home, "etc", "cumin.conf"))
self.load_file(os.path.join(os.path.expanduser("~"), ".cumin.conf"))
- self.load_args(sys.argv)
log.removeHandler(handler)
+ self.init_logging()
+
+ def init_logging(self):
mlog = logging.getLogger("mint")
level = self.get_log_level()
Modified: mgmt/trunk/cumin/python/cumin/tools.py
===================================================================
--- mgmt/trunk/cumin/python/cumin/tools.py 2008-10-01 15:11:54 UTC (rev 2558)
+++ mgmt/trunk/cumin/python/cumin/tools.py 2008-10-01 15:13:42 UTC (rev 2559)
@@ -1,12 +1,15 @@
import sys, os
+from parsley.config import *
from parsley.command import *
+from wooly.devel import *
from mint import *
from getpass import getpass
from psycopg2 import IntegrityError
-from cumin import CuminConfig
from util import *
+from cumin import Cumin, CuminServer, CuminConfig
+from test import *
def prompt_password():
password = None
@@ -55,57 +58,11 @@
def init(self):
super(BaseCuminTool, self).init()
- args = sys.argv[1:]
-
- try:
- opts, remaining = self.parse_options(args)
- except CommandException, e:
- print "Error: %s" % e.message
- e.command.print_help()
- sys.exit(1)
-
self.config.init()
- self.config.load_string_params(opts)
def run(self):
- if self.config.debug:
- self.config.prt()
+ pass
- args = sys.argv[1:]
-
- try:
- opts, remaining = self.parse_options(args)
- except CommandException, e:
- print "Error: %s" % e.message
- e.command.print_help()
- sys.exit(1)
-
- try:
- scommand = remaining[0]
- except IndexError:
- self.print_help()
- sys.exit(1)
-
- try:
- command = self.commands_by_name[scommand]
- except KeyError:
- print "Error: Command '%s' is unrecognized" % scommand
- self.print_help()
- sys.exit(1)
-
- try:
- opts, args = command.parse(remaining)
-
- if "help" in opts:
- command.print_help()
- return
-
- command.run(opts, args)
- except CommandException, e:
- print "Error: %s" % e.message
- e.command.print_help()
- sys.exit(1)
-
def main(self):
self.check()
self.init()
@@ -163,6 +120,45 @@
self.database.check()
self.database.init()
+ def run(self):
+ if self.config.debug:
+ self.config.prt()
+
+ args = sys.argv[1:]
+
+ try:
+ opts, remaining = self.parse_options(args)
+ except CommandException, e:
+ print "Error: %s" % e.message
+ e.command.print_help()
+ sys.exit(1)
+
+ try:
+ scommand = remaining[0]
+ except IndexError:
+ self.print_help()
+ sys.exit(1)
+
+ try:
+ command = self.commands_by_name[scommand]
+ except KeyError:
+ print "Error: Command '%s' is unrecognized" % scommand
+ self.print_help()
+ sys.exit(1)
+
+ try:
+ opts, args = command.parse(remaining)
+
+ if "help" in opts:
+ command.print_help()
+ return
+
+ command.run(opts, args)
+ except CommandException, e:
+ print "Error: %s" % e.message
+ e.command.print_help()
+ sys.exit(1)
+
class CreateSchema(Command):
def run(self, opts, args):
main = os.path.join(self.parent.config.home, "sql", "schema.sql")
@@ -332,3 +328,208 @@
subject.syncUpdate()
print "Password of user '%s' is changed" % subject.name
+
+class CuminServerTool(BaseCuminTool):
+ def __init__(self, name):
+ super(CuminServerTool, self).__init__(name)
+
+ self.description = "Cumin web server"
+
+ param = ConfigParameter(self.config, "addr", str)
+ param.default = "localhost"
+
+ param = ConfigParameter(self.config, "port", int)
+ param.default = 45672
+
+ param = ConfigParameter(self.config, "ssl", bool)
+ param.default = False
+
+ opt = CommandOption(self, "addr")
+ opt.argument = "ADDR"
+ opt.description = "Run web server at address ADDR"
+
+ opt = CommandOption(self, "port")
+ opt.argument = "PORT"
+ opt.description = "Run web server on port PORT"
+
+ opt = CommandOption(self, "ssl")
+ opt.description = "Serve web pages using SSL"
+
+ def run(self):
+ try:
+ opts, args = self.parse(sys.argv)
+ except CommandException, e:
+ print "Error: %s" % e.message
+ e.command.print_help()
+ sys.exit(1)
+
+ if "help" in opts:
+ self.print_help()
+ sys.exit(0)
+
+ self.config.load_dict(opts)
+
+ self.config.ssl = "ssl" in opts
+
+ if self.config.debug:
+ self.config.prt()
+
+ app = Cumin(self.config.home, self.config.data, self.config.spec)
+
+ try:
+ app.check()
+ except Exception, e:
+ log.exception(e)
+ sys.exit(1)
+
+ app.init()
+
+ server = CuminServer(app, self.config.addr, self.config.port)
+
+ if self.config.ssl:
+ cpath = os.path.join(self.config.home, "etc", "cumin.crt")
+ kpath = os.path.join(self.config.home, "etc", "cumin.key")
+
+ if os.path.isfile(cpath):
+ server.set_ssl_cert_path(cpath)
+ else:
+ log.error("SSL certificate file '%s' not found" % cpath)
+ sys.exit(1)
+
+ if os.path.isfile(kpath):
+ server.set_ssl_key_path(kpath)
+ else:
+ log.error("SSL key file '%s' not found" % kpath)
+ sys.exit(1)
+
+ try:
+ app.start()
+
+ try:
+ server.start()
+ finally:
+ server.stop()
+ finally:
+ app.stop()
+
+class CuminTestTool(BaseCuminTool):
+ def __init__(self, name):
+ super(CuminTestTool, self).__init__(name)
+
+ self.description = "Cumin test tool"
+
+ opt = CommandOption(self, "broker")
+ opt.argument = "ADDR"
+ opt.description = "Use existing broker at ADDR"
+
+ def run(self):
+ try:
+ opts, args = self.parse(sys.argv)
+ except CommandException, e:
+ print "Error: %s" % e.message
+ e.command.print_help()
+ sys.exit(1)
+
+ if "help" in opts:
+ self.print_help()
+ sys.exit(0)
+
+ self.config.load_dict(opts)
+
+ if self.config.debug:
+ self.config.prt()
+
+ app = Cumin(self.config.home, self.config.data, self.config.spec)
+
+ if self.config.debug:
+ app.enable_debug()
+
+ try:
+ app.check()
+ except Exception, e:
+ if hasattr(e, "message"):
+ print e.message
+
+ sys.exit(1)
+
+ usr_sbin = "/usr/sbin"
+ if usr_sbin not in sys.path:
+ sys.path.append(usr_sbin)
+
+ app.init()
+
+ if "broker" in opts:
+ host, port = parse_broker_addr(opts["broker"])
+ else:
+ host, port = "localhost", randint(49152, 65535)
+
+ broker = TestBroker("qpidd", port)
+ broker.start()
+
+ env = TestEnvironment(app, host, port, self.config.spec)
+ env.init();
+
+ app.start()
+
+ try:
+ session = env.run_test(MainTest(env))
+ session.report(sys.stdout)
+ finally:
+ app.stop()
+
+class CuminBenchTool(BaseCuminTool):
+ def __init__(self, name):
+ super(CuminBenchTool, self).__init__(name)
+
+ self.description = "Cumin benchmarking tool"
+
+ opt = CommandOption(self, "hits")
+ opt.argument = "COUNT"
+ opt.description = "Stop after COUNT page hits"
+
+ opt = CommandOption(self, "profile")
+ opt.description = "Enable profiling"
+
+ opt = CommandOption(self, "check-xml")
+ opt.description = "Check that page output is well-formed xml"
+
+ def run(self):
+ try:
+ opts, args = self.parse(sys.argv)
+ except CommandException, e:
+ print "Error: %s" % e.message
+ e.command.print_help()
+ sys.exit(1)
+
+ if "help" in opts:
+ self.print_help()
+ sys.exit(0)
+
+ self.config.load_dict(opts)
+
+ if self.config.debug:
+ self.config.prt()
+
+ app = Cumin(self.config.home, self.config.data, self.config.spec)
+
+ if self.config.debug:
+ app.enable_debug()
+
+ try:
+ app.check()
+ except Exception, e:
+ if hasattr(e, "message"):
+ print e.message
+
+ sys.exit(1)
+
+ app.init()
+
+ harness = BenchmarkHarness(app, "check-xml" in opts)
+
+ app.start()
+
+ try:
+ harness.run(int(opts.get("hits", "1000")))
+ finally:
+ app.stop()
Modified: mgmt/trunk/cumin/python/cumin/util.py
===================================================================
--- mgmt/trunk/cumin/python/cumin/util.py 2008-10-01 15:11:54 UTC (rev 2558)
+++ mgmt/trunk/cumin/python/cumin/util.py 2008-10-01 15:13:42 UTC (rev 2559)
@@ -1,5 +1,4 @@
import sys
-from ConfigParser import SafeConfigParser
from datetime import datetime, timedelta
from logging import getLogger
from time import mktime, time, sleep
@@ -67,127 +66,6 @@
ret = None
return ret
-class Config(object):
- __log = getLogger("cumin.config")
-
- def __init__(self):
- self.__params = list()
- self.__params_by_name = dict()
-
- def add_param(self, name, type, default=None, summary=None):
- param = self.Parameter(name, type, default, summary)
- self.__params.append(param)
- self.__params_by_name[name] = param
-
- def unmarshal(self, param, string):
- if param.type is int:
- value = int(string)
- elif param.type is bool:
- value = string is None or \
- string.lower() in ("y", "yes", "t", "true", "1")
- elif param.type is str:
- value = string
- else:
- raise Exception("Invalid type %s" % type)
-
- return value
-
- def load_defaults(self):
- for param in self.__params:
- name = param.name.replace("-", "_")
-
- if hasattr(self, name):
- raise Exception("Parameter '%s' already present" % name)
-
- setattr(self, name, param.default)
-
- def load_file(self, file):
- conf = SafeConfigParser()
- found = conf.read(file)
-
- if found:
- self.__log.info("Read config file '%s'" % file)
- else:
- self.__log.info("Config file '%s' not found" % file)
-
- params = dict()
-
- if (conf.has_section("main")):
- for key, value in conf.items("main"):
- params[key] = value
-
- self.load_string_params(params)
-
- def load_args(self, argv):
- params = dict()
- key = None
-
- for arg in argv:
- if arg.startswith("--"):
- key = arg[2:]
- params[key] = None
- elif key:
- params[key] = arg
- key = None
-
- self.load_string_params(params)
-
- def load_string_params(self, params):
- for name, value in params.items():
- param = self.__params_by_name.get(name)
-
- if param:
- setattr(self, param.name.replace("-", "_"),
- self.unmarshal(param, value))
- else:
- self.__log.info("Ignoring unrecognized parameter '%s'" % name)
-
- def print_usage(self, cmd):
- print "Usage: %s OPTIONS..." % cmd
- print "Options:"
-
- for param in self.__params:
- if param.summary:
- if type(param.summary) is str:
- opt = "--%s" % param.name
- text = param.summary
- else:
- opt = "--%s %s" % (param.name, param.summary[0])
- text = param.summary[1]
-
- main = " %-18s %s" % (opt, text)
- extra = ""
-
- if param.default not in (None, False):
- extra = "(default %s)" % param.default
-
- if len(main) + len(extra) > 79:
- print main
- print " %18s %s" % ("", extra)
- else:
- print main, extra
-
- def prt(self):
- print "Configuration:"
-
- for param in self.__params:
- value = getattr(self, param.name.replace("-", "_"))
-
- if value == param.default:
- flag = " [default]"
- else:
- flag = ""
-
- print " %s = %s%s" % (param.name, value, flag)
-
- class Parameter(object):
- def __init__(self, name, type, default=None, summary=None):
- self.name = name
- self.type = type
- self.default = default
- self.summary = summary
-
-
def wait(predicate, timeout=30):
start = time()
16 years, 2 months
rhmessaging commits: r2558 - mgmt/trunk/parsley/python/parsley.
by rhmessaging-commits@lists.jboss.org
Author: justi9
Date: 2008-10-01 11:11:54 -0400 (Wed, 01 Oct 2008)
New Revision: 2558
Added:
mgmt/trunk/parsley/python/parsley/config.py
Log:
Add a common config api to parsley
Added: mgmt/trunk/parsley/python/parsley/config.py
===================================================================
--- mgmt/trunk/parsley/python/parsley/config.py (rev 0)
+++ mgmt/trunk/parsley/python/parsley/config.py 2008-10-01 15:11:54 UTC (rev 2558)
@@ -0,0 +1,76 @@
+import logging
+
+from ConfigParser import SafeConfigParser
+
+log = logging.getLogger("parsley.config")
+
+class Config(object):
+ def __init__(self):
+ self._params = list()
+ self._params_by_name = dict()
+
+ def init(self):
+ for param in self._params:
+ param.init()
+
+ def load_file(self, file):
+ conf = SafeConfigParser()
+ found = conf.read(file)
+
+ if found:
+ log.info("Read config file '%s'" % file)
+ else:
+ log.info("Config file '%s' not found" % file)
+
+ params = dict()
+
+ if (conf.has_section("main")):
+ for key, value in conf.items("main"):
+ params[key] = value
+
+ self.load_dict(params)
+
+ def load_dict(self, params):
+ for sname, svalue in params.items():
+ param = self._params_by_name.get(sname)
+
+ if param:
+ name = param.name.replace("-", "_")
+ value = param.unmarshal(svalue)
+
+ setattr(self, name, value)
+
+ else:
+ log.info("Ignoring unrecognized parameter '%s'" % sname)
+
+ def prt(self):
+ print "Configuration:"
+
+ for param in self._params:
+ value = getattr(self, param.name.replace("-", "_"))
+
+ if value == param.default:
+ flag = " [default]"
+ else:
+ flag = ""
+
+ print " %s = %s%s" % (param.name, value, flag)
+
+class ConfigParameter(object):
+ def __init__(self, config, name, type):
+ self.config = config
+ self.name = name
+ self.type = type
+ self.default = None
+
+ self.config._params.append(self)
+ self.config._params_by_name[self.name] = self
+
+ def init(self):
+ if hasattr(self.config, self.name):
+ raise Exception("Parameter '%s' already present" % self.name)
+
+ setattr(self.config, self.name.replace("-", "_"), self.default)
+
+ def unmarshal(self, string):
+ return self.type(string)
16 years, 2 months
rhmessaging commits: r2557 - mgmt/trunk/cumin/python/cumin.
by rhmessaging-commits@lists.jboss.org
Author: eallen
Date: 2008-10-01 08:53:01 -0400 (Wed, 01 Oct 2008)
New Revision: 2557
Added:
mgmt/trunk/cumin/python/cumin/negotiator.py
mgmt/trunk/cumin/python/cumin/negotiator.strings
Modified:
mgmt/trunk/cumin/python/cumin/model.py
mgmt/trunk/cumin/python/cumin/parameters.py
mgmt/trunk/cumin/python/cumin/pool.py
Log:
Adding Negotiator tab under Pool
Modified: mgmt/trunk/cumin/python/cumin/model.py
===================================================================
--- mgmt/trunk/cumin/python/cumin/model.py 2008-09-30 18:24:39 UTC (rev 2556)
+++ mgmt/trunk/cumin/python/cumin/model.py 2008-10-01 12:53:01 UTC (rev 2557)
@@ -51,6 +51,7 @@
CuminPool(self)
CuminCollector(self)
+ CuminNegotiator(self)
# Systems
@@ -2067,6 +2068,51 @@
return frame
+class CuminNegotiator(RemoteClass):
+ def __init__(self, model):
+ super(CuminNegotiator, self).__init__(model, "negotiator",
+ Negotiator, NegotiatorStats)
+
+ prop = CuminProperty(self, "Name")
+ prop.title = "Name"
+
+ prop = CuminProperty(self, "Machine")
+ prop.title = "Machine"
+
+ prop = CuminProperty(self, "MyAddress")
+ prop.title = "Address"
+
+ stat = CuminStat(self, "MonitorSelfAge")
+ stat.title = "Age"
+
+ stat = CuminStat(self, "MonitorSelfCPUUsage")
+ stat.title = "CPU Usage"
+
+ stat = CuminStat(self, "MonitorSelfImageSize")
+ stat.title = "Image Size"
+
+ stat = CuminStat(self, "MonitorSelfRegisteredSocketCount")
+ stat.title = "Registered Socket Count"
+
+ stat = CuminStat(self, "MonitorSelfResidentSetSize")
+ stat.title = "Resident Set Size"
+
+ stat = CuminStat(self, "MonitorSelfTime")
+ stat.title = "Time"
+
+ def get_title(self, session):
+ return "Negotiator"
+
+ def get_object_name(self, neg):
+ return neg.Name
+
+ def show_object(self, session, neg):
+ frame = self.cumin_model.show_main(session)
+ frame = frame.children_by_name["pool"]
+ frame = frame.show_negotiator(session, neg).show_view(session)
+
+ return frame
+
class ModelPage(Page):
def __init__(self, app, name):
super(ModelPage, self).__init__(app, name)
Added: mgmt/trunk/cumin/python/cumin/negotiator.py
===================================================================
--- mgmt/trunk/cumin/python/cumin/negotiator.py (rev 0)
+++ mgmt/trunk/cumin/python/cumin/negotiator.py 2008-10-01 12:53:01 UTC (rev 2557)
@@ -0,0 +1,72 @@
+import logging
+
+from wooly import *
+from wooly.widgets import *
+from wooly.forms import *
+from wooly.resources import *
+from wooly.tables import *
+
+from stat import *
+from widgets import *
+from parameters import *
+from formats import *
+from util import *
+from job import *
+
+strings = StringCatalog(__file__)
+log = logging.getLogger("cumin.negotiator")
+
+class NegotiatorSet(CuminTable):
+ def __init__(self, app, name):
+ super(NegotiatorSet, self).__init__(app, name)
+
+ col = self.NameColumn(app, "name")
+ self.add_column(col)
+
+ self.set_default_column(col)
+
+ def render_title(self, session):
+ return "Negotiators %s" % fmt_count(Negotiator.select().count())
+
+ class NameColumn(SqlTableColumn):
+ def render_title(self, session, data):
+ return "Name"
+
+ def render_content(self, session, data):
+ sub = Identifiable(data["id"])
+ branch = session.branch()
+ self.frame.show_negotiator(branch, sub).show_view(branch)
+ return fmt_olink(branch, sub, name=data["name"])
+
+class NegotiatorFrame(CuminFrame, JobSetFrame):
+ def __init__(self, app, name):
+ super(NegotiatorFrame, self).__init__(app, name)
+
+ self.object = NegotiatorParameter(app, "id")
+ self.add_parameter(self.object)
+
+ view = NegotiatorView(app, "view")
+ self.add_mode(view)
+ self.set_view_mode(view)
+
+
+class NegotiatorView(CuminView):
+ def __init__(self, app, name):
+ super(NegotiatorView, self).__init__(app, name)
+
+ status = NegotiatorStatus(app, "status")
+ self.add_child(status)
+
+ self.__tabs = TabbedModeSet(app, "tabs")
+ self.add_child(self.__tabs)
+
+ details = CuminDetails(app, "details")
+ self.__tabs.add_tab(details)
+
+ def render_script(self, session, coll):
+ data = "model.xml?class=negotiator;id=%i" % coll.id
+ return "wooly.setIntervalUpdate('%s', updateNegotiator, 3000)" % data
+
+class NegotiatorStatus(CuminStatus):
+ pass
+
Added: mgmt/trunk/cumin/python/cumin/negotiator.strings
===================================================================
--- mgmt/trunk/cumin/python/cumin/negotiator.strings (rev 0)
+++ mgmt/trunk/cumin/python/cumin/negotiator.strings 2008-10-01 12:53:01 UTC (rev 2557)
@@ -0,0 +1,25 @@
+[NegotiatorSet.sql]
+select
+ n.id,
+ n.name
+from negotiator as n
+
+[NegotiatorSet.count_sql]
+select count(1) from negotiator
+
+[NegotiatorView.javascript]
+function updateNegotiator(data) {
+ var model = data.objectify();
+ var colls = model.negotiator;
+ var col;
+
+ for (var key in colls) {
+ col = colls[key];
+ break;
+ }
+
+ cumin.runModelListeners(model);
+ cumin.runObjectListeners(col);
+
+ //throw new Error();
+}
Modified: mgmt/trunk/cumin/python/cumin/parameters.py
===================================================================
--- mgmt/trunk/cumin/python/cumin/parameters.py 2008-09-30 18:24:39 UTC (rev 2556)
+++ mgmt/trunk/cumin/python/cumin/parameters.py 2008-10-01 12:53:01 UTC (rev 2557)
@@ -157,6 +157,13 @@
def do_marshal(self, coll):
return str(coll.id)
+class NegotiatorParameter(Parameter):
+ def do_unmarshal(self, string):
+ return Negotiator.get(int(string))
+
+ def do_marshal(self, neg):
+ return str(neg.id)
+
class SystemParameter(Parameter):
def do_unmarshal(self, string):
return System.get(int(string))
Modified: mgmt/trunk/cumin/python/cumin/pool.py
===================================================================
--- mgmt/trunk/cumin/python/cumin/pool.py 2008-09-30 18:24:39 UTC (rev 2556)
+++ mgmt/trunk/cumin/python/cumin/pool.py 2008-10-01 12:53:01 UTC (rev 2557)
@@ -15,6 +15,7 @@
from scheduler import SchedulerSet, SchedulerFrame
from submitter import SubmitterSet, SubmitterFrame
from collector import CollectorSet, CollectorFrame
+from negotiator import NegotiatorSet, NegotiatorFrame
from limits import LimitsSet, LimitsFrame
strings = StringCatalog(__file__)
@@ -91,6 +92,9 @@
self.__coll = CollectorFrame(app, "coll")
self.add_mode(self.__coll)
+ self.__neg = NegotiatorFrame(app, "neg")
+ self.add_mode(self.__neg)
+
self.__jobs_hold = JobSetHold(app, "jobshold")
self.add_mode(self.__jobs_hold)
@@ -145,6 +149,11 @@
self.page.set_current_frame(session, self.__coll)
return self.show_mode(session, self.__coll)
+ def show_negotiator(self, session, sub):
+ self.__neg.set_object(session, sub)
+ self.page.set_current_frame(session, self.__neg)
+ return self.show_mode(session, self.__neg)
+
def show_jobs_hold(self, session):
self.page.set_current_frame(session, self.__jobs_hold)
return self.show_mode(session, self.__jobs_hold)
@@ -191,6 +200,9 @@
colls = self.CollectorsTab(app, "colls")
self.__tabs.add_tab(colls)
+ neg = self.NegotiatorsTab(app, "neg")
+ self.__tabs.add_tab(neg)
+
limits = self.LimitsTab(app, "limits")
self.__tabs.add_tab(limits)
@@ -203,6 +215,9 @@
class CollectorsTab(CollectorSet):
pass
+ class NegotiatorsTab(NegotiatorSet):
+ pass
+
class LimitsTab(LimitsSet):
pass
16 years, 2 months