Author: kpvdr
Date: 2010-04-14 10:41:04 -0400 (Wed, 14 Apr 2010)
New Revision: 3910
Added:
store/trunk/cpp/tests/python_tests/
store/trunk/cpp/tests/python_tests/__init__.py
store/trunk/cpp/tests/python_tests/client_persistence.py
store/trunk/cpp/tests/python_tests/flow_to_disk.py
store/trunk/cpp/tests/python_tests/store_test.py
store/trunk/cpp/tests/run_python_tests
Removed:
store/trunk/cpp/tests/new_python_tests/
store/trunk/cpp/tests/python_tests/__init__.py
store/trunk/cpp/tests/python_tests/client_persistence.py
store/trunk/cpp/tests/run_new_python_tests
Modified:
store/trunk/cpp/configure.ac
store/trunk/cpp/tests/Makefile.am
store/trunk/cpp/tests/run_long_python_tests
store/trunk/cpp/tests/run_short_python_tests
Log:
Fixed exception handling errors introduced in long tests by r.933560. Renamed python
directories to more appropriate names. Applied patch from Ján
Sáreník for handling newest version of BDB in configure. Adjusted
content of make-short, make and make-long tests. Renamed package from "rhm" to
"msg-store".
Modified: store/trunk/cpp/configure.ac
===================================================================
--- store/trunk/cpp/configure.ac 2010-04-14 14:22:16 UTC (rev 3909)
+++ store/trunk/cpp/configure.ac 2010-04-14 14:41:04 UTC (rev 3910)
@@ -21,7 +21,7 @@
dnl
dnl Process this file with autoconf to produce a configure script.
-AC_INIT([rhm], [0.6], [rhemrg-users-list(a)redhat.com])
+AC_INIT([msg-store], [0.6], [rhemrg-users-list(a)redhat.com])
AC_CONFIG_AUX_DIR([build-aux])
AM_INIT_AUTOMAKE([dist-bzip2])
@@ -55,7 +55,7 @@
# Warnings: Enable as many as possible, keep the code clean. Please
# do not disable warnings or remove -Werror without discussing on
-# rhm-users list.
+# rhemrg-users-list list.
#
# The following warnings are deliberately omitted, they warn on valid code.
# -Wunreachable-code -Wpadded -Winline
@@ -177,17 +177,17 @@
AC_SUBST([LIB_DLOPEN])
LIBS=$gl_saved_libs
-# Require libdb_cxx (any version between 4.2 and 4.7), for the library, and for
db_cxx.h.
+# Require libdb_cxx (any version between 4.2 and 4.8), for the library, and for
db_cxx.h.
db4_devel_fail=0
AC_CHECK_HEADER([db_cxx.h], ,[db4_devel_fail=1])
test $db4_devel_fail == 1 && \
AC_MSG_ERROR([db4-devel package missing. Please ensure both db4 and db4-devel are
installed. (hint: "yum install db4-devel" should do it...)])
gl_saved_libs=$LIBS
-AC_SEARCH_LIBS([__db_open], [db_cxx-4.7 db_cxx-4.6 db_cxx-4.5 db_cxx-4.4 db_cxx-4.3
db_cxx-4.2],
+AC_SEARCH_LIBS([__db_open], [db_cxx-4.8 db_cxx-4.7 db_cxx-4.6 db_cxx-4.5 db_cxx-4.4
db_cxx-4.3 db_cxx-4.2],
[test "$ac_cv_search___db_open" = "none required" ||
LIB_BERKELEY_DB=$ac_cv_search___db_open],
- AC_MSG_ERROR([Couldn't find required library in range db_cxx-4.2 through
db_cxx-4.6]))
+ AC_MSG_ERROR([Couldn't find required library in range db_cxx-4.2 through
db_cxx-4.8]))
AC_SUBST([LIB_BERKELEY_DB])
LIBS=$gl_saved_libs
Modified: store/trunk/cpp/tests/Makefile.am
===================================================================
--- store/trunk/cpp/tests/Makefile.am 2010-04-14 14:22:16 UTC (rev 3909)
+++ store/trunk/cpp/tests/Makefile.am 2010-04-14 14:41:04 UTC (rev 3910)
@@ -39,12 +39,17 @@
OrderingTest \
TransactionalTest \
TwoPhaseCommitTest \
- run_new_python_tests \
+ run_python_tests \
system_test.sh \
clean.sh
LONG_TESTS = \
+ SimpleTest \
+ OrderingTest \
+ TransactionalTest \
+ TwoPhaseCommitTest \
run_long_python_tests \
+ system_test.sh \
clean.sh
SHORT_TESTS = \
@@ -77,10 +82,10 @@
EXTRA_DIST = \
clean.sh \
failing_python_tests.txt \
- new_python_tests \
+ python_tests \
persistence.py \
run_long_python_tests \
- run_new_python_tests \
+ run_python_tests \
run_short_python_tests \
run_test \
start_broker \
@@ -104,10 +109,10 @@
# Note: Auto-recursion is not supported for custom targets, so add a ${MAKE} -C for each
dir in the SUBDIRS list above.
check-long: all
$(MAKE) -C jrnl check-long
+ $(MAKE) check TESTS="$(LONG_TESTS)" SUBDIRS=.
if DO_CLUSTER_TESTS
$(MAKE) -C cluster check-long
endif
- $(MAKE) check TESTS="$(LONG_TESTS)" SUBDIRS=.
check-short: all
$(MAKE) check TESTS="$(SHORT_TESTS)" SUBDIRS=.
Copied: store/trunk/cpp/tests/python_tests (from rev 3904,
store/trunk/cpp/tests/new_python_tests)
Deleted: store/trunk/cpp/tests/python_tests/__init__.py
===================================================================
--- store/trunk/cpp/tests/new_python_tests/__init__.py 2010-04-12 21:41:54 UTC (rev 3904)
+++ store/trunk/cpp/tests/python_tests/__init__.py 2010-04-14 14:41:04 UTC (rev 3910)
@@ -1,24 +0,0 @@
-# Do not delete - marks this directory as a python package.
-
-# Copyright (c) 2008, 2009 Red Hat, Inc.
-#
-# This file is part of the Qpid async store library msgstore.so.
-#
-# This library is free software; you can redistribute it and/or
-# modify it under the terms of the GNU Lesser General Public
-# License as published by the Free Software Foundation; either
-# version 2.1 of the License, or (at your option) any later version.
-#
-# This library is distributed in the hope that it will be useful,
-# but WITHOUT ANY WARRANTY; without even the implied warranty of
-# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
-# Lesser General Public License for more details.
-#
-# You should have received a copy of the GNU Lesser General Public
-# License along with this library; if not, write to the Free Software
-# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301
-# USA
-#
-# The GNU Lesser General Public License is available in the file COPYING.
-
-from client_persistence import *
Copied: store/trunk/cpp/tests/python_tests/__init__.py (from rev 3905,
store/trunk/cpp/tests/new_python_tests/__init__.py)
===================================================================
--- store/trunk/cpp/tests/python_tests/__init__.py (rev 0)
+++ store/trunk/cpp/tests/python_tests/__init__.py 2010-04-14 14:41:04 UTC (rev 3910)
@@ -0,0 +1,25 @@
+# Do not delete - marks this directory as a python package.
+
+# Copyright (c) 2008, 2009 Red Hat, Inc.
+#
+# This file is part of the Qpid async store library msgstore.so.
+#
+# This library is free software; you can redistribute it and/or
+# modify it under the terms of the GNU Lesser General Public
+# License as published by the Free Software Foundation; either
+# version 2.1 of the License, or (at your option) any later version.
+#
+# This library is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+# Lesser General Public License for more details.
+#
+# You should have received a copy of the GNU Lesser General Public
+# License along with this library; if not, write to the Free Software
+# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301
+# USA
+#
+# The GNU Lesser General Public License is available in the file COPYING.
+
+from client_persistence import *
+from flow_to_disk import *
Deleted: store/trunk/cpp/tests/python_tests/client_persistence.py
===================================================================
--- store/trunk/cpp/tests/new_python_tests/client_persistence.py 2010-04-12 21:41:54 UTC
(rev 3904)
+++ store/trunk/cpp/tests/python_tests/client_persistence.py 2010-04-14 14:41:04 UTC (rev
3910)
@@ -1,240 +0,0 @@
-# Copyright (c) 2008 Red Hat, Inc.
-#
-# This file is part of the Qpid async store library msgstore.so.
-#
-# This library is free software; you can redistribute it and/or
-# modify it under the terms of the GNU Lesser General Public
-# License as published by the Free Software Foundation; either
-# version 2.1 of the License, or (at your option) any later version.
-#
-# This library is distributed in the hope that it will be useful,
-# but WITHOUT ANY WARRANTY; without even the implied warranty of
-# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
-# Lesser General Public License for more details.
-#
-# You should have received a copy of the GNU Lesser General Public
-# License along with this library; if not, write to the Free Software
-# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301
-# USA
-#
-# The GNU Lesser General Public License is available in the file COPYING.
-
-from qpid.brokertest import *
-from qpid.messaging import Empty, Message
-from qmf.console import Session
-
-def storeArgs():
- assert BrokerTest.store_lib
- return ["--load-module", BrokerTest.store_lib]
-
-class Qmf:
- """
- QMF functions not yet available in the new QMF API. Remove this and replace with new
API when it becomes available.
- """
- def __init__(self, broker):
- self.__session = Session()
- self.__broker =
self.__session.addBroker("amqp://localhost:%d"%broker.port())
-
- def addExchange(self, exchangeName, exchangeType, altExchangeName=None,
passive=False, durable=False, arguments = {}):
- """Add a new exchange"""
- amqpSession = self.__broker.getAmqpSession()
- if altExchangeName:
- amqpSession.exchange_declare(exchange=exchangeName, type=exchangeType,
alternate_exchange=altExchangeName, passive=passive, durable=durable,
arguments=arguments)
- else:
- amqpSession.exchange_declare(exchange=exchangeName, type=exchangeType,
passive=passive, durable=durable, arguments=arguments)
-
- def addQueue(self, queueName, altExchangeName=None, passive=False, durable=False,
arguments = {}):
- """Add a new queue"""
- amqpSession = self.__broker.getAmqpSession()
- if altExchangeName:
- amqpSession = amqpSession.queue_declare(queueName,
alternate_exchange=altExchangeName, passive=passive, durable=durable,
arguments=arguments)
- else:
- amqpSession = amqpSession.queue_declare(queueName, passive=passive,
durable=durable, arguments=arguments)
-
- def __query(self, name, _class, package, altExchangeName=None):
- try:
- objList = self.__session.getObjects(_class=_class, _package=package)
- found = False
- for o in objList:
- if o.name == name:
- found = True
- if altExchangeName != None:
- altExchList = self.__session.getObjects(_objectId=o.altExchange)
- if len(altExchList) == 0 or altExchList[0].name !=
altExchangeName: return False
- break
- return found
- except: return False
-
-
- def queryExchange(self, exchangeName, altExchangeName=None):
- """Test for the presence of an exchange, and optionally whether it
has an alternate exchange set to a known value."""
- return self.__query(exchangeName, "exchange",
"org.apache.qpid.broker", altExchangeName)
-
- def queryQueue(self, queueName, altExchangeName=None):
- """Test for the presence of an exchange, and optionally whether it
has an alternate exchange set to a known value."""
- return self.__query(queueName, "queue",
"org.apache.qpid.broker", altExchangeName)
-
- def queueMsgCount(self, queueName):
- queueList = self.__session.getObjects(_class="queue", _name=queueName)
- if len(queueList):
- return queueList[0].msgDepth
-
- def queueEmpty(self, queueName):
- return self.queueMsgCount(queueName) == 0
-
-
-class StoreTest(BrokerTest):
- """
- This subclass of BrokerTest adds some convenience test/check functions
- """
-
- def __chkEmpty(self, queue, receiver):
- try:
- msg = receiver.fetch(timeout=0)
- self.assert_(False, "Queue \"%s\" not empty: found message:
%s" % (queue, msg))
- except Empty: pass
-
- def chkMsg(self, broker, queue, msgChk, empty=False, ack=True):
- return self.chkMsgs(broker, queue, [msgChk], empty, ack)
-
- def chkMsgs(self, broker, queue, msgChkList, empty=False, ack=True):
- s = broker.connect().session()
- rcvr = s.receiver(queue + "; {create:always}",
capacity=len(msgChkList))
- try: rmList = [rcvr.fetch(timeout=0) for i in range(len(msgChkList))]
- except Empty: self.assert_(False, "Queue \"%s\" is empty, unable
to retrieve expected message %d." % (queue, i))
- for i in range(0, len(rmList)):
- self.assertEqual(rmList[i].content, msgChkList[i].content)
- self.assertEqual(rmList[i].correlation_id, msgChkList[i].correlation_id)
- if empty: self.__chkEmpty(queue, rcvr)
- if ack:
- s.acknowledge()
- s.connection.close()
- else:
- return s
-
-
-class ExchangeQueueTests(StoreTest):
- """
- Simple tests of the broker exchange and queue types
- """
-
- def testDirectExchange(self):
- """Test Direct exchange."""
- broker = self.broker(storeArgs(), name="testDirectExchange",
expect=EXPECT_EXIT_OK)
- m1 = Message("A_Message1", durable=True,
correlation_id="Msg0001")
- m2 = Message("B_Message1", durable=True,
correlation_id="Msg0002")
- broker.send_message("a", m1)
- broker.send_message("b", m2)
- broker.terminate()
-
- broker = self.broker(storeArgs(), name="testDirectExchange")
- self.chkMsg(broker, "a", m1, True)
- self.chkMsg(broker, "b", m2, True)
-
- def testTopicExchange(self):
- """Test Topic exchange."""
- broker = self.broker(storeArgs(), name="testTopicExchange",
expect=EXPECT_EXIT_OK)
- s = broker.connect().session()
- snd1 = s.sender("abc/key1; {create:always, node-properties:{durable:True,
type:topic}}")
- snd2 = s.sender("abc/key2; {create:always, node-properties:{durable:True,
type:topic}}")
- s.receiver("a; {create:always, node-properties:{durable:True,
x-properties:{bindings:['abc/key1']}}}")
- s.receiver("b; {create:always, node-properties:{durable:True,
x-properties:{bindings:['abc/key1']}}}")
- s.receiver("c; {create:always, node-properties:{durable:True,
x-properties:{bindings:['abc/key1']}}}")
- m1 = Message("Message1", durable=True,
correlation_id="Msg0003")
- snd1.send(m1)
- m2 = Message("Message2", durable=True,
correlation_id="Msg0004")
- snd2.send(m2)
- s.connection.close()
- broker.terminate()
-
- broker = self.broker(storeArgs(), name="testTopicExchange")
- self.chkMsg(broker, "a", m1, True)
- self.chkMsg(broker, "b", m1, True)
- self.chkMsg(broker, "c", m1, True)
-
-
- def testLVQ(self):
- """Test LVQ."""
- broker = self.broker(storeArgs(), name="testLVQ",
expect=EXPECT_EXIT_OK)
- ma1 = Message("A1", durable=True, correlation_id="Msg0005",
properties={"qpid.LVQ_key":"A"})
- ma2 = Message("A2", durable=True, correlation_id="Msg0006",
properties={"qpid.LVQ_key":"A"})
- mb1 = Message("B1", durable=True, correlation_id="Msg0007",
properties={"qpid.LVQ_key":"B"})
- mb2 = Message("B2", durable=True, correlation_id="Msg0008",
properties={"qpid.LVQ_key":"B"})
- mb3 = Message("B3", durable=True, correlation_id="Msg0009",
properties={"qpid.LVQ_key":"B"})
- mc1 = Message("C1", durable=True, correlation_id="Msg0010",
properties={"qpid.LVQ_key":"C"})
- broker.send_messages("lvq-test", [mb1, ma1, ma2, mb2, mb3, mc1],
xprops="\"qpid.last_value_queue\":True")
- broker.terminate()
-
- broker = self.broker(storeArgs(), name="testLVQ",
expect=EXPECT_EXIT_OK)
- s = self.chkMsgs(broker, "lvq-test", [ma2, mb3, mc1], empty=True,
ack=False)
- # Add more messages while subscriber is active (no replacement):
- ma3 = Message("A3", durable=True, correlation_id="Msg0011",
properties={"qpid.LVQ_key":"A"})
- ma4 = Message("A4", durable=True, correlation_id="Msg0012",
properties={"qpid.LVQ_key":"A"})
- mc2 = Message("C2", durable=True, correlation_id="Msg0013",
properties={"qpid.LVQ_key":"C"})
- mc3 = Message("C3", durable=True, correlation_id="Msg0014",
properties={"qpid.LVQ_key":"C"})
- mc4 = Message("C4", durable=True, correlation_id="Msg0015",
properties={"qpid.LVQ_key":"C"})
- broker.send_messages("lvq-test", [mc2, mc3, ma3, ma4, mc4],
xprops="\"qpid.last_value_queue\":True", session=s)
- s.acknowledge()
- s.connection.close()
- broker.terminate()
-
- broker = self.broker(storeArgs(), name="testLVQ")
- self.chkMsgs(broker, "lvq-test", [mc4, ma4], True)
-
-
-class AlternateExchagePropertyTests(StoreTest):
- """
- Test the persistence of the Alternate Exchange property for exchanges and queues.
- """
-
- def testExchange(self):
- """Exchange alternate exchange property persistence
test"""
- broker = self.broker(storeArgs(), name="testExchangeBroker",
expect=EXPECT_EXIT_OK)
- qmf = Qmf(broker)
- qmf.addExchange("altExch", "direct", durable=True) # Serves
as alternate exchange instance
- qmf.addExchange("testExch", "direct", durable=True,
altExchangeName="altExch")
- broker.terminate()
-
- broker = self.broker(storeArgs(), name="testExchangeBroker")
- qmf = Qmf(broker)
- try: qmf.addExchange("altExch", "direct", passive=True)
- except Exception, e: self.fail("Alternate exchange (\"altExch\")
instance not recovered: %s" % e)
- try: qmf.addExchange("testExch", "direct", passive=True)
- except Exception, e: self.fail("Test exchange (\"testExch\")
instance not recovered: %s" % e)
- self.assertTrue(qmf.queryExchange("testExch", altExchangeName =
"altExch"), "Alternate exchange property not found or is incorrect on
exchange \"testExch\".")
-
- def testQueue(self):
- """Queue alternate exchange property persistexchangeNamece
test"""
- broker = self.broker(storeArgs(), name="testQueueBroker",
expect=EXPECT_EXIT_OK)
- qmf = Qmf(broker)
- qmf.addExchange("altExch", "direct", durable=True) # Serves
as alternate exchange instance
- qmf.addQueue("testQueue", durable=True,
altExchangeName="altExch")
- broker.terminate()
-
- broker = self.broker(storeArgs(), name="testQueueBroker")
- qmf = Qmf(broker)
- try: qmf.addExchange("altExch", "direct", passive=True)
- except Exception, e: self.fail("Alternate exchange (\"altExch\")
instance not recovered: %s" % e)
- try: qmf.addQueue("testQueue", passive=True)
- except Exception, e: self.fail("Test queue (\"testQueue\")
instance not recovered: %s" % e)
- self.assertTrue(qmf.queryQueue("testQueue", altExchangeName =
"altExch"), "Alternate exchange property not found or is incorrect on queue
\"testQueue\".")
-
-
-class RedeliveredTests(StoreTest):
- """
- Test the behavior of the redelivered flag in the context of persistence
- """
-
- def testBrokerRecovery(self):
- """Test that the redelivered flag is set on messages after
recovery of broker"""
- broker = self.broker(storeArgs(), name="testAfterRecover",
expect=EXPECT_EXIT_OK)
- mc = "xyz"*100
- m = Message(mc, durable=True)
- broker.send_message("testQueue", m)
- broker.terminate()
-
- broker = self.broker(storeArgs(), name="testAfterRecover")
- rm = broker.get_message("testQueue")
- self.assertEqual(mc, rm.content)
- self.assertTrue(rm.redelivered)
-
Copied: store/trunk/cpp/tests/python_tests/client_persistence.py (from rev 3905,
store/trunk/cpp/tests/new_python_tests/client_persistence.py)
===================================================================
--- store/trunk/cpp/tests/python_tests/client_persistence.py (rev
0)
+++ store/trunk/cpp/tests/python_tests/client_persistence.py 2010-04-14 14:41:04 UTC (rev
3910)
@@ -0,0 +1,186 @@
+"""
+Copyright (c) 2008 Red Hat, Inc.
+
+This file is part of the Qpid async store library msgstore.so.
+
+This library is free software; you can redistribute it and/or
+modify it under the terms of the GNU Lesser General Public
+License as published by the Free Software Foundation; either
+version 2.1 of the License, or (at your option) any later version.
+
+This library is distributed in the hope that it will be useful,
+but WITHOUT ANY WARRANTY; without even the implied warranty of
+MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+Lesser General Public License for more details.
+
+You should have received a copy of the GNU Lesser General Public
+License along with this library; if not, write to the Free Software
+Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301
+USA
+
+The GNU Lesser General Public License is available in the file COPYING.
+"""
+
+from qpid.brokertest import EXPECT_EXIT_OK
+from store_test import StoreTest, Qmf, store_args
+from qpid.messaging import Message
+
+
+class ExchangeQueueTests(StoreTest):
+ """
+ Simple tests of the broker exchange and queue types
+ """
+
+ def test_direct_exchange(self):
+ """Test Direct exchange."""
+ broker = self.broker(store_args(), name="testDirectExchange",
expect=EXPECT_EXIT_OK)
+ msg1 = Message("A_Message1", durable=True,
correlation_id="Msg0001")
+ msg2 = Message("B_Message1", durable=True,
correlation_id="Msg0002")
+ broker.send_message("a", msg1)
+ broker.send_message("b", msg2)
+ broker.terminate()
+
+ broker = self.broker(store_args(), name="testDirectExchange")
+ self.check_message(broker, "a", msg1, True)
+ self.check_message(broker, "b", msg2, True)
+
+ def test_topic_exchange(self):
+ """Test Topic exchange."""
+ broker = self.broker(store_args(), name="testTopicExchange",
expect=EXPECT_EXIT_OK)
+ ssn = broker.connect().session()
+ snd1 = ssn.sender("abc/key1; {create:always, node:{type:topic,
durable:True}}")
+ snd2 = ssn.sender("abc/key2; {create:always, node:{type:topic,
durable:True}}")
+ ssn.receiver("a; {create:always, link:{durable:True,
x-bindings:[{exchange:abc, key:key1}]}}")
+ ssn.receiver("b; {create:always, link:{durable:True,
x-bindings:[{exchange:abc, key:key1}]}}")
+ ssn.receiver("c; {create:always, link:{durable:True,
x-bindings:[{exchange:abc, key:key1}, "
+ "{exchange:abc, key: key2}]}}")
+ ssn.receiver("d; {create:always, link:{durable:True,
x-bindings:[{exchange:abc, key:key2}]}}")
+ ssn.receiver("e; {create:always, link:{durable:True,
x-bindings:[{exchange:abc, key:key2}]}}")
+ msg1 = Message("Message1", durable=True,
correlation_id="Msg0003")
+ snd1.send(msg1)
+ msg2 = Message("Message2", durable=True,
correlation_id="Msg0004")
+ snd2.send(msg2)
+ broker.terminate()
+
+ broker = self.broker(store_args(), name="testTopicExchange")
+ self.check_message(broker, "a", msg1, True)
+ self.check_message(broker, "b", msg1, True)
+ self.check_messages(broker, "c", [msg1, msg2], True)
+ self.check_message(broker, "d", msg2, True)
+ self.check_message(broker, "e", msg2, True)
+
+
+ def test_lvq(self):
+ """Test LVQ."""
+ broker = self.broker(store_args(), name="testLVQ",
expect=EXPECT_EXIT_OK)
+ ma1 = Message("A1", durable=True, correlation_id="Msg0005",
properties={"qpid.LVQ_key":"A"})
+ ma2 = Message("A2", durable=True, correlation_id="Msg0006",
properties={"qpid.LVQ_key":"A"})
+ mb1 = Message("B1", durable=True, correlation_id="Msg0007",
properties={"qpid.LVQ_key":"B"})
+ mb2 = Message("B2", durable=True, correlation_id="Msg0008",
properties={"qpid.LVQ_key":"B"})
+ mb3 = Message("B3", durable=True, correlation_id="Msg0009",
properties={"qpid.LVQ_key":"B"})
+ mc1 = Message("C1", durable=True, correlation_id="Msg0010",
properties={"qpid.LVQ_key":"C"})
+ broker.send_messages("lvq-test", [mb1, ma1, ma2, mb2, mb3, mc1],
+
xprops="arguments:{\"qpid.last_value_queue\":True}")
+ broker.terminate()
+
+ broker = self.broker(store_args(), name="testLVQ",
expect=EXPECT_EXIT_OK)
+ ssn = self.check_messages(broker, "lvq-test", [ma2, mb3, mc1],
empty=True, ack=False)
+ # Add more messages while subscriber is active (no replacement):
+ ma3 = Message("A3", durable=True, correlation_id="Msg0011",
properties={"qpid.LVQ_key":"A"})
+ ma4 = Message("A4", durable=True, correlation_id="Msg0012",
properties={"qpid.LVQ_key":"A"})
+ mc2 = Message("C2", durable=True, correlation_id="Msg0013",
properties={"qpid.LVQ_key":"C"})
+ mc3 = Message("C3", durable=True, correlation_id="Msg0014",
properties={"qpid.LVQ_key":"C"})
+ mc4 = Message("C4", durable=True, correlation_id="Msg0015",
properties={"qpid.LVQ_key":"C"})
+ broker.send_messages("lvq-test", [mc2, mc3, ma3, ma4, mc4],
session=ssn)
+ ssn.acknowledge()
+ broker.terminate()
+
+ broker = self.broker(store_args(), name="testLVQ")
+ self.check_messages(broker, "lvq-test", [mc4, ma4], True)
+
+ def test_fanout_exchange(self):
+ """Test Fanout Exchange"""
+ broker = self.broker(store_args(), name="testFanout",
expect=EXPECT_EXIT_OK)
+ ssn = broker.connect().session()
+ snd = ssn.sender("TestFanoutExchange; {create: always, node: {type: topic,
x-declare: {type: fanout}}}")
+ ssn.receiver("TestFanoutExchange; {link: {name: \"q1\", durable:
True}}")
+ ssn.receiver("TestFanoutExchange; {link: {name: \"q2\", durable:
True}}")
+ ssn.receiver("TestFanoutExchange; {link: {name: \"q3\", durable:
True}}")
+ msg1 = Message("Msg1", durable=True,
correlation_id="Msg0001")
+ snd.send(msg1)
+ msg2 = Message("Msg2", durable=True,
correlation_id="Msg0002")
+ snd.send(msg2)
+ broker.terminate()
+
+ broker = self.broker(store_args(), name="testFanout")
+ self.check_messages(broker, "q1", [msg1, msg2], True)
+ self.check_messages(broker, "q2", [msg1, msg2], True)
+ self.check_messages(broker, "q3", [msg1, msg2], True)
+
+
+class AlternateExchagePropertyTests(StoreTest):
+ """
+ Test the persistence of the Alternate Exchange property for exchanges and queues.
+ """
+
+ def test_exchange(self):
+ """Exchange alternate exchange property persistence
test"""
+ broker = self.broker(store_args(), name="testExchangeBroker",
expect=EXPECT_EXIT_OK)
+ qmf = Qmf(broker)
+ qmf.add_exchange("altExch", "direct", durable=True) # Serves
as alternate exchange instance
+ qmf.add_exchange("testExch", "direct", durable=True,
alt_exchange_name="altExch")
+ broker.terminate()
+
+ broker = self.broker(store_args(), name="testExchangeBroker")
+ qmf = Qmf(broker)
+ try:
+ qmf.add_exchange("altExch", "direct", passive=True)
+ except Exception, error:
+ self.fail("Alternate exchange (\"altExch\") instance not
recovered: %s" % error)
+ try:
+ qmf.add_exchange("testExch", "direct", passive=True)
+ except Exception, error:
+ self.fail("Test exchange (\"testExch\") instance not
recovered: %s" % error)
+ self.assertTrue(qmf.query_exchange("testExch", alt_exchange_name =
"altExch"),
+ "Alternate exchange property not found or is incorrect on
exchange \"testExch\".")
+
+ def test_queue(self):
+ """Queue alternate exchange property persistexchangeNamece
test"""
+ broker = self.broker(store_args(), name="testQueueBroker",
expect=EXPECT_EXIT_OK)
+ qmf = Qmf(broker)
+ qmf.add_exchange("altExch", "direct", durable=True) # Serves
as alternate exchange instance
+ qmf.add_queue("testQueue", durable=True,
alt_exchange_name="altExch")
+ broker.terminate()
+
+ broker = self.broker(store_args(), name="testQueueBroker")
+ qmf = Qmf(broker)
+ try:
+ qmf.add_exchange("altExch", "direct", passive=True)
+ except Exception, error:
+ self.fail("Alternate exchange (\"altExch\") instance not
recovered: %s" % error)
+ try:
+ qmf.add_queue("testQueue", passive=True)
+ except Exception, error:
+ self.fail("Test queue (\"testQueue\") instance not recovered:
%s" % error)
+ self.assertTrue(qmf.query_queue("testQueue", alt_exchange_name =
"altExch"),
+ "Alternate exchange property not found or is incorrect on
queue \"testQueue\".")
+
+
+class RedeliveredTests(StoreTest):
+ """
+ Test the behavior of the redelivered flag in the context of persistence
+ """
+
+ def test_broker_recovery(self):
+ """Test that the redelivered flag is set on messages after
recovery of broker"""
+ broker = self.broker(store_args(), name="testAfterRecover",
expect=EXPECT_EXIT_OK)
+ msg_content = "xyz"*100
+ msg = Message(msg_content, durable=True)
+ broker.send_message("testQueue", msg)
+ broker.terminate()
+
+ broker = self.broker(store_args(), name="testAfterRecover")
+ rcv_msg = broker.get_message("testQueue")
+ self.assertEqual(msg_content, rcv_msg.content)
+ self.assertTrue(rcv_msg.redelivered)
+
Copied: store/trunk/cpp/tests/python_tests/flow_to_disk.py (from rev 3905,
store/trunk/cpp/tests/new_python_tests/flow_to_disk.py)
===================================================================
--- store/trunk/cpp/tests/python_tests/flow_to_disk.py (rev 0)
+++ store/trunk/cpp/tests/python_tests/flow_to_disk.py 2010-04-14 14:41:04 UTC (rev 3910)
@@ -0,0 +1,1208 @@
+"""
+Copyright (c) 2008 Red Hat, Inc.
+
+This file is part of the Qpid async store library msgstore.so.
+
+This library is free software; you can redistribute it and/or
+modify it under the terms of the GNU Lesser General Public
+License as published by the Free Software Foundation; either
+version 2.1 of the License, or (at your option) any later version.
+
+This library is distributed in the hope that it will be useful,
+but WITHOUT ANY WARRANTY; without even the implied warranty of
+MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+Lesser General Public License for more details.
+
+You should have received a copy of the GNU Lesser General Public
+License along with this library; if not, write to the Free Software
+Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301
+USA
+
+The GNU Lesser General Public License is available in the file COPYING.
+"""
+
+import qpid
+from qpid.brokertest import EXPECT_EXIT_OK, EXPECT_UNKNOWN
+from qpid.datatypes import uuid4
+from store_test import StoreTest, store_args
+from qpid.messaging import Message, TargetCapacityExceeded, ServerError #SessionError,
SendError
+
+class FlowToDisk(StoreTest):
+ """Tests for async store flow-to-disk"""
+
+    @staticmethod
+    def _broker_name(queue_name, txn_produce, txn_consume):
+        """Derive a broker name from the queue name: "_TxP" is appended for transactional produce and "_TxC" for
+        transactional consume (in that order when both are set)"""
+        name = queue_name
+        for enabled, suffix in ((txn_produce, "_TxP"), (txn_consume, "_TxC")):
+            if enabled:
+                name += suffix
+        return name
+
+ def _tx_simple_limit(self, queue_name, kwargs):
+ """
+ Test a simple case of message limits which will force flow-to-disk.
+ * A queue limit is taken from kwargs - max_count and/or max_size
+ * messages are added. Some will flow to disk.
+ * Optionally (browse) the queue is browsed; optionally (recover) the broker is terminated and restarted.
+ * Consume all messages sent.
+ * Check the broker has no messages left.
+ * Check the broker log(s) for the messages that were expected to be released.
+
+ kwargs is a plain dict (not **kwargs). Keys read, with their defaults: txn_produce (False),
+ txn_consume (False), recover (False), max_count (None), max_size (None), policy ("flow_to_disk"),
+ num_msgs (15), msg_size (10), msg_durable (False), sync (False), browse (False).
+ """
+ # Unpack args
+ txn_produce = kwargs.get("txn_produce", False)
+ txn_consume = kwargs.get("txn_consume", False)
+ recover = kwargs.get("recover", False)
+ max_count = kwargs.get("max_count")
+ max_size = kwargs.get("max_size")
+ policy = kwargs.get("policy", "flow_to_disk")
+ num_msgs = kwargs.get("num_msgs", 15)
+ msg_size = kwargs.get("msg_size", 10)
+ msg_durable = kwargs.get("msg_durable", False)
+ sync = kwargs.get("sync", False)
+ browse = kwargs.get("browse", False)
+
+ bname = self._broker_name(queue_name, txn_produce, txn_consume)
+ # The first broker is started with EXPECT_UNKNOWN when it will be terminated for a recover, otherwise
+ # with EXPECT_EXIT_OK
+ if recover:
+ expect = EXPECT_UNKNOWN
+ else:
+ expect = EXPECT_EXIT_OK
+ broker = self.broker(store_args(), name=bname, expect=expect,
log_level="debug+")
+ prod_session = broker.connect().session(transactional=txn_produce)
+ sender = prod_session.sender(self.snd_addr(queue_name, auto_create=True,
durable=True, ftd_count=max_count,
+ ftd_size=max_size, policy=policy))
+
+ # Send messages
+ msgs = []
+ pre_recover_ftd_msgs = [] # msgs released before a recover
+ post_recover_ftd_msgs = [] # msgs released after a recover
+ cum_msg_size = 0
+ for index in range(0, num_msgs):
+ msg = Message(self.make_message(index, msg_size), durable=msg_durable,
id=uuid4(),
+ correlation_id="msg-%04d"%index)
+ #print "Sending msg %s" % msg.id
+ msgs.append(msg)
+ cum_msg_size += msg_size
+ # A message whose index has reached max_count, or which takes the cumulative size past max_size, is
+ # recorded as one expected to be released
+ if (max_count != None and index >= max_count) or (max_size != None and
cum_msg_size > max_size):
+ pre_recover_ftd_msgs.append(msg)
+ sender.send(msg, sync=sync)
+ if not sync:
+ sender.sync()
+ # Close transaction (if needed)
+ if txn_produce:
+ prod_session.commit()
+
+ # Browse messages
+ if browse:
+ self.check_messages(broker, queue_name, msgs, browse=True)
+
+ if recover:
+ broker.terminate()
+ # Durable messages are expected to be released again on recover; for transient messages the list of
+ # expected messages is emptied instead
+ if msg_durable:
+ post_recover_ftd_msgs = pre_recover_ftd_msgs
+ else:
+ del msgs[:] # Transient messages will be discarded on recover
+ old_broker = broker # keep for log analysis
+ broker = self.broker(store_args(), name=bname, expect=EXPECT_EXIT_OK,
log_level="debug+")
+
+ # Browse messages after recover
+ if browse:
+ self.check_messages(broker, queue_name, msgs, browse=True)
+
+ # Consume messages
+ self.check_messages(broker, queue_name, msgs, transactional=txn_consume,
empty=True)
+ broker.terminate()
+
+ # Check broker logs for released messages
+ # With a recover, the pre-recover checks use the first broker (old_broker) and the recover check uses
+ # the restarted one; a transactional produce uses the on-commit variant of the release check
+ if recover:
+ if txn_produce:
+ self.check_msg_release_on_commit(old_broker, pre_recover_ftd_msgs)
+ else:
+ self.check_msg_release(old_broker, pre_recover_ftd_msgs)
+ self.check_msg_release_on_recover(broker, post_recover_ftd_msgs)
+ else:
+ if txn_produce:
+ self.check_msg_release_on_commit(broker, pre_recover_ftd_msgs)
+ else:
+ self.check_msg_release(broker, pre_recover_ftd_msgs)
+
+    def simple_limit(self, queue_name, **kwargs):
+        """Run _tx_simple_limit() once for each of the four transactional produce/consume combinations.
+
+        The remaining keyword arguments are passed through unchanged. Order of the runs: neither, produce only,
+        consume only, both."""
+        for txn_consume in (False, True):
+            for txn_produce in (False, True):
+                kwargs["txn_produce"] = txn_produce # Transactional produce
+                kwargs["txn_consume"] = txn_consume # Transactional consume
+                self._tx_simple_limit(queue_name, kwargs)
+
+class SimpleMaxCountTest(FlowToDisk):
+    """Flow-to-disk scenarios in which the queue is limited by max_count.
+
+    Each test passes a unique queue name and a set of options to simple_limit(). The large message tests also
+    pass a max_size together with 100 messages of 10000 bytes."""
+
+    def test_base(self):
+        """Base test"""
+        self.simple_limit("SimpleMaxCount", max_count=10)
+
+    def test_recover(self):
+        """Recover test"""
+        self.simple_limit("SimpleMaxCountRecover", recover=True, max_count=10)
+
+    def test_durable(self):
+        """Durable message test"""
+        self.simple_limit("SimpleMaxCountDurable", msg_durable=True, max_count=10)
+
+    def test_durable_recover(self):
+        """Durable message recover test"""
+        self.simple_limit("SimpleMaxCountDurableRecover", msg_durable=True, recover=True, max_count=10)
+
+    def test_browse(self):
+        """Browse test"""
+        self.simple_limit("SimpleMaxCountBrowse", browse=True, max_count=10)
+
+    def test_browse_recover(self):
+        """Browse before and after recover test"""
+        self.simple_limit("SimpleMaxCountBrowseRecover", browse=True, recover=True, max_count=10)
+
+    def test_durable_browse(self):
+        """Browse durable message test"""
+        self.simple_limit("SimpleMaxCountDurableBrowse", msg_durable=True, browse=True, max_count=10)
+
+    def test_durable_browse_recover(self):
+        """Browse durable messages before and after recover"""
+        self.simple_limit("SimpleMaxCountDurableBrowseRecover", msg_durable=True, browse=True, recover=True,
+                          max_count=10)
+
+    def test_large_msg(self):
+        """Large message test"""
+        self.simple_limit("SimpleMaxCountLargeMsg", num_msgs=100, msg_size=10000, max_count=10,
+                          max_size=10000000)
+
+    def test_large_msg_recover(self):
+        """Large message recover test"""
+        self.simple_limit("SimpleMaxCountLargeMsgRecover", num_msgs=100, msg_size=10000, recover=True,
+                          max_count=10, max_size=10000000)
+
+    def test_large_msg_durable(self):
+        """Large durable message test"""
+        self.simple_limit("SimpleMaxCountLargeMsgDurable", num_msgs=100, msg_size=10000, msg_durable=True,
+                          max_count=10, max_size=10000000)
+
+    def test_large_msg_durable_recover(self):
+        """Large durable message recover test"""
+        self.simple_limit("SimpleMaxCountLargeMsgDurableRecover", num_msgs=100, msg_size=10000, msg_durable=True,
+                          recover=True, max_count=10, max_size=10000000)
+
+    def test_large_msg_browse(self):
+        """Large message browse test"""
+        self.simple_limit("SimpleMaxCountLargeMsgBrowse", num_msgs=100, msg_size=10000, browse=True,
+                          max_count=10, max_size=10000000)
+
+    def test_large_msg_browse_recover(self):
+        """Large message browse and recover test"""
+        self.simple_limit("SimpleMaxCountLargeMsgBrowseRecover", num_msgs=100, msg_size=10000, browse=True,
+                          recover=True, max_count=10, max_size=10000000)
+
+    def test_large_msg_durable_browse(self):
+        """Large durable message browse test"""
+        self.simple_limit("SimpleMaxCountLargeMsgDurableBrowse", num_msgs=100, msg_size=10000, msg_durable=True,
+                          browse=True, max_count=10, max_size=10000000)
+
+    def test_large_msg_durable_browse_recover(self):
+        """Large durable message browse and recover test"""
+        self.simple_limit("SimpleMaxCountLargeMsgDurableBrowseRecover", num_msgs=100, msg_size=10000,
+                          msg_durable=True, browse=True, recover=True, max_count=10, max_size=10000000)
+
+class SimpleMaxSizeTest(FlowToDisk):
+    """Flow-to-disk scenarios in which the queue is limited by max_size.
+
+    Each test passes a unique queue name and a set of options to simple_limit(); the queue name is also the
+    base of the broker name, so it must not be shared between tests."""
+
+    def test_base(self):
+        """Base test"""
+        self.simple_limit("SimpleMaxSize", max_size=100)
+
+    def test_recover(self):
+        """Recover test"""
+        self.simple_limit("SimpleMaxSizeRecover", max_size=100, recover=True)
+
+    def test_durable(self):
+        """Durable message test"""
+        self.simple_limit("SimpleMaxSizeDurable", max_size=100, msg_durable=True)
+
+    def test_durable_recover(self):
+        """Durable message recover test"""
+        # Previously reused the name "SimpleMaxSizeDurable" from test_durable()
+        self.simple_limit("SimpleMaxSizeDurableRecover", max_size=100, msg_durable=True, recover=True)
+
+    def test_browse(self):
+        """Browse test"""
+        self.simple_limit("SimpleMaxSizeBrowse", max_size=100, browse=True)
+
+    def test_browse_recover(self):
+        """Browse before and after recover test"""
+        self.simple_limit("SimpleMaxSizeBrowseRecover", max_size=100, browse=True, recover=True)
+
+    def test_durable_browse(self):
+        """Browse durable message test"""
+        self.simple_limit("SimpleMaxSizeDurableBrowse", max_size=100, msg_durable=True, browse=True)
+
+    def test_durable_browse_recover(self):
+        """Browse durable messages before and after recover"""
+        self.simple_limit("SimpleMaxSizeDurableBrowseRecover", max_size=100, msg_durable=True, browse=True,
+                          recover=True)
+
+    def test_large_msg(self):
+        """Large message test"""
+        self.simple_limit("SimpleMaxSizeLargeMsg", max_size=100000, num_msgs=100, msg_size=10000)
+
+    def test_large_msg_recover(self):
+        """Large message recover test"""
+        self.simple_limit("SimpleMaxSizeLargeMsgRecover", max_size=100000, num_msgs=100, msg_size=10000,
+                          recover=True)
+
+    def test_large_msg_durable(self):
+        """Large durable message test"""
+        self.simple_limit("SimpleMaxSizeLargeMsgDurable", max_size=100000, msg_durable=True, num_msgs=100,
+                          msg_size=10000)
+
+    def test_large_msg_durable_recover(self):
+        """Large durable message recover test"""
+        self.simple_limit("SimpleMaxSizeLargeMsgDurableRecover", max_size=100000, msg_durable=True, num_msgs=100,
+                          msg_size=10000, recover=True)
+
+    # NOTE(review): the four browse variants below use max_size=100 where the other large message tests use
+    # 100000. The values are kept as found - confirm whether the smaller limit is intended.
+
+    def test_large_msg_browse(self):
+        """Large message browse test"""
+        self.simple_limit("SimpleMaxSizeLargeMsgBrowse", max_size=100, browse=True, num_msgs=100, msg_size=10000)
+
+    def test_large_msg_browse_recover(self):
+        """Large message browse and recover test"""
+        self.simple_limit("SimpleMaxSizeLargeMsgBrowseRecover", max_size=100, browse=True, num_msgs=100,
+                          msg_size=10000, recover=True)
+
+    def test_large_msg_durable_browse(self):
+        """Large durable message browse test"""
+        self.simple_limit("SimpleMaxSizeLargeMsgDurableBrowse", max_size=100, msg_durable=True, browse=True,
+                          num_msgs=100, msg_size=10000)
+
+    def test_large_msg_durable_browse_recover(self):
+        """Large durable message browse and recover test"""
+        self.simple_limit("SimpleMaxSizeLargeMsgDurableBrowseRecover", max_size=100, msg_durable=True,
+                          browse=True, num_msgs=100, msg_size=10000, recover=True)
+
+class SimpleMaxSizeCountTest(FlowToDisk):
+    """Flow-to-disk tests based on setting both max_count and max_size at the same time.
+
+    The durable variants send durable messages of 250 bytes; the others use the default message size and
+    durability of simple_limit()."""
+
+    def test_base(self):
+        """Base test"""
+        self.simple_limit("MaxSizeMaxCount", max_count=10, max_size=1000)
+
+    def test_recover(self):
+        """Recover test"""
+        self.simple_limit("MaxSizeMaxCountRecover", max_count=10, max_size=1000, recover=True)
+
+    # The four durable tests below previously omitted msg_durable=True, so they sent transient messages in
+    # spite of their names and docstrings.
+
+    def test_durable(self):
+        """Durable message test"""
+        self.simple_limit("MaxSizeMaxCountDurable", max_count=10, max_size=1000, msg_size=250, msg_durable=True)
+
+    def test_durable_recover(self):
+        """Durable message recover test"""
+        self.simple_limit("MaxSizeMaxCountDurableRecover", max_count=10, max_size=1000, msg_size=250,
+                          msg_durable=True, recover=True)
+
+    def test_browse(self):
+        """Browse test"""
+        self.simple_limit("MaxSizeMaxCountBrowse", max_count=10, max_size=1000, browse=True)
+
+    def test_browse_recover(self):
+        """Browse before and after recover test"""
+        self.simple_limit("MaxSizeMaxCountBrowseRecover", max_count=10, max_size=1000, browse=True, recover=True)
+
+    def test_durable_browse(self):
+        """Browse durable message test"""
+        self.simple_limit("MaxSizeMaxCountDurableBrowse", max_count=10, max_size=1000, msg_size=250,
+                          msg_durable=True, browse=True)
+
+    def test_durable_browse_recover(self):
+        """Browse durable messages before and after recover"""
+        self.simple_limit("MaxSizeMaxCountDurableBrowseRecover", max_count=10, max_size=1000, msg_size=250,
+                          msg_durable=True, browse=True, recover=True)
+
+#
======================================================================================================================
+
+class MultiQueueFlowToDisk(FlowToDisk):
+ """Tests for async store flow-to-disk involving multiple
queues"""
+
+    def _multi_queue_setup(self, queue_map, broker, exchange_name, txn_produce, txn_consume, policy,
+                           exclusive = False):
+        """Create the sender on the exchange and one receiver per entry of queue_map.
+
+        All sessions are opened on a single connection to broker: the send session uses txn_produce as its
+        transactional flag and each receive session uses txn_consume. Every queue_map entry maps a queue name
+        (used as the link name) to a dict which may hold "durable", "max_count" and "max_size".
+
+        Returns the tuple (send session, sender)."""
+        connection = broker.connect()
+        snd_session = connection.session(transactional=txn_produce)
+        # "exchage_type" (sic) is the keyword as found; snd_addr() is defined outside this class
+        snd_address = self.snd_addr(exchange_name, topic_flag=True, exchage_type="fanout")
+        #print "snd_addr=\"%s\"" % snd_address
+        sndr = snd_session.sender(snd_address)
+        for queue_name, queue_properties in queue_map.items():
+            # Properties missing from the map fall back to: not durable, no count limit, no size limit
+            durable = queue_properties.get("durable", False)
+            max_count = queue_properties.get("max_count")
+            max_size = queue_properties.get("max_size")
+            rcv_session = connection.session(transactional=txn_consume)
+            rcv_address = self.rcv_addr(exchange_name, auto_create=False, link_name=queue_name, durable=durable,
+                                        exclusive=exclusive, ftd_count=max_count, ftd_size=max_size,
+                                        policy=policy)
+            #print "rcv_addr=\"%s\"" % rcv_address
+            rcv_session.receiver(rcv_address)
+        return snd_session, sndr
+
+    @staticmethod
+    def _make_policy_dict(src, marker, delim=";"):
+        """Create a dictionary of key/value strings from a formatted string src of the form
+        '... marker key1=val1, key2=val2, ..., keyN=valN delimiter ...'
+        where the portion of interest starts after marker and runs to the following delimiter (default: ';').
+
+        If the delimiter does not follow the marker, the portion runs to the end of src. Each value is split
+        from its key at the first '=' only, so a value may itself contain '='.
+
+        Returns None if marker is not present in src or if no key=value pairs are found."""
+        start = src.find(marker)
+        if start < 0:
+            return None # find() returns -1 when absent; parsing from that offset would produce nonsense
+        start += len(marker)
+        end = src.find(delim, start)
+        if end < 0:
+            end = len(src) # no delimiter: a slice ending at -1 would drop the last character
+        pairs = [item.strip(",").split("=", 1) for item in src[start:end].split() if "=" in item]
+        if len(pairs) > 0:
+            return dict(pairs)
+        return None
+
+    @staticmethod
+    def _make_policy_val(src, marker, delim=";"):
+        """Return a string value from a formatted string of the form '... marker val delimiter ...' where the
+        value lies between marker and the following delimiter (default: ';'), with surrounding whitespace
+        stripped.
+
+        If the delimiter does not follow the marker, the value runs to the end of src.
+
+        Returns None if marker is not present in src."""
+        start = src.find(marker)
+        if start < 0:
+            return None # find() returns -1 when absent; slicing from that offset would return unrelated text
+        start += len(marker)
+        end = src.find(delim, start)
+        if end < 0:
+            end = len(src) # no delimiter: a slice ending at -1 would drop the last character
+        return src[start:end].strip()
+
+    @staticmethod
+    def _check_error(error_str, fail_list=None):
+        """Decide whether a policy exception string describes one of the expected failures.
+
+        Returns True only when error_str starts with "resource-limit-exceeded", the queue name parsed from it
+        matches an entry of fail_list, and that entry's "type" and (where given) "count" and "size" agree with
+        the values parsed from the string. Returns False in every other case, including fail_list being None
+        (no failure was expected)."""
+        if not error_str.startswith("resource-limit-exceeded"):
+            return False
+        parser = MultiQueueFlowToDisk
+        actual_policy = parser._make_policy_val(error_str, "type=", delim="(")
+        actual_queue = parser._make_policy_val(error_str, "Policy exceeded on ", delim=",")
+        actual_counts = parser._make_policy_dict(error_str, "count: ")
+        actual_sizes = parser._make_policy_dict(error_str, "size: ")
+        if fail_list is None:
+            return False # Not expected - no failure should have occurred
+        for expected in fail_list:
+            if actual_queue != expected["queue"]:
+                continue
+            # The first entry naming the failing queue decides the result
+            if actual_policy != expected["type"]:
+                return False
+            if actual_counts is not None and "count" in expected and \
+               int(actual_counts["current"]) != expected["count"]:
+                return False
+            if actual_sizes is not None and "size" in expected and \
+               int(actual_sizes["current"]) != expected["size"]:
+                return False
+            return True
+        return False
+
+    @staticmethod
+    def _check_target_capacity_exceeded_error(err, fail_list=None):
+        """Return False unless err is a TargetCapacityExceeded; otherwise return the result of _check_error()
+        for its string form and fail_list."""
+        if isinstance(err, TargetCapacityExceeded):
+            return MultiQueueFlowToDisk._check_error(str(err), fail_list)
+        return False
+
+    @staticmethod
+    def _check_server_error(err, txn=False):
+        """Return True only if err is a ServerError, txn is set, and the error text starts with
+        "internal-error: Commit failed"; return False otherwise."""
+        if isinstance(err, ServerError):
+            if txn and str(err).startswith("internal-error: Commit failed"):
+                return True
+        return False
+
+    @staticmethod
+    def _is_queue_durable(queue_map, index):
+        """Report whether the queue at position index of queue_map.values() has a "durable" property which is
+        set. The position matches that of queue_map.keys(), so either list can be used to choose index."""
+        queue_props = queue_map.values()[index]
+        return "durable" in queue_props and queue_props["durable"]
+
+    @staticmethod
+    def _expected_msg_loss(fail_list):
+        """Find the lowest expected failure limits in fail_list.
+
+        Returns the tuple (count limit, queues with that count limit, size limit, queues with that size limit).
+        The limit is the smallest "count" (respectively "size") found in the fail_list entries and the queue
+        list names every entry having exactly that value. Both members of a pair are None when fail_list is
+        None or no entry carries the key."""
+        lowest = {"count": None, "size": None}
+        queues = {"count": None, "size": None}
+        if fail_list is not None:
+            for fail in fail_list:
+                for key in ("count", "size"):
+                    if key not in fail:
+                        continue
+                    limit = fail[key]
+                    if lowest[key] is None or limit < lowest[key]:
+                        # First limit seen, or a new minimum: start the queue list afresh
+                        lowest[key] = limit
+                        queues[key] = [fail["queue"]]
+                    elif limit == lowest[key]:
+                        queues[key].append(fail["queue"])
+        return (lowest["count"], queues["count"], lowest["size"], queues["size"])
+
+    @staticmethod
+    def _expected_msg_ftd(queue_map):
+        """Return the tuple (max_count, max_size) holding the smallest "max_count" and the smallest "max_size"
+        found among those queues of queue_map whose "durable" property is set. A member is None when no such
+        queue sets that limit."""
+        max_count = None
+        max_size = None
+        for queue_props in queue_map.values():
+            if not queue_props.get("durable"):
+                continue # limits of queues which are not durable are ignored
+            count_limit = queue_props.get("max_count")
+            if count_limit is not None and (max_count is None or count_limit < max_count):
+                max_count = count_limit
+            size_limit = queue_props.get("max_size")
+            if size_limit is not None and (max_size is None or size_limit < max_size):
+                max_size = size_limit
+        return (max_count, max_size)
+
+
+ def tx_multi_queue_limit(self, broker_base_name, queue_map, exchange_name,
**kwargs):
+ """ Test a multi-queue case
+ queue_map = queue map where map is queue name (key) against queue args (value)
+
+ Messages are sent through a sender created by _multi_queue_setup() for exchange_name, with one receiver
+ per queue_map entry. Each queue is then checked (browsed first if requested, and again after an optional
+ broker restart) and finally the broker log(s) are checked for the messages expected to be released or
+ blocked.
+
+ Keyword arguments read, with their defaults: msg_durable (False), num_msgs (15), msg_size (10),
+ txn_produce (False), txn_consume (False), browse (False), policy ("flow_to_disk"), recover (False),
+ sync (False), fail_list (None).
+ """
+ # Unpack args
+ msg_durable = kwargs.get("msg_durable", False)
+ num_msgs = kwargs.get("num_msgs", 15)
+ msg_size = kwargs.get("msg_size", 10)
+ txn_produce = kwargs.get("txn_produce", False)
+ txn_consume = kwargs.get("txn_consume", False)
+ browse = kwargs.get("browse", False)
+ policy = kwargs.get("policy", "flow_to_disk")
+ recover = kwargs.get("recover", False)
+ sync = kwargs.get("sync", False)
+ fail_list = kwargs.get("fail_list")
+
+ bname = self._broker_name(broker_base_name, txn_produce, txn_consume)
+ broker = self.broker(store_args(), name=bname, expect=EXPECT_EXIT_OK,
log_level="debug+")
+ snd_session, sndr = self._multi_queue_setup(queue_map, broker, exchange_name,
txn_produce, txn_consume, policy)
+
+ # Find expected limits
+ count_exp_loss, count_exp_loss_queues, size_exp_loss, size_exp_loss_queues =
self._expected_msg_loss(fail_list)
+ max_count, max_size = self._expected_msg_ftd(queue_map)
+
+ # Send messages
+ try:
+ msgs = []
+ pre_recover_ftd_msgs = [] # msgs released before a recover
+ post_recover_ftd_msgs = [] # msgs released after a recover
+ cum_msg_size = 0
+ target_queues = []
+ for index in range(0, num_msgs):
+ msg = Message(self.make_message(index, msg_size), durable=msg_durable,
id=uuid4(),
+ correlation_id="msg-%04d"%index)
+ #print "Sending msg %s" % msg.id
+ sndr.send(msg, sync=sync)
+ if msg_size != None:
+ cum_msg_size += msg_size
+ # Once an expected count or size failure limit has been reached, the queues carrying that limit
+ # are recorded in target_queues (a queue may be added more than once)
+ if count_exp_loss != None and index >= count_exp_loss:
+ target_queues.extend(count_exp_loss_queues)
+ if size_exp_loss != None and cum_msg_size > size_exp_loss:
+ target_queues.extend(size_exp_loss_queues)
+ # Only messages sent while below every expected failure limit are added to the expected list
+ if (count_exp_loss == None or index < count_exp_loss) and \
+ (size_exp_loss == None or cum_msg_size <= size_exp_loss):
+ msgs.append(msg)
+ if (max_count != None and index >= max_count) or (max_size != None and
cum_msg_size > max_size):
+ pre_recover_ftd_msgs.append(msg)
+ if not sync:
+ sndr.sync()
+ if txn_produce:
+ snd_session.commit()
+ # An exception is tolerated only if the corresponding check accepts it; otherwise it is re-raised
+ except TargetCapacityExceeded, err:
+ if not self._check_target_capacity_exceeded_error(err, fail_list):
+ raise
+ except ServerError, err:
+ msgs[:] = [] # Transaction failed, all messages lost
+ if not self._check_server_error(err, txn_produce):
+ raise
+
+ # Browse messages
+ if browse:
+ for index in range(0, len(queue_map)):
+ self.check_messages(broker, queue_map.keys()[index], msgs, browse=True)
+
+ if recover:
+ broker.terminate()
+ if msg_durable:
+ post_recover_ftd_msgs = pre_recover_ftd_msgs
+ else:
+ del msgs[:] # Transient messages will be discarded on recover
+ old_broker = broker # keep for log analysis
+ broker = self.broker(store_args(), name=bname, expect=EXPECT_EXIT_OK,
log_level="debug+")
+ # Browse messages
+ # NOTE(review): "emtpy_flag" (sic) is passed as spelled here and in the consume step below;
+ # check_messages() is defined outside this section - confirm it reads this key
+ if browse:
+ for index in range(0, len(queue_map)):
+ empty = not self._is_queue_durable(queue_map, index)
+ self.check_messages(broker, queue_map.keys()[index], msgs,
browse=True, emtpy_flag=empty)
+
+ # Consume messages
+ for index in range(0, len(queue_map)):
+ empty_chk = txn_produce or queue_map.keys()[index] in target_queues
+ empty = recover and not self._is_queue_durable(queue_map, index)
+ self.check_messages(broker, queue_map.keys()[index], msgs,
transactional=txn_consume, empty=empty_chk,
+ emtpy_flag=empty)
+
+ broker.terminate()
+
+ # Check broker logs for released messages
+ # Durable messages use the release checks and transient messages the block checks; a transactional
+ # produce uses the on-commit variants. With a recover, these use the first broker (old_broker) and the
+ # restarted broker is used for the release-on-recover check.
+ if recover:
+ if txn_produce:
+ if msg_durable:
+ self.check_msg_release_on_commit(old_broker, pre_recover_ftd_msgs)
+ else:
+ self.check_msg_block_on_commit(old_broker, pre_recover_ftd_msgs)
+ else:
+ if msg_durable:
+ self.check_msg_release(old_broker, pre_recover_ftd_msgs)
+ else:
+ self.check_msg_block(old_broker, pre_recover_ftd_msgs)
+ self.check_msg_release_on_recover(broker, post_recover_ftd_msgs)
+ else:
+ if txn_produce:
+ if msg_durable:
+ self.check_msg_release_on_commit(broker, pre_recover_ftd_msgs)
+ else:
+ self.check_msg_block_on_commit(broker, pre_recover_ftd_msgs)
+ else:
+ if msg_durable:
+ self.check_msg_release(broker, pre_recover_ftd_msgs)
+ else:
+ self.check_msg_block(broker, pre_recover_ftd_msgs)
+
+ # --- Parameterized test methods ---
+
+    def no_limit(self, num, queue_durable=False, msg_durable=False, browse=False, recover=False,
+                 txn_produce=False, txn_consume=False):
+        """No policy test: two queues (a<num>, b<num>), neither with a count or size limit, and no expected
+        failures"""
+        first_queue = "a%02d" % num
+        second_queue = "b%02d" % num
+        queue_map = {first_queue : {"durable":queue_durable, "max_count":None, "max_size": None},
+                     second_queue : {"durable":queue_durable, "max_count":None, "max_size": None}}
+        self.tx_multi_queue_limit("MultiQueue_NoLimit", queue_map, exchange_name="Fanout_a%02d" % num,
+                                  msg_durable=msg_durable, browse=browse, recover=recover,
+                                  txn_produce=txn_produce, txn_consume=txn_consume)
+
+    def max_count(self, num, queue_durable=False, msg_durable=False, browse=False, recover=False,
+                  txn_produce=False, txn_consume=False):
+        """Count policy test: queue c<num> has no limit and queue d<num> has max_count=10. Unless the queues
+        are durable, a "reject" failure at count 10 is expected on d<num>."""
+        unlimited_queue = "c%02d" % num
+        limited_queue = "d%02d" % num
+        queue_map = {unlimited_queue : {"durable":queue_durable, "max_count":None, "max_size": None},
+                     limited_queue : {"durable":queue_durable, "max_count":10, "max_size": None}}
+        if queue_durable:
+            fail_list = None
+        else:
+            fail_list = [{"queue":limited_queue, "type":"reject", "count":10}]
+        self.tx_multi_queue_limit("MultiQueue_MaxCount", queue_map, exchange_name="Fanout_b%02d" % num,
+                                  msg_durable=msg_durable, browse=browse, recover=recover,
+                                  fail_list=fail_list, txn_produce=txn_produce, txn_consume=txn_consume)
+
+    def max_size(self, num, queue_durable=False, msg_durable=False, browse=False, recover=False,
+                 txn_produce=False, txn_consume=False):
+        """Size policy test: queue e<num> has no limit and queue f<num> has max_size=1000; messages are 100
+        bytes. Unless the queues are durable, a "reject" failure at size 1000 is expected on f<num>."""
+        unlimited_queue = "e%02d" % num
+        limited_queue = "f%02d" % num
+        queue_map = {unlimited_queue : {"durable":queue_durable, "max_count":None, "max_size": None},
+                     limited_queue : {"durable":queue_durable, "max_count":None, "max_size": 1000}}
+        if queue_durable:
+            fail_list = None
+        else:
+            fail_list = [{"queue":limited_queue, "type":"reject", "size":1000}]
+        self.tx_multi_queue_limit("MultiQueue_MaxSize", queue_map, exchange_name="Fanout_c%02d" % num,
+                                  msg_size=100, msg_durable=msg_durable, browse=browse, recover=recover,
+                                  fail_list=fail_list, txn_produce=txn_produce, txn_consume=txn_consume)
+
+    def dual_max_count(self, num, queue_durable=False, msg_durable=False, browse=False, recover=False,
+                       txn_produce=False, txn_consume=False):
+        """Multiple count policy test: queue g<num> has max_count=10 and queue h<num> has max_count=8. Unless
+        the queues are durable, a "reject" failure at count 8 is expected on h<num>."""
+        higher_queue = "g%02d" % num
+        lower_queue = "h%02d" % num
+        queue_map = {higher_queue : {"durable":queue_durable, "max_count":10, "max_size": None},
+                     lower_queue : {"durable":queue_durable, "max_count":8, "max_size": None}}
+        if queue_durable:
+            fail_list = None
+        else:
+            fail_list = [{"queue":lower_queue, "type":"reject", "count":8}]
+        self.tx_multi_queue_limit("MultiQueue_DualMaxCount", queue_map, exchange_name="Fanout_d%02d" % num,
+                                  msg_durable=msg_durable, browse=browse, recover=recover,
+                                  fail_list=fail_list, txn_produce=txn_produce, txn_consume=txn_consume)
+
+    def dual_max_size(self, num, queue_durable=False, msg_durable=False, browse=False, recover=False,
+                      txn_produce=False, txn_consume=False):
+        """Multiple size policy test: queue i<num> has max_size=1000 and queue j<num> has max_size=800;
+        messages are 100 bytes. Unless the queues are durable, a "reject" failure at size 800 is expected on
+        j<num>."""
+        higher_queue = "i%02d" % num
+        lower_queue = "j%02d" % num
+        queue_map = {higher_queue : {"durable":queue_durable, "max_count":None, "max_size": 1000},
+                     lower_queue : {"durable":queue_durable, "max_count":None, "max_size": 800}}
+        if queue_durable:
+            fail_list = None
+        else:
+            fail_list = [{"queue":lower_queue, "type":"reject", "size":800}]
+        self.tx_multi_queue_limit("MultiQueue_DualMaxSize", queue_map, exchange_name="Fanout_e%02d" % num,
+                                  msg_size=100, msg_durable=msg_durable, browse=browse, recover=recover,
+                                  fail_list=fail_list, txn_produce=txn_produce, txn_consume=txn_consume)
+
+    def mixed_limit_1(self, num, queue_durable=False, msg_durable=False, browse=False, recover=False,
+                      txn_produce=False, txn_consume=False):
+        """Both count and size polices active with the same queue having equal probabilities of triggering the
+        policy.
+
+        Queues: k<num> (no limit), l<num> (max_count=10), m<num> (max_size=1000) and n<num> (max_count=8 and
+        max_size=800); messages are 100 bytes. Unless the queues are durable, a "reject" failure at count 8 /
+        size 800 is expected on n<num>."""
+        both_limits_queue = "n%02d" % num
+        queue_map = {"k%02d" % num : {"durable":queue_durable, "max_count":None, "max_size": None},
+                     "l%02d" % num : {"durable":queue_durable, "max_count":10, "max_size": None},
+                     "m%02d" % num : {"durable":queue_durable, "max_count":None, "max_size": 1000},
+                     both_limits_queue : {"durable":queue_durable, "max_count":8, "max_size": 800}}
+        if queue_durable:
+            fail_list = None
+        else:
+            fail_list = [{"queue":both_limits_queue, "type":"reject", "count":8, "size":800}]
+        self.tx_multi_queue_limit("MultiQueue_MixedLimit", queue_map, exchange_name="Fanout_f%02d" % num,
+                                  msg_size=100, msg_durable=msg_durable, browse=browse, recover=recover,
+                                  fail_list=fail_list, txn_produce=txn_produce, txn_consume=txn_consume)
+
+    def mixed_limit_2(self, num, queue_durable=False, msg_durable=False, browse=False, recover=False,
+                      txn_produce=False, txn_consume=False):
+        """Both count and size polices active with different queues having equal probabilities of triggering
+        the policy.
+
+        Queues: o<num> (no limit), p<num> (max_count=10), q<num> (max_size=800) and r<num> (max_count=8 and
+        max_size=1000); messages are 100 bytes. Unless the queues are durable, "reject" failures are expected
+        at size 800 on q<num> and at count 8 on r<num>."""
+        size_limited_queue = "q%02d" % num
+        count_limited_queue = "r%02d" % num
+        queue_map = {"o%02d" % num : {"durable":queue_durable, "max_count":None, "max_size": None},
+                     "p%02d" % num : {"durable":queue_durable, "max_count":10, "max_size": None},
+                     size_limited_queue : {"durable":queue_durable, "max_count":None, "max_size": 800},
+                     count_limited_queue : {"durable":queue_durable, "max_count":8, "max_size": 1000}}
+        if queue_durable:
+            fail_list = None
+        else:
+            fail_list = [{"queue":size_limited_queue, "type":"reject", "size":800},
+                         {"queue":count_limited_queue, "type":"reject", "count":8}]
+        self.tx_multi_queue_limit("MultiQueue_MixedLimit", queue_map, exchange_name="Fanout_g%02d" % num,
+                                  msg_size=100, msg_durable=msg_durable, browse=browse, recover=recover,
+                                  fail_list=fail_list, txn_produce=txn_produce, txn_consume=txn_consume)
+
+ # --- Non-parameterized test methods - these will be run by Python test framework
---
+
+    # Defaults for the settings passed by the test_*() methods of this class to the parameterized methods
+    _num = None             # passed as "num", which is formatted into the queue and exchange names
+    _queue_durable = False  # passed as "queue_durable"
+    _msg_durable = False    # passed as "msg_durable"
+    _browse = False         # passed as "browse"
+    _recover = False        # passed as "recover"
+    _txn_produce = False    # passed as "txn_produce"
+    _txn_consume = False    # passed as "txn_consume"
+
+    def test_no_limit(self):
+        """No policy test (non-parameterized): calls no_limit() with this class's settings"""
+        settings = dict(queue_durable=self._queue_durable, msg_durable=self._msg_durable, browse=self._browse,
+                        recover=self._recover, txn_produce=self._txn_produce, txn_consume=self._txn_consume)
+        self.no_limit(self._num, **settings)
+
+    def test_max_count(self):
+        """Count policy test (non-parameterized): calls max_count() with this class's settings"""
+        settings = dict(queue_durable=self._queue_durable, msg_durable=self._msg_durable, browse=self._browse,
+                        recover=self._recover, txn_produce=self._txn_produce, txn_consume=self._txn_consume)
+        self.max_count(self._num, **settings)
+
+    def test_max_size(self):
+        """Size policy test (non-parameterized): calls max_size() with this class's settings"""
+        settings = dict(queue_durable=self._queue_durable, msg_durable=self._msg_durable, browse=self._browse,
+                        recover=self._recover, txn_produce=self._txn_produce, txn_consume=self._txn_consume)
+        self.max_size(self._num, **settings)
+
+    def test_dual_max_count(self):
+        """Multiple count policy test (non-parameterized): calls dual_max_count() with this class's settings"""
+        settings = dict(queue_durable=self._queue_durable, msg_durable=self._msg_durable, browse=self._browse,
+                        recover=self._recover, txn_produce=self._txn_produce, txn_consume=self._txn_consume)
+        self.dual_max_count(self._num, **settings)
+
+    def test_dual_max_size(self):
+        """Multiple size policy test (non-parameterized): calls dual_max_size() with this class's settings"""
+        settings = dict(queue_durable=self._queue_durable, msg_durable=self._msg_durable, browse=self._browse,
+                        recover=self._recover, txn_produce=self._txn_produce, txn_consume=self._txn_consume)
+        self.dual_max_size(self._num, **settings)
+
+    def test_mixed_limit_1(self):
+        """Both count and size polices active with the same queue having equal probabilities of triggering the
+        policy (non-parameterized): calls mixed_limit_1() with this class's settings"""
+        settings = dict(queue_durable=self._queue_durable, msg_durable=self._msg_durable, browse=self._browse,
+                        recover=self._recover, txn_produce=self._txn_produce, txn_consume=self._txn_consume)
+        self.mixed_limit_1(self._num, **settings)
+
+    def test_mixed_limit_2(self):
+        """Both count and size polices active with different queues having equal probabilities of triggering
+        the policy (non-parameterized): calls mixed_limit_2() with this class's settings"""
+        settings = dict(queue_durable=self._queue_durable, msg_durable=self._msg_durable, browse=self._browse,
+                        recover=self._recover, txn_produce=self._txn_produce, txn_consume=self._txn_consume)
+        self.mixed_limit_2(self._num, **settings)
+
+# --- Tests ---
+
+class MultiQueueTest(MultiQueueFlowToDisk):
+    """Multi-queue tests: transient messages sent to multiple transient queues"""
+    _num = 1
+
+class MultiDurableQueueTest(MultiQueueFlowToDisk):
+    """Multi-queue tests: transient messages sent to multiple durable queues"""
+    _num = 2
+    _queue_durable = True
+
+class MultiQueueDurableMsgTest(MultiQueueFlowToDisk):
+    """Multi-queue tests: durable messages sent to multiple transient queues"""
+    _num = 3
+    _msg_durable = True
+
+class MultiDurableQueueDurableMsgTest(MultiQueueFlowToDisk):
+    """Multi-queue tests: durable messages sent to multiple durable queues"""
+    _num = 4
+    _queue_durable = True
+    _msg_durable = True
+
+class MultiQueueBrowseTest(MultiQueueFlowToDisk):
+    """Multi-queue tests: transient messages sent to multiple transient queues, with the messages browsed
+    before being consumed"""
+    _num = 5
+    _browse = True
+
+class MultiDurableQueueBrowseTest(MultiQueueFlowToDisk):
+    """Multi-queue tests: transient messages sent to multiple durable queues, with the messages browsed
+    before being consumed"""
+    _num = 6
+    _queue_durable = True
+    _browse = True
+
+class MultiQueueDurableMsgBrowseTest(MultiQueueFlowToDisk):
+    """Multi-queue tests: durable messages sent to multiple transient queues, with the messages browsed
+    before being consumed"""
+    _num = 7
+    _msg_durable = True
+    _browse = True
+
+class MultiDurableQueueDurableMsgBrowseTest(MultiQueueFlowToDisk):
+    """Multi-queue tests: durable messages sent to multiple durable queues, with the messages browsed
+    before being consumed"""
+    _num = 8
+    _queue_durable = True
+    _msg_durable = True
+    _browse = True
+
+class MultiQueueRecoverTest(MultiQueueFlowToDisk):
+    """Multi-queue tests: transient messages sent to multiple transient queues, with the broker
+    terminated/recovered"""
+    _num = 9
+    _recover = True
+
+class MultiDurableQueueRecoverTest(MultiQueueFlowToDisk):
+    """Multi-queue tests: transient messages sent to multiple durable queues, with the broker
+    terminated/recovered"""
+    _num = 10
+    _queue_durable = True
+    _recover = True
+
+class MultiQueueDurableMsgRecoverTest(MultiQueueFlowToDisk):
+    """Multi-queue tests: durable messages sent to multiple transient queues, with the broker
+    terminated/recovered"""
+    _num = 11
+    _msg_durable = True
+    _recover = True
+
+class MultiDurableQueueDurableMsgRecoverTest(MultiQueueFlowToDisk):
+ """Durable messages sent to multiple durable queues and broker
terminated/recovered"""
+ _num = 12
+ _queue_durable = True
+ _msg_durable = True
+ _recover = True
+
+class MultiQueueBrowseRecoverTest(MultiQueueFlowToDisk):
+ """Transient messages sent to multiple transient queues with messages
browsed before broker terminated/recovered and
+ are consumed"""
+ _num = 13
+ _browse = True
+ _recover = True
+
+class MultiDurableQueueBrowseRecoverTest(MultiQueueFlowToDisk):
+ """Transient messages sent to multiple durable queues with messages
browsed before broker terminated/recovered and
+ are consumed"""
+ _num = 14
+ _queue_durable = True
+ _browse = True
+ _recover = True
+
+class MultiQueueDurableMsgBrowseRecoverTest(MultiQueueFlowToDisk):
+ """Durable messages sent to multiple transient queues with messages
browsed before broker terminated/recovered and
+ are consumed"""
+ _num = 15
+ _msg_durable = True
+ _browse = True
+ _recover = True
+
+class MultiDurableQueueDurableMsgBrowseRecoverTest(MultiQueueFlowToDisk):
+ """Durable messages sent to multiple durable queues with messages
browsed before broker terminated/recovered and are
+ consumed"""
+ _num = 16
+ _queue_durable = True
+ _msg_durable = True
+ _browse = True
+ _recover = True
+
+class MultiQueueTxPTest(MultiQueueFlowToDisk):
+ """Transient messages sent to multiple transient queues under
transactional produce"""
+ _num = 17
+ _txn_produce = True
+
+class MultiDurableQueueTxPTest(MultiQueueFlowToDisk):
+ """Transient messages sent to multiple durable queues under
transactional produce"""
+ _num = 18
+ _queue_durable = True
+ _txn_produce = True
+
+class MultiQueueDurableMsgTxPTest(MultiQueueFlowToDisk):
+ """Durable messages sent to multiple transient queues under
transactional produce"""
+ _num = 19
+ _msg_durable = True
+ _txn_produce = True
+
+class MultiDurableQueueDurableMsgTxPTest(MultiQueueFlowToDisk):
+ """Durable messages sent to multiple durable queues under
transactional produce"""
+ _num = 20
+ _queue_durable = True
+ _msg_durable = True
+ _txn_produce = True
+
+class MultiQueueBrowseTxPTest(MultiQueueFlowToDisk):
+ """Transient messages sent to multiple transient queues under
transactional produce with messages browsed before
+ being consumed"""
+ _num = 21
+ _browse = True
+ _txn_produce = True
+
+class MultiDurableQueueBrowseTxPTest(MultiQueueFlowToDisk):
+ """Transient messages sent to multiple durable queues under
transactional produce with messages browsed before
+ being consumed"""
+ _num = 22
+ _queue_durable = True
+ _browse = True
+ _txn_produce = True
+
+class MultiQueueDurableMsgBrowseTxPTest(MultiQueueFlowToDisk):
+ """Durable messages sent to multiple transient queues under
transactional produce with messages browsed before
+ being consumed"""
+ _num = 23
+ _msg_durable = True
+ _browse = True
+ _txn_produce = True
+
+class MultiDurableQueueDurableMsgBrowseTxPTest(MultiQueueFlowToDisk):
+ """Durable messages sent to multiple durable queues under
transactional produce with messages browsed before being
+ consumed"""
+ _num = 24
+ _queue_durable = True
+ _msg_durable = True
+ _browse = True
+ _txn_produce = True
+
+class MultiQueueRecoverTxPTest(MultiQueueFlowToDisk):
+ """Transient messages sent to multiple transient queues under
transactional produce and broker
+ terminated/recovered"""
+ _num = 25
+ _recover = True
+ _txn_produce = True
+
+class MultiDurableQueueRecoverTxPTest(MultiQueueFlowToDisk):
+ """Transient messages sent to multiple durable queues under
transactional produce and broker terminated/recovered"""
+ _num = 26
+ _queue_durable = True
+ _recover = True
+ _txn_produce = True
+
+class MultiQueueDurableMsgRecoverTxPTest(MultiQueueFlowToDisk):
+ """Durable messages sent to multiple transient queues under
transactional produce and broker terminated/recovered"""
+ _num = 27
+ _msg_durable = True
+ _recover = True
+ _txn_produce = True
+
+class MultiDurableQueueDurableMsgRecoverTxPTest(MultiQueueFlowToDisk):
+ """Durable messages sent to multiple durable queues under
transactional produce and broker terminated/recovered"""
+ _num = 28
+ _queue_durable = True
+ _msg_durable = True
+ _recover = True
+ _txn_produce = True
+
+class MultiQueueBrowseRecoverTxPTest(MultiQueueFlowToDisk):
+ """Transient messages sent to multiple transient queues under
transactional produce with messages browsed before
+ broker terminated/recovered and are consumed"""
+ _num = 29
+ _browse = True
+ _recover = True
+ _txn_produce = True
+
+class MultiDurableQueueBrowseRecoverTxPTest(MultiQueueFlowToDisk):
+ """Transient messages sent to multiple durable queues under
transactional produce with messages browsed before
+ broker terminated/recovered and are consumed"""
+ _num = 30
+ _queue_durable = True
+ _browse = True
+ _recover = True
+ _txn_produce = True
+
+class MultiQueueDurableMsgBrowseRecoverTxPTest(MultiQueueFlowToDisk):
+ """Durable messages sent to multiple transient queues under
transactional produce with messages browsed before
+ broker terminated/recovered and are consumed"""
+ _num = 31
+ _msg_durable = True
+ _browse = True
+ _recover = True
+ _txn_produce = True
+
+class MultiDurableQueueDurableMsgBrowseRecoverTxPTest(MultiQueueFlowToDisk):
+ """Durable messages sent to multiple durable queues under
transactional produce with messages browsed before broker
+ terminated/recovered and are consumed"""
+ _num = 32
+ _queue_durable = True
+ _msg_durable = True
+ _browse = True
+ _recover = True
+ _txn_produce = True
+
+class MultiQueueTxCTest(MultiQueueFlowToDisk):
+ """Transient messages sent to multiple transient queues and consumed
transactionally"""
+ _num = 33
+ _txn_consume = True
+
+class MultiDurableQueueTxCTest(MultiQueueFlowToDisk):
+ """Transient messages sent to multiple durable queues and consumed
transactionally"""
+ _num = 34
+ _queue_durable = True
+ _txn_consume = True
+
+class MultiQueueDurableMsgTxCTest(MultiQueueFlowToDisk):
+ """Durable messages sent to multiple transient queues and consumed
transactionally"""
+ _num = 35
+ _msg_durable = True
+ _txn_consume = True
+
+class MultiDurableQueueDurableMsgTxCTest(MultiQueueFlowToDisk):
+ """Durable messages sent to multiple durable queues and consumed
transactionally"""
+ _num = 36
+ _queue_durable = True
+ _msg_durable = True
+ _txn_consume = True
+
+class MultiQueueBrowseTxCTest(MultiQueueFlowToDisk):
+ """Transient messages sent to multiple transient queues with messages
browsed before being consumed
+ transactionally"""
+ _num = 37
+ _browse = True
+ _txn_consume = True
+
+class MultiDurableQueueBrowseTxCTest(MultiQueueFlowToDisk):
+ """Transient messages sent to multiple durable queues with messages
browsed before being consumed transactionally"""
+ _num = 38
+ _queue_durable = True
+ _browse = True
+ _txn_consume = True
+
+class MultiQueueDurableMsgBrowseTxCTest(MultiQueueFlowToDisk):
+ """Durable messages sent to multiple transient queues with messages
browsed before being consumed transactionally"""
+ _num = 39
+ _msg_durable = True
+ _browse = True
+ _txn_consume = True
+
+class MultiDurableQueueDurableMsgBrowseTxCTest(MultiQueueFlowToDisk):
+ """Durable messages sent to multiple durable queues with messages
browsed before being consumed transactionally"""
+ _num = 40
+ _queue_durable = True
+ _msg_durable = True
+ _browse = True
+ _txn_consume = True
+
+class MultiQueueRecoverTxCTest(MultiQueueFlowToDisk):
+ """Transient messages sent to multiple transient queues and broker
terminated/recovered before being consumed
+ transactionally"""
+ _num = 41
+ _recover = True
+ _txn_consume = True
+
+class MultiDurableQueueRecoverTxCTest(MultiQueueFlowToDisk):
+ """Transient messages sent to multiple durable queues and broker
terminated/recovered before being consumed
+ transactionally"""
+ _num = 42
+ _queue_durable = True
+ _recover = True
+ _txn_consume = True
+
+class MultiQueueDurableMsgRecoverTxCTest(MultiQueueFlowToDisk):
+ """Durable messages sent to multiple transient queues and broker
terminated/recovered before being consumed
+ transactionally"""
+ _num = 43
+ _msg_durable = True
+ _recover = True
+ _txn_consume = True
+
+class MultiDurableQueueDurableMsgRecoverTxCTest(MultiQueueFlowToDisk):
+ """Durable messages sent to multiple durable queues and broker
terminated/recovered before being consumed
+ transactionally"""
+ _num = 44
+ _queue_durable = True
+ _msg_durable = True
+ _recover = True
+ _txn_consume = True
+
+class MultiQueueBrowseRecoverTxCTest(MultiQueueFlowToDisk):
+ """Transient messages sent to multiple transient queues with messages
browsed before broker terminated/recovered and
+ are consumed transactionally"""
+ _num = 45
+ _browse = True
+ _recover = True
+ _txn_consume = True
+
+class MultiDurableQueueBrowseRecoverTxCTest(MultiQueueFlowToDisk):
+ """Transient messages sent to multiple durable queues with messages
browsed before broker terminated/recovered and
+ are consumed transactionally"""
+ _num = 46
+ _queue_durable = True
+ _browse = True
+ _recover = True
+ _txn_consume = True
+
+class MultiQueueDurableMsgBrowseRecoverTxCTest(MultiQueueFlowToDisk):
+ """Durable messages sent to multiple transient queues with messages
browsed before broker terminated/recovered and
+ are consumed transactionally"""
+ _num = 47
+ _msg_durable = True
+ _browse = True
+ _recover = True
+ _txn_consume = True
+
+class MultiDurableQueueDurableMsgBrowseRecoverTxCTest(MultiQueueFlowToDisk):
+ """Durable messages sent to multiple durable queues with messages
browsed before broker terminated/recovered and
+ are consumed transactionally"""
+ _num = 48
+ _queue_durable = True
+ _msg_durable = True
+ _browse = True
+ _recover = True
+ _txn_consume = True
+
+class MultiQueueTxPTxCTest(MultiQueueFlowToDisk):
+ """Transient messages sent to multiple transient queues under
transactional produce and are consumed
+ transactionally"""
+ _num = 49
+ _txn_produce = True
+ _txn_consume = True
+
+class MultiDurableQueueTxPTxCTest(MultiQueueFlowToDisk):
+ """Transient messages sent to multiple durable queues under
transactional produce and are consumed
+ transactionally"""
+ _num = 50
+ _queue_durable = True
+ _txn_produce = True
+ _txn_consume = True
+
+class MultiQueueDurableMsgTxPTxCTest(MultiQueueFlowToDisk):
+ """Durable messages sent to multiple transient queues under
transactional produce and are consumed
+ transactionally"""
+ _num = 51
+ _msg_durable = True
+ _txn_produce = True
+ _txn_consume = True
+
+class MultiDurableQueueDurableMsgTxPTxCTest(MultiQueueFlowToDisk):
+ """Durable messages sent to multiple durable queues under
transactional produce and are consumed
+ transactionally"""
+ _num = 52
+ _queue_durable = True
+ _msg_durable = True
+ _txn_produce = True
+ _txn_consume = True
+
+class MultiQueueBrowseTxPTxCTest(MultiQueueFlowToDisk):
+ """Transient messages sent to multiple transient queues under
transactional produce with messages browsed before
+ being consumed transactionally"""
+ _num = 53
+ _browse = True
+ _txn_produce = True
+ _txn_consume = True
+
+class MultiDurableQueueBrowseTxPTxCTest(MultiQueueFlowToDisk):
+ """Transient messages sent to multiple durable queues under
transactional produce with messages browsed before
+ being consumed transactionally"""
+ _num = 54
+ _queue_durable = True
+ _browse = True
+ _txn_produce = True
+ _txn_consume = True
+
+class MultiQueueDurableMsgBrowseTxPTxCTest(MultiQueueFlowToDisk):
+ """Durable messages sent to multiple transient queues under
transactional produce with messages browsed before
+ being consumed transactionally"""
+ _num = 55
+ _msg_durable = True
+ _browse = True
+ _txn_produce = True
+ _txn_consume = True
+
+class MultiDurableQueueDurableMsgBrowseTxPTxCTest(MultiQueueFlowToDisk):
+ """Durable messages sent to multiple durable queues under
transactional produce with messages browsed before being
+ consumed transactionally"""
+ _num = 56
+ _queue_durable = True
+ _msg_durable = True
+ _browse = True
+ _txn_produce = True
+ _txn_consume = True
+
+class MultiQueueRecoverTxPTxCTest(MultiQueueFlowToDisk):
+ """Transient messages sent to multiple transient queues under
transactional produce and broker
+ terminated/recovered before they are consumed transactionally"""
+ _num = 57
+ _recover = True
+ _txn_produce = True
+ _txn_consume = True
+
+class MultiDurableQueueRecoverTxPTxCTest(MultiQueueFlowToDisk):
+ """Transient messages sent to multiple durable queues under
transactional produce and broker terminated/recovered
+ before they are consumed transactionally"""
+ _num = 58
+ _queue_durable = True
+ _recover = True
+ _txn_produce = True
+ _txn_consume = True
+
+class MultiQueueDurableMsgRecoverTxPTxCTest(MultiQueueFlowToDisk):
+ """Durable messages sent to multiple transient queues under
transactional produce and broker terminated/recovered
+ before they are consumed transactionally"""
+ _num = 59
+ _msg_durable = True
+ _recover = True
+ _txn_produce = True
+ _txn_consume = True
+
+class MultiDurableQueueDurableMsgRecoverTxPTxCTest(MultiQueueFlowToDisk):
+ """Durable messages sent to multiple durable queues under
transactional produce and broker terminated/recovered
+ before they are consumed transactionally"""
+ _num = 60
+ _queue_durable = True
+ _msg_durable = True
+ _recover = True
+ _txn_produce = True
+ _txn_consume = True
+
+class MultiQueueBrowseRecoverTxPTxCTest(MultiQueueFlowToDisk):
+ """Transient messages sent to multiple transient queues under
transactional produce with messages browsed before
+ broker terminated/recovered and are consumed transactionally"""
+ _num = 61
+ _browse = True
+ _recover = True
+ _txn_produce = True
+
+class MultiDurableQueueBrowseRecoverTxPTxCTest(MultiQueueFlowToDisk):
+ """Transient messages sent to multiple durable queues under
transactional produce with messages browsed before
+ broker terminated/recovered and are consumed transactionally"""
+ _num = 62
+ _queue_durable = True
+ _browse = True
+ _recover = True
+ _txn_produce = True
+ _txn_consume = True
+
+class MultiQueueDurableMsgBrowseRecoverTxPTxCTest(MultiQueueFlowToDisk):
+ """Durable messages sent to multiple transient queues under
transactional produce with messages browsed before
+ broker terminated/recovered and are consumed transactionally"""
+ _num = 63
+ _msg_durable = True
+ _browse = True
+ _recover = True
+ _txn_produce = True
+ _txn_consume = True
+
+class MultiDurableQueueDurableMsgBrowseRecoverTxPTxCTest(MultiQueueFlowToDisk):
+ """Durable messages sent to multiple durable queues under
transactional produce with messages browsed before broker
+ terminated/recovered and are consumed transactionally"""
+ _num = 64
+ _queue_durable = True
+ _msg_durable = True
+ _browse = True
+ _recover = True
+ _txn_produce = True
+ _txn_consume = True
+
+ # --- Long and randomized tests ---
+
+# def test_12_Randomized(self):
+# """Randomized flow-to-disk tests"""
+# seed = long(1000.0 * time.time())
+# print "seed=0x%x" % seed
+# random.seed(seed)
+# for index in range(0, 10):
+# self.randomLimit(index)
Copied: store/trunk/cpp/tests/python_tests/store_test.py (from rev 3905,
store/trunk/cpp/tests/new_python_tests/store_test.py)
===================================================================
--- store/trunk/cpp/tests/python_tests/store_test.py (rev 0)
+++ store/trunk/cpp/tests/python_tests/store_test.py 2010-04-14 14:41:04 UTC (rev 3910)
@@ -0,0 +1,407 @@
+"""
+Copyright (c) 2008 Red Hat, Inc.
+
+This file is part of the Qpid async store library msgstore.so.
+
+This library is free software; you can redistribute it and/or
+modify it under the terms of the GNU Lesser General Public
+License as published by the Free Software Foundation; either
+version 2.1 of the License, or (at your option) any later version.
+
+This library is distributed in the hope that it will be useful,
+but WITHOUT ANY WARRANTY; without even the implied warranty of
+MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+Lesser General Public License for more details.
+
+You should have received a copy of the GNU Lesser General Public
+License along with this library; if not, write to the Free Software
+Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301
+ USA
+
+The GNU Lesser General Public License is available in the file COPYING.
+"""
+
+import re
+from qpid.brokertest import BrokerTest
+from qpid.messaging import Empty
+from qmf.console import Session
+
+
+def store_args():
+ """Return the broker args necessary to load the async
store"""
+ assert BrokerTest.store_lib
+ return ["--load-module", BrokerTest.store_lib]
+
+class Qmf:
+ """
+ QMF functions not yet available in the new QMF API. Remove this and replace with new
API when it becomes available.
+ """
+ def __init__(self, broker):
+ self.__session = Session()
+ self.__broker =
self.__session.addBroker("amqp://localhost:%d"%broker.port())
+
+ def add_exchange(self, exchange_name, exchange_type, alt_exchange_name=None,
passive=False, durable=False,
+ arguments = None):
+ """Add a new exchange"""
+ amqp_session = self.__broker.getAmqpSession()
+ if arguments == None:
+ arguments = {}
+ if alt_exchange_name:
+ amqp_session.exchange_declare(exchange=exchange_name, type=exchange_type,
+ alternate_exchange=alt_exchange_name,
passive=passive, durable=durable,
+ arguments=arguments)
+ else:
+ amqp_session.exchange_declare(exchange=exchange_name, type=exchange_type,
passive=passive, durable=durable,
+ arguments=arguments)
+
+ def add_queue(self, queue_name, alt_exchange_name=None, passive=False, durable=False,
arguments = None):
+ """Add a new queue"""
+ amqp_session = self.__broker.getAmqpSession()
+ if arguments == None:
+ arguments = {}
+ if alt_exchange_name:
+ amqp_session.queue_declare(queue_name, alternate_exchange=alt_exchange_name,
passive=passive,
+ durable=durable, arguments=arguments)
+ else:
+ amqp_session.queue_declare(queue_name, passive=passive, durable=durable,
arguments=arguments)
+
+ def delete_queue(self, queue_name):
+ """Delete an existing queue"""
+ amqp_session = self.__broker.getAmqpSession()
+ amqp_session.queue_delete(queue_name)
+
+ def _query(self, name, _class, package, alt_exchange_name=None):
+ """Qmf query function which can optionally look for the presence
of an alternate exchange name"""
+ try:
+ obj_list = self.__session.getObjects(_class=_class, _package=package)
+ found = False
+ for obj in obj_list:
+ if obj.name == name:
+ found = True
+ if alt_exchange_name != None:
+ alt_exch_list =
self.__session.getObjects(_objectId=obj.altExchange)
+ if len(alt_exch_list) == 0 or alt_exch_list[0].name !=
alt_exchange_name:
+ return False
+ break
+ return found
+ except Exception:
+ return False
+
+
+ def query_exchange(self, exchange_name, alt_exchange_name=None):
+ """Test for the presence of an exchange, and optionally whether it
has an alternate exchange set to a known
+ value."""
+ return self._query(exchange_name, "exchange",
"org.apache.qpid.broker", alt_exchange_name)
+
+ def query_queue(self, queue_name, alt_exchange_name=None):
+ """Test for the presence of a queue, and optionally whether it
has an alternate exchange set to a known
+ value."""
+ return self._query(queue_name, "queue",
"org.apache.qpid.broker", alt_exchange_name)
+
+ def queue_message_count(self, queue_name):
+ """Query the number of messages on a queue: the msgDepth of the first queue object returned by QMF
+ for queue_name. If no object is returned, the function falls off the end and implicitly returns None."""
+ queue_list = self.__session.getObjects(_class="queue",
_name=queue_name)
+ if len(queue_list):
+ return queue_list[0].msgDepth
+
+ def queue_empty(self, queue_name):
+ """Check if a queue is empty (has no messages
waiting)"""
+ return self.queue_message_count(queue_name) == 0
+
+
+class StoreTest(BrokerTest):
+ """
+ This subclass of BrokerTest adds some convenience test/check functions
+ """
+
+ def _chk_empty(self, queue, receiver):
+ """Check if a queue is empty (has no more
messages)"""
+ try:
+ msg = receiver.fetch(timeout=0)
+ self.assert_(False, "Queue \"%s\" not empty: found message:
%s" % (queue, msg))
+ except Empty:
+ pass
+
+ @staticmethod
+ def make_message(msg_count, msg_size):
+ """Make message content. Format: 'abcdef....' followed by
'msg-NNNN', where NNNN is the message count"""
+ msg = "msg-%04d" % msg_count
+ msg_len = len(msg)
+ buff = ""
+ if msg_size != None and msg_size > msg_len:
+ for index in range(0, msg_size - msg_len):
+ if index == msg_size - msg_len - 1:
+ buff += "-"
+ else:
+ buff += chr(ord('a') + (index % 26))
+ return buff + msg
+
+ # Functions for formatting address strings
+
+ @staticmethod
+ def _fmt_csv(string_list, list_braces = None):
+ """Format a list using comma-separation. Braces are optionally
added."""
+ if len(string_list) == 0:
+ return ""
+ first = True
+ str_ = ""
+ if list_braces != None:
+ str_ += list_braces[0]
+ for string in string_list:
+ if string != None:
+ if first:
+ first = False
+ else:
+ str_ += ", "
+ str_ += string
+ if list_braces != None:
+ str_ += list_braces[1]
+ return str_
+
+ def _fmt_map(self, string_list):
+ """Format a map {l1, l2, l3, ...} from a string list. Each item in
the list must be a formatted map
+ element('key:val')."""
+ return self._fmt_csv(string_list, list_braces="{}")
+
+ def _fmt_list(self, string_list):
+ """Format a list [l1, l2, l3, ...] from a string
list."""
+ return self._fmt_csv(string_list, list_braces="[]")
+
+ def addr_fmt(self, node_name, **kwargs):
+ """Generic AMQP to new address formatter. Takes common (but not
all) AMQP options and formats an address
+ string."""
+ # Get keyword args
+ node_subject = kwargs.get("node_subject")
+ create_policy = kwargs.get("create_policy")
+ delete_policy = kwargs.get("delete_policy")
+ assert_policy = kwargs.get("assert_policy")
+ mode = kwargs.get("mode")
+ link = kwargs.get("link", False)
+ link_name = kwargs.get("link_name")
+ node_type = kwargs.get("node_type")
+ durable = kwargs.get("durable", False)
+ link_reliability = kwargs.get("link_reliability")
+ x_declare_list = kwargs.get("x_declare_list", [])
+ x_bindings_list = kwargs.get("x_bindings_list", [])
+ x_subscribe_list = kwargs.get("x_subscribe_list", [])
+
+ node_flag = not link and (node_type != None or durable or len(x_declare_list)
> 0 or len(x_bindings_list) > 0)
+ link_flag = link and (link_name != None or durable or link_reliability != None or
len(x_declare_list) > 0 or
+ len(x_bindings_list) > 0 or len(x_subscribe_list) >
0)
+ assert not (node_flag and link_flag)
+
+ opt_str_list = []
+ if create_policy != None:
+ opt_str_list.append("create: %s" % create_policy)
+ if delete_policy != None:
+ opt_str_list.append("delete: %s" % delete_policy)
+ if assert_policy != None:
+ opt_str_list.append("assert: %s" % assert_policy)
+ if mode != None:
+ opt_str_list.append("mode: %s" % mode)
+ if node_flag or link_flag:
+ node_str_list = []
+ if link_name != None:
+ node_str_list.append("name: \"%s\"" % link_name)
+ if node_type != None:
+ node_str_list.append("type: %s" % node_type)
+ if durable:
+ node_str_list.append("durable: True")
+ if link_reliability != None:
+ node_str_list.append("reliability: %s" % link_reliability)
+ if len(x_declare_list) > 0:
+ node_str_list.append("x-declare: %s" %
self._fmt_map(x_declare_list))
+ if len(x_bindings_list) > 0:
+ node_str_list.append("x-bindings: %s" %
self._fmt_list(x_bindings_list))
+ if len(x_subscribe_list) > 0:
+ node_str_list.append("x-subscribe: %s" %
self._fmt_map(x_subscribe_list))
+ if node_flag:
+ opt_str_list.append("node: %s" % self._fmt_map(node_str_list))
+ else:
+ opt_str_list.append("link: %s" % self._fmt_map(node_str_list))
+ addr_str = node_name
+ if node_subject != None:
+ addr_str += "/%s" % node_subject
+ if len(opt_str_list) > 0:
+ addr_str += "; %s" % self._fmt_map(opt_str_list)
+ return addr_str
+
+ def snd_addr(self, node_name, **kwargs):
+ """ Create a send (node) address"""
+ # Get keyword args
+ topic = kwargs.get("topic")
+ topic_flag = kwargs.get("topic_flag", False)
+ auto_create = kwargs.get("auto_create", True)
+ auto_delete = kwargs.get("auto_delete", False)
+ durable = kwargs.get("durable", False)
+ exclusive = kwargs.get("exclusive", False)
+ ftd_count = kwargs.get("ftd_count")
+ ftd_size = kwargs.get("ftd_size")
+ policy = kwargs.get("policy", "flow-to-disk")
+ exchage_type = kwargs.get("exchage_type")
+
+ create_policy = None
+ if auto_create:
+ create_policy = "always"
+ delete_policy = None
+ if auto_delete:
+ delete_policy = "always"
+ node_type = None
+ if topic != None or topic_flag:
+ node_type = "topic"
+ x_declare_list = ["\"exclusive\": %s" % exclusive]
+ if ftd_count != None or ftd_size != None:
+ queue_policy = ["\'qpid.policy_type\': %s" % policy]
+ if ftd_count:
+ queue_policy.append("\'qpid.max_count\': %d" %
ftd_count)
+ if ftd_size:
+ queue_policy.append("\'qpid.max_size\': %d" %
ftd_size)
+ x_declare_list.append("arguments: %s" %
self._fmt_map(queue_policy))
+ if exchage_type != None:
+ x_declare_list.append("type: %s" % exchage_type)
+
+ return self.addr_fmt(node_name, topic=topic, create_policy=create_policy,
delete_policy=delete_policy,
+ node_type=node_type, durable=durable,
x_declare_list=x_declare_list)
+
+ def rcv_addr(self, node_name, **kwargs):
+ """ Create a receive (link) address"""
+ # Get keyword args
+ auto_create = kwargs.get("auto_create", True)
+ auto_delete = kwargs.get("auto_delete", False)
+ link_name = kwargs.get("link_name")
+ durable = kwargs.get("durable", False)
+ browse = kwargs.get("browse", False)
+ exclusive = kwargs.get("exclusive", False)
+ binding_list = kwargs.get("binding_list", [])
+ ftd_count = kwargs.get("ftd_count")
+ ftd_size = kwargs.get("ftd_size")
+ policy = kwargs.get("policy", "flow-to-disk")
+
+ create_policy = None
+ if auto_create:
+ create_policy = "always"
+ delete_policy = None
+ if auto_delete:
+ delete_policy = "always"
+ mode = None
+ if browse:
+ mode = "browse"
+ x_declare_list = ["\"exclusive\": %s" % exclusive]
+ if ftd_count != None or ftd_size != None:
+ queue_policy = ["\'qpid.policy_type\': %s" % policy]
+ if ftd_count:
+ queue_policy.append("\'qpid.max_count\': %d" %
ftd_count)
+ if ftd_size:
+ queue_policy.append("\'qpid.max_size\': %d" %
ftd_size)
+ x_declare_list.append("arguments: %s" %
self._fmt_map(queue_policy))
+ x_bindings_list = []
+ for binding in binding_list:
+ x_bindings_list.append("{exchange: %s, key: %s}" % binding)
+ return self.addr_fmt(node_name, create_policy=create_policy,
delete_policy=delete_policy, mode=mode, link=True,
+ link_name=link_name, durable=durable,
x_declare_list=x_declare_list,
+ x_bindings_list=x_bindings_list)
+
+ def check_message(self, broker, queue, exp_msg, transactional=False, empty=False,
ack=True, browse=False):
+ """Check that a message is on a queue by dequeuing it and
comparing it to the expected message"""
+ return self.check_messages(broker, queue, [exp_msg], transactional, empty, ack,
browse)
+
+ def check_messages(self, broker, queue, exp_msg_list, transactional=False,
empty=False, ack=True, browse=False,
+ emtpy_flag=False):
+ """Check that messages is on a queue by dequeuing them and
comparing them to the expected messages"""
+ if emtpy_flag:
+ num_msgs = 0
+ else:
+ num_msgs = len(exp_msg_list)
+ ssn = broker.connect().session(transactional=transactional)
+ rcvr = ssn.receiver(self.rcv_addr(queue, browse=browse), capacity=num_msgs)
+ if num_msgs > 0:
+ try:
+ recieved_msg_list = [rcvr.fetch(timeout=0) for i in range(num_msgs)]
+ except Empty:
+ self.assert_(False, "Queue \"%s\" is empty, unable to
retrieve expected message %d." % (queue, i))
+ for i in range(0, len(recieved_msg_list)):
+ self.assertEqual(recieved_msg_list[i].content, exp_msg_list[i].content)
+ self.assertEqual(recieved_msg_list[i].correlation_id,
exp_msg_list[i].correlation_id)
+ if empty:
+ self._chk_empty(queue, rcvr)
+ if ack:
+ ssn.acknowledge()
+ if transactional:
+ ssn.commit()
+ ssn.connection.close()
+ else:
+ if transactional:
+ ssn.commit()
+ return ssn
+
+ # Functions for finding strings in the broker log file (or other files)
+
+ @staticmethod
+ def _read_file(file_name):
+ """Returns the content of file named file_name as a
string"""
+ file_handle = file(file_name)
+ try:
+ return file_handle.read()
+ finally:
+ file_handle.close()
+
+ def _get_hits(self, broker, search):
+ """Find all occurrences of the search in the broker log
(eliminating possible duplicates from msgs on multiple
+ queues)"""
+ # TODO: Use sets when RHEL-4 is no longer supported
+ hits = []
+ for hit in search.findall(self._read_file(broker.log)):
+ if hit not in hits:
+ hits.append(hit)
+ return hits
+
+ def _reconsile_hits(self, broker, ftd_msgs, release_hits):
+ """Remove entries from list release_hits if they match the message
id in ftd_msgs. Check for remaining
+ release_hits."""
+ for msg in ftd_msgs:
+ found = False
+ for hit in release_hits:
+ if str(msg.id) in hit:
+ release_hits.remove(hit)
+ #print "Found %s in %s" % (msg.id, broker.log)
+ found = True
+ break
+ if not found:
+ self.assert_(False, "Unable to locate released message %s in log
%s" % (msg.id, broker.log))
+ if len(release_hits) > 0:
+ err = "Messages were unexpectedly released in log %s:\n" %
broker.log
+ for hit in release_hits:
+ err += " %s\n" % hit
+ self.assert_(False, err)
+
+ def check_msg_release(self, broker, ftd_msgs):
+ """ Check for 'Content released' messages in broker log
for messages in ftd_msgs"""
+ hits = self._get_hits(broker, re.compile("debug Message
id=\"[0-9a-f-]{36}\"; pid=0x[0-9a-f]+: "
+ "Content released$",
re.MULTILINE))
+ self._reconsile_hits(broker, ftd_msgs, hits)
+
+ def check_msg_release_on_commit(self, broker, ftd_msgs):
+ """ Check for 'Content released on commit' messages in
broker log for messages in ftd_msgs"""
+ hits = self._get_hits(broker, re.compile("debug Message
id=\"[0-9a-f-]{36}\"; pid=0x[0-9a-f]+: "
+ "Content released on commit$",
re.MULTILINE))
+ self._reconsile_hits(broker, ftd_msgs, hits)
+
+ def check_msg_release_on_recover(self, broker, ftd_msgs):
+ """ Check for 'Content released after recovery' messages
in broker log for messages in ftd_msgs"""
+ hits = self._get_hits(broker, re.compile("debug Message
id=\"[0-9a-f-]{36}\"; pid=0x[0-9a-f]+: "
+ "Content released after
recovery$", re.MULTILINE))
+ self._reconsile_hits(broker, ftd_msgs, hits)
+
+ def check_msg_block(self, broker, ftd_msgs):
+ """Check for 'Content release blocked' messages in broker
log for messages in ftd_msgs"""
+ hits = self._get_hits(broker, re.compile("debug Message
id=\"[0-9a-f-]{36}\"; pid=0x[0-9a-f]+: "
+ "Content release blocked$",
re.MULTILINE))
+ self._reconsile_hits(broker, ftd_msgs, hits)
+
+ def check_msg_block_on_commit(self, broker, ftd_msgs):
+ """Check for 'Content release blocked on commit' messages in broker
log for messages in ftd_msgs"""
+ hits = self._get_hits(broker, re.compile("debug Message
id=\"[0-9a-f-]{36}\"; pid=0x[0-9a-f]+: "
+ "Content release blocked on
commit$", re.MULTILINE))
+ self._reconsile_hits(broker, ftd_msgs, hits)
+
+
Modified: store/trunk/cpp/tests/run_long_python_tests
===================================================================
--- store/trunk/cpp/tests/run_long_python_tests 2010-04-14 14:22:16 UTC (rev 3909)
+++ store/trunk/cpp/tests/run_long_python_tests 2010-04-14 14:41:04 UTC (rev 3910)
@@ -21,4 +21,4 @@
#
# The GNU Lesser General Public License is available in the file COPYING.
-./run_new_python_tests LONG_TEST
+./run_python_tests LONG_TEST
Deleted: store/trunk/cpp/tests/run_new_python_tests
===================================================================
--- store/trunk/cpp/tests/run_new_python_tests 2010-04-14 14:22:16 UTC (rev 3909)
+++ store/trunk/cpp/tests/run_new_python_tests 2010-04-14 14:41:04 UTC (rev 3910)
@@ -1,73 +0,0 @@
-#!/bin/bash
-#
-# Copyright (c) 2008, 2009 Red Hat, Inc.
-#
-# This file is part of the Qpid async store library msgstore.so.
-#
-# This library is free software; you can redistribute it and/or
-# modify it under the terms of the GNU Lesser General Public
-# License as published by the Free Software Foundation; either
-# version 2.1 of the License, or (at your option) any later version.
-#
-# This library is distributed in the hope that it will be useful,
-# but WITHOUT ANY WARRANTY; without even the implied warranty of
-# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
-# Lesser General Public License for more details.
-#
-# You should have received a copy of the GNU Lesser General Public
-# License along with this library; if not, write to the Free Software
-# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301
-# USA
-#
-# The GNU Lesser General Public License is available in the file COPYING.
-
-if test -z ${QPID_DIR} ; then
- cat <<EOF
-
- =========== WARNING: PYTHON TESTS DISABLED ==============
-
- QPID_DIR not set.
-
- ===========================================================
-
-EOF
- exit
-fi
-
-. `dirname $0`/tests_env.sh
-
-echo "Running Python tests..."
-
-case $1 in
- SHORT_TEST)
- DEFAULT_PYTHON_TESTS="*.flow_to_disk.SimpleMaxSizeCountTest.test_durable_browse_recover *.flow_to_disk.MultiDurableQueueDurableMsgBrowseRecoverTxPTxCTest.test_mixed_limit_2";;
- LONG_TEST)
- DEFAULT_PYTHON_TESTS=;;
- *)
- DEFAULT_PYTHON_TESTS="*.flow_to_disk.SimpleMaxSizeCountTest.* *.flow_to_disk.MultiDurableQueueDurableMsg*.test_mixed_limit_1";;
-esac
-
-#if test -z $1; then
-# DEFAULT_PYTHON_TESTS="*.flow_to_disk.SimpleMaxSizeCountTest.* *.flow_to_disk.MultiDurableQueueDurableMsg*.test_mixed_limit_1"
-#else
-# if test x$1 == xSHORT_TEST; then
-# DEFAULT_PYTHON_TESTS="*.flow_to_disk.SimpleMaxSizeCountTest.test_durable_browse_recover *.flow_to_disk.MultiDurableQueueDurableMsgBrowseRecoverTxPTxCTest.test_mixed_limit_2"
-# else
-# DEFAULT_PYTHON_TESTS=$*
-# fi
-#fi
-
-PYTHON_TESTS=${PYTHON_TESTS:-${DEFAULT_PYTHON_TESTS}}
-
-OUTDIR=new_python_tests.tmp
-rm -rf $OUTDIR
-
-# To debug a test, add the following options to the end of the following line:
-# -v DEBUG -c qpid.messaging.io.ops [*.testName]
-${PYTHON_DIR}/qpid-python-test -m new_python_tests -I ${FAILING_PYTHON_TESTS} ${PYTHON_TESTS} -DOUTDIR=$OUTDIR
-RETCODE=$?
-
-if test x${RETCODE} != x0; then
- exit 1;
-fi
-exit 0
Copied: store/trunk/cpp/tests/run_python_tests (from rev 3905, store/trunk/cpp/tests/run_new_python_tests)
===================================================================
--- store/trunk/cpp/tests/run_python_tests (rev 0)
+++ store/trunk/cpp/tests/run_python_tests 2010-04-14 14:41:04 UTC (rev 3910)
@@ -0,0 +1,67 @@
+#!/bin/bash
+#
+# Copyright (c) 2008, 2009 Red Hat, Inc.
+#
+# This file is part of the Qpid async store library msgstore.so.
+#
+# This library is free software; you can redistribute it and/or
+# modify it under the terms of the GNU Lesser General Public
+# License as published by the Free Software Foundation; either
+# version 2.1 of the License, or (at your option) any later version.
+#
+# This library is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+# Lesser General Public License for more details.
+#
+# You should have received a copy of the GNU Lesser General Public
+# License along with this library; if not, write to the Free Software
+# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301
+# USA
+#
+# The GNU Lesser General Public License is available in the file COPYING.
+
+if test -z ${QPID_DIR} ; then
+ cat <<EOF
+
+ =========== WARNING: PYTHON TESTS DISABLED ==============
+
+ QPID_DIR not set.
+
+ ===========================================================
+
+EOF
+ exit
+fi
+
+. `dirname $0`/tests_env.sh
+
+MODULENAME=python_tests
+
+echo "Running Python tests in module ${MODULENAME}..."
+
+case x$1 in
+ xSHORT_TEST)
+ DEFAULT_PYTHON_TESTS="*.client_persistence.ExchangeQueueTests.* *.flow_to_disk.SimpleMaxSizeCountTest.test_durable_browse_recover *.flow_to_disk.MultiDurableQueueDurableMsgBrowseRecoverTxPTxCTest.test_mixed_limit_2" ;;
+ xLONG_TEST)
+ DEFAULT_PYTHON_TESTS= ;;
+ x)
+ DEFAULT_PYTHON_TESTS="*.client_persistence.* *.flow_to_disk.SimpleMaxSizeCountTest.* *.flow_to_disk.MultiDurableQueue*.test_mixed_limit_1 *.flow_to_disk.MultiQueue*.test_mixed_limit_1" ;;
+ *)
+ DEFAULT_PYTHON_TESTS=$1
+esac
+
+PYTHON_TESTS=${PYTHON_TESTS:-${DEFAULT_PYTHON_TESTS}}
+
+OUTDIR=${MODULENAME}.tmp
+rm -rf $OUTDIR
+
+# To debug a test, add the following options to the end of the following line:
+# -v DEBUG -c qpid.messaging.io.ops [*.testName]
+${PYTHON_DIR}/qpid-python-test -m ${MODULENAME} -I ${FAILING_PYTHON_TESTS} ${PYTHON_TESTS} -DOUTDIR=$OUTDIR
+RETCODE=$?
+
+if test x${RETCODE} != x0; then
+ exit 1;
+fi
+exit 0
Modified: store/trunk/cpp/tests/run_short_python_tests
===================================================================
--- store/trunk/cpp/tests/run_short_python_tests 2010-04-14 14:22:16 UTC (rev 3909)
+++ store/trunk/cpp/tests/run_short_python_tests 2010-04-14 14:41:04 UTC (rev 3910)
@@ -21,4 +21,4 @@
#
# The GNU Lesser General Public License is available in the file COPYING.
-./run_new_python_tests SHORT_TEST
+./run_python_tests SHORT_TEST