[rhmessaging-commits] rhmessaging commits: r4024 - in mgmt/newdata: mint/python/mint and 1 other directory.

rhmessaging-commits at lists.jboss.org
Mon Jun 14 11:55:33 EDT 2010


Author: justi9
Date: 2010-06-14 11:55:33 -0400 (Mon, 14 Jun 2010)
New Revision: 4024

Modified:
   mgmt/newdata/cumin/bin/cumin-data
   mgmt/newdata/mint/python/mint/session.py
   mgmt/newdata/mint/python/mint/update.py
   mgmt/newdata/mint/python/mint/util.py
Log:
 * Use a single update class to handle prop and stat updates

 * Improve reporting from cumin-data --print-stats: starred columns are per-second event rates, alongside queue depth, CPU, and memory (see the sketch after this list)

 * Reuse a read cursor for get_object, a notable scalability gain

 * Put in a workaround for some unexpectedly null data

 * Quiet the debug logging a little

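For readers of the --print-stats change below: the starred columns are per-second rates computed by diffing two snapshots of cumulative counters, while queue depth, CPU, and memory are read at capture time (CPU from getrusage, resident memory from /proc/<pid>/statm). The following standalone sketch approximates that approach; the Snapshot class, its field names, and the sample numbers are illustrative only, not the actual mint.update.UpdateStats API.

import os
import resource
import time

class Snapshot(object):
    """Illustrative snapshot of cumulative counters plus process resources."""

    def __init__(self, enqueued, dequeued):
        self.enqueued = enqueued
        self.dequeued = dequeued
        self.time = time.time()

        # User + system CPU seconds consumed so far by this process
        rusage = resource.getrusage(resource.RUSAGE_SELF)
        self.cpu = rusage[0] + rusage[1]

        # Resident set size in bytes: the second field of /proc/<pid>/statm
        # is resident pages (Linux only); multiply by the page size
        with open("/proc/%i/statm" % os.getpid()) as statm:
            pages = int(statm.read().split()[1])

        self.memory = pages * resource.getpagesize()

def print_values(then, now):
    """Print per-second rates for the counters, plus queue depth,
    CPU percent over the interval, and resident memory in megabytes."""
    secs = now.time - then.time

    enqueued_rate = (now.enqueued - then.enqueued) / secs
    dequeued_rate = (now.dequeued - then.dequeued) / secs
    depth = now.enqueued - now.dequeued
    cpu_percent = int((now.cpu - then.cpu) / secs * 100)
    memory_mb = now.memory / 1000000.0

    print("%10.1f  %10.1f  %10i  %10i  %10.1f" %
          (enqueued_rate, dequeued_rate, depth, cpu_percent, memory_mb))

# Example: snapshot, wait, snapshot again, report the interval
then = Snapshot(enqueued=0, dequeued=0)
time.sleep(1)
now = Snapshot(enqueued=500, dequeued=480)
print_values(then, now)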


Modified: mgmt/newdata/cumin/bin/cumin-data
===================================================================
--- mgmt/newdata/cumin/bin/cumin-data	2010-06-14 12:54:06 UTC (rev 4023)
+++ mgmt/newdata/cumin/bin/cumin-data	2010-06-14 15:55:33 UTC (rev 4024)
@@ -38,7 +38,7 @@
         count = 0
 
         if opts.print_stats:
-            print "[Reported values are the number of events per second]"
+            print "[Starred columns are the number of events per second]"
 
             while True:
                 if count % 24 == 0:
@@ -46,7 +46,7 @@
 
                 count += 1
 
-                stats.print_rates()
+                stats.print_values()
 
                 sleep(5)
         else:

Modified: mgmt/newdata/mint/python/mint/session.py
===================================================================
--- mgmt/newdata/mint/python/mint/session.py	2010-06-14 12:54:06 UTC (rev 4023)
+++ mgmt/newdata/mint/python/mint/session.py	2010-06-14 15:55:33 UTC (rev 4024)
@@ -108,12 +108,6 @@
         if not self.model.app.update_thread.isAlive():
             return
 
-        # XXX objectProps is getting called even if no properties are
-        # set
-
-        if not obj.getProperties():
-            return
-
         if obj.getTimestamps()[2]:
             up = ObjectDelete(self.model, agent, obj)
         else:
@@ -127,7 +121,7 @@
         if not self.model.app.update_thread.isAlive():
             return
 
-        up = ObjectAddSample(self.model, agent, obj)
+        up = ObjectUpdate(self.model, agent, obj)
         self.model.app.update_thread.enqueue(up)
 
     def event(self, broker, event):

Modified: mgmt/newdata/mint/python/mint/update.py
===================================================================
--- mgmt/newdata/mint/python/mint/update.py	2010-06-14 12:54:06 UTC (rev 4023)
+++ mgmt/newdata/mint/python/mint/update.py	2010-06-14 15:55:33 UTC (rev 4024)
@@ -1,4 +1,5 @@
 import copy
+import resource
 import pickle
 
 from psycopg2 import IntegrityError, TimestampFromTicks
@@ -7,33 +8,36 @@
 
 log = logging.getLogger("mint.update")
 
+minutes_ago = timedelta(minutes=5)
+seconds_ago = timedelta(seconds=60)
+
 class UpdateThread(MintDaemonThread):
     def __init__(self, app):
         super(UpdateThread, self).__init__(app)
 
         self.updates = ConcurrentQueue()
+        self.stats = UpdateStats(self.app)
 
-        self.stats = UpdateStats(self.app)
+        self.thread = None
         self.conn = None
+        self.read_cursor = None
 
-        self.halt_on_error = True
+        self.halt_on_error = False
 
     def init(self):
         self.conn = self.app.database.get_connection()
+        self.read_cursor = self.conn.cursor()
 
     def enqueue(self, update):
-        update.thread = self
-
         self.updates.put(update)
 
-        if self.stats:
-            self.stats.enqueued += 1
+        self.stats.enqueued += 1
 
         # This is an attempt to yield from the enqueueing thread (this
         # method's caller) to the update thread
 
         if self.updates.qsize() > 1000:
-            sleep(0.1)
+            sleep(0)
 
     def run(self):
         while True:
@@ -45,15 +49,19 @@
             except Empty:
                 continue
 
-            if self.stats:
-                self.stats.dequeued += 1
+            self.stats.dequeued += 1
 
+            update.thread = self
+
             update.process(self.conn, self.stats)
 
 class UpdateStats(object):
-    names = ("Enqueued", "Dequeued", "Updated", "Deleted", "Dropped")
-    headings = ("%8s  " * 5) % names
-    rates_fmt = ("%8.1f  " * 5)
+    names = ("*Enqueued", "*Dequeued", "Depth", "*Created", "*Updated",
+             "*Sampled", "*Deleted", "*Dropped",
+             "Errors", "CPU (%)", "Mem (M)")
+    headings = ("%10s  " * 11) % names
+    values_fmt = "%10.1f  %10.1f  %10i  %10.1f  %10.1f  %10.1f  %10.1f  " + \
+        "%10.1f  %10i  %10i  %10.1f"
 
     then = None
     now = None
@@ -62,27 +70,40 @@
         self.enqueued = 0
         self.dequeued = 0
 
+        self.created = 0
         self.updated = 0
+        self.sampled = 0
         self.deleted = 0
         self.dropped = 0
 
-        self.samples_updated = 0
-        self.samples_expired = 0
-        self.samples_dropped = 0
+        self.errors = 0
 
         self.time = None
+        self.cpu = 0
+        self.memory = 0
 
     def capture(self):
         now = copy.copy(self)
+
         now.time = time.time()
 
+        rusage = resource.getrusage(resource.RUSAGE_SELF) 
+
+        now.cpu = rusage[0] + rusage[1]
+        now.memory = self.get_resident_pages() * resource.getpagesize()
+
         UpdateStats.then = UpdateStats.now
         UpdateStats.now = now
 
+    def get_resident_pages(self):
+        line = open("/proc/%i/statm" % os.getpid()).read()
+
+        return int(line.split()[1])
+
     def print_headings(self):
         print self.headings
 
-    def print_rates(self):
+    def print_values(self):
         self.capture()
 
         if not self.then:
@@ -90,18 +111,22 @@
 
         values = [self.now.enqueued - self.then.enqueued,
                   self.now.dequeued - self.then.dequeued,
+                  self.now.created - self.then.created,
                   self.now.updated - self.then.updated,
+                  self.now.sampled - self.then.sampled,
                   self.now.deleted - self.then.deleted,
                   self.now.dropped - self.then.dropped]
 
-        # XXX
-        values[2] += self.now.samples_updated - self.then.samples_updated
-        values[4] += self.now.samples_dropped - self.then.samples_dropped
-
         secs = self.now.time - self.then.time
-        rates = map(lambda x: x / secs, values)
+        values = map(lambda x: x / secs, values)
 
-        print self.rates_fmt % tuple(rates)
+        values.insert(2, self.now.enqueued - self.now.dequeued)
+
+        values.append(self.errors)
+        values.append(int((self.now.cpu - self.then.cpu) / secs * 100))
+        values.append(self.now.memory / 1000000.0)
+
+        print self.values_fmt % tuple(values)
         
 class Update(object):
     def __init__(self, model):
@@ -123,7 +148,9 @@
 
             conn.rollback()
 
-            if self.model.app.update_thread.halt_on_error:
+            stats.errors += 1
+
+            if self.thread.halt_on_error:
                 raise
 
     def do_process(self, conn, stats):
@@ -143,20 +170,67 @@
         cls = self.get_class()
         obj = self.get_object(cls, self.object.getObjectId().objectName)
 
-        columns = list()
+        if not obj._sync_time and not self.object.getProperties():
+            # This is a sample for an object we don't have yet
+            stats.dropped += 1; return
 
-        self.process_headers(obj, columns)
-        self.process_properties(obj, columns)
+        update_time, create_time, delete_time = self.object.getTimestamps()
+        update_time = datetime.fromtimestamp(update_time / 1000000000)
 
+        last_update_time = obj._qmf_update_time
+
+        obj._qmf_update_time = update_time
+
+        object_columns = list()
+        sample_columns = list()
+
+        self.process_headers(obj, object_columns)
+        self.process_properties(obj, object_columns)
+        self.process_statistics(obj, object_columns, sample_columns)
+
         cursor = conn.cursor()
 
         try:
-            obj.save(cursor, columns)
+            if object_columns:
+                object_columns.append(cls.sql_table._qmf_update_time)
+
+                new = obj._sync_time is None
+
+                obj.save(cursor, object_columns)
+
+                if new:
+                    stats.created += 1
+                else:
+                    stats.updated += 1
+
+            if sample_columns:
+                drop = False
+
+                if stats.enqueued - stats.dequeued > 100:
+                    # There's some pressure, so consider dropping samples
+
+                    now = datetime.now()
+
+                    if update_time < now - minutes_ago:
+                        # The sample is too old
+                        drop = True
+
+                    if last_update_time and last_update_time > now - seconds_ago:
+                        # The samples are coming in too frequently
+                        drop = True
+
+                if drop:
+                    stats.dropped += 1
+                else:
+                    col = cls.sql_samples_table._qmf_update_time
+                    sample_columns.append(col)
+
+                    obj.add_sample(cursor, sample_columns)
+
+                    stats.sampled += 1
         finally:
             cursor.close()
 
-        stats.updated += 1
-
     def get_class(self):
         class_key = self.object.getClassKey()
 
@@ -181,20 +255,19 @@
         try:
             return self.agent.objects_by_id[object_id]
         except KeyError:
-            conn = self.model.app.database.get_connection()
-            cursor = conn.cursor()
+            cursor = self.thread.read_cursor
 
             obj = RosemaryObject(cls, None)
             obj._qmf_agent_id = self.agent.id
             obj._qmf_object_id = object_id
 
+            #try:
             try:
-                try:
-                    cls.load_object_by_qmf_id(cursor, obj)
-                except RosemaryNotFound:
-                    obj._id = cls.get_new_id(cursor)
-            finally:
-                cursor.close()
+                cls.load_object_by_qmf_id(cursor, obj)
+            except RosemaryNotFound:
+                obj._id = cls.get_new_id(cursor)
+            #finally:
+            #    cursor.close()
 
             self.agent.objects_by_id[object_id] = obj
 
@@ -204,26 +277,21 @@
         table = obj._class.sql_table
 
         update_time, create_time, delete_time = self.object.getTimestamps()
-
-        update_time = datetime.fromtimestamp(update_time / 1000000000)
         create_time = datetime.fromtimestamp(create_time / 1000000000)
 
         if delete_time:
             delete_time = datetime.fromtimestamp(delete_time / 1000000000)
 
-        if obj._sync_time:
-            # This object is already in the database
+            obj._qmf_delete_time = delete_time
+            columns.append(table._qmf_delete_time)
 
-            obj._qmf_update_time = update_time
-            columns.append(table._qmf_update_time)
+        if not obj._sync_time:
+            # The object hasn't been written to the database yet
 
-            # XXX session_id may have changed too?
-        else:
             obj._qmf_agent_id = self.agent.id
             obj._qmf_object_id = self.object.getObjectId().objectName
             obj._qmf_session_id = str(self.object.getObjectId().getSequence())
             obj._qmf_class_key = str(self.object.getClassKey())
-            obj._qmf_update_time = update_time
             obj._qmf_create_time = create_time
 
             columns.append(table._id)
@@ -231,7 +299,6 @@
             columns.append(table._qmf_object_id)
             columns.append(table._qmf_session_id)
             columns.append(table._qmf_class_key)
-            columns.append(table._qmf_update_time)
             columns.append(table._qmf_create_time)
 
     def process_properties(self, obj, columns):
@@ -244,7 +311,7 @@
                 else:
                     col, nvalue = self.process_value(cls, prop, value)
             except MappingException, e:
-                log.debug(e)
+                #log.debug(e)
                 continue
 
             # XXX This optimization will be obsolete when QMF does it
@@ -310,6 +377,34 @@
 
         return value
 
+    def process_statistics(self, obj, update_columns, insert_columns):
+        for stat, value in self.object.getStatistics():
+            try:
+                col = obj._class._statistics_by_name[stat.name].sql_column
+            except KeyError:
+                log.debug("Statistic %s is unknown", stat)
+
+                continue
+
+            if value is not None:
+                value = self.transform_value(stat, value)
+
+            # XXX hack workaround
+            if col.name == "MonitorSelfTime":
+                value = datetime.now()
+
+            # Don't write unchanged values
+            #
+            # XXX This optimization will be obsolete when QMF does it
+            # instead
+
+            if value != getattr(obj, col.name):
+                update_columns.append(col)
+
+            insert_columns.append(col)
+
+            setattr(obj, col.name, value)
+
     def __repr__(self):
         name = self.__class__.__name__
         cls = self.object.getClassKey().getClassName()
@@ -336,71 +431,6 @@
 
         stats.deleted += 1
 
-class ObjectAddSample(ObjectUpdate):
-    def do_process(self, conn, stats):
-        cls = self.get_class()
-        obj = self.get_object(cls, self.object.getObjectId().objectName)
-
-        if not cls._statistics:
-            stats.samples_dropped += 1; return
-
-        if not obj._sync_time:
-            stats.samples_dropped += 1; return
-
-        if stats.enqueued - stats.dequeued > 100:
-            if obj._qmf_update_time > datetime.now() - timedelta(seconds=60):
-                stats.samples_dropped += 1; return
-
-        update_time, create_time, delete_time = self.object.getTimestamps()
-
-        update_time = datetime.fromtimestamp(update_time / 1000000000)
-
-        update_columns = list()
-        update_columns.append(cls.sql_table._qmf_update_time)
-
-        insert_columns = list()
-        insert_columns.append(cls.sql_samples_table._qmf_update_time)
-
-        obj._qmf_update_time = update_time
-
-        self.process_samples(obj, update_columns, insert_columns)
-
-        cursor = conn.cursor()
-
-        try:
-            obj.save(cursor, update_columns)
-
-            cls.sql_samples_insert.execute \
-                (cursor, insert_columns, obj.__dict__)
-        finally:
-            cursor.close()
-
-        stats.samples_updated += 1
-
-    def process_samples(self, obj, update_columns, insert_columns):
-        for stat, value in self.object.getStatistics():
-            try:
-                col = obj._class._statistics_by_name[stat.name].sql_column
-            except KeyError:
-                log.debug("Statistic %s is unknown", stat)
-
-                continue
-
-            if value is not None:
-                value = self.transform_value(stat, value)
-
-            # Don't write unchanged values
-            #
-            # XXX This optimization will be obsolete when QMF does it
-            # instead
-
-            if value != getattr(obj, col.name):
-                update_columns.append(col)
-
-            insert_columns.append(col)
-
-            setattr(obj, col.name, value)
-
 class AgentDelete(Update):
     def __init__(self, model, agent):
         super(AgentDelete, self).__init__(model)
@@ -408,8 +438,6 @@
         self.agent = agent
 
     def do_process(self, conn, stats):
-        return # XXX don't delete until we stop getting unexpected deletes
-
         cursor = conn.cursor()
 
         id = self.agent.id

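A note on the sample-dropping logic added to ObjectUpdate.do_process above: samples are only considered for dropping when the update queue is backed up (more than 100 pending updates); a sample is then dropped if it is already stale (older than five minutes) or if a sample for the same object was written within the last minute. A compact standalone restatement of that decision, using illustrative names (should_drop_sample, queue_depth, last_write_time) rather than the actual mint code, is:

from datetime import datetime, timedelta

# Thresholds mirroring the module-level constants in the hunk above
MAX_SAMPLE_AGE = timedelta(minutes=5)
MIN_SAMPLE_SPACING = timedelta(seconds=60)

def should_drop_sample(queue_depth, sample_time, last_write_time, now=None):
    """Decide whether to skip writing a statistics sample.

    Dropping is only considered when the update queue is backed up;
    then a sample is dropped if it is already stale, or if the previous
    sample for this object was written very recently.
    """
    if now is None:
        now = datetime.now()

    if queue_depth <= 100:
        # No pressure: always keep the sample
        return False

    if sample_time < now - MAX_SAMPLE_AGE:
        # The sample is too old to be worth storing
        return True

    if last_write_time and last_write_time > now - MIN_SAMPLE_SPACING:
        # A sample for this object was written less than a minute ago
        return True

    return False

# Example: deep queue, fresh sample, but the last write was 10 seconds ago
now = datetime.now()
print(should_drop_sample(200, now, now - timedelta(seconds=10), now))  # True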
Modified: mgmt/newdata/mint/python/mint/util.py
===================================================================
--- mgmt/newdata/mint/python/mint/util.py	2010-06-14 12:54:06 UTC (rev 4023)
+++ mgmt/newdata/mint/python/mint/util.py	2010-06-14 15:55:33 UTC (rev 4024)
@@ -7,6 +7,7 @@
 from Queue import Queue as ConcurrentQueue, Full, Empty
 from crypt import crypt
 from datetime import datetime, timedelta
+from pprint import pprint
 from getpass import getpass
 from qmf.console import ObjectId
 from random import sample


