rhmessaging commits: r4475 - in store/trunk/cpp: tests and 1 other directory.
by rhmessaging-commits@lists.jboss.org
Author: aconway
Date: 2011-09-01 15:27:52 -0400 (Thu, 01 Sep 2011)
New Revision: 4475
Added:
store/trunk/cpp/tests/cluster/system_test.sh
Modified:
store/trunk/cpp/docs/Makefile.am
store/trunk/cpp/tests/cluster/Makefile.am
store/trunk/cpp/tests/persistence.py
store/trunk/cpp/tests/start_broker
Log:
Bug 727182, QPID-3384 - Support DTX transactions in a cluster
Add system_test.sh to cluster test suite for better coverage of DTX.
Modified: store/trunk/cpp/docs/Makefile.am
===================================================================
--- store/trunk/cpp/docs/Makefile.am 2011-08-31 14:52:42 UTC (rev 4474)
+++ store/trunk/cpp/docs/Makefile.am 2011-09-01 19:27:52 UTC (rev 4475)
@@ -38,7 +38,7 @@
doxygen:
if DOXYGEN
- @doxygen jrnl_tmpl.dox
+ @doxygen ${srcdir}/jrnl_tmpl.dox
# FIXME: doxygen seems to create files that do not compile under latex on 64-bit
# so the following section is disabled until this is sorted out.
# @make -C latex
Modified: store/trunk/cpp/tests/cluster/Makefile.am
===================================================================
--- store/trunk/cpp/tests/cluster/Makefile.am 2011-08-31 14:52:42 UTC (rev 4474)
+++ store/trunk/cpp/tests/cluster/Makefile.am 2011-09-01 19:27:52 UTC (rev 4475)
@@ -37,7 +37,8 @@
TESTS = \
run_cpp_cluster_tests \
- run_python_cluster_tests
+ run_python_cluster_tests \
+ system_test.sh
LONG_TESTS = \
run_long_python_cluster_tests
Added: store/trunk/cpp/tests/cluster/system_test.sh
===================================================================
--- store/trunk/cpp/tests/cluster/system_test.sh (rev 0)
+++ store/trunk/cpp/tests/cluster/system_test.sh 2011-09-01 19:27:52 UTC (rev 4475)
@@ -0,0 +1,54 @@
+#!/bin/bash
+
+# Copyright (c) 2007, 2008 Red Hat, Inc.
+#
+# This file is part of the Qpid async store library msgstore.so.
+#
+# This library is free software; you can redistribute it and/or
+# modify it under the terms of the GNU Lesser General Public
+# License as published by the Free Software Foundation; either
+# version 2.1 of the License, or (at your option) any later version.
+#
+# This library is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+# Lesser General Public License for more details.
+#
+# You should have received a copy of the GNU Lesser General Public
+# License along with this library; if not, write to the Free Software
+# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301
+# USA
+#
+# The GNU Lesser General Public License is available in the file COPYING.
+
+
+error() { echo $*; exit 1; }
+
+# Make sure $QPID_DIR contains what we need.
+if ! test -d "$QPID_DIR" ; then
+ echo "WARNING: QPID_DIR is not set skipping system tests."
+ exit
+fi
+STORE_LIB=../../lib/.libs/msgstore.so
+CLUSTER_LIB=$QPID_BLD/src/.libs/cluster.so
+xml_spec=$QPID_DIR/specs/amqp.0-10-qpid-errata.xml
+test -f $xml_spec || error "$xml_spec not found: invalid \$QPID_DIR ?"
+export PYTHONPATH=$QPID_DIR/python:$QPID_DIR/extras/qmf/src/py
+
+echo "Using directory $TMP_DATA_DIR"
+
+fail=0
+
+# Run the broker as part of a cluster.
+BROKER_OPTS="--no-module-dir --load-module=$STORE_LIB --load-module=$CLUSTER_LIB --data-dir=$TMP_DATA_DIR --auth=no --wcache-page-size 16 --cluster-name=$HOSTNAME.$$"
+run_tests() {
+ for p in `seq 1 8`; do
+ $abs_srcdir/../start_broker "$@" ${BROKER_OPTS} || { echo "FAIL broker start"; return 1; }
+ python "$abs_srcdir/../persistence.py" -s "$xml_spec" -b localhost:`cat qpidd.port` -p $p -r 3 || fail=1;
+ $abs_srcdir/../stop_broker
+ done
+}
+
+run_tests || fail=1
+
+exit $fail
Property changes on: store/trunk/cpp/tests/cluster/system_test.sh
___________________________________________________________________
Added: svn:executable
+ *
Added: svn:eol-style
+ native
Modified: store/trunk/cpp/tests/persistence.py
===================================================================
--- store/trunk/cpp/tests/persistence.py 2011-08-31 14:52:42 UTC (rev 4474)
+++ store/trunk/cpp/tests/persistence.py 2011-09-01 19:27:52 UTC (rev 4475)
@@ -39,7 +39,7 @@
def createMessage(self, **kwargs):
session = self.session
dp = {}
- dp['delivery_mode'] = 2
+ dp['delivery_mode'] = 2
mp = {}
for k, v in kwargs.iteritems():
if k in ['routing_key', 'delivery_mode']: dp[k] = v
@@ -53,7 +53,7 @@
def phase1(self):
session = self.session
-
+
session.queue_declare(queue="queue-a", durable=True)
session.queue_declare(queue="queue-b", durable=True)
session.exchange_bind(queue="queue-a", exchange="amq.direct", binding_key="a")
@@ -72,11 +72,11 @@
session.message_transfer(message=self.createMessage(routing_key="lvq-test", application_headers={"qpid.LVQ_key":"B"}, body="B3"))
session.message_transfer(message=self.createMessage(routing_key="lvq-test", application_headers={"qpid.LVQ_key":"C"}, body="C1"))
-
- def phase2(self):
+
+ def phase2(self):
session = self.session
-
+
#check queues exists
session.queue_declare(queue="queue-a", durable=True, passive=True)
session.queue_declare(queue="queue-b", durable=True, passive=True)
@@ -88,10 +88,10 @@
for r in responses:
self.assert_(not r.exchange_not_found)
self.assert_(not r.queue_not_found)
- self.assert_(not r.key_not_matched)
+ self.assert_(not r.key_not_matched)
- #check expected messages are there
+ #check expected messages are there
self.assertMessageOnQueue("queue-a", "Msg0001", "A_Message1")
self.assertMessageOnQueue("queue-b", "Msg0002", "B_Message1")
@@ -132,7 +132,7 @@
session.message_accept(accepted)
- def phase3(self):
+ def phase3(self):
session = self.session
#lvq recovery validation
@@ -154,14 +154,14 @@
session.message_cancel(destination="lvq")
session.queue_delete(queue="lvq-test")
-
+
#check queues exists
session.queue_declare(queue="queue-a", durable=True, passive=True)
session.queue_declare(queue="queue-b", durable=True, passive=True)
session.queue_declare(queue="queue-c", durable=True, passive=True)
session.tx_select()
- #check expected messages are there
+ #check expected messages are there
self.assertMessageOnQueue("queue-a", "Msg0003", "AB_Message2")
self.assertMessageOnQueue("queue-b", "Msg0003", "AB_Message2")
self.assertMessageOnQueue("queue-c", "Msg0003", "AB_Message2")
@@ -177,10 +177,10 @@
routing_key="queue-a", correlation_id="Msg0005", body="A_Message4"))
session.message_transfer(message=self.createMessage(
routing_key="queue-a", correlation_id="Msg0006", body="A_Message5"))
-
+
session.tx_commit()
-
+
#delete a queue
session.queue_delete(queue="queue-c")
@@ -202,9 +202,9 @@
session.tx_rollback()
- def phase4(self):
+ def phase4(self):
session = self.session
-
+
#check queues exists
session.queue_declare(queue="queue-a", durable=True, passive=True)
session.queue_declare(queue="queue-b", durable=True, passive=True)
@@ -224,11 +224,13 @@
self.assertEquals(404, e.args[0].error_code)
def phase5(self):
+
session = self.session
queues = ["queue-a1", "queue-a2", "queue-b1", "queue-b2", "queue-c1", "queue-c2", "queue-d1", "queue-d2"]
for q in queues:
session.queue_declare(queue=q, durable=True)
+ session.queue_purge(queue=q)
session.message_transfer(message=self.createMessage(
routing_key="queue-a1", correlation_id="MsgA", body="MessageA"))
@@ -239,7 +241,7 @@
session.message_transfer(message=self.createMessage(
routing_key="queue-d1", correlation_id="MsgD", body="MessageD"))
- session.dtx_select()
+ session.dtx_select()
txa = self.xid('a')
txb = self.xid('b')
txc = self.xid('c')
@@ -277,12 +279,12 @@
xids = session.dtx_recover().in_doubt
ids = [x.global_id for x in xids] #TODO: come up with nicer way to test these
-
- if txc.global_id not in ids:
+
+ if txc.global_id not in ids:
self.fail("Recovered xids not as expected. missing: %s" % (txc))
- if txd.global_id not in ids:
+ if txd.global_id not in ids:
self.fail("Recovered xids not as expected. missing: %s" % (txd))
- self.assertEqual(2, len(xids))
+ self.assertEqual(2, len(xids))
queues = ["queue-a1", "queue-a2", "queue-b1", "queue-b2", "queue-c1", "queue-c2", "queue-d1", "queue-d2"]
@@ -318,12 +320,12 @@
xids = session.dtx_recover().in_doubt
ids = [x.global_id for x in xids] #TODO: come up with nicer way to test these
-
- if txc.global_id in ids:
+
+ if txc.global_id in ids:
self.fail("Xid still present : %s" % (txc))
- if txd.global_id in ids:
+ if txd.global_id in ids:
self.fail("Xid still present : %s" % (txc))
- self.assertEqual(0, len(xids))
+ self.assertEqual(0, len(xids))
#test deletion of queue after publish
#create queue
@@ -352,7 +354,7 @@
#consume the message, cancel subscription (triggering auto-delete), then ack it
msg = queue.get(timeout = 5)
- session.message_cancel(destination = "a")
+ session.message_cancel(destination = "a")
self.ack(msg)
#test implicit deletion of bindings when queue is deleted
@@ -370,9 +372,9 @@
session.exchange_bind(exchange="amq.match", queue="binding-test-queue", binding_key="a", arguments={"x-match":"all", "p":"a"})
session.exchange_bind(exchange="amq.match", queue="binding-test-queue", binding_key="b", arguments={"x-match":"all", "p":"b"})
session.exchange_bind(exchange="amq.match", queue="binding-test-queue", binding_key="c", arguments={"x-match":"all", "p":"c"})
- #then restart broker...
-
+ #then restart broker...
+
def phase8(self):
session = self.session
@@ -381,7 +383,7 @@
for k in ["abc", "pqr", "xyz"]:
data = "first %s" % (k)
session.message_transfer(destination= "amq.direct", message=self.createMessage(routing_key=k, body=data))
- for a in [{"p":"a"}, {"p":"b"}, {"p":"c"}]:
+ for a in [{"p":"a"}, {"p":"b"}, {"p":"c"}]:
data = "first %s" % (a["p"])
session.message_transfer(destination="amq.match", message=self.createMessage(application_headers=a, body=data))
#unbind some bindings (using final 0-10 semantics)
@@ -391,7 +393,7 @@
for k in ["abc", "pqr", "xyz"]:
data = "second %s" % (k)
session.message_transfer(destination= "amq.direct", message=self.createMessage(routing_key=k, body=data))
- for a in [{"p":"a"}, {"p":"b"}, {"p":"c"}]:
+ for a in [{"p":"a"}, {"p":"b"}, {"p":"c"}]:
data = "second %s" % (a["p"])
session.message_transfer(destination="amq.match", message=self.createMessage(application_headers=a, body=data))
@@ -399,11 +401,11 @@
expected = []
for k in ["abc", "pqr", "xyz"]:
expected.append("first %s" % (k))
- for a in [{"p":"a"}, {"p":"b"}, {"p":"c"}]:
+ for a in [{"p":"a"}, {"p":"b"}, {"p":"c"}]:
expected.append("first %s" % (a["p"]))
for k in ["abc", "xyz"]:
expected.append("second %s" % (k))
- for a in [{"p":"a"}, {"p":"c"}]:
+ for a in [{"p":"a"}, {"p":"c"}]:
expected.append("second %s" % (a["p"]))
session.message_subscribe(queue = "binding-test-queue", destination = "binding-test")
@@ -414,15 +416,15 @@
while len(expected):
msg = queue.get(timeout=1)
if msg.body not in expected:
- self.fail("Missing message: %s" % msg.body)
+ self.fail("Missing message: %s" % msg.body)
expected.remove(msg.body)
try:
msg = queue.get(timeout=1)
self.fail("Got extra message: %s" % msg.body)
except Empty: pass
-
+
session.queue_declare(queue = "durable-subscriber-queue", exclusive=True, durable=True)
session.exchange_bind(exchange="amq.topic", queue="durable-subscriber-queue", binding_key="xyz")
session.message_transfer(destination= "amq.topic", message=self.createMessage(routing_key = "xyz", body = "my-message"))
@@ -431,9 +433,9 @@
def xid(self, txid, branchqual = ''):
return self.session.xid(format=0, global_id=txid, branch_id=branchqual)
-
+
def txswap(self, src, dest, tx):
- self.assertEqual(self.XA_OK, self.session.dtx_start(xid=tx).status)
+ self.assertEqual(self.XA_OK, self.session.dtx_start(xid=tx).status)
self.session.message_subscribe(destination="temp-swap", queue=src, accept_mode=0)
self.session.message_flow(destination="temp-swap", unit=0, value=1)
self.session.message_flow(destination="temp-swap", unit=1, value=0xFFFFFFFF)
@@ -443,16 +445,16 @@
body=msg.body))
self.ack(msg)
self.assertEqual(self.XA_OK, self.session.dtx_end(xid=tx).status)
-
- def assertEmptyQueue(self, name):
+
+ def assertEmptyQueue(self, name):
self.assertEqual(0, self.session.queue_query(queue=name).message_count)
- def assertConnectionException(self, expectedCode, message):
+ def assertConnectionException(self, expectedCode, message):
self.assertEqual("connection", message.method.klass.name)
self.assertEqual("close", message.method.name)
self.assertEqual(expectedCode, message.reply_code)
- def assertExpectedMethod(self, reply, klass, method):
+ def assertExpectedMethod(self, reply, klass, method):
self.assertEqual(klass, reply.method.klass.name)
self.assertEqual(method, reply.method.name)
@@ -464,7 +466,7 @@
def getProperty(self, msg, name):
for h in msg.headers:
if hasattr(h, name): return getattr(h, name)
- return None
+ return None
def ack(self, *msgs):
session = self.session
@@ -477,7 +479,7 @@
session.channel.session_completed(session.receiver._completed)
def assertExpectedGetResult(self, id, body):
- return self.assertExpectedContent(session.incoming("incoming-gets").get(timeout=1), id, body)
+ return self.assertExpectedContent(session.incoming("incoming-gets").get(timeout=1), id, body)
def assertEqual(self, expected, actual, msg=''):
if expected != actual: raise Exception("%s expected: %s actual: %s" % (msg, expected, actual))
@@ -489,7 +491,7 @@
msg = self.session.incoming("incoming-gets").get(timeout=1)
self.assertExpectedContent(msg, id, body)
self.ack(msg)
- self.session.message_cancel(destination="incoming-gets")
+ self.session.message_cancel(destination="incoming-gets")
def __init__(self):
@@ -500,7 +502,7 @@
def connect(self):
""" Connects to the broker """
self.conn = Connection(connect(self.host, self.port))
- self.conn.start(timeout=10)
+ self.conn.start(timeout=10)
self.session = self.conn.session("test-session", timeout=10)
def run(self, args=sys.argv[1:]):
@@ -518,7 +520,7 @@
if opt in ("-p", "--phase"): phase = int(value)
if opt in ("-r", "--retry"): retry = int(value)
- if not phase: self._die("please specify the phase to run")
+ if not phase: self._die("please specify the phase to run")
phase = "phase%d" % phase
self.connect()
@@ -531,7 +533,7 @@
traceback.print_exc()
res = False
-
+
if not self.session.error(): self.session.close(timeout=10)
self.conn.close(timeout=10)
@@ -539,7 +541,7 @@
# Reduces occurrences of "Unhandled exception in thread" messages after each test
import time
time.sleep(1)
-
+
return res
Modified: store/trunk/cpp/tests/start_broker
===================================================================
--- store/trunk/cpp/tests/start_broker 2011-08-31 14:52:42 UTC (rev 4474)
+++ store/trunk/cpp/tests/start_broker 2011-09-01 19:27:52 UTC (rev 4475)
@@ -23,5 +23,5 @@
QPIDD=$QPID_BLD/src/qpidd
rm -f qpidd.vglog* qpidd.log
-test -n "$VALGRIND" && VALGRIND="$VALGRIND --log-file=qpidd.vglog --"
+test -n "$VALGRIND" && VALGRIND="$VALGRIND --log-file=qpidd.vglog --suppressions=$QPID_DIR/cpp/src/tests/.valgrind.supp --"
exec libtool --mode=execute $VALGRIND $QPIDD --daemon --port=0 --log-enable error+ --log-to-file qpidd.log "$@" > qpidd.port