Author: nunofsantos
Date: 2008-10-02 18:58:09 -0400 (Thu, 02 Oct 2008)
New Revision: 2582
Modified:
mgmt/trunk/mint/python/mint/__init__.py
mgmt/trunk/mint/python/mint/update.py
Log:
Handle orphan updates -- i.e., child objects whose parents' info hasn't been
received yet -- by deferring their insertion/creation until the parent's info is
received. Also add a datetime hack to temporarily deal with Condor absTime attributes.
Modified: mgmt/trunk/mint/python/mint/__init__.py
===================================================================
--- mgmt/trunk/mint/python/mint/__init__.py 2008-10-02 18:16:42 UTC (rev 2581)
+++ mgmt/trunk/mint/python/mint/__init__.py 2008-10-02 22:58:09 UTC (rev 2582)
@@ -301,6 +301,10 @@
self.connCloseListener = None
self.__lock = RLock()
+ # map containing updateObjects that have a missing parent dependency, for deferred
insertion
+ # (missing_class, missing_id.first, missing_id.second) -> [updateObject, ...,
updateObject]
+ self.orphanObjectMap = dict()
+
self.updateThread = update.ModelUpdateThread(self)
assert MintModel.staticInstance is None
Modified: mgmt/trunk/mint/python/mint/update.py
===================================================================
--- mgmt/trunk/mint/python/mint/update.py 2008-10-02 18:16:42 UTC (rev 2581)
+++ mgmt/trunk/mint/python/mint/update.py 2008-10-02 22:58:09 UTC (rev 2582)
@@ -6,7 +6,9 @@
from qpid.management import managementClient
from struct import unpack
from schema import schemaReservedWordsMap
+from sqlobject import DateTimeCol, TimestampCol, Col
+import types
import mint
log = logging.getLogger("mint.update")
@@ -67,12 +69,13 @@
return package, cls
-def processAttributes(conn, attrs, cls):
+def processAttributes(conn, attrs, cls, model, updateObj):
attrs.pop("id")
if "connectionRef" in attrs:
attrs["clientConnectionRef"] = attrs.pop("connectionRef")
+ orphan = False
for name in attrs.keys():
rename = schemaReservedWordsMap.get(name)
@@ -94,19 +97,35 @@
try:
attrs[attrname] = conn.getObject(othercls, id)
except KeyError:
- log.info("Referenced object %s '%s' not found" % \
- (clsname, id))
+ log.info("Referenced object %s '%s' not found by key
'%s'" % (clsname, id, attrname))
+ except mint.ObjectNotFound:
+ if not orphan:
+ log.info("Referenced object %s '%s' not found, deferring
creation of orphan object" % (clsname, id))
+ # store object in orphan map, will be picked up later when parent info is
received
+ if (clsname, id.first, id.second) not in model.orphanObjectMap:
+ model.orphanObjectMap[(clsname, id.first, id.second)] = set()
+ model.orphanObjectMap[(clsname, id.first, id.second)].add(updateObj)
+ orphan = True
else:
log.error("Class '%s' not found" % clsname)
elif not hasattr(cls, name):
# Remove attrs that we don't have in our schema
-
log.debug("Class '%s' has no field '%s'" % (cls.__name__,
name))
-
del attrs[name]
+ #XXX FIX -- TODO when converting to new API, will lookup attribute type in schema
representation
+ elif name in ("DaemonStartTime", "EnteredCurrentActivity",
"EnteredCurrentState", "JobStart",
+ "LastBenchmark", "LastFetchWorkCompleted",
"LastFetchWorkSpawned", "LastPeriodicCheckpoint",
+ "MyCurrentTime", "QDate",
"JobQueueBirthdate", "MonitorSelfTime") \
+ and (type(attrs[name]) is types.LongType or type(attrs[name]) is
types.IntType or attrs[name] == 0):
+ attrs[name] = datetime.fromtimestamp(attrs[name]/1000000000)
+ elif name.endswith("Time") and type(attrs[name]) is types.IntType and
attrs[name] == 0:
+ attrs[name] = datetime.fromtimestamp(attrs[name])
+ #XXX FIX -- TODO when converting to new API, will lookup attribute type in schema
representation
+ if orphan:
+ return None
+ else:
+ return attrs
- return attrs
-
def getStatsClass(cls):
return getattr(mint, cls.__name__ + "Stats")
@@ -153,7 +172,10 @@
attrs = dict(self.props)
id = attrs["id"]
- processAttributes(self.conn, attrs, cls)
+ if processAttributes(self.conn, attrs, cls, model, self) == None:
+ # object is orphan, a parent dependency was not found;
+ # insertion in db is deferred until parent info is received
+ return
# XXX move these down to the try/except
@@ -186,6 +208,13 @@
if obj.deletionTime:
log.debug("%s(%i) marked deleted", cls.__name__, obj.id)
+ if (cls.__name__, id.first, id.second) in model.orphanObjectMap:
+ # this object is the parent of orphan objects in the map, re-enqueue for insertion
+ orphanObjects = model.orphanObjectMap.pop((cls.__name__, id.first, id.second))
+ for orphanObj in orphanObjects:
+ model.updateThread.enqueue(orphanObj)
+ log.info("Inserted %d orphan objects whose creation had been deferred" %
(len(orphanObjects)))
+
# XXX refactor this to take advantage of the get/create logic
# above
if isinstance(obj, mint.Broker) and obj.managedBroker:
@@ -217,10 +246,6 @@
args = ("stats", self.conn.id, cls, len(self.stats))
log.info("Processing %-8s %-16s %-16s %3i" % args)
- # XXX temporary work around for absTime type size problem
- if cls == "mrggrid.slot":
- return
-
try:
pkg, cls = unmarshalClassInfo(self.classInfo)
except UnknownClassException, e:
@@ -233,7 +258,10 @@
obj = self.conn.getObject(cls, id)
statscls = getStatsClass(cls)
- processAttributes(self.conn, attrs, statscls)
+ if processAttributes(self.conn, attrs, statscls, model, self) == None:
+ # object is orphan, a parent dependency was not found;
+ # insertion in db is deferred until parent info is received
+ return
attrs["recTime"] = datetime.fromtimestamp(self.timestamps[0]/1000000000)