[rhmessaging-commits] rhmessaging commits: r2876 - in mgmt/trunk: mint/bin and 2 other directories.

rhmessaging-commits at lists.jboss.org
Mon Nov 24 17:44:41 EST 2008


Author: justi9
Date: 2008-11-24 17:44:41 -0500 (Mon, 24 Nov 2008)
New Revision: 2876

Removed:
   mgmt/trunk/mint/python/mint/priqueue.py
Modified:
   mgmt/trunk/cumin/python/cumin/__init__.py
   mgmt/trunk/cumin/python/cumin/model.py
   mgmt/trunk/mint/bin/mint-bench
   mgmt/trunk/mint/python/mint/Makefile
   mgmt/trunk/mint/python/mint/__init__.py
   mgmt/trunk/mint/python/mint/cache.py
   mgmt/trunk/mint/python/mint/schema.py
   mgmt/trunk/mint/python/mint/schemaparser.py
   mgmt/trunk/mint/python/mint/update.py
   mgmt/trunk/mint/sql/schema.sql
Log:
Refactor of the data layer to consolidate SQL operations and reporting.

Also, some schema updates to handle large unsigned values and to store
QMF class keys as strings.

There is still an unsolved connection issue with this commit: when a
broker is added, you must restart the console for it to appear.
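
For context on the schema changes below: PostgreSQL integer columns are
signed, so an unsigned 32-bit QMF value can exceed the INT maximum and
must be stored as BIGINT. A quick sanity check of the standard type
limits (illustrative only, not part of this commit):

    INT_MAX    = 2**31 - 1   # PostgreSQL INT (signed)
    UINT32_MAX = 2**32 - 1   # QMF uint32
    BIGINT_MAX = 2**63 - 1   # PostgreSQL BIGINT (signed)

    assert UINT32_MAX > INT_MAX       # uint32 does not fit in INT
    assert UINT32_MAX <= BIGINT_MAX   # but fits in BIGINT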



Modified: mgmt/trunk/cumin/python/cumin/__init__.py
===================================================================
--- mgmt/trunk/cumin/python/cumin/__init__.py	2008-11-24 19:41:05 UTC (rev 2875)
+++ mgmt/trunk/cumin/python/cumin/__init__.py	2008-11-24 22:44:41 UTC (rev 2876)
@@ -38,13 +38,7 @@
         self.add_resource_dir(os.path.join(self.home, "resources"))
 
         self.model = CuminModel(self, self.config.data, self.config.spec)
-        self.broker_connect_thread = BrokerConnectThread(self.model)
 
-        def closeListener(*args):
-            self.broker_connect_thread.prompt()
-
-        self.model.data.setCloseListener(closeListener)
-
         self.main_page = MainPage(self, "index.html")
         self.add_page(self.main_page)
         self.set_default_page(self.main_page)
@@ -90,50 +84,13 @@
 
     def start(self):
         self.model.start()
-        self.broker_connect_thread.start()
 
+        for reg in BrokerRegistration.select():
+            reg.connect(self.model.data)
+
     def stop(self):
         self.model.stop()
 
-class BrokerConnectThread(Thread):
-    log = logging.getLogger("cumin.mgmt.conn")
-
-    def __init__(self, model):
-        super(BrokerConnectThread, self).__init__()
-
-        self.model = model
-        self.setDaemon(True)
-
-        self.event = Event()
-
-        self.attempts = dict()
-
-    def prompt(self):
-        self.event.set()
-        self.event.clear()
-
-    def run(self):
-        try:
-            self.do_run()
-        except Exception, e:
-            log.exception(e)
-
-    def do_run(self):
-        while True:
-            for reg in BrokerRegistration.select():
-                if reg.broker is None or reg.getBrokerId() not in self.model.data.managedBrokers:
-                    attempts = self.attempts.get(reg, 0)
-                    attempts += 1
-                    self.attempts[reg] = attempts
-
-                    if attempts < 10:
-                        reg.connect(self.model.data)
-                    elif attempts < 100 and attempts % 10 == 0:
-                        reg.connect(self.model.data)
-                    elif attempts % 100 == 0:
-                        reg.connect(self.model.data)
-            self.event.wait(10)
-
 class CuminServer(WebServer):
     def authorized(self, session):
         auth = False
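
Note: the deleted BrokerConnectThread retried failed broker connections
on a tapering schedule. Restated from the removed code above (a sketch
of the old cadence, not new behavior):

    def should_retry(attempts):
        # every cycle for the first 10 attempts, then every 10th
        # attempt up to 100, then every 100th attempt after that
        if attempts < 10:
            return True
        if attempts < 100:
            return attempts % 10 == 0
        return attempts % 100 == 0

With the thread gone, registered brokers are connected once at startup,
which is presumably why a newly added broker now needs a console
restart (the unsolved issue noted in the log).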

Modified: mgmt/trunk/cumin/python/cumin/model.py
===================================================================
--- mgmt/trunk/cumin/python/cumin/model.py	2008-11-24 19:41:05 UTC (rev 2875)
+++ mgmt/trunk/cumin/python/cumin/model.py	2008-11-24 22:44:41 UTC (rev 2876)
@@ -1727,7 +1727,7 @@
             try:
                 object = self.cumin_class.mint_class(**args)
 
-                self.model.app.broker_connect_thread.prompt()
+                object.connect(self.model.data)
 
                 completion("OK")
 

Modified: mgmt/trunk/mint/bin/mint-bench
===================================================================
--- mgmt/trunk/mint/bin/mint-bench	2008-11-24 19:41:05 UTC (rev 2875)
+++ mgmt/trunk/mint/bin/mint-bench	2008-11-24 22:44:41 UTC (rev 2876)
@@ -1,6 +1,6 @@
 #!/usr/bin/python
 
-import sys, os, logging
+import sys, os, logging, mint.sql
 
 from mint.tools import MintBenchTool
 
@@ -55,7 +55,9 @@
         do_main()
 
 if __name__ == "__main__":
+    mint.sql.profile = mint.sql.SqlProfile()
+
     try:
         main()
     except KeyboardInterrupt:
-        pass
+        mint.sql.profile.report()
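
The new mint.sql module is not included in this message; from its use
here and in update.py below, a minimal sketch of the surface it must
expose (only profile, commitTime, and report are attested, the rest is
assumption; mint.sql also supplies the Sql* operation classes used in
update.py, not sketched here):

    class SqlProfile(object):
        """Accumulates wall-clock time spent in SQL work."""

        def __init__(self):
            self.commitTime = 0.0   # seconds spent in conn.commit()

        def report(self):
            print "time in commit: %.3fs" % self.commitTime

    # module-level hook; mint-bench installs an instance at startup
    profile = None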

Modified: mgmt/trunk/mint/python/mint/Makefile
===================================================================
--- mgmt/trunk/mint/python/mint/Makefile	2008-11-24 19:41:05 UTC (rev 2875)
+++ mgmt/trunk/mint/python/mint/Makefile	2008-11-24 22:44:41 UTC (rev 2876)
@@ -4,7 +4,7 @@
 
 schema: schema.py
 
-schema.py: ../../xml/*.xml
-	python schemaparser.py schema.py ${dsn} $^
+schema.py: schemaparser.py ../../xml/*.xml
+	python schemaparser.py schema.py ${dsn} ../../xml/*.xml
 
 clean:
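
Note on the Makefile change: schemaparser.py is now a prerequisite of
schema.py, so the generated schema is rebuilt when the generator
changes; the explicit ../../xml/*.xml replaces $^, which would
otherwise pass schemaparser.py itself to the parser as an XML input.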

Modified: mgmt/trunk/mint/python/mint/__init__.py
===================================================================
--- mgmt/trunk/mint/python/mint/__init__.py	2008-11-24 19:41:05 UTC (rev 2875)
+++ mgmt/trunk/mint/python/mint/__init__.py	2008-11-24 22:44:41 UTC (rev 2876)
@@ -256,7 +256,9 @@
         return None
 
   def destroySelf(self):
-    self.disconnect(MintModel.staticInstance)
+    if MintModel.staticInstance:
+      self.disconnect(MintModel.staticInstance)
+
     super(BrokerRegistration, self).destroySelf()
 
 class BrokerGroup(SQLObject):
@@ -292,7 +294,6 @@
   brokers = SQLMultipleJoin("BrokerRegistration", joinColumn="profile_id")
   properties = SQLMultipleJoin("ConfigProperty", joinColumn="profile_id")
 
-  
 class MintModel(qmf.console.Console):
   staticInstance = None
 
@@ -308,10 +309,14 @@
     self.dbStyle = MixedCaseUnderscoreStyle()
     self.dbConn = None
 
-    # map containing updateObjects that have a missing parent dependency, for deferred insertion
-    # (missing_class, missing_id.first, missing_id.second) -> [updateObject, ..., updateObject]
+    # map containing updateObjects that have a missing parent
+    # dependency, for deferred insertion (missing_class,
+    # missing_id.first, missing_id.second) -> [updateObject, ...,
+    # updateObject]
     self.orphanObjectMap = dict()
 
+    self.orphans = dict()
+
     self.updateThread = update.ModelUpdateThread(self)
     self.mgmtSession = qmf.console.Session(self)
     self.outstandingMethodCalls = dict()
@@ -350,83 +355,6 @@
   def setCloseListener(self, connCloseListener):
     self.connCloseListener = connCloseListener
 
-  def __pythonValueToDB(self, key, value):
-    if type(value) == types.DictType:
-      value = u"'%s'" % pickle.dumps(value).replace("'", "''")
-    elif value is None:
-      value = "NULL"
-    else:
-      try:
-        x = int(value)
-      except Exception:
-        value = u"'%s'" % str(value).replace("'", "''")
-    return value    
-
-  def __generateSQLWhereClause(self, colValMap):
-    whereClause = ""
-    for (key, value) in colValMap.iteritems():
-      value = self.__pythonValueToDB(key, value)
-      whereClause += u"%s = %s and " % (self.dbStyle.pythonAttrToDBColumn(key), value)
-    whereClause = whereClause[:-5]
-    return whereClause
-
-  def generateSQLSelect(self, table, id, resultCols="id"):
-    return self.generateSQLSelectWhere(table, {"source_scope_id": id.first, "source_object_id": id.second}, resultCols)
-
-  def generateSQLSelectWhere(self, table, where, resultCols="id"):
-    whereClause = self.__generateSQLWhereClause(where)
-    sql = "select %s from %s where %s" % (resultCols, table, whereClause)  
-    return sql
-
-  def generateSQLInsert(self, table, colValMap, subQuery={}):
-    columns = u""
-    values = u""
-    for (key, value) in colValMap.iteritems():
-      value = self.__pythonValueToDB(key, value)
-      columns += u"%s , " % (self.dbStyle.pythonAttrToDBColumn(key))
-      values += u"%s , " % (value)
-    for subCol, subVal in subQuery.iteritems():
-      columns += u"%s , " % (subCol)
-      values += u"(%s) , " % (subVal)
-    columns = columns[:-3]
-    values = values[:-3]
-    sql = u"insert into %s (%s) values (%s)" % (table, columns, values)
-    return sql
-
-  def generateSQLUpdate(self, table, colValMap, id, updateStats=False):
-    return self.generateSQLUpdateWhere(table, colValMap, \
-                                       {"source_scope_id": id.first, "source_object_id": id.second}, updateStats)
-
-  def generateSQLUpdateWhere(self, table, colValMap, where, updateStats=False):
-    updateValues = u""
-    for (key, value) in colValMap.iteritems():
-      value = self.__pythonValueToDB(key, value)
-      updateValues += u"%s = %s , " % (self.dbStyle.pythonAttrToDBColumn(key), value)
-    if updateStats:
-      updateValues += u"stats_prev_id = stats_curr_id , "
-    updateValues = updateValues[:-3]
-    whereClause = self.__generateSQLWhereClause(where)
-    sql = u"update %s set %s where %s" % (table, updateValues, whereClause)
-    return sql
-
-  def addOrphanObject(self, cls, idFirst, idSecond, obj):
-    # store object in orphan map, will be picked up later when parent info is received
-    log.info("Referenced object %s '%d-%d' not found, deferring creation of orphan object" % (cls, idFirst, idSecond))
-    if (cls, idFirst, idSecond) not in self.orphanObjectMap:
-      self.orphanObjectMap[(cls, idFirst, idSecond)] = set()
-    self.orphanObjectMap[(cls, idFirst, idSecond)].add(obj)
-
-  def requeueIfOrphanParent(self, cls, idFirst, idSecond):
-    orphans = 0
-    if (cls, idFirst, idSecond) in self.orphanObjectMap:
-      # this object is the parent of orphan objects in the map, re-enqueue for insertion
-      orphanObjects = self.orphanObjectMap.pop((cls, idFirst, idSecond))
-      for orphanObj in orphanObjects:
-        self.updateThread.enqueue(orphanObj)
-        orphans += 1
-      log.info("Inserted %d orphan objects whose creation had been deferred" % (orphans))
-    return orphans
-    
   def getSession(self):
     return self.mgmtSession
 
@@ -452,6 +380,7 @@
     self.lock()
     try:
       self.managedBrokers[str(broker.getBrokerId())] = (broker, 0)
+
     finally:
       self.unlock()
 
@@ -484,20 +413,24 @@
 
   def objectProps(self, broker, record):
     """ Invoked when an object is updated. """
+
+    up = update.PropertyUpdate(self, broker, record)
+
     if record.getClassKey().getClassName() == "job":
-      priority = 1
-    else:
-      priority = 0
-    self.updateThread.enqueue(update.PropertyUpdate(broker, record), priority)
+      up.priority = 1
 
+    self.updateThread.enqueue(up)
+
   def objectStats(self, broker, record):
     """ Invoked when an object is updated. """
+
+    up = update.StatisticUpdate(self, broker, record)
+
     if record.getClassKey().getClassName() == "job":
-      priority = 1
-    else:
-      priority = 0
-    self.updateThread.enqueue(update.StatisticUpdate(broker, record), priority)
+      up.priority = 1
 
+    self.updateThread.enqueue(up)
+
   def event(self, broker, event):
     """ Invoked when an event is raised. """
     pass

Modified: mgmt/trunk/mint/python/mint/cache.py
===================================================================
--- mgmt/trunk/mint/python/mint/cache.py	2008-11-24 19:41:05 UTC (rev 2875)
+++ mgmt/trunk/mint/python/mint/cache.py	2008-11-24 22:44:41 UTC (rev 2876)
@@ -1,47 +1,37 @@
 from threading import RLock
 
-class MintCache:
+class MintCache(object):
   staticInstance = None
 
   def __init__(self):
     assert MintCache.staticInstance is None
     MintCache.staticInstance = self
 
-    self.__lock = RLock()
     self.__cache = dict()
     self.__pending = dict()
     self.__dirty = False
 
   def get(self, key, usePending=True):
     result = None
-    self.__lock.acquire()
+
     if key in self.__cache:
       result = self.__cache[key]
     elif usePending and key in self.__pending:
       result = self.__pending[key]
-    self.__lock.release()
     return result
 
   def set(self, key, value):
-    self.__lock.acquire()
     self.__pending[key] = value
     self.__dirty = True
-    self.__lock.release()
 
   def commit(self):
-    self.__lock.acquire()
     self.__cache.update(self.__pending)
     self.__pending.clear()
     self.__dirty = False
-    self.__lock.release()
 
   def rollback(self):
-    self.__lock.acquire()
     self.__pending.clear()
     self.__dirty = False
-    self.__lock.release()
 
   def isDirty(self):
-    self.__lock.acquire()
     return self.__dirty
-    self.__lock.release()
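
Note on the cache.py change: dropping the RLock assumes MintCache is
now only touched from a single thread (the ModelUpdateThread doing the
commits). It also removes a latent bug: the old isDirty returned before
its release(), so that lock acquisition was never paired.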

Deleted: mgmt/trunk/mint/python/mint/priqueue.py
===================================================================
--- mgmt/trunk/mint/python/mint/priqueue.py	2008-11-24 19:41:05 UTC (rev 2875)
+++ mgmt/trunk/mint/python/mint/priqueue.py	2008-11-24 22:44:41 UTC (rev 2876)
@@ -1,39 +0,0 @@
-from Queue import Queue
-from collections import deque
-
-class PriQueue(Queue):
-  """ """
-  def __init__(self, maxsize=0, slotCount=1):
-    self.slotCount=slotCount
-    Queue.__init__(self, maxsize)
-
-  def _init(self, maxsize):
-    self.maxsize = maxsize
-    self.slots = []
-    for i in range(self.slotCount):
-      self.slots.append(deque())
-
-  def _qsize(self):
-    size = 0
-    for i in range(self.slotCount):
-      size += len(self.slots[i])
-    return size
-
-  def _empty(self):
-    return self._qsize() == 0
-
-  def _full(self):
-    return self.maxsize > 0 and self._qsize() == self.maxsize
-
-  def _put(self, item):
-    slot = item[0]
-    if slot in range(self.slotCount):
-      self.slots[slot].append(item)
-    else:
-      raise ValueError("Invalid priority slot")
-
-  def _get(self):
-    for slot in range(self.slotCount):
-      if len(self.slots[slot]) > 0:
-        return self.slots[slot].popleft()
-    return None
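
Note: PriQueue is not really gone; it reappears nearly verbatim as
UpdateQueue at the bottom of update.py below, with the priority carried
on the update object itself instead of in a (priority, item) tuple.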

Modified: mgmt/trunk/mint/python/mint/schema.py
===================================================================
--- mgmt/trunk/mint/python/mint/schema.py	2008-11-24 19:41:05 UTC (rev 2875)
+++ mgmt/trunk/mint/python/mint/schema.py	2008-11-24 22:44:41 UTC (rev 2876)
@@ -17,7 +17,7 @@
   sourceScopeId = BigIntCol(default=None)
   sourceObjectId = BigIntCol(default=None)
   source_ids_unique = DatabaseIndex(sourceScopeId, sourceObjectId, unique=True)
-  qmfClassKey = BLOBCol(default=None)
+  qmfClassKey = StringCol(length=1000, default=None)
   creationTime = TimestampCol(default=None)
   deletionTime = TimestampCol(default=None)
   managedBroker = StringCol(length=1000, default=None)
@@ -31,20 +31,20 @@
   CheckpointPlatform = StringCol(length=1000, default=None)
   ClientMachine = StringCol(length=1000, default=None)
   ConcurrencyLimits = StringCol(length=1000, default=None)
-  Cpus = IntCol(default=None)
+  Cpus = BigIntCol(default=None)
   CurrentRank = FloatCol(default=None)
-  Disk = IntCol(default=None)
+  Disk = BigIntCol(default=None)
   FileSystemDomain = StringCol(length=1000, default=None)
   GlobalJobId = StringCol(length=1000, default=None)
-  ImageSize = IntCol(default=None)
+  ImageSize = BigIntCol(default=None)
   IsValidCheckpointPlatform = StringCol(length=4000, default=None)
   JobId = StringCol(length=1000, default=None)
   JobStart = TimestampCol(default=None)
-  KFlops = IntCol(default=None)
+  KFlops = BigIntCol(default=None)
   Machine = StringCol(length=1000, default=None)
   MaxJobRetirementTime = StringCol(length=4000, default=None)
-  Memory = IntCol(default=None)
-  Mips = IntCol(default=None)
+  Memory = BigIntCol(default=None)
+  Mips = BigIntCol(default=None)
   MyAddress = StringCol(length=1000, default=None)
   Name = StringCol(length=1000, default=None)
   OpSys = StringCol(length=1000, default=None)
@@ -57,23 +57,23 @@
   Requirements = StringCol(length=4000, default=None)
   PublicNetworkIpAddr = StringCol(length=1000, default=None)
   Rank = StringCol(length=4000, default=None)
-  SlotID = IntCol(default=None)
+  SlotID = BigIntCol(default=None)
   Start = StringCol(length=4000, default=None)
   StarterAbilityList = StringCol(length=4000, default=None)
-  TotalClaimRunTime = IntCol(default=None)
-  TotalClaimSuspendTime = IntCol(default=None)
-  TotalCpus = IntCol(default=None)
-  TotalDisk = IntCol(default=None)
-  TotalJobRunTime = IntCol(default=None)
-  TotalJobSuspendTime = IntCol(default=None)
-  TotalMemory = IntCol(default=None)
-  TotalSlots = IntCol(default=None)
-  TotalVirtualMemory = IntCol(default=None)
+  TotalClaimRunTime = BigIntCol(default=None)
+  TotalClaimSuspendTime = BigIntCol(default=None)
+  TotalCpus = BigIntCol(default=None)
+  TotalDisk = BigIntCol(default=None)
+  TotalJobRunTime = BigIntCol(default=None)
+  TotalJobSuspendTime = BigIntCol(default=None)
+  TotalMemory = BigIntCol(default=None)
+  TotalSlots = BigIntCol(default=None)
+  TotalVirtualMemory = BigIntCol(default=None)
   UidDomain = StringCol(length=1000, default=None)
-  VirtualMemory = IntCol(default=None)
-  WindowsBuildNumber = IntCol(default=None)
-  WindowsMajorVersion = IntCol(default=None)
-  WindowsMinorVersion = IntCol(default=None)
+  VirtualMemory = BigIntCol(default=None)
+  WindowsBuildNumber = BigIntCol(default=None)
+  WindowsMajorVersion = BigIntCol(default=None)
+  WindowsMinorVersion = BigIntCol(default=None)
 
   CondorPlatform = StringCol(length=1000, default=None)
   CondorVersion = StringCol(length=1000, default=None)
@@ -88,13 +88,13 @@
   slot = ForeignKey('Slot', cascade='null', default=None)
   classInfos = dict() # brokerId => classInfo
   Activity = StringCol(length=1000, default=None)
-  ClockDay = IntCol(default=None)
-  ClockMin = IntCol(default=None)
+  ClockDay = BigIntCol(default=None)
+  ClockMin = BigIntCol(default=None)
   CondorLoadAvg = FloatCol(default=None)
-  ConsoleIdle = IntCol(default=None)
+  ConsoleIdle = BigIntCol(default=None)
   EnteredCurrentActivity = TimestampCol(default=None)
   EnteredCurrentState = TimestampCol(default=None)
-  KeyboardIdle = IntCol(default=None)
+  KeyboardIdle = BigIntCol(default=None)
   LastBenchmark = TimestampCol(default=None)
   LastFetchWorkCompleted = TimestampCol(default=None)
   LastFetchWorkSpawned = TimestampCol(default=None)
@@ -103,28 +103,28 @@
   MyCurrentTime = TimestampCol(default=None)
   NextFetchWorkDelay = IntCol(default=None)
   State = StringCol(length=1000, default=None)
-  TimeToLive = IntCol(default=None)
+  TimeToLive = BigIntCol(default=None)
   TotalCondorLoadAvg = FloatCol(default=None)
   TotalLoadAvg = FloatCol(default=None)
-  TotalTimeBackfillBusy = IntCol(default=None)
-  TotalTimeBackfillIdle = IntCol(default=None)
-  TotalTimeBackfillKilling = IntCol(default=None)
-  TotalTimeClaimedBusy = IntCol(default=None)
-  TotalTimeClaimedIdle = IntCol(default=None)
-  TotalTimeClaimedRetiring = IntCol(default=None)
-  TotalTimeClaimedSuspended = IntCol(default=None)
-  TotalTimeMatchedIdle = IntCol(default=None)
-  TotalTimeOwnerIdle = IntCol(default=None)
-  TotalTimePreemptingKilling = IntCol(default=None)
-  TotalTimePreemptingVacating = IntCol(default=None)
-  TotalTimeUnclaimedBenchmarking = IntCol(default=None)
-  TotalTimeUnclaimedIdle = IntCol(default=None)
+  TotalTimeBackfillBusy = BigIntCol(default=None)
+  TotalTimeBackfillIdle = BigIntCol(default=None)
+  TotalTimeBackfillKilling = BigIntCol(default=None)
+  TotalTimeClaimedBusy = BigIntCol(default=None)
+  TotalTimeClaimedIdle = BigIntCol(default=None)
+  TotalTimeClaimedRetiring = BigIntCol(default=None)
+  TotalTimeClaimedSuspended = BigIntCol(default=None)
+  TotalTimeMatchedIdle = BigIntCol(default=None)
+  TotalTimeOwnerIdle = BigIntCol(default=None)
+  TotalTimePreemptingKilling = BigIntCol(default=None)
+  TotalTimePreemptingVacating = BigIntCol(default=None)
+  TotalTimeUnclaimedBenchmarking = BigIntCol(default=None)
+  TotalTimeUnclaimedIdle = BigIntCol(default=None)
 
-  MonitorSelfAge = IntCol(default=None)
+  MonitorSelfAge = BigIntCol(default=None)
   MonitorSelfCPUUsage = FloatCol(default=None)
   MonitorSelfImageSize = FloatCol(default=None)
-  MonitorSelfRegisteredSocketCount = IntCol(default=None)
-  MonitorSelfResidentSetSize = IntCol(default=None)
+  MonitorSelfRegisteredSocketCount = BigIntCol(default=None)
+  MonitorSelfResidentSetSize = BigIntCol(default=None)
   MonitorSelfTime = TimestampCol(default=None)
 
 
@@ -138,7 +138,7 @@
   sourceScopeId = BigIntCol(default=None)
   sourceObjectId = BigIntCol(default=None)
   source_ids_unique = DatabaseIndex(sourceScopeId, sourceObjectId, unique=True)
-  qmfClassKey = BLOBCol(default=None)
+  qmfClassKey = StringCol(length=1000, default=None)
   creationTime = TimestampCol(default=None)
   deletionTime = TimestampCol(default=None)
   managedBroker = StringCol(length=1000, default=None)
@@ -149,29 +149,29 @@
   submitter = ForeignKey('Submitter', cascade='null', default=None)
   AccountingGroup = StringCol(length=1000, default=None)
   Args = StringCol(length=4000, default=None)
-  ClusterId = IntCol(default=None)
+  ClusterId = BigIntCol(default=None)
   Cmd = StringCol(length=4000, default=None)
   ConcurrencyLimits = StringCol(length=4000, default=None)
   CustomGroup = StringCol(length=1000, default=None)
   CustomId = StringCol(length=1000, default=None)
-  CustomPriority = IntCol(default=None)
+  CustomPriority = BigIntCol(default=None)
   GlobalJobId = StringCol(length=1000, default=None)
   InRsv = StringCol(length=4000, default=None)
   Iwd = StringCol(length=4000, default=None)
-  JobStatus = IntCol(default=None)
+  JobStatus = BigIntCol(default=None)
   Note = StringCol(length=4000, default=None)
   Out = StringCol(length=4000, default=None)
   Owner = StringCol(length=1000, default=None)
   GridUser = StringCol(length=1000, default=None)
-  ProcId = IntCol(default=None)
+  ProcId = BigIntCol(default=None)
   QDate = TimestampCol(default=None)
-  JobUniverse = IntCol(default=None)
+  JobUniverse = BigIntCol(default=None)
   Title = StringCol(length=1000, default=None)
   UserLog = StringCol(length=4000, default=None)
   HoldReason = StringCol(length=4000, default=None)
   DAGNodeName = StringCol(length=1000, default=None)
   DAGParentNodeNames = StringCol(length=4000, default=None)
-  DAGManJobId = IntCol(default=None)
+  DAGManJobId = BigIntCol(default=None)
   Ad = PickleCol(default=None)
 
 
@@ -250,7 +250,7 @@
   sourceScopeId = BigIntCol(default=None)
   sourceObjectId = BigIntCol(default=None)
   source_ids_unique = DatabaseIndex(sourceScopeId, sourceObjectId, unique=True)
-  qmfClassKey = BLOBCol(default=None)
+  qmfClassKey = StringCol(length=1000, default=None)
   creationTime = TimestampCol(default=None)
   deletionTime = TimestampCol(default=None)
   managedBroker = StringCol(length=1000, default=None)
@@ -260,7 +260,7 @@
   Pool = StringCol(length=1000, default=None)
   System = StringCol(length=1000, default=None)
   JobQueueBirthdate = TimestampCol(default=None)
-  MaxJobsRunning = IntCol(default=None)
+  MaxJobsRunning = BigIntCol(default=None)
   Machine = StringCol(length=1000, default=None)
   MyAddress = StringCol(length=1000, default=None)
   Name = StringCol(length=1000, default=None)
@@ -278,18 +278,18 @@
   recTime = TimestampCol(default=None)
   scheduler = ForeignKey('Scheduler', cascade='null', default=None)
   classInfos = dict() # brokerId => classInfo
-  NumUsers = IntCol(default=None)
-  TotalHeldJobs = IntCol(default=None)
-  TotalIdleJobs = IntCol(default=None)
-  TotalJobAds = IntCol(default=None)
-  TotalRemovedJobs = IntCol(default=None)
-  TotalRunningJobs = IntCol(default=None)
+  NumUsers = BigIntCol(default=None)
+  TotalHeldJobs = BigIntCol(default=None)
+  TotalIdleJobs = BigIntCol(default=None)
+  TotalJobAds = BigIntCol(default=None)
+  TotalRemovedJobs = BigIntCol(default=None)
+  TotalRunningJobs = BigIntCol(default=None)
 
-  MonitorSelfAge = IntCol(default=None)
+  MonitorSelfAge = BigIntCol(default=None)
   MonitorSelfCPUUsage = FloatCol(default=None)
   MonitorSelfImageSize = FloatCol(default=None)
-  MonitorSelfRegisteredSocketCount = IntCol(default=None)
-  MonitorSelfResidentSetSize = IntCol(default=None)
+  MonitorSelfRegisteredSocketCount = BigIntCol(default=None)
+  MonitorSelfResidentSetSize = BigIntCol(default=None)
   MonitorSelfTime = TimestampCol(default=None)
 
 
@@ -303,7 +303,7 @@
   sourceScopeId = BigIntCol(default=None)
   sourceObjectId = BigIntCol(default=None)
   source_ids_unique = DatabaseIndex(sourceScopeId, sourceObjectId, unique=True)
-  qmfClassKey = BLOBCol(default=None)
+  qmfClassKey = StringCol(length=1000, default=None)
   creationTime = TimestampCol(default=None)
   deletionTime = TimestampCol(default=None)
   managedBroker = StringCol(length=1000, default=None)
@@ -324,9 +324,9 @@
   recTime = TimestampCol(default=None)
   submitter = ForeignKey('Submitter', cascade='null', default=None)
   classInfos = dict() # brokerId => classInfo
-  HeldJobs = IntCol(default=None)
-  IdleJobs = IntCol(default=None)
-  RunningJobs = IntCol(default=None)
+  HeldJobs = BigIntCol(default=None)
+  IdleJobs = BigIntCol(default=None)
+  RunningJobs = BigIntCol(default=None)
 
 
 
@@ -339,7 +339,7 @@
   sourceScopeId = BigIntCol(default=None)
   sourceObjectId = BigIntCol(default=None)
   source_ids_unique = DatabaseIndex(sourceScopeId, sourceObjectId, unique=True)
-  qmfClassKey = BLOBCol(default=None)
+  qmfClassKey = StringCol(length=1000, default=None)
   creationTime = TimestampCol(default=None)
   deletionTime = TimestampCol(default=None)
   managedBroker = StringCol(length=1000, default=None)
@@ -389,11 +389,11 @@
   negotiator = ForeignKey('Negotiator', cascade='null', default=None)
   classInfos = dict() # brokerId => classInfo
 
-  MonitorSelfAge = IntCol(default=None)
+  MonitorSelfAge = BigIntCol(default=None)
   MonitorSelfCPUUsage = FloatCol(default=None)
   MonitorSelfImageSize = FloatCol(default=None)
-  MonitorSelfRegisteredSocketCount = IntCol(default=None)
-  MonitorSelfResidentSetSize = IntCol(default=None)
+  MonitorSelfRegisteredSocketCount = BigIntCol(default=None)
+  MonitorSelfResidentSetSize = BigIntCol(default=None)
   MonitorSelfTime = TimestampCol(default=None)
 
 
@@ -407,7 +407,7 @@
   sourceScopeId = BigIntCol(default=None)
   sourceObjectId = BigIntCol(default=None)
   source_ids_unique = DatabaseIndex(sourceScopeId, sourceObjectId, unique=True)
-  qmfClassKey = BLOBCol(default=None)
+  qmfClassKey = StringCol(length=1000, default=None)
   creationTime = TimestampCol(default=None)
   deletionTime = TimestampCol(default=None)
   managedBroker = StringCol(length=1000, default=None)
@@ -441,7 +441,7 @@
   sourceScopeId = BigIntCol(default=None)
   sourceObjectId = BigIntCol(default=None)
   source_ids_unique = DatabaseIndex(sourceScopeId, sourceObjectId, unique=True)
-  qmfClassKey = BLOBCol(default=None)
+  qmfClassKey = StringCol(length=1000, default=None)
   creationTime = TimestampCol(default=None)
   deletionTime = TimestampCol(default=None)
   managedBroker = StringCol(length=1000, default=None)
@@ -485,11 +485,11 @@
   master = ForeignKey('Master', cascade='null', default=None)
   classInfos = dict() # brokerId => classInfo
 
-  MonitorSelfAge = IntCol(default=None)
+  MonitorSelfAge = BigIntCol(default=None)
   MonitorSelfCPUUsage = FloatCol(default=None)
   MonitorSelfImageSize = FloatCol(default=None)
-  MonitorSelfRegisteredSocketCount = IntCol(default=None)
-  MonitorSelfResidentSetSize = IntCol(default=None)
+  MonitorSelfRegisteredSocketCount = BigIntCol(default=None)
+  MonitorSelfResidentSetSize = BigIntCol(default=None)
   MonitorSelfTime = TimestampCol(default=None)
 
 
@@ -503,7 +503,7 @@
   sourceScopeId = BigIntCol(default=None)
   sourceObjectId = BigIntCol(default=None)
   source_ids_unique = DatabaseIndex(sourceScopeId, sourceObjectId, unique=True)
-  qmfClassKey = BLOBCol(default=None)
+  qmfClassKey = StringCol(length=1000, default=None)
   creationTime = TimestampCol(default=None)
   deletionTime = TimestampCol(default=None)
   managedBroker = StringCol(length=1000, default=None)
@@ -544,7 +544,7 @@
   sourceScopeId = BigIntCol(default=None)
   sourceObjectId = BigIntCol(default=None)
   source_ids_unique = DatabaseIndex(sourceScopeId, sourceObjectId, unique=True)
-  qmfClassKey = BLOBCol(default=None)
+  qmfClassKey = StringCol(length=1000, default=None)
   creationTime = TimestampCol(default=None)
   deletionTime = TimestampCol(default=None)
   managedBroker = StringCol(length=1000, default=None)
@@ -555,7 +555,7 @@
   clusterName = StringCol(length=1000, default=None)
   clusterID = StringCol(length=1000, default=None)
   publishedURL = StringCol(length=1000, default=None)
-  clusterSize = SmallIntCol(default=None)
+  clusterSize = IntCol(default=None)
   status = StringCol(length=1000, default=None)
   members = StringCol(length=4000, default=None)
 
@@ -591,7 +591,7 @@
   sourceScopeId = BigIntCol(default=None)
   sourceObjectId = BigIntCol(default=None)
   source_ids_unique = DatabaseIndex(sourceScopeId, sourceObjectId, unique=True)
-  qmfClassKey = BLOBCol(default=None)
+  qmfClassKey = StringCol(length=1000, default=None)
   creationTime = TimestampCol(default=None)
   deletionTime = TimestampCol(default=None)
   managedBroker = StringCol(length=1000, default=None)
@@ -600,15 +600,15 @@
   classInfos = dict() # brokerId => classInfo
   broker = ForeignKey('Broker', cascade='null', default=None)
   location = StringCol(length=1000, default=None)
-  defaultInitialFileCount = SmallIntCol(default=None)
-  defaultDataFileSize = IntCol(default=None)
+  defaultInitialFileCount = IntCol(default=None)
+  defaultDataFileSize = BigIntCol(default=None)
   tplIsInitialized = BoolCol(default=None)
   tplDirectory = StringCol(length=1000, default=None)
-  tplWritePageSize = IntCol(default=None)
-  tplWritePages = IntCol(default=None)
-  tplInitialFileCount = SmallIntCol(default=None)
-  tplDataFileSize = IntCol(default=None)
-  tplCurrentFileCount = IntCol(default=None)
+  tplWritePageSize = BigIntCol(default=None)
+  tplWritePages = BigIntCol(default=None)
+  tplInitialFileCount = IntCol(default=None)
+  tplDataFileSize = BigIntCol(default=None)
+  tplCurrentFileCount = BigIntCol(default=None)
 
 
 class StoreStats(SQLObject):
@@ -639,7 +639,7 @@
   sourceScopeId = BigIntCol(default=None)
   sourceObjectId = BigIntCol(default=None)
   source_ids_unique = DatabaseIndex(sourceScopeId, sourceObjectId, unique=True)
-  qmfClassKey = BLOBCol(default=None)
+  qmfClassKey = StringCol(length=1000, default=None)
   creationTime = TimestampCol(default=None)
   deletionTime = TimestampCol(default=None)
   managedBroker = StringCol(length=1000, default=None)
@@ -650,13 +650,13 @@
   name = StringCol(length=1000, default=None)
   directory = StringCol(length=1000, default=None)
   baseFileName = StringCol(length=1000, default=None)
-  writePageSize = IntCol(default=None)
-  writePages = IntCol(default=None)
-  readPageSize = IntCol(default=None)
-  readPages = IntCol(default=None)
-  initialFileCount = SmallIntCol(default=None)
-  dataFileSize = IntCol(default=None)
-  currentFileCount = IntCol(default=None)
+  writePageSize = BigIntCol(default=None)
+  writePages = BigIntCol(default=None)
+  readPageSize = BigIntCol(default=None)
+  readPages = BigIntCol(default=None)
+  initialFileCount = IntCol(default=None)
+  dataFileSize = BigIntCol(default=None)
+  currentFileCount = BigIntCol(default=None)
 
 
   def expand(self, model, callback, by):
@@ -715,7 +715,7 @@
   sourceScopeId = BigIntCol(default=None)
   sourceObjectId = BigIntCol(default=None)
   source_ids_unique = DatabaseIndex(sourceScopeId, sourceObjectId, unique=True)
-  qmfClassKey = BLOBCol(default=None)
+  qmfClassKey = StringCol(length=1000, default=None)
   creationTime = TimestampCol(default=None)
   deletionTime = TimestampCol(default=None)
   managedBroker = StringCol(length=1000, default=None)
@@ -749,7 +749,7 @@
   sourceScopeId = BigIntCol(default=None)
   sourceObjectId = BigIntCol(default=None)
   source_ids_unique = DatabaseIndex(sourceScopeId, sourceObjectId, unique=True)
-  qmfClassKey = BLOBCol(default=None)
+  qmfClassKey = StringCol(length=1000, default=None)
   creationTime = TimestampCol(default=None)
   deletionTime = TimestampCol(default=None)
   managedBroker = StringCol(length=1000, default=None)
@@ -757,12 +757,12 @@
   statsPrev = ForeignKey('BrokerStats', cascade='null', default=None)
   classInfos = dict() # brokerId => classInfo
   system = ForeignKey('System', cascade='null', default=None)
-  port = IntCol(default=None)
-  workerThreads = SmallIntCol(default=None)
-  maxConns = SmallIntCol(default=None)
-  connBacklog = SmallIntCol(default=None)
-  stagingThreshold = IntCol(default=None)
-  mgmtPubInterval = SmallIntCol(default=None)
+  port = BigIntCol(default=None)
+  workerThreads = IntCol(default=None)
+  maxConns = IntCol(default=None)
+  connBacklog = IntCol(default=None)
+  stagingThreshold = BigIntCol(default=None)
+  mgmtPubInterval = IntCol(default=None)
   version = StringCol(length=1000, default=None)
   dataDir = StringCol(length=1000, default=None)
 
@@ -831,7 +831,7 @@
   sourceScopeId = BigIntCol(default=None)
   sourceObjectId = BigIntCol(default=None)
   source_ids_unique = DatabaseIndex(sourceScopeId, sourceObjectId, unique=True)
-  qmfClassKey = BLOBCol(default=None)
+  qmfClassKey = StringCol(length=1000, default=None)
   creationTime = TimestampCol(default=None)
   deletionTime = TimestampCol(default=None)
   managedBroker = StringCol(length=1000, default=None)
@@ -842,8 +842,8 @@
   label = StringCol(length=1000, default=None)
   broker = ForeignKey('Broker', cascade='null', default=None)
   systemId = BLOBCol(default=None)
-  brokerBank = IntCol(default=None)
-  agentBank = IntCol(default=None)
+  brokerBank = BigIntCol(default=None)
+  agentBank = BigIntCol(default=None)
 
 
 class AgentStats(SQLObject):
@@ -865,7 +865,7 @@
   sourceScopeId = BigIntCol(default=None)
   sourceObjectId = BigIntCol(default=None)
   source_ids_unique = DatabaseIndex(sourceScopeId, sourceObjectId, unique=True)
-  qmfClassKey = BLOBCol(default=None)
+  qmfClassKey = StringCol(length=1000, default=None)
   creationTime = TimestampCol(default=None)
   deletionTime = TimestampCol(default=None)
   managedBroker = StringCol(length=1000, default=None)
@@ -896,7 +896,7 @@
   sourceScopeId = BigIntCol(default=None)
   sourceObjectId = BigIntCol(default=None)
   source_ids_unique = DatabaseIndex(sourceScopeId, sourceObjectId, unique=True)
-  qmfClassKey = BLOBCol(default=None)
+  qmfClassKey = StringCol(length=1000, default=None)
   creationTime = TimestampCol(default=None)
   deletionTime = TimestampCol(default=None)
   managedBroker = StringCol(length=1000, default=None)
@@ -966,7 +966,7 @@
   sourceScopeId = BigIntCol(default=None)
   sourceObjectId = BigIntCol(default=None)
   source_ids_unique = DatabaseIndex(sourceScopeId, sourceObjectId, unique=True)
-  qmfClassKey = BLOBCol(default=None)
+  qmfClassKey = StringCol(length=1000, default=None)
   creationTime = TimestampCol(default=None)
   deletionTime = TimestampCol(default=None)
   managedBroker = StringCol(length=1000, default=None)
@@ -1011,7 +1011,7 @@
   sourceScopeId = BigIntCol(default=None)
   sourceObjectId = BigIntCol(default=None)
   source_ids_unique = DatabaseIndex(sourceScopeId, sourceObjectId, unique=True)
-  qmfClassKey = BLOBCol(default=None)
+  qmfClassKey = StringCol(length=1000, default=None)
   creationTime = TimestampCol(default=None)
   deletionTime = TimestampCol(default=None)
   managedBroker = StringCol(length=1000, default=None)
@@ -1045,7 +1045,7 @@
   sourceScopeId = BigIntCol(default=None)
   sourceObjectId = BigIntCol(default=None)
   source_ids_unique = DatabaseIndex(sourceScopeId, sourceObjectId, unique=True)
-  qmfClassKey = BLOBCol(default=None)
+  qmfClassKey = StringCol(length=1000, default=None)
   creationTime = TimestampCol(default=None)
   deletionTime = TimestampCol(default=None)
   managedBroker = StringCol(length=1000, default=None)
@@ -1090,7 +1090,7 @@
   sourceScopeId = BigIntCol(default=None)
   sourceObjectId = BigIntCol(default=None)
   source_ids_unique = DatabaseIndex(sourceScopeId, sourceObjectId, unique=True)
-  qmfClassKey = BLOBCol(default=None)
+  qmfClassKey = StringCol(length=1000, default=None)
   creationTime = TimestampCol(default=None)
   deletionTime = TimestampCol(default=None)
   managedBroker = StringCol(length=1000, default=None)
@@ -1099,7 +1099,7 @@
   classInfos = dict() # brokerId => classInfo
   vhost = ForeignKey('Vhost', cascade='null', default=None)
   host = StringCol(length=1000, default=None)
-  port = IntCol(default=None)
+  port = BigIntCol(default=None)
   transport = StringCol(length=1000, default=None)
   durable = BoolCol(default=None)
 
@@ -1156,7 +1156,7 @@
   sourceScopeId = BigIntCol(default=None)
   sourceObjectId = BigIntCol(default=None)
   source_ids_unique = DatabaseIndex(sourceScopeId, sourceObjectId, unique=True)
-  qmfClassKey = BLOBCol(default=None)
+  qmfClassKey = StringCol(length=1000, default=None)
   creationTime = TimestampCol(default=None)
   deletionTime = TimestampCol(default=None)
   managedBroker = StringCol(length=1000, default=None)
@@ -1164,7 +1164,7 @@
   statsPrev = ForeignKey('BridgeStats', cascade='null', default=None)
   classInfos = dict() # brokerId => classInfo
   link = ForeignKey('Link', cascade='null', default=None)
-  channelId = SmallIntCol(default=None)
+  channelId = IntCol(default=None)
   durable = BoolCol(default=None)
   src = StringCol(length=1000, default=None)
   dest = StringCol(length=1000, default=None)
@@ -1201,7 +1201,7 @@
   sourceScopeId = BigIntCol(default=None)
   sourceObjectId = BigIntCol(default=None)
   source_ids_unique = DatabaseIndex(sourceScopeId, sourceObjectId, unique=True)
-  qmfClassKey = BLOBCol(default=None)
+  qmfClassKey = StringCol(length=1000, default=None)
   creationTime = TimestampCol(default=None)
   deletionTime = TimestampCol(default=None)
   managedBroker = StringCol(length=1000, default=None)
@@ -1210,9 +1210,9 @@
   classInfos = dict() # brokerId => classInfo
   vhost = ForeignKey('Vhost', cascade='null', default=None)
   name = StringCol(length=1000, default=None)
-  channelId = SmallIntCol(default=None)
+  channelId = IntCol(default=None)
   clientConnection = ForeignKey('ClientConnection', cascade='null', default=None)
-  detachedLifespan = IntCol(default=None)
+  detachedLifespan = BigIntCol(default=None)
   attached = BoolCol(default=None)
   expireTime = TimestampCol(default=None)
 
@@ -1265,7 +1265,7 @@
   sourceScopeId = BigIntCol(default=None)
   sourceObjectId = BigIntCol(default=None)
   source_ids_unique = DatabaseIndex(sourceScopeId, sourceObjectId, unique=True)
-  qmfClassKey = BLOBCol(default=None)
+  qmfClassKey = StringCol(length=1000, default=None)
   creationTime = TimestampCol(default=None)
   deletionTime = TimestampCol(default=None)
   managedBroker = StringCol(length=1000, default=None)
@@ -1278,8 +1278,8 @@
   release = StringCol(length=1000, default=None)
   version = StringCol(length=1000, default=None)
   machine = StringCol(length=1000, default=None)
-  memTotal = IntCol(default=None)
-  swapTotal = IntCol(default=None)
+  memTotal = BigIntCol(default=None)
+  swapTotal = BigIntCol(default=None)
 
 
 class SysimageStats(SQLObject):
@@ -1289,13 +1289,13 @@
   recTime = TimestampCol(default=None)
   sysimage = ForeignKey('Sysimage', cascade='null', default=None)
   classInfos = dict() # brokerId => classInfo
-  memFree = IntCol(default=None)
-  swapFree = IntCol(default=None)
+  memFree = BigIntCol(default=None)
+  swapFree = BigIntCol(default=None)
   loadAverage1Min = FloatCol(default=None)
   loadAverage5Min = FloatCol(default=None)
   loadAverage10Min = FloatCol(default=None)
-  procTotal = IntCol(default=None)
-  procRunning = IntCol(default=None)
+  procTotal = BigIntCol(default=None)
+  procRunning = BigIntCol(default=None)
 
 
 

Modified: mgmt/trunk/mint/python/mint/schemaparser.py
===================================================================
--- mgmt/trunk/mint/python/mint/schemaparser.py	2008-11-24 19:41:05 UTC (rev 2875)
+++ mgmt/trunk/mint/python/mint/schemaparser.py	2008-11-24 22:44:41 UTC (rev 2876)
@@ -21,8 +21,10 @@
     self.dataTypesMap["uuid"] = "BLOBCol"
     self.dataTypesMap["int32"] = "IntCol"
     self.dataTypesMap["uint8"] = self.dataTypesMap["hilo8"] = self.dataTypesMap["count8"] = self.dataTypesMap["mma8"] = "SmallIntCol"
-    self.dataTypesMap["uint16"] = self.dataTypesMap["hilo16"] = self.dataTypesMap["count16"] = self.dataTypesMap["mma16"] = "SmallIntCol"
-    self.dataTypesMap["uint32"] = self.dataTypesMap["hilo32"] = self.dataTypesMap["count32"] = self.dataTypesMap["mma32"] = self.dataTypesMap["atomic32"] = "IntCol"
+    self.dataTypesMap["hilo16"] = self.dataTypesMap["count16"] = self.dataTypesMap["mma16"] = "SmallIntCol"
+    self.dataTypesMap["uint16"] = "IntCol"
+    self.dataTypesMap["hilo32"] = self.dataTypesMap["count32"] = self.dataTypesMap["mma32"] = self.dataTypesMap["atomic32"] = "IntCol"
+    self.dataTypesMap["uint32"] = "BigIntCol"
     self.dataTypesMap["uint64"] = self.dataTypesMap["hilo64"] = self.dataTypesMap["count64"] = self.dataTypesMap["mma64"] = self.dataTypesMap["mmaTime"] = "BigIntCol"
     self.dataTypesMap["float"] = self.dataTypesMap["double"] = "FloatCol"
     self.dataTypesMap["absTime"] = "TimestampCol"
@@ -144,7 +146,7 @@
       self.generateAttrib("sourceScopeId", "BigIntCol")
       self.generateAttrib("sourceObjectId", "BigIntCol")
       self.generateSourceIdsIndex(pythonName)
-      self.generateAttrib("qmfClassKey", "BLOBCol")
+      self.generateAttrib("qmfClassKey", "StringCol", "length=1000")
       self.generateTimestampAttrib("creation")
       self.generateTimestampAttrib("deletion")
       self.generateAttrib("managedBroker", "StringCol", "length=1000")

Modified: mgmt/trunk/mint/python/mint/update.py
===================================================================
--- mgmt/trunk/mint/python/mint/update.py	2008-11-24 19:41:05 UTC (rev 2875)
+++ mgmt/trunk/mint/python/mint/update.py	2008-11-24 22:44:41 UTC (rev 2876)
@@ -1,49 +1,48 @@
+import sys
 import logging
 import datetime
 import types
-from Queue import Full, Empty
+import pickle
+import psycopg2
+from Queue import Queue as ConcurrentQueue, Full, Empty
 from threading import Thread
 from traceback import print_exc
 from qpid.datatypes import UUID
 from mint.schema import *
-from mint.priqueue import PriQueue as ConcurrentQueue
+from time import clock
+from sql import *
+from collections import deque
 
 log = logging.getLogger("mint.update")
 
-def time_unwarp(t):
-    """ don't allow future dates """
-    
-    tnow = datetime.now()
-    return t > tnow and tnow or t
-    
 class ModelUpdateThread(Thread):
   def __init__(self, model):
     super(ModelUpdateThread, self).__init__()
 
     self.model = model
-    self.updates = ConcurrentQueue(slotCount=2)
+    self.updates = UpdateQueue(slotCount=2)
     self.stopRequested = False
-    self.setDaemon(False)
 
     self.enqueueCount = 0
     self.dequeueCount = 0
-
     self.commitThreshold = 100
 
-  def enqueue(self, update, priority=0):
+    self.setDaemon(False)
+
+  def enqueue(self, update):
     try:
-      self.updates.put((priority, update))
+      self.updates.put(update)
 
       self.enqueueCount += 1
     except Full:
       log.exception("Queue is full")
-      pass
 
   def run(self):
     conn = self.model.dbConn.getConnection()
+
     while True:
       try:
-        priority, update = self.updates.get(True, 1)
+        update = self.updates.get(True, 1)
 
         self.dequeueCount += 1
 
@@ -57,225 +56,331 @@
           continue
 
       try:
-        update.process(self.model, conn)
-        if self.dequeueCount % self.commitThreshold == 0 or self.updates.qsize() == 0:
-          # commit only every "commitThreshold" updates, or whenever the update queue is empty
-          conn.commit()
-          self.model.cache.commit()
+        update.process(conn)
+
+        if self.dequeueCount % self.commitThreshold == 0 \
+                or self.updates.qsize() == 0:
+          # commit only every "commitThreshold" updates, or whenever
+          # the update queue is empty
+
+          if profile:
+            start = clock()
+            conn.commit()
+            self.model.cache.commit()
+            profile.commitTime += clock() - start
+          else:
+            conn.commit()
+            self.model.cache.commit()
       except:
         conn.rollback()  
         self.model.cache.rollback()
         log.exception("Update failed")
-        pass
 
   def stop(self):
     self.stopRequested = True
 
+class ReferenceException(Exception):
+    def __init__(self, sought):
+        self.sought = sought
+
+    def __str__(self):
+        return repr(self.sought)
+
 class ModelUpdate(object):
-  def __init__(self, broker, obj):
+  def __init__(self, model, broker, object):
+    self.model = model
     self.broker = broker
-    self.qmfObj = obj
+    self.object = object
+    self.priority = 0
 
-  def getStatsClass(self, cls):
-    return cls + "Stats"
-  
-  def process(self, model, conn):
+  def getClass(self):
+    # XXX this is unfortunate
+    name = self.object.getClassKey().getClassName()
+    name = mint.schema.schemaReservedWordsMap.get(name, name)
+    name = name[0].upper() + name[1:]
+
+    try:
+      return schemaNameToClassMap[name]
+    except KeyError:
+      raise ReferenceException(name)
+
+  def process(self, conn):
     pass
 
-  def processAttributes(self, attrs, cls, model, conn):
-    results = {}
-    orphan = False
+  def processAttributes(self, attrs, cls):
+    results = dict()
 
-    for (key, value) in attrs:
+    for key, value in attrs:
       name = key.__repr__()
-      if name in mint.schema.schemaReservedWordsMap:
-        name = mint.schema.schemaReservedWordsMap.get(name)
+      name = mint.schema.schemaReservedWordsMap.get(name, name)
       
       if key.type == 10:
-        # Navigate to referenced objects
-        if name.endswith("Ref"):
-          name = name[:-3]
-        className = name[0].upper() + name[1:]
-        otherClass = getattr(mint, className, None)
-        if otherClass:
-          foreignKey = name + "_id"
-          valueFirst = value.first
-          valueSecond = value.second
-          cachedId = model.cache.get((valueFirst, valueSecond))
-          if cachedId != None:
-            results[foreignKey] = cachedId
-          else:
-            model.addOrphanObject(className, valueFirst, valueSecond, self)  
-            orphan = True
-        else:
-          log.error("Class '%s' not found" % className)
+        self.processReference(name, value, results)
       elif key.type == 8:
-        # convert ABSTIME types
-        if value:
-          results[name] = time_unwarp(datetime.fromtimestamp(value/1000000000))
-        else:
-          results[name] = None
+        self.processTimestamp(name, value, results)
       elif key.type == 14:
-        # convert UUIDs into their string representation, to be handled by sqlobject
+        # Convert UUIDs into their string representation, to be
+        # handled by sqlobject
         results[name] = str(value)
-      elif not hasattr(getattr(mint, cls), name):
+      elif key.type == 15:
+        results[name] = pickle.dumps(value)
+      elif not hasattr(cls, name):
         # Discard attrs that we don't have in our schema
-        log.debug("Class '%s' has no field '%s'" % (cls, name))
+        log.debug("%s has no field '%s'" % (cls, name))
       else:
         results[name] = value
-    if orphan:
-      return None
-    else:
-      return results
 
+    return results
+
+  # XXX this needs to be a much more straightforward procedure
+  def processReference(self, name, oid, results):
+    if name.endswith("Ref"):
+      name = name[:-3]
+
+    className = name[0].upper() + name[1:]
+    otherClass = getattr(mint, className)
+
+    foreignKey = name + "_id"
+
+    id = self.model.cache.get(oid)
+
+    if id is None:
+      raise ReferenceException(oid)
+
+    results[foreignKey] = id
+
+  def processTimestamp(self, name, tstamp, results):
+    if tstamp:
+      tnow = datetime.now()
+      t = datetime.fromtimestamp(tstamp / 1000000000)
+      t = t > tnow and tnow or t
+
+      results[name] = t
+
 class PropertyUpdate(ModelUpdate):
-  def __init__(self, broker, obj):
-    ModelUpdate.__init__(self, broker, obj)
+  def process(self, conn):
+    try:
+      cls = self.getClass()
+    except ReferenceException, e:
+      log.info("Referenced class %r not found", e.sought)
+      return
 
-  def process(self, model, conn):
-    clsKey = self.qmfObj.getClassKey()
-    origCls = cls = clsKey.getClassName()
-    if cls in mint.schema.schemaReservedWordsMap:
-      cls = mint.schema.schemaReservedWordsMap.get(cls)
-    cls = cls[0].upper()+cls[1:]  
-    sqlCls = model.dbStyle.pythonClassToDBTable(cls)
+    oid = self.object.getObjectId()
 
-    if not hasattr(mint, cls):
-      # Discard classes that we don't have in our schema
-      log.debug("Class '%s' is not in the schema" % (cls))
-      return  
+    try:
+      attrs = self.processAttributes(self.object.getProperties(), cls)
+    except ReferenceException, e:
+      log.info("Referenced object %r not found", e.sought)
 
-    properties = self.qmfObj.getProperties()
-    timestamps = self.qmfObj.getTimestamps()
-    id = self.qmfObj.getObjectId()
-    idFirst = id.first
-    idSecond = id.second
+      try:
+        orphans = self.model.orphans[oid]
+        orphans.append(self)
+      except KeyError:
+        self.model.orphans[oid] = list((self,))
 
-    attrs = self.processAttributes(properties, cls, model, conn)
-    if attrs == None:
-      # object is orphan, a parent dependency was not found; 
-      # insertion in db is deferred until parent info is received
-      return 
+      return
 
-    attrs["recTime"] = time_unwarp(datetime.fromtimestamp(timestamps[0]/1000000000))
-    attrs["creationTime"] = time_unwarp(datetime.fromtimestamp(timestamps[1]/1000000000))
+    timestamps = self.object.getTimestamps()
+
+    self.processTimestamp("recTime", timestamps[0], attrs)
+    self.processTimestamp("creationTime", timestamps[1], attrs)
+
     if timestamps[2] != 0:
-      attrs["deletionTime"] = time_unwarp(datetime.fromtimestamp(timestamps[2]/1000000000))
-      log.debug("%s(%s) marked deleted", cls, id)
+      self.processTimestamp("deletionTime", timestamps[2], attrs)
 
-    attrs["sourceScopeId"] = idFirst
-    attrs["sourceObjectId"] = idSecond
-    attrs["qmfClassKey"] = clsKey
+      log.debug("%s(%s) marked deleted", cls.__name__, oid)
+
+    attrs["sourceScopeId"] = oid.first
+    attrs["sourceObjectId"] = oid.second
+    attrs["qmfClassKey"] = str(self.object.getClassKey())
     attrs["managedBroker"] = str(self.broker.getBrokerId())
 
     cursor = conn.cursor()
-    sql = model.generateSQLUpdate(sqlCls, attrs, id)
-    #log.debug("SQL: %s", sql)
-    cursor.execute(sql)
-    isUpdate = cursor.rowcount > 0
-    if not isUpdate:
-      # update failed, need to insert  
-      sql = model.generateSQLInsert(sqlCls, attrs)
-      #log.debug("SQL: %s", sql)
-      cursor.execute(sql)
-      log.debug("%s(%s) created", cls, id)
- 
-    dbId = model.cache.get((idFirst, idSecond))
-    if dbId is None:
-      if isUpdate:
-        sql = model.generateSQLSelect(sqlCls, id)
-      else:
-        sql = "select currval('%s_id_seq')" % (sqlCls)
-      #log.debug("SQL: %s", sql)
-      cursor.execute(sql)
+
+    # Cases:
+    #
+    # 1. Object is utterly new to mint
+    # 2. Object is in mint's db, but id is not yet cached
+    # 3. Object is in mint's db, and id is cached
+
+    id = self.model.cache.get(oid)
+
+    if id is None:
+      # Case 1 or 2
+
+      op = SqlGetId(cls)
+      op.execute(cursor, attrs)
+
       rec = cursor.fetchone()
-      dbId = rec[0]
-      model.cache.set((idFirst, idSecond), dbId)
 
-    if cls == "Broker" and str(self.broker.getBrokerId()) in model.managedBrokers:
-      broker, dbObjId = model.managedBrokers[str(self.broker.getBrokerId())]
-      if dbObjId == 0: 
-        sql = model.generateSQLUpdateWhere("broker_registration", \
-                {"broker_id": dbId}, {"url": self.broker.getFullUrl()})
-        #log.debug("SQL: %s", sql)
-        cursor.execute(sql)
+      if rec:
+        id = rec[0]
 
-        sql = """
-          update broker
-          set registration_id =
-            (select id from broker_registration where broker_id = %(id)s)
-          where id = %(id)s
-        """
+      if id is None:
+        # Case 1
 
-        cursor.execute(sql, {"id": dbId})
+        op = SqlInsert(cls, attrs)
+        op.execute(cursor, attrs)
 
-        model.managedBrokers[str(self.broker.getBrokerId())] = (broker, dbId)
+        id = cursor.fetchone()[0]
 
-    model.requeueIfOrphanParent(cls, idFirst, idSecond)
+        log.debug("%s(%i) created", cls.__name__, id)
 
+        if cls is Broker:
+          self.processBroker(cursor, id)
+      else:
+        # Case 2
+
+        attrs["id"] = id
+
+        op = SqlUpdate(cls, attrs)
+        op.execute(cursor, attrs)
+
+        assert cursor.rowcount == 1
+
+      self.model.cache.set(oid, id)
+    else:
+      # Case 3
+
+      attrs["id"] = id
+      
+      op = SqlUpdate(cls, attrs)
+      op.execute(cursor, attrs)
+
+      assert cursor.rowcount == 1
+
+    try:
+      orphans = self.model.orphans.pop(oid)
+
+      if orphans:
+        log.info("Re-enqueueing %i orphans whose creation had been deferred",
+                 len(orphans))
+
+        for orphan in orphans:
+          self.model.updateThread.enqueue(orphan)
+    except KeyError:
+      pass
+
+  def processBroker(self, cursor, id):
+    brokerId = str(self.broker.getBrokerId())
+
+    if brokerId in self.model.managedBrokers:
+      broker, dbObjId = self.model.managedBrokers[brokerId]
+
+      #print "broker, dbObjId", broker, dbObjId
+
+      if dbObjId == 0:
+        op = SqlGetBrokerRegistration()
+        op.execute(cursor, {"url": self.broker.getFullUrl()})
+
+        #print op.text % {"url": self.broker.getFullUrl()}
+
+        rec = cursor.fetchone()
+
+        if rec:
+          rid = rec[0]
+
+          op = SqlAttachBroker()
+          op.execute(cursor, {"id": id, "registrationId": rid})
+
+          #print op.text % {"id": id, "registrationId": rid}
+
+        self.model.managedBrokers[brokerId] = (broker, id)
+
+
+
 class StatisticUpdate(ModelUpdate):
-  def __init__(self, broker, obj):
-    ModelUpdate.__init__(self, broker, obj)
+  def process(self, conn):
+    try:
+      cls = self.getClass()
+    except ReferenceException, e:
+      log.info("Referenced class %r not found", e.sought)
+      return
 
-  def process(self, model, conn):
-    origCls = cls = self.qmfObj.getClassKey().getClassName()
-    if cls in mint.schema.schemaReservedWordsMap:
-      cls = mint.schema.schemaReservedWordsMap.get(cls)
-    cls = cls[0].upper()+cls[1:]  
-    sqlCls = model.dbStyle.pythonClassToDBTable(cls)
+    statsCls = getattr(mint, "%sStats" % cls.__name__)
 
-    if not hasattr(mint, cls):
-      # Discard classes that we don't have in our schema
-      log.debug("Class '%s' is not in the schema" % (cls))
-      return  
+    oid = self.object.getObjectId()
+    id = self.model.cache.get(oid)
 
-    statistics = self.qmfObj.getStatistics()
-    timestamps = self.qmfObj.getTimestamps()
-    id = self.qmfObj.getObjectId()
-    idFirst = id.first
-    idSecond = id.second
+    if id is None:
+      # Just drop it; we'll get more stats later
+      return
 
-    statsCls = self.getStatsClass(cls)
-    sqlStatsCls = model.dbStyle.pythonClassToDBTable(statsCls)
+    try:
+      attrs = self.processAttributes(self.object.getStatistics(), statsCls)
+    except ReferenceException:
+      # Drop it
+      return
 
-    attrs = self.processAttributes(statistics, statsCls, model, conn)
-    if attrs == None:
-      # object is orphan, a parent dependency was not found; 
-      # insertion in db is deferred until parent info is received
-      return 
+    timestamps = self.object.getTimestamps()
 
-    attrs["recTime"] = time_unwarp(datetime.fromtimestamp(timestamps[0]/1000000000))
-    cachedId = model.cache.get((idFirst, idSecond))
-    if cachedId != None:
-      attrs[sqlCls + "_id"] = cachedId
-    else:
-      model.addOrphanObject(cls, idFirst, idSecond, self)
-      
+    self.processTimestamp("recTime", timestamps[0], attrs)
+
+    attrs["%s_id" % cls.sqlmeta.table] = id
+
     cursor = conn.cursor()
-    sql = model.generateSQLInsert(sqlStatsCls, attrs)
-    #log.debug("SQL: %s", sql)
-    cursor.execute(sql)
 
-    sql = "select currval('%s_id_seq')" % (sqlStatsCls)
-    #log.debug("SQL: %s", sql)
-    cursor.execute(sql)
-    rec = cursor.fetchone()
-    dbStatsId = rec[0]  
-    log.debug("%s(%s) created", statsCls, id)
+    op = SqlInsert(statsCls, attrs)
+    op.execute(cursor, attrs)
 
-    sql = model.generateSQLUpdate(sqlCls, {"stats_curr_id": dbStatsId}, id, updateStats=True)
-    #log.debug("SQL: %s", sql)
-    cursor.execute(sql)
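+    # SqlInsert ends by selecting the new row's id (the sequence currval,
+    # as the inline SQL above did), so it can be read from this cursor.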
+    statsId = cursor.fetchone()[0]
 
+    log.debug("%s(%s) created", statsCls.__name__, id)
+
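+    # Update the owning object's stats_curr_id/stats_prev_id references
+    # to account for this new sample.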
+    op = SqlSetStatsRefs(cls)
+    op.execute(cursor, {"statsId": statsId, "id": id})
+
 class MethodUpdate(ModelUpdate):
   def __init__(self, broker, seq, response):
-    ModelUpdate.__init__(self, broker, response)
+    super(MethodUpdate, self).__init__(broker, response)
+
     self.seq = seq
 
-  def process(self, model, conn):
-    model.lock()
+  def process(self, conn):
+    self.model.lock()
+
     try:
-      methodCallback = model.outstandingMethodCalls.pop(self.seq)
-      methodCallback(self.qmfObj.text, self.qmfObj.outArgs)
+      methodCallback = self.model.outstandingMethodCalls.pop(self.seq)
+      methodCallback(self.object.text, self.object.outArgs)
     finally:
-      model.unlock()
+      self.model.unlock()
+
+class UpdateQueue(ConcurrentQueue):
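+  # A queue with one FIFO slot per priority level: _put files each update
+  # under its priority; _get serves lower-numbered slots first.  The
+  # underscore methods are the storage hooks that the Queue-style base
+  # class (mirroring Python's Queue) calls with its lock held.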
+  def __init__(self, maxsize=0, slotCount=1):
+    self.slotCount = slotCount
+    ConcurrentQueue.__init__(self, maxsize)
+
+  def _init(self, maxsize):
+    self.maxsize = maxsize
+    self.slots = []
+
+    for i in range(self.slotCount):
+      self.slots.append(deque())
+
+  def _qsize(self):
+    size = 0
+
+    for i in range(self.slotCount):
+      size += len(self.slots[i])
+
+    return size
+
+  def _empty(self):
+    return self._qsize() == 0
+
+  def _full(self):
+    return self.maxsize > 0 and self._qsize() == self.maxsize
+
+  def _put(self, update):
+    slot = update.priority
+
+    if 0 <= slot < self.slotCount:
+      self.slots[slot].append(update)
+    else:
+      raise ValueError("Invalid priority slot")
+
+  def _get(self):
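+    # Serve the lowest-numbered non-empty slot: highest priority first.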
+    for slot in range(self.slotCount):
+      if len(self.slots[slot]) > 0:
+        return self.slots[slot].popleft()
+    return None

Modified: mgmt/trunk/mint/sql/schema.sql
===================================================================
--- mgmt/trunk/mint/sql/schema.sql	2008-11-24 19:41:05 UTC (rev 2875)
+++ mgmt/trunk/mint/sql/schema.sql	2008-11-24 22:44:41 UTC (rev 2876)
@@ -67,7 +67,7 @@
     rec_time TIMESTAMP,
     source_scope_id BIGINT,
     source_object_id BIGINT,
-    qmf_class_key BYTEA,
+    qmf_class_key VARCHAR(1000),
     creation_time TIMESTAMP,
     deletion_time TIMESTAMP,
     managed_broker VARCHAR(1000),
@@ -93,7 +93,7 @@
     rec_time TIMESTAMP,
     source_scope_id BIGINT,
     source_object_id BIGINT,
-    qmf_class_key BYTEA,
+    qmf_class_key VARCHAR(1000),
     creation_time TIMESTAMP,
     deletion_time TIMESTAMP,
     managed_broker VARCHAR(1000),
@@ -103,8 +103,8 @@
     label VARCHAR(1000),
     broker_id INT,
     system_id BYTEA,
-    broker_bank INT,
-    agent_bank INT
+    broker_bank BIGINT,
+    agent_bank BIGINT
 );
 CREATE UNIQUE INDEX agent_source_ids_unique ON agent (source_scope_id, source_object_id);
 
@@ -119,7 +119,7 @@
     rec_time TIMESTAMP,
     source_scope_id BIGINT,
     source_object_id BIGINT,
-    qmf_class_key BYTEA,
+    qmf_class_key VARCHAR(1000),
     creation_time TIMESTAMP,
     deletion_time TIMESTAMP,
     managed_broker VARCHAR(1000),
@@ -145,14 +145,14 @@
     rec_time TIMESTAMP,
     source_scope_id BIGINT,
     source_object_id BIGINT,
-    qmf_class_key BYTEA,
+    qmf_class_key VARCHAR(1000),
     creation_time TIMESTAMP,
     deletion_time TIMESTAMP,
     managed_broker VARCHAR(1000),
     stats_curr_id INT,
     stats_prev_id INT,
     link_id INT,
-    channel_id SMALLINT,
+    channel_id INT,
     durable BOOL,
     src VARCHAR(1000),
     dest VARCHAR(1000),
@@ -176,19 +176,19 @@
     rec_time TIMESTAMP,
     source_scope_id BIGINT,
     source_object_id BIGINT,
-    qmf_class_key BYTEA,
+    qmf_class_key VARCHAR(1000),
     creation_time TIMESTAMP,
     deletion_time TIMESTAMP,
     managed_broker VARCHAR(1000),
     stats_curr_id INT,
     stats_prev_id INT,
     system_id INT,
-    port INT,
-    worker_threads SMALLINT,
-    max_conns SMALLINT,
-    conn_backlog SMALLINT,
-    staging_threshold INT,
-    mgmt_pub_interval SMALLINT,
+    port BIGINT,
+    worker_threads INT,
+    max_conns INT,
+    conn_backlog INT,
+    staging_threshold BIGINT,
+    mgmt_pub_interval INT,
     version VARCHAR(1000),
     data_dir VARCHAR(1000),
     registration_id INT
@@ -206,7 +206,7 @@
     rec_time TIMESTAMP,
     source_scope_id BIGINT,
     source_object_id BIGINT,
-    qmf_class_key BYTEA,
+    qmf_class_key VARCHAR(1000),
     creation_time TIMESTAMP,
     deletion_time TIMESTAMP,
     managed_broker VARCHAR(1000),
@@ -237,7 +237,7 @@
     rec_time TIMESTAMP,
     source_scope_id BIGINT,
     source_object_id BIGINT,
-    qmf_class_key BYTEA,
+    qmf_class_key VARCHAR(1000),
     creation_time TIMESTAMP,
     deletion_time TIMESTAMP,
     managed_broker VARCHAR(1000),
@@ -247,7 +247,7 @@
     cluster_name VARCHAR(1000),
     cluster_id VARCHAR(1000),
     published_ur_l VARCHAR(1000),
-    cluster_size SMALLINT,
+    cluster_size INT,
     status VARCHAR(1000),
     members VARCHAR(4000)
 );
@@ -264,7 +264,7 @@
     rec_time TIMESTAMP,
     source_scope_id BIGINT,
     source_object_id BIGINT,
-    qmf_class_key BYTEA,
+    qmf_class_key VARCHAR(1000),
     creation_time TIMESTAMP,
     deletion_time TIMESTAMP,
     managed_broker VARCHAR(1000),
@@ -290,7 +290,7 @@
     rec_time TIMESTAMP,
     source_scope_id BIGINT,
     source_object_id BIGINT,
-    qmf_class_key BYTEA,
+    qmf_class_key VARCHAR(1000),
     creation_time TIMESTAMP,
     deletion_time TIMESTAMP,
     managed_broker VARCHAR(1000),
@@ -327,7 +327,7 @@
     rec_time TIMESTAMP,
     source_scope_id BIGINT,
     source_object_id BIGINT,
-    qmf_class_key BYTEA,
+    qmf_class_key VARCHAR(1000),
     creation_time TIMESTAMP,
     deletion_time TIMESTAMP,
     managed_broker VARCHAR(1000),
@@ -337,29 +337,29 @@
     submitter_id INT,
     accounting_group VARCHAR(1000),
     args VARCHAR(4000),
-    cluster_id INT,
+    cluster_id BIGINT,
     cmd VARCHAR(4000),
     concurrency_limits VARCHAR(4000),
     custom_group VARCHAR(1000),
     custom_id VARCHAR(1000),
-    custom_priority INT,
+    custom_priority BIGINT,
     global_job_id VARCHAR(1000),
     in_rsv VARCHAR(4000),
     iwd VARCHAR(4000),
-    job_status INT,
+    job_status BIGINT,
     note VARCHAR(4000),
     out VARCHAR(4000),
     owner VARCHAR(1000),
     grid_user VARCHAR(1000),
-    proc_id INT,
+    proc_id BIGINT,
     q_date TIMESTAMP,
-    job_universe INT,
+    job_universe BIGINT,
     title VARCHAR(1000),
     user_log VARCHAR(4000),
     hold_reason VARCHAR(4000),
     dag_node_name VARCHAR(1000),
     dag_parent_node_names VARCHAR(4000),
-    dag_man_job_id INT,
+    dag_man_job_id BIGINT,
     ad BYTEA
 );
 CREATE UNIQUE INDEX job_source_ids_unique ON job (source_scope_id, source_object_id);
@@ -375,7 +375,7 @@
     rec_time TIMESTAMP,
     source_scope_id BIGINT,
     source_object_id BIGINT,
-    qmf_class_key BYTEA,
+    qmf_class_key VARCHAR(1000),
     creation_time TIMESTAMP,
     deletion_time TIMESTAMP,
     managed_broker VARCHAR(1000),
@@ -385,13 +385,13 @@
     name VARCHAR(1000),
     directory VARCHAR(1000),
     base_file_name VARCHAR(1000),
-    write_page_size INT,
-    write_pages INT,
-    read_page_size INT,
-    read_pages INT,
-    initial_file_count SMALLINT,
-    data_file_size INT,
-    current_file_count INT
+    write_page_size BIGINT,
+    write_pages BIGINT,
+    read_page_size BIGINT,
+    read_pages BIGINT,
+    initial_file_count INT,
+    data_file_size BIGINT,
+    current_file_count BIGINT
 );
 CREATE UNIQUE INDEX journal_source_ids_unique ON journal (source_scope_id, source_object_id);
 
@@ -434,7 +434,7 @@
     rec_time TIMESTAMP,
     source_scope_id BIGINT,
     source_object_id BIGINT,
-    qmf_class_key BYTEA,
+    qmf_class_key VARCHAR(1000),
     creation_time TIMESTAMP,
     deletion_time TIMESTAMP,
     managed_broker VARCHAR(1000),
@@ -442,7 +442,7 @@
     stats_prev_id INT,
     vhost_id INT,
     host VARCHAR(1000),
-    port INT,
+    port BIGINT,
     transport VARCHAR(1000),
     durable BOOL
 );
@@ -461,7 +461,7 @@
     rec_time TIMESTAMP,
     source_scope_id BIGINT,
     source_object_id BIGINT,
-    qmf_class_key BYTEA,
+    qmf_class_key VARCHAR(1000),
     creation_time TIMESTAMP,
     deletion_time TIMESTAMP,
     managed_broker VARCHAR(1000),
@@ -484,11 +484,11 @@
     id SERIAL PRIMARY KEY,
     rec_time TIMESTAMP,
     master_id INT,
-    monitor_self_age INT,
+    monitor_self_age BIGINT,
     monitor_self_cpu_usage FLOAT,
     monitor_self_image_size FLOAT,
-    monitor_self_registered_socket_count INT,
-    monitor_self_resident_set_size INT,
+    monitor_self_registered_socket_count BIGINT,
+    monitor_self_resident_set_size BIGINT,
     monitor_self_time TIMESTAMP
 );
 
@@ -497,7 +497,7 @@
     rec_time TIMESTAMP,
     source_scope_id BIGINT,
     source_object_id BIGINT,
-    qmf_class_key BYTEA,
+    qmf_class_key VARCHAR(1000),
     creation_time TIMESTAMP,
     deletion_time TIMESTAMP,
     managed_broker VARCHAR(1000),
@@ -518,11 +518,11 @@
     id SERIAL PRIMARY KEY,
     rec_time TIMESTAMP,
     negotiator_id INT,
-    monitor_self_age INT,
+    monitor_self_age BIGINT,
     monitor_self_cpu_usage FLOAT,
     monitor_self_image_size FLOAT,
-    monitor_self_registered_socket_count INT,
-    monitor_self_resident_set_size INT,
+    monitor_self_registered_socket_count BIGINT,
+    monitor_self_resident_set_size BIGINT,
     monitor_self_time TIMESTAMP
 );
 
@@ -536,7 +536,7 @@
     rec_time TIMESTAMP,
     source_scope_id BIGINT,
     source_object_id BIGINT,
-    qmf_class_key BYTEA,
+    qmf_class_key VARCHAR(1000),
     creation_time TIMESTAMP,
     deletion_time TIMESTAMP,
     managed_broker VARCHAR(1000),
@@ -589,7 +589,7 @@
     rec_time TIMESTAMP,
     source_scope_id BIGINT,
     source_object_id BIGINT,
-    qmf_class_key BYTEA,
+    qmf_class_key VARCHAR(1000),
     creation_time TIMESTAMP,
     deletion_time TIMESTAMP,
     managed_broker VARCHAR(1000),
@@ -598,7 +598,7 @@
     pool VARCHAR(1000),
     system VARCHAR(1000),
     job_queue_birthdate TIMESTAMP,
-    max_jobs_running INT,
+    max_jobs_running BIGINT,
     machine VARCHAR(1000),
     my_address VARCHAR(1000),
     name VARCHAR(1000),
@@ -613,17 +613,17 @@
     id SERIAL PRIMARY KEY,
     rec_time TIMESTAMP,
     scheduler_id INT,
-    num_users INT,
-    total_held_jobs INT,
-    total_idle_jobs INT,
-    total_job_ads INT,
-    total_removed_jobs INT,
-    total_running_jobs INT,
-    monitor_self_age INT,
+    num_users BIGINT,
+    total_held_jobs BIGINT,
+    total_idle_jobs BIGINT,
+    total_job_ads BIGINT,
+    total_removed_jobs BIGINT,
+    total_running_jobs BIGINT,
+    monitor_self_age BIGINT,
     monitor_self_cpu_usage FLOAT,
     monitor_self_image_size FLOAT,
-    monitor_self_registered_socket_count INT,
-    monitor_self_resident_set_size INT,
+    monitor_self_registered_socket_count BIGINT,
+    monitor_self_resident_set_size BIGINT,
     monitor_self_time TIMESTAMP
 );
 
@@ -632,7 +632,7 @@
     rec_time TIMESTAMP,
     source_scope_id BIGINT,
     source_object_id BIGINT,
-    qmf_class_key BYTEA,
+    qmf_class_key VARCHAR(1000),
     creation_time TIMESTAMP,
     deletion_time TIMESTAMP,
     managed_broker VARCHAR(1000),
@@ -640,9 +640,9 @@
     stats_prev_id INT,
     vhost_id INT,
     name VARCHAR(1000),
-    channel_id SMALLINT,
+    channel_id INT,
     client_connection_id INT,
-    detached_lifespan INT,
+    detached_lifespan BIGINT,
     attached BOOL,
     expire_time TIMESTAMP
 );
@@ -664,7 +664,7 @@
     rec_time TIMESTAMP,
     source_scope_id BIGINT,
     source_object_id BIGINT,
-    qmf_class_key BYTEA,
+    qmf_class_key VARCHAR(1000),
     creation_time TIMESTAMP,
     deletion_time TIMESTAMP,
     managed_broker VARCHAR(1000),
@@ -677,20 +677,20 @@
     checkpoint_platform VARCHAR(1000),
     client_machine VARCHAR(1000),
     concurrency_limits VARCHAR(1000),
-    cpus INT,
+    cpus BIGINT,
     current_rank FLOAT,
-    disk INT,
+    disk BIGINT,
     file_system_domain VARCHAR(1000),
     global_job_id VARCHAR(1000),
-    image_size INT,
+    image_size BIGINT,
     is_valid_checkpoint_platform VARCHAR(4000),
     job_id VARCHAR(1000),
     job_start TIMESTAMP,
-    k_flops INT,
+    k_flops BIGINT,
     machine VARCHAR(1000),
     max_job_retirement_time VARCHAR(4000),
-    memory INT,
-    mips INT,
+    memory BIGINT,
+    mips BIGINT,
     my_address VARCHAR(1000),
     name VARCHAR(1000),
     op_sys VARCHAR(1000),
@@ -703,23 +703,23 @@
     requirements VARCHAR(4000),
     public_network_ip_addr VARCHAR(1000),
     rank VARCHAR(4000),
-    slot_id INT,
+    slot_id BIGINT,
     start VARCHAR(4000),
     starter_ability_list VARCHAR(4000),
-    total_claim_run_time INT,
-    total_claim_suspend_time INT,
-    total_cpus INT,
-    total_disk INT,
-    total_job_run_time INT,
-    total_job_suspend_time INT,
-    total_memory INT,
-    total_slots INT,
-    total_virtual_memory INT,
+    total_claim_run_time BIGINT,
+    total_claim_suspend_time BIGINT,
+    total_cpus BIGINT,
+    total_disk BIGINT,
+    total_job_run_time BIGINT,
+    total_job_suspend_time BIGINT,
+    total_memory BIGINT,
+    total_slots BIGINT,
+    total_virtual_memory BIGINT,
     uid_domain VARCHAR(1000),
-    virtual_memory INT,
-    windows_build_number INT,
-    windows_major_version INT,
-    windows_minor_version INT,
+    virtual_memory BIGINT,
+    windows_build_number BIGINT,
+    windows_major_version BIGINT,
+    windows_minor_version BIGINT,
     condor_platform VARCHAR(1000),
     condor_version VARCHAR(1000),
     daemon_start_time TIMESTAMP
@@ -731,13 +731,13 @@
     rec_time TIMESTAMP,
     slot_id INT,
     activity VARCHAR(1000),
-    clock_day INT,
-    clock_min INT,
+    clock_day BIGINT,
+    clock_min BIGINT,
     condor_load_avg FLOAT,
-    console_idle INT,
+    console_idle BIGINT,
     entered_current_activity TIMESTAMP,
     entered_current_state TIMESTAMP,
-    keyboard_idle INT,
+    keyboard_idle BIGINT,
     last_benchmark TIMESTAMP,
     last_fetch_work_completed TIMESTAMP,
     last_fetch_work_spawned TIMESTAMP,
@@ -746,27 +746,27 @@
     my_current_time TIMESTAMP,
     next_fetch_work_delay INT,
     state VARCHAR(1000),
-    time_to_live INT,
+    time_to_live BIGINT,
     total_condor_load_avg FLOAT,
     total_load_avg FLOAT,
-    total_time_backfill_busy INT,
-    total_time_backfill_idle INT,
-    total_time_backfill_killing INT,
-    total_time_claimed_busy INT,
-    total_time_claimed_idle INT,
-    total_time_claimed_retiring INT,
-    total_time_claimed_suspended INT,
-    total_time_matched_idle INT,
-    total_time_owner_idle INT,
-    total_time_preempting_killing INT,
-    total_time_preempting_vacating INT,
-    total_time_unclaimed_benchmarking INT,
-    total_time_unclaimed_idle INT,
-    monitor_self_age INT,
+    total_time_backfill_busy BIGINT,
+    total_time_backfill_idle BIGINT,
+    total_time_backfill_killing BIGINT,
+    total_time_claimed_busy BIGINT,
+    total_time_claimed_idle BIGINT,
+    total_time_claimed_retiring BIGINT,
+    total_time_claimed_suspended BIGINT,
+    total_time_matched_idle BIGINT,
+    total_time_owner_idle BIGINT,
+    total_time_preempting_killing BIGINT,
+    total_time_preempting_vacating BIGINT,
+    total_time_unclaimed_benchmarking BIGINT,
+    total_time_unclaimed_idle BIGINT,
+    monitor_self_age BIGINT,
     monitor_self_cpu_usage FLOAT,
     monitor_self_image_size FLOAT,
-    monitor_self_registered_socket_count INT,
-    monitor_self_resident_set_size INT,
+    monitor_self_registered_socket_count BIGINT,
+    monitor_self_resident_set_size BIGINT,
     monitor_self_time TIMESTAMP
 );
 
@@ -775,7 +775,7 @@
     rec_time TIMESTAMP,
     source_scope_id BIGINT,
     source_object_id BIGINT,
-    qmf_class_key BYTEA,
+    qmf_class_key VARCHAR(1000),
     creation_time TIMESTAMP,
     deletion_time TIMESTAMP,
     managed_broker VARCHAR(1000),
@@ -783,15 +783,15 @@
     stats_prev_id INT,
     broker_id INT,
     location VARCHAR(1000),
-    default_initial_file_count SMALLINT,
-    default_data_file_size INT,
+    default_initial_file_count INT,
+    default_data_file_size BIGINT,
     tpl_is_initialized BOOL,
     tpl_directory VARCHAR(1000),
-    tpl_write_page_size INT,
-    tpl_write_pages INT,
-    tpl_initial_file_count SMALLINT,
-    tpl_data_file_size INT,
-    tpl_current_file_count INT
+    tpl_write_page_size BIGINT,
+    tpl_write_pages BIGINT,
+    tpl_initial_file_count INT,
+    tpl_data_file_size BIGINT,
+    tpl_current_file_count BIGINT
 );
 CREATE UNIQUE INDEX store_source_ids_unique ON store (source_scope_id, source_object_id);
 
@@ -815,7 +815,7 @@
     rec_time TIMESTAMP,
     source_scope_id BIGINT,
     source_object_id BIGINT,
-    qmf_class_key BYTEA,
+    qmf_class_key VARCHAR(1000),
     creation_time TIMESTAMP,
     deletion_time TIMESTAMP,
     managed_broker VARCHAR(1000),
@@ -833,9 +833,9 @@
     id SERIAL PRIMARY KEY,
     rec_time TIMESTAMP,
     submitter_id INT,
-    held_jobs INT,
-    idle_jobs INT,
-    running_jobs INT
+    held_jobs BIGINT,
+    idle_jobs BIGINT,
+    running_jobs BIGINT
 );
 
 CREATE TABLE sysimage (
@@ -843,7 +843,7 @@
     rec_time TIMESTAMP,
     source_scope_id BIGINT,
     source_object_id BIGINT,
-    qmf_class_key BYTEA,
+    qmf_class_key VARCHAR(1000),
     creation_time TIMESTAMP,
     deletion_time TIMESTAMP,
     managed_broker VARCHAR(1000),
@@ -855,8 +855,8 @@
     release VARCHAR(1000),
     version VARCHAR(1000),
     machine VARCHAR(1000),
-    mem_total INT,
-    swap_total INT
+    mem_total BIGINT,
+    swap_total BIGINT
 );
 CREATE UNIQUE INDEX sysimage_source_ids_unique ON sysimage (source_scope_id, source_object_id);
 
@@ -864,13 +864,13 @@
     id SERIAL PRIMARY KEY,
     rec_time TIMESTAMP,
     sysimage_id INT,
-    mem_free INT,
-    swap_free INT,
+    mem_free BIGINT,
+    swap_free BIGINT,
     load_average1_min FLOAT,
     load_average5_min FLOAT,
     load_average10_min FLOAT,
-    proc_total INT,
-    proc_running INT
+    proc_total BIGINT,
+    proc_running BIGINT
 );
 
 CREATE TABLE system (
@@ -878,7 +878,7 @@
     rec_time TIMESTAMP,
     source_scope_id BIGINT,
     source_object_id BIGINT,
-    qmf_class_key BYTEA,
+    qmf_class_key VARCHAR(1000),
     creation_time TIMESTAMP,
     deletion_time TIMESTAMP,
     managed_broker VARCHAR(1000),
@@ -904,7 +904,7 @@
     rec_time TIMESTAMP,
     source_scope_id BIGINT,
     source_object_id BIGINT,
-    qmf_class_key BYTEA,
+    qmf_class_key VARCHAR(1000),
     creation_time TIMESTAMP,
     deletion_time TIMESTAMP,
     managed_broker VARCHAR(1000),
