rhmessaging commits: r2877 - mgmt/trunk/mint/python/mint.
by rhmessaging-commits@lists.jboss.org
Author: justi9
Date: 2008-11-25 10:07:55 -0500 (Tue, 25 Nov 2008)
New Revision: 2877
Added:
mgmt/trunk/mint/python/mint/sql.py
Log:
Forgot to add this (important!) file
Added: mgmt/trunk/mint/python/mint/sql.py
===================================================================
--- mgmt/trunk/mint/python/mint/sql.py (rev 0)
+++ mgmt/trunk/mint/python/mint/sql.py 2008-11-25 15:07:55 UTC (rev 2877)
@@ -0,0 +1,220 @@
+import logging, mint
+from time import clock
+from sqlobject import MixedCaseUnderscoreStyle
+
+log = logging.getLogger("mint.sql")
+
+dbStyle = MixedCaseUnderscoreStyle()
+profile = None
+
+def transformTable(table):
+    try:
+        table = mint.schema.schemaReservedWordsMap[table]
+    except KeyError:
+        pass
+
+    table = table[0] + table[1:] # XXX why is this necessary?
+    table = dbStyle.pythonClassToDBTable(table)
+
+    return table
+
+def transformColumn(column):
+    return dbStyle.pythonAttrToDBColumn(column)
+
+class SqlOperation(object):
+    def __init__(self, name):
+        self.name = name
+
+        self.time = None
+        self.text = None
+
+        if profile:
+            profile.ops.append(self)
+
+    def key(self):
+        if hasattr(self, "cls"):
+            return "%s(%s)" % (self.name, getattr(self, "cls").__name__)
+        else:
+            return self.name
+
+    def generate(self):
+        pass
+
+    def execute(self, cursor, values=None):
+        self.text = self.generate()
+
+        try:
+            if profile:
+                start = clock()
+                cursor.execute(self.text, values)
+                self.time = clock() - start
+            else:
+                cursor.execute(self.text, values)
+        except:
+            log.warn("Text: %s", self.text)
+
+            if values:
+                for item in values.items():
+                    log.warn("  %-20s %r", *item)
+
+            raise
+
+class SqlGetId(SqlOperation):
+    def __init__(self, cls):
+        super(SqlGetId, self).__init__("get_id")
+
+        self.cls = cls
+
+    def generate(self):
+        table = self.cls.sqlmeta.table
+
+        return """
+        select id from %s
+        where source_scope_id = %%(sourceScopeId)s
+          and source_object_id = %%(sourceObjectId)s
+        """ % table
+
+class SqlSetStatsRefs(SqlOperation):
+    def __init__(self, cls):
+        super(SqlSetStatsRefs, self).__init__("set_stats_refs")
+
+        self.cls = cls
+
+    def generate(self):
+        table = self.cls.sqlmeta.table
+
+        return """
+        update %s
+        set stats_curr_id = %%(statsId)s, stats_prev_id = stats_curr_id
+        where id = %%(id)s
+        """ % table
+
+class SqlInsert(SqlOperation):
+    def __init__(self, cls, attrs):
+        super(SqlInsert, self).__init__("insert")
+
+        self.cls = cls
+        self.attrs = attrs
+
+    def generate(self):
+        table = self.cls.sqlmeta.table
+
+        cols = list()
+        vals = list()
+
+        for name in self.attrs:
+            cols.append(transformColumn(name))
+            vals.append("%%(%s)s" % name)
+
+        colsSql = ", ".join(cols)
+        valsSql = ", ".join(vals)
+
+        insert = "insert into %s (%s) values (%s)" % (table, colsSql, valsSql)
+        select = "select currval('%s_id_seq')" % table
+
+        sql = "%s; %s" % (insert, select)
+
+        return sql
+
+class SqlUpdate(SqlOperation):
+    def __init__(self, cls, attrs):
+        super(SqlUpdate, self).__init__("update")
+
+        self.cls = cls
+        self.attrs = attrs
+
+    def generate(self):
+        table = self.cls.sqlmeta.table
+
+        elems = list()
+
+        for name in self.attrs:
+            elems.append("%s = %%(%s)s" % (transformColumn(name), name))
+
+        elemsSql = ", ".join(elems)
+
+        sql = "update %s set %s where id = %%(id)s" % (table, elemsSql)
+
+        return sql
+
+class SqlGetBrokerRegistration(SqlOperation):
+    def __init__(self):
+        super(SqlGetBrokerRegistration, self).__init__("get_broker_reg")
+
+    def generate(self):
+        return """
+        select id
+        from broker_registration
+        where url = %(url)s
+        """
+
+class SqlAttachBroker(SqlOperation):
+    def __init__(self):
+        super(SqlAttachBroker, self).__init__("attach_broker")
+
+    def generate(self):
+        return """
+        update broker_registration
+        set broker_id = %(id)s
+        where id = %(registrationId)s;
+        update broker
+        set registration_id = %(registrationId)s
+        where id = %(id)s
+        """
+
+class SqlProfile(object):
+    def __init__(self):
+        self.ops = list()
+        self.commitTime = 0.0
+
+    def report(self):
+        timesByKey = dict()
+
+        executeTime = 0.0
+
+        for op in self.ops:
+            if op.time is not None:
+                executeTime += op.time
+
+            try:
+                times = timesByKey[op.key()]
+
+                if op.time is not None:
+                    times.append(op.time)
+            except KeyError:
+                if op.time is not None:
+                    timesByKey[op.key()] = list((op.time,))
+
+        fmt = "%-40s %9.2f %9.2f %6i"
+        records = list()
+
+        for key, values in timesByKey.items():
+            count = len(values)
+            ttime = sum(values) * 1000
+            atime = ttime / float(count)
+
+            records.append((key, ttime, atime, count))
+
+        print
+
+        srecords = sorted(records, key=lambda x: x[1], reverse=True)
+
+        for i, rec in enumerate(srecords):
+            print fmt % rec
+
+            if i >= 10:
+                break
+
+        print
+
+        srecords = sorted(records, key=lambda x: x[2], reverse=True)
+
+        for i, rec in enumerate(srecords):
+            print fmt % rec
+
+            if i >= 10:
+                break
+
+        print
+        print "Total statement execute time: %9.3f seconds" % executeTime
+        print "Total commit time:            %9.3f seconds" % self.commitTime
rhmessaging commits: r2876 - in mgmt/trunk: mint/bin and 2 other directories.
by rhmessaging-commits@lists.jboss.org
Author: justi9
Date: 2008-11-24 17:44:41 -0500 (Mon, 24 Nov 2008)
New Revision: 2876
Removed:
mgmt/trunk/mint/python/mint/priqueue.py
Modified:
mgmt/trunk/cumin/python/cumin/__init__.py
mgmt/trunk/cumin/python/cumin/model.py
mgmt/trunk/mint/bin/mint-bench
mgmt/trunk/mint/python/mint/Makefile
mgmt/trunk/mint/python/mint/__init__.py
mgmt/trunk/mint/python/mint/cache.py
mgmt/trunk/mint/python/mint/schema.py
mgmt/trunk/mint/python/mint/schemaparser.py
mgmt/trunk/mint/python/mint/update.py
mgmt/trunk/mint/sql/schema.sql
Log:
Refactor of data layer for consolidated sql operations and reporting.
Also, some schema updates to handle large unsigned values and qmf
class keys as strings.
There is still an unsolved connection issue with this commit. When a
broker is added, you must restart the console for it to appear.
Modified: mgmt/trunk/cumin/python/cumin/__init__.py
===================================================================
--- mgmt/trunk/cumin/python/cumin/__init__.py 2008-11-24 19:41:05 UTC (rev 2875)
+++ mgmt/trunk/cumin/python/cumin/__init__.py 2008-11-24 22:44:41 UTC (rev 2876)
@@ -38,13 +38,7 @@
        self.add_resource_dir(os.path.join(self.home, "resources"))
        self.model = CuminModel(self, self.config.data, self.config.spec)
-        self.broker_connect_thread = BrokerConnectThread(self.model)
-        def closeListener(*args):
-            self.broker_connect_thread.prompt()
-
-        self.model.data.setCloseListener(closeListener)
-
        self.main_page = MainPage(self, "index.html")
        self.add_page(self.main_page)
        self.set_default_page(self.main_page)
@@ -90,50 +84,13 @@
    def start(self):
        self.model.start()
-        self.broker_connect_thread.start()
+        for reg in BrokerRegistration.select():
+            reg.connect(self.model.data)
+
    def stop(self):
        self.model.stop()
-class BrokerConnectThread(Thread):
-    log = logging.getLogger("cumin.mgmt.conn")
-
-    def __init__(self, model):
-        super(BrokerConnectThread, self).__init__()
-
-        self.model = model
-        self.setDaemon(True)
-
-        self.event = Event()
-
-        self.attempts = dict()
-
-    def prompt(self):
-        self.event.set()
-        self.event.clear()
-
-    def run(self):
-        try:
-            self.do_run()
-        except Exception, e:
-            log.exception(e)
-
-    def do_run(self):
-        while True:
-            for reg in BrokerRegistration.select():
-                if reg.broker is None or reg.getBrokerId() not in self.model.data.managedBrokers:
-                    attempts = self.attempts.get(reg, 0)
-                    attempts += 1
-                    self.attempts[reg] = attempts
-
-                    if attempts < 10:
-                        reg.connect(self.model.data)
-                    elif attempts < 100 and attempts % 10 == 0:
-                        reg.connect(self.model.data)
-                    elif attempts % 100 == 0:
-                        reg.connect(self.model.data)
-            self.event.wait(10)
-
class CuminServer(WebServer):
    def authorized(self, session):
        auth = False
Modified: mgmt/trunk/cumin/python/cumin/model.py
===================================================================
--- mgmt/trunk/cumin/python/cumin/model.py 2008-11-24 19:41:05 UTC (rev 2875)
+++ mgmt/trunk/cumin/python/cumin/model.py 2008-11-24 22:44:41 UTC (rev 2876)
@@ -1727,7 +1727,7 @@
        try:
            object = self.cumin_class.mint_class(**args)
-            self.model.app.broker_connect_thread.prompt()
+            object.connect(self.model.data)
            completion("OK")
Modified: mgmt/trunk/mint/bin/mint-bench
===================================================================
--- mgmt/trunk/mint/bin/mint-bench 2008-11-24 19:41:05 UTC (rev 2875)
+++ mgmt/trunk/mint/bin/mint-bench 2008-11-24 22:44:41 UTC (rev 2876)
@@ -1,6 +1,6 @@
#!/usr/bin/python
-import sys, os, logging
+import sys, os, logging, mint.sql
from mint.tools import MintBenchTool
@@ -55,7 +55,9 @@
    do_main()
if __name__ == "__main__":
+    mint.sql.profile = mint.sql.SqlProfile()
+
    try:
        main()
    except KeyboardInterrupt:
-        pass
+        mint.sql.profile.report()
Modified: mgmt/trunk/mint/python/mint/Makefile
===================================================================
--- mgmt/trunk/mint/python/mint/Makefile 2008-11-24 19:41:05 UTC (rev 2875)
+++ mgmt/trunk/mint/python/mint/Makefile 2008-11-24 22:44:41 UTC (rev 2876)
@@ -4,7 +4,7 @@
schema: schema.py
-schema.py: ../../xml/*.xml
- python schemaparser.py schema.py ${dsn} $^
+schema.py: schemaparser.py ../../xml/*.xml
+ python schemaparser.py schema.py ${dsn} ../../xml/*.xml
clean:
Modified: mgmt/trunk/mint/python/mint/__init__.py
===================================================================
--- mgmt/trunk/mint/python/mint/__init__.py 2008-11-24 19:41:05 UTC (rev 2875)
+++ mgmt/trunk/mint/python/mint/__init__.py 2008-11-24 22:44:41 UTC (rev 2876)
@@ -256,7 +256,9 @@
        return None
    def destroySelf(self):
-        self.disconnect(MintModel.staticInstance)
+        if MintModel.staticInstance:
+            self.disconnect(MintModel.staticInstance)
+
        super(BrokerRegistration, self).destroySelf()
class BrokerGroup(SQLObject):
@@ -292,7 +294,6 @@
    brokers = SQLMultipleJoin("BrokerRegistration", joinColumn="profile_id")
    properties = SQLMultipleJoin("ConfigProperty", joinColumn="profile_id")
-
class MintModel(qmf.console.Console):
    staticInstance = None
@@ -308,10 +309,14 @@
        self.dbStyle = MixedCaseUnderscoreStyle()
        self.dbConn = None
-        # map containing updateObjects that have a missing parent dependency, for deferred insertion
-        # (missing_class, missing_id.first, missing_id.second) -> [updateObject, ..., updateObject]
+        # map containing updateObjects that have a missing parent
+        # dependency, for deferred insertion (missing_class,
+        # missing_id.first, missing_id.second) -> [updateObject, ...,
+        # updateObject]
        self.orphanObjectMap = dict()
+        self.orphans = dict()
+
        self.updateThread = update.ModelUpdateThread(self)
        self.mgmtSession = qmf.console.Session(self)
        self.outstandingMethodCalls = dict()
@@ -350,83 +355,6 @@
    def setCloseListener(self, connCloseListener):
        self.connCloseListener = connCloseListener
-    def __pythonValueToDB(self, key, value):
-        if type(value) == types.DictType:
-            value = u"'%s'" % pickle.dumps(value).replace("'", "''")
-        elif value is None:
-            value = "NULL"
-        else:
-            try:
-                x = int(value)
-            except Exception:
-                value = u"'%s'" % str(value).replace("'", "''")
-        return value
-
-    def __generateSQLWhereClause(self, colValMap):
-        whereClause = ""
-        for (key, value) in colValMap.iteritems():
-            value = self.__pythonValueToDB(key, value)
-            whereClause += u"%s = %s and " % (self.dbStyle.pythonAttrToDBColumn(key), value)
-        whereClause = whereClause[:-5]
-        return whereClause
-
-    def generateSQLSelect(self, table, id, resultCols="id"):
-        return self.generateSQLSelectWhere(table, {"source_scope_id": id.first, "source_object_id": id.second}, resultCols)
-
-    def generateSQLSelectWhere(self, table, where, resultCols="id"):
-        whereClause = self.__generateSQLWhereClause(where)
-        sql = "select %s from %s where %s" % (resultCols, table, whereClause)
-        return sql
-
-    def generateSQLInsert(self, table, colValMap, subQuery={}):
-        columns = u""
-        values = u""
-        for (key, value) in colValMap.iteritems():
-            value = self.__pythonValueToDB(key, value)
-            columns += u"%s , " % (self.dbStyle.pythonAttrToDBColumn(key))
-            values += u"%s , " % (value)
-        for subCol, subVal in subQuery.iteritems():
-            columns += u"%s , " % (subCol)
-            values += u"(%s) , " % (subVal)
-        columns = columns[:-3]
-        values = values[:-3]
-        sql = u"insert into %s (%s) values (%s)" % (table, columns, values)
-        return sql
-
-    def generateSQLUpdate(self, table, colValMap, id, updateStats=False):
-        return self.generateSQLUpdateWhere(table, colValMap, \
-            {"source_scope_id": id.first, "source_object_id": id.second}, updateStats)
-
-    def generateSQLUpdateWhere(self, table, colValMap, where, updateStats=False):
-        updateValues = u""
-        for (key, value) in colValMap.iteritems():
-            value = self.__pythonValueToDB(key, value)
-            updateValues += u"%s = %s , " % (self.dbStyle.pythonAttrToDBColumn(key), value)
-        if updateStats:
-            updateValues += u"stats_prev_id = stats_curr_id , "
-        updateValues = updateValues[:-3]
-        whereClause = self.__generateSQLWhereClause(where)
-        sql = u"update %s set %s where %s" % (table, updateValues, whereClause)
-        return sql
-
-    def addOrphanObject(self, cls, idFirst, idSecond, obj):
-        # store object in orphan map, will be picked up later when parent info is received
-        log.info("Referenced object %s '%d-%d' not found, deferring creation of orphan object" % (cls, idFirst, idSecond))
-        if (cls, idFirst, idSecond) not in self.orphanObjectMap:
-            self.orphanObjectMap[(cls, idFirst, idSecond)] = set()
-        self.orphanObjectMap[(cls, idFirst, idSecond)].add(obj)
-
-    def requeueIfOrphanParent(self, cls, idFirst, idSecond):
-        orphans = 0
-        if (cls, idFirst, idSecond) in self.orphanObjectMap:
-            # this object is the parent of orphan objects in the map, re-enqueue for insertion
-            orphanObjects = self.orphanObjectMap.pop((cls, idFirst, idSecond))
-            for orphanObj in orphanObjects:
-                self.updateThread.enqueue(orphanObj)
-                orphans += 1
-            log.info("Inserted %d orphan objects whose creation had been deferred" % (orphans))
-        return orphans
-
    def getSession(self):
        return self.mgmtSession
@@ -452,6 +380,7 @@
        self.lock()
        try:
            self.managedBrokers[str(broker.getBrokerId())] = (broker, 0)
+
        finally:
            self.unlock()
@@ -484,20 +413,24 @@
    def objectProps(self, broker, record):
        """ Invoked when an object is updated. """
+
+        up = update.PropertyUpdate(self, broker, record)
+
        if record.getClassKey().getClassName() == "job":
-            priority = 1
-        else:
-            priority = 0
-        self.updateThread.enqueue(update.PropertyUpdate(broker, record), priority)
+            up.priority = 1
+        self.updateThread.enqueue(up)
+
    def objectStats(self, broker, record):
        """ Invoked when an object is updated. """
+
+        up = update.StatisticUpdate(self, broker, record)
+
        if record.getClassKey().getClassName() == "job":
-            priority = 1
-        else:
-            priority = 0
-        self.updateThread.enqueue(update.StatisticUpdate(broker, record), priority)
+            up.priority = 1
+        self.updateThread.enqueue(up)
+
    def event(self, broker, event):
        """ Invoked when an event is raised. """
        pass
Modified: mgmt/trunk/mint/python/mint/cache.py
===================================================================
--- mgmt/trunk/mint/python/mint/cache.py 2008-11-24 19:41:05 UTC (rev 2875)
+++ mgmt/trunk/mint/python/mint/cache.py 2008-11-24 22:44:41 UTC (rev 2876)
@@ -1,47 +1,37 @@
from threading import RLock
-class MintCache:
+class MintCache(object):
    staticInstance = None
    def __init__(self):
        assert MintCache.staticInstance is None
        MintCache.staticInstance = self
-        self.__lock = RLock()
        self.__cache = dict()
        self.__pending = dict()
        self.__dirty = False
    def get(self, key, usePending=True):
        result = None
-        self.__lock.acquire()
+
        if key in self.__cache:
            result = self.__cache[key]
        elif usePending and key in self.__pending:
            result = self.__pending[key]
-        self.__lock.release()
        return result
    def set(self, key, value):
-        self.__lock.acquire()
        self.__pending[key] = value
        self.__dirty = True
-        self.__lock.release()
    def commit(self):
-        self.__lock.acquire()
        self.__cache.update(self.__pending)
        self.__pending.clear()
        self.__dirty = False
-        self.__lock.release()
    def rollback(self):
-        self.__lock.acquire()
        self.__pending.clear()
        self.__dirty = False
-        self.__lock.release()
    def isDirty(self):
-        self.__lock.acquire()
        return self.__dirty
-        self.__lock.release()
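
With the lock removed, MintCache is presumably only touched from the single update thread; its pending/commit protocol mirrors the database transaction it rides along with. A minimal sketch of the intended semantics in a fresh interpreter (the tuple keys are illustrative; update.py keys the cache by QMF object ids):

    from mint.cache import MintCache

    cache = MintCache()                  # asserts it is the only instance

    cache.set(("scope", "obj"), 42)      # staged as pending only
    assert cache.get(("scope", "obj")) == 42                     # sees pending
    assert cache.get(("scope", "obj"), usePending=False) is None

    cache.commit()                       # promote pending entries
    assert cache.get(("scope", "obj"), usePending=False) == 42

    cache.set(("scope", "other"), 43)
    cache.rollback()                     # discard uncommitted entries
    assert cache.get(("scope", "other")) is None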
Deleted: mgmt/trunk/mint/python/mint/priqueue.py
===================================================================
--- mgmt/trunk/mint/python/mint/priqueue.py 2008-11-24 19:41:05 UTC (rev 2875)
+++ mgmt/trunk/mint/python/mint/priqueue.py 2008-11-24 22:44:41 UTC (rev 2876)
@@ -1,39 +0,0 @@
-from Queue import Queue
-from collections import deque
-
-class PriQueue(Queue):
-    """ """
-    def __init__(self, maxsize=0, slotCount=1):
-        self.slotCount=slotCount
-        Queue.__init__(self, maxsize)
-
-    def _init(self, maxsize):
-        self.maxsize = maxsize
-        self.slots = []
-        for i in range(self.slotCount):
-            self.slots.append(deque())
-
-    def _qsize(self):
-        size = 0
-        for i in range(self.slotCount):
-            size += len(self.slots[i])
-        return size
-
-    def _empty(self):
-        return self._qsize() == 0
-
-    def _full(self):
-        return self.maxsize > 0 and self._qsize() == self.maxsize
-
-    def _put(self, item):
-        slot = item[0]
-        if slot in range(self.slotCount):
-            self.slots[slot].append(item)
-        else:
-            raise ValueError("Invalid priority slot")
-
-    def _get(self):
-        for slot in range(self.slotCount):
-            if len(self.slots[slot]) > 0:
-                return self.slots[slot].popleft()
-        return None
Modified: mgmt/trunk/mint/python/mint/schema.py
===================================================================
--- mgmt/trunk/mint/python/mint/schema.py 2008-11-24 19:41:05 UTC (rev 2875)
+++ mgmt/trunk/mint/python/mint/schema.py 2008-11-24 22:44:41 UTC (rev 2876)
@@ -17,7 +17,7 @@
sourceScopeId = BigIntCol(default=None)
sourceObjectId = BigIntCol(default=None)
source_ids_unique = DatabaseIndex(sourceScopeId, sourceObjectId, unique=True)
- qmfClassKey = BLOBCol(default=None)
+ qmfClassKey = StringCol(length=1000, default=None)
creationTime = TimestampCol(default=None)
deletionTime = TimestampCol(default=None)
managedBroker = StringCol(length=1000, default=None)
@@ -31,20 +31,20 @@
CheckpointPlatform = StringCol(length=1000, default=None)
ClientMachine = StringCol(length=1000, default=None)
ConcurrencyLimits = StringCol(length=1000, default=None)
- Cpus = IntCol(default=None)
+ Cpus = BigIntCol(default=None)
CurrentRank = FloatCol(default=None)
- Disk = IntCol(default=None)
+ Disk = BigIntCol(default=None)
FileSystemDomain = StringCol(length=1000, default=None)
GlobalJobId = StringCol(length=1000, default=None)
- ImageSize = IntCol(default=None)
+ ImageSize = BigIntCol(default=None)
IsValidCheckpointPlatform = StringCol(length=4000, default=None)
JobId = StringCol(length=1000, default=None)
JobStart = TimestampCol(default=None)
- KFlops = IntCol(default=None)
+ KFlops = BigIntCol(default=None)
Machine = StringCol(length=1000, default=None)
MaxJobRetirementTime = StringCol(length=4000, default=None)
- Memory = IntCol(default=None)
- Mips = IntCol(default=None)
+ Memory = BigIntCol(default=None)
+ Mips = BigIntCol(default=None)
MyAddress = StringCol(length=1000, default=None)
Name = StringCol(length=1000, default=None)
OpSys = StringCol(length=1000, default=None)
@@ -57,23 +57,23 @@
Requirements = StringCol(length=4000, default=None)
PublicNetworkIpAddr = StringCol(length=1000, default=None)
Rank = StringCol(length=4000, default=None)
- SlotID = IntCol(default=None)
+ SlotID = BigIntCol(default=None)
Start = StringCol(length=4000, default=None)
StarterAbilityList = StringCol(length=4000, default=None)
- TotalClaimRunTime = IntCol(default=None)
- TotalClaimSuspendTime = IntCol(default=None)
- TotalCpus = IntCol(default=None)
- TotalDisk = IntCol(default=None)
- TotalJobRunTime = IntCol(default=None)
- TotalJobSuspendTime = IntCol(default=None)
- TotalMemory = IntCol(default=None)
- TotalSlots = IntCol(default=None)
- TotalVirtualMemory = IntCol(default=None)
+ TotalClaimRunTime = BigIntCol(default=None)
+ TotalClaimSuspendTime = BigIntCol(default=None)
+ TotalCpus = BigIntCol(default=None)
+ TotalDisk = BigIntCol(default=None)
+ TotalJobRunTime = BigIntCol(default=None)
+ TotalJobSuspendTime = BigIntCol(default=None)
+ TotalMemory = BigIntCol(default=None)
+ TotalSlots = BigIntCol(default=None)
+ TotalVirtualMemory = BigIntCol(default=None)
UidDomain = StringCol(length=1000, default=None)
- VirtualMemory = IntCol(default=None)
- WindowsBuildNumber = IntCol(default=None)
- WindowsMajorVersion = IntCol(default=None)
- WindowsMinorVersion = IntCol(default=None)
+ VirtualMemory = BigIntCol(default=None)
+ WindowsBuildNumber = BigIntCol(default=None)
+ WindowsMajorVersion = BigIntCol(default=None)
+ WindowsMinorVersion = BigIntCol(default=None)
CondorPlatform = StringCol(length=1000, default=None)
CondorVersion = StringCol(length=1000, default=None)
@@ -88,13 +88,13 @@
slot = ForeignKey('Slot', cascade='null', default=None)
classInfos = dict() # brokerId => classInfo
Activity = StringCol(length=1000, default=None)
- ClockDay = IntCol(default=None)
- ClockMin = IntCol(default=None)
+ ClockDay = BigIntCol(default=None)
+ ClockMin = BigIntCol(default=None)
CondorLoadAvg = FloatCol(default=None)
- ConsoleIdle = IntCol(default=None)
+ ConsoleIdle = BigIntCol(default=None)
EnteredCurrentActivity = TimestampCol(default=None)
EnteredCurrentState = TimestampCol(default=None)
- KeyboardIdle = IntCol(default=None)
+ KeyboardIdle = BigIntCol(default=None)
LastBenchmark = TimestampCol(default=None)
LastFetchWorkCompleted = TimestampCol(default=None)
LastFetchWorkSpawned = TimestampCol(default=None)
@@ -103,28 +103,28 @@
MyCurrentTime = TimestampCol(default=None)
NextFetchWorkDelay = IntCol(default=None)
State = StringCol(length=1000, default=None)
- TimeToLive = IntCol(default=None)
+ TimeToLive = BigIntCol(default=None)
TotalCondorLoadAvg = FloatCol(default=None)
TotalLoadAvg = FloatCol(default=None)
- TotalTimeBackfillBusy = IntCol(default=None)
- TotalTimeBackfillIdle = IntCol(default=None)
- TotalTimeBackfillKilling = IntCol(default=None)
- TotalTimeClaimedBusy = IntCol(default=None)
- TotalTimeClaimedIdle = IntCol(default=None)
- TotalTimeClaimedRetiring = IntCol(default=None)
- TotalTimeClaimedSuspended = IntCol(default=None)
- TotalTimeMatchedIdle = IntCol(default=None)
- TotalTimeOwnerIdle = IntCol(default=None)
- TotalTimePreemptingKilling = IntCol(default=None)
- TotalTimePreemptingVacating = IntCol(default=None)
- TotalTimeUnclaimedBenchmarking = IntCol(default=None)
- TotalTimeUnclaimedIdle = IntCol(default=None)
+ TotalTimeBackfillBusy = BigIntCol(default=None)
+ TotalTimeBackfillIdle = BigIntCol(default=None)
+ TotalTimeBackfillKilling = BigIntCol(default=None)
+ TotalTimeClaimedBusy = BigIntCol(default=None)
+ TotalTimeClaimedIdle = BigIntCol(default=None)
+ TotalTimeClaimedRetiring = BigIntCol(default=None)
+ TotalTimeClaimedSuspended = BigIntCol(default=None)
+ TotalTimeMatchedIdle = BigIntCol(default=None)
+ TotalTimeOwnerIdle = BigIntCol(default=None)
+ TotalTimePreemptingKilling = BigIntCol(default=None)
+ TotalTimePreemptingVacating = BigIntCol(default=None)
+ TotalTimeUnclaimedBenchmarking = BigIntCol(default=None)
+ TotalTimeUnclaimedIdle = BigIntCol(default=None)
- MonitorSelfAge = IntCol(default=None)
+ MonitorSelfAge = BigIntCol(default=None)
MonitorSelfCPUUsage = FloatCol(default=None)
MonitorSelfImageSize = FloatCol(default=None)
- MonitorSelfRegisteredSocketCount = IntCol(default=None)
- MonitorSelfResidentSetSize = IntCol(default=None)
+ MonitorSelfRegisteredSocketCount = BigIntCol(default=None)
+ MonitorSelfResidentSetSize = BigIntCol(default=None)
MonitorSelfTime = TimestampCol(default=None)
@@ -138,7 +138,7 @@
sourceScopeId = BigIntCol(default=None)
sourceObjectId = BigIntCol(default=None)
source_ids_unique = DatabaseIndex(sourceScopeId, sourceObjectId, unique=True)
- qmfClassKey = BLOBCol(default=None)
+ qmfClassKey = StringCol(length=1000, default=None)
creationTime = TimestampCol(default=None)
deletionTime = TimestampCol(default=None)
managedBroker = StringCol(length=1000, default=None)
@@ -149,29 +149,29 @@
submitter = ForeignKey('Submitter', cascade='null', default=None)
AccountingGroup = StringCol(length=1000, default=None)
Args = StringCol(length=4000, default=None)
- ClusterId = IntCol(default=None)
+ ClusterId = BigIntCol(default=None)
Cmd = StringCol(length=4000, default=None)
ConcurrencyLimits = StringCol(length=4000, default=None)
CustomGroup = StringCol(length=1000, default=None)
CustomId = StringCol(length=1000, default=None)
- CustomPriority = IntCol(default=None)
+ CustomPriority = BigIntCol(default=None)
GlobalJobId = StringCol(length=1000, default=None)
InRsv = StringCol(length=4000, default=None)
Iwd = StringCol(length=4000, default=None)
- JobStatus = IntCol(default=None)
+ JobStatus = BigIntCol(default=None)
Note = StringCol(length=4000, default=None)
Out = StringCol(length=4000, default=None)
Owner = StringCol(length=1000, default=None)
GridUser = StringCol(length=1000, default=None)
- ProcId = IntCol(default=None)
+ ProcId = BigIntCol(default=None)
QDate = TimestampCol(default=None)
- JobUniverse = IntCol(default=None)
+ JobUniverse = BigIntCol(default=None)
Title = StringCol(length=1000, default=None)
UserLog = StringCol(length=4000, default=None)
HoldReason = StringCol(length=4000, default=None)
DAGNodeName = StringCol(length=1000, default=None)
DAGParentNodeNames = StringCol(length=4000, default=None)
- DAGManJobId = IntCol(default=None)
+ DAGManJobId = BigIntCol(default=None)
Ad = PickleCol(default=None)
@@ -250,7 +250,7 @@
sourceScopeId = BigIntCol(default=None)
sourceObjectId = BigIntCol(default=None)
source_ids_unique = DatabaseIndex(sourceScopeId, sourceObjectId, unique=True)
- qmfClassKey = BLOBCol(default=None)
+ qmfClassKey = StringCol(length=1000, default=None)
creationTime = TimestampCol(default=None)
deletionTime = TimestampCol(default=None)
managedBroker = StringCol(length=1000, default=None)
@@ -260,7 +260,7 @@
Pool = StringCol(length=1000, default=None)
System = StringCol(length=1000, default=None)
JobQueueBirthdate = TimestampCol(default=None)
- MaxJobsRunning = IntCol(default=None)
+ MaxJobsRunning = BigIntCol(default=None)
Machine = StringCol(length=1000, default=None)
MyAddress = StringCol(length=1000, default=None)
Name = StringCol(length=1000, default=None)
@@ -278,18 +278,18 @@
recTime = TimestampCol(default=None)
scheduler = ForeignKey('Scheduler', cascade='null', default=None)
classInfos = dict() # brokerId => classInfo
- NumUsers = IntCol(default=None)
- TotalHeldJobs = IntCol(default=None)
- TotalIdleJobs = IntCol(default=None)
- TotalJobAds = IntCol(default=None)
- TotalRemovedJobs = IntCol(default=None)
- TotalRunningJobs = IntCol(default=None)
+ NumUsers = BigIntCol(default=None)
+ TotalHeldJobs = BigIntCol(default=None)
+ TotalIdleJobs = BigIntCol(default=None)
+ TotalJobAds = BigIntCol(default=None)
+ TotalRemovedJobs = BigIntCol(default=None)
+ TotalRunningJobs = BigIntCol(default=None)
- MonitorSelfAge = IntCol(default=None)
+ MonitorSelfAge = BigIntCol(default=None)
MonitorSelfCPUUsage = FloatCol(default=None)
MonitorSelfImageSize = FloatCol(default=None)
- MonitorSelfRegisteredSocketCount = IntCol(default=None)
- MonitorSelfResidentSetSize = IntCol(default=None)
+ MonitorSelfRegisteredSocketCount = BigIntCol(default=None)
+ MonitorSelfResidentSetSize = BigIntCol(default=None)
MonitorSelfTime = TimestampCol(default=None)
@@ -303,7 +303,7 @@
sourceScopeId = BigIntCol(default=None)
sourceObjectId = BigIntCol(default=None)
source_ids_unique = DatabaseIndex(sourceScopeId, sourceObjectId, unique=True)
- qmfClassKey = BLOBCol(default=None)
+ qmfClassKey = StringCol(length=1000, default=None)
creationTime = TimestampCol(default=None)
deletionTime = TimestampCol(default=None)
managedBroker = StringCol(length=1000, default=None)
@@ -324,9 +324,9 @@
recTime = TimestampCol(default=None)
submitter = ForeignKey('Submitter', cascade='null', default=None)
classInfos = dict() # brokerId => classInfo
- HeldJobs = IntCol(default=None)
- IdleJobs = IntCol(default=None)
- RunningJobs = IntCol(default=None)
+ HeldJobs = BigIntCol(default=None)
+ IdleJobs = BigIntCol(default=None)
+ RunningJobs = BigIntCol(default=None)
@@ -339,7 +339,7 @@
sourceScopeId = BigIntCol(default=None)
sourceObjectId = BigIntCol(default=None)
source_ids_unique = DatabaseIndex(sourceScopeId, sourceObjectId, unique=True)
- qmfClassKey = BLOBCol(default=None)
+ qmfClassKey = StringCol(length=1000, default=None)
creationTime = TimestampCol(default=None)
deletionTime = TimestampCol(default=None)
managedBroker = StringCol(length=1000, default=None)
@@ -389,11 +389,11 @@
negotiator = ForeignKey('Negotiator', cascade='null', default=None)
classInfos = dict() # brokerId => classInfo
- MonitorSelfAge = IntCol(default=None)
+ MonitorSelfAge = BigIntCol(default=None)
MonitorSelfCPUUsage = FloatCol(default=None)
MonitorSelfImageSize = FloatCol(default=None)
- MonitorSelfRegisteredSocketCount = IntCol(default=None)
- MonitorSelfResidentSetSize = IntCol(default=None)
+ MonitorSelfRegisteredSocketCount = BigIntCol(default=None)
+ MonitorSelfResidentSetSize = BigIntCol(default=None)
MonitorSelfTime = TimestampCol(default=None)
@@ -407,7 +407,7 @@
sourceScopeId = BigIntCol(default=None)
sourceObjectId = BigIntCol(default=None)
source_ids_unique = DatabaseIndex(sourceScopeId, sourceObjectId, unique=True)
- qmfClassKey = BLOBCol(default=None)
+ qmfClassKey = StringCol(length=1000, default=None)
creationTime = TimestampCol(default=None)
deletionTime = TimestampCol(default=None)
managedBroker = StringCol(length=1000, default=None)
@@ -441,7 +441,7 @@
sourceScopeId = BigIntCol(default=None)
sourceObjectId = BigIntCol(default=None)
source_ids_unique = DatabaseIndex(sourceScopeId, sourceObjectId, unique=True)
- qmfClassKey = BLOBCol(default=None)
+ qmfClassKey = StringCol(length=1000, default=None)
creationTime = TimestampCol(default=None)
deletionTime = TimestampCol(default=None)
managedBroker = StringCol(length=1000, default=None)
@@ -485,11 +485,11 @@
master = ForeignKey('Master', cascade='null', default=None)
classInfos = dict() # brokerId => classInfo
- MonitorSelfAge = IntCol(default=None)
+ MonitorSelfAge = BigIntCol(default=None)
MonitorSelfCPUUsage = FloatCol(default=None)
MonitorSelfImageSize = FloatCol(default=None)
- MonitorSelfRegisteredSocketCount = IntCol(default=None)
- MonitorSelfResidentSetSize = IntCol(default=None)
+ MonitorSelfRegisteredSocketCount = BigIntCol(default=None)
+ MonitorSelfResidentSetSize = BigIntCol(default=None)
MonitorSelfTime = TimestampCol(default=None)
@@ -503,7 +503,7 @@
sourceScopeId = BigIntCol(default=None)
sourceObjectId = BigIntCol(default=None)
source_ids_unique = DatabaseIndex(sourceScopeId, sourceObjectId, unique=True)
- qmfClassKey = BLOBCol(default=None)
+ qmfClassKey = StringCol(length=1000, default=None)
creationTime = TimestampCol(default=None)
deletionTime = TimestampCol(default=None)
managedBroker = StringCol(length=1000, default=None)
@@ -544,7 +544,7 @@
sourceScopeId = BigIntCol(default=None)
sourceObjectId = BigIntCol(default=None)
source_ids_unique = DatabaseIndex(sourceScopeId, sourceObjectId, unique=True)
- qmfClassKey = BLOBCol(default=None)
+ qmfClassKey = StringCol(length=1000, default=None)
creationTime = TimestampCol(default=None)
deletionTime = TimestampCol(default=None)
managedBroker = StringCol(length=1000, default=None)
@@ -555,7 +555,7 @@
clusterName = StringCol(length=1000, default=None)
clusterID = StringCol(length=1000, default=None)
publishedURL = StringCol(length=1000, default=None)
- clusterSize = SmallIntCol(default=None)
+ clusterSize = IntCol(default=None)
status = StringCol(length=1000, default=None)
members = StringCol(length=4000, default=None)
@@ -591,7 +591,7 @@
sourceScopeId = BigIntCol(default=None)
sourceObjectId = BigIntCol(default=None)
source_ids_unique = DatabaseIndex(sourceScopeId, sourceObjectId, unique=True)
- qmfClassKey = BLOBCol(default=None)
+ qmfClassKey = StringCol(length=1000, default=None)
creationTime = TimestampCol(default=None)
deletionTime = TimestampCol(default=None)
managedBroker = StringCol(length=1000, default=None)
@@ -600,15 +600,15 @@
classInfos = dict() # brokerId => classInfo
broker = ForeignKey('Broker', cascade='null', default=None)
location = StringCol(length=1000, default=None)
- defaultInitialFileCount = SmallIntCol(default=None)
- defaultDataFileSize = IntCol(default=None)
+ defaultInitialFileCount = IntCol(default=None)
+ defaultDataFileSize = BigIntCol(default=None)
tplIsInitialized = BoolCol(default=None)
tplDirectory = StringCol(length=1000, default=None)
- tplWritePageSize = IntCol(default=None)
- tplWritePages = IntCol(default=None)
- tplInitialFileCount = SmallIntCol(default=None)
- tplDataFileSize = IntCol(default=None)
- tplCurrentFileCount = IntCol(default=None)
+ tplWritePageSize = BigIntCol(default=None)
+ tplWritePages = BigIntCol(default=None)
+ tplInitialFileCount = IntCol(default=None)
+ tplDataFileSize = BigIntCol(default=None)
+ tplCurrentFileCount = BigIntCol(default=None)
class StoreStats(SQLObject):
@@ -639,7 +639,7 @@
sourceScopeId = BigIntCol(default=None)
sourceObjectId = BigIntCol(default=None)
source_ids_unique = DatabaseIndex(sourceScopeId, sourceObjectId, unique=True)
- qmfClassKey = BLOBCol(default=None)
+ qmfClassKey = StringCol(length=1000, default=None)
creationTime = TimestampCol(default=None)
deletionTime = TimestampCol(default=None)
managedBroker = StringCol(length=1000, default=None)
@@ -650,13 +650,13 @@
name = StringCol(length=1000, default=None)
directory = StringCol(length=1000, default=None)
baseFileName = StringCol(length=1000, default=None)
- writePageSize = IntCol(default=None)
- writePages = IntCol(default=None)
- readPageSize = IntCol(default=None)
- readPages = IntCol(default=None)
- initialFileCount = SmallIntCol(default=None)
- dataFileSize = IntCol(default=None)
- currentFileCount = IntCol(default=None)
+ writePageSize = BigIntCol(default=None)
+ writePages = BigIntCol(default=None)
+ readPageSize = BigIntCol(default=None)
+ readPages = BigIntCol(default=None)
+ initialFileCount = IntCol(default=None)
+ dataFileSize = BigIntCol(default=None)
+ currentFileCount = BigIntCol(default=None)
def expand(self, model, callback, by):
@@ -715,7 +715,7 @@
sourceScopeId = BigIntCol(default=None)
sourceObjectId = BigIntCol(default=None)
source_ids_unique = DatabaseIndex(sourceScopeId, sourceObjectId, unique=True)
- qmfClassKey = BLOBCol(default=None)
+ qmfClassKey = StringCol(length=1000, default=None)
creationTime = TimestampCol(default=None)
deletionTime = TimestampCol(default=None)
managedBroker = StringCol(length=1000, default=None)
@@ -749,7 +749,7 @@
sourceScopeId = BigIntCol(default=None)
sourceObjectId = BigIntCol(default=None)
source_ids_unique = DatabaseIndex(sourceScopeId, sourceObjectId, unique=True)
- qmfClassKey = BLOBCol(default=None)
+ qmfClassKey = StringCol(length=1000, default=None)
creationTime = TimestampCol(default=None)
deletionTime = TimestampCol(default=None)
managedBroker = StringCol(length=1000, default=None)
@@ -757,12 +757,12 @@
statsPrev = ForeignKey('BrokerStats', cascade='null', default=None)
classInfos = dict() # brokerId => classInfo
system = ForeignKey('System', cascade='null', default=None)
- port = IntCol(default=None)
- workerThreads = SmallIntCol(default=None)
- maxConns = SmallIntCol(default=None)
- connBacklog = SmallIntCol(default=None)
- stagingThreshold = IntCol(default=None)
- mgmtPubInterval = SmallIntCol(default=None)
+ port = BigIntCol(default=None)
+ workerThreads = IntCol(default=None)
+ maxConns = IntCol(default=None)
+ connBacklog = IntCol(default=None)
+ stagingThreshold = BigIntCol(default=None)
+ mgmtPubInterval = IntCol(default=None)
version = StringCol(length=1000, default=None)
dataDir = StringCol(length=1000, default=None)
@@ -831,7 +831,7 @@
sourceScopeId = BigIntCol(default=None)
sourceObjectId = BigIntCol(default=None)
source_ids_unique = DatabaseIndex(sourceScopeId, sourceObjectId, unique=True)
- qmfClassKey = BLOBCol(default=None)
+ qmfClassKey = StringCol(length=1000, default=None)
creationTime = TimestampCol(default=None)
deletionTime = TimestampCol(default=None)
managedBroker = StringCol(length=1000, default=None)
@@ -842,8 +842,8 @@
label = StringCol(length=1000, default=None)
broker = ForeignKey('Broker', cascade='null', default=None)
systemId = BLOBCol(default=None)
- brokerBank = IntCol(default=None)
- agentBank = IntCol(default=None)
+ brokerBank = BigIntCol(default=None)
+ agentBank = BigIntCol(default=None)
class AgentStats(SQLObject):
@@ -865,7 +865,7 @@
sourceScopeId = BigIntCol(default=None)
sourceObjectId = BigIntCol(default=None)
source_ids_unique = DatabaseIndex(sourceScopeId, sourceObjectId, unique=True)
- qmfClassKey = BLOBCol(default=None)
+ qmfClassKey = StringCol(length=1000, default=None)
creationTime = TimestampCol(default=None)
deletionTime = TimestampCol(default=None)
managedBroker = StringCol(length=1000, default=None)
@@ -896,7 +896,7 @@
sourceScopeId = BigIntCol(default=None)
sourceObjectId = BigIntCol(default=None)
source_ids_unique = DatabaseIndex(sourceScopeId, sourceObjectId, unique=True)
- qmfClassKey = BLOBCol(default=None)
+ qmfClassKey = StringCol(length=1000, default=None)
creationTime = TimestampCol(default=None)
deletionTime = TimestampCol(default=None)
managedBroker = StringCol(length=1000, default=None)
@@ -966,7 +966,7 @@
sourceScopeId = BigIntCol(default=None)
sourceObjectId = BigIntCol(default=None)
source_ids_unique = DatabaseIndex(sourceScopeId, sourceObjectId, unique=True)
- qmfClassKey = BLOBCol(default=None)
+ qmfClassKey = StringCol(length=1000, default=None)
creationTime = TimestampCol(default=None)
deletionTime = TimestampCol(default=None)
managedBroker = StringCol(length=1000, default=None)
@@ -1011,7 +1011,7 @@
sourceScopeId = BigIntCol(default=None)
sourceObjectId = BigIntCol(default=None)
source_ids_unique = DatabaseIndex(sourceScopeId, sourceObjectId, unique=True)
- qmfClassKey = BLOBCol(default=None)
+ qmfClassKey = StringCol(length=1000, default=None)
creationTime = TimestampCol(default=None)
deletionTime = TimestampCol(default=None)
managedBroker = StringCol(length=1000, default=None)
@@ -1045,7 +1045,7 @@
sourceScopeId = BigIntCol(default=None)
sourceObjectId = BigIntCol(default=None)
source_ids_unique = DatabaseIndex(sourceScopeId, sourceObjectId, unique=True)
- qmfClassKey = BLOBCol(default=None)
+ qmfClassKey = StringCol(length=1000, default=None)
creationTime = TimestampCol(default=None)
deletionTime = TimestampCol(default=None)
managedBroker = StringCol(length=1000, default=None)
@@ -1090,7 +1090,7 @@
sourceScopeId = BigIntCol(default=None)
sourceObjectId = BigIntCol(default=None)
source_ids_unique = DatabaseIndex(sourceScopeId, sourceObjectId, unique=True)
- qmfClassKey = BLOBCol(default=None)
+ qmfClassKey = StringCol(length=1000, default=None)
creationTime = TimestampCol(default=None)
deletionTime = TimestampCol(default=None)
managedBroker = StringCol(length=1000, default=None)
@@ -1099,7 +1099,7 @@
classInfos = dict() # brokerId => classInfo
vhost = ForeignKey('Vhost', cascade='null', default=None)
host = StringCol(length=1000, default=None)
- port = IntCol(default=None)
+ port = BigIntCol(default=None)
transport = StringCol(length=1000, default=None)
durable = BoolCol(default=None)
@@ -1156,7 +1156,7 @@
sourceScopeId = BigIntCol(default=None)
sourceObjectId = BigIntCol(default=None)
source_ids_unique = DatabaseIndex(sourceScopeId, sourceObjectId, unique=True)
- qmfClassKey = BLOBCol(default=None)
+ qmfClassKey = StringCol(length=1000, default=None)
creationTime = TimestampCol(default=None)
deletionTime = TimestampCol(default=None)
managedBroker = StringCol(length=1000, default=None)
@@ -1164,7 +1164,7 @@
statsPrev = ForeignKey('BridgeStats', cascade='null', default=None)
classInfos = dict() # brokerId => classInfo
link = ForeignKey('Link', cascade='null', default=None)
- channelId = SmallIntCol(default=None)
+ channelId = IntCol(default=None)
durable = BoolCol(default=None)
src = StringCol(length=1000, default=None)
dest = StringCol(length=1000, default=None)
@@ -1201,7 +1201,7 @@
sourceScopeId = BigIntCol(default=None)
sourceObjectId = BigIntCol(default=None)
source_ids_unique = DatabaseIndex(sourceScopeId, sourceObjectId, unique=True)
- qmfClassKey = BLOBCol(default=None)
+ qmfClassKey = StringCol(length=1000, default=None)
creationTime = TimestampCol(default=None)
deletionTime = TimestampCol(default=None)
managedBroker = StringCol(length=1000, default=None)
@@ -1210,9 +1210,9 @@
classInfos = dict() # brokerId => classInfo
vhost = ForeignKey('Vhost', cascade='null', default=None)
name = StringCol(length=1000, default=None)
- channelId = SmallIntCol(default=None)
+ channelId = IntCol(default=None)
clientConnection = ForeignKey('ClientConnection', cascade='null', default=None)
- detachedLifespan = IntCol(default=None)
+ detachedLifespan = BigIntCol(default=None)
attached = BoolCol(default=None)
expireTime = TimestampCol(default=None)
@@ -1265,7 +1265,7 @@
sourceScopeId = BigIntCol(default=None)
sourceObjectId = BigIntCol(default=None)
source_ids_unique = DatabaseIndex(sourceScopeId, sourceObjectId, unique=True)
- qmfClassKey = BLOBCol(default=None)
+ qmfClassKey = StringCol(length=1000, default=None)
creationTime = TimestampCol(default=None)
deletionTime = TimestampCol(default=None)
managedBroker = StringCol(length=1000, default=None)
@@ -1278,8 +1278,8 @@
release = StringCol(length=1000, default=None)
version = StringCol(length=1000, default=None)
machine = StringCol(length=1000, default=None)
- memTotal = IntCol(default=None)
- swapTotal = IntCol(default=None)
+ memTotal = BigIntCol(default=None)
+ swapTotal = BigIntCol(default=None)
class SysimageStats(SQLObject):
@@ -1289,13 +1289,13 @@
recTime = TimestampCol(default=None)
sysimage = ForeignKey('Sysimage', cascade='null', default=None)
classInfos = dict() # brokerId => classInfo
- memFree = IntCol(default=None)
- swapFree = IntCol(default=None)
+ memFree = BigIntCol(default=None)
+ swapFree = BigIntCol(default=None)
loadAverage1Min = FloatCol(default=None)
loadAverage5Min = FloatCol(default=None)
loadAverage10Min = FloatCol(default=None)
- procTotal = IntCol(default=None)
- procRunning = IntCol(default=None)
+ procTotal = BigIntCol(default=None)
+ procRunning = BigIntCol(default=None)
Modified: mgmt/trunk/mint/python/mint/schemaparser.py
===================================================================
--- mgmt/trunk/mint/python/mint/schemaparser.py 2008-11-24 19:41:05 UTC (rev 2875)
+++ mgmt/trunk/mint/python/mint/schemaparser.py 2008-11-24 22:44:41 UTC (rev 2876)
@@ -21,8 +21,10 @@
self.dataTypesMap["uuid"] = "BLOBCol"
self.dataTypesMap["int32"] = "IntCol"
self.dataTypesMap["uint8"] = self.dataTypesMap["hilo8"] = self.dataTypesMap["count8"] = self.dataTypesMap["mma8"] = "SmallIntCol"
- self.dataTypesMap["uint16"] = self.dataTypesMap["hilo16"] = self.dataTypesMap["count16"] = self.dataTypesMap["mma16"] = "SmallIntCol"
- self.dataTypesMap["uint32"] = self.dataTypesMap["hilo32"] = self.dataTypesMap["count32"] = self.dataTypesMap["mma32"] = self.dataTypesMap["atomic32"] = "IntCol"
+ self.dataTypesMap["hilo16"] = self.dataTypesMap["count16"] = self.dataTypesMap["mma16"] = "SmallIntCol"
+ self.dataTypesMap["uint16"] = "IntCol"
+ self.dataTypesMap["hilo32"] = self.dataTypesMap["count32"] = self.dataTypesMap["mma32"] = self.dataTypesMap["atomic32"] = "IntCol"
+ self.dataTypesMap["uint32"] = "BigIntCol"
self.dataTypesMap["uint64"] = self.dataTypesMap["hilo64"] = self.dataTypesMap["count64"] = self.dataTypesMap["mma64"] = self.dataTypesMap["mmaTime"] = "BigIntCol"
self.dataTypesMap["float"] = self.dataTypesMap["double"] = "FloatCol"
self.dataTypesMap["absTime"] = "TimestampCol"
@@ -144,7 +146,7 @@
self.generateAttrib("sourceScopeId", "BigIntCol")
self.generateAttrib("sourceObjectId", "BigIntCol")
self.generateSourceIdsIndex(pythonName)
- self.generateAttrib("qmfClassKey", "BLOBCol")
+ self.generateAttrib("qmfClassKey", "StringCol", "length=1000")
self.generateTimestampAttrib("creation")
self.generateTimestampAttrib("deletion")
self.generateAttrib("managedBroker", "StringCol", "length=1000")
Modified: mgmt/trunk/mint/python/mint/update.py
===================================================================
--- mgmt/trunk/mint/python/mint/update.py 2008-11-24 19:41:05 UTC (rev 2875)
+++ mgmt/trunk/mint/python/mint/update.py 2008-11-24 22:44:41 UTC (rev 2876)
@@ -1,49 +1,48 @@
+import sys
import logging
import datetime
import types
-from Queue import Full, Empty
+import pickle
+import psycopg2
+from Queue import Queue as ConcurrentQueue, Full, Empty
from threading import Thread
from traceback import print_exc
from qpid.datatypes import UUID
from mint.schema import *
-from mint.priqueue import PriQueue as ConcurrentQueue
+from time import clock
+from sql import *
+from collections import deque
log = logging.getLogger("mint.update")
-def time_unwarp(t):
- """ don't allow future dates """
-
- tnow = datetime.now()
- return t > tnow and tnow or t
-
class ModelUpdateThread(Thread):
def __init__(self, model):
super(ModelUpdateThread, self).__init__()
self.model = model
- self.updates = ConcurrentQueue(slotCount=2)
+ self.updates = UpdateQueue(slotCount=2)
self.stopRequested = False
- self.setDaemon(False)
self.enqueueCount = 0
self.dequeueCount = 0
-
self.commitThreshold = 100
- def enqueue(self, update, priority=0):
+ self.setDaemon(False)
+
+ def enqueue(self, update):
try:
- self.updates.put((priority, update))
+ self.updates.put(update)
self.enqueueCount += 1
except Full:
log.exception("Queue is full")
- pass
def run(self):
conn = self.model.dbConn.getConnection()
+
while True:
try:
- priority, update = self.updates.get(True, 1)
+ update = self.updates.get(True, 1)
self.dequeueCount += 1
@@ -57,225 +56,331 @@
continue
try:
- update.process(self.model, conn)
- if self.dequeueCount % self.commitThreshold == 0 or self.updates.qsize() == 0:
- # commit only every "commitThreshold" updates, or whenever the update queue is empty
- conn.commit()
- self.model.cache.commit()
+ update.process(conn)
+
+ if self.dequeueCount % self.commitThreshold == 0 \
+ or self.updates.qsize() == 0:
+ # commit only every "commitThreshold" updates, or whenever
+ # the update queue is empty
+
+ if profile:
+ start = clock()
+ conn.commit()
+ self.model.cache.commit()
+ profile.commitTime += clock() - start
+ else:
+ conn.commit()
+ self.model.cache.commit()
except:
conn.rollback()
self.model.cache.rollback()
log.exception("Update failed")
- pass
def stop(self):
self.stopRequested = True
+class ReferenceException(Exception):
+ def __init__(self, sought):
+ self.sought = sought
+
+ def __str__(self):
+ return repr(self.sought)
+
class ModelUpdate(object):
- def __init__(self, broker, obj):
+ def __init__(self, model, broker, object):
+ self.model = model
self.broker = broker
- self.qmfObj = obj
+ self.object = object
+ self.priority = 0
- def getStatsClass(self, cls):
- return cls + "Stats"
-
- def process(self, model, conn):
+ def getClass(self):
+ # XXX this is unfortunate
+ name = self.object.getClassKey().getClassName()
+ name = mint.schema.schemaReservedWordsMap.get(name, name)
+ name = name[0].upper() + name[1:]
+
+ try:
+ return schemaNameToClassMap[name]
+ except KeyError:
+ raise ReferenceException(name)
+
+ def process(self, conn):
pass
- def processAttributes(self, attrs, cls, model, conn):
- results = {}
- orphan = False
+ def processAttributes(self, attrs, cls):
+ results = dict()
- for (key, value) in attrs:
+ for key, value in attrs:
name = key.__repr__()
- if name in mint.schema.schemaReservedWordsMap:
- name = mint.schema.schemaReservedWordsMap.get(name)
+ name = mint.schema.schemaReservedWordsMap.get(name, name)
if key.type == 10:
- # Navigate to referenced objects
- if name.endswith("Ref"):
- name = name[:-3]
- className = name[0].upper() + name[1:]
- otherClass = getattr(mint, className, None)
- if otherClass:
- foreignKey = name + "_id"
- valueFirst = value.first
- valueSecond = value.second
- cachedId = model.cache.get((valueFirst, valueSecond))
- if cachedId != None:
- results[foreignKey] = cachedId
- else:
- model.addOrphanObject(className, valueFirst, valueSecond, self)
- orphan = True
- else:
- log.error("Class '%s' not found" % className)
+ self.processReference(name, value, results)
elif key.type == 8:
- # convert ABSTIME types
- if value:
- results[name] = time_unwarp(datetime.fromtimestamp(value/1000000000))
- else:
- results[name] = None
+ self.processTimestamp(name, value, results)
elif key.type == 14:
- # convert UUIDs into their string representation, to be handled by sqlobject
+ # Convert UUIDs into their string representation, to be
+ # handled by sqlobject
results[name] = str(value)
- elif not hasattr(getattr(mint, cls), name):
+ elif key.type == 15:
+ results[name] = pickle.dumps(value)
+ elif not hasattr(cls, name):
# Discard attrs that we don't have in our schema
- log.debug("Class '%s' has no field '%s'" % (cls, name))
+ log.debug("%s has no field '%s'" % (cls, name))
else:
results[name] = value
- if orphan:
- return None
- else:
- return results
+ return results
+
+ # XXX this needs to be a much more straightforward procedure
+ def processReference(self, name, oid, results):
+ if name.endswith("Ref"):
+ name = name[:-3]
+
+ className = name[0].upper() + name[1:]
+ otherClass = getattr(mint, className)
+
+ foreignKey = name + "_id"
+
+ id = self.model.cache.get(oid)
+
+ if id is None:
+ raise ReferenceException(oid)
+
+ results[foreignKey] = id
+
+ def processTimestamp(self, name, tstamp, results):
+ if tstamp:
+ tnow = datetime.now()
+ t = datetime.fromtimestamp(tstamp / 1000000000)
+ t = t > tnow and tnow or t
+
+ results[name] = t
+
class PropertyUpdate(ModelUpdate):
- def __init__(self, broker, obj):
- ModelUpdate.__init__(self, broker, obj)
+ def process(self, conn):
+ try:
+ cls = self.getClass()
+ except ReferenceException, e:
+ log.info("Referenced class %r not found", e.sought)
+ return
- def process(self, model, conn):
- clsKey = self.qmfObj.getClassKey()
- origCls = cls = clsKey.getClassName()
- if cls in mint.schema.schemaReservedWordsMap:
- cls = mint.schema.schemaReservedWordsMap.get(cls)
- cls = cls[0].upper()+cls[1:]
- sqlCls = model.dbStyle.pythonClassToDBTable(cls)
+ oid = self.object.getObjectId()
- if not hasattr(mint, cls):
- # Discard classes that we don't have in our schema
- log.debug("Class '%s' is not in the schema" % (cls))
- return
+ try:
+ attrs = self.processAttributes(self.object.getProperties(), cls)
+ except ReferenceException, e:
+ log.info("Referenced object %r not found", e.sought)
- properties = self.qmfObj.getProperties()
- timestamps = self.qmfObj.getTimestamps()
- id = self.qmfObj.getObjectId()
- idFirst = id.first
- idSecond = id.second
+ try:
+ orphans = self.model.orphans[oid]
+ orphans.append(self)
+ except KeyError:
+ self.model.orphans[oid] = list((self,))
- attrs = self.processAttributes(properties, cls, model, conn)
- if attrs == None:
- # object is orphan, a parent dependency was not found;
- # insertion in db is deferred until parent info is received
- return
+ return
- attrs["recTime"] = time_unwarp(datetime.fromtimestamp(timestamps[0]/1000000000))
- attrs["creationTime"] = time_unwarp(datetime.fromtimestamp(timestamps[1]/1000000000))
+ timestamps = self.object.getTimestamps()
+
+ self.processTimestamp("recTime", timestamps[0], attrs)
+ self.processTimestamp("creationTime", timestamps[1], attrs)
+
if timestamps[2] != 0:
- attrs["deletionTime"] = time_unwarp(datetime.fromtimestamp(timestamps[2]/1000000000))
- log.debug("%s(%s) marked deleted", cls, id)
+ self.processTimestamp("deletionTime", timestamps[2], attrs)
- attrs["sourceScopeId"] = idFirst
- attrs["sourceObjectId"] = idSecond
- attrs["qmfClassKey"] = clsKey
+ log.debug("%s(%s) marked deleted", cls.__name__, oid)
+
+ attrs["sourceScopeId"] = oid.first
+ attrs["sourceObjectId"] = oid.second
+ attrs["qmfClassKey"] = str(self.object.getClassKey())
attrs["managedBroker"] = str(self.broker.getBrokerId())
cursor = conn.cursor()
- sql = model.generateSQLUpdate(sqlCls, attrs, id)
- #log.debug("SQL: %s", sql)
- cursor.execute(sql)
- isUpdate = cursor.rowcount > 0
- if not isUpdate:
- # update failed, need to insert
- sql = model.generateSQLInsert(sqlCls, attrs)
- #log.debug("SQL: %s", sql)
- cursor.execute(sql)
- log.debug("%s(%s) created", cls, id)
-
- dbId = model.cache.get((idFirst, idSecond))
- if dbId is None:
- if isUpdate:
- sql = model.generateSQLSelect(sqlCls, id)
- else:
- sql = "select currval('%s_id_seq')" % (sqlCls)
- #log.debug("SQL: %s", sql)
- cursor.execute(sql)
+
+ # Cases:
+ #
+ # 1. Object is utterly new to mint
+ # 2. Object is in mint's db, but id is not yet cached
+ # 3. Object is in mint's db, and id is cached
+
+ id = self.model.cache.get(oid)
+
+ if id is None:
+ # Case 1 or 2
+
+ op = SqlGetId(cls)
+ op.execute(cursor, attrs)
+
rec = cursor.fetchone()
- dbId = rec[0]
- model.cache.set((idFirst, idSecond), dbId)
- if cls == "Broker" and str(self.broker.getBrokerId()) in model.managedBrokers:
- broker, dbObjId = model.managedBrokers[str(self.broker.getBrokerId())]
- if dbObjId == 0:
- sql = model.generateSQLUpdateWhere("broker_registration", \
- {"broker_id": dbId}, {"url": self.broker.getFullUrl()})
- #log.debug("SQL: %s", sql)
- cursor.execute(sql)
+ if rec:
+ id = rec[0]
- sql = """
- update broker
- set registration_id =
- (select id from broker_registration where broker_id = %(id)s)
- where id = %(id)s
- """
+ if id is None:
+ # Case 1
- cursor.execute(sql, {"id": dbId})
+ op = SqlInsert(cls, attrs)
+ op.execute(cursor, attrs)
- model.managedBrokers[str(self.broker.getBrokerId())] = (broker, dbId)
+ id = cursor.fetchone()[0]
- model.requeueIfOrphanParent(cls, idFirst, idSecond)
+ log.debug("%s(%i) created", cls.__name__, id)
+ if cls is Broker:
+ self.processBroker(cursor, id)
+ else:
+ # Case 2
+
+ attrs["id"] = id
+
+ op = SqlUpdate(cls, attrs)
+ op.execute(cursor, attrs)
+
+ assert cursor.rowcount == 1
+
+ self.model.cache.set(oid, id)
+ else:
+ # Case 3
+
+ attrs["id"] = id
+
+ op = SqlUpdate(cls, attrs)
+ op.execute(cursor, attrs)
+
+ assert cursor.rowcount == 1
+
+ try:
+ orphans = self.model.orphans.pop(oid)
+
+ if orphans:
+ log.info("Re-enqueueing %i orphans whose creation had been deferred",
+ len(orphans))
+
+ for orphan in orphans:
+ self.model.processThread.enqueue(orphan)
+ except KeyError:
+ pass
+
+ def processBroker(self, cursor, id):
+ brokerId = str(self.broker.getBrokerId())
+
+ if brokerId in self.model.managedBrokers:
+ broker, dbObjId = self.model.managedBrokers[brokerId]
+
+ #print "broker, dbObjId", broker, dbObjId
+
+ if dbObjId == 0:
+ op = SqlGetBrokerRegistration()
+ op.execute(cursor, {"url": self.broker.getFullUrl()})
+
+ #print op.text % {"url": self.broker.getFullUrl()}
+
+ rec = cursor.fetchone()
+
+ if rec:
+ rid = rec[0]
+
+ op = SqlAttachBroker()
+ op.execute(cursor, {"id": id, "registrationId": rid})
+
+ #print op.text % {"id": id, "registrationId": rid}
+
+ self.model.managedBrokers[brokerId] = (broker, id)
+
+
+
class StatisticUpdate(ModelUpdate):
- def __init__(self, broker, obj):
- ModelUpdate.__init__(self, broker, obj)
+ def process(self, conn):
+ try:
+ cls = self.getClass()
+ except ReferenceException, e:
+ log.info("Referenced class %r not found", e.sought)
+ return
- def process(self, model, conn):
- origCls = cls = self.qmfObj.getClassKey().getClassName()
- if cls in mint.schema.schemaReservedWordsMap:
- cls = mint.schema.schemaReservedWordsMap.get(cls)
- cls = cls[0].upper()+cls[1:]
- sqlCls = model.dbStyle.pythonClassToDBTable(cls)
+ statsCls = getattr(mint, "%sStats" % cls.__name__)
- if not hasattr(mint, cls):
- # Discard classes that we don't have in our schema
- log.debug("Class '%s' is not in the schema" % (cls))
- return
+ oid = self.object.getObjectId()
+ id = self.model.cache.get(oid)
- statistics = self.qmfObj.getStatistics()
- timestamps = self.qmfObj.getTimestamps()
- id = self.qmfObj.getObjectId()
- idFirst = id.first
- idSecond = id.second
+ if id is None:
+ # Just drop it; we'll get more stats later
+ return
- statsCls = self.getStatsClass(cls)
- sqlStatsCls = model.dbStyle.pythonClassToDBTable(statsCls)
+ try:
+ attrs = self.processAttributes(self.object.getStatistics(), statsCls)
+ except ReferenceException:
+ # Drop it
+ return
- attrs = self.processAttributes(statistics, statsCls, model, conn)
- if attrs == None:
- # object is orphan, a parent dependency was not found;
- # insertion in db is deferred until parent info is received
- return
+ timestamps = self.object.getTimestamps()
- attrs["recTime"] = time_unwarp(datetime.fromtimestamp(timestamps[0]/1000000000))
- cachedId = model.cache.get((idFirst, idSecond))
- if cachedId != None:
- attrs[sqlCls + "_id"] = cachedId
- else:
- model.addOrphanObject(cls, idFirst, idSecond, self)
-
+ self.processTimestamp("recTime", timestamps[0], attrs)
+
+ attrs["%s_id" % cls.sqlmeta.table] = id
+
cursor = conn.cursor()
- sql = model.generateSQLInsert(sqlStatsCls, attrs)
- #log.debug("SQL: %s", sql)
- cursor.execute(sql)
- sql = "select currval('%s_id_seq')" % (sqlStatsCls)
- #log.debug("SQL: %s", sql)
- cursor.execute(sql)
- rec = cursor.fetchone()
- dbStatsId = rec[0]
- log.debug("%s(%s) created", statsCls, id)
+ op = SqlInsert(statsCls, attrs)
+ op.execute(cursor, attrs)
- sql = model.generateSQLUpdate(sqlCls, {"stats_curr_id": dbStatsId}, id, updateStats=True)
- #log.debug("SQL: %s", sql)
- cursor.execute(sql)
+ statsId = cursor.fetchone()[0]
+ log.debug("%s(%s) created", statsCls.__name__, id)
+
+ op = SqlSetStatsRefs(cls)
+ op.execute(cursor, {"statsId": statsId, "id": id})
+
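For readers unfamiliar with the idiom behind SqlInsert here: PostgreSQL's currval('<table>_id_seq') is session-local, so the id fetched right after the insert is the one this connection just allocated, even with concurrent writers. A minimal sketch of the same two-step stats write using psycopg2 directly -- the table and column names are hypothetical stand-ins, not mint's actual schema:

    # Hedged sketch, not mint code: insert a stats row, read back its id via
    # currval, then repoint the parent row's current/previous stats references
    # (the effect of the SqlSetStatsRefs call above).
    import psycopg2

    conn = psycopg2.connect("dbname=mint")  # assumption: a local 'mint' database
    cursor = conn.cursor()
    attrs = {"recTime": "2008-11-25 15:07:55", "depth": 42}
    cursor.execute(
        "insert into queue_stats (rec_time, depth) values (%(recTime)s, %(depth)s);"
        " select currval('queue_stats_id_seq')", attrs)
    statsId = cursor.fetchone()[0]  # id allocated by this session's insert
    cursor.execute(
        "update queue set stats_prev_id = stats_curr_id, stats_curr_id = %(statsId)s"
        " where id = %(id)s", {"statsId": statsId, "id": 1})
    conn.commit()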
class MethodUpdate(ModelUpdate):
def __init__(self, broker, seq, response):
- ModelUpdate.__init__(self, broker, response)
+ super(MethodUpdate, self).__init__(broker, response)
+
self.seq = seq
- def process(self, model, conn):
- model.lock()
+ def process(self, conn):
+ self.model.lock()
+
try:
- methodCallback = model.outstandingMethodCalls.pop(self.seq)
- methodCallback(self.qmfObj.text, self.qmfObj.outArgs)
+ methodCallback = self.model.outstandingMethodCalls.pop(self.seq)
+ methodCallback(self.object.text, self.object.outArgs)
finally:
- model.unlock()
+ self.model.unlock()
+
+class UpdateQueue(ConcurrentQueue):
+ def __init__(self, maxsize=0, slotCount=1):
+ self.slotCount = slotCount
+ ConcurrentQueue.__init__(self, maxsize)
+
+ def _init(self, maxsize):
+ self.maxsize = maxsize
+ self.slots = []
+
+ for i in range(self.slotCount):
+ self.slots.append(deque())
+
+ def _qsize(self):
+ size = 0
+
+ for i in range(self.slotCount):
+ size += len(self.slots[i])
+
+ return size
+
+ def _empty(self):
+ return self._qsize() == 0
+
+ def _full(self):
+ return self.maxsize > 0 and self._qsize() == self.maxsize
+
+ def _put(self, update):
+ slot = update.priority
+
+ if slot in range(self.slotCount):
+ self.slots[slot].append(update)
+ else:
+ raise ValueError("Invalid priority slot")
+
+ def _get(self):
+ for slot in range(self.slotCount):
+ if len(self.slots[slot]) > 0:
+ return self.slots[slot].popleft()
+ return None
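UpdateQueue works by overriding the standard library Queue extension hooks (_init, _qsize, _put, _get) to multiplex one deque per priority slot, so lower-numbered slots always drain first. A self-contained sketch of the same idea, assuming only the Python 2 standard library (ConcurrentQueue is presumably a Queue.Queue subclass, which is what makes these hooks apply):

    # Hedged sketch: a standalone priority-slot queue built on the same
    # Queue.Queue hooks that UpdateQueue overrides. Python 2 naming to match
    # the surrounding code; rename Queue to queue for Python 3.
    from Queue import Queue
    from collections import deque

    class SlottedQueue(Queue):
        def __init__(self, maxsize=0, slotCount=2):
            self.slotCount = slotCount
            Queue.__init__(self, maxsize)

        def _init(self, maxsize):
            # One deque per priority slot; slot 0 is the most urgent
            self.slots = [deque() for _ in range(self.slotCount)]

        def _qsize(self, len=len):
            return sum(len(slot) for slot in self.slots)

        def _put(self, item):
            priority, value = item  # assumes 0 <= priority < slotCount
            self.slots[priority].append(value)

        def _get(self):
            for slot in self.slots:
                if slot:
                    return slot.popleft()

    q = SlottedQueue()
    q.put((1, "statistic update"))  # lower priority
    q.put((0, "object update"))     # higher priority
    assert q.get() == "object update"  # slot 0 drains first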
Modified: mgmt/trunk/mint/sql/schema.sql
===================================================================
--- mgmt/trunk/mint/sql/schema.sql 2008-11-24 19:41:05 UTC (rev 2875)
+++ mgmt/trunk/mint/sql/schema.sql 2008-11-24 22:44:41 UTC (rev 2876)
@@ -67,7 +67,7 @@
rec_time TIMESTAMP,
source_scope_id BIGINT,
source_object_id BIGINT,
- qmf_class_key BYTEA,
+ qmf_class_key VARCHAR(1000),
creation_time TIMESTAMP,
deletion_time TIMESTAMP,
managed_broker VARCHAR(1000),
@@ -93,7 +93,7 @@
rec_time TIMESTAMP,
source_scope_id BIGINT,
source_object_id BIGINT,
- qmf_class_key BYTEA,
+ qmf_class_key VARCHAR(1000),
creation_time TIMESTAMP,
deletion_time TIMESTAMP,
managed_broker VARCHAR(1000),
@@ -103,8 +103,8 @@
label VARCHAR(1000),
broker_id INT,
system_id BYTEA,
- broker_bank INT,
- agent_bank INT
+ broker_bank BIGINT,
+ agent_bank BIGINT
);
CREATE UNIQUE INDEX agent_source_ids_unique ON agent (source_scope_id, source_object_id);
@@ -119,7 +119,7 @@
rec_time TIMESTAMP,
source_scope_id BIGINT,
source_object_id BIGINT,
- qmf_class_key BYTEA,
+ qmf_class_key VARCHAR(1000),
creation_time TIMESTAMP,
deletion_time TIMESTAMP,
managed_broker VARCHAR(1000),
@@ -145,14 +145,14 @@
rec_time TIMESTAMP,
source_scope_id BIGINT,
source_object_id BIGINT,
- qmf_class_key BYTEA,
+ qmf_class_key VARCHAR(1000),
creation_time TIMESTAMP,
deletion_time TIMESTAMP,
managed_broker VARCHAR(1000),
stats_curr_id INT,
stats_prev_id INT,
link_id INT,
- channel_id SMALLINT,
+ channel_id INT,
durable BOOL,
src VARCHAR(1000),
dest VARCHAR(1000),
@@ -176,19 +176,19 @@
rec_time TIMESTAMP,
source_scope_id BIGINT,
source_object_id BIGINT,
- qmf_class_key BYTEA,
+ qmf_class_key VARCHAR(1000),
creation_time TIMESTAMP,
deletion_time TIMESTAMP,
managed_broker VARCHAR(1000),
stats_curr_id INT,
stats_prev_id INT,
system_id INT,
- port INT,
- worker_threads SMALLINT,
- max_conns SMALLINT,
- conn_backlog SMALLINT,
- staging_threshold INT,
- mgmt_pub_interval SMALLINT,
+ port BIGINT,
+ worker_threads INT,
+ max_conns INT,
+ conn_backlog INT,
+ staging_threshold BIGINT,
+ mgmt_pub_interval INT,
version VARCHAR(1000),
data_dir VARCHAR(1000),
registration_id INT
@@ -206,7 +206,7 @@
rec_time TIMESTAMP,
source_scope_id BIGINT,
source_object_id BIGINT,
- qmf_class_key BYTEA,
+ qmf_class_key VARCHAR(1000),
creation_time TIMESTAMP,
deletion_time TIMESTAMP,
managed_broker VARCHAR(1000),
@@ -237,7 +237,7 @@
rec_time TIMESTAMP,
source_scope_id BIGINT,
source_object_id BIGINT,
- qmf_class_key BYTEA,
+ qmf_class_key VARCHAR(1000),
creation_time TIMESTAMP,
deletion_time TIMESTAMP,
managed_broker VARCHAR(1000),
@@ -247,7 +247,7 @@
cluster_name VARCHAR(1000),
cluster_id VARCHAR(1000),
published_ur_l VARCHAR(1000),
- cluster_size SMALLINT,
+ cluster_size INT,
status VARCHAR(1000),
members VARCHAR(4000)
);
@@ -264,7 +264,7 @@
rec_time TIMESTAMP,
source_scope_id BIGINT,
source_object_id BIGINT,
- qmf_class_key BYTEA,
+ qmf_class_key VARCHAR(1000),
creation_time TIMESTAMP,
deletion_time TIMESTAMP,
managed_broker VARCHAR(1000),
@@ -290,7 +290,7 @@
rec_time TIMESTAMP,
source_scope_id BIGINT,
source_object_id BIGINT,
- qmf_class_key BYTEA,
+ qmf_class_key VARCHAR(1000),
creation_time TIMESTAMP,
deletion_time TIMESTAMP,
managed_broker VARCHAR(1000),
@@ -327,7 +327,7 @@
rec_time TIMESTAMP,
source_scope_id BIGINT,
source_object_id BIGINT,
- qmf_class_key BYTEA,
+ qmf_class_key VARCHAR(1000),
creation_time TIMESTAMP,
deletion_time TIMESTAMP,
managed_broker VARCHAR(1000),
@@ -337,29 +337,29 @@
submitter_id INT,
accounting_group VARCHAR(1000),
args VARCHAR(4000),
- cluster_id INT,
+ cluster_id BIGINT,
cmd VARCHAR(4000),
concurrency_limits VARCHAR(4000),
custom_group VARCHAR(1000),
custom_id VARCHAR(1000),
- custom_priority INT,
+ custom_priority BIGINT,
global_job_id VARCHAR(1000),
in_rsv VARCHAR(4000),
iwd VARCHAR(4000),
- job_status INT,
+ job_status BIGINT,
note VARCHAR(4000),
out VARCHAR(4000),
owner VARCHAR(1000),
grid_user VARCHAR(1000),
- proc_id INT,
+ proc_id BIGINT,
q_date TIMESTAMP,
- job_universe INT,
+ job_universe BIGINT,
title VARCHAR(1000),
user_log VARCHAR(4000),
hold_reason VARCHAR(4000),
dag_node_name VARCHAR(1000),
dag_parent_node_names VARCHAR(4000),
- dag_man_job_id INT,
+ dag_man_job_id BIGINT,
ad BYTEA
);
CREATE UNIQUE INDEX job_source_ids_unique ON job (source_scope_id, source_object_id);
@@ -375,7 +375,7 @@
rec_time TIMESTAMP,
source_scope_id BIGINT,
source_object_id BIGINT,
- qmf_class_key BYTEA,
+ qmf_class_key VARCHAR(1000),
creation_time TIMESTAMP,
deletion_time TIMESTAMP,
managed_broker VARCHAR(1000),
@@ -385,13 +385,13 @@
name VARCHAR(1000),
directory VARCHAR(1000),
base_file_name VARCHAR(1000),
- write_page_size INT,
- write_pages INT,
- read_page_size INT,
- read_pages INT,
- initial_file_count SMALLINT,
- data_file_size INT,
- current_file_count INT
+ write_page_size BIGINT,
+ write_pages BIGINT,
+ read_page_size BIGINT,
+ read_pages BIGINT,
+ initial_file_count INT,
+ data_file_size BIGINT,
+ current_file_count BIGINT
);
CREATE UNIQUE INDEX journal_source_ids_unique ON journal (source_scope_id, source_object_id);
@@ -434,7 +434,7 @@
rec_time TIMESTAMP,
source_scope_id BIGINT,
source_object_id BIGINT,
- qmf_class_key BYTEA,
+ qmf_class_key VARCHAR(1000),
creation_time TIMESTAMP,
deletion_time TIMESTAMP,
managed_broker VARCHAR(1000),
@@ -442,7 +442,7 @@
stats_prev_id INT,
vhost_id INT,
host VARCHAR(1000),
- port INT,
+ port BIGINT,
transport VARCHAR(1000),
durable BOOL
);
@@ -461,7 +461,7 @@
rec_time TIMESTAMP,
source_scope_id BIGINT,
source_object_id BIGINT,
- qmf_class_key BYTEA,
+ qmf_class_key VARCHAR(1000),
creation_time TIMESTAMP,
deletion_time TIMESTAMP,
managed_broker VARCHAR(1000),
@@ -484,11 +484,11 @@
id SERIAL PRIMARY KEY,
rec_time TIMESTAMP,
master_id INT,
- monitor_self_age INT,
+ monitor_self_age BIGINT,
monitor_self_cpu_usage FLOAT,
monitor_self_image_size FLOAT,
- monitor_self_registered_socket_count INT,
- monitor_self_resident_set_size INT,
+ monitor_self_registered_socket_count BIGINT,
+ monitor_self_resident_set_size BIGINT,
monitor_self_time TIMESTAMP
);
@@ -497,7 +497,7 @@
rec_time TIMESTAMP,
source_scope_id BIGINT,
source_object_id BIGINT,
- qmf_class_key BYTEA,
+ qmf_class_key VARCHAR(1000),
creation_time TIMESTAMP,
deletion_time TIMESTAMP,
managed_broker VARCHAR(1000),
@@ -518,11 +518,11 @@
id SERIAL PRIMARY KEY,
rec_time TIMESTAMP,
negotiator_id INT,
- monitor_self_age INT,
+ monitor_self_age BIGINT,
monitor_self_cpu_usage FLOAT,
monitor_self_image_size FLOAT,
- monitor_self_registered_socket_count INT,
- monitor_self_resident_set_size INT,
+ monitor_self_registered_socket_count BIGINT,
+ monitor_self_resident_set_size BIGINT,
monitor_self_time TIMESTAMP
);
@@ -536,7 +536,7 @@
rec_time TIMESTAMP,
source_scope_id BIGINT,
source_object_id BIGINT,
- qmf_class_key BYTEA,
+ qmf_class_key VARCHAR(1000),
creation_time TIMESTAMP,
deletion_time TIMESTAMP,
managed_broker VARCHAR(1000),
@@ -589,7 +589,7 @@
rec_time TIMESTAMP,
source_scope_id BIGINT,
source_object_id BIGINT,
- qmf_class_key BYTEA,
+ qmf_class_key VARCHAR(1000),
creation_time TIMESTAMP,
deletion_time TIMESTAMP,
managed_broker VARCHAR(1000),
@@ -598,7 +598,7 @@
pool VARCHAR(1000),
system VARCHAR(1000),
job_queue_birthdate TIMESTAMP,
- max_jobs_running INT,
+ max_jobs_running BIGINT,
machine VARCHAR(1000),
my_address VARCHAR(1000),
name VARCHAR(1000),
@@ -613,17 +613,17 @@
id SERIAL PRIMARY KEY,
rec_time TIMESTAMP,
scheduler_id INT,
- num_users INT,
- total_held_jobs INT,
- total_idle_jobs INT,
- total_job_ads INT,
- total_removed_jobs INT,
- total_running_jobs INT,
- monitor_self_age INT,
+ num_users BIGINT,
+ total_held_jobs BIGINT,
+ total_idle_jobs BIGINT,
+ total_job_ads BIGINT,
+ total_removed_jobs BIGINT,
+ total_running_jobs BIGINT,
+ monitor_self_age BIGINT,
monitor_self_cpu_usage FLOAT,
monitor_self_image_size FLOAT,
- monitor_self_registered_socket_count INT,
- monitor_self_resident_set_size INT,
+ monitor_self_registered_socket_count BIGINT,
+ monitor_self_resident_set_size BIGINT,
monitor_self_time TIMESTAMP
);
@@ -632,7 +632,7 @@
rec_time TIMESTAMP,
source_scope_id BIGINT,
source_object_id BIGINT,
- qmf_class_key BYTEA,
+ qmf_class_key VARCHAR(1000),
creation_time TIMESTAMP,
deletion_time TIMESTAMP,
managed_broker VARCHAR(1000),
@@ -640,9 +640,9 @@
stats_prev_id INT,
vhost_id INT,
name VARCHAR(1000),
- channel_id SMALLINT,
+ channel_id INT,
client_connection_id INT,
- detached_lifespan INT,
+ detached_lifespan BIGINT,
attached BOOL,
expire_time TIMESTAMP
);
@@ -664,7 +664,7 @@
rec_time TIMESTAMP,
source_scope_id BIGINT,
source_object_id BIGINT,
- qmf_class_key BYTEA,
+ qmf_class_key VARCHAR(1000),
creation_time TIMESTAMP,
deletion_time TIMESTAMP,
managed_broker VARCHAR(1000),
@@ -677,20 +677,20 @@
checkpoint_platform VARCHAR(1000),
client_machine VARCHAR(1000),
concurrency_limits VARCHAR(1000),
- cpus INT,
+ cpus BIGINT,
current_rank FLOAT,
- disk INT,
+ disk BIGINT,
file_system_domain VARCHAR(1000),
global_job_id VARCHAR(1000),
- image_size INT,
+ image_size BIGINT,
is_valid_checkpoint_platform VARCHAR(4000),
job_id VARCHAR(1000),
job_start TIMESTAMP,
- k_flops INT,
+ k_flops BIGINT,
machine VARCHAR(1000),
max_job_retirement_time VARCHAR(4000),
- memory INT,
- mips INT,
+ memory BIGINT,
+ mips BIGINT,
my_address VARCHAR(1000),
name VARCHAR(1000),
op_sys VARCHAR(1000),
@@ -703,23 +703,23 @@
requirements VARCHAR(4000),
public_network_ip_addr VARCHAR(1000),
rank VARCHAR(4000),
- slot_id INT,
+ slot_id BIGINT,
start VARCHAR(4000),
starter_ability_list VARCHAR(4000),
- total_claim_run_time INT,
- total_claim_suspend_time INT,
- total_cpus INT,
- total_disk INT,
- total_job_run_time INT,
- total_job_suspend_time INT,
- total_memory INT,
- total_slots INT,
- total_virtual_memory INT,
+ total_claim_run_time BIGINT,
+ total_claim_suspend_time BIGINT,
+ total_cpus BIGINT,
+ total_disk BIGINT,
+ total_job_run_time BIGINT,
+ total_job_suspend_time BIGINT,
+ total_memory BIGINT,
+ total_slots BIGINT,
+ total_virtual_memory BIGINT,
uid_domain VARCHAR(1000),
- virtual_memory INT,
- windows_build_number INT,
- windows_major_version INT,
- windows_minor_version INT,
+ virtual_memory BIGINT,
+ windows_build_number BIGINT,
+ windows_major_version BIGINT,
+ windows_minor_version BIGINT,
condor_platform VARCHAR(1000),
condor_version VARCHAR(1000),
daemon_start_time TIMESTAMP
@@ -731,13 +731,13 @@
rec_time TIMESTAMP,
slot_id INT,
activity VARCHAR(1000),
- clock_day INT,
- clock_min INT,
+ clock_day BIGINT,
+ clock_min BIGINT,
condor_load_avg FLOAT,
- console_idle INT,
+ console_idle BIGINT,
entered_current_activity TIMESTAMP,
entered_current_state TIMESTAMP,
- keyboard_idle INT,
+ keyboard_idle BIGINT,
last_benchmark TIMESTAMP,
last_fetch_work_completed TIMESTAMP,
last_fetch_work_spawned TIMESTAMP,
@@ -746,27 +746,27 @@
my_current_time TIMESTAMP,
next_fetch_work_delay INT,
state VARCHAR(1000),
- time_to_live INT,
+ time_to_live BIGINT,
total_condor_load_avg FLOAT,
total_load_avg FLOAT,
- total_time_backfill_busy INT,
- total_time_backfill_idle INT,
- total_time_backfill_killing INT,
- total_time_claimed_busy INT,
- total_time_claimed_idle INT,
- total_time_claimed_retiring INT,
- total_time_claimed_suspended INT,
- total_time_matched_idle INT,
- total_time_owner_idle INT,
- total_time_preempting_killing INT,
- total_time_preempting_vacating INT,
- total_time_unclaimed_benchmarking INT,
- total_time_unclaimed_idle INT,
- monitor_self_age INT,
+ total_time_backfill_busy BIGINT,
+ total_time_backfill_idle BIGINT,
+ total_time_backfill_killing BIGINT,
+ total_time_claimed_busy BIGINT,
+ total_time_claimed_idle BIGINT,
+ total_time_claimed_retiring BIGINT,
+ total_time_claimed_suspended BIGINT,
+ total_time_matched_idle BIGINT,
+ total_time_owner_idle BIGINT,
+ total_time_preempting_killing BIGINT,
+ total_time_preempting_vacating BIGINT,
+ total_time_unclaimed_benchmarking BIGINT,
+ total_time_unclaimed_idle BIGINT,
+ monitor_self_age BIGINT,
monitor_self_cpu_usage FLOAT,
monitor_self_image_size FLOAT,
- monitor_self_registered_socket_count INT,
- monitor_self_resident_set_size INT,
+ monitor_self_registered_socket_count BIGINT,
+ monitor_self_resident_set_size BIGINT,
monitor_self_time TIMESTAMP
);
@@ -775,7 +775,7 @@
rec_time TIMESTAMP,
source_scope_id BIGINT,
source_object_id BIGINT,
- qmf_class_key BYTEA,
+ qmf_class_key VARCHAR(1000),
creation_time TIMESTAMP,
deletion_time TIMESTAMP,
managed_broker VARCHAR(1000),
@@ -783,15 +783,15 @@
stats_prev_id INT,
broker_id INT,
location VARCHAR(1000),
- default_initial_file_count SMALLINT,
- default_data_file_size INT,
+ default_initial_file_count INT,
+ default_data_file_size BIGINT,
tpl_is_initialized BOOL,
tpl_directory VARCHAR(1000),
- tpl_write_page_size INT,
- tpl_write_pages INT,
- tpl_initial_file_count SMALLINT,
- tpl_data_file_size INT,
- tpl_current_file_count INT
+ tpl_write_page_size BIGINT,
+ tpl_write_pages BIGINT,
+ tpl_initial_file_count INT,
+ tpl_data_file_size BIGINT,
+ tpl_current_file_count BIGINT
);
CREATE UNIQUE INDEX store_source_ids_unique ON store (source_scope_id, source_object_id);
@@ -815,7 +815,7 @@
rec_time TIMESTAMP,
source_scope_id BIGINT,
source_object_id BIGINT,
- qmf_class_key BYTEA,
+ qmf_class_key VARCHAR(1000),
creation_time TIMESTAMP,
deletion_time TIMESTAMP,
managed_broker VARCHAR(1000),
@@ -833,9 +833,9 @@
id SERIAL PRIMARY KEY,
rec_time TIMESTAMP,
submitter_id INT,
- held_jobs INT,
- idle_jobs INT,
- running_jobs INT
+ held_jobs BIGINT,
+ idle_jobs BIGINT,
+ running_jobs BIGINT
);
CREATE TABLE sysimage (
@@ -843,7 +843,7 @@
rec_time TIMESTAMP,
source_scope_id BIGINT,
source_object_id BIGINT,
- qmf_class_key BYTEA,
+ qmf_class_key VARCHAR(1000),
creation_time TIMESTAMP,
deletion_time TIMESTAMP,
managed_broker VARCHAR(1000),
@@ -855,8 +855,8 @@
release VARCHAR(1000),
version VARCHAR(1000),
machine VARCHAR(1000),
- mem_total INT,
- swap_total INT
+ mem_total BIGINT,
+ swap_total BIGINT
);
CREATE UNIQUE INDEX sysimage_source_ids_unique ON sysimage (source_scope_id, source_object_id);
@@ -864,13 +864,13 @@
id SERIAL PRIMARY KEY,
rec_time TIMESTAMP,
sysimage_id INT,
- mem_free INT,
- swap_free INT,
+ mem_free BIGINT,
+ swap_free BIGINT,
load_average1_min FLOAT,
load_average5_min FLOAT,
load_average10_min FLOAT,
- proc_total INT,
- proc_running INT
+ proc_total BIGINT,
+ proc_running BIGINT
);
CREATE TABLE system (
@@ -878,7 +878,7 @@
rec_time TIMESTAMP,
source_scope_id BIGINT,
source_object_id BIGINT,
- qmf_class_key BYTEA,
+ qmf_class_key VARCHAR(1000),
creation_time TIMESTAMP,
deletion_time TIMESTAMP,
managed_broker VARCHAR(1000),
@@ -904,7 +904,7 @@
rec_time TIMESTAMP,
source_scope_id BIGINT,
source_object_id BIGINT,
- qmf_class_key BYTEA,
+ qmf_class_key VARCHAR(1000),
creation_time TIMESTAMP,
deletion_time TIMESTAMP,
managed_broker VARCHAR(1000),
rhmessaging commits: r2875 - in store/trunk/cpp/lib: jrnl and 1 other directory.
by rhmessaging-commits@lists.jboss.org
Author: kpvdr
Date: 2008-11-24 14:41:05 -0500 (Mon, 24 Nov 2008)
New Revision: 2875
Modified:
store/trunk/cpp/lib/JournalImpl.cpp
store/trunk/cpp/lib/MessageStoreImpl.cpp
store/trunk/cpp/lib/jrnl/jerrno.cpp
store/trunk/cpp/lib/jrnl/jerrno.hpp
store/trunk/cpp/lib/jrnl/rmgr.cpp
store/trunk/cpp/lib/jrnl/rmgr.hpp
store/trunk/cpp/lib/jrnl/rrfc.cpp
Log:
Fix for BZ472215 "qpidd rmgr::get_events() threw JERR__AIO: AIO error". Also fix for txtest failures where journal is forced to run in extern mode.
Modified: store/trunk/cpp/lib/JournalImpl.cpp
===================================================================
--- store/trunk/cpp/lib/JournalImpl.cpp 2008-11-24 19:40:44 UTC (rev 2874)
+++ store/trunk/cpp/lib/JournalImpl.cpp 2008-11-24 19:41:05 UTC (rev 2875)
@@ -246,8 +246,14 @@
{
// Free any previous msg
free_read_buffers();
+
+ // TODO: This is a brutal approach - very inefficient and slow. Rather, introduce a system of remembering
+ // jumpover points and allow the read to jump back to the first known jumpover point - but this needs
+ // a mechanism in rrfc to accomplish it. Also helpful is a struct containing a journal address - a
+ // combination of lid/offset.
if (rid < lastReadRid)
_rmgr.invalidate();
+
_dlen = 0;
_dtok.reset();
_dtok.set_wstate(DataTokenImpl::ENQ);
@@ -262,9 +268,9 @@
iores res = read_data_record(&_datap, _dlen, &_xidp, xlen, transient, _external, &_dtok);
switch (res) {
case journal::RHM_IORES_SUCCESS:
- if (_dtok.rid() < rid) {
+ if (_dtok.rid() != rid) {
free_read_buffers();
- // reset data token for next read
+ // Reset data token for next read
_dlen = 0;
_dtok.reset();
_dtok.set_wstate(DataTokenImpl::ENQ);
@@ -283,19 +289,23 @@
std::stringstream ss;
ss << "read_data_record() returned " << journal::iores_str(res);
ss << "; exceeded maximum wait time";
- throw jexception(0, ss.str().c_str(), "JournalImpl", "loadMsgContent");
+ throw jexception(journal::jerrno::JERR__TIMEOUT, ss.str().c_str(), "JournalImpl",
+ "loadMsgContent");
}
break;
default:
std::stringstream ss;
ss << "read_data_record() returned " << journal::iores_str(res);
- throw jexception(0, ss.str().c_str(), "JournalImpl", "loadMsgContent");
+ throw jexception(journal::jerrno::JERR__UNEXPRESPONSE, ss.str().c_str(), "JournalImpl",
+ "loadMsgContent");
}
}
if (!rid_found) {
std::stringstream ss;
- ss << "read_data_record() was unable to find rid " << rid << "; last rid found was " << _dtok.rid();
- throw jexception(0, ss.str().c_str(), "JournalImpl", "loadMsgContent");
+ ss << "read_data_record() was unable to find rid 0x" << std::hex << rid << std::dec;
+ ss << " (" << rid << "); last rid found was 0x" << std::hex << _dtok.rid() << std::dec;
+ ss << " (" << _dtok.rid() << ")";
+ throw jexception(journal::jerrno::JERR__RECNFOUND, ss.str().c_str(), "JournalImpl", "loadMsgContent");
}
}
if (_external)
Modified: store/trunk/cpp/lib/MessageStoreImpl.cpp
===================================================================
--- store/trunk/cpp/lib/MessageStoreImpl.cpp 2008-11-24 19:40:44 UTC (rev 2874)
+++ store/trunk/cpp/lib/MessageStoreImpl.cpp 2008-11-24 19:41:05 UTC (rev 2875)
@@ -1257,8 +1257,7 @@
}
}
} catch (const journal::jexception& e) {
- THROW_STORE_EXCEPTION(std::string("Queue ") + queue.getName() +
- ": loadContent() failed: " + e.what());
+ THROW_STORE_EXCEPTION(std::string("Queue ") + queue.getName() + ": loadContent() failed: " + e.what());
}
TxnCtxt txn;
txn.begin(env, true);
Modified: store/trunk/cpp/lib/jrnl/jerrno.cpp
===================================================================
--- store/trunk/cpp/lib/jrnl/jerrno.cpp 2008-11-24 19:40:44 UTC (rev 2874)
+++ store/trunk/cpp/lib/jrnl/jerrno.cpp 2008-11-24 19:41:05 UTC (rev 2875)
@@ -51,6 +51,9 @@
const u_int32_t jerrno::JERR__FILEIO = 0x0104;
const u_int32_t jerrno::JERR__RTCLOCK = 0x0105;
const u_int32_t jerrno::JERR__PTHREAD = 0x0106;
+const u_int32_t jerrno::JERR__TIMEOUT = 0x0107;
+const u_int32_t jerrno::JERR__UNEXPRESPONSE = 0x0108;
+const u_int32_t jerrno::JERR__RECNFOUND = 0x0109;
// class jcntl
const u_int32_t jerrno::JERR_JCNTL_STOPPED = 0x0200;
@@ -138,6 +141,9 @@
_err_map[JERR__FILEIO] = "JERR__FILEIO: File read or write failure.";
_err_map[JERR__RTCLOCK] = "JERR__RTCLOCK: Reading real-time clock failed.";
_err_map[JERR__PTHREAD] = "JERR__PTHREAD: pthread failure.";
+ _err_map[JERR__TIMEOUT] = "JERR__TIMEOUT: Timeout waiting for event.";
+ _err_map[JERR__UNEXPRESPONSE] = "JERR__UNEXPRESPONSE: Unexpected response to call or event.";
+ _err_map[JERR__RECNFOUND] = "JERR__RECNFOUND: Record not found.";
// class jcntl
_err_map[JERR_JCNTL_STOPPED] = "JERR_JCNTL_STOPPED: Operation on stopped journal.";
Modified: store/trunk/cpp/lib/jrnl/jerrno.hpp
===================================================================
--- store/trunk/cpp/lib/jrnl/jerrno.hpp 2008-11-24 19:40:44 UTC (rev 2874)
+++ store/trunk/cpp/lib/jrnl/jerrno.hpp 2008-11-24 19:41:05 UTC (rev 2875)
@@ -69,6 +69,9 @@
static const u_int32_t JERR__FILEIO; ///< File read or write failure
static const u_int32_t JERR__RTCLOCK; ///< Reading real-time clock failed
static const u_int32_t JERR__PTHREAD; ///< pthread failure
+ static const u_int32_t JERR__TIMEOUT; ///< Timeout waiting for an event
+ static const u_int32_t JERR__UNEXPRESPONSE; ///< Unexpected response to call or event
+ static const u_int32_t JERR__RECNFOUND; ///< Record not found
// class jcntl
static const u_int32_t JERR_JCNTL_STOPPED; ///< Operation on stopped journal
Modified: store/trunk/cpp/lib/jrnl/rmgr.cpp
===================================================================
--- store/trunk/cpp/lib/jrnl/rmgr.cpp 2008-11-24 19:40:44 UTC (rev 2874)
+++ store/trunk/cpp/lib/jrnl/rmgr.cpp 2008-11-24 19:41:05 UTC (rev 2875)
@@ -199,8 +199,7 @@
oss << "rid=0x" << std::setw(16) << _hdr._rid;
oss << "; dtok_rid=" << std::setw(16) << dtokp->rid();
oss << "; dtok_id=0x" << std::setw(8) << dtokp->id();
- throw jexception(jerrno::JERR_RMGR_RIDMISMATCH, oss.str(), "rmgr",
- "read");
+ throw jexception(jerrno::JERR_RMGR_RIDMISMATCH, oss.str(), "rmgr", "read");
}
}
else
@@ -332,12 +331,38 @@
rmgr::invalidate()
{
if (_rrfc.is_valid())
+ _rrfc.set_invalid();
+}
+
+#define MAX_AIO_SLEEPS 1000 // 10 sec
+#define AIO_SLEEP_TIME 10000 // 10 ms
+void
+rmgr::init_validation()
+{
+ // Wait for any outstanding AIO read operations to complete before synchronizing
+ int aio_sleep_cnt = 0;
+ while (_aio_evt_rem)
{
- for (int i=0; i<_cache_num_pages; i++)
- _page_cb_arr[i]._state = UNUSED;
- _rrfc.unset_findex();
- _pg_offset_dblks = 0;
+ get_events();
+ if (_aio_evt_rem)
+ {
+ if (++aio_sleep_cnt <= MAX_AIO_SLEEPS)
+ {
+ get_events();
+ usleep(AIO_SLEEP_TIME);
+ }
+ else
+ throw jexception(jerrno::JERR__TIMEOUT,
+ "Invalidate timed out waiting for outstanding read aio to return", "rmgr", "invalidate");
+ }
}
+
+ // Reset all read states and pointers
+ for (int i=0; i<_cache_num_pages; i++)
+ _page_cb_arr[i]._state = UNUSED;
+ _rrfc.unset_findex();
+ _pg_index = 0;
+ _pg_offset_dblks = 0;
}
void
@@ -525,6 +550,7 @@
{
int16_t first_uninit = -1;
u_int16_t num_uninit = 0;
+ u_int16_t num_compl = 0;
bool outstanding = false;
// Index must start with current buffer and cycle around so that first
// uninitialized buffer is initialized first
@@ -543,12 +569,17 @@
case AIO_PENDING:
outstanding = true;
break;
+ case AIO_COMPLETE:
+ num_compl++;
+ break;
default:;
}
}
iores res = RHM_IORES_SUCCESS;
if (num_uninit)
res = init_aio_reads(first_uninit, num_uninit);
+ else if (num_compl == _cache_num_pages) // This condition exists after invalidation
+ res = init_aio_reads(0, _cache_num_pages);
if (outstanding)
get_events();
return res;
@@ -564,6 +595,7 @@
if (!_rrfc.is_valid())
{
+ init_validation();
_jc->get_earliest_fid(); // calls _rrfc.set_findex()
// If this file has not yet been written to, return RHM_IORES_EMPTY
if (_rrfc.is_void() && !_rrfc.is_wr_aio_outstanding())
@@ -584,6 +616,7 @@
// space into all contiguous empty pages in one AIO operation.
u_int32_t file_rem_dblks = _rrfc.remaining_dblks();
+ file_rem_dblks -= file_rem_dblks % JRNL_SBLK_SIZE; // round down to closest sblk boundary
u_int32_t pg_size_dblks = JRNL_RMGR_PAGE_SIZE * JRNL_SBLK_SIZE;
u_int32_t rd_size = file_rem_dblks > pg_size_dblks ? pg_size_dblks : file_rem_dblks;
if (rd_size)
@@ -592,8 +625,7 @@
// TODO: For perf, combine contiguous pages into single read
// 1 or 2 AIOs needed depending on whether read block folds
aio_cb* aiocbp = &_aio_cb_arr[pi];
- aio::prep_pread_2(aiocbp, _rrfc.fh(), _page_ptr_arr[pi],
- rd_size * JRNL_DBLK_SIZE, _rrfc.subm_offs());
+ aio::prep_pread_2(aiocbp, _rrfc.fh(), _page_ptr_arr[pi], rd_size * JRNL_DBLK_SIZE, _rrfc.subm_offs());
if (aio::submit(_ioctx, 1, &aiocbp) < 0)
throw jexception(jerrno::JERR__AIO, "rmgr", "init_aio_reads");
_rrfc.add_subm_cnt_dblks(rd_size);
Modified: store/trunk/cpp/lib/jrnl/rmgr.hpp
===================================================================
--- store/trunk/cpp/lib/jrnl/rmgr.hpp 2008-11-24 19:40:44 UTC (rev 2874)
+++ store/trunk/cpp/lib/jrnl/rmgr.hpp 2008-11-24 19:41:05 UTC (rev 2875)
@@ -92,6 +92,7 @@
private:
void initialize();
void clean();
+ void init_validation();
iores pre_read_check(data_tok* dtokp);
iores read_enq(rec_hdr& h, void* rptr, data_tok* dtokp);
void consume_xid_rec(rec_hdr& h, void* rptr, data_tok* dtokp);
Modified: store/trunk/cpp/lib/jrnl/rrfc.cpp
===================================================================
--- store/trunk/cpp/lib/jrnl/rrfc.cpp 2008-11-24 19:40:44 UTC (rev 2874)
+++ store/trunk/cpp/lib/jrnl/rrfc.cpp 2008-11-24 19:41:05 UTC (rev 2875)
@@ -79,10 +79,7 @@
u_int16_t next_fc_index = _fc_index + 1;
if (next_fc_index == _lfmp->num_jfiles())
next_fc_index = 0;
- fcntl* next_fc = _lfmp->get_fcntlp(next_fc_index);
- _fc_index = next_fc_index;
- _curr_fc = next_fc;
- open_fh(_curr_fc->fname());
+ set_findex(next_fc_index);
return RHM_IORES_SUCCESS;
}
rhmessaging commits: r2874 - mgmt/trunk/sesame/cpp/etc.
by rhmessaging-commits@lists.jboss.org
Author: tedross
Date: 2008-11-24 14:40:44 -0500 (Mon, 24 Nov 2008)
New Revision: 2874
Modified:
mgmt/trunk/sesame/cpp/etc/sesame.conf
Log:
Filled out config file with defaults
Modified: mgmt/trunk/sesame/cpp/etc/sesame.conf
===================================================================
--- mgmt/trunk/sesame/cpp/etc/sesame.conf 2008-11-24 19:25:16 UTC (rev 2873)
+++ mgmt/trunk/sesame/cpp/etc/sesame.conf 2008-11-24 19:40:44 UTC (rev 2874)
@@ -15,9 +15,9 @@
## proto=ssl, 5671
## proto=rdma, 5672
##
-#host=localhost
-#proto=tcp
-#port=5672
+host=localhost
+proto=tcp
+port=5672
##======================
## Agent Authentication
@@ -29,9 +29,9 @@
## the password in this configuration file, you may use pwd-file to point
## to an access-restricted file containing the password.
##
-#mech=PLAIN
-#uid=guest
-#pwd=guest
+mech=PLAIN
+uid=guest
+pwd=guest
#pwd-file=/etc/sesame/password
##==============
@@ -41,5 +41,5 @@
##
## Set the path to the directory where sesame will store persistent data.
##
-#state-dir=/var/sesame
+#state-dir=/var/lib/sesame
rhmessaging commits: r2873 - mgmt/trunk/cumin/python/cumin.
by rhmessaging-commits@lists.jboss.org
Author: eallen
Date: 2008-11-24 14:25:16 -0500 (Mon, 24 Nov 2008)
New Revision: 2873
Modified:
mgmt/trunk/cumin/python/cumin/limits.py
mgmt/trunk/cumin/python/cumin/model.py
Log:
Don't convert the limits to strings. Pass them as floats.
Modified: mgmt/trunk/cumin/python/cumin/limits.py
===================================================================
--- mgmt/trunk/cumin/python/cumin/limits.py 2008-11-24 19:13:14 UTC (rev 2872)
+++ mgmt/trunk/cumin/python/cumin/limits.py 2008-11-24 19:25:16 UTC (rev 2873)
@@ -207,7 +207,7 @@
def process_submit(self, session, *args):
max = self.max.get(session)
- imax = 0
+ fmax = 0
errors = False
try:
fmax = float(max)
@@ -221,7 +221,7 @@
if not errors:
limit = args[0]
- limit.max = imax
+ limit.max = fmax
self.frame.set_limit(session, limit)
self.process_cancel(session, *args)
Modified: mgmt/trunk/cumin/python/cumin/model.py
===================================================================
--- mgmt/trunk/cumin/python/cumin/model.py 2008-11-24 19:13:14 UTC (rev 2872)
+++ mgmt/trunk/cumin/python/cumin/model.py 2008-11-24 19:25:16 UTC (rev 2873)
@@ -2118,7 +2118,8 @@
def do_invoke(self, limit, negotiator, completion):
Name = limit.id
Max = limit.max
- negotiator.SetLimit(self.model.data, completion, Name, str(Max))
+ negotiator.SetLimit(self.model.data, completion, Name, Max)
+ #negotiator.SetLimit(self.model.data, completion, Name, str(Max))
class CuminJobGroup(CuminClass):
def __init__(self, model):
rhmessaging commits: r2872 - mgmt/trunk/cumin/python/cumin.
by rhmessaging-commits@lists.jboss.org
Author: eallen
Date: 2008-11-24 14:13:14 -0500 (Mon, 24 Nov 2008)
New Revision: 2872
Modified:
mgmt/trunk/cumin/python/cumin/limits.py
Log:
Convert limits to float instead of int.
Modified: mgmt/trunk/cumin/python/cumin/limits.py
===================================================================
--- mgmt/trunk/cumin/python/cumin/limits.py 2008-11-24 18:16:15 UTC (rev 2871)
+++ mgmt/trunk/cumin/python/cumin/limits.py 2008-11-24 19:13:14 UTC (rev 2872)
@@ -210,8 +210,8 @@
imax = 0
errors = False
try:
- imax = int(max)
- if imax < 1 or imax > 99999:
+ fmax = float(max)
+ if fmax < 1.0 or fmax > 99999.0:
raise "out of bounds"
except:
errors = True
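Taken together with r2873 above, the resulting validation parses the limit as a float and passes it through unconverted. A hedged sketch of that logic as a standalone helper, using an explicit ValueError in place of the original string raise (which the surrounding bare except catches either way):

    # Hedged sketch of the limit validation these two commits converge on;
    # the 1.0-99999.0 bounds are taken from the diff.
    def parse_limit(text):
        fmax = float(text)  # raises ValueError on non-numeric input
        if fmax < 1.0 or fmax > 99999.0:
            raise ValueError("limit out of bounds: %r" % fmax)
        return fmax

    assert parse_limit("2.5") == 2.5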
rhmessaging commits: r2871 - mgmt/trunk/cumin/python/cumin.
by rhmessaging-commits@lists.jboss.org
Author: justi9
Date: 2008-11-24 13:16:15 -0500 (Mon, 24 Nov 2008)
New Revision: 2871
Modified:
mgmt/trunk/cumin/python/cumin/tools.py
Log:
Normalize broker urls before we store them
Modified: mgmt/trunk/cumin/python/cumin/tools.py
===================================================================
--- mgmt/trunk/cumin/python/cumin/tools.py 2008-11-24 16:48:07 UTC (rev 2870)
+++ mgmt/trunk/cumin/python/cumin/tools.py 2008-11-24 18:16:15 UTC (rev 2871)
@@ -1,4 +1,4 @@
-import sys, os
+import sys, os, re
from parsley.config import *
from parsley.command import *
@@ -219,6 +219,18 @@
print "Error: a broker at %s already exists" % url
sys.exit(1)
+ url = url.strip()
+
+ expr = re.compile("^amqps?://")
+
+ if not expr.match(url):
+ url = "amqp://%s" % url
+
+ expr = re.compile(":[0-9]+$")
+
+ if not expr.search(url):
+ url = "%s:5672" % url
+
reg = BrokerRegistration(name=name, url=url)
reg.syncUpdate()
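The same normalization, pulled out as a pure helper with a quick self-check (a hedged sketch, not cumin code; note the port pattern is end-anchored, so it needs re.search rather than re.match):

    # Hedged sketch: broker URL normalization as a standalone function.
    import re

    def normalize_broker_url(url):
        url = url.strip()
        if not re.match(r"^amqps?://", url):
            url = "amqp://%s" % url
        if not re.search(r":[0-9]+$", url):  # match() would anchor at the start
            url = "%s:5672" % url
        return url

    assert normalize_broker_url(" example.com ") == "amqp://example.com:5672"
    assert normalize_broker_url("amqps://host:5671") == "amqps://host:5671"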
rhmessaging commits: r2870 - mgmt/trunk/cumin/python/cumin.
by rhmessaging-commits@lists.jboss.org
Author: eallen
Date: 2008-11-24 11:48:07 -0500 (Mon, 24 Nov 2008)
New Revision: 2870
Modified:
mgmt/trunk/cumin/python/cumin/limits.py
Log:
Add a get_sql_values override method to avoid a problem with using SQL values and the SQL "like" clause at the same time.
Modified: mgmt/trunk/cumin/python/cumin/limits.py
===================================================================
--- mgmt/trunk/cumin/python/cumin/limits.py 2008-11-24 14:48:01 UTC (rev 2869)
+++ mgmt/trunk/cumin/python/cumin/limits.py 2008-11-24 16:48:07 UTC (rev 2870)
@@ -252,6 +252,9 @@
"scheduler",
"submitter"])
+ def get_sql_values(self, session, *args):
+ pass
+
def render_sql_where(self, session, limit):
phase_sql = self.get_phase_sql(session)
limits_sql = self.get_limits_sql(session, limit)
rhmessaging commits: r2869 - mgmt/trunk/cumin/python/cumin.
by rhmessaging-commits@lists.jboss.org
Author: eallen
Date: 2008-11-24 09:48:01 -0500 (Mon, 24 Nov 2008)
New Revision: 2869
Modified:
mgmt/trunk/cumin/python/cumin/job.py
Log:
Change the way the job find href is generated so it works from the system frame as well as the grid frame.
Modified: mgmt/trunk/cumin/python/cumin/job.py
===================================================================
--- mgmt/trunk/cumin/python/cumin/job.py 2008-11-24 14:36:21 UTC (rev 2868)
+++ mgmt/trunk/cumin/python/cumin/job.py 2008-11-24 14:48:01 UTC (rev 2869)
@@ -225,7 +225,7 @@
try:
first = rows[0]
job = Identifiable(first["id"])
- href = self.frame.job.get_href(session, job)
+ href = self.app.main_page.main.pool.job.get_href(session, job)
self.page.set_redirect_url(session, href)
except:
self.job_search.set_not_found(session, search_term)
rhmessaging commits: r2868 - mgmt/trunk/cumin/python/cumin.
by rhmessaging-commits@lists.jboss.org
Author: eallen
Date: 2008-11-24 09:36:21 -0500 (Mon, 24 Nov 2008)
New Revision: 2868
Modified:
mgmt/trunk/cumin/python/cumin/model.py
Log:
Don't try to unpack system id property since it's already a string.
Modified: mgmt/trunk/cumin/python/cumin/model.py
===================================================================
--- mgmt/trunk/cumin/python/cumin/model.py 2008-11-24 14:15:50 UTC (rev 2867)
+++ mgmt/trunk/cumin/python/cumin/model.py 2008-11-24 14:36:21 UTC (rev 2868)
@@ -6,7 +6,7 @@
from time import *
from datetime import datetime, timedelta
from types import *
-from struct import unpack
+from struct import unpack, calcsize
from util import *
from formats import *
@@ -672,7 +672,7 @@
def value(self, session, object):
val = super(CuminSystem.SystemIdProperty, self).value(session, object)
- return "%08x-%04x-%04x-%04x-%04x%08x" % unpack("!LHHHHL", val)
+ return val
class CuminMaster(RemoteClass):
def __init__(self, model):
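For reference, the formatting the removed line performed by hand -- rendering 16 raw bytes as an 8-4-4-4-12 UUID string -- is what the standard uuid module provides directly; a hedged sketch with a stand-in value:

    # Hedged sketch: the equivalent of the removed unpack() formatting.
    # 'val' is a hypothetical 16-byte system id, not real data.
    import uuid
    val = "\x00" * 16
    print str(uuid.UUID(bytes=val))  # 00000000-0000-0000-0000-000000000000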