[rhmessaging-commits] rhmessaging commits: r3452 - in store/trunk/cpp/tests: python_tests and 1 other directory.

rhmessaging-commits at lists.jboss.org rhmessaging-commits at lists.jboss.org
Tue Jun 16 16:20:52 EDT 2009


Author: kpvdr
Date: 2009-06-16 16:20:52 -0400 (Tue, 16 Jun 2009)
New Revision: 3452

Modified:
   store/trunk/cpp/tests/python_tests/flow_to_disk.py
   store/trunk/cpp/tests/run_python_tests
Log:
Added new transactional flow-to-disk tests and reorganized the python flow-to-disk tests. A few things still remain to be done and tidied up.

Modified: store/trunk/cpp/tests/python_tests/flow_to_disk.py
===================================================================
--- store/trunk/cpp/tests/python_tests/flow_to_disk.py	2009-06-11 18:53:52 UTC (rev 3451)
+++ store/trunk/cpp/tests/python_tests/flow_to_disk.py	2009-06-16 20:20:52 UTC (rev 3452)
@@ -28,6 +28,54 @@
 class FlowToDiskTests(TestBase010):
     """Tests for async store flow-to-disk"""
 
+    XA_OK = 0
+    tx_counter = 0
+
+    # --- Helper functions ---
+
+    def _browse(self, qn, dt, am, num_msgs, msg_size, txnConsume):
+        txid = None
+        if txnConsume:
+            txid = self._makeXid("consumer-xid-%s" % qn)
+            self.session.dtx_select()
+            self.assertEqual(self.XA_OK, self.session.dtx_start(xid=txid).status)
+        self.session.message_subscribe(queue=qn, destination=dt, acquire_mode=am)
+        self.session.message_flow(destination=dt, unit=self.session.credit_unit.message, value=0xFFFFFFFF)
+        self.session.message_flow(destination=dt, unit=self.session.credit_unit.byte, value=0xFFFFFFFF)
+        queue = self.session.incoming(dt)
+        ids = RangedSet()
+        for msg_num in range(0, num_msgs):
+            expected_str = self._makeMessage(msg_num, msg_size)
+            msg = queue.get(timeout=5)
+            self.assertEqual(expected_str, msg.body)
+            ids.add(msg.id)
+        return ids, txid
+
+    def _checkCancel(self, qn, dt, num_msgs, ids):
+        self.session.message_release(ids)
+        self.session.queue_declare(queue=qn)
+        self.assertEqual(num_msgs, self.session.queue_query(queue=qn).message_count)
+        self.session.message_cancel(destination=dt)
+
+    def _checkConsume(self, qn, am, num_msgs, ids, txid, txnConsume):
+        if am == self.session.acquire_mode.not_acquired:
+            self.session.queue_declare(queue=qn)
+            self.assertEqual(num_msgs, self.session.queue_query(queue=qn).message_count)
+            response = self.session.message_acquire(ids)
+            for range_ in ids:
+                for msg_id in range_:
+                    self.assert_(msg_id in response.transfers)
+        self.session.message_accept(ids)
+        if txnConsume:
+            self.assertEqual(self.XA_OK, self.session.dtx_end(xid=txid).status)
+            self.assertEqual(self.XA_OK, self.session.dtx_prepare(xid=txid).status)
+            self.assertEqual(self.XA_OK, self.session.dtx_commit(xid=txid, one_phase=False).status)
+        self._resetChannel()
+
+    def _checkEmpty(self, qn):
+        self.session.queue_declare(queue=qn)
+        self.assertEqual(0, self.session.queue_query(queue=qn).message_count)
+
     def _makeMessage(self, msgCnt, msgSize):
         msg = "Message-%04d" % msgCnt
         msgLen = len(msg)
@@ -39,149 +87,161 @@
                     msg += chr(ord('a') + (i % 26))
         return msg
 
-    def test_FlowToDisk_01_SimpleMaxCountTransient(self):
-        queue_args = {'qpid.policy_type':'flow_to_disk', 'qpid.max_count': 10, 'qpid.max_size': 100000000}
-        self.simple_limit("test_FlowToDisk_01_SimpleMaxCountTransient", queue_args, self.session.delivery_mode.non_persistent, self.session.acquire_mode.pre_acquired)
+    def _makeXid(self, txid):
+        self.tx_counter += 1
+        branchqual = "v%s" % self.tx_counter
+        return self.session.xid(format=0, global_id=txid, branch_id=branchqual)
 
-    def test_FlowToDisk_02_SimpleMaxCountPersistent(self):
-        queue_args = {'qpid.policy_type':'flow_to_disk', 'qpid.max_count': 10, 'qpid.max_size': 100000000}
-        self.simple_limit("test_FlowToDisk_02_SimpleMaxCountPersistent", queue_args, self.session.delivery_mode.persistent, self.session.acquire_mode.pre_acquired)
+    def _produce(self, qn, dm, num_msgs, msg_size, txnProduce):
+        if txnProduce:
+            txid = self._makeXid("producer-xid-%s" % qn)
+            self.session.dtx_select()
+            self.assertEqual(self.XA_OK, self.session.dtx_start(xid=txid).status)
+        for msg_num in range(0, num_msgs):
+            msg_str = self._makeMessage(msg_num, msg_size)
+            self.session.message_transfer(message=Message(self.session.delivery_properties(routing_key=qn, delivery_mode=dm), msg_str))
+        if txnProduce:
+            self.assertEqual(self.XA_OK, self.session.dtx_end(xid=txid).status)
+            self.assertEqual(self.XA_OK, self.session.dtx_prepare(xid=txid).status)
+            self.assertEqual(self.XA_OK, self.session.dtx_commit(xid=txid, one_phase=False).status)
+        self._resetChannel()
 
-    def test_FlowToDisk_03_SimpleMaxSizeTransient(self):
-        queue_args = {'qpid.policy_type':'flow_to_disk', 'qpid.max_size': 100}
-        self.simple_limit("test_FlowToDisk_03_SimpleMaxSizeTransient", queue_args, self.session.delivery_mode.non_persistent, self.session.acquire_mode.pre_acquired)
+    def _resetChannel(self):
+        self.session.close()
+        self.session = self.conn.session("test-session", 1)
 
-    def test_FlowToDisk_04_SimpleMaxSizePersistent(self):
-        queue_args = {'qpid.policy_type':'flow_to_disk', 'qpid.max_size': 100}
-        self.simple_limit("test_FlowToDisk_04_SimpleMaxSizePersistent", queue_args, self.session.delivery_mode.persistent, self.session.acquire_mode.pre_acquired)
+    # --- Simple tests ---
 
-    def test_FlowToDisk_05_SimpleMaxCountTransientLargeMsg(self):
-        queue_args = {'qpid.policy_type':'flow_to_disk', 'qpid.max_count': 10, 'qpid.max_size': 100000000}
-        self.simple_limit("test_FlowToDisk_05_SimpleMaxCountTransientLargeMsg", queue_args, self.session.delivery_mode.non_persistent, self.session.acquire_mode.pre_acquired, 100, 10000)
+    def test_FlowToDisk_00_SimpleMaxCountTransient(self):
+        self.simple_limit("test_FlowToDisk_00_SimpleMaxCountTransient", max_count = 10)
 
-    def test_FlowToDisk_06_SimpleMaxCountPersistentLargeMsg(self):
-        queue_args = {'qpid.policy_type':'flow_to_disk', 'qpid.max_count': 10, 'qpid.max_size': 100000000}
-        self.simple_limit("test_FlowToDisk_06_SimpleMaxCountPersistentLargeMsg", queue_args, self.session.delivery_mode.persistent, self.session.acquire_mode.pre_acquired, 100, 10000)
+    def test_FlowToDisk_01_SimpleMaxCountPersistent(self):
+        self.simple_limit("test_FlowToDisk_01_SimpleMaxCountPersistent", max_count = 10, persistent = True)
 
-    def test_FlowToDisk_07_SimpleMaxSizeTransientLargeMsg(self):
-        queue_args = {'qpid.policy_type':'flow_to_disk', 'qpid.max_size': 100}
-        self.simple_limit("test_FlowToDisk_07_SimpleMaxSizeTransientLargeMsg", queue_args, self.session.delivery_mode.non_persistent, self.session.acquire_mode.pre_acquired, 100, 10000)
+    def test_FlowToDisk_02_SimpleMaxSizeTransient(self):
+        self.simple_limit("test_FlowToDisk_02_SimpleMaxSizeTransient", max_size = 100)
 
-    def test_FlowToDisk_08_SimpleMaxSizePersistentLargeMsg(self):
-        queue_args = {'qpid.policy_type':'flow_to_disk', 'qpid.max_size': 100}
-        self.simple_limit("test_FlowToDisk_08_SimpleMaxSizePersistentLargeMsg", queue_args, self.session.delivery_mode.persistent, self.session.acquire_mode.pre_acquired, 100, 10000)
+    def test_FlowToDisk_03_SimpleMaxSizePersistent(self):
+        self.simple_limit("test_FlowToDisk_03_SimpleMaxSizePersistent", max_size = 100, persistent = True)
 
-    def test_FlowToDisk_09_SimpleMaxCountTransientNotAcquired(self):
-        queue_args = {'qpid.policy_type':'flow_to_disk', 'qpid.max_count': 10, 'qpid.max_size': 100000000}
-        self.simple_limit("test_FlowToDisk_09_SimpleMaxCountTransientNotAcquired", queue_args, self.session.delivery_mode.non_persistent, self.session.acquire_mode.not_acquired)
+    def test_FlowToDisk_04_SimpleMaxCountTransientLargeMsg(self):
+        self.simple_limit("test_FlowToDisk_04_SimpleMaxCountTransientLargeMsg", max_count = 10, max_size = 10000000, num_msgs = 100, msg_size = 10000)
 
-    def test_FlowToDisk_10_SimpleMaxCountPersistentNotAcquired(self):
-        queue_args = {'qpid.policy_type':'flow_to_disk', 'qpid.max_count': 10, 'qpid.max_size': 100000000}
-        self.simple_limit("test_FlowToDisk_10_SimpleMaxCountPersistentNotAcquired", queue_args, self.session.delivery_mode.persistent, self.session.acquire_mode.not_acquired)
+    def test_FlowToDisk_05_SimpleMaxCountPersistentLargeMsg(self):
+        self.simple_limit("test_FlowToDisk_05_SimpleMaxCountPersistentLargeMsg", max_count = 10, max_size = 10000000, persistent = True, num_msgs = 100, msg_size = 10000)
 
-    def test_FlowToDisk_11_SimpleMaxSizeTransientNotAcquired(self):
-        queue_args = {'qpid.policy_type':'flow_to_disk', 'qpid.max_size': 100}
-        self.simple_limit("test_FlowToDisk_11_SimpleMaxSizeTransientNotAcquired", queue_args, self.session.delivery_mode.non_persistent, self.session.acquire_mode.not_acquired)
+    def test_FlowToDisk_06_SimpleMaxSizeTransientLargeMsg(self):
+        self.simple_limit("test_FlowToDisk_06_SimpleMaxSizeTransientLargeMsg", max_size = 100, num_msgs = 100, msg_size = 10000)
 
-    def test_FlowToDisk_12_SimpleMaxSizePersistentNotAcquired(self):
-        queue_args = {'qpid.policy_type':'flow_to_disk', 'qpid.max_size': 100}
-        self.simple_limit("test_FlowToDisk_12_SimpleMaxSizePersistentNotAcquired", queue_args, self.session.delivery_mode.persistent, self.session.acquire_mode.not_acquired)
+    def test_FlowToDisk_07_SimpleMaxSizePersistentLargeMsg(self):
+        self.simple_limit("test_FlowToDisk_07_SimpleMaxSizePersistentLargeMsg", max_size = 100, persistent = True, num_msgs = 100, msg_size = 10000)
 
-    def test_FlowToDisk_13_SimpleMaxCountTransientNotAcquiredLargeMsg(self):
-        queue_args = {'qpid.policy_type':'flow_to_disk', 'qpid.max_count': 10, 'qpid.max_size': 100000000}
-        self.simple_limit("test_FlowToDisk_13_SimpleMaxCountTransientNotAcquiredLargeMsg", queue_args, self.session.delivery_mode.non_persistent, self.session.acquire_mode.not_acquired, 100, 10000)
+    def test_FlowToDisk_08_SimpleMaxCountTransientNotAcquired(self):
+        self.simple_limit("test_FlowToDisk_08_SimpleMaxCountTransientNotAcquired", max_count = 10, pre_acquired = False)
 
-    def test_FlowToDisk_14_SimpleMaxCountPersistentNotAcquiredLargeMsg(self):
-        queue_args = {'qpid.policy_type':'flow_to_disk', 'qpid.max_count': 10, 'qpid.max_size': 100000000}
-        self.simple_limit("test_FlowToDisk_14_SimpleMaxCountPersistentNotAcquiredLargeMsg", queue_args, self.session.delivery_mode.persistent, self.session.acquire_mode.not_acquired, 100, 10000)
+    def test_FlowToDisk_09_SimpleMaxCountPersistentNotAcquired(self):
+        self.simple_limit("test_FlowToDisk_09_SimpleMaxCountPersistentNotAcquired", max_count = 10, persistent = True, pre_acquired = False)
 
-    def test_FlowToDisk_15_SimpleMaxSizeTransientNotAcquiredLargeMsg(self):
-        queue_args = {'qpid.policy_type':'flow_to_disk', 'qpid.max_size': 100}
-        self.simple_limit("test_FlowToDisk_15_SimpleMaxSizeTransientNotAcquiredLargeMsg", queue_args, self.session.delivery_mode.non_persistent, self.session.acquire_mode.not_acquired, 100, 10000)
+    def test_FlowToDisk_10_SimpleMaxSizeTransientNotAcquired(self):
+         self.simple_limit("test_FlowToDisk_10_SimpleMaxSizeTransientNotAcquired", max_size = 100, pre_acquired = False)
 
-    def test_FlowToDisk_16_SimpleMaxSizePersistentNotAcquiredLargeMsg(self):
-        queue_args = {'qpid.policy_type':'flow_to_disk', 'qpid.max_size': 100}
-        self.simple_limit("test_FlowToDisk_16_SimpleMaxSizePersistentNotAcquiredLargeMsg", queue_args, self.session.delivery_mode.persistent, self.session.acquire_mode.not_acquired, 100, 10000)
+    def test_FlowToDisk_11_SimpleMaxSizePersistentNotAcquired(self):
+        self.simple_limit("test_FlowToDisk_11_SimpleMaxSizePersistentNotAcquired", max_size = 100, persistent = True, pre_acquired = False)
 
-    def simple_limit(self, queue_name, queue_args, delivery_mode, acquire_mode, num_msgs = 15, msg_size = None):
+    def test_FlowToDisk_12_SimpleMaxCountTransientNotAcquiredLargeMsg(self):
+        self.simple_limit("test_FlowToDisk_12_SimpleMaxCountTransientNotAcquiredLargeMsg", max_count = 10, max_size = 10000000, pre_acquired = False, num_msgs = 100, msg_size = 10000)
+
+    def test_FlowToDisk_13_SimpleMaxCountPersistentNotAcquiredLargeMsg(self):
+        self.simple_limit("test_FlowToDisk_13_SimpleMaxCountPersistentNotAcquiredLargeMsg", max_count = 10, max_size = 10000000, persistent = True, pre_acquired = False, num_msgs = 100, msg_size = 10000)
+
+    def test_FlowToDisk_14_SimpleMaxSizeTransientNotAcquiredLargeMsg(self):
+        self.simple_limit("test_FlowToDisk_14_SimpleMaxSizeTransientNotAcquiredLargeMsg", max_size = 100, pre_acquired = False, num_msgs = 100, msg_size = 10000)
+
+    def test_FlowToDisk_15_SimpleMaxSizePersistentNotAcquiredLargeMsg(self):
+        self.simple_limit("test_FlowToDisk_15_SimpleMaxSizePersistentNotAcquiredLargeMsg", max_size = 100, persistent = True, pre_acquired = False, num_msgs = 100, msg_size = 10000)
+
+    def simple_limit(self, qn, max_count = None, max_size = None, persistent = False, pre_acquired = True, num_msgs = 15, msg_size = None):
+        qa = {'qpid.policy_type':'flow_to_disk'}
+        if max_count != None:
+            qa['qpid.max_count'] = max_count
+        if max_size != None:
+            qa['qpid.max_size'] = max_size
+        if persistent:
+            dm = self.session.delivery_mode.persistent
+        else:
+            dm = self.session.delivery_mode.non_persistent
+        if pre_acquired:
+            am = self.session.acquire_mode.pre_acquired
+        else:
+            am = self.session.acquire_mode.not_acquired
+        # Cycle through the produce/consume block transaction combinations
+        for i in range(0, 4):
+            tp = i & 1 != 0 # Transactional produce
+            tc = i & 2 != 0 # Transactional consume
+            self.tx_simple_limit(qn, qa, dm, am, num_msgs, msg_size, tp, tc)      
+
+    def tx_simple_limit(self, qn, qa, dm, am, num_msgs, msg_size, tp, tc):
         """
         Test a simple case of message limits which will force flow-to-disk.
-        * queue_args sets a limit - either max_count 10 or max_size 100
-        * 15 messages of size 10 are added. The last five will flow to disk.
-        * Consume 15 messages.
+        * The queue arguments (qa) set a limit - max_count and/or max_size
+        * messages are added. Some will flow to disk.
+        * Consume all messages sent.
         * Check the broker has no messages left.
         """
-
-        session = self.session
-        session.queue_declare(queue=queue_name, durable=True, arguments=queue_args)
-
-        # Add 15 messages
-        for msg_num in range(0, num_msgs):
-            msg_str = self._makeMessage(msg_num, msg_size)
-            session.message_transfer(message=Message(session.delivery_properties(routing_key=queue_name, delivery_mode=delivery_mode), msg_str))
-
-        # Consume/browse 15 messages
-        session.message_subscribe(queue=queue_name, destination="tag", acquire_mode=acquire_mode)
-        session.message_flow(destination="tag", unit=session.credit_unit.message, value=0xFFFFFFFF)
-        session.message_flow(destination="tag", unit=session.credit_unit.byte, value=0xFFFFFFFF)
-        queue = session.incoming("tag")
-        ids = RangedSet()
-        for msg_num in range(0, num_msgs):
-            expected_str = self._makeMessage(msg_num, msg_size)
-            msg = queue.get(timeout=5)
-            self.assertEqual(expected_str, msg.body)
-            ids.add(msg.id)
-
-        # If not_acquired, chek messages are still on queue, then acquire/accept
-        if acquire_mode == self.session.acquire_mode.not_acquired:
-            session.queue_declare(queue=queue_name)
-            self.assertEqual(num_msgs, session.queue_query(queue=queue_name).message_count)
-            response = session.message_acquire(ids)
-            for range_ in ids:
-                for msg_id in range_:
-                    self.assert_(msg_id in response.transfers)
         
-        session.message_accept(ids)
+        self.session.queue_declare(queue=qn, durable=True, arguments=qa)
+        
+        # --- Add messages ---
+        self._produce(qn, dm, num_msgs, msg_size, tp)
+        
+        # --- Browse messages, then consume ---
+        dt = "tag-%d-%d" % (tp, tc)
+        ids, txid = self._browse(qn, dt, am, num_msgs, msg_size, tc)
+        self._checkConsume(qn, am, num_msgs, ids, txid, tc)
+        self._checkEmpty(qn)
 
-        # Check queue is empty
-        session.queue_declare(queue=queue_name)
-        self.assertEqual(0, session.queue_query(queue=queue_name).message_count)        
 
+    def test_FlowToDisk_50_MaxCountBrowseConsumeTransient(self):
+        self.not_acquired_browse_consume_limit("test_FlowToDisk_50_MaxCountBrowseConsumeTransient", max_count = 10)
 
-    def test_FlowToDisk_17_MaxCountBrowseConsumeTransient(self):
-        queue_args = {'qpid.policy_type':'flow_to_disk', 'qpid.max_count': 10, 'qpid.max_size': 100000000}
-        self.not_acquired_browse_consume_limit("test_FlowToDisk_17_MaxCountBrowseConsumeTransient", queue_args, self.session.delivery_mode.non_persistent)
+    def test_FlowToDisk_51_MaxCountBrowseConsumePersistent(self):
+        self.not_acquired_browse_consume_limit("test_FlowToDisk_51_MaxCountBrowseConsumePersistent", max_count = 10, persistent = True)
 
-    def test_FlowToDisk_18_MaxCountBrowseConsumePersistent(self):
-        queue_args = {'qpid.policy_type':'flow_to_disk', 'qpid.max_count': 10, 'qpid.max_size': 100000000}
-        self.not_acquired_browse_consume_limit("test_FlowToDisk_18_MaxCountBrowseConsumePersistent", queue_args, self.session.delivery_mode.persistent)
+    def test_FlowToDisk_52_MaxSizeBrowseConsumeTransient(self):
+        self.not_acquired_browse_consume_limit("test_FlowToDisk_52_MaxSizeBrowseConsumeTransient", max_size = 100)
 
-    def test_FlowToDisk_19_MaxSizeBrowseConsumeTransient(self):
-        queue_args = {'qpid.policy_type':'flow_to_disk', 'qpid.max_size': 100}
-        self.not_acquired_browse_consume_limit("test_FlowToDisk_19_MaxSizeBrowseConsumeTransient", queue_args, self.session.delivery_mode.non_persistent)
+    def test_FlowToDisk_53_MaxSizeBrowseConsumePersistent(self):
+        self.not_acquired_browse_consume_limit("test_FlowToDisk_53_MaxSizeBrowseConsumePersistent", max_size = 100, persistent = True)
 
-    def test_FlowToDisk_20_MaxSizeBrowseConsumePersistent(self):
-        queue_args = {'qpid.policy_type':'flow_to_disk', 'qpid.max_size': 100}
-        self.not_acquired_browse_consume_limit("test_FlowToDisk_20_MaxSizeBrowseConsumePersistent", queue_args, self.session.delivery_mode.persistent)
+    def test_FlowToDisk_54_MaxCountBrowseConsumeTransientLargeMsg(self):
+        self.not_acquired_browse_consume_limit("test_FlowToDisk_54_MaxCountBrowseConsumeTransientLargeMsg", max_count = 10, max_size = 10000000, num_msgs = 100, msg_size = 10000)
 
-    def test_FlowToDisk_21_MaxCountBrowseConsumeTransientLargeMsg(self):
-        queue_args = {'qpid.policy_type':'flow_to_disk', 'qpid.max_count': 10, 'qpid.max_size': 100000000}
-        self.not_acquired_browse_consume_limit("test_FlowToDisk_21_MaxCountBrowseConsumeTransientLargeMsg", queue_args, self.session.delivery_mode.non_persistent, 100, 10000)
+    def test_FlowToDisk_55_MaxCountBrowseConsumePersistentLargeMsg(self):
+        self.not_acquired_browse_consume_limit("test_FlowToDisk_55_MaxCountBrowseConsumePersistentLargeMsg", max_count = 10, max_size = 10000000, persistent = True, num_msgs = 100, msg_size = 10000)
 
-    def test_FlowToDisk_22_MaxCountBrowseConsumePersistentLargeMsg(self):
-        queue_args = {'qpid.policy_type':'flow_to_disk', 'qpid.max_count': 10, 'qpid.max_size': 100000000}
-        self.not_acquired_browse_consume_limit("test_FlowToDisk_22_MaxCountBrowseConsumePersistentLargeMsg", queue_args, self.session.delivery_mode.persistent, 100, 10000)
+    def test_FlowToDisk_56_MaxSizeBrowseConsumeTransientLargeMsg(self):
+        self.not_acquired_browse_consume_limit("test_FlowToDisk_56_MaxSizeBrowseConsumeTransientLargeMsg", max_size = 100, num_msgs = 100, msg_size = 10000)
 
-    def test_FlowToDisk_23_MaxSizeBrowseConsumeTransientLargeMsg(self):
-        queue_args = {'qpid.policy_type':'flow_to_disk', 'qpid.max_size': 100}
-        self.not_acquired_browse_consume_limit("test_FlowToDisk_23_MaxSizeBrowseConsumeTransientLargeMsg", queue_args, self.session.delivery_mode.non_persistent, 100, 10000)
+    def test_FlowToDisk_57_MaxSizeBrowseConsumePersistentLargeMsg(self):
+        self.not_acquired_browse_consume_limit("test_FlowToDisk_57_MaxSizeBrowseConsumePersistentLargeMsg", max_size = 100, persistent = True, num_msgs = 100, msg_size = 10000)
 
-    def test_FlowToDisk_24_MaxSizeBrowseConsumePersistentLargeMsg(self):
-        queue_args = {'qpid.policy_type':'flow_to_disk', 'qpid.max_size': 100}
-        self.not_acquired_browse_consume_limit("test_FlowToDisk_24_MaxSizeBrowseConsumePersistentLargeMsg", queue_args, self.session.delivery_mode.persistent, 100, 10000)
+    def not_acquired_browse_consume_limit(self, qn, max_count = None, max_size = None, persistent = False, num_msgs = 15, msg_size = None):
+        qa = {'qpid.policy_type':'flow_to_disk'}
+        if max_count != None:
+            qa['qpid.max_count'] = max_count
+        if max_size != None:
+            qa['qpid.max_size'] = max_size
+        if persistent:
+            dm = self.session.delivery_mode.persistent
+        else:
+            dm = self.session.delivery_mode.non_persistent
+        # Cycle through the produce/consume block transaction combinations
+        for i in range(0, 4):
+            tp = i & 1 != 0 # Transactional produce
+            tc = i & 2 != 0 # Transactional consume
+            self.tx_not_acquired_browse_consume_limit(qn, qa, dm, num_msgs, msg_size, tp, tc)
         
-
-    def not_acquired_browse_consume_limit(self, queue_name, queue_args, delivery_mode, num_msgs = 15, msg_size = None):
+    def tx_not_acquired_browse_consume_limit(self, qn, qa, dm, num_msgs, msg_size, tp, tc):
         """
         Test to check browsing then subsequent consumption of flow-to-disk messages.
         * 15 messages of size 10 are added. The last five will flow to disk.
@@ -190,46 +250,20 @@
         * Consumes 15 messages
         * Checks the broker has no messages left.
         """
-
-        session = self.session
-        session.queue_declare(queue=queue_name, durable=True, arguments=queue_args)
-
+        
+        self.session.queue_declare(queue=qn, durable=True, arguments=qa)
+        am = self.session.acquire_mode.not_acquired
+        
         # Add 15 messages
-        for msg_num in range(0, num_msgs):
-            msg_str = self._makeMessage(msg_num, msg_size)
-            session.message_transfer(message=Message(session.delivery_properties(routing_key=queue_name, delivery_mode=delivery_mode), msg_str))
-
-        # Browse 15 messages
-        session.message_subscribe(queue=queue_name, destination="tagA", acquire_mode=session.acquire_mode.not_acquired)
-        session.message_flow(destination="tagA", unit=session.credit_unit.message, value=0xFFFFFFFF)
-        session.message_flow(destination="tagA", unit=session.credit_unit.byte, value=0xFFFFFFFF)
-        queue = session.incoming("tagA")
-        ids = RangedSet()
-        for msg_num in range(0, num_msgs):
-            expected_str = self._makeMessage(msg_num, msg_size)
-            msg = queue.get(timeout=5)
-            self.assertEqual(expected_str, msg.body)
-            ids.add(msg.id)
-
-        # Release all 15 messages and close
-        session.message_release(ids)
-        session.queue_declare(queue=queue_name)
-        self.assertEqual(num_msgs, session.queue_query(queue=queue_name).message_count)
-
-        # Cancel subscription, start new one that consumes
-        session.message_cancel(destination="tagA")
-        session.message_subscribe(queue=queue_name, destination="tagB", acquire_mode=session.acquire_mode.pre_acquired)
-        session.message_flow(destination="tagB", unit=session.credit_unit.message, value=0xFFFFFFFF)
-        session.message_flow(destination="tagB", unit=session.credit_unit.byte, value=0xFFFFFFFF)
-        queue = session.incoming("tagB")
-        ids = RangedSet()
-        for msg_num in range(0, num_msgs):
-            expected_str = self._makeMessage(msg_num, msg_size)
-            msg = queue.get(timeout=5)
-            self.assertEqual(expected_str, msg.body)
-            ids.add(msg.id)
-        session.message_accept(ids)
-
-        # Check queue is empty
-        session.queue_declare(queue=queue_name)
-        self.assertEqual(0, session.queue_query(queue=queue_name).message_count)
+        self._produce(qn, dm, num_msgs, msg_size, tp)
+        
+        # Browse all messages, then release them and cancel the subscription
+        dtA = "tagA-%d-%d" % (tp, tc)
+        ids, txid = self._browse(qn, dtA, am, num_msgs, msg_size, False)
+        self._checkCancel(qn, dtA, num_msgs, ids)
+        
+        # --- Browse messages, then consume ---
+        dtB = "tagB-%d-%d" % (tp, tc)
+        ids, txid = self._browse(qn, dtB, am, num_msgs, msg_size, tc)
+        self._checkConsume(qn, am, num_msgs, ids, txid, tc)
+        self._checkEmpty(qn)

Modified: store/trunk/cpp/tests/run_python_tests
===================================================================
--- store/trunk/cpp/tests/run_python_tests	2009-06-11 18:53:52 UTC (rev 3451)
+++ store/trunk/cpp/tests/run_python_tests	2009-06-16 20:20:52 UTC (rev 3452)
@@ -56,7 +56,7 @@
     exit
 fi
 
-BROKER_OPTS="--no-module-dir --load-module=${LIBSTORE} --data-dir=${TMP_STORE_DIR} --auth=no"
+BROKER_OPTS="--no-module-dir --load-module=${LIBSTORE} --data-dir=${TMP_STORE_DIR} --auth=no --log-enable info+ --log-to-file ${TMP_STORE_DIR}/broker.python-test.log"
 AMQP_SPEC=0-10-errata
 
 #Make sure temp dir exists if this is the first to use it




More information about the rhmessaging-commits mailing list