[rhmessaging-commits] rhmessaging commits: r3452 - in store/trunk/cpp/tests: python_tests and 1 other directory.
rhmessaging-commits at lists.jboss.org
rhmessaging-commits at lists.jboss.org
Tue Jun 16 16:20:52 EDT 2009
Author: kpvdr
Date: 2009-06-16 16:20:52 -0400 (Tue, 16 Jun 2009)
New Revision: 3452
Modified:
store/trunk/cpp/tests/python_tests/flow_to_disk.py
store/trunk/cpp/tests/run_python_tests
Log:
New transactional flow-to-disk tests added, also a reorganization of the python flow-to-disk tests. Still a few things to do and tidy up, though.
Modified: store/trunk/cpp/tests/python_tests/flow_to_disk.py
===================================================================
--- store/trunk/cpp/tests/python_tests/flow_to_disk.py 2009-06-11 18:53:52 UTC (rev 3451)
+++ store/trunk/cpp/tests/python_tests/flow_to_disk.py 2009-06-16 20:20:52 UTC (rev 3452)
@@ -28,6 +28,54 @@
class FlowToDiskTests(TestBase010):
"""Tests for async store flow-to-disk"""
+ XA_OK = 0
+ tx_counter = 0
+
+ # --- Helper functions ---
+
+ def _browse(self, qn, dt, am, num_msgs, msg_size, txnConsume):
+ txid = None
+ if txnConsume:
+ txid = self._makeXid("consumer-xid-%s" % qn)
+ self.session.dtx_select()
+ self.assertEqual(self.XA_OK, self.session.dtx_start(xid=txid).status)
+ self.session.message_subscribe(queue=qn, destination=dt, acquire_mode=am)
+ self.session.message_flow(destination=dt, unit=self.session.credit_unit.message, value=0xFFFFFFFF)
+ self.session.message_flow(destination=dt, unit=self.session.credit_unit.byte, value=0xFFFFFFFF)
+ queue = self.session.incoming(dt)
+ ids = RangedSet()
+ for msg_num in range(0, num_msgs):
+ expected_str = self._makeMessage(msg_num, msg_size)
+ msg = queue.get(timeout=5)
+ self.assertEqual(expected_str, msg.body)
+ ids.add(msg.id)
+ return ids, txid
+
+ def _checkCancel(self, qn, dt, num_msgs, ids):
+ self.session.message_release(ids)
+ self.session.queue_declare(queue=qn)
+ self.assertEqual(num_msgs, self.session.queue_query(queue=qn).message_count)
+ self.session.message_cancel(destination=dt)
+
+ def _checkConsume(self, qn, am, num_msgs, ids, txid, txnConsume):
+ if am == self.session.acquire_mode.not_acquired:
+ self.session.queue_declare(queue=qn)
+ self.assertEqual(num_msgs, self.session.queue_query(queue=qn).message_count)
+ response = self.session.message_acquire(ids)
+ for range_ in ids:
+ for msg_id in range_:
+ self.assert_(msg_id in response.transfers)
+ self.session.message_accept(ids)
+ if txnConsume:
+ self.assertEqual(self.XA_OK, self.session.dtx_end(xid=txid).status)
+ self.assertEqual(self.XA_OK, self.session.dtx_prepare(xid=txid).status)
+ self.assertEqual(self.XA_OK, self.session.dtx_commit(xid=txid, one_phase=False).status)
+ self._resetChannel()
+
+ def _checkEmpty(self, qn):
+ self.session.queue_declare(queue=qn)
+ self.assertEqual(0, self.session.queue_query(queue=qn).message_count)
+
def _makeMessage(self, msgCnt, msgSize):
msg = "Message-%04d" % msgCnt
msgLen = len(msg)
@@ -39,149 +87,161 @@
msg += chr(ord('a') + (i % 26))
return msg
- def test_FlowToDisk_01_SimpleMaxCountTransient(self):
- queue_args = {'qpid.policy_type':'flow_to_disk', 'qpid.max_count': 10, 'qpid.max_size': 100000000}
- self.simple_limit("test_FlowToDisk_01_SimpleMaxCountTransient", queue_args, self.session.delivery_mode.non_persistent, self.session.acquire_mode.pre_acquired)
+ def _makeXid(self, txid):
+ self.tx_counter += 1
+ branchqual = "v%s" % self.tx_counter
+ return self.session.xid(format=0, global_id=txid, branch_id=branchqual)
- def test_FlowToDisk_02_SimpleMaxCountPersistent(self):
- queue_args = {'qpid.policy_type':'flow_to_disk', 'qpid.max_count': 10, 'qpid.max_size': 100000000}
- self.simple_limit("test_FlowToDisk_02_SimpleMaxCountPersistent", queue_args, self.session.delivery_mode.persistent, self.session.acquire_mode.pre_acquired)
+ def _produce(self, qn, dm, num_msgs, msg_size, txnProduce):
+ if txnProduce:
+ txid = self._makeXid("producer-xid-%s" % qn)
+ self.session.dtx_select()
+ self.assertEqual(self.XA_OK, self.session.dtx_start(xid=txid).status)
+ for msg_num in range(0, num_msgs):
+ msg_str = self._makeMessage(msg_num, msg_size)
+ self.session.message_transfer(message=Message(self.session.delivery_properties(routing_key=qn, delivery_mode=dm), msg_str))
+ if txnProduce:
+ self.assertEqual(self.XA_OK, self.session.dtx_end(xid=txid).status)
+ self.assertEqual(self.XA_OK, self.session.dtx_prepare(xid=txid).status)
+ self.assertEqual(self.XA_OK, self.session.dtx_commit(xid=txid, one_phase=False).status)
+ self._resetChannel()
- def test_FlowToDisk_03_SimpleMaxSizeTransient(self):
- queue_args = {'qpid.policy_type':'flow_to_disk', 'qpid.max_size': 100}
- self.simple_limit("test_FlowToDisk_03_SimpleMaxSizeTransient", queue_args, self.session.delivery_mode.non_persistent, self.session.acquire_mode.pre_acquired)
+ def _resetChannel(self):
+ self.session.close()
+ self.session = self.conn.session("test-session", 1)
- def test_FlowToDisk_04_SimpleMaxSizePersistent(self):
- queue_args = {'qpid.policy_type':'flow_to_disk', 'qpid.max_size': 100}
- self.simple_limit("test_FlowToDisk_04_SimpleMaxSizePersistent", queue_args, self.session.delivery_mode.persistent, self.session.acquire_mode.pre_acquired)
+ # --- Simple tests ---
- def test_FlowToDisk_05_SimpleMaxCountTransientLargeMsg(self):
- queue_args = {'qpid.policy_type':'flow_to_disk', 'qpid.max_count': 10, 'qpid.max_size': 100000000}
- self.simple_limit("test_FlowToDisk_05_SimpleMaxCountTransientLargeMsg", queue_args, self.session.delivery_mode.non_persistent, self.session.acquire_mode.pre_acquired, 100, 10000)
+ def test_FlowToDisk_00_SimpleMaxCountTransient(self):
+ self.simple_limit("test_FlowToDisk_00_SimpleMaxCountTransient", max_count = 10)
- def test_FlowToDisk_06_SimpleMaxCountPersistentLargeMsg(self):
- queue_args = {'qpid.policy_type':'flow_to_disk', 'qpid.max_count': 10, 'qpid.max_size': 100000000}
- self.simple_limit("test_FlowToDisk_06_SimpleMaxCountPersistentLargeMsg", queue_args, self.session.delivery_mode.persistent, self.session.acquire_mode.pre_acquired, 100, 10000)
+ def test_FlowToDisk_01_SimpleMaxCountPersistent(self):
+ self.simple_limit("test_FlowToDisk_01_SimpleMaxCountPersistent", max_count = 10, persistent = True)
- def test_FlowToDisk_07_SimpleMaxSizeTransientLargeMsg(self):
- queue_args = {'qpid.policy_type':'flow_to_disk', 'qpid.max_size': 100}
- self.simple_limit("test_FlowToDisk_07_SimpleMaxSizeTransientLargeMsg", queue_args, self.session.delivery_mode.non_persistent, self.session.acquire_mode.pre_acquired, 100, 10000)
+ def test_FlowToDisk_02_SimpleMaxSizeTransient(self):
+ self.simple_limit("test_FlowToDisk_02_SimpleMaxSizeTransient", max_size = 100)
- def test_FlowToDisk_08_SimpleMaxSizePersistentLargeMsg(self):
- queue_args = {'qpid.policy_type':'flow_to_disk', 'qpid.max_size': 100}
- self.simple_limit("test_FlowToDisk_08_SimpleMaxSizePersistentLargeMsg", queue_args, self.session.delivery_mode.persistent, self.session.acquire_mode.pre_acquired, 100, 10000)
+ def test_FlowToDisk_03_SimpleMaxSizePersistent(self):
+ self.simple_limit("test_FlowToDisk_03_SimpleMaxSizePersistent", max_size = 100, persistent = True)
- def test_FlowToDisk_09_SimpleMaxCountTransientNotAcquired(self):
- queue_args = {'qpid.policy_type':'flow_to_disk', 'qpid.max_count': 10, 'qpid.max_size': 100000000}
- self.simple_limit("test_FlowToDisk_09_SimpleMaxCountTransientNotAcquired", queue_args, self.session.delivery_mode.non_persistent, self.session.acquire_mode.not_acquired)
+ def test_FlowToDisk_04_SimpleMaxCountTransientLargeMsg(self):
+ self.simple_limit("test_FlowToDisk_04_SimpleMaxCountTransientLargeMsg", max_count = 10, max_size = 10000000, num_msgs = 100, msg_size = 10000)
- def test_FlowToDisk_10_SimpleMaxCountPersistentNotAcquired(self):
- queue_args = {'qpid.policy_type':'flow_to_disk', 'qpid.max_count': 10, 'qpid.max_size': 100000000}
- self.simple_limit("test_FlowToDisk_10_SimpleMaxCountPersistentNotAcquired", queue_args, self.session.delivery_mode.persistent, self.session.acquire_mode.not_acquired)
+ def test_FlowToDisk_05_SimpleMaxCountPersistentLargeMsg(self):
+ self.simple_limit("test_FlowToDisk_05_SimpleMaxCountPersistentLargeMsg", max_count = 10, max_size = 10000000, persistent = True, num_msgs = 100, msg_size = 10000)
- def test_FlowToDisk_11_SimpleMaxSizeTransientNotAcquired(self):
- queue_args = {'qpid.policy_type':'flow_to_disk', 'qpid.max_size': 100}
- self.simple_limit("test_FlowToDisk_11_SimpleMaxSizeTransientNotAcquired", queue_args, self.session.delivery_mode.non_persistent, self.session.acquire_mode.not_acquired)
+ def test_FlowToDisk_06_SimpleMaxSizeTransientLargeMsg(self):
+ self.simple_limit("test_FlowToDisk_06_SimpleMaxSizeTransientLargeMsg", max_size = 100, num_msgs = 100, msg_size = 10000)
- def test_FlowToDisk_12_SimpleMaxSizePersistentNotAcquired(self):
- queue_args = {'qpid.policy_type':'flow_to_disk', 'qpid.max_size': 100}
- self.simple_limit("test_FlowToDisk_12_SimpleMaxSizePersistentNotAcquired", queue_args, self.session.delivery_mode.persistent, self.session.acquire_mode.not_acquired)
+ def test_FlowToDisk_07_SimpleMaxSizePersistentLargeMsg(self):
+ self.simple_limit("test_FlowToDisk_07_SimpleMaxSizePersistentLargeMsg", max_size = 100, persistent = True, num_msgs = 100, msg_size = 10000)
- def test_FlowToDisk_13_SimpleMaxCountTransientNotAcquiredLargeMsg(self):
- queue_args = {'qpid.policy_type':'flow_to_disk', 'qpid.max_count': 10, 'qpid.max_size': 100000000}
- self.simple_limit("test_FlowToDisk_13_SimpleMaxCountTransientNotAcquiredLargeMsg", queue_args, self.session.delivery_mode.non_persistent, self.session.acquire_mode.not_acquired, 100, 10000)
+ def test_FlowToDisk_08_SimpleMaxCountTransientNotAcquired(self):
+ self.simple_limit("test_FlowToDisk_08_SimpleMaxCountTransientNotAcquired", max_count = 10, pre_acquired = False)
- def test_FlowToDisk_14_SimpleMaxCountPersistentNotAcquiredLargeMsg(self):
- queue_args = {'qpid.policy_type':'flow_to_disk', 'qpid.max_count': 10, 'qpid.max_size': 100000000}
- self.simple_limit("test_FlowToDisk_14_SimpleMaxCountPersistentNotAcquiredLargeMsg", queue_args, self.session.delivery_mode.persistent, self.session.acquire_mode.not_acquired, 100, 10000)
+ def test_FlowToDisk_09_SimpleMaxCountPersistentNotAcquired(self):
+ self.simple_limit("test_FlowToDisk_09_SimpleMaxCountPersistentNotAcquired", max_count = 10, persistent = True, pre_acquired = False)
- def test_FlowToDisk_15_SimpleMaxSizeTransientNotAcquiredLargeMsg(self):
- queue_args = {'qpid.policy_type':'flow_to_disk', 'qpid.max_size': 100}
- self.simple_limit("test_FlowToDisk_15_SimpleMaxSizeTransientNotAcquiredLargeMsg", queue_args, self.session.delivery_mode.non_persistent, self.session.acquire_mode.not_acquired, 100, 10000)
+ def test_FlowToDisk_10_SimpleMaxSizeTransientNotAcquired(self):
+ self.simple_limit("test_FlowToDisk_10_SimpleMaxSizeTransientNotAcquired", max_size = 100, pre_acquired = False)
- def test_FlowToDisk_16_SimpleMaxSizePersistentNotAcquiredLargeMsg(self):
- queue_args = {'qpid.policy_type':'flow_to_disk', 'qpid.max_size': 100}
- self.simple_limit("test_FlowToDisk_16_SimpleMaxSizePersistentNotAcquiredLargeMsg", queue_args, self.session.delivery_mode.persistent, self.session.acquire_mode.not_acquired, 100, 10000)
+ def test_FlowToDisk_11_SimpleMaxSizePersistentNotAcquired(self):
+ self.simple_limit("test_FlowToDisk_11_SimpleMaxSizePersistentNotAcquired", max_size = 100, persistent = True, pre_acquired = False)
- def simple_limit(self, queue_name, queue_args, delivery_mode, acquire_mode, num_msgs = 15, msg_size = None):
+ def test_FlowToDisk_12_SimpleMaxCountTransientNotAcquiredLargeMsg(self):
+ self.simple_limit("test_FlowToDisk_12_SimpleMaxCountTransientNotAcquiredLargeMsg", max_count = 10, max_size = 10000000, pre_acquired = False, num_msgs = 100, msg_size = 10000)
+
+ def test_FlowToDisk_13_SimpleMaxCountPersistentNotAcquiredLargeMsg(self):
+ self.simple_limit("test_FlowToDisk_13_SimpleMaxCountPersistentNotAcquiredLargeMsg", max_count = 10, max_size = 10000000, persistent = True, pre_acquired = False, num_msgs = 100, msg_size = 10000)
+
+ def test_FlowToDisk_14_SimpleMaxSizeTransientNotAcquiredLargeMsg(self):
+ self.simple_limit("test_FlowToDisk_14_SimpleMaxSizeTransientNotAcquiredLargeMsg", max_size = 100, pre_acquired = False, num_msgs = 100, msg_size = 10000)
+
+ def test_FlowToDisk_15_SimpleMaxSizePersistentNotAcquiredLargeMsg(self):
+ self.simple_limit("test_FlowToDisk_15_SimpleMaxSizePersistentNotAcquiredLargeMsg", max_size = 100, persistent = True, pre_acquired = False, num_msgs = 100, msg_size = 10000)
+
+ def simple_limit(self, qn, max_count = None, max_size = None, persistent = False, pre_acquired = True, num_msgs = 15, msg_size = None):
+ qa = {'qpid.policy_type':'flow_to_disk'}
+ if max_count != None:
+ qa['qpid.max_count'] = max_count
+ if max_size != None:
+ qa['qpid.max_size'] = max_size
+ if persistent:
+ dm = self.session.delivery_mode.persistent
+ else:
+ dm = self.session.delivery_mode.non_persistent
+ if pre_acquired:
+ am = self.session.acquire_mode.pre_acquired
+ else:
+ am = self.session.acquire_mode.not_acquired
+ # Cycle through the produce/consume block transaction combinations
+ for i in range(0, 4):
+ tp = i & 1 != 0 # Transactional produce
+ tc = i & 2 != 0 # Transactional consume
+ self.tx_simple_limit(qn, qa, dm, am, num_msgs, msg_size, tp, tc)
+
+ def tx_simple_limit(self, qn, qa, dm, am, num_msgs, msg_size, tp, tc):
"""
Test a simple case of message limits which will force flow-to-disk.
- * queue_args sets a limit - either max_count 10 or max_size 100
- * 15 messages of size 10 are added. The last five will flow to disk.
- * Consume 15 messages.
+ * queue_args sets a limit - either max_count and/or max_size
+ * messages are added. Some will flow to disk.
+ * Consume all messages sent.
* Check the broker has no messages left.
"""
-
- session = self.session
- session.queue_declare(queue=queue_name, durable=True, arguments=queue_args)
-
- # Add 15 messages
- for msg_num in range(0, num_msgs):
- msg_str = self._makeMessage(msg_num, msg_size)
- session.message_transfer(message=Message(session.delivery_properties(routing_key=queue_name, delivery_mode=delivery_mode), msg_str))
-
- # Consume/browse 15 messages
- session.message_subscribe(queue=queue_name, destination="tag", acquire_mode=acquire_mode)
- session.message_flow(destination="tag", unit=session.credit_unit.message, value=0xFFFFFFFF)
- session.message_flow(destination="tag", unit=session.credit_unit.byte, value=0xFFFFFFFF)
- queue = session.incoming("tag")
- ids = RangedSet()
- for msg_num in range(0, num_msgs):
- expected_str = self._makeMessage(msg_num, msg_size)
- msg = queue.get(timeout=5)
- self.assertEqual(expected_str, msg.body)
- ids.add(msg.id)
-
- # If not_acquired, chek messages are still on queue, then acquire/accept
- if acquire_mode == self.session.acquire_mode.not_acquired:
- session.queue_declare(queue=queue_name)
- self.assertEqual(num_msgs, session.queue_query(queue=queue_name).message_count)
- response = session.message_acquire(ids)
- for range_ in ids:
- for msg_id in range_:
- self.assert_(msg_id in response.transfers)
- session.message_accept(ids)
+ self.session.queue_declare(queue=qn, durable=True, arguments=qa)
+
+ # --- Add messages ---
+ self._produce(qn, dm, num_msgs, msg_size, tp)
+
+ # --- Browse messages, then consume ---
+ dt = "tag-%d-%d" % (tp, tc)
+ ids, txid = self._browse(qn, dt, am, num_msgs, msg_size, tc)
+ self._checkConsume(qn, am, num_msgs, ids, txid, tc)
+ self._checkEmpty(qn)
- # Check queue is empty
- session.queue_declare(queue=queue_name)
- self.assertEqual(0, session.queue_query(queue=queue_name).message_count)
+ def test_FlowToDisk_50_MaxCountBrowseConsumeTransient(self):
+ self.not_acquired_browse_consume_limit("test_FlowToDisk_50_MaxCountBrowseConsumeTransient", max_count = 10)
- def test_FlowToDisk_17_MaxCountBrowseConsumeTransient(self):
- queue_args = {'qpid.policy_type':'flow_to_disk', 'qpid.max_count': 10, 'qpid.max_size': 100000000}
- self.not_acquired_browse_consume_limit("test_FlowToDisk_17_MaxCountBrowseConsumeTransient", queue_args, self.session.delivery_mode.non_persistent)
+ def test_FlowToDisk_51_MaxCountBrowseConsumePersistent(self):
+ self.not_acquired_browse_consume_limit("test_FlowToDisk_51_MaxCountBrowseConsumePersistent", max_count = 10, persistent = True)
- def test_FlowToDisk_18_MaxCountBrowseConsumePersistent(self):
- queue_args = {'qpid.policy_type':'flow_to_disk', 'qpid.max_count': 10, 'qpid.max_size': 100000000}
- self.not_acquired_browse_consume_limit("test_FlowToDisk_18_MaxCountBrowseConsumePersistent", queue_args, self.session.delivery_mode.persistent)
+ def test_FlowToDisk_52_MaxSizeBrowseConsumeTransient(self):
+ self.not_acquired_browse_consume_limit("test_FlowToDisk_52_MaxSizeBrowseConsumeTransient", max_size = 100)
- def test_FlowToDisk_19_MaxSizeBrowseConsumeTransient(self):
- queue_args = {'qpid.policy_type':'flow_to_disk', 'qpid.max_size': 100}
- self.not_acquired_browse_consume_limit("test_FlowToDisk_19_MaxSizeBrowseConsumeTransient", queue_args, self.session.delivery_mode.non_persistent)
+ def test_FlowToDisk_53_MaxSizeBrowseConsumePersistent(self):
+ self.not_acquired_browse_consume_limit("test_FlowToDisk_53_MaxSizeBrowseConsumePersistent", max_size = 100, persistent = True)
- def test_FlowToDisk_20_MaxSizeBrowseConsumePersistent(self):
- queue_args = {'qpid.policy_type':'flow_to_disk', 'qpid.max_size': 100}
- self.not_acquired_browse_consume_limit("test_FlowToDisk_20_MaxSizeBrowseConsumePersistent", queue_args, self.session.delivery_mode.persistent)
+ def test_FlowToDisk_54_MaxCountBrowseConsumeTransientLargeMsg(self):
+ self.not_acquired_browse_consume_limit("test_FlowToDisk_54_MaxCountBrowseConsumeTransientLargeMsg", max_count = 10, max_size = 10000000, num_msgs = 100, msg_size = 10000)
- def test_FlowToDisk_21_MaxCountBrowseConsumeTransientLargeMsg(self):
- queue_args = {'qpid.policy_type':'flow_to_disk', 'qpid.max_count': 10, 'qpid.max_size': 100000000}
- self.not_acquired_browse_consume_limit("test_FlowToDisk_21_MaxCountBrowseConsumeTransientLargeMsg", queue_args, self.session.delivery_mode.non_persistent, 100, 10000)
+ def test_FlowToDisk_55_MaxCountBrowseConsumePersistentLargeMsg(self):
+ self.not_acquired_browse_consume_limit("test_FlowToDisk_55_MaxCountBrowseConsumePersistentLargeMsg", max_count = 10, max_size = 10000000, persistent = True, num_msgs = 100, msg_size = 10000)
- def test_FlowToDisk_22_MaxCountBrowseConsumePersistentLargeMsg(self):
- queue_args = {'qpid.policy_type':'flow_to_disk', 'qpid.max_count': 10, 'qpid.max_size': 100000000}
- self.not_acquired_browse_consume_limit("test_FlowToDisk_22_MaxCountBrowseConsumePersistentLargeMsg", queue_args, self.session.delivery_mode.persistent, 100, 10000)
+ def test_FlowToDisk_56_MaxSizeBrowseConsumeTransientLargeMsg(self):
+ self.not_acquired_browse_consume_limit("test_FlowToDisk_56_MaxSizeBrowseConsumeTransientLargeMsg", max_size = 100, num_msgs = 100, msg_size = 10000)
- def test_FlowToDisk_23_MaxSizeBrowseConsumeTransientLargeMsg(self):
- queue_args = {'qpid.policy_type':'flow_to_disk', 'qpid.max_size': 100}
- self.not_acquired_browse_consume_limit("test_FlowToDisk_23_MaxSizeBrowseConsumeTransientLargeMsg", queue_args, self.session.delivery_mode.non_persistent, 100, 10000)
+ def test_FlowToDisk_57_MaxSizeBrowseConsumePersistentLargeMsg(self):
+ self.not_acquired_browse_consume_limit("test_FlowToDisk_57_MaxSizeBrowseConsumePersistentLargeMsg", max_size = 100, persistent = True, num_msgs = 100, msg_size = 10000)
- def test_FlowToDisk_24_MaxSizeBrowseConsumePersistentLargeMsg(self):
- queue_args = {'qpid.policy_type':'flow_to_disk', 'qpid.max_size': 100}
- self.not_acquired_browse_consume_limit("test_FlowToDisk_24_MaxSizeBrowseConsumePersistentLargeMsg", queue_args, self.session.delivery_mode.persistent, 100, 10000)
+ def not_acquired_browse_consume_limit(self, qn, max_count = None, max_size = None, persistent = False, num_msgs = 15, msg_size = None):
+ qa = {'qpid.policy_type':'flow_to_disk'}
+ if max_count != None:
+ qa['qpid.max_count'] = max_count
+ if max_size != None:
+ qa['qpid.max_size'] = max_size
+ if persistent:
+ dm = self.session.delivery_mode.persistent
+ else:
+ dm = self.session.delivery_mode.non_persistent
+ # Cycle through the produce/consume block transaction combinations
+ for i in range(0, 4):
+ tp = i & 1 != 0 # Transactional produce
+ tc = i & 2 != 0 # Transactional consume
+ self.tx_not_acquired_browse_consume_limit(qn, qa, dm, num_msgs, msg_size, tp, tc)
-
- def not_acquired_browse_consume_limit(self, queue_name, queue_args, delivery_mode, num_msgs = 15, msg_size = None):
+ def tx_not_acquired_browse_consume_limit(self, qn, qa, dm, num_msgs, msg_size, tp, tc):
"""
Test to check browsing then subsequent consumption of flow-to-disk messages.
* 15 messages of size 10 are added. The last five will flow to disk.
@@ -190,46 +250,20 @@
* Consumes 15 messages
* Checks the broker has no messages left.
"""
-
- session = self.session
- session.queue_declare(queue=queue_name, durable=True, arguments=queue_args)
-
+
+ self.session.queue_declare(queue=qn, durable=True, arguments=qa)
+ am = self.session.acquire_mode.not_acquired
+
# Add 15 messages
- for msg_num in range(0, num_msgs):
- msg_str = self._makeMessage(msg_num, msg_size)
- session.message_transfer(message=Message(session.delivery_properties(routing_key=queue_name, delivery_mode=delivery_mode), msg_str))
-
- # Browse 15 messages
- session.message_subscribe(queue=queue_name, destination="tagA", acquire_mode=session.acquire_mode.not_acquired)
- session.message_flow(destination="tagA", unit=session.credit_unit.message, value=0xFFFFFFFF)
- session.message_flow(destination="tagA", unit=session.credit_unit.byte, value=0xFFFFFFFF)
- queue = session.incoming("tagA")
- ids = RangedSet()
- for msg_num in range(0, num_msgs):
- expected_str = self._makeMessage(msg_num, msg_size)
- msg = queue.get(timeout=5)
- self.assertEqual(expected_str, msg.body)
- ids.add(msg.id)
-
- # Release all 15 messages and close
- session.message_release(ids)
- session.queue_declare(queue=queue_name)
- self.assertEqual(num_msgs, session.queue_query(queue=queue_name).message_count)
-
- # Cancel subscription, start new one that consumes
- session.message_cancel(destination="tagA")
- session.message_subscribe(queue=queue_name, destination="tagB", acquire_mode=session.acquire_mode.pre_acquired)
- session.message_flow(destination="tagB", unit=session.credit_unit.message, value=0xFFFFFFFF)
- session.message_flow(destination="tagB", unit=session.credit_unit.byte, value=0xFFFFFFFF)
- queue = session.incoming("tagB")
- ids = RangedSet()
- for msg_num in range(0, num_msgs):
- expected_str = self._makeMessage(msg_num, msg_size)
- msg = queue.get(timeout=5)
- self.assertEqual(expected_str, msg.body)
- ids.add(msg.id)
- session.message_accept(ids)
-
- # Check queue is empty
- session.queue_declare(queue=queue_name)
- self.assertEqual(0, session.queue_query(queue=queue_name).message_count)
+ self._produce(qn, dm, num_msgs, msg_size, tp)
+
+ # Browse 15 messages, then release and close
+ dtA = "tagA-%d-%d" % (tp, tc)
+ ids, txid = self._browse(qn, dtA, am, num_msgs, msg_size, False)
+ self._checkCancel(qn, dtA, num_msgs, ids)
+
+ # --- Browse messages, then consume ---
+ dtB = "tagB-%d-%d" % (tp, tc)
+ ids, txid = self._browse(qn, dtB, am, num_msgs, msg_size, tc)
+ self._checkConsume(qn, am, num_msgs, ids, txid, tc)
+ self._checkEmpty(qn)
Modified: store/trunk/cpp/tests/run_python_tests
===================================================================
--- store/trunk/cpp/tests/run_python_tests 2009-06-11 18:53:52 UTC (rev 3451)
+++ store/trunk/cpp/tests/run_python_tests 2009-06-16 20:20:52 UTC (rev 3452)
@@ -56,7 +56,7 @@
exit
fi
-BROKER_OPTS="--no-module-dir --load-module=${LIBSTORE} --data-dir=${TMP_STORE_DIR} --auth=no"
+BROKER_OPTS="--no-module-dir --load-module=${LIBSTORE} --data-dir=${TMP_STORE_DIR} --auth=no --log-enable info+ --log-to-file ${TMP_STORE_DIR}/broker.python-test.log"
AMQP_SPEC=0-10-errata
#Make sure temp dir exists if this is the first to use it
More information about the rhmessaging-commits
mailing list