[rhmessaging-commits] rhmessaging commits: r2844 - mgmt/trunk/mint/python/mint.

rhmessaging-commits at lists.jboss.org rhmessaging-commits at lists.jboss.org
Wed Nov 19 12:21:40 EST 2008


Author: nunofsantos
Date: 2008-11-19 12:21:39 -0500 (Wed, 19 Nov 2008)
New Revision: 2844

Modified:
   mgmt/trunk/mint/python/mint/__init__.py
   mgmt/trunk/mint/python/mint/update.py
Log:
More database optimizations: reuse a single database connection across updates, and improve caching of qmfId-to-dbId mappings

Modified: mgmt/trunk/mint/python/mint/__init__.py
===================================================================
--- mgmt/trunk/mint/python/mint/__init__.py	2008-11-19 16:40:42 UTC (rev 2843)
+++ mgmt/trunk/mint/python/mint/__init__.py	2008-11-19 17:21:39 UTC (rev 2844)
@@ -341,30 +341,26 @@
   def setCloseListener(self, connCloseListener):
     self.connCloseListener = connCloseListener
 
-  def getObjectId(self, cls, id):
-    if isinstance(id, qpid.qmfconsole.ObjectId):
-      first = id.first
-      second = id.second
-      dbId = None
-      if (first, second) in self.qmfIdToDbIdMap:
-        dbId = self.qmfIdToDbIdMap[(first, second)]
-      else:
-        try:
-          try:
-            conn = self.dbConn.getConnection()
-            cursor = conn.cursor()
-            cursor.execute("select id from %s where source_scope_id = %s and source_object_id = %s" \
-                             % (self.dbStyle.pythonClassToDBTable(cls.__name__), first, second));
-            rec = cursor.fetchone()
-            dbId = rec[0]
-            self.qmfIdToDbIdMap[(first, second)] = dbId
-          except Exception:
-            raise ObjectNotFound()
-        finally:
-          conn.close()
-      return dbId
-    else:
-      raise ObjectNotFound()
+#  def getObjectId(self, cls, id, conn):
+#    if isinstance(id, qpid.qmfconsole.ObjectId):
+#      first = id.first
+#      second = id.second
+#      dbId = None
+#      if (first, second) in self.qmfIdToDbIdMap:
+#        dbId = self.qmfIdToDbIdMap[(first, second)]
+#      else:
+#        try:
+#          cursor = conn.cursor()
+#          cursor.execute("select id from %s where source_scope_id = %s and source_object_id = %s" \
+#                           % (self.dbStyle.pythonClassToDBTable(cls.__name__), first, second));
+#          rec = cursor.fetchone()
+#          dbId = rec[0]
+#          self.qmfIdToDbIdMap[(first, second)] = dbId
+#        except Exception:
+#          raise ObjectNotFound()
+#      return dbId
+#    else:
+#      raise ObjectNotFound()
 
   def __pythonValueToDB(self, key, value):
     if key == "qmfClassKey":

Modified: mgmt/trunk/mint/python/mint/update.py
===================================================================
--- mgmt/trunk/mint/python/mint/update.py	2008-11-19 16:40:42 UTC (rev 2843)
+++ mgmt/trunk/mint/python/mint/update.py	2008-11-19 17:21:39 UTC (rev 2844)
@@ -38,6 +38,7 @@
       pass
 
   def run(self):
+    conn = self.model.dbConn.getConnection()
     while True:
       try:
         priority, update = self.updates.get(True, 1)
@@ -54,8 +55,10 @@
           continue
 
       try:
-        update.process(self.model)
+        update.process(self.model, conn)
+        conn.commit()
       except:
+        conn.rollback()  
         log.exception("Update failed")
         pass
 
@@ -70,10 +73,10 @@
   def getStatsClass(self, cls):
     return cls + "Stats"
   
-  def process(self):
+  def process(self, model, conn):
     pass
 
-  def processAttributes(self, attrs, cls, model):
+  def processAttributes(self, attrs, cls, model, conn):
     results = {}
     orphan = False
 
@@ -82,45 +85,39 @@
       if name in mint.schema.schemaReservedWordsMap:
         name = mint.schema.schemaReservedWordsMap.get(name)
       
-      try:  
-        if not hasattr(getattr(mint, cls), name):
-          # Discard attrs that we don't have in our schema
-          log.debug("Class '%s' has no field '%s'" % ("mint.schema." + cls, name))
-        elif key.type == 10:
-          # Navigate to referenced objects
-          if name.endswith("Ref"):
-            name = name[:-3]
-          className = name[0].upper() + name[1:]
-          otherClass = getattr(mint, className, None)
-          if otherClass:
-            foreignKey = name + "_id"
-            try:
-              results[foreignKey] = model.getObjectId(otherClass, value)
-            except KeyError:
-              log.info("Referenced object %s '%s' not found by key '%s'" % (className, value, foreignKey))
-            except mint.ObjectNotFound:
-              if not orphan:
-                log.info("Referenced object %s '%s' not found, deferring creation of orphan object" % (className, value))
-                # store object in orphan map, will be picked up later when parent info is received
-                if (className, value.first, value.second) not in model.orphanObjectMap:
-                  model.orphanObjectMap[(className, value.first, value.second)] = set()
-                model.orphanObjectMap[(className, value.first, value.second)].add(self)
-                orphan = True
-          else:
-            log.error("Class '%s' not found" % className)
-        elif key.type == 8:
-          # convert ABSTIME types
-          if value:
-            results[name] = time_unwarp(datetime.fromtimestamp(value/1000000000))
-          else:
-            results[name] = None
-        elif key.type == 14:
-          # convert UUIDs into their string representation, to be handled by sqlobject
-          results[name] = str(value)
+      if not hasattr(getattr(mint, cls), name):
+        # Discard attrs that we don't have in our schema
+        log.debug("Class '%s' has no field '%s'" % ("mint.schema." + cls, name))
+      elif key.type == 10:
+        # Navigate to referenced objects
+        if name.endswith("Ref"):
+          name = name[:-3]
+        className = name[0].upper() + name[1:]
+        otherClass = getattr(mint, className, None)
+        if otherClass:
+          foreignKey = name + "_id"
+          if (value.first, value.second) in model.qmfIdToDbIdMap:
+            results[foreignKey] = model.qmfIdToDbIdMap[(value.first, value.second)]
+          elif not orphan:
+            log.info("Referenced object %s '%s' not found, deferring creation of orphan object" % (className, value))
+            # store object in orphan map, will be picked up later when parent info is received
+            if (className, value.first, value.second) not in model.orphanObjectMap:
+              model.orphanObjectMap[(className, value.first, value.second)] = set()
+            model.orphanObjectMap[(className, value.first, value.second)].add(self)
+            orphan = True
         else:
-          results[name] = value
-      except Exception:
-        continue
+          log.error("Class '%s' not found" % className)
+      elif key.type == 8:
+        # convert ABSTIME types
+        if value:
+          results[name] = time_unwarp(datetime.fromtimestamp(value/1000000000))
+        else:
+          results[name] = None
+      elif key.type == 14:
+        # convert UUIDs into their string representation, to be handled by sqlobject
+        results[name] = str(value)
+      else:
+        results[name] = value
     if orphan:
       return None
     else:
@@ -130,165 +127,127 @@
   def __init__(self, broker, obj):
     ModelUpdate.__init__(self, broker, obj)
 
-  def process(self, model):
-    try:
-      pkg, cls, hash = self.qmfObj.getClassKey()
-      origCls = cls
-      if cls in mint.schema.schemaReservedWordsMap:
-        cls = mint.schema.schemaReservedWordsMap.get(cls)
-      cls = cls[0].upper()+cls[1:]  
-      sqlCls = model.dbStyle.pythonClassToDBTable(cls)
+  def process(self, model, conn):
+    pkg, cls, hash = self.qmfObj.getClassKey()
+    origCls = cls
+    if cls in mint.schema.schemaReservedWordsMap:
+      cls = mint.schema.schemaReservedWordsMap.get(cls)
+    cls = cls[0].upper()+cls[1:]  
+    sqlCls = model.dbStyle.pythonClassToDBTable(cls)
 
-      if not hasattr(mint, cls):
-        # Discard classes that we don't have in our schema
-        log.debug("Class '%s' is not in the schema" % (cls))
-        return  
+    if not hasattr(mint, cls):
+      # Discard classes that we don't have in our schema
+      log.debug("Class '%s' is not in the schema" % (cls))
+      return  
 
-      properties = self.qmfObj.getProperties()
-      timestamps = self.qmfObj.getTimestamps()
-      id = self.qmfObj.getObjectId()
+    properties = self.qmfObj.getProperties()
+    timestamps = self.qmfObj.getTimestamps()
+    id = self.qmfObj.getObjectId()
 
-      attrs = self.processAttributes(properties, cls, model)
-      if attrs == None:
-        # object is orphan, a parent dependency was not found; 
-        # insertion in db is deferred until parent info is received
-        return 
+    attrs = self.processAttributes(properties, cls, model, conn)
+    if attrs == None:
+      # object is orphan, a parent dependency was not found; 
+      # insertion in db is deferred until parent info is received
+      return 
 
-      attrs["recTime"] = time_unwarp(datetime.fromtimestamp(timestamps[0]/1000000000))
-      attrs["creationTime"] = time_unwarp(datetime.fromtimestamp(timestamps[1]/1000000000))
-      if timestamps[2] != 0:
-        attrs["deletionTime"] = time_unwarp(datetime.fromtimestamp(timestamps[2]/1000000000))
-        log.debug("%s(%s) marked deleted", cls, id)
+    attrs["recTime"] = time_unwarp(datetime.fromtimestamp(timestamps[0]/1000000000))
+    attrs["creationTime"] = time_unwarp(datetime.fromtimestamp(timestamps[1]/1000000000))
+    if timestamps[2] != 0:
+      attrs["deletionTime"] = time_unwarp(datetime.fromtimestamp(timestamps[2]/1000000000))
+      log.debug("%s(%s) marked deleted", cls, id)
 
-      attrs["sourceScopeId"] = id.first
-      attrs["sourceObjectId"] = id.second
-      attrs["qmfClassKey"] = "%s, %s, %s" % (pkg, origCls, hash)
-      attrs["managedBroker"] = str(self.broker.getBrokerId())
+    attrs["sourceScopeId"] = id.first
+    attrs["sourceObjectId"] = id.second
+    attrs["qmfClassKey"] = "%s, %s, %s" % (pkg, origCls, hash)
+    attrs["managedBroker"] = str(self.broker.getBrokerId())
 
-      try:
-        try:
-          conn = model.dbConn.getConnection()
-          cursor = conn.cursor()
-          sql = model.generateSQLUpdate(sqlCls, attrs, id)
+    cursor = conn.cursor()
+    sql = model.generateSQLUpdate(sqlCls, attrs, id)
+    #log.debug("SQL: %s", sql)
+    cursor.execute(sql)
+    if cursor.rowcount == 0:
+      # update failed, need to insert  
+      sql = model.generateSQLInsert(sqlCls, attrs)
+      #log.debug("SQL: %s", sql)
+      cursor.execute(sql)
+      log.debug("%s(%s) created", cls, id)
+
+      if (id.first, id.second) not in model.qmfIdToDbIdMap:
+        sql = " ; select currval('%s_id_seq')" % (sqlCls)
+        #log.debug("SQL: %s", sql)
+        cursor.execute(sql)
+        rec = cursor.fetchone()
+        dbId = rec[0]
+        model.qmfIdToDbIdMap[(id.first, id.second)] = dbId
+
+      if cls == "Broker" and str(self.broker.getBrokerId()) in model.managedBrokers:
+        broker, dbObjId = model.managedBrokers[str(self.broker.getBrokerId())]
+        if dbObjId == 0: 
+          sql = model.generateSQLUpdateWhere("broker_registration", \
+                  {"broker_id": model.qmfIdToDbIdMap[(id.first, id.second)]}, \
+                  {"url": self.broker.getFullUrl()})
           #log.debug("SQL: %s", sql)
           cursor.execute(sql)
-          if cursor.rowcount == 0:
-            sql = model.generateSQLInsert(sqlCls, attrs)
-            #log.debug("SQL: %s", sql)
-            cursor.execute(sql)
-            log.debug("%s(%s) created", cls, id)
-          conn.commit()
-        except Exception, e:
-          conn.rollback()
-          print e
-          print_exc()
-      finally:
-        conn.close()
+          model.managedBrokers[str(self.broker.getBrokerId())] = (broker, dbId)
 
-      if (cls, id.first, id.second) in model.orphanObjectMap:
-        # this object is the parent of orphan objects in the map, re-enqueue for insertion
-        orphanObjects = model.orphanObjectMap.pop((cls, id.first, id.second))
-        for orphanObj in orphanObjects:
-          model.updateThread.enqueue(orphanObj)
-        log.info("Inserted %d orphan objects whose creation had been deferred" % (len(orphanObjects)))
+    if (cls, id.first, id.second) in model.orphanObjectMap:
+      # this object is the parent of orphan objects in the map, re-enqueue for insertion
+      orphanObjects = model.orphanObjectMap.pop((cls, id.first, id.second))
+      for orphanObj in orphanObjects:
+        model.updateThread.enqueue(orphanObj)
+      log.info("Inserted %d orphan objects whose creation had been deferred" % (len(orphanObjects)))
 
-      if cls == "Broker":
-        if str(self.broker.getBrokerId()) in model.managedBrokers:
-          broker, dbObjId = model.managedBrokers[str(self.broker.getBrokerId())]
-          if dbObjId == 0: 
-            try:
-              try:  
-                conn = model.dbConn.getConnection()
-                cursor = conn.cursor()
-                sql = model.generateSQLSelectWhere(sqlCls, {"managed_broker": str(self.broker.getBrokerId())})
-                #log.debug("SQL: %s", sql)
-                cursor.execute(sql)
-                rec = cursor.fetchone()
-                dbObjId = rec[0]  
 
-                ###cursor = conn.cursor()
-                sql = model.generateSQLUpdateWhere("broker_registration", {"broker_id": dbObjId}, \
-                                                       {"url": self.broker.getFullUrl()})
-                #log.debug("SQL: %s", sql)
-                cursor.execute(sql)
-                conn.commit()
-                model.managedBrokers[str(self.broker.getBrokerId())] = (broker, dbObjId)
-              except Exception, e:
-                conn.rollback()
-                print e
-                print_exc()
-            finally:
-              conn.close()
-    except:
-      print_exc()
-
 class StatisticUpdate(ModelUpdate):
   def __init__(self, broker, obj):
     ModelUpdate.__init__(self, broker, obj)
 
-  def process(self, model):
-    try:
-      pkg, cls, hash = self.qmfObj.getClassKey()
+  def process(self, model, conn):
+    pkg, cls, hash = self.qmfObj.getClassKey()
 
-      origCls = cls
-      if cls in mint.schema.schemaReservedWordsMap:
-        cls = mint.schema.schemaReservedWordsMap.get(cls)
-      cls = cls[0].upper()+cls[1:]  
-      sqlCls = model.dbStyle.pythonClassToDBTable(cls)
+    origCls = cls
+    if cls in mint.schema.schemaReservedWordsMap:
+      cls = mint.schema.schemaReservedWordsMap.get(cls)
+    cls = cls[0].upper()+cls[1:]  
+    sqlCls = model.dbStyle.pythonClassToDBTable(cls)
 
-      if not hasattr(mint, cls):
-        # Discard classes that we don't have in our schema
-        log.debug("Class '%s' is not in the schema" % (cls))
-        return  
+    if not hasattr(mint, cls):
+      # Discard classes that we don't have in our schema
+      log.debug("Class '%s' is not in the schema" % (cls))
+      return  
 
-      statistics = self.qmfObj.getStatistics()
-      timestamps = self.qmfObj.getTimestamps()
-      id = self.qmfObj.getObjectId()
- 
-      statsCls = self.getStatsClass(cls)
-      sqlStatsCls = model.dbStyle.pythonClassToDBTable(statsCls)
+    statistics = self.qmfObj.getStatistics()
+    timestamps = self.qmfObj.getTimestamps()
+    id = self.qmfObj.getObjectId()
 
-      attrs = self.processAttributes(statistics, statsCls, model)
-      if attrs == None:
-        # object is orphan, a parent dependency was not found; 
-        # insertion in db is deferred until parent info is received
-        return 
+    statsCls = self.getStatsClass(cls)
+    sqlStatsCls = model.dbStyle.pythonClassToDBTable(statsCls)
 
-      attrs["recTime"] = time_unwarp(datetime.fromtimestamp(timestamps[0]/1000000000))
+    attrs = self.processAttributes(statistics, statsCls, model, conn)
+    if attrs == None:
+      # object is orphan, a parent dependency was not found; 
+      # insertion in db is deferred until parent info is received
+      return 
 
-      try:
-        try:
-          conn = model.dbConn.getConnection()
-          cursor = conn.cursor()
-          subSql = model.generateSQLSelect(sqlCls, id)
+    attrs["recTime"] = time_unwarp(datetime.fromtimestamp(timestamps[0]/1000000000))
 
-          mintCls = getattr(mint, cls)
-          attrs[sqlCls + "_id"] = model.getObjectId(mintCls, id)
-          sql = model.generateSQLInsert(sqlStatsCls, attrs)
+    cursor = conn.cursor()
+    subSql = model.generateSQLSelect(sqlCls, id)
+    sql = model.generateSQLInsert(sqlStatsCls, attrs, {sqlCls + "_id": subSql})
+    #log.debug("SQL: %s", sql)
+    cursor.execute(sql)
 
-          #log.debug("SQL: %s", sql)
-          cursor.execute(sql)
+    sql = "select currval('%s_id_seq')" % (sqlStatsCls)
+    #log.debug("SQL: %s", sql)
+    cursor.execute(sql)
+    rec = cursor.fetchone()
+    dbStatsId = rec[0]  
+    log.debug("%s(%s) created", statsCls, id)
 
-          sql = "select currval('%s_id_seq')" % (sqlStatsCls)
-          #log.debug("SQL: %s", sql)
-          cursor.execute(sql)
-          rec = cursor.fetchone()
-          dbStatsId = rec[0]  
-          log.debug("%s(%s) created", statsCls, id)
+    sql = model.generateSQLUpdate(sqlCls, {"stats_curr_id": dbStatsId}, id, updateStats=True)
+    #log.debug("SQL: %s", sql)
+    cursor.execute(sql)
 
-          sql = model.generateSQLUpdate(sqlCls, {"stats_curr_id": dbStatsId}, id, updateStats=True)
-          #log.debug("SQL: %s", sql)
-          cursor.execute(sql)
-          conn.commit()
-        except Exception, e:
-          conn.rollback()
-          print e
-          print_exc()
-      finally:
-        conn.close()
-    except:
-      print_exc()
-
 class MethodUpdate(ModelUpdate):
   def __init__(self, broker, seq, response):
     ModelUpdate.__init__(self, broker, response)




More information about the rhmessaging-commits mailing list