[rhmessaging-commits] rhmessaging commits: r2155 - mgmt/mint/python/mint.
rhmessaging-commits at lists.jboss.org
rhmessaging-commits at lists.jboss.org
Mon Jun 16 23:47:04 EDT 2008
Author: justi9
Date: 2008-06-16 23:47:04 -0400 (Mon, 16 Jun 2008)
New Revision: 2155
Modified:
mgmt/mint/python/mint/__init__.py
Log:
Protect shared model data
Modified: mgmt/mint/python/mint/__init__.py
===================================================================
--- mgmt/mint/python/mint/__init__.py 2008-06-17 02:27:33 UTC (rev 2154)
+++ mgmt/mint/python/mint/__init__.py 2008-06-17 03:47:04 UTC (rev 2155)
@@ -5,8 +5,8 @@
from qpid.management import managementChannel, managementClient
from datetime import *
from sqlobject import *
-from threading import Lock
-from traceback import print_exc
+from threading import Lock, RLock
+from traceback import print_exc, print_stack
from mint import schema
log = logging.getLogger("mint")
@@ -233,7 +233,7 @@
self.model.closeCallback)
self.mclient.schemaListener(self.model.schemaCallback)
- self.model.lock.acquire()
+ self.model.lock()
try:
try:
# XXX I want this to happen after broker start, but the
@@ -250,7 +250,7 @@
self.exception = e
raise e
finally:
- self.model.lock.release()
+ self.model.unlock()
def getSessionId(self):
if not self.isOpen():
@@ -262,13 +262,13 @@
if not self.isOpen():
raise Exception("Connection not open")
- self.model.lock.acquire()
+ self.model.lock()
try:
self.model.currentMethodId += 1
seq = self.model.currentMethodId
self.model.outstandingMethodCalls[seq] = callback
finally:
- self.model.lock.release()
+ self.model.unlock()
self.mclient.callMethod(self.mchan, seq, objId,
className, methodName, args)
@@ -276,7 +276,7 @@
def close(self):
self.state = "closing"
- self.model.lock.acquire()
+ self.model.lock()
try:
try:
self.mclient.removeChannel(self.mchan)
@@ -288,7 +288,7 @@
self.exception = e
raise e
finally:
- self.model.lock.release()
+ self.model.unlock()
self.conn.close()
# XXX What else do I need to try to shutdown here?
@@ -298,23 +298,19 @@
self.mchan = None
def getByOriginalId(self, objType, idOriginal, managedBroker, create=False, args={}):
- result = None
- self.model.lock.acquire()
+ self.model.lock()
try:
- result = self.objs.getByOriginalId(objType, idOriginal, managedBroker, create, args)
+ return self.objs.getByOriginalId(objType, idOriginal, managedBroker, create, args)
finally:
- self.model.lock.release()
- return result
+ self.model.unlock()
def getByIndexAttrib(self, objType, indexAttrib, indexValue, parent,
create=False, args={}):
- result = None
- self.model.lock.acquire()
+ self.model.lock()
try:
- result = self.objs.getByIndexAttrib(objType, indexAttrib, indexValue, create, args)
+ return self.objs.getByIndexAttrib(objType, indexAttrib, indexValue, create, args)
finally:
- self.model.lock.release()
- return result
+ self.model.unlock()
class MintModel:
staticInstance = None
@@ -328,7 +324,7 @@
self.outstandingMethodCalls = dict()
self.connections = dict()
self.connCloseListener = None
- self.lock = Lock()
+ self.__lock = RLock()
assert MintModel.staticInstance is None
MintModel.staticInstance = self
@@ -336,6 +332,13 @@
if self.debug:
log.setLevel(logging.DEBUG)
+ def lock(self):
+ #print_stack()
+ self.__lock.acquire()
+
+ def unlock(self):
+ self.__lock.release()
+
def check(self):
try:
connectionForURI(self.dataUri)
@@ -398,9 +401,15 @@
self.log("\nSCHEMA---------------------------------------------------")
objectName = self.fixClassInfo(classInfo)
self.log("BrokerId=%s , objectName=%s" % (brokerId, objectName))
- cls = schema.schemaNameToClassMap.get(objectName)
- if cls:
- cls.classInfos[brokerId] = classInfo
+
+ self.lock()
+ try:
+ cls = schema.schemaNameToClassMap.get(objectName)
+ if cls:
+ cls.classInfos[brokerId] = classInfo
+ finally:
+ self.unlock()
+
self.log("\nEND SCHEMA---------------------------------------------------")
def configCallback(self, brokerId, classInfo, list, timestamps):
@@ -410,11 +419,11 @@
self.log(objectName)
d = self.sanitizeDict(dict(list))
- self.lock.acquire()
+ self.lock()
try:
conn = self.connections[brokerId]
finally:
- self.lock.release()
+ self.unlock()
d["managedBroker"] = brokerId
d["recTime"] = datetime.fromtimestamp(timestamps[0]/1000000000)
@@ -423,20 +432,29 @@
try:
for parentKey in self.findParentKeys(d):
convertedKey = self.convertRefKey(parentKey)
- cls = schema.schemaNameToClassMap.get(self.initialCapital(convertedKey))
- if cls:
- d[convertedKey] = conn.getByOriginalId(cls, d.pop(parentKey), brokerId)
- else:
- self.log("Error: referenced class not found: %s" % convertedKey)
+ self.lock()
+ try:
+ cls = schema.schemaNameToClassMap.get(self.initialCapital(convertedKey))
+ if cls:
+ d[convertedKey] = conn.getByOriginalId(cls, d.pop(parentKey), brokerId)
+ else:
+ self.log("Error: referenced class not found: %s" % convertedKey)
+ finally:
+ self.unlock()
+
obj = conn.getByOriginalId(schema.schemaNameToClassMap[objectName], d["idOriginal"], brokerId, create=True)
if (not obj):
self.log("Couldn't find type %s id %s" % (objectName, d["idOriginal"]))
return
- obj = self.updateObjWithDict(obj, d)
- if (not obj):
- return
+ self.lock()
+ try:
+ obj = self.updateObjWithDict(obj, d)
+ if (not obj):
+ return
+ finally:
+ self.unlock()
except TypeError, detail:
self.log("TypeError: Schema mismatch: %s" % detail)
@@ -455,11 +473,11 @@
self.log(objectName)
d = self.sanitizeDict(dict(list))
- self.lock.acquire()
+ self.lock()
try:
conn = self.connections[brokerId]
finally:
- self.lock.release()
+ self.unlock()
d["recTime"] = datetime.fromtimestamp(timestamps[0]/1000000000)
self.log(d)
@@ -473,27 +491,40 @@
origObjName = objectName[0].lower() + objectName[1:]
d[origObjName] = obj
- objNameStats = eval("schema.%sStats" % (schema.schemaNameToClassMap[objectName].__name__))
- objStats = objNameStats.__new__(objNameStats)
- objStats.__init__()
+
+ self.lock()
+ try:
+ objNameStats = eval("schema.%sStats" % (schema.schemaNameToClassMap[objectName].__name__))
+ objStats = objNameStats.__new__(objNameStats)
+ objStats.__init__()
+ finally:
+ self.unlock()
if (not objStats):
self.log("Couldn't find type %s id %s" % (objNameStats, d[self.convertIdKey("id")]))
return
-
- objStats = self.updateObjWithDict(objStats, d)
- if (not objStats):
- return
+ self.lock()
+ try:
+ objStats = self.updateObjWithDict(objStats, d)
+ if (not objStats):
+ return
+ finally:
+ self.unlock()
+
d = dict()
d["statsPrev"] = obj.statsCurr
d["statsCurr"] = objStats
if (timestamps[2] != 0):
d["deletionTime"] = datetime.fromtimestamp(timestamps[2]/1000000000)
- obj = self.updateObjWithDict(obj, d)
- if (not obj):
- return
+ self.lock()
+ try:
+ obj = self.updateObjWithDict(obj, d)
+ if (not obj):
+ return
+ finally:
+ self.unlock()
except TypeError, detail:
self.log("TypeError: Schema mismatch: %s" % detail)
@@ -547,8 +578,14 @@
self.log("Error: %d %s" % (errorNo, errorText))
self.log("Args: ")
self.log(args)
- method = self.outstandingMethodCalls.pop(methodId)
- result = method(errorText, args)
+
+ self.lock()
+ try:
+ method = self.outstandingMethodCalls.pop(methodId)
+ result = method(errorText, args)
+ finally:
+ self.unlock()
+
self.log("END METHOD---------------------------------------------------\n")
return result
@@ -556,14 +593,15 @@
self.log("\nCLOSE---------------------------------------------------")
self.log("BrokerId=%s , Data=%s" % (brokerId, data))
- self.lock.acquire()
+ self.lock()
try:
del self.connections[brokerId]
+
+ if (self.connCloseListener != None):
+ self.connCloseListener(brokerId, data)
finally:
- self.lock.release()
+ self.unlock()
- if (self.connCloseListener != None):
- self.connCloseListener(brokerId, data)
self.log("END CLOSE---------------------------------------------------\n")
return
@@ -583,19 +621,21 @@
return
def registerCallback(self, callback):
- self.currentMethodId += 1
- methodId = self.currentMethodId
- self.outstandingMethodCalls[methodId] = callback
- return methodId
+ self.lock()
+ try:
+ self.currentMethodId += 1
+ methodId = self.currentMethodId
+ self.outstandingMethodCalls[methodId] = callback
+ return methodId
+ finally:
+ self.unlock()
def getConnectionByRegistration(self, reg):
- result = None
- self.lock.acquire()
+ self.lock()
try:
- result = self.connections.get("%s:%i" % (reg.host, reg.port))
+ return self.connections.get("%s:%i" % (reg.host, reg.port))
finally:
- self.lock.release()
- return result
+ self.unlock()
class MintDatabase(object):
def __init__(self, uri):
More information about the rhmessaging-commits
mailing list