[rhmessaging-commits] rhmessaging commits: r2155 - mgmt/mint/python/mint.

rhmessaging-commits at lists.jboss.org rhmessaging-commits at lists.jboss.org
Mon Jun 16 23:47:04 EDT 2008


Author: justi9
Date: 2008-06-16 23:47:04 -0400 (Mon, 16 Jun 2008)
New Revision: 2155

Modified:
   mgmt/mint/python/mint/__init__.py
Log:
Protect shared model data

Modified: mgmt/mint/python/mint/__init__.py
===================================================================
--- mgmt/mint/python/mint/__init__.py	2008-06-17 02:27:33 UTC (rev 2154)
+++ mgmt/mint/python/mint/__init__.py	2008-06-17 03:47:04 UTC (rev 2155)
@@ -5,8 +5,8 @@
 from qpid.management import managementChannel, managementClient
 from datetime import *
 from sqlobject import *
-from threading import Lock
-from traceback import print_exc
+from threading import Lock, RLock
+from traceback import print_exc, print_stack
 from mint import schema
 
 log = logging.getLogger("mint")
@@ -233,7 +233,7 @@
                                     self.model.closeCallback)
     self.mclient.schemaListener(self.model.schemaCallback)
 
-    self.model.lock.acquire()
+    self.model.lock()
     try:
       try:
         # XXX I want this to happen after broker start, but the
@@ -250,7 +250,7 @@
         self.exception = e
         raise e
     finally:
-      self.model.lock.release()
+      self.model.unlock()
 
   def getSessionId(self):
     if not self.isOpen():
@@ -262,13 +262,13 @@
     if not self.isOpen():
       raise Exception("Connection not open")
 
-    self.model.lock.acquire()
+    self.model.lock()
     try:
       self.model.currentMethodId += 1
       seq = self.model.currentMethodId
       self.model.outstandingMethodCalls[seq] = callback
     finally:
-      self.model.lock.release()
+      self.model.unlock()
 
     self.mclient.callMethod(self.mchan, seq, objId,
                             className, methodName, args)
@@ -276,7 +276,7 @@
   def close(self):
     self.state = "closing"
 
-    self.model.lock.acquire()
+    self.model.lock()
     try:
       try:
         self.mclient.removeChannel(self.mchan)
@@ -288,7 +288,7 @@
         self.exception = e
         raise e
     finally:
-      self.model.lock.release()
+      self.model.unlock()
 
     self.conn.close()
     # XXX What else do I need to try to shutdown here?
@@ -298,23 +298,19 @@
     self.mchan = None
 
   def getByOriginalId(self, objType, idOriginal, managedBroker, create=False, args={}):
-    result = None
-    self.model.lock.acquire()
+    self.model.lock()
     try:
-      result = self.objs.getByOriginalId(objType, idOriginal, managedBroker, create, args)
+      return self.objs.getByOriginalId(objType, idOriginal, managedBroker, create, args)
     finally:
-      self.model.lock.release()
-    return result
+      self.model.unlock()
 
   def getByIndexAttrib(self, objType, indexAttrib, indexValue, parent,
                        create=False, args={}):
-    result = None
-    self.model.lock.acquire()
+    self.model.lock()
     try:
-      result = self.objs.getByIndexAttrib(objType, indexAttrib, indexValue, create, args)
+      return self.objs.getByIndexAttrib(objType, indexAttrib, indexValue, create, args)
     finally:
-      self.model.lock.release()
-    return result
+      self.model.unlock()
 
 class MintModel:
   staticInstance = None
@@ -328,7 +324,7 @@
     self.outstandingMethodCalls = dict()
     self.connections = dict()
     self.connCloseListener = None
-    self.lock = Lock()
+    self.__lock = RLock()
 
     assert MintModel.staticInstance is None
     MintModel.staticInstance = self
@@ -336,6 +332,13 @@
     if self.debug:
       log.setLevel(logging.DEBUG)
 
+  def lock(self):
+    #print_stack()
+    self.__lock.acquire()
+
+  def unlock(self):
+    self.__lock.release()
+
   def check(self):
     try:
       connectionForURI(self.dataUri)
@@ -398,9 +401,15 @@
     self.log("\nSCHEMA---------------------------------------------------")
     objectName = self.fixClassInfo(classInfo)
     self.log("BrokerId=%s , objectName=%s" % (brokerId, objectName))
-    cls = schema.schemaNameToClassMap.get(objectName)
-    if cls:
-      cls.classInfos[brokerId] = classInfo
+
+    self.lock()
+    try:
+      cls = schema.schemaNameToClassMap.get(objectName)
+      if cls:
+        cls.classInfos[brokerId] = classInfo
+    finally:
+      self.unlock()
+
     self.log("\nEND SCHEMA---------------------------------------------------")
 
   def configCallback(self, brokerId, classInfo, list, timestamps):
@@ -410,11 +419,11 @@
     self.log(objectName)
     d = self.sanitizeDict(dict(list))
 
-    self.lock.acquire()
+    self.lock()
     try:
       conn = self.connections[brokerId]
     finally:
-      self.lock.release()
+      self.unlock()
 
     d["managedBroker"] = brokerId
     d["recTime"] = datetime.fromtimestamp(timestamps[0]/1000000000)
@@ -423,20 +432,29 @@
     try:
       for parentKey in self.findParentKeys(d):
         convertedKey = self.convertRefKey(parentKey)
-        cls = schema.schemaNameToClassMap.get(self.initialCapital(convertedKey))
-        if cls:
-          d[convertedKey] = conn.getByOriginalId(cls, d.pop(parentKey), brokerId)
-        else:
-          self.log("Error: referenced class not found: %s" % convertedKey)
 
+        self.lock()
+        try:
+          cls = schema.schemaNameToClassMap.get(self.initialCapital(convertedKey))
+          if cls:
+            d[convertedKey] = conn.getByOriginalId(cls, d.pop(parentKey), brokerId)
+          else:
+            self.log("Error: referenced class not found: %s" % convertedKey)
+        finally:
+          self.unlock()
+
       obj = conn.getByOriginalId(schema.schemaNameToClassMap[objectName], d["idOriginal"], brokerId, create=True)
       if (not obj):
         self.log("Couldn't find type %s id %s" % (objectName, d["idOriginal"]))
         return 
 
-      obj = self.updateObjWithDict(obj, d)
-      if (not obj):
-        return
+      self.lock()
+      try:
+        obj = self.updateObjWithDict(obj, d)
+        if (not obj):
+          return
+      finally:
+        self.unlock()
 
     except TypeError, detail:
       self.log("TypeError: Schema mismatch: %s" % detail)
@@ -455,11 +473,11 @@
     self.log(objectName)
     d = self.sanitizeDict(dict(list))
 
-    self.lock.acquire()
+    self.lock()
     try:
       conn = self.connections[brokerId]
     finally:
-      self.lock.release()
+      self.unlock()
 
     d["recTime"] = datetime.fromtimestamp(timestamps[0]/1000000000)
     self.log(d)
@@ -473,27 +491,40 @@
       
       origObjName = objectName[0].lower() + objectName[1:]
       d[origObjName] = obj
-      objNameStats = eval("schema.%sStats" % (schema.schemaNameToClassMap[objectName].__name__))
-      objStats = objNameStats.__new__(objNameStats)
-      objStats.__init__()
+
+      self.lock()
+      try:
+        objNameStats = eval("schema.%sStats" % (schema.schemaNameToClassMap[objectName].__name__))
+        objStats = objNameStats.__new__(objNameStats)
+        objStats.__init__()
+      finally:
+        self.unlock()
       
       if (not objStats):
         self.log("Couldn't find type %s id %s" % (objNameStats, d[self.convertIdKey("id")]))
         return
-      
-      objStats = self.updateObjWithDict(objStats, d)
-      if (not objStats):
-        return
 
+      self.lock()
+      try:
+        objStats = self.updateObjWithDict(objStats, d)
+        if (not objStats):
+          return
+      finally:
+        self.unlock()
+
       d = dict()
       d["statsPrev"] = obj.statsCurr
       d["statsCurr"] = objStats
       if (timestamps[2] != 0):
         d["deletionTime"] = datetime.fromtimestamp(timestamps[2]/1000000000)
 
-      obj = self.updateObjWithDict(obj, d)
-      if (not obj):
-        return
+      self.lock()
+      try:
+        obj = self.updateObjWithDict(obj, d)
+        if (not obj):
+          return
+      finally:
+        self.unlock()
 
     except TypeError, detail:
       self.log("TypeError: Schema mismatch: %s" % detail)
@@ -547,8 +578,14 @@
     self.log("Error: %d %s" % (errorNo, errorText))
     self.log("Args: ")
     self.log(args)
-    method = self.outstandingMethodCalls.pop(methodId)
-    result = method(errorText, args)
+
+    self.lock()
+    try:
+      method = self.outstandingMethodCalls.pop(methodId)
+      result = method(errorText, args)
+    finally:
+      self.unlock()
+
     self.log("END METHOD---------------------------------------------------\n")
     return result
   
@@ -556,14 +593,15 @@
     self.log("\nCLOSE---------------------------------------------------")
     self.log("BrokerId=%s , Data=%s" % (brokerId, data))
 
-    self.lock.acquire()
+    self.lock()
     try:
       del self.connections[brokerId]
+
+      if (self.connCloseListener != None):
+        self.connCloseListener(brokerId, data)
     finally:
-      self.lock.release()
+      self.unlock()
 
-    if (self.connCloseListener != None):
-      self.connCloseListener(brokerId, data)
     self.log("END CLOSE---------------------------------------------------\n")
     return
     
@@ -583,19 +621,21 @@
     return
 
   def registerCallback(self, callback):
-    self.currentMethodId += 1
-    methodId = self.currentMethodId
-    self.outstandingMethodCalls[methodId] = callback
-    return methodId
+    self.lock()
+    try:
+      self.currentMethodId += 1
+      methodId = self.currentMethodId
+      self.outstandingMethodCalls[methodId] = callback
+      return methodId
+    finally:
+      self.unlock()
 
   def getConnectionByRegistration(self, reg):
-    result = None
-    self.lock.acquire()
+    self.lock()
     try:
-      result = self.connections.get("%s:%i" % (reg.host, reg.port))
+      return self.connections.get("%s:%i" % (reg.host, reg.port))
     finally:
-      self.lock.release()
-    return result
+      self.unlock()
 
 class MintDatabase(object):
   def __init__(self, uri):




More information about the rhmessaging-commits mailing list