[rhmessaging-commits] rhmessaging commits: r2805 - in mgmt/trunk: cumin/python/wooly and 3 other directories.

rhmessaging-commits at lists.jboss.org
Fri Nov 14 15:12:00 EST 2008


Author: justi9
Date: 2008-11-14 15:12:00 -0500 (Fri, 14 Nov 2008)
New Revision: 2805

Modified:
   mgmt/trunk/cumin/python/cumin/__init__.py
   mgmt/trunk/cumin/python/cumin/broker.py
   mgmt/trunk/cumin/python/cumin/model.py
   mgmt/trunk/cumin/python/cumin/page.py
   mgmt/trunk/cumin/python/cumin/page.strings
   mgmt/trunk/cumin/python/cumin/tools.py
   mgmt/trunk/cumin/python/wooly/forms.py
   mgmt/trunk/mint/python/mint/__init__.py
   mgmt/trunk/mint/python/mint/schema.py
   mgmt/trunk/mint/python/mint/schemaparser.py
   mgmt/trunk/mint/python/mint/update.py
   mgmt/trunk/mint/sql/schema.sql
   mgmt/trunk/parsley/python/parsley/command.py
Log:
Nuno's adaptation of mint to the qmfconsole API, which replaces the
older "management.py" API.



Modified: mgmt/trunk/cumin/python/cumin/__init__.py
===================================================================
--- mgmt/trunk/cumin/python/cumin/__init__.py	2008-11-14 19:30:06 UTC (rev 2804)
+++ mgmt/trunk/cumin/python/cumin/__init__.py	2008-11-14 20:12:00 UTC (rev 2805)
@@ -18,6 +18,7 @@
 from action import ActionPage
 from user import LoginPage, UserSession
 from datetime import timedelta
+from qpid.util import URL
 
 from wooly import Session
 
@@ -121,8 +122,7 @@
     def do_run(self):
         while True:
             for reg in BrokerRegistration.select():
-                if reg.broker is None or reg.broker.managedBroker not in \
-                        self.model.data.connections:
+                if reg.broker is None or reg.getBrokerId() not in self.model.data.managedBrokers:
                     attempts = self.attempts.get(reg, 0)
                     attempts += 1
                     self.attempts[reg] = attempts
@@ -133,7 +133,6 @@
                         reg.connect(self.model.data)
                     elif attempts % 100 == 0:
                         reg.connect(self.model.data)
-
             self.event.wait(10)
 
 class CuminServer(WebServer):

Modified: mgmt/trunk/cumin/python/cumin/broker.py
===================================================================
--- mgmt/trunk/cumin/python/cumin/broker.py	2008-11-14 19:30:06 UTC (rev 2804)
+++ mgmt/trunk/cumin/python/cumin/broker.py	2008-11-14 20:12:00 UTC (rev 2805)
@@ -658,14 +658,9 @@
 
                 if addr:
                     name = names[i]
-                    host, port = parse_broker_addr(addr)
+                    url = "amqp://%s:%i" % parse_broker_addr(addr)
 
-                    args = {
-                        "name": name,
-                        "host": host,
-                        "port": port
-                        }
-
+                    args = {"name": name, "url": url}
                     reg = action.invoke(None, args)
 
                     if len(groups) > i:
@@ -701,13 +696,13 @@
                     errs = aerrs.setdefault(i, list())
                     errs.append("The address field is empty; it is required")
                 else:
-                    host, port = parse_broker_addr(addr)
+                    #host, port = parse_broker_addr(addr)
                     count = BrokerRegistration.selectBy \
-                        (host=host, port=port).count()
+                        (url=addr).count()
 
                     if count:
                         errs = aerrs.setdefault(i, list())
-                        errs.append("The broker at %s:%i " % (host, port) +
+                        errs.append("The broker at %s " % addr +
                                     "is already registered")
 
         return not len(nerrs) and not len(aerrs)
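
Broker registrations now carry a single amqp URL instead of separate host and
port columns, so the add form builds the URL from the parsed address. A small
illustration of the formatting used above (parse_broker_addr lives elsewhere in
cumin and is assumed to return a (host, port) tuple):

    host, port = ("broker.example.com", 5672)   # assumed parse_broker_addr result
    url = "amqp://%s:%i" % (host, port)         # -> "amqp://broker.example.com:5672"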

Modified: mgmt/trunk/cumin/python/cumin/model.py
===================================================================
--- mgmt/trunk/cumin/python/cumin/model.py	2008-11-14 19:30:06 UTC (rev 2804)
+++ mgmt/trunk/cumin/python/cumin/model.py	2008-11-14 20:12:00 UTC (rev 2805)
@@ -1668,14 +1668,10 @@
         super(CuminBrokerRegistration, self).__init__ \
             (model, "broker_registration", BrokerRegistration)
 
-        prop = CuminProperty(self, "host")
-        prop.title = "Host"
+        prop = CuminProperty(self, "url")
+        prop.title = "URL"
         prop.summary = True
 
-        prop = CuminProperty(self, "port")
-        prop.title = "Port"
-        prop.summary = True
-
         action = self.Add(self, "add")
         action.title = "Add"
         action.navigable = False

Modified: mgmt/trunk/cumin/python/cumin/page.py
===================================================================
--- mgmt/trunk/cumin/python/cumin/page.py	2008-11-14 19:30:06 UTC (rev 2804)
+++ mgmt/trunk/cumin/python/cumin/page.py	2008-11-14 20:12:00 UTC (rev 2805)
@@ -209,6 +209,9 @@
         heading = self.Heading(app, "heading")
         self.add_child(heading)
 
+        self.add_tab(self.OverviewTab(app, "over"))
+        self.add_tab(self.AccountTab(app, "acct"))
+
     def render_change_password_href(self, session):
         branch = session.branch()
         self.frame.change_password.show(branch)
@@ -218,6 +221,22 @@
         def render_title(self, session):
             return "MRG Management"
 
+    class OverviewTab(Widget):
+        def render_title(self, session):
+            return "Overview"
+
+        def render_content(self, session):
+            pass
+
+    class AccountTab(ActionSet):
+        def render_title(self, session):
+            return "Your Account"
+
+        def render_change_password_href(self, session):
+            branch = session.branch()
+            self.frame.change_password.show(branch)
+            return branch.marshal()
+
 class MessagingView(TabbedModeSet):
     def __init__(self, app, name):
         super(MessagingView, self).__init__(app, name)

Modified: mgmt/trunk/cumin/python/cumin/page.strings
===================================================================
--- mgmt/trunk/cumin/python/cumin/page.strings	2008-11-14 19:30:06 UTC (rev 2804)
+++ mgmt/trunk/cumin/python/cumin/page.strings	2008-11-14 20:12:00 UTC (rev 2805)
@@ -180,11 +180,15 @@
 <div class="oblock">
   {heading}
 
-  <ul class="actions">
-    <a class="nav" href="{change_password_href}">Change Password</a>
-  </ul>
+  <ul class="TabbedModeSet tabs">{tabs}</ul>
+  <div class="TabbedModeSet mode">{mode}</div>
 </div>
 
+[AccountTab.html]
+<ul class="actions">
+  <a class="nav" href="{change_password_href}">Change Password</a>
+</ul>
+
 [MessagingView.html]
 <script type="text/javascript">
 <![CDATA[

Modified: mgmt/trunk/cumin/python/cumin/tools.py
===================================================================
--- mgmt/trunk/cumin/python/cumin/tools.py	2008-11-14 19:30:06 UTC (rev 2804)
+++ mgmt/trunk/cumin/python/cumin/tools.py	2008-11-14 20:12:00 UTC (rev 2805)
@@ -31,9 +31,6 @@
 
         self.config = CuminConfig()
 
-        opt = CommandOption(self, "help", "h")
-        opt.description = "Print this message"
-
         opt = CommandOption(self, "data")
         opt.argument = "URI"
         opt.description = "Connect to database at URI"
@@ -84,6 +81,20 @@
         opt = CommandOption(command, "force")
         opt.description = "Don't complain and just do it"
 
+        command = self.AddBroker(self, "add-broker")
+        command.arguments = ("NAME", "URL")
+        command.description = "Add a new broker called NAME at URL"
+
+        command = self.RemoveBroker(self, "remove-broker")
+        command.arguments = ("URL",)
+        command.description = "Remove broker called NAME; requires --force"
+
+        opt = CommandOption(command, "force")
+        opt.description = "Don't complain and just do it"
+
+        command = self.ListBrokers(self, "list-brokers")
+        command.description = "List existing broker registrations"
+
         command = self.AddUser(self, "add-user")
         command.arguments = ("NAME",)
         command.description = "Add a new user called NAME"
@@ -179,6 +190,46 @@
         def run(self, opts, args):
             self.parent.database.checkSchema()
 
+    class AddBroker(Command):
+        def run(self, opts, args):
+            try:
+                name, url = args[1:]
+            except IndexError:
+                raise CommandException(self, "NAME and URL are required")
+
+            for reg in BrokerRegistration.selectBy(name=name):
+                print "Error: a broker called '%s' already exists" % name
+                sys.exit(1)
+
+            for reg in BrokerRegistration.selectBy(url=url):
+                print "Error: a broker at %s already exists" % url
+                sys.exit(1)
+
+            reg = BrokerRegistration(name=name, url=url)
+            reg.syncUpdate()
+
+    class RemoveBroker(Command):
+        def run(self, opts, args):
+            try:
+                name = args[1]
+            except IndexError:
+                raise CommandException(self, "NAME is required")
+
+            reg = None
+
+            for reg in BrokerRegistration.selectBy(name=name):
+                break
+
+            if reg:
+                reg.destroySelf()
+                reg.syncUpdate()
+            else:
+                raise CommandException \
+                    (self, "Broker '%s' is unknown" % name)
+
+    class ListBrokers(Command):
+        def run(self, opts, args):
+            for reg in BrokerRegistration.select():
+                print reg.name, reg.url
+
     class AddUser(Command):
         def run(self, opts, args):
             try:
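
The new subcommands manage broker registrations from the command line alongside
the existing user commands. Typical invocations would look like this (the admin
executable's name is an assumption; substitute whatever entry point wraps this
tool):

    cumin-admin add-broker default amqp://localhost:5672
    cumin-admin list-brokers
    cumin-admin remove-broker default --force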

Modified: mgmt/trunk/cumin/python/wooly/forms.py
===================================================================
--- mgmt/trunk/cumin/python/wooly/forms.py	2008-11-14 19:30:06 UTC (rev 2804)
+++ mgmt/trunk/cumin/python/wooly/forms.py	2008-11-14 20:12:00 UTC (rev 2805)
@@ -184,6 +184,9 @@
 class PasswordInput(StringInput):
     pass
 
+# XXX Why does this have a boolean param?  Shouldn't the name suggest
+# that somehow?  Would some folks want a hidden input with a different
+# param?  I think this needs to take a param
 class HiddenInput(ScalarInput):
     def __init__(self, app, name):
         super(HiddenInput, self).__init__(app, name, None)

Modified: mgmt/trunk/mint/python/mint/__init__.py
===================================================================
--- mgmt/trunk/mint/python/mint/__init__.py	2008-11-14 19:30:06 UTC (rev 2804)
+++ mgmt/trunk/mint/python/mint/__init__.py	2008-11-14 20:12:00 UTC (rev 2805)
@@ -1,14 +1,12 @@
-import sys, os, socket, qpid, logging
-from qpid.datatypes import uuid4
-from qpid.connection import Connection as QpidConnection
-from qpid.util import connect
-from qpid.management import managementChannel, managementClient
-from datetime import *
-from sqlobject import *
+import logging
+import qpid.qmfconsole
+import struct
 from threading import Lock, RLock
-from traceback import print_exc, print_stack
-
-from mint import schema
+from sqlobject import *
+from traceback import print_exc
+from qpid.util import URL
+from qpid.datatypes import UUID
+from mint.schema import *
 from mint import update
 
 log = logging.getLogger("mint")
@@ -28,95 +26,118 @@
                                     cascade="null", default=None,
                                     name="registration"))
 
-class MintInfo(SQLObject):
-  class sqlmeta:
-    lazyUpdate = True
+class MintDatabase(object):
+  def __init__(self, uri):
+    self.uri = uri
 
-  version = StringCol(length=1000, default="0.1", notNone=True)
+  def getConnection(self):
+    return connectionForURI(self.uri).getConnection()
 
-class BrokerRegistration(SQLObject):
-  class sqlmeta:
-    lazyUpdate = True
+  def check(self):
+    self.checkConnection()
 
-  name = StringCol(length=1000, default=None, unique=True, notNone=True)
-  host = StringCol(length=1000, default=None, notNone=True)
-  port = IntCol(default=None, notNone=True)
-  broker = ForeignKey("Broker", cascade="null", default=None)
-  groups = SQLRelatedJoin("BrokerGroup",
-                          intermediateTable="broker_group_mapping",
-                          createRelatedTable=False)
-  cluster = ForeignKey("BrokerCluster", cascade="null", default=None)
-  profile = ForeignKey("BrokerProfile", cascade="null", default=None)
+  def init(self):
+    sqlhub.processConnection = connectionForURI(self.uri)
 
-  host_port_unique = index.DatabaseIndex(host, port, unique=True)
+  def checkConnection(self):
+    conn = self.getConnection()
 
-  def connect(self, model):
-    log.info("Connecting to broker '%s' at %s:%i" % \
-               (self.name, self.host, self.port or 5672))
+    try:
+      cursor = conn.cursor()
+      cursor.execute("select now()")
+    finally:
+      conn.close()
 
-    conn = BrokerConnection(model, self.host, self.port or 5672)
+  def checkSchema(self):
+    pass
 
+  def dropSchema(self):
+    conn = self.getConnection()
     try:
-      conn.open()
-      log.info("Connection succeeded")
-    except:
-      log.info("Connection failed: " + str(conn.exception))
+      cursor = conn.cursor()
+      
+      cursor.execute("drop schema public cascade")
 
-  def getDefaultVhost(self):
-    if self.broker:
+      conn.commit()
+    finally:
+      conn.close()
+
+  def createSchema(self, file_paths):
+    conn = self.getConnection()
+
+    scripts = list()
+
+    for path in file_paths:
+      file = open(path, "r")
       try:
-        return Vhost.selectBy(broker=self.broker, name="/")[0]
-      except IndexError:
+        scripts.append((path, file.read()))
+      finally:
+        file.close()
+
+    try:
+      cursor = conn.cursor()
+
+      try:
+        cursor.execute("create schema public");
+      except:
+        conn.rollback()
         pass
 
-class BrokerGroup(SQLObject):
-  class sqlmeta:
-    lazyUpdate = True
+      for path, text in scripts:
+        stmts = text.split(";")
+        count = 0
 
-  name = StringCol(length=1000, default=None, unique=True, notNone=True)
-  brokers = SQLRelatedJoin("BrokerRegistration",
-                           intermediateTable="broker_group_mapping",
-                           createRelatedTable=False)
+        for stmt in stmts:
+          stmt = stmt.strip()
 
-class BrokerGroupMapping(SQLObject):
-  class sqlmeta:
-    lazyUpdate = True
+          if stmt:
+            try:
+              cursor.execute(stmt)
+            except Exception, e:
+              print "Failed executing statement:"
+              print stmt
 
-  brokerRegistration = ForeignKey("BrokerRegistration", notNull=True,
-                                  cascade=True)
-  brokerGroup = ForeignKey("BrokerGroup", notNull=True, cascade=True)
-  unique = index.DatabaseIndex(brokerRegistration, brokerGroup, unique=True)
+              raise e
 
-class BrokerCluster(SQLObject):
-  class sqlmeta:
-    lazyUpdate = True
+            count += 1
 
-  name = StringCol(length=1000, default=None)
-  brokers = SQLMultipleJoin("BrokerRegistration", joinColumn="cluster_id")
+        print "Executed %i statements from file '%s'" % (count, path)
 
-class BrokerProfile(SQLObject):
-  class sqlmeta:
-    lazyUpdate = True
+      conn.commit()
 
-  name = StringCol(length=1000, default=None)
-  brokers = SQLMultipleJoin("BrokerRegistration", joinColumn="profile_id")
-  properties = SQLMultipleJoin("ConfigProperty", joinColumn="profile_id")
+      info = MintInfo(version="0.1")
+      info.sync()
 
-class ConfigProperty(SQLObject):
-  class sqlmeta:
-    lazyUpdate = True
+      # Standard roles
 
-  name = StringCol(length=1000, default=None)
-  value = StringCol(length=1000, default=None)
-  type = StringCol(length=1, default="s")
+      user = Role(name="user")
+      user.sync()
 
-class CollectorRegistration(SQLObject):
-  class sqlmeta:
-    lazyUpdate = True
+      admin = Role(name="admin")
+      admin.sync()
+    finally:
+      conn.close()
 
-  name = StringCol(length=1000, default=None)
-  collectorId = StringCol(length=1000, default=None)
+  def checkSchema(self):
+    conn = self.getConnection()
 
+    try:
+      cursor = conn.cursor()
+
+      try:
+        cursor.execute("select version from mint_info");
+      except Exception, e:
+        print "No schema present"
+        return
+
+      for rec in cursor:
+        print "OK (version %s)" % rec[0]
+        return
+
+      print "No schema present"
+    finally:
+      conn.close()
+
 class Subject(SQLObject):
   class sqlmeta:
     lazyUpdate = True
@@ -165,139 +186,101 @@
 class ObjectNotFound(Exception):
   pass
 
-# Not thread safe
-class BrokerConnection(object):
-  def __init__(self, model, host, port):
-    self.model = model
-    self.host = host
-    self.port = port
-    self.id = "%s:%i" % (host, port)
-    self.objectsById = dict()
+class MintInfo(SQLObject):
+  class sqlmeta:
+    lazyUpdate = True
 
-    # Set upon receiving a broker info control
-    self.sessionId = None
-    self.brokerId = None
+  version = StringCol(length=1000, default="0.1", notNone=True)
 
-    # state in (None, "opening", "opened", "closing", "closed")
-    self.state = None
-    self.exception = None
+class CollectorRegistration(SQLObject):
+  class sqlmeta:
+    lazyUpdate = True
 
-    self.mconn = None
-    self.mclient = None
-    self.mchan = None
+  name = StringCol(length=1000, default=None)
+  collectorId = StringCol(length=1000, default=None)
 
-  def getObject(self, cls, id):
-    compositeId = "%s:%s" % (id.first, id.second)
+class BrokerRegistration(SQLObject):
+  class sqlmeta:
+    lazyUpdate = True
 
-    try:
-      obj = self.objectsById[compositeId]
-    except KeyError:
-      try:
-        obj = cls.selectBy(sourceScopeId=id.first, sourceObjectId=id.second, managedBroker=self.id)[0]
-        self.objectsById[compositeId] = obj
-      except IndexError:
-        raise ObjectNotFound()
+  name = StringCol(length=1000, default=None, unique=True, notNone=True)
+  url = StringCol(length=1000, default=None)
+  qmfBroker = None
+  broker = ForeignKey("Broker", cascade="null", default=None)
+  groups = SQLRelatedJoin("BrokerGroup",
+                          intermediateTable="broker_group_mapping",
+                          createRelatedTable=False)
+  cluster = ForeignKey("BrokerCluster", cascade="null", default=None)
+  profile = ForeignKey("BrokerProfile", cascade="null", default=None)
 
-    return obj
+  url_unique = index.DatabaseIndex(url, unique=True)
 
-  def isOpen(self):
-    return self.state == "opened"
-
-  def open(self):
-    assert self.mconn is None
-    assert self.mclient is None
-    assert self.mchan is None
-
-    self.state = "opening"
-
+  def connect(self, model):
+    log.info("Connecting to broker '%s' at %s" % (self.name, self.url))
     try:
-      spec = qpid.spec.load(self.model.specPath)
-      sock = connect(self.host, self.port)
+      self.qmfBroker = model.getSession().addBroker(self.url)
+      log.info("Connection succeeded")
     except Exception, e:
-      self.exception = e
-      raise e
+      log.info("Connection failed: %s ", e.message)
+      print_exc()
 
-    self.mconn = QpidConnection(sock, spec)
-    self.mclient = managementClient(spec, 
-                                    self.model.controlCallback,
-                                    self.model.propsCallback,
-                                    self.model.statsCallback,
-                                    self.model.methodCallback,
-                                    self.model.closeCallback)
-    self.mclient.schemaListener(self.model.schemaCallback)
+  def getBrokerId(self):
+    if self.qmfBroker is not None:
+      return str(self.qmfBroker.getBrokerId())
+    else:
+      return None
 
-    try:
-      self.mconn.start()
-      self.mchan = self.mclient.addChannel \
-          (self.mconn.session(str(uuid4())), self)
+  def getDefaultVhost(self):
+    if self.broker:
+      try:
+        return Vhost.selectBy(broker=self.broker, name="/")[0]
+      except IndexError:
+        return None
 
-      self.state = "opened"
-    except Exception, e:
-      self.exception = e
-      raise e
+class BrokerGroup(SQLObject):
+  class sqlmeta:
+    lazyUpdate = True
 
-    self.model.lock()
-    try:
-      self.model.connections[self.id] = self
-    finally:
-      self.model.unlock()
+  name = StringCol(length=1000, default=None, unique=True, notNone=True)
+  brokers = SQLRelatedJoin("BrokerRegistration",
+                           intermediateTable="broker_group_mapping",
+                           createRelatedTable=False)
 
-  def getSessionId(self):
-    if not self.isOpen():
-      raise Exception("Connection not open")
+class BrokerGroupMapping(SQLObject):
+  class sqlmeta:
+    lazyUpdate = True
 
-    return self.mchan.sessionId
+  brokerRegistration = ForeignKey("BrokerRegistration", notNull=True,
+                                  cascade=True)
+  brokerGroup = ForeignKey("BrokerGroup", notNull=True, cascade=True)
+  unique = index.DatabaseIndex(brokerRegistration, brokerGroup, unique=True)
 
-  def callMethod(self, objId, className, methodName, callback, args):
-    if not self.isOpen():
-      raise Exception("Connection not open")
+class BrokerCluster(SQLObject):
+  class sqlmeta:
+    lazyUpdate = True
 
-    self.model.lock()
-    try:
-      self.model.currentMethodId += 1
-      seq = self.model.currentMethodId
-      self.model.outstandingMethodCalls[seq] = callback
-    finally:
-      self.model.unlock()
+  name = StringCol(length=1000, default=None)
+  brokers = SQLMultipleJoin("BrokerRegistration", joinColumn="cluster_id")
 
-    self.mclient.callMethod(self.mchan, seq, objId,
-                            className, methodName, args)
+class BrokerProfile(SQLObject):
+  class sqlmeta:
+    lazyUpdate = True
 
-  def close(self):
-    self.state = "closing"
+  name = StringCol(length=1000, default=None)
+  brokers = SQLMultipleJoin("BrokerRegistration", joinColumn="profile_id")
+  properties = SQLMultipleJoin("ConfigProperty", joinColumn="profile_id")
 
-    self.model.lock()
-    try:
-        del self.model.connections[self.id]
-    finally:
-      self.model.unlock()
-
-    try:
-      self.mclient.removeChannel(self.mchan)
-
-      self.state = "closed"
-    except Exception, e:
-      self.exception = e
-      raise e
-
-    self.mconn.close()
-    # XXX What else do I need to try to shutdown here?
-
-    self.mconn = None
-    self.mclient = None
-    self.mchan = None
-
-class MintModel:
+  
+class MintModel(qpid.qmfconsole.Console):
   staticInstance = None
 
-  def __init__(self, dataUri, specPath, debug=False):
+  def __init__(self, dataUri, specPath="", debug=False):
     self.dataUri = dataUri
-    self.specPath = specPath
     self.debug = debug
 
-    self.currentMethodId = 1
-    self.outstandingMethodCalls = dict()
-    self.connections = dict()
+    assert MintModel.staticInstance is None
+    MintModel.staticInstance = self
+
     self.connCloseListener = None
     self.__lock = RLock()
 
@@ -306,15 +289,14 @@
     self.orphanObjectMap = dict()
 
     self.updateThread = update.ModelUpdateThread(self)
+    self.mgmtSession = qpid.qmfconsole.Session(self)
+    self.outstandingMethodCalls = dict()
+    self.managedBrokers = dict()
 
-    assert MintModel.staticInstance is None
-    MintModel.staticInstance = self
-
     if self.debug:
       log.setLevel(logging.DEBUG)
 
   def lock(self):
-    #print_stack()
     self.__lock.acquire()
 
   def unlock(self):
@@ -326,12 +308,10 @@
     except Exception, e:
       if hasattr(e, "message") and e.message.find("does not exist"):
         print "Database not found; run cumin-database-init"
-
       raise e
 
   def init(self):
-    conn = connectionForURI(self.dataUri)
-    sqlhub.processConnection = conn
+    sqlhub.processConnection = connectionForURI(self.dataUri)
 
   def start(self):
     self.updateThread.start()
@@ -342,156 +322,99 @@
   def setCloseListener(self, connCloseListener):
     self.connCloseListener = connCloseListener
 
-  def schemaCallback(self, conn, classInfo, props, stats, methods, events):
-    up = update.SchemaUpdate(conn, classInfo, props, stats, methods, events)
-    self.updateThread.enqueue(up)
+  def getObject(self, cls, id, broker):
+    obj = None
+    if isinstance(id, qpid.qmfconsole.ObjectId):
+      compositeId = "%s:%s" % (id.first, id.second)
+      try:
+        obj = cls.selectBy(sourceScopeId=id.first, sourceObjectId=id.second)[0]
+      except IndexError:
+        raise ObjectNotFound()
+    elif cls.__name__.endswith("Pool"):
+      try:
+        obj = cls.selectBy(sourceId=id)[0]
+      except IndexError:
+        raise ObjectNotFound()
+      
+    return obj
 
-  def propsCallback(self, conn, classInfo, props, timestamps):
-    up = update.PropertyUpdate(conn, classInfo, props, timestamps)
-    self.updateThread.enqueue(up)
+  def getSession(self):
+    return self.mgmtSession
 
-  def statsCallback(self, conn, classInfo, stats, timestamps):
-    up = update.StatisticUpdate(conn, classInfo, stats, timestamps)
-    self.updateThread.enqueue(up)
-
-  def methodCallback(self, conn, methodId, errorId, errorText, args):
-    up = update.MethodUpdate(conn, methodId, errorId, errorText, args)
-    self.updateThread.enqueue(up)
-  
-  def closeCallback(self, conn, data):
-    up = update.CloseUpdate(conn, data)
-    self.updateThread.enqueue(up)
-    
-  def controlCallback(self, conn, type, data):
-    up = update.ControlUpdate(conn, type, data)
-    self.updateThread.enqueue(up)
-
-  def registerCallback(self, callback):
+  def callMethod(self, managedBroker, objId, classKey, methodName, callback, args):
     self.lock()
     try:
-      self.currentMethodId += 1
-      methodId = self.currentMethodId
-      self.outstandingMethodCalls[methodId] = callback
-      return methodId
+      broker = self.managedBrokers[managedBroker]
     finally:
       self.unlock()
 
-  def getConnectionByRegistration(self, reg):
+    pname, cname, hash = classKey.split(", ")
+    seq = self.mgmtSession._sendMethodRequest(broker, (pname, cname, hash), objId, methodName, args)
+
+    if seq is not None:
+      self.lock()
+      try:
+        self.outstandingMethodCalls[seq] = callback
+      finally:
+        self.unlock()
+    return seq
+
+  def brokerConnected(self, broker):
+    """ Invoked when a connection is established to a broker """
     self.lock()
     try:
-      return self.connections.get("%s:%i" % (reg.host, reg.port))
+      self.managedBrokers[str(broker.getBrokerId())] = broker
     finally:
       self.unlock()
 
-class MintDatabase(object):
-  def __init__(self, uri):
-    self.uri = uri
-
-  def getConnection(self):
-    return connectionForURI(self.uri).getConnection()
-
-  def check(self):
-    self.checkConnection()
-
-  def init(self):
-    conn = connectionForURI(self.uri)
-    sqlhub.processConnection = conn
-
-  def checkConnection(self):
-    conn = self.getConnection()
-
+  def brokerDisconnected(self, broker):
+    """ Invoked when the connection to a broker is lost """
+    self.lock()
     try:
-      cursor = conn.cursor()
-      cursor.execute("select now()")
+      del self.managedBrokers[str(broker.getBrokerId())]
     finally:
-      conn.close()
+      self.unlock()
+    if self.connCloseListener is not None:
+      self.connCloseListener(broker)
 
-  def checkSchema(self):
+  def newPackage(self, name):
+    """ Invoked when a QMF package is discovered. """
     pass
 
-  def dropSchema(self):
-    conn = self.getConnection()
-    try:
-      cursor = conn.cursor()
-      
-      cursor.execute("drop schema public cascade")
+  def newClass(self, kind, classKey):
+    """ Invoked when a new class is discovered.  Session.getSchema can be
+    used to obtain details about the class."""
+    pass
 
-      conn.commit()
-    finally:
-      conn.close()
+  def newAgent(self, agent):
+    """ Invoked when a QMF agent is discovered. """
+    pass
 
-  def createSchema(self, file_paths):
-    conn = self.getConnection()
+  def delAgent(self, agent):
+    """ Invoked when a QMF agent disconects. """
+    pass
 
-    scripts = list()
+  def objectProps(self, broker, record):
+    """ Invoked when an object is updated. """
+    self.updateThread.enqueue(update.PropertyUpdate(broker, record))
 
-    for path in file_paths:
-      file = open(path, "r")
-      try:
-        scripts.append((path, file.read()))
-      finally:
-        file.close()
+  def objectStats(self, broker, record):
+    """ Invoked when an object is updated. """
+    self.updateThread.enqueue(update.StatisticUpdate(broker, record))
 
-    try:
-      cursor = conn.cursor()
+  def event(self, broker, event):
+    """ Invoked when an event is raised. """
+    pass
 
-      try:
-        cursor.execute("create schema public");
-      except:
-        conn.rollback()
-        pass
+  def heartbeat(self, agent, timestamp):
+    pass
 
-      for path, text in scripts:
-        stmts = text.split(";")
-        count = 0
-
-        for stmt in stmts:
-          stmt = stmt.strip()
-
-          if stmt:
-            try:
-              cursor.execute(stmt)
-            except Exception, e:
-              print "Failed executing statement:"
-              print stmt
-
-              raise e
-
-            count += 1
-
-        print "Executed %i statements from file '%s'" % (count, path)
-
-      conn.commit()
-
-      info = MintInfo(version="0.1")
-      info.sync()
-
-      # Standard roles
-
-      user = Role(name="user")
-      user.sync()
-
-      admin = Role(name="admin")
-      admin.sync()
-    finally:
-      conn.close()
-
-  def checkSchema(self):
-    conn = self.getConnection()
-
+  def brokerInfo(self, broker):
+    self.lock()
     try:
-      cursor = conn.cursor()
-
-      try:
-        cursor.execute("select version from mint_info");
-      except Exception, e:
-        print "No schema present"
-        return
-
-      for rec in cursor:
-        print "OK (version %s)" % rec[0]
-        return;
-
-      print "No schema present"
+      self.managedBrokers[str(broker.getBrokerId())] = broker
     finally:
-      conn.close()
+      self.unlock()
+
+  def methodResponse(self, broker, seq, response):
+    self.updateThread.enqueue(update.MethodUpdate(broker, seq, response))
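
Taken together, MintModel now doubles as the QMF console: BrokerRegistration.connect()
hands its URL to the shared Session, connected brokers are tracked in
managedBrokers keyed by broker id, and method calls are routed back through that
id plus the stored class key. A rough sketch of the intended startup sequence,
using only methods defined above (the database URI is a placeholder):

    model = MintModel("postgresql://user@localhost/mint")  # placeholder URI
    model.check()                     # verify the database is reachable
    model.init()                      # bind SQLObject's process connection
    model.start()                     # start the update thread

    for reg in BrokerRegistration.select():
        reg.connect(model)            # model.getSession().addBroker(reg.url)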

Modified: mgmt/trunk/mint/python/mint/schema.py
===================================================================
--- mgmt/trunk/mint/python/mint/schema.py	2008-11-14 19:30:06 UTC (rev 2804)
+++ mgmt/trunk/mint/python/mint/schema.py	2008-11-14 20:12:00 UTC (rev 2805)
@@ -1,7 +1,7 @@
 import mint
 from sqlobject import *
 from datetime import datetime
-from qpid.management import objectId
+from qpid.qmfconsole import ObjectId
 
 class Pool(SQLObject):
   class sqlmeta:
@@ -16,6 +16,7 @@
   recTime = TimestampCol(default=None)
   sourceScopeId = BigIntCol(default=None)
   sourceObjectId = BigIntCol(default=None)
+  qmfClassKey = BLOBCol(default=None)
   creationTime = TimestampCol(default=None)
   deletionTime = TimestampCol(default=None)
   managedBroker = StringCol(length=1000, default=None)
@@ -135,6 +136,7 @@
   recTime = TimestampCol(default=None)
   sourceScopeId = BigIntCol(default=None)
   sourceObjectId = BigIntCol(default=None)
+  qmfClassKey = BLOBCol(default=None)
   creationTime = TimestampCol(default=None)
   deletionTime = TimestampCol(default=None)
   managedBroker = StringCol(length=1000, default=None)
@@ -172,61 +174,49 @@
 
 
   def GetAd(self, model, callback, JobAd):
-    actualArgs = dict()
-    actualArgs["JobAd"] = JobAd
-    conn = model.connections[self.managedBroker]
-    classInfo = self.classInfos[self.managedBroker]
-    originalId = objectId(None, self.sourceScopeId, self.sourceObjectId)
-    conn.callMethod(originalId, classInfo, "GetAd",
+    actualArgs = list()
+    actualArgs.append(JobAd)
+    originalId = ObjectId(None, self.sourceScopeId, self.sourceObjectId)
+    model.callMethod(self.managedBroker, originalId, self.qmfClassKey, "GetAd",
                     callback, args=actualArgs)
 
   def SetAttribute(self, model, callback, Name, Value):
-    actualArgs = dict()
-    actualArgs["Name"] = Name
-    actualArgs["Value"] = Value
-    conn = model.connections[self.managedBroker]
-    classInfo = self.classInfos[self.managedBroker]
-    originalId = objectId(None, self.sourceScopeId, self.sourceObjectId)
-    conn.callMethod(originalId, classInfo, "SetAttribute",
+    actualArgs = list()
+    actualArgs.append(Name)
+    actualArgs.append(Value)
+    originalId = ObjectId(None, self.sourceScopeId, self.sourceObjectId)
+    model.callMethod(self.managedBroker, originalId, self.qmfClassKey, "SetAttribute",
                     callback, args=actualArgs)
 
   def Hold(self, model, callback, Reason):
-    actualArgs = dict()
-    actualArgs["Reason"] = Reason
-    conn = model.connections[self.managedBroker]
-    classInfo = self.classInfos[self.managedBroker]
-    originalId = objectId(None, self.sourceScopeId, self.sourceObjectId)
-    conn.callMethod(originalId, classInfo, "Hold",
+    actualArgs = list()
+    actualArgs.append(Reason)
+    originalId = ObjectId(None, self.sourceScopeId, self.sourceObjectId)
+    model.callMethod(self.managedBroker, originalId, self.qmfClassKey, "Hold",
                     callback, args=actualArgs)
 
   def Release(self, model, callback, Reason):
-    actualArgs = dict()
-    actualArgs["Reason"] = Reason
-    conn = model.connections[self.managedBroker]
-    classInfo = self.classInfos[self.managedBroker]
-    originalId = objectId(None, self.sourceScopeId, self.sourceObjectId)
-    conn.callMethod(originalId, classInfo, "Release",
+    actualArgs = list()
+    actualArgs.append(Reason)
+    originalId = ObjectId(None, self.sourceScopeId, self.sourceObjectId)
+    model.callMethod(self.managedBroker, originalId, self.qmfClassKey, "Release",
                     callback, args=actualArgs)
 
   def Remove(self, model, callback, Reason):
-    actualArgs = dict()
-    actualArgs["Reason"] = Reason
-    conn = model.connections[self.managedBroker]
-    classInfo = self.classInfos[self.managedBroker]
-    originalId = objectId(None, self.sourceScopeId, self.sourceObjectId)
-    conn.callMethod(originalId, classInfo, "Remove",
+    actualArgs = list()
+    actualArgs.append(Reason)
+    originalId = ObjectId(None, self.sourceScopeId, self.sourceObjectId)
+    model.callMethod(self.managedBroker, originalId, self.qmfClassKey, "Remove",
                     callback, args=actualArgs)
 
   def Fetch(self, model, callback, File, Start, End, Data):
-    actualArgs = dict()
-    actualArgs["File"] = File
-    actualArgs["Start"] = Start
-    actualArgs["End"] = End
-    actualArgs["Data"] = Data
-    conn = model.connections[self.managedBroker]
-    classInfo = self.classInfos[self.managedBroker]
-    originalId = objectId(None, self.sourceScopeId, self.sourceObjectId)
-    conn.callMethod(originalId, classInfo, "Fetch",
+    actualArgs = list()
+    actualArgs.append(File)
+    actualArgs.append(Start)
+    actualArgs.append(End)
+    actualArgs.append(Data)
+    originalId = ObjectId(None, self.sourceScopeId, self.sourceObjectId)
+    model.callMethod(self.managedBroker, originalId, self.qmfClassKey, "Fetch",
                     callback, args=actualArgs)
 
 class JobStats(SQLObject):
@@ -247,6 +237,7 @@
   recTime = TimestampCol(default=None)
   sourceScopeId = BigIntCol(default=None)
   sourceObjectId = BigIntCol(default=None)
+  qmfClassKey = BLOBCol(default=None)
   creationTime = TimestampCol(default=None)
   deletionTime = TimestampCol(default=None)
   managedBroker = StringCol(length=1000, default=None)
@@ -298,6 +289,7 @@
   recTime = TimestampCol(default=None)
   sourceScopeId = BigIntCol(default=None)
   sourceObjectId = BigIntCol(default=None)
+  qmfClassKey = BLOBCol(default=None)
   creationTime = TimestampCol(default=None)
   deletionTime = TimestampCol(default=None)
   managedBroker = StringCol(length=1000, default=None)
@@ -332,6 +324,7 @@
   recTime = TimestampCol(default=None)
   sourceScopeId = BigIntCol(default=None)
   sourceObjectId = BigIntCol(default=None)
+  qmfClassKey = BLOBCol(default=None)
   creationTime = TimestampCol(default=None)
   deletionTime = TimestampCol(default=None)
   managedBroker = StringCol(length=1000, default=None)
@@ -350,30 +343,24 @@
 
 
   def GetLimits(self, model, callback, Limits):
-    actualArgs = dict()
-    actualArgs["Limits"] = Limits
-    conn = model.connections[self.managedBroker]
-    classInfo = self.classInfos[self.managedBroker]
-    originalId = objectId(None, self.sourceScopeId, self.sourceObjectId)
-    conn.callMethod(originalId, classInfo, "GetLimits",
+    actualArgs = list()
+    actualArgs.append(Limits)
+    originalId = ObjectId(None, self.sourceScopeId, self.sourceObjectId)
+    model.callMethod(self.managedBroker, originalId, self.qmfClassKey, "GetLimits",
                     callback, args=actualArgs)
 
   def SetLimit(self, model, callback, Name, Max):
-    actualArgs = dict()
-    actualArgs["Name"] = Name
-    actualArgs["Max"] = Max
-    conn = model.connections[self.managedBroker]
-    classInfo = self.classInfos[self.managedBroker]
-    originalId = objectId(None, self.sourceScopeId, self.sourceObjectId)
-    conn.callMethod(originalId, classInfo, "SetLimit",
+    actualArgs = list()
+    actualArgs.append(Name)
+    actualArgs.append(Max)
+    originalId = ObjectId(None, self.sourceScopeId, self.sourceObjectId)
+    model.callMethod(self.managedBroker, originalId, self.qmfClassKey, "SetLimit",
                     callback, args=actualArgs)
 
   def Reconfig(self, model, callback):
-    actualArgs = dict()
-    conn = model.connections[self.managedBroker]
-    classInfo = self.classInfos[self.managedBroker]
-    originalId = objectId(None, self.sourceScopeId, self.sourceObjectId)
-    conn.callMethod(originalId, classInfo, "Reconfig",
+    actualArgs = list()
+    originalId = ObjectId(None, self.sourceScopeId, self.sourceObjectId)
+    model.callMethod(self.managedBroker, originalId, self.qmfClassKey, "Reconfig",
                     callback, args=actualArgs)
 
 class NegotiatorStats(SQLObject):
@@ -401,6 +388,7 @@
   recTime = TimestampCol(default=None)
   sourceScopeId = BigIntCol(default=None)
   sourceObjectId = BigIntCol(default=None)
+  qmfClassKey = BLOBCol(default=None)
   creationTime = TimestampCol(default=None)
   deletionTime = TimestampCol(default=None)
   managedBroker = StringCol(length=1000, default=None)
@@ -433,6 +421,7 @@
   recTime = TimestampCol(default=None)
   sourceScopeId = BigIntCol(default=None)
   sourceObjectId = BigIntCol(default=None)
+  qmfClassKey = BLOBCol(default=None)
   creationTime = TimestampCol(default=None)
   deletionTime = TimestampCol(default=None)
   managedBroker = StringCol(length=1000, default=None)
@@ -453,21 +442,17 @@
 
 
   def Start(self, model, callback, Subsystem):
-    actualArgs = dict()
-    actualArgs["Subsystem"] = Subsystem
-    conn = model.connections[self.managedBroker]
-    classInfo = self.classInfos[self.managedBroker]
-    originalId = objectId(None, self.sourceScopeId, self.sourceObjectId)
-    conn.callMethod(originalId, classInfo, "Start",
+    actualArgs = list()
+    actualArgs.append(Subsystem)
+    originalId = ObjectId(None, self.sourceScopeId, self.sourceObjectId)
+    model.callMethod(self.managedBroker, originalId, self.qmfClassKey, "Start",
                     callback, args=actualArgs)
 
   def Stop(self, model, callback, Subsystem):
-    actualArgs = dict()
-    actualArgs["Subsystem"] = Subsystem
-    conn = model.connections[self.managedBroker]
-    classInfo = self.classInfos[self.managedBroker]
-    originalId = objectId(None, self.sourceScopeId, self.sourceObjectId)
-    conn.callMethod(originalId, classInfo, "Stop",
+    actualArgs = list()
+    actualArgs.append(Subsystem)
+    originalId = ObjectId(None, self.sourceScopeId, self.sourceObjectId)
+    model.callMethod(self.managedBroker, originalId, self.qmfClassKey, "Stop",
                     callback, args=actualArgs)
 
 class MasterStats(SQLObject):
@@ -495,6 +480,7 @@
   recTime = TimestampCol(default=None)
   sourceScopeId = BigIntCol(default=None)
   sourceObjectId = BigIntCol(default=None)
+  qmfClassKey = BLOBCol(default=None)
   creationTime = TimestampCol(default=None)
   deletionTime = TimestampCol(default=None)
   managedBroker = StringCol(length=1000, default=None)
@@ -510,11 +496,9 @@
 
   def reloadACLFile(self, model, callback):
     """Reload the ACL file"""
-    actualArgs = dict()
-    conn = model.connections[self.managedBroker]
-    classInfo = self.classInfos[self.managedBroker]
-    originalId = objectId(None, self.sourceScopeId, self.sourceObjectId)
-    conn.callMethod(originalId, classInfo, "reloadACLFile",
+    actualArgs = list()
+    originalId = ObjectId(None, self.sourceScopeId, self.sourceObjectId)
+    model.callMethod(self.managedBroker, originalId, self.qmfClassKey, "reloadACLFile",
                     callback, args=actualArgs)
 
 class AclStats(SQLObject):
@@ -536,6 +520,7 @@
   recTime = TimestampCol(default=None)
   sourceScopeId = BigIntCol(default=None)
   sourceObjectId = BigIntCol(default=None)
+  qmfClassKey = BLOBCol(default=None)
   creationTime = TimestampCol(default=None)
   deletionTime = TimestampCol(default=None)
   managedBroker = StringCol(length=1000, default=None)
@@ -552,19 +537,15 @@
 
 
   def stopClusterNode(self, model, callback):
-    actualArgs = dict()
-    conn = model.connections[self.managedBroker]
-    classInfo = self.classInfos[self.managedBroker]
-    originalId = objectId(None, self.sourceScopeId, self.sourceObjectId)
-    conn.callMethod(originalId, classInfo, "stopClusterNode",
+    actualArgs = list()
+    originalId = ObjectId(None, self.sourceScopeId, self.sourceObjectId)
+    model.callMethod(self.managedBroker, originalId, self.qmfClassKey, "stopClusterNode",
                     callback, args=actualArgs)
 
   def stopFullCluster(self, model, callback):
-    actualArgs = dict()
-    conn = model.connections[self.managedBroker]
-    classInfo = self.classInfos[self.managedBroker]
-    originalId = objectId(None, self.sourceScopeId, self.sourceObjectId)
-    conn.callMethod(originalId, classInfo, "stopFullCluster",
+    actualArgs = list()
+    originalId = ObjectId(None, self.sourceScopeId, self.sourceObjectId)
+    model.callMethod(self.managedBroker, originalId, self.qmfClassKey, "stopFullCluster",
                     callback, args=actualArgs)
 
 class ClusterStats(SQLObject):
@@ -585,6 +566,7 @@
   recTime = TimestampCol(default=None)
   sourceScopeId = BigIntCol(default=None)
   sourceObjectId = BigIntCol(default=None)
+  qmfClassKey = BLOBCol(default=None)
   creationTime = TimestampCol(default=None)
   deletionTime = TimestampCol(default=None)
   managedBroker = StringCol(length=1000, default=None)
@@ -631,6 +613,7 @@
   recTime = TimestampCol(default=None)
   sourceScopeId = BigIntCol(default=None)
   sourceObjectId = BigIntCol(default=None)
+  qmfClassKey = BLOBCol(default=None)
   creationTime = TimestampCol(default=None)
   deletionTime = TimestampCol(default=None)
   managedBroker = StringCol(length=1000, default=None)
@@ -652,12 +635,10 @@
 
   def expand(self, model, callback, by):
     """Increase number of files allocated for this journal"""
-    actualArgs = dict()
-    actualArgs["by"] = by
-    conn = model.connections[self.managedBroker]
-    classInfo = self.classInfos[self.managedBroker]
-    originalId = objectId(None, self.sourceScopeId, self.sourceObjectId)
-    conn.callMethod(originalId, classInfo, "expand",
+    actualArgs = list()
+    actualArgs.append(by)
+    originalId = ObjectId(None, self.sourceScopeId, self.sourceObjectId)
+    model.callMethod(self.managedBroker, originalId, self.qmfClassKey, "expand",
                     callback, args=actualArgs)
 
 class JournalStats(SQLObject):
@@ -706,6 +687,7 @@
   recTime = TimestampCol(default=None)
   sourceScopeId = BigIntCol(default=None)
   sourceObjectId = BigIntCol(default=None)
+  qmfClassKey = BLOBCol(default=None)
   creationTime = TimestampCol(default=None)
   deletionTime = TimestampCol(default=None)
   managedBroker = StringCol(length=1000, default=None)
@@ -738,6 +720,7 @@
   recTime = TimestampCol(default=None)
   sourceScopeId = BigIntCol(default=None)
   sourceObjectId = BigIntCol(default=None)
+  qmfClassKey = BLOBCol(default=None)
   creationTime = TimestampCol(default=None)
   deletionTime = TimestampCol(default=None)
   managedBroker = StringCol(length=1000, default=None)
@@ -757,41 +740,35 @@
 
   def echo(self, model, callback, sequence, body):
     """Request a response to test the path to the management broker"""
-    actualArgs = dict()
-    actualArgs["sequence"] = sequence
-    actualArgs["body"] = body
-    conn = model.connections[self.managedBroker]
-    classInfo = self.classInfos[self.managedBroker]
-    originalId = objectId(None, self.sourceScopeId, self.sourceObjectId)
-    conn.callMethod(originalId, classInfo, "echo",
+    actualArgs = list()
+    actualArgs.append(sequence)
+    actualArgs.append(body)
+    originalId = ObjectId(None, self.sourceScopeId, self.sourceObjectId)
+    model.callMethod(self.managedBroker, originalId, self.qmfClassKey, "echo",
                     callback, args=actualArgs)
 
   def connect(self, model, callback, host, port, durable, authMechanism, username, password, transport):
     """Establish a connection to another broker"""
-    actualArgs = dict()
-    actualArgs["host"] = host
-    actualArgs["port"] = port
-    actualArgs["durable"] = durable
-    actualArgs["authMechanism"] = authMechanism
-    actualArgs["username"] = username
-    actualArgs["password"] = password
-    actualArgs["transport"] = transport
-    conn = model.connections[self.managedBroker]
-    classInfo = self.classInfos[self.managedBroker]
-    originalId = objectId(None, self.sourceScopeId, self.sourceObjectId)
-    conn.callMethod(originalId, classInfo, "connect",
+    actualArgs = list()
+    actualArgs.append(host)
+    actualArgs.append(port)
+    actualArgs.append(durable)
+    actualArgs.append(authMechanism)
+    actualArgs.append(username)
+    actualArgs.append(password)
+    actualArgs.append(transport)
+    originalId = ObjectId(None, self.sourceScopeId, self.sourceObjectId)
+    model.callMethod(self.managedBroker, originalId, self.qmfClassKey, "connect",
                     callback, args=actualArgs)
 
   def queueMoveMessages(self, model, callback, srcQueue, destQueue, qty):
     """Move messages from one queue to another"""
-    actualArgs = dict()
-    actualArgs["srcQueue"] = srcQueue
-    actualArgs["destQueue"] = destQueue
-    actualArgs["qty"] = qty
-    conn = model.connections[self.managedBroker]
-    classInfo = self.classInfos[self.managedBroker]
-    originalId = objectId(None, self.sourceScopeId, self.sourceObjectId)
-    conn.callMethod(originalId, classInfo, "queueMoveMessages",
+    actualArgs = list()
+    actualArgs.append(srcQueue)
+    actualArgs.append(destQueue)
+    actualArgs.append(qty)
+    originalId = ObjectId(None, self.sourceScopeId, self.sourceObjectId)
+    model.callMethod(self.managedBroker, originalId, self.qmfClassKey, "queueMoveMessages",
                     callback, args=actualArgs)
 
 class BrokerStats(SQLObject):
@@ -812,6 +789,7 @@
   recTime = TimestampCol(default=None)
   sourceScopeId = BigIntCol(default=None)
   sourceObjectId = BigIntCol(default=None)
+  qmfClassKey = BLOBCol(default=None)
   creationTime = TimestampCol(default=None)
   deletionTime = TimestampCol(default=None)
   managedBroker = StringCol(length=1000, default=None)
@@ -844,6 +822,7 @@
   recTime = TimestampCol(default=None)
   sourceScopeId = BigIntCol(default=None)
   sourceObjectId = BigIntCol(default=None)
+  qmfClassKey = BLOBCol(default=None)
   creationTime = TimestampCol(default=None)
   deletionTime = TimestampCol(default=None)
   managedBroker = StringCol(length=1000, default=None)
@@ -873,6 +852,7 @@
   recTime = TimestampCol(default=None)
   sourceScopeId = BigIntCol(default=None)
   sourceObjectId = BigIntCol(default=None)
+  qmfClassKey = BLOBCol(default=None)
   creationTime = TimestampCol(default=None)
   deletionTime = TimestampCol(default=None)
   managedBroker = StringCol(length=1000, default=None)
@@ -889,12 +869,10 @@
 
   def purge(self, model, callback, request):
     """Discard all or some messages on a queue"""
-    actualArgs = dict()
-    actualArgs["request"] = request
-    conn = model.connections[self.managedBroker]
-    classInfo = self.classInfos[self.managedBroker]
-    originalId = objectId(None, self.sourceScopeId, self.sourceObjectId)
-    conn.callMethod(originalId, classInfo, "purge",
+    actualArgs = list()
+    actualArgs.append(request)
+    originalId = ObjectId(None, self.sourceScopeId, self.sourceObjectId)
+    model.callMethod(self.managedBroker, originalId, self.qmfClassKey, "purge",
                     callback, args=actualArgs)
 
 class QueueStats(SQLObject):
@@ -942,6 +920,7 @@
   recTime = TimestampCol(default=None)
   sourceScopeId = BigIntCol(default=None)
   sourceObjectId = BigIntCol(default=None)
+  qmfClassKey = BLOBCol(default=None)
   creationTime = TimestampCol(default=None)
   deletionTime = TimestampCol(default=None)
   managedBroker = StringCol(length=1000, default=None)
@@ -985,6 +964,7 @@
   recTime = TimestampCol(default=None)
   sourceScopeId = BigIntCol(default=None)
   sourceObjectId = BigIntCol(default=None)
+  qmfClassKey = BLOBCol(default=None)
   creationTime = TimestampCol(default=None)
   deletionTime = TimestampCol(default=None)
   managedBroker = StringCol(length=1000, default=None)
@@ -1017,6 +997,7 @@
   recTime = TimestampCol(default=None)
   sourceScopeId = BigIntCol(default=None)
   sourceObjectId = BigIntCol(default=None)
+  qmfClassKey = BLOBCol(default=None)
   creationTime = TimestampCol(default=None)
   deletionTime = TimestampCol(default=None)
   managedBroker = StringCol(length=1000, default=None)
@@ -1032,11 +1013,9 @@
 
 
   def close(self, model, callback):
-    actualArgs = dict()
-    conn = model.connections[self.managedBroker]
-    classInfo = self.classInfos[self.managedBroker]
-    originalId = objectId(None, self.sourceScopeId, self.sourceObjectId)
-    conn.callMethod(originalId, classInfo, "close",
+    actualArgs = list()
+    originalId = ObjectId(None, self.sourceScopeId, self.sourceObjectId)
+    model.callMethod(self.managedBroker, originalId, self.qmfClassKey, "close",
                     callback, args=actualArgs)
 
 class ClientConnectionStats(SQLObject):
@@ -1062,6 +1041,7 @@
   recTime = TimestampCol(default=None)
   sourceScopeId = BigIntCol(default=None)
   sourceObjectId = BigIntCol(default=None)
+  qmfClassKey = BLOBCol(default=None)
   creationTime = TimestampCol(default=None)
   deletionTime = TimestampCol(default=None)
   managedBroker = StringCol(length=1000, default=None)
@@ -1076,29 +1056,25 @@
 
 
   def close(self, model, callback):
-    actualArgs = dict()
-    conn = model.connections[self.managedBroker]
-    classInfo = self.classInfos[self.managedBroker]
-    originalId = objectId(None, self.sourceScopeId, self.sourceObjectId)
-    conn.callMethod(originalId, classInfo, "close",
+    actualArgs = list()
+    originalId = ObjectId(None, self.sourceScopeId, self.sourceObjectId)
+    model.callMethod(self.managedBroker, originalId, self.qmfClassKey, "close",
                     callback, args=actualArgs)
 
   def bridge(self, model, callback, durable, src, dest, key, tag, excludes, srcIsQueue, srcIsLocal, dynamic):
     """Bridge messages over the link"""
-    actualArgs = dict()
-    actualArgs["durable"] = durable
-    actualArgs["src"] = src
-    actualArgs["dest"] = dest
-    actualArgs["key"] = key
-    actualArgs["tag"] = tag
-    actualArgs["excludes"] = excludes
-    actualArgs["srcIsQueue"] = srcIsQueue
-    actualArgs["srcIsLocal"] = srcIsLocal
-    actualArgs["dynamic"] = dynamic
-    conn = model.connections[self.managedBroker]
-    classInfo = self.classInfos[self.managedBroker]
-    originalId = objectId(None, self.sourceScopeId, self.sourceObjectId)
-    conn.callMethod(originalId, classInfo, "bridge",
+    actualArgs = list()
+    actualArgs.append(durable)
+    actualArgs.append(src)
+    actualArgs.append(dest)
+    actualArgs.append(key)
+    actualArgs.append(tag)
+    actualArgs.append(excludes)
+    actualArgs.append(srcIsQueue)
+    actualArgs.append(srcIsLocal)
+    actualArgs.append(dynamic)
+    originalId = ObjectId(None, self.sourceScopeId, self.sourceObjectId)
+    model.callMethod(self.managedBroker, originalId, self.qmfClassKey, "bridge",
                     callback, args=actualArgs)
 
 class LinkStats(SQLObject):
@@ -1121,6 +1097,7 @@
   recTime = TimestampCol(default=None)
   sourceScopeId = BigIntCol(default=None)
   sourceObjectId = BigIntCol(default=None)
+  qmfClassKey = BLOBCol(default=None)
   creationTime = TimestampCol(default=None)
   deletionTime = TimestampCol(default=None)
   managedBroker = StringCol(length=1000, default=None)
@@ -1141,11 +1118,9 @@
 
 
   def close(self, model, callback):
-    actualArgs = dict()
-    conn = model.connections[self.managedBroker]
-    classInfo = self.classInfos[self.managedBroker]
-    originalId = objectId(None, self.sourceScopeId, self.sourceObjectId)
-    conn.callMethod(originalId, classInfo, "close",
+    actualArgs = list()
+    originalId = ObjectId(None, self.sourceScopeId, self.sourceObjectId)
+    model.callMethod(self.managedBroker, originalId, self.qmfClassKey, "close",
                     callback, args=actualArgs)
 
 class BridgeStats(SQLObject):
@@ -1166,6 +1141,7 @@
   recTime = TimestampCol(default=None)
   sourceScopeId = BigIntCol(default=None)
   sourceObjectId = BigIntCol(default=None)
+  qmfClassKey = BLOBCol(default=None)
   creationTime = TimestampCol(default=None)
   deletionTime = TimestampCol(default=None)
   managedBroker = StringCol(length=1000, default=None)
@@ -1182,35 +1158,27 @@
 
 
   def solicitAck(self, model, callback):
-    actualArgs = dict()
-    conn = model.connections[self.managedBroker]
-    classInfo = self.classInfos[self.managedBroker]
-    originalId = objectId(None, self.sourceScopeId, self.sourceObjectId)
-    conn.callMethod(originalId, classInfo, "solicitAck",
+    actualArgs = list()
+    originalId = ObjectId(None, self.sourceScopeId, self.sourceObjectId)
+    model.callMethod(self.managedBroker, originalId, self.qmfClassKey, "solicitAck",
                     callback, args=actualArgs)
 
   def detach(self, model, callback):
-    actualArgs = dict()
-    conn = model.connections[self.managedBroker]
-    classInfo = self.classInfos[self.managedBroker]
-    originalId = objectId(None, self.sourceScopeId, self.sourceObjectId)
-    conn.callMethod(originalId, classInfo, "detach",
+    actualArgs = list()
+    originalId = ObjectId(None, self.sourceScopeId, self.sourceObjectId)
+    model.callMethod(self.managedBroker, originalId, self.qmfClassKey, "detach",
                     callback, args=actualArgs)
 
   def resetLifespan(self, model, callback):
-    actualArgs = dict()
-    conn = model.connections[self.managedBroker]
-    classInfo = self.classInfos[self.managedBroker]
-    originalId = objectId(None, self.sourceScopeId, self.sourceObjectId)
-    conn.callMethod(originalId, classInfo, "resetLifespan",
+    actualArgs = list()
+    originalId = ObjectId(None, self.sourceScopeId, self.sourceObjectId)
+    model.callMethod(self.managedBroker, originalId, self.qmfClassKey, "resetLifespan",
                     callback, args=actualArgs)
 
   def close(self, model, callback):
-    actualArgs = dict()
-    conn = model.connections[self.managedBroker]
-    classInfo = self.classInfos[self.managedBroker]
-    originalId = objectId(None, self.sourceScopeId, self.sourceObjectId)
-    conn.callMethod(originalId, classInfo, "close",
+    actualArgs = list()
+    originalId = ObjectId(None, self.sourceScopeId, self.sourceObjectId)
+    model.callMethod(self.managedBroker, originalId, self.qmfClassKey, "close",
                     callback, args=actualArgs)
 
 class SessionStats(SQLObject):

Modified: mgmt/trunk/mint/python/mint/schemaparser.py
===================================================================
--- mgmt/trunk/mint/python/mint/schemaparser.py	2008-11-14 19:30:06 UTC (rev 2804)
+++ mgmt/trunk/mint/python/mint/schemaparser.py	2008-11-14 20:12:00 UTC (rev 2805)
@@ -139,6 +139,7 @@
     else:
       self.generateAttrib("sourceScopeId", "BigIntCol")
       self.generateAttrib("sourceObjectId", "BigIntCol")
+      self.generateAttrib("qmfClassKey", "BLOBCol")
       self.generateTimestampAttrib("creation")
       self.generateTimestampAttrib("deletion")
       self.generateAttrib("managedBroker", "StringCol", "length=1000")
@@ -154,10 +155,11 @@
     else:
       comment = ""
     formalArgs = ", "
-    actualArgs = "    actualArgs = dict()\n"
+    actualArgs = "    actualArgs = list()\n"
     for arg in elem.query["arg"]:
       formalArgs += "%s, " % (arg["@name"])
-      actualArgs += "    actualArgs[\"%s\"] = %s\n" % (arg["@name"], arg["@name"])
+      actualArgs += "    actualArgs.append(%s)\n" % (arg["@name"])
+
     if (formalArgs != ", "):
       formalArgs = formalArgs[:-2]
     else:
@@ -165,10 +167,8 @@
     self.pythonOutput += "\n  def %s(self, model, callback%s):\n" % (elem["@name"], formalArgs)
     self.pythonOutput += comment
     self.pythonOutput += actualArgs
-    self.pythonOutput += "    conn = model.connections[self.managedBroker]\n"
-    self.pythonOutput += "    classInfo = self.classInfos[self.managedBroker]\n"
-    self.pythonOutput += "    originalId = objectId(None, self.sourceScopeId, self.sourceObjectId)\n"
-    self.pythonOutput += "    conn.callMethod(originalId, classInfo, \"%s\",\n" % elem["@name"]
+    self.pythonOutput += "    originalId = ObjectId(None, self.sourceScopeId, self.sourceObjectId)\n"
+    self.pythonOutput += "    model.callMethod(self.managedBroker, originalId, self.qmfClassKey, \"%s\",\n" % elem["@name"]
     self.pythonOutput += "                    callback, args=actualArgs)\n"
 
   def endClass(self):
@@ -183,7 +183,7 @@
     self.pythonOutput += "import mint\n"
     self.pythonOutput += "from sqlobject import *\n"
     self.pythonOutput += "from datetime import datetime\n"
-    self.pythonOutput += "from qpid.management import objectId\n\n"
+    self.pythonOutput += "from qpid.qmfconsole import ObjectId\n\n"
     self.pythonOutput += "class Pool(SQLObject):\n"
     self.pythonOutput += "  class sqlmeta:\n"
     self.pythonOutput += "    lazyUpdate = True\n"
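
For a schema method that takes arguments, the updated generator now builds a
positional list instead of a keyword dict and emits the qmfconsole-based
call. Roughly what the emitted wrapper looks like for a hypothetical method
"purge" with a single argument "request" (method and argument names are
illustrative; the enclosing class and its columns are generated as before):

  def purge(self, model, callback, request):
    actualArgs = list()
    actualArgs.append(request)
    originalId = ObjectId(None, self.sourceScopeId, self.sourceObjectId)
    model.callMethod(self.managedBroker, originalId, self.qmfClassKey, "purge",
                    callback, args=actualArgs)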

Modified: mgmt/trunk/mint/python/mint/update.py
===================================================================
--- mgmt/trunk/mint/python/mint/update.py	2008-11-14 19:30:06 UTC (rev 2804)
+++ mgmt/trunk/mint/python/mint/update.py	2008-11-14 20:12:00 UTC (rev 2805)
@@ -1,16 +1,13 @@
 import logging
-
+import datetime
+import types
+import struct
 from Queue import Queue as ConcurrentQueue, Full, Empty
 from threading import Thread
-from datetime import datetime
-from qpid.management import managementClient
-from struct import unpack
-from schema import schemaReservedWordsMap
-from sqlobject import DateTimeCol, TimestampCol, Col
+from traceback import print_exc
+from qpid.datatypes import UUID
+from mint.schema import *
 
-import types
-import mint
-
 log = logging.getLogger("mint.update")
 
 def time_unwarp(t):
@@ -33,14 +30,10 @@
       self.updates.put(update)
     except Full:
       log.exception("Queue is full")
+      pass
 
   def run(self):
     while True:
-      #size = self.updates.qsize()
-      #
-      #if size > 1:
-      #  log.debug("Queue depth is %i", self.updates.qsize()) 
-
       try:
         update = self.updates.get(True, 1)
       except Empty:
@@ -54,303 +47,202 @@
         update.process(self.model)
       except:
         log.exception("Update failed")
+        pass
 
   def stop(self):
     self.stopRequested = True
 
-class UnknownClassException(Exception):
-  pass
+class ModelUpdate(object):
+  def __init__(self, broker, obj):
+    self.broker = broker
+    self.qmfObj = obj
 
-def unmarshalClassInfo(classInfo):
-  package = classInfo[0]
-  name = classInfo[1].capitalize()
+  def getStatsClass(self, cls):
+    return getattr(mint, cls.__name__ + "Stats")
 
-  if name == "Connection":
-    name = "ClientConnection"
+  def process(self):
+    pass
 
-  cls = getattr(mint, name)
+  def processAttributes(self, attrs, cls, model):
+    # translate keys into their string representation
+    for key in attrs.keys():
+      attrs[key.__repr__()] = attrs.pop(key)
 
-  if cls is None:
-    raise UnknownClassException("Class '%s' is unknown" % name)
+    orphan = False
+    for name in attrs.keys():
+      rename = ""
+      orig_name = name
+      if name in mint.schema.schemaReservedWordsMap:
+        rename = mint.schema.schemaReservedWordsMap.get(name)
+        attrs[rename] = attrs.pop(name)
+        name = rename
 
-  return package, cls
+      if len(name) > 3 and name.endswith("Ref"):
+        # Navigate to referenced objects
+        clsname = name[0].upper() + name[1:-3]
+        id = attrs.pop(name)
 
-def processAttributes(conn, attrs, cls, model, updateObj):
-  attrs.pop("id")
+        othercls = getattr(mint, clsname, None)
 
-  if "connectionRef" in attrs:
-    attrs["clientConnectionRef"] = attrs.pop("connectionRef")
+        if othercls:
+          attrname = name[0:-3]
 
-  orphan = False
-  for name in attrs.keys():
-    rename = schemaReservedWordsMap.get(name)
+          try:
+            attrs[attrname] = model.getObject(othercls, id, self.broker)
+          except KeyError:
+            log.info("Referenced object %s '%s' not found by key '%s'" % (clsname, id, attrname))
+          except mint.ObjectNotFound:
+            if not orphan:
+              log.info("Referenced object %s '%s' not found, deferring creation of orphan object" % (clsname, id))
+              # store object in orphan map, will be picked up later when parent info is received
+              if (clsname, id.first, id.second) not in model.orphanObjectMap:
+                model.orphanObjectMap[(clsname, id.first, id.second)] = set()
+              model.orphanObjectMap[(clsname, id.first, id.second)].add(self)
+              orphan = True
+        else:
+          log.error("Class '%s' not found" % clsname)
+      elif not hasattr(cls, orig_name):
+        # Remove attrs that we don't have in our schema
+        log.debug("Class '%s' has no field '%s'" % (cls.__name__, name))
+        del attrs[name]
+      #XXX FIX -- TODO when converting to new API, will lookup attribute type in schema representation
+      elif name in ("DaemonStartTime", "EnteredCurrentActivity", "EnteredCurrentState", "JobStart", 
+                    "LastBenchmark", "LastFetchWorkCompleted", "LastFetchWorkSpawned", "LastPeriodicCheckpoint", 
+                    "MyCurrentTime", "QDate", "JobQueueBirthdate", "MonitorSelfTime") \
+                    and (type(attrs[name]) is types.LongType or type(attrs[name]) is types.IntType or attrs[name] == 0):
+        attrs[name] = datetime.fromtimestamp(attrs[name]/1000000000)
+      elif name.endswith("Time") and type(attrs[name]) is types.IntType and attrs[name] == 0:
+        attrs[name] = datetime.fromtimestamp(attrs[name])
+      #XXX FIX -- TODO when converting to new API, will lookup attribute type in schema representation
+      elif isinstance(attrs[name], UUID):
+        # convert UUIDs into their string representation, to be handled by sqlobject
+        attrs[name] = str(attrs[name])
+    if orphan:
+      return None
+    else:
+      return attrs
 
-    if rename:
-      attrs[rename] = attrs.pop(name)
-      name = rename
 
-    if len(name) > 3 and name.endswith("Ref"):
-      # Navigate to referenced objects
+class PropertyUpdate(ModelUpdate):
+  def __init__(self, broker, obj):
+    ModelUpdate.__init__(self, broker, obj)
 
-      clsname = name[0].upper() + name[1:-3]
-      id = attrs.pop(name)
-
-      othercls = getattr(mint, clsname, None)
-
-      if othercls:
-        attrname = name[0:-3]
-
-        try:
-          attrs[attrname] = conn.getObject(othercls, id)
-        except KeyError:
-          log.info("Referenced object %s '%s' not found by key '%s'" % (clsname, id, attrname))
-        except mint.ObjectNotFound:
-          if not orphan:
-            log.info("Referenced object %s '%s' not found, deferring creation of orphan object" % (clsname, id))
-            # store object in orphan map, will be picked up later when parent info is received
-            if (clsname, id.first, id.second) not in model.orphanObjectMap:
-              model.orphanObjectMap[(clsname, id.first, id.second)] = set()
-            model.orphanObjectMap[(clsname, id.first, id.second)].add(updateObj)
-            orphan = True
-      else:
-        log.error("Class '%s' not found" % clsname)
-    elif not hasattr(cls, name):
-      # Remove attrs that we don't have in our schema
-      log.debug("Class '%s' has no field '%s'" % (cls.__name__, name))
-      del attrs[name]
-    #XXX FIX -- TODO when converting to new API, will lookup attribute type in schema representation
-    elif name in ("DaemonStartTime", "EnteredCurrentActivity", "EnteredCurrentState", "JobStart", 
-                  "LastBenchmark", "LastFetchWorkCompleted", "LastFetchWorkSpawned", "LastPeriodicCheckpoint", 
-                  "MyCurrentTime", "QDate", "JobQueueBirthdate", "MonitorSelfTime") \
-                   and (type(attrs[name]) is types.LongType or type(attrs[name]) is types.IntType or attrs[name] == 0):
-      attrs[name] = datetime.fromtimestamp(attrs[name]/1000000000)
-    elif name.endswith("Time") and type(attrs[name]) is types.IntType and attrs[name] == 0:
-      attrs[name] = datetime.fromtimestamp(attrs[name])
-    #XXX FIX -- TODO when converting to new API, will lookup attribute type in schema representation
-  if orphan:
-    return None
-  else:
-    return attrs
-
-def getStatsClass(cls):
-  return getattr(mint, cls.__name__ + "Stats")
-
-class SchemaUpdate(object):
-  def __init__(self, conn, classInfo, props, stats, methods, events):
-    self.conn = conn
-    self.classInfo = classInfo
-    self.props = props
-    self.stats = stats
-    self.methods = methods
-    self.events = events
-
   def process(self, model):
-    cls = "%s.%s" % (self.classInfo[0], self.classInfo[1])
-    log.info("Processing %-8s %-16s %-16s" % ("schema", self.conn.id, cls))
-
     try:
-      pkg, cls = unmarshalClassInfo(self.classInfo)
-      cls.classInfos[self.conn.id] = self.classInfo
-    except UnknownClassException, e:
-      log.warn(e)
-      return
+      cls = self.qmfObj.getSchema().getKey()[1]
+      if cls in mint.schema.schemaReservedWordsMap:
+        cls = mint.schema.schemaReservedWordsMap.get(cls)
+      cls = eval(cls[0].upper()+cls[1:])
+      attrs = dict(self.qmfObj.getProperties())
+      timestamps = self.qmfObj.getTimestamps()
+      id = self.qmfObj.getObjectId()
 
-    # XXX do more schema checking
+      if self.processAttributes(attrs, cls, model) == None:
+        # object is orphan, a parent dependency was not found; 
+        # insertion in db is deferred until parent info is received
+        return 
 
-class PropertyUpdate(object):
-  def __init__(self, conn, classInfo, props, timestamps):
-    self.conn = conn
-    self.classInfo = classInfo
-    self.props = props
-    self.timestamps = timestamps
+      obj = None
+      try:
+        obj = model.getObject(cls, id, self.broker)
+      except mint.ObjectNotFound:
+        obj = cls()
+        log.debug("%s(%i) created", cls.__name__, obj.id)
+        attrs["sourceScopeId"] = id.first
+        attrs["sourceObjectId"] = id.second
+        pkg, cls, hash = self.qmfObj.getClassKey()
+        attrs["qmfClassKey"] = "%s, %s, %s" % (pkg, cls, hash)
+        attrs["managedBroker"] = str(self.broker.getBrokerId())
+      except Exception, e:
+        print e
 
-  def process(self, model):
-    cls = "%s.%s" % (self.classInfo[0], self.classInfo[1])
-    args = ("props", self.conn.id, cls, len(self.props))
-    log.info("Processing %-8s %-16s %-16s %3i" % args)
+      attrs["recTime"] = datetime.fromtimestamp(timestamps[0]/1000000000)
+      attrs["creationTime"] = datetime.fromtimestamp(timestamps[1]/1000000000)
+      if timestamps[2] != 0:
+        attrs["deletionTime"] = datetime.fromtimestamp(timestamps[2]/1000000000)
+        log.debug("%s(%i) marked deleted", cls, obj.id)
 
-    try:
-      pkg, cls = unmarshalClassInfo(self.classInfo)
-    except UnknownClassException, e:
-      log.warn(e)
-      return
+      obj.set(**attrs)
+      obj.syncUpdate()
 
-    attrs = dict(self.props)
+      if (cls, id.first, id.second) in model.orphanObjectMap:
+        # this object is the parent of orphan objects in the map, re-enqueue for insertion
+        orphanObjects = model.orphanObjectMap.pop((cls, id.first, id.second))
+        for orphanObj in orphanObjects:
+          model.updateThread.enqueue(orphanObj)
+        log.info("Inserted %d orphan objects whose creation had been deferred" % (len(orphanObjects)))
 
-    id = attrs["id"]
-    if processAttributes(self.conn, attrs, cls, model, self) == None:
-      # object is orphan, a parent dependency was not found; 
-      # insertion in db is deferred until parent info is received
-      return 
+      if cls == "broker":
+       if str(self.broker.getBrokerId()) in model.managedBrokers:
+        model.managedBrokers[str(self.broker.getBrokerId())].broker = self.broker
+        reg = mint.BrokerRegistration.selectBy(url=self.broker.getFullUrl())[0]
+        reg.broker = obj
+        reg.syncUpdate()
 
-    # XXX move these down to the try/except
+    except:
+      print_exc()
 
-    attrs["managedBroker"] = self.conn.id
+    attrs["managedBroker"] = str(self.broker.getBrokerId())
     attrs["recTime"] = time_unwarp(datetime.fromtimestamp \
-        (self.timestamps[0]/1000000000))
+        (timestamps[0]/1000000000))
     attrs["creationTime"] = datetime.fromtimestamp \
-        (self.timestamps[1]/1000000000)
+        (timestamps[1]/1000000000)
 
-    if self.timestamps[2] != 0:
-      attrs["deletionTime"] = datetime.fromtimestamp \
-          (self.timestamps[2]/1000000000)
 
-    try:
-      obj = self.conn.getObject(cls, id)
-    except mint.ObjectNotFound:
-      obj = cls()
+class StatisticUpdate(ModelUpdate):
+  def __init__(self, broker, obj):
+    ModelUpdate.__init__(self, broker, obj)
 
-      log.debug("%s(%i) created", cls.__name__, obj.id)
-
-      attrs["sourceScopeId"] = id.first
-      attrs["sourceObjectId"] = id.second
-
-      #hash = "%08x%08x%08x%08x" % unpack("!LLLL", self.classInfo[2])
-      #classInfo = (self.classInfo[0], self.classInfo[1], hash)
-      #obj.sourceClassInfo = ",".join(classInfo)
-
-    obj.set(**attrs)
-    obj.syncUpdate()
-
-    if obj.deletionTime:
-      log.debug("%s(%i) marked deleted", cls.__name__, obj.id)
-
-    if (cls.__name__, id.first, id.second) in model.orphanObjectMap:
-      # this object is the parent of orphan objects in the map, re-enqueue for insertion
-      orphanObjects = model.orphanObjectMap.pop((cls.__name__, id.first, id.second))
-      for orphanObj in orphanObjects:
-        model.updateThread.enqueue(orphanObj)
-      log.info("Inserted %d orphan objects whose creation had been deferred" % (len(orphanObjects)))
-
-    # XXX refactor this to take advantage of the get/create logic
-    # above
-    if isinstance(obj, mint.Broker) and obj.managedBroker:
-      host, port = obj.managedBroker.split(":")
-      port = int(port)
-
-      if not obj.registration:
-        try:
-          reg = mint.BrokerRegistration.selectBy(host=host, port=port)[0]
-        except IndexError:
-          reg = None
-
-        if reg:
-          reg.broker = obj
-          obj.registration = reg
-
-          reg.syncUpdate()
-          obj.syncUpdate()
-
-class StatisticUpdate(object):
-  def __init__(self, conn, classInfo, stats, timestamps):
-    self.conn = conn
-    self.classInfo = classInfo
-    self.stats = stats
-    self.timestamps = timestamps
-
   def process(self, model):
-    cls = "%s.%s" % (self.classInfo[0], self.classInfo[1])
-    args = ("stats", self.conn.id, cls, len(self.stats))
-    log.info("Processing %-8s %-16s %-16s %3i" % args)
-
     try:
-      pkg, cls = unmarshalClassInfo(self.classInfo)
-    except UnknownClassException, e:
-      log.warn(e)
-      return
+      cls = self.qmfObj.getSchema().getKey()[1]
+      if cls in mint.schema.schemaReservedWordsMap:
+        cls = mint.schema.schemaReservedWordsMap.get(cls)
+      cls = eval(cls[0].upper()+cls[1:])
+      attrs = dict(self.qmfObj.getStatistics())
+      timestamps = self.qmfObj.getTimestamps()
+      id = self.qmfObj.getObjectId()
 
-    attrs = dict(self.stats)
+      obj = None
+      try:
+        obj = model.getObject(cls, id, self.broker)
+      except mint.ObjectNotFound:
+        # handle this
+        raise mint.ObjectNotFound
 
-    id = attrs["id"]
-    obj = self.conn.getObject(cls, id)
 
-    statscls = getStatsClass(cls)
-    if processAttributes(self.conn, attrs, statscls, model, self) == None:
-      # object is orphan, a parent dependency was not found; 
-      # insertion in db is deferred until parent info is received
-      return 
+      statscls = self.getStatsClass(cls)
+      if self.processAttributes(attrs, statscls, model) == None:
+        # object is orphan, a parent dependency was not found; 
+        # insertion in db is deferred until parent info is received
+        return 
 
-    attrs["recTime"] = time_unwarp(datetime.fromtimestamp \
-                                   (self.timestamps[0]/1000000000))
-    # Set the stat->obj reference
-    attrs[cls.__name__[0].lower() + cls.__name__[1:]] = obj
+      attrs["recTime"] = time_unwarp(datetime.fromtimestamp \
+                                   (timestamps[0]/1000000000))
 
-    statsobj = statscls()
-    statsobj.set(**attrs)
-    statsobj.syncUpdate()
+      # Set the stat->obj reference
+      attrs[cls.__name__[0].lower() + cls.__name__[1:]] = obj
+      statsobj = statscls()
+      statsobj.set(**attrs)
+      statsobj.syncUpdate()
 
-    # XXX not sure if this should happen here.  makes more sense in
-    # prop update
-    if self.timestamps[2] != 0:
-      obj.deletionTime = datetime.fromtimestamp(self.timestamps[2]/1000000000)
+      obj.statsPrev = obj.statsCurr
+      obj.statsCurr = statsobj
+      obj.syncUpdate()
 
-    obj.statsPrev = obj.statsCurr
-    obj.statsCurr = statsobj
-    obj.syncUpdate()
+    except:
+      print_exc()
 
-class MethodUpdate(object):
-  def __init__(self, conn, methodId, errorCode, errorText, args):
-    self.conn = conn
-    self.methodId = methodId
-    self.errorCode = errorCode
-    self.errorText = errorText
-    self.args = args
 
-  def process(self, model):
-    logArgs = ("method", self.conn.id, self.methodId, self.errorCode,
-            self.errorText)
-    log.info("Processing %-8s %-16s %-12s %-12s %s" % logArgs)
+class MethodUpdate(ModelUpdate):
+  def __init__(self, broker, seq, response):
+    ModelUpdate.__init__(self, broker, response)
+    self.seq = seq
 
-    model.lock()
-    try:
-      method = model.outstandingMethodCalls.pop(self.methodId)
-      method(self.errorText, self.args)
-    finally:
-      model.unlock()
-
-class CloseUpdate(object):
-  def __init__(self, conn, data):
-    self.conn = conn
-    self.data = data
-
   def process(self, model):
-    log.info("Processing %-8s %-16s" % ("close", self.conn.id))
-
     model.lock()
     try:
-      del model.connections[self.conn.id]
-
-      if model.connCloseListener:
-        model.connCloseListener(self.conn, self.data)
+      methodCallback = model.outstandingMethodCalls.pop(self.seq)
+      methodCallback(self.qmfObj.text, self.qmfObj.outArgs)
     finally:
       model.unlock()
-
-class ControlUpdate(object):
-  __types = {
-    managementClient.CTRL_BROKER_INFO: "broker_info",
-    managementClient.CTRL_SCHEMA_LOADED: "schema_loaded",
-    managementClient.CTRL_USER: "user",
-    managementClient.CTRL_HEARTBEAT: "heartbeat"
-    }
-
-  def __init__(self, conn, typeCode, data):
-    self.conn = conn
-    self.typeCode = typeCode
-    self.data = data
-
-  def process(self, model):
-    type = self.__types.get(self.typeCode, "[unknown]")
-    args = ("control", self.conn.id, type, self.data)
-    log.info("Processing %-8s %-16s %-16s %s" % args)
-
-    if self.typeCode == managementClient.CTRL_BROKER_INFO:
-      uuid = "%08x-%04x-%04x-%04x-%04x%08x" % unpack \
-          ("!LHHHHL", self.data.brokerId)
-
-      log.info("Broker ID from %s is '%s'" % (self.conn.id, uuid))
-      log.info("Session ID from %s is '%s'" % \
-                 (self.conn.id, self.data.sessionId))
-
-      self.conn.brokerId = uuid
-      self.conn.sessionId = self.data.sessionId
-
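
The update path is now class-based: ModelUpdate carries the originating
broker and the qmfconsole object, and PropertyUpdate, StatisticUpdate, and
MethodUpdate each override process(model). A hedged sketch of how the mint
model (not part of this diff) might feed console callbacks into the update
thread; the Console callback names used here are assumptions inferred from
the constructors above:

from mint.update import PropertyUpdate, StatisticUpdate, MethodUpdate

class MintConsole:
  """Receives qmfconsole callbacks and enqueues updates for the model."""

  def __init__(self, model):
    self.model = model

  def objectProps(self, broker, record):
    self.model.updateThread.enqueue(PropertyUpdate(broker, record))

  def objectStats(self, broker, record):
    self.model.updateThread.enqueue(StatisticUpdate(broker, record))

  def methodResponse(self, broker, seq, response):
    self.model.updateThread.enqueue(MethodUpdate(broker, seq, response))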

Modified: mgmt/trunk/mint/sql/schema.sql
===================================================================
--- mgmt/trunk/mint/sql/schema.sql	2008-11-14 19:30:06 UTC (rev 2804)
+++ mgmt/trunk/mint/sql/schema.sql	2008-11-14 20:12:00 UTC (rev 2805)
@@ -23,13 +23,12 @@
 CREATE TABLE broker_registration (
     id SERIAL PRIMARY KEY,
     name VARCHAR(1000) NOT NULL UNIQUE,
-    host VARCHAR(1000) NOT NULL,
-    port INT NOT NULL,
+    url VARCHAR(1000),
     broker_id INT,
     cluster_id INT,
     profile_id INT
 );
-CREATE UNIQUE INDEX broker_registration_host_port_unique ON broker_registration (host, port);
+CREATE UNIQUE INDEX broker_registration_url_unique ON broker_registration (url);
 
 CREATE TABLE collector_registration (
     id SERIAL PRIMARY KEY,
@@ -37,13 +36,6 @@
     collector_id VARCHAR(1000)
 );
 
-CREATE TABLE config_property (
-    id SERIAL PRIMARY KEY,
-    name VARCHAR(1000),
-    value VARCHAR(1000),
-    type VARCHAR(1)
-);
-
 CREATE TABLE mint_info (
     id SERIAL PRIMARY KEY,
     version VARCHAR(1000) NOT NULL
@@ -75,6 +67,7 @@
     rec_time TIMESTAMP,
     source_scope_id BIGINT,
     source_object_id BIGINT,
+    qmf_class_key BYTEA,
     creation_time TIMESTAMP,
     deletion_time TIMESTAMP,
     managed_broker VARCHAR(1000),
@@ -99,6 +92,7 @@
     rec_time TIMESTAMP,
     source_scope_id BIGINT,
     source_object_id BIGINT,
+    qmf_class_key BYTEA,
     creation_time TIMESTAMP,
     deletion_time TIMESTAMP,
     managed_broker VARCHAR(1000),
@@ -123,6 +117,7 @@
     rec_time TIMESTAMP,
     source_scope_id BIGINT,
     source_object_id BIGINT,
+    qmf_class_key BYTEA,
     creation_time TIMESTAMP,
     deletion_time TIMESTAMP,
     managed_broker VARCHAR(1000),
@@ -147,6 +142,7 @@
     rec_time TIMESTAMP,
     source_scope_id BIGINT,
     source_object_id BIGINT,
+    qmf_class_key BYTEA,
     creation_time TIMESTAMP,
     deletion_time TIMESTAMP,
     managed_broker VARCHAR(1000),
@@ -176,6 +172,7 @@
     rec_time TIMESTAMP,
     source_scope_id BIGINT,
     source_object_id BIGINT,
+    qmf_class_key BYTEA,
     creation_time TIMESTAMP,
     deletion_time TIMESTAMP,
     managed_broker VARCHAR(1000),
@@ -204,6 +201,7 @@
     rec_time TIMESTAMP,
     source_scope_id BIGINT,
     source_object_id BIGINT,
+    qmf_class_key BYTEA,
     creation_time TIMESTAMP,
     deletion_time TIMESTAMP,
     managed_broker VARCHAR(1000),
@@ -233,6 +231,7 @@
     rec_time TIMESTAMP,
     source_scope_id BIGINT,
     source_object_id BIGINT,
+    qmf_class_key BYTEA,
     creation_time TIMESTAMP,
     deletion_time TIMESTAMP,
     managed_broker VARCHAR(1000),
@@ -258,6 +257,7 @@
     rec_time TIMESTAMP,
     source_scope_id BIGINT,
     source_object_id BIGINT,
+    qmf_class_key BYTEA,
     creation_time TIMESTAMP,
     deletion_time TIMESTAMP,
     managed_broker VARCHAR(1000),
@@ -282,6 +282,7 @@
     rec_time TIMESTAMP,
     source_scope_id BIGINT,
     source_object_id BIGINT,
+    qmf_class_key BYTEA,
     creation_time TIMESTAMP,
     deletion_time TIMESTAMP,
     managed_broker VARCHAR(1000),
@@ -317,6 +318,7 @@
     rec_time TIMESTAMP,
     source_scope_id BIGINT,
     source_object_id BIGINT,
+    qmf_class_key BYTEA,
     creation_time TIMESTAMP,
     deletion_time TIMESTAMP,
     managed_broker VARCHAR(1000),
@@ -363,6 +365,7 @@
     rec_time TIMESTAMP,
     source_scope_id BIGINT,
     source_object_id BIGINT,
+    qmf_class_key BYTEA,
     creation_time TIMESTAMP,
     deletion_time TIMESTAMP,
     managed_broker VARCHAR(1000),
@@ -420,6 +423,7 @@
     rec_time TIMESTAMP,
     source_scope_id BIGINT,
     source_object_id BIGINT,
+    qmf_class_key BYTEA,
     creation_time TIMESTAMP,
     deletion_time TIMESTAMP,
     managed_broker VARCHAR(1000),
@@ -445,6 +449,7 @@
     rec_time TIMESTAMP,
     source_scope_id BIGINT,
     source_object_id BIGINT,
+    qmf_class_key BYTEA,
     creation_time TIMESTAMP,
     deletion_time TIMESTAMP,
     managed_broker VARCHAR(1000),
@@ -479,6 +484,7 @@
     rec_time TIMESTAMP,
     source_scope_id BIGINT,
     source_object_id BIGINT,
+    qmf_class_key BYTEA,
     creation_time TIMESTAMP,
     deletion_time TIMESTAMP,
     managed_broker VARCHAR(1000),
@@ -516,6 +522,7 @@
     rec_time TIMESTAMP,
     source_scope_id BIGINT,
     source_object_id BIGINT,
+    qmf_class_key BYTEA,
     creation_time TIMESTAMP,
     deletion_time TIMESTAMP,
     managed_broker VARCHAR(1000),
@@ -567,6 +574,7 @@
     rec_time TIMESTAMP,
     source_scope_id BIGINT,
     source_object_id BIGINT,
+    qmf_class_key BYTEA,
     creation_time TIMESTAMP,
     deletion_time TIMESTAMP,
     managed_broker VARCHAR(1000),
@@ -608,6 +616,7 @@
     rec_time TIMESTAMP,
     source_scope_id BIGINT,
     source_object_id BIGINT,
+    qmf_class_key BYTEA,
     creation_time TIMESTAMP,
     deletion_time TIMESTAMP,
     managed_broker VARCHAR(1000),
@@ -638,6 +647,7 @@
     rec_time TIMESTAMP,
     source_scope_id BIGINT,
     source_object_id BIGINT,
+    qmf_class_key BYTEA,
     creation_time TIMESTAMP,
     deletion_time TIMESTAMP,
     managed_broker VARCHAR(1000),
@@ -747,6 +757,7 @@
     rec_time TIMESTAMP,
     source_scope_id BIGINT,
     source_object_id BIGINT,
+    qmf_class_key BYTEA,
     creation_time TIMESTAMP,
     deletion_time TIMESTAMP,
     managed_broker VARCHAR(1000),
@@ -785,6 +796,7 @@
     rec_time TIMESTAMP,
     source_scope_id BIGINT,
     source_object_id BIGINT,
+    qmf_class_key BYTEA,
     creation_time TIMESTAMP,
     deletion_time TIMESTAMP,
     managed_broker VARCHAR(1000),
@@ -811,6 +823,7 @@
     rec_time TIMESTAMP,
     source_scope_id BIGINT,
     source_object_id BIGINT,
+    qmf_class_key BYTEA,
     creation_time TIMESTAMP,
     deletion_time TIMESTAMP,
     managed_broker VARCHAR(1000),
@@ -835,6 +848,7 @@
     rec_time TIMESTAMP,
     source_scope_id BIGINT,
     source_object_id BIGINT,
+    qmf_class_key BYTEA,
     creation_time TIMESTAMP,
     deletion_time TIMESTAMP,
     managed_broker VARCHAR(1000),
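
broker_registration now stores a single AMQP URL (with a unique index on
url) instead of separate host and port columns. A hedged sketch of the
corresponding SQLObject declaration and lookup; the real class lives in the
cumin/mint model code and also carries the broker_id, cluster_id, and
profile_id foreign keys shown in the table above:

from sqlobject import SQLObject, StringCol

class BrokerRegistration(SQLObject):
  # The unique index on url is created in schema.sql above.
  name = StringCol(length=1000, notNone=True)
  url = StringCol(length=1000, default=None)

# Lookup by URL, as PropertyUpdate.process does above; the address is
# illustrative.
reg = BrokerRegistration.selectBy(url="amqp://example.net:5672")[0]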

Modified: mgmt/trunk/parsley/python/parsley/command.py
===================================================================
--- mgmt/trunk/parsley/python/parsley/command.py	2008-11-14 19:30:06 UTC (rev 2804)
+++ mgmt/trunk/parsley/python/parsley/command.py	2008-11-14 20:12:00 UTC (rev 2805)
@@ -12,6 +12,9 @@
         self.commands = list()
         self.commands_by_name = dict()
 
+        opt = CommandOption(self, "help", "h")
+        opt.description = "Print this message"
+
         if self.parent:
             self.parent.commands.append(self)
             self.parent.commands_by_name[self.name] = self
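
With this change every parsley command registers a "help"/"h" option in its
constructor, so individual tools no longer have to declare one. A hedged
usage sketch; the class names and the (parent, name) constructor are
assumptions based on how self.parent and self.name are used above:

from parsley.command import Command, CommandOption

# Top-level command plus a subcommand; both now carry the automatic
# "help"/"h" option without declaring it themselves.
top = Command(None, "mint-admin")        # tool name is illustrative
sub = Command(top, "load-schema")

opt = CommandOption(sub, "force", "f")   # an extra option, for comparison
opt.description = "Load the schema even if data already exists"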



