Author: justi9
Date: 2010-06-14 15:58:24 -0400 (Mon, 14 Jun 2010)
New Revision: 4025
Modified:
mgmt/newdata/mint/python/mint/expire.py
mgmt/newdata/mint/python/mint/session.py
mgmt/newdata/mint/python/mint/update.py
mgmt/newdata/mint/python/mint/vacuum.py
Log:
* Reuse a single dedicated write cursor
* Use RosemaryObject.delete
* Keep track of creations versus updates
* Track sql operation counts
Modified: mgmt/newdata/mint/python/mint/expire.py
===================================================================
--- mgmt/newdata/mint/python/mint/expire.py 2010-06-14 15:55:33 UTC (rev 4024)
+++ mgmt/newdata/mint/python/mint/expire.py 2010-06-14 19:58:24 UTC (rev 4025)
@@ -33,7 +33,7 @@
sleep(frequency)
class ExpireUpdate(Update):
- def do_process(self, conn, stats):
+ def do_process(self, cursor, stats):
seconds = self.model.app.expire_threshold
log.info("Expiring samples older than %i seconds", seconds)
@@ -42,24 +42,17 @@
for pkg in self.model._packages:
for cls in pkg._classes:
- count += self.delete_samples(conn, cls, seconds)
+ count += self.delete_samples(cursor, cls, seconds)
- conn.commit()
-
log.info("Expired %i samples", count)
- def delete_samples(self, conn, cls, seconds):
- cursor = conn.cursor()
+ def delete_samples(self, cursor, cls, seconds):
+ cls.sql_samples_delete.execute(cursor, (), {"seconds": seconds})
- try:
- cls.sql_samples_delete.execute(cursor, (), {"seconds": seconds})
+ log.debug("Deleted %i %s", cursor.rowcount, cls)
- log.debug("Deleted %i %s", cursor.rowcount, cls)
+ return cursor.rowcount
- return cursor.rowcount
- finally:
- cursor.close()
-
def convert_time_units(t):
if t / (24 * 3600) >= 1:
t_out = t / (24 * 3600)
Modified: mgmt/newdata/mint/python/mint/session.py
===================================================================
--- mgmt/newdata/mint/python/mint/session.py 2010-06-14 15:55:33 UTC (rev 4024)
+++ mgmt/newdata/mint/python/mint/session.py 2010-06-14 19:58:24 UTC (rev 4025)
@@ -99,9 +99,6 @@
def newClass(self, kind, classKey):
log.info("New class %s", classKey)
- # XXX I want to store class keys using this, but I can't,
- # because I don't get any agent info; instead
-
def objectProps(self, broker, obj):
agent = self.model.get_agent(obj.getAgent())
Modified: mgmt/newdata/mint/python/mint/update.py
===================================================================
--- mgmt/newdata/mint/python/mint/update.py 2010-06-14 15:55:33 UTC (rev 4024)
+++ mgmt/newdata/mint/python/mint/update.py 2010-06-14 19:58:24 UTC (rev 4025)
@@ -3,6 +3,7 @@
import pickle
from psycopg2 import IntegrityError, TimestampFromTicks
+from psycopg2.extensions import cursor as Cursor
from rosemary.model import *
from util import *
@@ -18,16 +19,21 @@
self.updates = ConcurrentQueue()
self.stats = UpdateStats(self.app)
- self.thread = None
self.conn = None
self.read_cursor = None
+ self.write_cursor = None
self.halt_on_error = False
def init(self):
self.conn = self.app.database.get_connection()
- self.read_cursor = self.conn.cursor()
+ self.read_cursor = self.conn.cursor(cursor_factory=UpdateCursor)
+ self.write_cursor = self.conn.cursor(cursor_factory=UpdateCursor)
+
+ self.read_cursor.stats = self.stats
+ self.write_cursor.stats = self.stats
+
def enqueue(self, update):
self.updates.put(update)
@@ -51,17 +57,15 @@
self.stats.dequeued += 1
- update.thread = self
+ update.process(self)
- update.process(self.conn, self.stats)
-
class UpdateStats(object):
names = ("*Enqueued", "*Dequeued", "Depth",
"*Created", "*Updated",
- "*Sampled", "*Deleted", "*Dropped",
- "Errors", "CPU (%)", "Mem (M)")
- headings = ("%10s " * 11) % names
+ "*Sampled", "*Deleted", "*Dropped", "*Sql Ops",
+ "Errors", "Cpu (%)", "Mem (M)")
+ headings = ("%10s " * 12) % names
values_fmt = "%10.1f %10.1f %10i %10.1f %10.1f %10.1f %10.1f " + \
- "%10.1f %10i %10i %10.1f"
+ "%10.1f %10.1f %10i %10i %10.1f"
then = None
now = None
@@ -76,6 +80,8 @@
self.deleted = 0
self.dropped = 0
+ self.sql_ops = 0
+
self.errors = 0
self.time = None
@@ -96,7 +102,10 @@
UpdateStats.now = now
def get_resident_pages(self):
- line = open("/proc/%i/statm" % os.getpid()).read()
+ try:
+ line = open("/proc/%i/statm" % os.getpid()).read()
+ except:
+ return 0
return int(line.split()[1])
@@ -115,7 +124,8 @@
self.now.updated - self.then.updated,
self.now.sampled - self.then.sampled,
self.now.deleted - self.then.deleted,
- self.now.dropped - self.then.dropped]
+ self.now.dropped - self.then.dropped,
+ self.now.sql_ops - self.then.sql_ops]
secs = self.now.time - self.then.time
values = map(lambda x: x / secs, values)
@@ -128,32 +138,41 @@
print self.values_fmt % tuple(values)
+class UpdateCursor(Cursor):
+ def execute(self, sql, args=None):
+ super(UpdateCursor, self).execute(sql, args)
+
+ self.stats.sql_ops += 1
+
class Update(object):
def __init__(self, model):
self.model = model
+ self.thread = None
- def process(self, conn, stats):
+ def process(self, thread):
log.debug("Processing %s", self)
try:
- self.do_process(conn, stats)
+ self.do_process(thread.write_cursor, thread.stats)
- conn.commit()
+ thread.conn.commit()
except UpdateException, e:
log.info("Update could not be completed; %s", e)
- conn.rollback()
+ thread.conn.rollback()
except:
log.exception("Update failed")
- conn.rollback()
+ thread.conn.rollback()
- stats.errors += 1
+ thread.stats.errors += 1
- if self.thread.halt_on_error:
+ if thread.halt_on_error:
raise
- def do_process(self, conn, stats):
+ #print_exc()
+
+ def do_process(self, cursor, stats):
raise Exception("Not implemented")
def __repr__(self):
@@ -166,7 +185,7 @@
self.agent = agent
self.object = obj
- def do_process(self, conn, stats):
+ def do_process(self, cursor, stats):
cls = self.get_class()
obj = self.get_object(cls, self.object.getObjectId().objectName)
@@ -188,49 +207,44 @@
self.process_properties(obj, object_columns)
self.process_statistics(obj, object_columns, sample_columns)
- cursor = conn.cursor()
+ if object_columns:
+ object_columns.append(cls.sql_table._qmf_update_time)
- try:
- if object_columns:
- object_columns.append(cls.sql_table._qmf_update_time)
+ new = obj._sync_time is None
- new = obj._sync_time is None
+ obj.save(cursor, object_columns)
- obj.save(cursor, object_columns)
+ if new:
+ stats.created += 1
+ else:
+ stats.updated += 1
- if new:
- stats.created += 1
- else:
- stats.updated += 1
+ if sample_columns:
+ drop = False
- if sample_columns:
- drop = False
+ if stats.enqueued - stats.dequeued > 100:
+ # There's some pressure, so consider dropping samples
- if stats.enqueued - stats.dequeued > 100:
- # There's some pressure, so consider dropping samples
+ now = datetime.now()
- now = datetime.now()
+ if update_time < now - minutes_ago:
+ # The sample is too old
+ drop = True
- if update_time < now - minutes_ago:
- # The sample is too old
- drop = True
+ if last_update_time and last_update_time > now - seconds_ago:
+ # The samples are too fidelitous
+ drop = True
- if last_update_time and last_update_time > now - seconds_ago:
- # The samples are too fidelitous
- drop = True
+ if drop:
+ stats.dropped += 1
+ else:
+ col = cls.sql_samples_table._qmf_update_time
+ sample_columns.append(col)
- if drop:
- stats.dropped += 1
- else:
- col = cls.sql_samples_table._qmf_update_time
- sample_columns.append(col)
+ obj.add_sample(cursor, sample_columns)
- obj.add_sample(cursor, sample_columns)
+ stats.sampled += 1
- stats.sampled += 1
- finally:
- cursor.close()
-
def get_class(self):
class_key = self.object.getClassKey()
@@ -255,7 +269,7 @@
try:
return self.agent.objects_by_id[object_id]
except KeyError:
- cursor = self.thread.read_cursor
+ cursor = self.model.app.update_thread.read_cursor
obj = RosemaryObject(cls, None)
obj._qmf_agent_id = self.agent.id
@@ -413,18 +427,13 @@
return "%s(%s,%s,%s)" % (name, self.agent.id, cls, id)
class ObjectDelete(ObjectUpdate):
- def do_process(self, conn, stats):
+ def do_process(self, cursor, stats):
cls = self.get_class()
obj = self.get_object(cls, self.object.getObjectId().objectName)
- cursor = conn.cursor()
+ obj.delete(cursor)
try:
- cls.sql_delete.execute(cursor, (), obj.__dict__)
- finally:
- cursor.close()
-
- try:
del self.agent.objects_by_id[self.object.getObjectId().objectName]
except KeyError:
pass
@@ -437,18 +446,13 @@
self.agent = agent
- def do_process(self, conn, stats):
- cursor = conn.cursor()
-
+ def do_process(self, cursor, stats):
id = self.agent.id
- try:
- for pkg in self.model._packages:
- for cls in pkg._classes:
- for obj in cls.get_selection(cursor, _qmf_agent_id=id):
- obj.delete(cursor)
- finally:
- cursor.close()
+ for pkg in self.model._packages:
+ for cls in pkg._classes:
+ for obj in cls.get_selection(cursor, _qmf_agent_id=id):
+ obj.delete(cursor)
class UpdateException(Exception):
def __init__(self, name):
Modified: mgmt/newdata/mint/python/mint/vacuum.py
===================================================================
--- mgmt/newdata/mint/python/mint/vacuum.py 2010-06-14 15:55:33 UTC (rev 4024)
+++ mgmt/newdata/mint/python/mint/vacuum.py 2010-06-14 19:58:24 UTC (rev 4025)
@@ -15,26 +15,24 @@
sleep(60 * 60 * 10)
class VacuumUpdate(Update):
- def do_process(self, conn, stats):
+ def do_process(self, cursor, stats):
log.info("Vacumming tables")
+ conn = self.model.app.update_thread.conn
+
level = conn.isolation_level
conn.set_isolation_level(0)
for pkg in self.model._packages:
for cls in pkg._classes:
- self.vacuum(conn, cls)
+ self.vacuum(cursor, cls)
conn.set_isolation_level(level)
log.info("Vacuumed tables")
- def vacuum(self, conn, cls):
- cursor = conn.cursor()
+ def vacuum(self, cursor, cls):
sql = "vacuum verbose %s"
- try:
- cursor.execute(sql % cls.sql_table.identifier)
- cursor.execute(sql % cls.sql_samples_table.identifier)
- finally:
- cursor.close()
+ cursor.execute(sql % cls.sql_table.identifier)
+ cursor.execute(sql % cls.sql_samples_table.identifier)