Author: kpvdr
Date: 2010-04-13 13:30:22 -0400 (Tue, 13 Apr 2010)
New Revision: 3905
Added:
store/trunk/cpp/tests/new_python_tests/flow_to_disk.py
store/trunk/cpp/tests/new_python_tests/store_test.py
store/trunk/cpp/tests/run_short_python_tests
Removed:
store/trunk/cpp/tests/old_python_tests/
store/trunk/cpp/tests/run_old_python_tests
Modified:
store/trunk/cpp/lib/MessageStoreImpl.cpp
store/trunk/cpp/tests/Makefile.am
store/trunk/cpp/tests/jrnl/jtt/Makefile.am
store/trunk/cpp/tests/new_python_tests/__init__.py
store/trunk/cpp/tests/new_python_tests/client_persistence.py
store/trunk/cpp/tests/run_long_python_tests
store/trunk/cpp/tests/run_new_python_tests
Log:
Fix for QPID-2470 - Broker does not honour flow-to-disk policy on recovery. Added new
flow-to-disk tests which detect this condition. Also reorganized the tests into short,
regular and long suites.
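
As background, here is a minimal sketch (not part of this commit) of the kind of
scenario the new flow-to-disk tests cover, written against the qpid.messaging API
used in the test code below. The queue arguments qpid.max_count and qpid.policy_type
are assumed here from the broker's queue-policy options; the tests themselves build
their addresses through helpers in store_test.py (store_args, snd_addr, rcv_addr).

    from qpid.messaging import Connection, Message

    conn = Connection("localhost:5672")
    conn.open()
    ssn = conn.session()
    # Declare a durable queue that flows messages to disk once 10 are queued
    snd = ssn.sender("ftd-queue; {create:always, node:{durable:True, "
                     "x-declare:{arguments:{'qpid.max_count':10, "
                     "'qpid.policy_type':'flow_to_disk'}}}}")
    for i in range(15):  # 5 messages beyond the count limit
        snd.send(Message("m%04d" % i, durable=True))
    conn.close()
    # After a broker restart, all 15 messages should still be consumable and the
    # flow-to-disk policy should remain in force -- the behaviour QPID-2470 broke.
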
Modified: store/trunk/cpp/lib/MessageStoreImpl.cpp
===================================================================
--- store/trunk/cpp/lib/MessageStoreImpl.cpp 2010-04-12 21:41:54 UTC (rev 3904)
+++ store/trunk/cpp/lib/MessageStoreImpl.cpp 2010-04-13 17:30:22 UTC (rev 3905)
@@ -1007,8 +1007,7 @@
} // switch
} // while
} catch (const journal::jexception& e) {
- THROW_STORE_EXCEPTION(std::string("Queue ") + queue->getName() +
- ": recoverMessages() failed: " + e.what());
+ THROW_STORE_EXCEPTION(std::string("Queue ") + queue->getName() + ": recoverMessages() failed: " + e.what());
}
}
Modified: store/trunk/cpp/tests/Makefile.am
===================================================================
--- store/trunk/cpp/tests/Makefile.am 2010-04-12 21:41:54 UTC (rev 3904)
+++ store/trunk/cpp/tests/Makefile.am 2010-04-13 17:30:22 UTC (rev 3905)
@@ -27,7 +27,7 @@
INCLUDES=-I$(top_srcdir)/lib -I$(top_srcdir)/lib/gen
TMP_DATA_DIR=$(abs_srcdir)/tmp_data_dir
-
+
if DO_CLUSTER_TESTS
SUBDIRS = jrnl . cluster
else
@@ -39,7 +39,6 @@
OrderingTest \
TransactionalTest \
TwoPhaseCommitTest \
- run_old_python_tests \
run_new_python_tests \
system_test.sh \
clean.sh
@@ -51,7 +50,7 @@
SHORT_TESTS = \
SimpleTest \
TransactionalTest \
- system_test.sh \
+ run_short_python_tests \
clean.sh
check_PROGRAMS = \
@@ -79,11 +78,10 @@
clean.sh \
failing_python_tests.txt \
new_python_tests \
- old_python_tests \
persistence.py \
run_long_python_tests \
- run_old_python_tests \
run_new_python_tests \
+ run_short_python_tests \
run_test \
start_broker \
stop_broker \
Modified: store/trunk/cpp/tests/jrnl/jtt/Makefile.am
===================================================================
--- store/trunk/cpp/tests/jrnl/jtt/Makefile.am 2010-04-12 21:41:54 UTC (rev 3904)
+++ store/trunk/cpp/tests/jrnl/jtt/Makefile.am 2010-04-13 17:30:22 UTC (rev 3905)
@@ -40,8 +40,9 @@
.valgrind.supp: $(top_srcdir)/tests/.valgrind.supp
cp $^ .
-TESTS = \
+LONG_TESTS = \
_ut_data_src \
+ _ut_long_data_src \
_ut_jrnl_init_params \
_ut_read_arg \
_ut_test_case \
@@ -50,9 +51,6 @@
_ut_test_case_set \
_ut_jrnl_instance
-LONG_TESTS = \
- _ut_long_data_src
-
check_PROGRAMS = jtt \
_ut_data_src \
_ut_long_data_src \
Modified: store/trunk/cpp/tests/new_python_tests/__init__.py
===================================================================
--- store/trunk/cpp/tests/new_python_tests/__init__.py 2010-04-12 21:41:54 UTC (rev 3904)
+++ store/trunk/cpp/tests/new_python_tests/__init__.py 2010-04-13 17:30:22 UTC (rev 3905)
@@ -22,3 +22,4 @@
# The GNU Lesser General Public License is available in the file COPYING.
from client_persistence import *
+from flow_to_disk import *
Modified: store/trunk/cpp/tests/new_python_tests/client_persistence.py
===================================================================
--- store/trunk/cpp/tests/new_python_tests/client_persistence.py 2010-04-12 21:41:54 UTC (rev 3904)
+++ store/trunk/cpp/tests/new_python_tests/client_persistence.py 2010-04-13 17:30:22 UTC (rev 3905)
@@ -1,223 +1,169 @@
-# Copyright (c) 2008 Red Hat, Inc.
-#
-# This file is part of the Qpid async store library msgstore.so.
-#
-# This library is free software; you can redistribute it and/or
-# modify it under the terms of the GNU Lesser General Public
-# License as published by the Free Software Foundation; either
-# version 2.1 of the License, or (at your option) any later version.
-#
-# This library is distributed in the hope that it will be useful,
-# but WITHOUT ANY WARRANTY; without even the implied warranty of
-# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
-# Lesser General Public License for more details.
-#
-# You should have received a copy of the GNU Lesser General Public
-# License along with this library; if not, write to the Free Software
-# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301
-# USA
-#
-# The GNU Lesser General Public License is available in the file COPYING.
+"""
+Copyright (c) 2008 Red Hat, Inc.
-from qpid.brokertest import *
-from qpid.messaging import Empty, Message
-from qmf.console import Session
-
-def storeArgs():
- assert BrokerTest.store_lib
- return ["--load-module", BrokerTest.store_lib]
+This file is part of the Qpid async store library msgstore.so.
-class Qmf:
- """
- QMF functions not yet available in the new QMF API. Remove this and replace with new API when it becomes available.
- """
- def __init__(self, broker):
- self.__session = Session()
- self.__broker = self.__session.addBroker("amqp://localhost:%d"%broker.port())
+This library is free software; you can redistribute it and/or
+modify it under the terms of the GNU Lesser General Public
+License as published by the Free Software Foundation; either
+version 2.1 of the License, or (at your option) any later version.
- def addExchange(self, exchangeName, exchangeType, altExchangeName=None, passive=False, durable=False, arguments = {}):
- """Add a new exchange"""
- amqpSession = self.__broker.getAmqpSession()
- if altExchangeName:
- amqpSession.exchange_declare(exchange=exchangeName, type=exchangeType, alternate_exchange=altExchangeName, passive=passive, durable=durable, arguments=arguments)
- else:
- amqpSession.exchange_declare(exchange=exchangeName, type=exchangeType, passive=passive, durable=durable, arguments=arguments)
-
- def addQueue(self, queueName, altExchangeName=None, passive=False, durable=False, arguments = {}):
- """Add a new queue"""
- amqpSession = self.__broker.getAmqpSession()
- if altExchangeName:
- amqpSession = amqpSession.queue_declare(queueName, alternate_exchange=altExchangeName, passive=passive, durable=durable, arguments=arguments)
- else:
- amqpSession = amqpSession.queue_declare(queueName, passive=passive, durable=durable, arguments=arguments)
+This library is distributed in the hope that it will be useful,
+but WITHOUT ANY WARRANTY; without even the implied warranty of
+MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+Lesser General Public License for more details.
- def __query(self, name, _class, package, altExchangeName=None):
- try:
- objList = self.__session.getObjects(_class=_class, _package=package)
- found = False
- for o in objList:
- if o.name == name:
- found = True
- if altExchangeName != None:
- altExchList = self.__session.getObjects(_objectId=o.altExchange)
- if len(altExchList) == 0 or altExchList[0].name != altExchangeName: return False
- break
- return found
- except: return False
-
+You should have received a copy of the GNU Lesser General Public
+License along with this library; if not, write to the Free Software
+Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301
+USA
- def queryExchange(self, exchangeName, altExchangeName=None):
- """Test for the presence of an exchange, and optionally whether it
has an alternate exchange set to a known value."""
- return self.__query(exchangeName, "exchange",
"org.apache.qpid.broker", altExchangeName)
-
- def queryQueue(self, queueName, altExchangeName=None):
- """Test for the presence of an exchange, and optionally whether it
has an alternate exchange set to a known value."""
- return self.__query(queueName, "queue",
"org.apache.qpid.broker", altExchangeName)
-
- def queueMsgCount(self, queueName):
- queueList = self.__session.getObjects(_class="queue", _name=queueName)
- if len(queueList):
- return queueList[0].msgDepth
-
- def queueEmpty(self, queueName):
- return self.queueMsgCount(queueName) == 0
+The GNU Lesser General Public License is available in the file COPYING.
+"""
-
-class StoreTest(BrokerTest):
- """
- This subclass of BrokerTest adds some convenience test/check functions
- """
+from qpid.brokertest import EXPECT_EXIT_OK
+from store_test import StoreTest, Qmf, store_args
+from qpid.messaging import Message
- def __chkEmpty(self, queue, receiver):
- try:
- msg = receiver.fetch(timeout=0)
- self.assert_(False, "Queue \"%s\" not empty: found message: %s" % (queue, msg))
- except Empty: pass
-
- def chkMsg(self, broker, queue, msgChk, empty=False, ack=True):
- return self.chkMsgs(broker, queue, [msgChk], empty, ack)
-
- def chkMsgs(self, broker, queue, msgChkList, empty=False, ack=True):
- s = broker.connect().session()
- rcvr = s.receiver(queue + "; {create:always}", capacity=len(msgChkList))
- try: rmList = [rcvr.fetch(timeout=0) for i in range(len(msgChkList))]
- except Empty: self.assert_(False, "Queue \"%s\" is empty, unable to retrieve expected message %d." % (queue, i))
- for i in range(0, len(rmList)):
- self.assertEqual(rmList[i].content, msgChkList[i].content)
- self.assertEqual(rmList[i].correlation_id, msgChkList[i].correlation_id)
- if empty: self.__chkEmpty(queue, rcvr)
- if ack:
- s.acknowledge()
- s.connection.close()
- else:
- return s
-
class ExchangeQueueTests(StoreTest):
"""
Simple tests of the broker exchange and queue types
"""
- def testDirectExchange(self):
+ def test_direct_exchange(self):
"""Test Direct exchange."""
- broker = self.broker(storeArgs(), name="testDirectExchange",
expect=EXPECT_EXIT_OK)
- m1 = Message("A_Message1", durable=True,
correlation_id="Msg0001")
- m2 = Message("B_Message1", durable=True,
correlation_id="Msg0002")
- broker.send_message("a", m1)
- broker.send_message("b", m2)
+ broker = self.broker(store_args(), name="testDirectExchange",
expect=EXPECT_EXIT_OK)
+ msg1 = Message("A_Message1", durable=True,
correlation_id="Msg0001")
+ msg2 = Message("B_Message1", durable=True,
correlation_id="Msg0002")
+ broker.send_message("a", msg1)
+ broker.send_message("b", msg2)
broker.terminate()
- broker = self.broker(storeArgs(), name="testDirectExchange")
- self.chkMsg(broker, "a", m1, True)
- self.chkMsg(broker, "b", m2, True)
+ broker = self.broker(store_args(), name="testDirectExchange")
+ self.check_message(broker, "a", msg1, True)
+ self.check_message(broker, "b", msg2, True)
- def testTopicExchange(self):
+ def test_topic_exchange(self):
"""Test Topic exchange."""
- broker = self.broker(storeArgs(), name="testTopicExchange",
expect=EXPECT_EXIT_OK)
- s = broker.connect().session()
- snd1 = s.sender("abc/key1; {create:always, node-properties:{durable:True, type:topic}}")
- snd2 = s.sender("abc/key2; {create:always, node-properties:{durable:True, type:topic}}")
- s.receiver("a; {create:always, node-properties:{durable:True, x-properties:{bindings:['abc/key1']}}}")
- s.receiver("b; {create:always, node-properties:{durable:True, x-properties:{bindings:['abc/key1']}}}")
- s.receiver("c; {create:always, node-properties:{durable:True, x-properties:{bindings:['abc/key1']}}}")
- m1 = Message("Message1", durable=True,
correlation_id="Msg0003")
- snd1.send(m1)
- m2 = Message("Message2", durable=True,
correlation_id="Msg0004")
- snd2.send(m2)
- s.connection.close()
+ broker = self.broker(store_args(), name="testTopicExchange",
expect=EXPECT_EXIT_OK)
+ ssn = broker.connect().session()
+ snd1 = ssn.sender("abc/key1; {create:always, node:{type:topic,
durable:True}}")
+ snd2 = ssn.sender("abc/key2; {create:always, node:{type:topic,
durable:True}}")
+ ssn.receiver("a; {create:always, link:{durable:True,
x-bindings:[{exchange:abc, key:key1}]}}")
+ ssn.receiver("b; {create:always, link:{durable:True,
x-bindings:[{exchange:abc, key:key1}]}}")
+ ssn.receiver("c; {create:always, link:{durable:True,
x-bindings:[{exchange:abc, key:key1}, "
+ "{exchange:abc, key: key2}]}}")
+ ssn.receiver("d; {create:always, link:{durable:True,
x-bindings:[{exchange:abc, key:key2}]}}")
+ ssn.receiver("e; {create:always, link:{durable:True,
x-bindings:[{exchange:abc, key:key2}]}}")
+ msg1 = Message("Message1", durable=True,
correlation_id="Msg0003")
+ snd1.send(msg1)
+ msg2 = Message("Message2", durable=True,
correlation_id="Msg0004")
+ snd2.send(msg2)
broker.terminate()
- broker = self.broker(storeArgs(), name="testTopicExchange")
- self.chkMsg(broker, "a", m1, True)
- self.chkMsg(broker, "b", m1, True)
- self.chkMsg(broker, "c", m1, True)
+ broker = self.broker(store_args(), name="testTopicExchange")
+ self.check_message(broker, "a", msg1, True)
+ self.check_message(broker, "b", msg1, True)
+ self.check_messages(broker, "c", [msg1, msg2], True)
+ self.check_message(broker, "d", msg2, True)
+ self.check_message(broker, "e", msg2, True)
- def testLVQ(self):
+ def test_lvq(self):
"""Test LVQ."""
- broker = self.broker(storeArgs(), name="testLVQ",
expect=EXPECT_EXIT_OK)
+ broker = self.broker(store_args(), name="testLVQ",
expect=EXPECT_EXIT_OK)
ma1 = Message("A1", durable=True, correlation_id="Msg0005", properties={"qpid.LVQ_key":"A"})
ma2 = Message("A2", durable=True, correlation_id="Msg0006", properties={"qpid.LVQ_key":"A"})
mb1 = Message("B1", durable=True, correlation_id="Msg0007", properties={"qpid.LVQ_key":"B"})
mb2 = Message("B2", durable=True, correlation_id="Msg0008", properties={"qpid.LVQ_key":"B"})
mb3 = Message("B3", durable=True, correlation_id="Msg0009", properties={"qpid.LVQ_key":"B"})
mc1 = Message("C1", durable=True, correlation_id="Msg0010", properties={"qpid.LVQ_key":"C"})
- broker.send_messages("lvq-test", [mb1, ma1, ma2, mb2, mb3, mc1],
xprops="\"qpid.last_value_queue\":True")
+ broker.send_messages("lvq-test", [mb1, ma1, ma2, mb2, mb3, mc1],
+
xprops="arguments:{\"qpid.last_value_queue\":True}")
broker.terminate()
- broker = self.broker(storeArgs(), name="testLVQ",
expect=EXPECT_EXIT_OK)
- s = self.chkMsgs(broker, "lvq-test", [ma2, mb3, mc1], empty=True,
ack=False)
+ broker = self.broker(store_args(), name="testLVQ",
expect=EXPECT_EXIT_OK)
+ ssn = self.check_messages(broker, "lvq-test", [ma2, mb3, mc1],
empty=True, ack=False)
# Add more messages while subscriber is active (no replacement):
ma3 = Message("A3", durable=True, correlation_id="Msg0011", properties={"qpid.LVQ_key":"A"})
ma4 = Message("A4", durable=True, correlation_id="Msg0012", properties={"qpid.LVQ_key":"A"})
mc2 = Message("C2", durable=True, correlation_id="Msg0013", properties={"qpid.LVQ_key":"C"})
mc3 = Message("C3", durable=True, correlation_id="Msg0014", properties={"qpid.LVQ_key":"C"})
mc4 = Message("C4", durable=True, correlation_id="Msg0015", properties={"qpid.LVQ_key":"C"})
- broker.send_messages("lvq-test", [mc2, mc3, ma3, ma4, mc4],
xprops="\"qpid.last_value_queue\":True", session=s)
- s.acknowledge()
- s.connection.close()
+ broker.send_messages("lvq-test", [mc2, mc3, ma3, ma4, mc4],
session=ssn)
+ ssn.acknowledge()
broker.terminate()
- broker = self.broker(storeArgs(), name="testLVQ")
- self.chkMsgs(broker, "lvq-test", [mc4, ma4], True)
+ broker = self.broker(store_args(), name="testLVQ")
+ self.check_messages(broker, "lvq-test", [mc4, ma4], True)
+
+ def test_fanout_exchange(self):
+ """Test Fanout Exchange"""
+ broker = self.broker(store_args(), name="testFanout",
expect=EXPECT_EXIT_OK)
+ ssn = broker.connect().session()
+ snd = ssn.sender("TestFanoutExchange; {create: always, node: {type: topic,
x-declare: {type: fanout}}}")
+ ssn.receiver("TestFanoutExchange; {link: {name: \"q1\", durable:
True}}")
+ ssn.receiver("TestFanoutExchange; {link: {name: \"q2\", durable:
True}}")
+ ssn.receiver("TestFanoutExchange; {link: {name: \"q3\", durable:
True}}")
+ msg1 = Message("Msg1", durable=True,
correlation_id="Msg0001")
+ snd.send(msg1)
+ msg2 = Message("Msg2", durable=True,
correlation_id="Msg0002")
+ snd.send(msg2)
+ broker.terminate()
+ broker = self.broker(store_args(), name="testFanout")
+ self.check_messages(broker, "q1", [msg1, msg2], True)
+ self.check_messages(broker, "q2", [msg1, msg2], True)
+ self.check_messages(broker, "q3", [msg1, msg2], True)
+
class AlternateExchagePropertyTests(StoreTest):
"""
Test the persistence of the Alternate Exchange property for exchanges and queues.
"""
- def testExchange(self):
+ def test_exchange(self):
"""Exchange alternate exchange property persistence
test"""
- broker = self.broker(storeArgs(), name="testExchangeBroker",
expect=EXPECT_EXIT_OK)
+ broker = self.broker(store_args(), name="testExchangeBroker",
expect=EXPECT_EXIT_OK)
qmf = Qmf(broker)
- qmf.addExchange("altExch", "direct", durable=True) # Serves
as alternate exchange instance
- qmf.addExchange("testExch", "direct", durable=True,
altExchangeName="altExch")
+ qmf.add_exchange("altExch", "direct", durable=True) # Serves
as alternate exchange instance
+ qmf.add_exchange("testExch", "direct", durable=True,
alt_exchange_name="altExch")
broker.terminate()
- broker = self.broker(storeArgs(), name="testExchangeBroker")
+ broker = self.broker(store_args(), name="testExchangeBroker")
qmf = Qmf(broker)
- try: qmf.addExchange("altExch", "direct", passive=True)
- except Exception, e: self.fail("Alternate exchange (\"altExch\")
instance not recovered: %s" % e)
- try: qmf.addExchange("testExch", "direct", passive=True)
- except Exception, e: self.fail("Test exchange (\"testExch\")
instance not recovered: %s" % e)
- self.assertTrue(qmf.queryExchange("testExch", altExchangeName =
"altExch"), "Alternate exchange property not found or is incorrect on
exchange \"testExch\".")
+ try:
+ qmf.add_exchange("altExch", "direct", passive=True)
+ except Exception, error:
+ self.fail("Alternate exchange (\"altExch\") instance not
recovered: %s" % error)
+ try:
+ qmf.add_exchange("testExch", "direct", passive=True)
+ except Exception, error:
+ self.fail("Test exchange (\"testExch\") instance not
recovered: %s" % error)
+ self.assertTrue(qmf.query_exchange("testExch", alt_exchange_name =
"altExch"),
+ "Alternate exchange property not found or is incorrect on
exchange \"testExch\".")
- def testQueue(self):
+ def test_queue(self):
"""Queue alternate exchange property persistexchangeNamece
test"""
- broker = self.broker(storeArgs(), name="testQueueBroker",
expect=EXPECT_EXIT_OK)
+ broker = self.broker(store_args(), name="testQueueBroker",
expect=EXPECT_EXIT_OK)
qmf = Qmf(broker)
- qmf.addExchange("altExch", "direct", durable=True) # Serves
as alternate exchange instance
- qmf.addQueue("testQueue", durable=True,
altExchangeName="altExch")
+ qmf.add_exchange("altExch", "direct", durable=True) # Serves
as alternate exchange instance
+ qmf.add_queue("testQueue", durable=True,
alt_exchange_name="altExch")
broker.terminate()
- broker = self.broker(storeArgs(), name="testQueueBroker")
+ broker = self.broker(store_args(), name="testQueueBroker")
qmf = Qmf(broker)
- try: qmf.addExchange("altExch", "direct", passive=True)
- except Exception, e: self.fail("Alternate exchange (\"altExch\")
instance not recovered: %s" % e)
- try: qmf.addQueue("testQueue", passive=True)
- except Exception, e: self.fail("Test queue (\"testQueue\")
instance not recovered: %s" % e)
- self.assertTrue(qmf.queryQueue("testQueue", altExchangeName =
"altExch"), "Alternate exchange property not found or is incorrect on queue
\"testQueue\".")
+ try:
+ qmf.add_exchange("altExch", "direct", passive=True)
+ except Exception, error:
+ self.fail("Alternate exchange (\"altExch\") instance not
recovered: %s" % error)
+ try:
+ qmf.add_queue("testQueue", passive=True)
+ except Exception, error:
+ self.fail("Test queue (\"testQueue\") instance not recovered:
%s" % error)
+ self.assertTrue(qmf.query_queue("testQueue", alt_exchange_name =
"altExch"),
+ "Alternate exchange property not found or is incorrect on
queue \"testQueue\".")
class RedeliveredTests(StoreTest):
@@ -225,16 +171,16 @@
Test the behavior of the redelivered flag in the context of persistence
"""
- def testBrokerRecovery(self):
+ def test_broker_recovery(self):
"""Test that the redelivered flag is set on messages after
recovery of broker"""
- broker = self.broker(storeArgs(), name="testAfterRecover",
expect=EXPECT_EXIT_OK)
- mc = "xyz"*100
- m = Message(mc, durable=True)
- broker.send_message("testQueue", m)
+ broker = self.broker(store_args(), name="testAfterRecover",
expect=EXPECT_EXIT_OK)
+ msg_content = "xyz"*100
+ msg = Message(msg_content, durable=True)
+ broker.send_message("testQueue", msg)
broker.terminate()
- broker = self.broker(storeArgs(), name="testAfterRecover")
- rm = broker.get_message("testQueue")
- self.assertEqual(mc, rm.content)
- self.assertTrue(rm.redelivered)
+ broker = self.broker(store_args(), name="testAfterRecover")
+ rcv_msg = broker.get_message("testQueue")
+ self.assertEqual(msg_content, rcv_msg.content)
+ self.assertTrue(rcv_msg.redelivered)
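
The client_persistence.py hunks above also migrate the tests from the older draft
address syntax (node-properties/x-properties) to the current qpid.messaging form
(node/link/x-bindings). Both forms, drawn directly from the hunks above, for a
queue "a" bound to exchange "abc" with key "key1":

    # Old draft syntax (removed above):
    s.receiver("a; {create:always, node-properties:{durable:True, "
               "x-properties:{bindings:['abc/key1']}}}")
    # Current syntax (added above):
    ssn.receiver("a; {create:always, link:{durable:True, "
                 "x-bindings:[{exchange:abc, key:key1}]}}")
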
Added: store/trunk/cpp/tests/new_python_tests/flow_to_disk.py
===================================================================
--- store/trunk/cpp/tests/new_python_tests/flow_to_disk.py (rev 0)
+++ store/trunk/cpp/tests/new_python_tests/flow_to_disk.py 2010-04-13 17:30:22 UTC (rev 3905)
@@ -0,0 +1,1127 @@
+"""
+Copyright (c) 2008 Red Hat, Inc.
+
+This file is part of the Qpid async store library msgstore.so.
+
+This library is free software; you can redistribute it and/or
+modify it under the terms of the GNU Lesser General Public
+License as published by the Free Software Foundation; either
+version 2.1 of the License, or (at your option) any later version.
+
+This library is distributed in the hope that it will be useful,
+but WITHOUT ANY WARRANTY; without even the implied warranty of
+MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+Lesser General Public License for more details.
+
+You should have received a copy of the GNU Lesser General Public
+License along with this library; if not, write to the Free Software
+Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301
+USA
+
+The GNU Lesser General Public License is available in the file COPYING.
+"""
+
+import qpid
+from qpid.brokertest import EXPECT_EXIT_OK, EXPECT_UNKNOWN
+from qpid.datatypes import uuid4
+from store_test import StoreTest, store_args
+from qpid.messaging import Message, SessionError, SendError
+
+class FlowToDisk(StoreTest):
+ """Tests for async store flow-to-disk"""
+
+ @staticmethod
+ def _broker_name(queue_name, txn_produce, txn_consume):
+ """Create a broker name based on the queue name and the
transaction parameters"""
+ name = queue_name
+ if txn_produce:
+ name += "_TxP"
+ if txn_consume:
+ name += "_TxC"
+ return name
+
+ def _tx_simple_limit(self, queue_name, kwargs):
+ """
+ Test a simple case of message limits which will force flow-to-disk.
+ * queue_args sets a limit - either max_count and/or max_size
+ * messages are added. Some will flow to disk.
+ * Consume all messages sent.
+ * Check the broker has no messages left.
+ """
+ # Unpack args
+ txn_produce = kwargs.get("txn_produce", False)
+ txn_consume = kwargs.get("txn_consume", False)
+ recover = kwargs.get("recover", False)
+ max_count = kwargs.get("max_count")
+ max_size = kwargs.get("max_size")
+ policy = kwargs.get("policy", "flow_to_disk")
+ num_msgs = kwargs.get("num_msgs", 15)
+ msg_size = kwargs.get("msg_size", 10)
+ msg_durable = kwargs.get("msg_durable", False)
+ sync = kwargs.get("sync", False)
+ browse = kwargs.get("browse", False)
+
+ bname = self._broker_name(queue_name, txn_produce, txn_consume)
+ if recover:
+ expect = EXPECT_UNKNOWN
+ else:
+ expect = EXPECT_EXIT_OK
+ broker = self.broker(store_args(), name=bname, expect=expect, log_level="debug+")
+ prod_session = broker.connect().session(transactional=txn_produce)
+ sender = prod_session.sender(self.snd_addr(queue_name, auto_create=True, durable=True, ftd_count=max_count,
+ ftd_size=max_size, policy=policy))
+
+ # Send messages
+ msgs = []
+ pre_recover_ftd_msgs = [] # msgs released before a recover
+ post_recover_ftd_msgs = [] # msgs released after a recover
+ cum_msg_size = 0
+ for index in range(0, num_msgs):
+ msg = Message(self.make_message(index, msg_size), durable=msg_durable, id=uuid4(),
+ correlation_id="msg-%04d"%index)
+ #print "Sending msg %s" % msg.id
+ msgs.append(msg)
+ cum_msg_size += msg_size
+ if (max_count != None and index >= max_count) or (max_size != None and cum_msg_size > max_size):
+ pre_recover_ftd_msgs.append(msg)
+ sender.send(msg, sync=sync)
+ if not sync:
+ sender.sync()
+ # Close transaction (if needed)
+ if txn_produce:
+ prod_session.commit()
+
+ # Browse messages
+ if browse:
+ self.check_messages(broker, queue_name, msgs, browse=True)
+
+ if recover:
+ broker.terminate()
+ if msg_durable:
+ post_recover_ftd_msgs = pre_recover_ftd_msgs
+ else:
+ del msgs[:] # Transient messages will be discarded on recover
+ old_broker = broker # keep for log analysis
+ broker = self.broker(store_args(), name=bname, expect=EXPECT_EXIT_OK, log_level="debug+")
+
+ # Browse messages after recover
+ if browse:
+ self.check_messages(broker, queue_name, msgs, browse=True)
+
+ # Consume messages
+ self.check_messages(broker, queue_name, msgs, transactional=txn_consume, empty=True)
+ broker.terminate()
+
+ # Check broker logs for released messages
+ if recover:
+ if txn_produce:
+ self.check_msg_release_on_commit(old_broker, pre_recover_ftd_msgs)
+ else:
+ self.check_msg_release(old_broker, pre_recover_ftd_msgs)
+ self.check_msg_release_on_recover(broker, post_recover_ftd_msgs)
+ else:
+ if txn_produce:
+ self.check_msg_release_on_commit(broker, pre_recover_ftd_msgs)
+ else:
+ self.check_msg_release(broker, pre_recover_ftd_msgs)
+
+ def simple_limit(self, queue_name, **kwargs):
+ """Adapter for adding transactions to test"""
+ # Cycle through the produce/consume block transaction combinations
+ for index in range(0, 4):
+ kwargs["txn_produce"] = index & 1 != 0 # Transactional produce
+ kwargs["txn_consume"] = index & 2 != 0 # Transactional consume
+ self._tx_simple_limit(queue_name, kwargs)
+
+class SimpleMaxCountTest(FlowToDisk):
+ """Flow-to-disk tests based on setting max_count"""
+
+ def test_base(self):
+ """Base test"""
+ self.simple_limit("SimpleMaxCount", max_count=10)
+
+ def test_recover(self):
+ """Recover test"""
+ self.simple_limit("SimpleMaxCountRecover", max_count=10, recover=True)
+
+ def test_durable(self):
+ """Durable message test"""
+ self.simple_limit("SimpleMaxCountDurable", max_count=10,
msg_durable=True)
+
+ def test_durable_recover(self):
+ """Durable message recover test"""
+ self.simple_limit("SimpleMaxCountDurableRecover", max_count=10,
msg_durable=True, recover=True)
+
+ def test_browse(self):
+ """Browse test"""
+ self.simple_limit("SimpleMaxCountBrowse", max_count=10, browse=True)
+
+ def test_browse_recover(self):
+ """Browse before and after recover test"""
+ self.simple_limit("SimpleMaxCountBrowseRecover", max_count=10,
browse=True, recover=True)
+
+ def test_durable_browse(self):
+ """Browse durable message test"""
+ self.simple_limit("SimpleMaxCountDurableBrowse", max_count=10,
msg_durable=True, browse=True)
+
+ def test_durable_browse_recover(self):
+ """Browse durable messages before and after
recover"""
+ self.simple_limit("SimpleMaxCountDurableBrowseRecover", max_count=10,
msg_durable=True, browse=True,
+ recover=True)
+
+ def test_large_msg(self):
+ """Large message test"""
+ self.simple_limit("SimpleMaxCountLargeMsg", max_count=10,
max_size=10000000, num_msgs=100, msg_size=10000)
+
+ def test_large_msg_recover(self):
+ """Large message test"""
+ self.simple_limit("SimpleMaxCountLargeMsgRecover", max_count=10,
max_size=10000000, num_msgs=100,
+ msg_size=10000, recover=True)
+
+ def test_large_msg_durable(self):
+ """Large durable message test"""
+ self.simple_limit("SimpleMaxCountLargeMsgDurable", max_count=10,
max_size=10000000, msg_durable=True,
+ num_msgs=100, msg_size=10000)
+
+ def test_large_msg_durable_recover(self):
+ """Large durable message test"""
+ self.simple_limit("SimpleMaxCountLargeMsgDurableRecover", max_count=10,
max_size=10000000, msg_durable=True,
+ num_msgs=100, msg_size=10000, recover=True)
+
+ def test_large_msg_browse(self):
+ """Large message browse test"""
+ self.simple_limit("SimpleMaxCountLargeMsgBrowse", max_count=10,
max_size=10000000, browse=True, num_msgs=100,
+ msg_size=10000)
+
+ def test_large_msg_browse_recover(self):
+ """Large message browse test"""
+ self.simple_limit("SimpleMaxCountLargeMsgBrowseRecover", max_count=10,
max_size=10000000, browse=True,
+ num_msgs=100, msg_size=10000, recover=True)
+
+ def test_large_msg_durable_browse(self):
+ """Large durable message browse test"""
+ self.simple_limit("SimpleMaxCountLargeMsgDurableBrowse", max_count=10,
max_size=10000000, msg_durable=True,
+ browse=True, num_msgs=100, msg_size=10000)
+
+ def test_large_msg_durable_browse_recover(self):
+ """Large durable message browse test"""
+ self.simple_limit("SimpleMaxCountLargeMsgDurableBrowseRecover",
max_count=10, max_size=10000000,
+ msg_durable=True, browse=True, num_msgs=100, msg_size=10000,
recover=True)
+
+class SimpleMaxSizeTest(FlowToDisk):
+ """Flow-to-disk tests based on setting max_size"""
+
+ def test_base(self):
+ """Base test"""
+ self.simple_limit("SimpleMaxSize", max_size=100)
+
+ def test_recover(self):
+ """Recover test"""
+ self.simple_limit("SimpleMaxSizeRecover", max_size=100, recover=True)
+
+ def test_durable(self):
+ """Durable message test"""
+ self.simple_limit("SimpleMaxSizeDurable", max_size=100,
msg_durable=True)
+
+ def test_durable_recover(self):
+ """Durable message recover test"""
+ self.simple_limit("SimpleMaxSizeDurable", max_size=100,
msg_durable=True, recover=True)
+
+ def test_browse(self):
+ """Browse test"""
+ self.simple_limit("SimpleMaxSizeBrowse", max_size=100, browse=True)
+
+ def test_browse_recover(self):
+ """Browse before and after recover test"""
+ self.simple_limit("SimpleMaxSizeBrowseRecover", max_size=100,
browse=True, recover=True)
+
+ def test_durable_browse(self):
+ """Browse durable message test"""
+ self.simple_limit("SimpleMaxSizeDurableBrowse", max_size=100,
msg_durable=True, browse=True)
+
+ def test_durable_browse_recover(self):
+ """Browse durable messages before and after
recover"""
+ self.simple_limit("SimpleMaxSizeDurableBrowseRecover", max_size=100,
msg_durable=True, browse=True,
+ recover=True)
+
+ def test_large_msg(self):
+ """Large message test"""
+ self.simple_limit("SimpleMaxSizeLargeMsg", max_size=100000,
num_msgs=100, msg_size=10000)
+
+ def test_large_msg_recover(self):
+ """Large message test"""
+ self.simple_limit("SimpleMaxSizeLargeMsgRecover", max_size=100000,
num_msgs=100, msg_size=10000, recover=True)
+
+ def test_large_msg_durable(self):
+ """Large durable message test"""
+ self.simple_limit("SimpleMaxSizeLargeMsgDurable", max_size=100000,
msg_durable=True, num_msgs=100,
+ msg_size=10000)
+
+ def test_large_msg_durable_recover(self):
+ """Large durable message test"""
+ self.simple_limit("SimpleMaxSizeLargeMsgDurableRecover",
max_size=100000, msg_durable=True, num_msgs=100,
+ msg_size=10000, recover=True)
+
+ def test_large_msg_browse(self):
+ """Large message browse test"""
+ self.simple_limit("SimpleMaxSizeLargeMsgBrowse", max_size=100,
browse=True, num_msgs=100, msg_size=10000)
+
+ def test_large_msg_browse_recover(self):
+ """Large message browse test"""
+ self.simple_limit("SimpleMaxSizeLargeMsgBrowseRecover", max_size=100,
browse=True, num_msgs=100, msg_size=10000,
+ recover=True)
+
+ def test_large_msg_durable_browse(self):
+ """Large durable message browse test"""
+ self.simple_limit("SimpleMaxSizeLargeMsgDurableBrowse", max_size=100,
msg_durable=True, browse=True,
+ num_msgs=100, msg_size=10000)
+
+ def test_large_msg_durable_browse_recover(self):
+ """Large durable message browse test"""
+ self.simple_limit("SimpleMaxSizeLargeMsgDurableBrowseRecover",
max_size=100, msg_durable=True, browse=True,
+ num_msgs=100, msg_size=10000, recover=True)
+
+class SimpleMaxSizeCountTest(FlowToDisk):
+ """Flow-to-disk tests based on setting both max_count and max_size at
the same time"""
+
+ def test_base(self):
+ """Base test"""
+ self.simple_limit("MaxSizeMaxCount", max_count=10, max_size=1000)
+
+ def test_recover(self):
+ """Recover test"""
+ self.simple_limit("MaxSizeMaxCountRecover", max_count=10,
max_size=1000, recover=True)
+
+ def test_durable(self):
+ """Durable message test"""
+ self.simple_limit("MaxSizeMaxCountDurable", max_count=10,
max_size=1000, msg_size=250)
+
+ def test_durable_recover(self):
+ """Durable message recover test"""
+ self.simple_limit("MaxSizeMaxCountDurableRecover", max_count=10,
max_size=1000, msg_size=250, recover=True)
+
+ def test_browse(self):
+ """Browse test"""
+ self.simple_limit("MaxSizeMaxCountBrowse", max_count=10, max_size=1000,
browse=True)
+
+ def test_browse_recover(self):
+ """Browse before and after recover test"""
+ self.simple_limit("MaxSizeMaxCountBrowseRecover", max_count=10,
max_size=1000, browse=True, recover=True)
+
+ def test_durable_browse(self):
+ """Browse durable message test"""
+ self.simple_limit("MaxSizeMaxCountDurableBrowse", max_count=10,
max_size=1000, msg_size=250, browse=True)
+
+ def test_durable_browse_recover(self):
+ """Browse durable messages before and after
recover"""
+ self.simple_limit("MaxSizeMaxCountDurableBrowseRecover", max_count=10,
max_size=1000, msg_size=250, browse=True,
+ recover=True)
+
+# ======================================================================================================================
+
+class MultiQueueFlowToDisk(FlowToDisk):
+ """Tests for async store flow-to-disk involving multiple
queues"""
+
+ def _multi_queue_setup(self, queue_map, broker, exchange_name, txn_produce, txn_consume, policy, exclusive = False):
+ """Create the send session and receive sessions for multi-queue scenarios"""
+ connection = broker.connect()
+ snd_session = connection.session(transactional=txn_produce)
+ addr = self.snd_addr(exchange_name, topic_flag=True, exchage_type="fanout")
+ #print "snd_addr=\"%s\"" % addr
+ sndr = snd_session.sender(addr)
+ for queue_name, queue_properties in queue_map.iteritems():
+ if "durable" in queue_properties.keys():
+ durable = queue_properties["durable"]
+ else:
+ durable = False
+ max_count = None
+ if "max_count" in queue_properties.keys():
+ max_count = queue_properties["max_count"]
+ max_size = None
+ if "max_size" in queue_properties.keys():
+ max_size = queue_properties["max_size"]
+ rcv_session = connection.session(transactional=txn_consume)
+ addr = self.rcv_addr(exchange_name, auto_create=False, link_name=queue_name, durable=durable,
+ exclusive=exclusive, ftd_count=max_count, ftd_size=max_size, policy=policy)
+ #print "rcv_addr=\"%s\"" % addr
+ rcv_session.receiver(addr)
+ return snd_session, sndr
+
+ @staticmethod
+ def _make_policy_dict(src, marker, delim=";"):
+ """Create a dictionary of key/value strings from a formatted
string src of the form
+ '... marker key1=val1, key2=val2, ..., keyN=valN delimiter ...'
+ where the portion of interest starts at marker m until the following delimiter d
(default: ';')."""
+ pos = src.find(marker) + len(marker)
+ res = []
+ for index in src[pos:src.find(delim, pos)].split():
+ if "=" in index:
+ res.append(index.strip(",").split("="))
+ if len(res) > 0:
+ return dict(res)
+
+ @staticmethod
+ def _make_policy_val(src, marker, delim=";"):
+ """Return a string value from a formatted string of the form
'... marker val delimiter ...' where the value
+ lies between marker and delimiter d (default: ';')"""
+ pos = src.find(marker) + len(marker)
+ return src[pos:src.find(delim, pos)].strip()
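+
+ # A hypothetical example (an assumption, not taken from the broker source) of
+ # the policy-violation string these two helpers are written against:
+ #   'resource-limit-exceeded: Policy exceeded on "q1", policy: count: max=10,
+ #    current=10; size: max=1000, current=980; type=reject (...)'
+ # With that string s, _make_policy_val(s, "Policy exceeded on ", delim=",")
+ # yields '"q1"', and _make_policy_dict(s, "count: ") yields
+ # {'max': '10', 'current': '10'}.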
+
+ @staticmethod
+ def _check_error(error_str, fail_list=None):
+ """Check a policy exception string to ensure the failure occurred
on the expected queue and at the expected
+ count."""
+ if error_str.startswith("resource-limit-exceeded"):
+ fail_policy = MultiQueueFlowToDisk._make_policy_val(error_str, "type=", delim="(")
+ fail_queue_name = MultiQueueFlowToDisk._make_policy_val(error_str, "Policy exceeded on ", delim=",")
+ fail_count_dict = MultiQueueFlowToDisk._make_policy_dict(error_str, "count: ")
+ fail_size_dict = MultiQueueFlowToDisk._make_policy_dict(error_str, "size: ")
+ #print "+++", fail_policy, fail_queue_name, fail_count_dict, fail_size_dict
+ #print "===", fail_list
+ if fail_list == None:
+ return False # Not expected - no failure should have occurred
+ for fail in fail_list:
+ if fail_queue_name == fail["queue"]:
+ #print "<<<", fail
+ if fail_policy != fail["type"]:
+ return False
+ if (fail_count_dict != None and "count" in fail and \
+ int(fail_count_dict["current"]) != fail["count"]) \
+ or \
+ (fail_size_dict != None and "size" in fail and int(fail_size_dict["current"]) != fail["size"]):
+ return False
+ #print ">>> Failure expected"
+ return True
+ return False
+
+ @staticmethod
+ def _check_send_error(error, fail_list=None):
+ """Check that an error is a SendError which in turn contains a
qpid.ops.ExecutionException."""
+ if not isinstance(error.args[0], qpid.ops.ExecutionException):
+ return False
+ if "description" not in error.args[0].args():
+ return False
+ return MultiQueueFlowToDisk._check_error(error.args[0].args()["description"], fail_list)
+
+ @staticmethod
+ def _check_session_error(error, txn=False):
+ """Check that an error is a SessionError which in turn contains a
qpid.ops.ExecutionException."""
+ if not isinstance(error.args[0], qpid.ops.ExecutionException):
+ return False
+ if "description" not in error.args[0].args():
+ return False
+ if txn and error.args[0].args()["description"].startswith("internal-error: Commit failed"):
+ #print ">>> Txn commit failure: expected"
+ return True
+ return False
+
+ @staticmethod
+ def _is_queue_durable(queue_map, index):
+ """Return true if the indexed queue is durable (indexed by
queue_map.keys() or queue_map.values())"""
+ return "durable" in queue_map.values()[index] and
queue_map.values()[index]["durable"]
+
+ @staticmethod
+ def _expected_msg_loss(fail_list):
+ """Examine the fail_list for expected failures and return a tuple
containing the expected failure conditions"""
+ count_exp_loss = None
+ count_exp_loss_queues = None
+ size_exp_loss = None
+ size_exp_loss_queues = None
+ if fail_list != None:
+ for fail in fail_list:
+ if "count" in fail:
+ this_count = fail["count"]
+ if count_exp_loss == None:
+ count_exp_loss = this_count
+ count_exp_loss_queues = [fail["queue"]]
+ elif this_count < count_exp_loss:
+ count_exp_loss = this_count
+ count_exp_loss_queues = [fail["queue"]]
+ elif this_count == count_exp_loss:
+ count_exp_loss_queues.append(fail["queue"])
+ if "size" in fail:
+ this_size = fail["size"]
+ if size_exp_loss == None:
+ size_exp_loss = this_size
+ size_exp_loss_queues = [fail["queue"]]
+ elif this_size < size_exp_loss:
+ size_exp_loss = this_size
+ size_exp_loss_queues = [fail["queue"]]
+ elif this_size == size_exp_loss:
+ size_exp_loss_queues.append(fail["queue"])
+ return (count_exp_loss, count_exp_loss_queues, size_exp_loss, size_exp_loss_queues)
+
+ @staticmethod
+ def _expected_msg_ftd(queue_map):
+ max_count = None
+ max_size = None
+ for queue_props in queue_map.itervalues():
+ if "durable" in queue_props and queue_props["durable"]:
+ if "max_count" in queue_props and
queue_props["max_count"] != None and \
+ (max_count == None or queue_props["max_count"] <
max_count):
+ max_count = queue_props["max_count"]
+ if "max_size" in queue_props and
queue_props["max_size"] != None and \
+ (max_size == None or queue_props["max_size"] <
max_size):
+ max_size = queue_props["max_size"]
+ return (max_count, max_size)
+
+
+ def tx_multi_queue_limit(self, broker_base_name, queue_map, exchange_name, **kwargs):
+ """ Test a multi-queue case
+ queue_map = queue map where map is queue name (key) against queue args (value)
+ """
+ # Unpack args
+ msg_durable = kwargs.get("msg_durable", False)
+ num_msgs = kwargs.get("num_msgs", 15)
+ msg_size = kwargs.get("msg_size", 10)
+ txn_produce = kwargs.get("txn_produce", False)
+ txn_consume = kwargs.get("txn_consume", False)
+ browse = kwargs.get("browse", False)
+ policy = kwargs.get("policy", "flow_to_disk")
+ recover = kwargs.get("recover", False)
+ sync = kwargs.get("sync", False)
+ fail_list = kwargs.get("fail_list")
+
+ bname = self._broker_name(broker_base_name, txn_produce, txn_consume)
+ broker = self.broker(store_args(), name=bname, expect=EXPECT_EXIT_OK, log_level="debug+")
+ #print "+++ Started broker %s" % bname
+ snd_session, sndr = self._multi_queue_setup(queue_map, broker, exchange_name, txn_produce, txn_consume, policy)
+
+ # Find expected limits
+ count_exp_loss, count_exp_loss_queues, size_exp_loss, size_exp_loss_queues = self._expected_msg_loss(fail_list)
+ max_count, max_size = self._expected_msg_ftd(queue_map)
+
+ # Send messages
+ try:
+ msgs = []
+ pre_recover_ftd_msgs = [] # msgs released before a recover
+ post_recover_ftd_msgs = [] # msgs released after a recover
+ cum_msg_size = 0
+ target_queues = []
+ for index in range(0, num_msgs):
+ msg = Message(self.make_message(index, msg_size), durable=msg_durable, id=uuid4(),
+ correlation_id="msg-%04d"%index)
+ #print "Sending msg %s" % msg.id
+ sndr.send(msg, sync=sync)
+ if msg_size != None:
+ cum_msg_size += msg_size
+ if count_exp_loss != None and index >= count_exp_loss:
+ target_queues.extend(count_exp_loss_queues)
+ if size_exp_loss != None and cum_msg_size > size_exp_loss:
+ target_queues.extend(size_exp_loss_queues)
+ if (count_exp_loss == None or index < count_exp_loss) and \
+ (size_exp_loss == None or cum_msg_size <= size_exp_loss):
+ msgs.append(msg)
+ if (max_count != None and index >= max_count) or (max_size != None and cum_msg_size > max_size):
+ pre_recover_ftd_msgs.append(msg)
+ if not sync:
+ sndr.sync()
+ if txn_produce:
+ snd_session.commit()
+ except SendError, error:
+ if not self._check_send_error(error, fail_list):
+ raise
+ except SessionError, error:
+ msgs[:] = [] # Transaction failed, all messages lost
+ if not self._check_session_error(error, txn_produce):
+ raise
+
+ # Browse messages
+ if browse:
+ for index in range(0, len(queue_map)):
+ self.check_messages(broker, queue_map.keys()[index], msgs, browse=True)
+
+ if recover:
+ broker.terminate()
+ #print "--- Terminated broker %s" % bname
+ if msg_durable:
+ post_recover_ftd_msgs = pre_recover_ftd_msgs
+ else:
+ del msgs[:] # Transient messages will be discarded on recover
+ old_broker = broker # keep for log analysis
+ broker = self.broker(store_args(), name=bname, expect=EXPECT_EXIT_OK, log_level="debug+")
+ #print "+++ Restarted broker %s" % bname
+ # Browse messages
+ if browse:
+ for index in range(0, len(queue_map)):
+ empty = not self._is_queue_durable(queue_map, index)
+ self.check_messages(broker, queue_map.keys()[index], msgs, browse=True, emtpy_flag=empty)
+
+ # Consume messages
+ for index in range(0, len(queue_map)):
+ empty_chk = txn_produce or queue_map.keys()[index] in target_queues
+ empty = recover and not self._is_queue_durable(queue_map, index)
+ self.check_messages(broker, queue_map.keys()[index], msgs, transactional=txn_consume, empty=empty_chk,
+ emtpy_flag=empty)
+
+ broker.terminate()
+ #print "--- Stopped broker %s" % bname
+
+ # Check broker logs for released messages
+ if recover:
+ if txn_produce:
+ if msg_durable:
+ self.check_msg_release_on_commit(old_broker, pre_recover_ftd_msgs)
+ else:
+ self.check_msg_block_on_commit(old_broker, pre_recover_ftd_msgs)
+ else:
+ if msg_durable:
+ self.check_msg_release(old_broker, pre_recover_ftd_msgs)
+ else:
+ self.check_msg_block(old_broker, pre_recover_ftd_msgs)
+ self.check_msg_release_on_recover(broker, post_recover_ftd_msgs)
+ else:
+ if txn_produce:
+ if msg_durable:
+ self.check_msg_release_on_commit(broker, pre_recover_ftd_msgs)
+ else:
+ self.check_msg_block_on_commit(broker, pre_recover_ftd_msgs)
+ else:
+ if msg_durable:
+ self.check_msg_release(broker, pre_recover_ftd_msgs)
+ else:
+ self.check_msg_block(broker, pre_recover_ftd_msgs)
+
+# def multi_queue_limit(self, broker_name, queue_map, exchange_name, **kwargs):
+# """Adapter for adding transactions to test"""
+# # Cycle through the produce/consume block transaction combinations
+# for index in range(0, 4):
+# kwargs["txn_produce"] = index & 1 != 0 # Transactional
produce
+# kwargs["txn_consume"] = index & 2 != 0 # Transactional
consume
+# self._tx_multi_queue_limit(broker_name, queue_map, exchange_name, kwargs)
+
+ # --- Parameterized test methods ---
+
+ def no_limit(self, num, queue_durable=False, msg_durable=False, browse=False, recover=False, txn_produce=False,
+ txn_consume=False):
+ """No policy test"""
+ queue_map_1 = {"a%02d" % num : {"durable":queue_durable,
"max_count":None, "max_size": None},
+ "b%02d" % num : {"durable":queue_durable,
"max_count":None, "max_size": None} }
+ self.tx_multi_queue_limit("MultiQueue_NoLimit", queue_map_1,
exchange_name="Fanout_a%02d" % num,
+ msg_durable=msg_durable, browse=browse,
recover=recover, txn_produce=txn_produce,
+ txn_consume=txn_consume)
+
+ def max_count(self, num, queue_durable=False, msg_durable=False, browse=False, recover=False, txn_produce=False,
+ txn_consume=False):
+ """Count policy test"""
+ queue_map_2 = {"c%02d" % num : {"durable":queue_durable,
"max_count":None, "max_size": None},
+ "d%02d" % num : {"durable":queue_durable,
"max_count":10, "max_size": None} }
+ fail_list = None
+ if not queue_durable:
+ fail_list = [{"queue":"d%02d" % num,
"type":"reject", "count":10}]
+ self.tx_multi_queue_limit("MultiQueue_MaxCount", queue_map_2,
exchange_name="Fanout_b%02d" % num,
+ msg_durable=msg_durable, browse=browse,
recover=recover, fail_list=fail_list,
+ txn_produce=txn_produce, txn_consume=txn_consume)
+
+ def max_size(self, num, queue_durable=False, msg_durable=False, browse=False, recover=False, txn_produce=False,
+ txn_consume=False):
+ """Size policy test"""
+ queue_map_3 = {"e%02d" % num : {"durable":queue_durable,
"max_count":None, "max_size": None},
+ "f%02d" % num : {"durable":queue_durable,
"max_count":None, "max_size": 1000} }
+ fail_list = None
+ if not queue_durable:
+ fail_list = [{"queue":"f%02d" % num,
"type":"reject", "size":1000}]
+ self.tx_multi_queue_limit("MultiQueue_MaxSize", queue_map_3,
exchange_name="Fanout_c%02d" % num, msg_size=100,
+ msg_durable=msg_durable, browse=browse,
recover=recover, fail_list=fail_list,
+ txn_produce=txn_produce, txn_consume=txn_consume)
+
+ def dual_max_count(self, num, queue_durable=False, msg_durable=False, browse=False, recover=False,
+ txn_produce=False, txn_consume=False):
+ """Multiple count policy test"""
+ queue_map_4 = {"g%02d" % num : {"durable":queue_durable,
"max_count":10, "max_size": None},
+ "h%02d" % num : {"durable":queue_durable,
"max_count":8, "max_size": None} }
+ fail_list = None
+ if not queue_durable:
+ fail_list = [{"queue":"h%02d" % num,
"type":"reject", "count":8}]
+ self.tx_multi_queue_limit("MultiQueue_DualMaxCount", queue_map_4,
exchange_name="Fanout_d%02d" % num,
+ msg_durable=msg_durable, browse=browse,
recover=recover, fail_list=fail_list,
+ txn_produce=txn_produce, txn_consume=txn_consume)
+
+ def dual_max_size(self, num, queue_durable=False, msg_durable=False, browse=False, recover=False, txn_produce=False,
+ txn_consume=False):
+ """Multiple size policy test"""
+ queue_map_5 = {"i%02d" % num : {"durable":queue_durable,
"max_count":None, "max_size": 1000},
+ "j%02d" % num : {"durable":queue_durable,
"max_count":None, "max_size": 800} }
+ fail_list = None
+ if not queue_durable:
+ fail_list = [{"queue":"j%02d" % num,
"type":"reject", "size":800}]
+ self.tx_multi_queue_limit("MultiQueue_DualMaxSize", queue_map_5,
exchange_name="Fanout_e%02d" % num,
+ msg_size=100, msg_durable=msg_durable, browse=browse,
recover=recover,
+ fail_list=fail_list, txn_produce=txn_produce,
txn_consume=txn_consume)
+
+ def mixed_limit_1(self, num, queue_durable=False, msg_durable=False, browse=False, recover=False, txn_produce=False,
+ txn_consume=False):
+ """Both count and size policies active with the same queue having equal probabilities of triggering the
+ policy"""
+ queue_map_6 = {"k%02d" % num : {"durable":queue_durable,
"max_count":None, "max_size": None},
+ "l%02d" % num : {"durable":queue_durable,
"max_count":10, "max_size": None},
+ "m%02d" % num : {"durable":queue_durable,
"max_count":None, "max_size": 1000},
+ "n%02d" % num : {"durable":queue_durable,
"max_count":8, "max_size": 800} }
+ fail_list = None
+ if not queue_durable:
+ fail_list = [{"queue":"n%02d" % num,
"type":"reject", "count":8, "size":800}]
+ self.tx_multi_queue_limit("MultiQueue_MixedLimit", queue_map_6,
exchange_name="Fanout_f%02d" % num,
+ msg_size=100, msg_durable=msg_durable, browse=browse,
recover=recover,
+ fail_list=fail_list, txn_produce=txn_produce,
txn_consume=txn_consume)
+
+ def mixed_limit_2(self, num, queue_durable=False, msg_durable=False, browse=False, recover=False, txn_produce=False,
+ txn_consume=False):
+ """Both count and size policies active with different queues having equal probabilities of triggering the
+ policy"""
+ queue_map_7 = {"o%02d" % num : {"durable":queue_durable,
"max_count":None, "max_size": None},
+ "p%02d" % num : {"durable":queue_durable,
"max_count":10, "max_size": None},
+ "q%02d" % num : {"durable":queue_durable,
"max_count":None, "max_size": 800},
+ "r%02d" % num : {"durable":queue_durable,
"max_count":8, "max_size": 1000} }
+ fail_list = None
+ if not queue_durable:
+ fail_list = [{"queue":"q%02d" % num,
"type":"reject", "size":800},
+ {"queue":"r%02d" % num,
"type":"reject", "count":8,}]
+ self.tx_multi_queue_limit("MultiQueue_MixedLimit", queue_map_7,
exchange_name="Fanout_g%02d" % num,
+ msg_size=100, msg_durable=msg_durable, browse=browse,
recover=recover,
+ fail_list=fail_list, txn_produce=txn_produce,
txn_consume=txn_consume)
+
+ # --- Non-parameterized test methods - these will be run by Python test framework ---
+
+ _num = None
+ _queue_durable = False
+ _msg_durable = False
+ _browse = False
+ _recover = False
+ _txn_produce = False
+ _txn_consume = False
+
+ def test_no_limit(self):
+ """No policy test (non-parameterized)"""
+ self.no_limit(self._num, queue_durable=self._queue_durable, msg_durable=self._msg_durable, browse=self._browse,
+ recover=self._recover, txn_produce=self._txn_produce, txn_consume=self._txn_consume)
+
+ def test_max_count(self):
+ """Count policy test (non-parameterized)"""
+ self.max_count(self._num, queue_durable=self._queue_durable, msg_durable=self._msg_durable, browse=self._browse,
+ recover=self._recover, txn_produce=self._txn_produce, txn_consume=self._txn_consume)
+
+ def test_max_size(self):
+ """Size policy test (non-parameterized)"""
+ self.max_size(self._num, queue_durable=self._queue_durable, msg_durable=self._msg_durable, browse=self._browse,
+ recover=self._recover, txn_produce=self._txn_produce, txn_consume=self._txn_consume)
+
+ def test_dual_max_count(self):
+ """Multiple count policy test
(non-parameterized)"""
+ self.dual_max_count(self._num, queue_durable=self._queue_durable,
msg_durable=self._msg_durable,
+ browse=self._browse, recover=self._recover,
txn_produce=self._txn_produce,
+ txn_consume=self._txn_consume)
+
+ def test_dual_max_size(self):
+ """Multiple size policy test
(non-parameterized)"""
+ self.dual_max_size(self._num, queue_durable=self._queue_durable,
msg_durable=self._msg_durable,
+ browse=self._browse, recover=self._recover,
txn_produce=self._txn_produce,
+ txn_consume=self._txn_consume)
+
+ def test_mixed_limit_1(self):
+ """Both count and size polices active with the same queue having
equal probabilities of triggering the
+ policy (non-parameterized)"""
+ self.mixed_limit_1(self._num, queue_durable=self._queue_durable,
msg_durable=self._msg_durable,
+ browse=self._browse, recover=self._recover,
txn_produce=self._txn_produce,
+ txn_consume=self._txn_consume)
+
+ def test_mixed_limit_2(self):
+ """Both count and size polices active with different queues having
equal probabilities of triggering the
+ policy (non-parameterized)"""
+ self.mixed_limit_2(self._num, queue_durable=self._queue_durable,
msg_durable=self._msg_durable,
+ browse=self._browse, recover=self._recover,
txn_produce=self._txn_produce,
+ txn_consume=self._txn_consume)
+
+# --- Tests ---
+
+class MultiQueueTest(MultiQueueFlowToDisk):
+ _num = 1
+
+class MultiDurableQueueTest(MultiQueueFlowToDisk):
+ _num = 2
+ _queue_durable = True
+
+class MultiQueueDurableMsgTest(MultiQueueFlowToDisk):
+ _num = 3
+ _msg_durable = True
+
+class MultiDurableQueueDurableMsgTest(MultiQueueFlowToDisk):
+ _num = 4
+ _queue_durable = True
+ _msg_durable = True
+
+class MultiQueueBrowseTest(MultiQueueFlowToDisk):
+ _num = 5
+ _browse = True
+
+class MultiDurableQueueBrowseTest(MultiQueueFlowToDisk):
+ _num = 6
+ _queue_durable = True
+ _browse = True
+
+class MultiQueueDurableMsgBrowseTest(MultiQueueFlowToDisk):
+ _num = 7
+ _msg_durable = True
+ _browse = True
+
+class MultiDurableQueueDurableMsgBrowseTest(MultiQueueFlowToDisk):
+ _num = 8
+ _queue_durable = True
+ _msg_durable = True
+ _browse = True
+
+class MultiQueueRecoverTest(MultiQueueFlowToDisk):
+ _num = 9
+ _recover = True
+
+class MultiDurableQueueRecoverTest(MultiQueueFlowToDisk):
+ _num = 10
+ _queue_durable = True
+ _recover = True
+
+class MultiQueueDurableMsgRecoverTest(MultiQueueFlowToDisk):
+ _num = 11
+ _msg_durable = True
+ _recover = True
+
+class MultiDurableQueueDurableMsgRecoverTest(MultiQueueFlowToDisk):
+ _num = 12
+ _queue_durable = True
+ _msg_durable = True
+ _recover = True
+
+class MultiQueueBrowseRecoverTest(MultiQueueFlowToDisk):
+ _num = 13
+ _browse = True
+ _recover = True
+
+class MultiDurableQueueBrowseRecoverTest(MultiQueueFlowToDisk):
+ _num = 14
+ _queue_durable = True
+ _browse = True
+ _recover = True
+
+class MultiQueueDurableMsgBrowseRecoverTest(MultiQueueFlowToDisk):
+ _num = 15
+ _msg_durable = True
+ _browse = True
+ _recover = True
+
+class MultiDurableQueueDurableMsgBrowseRecoverTest(MultiQueueFlowToDisk):
+ _num = 16
+ _queue_durable = True
+ _msg_durable = True
+ _browse = True
+ _recover = True
+
+class MultiQueueTxPTest(MultiQueueFlowToDisk):
+ _num = 17
+ _txn_produce = True
+
+class MultiDurableQueueTxPTest(MultiQueueFlowToDisk):
+ _num = 18
+ _queue_durable = True
+ _txn_produce = True
+
+class MultiQueueDurableMsgTxPTest(MultiQueueFlowToDisk):
+ _num = 19
+ _msg_durable = True
+ _txn_produce = True
+
+class MultiDurableQueueDurableMsgTxPTest(MultiQueueFlowToDisk):
+ _num = 20
+ _queue_durable = True
+ _msg_durable = True
+ _txn_produce = True
+
+class MultiQueueBrowseTxPTest(MultiQueueFlowToDisk):
+ _num = 21
+ _browse = True
+ _txn_produce = True
+
+class MultiDurableQueueBrowseTxPTest(MultiQueueFlowToDisk):
+ _num = 22
+ _queue_durable = True
+ _browse = True
+ _txn_produce = True
+
+class MultiQueueDurableMsgBrowseTxPTest(MultiQueueFlowToDisk):
+ _num = 23
+ _msg_durable = True
+ _browse = True
+ _txn_produce = True
+
+class MultiDurableQueueDurableMsgBrowseTxPTest(MultiQueueFlowToDisk):
+ _num = 24
+ _queue_durable = True
+ _msg_durable = True
+ _browse = True
+ _txn_produce = True
+
+class MultiQueueRecoverTxPTest(MultiQueueFlowToDisk):
+ _num = 25
+ _recover = True
+ _txn_produce = True
+
+class MultiDurableQueueRecoverTxPTest(MultiQueueFlowToDisk):
+ _num = 26
+ _queue_durable = True
+ _recover = True
+ _txn_produce = True
+
+class MultiQueueDurableMsgRecoverTxPTest(MultiQueueFlowToDisk):
+ _num = 27
+ _msg_durable = True
+ _recover = True
+ _txn_produce = True
+
+class MultiDurableQueueDurableMsgRecoverTxPTest(MultiQueueFlowToDisk):
+ _num = 28
+ _queue_durable = True
+ _msg_durable = True
+ _recover = True
+ _txn_produce = True
+
+class MultiQueueBrowseRecoverTxPTest(MultiQueueFlowToDisk):
+ _num = 29
+ _browse = True
+ _recover = True
+ _txn_produce = True
+
+class MultiDurableQueueBrowseRecoverTxPTest(MultiQueueFlowToDisk):
+ _num = 30
+ _queue_durable = True
+ _browse = True
+ _recover = True
+ _txn_produce = True
+
+class MultiQueueDurableMsgBrowseRecoverTxPTest(MultiQueueFlowToDisk):
+ _num = 31
+ _msg_durable = True
+ _browse = True
+ _recover = True
+ _txn_produce = True
+
+class MultiDurableQueueDurableMsgBrowseRecoverTxPTest(MultiQueueFlowToDisk):
+ _num = 32
+ _queue_durable = True
+ _msg_durable = True
+ _browse = True
+ _recover = True
+ _txn_produce = True
+
+class MultiQueueTxCTest(MultiQueueFlowToDisk):
+ _num = 33
+ _txn_consume = True
+
+class MultiDurableQueueTxCTest(MultiQueueFlowToDisk):
+ _num = 34
+ _queue_durable = True
+ _txn_consume = True
+
+class MultiQueueDurableMsgTxCTest(MultiQueueFlowToDisk):
+ _num = 35
+ _msg_durable = True
+ _txn_consume = True
+
+class MultiDurableQueueDurableMsgTxCTest(MultiQueueFlowToDisk):
+ _num = 36
+ _queue_durable = True
+ _msg_durable = True
+ _txn_consume = True
+
+class MultiQueueBrowseTxCTest(MultiQueueFlowToDisk):
+ _num = 37
+ _browse = True
+ _txn_consume = True
+
+class MultiDurableQueueBrowseTxCTest(MultiQueueFlowToDisk):
+ _num = 38
+ _queue_durable = True
+ _browse = True
+ _txn_consume = True
+
+class MultiQueueDurableMsgBrowseTxCTest(MultiQueueFlowToDisk):
+ _num = 39
+ _msg_durable = True
+ _browse = True
+ _txn_consume = True
+
+class MultiDurableQueueDurableMsgBrowseTxCTest(MultiQueueFlowToDisk):
+ _num = 40
+ _queue_durable = True
+ _msg_durable = True
+ _browse = True
+ _txn_consume = True
+
+class MultiQueueRecoverTxCTest(MultiQueueFlowToDisk):
+ _num = 41
+ _recover = True
+ _txn_consume = True
+
+class MultiDurableQueueRecoverTxCTest(MultiQueueFlowToDisk):
+ _num = 42
+ _queue_durable = True
+ _recover = True
+ _txn_consume = True
+
+class MultiQueueDurableMsgRecoverTxCTest(MultiQueueFlowToDisk):
+ _num = 43
+ _msg_durable = True
+ _recover = True
+ _txn_consume = True
+
+class MultiDurableQueueDurableMsgRecoverTxCTest(MultiQueueFlowToDisk):
+ _num = 44
+ _queue_durable = True
+ _msg_durable = True
+ _recover = True
+ _txn_consume = True
+
+class MultiQueueBrowseRecoverTxCTest(MultiQueueFlowToDisk):
+ _num = 45
+ _browse = True
+ _recover = True
+ _txn_consume = True
+
+class MultiDurableQueueBrowseRecoverTxCTest(MultiQueueFlowToDisk):
+ _num = 46
+ _queue_durable = True
+ _browse = True
+ _recover = True
+ _txn_consume = True
+
+class MultiQueueDurableMsgBrowseRecoverTxCTest(MultiQueueFlowToDisk):
+ _num = 47
+ _msg_durable = True
+ _browse = True
+ _recover = True
+ _txn_consume = True
+
+class MultiDurableQueueDurableMsgBrowseRecoverTxCTest(MultiQueueFlowToDisk):
+ _num = 48
+ _queue_durable = True
+ _msg_durable = True
+ _browse = True
+ _recover = True
+ _txn_consume = True
+
+class MultiQueueTxPTxCTest(MultiQueueFlowToDisk):
+ _num = 49
+ _txn_produce = True
+ _txn_consume = True
+
+class MultiDurableQueueTxPTxCTest(MultiQueueFlowToDisk):
+ _num = 50
+ _queue_durable = True
+ _txn_produce = True
+ _txn_consume = True
+
+class MultiQueueDurableMsgTxPTxCTest(MultiQueueFlowToDisk):
+ _num = 51
+ _msg_durable = True
+ _txn_produce = True
+ _txn_consume = True
+
+class MultiDurableQueueDurableMsgTxPTxCTest(MultiQueueFlowToDisk):
+ _num = 52
+ _queue_durable = True
+ _msg_durable = True
+ _txn_produce = True
+ _txn_consume = True
+
+class MultiQueueBrowseTxPTxCTest(MultiQueueFlowToDisk):
+ _num = 53
+ _browse = True
+ _txn_produce = True
+ _txn_consume = True
+
+class MultiDurableQueueBrowseTxPTxCTest(MultiQueueFlowToDisk):
+ _num = 54
+ _queue_durable = True
+ _browse = True
+ _txn_produce = True
+ _txn_consume = True
+
+class MultiQueueDurableMsgBrowseTxPTxCTest(MultiQueueFlowToDisk):
+ _num = 55
+ _msg_durable = True
+ _browse = True
+ _txn_produce = True
+ _txn_consume = True
+
+class MultiDurableQueueDurableMsgBrowseTxPTxCTest(MultiQueueFlowToDisk):
+ _num = 56
+ _queue_durable = True
+ _msg_durable = True
+ _browse = True
+ _txn_produce = True
+ _txn_consume = True
+
+class MultiQueueRecoverTxPTxCTest(MultiQueueFlowToDisk):
+ _num = 57
+ _recover = True
+ _txn_produce = True
+ _txn_consume = True
+
+class MultiDurableQueueRecoverTxPTxCTest(MultiQueueFlowToDisk):
+ _num = 58
+ _queue_durable = True
+ _recover = True
+ _txn_produce = True
+ _txn_consume = True
+
+class MultiQueueDurableMsgRecoverTxPTxCTest(MultiQueueFlowToDisk):
+ _num = 59
+ _msg_durable = True
+ _recover = True
+ _txn_produce = True
+ _txn_consume = True
+
+class MultiDurableQueueDurableMsgRecoverTxPTxCTest(MultiQueueFlowToDisk):
+ _num = 60
+ _queue_durable = True
+ _msg_durable = True
+ _recover = True
+ _txn_produce = True
+ _txn_consume = True
+
+class MultiQueueBrowseRecoverTxPTxCTest(MultiQueueFlowToDisk):
+ _num = 61
+ _browse = True
+ _recover = True
+ _txn_produce = True
+ _txn_consume = True
+
+class MultiDurableQueueBrowseRecoverTxPTxCTest(MultiQueueFlowToDisk):
+ _num = 62
+ _queue_durable = True
+ _browse = True
+ _recover = True
+ _txn_produce = True
+ _txn_consume = True
+
+class MultiQueueDurableMsgBrowseRecoverTxPTxCTest(MultiQueueFlowToDisk):
+ _num = 63
+ _msg_durable = True
+ _browse = True
+ _recover = True
+ _txn_produce = True
+ _txn_consume = True
+
+class MultiDurableQueueDurableMsgBrowseRecoverTxPTxCTest(MultiQueueFlowToDisk):
+ _num = 64
+ _queue_durable = True
+ _msg_durable = True
+ _browse = True
+ _recover = True
+ _txn_produce = True
+ _txn_consume = True
+
+ # --- Long and randomized tests ---
+
+# def test_12_Randomized(self):
+# """Randomized flow-to-disk tests"""
+# seed = long(1000.0 * time.time())
+# print "seed=0x%x" % seed
+# random.seed(seed)
+# for index in range(0, 10):
+# self.randomLimit(index)
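
The 64 numbered classes above enumerate every combination of six boolean flags
(_queue_durable, _msg_durable, _browse, _recover, _txn_produce, _txn_consume)
on MultiQueueFlowToDisk. As a sketch only (not part of this commit), the same
suite could be generated programmatically; the class-name scheme below is
hypothetical, and the explicit descriptive names are kept in the real code
because the run scripts select tests by name-glob:

    import itertools

    # Flag order mirrors the numbering above: _queue_durable varies fastest,
    # _txn_consume slowest, so _num 1..64 lines up with the hand-written set.
    # MultiQueueFlowToDisk is assumed to be the base class defined above.
    _FLAGS = ("_txn_consume", "_txn_produce", "_recover", "_browse",
              "_msg_durable", "_queue_durable")

    for num, bits in enumerate(itertools.product((False, True), repeat=6), 1):
        attrs = dict(zip(_FLAGS, bits))
        attrs["_num"] = num
        name = "MultiQueueComboTest%02d" % num  # hypothetical name scheme
        globals()[name] = type(name, (MultiQueueFlowToDisk,), attrs)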
Added: store/trunk/cpp/tests/new_python_tests/store_test.py
===================================================================
--- store/trunk/cpp/tests/new_python_tests/store_test.py (rev 0)
+++ store/trunk/cpp/tests/new_python_tests/store_test.py 2010-04-13 17:30:22 UTC (rev 3905)
@@ -0,0 +1,407 @@
+"""
+Copyright (c) 2008 Red Hat, Inc.
+
+This file is part of the Qpid async store library msgstore.so.
+
+This library is free software; you can redistribute it and/or
+modify it under the terms of the GNU Lesser General Public
+License as published by the Free Software Foundation; either
+version 2.1 of the License, or (at your option) any later version.
+
+This library is distributed in the hope that it will be useful,
+but WITHOUT ANY WARRANTY; without even the implied warranty of
+MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+Lesser General Public License for more details.
+
+You should have received a copy of the GNU Lesser General Public
+License along with this library; if not, write to the Free Software
+Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301
+ USA
+
+The GNU Lesser General Public License is available in the file COPYING.
+"""
+
+import re
+from qpid.brokertest import BrokerTest
+from qpid.messaging import Empty
+from qmf.console import Session
+
+
+def store_args():
+ """Return the broker args necessary to load the async
store"""
+ assert BrokerTest.store_lib
+ return ["--load-module", BrokerTest.store_lib]
+
+class Qmf:
+ """
+ QMF functions not yet available in the new QMF API. Remove this and replace with new API when it becomes available.
+ """
+ def __init__(self, broker):
+ self.__session = Session()
+ self.__broker = self.__session.addBroker("amqp://localhost:%d"%broker.port())
+
+ def add_exchange(self, exchange_name, exchange_type, alt_exchange_name=None, passive=False, durable=False,
+ arguments = None):
+ """Add a new exchange"""
+ amqp_session = self.__broker.getAmqpSession()
+ if arguments == None:
+ arguments = {}
+ if alt_exchange_name:
+ amqp_session.exchange_declare(exchange=exchange_name, type=exchange_type,
+ alternate_exchange=alt_exchange_name, passive=passive, durable=durable,
+ arguments=arguments)
+ else:
+ amqp_session.exchange_declare(exchange=exchange_name, type=exchange_type, passive=passive, durable=durable,
+ arguments=arguments)
+
+ def add_queue(self, queue_name, alt_exchange_name=None, passive=False, durable=False, arguments = None):
+ """Add a new queue"""
+ amqp_session = self.__broker.getAmqpSession()
+ if arguments == None:
+ arguments = {}
+ if alt_exchange_name:
+ amqp_session.queue_declare(queue_name, alternate_exchange=alt_exchange_name, passive=passive,
+ durable=durable, arguments=arguments)
+ else:
+ amqp_session.queue_declare(queue_name, passive=passive, durable=durable, arguments=arguments)
+
+ def delete_queue(self, queue_name):
+ """Delete an existing queue"""
+ amqp_session = self.__broker.getAmqpSession()
+ amqp_session.queue_delete(queue_name)
+
+ def _query(self, name, _class, package, alt_exchange_name=None):
+ """Qmf query function which can optionally look for the presence
of an alternate exchange name"""
+ try:
+ obj_list = self.__session.getObjects(_class=_class, _package=package)
+ found = False
+ for obj in obj_list:
+ if obj.name == name:
+ found = True
+ if alt_exchange_name != None:
+ alt_exch_list = self.__session.getObjects(_objectId=obj.altExchange)
+ if len(alt_exch_list) == 0 or alt_exch_list[0].name != alt_exchange_name:
+ return False
+ break
+ return found
+ except Exception:
+ return False
+
+
+ def query_exchange(self, exchange_name, alt_exchange_name=None):
+ """Test for the presence of an exchange, and optionally whether it
has an alternate exchange set to a known
+ value."""
+ return self._query(exchange_name, "exchange", "org.apache.qpid.broker", alt_exchange_name)
+
+ def query_queue(self, queue_name, alt_exchange_name=None):
+ """Test for the presence of a queue, and optionally whether it has an alternate exchange set to a known
+ value."""
+ return self._query(queue_name, "queue", "org.apache.qpid.broker", alt_exchange_name)
+
+ def queue_message_count(self, queue_name):
+ """Query the number of messages on a queue"""
+ queue_list = self.__session.getObjects(_class="queue", _name=queue_name)
+ if len(queue_list):
+ return queue_list[0].msgDepth
+
+ def queue_empty(self, queue_name):
+ """Check if a queue is empty (has no messages
waiting)"""
+ return self.queue_message_count(queue_name) == 0
+
+
+class StoreTest(BrokerTest):
+ """
+ This subclass of BrokerTest adds some convenience test/check functions
+ """
+
+ def _chk_empty(self, queue, receiver):
+ """Check if a queue is empty (has no more
messages)"""
+ try:
+ msg = receiver.fetch(timeout=0)
+ self.assert_(False, "Queue \"%s\" not empty: found message:
%s" % (queue, msg))
+ except Empty:
+ pass
+
+ @staticmethod
+ def make_message(msg_count, msg_size):
+ """Make message content. Format: 'abcdef....' followed by
'msg-NNNN', where NNNN is the message count"""
+ msg = "msg-%04d" % msg_count
+ msg_len = len(msg)
+ buff = ""
+ if msg_size != None and msg_size > msg_len:
+ for index in range(0, msg_size - msg_len):
+ if index == msg_size - msg_len - 1:
+ buff += "-"
+ else:
+ buff += chr(ord('a') + (index % 26))
+ return buff + msg
+
+ # Functions for formatting address strings
+
+ @staticmethod
+ def _fmt_csv(string_list, list_braces = None):
+ """Format a list using comma-separation. Braces are optionally
added."""
+ if len(string_list) == 0:
+ return ""
+ first = True
+ str_ = ""
+ if list_braces != None:
+ str_ += list_braces[0]
+ for string in string_list:
+ if string != None:
+ if first:
+ first = False
+ else:
+ str_ += ", "
+ str_ += string
+ if list_braces != None:
+ str_ += list_braces[1]
+ return str_
+
+ def _fmt_map(self, string_list):
+ """Format a map {l1, l2, l3, ...} from a string list. Each item in
the list must be a formatted map
+ element('key:val')."""
+ return self._fmt_csv(string_list, list_braces="{}")
+
+ def _fmt_list(self, string_list):
+ """Format a list [l1, l2, l3, ...] from a string
list."""
+ return self._fmt_csv(string_list, list_braces="[]")
+
+ def addr_fmt(self, node_name, **kwargs):
+ """Generic AMQP to new address formatter. Takes common (but not
all) AMQP options and formats an address
+ string."""
+ # Get keyword args
+ node_subject = kwargs.get("node_subject")
+ create_policy = kwargs.get("create_policy")
+ delete_policy = kwargs.get("delete_policy")
+ assert_policy = kwargs.get("assert_policy")
+ mode = kwargs.get("mode")
+ link = kwargs.get("link", False)
+ link_name = kwargs.get("link_name")
+ node_type = kwargs.get("node_type")
+ durable = kwargs.get("durable", False)
+ link_reliability = kwargs.get("link_reliability")
+ x_declare_list = kwargs.get("x_declare_list", [])
+ x_bindings_list = kwargs.get("x_bindings_list", [])
+ x_subscribe_list = kwargs.get("x_subscribe_list", [])
+
+ node_flag = not link and (node_type != None or durable or len(x_declare_list) > 0 or len(x_bindings_list) > 0)
+ link_flag = link and (link_name != None or durable or link_reliability != None or len(x_declare_list) > 0 or
+ len(x_bindings_list) > 0 or len(x_subscribe_list) > 0)
+ assert not (node_flag and link_flag)
+
+ opt_str_list = []
+ if create_policy != None:
+ opt_str_list.append("create: %s" % create_policy)
+ if delete_policy != None:
+ opt_str_list.append("delete: %s" % delete_policy)
+ if assert_policy != None:
+ opt_str_list.append("assert: %s" % assert_policy)
+ if mode != None:
+ opt_str_list.append("mode: %s" % mode)
+ if node_flag or link_flag:
+ node_str_list = []
+ if link_name != None:
+ node_str_list.append("name: \"%s\"" % link_name)
+ if node_type != None:
+ node_str_list.append("type: %s" % node_type)
+ if durable:
+ node_str_list.append("durable: True")
+ if link_reliability != None:
+ node_str_list.append("reliability: %s" % link_reliability)
+ if len(x_declare_list) > 0:
+ node_str_list.append("x-declare: %s" %
self._fmt_map(x_declare_list))
+ if len(x_bindings_list) > 0:
+ node_str_list.append("x-bindings: %s" %
self._fmt_list(x_bindings_list))
+ if len(x_subscribe_list) > 0:
+ node_str_list.append("x-subscribe: %s" %
self._fmt_map(x_subscribe_list))
+ if node_flag:
+ opt_str_list.append("node: %s" % self._fmt_map(node_str_list))
+ else:
+ opt_str_list.append("link: %s" % self._fmt_map(node_str_list))
+ addr_str = node_name
+ if node_subject != None:
+ addr_str += "/%s" % node_subject
+ if len(opt_str_list) > 0:
+ addr_str += "; %s" % self._fmt_map(opt_str_list)
+ return addr_str
+
+ def snd_addr(self, node_name, **kwargs):
+ """ Create a send (node) address"""
+ # Get keyword args
+ topic = kwargs.get("topic")
+ topic_flag = kwargs.get("topic_flag", False)
+ auto_create = kwargs.get("auto_create", True)
+ auto_delete = kwargs.get("auto_delete", False)
+ durable = kwargs.get("durable", False)
+ exclusive = kwargs.get("exclusive", False)
+ ftd_count = kwargs.get("ftd_count")
+ ftd_size = kwargs.get("ftd_size")
+ policy = kwargs.get("policy", "flow-to-disk")
+ exchange_type = kwargs.get("exchange_type")
+
+ create_policy = None
+ if auto_create:
+ create_policy = "always"
+ delete_policy = None
+ if auto_delete:
+ delete_policy = "always"
+ node_type = None
+ if topic != None or topic_flag:
+ node_type = "topic"
+ x_declare_list = ["\"exclusive\": %s" % exclusive]
+ if ftd_count != None or ftd_size != None:
+ queue_policy = ["\'qpid.policy_type\': %s" % policy]
+ if ftd_count:
+ queue_policy.append("\'qpid.max_count\': %d" %
ftd_count)
+ if ftd_size:
+ queue_policy.append("\'qpid.max_size\': %d" %
ftd_size)
+ x_declare_list.append("arguments: %s" %
self._fmt_map(queue_policy))
+ if exchange_type != None:
+ x_declare_list.append("type: %s" % exchange_type)
+
+ return self.addr_fmt(node_name, node_subject=topic, create_policy=create_policy, delete_policy=delete_policy,
+ node_type=node_type, durable=durable, x_declare_list=x_declare_list)
+
+ def rcv_addr(self, node_name, **kwargs):
+ """ Create a receive (link) address"""
+ # Get keyword args
+ auto_create = kwargs.get("auto_create", True)
+ auto_delete = kwargs.get("auto_delete", False)
+ link_name = kwargs.get("link_name")
+ durable = kwargs.get("durable", False)
+ browse = kwargs.get("browse", False)
+ exclusive = kwargs.get("exclusive", False)
+ binding_list = kwargs.get("binding_list", [])
+ ftd_count = kwargs.get("ftd_count")
+ ftd_size = kwargs.get("ftd_size")
+ policy = kwargs.get("policy", "flow-to-disk")
+
+ create_policy = None
+ if auto_create:
+ create_policy = "always"
+ delete_policy = None
+ if auto_delete:
+ delete_policy = "always"
+ mode = None
+ if browse:
+ mode = "browse"
+ x_declare_list = ["\"exclusive\": %s" % exclusive]
+ if ftd_count != None or ftd_size != None:
+ queue_policy = ["\'qpid.policy_type\': %s" % policy]
+ if ftd_count:
+ queue_policy.append("\'qpid.max_count\': %d" %
ftd_count)
+ if ftd_size:
+ queue_policy.append("\'qpid.max_size\': %d" %
ftd_size)
+ x_declare_list.append("arguments: %s" %
self._fmt_map(queue_policy))
+ x_bindings_list = []
+ for binding in binding_list:
+ x_bindings_list.append("{exchange: %s, key: %s}" % binding)
+ return self.addr_fmt(node_name, create_policy=create_policy, delete_policy=delete_policy, mode=mode, link=True,
+ link_name=link_name, durable=durable, x_declare_list=x_declare_list,
+ x_bindings_list=x_bindings_list)
+
+ def check_message(self, broker, queue, exp_msg, transactional=False, empty=False, ack=True, browse=False):
+ """Check that a message is on a queue by dequeuing it and comparing it to the expected message"""
+ return self.check_messages(broker, queue, [exp_msg], transactional, empty, ack, browse)
+
+ def check_messages(self, broker, queue, exp_msg_list, transactional=False, empty=False, ack=True, browse=False,
+ empty_flag=False):
+ """Check that messages are on a queue by dequeuing them and comparing them to the expected messages"""
+ if empty_flag:
+ num_msgs = 0
+ else:
+ num_msgs = len(exp_msg_list)
+ ssn = broker.connect().session(transactional=transactional)
+ rcvr = ssn.receiver(self.rcv_addr(queue, browse=browse), capacity=num_msgs)
+ if num_msgs > 0:
+ try:
+ received_msg_list = [rcvr.fetch(timeout=0) for i in range(num_msgs)]
+ except Empty:
+ self.assert_(False, "Queue \"%s\" is empty, unable to
retrieve expected message %d." % (queue, i))
+ for i in range(0, len(recieved_msg_list)):
+ self.assertEqual(recieved_msg_list[i].content, exp_msg_list[i].content)
+ self.assertEqual(recieved_msg_list[i].correlation_id,
exp_msg_list[i].correlation_id)
+ if empty:
+ self._chk_empty(queue, rcvr)
+ if ack:
+ ssn.acknowledge()
+ if transactional:
+ ssn.commit()
+ ssn.connection.close()
+ else:
+ if transactional:
+ ssn.commit()
+ return ssn
+
+ # Functions for finding strings in the broker log file (or other files)
+
+ @staticmethod
+ def _read_file(file_name):
+ """Returns the content of file named file_name as a
string"""
+ file_handle = file(file_name)
+ try:
+ return file_handle.read()
+ finally:
+ file_handle.close()
+
+ def _get_hits(self, broker, search):
+ """Find all occurrences of the search in the broker log
(eliminating possible duplicates from msgs on multiple
+ queues)"""
+ # TODO: Use sets when RHEL-4 is no longer supported
+ hits = []
+ for hit in search.findall(self._read_file(broker.log)):
+ if hit not in hits:
+ hits.append(hit)
+ return hits
+
+ def _reconcile_hits(self, broker, ftd_msgs, release_hits):
+ """Remove entries from list release_hits if they match the message id in ftd_msgs. Check for remaining
+ release_hits."""
+ for msg in ftd_msgs:
+ found = False
+ for hit in release_hits:
+ if str(msg.id) in hit:
+ release_hits.remove(hit)
+ #print "Found %s in %s" % (msg.id, broker.log)
+ found = True
+ break
+ if not found:
+ self.assert_(False, "Unable to locate released message %s in log
%s" % (msg.id, broker.log))
+ if len(release_hits) > 0:
+ err = "Messages were unexpectedly released in log %s:\n" %
broker.log
+ for hit in release_hits:
+ err += " %s\n" % hit
+ self.assert_(False, err)
+
+ def check_msg_release(self, broker, ftd_msgs):
+ """Check for 'Content released' messages in broker log for messages in ftd_msgs"""
+ hits = self._get_hits(broker, re.compile("debug Message id=\"[0-9a-f-]{36}\"; pid=0x[0-9a-f]+: "
+ "Content released$", re.MULTILINE))
+ self._reconcile_hits(broker, ftd_msgs, hits)
+
+ def check_msg_release_on_commit(self, broker, ftd_msgs):
+ """Check for 'Content released on commit' messages in broker log for messages in ftd_msgs"""
+ hits = self._get_hits(broker, re.compile("debug Message id=\"[0-9a-f-]{36}\"; pid=0x[0-9a-f]+: "
+ "Content released on commit$", re.MULTILINE))
+ self._reconcile_hits(broker, ftd_msgs, hits)
+
+ def check_msg_release_on_recover(self, broker, ftd_msgs):
+ """Check for 'Content released after recovery' messages in broker log for messages in ftd_msgs"""
+ hits = self._get_hits(broker, re.compile("debug Message id=\"[0-9a-f-]{36}\"; pid=0x[0-9a-f]+: "
+ "Content released after recovery$", re.MULTILINE))
+ self._reconcile_hits(broker, ftd_msgs, hits)
+
+ def check_msg_block(self, broker, ftd_msgs):
+ """Check for 'Content release blocked' messages in broker log for messages in ftd_msgs"""
+ hits = self._get_hits(broker, re.compile("debug Message id=\"[0-9a-f-]{36}\"; pid=0x[0-9a-f]+: "
+ "Content release blocked$", re.MULTILINE))
+ self._reconcile_hits(broker, ftd_msgs, hits)
+
+ def check_msg_block_on_commit(self, broker, ftd_msgs):
+ """Check for 'Content release blocked on commit' messages in broker log for messages in ftd_msgs"""
+ hits = self._get_hits(broker, re.compile("debug Message id=\"[0-9a-f-]{36}\"; pid=0x[0-9a-f]+: "
+ "Content release blocked on commit$", re.MULTILINE))
+ self._reconcile_hits(broker, ftd_msgs, hits)
+
+
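
The snd_addr() and rcv_addr() helpers above compose strings in the broker's new
address syntax via addr_fmt(). As an illustration traced by hand from the code
above (not captured broker output; "my-queue" and the limits are made-up
values), a browse-mode receive address with flow-to-disk limits of 10 messages
and 4096 bytes comes out as:

    # 'self' is assumed to be a StoreTest instance.
    addr = self.rcv_addr("my-queue", browse=True, ftd_count=10, ftd_size=4096)
    # addr is now:
    #   my-queue; {create: always, mode: browse, link: {x-declare:
    #   {"exclusive": False, arguments: {'qpid.policy_type': flow-to-disk,
    #   'qpid.max_count': 10, 'qpid.max_size': 4096}}}}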
Modified: store/trunk/cpp/tests/run_long_python_tests
===================================================================
--- store/trunk/cpp/tests/run_long_python_tests 2010-04-12 21:41:54 UTC (rev 3904)
+++ store/trunk/cpp/tests/run_long_python_tests 2010-04-13 17:30:22 UTC (rev 3905)
@@ -21,4 +21,4 @@
#
# The GNU Lesser General Public License is available in the file COPYING.
-./run_old_python_tests LONG_TEST
+./run_new_python_tests LONG_TEST
Modified: store/trunk/cpp/tests/run_new_python_tests
===================================================================
--- store/trunk/cpp/tests/run_new_python_tests 2010-04-12 21:41:54 UTC (rev 3904)
+++ store/trunk/cpp/tests/run_new_python_tests 2010-04-13 17:30:22 UTC (rev 3905)
@@ -38,8 +38,27 @@
echo "Running Python tests..."
-PYTHON_TESTS=${PYTHON_TESTS:-$*}
+case $1 in
+ SHORT_TEST)
+ DEFAULT_PYTHON_TESTS="*.flow_to_disk.SimpleMaxSizeCountTest.test_durable_browse_recover *.flow_to_disk.MultiDurableQueueDurableMsgBrowseRecoverTxPTxCTest.test_mixed_limit_2";;
+ LONG_TEST)
+ DEFAULT_PYTHON_TESTS=;;
+ *)
+ DEFAULT_PYTHON_TESTS="*.flow_to_disk.SimpleMaxSizeCountTest.* *.flow_to_disk.MultiDurableQueueDurableMsg*.test_mixed_limit_1";;
+esac
+#if test -z $1; then
+# DEFAULT_PYTHON_TESTS="*.flow_to_disk.SimpleMaxSizeCountTest.* *.flow_to_disk.MultiDurableQueueDurableMsg*.test_mixed_limit_1"
+#else
+# if test x$1 == xSHORT_TEST; then
+# DEFAULT_PYTHON_TESTS="*.flow_to_disk.SimpleMaxSizeCountTest.test_durable_browse_recover *.flow_to_disk.MultiDurableQueueDurableMsgBrowseRecoverTxPTxCTest.test_mixed_limit_2"
+# else
+# DEFAULT_PYTHON_TESTS=$*
+# fi
+#fi
+
+PYTHON_TESTS=${PYTHON_TESTS:-${DEFAULT_PYTHON_TESTS}}
+
OUTDIR=new_python_tests.tmp
rm -rf $OUTDIR
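
The DEFAULT_PYTHON_TESTS values above are shell-style name globs handed to
qpid-python-test. As an illustration of what the regular-run default selects
(assuming glob matching equivalent to Python's fnmatch; the fully qualified
test names below are illustrative, not runner output):

    from fnmatch import fnmatchcase

    pattern = "*.flow_to_disk.MultiDurableQueueDurableMsg*.test_mixed_limit_1"
    names = [
        "new_python_tests.flow_to_disk.MultiDurableQueueDurableMsgTxPTest.test_mixed_limit_1",
        "new_python_tests.flow_to_disk.MultiQueueDurableMsgTxPTest.test_mixed_limit_1",
    ]
    for name in names:
        print "%s matches: %s" % (name, fnmatchcase(name, pattern))
    # Only the first (durable queue + durable message) name matches.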
Deleted: store/trunk/cpp/tests/run_old_python_tests
===================================================================
--- store/trunk/cpp/tests/run_old_python_tests 2010-04-12 21:41:54 UTC (rev 3904)
+++ store/trunk/cpp/tests/run_old_python_tests 2010-04-13 17:30:22 UTC (rev 3905)
@@ -1,96 +0,0 @@
-#!/bin/bash
-#
-# Copyright (c) 2008, 2009 Red Hat, Inc.
-#
-# This file is part of the Qpid async store library msgstore.so.
-#
-# This library is free software; you can redistribute it and/or
-# modify it under the terms of the GNU Lesser General Public
-# License as published by the Free Software Foundation; either
-# version 2.1 of the License, or (at your option) any later version.
-#
-# This library is distributed in the hope that it will be useful,
-# but WITHOUT ANY WARRANTY; without even the implied warranty of
-# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
-# Lesser General Public License for more details.
-#
-# You should have received a copy of the GNU Lesser General Public
-# License along with this library; if not, write to the Free Software
-# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301
-# USA
-#
-# The GNU Lesser General Public License is available in the file COPYING.
-
-if test x$1 == x"LONG_TEST"; then
- echo "Running long tests..."
- LONG_TEST=1
-fi
-
-if test -z ${QPID_DIR} ; then
- cat <<EOF
-
- =========== WARNING: PYTHON TESTS DISABLED ==============
-
- QPID_DIR not set.
-
- ===========================================================
-
-EOF
- exit
-fi
-
-QPID_PYTHON_DIR=${QPID_DIR}/python
-export PYTHONPATH=${QPID_PYTHON_DIR}:${abs_srcdir}
-
-if python -c "import qpid" ; then
- PYTHON_TESTS=
- FAILING_PYTHON_TESTS=${abs_srcdir}/failing_python_tests.txt
-else
- cat <<EOF
-
- =========== WARNING: PYTHON TESTS DISABLED ==============
-
- Unable to load python qpid module - skipping python tests.
-
- QPID_DIR=${QPID_DIR}"
- PYTHONPATH=${PYTHONPATH}"
-
- ===========================================================
-
-EOF
- exit
-fi
-
-STORE_DIR=${TMP_DATA_DIR}/python
-
-#Make sure temp dir exists if this is the first to use it
-if test -d ${STORE_DIR} ; then
- rm -rf ${STORE_DIR}
-fi
-mkdir -p ${STORE_DIR}
-
-if test -z ${QPIDD} ; then
- export QPIDD=${QPID_BLD}/src/qpidd
-fi
-
-trap stop_broker INT TERM QUIT
-
-start_broker() {
- ${QPIDD} --daemon --port 0 --no-module-dir --load-module=${STORE_LIB} --data-dir=${STORE_DIR} --auth=no --log-enable info+ --log-to-file ${STORE_DIR}/broker.python-test.log > qpidd.port
- LOCAL_PORT=`cat qpidd.port`
- echo "run_old_python_tests: Started qpidd on port ${LOCAL_PORT}"
-}
-
-stop_broker() {
- echo "run_old_python_tests: Stopping broker on port ${LOCAL_PORT}"
- ${QPIDD} -q --port ${LOCAL_PORT}
-}
-
-fail=0
-
-# Run all python tests
-start_broker
-$QPID_PYTHON_DIR/qpid-python-test -m old_python_tests -b localhost:$LOCAL_PORT -I ${FAILING_PYTHON_TESTS} ${PYTHON_TESTS} || { echo "FAIL: old_python_tests"; fail=1; }
-stop_broker || fail=1
-
-exit ${fail}
Added: store/trunk/cpp/tests/run_short_python_tests
===================================================================
--- store/trunk/cpp/tests/run_short_python_tests (rev 0)
+++ store/trunk/cpp/tests/run_short_python_tests 2010-04-13 17:30:22 UTC (rev 3905)
@@ -0,0 +1,24 @@
+#!/bin/bash
+#
+# Copyright (c) 2008, 2009 Red Hat, Inc.
+#
+# This file is part of the Qpid async store library msgstore.so.
+#
+# This library is free software; you can redistribute it and/or
+# modify it under the terms of the GNU Lesser General Public
+# License as published by the Free Software Foundation; either
+# version 2.1 of the License, or (at your option) any later version.
+#
+# This library is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+# Lesser General Public License for more details.
+#
+# You should have received a copy of the GNU Lesser General Public
+# License along with this library; if not, write to the Free Software
+# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301
+# USA
+#
+# The GNU Lesser General Public License is available in the file COPYING.
+
+./run_new_python_tests SHORT_TEST
Property changes on: store/trunk/cpp/tests/run_short_python_tests
___________________________________________________________________
Name: svn:executable
+ *