[rhmessaging-commits] rhmessaging commits: r2157 - in mgmt: cumin/python/cumin and 2 other directories.

rhmessaging-commits at lists.jboss.org rhmessaging-commits at lists.jboss.org
Tue Jun 24 11:56:25 EDT 2008


Author: justi9
Date: 2008-06-24 11:56:25 -0400 (Tue, 24 Jun 2008)
New Revision: 2157

Added:
   mgmt/mint/python/mint/update.py
Modified:
   mgmt/cumin/bin/cumin
   mgmt/cumin/bin/cumin-bench
   mgmt/cumin/bin/cumin-test
   mgmt/cumin/python/cumin/__init__.py
   mgmt/cumin/python/cumin/model.py
   mgmt/mint/bin/mint-test
   mgmt/mint/python/mint/__init__.py
Log:
Moved all database updates to a distinct update queue and worker
thread, to simplify locking.

Changed mint-test to work with multiple broker connections.

Now that we have worker threads, add start/stop methods to the app and
model, and call them from the command-line entry points.



Modified: mgmt/cumin/bin/cumin
===================================================================
--- mgmt/cumin/bin/cumin	2008-06-23 19:44:16 UTC (rev 2156)
+++ mgmt/cumin/bin/cumin	2008-06-24 15:56:25 UTC (rev 2157)
@@ -42,11 +42,15 @@
             sys.exit(1)
 
     try:
-        server.start()
-    except:
-        server.stop()
-        raise
+        app.start()
 
+        try:
+            server.start()
+        finally:
+            server.stop()
+    finally:
+        app.stop()
+
 def main():
     config = CuminConfig()
 

Modified: mgmt/cumin/bin/cumin-bench
===================================================================
--- mgmt/cumin/bin/cumin-bench	2008-06-23 19:44:16 UTC (rev 2156)
+++ mgmt/cumin/bin/cumin-bench	2008-06-24 15:56:25 UTC (rev 2157)
@@ -26,7 +26,12 @@
     harness = BenchmarkHarness(app)
 
     try:
-        harness.run(hits)
+        try:
+            app.start()
+
+            harness.run(hits)
+        finally:
+            app.stop()
     except KeyboardInterrupt:
         pass
 

Modified: mgmt/cumin/bin/cumin-test
===================================================================
--- mgmt/cumin/bin/cumin-test	2008-06-23 19:44:16 UTC (rev 2156)
+++ mgmt/cumin/bin/cumin-test	2008-06-24 15:56:25 UTC (rev 2157)
@@ -28,8 +28,12 @@
     env = TestEnvironment(app, host, port, config.spec)
     env.init();
 
-    session = env.run_test(MainTest(env))
-    session.report(sys.stdout)
+    app.start()
+    try:
+        session = env.run_test(MainTest(env))
+        session.report(sys.stdout)
+    finally:
+        app.stop()
 
 def main():
     config = CuminConfig()

Modified: mgmt/cumin/python/cumin/__init__.py
===================================================================
--- mgmt/cumin/python/cumin/__init__.py	2008-06-23 19:44:16 UTC (rev 2156)
+++ mgmt/cumin/python/cumin/__init__.py	2008-06-24 15:56:25 UTC (rev 2157)
@@ -58,8 +58,14 @@
 
     def init(self):
         self.model.init()
+
+    def start(self):
+        self.model.start()
         self.broker_connect_thread.start()
 
+    def stop(self):
+        self.model.stop()
+
 class BrokerConnectThread(Thread):
     log = logging.getLogger("cumin.mgmt.conn")
 

Modified: mgmt/cumin/python/cumin/model.py
===================================================================
--- mgmt/cumin/python/cumin/model.py	2008-06-23 19:44:16 UTC (rev 2156)
+++ mgmt/cumin/python/cumin/model.py	2008-06-24 15:56:25 UTC (rev 2157)
@@ -40,6 +40,12 @@
 
     def init(self):
         self.data.init()
+
+    def start(self):
+        self.data.start()
+
+    def stop(self):
+        self.data.stop()
         
     def add_class(self, cls):
         self.classes.append(cls)

Modified: mgmt/mint/bin/mint-test
===================================================================
--- mgmt/mint/bin/mint-test	2008-06-23 19:44:16 UTC (rev 2156)
+++ mgmt/mint/bin/mint-test	2008-06-24 15:56:25 UTC (rev 2157)
@@ -1,30 +1,51 @@
 #!/usr/bin/env python
 
-import sys, os
+import sys, os, logging
 from time import sleep
 
 from mint import *
 
 def usage():
-    print "Usage: mint-test [DATABASE-URI] [BROKER-ADDRESS]"
+    print "Usage: mint-test [DATABASE-URI] [BROKER-ADDRESSES...]"
     print "Example: mint-test postgresql://cumin@localhost/cumin localhost:5672"
     sys.exit(1)
 
-def do_main(uri, spec, host, port):
-    model = MintModel(uri, spec, debug=True)
+def do_main(uri, spec, hostports):
+    model = MintModel(uri, spec, debug=False)
     model.check()
     model.init()
 
-    conn = BrokerConnection(model, host, port)
-    conn.open()
+    conns = list()
 
+    for host, port in hostports:
+        conn = BrokerConnection(model, host, port)
+        conns.append(conn)
+        
+    model.start()
     try:
-        while (True):
-            sleep(5)
+        try:
+            for conn in conns:
+                conn.open()
+            
+            while True:
+                sleep(5)
+        finally:
+            for conn in conns:
+                try:
+                    conn.close()
+                except:
+                    pass
     finally:
-        conn.close()
+        model.stop()
 
 def main():
+    root = logging.getLogger("mint")
+    root.setLevel(logging.DEBUG)
+
+    h = logging.StreamHandler()
+    h.setLevel(logging.DEBUG)
+    root.addHandler(h)
+
     if "-h" in sys.argv or "--help" in sys.argv:
         usage()
 
@@ -33,17 +54,29 @@
     except IndexError:
         uri = "postgresql://cumin@localhost/cumin"
 
-    try:
-        addr = sys.argv[2]
-    except IndexError:
-        addr = "localhost:5672"
+    if uri == "-":
+        uri = "postgresql://cumin@localhost/cumin"
 
+    addrs = sys.argv[2:]
+
+    if not addrs:
+        addrs = ["localhost:5672"]
+
+    hostports = list()
+
+    for addr in addrs:
+        try:
+            host, port = addr.split(":")
+            hostports.append((host, int(port)))
+        except ValueError, e:
+            print "Error: Cannot parse '%s': %s" % (addr, e)
+            sys.exit(1)
+
     spec = os.environ.get("AMQP_SPEC",
-        os.path.normpath("/usr/share/amqp/amqp.0-10.xml"))
-    host, port = addr.split(":")
+                          os.path.normpath("/usr/share/amqp/amqp.0-10.xml"))
 
     try:
-        do_main(uri, spec, host, int(port))
+        do_main(uri, spec, hostports)
     except KeyboardInterrupt:
         pass
 

Modified: mgmt/mint/python/mint/__init__.py
===================================================================
--- mgmt/mint/python/mint/__init__.py	2008-06-23 19:44:16 UTC (rev 2156)
+++ mgmt/mint/python/mint/__init__.py	2008-06-24 15:56:25 UTC (rev 2157)
@@ -7,7 +7,9 @@
 from sqlobject import *
 from threading import Lock, RLock
 from traceback import print_exc, print_stack
+
 from mint import schema
+from mint import update
 
 log = logging.getLogger("mint")
 
@@ -117,87 +119,19 @@
   lastChallenged = TimestampCol(default=None)
   lastLoggedOut = TimestampCol(default=None)
 
-class OriginalIdDict:
-  def __init__(self):
-    self.idMap = dict()
-    self.lock = Lock()
+class ObjectNotFound(Exception):
+  pass
 
-  def set(self, idOriginal, obj):
-    self.lock.acquire()
-    try:
-      self.idMap[idOriginal] = obj
-    finally:
-      self.lock.release()
-
-  def getByOriginalId(self, objType, idOriginal, managedBroker, create=False, args={}):
-    obj = None
-
-    self.lock.acquire()
-    try:
-      obj = self.doGetByOriginalId(objType, idOriginal, managedBroker, create, args)
-    finally:
-      self.lock.release()
-
-    return obj
-
-  def doGetByOriginalId(self, objType, idOriginal, managedBroker, create=False, args={}):
-    obj = None
-    key = (managedBroker, idOriginal)
-    if (key in self.idMap):
-      #print "\n\n=============== %s %d found\n\n" % (objType.__name__, idOriginal)
-      obj = self.idMap[key]
-    else:
-      try:
-        obj = eval("objType.selectBy(idOriginal=idOriginal, managedBroker=managedBroker)[:1][0]")
-        self.idMap[key] = obj
-      except:
-        if (create):
-          #print "\n\n=============== %s %d NOT found, creating\n\n" % (objType.__name__, idOriginal)
-          obj = objType.__new__(objType)
-          obj.__init__()
-          obj.syncUpdate()
-          self.idMap[key] = obj
-        else:
-          #print "\n\n=============== %s %d NOT found, NOT creating\n\n" % (objType.__name__, idOriginal)
-          pass
-      else:
-        #print "\n\n=============== %s %d found AFTER QUERY\n\n" % (objType.__name__, idOriginal)
-        pass
-
-    if isinstance(obj, Broker) and obj.managedBroker:
-      host, port = obj.managedBroker.split(":")
-      port = int(port)
-
-      if not obj.registration:
-        try:
-          reg = BrokerRegistration.selectBy(host=host, port=port)[0]
-        except IndexError:
-          reg = None
-
-        if reg:
-          reg.broker = obj
-          obj.registration = reg
-
-          reg.syncUpdate()
-          obj.syncUpdate()
-
-    return obj
-
-  def getByIndexAttrib(self, objType, indexAttrib, indexValue, create=False, args={}):
-    ###FIX
-    return None
-
 # Not thread safe
 class BrokerConnection(object):
   def __init__(self, model, host, port):
     self.model = model
     self.host = host
     self.port = port
-    self.key = "%s:%i" % (host, port)
-    self.objs = OriginalIdDict()
+    self.id = "%s:%i" % (host, port)
+    self.objectsById = dict()
 
-    # state in (None, "opening", "opened", "open_failed",
-    #                 "closing", "closed", "close_failed")
+    # state in (None, "opening", "opened", "closing", "closed")
     self.state = None
     self.exception = None
 
@@ -205,6 +139,18 @@
     self.mclient = None
     self.mchan = None
 
+  def getObject(self, cls, id):
+    try:
+      obj = self.objectsById[id]
+    except KeyError:
+      try:
+        obj = cls.selectBy(idOriginal=id, managedBroker=self.id)[0]
+        self.objectsById[id] = obj
+      except IndexError:
+        raise ObjectNotFound()
+
+    return obj
+
   def isOpen(self):
     return self.state == "opened"
 
@@ -220,9 +166,8 @@
     try:
       sock = connect(self.host, self.port)
     except Exception, e:
-      self.state = "open_failed"
       self.exception = e
-      return
+      raise e
 
     self.conn = QpidConnection(sock, spec)
     self.mclient = managementClient(spec, 
@@ -239,11 +184,11 @@
         # XXX I want this to happen after broker start, but the
         # callbacks rely on the broker being in the connectedBrokers
         # dict
-        self.model.connections[self.key] = self
+        self.model.connections[self.id] = self
 
         self.conn.start()
         self.mchan = self.mclient.addChannel(self.conn.session(str(uuid4())),
-                                             self.key)
+                                             self.id)
 
         self.state = "opened"
       except Exception, e:
@@ -281,7 +226,7 @@
       try:
         self.mclient.removeChannel(self.mchan)
 
-        del self.model.connections[self.key]
+        del self.model.connections[self.id]
 
         self.state = "closed"
       except Exception, e:
@@ -297,21 +242,6 @@
     self.mclient = None
     self.mchan = None
 
-  def getByOriginalId(self, objType, idOriginal, managedBroker, create=False, args={}):
-    self.model.lock()
-    try:
-      return self.objs.getByOriginalId(objType, idOriginal, managedBroker, create, args)
-    finally:
-      self.model.unlock()
-
-  def getByIndexAttrib(self, objType, indexAttrib, indexValue, parent,
-                       create=False, args={}):
-    self.model.lock()
-    try:
-      return self.objs.getByIndexAttrib(objType, indexAttrib, indexValue, create, args)
-    finally:
-      self.model.unlock()
-
 class MintModel:
   staticInstance = None
 
@@ -326,6 +256,8 @@
     self.connCloseListener = None
     self.__lock = RLock()
 
+    self.updateThread = update.ModelUpdateThread(self)
+
     assert MintModel.staticInstance is None
     MintModel.staticInstance = self
 
@@ -352,6 +284,12 @@
     conn = connectionForURI(self.dataUri)
     sqlhub.processConnection = conn
 
+  def start(self):
+    self.updateThread.start()
+
+  def stop(self):
+    self.updateThread.stop()
+
   def setDebug(self, debug=True):
     self.debug = debug
 
@@ -359,219 +297,34 @@
     if (self.debug):
       print message
     
-  def sanitizeDict(self, d):
-    if ("id" in d):
-      d[self.convertIdKey("id")] = d.pop("id")
-    if ("connectionRef" in d):
-      d["clientConnectionRef"] = d.pop("connectionRef")
-    #XXX FIX -- fix handling of field tables
-    if ("arguments" in d):
-      d.pop("arguments")
-    #XXX FIX -- fix handling of field tables
-    return d
-
-  def convertIdKey(self, k):
-    return "idOriginal"
-
-  def convertRefKey(self, k):
-    result = k.replace("Ref", "")
-    result = result[0].lower() + result[1:]
-    return result
-
-  def findParentKeys(self, d):
-    keys = []
-    for key in d.keys():
-      if (key.endswith("Ref")):
-        keys.append(key)
-    return keys
-
-  def fixClassInfo(self, classInfo):
-    objectName = self.initialCapital(classInfo[1])
-    if (objectName == "Connection"):
-      objectName = "ClientConnection"
-    return objectName
-
-  def initialCapital(self, string):
-    return string[0].upper() + string[1:]
-
   def setCloseListener(self, connCloseListener):
     self.connCloseListener = connCloseListener
 
-  def schemaCallback(self, brokerId, classInfo, configs, metric, methods, events):
-    self.log("\nSCHEMA---------------------------------------------------")
-    objectName = self.fixClassInfo(classInfo)
-    self.log("BrokerId=%s , objectName=%s" % (brokerId, objectName))
-
+  def getConnection(self, id):
     self.lock()
     try:
-      cls = schema.schemaNameToClassMap.get(objectName)
-      if cls:
-        cls.classInfos[brokerId] = classInfo
-    finally:
-      self.unlock()
-
-    self.log("\nEND SCHEMA---------------------------------------------------")
-
-  def configCallback(self, brokerId, classInfo, list, timestamps):
-    self.log("\nCONFIG---------------------------------------------------")
-    objectName = self.fixClassInfo(classInfo)
-    brokerUUID = classInfo[2]
-    self.log(objectName)
-    d = self.sanitizeDict(dict(list))
-
-    self.lock()
-    try:
-      conn = self.connections[brokerId]
-    finally:
-      self.unlock()
-
-    d["managedBroker"] = brokerId
-    d["recTime"] = datetime.fromtimestamp(timestamps[0]/1000000000)
-    d["creationTime"] = datetime.fromtimestamp(timestamps[1]/1000000000)
-    self.log(d)
-    try:
-      for parentKey in self.findParentKeys(d):
-        convertedKey = self.convertRefKey(parentKey)
-
-        self.lock()
-        try:
-          cls = schema.schemaNameToClassMap.get(self.initialCapital(convertedKey))
-          if cls:
-            d[convertedKey] = conn.getByOriginalId(cls, d.pop(parentKey), brokerId)
-          else:
-            self.log("Error: referenced class not found: %s" % convertedKey)
-        finally:
-          self.unlock()
-
-      obj = conn.getByOriginalId(schema.schemaNameToClassMap[objectName], d["idOriginal"], brokerId, create=True)
-      if (not obj):
-        self.log("Couldn't find type %s id %s" % (objectName, d["idOriginal"]))
-        return 
-
-      self.lock()
       try:
-        obj = self.updateObjWithDict(obj, d)
-        if (not obj):
-          return
-      finally:
-        self.unlock()
-
-    except TypeError, detail:
-      self.log("TypeError: Schema mismatch: %s" % detail)
-      return
-    except KeyError, detail:
-      self.log("KeyError: Schema mismatch: %s" % detail)
-      return 
-
-    self.log("END CONFIG---------------------------------------------------\n")
-    return obj
-
-  def instCallback(self, brokerId, classInfo, list, timestamps):
-    self.log("\nINST---------------------------------------------------")
-    objectName = self.fixClassInfo(classInfo)
-    brokerUUID = classInfo[2]
-    self.log(objectName)
-    d = self.sanitizeDict(dict(list))
-
-    self.lock()
-    try:
-      conn = self.connections[brokerId]
+        return self.connections[id]
+      except KeyError:
+        log.error("Connection '%s' not found" % id)
     finally:
       self.unlock()
 
-    d["recTime"] = datetime.fromtimestamp(timestamps[0]/1000000000)
-    self.log(d)
+  def schemaCallback(self, brokerId, classInfo, configs, metric, methods, events):
+    conn = self.getConnection(brokerId)
+    up = update.SchemaUpdate(conn, classInfo, configs, metric, methods, events)
+    self.updateThread.enqueue(up)
 
-    try:
-      obj = conn.getByOriginalId(schema.schemaNameToClassMap[objectName], d[self.convertIdKey("id")], brokerId)
-      if (not obj):
-        self.log("Couldn't find type %s id %s" % (objectName, d[self.convertIdKey("id")]))
-        print "lion", classInfo, list
-        return
-      
-      origObjName = objectName[0].lower() + objectName[1:]
-      d[origObjName] = obj
+  def configCallback(self, brokerId, classInfo, props, timestamps):
+    conn = self.getConnection(brokerId)
+    up = update.PropertyUpdate(conn, classInfo, props, timestamps)
+    self.updateThread.enqueue(up)
 
-      self.lock()
-      try:
-        objNameStats = eval("schema.%sStats" % (schema.schemaNameToClassMap[objectName].__name__))
-        objStats = objNameStats.__new__(objNameStats)
-        objStats.__init__()
-      finally:
-        self.unlock()
-      
-      if (not objStats):
-        self.log("Couldn't find type %s id %s" % (objNameStats, d[self.convertIdKey("id")]))
-        return
+  def instCallback(self, brokerId, classInfo, stats, timestamps):
+    conn = self.getConnection(brokerId)
+    up = update.StatisticUpdate(conn, classInfo, stats, timestamps)
+    self.updateThread.enqueue(up)
 
-      self.lock()
-      try:
-        objStats = self.updateObjWithDict(objStats, d)
-        if (not objStats):
-          return
-      finally:
-        self.unlock()
-
-      d = dict()
-      d["statsPrev"] = obj.statsCurr
-      d["statsCurr"] = objStats
-      if (timestamps[2] != 0):
-        d["deletionTime"] = datetime.fromtimestamp(timestamps[2]/1000000000)
-
-      self.lock()
-      try:
-        obj = self.updateObjWithDict(obj, d)
-        if (not obj):
-          return
-      finally:
-        self.unlock()
-
-    except TypeError, detail:
-      self.log("TypeError: Schema mismatch: %s" % detail)
-      return
-    except KeyError, detail:
-      self.log("KeyError: Schema mismatch: %s" % detail)
-      return 
-
-    self.log("END INST---------------------------------------------------\n")
-    return objStats
-
-  def updateObjWithDict(self, obj, d):
-    updateDone = False
-    reattemptCount = 0
-    while not updateDone and len(d) > 0:
-      try:
-        obj.set(**d)
-        obj.syncUpdate()
-        updateDone = True
-        if (reattemptCount > 0):
-          self.log("Reattempts successful")
-      except TypeError, detail:
-        self.log("TypeError: Schema mismatch: %s" % detail)
-        detailString = detail.__str__()
-        errorString = "got an unexpected keyword argument "
-        index = detailString.index(errorString)
-        if (index >= 0):
-          # argument in dict is not in schema, so remove it and re-attempt
-          index += len(errorString)
-          missingAttrib = detailString[index:]
-          self.log("Reattempting without %s attribute" % missingAttrib)
-          d.pop(missingAttrib)
-          reattemptCount += 1
-        else:
-          # can't recover
-          self.log("Non-recoverable schema mismatch, information lost")
-          return None
-      except KeyError, detail:
-        self.log("KeyError: Schema mismatch: %s" % detail)
-        return None
-      except:
-        #TODO: better exception handling here
-        self.log("Unexpected Error: %s" % sys.exc_info()[0])
-        print "Unexpected Error: %s" % sys.exc_info()[0]
-        return obj
-    return obj
-
   def methodCallback(self, brokerId, methodId, errorNo, errorText, args):
     self.log("\nMETHOD---------------------------------------------------")
     self.log("MethodId=%d" % (methodId))

Added: mgmt/mint/python/mint/update.py
===================================================================
--- mgmt/mint/python/mint/update.py	                        (rev 0)
+++ mgmt/mint/python/mint/update.py	2008-06-24 15:56:25 UTC (rev 2157)
@@ -0,0 +1,246 @@
+import logging
+
+from Queue import Queue as ConcurrentQueue, Full, Empty
+from threading import Thread
+from datetime import datetime
+
+import mint
+
+log = logging.getLogger("mint.update")
+
+class ModelUpdateThread(Thread):
+  def __init__(self, model):
+    super(ModelUpdateThread, self).__init__()
+
+    self.model = model
+    self.updates = ConcurrentQueue()
+    self.stopRequested = False
+    self.setDaemon(False)
+
+  def enqueue(self, update):
+    try:
+      self.updates.put(update)
+    except Full:
+      log.warn("Update queue is full")
+
+  def run(self):
+    while True:
+      try:
+        update = self.updates.get(True, 1)
+      except Empty:
+        if self.stopRequested:
+          break
+        else:
+          log.info("Update queue is empty")
+          continue
+
+      try:
+        update.process()
+      except Exception, e:
+        log.error(e)
+
+  def stop(self):
+    self.stopRequested = True
+
+class UnknownClassException(Exception):
+  pass
+
+def unmarshalClassInfo(classInfo):
+  package = classInfo[0]
+  name = classInfo[1].capitalize()
+
+  if name == "Connection":
+    name = "ClientConnection"
+
+  cls = getattr(mint, name)
+
+  if cls is None:
+    raise UnknownClassException("Class '%s' is unknown" % name)
+
+  return package, cls
+
+def processAttributes(conn, attrs, cls):
+  if "id" in attrs:
+    attrs["idOriginal"] = attrs.pop("id")
+
+  if "connectionRef" in attrs:
+    attrs["clientConnectionRef"] = attrs.pop("connectionRef")
+
+  #XXX FIX -- fix handling of field tables
+  if "arguments" in attrs:
+    del attrs["arguments"]
+
+  for name in attrs.keys():
+    if len(name) > 3 and name.endswith("Ref"):
+      # Navigate to referenced objects
+
+      clsname = name[0].upper() + name[1:-3]
+      id = attrs.pop(name)
+
+      othercls = getattr(mint, clsname, None)
+
+      if othercls:
+        attrname = name[0:-3]
+
+        try:
+          attrs[attrname] = conn.getObject(othercls, id)
+        except KeyError:
+          log.info("Referenced object %s '%s' not found" % \
+                     (clsname, id))
+      else:
+        log.error("Class '%s' not found" % clsname)
+    elif not hasattr(cls, name):
+      # Remove attrs that we don't have in our schema
+
+      log.debug("Class '%s' has no field '%s'" % (cls.__name__, name))
+
+      del attrs[name]
+
+  return attrs
+
+def getStatsClass(cls):
+  return getattr(mint, cls.__name__ + "Stats")
+
+class SchemaUpdate(object):
+  def __init__(self, conn, classInfo, props, stats, methods, events):
+    self.conn = conn
+    self.classInfo = classInfo
+    self.props = props
+    self.stats = stats
+    self.methods = methods
+    self.events = events
+
+  def process(self):
+    args = ("schema", self.conn.id, self.classInfo[0], self.classInfo[1])
+    log.info("%-8s %-16s %-8s %-12s" % args)
+
+    try:
+      pkg, cls = unmarshalClassInfo(self.classInfo)
+    except UnknownClassException, e:
+      log.warn(e)
+      return
+
+    if cls:
+      cls.classInfos[self.conn.id] = self.classInfo
+
+class PropertyUpdate(object):
+  def __init__(self, conn, classInfo, props, timestamps):
+    self.conn = conn
+    self.classInfo = classInfo
+    self.props = props
+    self.timestamps = timestamps
+
+  def process(self):
+    args = ("props", self.conn.id, self.classInfo[0], self.classInfo[1],
+            len(self.props))
+    log.info("%-8s %-16s %-8s %-12s %i" % args)
+
+    try:
+      pkg, cls = unmarshalClassInfo(self.classInfo)
+    except UnknownClassException, e:
+      log.warn(e)
+      return
+
+    attrs = dict(self.props)
+
+    id = attrs["id"]
+
+    processAttributes(self.conn, attrs, cls)
+
+    attrs["managedBroker"] = self.conn.id
+    attrs["recTime"] = datetime.fromtimestamp(self.timestamps[0]/1000000000)
+    attrs["creationTime"] = datetime.fromtimestamp \
+        (self.timestamps[1]/1000000000)
+
+    try:
+      obj = self.conn.getObject(cls, id)
+    except mint.ObjectNotFound:
+      obj = cls()
+      obj.set(**attrs)
+      obj.syncUpdate()
+
+    # XXX refactor this to take advantage of the get/create logic
+    # above
+    if isinstance(obj, mint.Broker) and obj.managedBroker:
+      host, port = obj.managedBroker.split(":")
+      port = int(port)
+
+      if not obj.registration:
+        try:
+          reg = mint.BrokerRegistration.selectBy(host=host, port=port)[0]
+        except IndexError:
+          reg = None
+
+        if reg:
+          reg.broker = obj
+          obj.registration = reg
+
+          reg.syncUpdate()
+          obj.syncUpdate()
+
+class StatisticUpdate(object):
+  def __init__(self, conn, classInfo, stats, timestamps):
+    self.conn = conn
+    self.classInfo = classInfo
+    self.stats = stats
+    self.timestamps = timestamps
+
+  def process(self):
+    args = ("stats", self.conn.id, self.classInfo[0], self.classInfo[1],
+            len(self.stats))
+    log.info("%-8s %-16s %-8s %-12s %i" % args)
+
+    try:
+      pkg, cls = unmarshalClassInfo(self.classInfo)
+    except UnknownClassException, e:
+      log.warn(e)
+      return
+
+    attrs = dict(self.stats)
+
+    id = attrs["id"]
+
+    try:
+      obj = self.conn.getObject(cls, id)
+    except ObjectNotFound, e:
+      log.warn(e)
+      return
+
+    statscls = getStatsClass(cls)
+    processAttributes(self.conn, attrs, statscls)
+
+    attrs["recTime"] = datetime.fromtimestamp(self.timestamps[0]/1000000000)
+
+    # Set the stat->obj reference
+    attrs[cls.__name__[0].lower() + cls.__name__[1:]] = obj
+
+    statsobj = statscls()
+    statsobj.set(**attrs)
+    statsobj.syncUpdate()
+
+    if self.timestamps[2] != 0:
+      obj.deletionTime = datetime.fromtimestamp(self.timestamps[2]/1000000000)
+
+    obj.statsPrev = obj.statsCurr
+    obj.statsCurr = statsobj
+    obj.syncUpdate()
+
+class MethodUpdate(object):
+  def __init__(self, conn, methodId, errorId, errorText, args):
+    self.conn = conn
+    self.methodId = methodId
+    self.errorId = errorId
+    self.errorText = errorText
+    self.args = args
+
+  def process(self):
+    args = ("stats", self.conn.id, self.methodId, self.errorId,
+            self.errorText)
+    log.info("%-8s %-16s %-8s %-8s %s" % args)
+
+class CloseUpdate(object):
+  def __init__(self, brokerId, data):
+    self.brokerId = brokerId
+
+  def process(self):
+    log.info("%-8s %-16s" % ("close", self.brokerId))




More information about the rhmessaging-commits mailing list