Author: justi9
Date: 2007-11-27 18:52:36 -0500 (Tue, 27 Nov 2007)
New Revision: 1380
Added:
mgmt/bin/quirk
Log:
A preliminary broker client, mostly so I can ask Rafi questions about
the API.
Added: mgmt/bin/quirk
===================================================================
--- mgmt/bin/quirk (rev 0)
+++ mgmt/bin/quirk 2007-11-27 23:52:36 UTC (rev 1380)
@@ -0,0 +1,105 @@
+#!/usr/bin/env python
+
+import sys, qpid
+from qpid.client import Client
+from qpid.content import Content
+
+class Exchange(object):
+ def __init__(self, name):
+ self.name = name
+
+ def declare(self, session):
+ session.exchange_declare(exchange=self.name)
+ session.message_flow("amq.direct", 0, 0xFFFFFFFF)
+ session.message_flow("amq.direct", 1, 0xFFFFFFFF)
+
+class Queue(object):
+ def __init__(self, name):
+ self.name = name
+
+ def declare(self, session):
+ session.queue_declare(queue=self.name) #XXX blows up without queue=
+
+ def bind(self, session, exchange, binding_key=None):
+ if binding_key is None:
+ binding_key = self.name
+
+ session.queue_bind(exchange=exchange.name, queue=self.name,
+ routing_key=binding_key)
+
+class Message(object):
+ def __init__(self, body=""):
+ self.content = Content(body)
+ self.content["content_type"] = "text/plain"
+
+ def set_content_type(self, type):
+ self.content["content_type"] = type
+
+ def set_routing_key(self, key):
+ self.content["routing_key"] = key # XXX why here and not an arg to
message_transfer?
+
+ def send(self, session, exchange):
+ session.message_transfer(destination=exchange.name, content=self.content)
+
+def main(host, port):
+ client = Client(host, port)
+ client.start({"LOGIN": "guest", "PASSWORD":
"guest"})
+
+ session = client.session()
+ session.open()
+
+ # XXX what all does this do? it seems to declare things
+ session.message_subscribe(queue="test", destination="amq.direct")
# XXX what is the destination arg for?
+
+ exchange = Exchange("amq.direct")
+ exchange.declare(session)
+
+ queue = Queue("test")
+ queue.declare(session)
+ queue.bind(session, exchange)
+
+ # XXX this can't go here, because flow stuff barfs
+ #session.message_subscribe(queue="test",
destination="amq.direct") # XXX what is the destination arg for?
+
+ for n in range(0, 10):
+ print n,
+ message = Message("Test message " + str(n))
+ message.set_routing_key(queue.name)
+ message.send(session, exchange)
+
+ print
+
+ q = client.queue("amq.direct") # XXX huh?
+ msg = q.get(timeout=10)
+
+if __name__ == "__main__":
+ if len(sys.argv) != 2:
+ print "Usage: qbench IP:PORT"
+ sys.exit(2)
+
+ addr = sys.argv[1].split(":")
+
+ if len(addr) > 1:
+ host, port = (addr[0], int(addr[1]))
+ else:
+ host, port = (addr[0], 5672)
+
+ main(host, port)
+
+"""
+# XXX what do these do?
+session.message_subscribe(queue="test", destination="amq.direct") # esp what is the destination arg for?
+session.message_flow("amq.direct", 0, 0xFFFFFFFF)
+session.message_flow("amq.direct", 1, 0xFFFFFFFF)
+
+for n in range(0, 100):
+ msg = Content("hello world " + str(n))
+ msg["content_type"] = "text/plain"
+ msg["reply_to"] = client.structs.reply_to("asdf", "fdsa") # XXX structs.reply_to garbage ?
+ msg["application_headers"] = {"x": 1, "y": 2, "z": "zee"}
+ session.message_transfer(destination="amq.direct", content=msg)
+ q = client.queue("amq.direct") #XXX huh?
+ msg = q.get(timeout=10)
+
+session.close()
+"""
Property changes on: mgmt/bin/quirk
___________________________________________________________________
Name: svn:executable
+ *