Author: justi9
Date: 2010-06-24 09:34:15 -0400 (Thu, 24 Jun 2010)
New Revision: 4047
Modified:
mgmt/newdata/mint/python/mint/model.py
mgmt/newdata/mint/python/mint/session.py
mgmt/newdata/mint/python/mint/update.py
Log:
* Move get_object to MintAgent; add a delete_object there
* Set a max queue size for the update thread; this obviates the yield
logic that was present before (see the sketch after this list)
* Use just one cursor, and pass it through to the functions that need it
* Use the incoming qmf update time to decide whether a sample arrives
at a higher fidelity than we need
* A faster method for dispatching to the value transform functions
* Move model agent delete to after the AgentDelete update is
completed
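
A standalone sketch of the backpressure the bounded queue provides
(illustration only, not mint code; it assumes ConcurrentQueue follows
the stdlib Queue semantics, where put() blocks once maxsize items are
pending, which is why the old explicit sleep(0) yield is no longer
needed):

    try:
        from queue import Queue      # Python 3
    except ImportError:
        from Queue import Queue      # Python 2

    from threading import Thread
    import time

    updates = Queue(maxsize=1000)

    def consume():
        while True:
            update = updates.get()   # blocks while the queue is empty
            if update is None:       # sentinel: stop the worker
                break
            time.sleep(0.001)        # stands in for do_process()

    worker = Thread(target=consume)
    worker.start()

    for i in range(5000):
        updates.put(i)               # blocks once 1000 items are queued

    updates.put(None)
    worker.join()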
Modified: mgmt/newdata/mint/python/mint/model.py
===================================================================
--- mgmt/newdata/mint/python/mint/model.py 2010-06-21 21:51:13 UTC (rev 4046)
+++ mgmt/newdata/mint/python/mint/model.py 2010-06-24 13:34:15 UTC (rev 4047)
@@ -78,5 +78,28 @@
self.model = None
+ def get_object(self, cursor, cls, object_id):
+ try:
+ return self.objects_by_id[object_id]
+ except KeyError:
+ obj = RosemaryObject(cls, None)
+ obj._qmf_agent_id = self.id
+ obj._qmf_object_id = object_id
+
+ try:
+ cls.load_object_by_qmf_id(cursor, obj)
+ except RosemaryNotFound:
+ obj._id = cls.get_new_id(cursor)
+
+ self.objects_by_id[object_id] = obj
+
+ return obj
+
+ def delete_object(self, object_id):
+ try:
+ del self.objects_by_id[object_id]
+ except KeyError:
+ pass
+
def __repr__(self):
return "%s(%s)" % (self.__class__.__name__, self.id)
Modified: mgmt/newdata/mint/python/mint/session.py
===================================================================
--- mgmt/newdata/mint/python/mint/session.py 2010-06-21 21:51:13 UTC (rev 4046)
+++ mgmt/newdata/mint/python/mint/session.py 2010-06-24 13:34:15 UTC (rev 4047)
@@ -78,8 +78,6 @@
except KeyError:
return
- agent.delete()
-
if not self.model.app.update_thread.isAlive():
return
Modified: mgmt/newdata/mint/python/mint/update.py
===================================================================
--- mgmt/newdata/mint/python/mint/update.py 2010-06-21 21:51:13 UTC (rev 4046)
+++ mgmt/newdata/mint/python/mint/update.py 2010-06-24 13:34:15 UTC (rev 4047)
@@ -16,35 +16,25 @@
def __init__(self, app):
super(UpdateThread, self).__init__(app)
- self.updates = ConcurrentQueue()
+ self.updates = ConcurrentQueue(maxsize=1000)
self.stats = UpdateStats(self.app)
self.conn = None
- self.read_cursor = None
- self.write_cursor = None
+ self.cursor = None
self.halt_on_error = False
def init(self):
self.conn = self.app.database.get_connection()
- self.read_cursor = self.conn.cursor(cursor_factory=UpdateCursor)
- self.write_cursor = self.conn.cursor(cursor_factory=UpdateCursor)
+ self.cursor = self.conn.cursor(cursor_factory=UpdateCursor)
+ self.cursor.stats = self.stats
- self.read_cursor.stats = self.stats
- self.write_cursor.stats = self.stats
-
def enqueue(self, update):
self.updates.put(update)
self.stats.enqueued += 1
- # This is an attempt to yield from the enqueueing thread (this
- # method's caller) to the update thread
-
- if self.updates.qsize() > 1000:
- sleep(0)
-
def run(self):
while True:
if self.stop_requested:
@@ -152,7 +142,7 @@
log.debug("Processing %s", self)
try:
- self.do_process(thread.write_cursor, thread.stats)
+ self.do_process(thread.cursor, thread.stats)
thread.conn.commit()
except UpdateException, e:
@@ -185,8 +175,7 @@
self.object = obj
def do_process(self, cursor, stats):
- cls = self.get_class()
- obj = self.get_object(cls, self.object.getObjectId().objectName)
+ cls, obj = self.get_object(cursor)
update_time, create_time, delete_time = self.object.getTimestamps()
update_time = datetime.fromtimestamp(update_time / 1000000000)
@@ -200,7 +189,7 @@
# We don't have this object yet
stats.dropped += 1; return
- if stats.enqueued - stats.dequeued > 1000:
+ if stats.enqueued - stats.dequeued > 500:
if update_time < now - minutes_ago:
# The sample is too old
stats.dropped += 1; return
@@ -215,7 +204,7 @@
sample_columns = list()
self.process_headers(obj, object_columns)
- self.process_properties(obj, object_columns)
+ self.process_properties(obj, object_columns, cursor)
self.process_statistics(obj, object_columns, sample_columns)
statements = list()
@@ -246,7 +235,7 @@
statements.append(sql)
stats.sampled += 1
- obj._sample_time = now
+ obj._sample_time = update_time
if statements:
text = "; ".join(statements)
@@ -268,7 +257,7 @@
else:
stats.dropped += 1
- def get_class(self):
+ def get_object(self, cursor):
class_key = self.object.getClassKey()
name = class_key.getPackageName()
@@ -284,31 +273,13 @@
cls = pkg._classes_by_lowercase_name[name.lower()]
except KeyError:
raise ClassUnknown(name)
-
- return cls
- def get_object(self, cls, object_id):
- try:
- return self.agent.objects_by_id[object_id]
- except KeyError:
- cursor = self.model.app.update_thread.read_cursor
+ object_id = self.object.getObjectId().objectName
- obj = RosemaryObject(cls, None)
- obj._qmf_agent_id = self.agent.id
- obj._qmf_object_id = object_id
+ obj = self.agent.get_object(cursor, cls, object_id)
+
+ return cls, obj
- #try:
- try:
- cls.load_object_by_qmf_id(cursor, obj)
- except RosemaryNotFound:
- obj._id = cls.get_new_id(cursor)
- #finally:
- # cursor.close()
-
- self.agent.objects_by_id[object_id] = obj
-
- return obj
-
def process_headers(self, obj, columns):
table = obj._class.sql_table
@@ -337,13 +308,14 @@
columns.append(table._qmf_class_key)
columns.append(table._qmf_create_time)
- def process_properties(self, obj, columns):
+ def process_properties(self, obj, columns, cursor):
cls = obj._class
for prop, value in self.object.getProperties():
try:
if prop.type == 10:
- col, nvalue = self.process_reference(cls, prop, value)
+ col, nvalue = self.process_reference(
+ cls, prop, value, cursor)
else:
col, nvalue = self.process_value(cls, prop, value)
except MappingException, e:
@@ -359,7 +331,7 @@
setattr(obj, col.name, nvalue)
columns.append(col)
- def process_reference(self, cls, prop, value):
+ def process_reference(self, cls, prop, value, cursor):
try:
ref = cls._references_by_name[prop.name]
except KeyError:
@@ -374,9 +346,9 @@
try:
that_id = str(value.objectName)
except:
- raise MappingException("XXX ref isn't an oid")
+ raise MappingException("Reference isn't an oid")
- that = self.get_object(ref.that_cls, that_id)
+ that = self.agent.get_object(cursor, ref.that_cls, that_id)
if not that._sync_time:
msg = "Referenced object %s hasn't appeared yet"
@@ -393,26 +365,10 @@
raise MappingException("Property %s is unknown" % prop)
if value is not None:
- value = self.transform_value(prop, value)
+ value = transform_value(prop, value)
return col, value
- def transform_value(self, attr, value):
- if attr.type == 8: # absTime
- if value == 0:
- value = None
- else:
- value = datetime.fromtimestamp(value / 1000000000)
- # XXX value = TimestampFromTicks(value / 1000000000)
- elif attr.type == 15: # map
- value = pickle.dumps(value)
- elif attr.type == 10: # objId
- value = str(value)
- elif attr.type == 14: # uuid
- value = str(value)
-
- return value
-
def process_statistics(self, obj, update_columns, insert_columns):
for stat, value in self.object.getStatistics():
try:
@@ -423,7 +379,7 @@
continue
if value is not None:
- value = self.transform_value(stat, value)
+ value = transform_value(stat, value)
# XXX hack workaround
if col.name == "MonitorSelfTime":
@@ -450,17 +406,13 @@
class ObjectDelete(ObjectUpdate):
def do_process(self, cursor, stats):
- cls = self.get_class()
- obj = self.get_object(cls, self.object.getObjectId().objectName)
+ cls, obj = self.get_object(cursor)
self.model.print_event(3, "Deleting %s, from %s", obj, self.agent)
obj.delete(cursor)
- try:
- del self.agent.objects_by_id[self.object.getObjectId().objectName]
- except KeyError:
- pass
+ self.agent.delete_object(obj._qmf_object_id)
stats.deleted += 1
@@ -483,6 +435,8 @@
stats.deleted += 1
+ self.agent.delete()
+
class UpdateException(Exception):
def __init__(self, name):
self.name = name
@@ -501,3 +455,23 @@
class MappingException(Exception):
pass
+
+def transform_default(value):
+ return value
+
+def transform_timestamp(value):
+ if value != 0: # a zero absTime stays None
+ return datetime.fromtimestamp(value / 1000000000) # qmf times are in ns
+
+def transform_pickle(value):
+ return pickle.dumps(value)
+
+transformers = [transform_default for x in range(32)]
+
+transformers[8] = transform_timestamp # absTime
+transformers[10] = str # objId
+transformers[14] = str # uuid
+transformers[15] = transform_pickle # map
+
+def transform_value(attr, value):
+ return transformers[attr.type](value)