[rhmessaging-commits] rhmessaging commits: r4475 - in store/trunk/cpp: tests and 1 other directories.

rhmessaging-commits at lists.jboss.org rhmessaging-commits at lists.jboss.org
Thu Sep 1 15:27:53 EDT 2011


Author: aconway
Date: 2011-09-01 15:27:52 -0400 (Thu, 01 Sep 2011)
New Revision: 4475

Added:
   store/trunk/cpp/tests/cluster/system_test.sh
Modified:
   store/trunk/cpp/docs/Makefile.am
   store/trunk/cpp/tests/cluster/Makefile.am
   store/trunk/cpp/tests/persistence.py
   store/trunk/cpp/tests/start_broker
Log:
Bug 727182, QPID-3384 - Support DTX transactions in a cluster

Add system_test.sh to cluster test suite for better coverage of DTX.


Modified: store/trunk/cpp/docs/Makefile.am
===================================================================
--- store/trunk/cpp/docs/Makefile.am	2011-08-31 14:52:42 UTC (rev 4474)
+++ store/trunk/cpp/docs/Makefile.am	2011-09-01 19:27:52 UTC (rev 4475)
@@ -38,7 +38,7 @@
 
 doxygen:
 if DOXYGEN
-	@doxygen jrnl_tmpl.dox
+	@doxygen ${srcdir}/jrnl_tmpl.dox
 # FIXME: doxygen seems to create files that do not compile under latex on 64-bit
 # so the following section is disabled until this is sorted out.
 #	@make -C latex

Modified: store/trunk/cpp/tests/cluster/Makefile.am
===================================================================
--- store/trunk/cpp/tests/cluster/Makefile.am	2011-08-31 14:52:42 UTC (rev 4474)
+++ store/trunk/cpp/tests/cluster/Makefile.am	2011-09-01 19:27:52 UTC (rev 4475)
@@ -37,7 +37,8 @@
 
 TESTS = \
   run_cpp_cluster_tests \
-  run_python_cluster_tests
+  run_python_cluster_tests \
+  system_test.sh
 
 LONG_TESTS = \
   run_long_python_cluster_tests

Added: store/trunk/cpp/tests/cluster/system_test.sh
===================================================================
--- store/trunk/cpp/tests/cluster/system_test.sh	                        (rev 0)
+++ store/trunk/cpp/tests/cluster/system_test.sh	2011-09-01 19:27:52 UTC (rev 4475)
@@ -0,0 +1,54 @@
+#!/bin/bash
+
+# Copyright (c) 2007, 2008 Red Hat, Inc.
+#
+# This file is part of the Qpid async store library msgstore.so.
+#
+# This library is free software; you can redistribute it and/or
+# modify it under the terms of the GNU Lesser General Public
+# License as published by the Free Software Foundation; either
+# version 2.1 of the License, or (at your option) any later version.
+#
+# This library is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
+# Lesser General Public License for more details.
+#
+# You should have received a copy of the GNU Lesser General Public
+# License along with this library; if not, write to the Free Software
+# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA  02110-1301
+# USA
+#
+# The GNU Lesser General Public License is available in the file COPYING.
+
+
+error() { echo $*; exit 1; }
+
+# Make sure $QPID_DIR contains what we need.
+if ! test -d "$QPID_DIR" ; then
+    echo "WARNING: QPID_DIR is not set skipping system tests."
+    exit
+fi
+STORE_LIB=../../lib/.libs/msgstore.so
+CLUSTER_LIB=$QPID_BLD/src/.libs/cluster.so
+xml_spec=$QPID_DIR/specs/amqp.0-10-qpid-errata.xml
+test -f $xml_spec || error "$xml_spec not found: invalid \$QPID_DIR ?"
+export PYTHONPATH=$QPID_DIR/python:$QPID_DIR/extras/qmf/src/py
+
+echo "Using directory $TMP_DATA_DIR"
+
+fail=0
+
+# Run the broker as part of a cluster.
+BROKER_OPTS="--no-module-dir --load-module=$STORE_LIB --load-module=$CLUSTER_LIB --data-dir=$TMP_DATA_DIR --auth=no --wcache-page-size 16 --cluster-name=$HOSTNAME.$$"
+run_tests() {
+    for p in `seq 1 8`; do
+	$abs_srcdir/../start_broker "$@" ${BROKER_OPTS} || { echo "FAIL broker start";  return 1; }
+	python "$abs_srcdir/../persistence.py" -s "$xml_spec" -b localhost:`cat qpidd.port` -p $p -r 3 || fail=1; 
+	$abs_srcdir/../stop_broker
+    done
+}
+
+run_tests || fail=1
+
+exit $fail


Property changes on: store/trunk/cpp/tests/cluster/system_test.sh
___________________________________________________________________
Added: svn:executable
   + *
Added: svn:eol-style
   + native

Modified: store/trunk/cpp/tests/persistence.py
===================================================================
--- store/trunk/cpp/tests/persistence.py	2011-08-31 14:52:42 UTC (rev 4474)
+++ store/trunk/cpp/tests/persistence.py	2011-09-01 19:27:52 UTC (rev 4475)
@@ -39,7 +39,7 @@
     def createMessage(self, **kwargs):
         session = self.session
         dp = {}
-        dp['delivery_mode'] = 2 
+        dp['delivery_mode'] = 2
         mp = {}
         for k, v in kwargs.iteritems():
             if k in ['routing_key', 'delivery_mode']: dp[k] = v
@@ -53,7 +53,7 @@
 
     def phase1(self):
         session = self.session
-        
+
         session.queue_declare(queue="queue-a", durable=True)
         session.queue_declare(queue="queue-b", durable=True)
         session.exchange_bind(queue="queue-a", exchange="amq.direct", binding_key="a")
@@ -72,11 +72,11 @@
         session.message_transfer(message=self.createMessage(routing_key="lvq-test", application_headers={"qpid.LVQ_key":"B"}, body="B3"))
         session.message_transfer(message=self.createMessage(routing_key="lvq-test", application_headers={"qpid.LVQ_key":"C"}, body="C1"))
 
-        
 
-    def phase2(self):    
+
+    def phase2(self):
         session = self.session
-        
+
         #check queues exists
         session.queue_declare(queue="queue-a", durable=True, passive=True)
         session.queue_declare(queue="queue-b", durable=True, passive=True)
@@ -88,10 +88,10 @@
         for r in responses:
             self.assert_(not r.exchange_not_found)
             self.assert_(not r.queue_not_found)
-            self.assert_(not r.key_not_matched)        
+            self.assert_(not r.key_not_matched)
 
 
-        #check expected messages are there        
+        #check expected messages are there
         self.assertMessageOnQueue("queue-a", "Msg0001", "A_Message1")
         self.assertMessageOnQueue("queue-b", "Msg0002", "B_Message1")
 
@@ -132,7 +132,7 @@
         session.message_accept(accepted)
 
 
-    def phase3(self):    
+    def phase3(self):
         session = self.session
 
         #lvq recovery validation
@@ -154,14 +154,14 @@
         session.message_cancel(destination="lvq")
         session.queue_delete(queue="lvq-test")
 
-        
+
         #check queues exists
         session.queue_declare(queue="queue-a", durable=True, passive=True)
         session.queue_declare(queue="queue-b", durable=True, passive=True)
         session.queue_declare(queue="queue-c", durable=True, passive=True)
 
         session.tx_select()
-        #check expected messages are there        
+        #check expected messages are there
         self.assertMessageOnQueue("queue-a", "Msg0003", "AB_Message2")
         self.assertMessageOnQueue("queue-b", "Msg0003", "AB_Message2")
         self.assertMessageOnQueue("queue-c", "Msg0003", "AB_Message2")
@@ -177,10 +177,10 @@
             routing_key="queue-a", correlation_id="Msg0005", body="A_Message4"))
         session.message_transfer(message=self.createMessage(
             routing_key="queue-a", correlation_id="Msg0006", body="A_Message5"))
-        
+
         session.tx_commit()
 
-        
+
         #delete a queue
         session.queue_delete(queue="queue-c")
 
@@ -202,9 +202,9 @@
         session.tx_rollback()
 
 
-    def phase4(self):    
+    def phase4(self):
         session = self.session
-        
+
         #check queues exists
         session.queue_declare(queue="queue-a", durable=True, passive=True)
         session.queue_declare(queue="queue-b", durable=True, passive=True)
@@ -224,11 +224,13 @@
             self.assertEquals(404, e.args[0].error_code)
 
     def phase5(self):
+
         session = self.session
         queues = ["queue-a1", "queue-a2", "queue-b1", "queue-b2", "queue-c1", "queue-c2", "queue-d1", "queue-d2"]
 
         for q in queues:
             session.queue_declare(queue=q, durable=True)
+            session.queue_purge(queue=q)
 
         session.message_transfer(message=self.createMessage(
             routing_key="queue-a1", correlation_id="MsgA", body="MessageA"))
@@ -239,7 +241,7 @@
         session.message_transfer(message=self.createMessage(
             routing_key="queue-d1", correlation_id="MsgD", body="MessageD"))
 
-        session.dtx_select()        
+        session.dtx_select()
         txa = self.xid('a')
         txb = self.xid('b')
         txc = self.xid('c')
@@ -277,12 +279,12 @@
 
         xids = session.dtx_recover().in_doubt
         ids = [x.global_id for x in xids] #TODO: come up with nicer way to test these
-        
-        if txc.global_id not in ids:    
+
+        if txc.global_id not in ids:
             self.fail("Recovered xids not as expected. missing: %s" % (txc))
-        if txd.global_id not in ids:    
+        if txd.global_id not in ids:
             self.fail("Recovered xids not as expected. missing: %s" % (txd))
-        self.assertEqual(2, len(xids))    
+        self.assertEqual(2, len(xids))
 
 
         queues = ["queue-a1", "queue-a2", "queue-b1", "queue-b2", "queue-c1", "queue-c2", "queue-d1", "queue-d2"]
@@ -318,12 +320,12 @@
 
         xids = session.dtx_recover().in_doubt
         ids = [x.global_id for x in xids] #TODO: come up with nicer way to test these
-        
-        if txc.global_id in ids:    
+
+        if txc.global_id in ids:
             self.fail("Xid still present : %s" % (txc))
-        if txd.global_id in ids:    
+        if txd.global_id in ids:
             self.fail("Xid still present : %s" % (txc))
-        self.assertEqual(0, len(xids))    
+        self.assertEqual(0, len(xids))
 
         #test deletion of queue after publish
         #create queue
@@ -352,7 +354,7 @@
 
         #consume the message, cancel subscription (triggering auto-delete), then ack it
         msg = queue.get(timeout = 5)
-        session.message_cancel(destination = "a")        
+        session.message_cancel(destination = "a")
         self.ack(msg)
 
         #test implicit deletion of bindings when queue is deleted
@@ -370,9 +372,9 @@
         session.exchange_bind(exchange="amq.match", queue="binding-test-queue", binding_key="a", arguments={"x-match":"all", "p":"a"})
         session.exchange_bind(exchange="amq.match", queue="binding-test-queue", binding_key="b", arguments={"x-match":"all", "p":"b"})
         session.exchange_bind(exchange="amq.match", queue="binding-test-queue", binding_key="c", arguments={"x-match":"all", "p":"c"})
-        #then restart broker...            
-        
+        #then restart broker...
 
+
     def phase8(self):
         session = self.session
 
@@ -381,7 +383,7 @@
         for k in ["abc", "pqr", "xyz"]:
             data = "first %s" % (k)
             session.message_transfer(destination= "amq.direct", message=self.createMessage(routing_key=k, body=data))
-        for a in [{"p":"a"}, {"p":"b"}, {"p":"c"}]:    
+        for a in [{"p":"a"}, {"p":"b"}, {"p":"c"}]:
             data = "first %s" % (a["p"])
             session.message_transfer(destination="amq.match", message=self.createMessage(application_headers=a, body=data))
         #unbind some bindings (using final 0-10 semantics)
@@ -391,7 +393,7 @@
         for k in ["abc", "pqr", "xyz"]:
             data = "second %s" % (k)
             session.message_transfer(destination= "amq.direct", message=self.createMessage(routing_key=k, body=data))
-        for a in [{"p":"a"}, {"p":"b"}, {"p":"c"}]:    
+        for a in [{"p":"a"}, {"p":"b"}, {"p":"c"}]:
             data = "second %s" % (a["p"])
             session.message_transfer(destination="amq.match", message=self.createMessage(application_headers=a, body=data))
 
@@ -399,11 +401,11 @@
         expected = []
         for k in ["abc", "pqr", "xyz"]:
             expected.append("first %s" % (k))
-        for a in [{"p":"a"}, {"p":"b"}, {"p":"c"}]:    
+        for a in [{"p":"a"}, {"p":"b"}, {"p":"c"}]:
             expected.append("first %s" % (a["p"]))
         for k in ["abc", "xyz"]:
             expected.append("second %s" % (k))
-        for a in [{"p":"a"}, {"p":"c"}]:    
+        for a in [{"p":"a"}, {"p":"c"}]:
             expected.append("second %s" % (a["p"]))
 
         session.message_subscribe(queue = "binding-test-queue", destination = "binding-test")
@@ -414,15 +416,15 @@
         while len(expected):
             msg = queue.get(timeout=1)
             if msg.body not in expected:
-                self.fail("Missing message: %s" % msg.body)                
+                self.fail("Missing message: %s" % msg.body)
             expected.remove(msg.body)
         try:
             msg = queue.get(timeout=1)
             self.fail("Got extra message: %s" % msg.body)
         except Empty: pass
 
-        
 
+
         session.queue_declare(queue = "durable-subscriber-queue", exclusive=True, durable=True)
         session.exchange_bind(exchange="amq.topic", queue="durable-subscriber-queue", binding_key="xyz")
         session.message_transfer(destination= "amq.topic", message=self.createMessage(routing_key = "xyz", body = "my-message"))
@@ -431,9 +433,9 @@
 
     def xid(self, txid, branchqual = ''):
         return self.session.xid(format=0, global_id=txid, branch_id=branchqual)
-        
+
     def txswap(self, src, dest, tx):
-        self.assertEqual(self.XA_OK, self.session.dtx_start(xid=tx).status)        
+        self.assertEqual(self.XA_OK, self.session.dtx_start(xid=tx).status)
         self.session.message_subscribe(destination="temp-swap", queue=src, accept_mode=0)
         self.session.message_flow(destination="temp-swap", unit=0, value=1)
         self.session.message_flow(destination="temp-swap", unit=1, value=0xFFFFFFFF)
@@ -443,16 +445,16 @@
                                                       body=msg.body))
         self.ack(msg)
         self.assertEqual(self.XA_OK, self.session.dtx_end(xid=tx).status)
-        
-    def assertEmptyQueue(self, name):    
+
+    def assertEmptyQueue(self, name):
         self.assertEqual(0, self.session.queue_query(queue=name).message_count)
 
-    def assertConnectionException(self, expectedCode, message): 
+    def assertConnectionException(self, expectedCode, message):
         self.assertEqual("connection", message.method.klass.name)
         self.assertEqual("close", message.method.name)
         self.assertEqual(expectedCode, message.reply_code)
 
-    def assertExpectedMethod(self, reply, klass, method):    
+    def assertExpectedMethod(self, reply, klass, method):
         self.assertEqual(klass, reply.method.klass.name)
         self.assertEqual(method, reply.method.name)
 
@@ -464,7 +466,7 @@
     def getProperty(self, msg, name):
         for h in msg.headers:
             if hasattr(h, name): return getattr(h, name)
-        return None            
+        return None
 
     def ack(self, *msgs):
         session = self.session
@@ -477,7 +479,7 @@
         session.channel.session_completed(session.receiver._completed)
 
     def assertExpectedGetResult(self, id, body):
-        return self.assertExpectedContent(session.incoming("incoming-gets").get(timeout=1), id, body)    
+        return self.assertExpectedContent(session.incoming("incoming-gets").get(timeout=1), id, body)
 
     def assertEqual(self, expected, actual, msg=''):
         if expected != actual: raise Exception("%s expected: %s actual: %s" % (msg, expected, actual))
@@ -489,7 +491,7 @@
         msg = self.session.incoming("incoming-gets").get(timeout=1)
         self.assertExpectedContent(msg, id, body)
         self.ack(msg)
-        self.session.message_cancel(destination="incoming-gets")        
+        self.session.message_cancel(destination="incoming-gets")
 
 
     def __init__(self):
@@ -500,7 +502,7 @@
     def connect(self):
         """ Connects to the broker  """
         self.conn = Connection(connect(self.host, self.port))
-        self.conn.start(timeout=10)        
+        self.conn.start(timeout=10)
         self.session = self.conn.session("test-session", timeout=10)
 
     def run(self, args=sys.argv[1:]):
@@ -518,7 +520,7 @@
             if opt in ("-p", "--phase"): phase = int(value)
             if opt in ("-r", "--retry"): retry = int(value)
 
-        if not phase: self._die("please specify the phase to run")    
+        if not phase: self._die("please specify the phase to run")
         phase = "phase%d" % phase
         self.connect()
 
@@ -531,7 +533,7 @@
             traceback.print_exc()
             res = False
 
-        
+
         if not self.session.error(): self.session.close(timeout=10)
         self.conn.close(timeout=10)
 
@@ -539,7 +541,7 @@
          # Reduces occurrences of "Unhandled exception in thread" messages after each test
         import time
         time.sleep(1)
-        
+
         return res
 
 

Modified: store/trunk/cpp/tests/start_broker
===================================================================
--- store/trunk/cpp/tests/start_broker	2011-08-31 14:52:42 UTC (rev 4474)
+++ store/trunk/cpp/tests/start_broker	2011-09-01 19:27:52 UTC (rev 4475)
@@ -23,5 +23,5 @@
 
 QPIDD=$QPID_BLD/src/qpidd
 rm -f qpidd.vglog* qpidd.log
-test -n "$VALGRIND" && VALGRIND="$VALGRIND --log-file=qpidd.vglog --"
+test -n "$VALGRIND" && VALGRIND="$VALGRIND --log-file=qpidd.vglog --suppressions=$QPID_DIR/cpp/src/tests/.valgrind.supp --"
 exec libtool --mode=execute $VALGRIND $QPIDD --daemon --port=0 --log-enable error+ --log-to-file qpidd.log "$@" > qpidd.port



More information about the rhmessaging-commits mailing list