Author: gordonsim
Date: 2008-04-23 10:09:44 -0400 (Wed, 23 Apr 2008)
New Revision: 1966
Modified:
store/trunk/cpp/tests/persistence.py
store/trunk/cpp/tests/system_test.sh
Log:
Switch system tests over to final 0-10 spec
Modified: store/trunk/cpp/tests/persistence.py
===================================================================
--- store/trunk/cpp/tests/persistence.py 2008-04-23 09:25:04 UTC (rev 1965)
+++ store/trunk/cpp/tests/persistence.py 2008-04-23 14:09:44 UTC (rev 1966)
@@ -22,48 +22,66 @@
import sys, re, traceback, socket
from getopt import getopt, GetoptError
-import qpid.client, qpid.spec, qpid.content, qpid.testlib
-from qpid.client import Closed
+
+from qpid.connection import Connection
+from qpid.util import connect
+from qpid.spec010 import load
+from qpid.datatypes import Message, RangedSet
from qpid.queue import Empty
-from qpid.content import Content
-from struct import *
+from qpid.session import SessionException
+from qpid.testlib import TestBase010
from time import sleep
-class PersistenceTest(qpid.testlib.TestBase):
+class PersistenceTest(TestBase010):
XA_RBROLLBACK = 1
XA_RBTIMEOUT = 2
XA_OK = 0
+ def createMessage(self, **kwargs):
+ session = self.session
+ dp = {}
+ dp['delivery_mode'] = 2
+ mp = {}
+ for k, v in kwargs.iteritems():
+ if k in ['routing_key', 'delivery_mode']: dp[k] = v
+ if k in ['message_id', 'correlation_id',
'application_headers']: mp[k] = v
+ args = []
+ args.append(session.delivery_properties(**dp))
+ if len(mp):
+ args.append(session.message_properties(**mp))
+ if kwargs.has_key('body'): args.append(kwargs['body'])
+ return Message(*args)
+
def phase1(self):
- channel = self.channel
+ session = self.session
- channel.queue_declare(queue="queue-a", durable=True)
- channel.queue_declare(queue="queue-b", durable=True)
- channel.queue_bind(queue="queue-a", exchange="amq.direct",
routing_key="a")
- channel.queue_bind(queue="queue-b", exchange="amq.direct",
routing_key="b")
+ session.queue_declare(queue="queue-a", durable=True)
+ session.queue_declare(queue="queue-b", durable=True)
+ session.exchange_bind(queue="queue-a", exchange="amq.direct",
binding_key="a")
+ session.exchange_bind(queue="queue-b", exchange="amq.direct",
binding_key="b")
- channel.message_transfer(destination="amq.direct",
-
content=Content(properties={'routing_key':"a",
'message_id':"Msg0001", 'delivery_mode':2},
body="A_Message1"))
- channel.message_transfer(destination="amq.direct",
-
content=Content(properties={'routing_key':"b",
'message_id':"Msg0002", 'delivery_mode':2},
body="B_Message1"))
+ session.message_transfer(destination="amq.direct",
+ message=self.createMessage(routing_key="a",
correlation_id="Msg0001", body="A_Message1"))
+ session.message_transfer(destination="amq.direct",
+ message=self.createMessage(routing_key="b",
correlation_id="Msg0002", body="B_Message1"))
def phase2(self):
- channel = self.channel
+ session = self.session
#check queues exists
- channel.queue_declare(queue="queue-a", durable=True, passive=True)
- channel.queue_declare(queue="queue-b", durable=True, passive=True)
+ session.queue_declare(queue="queue-a", durable=True, passive=True)
+ session.queue_declare(queue="queue-b", durable=True, passive=True)
#check they are still bound to amq.direct correctly
responses = []
- responses.append(channel.binding_query(queue="queue-a",
exchange="amq.direct", routing_key="a"))
- responses.append(channel.binding_query(queue="queue-b",
exchange="amq.direct", routing_key="b"))
+ responses.append(session.exchange_bound(queue="queue-a",
exchange="amq.direct", binding_key="a"))
+ responses.append(session.exchange_bound(queue="queue-b",
exchange="amq.direct", binding_key="b"))
for r in responses:
- self.assertEqual(False, r.exchange_not_found)
- self.assertEqual(False, r.queue_not_found)
- self.assertEqual(False, r.key_not_matched)
+ self.assert_(not r.exchange_not_found)
+ self.assert_(not r.queue_not_found)
+ self.assert_(not r.key_not_matched)
#check expected messages are there
@@ -73,26 +91,26 @@
self.assertEmptyQueue("queue-a")
self.assertEmptyQueue("queue-b")
- channel.queue_declare(queue="queue-c", durable=True)
+ session.queue_declare(queue="queue-c", durable=True)
#send a message to a topic such that it reaches all queues
- channel.queue_bind(queue="queue-a", exchange="amq.topic",
routing_key="abc")
- channel.queue_bind(queue="queue-b", exchange="amq.topic",
routing_key="abc")
- channel.queue_bind(queue="queue-c", exchange="amq.topic",
routing_key="abc")
+ session.exchange_bind(queue="queue-a", exchange="amq.topic",
binding_key="abc")
+ session.exchange_bind(queue="queue-b", exchange="amq.topic",
binding_key="abc")
+ session.exchange_bind(queue="queue-c", exchange="amq.topic",
binding_key="abc")
- channel.message_transfer(destination="amq.topic",
-
content=Content(properties={'routing_key':"abc",
'message_id':"Msg0003", 'delivery_mode':2},
body="AB_Message2"))
+ session.message_transfer(destination="amq.topic",
+ message=self.createMessage(routing_key="abc",
correlation_id="Msg0003", body="AB_Message2"))
def phase3(self):
- channel = self.channel
+ session = self.session
#check queues exists
- channel.queue_declare(queue="queue-a", durable=True, passive=True)
- channel.queue_declare(queue="queue-b", durable=True, passive=True)
- channel.queue_declare(queue="queue-c", durable=True, passive=True)
+ session.queue_declare(queue="queue-a", durable=True, passive=True)
+ session.queue_declare(queue="queue-b", durable=True, passive=True)
+ session.queue_declare(queue="queue-c", durable=True, passive=True)
- channel.tx_select()
+ session.tx_select()
#check expected messages are there
self.assertMessageOnQueue("queue-a", "Msg0003",
"AB_Message2")
self.assertMessageOnQueue("queue-b", "Msg0003",
"AB_Message2")
@@ -103,46 +121,43 @@
self.assertEmptyQueue("queue-c")
#note: default bindings must be restored for this to work
- channel.message_transfer(content=Content(
- properties={'routing_key':"queue-a",
'message_id':"Msg0004", 'delivery_mode':2},
- body="A_Message3"))
- channel.message_transfer(content=Content(
- properties={'routing_key':"queue-a",
'message_id':"Msg0005", 'delivery_mode':2},
- body="A_Message4"))
- channel.message_transfer(content=Content(
- properties={'routing_key':"queue-a",
'message_id':"Msg0006", 'delivery_mode':2},
- body="A_Message5"))
+ session.message_transfer(message=self.createMessage(
+ routing_key="queue-a", correlation_id="Msg0004",
body="A_Message3"))
+ session.message_transfer(message=self.createMessage(
+ routing_key="queue-a", correlation_id="Msg0005",
body="A_Message4"))
+ session.message_transfer(message=self.createMessage(
+ routing_key="queue-a", correlation_id="Msg0006",
body="A_Message5"))
- channel.tx_commit()
+ session.tx_commit()
#delete a queue
- channel.queue_delete(queue="queue-c")
+ session.queue_delete(queue="queue-c")
- channel.message_subscribe(destination="ctag",
queue="queue-a", confirm_mode=1)
- channel.message_flow(destination="ctag", unit=0, value=0xFFFFFFFF)
- channel.message_flow(destination="ctag", unit=1, value=0xFFFFFFFF)
- included = self.client.queue("ctag")
- msg = included.get(timeout=1)
- self.assertExpectedContent(msg, "Msg0004", "A_Message3")
- msg = included.get(timeout=1)
- self.assertExpectedContent(msg, "Msg0005", "A_Message4")
- msg = included.get(timeout=1)
- self.assertExpectedContent(msg, "Msg0006",
"A_Message5").complete()
+ session.message_subscribe(destination="ctag",
queue="queue-a", accept_mode=0)
+ session.message_flow(destination="ctag", unit=0, value=0xFFFFFFFF)
+ session.message_flow(destination="ctag", unit=1, value=0xFFFFFFFF)
+ included = session.incoming("ctag")
+ msg1 = included.get(timeout=1)
+ self.assertExpectedContent(msg1, "Msg0004", "A_Message3")
+ msg2 = included.get(timeout=1)
+ self.assertExpectedContent(msg2, "Msg0005", "A_Message4")
+ msg3 = included.get(timeout=1)
+ self.assertExpectedContent(msg3, "Msg0006", "A_Message5")
+ self.ack(msg1, msg2, msg3)
- channel.message_transfer(destination="amq.direct", content=Content(
- properties={'routing_key':"queue-b",
'message_id':"Msg0007", 'delivery_mode':2},
- body="B_Message3"))
+ session.message_transfer(destination="amq.direct",
message=self.createMessage(
+ routing_key="queue-b", correlation_id="Msg0007",
body="B_Message3"))
- channel.tx_rollback()
+ session.tx_rollback()
def phase4(self):
- channel = self.channel
+ session = self.session
#check queues exists
- channel.queue_declare(queue="queue-a", durable=True, passive=True)
- channel.queue_declare(queue="queue-b", durable=True, passive=True)
+ session.queue_declare(queue="queue-a", durable=True, passive=True)
+ session.queue_declare(queue="queue-b", durable=True, passive=True)
self.assertMessageOnQueue("queue-a", "Msg0004",
"A_Message3")
self.assertMessageOnQueue("queue-a", "Msg0005",
"A_Message4")
@@ -153,35 +168,28 @@
#check this queue doesn't exist
try:
- channel.queue_declare(queue="queue-c", durable=True, passive=True)
+ session.queue_declare(queue="queue-c", durable=True, passive=True)
raise Exception("Expected queue-c to have been deleted")
- except Closed, e:
- self.assertChannelException(404, e.args[0])
- self.channel = self.client.channel(2)
- self.channel.session_open()
- channel = self.channel
+ except SessionException, e:
+ self.assertEquals(404, e.args[0].error_code)
def phase5(self):
- channel = self.channel
+ session = self.session
queues = ["queue-a1", "queue-a2", "queue-b1",
"queue-b2", "queue-c1", "queue-c2", "queue-d1",
"queue-d2"]
for q in queues:
- channel.queue_declare(queue=q, durable=True)
+ session.queue_declare(queue=q, durable=True)
- channel.message_transfer(content=Content(
- properties={'routing_key':"queue-a1",
'message_id':"MsgA", 'delivery_mode':2},
- body="MessageA"))
- channel.message_transfer(content=Content(
- properties={'routing_key':"queue-b1",
'message_id':"MsgB", 'delivery_mode':2},
- body="MessageB"))
- channel.message_transfer(content=Content(
- properties={'routing_key':"queue-c1",
'message_id':"MsgC", 'delivery_mode':2},
- body="MessageC"))
- channel.message_transfer(content=Content(
- properties={'routing_key':"queue-d1",
'message_id':"MsgD", 'delivery_mode':2},
- body="MessageD"))
+ session.message_transfer(message=self.createMessage(
+ routing_key="queue-a1", correlation_id="MsgA",
body="MessageA"))
+ session.message_transfer(message=self.createMessage(
+ routing_key="queue-b1", correlation_id="MsgB",
body="MessageB"))
+ session.message_transfer(message=self.createMessage(
+ routing_key="queue-c1", correlation_id="MsgC",
body="MessageC"))
+ session.message_transfer(message=self.createMessage(
+ routing_key="queue-d1", correlation_id="MsgD",
body="MessageD"))
- channel.dtx_demarcation_select()
+ session.dtx_select()
txa = self.xid('a')
txb = self.xid('b')
txc = self.xid('c')
@@ -194,33 +202,35 @@
#no queue should have any messages accessible
for q in queues:
- self.assertEqual(0, channel.queue_query(queue=q).message_count, "Bad
count for %s" % (q))
+ self.assertEqual(0, session.queue_query(queue=q).message_count, "Bad
count for %s" % (q))
- self.assertEqual(self.XA_OK, channel.dtx_coordination_commit(xid=txa,
one_phase=True).status)
- self.assertEqual(self.XA_OK, channel.dtx_coordination_rollback(xid=txb).status)
- self.assertEqual(self.XA_OK, channel.dtx_coordination_prepare(xid=txc).status)
- self.assertEqual(self.XA_OK, channel.dtx_coordination_prepare(xid=txd).status)
+ self.assertEqual(self.XA_OK, session.dtx_commit(xid=txa, one_phase=True).status)
+ self.assertEqual(self.XA_OK, session.dtx_rollback(xid=txb).status)
+ self.assertEqual(self.XA_OK, session.dtx_prepare(xid=txc).status)
+ self.assertEqual(self.XA_OK, session.dtx_prepare(xid=txd).status)
#further checks
not_empty = ["queue-a2", "queue-b1"]
for q in queues:
if q in not_empty:
- self.assertEqual(1, channel.queue_query(queue=q).message_count, "Bad
count for %s" % (q))
+ self.assertEqual(1, session.queue_query(queue=q).message_count, "Bad
count for %s" % (q))
else:
- self.assertEqual(0, channel.queue_query(queue=q).message_count, "Bad
count for %s" % (q))
+ self.assertEqual(0, session.queue_query(queue=q).message_count, "Bad
count for %s" % (q))
def phase6(self):
- channel = self.channel
+ session = self.session
#check prepared transaction are reported correctly by recover
txc = self.xid('c')
txd = self.xid('d')
- xids = channel.dtx_coordination_recover().in_doubt
- if txc not in xids:
+ xids = session.dtx_recover().in_doubt
+ ids = [x.global_id for x in xids] #TODO: come up with nicer way to test these
+
+ if txc.global_id not in ids:
self.fail("Recovered xids not as expected. missing: %s" % (txc))
- if txd not in xids:
+ if txd.global_id not in ids:
self.fail("Recovered xids not as expected. missing: %s" % (txc))
self.assertEqual(2, len(xids))
@@ -232,95 +242,95 @@
not_empty = ["queue-a2", "queue-b1"]
for q in queues:
if q in not_empty:
- self.assertEqual(1, channel.queue_query(queue=q).message_count, "Bad
count for %s" % (q))
+ self.assertEqual(1, session.queue_query(queue=q).message_count, "Bad
count for %s" % (q))
else:
- self.assertEqual(0, channel.queue_query(queue=q).message_count, "Bad
count for %s" % (q))
+ self.assertEqual(0, session.queue_query(queue=q).message_count, "Bad
count for %s" % (q))
#complete the prepared transactions
- self.assertEqual(self.XA_OK, channel.dtx_coordination_commit(xid=txc).status)
- self.assertEqual(self.XA_OK, channel.dtx_coordination_rollback(xid=txd).status)
+ self.assertEqual(self.XA_OK, session.dtx_commit(xid=txc).status)
+ self.assertEqual(self.XA_OK, session.dtx_rollback(xid=txd).status)
not_empty.append("queue-c2")
not_empty.append("queue-d1")
for q in queues:
if q in not_empty:
- self.assertEqual(1, channel.queue_query(queue=q).message_count)
+ self.assertEqual(1, session.queue_query(queue=q).message_count)
else:
- self.assertEqual(0, channel.queue_query(queue=q).message_count)
+ self.assertEqual(0, session.queue_query(queue=q).message_count)
def phase7(self):
- channel = self.channel
- channel.synchronous = False
+ session = self.session
+ session.synchronous = False
#test deletion of queue after publish
#create queue
- channel.queue_declare(queue = "q", auto_delete=True, durable=True)
+ session.queue_declare(queue = "q", auto_delete=True, durable=True)
#send message
for i in range(1, 10):
- channel.message_transfer(content=Content(properties={'routing_key' :
"q", 'delivery_mode':2}, body = "my-message"))
+ session.message_transfer(message=self.createMessage(routing_key =
"q", body = "my-message"))
- channel.synchronous = True
+ session.synchronous = True
#explicitly delete queue
- channel.queue_delete(queue = "q")
+ session.queue_delete(queue = "q")
#test acking of message from auto-deleted queue
#create queue
- channel.queue_declare(queue = "q", auto_delete=True, durable=True)
+ session.queue_declare(queue = "q", auto_delete=True, durable=True)
#send message
- channel.message_transfer(content=Content(properties={'routing_key' :
"q", 'delivery_mode':2}, body = "my-message"))
+ session.message_transfer(message=self.createMessage(routing_key = "q",
body = "my-message"))
#create consumer
- channel.message_subscribe(queue = "q", destination = "a",
confirm_mode = 1, acquire_mode=0)
- channel.message_flow(unit = 1, value = 0xFFFFFFFF, destination = "a")
- channel.message_flow(unit = 0, value = 10, destination = "a")
- queue = self.client.queue("a")
+ session.message_subscribe(queue = "q", destination = "a",
accept_mode=0, acquire_mode=0)
+ session.message_flow(unit = 1, value = 0xFFFFFFFF, destination = "a")
+ session.message_flow(unit = 0, value = 10, destination = "a")
+ queue = session.incoming("a")
#consume the message, cancel subscription (triggering auto-delete), then ack it
msg = queue.get(timeout = 5)
- channel.message_cancel(destination = "a")
- msg.complete()
+ session.message_cancel(destination = "a")
+ self.ack(msg)
#test implicit deletion of bindings when queue is deleted
- channel.queue_declare(queue = "durable-subscriber-queue",
exclusive=True, durable=True)
- channel.queue_bind(exchange="amq.topic",
queue="durable-subscriber-queue", routing_key="xyz")
- channel.message_transfer(destination= "amq.topic",
content=Content(properties={'routing_key' : "xyz",
'delivery_mode':2}, body = "my-message"))
- channel.queue_delete(queue = "durable-subscriber-queue")
+ session.queue_declare(queue = "durable-subscriber-queue",
exclusive=True, durable=True)
+ session.exchange_bind(exchange="amq.topic",
queue="durable-subscriber-queue", binding_key="xyz")
+ session.message_transfer(destination= "amq.topic",
message=self.createMessage(routing_key = "xyz", body = "my-message"))
+ session.queue_delete(queue = "durable-subscriber-queue")
#test unbind:
#create a series of bindings to a queue
- channel.queue_declare(queue = "binding-test-queue", durable=True)
- channel.queue_bind(exchange="amq.direct",
queue="binding-test-queue", routing_key="abc")
- channel.queue_bind(exchange="amq.direct",
queue="binding-test-queue", routing_key="pqr")
- channel.queue_bind(exchange="amq.direct",
queue="binding-test-queue", routing_key="xyz")
- channel.queue_bind(exchange="amq.match",
queue="binding-test-queue", routing_key="a",
arguments={"x-match":"all", "p":"a"})
- channel.queue_bind(exchange="amq.match",
queue="binding-test-queue", routing_key="b",
arguments={"x-match":"all", "p":"b"})
- channel.queue_bind(exchange="amq.match",
queue="binding-test-queue", routing_key="c",
arguments={"x-match":"all", "p":"c"})
+ session.queue_declare(queue = "binding-test-queue", durable=True)
+ session.exchange_bind(exchange="amq.direct",
queue="binding-test-queue", binding_key="abc")
+ session.exchange_bind(exchange="amq.direct",
queue="binding-test-queue", binding_key="pqr")
+ session.exchange_bind(exchange="amq.direct",
queue="binding-test-queue", binding_key="xyz")
+ session.exchange_bind(exchange="amq.match",
queue="binding-test-queue", binding_key="a",
arguments={"x-match":"all", "p":"a"})
+ session.exchange_bind(exchange="amq.match",
queue="binding-test-queue", binding_key="b",
arguments={"x-match":"all", "p":"b"})
+ session.exchange_bind(exchange="amq.match",
queue="binding-test-queue", binding_key="c",
arguments={"x-match":"all", "p":"c"})
#then restart broker...
def phase8(self):
- channel = self.channel
+ session = self.session
#continue testing unbind:
#send messages to the queue via each of the bindings
for k in ["abc", "pqr", "xyz"]:
data = "first %s" % (k)
- channel.message_transfer(destination= "amq.direct",
content=Content(data, properties={'routing_key': k,'delivery_mode':2}))
+ session.message_transfer(destination= "amq.direct",
message=self.createMessage(routing_key=k, body=data))
for a in [{"p":"a"}, {"p":"b"},
{"p":"c"}]:
data = "first %s" % (a["p"])
- channel.message_transfer(destination="amq.match",
content=Content(data, properties={'application_headers':a }))
+ session.message_transfer(destination="amq.match",
message=self.createMessage(application_headers=a, body=data))
#unbind some bindings (using final 0-10 semantics)
- channel.queue_unbind(exchange="amq.direct",
queue="binding-test-queue", routing_key="pqr")
- channel.queue_unbind(exchange="amq.match",
queue="binding-test-queue", routing_key="b")
+ session.exchange_unbind(exchange="amq.direct",
queue="binding-test-queue", binding_key="pqr")
+ session.exchange_unbind(exchange="amq.match",
queue="binding-test-queue", binding_key="b")
#send messages again
for k in ["abc", "pqr", "xyz"]:
data = "second %s" % (k)
- channel.message_transfer(destination= "amq.direct",
content=Content(data, properties={'routing_key': k,'delivery_mode':2}))
+ session.message_transfer(destination= "amq.direct",
message=self.createMessage(routing_key=k, body=data))
for a in [{"p":"a"}, {"p":"b"},
{"p":"c"}]:
data = "second %s" % (a["p"])
- channel.message_transfer(destination="amq.match",
content=Content(data, properties={'application_headers':a }))
+ session.message_transfer(destination="amq.match",
message=self.createMessage(application_headers=a, body=data))
#check that only the correct messages are received
expected = []
@@ -333,52 +343,47 @@
for a in [{"p":"a"}, {"p":"c"}]:
expected.append("second %s" % (a["p"]))
- channel.message_subscribe(queue = "binding-test-queue", destination =
"binding-test")
- channel.message_flow(unit = 1, value = 0xFFFFFFFF, destination =
"binding-test")
- channel.message_flow(unit = 0, value = 10, destination =
"binding-test")
- queue = self.client.queue("binding-test")
+ session.message_subscribe(queue = "binding-test-queue", destination =
"binding-test")
+ session.message_flow(unit = 1, value = 0xFFFFFFFF, destination =
"binding-test")
+ session.message_flow(unit = 0, value = 10, destination =
"binding-test")
+ queue = session.incoming("binding-test")
while len(expected):
msg = queue.get(timeout=1)
- if msg.content.body not in expected:
- self.fail("Missing message: %s" % msg.content.body)
- expected.remove(msg.content.body)
+ if msg.body not in expected:
+ self.fail("Missing message: %s" % msg.body)
+ expected.remove(msg.body)
try:
msg = queue.get(timeout=1)
- self.fail("Got extra message: %s" % msg.content.body)
+ self.fail("Got extra message: %s" % msg.body)
except Empty: pass
- channel.queue_declare(queue = "durable-subscriber-queue",
exclusive=True, durable=True)
- channel.queue_bind(exchange="amq.topic",
queue="durable-subscriber-queue", routing_key="xyz")
- channel.message_transfer(destination= "amq.topic",
content=Content(properties={'routing_key' : "xyz",
'delivery_mode':2}, body = "my-message"))
- channel.queue_delete(queue = "durable-subscriber-queue")
+ session.queue_declare(queue = "durable-subscriber-queue",
exclusive=True, durable=True)
+ session.exchange_bind(exchange="amq.topic",
queue="durable-subscriber-queue", binding_key="xyz")
+ session.message_transfer(destination= "amq.topic",
message=self.createMessage(routing_key = "xyz", body = "my-message"))
+ session.queue_delete(queue = "durable-subscriber-queue")
def xid(self, txid, branchqual = ''):
- return pack('!LBB', 0, len(txid), len(branchqual)) + txid + branchqual
+ return self.session.xid(format=0, global_id=txid, branch_id=branchqual)
def txswap(self, src, dest, tx):
- self.assertEqual(self.XA_OK, self.channel.dtx_demarcation_start(xid=tx).status)
- self.channel.message_subscribe(destination="temp-swap", queue=src,
confirm_mode=1)
- self.channel.message_flow(destination="temp-swap", unit=0, value=1)
- self.channel.message_flow(destination="temp-swap", unit=1,
value=0xFFFFFFFF)
- msg = self.client.queue("temp-swap").get(timeout=1)
- self.channel.message_cancel(destination="temp-swap")
-
self.channel.message_transfer(content=Content(properties={'routing_key':dest,
'message_id':msg.content['message_id'], 'delivery_mode':2},
- body=msg.content.body))
- msg.complete();
- self.assertEqual(self.XA_OK, self.channel.dtx_demarcation_end(xid=tx).status)
+ self.assertEqual(self.XA_OK, self.session.dtx_start(xid=tx).status)
+ self.session.message_subscribe(destination="temp-swap", queue=src,
accept_mode=0)
+ self.session.message_flow(destination="temp-swap", unit=0, value=1)
+ self.session.message_flow(destination="temp-swap", unit=1,
value=0xFFFFFFFF)
+ msg = self.session.incoming("temp-swap").get(timeout=1)
+ self.session.message_cancel(destination="temp-swap")
+ self.session.message_transfer(message=self.createMessage(routing_key=dest,
correlation_id=self.getProperty(msg, 'correlation_id'),
+ body=msg.body))
+ self.ack(msg)
+ self.assertEqual(self.XA_OK, self.session.dtx_end(xid=tx).status)
def assertEmptyQueue(self, name):
- self.assertEqual(0, self.channel.queue_query(queue=name).message_count)
+ self.assertEqual(0, self.session.queue_query(queue=name).message_count)
- def assertChannelException(self, expectedCode, message):
- self.assertEqual("session", message.method.klass.name)
- self.assertEqual("closed", message.method.name)
- self.assertEqual(expectedCode, message.reply_code)
-
def assertConnectionException(self, expectedCode, message):
self.assertEqual("connection", message.method.klass.name)
self.assertEqual("close", message.method.name)
@@ -388,25 +393,40 @@
self.assertEqual(klass, reply.method.klass.name)
self.assertEqual(method, reply.method.name)
- def assertExpectedContent(self, content, id, body):
- self.assertEqual(id, content.content['message_id'])
- self.assertEqual(body, content.content.body)
- return content
+ def assertExpectedContent(self, msg, id, body):
+ self.assertEqual(id, self.getProperty(msg, 'correlation_id'))
+ self.assertEqual(body, msg.body)
+ return msg
+ def getProperty(self, msg, name):
+ for h in msg.headers:
+ if hasattr(h, name): return getattr(h, name)
+ return None
+
+ def ack(self, *msgs):
+ session = self.session
+ set = RangedSet()
+ for m in msgs:
+ set.add(m.id)
+ #TODO: tidy up completion
+ session.receiver._completed.add(m.id)
+ session.message_accept(set)
+ session.channel.session_completed(session.receiver._completed)
+
def assertExpectedGetResult(self, id, body):
- return
self.assertExpectedContent(self.client.queue("incoming-gets").get(timeout=1),
id, body)
+ return
self.assertExpectedContent(session.incoming("incoming-gets").get(timeout=1), id,
body)
def assertEqual(self, expected, actual, msg=''):
if expected != actual: raise Exception("%s expected: %s actual: %s" %
(msg, expected, actual))
def assertMessageOnQueue(self, queue, id, body):
- self.channel.message_subscribe(destination="incoming-gets",
queue=queue, confirm_mode=1)
- self.channel.message_flow(destination="incoming-gets", unit=0,
value=1)
- self.channel.message_flow(destination="incoming-gets", unit=1,
value=0xFFFFFFFF)
- msg = self.client.queue("incoming-gets").get(timeout=1)
+ self.session.message_subscribe(destination="incoming-gets",
queue=queue, accept_mode=0)
+ self.session.message_flow(destination="incoming-gets", unit=0,
value=1)
+ self.session.message_flow(destination="incoming-gets", unit=1,
value=0xFFFFFFFF)
+ msg = self.session.incoming("incoming-gets").get(timeout=1)
self.assertExpectedContent(msg, id, body)
- msg.complete()
- self.channel.message_cancel(destination="incoming-gets")
+ self.ack(msg)
+ self.session.message_cancel(destination="incoming-gets")
def __init__(self):
@@ -415,10 +435,9 @@
def connect(self):
""" Connects to the broker """
- self.client = qpid.client.Client(self.host, self.port, qpid.spec.load(self.spec,
*self.errata))
- self.client.start("\x00" + self.user + "\x00" +
self.password, mechanism="PLAIN")
- self.channel = self.client.channel(1)
- self.channel.session_open()
+ self.conn = Connection(connect(self.host, self.port), load(self.spec,
*self.errata))
+ self.conn.start(timeout=10)
+ self.session = self.conn.session("test-session", timeout=10)
def run(self, args=sys.argv[1:]):
try:
@@ -449,7 +468,8 @@
res = False
- self.channel.session_close()
+ if not self.session.error(): self.session.close(timeout=10)
+ self.conn.close(timeout=10)
# Crude fix to wait for thread in client to exit after return from
session_close()
# Reduces occurrences of "Unhandled exception in thread" messages
after each test
Modified: store/trunk/cpp/tests/system_test.sh
===================================================================
--- store/trunk/cpp/tests/system_test.sh 2008-04-23 09:25:04 UTC (rev 1965)
+++ store/trunk/cpp/tests/system_test.sh 2008-04-23 14:09:44 UTC (rev 1966)
@@ -25,7 +25,7 @@
# Make sure $QPID_DIR contains what we need.
test -d "$QPID_DIR" || error "WARNING: QPID_DIR is not set skipping system
tests."
-xml_spec=$QPID_DIR/specs/amqp.0-10-preview.xml
+xml_spec=$QPID_DIR/specs/amqp.0-10-qpid-errata.xml
test -f $xml_spec || error "$xml_spec or $spec_errata or $dtx_preview not found:
invalid \$QPID_DIR ?"
export PYTHONPATH=$QPID_DIR/python