[rhmessaging-commits] rhmessaging commits: r4025 - mgmt/newdata/mint/python/mint.

rhmessaging-commits at lists.jboss.org
Mon Jun 14 15:58:25 EDT 2010


Author: justi9
Date: 2010-06-14 15:58:24 -0400 (Mon, 14 Jun 2010)
New Revision: 4025

Modified:
   mgmt/newdata/mint/python/mint/expire.py
   mgmt/newdata/mint/python/mint/session.py
   mgmt/newdata/mint/python/mint/update.py
   mgmt/newdata/mint/python/mint/vacuum.py
Log:
 * Reuse a single dedicated write cursor

 * Use RosemaryObject.delete

 * Keep track of creations versus updates

 * Track SQL operation counts
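
The core mechanism behind the SQL-operation counting is psycopg2's
cursor_factory hook: a cursor subclass can intercept every execute()
call and bump a shared counter, exactly as UpdateCursor does in the
update.py diff below. A minimal, self-contained sketch of the idea;
the "mint" DSN and the Stats class here are illustrative stand-ins for
the real connection settings and UpdateStats:

    import psycopg2
    from psycopg2.extensions import cursor as Cursor

    class CountingCursor(Cursor):
        # Increment a counter on every execute() call.
        def execute(self, sql, args=None):
            super(CountingCursor, self).execute(sql, args)
            self.stats.sql_ops += 1

    class Stats(object):
        def __init__(self):
            self.sql_ops = 0

    conn = psycopg2.connect("dbname=mint")    # hypothetical DSN
    cursor = conn.cursor(cursor_factory=CountingCursor)
    cursor.stats = Stats()                    # attach the counter

    cursor.execute("select 1")
    print cursor.stats.sql_ops                # -> 1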


Modified: mgmt/newdata/mint/python/mint/expire.py
===================================================================
--- mgmt/newdata/mint/python/mint/expire.py	2010-06-14 15:55:33 UTC (rev 4024)
+++ mgmt/newdata/mint/python/mint/expire.py	2010-06-14 19:58:24 UTC (rev 4025)
@@ -33,7 +33,7 @@
             sleep(frequency)
 
 class ExpireUpdate(Update):
-    def do_process(self, conn, stats):
+    def do_process(self, cursor, stats):
         seconds = self.model.app.expire_threshold
 
         log.info("Expiring samples older than %i seconds", seconds)
@@ -42,24 +42,17 @@
 
         for pkg in self.model._packages:
             for cls in pkg._classes:
-                count += self.delete_samples(conn, cls, seconds)
+                count += self.delete_samples(cursor, cls, seconds)
 
-                conn.commit()
-
         log.info("Expired %i samples", count)
 
-    def delete_samples(self, conn, cls, seconds):
-        cursor = conn.cursor()
+    def delete_samples(self, cursor, cls, seconds):
+        cls.sql_samples_delete.execute(cursor, (), {"seconds": seconds})
 
-        try:
-            cls.sql_samples_delete.execute(cursor, (), {"seconds": seconds})
+        log.debug("Deleted %i %s", cursor.rowcount, cls)
 
-            log.debug("Deleted %i %s", cursor.rowcount, cls)
+        return cursor.rowcount
 
-            return cursor.rowcount
-        finally:
-            cursor.close()
-
 def convert_time_units(t):
     if t / (24 * 3600) >= 1:
         t_out = t / (24 * 3600)
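
Note that delete_samples now returns cursor.rowcount straight off the
shared write cursor; in psycopg2, rowcount after a DELETE is the number
of rows removed, which is what gets summed into the "Expired %i samples"
log line. A quick sketch of the pattern; the table name and interval
are placeholders:

    cursor.execute(
        "delete from samples"
        " where _qmf_update_time < now() - interval '90 seconds'")
    print cursor.rowcount   # rows deleted, as logged above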

Modified: mgmt/newdata/mint/python/mint/session.py
===================================================================
--- mgmt/newdata/mint/python/mint/session.py	2010-06-14 15:55:33 UTC (rev 4024)
+++ mgmt/newdata/mint/python/mint/session.py	2010-06-14 19:58:24 UTC (rev 4025)
@@ -99,9 +99,6 @@
     def newClass(self, kind, classKey):
         log.info("New class %s", classKey)
 
-        # XXX I want to store class keys using this, but I can't,
-        # because I don't get any agent info; instead
-
     def objectProps(self, broker, obj):
         agent = self.model.get_agent(obj.getAgent())
 

Modified: mgmt/newdata/mint/python/mint/update.py
===================================================================
--- mgmt/newdata/mint/python/mint/update.py	2010-06-14 15:55:33 UTC (rev 4024)
+++ mgmt/newdata/mint/python/mint/update.py	2010-06-14 19:58:24 UTC (rev 4025)
@@ -3,6 +3,7 @@
 import pickle
 
 from psycopg2 import IntegrityError, TimestampFromTicks
+from psycopg2.extensions import cursor as Cursor
 from rosemary.model import *
 from util import *
 
@@ -18,16 +19,21 @@
         self.updates = ConcurrentQueue()
         self.stats = UpdateStats(self.app)
 
-        self.thread = None
         self.conn = None
         self.read_cursor = None
+        self.write_cursor = None
 
         self.halt_on_error = False
 
     def init(self):
         self.conn = self.app.database.get_connection()
-        self.read_cursor = self.conn.cursor()
 
+        self.read_cursor = self.conn.cursor(cursor_factory=UpdateCursor)
+        self.write_cursor = self.conn.cursor(cursor_factory=UpdateCursor)
+
+        self.read_cursor.stats = self.stats
+        self.write_cursor.stats = self.stats
+
     def enqueue(self, update):
         self.updates.put(update)
 
@@ -51,17 +57,15 @@
 
             self.stats.dequeued += 1
 
-            update.thread = self
+            update.process(self)
 
-            update.process(self.conn, self.stats)
-
 class UpdateStats(object):
     names = ("*Enqueued", "*Dequeued", "Depth", "*Created", "*Updated",
-             "*Sampled", "*Deleted", "*Dropped",
-             "Errors", "CPU (%)", "Mem (M)")
-    headings = ("%10s  " * 11) % names
+             "*Sampled", "*Deleted", "*Dropped", "*Sql Ops",
+             "Errors", "Cpu (%)", "Mem (M)")
+    headings = ("%10s  " * 12) % names
     values_fmt = "%10.1f  %10.1f  %10i  %10.1f  %10.1f  %10.1f  %10.1f  " + \
-        "%10.1f  %10i  %10i  %10.1f"
+        "%10.1f  %10.1f  %10i  %10i  %10.1f"
 
     then = None
     now = None
@@ -76,6 +80,8 @@
         self.deleted = 0
         self.dropped = 0
 
+        self.sql_ops = 0
+
         self.errors = 0
 
         self.time = None
@@ -96,7 +102,10 @@
         UpdateStats.now = now
 
     def get_resident_pages(self):
-        line = open("/proc/%i/statm" % os.getpid()).read()
+        try:
+            line = open("/proc/%i/statm" % os.getpid()).read()
+        except:
+            return 0
 
         return int(line.split()[1])
 
@@ -115,7 +124,8 @@
                   self.now.updated - self.then.updated,
                   self.now.sampled - self.then.sampled,
                   self.now.deleted - self.then.deleted,
-                  self.now.dropped - self.then.dropped]
+                  self.now.dropped - self.then.dropped,
+                  self.now.sql_ops - self.then.sql_ops]
 
         secs = self.now.time - self.then.time
         values = map(lambda x: x / secs, values)
@@ -128,32 +138,41 @@
 
         print self.values_fmt % tuple(values)
         
+class UpdateCursor(Cursor):
+    def execute(self, sql, args=None):
+        super(UpdateCursor, self).execute(sql, args)
+
+        self.stats.sql_ops += 1
+
 class Update(object):
     def __init__(self, model):
         self.model = model
+        self.thread = None
 
-    def process(self, conn, stats):
+    def process(self, thread):
         log.debug("Processing %s", self)
 
         try:
-            self.do_process(conn, stats)
+            self.do_process(thread.write_cursor, thread.stats)
 
-            conn.commit()
+            thread.conn.commit()
         except UpdateException, e:
             log.info("Update could not be completed; %s", e)
 
-            conn.rollback()
+            thread.conn.rollback()
         except:
             log.exception("Update failed")
 
-            conn.rollback()
+            thread.conn.rollback()
 
-            stats.errors += 1
+            thread.stats.errors += 1
 
-            if self.thread.halt_on_error:
+            if thread.halt_on_error:
                 raise
 
-    def do_process(self, conn, stats):
+            #print_exc()
+
+    def do_process(self, cursor, stats):
         raise Exception("Not implemented")
 
     def __repr__(self):
@@ -166,7 +185,7 @@
         self.agent = agent
         self.object = obj
 
-    def do_process(self, conn, stats):
+    def do_process(self, cursor, stats):
         cls = self.get_class()
         obj = self.get_object(cls, self.object.getObjectId().objectName)
 
@@ -188,49 +207,44 @@
         self.process_properties(obj, object_columns)
         self.process_statistics(obj, object_columns, sample_columns)
 
-        cursor = conn.cursor()
+        if object_columns:
+            object_columns.append(cls.sql_table._qmf_update_time)
 
-        try:
-            if object_columns:
-                object_columns.append(cls.sql_table._qmf_update_time)
+            new = obj._sync_time is None
 
-                new = obj._sync_time is None
+            obj.save(cursor, object_columns)
 
-                obj.save(cursor, object_columns)
+            if new:
+                stats.created += 1
+            else:
+                stats.updated += 1
 
-                if new:
-                    stats.created += 1
-                else:
-                    stats.updated += 1
+        if sample_columns:
+            drop = False
 
-            if sample_columns:
-                drop = False
+            if stats.enqueued - stats.dequeued > 100:
+                # There's some pressure, so consider dropping samples
 
-                if stats.enqueued - stats.dequeued > 100:
-                    # There's some pressure, so consider dropping samples
+                now = datetime.now()
 
-                    now = datetime.now()
+                if update_time < now - minutes_ago:
+                    # The sample is too old
+                    drop = True
 
-                    if update_time < now - minutes_ago:
-                        # The sample is too old
-                        drop = True
+                if last_update_time and last_update_time > now - seconds_ago:
+                    # The samples are arriving too frequently
+                    drop = True
 
-                    if last_update_time and last_update_time > now - seconds_ago:
-                        # The samples are too fidelitous
-                        drop = True
+            if drop:
+                stats.dropped += 1
+            else:
+                col = cls.sql_samples_table._qmf_update_time
+                sample_columns.append(col)
 
-                if drop:
-                    stats.dropped += 1
-                else:
-                    col = cls.sql_samples_table._qmf_update_time
-                    sample_columns.append(col)
+                obj.add_sample(cursor, sample_columns)
 
-                    obj.add_sample(cursor, sample_columns)
+                stats.sampled += 1
 
-                    stats.sampled += 1
-        finally:
-            cursor.close()
-
     def get_class(self):
         class_key = self.object.getClassKey()
 
@@ -255,7 +269,7 @@
         try:
             return self.agent.objects_by_id[object_id]
         except KeyError:
-            cursor = self.thread.read_cursor
+            cursor = self.model.app.update_thread.read_cursor
 
             obj = RosemaryObject(cls, None)
             obj._qmf_agent_id = self.agent.id
@@ -413,18 +427,13 @@
         return "%s(%s,%s,%s)" % (name, self.agent.id, cls, id)
 
 class ObjectDelete(ObjectUpdate):
-    def do_process(self, conn, stats):
+    def do_process(self, cursor, stats):
         cls = self.get_class()
         obj = self.get_object(cls, self.object.getObjectId().objectName)
 
-        cursor = conn.cursor()
+        obj.delete(cursor)
 
         try:
-            cls.sql_delete.execute(cursor, (), obj.__dict__)
-        finally:
-            cursor.close()
-
-        try:
             del self.agent.objects_by_id[self.object.getObjectId().objectName]
         except KeyError:
             pass
@@ -437,18 +446,13 @@
 
         self.agent = agent
 
-    def do_process(self, conn, stats):
-        cursor = conn.cursor()
-
+    def do_process(self, cursor, stats):
         id = self.agent.id
 
-        try:
-            for pkg in self.model._packages:
-                for cls in pkg._classes:
-                    for obj in cls.get_selection(cursor, _qmf_agent_id=id):
-                        obj.delete(cursor)
-        finally:
-            cursor.close()
+        for pkg in self.model._packages:
+            for cls in pkg._classes:
+                for obj in cls.get_selection(cursor, _qmf_agent_id=id):
+                    obj.delete(cursor)
 
 class UpdateException(Exception):
     def __init__(self, name):

Modified: mgmt/newdata/mint/python/mint/vacuum.py
===================================================================
--- mgmt/newdata/mint/python/mint/vacuum.py	2010-06-14 15:55:33 UTC (rev 4024)
+++ mgmt/newdata/mint/python/mint/vacuum.py	2010-06-14 19:58:24 UTC (rev 4025)
@@ -15,26 +15,24 @@
             sleep(60 * 60 * 10)
 
 class VacuumUpdate(Update):
-    def do_process(self, conn, stats):
+    def do_process(self, cursor, stats):
         log.info("Vacumming tables")
 
+        conn = self.model.app.update_thread.conn
+
         level = conn.isolation_level
         conn.set_isolation_level(0)
 
         for pkg in self.model._packages:
             for cls in pkg._classes:
-                self.vacuum(conn, cls)
+                self.vacuum(cursor, cls)
 
         conn.set_isolation_level(level)
 
         log.info("Vacuumed tables")
 
-    def vacuum(self, conn, cls):
-        cursor = conn.cursor()
+    def vacuum(self, cursor, cls):
         sql = "vacuum verbose %s"
 
-        try:
-            cursor.execute(sql % cls.sql_table.identifier)
-            cursor.execute(sql % cls.sql_samples_table.identifier)
-        finally:
-            cursor.close()
+        cursor.execute(sql % cls.sql_table.identifier)
+        cursor.execute(sql % cls.sql_samples_table.identifier)
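
For context on the isolation-level dance in VacuumUpdate: PostgreSQL
refuses to run VACUUM inside a transaction block, so the code drops the
shared connection to isolation level 0 (autocommit) and restores the
previous level afterward. An equivalent standalone sketch; "mytable" is
a placeholder:

    from psycopg2.extensions import ISOLATION_LEVEL_AUTOCOMMIT

    level = conn.isolation_level
    conn.set_isolation_level(ISOLATION_LEVEL_AUTOCOMMIT)   # level 0
    try:
        cursor.execute("vacuum verbose mytable")
    finally:
        conn.set_isolation_level(level)                    # restore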


