[rhmessaging-commits] rhmessaging commits: r1667 - in mgmt: cumin/python/cumin and 1 other directory.
rhmessaging-commits at lists.jboss.org
rhmessaging-commits at lists.jboss.org
Sat Feb 9 13:06:18 EST 2008
Author: justi9
Date: 2008-02-09 13:06:18 -0500 (Sat, 09 Feb 2008)
New Revision: 1667
Added:
mgmt/cumin/python/cumin/quirk.py
Modified:
mgmt/bin/quirk
Log:
Moves the wrapper classes from quirk to cumin/quirk.py so I can use
them as library code.
Changes those classes to support static broker object configuration on
the client side.
Updates bin/quirk to the new interfaces.
Modified: mgmt/bin/quirk
===================================================================
--- mgmt/bin/quirk 2008-02-08 18:12:41 UTC (rev 1666)
+++ mgmt/bin/quirk 2008-02-09 18:06:18 UTC (rev 1667)
@@ -4,141 +4,34 @@
from time import sleep
from random import random, sample, randint
-class Exchange(object):
- def __init__(self, session, name):
- self.session = session
- self.name = name
- self.type = "direct"
+from cumin.quirk import *
- def declare(self):
- self.session.psession.exchange_declare(exchange=self.name,
- type=self.type)
-
- def exists(self):
- return False #XXX figure out how to do this
-
-class Queue(object):
- def __init__(self, session, name):
- self.session = session
- self.name = name
-
- def declare(self):
- self.session.psession.queue_declare(queue=self.name)
-
- def exists(self):
- return False #XXX figure out how to do this
-
- def bind(self, exchange, binding_key=None):
- if binding_key is None:
- binding_key = self.name
-
- self.session.psession.queue_bind(exchange=exchange.name,
- queue=self.name,
- routing_key=binding_key)
-
-class Subscription(object):
- def __init__(self, session, queue):
- self.session = session
- self.name = queue.name # XXX bad?
- self.queue = queue
-
- session.psession.message_subscribe(queue=queue.name,
- destination=self.name)
-
- session.psession.message_flow(self.name, 0, 0xFFFFFFFF)
- session.psession.message_flow(self.name, 1, 0xFFFFFFFF)
-
- self.client_queue = session.client.pclient.queue(self.name)
-
- def get(self):
- m = self.session.message()
- m.content = self.client_queue.get(timeout=10).content
- return m
-
-class Message(object):
- def __init__(self, session):
- self.session = session
-
- def set(self, payload):
- self.content = qpid.content.Content(payload)
- self.content["content_type"] = "text/plain"
-
- def send(self, dest, routing_key=None):
- if dest.__class__ is Queue:
- self.content["routing_key"] = dest.name
- self.session.psession.message_transfer(destination="",
- content=self.content)
- elif dest.__class__ is Exchange:
- if routing_key is None:
- raise Exception("Routing key not set")
-
- self.session.psession.message_transfer(destination=dest.name,
- content=self.content)
- else:
- raise Exception("Unknown destination object")
-
- def __str__(self):
- return self.content.body
-
-class Client(object):
- def __init__(self, host, port):
- self.pclient = qpid.client.Client(host, port)
-
- def session(self):
- return Session(self)
-
- def login(self, user, password):
- self.pclient.start({"LOGIN": user, "PASSWORD": password})
-
-class Session(object):
- def __init__(self, client):
- self.client = client
- self.psession = client.pclient.session()
-
- def open(self):
- self.psession.open()
-
- def close(self):
- self.psession.close()
-
- def exchange(self, name):
- return Exchange(self, name)
-
- def queue(self, name):
- return Queue(self, name)
-
- def subscribe(self, queue):
- return Subscription(self, queue)
-
- def message(self):
- return Message(self)
-
class TestCommand(object):
def __init__(self, name):
self.name = name
def run(self, client):
- session = client.session()
+ session = Session(client)
session.open()
try:
- q = session.queue("quirk.test")
+ q = Queue("quirk.test")
- if not q.exists():
- q.declare()
+ if not q.exists(session):
+ q.declare(session)
- s = session.subscribe(q)
+ s = Subscription(q)
+ s.init(session)
for i in range(0, 10):
print i,
- m = session.message()
- m.set("message %i" % i)
- m.send(q)
+ m = Message("message %i" % i)
+ m.send(session, q)
print "Sent", m
for i in range(0, 10):
print i,
- m = s.get()
+ m = s.get(session)
print "Received", m
finally:
session.close()
@@ -148,23 +41,22 @@
self.name = name
def run(self, client):
- session = client.session()
+ session = Session(client)
session.open()
try:
- q = session.queue("quirk.bench")
+ q = Queue("quirk.bench")
- if not q.exists():
- q.declare()
+ if not q.exists(session):
+ q.declare(session)
- s = session.subscribe(q)
+ s = Subscription(q)
i = 0
while True:
- m = session.message()
- m.set(str(i))
- m.send(q)
+ m = Message(str(i))
+ m.send(session, q)
if i % 1000 == 0:
print ".",
@@ -178,7 +70,7 @@
self.name = name
def run(self, client):
- session = client.session()
+ session = Session(client)
session.open()
qs = list()
@@ -188,13 +80,13 @@
for i in range(0, 30):
name = "demo%02i" % (i + 1)
- q = session.queue(name)
- q.declare()
+ q = Queue(name)
+ q.declare(session)
qs.append(q)
- e = session.exchange(name)
- e.declare()
+ e = Exchange(name)
+ e.declare(session)
es.append(e)
@@ -202,9 +94,8 @@
sqs = sample(qs, 5)
for q in sqs:
- m = session.message()
- m.set("test")
- m.send(q)
+ m = Message("test")
+ m.send(session, q)
sleep(1)
Added: mgmt/cumin/python/cumin/quirk.py
===================================================================
--- mgmt/cumin/python/cumin/quirk.py (rev 0)
+++ mgmt/cumin/python/cumin/quirk.py 2008-02-09 18:06:18 UTC (rev 1667)
@@ -0,0 +1,104 @@
+import sys, qpid
+from time import sleep
+from random import random, sample, randint
+
+class Exchange(object):
+ def __init__(self, name, type="direct"):
+ self.name = name
+ self.type = type
+
+ def declare(self, session):
+ session.psession.exchange_declare(exchange=self.name, type=self.type)
+
+ def exists(self, session):
+ return False #XXX figure out how to do this
+
+class Queue(object):
+ def __init__(self, name):
+ self.name = name
+
+ def declare(self, session):
+ session.psession.queue_declare(queue=self.name)
+
+ def exists(self, session):
+ return False #XXX figure out how to do this
+
+ def bind(self, session, exchange, key=None):
+ if key is None:
+ key = self.name
+
+ session.psession.queue_bind(exchange=exchange.name,
+ queue=self.name,
+ routing_key=key)
+
+class Subscription(object):
+ def __init__(self, queue):
+ self.name = queue.name # XXX bad?
+ self.queue = queue
+ self.client_queue = None
+
+ def init(self, session):
+ session.psession.message_subscribe(queue=self.queue.name,
+ destination=self.name)
+
+ session.psession.message_flow(self.name, 0, 0xFFFFFFFF)
+ session.psession.message_flow(self.name, 1, 0xFFFFFFFF)
+
+ self.client_queue = session.client.pclient.queue(self.name)
+
+ def get(self, session):
+ if self.client_queue is None:
+ raise Exception()
+
+ m = Message()
+ m.content = self.client_queue.get(timeout=10).content
+
+ return m
+
+class Message(object):
+ def __init__(self, payload=None):
+ self.content = qpid.content.Content()
+ self.content["content_type"] = "text/plain"
+
+ if payload is not None:
+ self.set_payload(payload)
+
+ def set_payload(self, payload):
+ self.content.body = payload
+
+ def send(self, session, dest, key=None):
+ if dest.__class__ is Queue:
+ self.content["routing_key"] = dest.name
+ session.psession.message_transfer(destination="",
+ content=self.content)
+ elif dest.__class__ is Exchange:
+ if routing_key is None:
+ raise Exception("Routing key not set")
+
+ session.psession.message_transfer(destination=dest.name,
+ content=self.content)
+ else:
+ raise Exception("Unknown destination object")
+
+ def __str__(self):
+ return self.content.body
+
+class Client(object):
+ def __init__(self, host, port):
+ self.host = host
+ self.port = port
+ self.pclient = qpid.client.Client(host, port)
+
+ def login(self, user, password):
+ self.pclient.start({"LOGIN": user, "PASSWORD": password})
+
+class Session(object):
+ def __init__(self, client):
+ self.client = client
+ self.psession = client.pclient.session()
+
+ def open(self):
+ self.psession.open()
+
+ def close(self):
+ self.psession.close()
More information about the rhmessaging-commits
mailing list