[rhmessaging-commits] rhmessaging commits: r1754 - in mgmt: mint/python/mint and 1 other directory.
rhmessaging-commits at lists.jboss.org
rhmessaging-commits at lists.jboss.org
Mon Mar 3 16:43:05 EST 2008
Author: justi9
Date: 2008-03-03 16:43:05 -0500 (Mon, 03 Mar 2008)
New Revision: 1754
Modified:
mgmt/cumin/python/cumin/__init__.py
mgmt/cumin/python/cumin/model.py
mgmt/mint/python/mint/__init__.py
mgmt/mint/python/mint/schema.py
mgmt/mint/python/mint/schemaparser.py
Log:
Repairs mint method calls, so they now work with the new management
api. Eliminates the brokerId argument to method calls on mint
classes, since that bit of information is already present on the object.
Modified: mgmt/cumin/python/cumin/__init__.py
===================================================================
--- mgmt/cumin/python/cumin/__init__.py 2008-03-03 17:30:06 UTC (rev 1753)
+++ mgmt/cumin/python/cumin/__init__.py 2008-03-03 21:43:05 UTC (rev 1754)
@@ -78,7 +78,7 @@
while True:
for reg in BrokerRegistration.select():
if reg.broker is None or reg.broker.managedBroker not in \
- self.model.data.connectedBrokers:
+ self.model.data.connections:
attempts = self.attempts.get(reg, 0)
attempts += 1
Modified: mgmt/cumin/python/cumin/model.py
===================================================================
--- mgmt/cumin/python/cumin/model.py 2008-03-03 17:30:06 UTC (rev 1753)
+++ mgmt/cumin/python/cumin/model.py 2008-03-03 21:43:05 UTC (rev 1754)
@@ -529,7 +529,7 @@
return "Purge"
def do_invoke(self, object, args, completion):
- object.purge(self.model.data, object.managedBroker, completion)
+ object.purge(self.model.data, completion)
class CuminExchange(CuminClass):
def __init__(self, model):
@@ -634,7 +634,7 @@
return "Close"
def do_invoke(self, object, args, completion):
- object.close(self.model.data, object.managedBroker, completion)
+ object.close(self.model.data, completion)
class CuminSession(CuminClass):
def __init__(self, model):
@@ -668,30 +668,28 @@
return "Close"
def do_invoke(self, object, args, completion):
- object.close(self.model.data, object.managedBroker, completion)
+ object.close(self.model.data, completion)
class Detach(CuminAction):
def get_title(self, session):
return "Detach"
def do_invoke(self, object, args, completion):
- object.detach(self.model.data, object.managedBroker, completion)
+ object.detach(self.model.data, completion)
class ResetLifespan(CuminAction):
def get_title(self, session):
return "Reset Lifespan"
def do_invoke(self, object, args, completion):
- object.resetLifespan(self.model.data, object.managedBroker,
- completion)
+ object.resetLifespan(self.model.data, completion)
class SolicitAck(CuminAction):
def get_title(self, session):
return "Solicit Acknowledgment"
def do_invoke(self, object, args, completion):
- object.solicitAck(self.model.data, object.managedBroker,
- completion)
+ object.solicitAck(self.model.data, completion)
class CuminBrokerRegistration(CuminClass):
def __init__(self, model):
Modified: mgmt/mint/python/mint/__init__.py
===================================================================
--- mgmt/mint/python/mint/__init__.py 2008-03-03 17:30:06 UTC (rev 1753)
+++ mgmt/mint/python/mint/__init__.py 2008-03-03 21:43:05 UTC (rev 1754)
@@ -151,6 +151,7 @@
def __init__(self, model, host, port):
self.model = model
self.key = "%s:%i" % (host, port)
+ self.objs = OriginalIdDict()
spec = qpid.spec.load(model.specPath)
self.client = qpid.client.Client(host, port, spec)
@@ -158,15 +159,11 @@
self.model.configCallback,
self.model.instCallback,
self.model.methodCallback)
- #self.mclient.schemaListener(self.model.schemaCallback)
+ self.mclient.schemaListener(self.model.schemaCallback)
self.state = None # in (None, "opening", "opened", "closing", "closed")
self.exception = None
- #self.broker.configListener(self.key, self.model.configCallback)
- #self.broker.instrumentationListener(self.key, self.model.instCallback)
- #self.broker.methodListener(self.key, self.model.methodCallback)
-
def isOpen(self):
return self.state == "opened"
@@ -179,7 +176,7 @@
# XXX I want this to happen after broker start, but the
# callbacks rely on the broker being in the connectedBrokers
# dict
- self.model.connectedBrokers[self.key] = ConnectedBroker()
+ self.model.connections[self.key] = self
self.client.start({})
self.mchan = managementChannel(self.client.channel(1),
@@ -189,8 +186,6 @@
self.mclient.addChannel(self.mchan)
- #self.model.connections[self.key] = self
-
self.state = "opened"
except Exception, e:
self.exception = e
@@ -198,6 +193,18 @@
finally:
self.model.lock.release()
+ def callMethod(self, objId, className, methodName, callback, args):
+ self.model.lock.acquire()
+ try:
+ self.model.currentMethodId += 1
+ seq = self.model.currentMethodId
+ self.model.outstandingMethodCalls[seq] = callback
+ finally:
+ self.model.lock.release()
+
+ self.mclient.callMethod(self.mchan, seq, objId,
+ className, methodName, args)
+
def close(self):
self.state = "closing"
@@ -206,8 +213,8 @@
try:
self.mclient.removeChannel(self.mchan)
- #del self.model.connections[self.key]
- del self.model.connectedBrokers[self.key]
+ del self.model.connections[self.key]
+
self.state = "closed"
except Exception, e:
self.exception = e
@@ -215,17 +222,17 @@
finally:
self.model.lock.release()
-class ConnectedBroker:
- def __init__(self):
- self.objs = OriginalIdDict()
-
def getByOriginalId(self, objType, idOriginal, create=False, args={}):
return self.objs.getByOriginalId(objType, idOriginal, create, args)
- def getByIndexAttrib(self, objType, indexAttrib, indexValue, parent, create=False, args={}):
- return self.objs.getByIndexAttrib(objType, indexAttrib, indexValue, create, args)
+ def getByIndexAttrib(self, objType, indexAttrib, indexValue, parent,
+ create=False, args={}):
+ return self.objs.getByIndexAttrib(objType, indexAttrib, indexValue,
+ create, args)
class MintModel:
+ staticInstance = None
+
def __init__(self, dataUri, specPath, debug=False):
self.dataUri = dataUri
self.specPath = specPath
@@ -233,10 +240,13 @@
self.currentMethodId = 1
self.outstandingMethodCalls = dict()
- self.connectedBrokers = dict()
+ self.connections = dict()
self.lock = Lock()
+ assert MintModel.staticInstance is None
+ MintModel.staticInstance = self
+
def check(self):
try:
connectionForURI(self.dataUri)
@@ -275,20 +285,27 @@
keys.append(key)
return keys
- def configCallback(self, broker, classInfo, list, timestamps):
+ def schemaCallback(self, brokerId, classInfo,
+ configs, metric, methods, events):
+ cls = schema.schemaNameToClassMap.get(classInfo[1])
+
+ if cls:
+ cls.classInfos[brokerId] = classInfo
+
+ def configCallback(self, brokerId, classInfo, list, timestamps):
self.log("\nCONFIG---------------------------------------------------")
objectName = classInfo[1]
self.log(objectName)
d = self.sanitizeDict(dict(list))
- connectedBroker = self.connectedBrokers[broker]
- d["managedBroker"] = broker
+ conn = self.connections[brokerId]
+ d["managedBroker"] = brokerId
d["recTime"] = datetime.fromtimestamp(timestamps[0]/1000000000)
d["creationTime"] = datetime.fromtimestamp(timestamps[1]/1000000000)
self.log(d)
for parentKey in self.findParentKeys(d):
convertedKey = self.convertRefKey(parentKey)
- d[convertedKey] = connectedBroker.getByOriginalId(schema.schemaNameToClassMap[convertedKey], d.pop(parentKey))
- obj = connectedBroker.getByOriginalId(schema.schemaNameToClassMap[objectName], d["idOriginal"], create=True)
+ d[convertedKey] = conn.getByOriginalId(schema.schemaNameToClassMap[convertedKey], d.pop(parentKey))
+ obj = conn.getByOriginalId(schema.schemaNameToClassMap[objectName], d["idOriginal"], create=True)
try:
obj.set(**d)
@@ -299,15 +316,15 @@
self.log("END CONFIG---------------------------------------------------\n")
return obj
- def instCallback(self, broker, classInfo, list, timestamps):
+ def instCallback(self, brokerId, classInfo, list, timestamps):
self.log("\nINST---------------------------------------------------")
objectName = classInfo[1]
self.log(objectName)
d = self.sanitizeDict(dict(list))
- connectedBroker = self.connectedBrokers[broker]
+ conn = self.connections[brokerId]
d["recTime"] = datetime.fromtimestamp(timestamps[0]/1000000000)
self.log(d)
- obj = connectedBroker.getByOriginalId(schema.schemaNameToClassMap[objectName], d[self.convertIdKey("id")])
+ obj = conn.getByOriginalId(schema.schemaNameToClassMap[objectName], d[self.convertIdKey("id")])
d[objectName] = obj
objNameStats = eval("schema.%sStats" % (schema.schemaNameToClassMap[objectName].__name__))
objStats = objNameStats.__new__(objNameStats)
Modified: mgmt/mint/python/mint/schema.py
===================================================================
--- mgmt/mint/python/mint/schema.py 2008-03-03 17:30:06 UTC (rev 1753)
+++ mgmt/mint/python/mint/schema.py 2008-03-03 21:43:05 UTC (rev 1754)
@@ -1,3 +1,4 @@
+import mint
from sqlobject import *
from datetime import datetime
@@ -14,6 +15,8 @@
statsPrev = ForeignKey('SystemStats', cascade='null', default=None)
sysId = StringCol(length=1000, default=None)
+ classInfos = dict() # brokerId => classInfo
+
class SystemStats(SQLObject):
class sqlmeta:
lazyUpdate = True
@@ -27,6 +30,8 @@
version = StringCol(length=1000, default=None)
machine = StringCol(length=1000, default=None)
+ classInfos = dict() # brokerId => classInfo
+
System.sqlmeta.addJoin(SQLMultipleJoin('SystemStats', joinMethodName='stats'))
@@ -53,36 +58,42 @@
dataDirEnabled = BoolCol(default=None)
dataDir = StringCol(length=1000, default=None)
- def joinCluster(self, model, managedBroker, callbackMethod, clusterName):
+ classInfos = dict() # brokerId => classInfo
+
+ def joinCluster(self, model, callback, clusterName):
actualArgs = dict()
actualArgs["clusterName"] = clusterName
- methodId = model.registerCallback(callbackMethod)
- model.connectedBrokers[managedBroker].managedBroker.method(methodId, self.idOriginal, \
- classToSchemaNameMap[self.__class__.__name__], "joinCluster", args=actualArgs, packageName="qpid")
+ conn = model.connections[self.managedBroker]
+ classInfo = self.classInfos[self.managedBroker]
+ conn.callMethod(self.idOriginal, classInfo, "joinCluster",
+ callback, args=actualArgs)
- def leaveCluster(self, model, managedBroker, callbackMethod):
+ def leaveCluster(self, model, callback):
actualArgs = dict()
- methodId = model.registerCallback(callbackMethod)
- model.connectedBrokers[managedBroker].managedBroker.method(methodId, self.idOriginal, \
- classToSchemaNameMap[self.__class__.__name__], "leaveCluster", args=actualArgs, packageName="qpid")
+ conn = model.connections[self.managedBroker]
+ classInfo = self.classInfos[self.managedBroker]
+ conn.callMethod(self.idOriginal, classInfo, "leaveCluster",
+ callback, args=actualArgs)
- def echo(self, model, managedBroker, callbackMethod, sequence, body):
+ def echo(self, model, callback, sequence, body):
"""Request a response to test the path to the management agent"""
actualArgs = dict()
actualArgs["sequence"] = sequence
actualArgs["body"] = body
- methodId = model.registerCallback(callbackMethod)
- model.connectedBrokers[managedBroker].managedBroker.method(methodId, self.idOriginal, \
- classToSchemaNameMap[self.__class__.__name__], "echo", args=actualArgs, packageName="qpid")
+ conn = model.connections[self.managedBroker]
+ classInfo = self.classInfos[self.managedBroker]
+ conn.callMethod(self.idOriginal, classInfo, "echo",
+ callback, args=actualArgs)
- def connect(self, model, managedBroker, callbackMethod, host, port):
+ def connect(self, model, callback, host, port):
"""Establish a connection to another broker"""
actualArgs = dict()
actualArgs["host"] = host
actualArgs["port"] = port
- methodId = model.registerCallback(callbackMethod)
- model.connectedBrokers[managedBroker].managedBroker.method(methodId, self.idOriginal, \
- classToSchemaNameMap[self.__class__.__name__], "connect", args=actualArgs, packageName="qpid")
+ conn = model.connections[self.managedBroker]
+ classInfo = self.classInfos[self.managedBroker]
+ conn.callMethod(self.idOriginal, classInfo, "connect",
+ callback, args=actualArgs)
System.sqlmeta.addJoin(SQLMultipleJoin('Broker', joinMethodName='brokers'))
@@ -95,6 +106,8 @@
recTime = TimestampCol(default=None)
broker = ForeignKey('Broker', cascade='null', default=None)
+ classInfos = dict() # brokerId => classInfo
+
Broker.sqlmeta.addJoin(SQLMultipleJoin('BrokerStats', joinMethodName='stats'))
@@ -112,6 +125,8 @@
broker = ForeignKey('Broker', cascade='null', default=None)
name = StringCol(length=1000, default=None)
+ classInfos = dict() # brokerId => classInfo
+
Broker.sqlmeta.addJoin(SQLMultipleJoin('Vhost', joinMethodName='vhosts'))
@@ -123,6 +138,8 @@
recTime = TimestampCol(default=None)
vhost = ForeignKey('Vhost', cascade='null', default=None)
+ classInfos = dict() # brokerId => classInfo
+
Vhost.sqlmeta.addJoin(SQLMultipleJoin('VhostStats', joinMethodName='stats'))
@@ -143,12 +160,15 @@
autoDelete = BoolCol(default=None)
exclusive = BoolCol(default=None)
- def purge(self, model, managedBroker, callbackMethod):
+ classInfos = dict() # brokerId => classInfo
+
+ def purge(self, model, callback):
"""Discard all messages on queue"""
actualArgs = dict()
- methodId = model.registerCallback(callbackMethod)
- model.connectedBrokers[managedBroker].managedBroker.method(methodId, self.idOriginal, \
- classToSchemaNameMap[self.__class__.__name__], "purge", args=actualArgs, packageName="qpid")
+ conn = model.connections[self.managedBroker]
+ classInfo = self.classInfos[self.managedBroker]
+ conn.callMethod(self.idOriginal, classInfo, "purge",
+ callback, args=actualArgs)
Vhost.sqlmeta.addJoin(SQLMultipleJoin('Queue', joinMethodName='queues'))
@@ -204,6 +224,8 @@
messageLatencyAverage = BigIntCol(default=None)
messageLatencySamples = BigIntCol(default=None)
+ classInfos = dict() # brokerId => classInfo
+
Queue.sqlmeta.addJoin(SQLMultipleJoin('QueueStats', joinMethodName='stats'))
@@ -222,6 +244,8 @@
name = StringCol(length=1000, default=None)
type = StringCol(length=1000, default=None)
+ classInfos = dict() # brokerId => classInfo
+
Vhost.sqlmeta.addJoin(SQLMultipleJoin('Exchange', joinMethodName='exchanges'))
@@ -245,6 +269,8 @@
byteDrops = BigIntCol(default=None)
byteRoutes = BigIntCol(default=None)
+ classInfos = dict() # brokerId => classInfo
+
Exchange.sqlmeta.addJoin(SQLMultipleJoin('ExchangeStats', joinMethodName='stats'))
@@ -263,6 +289,8 @@
queue = ForeignKey('Queue', cascade='null', default=None)
bindingKey = StringCol(length=1000, default=None)
+ classInfos = dict() # brokerId => classInfo
+
Exchange.sqlmeta.addJoin(SQLMultipleJoin('Binding', joinMethodName='bindings'))
Queue.sqlmeta.addJoin(SQLMultipleJoin('Binding', joinMethodName='bindings'))
@@ -277,6 +305,8 @@
binding = ForeignKey('Binding', cascade='null', default=None)
msgMatched = BigIntCol(default=None)
+ classInfos = dict() # brokerId => classInfo
+
Binding.sqlmeta.addJoin(SQLMultipleJoin('BindingStats', joinMethodName='stats'))
@@ -294,11 +324,14 @@
vhost = ForeignKey('Vhost', cascade='null', default=None)
address = StringCol(length=1000, default=None)
- def close(self, model, managedBroker, callbackMethod):
+ classInfos = dict() # brokerId => classInfo
+
+ def close(self, model, callback):
actualArgs = dict()
- methodId = model.registerCallback(callbackMethod)
- model.connectedBrokers[managedBroker].managedBroker.method(methodId, self.idOriginal, \
- classToSchemaNameMap[self.__class__.__name__], "close", args=actualArgs, packageName="qpid")
+ conn = model.connections[self.managedBroker]
+ classInfo = self.classInfos[self.managedBroker]
+ conn.callMethod(self.idOriginal, classInfo, "close",
+ callback, args=actualArgs)
Vhost.sqlmeta.addJoin(SQLMultipleJoin('Client', joinMethodName='clients'))
@@ -317,6 +350,8 @@
bytesFromClient = BigIntCol(default=None)
bytesToClient = BigIntCol(default=None)
+ classInfos = dict() # brokerId => classInfo
+
Client.sqlmeta.addJoin(SQLMultipleJoin('ClientStats', joinMethodName='stats'))
@@ -334,13 +369,16 @@
vhost = ForeignKey('Vhost', cascade='null', default=None)
address = StringCol(length=1000, default=None)
- def close(self, model, managedBroker, callbackMethod):
+ classInfos = dict() # brokerId => classInfo
+
+ def close(self, model, callback):
actualArgs = dict()
- methodId = model.registerCallback(callbackMethod)
- model.connectedBrokers[managedBroker].managedBroker.method(methodId, self.idOriginal, \
- classToSchemaNameMap[self.__class__.__name__], "close", args=actualArgs, packageName="qpid")
+ conn = model.connections[self.managedBroker]
+ classInfo = self.classInfos[self.managedBroker]
+ conn.callMethod(self.idOriginal, classInfo, "close",
+ callback, args=actualArgs)
- def bridge(self, model, managedBroker, callbackMethod, src, dest, key, src_is_queue, src_is_local):
+ def bridge(self, model, callback, src, dest, key, src_is_queue, src_is_local):
"""Bridge messages over the link"""
actualArgs = dict()
actualArgs["src"] = src
@@ -348,9 +386,10 @@
actualArgs["key"] = key
actualArgs["src_is_queue"] = src_is_queue
actualArgs["src_is_local"] = src_is_local
- methodId = model.registerCallback(callbackMethod)
- model.connectedBrokers[managedBroker].managedBroker.method(methodId, self.idOriginal, \
- classToSchemaNameMap[self.__class__.__name__], "bridge", args=actualArgs, packageName="qpid")
+ conn = model.connections[self.managedBroker]
+ classInfo = self.classInfos[self.managedBroker]
+ conn.callMethod(self.idOriginal, classInfo, "bridge",
+ callback, args=actualArgs)
Vhost.sqlmeta.addJoin(SQLMultipleJoin('Link', joinMethodName='links'))
@@ -369,6 +408,8 @@
bytesFromPeer = BigIntCol(default=None)
bytesToPeer = BigIntCol(default=None)
+ classInfos = dict() # brokerId => classInfo
+
Link.sqlmeta.addJoin(SQLMultipleJoin('LinkStats', joinMethodName='stats'))
@@ -391,11 +432,14 @@
srcIsQueue = BoolCol(default=None)
srcIsLocal = BoolCol(default=None)
- def close(self, model, managedBroker, callbackMethod):
+ classInfos = dict() # brokerId => classInfo
+
+ def close(self, model, callback):
actualArgs = dict()
- methodId = model.registerCallback(callbackMethod)
- model.connectedBrokers[managedBroker].managedBroker.method(methodId, self.idOriginal, \
- classToSchemaNameMap[self.__class__.__name__], "close", args=actualArgs, packageName="qpid")
+ conn = model.connections[self.managedBroker]
+ classInfo = self.classInfos[self.managedBroker]
+ conn.callMethod(self.idOriginal, classInfo, "close",
+ callback, args=actualArgs)
Link.sqlmeta.addJoin(SQLMultipleJoin('Bridge', joinMethodName='bridges'))
@@ -408,6 +452,8 @@
recTime = TimestampCol(default=None)
bridge = ForeignKey('Bridge', cascade='null', default=None)
+ classInfos = dict() # brokerId => classInfo
+
Bridge.sqlmeta.addJoin(SQLMultipleJoin('BridgeStats', joinMethodName='stats'))
@@ -428,29 +474,35 @@
client = ForeignKey('Client', cascade='null', default=None)
detachedLifespan = IntCol(default=None)
- def solicitAck(self, model, managedBroker, callbackMethod):
+ classInfos = dict() # brokerId => classInfo
+
+ def solicitAck(self, model, callback):
actualArgs = dict()
- methodId = model.registerCallback(callbackMethod)
- model.connectedBrokers[managedBroker].managedBroker.method(methodId, self.idOriginal, \
- classToSchemaNameMap[self.__class__.__name__], "solicitAck", args=actualArgs, packageName="qpid")
+ conn = model.connections[self.managedBroker]
+ classInfo = self.classInfos[self.managedBroker]
+ conn.callMethod(self.idOriginal, classInfo, "solicitAck",
+ callback, args=actualArgs)
- def detach(self, model, managedBroker, callbackMethod):
+ def detach(self, model, callback):
actualArgs = dict()
- methodId = model.registerCallback(callbackMethod)
- model.connectedBrokers[managedBroker].managedBroker.method(methodId, self.idOriginal, \
- classToSchemaNameMap[self.__class__.__name__], "detach", args=actualArgs, packageName="qpid")
+ conn = model.connections[self.managedBroker]
+ classInfo = self.classInfos[self.managedBroker]
+ conn.callMethod(self.idOriginal, classInfo, "detach",
+ callback, args=actualArgs)
- def resetLifespan(self, model, managedBroker, callbackMethod):
+ def resetLifespan(self, model, callback):
actualArgs = dict()
- methodId = model.registerCallback(callbackMethod)
- model.connectedBrokers[managedBroker].managedBroker.method(methodId, self.idOriginal, \
- classToSchemaNameMap[self.__class__.__name__], "resetLifespan", args=actualArgs, packageName="qpid")
+ conn = model.connections[self.managedBroker]
+ classInfo = self.classInfos[self.managedBroker]
+ conn.callMethod(self.idOriginal, classInfo, "resetLifespan",
+ callback, args=actualArgs)
- def close(self, model, managedBroker, callbackMethod):
+ def close(self, model, callback):
actualArgs = dict()
- methodId = model.registerCallback(callbackMethod)
- model.connectedBrokers[managedBroker].managedBroker.method(methodId, self.idOriginal, \
- classToSchemaNameMap[self.__class__.__name__], "close", args=actualArgs, packageName="qpid")
+ conn = model.connections[self.managedBroker]
+ classInfo = self.classInfos[self.managedBroker]
+ conn.callMethod(self.idOriginal, classInfo, "close",
+ callback, args=actualArgs)
Vhost.sqlmeta.addJoin(SQLMultipleJoin('Session', joinMethodName='sessions'))
@@ -468,6 +520,8 @@
expireTime = BigIntCol(default=None)
framesOutstanding = IntCol(default=None)
+ classInfos = dict() # brokerId => classInfo
+
Session.sqlmeta.addJoin(SQLMultipleJoin('SessionStats', joinMethodName='stats'))
@@ -485,25 +539,30 @@
session = ForeignKey('Session', cascade='null', default=None)
name = StringCol(length=1000, default=None)
- def throttle(self, model, managedBroker, callbackMethod, strength):
+ classInfos = dict() # brokerId => classInfo
+
+ def throttle(self, model, callback, strength):
"""Apply extra rate limiting to destination: 0 = Normal, 10 = Maximum"""
actualArgs = dict()
actualArgs["strength"] = strength
- methodId = model.registerCallback(callbackMethod)
- model.connectedBrokers[managedBroker].managedBroker.method(methodId, self.idOriginal, \
- classToSchemaNameMap[self.__class__.__name__], "throttle", args=actualArgs, packageName="qpid")
+ conn = model.connections[self.managedBroker]
+ classInfo = self.classInfos[self.managedBroker]
+ conn.callMethod(self.idOriginal, classInfo, "throttle",
+ callback, args=actualArgs)
- def stop(self, model, managedBroker, callbackMethod):
+ def stop(self, model, callback):
actualArgs = dict()
- methodId = model.registerCallback(callbackMethod)
- model.connectedBrokers[managedBroker].managedBroker.method(methodId, self.idOriginal, \
- classToSchemaNameMap[self.__class__.__name__], "stop", args=actualArgs, packageName="qpid")
+ conn = model.connections[self.managedBroker]
+ classInfo = self.classInfos[self.managedBroker]
+ conn.callMethod(self.idOriginal, classInfo, "stop",
+ callback, args=actualArgs)
- def start(self, model, managedBroker, callbackMethod):
+ def start(self, model, callback):
actualArgs = dict()
- methodId = model.registerCallback(callbackMethod)
- model.connectedBrokers[managedBroker].managedBroker.method(methodId, self.idOriginal, \
- classToSchemaNameMap[self.__class__.__name__], "start", args=actualArgs, packageName="qpid")
+ conn = model.connections[self.managedBroker]
+ classInfo = self.classInfos[self.managedBroker]
+ conn.callMethod(self.idOriginal, classInfo, "start",
+ callback, args=actualArgs)
Session.sqlmeta.addJoin(SQLMultipleJoin('Destination', joinMethodName='destinations'))
@@ -521,6 +580,8 @@
msgCredits = IntCol(default=None)
byteCredits = IntCol(default=None)
+ classInfos = dict() # brokerId => classInfo
+
Destination.sqlmeta.addJoin(SQLMultipleJoin('DestinationStats', joinMethodName='stats'))
@@ -538,6 +599,8 @@
destination = ForeignKey('Destination', cascade='null', default=None)
exchange = ForeignKey('Exchange', cascade='null', default=None)
+ classInfos = dict() # brokerId => classInfo
+
Destination.sqlmeta.addJoin(SQLMultipleJoin('Producer', joinMethodName='producers'))
Exchange.sqlmeta.addJoin(SQLMultipleJoin('Producer', joinMethodName='producers'))
@@ -553,6 +616,8 @@
msgsProduced = BigIntCol(default=None)
bytesProduced = BigIntCol(default=None)
+ classInfos = dict() # brokerId => classInfo
+
Producer.sqlmeta.addJoin(SQLMultipleJoin('ProducerStats', joinMethodName='stats'))
@@ -570,11 +635,14 @@
destination = ForeignKey('Destination', cascade='null', default=None)
queue = ForeignKey('Queue', cascade='null', default=None)
- def close(self, model, managedBroker, callbackMethod):
+ classInfos = dict() # brokerId => classInfo
+
+ def close(self, model, callback):
actualArgs = dict()
- methodId = model.registerCallback(callbackMethod)
- model.connectedBrokers[managedBroker].managedBroker.method(methodId, self.idOriginal, \
- classToSchemaNameMap[self.__class__.__name__], "close", args=actualArgs, packageName="qpid")
+ conn = model.connections[self.managedBroker]
+ classInfo = self.classInfos[self.managedBroker]
+ conn.callMethod(self.idOriginal, classInfo, "close",
+ callback, args=actualArgs)
Destination.sqlmeta.addJoin(SQLMultipleJoin('Consumer', joinMethodName='consumers'))
@@ -594,6 +662,8 @@
unackedMessagesLow = IntCol(default=None)
unackedMessagesHigh = IntCol(default=None)
+ classInfos = dict() # brokerId => classInfo
+
Consumer.sqlmeta.addJoin(SQLMultipleJoin('ConsumerStats', joinMethodName='stats'))
Modified: mgmt/mint/python/mint/schemaparser.py
===================================================================
--- mgmt/mint/python/mint/schemaparser.py 2008-03-03 17:30:06 UTC (rev 1753)
+++ mgmt/mint/python/mint/schemaparser.py 2008-03-03 21:43:05 UTC (rev 1754)
@@ -9,7 +9,8 @@
self.pythonFilePath = pythonFilePath
self.dsn = dsn
self.style = MixedCaseUnderscoreStyle()
- self.pythonOutput = "from sqlobject import *\n"
+ self.pythonOutput = "import mint\n"
+ self.pythonOutput += "from sqlobject import *\n"
self.pythonOutput += "from datetime import datetime\n"
self.additionalPythonOutput = ""
self.currentClass = ""
@@ -86,6 +87,9 @@
args += "length=4000"
self.generateAttrib(self.attrNameFromDbColumn(elem["@name"]), self.dataTypesMap[elem["@type"]], args)
+ self.pythonOutput += "\n"
+ self.pythonOutput += " classInfos = dict() # brokerId => classInfo\n"
+
def startClass(self, schemaName, stats=False):
if (stats):
origPythonName = self.style.dbTableToPythonClass(schemaName)
@@ -126,13 +130,13 @@
formalArgs = formalArgs[:-2]
else:
formalArgs = ""
- self.pythonOutput += "\n def %s(self, model, managedBroker, callbackMethod%s):\n" % (elem["@name"], formalArgs)
+ self.pythonOutput += "\n def %s(self, model, callback%s):\n" % (elem["@name"], formalArgs)
self.pythonOutput += comment
self.pythonOutput += actualArgs
- self.pythonOutput += " methodId = model.registerCallback(callbackMethod)\n"
- self.pythonOutput += " model.connectedBrokers[managedBroker].managedBroker.method(methodId, self.idOriginal, \\\n"
- self.pythonOutput += " classToSchemaNameMap[self.__class__.__name__], \"%s\", " % (elem["@name"])
- self.pythonOutput += "args=actualArgs, packageName=\"qpid\")\n"
+ self.pythonOutput += " conn = model.connections[self.managedBroker]\n"
+ self.pythonOutput += " classInfo = self.classInfos[self.managedBroker]\n"
+ self.pythonOutput += " conn.callMethod(self.idOriginal, classInfo, \"%s\",\n" % elem["@name"]
+ self.pythonOutput += " callback, args=actualArgs)\n"
def endClass(self):
if (self.additionalPythonOutput != ""):
More information about the rhmessaging-commits
mailing list