[rhmessaging-commits] rhmessaging commits: r4047 - mgmt/newdata/mint/python/mint.

rhmessaging-commits at lists.jboss.org rhmessaging-commits at lists.jboss.org
Thu Jun 24 09:34:15 EDT 2010


Author: justi9
Date: 2010-06-24 09:34:15 -0400 (Thu, 24 Jun 2010)
New Revision: 4047

Modified:
   mgmt/newdata/mint/python/mint/model.py
   mgmt/newdata/mint/python/mint/session.py
   mgmt/newdata/mint/python/mint/update.py
Log:
 * Move get_object to MintAgent; add a delete_object there

 * Set a max queue size for the update thread; this obviates the yield
   logic that was present before

 * Use just one cursor, and pipe it through to functions where needed

 * Use the incoming qmf update time to reckon whether a sample is too
   fidelitous

 * A more performant method for dispatching to value transform
   functions

 * Move model agent delete to after the AgentDelete update is
   completed


Modified: mgmt/newdata/mint/python/mint/model.py
===================================================================
--- mgmt/newdata/mint/python/mint/model.py	2010-06-21 21:51:13 UTC (rev 4046)
+++ mgmt/newdata/mint/python/mint/model.py	2010-06-24 13:34:15 UTC (rev 4047)
@@ -78,5 +78,28 @@
 
         self.model = None
 
+    def get_object(self, cursor, cls, object_id):
+        try:
+            return self.objects_by_id[object_id]
+        except KeyError:
+            obj = RosemaryObject(cls, None)
+            obj._qmf_agent_id = self.id
+            obj._qmf_object_id = object_id
+
+            try:
+                cls.load_object_by_qmf_id(cursor, obj)
+            except RosemaryNotFound:
+                obj._id = cls.get_new_id(cursor)
+
+            self.objects_by_id[object_id] = obj
+
+            return obj
+
+    def delete_object(self, object_id):
+        try:
+            del self.objects_by_id[object_id]
+        except KeyError:
+            pass
+
     def __repr__(self):
         return "%s(%s)" % (self.__class__.__name__, self.id)

Modified: mgmt/newdata/mint/python/mint/session.py
===================================================================
--- mgmt/newdata/mint/python/mint/session.py	2010-06-21 21:51:13 UTC (rev 4046)
+++ mgmt/newdata/mint/python/mint/session.py	2010-06-24 13:34:15 UTC (rev 4047)
@@ -78,8 +78,6 @@
         except KeyError:
             return
 
-        agent.delete()
-
         if not self.model.app.update_thread.isAlive():
             return
 

Modified: mgmt/newdata/mint/python/mint/update.py
===================================================================
--- mgmt/newdata/mint/python/mint/update.py	2010-06-21 21:51:13 UTC (rev 4046)
+++ mgmt/newdata/mint/python/mint/update.py	2010-06-24 13:34:15 UTC (rev 4047)
@@ -16,35 +16,25 @@
     def __init__(self, app):
         super(UpdateThread, self).__init__(app)
 
-        self.updates = ConcurrentQueue()
+        self.updates = ConcurrentQueue(maxsize=1000)
         self.stats = UpdateStats(self.app)
 
         self.conn = None
-        self.read_cursor = None
-        self.write_cursor = None
+        self.cursor = None
 
         self.halt_on_error = False
 
     def init(self):
         self.conn = self.app.database.get_connection()
 
-        self.read_cursor = self.conn.cursor(cursor_factory=UpdateCursor)
-        self.write_cursor = self.conn.cursor(cursor_factory=UpdateCursor)
+        self.cursor = self.conn.cursor(cursor_factory=UpdateCursor)
+        self.cursor.stats = self.stats
 
-        self.read_cursor.stats = self.stats
-        self.write_cursor.stats = self.stats
-
     def enqueue(self, update):
         self.updates.put(update)
 
         self.stats.enqueued += 1
 
-        # This is an attempt to yield from the enqueueing thread (this
-        # method's caller) to the update thread
-
-        if self.updates.qsize() > 1000:
-            sleep(0)
-
     def run(self):
         while True:
             if self.stop_requested:
@@ -152,7 +142,7 @@
         log.debug("Processing %s", self)
 
         try:
-            self.do_process(thread.write_cursor, thread.stats)
+            self.do_process(thread.cursor, thread.stats)
 
             thread.conn.commit()
         except UpdateException, e:
@@ -185,8 +175,7 @@
         self.object = obj
 
     def do_process(self, cursor, stats):
-        cls = self.get_class()
-        obj = self.get_object(cls, self.object.getObjectId().objectName)
+        cls, obj = self.get_object(cursor)
 
         update_time, create_time, delete_time = self.object.getTimestamps()
         update_time = datetime.fromtimestamp(update_time / 1000000000)
@@ -200,7 +189,7 @@
                 # We don't have this object yet
                 stats.dropped += 1; return
 
-            if stats.enqueued - stats.dequeued > 1000:
+            if stats.enqueued - stats.dequeued > 500:
                 if update_time < now - minutes_ago:
                     # The sample is too old
                     stats.dropped += 1; return
@@ -215,7 +204,7 @@
         sample_columns = list()
 
         self.process_headers(obj, object_columns)
-        self.process_properties(obj, object_columns)
+        self.process_properties(obj, object_columns, cursor)
         self.process_statistics(obj, object_columns, sample_columns)
 
         statements = list()
@@ -246,7 +235,7 @@
             statements.append(sql)
 
             stats.sampled += 1
-            obj._sample_time = now
+            obj._sample_time = update_time
 
         if statements:
             text = "; ".join(statements)
@@ -268,7 +257,7 @@
         else:
             stats.dropped += 1
 
-    def get_class(self):
+    def get_object(self, cursor):
         class_key = self.object.getClassKey()
 
         name = class_key.getPackageName()
@@ -284,31 +273,13 @@
             cls = pkg._classes_by_lowercase_name[name.lower()]
         except KeyError:
             raise ClassUnknown(name)
-        
-        return cls
 
-    def get_object(self, cls, object_id):
-        try:
-            return self.agent.objects_by_id[object_id]
-        except KeyError:
-            cursor = self.model.app.update_thread.read_cursor
+        object_id = self.object.getObjectId().objectName
 
-            obj = RosemaryObject(cls, None)
-            obj._qmf_agent_id = self.agent.id
-            obj._qmf_object_id = object_id
+        obj = self.agent.get_object(cursor, cls, object_id)
+        
+        return cls, obj
 
-            #try:
-            try:
-                cls.load_object_by_qmf_id(cursor, obj)
-            except RosemaryNotFound:
-                obj._id = cls.get_new_id(cursor)
-            #finally:
-            #    cursor.close()
-
-            self.agent.objects_by_id[object_id] = obj
-
-            return obj
-
     def process_headers(self, obj, columns):
         table = obj._class.sql_table
 
@@ -337,13 +308,14 @@
             columns.append(table._qmf_class_key)
             columns.append(table._qmf_create_time)
 
-    def process_properties(self, obj, columns):
+    def process_properties(self, obj, columns, cursor):
         cls = obj._class
 
         for prop, value in self.object.getProperties():
             try:
                 if prop.type == 10:
-                    col, nvalue = self.process_reference(cls, prop, value)
+                    col, nvalue = self.process_reference \
+                        (cls, prop, value, cursor)
                 else:
                     col, nvalue = self.process_value(cls, prop, value)
             except MappingException, e:
@@ -359,7 +331,7 @@
             setattr(obj, col.name, nvalue)
             columns.append(col)
 
-    def process_reference(self, cls, prop, value):
+    def process_reference(self, cls, prop, value, cursor):
         try:
             ref = cls._references_by_name[prop.name]
         except KeyError:
@@ -374,9 +346,9 @@
             try:
                 that_id = str(value.objectName)
             except:
-                raise MappingException("XXX ref isn't an oid")
+                raise MappingException("Reference isn't an oid")
 
-            that = self.get_object(ref.that_cls, that_id)
+            that = self.agent.get_object(cursor, ref.that_cls, that_id)
 
             if not that._sync_time:
                 msg = "Referenced object %s hasn't appeared yet"
@@ -393,26 +365,10 @@
             raise MappingException("Property %s is unknown" % prop)
 
         if value is not None:
-            value = self.transform_value(prop, value)
+            value = transform_value(prop, value)
 
         return col, value
 
-    def transform_value(self, attr, value):
-        if attr.type == 8: # absTime
-            if value == 0:
-                value = None
-            else:
-                value = datetime.fromtimestamp(value / 1000000000)
-                # XXX value = TimestampFromTicks(value / 1000000000)
-        elif attr.type == 15: # map
-            value = pickle.dumps(value)
-        elif attr.type == 10: # objId
-            value = str(value)
-        elif attr.type == 14: # uuid
-            value = str(value)
-
-        return value
-
     def process_statistics(self, obj, update_columns, insert_columns):
         for stat, value in self.object.getStatistics():
             try:
@@ -423,7 +379,7 @@
                 continue
 
             if value is not None:
-                value = self.transform_value(stat, value)
+                value = transform_value(stat, value)
 
             # XXX hack workaround
             if col.name == "MonitorSelfTime":
@@ -450,17 +406,13 @@
 
 class ObjectDelete(ObjectUpdate):
     def do_process(self, cursor, stats):
-        cls = self.get_class()
-        obj = self.get_object(cls, self.object.getObjectId().objectName)
+        cls, obj = self.get_object(cursor)
 
         self.model.print_event(3, "Deleting %s, from %s", obj, self.agent)
 
         obj.delete(cursor)
 
-        try:
-            del self.agent.objects_by_id[self.object.getObjectId().objectName]
-        except KeyError:
-            pass
+        self.agent.delete_object(obj._qmf_object_id)
 
         stats.deleted += 1
 
@@ -483,6 +435,8 @@
 
                     stats.deleted += 1
 
+        self.agent.delete()
+
 class UpdateException(Exception):
     def __init__(self, name):
         self.name = name
@@ -501,3 +455,23 @@
 
 class MappingException(Exception):
     pass
+
+def transform_default(value):
+    return value
+
+def transform_timestamp(value):
+    if value != 0:
+        return datetime.fromtimestamp(value / 1000000000)
+
+def transform_pickle(value):
+    return pickle.dumps(x)
+
+transformers = list([transform_default for x in range(32)])
+
+transformers[8] = transform_timestamp
+transformers[10] = str
+transformers[14] = str
+transformers[15] = transform_pickle
+
+def transform_value(attr, value):
+    return transformers[attr.type](value)



More information about the rhmessaging-commits mailing list