Author: justi9
Date: 2010-06-14 11:55:33 -0400 (Mon, 14 Jun 2010)
New Revision: 4024
Modified:
mgmt/newdata/cumin/bin/cumin-data
mgmt/newdata/mint/python/mint/session.py
mgmt/newdata/mint/python/mint/update.py
mgmt/newdata/mint/python/mint/util.py
Log:
* Use a single update class to handle prop and stat updates
* Improve reporting from cumin-data --print-stats
* Reuse a read cursor for get_object, a notable scalability gain
* Put in a workaround for some unexpectedly null data
* Quiet the debug logging a little
Modified: mgmt/newdata/cumin/bin/cumin-data
===================================================================
--- mgmt/newdata/cumin/bin/cumin-data 2010-06-14 12:54:06 UTC (rev 4023)
+++ mgmt/newdata/cumin/bin/cumin-data 2010-06-14 15:55:33 UTC (rev 4024)
@@ -38,7 +38,7 @@
count = 0
if opts.print_stats:
- print "[Reported values are the number of events per second]"
+ print "[Starred columns are the number of events per second]"
while True:
if count % 24 == 0:
@@ -46,7 +46,7 @@
count += 1
- stats.print_rates()
+ stats.print_values()
sleep(5)
else:
Modified: mgmt/newdata/mint/python/mint/session.py
===================================================================
--- mgmt/newdata/mint/python/mint/session.py 2010-06-14 12:54:06 UTC (rev 4023)
+++ mgmt/newdata/mint/python/mint/session.py 2010-06-14 15:55:33 UTC (rev 4024)
@@ -108,12 +108,6 @@
if not self.model.app.update_thread.isAlive():
return
- # XXX objectProps is getting called even if no properties are
- # set
-
- if not obj.getProperties():
- return
-
if obj.getTimestamps()[2]:
up = ObjectDelete(self.model, agent, obj)
else:
@@ -127,7 +121,7 @@
if not self.model.app.update_thread.isAlive():
return
- up = ObjectAddSample(self.model, agent, obj)
+ up = ObjectUpdate(self.model, agent, obj)
self.model.app.update_thread.enqueue(up)
def event(self, broker, event):
Modified: mgmt/newdata/mint/python/mint/update.py
===================================================================
--- mgmt/newdata/mint/python/mint/update.py 2010-06-14 12:54:06 UTC (rev 4023)
+++ mgmt/newdata/mint/python/mint/update.py 2010-06-14 15:55:33 UTC (rev 4024)
@@ -1,4 +1,5 @@
import copy
+import resource
import pickle
from psycopg2 import IntegrityError, TimestampFromTicks
@@ -7,33 +8,36 @@
log = logging.getLogger("mint.update")
+minutes_ago = timedelta(minutes=5)
+seconds_ago = timedelta(seconds=60)
+
class UpdateThread(MintDaemonThread):
def __init__(self, app):
super(UpdateThread, self).__init__(app)
self.updates = ConcurrentQueue()
+ self.stats = UpdateStats(self.app)
- self.stats = UpdateStats(self.app)
+ self.thread = None
self.conn = None
+ self.read_cursor = None
- self.halt_on_error = True
+ self.halt_on_error = False
def init(self):
self.conn = self.app.database.get_connection()
+ self.read_cursor = self.conn.cursor()
def enqueue(self, update):
- update.thread = self
-
self.updates.put(update)
- if self.stats:
- self.stats.enqueued += 1
+ self.stats.enqueued += 1
# This is an attempt to yield from the enqueueing thread (this
# method's caller) to the update thread
if self.updates.qsize() > 1000:
- sleep(0.1)
+ sleep(0)
def run(self):
while True:
@@ -45,15 +49,19 @@
except Empty:
continue
- if self.stats:
- self.stats.dequeued += 1
+ self.stats.dequeued += 1
+ update.thread = self
+
update.process(self.conn, self.stats)
class UpdateStats(object):
- names = ("Enqueued", "Dequeued", "Updated",
"Deleted", "Dropped")
- headings = ("%8s " * 5) % names
- rates_fmt = ("%8.1f " * 5)
+ names = ("*Enqueued", "*Dequeued", "Depth",
"*Created", "*Updated",
+ "*Sampled", "*Deleted", "*Dropped",
+ "Errors", "CPU (%)", "Mem (M)")
+ headings = ("%10s " * 11) % names
+ values_fmt = "%10.1f %10.1f %10i %10.1f %10.1f %10.1f %10.1f " + \
+ "%10.1f %10i %10i %10.1f"
then = None
now = None
@@ -62,27 +70,40 @@
self.enqueued = 0
self.dequeued = 0
+ self.created = 0
self.updated = 0
+ self.sampled = 0
self.deleted = 0
self.dropped = 0
- self.samples_updated = 0
- self.samples_expired = 0
- self.samples_dropped = 0
+ self.errors = 0
self.time = None
+ self.cpu = 0
+ self.memory = 0
def capture(self):
now = copy.copy(self)
+
now.time = time.time()
+ rusage = resource.getrusage(resource.RUSAGE_SELF)
+
+ now.cpu = rusage[0] + rusage[1]
+ now.memory = self.get_resident_pages() * resource.getpagesize()
+
UpdateStats.then = UpdateStats.now
UpdateStats.now = now
+ def get_resident_pages(self):
+ line = open("/proc/%i/statm" % os.getpid()).read()
+
+ return int(line.split()[1])
+
def print_headings(self):
print self.headings
- def print_rates(self):
+ def print_values(self):
self.capture()
if not self.then:
@@ -90,18 +111,22 @@
values = [self.now.enqueued - self.then.enqueued,
self.now.dequeued - self.then.dequeued,
+ self.now.created - self.then.created,
self.now.updated - self.then.updated,
+ self.now.sampled - self.then.sampled,
self.now.deleted - self.then.deleted,
self.now.dropped - self.then.dropped]
- # XXX
- values[2] += self.now.samples_updated - self.then.samples_updated
- values[4] += self.now.samples_dropped - self.then.samples_dropped
-
secs = self.now.time - self.then.time
- rates = map(lambda x: x / secs, values)
+ values = map(lambda x: x / secs, values)
- print self.rates_fmt % tuple(rates)
+ values.insert(2, self.now.enqueued - self.now.dequeued)
+
+ values.append(self.errors)
+ values.append(int((self.now.cpu - self.then.cpu) / secs * 100))
+ values.append(self.now.memory / 1000000.0)
+
+ print self.values_fmt % tuple(values)
class Update(object):
def __init__(self, model):
@@ -123,7 +148,9 @@
conn.rollback()
- if self.model.app.update_thread.halt_on_error:
+ stats.errors += 1
+
+ if self.thread.halt_on_error:
raise
def do_process(self, conn, stats):
@@ -143,20 +170,67 @@
cls = self.get_class()
obj = self.get_object(cls, self.object.getObjectId().objectName)
- columns = list()
+ if not obj._sync_time and not self.object.getProperties():
+ # This is a sample for an object we don't have yet
+ stats.dropped += 1; return
- self.process_headers(obj, columns)
- self.process_properties(obj, columns)
+ update_time, create_time, delete_time = self.object.getTimestamps()
+ update_time = datetime.fromtimestamp(update_time / 1000000000)
+ last_update_time = obj._qmf_update_time
+
+ obj._qmf_update_time = update_time
+
+ object_columns = list()
+ sample_columns = list()
+
+ self.process_headers(obj, object_columns)
+ self.process_properties(obj, object_columns)
+ self.process_statistics(obj, object_columns, sample_columns)
+
cursor = conn.cursor()
try:
- obj.save(cursor, columns)
+ if object_columns:
+ object_columns.append(cls.sql_table._qmf_update_time)
+
+ new = obj._sync_time is None
+
+ obj.save(cursor, object_columns)
+
+ if new:
+ stats.created += 1
+ else:
+ stats.updated += 1
+
+ if sample_columns:
+ drop = False
+
+ if stats.enqueued - stats.dequeued > 100:
+ # There's some pressure, so consider dropping samples
+
+ now = datetime.now()
+
+ if update_time < now - minutes_ago:
+ # The sample is too old
+ drop = True
+
+ if last_update_time and last_update_time > now - seconds_ago:
+ # The samples are arriving too frequently
+ drop = True
+
+ if drop:
+ stats.dropped += 1
+ else:
+ col = cls.sql_samples_table._qmf_update_time
+ sample_columns.append(col)
+
+ obj.add_sample(cursor, sample_columns)
+
+ stats.sampled += 1
finally:
cursor.close()
- stats.updated += 1
-
def get_class(self):
class_key = self.object.getClassKey()
@@ -181,20 +255,19 @@
try:
return self.agent.objects_by_id[object_id]
except KeyError:
- conn = self.model.app.database.get_connection()
- cursor = conn.cursor()
+ cursor = self.thread.read_cursor
obj = RosemaryObject(cls, None)
obj._qmf_agent_id = self.agent.id
obj._qmf_object_id = object_id
+ #try:
try:
- try:
- cls.load_object_by_qmf_id(cursor, obj)
- except RosemaryNotFound:
- obj._id = cls.get_new_id(cursor)
- finally:
- cursor.close()
+ cls.load_object_by_qmf_id(cursor, obj)
+ except RosemaryNotFound:
+ obj._id = cls.get_new_id(cursor)
+ #finally:
+ # cursor.close()
self.agent.objects_by_id[object_id] = obj
@@ -204,26 +277,21 @@
table = obj._class.sql_table
update_time, create_time, delete_time = self.object.getTimestamps()
-
- update_time = datetime.fromtimestamp(update_time / 1000000000)
create_time = datetime.fromtimestamp(create_time / 1000000000)
if delete_time:
delete_time = datetime.fromtimestamp(delete_time / 1000000000)
- if obj._sync_time:
- # This object is already in the database
+ obj._qmf_delete_time = delete_time
+ columns.append(table._qmf_delete_time)
- obj._qmf_update_time = update_time
- columns.append(table._qmf_update_time)
+ if not obj._sync_time:
+ # The object hasn't been written to the database yet
- # XXX session_id may have changed too?
- else:
obj._qmf_agent_id = self.agent.id
obj._qmf_object_id = self.object.getObjectId().objectName
obj._qmf_session_id = str(self.object.getObjectId().getSequence())
obj._qmf_class_key = str(self.object.getClassKey())
- obj._qmf_update_time = update_time
obj._qmf_create_time = create_time
columns.append(table._id)
@@ -231,7 +299,6 @@
columns.append(table._qmf_object_id)
columns.append(table._qmf_session_id)
columns.append(table._qmf_class_key)
- columns.append(table._qmf_update_time)
columns.append(table._qmf_create_time)
def process_properties(self, obj, columns):
@@ -244,7 +311,7 @@
else:
col, nvalue = self.process_value(cls, prop, value)
except MappingException, e:
- log.debug(e)
+ #log.debug(e)
continue
# XXX This optimization will be obsolete when QMF does it
@@ -310,6 +377,34 @@
return value
+ def process_statistics(self, obj, update_columns, insert_columns):
+ for stat, value in self.object.getStatistics():
+ try:
+ col = obj._class._statistics_by_name[stat.name].sql_column
+ except KeyError:
+ log.debug("Statistic %s is unknown", stat)
+
+ continue
+
+ if value is not None:
+ value = self.transform_value(stat, value)
+
+ # XXX hack workaround
+ if col.name == "MonitorSelfTime":
+ value = datetime.now()
+
+ # Don't write unchanged values
+ #
+ # XXX This optimization will be obsolete when QMF does it
+ # instead
+
+ if value != getattr(obj, col.name):
+ update_columns.append(col)
+
+ insert_columns.append(col)
+
+ setattr(obj, col.name, value)
+
def __repr__(self):
name = self.__class__.__name__
cls = self.object.getClassKey().getClassName()
@@ -336,71 +431,6 @@
stats.deleted += 1
-class ObjectAddSample(ObjectUpdate):
- def do_process(self, conn, stats):
- cls = self.get_class()
- obj = self.get_object(cls, self.object.getObjectId().objectName)
-
- if not cls._statistics:
- stats.samples_dropped += 1; return
-
- if not obj._sync_time:
- stats.samples_dropped += 1; return
-
- if stats.enqueued - stats.dequeued > 100:
- if obj._qmf_update_time > datetime.now() - timedelta(seconds=60):
- stats.samples_dropped += 1; return
-
- update_time, create_time, delete_time = self.object.getTimestamps()
-
- update_time = datetime.fromtimestamp(update_time / 1000000000)
-
- update_columns = list()
- update_columns.append(cls.sql_table._qmf_update_time)
-
- insert_columns = list()
- insert_columns.append(cls.sql_samples_table._qmf_update_time)
-
- obj._qmf_update_time = update_time
-
- self.process_samples(obj, update_columns, insert_columns)
-
- cursor = conn.cursor()
-
- try:
- obj.save(cursor, update_columns)
-
- cls.sql_samples_insert.execute \
- (cursor, insert_columns, obj.__dict__)
- finally:
- cursor.close()
-
- stats.samples_updated += 1
-
- def process_samples(self, obj, update_columns, insert_columns):
- for stat, value in self.object.getStatistics():
- try:
- col = obj._class._statistics_by_name[stat.name].sql_column
- except KeyError:
- log.debug("Statistic %s is unknown", stat)
-
- continue
-
- if value is not None:
- value = self.transform_value(stat, value)
-
- # Don't write unchanged values
- #
- # XXX This optimization will be obsolete when QMF does it
- # instead
-
- if value != getattr(obj, col.name):
- update_columns.append(col)
-
- insert_columns.append(col)
-
- setattr(obj, col.name, value)
-
class AgentDelete(Update):
def __init__(self, model, agent):
super(AgentDelete, self).__init__(model)
@@ -408,8 +438,6 @@
self.agent = agent
def do_process(self, conn, stats):
- return # XXX don't delete until we stop getting unexpected deletes
-
cursor = conn.cursor()
id = self.agent.id
Modified: mgmt/newdata/mint/python/mint/util.py
===================================================================
--- mgmt/newdata/mint/python/mint/util.py 2010-06-14 12:54:06 UTC (rev 4023)
+++ mgmt/newdata/mint/python/mint/util.py 2010-06-14 15:55:33 UTC (rev 4024)
@@ -7,6 +7,7 @@
from Queue import Queue as ConcurrentQueue, Full, Empty
from crypt import crypt
from datetime import datetime, timedelta
+from pprint import pprint
from getpass import getpass
from qmf.console import ObjectId
from random import sample