Author: nunofsantos
Date: 2008-11-19 12:21:39 -0500 (Wed, 19 Nov 2008)
New Revision: 2844
Modified:
mgmt/trunk/mint/python/mint/__init__.py
mgmt/trunk/mint/python/mint/update.py
Log:
more database optimizations, namely by connection reuse, and better caching of
qmfId-to-dbId mappings
Modified: mgmt/trunk/mint/python/mint/__init__.py
===================================================================
--- mgmt/trunk/mint/python/mint/__init__.py 2008-11-19 16:40:42 UTC (rev 2843)
+++ mgmt/trunk/mint/python/mint/__init__.py 2008-11-19 17:21:39 UTC (rev 2844)
@@ -341,30 +341,26 @@
def setCloseListener(self, connCloseListener):
self.connCloseListener = connCloseListener
- def getObjectId(self, cls, id):
- if isinstance(id, qpid.qmfconsole.ObjectId):
- first = id.first
- second = id.second
- dbId = None
- if (first, second) in self.qmfIdToDbIdMap:
- dbId = self.qmfIdToDbIdMap[(first, second)]
- else:
- try:
- try:
- conn = self.dbConn.getConnection()
- cursor = conn.cursor()
- cursor.execute("select id from %s where source_scope_id = %s and
source_object_id = %s" \
- % (self.dbStyle.pythonClassToDBTable(cls.__name__), first,
second));
- rec = cursor.fetchone()
- dbId = rec[0]
- self.qmfIdToDbIdMap[(first, second)] = dbId
- except Exception:
- raise ObjectNotFound()
- finally:
- conn.close()
- return dbId
- else:
- raise ObjectNotFound()
+# def getObjectId(self, cls, id, conn):
+# if isinstance(id, qpid.qmfconsole.ObjectId):
+# first = id.first
+# second = id.second
+# dbId = None
+# if (first, second) in self.qmfIdToDbIdMap:
+# dbId = self.qmfIdToDbIdMap[(first, second)]
+# else:
+# try:
+# cursor = conn.cursor()
+# cursor.execute("select id from %s where source_scope_id = %s and
source_object_id = %s" \
+# % (self.dbStyle.pythonClassToDBTable(cls.__name__), first,
second));
+# rec = cursor.fetchone()
+# dbId = rec[0]
+# self.qmfIdToDbIdMap[(first, second)] = dbId
+# except Exception:
+# raise ObjectNotFound()
+# return dbId
+# else:
+# raise ObjectNotFound()
def __pythonValueToDB(self, key, value):
if key == "qmfClassKey":
Modified: mgmt/trunk/mint/python/mint/update.py
===================================================================
--- mgmt/trunk/mint/python/mint/update.py 2008-11-19 16:40:42 UTC (rev 2843)
+++ mgmt/trunk/mint/python/mint/update.py 2008-11-19 17:21:39 UTC (rev 2844)
@@ -38,6 +38,7 @@
pass
def run(self):
+ conn = self.model.dbConn.getConnection()
while True:
try:
priority, update = self.updates.get(True, 1)
@@ -54,8 +55,10 @@
continue
try:
- update.process(self.model)
+ update.process(self.model, conn)
+ conn.commit()
except:
+ conn.rollback()
log.exception("Update failed")
pass
@@ -70,10 +73,10 @@
def getStatsClass(self, cls):
return cls + "Stats"
- def process(self):
+ def process(self, model, conn):
pass
- def processAttributes(self, attrs, cls, model):
+ def processAttributes(self, attrs, cls, model, conn):
results = {}
orphan = False
@@ -82,45 +85,39 @@
if name in mint.schema.schemaReservedWordsMap:
name = mint.schema.schemaReservedWordsMap.get(name)
- try:
- if not hasattr(getattr(mint, cls), name):
- # Discard attrs that we don't have in our schema
- log.debug("Class '%s' has no field '%s'" %
("mint.schema." + cls, name))
- elif key.type == 10:
- # Navigate to referenced objects
- if name.endswith("Ref"):
- name = name[:-3]
- className = name[0].upper() + name[1:]
- otherClass = getattr(mint, className, None)
- if otherClass:
- foreignKey = name + "_id"
- try:
- results[foreignKey] = model.getObjectId(otherClass, value)
- except KeyError:
- log.info("Referenced object %s '%s' not found by key
'%s'" % (className, value, foreignKey))
- except mint.ObjectNotFound:
- if not orphan:
- log.info("Referenced object %s '%s' not found, deferring
creation of orphan object" % (className, value))
- # store object in orphan map, will be picked up later when parent info is
received
- if (className, value.first, value.second) not in model.orphanObjectMap:
- model.orphanObjectMap[(className, value.first, value.second)] = set()
- model.orphanObjectMap[(className, value.first, value.second)].add(self)
- orphan = True
- else:
- log.error("Class '%s' not found" % className)
- elif key.type == 8:
- # convert ABSTIME types
- if value:
- results[name] = time_unwarp(datetime.fromtimestamp(value/1000000000))
- else:
- results[name] = None
- elif key.type == 14:
- # convert UUIDs into their string representation, to be handled by sqlobject
- results[name] = str(value)
+ if not hasattr(getattr(mint, cls), name):
+ # Discard attrs that we don't have in our schema
+ log.debug("Class '%s' has no field '%s'" %
("mint.schema." + cls, name))
+ elif key.type == 10:
+ # Navigate to referenced objects
+ if name.endswith("Ref"):
+ name = name[:-3]
+ className = name[0].upper() + name[1:]
+ otherClass = getattr(mint, className, None)
+ if otherClass:
+ foreignKey = name + "_id"
+ if (value.first, value.second) in model.qmfIdToDbIdMap:
+ results[foreignKey] = model.qmfIdToDbIdMap[(value.first, value.second)]
+ elif not orphan:
+ log.info("Referenced object %s '%s' not found, deferring
creation of orphan object" % (className, value))
+ # store object in orphan map, will be picked up later when parent info is
received
+ if (className, value.first, value.second) not in model.orphanObjectMap:
+ model.orphanObjectMap[(className, value.first, value.second)] = set()
+ model.orphanObjectMap[(className, value.first, value.second)].add(self)
+ orphan = True
else:
- results[name] = value
- except Exception:
- continue
+ log.error("Class '%s' not found" % className)
+ elif key.type == 8:
+ # convert ABSTIME types
+ if value:
+ results[name] = time_unwarp(datetime.fromtimestamp(value/1000000000))
+ else:
+ results[name] = None
+ elif key.type == 14:
+ # convert UUIDs into their string representation, to be handled by sqlobject
+ results[name] = str(value)
+ else:
+ results[name] = value
if orphan:
return None
else:
@@ -130,165 +127,127 @@
def __init__(self, broker, obj):
ModelUpdate.__init__(self, broker, obj)
- def process(self, model):
- try:
- pkg, cls, hash = self.qmfObj.getClassKey()
- origCls = cls
- if cls in mint.schema.schemaReservedWordsMap:
- cls = mint.schema.schemaReservedWordsMap.get(cls)
- cls = cls[0].upper()+cls[1:]
- sqlCls = model.dbStyle.pythonClassToDBTable(cls)
+ def process(self, model, conn):
+ pkg, cls, hash = self.qmfObj.getClassKey()
+ origCls = cls
+ if cls in mint.schema.schemaReservedWordsMap:
+ cls = mint.schema.schemaReservedWordsMap.get(cls)
+ cls = cls[0].upper()+cls[1:]
+ sqlCls = model.dbStyle.pythonClassToDBTable(cls)
- if not hasattr(mint, cls):
- # Discard classes that we don't have in our schema
- log.debug("Class '%s' is not in the schema" % (cls))
- return
+ if not hasattr(mint, cls):
+ # Discard classes that we don't have in our schema
+ log.debug("Class '%s' is not in the schema" % (cls))
+ return
- properties = self.qmfObj.getProperties()
- timestamps = self.qmfObj.getTimestamps()
- id = self.qmfObj.getObjectId()
+ properties = self.qmfObj.getProperties()
+ timestamps = self.qmfObj.getTimestamps()
+ id = self.qmfObj.getObjectId()
- attrs = self.processAttributes(properties, cls, model)
- if attrs == None:
- # object is orphan, a parent dependency was not found;
- # insertion in db is deferred until parent info is received
- return
+ attrs = self.processAttributes(properties, cls, model, conn)
+ if attrs == None:
+ # object is orphan, a parent dependency was not found;
+ # insertion in db is deferred until parent info is received
+ return
- attrs["recTime"] =
time_unwarp(datetime.fromtimestamp(timestamps[0]/1000000000))
- attrs["creationTime"] =
time_unwarp(datetime.fromtimestamp(timestamps[1]/1000000000))
- if timestamps[2] != 0:
- attrs["deletionTime"] =
time_unwarp(datetime.fromtimestamp(timestamps[2]/1000000000))
- log.debug("%s(%s) marked deleted", cls, id)
+ attrs["recTime"] =
time_unwarp(datetime.fromtimestamp(timestamps[0]/1000000000))
+ attrs["creationTime"] =
time_unwarp(datetime.fromtimestamp(timestamps[1]/1000000000))
+ if timestamps[2] != 0:
+ attrs["deletionTime"] =
time_unwarp(datetime.fromtimestamp(timestamps[2]/1000000000))
+ log.debug("%s(%s) marked deleted", cls, id)
- attrs["sourceScopeId"] = id.first
- attrs["sourceObjectId"] = id.second
- attrs["qmfClassKey"] = "%s, %s, %s" % (pkg, origCls, hash)
- attrs["managedBroker"] = str(self.broker.getBrokerId())
+ attrs["sourceScopeId"] = id.first
+ attrs["sourceObjectId"] = id.second
+ attrs["qmfClassKey"] = "%s, %s, %s" % (pkg, origCls, hash)
+ attrs["managedBroker"] = str(self.broker.getBrokerId())
- try:
- try:
- conn = model.dbConn.getConnection()
- cursor = conn.cursor()
- sql = model.generateSQLUpdate(sqlCls, attrs, id)
+ cursor = conn.cursor()
+ sql = model.generateSQLUpdate(sqlCls, attrs, id)
+ #log.debug("SQL: %s", sql)
+ cursor.execute(sql)
+ if cursor.rowcount == 0:
+ # update failed, need to insert
+ sql = model.generateSQLInsert(sqlCls, attrs)
+ #log.debug("SQL: %s", sql)
+ cursor.execute(sql)
+ log.debug("%s(%s) created", cls, id)
+
+ if (id.first, id.second) not in model.qmfIdToDbIdMap:
+ sql = " ; select currval('%s_id_seq')" % (sqlCls)
+ #log.debug("SQL: %s", sql)
+ cursor.execute(sql)
+ rec = cursor.fetchone()
+ dbId = rec[0]
+ model.qmfIdToDbIdMap[(id.first, id.second)] = dbId
+
+ if cls == "Broker" and str(self.broker.getBrokerId()) in
model.managedBrokers:
+ broker, dbObjId = model.managedBrokers[str(self.broker.getBrokerId())]
+ if dbObjId == 0:
+ sql = model.generateSQLUpdateWhere("broker_registration", \
+ {"broker_id": model.qmfIdToDbIdMap[(id.first, id.second)]},
\
+ {"url": self.broker.getFullUrl()})
#log.debug("SQL: %s", sql)
cursor.execute(sql)
- if cursor.rowcount == 0:
- sql = model.generateSQLInsert(sqlCls, attrs)
- #log.debug("SQL: %s", sql)
- cursor.execute(sql)
- log.debug("%s(%s) created", cls, id)
- conn.commit()
- except Exception, e:
- conn.rollback()
- print e
- print_exc()
- finally:
- conn.close()
+ model.managedBrokers[str(self.broker.getBrokerId())] = (broker, dbId)
- if (cls, id.first, id.second) in model.orphanObjectMap:
- # this object is the parent of orphan objects in the map, re-enqueue for
insertion
- orphanObjects = model.orphanObjectMap.pop((cls, id.first, id.second))
- for orphanObj in orphanObjects:
- model.updateThread.enqueue(orphanObj)
- log.info("Inserted %d orphan objects whose creation had been deferred"
% (len(orphanObjects)))
+ if (cls, id.first, id.second) in model.orphanObjectMap:
+ # this object is the parent of orphan objects in the map, re-enqueue for insertion
+ orphanObjects = model.orphanObjectMap.pop((cls, id.first, id.second))
+ for orphanObj in orphanObjects:
+ model.updateThread.enqueue(orphanObj)
+ log.info("Inserted %d orphan objects whose creation had been deferred" %
(len(orphanObjects)))
- if cls == "Broker":
- if str(self.broker.getBrokerId()) in model.managedBrokers:
- broker, dbObjId = model.managedBrokers[str(self.broker.getBrokerId())]
- if dbObjId == 0:
- try:
- try:
- conn = model.dbConn.getConnection()
- cursor = conn.cursor()
- sql = model.generateSQLSelectWhere(sqlCls, {"managed_broker":
str(self.broker.getBrokerId())})
- #log.debug("SQL: %s", sql)
- cursor.execute(sql)
- rec = cursor.fetchone()
- dbObjId = rec[0]
- ###cursor = conn.cursor()
- sql = model.generateSQLUpdateWhere("broker_registration",
{"broker_id": dbObjId}, \
- {"url":
self.broker.getFullUrl()})
- #log.debug("SQL: %s", sql)
- cursor.execute(sql)
- conn.commit()
- model.managedBrokers[str(self.broker.getBrokerId())] = (broker, dbObjId)
- except Exception, e:
- conn.rollback()
- print e
- print_exc()
- finally:
- conn.close()
- except:
- print_exc()
-
class StatisticUpdate(ModelUpdate):
def __init__(self, broker, obj):
ModelUpdate.__init__(self, broker, obj)
- def process(self, model):
- try:
- pkg, cls, hash = self.qmfObj.getClassKey()
+ def process(self, model, conn):
+ pkg, cls, hash = self.qmfObj.getClassKey()
- origCls = cls
- if cls in mint.schema.schemaReservedWordsMap:
- cls = mint.schema.schemaReservedWordsMap.get(cls)
- cls = cls[0].upper()+cls[1:]
- sqlCls = model.dbStyle.pythonClassToDBTable(cls)
+ origCls = cls
+ if cls in mint.schema.schemaReservedWordsMap:
+ cls = mint.schema.schemaReservedWordsMap.get(cls)
+ cls = cls[0].upper()+cls[1:]
+ sqlCls = model.dbStyle.pythonClassToDBTable(cls)
- if not hasattr(mint, cls):
- # Discard classes that we don't have in our schema
- log.debug("Class '%s' is not in the schema" % (cls))
- return
+ if not hasattr(mint, cls):
+ # Discard classes that we don't have in our schema
+ log.debug("Class '%s' is not in the schema" % (cls))
+ return
- statistics = self.qmfObj.getStatistics()
- timestamps = self.qmfObj.getTimestamps()
- id = self.qmfObj.getObjectId()
-
- statsCls = self.getStatsClass(cls)
- sqlStatsCls = model.dbStyle.pythonClassToDBTable(statsCls)
+ statistics = self.qmfObj.getStatistics()
+ timestamps = self.qmfObj.getTimestamps()
+ id = self.qmfObj.getObjectId()
- attrs = self.processAttributes(statistics, statsCls, model)
- if attrs == None:
- # object is orphan, a parent dependency was not found;
- # insertion in db is deferred until parent info is received
- return
+ statsCls = self.getStatsClass(cls)
+ sqlStatsCls = model.dbStyle.pythonClassToDBTable(statsCls)
- attrs["recTime"] =
time_unwarp(datetime.fromtimestamp(timestamps[0]/1000000000))
+ attrs = self.processAttributes(statistics, statsCls, model, conn)
+ if attrs == None:
+ # object is orphan, a parent dependency was not found;
+ # insertion in db is deferred until parent info is received
+ return
- try:
- try:
- conn = model.dbConn.getConnection()
- cursor = conn.cursor()
- subSql = model.generateSQLSelect(sqlCls, id)
+ attrs["recTime"] =
time_unwarp(datetime.fromtimestamp(timestamps[0]/1000000000))
- mintCls = getattr(mint, cls)
- attrs[sqlCls + "_id"] = model.getObjectId(mintCls, id)
- sql = model.generateSQLInsert(sqlStatsCls, attrs)
+ cursor = conn.cursor()
+ subSql = model.generateSQLSelect(sqlCls, id)
+ sql = model.generateSQLInsert(sqlStatsCls, attrs, {sqlCls + "_id":
subSql})
+ #log.debug("SQL: %s", sql)
+ cursor.execute(sql)
- #log.debug("SQL: %s", sql)
- cursor.execute(sql)
+ sql = "select currval('%s_id_seq')" % (sqlStatsCls)
+ #log.debug("SQL: %s", sql)
+ cursor.execute(sql)
+ rec = cursor.fetchone()
+ dbStatsId = rec[0]
+ log.debug("%s(%s) created", statsCls, id)
- sql = "select currval('%s_id_seq')" % (sqlStatsCls)
- #log.debug("SQL: %s", sql)
- cursor.execute(sql)
- rec = cursor.fetchone()
- dbStatsId = rec[0]
- log.debug("%s(%s) created", statsCls, id)
+ sql = model.generateSQLUpdate(sqlCls, {"stats_curr_id": dbStatsId}, id,
updateStats=True)
+ #log.debug("SQL: %s", sql)
+ cursor.execute(sql)
- sql = model.generateSQLUpdate(sqlCls, {"stats_curr_id": dbStatsId},
id, updateStats=True)
- #log.debug("SQL: %s", sql)
- cursor.execute(sql)
- conn.commit()
- except Exception, e:
- conn.rollback()
- print e
- print_exc()
- finally:
- conn.close()
- except:
- print_exc()
-
class MethodUpdate(ModelUpdate):
def __init__(self, broker, seq, response):
ModelUpdate.__init__(self, broker, response)