[rhmessaging-commits] rhmessaging commits: r2166 - mgmt/mint/python/mint.

rhmessaging-commits at lists.jboss.org rhmessaging-commits at lists.jboss.org
Thu Jun 26 10:34:57 EDT 2008


Author: justi9
Date: 2008-06-26 10:34:56 -0400 (Thu, 26 Jun 2008)
New Revision: 2166

Modified:
   mgmt/mint/python/mint/__init__.py
   mgmt/mint/python/mint/update.py
Log:
Complete the transition to queued updates

 - Pass the model to the process() method of each update event

 - Rename the callbacks and some arguments to be in line with the new
   schema names (configCallback -> propsCallback, instCallback ->
   statsCallback)

 - Improve update logging

 - Log full stack traces on exception

 - Implement MethodUpdate.process(), the process method of the method
   update event

 - Add update handling for control and close events

 - Set the broker UUID and session ID on the connection when we
   receive a "broker info" control

 - Fix dropped object property updates: properties were only applied
   when an object was first created (the initial property set), so
   later updates to an existing object were discarded



Modified: mgmt/mint/python/mint/__init__.py
===================================================================
--- mgmt/mint/python/mint/__init__.py	2008-06-25 21:18:15 UTC (rev 2165)
+++ mgmt/mint/python/mint/__init__.py	2008-06-26 14:34:56 UTC (rev 2166)
@@ -131,11 +131,15 @@
     self.id = "%s:%i" % (host, port)
     self.objectsById = dict()
 
+    # Set upon receiving a broker info control
+    self.sessionId = None
+    self.brokerId = None
+
     # state in (None, "opening", "opened", "closing", "closed")
     self.state = None
     self.exception = None
 
-    self.conn = None
+    self.mconn = None
     self.mclient = None
     self.mchan = None
 
@@ -155,7 +159,7 @@
     return self.state == "opened"
 
   def open(self):
-    assert self.conn is None
+    assert self.mconn is None
     assert self.mclient is None
     assert self.mchan is None
 
@@ -169,19 +173,19 @@
       self.exception = e
       raise e
 
-    self.conn = QpidConnection(sock, spec)
+    self.mconn = QpidConnection(sock, spec)
     self.mclient = managementClient(spec, 
                                     self.model.controlCallback,
-                                    self.model.configCallback,
-                                    self.model.instCallback,
+                                    self.model.propsCallback,
+                                    self.model.statsCallback,
                                     self.model.methodCallback,
                                     self.model.closeCallback)
     self.mclient.schemaListener(self.model.schemaCallback)
 
     try:
-      self.conn.start()
+      self.mconn.start()
       self.mchan = self.mclient.addChannel \
-          (self.conn.session(str(uuid4())), self)
+          (self.mconn.session(str(uuid4())), self)
 
       self.state = "opened"
     except Exception, e:
@@ -232,10 +236,10 @@
       self.exception = e
       raise e
 
-    self.conn.close()
+    self.mconn.close()
     # XXX What else do I need to try to shutdown here?
 
-    self.conn = None
+    self.mconn = None
     self.mclient = None
     self.mchan = None
 
@@ -287,75 +291,32 @@
   def stop(self):
     self.updateThread.stop()
 
-  def setDebug(self, debug=True):
-    self.debug = debug
-
-  def log(self, message):
-    if (self.debug):
-      print message
-    
   def setCloseListener(self, connCloseListener):
     self.connCloseListener = connCloseListener
 
-  def schemaCallback(self, conn, classInfo, configs, metric, methods, events):
-    up = update.SchemaUpdate(conn, classInfo, configs, metric, methods, events)
+  def schemaCallback(self, conn, classInfo, props, stats, methods, events):
+    up = update.SchemaUpdate(conn, classInfo, props, stats, methods, events)
     self.updateThread.enqueue(up)
 
-  def configCallback(self, conn, classInfo, props, timestamps):
+  def propsCallback(self, conn, classInfo, props, timestamps):
     up = update.PropertyUpdate(conn, classInfo, props, timestamps)
     self.updateThread.enqueue(up)
 
-  def instCallback(self, conn, classInfo, stats, timestamps):
+  def statsCallback(self, conn, classInfo, stats, timestamps):
     up = update.StatisticUpdate(conn, classInfo, stats, timestamps)
     self.updateThread.enqueue(up)
 
-  def methodCallback(self, conn, methodId, errorNo, errorText, args):
-    self.log("\nMETHOD---------------------------------------------------")
-    self.log("MethodId=%d" % (methodId))
-    self.log("Error: %d %s" % (errorNo, errorText))
-    self.log("Args: ")
-    self.log(args)
-
-    self.lock()
-    try:
-      method = self.outstandingMethodCalls.pop(methodId)
-      result = method(errorText, args)
-    finally:
-      self.unlock()
-
-    self.log("END METHOD---------------------------------------------------\n")
-    return result
+  def methodCallback(self, conn, methodId, errorId, errorText, args):
+    up = update.MethodUpdate(conn, methodId, errorId, errorText, args)
+    self.updateThread.enqueue(up)
   
   def closeCallback(self, conn, data):
-    self.log("\nCLOSE---------------------------------------------------")
-    self.log("BrokerId=%s , Data=%s" % (conn.id, data))
-
-    self.lock()
-    try:
-      del self.connections[conn.id]
-
-      if (self.connCloseListener != None):
-        self.connCloseListener(conn, data)
-    finally:
-      self.unlock()
-
-    self.log("END CLOSE---------------------------------------------------\n")
-    return
+    up = update.CloseCallback(conn, data)
+    self.updateThread.enqueue(up)
     
   def controlCallback(self, conn, type, data):
-    self.log("\nCONTROL---------------------------------------------------")
-    readableType = "UNKNOWN"
-    if (type == managementClient.CTRL_BROKER_INFO): 
-      readableType = "CTRL_BROKER_INFO"
-    elif (type == managementClient.CTRL_SCHEMA_LOADED):
-      readableType = "CTRL_SCHEMA_LOADED"
-    elif (type == managementClient.CTRL_USER):
-      readableType = "CTRL_USER"
-    elif (type == managementClient.CTRL_HEARTBEAT):
-      readableType = "CTRL_HEARTBEAT"
-    self.log("BrokerId=%s , Type=%s, Data=%s" % (conn.id, readableType, data))
-    self.log("END CONTROL---------------------------------------------------\n")
-    return
+    up = update.ControlUpdate(conn, type, data)
+    self.updateThread.enqueue(up)
 
   def registerCallback(self, callback):
     self.lock()

Modified: mgmt/mint/python/mint/update.py
===================================================================
--- mgmt/mint/python/mint/update.py	2008-06-25 21:18:15 UTC (rev 2165)
+++ mgmt/mint/python/mint/update.py	2008-06-26 14:34:56 UTC (rev 2166)
@@ -3,6 +3,8 @@
 from Queue import Queue as ConcurrentQueue, Full, Empty
 from threading import Thread
 from datetime import datetime
+from qpid.management import managementClient
+from struct import unpack
 
 import mint
 
@@ -21,7 +23,7 @@
     try:
       self.updates.put(update)
     except Full:
-      log.warn("Update queue is full")
+      log.exception("Queue is full")
 
   def run(self):
     while True:
@@ -31,13 +33,13 @@
         if self.stopRequested:
           break
         else:
-          log.info("Update queue is empty")
+          log.debug("Queue is empty")
           continue
 
       try:
-        update.process()
-      except Exception, e:
-        log.error(e)
+        update.process(self.model)
+      except:
+        log.exception("Update failed")
 
   def stop(self):
     self.stopRequested = True
@@ -110,18 +112,18 @@
     self.methods = methods
     self.events = events
 
-  def process(self):
-    args = ("schema", self.conn.id, self.classInfo[0], self.classInfo[1])
-    log.info("%-8s %-16s %-8s %-12s" % args)
+  def process(self, model):
+    cls = "%s.%s" % (self.classInfo[0], self.classInfo[1])
+    log.info("Processing %-8s %-16s %-16s" % ("schema", self.conn.id, cls))
 
     try:
       pkg, cls = unmarshalClassInfo(self.classInfo)
+      cls.classInfos[self.conn.id] = self.classInfo
     except UnknownClassException, e:
       log.warn(e)
       return
 
-    if cls:
-      cls.classInfos[self.conn.id] = self.classInfo
+    # XXX do more schema checking
 
 class PropertyUpdate(object):
   def __init__(self, conn, classInfo, props, timestamps):
@@ -130,10 +132,10 @@
     self.props = props
     self.timestamps = timestamps
 
-  def process(self):
-    args = ("props", self.conn.id, self.classInfo[0], self.classInfo[1],
-            len(self.props))
-    log.info("%-8s %-16s %-8s %-12s %i" % args)
+  def process(self, model):
+    cls = "%s.%s" % (self.classInfo[0], self.classInfo[1])
+    args = ("props", self.conn.id, cls, len(self.props))
+    log.info("Processing %-8s %-16s %-16s %3i" % args)
 
     try:
       pkg, cls = unmarshalClassInfo(self.classInfo)
@@ -147,6 +149,8 @@
 
     processAttributes(self.conn, attrs, cls)
 
+    # XXX move these down to the try/except
+
     attrs["managedBroker"] = self.conn.id
     attrs["recTime"] = datetime.fromtimestamp(self.timestamps[0]/1000000000)
     attrs["creationTime"] = datetime.fromtimestamp \
@@ -156,9 +160,17 @@
       obj = self.conn.getObject(cls, id)
     except mint.ObjectNotFound:
       obj = cls()
-      obj.set(**attrs)
-      obj.syncUpdate()
 
+      #obj.sourceId = id
+      #obj.sourceBrokerId = self.conn.brokerId
+      
+      #hash = "%08x%08x%08x%08x" % unpack("!LLLL", self.classInfo[2])
+      #classInfo = (self.classInfo[0], self.classInfo[1], hash)
+      #obj.sourceClassInfo = ",".join(classInfo)
+
+    obj.set(**attrs)
+    obj.syncUpdate()
+
     # XXX refactor this to take advantage of the get/create logic
     # above
     if isinstance(obj, mint.Broker) and obj.managedBroker:
@@ -185,10 +197,10 @@
     self.stats = stats
     self.timestamps = timestamps
 
-  def process(self):
-    args = ("stats", self.conn.id, self.classInfo[0], self.classInfo[1],
-            len(self.stats))
-    log.info("%-8s %-16s %-8s %-12s %i" % args)
+  def process(self, model):
+    cls = "%s.%s" % (self.classInfo[0], self.classInfo[1])
+    args = ("stats", self.conn.id, cls, len(self.stats))
+    log.info("Processing %-8s %-16s %-16s %3i" % args)
 
     try:
       pkg, cls = unmarshalClassInfo(self.classInfo)
@@ -199,13 +211,8 @@
     attrs = dict(self.stats)
 
     id = attrs["id"]
+    obj = self.conn.getObject(cls, id)
 
-    try:
-      obj = self.conn.getObject(cls, id)
-    except ObjectNotFound, e:
-      log.warn(e)
-      return
-
     statscls = getStatsClass(cls)
     processAttributes(self.conn, attrs, statscls)
 
@@ -226,21 +233,67 @@
     obj.syncUpdate()
 
 class MethodUpdate(object):
-  def __init__(self, conn, methodId, errorId, errorText, args):
+  def __init__(self, conn, methodId, errorCode, errorText, args):
     self.conn = conn
     self.methodId = methodId
-    self.errorId = errorId
+    self.errorCode = errorCode
     self.errorText = errorText
     self.args = args
 
-  def process(self):
-    args = ("stats", self.conn.id, self.methodId, self.errorId,
+  def process(self, model):
+    args = ("method", self.conn.id, self.methodId, self.errorCode,
             self.errorText)
-    log.info("%-8s %-16s %-8s %-8s %s" % args)
+    log.info("Processing %-8s %-16s %-12s %-12s %s" % args)
 
+    model.lock()
+    try:
+      method = model.outstandingMethodCalls.pop(self.methodId)
+      method(self.errorText, args)
+    finally:
+      model.unlock()
+
 class CloseUpdate(object):
-  def __init__(self, brokerId, data):
-    self.brokerId = brokerId
+  def __init__(self, conn, data):
+    self.conn = conn
+    self.data = data
 
-  def process(self):
-    log.info("%-8s %-16s" % ("close", self.brokerId))
+  def process(self, model):
+    log.info("Processing %-8s %-16s" % ("close", self.conn.id))
+
+    model.lock()
+    try:
+      del model.connections[conn.id]
+
+      if model.connCloseListener:
+        model.connCloseListener(conn, data)
+    finally:
+      model.unlock()
+
+class ControlUpdate(object):
+  __types = {
+    managementClient.CTRL_BROKER_INFO: "broker_info",
+    managementClient.CTRL_SCHEMA_LOADED: "schema_loaded",
+    managementClient.CTRL_USER: "user",
+    managementClient.CTRL_HEARTBEAT: "heartbeat"
+    }
+
+  def __init__(self, conn, typeCode, data):
+    self.conn = conn
+    self.typeCode = typeCode
+    self.data = data
+
+  def process(self, model):
+    type = self.__types.get(self.typeCode, "[unknown]")
+    args = ("control", self.conn.id, type, self.data)
+    log.info("Processing %-8s %-16s %-16s %s" % args)
+
+    if self.typeCode == managementClient.CTRL_BROKER_INFO:
+      uuid = "%08x-%04x-%04x-%04x-%04x%08x" % unpack \
+          ("!LHHHHL", self.data.brokerId)
+
+      log.info("Broker ID from %s is '%s'" % (self.conn.id, uuid))
+      log.info("Session ID from %s is '%s'" % \
+                 (self.conn.id, self.data.sessionId))
+
+      self.conn.brokerId = uuid
+      self.conn.sessionId = self.data.sessionId




More information about the rhmessaging-commits mailing list