[rhmessaging-commits] rhmessaging commits: r1381 - mgmt/bin.

rhmessaging-commits at lists.jboss.org rhmessaging-commits at lists.jboss.org
Wed Nov 28 12:08:54 EST 2007


Author: justi9
Date: 2007-11-28 12:08:54 -0500 (Wed, 28 Nov 2007)
New Revision: 1381

Modified:
   mgmt/bin/quirk
Log:
Updates the example client with a simpler API concept.



Modified: mgmt/bin/quirk
===================================================================
--- mgmt/bin/quirk	2007-11-27 23:52:36 UTC (rev 1380)
+++ mgmt/bin/quirk	2007-11-28 17:08:54 UTC (rev 1381)
@@ -5,76 +5,164 @@
 from qpid.content import Content
 
 class Exchange(object):
-    def __init__(self, name):
+    def __init__(self, session, name):
+        self.session = session
         self.name = name
 
-    def declare(self, session):
-        session.exchange_declare(exchange=self.name)
-        session.message_flow("amq.direct", 0, 0xFFFFFFFF)
-        session.message_flow("amq.direct", 1, 0xFFFFFFFF)
-
 class Queue(object):
-    def __init__(self, name):
+    def __init__(self, session, name):
+        self.session = session
         self.name = name
 
-    def declare(self, session):
-        session.queue_declare(queue=self.name) #XXX blows up without queue=
+class Subscription(object):
+    def __init__(self, session, queue, name):
+        self.session = session
+        self.queue = queue
+        self.name = name
 
-    def bind(self, session, exchange, binding_key=None):
-        if binding_key is None:
-            binding_key = self.name
+        # XXX what all does this do?  it seems to declare things
 
-        session.queue_bind(exchange=exchange.name, queue=self.name,
-                           routing_key=binding_key)
+        # XXX what is the destination arg for?
+        # XXX from reading the spec, "destination" seems less
+        # appropriate than "subscription name" (which is what the spec
+        # ch. 25 docs say it is)
 
+        session.csession.message_subscribe(queue="test", destination=self.name)
+
+        session.csession.message_flow(self.name, 0, 0xFFFFFFFF)
+        session.csession.message_flow(self.name, 1, 0xFFFFFFFF)
+
+        self.client_queue = session.client.queue(self.name)
+
+    def get(self):
+        m = Message()
+        m.content = self.client_queue.get(timeout=10).content
+        return m
+
 class Message(object):
     def __init__(self, body=""):
         self.content = Content(body)
         self.content["content_type"] = "text/plain"
 
-    def set_content_type(self, type):
-        self.content["content_type"] = type
-
     def set_routing_key(self, key):
-        self.content["routing_key"] = key # XXX why here and not an arg to message_transfer?
+        self.content["routing_key"] = key
 
-    def send(self, session, exchange):
-        session.message_transfer(destination=exchange.name, content=self.content)
+    def get_routing_key(self):
+        try:
+            return self.content["routing_key"]
+        except KeyError:
+            pass
 
-def main(host, port):
+    def __str__(self):
+        return self.content.body
+
+class Session(object):
+    def __init__(self, client, csession):
+        self.client = client
+        self.csession = csession
+
+    def open(self):
+        self.csession.open()
+
+    def close(self):
+        self.csession.close()
+
+    def declare(self, object):
+        if object.__class__ is Queue:
+            #XXX blows up without queue=
+            self.csession.queue_declare(queue=object.name)
+        elif object.__class__ is Exchange:
+            self.csession.exchange_declare(exchange=object.name)
+        else:
+            raise Exception()
+
+    def bind(self, queue, exchange, binding_key=None):
+        if binding_key is None:
+            binding_key = queue.name
+
+        self.csession.queue_bind(exchange=exchange.name,
+                                 queue=queue.name,
+                                 routing_key=binding_key)
+
+    def publish(self, message, object):
+        if object.__class__ is Exchange:
+            self.csession.message_transfer(destination=object.name,
+                                           content=message.content)
+        elif object.__class__ is Queue:
+            # XXX maybe this shouldn't be conditional
+            if message.get_routing_key() is None:
+                message.set_routing_key(object.name)
+
+            self.csession.message_transfer(destination="",
+                                           content=message.content)
+
+def direct_with_explicit_exchange(host, port):
     client = Client(host, port)
     client.start({"LOGIN": "guest", "PASSWORD": "guest"})
 
-    session = client.session()
+    session = Session(client, client.session())
     session.open()
 
-    # XXX what all does this do?  it seems to declare things
-    session.message_subscribe(queue="test", destination="amq.direct") # XXX what is the destination arg for?
+    try:
+        q = Queue(session, "test")
+        e = Exchange(session, "amq.direct")
+        s = Subscription(session, q, "s")
 
-    exchange = Exchange("amq.direct")
-    exchange.declare(session)
+        session.declare(q)
+        session.bind(q, e)
 
-    queue = Queue("test")
-    queue.declare(session)
-    queue.bind(session, exchange)
+        for i in range(0, 10):
+            print i,
 
-    # XXX this can't go here, because flow stuff barfs
-    #session.message_subscribe(queue="test", destination="amq.direct") # XXX what is the destination arg for?
+            m = Message("Test message " + str(i))
 
-    for n in range(0, 10):
-        print n, 
-        message = Message("Test message " + str(n))
-        message.set_routing_key(queue.name)
-        message.send(session, exchange)
+            # XXX make this an arg publish, instead?
+            m.set_routing_key(q.name)
 
-    print
+            session.publish(m, e)
 
-    q = client.queue("amq.direct") # XXX huh?
-    msg = q.get(timeout=10)
+            print "."
 
+        for i in range(0, 10):
+            print i,
+
+            m = s.get()
+
+            print m
+    finally:
+        session.close()
+
+def direct_with_implicit_exchange(host, port):
+    client = Client(host, port)
+    client.start({"LOGIN": "guest", "PASSWORD": "guest"})
+
+    # Now, simpler, using the default exchange:
+
+    session = Session(client, client.session())
+    session.open()
+
+    try:
+        q = Queue(session, "test")
+        s = Subscription(session, q, "s")
+
+        session.declare(q)
+
+        for i in range(0, 10):
+            print i,
+            m = Message("m%i" % i)
+            session.publish(m, q)
+            print "Sent", m
+
+        for i in range(0, 10):
+            print i,
+            m = s.get()
+            print "Received", m
+    finally:
+        session.close()
+
 if __name__ == "__main__":
     if len(sys.argv) != 2:
-        print "Usage: qbench IP:PORT"
+        print "Usage: quirk IP:PORT"
         sys.exit(2)
 
     addr = sys.argv[1].split(":")
@@ -84,22 +172,4 @@
     else:
         host, port = (addr[0], 5672)
 
-    main(host, port)
-
-"""
-# XXX what do these do?
-session.message_subscribe(queue="test", destination="amq.direct") # esp what is the destination arg for?
-session.message_flow("amq.direct", 0, 0xFFFFFFFF)
-session.message_flow("amq.direct", 1, 0xFFFFFFFF)
-
-for n in range(0, 100):
-    msg = Content("hello world " + str(n))
-    msg["content_type"] = "text/plain"
-    msg["reply_to"] = client.structs.reply_to("asdf", "fdsa") # XXX structs.reply_to garbage ?
-    msg["application_headers"] = {"x": 1, "y": 2, "z": "zee"}
-    session.message_transfer(destination="amq.direct", content=msg)
-    q = client.queue("amq.direct") #XXX huh?
-    msg = q.get(timeout=10)
-
-session.close()
-"""
+    direct_with_implicit_exchange(host, port)




More information about the rhmessaging-commits mailing list