Author: justi9
Date: 2010-05-12 17:50:17 -0400 (Wed, 12 May 2010)
New Revision: 3966
Added:
mgmt/newdata/mint/python/mint/update.py
Removed:
mgmt/newdata/mint/python/mint/newupdate.py
Modified:
mgmt/newdata/mint/python/mint/expire.py
mgmt/newdata/mint/python/mint/main.py
mgmt/newdata/mint/python/mint/model.py
mgmt/newdata/mint/python/mint/vacuum.py
Log:
Rename the newupdate module to update, now that the old one is gone
Modified: mgmt/newdata/mint/python/mint/expire.py
===================================================================
--- mgmt/newdata/mint/python/mint/expire.py 2010-05-12 19:43:32 UTC (rev 3965)
+++ mgmt/newdata/mint/python/mint/expire.py 2010-05-12 21:50:17 UTC (rev 3966)
@@ -1,4 +1,4 @@
-from newupdate import *
+from update import *
from util import *
import mint
Modified: mgmt/newdata/mint/python/mint/main.py
===================================================================
--- mgmt/newdata/mint/python/mint/main.py 2010-05-12 19:43:32 UTC (rev 3965)
+++ mgmt/newdata/mint/python/mint/main.py 2010-05-12 21:50:17 UTC (rev 3966)
@@ -1,8 +1,8 @@
from database import MintDatabase
from expire import ExpireThread
from model import MintModel
-from newupdate import UpdateThread
from session import MintSession
+from update import UpdateThread
from vacuum import VacuumThread
from util import *
Modified: mgmt/newdata/mint/python/mint/model.py
===================================================================
--- mgmt/newdata/mint/python/mint/model.py 2010-05-12 19:43:32 UTC (rev 3965)
+++ mgmt/newdata/mint/python/mint/model.py 2010-05-12 21:50:17 UTC (rev 3966)
@@ -1,6 +1,6 @@
from rosemary.model import *
-from newupdate import *
+from update import *
from util import *
log = logging.getLogger("mint.model")
Deleted: mgmt/newdata/mint/python/mint/newupdate.py
===================================================================
--- mgmt/newdata/mint/python/mint/newupdate.py 2010-05-12 19:43:32 UTC (rev 3965)
+++ mgmt/newdata/mint/python/mint/newupdate.py 2010-05-12 21:50:17 UTC (rev 3966)
@@ -1,403 +0,0 @@
-import pickle
-
-from psycopg2 import IntegrityError, TimestampFromTicks
-from rosemary.model import *
-from util import *
-
-log = logging.getLogger("mint.newupdate")
-
-class UpdateThread(MintDaemonThread):
- def __init__(self, app):
- super(UpdateThread, self).__init__(app)
-
- self.conn = None
- self.stats = None
-
- self.updates = ConcurrentQueue()
-
- self.halt_on_error = True
-
- def init(self):
- self.conn = self.app.database.get_connection()
- self.stats = UpdateStats()
-
- def enqueue(self, update):
- update.thread = self
-
- self.updates.put(update)
-
- if self.stats:
- self.stats.enqueued += 1
-
- # This is an attempt to yield from the enqueueing thread (this
- # method's caller) to the update thread
-
- if self.updates.qsize() > 1000:
- sleep(0.1)
-
- def run(self):
- while True:
- if self.stop_requested:
- break
-
- try:
- update = self.updates.get(True, 1)
- except Empty:
- continue
-
- if self.stats:
- self.stats.dequeued += 1
-
- update.process(self.conn, self.stats)
-
-class UpdateStats(object):
- def __init__(self):
- self.enqueued = 0
- self.dequeued = 0
-
- self.updated = 0
- self.deleted = 0
- self.dropped = 0
-
- self.samples_updated = 0
- self.samples_expired = 0
- self.samples_dropped = 0
-
-class Update(object):
- def __init__(self, model):
- self.model = model
-
- def process(self, conn, stats):
- log.debug("Processing %s", self)
-
- try:
- self.do_process(conn, stats)
-
- conn.commit()
- except UpdateException, e:
- log.info("Update could not be completed; %s", e)
-
- conn.rollback()
- except:
- log.exception("Update failed")
-
- conn.rollback()
-
- if self.model.app.update_thread.halt_on_error:
- raise
-
- def do_process(self, conn, stats):
- raise Exception("Not implemented")
-
- def __repr__(self):
- return self.__class__.__name__
-
-class ObjectUpdate(Update):
- def __init__(self, model, agent, obj):
- super(ObjectUpdate, self).__init__(model)
-
- self.agent = agent
- self.object = obj
-
- def do_process(self, conn, stats):
- cls = self.get_class()
- obj = self.get_object(cls, self.object.getObjectId().objectName)
-
- columns = list()
-
- self.process_headers(obj, columns)
- self.process_properties(obj, columns)
-
- cursor = conn.cursor()
-
- try:
- obj.save(cursor, columns)
- finally:
- cursor.close()
-
- stats.updated += 1
-
- def get_class(self):
- class_key = self.object.getClassKey()
-
- name = class_key.getPackageName()
-
- try:
- pkg = self.model._packages_by_name[name]
- except KeyError:
- raise PackageUnknown(name)
-
- name = class_key.getClassName()
- name = name[0].upper() + name[1:] # /me shakes fist
-
- try:
- cls = pkg._classes_by_name[name]
- except KeyError:
- raise ClassUnknown(name)
-
- return cls
-
- def get_object(self, cls, object_id):
- try:
- return self.agent.objects_by_id[object_id]
- except KeyError:
- conn = self.model.app.database.get_connection()
- cursor = conn.cursor()
-
- obj = RosemaryObject(cls, None)
- obj._qmf_agent_id = self.agent.id
- obj._qmf_object_id = object_id
-
- try:
- try:
- cls.load_object_by_qmf_id(cursor, obj)
- except RosemaryNotFound:
- obj._id = cls.get_new_id(cursor)
- finally:
- cursor.close()
-
- self.agent.objects_by_id[object_id] = obj
-
- return obj
-
- def process_headers(self, obj, columns):
- table = obj._class.sql_table
-
- update_time, create_time, delete_time = self.object.getTimestamps()
-
- update_time = datetime.fromtimestamp(update_time / 1000000000)
- create_time = datetime.fromtimestamp(create_time / 1000000000)
-
- if delete_time:
- delete_time = datetime.fromtimestamp(delete_time / 1000000000)
-
- if obj._sync_time:
- # This object is already in the database
-
- obj._qmf_update_time = update_time
- columns.append(table._qmf_update_time)
-
- # XXX session_id may have changed too?
- else:
- obj._qmf_agent_id = self.agent.id
- obj._qmf_object_id = self.object.getObjectId().objectName
- obj._qmf_session_id = str(self.object.getObjectId().getSequence())
- obj._qmf_class_key = str(self.object.getClassKey())
- obj._qmf_update_time = update_time
- obj._qmf_create_time = create_time
-
- columns.append(table._id)
- columns.append(table._qmf_agent_id)
- columns.append(table._qmf_object_id)
- columns.append(table._qmf_session_id)
- columns.append(table._qmf_class_key)
- columns.append(table._qmf_update_time)
- columns.append(table._qmf_create_time)
-
- def process_properties(self, obj, columns):
- cls = obj._class
-
- for prop, value in self.object.getProperties():
- try:
- if prop.type == 10:
- col, nvalue = self.process_reference(cls, prop, value)
- else:
- col, nvalue = self.process_value(cls, prop, value)
- except MappingException, e:
- log.debug(e)
- continue
-
- # XXX This optimization will be obsolete when QMF does it
- # instead
-
- if nvalue == getattr(obj, col.name):
- continue
-
- setattr(obj, col.name, nvalue)
- columns.append(col)
-
- def process_reference(self, cls, prop, value):
- try:
- ref = cls._references_by_name[prop.name]
- except KeyError:
- raise MappingException("Reference %s is unknown" % prop.name)
-
- if not ref.sql_column:
- raise MappingException("Reference %s has no column" % ref.name)
-
- col = ref.sql_column
-
- if value:
- try:
- that_id = str(value.objectName)
- except:
- raise MappingException("XXX ref isn't an oid")
-
- that = self.get_object(ref.that_cls, that_id)
-
- if not that._sync_time:
- msg = "Referenced object %s hasn't appeared yet"
- raise MappingException(msg % that)
-
- value = that._id
-
- return col, value
-
- def process_value(self, cls, prop, value):
- try:
- col = cls._properties_by_name[prop.name].sql_column
- except KeyError:
- raise MappingException("Property %s is unknown" % prop)
-
- if value is not None:
- value = self.transform_value(prop, value)
-
- return col, value
-
- def transform_value(self, attr, value):
- if attr.type == 8: # absTime
- if value == 0:
- value = None
- else:
- value = datetime.fromtimestamp(value / 1000000000)
- # XXX value = TimestampFromTicks(value / 1000000000)
- elif attr.type == 15: # map
- value = pickle.dumps(value)
- elif attr.type == 10: # objId
- value = str(value)
- elif attr.type == 14: # uuid
- value = str(value)
-
- return value
-
- def __repr__(self):
- name = self.__class__.__name__
- cls = self.object.getClassKey().getClassName()
- id = self.object.getObjectId().objectName
-
- return "%s(%s,%s,%s)" % (name, self.agent.id, cls, id)
-
-class ObjectDelete(ObjectUpdate):
- def do_process(self, conn, stats):
- cls = self.get_class()
- obj = self.get_object(cls, self.object.getObjectId().objectName)
-
- cursor = conn.cursor()
-
- try:
- cls.sql_delete.execute(cursor, (), obj.__dict__)
- finally:
- cursor.close()
-
- try:
- del self.agent.objects_by_id[self.object.getObjectId().objectName]
- except KeyError:
- pass
-
- stats.deleted += 1
-
-class ObjectAddSample(ObjectUpdate):
- def do_process(self, conn, stats):
- cls = self.get_class()
- obj = self.get_object(cls, self.object.getObjectId().objectName)
-
- if not cls._statistics:
- stats.samples_dropped += 1; return
-
- if not obj._sync_time:
- stats.samples_dropped += 1; return
-
- if stats.enqueued - stats.dequeued > 100:
- if obj._qmf_update_time > datetime.now() - timedelta(seconds=60):
- stats.samples_dropped += 1; return
-
- update_time, create_time, delete_time = self.object.getTimestamps()
-
- update_time = datetime.fromtimestamp(update_time / 1000000000)
-
- update_columns = list()
- update_columns.append(cls.sql_table._qmf_update_time)
-
- insert_columns = list()
- insert_columns.append(cls.sql_samples_table._qmf_update_time)
-
- obj._qmf_update_time = update_time
-
- self.process_samples(obj, update_columns, insert_columns)
-
- cursor = conn.cursor()
-
- try:
- obj.save(cursor, update_columns)
-
- cls.sql_samples_insert.execute \
- (cursor, insert_columns, obj.__dict__)
- finally:
- cursor.close()
-
- stats.samples_updated += 1
-
- def process_samples(self, obj, update_columns, insert_columns):
- for stat, value in self.object.getStatistics():
- try:
- col = obj._class._statistics_by_name[stat.name].sql_column
- except KeyError:
- log.debug("Statistic %s is unknown", stat)
-
- continue
-
- if value is not None:
- value = self.transform_value(stat, value)
-
- # Don't write unchanged values
- #
- # XXX This optimization will be obsolete when QMF does it
- # instead
-
- if value != getattr(obj, col.name):
- update_columns.append(col)
-
- insert_columns.append(col)
-
- setattr(obj, col.name, value)
-
-class AgentDelete(Update):
- def __init__(self, model, agent):
- super(AgentDelete, self).__init__(model)
-
- self.agent = agent
-
- def do_process(self, conn, stats):
- print "Ahoy!"
-
- cursor = conn.cursor()
-
- id = self.agent.id
-
- try:
- for pkg in self.model._packages:
- for cls in pkg._classes:
- for obj in cls.get_selection(cursor, _qmf_agent_id=id):
- obj.delete(cursor)
- print "Bam!", obj
- finally:
- cursor.close()
-
-class UpdateException(Exception):
- def __init__(self, name):
- self.name = name
-
- def __str__(self):
- return "%s(%s)" % (self.__class__.__name__, self.name)
-
-class PackageUnknown(UpdateException):
- pass
-
-class ClassUnknown(UpdateException):
- pass
-
-class ObjectUnknown(UpdateException):
- pass
-
-class MappingException(Exception):
- pass
Copied: mgmt/newdata/mint/python/mint/update.py (from rev 3963, mgmt/newdata/mint/python/mint/newupdate.py)
===================================================================
--- mgmt/newdata/mint/python/mint/update.py (rev 0)
+++ mgmt/newdata/mint/python/mint/update.py 2010-05-12 21:50:17 UTC (rev 3966)
@@ -0,0 +1,403 @@
+import pickle
+
+from psycopg2 import IntegrityError, TimestampFromTicks
+from rosemary.model import *
+from util import *
+
+log = logging.getLogger("mint.update")
+
+class UpdateThread(MintDaemonThread):
+ def __init__(self, app):
+ super(UpdateThread, self).__init__(app)
+
+ self.conn = None
+ self.stats = None
+
+ self.updates = ConcurrentQueue()
+
+ self.halt_on_error = True
+
+ def init(self):
+ self.conn = self.app.database.get_connection()
+ self.stats = UpdateStats()
+
+ def enqueue(self, update):
+ update.thread = self
+
+ self.updates.put(update)
+
+ if self.stats:
+ self.stats.enqueued += 1
+
+ # This is an attempt to yield from the enqueueing thread (this
+ # method's caller) to the update thread
+
+ if self.updates.qsize() > 1000:
+ sleep(0.1)
+
+ def run(self):
+ while True:
+ if self.stop_requested:
+ break
+
+ try:
+ update = self.updates.get(True, 1)
+ except Empty:
+ continue
+
+ if self.stats:
+ self.stats.dequeued += 1
+
+ update.process(self.conn, self.stats)
+
+class UpdateStats(object):
+ def __init__(self):
+ self.enqueued = 0
+ self.dequeued = 0
+
+ self.updated = 0
+ self.deleted = 0
+ self.dropped = 0
+
+ self.samples_updated = 0
+ self.samples_expired = 0
+ self.samples_dropped = 0
+
+class Update(object):
+ def __init__(self, model):
+ self.model = model
+
+ def process(self, conn, stats):
+ log.debug("Processing %s", self)
+
+ try:
+ self.do_process(conn, stats)
+
+ conn.commit()
+ except UpdateException, e:
+ log.info("Update could not be completed; %s", e)
+
+ conn.rollback()
+ except:
+ log.exception("Update failed")
+
+ conn.rollback()
+
+ if self.model.app.update_thread.halt_on_error:
+ raise
+
+ def do_process(self, conn, stats):
+ raise Exception("Not implemented")
+
+ def __repr__(self):
+ return self.__class__.__name__
+
+class ObjectUpdate(Update):
+ def __init__(self, model, agent, obj):
+ super(ObjectUpdate, self).__init__(model)
+
+ self.agent = agent
+ self.object = obj
+
+ def do_process(self, conn, stats):
+ cls = self.get_class()
+ obj = self.get_object(cls, self.object.getObjectId().objectName)
+
+ columns = list()
+
+ self.process_headers(obj, columns)
+ self.process_properties(obj, columns)
+
+ cursor = conn.cursor()
+
+ try:
+ obj.save(cursor, columns)
+ finally:
+ cursor.close()
+
+ stats.updated += 1
+
+ def get_class(self):
+ class_key = self.object.getClassKey()
+
+ name = class_key.getPackageName()
+
+ try:
+ pkg = self.model._packages_by_name[name]
+ except KeyError:
+ raise PackageUnknown(name)
+
+ name = class_key.getClassName()
+ name = name[0].upper() + name[1:] # /me shakes fist
+
+ try:
+ cls = pkg._classes_by_name[name]
+ except KeyError:
+ raise ClassUnknown(name)
+
+ return cls
+
+ def get_object(self, cls, object_id):
+ try:
+ return self.agent.objects_by_id[object_id]
+ except KeyError:
+ conn = self.model.app.database.get_connection()
+ cursor = conn.cursor()
+
+ obj = RosemaryObject(cls, None)
+ obj._qmf_agent_id = self.agent.id
+ obj._qmf_object_id = object_id
+
+ try:
+ try:
+ cls.load_object_by_qmf_id(cursor, obj)
+ except RosemaryNotFound:
+ obj._id = cls.get_new_id(cursor)
+ finally:
+ cursor.close()
+
+ self.agent.objects_by_id[object_id] = obj
+
+ return obj
+
+ def process_headers(self, obj, columns):
+ table = obj._class.sql_table
+
+ update_time, create_time, delete_time = self.object.getTimestamps()
+
+ update_time = datetime.fromtimestamp(update_time / 1000000000)
+ create_time = datetime.fromtimestamp(create_time / 1000000000)
+
+ if delete_time:
+ delete_time = datetime.fromtimestamp(delete_time / 1000000000)
+
+ if obj._sync_time:
+ # This object is already in the database
+
+ obj._qmf_update_time = update_time
+ columns.append(table._qmf_update_time)
+
+ # XXX session_id may have changed too?
+ else:
+ obj._qmf_agent_id = self.agent.id
+ obj._qmf_object_id = self.object.getObjectId().objectName
+ obj._qmf_session_id = str(self.object.getObjectId().getSequence())
+ obj._qmf_class_key = str(self.object.getClassKey())
+ obj._qmf_update_time = update_time
+ obj._qmf_create_time = create_time
+
+ columns.append(table._id)
+ columns.append(table._qmf_agent_id)
+ columns.append(table._qmf_object_id)
+ columns.append(table._qmf_session_id)
+ columns.append(table._qmf_class_key)
+ columns.append(table._qmf_update_time)
+ columns.append(table._qmf_create_time)
+
+ def process_properties(self, obj, columns):
+ cls = obj._class
+
+ for prop, value in self.object.getProperties():
+ try:
+ if prop.type == 10:
+ col, nvalue = self.process_reference(cls, prop, value)
+ else:
+ col, nvalue = self.process_value(cls, prop, value)
+ except MappingException, e:
+ log.debug(e)
+ continue
+
+ # XXX This optimization will be obsolete when QMF does it
+ # instead
+
+ if nvalue == getattr(obj, col.name):
+ continue
+
+ setattr(obj, col.name, nvalue)
+ columns.append(col)
+
+ def process_reference(self, cls, prop, value):
+ try:
+ ref = cls._references_by_name[prop.name]
+ except KeyError:
+ raise MappingException("Reference %s is unknown" % prop.name)
+
+ if not ref.sql_column:
+ raise MappingException("Reference %s has no column" % ref.name)
+
+ col = ref.sql_column
+
+ if value:
+ try:
+ that_id = str(value.objectName)
+ except:
+ raise MappingException("XXX ref isn't an oid")
+
+ that = self.get_object(ref.that_cls, that_id)
+
+ if not that._sync_time:
+ msg = "Referenced object %s hasn't appeared yet"
+ raise MappingException(msg % that)
+
+ value = that._id
+
+ return col, value
+
+ def process_value(self, cls, prop, value):
+ try:
+ col = cls._properties_by_name[prop.name].sql_column
+ except KeyError:
+ raise MappingException("Property %s is unknown" % prop)
+
+ if value is not None:
+ value = self.transform_value(prop, value)
+
+ return col, value
+
+ def transform_value(self, attr, value):
+ if attr.type == 8: # absTime
+ if value == 0:
+ value = None
+ else:
+ value = datetime.fromtimestamp(value / 1000000000)
+ # XXX value = TimestampFromTicks(value / 1000000000)
+ elif attr.type == 15: # map
+ value = pickle.dumps(value)
+ elif attr.type == 10: # objId
+ value = str(value)
+ elif attr.type == 14: # uuid
+ value = str(value)
+
+ return value
+
+ def __repr__(self):
+ name = self.__class__.__name__
+ cls = self.object.getClassKey().getClassName()
+ id = self.object.getObjectId().objectName
+
+ return "%s(%s,%s,%s)" % (name, self.agent.id, cls, id)
+
+class ObjectDelete(ObjectUpdate):
+ def do_process(self, conn, stats):
+ cls = self.get_class()
+ obj = self.get_object(cls, self.object.getObjectId().objectName)
+
+ cursor = conn.cursor()
+
+ try:
+ cls.sql_delete.execute(cursor, (), obj.__dict__)
+ finally:
+ cursor.close()
+
+ try:
+ del self.agent.objects_by_id[self.object.getObjectId().objectName]
+ except KeyError:
+ pass
+
+ stats.deleted += 1
+
+class ObjectAddSample(ObjectUpdate):
+ def do_process(self, conn, stats):
+ cls = self.get_class()
+ obj = self.get_object(cls, self.object.getObjectId().objectName)
+
+ if not cls._statistics:
+ stats.samples_dropped += 1; return
+
+ if not obj._sync_time:
+ stats.samples_dropped += 1; return
+
+ if stats.enqueued - stats.dequeued > 100:
+ if obj._qmf_update_time > datetime.now() - timedelta(seconds=60):
+ stats.samples_dropped += 1; return
+
+ update_time, create_time, delete_time = self.object.getTimestamps()
+
+ update_time = datetime.fromtimestamp(update_time / 1000000000)
+
+ update_columns = list()
+ update_columns.append(cls.sql_table._qmf_update_time)
+
+ insert_columns = list()
+ insert_columns.append(cls.sql_samples_table._qmf_update_time)
+
+ obj._qmf_update_time = update_time
+
+ self.process_samples(obj, update_columns, insert_columns)
+
+ cursor = conn.cursor()
+
+ try:
+ obj.save(cursor, update_columns)
+
+ cls.sql_samples_insert.execute \
+ (cursor, insert_columns, obj.__dict__)
+ finally:
+ cursor.close()
+
+ stats.samples_updated += 1
+
+ def process_samples(self, obj, update_columns, insert_columns):
+ for stat, value in self.object.getStatistics():
+ try:
+ col = obj._class._statistics_by_name[stat.name].sql_column
+ except KeyError:
+ log.debug("Statistic %s is unknown", stat)
+
+ continue
+
+ if value is not None:
+ value = self.transform_value(stat, value)
+
+ # Don't write unchanged values
+ #
+ # XXX This optimization will be obsolete when QMF does it
+ # instead
+
+ if value != getattr(obj, col.name):
+ update_columns.append(col)
+
+ insert_columns.append(col)
+
+ setattr(obj, col.name, value)
+
+class AgentDelete(Update):
+ def __init__(self, model, agent):
+ super(AgentDelete, self).__init__(model)
+
+ self.agent = agent
+
+ def do_process(self, conn, stats):
+ print "Ahoy!"
+
+ cursor = conn.cursor()
+
+ id = self.agent.id
+
+ try:
+ for pkg in self.model._packages:
+ for cls in pkg._classes:
+ for obj in cls.get_selection(cursor, _qmf_agent_id=id):
+ obj.delete(cursor)
+ print "Bam!", obj
+ finally:
+ cursor.close()
+
+class UpdateException(Exception):
+ def __init__(self, name):
+ self.name = name
+
+ def __str__(self):
+ return "%s(%s)" % (self.__class__.__name__, self.name)
+
+class PackageUnknown(UpdateException):
+ pass
+
+class ClassUnknown(UpdateException):
+ pass
+
+class ObjectUnknown(UpdateException):
+ pass
+
+class MappingException(Exception):
+ pass
Modified: mgmt/newdata/mint/python/mint/vacuum.py
===================================================================
--- mgmt/newdata/mint/python/mint/vacuum.py 2010-05-12 19:43:32 UTC (rev 3965)
+++ mgmt/newdata/mint/python/mint/vacuum.py 2010-05-12 21:50:17 UTC (rev 3966)
@@ -1,4 +1,4 @@
-from newupdate import *
+from update import *
from util import *
log = logging.getLogger("mint.vacuum")