Author: justi9
Date: 2010-07-02 15:58:53 -0400 (Fri, 02 Jul 2010)
New Revision: 4069
Modified:
mgmt/newdata/mint/python/mint/model.py
mgmt/newdata/mint/python/mint/session.py
mgmt/newdata/mint/python/mint/update.py
mgmt/newdata/rosemary/python/rosemary/model.py
Log:
* Remove _qmf_class_key from the schema; the new qmf call
implementation doesn't need it
* Handle deletes as a type of update
* Reorganize object update into three distinct cases: create, update,
and delete; this fell out from the data corruption fix
* Be more aggressive about dropping too-old or too-recent samples
* Use an exception to signal a dropped update; this avoids any extra
communication with the database in this case
* Fix get_object_by_qmf_id in rosemary, and use it in update.py
* Remove the sampled counter from the stats; samples are always
accounted for in created or updated
Modified: mgmt/newdata/mint/python/mint/model.py
===================================================================
--- mgmt/newdata/mint/python/mint/model.py 2010-07-02 19:37:43 UTC (rev 4068)
+++ mgmt/newdata/mint/python/mint/model.py 2010-07-02 19:58:53 UTC (rev 4069)
@@ -80,27 +80,18 @@
self.model = None
- def get_object(self, cursor, cls, object_id):
+ def get_object_by_id(self, object_id):
try:
return self.objects_by_id[object_id]
except KeyError:
- obj = RosemaryObject(cls, None)
- obj._qmf_agent_id = self.id
- obj._qmf_object_id = object_id
+ pass
- try:
- cls.load_object_by_qmf_id(cursor, obj)
- except RosemaryNotFound:
- obj._id = cls.get_new_id(cursor)
-
- return obj
-
def add_object(self, obj):
self.objects_by_id[obj._qmf_object_id] = obj
- def delete_object(self, object_id):
+ def delete_object(self, obj):
try:
- del self.objects_by_id[object_id]
+ del self.objects_by_id[obj._qmf_object_id]
except KeyError:
pass
Modified: mgmt/newdata/mint/python/mint/session.py
===================================================================
--- mgmt/newdata/mint/python/mint/session.py 2010-07-02 19:37:43 UTC (rev 4068)
+++ mgmt/newdata/mint/python/mint/session.py 2010-07-02 19:58:53 UTC (rev 4069)
@@ -111,11 +111,7 @@
if not self.model.app.update_thread.isAlive():
return
- if obj.getTimestamps()[2]:
- up = ObjectDelete(self.model, agent, obj)
- else:
- up = ObjectUpdate(self.model, agent, obj)
-
+ up = ObjectUpdate(self.model, agent, obj)
self.model.app.update_thread.enqueue(up)
def objectStats(self, broker, obj):
Modified: mgmt/newdata/mint/python/mint/update.py
===================================================================
--- mgmt/newdata/mint/python/mint/update.py 2010-07-02 19:37:43 UTC (rev 4068)
+++ mgmt/newdata/mint/python/mint/update.py 2010-07-02 19:58:53 UTC (rev 4069)
@@ -51,10 +51,10 @@
class UpdateStats(object):
names = ("*Enqueued", "*Dequeued", "Depth",
"*Created", "*Updated",
- "*Sampled", "*Deleted", "*Dropped", "*Sql Ops",
- "Errors", "Cpu (%)", "Mem (M)")
- headings = ("%10s " * 12) % names
- values_fmt = "%10.1f %10.1f %10i %10.1f %10.1f %10.1f %10.1f " + \
+ "*Deleted", "*Dropped", "*Sql Ops", "Errors", "Cpu (%)",
+ "Mem (M)")
+ headings = ("%10s " * 11) % names
+ values_fmt = "%10.1f %10.1f %10i %10.1f %10.1f %10.1f " + \
"%10.1f %10.1f %10i %10i %10.1f"
then = None
@@ -66,7 +66,6 @@
self.created = 0
self.updated = 0
- self.sampled = 0
self.deleted = 0
self.dropped = 0
@@ -112,7 +111,6 @@
self.now.dequeued - self.then.dequeued,
self.now.created - self.then.created,
self.now.updated - self.then.updated,
- self.now.sampled - self.then.sampled,
self.now.deleted - self.then.deleted,
self.now.dropped - self.then.dropped,
self.now.sql_ops - self.then.sql_ops]
@@ -145,8 +143,10 @@
self.do_process(thread.cursor, thread.stats)
thread.conn.commit()
+ except UpdateDropped:
+ thread.stats.dropped += 1
except UpdateException, e:
- log.info("Update could not be completed; %s", e)
+ log.exception("Update could not be completed")
thread.conn.rollback()
except:
@@ -168,171 +168,180 @@
return self.__class__.__name__
class ObjectUpdate(Update):
- def __init__(self, model, agent, obj):
+ def __init__(self, model, agent, object):
super(ObjectUpdate, self).__init__(model)
self.agent = agent
- self.object = obj
+ self.object = object
def do_process(self, cursor, stats):
- cls, obj = self.get_object(cursor)
+ cls = self.get_class()
+ obj_id = self.get_object_id()
+ obj = self.agent.get_object_by_id(obj_id)
+ if not obj:
+ try:
+ obj = cls.get_object_by_qmf_id(cursor, self.agent.id, obj_id)
+ except RosemaryNotFound:
+ pass
+
update_time, create_time, delete_time = self.object.getTimestamps()
- update_time = datetime.fromtimestamp(update_time / 1000000000)
- now = datetime.now()
+ if obj:
+ if delete_time != 0:
+ self.delete_object(cursor, stats, obj)
+ return
- if self.object.getStatistics() and not self.object.getProperties():
- # Just stats; do we want it?
+ if not self.object.getProperties() and self.object.getStatistics():
+ # Just stats; do we want it?
+ # if stats.enqueued - stats.dequeued > 500:
- if not obj._sync_time:
- # We don't have this object yet
- stats.dropped += 1; return
+ now = datetime.now()
+ update = datetime.fromtimestamp(update_time / 1000000000)
+ sample = obj._sample_time
- if stats.enqueued - stats.dequeued > 500:
- if update_time < now - minutes_ago:
+ if update < now - minutes_ago:
# The sample is too old
- stats.dropped += 1; return
+ raise UpdateDropped()
- if obj._sample_time and obj._sample_time > now - seconds_ago:
+ if sample and sample > now - seconds_ago:
# The samples are too fidelitous
- stats.dropped += 1; return
+ raise UpdateDropped()
- obj._qmf_update_time = update_time
+ self.update_object(cursor, stats, obj)
+ else:
+ if not self.object.getProperties():
+ raise UpdateDropped()
- object_columns = list()
- sample_columns = list()
+ if delete_time != 0:
+ raise UpdateDropped()
- self.process_headers(obj, object_columns)
- self.process_properties(obj, object_columns, cursor)
- self.process_statistics(obj, object_columns, sample_columns)
+ obj = self.create_object(cursor, stats, cls)
- statements = list()
+ assert obj
- if object_columns:
- object_columns.append(cls.sql_table._qmf_update_time)
+ self.agent.add_object(obj)
- if obj._sync_time:
- sql = cls.sql_update.emit(object_columns)
- stats.updated += 1
+ def get_class(self):
+ class_key = self.object.getClassKey()
+ name = class_key.getPackageName()
- self.model.print_event(4, "Updating %s", obj)
- else:
- sql = cls.sql_insert.emit(object_columns)
- stats.created += 1
+ try:
+ pkg = self.model._packages_by_name[name]
+ except KeyError:
+ raise PackageUnknown(name)
- message = "Creating %s, from %s"
- self.model.print_event(3, message, obj, self.agent)
+ name = class_key.getClassName()
- statements.append(sql)
+ try:
+ cls = pkg._classes_by_lowercase_name[name.lower()]
+ except KeyError:
+ raise ClassUnknown(name)
+
+ return cls
- if sample_columns:
- sample_columns.append(cls.sql_samples_table._qmf_update_time)
+ def get_object_id(self):
+ return self.object.getObjectId().objectName
- sql = cls.sql_samples_insert.emit(sample_columns)
- statements.append(sql)
+ def create_object(self, cursor, stats, cls):
+ update_time, create_time, delete_time = self.object.getTimestamps()
+ create_time = datetime.fromtimestamp(create_time / 1000000000)
+ update_time = datetime.fromtimestamp(update_time / 1000000000)
- stats.sampled += 1
- obj._sample_time = update_time
+ obj = cls.create_object(cursor)
+ obj._qmf_agent_id = self.agent.id
+ obj._qmf_object_id = self.get_object_id()
+ obj._qmf_session_id = str(self.object.getObjectId().getSequence())
+ obj._qmf_create_time = create_time
+ obj._qmf_update_time = update_time
+
+ object_columns = list()
+ sample_columns = list()
- if statements:
- text = "; ".join(statements)
+ table = cls.sql_table
- try:
- cursor.execute(text, obj.__dict__)
- except:
- log.exception("%s failed", self)
+ object_columns.append(table._id)
+ object_columns.append(table._qmf_agent_id)
+ object_columns.append(table._qmf_object_id)
+ object_columns.append(table._qmf_session_id)
+ object_columns.append(table._qmf_create_time)
+ object_columns.append(table._qmf_update_time)
- log.error("Sql text: %s", text)
- log.error("Sql values:")
+ self.process_properties(obj, object_columns, cursor)
+ self.process_statistics(obj, object_columns, sample_columns)
- for item in sorted(obj.__dict__.items()):
- log.error(" %-34s %r", *item)
+ statements = list()
- log.error("Sql object columns:")
+ sql = cls.sql_insert.emit(object_columns)
+ statements.append(sql)
- for item in sorted(object_columns):
- log.error(" %-34s", item)
+ if sample_columns:
+ sample_columns.append(cls.sql_samples_table._qmf_update_time)
- log.error("Sql sample columns:")
+ sql = cls.sql_samples_insert.emit(sample_columns)
+ statements.append(sql)
- for item in sorted(sample_columns):
- log.error(" %-34s", item)
+ obj._sample_time = datetime.now()
- log.error("Sql row count: %i", cursor.rowcount)
+ sql = "; ".join(statements)
+ self.execute_sql(cursor, sql, obj.__dict__)
- log.error("Qmf properties:")
+ obj._sync_time = datetime.now()
- for item in sorted(self.object.getProperties()):
- log.error(" %-34s %r", *item)
+ self.model.print_event(3, "Created %s", obj)
+ stats.created += 1
- log.error("Qmf statistics:")
+ return obj
- for item in sorted(self.object.getStatistics()):
- log.error(" %-34s %r", *item)
+ def update_object(self, cursor, stats, obj):
+ update_time, create_time, delete_time = self.object.getTimestamps()
+ update_time = datetime.fromtimestamp(update_time / 1000000000)
- raise
+ obj._qmf_update_time = update_time
- obj._sync_time = now
+ object_columns = list()
+ sample_columns = list()
- self.add_object(obj)
- else:
- stats.dropped += 1
+ cls = obj._class
- def get_object(self, cursor):
- class_key = self.object.getClassKey()
+ self.process_properties(obj, object_columns, cursor)
+ self.process_statistics(obj, object_columns, sample_columns)
- name = class_key.getPackageName()
+ statements = list()
- try:
- pkg = self.model._packages_by_name[name]
- except KeyError:
- raise PackageUnknown(name)
+ if object_columns:
+ object_columns.append(cls.sql_table._qmf_update_time)
- name = class_key.getClassName()
+ sql = cls.sql_update.emit(object_columns)
+ statements.append(sql)
- try:
- cls = pkg._classes_by_lowercase_name[name.lower()]
- except KeyError:
- raise ClassUnknown(name)
+ if sample_columns:
+ sample_columns.append(cls.sql_samples_table._qmf_update_time)
- object_id = self.object.getObjectId().objectName
+ sql = cls.sql_samples_insert.emit(sample_columns)
+ statements.append(sql)
- obj = self.agent.get_object(cursor, cls, object_id)
-
- return cls, obj
+ obj._sample_time = datetime.now()
- def add_object(self, obj):
- self.agent.add_object(obj)
+ if not statements:
+ raise UpdateDropped()
- def process_headers(self, obj, columns):
- table = obj._class.sql_table
+ sql = "; ".join(statements)
+ self.execute_sql(cursor, sql, obj.__dict__)
- update_time, create_time, delete_time = self.object.getTimestamps()
- create_time = datetime.fromtimestamp(create_time / 1000000000)
+ obj._sync_time = datetime.now()
- if delete_time:
- delete_time = datetime.fromtimestamp(delete_time / 1000000000)
+ self.model.print_event(4, "Updated %s", obj)
+ stats.updated += 1
- obj._qmf_delete_time = delete_time
- columns.append(table._qmf_delete_time)
+ def delete_object(self, cursor, stats, obj):
+ obj.delete(cursor)
- if not obj._sync_time:
- # The object hasn't been written to the database yet
+ self.agent.delete_object(obj)
- obj._qmf_agent_id = self.agent.id
- obj._qmf_object_id = self.object.getObjectId().objectName
- obj._qmf_session_id = str(self.object.getObjectId().getSequence())
- obj._qmf_class_key = str(self.object.getClassKey())
- obj._qmf_create_time = create_time
+ self.model.print_event(3, "Deleted %s", obj)
+ stats.deleted += 1
- columns.append(table._id)
- columns.append(table._qmf_agent_id)
- columns.append(table._qmf_object_id)
- columns.append(table._qmf_session_id)
- columns.append(table._qmf_class_key)
- columns.append(table._qmf_create_time)
-
def process_properties(self, obj, columns, cursor):
cls = obj._class
@@ -373,11 +382,16 @@
except:
raise MappingException("Reference isn't an oid")
- that = self.agent.get_object(cursor, ref.that_cls, that_id)
+ # XXX obviously won't work across agents
+ that = self.agent.get_object_by_id(that_id)
- if not that._sync_time:
- msg = "Referenced object %s hasn't appeared yet"
- raise MappingException(msg % that)
+ if not that:
+ try:
+ that = ref.that_cls.get_object_by_qmf_id \
+ (cursor, self.agent.id, that_id)
+ except RosemaryNotFound:
+ msg = "Referenced object %s hasn't appeared yet"
+ raise MappingException(msg % that)
value = that._id
@@ -422,6 +436,32 @@
setattr(obj, col.name, value)
+ def execute_sql(self, cursor, text, args):
+ try:
+ cursor.execute(text, args)
+ except:
+ log.exception("%s failed", self)
+
+ log.error("Sql text: %s", text)
+ log.error("Sql values:")
+
+ for item in sorted(args.items()):
+ log.error(" %-34s %r", *item)
+
+ log.error("Sql row count: %i", cursor.rowcount)
+
+ log.error("Qmf properties:")
+
+ for item in sorted(self.object.getProperties()):
+ log.error(" %-34s %r", *item)
+
+ log.error("Qmf statistics:")
+
+ for item in sorted(self.object.getStatistics()):
+ log.error(" %-34s %r", *item)
+
+ raise
+
def __repr__(self):
name = self.__class__.__name__
cls = self.object.getClassKey().getClassName()
@@ -429,18 +469,6 @@
return "%s(%s,%s,%s)" % (name, self.agent.id, cls, id)
-class ObjectDelete(ObjectUpdate):
- def do_process(self, cursor, stats):
- cls, obj = self.get_object(cursor)
-
- self.model.print_event(3, "Deleting %s, from %s", obj, self.agent)
-
- obj.delete(cursor)
-
- self.agent.delete_object(obj._qmf_object_id)
-
- stats.deleted += 1
-
class AgentDelete(Update):
def __init__(self, model, agent):
super(AgentDelete, self).__init__(model)
@@ -460,6 +488,9 @@
stats.deleted += 1
+class UpdateDropped(Exception):
+ pass
+
class UpdateException(Exception):
def __init__(self, name):
self.name = name
Modified: mgmt/newdata/rosemary/python/rosemary/model.py
===================================================================
--- mgmt/newdata/rosemary/python/rosemary/model.py 2010-07-02 19:37:43 UTC (rev 4068)
+++ mgmt/newdata/rosemary/python/rosemary/model.py 2010-07-02 19:58:53 UTC (rev 4069)
@@ -213,10 +213,6 @@
self._qmf_session_id = RosemaryHeader(self, name, sql_text)
self._qmf_session_id.title = "Session ID"
- name = "_qmf_class_key"
- self._qmf_class_key = RosemaryHeader(self, name, sql_text)
- self._qmf_class_key.title = "Class Key"
-
name = "_qmf_create_time"
self._qmf_create_time = RosemaryHeader(self, name, sql_timestamp)
self._qmf_create_time.title = "Create Time"
@@ -377,13 +373,11 @@
return obj
def get_object_by_qmf_id(self, cursor, agent_id, object_id):
- assert isinstance(obj, RosemaryObject)
-
obj = RosemaryObject(self, None)
obj._qmf_agent_id = agent_id
obj._qmf_object_id = object_id
- self.load_object_qmf_id(cursor, obj)
+ self.load_object_by_qmf_id(cursor, obj)
return obj
@@ -692,7 +686,6 @@
self._qmf_object_id = str(datetime.now())
self._qmf_create_time = datetime.now()
self._qmf_update_time = datetime.now()
- self._qmf_class_key = "__rosemary__"
def get_title(self):
if self._class._object_title: