[rhmessaging-commits] rhmessaging commits: r3640 - in store/trunk/cpp: lib/jrnl and 2 other directories.

rhmessaging-commits at lists.jboss.org
Tue Sep 22 14:17:46 EDT 2009


Author: kpvdr
Date: 2009-09-22 14:17:46 -0400 (Tue, 22 Sep 2009)
New Revision: 3640

Modified:
   store/trunk/cpp/lib/JournalImpl.cpp
   store/trunk/cpp/lib/MessageStoreImpl.cpp
   store/trunk/cpp/lib/jrnl/pmgr.hpp
   store/trunk/cpp/tests/SimpleTest.cpp
   store/trunk/cpp/tests/python_tests/flow_to_disk.py
   store/trunk/cpp/tests/run_python_tests
Log:
Store tests for new flow-to-disk functionality for qpid r817748.

Modified: store/trunk/cpp/lib/JournalImpl.cpp
===================================================================
--- store/trunk/cpp/lib/JournalImpl.cpp	2009-09-22 14:02:42 UTC (rev 3639)
+++ store/trunk/cpp/lib/JournalImpl.cpp	2009-09-22 18:17:46 UTC (rev 3640)
@@ -252,6 +252,8 @@
 
 #define MAX_AIO_SLEEPS 1000  // 10 sec
 #define AIO_SLEEP_TIME 10000 // 10 ms
+// Return true if content is recovered from store; false if content is external and must be recovered from an external store.
+// Throw exception for all errors.
 bool
 JournalImpl::loadMsgContent(u_int64_t rid, std::string& data, size_t length, size_t offset)
 {

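The new comment spells out the loadMsgContent() contract: return true when the content can be recovered from the journal, false when the record is external and must be recovered from an external store, and throw for all errors. The python tests added in this commit exercise that path from the client side by browsing messages whose content has been released to disk and checking the bodies that come back. A minimal sketch of that check, modelled on the _browse helper introduced later in this commit and assuming an already-open qpid 0-10 session (queueName, expectedBodies and the "browse-tag" destination are illustrative):

    from qpid.datatypes import RangedSet

    def browse_released_content(session, queueName, expectedBodies):
        # Subscribe without acquiring so the messages stay on the queue.
        session.message_subscribe(queue=queueName, destination="browse-tag",
                                  acquire_mode=session.acquire_mode.not_acquired)
        session.message_flow(destination="browse-tag",
                             unit=session.credit_unit.message, value=0xFFFFFFFF)
        session.message_flow(destination="browse-tag",
                             unit=session.credit_unit.byte, value=0xFFFFFFFF)
        incoming = session.incoming("browse-tag")
        ids = RangedSet()
        for expected in expectedBodies:
            msg = incoming.get(timeout=10)  # broker reloads released content here
            assert msg.body == expected     # content recovered from the store
            ids.add(msg.id)
        return ids
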
Modified: store/trunk/cpp/lib/MessageStoreImpl.cpp
===================================================================
--- store/trunk/cpp/lib/MessageStoreImpl.cpp	2009-09-22 14:02:42 UTC (rev 3639)
+++ store/trunk/cpp/lib/MessageStoreImpl.cpp	2009-09-22 18:17:46 UTC (rev 3640)
@@ -1474,15 +1474,15 @@
             JournalImpl* jc = static_cast<JournalImpl*>(queue->getExternalQueueStore());
             if (txn->getXid().empty()) {
                 if (message->isContentReleased()) {
-                    jc->enqueue_extern_data_record(size, dtokp.get(), false);
+                    jc->enqueue_extern_data_record(size, dtokp.get(), !message->isPersistent());
                 } else {
-                    jc->enqueue_data_record(buff, size, size, dtokp.get(), false);
+                    jc->enqueue_data_record(buff, size, size, dtokp.get(), !message->isPersistent());
                 }
             } else {
                 if (message->isContentReleased()) {
-                    jc->enqueue_extern_txn_data_record(size, dtokp.get(), txn->getXid(), false);
+                    jc->enqueue_extern_txn_data_record(size, dtokp.get(), txn->getXid(), !message->isPersistent());
                 } else {
-                    jc->enqueue_txn_data_record(buff, size, size, dtokp.get(), txn->getXid(), false);
+                    jc->enqueue_txn_data_record(buff, size, size, dtokp.get(), txn->getXid(), !message->isPersistent());
                 }
             }
         } else {

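The change above stops hard-coding the journal's transient flag to false and instead derives it from the message (!message->isPersistent()), so non-persistent messages pushed to the store by a flow-to-disk policy are journalled as transient records, for both released (extern) and in-memory content. A short sketch of how the new python tests drive that path, condensed from the simpleLimit/_produceDirect helpers in this commit and assuming an open 0-10 session (the queue name and limits are illustrative):

    from qpid.datatypes import Message

    def force_transient_flow_to_disk(session, queueName="ftd-transient", numMsgs=15):
        # Durable queue whose policy pushes overflow messages to the store.
        session.queue_declare(queue=queueName, durable=True,
                              arguments={'qpid.policy_type': 'flow_to_disk',
                                         'qpid.max_count': 10})
        # Send non-persistent messages past the limit; the store journals the
        # overflow with the transient flag derived from the delivery mode.
        for msgNum in range(0, numMsgs):
            dp = session.delivery_properties(
                routing_key=queueName,
                delivery_mode=session.delivery_mode.non_persistent)
            session.message_transfer(message=Message(dp, "Message-%04d" % msgNum))
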
Modified: store/trunk/cpp/lib/jrnl/pmgr.hpp
===================================================================
--- store/trunk/cpp/lib/jrnl/pmgr.hpp	2009-09-22 14:02:42 UTC (rev 3639)
+++ store/trunk/cpp/lib/jrnl/pmgr.hpp	2009-09-22 18:17:46 UTC (rev 3640)
@@ -87,7 +87,7 @@
             page_state _state;          ///< Status of page
             u_int64_t _frid;            ///< First rid in page (used for fhdr init)
             u_int32_t _wdblks;          ///< Total number of dblks in page so far
-            u_int32_t _rdblks;          ///< Total number of dblks in page so far
+            u_int32_t _rdblks;          ///< Total number of dblks in page
             std::deque<data_tok*>* _pdtokl; ///< Page message tokens list
             fcntl* _wfh;                ///< File handle for incrementing write compl counts
             fcntl* _rfh;                ///< File handle for incrementing read compl counts

Modified: store/trunk/cpp/tests/SimpleTest.cpp
===================================================================
--- store/trunk/cpp/tests/SimpleTest.cpp	2009-09-22 14:02:42 UTC (rev 3639)
+++ store/trunk/cpp/tests/SimpleTest.cpp	2009-09-22 18:17:46 UTC (rev 3640)
@@ -377,7 +377,8 @@
         store.stage(pmsg);
 
         //append to it
-        msg->releaseContent(&store);//ensure that data is not held in memory but is appended to disk when added
+        msg->setStore(&store);
+        msg->releaseContent();//ensure that data is not held in memory but is appended to disk when added
         store.appendContent(cpmsg, data1);
         store.appendContent(cpmsg, data2);
 

Modified: store/trunk/cpp/tests/python_tests/flow_to_disk.py
===================================================================
--- store/trunk/cpp/tests/python_tests/flow_to_disk.py	2009-09-22 14:02:42 UTC (rev 3639)
+++ store/trunk/cpp/tests/python_tests/flow_to_disk.py	2009-09-22 18:17:46 UTC (rev 3640)
@@ -34,16 +34,16 @@
 
     # --- Helper functions ---
 
-    def _browse(self, qn, dt, am, numMsgs, msgSize, txnConsume):
+    def _browse(self, queueName, destinationTag, acquireMode, numMsgs, msgSize, txnConsume):
         txid = None
         if txnConsume:
-            txid = self._makeXid("consumer-xid-%s" % qn)
+            txid = self._makeXid("consumer-xid-%s" % queueName)
             self.session.dtx_select()
             self.assertEqual(self.XA_OK, self.session.dtx_start(xid=txid).status)
-        self.session.message_subscribe(queue=qn, destination=dt, acquire_mode=am)
-        self.session.message_flow(destination=dt, unit=self.session.credit_unit.message, value=0xFFFFFFFF)
-        self.session.message_flow(destination=dt, unit=self.session.credit_unit.byte, value=0xFFFFFFFF)
-        queue = self.session.incoming(dt)
+        self.session.message_subscribe(queue=queueName, destination=destinationTag, acquire_mode=acquireMode)
+        self.session.message_flow(destination=destinationTag, unit=self.session.credit_unit.message, value=0xFFFFFFFF)
+        self.session.message_flow(destination=destinationTag, unit=self.session.credit_unit.byte, value=0xFFFFFFFF)
+        queue = self.session.incoming(destinationTag)
         ids = RangedSet()
         for msgNum in range(0, numMsgs):
             expectedStr = self._makeMessage(msgNum, msgSize)
@@ -52,16 +52,16 @@
             ids.add(msg.id)
         return ids, txid
 
-    def _checkCancel(self, qn, dt, numMsgs, ids):
+    def _checkCancel(self, queueName, destinationTag, numMsgs, ids):
         self.session.message_release(ids)
-        self.session.queue_declare(queue=qn)
-        self.assertEqual(numMsgs, self.session.queue_query(queue=qn).message_count)
-        self.session.message_cancel(destination=dt)
+        self.session.queue_declare(queue=queueName)
+        self.assertEqual(numMsgs, self.session.queue_query(queue=queueName).message_count)
+        self.session.message_cancel(destination=destinationTag)
 
-    def _checkConsume(self, qn, am, numMsgs, ids, txid, txnConsume):
-        if am == self.session.acquire_mode.not_acquired:
-            self.session.queue_declare(queue=qn)
-            self.assertEqual(numMsgs, self.session.queue_query(queue=qn).message_count)
+    def _checkConsume(self, queueName, acquireMode, numMsgs, ids, txid, txnConsume):
+        if acquireMode == self.session.acquire_mode.not_acquired:
+            self.session.queue_declare(queue=queueName)
+            self.assertEqual(numMsgs, self.session.queue_query(queue=queueName).message_count)
             response = self.session.message_acquire(ids)
             for range in ids:
                 for msgId in range:
@@ -73,10 +73,15 @@
             self.assertEqual(self.XA_OK, self.session.dtx_commit(xid=txid, one_phase=False).status)
         self._resetChannel()
 
-    def _checkEmpty(self, qn):
-        self.session.queue_declare(queue=qn)
-        self.assertEqual(0, self.session.queue_query(queue=qn).message_count)
+    def _checkEmpty(self, queueName):
+        self.session.queue_declare(queue=queueName)
+        self.assertEqual(0, self.session.queue_query(queue=queueName).message_count)
 
+    def _handleSessionClose(self):
+        time.sleep(1)
+        self.tearDown()
+        self.setUp()
+
     def _makeMessage(self, msgCnt, msgSize):
         msg = "Message-%04d" % msgCnt
         msgLen = len(msg)
@@ -93,19 +98,44 @@
         branchqual = "v%s" % self.txCounter
         return self.session.xid(format=0, global_id=txid, branch_id=branchqual)
 
-    def _produce(self, qn, dm, numMsgs, msgSize, txnProduce):
+    def _produceDirect(self, queueName, deliveryMode, numMsgs, msgSize, txnProduce):
         if txnProduce:
-            txid = self._makeXid("producer-xid-%s" % qn)
+            txid = self._makeXid("producer-xid-%s" % queueName)
             self.session.dtx_select()
             self.assertEqual(self.XA_OK, self.session.dtx_start(xid=txid).status)
         for msgNum in range(0, numMsgs):
             msg_str = self._makeMessage(msgNum, msgSize)
-            self.session.message_transfer(message=Message(self.session.delivery_properties(routing_key=qn, delivery_mode=dm), msg_str))
+            self.session.message_transfer(message=Message(self.session.delivery_properties(routing_key=queueName, delivery_mode=deliveryMode), msg_str))
         if txnProduce:
             self.assertEqual(self.XA_OK, self.session.dtx_end(xid=txid).status)
             self.assertEqual(self.XA_OK, self.session.dtx_prepare(xid=txid).status)
             self.assertEqual(self.XA_OK, self.session.dtx_commit(xid=txid, one_phase=False).status)
         self._resetChannel()
+     
+    def _produceFanout(self, exchangeName, deliveryMode, numMsgs, msgSize, txnProduce, fail_on_msg):
+        if txnProduce:
+            txid = self._makeXid("producer-xid-%s" % exchangeName)
+            self.session.dtx_select()
+            self.assertEqual(self.XA_OK, self.session.dtx_start(xid=txid).status)
+        try:
+            msgNum = 0
+            for msgNum in range(0, numMsgs):
+                msg_str = self._makeMessage(msgNum, msgSize)
+                self.session.message_transfer(destination=exchangeName, message=Message(self.session.delivery_properties(delivery_mode=deliveryMode), msg_str))
+                if fail_on_msg != None and msgNum > fail_on_msg and not txnProduce:
+                    self.fail("Failed to throw SessionException on message_transfer %s" % fail_on_msg)
+        except SessionException:
+            if fail_on_msg != None and msgNum == fail_on_msg and not txnProduce:
+                self._handleSessionClose()
+        if txnProduce:
+            self.assertEqual(self.XA_OK, self.session.dtx_end(xid=txid).status)
+            if (fail_on_msg != None and txnProduce):
+                self.assertNotEqual(self.XA_OK, self.session.dtx_prepare(xid=txid).status)
+                self.assertEqual(self.XA_OK, self.session.dtx_rollback(xid=txid).status)
+            else:
+                self.assertEqual(self.XA_OK, self.session.dtx_prepare(xid=txid).status)
+                self.assertEqual(self.XA_OK, self.session.dtx_commit(xid=txid, one_phase=False).status)
+        self._resetChannel()
     
     def _randomBool(self):
         if random.randint(0, 1) > 0:
@@ -113,112 +143,72 @@
         return False
 
     def _resetChannel(self):
-        self.session.close()
+        try:
+            self.session.close()
+        except Exception, e:
+            pass
         self.session = self.conn.session("test-session", 1)
 
-    # --- Simple tests ---
+    # --- Simple test framework (single queue) ---
 
-    def test_FlowToDisk_00_SimpleMaxCount(self):
-        """Flow-to-disk tests based on setting max_count"""
-        self.simpleLimit("test_FlowToDisk_00a", max_count = 10)
-        self.simpleLimit("test_FlowToDisk_00b", max_count = 10, persistent = True)
-        self.simpleLimit("test_FlowToDisk_00c", max_count = 10, max_size = 10000000, numMsgs = 100, msgSize = 10000)
-        self.simpleLimit("test_FlowToDisk_00d", max_count = 10, max_size = 10000000, persistent = True, numMsgs = 100, msgSize = 10000)
-
-    def test_FlowToDisk_01_SimpleMaxSize(self):
-        """Flow-to-disk tests based on setting max_size"""
-        self.simpleLimit("test_FlowToDisk_01a", max_size = 100)
-        self.simpleLimit("test_FlowToDisk_01b", max_size = 100, persistent = True)
-        self.simpleLimit("test_FlowToDisk_01c", max_size = 100000, numMsgs = 100, msgSize = 10000)
-        self.simpleLimit("test_FlowToDisk_01d", max_size = 100000, persistent = True, numMsgs = 100, msgSize = 10000)
-
-    def test_FlowToDisk_02_SimpleMaxCountNotAcquired(self):
-        """Flow-to-disk tests based on setting max_count, but not using pre-acquired mode (ie browsing)"""
-        self.simpleLimit("test_FlowToDisk_02a", max_count = 10, pre_acquired = False)
-        self.simpleLimit("test_FlowToDisk_02b", max_count = 10, persistent = True, pre_acquired = False)
-        self.simpleLimit("test_FlowToDisk_02c", max_count = 10, max_size = 10000000, pre_acquired = False, numMsgs = 100, msgSize = 10000)
-        self.simpleLimit("test_FlowToDisk_02d", max_count = 10, max_size = 10000000, persistent = True, pre_acquired = False, numMsgs = 100, msgSize = 10000)
-
-    def test_FlowToDisk_03_SimpleMaxSizeNotAcquired(self):
-        """Flow-to-disk tests based on setting max_size, but not using pre-acquired mode (ie browsing)"""
-        self.simpleLimit("test_FlowToDisk_03a", max_size = 100, pre_acquired = False)
-        self.simpleLimit("test_FlowToDisk_03b", max_size = 100, persistent = True, pre_acquired = False)
-        self.simpleLimit("test_FlowToDisk_03c", max_size = 100, pre_acquired = False, numMsgs = 100, msgSize = 10000)
-        self.simpleLimit("test_FlowToDisk_03d", max_size = 100, persistent = True, pre_acquired = False, numMsgs = 100, msgSize = 10000)
-    
-    def test_FlowToDisk_04_MaxSizeMaxCount(self):
-        """Flow-to-disk tests based on setting both max-count and max-size at the same time"""
-        self.simpleLimit("test_FlowToDisk_04a", max_count = 10, max_size = 1000)
-        self.simpleLimit("test_FlowToDisk_04b", max_count = 10, max_size = 1000, msgSize = 250)
-        self.simpleLimit("test_FlowToDisk_04c", max_count = 10, max_size = 1000, persistent = True)
-        self.simpleLimit("test_FlowToDisk_04d", max_count = 10, max_size = 1000, msgSize = 250, persistent = True)
-    
-    def test_FlowToDisk_05_Randomized(self):
-        """Randomized flow-to-disk tests"""
-        seed = long(1000.0 * time.time())
-        print "seed=0x%x" % seed
-        random.seed(seed)
-        for i in range(0, 10):
-            self.randomLimit(i)
-
-    def simpleLimit(self, qn, max_count = None, max_size = None, persistent = False, pre_acquired = True, numMsgs = 15, msgSize = None, browse = True):
-        qa = {'qpid.policy_type':'flow_to_disk'}
+    def simpleLimit(self, queueName, max_count = None, max_size = None, persistent = False, pre_acquired = True, numMsgs = 15, msgSize = None, browse = True, policy = "flow_to_disk"):
+        if max_count != None or max_size != None:
+            queueArgs = {'qpid.policy_type':policy}
         if max_count != None:
-            qa['qpid.max_count'] = max_count
+            queueArgs['qpid.max_count'] = max_count
         if max_size != None:
-            qa['qpid.max_size'] = max_size
+            queueArgs['qpid.max_size'] = max_size
         if persistent:
-            dm = self.session.delivery_mode.persistent
+            deliveryMode = self.session.delivery_mode.persistent
         else:
-            dm = self.session.delivery_mode.non_persistent
+            deliveryMode = self.session.delivery_mode.non_persistent
         if pre_acquired:
-            am = self.session.acquire_mode.pre_acquired
+            acquireMode = self.session.acquire_mode.pre_acquired
         else:
-            am = self.session.acquire_mode.not_acquired
+            acquireMode = self.session.acquire_mode.not_acquired
         # Cycle through the produce/consume block transaction combinations
         for i in range(0, 4):
-            tp = i & 1 != 0 # Transactional produce
-            tc = i & 2 != 0 # Transactional consume
-            self.txSimpleLimit(qn, qa, dm, am, numMsgs, msgSize, tp, tc, browse)
+            txnProduce = i & 1 != 0 # Transactional produce
+            txnConsume = i & 2 != 0 # Transactional consume
+            self._txSimpleLimit(queueName, queueArgs, deliveryMode, acquireMode, numMsgs, msgSize, txnProduce, txnConsume, browse)
 
-    def randomLimit(self, count):
-        qa = {'qpid.policy_type':'flow_to_disk'}
-        qn = "randomized_test_%04d" % count
+    def randomLimit(self, count, policy = "flow_to_disk"):
+        queueArgs = {'qpid.policy_type':policy}
+        queueName = "randomized_test_%04d" % count
         
         # Flow to disk policy
         maxCount = None
         if self._randomBool():
             maxCount = random.randint(0,10000)
-            qa['qpid.max_count'] = maxCount
+            queueArgs['qpid.max_count'] = maxCount
         maxSize = None
         if self._randomBool():
             maxSize = random.randint(0, 1000000)
-            qa['qpid.max_size'] = maxSize
+            queueArgs['qpid.max_size'] = maxSize
             
         # Persistence
         if self._randomBool():
-            dm = self.session.delivery_mode.persistent
+            deliveryMode = self.session.delivery_mode.persistent
         else:
-            dm = self.session.delivery_mode.non_persistent
+            deliveryMode = self.session.delivery_mode.non_persistent
         
         # Acquired mode
         if self._randomBool():
-            am = self.session.acquire_mode.pre_acquired
+            acquireMode = self.session.acquire_mode.pre_acquired
             browse = False
         else:
-            am = self.session.acquire_mode.not_acquired
+            acquireMode = self.session.acquire_mode.not_acquired
             browse = self._randomBool()
         
         numMsgs = random.randint(1, 10000)
         sizeLimit = int(1000000 / numMsgs)
         msgSize = random.randint(1, sizeLimit)
-        tp = self._randomBool()
-        tc = self._randomBool()
+        txnProduce = self._randomBool()
+        txnConsume = self._randomBool()
         
-        #print "  qn=%s, qa=%s, dm=%s, am=%s, numMsgs=%d, msgSize=%d, tp=%s, tc=%s, browse=%s" % (qn, qa, dm, am, numMsgs, msgSize, tp, tc, browse)
-        self.txSimpleLimit(qn, qa, dm, am, numMsgs, msgSize, tp, tc, browse)
+        self._txSimpleLimit(queueName, queueArgs, deliveryMode, acquireMode, numMsgs, msgSize, txnProduce, txnConsume, browse)
 
-    def txSimpleLimit(self, qn, qa, dm, am, numMsgs, msgSize, tp, tc, browse):
+    def _txSimpleLimit(self, queueName, queueArgs, deliveryMode, acquireMode, numMsgs, msgSize, txnProduce, txnConsume, browse):
         """
         Test a simple case of message limits which will force flow-to-disk.
         * queue_args sets a limit - either max_count and/or max_size
@@ -227,19 +217,356 @@
         * Check the broker has no messages left.
         """
         
-        self.session.queue_declare(queue=qn, durable=True, arguments=qa)
+        self.session.queue_declare(queue=queueName, durable=True, arguments=queueArgs)
         
-        # --- Add messages ---
-        self._produce(qn, dm, numMsgs, msgSize, tp)
+        # Add messages
+        self._produceDirect(queueName, deliveryMode, numMsgs, msgSize, txnProduce)
         
-        # --- Browse messages (if possible) ---
-        if am == self.session.acquire_mode.not_acquired and browse:
-            dtA = "tagA-%d-%d" % (tp, tc)
-            ids, txid = self._browse(qn, dtA, am, numMsgs, msgSize, False)
-            self._checkCancel(qn, dtA, numMsgs, ids)
+        # Browse messages (if possible)
+        if acquireMode == self.session.acquire_mode.not_acquired and browse:
+            dtA = "tagA-%d-%d" % (txnProduce, txnConsume)
+            ids, txid = self._browse(queueName, dtA, acquireMode, numMsgs, msgSize, False)
+            self._checkCancel(queueName, dtA, numMsgs, ids)
         
-        # --- Consume messages ---
-        dtB = "tagB-%d-%d" % (tp, tc)
-        ids, txid = self._browse(qn, dtB, am, numMsgs, msgSize, tc)
-        self._checkConsume(qn, am, numMsgs, ids, txid, tc)
-        self._checkEmpty(qn)
+        # Consume messages
+        dtB = "tagB-%d-%d" % (txnProduce, txnConsume)
+        ids, txid = self._browse(queueName, dtB, acquireMode, numMsgs, msgSize, txnConsume)
+        self._checkConsume(queueName, acquireMode, numMsgs, ids, txid, txnConsume)
+        self._checkEmpty(queueName)
+        
+        self.session.queue_delete(queue=queueName)
+
+    # --- Multi-queue test framework ---
+
+    def multiQueueLimit(self, queueMap, exchangeName = "amq.fanout", persistent = False, pre_acquired = True, numMsgs = 15, msgSize = None, browse = True, policy = "flow_to_disk", fail_on_msg = None):
+        if persistent:
+            deliveryMode = self.session.delivery_mode.persistent
+        else:
+            deliveryMode = self.session.delivery_mode.non_persistent
+        if pre_acquired:
+            acquireMode = self.session.acquire_mode.pre_acquired
+        else:
+            acquireMode = self.session.acquire_mode.not_acquired
+        # Cycle through the produce/consume block transaction combinations
+        for i in range(0, 4):
+            txnProduce = i & 1 != 0 # Transactional produce
+            txnConsume = i & 2 != 0 # Transactional consume
+            self._txMultiQueueLimit(queueMap, exchangeName, deliveryMode, acquireMode, numMsgs, msgSize, txnProduce, txnConsume, browse, policy, fail_on_msg)
+
+    def _txMultiQueueLimit(self, queueMap, exchangeName, deliveryMode, acquireMode, numMsgs, msgSize, txnProduce, txnConsume, browse, policy, fail_on_msg):
+        """
+        Test a multi-queue case
+        queueMap = queue map where map is queue name (key) against queue args (value)
+        """
+        self._multiQueueSetup(queueMap, exchangeName, txnProduce, txnConsume, policy)
+        self._produceFanout(exchangeName, deliveryMode, numMsgs, msgSize, txnProduce, fail_on_msg)
+        if fail_on_msg == None:
+            # Browse messages (ie get without acquiring)
+            if acquireMode == self.session.acquire_mode.not_acquired and browse:
+                for queue in queueMap.iterkeys():
+                    deliveryTagA = "tagA-%d-%d" % (txnProduce, txnConsume)
+                    queueName = "%s-%d-%d" % (queue, txnProduce, txnConsume)
+                    ids, txid = self._browse(queueName, deliveryTagA, acquireMode, numMsgs, msgSize, False)
+                    self._checkCancel(queueName, deliveryTagA, numMsgs, ids)
+            # Consume messages
+            for queue in queueMap.iterkeys():
+                deliveryTagB = "tagB-%d-%d" % (txnProduce, txnConsume)
+                queueName = "%s-%d-%d" % (queue, txnProduce, txnConsume)
+                ids, txid = self._browse(queueName, deliveryTagB, acquireMode, numMsgs, msgSize, txnConsume)
+                self._checkConsume(queueName, acquireMode, numMsgs, ids, txid, txnConsume)
+                self._checkEmpty(queueName)
+        self._multiQueueTeardown(queueMap, exchangeName, txnProduce, txnConsume)
+
+    def _multiQueueSetup(self, queueMap, exchangeName, txnProduce, txnConsume, policy, exclusive = True):
+        for queue, qp in queueMap.iteritems():
+            queueArgs = {}
+            ftd = False
+            for p,v in qp.iteritems():
+                if p == "persistent":
+                    d = v == True
+                elif p == "qpid.max_count":
+                    if v != None:
+                        queueArgs[p] = v
+                        ftd = True
+                elif p == "qpid.max_size":
+                    if v != None:
+                        queueArgs[p] = v
+                        ftd = True
+            if ftd:
+                queueArgs["qpid.policy_type"] = policy
+            queueName = "%s-%d-%d" % (queue, txnProduce, txnConsume)
+            self.session.queue_declare(queue = queueName, durable = d, arguments = queueArgs, exclusive = exclusive)
+            self.session.exchange_bind(exchange = exchangeName, queue = queueName)
+
+    def _multiQueueTeardown(self, queueMap, exchangeName, txnProduce, txnConsume):
+        for queue in queueMap.iterkeys():
+            queueName = "%s-%d-%d" % (queue, txnProduce, txnConsume)
+            self.session.exchange_unbind(exchange = exchangeName, queue = queueName)
+            self.session.queue_delete(queue = queueName)
+
+    # --- Simple tests ---
+
+    def test_01_SimpleMaxCount(self):
+        """Flow-to-disk tests based on setting max_count"""
+        # Small msgs
+        self.simpleLimit("test_01a", max_count = 10)
+        self.simpleLimit("test_01b", max_count = 10, persistent = True)
+        # Large msgs - set max_size to high number to make sure it does not interfere
+        #self.simpleLimit("test_00c", max_count = 10, max_size = 10000000, numMsgs = 100, msgSize = 10000)
+        #self.simpleLimit("test_00d", max_count = 10, max_size = 10000000, persistent = True, numMsgs = 100, msgSize = 10000)
+
+    def test_02_SimpleMaxSize(self):
+        """Flow-to-disk tests based on setting max_size"""
+        # Small msgs
+        self.simpleLimit("test_02a", max_size = 100)
+        self.simpleLimit("test_02b", max_size = 100, persistent = True)
+        # Large msgs - set max_size to high number to make sure it does not interfere
+        #self.simpleLimit("test_01c", max_size = 100000, numMsgs = 100, msgSize = 10000)
+        #self.simpleLimit("test_01d", max_size = 100000, persistent = True, numMsgs = 100, msgSize = 10000)
+
+    def test_03_SimpleMaxCountNotAcquired(self):
+        """Flow-to-disk tests based on setting max_count, but not using pre-acquired mode (ie browsing)"""
+        # Small msgs
+        self.simpleLimit("test_03a", max_count = 10, pre_acquired = False)
+        self.simpleLimit("test_03b", max_count = 10, persistent = True, pre_acquired = False)
+        # Large msgs - set max_size to high number to make sure it does not interfere
+        #self.simpleLimit("test_02c", max_count = 10, max_size = 10000000, pre_acquired = False, numMsgs = 100, msgSize = 10000)
+        #self.simpleLimit("test_02d", max_count = 10, max_size = 10000000, persistent = True, pre_acquired = False, numMsgs = 100, msgSize = 10000)
+
+    def test_04_SimpleMaxSizeNotAcquired(self):
+        """Flow-to-disk tests based on setting max_size, but not using pre-acquired mode (ie browsing)"""
+        # Small msgs
+        self.simpleLimit("test_04a", max_size = 100, pre_acquired = False)
+        self.simpleLimit("test_04b", max_size = 100, persistent = True, pre_acquired = False)
+        # Large msgs - set max_size to high number to make sure it does not interfere
+        #self.simpleLimit("test_03c", max_size = 100, pre_acquired = False, numMsgs = 100, msgSize = 10000)
+        #self.simpleLimit("test_03d", max_size = 100, persistent = True, pre_acquired = False, numMsgs = 100, msgSize = 10000)
+    
+    def test_05_MaxSizeMaxCount(self):
+        """Flow-to-disk tests based on setting both max-count and max-size at the same time"""
+        # Small msgs
+        self.simpleLimit("test_05a", max_count = 10, max_size = 1000)
+        self.simpleLimit("test_05b", max_count = 10, max_size = 1000, msgSize = 250)
+        # Large msgs - set max_size to high number to make sure it does not interfere
+        #self.simpleLimit("test_04c", max_count = 10, max_size = 1000, persistent = True)
+        #self.simpleLimit("test_04d", max_count = 10, max_size = 1000, msgSize = 250, persistent = True)
+    
+    # --- Multi-queue tests ---
+    
+    def test_06_MultiQueueTransQueueTransMsg(self):
+        """Flow-to-disk tests where queues and messages are transient and messages are routed to more than one queue"""
+        queueMap1 = {"test_06a" : {"persistent":False, "qpid.max_count":None, "qpid.max_size": None},
+                     "test_06b" : {"persistent":False, "qpid.max_count":None, "qpid.max_size": None} }
+        self.multiQueueLimit(queueMap1)
+        
+        queueMap2 = {"test_06c" : {"persistent":False, "qpid.max_count":None, "qpid.max_size": None}, 
+                     "test_06d" : {"persistent":False, "qpid.max_count":10,   "qpid.max_size": None} }
+        self.multiQueueLimit(queueMap2, fail_on_msg = 10)
+        
+        queueMap3 = {"test_06e" : {"persistent":False, "qpid.max_count":None, "qpid.max_size": None}, 
+                     "test_06f" : {"persistent":False, "qpid.max_count":None, "qpid.max_size": 1000} }
+        self.multiQueueLimit(queueMap3, msgSize = 100, fail_on_msg = 10)
+        queueMap4 = {"test_06g" : {"persistent":False, "qpid.max_count":10,   "qpid.max_size": None}, 
+                     "test_06h" : {"persistent":False, "qpid.max_count":8,    "qpid.max_size": None} }
+        self.multiQueueLimit(queueMap4, fail_on_msg =  8)
+        
+        queueMap5 = {"test_06i" : {"persistent":False, "qpid.max_count":None, "qpid.max_size": 1000}, 
+                     "test_06j" : {"persistent":False, "qpid.max_count":None, "qpid.max_size":  800} }
+        self.multiQueueLimit(queueMap5, msgSize = 100, fail_on_msg =  8)
+        
+        queueMap6 = {"test_06k" : {"persistent":False, "qpid.max_count":None, "qpid.max_size": None}, 
+                     "test_06l" : {"persistent":False, "qpid.max_count":10,   "qpid.max_size": None},
+                     "test_06m" : {"persistent":False, "qpid.max_count":None, "qpid.max_size": 1000},
+                     "test_06n" : {"persistent":False, "qpid.max_count":8,    "qpid.max_size":  800} }
+        self.multiQueueLimit(queueMap6, msgSize = 100, fail_on_msg =  8)
+     
+    def test_07_MultiQueueDurableQueueTransMsg(self):
+        """Flow-to-disk tests where queues are durable and messages are transient and messages are routed to more than one queue"""
+        queueMap1 = {"test_07a" : {"persistent":True,  "qpid.max_count":None, "qpid.max_size": None},
+                     "test_07b" : {"persistent":True,  "qpid.max_count":None, "qpid.max_size": None} }
+        self.multiQueueLimit(queueMap1)
+        
+        queueMap2 = {"test_07c" : {"persistent":True,  "qpid.max_count":None, "qpid.max_size": None}, 
+                     "test_07d" : {"persistent":True,  "qpid.max_count":10,   "qpid.max_size": None} }
+        self.multiQueueLimit(queueMap2)
+        
+        queueMap3 = {"test_07e" : {"persistent":True,  "qpid.max_count":None, "qpid.max_size": None}, 
+                     "test_07f" : {"persistent":True,  "qpid.max_count":None, "qpid.max_size": 1000} }
+        self.multiQueueLimit(queueMap3, msgSize = 100)
+        
+        queueMap4 = {"test_07g" : {"persistent":True,  "qpid.max_count":10,   "qpid.max_size": None}, 
+                     "test_07h" : {"persistent":True,  "qpid.max_count":8,    "qpid.max_size": None} }
+        self.multiQueueLimit(queueMap4)
+        
+        queueMap5 = {"test_07i" : {"persistent":True,  "qpid.max_count":None, "qpid.max_size": 1000}, 
+                     "test_07j" : {"persistent":True,  "qpid.max_count":None, "qpid.max_size":  800} }
+        self.multiQueueLimit(queueMap5, msgSize = 100)
+        
+        queueMap6 = {"test_07k" : {"persistent":True,  "qpid.max_count":None, "qpid.max_size": None}, 
+                     "test_07l" : {"persistent":True,  "qpid.max_count":10,   "qpid.max_size": None},
+                     "test_07m" : {"persistent":True,  "qpid.max_count":None, "qpid.max_size": 1000},
+                     "test_07n" : {"persistent":True,  "qpid.max_count":8,    "qpid.max_size":  800} }
+        self.multiQueueLimit(queueMap6, msgSize = 100)
+   
+    
+    def test_08_MultiQueueTransQueueDurableMsg(self):
+        """Flow-to-disk tests where queues are transient and messages are durable and messages are routed to more than one queue"""
+        queueMap1 = {"test_08a" : {"persistent":False, "qpid.max_count":None, "qpid.max_size": None},
+                     "test_08b" : {"persistent":False, "qpid.max_count":None, "qpid.max_size": None} }
+        self.multiQueueLimit(queueMap1, persistent = True)
+        
+        queueMap2 = {"test_08c" : {"persistent":False, "qpid.max_count":None, "qpid.max_size": None}, 
+                     "test_08d" : {"persistent":False, "qpid.max_count":10,   "qpid.max_size": None} }
+        self.multiQueueLimit(queueMap2, persistent = True, fail_on_msg = 10)
+        
+        queueMap3 = {"test_08e" : {"persistent":False, "qpid.max_count":None, "qpid.max_size": None}, 
+                     "test_08f" : {"persistent":False, "qpid.max_count":None, "qpid.max_size": 1000} }
+        self.multiQueueLimit(queueMap3, msgSize = 100, persistent = True, fail_on_msg = 10)
+        
+        queueMap4 = {"test_08g" : {"persistent":False, "qpid.max_count":10,   "qpid.max_size": None}, 
+                     "test_08h" : {"persistent":False, "qpid.max_count":8,    "qpid.max_size": None} }
+        self.multiQueueLimit(queueMap4, persistent = True, fail_on_msg =  8)
+        
+        queueMap5 = {"test_08i" : {"persistent":False, "qpid.max_count":None, "qpid.max_size": 1000}, 
+                     "test_08j" : {"persistent":False, "qpid.max_count":None, "qpid.max_size":  800} }
+        self.multiQueueLimit(queueMap5, msgSize = 100, persistent = True, fail_on_msg =  8)
+        
+        queueMap6 = {"test_08k" : {"persistent":False, "qpid.max_count":None, "qpid.max_size": None}, 
+                     "test_08l" : {"persistent":False, "qpid.max_count":10,   "qpid.max_size": None},
+                     "test_08m" : {"persistent":False, "qpid.max_count":None, "qpid.max_size": 1000},
+                     "test_08n" : {"persistent":False, "qpid.max_count":8,    "qpid.max_size":  800} }
+        self.multiQueueLimit(queueMap6, msgSize = 100, persistent = True, fail_on_msg =  8)
+     
+    def test_09_MultiQueueDurableQueueDurableMsg(self):
+        """Flow-to-disk tests where queues and messages are durable and messages are routed to more than one queue"""
+        queueMap1 = {"test_09a" : {"persistent":True,  "qpid.max_count":None, "qpid.max_size": None},
+                     "test_09b" : {"persistent":True,  "qpid.max_count":None, "qpid.max_size": None} }
+        self.multiQueueLimit(queueMap1, persistent = True)
+        
+        queueMap2 = {"test_09c" : {"persistent":True,  "qpid.max_count":None, "qpid.max_size": None}, 
+                     "test_09d" : {"persistent":True,  "qpid.max_count":10,   "qpid.max_size": None} }
+        self.multiQueueLimit(queueMap2, persistent = True)
+        
+        queueMap3 = {"test_09e" : {"persistent":True,  "qpid.max_count":None, "qpid.max_size": None}, 
+                     "test_09f" : {"persistent":True,  "qpid.max_count":None, "qpid.max_size": 1000} }
+        self.multiQueueLimit(queueMap3, msgSize = 100, persistent = True)
+        
+        queueMap4 = {"test_09g" : {"persistent":True,  "qpid.max_count":10,   "qpid.max_size": None}, 
+                     "test_09h" : {"persistent":True,  "qpid.max_count":8,    "qpid.max_size": None} }
+        self.multiQueueLimit(queueMap4, persistent = True)
+       
+        queueMap5 = {"test_09i" : {"persistent":True,  "qpid.max_count":None, "qpid.max_size": 1000}, 
+                     "test_09j" : {"persistent":True,  "qpid.max_count":None, "qpid.max_size":  800} }
+        self.multiQueueLimit(queueMap5, msgSize = 100, persistent = True)
+        
+        queueMap6 = {"test_09k" : {"persistent":True,  "qpid.max_count":None, "qpid.max_size": None}, 
+                     "test_09l" : {"persistent":True,  "qpid.max_count":10,   "qpid.max_size": None},
+                     "test_09m" : {"persistent":True,  "qpid.max_count":None, "qpid.max_size": 1000},
+                     "test_09n" : {"persistent":True,  "qpid.max_count":8,    "qpid.max_size":  800} }
+        self.multiQueueLimit(queueMap6, msgSize = 100, persistent = True)
+      
+    def test_10_MultiQueueMixedQueueTransMsg(self):
+        """Flow-to-disk tests where both queues are both durable and transient and messages are transient and messages are routed to more than one queue"""
+        queueMap1 = {"test_10a" : {"persistent":False, "qpid.max_count":None, "qpid.max_size": None},
+                     "test_10b" : {"persistent":True,  "qpid.max_count":None, "qpid.max_size": None} }
+        self.multiQueueLimit(queueMap1)
+        
+        queueMap2 = {"test_10c" : {"persistent":False, "qpid.max_count":None, "qpid.max_size": None}, 
+                     "test_10d" : {"persistent":True,  "qpid.max_count":10,   "qpid.max_size": None} }
+        self.multiQueueLimit(queueMap2)
+        
+        queueMap3 = {"test_10e" : {"persistent":True,  "qpid.max_count":None, "qpid.max_size": None}, 
+                     "test_10f" : {"persistent":False, "qpid.max_count":10,   "qpid.max_size": None} }
+        self.multiQueueLimit(queueMap3, fail_on_msg = 10)
+        
+        queueMap4 = {"test_10g" : {"persistent":False, "qpid.max_count":None, "qpid.max_size": None}, 
+                     "test_10h" : {"persistent":True,  "qpid.max_count":None, "qpid.max_size": 1000} }
+        self.multiQueueLimit(queueMap4, msgSize = 100)
+        
+        queueMap5 = {"test_10i" : {"persistent":True,  "qpid.max_count":None, "qpid.max_size": None}, 
+                     "test_10j" : {"persistent":False, "qpid.max_count":None, "qpid.max_size": 1000} }
+        self.multiQueueLimit(queueMap5, msgSize = 100, fail_on_msg = 10)
+        
+        queueMap6 = {"test_10k" : {"persistent":False, "qpid.max_count":10,   "qpid.max_size": None}, 
+                     "test_10l" : {"persistent":True,  "qpid.max_count":8,    "qpid.max_size": None} }
+        self.multiQueueLimit(queueMap6, fail_on_msg =  10)
+        
+        queueMap7 = {"test_10m" : {"persistent":True,  "qpid.max_count":10,   "qpid.max_size": None}, 
+                     "test_10n" : {"persistent":False, "qpid.max_count":8,    "qpid.max_size": None} }
+        self.multiQueueLimit(queueMap7, fail_on_msg =  8)
+        
+        queueMap8 = {"test_10o" : {"persistent":False, "qpid.max_count":None, "qpid.max_size": 1000},
+                     "test_10p" : {"persistent":True,  "qpid.max_count":None, "qpid.max_size":  800} }
+        self.multiQueueLimit(queueMap8, msgSize = 100, fail_on_msg =  10)
+        
+        queueMap9 = {"test_10q" : {"persistent":True,  "qpid.max_count":None, "qpid.max_size": 1000}, 
+                     "test_10r" : {"persistent":False, "qpid.max_count":None, "qpid.max_size":  800} }
+        self.multiQueueLimit(queueMap9, msgSize = 100, fail_on_msg =  8)
+        
+        queueMap10 = {"test_10s" : {"persistent":False, "qpid.max_count":None, "qpid.max_size": None}, 
+                      "test_10t" : {"persistent":False, "qpid.max_count":10,   "qpid.max_size": None},
+                      "test_10u" : {"persistent":False, "qpid.max_count":None, "qpid.max_size": 1000},
+                      "test_10v" : {"persistent":False, "qpid.max_count":8,    "qpid.max_size":  800},
+                      "test_10w" : {"persistent":True,  "qpid.max_count":None, "qpid.max_size": None}, 
+                      "test_10x" : {"persistent":True,  "qpid.max_count":12,   "qpid.max_size": None},
+                      "test_10y" : {"persistent":True,  "qpid.max_count":None, "qpid.max_size": 1100},
+                      "test_10z" : {"persistent":True,  "qpid.max_count":7,    "qpid.max_size":  900} }
+        self.multiQueueLimit(queueMap10, msgSize = 100, fail_on_msg =  8)
+       
+    def test_11_MultiQueueMixedQueueDurableMsg(self):
+        """Flow-to-disk tests where queues are both durable and transient and messages are durable and messages are routed to more than one queue"""
+        queueMap1 = {"test_11a" : {"persistent":False, "qpid.max_count":None, "qpid.max_size": None},
+                     "test_11b" : {"persistent":True,  "qpid.max_count":None, "qpid.max_size": None} }
+        self.multiQueueLimit(queueMap1, persistent = True)
+        
+        queueMap2 = {"test_11c" : {"persistent":False, "qpid.max_count":None, "qpid.max_size": None}, 
+                     "test_11d" : {"persistent":True,  "qpid.max_count":10,   "qpid.max_size": None} }
+        self.multiQueueLimit(queueMap2, persistent = True)
+        
+        queueMap3 = {"test_11e" : {"persistent":True,  "qpid.max_count":None, "qpid.max_size": None}, 
+                     "test_11f" : {"persistent":False, "qpid.max_count":10,   "qpid.max_size": None} }
+        self.multiQueueLimit(queueMap3, persistent = True, fail_on_msg = 10)
+        
+        queueMap4 = {"test_11g" : {"persistent":False, "qpid.max_count":None, "qpid.max_size": None}, 
+                     "test_11h" : {"persistent":True,  "qpid.max_count":None, "qpid.max_size": 1000} }
+        self.multiQueueLimit(queueMap4, msgSize = 100, persistent = True)
+        
+        queueMap5 = {"test_11i" : {"persistent":True,  "qpid.max_count":None, "qpid.max_size": None}, 
+                     "test_11j" : {"persistent":False, "qpid.max_count":None, "qpid.max_size": 1000} }
+        self.multiQueueLimit(queueMap5, msgSize = 100, persistent = True, fail_on_msg = 10)
+        
+        queueMap6 = {"test_11k" : {"persistent":False, "qpid.max_count":10,   "qpid.max_size": None}, 
+                     "test_11l" : {"persistent":True,  "qpid.max_count":8,    "qpid.max_size": None} }
+        self.multiQueueLimit(queueMap6, persistent = True, fail_on_msg =  10)
+        
+        queueMap7 = {"test_11m" : {"persistent":True,  "qpid.max_count":10,   "qpid.max_size": None}, 
+                     "test_11n" : {"persistent":False, "qpid.max_count":8,    "qpid.max_size": None} }
+        self.multiQueueLimit(queueMap7, persistent = True, fail_on_msg =  8)
+        
+        queueMap8 = {"test_11o" : {"persistent":False, "qpid.max_count":None, "qpid.max_size": 1000}, 
+                     "test_11p" : {"persistent":True,  "qpid.max_count":None, "qpid.max_size":  800} }
+        self.multiQueueLimit(queueMap8, msgSize = 100, persistent = True, fail_on_msg =  10)
+        
+        queueMap9 = {"test_11q" : {"persistent":True,  "qpid.max_count":None, "qpid.max_size": 1000}, 
+                     "test_11r" : {"persistent":False, "qpid.max_count":None, "qpid.max_size":  800} }
+        self.multiQueueLimit(queueMap9, msgSize = 100, persistent = True, fail_on_msg =  8)
+        
+        queueMap10 = {"test_11s" : {"persistent":False, "qpid.max_count":None, "qpid.max_size": None}, 
+                      "test_11t" : {"persistent":False, "qpid.max_count":10,   "qpid.max_size": None},
+                      "test_11u" : {"persistent":False, "qpid.max_count":None, "qpid.max_size": 1000},
+                      "test_11v" : {"persistent":False, "qpid.max_count":8,    "qpid.max_size":  800},
+                      "test_11w" : {"persistent":True,  "qpid.max_count":None, "qpid.max_size": None}, 
+                      "test_11x" : {"persistent":True,  "qpid.max_count":12,   "qpid.max_size": None},
+                      "test_11y" : {"persistent":True,  "qpid.max_count":None, "qpid.max_size": 1100},
+                      "test_11z" : {"persistent":True,  "qpid.max_count":7,    "qpid.max_size":  900} }
+        self.multiQueueLimit(queueMap10, msgSize = 100, persistent = True, fail_on_msg =  8)
+  
+    # --- Long and randomized tests ---
+        
+    def test_12_Randomized(self):
+        """Randomized flow-to-disk tests"""
+        seed = long(1000.0 * time.time())
+        print "seed=0x%x" % seed
+        random.seed(seed)
+        for i in range(0, 10):
+            self.randomLimit(i)

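For reference, the transactional branches of _produceDirect and _produceFanout wrap the transfers in a distributed transaction; when fail_on_msg is set under a transaction, the queue-policy violation is expected to surface at dtx_prepare, after which the branch is rolled back. A condensed sketch of that sequence, assuming an open 0-10 session, a make_xid callable equivalent to _makeXid, and the test class's XA_OK constant passed in as xa_ok (all names here are illustrative):

    from qpid.datatypes import Message

    def produce_in_dtx(session, make_xid, exchangeName, bodies, xa_ok,
                       expect_failure=False):
        txid = make_xid("producer-xid-%s" % exchangeName)
        session.dtx_select()
        assert session.dtx_start(xid=txid).status == xa_ok
        for body in bodies:
            dp = session.delivery_properties(
                delivery_mode=session.delivery_mode.persistent)
            session.message_transfer(destination=exchangeName,
                                     message=Message(dp, body))
        assert session.dtx_end(xid=txid).status == xa_ok
        if expect_failure:
            # Policy violation is reported at prepare; roll the branch back.
            assert session.dtx_prepare(xid=txid).status != xa_ok
            assert session.dtx_rollback(xid=txid).status == xa_ok
        else:
            assert session.dtx_prepare(xid=txid).status == xa_ok
            assert session.dtx_commit(xid=txid, one_phase=False).status == xa_ok
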
Modified: store/trunk/cpp/tests/run_python_tests
===================================================================
--- store/trunk/cpp/tests/run_python_tests	2009-09-22 14:02:42 UTC (rev 3639)
+++ store/trunk/cpp/tests/run_python_tests	2009-09-22 18:17:46 UTC (rev 3640)
@@ -78,10 +78,12 @@
 start_broker() {
     ${QPIDD} --daemon --port 0 --no-module-dir --load-module=${STORE_LIB} --data-dir=${STORE_DIR} --auth=no --log-enable info+ --log-to-file ${STORE_DIR}/broker.python-test.log > qpidd.port
     LOCAL_PORT=`cat qpidd.port`
+    echo "run_python_tests: Started qpidd on port ${LOCAL_PORT}"
 }
 
 stop_broker() {
-    ${QPIDD} -q --port $LOCAL_PORT
+       echo "run_python_tests: Stopping broker on port ${LOCAL_PORT}"
+    ${QPIDD} -q --port ${LOCAL_PORT}
 }
 
 fail=0


