rhmessaging commits: r2142 - mgmt/mint/python/mint.
Author: justi9
Date: 2008-06-06 11:47:31 -0400 (Fri, 06 Jun 2008)
New Revision: 2142
Modified:
mgmt/mint/python/mint/__init__.py
Log:
Missing import
Modified: mgmt/mint/python/mint/__init__.py
===================================================================
--- mgmt/mint/python/mint/__init__.py 2008-06-06 11:35:41 UTC (rev 2141)
+++ mgmt/mint/python/mint/__init__.py 2008-06-06 15:47:31 UTC (rev 2142)
@@ -1,4 +1,4 @@
-import os, socket, qpid, logging
+import sys, os, socket, qpid, logging
from qpid.datatypes import uuid4
from qpid.connection import Connection as QpidConnection
from qpid.util import connect
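For context, this one-word fix matters because the exception handler added in r2140 (below) calls sys.exc_info() while the module only did "from sys import exc_info", so the handler itself would die with a NameError. A minimal sketch of the distinction, using only the standard library:

    import sys
    from sys import exc_info

    try:
        raise ValueError("boom")
    except:
        # Both forms work once "import sys" is present; before r2142
        # only the bare exc_info name was bound, so sys.exc_info()
        # raised NameError from inside the except block.
        print "via exc_info():     %s" % exc_info()[0]
        print "via sys.exc_info(): %s" % sys.exc_info()[0]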
rhmessaging commits: r2141 - mgmt/cumin/python/cumin.
Author: justi9
Date: 2008-06-06 07:35:41 -0400 (Fri, 06 Jun 2008)
New Revision: 2141
Modified:
mgmt/cumin/python/cumin/model.py
mgmt/cumin/python/cumin/queue.strings
Log:
Restore the byte depth column in the queue table
Modified: mgmt/cumin/python/cumin/model.py
===================================================================
--- mgmt/cumin/python/cumin/model.py 2008-06-05 23:02:57 UTC (rev 2140)
+++ mgmt/cumin/python/cumin/model.py 2008-06-06 11:35:41 UTC (rev 2141)
@@ -1017,11 +1017,13 @@
prop.title = "Name"
prop.summary = True
- prop = CuminProperty(self, "journalDirectory")
+ prop = CuminProperty(self, "directory")
prop.title = "Directory"
stat = CuminStat(self, "initialFileCount")
stat.title = "Initial File Count"
+ stat.unit = "file"
+ stat.category = "io.journal"
stat = CuminStat(self, "dataFileSize")
stat.title = "Data File Size"
Modified: mgmt/cumin/python/cumin/queue.strings
===================================================================
--- mgmt/cumin/python/cumin/queue.strings 2008-06-05 23:02:57 UTC (rev 2140)
+++ mgmt/cumin/python/cumin/queue.strings 2008-06-06 11:35:41 UTC (rev 2141)
@@ -17,7 +17,7 @@
/ (extract(epoch from (c.rec_time - p.rec_time)) + 0.0001) as bdequeued,
case when p.byte_total_dequeues is null then true else false end as bdequeued_is_null,
c.msg_depth as mdepth,
- 999 as bdepth,
+ c.byte_depth as bdepth,
1 as mdepthaccel,
1 as bdepthaccel,
c.rec_time
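The surrounding query turns two consecutive stat samples into per-second rates: the delta of byte_total_dequeues (per the is-null guard above) divided by the epoch-seconds between samples, with 0.0001 added so two samples carrying the same timestamp cannot divide by zero. A sketch of the same arithmetic in Python, with illustrative names rather than the cumin schema:

    def byte_dequeue_rate(p_total, c_total, p_epoch, c_epoch):
        # mirrors: (c.byte_total_dequeues - p.byte_total_dequeues)
        #   / (extract(epoch from (c.rec_time - p.rec_time)) + 0.0001)
        return (c_total - p_total) / (c_epoch - p_epoch + 0.0001)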
rhmessaging commits: r2140 - mgmt/mint/python/mint.
Author: nunofsantos
Date: 2008-06-05 19:02:57 -0400 (Thu, 05 Jun 2008)
New Revision: 2140
Modified:
mgmt/mint/python/mint/__init__.py
Log:
additional locking around idMap
Modified: mgmt/mint/python/mint/__init__.py
===================================================================
--- mgmt/mint/python/mint/__init__.py 2008-06-05 22:30:30 UTC (rev 2139)
+++ mgmt/mint/python/mint/__init__.py 2008-06-05 23:02:57 UTC (rev 2140)
@@ -7,6 +7,7 @@
from sqlobject import *
from threading import Lock
from traceback import print_exc
+from sys import exc_info
from mint import schema
@@ -124,7 +125,11 @@
self.lock = Lock()
def set(self, idOriginal, obj):
- self.idMap[idOriginal] = obj
+ self.lock.acquire()
+ try:
+ self.idMap[idOriginal] = obj
+ finally:
+ self.lock.release()
def getByOriginalId(self, objType, idOriginal, managedBroker, create=False, args={}):
obj = None
@@ -505,7 +510,7 @@
def updateObjWithDict(self, obj, d):
updateDone = False
reattemptCount = 0
- while not updateDone:
+ while not updateDone and len(d) > 0:
try:
obj.set(**d)
obj.syncUpdate()
@@ -531,6 +536,11 @@
except KeyError, detail:
self.log("KeyError: Schema mismatch: %s" % detail)
return None
+ except:
+ #TODO: better exception handling here
+ self.log("Unexpected Error: %s" % sys.exc_info()[0])
+ print "Unexpected Error: %s" % sys.exc_info()[0]
+ return obj
return obj
def methodCallback(self, brokerId, methodId, errorNo, errorText, args):
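The acquire/try/finally shape above is the point of the change: the write to idMap is serialized, and the finally clause guarantees the lock is released even if the assignment raises. A trimmed-down sketch of the same pattern:

    from threading import Lock

    class ObjectCache(object):          # stand-in name, not the mint class
        def __init__(self):
            self.idMap = {}
            self.lock = Lock()

        def set(self, idOriginal, obj):
            self.lock.acquire()
            try:
                self.idMap[idOriginal] = obj
            finally:
                self.lock.release()     # runs even if the line above raises

        # equivalent, shorter form on Python >= 2.6:
        def set_with(self, idOriginal, obj):
            with self.lock:
                self.idMap[idOriginal] = obj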
rhmessaging commits: r2139 - mgmt/cumin/python/cumin.
Author: justi9
Date: 2008-06-05 18:30:30 -0400 (Thu, 05 Jun 2008)
New Revision: 2139
Modified:
mgmt/cumin/python/cumin/__init__.py
mgmt/cumin/python/cumin/util.py
Log:
Correct some places where we use the log parameter
Modified: mgmt/cumin/python/cumin/__init__.py
===================================================================
--- mgmt/cumin/python/cumin/__init__.py 2008-06-05 22:22:38 UTC (rev 2138)
+++ mgmt/cumin/python/cumin/__init__.py 2008-06-05 22:30:30 UTC (rev 2139)
@@ -187,7 +187,7 @@
try:
h = logging.FileHandler(self.log_file)
except IOError, e:
- root.warn("Can't write to log file '%s': %s" % (self.log, e))
+ root.warn("Can't write to log file '%s': %s" % (self.log_file, e))
h.setLevel(self.debug and logging.DEBUG or logging.INFO)
root.addHandler(h)
Modified: mgmt/cumin/python/cumin/util.py
===================================================================
--- mgmt/cumin/python/cumin/util.py 2008-06-05 22:22:38 UTC (rev 2138)
+++ mgmt/cumin/python/cumin/util.py 2008-06-05 22:30:30 UTC (rev 2139)
@@ -108,7 +108,8 @@
param = self.__params_by_name.get(name)
if param:
- setattr(self, param.name, self.unmarshal(param, value))
+ setattr(self, param.name.replace("-", "_"),
+ self.unmarshal(param, value))
else:
self.__log.info("Ignoring unrecognized parameter '%s'" % name)
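One incidental idiom in the context lines above is worth a note: h.setLevel(self.debug and logging.DEBUG or logging.INFO) is the pre-2.5 spelling of a conditional expression, and it is safe here only because logging.DEBUG (10) is truthy. A sketch of the equivalence:

    import logging

    debug = False
    # classic "cond and A or B" idiom; correct only while A is truthy
    level = debug and logging.DEBUG or logging.INFO
    # conditional expression, available from Python 2.5 on
    level = logging.DEBUG if debug else logging.INFO
    assert level == logging.INFO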
rhmessaging commits: r2138 - mgmt/cumin/python/cumin.
Author: justi9
Date: 2008-06-05 18:22:38 -0400 (Thu, 05 Jun 2008)
New Revision: 2138
Modified:
mgmt/cumin/python/cumin/__init__.py
mgmt/cumin/python/cumin/util.py
Log:
Rename log to log-file to make room for future logging enhancements
Modified: mgmt/cumin/python/cumin/__init__.py
===================================================================
--- mgmt/cumin/python/cumin/__init__.py 2008-06-05 22:07:06 UTC (rev 2137)
+++ mgmt/cumin/python/cumin/__init__.py 2008-06-05 22:22:38 UTC (rev 2138)
@@ -161,7 +161,7 @@
lpath = os.path.join(self.home, "log", "cumin.log")
summ = ("PATH", "Log to file at PATH")
- self.add_param("log", str, lpath, summ)
+ self.add_param("log-file", str, lpath, summ)
summ = "Enable debug mode"
self.add_param("debug", bool, False, summ)
@@ -185,7 +185,7 @@
root.addHandler(h)
try:
- h = logging.FileHandler(self.log)
+ h = logging.FileHandler(self.log_file)
except IOError, e:
root.warn("Can't write to log file '%s': %s" % (self.log, e))
Modified: mgmt/cumin/python/cumin/util.py
===================================================================
--- mgmt/cumin/python/cumin/util.py 2008-06-05 22:07:06 UTC (rev 2137)
+++ mgmt/cumin/python/cumin/util.py 2008-06-05 22:22:38 UTC (rev 2138)
@@ -65,10 +65,12 @@
def load_defaults(self):
for param in self.__params:
- if hasattr(self, param.name):
+ name = param.name.replace("-", "_")
+
+ if hasattr(self, name):
raise Exception("Parameter '%s' already present" % name)
- setattr(self, param.name, param.default)
+ setattr(self, name, param.default)
def load_file(self, file):
conf = SafeConfigParser()
@@ -139,7 +141,7 @@
print "Configuration:"
for param in self.__params:
- value = getattr(self, param.name)
+ value = getattr(self, param.name.replace("-", "_"))
if value == param.default:
flag = " [default]"
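The rename to "log-file" is what forces the name.replace("-", "_") calls in this diff and the follow-up fix in r2139 above: dashed parameter names read well in a config file but are not valid Python attribute names, so every get and set has to normalize them the same way. A stand-in sketch of the convention (not the cumin Config class):

    class Params(object):
        def _attr(self, name):
            # "log-file" is stored on the object as "log_file"
            return name.replace("-", "_")

        def set(self, name, value):
            setattr(self, self._attr(name), value)

        def get(self, name):
            return getattr(self, self._attr(name))

    p = Params()
    p.set("log-file", "/var/log/cumin.log")
    print p.log_file                     # -> /var/log/cumin.log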
rhmessaging commits: r2137 - in store/trunk/cpp: tests and 1 other directory.
Author: kpvdr
Date: 2008-06-05 18:07:06 -0400 (Thu, 05 Jun 2008)
New Revision: 2137
Modified:
store/trunk/cpp/lib/JournalImpl.cpp
store/trunk/cpp/lib/JournalImpl.h
store/trunk/cpp/tests/failing_python_tests.txt
store/trunk/cpp/tests/python_tests/
store/trunk/cpp/tests/python_tests/flow_to_disk.py
Log:
Fix for 448935: "Lazy-load with journal will fail for redelivered messages". Added additional flow-to-disk tests that test this mode of failure.
Modified: store/trunk/cpp/lib/JournalImpl.cpp
===================================================================
--- store/trunk/cpp/lib/JournalImpl.cpp 2008-06-05 22:05:26 UTC (rev 2136)
+++ store/trunk/cpp/lib/JournalImpl.cpp 2008-06-05 22:07:06 UTC (rev 2137)
@@ -53,6 +53,7 @@
const qpid::sys::Duration flushTimeout):
jcntl(journalId, journalDirectory, journalBaseFilename),
getEventsTimerSetFlag(false),
+ lastReadRid(0),
writeActivityFlag(false),
flushTriggeredFlag(true),
_xidp(0),
@@ -101,14 +102,7 @@
}
(dynamic_cast<GetEventsFireEvent*>(getEventsFireEventsPtr.get()))->cancel();
(dynamic_cast<InactivityFireEvent*>(inactivityFireEventPtr.get()))->cancel();
- if (_xidp) {
- ::free(_xidp);
- _xidp = 0;
- _datap = 0;
- } else if (_datap) {
- ::free(_datap);
- _datap = 0;
- }
+ free_read_buffers();
// TODO: Make this if() thread-safe
if (journalTimerPtr && --cnt == 0)
@@ -229,14 +223,9 @@
if (_dtok.rid() != rid)
{
// Free any previous msg
- if (_xidp) {
- ::free(_xidp);
- _xidp = 0;
- _datap = 0;
- } else if (_datap) {
- ::free(_datap);
- _datap = 0;
- }
+ free_read_buffers();
+ if (rid < lastReadRid)
+ _rmgr.invalidate();
_dlen = 0;
_dtok.reset();
_dtok.set_wstate(DataTokenImpl::ENQ);
@@ -249,30 +238,36 @@
unsigned aio_sleep_cnt = 0;
while (!done) {
iores res = read_data_record(&_datap, _dlen, &_xidp, xlen, transient, _external, &_dtok);
- bool rid_low = _dtok.rid() < rid;
- rid_found = _dtok.rid() == rid;
- if (res == journal::RHM_IORES_SUCCESS && !rid_low) {
- done = true;
- } else if (res == journal::RHM_IORES_PAGE_AIOWAIT) {
- if (++aio_sleep_cnt <= MAX_AIO_SLEEPS) {
- get_wr_events();
- usleep(AIO_SLEEP_TIME);
- } else {
+ switch (res) {
+ case journal::RHM_IORES_SUCCESS:
+ if (_dtok.rid() < rid) {
+ free_read_buffers();
+ // reset data token for next read
+ _dlen = 0;
+ _dtok.reset();
+ _dtok.set_wstate(DataTokenImpl::ENQ);
+ _dtok.set_rid(0);
+ } else {
+ rid_found = _dtok.rid() == rid;
+ lastReadRid = rid;
+ done = true;
+ }
+ break;
+ case journal::RHM_IORES_PAGE_AIOWAIT:
+ if (++aio_sleep_cnt <= MAX_AIO_SLEEPS) {
+ get_wr_events();
+ usleep(AIO_SLEEP_TIME);
+ } else {
+ std::stringstream ss;
+ ss << "read_data_record() returned " << journal::iores_str(res);
+ ss << "; exceeded maximum wait time";
+ throw jexception(0, ss.str().c_str(), "JournalImpl", "loadMsgContent");
+ }
+ break;
+ default:
std::stringstream ss;
ss << "read_data_record() returned " << journal::iores_str(res);
- ss << "; exceeded maximum wait time";
throw jexception(0, ss.str().c_str(), "JournalImpl", "loadMsgContent");
- }
- } else if (rid_low) {
- // reset data token for next read
- _dlen = 0;
- _dtok.reset();
- _dtok.set_wstate(DataTokenImpl::ENQ);
- _dtok.set_rid(0);
- } else {
- std::stringstream ss;
- ss << "read_data_record() returned " << journal::iores_str(res);
- throw jexception(0, ss.str().c_str(), "JournalImpl", "loadMsgContent");
}
}
if (!rid_found) {
@@ -447,6 +442,19 @@
}
void
+JournalImpl::free_read_buffers()
+{
+ if (_xidp) {
+ ::free(_xidp);
+ _xidp = 0;
+ _datap = 0;
+ } else if (_datap) {
+ ::free(_datap);
+ _datap = 0;
+ }
+}
+
+void
JournalImpl::handleIoResult(const iores r)
{
writeActivityFlag = true;
Modified: store/trunk/cpp/lib/JournalImpl.h
===================================================================
--- store/trunk/cpp/lib/JournalImpl.h 2008-06-05 22:05:26 UTC (rev 2136)
+++ store/trunk/cpp/lib/JournalImpl.h 2008-06-05 22:07:06 UTC (rev 2137)
@@ -74,6 +74,8 @@
bool getEventsTimerSetFlag;
boost::intrusive_ptr<qpid::broker::TimerTask> getEventsFireEventsPtr;
pthread_mutex_t _getf_mutex; // getEventsTimerSetFlag mutex
+
+ u_int64_t lastReadRid; // rid of last read msg
bool writeActivityFlag;
bool flushTriggeredFlag;
@@ -182,6 +184,8 @@
qpid::management::Args&);
private:
+ void free_read_buffers();
+
inline void setGetEventTimer()
{
getEventsFireEventsPtr->addRef();
Modified: store/trunk/cpp/tests/failing_python_tests.txt
===================================================================
--- store/trunk/cpp/tests/failing_python_tests.txt 2008-06-05 22:05:26 UTC (rev 2136)
+++ store/trunk/cpp/tests/failing_python_tests.txt 2008-06-05 22:07:06 UTC (rev 2137)
@@ -1,4 +0,0 @@
-python_tests.flow_to_disk.AsyncFlowToDiskTests.test_simple_max_count_transient
-python_tests.flow_to_disk.AsyncFlowToDiskTests.test_simple_max_count_persistent
-python_tests.flow_to_disk.AsyncFlowToDiskTests.test_simple_max_size_transient
-python_tests.flow_to_disk.AsyncFlowToDiskTests.test_simple_max_size_persistent
Property changes on: store/trunk/cpp/tests/python_tests
___________________________________________________________________
Name: svn:ignore
+ *.pyc
Modified: store/trunk/cpp/tests/python_tests/flow_to_disk.py
===================================================================
--- store/trunk/cpp/tests/python_tests/flow_to_disk.py 2008-06-05 22:05:26 UTC (rev 2136)
+++ store/trunk/cpp/tests/python_tests/flow_to_disk.py 2008-06-05 22:07:06 UTC (rev 2137)
@@ -19,34 +19,51 @@
from qpid.client import Client, Closed
from qpid.queue import Empty
from qpid.testlib import TestBase010
-from qpid.datatypes import Message
+from qpid.datatypes import Message, RangedSet
from qpid.session import SessionException
class AsyncFlowToDiskTests(TestBase010):
"""Tests for async store flow-to-disk"""
- def test_simple_max_count_transient(self):
+ def test_01_simple_max_count_transient(self):
queue_args = {'qpid.max_count': 10}
- self.simple_limit("test_simple_max_count_transient", queue_args, self.session.delivery_mode.non_persistent)
+ self.simple_limit("test_simple_max_count_transient", queue_args, self.session.delivery_mode.non_persistent, self.session.acquire_mode.pre_acquired)
- def test_simple_max_count_persistent(self):
+ def test_02_simple_max_count_persistent(self):
queue_args = {'qpid.max_count': 10}
- self.simple_limit("test_simple_max_count_persistent", queue_args, self.session.delivery_mode.persistent)
+ self.simple_limit("test_simple_max_count_persistent", queue_args, self.session.delivery_mode.persistent, self.session.acquire_mode.pre_acquired)
- def test_simple_max_size_transient(self):
+ def test_03_simple_max_size_transient(self):
queue_args = {'qpid.max_size': 100}
- self.simple_limit("test_simple_max_size_transient", queue_args, self.session.delivery_mode.non_persistent)
+ self.simple_limit("test_simple_max_size_transient", queue_args, self.session.delivery_mode.non_persistent, self.session.acquire_mode.pre_acquired)
- def test_simple_max_size_persistent(self):
+ def test_04_simple_max_size_persistent(self):
queue_args = {'qpid.max_size': 100}
- self.simple_limit("test_simple_max_size_persistent", queue_args, self.session.delivery_mode.persistent)
+ self.simple_limit("test_simple_max_size_persistent", queue_args, self.session.delivery_mode.persistent, self.session.acquire_mode.pre_acquired)
- def simple_limit(self, queue_name, queue_args, delivery_mode):
+ def test_05_simple_max_count_transient_not_acquired(self):
+ queue_args = {'qpid.max_count': 10}
+ self.simple_limit("test_simple_max_count_transient_not_acquired", queue_args, self.session.delivery_mode.non_persistent, self.session.acquire_mode.not_acquired)
+
+ def test_06_simple_max_count_persistent_not_acquired(self):
+ queue_args = {'qpid.max_count': 10}
+ self.simple_limit("test_simple_max_count_persistent_not_acquired", queue_args, self.session.delivery_mode.persistent, self.session.acquire_mode.not_acquired)
+
+ def test_07_simple_max_size_transient_not_acquired(self):
+ queue_args = {'qpid.max_size': 100}
+ self.simple_limit("test_simple_max_size_transient_not_acquired", queue_args, self.session.delivery_mode.non_persistent, self.session.acquire_mode.not_acquired)
+
+ def test_08_simple_max_size_persistent_not_acquired(self):
+ queue_args = {'qpid.max_size': 100}
+ self.simple_limit("test_simple_max_size_persistent_not_acquired", queue_args, self.session.delivery_mode.persistent, self.session.acquire_mode.not_acquired)
+
+ def simple_limit(self, queue_name, queue_args, delivery_mode, acquire_mode):
"""
- Test a simple case of max message count.
+ Test a simple case of message limits which will force flow-to-disk.
* queue_args sets a limit - either max_count 10 or max_size 100
- * 15 messages are added. The last five will flow to disk.
+ * 15 messages of size 10 are added. The last five will flow to disk.
* Consume 15 messages.
+ * Check the broker has no messages left.
"""
session = self.session
@@ -57,17 +74,96 @@
msg_str = "Message %02d" % msg_num
session.message_transfer(message=Message(session.delivery_properties(routing_key=queue_name, delivery_mode=delivery_mode), msg_str))
- # Consume 15 messages
- session.message_subscribe(queue=queue_name, destination="tag")
+ # Consume/browse 15 messages
+ session.message_subscribe(queue=queue_name, destination="tag", acquire_mode=acquire_mode)
session.message_flow(destination="tag", unit=session.credit_unit.message, value=0xFFFFFFFF)
session.message_flow(destination="tag", unit=session.credit_unit.byte, value=0xFFFFFFFF)
queue = session.incoming("tag")
+ ids = RangedSet()
for msg_num in range(0, 15):
expected_str = "Message %02d" % msg_num
msg = queue.get(timeout=5)
self.assertEqual(expected_str, msg.body)
+ ids.add(msg.id)
+ # If not_acquired, check messages are still on queue, then acquire/accept
+ if acquire_mode == self.session.acquire_mode.not_acquired:
+ session.queue_declare(queue=queue_name)
+ self.assertEqual(15, session.queue_query(queue=queue_name).message_count)
+ response = session.message_acquire(ids)
+ for range_ in ids:
+ for msg_id in range_:
+ self.assert_(msg_id in response.transfers)
+ session.message_accept(ids)
+
# Check queue is empty
session.queue_declare(queue=queue_name)
- reply = session.queue_query(queue=queue_name)
- self.assertEqual(0, reply.message_count)
+ self.assertEqual(0, session.queue_query(queue=queue_name).message_count)
+
+
+ def test_09_max_count_browse_consume_transient(self):
+ queue_args = {'qpid.max_count': 10}
+ self.not_acquired_browse_consume_limit("test_max_count_browse_consume_transient", queue_args, self.session.delivery_mode.non_persistent)
+
+ def test_10_max_count_browse_consume_persistent(self):
+ queue_args = {'qpid.max_count': 10}
+ self.not_acquired_browse_consume_limit("test_max_count_browse_consume_persistent", queue_args, self.session.delivery_mode.persistent)
+
+ def test_11_max_size_browse_consume_transient(self):
+ queue_args = {'qpid.max_size': 100}
+ self.not_acquired_browse_consume_limit("test_max_size_browse_consume_transient", queue_args, self.session.delivery_mode.non_persistent)
+
+ def test_12_max_size_browse_consume_persistent(self):
+ queue_args = {'qpid.max_size': 100}
+ self.not_acquired_browse_consume_limit("test_max_size_browse_consume_persistent", queue_args, self.session.delivery_mode.persistent)
+
+
+ def not_acquired_browse_consume_limit(self, queue_name, queue_args, delivery_mode):
+ """
+ Test to check browsing then subsequent consumption of flow-to-disk messages.
+ * 15 messages of size 10 are added. The last five will flow to disk.
+ * Browse 15 messages, then release them.
+ * Check the broker still has all messages.
+ * Consume 15 messages.
+ * Check the broker has no messages left.
+ """
+
+ session = self.session
+ session.queue_declare(queue=queue_name, durable=True, arguments=queue_args)
+
+ # Add 15 messages
+ for msg_num in range(0, 15):
+ msg_str = "Message %02d" % msg_num
+ session.message_transfer(message=Message(session.delivery_properties(routing_key=queue_name, delivery_mode=delivery_mode), msg_str))
+
+ # Browse 15 messages
+ session.message_subscribe(queue=queue_name, destination="tagA", acquire_mode=session.acquire_mode.not_acquired)
+ session.message_flow(destination="tagA", unit=session.credit_unit.message, value=0xFFFFFFFF)
+ session.message_flow(destination="tagA", unit=session.credit_unit.byte, value=0xFFFFFFFF)
+ queue = session.incoming("tagA")
+ ids = RangedSet()
+ for msg_num in range(0, 15):
+ expected_str = "Message %02d" % msg_num
+ msg = queue.get(timeout=5)
+ self.assertEqual(expected_str, msg.body)
+ ids.add(msg.id)
+
+ # Release all 15 messages and close
+ session.message_release(ids)
+ session.queue_declare(queue=queue_name)
+ self.assertEqual(15, session.queue_query(queue=queue_name).message_count)
+
+ # Cancel subscription, start new one that consumes
+ session.message_cancel(destination="tagA")
+ session.message_subscribe(queue=queue_name, destination="tagB", acquire_mode=session.acquire_mode.pre_acquired)
+ session.message_flow(destination="tagB", unit=session.credit_unit.message, value=0xFFFFFFFF)
+ session.message_flow(destination="tagB", unit=session.credit_unit.byte, value=0xFFFFFFFF)
+ queue = session.incoming("tagB")
+ for msg_num in range(0, 15):
+ expected_str = "Message %02d" % msg_num
+ msg = queue.get(timeout=5)
+ self.assertEqual(expected_str, msg.body)
+
+ # Check queue is empty
+ session.queue_declare(queue=queue_name)
+ self.assertEqual(0, session.queue_query(queue=queue_name).message_count)
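The heart of the 448935 fix is the new lastReadRid bookkeeping: the journal reader is sequential, so when a redelivered message is requested its rid is lower than the last rid already read, and the read-page manager must be invalidated before rescanning from the start. A simplified Python model of the control flow (a sketch of the logic, not the C++ API):

    def load_msg_content(journal, rid):
        if rid < journal.last_read_rid:
            # requested record sits behind the read cursor (redelivery):
            # drop the read page cache and rescan from the start
            journal.invalidate_read_cache()
        while True:
            rec = journal.read_next_record()   # may wait on AIO internally
            if rec.rid < rid:
                continue                       # skip records before target
            journal.last_read_rid = rid
            return rec if rec.rid == rid else None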
rhmessaging commits: r2136 - mgmt/mint/sql.
Author: nunofsantos
Date: 2008-06-05 18:05:26 -0400 (Thu, 05 Jun 2008)
New Revision: 2136
Modified:
mgmt/mint/sql/indexes.sql
Log:
recover indexes overwritten in an earlier commit
Modified: mgmt/mint/sql/indexes.sql
===================================================================
--- mgmt/mint/sql/indexes.sql 2008-06-05 21:53:57 UTC (rev 2135)
+++ mgmt/mint/sql/indexes.sql 2008-06-05 22:05:26 UTC (rev 2136)
@@ -0,0 +1,7 @@
+create index queue_vhost_id_idx on queue (vhost_id);
+create index exchange_vhost_id_idx on exchange (vhost_id);
+create index client_vhost_id_idx on client_connection (vhost_id);
+
+create index queue_stats_queue_id_idx on queue_stats (queue_id);
+create index exchange_stats_exchange_id_idx on exchange_stats (exchange_id);
+create index client_stats_client_id_idx on client_connection_stats (client_connection_id);
rhmessaging commits: r2135 - mgmt/cumin/python/cumin.
Author: justi9
Date: 2008-06-05 17:53:57 -0400 (Thu, 05 Jun 2008)
New Revision: 2135
Modified:
mgmt/cumin/python/cumin/model.py
mgmt/cumin/python/cumin/queue.py
mgmt/cumin/python/cumin/queue.strings
Log:
Add journal statistics
Modified: mgmt/cumin/python/cumin/model.py
===================================================================
--- mgmt/cumin/python/cumin/model.py 2008-06-05 18:28:27 UTC (rev 2134)
+++ mgmt/cumin/python/cumin/model.py 2008-06-05 21:53:57 UTC (rev 2135)
@@ -29,6 +29,8 @@
CuminConnection(self)
CuminSession(self)
CuminLink(self)
+ CuminStore(self)
+ CuminJournal(self)
CuminBrokerRegistration(self)
CuminBrokerGroup(self)
@@ -998,7 +1000,89 @@
def get_title(self, session):
return "Broker Link"
+
+class CuminStore(RemoteClass):
+ def __init__(self, model):
+ super(CuminStore, self).__init__(model, "store", Store, StoreStats)
+
+ prop = CuminProperty(self, "location")
+ prop.title = "Location"
+
+class CuminJournal(RemoteClass):
+ def __init__(self, model):
+ super(CuminJournal, self).__init__(model, "journal",
+ Journal, JournalStats)
+
+ prop = CuminProperty(self, "name")
+ prop.title = "Name"
+ prop.summary = True
+
+ prop = CuminProperty(self, "journalDirectory")
+ prop.title = "Directory"
+
+ stat = CuminStat(self, "initialFileCount")
+ stat.title = "Initial File Count"
+
+ stat = CuminStat(self, "dataFileSize")
+ stat.title = "Data File Size"
+ stat.unit = "byte"
+ stat.category = "io.journal"
+
+ stat = CuminStat(self, "recordDepth")
+ stat.title = "Record Depth"
+ stat.unit = "record"
+ stat.category = "io.journal"
+ stat = CuminStat(self, "recordEnqueues")
+ stat.title = "Record Enqueues"
+ stat.unit = "record"
+ stat.category = "io.journal"
+
+ stat = CuminStat(self, "outstandingAIOs")
+ stat.title = "Outstanding AIOs"
+ stat.unit = "aio"
+ stat.category = "io.journal"
+
+ stat = CuminStat(self, "freeFileCount")
+ stat.title = "Free Files"
+ stat.unit = "file"
+ stat.category = "io.journal"
+
+ stat = CuminStat(self, "availableFileCount")
+ stat.title = "Avail. Files"
+ stat.unit = "file"
+ stat.category = "io.journal"
+
+ stat = CuminStat(self, "writeWaitFailures")
+ stat.title = "Write Wait Failures"
+ stat.unit = "failure"
+ stat.category = "io.journal"
+
+ stat = CuminStat(self, "writeBusyFailures")
+ stat.title = "Write Busy Failures"
+ stat.unit = "failure"
+ stat.category = "io.journal"
+
+ stat = CuminStat(self, "readRecordCount")
+ stat.title = "Read Records"
+ stat.unit = "record"
+ stat.category = "io.journal"
+
+ stat = CuminStat(self, "readBusyFailures")
+ stat.title = "Read Busy Failures"
+ stat.unit = "failure"
+ stat.category = "io.journal"
+
+ stat = CuminStat(self, "writePageCacheDepth")
+ stat.title = "Write Page Cache Depth"
+ stat.unit = "page"
+ stat.category = "io.journal"
+
+ stat = CuminStat(self, "readPageCacheDepth")
+ stat.title = "Read Page Cache Depth"
+ stat.unit = "page"
+ stat.category = "io.journal"
+
class CuminBrokerRegistration(LocalClass):
def __init__(self, model):
super(CuminBrokerRegistration, self).__init__ \
Modified: mgmt/cumin/python/cumin/queue.py
===================================================================
--- mgmt/cumin/python/cumin/queue.py 2008-06-05 18:28:27 UTC (rev 2134)
+++ mgmt/cumin/python/cumin/queue.py 2008-06-05 21:53:57 UTC (rev 2135)
@@ -480,6 +480,7 @@
super(QueueStatsDurability, self).__init__(app, name)
self.add_child(StatSet(app, "io", "io.durable"))
+ self.add_child(JournalStats(app, "jrnl", "io.journal"))
chart = self.EnqueueDequeueRateChart(app, "enqdeq")
self.add_child(chart)
@@ -498,6 +499,26 @@
def render_title(self, session, queue):
return "Durable Messages Enqueued and Dequeued"
+class JournalStats(StatSet):
+ def get_args(self, session):
+ queue = self.frame.get_args(session)[0]
+
+ try:
+ jrnl = Journal.selectBy(queue=queue)[0]
+ except IndexError:
+ jrnl = None
+
+ return (jrnl,)
+
+ def render_title(self, session, jrnl):
+ return "Journal"
+
+ def do_render(self, session, jrnl):
+ if jrnl:
+ return super(JournalStats, self).do_render(session, jrnl)
+ else:
+ return "<div class=\"iblock\">%s</div>" % fmt_none()
+
class QueueStatsTransactions(Widget):
def __init__(self, app, name):
super(QueueStatsTransactions, self).__init__(app, name)
Modified: mgmt/cumin/python/cumin/queue.strings
===================================================================
--- mgmt/cumin/python/cumin/queue.strings 2008-06-05 18:28:27 UTC (rev 2134)
+++ mgmt/cumin/python/cumin/queue.strings 2008-06-05 21:53:57 UTC (rev 2135)
@@ -203,6 +203,9 @@
<td>
<h2>Durable Input/Output</h2>
{io}
+
+ <h2>Journal</h2>
+ {jrnl}
</td>
<td>
{enqdeq}
@@ -210,6 +213,20 @@
</tr>
</table>
+[JournalStats.html]
+<table id="{id}" class="StatSet">
+ <thead>
+ <tr>
+ <th style="width: 50%; text-align: left;">Statistic</th>
+ <th style="width: 25%;">Value</th>
+ <th style="width: 25%;">Per Second</th>
+ </tr>
+ </thead>
+ <tbody>
+ {items}
+ </tbody>
+</table>
+
[QueueStatsTransactions.html]
<table class="twocol">
<tr>
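Each CuminStat above is tagged with a unit and a category, and the widgets are constructed with a category string (StatSet(app, "io", "io.durable"), JournalStats(app, "jrnl", "io.journal")), which suggests the table renders exactly the stats whose category matches. A sketch of that selection under that assumption; the helper name and the .stats list are hypothetical, not the cumin API:

    def stats_for_category(cumin_class, category):
        # hypothetical: assumes the model keeps registered CuminStat
        # objects on a .stats list carrying the .category set above
        return [s for s in cumin_class.stats if s.category == category]

    # a JournalStats widget built with "io.journal" would then render
    # recordDepth, recordEnqueues, outstandingAIOs, and so on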
rhmessaging commits: r2134 - in mgmt/mint: sql and 1 other directory.
Author: nunofsantos
Date: 2008-06-05 14:28:27 -0400 (Thu, 05 Jun 2008)
New Revision: 2134
Modified:
mgmt/mint/python/mint/schema.py
mgmt/mint/sql/schema.sql
Log:
updated schema to reflect changes in store xml
Modified: mgmt/mint/python/mint/schema.py
===================================================================
--- mgmt/mint/python/mint/schema.py 2008-06-05 18:22:03 UTC (rev 2133)
+++ mgmt/mint/python/mint/schema.py 2008-06-05 18:28:27 UTC (rev 2134)
@@ -641,12 +641,12 @@
statsPrev = ForeignKey('JournalStats', cascade='null', default=None)
name = StringCol(length=1000, default=None)
queue = ForeignKey('Queue', cascade='null', default=None)
- journalDirectory = StringCol(length=1000, default=None)
- journalBaseFileName = StringCol(length=1000, default=None)
- journalWritePageSize = IntCol(default=None)
- journalWritePages = IntCol(default=None)
- journalReadPageSize = IntCol(default=None)
- journalReadPages = IntCol(default=None)
+ directory = StringCol(length=1000, default=None)
+ baseFileName = StringCol(length=1000, default=None)
+ writePageSize = IntCol(default=None)
+ writePages = IntCol(default=None)
+ readPageSize = IntCol(default=None)
+ readPages = IntCol(default=None)
classInfos = dict() # brokerId => classInfo
@@ -671,31 +671,31 @@
journal = ForeignKey('Journal', cascade='null', default=None)
initialFileCount = SmallIntCol(default=None)
dataFileSize = IntCol(default=None)
- journalCurrentFileCount = IntCol(default=None)
- journalRecordDepth = IntCol(default=None)
- journalRecordDepthLow = IntCol(default=None)
- journalRecordDepthHigh = IntCol(default=None)
- journalRecordEnqueues = BigIntCol(default=None)
- journalRecordDequeues = BigIntCol(default=None)
- journalOutstandingAIOs = IntCol(default=None)
- journalOutstandingAIOsLow = IntCol(default=None)
- journalOutstandingAIOsHigh = IntCol(default=None)
- journalFreeFileCount = IntCol(default=None)
- journalFreeFileCountLow = IntCol(default=None)
- journalFreeFileCountHigh = IntCol(default=None)
- journalAvailableFileCount = IntCol(default=None)
- journalAvailableFileCountLow = IntCol(default=None)
- journalAvailableFileCountHigh = IntCol(default=None)
- journalWriteWaitFailures = BigIntCol(default=None)
- journalWriteBusyFailures = BigIntCol(default=None)
- journalReadRecordCount = BigIntCol(default=None)
- journalReadBusyFailures = BigIntCol(default=None)
- journalWritePageCacheDepth = IntCol(default=None)
- journalWritePageCacheDepthLow = IntCol(default=None)
- journalWritePageCacheDepthHigh = IntCol(default=None)
- journalReadPageCacheDepth = IntCol(default=None)
- journalReadPageCacheDepthLow = IntCol(default=None)
- journalReadPageCacheDepthHigh = IntCol(default=None)
+ currentFileCount = IntCol(default=None)
+ recordDepth = IntCol(default=None)
+ recordDepthLow = IntCol(default=None)
+ recordDepthHigh = IntCol(default=None)
+ recordEnqueues = BigIntCol(default=None)
+ recordDequeues = BigIntCol(default=None)
+ outstandingAIOs = IntCol(default=None)
+ outstandingAIOsLow = IntCol(default=None)
+ outstandingAIOsHigh = IntCol(default=None)
+ freeFileCount = IntCol(default=None)
+ freeFileCountLow = IntCol(default=None)
+ freeFileCountHigh = IntCol(default=None)
+ availableFileCount = IntCol(default=None)
+ availableFileCountLow = IntCol(default=None)
+ availableFileCountHigh = IntCol(default=None)
+ writeWaitFailures = BigIntCol(default=None)
+ writeBusyFailures = BigIntCol(default=None)
+ readRecordCount = BigIntCol(default=None)
+ readBusyFailures = BigIntCol(default=None)
+ writePageCacheDepth = IntCol(default=None)
+ writePageCacheDepthLow = IntCol(default=None)
+ writePageCacheDepthHigh = IntCol(default=None)
+ readPageCacheDepth = IntCol(default=None)
+ readPageCacheDepthLow = IntCol(default=None)
+ readPageCacheDepthHigh = IntCol(default=None)
classInfos = dict() # brokerId => classInfo
Modified: mgmt/mint/sql/schema.sql
===================================================================
--- mgmt/mint/sql/schema.sql 2008-06-05 18:22:03 UTC (rev 2133)
+++ mgmt/mint/sql/schema.sql 2008-06-05 18:28:27 UTC (rev 2134)
@@ -226,12 +226,12 @@
stats_prev_id INT,
name VARCHAR(1000),
queue_id INT,
- journal_directory VARCHAR(1000),
- journal_base_file_name VARCHAR(1000),
- journal_write_page_size INT,
- journal_write_pages INT,
- journal_read_page_size INT,
- journal_read_pages INT
+ directory VARCHAR(1000),
+ base_file_name VARCHAR(1000),
+ write_page_size INT,
+ write_pages INT,
+ read_page_size INT,
+ read_pages INT
);
CREATE TABLE journal_stats (
@@ -241,31 +241,31 @@
journal_id INT,
initial_file_count SMALLINT,
data_file_size INT,
- journal_current_file_count INT,
- journal_record_depth INT,
- journal_record_depth_low INT,
- journal_record_depth_high INT,
- journal_record_enqueues BIGINT,
- journal_record_dequeues BIGINT,
- journal_outstanding_ai_os INT,
- journal_outstanding_ai_os_low INT,
- journal_outstanding_ai_os_high INT,
- journal_free_file_count INT,
- journal_free_file_count_low INT,
- journal_free_file_count_high INT,
- journal_available_file_count INT,
- journal_available_file_count_low INT,
- journal_available_file_count_high INT,
- journal_write_wait_failures BIGINT,
- journal_write_busy_failures BIGINT,
- journal_read_record_count BIGINT,
- journal_read_busy_failures BIGINT,
- journal_write_page_cache_depth INT,
- journal_write_page_cache_depth_low INT,
- journal_write_page_cache_depth_high INT,
- journal_read_page_cache_depth INT,
- journal_read_page_cache_depth_low INT,
- journal_read_page_cache_depth_high INT
+ current_file_count INT,
+ record_depth INT,
+ record_depth_low INT,
+ record_depth_high INT,
+ record_enqueues BIGINT,
+ record_dequeues BIGINT,
+ outstanding_ai_os INT,
+ outstanding_ai_os_low INT,
+ outstanding_ai_os_high INT,
+ free_file_count INT,
+ free_file_count_low INT,
+ free_file_count_high INT,
+ available_file_count INT,
+ available_file_count_low INT,
+ available_file_count_high INT,
+ write_wait_failures BIGINT,
+ write_busy_failures BIGINT,
+ read_record_count BIGINT,
+ read_busy_failures BIGINT,
+ write_page_cache_depth INT,
+ write_page_cache_depth_low INT,
+ write_page_cache_depth_high INT,
+ read_page_cache_depth INT,
+ read_page_cache_depth_low INT,
+ read_page_cache_depth_high INT
);
CREATE TABLE link (
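The schema.sql half of this commit follows mechanically from schema.py: SQLObject derives column names by breaking attribute names at case boundaries, and a run of capitals keeps its last letter with the following word, which is why outstandingAIOs becomes the odd-looking outstanding_ai_os. A reimplementation of that rule for illustration (patterned on sqlobject.styles; the generated SQL above is the ground truth):

    import re

    _caps = re.compile(r"[A-Z]+")

    def mixed_to_under(name):
        def sub(m):
            g = m.group(0).lower()
            if len(g) > 1:
                # "AIO" -> "_ai_o": the last capital starts the next word
                return "_%s_%s" % (g[:-1], g[-1])
            return "_" + g
        return _caps.sub(sub, name).lstrip("_")

    print mixed_to_under("outstandingAIOs")      # outstanding_ai_os
    print mixed_to_under("writePageCacheDepth")  # write_page_cache_depth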
rhmessaging commits: r2133 - mgmt/cumin.
Author: justi9
Date: 2008-06-05 14:22:03 -0400 (Thu, 05 Jun 2008)
New Revision: 2133
Added:
mgmt/cumin/COPYING-for-wsgiserver
mgmt/cumin/LICENSE-for-wsgiserver
Modified:
mgmt/cumin/Makefile
Log:
Add copying and license files for the wsgiserver code from CherryPy
Added: mgmt/cumin/COPYING-for-wsgiserver
===================================================================
--- mgmt/cumin/COPYING-for-wsgiserver (rev 0)
+++ mgmt/cumin/COPYING-for-wsgiserver 2008-06-05 18:22:03 UTC (rev 2133)
@@ -0,0 +1,2 @@
+Copyright (c) 2004-2007, CherryPy Team (team@cherrypy.org)
+All rights reserved.
Added: mgmt/cumin/LICENSE-for-wsgiserver
===================================================================
--- mgmt/cumin/LICENSE-for-wsgiserver (rev 0)
+++ mgmt/cumin/LICENSE-for-wsgiserver 2008-06-05 18:22:03 UTC (rev 2133)
@@ -0,0 +1,22 @@
+Redistribution and use in source and binary forms, with or without modification,
+are permitted provided that the following conditions are met:
+
+ * Redistributions of source code must retain the above copyright notice,
+ this list of conditions and the following disclaimer.
+ * Redistributions in binary form must reproduce the above copyright notice,
+ this list of conditions and the following disclaimer in the documentation
+ and/or other materials provided with the distribution.
+ * Neither the name of the CherryPy Team nor the names of its contributors
+ may be used to endorse or promote products derived from this software
+ without specific prior written permission.
+
+THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" AND
+ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
+WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE
+DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE LIABLE
+FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL
+DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR
+SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER
+CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY,
+OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
+OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
Modified: mgmt/cumin/Makefile
===================================================================
--- mgmt/cumin/Makefile 2008-06-05 12:14:23 UTC (rev 2132)
+++ mgmt/cumin/Makefile 2008-06-05 18:22:03 UTC (rev 2133)
@@ -21,7 +21,7 @@
install -d ${BIN_DIR}
install -pm 0755 bin/* ${BIN_DIR}
install -d ${doc}
- install -pm 0644 LICENSE COPYING ${doc}
+ install -pm 0644 LICENSE* COPYING* ${doc}
install -d ${share}/resources
install -pm 0644 resources/* ${share}/resources
install -d ${etc}