Author: justi9
Date: 2007-11-28 12:08:54 -0500 (Wed, 28 Nov 2007)
New Revision: 1381
Modified:
mgmt/bin/quirk
Log:
Updates the example client with a simpler api concept.
Modified: mgmt/bin/quirk
===================================================================
--- mgmt/bin/quirk 2007-11-27 23:52:36 UTC (rev 1380)
+++ mgmt/bin/quirk 2007-11-28 17:08:54 UTC (rev 1381)
@@ -5,76 +5,164 @@
from qpid.content import Content
class Exchange(object):
- def __init__(self, name):
+ def __init__(self, session, name):
+ self.session = session
self.name = name
- def declare(self, session):
- session.exchange_declare(exchange=self.name)
- session.message_flow("amq.direct", 0, 0xFFFFFFFF)
- session.message_flow("amq.direct", 1, 0xFFFFFFFF)
-
class Queue(object):
- def __init__(self, name):
+ def __init__(self, session, name):
+ self.session = session
self.name = name
- def declare(self, session):
- session.queue_declare(queue=self.name) #XXX blows up without queue=
+class Subscription(object):
+ def __init__(self, session, queue, name):
+ self.session = session
+ self.queue = queue
+ self.name = name
- def bind(self, session, exchange, binding_key=None):
- if binding_key is None:
- binding_key = self.name
+ # XXX what all does this do? it seems to declare things
- session.queue_bind(exchange=exchange.name, queue=self.name,
- routing_key=binding_key)
+ # XXX what is the destination arg for?
+ # XXX from reading the spec, "destination" seems less
+ # appropriate than "subscription name" (which is what the spec
+ # ch. 25 docs say it is)
+ session.csession.message_subscribe(queue="test", destination=self.name)
+
+ session.csession.message_flow(self.name, 0, 0xFFFFFFFF)
+ session.csession.message_flow(self.name, 1, 0xFFFFFFFF)
+
+ self.client_queue = session.client.queue(self.name)
+
+ def get(self):
+ m = Message()
+ m.content = self.client_queue.get(timeout=10).content
+ return m
+
class Message(object):
def __init__(self, body=""):
self.content = Content(body)
self.content["content_type"] = "text/plain"
- def set_content_type(self, type):
- self.content["content_type"] = type
-
def set_routing_key(self, key):
- self.content["routing_key"] = key # XXX why here and not an arg to message_transfer?
+ self.content["routing_key"] = key
- def send(self, session, exchange):
- session.message_transfer(destination=exchange.name, content=self.content)
+ def get_routing_key(self):
+ try:
+ return self.content["routing_key"]
+ except KeyError:
+ pass
-def main(host, port):
+ def __str__(self):
+ return self.content.body
+
+class Session(object):
+ def __init__(self, client, csession):
+ self.client = client
+ self.csession = csession
+
+ def open(self):
+ self.csession.open()
+
+ def close(self):
+ self.csession.close()
+
+ def declare(self, object):
+ if object.__class__ is Queue:
+ #XXX blows up without queue=
+ self.csession.queue_declare(queue=object.name)
+ elif object.__class__ is Exchange:
+ self.csession.exchange_declare(exchange=object.name)
+ else:
+ raise Exception()
+
+ def bind(self, queue, exchange, binding_key=None):
+ if binding_key is None:
+ binding_key = queue.name
+
+ self.csession.queue_bind(exchange=exchange.name,
+ queue=queue.name,
+ routing_key=binding_key)
+
+ def publish(self, message, object):
+ if object.__class__ is Exchange:
+ self.csession.message_transfer(destination=object.name,
+ content=message.content)
+ elif object.__class__ is Queue:
+ # XXX maybe this shouldn't be conditional
+ if message.get_routing_key() is None:
+ message.set_routing_key(object.name)
+
+ self.csession.message_transfer(destination="",
+ content=message.content)
+
+def direct_with_explicit_exchange(host, port):
client = Client(host, port)
client.start({"LOGIN": "guest", "PASSWORD": "guest"})
- session = client.session()
+ session = Session(client, client.session())
session.open()
- # XXX what all does this do? it seems to declare things
- session.message_subscribe(queue="test", destination="amq.direct") # XXX what is the destination arg for?
+ try:
+ q = Queue(session, "test")
+ e = Exchange(session, "amq.direct")
+ s = Subscription(session, q, "s")
- exchange = Exchange("amq.direct")
- exchange.declare(session)
+ session.declare(q)
+ session.bind(q, e)
- queue = Queue("test")
- queue.declare(session)
- queue.bind(session, exchange)
+ for i in range(0, 10):
+ print i,
- # XXX this can't go here, because flow stuff barfs
- #session.message_subscribe(queue="test", destination="amq.direct") # XXX what is the destination arg for?
+ m = Message("Test message " + str(i))
- for n in range(0, 10):
- print n,
- message = Message("Test message " + str(n))
- message.set_routing_key(queue.name)
- message.send(session, exchange)
+ # XXX make this an arg publish, instead?
+ m.set_routing_key(q.name)
- print
+ session.publish(m, e)
- q = client.queue("amq.direct") # XXX huh?
- msg = q.get(timeout=10)
+ print "."
+ for i in range(0, 10):
+ print i,
+
+ m = s.get()
+
+ print m
+ finally:
+ session.close()
+
+def direct_with_implicit_exchange(host, port):
+ client = Client(host, port)
+ client.start({"LOGIN": "guest", "PASSWORD": "guest"})
+
+ # Now, simpler, using the default exchange:
+
+ session = Session(client, client.session())
+ session.open()
+
+ try:
+ q = Queue(session, "test")
+ s = Subscription(session, q, "s")
+
+ session.declare(q)
+
+ for i in range(0, 10):
+ print i,
+ m = Message("m%i" % i)
+ session.publish(m, q)
+ print "Sent", m
+
+ for i in range(0, 10):
+ print i,
+ m = s.get()
+ print "Received", m
+ finally:
+ session.close()
+
if __name__ == "__main__":
if len(sys.argv) != 2:
- print "Usage: qbench IP:PORT"
+ print "Usage: quirk IP:PORT"
sys.exit(2)
addr = sys.argv[1].split(":")
@@ -84,22 +172,4 @@
else:
host, port = (addr[0], 5672)
- main(host, port)
-
-"""
-# XXX what do these do?
-session.message_subscribe(queue="test", destination="amq.direct") # esp what is the destination arg for?
-session.message_flow("amq.direct", 0, 0xFFFFFFFF)
-session.message_flow("amq.direct", 1, 0xFFFFFFFF)
-
-for n in range(0, 100):
- msg = Content("hello world " + str(n))
- msg["content_type"] = "text/plain"
- msg["reply_to"] = client.structs.reply_to("asdf", "fdsa") # XXX structs.reply_to garbage ?
- msg["application_headers"] = {"x": 1, "y": 2, "z": "zee"}
- session.message_transfer(destination="amq.direct", content=msg)
- q = client.queue("amq.direct") #XXX huh?
- msg = q.get(timeout=10)
-
-session.close()
-"""
+ direct_with_implicit_exchange(host, port)