[rhmessaging-commits] rhmessaging commits: r4296 - mgmt/newdata/mint/python/mint.

rhmessaging-commits at lists.jboss.org
Wed Sep 15 17:56:29 EDT 2010


Author: justi9
Date: 2010-09-15 17:56:28 -0400 (Wed, 15 Sep 2010)
New Revision: 4296

Modified:
   mgmt/newdata/mint/python/mint/update.py
Log:
For bz 629988.  Turn write mitigation off in general, except for
broker objects.  Further instrumentation revealed that they are the
source of the droppable stat updates.

This change also enhances cumin-data's reporting.
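
A minimal sketch of the write-mitigation window that the patch below now applies only to broker objects. The sample_window_* constants, UpdateDropped, and the nanosecond QMF timestamps are taken from the diff; the standalone function form and its parameters are only illustrative, since in the patch this lives on the update object as maybe_drop_sample.

    import time

    class UpdateDropped(Exception):
        pass

    sample_window_min = 60        # seconds: minimum spacing between stored samples
    sample_window_max = 60 * 5    # seconds: samples older than this are discarded

    def maybe_drop_sample(qmf_object, last_sample_time):
        # Only pure-statistics updates (no property changes) are mitigated
        if qmf_object.getProperties() or not qmf_object.getStatistics():
            return

        now = time.time()
        update = qmf_object.getTimestamps()[0] / 1000000000  # ns -> seconds

        if update < now - sample_window_max:
            raise UpdateDropped()   # the sample is too old to be worth storing

        if last_sample_time and last_sample_time > now - sample_window_min:
            raise UpdateDropped()   # the previous sample was stored too recently

In the patch itself this check is invoked only when the object's class belongs to the org.apache.qpid.broker package, so write mitigation is effectively off everywhere else.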


Modified: mgmt/newdata/mint/python/mint/update.py
===================================================================
--- mgmt/newdata/mint/python/mint/update.py	2010-09-15 20:01:27 UTC (rev 4295)
+++ mgmt/newdata/mint/python/mint/update.py	2010-09-15 21:56:28 UTC (rev 4296)
@@ -11,8 +11,8 @@
 
 log = logging.getLogger("mint.update")
 
-minutes_ago = timedelta(minutes=5)
-seconds_ago = timedelta(seconds=60)
+sample_window_min = 60
+sample_window_max = 60 * 5
 
 class UpdateThread(MintDaemonThread):
     def __init__(self, app):
@@ -52,27 +52,49 @@
             update.process(self)
 
 class UpdateStats(object):
-    names = ("*Enqueued", "*Dequeued", "Depth", "*Created", "*Updated",
-             "*Deleted", "*Dropped", "*Sql Ops", "Errors", "Cpu (%)",
-             "Mem (M)")
-    headings = ("%10s  " * 11) % names
-    values_fmt = "%10.1f  %10.1f  %10i  %10.1f  %10.1f  %10.1f  " + \
-        "%10.1f  %10.1f  %10i  %10i  %10.1f"
+    group_names = ("Updates", "Agents", "Objects")
+    groups = "%43s | %32s | %32s |" % group_names
 
+    heading_names = \
+        ("Depth", "*Enqueued", "*Dequeued", "*Dropped",
+         "*Created", "*Updated", "*Deleted",
+         "*Created", "*Updated", "*Deleted",
+         "*Sql Ops", "Errors", "Cpu (%)", "Mem (M)")
+    headings_fmt = \
+        "%10s %10s %10s %10s | " + \
+        "%10s %10s %10s | " + \
+        "%10s %10s %10s | " + \
+        "%10s %10s %10s %10s"
+    headings = headings_fmt % heading_names
+
+
+    values_fmt = \
+        "%10i %10.1f %10.1f %10.1f | " + \
+        "%10.1f %10.1f %10.1f | " + \
+        "%10.1f %10.1f %10.1f | " + \
+        "%10.1f %10i %10i %10.1f"
+
     then = None
     now = None
 
     def __init__(self, app):
         self.enqueued = 0
         self.dequeued = 0
-
-        self.created = 0
-        self.updated = 0
-        self.deleted = 0
         self.dropped = 0
 
+        self.agents_created = 0
+        self.agents_updated = 0
+        self.agents_deleted = 0
+
+        self.objects_created = 0
+        self.objects_updated = 0
+        self.objects_deleted = 0
+
+        self.objects_created_by_class = defaultdict(int)
+        self.objects_updated_by_class = defaultdict(int)
+        self.objects_deleted_by_class = defaultdict(int)
+
         self.sql_ops = 0
-
         self.errors = 0
 
         self.time = None
@@ -101,6 +123,7 @@
         return int(line.split()[1])
 
     def print_headings(self):
+        print self.groups
         print self.headings
 
     def print_values(self):
@@ -111,22 +134,41 @@
 
         values = [self.now.enqueued - self.then.enqueued,
                   self.now.dequeued - self.then.dequeued,
-                  self.now.created - self.then.created,
-                  self.now.updated - self.then.updated,
-                  self.now.deleted - self.then.deleted,
                   self.now.dropped - self.then.dropped,
+                  self.now.agents_created - self.then.agents_created,
+                  self.now.agents_updated - self.then.agents_updated,
+                  self.now.agents_deleted - self.then.agents_deleted,
+                  self.now.objects_created - self.then.objects_created,
+                  self.now.objects_updated - self.then.objects_updated,
+                  self.now.objects_deleted - self.then.objects_deleted,
                   self.now.sql_ops - self.then.sql_ops]
 
+        # self.now.dropped - self.then.dropped,
+
         secs = self.now.time - self.then.time
         values = map(lambda x: x / secs, values)
 
-        values.insert(2, self.now.enqueued - self.now.dequeued)
+        values.insert(0, self.now.enqueued - self.now.dequeued)
 
         values.append(self.errors)
         values.append(int((self.now.cpu - self.then.cpu) / secs * 100))
         values.append(self.now.memory / 1000000.0)
 
         print self.values_fmt % tuple(values)
+
+    def print_values_by_class(self):
+        names = ("Class", "Created", "Updated", "Deleted")
+        print "%20s  %10s  %10s  %10s" % names
+
+        for pkg in mint.model._packages:
+            for cls in pkg._classes:
+                created = self.objects_created_by_class[cls]
+                updated = self.objects_updated_by_class[cls]
+                deleted = self.objects_deleted_by_class[cls]
+
+                if created or updated or deleted:
+                    args = (cls._name, created, updated, deleted)
+                    print "%-20s  %10i  %10i  %10i" % args
         
 class UpdateCursor(Cursor):
     def execute(self, sql, args=None):
@@ -145,6 +187,8 @@
             self.do_process(thread.cursor, thread.stats)
 
             thread.conn.commit()
+
+            # XXX UpdateCommitted
         except UpdateDropped:
             thread.stats.dropped += 1
         except UpdateException, e:
@@ -180,6 +224,8 @@
         agent_id = self.get_agent_id()
         object_id = self.get_object_id()
 
+        delete_time = self.qmf_object.getTimestamps()[2]
+
         try:
             agent = self.model.agents_by_id[agent_id]
         except KeyError:
@@ -188,49 +234,28 @@
         try:
             obj = agent.get_object(cursor, cls, object_id)
         except RosemaryNotFound:
-            obj = None
+            if not self.qmf_object.getProperties():
+                raise UpdateDropped()
 
-        update_time, create_time, delete_time = self.qmf_object.getTimestamps()
-
-        if obj:
             if delete_time != 0:
-                self.delete_object(cursor, stats, obj)
+                raise UpdateDropped()
 
-                agent.objects_by_id.pop(obj._qmf_object_id)
+            obj = self.create_object(cursor, stats, cls)
 
-                return
+            return
 
-            properties = self.qmf_object.getProperties()
-            statistics = self.qmf_object.getStatistics()
+        if delete_time != 0:
+            self.delete_object(cursor, stats, obj)
 
-            if not properties and statistics:
-                # Just stats; do we want it?
-                # if stats.enqueued - stats.dequeued > 500:
+            del agent.objects_by_id[obj._qmf_object_id]
 
-                now = datetime.now()
-                update = datetime.fromtimestamp(update_time / 1000000000)
-                sample = obj._sample_time
+            return
 
-                if update < now - minutes_ago:
-                    # The sample is too old
-                    raise UpdateDropped()
+        if cls._package is self.model.org_apache_qpid_broker:
+            self.maybe_drop_sample(obj)
 
-                if sample and sample > now - seconds_ago:
-                    # The samples are too fidelitous
-                    raise UpdateDropped()
+        self.update_object(cursor, stats, obj)
 
-            self.update_object(cursor, stats, obj)
-        else:
-            if not self.qmf_object.getProperties():
-                raise UpdateDropped()
-
-            if delete_time != 0:
-                raise UpdateDropped()
-
-            obj = self.create_object(cursor, stats, cls)
-
-        assert obj
-
     def get_agent_id(self):
         return make_agent_id(self.qmf_object.getAgent())
 
@@ -248,13 +273,34 @@
         try:
             cls = pkg._classes_by_lowercase_name[name.lower()]
         except KeyError:
+            # XXX UpdateDropped instead?
             raise ClassUnknown(name)
-        
+
         return cls
 
     def get_object_id(self):
         return self.qmf_object.getObjectId().objectName
 
+    def maybe_drop_sample(self, obj):
+        properties = self.qmf_object.getProperties()
+        statistics = self.qmf_object.getStatistics()
+
+        if not properties and statistics:
+            # Just stats; do we want it?
+            # if stats.enqueued - stats.dequeued > 500:
+
+            now = time.time()
+            update = self.qmf_object.getTimestamps()[0] / 1000000000
+            sample = obj._sample_time
+
+            if update < now - sample_window_max:
+                # The sample is too old
+                raise UpdateDropped()
+
+            if sample and sample > now - sample_window_min:
+                # The samples are too fidelitous
+                raise UpdateDropped()
+
     def create_object(self, cursor, stats, cls):
         update_time, create_time, delete_time = self.qmf_object.getTimestamps()
         create_time = datetime.fromtimestamp(create_time / 1000000000)
@@ -291,7 +337,7 @@
             sql = cls.sql_samples_insert.emit(sample_columns)
             statements.append(sql)
 
-            obj._sample_time = datetime.now()
+            obj._sample_time = time.time()
 
         sql = "; ".join(statements)
         self.execute_sql(cursor, sql, obj.__dict__)
@@ -301,8 +347,10 @@
         obj._sync_time = datetime.now()
 
         self.model.print_event(3, "Created %s", obj)
-        stats.created += 1
 
+        stats.objects_created += 1
+        stats.objects_created_by_class[cls] += 1
+
         return obj
 
     def process_deferred_links(self, cursor, obj):
@@ -327,12 +375,11 @@
         object_columns = list()
         sample_columns = list()
 
-        cls = obj._class
-
         self.process_properties(obj, object_columns, cursor)
         self.process_statistics(obj, object_columns, sample_columns)
 
         statements = list()
+        cls = obj._class
 
         if object_columns:
             object_columns.append(cls.sql_table._qmf_update_time)
@@ -346,7 +393,7 @@
             sql = cls.sql_samples_insert.emit(sample_columns)
             statements.append(sql)
 
-            obj._sample_time = datetime.now()
+            obj._sample_time = time.time()
 
         if not statements:
             raise UpdateDropped()
@@ -357,14 +404,18 @@
         obj._sync_time = datetime.now()
 
         self.model.print_event(4, "Updated %s", obj)
-        stats.updated += 1
 
+        stats.objects_updated += 1
+        stats.objects_updated_by_class[cls] += 1
+
     def delete_object(self, cursor, stats, obj):
         obj.delete(cursor)
 
         self.model.print_event(3, "Deleted %s", obj)
-        stats.deleted += 1
 
+        stats.objects_deleted += 1
+        stats.objects_deleted_by_class[obj._class] += 1
+
     def process_properties(self, obj, columns, cursor):
         cls = obj._class
 
@@ -527,14 +578,18 @@
         except KeyError:
             agent = MintAgent(self.model, agent_id)
 
+            stats.agents_created += 1
+
             self.delete_agent_objects(cursor, stats, agent)
 
+            return
+
         #timestamp = timestamp / 1000000000
         #agent.last_heartbeat = datetime.fromtimestamp(timestamp)
 
         agent.last_heartbeat = datetime.now()
 
-        stats.updated += 1
+        stats.agents_updated += 1
 
     def delete_agent_objects(self, cursor, stats, agent):
         for pkg in self.model._packages:
@@ -547,7 +602,8 @@
 
                 count = cls.delete_selection(cursor, _qmf_agent_id=agent.id)
 
-                stats.deleted += count
+                stats.objects_deleted += count
+                stats.objects_deleted_by_class[cls] += count
 
                 cursor.connection.commit()
 
@@ -562,6 +618,8 @@
 
         agent.delete()
 
+        stats.agents_deleted += 1
+
         self.delete_agent_objects(cursor, stats, agent)
 
 class UpdateDropped(Exception):


