rhmessaging commits: r2232 - in store/trunk/cpp/lib: jrnl and 1 other directory.
by rhmessaging-commits@lists.jboss.org
Author: kpvdr
Date: 2008-07-29 16:13:17 -0400 (Tue, 29 Jul 2008)
New Revision: 2232
Added:
store/trunk/cpp/lib/jrnl/aio.cpp
store/trunk/cpp/lib/jrnl/aio.hpp
Modified:
store/trunk/cpp/lib/Makefile.am
store/trunk/cpp/lib/jrnl/pmgr.cpp
store/trunk/cpp/lib/jrnl/pmgr.hpp
store/trunk/cpp/lib/jrnl/rmgr.cpp
store/trunk/cpp/lib/jrnl/rmgr.hpp
store/trunk/cpp/lib/jrnl/wmgr.cpp
store/trunk/cpp/lib/jrnl/wmgr.hpp
Log:
Placed all libaio functions into static class aio.
Modified: store/trunk/cpp/lib/Makefile.am
===================================================================
--- store/trunk/cpp/lib/Makefile.am 2008-07-28 15:27:55 UTC (rev 2231)
+++ store/trunk/cpp/lib/Makefile.am 2008-07-29 20:13:17 UTC (rev 2232)
@@ -39,6 +39,7 @@
StoreException.h \
StringDbt.h \
TxnCtxt.h \
+ jrnl/aio.cpp \
jrnl/arr_cnt.cpp \
jrnl/cvar.cpp \
jrnl/data_tok.cpp \
@@ -62,6 +63,7 @@
jrnl/txn_rec.cpp \
jrnl/wmgr.cpp \
jrnl/wrfc.cpp \
+ jrnl/aio.hpp \
jrnl/aio_cb.hpp \
jrnl/arr_cnt.hpp \
jrnl/cvar.hpp \
Added: store/trunk/cpp/lib/jrnl/aio.cpp
===================================================================
--- store/trunk/cpp/lib/jrnl/aio.cpp (rev 0)
+++ store/trunk/cpp/lib/jrnl/aio.cpp 2008-07-29 20:13:17 UTC (rev 2232)
@@ -0,0 +1,11 @@
+#include "aio.hpp"
+
+
+namespace rhm
+{
+namespace journal
+{
+
+
+} // namespace journal
+} // namespace rhm
Added: store/trunk/cpp/lib/jrnl/aio.hpp
===================================================================
--- store/trunk/cpp/lib/jrnl/aio.hpp (rev 0)
+++ store/trunk/cpp/lib/jrnl/aio.hpp 2008-07-29 20:13:17 UTC (rev 2232)
@@ -0,0 +1,86 @@
+
+#ifndef rhm_journal_aio_hpp
+#define rhm_journal_aio_hpp
+
+#include <libaio.h>
+#include <cstring>
+#include <sys/types.h>
+#include <string.h>
+
+namespace rhm
+{
+namespace journal
+{
+
+typedef iocb aio_cb;
+typedef io_event aio_event;
+
+class aio
+{
+public:
+ static inline int queue_init(int maxevents, io_context_t* ctxp)
+ {
+ return ::io_queue_init(maxevents, ctxp);
+ }
+
+ static inline int queue_release(io_context_t ctx)
+ {
+ return ::io_queue_release(ctx);
+ }
+
+ static inline int submit(io_context_t ctx, long nr, aio_cb* aios[])
+ {
+ return ::io_submit(ctx, nr, aios);
+ }
+
+ static inline int getevents(io_context_t ctx, long min_nr, long nr, aio_event* events,
+ timespec* timeout)
+ {
+ return ::io_getevents(ctx, min_nr, nr, events, timeout);
+ }
+
+ static inline void prep_pread(aio_cb* aiocbp, int fd, void* buf, std::size_t count,
+ int64_t offset)
+ {
+ ::io_prep_pread(aiocbp, fd, buf, count, offset);
+ }
+
+ // Special version of libaio's io_prep_pread() which preserves the value of the data
+ // pointer. This allows iocbs to be initialized with a pointer that can be re-used.
+ static inline void prep_pread_2(aio_cb* aiocbp, int fd, void* buf, std::size_t count,
+ int64_t offset)
+ {
+ std::memset((void*)((char*)aiocbp + sizeof(void*)), 0, sizeof(aio_cb) - sizeof(void*));
+ aiocbp->aio_fildes = fd;
+ aiocbp->aio_lio_opcode = IO_CMD_PREAD;
+ aiocbp->aio_reqprio = 0;
+ aiocbp->u.c.buf = buf;
+ aiocbp->u.c.nbytes = count;
+ aiocbp->u.c.offset = offset;
+ }
+
+ static inline void prep_pwrite(aio_cb* aiocbp, int fd, void* buf, std::size_t count,
+ int64_t offset)
+ {
+ ::io_prep_pwrite(aiocbp, fd, buf, count, offset);
+ }
+
+ // Special version of libaio's io_prep_pwrite() which preserves the value of the data
+ // pointer. This allows iocbs to be initialized with a pointer that can be re-used.
+ static inline void prep_pwrite_2(aio_cb* aiocbp, int fd, void* buf, std::size_t count,
+ int64_t offset)
+ {
+ std::memset((void*)((char*)aiocbp + sizeof(void*)), 0, sizeof(aio_cb) - sizeof(void*));
+ aiocbp->aio_fildes = fd;
+ aiocbp->aio_lio_opcode = IO_CMD_PWRITE;
+ aiocbp->aio_reqprio = 0;
+ aiocbp->u.c.buf = buf;
+ aiocbp->u.c.nbytes = count;
+ aiocbp->u.c.offset = offset;
+ }
+};
+
+} // namespace journal
+} // namespace rhm
+
+#endif
Modified: store/trunk/cpp/lib/jrnl/pmgr.cpp
===================================================================
--- store/trunk/cpp/lib/jrnl/pmgr.cpp 2008-07-28 15:27:55 UTC (rev 2231)
+++ store/trunk/cpp/lib/jrnl/pmgr.cpp 2008-07-29 20:13:17 UTC (rev 2232)
@@ -83,8 +83,8 @@
_page_base_ptr(0),
_page_ptr_arr(0),
_page_cb_arr(0),
- _iocb_arr(0),
- _ioevt_arr(0),
+ _aio_cb_arr(0),
+ _aio_event_arr(0),
_ioctx(0),
_pg_index(0),
_pg_cntr(0),
@@ -132,8 +132,8 @@
std::memset(_page_cb_arr, 0, _cache_num_pages * sizeof(page_cb));
// 5. Allocate IO control block (iocb) array
- _iocb_arr = (iocb*)std::malloc(_cache_num_pages * sizeof(iocb));
- MALLOC_CHK(_iocb_arr, "_iocb_arr", "pmgr", "initialize");
+ _aio_cb_arr = (aio_cb*)std::malloc(_cache_num_pages * sizeof(aio_cb));
+ MALLOC_CHK(_aio_cb_arr, "_aio_cb_arr", "pmgr", "initialize");
// 6. Set page pointers in _page_ptr_arr, _page_cb_arr and iocbs to pages within page block
for (u_int16_t i=0; i<_cache_num_pages; i++)
@@ -144,16 +144,16 @@
_page_cb_arr[i]._pbuff = _page_ptr_arr[i];
_page_cb_arr[i]._pdtokl = new std::deque<data_tok*>;
_page_cb_arr[i]._pdtokl->clear();
- _iocb_arr[i].data = (void*)&_page_cb_arr[i];
+ _aio_cb_arr[i].data = (void*)&_page_cb_arr[i];
}
// 7. Allocate io_event array, max one event per cache page plus one for each file
const u_int16_t max_aio_evts = _cache_num_pages + _jc->num_jfiles();
- _ioevt_arr = (io_event*)std::malloc(max_aio_evts * sizeof(io_event));
- MALLOC_CHK(_ioevt_arr, "_ioevt_arr", "pmgr", "initialize");
+ _aio_event_arr = (aio_event*)std::malloc(max_aio_evts * sizeof(aio_event));
+ MALLOC_CHK(_aio_event_arr, "_aio_event_arr", "pmgr", "initialize");
// 8. Initialize AIO context
- if (int ret = ::io_queue_init(max_aio_evts, &_ioctx))
+ if (int ret = aio::queue_init(max_aio_evts, &_ioctx))
{
std::ostringstream oss;
oss << "io_queue_init() failed: " << std::strerror(-ret) << " (" << ret << ")";
@@ -167,7 +167,7 @@
// clean up allocated memory here
if (_ioctx)
- ::io_queue_release(_ioctx);
+ aio::queue_release(_ioctx);
std::free(_page_base_ptr);
_page_base_ptr = 0;
@@ -183,11 +183,11 @@
std::free(_page_cb_arr);
_page_cb_arr = 0;
- std::free(_iocb_arr);
- _iocb_arr = 0;
+ std::free(_aio_cb_arr);
+ _aio_cb_arr = 0;
- std::free(_ioevt_arr);
- _ioevt_arr = 0;
+ std::free(_aio_event_arr);
+ _aio_event_arr = 0;
}
const char*
Modified: store/trunk/cpp/lib/jrnl/pmgr.hpp
===================================================================
--- store/trunk/cpp/lib/jrnl/pmgr.hpp 2008-07-28 15:27:55 UTC (rev 2231)
+++ store/trunk/cpp/lib/jrnl/pmgr.hpp 2008-07-29 20:13:17 UTC (rev 2232)
@@ -41,7 +41,7 @@
}
#include <deque>
-#include <libaio.h>
+#include "jrnl/aio.hpp"
#include "jrnl/data_tok.hpp"
#include "jrnl/deq_rec.hpp"
#include "jrnl/enq_map.hpp"
@@ -103,8 +103,8 @@
void* _page_base_ptr; ///< Base pointer to page memory
void** _page_ptr_arr; ///< Array of pointers to cache_num_pages in page memory
page_cb* _page_cb_arr; ///< Array of page_cb structs
- iocb* _iocb_arr; ///< Array of iocb structs
- io_event* _ioevt_arr; ///< Array of io_events
+ aio_cb* _aio_cb_arr; ///< Array of iocb structs
+ aio_event* _aio_event_arr; ///< Array of io_events
io_context_t _ioctx; ///< AIO context for read/write operations
u_int16_t _pg_index; ///< Index of current page being used
u_int32_t _pg_cntr; ///< Page counter; determines if file rotation req'd
Modified: store/trunk/cpp/lib/jrnl/rmgr.cpp
===================================================================
--- store/trunk/cpp/lib/jrnl/rmgr.cpp 2008-07-28 15:27:55 UTC (rev 2231)
+++ store/trunk/cpp/lib/jrnl/rmgr.cpp 2008-07-29 20:13:17 UTC (rev 2232)
@@ -48,7 +48,7 @@
_hdr(),
_valid(false),
_fhdr_buffer(0),
- _iocbp(0),
+ _fhdr_aio_cb_ptr(0),
_fhdr_rd_outstanding(false)
{}
@@ -71,8 +71,8 @@
oss << FORMAT_SYSERR(errno);
throw jexception(jerrno::JERR__MALLOC, oss.str(), "rmgr", "initialize");
}
- _iocbp = new iocb;
- std::memset(_iocbp, 0, sizeof(iocb*));
+ _fhdr_aio_cb_ptr = new aio_cb;
+ std::memset(_fhdr_aio_cb_ptr, 0, sizeof(aio_cb*));
}
void
@@ -81,10 +81,10 @@
std::free(_fhdr_buffer);
_fhdr_buffer = 0;
- if (_iocbp)
+ if (_fhdr_aio_cb_ptr)
{
- delete _iocbp;
- _iocbp = 0;
+ delete _fhdr_aio_cb_ptr;
+ _fhdr_aio_cb_ptr = 0;
}
}
@@ -253,7 +253,7 @@
rmgr::get_events(page_state state)
{
int ret = 0;
- if ((ret = ::io_getevents(_ioctx, 0, _cache_num_pages + _jc->num_jfiles(), _ioevt_arr, 0)) < 0)
+ if ((ret = aio::getevents(_ioctx, 0, _cache_num_pages + _jc->num_jfiles(), _aio_event_arr, 0)) < 0)
{
if (ret == -EINTR) // No events
return 0;
@@ -273,17 +273,17 @@
throw jexception(jerrno::JERR__UNDERFLOW, oss.str(), "rmgr", "get_events");
}
_aio_evt_rem--;
- iocb* iocbp = _ioevt_arr[i].obj; // This I/O control block (iocb)
- page_cb* pcbp = (page_cb*)(iocbp->data); // This page control block (pcb)
- long aioret = (long)_ioevt_arr[i].res;
+ aio_cb* aiocbp = _aio_event_arr[i].obj; // This I/O control block (iocb)
+ page_cb* pcbp = (page_cb*)(aiocbp->data); // This page control block (pcb)
+ long aioret = (long)_aio_event_arr[i].res;
if (aioret < 0)
{
std::ostringstream oss;
oss << "AIO read operation failed: " << std::strerror(-aioret) << " (" << aioret << ")";
- oss << " [pg=" << pcbp->_index << " buf=" << iocbp->u.c.buf;
- oss << " rsize=0x" << std::hex << iocbp->u.c.nbytes;
- oss << " offset=0x" << iocbp->u.c.offset << std::dec;
- oss << " fh=" << iocbp->aio_fildes << "]";
+ oss << " [pg=" << pcbp->_index << " buf=" << aiocbp->u.c.buf;
+ oss << " rsize=0x" << std::hex << aiocbp->u.c.nbytes;
+ oss << " offset=0x" << aiocbp->u.c.offset << std::dec;
+ oss << " fh=" << aiocbp->aio_fildes << "]";
throw jexception(jerrno::JERR__AIO, oss.str(), "rmgr", "get_events");
}
@@ -292,7 +292,7 @@
// Increment the completed read offset
// NOTE: We cannot use _rrfc here, as it may have rotated since submitting count.
// Use stored pointer to nlfh in the pcb instead.
- pcbp->_rdblks = iocbp->u.c.nbytes / JRNL_DBLK_SIZE;
+ pcbp->_rdblks = aiocbp->u.c.nbytes / JRNL_DBLK_SIZE;
pcbp->_rfh->add_rd_cmpl_cnt_dblks(pcbp->_rdblks);
pcbp->_state = state;
pil[i] = pcbp->_index;
@@ -588,10 +588,10 @@
int16_t pi = (i + first_uninit) % _cache_num_pages;
// TODO: For perf, combine contiguous pages into single read
// 1 or 2 AIOs needed depending on whether read block folds
- iocb* this_iocb_ptr = &_iocb_arr[pi];
- rhm_prep_pread(this_iocb_ptr, _rrfc.fh(), _page_ptr_arr[pi],
+ aio_cb* aiocbp = &_aio_cb_arr[pi];
+ aio::prep_pread_2(aiocbp, _rrfc.fh(), _page_ptr_arr[pi],
rd_size * JRNL_DBLK_SIZE, _rrfc.subm_offs());
- if (::io_submit(_ioctx, 1, &this_iocb_ptr) < 0)
+ if (aio::submit(_ioctx, 1, &aiocbp) < 0)
throw jexception(jerrno::JERR__AIO, "rmgr", "init_aio_reads");
_rrfc.add_subm_cnt_dblks(rd_size);
_aio_evt_rem++;
@@ -653,8 +653,8 @@
rmgr::init_file_header_read(u_int16_t fid)
{
int rfh = _rrfc.file_handle(fid)->rd_fh();
- rhm_prep_pread(_iocbp, rfh, _fhdr_buffer, _sblksize, 0);
- if (::io_submit(_ioctx, 1, &_iocbp) < 0)
+ aio::prep_pread_2(_fhdr_aio_cb_ptr, rfh, _fhdr_buffer, _sblksize, 0);
+ if (aio::submit(_ioctx, 1, &_fhdr_aio_cb_ptr) < 0)
throw jexception(jerrno::JERR__AIO, "rmgr", "init_file_header_read");
_aio_evt_rem++;
_rrfc.add_subm_cnt_dblks(JRNL_SBLK_SIZE);
Modified: store/trunk/cpp/lib/jrnl/rmgr.hpp
===================================================================
--- store/trunk/cpp/lib/jrnl/rmgr.hpp 2008-07-28 15:27:55 UTC (rev 2231)
+++ store/trunk/cpp/lib/jrnl/rmgr.hpp 2008-07-29 20:13:17 UTC (rev 2232)
@@ -61,15 +61,15 @@
class rmgr : public pmgr
{
private:
- rrfc& _rrfc; ///< Ref to read rotating file controller
- rec_hdr _hdr; ///< Header used to determind record type
- rd_aio_cb _cb; ///< Callback function pointer for AIO events
+ rrfc& _rrfc; ///< Ref to read rotating file controller
+ rec_hdr _hdr; ///< Header used to determind record type
+ rd_aio_cb _cb; ///< Callback function pointer for AIO events
- bool _valid; ///< Flag is true when read pages contain vailid data
- void* _fhdr_buffer; ///< Buffer used for fhdr reads
- iocb* _iocbp; ///< iocb pointer for fhdr reads
- file_hdr _fhdr; ///< file header instance for reading file headers
- bool _fhdr_rd_outstanding; ///< true if a fhdr read is outstanding
+ bool _valid; ///< Flag is true when read pages contain vailid data
+ void* _fhdr_buffer; ///< Buffer used for fhdr reads
+ aio_cb* _fhdr_aio_cb_ptr; ///< iocb pointer for fhdr reads
+ file_hdr _fhdr; ///< file header instance for reading file headers
+ bool _fhdr_rd_outstanding; ///< true if a fhdr read is outstanding
public:
rmgr(jcntl* jc, enq_map& emap, txn_map& tmap, rrfc& rrfc);
@@ -106,22 +106,6 @@
void set_params_null(void** const datapp, std::size_t& dsize, void** const xidpp,
std::size_t& xidsize);
void init_file_header_read(u_int16_t fid);
-
-
- // Special version of libaio's io_prep_pread() which preserves the value of the data
- // pointer. This allows iocbs to be initialized with a pointer that can be re-used. Note
- // that C++ does not support type long long, this has been replaced with int64_t.
- static inline void rhm_prep_pread(struct iocb *iocb, int fd, void *buf, std::size_t count,
- int64_t offset)
- {
- std::memset((void*)((char*)iocb + sizeof(void*)), 0, sizeof(*iocb) - sizeof(void*));
- iocb->aio_fildes = fd;
- iocb->aio_lio_opcode = IO_CMD_PREAD;
- iocb->aio_reqprio = 0;
- iocb->u.c.buf = buf;
- iocb->u.c.nbytes = count;
- iocb->u.c.offset = offset;
- }
};
} // namespace journal
Modified: store/trunk/cpp/lib/jrnl/wmgr.cpp
===================================================================
--- store/trunk/cpp/lib/jrnl/wmgr.cpp 2008-07-28 15:27:55 UTC (rev 2231)
+++ store/trunk/cpp/lib/jrnl/wmgr.cpp 2008-07-29 20:13:17 UTC (rev 2232)
@@ -51,7 +51,7 @@
_max_io_wait_us(0),
_fhdr_base_ptr(0),
_fhdr_ptr_arr(0),
- _iocba(0),
+ _fhdr_aio_cb_arr(0),
_cached_offset_dblks(0),
_jfsize_dblks(0),
_jfsize_pgs(0),
@@ -71,7 +71,7 @@
_max_io_wait_us(max_iowait_us),
_fhdr_base_ptr(0),
_fhdr_ptr_arr(0),
- _iocba(0),
+ _fhdr_aio_cb_arr(0),
_cached_offset_dblks(0),
_jfsize_dblks(0),
_jfsize_pgs(0),
@@ -707,14 +707,14 @@
dblk_roundup();
std::size_t pg_offs = (_pg_offset_dblks - _cached_offset_dblks) * JRNL_DBLK_SIZE;
- rhm_prep_pwrite(&_iocb_arr[_pg_index], _wrfc.fh(),
+ aio_cb* aiocbp = &_aio_cb_arr[_pg_index];
+ aio::prep_pwrite_2(aiocbp, _wrfc.fh(),
(char*)_page_ptr_arr[_pg_index] + pg_offs, _cached_offset_dblks * JRNL_DBLK_SIZE,
_wrfc.subm_offs());
- iocb* this_iocb_ptr = &_iocb_arr[_pg_index];
- page_cb* pcbp = (page_cb*)(this_iocb_ptr->data); // This page control block (pcb)
+ page_cb* pcbp = (page_cb*)(aiocbp->data); // This page control block (pcb)
pcbp->_wdblks = _cached_offset_dblks;
pcbp->_wfh = _wrfc.file_handle();
- if (::io_submit(_ioctx, 1, &this_iocb_ptr) < 0)
+ if (aio::submit(_ioctx, 1, &aiocbp) < 0)
throw jexception(jerrno::JERR__AIO, "wmgr", "write_flush");
_wrfc.add_subm_cnt_dblks(_cached_offset_dblks);
_wrfc.incr_aio_cnt();
@@ -746,7 +746,7 @@
wmgr::get_events(page_state state)
{
int ret = 0;
- if ((ret = ::io_getevents(_ioctx, 0, _cache_num_pages + _jc->num_jfiles(), _ioevt_arr, 0)) < 0)
+ if ((ret = aio::getevents(_ioctx, 0, _cache_num_pages + _jc->num_jfiles(), _aio_event_arr, 0)) < 0)
{
std::ostringstream oss;
oss << "io_getevents() failed: " << std::strerror(-ret) << " (" << ret << ")";
@@ -763,9 +763,9 @@
throw jexception(jerrno::JERR__UNDERFLOW, oss.str(), "wmgr", "get_events");
}
_aio_evt_rem--;
- iocb* iocbp = _ioevt_arr[i].obj; // This I/O control block (iocb)
- page_cb* pcbp = (page_cb*)(iocbp->data); // This page control block (pcb)
- long aioret = (long)_ioevt_arr[i].res;
+ aio_cb* aiocbp = _aio_event_arr[i].obj; // This I/O control block (iocb)
+ page_cb* pcbp = (page_cb*)(aiocbp->data); // This page control block (pcb)
+ long aioret = (long)_aio_event_arr[i].res;
if (aioret < 0)
{
std::ostringstream oss;
@@ -774,11 +774,11 @@
oss << "pg=" << pcbp->_index;
else
{
- file_hdr* fhp = (file_hdr*)iocbp->u.c.buf;
+ file_hdr* fhp = (file_hdr*)aiocbp->u.c.buf;
oss << "fid=" << fhp->_fid;
}
- oss << " size=" << iocbp->u.c.nbytes;
- oss << " offset=" << iocbp->u.c.offset << " fh=" << iocbp->aio_fildes << "]";
+ oss << " size=" << aiocbp->u.c.nbytes;
+ oss << " offset=" << aiocbp->u.c.offset << " fh=" << aiocbp->aio_fildes << "]";
throw jexception(jerrno::JERR__AIO, oss.str(), "wmgr", "get_events");
}
if (pcbp) // Page writes have pcb
@@ -872,7 +872,7 @@
else // File header writes have no pcb
{
// get fid from original file header record, update pointers for that fid
- file_hdr* fhp = (file_hdr*)iocbp->u.c.buf;
+ file_hdr* fhp = (file_hdr*)aiocbp->u.c.buf;
u_int32_t fid = fhp->_fid;
nlfh* nlfhp = _wrfc.file_handle(fid);
nlfhp->add_wr_cmpl_cnt_dblks(JRNL_SBLK_SIZE);
@@ -915,13 +915,13 @@
}
_fhdr_ptr_arr = (void**)std::malloc(_num_jfiles * sizeof(void*));
MALLOC_CHK(_fhdr_ptr_arr, "_fhdr_ptr_arr", "wmgr", "initialize");
- _iocba = (iocb**)std::malloc(sizeof(iocb*) * _num_jfiles);
- MALLOC_CHK(_iocba, "_iocba", "wmgr", "initialize");
- std::memset(_iocba, 0, sizeof(iocb*) * _num_jfiles);
+ _fhdr_aio_cb_arr = (aio_cb**)std::malloc(sizeof(aio_cb*) * _num_jfiles);
+ MALLOC_CHK(_fhdr_aio_cb_arr, "_fhdr_aio_cb_arr", "wmgr", "initialize");
+ std::memset(_fhdr_aio_cb_arr, 0, sizeof(aio_cb*) * _num_jfiles);
for (u_int16_t i=0; i<_num_jfiles; i++)
{
_fhdr_ptr_arr[i] = (void*)((char*)_fhdr_base_ptr + _sblksize * i);
- _iocba[i] = new iocb;
+ _fhdr_aio_cb_arr[i] = new aio_cb;
}
_page_cb_arr[0]._state = IN_USE;
_ddtokl.clear();
@@ -1034,9 +1034,9 @@
#ifdef RHM_CLEAN
std::memset((char*)_fhdr_ptr_arr[fid] + sizeof(fhdr), RHM_CLEAN_CHAR, _sblksize - sizeof(fhdr));
#endif
- iocb* iocbp = _iocba[fid];
- ::io_prep_pwrite(iocbp, _wrfc.fh(), _fhdr_ptr_arr[fid], _sblksize, 0);
- if (::io_submit(_ioctx, 1, &iocbp) < 0)
+ aio_cb* aiocbp = _fhdr_aio_cb_arr[fid];
+ aio::prep_pwrite(aiocbp, _wrfc.fh(), _fhdr_ptr_arr[fid], _sblksize, 0);
+ if (aio::submit(_ioctx, 1, &aiocbp) < 0)
throw jexception(jerrno::JERR__AIO, "wmgr", "write_fhdr");
_aio_evt_rem++;
_wrfc.add_subm_cnt_dblks(JRNL_SBLK_SIZE);
@@ -1064,12 +1064,12 @@
std::free(_fhdr_ptr_arr);
_fhdr_ptr_arr = 0;
- if (_iocba)
+ if (_fhdr_aio_cb_arr)
{
for (u_int32_t i=0; i<_num_jfiles; i++)
- delete _iocba[i];
- std::free(_iocba);
- _iocba = 0;
+ delete _fhdr_aio_cb_arr[i];
+ std::free(_fhdr_aio_cb_arr);
+ _fhdr_aio_cb_arr = 0;
}
}
Modified: store/trunk/cpp/lib/jrnl/wmgr.hpp
===================================================================
--- store/trunk/cpp/lib/jrnl/wmgr.hpp 2008-07-28 15:27:55 UTC (rev 2231)
+++ store/trunk/cpp/lib/jrnl/wmgr.hpp 2008-07-29 20:13:17 UTC (rev 2232)
@@ -75,7 +75,7 @@
u_int32_t _max_io_wait_us; ///< Max wait in microseconds till submit
void* _fhdr_base_ptr; ///< Base pointer to file header memory
void** _fhdr_ptr_arr; ///< Array of pointers to file headers memory
- iocb** _iocba; ///< Array of iocb pointers for file header writes
+ aio_cb** _fhdr_aio_cb_arr; ///< Array of iocb pointers for file header writes
u_int32_t _cached_offset_dblks; ///< Amount of unwritten data in page (dblocks)
std::deque<data_tok*> _ddtokl; ///< Deferred dequeue data_tok list
u_int32_t _jfsize_dblks; ///< Journal file size in dblks (NOT sblks!)
@@ -136,21 +136,6 @@
void write_fhdr(u_int64_t rid, u_int32_t fid, std::size_t fro);
void rotate_page();
void clean();
-
- // Special version of libaio's io_prep_pwrite() which preserves the value of the data
- // pointer. This allows iocbs to be initialized with a pointer that can be re-used. Note
- // that C++ does not support type long long, this has been replaced with int64_t.
- static inline void rhm_prep_pwrite(struct iocb *iocb, int fd, void *buf,
- std::size_t count, int64_t offset)
- {
- std::memset((void*)((char*)iocb + sizeof(void*)), 0, sizeof(*iocb) - sizeof(void*));
- iocb->aio_fildes = fd;
- iocb->aio_lio_opcode = IO_CMD_PWRITE;
- iocb->aio_reqprio = 0;
- iocb->u.c.buf = buf;
- iocb->u.c.nbytes = count;
- iocb->u.c.offset = offset;
- }
};
} // namespace journal
16 years, 5 months
rhmessaging commits: r2231 - store/trunk/cpp/lib/jrnl.
by rhmessaging-commits@lists.jboss.org
Author: kpvdr
Date: 2008-07-28 11:27:55 -0400 (Mon, 28 Jul 2008)
New Revision: 2231
Modified:
store/trunk/cpp/lib/jrnl/nlfh.cpp
store/trunk/cpp/lib/jrnl/rmgr.cpp
store/trunk/cpp/lib/jrnl/rrfc.cpp
store/trunk/cpp/lib/jrnl/wmgr.cpp
Log:
Removed some unneeded #defines left over from old tests
Modified: store/trunk/cpp/lib/jrnl/nlfh.cpp
===================================================================
--- store/trunk/cpp/lib/jrnl/nlfh.cpp 2008-07-25 20:43:46 UTC (rev 2230)
+++ store/trunk/cpp/lib/jrnl/nlfh.cpp 2008-07-28 15:27:55 UTC (rev 2231)
@@ -54,13 +54,8 @@
_rec_enqcnt(0),
_rd_subm_cnt_dblks(0),
_rd_cmpl_cnt_dblks(0),
-#ifdef RHM_RDONLY
- _wr_subm_cnt_dblks(JRNL_SBLK_SIZE * (jfsize_dblks + 1)),
- _wr_cmpl_cnt_dblks(JRNL_SBLK_SIZE * (jfsize_dblks + 1)),
-#else
_wr_subm_cnt_dblks(0),
_wr_cmpl_cnt_dblks(0),
-#endif
_aio_cnt(0)
{}
@@ -74,13 +69,8 @@
_rec_enqcnt(0),
_rd_subm_cnt_dblks(0),
_rd_cmpl_cnt_dblks(0),
-#ifdef RHM_RDONLY
- _wr_subm_cnt_dblks(JRNL_SBLK_SIZE * (jfsize_dblks + 1)),
- _wr_cmpl_cnt_dblks(JRNL_SBLK_SIZE * (jfsize_dblks + 1)),
-#else
_wr_subm_cnt_dblks(0),
_wr_cmpl_cnt_dblks(0),
-#endif
_aio_cnt(0)
{
initialize(fbasename, fid, jfsize_sblks, ro);
@@ -167,16 +157,12 @@
return true;
}
}
-#ifndef RHM_WRONLY
// Journal overflow test - checks if the file to be reset still contains enqueued records
// or outstanding aios
if (_rec_enqcnt || _aio_cnt)
return false;
-#endif
-#ifndef RHM_RDONLY
_wr_subm_cnt_dblks = 0;
_wr_cmpl_cnt_dblks = 0;
-#endif
return true;
}
Modified: store/trunk/cpp/lib/jrnl/rmgr.cpp
===================================================================
--- store/trunk/cpp/lib/jrnl/rmgr.cpp 2008-07-25 20:43:46 UTC (rev 2230)
+++ store/trunk/cpp/lib/jrnl/rmgr.cpp 2008-07-28 15:27:55 UTC (rev 2231)
@@ -152,9 +152,6 @@
{
_enq_rec.reset(); // sets enqueue rec size
// Check if RID of this rec is still enqueued, if so read it, else skip
-#ifdef RHM_RDONLY
- bool is_enq = true;
-#else
u_int16_t fid = 0;
bool is_enq = false;
try
@@ -187,7 +184,7 @@
if (!_jc->is_read_only() && is_enq)
return RHM_IORES_TXPENDING;
}
-#endif
+
if (is_enq) // ok, this record is enqueued, check it, then read it...
{
if (dtokp->rid())
@@ -205,7 +202,7 @@
}
else
dtokp->set_rid(_hdr._rid);
-#ifndef RHM_RDONLY
+
// TODO: Add member _fid to pmgr::page_cb which indicates the fid from which this page was
// populated. When this value is set in wmgr::flush() somewehere, then uncomment the following
// check:
@@ -219,7 +216,7 @@
// throw jexception(jerrno::JERR_RMGR_FIDMISMATCH, oss.str(), "rmgr",
// "read");
// }
-#endif
+
const iores res = read_enq(_hdr, rptr, dtokp);
dsize = _enq_rec.get_data(datapp);
xidsize = _enq_rec.get_xid(xidpp);
Modified: store/trunk/cpp/lib/jrnl/rrfc.cpp
===================================================================
--- store/trunk/cpp/lib/jrnl/rrfc.cpp 2008-07-25 20:43:46 UTC (rev 2230)
+++ store/trunk/cpp/lib/jrnl/rrfc.cpp 2008-07-28 15:27:55 UTC (rev 2231)
@@ -74,20 +74,9 @@
if (next_fh_index == _nfiles)
next_fh_index = 0;
nlfh* next_fh = _fh_arr[next_fh_index];
-#ifdef RHM_RDONLY
- // Since write constrains read, it is wrfc::rotate() that normally resets both the
- // read and write states. However, in the RHM_RDONLY test mode (no writes), it must
- // be done here.
- bool ret = next_fh->reset();
-#endif
-
_fh_index = next_fh_index;
_curr_fh = next_fh;
-#ifdef RHM_RDONLY
- return ret;
-#else
return true;
-#endif
}
nlfh*
Modified: store/trunk/cpp/lib/jrnl/wmgr.cpp
===================================================================
--- store/trunk/cpp/lib/jrnl/wmgr.cpp 2008-07-25 20:43:46 UTC (rev 2230)
+++ store/trunk/cpp/lib/jrnl/wmgr.cpp 2008-07-28 15:27:55 UTC (rev 2231)
@@ -190,7 +190,6 @@
// enqueued.
_wrfc.incr_enqcnt(dtokp->fid());
-#if !(defined(RHM_WRONLY) || defined(RHM_RDONLY))
if (xid_len) // If part of transaction, add to transaction map
{
std::string xid((char*)xid_ptr, xid_len);
@@ -198,7 +197,6 @@
}
else
_emap.insert_fid(rid, dtokp->fid());
-#endif
done = true;
}
@@ -324,7 +322,6 @@
// TODO: Incorrect - must set state to ENQ_CACHED; ENQ_SUBM is set when AIO returns.
dtokp->set_wstate(data_tok::DEQ_SUBM);
-#if !(defined(RHM_WRONLY) || defined(RHM_RDONLY))
if (xid_len) // If part of transaction, add to transaction map
{
// If the enqueue is part of a pending txn, it will not yet be in emap
@@ -341,7 +338,6 @@
u_int16_t fid = _emap.get_remove_fid(dtokp->dequeue_rid());
_wrfc.decr_enqcnt(fid);
}
-#endif
done = true;
}
@@ -457,7 +453,6 @@
{
dtokp->set_wstate(data_tok::ABORT_SUBM);
-#if !(defined(RHM_WRONLY) || defined(RHM_RDONLY))
// Delete this txn from tmap, unlock any locked records in emap
std::string xid((char*)xid_ptr, xid_len);
txn_data_list tdl = _tmap.get_remove_tdata_list(xid);
@@ -482,7 +477,6 @@
oss << std::hex << "_txn_pending_set: xid=\"" << xid << "\"";
throw jexception(jerrno::JERR_MAP_DUPLICATE, oss.str(), "wmgr", "abort");
}
-#endif
done = true;
}
@@ -598,7 +592,6 @@
{
dtokp->set_wstate(data_tok::COMMIT_SUBM);
-#if !(defined(RHM_WRONLY) || defined(RHM_RDONLY))
// Delete this txn from tmap, process records into emap
std::string xid((char*)xid_ptr, xid_len);
txn_data_list tdl = _tmap.get_remove_tdata_list(xid);
@@ -619,7 +612,6 @@
oss << std::hex << "_txn_pending_set: xid=\"" << xid << "\"";
throw jexception(jerrno::JERR_MAP_DUPLICATE, oss.str(), "wmgr", "commit");
}
-#endif
done = true;
}
@@ -820,7 +812,6 @@
dtokl.push_back(dtokp);
tot_data_toks++;
dtokp->set_wstate(data_tok::ABORTED);
-#if !(defined(RHM_WRONLY) || defined(RHM_RDONLY))
it = _txn_pending_set.find(dtokp->xid());
if (it == _txn_pending_set.end())
{
@@ -831,13 +822,11 @@
"get_events");
}
_txn_pending_set.erase(it);
-#endif
break;
case data_tok::COMMIT_SUBM:
dtokl.push_back(dtokp);
tot_data_toks++;
dtokp->set_wstate(data_tok::COMMITTED);
-#if !(defined(RHM_WRONLY) || defined(RHM_RDONLY))
it = _txn_pending_set.find(dtokp->xid());
if (it == _txn_pending_set.end())
{
@@ -848,7 +837,6 @@
"get_events");
}
_txn_pending_set.erase(it);
-#endif
break;
case data_tok::ENQ_PART:
case data_tok::DEQ_PART:
@@ -943,11 +931,7 @@
iores
wmgr::pre_write_check(const _op_type op, const data_tok* const dtokp,
-#if defined(RHM_WRONLY)
- const std::size_t /*xidsize*/, const std::size_t /*dsize*/, const bool /*external*/
-#else
const std::size_t xidsize, const std::size_t dsize, const bool external
-#endif
) const
{
// Check status of current file
@@ -978,13 +962,11 @@
{
case WMGR_ENQUEUE:
{
-#if !defined(RHM_WRONLY)
// Check for enqueue reaching cutoff threshold
u_int32_t size_dblks = jrec::size_dblks(enq_rec::rec_size(xidsize, dsize,
external));
if (!_enq_busy && _wrfc.enq_threshold(_cached_offset_dblks + size_dblks))
return RHM_IORES_ENQCAPTHRESH;
-#endif
if (!dtokp->is_writable())
{
std::ostringstream oss;
16 years, 5 months
rhmessaging commits: r2230 - store/branches/mrg-1.0/cpp/lib.
by rhmessaging-commits@lists.jboss.org
Author: kpvdr
Date: 2008-07-25 16:43:46 -0400 (Fri, 25 Jul 2008)
New Revision: 2230
Modified:
store/branches/mrg-1.0/cpp/lib/BdbMessageStore.cpp
store/branches/mrg-1.0/cpp/lib/PreparedTransaction.cpp
store/branches/mrg-1.0/cpp/lib/PreparedTransaction.h
Log:
Backport of trunk r.2229: Bugfix for 456524: "Broker crashes during recovery when XA tx are indoubt"
Modified: store/branches/mrg-1.0/cpp/lib/BdbMessageStore.cpp
===================================================================
--- store/branches/mrg-1.0/cpp/lib/BdbMessageStore.cpp 2008-07-25 20:19:52 UTC (rev 2229)
+++ store/branches/mrg-1.0/cpp/lib/BdbMessageStore.cpp 2008-07-25 20:43:46 UTC (rev 2230)
@@ -731,18 +731,17 @@
msg->decodeContent(contentBuff);
}
- if (PreparedTransaction::isLocked(prepared, queue->getPersistenceId(), dtokp.rid()) ) {
- std::string xid((char*)xidbuff, xidbuffSize);
- TplRecoverMapCitr citr = tplRecoverMap.find(xid);
- if (xid.size() > 0 && citr == tplRecoverMap.end()) THROW_STORE_EXCEPTION("XID not found in tplRecoverMap");
- if (xid.size() > 0 && citr->second.deq_flag) { // deq present in prepared list, this xid is part of incomplete txn commit/abort
- if (citr->second.commit_flag)
- queue->recover(msg);
+ PreparedTransaction::list::iterator i = PreparedTransaction::getLockedPreparedTransaction(prepared, queue->getPersistenceId(), dtokp.rid());
+ if (i == prepared.end()) { // not locked
+ queue->recover(msg);
+ } else {
+ TplRecoverMapCitr citr = tplRecoverMap.find(i->xid);
+ if (citr == tplRecoverMap.end()) THROW_STORE_EXCEPTION("XID not found in tplRecoverMap");
+ if (citr->second.deq_flag) { // deq present in prepared list, this xid is part of incomplete txn commit/abort
+ if (citr->second.commit_flag) queue->recover(msg); // treat as non-tx, roll forward (else aborted, throw away)
} else {
messages[dtokp.rid()] = msg;
}
- } else {
- queue->recover(msg);
}
dtokp.reset();
Modified: store/branches/mrg-1.0/cpp/lib/PreparedTransaction.cpp
===================================================================
--- store/branches/mrg-1.0/cpp/lib/PreparedTransaction.cpp 2008-07-25 20:19:52 UTC (rev 2229)
+++ store/branches/mrg-1.0/cpp/lib/PreparedTransaction.cpp 2008-07-25 20:43:46 UTC (rev 2230)
@@ -65,6 +65,16 @@
return false;
}
+PreparedTransaction::list::iterator PreparedTransaction::getLockedPreparedTransaction(PreparedTransaction::list& txns, queue_id queue, message_id message)
+{
+ for (PreparedTransaction::list::iterator i = txns.begin(); i != txns.end(); i++) {
+ if (i->isLocked(queue, message)) {
+ return i;
+ }
+ }
+ return txns.end();
+}
+
PreparedTransaction::PreparedTransaction(const std::string& _xid,
LockedMappings::shared_ptr _enqueues,
LockedMappings::shared_ptr _dequeues)
Modified: store/branches/mrg-1.0/cpp/lib/PreparedTransaction.h
===================================================================
--- store/branches/mrg-1.0/cpp/lib/PreparedTransaction.h 2008-07-25 20:19:52 UTC (rev 2229)
+++ store/branches/mrg-1.0/cpp/lib/PreparedTransaction.h 2008-07-25 20:43:46 UTC (rev 2230)
@@ -67,6 +67,7 @@
PreparedTransaction(const std::string& xid, LockedMappings::shared_ptr enqueues, LockedMappings::shared_ptr dequeues);
bool isLocked(queue_id queue, message_id message);
static bool isLocked(PreparedTransaction::list& txns, queue_id queue, message_id message);
+ static PreparedTransaction::list::iterator getLockedPreparedTransaction(PreparedTransaction::list& txns, queue_id queue, message_id message);
};
}}
16 years, 5 months
rhmessaging commits: r2229 - store/trunk/cpp/lib.
by rhmessaging-commits@lists.jboss.org
Author: kpvdr
Date: 2008-07-25 16:19:52 -0400 (Fri, 25 Jul 2008)
New Revision: 2229
Modified:
store/trunk/cpp/lib/BdbMessageStore.cpp
store/trunk/cpp/lib/PreparedTransaction.cpp
store/trunk/cpp/lib/PreparedTransaction.h
Log:
Bugfix for 456524: "Broker crashes during recovery when XA tx are indoubt"
Modified: store/trunk/cpp/lib/BdbMessageStore.cpp
===================================================================
--- store/trunk/cpp/lib/BdbMessageStore.cpp 2008-07-25 19:39:57 UTC (rev 2228)
+++ store/trunk/cpp/lib/BdbMessageStore.cpp 2008-07-25 20:19:52 UTC (rev 2229)
@@ -757,18 +757,17 @@
msg->decodeContent(contentBuff);
}
- if (PreparedTransaction::isLocked(prepared, queue->getPersistenceId(), dtokp.rid()) ) {
- std::string xid((char*)xidbuff, xidbuffSize);
- TplRecoverMapCitr citr = tplRecoverMap.find(xid);
- if (xid.size() > 0 && citr == tplRecoverMap.end()) THROW_STORE_EXCEPTION("XID not found in tplRecoverMap");
- if (xid.size() > 0 && citr->second.deq_flag) { // deq present in prepared list, this xid is part of incomplete txn commit/abort
- if (citr->second.commit_flag)
- queue->recover(msg);
+ PreparedTransaction::list::iterator i = PreparedTransaction::getLockedPreparedTransaction(prepared, queue->getPersistenceId(), dtokp.rid());
+ if (i == prepared.end()) { // not locked
+ queue->recover(msg);
+ } else {
+ TplRecoverMapCitr citr = tplRecoverMap.find(i->xid);
+ if (citr == tplRecoverMap.end()) THROW_STORE_EXCEPTION("XID not found in tplRecoverMap");
+ if (citr->second.deq_flag) { // deq present in prepared list, this xid is part of incomplete txn commit/abort
+ if (citr->second.commit_flag) queue->recover(msg); // treat as non-tx, roll forward (else aborted, throw away)
} else {
messages[dtokp.rid()] = msg;
}
- } else {
- queue->recover(msg);
}
dtokp.reset();
Modified: store/trunk/cpp/lib/PreparedTransaction.cpp
===================================================================
--- store/trunk/cpp/lib/PreparedTransaction.cpp 2008-07-25 19:39:57 UTC (rev 2228)
+++ store/trunk/cpp/lib/PreparedTransaction.cpp 2008-07-25 20:19:52 UTC (rev 2229)
@@ -65,6 +65,16 @@
return false;
}
+PreparedTransaction::list::iterator PreparedTransaction::getLockedPreparedTransaction(PreparedTransaction::list& txns, queue_id queue, message_id message)
+{
+ for (PreparedTransaction::list::iterator i = txns.begin(); i != txns.end(); i++) {
+ if (i->isLocked(queue, message)) {
+ return i;
+ }
+ }
+ return txns.end();
+}
+
PreparedTransaction::PreparedTransaction(const std::string& _xid,
LockedMappings::shared_ptr _enqueues,
LockedMappings::shared_ptr _dequeues)
Modified: store/trunk/cpp/lib/PreparedTransaction.h
===================================================================
--- store/trunk/cpp/lib/PreparedTransaction.h 2008-07-25 19:39:57 UTC (rev 2228)
+++ store/trunk/cpp/lib/PreparedTransaction.h 2008-07-25 20:19:52 UTC (rev 2229)
@@ -67,6 +67,7 @@
PreparedTransaction(const std::string& xid, LockedMappings::shared_ptr enqueues, LockedMappings::shared_ptr dequeues);
bool isLocked(queue_id queue, message_id message);
static bool isLocked(PreparedTransaction::list& txns, queue_id queue, message_id message);
+ static PreparedTransaction::list::iterator getLockedPreparedTransaction(PreparedTransaction::list& txns, queue_id queue, message_id message);
};
}}
16 years, 5 months
rhmessaging commits: r2228 - mgmt/trunk/cumin/python/wooly.
by rhmessaging-commits@lists.jboss.org
Author: eallen
Date: 2008-07-25 15:39:57 -0400 (Fri, 25 Jul 2008)
New Revision: 2228
Modified:
mgmt/trunk/cumin/python/wooly/forms.py
mgmt/trunk/cumin/python/wooly/forms.strings
Log:
Added <label>s to radio buttons and checkboxes
Modified: mgmt/trunk/cumin/python/wooly/forms.py
===================================================================
--- mgmt/trunk/cumin/python/wooly/forms.py 2008-07-25 19:38:33 UTC (rev 2227)
+++ mgmt/trunk/cumin/python/wooly/forms.py 2008-07-25 19:39:57 UTC (rev 2228)
@@ -159,6 +159,9 @@
super(RadioInput, self).__init__(app, name, form)
self.value = None
+
+ def set_value(self, value):
+ self.value = value
def render_value(self, session, *args):
return self.value
Modified: mgmt/trunk/cumin/python/wooly/forms.strings
===================================================================
--- mgmt/trunk/cumin/python/wooly/forms.strings 2008-07-25 19:38:33 UTC (rev 2227)
+++ mgmt/trunk/cumin/python/wooly/forms.strings 2008-07-25 19:39:57 UTC (rev 2228)
@@ -8,7 +8,7 @@
<input type="checkbox" name="{name}" value="{value}" tabindex="{tab_index}" {checked_attr} {disabled_attr}/>
[RadioInput.html]
-<input type="radio" name="{name}" value="{value}" tabindex="{tab_index}" {checked_attr} {disabled_attr}/>
+<input type="radio" id="{id}" name="{name}" value="{value}" tabindex="{tab_index}" {checked_attr} {disabled_attr}/>
[CheckboxInputSet.html]
<ul>{items}</ul>
@@ -70,9 +70,9 @@
[RadioFieldOption.html]
<div>
- <input type="radio" name="{name}" value="{value}"
+ <input type="radio" name="{name}" id="{id}" value="{value}"
tabindex="{tab_index}" {checked_attr} {disabled_attr}/>
- {title}
+ <label for="{id}">{title}</label>
</div>
[FieldForm.html]
16 years, 5 months
rhmessaging commits: r2227 - mgmt/trunk/cumin/python/cumin.
by rhmessaging-commits@lists.jboss.org
Author: eallen
Date: 2008-07-25 15:38:33 -0400 (Fri, 25 Jul 2008)
New Revision: 2227
Added:
mgmt/trunk/cumin/python/cumin/brokerlink.py
mgmt/trunk/cumin/python/cumin/brokerlink.strings
Log:
New files for Broker Links and Link Routes(bridges)
Added: mgmt/trunk/cumin/python/cumin/brokerlink.py
===================================================================
--- mgmt/trunk/cumin/python/cumin/brokerlink.py (rev 0)
+++ mgmt/trunk/cumin/python/cumin/brokerlink.py 2008-07-25 19:38:33 UTC (rev 2227)
@@ -0,0 +1,203 @@
+from wooly import *
+from wooly.widgets import *
+from wooly.forms import *
+from wooly.resources import *
+
+from stat import *
+from model import *
+from widgets import *
+from parameters import *
+from formats import *
+from util import *
+
+strings = StringCatalog(__file__)
+
+class PeerSet(CuminTable):
+ def __init__(self, app, name):
+ super(PeerSet, self).__init__(app, name)
+
+ col = self.AddressColumn(app, "addr")
+ self.add_column(col)
+
+ col = self.DurableColumn(app, "durable")
+ self.add_column(col)
+
+ col = self.StateColumn(app, "state")
+ self.add_column(col)
+
+ col = self.LastErrorColumn(app, "last_error")
+ self.add_column(col)
+
+ #col = self.FromPeerColumn(app, "from_peer")
+ #self.add_column(col)
+
+ #col = self.ToPeerColumn(app, "to_peer")
+ #self.add_column(col)
+
+ def get_args(self, session):
+ reg = self.frame.get_object(session)
+ return (reg.getDefaultVhost(),)
+
+ def render_title(self, session, vhost):
+ count = self.get_item_count(session, vhost)
+ return "Broker Links %s" % fmt_count(count)
+
+ def render_sql_where(self, session, vhost):
+ return "where v.id = %(vhost_id)r"
+
+ def get_sql_values(self, session, vhost):
+ return {"vhost_id": vhost.id}
+
+ class AddressColumn(SqlTableColumn):
+ def render_title(self, session, data):
+ return "Address"
+
+ def render_content(self, session, data):
+ peer = Identifiable(data["id"])
+ branch = session.branch()
+ self.frame.show_peer(branch, peer).show_view(branch)
+ name = "%s:%i" % (data["host"], data["port"])
+ return fmt_olink(branch, peer, name=name)
+
+ def get_orderby_sql(self, session):
+ dir = self.parent.is_reversed(session) and "desc" or "asc"
+ return "order by host, port %s" % dir
+
+ class StateColumn(SqlTableColumn):
+ def render_title(self, session, data):
+ return "State"
+
+ class LastErrorColumn(SqlTableColumn):
+ def render_title(self, session, data):
+ return "Last Error"
+
+ class DurableColumn(SqlTableColumn):
+ def render_title(self, session, data):
+ return "Durable"
+
+ class FromPeerColumn(NullSortColumn, FreshDataOnlyColumn):
+ def render_title(self, session, data):
+ return "Bytes from Broker"
+
+ def render_value(self, session, value):
+ return fmt_rate(value)
+
+ class ToPeerColumn(NullSortColumn, FreshDataOnlyColumn):
+ def render_title(self, session, data):
+ return "Bytes to Broker"
+
+ def render_value(self, session, value):
+ return fmt_rate(value)
+
+class PeerFrame(CuminFrame):
+ def __init__(self, app, name):
+ super(PeerFrame, self).__init__(app, name)
+
+ self.object = PeerParameter(app, "id")
+ self.add_parameter(self.object)
+
+ view = PeerView(app, "view")
+ self.add_mode(view)
+ self.set_view_mode(view)
+
+ def render_title(self, session, peer):
+ return super(PeerFrame, self).render_title(session, peer)
+
+class PeerStatus(CuminStatus):
+ def render_messages_received(self, session, peer):
+ return self.app.model.exchange.msgReceives.rate_html(peer)
+
+ def render_messages_routed(self, session, peer):
+ return self.app.model.exchange.msgRoutes.rate_html(peer)
+
+ def render_messages_dropped(self, session, peer):
+ return self.app.model.exchange.msgDrops.value(peer)
+
+ def render_bytes_received(self, session, peer):
+ return self.app.model.exchange.byteReceives.rate_html(peer)
+
+ def render_bytes_routed(self, session, peer):
+ return self.app.model.exchange.byteRoutes.rate_html(peer)
+
+ def render_bytes_dropped(self, session, peer):
+ return self.app.model.exchange.byteDrops.value(peer)
+
+class PeerView(CuminView):
+ def __init__(self, app, name):
+ super(PeerView, self).__init__(app, name)
+
+ status = PeerStatus(app, "status")
+ self.add_child(status)
+
+ self.__tabs = TabbedModeSet(app, "tabs")
+ self.add_child(self.__tabs)
+
+ #self.__tabs.add_tab(PeerStats(app, "stats"))
+
+ self.__routes = PeerRouteSet(app, "routes")
+ self.__tabs.add_tab(self.__routes)
+
+ self.__tabs.add_tab(CuminDetails(app, "details"))
+
+ def show_routes(self, session):
+ return self.__tabs.show_mode(session, self.__routes);
+
+ def render_script(self, session, peer):
+ data = "model.xml?class=peer;id=%i" % peer.id
+ return "wooly.setIntervalUpdate('%s', updatePeer, 3000)" % data
+
+class PeerRouteSet(RouteSet):
+ def get_args(self, session):
+ return self.frame.get_args(session)
+
+ def render_title(self, session, peer):
+ return "Link Routes %s" % \
+ fmt_count(self.get_item_count(session, peer))
+
+ def get_item_count(self, session, peer):
+ return peer.bridges.count()
+
+ def do_get_items(self, session, peer):
+ start, end = self.get_bounds(session)
+ return peer.bridges[start:end]
+
+ def render_item_destination(self, session, route):
+ return route.link.managedBroker
+
+ def render_item_src(self, session, route):
+ return "%s:%d" % (route.link.host, route.link.port)
+
+ def render_item_exchange(self, session, route):
+ return route.dest
+
+ def render_item_routing_key(self, session, route):
+ return route.key
+
+class PeerStats(TabbedModeSet):
+ def __init__(self, app, name):
+ super(PeerStats, self).__init__(app, name)
+
+ self.add_child(StatSet(app, "io", "io"))
+ self.add_child(StatSet(app, "general", "general"))
+
+ chart = self.ReceiveRouteDropRateChart(app, "recvroutedrop")
+ self.add_child(chart)
+
+ chart = StatValueChart(app, "producers")
+ chart.stats = ("producerCount",)
+ self.add_child(chart)
+
+ def render_title(self, session):
+ return "Statistics"
+
+ class ReceiveRouteDropRateChart(StatValueChart):
+ def __init__(self, app, name):
+ super(PeerStats.ReceiveRouteDropRateChart, self).__init__ \
+ (app, name)
+
+ self.stats = ("msgReceives", "msgRoutes", "msgDrops")
+ self.mode = "rate"
+
+ def render_title(self, session, exchange):
+ return "Messages Received, Routed, and Dropped"
+
Added: mgmt/trunk/cumin/python/cumin/brokerlink.strings
===================================================================
--- mgmt/trunk/cumin/python/cumin/brokerlink.strings (rev 0)
+++ mgmt/trunk/cumin/python/cumin/brokerlink.strings 2008-07-25 19:38:33 UTC (rev 2227)
@@ -0,0 +1,44 @@
+[PeerSet.sql]
+select
+ l.id,
+ l.host,
+ l.port,
+ l.durable,
+ c.state,
+ c.last_error
+from link as l
+join vhost as v on v.id = l.vhost_id
+left outer join link_stats as c on c.id = l.stats_curr_id
+left outer join link_stats as p on p.id = l.stats_prev_id
+{sql_where}
+{sql_orderby}
+{sql_limit}
+
+[PeerSet.count_sql]
+select count(*)
+from link as l
+join vhost as v on v.id = l.vhost_id
+{sql_where}
+
+[PeerRouteSet.html]
+<div class="rfloat">{page}</div>
+<ul class="radiotabs"> </ul>
+
+<table class="ExchangeBindingSet mobjects">
+ <tr>
+ <th>Destination</th>
+ <th>Source</th>
+ <th>Exchange</th>
+ <th>Routing Key</th>
+ </tr>
+
+ {items}
+</table>
+
+[PeerRouteSet.item_html]
+<tr>
+ <td>{item_destination}</a></td>
+ <td>{item_src}</td>
+ <td>{item_exchange}</td>
+ <td>{item_routing_key}</td>
+</tr>
16 years, 5 months
rhmessaging commits: r2226 - mgmt/trunk/cumin/python/cumin.
by rhmessaging-commits@lists.jboss.org
Author: eallen
Date: 2008-07-25 15:38:03 -0400 (Fri, 25 Jul 2008)
New Revision: 2226
Modified:
mgmt/trunk/cumin/python/cumin/brokercluster.strings
mgmt/trunk/cumin/python/cumin/brokerprofile.strings
mgmt/trunk/cumin/python/cumin/client.strings
mgmt/trunk/cumin/python/cumin/page.strings
mgmt/trunk/cumin/python/cumin/stat.strings
mgmt/trunk/cumin/python/cumin/system.strings
Log:
Minor tweaks to be more xhtml 1.1 compliant
Modified: mgmt/trunk/cumin/python/cumin/brokercluster.strings
===================================================================
--- mgmt/trunk/cumin/python/cumin/brokercluster.strings 2008-07-25 19:37:24 UTC (rev 2225)
+++ mgmt/trunk/cumin/python/cumin/brokercluster.strings 2008-07-25 19:38:03 UTC (rev 2226)
@@ -96,7 +96,7 @@
{cancel}
</div>
</form>
-<script defer="defer">
+<script type="text/javascript" defer="defer">
(function() {
var elem = wooly.doc().elembyid("{id}").node.elements[1];
elem.focus();
Modified: mgmt/trunk/cumin/python/cumin/brokerprofile.strings
===================================================================
--- mgmt/trunk/cumin/python/cumin/brokerprofile.strings 2008-07-25 19:37:24 UTC (rev 2225)
+++ mgmt/trunk/cumin/python/cumin/brokerprofile.strings 2008-07-25 19:38:03 UTC (rev 2226)
@@ -92,7 +92,7 @@
{cancel}
</div>
</form>
-<script defer="defer">
+<script type="text/javascript" defer="defer">
(function() {
var elem = wooly.doc().elembyid("{id}").node.elements[1];
elem.focus();
Modified: mgmt/trunk/cumin/python/cumin/client.strings
===================================================================
--- mgmt/trunk/cumin/python/cumin/client.strings 2008-07-25 19:37:24 UTC (rev 2225)
+++ mgmt/trunk/cumin/python/cumin/client.strings 2008-07-25 19:38:03 UTC (rev 2226)
@@ -51,7 +51,7 @@
</thead>
<tbody>{items}</tbody>
</table>
- {hidden_inputs}
+ <div>{hidden_inputs}</div>
</form>
[ConnectionStatus.javascript]
@@ -90,7 +90,7 @@
</tr>
</table>
</div>
-<script>
+<script type="text/javascript">
cumin.objectListeners["{id}"] = updateConnectionStatus, 3000;
</script>
@@ -163,5 +163,5 @@
</thead>
<tbody>{items}</tbody>
</table>
- {hidden_inputs}
+ <div>{hidden_inputs}</div>
</form>
Modified: mgmt/trunk/cumin/python/cumin/page.strings
===================================================================
--- mgmt/trunk/cumin/python/cumin/page.strings 2008-07-25 19:37:24 UTC (rev 2225)
+++ mgmt/trunk/cumin/python/cumin/page.strings 2008-07-25 19:38:03 UTC (rev 2226)
@@ -415,6 +415,18 @@
}
}())
+function addEvent(obj, evType, fn){
+ if (obj.addEventListener){
+ obj.addEventListener(evType, fn, false);
+ return true;
+ } else if (obj.attachEvent){
+ var r = obj.attachEvent("on"+evType, fn);
+ return r;
+ } else {
+ return false;
+ }
+}
+
[CuminPage.html]
<?xml version="1.0" encoding="UTF-8"?>
<!DOCTYPE html PUBLIC "-//W3C//DTD XHTML 1.1//EN" "http://www.w3.org/TR/xhtml11/DTD/xhtml11.dtd">
@@ -424,8 +436,8 @@
<link rel="stylesheet" type="text/css" href="cumin.css"/>
<link rel="shortcut icon" href="resource?name=favicon.ico" type="image/x-icon"/>
<!-- XXX import this via cumin.js instead -->
- <script src="resource?name=wooly.js"> </script>
- <script src="cumin.js"> </script>
+ <script src="resource?name=wooly.js" type="text/javascript"> </script>
+ <script src="cumin.js" type="text/javascript"> </script>
</head>
<body class="{class}">
{mode}
@@ -535,7 +547,7 @@
#trans {
padding: 0;
- marging: 0;
+ margin: 0;
border-top: 1px solid #bbb;
border-bottom: 1px solid #ddd;
}
@@ -591,7 +603,7 @@
<span>{failed_count}</span> failed
</a>
</div>
- <script>
+ <script type="text/javascript">
cumin.modelListeners["actions"] = updateActions
</script>
@@ -612,7 +624,7 @@
}
[MessagingView.html]
-<script>
+<script type="text/javascript">
wooly.setIntervalUpdate("{data_url}", updateMain, 3000);
</script>
<div class="oblock">
@@ -623,7 +635,7 @@
</div>
[GridView.html]
-<script>
+<script type="text/javascript">
wooly.setIntervalUpdate("{data_url}", updateMain, 3000);
</script>
<div class="oblock">
@@ -632,7 +644,7 @@
</div>
[SystemsView.html]
-<script>
+<script type="text/javascript">
wooly.setIntervalUpdate("{data_url}", updateMain, 3000);
</script>
<div class="oblock">
Modified: mgmt/trunk/cumin/python/cumin/stat.strings
===================================================================
--- mgmt/trunk/cumin/python/cumin/stat.strings 2008-07-25 19:37:24 UTC (rev 2225)
+++ mgmt/trunk/cumin/python/cumin/stat.strings 2008-07-25 19:38:03 UTC (rev 2226)
@@ -88,7 +88,7 @@
{items}
</tbody>
</table>
-<script>
+<script type="text/javascript">
cumin.objectListeners["{id}"] = updateStats;
</script>
@@ -155,7 +155,7 @@
<img id="{id}" src="{href}" height="100" width="360"/>
</div>
-<script>
+<script type="text/javascript">
cumin.objectListeners["{id}"] = updateChart
</script>
Modified: mgmt/trunk/cumin/python/cumin/system.strings
===================================================================
--- mgmt/trunk/cumin/python/cumin/system.strings 2008-07-25 19:37:24 UTC (rev 2225)
+++ mgmt/trunk/cumin/python/cumin/system.strings 2008-07-25 19:38:03 UTC (rev 2226)
@@ -21,5 +21,5 @@
</thead>
<tbody>{items}</tbody>
</table>
- {hidden_inputs}
+ <div>{hidden_inputs}</div>
</form>
16 years, 5 months
rhmessaging commits: r2225 - mgmt/trunk/cumin/python/cumin.
by rhmessaging-commits@lists.jboss.org
Author: eallen
Date: 2008-07-25 15:37:24 -0400 (Fri, 25 Jul 2008)
New Revision: 2225
Modified:
mgmt/trunk/cumin/python/cumin/widgets.py
mgmt/trunk/cumin/python/cumin/widgets.strings
Log:
Added RouteSet, ExchangeNameField, and ExclusivityField
Modified: mgmt/trunk/cumin/python/cumin/widgets.py
===================================================================
--- mgmt/trunk/cumin/python/cumin/widgets.py 2008-07-25 19:35:19 UTC (rev 2224)
+++ mgmt/trunk/cumin/python/cumin/widgets.py 2008-07-25 19:37:24 UTC (rev 2225)
@@ -485,6 +485,23 @@
def render_item_messages_matched_rate(self, session, binding):
return self.app.model.binding.msgMatched.rate_html(binding)
+class RouteSet(PaginatedItemSet):
+ def __init__(self, app, name):
+ super(RouteSet, self).__init__(app, name)
+
+ def render_title(self, session, *args):
+ return "Routes"
+
+ def get_item_count(self, session, *args):
+ return Bridge.select().count()
+
+ def do_get_items(self, session, *args):
+ start, end = self.get_bounds(session)
+ return Bridge.select()[start:end]
+
+ def render_item_route_key(self, session, route):
+ return route.key
+
class CheckboxIdColumn(SqlTableColumn):
def __init__(self, app, name, form):
super(CheckboxIdColumn, self).__init__(app, name)
@@ -546,6 +563,16 @@
break
+class ExchangeNameField(NameField):
+ def __init__(self, app, name, form):
+ super(ExchangeNameField, self).__init__(app, name, form)
+
+ self.illegal_chars = " (){}[]-<>&%"
+ self.legal_chars_desc = """
+ The exchange name is invalid; allowed characters are
+ letters, digits, ".", and "_"
+ """
+
class UniqueNameField(NameField):
def __init__(self, app, name, form, cls, fld="name"):
super(UniqueNameField, self).__init__(app, name, form)
@@ -611,3 +638,61 @@
def render_title(self, session):
return "Transient"
+
+class ExclusivityField(RadioField):
+ def __init__(self, app, name, form):
+ super(ExclusivityField, self).__init__(app, name, form)
+
+ param = Parameter(app, "param")
+ param.default = "exclusive"
+ self.add_parameter(param)
+ self.set_parameter(param)
+
+ self.add_option(self.Exclusive(app, "exclusive", form))
+ self.add_option(self.NonExclusive(app, "non-exclusive", form))
+
+ def render_title(self, session):
+ return "Exclusive?"
+
+ class Exclusive(RadioFieldOption):
+ def render_value(self, session):
+ return "exclusive"
+
+ def render_title(self, session):
+ return "Exclusive"
+
+ class NonExclusive(RadioFieldOption):
+ def render_value(self, session):
+ return "non-exclusive"
+
+ def render_title(self, session):
+ return "Non-exclusive"
+
+class AutoDeleteField(RadioField):
+ def __init__(self, app, name, form):
+ super(AutoDeleteField, self).__init__(app, name, form)
+
+ param = Parameter(app, "param")
+ param.default = "autodel"
+ self.add_parameter(param)
+ self.set_parameter(param)
+
+ self.add_option(self.AutoDel(app, "autodel", form))
+ self.add_option(self.Preserve(app, "preserve", form))
+
+ def render_title(self, session):
+ return "Auto-Delete?"
+
+ class AutoDel(RadioFieldOption):
+ def render_value(self, session):
+ return "autodel"
+
+ def render_title(self, session):
+ return "Auto-Delete"
+
+ class Preserve(RadioFieldOption):
+ def render_value(self, session):
+ return "preserve"
+
+ def render_title(self, session):
+ return "Preserve"
Modified: mgmt/trunk/cumin/python/cumin/widgets.strings
===================================================================
--- mgmt/trunk/cumin/python/cumin/widgets.strings 2008-07-25 19:35:19 UTC (rev 2224)
+++ mgmt/trunk/cumin/python/cumin/widgets.strings 2008-07-25 19:37:24 UTC (rev 2225)
@@ -17,9 +17,9 @@
{submit}
{cancel}
</div>
- {hidden_inputs}
+ <div>{hidden_inputs}</div>
</form>
-<script>
+<script type="text/javascript">
wooly.doc().elembyid("{id}").node.elements[0].focus();
</script>
16 years, 5 months
rhmessaging commits: r2224 - mgmt/trunk/cumin/python/cumin.
by rhmessaging-commits@lists.jboss.org
Author: eallen
Date: 2008-07-25 15:35:19 -0400 (Fri, 25 Jul 2008)
New Revision: 2224
Modified:
mgmt/trunk/cumin/python/cumin/queue.py
mgmt/trunk/cumin/python/cumin/queue.strings
Log:
Added Queue Add and Remove
Modified: mgmt/trunk/cumin/python/cumin/queue.py
===================================================================
--- mgmt/trunk/cumin/python/cumin/queue.py 2008-07-25 19:34:31 UTC (rev 2223)
+++ mgmt/trunk/cumin/python/cumin/queue.py 2008-07-25 19:35:19 UTC (rev 2224)
@@ -60,6 +60,14 @@
self.__purge = self.Purge(app, "purge", self)
self.add_child(self.__purge)
+ self.__remove = self.Remove(app, "remove", self)
+ self.add_child(self.__remove)
+
+ def render_add_queue_url(self, session, vhost):
+ branch = session.branch()
+ self.frame.show_queue_add(branch)
+ return branch.marshal()
+
def get_args(self, session):
reg = self.frame.get_object(session)
return (reg.getDefaultVhost(),)
@@ -88,7 +96,21 @@
def render_content(self, session):
return "Purge"
+
+ class Remove(FormButton):
+ def process_submit(self, session):
+ ids = self.parent.ids.get(session)
+ self.parent.ids.clear(session)
+ branch = session.branch()
+ frame = self.frame.show_queues_remove(branch)
+ frame.ids.set(branch, ids)
+ self.page.set_redirect_url(session, branch.marshal())
+
+ def render_content(self, session):
+ return "Remove"
+
+
class NameColumn(SqlTableColumn):
def render_title(self, session, data):
return "Name"
@@ -180,6 +202,10 @@
self.purge = QueuePurge(app, "purge")
self.add_mode(self.purge)
+ remove = QueueRemove(app, "remove")
+ self.add_mode(remove)
+ self.set_remove_mode(remove)
+
def show_purge(self, session):
return self.show_mode(session, self.purge)
@@ -282,13 +308,122 @@
self.durable = DurabilityField(app, "durable", self)
self.add_field(self.durable)
+
+ self.exclusive = ExclusivityField(app, "exclusive", self)
+ self.add_field(self.exclusive)
+
+ self.autodelete = AutoDeleteField(app, "autodelete", self)
+ self.add_field(self.autodelete)
+ self.bindings = ExchangeKeysField(app, "bindings", self)
+ self.add_field(self.bindings)
+
+
+class ExchangeKeysInput(FormInput):
+ def __init__(self, app, name, form):
+ super(ExchangeKeysInput, self).__init__(app, name, form)
+
+ self.name_param = Parameter(app, "name_param");
+ self.add_parameter(self.name_param)
+ form.add_form_parameter(self.name_param)
+
+ self.names = ListParameter(app, "name", self.name_param)
+ self.add_parameter(self.names)
+ form.add_form_parameter(self.names)
+
+ self.binding_param = Parameter(app, "binding_param")
+ self.add_parameter(self.binding_param)
+ form.add_form_parameter(self.binding_param)
+
+ self.bindings = ListParameter(app, "binding", self.binding_param)
+ self.add_parameter(self.bindings)
+ form.add_form_parameter(self.bindings)
+
+ self.extra_param = Parameter(app, "extra_param")
+ self.add_parameter(self.extra_param)
+ form.add_form_parameter(self.extra_param)
+
+ self.extras = ListParameter(app, "extra", self.extra_param)
+ self.add_parameter(self.extras)
+ form.add_form_parameter(self.extras)
+
+ self.exchangekey_tmpl = Template(self, "exchangekey_html")
+
+ # info to help create the <input> elements for various exchange types
+ self.binding_typemap = {"direct":{"type":"text",
+ "enable":"disabled='disabled'"},
+ "topic":{"type":"text", "enable":None},
+ "fanout":{"type":"hidden", "enable":None},
+# "headers":{"type":"text", "enable":None},
+# "xml":{"type":"hidden", "enable":None},
+ }
+ # exchange names to suppress
+ self.binding_omit_by_name = ("", "qpid.management", "amq.match")
+ self.binding_omit_by_type = ("xml", "headers")
+
+ def do_render(self, session, *args):
+ writer = Writer()
+ # get the registration from the broker frame
+ reg = self.frame.frame.get_object(session)
+ vhost = reg.getDefaultVhost()
+
+ sortedExchanges = sorted_by(vhost.exchanges)
+ # render a row for each exchange we support
+ for exchange in sortedExchanges:
+ if not exchange.name in self.binding_omit_by_name:
+ if not exchange.type in self.binding_omit_by_type:
+ self.exchangekey_tmpl.render(writer, session, exchange)
+
+ return writer.to_string()
+
+ def render_exchange_name(self, session, exchange):
+ return exchange.name
+
+ def render_exchange_fmt_name(self, session, exchange):
+ return fmt_shorten(exchange.name)
+
+ def render_exchange_path(self, session, *args):
+ return self.names.path
+
+ def render_exchange_id(self, session, exchange):
+ return exchange.id
+
+ def render_binding_path(self, session, exchange):
+ return self.bindings.path
+
+ def render_exchange_type(self, session, exchange):
+ return exchange.type
+
+ def render_binding_input_type(self, session, exchange):
+ return self.binding_typemap[exchange.type]["type"]
+
+ def render_binding_disabled(self, session, exchange):
+ return self.binding_typemap[exchange.type]["enable"]
+
+class ExchangeKeysField(FormField):
+ def __init__(self, app, name, form):
+ super(ExchangeKeysField, self).__init__(app, name, form)
+
+ self.__input = ExchangeKeysInput(app, "inputs", form)
+ self.add_child(self.__input)
+ self.inputs = self.__input
+
+ def render_title(self, session):
+ return "Exchange Bindings:"
+
+ def render_exchange_keys(self, session, *args):
+ return self.__input.render(session)
+
+ def render_exchange_path(self, session, *args):
+ return self.inputs.names.path
+
+ def render_queue_name_path(self, session, *args):
+ return self.inputs.form.namef.get_parameter().path
+
class QueueAdd(QueueForm):
def process_cancel(self, session):
branch = session.branch()
- self.page.pop_current_frame(branch)
- self.page.pop_current_frame(branch)
- self.page.get_current_frame(branch).show_view(branch)
+ self.frame.show_view(branch)
self.page.set_redirect_url(session, branch.marshal())
def process_submit(self, session):
@@ -297,20 +432,77 @@
if errors:
pass
else:
- reg = self.frame.frame.get_object(session)
+ reg = self.frame.get_object(session)
vhost = reg.getDefaultVhost()
- name = self.namef.get(session)
+ queue_name = self.namef.get(session)
durable = self.durable.get(session)
+ exclusive = self.exclusive.get(session)
+ autodelete = self.autodelete.get(session)
+ binding_names = self.bindings.inputs.names.get(session)
+ binding_keys = self.bindings.inputs.bindings.get(session)
+ #assumes a 1:1 correlation between names and keys
+ exchange_keys = dict(zip(binding_names, binding_keys))
- log.info("Adding queue '%s'" % name)
+ # sanity check: this should have been handled by the javascript
+ # on the form submit
+ for binding_name in binding_names:
+ exchange = self.__get_exchange_named(binding_name, vhost.exchanges)
+ if exchange.type == 'direct':
+ exchange_keys[exchange.name] = queue_name
+
+ args = {
+ "queue": queue_name,
+ "durable": durable,
+ "exclusive": exclusive,
+ "auto_delete": autodelete,
+ "exchange_keys": exchange_keys
+ }
+ log.info("Adding queue:\n\tname: \
+ '%s'\n\tdurablility: \
+ %s\n\texclusivity: \
+ %s\n\tlongevity: %s" % \
+ (queue_name, durable, exclusive, autodelete) )
+ action = self.app.model.queue.add
+ action.invoke(reg, args)
+
+ # navigate back to main queue frame
self.process_cancel(session)
- def render_title(self, session):
- reg = self.frame.frame.get_object(session)
- return "Add Queue to Broker '%s'" % reg.name
+ def render_title(self, session, *args):
+ reg = self.frame.get_object(session)
+ return "Add Queue to the Broker '%s'" % reg.name
+ def __get_exchange_named(self, name, exchanges):
+ for exchange in exchanges:
+ if exchange.name == name:
+ return exchange
+
+class QueueRemove(CuminConfirmForm):
+ def get_args(self, session):
+ return self.frame.get_args(session)
+
+ def render_title(self, session, queue):
+ return "Remove Queue '%s'" % queue.name
+
+ def process_cancel(self, session, queue):
+ branch = session.branch()
+ self.frame.show_view(branch)
+ self.page.set_redirect_url(session, branch.marshal())
+
+ def process_submit(self, session, queue):
+ action = self.app.model.queue.remove
+ action.invoke(queue)
+
+ self.process_cancel(session, queue)
+
+ def render_submit_content(self, session, queue):
+ return "Yes, Remove Queue '%s'" % queue.name
+
+ def render_cancel_content(self, session, queue):
+ return "No, Cancel"
+
class QueuePurge(CuminConfirmForm):
def get_args(self, session):
return self.frame.get_args(session)
@@ -353,6 +545,24 @@
def render_item_content(self, session, id):
return "Purge Queue '%s'" % Queue.get(id).name
+class QueueSetRemove(CuminBulkActionForm):
+ def process_return(self, session):
+ branch = session.branch()
+ self.frame.show_view(branch)
+ self.page.set_current_frame(branch, self.frame)
+ self.page.set_redirect_url(session, branch.marshal())
+
+ def process_item(self, session, id):
+ queue = Queue.get(id)
+ action = self.app.model.queue.remove
+ action.invoke(queue)
+
+ def render_title(self, session):
+ return "Remove Queues"
+
+ def render_item_content(self, session, id):
+ return "Remove Queue '%s'" % Queue.get(id).name
+
class QueueBindingAdd(CuminForm):
def __init__(self, app, name):
super(QueueBindingAdd, self).__init__(app, name)
Modified: mgmt/trunk/cumin/python/cumin/queue.strings
===================================================================
--- mgmt/trunk/cumin/python/cumin/queue.strings 2008-07-25 19:34:31 UTC (rev 2223)
+++ mgmt/trunk/cumin/python/cumin/queue.strings 2008-07-25 19:35:19 UTC (rev 2224)
@@ -36,13 +36,16 @@
[QueueSet.html]
<form id="{id}" method="post" action="?">
+
+ <ul class="actions">
+ <li><a class="nav" href="{add_queue_url}">Add New Queue</a></li>
+ </ul>
<div class="rfloat">{phase}</div>
-
{unit}
<div class="sactions">
<h2>Act on Selected Queues:</h2>
- {purge}
+ {purge} {remove}
</div>
<table class="mobjects">
@@ -57,7 +60,7 @@
</thead>
<tbody>{items}</tbody>
</table>
- {hidden_inputs}
+ <div>{hidden_inputs}</div>
</form>
[QueueStatus.javascript]
@@ -102,7 +105,7 @@
</tr>
</table>
</div>
-<script>
+<script type="text/javascript">
cumin.objectListeners["{id}"] = updateQueueStatus
</script>
@@ -167,7 +170,7 @@
{cancel}
</div>
</form>
-<script defer="defer">
+<script type="text/javascript" defer="defer">
(function() {
var elem = wooly.doc().elembyid("{id}").node.elements[1];
elem.focus();
@@ -276,3 +279,106 @@
<td class="ralign">{item_bytes_consumed}</td>
<td class="ralign">{item_unacked_messages}</td>
</tr>
+
+[QueueAdd.css]
+td.exchange_type {
+ font-style: italic;
+}
+
+[QueueAdd.javascript]
+var direct_keys = [];
+var queue_name_field = null;
+
+function attach_submit() {
+ var theForm = document.forms[0];
+ addEvent(theForm, "submit", associate_keys);
+
+ // get list of binding key fields that are 'direct'
+ // and add a listener to echo the queue_name
+ var inputs = document.getElementsByTagName("input");
+ if (inputs) {
+ for (var i=0; i<inputs.length; i++) {
+ if (inputs[i].className.indexOf("binding_class_direct") != -1) {
+ direct_keys[direct_keys.length] = inputs[i];
+ }
+ }
+ var oQueueName = document.forms[0].elements[queue_name];
+ if (oQueueName) {
+ addEvent(oQueueName, "blur", echo_queue_name);
+ addEvent(oQueueName, "keyup", echo_queue_name);
+ queue_name_field = oQueueName;
+ }
+ }
+
+}
+// When the form is submitted, make sure we can correctly associate
+// the input boxes with the checkboxes
+function associate_keys() {
+
+ // get the list of exchange chechboxes
+ var exchanges = document.forms[0].elements[exchange_path];
+ if (exchanges) {
+ for (var i=0; i<exchanges.length; i++) {
+ var chk_value = exchanges[i].value;
+ // get the associated input box
+ var oInput = document.getElementById(chk_value);
+ if (oInput) {
+ oInput.disabled = false; // disabled inputs don't get submitted
+ // so there should be a 1:1 relationship
+ // between checked boxes and input values
+ if (!exchanges[i].checked) {
+ oInput.disabled = true;
+ oInput.value = '';
+ }
+ }
+ }
+ }
+}
+
+// echos the queue_name field into the binding key fields of type 'direct'
+function echo_queue_name() {
+ if (queue_name_field) {
+ for (var i=0; i<direct_keys.length; i++) {
+ direct_keys[i].value = queue_name_field.value;
+ }
+ }
+}
+
+[ExchangeKeysInput.exchangekey_html]
+ <tr>
+ <td><input type="checkbox"
+ id="{id}.{exchange_id}"
+ name="{exchange_path}" value="{exchange_name}" size="15"
+ tabindex="100"/></td>
+ <td><label for="{id}.{exchange_id}">{exchange_fmt_name}</label></td>
+ <td class="exchange_type">{exchange_type}</td>
+ <td><input class="binding_class_{exchange_type}"
+ type="{binding_input_type}"
+ name="{binding_path}" id="{exchange_name}"
+ value="" size="32" maxlength="256" tabindex="100" {binding_disabled}/></td>
+ </tr>
+
+[ExchangeKeysField.html]
+<div class="field">
+ <div class="title">{title}</div>
+
+<div class="inputs">
+<table class="mobjects">
+ <thead>
+ <tr>
+ <th colspan="2">Exchange Name</th>
+ <th>Exchange Type</th>
+ <th>Binding Key</th>
+ </tr>
+ </thead>
+ <tbody>
+ {exchange_keys}
+ </tbody>
+</table>
+</div>
+</div>
+<script type="text/javascript">
+ var exchange_path="{exchange_path}";
+ var queue_name="{queue_name_path}";
+ addEvent(window, 'load', attach_submit);
+</script>
16 years, 5 months
rhmessaging commits: r2223 - mgmt/trunk/cumin/python/cumin.
by rhmessaging-commits@lists.jboss.org
Author: eallen
Date: 2008-07-25 15:34:31 -0400 (Fri, 25 Jul 2008)
New Revision: 2223
Modified:
mgmt/trunk/cumin/python/cumin/parameters.py
Log:
Added PeerParameter class
Modified: mgmt/trunk/cumin/python/cumin/parameters.py
===================================================================
--- mgmt/trunk/cumin/python/cumin/parameters.py 2008-07-25 19:33:12 UTC (rev 2222)
+++ mgmt/trunk/cumin/python/cumin/parameters.py 2008-07-25 19:34:31 UTC (rev 2223)
@@ -65,6 +65,13 @@
def do_marshal(self, exchange):
return str(exchange.id)
+class PeerParameter(Parameter):
+ def do_unmarshal(self, string):
+ return Link.get(int(string))
+
+ def do_marshal(self, peer):
+ return str(peer.id)
+
class QueueParameter(Parameter):
def do_unmarshal(self, string):
return Queue.get(int(string))
16 years, 5 months