[rhmessaging-commits] rhmessaging commits: r4296 - mgmt/newdata/mint/python/mint.
rhmessaging-commits at lists.jboss.org
rhmessaging-commits at lists.jboss.org
Wed Sep 15 17:56:29 EDT 2010
Author: justi9
Date: 2010-09-15 17:56:28 -0400 (Wed, 15 Sep 2010)
New Revision: 4296
Modified:
mgmt/newdata/mint/python/mint/update.py
Log:
For bz 629988. Turn write mitigation off in general, except for
broker objects. Further instrumentation revealed that they are the
source of the droppable stat updates.
This change also enhances cumin-data's reporting.
Modified: mgmt/newdata/mint/python/mint/update.py
===================================================================
--- mgmt/newdata/mint/python/mint/update.py 2010-09-15 20:01:27 UTC (rev 4295)
+++ mgmt/newdata/mint/python/mint/update.py 2010-09-15 21:56:28 UTC (rev 4296)
@@ -11,8 +11,8 @@
log = logging.getLogger("mint.update")
-minutes_ago = timedelta(minutes=5)
-seconds_ago = timedelta(seconds=60)
+sample_window_min = 60
+sample_window_max = 60 * 5
class UpdateThread(MintDaemonThread):
def __init__(self, app):
@@ -52,27 +52,49 @@
update.process(self)
class UpdateStats(object):
- names = ("*Enqueued", "*Dequeued", "Depth", "*Created", "*Updated",
- "*Deleted", "*Dropped", "*Sql Ops", "Errors", "Cpu (%)",
- "Mem (M)")
- headings = ("%10s " * 11) % names
- values_fmt = "%10.1f %10.1f %10i %10.1f %10.1f %10.1f " + \
- "%10.1f %10.1f %10i %10i %10.1f"
+ group_names = ("Updates", "Agents", "Objects")
+ groups = "%43s | %32s | %32s |" % group_names
+ heading_names = \
+ ("Depth", "*Enqueued", "*Dequeued", "*Dropped",
+ "*Created", "*Updated", "*Deleted",
+ "*Created", "*Updated", "*Deleted",
+ "*Sql Ops", "Errors", "Cpu (%)", "Mem (M)")
+ headings_fmt = \
+ "%10s %10s %10s %10s | " + \
+ "%10s %10s %10s | " + \
+ "%10s %10s %10s | " + \
+ "%10s %10s %10s %10s"
+ headings = headings_fmt % heading_names
+
+
+ values_fmt = \
+ "%10i %10.1f %10.1f %10.1f | " + \
+ "%10.1f %10.1f %10.1f | " + \
+ "%10.1f %10.1f %10.1f | " + \
+ "%10.1f %10i %10i %10.1f"
+
then = None
now = None
def __init__(self, app):
self.enqueued = 0
self.dequeued = 0
-
- self.created = 0
- self.updated = 0
- self.deleted = 0
self.dropped = 0
+ self.agents_created = 0
+ self.agents_updated = 0
+ self.agents_deleted = 0
+
+ self.objects_created = 0
+ self.objects_updated = 0
+ self.objects_deleted = 0
+
+ self.objects_created_by_class = defaultdict(int)
+ self.objects_updated_by_class = defaultdict(int)
+ self.objects_deleted_by_class = defaultdict(int)
+
self.sql_ops = 0
-
self.errors = 0
self.time = None
@@ -101,6 +123,7 @@
return int(line.split()[1])
def print_headings(self):
+ print self.groups
print self.headings
def print_values(self):
@@ -111,22 +134,41 @@
values = [self.now.enqueued - self.then.enqueued,
self.now.dequeued - self.then.dequeued,
- self.now.created - self.then.created,
- self.now.updated - self.then.updated,
- self.now.deleted - self.then.deleted,
self.now.dropped - self.then.dropped,
+ self.now.agents_created - self.then.agents_created,
+ self.now.agents_updated - self.then.agents_updated,
+ self.now.agents_deleted - self.then.agents_deleted,
+ self.now.objects_created - self.then.objects_created,
+ self.now.objects_updated - self.then.objects_updated,
+ self.now.objects_deleted - self.then.objects_deleted,
self.now.sql_ops - self.then.sql_ops]
+ # self.now.dropped - self.then.dropped,
+
secs = self.now.time - self.then.time
values = map(lambda x: x / secs, values)
- values.insert(2, self.now.enqueued - self.now.dequeued)
+ values.insert(0, self.now.enqueued - self.now.dequeued)
values.append(self.errors)
values.append(int((self.now.cpu - self.then.cpu) / secs * 100))
values.append(self.now.memory / 1000000.0)
print self.values_fmt % tuple(values)
+
+ def print_values_by_class(self):
+ names = ("Class", "Created", "Updated", "Deleted")
+ print "%20s %10s %10s %10s" % names
+
+ for pkg in mint.model._packages:
+ for cls in pkg._classes:
+ created = stats.created_by_class[cls]
+ updated = stats.updated_by_class[cls]
+ deleted = stats.deleted_by_class[cls]
+
+ if created or updated or deleted:
+ args = (cls._name, created, updated, deleted)
+ print "%-20s %10i %10i %10i" % args
class UpdateCursor(Cursor):
def execute(self, sql, args=None):
@@ -145,6 +187,8 @@
self.do_process(thread.cursor, thread.stats)
thread.conn.commit()
+
+ # XXX UpdateCommitted
except UpdateDropped:
thread.stats.dropped += 1
except UpdateException, e:
@@ -180,6 +224,8 @@
agent_id = self.get_agent_id()
object_id = self.get_object_id()
+ delete_time = self.qmf_object.getTimestamps()[2]
+
try:
agent = self.model.agents_by_id[agent_id]
except KeyError:
@@ -188,49 +234,28 @@
try:
obj = agent.get_object(cursor, cls, object_id)
except RosemaryNotFound:
- obj = None
+ if not self.qmf_object.getProperties():
+ raise UpdateDropped()
- update_time, create_time, delete_time = self.qmf_object.getTimestamps()
-
- if obj:
if delete_time != 0:
- self.delete_object(cursor, stats, obj)
+ raise UpdateDropped()
- agent.objects_by_id.pop(obj._qmf_object_id)
+ obj = self.create_object(cursor, stats, cls)
- return
+ return
- properties = self.qmf_object.getProperties()
- statistics = self.qmf_object.getStatistics()
+ if delete_time != 0:
+ self.delete_object(cursor, stats, obj)
- if not properties and statistics:
- # Just stats; do we want it?
- # if stats.enqueued - stats.dequeued > 500:
+ del agent.objects_by_id[obj._qmf_object_id]
- now = datetime.now()
- update = datetime.fromtimestamp(update_time / 1000000000)
- sample = obj._sample_time
+ return
- if update < now - minutes_ago:
- # The sample is too old
- raise UpdateDropped()
+ if cls._package is self.model.org_apache_qpid_broker:
+ self.maybe_drop_sample(obj)
- if sample and sample > now - seconds_ago:
- # The samples are too fidelitous
- raise UpdateDropped()
+ self.update_object(cursor, stats, obj)
- self.update_object(cursor, stats, obj)
- else:
- if not self.qmf_object.getProperties():
- raise UpdateDropped()
-
- if delete_time != 0:
- raise UpdateDropped()
-
- obj = self.create_object(cursor, stats, cls)
-
- assert obj
-
def get_agent_id(self):
return make_agent_id(self.qmf_object.getAgent())
@@ -248,13 +273,34 @@
try:
cls = pkg._classes_by_lowercase_name[name.lower()]
except KeyError:
+ # XXX UpdateDropped instead?
raise ClassUnknown(name)
-
+
return cls
def get_object_id(self):
return self.qmf_object.getObjectId().objectName
+ def maybe_drop_sample(self, obj):
+ properties = self.qmf_object.getProperties()
+ statistics = self.qmf_object.getStatistics()
+
+ if not properties and statistics:
+ # Just stats; do we want it?
+ # if stats.enqueued - stats.dequeued > 500:
+
+ now = time.time()
+ update = self.qmf_object.getTimestamps()[0] / 1000000000
+ sample = obj._sample_time
+
+ if update < now - sample_window_max:
+ # The sample is too old
+ raise UpdateDropped()
+
+ if sample and sample > now - sample_window_min:
+ # The samples are too fidelitous
+ raise UpdateDropped()
+
def create_object(self, cursor, stats, cls):
update_time, create_time, delete_time = self.qmf_object.getTimestamps()
create_time = datetime.fromtimestamp(create_time / 1000000000)
@@ -291,7 +337,7 @@
sql = cls.sql_samples_insert.emit(sample_columns)
statements.append(sql)
- obj._sample_time = datetime.now()
+ obj._sample_time = time.time()
sql = "; ".join(statements)
self.execute_sql(cursor, sql, obj.__dict__)
@@ -301,8 +347,10 @@
obj._sync_time = datetime.now()
self.model.print_event(3, "Created %s", obj)
- stats.created += 1
+ stats.objects_created += 1
+ stats.objects_created_by_class[cls] += 1
+
return obj
def process_deferred_links(self, cursor, obj):
@@ -327,12 +375,11 @@
object_columns = list()
sample_columns = list()
- cls = obj._class
-
self.process_properties(obj, object_columns, cursor)
self.process_statistics(obj, object_columns, sample_columns)
statements = list()
+ cls = obj._class
if object_columns:
object_columns.append(cls.sql_table._qmf_update_time)
@@ -346,7 +393,7 @@
sql = cls.sql_samples_insert.emit(sample_columns)
statements.append(sql)
- obj._sample_time = datetime.now()
+ obj._sample_time = time.time()
if not statements:
raise UpdateDropped()
@@ -357,14 +404,18 @@
obj._sync_time = datetime.now()
self.model.print_event(4, "Updated %s", obj)
- stats.updated += 1
+ stats.objects_updated += 1
+ stats.objects_updated_by_class[cls] += 1
+
def delete_object(self, cursor, stats, obj):
obj.delete(cursor)
self.model.print_event(3, "Deleted %s", obj)
- stats.deleted += 1
+ stats.objects_deleted += 1
+ stats.objects_deleted_by_class[obj._class] += 1
+
def process_properties(self, obj, columns, cursor):
cls = obj._class
@@ -527,14 +578,18 @@
except KeyError:
agent = MintAgent(self.model, agent_id)
+ stats.agents_created += 1
+
self.delete_agent_objects(cursor, stats, agent)
+ return
+
#timestamp = timestamp / 1000000000
#agent.last_heartbeat = datetime.fromtimestamp(timestamp)
agent.last_heartbeat = datetime.now()
- stats.updated += 1
+ stats.agents_updated += 1
def delete_agent_objects(self, cursor, stats, agent):
for pkg in self.model._packages:
@@ -547,7 +602,8 @@
count = cls.delete_selection(cursor, _qmf_agent_id=agent.id)
- stats.deleted += count
+ stats.objects_deleted += count
+ stats.objects_deleted_by_class[cls] += count
cursor.connection.commit()
@@ -562,6 +618,8 @@
agent.delete()
+ stats.agents_deleted += 1
+
self.delete_agent_objects(cursor, stats, agent)
class UpdateDropped(Exception):
More information about the rhmessaging-commits
mailing list