Author: nunofsantos
Date: 2008-11-17 15:41:29 -0500 (Mon, 17 Nov 2008)
New Revision: 2827
Added:
mgmt/trunk/mint/python/mint/priqueue.py
Modified:
mgmt/trunk/mint/python/mint/__init__.py
mgmt/trunk/mint/python/mint/update.py
Log:
Use raw SQL to insert and update rows when processing QMF updates, improving database performance;
also includes Ted's priority queue code (priqueue.py).
Modified: mgmt/trunk/mint/python/mint/__init__.py
===================================================================
--- mgmt/trunk/mint/python/mint/__init__.py 2008-11-17 18:51:55 UTC (rev 2826)
+++ mgmt/trunk/mint/python/mint/__init__.py 2008-11-17 20:41:29 UTC (rev 2827)
@@ -1,6 +1,8 @@
import logging
import qpid.qmfconsole
+import pickle
import struct
+import types
from threading import Lock, RLock
from sqlobject import *
from traceback import print_exc
@@ -224,6 +226,15 @@
log.info("Connection failed: %s ", e.message)
print_exc()
+ def disconnect(self, model):
+ log.info("Disconnecting from broker '%s' at %s" % (self.name,
self.url))
+ try:
+ model.getSession().delBroker(self.qmfBroker)
+ log.info("Disconnection succeeded")
+ except Exception, e:
+ log.info("Disconnection failed: %s ", e.message)
+ print_exc()
+
def getBrokerId(self):
if self.qmfBroker is not None:
return str(self.qmfBroker.getBrokerId())
@@ -237,6 +248,10 @@
except IndexError:
return None
+ def destroySelf(self):
+ self.disconnect(MintModel.staticInstance)
+ super(BrokerRegistration, self).destroySelf()
+
class BrokerGroup(SQLObject):
class sqlmeta:
lazyUpdate = True
@@ -283,6 +298,8 @@
self.connCloseListener = None
self.__lock = RLock()
+ self.dbStyle = MixedCaseUnderscoreStyle()
+ self.dbConn = None
# map containing updateObjects that have a missing parent dependency, for deferred
insertion
# (missing_class, missing_id.first, missing_id.second) -> [updateObject, ...,
updateObject]
@@ -311,7 +328,7 @@
raise e
def init(self):
- sqlhub.processConnection = connectionForURI(self.dataUri)
+ sqlhub.processConnection = self.dbConn = connectionForURI(self.dataUri)
def start(self):
self.updateThread.start()
@@ -322,29 +339,90 @@
def setCloseListener(self, connCloseListener):
self.connCloseListener = connCloseListener
- def getObject(self, cls, id, broker):
- obj = None
+ def getObjectId(self, cls, id):
if isinstance(id, qpid.qmfconsole.ObjectId):
- compositeId = "%s:%s" % (id.first, id.second)
try:
- obj = cls.selectBy(sourceScopeId=id.first, sourceObjectId=id.second)[0]
- except IndexError:
+ conn = self.dbConn.getConnection()
+ cursor = conn.cursor()
+ cursor.execute("select id from %s where source_scope_id = %s and
source_object_id = %s" \
+ % (self.dbStyle.pythonClassToDBTable(cls.__name__), id.first,
id.second));
+ for rec in cursor:
+ return rec[0]
+ except Exception:
raise ObjectNotFound()
- elif cls.__name__.endswith("Pool"):
+ finally:
+ conn.close()
+ else:
+ raise ObjectNotFound()
+
+ def __pythonValueToDB(self, key, value):
+ if key == "qmfClassKey":
+ value = u"'%s'" % unicode(value,
"raw_unicode_escape").replace("'", "''")
+ elif type(value) == types.DictType:
+ value = u"'%s'" % pickle.dumps(value).replace("'",
"''")
+ elif value is None:
+ value = "NULL"
+ else:
try:
- obj = cls.selectBy(sourceId=id)[0]
- except IndexError:
- raise ObjectNotFound()
-
- return obj
+ x = int(value)
+ except Exception:
+ value = u"'%s'" % str(value).replace("'",
"''")
+ return value
+ def __generateSQLWhereClause(self, colValMap):
+ whereClause = ""
+ for (key, value) in colValMap.iteritems():
+ value = self.__pythonValueToDB(key, value)
+ whereClause += u"%s = %s and " % (self.dbStyle.pythonAttrToDBColumn(key),
value)
+ whereClause = whereClause[:-5]
+ return whereClause
+
+ def generateSQLSelect(self, table, id, resultCols="id"):
+ return self.generateSQLSelectWhere(table, {"source_scope_id": id.first,
"source_object_id": id.second}, resultCols)
+
+ def generateSQLSelectWhere(self, table, where, resultCols="id"):
+ whereClause = self.__generateSQLWhereClause(where)
+ sql = "select %s from %s where %s" % (resultCols, table, whereClause)
+ return sql
+
+ def generateSQLInsert(self, table, colValMap, subQuery={}):
+ columns = u""
+ values = u""
+ for (key, value) in colValMap.iteritems():
+ value = self.__pythonValueToDB(key, value)
+ columns += u"%s , " % (self.dbStyle.pythonAttrToDBColumn(key))
+ values += u"%s , " % (value)
+ for subCol, subVal in subQuery.iteritems():
+ columns += u"%s , " % (subCol)
+ values += u"(%s) , " % (subVal)
+ columns = columns[:-3]
+ values = values[:-3]
+ sql = u"insert into %s (%s) values (%s)" % (table, columns, values)
+ return sql
+
+ def generateSQLUpdate(self, table, colValMap, id, updateStats=False):
+ return self.generateSQLUpdateWhere(table, colValMap, \
+ {"source_scope_id": id.first,
"source_object_id": id.second}, updateStats)
+
+ def generateSQLUpdateWhere(self, table, colValMap, where, updateStats=False):
+ updateValues = u""
+ for (key, value) in colValMap.iteritems():
+ value = self.__pythonValueToDB(key, value)
+ updateValues += u"%s = %s , " % (self.dbStyle.pythonAttrToDBColumn(key),
value)
+ if updateStats:
+ updateValues += u"stats_prev_id = stats_curr_id , "
+ updateValues = updateValues[:-3]
+ whereClause = self.__generateSQLWhereClause(where)
+ sql = u"update %s set %s where %s" % (table, updateValues, whereClause)
+ return sql
+
def getSession(self):
return self.mgmtSession
def callMethod(self, managedBroker, objId, classKey, methodName, callback, args):
self.lock()
try:
- broker = self.managedBrokers[managedBroker]
+ broker, dbObjId = self.managedBrokers[managedBroker]
finally:
self.unlock()
@@ -363,7 +441,7 @@
""" Invoked when a connection is established to a broker
"""
self.lock()
try:
- self.managedBrokers[str(broker.getBrokerId())] = broker
+ self.managedBrokers[str(broker.getBrokerId())] = (broker, 0)
finally:
self.unlock()
@@ -396,11 +474,19 @@
def objectProps(self, broker, record):
""" Invoked when an object is updated. """
- self.updateThread.enqueue(update.PropertyUpdate(broker, record))
+ if record.getClassKey()[1] == "job":
+ priority = 1
+ else:
+ priority = 0
+ self.updateThread.enqueue(update.PropertyUpdate(broker, record), priority)
def objectStats(self, broker, record):
""" Invoked when an object is updated. """
- self.updateThread.enqueue(update.StatisticUpdate(broker, record))
+ if record.getClassKey()[1] == "job":
+ priority = 1
+ else:
+ priority = 0
+ self.updateThread.enqueue(update.StatisticUpdate(broker, record), priority)
def event(self, broker, event):
""" Invoked when an event is raised. """
@@ -412,7 +498,7 @@
def brokerInfo(self, broker):
self.lock()
try:
- self.managedBrokers[str(broker.getBrokerId())] = broker
+ self.managedBrokers[str(broker.getBrokerId())] = (broker, 0)
finally:
self.unlock()
Added: mgmt/trunk/mint/python/mint/priqueue.py
===================================================================
--- mgmt/trunk/mint/python/mint/priqueue.py (rev 0)
+++ mgmt/trunk/mint/python/mint/priqueue.py 2008-11-17 20:41:29 UTC (rev 2827)
@@ -0,0 +1,39 @@
+from Queue import Queue
+from collections import deque
+
+class PriQueue(Queue):
+ """ """
+ def __init__(self, maxsize=0, slotCount=1):
+ self.slotCount=slotCount
+ Queue.__init__(self, maxsize)
+
+ def _init(self, maxsize):
+ self.maxsize = maxsize
+ self.slots = []
+ for i in range(self.slotCount):
+ self.slots.append(deque())
+
+ def _qsize(self):
+ size = 0
+ for i in range(self.slotCount):
+ size += len(self.slots[i])
+ return size
+
+ def _empty(self):
+ return self._qsize() == 0
+
+ def _full(self):
+ return self.maxsize > 0 and self._qsize() == self.maxsize
+
+ def _put(self, item):
+ slot = item[0]
+ if slot in range(self.slotCount):
+ self.slots[slot].append(item)
+ else:
+ raise ValueError("Invalid priority slot")
+
+ def _get(self):
+ for slot in range(self.slotCount):
+ if len(self.slots[slot]) > 0:
+ return self.slots[slot].popleft()
+ return None
Property changes on: mgmt/trunk/mint/python/mint/priqueue.py
___________________________________________________________________
Name: svn:eol-style
+ native
Modified: mgmt/trunk/mint/python/mint/update.py
===================================================================
--- mgmt/trunk/mint/python/mint/update.py 2008-11-17 18:51:55 UTC (rev 2826)
+++ mgmt/trunk/mint/python/mint/update.py 2008-11-17 20:41:29 UTC (rev 2827)
@@ -1,12 +1,12 @@
import logging
import datetime
import types
-import struct
-from Queue import Queue as ConcurrentQueue, Full, Empty
+from Queue import Full, Empty
from threading import Thread
from traceback import print_exc
from qpid.datatypes import UUID
from mint.schema import *
+from mint.priqueue import PriQueue as ConcurrentQueue
log = logging.getLogger("mint.update")
@@ -21,13 +21,13 @@
super(ModelUpdateThread, self).__init__()
self.model = model
- self.updates = ConcurrentQueue()
+ self.updates = ConcurrentQueue(slotCount=2)
self.stopRequested = False
self.setDaemon(False)
- def enqueue(self, update):
+ def enqueue(self, update, priority=0):
try:
- self.updates.put(update)
+ self.updates.put((priority, update))
except Full:
log.exception("Queue is full")
pass
@@ -35,7 +35,9 @@
def run(self):
while True:
try:
- update = self.updates.get(True, 1)
+ priority, update = self.updates.get(True, 1)
+ if self.stopRequested:
+ break
except Empty:
if self.stopRequested:
break
@@ -58,8 +60,8 @@
self.qmfObj = obj
def getStatsClass(self, cls):
- return getattr(mint, cls.__name__ + "Stats")
-
+ return cls + "Stats"
+
def process(self):
pass
@@ -87,8 +89,9 @@
if othercls:
attrname = name[0:-3]
+ attrname += "_id"
try:
- attrs[attrname] = model.getObject(othercls, id, self.broker)
+ attrs[attrname] = model.getObjectId(othercls, id)
except KeyError:
log.info("Referenced object %s '%s' not found by key
'%s'" % (clsname, id, attrname))
except mint.ObjectNotFound:
@@ -101,18 +104,18 @@
orphan = True
else:
log.error("Class '%s' not found" % clsname)
- elif not hasattr(cls, orig_name):
+ elif not hasattr(eval("mint.schema."+cls), name):
# Remove attrs that we don't have in our schema
- log.debug("Class '%s' has no field '%s'" %
(cls.__name__, name))
+ log.debug("Class '%s' has no field '%s'" %
("mint.schema."+cls, name))
del attrs[name]
#XXX FIX -- TODO when converting to new API, will lookup attribute type in schema
representation
elif name in ("DaemonStartTime", "EnteredCurrentActivity",
"EnteredCurrentState", "JobStart",
"LastBenchmark", "LastFetchWorkCompleted",
"LastFetchWorkSpawned", "LastPeriodicCheckpoint",
"MyCurrentTime", "QDate",
"JobQueueBirthdate", "MonitorSelfTime") \
and (type(attrs[name]) is types.LongType or type(attrs[name]) is
types.IntType or attrs[name] == 0):
- attrs[name] = datetime.fromtimestamp(attrs[name]/1000000000)
+ attrs[name] = time_unwarp(datetime.fromtimestamp(attrs[name]/1000000000))
elif name.endswith("Time") and type(attrs[name]) is types.IntType and
attrs[name] == 0:
- attrs[name] = datetime.fromtimestamp(attrs[name])
+ attrs[name] = time_unwarp(datetime.fromtimestamp(attrs[name]))
#XXX FIX -- TODO when converting to new API, will lookup attribute type in schema
representation
elif isinstance(attrs[name], UUID):
# convert UUIDs into their string representation, to be handled by sqlobject
@@ -129,42 +132,50 @@
def process(self, model):
try:
- cls = self.qmfObj.getSchema().getKey()[1]
- if cls in mint.schema.schemaReservedWordsMap:
- cls = mint.schema.schemaReservedWordsMap.get(cls)
- cls = eval(cls[0].upper()+cls[1:])
attrs = dict(self.qmfObj.getProperties())
timestamps = self.qmfObj.getTimestamps()
id = self.qmfObj.getObjectId()
+ pkg, cls, hash = self.qmfObj.getClassKey()
+ origCls = cls
+ if cls in mint.schema.schemaReservedWordsMap:
+ cls = mint.schema.schemaReservedWordsMap.get(cls)
+ cls = cls[0].upper()+cls[1:]
+ sqlCls = model.dbStyle.pythonClassToDBTable(cls)
+
if self.processAttributes(attrs, cls, model) == None:
# object is orphan, a parent dependency was not found;
# insertion in db is deferred until parent info is received
return
- obj = None
+ attrs["recTime"] =
time_unwarp(datetime.fromtimestamp(timestamps[0]/1000000000))
+ attrs["creationTime"] =
time_unwarp(datetime.fromtimestamp(timestamps[1]/1000000000))
+ if timestamps[2] != 0:
+ attrs["deletionTime"] =
time_unwarp(datetime.fromtimestamp(timestamps[2]/1000000000))
+ log.debug("%s(%s) marked deleted", cls, id)
+
+ attrs["sourceScopeId"] = id.first
+ attrs["sourceObjectId"] = id.second
+ attrs["qmfClassKey"] = "%s, %s, %s" % (pkg, origCls, hash)
+ attrs["managedBroker"] = str(self.broker.getBrokerId())
+
+ conn = model.dbConn.getConnection()
try:
- obj = model.getObject(cls, id, self.broker)
- except mint.ObjectNotFound:
- obj = cls()
- log.debug("%s(%i) created", cls.__name__, obj.id)
- attrs["sourceScopeId"] = id.first
- attrs["sourceObjectId"] = id.second
- pkg, cls, hash = self.qmfObj.getClassKey()
- attrs["qmfClassKey"] = "%s, %s, %s" % (pkg, cls, hash)
- attrs["managedBroker"] = str(self.broker.getBrokerId())
+ cursor = conn.cursor()
+ sql = model.generateSQLUpdate(sqlCls, attrs, id)
+ cursor.execute(sql)
+ if cursor.rowcount == 0:
+ sql = model.generateSQLInsert(sqlCls, attrs)
+ cursor.execute(sql)
+ log.debug("%s(%s) created", cls, id)
+ conn.commit()
except Exception, e:
+ conn.rollback()
print e
+ print_exc()
+ finally:
+ conn.close()
- attrs["recTime"] = datetime.fromtimestamp(timestamps[0]/1000000000)
- attrs["creationTime"] = datetime.fromtimestamp(timestamps[1]/1000000000)
- if timestamps[2] != 0:
- attrs["deletionTime"] =
datetime.fromtimestamp(timestamps[2]/1000000000)
- log.debug("%s(%i) marked deleted", cls, obj.id)
-
- obj.set(**attrs)
- obj.syncUpdate()
-
if (cls, id.first, id.second) in model.orphanObjectMap:
# this object is the parent of orphan objects in the map, re-enqueue for
insertion
orphanObjects = model.orphanObjectMap.pop((cls, id.first, id.second))
@@ -172,64 +183,83 @@
model.updateThread.enqueue(orphanObj)
log.info("Inserted %d orphan objects whose creation had been deferred"
% (len(orphanObjects)))
- if cls == "broker":
- if str(self.broker.getBrokerId()) in model.managedBrokers:
- model.managedBrokers[str(self.broker.getBrokerId())].broker = self.broker
- reg = mint.BrokerRegistration.selectBy(url=self.broker.getFullUrl())[0]
- reg.broker = obj
- reg.syncUpdate()
+ if cls == "Broker":
+ if str(self.broker.getBrokerId()) in model.managedBrokers:
+ broker, dbObjId = model.managedBrokers[str(self.broker.getBrokerId())]
+ if dbObjId == 0:
+ try:
+ conn = model.dbConn.getConnection()
+ cursor = conn.cursor()
+ sql = model.generateSQLSelectWhere(sqlCls, {"managed_broker":
str(self.broker.getBrokerId())})
+ cursor.execute(sql)
+ rec = cursor.fetchone()
+ dbObjId = rec[0]
+ cursor = conn.cursor()
+ sql = model.generateSQLUpdateWhere("broker_registration",
{"broker_id": dbObjId}, \
+ {"url":
self.broker.getFullUrl()})
+ cursor.execute(sql)
+ conn.commit()
+ model.managedBrokers[str(self.broker.getBrokerId())] = (broker, dbObjId)
+ except Exception, e:
+ conn.rollback()
+ print e
+ print_exc()
+ finally:
+ conn.close()
except:
print_exc()
- attrs["managedBroker"] = str(self.broker.getBrokerId())
- attrs["recTime"] = time_unwarp(datetime.fromtimestamp \
- (timestamps[0]/1000000000))
- attrs["creationTime"] = datetime.fromtimestamp \
- (timestamps[1]/1000000000)
-
-
class StatisticUpdate(ModelUpdate):
def __init__(self, broker, obj):
ModelUpdate.__init__(self, broker, obj)
def process(self, model):
try:
- cls = self.qmfObj.getSchema().getKey()[1]
- if cls in mint.schema.schemaReservedWordsMap:
- cls = mint.schema.schemaReservedWordsMap.get(cls)
- cls = eval(cls[0].upper()+cls[1:])
attrs = dict(self.qmfObj.getStatistics())
timestamps = self.qmfObj.getTimestamps()
id = self.qmfObj.getObjectId()
+ pkg, cls, hash = self.qmfObj.getClassKey()
- obj = None
- try:
- obj = model.getObject(cls, id, self.broker)
- except mint.ObjectNotFound:
- # handle this
- raise mint.ObjectNotFound
+ origCls = cls
+ if cls in mint.schema.schemaReservedWordsMap:
+ cls = mint.schema.schemaReservedWordsMap.get(cls)
+ cls = cls[0].upper()+cls[1:]
+ sqlCls = model.dbStyle.pythonClassToDBTable(cls)
+
+ statsCls = self.getStatsClass(cls)
+ sqlStatsCls = model.dbStyle.pythonClassToDBTable(statsCls)
-
- statscls = self.getStatsClass(cls)
- if self.processAttributes(attrs, statscls, model) == None:
+ if self.processAttributes(attrs, statsCls, model) == None:
# object is orphan, a parent dependency was not found;
# insertion in db is deferred until parent info is received
return
- attrs["recTime"] = time_unwarp(datetime.fromtimestamp \
- (timestamps[0]/1000000000))
+ attrs["recTime"] =
time_unwarp(datetime.fromtimestamp(timestamps[0]/1000000000))
- # Set the stat->obj reference
- attrs[cls.__name__[0].lower() + cls.__name__[1:]] = obj
- statsobj = statscls()
- statsobj.set(**attrs)
- statsobj.syncUpdate()
+ conn = model.dbConn.getConnection()
+ try:
+ cursor = conn.cursor()
+ subSql = model.generateSQLSelect(sqlCls, id)
+ sql = model.generateSQLInsert(sqlStatsCls, attrs, {sqlCls + "_id":
subSql})
+ cursor.execute(sql)
- obj.statsPrev = obj.statsCurr
- obj.statsCurr = statsobj
- obj.syncUpdate()
+ sql = "select currval('%s_id_seq')" % (sqlStatsCls)
+ cursor.execute(sql)
+ rec = cursor.fetchone()
+ dbStatsId = rec[0]
+ log.debug("%s(%s) created", statsCls, id)
+ sql = model.generateSQLUpdate(sqlCls, {"stats_curr_id": dbStatsId}, id,
updateStats=True)
+ cursor.execute(sql)
+ conn.commit()
+ except Exception, e:
+ conn.rollback()
+ print e
+ print_exc()
+ finally:
+ conn.close()
+
except:
print_exc()