[rhmessaging-commits] rhmessaging commits: r2166 - mgmt/mint/python/mint.
rhmessaging-commits at lists.jboss.org
rhmessaging-commits at lists.jboss.org
Thu Jun 26 10:34:57 EDT 2008
Author: justi9
Date: 2008-06-26 10:34:56 -0400 (Thu, 26 Jun 2008)
New Revision: 2166
Modified:
mgmt/mint/python/mint/__init__.py
mgmt/mint/python/mint/update.py
Log:
Complete the transition to queued updates
- Pass the model to the process callbacks on update events
- Rename the callbacks and some arguments to be in line with the new
schema names
- Improve update logging
- Log full stack traces on exception
- Implement the process method of the method update event
- Add update handling for control and close events
- Set the broker uuid and session id on the connection when we
receive a "broker info" control
- We were dropping object property updates (as opposed to initial
property sets); fix that
Modified: mgmt/mint/python/mint/__init__.py
===================================================================
--- mgmt/mint/python/mint/__init__.py 2008-06-25 21:18:15 UTC (rev 2165)
+++ mgmt/mint/python/mint/__init__.py 2008-06-26 14:34:56 UTC (rev 2166)
@@ -131,11 +131,15 @@
self.id = "%s:%i" % (host, port)
self.objectsById = dict()
+ # Set upon receiving a broker info control
+ self.sessionId = None
+ self.brokerId = None
+
# state in (None, "opening", "opened", "closing", "closed")
self.state = None
self.exception = None
- self.conn = None
+ self.mconn = None
self.mclient = None
self.mchan = None
@@ -155,7 +159,7 @@
return self.state == "opened"
def open(self):
- assert self.conn is None
+ assert self.mconn is None
assert self.mclient is None
assert self.mchan is None
@@ -169,19 +173,19 @@
self.exception = e
raise e
- self.conn = QpidConnection(sock, spec)
+ self.mconn = QpidConnection(sock, spec)
self.mclient = managementClient(spec,
self.model.controlCallback,
- self.model.configCallback,
- self.model.instCallback,
+ self.model.propsCallback,
+ self.model.statsCallback,
self.model.methodCallback,
self.model.closeCallback)
self.mclient.schemaListener(self.model.schemaCallback)
try:
- self.conn.start()
+ self.mconn.start()
self.mchan = self.mclient.addChannel \
- (self.conn.session(str(uuid4())), self)
+ (self.mconn.session(str(uuid4())), self)
self.state = "opened"
except Exception, e:
@@ -232,10 +236,10 @@
self.exception = e
raise e
- self.conn.close()
+ self.mconn.close()
# XXX What else do I need to try to shutdown here?
- self.conn = None
+ self.mconn = None
self.mclient = None
self.mchan = None
@@ -287,75 +291,32 @@
def stop(self):
self.updateThread.stop()
- def setDebug(self, debug=True):
- self.debug = debug
-
- def log(self, message):
- if (self.debug):
- print message
-
def setCloseListener(self, connCloseListener):
self.connCloseListener = connCloseListener
- def schemaCallback(self, conn, classInfo, configs, metric, methods, events):
- up = update.SchemaUpdate(conn, classInfo, configs, metric, methods, events)
+ def schemaCallback(self, conn, classInfo, props, stats, methods, events):
+ up = update.SchemaUpdate(conn, classInfo, props, stats, methods, events)
self.updateThread.enqueue(up)
- def configCallback(self, conn, classInfo, props, timestamps):
+ def propsCallback(self, conn, classInfo, props, timestamps):
up = update.PropertyUpdate(conn, classInfo, props, timestamps)
self.updateThread.enqueue(up)
- def instCallback(self, conn, classInfo, stats, timestamps):
+ def statsCallback(self, conn, classInfo, stats, timestamps):
up = update.StatisticUpdate(conn, classInfo, stats, timestamps)
self.updateThread.enqueue(up)
- def methodCallback(self, conn, methodId, errorNo, errorText, args):
- self.log("\nMETHOD---------------------------------------------------")
- self.log("MethodId=%d" % (methodId))
- self.log("Error: %d %s" % (errorNo, errorText))
- self.log("Args: ")
- self.log(args)
-
- self.lock()
- try:
- method = self.outstandingMethodCalls.pop(methodId)
- result = method(errorText, args)
- finally:
- self.unlock()
-
- self.log("END METHOD---------------------------------------------------\n")
- return result
+ def methodCallback(self, conn, methodId, errorId, errorText, args):
+ up = update.MethodUpdate(conn, methodId, errorId, errorText, args)
+ self.updateThread.enqueue(up)
def closeCallback(self, conn, data):
- self.log("\nCLOSE---------------------------------------------------")
- self.log("BrokerId=%s , Data=%s" % (conn.id, data))
-
- self.lock()
- try:
- del self.connections[conn.id]
-
- if (self.connCloseListener != None):
- self.connCloseListener(conn, data)
- finally:
- self.unlock()
-
- self.log("END CLOSE---------------------------------------------------\n")
- return
+ up = update.CloseCallback(conn, data)
+ self.updateThread.enqueue(up)
def controlCallback(self, conn, type, data):
- self.log("\nCONTROL---------------------------------------------------")
- readableType = "UNKNOWN"
- if (type == managementClient.CTRL_BROKER_INFO):
- readableType = "CTRL_BROKER_INFO"
- elif (type == managementClient.CTRL_SCHEMA_LOADED):
- readableType = "CTRL_SCHEMA_LOADED"
- elif (type == managementClient.CTRL_USER):
- readableType = "CTRL_USER"
- elif (type == managementClient.CTRL_HEARTBEAT):
- readableType = "CTRL_HEARTBEAT"
- self.log("BrokerId=%s , Type=%s, Data=%s" % (conn.id, readableType, data))
- self.log("END CONTROL---------------------------------------------------\n")
- return
+ up = update.ControlUpdate(conn, type, data)
+ self.updateThread.enqueue(up)
def registerCallback(self, callback):
self.lock()
Modified: mgmt/mint/python/mint/update.py
===================================================================
--- mgmt/mint/python/mint/update.py 2008-06-25 21:18:15 UTC (rev 2165)
+++ mgmt/mint/python/mint/update.py 2008-06-26 14:34:56 UTC (rev 2166)
@@ -3,6 +3,8 @@
from Queue import Queue as ConcurrentQueue, Full, Empty
from threading import Thread
from datetime import datetime
+from qpid.management import managementClient
+from struct import unpack
import mint
@@ -21,7 +23,7 @@
try:
self.updates.put(update)
except Full:
- log.warn("Update queue is full")
+ log.exception("Queue is full")
def run(self):
while True:
@@ -31,13 +33,13 @@
if self.stopRequested:
break
else:
- log.info("Update queue is empty")
+ log.debug("Queue is empty")
continue
try:
- update.process()
- except Exception, e:
- log.error(e)
+ update.process(self.model)
+ except:
+ log.exception("Update failed")
def stop(self):
self.stopRequested = True
@@ -110,18 +112,18 @@
self.methods = methods
self.events = events
- def process(self):
- args = ("schema", self.conn.id, self.classInfo[0], self.classInfo[1])
- log.info("%-8s %-16s %-8s %-12s" % args)
+ def process(self, model):
+ cls = "%s.%s" % (self.classInfo[0], self.classInfo[1])
+ log.info("Processing %-8s %-16s %-16s" % ("schema", self.conn.id, cls))
try:
pkg, cls = unmarshalClassInfo(self.classInfo)
+ cls.classInfos[self.conn.id] = self.classInfo
except UnknownClassException, e:
log.warn(e)
return
- if cls:
- cls.classInfos[self.conn.id] = self.classInfo
+ # XXX do more schema checking
class PropertyUpdate(object):
def __init__(self, conn, classInfo, props, timestamps):
@@ -130,10 +132,10 @@
self.props = props
self.timestamps = timestamps
- def process(self):
- args = ("props", self.conn.id, self.classInfo[0], self.classInfo[1],
- len(self.props))
- log.info("%-8s %-16s %-8s %-12s %i" % args)
+ def process(self, model):
+ cls = "%s.%s" % (self.classInfo[0], self.classInfo[1])
+ args = ("props", self.conn.id, cls, len(self.props))
+ log.info("Processing %-8s %-16s %-16s %3i" % args)
try:
pkg, cls = unmarshalClassInfo(self.classInfo)
@@ -147,6 +149,8 @@
processAttributes(self.conn, attrs, cls)
+ # XXX move these down to the try/except
+
attrs["managedBroker"] = self.conn.id
attrs["recTime"] = datetime.fromtimestamp(self.timestamps[0]/1000000000)
attrs["creationTime"] = datetime.fromtimestamp \
@@ -156,9 +160,17 @@
obj = self.conn.getObject(cls, id)
except mint.ObjectNotFound:
obj = cls()
- obj.set(**attrs)
- obj.syncUpdate()
+ #obj.sourceId = id
+ #obj.sourceBrokerId = self.conn.brokerId
+
+ #hash = "%08x%08x%08x%08x" % unpack("!LLLL", self.classInfo[2])
+ #classInfo = (self.classInfo[0], self.classInfo[1], hash)
+ #obj.sourceClassInfo = ",".join(classInfo)
+
+ obj.set(**attrs)
+ obj.syncUpdate()
+
# XXX refactor this to take advantage of the get/create logic
# above
if isinstance(obj, mint.Broker) and obj.managedBroker:
@@ -185,10 +197,10 @@
self.stats = stats
self.timestamps = timestamps
- def process(self):
- args = ("stats", self.conn.id, self.classInfo[0], self.classInfo[1],
- len(self.stats))
- log.info("%-8s %-16s %-8s %-12s %i" % args)
+ def process(self, model):
+ cls = "%s.%s" % (self.classInfo[0], self.classInfo[1])
+ args = ("stats", self.conn.id, cls, len(self.stats))
+ log.info("Processing %-8s %-16s %-16s %3i" % args)
try:
pkg, cls = unmarshalClassInfo(self.classInfo)
@@ -199,13 +211,8 @@
attrs = dict(self.stats)
id = attrs["id"]
+ obj = self.conn.getObject(cls, id)
- try:
- obj = self.conn.getObject(cls, id)
- except ObjectNotFound, e:
- log.warn(e)
- return
-
statscls = getStatsClass(cls)
processAttributes(self.conn, attrs, statscls)
@@ -226,21 +233,67 @@
obj.syncUpdate()
class MethodUpdate(object):
- def __init__(self, conn, methodId, errorId, errorText, args):
+ def __init__(self, conn, methodId, errorCode, errorText, args):
self.conn = conn
self.methodId = methodId
- self.errorId = errorId
+ self.errorCode = errorCode
self.errorText = errorText
self.args = args
- def process(self):
- args = ("stats", self.conn.id, self.methodId, self.errorId,
+ def process(self, model):
+ args = ("method", self.conn.id, self.methodId, self.errorCode,
self.errorText)
- log.info("%-8s %-16s %-8s %-8s %s" % args)
+ log.info("Processing %-8s %-16s %-12s %-12s %s" % args)
+ model.lock()
+ try:
+ method = model.outstandingMethodCalls.pop(self.methodId)
+ method(self.errorText, args)
+ finally:
+ model.unlock()
+
class CloseUpdate(object):
- def __init__(self, brokerId, data):
- self.brokerId = brokerId
+ def __init__(self, conn, data):
+ self.conn = conn
+ self.data = data
- def process(self):
- log.info("%-8s %-16s" % ("close", self.brokerId))
+ def process(self, model):
+ log.info("Processing %-8s %-16s" % ("close", self.conn.id))
+
+ model.lock()
+ try:
+ del model.connections[conn.id]
+
+ if model.connCloseListener:
+ model.connCloseListener(conn, data)
+ finally:
+ model.unlock()
+
+class ControlUpdate(object):
+ __types = {
+ managementClient.CTRL_BROKER_INFO: "broker_info",
+ managementClient.CTRL_SCHEMA_LOADED: "schema_loaded",
+ managementClient.CTRL_USER: "user",
+ managementClient.CTRL_HEARTBEAT: "heartbeat"
+ }
+
+ def __init__(self, conn, typeCode, data):
+ self.conn = conn
+ self.typeCode = typeCode
+ self.data = data
+
+ def process(self, model):
+ type = self.__types.get(self.typeCode, "[unknown]")
+ args = ("control", self.conn.id, type, self.data)
+ log.info("Processing %-8s %-16s %-16s %s" % args)
+
+ if self.typeCode == managementClient.CTRL_BROKER_INFO:
+ uuid = "%08x-%04x-%04x-%04x-%04x%08x" % unpack \
+ ("!LHHHHL", self.data.brokerId)
+
+ log.info("Broker ID from %s is '%s'" % (self.conn.id, uuid))
+ log.info("Session ID from %s is '%s'" % \
+ (self.conn.id, self.data.sessionId))
+
+ self.conn.brokerId = uuid
+ self.conn.sessionId = self.data.sessionId
More information about the rhmessaging-commits
mailing list