rhmessaging commits: r1968 - mgmt/cumin/python/cumin.
by rhmessaging-commits@lists.jboss.org
Author: justi9
Date: 2008-04-23 15:31:21 -0400 (Wed, 23 Apr 2008)
New Revision: 1968
Modified:
mgmt/cumin/python/cumin/page.py
Log:
A slightly longer sleep for our workaround
Modified: mgmt/cumin/python/cumin/page.py
===================================================================
--- mgmt/cumin/python/cumin/page.py 2008-04-23 19:22:47 UTC (rev 1967)
+++ mgmt/cumin/python/cumin/page.py 2008-04-23 19:31:21 UTC (rev 1968)
@@ -183,7 +183,7 @@
# resolution of TimestampCol (lastLoggedOut) is too
# coarse for the subsequent comparison of
# lastLoggedOut and lastChallenged
- sleep(1)
+ sleep(2)
self.page.set_redirect_url(session, session.marshal())
16 years, 8 months
rhmessaging commits: r1967 - in mgmt: mint/python/mint and 1 other directories.
by rhmessaging-commits@lists.jboss.org
Author: justi9
Date: 2008-04-23 15:22:47 -0400 (Wed, 23 Apr 2008)
New Revision: 1967
Modified:
mgmt/cumin/python/cumin/__init__.py
mgmt/mint/python/mint/__init__.py
mgmt/notes/justin-todo.txt
Log:
Fix logging configuration
Modified: mgmt/cumin/python/cumin/__init__.py
===================================================================
--- mgmt/cumin/python/cumin/__init__.py 2008-04-23 14:09:44 UTC (rev 1966)
+++ mgmt/cumin/python/cumin/__init__.py 2008-04-23 19:22:47 UTC (rev 1967)
@@ -20,6 +20,8 @@
from action import ActionPage
from util import Config
+log = logging.getLogger("cumin")
+
class Cumin(Application):
def __init__(self, home, data_uri, spec_path):
super(Cumin, self).__init__()
@@ -82,11 +84,10 @@
for reg in BrokerRegistration.select():
if reg.broker is None or reg.broker.managedBroker not in \
self.model.data.connections:
-
attempts = self.attempts.get(reg, 0)
attempts += 1
self.attempts[reg] = attempts
-
+
if attempts < 10:
reg.connect(self.model.data)
elif attempts < 100 and attempts % 10 == 0:
@@ -158,33 +159,28 @@
self.add_param("debug", bool, False, summ)
def init(self):
- root = logging.getLogger("cumin")
- root.setLevel(logging.NOTSET)
+ root = logging.getLogger()
self.load_defaults()
self.load_args(sys.argv)
- h = self.get_console_handler()
+ h = logging.StreamHandler()
root.addHandler(h)
self.load_file(os.path.join(self.home, "etc", "cumin.conf"))
self.load_file(os.path.join(os.path.expanduser("~"), ".cumin.conf"))
self.load_args(sys.argv)
root.removeHandler(h)
- h = self.get_console_handler()
+ h = logging.StreamHandler()
+ h.setLevel(self.debug and logging.DEBUG or logging.ERROR)
root.addHandler(h)
h = logging.FileHandler(self.log)
- if self.debug:
- h.setLevel(logging.DEBUG)
- else:
- h.setLevel(logging.INFO)
+ h.setLevel(self.debug and logging.DEBUG or logging.INFO)
root.addHandler(h)
+ log.setLevel(self.debug and logging.DEBUG or logging.INFO)
+
def get_console_handler(self):
h = logging.StreamHandler()
- if self.debug:
- h.setLevel(logging.DEBUG)
- else:
- h.setLevel(logging.ERROR)
return h
Modified: mgmt/mint/python/mint/__init__.py
===================================================================
--- mgmt/mint/python/mint/__init__.py 2008-04-23 14:09:44 UTC (rev 1966)
+++ mgmt/mint/python/mint/__init__.py 2008-04-23 19:22:47 UTC (rev 1967)
@@ -260,6 +260,9 @@
assert MintModel.staticInstance is None
MintModel.staticInstance = self
+ if self.debug:
+ log.setLevel(logging.DEBUG)
+
def check(self):
try:
connectionForURI(self.dataUri)
Modified: mgmt/notes/justin-todo.txt
===================================================================
--- mgmt/notes/justin-todo.txt 2008-04-23 14:09:44 UTC (rev 1966)
+++ mgmt/notes/justin-todo.txt 2008-04-23 19:22:47 UTC (rev 1967)
@@ -32,6 +32,10 @@
* Handle parameter exceptions
+ * Talk about AMQP_SPEC in the readme
+
+ * Fix double login bug
+
Deferred
* Change the way CuminAction.invoke works
16 years, 8 months
rhmessaging commits: r1966 - store/trunk/cpp/tests.
by rhmessaging-commits@lists.jboss.org
Author: gordonsim
Date: 2008-04-23 10:09:44 -0400 (Wed, 23 Apr 2008)
New Revision: 1966
Modified:
store/trunk/cpp/tests/persistence.py
store/trunk/cpp/tests/system_test.sh
Log:
Switch system tests over to final 0-10 spec
Modified: store/trunk/cpp/tests/persistence.py
===================================================================
--- store/trunk/cpp/tests/persistence.py 2008-04-23 09:25:04 UTC (rev 1965)
+++ store/trunk/cpp/tests/persistence.py 2008-04-23 14:09:44 UTC (rev 1966)
@@ -22,48 +22,66 @@
import sys, re, traceback, socket
from getopt import getopt, GetoptError
-import qpid.client, qpid.spec, qpid.content, qpid.testlib
-from qpid.client import Closed
+
+from qpid.connection import Connection
+from qpid.util import connect
+from qpid.spec010 import load
+from qpid.datatypes import Message, RangedSet
from qpid.queue import Empty
-from qpid.content import Content
-from struct import *
+from qpid.session import SessionException
+from qpid.testlib import TestBase010
from time import sleep
-class PersistenceTest(qpid.testlib.TestBase):
+class PersistenceTest(TestBase010):
XA_RBROLLBACK = 1
XA_RBTIMEOUT = 2
XA_OK = 0
+ def createMessage(self, **kwargs):
+ session = self.session
+ dp = {}
+ dp['delivery_mode'] = 2
+ mp = {}
+ for k, v in kwargs.iteritems():
+ if k in ['routing_key', 'delivery_mode']: dp[k] = v
+ if k in ['message_id', 'correlation_id', 'application_headers']: mp[k] = v
+ args = []
+ args.append(session.delivery_properties(**dp))
+ if len(mp):
+ args.append(session.message_properties(**mp))
+ if kwargs.has_key('body'): args.append(kwargs['body'])
+ return Message(*args)
+
def phase1(self):
- channel = self.channel
+ session = self.session
- channel.queue_declare(queue="queue-a", durable=True)
- channel.queue_declare(queue="queue-b", durable=True)
- channel.queue_bind(queue="queue-a", exchange="amq.direct", routing_key="a")
- channel.queue_bind(queue="queue-b", exchange="amq.direct", routing_key="b")
+ session.queue_declare(queue="queue-a", durable=True)
+ session.queue_declare(queue="queue-b", durable=True)
+ session.exchange_bind(queue="queue-a", exchange="amq.direct", binding_key="a")
+ session.exchange_bind(queue="queue-b", exchange="amq.direct", binding_key="b")
- channel.message_transfer(destination="amq.direct",
- content=Content(properties={'routing_key':"a", 'message_id':"Msg0001", 'delivery_mode':2}, body="A_Message1"))
- channel.message_transfer(destination="amq.direct",
- content=Content(properties={'routing_key':"b", 'message_id':"Msg0002", 'delivery_mode':2}, body="B_Message1"))
+ session.message_transfer(destination="amq.direct",
+ message=self.createMessage(routing_key="a", correlation_id="Msg0001", body="A_Message1"))
+ session.message_transfer(destination="amq.direct",
+ message=self.createMessage(routing_key="b", correlation_id="Msg0002", body="B_Message1"))
def phase2(self):
- channel = self.channel
+ session = self.session
#check queues exists
- channel.queue_declare(queue="queue-a", durable=True, passive=True)
- channel.queue_declare(queue="queue-b", durable=True, passive=True)
+ session.queue_declare(queue="queue-a", durable=True, passive=True)
+ session.queue_declare(queue="queue-b", durable=True, passive=True)
#check they are still bound to amq.direct correctly
responses = []
- responses.append(channel.binding_query(queue="queue-a", exchange="amq.direct", routing_key="a"))
- responses.append(channel.binding_query(queue="queue-b", exchange="amq.direct", routing_key="b"))
+ responses.append(session.exchange_bound(queue="queue-a", exchange="amq.direct", binding_key="a"))
+ responses.append(session.exchange_bound(queue="queue-b", exchange="amq.direct", binding_key="b"))
for r in responses:
- self.assertEqual(False, r.exchange_not_found)
- self.assertEqual(False, r.queue_not_found)
- self.assertEqual(False, r.key_not_matched)
+ self.assert_(not r.exchange_not_found)
+ self.assert_(not r.queue_not_found)
+ self.assert_(not r.key_not_matched)
#check expected messages are there
@@ -73,26 +91,26 @@
self.assertEmptyQueue("queue-a")
self.assertEmptyQueue("queue-b")
- channel.queue_declare(queue="queue-c", durable=True)
+ session.queue_declare(queue="queue-c", durable=True)
#send a message to a topic such that it reaches all queues
- channel.queue_bind(queue="queue-a", exchange="amq.topic", routing_key="abc")
- channel.queue_bind(queue="queue-b", exchange="amq.topic", routing_key="abc")
- channel.queue_bind(queue="queue-c", exchange="amq.topic", routing_key="abc")
+ session.exchange_bind(queue="queue-a", exchange="amq.topic", binding_key="abc")
+ session.exchange_bind(queue="queue-b", exchange="amq.topic", binding_key="abc")
+ session.exchange_bind(queue="queue-c", exchange="amq.topic", binding_key="abc")
- channel.message_transfer(destination="amq.topic",
- content=Content(properties={'routing_key':"abc", 'message_id':"Msg0003", 'delivery_mode':2}, body="AB_Message2"))
+ session.message_transfer(destination="amq.topic",
+ message=self.createMessage(routing_key="abc", correlation_id="Msg0003", body="AB_Message2"))
def phase3(self):
- channel = self.channel
+ session = self.session
#check queues exists
- channel.queue_declare(queue="queue-a", durable=True, passive=True)
- channel.queue_declare(queue="queue-b", durable=True, passive=True)
- channel.queue_declare(queue="queue-c", durable=True, passive=True)
+ session.queue_declare(queue="queue-a", durable=True, passive=True)
+ session.queue_declare(queue="queue-b", durable=True, passive=True)
+ session.queue_declare(queue="queue-c", durable=True, passive=True)
- channel.tx_select()
+ session.tx_select()
#check expected messages are there
self.assertMessageOnQueue("queue-a", "Msg0003", "AB_Message2")
self.assertMessageOnQueue("queue-b", "Msg0003", "AB_Message2")
@@ -103,46 +121,43 @@
self.assertEmptyQueue("queue-c")
#note: default bindings must be restored for this to work
- channel.message_transfer(content=Content(
- properties={'routing_key':"queue-a", 'message_id':"Msg0004", 'delivery_mode':2},
- body="A_Message3"))
- channel.message_transfer(content=Content(
- properties={'routing_key':"queue-a", 'message_id':"Msg0005", 'delivery_mode':2},
- body="A_Message4"))
- channel.message_transfer(content=Content(
- properties={'routing_key':"queue-a", 'message_id':"Msg0006", 'delivery_mode':2},
- body="A_Message5"))
+ session.message_transfer(message=self.createMessage(
+ routing_key="queue-a", correlation_id="Msg0004", body="A_Message3"))
+ session.message_transfer(message=self.createMessage(
+ routing_key="queue-a", correlation_id="Msg0005", body="A_Message4"))
+ session.message_transfer(message=self.createMessage(
+ routing_key="queue-a", correlation_id="Msg0006", body="A_Message5"))
- channel.tx_commit()
+ session.tx_commit()
#delete a queue
- channel.queue_delete(queue="queue-c")
+ session.queue_delete(queue="queue-c")
- channel.message_subscribe(destination="ctag", queue="queue-a", confirm_mode=1)
- channel.message_flow(destination="ctag", unit=0, value=0xFFFFFFFF)
- channel.message_flow(destination="ctag", unit=1, value=0xFFFFFFFF)
- included = self.client.queue("ctag")
- msg = included.get(timeout=1)
- self.assertExpectedContent(msg, "Msg0004", "A_Message3")
- msg = included.get(timeout=1)
- self.assertExpectedContent(msg, "Msg0005", "A_Message4")
- msg = included.get(timeout=1)
- self.assertExpectedContent(msg, "Msg0006", "A_Message5").complete()
+ session.message_subscribe(destination="ctag", queue="queue-a", accept_mode=0)
+ session.message_flow(destination="ctag", unit=0, value=0xFFFFFFFF)
+ session.message_flow(destination="ctag", unit=1, value=0xFFFFFFFF)
+ included = session.incoming("ctag")
+ msg1 = included.get(timeout=1)
+ self.assertExpectedContent(msg1, "Msg0004", "A_Message3")
+ msg2 = included.get(timeout=1)
+ self.assertExpectedContent(msg2, "Msg0005", "A_Message4")
+ msg3 = included.get(timeout=1)
+ self.assertExpectedContent(msg3, "Msg0006", "A_Message5")
+ self.ack(msg1, msg2, msg3)
- channel.message_transfer(destination="amq.direct", content=Content(
- properties={'routing_key':"queue-b", 'message_id':"Msg0007", 'delivery_mode':2},
- body="B_Message3"))
+ session.message_transfer(destination="amq.direct", message=self.createMessage(
+ routing_key="queue-b", correlation_id="Msg0007", body="B_Message3"))
- channel.tx_rollback()
+ session.tx_rollback()
def phase4(self):
- channel = self.channel
+ session = self.session
#check queues exists
- channel.queue_declare(queue="queue-a", durable=True, passive=True)
- channel.queue_declare(queue="queue-b", durable=True, passive=True)
+ session.queue_declare(queue="queue-a", durable=True, passive=True)
+ session.queue_declare(queue="queue-b", durable=True, passive=True)
self.assertMessageOnQueue("queue-a", "Msg0004", "A_Message3")
self.assertMessageOnQueue("queue-a", "Msg0005", "A_Message4")
@@ -153,35 +168,28 @@
#check this queue doesn't exist
try:
- channel.queue_declare(queue="queue-c", durable=True, passive=True)
+ session.queue_declare(queue="queue-c", durable=True, passive=True)
raise Exception("Expected queue-c to have been deleted")
- except Closed, e:
- self.assertChannelException(404, e.args[0])
- self.channel = self.client.channel(2)
- self.channel.session_open()
- channel = self.channel
+ except SessionException, e:
+ self.assertEquals(404, e.args[0].error_code)
def phase5(self):
- channel = self.channel
+ session = self.session
queues = ["queue-a1", "queue-a2", "queue-b1", "queue-b2", "queue-c1", "queue-c2", "queue-d1", "queue-d2"]
for q in queues:
- channel.queue_declare(queue=q, durable=True)
+ session.queue_declare(queue=q, durable=True)
- channel.message_transfer(content=Content(
- properties={'routing_key':"queue-a1", 'message_id':"MsgA", 'delivery_mode':2},
- body="MessageA"))
- channel.message_transfer(content=Content(
- properties={'routing_key':"queue-b1", 'message_id':"MsgB", 'delivery_mode':2},
- body="MessageB"))
- channel.message_transfer(content=Content(
- properties={'routing_key':"queue-c1", 'message_id':"MsgC", 'delivery_mode':2},
- body="MessageC"))
- channel.message_transfer(content=Content(
- properties={'routing_key':"queue-d1", 'message_id':"MsgD", 'delivery_mode':2},
- body="MessageD"))
+ session.message_transfer(message=self.createMessage(
+ routing_key="queue-a1", correlation_id="MsgA", body="MessageA"))
+ session.message_transfer(message=self.createMessage(
+ routing_key="queue-b1", correlation_id="MsgB", body="MessageB"))
+ session.message_transfer(message=self.createMessage(
+ routing_key="queue-c1", correlation_id="MsgC", body="MessageC"))
+ session.message_transfer(message=self.createMessage(
+ routing_key="queue-d1", correlation_id="MsgD", body="MessageD"))
- channel.dtx_demarcation_select()
+ session.dtx_select()
txa = self.xid('a')
txb = self.xid('b')
txc = self.xid('c')
@@ -194,33 +202,35 @@
#no queue should have any messages accessible
for q in queues:
- self.assertEqual(0, channel.queue_query(queue=q).message_count, "Bad count for %s" % (q))
+ self.assertEqual(0, session.queue_query(queue=q).message_count, "Bad count for %s" % (q))
- self.assertEqual(self.XA_OK, channel.dtx_coordination_commit(xid=txa, one_phase=True).status)
- self.assertEqual(self.XA_OK, channel.dtx_coordination_rollback(xid=txb).status)
- self.assertEqual(self.XA_OK, channel.dtx_coordination_prepare(xid=txc).status)
- self.assertEqual(self.XA_OK, channel.dtx_coordination_prepare(xid=txd).status)
+ self.assertEqual(self.XA_OK, session.dtx_commit(xid=txa, one_phase=True).status)
+ self.assertEqual(self.XA_OK, session.dtx_rollback(xid=txb).status)
+ self.assertEqual(self.XA_OK, session.dtx_prepare(xid=txc).status)
+ self.assertEqual(self.XA_OK, session.dtx_prepare(xid=txd).status)
#further checks
not_empty = ["queue-a2", "queue-b1"]
for q in queues:
if q in not_empty:
- self.assertEqual(1, channel.queue_query(queue=q).message_count, "Bad count for %s" % (q))
+ self.assertEqual(1, session.queue_query(queue=q).message_count, "Bad count for %s" % (q))
else:
- self.assertEqual(0, channel.queue_query(queue=q).message_count, "Bad count for %s" % (q))
+ self.assertEqual(0, session.queue_query(queue=q).message_count, "Bad count for %s" % (q))
def phase6(self):
- channel = self.channel
+ session = self.session
#check prepared transaction are reported correctly by recover
txc = self.xid('c')
txd = self.xid('d')
- xids = channel.dtx_coordination_recover().in_doubt
- if txc not in xids:
+ xids = session.dtx_recover().in_doubt
+ ids = [x.global_id for x in xids] #TODO: come up with nicer way to test these
+
+ if txc.global_id not in ids:
self.fail("Recovered xids not as expected. missing: %s" % (txc))
- if txd not in xids:
+ if txd.global_id not in ids:
self.fail("Recovered xids not as expected. missing: %s" % (txc))
self.assertEqual(2, len(xids))
@@ -232,95 +242,95 @@
not_empty = ["queue-a2", "queue-b1"]
for q in queues:
if q in not_empty:
- self.assertEqual(1, channel.queue_query(queue=q).message_count, "Bad count for %s" % (q))
+ self.assertEqual(1, session.queue_query(queue=q).message_count, "Bad count for %s" % (q))
else:
- self.assertEqual(0, channel.queue_query(queue=q).message_count, "Bad count for %s" % (q))
+ self.assertEqual(0, session.queue_query(queue=q).message_count, "Bad count for %s" % (q))
#complete the prepared transactions
- self.assertEqual(self.XA_OK, channel.dtx_coordination_commit(xid=txc).status)
- self.assertEqual(self.XA_OK, channel.dtx_coordination_rollback(xid=txd).status)
+ self.assertEqual(self.XA_OK, session.dtx_commit(xid=txc).status)
+ self.assertEqual(self.XA_OK, session.dtx_rollback(xid=txd).status)
not_empty.append("queue-c2")
not_empty.append("queue-d1")
for q in queues:
if q in not_empty:
- self.assertEqual(1, channel.queue_query(queue=q).message_count)
+ self.assertEqual(1, session.queue_query(queue=q).message_count)
else:
- self.assertEqual(0, channel.queue_query(queue=q).message_count)
+ self.assertEqual(0, session.queue_query(queue=q).message_count)
def phase7(self):
- channel = self.channel
- channel.synchronous = False
+ session = self.session
+ session.synchronous = False
#test deletion of queue after publish
#create queue
- channel.queue_declare(queue = "q", auto_delete=True, durable=True)
+ session.queue_declare(queue = "q", auto_delete=True, durable=True)
#send message
for i in range(1, 10):
- channel.message_transfer(content=Content(properties={'routing_key' : "q", 'delivery_mode':2}, body = "my-message"))
+ session.message_transfer(message=self.createMessage(routing_key = "q", body = "my-message"))
- channel.synchronous = True
+ session.synchronous = True
#explicitly delete queue
- channel.queue_delete(queue = "q")
+ session.queue_delete(queue = "q")
#test acking of message from auto-deleted queue
#create queue
- channel.queue_declare(queue = "q", auto_delete=True, durable=True)
+ session.queue_declare(queue = "q", auto_delete=True, durable=True)
#send message
- channel.message_transfer(content=Content(properties={'routing_key' : "q", 'delivery_mode':2}, body = "my-message"))
+ session.message_transfer(message=self.createMessage(routing_key = "q", body = "my-message"))
#create consumer
- channel.message_subscribe(queue = "q", destination = "a", confirm_mode = 1, acquire_mode=0)
- channel.message_flow(unit = 1, value = 0xFFFFFFFF, destination = "a")
- channel.message_flow(unit = 0, value = 10, destination = "a")
- queue = self.client.queue("a")
+ session.message_subscribe(queue = "q", destination = "a", accept_mode=0, acquire_mode=0)
+ session.message_flow(unit = 1, value = 0xFFFFFFFF, destination = "a")
+ session.message_flow(unit = 0, value = 10, destination = "a")
+ queue = session.incoming("a")
#consume the message, cancel subscription (triggering auto-delete), then ack it
msg = queue.get(timeout = 5)
- channel.message_cancel(destination = "a")
- msg.complete()
+ session.message_cancel(destination = "a")
+ self.ack(msg)
#test implicit deletion of bindings when queue is deleted
- channel.queue_declare(queue = "durable-subscriber-queue", exclusive=True, durable=True)
- channel.queue_bind(exchange="amq.topic", queue="durable-subscriber-queue", routing_key="xyz")
- channel.message_transfer(destination= "amq.topic", content=Content(properties={'routing_key' : "xyz", 'delivery_mode':2}, body = "my-message"))
- channel.queue_delete(queue = "durable-subscriber-queue")
+ session.queue_declare(queue = "durable-subscriber-queue", exclusive=True, durable=True)
+ session.exchange_bind(exchange="amq.topic", queue="durable-subscriber-queue", binding_key="xyz")
+ session.message_transfer(destination= "amq.topic", message=self.createMessage(routing_key = "xyz", body = "my-message"))
+ session.queue_delete(queue = "durable-subscriber-queue")
#test unbind:
#create a series of bindings to a queue
- channel.queue_declare(queue = "binding-test-queue", durable=True)
- channel.queue_bind(exchange="amq.direct", queue="binding-test-queue", routing_key="abc")
- channel.queue_bind(exchange="amq.direct", queue="binding-test-queue", routing_key="pqr")
- channel.queue_bind(exchange="amq.direct", queue="binding-test-queue", routing_key="xyz")
- channel.queue_bind(exchange="amq.match", queue="binding-test-queue", routing_key="a", arguments={"x-match":"all", "p":"a"})
- channel.queue_bind(exchange="amq.match", queue="binding-test-queue", routing_key="b", arguments={"x-match":"all", "p":"b"})
- channel.queue_bind(exchange="amq.match", queue="binding-test-queue", routing_key="c", arguments={"x-match":"all", "p":"c"})
+ session.queue_declare(queue = "binding-test-queue", durable=True)
+ session.exchange_bind(exchange="amq.direct", queue="binding-test-queue", binding_key="abc")
+ session.exchange_bind(exchange="amq.direct", queue="binding-test-queue", binding_key="pqr")
+ session.exchange_bind(exchange="amq.direct", queue="binding-test-queue", binding_key="xyz")
+ session.exchange_bind(exchange="amq.match", queue="binding-test-queue", binding_key="a", arguments={"x-match":"all", "p":"a"})
+ session.exchange_bind(exchange="amq.match", queue="binding-test-queue", binding_key="b", arguments={"x-match":"all", "p":"b"})
+ session.exchange_bind(exchange="amq.match", queue="binding-test-queue", binding_key="c", arguments={"x-match":"all", "p":"c"})
#then restart broker...
def phase8(self):
- channel = self.channel
+ session = self.session
#continue testing unbind:
#send messages to the queue via each of the bindings
for k in ["abc", "pqr", "xyz"]:
data = "first %s" % (k)
- channel.message_transfer(destination= "amq.direct", content=Content(data, properties={'routing_key': k,'delivery_mode':2}))
+ session.message_transfer(destination= "amq.direct", message=self.createMessage(routing_key=k, body=data))
for a in [{"p":"a"}, {"p":"b"}, {"p":"c"}]:
data = "first %s" % (a["p"])
- channel.message_transfer(destination="amq.match", content=Content(data, properties={'application_headers':a }))
+ session.message_transfer(destination="amq.match", message=self.createMessage(application_headers=a, body=data))
#unbind some bindings (using final 0-10 semantics)
- channel.queue_unbind(exchange="amq.direct", queue="binding-test-queue", routing_key="pqr")
- channel.queue_unbind(exchange="amq.match", queue="binding-test-queue", routing_key="b")
+ session.exchange_unbind(exchange="amq.direct", queue="binding-test-queue", binding_key="pqr")
+ session.exchange_unbind(exchange="amq.match", queue="binding-test-queue", binding_key="b")
#send messages again
for k in ["abc", "pqr", "xyz"]:
data = "second %s" % (k)
- channel.message_transfer(destination= "amq.direct", content=Content(data, properties={'routing_key': k,'delivery_mode':2}))
+ session.message_transfer(destination= "amq.direct", message=self.createMessage(routing_key=k, body=data))
for a in [{"p":"a"}, {"p":"b"}, {"p":"c"}]:
data = "second %s" % (a["p"])
- channel.message_transfer(destination="amq.match", content=Content(data, properties={'application_headers':a }))
+ session.message_transfer(destination="amq.match", message=self.createMessage(application_headers=a, body=data))
#check that only the correct messages are received
expected = []
@@ -333,52 +343,47 @@
for a in [{"p":"a"}, {"p":"c"}]:
expected.append("second %s" % (a["p"]))
- channel.message_subscribe(queue = "binding-test-queue", destination = "binding-test")
- channel.message_flow(unit = 1, value = 0xFFFFFFFF, destination = "binding-test")
- channel.message_flow(unit = 0, value = 10, destination = "binding-test")
- queue = self.client.queue("binding-test")
+ session.message_subscribe(queue = "binding-test-queue", destination = "binding-test")
+ session.message_flow(unit = 1, value = 0xFFFFFFFF, destination = "binding-test")
+ session.message_flow(unit = 0, value = 10, destination = "binding-test")
+ queue = session.incoming("binding-test")
while len(expected):
msg = queue.get(timeout=1)
- if msg.content.body not in expected:
- self.fail("Missing message: %s" % msg.content.body)
- expected.remove(msg.content.body)
+ if msg.body not in expected:
+ self.fail("Missing message: %s" % msg.body)
+ expected.remove(msg.body)
try:
msg = queue.get(timeout=1)
- self.fail("Got extra message: %s" % msg.content.body)
+ self.fail("Got extra message: %s" % msg.body)
except Empty: pass
- channel.queue_declare(queue = "durable-subscriber-queue", exclusive=True, durable=True)
- channel.queue_bind(exchange="amq.topic", queue="durable-subscriber-queue", routing_key="xyz")
- channel.message_transfer(destination= "amq.topic", content=Content(properties={'routing_key' : "xyz", 'delivery_mode':2}, body = "my-message"))
- channel.queue_delete(queue = "durable-subscriber-queue")
+ session.queue_declare(queue = "durable-subscriber-queue", exclusive=True, durable=True)
+ session.exchange_bind(exchange="amq.topic", queue="durable-subscriber-queue", binding_key="xyz")
+ session.message_transfer(destination= "amq.topic", message=self.createMessage(routing_key = "xyz", body = "my-message"))
+ session.queue_delete(queue = "durable-subscriber-queue")
def xid(self, txid, branchqual = ''):
- return pack('!LBB', 0, len(txid), len(branchqual)) + txid + branchqual
+ return self.session.xid(format=0, global_id=txid, branch_id=branchqual)
def txswap(self, src, dest, tx):
- self.assertEqual(self.XA_OK, self.channel.dtx_demarcation_start(xid=tx).status)
- self.channel.message_subscribe(destination="temp-swap", queue=src, confirm_mode=1)
- self.channel.message_flow(destination="temp-swap", unit=0, value=1)
- self.channel.message_flow(destination="temp-swap", unit=1, value=0xFFFFFFFF)
- msg = self.client.queue("temp-swap").get(timeout=1)
- self.channel.message_cancel(destination="temp-swap")
- self.channel.message_transfer(content=Content(properties={'routing_key':dest, 'message_id':msg.content['message_id'], 'delivery_mode':2},
- body=msg.content.body))
- msg.complete();
- self.assertEqual(self.XA_OK, self.channel.dtx_demarcation_end(xid=tx).status)
+ self.assertEqual(self.XA_OK, self.session.dtx_start(xid=tx).status)
+ self.session.message_subscribe(destination="temp-swap", queue=src, accept_mode=0)
+ self.session.message_flow(destination="temp-swap", unit=0, value=1)
+ self.session.message_flow(destination="temp-swap", unit=1, value=0xFFFFFFFF)
+ msg = self.session.incoming("temp-swap").get(timeout=1)
+ self.session.message_cancel(destination="temp-swap")
+ self.session.message_transfer(message=self.createMessage(routing_key=dest, correlation_id=self.getProperty(msg, 'correlation_id'),
+ body=msg.body))
+ self.ack(msg)
+ self.assertEqual(self.XA_OK, self.session.dtx_end(xid=tx).status)
def assertEmptyQueue(self, name):
- self.assertEqual(0, self.channel.queue_query(queue=name).message_count)
+ self.assertEqual(0, self.session.queue_query(queue=name).message_count)
- def assertChannelException(self, expectedCode, message):
- self.assertEqual("session", message.method.klass.name)
- self.assertEqual("closed", message.method.name)
- self.assertEqual(expectedCode, message.reply_code)
-
def assertConnectionException(self, expectedCode, message):
self.assertEqual("connection", message.method.klass.name)
self.assertEqual("close", message.method.name)
@@ -388,25 +393,40 @@
self.assertEqual(klass, reply.method.klass.name)
self.assertEqual(method, reply.method.name)
- def assertExpectedContent(self, content, id, body):
- self.assertEqual(id, content.content['message_id'])
- self.assertEqual(body, content.content.body)
- return content
+ def assertExpectedContent(self, msg, id, body):
+ self.assertEqual(id, self.getProperty(msg, 'correlation_id'))
+ self.assertEqual(body, msg.body)
+ return msg
+ def getProperty(self, msg, name):
+ for h in msg.headers:
+ if hasattr(h, name): return getattr(h, name)
+ return None
+
+ def ack(self, *msgs):
+ session = self.session
+ set = RangedSet()
+ for m in msgs:
+ set.add(m.id)
+ #TODO: tidy up completion
+ session.receiver._completed.add(m.id)
+ session.message_accept(set)
+ session.channel.session_completed(session.receiver._completed)
+
def assertExpectedGetResult(self, id, body):
- return self.assertExpectedContent(self.client.queue("incoming-gets").get(timeout=1), id, body)
+ return self.assertExpectedContent(session.incoming("incoming-gets").get(timeout=1), id, body)
def assertEqual(self, expected, actual, msg=''):
if expected != actual: raise Exception("%s expected: %s actual: %s" % (msg, expected, actual))
def assertMessageOnQueue(self, queue, id, body):
- self.channel.message_subscribe(destination="incoming-gets", queue=queue, confirm_mode=1)
- self.channel.message_flow(destination="incoming-gets", unit=0, value=1)
- self.channel.message_flow(destination="incoming-gets", unit=1, value=0xFFFFFFFF)
- msg = self.client.queue("incoming-gets").get(timeout=1)
+ self.session.message_subscribe(destination="incoming-gets", queue=queue, accept_mode=0)
+ self.session.message_flow(destination="incoming-gets", unit=0, value=1)
+ self.session.message_flow(destination="incoming-gets", unit=1, value=0xFFFFFFFF)
+ msg = self.session.incoming("incoming-gets").get(timeout=1)
self.assertExpectedContent(msg, id, body)
- msg.complete()
- self.channel.message_cancel(destination="incoming-gets")
+ self.ack(msg)
+ self.session.message_cancel(destination="incoming-gets")
def __init__(self):
@@ -415,10 +435,9 @@
def connect(self):
""" Connects to the broker """
- self.client = qpid.client.Client(self.host, self.port, qpid.spec.load(self.spec, *self.errata))
- self.client.start("\x00" + self.user + "\x00" + self.password, mechanism="PLAIN")
- self.channel = self.client.channel(1)
- self.channel.session_open()
+ self.conn = Connection(connect(self.host, self.port), load(self.spec, *self.errata))
+ self.conn.start(timeout=10)
+ self.session = self.conn.session("test-session", timeout=10)
def run(self, args=sys.argv[1:]):
try:
@@ -449,7 +468,8 @@
res = False
- self.channel.session_close()
+ if not self.session.error(): self.session.close(timeout=10)
+ self.conn.close(timeout=10)
# Crude fix to wait for thread in client to exit after return from session_close()
# Reduces occurrences of "Unhandled exception in thread" messages after each test
Modified: store/trunk/cpp/tests/system_test.sh
===================================================================
--- store/trunk/cpp/tests/system_test.sh 2008-04-23 09:25:04 UTC (rev 1965)
+++ store/trunk/cpp/tests/system_test.sh 2008-04-23 14:09:44 UTC (rev 1966)
@@ -25,7 +25,7 @@
# Make sure $QPID_DIR contains what we need.
test -d "$QPID_DIR" || error "WARNING: QPID_DIR is not set skipping system tests."
-xml_spec=$QPID_DIR/specs/amqp.0-10-preview.xml
+xml_spec=$QPID_DIR/specs/amqp.0-10-qpid-errata.xml
test -f $xml_spec || error "$xml_spec or $spec_errata or $dtx_preview not found: invalid \$QPID_DIR ?"
export PYTHONPATH=$QPID_DIR/python
16 years, 8 months
rhmessaging commits: r1965 - store/trunk/cpp/tests.
by rhmessaging-commits@lists.jboss.org
Author: gordonsim
Date: 2008-04-23 05:25:04 -0400 (Wed, 23 Apr 2008)
New Revision: 1965
Modified:
store/trunk/cpp/tests/Makefile.am
Log:
Changed to use abs_builddir/.. as prefix for lib
Modified: store/trunk/cpp/tests/Makefile.am
===================================================================
--- store/trunk/cpp/tests/Makefile.am 2008-04-23 09:11:27 UTC (rev 1964)
+++ store/trunk/cpp/tests/Makefile.am 2008-04-23 09:25:04 UTC (rev 1965)
@@ -44,6 +44,6 @@
QPID_DIR=$(QPID_DIR) \
VALGRIND=$(VALGRIND) \
abs_srcdir=$(abs_srcdir) \
- LIBBDBSTORE=$(abs_top_builddir)/lib/.libs/libbdbstore.so \
+ LIBBDBSTORE=$(abs_builddir)/../lib/.libs/libbdbstore.so \
$(srcdir)/run_test
16 years, 8 months
rhmessaging commits: r1964 - store/trunk/cpp/tests.
by rhmessaging-commits@lists.jboss.org
Author: gordonsim
Date: 2008-04-23 05:11:27 -0400 (Wed, 23 Apr 2008)
New Revision: 1964
Modified:
store/trunk/cpp/tests/Makefile.am
Log:
Add start/stop broker scripts to distribution
Modified: store/trunk/cpp/tests/Makefile.am
===================================================================
--- store/trunk/cpp/tests/Makefile.am 2008-04-23 08:03:40 UTC (rev 1963)
+++ store/trunk/cpp/tests/Makefile.am 2008-04-23 09:11:27 UTC (rev 1964)
@@ -38,7 +38,7 @@
> $@-t
mv $@-t $@
-EXTRA_DIST = clean.sh system_test.sh persistence.py MessageUtils.h run_test vg_check .valgrindrc .valgrind.supp
+EXTRA_DIST = start_broker stop_broker clean.sh system_test.sh persistence.py MessageUtils.h run_test vg_check .valgrindrc .valgrind.supp
TESTS_ENVIRONMENT = \
QPID_DIR=$(QPID_DIR) \
16 years, 8 months
rhmessaging commits: r1963 - store/trunk/cpp/lib.
by rhmessaging-commits@lists.jboss.org
Author: gordonsim
Date: 2008-04-23 04:03:40 -0400 (Wed, 23 Apr 2008)
New Revision: 1963
Modified:
store/trunk/cpp/lib/Makefile.am
Log:
Add back missing header to distribution
Modified: store/trunk/cpp/lib/Makefile.am
===================================================================
--- store/trunk/cpp/lib/Makefile.am 2008-04-23 02:17:04 UTC (rev 1962)
+++ store/trunk/cpp/lib/Makefile.am 2008-04-23 08:03:40 UTC (rev 1963)
@@ -55,8 +55,8 @@
jrnl/nlfh.cpp \
jrnl/pmgr.cpp \
jrnl/rmgr.cpp \
+ jrnl/rrfc.cpp \
jrnl/slock.cpp \
- jrnl/rrfc.cpp \
jrnl/time_ns.cpp \
jrnl/txn_map.cpp \
jrnl/txn_rec.cpp \
@@ -89,6 +89,7 @@
jrnl/rmgr.hpp \
jrnl/rrfc.hpp \
jrnl/slock.hpp \
+ jrnl/time_ns.hpp \
jrnl/txn_hdr.hpp \
jrnl/txn_map.hpp \
jrnl/txn_rec.hpp \
16 years, 8 months
rhmessaging commits: r1962 - store/trunk/cpp/tests.
by rhmessaging-commits@lists.jboss.org
Author: cctrieloff
Date: 2008-04-22 22:17:04 -0400 (Tue, 22 Apr 2008)
New Revision: 1962
Modified:
store/trunk/cpp/tests/SimpleTest.cpp
Log:
Updated to 0-10 Str16 for AMQP 0-10 final
Modified: store/trunk/cpp/tests/SimpleTest.cpp
===================================================================
--- store/trunk/cpp/tests/SimpleTest.cpp 2008-04-22 22:09:25 UTC (rev 1961)
+++ store/trunk/cpp/tests/SimpleTest.cpp 2008-04-23 02:17:04 UTC (rev 1962)
@@ -226,7 +226,7 @@
BOOST_REQUIRE_EQUAL(routingKey, msg->getRoutingKey());
BOOST_REQUIRE_EQUAL(messageId, msg->getProperties<MessageProperties>()->getMessageId());
BOOST_REQUIRE_EQUAL((uint8_t) PERSISTENT, msg->getProperties<DeliveryProperties>()->getDeliveryMode());
- BOOST_REQUIRE(StringValue("xyz") == *msg->getProperties<MessageProperties>()->getApplicationHeaders().get("abc"));
+ BOOST_REQUIRE(Str16Value("xyz") == *msg->getProperties<MessageProperties>()->getApplicationHeaders().get("abc"));
BOOST_REQUIRE_EQUAL((u_int64_t) 14, msg->contentSize());
DummyHandler handler;
@@ -347,7 +347,7 @@
BOOST_REQUIRE_EQUAL(routingKey, msg->getRoutingKey());
BOOST_REQUIRE_EQUAL(messageId, msg->getProperties<MessageProperties>()->getMessageId());
BOOST_REQUIRE_EQUAL((uint8_t) PERSISTENT, msg->getProperties<DeliveryProperties>()->getDeliveryMode());
- BOOST_REQUIRE(StringValue("xyz") == *msg->getProperties<MessageProperties>()->getApplicationHeaders().get("abc"));
+ BOOST_REQUIRE(Str16Value("xyz") == *msg->getProperties<MessageProperties>()->getApplicationHeaders().get("abc"));
BOOST_REQUIRE_EQUAL((u_int64_t) (data1.size() + data2.size()), msg->getFrames().getHeaders()->getContentLength());
BOOST_REQUIRE_EQUAL((u_int64_t) 0, msg->contentSize());//ensure it is being lazily loaded
16 years, 8 months
rhmessaging commits: r1961 - in store/trunk/cpp/tests: jrnl and 1 other directory.
by rhmessaging-commits@lists.jboss.org
Author: aconway
Date: 2008-04-22 18:09:25 -0400 (Tue, 22 Apr 2008)
New Revision: 1961
Modified:
store/trunk/cpp/tests/Makefile.am
store/trunk/cpp/tests/jrnl/Makefile.am
Log:
Fix -I locations for VPATH builds.
Modified: store/trunk/cpp/tests/Makefile.am
===================================================================
--- store/trunk/cpp/tests/Makefile.am 2008-04-22 21:55:26 UTC (rev 1960)
+++ store/trunk/cpp/tests/Makefile.am 2008-04-22 22:09:25 UTC (rev 1961)
@@ -4,7 +4,7 @@
AM_CXXFLAGS = $(WARNING_CFLAGS) $(APR_CXXFLAGS) $(QPID_CXXFLAGS) \
$(CPPUNIT_CXXFLAGS) -DBOOST_TEST_DYN_LINK
-INCLUDES=-I../lib -I../lib/gen
+INCLUDES=-I$(top_srcdir)/lib -I$(top_srcdir)/lib/gen
SUBDIRS = . jrnl
Modified: store/trunk/cpp/tests/jrnl/Makefile.am
===================================================================
--- store/trunk/cpp/tests/jrnl/Makefile.am 2008-04-22 21:55:26 UTC (rev 1960)
+++ store/trunk/cpp/tests/jrnl/Makefile.am 2008-04-22 22:09:25 UTC (rev 1961)
@@ -23,7 +23,7 @@
AM_CXXFLAGS = $(WARNING_CFLAGS) $(CPPUNIT_CXXFLAGS) $(QPID_CXXFLAGS) -pthread -DBOOST_TEST_DYN_LINK
-INCLUDES=-I../../lib
+INCLUDES=-I$(top_srcdir)/lib
SUBDIRS = jtt .
16 years, 8 months
rhmessaging commits: r1960 - store/trunk/cpp/tests/jrnl.
by rhmessaging-commits@lists.jboss.org
Author: aconway
Date: 2008-04-22 17:55:26 -0400 (Tue, 22 Apr 2008)
New Revision: 1960
Modified:
store/trunk/cpp/tests/jrnl/_ut_enq_map.cpp
Log:
tests/jrnl/_ut_enq_map.cpp: reduced loop count on stress test.
Modified: store/trunk/cpp/tests/jrnl/_ut_enq_map.cpp
===================================================================
--- store/trunk/cpp/tests/jrnl/_ut_enq_map.cpp 2008-04-22 21:45:29 UTC (rev 1959)
+++ store/trunk/cpp/tests/jrnl/_ut_enq_map.cpp 2008-04-22 21:55:26 UTC (rev 1960)
@@ -305,7 +305,7 @@
u_int64_t rid;
u_int64_t rid_cnt;
u_int64_t rid_begin = 0xffffffff00000000ULL;
- u_int64_t num_rid = 0x800000ULL;
+ u_int64_t num_rid = 10;
enq_map e7;
16 years, 8 months
rhmessaging commits: r1959 - store/trunk/cpp/tests.
by rhmessaging-commits@lists.jboss.org
Author: aconway
Date: 2008-04-22 17:45:29 -0400 (Tue, 22 Apr 2008)
New Revision: 1959
Modified:
store/trunk/cpp/tests/Makefile.am
Log:
Fix packaging errors.
Modified: store/trunk/cpp/tests/Makefile.am
===================================================================
--- store/trunk/cpp/tests/Makefile.am 2008-04-22 21:10:46 UTC (rev 1958)
+++ store/trunk/cpp/tests/Makefile.am 2008-04-22 21:45:29 UTC (rev 1959)
@@ -38,7 +38,7 @@
> $@-t
mv $@-t $@
-EXTRA_DIST = clean.sh system_test.sh persistence.py MessageUtils.h setup .vg-supp
+EXTRA_DIST = clean.sh system_test.sh persistence.py MessageUtils.h run_test vg_check .valgrindrc .valgrind.supp
TESTS_ENVIRONMENT = \
QPID_DIR=$(QPID_DIR) \
16 years, 8 months