[rhmessaging-commits] rhmessaging commits: r4069 - in mgmt/newdata: rosemary/python/rosemary and 1 other directory.

rhmessaging-commits at lists.jboss.org rhmessaging-commits at lists.jboss.org
Fri Jul 2 15:58:53 EDT 2010


Author: justi9
Date: 2010-07-02 15:58:53 -0400 (Fri, 02 Jul 2010)
New Revision: 4069

Modified:
   mgmt/newdata/mint/python/mint/model.py
   mgmt/newdata/mint/python/mint/session.py
   mgmt/newdata/mint/python/mint/update.py
   mgmt/newdata/rosemary/python/rosemary/model.py
Log:
 * Remove _qmf_class_key from the schema; the new qmf call
   implementation doesn't need it

 * Handle deletes as a type of update

 * Reorganize object update into three distinct cases: create, update,
   and delete; this fell out from the data corruption fix

 * Be more aggressive about dropping too-old or too-recent samples

 * Use an exception to signal a dropped update; this avoids any extra
   communication with the database in this case

 * Fix get_object_by_qmf_id in rosemary, and use it in update.py

 * Remove sampled from the stats; samples are always accounted for in
   created or updated


Modified: mgmt/newdata/mint/python/mint/model.py
===================================================================
--- mgmt/newdata/mint/python/mint/model.py	2010-07-02 19:37:43 UTC (rev 4068)
+++ mgmt/newdata/mint/python/mint/model.py	2010-07-02 19:58:53 UTC (rev 4069)
@@ -80,27 +80,18 @@
 
         self.model = None
 
-    def get_object(self, cursor, cls, object_id):
+    def get_object_by_id(self, object_id):
         try:
             return self.objects_by_id[object_id]
         except KeyError:
-            obj = RosemaryObject(cls, None)
-            obj._qmf_agent_id = self.id
-            obj._qmf_object_id = object_id
+            pass
 
-            try:
-                cls.load_object_by_qmf_id(cursor, obj)
-            except RosemaryNotFound:
-                obj._id = cls.get_new_id(cursor)
-
-            return obj
-
     def add_object(self, obj):
         self.objects_by_id[obj._qmf_object_id] = obj
 
-    def delete_object(self, object_id):
+    def delete_object(self, obj):
         try:
-            del self.objects_by_id[object_id]
+            del self.objects_by_id[obj._qmf_object_id]
         except KeyError:
             pass
 

Modified: mgmt/newdata/mint/python/mint/session.py
===================================================================
--- mgmt/newdata/mint/python/mint/session.py	2010-07-02 19:37:43 UTC (rev 4068)
+++ mgmt/newdata/mint/python/mint/session.py	2010-07-02 19:58:53 UTC (rev 4069)
@@ -111,11 +111,7 @@
         if not self.model.app.update_thread.isAlive():
             return
 
-        if obj.getTimestamps()[2]:
-            up = ObjectDelete(self.model, agent, obj)
-        else:
-            up = ObjectUpdate(self.model, agent, obj)
-
+        up = ObjectUpdate(self.model, agent, obj)
         self.model.app.update_thread.enqueue(up)
 
     def objectStats(self, broker, obj):

Modified: mgmt/newdata/mint/python/mint/update.py
===================================================================
--- mgmt/newdata/mint/python/mint/update.py	2010-07-02 19:37:43 UTC (rev 4068)
+++ mgmt/newdata/mint/python/mint/update.py	2010-07-02 19:58:53 UTC (rev 4069)
@@ -51,10 +51,10 @@
 
 class UpdateStats(object):
     names = ("*Enqueued", "*Dequeued", "Depth", "*Created", "*Updated",
-             "*Sampled", "*Deleted", "*Dropped", "*Sql Ops",
-             "Errors", "Cpu (%)", "Mem (M)")
-    headings = ("%10s  " * 12) % names
-    values_fmt = "%10.1f  %10.1f  %10i  %10.1f  %10.1f  %10.1f  %10.1f  " + \
+             "*Deleted", "*Dropped", "*Sql Ops", "Errors", "Cpu (%)",
+             "Mem (M)")
+    headings = ("%10s  " * 11) % names
+    values_fmt = "%10.1f  %10.1f  %10i  %10.1f  %10.1f  %10.1f  " + \
         "%10.1f  %10.1f  %10i  %10i  %10.1f"
 
     then = None
@@ -66,7 +66,6 @@
 
         self.created = 0
         self.updated = 0
-        self.sampled = 0
         self.deleted = 0
         self.dropped = 0
 
@@ -112,7 +111,6 @@
                   self.now.dequeued - self.then.dequeued,
                   self.now.created - self.then.created,
                   self.now.updated - self.then.updated,
-                  self.now.sampled - self.then.sampled,
                   self.now.deleted - self.then.deleted,
                   self.now.dropped - self.then.dropped,
                   self.now.sql_ops - self.then.sql_ops]
@@ -145,8 +143,10 @@
             self.do_process(thread.cursor, thread.stats)
 
             thread.conn.commit()
+        except UpdateDropped:
+            thread.stats.dropped += 1
         except UpdateException, e:
-            log.info("Update could not be completed; %s", e)
+            log.exception("Update could not be completed")
 
             thread.conn.rollback()
         except:
@@ -168,171 +168,180 @@
         return self.__class__.__name__
 
 class ObjectUpdate(Update):
-    def __init__(self, model, agent, obj):
+    def __init__(self, model, agent, object):
         super(ObjectUpdate, self).__init__(model)
 
         self.agent = agent
-        self.object = obj
+        self.object = object
 
     def do_process(self, cursor, stats):
-        cls, obj = self.get_object(cursor)
+        cls = self.get_class()
+        obj_id = self.get_object_id()
+        obj = self.agent.get_object_by_id(obj_id)
 
+        if not obj:
+            try:
+                obj = cls.get_object_by_qmf_id(cursor, self.agent.id, obj_id)
+            except RosemaryNotFound:
+                pass
+
         update_time, create_time, delete_time = self.object.getTimestamps()
-        update_time = datetime.fromtimestamp(update_time / 1000000000)
 
-        now = datetime.now()
+        if obj:
+            if delete_time != 0:
+                self.delete_object(cursor, stats, obj)
+                return
 
-        if self.object.getStatistics() and not self.object.getProperties():
-            # Just stats; do we want it?
+            if not self.object.getProperties() and self.object.getStatistics():
+                # Just stats; do we want it?
+                # if stats.enqueued - stats.dequeued > 500:
 
-            if not obj._sync_time:
-                # We don't have this object yet
-                stats.dropped += 1; return
+                now = datetime.now()
+                update = datetime.fromtimestamp(update_time / 1000000000)
+                sample = obj._sample_time
 
-            if stats.enqueued - stats.dequeued > 500:
-                if update_time < now - minutes_ago:
+                if update < now - minutes_ago:
                     # The sample is too old
-                    stats.dropped += 1; return
+                    raise UpdateDropped()
 
-                if obj._sample_time and obj._sample_time > now - seconds_ago:
+                if sample and sample > now - seconds_ago:
                     # The samples are too fidelitous
-                    stats.dropped += 1; return
+                    raise UpdateDropped()
 
-        obj._qmf_update_time = update_time
+            self.update_object(cursor, stats, obj)
+        else:
+            if not self.object.getProperties():
+                raise UpdateDropped()
 
-        object_columns = list()
-        sample_columns = list()
+            if delete_time != 0:
+                raise UpdateDropped()
 
-        self.process_headers(obj, object_columns)
-        self.process_properties(obj, object_columns, cursor)
-        self.process_statistics(obj, object_columns, sample_columns)
+            obj = self.create_object(cursor, stats, cls)
 
-        statements = list()
+        assert obj
 
-        if object_columns:
-            object_columns.append(cls.sql_table._qmf_update_time)
+        self.agent.add_object(obj)
 
-            if obj._sync_time:
-                sql = cls.sql_update.emit(object_columns)
-                stats.updated += 1
+    def get_class(self):
+        class_key = self.object.getClassKey()
+        name = class_key.getPackageName()
 
-                self.model.print_event(4, "Updating %s", obj)
-            else:
-                sql = cls.sql_insert.emit(object_columns)
-                stats.created += 1
+        try:
+            pkg = self.model._packages_by_name[name]
+        except KeyError:
+            raise PackageUnknown(name)
 
-                message = "Creating %s, from %s"
-                self.model.print_event(3, message, obj, self.agent)
+        name = class_key.getClassName()
 
-            statements.append(sql)
+        try:
+            cls = pkg._classes_by_lowercase_name[name.lower()]
+        except KeyError:
+            raise ClassUnknown(name)
+        
+        return cls
 
-        if sample_columns:
-            sample_columns.append(cls.sql_samples_table._qmf_update_time)
+    def get_object_id(self):
+        return self.object.getObjectId().objectName
 
-            sql = cls.sql_samples_insert.emit(sample_columns)
-            statements.append(sql)
+    def create_object(self, cursor, stats, cls):
+        update_time, create_time, delete_time = self.object.getTimestamps()
+        create_time = datetime.fromtimestamp(create_time / 1000000000)
+        update_time = datetime.fromtimestamp(update_time / 1000000000)
 
-            stats.sampled += 1
-            obj._sample_time = update_time
+        obj = cls.create_object(cursor)
+        obj._qmf_agent_id = self.agent.id
+        obj._qmf_object_id = self.get_object_id()
+        obj._qmf_session_id = str(self.object.getObjectId().getSequence())
+        obj._qmf_create_time = create_time
+        obj._qmf_update_time = update_time
+            
+        object_columns = list()
+        sample_columns = list()
 
-        if statements:
-            text = "; ".join(statements)
+        table = cls.sql_table
 
-            try:
-                cursor.execute(text, obj.__dict__)
-            except:
-                log.exception("%s failed", self)
+        object_columns.append(table._id)
+        object_columns.append(table._qmf_agent_id)
+        object_columns.append(table._qmf_object_id)
+        object_columns.append(table._qmf_session_id)
+        object_columns.append(table._qmf_create_time)
+        object_columns.append(table._qmf_update_time)
 
-                log.error("Sql text: %s", text)
-                log.error("Sql values:")
+        self.process_properties(obj, object_columns, cursor)
+        self.process_statistics(obj, object_columns, sample_columns)
 
-                for item in sorted(obj.__dict__.items()):
-                    log.error("    %-34s  %r", *item)
+        statements = list()
 
-                log.error("Sql object columns:")
+        sql = cls.sql_insert.emit(object_columns)
+        statements.append(sql)
 
-                for item in sorted(object_columns):
-                    log.error("    %-34s", item)
+        if sample_columns:
+            sample_columns.append(cls.sql_samples_table._qmf_update_time)
 
-                log.error("Sql sample columns:")
+            sql = cls.sql_samples_insert.emit(sample_columns)
+            statements.append(sql)
 
-                for item in sorted(sample_columns):
-                    log.error("    %-34s", item)
+            obj._sample_time = datetime.now()
 
-                log.error("Sql row count: %i", cursor.rowcount)
+        sql = "; ".join(statements)
+        self.execute_sql(cursor, sql, obj.__dict__)
 
-                log.error("Qmf properties:")
+        obj._sync_time = datetime.now()
 
-                for item in sorted(self.object.getProperties()):
-                    log.error("    %-34s  %r", *item)
+        self.model.print_event(3, "Created %s", obj)
+        stats.created += 1
 
-                log.error("Qmf statistics:")
+        return obj
 
-                for item in sorted(self.object.getStatistics()):
-                    log.error("    %-34s  %r", *item)
+    def update_object(self, cursor, stats, obj):
+        update_time, create_time, delete_time = self.object.getTimestamps()
+        update_time = datetime.fromtimestamp(update_time / 1000000000)
 
-                raise
+        obj._qmf_update_time = update_time
 
-            obj._sync_time = now
+        object_columns = list()
+        sample_columns = list()
 
-            self.add_object(obj)
-        else:
-            stats.dropped += 1
+        cls = obj._class
 
-    def get_object(self, cursor):
-        class_key = self.object.getClassKey()
+        self.process_properties(obj, object_columns, cursor)
+        self.process_statistics(obj, object_columns, sample_columns)
 
-        name = class_key.getPackageName()
+        statements = list()
 
-        try:
-            pkg = self.model._packages_by_name[name]
-        except KeyError:
-            raise PackageUnknown(name)
+        if object_columns:
+            object_columns.append(cls.sql_table._qmf_update_time)
 
-        name = class_key.getClassName()
+            sql = cls.sql_update.emit(object_columns)
+            statements.append(sql)
 
-        try:
-            cls = pkg._classes_by_lowercase_name[name.lower()]
-        except KeyError:
-            raise ClassUnknown(name)
+        if sample_columns:
+            sample_columns.append(cls.sql_samples_table._qmf_update_time)
 
-        object_id = self.object.getObjectId().objectName
+            sql = cls.sql_samples_insert.emit(sample_columns)
+            statements.append(sql)
 
-        obj = self.agent.get_object(cursor, cls, object_id)
-        
-        return cls, obj
+            obj._sample_time = datetime.now()
 
-    def add_object(self, obj):
-        self.agent.add_object(obj)
+        if not statements:
+            raise UpdateDropped()
 
-    def process_headers(self, obj, columns):
-        table = obj._class.sql_table
+        sql = "; ".join(statements)
+        self.execute_sql(cursor, sql, obj.__dict__)
 
-        update_time, create_time, delete_time = self.object.getTimestamps()
-        create_time = datetime.fromtimestamp(create_time / 1000000000)
+        obj._sync_time = datetime.now()
 
-        if delete_time:
-            delete_time = datetime.fromtimestamp(delete_time / 1000000000)
+        self.model.print_event(4, "Updated %s", obj)
+        stats.updated += 1
 
-            obj._qmf_delete_time = delete_time
-            columns.append(table._qmf_delete_time)
+    def delete_object(self, cursor, stats, obj):
+        obj.delete(cursor)
 
-        if not obj._sync_time:
-            # The object hasn't been written to the database yet
+        self.agent.delete_object(obj)
 
-            obj._qmf_agent_id = self.agent.id
-            obj._qmf_object_id = self.object.getObjectId().objectName
-            obj._qmf_session_id = str(self.object.getObjectId().getSequence())
-            obj._qmf_class_key = str(self.object.getClassKey())
-            obj._qmf_create_time = create_time
+        self.model.print_event(3, "Deleted %s", obj)
+        stats.deleted += 1
 
-            columns.append(table._id)
-            columns.append(table._qmf_agent_id)
-            columns.append(table._qmf_object_id)
-            columns.append(table._qmf_session_id)
-            columns.append(table._qmf_class_key)
-            columns.append(table._qmf_create_time)
-
     def process_properties(self, obj, columns, cursor):
         cls = obj._class
 
@@ -373,11 +382,16 @@
             except:
                 raise MappingException("Reference isn't an oid")
 
-            that = self.agent.get_object(cursor, ref.that_cls, that_id)
+            # XXX obviously won't work across agents
+            that = self.agent.get_object_by_id(that_id)
 
-            if not that._sync_time:
-                msg = "Referenced object %s hasn't appeared yet"
-                raise MappingException(msg % that)
+            if not that:
+                try:
+                    that = ref.that_cls.get_object_by_qmf_id \
+                        (cursor, self.agent.id, that_id)
+                except RosemaryNotFound:
+                    msg = "Referenced object %s hasn't appeared yet"
+                    raise MappingException(msg % that)
 
             value = that._id
 
@@ -422,6 +436,32 @@
 
             setattr(obj, col.name, value)
 
+    def execute_sql(self, cursor, text, args):
+        try:
+            cursor.execute(text, args)
+        except:
+            log.exception("%s failed", self)
+
+            log.error("Sql text: %s", text)
+            log.error("Sql values:")
+
+            for item in sorted(args.items()):
+                log.error("    %-34s  %r", *item)
+
+            log.error("Sql row count: %i", cursor.rowcount)
+
+            log.error("Qmf properties:")
+
+            for item in sorted(self.object.getProperties()):
+                log.error("    %-34s  %r", *item)
+
+            log.error("Qmf statistics:")
+
+            for item in sorted(self.object.getStatistics()):
+                log.error("    %-34s  %r", *item)
+
+            raise
+
     def __repr__(self):
         name = self.__class__.__name__
         cls = self.object.getClassKey().getClassName()
@@ -429,18 +469,6 @@
 
         return "%s(%s,%s,%s)" % (name, self.agent.id, cls, id)
 
-class ObjectDelete(ObjectUpdate):
-    def do_process(self, cursor, stats):
-        cls, obj = self.get_object(cursor)
-
-        self.model.print_event(3, "Deleting %s, from %s", obj, self.agent)
-
-        obj.delete(cursor)
-
-        self.agent.delete_object(obj._qmf_object_id)
-
-        stats.deleted += 1
-
 class AgentDelete(Update):
     def __init__(self, model, agent):
         super(AgentDelete, self).__init__(model)
@@ -460,6 +488,9 @@
 
                     stats.deleted += 1
 
+class UpdateDropped(Exception):
+    pass
+
 class UpdateException(Exception):
     def __init__(self, name):
         self.name = name

Modified: mgmt/newdata/rosemary/python/rosemary/model.py
===================================================================
--- mgmt/newdata/rosemary/python/rosemary/model.py	2010-07-02 19:37:43 UTC (rev 4068)
+++ mgmt/newdata/rosemary/python/rosemary/model.py	2010-07-02 19:58:53 UTC (rev 4069)
@@ -213,10 +213,6 @@
         self._qmf_session_id = RosemaryHeader(self, name, sql_text)
         self._qmf_session_id.title = "Session ID"
 
-        name = "_qmf_class_key"
-        self._qmf_class_key = RosemaryHeader(self, name, sql_text)
-        self._qmf_class_key.title = "Class Key"
-
         name = "_qmf_create_time"
         self._qmf_create_time = RosemaryHeader(self, name, sql_timestamp)
         self._qmf_create_time.title = "Create Time"
@@ -377,13 +373,11 @@
         return obj
 
     def get_object_by_qmf_id(self, cursor, agent_id, object_id):
-        assert isinstance(obj, RosemaryObject)
-
         obj = RosemaryObject(self, None)
         obj._qmf_agent_id = agent_id
         obj._qmf_object_id = object_id
 
-        self.load_object_qmf_id(cursor, obj)
+        self.load_object_by_qmf_id(cursor, obj)
 
         return obj
 
@@ -692,7 +686,6 @@
         self._qmf_object_id = str(datetime.now())
         self._qmf_create_time = datetime.now()
         self._qmf_update_time = datetime.now()
-        self._qmf_class_key = "__rosemary__"
 
     def get_title(self):
         if self._class._object_title:



More information about the rhmessaging-commits mailing list