[rhmessaging-commits] rhmessaging commits: r1754 - in mgmt: mint/python/mint and 1 other directory.

rhmessaging-commits at lists.jboss.org rhmessaging-commits at lists.jboss.org
Mon Mar 3 16:43:05 EST 2008


Author: justi9
Date: 2008-03-03 16:43:05 -0500 (Mon, 03 Mar 2008)
New Revision: 1754

Modified:
   mgmt/cumin/python/cumin/__init__.py
   mgmt/cumin/python/cumin/model.py
   mgmt/mint/python/mint/__init__.py
   mgmt/mint/python/mint/schema.py
   mgmt/mint/python/mint/schemaparser.py
Log:
Repairs mint method calls, so they now work with the new managment
api.  Eliminates the brokerId argument to method calls on mint
classes, since that bit information is already present on the object.



Modified: mgmt/cumin/python/cumin/__init__.py
===================================================================
--- mgmt/cumin/python/cumin/__init__.py	2008-03-03 17:30:06 UTC (rev 1753)
+++ mgmt/cumin/python/cumin/__init__.py	2008-03-03 21:43:05 UTC (rev 1754)
@@ -78,7 +78,7 @@
         while True:
             for reg in BrokerRegistration.select():
                 if reg.broker is None or reg.broker.managedBroker not in \
-                        self.model.data.connectedBrokers:
+                        self.model.data.connections:
 
                     attempts = self.attempts.get(reg, 0)
                     attempts += 1

Modified: mgmt/cumin/python/cumin/model.py
===================================================================
--- mgmt/cumin/python/cumin/model.py	2008-03-03 17:30:06 UTC (rev 1753)
+++ mgmt/cumin/python/cumin/model.py	2008-03-03 21:43:05 UTC (rev 1754)
@@ -529,7 +529,7 @@
             return "Purge"
 
         def do_invoke(self, object, args, completion):
-            object.purge(self.model.data, object.managedBroker, completion)
+            object.purge(self.model.data, completion)
 
 class CuminExchange(CuminClass):
     def __init__(self, model):
@@ -634,7 +634,7 @@
             return "Close"
 
         def do_invoke(self, object, args, completion):
-            object.close(self.model.data, object.managedBroker, completion)
+            object.close(self.model.data, completion)
 
 class CuminSession(CuminClass):
     def __init__(self, model):
@@ -668,30 +668,28 @@
             return "Close"
 
         def do_invoke(self, object, args, completion):
-            object.close(self.model.data, object.managedBroker, completion)
+            object.close(self.model.data, completion)
 
     class Detach(CuminAction):
         def get_title(self, session):
             return "Detach"
 
         def do_invoke(self, object, args, completion):
-            object.detach(self.model.data, object.managedBroker, completion)
+            object.detach(self.model.data, completion)
 
     class ResetLifespan(CuminAction):
         def get_title(self, session):
             return "Reset Lifespan"
 
         def do_invoke(self, object, args, completion):
-            object.resetLifespan(self.model.data, object.managedBroker,
-                                 completion)
+            object.resetLifespan(self.model.data, completion)
 
     class SolicitAck(CuminAction):
         def get_title(self, session):
             return "Solicit Acknowledgment"
 
         def do_invoke(self, object, args, completion):
-            object.solicitAck(self.model.data, object.managedBroker,
-                              completion)
+            object.solicitAck(self.model.data, completion)
 
 class CuminBrokerRegistration(CuminClass):
     def __init__(self, model):

Modified: mgmt/mint/python/mint/__init__.py
===================================================================
--- mgmt/mint/python/mint/__init__.py	2008-03-03 17:30:06 UTC (rev 1753)
+++ mgmt/mint/python/mint/__init__.py	2008-03-03 21:43:05 UTC (rev 1754)
@@ -151,6 +151,7 @@
   def __init__(self, model, host, port):
     self.model = model
     self.key = "%s:%i" % (host, port)
+    self.objs = OriginalIdDict()
 
     spec = qpid.spec.load(model.specPath)
     self.client = qpid.client.Client(host, port, spec)
@@ -158,15 +159,11 @@
                                     self.model.configCallback,
                                     self.model.instCallback,
                                     self.model.methodCallback)
-    #self.mclient.schemaListener(self.model.schemaCallback)
+    self.mclient.schemaListener(self.model.schemaCallback)
 
     self.state = None # in (None, "opening", "opened", "closing", "closed")
     self.exception = None
 
-    #self.broker.configListener(self.key, self.model.configCallback)
-    #self.broker.instrumentationListener(self.key, self.model.instCallback)
-    #self.broker.methodListener(self.key, self.model.methodCallback)
-
   def isOpen(self):
     return self.state == "opened"
 
@@ -179,7 +176,7 @@
         # XXX I want this to happen after broker start, but the
         # callbacks rely on the broker being in the connectedBrokers
         # dict
-        self.model.connectedBrokers[self.key] = ConnectedBroker()
+        self.model.connections[self.key] = self
 
         self.client.start({})
         self.mchan = managementChannel(self.client.channel(1),
@@ -189,8 +186,6 @@
 
         self.mclient.addChannel(self.mchan)
 
-        #self.model.connections[self.key] = self
-
         self.state = "opened"
       except Exception, e:
         self.exception = e
@@ -198,6 +193,18 @@
     finally:
       self.model.lock.release()
 
+  def callMethod(self, objId, className, methodName, callback, args):
+    self.model.lock.acquire()
+    try:
+      self.model.currentMethodId += 1
+      seq = self.model.currentMethodId
+      self.model.outstandingMethodCalls[seq] = callback
+    finally:
+      self.model.lock.release()
+
+    self.mclient.callMethod(self.mchan, seq, objId,
+                            className, methodName, args)
+
   def close(self):
     self.state = "closing"
 
@@ -206,8 +213,8 @@
       try:
         self.mclient.removeChannel(self.mchan)
 
-        #del self.model.connections[self.key]
-        del self.model.connectedBrokers[self.key]
+        del self.model.connections[self.key]
+
         self.state = "closed"
       except Exception, e:
         self.exception = e
@@ -215,17 +222,17 @@
     finally:
       self.model.lock.release()
 
-class ConnectedBroker:
-  def __init__(self):
-    self.objs = OriginalIdDict()
-
   def getByOriginalId(self, objType, idOriginal, create=False, args={}):
     return self.objs.getByOriginalId(objType, idOriginal, create, args)
 
-  def getByIndexAttrib(self, objType, indexAttrib, indexValue, parent, create=False, args={}):
-    return self.objs.getByIndexAttrib(objType, indexAttrib, indexValue, create, args)
+  def getByIndexAttrib(self, objType, indexAttrib, indexValue, parent,
+                       create=False, args={}):
+    return self.objs.getByIndexAttrib(objType, indexAttrib, indexValue,
+                                      create, args)
 
 class MintModel:
+  staticInstance = None
+
   def __init__(self, dataUri, specPath, debug=False):
     self.dataUri = dataUri
     self.specPath = specPath
@@ -233,10 +240,13 @@
 
     self.currentMethodId = 1
     self.outstandingMethodCalls = dict()
-    self.connectedBrokers = dict()
+    self.connections = dict()
 
     self.lock = Lock()
 
+    assert MintModel.staticInstance is None
+    MintModel.staticInstance = self
+
   def check(self):
     try:
       connectionForURI(self.dataUri)
@@ -275,20 +285,27 @@
         keys.append(key)
     return keys
 
-  def configCallback(self, broker, classInfo, list, timestamps):
+  def schemaCallback(self, brokerId, classInfo,
+                     configs, metric, methods, events):
+    cls = schema.schemaNameToClassMap.get(classInfo[1])
+
+    if cls:
+      cls.classInfos[brokerId] = classInfo
+
+  def configCallback(self, brokerId, classInfo, list, timestamps):
     self.log("\nCONFIG---------------------------------------------------")
     objectName = classInfo[1]
     self.log(objectName)
     d = self.sanitizeDict(dict(list))
-    connectedBroker = self.connectedBrokers[broker]
-    d["managedBroker"] = broker
+    conn = self.connections[brokerId]
+    d["managedBroker"] = brokerId
     d["recTime"] = datetime.fromtimestamp(timestamps[0]/1000000000)
     d["creationTime"] = datetime.fromtimestamp(timestamps[1]/1000000000)
     self.log(d)
     for parentKey in self.findParentKeys(d):
       convertedKey = self.convertRefKey(parentKey)
-      d[convertedKey] = connectedBroker.getByOriginalId(schema.schemaNameToClassMap[convertedKey], d.pop(parentKey))
-    obj = connectedBroker.getByOriginalId(schema.schemaNameToClassMap[objectName], d["idOriginal"], create=True)
+      d[convertedKey] = conn.getByOriginalId(schema.schemaNameToClassMap[convertedKey], d.pop(parentKey))
+    obj = conn.getByOriginalId(schema.schemaNameToClassMap[objectName], d["idOriginal"], create=True)
 
     try:
       obj.set(**d)
@@ -299,15 +316,15 @@
     self.log("END CONFIG---------------------------------------------------\n")
     return obj
 
-  def instCallback(self, broker, classInfo, list, timestamps):
+  def instCallback(self, brokerId, classInfo, list, timestamps):
     self.log("\nINST---------------------------------------------------")
     objectName = classInfo[1]
     self.log(objectName)
     d = self.sanitizeDict(dict(list))
-    connectedBroker = self.connectedBrokers[broker]
+    conn = self.connections[brokerId]
     d["recTime"] = datetime.fromtimestamp(timestamps[0]/1000000000)
     self.log(d)
-    obj = connectedBroker.getByOriginalId(schema.schemaNameToClassMap[objectName], d[self.convertIdKey("id")])
+    obj = conn.getByOriginalId(schema.schemaNameToClassMap[objectName], d[self.convertIdKey("id")])
     d[objectName] = obj
     objNameStats = eval("schema.%sStats" % (schema.schemaNameToClassMap[objectName].__name__))
     objStats = objNameStats.__new__(objNameStats)

Modified: mgmt/mint/python/mint/schema.py
===================================================================
--- mgmt/mint/python/mint/schema.py	2008-03-03 17:30:06 UTC (rev 1753)
+++ mgmt/mint/python/mint/schema.py	2008-03-03 21:43:05 UTC (rev 1754)
@@ -1,3 +1,4 @@
+import mint
 from sqlobject import *
 from datetime import datetime
 
@@ -14,6 +15,8 @@
   statsPrev = ForeignKey('SystemStats', cascade='null', default=None)
   sysId = StringCol(length=1000, default=None)
 
+  classInfos = dict() # brokerId => classInfo
+
 class SystemStats(SQLObject):
   class sqlmeta:
     lazyUpdate = True
@@ -27,6 +30,8 @@
   version = StringCol(length=1000, default=None)
   machine = StringCol(length=1000, default=None)
 
+  classInfos = dict() # brokerId => classInfo
+
 System.sqlmeta.addJoin(SQLMultipleJoin('SystemStats', joinMethodName='stats'))
 
 
@@ -53,36 +58,42 @@
   dataDirEnabled = BoolCol(default=None)
   dataDir = StringCol(length=1000, default=None)
 
-  def joinCluster(self, model, managedBroker, callbackMethod, clusterName):
+  classInfos = dict() # brokerId => classInfo
+
+  def joinCluster(self, model, callback, clusterName):
     actualArgs = dict()
     actualArgs["clusterName"] = clusterName
-    methodId = model.registerCallback(callbackMethod)
-    model.connectedBrokers[managedBroker].managedBroker.method(methodId, self.idOriginal, \
-      classToSchemaNameMap[self.__class__.__name__], "joinCluster", args=actualArgs, packageName="qpid")
+    conn = model.connections[self.managedBroker]
+    classInfo = self.classInfos[self.managedBroker]
+    conn.callMethod(self.idOriginal, classInfo, "joinCluster",
+                    callback, args=actualArgs)
 
-  def leaveCluster(self, model, managedBroker, callbackMethod):
+  def leaveCluster(self, model, callback):
     actualArgs = dict()
-    methodId = model.registerCallback(callbackMethod)
-    model.connectedBrokers[managedBroker].managedBroker.method(methodId, self.idOriginal, \
-      classToSchemaNameMap[self.__class__.__name__], "leaveCluster", args=actualArgs, packageName="qpid")
+    conn = model.connections[self.managedBroker]
+    classInfo = self.classInfos[self.managedBroker]
+    conn.callMethod(self.idOriginal, classInfo, "leaveCluster",
+                    callback, args=actualArgs)
 
-  def echo(self, model, managedBroker, callbackMethod, sequence, body):
+  def echo(self, model, callback, sequence, body):
     """Request a response to test the path to the management agent"""
     actualArgs = dict()
     actualArgs["sequence"] = sequence
     actualArgs["body"] = body
-    methodId = model.registerCallback(callbackMethod)
-    model.connectedBrokers[managedBroker].managedBroker.method(methodId, self.idOriginal, \
-      classToSchemaNameMap[self.__class__.__name__], "echo", args=actualArgs, packageName="qpid")
+    conn = model.connections[self.managedBroker]
+    classInfo = self.classInfos[self.managedBroker]
+    conn.callMethod(self.idOriginal, classInfo, "echo",
+                    callback, args=actualArgs)
 
-  def connect(self, model, managedBroker, callbackMethod, host, port):
+  def connect(self, model, callback, host, port):
     """Establish a connection to another broker"""
     actualArgs = dict()
     actualArgs["host"] = host
     actualArgs["port"] = port
-    methodId = model.registerCallback(callbackMethod)
-    model.connectedBrokers[managedBroker].managedBroker.method(methodId, self.idOriginal, \
-      classToSchemaNameMap[self.__class__.__name__], "connect", args=actualArgs, packageName="qpid")
+    conn = model.connections[self.managedBroker]
+    classInfo = self.classInfos[self.managedBroker]
+    conn.callMethod(self.idOriginal, classInfo, "connect",
+                    callback, args=actualArgs)
 
 System.sqlmeta.addJoin(SQLMultipleJoin('Broker', joinMethodName='brokers'))
 
@@ -95,6 +106,8 @@
   recTime = TimestampCol(default=None)
   broker = ForeignKey('Broker', cascade='null', default=None)
 
+  classInfos = dict() # brokerId => classInfo
+
 Broker.sqlmeta.addJoin(SQLMultipleJoin('BrokerStats', joinMethodName='stats'))
 
 
@@ -112,6 +125,8 @@
   broker = ForeignKey('Broker', cascade='null', default=None)
   name = StringCol(length=1000, default=None)
 
+  classInfos = dict() # brokerId => classInfo
+
 Broker.sqlmeta.addJoin(SQLMultipleJoin('Vhost', joinMethodName='vhosts'))
 
 
@@ -123,6 +138,8 @@
   recTime = TimestampCol(default=None)
   vhost = ForeignKey('Vhost', cascade='null', default=None)
 
+  classInfos = dict() # brokerId => classInfo
+
 Vhost.sqlmeta.addJoin(SQLMultipleJoin('VhostStats', joinMethodName='stats'))
 
 
@@ -143,12 +160,15 @@
   autoDelete = BoolCol(default=None)
   exclusive = BoolCol(default=None)
 
-  def purge(self, model, managedBroker, callbackMethod):
+  classInfos = dict() # brokerId => classInfo
+
+  def purge(self, model, callback):
     """Discard all messages on queue"""
     actualArgs = dict()
-    methodId = model.registerCallback(callbackMethod)
-    model.connectedBrokers[managedBroker].managedBroker.method(methodId, self.idOriginal, \
-      classToSchemaNameMap[self.__class__.__name__], "purge", args=actualArgs, packageName="qpid")
+    conn = model.connections[self.managedBroker]
+    classInfo = self.classInfos[self.managedBroker]
+    conn.callMethod(self.idOriginal, classInfo, "purge",
+                    callback, args=actualArgs)
 
 Vhost.sqlmeta.addJoin(SQLMultipleJoin('Queue', joinMethodName='queues'))
 
@@ -204,6 +224,8 @@
   messageLatencyAverage = BigIntCol(default=None)
   messageLatencySamples = BigIntCol(default=None)
 
+  classInfos = dict() # brokerId => classInfo
+
 Queue.sqlmeta.addJoin(SQLMultipleJoin('QueueStats', joinMethodName='stats'))
 
 
@@ -222,6 +244,8 @@
   name = StringCol(length=1000, default=None)
   type = StringCol(length=1000, default=None)
 
+  classInfos = dict() # brokerId => classInfo
+
 Vhost.sqlmeta.addJoin(SQLMultipleJoin('Exchange', joinMethodName='exchanges'))
 
 
@@ -245,6 +269,8 @@
   byteDrops = BigIntCol(default=None)
   byteRoutes = BigIntCol(default=None)
 
+  classInfos = dict() # brokerId => classInfo
+
 Exchange.sqlmeta.addJoin(SQLMultipleJoin('ExchangeStats', joinMethodName='stats'))
 
 
@@ -263,6 +289,8 @@
   queue = ForeignKey('Queue', cascade='null', default=None)
   bindingKey = StringCol(length=1000, default=None)
 
+  classInfos = dict() # brokerId => classInfo
+
 Exchange.sqlmeta.addJoin(SQLMultipleJoin('Binding', joinMethodName='bindings'))
 
 Queue.sqlmeta.addJoin(SQLMultipleJoin('Binding', joinMethodName='bindings'))
@@ -277,6 +305,8 @@
   binding = ForeignKey('Binding', cascade='null', default=None)
   msgMatched = BigIntCol(default=None)
 
+  classInfos = dict() # brokerId => classInfo
+
 Binding.sqlmeta.addJoin(SQLMultipleJoin('BindingStats', joinMethodName='stats'))
 
 
@@ -294,11 +324,14 @@
   vhost = ForeignKey('Vhost', cascade='null', default=None)
   address = StringCol(length=1000, default=None)
 
-  def close(self, model, managedBroker, callbackMethod):
+  classInfos = dict() # brokerId => classInfo
+
+  def close(self, model, callback):
     actualArgs = dict()
-    methodId = model.registerCallback(callbackMethod)
-    model.connectedBrokers[managedBroker].managedBroker.method(methodId, self.idOriginal, \
-      classToSchemaNameMap[self.__class__.__name__], "close", args=actualArgs, packageName="qpid")
+    conn = model.connections[self.managedBroker]
+    classInfo = self.classInfos[self.managedBroker]
+    conn.callMethod(self.idOriginal, classInfo, "close",
+                    callback, args=actualArgs)
 
 Vhost.sqlmeta.addJoin(SQLMultipleJoin('Client', joinMethodName='clients'))
 
@@ -317,6 +350,8 @@
   bytesFromClient = BigIntCol(default=None)
   bytesToClient = BigIntCol(default=None)
 
+  classInfos = dict() # brokerId => classInfo
+
 Client.sqlmeta.addJoin(SQLMultipleJoin('ClientStats', joinMethodName='stats'))
 
 
@@ -334,13 +369,16 @@
   vhost = ForeignKey('Vhost', cascade='null', default=None)
   address = StringCol(length=1000, default=None)
 
-  def close(self, model, managedBroker, callbackMethod):
+  classInfos = dict() # brokerId => classInfo
+
+  def close(self, model, callback):
     actualArgs = dict()
-    methodId = model.registerCallback(callbackMethod)
-    model.connectedBrokers[managedBroker].managedBroker.method(methodId, self.idOriginal, \
-      classToSchemaNameMap[self.__class__.__name__], "close", args=actualArgs, packageName="qpid")
+    conn = model.connections[self.managedBroker]
+    classInfo = self.classInfos[self.managedBroker]
+    conn.callMethod(self.idOriginal, classInfo, "close",
+                    callback, args=actualArgs)
 
-  def bridge(self, model, managedBroker, callbackMethod, src, dest, key, src_is_queue, src_is_local):
+  def bridge(self, model, callback, src, dest, key, src_is_queue, src_is_local):
     """Bridge messages over the link"""
     actualArgs = dict()
     actualArgs["src"] = src
@@ -348,9 +386,10 @@
     actualArgs["key"] = key
     actualArgs["src_is_queue"] = src_is_queue
     actualArgs["src_is_local"] = src_is_local
-    methodId = model.registerCallback(callbackMethod)
-    model.connectedBrokers[managedBroker].managedBroker.method(methodId, self.idOriginal, \
-      classToSchemaNameMap[self.__class__.__name__], "bridge", args=actualArgs, packageName="qpid")
+    conn = model.connections[self.managedBroker]
+    classInfo = self.classInfos[self.managedBroker]
+    conn.callMethod(self.idOriginal, classInfo, "bridge",
+                    callback, args=actualArgs)
 
 Vhost.sqlmeta.addJoin(SQLMultipleJoin('Link', joinMethodName='links'))
 
@@ -369,6 +408,8 @@
   bytesFromPeer = BigIntCol(default=None)
   bytesToPeer = BigIntCol(default=None)
 
+  classInfos = dict() # brokerId => classInfo
+
 Link.sqlmeta.addJoin(SQLMultipleJoin('LinkStats', joinMethodName='stats'))
 
 
@@ -391,11 +432,14 @@
   srcIsQueue = BoolCol(default=None)
   srcIsLocal = BoolCol(default=None)
 
-  def close(self, model, managedBroker, callbackMethod):
+  classInfos = dict() # brokerId => classInfo
+
+  def close(self, model, callback):
     actualArgs = dict()
-    methodId = model.registerCallback(callbackMethod)
-    model.connectedBrokers[managedBroker].managedBroker.method(methodId, self.idOriginal, \
-      classToSchemaNameMap[self.__class__.__name__], "close", args=actualArgs, packageName="qpid")
+    conn = model.connections[self.managedBroker]
+    classInfo = self.classInfos[self.managedBroker]
+    conn.callMethod(self.idOriginal, classInfo, "close",
+                    callback, args=actualArgs)
 
 Link.sqlmeta.addJoin(SQLMultipleJoin('Bridge', joinMethodName='bridges'))
 
@@ -408,6 +452,8 @@
   recTime = TimestampCol(default=None)
   bridge = ForeignKey('Bridge', cascade='null', default=None)
 
+  classInfos = dict() # brokerId => classInfo
+
 Bridge.sqlmeta.addJoin(SQLMultipleJoin('BridgeStats', joinMethodName='stats'))
 
 
@@ -428,29 +474,35 @@
   client = ForeignKey('Client', cascade='null', default=None)
   detachedLifespan = IntCol(default=None)
 
-  def solicitAck(self, model, managedBroker, callbackMethod):
+  classInfos = dict() # brokerId => classInfo
+
+  def solicitAck(self, model, callback):
     actualArgs = dict()
-    methodId = model.registerCallback(callbackMethod)
-    model.connectedBrokers[managedBroker].managedBroker.method(methodId, self.idOriginal, \
-      classToSchemaNameMap[self.__class__.__name__], "solicitAck", args=actualArgs, packageName="qpid")
+    conn = model.connections[self.managedBroker]
+    classInfo = self.classInfos[self.managedBroker]
+    conn.callMethod(self.idOriginal, classInfo, "solicitAck",
+                    callback, args=actualArgs)
 
-  def detach(self, model, managedBroker, callbackMethod):
+  def detach(self, model, callback):
     actualArgs = dict()
-    methodId = model.registerCallback(callbackMethod)
-    model.connectedBrokers[managedBroker].managedBroker.method(methodId, self.idOriginal, \
-      classToSchemaNameMap[self.__class__.__name__], "detach", args=actualArgs, packageName="qpid")
+    conn = model.connections[self.managedBroker]
+    classInfo = self.classInfos[self.managedBroker]
+    conn.callMethod(self.idOriginal, classInfo, "detach",
+                    callback, args=actualArgs)
 
-  def resetLifespan(self, model, managedBroker, callbackMethod):
+  def resetLifespan(self, model, callback):
     actualArgs = dict()
-    methodId = model.registerCallback(callbackMethod)
-    model.connectedBrokers[managedBroker].managedBroker.method(methodId, self.idOriginal, \
-      classToSchemaNameMap[self.__class__.__name__], "resetLifespan", args=actualArgs, packageName="qpid")
+    conn = model.connections[self.managedBroker]
+    classInfo = self.classInfos[self.managedBroker]
+    conn.callMethod(self.idOriginal, classInfo, "resetLifespan",
+                    callback, args=actualArgs)
 
-  def close(self, model, managedBroker, callbackMethod):
+  def close(self, model, callback):
     actualArgs = dict()
-    methodId = model.registerCallback(callbackMethod)
-    model.connectedBrokers[managedBroker].managedBroker.method(methodId, self.idOriginal, \
-      classToSchemaNameMap[self.__class__.__name__], "close", args=actualArgs, packageName="qpid")
+    conn = model.connections[self.managedBroker]
+    classInfo = self.classInfos[self.managedBroker]
+    conn.callMethod(self.idOriginal, classInfo, "close",
+                    callback, args=actualArgs)
 
 Vhost.sqlmeta.addJoin(SQLMultipleJoin('Session', joinMethodName='sessions'))
 
@@ -468,6 +520,8 @@
   expireTime = BigIntCol(default=None)
   framesOutstanding = IntCol(default=None)
 
+  classInfos = dict() # brokerId => classInfo
+
 Session.sqlmeta.addJoin(SQLMultipleJoin('SessionStats', joinMethodName='stats'))
 
 
@@ -485,25 +539,30 @@
   session = ForeignKey('Session', cascade='null', default=None)
   name = StringCol(length=1000, default=None)
 
-  def throttle(self, model, managedBroker, callbackMethod, strength):
+  classInfos = dict() # brokerId => classInfo
+
+  def throttle(self, model, callback, strength):
     """Apply extra rate limiting to destination: 0 = Normal, 10 = Maximum"""
     actualArgs = dict()
     actualArgs["strength"] = strength
-    methodId = model.registerCallback(callbackMethod)
-    model.connectedBrokers[managedBroker].managedBroker.method(methodId, self.idOriginal, \
-      classToSchemaNameMap[self.__class__.__name__], "throttle", args=actualArgs, packageName="qpid")
+    conn = model.connections[self.managedBroker]
+    classInfo = self.classInfos[self.managedBroker]
+    conn.callMethod(self.idOriginal, classInfo, "throttle",
+                    callback, args=actualArgs)
 
-  def stop(self, model, managedBroker, callbackMethod):
+  def stop(self, model, callback):
     actualArgs = dict()
-    methodId = model.registerCallback(callbackMethod)
-    model.connectedBrokers[managedBroker].managedBroker.method(methodId, self.idOriginal, \
-      classToSchemaNameMap[self.__class__.__name__], "stop", args=actualArgs, packageName="qpid")
+    conn = model.connections[self.managedBroker]
+    classInfo = self.classInfos[self.managedBroker]
+    conn.callMethod(self.idOriginal, classInfo, "stop",
+                    callback, args=actualArgs)
 
-  def start(self, model, managedBroker, callbackMethod):
+  def start(self, model, callback):
     actualArgs = dict()
-    methodId = model.registerCallback(callbackMethod)
-    model.connectedBrokers[managedBroker].managedBroker.method(methodId, self.idOriginal, \
-      classToSchemaNameMap[self.__class__.__name__], "start", args=actualArgs, packageName="qpid")
+    conn = model.connections[self.managedBroker]
+    classInfo = self.classInfos[self.managedBroker]
+    conn.callMethod(self.idOriginal, classInfo, "start",
+                    callback, args=actualArgs)
 
 Session.sqlmeta.addJoin(SQLMultipleJoin('Destination', joinMethodName='destinations'))
 
@@ -521,6 +580,8 @@
   msgCredits = IntCol(default=None)
   byteCredits = IntCol(default=None)
 
+  classInfos = dict() # brokerId => classInfo
+
 Destination.sqlmeta.addJoin(SQLMultipleJoin('DestinationStats', joinMethodName='stats'))
 
 
@@ -538,6 +599,8 @@
   destination = ForeignKey('Destination', cascade='null', default=None)
   exchange = ForeignKey('Exchange', cascade='null', default=None)
 
+  classInfos = dict() # brokerId => classInfo
+
 Destination.sqlmeta.addJoin(SQLMultipleJoin('Producer', joinMethodName='producers'))
 
 Exchange.sqlmeta.addJoin(SQLMultipleJoin('Producer', joinMethodName='producers'))
@@ -553,6 +616,8 @@
   msgsProduced = BigIntCol(default=None)
   bytesProduced = BigIntCol(default=None)
 
+  classInfos = dict() # brokerId => classInfo
+
 Producer.sqlmeta.addJoin(SQLMultipleJoin('ProducerStats', joinMethodName='stats'))
 
 
@@ -570,11 +635,14 @@
   destination = ForeignKey('Destination', cascade='null', default=None)
   queue = ForeignKey('Queue', cascade='null', default=None)
 
-  def close(self, model, managedBroker, callbackMethod):
+  classInfos = dict() # brokerId => classInfo
+
+  def close(self, model, callback):
     actualArgs = dict()
-    methodId = model.registerCallback(callbackMethod)
-    model.connectedBrokers[managedBroker].managedBroker.method(methodId, self.idOriginal, \
-      classToSchemaNameMap[self.__class__.__name__], "close", args=actualArgs, packageName="qpid")
+    conn = model.connections[self.managedBroker]
+    classInfo = self.classInfos[self.managedBroker]
+    conn.callMethod(self.idOriginal, classInfo, "close",
+                    callback, args=actualArgs)
 
 Destination.sqlmeta.addJoin(SQLMultipleJoin('Consumer', joinMethodName='consumers'))
 
@@ -594,6 +662,8 @@
   unackedMessagesLow = IntCol(default=None)
   unackedMessagesHigh = IntCol(default=None)
 
+  classInfos = dict() # brokerId => classInfo
+
 Consumer.sqlmeta.addJoin(SQLMultipleJoin('ConsumerStats', joinMethodName='stats'))
 
 

Modified: mgmt/mint/python/mint/schemaparser.py
===================================================================
--- mgmt/mint/python/mint/schemaparser.py	2008-03-03 17:30:06 UTC (rev 1753)
+++ mgmt/mint/python/mint/schemaparser.py	2008-03-03 21:43:05 UTC (rev 1754)
@@ -9,7 +9,8 @@
     self.pythonFilePath = pythonFilePath
     self.dsn = dsn
     self.style = MixedCaseUnderscoreStyle()
-    self.pythonOutput = "from sqlobject import *\n"
+    self.pythonOutput = "import mint\n"
+    self.pythonOutput += "from sqlobject import *\n"
     self.pythonOutput += "from datetime import datetime\n"
     self.additionalPythonOutput = ""
     self.currentClass = ""
@@ -86,6 +87,9 @@
           args += "length=4000"
         self.generateAttrib(self.attrNameFromDbColumn(elem["@name"]), self.dataTypesMap[elem["@type"]], args)
 
+    self.pythonOutput += "\n"
+    self.pythonOutput += "  classInfos = dict() # brokerId => classInfo\n"
+
   def startClass(self, schemaName, stats=False):
     if (stats):
       origPythonName = self.style.dbTableToPythonClass(schemaName)
@@ -126,13 +130,13 @@
       formalArgs = formalArgs[:-2]
     else:
       formalArgs = ""
-    self.pythonOutput += "\n  def %s(self, model, managedBroker, callbackMethod%s):\n" % (elem["@name"], formalArgs)
+    self.pythonOutput += "\n  def %s(self, model, callback%s):\n" % (elem["@name"], formalArgs)
     self.pythonOutput += comment
     self.pythonOutput += actualArgs
-    self.pythonOutput += "    methodId = model.registerCallback(callbackMethod)\n"
-    self.pythonOutput += "    model.connectedBrokers[managedBroker].managedBroker.method(methodId, self.idOriginal, \\\n"
-    self.pythonOutput += "      classToSchemaNameMap[self.__class__.__name__], \"%s\", " % (elem["@name"])
-    self.pythonOutput += "args=actualArgs, packageName=\"qpid\")\n"
+    self.pythonOutput += "    conn = model.connections[self.managedBroker]\n"
+    self.pythonOutput += "    classInfo = self.classInfos[self.managedBroker]\n"
+    self.pythonOutput += "    conn.callMethod(self.idOriginal, classInfo, \"%s\",\n" % elem["@name"]
+    self.pythonOutput += "                    callback, args=actualArgs)\n"
 
   def endClass(self):
     if (self.additionalPythonOutput != ""):




More information about the rhmessaging-commits mailing list