[rhmessaging-commits] rhmessaging commits: r3905 - in store/trunk/cpp: tests and 2 other directories.

rhmessaging-commits at lists.jboss.org
Tue Apr 13 13:30:23 EDT 2010


Author: kpvdr
Date: 2010-04-13 13:30:22 -0400 (Tue, 13 Apr 2010)
New Revision: 3905

Added:
   store/trunk/cpp/tests/new_python_tests/flow_to_disk.py
   store/trunk/cpp/tests/new_python_tests/store_test.py
   store/trunk/cpp/tests/run_short_python_tests
Removed:
   store/trunk/cpp/tests/old_python_tests/
   store/trunk/cpp/tests/run_old_python_tests
Modified:
   store/trunk/cpp/lib/MessageStoreImpl.cpp
   store/trunk/cpp/tests/Makefile.am
   store/trunk/cpp/tests/jrnl/jtt/Makefile.am
   store/trunk/cpp/tests/new_python_tests/__init__.py
   store/trunk/cpp/tests/new_python_tests/client_persistence.py
   store/trunk/cpp/tests/run_long_python_tests
   store/trunk/cpp/tests/run_new_python_tests
Log:
Fix for QPID-2470 - Broker does not honour flow-to-disk policy on recovery. Added new flow-to-disk tests which detect this condition. Also reorganized tests into short, regular and long tests.
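
Note on the new flow-to-disk tests: the sketch below restates, in isolation, how `_tx_simple_limit` and `simple_limit` in flow_to_disk.py appear to work out which messages should be released to disk, what should remain after a broker recovery, and which transaction combinations are exercised. It is a simplified reading of the diff, not code from the commit. The function names `expected_ftd_indices`, `expected_after_recover` and `txn_combinations` are hypothetical, and the sketch works on message indices rather than real `Message` objects.

```python
# Illustrative sketch only: these helper names are hypothetical and are not
# part of flow_to_disk.py or the qpid test library.

def expected_ftd_indices(num_msgs, msg_size, max_count=None, max_size=None):
    """Indices of messages expected to be released (flowed to disk)."""
    released = []
    cum_msg_size = 0
    for index in range(num_msgs):
        cum_msg_size += msg_size
        # A message is expected to flow to disk once either limit is passed.
        over_count = max_count is not None and index >= max_count
        over_size = max_size is not None and cum_msg_size > max_size
        if over_count or over_size:
            released.append(index)
    return released


def expected_after_recover(all_indices, ftd_indices, msg_durable):
    """Return (messages still on the queue, messages released on recovery)."""
    if msg_durable:
        # Durable messages survive the restart; those beyond the limit are
        # expected to be released again when the queue is recovered.
        return list(all_indices), list(ftd_indices)
    # Transient messages are discarded on recovery, so nothing remains.
    return [], []


def txn_combinations():
    """The four (txn_produce, txn_consume) pairs, in the order tested."""
    return [(index & 1 != 0, index & 2 != 0) for index in range(4)]
```

With the defaults used by the tests (15 messages of 10 bytes), a `max_count` of 10 and a `max_size` of 100 both mark the last five messages as expected to flow to disk. The durable recovery case is the one relevant to QPID-2470: the same messages are expected to be released again after the broker restarts.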

Modified: store/trunk/cpp/lib/MessageStoreImpl.cpp
===================================================================
--- store/trunk/cpp/lib/MessageStoreImpl.cpp	2010-04-12 21:41:54 UTC (rev 3904)
+++ store/trunk/cpp/lib/MessageStoreImpl.cpp	2010-04-13 17:30:22 UTC (rev 3905)
@@ -1007,8 +1007,7 @@
             } // switch
         } // while
     } catch (const journal::jexception& e) {
-        THROW_STORE_EXCEPTION(std::string("Queue ") + queue->getName() +
-                              ": recoverMessages() failed: " + e.what());
+        THROW_STORE_EXCEPTION(std::string("Queue ") + queue->getName() + ": recoverMessages() failed: " + e.what());
     }
 }
 

Modified: store/trunk/cpp/tests/Makefile.am
===================================================================
--- store/trunk/cpp/tests/Makefile.am	2010-04-12 21:41:54 UTC (rev 3904)
+++ store/trunk/cpp/tests/Makefile.am	2010-04-13 17:30:22 UTC (rev 3905)
@@ -27,7 +27,7 @@
 INCLUDES=-I$(top_srcdir)/lib -I$(top_srcdir)/lib/gen
 
 TMP_DATA_DIR=$(abs_srcdir)/tmp_data_dir
-
+ 
 if DO_CLUSTER_TESTS
 SUBDIRS = jrnl . cluster
 else
@@ -39,7 +39,6 @@
   OrderingTest                  \
   TransactionalTest             \
   TwoPhaseCommitTest            \
-  run_old_python_tests          \
   run_new_python_tests          \
   system_test.sh                \
   clean.sh
@@ -51,7 +50,7 @@
 SHORT_TESTS =					\
   SimpleTest                    \
   TransactionalTest             \
-  system_test.sh                \
+  run_short_python_tests        \
   clean.sh
 
 check_PROGRAMS =                \
@@ -79,11 +78,10 @@
     clean.sh \
     failing_python_tests.txt \
     new_python_tests \
-    old_python_tests \
     persistence.py \
     run_long_python_tests \
-    run_old_python_tests \
     run_new_python_tests \
+    run_short_python_tests \
     run_test \
     start_broker \
     stop_broker \

Modified: store/trunk/cpp/tests/jrnl/jtt/Makefile.am
===================================================================
--- store/trunk/cpp/tests/jrnl/jtt/Makefile.am	2010-04-12 21:41:54 UTC (rev 3904)
+++ store/trunk/cpp/tests/jrnl/jtt/Makefile.am	2010-04-13 17:30:22 UTC (rev 3905)
@@ -40,8 +40,9 @@
 .valgrind.supp: $(top_srcdir)/tests/.valgrind.supp
 	cp $^ .
 
-TESTS = \
+LONG_TESTS = \
     _ut_data_src \
+    _ut_long_data_src \
     _ut_jrnl_init_params \
     _ut_read_arg \
     _ut_test_case \
@@ -50,9 +51,6 @@
     _ut_test_case_set \
     _ut_jrnl_instance
 
-LONG_TESTS = \
-    _ut_long_data_src
-
 check_PROGRAMS = jtt \
     _ut_data_src \
     _ut_long_data_src \

Modified: store/trunk/cpp/tests/new_python_tests/__init__.py
===================================================================
--- store/trunk/cpp/tests/new_python_tests/__init__.py	2010-04-12 21:41:54 UTC (rev 3904)
+++ store/trunk/cpp/tests/new_python_tests/__init__.py	2010-04-13 17:30:22 UTC (rev 3905)
@@ -22,3 +22,4 @@
 # The GNU Lesser General Public License is available in the file COPYING.
 
 from client_persistence import *
+from flow_to_disk import *

Modified: store/trunk/cpp/tests/new_python_tests/client_persistence.py
===================================================================
--- store/trunk/cpp/tests/new_python_tests/client_persistence.py	2010-04-12 21:41:54 UTC (rev 3904)
+++ store/trunk/cpp/tests/new_python_tests/client_persistence.py	2010-04-13 17:30:22 UTC (rev 3905)
@@ -1,223 +1,169 @@
-# Copyright (c) 2008 Red Hat, Inc.
-#
-# This file is part of the Qpid async store library msgstore.so.
-#
-# This library is free software; you can redistribute it and/or
-# modify it under the terms of the GNU Lesser General Public
-# License as published by the Free Software Foundation; either
-# version 2.1 of the License, or (at your option) any later version.
-#
-# This library is distributed in the hope that it will be useful,
-# but WITHOUT ANY WARRANTY; without even the implied warranty of
-# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
-# Lesser General Public License for more details.
-#
-# You should have received a copy of the GNU Lesser General Public
-# License along with this library; if not, write to the Free Software
-# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA  02110-1301
-# USA
-#
-# The GNU Lesser General Public License is available in the file COPYING.
+"""
+Copyright (c) 2008 Red Hat, Inc.
 
-from qpid.brokertest import *
-from qpid.messaging import Empty, Message
-from qmf.console import Session
-    
-def storeArgs():
-    assert BrokerTest.store_lib 
-    return ["--load-module", BrokerTest.store_lib]
+This file is part of the Qpid async store library msgstore.so.
 
-class Qmf:
-    """
-    QMF functions not yet available in the new QMF API. Remove this and replace with new API when it becomes available.
-    """
-    def __init__(self, broker):
-        self.__session = Session()
-        self.__broker = self.__session.addBroker("amqp://localhost:%d"%broker.port())
+This library is free software; you can redistribute it and/or
+modify it under the terms of the GNU Lesser General Public
+License as published by the Free Software Foundation; either
+version 2.1 of the License, or (at your option) any later version.
 
-    def addExchange(self, exchangeName, exchangeType, altExchangeName=None, passive=False, durable=False, arguments = {}):
-        """Add a new exchange"""
-        amqpSession = self.__broker.getAmqpSession()
-        if altExchangeName:
-            amqpSession.exchange_declare(exchange=exchangeName, type=exchangeType, alternate_exchange=altExchangeName, passive=passive, durable=durable, arguments=arguments)
-        else:
-            amqpSession.exchange_declare(exchange=exchangeName, type=exchangeType, passive=passive, durable=durable, arguments=arguments)
-    
-    def addQueue(self, queueName, altExchangeName=None, passive=False, durable=False, arguments = {}):
-        """Add a new queue"""
-        amqpSession = self.__broker.getAmqpSession()
-        if altExchangeName:
-            amqpSession = amqpSession.queue_declare(queueName, alternate_exchange=altExchangeName, passive=passive, durable=durable, arguments=arguments)
-        else:
-            amqpSession = amqpSession.queue_declare(queueName, passive=passive, durable=durable, arguments=arguments)
+This library is distributed in the hope that it will be useful,
+but WITHOUT ANY WARRANTY; without even the implied warranty of
+MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
+Lesser General Public License for more details.
 
-    def __query(self, name, _class, package, altExchangeName=None):
-        try:
-            objList = self.__session.getObjects(_class=_class, _package=package)
-            found = False
-            for o in objList:
-                if o.name == name:
-                    found = True
-                    if altExchangeName != None:
-                        altExchList = self.__session.getObjects(_objectId=o.altExchange)
-                        if len(altExchList) == 0 or altExchList[0].name != altExchangeName: return False
-                    break
-            return found
-        except: return False
-                
+You should have received a copy of the GNU Lesser General Public
+License along with this library; if not, write to the Free Software
+Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA  02110-1301
+USA
 
-    def queryExchange(self, exchangeName, altExchangeName=None):
-        """Test for the presence of an exchange, and optionally whether it has an alternate exchange set to a known value."""
-        return self.__query(exchangeName, "exchange", "org.apache.qpid.broker", altExchangeName)
-    
-    def queryQueue(self, queueName, altExchangeName=None):
-        """Test for the presence of an exchange, and optionally whether it has an alternate exchange set to a known value."""
-        return self.__query(queueName, "queue", "org.apache.qpid.broker", altExchangeName)
-    
-    def queueMsgCount(self, queueName):
-        queueList = self.__session.getObjects(_class="queue", _name=queueName)
-        if len(queueList):
-            return queueList[0].msgDepth
-    
-    def queueEmpty(self, queueName):
-        return self.queueMsgCount(queueName) == 0
+The GNU Lesser General Public License is available in the file COPYING.
+"""
 
-
-class StoreTest(BrokerTest):
-    """
-    This subclass of BrokerTest adds some convenience test/check functions
-    """
+from qpid.brokertest import EXPECT_EXIT_OK
+from store_test import StoreTest, Qmf, store_args
+from qpid.messaging import Message
     
-    def __chkEmpty(self, queue, receiver):
-        try:
-            msg = receiver.fetch(timeout=0)
-            self.assert_(False, "Queue \"%s\" not empty: found message: %s" % (queue, msg))
-        except Empty: pass
-    
-    def chkMsg(self, broker, queue, msgChk, empty=False, ack=True):
-        return self.chkMsgs(broker, queue, [msgChk], empty, ack)
-        
-    def chkMsgs(self, broker, queue, msgChkList, empty=False, ack=True):
-        s = broker.connect().session()
-        rcvr = s.receiver(queue + "; {create:always}", capacity=len(msgChkList))
-        try: rmList = [rcvr.fetch(timeout=0) for i in range(len(msgChkList))]
-        except Empty: self.assert_(False, "Queue \"%s\" is empty, unable to retrieve expected message %d." % (queue, i))
-        for i in range(0, len(rmList)):
-            self.assertEqual(rmList[i].content, msgChkList[i].content)
-            self.assertEqual(rmList[i].correlation_id, msgChkList[i].correlation_id)
-        if empty: self.__chkEmpty(queue, rcvr)
-        if ack:
-            s.acknowledge()
-            s.connection.close()
-        else:
-            return s
 
-
 class ExchangeQueueTests(StoreTest):
     """
     Simple tests of the broker exchange and queue types
     """
     
-    def testDirectExchange(self):
+    def test_direct_exchange(self):
         """Test Direct exchange."""
-        broker = self.broker(storeArgs(), name="testDirectExchange", expect=EXPECT_EXIT_OK)
-        m1 = Message("A_Message1", durable=True, correlation_id="Msg0001")
-        m2 = Message("B_Message1", durable=True, correlation_id="Msg0002")
-        broker.send_message("a", m1)
-        broker.send_message("b", m2)
+        broker = self.broker(store_args(), name="testDirectExchange", expect=EXPECT_EXIT_OK)
+        msg1 = Message("A_Message1", durable=True, correlation_id="Msg0001")
+        msg2 = Message("B_Message1", durable=True, correlation_id="Msg0002")
+        broker.send_message("a", msg1)
+        broker.send_message("b", msg2)
         broker.terminate()
         
-        broker = self.broker(storeArgs(), name="testDirectExchange")
-        self.chkMsg(broker, "a", m1, True)
-        self.chkMsg(broker, "b", m2, True)
+        broker = self.broker(store_args(), name="testDirectExchange")
+        self.check_message(broker, "a", msg1, True)
+        self.check_message(broker, "b", msg2, True)
     
-    def testTopicExchange(self):
+    def test_topic_exchange(self):
         """Test Topic exchange."""
-        broker = self.broker(storeArgs(), name="testTopicExchange", expect=EXPECT_EXIT_OK)
-        s = broker.connect().session()
-        snd1 = s.sender("abc/key1; {create:always, node-properties:{durable:True, type:topic}}")
-        snd2 = s.sender("abc/key2; {create:always, node-properties:{durable:True, type:topic}}")
-        s.receiver("a; {create:always, node-properties:{durable:True, x-properties:{bindings:['abc/key1']}}}")
-        s.receiver("b; {create:always, node-properties:{durable:True, x-properties:{bindings:['abc/key1']}}}")
-        s.receiver("c; {create:always, node-properties:{durable:True, x-properties:{bindings:['abc/key1']}}}")
-        m1 = Message("Message1", durable=True, correlation_id="Msg0003")
-        snd1.send(m1)
-        m2 = Message("Message2", durable=True, correlation_id="Msg0004")
-        snd2.send(m2)
-        s.connection.close()
+        broker = self.broker(store_args(), name="testTopicExchange", expect=EXPECT_EXIT_OK)
+        ssn = broker.connect().session()
+        snd1 = ssn.sender("abc/key1; {create:always, node:{type:topic, durable:True}}")
+        snd2 = ssn.sender("abc/key2; {create:always, node:{type:topic, durable:True}}")
+        ssn.receiver("a; {create:always, link:{durable:True, x-bindings:[{exchange:abc, key:key1}]}}")
+        ssn.receiver("b; {create:always, link:{durable:True, x-bindings:[{exchange:abc, key:key1}]}}")
+        ssn.receiver("c; {create:always, link:{durable:True, x-bindings:[{exchange:abc, key:key1}, "
+                     "{exchange:abc, key: key2}]}}")
+        ssn.receiver("d; {create:always, link:{durable:True, x-bindings:[{exchange:abc, key:key2}]}}")
+        ssn.receiver("e; {create:always, link:{durable:True, x-bindings:[{exchange:abc, key:key2}]}}")
+        msg1 = Message("Message1", durable=True, correlation_id="Msg0003")
+        snd1.send(msg1)
+        msg2 = Message("Message2", durable=True, correlation_id="Msg0004")
+        snd2.send(msg2)
         broker.terminate()
         
-        broker = self.broker(storeArgs(), name="testTopicExchange")
-        self.chkMsg(broker, "a", m1, True)
-        self.chkMsg(broker, "b", m1, True)
-        self.chkMsg(broker, "c", m1, True)
+        broker = self.broker(store_args(), name="testTopicExchange")
+        self.check_message(broker, "a", msg1, True)
+        self.check_message(broker, "b", msg1, True)
+        self.check_messages(broker, "c", [msg1, msg2], True)
+        self.check_message(broker, "d", msg2, True)
+        self.check_message(broker, "e", msg2, True)
         
     
-    def testLVQ(self):
+    def test_lvq(self):
         """Test LVQ."""        
-        broker = self.broker(storeArgs(), name="testLVQ", expect=EXPECT_EXIT_OK)
+        broker = self.broker(store_args(), name="testLVQ", expect=EXPECT_EXIT_OK)
         ma1 = Message("A1", durable=True, correlation_id="Msg0005", properties={"qpid.LVQ_key":"A"})
         ma2 = Message("A2", durable=True, correlation_id="Msg0006", properties={"qpid.LVQ_key":"A"})
         mb1 = Message("B1", durable=True, correlation_id="Msg0007", properties={"qpid.LVQ_key":"B"})
         mb2 = Message("B2", durable=True, correlation_id="Msg0008", properties={"qpid.LVQ_key":"B"})
         mb3 = Message("B3", durable=True, correlation_id="Msg0009", properties={"qpid.LVQ_key":"B"})
         mc1 = Message("C1", durable=True, correlation_id="Msg0010", properties={"qpid.LVQ_key":"C"})
-        broker.send_messages("lvq-test", [mb1, ma1, ma2, mb2, mb3, mc1], xprops="\"qpid.last_value_queue\":True")
+        broker.send_messages("lvq-test", [mb1, ma1, ma2, mb2, mb3, mc1],
+                             xprops="arguments:{\"qpid.last_value_queue\":True}")
         broker.terminate()
         
-        broker = self.broker(storeArgs(), name="testLVQ", expect=EXPECT_EXIT_OK)
-        s = self.chkMsgs(broker, "lvq-test", [ma2, mb3, mc1], empty=True, ack=False)
+        broker = self.broker(store_args(), name="testLVQ", expect=EXPECT_EXIT_OK)
+        ssn = self.check_messages(broker, "lvq-test", [ma2, mb3, mc1], empty=True, ack=False)
         # Add more messages while subscriber is active (no replacement):
         ma3 = Message("A3", durable=True, correlation_id="Msg0011", properties={"qpid.LVQ_key":"A"})
         ma4 = Message("A4", durable=True, correlation_id="Msg0012", properties={"qpid.LVQ_key":"A"})
         mc2 = Message("C2", durable=True, correlation_id="Msg0013", properties={"qpid.LVQ_key":"C"})
         mc3 = Message("C3", durable=True, correlation_id="Msg0014", properties={"qpid.LVQ_key":"C"})
         mc4 = Message("C4", durable=True, correlation_id="Msg0015", properties={"qpid.LVQ_key":"C"})
-        broker.send_messages("lvq-test", [mc2, mc3, ma3, ma4, mc4], xprops="\"qpid.last_value_queue\":True", session=s)
-        s.acknowledge()
-        s.connection.close()
+        broker.send_messages("lvq-test", [mc2, mc3, ma3, ma4, mc4], session=ssn)
+        ssn.acknowledge()
         broker.terminate()
         
-        broker = self.broker(storeArgs(), name="testLVQ")
-        self.chkMsgs(broker, "lvq-test", [mc4, ma4], True)
+        broker = self.broker(store_args(), name="testLVQ")
+        self.check_messages(broker, "lvq-test", [mc4, ma4], True)
+    
+    def test_fanout_exchange(self):
+        """Test Fanout Exchange"""
+        broker = self.broker(store_args(), name="testFanout", expect=EXPECT_EXIT_OK)
+        ssn = broker.connect().session()
+        snd = ssn.sender("TestFanoutExchange; {create: always, node: {type: topic, x-declare: {type: fanout}}}")
+        ssn.receiver("TestFanoutExchange; {link: {name: \"q1\", durable: True}}")
+        ssn.receiver("TestFanoutExchange; {link: {name: \"q2\", durable: True}}")
+        ssn.receiver("TestFanoutExchange; {link: {name: \"q3\", durable: True}}")
+        msg1 = Message("Msg1", durable=True, correlation_id="Msg0001")
+        snd.send(msg1)
+        msg2 = Message("Msg2", durable=True, correlation_id="Msg0002")
+        snd.send(msg2)
+        broker.terminate()
         
+        broker = self.broker(store_args(), name="testFanout")
+        self.check_messages(broker, "q1", [msg1, msg2], True)
+        self.check_messages(broker, "q2", [msg1, msg2], True)
+        self.check_messages(broker, "q3", [msg1, msg2], True)
+        
 
 class AlternateExchagePropertyTests(StoreTest):
     """
     Test the persistence of the Alternate Exchange property for exchanges and queues.
     """
 
-    def testExchange(self):
+    def test_exchange(self):
         """Exchange alternate exchange property persistence test"""
-        broker = self.broker(storeArgs(), name="testExchangeBroker", expect=EXPECT_EXIT_OK)
+        broker = self.broker(store_args(), name="testExchangeBroker", expect=EXPECT_EXIT_OK)
         qmf = Qmf(broker)
-        qmf.addExchange("altExch", "direct", durable=True) # Serves as alternate exchange instance
-        qmf.addExchange("testExch", "direct", durable=True, altExchangeName="altExch")
+        qmf.add_exchange("altExch", "direct", durable=True) # Serves as alternate exchange instance
+        qmf.add_exchange("testExch", "direct", durable=True, alt_exchange_name="altExch")
         broker.terminate()
 
-        broker = self.broker(storeArgs(), name="testExchangeBroker")
+        broker = self.broker(store_args(), name="testExchangeBroker")
         qmf = Qmf(broker)
-        try: qmf.addExchange("altExch", "direct", passive=True)
-        except Exception, e: self.fail("Alternate exchange (\"altExch\") instance not recovered: %s" % e)
-        try: qmf.addExchange("testExch", "direct", passive=True)
-        except Exception, e: self.fail("Test exchange (\"testExch\") instance not recovered: %s" % e)
-        self.assertTrue(qmf.queryExchange("testExch", altExchangeName = "altExch"), "Alternate exchange property not found or is incorrect on exchange \"testExch\".")
+        try:
+            qmf.add_exchange("altExch", "direct", passive=True)
+        except Exception, error:
+            self.fail("Alternate exchange (\"altExch\") instance not recovered: %s" % error)
+        try:
+            qmf.add_exchange("testExch", "direct", passive=True)
+        except Exception, error:
+            self.fail("Test exchange (\"testExch\") instance not recovered: %s" % error)
+        self.assertTrue(qmf.query_exchange("testExch", alt_exchange_name = "altExch"),
+                        "Alternate exchange property not found or is incorrect on exchange \"testExch\".")
         
-    def testQueue(self):
+    def test_queue(self):
         """Queue alternate exchange property persistexchangeNamece test"""
-        broker = self.broker(storeArgs(), name="testQueueBroker", expect=EXPECT_EXIT_OK)
+        broker = self.broker(store_args(), name="testQueueBroker", expect=EXPECT_EXIT_OK)
         qmf = Qmf(broker)
-        qmf.addExchange("altExch", "direct", durable=True) # Serves as alternate exchange instance
-        qmf.addQueue("testQueue", durable=True, altExchangeName="altExch")
+        qmf.add_exchange("altExch", "direct", durable=True) # Serves as alternate exchange instance
+        qmf.add_queue("testQueue", durable=True, alt_exchange_name="altExch")
         broker.terminate()
 
-        broker = self.broker(storeArgs(), name="testQueueBroker")
+        broker = self.broker(store_args(), name="testQueueBroker")
         qmf = Qmf(broker)
-        try: qmf.addExchange("altExch", "direct", passive=True)
-        except Exception, e: self.fail("Alternate exchange (\"altExch\") instance not recovered: %s" % e)
-        try: qmf.addQueue("testQueue", passive=True)
-        except Exception, e: self.fail("Test queue (\"testQueue\") instance not recovered: %s" % e)
-        self.assertTrue(qmf.queryQueue("testQueue", altExchangeName = "altExch"), "Alternate exchange property not found or is incorrect on queue \"testQueue\".")
+        try:
+            qmf.add_exchange("altExch", "direct", passive=True)
+        except Exception, error:
+            self.fail("Alternate exchange (\"altExch\") instance not recovered: %s" % error)
+        try:
+            qmf.add_queue("testQueue", passive=True)
+        except Exception, error:
+            self.fail("Test queue (\"testQueue\") instance not recovered: %s" % error)
+        self.assertTrue(qmf.query_queue("testQueue", alt_exchange_name = "altExch"),
+                        "Alternate exchange property not found or is incorrect on queue \"testQueue\".")
 
 
 class RedeliveredTests(StoreTest):
@@ -225,16 +171,16 @@
     Test the behavior of the redelivered flag in the context of persistence
     """
 
-    def testBrokerRecovery(self):
+    def test_broker_recovery(self):
         """Test that the redelivered flag is set on messages after recovery of broker"""
-        broker = self.broker(storeArgs(), name="testAfterRecover", expect=EXPECT_EXIT_OK)
-        mc = "xyz"*100
-        m = Message(mc, durable=True)
-        broker.send_message("testQueue", m)
+        broker = self.broker(store_args(), name="testAfterRecover", expect=EXPECT_EXIT_OK)
+        msg_content = "xyz"*100
+        msg = Message(msg_content, durable=True)
+        broker.send_message("testQueue", msg)
         broker.terminate()
         
-        broker = self.broker(storeArgs(), name="testAfterRecover")
-        rm = broker.get_message("testQueue")
-        self.assertEqual(mc, rm.content)
-        self.assertTrue(rm.redelivered)
+        broker = self.broker(store_args(), name="testAfterRecover")
+        rcv_msg = broker.get_message("testQueue")
+        self.assertEqual(msg_content, rcv_msg.content)
+        self.assertTrue(rcv_msg.redelivered)
 

Added: store/trunk/cpp/tests/new_python_tests/flow_to_disk.py
===================================================================
--- store/trunk/cpp/tests/new_python_tests/flow_to_disk.py	                        (rev 0)
+++ store/trunk/cpp/tests/new_python_tests/flow_to_disk.py	2010-04-13 17:30:22 UTC (rev 3905)
@@ -0,0 +1,1127 @@
+"""
+Copyright (c) 2008 Red Hat, Inc.
+
+This file is part of the Qpid async store library msgstore.so.
+
+This library is free software; you can redistribute it and/or
+modify it under the terms of the GNU Lesser General Public
+License as published by the Free Software Foundation; either
+version 2.1 of the License, or (at your option) any later version.
+
+This library is distributed in the hope that it will be useful,
+but WITHOUT ANY WARRANTY; without even the implied warranty of
+MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
+Lesser General Public License for more details.
+
+You should have received a copy of the GNU Lesser General Public
+License along with this library; if not, write to the Free Software
+Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA  02110-1301
+USA
+
+The GNU Lesser General Public License is available in the file COPYING.
+"""
+
+import qpid
+from qpid.brokertest import EXPECT_EXIT_OK, EXPECT_UNKNOWN
+from qpid.datatypes import uuid4
+from store_test import StoreTest, store_args
+from qpid.messaging import Message, SessionError, SendError
+
+class FlowToDisk(StoreTest):
+    """Tests for async store flow-to-disk"""
+    
+    @staticmethod
+    def _broker_name(queue_name, txn_produce, txn_consume):
+        """Create a broker name based on the queue name and the transaction parameters"""
+        name = queue_name
+        if txn_produce:
+            name += "_TxP"
+        if txn_consume:
+            name += "_TxC"
+        return name
+
+    def _tx_simple_limit(self, queue_name, kwargs):
+        """
+        Test a simple case of message limits which will force flow-to-disk.
+        * queue_args sets a limit - either max_count and/or max_size
+        * messages are added. Some will flow to disk.
+        * Consume all messages sent.
+        * Check the broker has no messages left.
+        """
+        # Unpack args
+        txn_produce = kwargs.get("txn_produce", False)
+        txn_consume = kwargs.get("txn_consume", False)
+        recover = kwargs.get("recover", False)
+        max_count = kwargs.get("max_count")
+        max_size = kwargs.get("max_size")
+        policy = kwargs.get("policy", "flow_to_disk")
+        num_msgs = kwargs.get("num_msgs", 15)
+        msg_size = kwargs.get("msg_size", 10)
+        msg_durable = kwargs.get("msg_durable", False)
+        sync = kwargs.get("sync", False)
+        browse = kwargs.get("browse", False)
+        
+        bname = self._broker_name(queue_name, txn_produce, txn_consume)
+        if recover:
+            expect = EXPECT_UNKNOWN
+        else:
+            expect = EXPECT_EXIT_OK
+        broker = self.broker(store_args(), name=bname, expect=expect, log_level="debug+")
+        prod_session = broker.connect().session(transactional=txn_produce)
+        sender = prod_session.sender(self.snd_addr(queue_name, auto_create=True, durable=True, ftd_count=max_count,
+                                                   ftd_size=max_size, policy=policy))
+        
+        # Send messages
+        msgs = []
+        pre_recover_ftd_msgs = []  # msgs released before a recover
+        post_recover_ftd_msgs = [] # msgs released after a recover
+        cum_msg_size = 0
+        for index in range(0, num_msgs):
+            msg = Message(self.make_message(index, msg_size), durable=msg_durable, id=uuid4(),
+                          correlation_id="msg-%04d"%index)
+            #print "Sending msg %s" % msg.id
+            msgs.append(msg)
+            cum_msg_size += msg_size
+            if (max_count != None and index >= max_count) or (max_size != None and cum_msg_size > max_size):
+                pre_recover_ftd_msgs.append(msg)
+            sender.send(msg, sync=sync)
+        if not sync:
+            sender.sync()
+        # Close transaction (if needed)
+        if txn_produce:
+            prod_session.commit()
+        
+        # Browse messages
+        if browse:
+            self.check_messages(broker, queue_name, msgs, browse=True)
+            
+        if recover:
+            broker.terminate()
+            if msg_durable:
+                post_recover_ftd_msgs = pre_recover_ftd_msgs
+            else:
+                del msgs[:] # Transient messages will be discarded on recover
+            old_broker = broker # keep for log analysis
+            broker = self.broker(store_args(), name=bname, expect=EXPECT_EXIT_OK, log_level="debug+")
+        
+            # Browse messages after recover
+            if browse:
+                self.check_messages(broker, queue_name, msgs, browse=True)
+        
+        # Consume messages
+        self.check_messages(broker, queue_name, msgs, transactional=txn_consume, empty=True)
+        broker.terminate()
+
+        # Check broker logs for released messages
+        if recover:
+            if txn_produce:
+                self.check_msg_release_on_commit(old_broker, pre_recover_ftd_msgs)
+            else:
+                self.check_msg_release(old_broker, pre_recover_ftd_msgs)
+            self.check_msg_release_on_recover(broker, post_recover_ftd_msgs)      
+        else:
+            if txn_produce:
+                self.check_msg_release_on_commit(broker, pre_recover_ftd_msgs)
+            else:
+                self.check_msg_release(broker, pre_recover_ftd_msgs)
+    
+    def simple_limit(self, queue_name, **kwargs):
+        """Adapter for adding transactions to test"""
+        # Cycle through the produce/consume block transaction combinations
+        for index in range(0, 4):
+            kwargs["txn_produce"] = index & 1 != 0 # Transactional produce
+            kwargs["txn_consume"] = index & 2 != 0 # Transactional consume
+            self._tx_simple_limit(queue_name, kwargs)
+
+class SimpleMaxCountTest(FlowToDisk):
+    """Flow-to-disk tests based on setting max_count"""
+    
+    def test_base(self):
+        """Base test"""
+        self.simple_limit("SimpleMaxCount", max_count=10)
+        
+    def test_recover(self):
+        """Recover test"""
+        self.simple_limit("SimpleMaxCountRecover", max_count=10, recover=True)
+        
+    def test_durable(self):
+        """Durable message test"""
+        self.simple_limit("SimpleMaxCountDurable", max_count=10, msg_durable=True)
+        
+    def test_durable_recover(self):
+        """Durable message recover test"""
+        self.simple_limit("SimpleMaxCountDurableRecover", max_count=10, msg_durable=True, recover=True)
+        
+    def test_browse(self):
+        """Browse test"""
+        self.simple_limit("SimpleMaxCountBrowse", max_count=10, browse=True)
+        
+    def test_browse_recover(self):
+        """Browse before and after recover test"""
+        self.simple_limit("SimpleMaxCountBrowseRecover", max_count=10, browse=True, recover=True)
+        
+    def test_durable_browse(self):
+        """Browse durable message test"""
+        self.simple_limit("SimpleMaxCountDurableBrowse", max_count=10, msg_durable=True, browse=True)
+        
+    def test_durable_browse_recover(self):
+        """Browse durable messages before and after recover"""
+        self.simple_limit("SimpleMaxCountDurableBrowseRecover", max_count=10, msg_durable=True, browse=True,
+                          recover=True)
+       
+    def test_large_msg(self):
+        """Large message test"""   
+        self.simple_limit("SimpleMaxCountLargeMsg", max_count=10, max_size=10000000, num_msgs=100, msg_size=10000)
+       
+    def test_large_msg_recover(self):
+        """Large message test"""   
+        self.simple_limit("SimpleMaxCountLargeMsgRecover", max_count=10, max_size=10000000, num_msgs=100,
+                          msg_size=10000, recover=True)
+    
+    def test_large_msg_durable(self):
+        """Large durable message test"""
+        self.simple_limit("SimpleMaxCountLargeMsgDurable", max_count=10, max_size=10000000, msg_durable=True,
+                          num_msgs=100, msg_size=10000)
+     
+    def test_large_msg_durable_recover(self):
+        """Large durable message recover test"""
+        self.simple_limit("SimpleMaxCountLargeMsgDurableRecover", max_count=10, max_size=10000000, msg_durable=True,
+                          num_msgs=100, msg_size=10000, recover=True)
+   
+    def test_large_msg_browse(self):
+        """Large message browse test"""
+        self.simple_limit("SimpleMaxCountLargeMsgBrowse", max_count=10, max_size=10000000, browse=True, num_msgs=100,
+                          msg_size=10000)
+    
+    def test_large_msg_browse_recover(self):
+        """Large message browse recover test"""
+        self.simple_limit("SimpleMaxCountLargeMsgBrowseRecover", max_count=10, max_size=10000000, browse=True,
+                          num_msgs=100, msg_size=10000, recover=True)
+   
+    def test_large_msg_durable_browse(self):
+        """Large durable message browse test""" 
+        self.simple_limit("SimpleMaxCountLargeMsgDurableBrowse", max_count=10, max_size=10000000, msg_durable=True,
+                          browse=True, num_msgs=100, msg_size=10000)
+   
+    def test_large_msg_durable_browse_recover(self):
+        """Large durable message browse recover test"""
+        self.simple_limit("SimpleMaxCountLargeMsgDurableBrowseRecover", max_count=10, max_size=10000000,
+                          msg_durable=True, browse=True, num_msgs=100, msg_size=10000, recover=True)
+
+class SimpleMaxSizeTest(FlowToDisk):
+    """Flow-to-disk tests based on setting max_size"""
+    
+    def test_base(self):
+        """Base test"""
+        self.simple_limit("SimpleMaxSize", max_size=100)
+        
+    def test_recover(self):
+        """Recover test"""
+        self.simple_limit("SimpleMaxSizeRecover", max_size=100, recover=True)
+        
+    def test_durable(self):
+        """Durable message test"""
+        self.simple_limit("SimpleMaxSizeDurable", max_size=100, msg_durable=True)
+        
+    def test_durable_recover(self):
+        """Durable message recover test"""
+        self.simple_limit("SimpleMaxSizeDurableRecover", max_size=100, msg_durable=True, recover=True)
+        
+    def test_browse(self):
+        """Browse test"""
+        self.simple_limit("SimpleMaxSizeBrowse", max_size=100, browse=True)
+        
+    def test_browse_recover(self):
+        """Browse before and after recover test"""
+        self.simple_limit("SimpleMaxSizeBrowseRecover", max_size=100, browse=True, recover=True)
+        
+    def test_durable_browse(self):
+        """Browse durable message test"""
+        self.simple_limit("SimpleMaxSizeDurableBrowse", max_size=100, msg_durable=True, browse=True)
+        
+    def test_durable_browse_recover(self):
+        """Browse durable messages before and after recover"""
+        self.simple_limit("SimpleMaxSizeDurableBrowseRecover", max_size=100, msg_durable=True, browse=True,
+                          recover=True)
+        
+    def test_large_msg(self):
+        """Large message test"""   
+        self.simple_limit("SimpleMaxSizeLargeMsg", max_size=100000, num_msgs=100, msg_size=10000)
+       
+    def test_large_msg_recover(self):
+        """Large message recover test"""
+        self.simple_limit("SimpleMaxSizeLargeMsgRecover", max_size=100000, num_msgs=100, msg_size=10000, recover=True)
+     
+    def test_large_msg_durable(self):
+        """Large durable message test"""
+        self.simple_limit("SimpleMaxSizeLargeMsgDurable", max_size=100000, msg_durable=True, num_msgs=100,
+                          msg_size=10000)
+     
+    def test_large_msg_durable_recover(self):
+        """Large durable message recover test"""
+        self.simple_limit("SimpleMaxSizeLargeMsgDurableRecover", max_size=100000, msg_durable=True, num_msgs=100,
+                          msg_size=10000, recover=True)
+   
+    def test_large_msg_browse(self):
+        """Large message browse test"""
+        self.simple_limit("SimpleMaxSizeLargeMsgBrowse", max_size=100, browse=True, num_msgs=100, msg_size=10000)
+    
+    def test_large_msg_browse_recover(self):
+        """Large message browse recover test"""
+        self.simple_limit("SimpleMaxSizeLargeMsgBrowseRecover", max_size=100, browse=True, num_msgs=100, msg_size=10000,
+                          recover=True)
+   
+    def test_large_msg_durable_browse(self):
+        """Large durable message browse test""" 
+        self.simple_limit("SimpleMaxSizeLargeMsgDurableBrowse", max_size=100, msg_durable=True, browse=True,
+                          num_msgs=100, msg_size=10000)
+   
+    def test_large_msg_durable_browse_recover(self):
+        """Large durable message browse recover test"""
+        self.simple_limit("SimpleMaxSizeLargeMsgDurableBrowseRecover", max_size=100, msg_durable=True, browse=True,
+                          num_msgs=100, msg_size=10000, recover=True)
+
+class SimpleMaxSizeCountTest(FlowToDisk):
+    """Flow-to-disk tests based on setting both max_count and max_size at the same time"""
+    
+    def test_base(self):
+        """Base test"""
+        self.simple_limit("MaxSizeMaxCount", max_count=10, max_size=1000)
+        
+    def test_recover(self):
+        """Recover test"""
+        self.simple_limit("MaxSizeMaxCountRecover", max_count=10, max_size=1000, recover=True)
+        
+    def test_durable(self):
+        """Durable message test"""
+        self.simple_limit("MaxSizeMaxCountDurable", max_count=10, max_size=1000, msg_size=250)
+        
+    def test_durable_recover(self):
+        """Durable message recover test"""
+        self.simple_limit("MaxSizeMaxCountDurableRecover", max_count=10, max_size=1000, msg_size=250, recover=True)
+        
+    def test_browse(self):
+        """Browse test"""
+        self.simple_limit("MaxSizeMaxCountBrowse", max_count=10, max_size=1000, browse=True)
+        
+    def test_browse_recover(self):
+        """Browse before and after recover test"""
+        self.simple_limit("MaxSizeMaxCountBrowseRecover", max_count=10, max_size=1000, browse=True, recover=True)
+        
+    def test_durable_browse(self):
+        """Browse durable message test"""
+        self.simple_limit("MaxSizeMaxCountDurableBrowse", max_count=10, max_size=1000, msg_size=250, browse=True)
+        
+    def test_durable_browse_recover(self):
+        """Browse durable messages before and after recover"""
+        self.simple_limit("MaxSizeMaxCountDurableBrowseRecover", max_count=10, max_size=1000, msg_size=250, browse=True,
+                          recover=True)
+
+# ======================================================================================================================
+
+class MultiQueueFlowToDisk(FlowToDisk):
+    """Tests for async store flow-to-disk involving multiple queues"""
+    
+    def _multi_queue_setup(self, queue_map, broker, exchange_name, txn_produce, txn_consume, policy, exclusive = False):
+        """Create the send session and receive sessions for multi-queue scenarios"""
+        connection = broker.connect()
+        snd_session = connection.session(transactional=txn_produce)
+        addr = self.snd_addr(exchange_name, topic_flag=True, exchage_type="fanout")
+        #print "snd_addr=\"%s\"" % addr
+        sndr = snd_session.sender(addr)
+        for queue_name, queue_properties in queue_map.iteritems():
+            if "durable" in queue_properties.keys():
+                durable = queue_properties["durable"]
+            else:
+                durable = False
+            max_count = None
+            if "max_count" in queue_properties.keys():
+                max_count = queue_properties["max_count"]
+            max_size = None
+            if "max_size" in queue_properties.keys():
+                max_size = queue_properties["max_size"]
+            rcv_session = connection.session(transactional=txn_consume)
+            addr = self.rcv_addr(exchange_name, auto_create=False, link_name=queue_name, durable=durable,
+                                 exclusive=exclusive, ftd_count=max_count, ftd_size=max_size, policy=policy)
+            #print "rcv_addr=\"%s\"" % addr
+            rcv_session.receiver(addr)
+        return snd_session, sndr
+
+    @staticmethod
+    def _make_policy_dict(src, marker, delim=";"):
+        """Create a dictionary of key/value strings from a formatted string src of the form
+        '... marker key1=val1, key2=val2, ..., keyN=valN delimiter ...'
+        where the portion of interest runs from the end of marker to the next delimiter (default: ';').
+        Returns None if that portion contains no key=value pairs."""
+        pos = src.find(marker) + len(marker)
+        res = []
+        for index in src[pos:src.find(delim, pos)].split():
+            if "=" in index:
+                res.append(index.strip(",").split("="))
+        if len(res) > 0:
+            return dict(res)
+    
+    @staticmethod
+    def _make_policy_val(src, marker, delim=";"):
+        """Return a string value from a formatted string of the form '... marker val delimiter ...' where the value
+        lies between marker and the next delimiter (default: ';'), stripped of surrounding whitespace."""
+        pos = src.find(marker) + len(marker)
+        return src[pos:src.find(delim, pos)].strip()
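For illustration only (not part of r3905): a standalone sketch of how the two parsing helpers above pick fields out of a policy error description. The un-prefixed function names and the sample string are assumptions made for this example. The sample is shaped to match the docstrings and is not a captured broker message.

```python
def make_policy_dict(src, marker, delim=";"):
    """Collect key=value tokens found between marker and the next delim."""
    pos = src.find(marker) + len(marker)
    pairs = []
    for token in src[pos:src.find(delim, pos)].split():
        if "=" in token:
            pairs.append(token.strip(",").split("="))
    return dict(pairs) if pairs else None


def make_policy_val(src, marker, delim=";"):
    """Return the text between marker and the next delim, stripped."""
    pos = src.find(marker) + len(marker)
    return src[pos:src.find(delim, pos)].strip()


# Hypothetical description text; the real broker wording may differ.
sample = ("resource-limit-exceeded: Policy exceeded on q01, policy: "
          "size: max=1000, current=1000; count: max=10, current=10; "
          "type=reject (hypothetical)")

queue_name = make_policy_val(sample, "Policy exceeded on ", delim=",")
policy_type = make_policy_val(sample, "type=", delim="(")
count_info = make_policy_dict(sample, "count: ")
size_info = make_policy_dict(sample, "size: ")
```

Note that all values come back as strings, which is why the checking code converts `current` with `int()` before comparing. If the marker is absent, `str.find` returns -1 and the slice starts at an unrelated offset, so callers should only trust the result for strings known to contain the marker.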
+    
+    @staticmethod
+    def _check_error(error_str, fail_list=None):
+        """Check a policy exception string to ensure the failure occurred on the expected queue and at the expected
+        count."""
+        if error_str.startswith("resource-limit-exceeded"):
+            fail_policy = MultiQueueFlowToDisk._make_policy_val(error_str, "type=", delim="(")
+            fail_queue_name = MultiQueueFlowToDisk._make_policy_val(error_str, "Policy exceeded on ", delim=",")
+            fail_count_dict = MultiQueueFlowToDisk._make_policy_dict(error_str, "count: ")
+            fail_size_dict = MultiQueueFlowToDisk._make_policy_dict(error_str, "size: ")
+            #print "+++", fail_policy, fail_queue_name, fail_count_dict, fail_size_dict
+            #print "===", fail_list
+            if fail_list == None:
+                return False # Not expected - no failure should have occurred
+            for fail in fail_list:
+                if fail_queue_name == fail["queue"]:
+                    #print "<<<", fail
+                    if fail_policy != fail["type"]:
+                        return False
+                    if fail_count_dict != None and "count" in fail and \
+                       int(fail_count_dict["current"]) != fail["count"]:
+                        return False
+                    if fail_size_dict != None and "size" in fail and \
+                       int(fail_size_dict["current"]) != fail["size"]:
+                        return False
+                    #print ">>> Failure expected"
+                    return True
+        return False
+    
+    @staticmethod
+    def _check_send_error(error, fail_list=None):
+        """Check that an error is a SendError which in turn contains a qpid.ops.ExecutionException, and that the
+        exception's description matches one of the expected failures in fail_list."""
+        if not isinstance(error.args[0], qpid.ops.ExecutionException):
+            return False
+        if "description" not in error.args[0].args():
+            return False
+        return MultiQueueFlowToDisk._check_error(error.args[0].args()["description"], fail_list)
+    
+    @staticmethod
+    def _check_session_error(error, txn=False):
+        """Check that an error is a SessionError which in turn contains a qpid.ops.ExecutionException, and that for
+        a transactional session (txn=True) the exception reports a failed commit."""
+        if not isinstance(error.args[0], qpid.ops.ExecutionException):
+            return False
+        if "description" not in error.args[0].args():
+            return False
+        if txn and error.args[0].args()["description"].startswith("internal-error: Commit failed"):
+            #print ">>> Txn commit failure: expected"
+            return True
+        return False
+    
+    @staticmethod
+    def _is_queue_durable(queue_map, index):
+        """Return true if the indexed queue is durable (indexed by queue_map.keys() or queue_map.values())"""
+        return "durable" in queue_map.values()[index] and queue_map.values()[index]["durable"]
+    
+    @staticmethod
+    def _expected_msg_loss(fail_list):
+        """Examine the fail_list for expected failures and return a tuple containing the expected failure conditions""" 
+        count_exp_loss = None
+        count_exp_loss_queues = None
+        size_exp_loss = None
+        size_exp_loss_queues = None
+        if fail_list != None:
+            for fail in fail_list:
+                if "count" in fail:
+                    this_count = fail["count"]
+                    if count_exp_loss == None:
+                        count_exp_loss = this_count
+                        count_exp_loss_queues = [fail["queue"]]
+                    elif this_count < count_exp_loss:
+                        count_exp_loss = this_count
+                        count_exp_loss_queues = [fail["queue"]]
+                    elif this_count == count_exp_loss:
+                        count_exp_loss_queues.append(fail["queue"])
+                if "size" in fail:
+                    this_size = fail["size"]
+                    if size_exp_loss == None:
+                        size_exp_loss = this_size
+                        size_exp_loss_queues = [fail["queue"]]
+                    elif this_size < size_exp_loss:
+                        size_exp_loss = this_size
+                        size_exp_loss_queues = [fail["queue"]]
+                    elif this_size == size_exp_loss:
+                        size_exp_loss_queues.append(fail["queue"])
+        return (count_exp_loss, count_exp_loss_queues, size_exp_loss, size_exp_loss_queues)
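For illustration only (not part of r3905): the selection rule in `_expected_msg_loss` amounts to "find the lowest limit and remember every queue that shares it", applied once for `count` and once for `size`. A compact sketch of that rule follows. The helper name `lowest_limit` and the queue names are hypothetical.

```python
def lowest_limit(fail_list, key):
    """Return (lowest value of key, list of queues with that value).

    Returns (None, None) when fail_list is None or no entry carries key.
    """
    lowest, queues = None, None
    for fail in fail_list or []:
        if key not in fail:
            continue
        if lowest is None or fail[key] < lowest:
            # New minimum: restart the list of affected queues.
            lowest, queues = fail[key], [fail["queue"]]
        elif fail[key] == lowest:
            # Tie: this queue is expected to fail at the same point.
            queues.append(fail["queue"])
    return lowest, queues


# Shaped like the fail_list built in mixed_limit_2 with num=1.
fail_list = [{"queue": "q01", "type": "reject", "size": 800},
             {"queue": "r01", "type": "reject", "count": 8}]

count_loss = lowest_limit(fail_list, "count")
size_loss = lowest_limit(fail_list, "size")
```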
+ 
+    @staticmethod
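+    def _expected_msg_ftd(queue_map):
+        """Return a tuple (max_count, max_size) holding the lowest count and size limits set on any durable queue
+        in queue_map; an element is None if no durable queue sets that limit."""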
+        max_count = None
+        max_size = None
+        for queue_props in queue_map.itervalues():
+            if "durable" in queue_props and queue_props["durable"]:
+                if "max_count" in queue_props and queue_props["max_count"] != None and \
+                   (max_count == None or queue_props["max_count"] < max_count):
+                    max_count = queue_props["max_count"]
+                if "max_size" in queue_props and queue_props["max_size"] != None and \
+                   (max_size == None or queue_props["max_size"] < max_size):
+                    max_size = queue_props["max_size"]
+        return (max_count, max_size)
+        
+ 
+    def tx_multi_queue_limit(self, broker_base_name, queue_map, exchange_name, **kwargs):
+        """Test a multi-queue case.
+        queue_map maps each queue name (key) to a dict of queue properties (value).
+        """
+        # Unpack args
+        msg_durable = kwargs.get("msg_durable", False)
+        num_msgs = kwargs.get("num_msgs", 15)
+        msg_size = kwargs.get("msg_size", 10)
+        txn_produce = kwargs.get("txn_produce", False)
+        txn_consume = kwargs.get("txn_consume", False)
+        browse = kwargs.get("browse", False)
+        policy = kwargs.get("policy", "flow_to_disk")
+        recover = kwargs.get("recover", False)
+        sync = kwargs.get("sync", False)
+        fail_list = kwargs.get("fail_list")
+        
+        bname = self._broker_name(broker_base_name, txn_produce, txn_consume)
+        broker = self.broker(store_args(), name=bname, expect=EXPECT_EXIT_OK, log_level="debug+")
+        #print "+++ Started broker %s" % bname
+        snd_session, sndr = self._multi_queue_setup(queue_map, broker, exchange_name, txn_produce, txn_consume, policy)
+        
+        # Find expected limits
+        count_exp_loss, count_exp_loss_queues, size_exp_loss, size_exp_loss_queues = self._expected_msg_loss(fail_list)
+        max_count, max_size = self._expected_msg_ftd(queue_map)
+        
+        # Send messages
+        try: 
+            msgs = []
+            pre_recover_ftd_msgs = []  # msgs released before a recover
+            post_recover_ftd_msgs = [] # msgs released after a recover
+            cum_msg_size = 0
+            target_queues = []
+            for index in range(0, num_msgs):
+                msg = Message(self.make_message(index, msg_size), durable=msg_durable, id=uuid4(),
+                            correlation_id="msg-%04d"%index)
+                #print "Sending msg %s" % msg.id
+                sndr.send(msg, sync=sync)
+                if msg_size != None:
+                    cum_msg_size += msg_size
+                if count_exp_loss != None and index >= count_exp_loss:
+                    target_queues.extend(count_exp_loss_queues)
+                if size_exp_loss != None and cum_msg_size > size_exp_loss:
+                    target_queues.extend(size_exp_loss_queues)
+                if (count_exp_loss == None or index < count_exp_loss) and \
+                   (size_exp_loss == None or cum_msg_size <= size_exp_loss):
+                    msgs.append(msg)
+                if (max_count != None and index >= max_count) or (max_size != None and cum_msg_size > max_size):
+                    pre_recover_ftd_msgs.append(msg)
+            if not sync:
+                sndr.sync()
+            if txn_produce:
+                snd_session.commit()
+        except SendError, error:
+            if not self._check_send_error(error, fail_list):
+                raise
+        except SessionError, error:
+            msgs[:] = [] # Transaction failed, all messages lost
+            if not self._check_session_error(error, txn_produce):
+                raise
+
+        # Browse messages
+        if browse: 
+            for index in range(0, len(queue_map)):
+                self.check_messages(broker, queue_map.keys()[index], msgs, browse=True)
+            
+        if recover:
+            broker.terminate()
+            #print "--- Terminated broker %s" % bname
+            if msg_durable:
+                post_recover_ftd_msgs = pre_recover_ftd_msgs
+            else:
+                del msgs[:] # Transient messages will be discarded on recover
+            old_broker = broker # keep for log analysis
+            broker = self.broker(store_args(), name=bname, expect=EXPECT_EXIT_OK, log_level="debug+")
+            #print "+++ Restarted broker %s" % bname
+            # Browse messages
+            if browse: 
+                for index in range(0, len(queue_map)):
+                    empty = not self._is_queue_durable(queue_map, index)
+                    self.check_messages(broker, queue_map.keys()[index], msgs, browse=True, emtpy_flag=empty)
+        
+        # Consume messages
+        for index in range(0, len(queue_map)):
+            empty_chk = txn_produce or queue_map.keys()[index] in target_queues
+            empty = recover and not self._is_queue_durable(queue_map, index)
+            self.check_messages(broker, queue_map.keys()[index], msgs, transactional=txn_consume, empty=empty_chk,
+                                emtpy_flag=empty)
+
+        broker.terminate()
+        #print "--- Stopped broker %s" % bname
+
+        # Check broker logs for released messages
+        if recover:
+            if txn_produce:
+                if msg_durable:
+                    self.check_msg_release_on_commit(old_broker, pre_recover_ftd_msgs)
+                else:
+                    self.check_msg_block_on_commit(old_broker, pre_recover_ftd_msgs)
+            else:
+                if msg_durable:
+                    self.check_msg_release(old_broker, pre_recover_ftd_msgs)
+                else:
+                    self.check_msg_block(old_broker, pre_recover_ftd_msgs)
+            self.check_msg_release_on_recover(broker, post_recover_ftd_msgs)      
+        else:
+            if txn_produce:
+                if msg_durable:
+                    self.check_msg_release_on_commit(broker, pre_recover_ftd_msgs)
+                else:
+                    self.check_msg_block_on_commit(broker, pre_recover_ftd_msgs)
+            else:
+                if msg_durable:
+                    self.check_msg_release(broker, pre_recover_ftd_msgs)
+                else:
+                    self.check_msg_block(broker, pre_recover_ftd_msgs)
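For illustration only (not part of r3905): the send loop in `tx_multi_queue_limit` predicts which messages should flow to disk by comparing the message index and the cumulative payload size against the lowest limits found on durable queues. The sketch below isolates that bookkeeping. The function names and the sample queue map are hypothetical, and no broker is involved.

```python
def lowest_durable_limits(queue_map):
    """Return (max_count, max_size): lowest limits set on durable queues."""
    max_count, max_size = None, None
    for props in queue_map.values():
        if not props.get("durable"):
            continue  # only durable queues are considered
        count = props.get("max_count")
        if count is not None and (max_count is None or count < max_count):
            max_count = count
        size = props.get("max_size")
        if size is not None and (max_size is None or size < max_size):
            max_size = size
    return max_count, max_size


def expected_ftd_indices(num_msgs, msg_size, max_count, max_size):
    """Indices of messages sent at or beyond the count or size limit."""
    indices = []
    cum_msg_size = 0
    for index in range(num_msgs):
        cum_msg_size += msg_size
        over_count = max_count is not None and index >= max_count
        over_size = max_size is not None and cum_msg_size > max_size
        if over_count or over_size:
            indices.append(index)
    return indices


# Shaped like queue_map_4 in dual_max_count with durable queues, num=1.
queue_map = {"g01": {"durable": True, "max_count": 10, "max_size": None},
             "h01": {"durable": True, "max_count": 8, "max_size": None}}
limits = lowest_durable_limits(queue_map)
ftd = expected_ftd_indices(15, 10, *limits)
```

With 15 messages and a lowest count limit of 8, the first eight messages (indices 0 to 7) stay in memory and the remaining seven are the ones the log checks would look for.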
+
+#    def multi_queue_limit(self, broker_name, queue_map, exchange_name, **kwargs):
+#        """Adapter for adding transactions to test"""
+#        # Cycle through the produce/consume block transaction combinations
+#        for index in range(0, 4):
+#            kwargs["txn_produce"] = index & 1 != 0 # Transactional produce
+#            kwargs["txn_consume"] = index & 2 != 0 # Transactional consume
+#            self._tx_multi_queue_limit(broker_name, queue_map, exchange_name, kwargs)
+
+    # --- Parameterized test methods ---
+    
+    def no_limit(self, num, queue_durable=False, msg_durable=False, browse=False, recover=False, txn_produce=False,
+                 txn_consume=False):
+        """No policy test"""
+        queue_map_1 = {"a%02d" % num : {"durable":queue_durable, "max_count":None, "max_size": None},
+                       "b%02d" % num : {"durable":queue_durable, "max_count":None, "max_size": None} }
+        self.tx_multi_queue_limit("MultiQueue_NoLimit", queue_map_1, exchange_name="Fanout_a%02d" % num,
+                                  msg_durable=msg_durable, browse=browse, recover=recover, txn_produce=txn_produce,
+                                  txn_consume=txn_consume)
+
+    def max_count(self, num, queue_durable=False, msg_durable=False, browse=False, recover=False, txn_produce=False,
+                  txn_consume=False):
+        """Count policy test"""
+        queue_map_2 = {"c%02d" % num : {"durable":queue_durable, "max_count":None, "max_size": None}, 
+                       "d%02d" % num : {"durable":queue_durable, "max_count":10,   "max_size": None} }
+        fail_list = None
+        if not queue_durable:
+            fail_list = [{"queue":"d%02d" % num, "type":"reject", "count":10}]
+        self.tx_multi_queue_limit("MultiQueue_MaxCount", queue_map_2, exchange_name="Fanout_b%02d" % num,
+                                  msg_durable=msg_durable, browse=browse, recover=recover, fail_list=fail_list,
+                                  txn_produce=txn_produce, txn_consume=txn_consume)
+
+    def max_size(self, num, queue_durable=False, msg_durable=False, browse=False, recover=False, txn_produce=False,
+                 txn_consume=False):
+        """Size policy test"""
+        queue_map_3 = {"e%02d" % num : {"durable":queue_durable, "max_count":None, "max_size": None}, 
+                       "f%02d" % num : {"durable":queue_durable, "max_count":None, "max_size": 1000} }
+        fail_list = None
+        if not queue_durable:
+            fail_list = [{"queue":"f%02d" % num, "type":"reject", "size":1000}]
+        self.tx_multi_queue_limit("MultiQueue_MaxSize", queue_map_3, exchange_name="Fanout_c%02d" % num, msg_size=100,
+                                  msg_durable=msg_durable, browse=browse, recover=recover, fail_list=fail_list,
+                                  txn_produce=txn_produce, txn_consume=txn_consume)
+    
+    def dual_max_count(self, num, queue_durable=False, msg_durable=False, browse=False, recover=False,
+                       txn_produce=False, txn_consume=False):
+        """Multiple count policy test"""
+        queue_map_4 = {"g%02d" % num : {"durable":queue_durable, "max_count":10,   "max_size": None}, 
+                       "h%02d" % num : {"durable":queue_durable, "max_count":8,    "max_size": None} }
+        fail_list = None
+        if not queue_durable:
+            fail_list = [{"queue":"h%02d" % num, "type":"reject", "count":8}]
+        self.tx_multi_queue_limit("MultiQueue_DualMaxCount", queue_map_4, exchange_name="Fanout_d%02d" % num,
+                                  msg_durable=msg_durable, browse=browse, recover=recover, fail_list=fail_list,
+                                  txn_produce=txn_produce, txn_consume=txn_consume)
+    
+    def dual_max_size(self, num, queue_durable=False, msg_durable=False, browse=False, recover=False, txn_produce=False,
+                      txn_consume=False):
+        """Multiple size policy test"""
+        queue_map_5 = {"i%02d" % num : {"durable":queue_durable, "max_count":None, "max_size": 1000}, 
+                       "j%02d" % num : {"durable":queue_durable, "max_count":None, "max_size":  800} }
+        fail_list = None
+        if not queue_durable:
+            fail_list = [{"queue":"j%02d" % num, "type":"reject", "size":800}]
+        self.tx_multi_queue_limit("MultiQueue_DualMaxSize", queue_map_5, exchange_name="Fanout_e%02d" % num,
+                                  msg_size=100, msg_durable=msg_durable, browse=browse, recover=recover,
+                                  fail_list=fail_list, txn_produce=txn_produce, txn_consume=txn_consume)
+    
+    def mixed_limit_1(self, num, queue_durable=False, msg_durable=False, browse=False, recover=False, txn_produce=False,
+                      txn_consume=False):
+        """Both count and size policies active with the same queue having equal probabilities of triggering the
+        policy"""
+        queue_map_6 = {"k%02d" % num : {"durable":queue_durable, "max_count":None, "max_size": None}, 
+                       "l%02d" % num : {"durable":queue_durable, "max_count":10,   "max_size": None},
+                       "m%02d" % num : {"durable":queue_durable, "max_count":None, "max_size": 1000},
+                       "n%02d" % num : {"durable":queue_durable, "max_count":8,    "max_size":  800} }
+        fail_list = None
+        if not queue_durable:
+            fail_list = [{"queue":"n%02d" % num, "type":"reject", "count":8, "size":800}]
+        self.tx_multi_queue_limit("MultiQueue_MixedLimit", queue_map_6, exchange_name="Fanout_f%02d" % num,
+                                  msg_size=100, msg_durable=msg_durable, browse=browse, recover=recover,
+                                  fail_list=fail_list, txn_produce=txn_produce, txn_consume=txn_consume)
+    
+    def mixed_limit_2(self, num, queue_durable=False, msg_durable=False, browse=False, recover=False, txn_produce=False,
+                      txn_consume=False):
+        """Both count and size policies active with different queues having equal probabilities of triggering the
+        policy"""
+        queue_map_7 = {"o%02d" % num : {"durable":queue_durable, "max_count":None, "max_size": None}, 
+                       "p%02d" % num : {"durable":queue_durable, "max_count":10,   "max_size": None},
+                       "q%02d" % num : {"durable":queue_durable, "max_count":None, "max_size": 800},
+                       "r%02d" % num : {"durable":queue_durable, "max_count":8,    "max_size": 1000} }
+        fail_list = None
+        if not queue_durable:
+            fail_list = [{"queue":"q%02d" % num, "type":"reject", "size":800},
+                         {"queue":"r%02d" % num, "type":"reject", "count":8,}]
+        self.tx_multi_queue_limit("MultiQueue_MixedLimit", queue_map_7, exchange_name="Fanout_g%02d" % num,
+                                  msg_size=100, msg_durable=msg_durable, browse=browse, recover=recover,
+                                  fail_list=fail_list, txn_produce=txn_produce, txn_consume=txn_consume)
+
+    # --- Non-parameterized test methods - these will be run by Python test framework ---
+    
+    _num = None
+    _queue_durable = False
+    _msg_durable = False
+    _browse = False
+    _recover = False
+    _txn_produce = False
+    _txn_consume = False
+    
+    def test_no_limit(self):
+        """No policy test (non-parameterized)"""
+        self.no_limit(self._num, queue_durable=self._queue_durable, msg_durable=self._msg_durable, browse=self._browse,
+                      recover=self._recover, txn_produce=self._txn_produce, txn_consume=self._txn_consume)
+        
+    def test_max_count(self):
+        """Count policy test (non-parameterized)"""
+        self.max_count(self._num, queue_durable=self._queue_durable, msg_durable=self._msg_durable, browse=self._browse,
+                       recover=self._recover, txn_produce=self._txn_produce, txn_consume=self._txn_consume)
+        
+    def test_max_size(self):
+        """Size policy test (non-parameterized)"""
+        self.max_size(self._num, queue_durable=self._queue_durable, msg_durable=self._msg_durable, browse=self._browse,
+                      recover=self._recover, txn_produce=self._txn_produce, txn_consume=self._txn_consume)
+        
+    def test_dual_max_count(self):
+        """Multiple count policy test (non-parameterized)"""
+        self.dual_max_count(self._num, queue_durable=self._queue_durable, msg_durable=self._msg_durable,
+                            browse=self._browse, recover=self._recover, txn_produce=self._txn_produce,
+                            txn_consume=self._txn_consume)
+        
+    def test_dual_max_size(self):
+        """Multiple size policy test (non-parameterized)"""
+        self.dual_max_size(self._num, queue_durable=self._queue_durable, msg_durable=self._msg_durable,
+                           browse=self._browse, recover=self._recover, txn_produce=self._txn_produce,
+                           txn_consume=self._txn_consume)
+        
+    def test_mixed_limit_1(self):
+        """Both count and size policies active with the same queue having equal probabilities of triggering the
+        policy (non-parameterized)"""
+        self.mixed_limit_1(self._num, queue_durable=self._queue_durable, msg_durable=self._msg_durable,
+                           browse=self._browse, recover=self._recover, txn_produce=self._txn_produce,
+                           txn_consume=self._txn_consume)
+        
+    def test_mixed_limit_2(self):
+        """Both count and size policies active with different queues having equal probabilities of triggering the
+        policy (non-parameterized)"""
+        self.mixed_limit_2(self._num, queue_durable=self._queue_durable, msg_durable=self._msg_durable,
+                           browse=self._browse, recover=self._recover, txn_produce=self._txn_produce,
+                           txn_consume=self._txn_consume)
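For illustration only (not part of r3905): the test classes that follow rely on class attributes to parameterize the shared `test_*` methods, so each subclass only overrides the flags that differ from the defaults. A minimal sketch of the pattern is shown here without the test framework. The class names and the `settings` and `queue_names` methods are hypothetical.

```python
class Scenario(object):
    """Base class: defaults live in class attributes."""
    _num = None
    _queue_durable = False
    _recover = False

    def settings(self):
        # Attribute lookup falls back to the base class for any flag
        # that a subclass does not override.
        return {"num": self._num,
                "queue_durable": self._queue_durable,
                "recover": self._recover}

    def queue_names(self):
        # Mirrors the "a%02d" % num naming used by the parameterized methods.
        return ["a%02d" % self._num, "b%02d" % self._num]


class DurableQueueRecover(Scenario):
    """Subclass: overrides only what differs from the defaults."""
    _num = 10
    _queue_durable = True
    _recover = True


class Plain(Scenario):
    _num = 1


durable = DurableQueueRecover()
plain = Plain()
```

Because `_num` feeds the queue and exchange names, giving every subclass a distinct number keeps the names used by different test classes from colliding.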
+    
+# --- Tests ---
+        
+class MultiQueueTest(MultiQueueFlowToDisk):
+    _num = 1
+ 
+class MultiDurableQueueTest(MultiQueueFlowToDisk):
+    _num = 2
+    _queue_durable = True
+ 
+class MultiQueueDurableMsgTest(MultiQueueFlowToDisk):
+    _num = 3
+    _msg_durable = True
+ 
+class MultiDurableQueueDurableMsgTest(MultiQueueFlowToDisk):
+    _num = 4
+    _queue_durable = True
+    _msg_durable = True
+        
+class MultiQueueBrowseTest(MultiQueueFlowToDisk):
+    _num = 5
+    _browse = True
+ 
+class MultiDurableQueueBrowseTest(MultiQueueFlowToDisk):
+    _num = 6
+    _queue_durable = True
+    _browse = True
+ 
+class MultiQueueDurableMsgBrowseTest(MultiQueueFlowToDisk):
+    _num = 7
+    _msg_durable = True
+    _browse = True
+
+class MultiDurableQueueDurableMsgBrowseTest(MultiQueueFlowToDisk):
+    _num = 8
+    _queue_durable = True
+    _msg_durable = True
+    _browse = True
+        
+class MultiQueueRecoverTest(MultiQueueFlowToDisk):
+    _num = 9
+    _recover = True
+ 
+class MultiDurableQueueRecoverTest(MultiQueueFlowToDisk):
+    _num = 10
+    _queue_durable = True
+    _recover = True
+ 
+class MultiQueueDurableMsgRecoverTest(MultiQueueFlowToDisk):
+    _num = 11
+    _msg_durable = True
+    _recover = True
+ 
+class MultiDurableQueueDurableMsgRecoverTest(MultiQueueFlowToDisk):
+    _num = 12
+    _queue_durable = True
+    _msg_durable = True
+    _recover = True
+        
+class MultiQueueBrowseRecoverTest(MultiQueueFlowToDisk):
+    _num = 13
+    _browse = True
+    _recover = True
+ 
+class MultiDurableQueueBrowseRecoverTest(MultiQueueFlowToDisk):
+    _num = 14
+    _queue_durable = True
+    _browse = True
+    _recover = True
+ 
+class MultiQueueDurableMsgBrowseRecoverTest(MultiQueueFlowToDisk):
+    _num = 15
+    _msg_durable = True
+    _browse = True
+    _recover = True
+
+class MultiDurableQueueDurableMsgBrowseRecoverTest(MultiQueueFlowToDisk):
+    _num = 16
+    _queue_durable = True
+    _msg_durable = True
+    _browse = True
+    _recover = True
+         
+class MultiQueueTxPTest(MultiQueueFlowToDisk):
+    _num = 17
+    _txn_produce = True
+ 
+class MultiDurableQueueTxPTest(MultiQueueFlowToDisk):
+    _num = 18
+    _queue_durable = True
+    _txn_produce = True
+ 
+class MultiQueueDurableMsgTxPTest(MultiQueueFlowToDisk):
+    _num = 19
+    _msg_durable = True
+    _txn_produce = True
+ 
+class MultiDurableQueueDurableMsgTxPTest(MultiQueueFlowToDisk):
+    _num = 20
+    _queue_durable = True
+    _msg_durable = True
+    _txn_produce = True
+        
+class MultiQueueBrowseTxPTest(MultiQueueFlowToDisk):
+    _num = 21
+    _browse = True
+    _txn_produce = True
+ 
+class MultiDurableQueueBrowseTxPTest(MultiQueueFlowToDisk):
+    _num = 22
+    _queue_durable = True
+    _browse = True
+    _txn_produce = True
+ 
+class MultiQueueDurableMsgBrowseTxPTest(MultiQueueFlowToDisk):
+    _num = 23
+    _msg_durable = True
+    _browse = True
+    _txn_produce = True
+
+class MultiDurableQueueDurableMsgBrowseTxPTest(MultiQueueFlowToDisk):
+    _num = 24
+    _queue_durable = True
+    _msg_durable = True
+    _browse = True
+    _txn_produce = True
+        
+class MultiQueueRecoverTxPTest(MultiQueueFlowToDisk):
+    _num = 25
+    _recover = True
+    _txn_produce = True
+ 
+class MultiDurableQueueRecoverTxPTest(MultiQueueFlowToDisk):
+    _num = 26
+    _queue_durable = True
+    _recover = True
+    _txn_produce = True
+ 
+class MultiQueueDurableMsgRecoverTxPTest(MultiQueueFlowToDisk):
+    _num = 27
+    _msg_durable = True
+    _recover = True
+    _txn_produce = True
+ 
+class MultiDurableQueueDurableMsgRecoverTxPTest(MultiQueueFlowToDisk):
+    _num = 28
+    _queue_durable = True
+    _msg_durable = True
+    _recover = True
+    _txn_produce = True
+        
+class MultiQueueBrowseRecoverTxPTest(MultiQueueFlowToDisk):
+    _num = 29
+    _browse = True
+    _recover = True
+    _txn_produce = True
+ 
+class MultiDurableQueueBrowseRecoverTxPTest(MultiQueueFlowToDisk):
+    _num = 30
+    _queue_durable = True
+    _browse = True
+    _recover = True
+    _txn_produce = True
+ 
+class MultiQueueDurableMsgBrowseRecoverTxPTest(MultiQueueFlowToDisk):
+    _num = 31
+    _msg_durable = True
+    _browse = True
+    _recover = True
+    _txn_produce = True
+
+class MultiDurableQueueDurableMsgBrowseRecoverTxPTest(MultiQueueFlowToDisk):
+    _num = 32
+    _queue_durable = True
+    _msg_durable = True
+    _browse = True
+    _recover = True
+    _txn_produce = True
+        
+class MultiQueueTxCTest(MultiQueueFlowToDisk):
+    _num = 33
+    _txn_consume = True
+ 
+class MultiDurableQueueTxCTest(MultiQueueFlowToDisk):
+    _num = 34
+    _queue_durable = True
+    _txn_consume = True
+ 
+class MultiQueueDurableMsgTxCTest(MultiQueueFlowToDisk):
+    _num = 35
+    _msg_durable = True
+    _txn_consume = True
+ 
+class MultiDurableQueueDurableMsgTxCTest(MultiQueueFlowToDisk):
+    _num = 36
+    _queue_durable = True
+    _msg_durable = True
+    _txn_consume = True
+        
+class MultiQueueBrowseTxCTest(MultiQueueFlowToDisk):
+    _num = 37
+    _browse = True
+    _txn_consume = True
+ 
+class MultiDurableQueueBrowseTxCTest(MultiQueueFlowToDisk):
+    _num = 38
+    _queue_durable = True
+    _browse = True
+    _txn_consume = True
+ 
+class MultiQueueDurableMsgBrowseTxCTest(MultiQueueFlowToDisk):
+    _num = 39
+    _msg_durable = True
+    _browse = True
+    _txn_consume = True
+
+class MultiDurableQueueDurableMsgBrowseTxCTest(MultiQueueFlowToDisk):
+    _num = 40
+    _queue_durable = True
+    _msg_durable = True
+    _browse = True
+    _txn_consume = True
+        
+class MultiQueueRecoverTxCTest(MultiQueueFlowToDisk):
+    _num = 41
+    _recover = True
+    _txn_consume = True
+ 
+class MultiDurableQueueRecoverTxCTest(MultiQueueFlowToDisk):
+    _num = 42
+    _queue_durable = True
+    _recover = True
+    _txn_consume = True
+ 
+class MultiQueueDurableMsgRecoverTxCTest(MultiQueueFlowToDisk):
+    _num = 43
+    _msg_durable = True
+    _recover = True
+    _txn_consume = True
+ 
+class MultiDurableQueueDurableMsgRecoverTxCTest(MultiQueueFlowToDisk):
+    _num = 44
+    _queue_durable = True
+    _msg_durable = True
+    _recover = True
+    _txn_consume = True
+        
+class MultiQueueBrowseRecoverTxCTest(MultiQueueFlowToDisk):
+    _num = 45
+    _browse = True
+    _recover = True
+    _txn_consume = True
+ 
+class MultiDurableQueueBrowseRecoverTxCTest(MultiQueueFlowToDisk):
+    _num = 46
+    _queue_durable = True
+    _browse = True
+    _recover = True
+    _txn_consume = True
+ 
+class MultiQueueDurableMsgBrowseRecoverTxCTest(MultiQueueFlowToDisk):
+    _num = 47
+    _msg_durable = True
+    _browse = True
+    _recover = True
+    _txn_consume = True
+
+class MultiDurableQueueDurableMsgBrowseRecoverTxCTest(MultiQueueFlowToDisk):
+    _num = 48
+    _queue_durable = True
+    _msg_durable = True
+    _browse = True
+    _recover = True
+    _txn_consume = True
+         
+class MultiQueueTxPTxCTest(MultiQueueFlowToDisk):
+    _num = 49
+    _txn_produce = True
+    _txn_consume = True
+ 
+class MultiDurableQueueTxPTxCTest(MultiQueueFlowToDisk):
+    _num = 50
+    _queue_durable = True
+    _txn_produce = True
+    _txn_consume = True
+ 
+class MultiQueueDurableMsgTxPTxCTest(MultiQueueFlowToDisk):
+    _num = 51
+    _msg_durable = True
+    _txn_produce = True
+    _txn_consume = True
+ 
+class MultiDurableQueueDurableMsgTxPTxCTest(MultiQueueFlowToDisk):
+    _num = 52
+    _queue_durable = True
+    _msg_durable = True
+    _txn_produce = True
+    _txn_consume = True
+        
+class MultiQueueBrowseTxPTxCTest(MultiQueueFlowToDisk):
+    _num = 53
+    _browse = True
+    _txn_produce = True
+    _txn_consume = True
+ 
+class MultiDurableQueueBrowseTxPTxCTest(MultiQueueFlowToDisk):
+    _num = 54
+    _queue_durable = True
+    _browse = True
+    _txn_produce = True
+    _txn_consume = True
+ 
+class MultiQueueDurableMsgBrowseTxPTxCTest(MultiQueueFlowToDisk):
+    _num = 55
+    _msg_durable = True
+    _browse = True
+    _txn_produce = True
+    _txn_consume = True
+
+class MultiDurableQueueDurableMsgBrowseTxPTxCTest(MultiQueueFlowToDisk):
+    _num = 56
+    _queue_durable = True
+    _msg_durable = True
+    _browse = True
+    _txn_produce = True
+    _txn_consume = True
+        
+class MultiQueueRecoverTxPTxCTest(MultiQueueFlowToDisk):
+    _num = 57
+    _recover = True
+    _txn_produce = True
+    _txn_consume = True
+ 
+class MultiDurableQueueRecoverTxPTxCTest(MultiQueueFlowToDisk):
+    _num = 58
+    _queue_durable = True
+    _recover = True
+    _txn_produce = True
+    _txn_consume = True
+ 
+class MultiQueueDurableMsgRecoverTxPTxCTest(MultiQueueFlowToDisk):
+    _num = 59
+    _msg_durable = True
+    _recover = True
+    _txn_produce = True
+    _txn_consume = True
+ 
+class MultiDurableQueueDurableMsgRecoverTxPTxCTest(MultiQueueFlowToDisk):
+    _num = 60
+    _queue_durable = True
+    _msg_durable = True
+    _recover = True
+    _txn_produce = True
+    _txn_consume = True
+        
+class MultiQueueBrowseRecoverTxPTxCTest(MultiQueueFlowToDisk):
+    _num = 61
+    _browse = True
+    _recover = True
+    _txn_produce = True
+    _txn_consume = True
+ 
+class MultiDurableQueueBrowseRecoverTxPTxCTest(MultiQueueFlowToDisk):
+    _num = 62
+    _queue_durable = True
+    _browse = True
+    _recover = True
+    _txn_produce = True
+    _txn_consume = True
+ 
+class MultiQueueDurableMsgBrowseRecoverTxPTxCTest(MultiQueueFlowToDisk):
+    _num = 63
+    _msg_durable = True
+    _browse = True
+    _recover = True
+    _txn_produce = True
+    _txn_consume = True
+
+class MultiDurableQueueDurableMsgBrowseRecoverTxPTxCTest(MultiQueueFlowToDisk):
+    _num = 64
+    _queue_durable = True
+    _msg_durable = True
+    _browse = True
+    _recover = True
+    _txn_produce = True
+    _txn_consume = True
+  
+    # --- Long and randomized tests ---
+        
+#    def test_12_Randomized(self):
+#        """Randomized flow-to-disk tests"""
+#        seed = long(1000.0 * time.time())
+#        print "seed=0x%x" % seed
+#        random.seed(seed)
+#        for index in range(0, 10):
+#            self.randomLimit(index)
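
Note on the test matrix above: the 64 classes enumerate every combination of six boolean flags, and `_num` equals one plus the flags read as a bitmask, with queue durability as the lowest bit and transactional consume as the highest. The sketch below reproduces that numbering and naming scheme. It is only an illustration: `FlowToDiskBase`, `FLAGS` and `make_test_classes` are hypothetical names that do not appear in the commit, and the stand-in base class replaces `MultiQueueFlowToDisk` so that the block runs without a Qpid installation.

```python
class FlowToDiskBase(object):
    """Hypothetical stand-in for MultiQueueFlowToDisk; only the flag defaults matter."""
    _queue_durable = False
    _msg_durable = False
    _browse = False
    _recover = False
    _txn_produce = False
    _txn_consume = False


# (attribute, name fragment when set, name fragment when clear), lowest bit first.
FLAGS = [
    ("_queue_durable", "DurableQueue", "Queue"),
    ("_msg_durable", "DurableMsg", ""),
    ("_browse", "Browse", ""),
    ("_recover", "Recover", ""),
    ("_txn_produce", "TxP", ""),
    ("_txn_consume", "TxC", ""),
]


def make_test_classes(base=FlowToDiskBase):
    """Build one subclass per flag combination and return them keyed by class name."""
    classes = {}
    for mask in range(2 ** len(FLAGS)):
        attrs = {"_num": mask + 1}
        name = "Multi"
        for bit, (attr, on_name, off_name) in enumerate(FLAGS):
            is_set = bool(mask & (1 << bit))
            if is_set:
                attrs[attr] = True
            name += on_name if is_set else off_name
        name += "Test"
        classes[name] = type(name, (base,), attrs)
    return classes


CLASSES = make_test_classes()
```

Deriving the classes from a table keeps each name, number and flag set consistent by construction. Whether a test runner would discover classes generated this way is not checked here.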

Added: store/trunk/cpp/tests/new_python_tests/store_test.py
===================================================================
--- store/trunk/cpp/tests/new_python_tests/store_test.py	                        (rev 0)
+++ store/trunk/cpp/tests/new_python_tests/store_test.py	2010-04-13 17:30:22 UTC (rev 3905)
@@ -0,0 +1,407 @@
+"""
+Copyright (c) 2008 Red Hat, Inc.
+
+This file is part of the Qpid async store library msgstore.so.
+
+This library is free software; you can redistribute it and/or
+modify it under the terms of the GNU Lesser General Public
+License as published by the Free Software Foundation; either
+version 2.1 of the License, or (at your option) any later version.
+
+This library is distributed in the hope that it will be useful,
+but WITHOUT ANY WARRANTY; without even the implied warranty of
+MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
+Lesser General Public License for more details.
+
+You should have received a copy of the GNU Lesser General Public
+License along with this library; if not, write to the Free Software
+Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA  02110-1301
+ USA
+
+The GNU Lesser General Public License is available in the file COPYING.
+"""
+
+import re
+from qpid.brokertest import BrokerTest
+from qpid.messaging import Empty
+from qmf.console import Session
+
+    
+def store_args():
+    """Return the broker args necessary to load the async store"""
+    assert BrokerTest.store_lib 
+    return ["--load-module", BrokerTest.store_lib]
+
+class Qmf:
+    """
+    QMF functions not yet available in the new QMF API. Remove this and replace with new API when it becomes available.
+    """
+    def __init__(self, broker):
+        self.__session = Session()
+        self.__broker = self.__session.addBroker("amqp://localhost:%d"%broker.port())
+
+    def add_exchange(self, exchange_name, exchange_type, alt_exchange_name=None, passive=False, durable=False,
+                     arguments = None):
+        """Add a new exchange"""
+        amqp_session = self.__broker.getAmqpSession()
+        if arguments == None:
+            arguments = {}
+        if alt_exchange_name:
+            amqp_session.exchange_declare(exchange=exchange_name, type=exchange_type,
+                                          alternate_exchange=alt_exchange_name, passive=passive, durable=durable,
+                                          arguments=arguments)
+        else:
+            amqp_session.exchange_declare(exchange=exchange_name, type=exchange_type, passive=passive, durable=durable,
+                                          arguments=arguments)
+    
+    def add_queue(self, queue_name, alt_exchange_name=None, passive=False, durable=False, arguments = None):
+        """Add a new queue"""
+        amqp_session = self.__broker.getAmqpSession()
+        if arguments == None:
+            arguments = {}
+        if alt_exchange_name:
+            amqp_session.queue_declare(queue_name, alternate_exchange=alt_exchange_name, passive=passive,
+                                       durable=durable, arguments=arguments)
+        else:
+            amqp_session.queue_declare(queue_name, passive=passive, durable=durable, arguments=arguments)
+    
+    def delete_queue(self, queue_name):
+        """Delete an existing queue"""
+        amqp_session = self.__broker.getAmqpSession()
+        amqp_session.queue_delete(queue_name)
+
+    def _query(self, name, _class, package, alt_exchange_name=None):
+        """Qmf query function which can optionally look for the presence of an alternate exchange name"""
+        try:
+            obj_list = self.__session.getObjects(_class=_class, _package=package)
+            found = False
+            for obj in obj_list:
+                if obj.name == name:
+                    found = True
+                    if alt_exchange_name != None:
+                        alt_exch_list = self.__session.getObjects(_objectId=obj.altExchange)
+                        if len(alt_exch_list) == 0 or alt_exch_list[0].name != alt_exchange_name:
+                            return False
+                    break
+            return found
+        except Exception:
+            return False
+                
+
+    def query_exchange(self, exchange_name, alt_exchange_name=None):
+        """Test for the presence of an exchange, and optionally whether it has an alternate exchange set to a known
+        value."""
+        return self._query(exchange_name, "exchange", "org.apache.qpid.broker", alt_exchange_name)
+    
+    def query_queue(self, queue_name, alt_exchange_name=None):
+        """Test for the presence of a queue, and optionally whether it has an alternate exchange set to a known
+        value."""
+        return self._query(queue_name, "queue", "org.apache.qpid.broker", alt_exchange_name)
+    
+    def queue_message_count(self, queue_name):
+        """Query the number of messages on a queue"""
+        queue_list = self.__session.getObjects(_class="queue", _name=queue_name)
+        if len(queue_list):
+            return queue_list[0].msgDepth
+    
+    def queue_empty(self, queue_name):
+        """Check if a queue is empty (has no messages waiting)"""
+        return self.queue_message_count(queue_name) == 0
+
+
+class StoreTest(BrokerTest):
+    """
+    This subclass of BrokerTest adds some convenience test/check functions
+    """
+    
+    def _chk_empty(self, queue, receiver):
+        """Check if a queue is empty (has no more messages)"""
+        try:
+            msg = receiver.fetch(timeout=0)
+            self.assert_(False, "Queue \"%s\" not empty: found message: %s" % (queue, msg))
+        except Empty:
+            pass
+
+    @staticmethod
+    def make_message(msg_count, msg_size):
+        """Make message content. Format: 'abcdef....' followed by 'msg-NNNN', where NNNN is the message count"""
+        msg = "msg-%04d" % msg_count
+        msg_len = len(msg)
+        buff = ""
+        if msg_size != None and msg_size > msg_len:
+            for index in range(0, msg_size - msg_len):
+                if index == msg_size - msg_len - 1:
+                    buff += "-"
+                else:
+                    buff += chr(ord('a') + (index % 26))
+        return buff + msg
+    
+    # Functions for formatting address strings
+    
+    @staticmethod
+    def _fmt_csv(string_list, list_braces = None):
+        """Format a list using comma-separation. Braces are optionally added."""
+        if len(string_list) == 0:
+            return ""
+        first = True
+        str_ = ""
+        if list_braces != None:
+            str_ += list_braces[0]
+        for string in string_list:
+            if string != None:
+                if first:
+                    first = False
+                else:
+                    str_ += ", "
+                str_ += string
+        if list_braces != None:
+            str_ += list_braces[1]
+        return str_
+    
+    def _fmt_map(self, string_list):
+        """Format a map {l1, l2, l3, ...} from a string list. Each item in the list must be a formatted map
+        element ('key:val')."""
+        return self._fmt_csv(string_list, list_braces="{}") 
+    
+    def _fmt_list(self, string_list):
+        """Format a list [l1, l2, l3, ...] from a string list."""
+        return self._fmt_csv(string_list, list_braces="[]") 
+    
+    def addr_fmt(self, node_name, **kwargs):
+        """Generic AMQP to new address formatter. Takes common (but not all) AMQP options and formats an address
+        string."""
+        # Get keyword args
+        node_subject = kwargs.get("node_subject")
+        create_policy = kwargs.get("create_policy")
+        delete_policy = kwargs.get("delete_policy")
+        assert_policy = kwargs.get("assert_policy")
+        mode = kwargs.get("mode")
+        link = kwargs.get("link", False)
+        link_name = kwargs.get("link_name")
+        node_type = kwargs.get("node_type")
+        durable = kwargs.get("durable", False)
+        link_reliability = kwargs.get("link_reliability")
+        x_declare_list = kwargs.get("x_declare_list", [])
+        x_bindings_list = kwargs.get("x_bindings_list", [])
+        x_subscribe_list = kwargs.get("x_subscribe_list", [])
+        
+        node_flag = not link and (node_type != None or durable or len(x_declare_list) > 0 or len(x_bindings_list) > 0)
+        link_flag = link and (link_name != None or durable or link_reliability != None or len(x_declare_list) > 0 or
+                             len(x_bindings_list) > 0 or len(x_subscribe_list) > 0)
+        assert not (node_flag and link_flag)
+        
+        opt_str_list = []
+        if create_policy != None:
+            opt_str_list.append("create: %s" % create_policy)
+        if delete_policy != None:
+            opt_str_list.append("delete: %s" % delete_policy)
+        if assert_policy != None:
+            opt_str_list.append("assert: %s" % assert_policy)
+        if mode != None:
+            opt_str_list.append("mode: %s" % mode)
+        if node_flag or link_flag:
+            node_str_list = []
+            if link_name != None:
+                node_str_list.append("name: \"%s\"" % link_name)
+            if node_type != None:
+                node_str_list.append("type: %s" % node_type)
+            if durable:
+                node_str_list.append("durable: True")
+            if link_reliability != None:
+                node_str_list.append("reliability: %s" % link_reliability)
+            if len(x_declare_list) > 0:
+                node_str_list.append("x-declare: %s" % self._fmt_map(x_declare_list))
+            if len(x_bindings_list) > 0:
+                node_str_list.append("x-bindings: %s" % self._fmt_list(x_bindings_list))
+            if len(x_subscribe_list) > 0:
+                node_str_list.append("x-subscribe: %s" % self._fmt_map(x_subscribe_list))
+            if node_flag:
+                opt_str_list.append("node: %s" % self._fmt_map(node_str_list))
+            else:
+                opt_str_list.append("link: %s" % self._fmt_map(node_str_list))
+        addr_str = node_name
+        if node_subject != None:
+            addr_str += "/%s" % node_subject
+        if len(opt_str_list) > 0:
+            addr_str += "; %s" % self._fmt_map(opt_str_list)
+        return addr_str
+    
+    def snd_addr(self, node_name, **kwargs):
+        """ Create a send (node) address"""
+        # Get keyword args
+        topic = kwargs.get("topic")
+        topic_flag = kwargs.get("topic_flag", False)
+        auto_create = kwargs.get("auto_create", True)
+        auto_delete = kwargs.get("auto_delete", False)
+        durable = kwargs.get("durable", False)
+        exclusive = kwargs.get("exclusive", False)
+        ftd_count = kwargs.get("ftd_count")
+        ftd_size = kwargs.get("ftd_size")
+        policy = kwargs.get("policy", "flow-to-disk")
+        exchage_type = kwargs.get("exchage_type")
+        
+        create_policy = None
+        if auto_create:
+            create_policy = "always"
+        delete_policy = None
+        if auto_delete:
+            delete_policy = "always"
+        node_type = None
+        if topic != None or topic_flag:
+            node_type = "topic"
+        x_declare_list = ["\"exclusive\": %s" % exclusive]
+        if ftd_count != None or ftd_size != None:
+            queue_policy = ["\'qpid.policy_type\': %s" % policy]
+            if ftd_count:
+                queue_policy.append("\'qpid.max_count\': %d" % ftd_count)
+            if ftd_size:
+                queue_policy.append("\'qpid.max_size\': %d" % ftd_size)
+            x_declare_list.append("arguments: %s" % self._fmt_map(queue_policy))
+        if exchage_type != None:
+            x_declare_list.append("type: %s" % exchage_type)
+            
+        return self.addr_fmt(node_name, node_subject=topic, create_policy=create_policy, delete_policy=delete_policy,
+                             node_type=node_type, durable=durable, x_declare_list=x_declare_list)
+    
+    def rcv_addr(self, node_name, **kwargs):
+        """ Create a receive (link) address"""
+        # Get keyword args
+        auto_create = kwargs.get("auto_create", True)
+        auto_delete = kwargs.get("auto_delete", False)
+        link_name = kwargs.get("link_name")
+        durable = kwargs.get("durable", False)
+        browse = kwargs.get("browse", False)
+        exclusive = kwargs.get("exclusive", False)
+        binding_list = kwargs.get("binding_list", [])
+        ftd_count = kwargs.get("ftd_count")
+        ftd_size = kwargs.get("ftd_size")
+        policy = kwargs.get("policy", "flow-to-disk")
+        
+        create_policy = None
+        if auto_create:
+            create_policy = "always"
+        delete_policy = None
+        if auto_delete:
+            delete_policy = "always"
+        mode = None
+        if browse:
+            mode = "browse" 
+        x_declare_list = ["\"exclusive\": %s" % exclusive]
+        if ftd_count != None or ftd_size != None:
+            queue_policy = ["\'qpid.policy_type\': %s" % policy]
+            if ftd_count:
+                queue_policy.append("\'qpid.max_count\': %d" % ftd_count)
+            if ftd_size:
+                queue_policy.append("\'qpid.max_size\': %d" % ftd_size)
+            x_declare_list.append("arguments: %s" % self._fmt_map(queue_policy))
+        x_bindings_list = []
+        for binding in binding_list:
+            x_bindings_list.append("{exchange: %s, key: %s}" % binding)
+        return self.addr_fmt(node_name, create_policy=create_policy, delete_policy=delete_policy, mode=mode, link=True,
+                             link_name=link_name, durable=durable, x_declare_list=x_declare_list,
+                             x_bindings_list=x_bindings_list)
+    
+    def check_message(self, broker, queue, exp_msg, transactional=False, empty=False, ack=True, browse=False):
+        """Check that a message is on a queue by dequeuing it and comparing it to the expected message"""
+        return self.check_messages(broker, queue, [exp_msg], transactional, empty, ack, browse)
+        
+    def check_messages(self, broker, queue, exp_msg_list, transactional=False, empty=False, ack=True, browse=False,
+                       emtpy_flag=False):
+        """Check that messages are on a queue by dequeuing them and comparing them to the expected messages"""
+        if emtpy_flag:
+            num_msgs = 0
+        else:
+            num_msgs = len(exp_msg_list)
+        ssn = broker.connect().session(transactional=transactional)
+        rcvr = ssn.receiver(self.rcv_addr(queue, browse=browse), capacity=num_msgs)
+        if num_msgs > 0:
+            try:
+                received_msg_list = [rcvr.fetch(timeout=0) for i in range(num_msgs)]
+            except Empty:
+                self.assert_(False, "Queue \"%s\" is empty, unable to retrieve expected message %d." % (queue, i))
+            for i in range(0, len(received_msg_list)):
+                self.assertEqual(received_msg_list[i].content, exp_msg_list[i].content)
+                self.assertEqual(received_msg_list[i].correlation_id, exp_msg_list[i].correlation_id)
+        if empty:
+            self._chk_empty(queue, rcvr)
+        if ack:
+            ssn.acknowledge()
+            if transactional:
+                ssn.commit()
+            ssn.connection.close()
+        else:
+            if transactional:
+                ssn.commit()
+            return ssn
+    
+    # Functions for finding strings in the broker log file (or other files)
+
+    @staticmethod
+    def _read_file(file_name):
+        """Returns the content of file named file_name as a string"""
+        file_handle = file(file_name)
+        try:
+            return file_handle.read()
+        finally:
+            file_handle.close()
+    
+    def _get_hits(self, broker, search):
+        """Find all occurrences of the search in the broker log (eliminating possible duplicates from msgs on multiple
+        queues)"""
+        # TODO: Use sets when RHEL-4 is no longer supported
+        hits = []
+        for hit in search.findall(self._read_file(broker.log)):
+            if hit not in hits:
+                hits.append(hit) 
+        return hits
+    
+    def _reconsile_hits(self, broker, ftd_msgs, release_hits):
+        """Remove entries from list release_hits if they match the message id in ftd_msgs. Check for remaining
+        release_hits."""
+        for msg in ftd_msgs:
+            found = False
+            for hit in release_hits:
+                if str(msg.id) in hit:
+                    release_hits.remove(hit)
+                    #print "Found %s in %s" % (msg.id, broker.log)
+                    found = True
+                    break
+            if not found:
+                self.assert_(False, "Unable to locate released message %s in log %s" % (msg.id, broker.log))
+        if len(release_hits) > 0:
+            err = "Messages were unexpectedly released in log %s:\n" % broker.log
+            for hit in release_hits:
+                err += "  %s\n" % hit
+            self.assert_(False, err)
+        
+    def check_msg_release(self, broker, ftd_msgs):
+        """ Check for 'Content released' messages in broker log for messages in ftd_msgs"""
+        hits = self._get_hits(broker, re.compile("debug Message id=\"[0-9a-f-]{36}\"; pid=0x[0-9a-f]+: "
+                                                 "Content released$", re.MULTILINE))
+        self._reconsile_hits(broker, ftd_msgs, hits)
+        
+    def check_msg_release_on_commit(self, broker, ftd_msgs):
+        """ Check for 'Content released on commit' messages in broker log for messages in ftd_msgs"""
+        hits = self._get_hits(broker, re.compile("debug Message id=\"[0-9a-f-]{36}\"; pid=0x[0-9a-f]+: "
+                                                 "Content released on commit$", re.MULTILINE))
+        self._reconsile_hits(broker, ftd_msgs, hits)
+        
+    def check_msg_release_on_recover(self, broker, ftd_msgs):
+        """ Check for 'Content released after recovery' messages in broker log for messages in ftd_msgs"""
+        hits = self._get_hits(broker, re.compile("debug Message id=\"[0-9a-f-]{36}\"; pid=0x[0-9a-f]+: "
+                                                 "Content released after recovery$", re.MULTILINE))
+        self._reconsile_hits(broker, ftd_msgs, hits)
+    
+    def check_msg_block(self, broker, ftd_msgs):
+        """Check for 'Content release blocked' messages in broker log for messages in ftd_msgs"""
+        hits = self._get_hits(broker, re.compile("debug Message id=\"[0-9a-f-]{36}\"; pid=0x[0-9a-f]+: "
+                                                 "Content release blocked$", re.MULTILINE))
+        self._reconsile_hits(broker, ftd_msgs, hits)
+     
+    def check_msg_block_on_commit(self, broker, ftd_msgs):
+        """Check for 'Content release blocked on commit' messages in broker log for messages in ftd_msgs"""
+        hits = self._get_hits(broker, re.compile("debug Message id=\"[0-9a-f-]{36}\"; pid=0x[0-9a-f]+: "
+                                                 "Content release blocked on commit$", re.MULTILINE))
+        self._reconsile_hits(broker, ftd_msgs, hits)
+       
+        
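
Note on the log checks above: `_get_hits` collects the unique log lines that match a pattern, and `_reconsile_hits` then pairs each expected message id with one of those lines, reporting both ids with no line and lines with no id. A minimal standalone sketch of that two-step check follows. The regular expression is copied from `check_msg_release`; `unique_hits`, `reconcile` and the sample log text are hypothetical, and the sample lines are synthetic, shaped only to match the pattern rather than copied from a real broker log.

```python
import re

# Pattern copied from check_msg_release() in store_test.py.
RELEASED = re.compile("debug Message id=\"[0-9a-f-]{36}\"; pid=0x[0-9a-f]+: "
                      "Content released$", re.MULTILINE)


def unique_hits(pattern, log_text):
    """Return the matching log fragments in first-seen order, without duplicates."""
    hits = []
    for hit in pattern.findall(log_text):
        if hit not in hits:
            hits.append(hit)
    return hits


def reconcile(msg_ids, hits):
    """Return (ids with no matching log line, log lines matching no id)."""
    remaining = list(hits)
    missing = []
    for msg_id in msg_ids:
        for hit in remaining:
            if msg_id in hit:
                remaining.remove(hit)
                break
        else:
            missing.append(msg_id)
    return missing, remaining


ID_1 = "00000000-0000-0000-0000-000000000001"
ID_2 = "00000000-0000-0000-0000-000000000002"
ID_3 = "00000000-0000-0000-0000-000000000003"
ID_4 = "00000000-0000-0000-0000-000000000004"

# Synthetic input: one duplicated line, one "on commit" line, one unexpected release.
SAMPLE_LOG = "\n".join([
    'debug Message id="%s"; pid=0x1a: Content released' % ID_1,
    'debug Message id="%s"; pid=0x1a: Content released' % ID_1,
    'debug Message id="%s"; pid=0x1b: Content released on commit' % ID_2,
    'debug Message id="%s"; pid=0x1c: Content released' % ID_3,
])

HITS = unique_hits(RELEASED, SAMPLE_LOG)
MISSING, UNEXPECTED = reconcile([ID_1, ID_4], HITS)
```

The trailing `$` with `re.MULTILINE` is what keeps the plain "Content released" pattern from also matching the "on commit" and "after recovery" variants.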

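Note on `make_message` in store_test.py: the content is the tag `msg-NNNN`, left-padded to `msg_size` with letters cycling through a..z and a single `-` as the last padding character. The sketch below is a standalone rewrite under that reading, not the committed code; it builds the padding in one expression instead of a character-by-character loop.

```python
def make_message(msg_count, msg_size):
    """Return 'msg-NNNN', left-padded to msg_size with cycling letters and a final '-'."""
    tag = "msg-%04d" % msg_count
    if msg_size is None or msg_size <= len(tag):
        # No room for padding: the tag alone is the content.
        return tag
    pad_len = msg_size - len(tag)
    # pad_len - 1 letters cycling through a..z, then one '-' separator.
    letters = "".join(chr(ord("a") + (index % 26)) for index in range(pad_len - 1))
    return letters + "-" + tag


EXAMPLE_SHORT = make_message(7, 4)
EXAMPLE_PADDED = make_message(1, 20)
EXAMPLE_WRAPPED = make_message(1, 40)
```

Here `EXAMPLE_PADDED` is `abcdefghijk-msg-0001`, and `EXAMPLE_WRAPPED` shows the alphabet wrapping around once the padding exceeds 26 letters.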
Modified: store/trunk/cpp/tests/run_long_python_tests
===================================================================
--- store/trunk/cpp/tests/run_long_python_tests	2010-04-12 21:41:54 UTC (rev 3904)
+++ store/trunk/cpp/tests/run_long_python_tests	2010-04-13 17:30:22 UTC (rev 3905)
@@ -21,4 +21,4 @@
 #
 # The GNU Lesser General Public License is available in the file COPYING.
 
-./run_old_python_tests LONG_TEST
+./run_new_python_tests LONG_TEST

Modified: store/trunk/cpp/tests/run_new_python_tests
===================================================================
--- store/trunk/cpp/tests/run_new_python_tests	2010-04-12 21:41:54 UTC (rev 3904)
+++ store/trunk/cpp/tests/run_new_python_tests	2010-04-13 17:30:22 UTC (rev 3905)
@@ -38,8 +38,27 @@
 
 echo "Running Python tests..."
 
-PYTHON_TESTS=${PYTHON_TESTS:-$*}
+case $1 in
+    SHORT_TEST)
+        DEFAULT_PYTHON_TESTS="*.flow_to_disk.SimpleMaxSizeCountTest.test_durable_browse_recover *.flow_to_disk.MultiDurableQueueDurableMsgBrowseRecoverTxPTxCTest.test_mixed_limit_2";;
+    LONG_TEST)
+        DEFAULT_PYTHON_TESTS=;;
+    *)
+        DEFAULT_PYTHON_TESTS="*.flow_to_disk.SimpleMaxSizeCountTest.* *.flow_to_disk.MultiDurableQueueDurableMsg*.test_mixed_limit_1";;
+esac
 
+#if test -z $1; then
+#    DEFAULT_PYTHON_TESTS="*.flow_to_disk.SimpleMaxSizeCountTest.* *.flow_to_disk.MultiDurableQueueDurableMsg*.test_mixed_limit_1"
+#else
+#    if test x$1 == xSHORT_TEST; then
+#        DEFAULT_PYTHON_TESTS="*.flow_to_disk.SimpleMaxSizeCountTest.test_durable_browse_recover *.flow_to_disk.MultiDurableQueueDurableMsgBrowseRecoverTxPTxCTest.test_mixed_limit_2"
+#    else
+#        DEFAULT_PYTHON_TESTS=$*
+#    fi
+#fi
+
+PYTHON_TESTS=${PYTHON_TESTS:-${DEFAULT_PYTHON_TESTS}}
+
 OUTDIR=new_python_tests.tmp
 rm -rf $OUTDIR
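
Note on the test selection above: the first argument now picks a default test list, and `${PYTHON_TESTS:-...}` lets a non-empty `PYTHON_TESTS` environment variable override it. The Python sketch below mirrors that logic for illustration only. `select_tests` is a hypothetical helper, not part of the script, and the reading that an empty list under `LONG_TEST` means "run everything" is an assumption about the test runner.

```python
SHORT_TESTS = ("*.flow_to_disk.SimpleMaxSizeCountTest.test_durable_browse_recover "
               "*.flow_to_disk.MultiDurableQueueDurableMsgBrowseRecoverTxPTxCTest.test_mixed_limit_2")
REGULAR_TESTS = ("*.flow_to_disk.SimpleMaxSizeCountTest.* "
                 "*.flow_to_disk.MultiDurableQueueDurableMsg*.test_mixed_limit_1")


def select_tests(first_arg, environ):
    """Mirror the case statement plus the ${PYTHON_TESTS:-default} expansion."""
    if first_arg == "SHORT_TEST":
        default = SHORT_TESTS
    elif first_arg == "LONG_TEST":
        default = ""  # empty pattern list; assumed to mean "no filter"
    else:
        default = REGULAR_TESTS
    # The ":-" form falls back to the default when the variable is unset OR empty.
    return environ.get("PYTHON_TESTS") or default


LONG_SELECTION = select_tests("LONG_TEST", {})
OVERRIDE_SELECTION = select_tests(None, {"PYTHON_TESTS": "*.my_test.*"})
EMPTY_ENV_SELECTION = select_tests("SHORT_TEST", {"PYTHON_TESTS": ""})
```

One consequence worth noting: with the old `${PYTHON_TESTS:-$*}` form, arbitrary positional arguments were passed through as test patterns, whereas the new form only recognises the two mode keywords and otherwise uses the regular list.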
 

Deleted: store/trunk/cpp/tests/run_old_python_tests
===================================================================
--- store/trunk/cpp/tests/run_old_python_tests	2010-04-12 21:41:54 UTC (rev 3904)
+++ store/trunk/cpp/tests/run_old_python_tests	2010-04-13 17:30:22 UTC (rev 3905)
@@ -1,96 +0,0 @@
-#!/bin/bash
-#
-# Copyright (c) 2008, 2009 Red Hat, Inc.
-#
-# This file is part of the Qpid async store library msgstore.so.
-#
-# This library is free software; you can redistribute it and/or
-# modify it under the terms of the GNU Lesser General Public
-# License as published by the Free Software Foundation; either
-# version 2.1 of the License, or (at your option) any later version.
-#
-# This library is distributed in the hope that it will be useful,
-# but WITHOUT ANY WARRANTY; without even the implied warranty of
-# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
-# Lesser General Public License for more details.
-#
-# You should have received a copy of the GNU Lesser General Public
-# License along with this library; if not, write to the Free Software
-# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA  02110-1301
-# USA
-#
-# The GNU Lesser General Public License is available in the file COPYING.
-
-if test x$1 == x"LONG_TEST"; then
-	echo "Running long tests..."
-	LONG_TEST=1
-fi
-
-if test -z ${QPID_DIR} ; then
-    cat <<EOF
-
-	===========  WARNING: PYTHON TESTS DISABLED ==============
-	
-	QPID_DIR not set.
-
-	===========================================================
-
-EOF
-	exit
-fi
-
-QPID_PYTHON_DIR=${QPID_DIR}/python
-export PYTHONPATH=${QPID_PYTHON_DIR}:${abs_srcdir}
-
-if python -c "import qpid" ; then
-	PYTHON_TESTS=
-	FAILING_PYTHON_TESTS=${abs_srcdir}/failing_python_tests.txt
-else
-    cat <<EOF
-
-	===========  WARNING: PYTHON TESTS DISABLED ==============
-	
-	Unable to load python qpid module - skipping python tests.
-	
-    QPID_DIR=${QPID_DIR}"
-    PYTHONPATH=${PYTHONPATH}"
-
-	===========================================================
-
-EOF
-    exit
-fi
-
-STORE_DIR=${TMP_DATA_DIR}/python
-
-#Make sure temp dir exists if this is the first to use it
-if test -d ${STORE_DIR} ; then
-    rm -rf ${STORE_DIR}
-fi
-mkdir -p ${STORE_DIR}
-
-if test -z ${QPIDD} ; then
-	export QPIDD=${QPID_BLD}/src/qpidd
-fi
-
-trap stop_broker INT TERM QUIT
-
-start_broker() {
-    ${QPIDD} --daemon --port 0 --no-module-dir --load-module=${STORE_LIB} --data-dir=${STORE_DIR} --auth=no --log-enable info+ --log-to-file ${STORE_DIR}/broker.python-test.log > qpidd.port
-    LOCAL_PORT=`cat qpidd.port`
-    echo "run_old_python_tests: Started qpidd on port ${LOCAL_PORT}"
-}
-
-stop_broker() {
-       echo "run_old_python_tests: Stopping broker on port ${LOCAL_PORT}"
-    ${QPIDD} -q --port ${LOCAL_PORT}
-}
-
-fail=0
-
-# Run all python tests
-start_broker
-$QPID_PYTHON_DIR/qpid-python-test -m old_python_tests -b localhost:$LOCAL_PORT -I ${FAILING_PYTHON_TESTS} ${PYTHON_TESTS} || { echo "FAIL: old_python_tests"; fail=1; }
-stop_broker || fail=1
-
-exit ${fail}

Added: store/trunk/cpp/tests/run_short_python_tests
===================================================================
--- store/trunk/cpp/tests/run_short_python_tests	                        (rev 0)
+++ store/trunk/cpp/tests/run_short_python_tests	2010-04-13 17:30:22 UTC (rev 3905)
@@ -0,0 +1,24 @@
+#!/bin/bash
+#
+# Copyright (c) 2008, 2009 Red Hat, Inc.
+#
+# This file is part of the Qpid async store library msgstore.so.
+#
+# This library is free software; you can redistribute it and/or
+# modify it under the terms of the GNU Lesser General Public
+# License as published by the Free Software Foundation; either
+# version 2.1 of the License, or (at your option) any later version.
+#
+# This library is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
+# Lesser General Public License for more details.
+#
+# You should have received a copy of the GNU Lesser General Public
+# License along with this library; if not, write to the Free Software
+# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA  02110-1301
+# USA
+#
+# The GNU Lesser General Public License is available in the file COPYING.
+
+./run_new_python_tests SHORT_TEST


Property changes on: store/trunk/cpp/tests/run_short_python_tests
___________________________________________________________________
Name: svn:executable
   + *


