rhmessaging commits: r3455 - mgmt/trunk/wooly/python/wooly.
by rhmessaging-commits@lists.jboss.org
Author: justi9
Date: 2009-06-17 15:33:19 -0400 (Wed, 17 Jun 2009)
New Revision: 3455
Modified:
mgmt/trunk/wooly/python/wooly/__init__.py
Log:
* Add a ListAttribute class for that fairly common case
* Add a 'test' method to widget for testing assertions and reporting
which widget failures come from
* Refactor debug output on page failure; make it less verbose, with
more of the stuff we care about
Modified: mgmt/trunk/wooly/python/wooly/__init__.py
===================================================================
--- mgmt/trunk/wooly/python/wooly/__init__.py 2009-06-17 17:58:35 UTC (rev 3454)
+++ mgmt/trunk/wooly/python/wooly/__init__.py 2009-06-17 19:33:19 UTC (rev 3455)
@@ -59,6 +59,13 @@
def __repr__(self):
return "%s('%s')" % (self.__class__.__name__, self.path)
+class ListAttribute(Attribute):
+ def get_default(self, session):
+ return list()
+
+ def add(self, session, value, key=None):
+ self.get(session).append(value)
+
class Parameter(Attribute):
def __init__(self, app, name):
super(Parameter, self).__init__(app, name)
@@ -114,10 +121,17 @@
self.__main_tmpl = Template(self, "html")
self.__defer_tmpl = Template(self, "deferred_html")
+ def test(self, expr, message=None):
+ if not expr:
+ if not message:
+ message = "Correctness breached"
+
+ raise Exception("[%s] %s" % (self, message))
+
def init(self):
#print "Initializing %s" % str(self)
- assert not self.sealed
+ self.test(not self.sealed)
ancestors = list()
widget = self.parent
@@ -139,7 +153,7 @@
self.page = self.ancestors[-1]
- assert isinstance(self.page, Page)
+ self.test(isinstance(self.page, Page))
self.page.init_widget(self)
@@ -158,21 +172,25 @@
child.init()
def add_child(self, child):
- assert not self.sealed
- assert child is not None
- assert child is not self
+ self.test(not self.sealed)
+ self.test(isinstance(child, Widget))
+ self.test(child is not self)
self.children.append(child)
self.children_by_name[child.name] = child
child.parent = self
def add_attribute(self, attribute):
- assert not self.sealed
+ self.test(not self.sealed)
+ self.test(isinstance(attribute, Attribute))
+
self.attributes.append(attribute)
attribute.widget = self
def add_parameter(self, parameter):
- assert not self.sealed
+ self.test(not self.sealed)
+ self.test(isinstance(parameter, Parameter))
+
self.parameters.append(parameter)
parameter.widget = self
@@ -206,7 +224,7 @@
return str
def show(self, session):
- assert self.parent
+ self.test(self.parent)
self.parent.show_child(session, self)
self.parent.show(session)
@@ -345,8 +363,8 @@
self.add_attribute(self.redirect)
def init(self):
- assert not self.sealed
- assert self.parent is None
+ self.test(not self.sealed)
+ self.test(self.parent is None)
self.ancestors = ()
self.path = ""
@@ -359,15 +377,15 @@
self.sealed = True
def init_widget(self, widget):
- assert not self.sealed
- assert isinstance(widget, Widget)
+ self.test(not self.sealed)
+ self.test(isinstance(widget, Widget))
self.page_widgets.append(widget)
self.page_widgets_by_path[widget.path] = widget
def init_parameter(self, param):
- assert not self.sealed
- assert isinstance(param, Parameter)
+ self.test(not self.sealed)
+ self.test(isinstance(param, Parameter))
self.page_parameters.append(param)
self.page_parameters_by_path[param.path] = param
@@ -497,14 +515,34 @@
self.processed_list = list()
self.rendered_list = list()
- def print_process_calls(self, out=sys.stdout):
+ def write(self, writer):
+ writer.write(str(self.session))
+ writer.write(os.linesep)
+
+ for item in sorted(self.session.values_by_path.items()):
+ writer.write(" %s = %s" % item)
+ writer.write(os.linesep)
+
if self.process_stack:
- self.process_stack[0].write(out)
+ writer.write(os.linesep)
+ writer.write("process trace:")
+ writer.write(os.linesep)
- def print_render_calls(self, out=sys.stdout):
+ for call in self.process_stack:
+ writer.write(" ")
+ call.write(writer)
+ writer.write(os.linesep)
+
if self.render_stack:
- self.render_stack[0].write(out)
+ writer.write(os.linesep)
+ writer.write("render trace:")
+ writer.write(os.linesep)
+ for call in self.render_stack:
+ writer.write(" ")
+ call.write(writer)
+ writer.write(os.linesep)
+
def print_last_call(self, out=sys.stdout):
if self.render_stack:
self.render_stack[-1].write(out)
@@ -871,7 +909,6 @@
self.widget = widget
self.session = session
self.session_values = copy(session.values_by_path)
- self.object = object
self.caller = None
self.callees = list()
@@ -891,36 +928,19 @@
def close(self):
self.end = clock()
- if len(self.stack) > 1:
- self.stack.pop()
+ #if len(self.stack) > 1:
+ self.stack.pop()
def write(self, writer):
writer.write(str(self.widget))
- writer.write(" (call %i, caller %i)" % (id(self), id(self.caller)))
- writer.write(os.linesep)
+ writer.write(" [%i]" % id(self))
- writer.write(" session: " + str(self.session))
- writer.write(os.linesep)
-
- for item in sorted(self.session_values.items()):
- writer.write(" value: %s = %s" % item)
- writer.write(os.linesep)
-
- writer.write(" object: " + str(self.object))
- writer.write(os.linesep)
-
- writer.write(" times: %f, %f" % (self.start, self.end or -1))
- writer.write(os.linesep)
-
- for call in self.callees:
- call.write(writer)
-
class FullPageNotify(Widget):
def __init__(self, app, name):
super(FullPageNotify, self).__init__(app, name)
self.fullpage = Parameter(app, "fullpage")
- self.add_attribute(self.fullpage)
+ self.add_attribute(self.fullpage)
def get_fullpage(self, session):
return self.fullpage.get(session)
16 years, 10 months
rhmessaging commits: r3454 - mgmt/trunk/parsley/python/parsley.
by rhmessaging-commits@lists.jboss.org
Author: justi9
Date: 2009-06-17 13:58:35 -0400 (Wed, 17 Jun 2009)
New Revision: 3454
Modified:
mgmt/trunk/parsley/python/parsley/loggingex.py
Log:
Make this work with python 2.6
Modified: mgmt/trunk/parsley/python/parsley/loggingex.py
===================================================================
--- mgmt/trunk/parsley/python/parsley/loggingex.py 2009-06-17 13:43:53 UTC (rev 3453)
+++ mgmt/trunk/parsley/python/parsley/loggingex.py 2009-06-17 17:58:35 UTC (rev 3454)
@@ -1,11 +1,11 @@
-from logging import *
+import logging
levels = {
- "debug": DEBUG,
- "info": INFO,
- "warn": WARN,
- "error": ERROR,
- "critical": CRITICAL
+ "debug": logging.DEBUG,
+ "info": logging.INFO,
+ "warn": logging.WARN,
+ "error": logging.ERROR,
+ "critical": logging.CRITICAL
}
def enable_logging(name, level, file):
@@ -18,11 +18,11 @@
if type(file) is str:
file = open(file, "a")
- handler = StreamHandler(file)
+ handler = logging.StreamHandler(file)
fmt = "%(process)d %(asctime)s %(levelname)s %(message)s"
- handler.setFormatter(Formatter(fmt))
+ handler.setFormatter(logging.Formatter(fmt))
- log = getLogger(name)
+ log = logging.getLogger(name)
log.addHandler(handler)
log.setLevel(level)
16 years, 10 months
rhmessaging commits: r3453 - mgmt/trunk/wooly/resources.
by rhmessaging-commits@lists.jboss.org
Author: justi9
Date: 2009-06-17 09:43:53 -0400 (Wed, 17 Jun 2009)
New Revision: 3453
Modified:
mgmt/trunk/wooly/resources/wooly.js
Log:
Clean up whitespace; handle no-content updates
Modified: mgmt/trunk/wooly/resources/wooly.js
===================================================================
--- mgmt/trunk/wooly/resources/wooly.js 2009-06-16 20:20:52 UTC (rev 3452)
+++ mgmt/trunk/wooly/resources/wooly.js 2009-06-17 13:43:53 UTC (rev 3453)
@@ -231,7 +231,7 @@
child = child.nextSibling;
}
return null;
- }
+ }
function copyNode(node) {
switch (node.nodeType) {
@@ -325,17 +325,24 @@
if (oldElem) {
var newElem = child.firstChild;
- if (newElem.nodeType != 1) {
+ while (newElem && newElem.nodeType != 1) {
newElem = newElem.nextSibling;
}
- var updateId = newElem.getAttribute("update");
- // only update a sub-block of html
- if (updateId) {
- oldElem = document.getElementById(updateId);
- newElem = xmlGetElementById(newElem, updateId);
+ if (newElem) {
+ // only update a sub-block of html
+
+ var updateId = newElem.getAttribute("update");
+
+ if (updateId) {
+ oldElem = document.getElementById(updateId);
+ newElem = xmlGetElementById(newElem, updateId);
+ }
+
+ replaceNode(newElem, oldElem);
+ } else {
+ oldElem.parentNode.removeChild(oldElem);
}
- replaceNode(newElem, oldElem);
} else {
wooly.log("Element '" + id + "' not found");
}
@@ -406,7 +413,7 @@
this.intervalUpdateInfo.interval = interval;
this.intervalUpdateInfo.passback = passback;
}
-
+
function update() {
try {
if (req.readyState == 4 && req.status == 200) {
@@ -441,7 +448,7 @@
this.restartIntervalUpdate = function (url) {
this.cancelIntervalUpdate();
- this.setIntervalUpdate(url, this.intervalUpdateInfo.callback,
+ this.setIntervalUpdate(url, this.intervalUpdateInfo.callback,
this.intervalUpdateInfo.interval,
this.intervalUpdateInfo.passback);
}
@@ -451,16 +458,16 @@
}
this.resumeIntervalUpdate = function () {
- wooly.setIntervalUpdate(wooly.intervalUpdateInfo.url,
- wooly.intervalUpdateInfo.callback,
+ wooly.setIntervalUpdate(wooly.intervalUpdateInfo.url,
+ wooly.intervalUpdateInfo.callback,
wooly.intervalUpdateInfo.interval,
wooly.intervalUpdateInfo.passback);
}
this.doubleIntervalUpdate = function () {
wooly.cancelIntervalUpdate();
wooly.intervalUpdateInfo.interval *= 2;
- wooly.setIntervalUpdate(wooly.intervalUpdateInfo.url,
- wooly.intervalUpdateInfo.callback,
+ wooly.setIntervalUpdate(wooly.intervalUpdateInfo.url,
+ wooly.intervalUpdateInfo.callback,
wooly.intervalUpdateInfo.interval,
wooly.intervalUpdateInfo.passback);
}
16 years, 10 months
rhmessaging commits: r3452 - in store/trunk/cpp/tests: python_tests and 1 other directory.
by rhmessaging-commits@lists.jboss.org
Author: kpvdr
Date: 2009-06-16 16:20:52 -0400 (Tue, 16 Jun 2009)
New Revision: 3452
Modified:
store/trunk/cpp/tests/python_tests/flow_to_disk.py
store/trunk/cpp/tests/run_python_tests
Log:
New transactional flow-to-disk tests added, also a reorganization of the python flow-to-disk tests. Still a few things to do and tidy up, though.
Modified: store/trunk/cpp/tests/python_tests/flow_to_disk.py
===================================================================
--- store/trunk/cpp/tests/python_tests/flow_to_disk.py 2009-06-11 18:53:52 UTC (rev 3451)
+++ store/trunk/cpp/tests/python_tests/flow_to_disk.py 2009-06-16 20:20:52 UTC (rev 3452)
@@ -28,6 +28,54 @@
class FlowToDiskTests(TestBase010):
"""Tests for async store flow-to-disk"""
+ XA_OK = 0
+ tx_counter = 0
+
+ # --- Helper functions ---
+
+ def _browse(self, qn, dt, am, num_msgs, msg_size, txnConsume):
+ txid = None
+ if txnConsume:
+ txid = self._makeXid("consumer-xid-%s" % qn)
+ self.session.dtx_select()
+ self.assertEqual(self.XA_OK, self.session.dtx_start(xid=txid).status)
+ self.session.message_subscribe(queue=qn, destination=dt, acquire_mode=am)
+ self.session.message_flow(destination=dt, unit=self.session.credit_unit.message, value=0xFFFFFFFF)
+ self.session.message_flow(destination=dt, unit=self.session.credit_unit.byte, value=0xFFFFFFFF)
+ queue = self.session.incoming(dt)
+ ids = RangedSet()
+ for msg_num in range(0, num_msgs):
+ expected_str = self._makeMessage(msg_num, msg_size)
+ msg = queue.get(timeout=5)
+ self.assertEqual(expected_str, msg.body)
+ ids.add(msg.id)
+ return ids, txid
+
+ def _checkCancel(self, qn, dt, num_msgs, ids):
+ self.session.message_release(ids)
+ self.session.queue_declare(queue=qn)
+ self.assertEqual(num_msgs, self.session.queue_query(queue=qn).message_count)
+ self.session.message_cancel(destination=dt)
+
+ def _checkConsume(self, qn, am, num_msgs, ids, txid, txnConsume):
+ if am == self.session.acquire_mode.not_acquired:
+ self.session.queue_declare(queue=qn)
+ self.assertEqual(num_msgs, self.session.queue_query(queue=qn).message_count)
+ response = self.session.message_acquire(ids)
+ for range_ in ids:
+ for msg_id in range_:
+ self.assert_(msg_id in response.transfers)
+ self.session.message_accept(ids)
+ if txnConsume:
+ self.assertEqual(self.XA_OK, self.session.dtx_end(xid=txid).status)
+ self.assertEqual(self.XA_OK, self.session.dtx_prepare(xid=txid).status)
+ self.assertEqual(self.XA_OK, self.session.dtx_commit(xid=txid, one_phase=False).status)
+ self._resetChannel()
+
+ def _checkEmpty(self, qn):
+ self.session.queue_declare(queue=qn)
+ self.assertEqual(0, self.session.queue_query(queue=qn).message_count)
+
def _makeMessage(self, msgCnt, msgSize):
msg = "Message-%04d" % msgCnt
msgLen = len(msg)
@@ -39,149 +87,161 @@
msg += chr(ord('a') + (i % 26))
return msg
- def test_FlowToDisk_01_SimpleMaxCountTransient(self):
- queue_args = {'qpid.policy_type':'flow_to_disk', 'qpid.max_count': 10, 'qpid.max_size': 100000000}
- self.simple_limit("test_FlowToDisk_01_SimpleMaxCountTransient", queue_args, self.session.delivery_mode.non_persistent, self.session.acquire_mode.pre_acquired)
+ def _makeXid(self, txid):
+ self.tx_counter += 1
+ branchqual = "v%s" % self.tx_counter
+ return self.session.xid(format=0, global_id=txid, branch_id=branchqual)
- def test_FlowToDisk_02_SimpleMaxCountPersistent(self):
- queue_args = {'qpid.policy_type':'flow_to_disk', 'qpid.max_count': 10, 'qpid.max_size': 100000000}
- self.simple_limit("test_FlowToDisk_02_SimpleMaxCountPersistent", queue_args, self.session.delivery_mode.persistent, self.session.acquire_mode.pre_acquired)
+ def _produce(self, qn, dm, num_msgs, msg_size, txnProduce):
+ if txnProduce:
+ txid = self._makeXid("producer-xid-%s" % qn)
+ self.session.dtx_select()
+ self.assertEqual(self.XA_OK, self.session.dtx_start(xid=txid).status)
+ for msg_num in range(0, num_msgs):
+ msg_str = self._makeMessage(msg_num, msg_size)
+ self.session.message_transfer(message=Message(self.session.delivery_properties(routing_key=qn, delivery_mode=dm), msg_str))
+ if txnProduce:
+ self.assertEqual(self.XA_OK, self.session.dtx_end(xid=txid).status)
+ self.assertEqual(self.XA_OK, self.session.dtx_prepare(xid=txid).status)
+ self.assertEqual(self.XA_OK, self.session.dtx_commit(xid=txid, one_phase=False).status)
+ self._resetChannel()
- def test_FlowToDisk_03_SimpleMaxSizeTransient(self):
- queue_args = {'qpid.policy_type':'flow_to_disk', 'qpid.max_size': 100}
- self.simple_limit("test_FlowToDisk_03_SimpleMaxSizeTransient", queue_args, self.session.delivery_mode.non_persistent, self.session.acquire_mode.pre_acquired)
+ def _resetChannel(self):
+ self.session.close()
+ self.session = self.conn.session("test-session", 1)
- def test_FlowToDisk_04_SimpleMaxSizePersistent(self):
- queue_args = {'qpid.policy_type':'flow_to_disk', 'qpid.max_size': 100}
- self.simple_limit("test_FlowToDisk_04_SimpleMaxSizePersistent", queue_args, self.session.delivery_mode.persistent, self.session.acquire_mode.pre_acquired)
+ # --- Simple tests ---
- def test_FlowToDisk_05_SimpleMaxCountTransientLargeMsg(self):
- queue_args = {'qpid.policy_type':'flow_to_disk', 'qpid.max_count': 10, 'qpid.max_size': 100000000}
- self.simple_limit("test_FlowToDisk_05_SimpleMaxCountTransientLargeMsg", queue_args, self.session.delivery_mode.non_persistent, self.session.acquire_mode.pre_acquired, 100, 10000)
+ def test_FlowToDisk_00_SimpleMaxCountTransient(self):
+ self.simple_limit("test_FlowToDisk_00_SimpleMaxCountTransient", max_count = 10)
- def test_FlowToDisk_06_SimpleMaxCountPersistentLargeMsg(self):
- queue_args = {'qpid.policy_type':'flow_to_disk', 'qpid.max_count': 10, 'qpid.max_size': 100000000}
- self.simple_limit("test_FlowToDisk_06_SimpleMaxCountPersistentLargeMsg", queue_args, self.session.delivery_mode.persistent, self.session.acquire_mode.pre_acquired, 100, 10000)
+ def test_FlowToDisk_01_SimpleMaxCountPersistent(self):
+ self.simple_limit("test_FlowToDisk_01_SimpleMaxCountPersistent", max_count = 10, persistent = True)
- def test_FlowToDisk_07_SimpleMaxSizeTransientLargeMsg(self):
- queue_args = {'qpid.policy_type':'flow_to_disk', 'qpid.max_size': 100}
- self.simple_limit("test_FlowToDisk_07_SimpleMaxSizeTransientLargeMsg", queue_args, self.session.delivery_mode.non_persistent, self.session.acquire_mode.pre_acquired, 100, 10000)
+ def test_FlowToDisk_02_SimpleMaxSizeTransient(self):
+ self.simple_limit("test_FlowToDisk_02_SimpleMaxSizeTransient", max_size = 100)
- def test_FlowToDisk_08_SimpleMaxSizePersistentLargeMsg(self):
- queue_args = {'qpid.policy_type':'flow_to_disk', 'qpid.max_size': 100}
- self.simple_limit("test_FlowToDisk_08_SimpleMaxSizePersistentLargeMsg", queue_args, self.session.delivery_mode.persistent, self.session.acquire_mode.pre_acquired, 100, 10000)
+ def test_FlowToDisk_03_SimpleMaxSizePersistent(self):
+ self.simple_limit("test_FlowToDisk_03_SimpleMaxSizePersistent", max_size = 100, persistent = True)
- def test_FlowToDisk_09_SimpleMaxCountTransientNotAcquired(self):
- queue_args = {'qpid.policy_type':'flow_to_disk', 'qpid.max_count': 10, 'qpid.max_size': 100000000}
- self.simple_limit("test_FlowToDisk_09_SimpleMaxCountTransientNotAcquired", queue_args, self.session.delivery_mode.non_persistent, self.session.acquire_mode.not_acquired)
+ def test_FlowToDisk_04_SimpleMaxCountTransientLargeMsg(self):
+ self.simple_limit("test_FlowToDisk_04_SimpleMaxCountTransientLargeMsg", max_count = 10, max_size = 10000000, num_msgs = 100, msg_size = 10000)
- def test_FlowToDisk_10_SimpleMaxCountPersistentNotAcquired(self):
- queue_args = {'qpid.policy_type':'flow_to_disk', 'qpid.max_count': 10, 'qpid.max_size': 100000000}
- self.simple_limit("test_FlowToDisk_10_SimpleMaxCountPersistentNotAcquired", queue_args, self.session.delivery_mode.persistent, self.session.acquire_mode.not_acquired)
+ def test_FlowToDisk_05_SimpleMaxCountPersistentLargeMsg(self):
+ self.simple_limit("test_FlowToDisk_05_SimpleMaxCountPersistentLargeMsg", max_count = 10, max_size = 10000000, persistent = True, num_msgs = 100, msg_size = 10000)
- def test_FlowToDisk_11_SimpleMaxSizeTransientNotAcquired(self):
- queue_args = {'qpid.policy_type':'flow_to_disk', 'qpid.max_size': 100}
- self.simple_limit("test_FlowToDisk_11_SimpleMaxSizeTransientNotAcquired", queue_args, self.session.delivery_mode.non_persistent, self.session.acquire_mode.not_acquired)
+ def test_FlowToDisk_06_SimpleMaxSizeTransientLargeMsg(self):
+ self.simple_limit("test_FlowToDisk_06_SimpleMaxSizeTransientLargeMsg", max_size = 100, num_msgs = 100, msg_size = 10000)
- def test_FlowToDisk_12_SimpleMaxSizePersistentNotAcquired(self):
- queue_args = {'qpid.policy_type':'flow_to_disk', 'qpid.max_size': 100}
- self.simple_limit("test_FlowToDisk_12_SimpleMaxSizePersistentNotAcquired", queue_args, self.session.delivery_mode.persistent, self.session.acquire_mode.not_acquired)
+ def test_FlowToDisk_07_SimpleMaxSizePersistentLargeMsg(self):
+ self.simple_limit("test_FlowToDisk_07_SimpleMaxSizePersistentLargeMsg", max_size = 100, persistent = True, num_msgs = 100, msg_size = 10000)
- def test_FlowToDisk_13_SimpleMaxCountTransientNotAcquiredLargeMsg(self):
- queue_args = {'qpid.policy_type':'flow_to_disk', 'qpid.max_count': 10, 'qpid.max_size': 100000000}
- self.simple_limit("test_FlowToDisk_13_SimpleMaxCountTransientNotAcquiredLargeMsg", queue_args, self.session.delivery_mode.non_persistent, self.session.acquire_mode.not_acquired, 100, 10000)
+ def test_FlowToDisk_08_SimpleMaxCountTransientNotAcquired(self):
+ self.simple_limit("test_FlowToDisk_08_SimpleMaxCountTransientNotAcquired", max_count = 10, pre_acquired = False)
- def test_FlowToDisk_14_SimpleMaxCountPersistentNotAcquiredLargeMsg(self):
- queue_args = {'qpid.policy_type':'flow_to_disk', 'qpid.max_count': 10, 'qpid.max_size': 100000000}
- self.simple_limit("test_FlowToDisk_14_SimpleMaxCountPersistentNotAcquiredLargeMsg", queue_args, self.session.delivery_mode.persistent, self.session.acquire_mode.not_acquired, 100, 10000)
+ def test_FlowToDisk_09_SimpleMaxCountPersistentNotAcquired(self):
+ self.simple_limit("test_FlowToDisk_09_SimpleMaxCountPersistentNotAcquired", max_count = 10, persistent = True, pre_acquired = False)
- def test_FlowToDisk_15_SimpleMaxSizeTransientNotAcquiredLargeMsg(self):
- queue_args = {'qpid.policy_type':'flow_to_disk', 'qpid.max_size': 100}
- self.simple_limit("test_FlowToDisk_15_SimpleMaxSizeTransientNotAcquiredLargeMsg", queue_args, self.session.delivery_mode.non_persistent, self.session.acquire_mode.not_acquired, 100, 10000)
+ def test_FlowToDisk_10_SimpleMaxSizeTransientNotAcquired(self):
+ self.simple_limit("test_FlowToDisk_10_SimpleMaxSizeTransientNotAcquired", max_size = 100, pre_acquired = False)
- def test_FlowToDisk_16_SimpleMaxSizePersistentNotAcquiredLargeMsg(self):
- queue_args = {'qpid.policy_type':'flow_to_disk', 'qpid.max_size': 100}
- self.simple_limit("test_FlowToDisk_16_SimpleMaxSizePersistentNotAcquiredLargeMsg", queue_args, self.session.delivery_mode.persistent, self.session.acquire_mode.not_acquired, 100, 10000)
+ def test_FlowToDisk_11_SimpleMaxSizePersistentNotAcquired(self):
+ self.simple_limit("test_FlowToDisk_11_SimpleMaxSizePersistentNotAcquired", max_size = 100, persistent = True, pre_acquired = False)
- def simple_limit(self, queue_name, queue_args, delivery_mode, acquire_mode, num_msgs = 15, msg_size = None):
+ def test_FlowToDisk_12_SimpleMaxCountTransientNotAcquiredLargeMsg(self):
+ self.simple_limit("test_FlowToDisk_12_SimpleMaxCountTransientNotAcquiredLargeMsg", max_count = 10, max_size = 10000000, pre_acquired = False, num_msgs = 100, msg_size = 10000)
+
+ def test_FlowToDisk_13_SimpleMaxCountPersistentNotAcquiredLargeMsg(self):
+ self.simple_limit("test_FlowToDisk_13_SimpleMaxCountPersistentNotAcquiredLargeMsg", max_count = 10, max_size = 10000000, persistent = True, pre_acquired = False, num_msgs = 100, msg_size = 10000)
+
+ def test_FlowToDisk_14_SimpleMaxSizeTransientNotAcquiredLargeMsg(self):
+ self.simple_limit("test_FlowToDisk_14_SimpleMaxSizeTransientNotAcquiredLargeMsg", max_size = 100, pre_acquired = False, num_msgs = 100, msg_size = 10000)
+
+ def test_FlowToDisk_15_SimpleMaxSizePersistentNotAcquiredLargeMsg(self):
+ self.simple_limit("test_FlowToDisk_15_SimpleMaxSizePersistentNotAcquiredLargeMsg", max_size = 100, persistent = True, pre_acquired = False, num_msgs = 100, msg_size = 10000)
+
+ def simple_limit(self, qn, max_count = None, max_size = None, persistent = False, pre_acquired = True, num_msgs = 15, msg_size = None):
+ qa = {'qpid.policy_type':'flow_to_disk'}
+ if max_count != None:
+ qa['qpid.max_count'] = max_count
+ if max_size != None:
+ qa['qpid.max_size'] = max_size
+ if persistent:
+ dm = self.session.delivery_mode.persistent
+ else:
+ dm = self.session.delivery_mode.non_persistent
+ if pre_acquired:
+ am = self.session.acquire_mode.pre_acquired
+ else:
+ am = self.session.acquire_mode.not_acquired
+ # Cycle through the produce/consume block transaction combinations
+ for i in range(0, 4):
+ tp = i & 1 != 0 # Transactional produce
+ tc = i & 2 != 0 # Transactional consume
+ self.tx_simple_limit(qn, qa, dm, am, num_msgs, msg_size, tp, tc)
+
+ def tx_simple_limit(self, qn, qa, dm, am, num_msgs, msg_size, tp, tc):
"""
Test a simple case of message limits which will force flow-to-disk.
- * queue_args sets a limit - either max_count 10 or max_size 100
- * 15 messages of size 10 are added. The last five will flow to disk.
- * Consume 15 messages.
+ * queue_args sets a limit - either max_count and/or max_size
+ * messages are added. Some will flow to disk.
+ * Consume all messages sent.
* Check the broker has no messages left.
"""
-
- session = self.session
- session.queue_declare(queue=queue_name, durable=True, arguments=queue_args)
-
- # Add 15 messages
- for msg_num in range(0, num_msgs):
- msg_str = self._makeMessage(msg_num, msg_size)
- session.message_transfer(message=Message(session.delivery_properties(routing_key=queue_name, delivery_mode=delivery_mode), msg_str))
-
- # Consume/browse 15 messages
- session.message_subscribe(queue=queue_name, destination="tag", acquire_mode=acquire_mode)
- session.message_flow(destination="tag", unit=session.credit_unit.message, value=0xFFFFFFFF)
- session.message_flow(destination="tag", unit=session.credit_unit.byte, value=0xFFFFFFFF)
- queue = session.incoming("tag")
- ids = RangedSet()
- for msg_num in range(0, num_msgs):
- expected_str = self._makeMessage(msg_num, msg_size)
- msg = queue.get(timeout=5)
- self.assertEqual(expected_str, msg.body)
- ids.add(msg.id)
-
- # If not_acquired, chek messages are still on queue, then acquire/accept
- if acquire_mode == self.session.acquire_mode.not_acquired:
- session.queue_declare(queue=queue_name)
- self.assertEqual(num_msgs, session.queue_query(queue=queue_name).message_count)
- response = session.message_acquire(ids)
- for range_ in ids:
- for msg_id in range_:
- self.assert_(msg_id in response.transfers)
- session.message_accept(ids)
+ self.session.queue_declare(queue=qn, durable=True, arguments=qa)
+
+ # --- Add messages ---
+ self._produce(qn, dm, num_msgs, msg_size, tp)
+
+ # --- Browse messages, then consume ---
+ dt = "tag-%d-%d" % (tp, tc)
+ ids, txid = self._browse(qn, dt, am, num_msgs, msg_size, tc)
+ self._checkConsume(qn, am, num_msgs, ids, txid, tc)
+ self._checkEmpty(qn)
- # Check queue is empty
- session.queue_declare(queue=queue_name)
- self.assertEqual(0, session.queue_query(queue=queue_name).message_count)
+ def test_FlowToDisk_50_MaxCountBrowseConsumeTransient(self):
+ self.not_acquired_browse_consume_limit("test_FlowToDisk_50_MaxCountBrowseConsumeTransient", max_count = 10)
- def test_FlowToDisk_17_MaxCountBrowseConsumeTransient(self):
- queue_args = {'qpid.policy_type':'flow_to_disk', 'qpid.max_count': 10, 'qpid.max_size': 100000000}
- self.not_acquired_browse_consume_limit("test_FlowToDisk_17_MaxCountBrowseConsumeTransient", queue_args, self.session.delivery_mode.non_persistent)
+ def test_FlowToDisk_51_MaxCountBrowseConsumePersistent(self):
+ self.not_acquired_browse_consume_limit("test_FlowToDisk_51_MaxCountBrowseConsumePersistent", max_count = 10, persistent = True)
- def test_FlowToDisk_18_MaxCountBrowseConsumePersistent(self):
- queue_args = {'qpid.policy_type':'flow_to_disk', 'qpid.max_count': 10, 'qpid.max_size': 100000000}
- self.not_acquired_browse_consume_limit("test_FlowToDisk_18_MaxCountBrowseConsumePersistent", queue_args, self.session.delivery_mode.persistent)
+ def test_FlowToDisk_52_MaxSizeBrowseConsumeTransient(self):
+ self.not_acquired_browse_consume_limit("test_FlowToDisk_52_MaxSizeBrowseConsumeTransient", max_size = 100)
- def test_FlowToDisk_19_MaxSizeBrowseConsumeTransient(self):
- queue_args = {'qpid.policy_type':'flow_to_disk', 'qpid.max_size': 100}
- self.not_acquired_browse_consume_limit("test_FlowToDisk_19_MaxSizeBrowseConsumeTransient", queue_args, self.session.delivery_mode.non_persistent)
+ def test_FlowToDisk_53_MaxSizeBrowseConsumePersistent(self):
+ self.not_acquired_browse_consume_limit("test_FlowToDisk_53_MaxSizeBrowseConsumePersistent", max_size = 100, persistent = True)
- def test_FlowToDisk_20_MaxSizeBrowseConsumePersistent(self):
- queue_args = {'qpid.policy_type':'flow_to_disk', 'qpid.max_size': 100}
- self.not_acquired_browse_consume_limit("test_FlowToDisk_20_MaxSizeBrowseConsumePersistent", queue_args, self.session.delivery_mode.persistent)
+ def test_FlowToDisk_54_MaxCountBrowseConsumeTransientLargeMsg(self):
+ self.not_acquired_browse_consume_limit("test_FlowToDisk_54_MaxCountBrowseConsumeTransientLargeMsg", max_count = 10, max_size = 10000000, num_msgs = 100, msg_size = 10000)
- def test_FlowToDisk_21_MaxCountBrowseConsumeTransientLargeMsg(self):
- queue_args = {'qpid.policy_type':'flow_to_disk', 'qpid.max_count': 10, 'qpid.max_size': 100000000}
- self.not_acquired_browse_consume_limit("test_FlowToDisk_21_MaxCountBrowseConsumeTransientLargeMsg", queue_args, self.session.delivery_mode.non_persistent, 100, 10000)
+ def test_FlowToDisk_55_MaxCountBrowseConsumePersistentLargeMsg(self):
+ self.not_acquired_browse_consume_limit("test_FlowToDisk_55_MaxCountBrowseConsumePersistentLargeMsg", max_count = 10, max_size = 10000000, persistent = True, num_msgs = 100, msg_size = 10000)
- def test_FlowToDisk_22_MaxCountBrowseConsumePersistentLargeMsg(self):
- queue_args = {'qpid.policy_type':'flow_to_disk', 'qpid.max_count': 10, 'qpid.max_size': 100000000}
- self.not_acquired_browse_consume_limit("test_FlowToDisk_22_MaxCountBrowseConsumePersistentLargeMsg", queue_args, self.session.delivery_mode.persistent, 100, 10000)
+ def test_FlowToDisk_56_MaxSizeBrowseConsumeTransientLargeMsg(self):
+ self.not_acquired_browse_consume_limit("test_FlowToDisk_56_MaxSizeBrowseConsumeTransientLargeMsg", max_size = 100, num_msgs = 100, msg_size = 10000)
- def test_FlowToDisk_23_MaxSizeBrowseConsumeTransientLargeMsg(self):
- queue_args = {'qpid.policy_type':'flow_to_disk', 'qpid.max_size': 100}
- self.not_acquired_browse_consume_limit("test_FlowToDisk_23_MaxSizeBrowseConsumeTransientLargeMsg", queue_args, self.session.delivery_mode.non_persistent, 100, 10000)
+ def test_FlowToDisk_57_MaxSizeBrowseConsumePersistentLargeMsg(self):
+ self.not_acquired_browse_consume_limit("test_FlowToDisk_57_MaxSizeBrowseConsumePersistentLargeMsg", max_size = 100, persistent = True, num_msgs = 100, msg_size = 10000)
- def test_FlowToDisk_24_MaxSizeBrowseConsumePersistentLargeMsg(self):
- queue_args = {'qpid.policy_type':'flow_to_disk', 'qpid.max_size': 100}
- self.not_acquired_browse_consume_limit("test_FlowToDisk_24_MaxSizeBrowseConsumePersistentLargeMsg", queue_args, self.session.delivery_mode.persistent, 100, 10000)
+ def not_acquired_browse_consume_limit(self, qn, max_count = None, max_size = None, persistent = False, num_msgs = 15, msg_size = None):
+ qa = {'qpid.policy_type':'flow_to_disk'}
+ if max_count != None:
+ qa['qpid.max_count'] = max_count
+ if max_size != None:
+ qa['qpid.max_size'] = max_size
+ if persistent:
+ dm = self.session.delivery_mode.persistent
+ else:
+ dm = self.session.delivery_mode.non_persistent
+ # Cycle through the produce/consume block transaction combinations
+ for i in range(0, 4):
+ tp = i & 1 != 0 # Transactional produce
+ tc = i & 2 != 0 # Transactional consume
+ self.tx_not_acquired_browse_consume_limit(qn, qa, dm, num_msgs, msg_size, tp, tc)
-
- def not_acquired_browse_consume_limit(self, queue_name, queue_args, delivery_mode, num_msgs = 15, msg_size = None):
+ def tx_not_acquired_browse_consume_limit(self, qn, qa, dm, num_msgs, msg_size, tp, tc):
"""
Test to check browsing then subsequent consumption of flow-to-disk messages.
* 15 messages of size 10 are added. The last five will flow to disk.
@@ -190,46 +250,20 @@
* Consumes 15 messages
* Checks the broker has no messages left.
"""
-
- session = self.session
- session.queue_declare(queue=queue_name, durable=True, arguments=queue_args)
-
+
+ self.session.queue_declare(queue=qn, durable=True, arguments=qa)
+ am = self.session.acquire_mode.not_acquired
+
# Add 15 messages
- for msg_num in range(0, num_msgs):
- msg_str = self._makeMessage(msg_num, msg_size)
- session.message_transfer(message=Message(session.delivery_properties(routing_key=queue_name, delivery_mode=delivery_mode), msg_str))
-
- # Browse 15 messages
- session.message_subscribe(queue=queue_name, destination="tagA", acquire_mode=session.acquire_mode.not_acquired)
- session.message_flow(destination="tagA", unit=session.credit_unit.message, value=0xFFFFFFFF)
- session.message_flow(destination="tagA", unit=session.credit_unit.byte, value=0xFFFFFFFF)
- queue = session.incoming("tagA")
- ids = RangedSet()
- for msg_num in range(0, num_msgs):
- expected_str = self._makeMessage(msg_num, msg_size)
- msg = queue.get(timeout=5)
- self.assertEqual(expected_str, msg.body)
- ids.add(msg.id)
-
- # Release all 15 messages and close
- session.message_release(ids)
- session.queue_declare(queue=queue_name)
- self.assertEqual(num_msgs, session.queue_query(queue=queue_name).message_count)
-
- # Cancel subscription, start new one that consumes
- session.message_cancel(destination="tagA")
- session.message_subscribe(queue=queue_name, destination="tagB", acquire_mode=session.acquire_mode.pre_acquired)
- session.message_flow(destination="tagB", unit=session.credit_unit.message, value=0xFFFFFFFF)
- session.message_flow(destination="tagB", unit=session.credit_unit.byte, value=0xFFFFFFFF)
- queue = session.incoming("tagB")
- ids = RangedSet()
- for msg_num in range(0, num_msgs):
- expected_str = self._makeMessage(msg_num, msg_size)
- msg = queue.get(timeout=5)
- self.assertEqual(expected_str, msg.body)
- ids.add(msg.id)
- session.message_accept(ids)
-
- # Check queue is empty
- session.queue_declare(queue=queue_name)
- self.assertEqual(0, session.queue_query(queue=queue_name).message_count)
+ self._produce(qn, dm, num_msgs, msg_size, tp)
+
+ # Browse 15 messages, then release and close
+ dtA = "tagA-%d-%d" % (tp, tc)
+ ids, txid = self._browse(qn, dtA, am, num_msgs, msg_size, False)
+ self._checkCancel(qn, dtA, num_msgs, ids)
+
+ # --- Browse messages, then consume ---
+ dtB = "tagB-%d-%d" % (tp, tc)
+ ids, txid = self._browse(qn, dtB, am, num_msgs, msg_size, tc)
+ self._checkConsume(qn, am, num_msgs, ids, txid, tc)
+ self._checkEmpty(qn)
Modified: store/trunk/cpp/tests/run_python_tests
===================================================================
--- store/trunk/cpp/tests/run_python_tests 2009-06-11 18:53:52 UTC (rev 3451)
+++ store/trunk/cpp/tests/run_python_tests 2009-06-16 20:20:52 UTC (rev 3452)
@@ -56,7 +56,7 @@
exit
fi
-BROKER_OPTS="--no-module-dir --load-module=${LIBSTORE} --data-dir=${TMP_STORE_DIR} --auth=no"
+BROKER_OPTS="--no-module-dir --load-module=${LIBSTORE} --data-dir=${TMP_STORE_DIR} --auth=no --log-enable info+ --log-to-file ${TMP_STORE_DIR}/broker.python-test.log"
AMQP_SPEC=0-10-errata
#Make sure temp dir exists if this is the first to use it
16 years, 10 months
rhmessaging commits: r3451 - in store/trunk/cpp: tests/cluster and 1 other directories.
by rhmessaging-commits@lists.jboss.org
Author: kpvdr
Date: 2009-06-11 14:53:52 -0400 (Thu, 11 Jun 2009)
New Revision: 3451
Modified:
store/trunk/cpp/lib/JournalImpl.cpp
store/trunk/cpp/lib/JournalImpl.h
store/trunk/cpp/lib/MessageStoreImpl.cpp
store/trunk/cpp/tests/cluster/run_cluster_tests
store/trunk/cpp/tests/python_tests/flow_to_disk.py
Log:
Fix for BZ505274 - "Large durable messages that 'flow to disk', are not recovered correctly".
Modified: store/trunk/cpp/lib/JournalImpl.cpp
===================================================================
--- store/trunk/cpp/lib/JournalImpl.cpp 2009-06-11 16:00:38 UTC (rev 3450)
+++ store/trunk/cpp/lib/JournalImpl.cpp 2009-06-11 18:53:52 UTC (rev 3451)
@@ -253,7 +253,7 @@
#define MAX_AIO_SLEEPS 1000 // 10 sec
#define AIO_SLEEP_TIME 10000 // 10 ms
bool
-JournalImpl::loadMsgContent(u_int64_t rid, std::string& data, size_t length)
+JournalImpl::loadMsgContent(u_int64_t rid, std::string& data, size_t length, size_t offset)
{
if (_dtok.rid() != rid)
{
@@ -321,13 +321,14 @@
throw jexception(journal::jerrno::JERR__RECNFOUND, ss.str().c_str(), "JournalImpl", "loadMsgContent");
}
}
- if (_external)
- return false;
- u_int32_t offset = qpid::framing::Buffer(static_cast<char*>(_datap), sizeof(u_int32_t)).getLong() + sizeof(u_int32_t);
- if (offset + length > _dlen) {
- data.append((const char*)_datap + offset, _dlen - offset);
+
+ if (_external) return false;
+
+ u_int32_t hdr_offs = qpid::framing::Buffer(static_cast<char*>(_datap), sizeof(u_int32_t)).getLong() + sizeof(u_int32_t);
+ if (hdr_offs + offset + length > _dlen) {
+ data.append((const char*)_datap + hdr_offs + offset, _dlen - hdr_offs - offset);
} else {
- data.append((const char*)_datap + offset, length);
+ data.append((const char*)_datap + hdr_offs + offset, length);
}
return true;
}
Modified: store/trunk/cpp/lib/JournalImpl.h
===================================================================
--- store/trunk/cpp/lib/JournalImpl.h 2009-06-11 16:00:38 UTC (rev 3450)
+++ store/trunk/cpp/lib/JournalImpl.h 2009-06-11 18:53:52 UTC (rev 3451)
@@ -151,7 +151,7 @@
// Temporary fn to read and save last msg read from journal so it can be assigned
// in chunks. To be replaced when coding to do this direct from the journal is ready.
// Returns true if the record is extern, false if local.
- bool loadMsgContent(u_int64_t rid, std::string& data, size_t length);
+ bool loadMsgContent(u_int64_t rid, std::string& data, size_t length, size_t offset = 0);
// Overrides for write inactivity timer
void enqueue_data_record(const void* const data_buff, const size_t tot_data_len,
Modified: store/trunk/cpp/lib/MessageStoreImpl.cpp
===================================================================
--- store/trunk/cpp/lib/MessageStoreImpl.cpp 2009-06-11 16:00:38 UTC (rev 3450)
+++ store/trunk/cpp/lib/MessageStoreImpl.cpp 2009-06-11 18:53:52 UTC (rev 3451)
@@ -1335,7 +1335,7 @@
try {
JournalImpl* jc = static_cast<JournalImpl*>(queue.getExternalQueueStore());
if (jc && jc->is_enqueued(messageId) ) {
- if (jc->loadMsgContent(messageId, data, length)) {
+ if (jc->loadMsgContent(messageId, data, length, offset)) {
return;
}
}
Modified: store/trunk/cpp/tests/cluster/run_cluster_tests
===================================================================
--- store/trunk/cpp/tests/cluster/run_cluster_tests 2009-06-11 16:00:38 UTC (rev 3450)
+++ store/trunk/cpp/tests/cluster/run_cluster_tests 2009-06-11 18:53:52 UTC (rev 3451)
@@ -61,7 +61,7 @@
export SENDER_EXEC=${QPID_DIR}/cpp/src/tests/sender
else
# Path from known installed locations
- CLUSTER_PATH=/usr/libexec/qpid/tests/cluster_test
+ CLUSTER_PATH=/usr/libexec/qpid/tests/${CPP_CLUSTER_EXEC}
if test -z ${CLUSTER_PATH} ; then
echo "No executable \"${CPP_CLUSTER_EXEC}\" found in path"
exit 1
@@ -73,8 +73,8 @@
export CLUSTER_LIB=/usr/lib/qpid/daemon/cluster.so
export QPID_CONFIG_EXEC=/usr/bin/qpid-config
export QPID_ROUTE_EXEC=/usr/bin/qpid-route
- export RECEIVER_EXEC=/usr/libexec/qpid/test/receiver
- export SENDER_EXEC=/usr/libexec/qpid/test/sender
+ export RECEIVER_EXEC=/usr/libexec/qpid/tests/receiver
+ export SENDER_EXEC=/usr/libexec/qpid/tests/sender
fi
export STORE_LIB=${abs_srcdir}/../../lib/.libs/msgstore.so
@@ -99,7 +99,7 @@
Unable to load python qpid module - skipping python tests.
- PYTHONPATH=${PYTHONPATH}"
+ PYTHONPATH=${PYTHONPATH}
===========================================================
@@ -114,8 +114,8 @@
mkdir -p ${TMP_STORE_DIR}/cluster
else
# Delete old cluster test dirs
- rm -rf "${TMP_STORE_DIR}/cluster"
- mkdir -p "${TMP_STORE_DIR}/cluster"
+ rm -rf ${TMP_STORE_DIR}/cluster
+ mkdir -p ${TMP_STORE_DIR}/cluster
fi
export TMP_STORE_DIR
Modified: store/trunk/cpp/tests/python_tests/flow_to_disk.py
===================================================================
--- store/trunk/cpp/tests/python_tests/flow_to_disk.py 2009-06-11 16:00:38 UTC (rev 3450)
+++ store/trunk/cpp/tests/python_tests/flow_to_disk.py 2009-06-11 18:53:52 UTC (rev 3451)
@@ -28,12 +28,23 @@
class FlowToDiskTests(TestBase010):
"""Tests for async store flow-to-disk"""
+ def _makeMessage(self, msgCnt, msgSize):
+ msg = "Message-%04d" % msgCnt
+ msgLen = len(msg)
+ if msgSize != None and msgSize > msgLen:
+ for i in range(msgLen, msgSize):
+ if i == msgLen:
+ msg += "-"
+ else:
+ msg += chr(ord('a') + (i % 26))
+ return msg
+
def test_FlowToDisk_01_SimpleMaxCountTransient(self):
- queue_args = {'qpid.policy_type':'flow_to_disk', 'qpid.max_count': 10}
+ queue_args = {'qpid.policy_type':'flow_to_disk', 'qpid.max_count': 10, 'qpid.max_size': 100000000}
self.simple_limit("test_FlowToDisk_01_SimpleMaxCountTransient", queue_args, self.session.delivery_mode.non_persistent, self.session.acquire_mode.pre_acquired)
def test_FlowToDisk_02_SimpleMaxCountPersistent(self):
- queue_args = {'qpid.policy_type':'flow_to_disk', 'qpid.max_count': 10}
+ queue_args = {'qpid.policy_type':'flow_to_disk', 'qpid.max_count': 10, 'qpid.max_size': 100000000}
self.simple_limit("test_FlowToDisk_02_SimpleMaxCountPersistent", queue_args, self.session.delivery_mode.persistent, self.session.acquire_mode.pre_acquired)
def test_FlowToDisk_03_SimpleMaxSizeTransient(self):
@@ -44,23 +55,55 @@
queue_args = {'qpid.policy_type':'flow_to_disk', 'qpid.max_size': 100}
self.simple_limit("test_FlowToDisk_04_SimpleMaxSizePersistent", queue_args, self.session.delivery_mode.persistent, self.session.acquire_mode.pre_acquired)
- def test_FlowToDisk_05_SimpleMaxCountTransientNotAcquired(self):
- queue_args = {'qpid.policy_type':'flow_to_disk', 'qpid.max_count': 10}
- self.simple_limit("test_FlowToDisk_05_SimpleMaxCountTransientNotAcquired", queue_args, self.session.delivery_mode.non_persistent, self.session.acquire_mode.not_acquired)
+ def test_FlowToDisk_05_SimpleMaxCountTransientLargeMsg(self):
+ queue_args = {'qpid.policy_type':'flow_to_disk', 'qpid.max_count': 10, 'qpid.max_size': 100000000}
+ self.simple_limit("test_FlowToDisk_05_SimpleMaxCountTransientLargeMsg", queue_args, self.session.delivery_mode.non_persistent, self.session.acquire_mode.pre_acquired, 100, 10000)
- def test_FlowToDisk_06_SimpleMaxCountPersistentNotAcquired(self):
- queue_args = {'qpid.policy_type':'flow_to_disk', 'qpid.max_count': 10}
- self.simple_limit("test_FlowToDisk_06_SimpleMaxCountPersistentNotAcquired", queue_args, self.session.delivery_mode.persistent, self.session.acquire_mode.not_acquired)
+ def test_FlowToDisk_06_SimpleMaxCountPersistentLargeMsg(self):
+ queue_args = {'qpid.policy_type':'flow_to_disk', 'qpid.max_count': 10, 'qpid.max_size': 100000000}
+ self.simple_limit("test_FlowToDisk_06_SimpleMaxCountPersistentLargeMsg", queue_args, self.session.delivery_mode.persistent, self.session.acquire_mode.pre_acquired, 100, 10000)
- def test_FlowToDisk_07_SimpleMaxSizeTransientNotAcquired(self):
+ def test_FlowToDisk_07_SimpleMaxSizeTransientLargeMsg(self):
queue_args = {'qpid.policy_type':'flow_to_disk', 'qpid.max_size': 100}
- self.simple_limit("test_FlowToDisk_07_SimpleMaxSizeTransientNotAcquired", queue_args, self.session.delivery_mode.non_persistent, self.session.acquire_mode.not_acquired)
+ self.simple_limit("test_FlowToDisk_07_SimpleMaxSizeTransientLargeMsg", queue_args, self.session.delivery_mode.non_persistent, self.session.acquire_mode.pre_acquired, 100, 10000)
- def test_FlowToDisk_08_SimpleMaxSizePersistentNotAcquired(self):
+ def test_FlowToDisk_08_SimpleMaxSizePersistentLargeMsg(self):
queue_args = {'qpid.policy_type':'flow_to_disk', 'qpid.max_size': 100}
- self.simple_limit("test_FlowToDisk_08_SimpleMaxSizePersistentNotAcquired", queue_args, self.session.delivery_mode.persistent, self.session.acquire_mode.not_acquired)
+ self.simple_limit("test_FlowToDisk_08_SimpleMaxSizePersistentLargeMsg", queue_args, self.session.delivery_mode.persistent, self.session.acquire_mode.pre_acquired, 100, 10000)
- def simple_limit(self, queue_name, queue_args, delivery_mode, acquire_mode):
+ def test_FlowToDisk_09_SimpleMaxCountTransientNotAcquired(self):
+ queue_args = {'qpid.policy_type':'flow_to_disk', 'qpid.max_count': 10, 'qpid.max_size': 100000000}
+ self.simple_limit("test_FlowToDisk_09_SimpleMaxCountTransientNotAcquired", queue_args, self.session.delivery_mode.non_persistent, self.session.acquire_mode.not_acquired)
+
+ def test_FlowToDisk_10_SimpleMaxCountPersistentNotAcquired(self):
+ queue_args = {'qpid.policy_type':'flow_to_disk', 'qpid.max_count': 10, 'qpid.max_size': 100000000}
+ self.simple_limit("test_FlowToDisk_10_SimpleMaxCountPersistentNotAcquired", queue_args, self.session.delivery_mode.persistent, self.session.acquire_mode.not_acquired)
+
+ def test_FlowToDisk_11_SimpleMaxSizeTransientNotAcquired(self):
+ queue_args = {'qpid.policy_type':'flow_to_disk', 'qpid.max_size': 100}
+ self.simple_limit("test_FlowToDisk_11_SimpleMaxSizeTransientNotAcquired", queue_args, self.session.delivery_mode.non_persistent, self.session.acquire_mode.not_acquired)
+
+ def test_FlowToDisk_12_SimpleMaxSizePersistentNotAcquired(self):
+ queue_args = {'qpid.policy_type':'flow_to_disk', 'qpid.max_size': 100}
+ self.simple_limit("test_FlowToDisk_12_SimpleMaxSizePersistentNotAcquired", queue_args, self.session.delivery_mode.persistent, self.session.acquire_mode.not_acquired)
+
+ def test_FlowToDisk_13_SimpleMaxCountTransientNotAcquiredLargeMsg(self):
+ queue_args = {'qpid.policy_type':'flow_to_disk', 'qpid.max_count': 10, 'qpid.max_size': 100000000}
+ self.simple_limit("test_FlowToDisk_13_SimpleMaxCountTransientNotAcquiredLargeMsg", queue_args, self.session.delivery_mode.non_persistent, self.session.acquire_mode.not_acquired, 100, 10000)
+
+ def test_FlowToDisk_14_SimpleMaxCountPersistentNotAcquiredLargeMsg(self):
+ queue_args = {'qpid.policy_type':'flow_to_disk', 'qpid.max_count': 10, 'qpid.max_size': 100000000}
+ self.simple_limit("test_FlowToDisk_14_SimpleMaxCountPersistentNotAcquiredLargeMsg", queue_args, self.session.delivery_mode.persistent, self.session.acquire_mode.not_acquired, 100, 10000)
+
+ def test_FlowToDisk_15_SimpleMaxSizeTransientNotAcquiredLargeMsg(self):
+ queue_args = {'qpid.policy_type':'flow_to_disk', 'qpid.max_size': 100}
+ self.simple_limit("test_FlowToDisk_15_SimpleMaxSizeTransientNotAcquiredLargeMsg", queue_args, self.session.delivery_mode.non_persistent, self.session.acquire_mode.not_acquired, 100, 10000)
+
+ def test_FlowToDisk_16_SimpleMaxSizePersistentNotAcquiredLargeMsg(self):
+ queue_args = {'qpid.policy_type':'flow_to_disk', 'qpid.max_size': 100}
+ self.simple_limit("test_FlowToDisk_16_SimpleMaxSizePersistentNotAcquiredLargeMsg", queue_args, self.session.delivery_mode.persistent, self.session.acquire_mode.not_acquired, 100, 10000)
+
+ def simple_limit(self, queue_name, queue_args, delivery_mode, acquire_mode, num_msgs = 15, msg_size = None):
"""
Test a simple case of message limits which will force flow-to-disk.
* queue_args sets a limit - either max_count 10 or max_size 100
@@ -73,8 +116,8 @@
session.queue_declare(queue=queue_name, durable=True, arguments=queue_args)
# Add 15 messages
- for msg_num in range(0, 15):
- msg_str = "Message %02d" % msg_num
+ for msg_num in range(0, num_msgs):
+ msg_str = self._makeMessage(msg_num, msg_size)
session.message_transfer(message=Message(session.delivery_properties(routing_key=queue_name, delivery_mode=delivery_mode), msg_str))
# Consume/browse 15 messages
@@ -83,8 +126,8 @@
session.message_flow(destination="tag", unit=session.credit_unit.byte, value=0xFFFFFFFF)
queue = session.incoming("tag")
ids = RangedSet()
- for msg_num in range(0, 15):
- expected_str = "Message %02d" % msg_num
+ for msg_num in range(0, num_msgs):
+ expected_str = self._makeMessage(msg_num, msg_size)
msg = queue.get(timeout=5)
self.assertEqual(expected_str, msg.body)
ids.add(msg.id)
@@ -92,36 +135,53 @@
# If not_acquired, check messages are still on queue, then acquire/accept
if acquire_mode == self.session.acquire_mode.not_acquired:
session.queue_declare(queue=queue_name)
- self.assertEqual(15, session.queue_query(queue=queue_name).message_count)
+ self.assertEqual(num_msgs, session.queue_query(queue=queue_name).message_count)
response = session.message_acquire(ids)
for range_ in ids:
for msg_id in range_:
self.assert_(msg_id in response.transfers)
- session.message_accept(ids)
+
+ session.message_accept(ids)
# Check queue is empty
session.queue_declare(queue=queue_name)
self.assertEqual(0, session.queue_query(queue=queue_name).message_count)
- def test_FlowToDisk_09_MaxCountBrowseConsumeTransient(self):
- queue_args = {'qpid.policy_type':'flow_to_disk', 'qpid.max_count': 10}
- self.not_acquired_browse_consume_limit("test_FlowToDisk_09_MaxCountBrowseConsumeTransient", queue_args, self.session.delivery_mode.non_persistent)
+ def test_FlowToDisk_17_MaxCountBrowseConsumeTransient(self):
+ queue_args = {'qpid.policy_type':'flow_to_disk', 'qpid.max_count': 10, 'qpid.max_size': 100000000}
+ self.not_acquired_browse_consume_limit("test_FlowToDisk_17_MaxCountBrowseConsumeTransient", queue_args, self.session.delivery_mode.non_persistent)
- def test_FlowToDisk_10_MaxCountBrowseConsumePersistent(self):
- queue_args = {'qpid.policy_type':'flow_to_disk', 'qpid.max_count': 10}
- self.not_acquired_browse_consume_limit("test_FlowToDisk_10_MaxCountBrowseConsumePersistent", queue_args, self.session.delivery_mode.persistent)
+ def test_FlowToDisk_18_MaxCountBrowseConsumePersistent(self):
+ queue_args = {'qpid.policy_type':'flow_to_disk', 'qpid.max_count': 10, 'qpid.max_size': 100000000}
+ self.not_acquired_browse_consume_limit("test_FlowToDisk_18_MaxCountBrowseConsumePersistent", queue_args, self.session.delivery_mode.persistent)
- def test_FlowToDisk_11_MaxSizeBrowseConsumeTransient(self):
+ def test_FlowToDisk_19_MaxSizeBrowseConsumeTransient(self):
queue_args = {'qpid.policy_type':'flow_to_disk', 'qpid.max_size': 100}
- self.not_acquired_browse_consume_limit("test_FlowToDisk_11_MaxSizeBrowseConsumeTransient", queue_args, self.session.delivery_mode.non_persistent)
+ self.not_acquired_browse_consume_limit("test_FlowToDisk_19_MaxSizeBrowseConsumeTransient", queue_args, self.session.delivery_mode.non_persistent)
- def test_FlowToDisk_12_MaxSizeBrowseConsumePersistent(self):
+ def test_FlowToDisk_20_MaxSizeBrowseConsumePersistent(self):
queue_args = {'qpid.policy_type':'flow_to_disk', 'qpid.max_size': 100}
- self.not_acquired_browse_consume_limit("test_FlowToDisk_12_MaxSizeBrowseConsumePersistent", queue_args, self.session.delivery_mode.persistent)
+ self.not_acquired_browse_consume_limit("test_FlowToDisk_20_MaxSizeBrowseConsumePersistent", queue_args, self.session.delivery_mode.persistent)
+
+ def test_FlowToDisk_21_MaxCountBrowseConsumeTransientLargeMsg(self):
+ queue_args = {'qpid.policy_type':'flow_to_disk', 'qpid.max_count': 10, 'qpid.max_size': 100000000}
+ self.not_acquired_browse_consume_limit("test_FlowToDisk_21_MaxCountBrowseConsumeTransientLargeMsg", queue_args, self.session.delivery_mode.non_persistent, 100, 10000)
+
+ def test_FlowToDisk_22_MaxCountBrowseConsumePersistentLargeMsg(self):
+ queue_args = {'qpid.policy_type':'flow_to_disk', 'qpid.max_count': 10, 'qpid.max_size': 100000000}
+ self.not_acquired_browse_consume_limit("test_FlowToDisk_22_MaxCountBrowseConsumePersistentLargeMsg", queue_args, self.session.delivery_mode.persistent, 100, 10000)
+
+ def test_FlowToDisk_23_MaxSizeBrowseConsumeTransientLargeMsg(self):
+ queue_args = {'qpid.policy_type':'flow_to_disk', 'qpid.max_size': 100}
+ self.not_acquired_browse_consume_limit("test_FlowToDisk_23_MaxSizeBrowseConsumeTransientLargeMsg", queue_args, self.session.delivery_mode.non_persistent, 100, 10000)
+
+ def test_FlowToDisk_24_MaxSizeBrowseConsumePersistentLargeMsg(self):
+ queue_args = {'qpid.policy_type':'flow_to_disk', 'qpid.max_size': 100}
+ self.not_acquired_browse_consume_limit("test_FlowToDisk_24_MaxSizeBrowseConsumePersistentLargeMsg", queue_args, self.session.delivery_mode.persistent, 100, 10000)
- def not_acquired_browse_consume_limit(self, queue_name, queue_args, delivery_mode):
+ def not_acquired_browse_consume_limit(self, queue_name, queue_args, delivery_mode, num_msgs = 15, msg_size = None):
"""
Test to check browsing then subsequent consumption of flow-to-disk messages.
* 15 messages of size 10 are added. The last five will flow to disk.
@@ -135,8 +195,8 @@
session.queue_declare(queue=queue_name, durable=True, arguments=queue_args)
# Add 15 messages
- for msg_num in range(0, 15):
- msg_str = "Message %02d" % msg_num
+ for msg_num in range(0, num_msgs):
+ msg_str = self._makeMessage(msg_num, msg_size)
session.message_transfer(message=Message(session.delivery_properties(routing_key=queue_name, delivery_mode=delivery_mode), msg_str))
# Browse 15 messages
@@ -145,8 +205,8 @@
session.message_flow(destination="tagA", unit=session.credit_unit.byte, value=0xFFFFFFFF)
queue = session.incoming("tagA")
ids = RangedSet()
- for msg_num in range(0, 15):
- expected_str = "Message %02d" % msg_num
+ for msg_num in range(0, num_msgs):
+ expected_str = self._makeMessage(msg_num, msg_size)
msg = queue.get(timeout=5)
self.assertEqual(expected_str, msg.body)
ids.add(msg.id)
@@ -154,7 +214,7 @@
# Release all 15 messages and close
session.message_release(ids)
session.queue_declare(queue=queue_name)
- self.assertEqual(15, session.queue_query(queue=queue_name).message_count)
+ self.assertEqual(num_msgs, session.queue_query(queue=queue_name).message_count)
# Cancel subscription, start new one that consumes
session.message_cancel(destination="tagA")
@@ -162,10 +222,13 @@
session.message_flow(destination="tagB", unit=session.credit_unit.message, value=0xFFFFFFFF)
session.message_flow(destination="tagB", unit=session.credit_unit.byte, value=0xFFFFFFFF)
queue = session.incoming("tagB")
- for msg_num in range(0, 15):
- expected_str = "Message %02d" % msg_num
+ ids = RangedSet()
+ for msg_num in range(0, num_msgs):
+ expected_str = self._makeMessage(msg_num, msg_size)
msg = queue.get(timeout=5)
self.assertEqual(expected_str, msg.body)
+ ids.add(msg.id)
+ session.message_accept(ids)
# Check queue is empty
session.queue_declare(queue=queue_name)
16 years, 10 months
rhmessaging commits: r3450 - mgmt/trunk/cumin/python/cumin/grid.
by rhmessaging-commits@lists.jboss.org
Author: eallen
Date: 2009-06-11 12:00:38 -0400 (Thu, 11 Jun 2009)
New Revision: 3450
Modified:
mgmt/trunk/cumin/python/cumin/grid/pool.strings
Log:
Move Job and Slot graphs under Pool stats.
Modified: mgmt/trunk/cumin/python/cumin/grid/pool.strings
===================================================================
--- mgmt/trunk/cumin/python/cumin/grid/pool.strings 2009-06-11 15:55:15 UTC (rev 3449)
+++ mgmt/trunk/cumin/python/cumin/grid/pool.strings 2009-06-11 16:00:38 UTC (rev 3450)
@@ -59,12 +59,12 @@
<div class="col1">
<h2>Stats for Pool {pool_name}</h2>
{slot_stats}
- {grid_stats}
+ <div>{jobs}</div>
+ <div>{slots}</div>
</div>
<div class="col2">
<div>{pool_slot_map}</div>
- <div>{jobs}</div>
- <div>{slots}</div>
+ {grid_stats}
</div>
</div>
<div style="clear:left;"><!-- --></div>
16 years, 10 months
rhmessaging commits: r3449 - store/trunk/cpp/lib/gen/qmf/com/redhat/rhm/store.
by rhmessaging-commits@lists.jboss.org
Author: tedross
Date: 2009-06-11 11:55:15 -0400 (Thu, 11 Jun 2009)
New Revision: 3449
Modified:
store/trunk/cpp/lib/gen/qmf/com/redhat/rhm/store/EventCreated.cpp
store/trunk/cpp/lib/gen/qmf/com/redhat/rhm/store/EventEnqThresholdExceeded.cpp
store/trunk/cpp/lib/gen/qmf/com/redhat/rhm/store/EventFull.cpp
store/trunk/cpp/lib/gen/qmf/com/redhat/rhm/store/EventRecovered.cpp
store/trunk/cpp/lib/gen/qmf/com/redhat/rhm/store/Journal.cpp
store/trunk/cpp/lib/gen/qmf/com/redhat/rhm/store/Store.cpp
Log:
Updated qmf-generated files
Modified: store/trunk/cpp/lib/gen/qmf/com/redhat/rhm/store/EventCreated.cpp
===================================================================
--- store/trunk/cpp/lib/gen/qmf/com/redhat/rhm/store/EventCreated.cpp 2009-06-11 15:50:49 UTC (rev 3448)
+++ store/trunk/cpp/lib/gen/qmf/com/redhat/rhm/store/EventCreated.cpp 2009-06-11 15:55:15 UTC (rev 3449)
@@ -70,6 +70,7 @@
buf.putShortString (packageName); // Package Name
buf.putShortString (eventName); // Event Name
buf.putBin128 (md5Sum); // Schema Hash
+ buf.putOctet (0); // No Superclass
buf.putShort (3); // Argument Count
// Arguments
Modified: store/trunk/cpp/lib/gen/qmf/com/redhat/rhm/store/EventEnqThresholdExceeded.cpp
===================================================================
--- store/trunk/cpp/lib/gen/qmf/com/redhat/rhm/store/EventEnqThresholdExceeded.cpp 2009-06-11 15:50:49 UTC (rev 3448)
+++ store/trunk/cpp/lib/gen/qmf/com/redhat/rhm/store/EventEnqThresholdExceeded.cpp 2009-06-11 15:55:15 UTC (rev 3449)
@@ -68,6 +68,7 @@
buf.putShortString (packageName); // Package Name
buf.putShortString (eventName); // Event Name
buf.putBin128 (md5Sum); // Schema Hash
+ buf.putOctet (0); // No Superclass
buf.putShort (2); // Argument Count
// Arguments
Modified: store/trunk/cpp/lib/gen/qmf/com/redhat/rhm/store/EventFull.cpp
===================================================================
--- store/trunk/cpp/lib/gen/qmf/com/redhat/rhm/store/EventFull.cpp 2009-06-11 15:50:49 UTC (rev 3448)
+++ store/trunk/cpp/lib/gen/qmf/com/redhat/rhm/store/EventFull.cpp 2009-06-11 15:55:15 UTC (rev 3449)
@@ -68,6 +68,7 @@
buf.putShortString (packageName); // Package Name
buf.putShortString (eventName); // Event Name
buf.putBin128 (md5Sum); // Schema Hash
+ buf.putOctet (0); // No Superclass
buf.putShort (2); // Argument Count
// Arguments
Modified: store/trunk/cpp/lib/gen/qmf/com/redhat/rhm/store/EventRecovered.cpp
===================================================================
--- store/trunk/cpp/lib/gen/qmf/com/redhat/rhm/store/EventRecovered.cpp 2009-06-11 15:50:49 UTC (rev 3448)
+++ store/trunk/cpp/lib/gen/qmf/com/redhat/rhm/store/EventRecovered.cpp 2009-06-11 15:55:15 UTC (rev 3449)
@@ -78,6 +78,7 @@
buf.putShortString (packageName); // Package Name
buf.putShortString (eventName); // Event Name
buf.putBin128 (md5Sum); // Schema Hash
+ buf.putOctet (0); // No Superclass
buf.putShort (7); // Argument Count
// Arguments
Modified: store/trunk/cpp/lib/gen/qmf/com/redhat/rhm/store/Journal.cpp
===================================================================
--- store/trunk/cpp/lib/gen/qmf/com/redhat/rhm/store/Journal.cpp 2009-06-11 15:50:49 UTC (rev 3448)
+++ store/trunk/cpp/lib/gen/qmf/com/redhat/rhm/store/Journal.cpp 2009-06-11 15:55:15 UTC (rev 3449)
@@ -127,6 +127,7 @@
buf.putShortString (packageName); // Package Name
buf.putShortString (className); // Class Name
buf.putBin128 (md5Sum); // Schema Hash
+ buf.putOctet (0); // No Superclass
buf.putShort (13); // Config Element Count
buf.putShort (29); // Inst Element Count
buf.putShort (1); // Method Count
Modified: store/trunk/cpp/lib/gen/qmf/com/redhat/rhm/store/Store.cpp
===================================================================
--- store/trunk/cpp/lib/gen/qmf/com/redhat/rhm/store/Store.cpp 2009-06-11 15:50:49 UTC (rev 3448)
+++ store/trunk/cpp/lib/gen/qmf/com/redhat/rhm/store/Store.cpp 2009-06-11 15:55:15 UTC (rev 3449)
@@ -111,6 +111,7 @@
buf.putShortString (packageName); // Package Name
buf.putShortString (className); // Class Name
buf.putBin128 (md5Sum); // Schema Hash
+ buf.putOctet (0); // No Superclass
buf.putShort (11); // Config Element Count
buf.putShort (9); // Inst Element Count
buf.putShort (0); // Method Count
16 years, 10 months
rhmessaging commits: r3448 - mgmt/trunk/cumin/python/cumin/grid.
by rhmessaging-commits@lists.jboss.org
Author: eallen
Date: 2009-06-11 11:50:49 -0400 (Thu, 11 Jun 2009)
New Revision: 3448
Modified:
mgmt/trunk/cumin/python/cumin/grid/pool.py
Log:
Prevent error during background update of GridStatSet
Modified: mgmt/trunk/cumin/python/cumin/grid/pool.py
===================================================================
--- mgmt/trunk/cumin/python/cumin/grid/pool.py 2009-06-11 13:55:11 UTC (rev 3447)
+++ mgmt/trunk/cumin/python/cumin/grid/pool.py 2009-06-11 15:50:49 UTC (rev 3448)
@@ -403,6 +403,11 @@
def get_args(self, session):
return self.parent.get_grid_args(session)
+ def render(self, session):
+ grid = self.parent.get_grid_args(session)
+ if grid.count() > 0:
+ return super(PoolStats.GridStats.GridStatSet, self).render(session)
+
class PoolSlotMap(SlotMap):
def get_title_name(self, session, pool):
return pool.name
16 years, 10 months
rhmessaging commits: r3447 - mgmt/trunk/cumin/python/cumin.
by rhmessaging-commits@lists.jboss.org
Author: eallen
Date: 2009-06-11 09:55:11 -0400 (Thu, 11 Jun 2009)
New Revision: 3447
Modified:
mgmt/trunk/cumin/python/cumin/charts.py
mgmt/trunk/cumin/python/cumin/stat.py
Log:
Indicate sample interval in charts where we're rolling up values
Modified: mgmt/trunk/cumin/python/cumin/charts.py
===================================================================
--- mgmt/trunk/cumin/python/cumin/charts.py 2009-06-11 13:29:47 UTC (rev 3446)
+++ mgmt/trunk/cumin/python/cumin/charts.py 2009-06-11 13:55:11 UTC (rev 3447)
@@ -137,10 +137,11 @@
self.surface.write_to_png(writer)
class TimeSeriesChart(object):
- def __init__(self, width, height):
+ def __init__(self, width, height, interval=0):
self.width = width - 40
self.height = height - 20
- self.surface = ImageSurface(FORMAT_ARGB32, width, height)
+ real_height = height + ((interval > 10) and 12 or 0)
+ self.surface = ImageSurface(FORMAT_ARGB32, width, real_height)
self.surface.set_device_offset(1.5, 5.5)
self.x_max = 1
self.x_min = 0
@@ -253,6 +254,16 @@
cr.stroke()
+ def plot_interval(self, interval):
+ if interval > 10:
+ cr = Context(self.surface)
+ cr.set_line_width(0.2)
+ cr.set_source_rgb(0.2, 0.2, 0.2)
+
+ cr.move_to(2, self.height + 24)
+ cr.show_text("* Samples averaged over %i second interval" % interval)
+ cr.stroke()
+
def plot_x_axis(self, intervals, step):
cr = Context(self.surface)
cr.set_line_width(0.2)
Modified: mgmt/trunk/cumin/python/cumin/stat.py
===================================================================
--- mgmt/trunk/cumin/python/cumin/stat.py 2009-06-11 13:29:47 UTC (rev 3446)
+++ mgmt/trunk/cumin/python/cumin/stat.py 2009-06-11 13:55:11 UTC (rev 3447)
@@ -415,8 +415,12 @@
if interval != -1:
return interval
else:
- max_samples = int(width * 1.5)
- return max(int(duration / max_samples), 1)
+ mode = self.mode.get(session)
+ if mode == "rate":
+ return 1
+ else:
+ max_samples = int(width * 1.5)
+ return max(int(duration / max_samples), 1)
def render_samples(self, session, recent):
c = {(1,0,0): "red", (0,0,1): "blue", (0,1,0): "green"}
@@ -450,17 +454,17 @@
if cached_png:
return cached_png
- width = self.container_width.get(session)
- height = self.container_height.get(session)
- chart = TimeSeriesChart(width, height)
-
samples = dict()
values = dict()
+ width = self.container_width.get(session)
+ height = self.container_height.get(session)
mode = self.mode.get(session)
duration = self.duration.get(session)
interval = self.get_interval(session, duration, width)
+ chart = TimeSeriesChart(width, height, interval=interval)
+
if mode == "rate":
method = None # don't do averaging
for stat in stats:
@@ -523,6 +527,7 @@
chart.plot_ticks(samples[stat], color=color)
chart.plot_frame()
+ chart.plot_interval(interval)
if mode == "rate":
titles = ["%s / sec" % x.title for x in stats]
16 years, 10 months
rhmessaging commits: r3446 - store/trunk/cpp/tests.
by rhmessaging-commits@lists.jboss.org
Author: gordonsim
Date: 2009-06-11 09:29:47 -0400 (Thu, 11 Jun 2009)
New Revision: 3446
Modified:
store/trunk/cpp/tests/MessageUtils.h
Log:
Fix frame flags where content will follow header
Modified: store/trunk/cpp/tests/MessageUtils.h
===================================================================
--- store/trunk/cpp/tests/MessageUtils.h 2009-06-11 13:10:56 UTC (rev 3445)
+++ store/trunk/cpp/tests/MessageUtils.h 2009-06-11 13:29:47 UTC (rev 3446)
@@ -32,12 +32,13 @@
struct MessageUtils
{
static boost::intrusive_ptr<Message> createMessage(const string& exchange, const string& routingKey,
- const Uuid& messageId=Uuid(), uint64_t contentSize = 0)
+ const Uuid& messageId=Uuid(), uint64_t contentSize = 0)
{
boost::intrusive_ptr<Message> msg(new Message());
AMQFrame method((MessageTransferBody(ProtocolVersion(), exchange, 0, 0)));
AMQFrame header((AMQHeaderBody()));
+ header.setLastSegment(contentSize == 0);
msg->getFrames().append(method);
msg->getFrames().append(header);
16 years, 10 months