[rhmessaging-commits] rhmessaging commits: r2827 - mgmt/trunk/mint/python/mint.

rhmessaging-commits at lists.jboss.org rhmessaging-commits at lists.jboss.org
Mon Nov 17 15:41:29 EST 2008


Author: nunofsantos
Date: 2008-11-17 15:41:29 -0500 (Mon, 17 Nov 2008)
New Revision: 2827

Added:
   mgmt/trunk/mint/python/mint/priqueue.py
Modified:
   mgmt/trunk/mint/python/mint/__init__.py
   mgmt/trunk/mint/python/mint/update.py
Log:
use raw sql for insertion and updating of qmf updates to improve database performance; also includes Ted's priority queue code

Modified: mgmt/trunk/mint/python/mint/__init__.py
===================================================================
--- mgmt/trunk/mint/python/mint/__init__.py	2008-11-17 18:51:55 UTC (rev 2826)
+++ mgmt/trunk/mint/python/mint/__init__.py	2008-11-17 20:41:29 UTC (rev 2827)
@@ -1,6 +1,8 @@
 import logging
 import qpid.qmfconsole
+import pickle
 import struct
+import types
 from threading import Lock, RLock
 from sqlobject import *
 from traceback import print_exc
@@ -224,6 +226,15 @@
       log.info("Connection failed: %s ", e.message)
       print_exc()
 
+  def disconnect(self, model):
+    log.info("Disconnecting from broker '%s' at %s" % (self.name, self.url))
+    try:
+      model.getSession().delBroker(self.qmfBroker)
+      log.info("Disconnection succeeded")
+    except Exception, e:
+      log.info("Disconnection failed: %s ", e.message)
+      print_exc()
+
   def getBrokerId(self):
     if self.qmfBroker is not None:
       return str(self.qmfBroker.getBrokerId())
@@ -237,6 +248,10 @@
       except IndexError:
         return None
 
+  def destroySelf(self):
+    self.disconnect(MintModel.staticInstance)
+    super(BrokerRegistration, self).destroySelf()
+
 class BrokerGroup(SQLObject):
   class sqlmeta:
     lazyUpdate = True
@@ -283,6 +298,8 @@
 
     self.connCloseListener = None
     self.__lock = RLock()
+    self.dbStyle = MixedCaseUnderscoreStyle()
+    self.dbConn = None
 
     # map containing updateObjects that have a missing parent dependency, for deferred insertion
     # (missing_class, missing_id.first, missing_id.second) -> [updateObject, ..., updateObject]
@@ -311,7 +328,7 @@
       raise e
 
   def init(self):
-    sqlhub.processConnection = connectionForURI(self.dataUri)
+    sqlhub.processConnection = self.dbConn = connectionForURI(self.dataUri)
 
   def start(self):
     self.updateThread.start()
@@ -322,29 +339,90 @@
   def setCloseListener(self, connCloseListener):
     self.connCloseListener = connCloseListener
 
-  def getObject(self, cls, id, broker):
-    obj = None
+  def getObjectId(self, cls, id):
     if isinstance(id, qpid.qmfconsole.ObjectId):
-      compositeId = "%s:%s" % (id.first, id.second)
       try:
-        obj = cls.selectBy(sourceScopeId=id.first, sourceObjectId=id.second)[0]
-      except IndexError:
+        conn = self.dbConn.getConnection()
+        cursor = conn.cursor()
+        cursor.execute("select id from %s where source_scope_id = %s and source_object_id = %s" \
+                         % (self.dbStyle.pythonClassToDBTable(cls.__name__), id.first, id.second));
+        for rec in cursor:
+          return rec[0]
+      except Exception:
         raise ObjectNotFound()
-    elif cls.__name__.endswith("Pool"):
+      finally:
+        conn.close()
+    else:
+      raise ObjectNotFound()
+
+  def __pythonValueToDB(self, key, value):
+    if key == "qmfClassKey":
+      value = u"'%s'" % unicode(value, "raw_unicode_escape").replace("'", "''")
+    elif type(value) == types.DictType:
+      value = u"'%s'" % pickle.dumps(value).replace("'", "''")
+    elif value is None:
+      value = "NULL"
+    else:
       try:
-        obj = cls.selectBy(sourceId=id)[0]
-      except IndexError:
-        raise ObjectNotFound()
-      
-    return obj
+        x = int(value)
+      except Exception:
+        value = u"'%s'" % str(value).replace("'", "''")
+    return value    
 
+  def __generateSQLWhereClause(self, colValMap):
+    whereClause = ""
+    for (key, value) in colValMap.iteritems():
+      value = self.__pythonValueToDB(key, value)
+      whereClause += u"%s = %s and " % (self.dbStyle.pythonAttrToDBColumn(key), value)
+    whereClause = whereClause[:-5]
+    return whereClause
+
+  def generateSQLSelect(self, table, id, resultCols="id"):
+    return self.generateSQLSelectWhere(table, {"source_scope_id": id.first, "source_object_id": id.second}, resultCols)
+
+  def generateSQLSelectWhere(self, table, where, resultCols="id"):
+    whereClause = self.__generateSQLWhereClause(where)
+    sql = "select %s from %s where %s" % (resultCols, table, whereClause)  
+    return sql
+
+  def generateSQLInsert(self, table, colValMap, subQuery={}):
+    columns = u""
+    values = u""
+    for (key, value) in colValMap.iteritems():
+      value = self.__pythonValueToDB(key, value)
+      columns += u"%s , " % (self.dbStyle.pythonAttrToDBColumn(key))
+      values += u"%s , " % (value)
+    for subCol, subVal in subQuery.iteritems():
+      columns += u"%s , " % (subCol)
+      values += u"(%s) , " % (subVal)
+    columns = columns[:-3]
+    values = values[:-3]
+    sql = u"insert into %s (%s) values (%s)" % (table, columns, values)
+    return sql
+
+  def generateSQLUpdate(self, table, colValMap, id, updateStats=False):
+    return self.generateSQLUpdateWhere(table, colValMap, \
+                                       {"source_scope_id": id.first, "source_object_id": id.second}, updateStats)
+
+  def generateSQLUpdateWhere(self, table, colValMap, where, updateStats=False):
+    updateValues = u""
+    for (key, value) in colValMap.iteritems():
+      value = self.__pythonValueToDB(key, value)
+      updateValues += u"%s = %s , " % (self.dbStyle.pythonAttrToDBColumn(key), value)
+    if updateStats:
+      updateValues += u"stats_prev_id = stats_curr_id , "
+    updateValues = updateValues[:-3]
+    whereClause = self.__generateSQLWhereClause(where)
+    sql = u"update %s set %s where %s" % (table, updateValues, whereClause)
+    return sql
+
   def getSession(self):
     return self.mgmtSession
 
   def callMethod(self, managedBroker, objId, classKey, methodName, callback, args):
     self.lock()
     try:
-      broker = self.managedBrokers[managedBroker]
+      broker, dbObjId = self.managedBrokers[managedBroker]
     finally:
       self.unlock()
 
@@ -363,7 +441,7 @@
     """ Invoked when a connection is established to a broker """
     self.lock()
     try:
-      self.managedBrokers[str(broker.getBrokerId())] = broker
+      self.managedBrokers[str(broker.getBrokerId())] = (broker, 0)
     finally:
       self.unlock()
 
@@ -396,11 +474,19 @@
 
   def objectProps(self, broker, record):
     """ Invoked when an object is updated. """
-    self.updateThread.enqueue(update.PropertyUpdate(broker, record))
+    if record.getClassKey()[1] == "job":
+      priority = 1
+    else:
+      priority = 0
+    self.updateThread.enqueue(update.PropertyUpdate(broker, record), priority)
 
   def objectStats(self, broker, record):
     """ Invoked when an object is updated. """
-    self.updateThread.enqueue(update.StatisticUpdate(broker, record))
+    if record.getClassKey()[1] == "job":
+      priority = 1
+    else:
+      priority = 0
+    self.updateThread.enqueue(update.StatisticUpdate(broker, record), priority)
 
   def event(self, broker, event):
     """ Invoked when an event is raised. """
@@ -412,7 +498,7 @@
   def brokerInfo(self, broker):
     self.lock()
     try:
-      self.managedBrokers[str(broker.getBrokerId())] = broker
+      self.managedBrokers[str(broker.getBrokerId())] = (broker, 0)
     finally:
       self.unlock()
 

Added: mgmt/trunk/mint/python/mint/priqueue.py
===================================================================
--- mgmt/trunk/mint/python/mint/priqueue.py	                        (rev 0)
+++ mgmt/trunk/mint/python/mint/priqueue.py	2008-11-17 20:41:29 UTC (rev 2827)
@@ -0,0 +1,39 @@
+from Queue import Queue
+from collections import deque
+
+class PriQueue(Queue):
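+  """ Queue with slotCount priority slots: item[0] picks the slot, slot 0 is dequeued first, FIFO within a slot; an out-of-range slot raises ValueError. """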
+  def __init__(self, maxsize=0, slotCount=1):
+    self.slotCount=slotCount
+    Queue.__init__(self, maxsize)
+
+  def _init(self, maxsize):
+    self.maxsize = maxsize
+    self.slots = []
+    for i in range(self.slotCount):
+      self.slots.append(deque())
+
+  def _qsize(self):
+    size = 0
+    for i in range(self.slotCount):
+      size += len(self.slots[i])
+    return size
+
+  def _empty(self):
+    return self._qsize() == 0
+
+  def _full(self):
+    return self.maxsize > 0 and self._qsize() == self.maxsize
+
+  def _put(self, item):
+    slot = item[0]
+    if slot in range(self.slotCount):
+      self.slots[slot].append(item)
+    else:
+      raise ValueError("Invalid priority slot")
+
+  def _get(self):
+    for slot in range(self.slotCount):
+      if len(self.slots[slot]) > 0:
+        return self.slots[slot].popleft()
+    return None


Property changes on: mgmt/trunk/mint/python/mint/priqueue.py
___________________________________________________________________
Name: svn:eol-style
   + native

Modified: mgmt/trunk/mint/python/mint/update.py
===================================================================
--- mgmt/trunk/mint/python/mint/update.py	2008-11-17 18:51:55 UTC (rev 2826)
+++ mgmt/trunk/mint/python/mint/update.py	2008-11-17 20:41:29 UTC (rev 2827)
@@ -1,12 +1,12 @@
 import logging
 import datetime
 import types
-import struct
-from Queue import Queue as ConcurrentQueue, Full, Empty
+from Queue import Full, Empty
 from threading import Thread
 from traceback import print_exc
 from qpid.datatypes import UUID
 from mint.schema import *
+from mint.priqueue import PriQueue as ConcurrentQueue
 
 log = logging.getLogger("mint.update")
 
@@ -21,13 +21,13 @@
     super(ModelUpdateThread, self).__init__()
 
     self.model = model
-    self.updates = ConcurrentQueue()
+    self.updates = ConcurrentQueue(slotCount=2)
     self.stopRequested = False
     self.setDaemon(False)
 
-  def enqueue(self, update):
+  def enqueue(self, update, priority=0):
     try:
-      self.updates.put(update)
+      self.updates.put((priority, update))
     except Full:
       log.exception("Queue is full")
       pass
@@ -35,7 +35,9 @@
   def run(self):
     while True:
       try:
-        update = self.updates.get(True, 1)
+        priority, update = self.updates.get(True, 1)
+        if self.stopRequested:
+          break
       except Empty:
         if self.stopRequested:
           break
@@ -58,8 +60,8 @@
     self.qmfObj = obj
 
   def getStatsClass(self, cls):
-    return getattr(mint, cls.__name__ + "Stats")
-
+    return cls + "Stats"
+  
   def process(self):
     pass
 
@@ -87,8 +89,9 @@
         if othercls:
           attrname = name[0:-3]
 
+          attrname += "_id"
           try:
-            attrs[attrname] = model.getObject(othercls, id, self.broker)
+            attrs[attrname] = model.getObjectId(othercls, id)
           except KeyError:
             log.info("Referenced object %s '%s' not found by key '%s'" % (clsname, id, attrname))
           except mint.ObjectNotFound:
@@ -101,18 +104,18 @@
               orphan = True
         else:
           log.error("Class '%s' not found" % clsname)
-      elif not hasattr(cls, orig_name):
+      elif not hasattr(eval("mint.schema."+cls), name):
         # Remove attrs that we don't have in our schema
-        log.debug("Class '%s' has no field '%s'" % (cls.__name__, name))
+        log.debug("Class '%s' has no field '%s'" % ("mint.schema."+cls, name))
         del attrs[name]
       #XXX FIX -- TODO when converting to new API, will lookup attribute type in schema representation
       elif name in ("DaemonStartTime", "EnteredCurrentActivity", "EnteredCurrentState", "JobStart", 
                     "LastBenchmark", "LastFetchWorkCompleted", "LastFetchWorkSpawned", "LastPeriodicCheckpoint", 
                     "MyCurrentTime", "QDate", "JobQueueBirthdate", "MonitorSelfTime") \
                     and (type(attrs[name]) is types.LongType or type(attrs[name]) is types.IntType or attrs[name] == 0):
-        attrs[name] = datetime.fromtimestamp(attrs[name]/1000000000)
+        attrs[name] = time_unwarp(datetime.fromtimestamp(attrs[name]/1000000000))
       elif name.endswith("Time") and type(attrs[name]) is types.IntType and attrs[name] == 0:
-        attrs[name] = datetime.fromtimestamp(attrs[name])
+        attrs[name] = time_unwarp(datetime.fromtimestamp(attrs[name]))
       #XXX FIX -- TODO when converting to new API, will lookup attribute type in schema representation
       elif isinstance(attrs[name], UUID):
         # convert UUIDs into their string representation, to be handled by sqlobject
@@ -129,42 +132,50 @@
 
   def process(self, model):
     try:
-      cls = self.qmfObj.getSchema().getKey()[1]
-      if cls in mint.schema.schemaReservedWordsMap:
-        cls = mint.schema.schemaReservedWordsMap.get(cls)
-      cls = eval(cls[0].upper()+cls[1:])
       attrs = dict(self.qmfObj.getProperties())
       timestamps = self.qmfObj.getTimestamps()
       id = self.qmfObj.getObjectId()
+      pkg, cls, hash = self.qmfObj.getClassKey()
 
+      origCls = cls
+      if cls in mint.schema.schemaReservedWordsMap:
+        cls = mint.schema.schemaReservedWordsMap.get(cls)
+      cls = cls[0].upper()+cls[1:]  
+      sqlCls = model.dbStyle.pythonClassToDBTable(cls)
+
       if self.processAttributes(attrs, cls, model) == None:
         # object is orphan, a parent dependency was not found; 
         # insertion in db is deferred until parent info is received
         return 
 
-      obj = None
+      attrs["recTime"] = time_unwarp(datetime.fromtimestamp(timestamps[0]/1000000000))
+      attrs["creationTime"] = time_unwarp(datetime.fromtimestamp(timestamps[1]/1000000000))
+      if timestamps[2] != 0:
+        attrs["deletionTime"] = time_unwarp(datetime.fromtimestamp(timestamps[2]/1000000000))
+        log.debug("%s(%s) marked deleted", cls, id)
+
+      attrs["sourceScopeId"] = id.first
+      attrs["sourceObjectId"] = id.second
+      attrs["qmfClassKey"] = "%s, %s, %s" % (pkg, origCls, hash)
+      attrs["managedBroker"] = str(self.broker.getBrokerId())
+
+      conn = model.dbConn.getConnection()
       try:
-        obj = model.getObject(cls, id, self.broker)
-      except mint.ObjectNotFound:
-        obj = cls()
-        log.debug("%s(%i) created", cls.__name__, obj.id)
-        attrs["sourceScopeId"] = id.first
-        attrs["sourceObjectId"] = id.second
-        pkg, cls, hash = self.qmfObj.getClassKey()
-        attrs["qmfClassKey"] = "%s, %s, %s" % (pkg, cls, hash)
-        attrs["managedBroker"] = str(self.broker.getBrokerId())
+        cursor = conn.cursor()
+        sql = model.generateSQLUpdate(sqlCls, attrs, id)
+        cursor.execute(sql)
+        if cursor.rowcount == 0:
+          sql = model.generateSQLInsert(sqlCls, attrs)
+          cursor.execute(sql)
+          log.debug("%s(%s) created", cls, id)
+        conn.commit()
       except Exception, e:
+        conn.rollback()
         print e
+        print_exc()
+      finally:
+        conn.close()
 
-      attrs["recTime"] = datetime.fromtimestamp(timestamps[0]/1000000000)
-      attrs["creationTime"] = datetime.fromtimestamp(timestamps[1]/1000000000)
-      if timestamps[2] != 0:
-        attrs["deletionTime"] = datetime.fromtimestamp(timestamps[2]/1000000000)
-        log.debug("%s(%i) marked deleted", cls, obj.id)
-
-      obj.set(**attrs)
-      obj.syncUpdate()
-
       if (cls, id.first, id.second) in model.orphanObjectMap:
         # this object is the parent of orphan objects in the map, re-enqueue for insertion
         orphanObjects = model.orphanObjectMap.pop((cls, id.first, id.second))
@@ -172,64 +183,83 @@
           model.updateThread.enqueue(orphanObj)
         log.info("Inserted %d orphan objects whose creation had been deferred" % (len(orphanObjects)))
 
-      if cls == "broker":
-       if str(self.broker.getBrokerId()) in model.managedBrokers:
-        model.managedBrokers[str(self.broker.getBrokerId())].broker = self.broker
-        reg = mint.BrokerRegistration.selectBy(url=self.broker.getFullUrl())[0]
-        reg.broker = obj
-        reg.syncUpdate()
+      if cls == "Broker":
+        if str(self.broker.getBrokerId()) in model.managedBrokers:
+          broker, dbObjId = model.managedBrokers[str(self.broker.getBrokerId())]
+          if dbObjId == 0: 
+            try:
+              conn = model.dbConn.getConnection()
+              cursor = conn.cursor()
+              sql = model.generateSQLSelectWhere(sqlCls, {"managed_broker": str(self.broker.getBrokerId())})
+              cursor.execute(sql)
+              rec = cursor.fetchone()
+              dbObjId = rec[0]  
 
+              cursor = conn.cursor()
+              sql = model.generateSQLUpdateWhere("broker_registration", {"broker_id": dbObjId}, \
+                                                     {"url": self.broker.getFullUrl()})
+              cursor.execute(sql)
+              conn.commit()
+              model.managedBrokers[str(self.broker.getBrokerId())] = (broker, dbObjId)
+            except Exception, e:
+              conn.rollback()
+              print e
+              print_exc()
+            finally:
+              conn.close()
     except:
       print_exc()
 
-    attrs["managedBroker"] = str(self.broker.getBrokerId())
-    attrs["recTime"] = time_unwarp(datetime.fromtimestamp \
-        (timestamps[0]/1000000000))
-    attrs["creationTime"] = datetime.fromtimestamp \
-        (timestamps[1]/1000000000)
-
-
 class StatisticUpdate(ModelUpdate):
   def __init__(self, broker, obj):
     ModelUpdate.__init__(self, broker, obj)
 
   def process(self, model):
     try:
-      cls = self.qmfObj.getSchema().getKey()[1]
-      if cls in mint.schema.schemaReservedWordsMap:
-        cls = mint.schema.schemaReservedWordsMap.get(cls)
-      cls = eval(cls[0].upper()+cls[1:])
       attrs = dict(self.qmfObj.getStatistics())
       timestamps = self.qmfObj.getTimestamps()
       id = self.qmfObj.getObjectId()
+      pkg, cls, hash = self.qmfObj.getClassKey()
 
-      obj = None
-      try:
-        obj = model.getObject(cls, id, self.broker)
-      except mint.ObjectNotFound:
-        # handle this
-        raise mint.ObjectNotFound
+      origCls = cls
+      if cls in mint.schema.schemaReservedWordsMap:
+        cls = mint.schema.schemaReservedWordsMap.get(cls)
+      cls = cls[0].upper()+cls[1:]  
+      sqlCls = model.dbStyle.pythonClassToDBTable(cls)
+ 
+      statsCls = self.getStatsClass(cls)
+      sqlStatsCls = model.dbStyle.pythonClassToDBTable(statsCls)
 
-
-      statscls = self.getStatsClass(cls)
-      if self.processAttributes(attrs, statscls, model) == None:
+      if self.processAttributes(attrs, statsCls, model) == None:
         # object is orphan, a parent dependency was not found; 
         # insertion in db is deferred until parent info is received
         return 
 
-      attrs["recTime"] = time_unwarp(datetime.fromtimestamp \
-                                   (timestamps[0]/1000000000))
+      attrs["recTime"] = time_unwarp(datetime.fromtimestamp(timestamps[0]/1000000000))
 
-      # Set the stat->obj reference
-      attrs[cls.__name__[0].lower() + cls.__name__[1:]] = obj
-      statsobj = statscls()
-      statsobj.set(**attrs)
-      statsobj.syncUpdate()
+      conn = model.dbConn.getConnection()
+      try:
+        cursor = conn.cursor()
+        subSql = model.generateSQLSelect(sqlCls, id)
+        sql = model.generateSQLInsert(sqlStatsCls, attrs, {sqlCls + "_id": subSql})
+        cursor.execute(sql)
 
-      obj.statsPrev = obj.statsCurr
-      obj.statsCurr = statsobj
-      obj.syncUpdate()
+        sql = "select currval('%s_id_seq')" % (sqlStatsCls)
+        cursor.execute(sql)
+        rec = cursor.fetchone()
+        dbStatsId = rec[0]  
+        log.debug("%s(%s) created", statsCls, id)
 
+        sql = model.generateSQLUpdate(sqlCls, {"stats_curr_id": dbStatsId}, id, updateStats=True)
+        cursor.execute(sql)
+        conn.commit()
+      except Exception, e:
+        conn.rollback()
+        print e
+        print_exc()
+      finally:
+        conn.close()
+
     except:
       print_exc()
 




More information about the rhmessaging-commits mailing list