[rhmessaging-commits] rhmessaging commits: r4154 - mgmt/newdata/mint/python/mint.

rhmessaging-commits at lists.jboss.org rhmessaging-commits at lists.jboss.org
Tue Jul 27 16:31:59 EDT 2010


Author: justi9
Date: 2010-07-27 16:31:59 -0400 (Tue, 27 Jul 2010)
New Revision: 4154

Modified:
   mgmt/newdata/mint/python/mint/model.py
   mgmt/newdata/mint/python/mint/session.py
   mgmt/newdata/mint/python/mint/update.py
Log:
 * Add delete policies based on new agent state

 * Change all agent callbacks into queued agent updates; this fixes
   some data errors that arose from ordering problems


Modified: mgmt/newdata/mint/python/mint/model.py
===================================================================
--- mgmt/newdata/mint/python/mint/model.py	2010-07-27 19:11:04 UTC (rev 4153)
+++ mgmt/newdata/mint/python/mint/model.py	2010-07-27 20:31:59 UTC (rev 4154)
@@ -1,6 +1,5 @@
 from rosemary.model import *
 
-from update import *
 from util import *
 
 log = logging.getLogger("mint.model")

Modified: mgmt/newdata/mint/python/mint/session.py
===================================================================
--- mgmt/newdata/mint/python/mint/session.py	2010-07-27 19:11:04 UTC (rev 4153)
+++ mgmt/newdata/mint/python/mint/session.py	2010-07-27 20:31:59 UTC (rev 4154)
@@ -1,4 +1,4 @@
-from model import *
+from update import *
 from util import *
 
 from qmf.console import Console, Session
@@ -73,36 +73,27 @@
     def newAgent(self, qmf_agent):
         self.model.print_event(3, "Creating %s", qmf_agent)
 
-        MintAgent(self.model, qmf_agent)
+        up = AgentUpdate(self.model, qmf_agent)
+        self.model.app.update_thread.enqueue(up)
 
     def delAgent(self, qmf_agent):
         self.model.print_event(3, "Deleting %s", qmf_agent)
 
-        try:
-            agent = self.model.get_agent(qmf_agent)
-        except KeyError:
-            return
-
-        agent.delete()
-
         if not self.model.app.update_thread.isAlive():
             return
 
-        up = AgentDelete(self.model, agent)
+        up = AgentDelete(self.model, qmf_agent)
         self.model.app.update_thread.enqueue(up)
 
     def heartbeat(self, qmf_agent, timestamp):
         message = "Heartbeat from %s at %s"
         self.model.print_event(5, message, qmf_agent, timestamp)
 
-        timestamp = timestamp / 1000000000
-
-        try:
-            agent = self.model.get_agent(qmf_agent)
-        except KeyError:
+        if not self.model.app.update_thread.isAlive():
             return
 
-        agent.last_heartbeat = datetime.fromtimestamp(timestamp)
+        up = AgentUpdate(self.model, qmf_agent)
+        self.model.app.update_thread.enqueue(up)
 
     def newPackage(self, name):
         self.model.print_event(2, "New package %s", name)
@@ -110,22 +101,18 @@
     def newClass(self, kind, classKey):
         self.model.print_event(2, "New class %s", classKey)
 
-    def objectProps(self, broker, obj):
-        agent = self.model.get_agent(obj.getAgent())
-
+    def objectProps(self, broker, qmf_object):
         if not self.model.app.update_thread.isAlive():
             return
 
-        up = ObjectUpdate(self.model, agent, obj)
+        up = ObjectUpdate(self.model, qmf_object)
         self.model.app.update_thread.enqueue(up)
 
-    def objectStats(self, broker, obj):
-        agent = self.model.get_agent(obj.getAgent())
-
+    def objectStats(self, broker, qmf_object):
         if not self.model.app.update_thread.isAlive():
             return
 
-        up = ObjectUpdate(self.model, agent, obj)
+        up = ObjectUpdate(self.model, qmf_object)
         self.model.app.update_thread.enqueue(up)
 
     def event(self, broker, event):

Modified: mgmt/newdata/mint/python/mint/update.py
===================================================================
--- mgmt/newdata/mint/python/mint/update.py	2010-07-27 19:11:04 UTC (rev 4153)
+++ mgmt/newdata/mint/python/mint/update.py	2010-07-27 20:31:59 UTC (rev 4154)
@@ -5,6 +5,8 @@
 from psycopg2 import IntegrityError, TimestampFromTicks
 from psycopg2.extensions import cursor as Cursor
 from rosemary.model import *
+
+from model import *
 from util import *
 
 log = logging.getLogger("mint.update")
@@ -156,11 +158,11 @@
 
             thread.stats.errors += 1
 
+            #print_exc()
+
             if thread.halt_on_error:
                 raise
 
-            #print_exc()
-
     def do_process(self, cursor, stats):
         raise Exception("Not implemented")
 
@@ -168,13 +170,18 @@
         return self.__class__.__name__
 
 class ObjectUpdate(Update):
-    def __init__(self, model, agent, object):
+    def __init__(self, model, qmf_object):
         super(ObjectUpdate, self).__init__(model)
 
-        self.agent = agent
-        self.object = object
+        self.qmf_object = qmf_object
+        self.agent = None
 
     def do_process(self, cursor, stats):
+        try:
+            self.agent = self.model.get_agent(self.qmf_object.getAgent())
+        except KeyError:
+            raise UpdateDropped()
+        
         cls = self.get_class()
         obj_id = self.get_object_id()
         obj = self.agent.get_object_by_id(obj_id)
@@ -185,14 +192,17 @@
             except RosemaryNotFound:
                 pass
 
-        update_time, create_time, delete_time = self.object.getTimestamps()
+        update_time, create_time, delete_time = self.qmf_object.getTimestamps()
 
         if obj:
             if delete_time != 0:
                 self.delete_object(cursor, stats, obj)
                 return
 
-            if not self.object.getProperties() and self.object.getStatistics():
+            properties = self.qmf_object.getProperties()
+            statistics = self.qmf_object.getStatistics()
+
+            if not properties and statistics:
                 # Just stats; do we want it?
                 # if stats.enqueued - stats.dequeued > 500:
 
@@ -210,7 +220,7 @@
 
             self.update_object(cursor, stats, obj)
         else:
-            if not self.object.getProperties():
+            if not self.qmf_object.getProperties():
                 raise UpdateDropped()
 
             if delete_time != 0:
@@ -223,7 +233,7 @@
         self.agent.add_object(obj)
 
     def get_class(self):
-        class_key = self.object.getClassKey()
+        class_key = self.qmf_object.getClassKey()
         name = class_key.getPackageName()
 
         try:
@@ -241,17 +251,17 @@
         return cls
 
     def get_object_id(self):
-        return self.object.getObjectId().objectName
+        return self.qmf_object.getObjectId().objectName
 
     def create_object(self, cursor, stats, cls):
-        update_time, create_time, delete_time = self.object.getTimestamps()
+        update_time, create_time, delete_time = self.qmf_object.getTimestamps()
         create_time = datetime.fromtimestamp(create_time / 1000000000)
         update_time = datetime.fromtimestamp(update_time / 1000000000)
 
         obj = cls.create_object(cursor)
         obj._qmf_agent_id = self.agent.id
         obj._qmf_object_id = self.get_object_id()
-        obj._qmf_session_id = str(self.object.getObjectId().getSequence())
+        obj._qmf_session_id = str(self.qmf_object.getObjectId().getSequence())
         obj._qmf_create_time = create_time
         obj._qmf_update_time = update_time
             
@@ -294,7 +304,7 @@
         return obj
 
     def update_object(self, cursor, stats, obj):
-        update_time, create_time, delete_time = self.object.getTimestamps()
+        update_time, create_time, delete_time = self.qmf_object.getTimestamps()
         update_time = datetime.fromtimestamp(update_time / 1000000000)
 
         obj._qmf_update_time = update_time
@@ -345,7 +355,7 @@
     def process_properties(self, obj, columns, cursor):
         cls = obj._class
 
-        for prop, value in self.object.getProperties():
+        for prop, value in self.qmf_object.getProperties():
             try:
                 if prop.type == 10:
                     col, nvalue = self.process_reference \
@@ -409,7 +419,7 @@
         return col, value
 
     def process_statistics(self, obj, update_columns, insert_columns):
-        for stat, value in self.object.getStatistics():
+        for stat, value in self.qmf_object.getStatistics():
             try:
                 col = obj._class._statistics_by_name[stat.name].sql_column
             except KeyError:
@@ -452,43 +462,71 @@
 
             log.error("Qmf properties:")
 
-            for item in sorted(self.object.getProperties()):
+            for item in sorted(self.qmf_object.getProperties()):
                 log.error("    %-34s  %r", *item)
 
             log.error("Qmf statistics:")
 
-            for item in sorted(self.object.getStatistics()):
+            for item in sorted(self.qmf_object.getStatistics()):
                 log.error("    %-34s  %r", *item)
 
             raise
 
     def __repr__(self):
         name = self.__class__.__name__
-        cls = self.object.getClassKey().getClassName()
-        id = self.object.getObjectId().objectName
+        cls = self.qmf_object.getClassKey().getClassName()
+        id = self.qmf_object.getObjectId().objectName
 
         return "%s(%s,%s,%s)" % (name, self.agent.id, cls, id)
 
-class AgentDelete(Update):
-    def __init__(self, model, agent):
-        super(AgentDelete, self).__init__(model)
+class AgentUpdate(Update):
+    def __init__(self, model, qmf_agent):
+        super(AgentUpdate, self).__init__(model)
 
-        self.agent = agent
+        self.qmf_agent = qmf_agent
 
     def do_process(self, cursor, stats):
-        id = self.agent.id
+        try:
+            agent = self.model.get_agent(self.qmf_agent)
+        except KeyError:
+            agent_id = self.qmf_agent.getAgentBank()
 
+            self.delete_agent_objects(cursor, stats, agent_id)
+
+            agent = MintAgent(self.model, self.qmf_agent)
+
+        #timestamp = timestamp / 1000000000
+        #agent.last_heartbeat = datetime.fromtimestamp(timestamp)
+
+        agent.last_heartbeat = datetime.now()
+
+        stats.updated += 1
+
+        # XXX Add periodic update of update_time
+
+    def delete_agent_objects(self, cursor, stats, agent_id):
         for pkg in self.model._packages:
             for cls in pkg._classes:
                 if cls._storage == "none":
                     continue
 
-                for obj in cls.get_selection(cursor, _qmf_agent_id=id):
+                for obj in cls.get_selection(cursor, _qmf_agent_id=agent_id):
                     obj.delete(cursor)
 
                     self.model.print_event(3, "Deleted %s", obj)
                     stats.deleted += 1
 
+class AgentDelete(AgentUpdate):
+    def do_process(self, cursor, stats):
+        try:
+            agent = self.model.get_agent(self.qmf_agent)
+        except KeyError:
+            raise UpdateDropped()  # agent is not registered in the model
+
+        agent.delete()
+
+        self.delete_agent_objects(cursor, stats, agent.id)  # filter is _qmf_agent_id
+
 class UpdateDropped(Exception):
     pass
 



More information about the rhmessaging-commits mailing list