[rhmessaging-commits] rhmessaging commits: r2157 - in mgmt: cumin/python/cumin and 2 other directories.
rhmessaging-commits at lists.jboss.org
rhmessaging-commits at lists.jboss.org
Tue Jun 24 11:56:25 EDT 2008
Author: justi9
Date: 2008-06-24 11:56:25 -0400 (Tue, 24 Jun 2008)
New Revision: 2157
Added:
mgmt/mint/python/mint/update.py
Modified:
mgmt/cumin/bin/cumin
mgmt/cumin/bin/cumin-bench
mgmt/cumin/bin/cumin-test
mgmt/cumin/python/cumin/__init__.py
mgmt/cumin/python/cumin/model.py
mgmt/mint/bin/mint-test
mgmt/mint/python/mint/__init__.py
Log:
Moved all database updates to a distinct update queue and worker
thread, to simplify locking.
Changed mint-test to work on multiple broker connections.
Now that we have worker threads, introduce app start/stop methods to
the command line entry points.
Modified: mgmt/cumin/bin/cumin
===================================================================
--- mgmt/cumin/bin/cumin 2008-06-23 19:44:16 UTC (rev 2156)
+++ mgmt/cumin/bin/cumin 2008-06-24 15:56:25 UTC (rev 2157)
@@ -42,11 +42,15 @@
sys.exit(1)
try:
- server.start()
- except:
- server.stop()
- raise
+ app.start()
+ try:
+ server.start()
+ finally:
+ server.stop()
+ finally:
+ app.stop()
+
def main():
config = CuminConfig()
Modified: mgmt/cumin/bin/cumin-bench
===================================================================
--- mgmt/cumin/bin/cumin-bench 2008-06-23 19:44:16 UTC (rev 2156)
+++ mgmt/cumin/bin/cumin-bench 2008-06-24 15:56:25 UTC (rev 2157)
@@ -26,7 +26,12 @@
harness = BenchmarkHarness(app)
try:
- harness.run(hits)
+ try:
+ app.start()
+
+ harness.run(hits)
+ finally:
+ app.stop()
except KeyboardInterrupt:
pass
Modified: mgmt/cumin/bin/cumin-test
===================================================================
--- mgmt/cumin/bin/cumin-test 2008-06-23 19:44:16 UTC (rev 2156)
+++ mgmt/cumin/bin/cumin-test 2008-06-24 15:56:25 UTC (rev 2157)
@@ -28,8 +28,12 @@
env = TestEnvironment(app, host, port, config.spec)
env.init();
- session = env.run_test(MainTest(env))
- session.report(sys.stdout)
+ app.start()
+ try:
+ session = env.run_test(MainTest(env))
+ session.report(sys.stdout)
+ finally:
+ app.stop()
def main():
config = CuminConfig()
Modified: mgmt/cumin/python/cumin/__init__.py
===================================================================
--- mgmt/cumin/python/cumin/__init__.py 2008-06-23 19:44:16 UTC (rev 2156)
+++ mgmt/cumin/python/cumin/__init__.py 2008-06-24 15:56:25 UTC (rev 2157)
@@ -58,8 +58,14 @@
def init(self):
self.model.init()
+
+ def start(self):
+ self.model.start()
self.broker_connect_thread.start()
+ def stop(self):
+ self.model.stop()
+
class BrokerConnectThread(Thread):
log = logging.getLogger("cumin.mgmt.conn")
Modified: mgmt/cumin/python/cumin/model.py
===================================================================
--- mgmt/cumin/python/cumin/model.py 2008-06-23 19:44:16 UTC (rev 2156)
+++ mgmt/cumin/python/cumin/model.py 2008-06-24 15:56:25 UTC (rev 2157)
@@ -40,6 +40,12 @@
def init(self):
self.data.init()
+
+ def start(self):
+ self.data.start()
+
+ def stop(self):
+ self.data.stop()
def add_class(self, cls):
self.classes.append(cls)
Modified: mgmt/mint/bin/mint-test
===================================================================
--- mgmt/mint/bin/mint-test 2008-06-23 19:44:16 UTC (rev 2156)
+++ mgmt/mint/bin/mint-test 2008-06-24 15:56:25 UTC (rev 2157)
@@ -1,30 +1,51 @@
#!/usr/bin/env python
-import sys, os
+import sys, os, logging
from time import sleep
from mint import *
def usage():
- print "Usage: mint-test [DATABASE-URI] [BROKER-ADDRESS]"
+ print "Usage: mint-test [DATABASE-URI] [BROKER-ADDRESSES...]"
print "Example: mint-test postgresql://cumin@localhost/cumin localhost:5672"
sys.exit(1)
-def do_main(uri, spec, host, port):
- model = MintModel(uri, spec, debug=True)
+def do_main(uri, spec, hostports):
+ model = MintModel(uri, spec, debug=False)
model.check()
model.init()
- conn = BrokerConnection(model, host, port)
- conn.open()
+ conns = list()
+ for host, port in hostports:
+ conn = BrokerConnection(model, host, port)
+ conns.append(conn)
+
+ model.start()
try:
- while (True):
- sleep(5)
+ try:
+ for conn in conns:
+ conn.open()
+
+ while True:
+ sleep(5)
+ finally:
+ for conn in conns:
+ try:
+ conn.close()
+ except:
+ pass
finally:
- conn.close()
+ model.stop()
def main():
+ root = logging.getLogger("mint")
+ root.setLevel(logging.DEBUG)
+
+ h = logging.StreamHandler()
+ h.setLevel(logging.DEBUG)
+ root.addHandler(h)
+
if "-h" in sys.argv or "--help" in sys.argv:
usage()
@@ -33,17 +54,29 @@
except IndexError:
uri = "postgresql://cumin@localhost/cumin"
- try:
- addr = sys.argv[2]
- except IndexError:
- addr = "localhost:5672"
+ if uri == "-":
+ uri = "postgresql://cumin@localhost/cumin"
+ addrs = sys.argv[2:]
+
+ if not addrs:
+ addrs = ["localhost:5672"]
+
+ hostports = list()
+
+ for addr in addrs:
+ try:
+ host, port = addr.split(":")
+ hostports.append((host, int(port)))
+ except ValueError, e:
+ print "Error: Cannot parse '%s': %s" % (addr, e)
+ sys.exit(1)
+
spec = os.environ.get("AMQP_SPEC",
- os.path.normpath("/usr/share/amqp/amqp.0-10.xml"))
- host, port = addr.split(":")
+ os.path.normpath("/usr/share/amqp/amqp.0-10.xml"))
try:
- do_main(uri, spec, host, int(port))
+ do_main(uri, spec, hostports)
except KeyboardInterrupt:
pass
Modified: mgmt/mint/python/mint/__init__.py
===================================================================
--- mgmt/mint/python/mint/__init__.py 2008-06-23 19:44:16 UTC (rev 2156)
+++ mgmt/mint/python/mint/__init__.py 2008-06-24 15:56:25 UTC (rev 2157)
@@ -7,7 +7,9 @@
from sqlobject import *
from threading import Lock, RLock
from traceback import print_exc, print_stack
+
from mint import schema
+from mint import update
log = logging.getLogger("mint")
@@ -117,87 +119,19 @@
lastChallenged = TimestampCol(default=None)
lastLoggedOut = TimestampCol(default=None)
-class OriginalIdDict:
- def __init__(self):
- self.idMap = dict()
- self.lock = Lock()
+class ObjectNotFound(Exception):
+ pass
- def set(self, idOriginal, obj):
- self.lock.acquire()
- try:
- self.idMap[idOriginal] = obj
- finally:
- self.lock.release()
-
- def getByOriginalId(self, objType, idOriginal, managedBroker, create=False, args={}):
- obj = None
-
- self.lock.acquire()
- try:
- obj = self.doGetByOriginalId(objType, idOriginal, managedBroker, create, args)
- finally:
- self.lock.release()
-
- return obj
-
- def doGetByOriginalId(self, objType, idOriginal, managedBroker, create=False, args={}):
- obj = None
- key = (managedBroker, idOriginal)
- if (key in self.idMap):
- #print "\n\n=============== %s %d found\n\n" % (objType.__name__, idOriginal)
- obj = self.idMap[key]
- else:
- try:
- obj = eval("objType.selectBy(idOriginal=idOriginal, managedBroker=managedBroker)[:1][0]")
- self.idMap[key] = obj
- except:
- if (create):
- #print "\n\n=============== %s %d NOT found, creating\n\n" % (objType.__name__, idOriginal)
- obj = objType.__new__(objType)
- obj.__init__()
- obj.syncUpdate()
- self.idMap[key] = obj
- else:
- #print "\n\n=============== %s %d NOT found, NOT creating\n\n" % (objType.__name__, idOriginal)
- pass
- else:
- #print "\n\n=============== %s %d found AFTER QUERY\n\n" % (objType.__name__, idOriginal)
- pass
-
- if isinstance(obj, Broker) and obj.managedBroker:
- host, port = obj.managedBroker.split(":")
- port = int(port)
-
- if not obj.registration:
- try:
- reg = BrokerRegistration.selectBy(host=host, port=port)[0]
- except IndexError:
- reg = None
-
- if reg:
- reg.broker = obj
- obj.registration = reg
-
- reg.syncUpdate()
- obj.syncUpdate()
-
- return obj
-
- def getByIndexAttrib(self, objType, indexAttrib, indexValue, create=False, args={}):
- ###FIX
- return None
-
# Not thread safe
class BrokerConnection(object):
def __init__(self, model, host, port):
self.model = model
self.host = host
self.port = port
- self.key = "%s:%i" % (host, port)
- self.objs = OriginalIdDict()
+ self.id = "%s:%i" % (host, port)
+ self.objectsById = dict()
- # state in (None, "opening", "opened", "open_failed",
- # "closing", "closed", "close_failed")
+ # state in (None, "opening", "opened", "closing", "closed")
self.state = None
self.exception = None
@@ -205,6 +139,18 @@
self.mclient = None
self.mchan = None
+ def getObject(self, cls, id):
+ try:
+ obj = self.objectsById[id]
+ except KeyError:
+ try:
+ obj = cls.selectBy(idOriginal=id, managedBroker=self.id)[0]
+ self.objectsById[id] = obj
+ except IndexError:
+ raise ObjectNotFound()
+
+ return obj
+
def isOpen(self):
return self.state == "opened"
@@ -220,9 +166,8 @@
try:
sock = connect(self.host, self.port)
except Exception, e:
- self.state = "open_failed"
self.exception = e
- return
+ raise e
self.conn = QpidConnection(sock, spec)
self.mclient = managementClient(spec,
@@ -239,11 +184,11 @@
# XXX I want this to happen after broker start, but the
# callbacks rely on the broker being in the connectedBrokers
# dict
- self.model.connections[self.key] = self
+ self.model.connections[self.id] = self
self.conn.start()
self.mchan = self.mclient.addChannel(self.conn.session(str(uuid4())),
- self.key)
+ self.id)
self.state = "opened"
except Exception, e:
@@ -281,7 +226,7 @@
try:
self.mclient.removeChannel(self.mchan)
- del self.model.connections[self.key]
+ del self.model.connections[self.id]
self.state = "closed"
except Exception, e:
@@ -297,21 +242,6 @@
self.mclient = None
self.mchan = None
- def getByOriginalId(self, objType, idOriginal, managedBroker, create=False, args={}):
- self.model.lock()
- try:
- return self.objs.getByOriginalId(objType, idOriginal, managedBroker, create, args)
- finally:
- self.model.unlock()
-
- def getByIndexAttrib(self, objType, indexAttrib, indexValue, parent,
- create=False, args={}):
- self.model.lock()
- try:
- return self.objs.getByIndexAttrib(objType, indexAttrib, indexValue, create, args)
- finally:
- self.model.unlock()
-
class MintModel:
staticInstance = None
@@ -326,6 +256,8 @@
self.connCloseListener = None
self.__lock = RLock()
+ self.updateThread = update.ModelUpdateThread(self)
+
assert MintModel.staticInstance is None
MintModel.staticInstance = self
@@ -352,6 +284,12 @@
conn = connectionForURI(self.dataUri)
sqlhub.processConnection = conn
+ def start(self):
+ self.updateThread.start()
+
+ def stop(self):
+ self.updateThread.stop()
+
def setDebug(self, debug=True):
self.debug = debug
@@ -359,219 +297,34 @@
if (self.debug):
print message
- def sanitizeDict(self, d):
- if ("id" in d):
- d[self.convertIdKey("id")] = d.pop("id")
- if ("connectionRef" in d):
- d["clientConnectionRef"] = d.pop("connectionRef")
- #XXX FIX -- fix handling of field tables
- if ("arguments" in d):
- d.pop("arguments")
- #XXX FIX -- fix handling of field tables
- return d
-
- def convertIdKey(self, k):
- return "idOriginal"
-
- def convertRefKey(self, k):
- result = k.replace("Ref", "")
- result = result[0].lower() + result[1:]
- return result
-
- def findParentKeys(self, d):
- keys = []
- for key in d.keys():
- if (key.endswith("Ref")):
- keys.append(key)
- return keys
-
- def fixClassInfo(self, classInfo):
- objectName = self.initialCapital(classInfo[1])
- if (objectName == "Connection"):
- objectName = "ClientConnection"
- return objectName
-
- def initialCapital(self, string):
- return string[0].upper() + string[1:]
-
def setCloseListener(self, connCloseListener):
self.connCloseListener = connCloseListener
- def schemaCallback(self, brokerId, classInfo, configs, metric, methods, events):
- self.log("\nSCHEMA---------------------------------------------------")
- objectName = self.fixClassInfo(classInfo)
- self.log("BrokerId=%s , objectName=%s" % (brokerId, objectName))
-
+ def getConnection(self, id):
self.lock()
try:
- cls = schema.schemaNameToClassMap.get(objectName)
- if cls:
- cls.classInfos[brokerId] = classInfo
- finally:
- self.unlock()
-
- self.log("\nEND SCHEMA---------------------------------------------------")
-
- def configCallback(self, brokerId, classInfo, list, timestamps):
- self.log("\nCONFIG---------------------------------------------------")
- objectName = self.fixClassInfo(classInfo)
- brokerUUID = classInfo[2]
- self.log(objectName)
- d = self.sanitizeDict(dict(list))
-
- self.lock()
- try:
- conn = self.connections[brokerId]
- finally:
- self.unlock()
-
- d["managedBroker"] = brokerId
- d["recTime"] = datetime.fromtimestamp(timestamps[0]/1000000000)
- d["creationTime"] = datetime.fromtimestamp(timestamps[1]/1000000000)
- self.log(d)
- try:
- for parentKey in self.findParentKeys(d):
- convertedKey = self.convertRefKey(parentKey)
-
- self.lock()
- try:
- cls = schema.schemaNameToClassMap.get(self.initialCapital(convertedKey))
- if cls:
- d[convertedKey] = conn.getByOriginalId(cls, d.pop(parentKey), brokerId)
- else:
- self.log("Error: referenced class not found: %s" % convertedKey)
- finally:
- self.unlock()
-
- obj = conn.getByOriginalId(schema.schemaNameToClassMap[objectName], d["idOriginal"], brokerId, create=True)
- if (not obj):
- self.log("Couldn't find type %s id %s" % (objectName, d["idOriginal"]))
- return
-
- self.lock()
try:
- obj = self.updateObjWithDict(obj, d)
- if (not obj):
- return
- finally:
- self.unlock()
-
- except TypeError, detail:
- self.log("TypeError: Schema mismatch: %s" % detail)
- return
- except KeyError, detail:
- self.log("KeyError: Schema mismatch: %s" % detail)
- return
-
- self.log("END CONFIG---------------------------------------------------\n")
- return obj
-
- def instCallback(self, brokerId, classInfo, list, timestamps):
- self.log("\nINST---------------------------------------------------")
- objectName = self.fixClassInfo(classInfo)
- brokerUUID = classInfo[2]
- self.log(objectName)
- d = self.sanitizeDict(dict(list))
-
- self.lock()
- try:
- conn = self.connections[brokerId]
+ return self.connections[id]
+ except KeyError:
+ log.error("Connection '%s' not found" % id)
finally:
self.unlock()
- d["recTime"] = datetime.fromtimestamp(timestamps[0]/1000000000)
- self.log(d)
+ def schemaCallback(self, brokerId, classInfo, configs, metric, methods, events):
+ conn = self.getConnection(brokerId)
+ up = update.SchemaUpdate(conn, classInfo, configs, metric, methods, events)
+ self.updateThread.enqueue(up)
- try:
- obj = conn.getByOriginalId(schema.schemaNameToClassMap[objectName], d[self.convertIdKey("id")], brokerId)
- if (not obj):
- self.log("Couldn't find type %s id %s" % (objectName, d[self.convertIdKey("id")]))
- print "lion", classInfo, list
- return
-
- origObjName = objectName[0].lower() + objectName[1:]
- d[origObjName] = obj
+ def configCallback(self, brokerId, classInfo, props, timestamps):
+ conn = self.getConnection(brokerId)
+ up = update.PropertyUpdate(conn, classInfo, props, timestamps)
+ self.updateThread.enqueue(up)
- self.lock()
- try:
- objNameStats = eval("schema.%sStats" % (schema.schemaNameToClassMap[objectName].__name__))
- objStats = objNameStats.__new__(objNameStats)
- objStats.__init__()
- finally:
- self.unlock()
-
- if (not objStats):
- self.log("Couldn't find type %s id %s" % (objNameStats, d[self.convertIdKey("id")]))
- return
+ def instCallback(self, brokerId, classInfo, stats, timestamps):
+ conn = self.getConnection(brokerId)
+ up = update.StatisticUpdate(conn, classInfo, stats, timestamps)
+ self.updateThread.enqueue(up)
- self.lock()
- try:
- objStats = self.updateObjWithDict(objStats, d)
- if (not objStats):
- return
- finally:
- self.unlock()
-
- d = dict()
- d["statsPrev"] = obj.statsCurr
- d["statsCurr"] = objStats
- if (timestamps[2] != 0):
- d["deletionTime"] = datetime.fromtimestamp(timestamps[2]/1000000000)
-
- self.lock()
- try:
- obj = self.updateObjWithDict(obj, d)
- if (not obj):
- return
- finally:
- self.unlock()
-
- except TypeError, detail:
- self.log("TypeError: Schema mismatch: %s" % detail)
- return
- except KeyError, detail:
- self.log("KeyError: Schema mismatch: %s" % detail)
- return
-
- self.log("END INST---------------------------------------------------\n")
- return objStats
-
- def updateObjWithDict(self, obj, d):
- updateDone = False
- reattemptCount = 0
- while not updateDone and len(d) > 0:
- try:
- obj.set(**d)
- obj.syncUpdate()
- updateDone = True
- if (reattemptCount > 0):
- self.log("Reattempts successful")
- except TypeError, detail:
- self.log("TypeError: Schema mismatch: %s" % detail)
- detailString = detail.__str__()
- errorString = "got an unexpected keyword argument "
- index = detailString.index(errorString)
- if (index >= 0):
- # argument in dict is not in schema, so remove it and re-attempt
- index += len(errorString)
- missingAttrib = detailString[index:]
- self.log("Reattempting without %s attribute" % missingAttrib)
- d.pop(missingAttrib)
- reattemptCount += 1
- else:
- # can't recover
- self.log("Non-recoverable schema mismatch, information lost")
- return None
- except KeyError, detail:
- self.log("KeyError: Schema mismatch: %s" % detail)
- return None
- except:
- #TODO: better exception handling here
- self.log("Unexpected Error: %s" % sys.exc_info()[0])
- print "Unexpected Error: %s" % sys.exc_info()[0]
- return obj
- return obj
-
def methodCallback(self, brokerId, methodId, errorNo, errorText, args):
self.log("\nMETHOD---------------------------------------------------")
self.log("MethodId=%d" % (methodId))
Added: mgmt/mint/python/mint/update.py
===================================================================
--- mgmt/mint/python/mint/update.py (rev 0)
+++ mgmt/mint/python/mint/update.py 2008-06-24 15:56:25 UTC (rev 2157)
@@ -0,0 +1,246 @@
+import logging
+
+from Queue import Queue as ConcurrentQueue, Full, Empty
+from threading import Thread
+from datetime import datetime
+
+import mint
+
+log = logging.getLogger("mint.update")
+
+class ModelUpdateThread(Thread):
+ def __init__(self, model):
+ super(ModelUpdateThread, self).__init__()
+
+ self.model = model
+ self.updates = ConcurrentQueue()
+ self.stopRequested = False
+ self.setDaemon(False)
+
+ def enqueue(self, update):
+ try:
+ self.updates.put(update)
+ except Full:
+ log.warn("Update queue is full")
+
+ def run(self):
+ while True:
+ try:
+ update = self.updates.get(True, 1)
+ except Empty:
+ if self.stopRequested:
+ break
+ else:
+ log.info("Update queue is empty")
+ continue
+
+ try:
+ update.process()
+ except Exception, e:
+ log.error(e)
+
+ def stop(self):
+ self.stopRequested = True
+
+class UnknownClassException(Exception):
+ pass
+
+def unmarshalClassInfo(classInfo):
+ package = classInfo[0]
+ name = classInfo[1].capitalize()
+
+ if name == "Connection":
+ name = "ClientConnection"
+
+ cls = getattr(mint, name)
+
+ if cls is None:
+ raise UnknownClassException("Class '%s' is unknown" % name)
+
+ return package, cls
+
+def processAttributes(conn, attrs, cls):
+ if "id" in attrs:
+ attrs["idOriginal"] = attrs.pop("id")
+
+ if "connectionRef" in attrs:
+ attrs["clientConnectionRef"] = attrs.pop("connectionRef")
+
+ #XXX FIX -- fix handling of field tables
+ if "arguments" in attrs:
+ del attrs["arguments"]
+
+ for name in attrs.keys():
+ if len(name) > 3 and name.endswith("Ref"):
+ # Navigate to referenced objects
+
+ clsname = name[0].upper() + name[1:-3]
+ id = attrs.pop(name)
+
+ othercls = getattr(mint, clsname, None)
+
+ if othercls:
+ attrname = name[0:-3]
+
+ try:
+ attrs[attrname] = conn.getObject(othercls, id)
+ except KeyError:
+ log.info("Referenced object %s '%s' not found" % \
+ (clsname, id))
+ else:
+ log.error("Class '%s' not found" % clsname)
+ elif not hasattr(cls, name):
+ # Remove attrs that we don't have in our schema
+
+ log.debug("Class '%s' has no field '%s'" % (cls.__name__, name))
+
+ del attrs[name]
+
+ return attrs
+
+def getStatsClass(cls):
+ return getattr(mint, cls.__name__ + "Stats")
+
+class SchemaUpdate(object):
+ def __init__(self, conn, classInfo, props, stats, methods, events):
+ self.conn = conn
+ self.classInfo = classInfo
+ self.props = props
+ self.stats = stats
+ self.methods = methods
+ self.events = events
+
+ def process(self):
+ args = ("schema", self.conn.id, self.classInfo[0], self.classInfo[1])
+ log.info("%-8s %-16s %-8s %-12s" % args)
+
+ try:
+ pkg, cls = unmarshalClassInfo(self.classInfo)
+ except UnknownClassException, e:
+ log.warn(e)
+ return
+
+ if cls:
+ cls.classInfos[self.conn.id] = self.classInfo
+
+class PropertyUpdate(object):
+ def __init__(self, conn, classInfo, props, timestamps):
+ self.conn = conn
+ self.classInfo = classInfo
+ self.props = props
+ self.timestamps = timestamps
+
+ def process(self):
+ args = ("props", self.conn.id, self.classInfo[0], self.classInfo[1],
+ len(self.props))
+ log.info("%-8s %-16s %-8s %-12s %i" % args)
+
+ try:
+ pkg, cls = unmarshalClassInfo(self.classInfo)
+ except UnknownClassException, e:
+ log.warn(e)
+ return
+
+ attrs = dict(self.props)
+
+ id = attrs["id"]
+
+ processAttributes(self.conn, attrs, cls)
+
+ attrs["managedBroker"] = self.conn.id
+ attrs["recTime"] = datetime.fromtimestamp(self.timestamps[0]/1000000000)
+ attrs["creationTime"] = datetime.fromtimestamp \
+ (self.timestamps[1]/1000000000)
+
+ try:
+ obj = self.conn.getObject(cls, id)
+ except mint.ObjectNotFound:
+ obj = cls()
+ obj.set(**attrs)
+ obj.syncUpdate()
+
+ # XXX refactor this to take advantage of the get/create logic
+ # above
+ if isinstance(obj, mint.Broker) and obj.managedBroker:
+ host, port = obj.managedBroker.split(":")
+ port = int(port)
+
+ if not obj.registration:
+ try:
+ reg = mint.BrokerRegistration.selectBy(host=host, port=port)[0]
+ except IndexError:
+ reg = None
+
+ if reg:
+ reg.broker = obj
+ obj.registration = reg
+
+ reg.syncUpdate()
+ obj.syncUpdate()
+
+class StatisticUpdate(object):
+ def __init__(self, conn, classInfo, stats, timestamps):
+ self.conn = conn
+ self.classInfo = classInfo
+ self.stats = stats
+ self.timestamps = timestamps
+
+ def process(self):
+ args = ("stats", self.conn.id, self.classInfo[0], self.classInfo[1],
+ len(self.stats))
+ log.info("%-8s %-16s %-8s %-12s %i" % args)
+
+ try:
+ pkg, cls = unmarshalClassInfo(self.classInfo)
+ except UnknownClassException, e:
+ log.warn(e)
+ return
+
+ attrs = dict(self.stats)
+
+ id = attrs["id"]
+
+ try:
+ obj = self.conn.getObject(cls, id)
+ except ObjectNotFound, e:
+ log.warn(e)
+ return
+
+ statscls = getStatsClass(cls)
+ processAttributes(self.conn, attrs, statscls)
+
+ attrs["recTime"] = datetime.fromtimestamp(self.timestamps[0]/1000000000)
+
+ # Set the stat->obj reference
+ attrs[cls.__name__[0].lower() + cls.__name__[1:]] = obj
+
+ statsobj = statscls()
+ statsobj.set(**attrs)
+ statsobj.syncUpdate()
+
+ if self.timestamps[2] != 0:
+ obj.deletionTime = datetime.fromtimestamp(self.timestamps[2]/1000000000)
+
+ obj.statsPrev = obj.statsCurr
+ obj.statsCurr = statsobj
+ obj.syncUpdate()
+
+class MethodUpdate(object):
+ def __init__(self, conn, methodId, errorId, errorText, args):
+ self.conn = conn
+ self.methodId = methodId
+ self.errorId = errorId
+ self.errorText = errorText
+ self.args = args
+
+ def process(self):
+ args = ("stats", self.conn.id, self.methodId, self.errorId,
+ self.errorText)
+ log.info("%-8s %-16s %-8s %-8s %s" % args)
+
+class CloseUpdate(object):
+ def __init__(self, brokerId, data):
+ self.brokerId = brokerId
+
+ def process(self):
+ log.info("%-8s %-16s" % ("close", self.brokerId))
More information about the rhmessaging-commits
mailing list