[rhmessaging-commits] rhmessaging commits: r4288 - in mgmt/newdata: mint/python/mint and 1 other directory.

rhmessaging-commits at lists.jboss.org rhmessaging-commits at lists.jboss.org
Tue Sep 14 16:55:51 EDT 2010


Author: justi9
Date: 2010-09-14 16:55:50 -0400 (Tue, 14 Sep 2010)
New Revision: 4288

Modified:
   mgmt/newdata/cumin/python/cumin/session.py
   mgmt/newdata/mint/python/mint/model.py
   mgmt/newdata/mint/python/mint/update.py
   mgmt/newdata/mint/python/mint/util.py
Log:
For bz 632296.  Qualify v1 agent ids with the broker host and port, to
prevent the v1 agent id collisions that cause data corruption.

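This change also addresses a data ordering problem that prevented
brokers and other objects from appearing when they were updated
infrequently (or never): a reference to an object that has not yet
appeared is now recorded as a deferred link and filled in when that
object is created.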


Modified: mgmt/newdata/cumin/python/cumin/session.py
===================================================================
--- mgmt/newdata/cumin/python/cumin/session.py	2010-09-14 20:43:08 UTC (rev 4287)
+++ mgmt/newdata/cumin/python/cumin/session.py	2010-09-14 20:55:50 UTC (rev 4288)
@@ -1,5 +1,6 @@
 from model import *
 from util import *
+from mint.util import make_agent_id
 
 from qmf.console import Console, Session, ClassKey, ObjectId
 
@@ -132,18 +133,22 @@
     def newAgent(self, qmf_agent):
         log.debug("New agent %s", qmf_agent)
 
+        agent_id = make_agent_id(qmf_agent)
+
         self.session.lock.acquire()
         try:
-            self.session.qmf_agents[qmf_agent.getAgentBank()] = qmf_agent
+            self.session.qmf_agents[agent_id] = qmf_agent
         finally:
             self.session.lock.release()
 
     def delAgent(self, qmf_agent):
         log.debug("Deleting agent %s", qmf_agent)
 
+        agent_id = make_agent_id(qmf_agent)
+
         self.session.lock.acquire()
         try:
-            del self.session.qmf_agents[qmf_agent.getAgentBank()]
+            del self.session.qmf_agents[agent_id]
         finally:
             self.session.lock.release()
 

Modified: mgmt/newdata/mint/python/mint/model.py
===================================================================
--- mgmt/newdata/mint/python/mint/model.py	2010-09-14 20:43:08 UTC (rev 4287)
+++ mgmt/newdata/mint/python/mint/model.py	2010-09-14 20:55:50 UTC (rev 4288)
@@ -16,7 +16,7 @@
         # int seq => callable
         self.outstanding_method_calls = dict()
 
-        self.lock = RLock()
+        self.lock = Lock()
 
     def check(self):
         log.info("Checking %s", self)
@@ -49,6 +49,7 @@
         self.last_heartbeat = None
 
         self.objects_by_id = dict()
+        self.deferred_links_by_id = defaultdict(list)
 
         assert self.id not in self.model.agents_by_id
         self.model.agents_by_id[self.id] = self
@@ -60,5 +61,14 @@
 
         self.model = None
 
+    def get_object(self, cursor, cls, object_id):
+        try:
+            obj = self.objects_by_id[object_id]
+        except KeyError:
+            obj = cls.get_object_by_qmf_id(cursor, self.id, object_id)
+            self.objects_by_id[object_id] = obj
+
+        return obj
+
     def __repr__(self):
         return "%s(%s)" % (self.__class__.__name__, self.id)

Modified: mgmt/newdata/mint/python/mint/update.py
===================================================================
--- mgmt/newdata/mint/python/mint/update.py	2010-09-14 20:43:08 UTC (rev 4287)
+++ mgmt/newdata/mint/python/mint/update.py	2010-09-14 20:55:50 UTC (rev 4288)
@@ -176,24 +176,19 @@
         self.qmf_object = qmf_object
 
     def do_process(self, cursor, stats):
+        cls = self.get_class()
         agent_id = self.get_agent_id()
+        object_id = self.get_object_id()
 
         try:
             agent = self.model.agents_by_id[agent_id]
         except KeyError:
             raise UpdateDropped()
         
-        cls = self.get_class()
-        obj_id = self.get_object_id()
-        obj = None
-
         try:
-            obj = agent.objects_by_id[obj_id]
-        except KeyError:
-            try:
-                obj = cls.get_object_by_qmf_id(cursor, agent_id, obj_id)
-            except RosemaryNotFound:
-                pass
+            obj = agent.get_object(cursor, cls, object_id)
+        except RosemaryNotFound:
+            obj = None
 
         update_time, create_time, delete_time = self.qmf_object.getTimestamps()
 
@@ -236,10 +231,8 @@
 
         assert obj
 
-        agent.objects_by_id[obj_id] = obj
-
     def get_agent_id(self):
-        return self.qmf_object.getObjectId().agentName
+        return make_agent_id(self.qmf_object.getAgent())
 
     def get_class(self):
         class_key = self.qmf_object.getClassKey()
@@ -272,7 +265,7 @@
         obj._qmf_object_id = self.get_object_id()
         obj._qmf_create_time = create_time
         obj._qmf_update_time = update_time
-            
+
         object_columns = list()
         sample_columns = list()
 
@@ -303,6 +296,8 @@
         sql = "; ".join(statements)
         self.execute_sql(cursor, sql, obj.__dict__)
 
+        self.process_deferred_links(cursor, obj)
+
         obj._sync_time = datetime.now()
 
         self.model.print_event(3, "Created %s", obj)
@@ -310,6 +305,15 @@
 
         return obj
 
+    def process_deferred_links(self, cursor, obj):
+        agent = self.model.agents_by_id[obj._qmf_agent_id]
+        links = agent.deferred_links_by_id[obj._qmf_object_id]
+
+        for link in links:
+            link.realize(cursor, obj)
+
+        del agent.deferred_links_by_id[obj._qmf_object_id]
+
     def update_object(self, cursor, stats, obj):
         update_time, create_time, delete_time = self.qmf_object.getTimestamps()
         update_time = datetime.fromtimestamp(update_time / 1000000000)
@@ -364,7 +368,7 @@
             try:
                 if prop.type == 10:
                     col, nvalue = self.process_reference \
-                        (cls, prop, value, cursor)
+                        (obj, prop, value, cursor)
                 else:
                     col, nvalue = self.process_value(cls, prop, value)
             except MappingException, e:
@@ -380,36 +384,43 @@
             setattr(obj, col.name, nvalue)
             columns.append(col)
 
-    def process_reference(self, cls, prop, value, cursor):
+    def process_reference(self, obj, prop, oid, cursor):
         try:
-            ref = cls._references_by_name[prop.name]
+            ref = obj._class._references_by_name[prop.name]
         except KeyError:
             raise MappingException("Reference %s is unknown" % prop.name)
 
         if not ref.sql_column:
             raise MappingException("Reference %s has no column" % ref.name)
 
-        col = ref.sql_column
+        value = None
 
-        if value:
+        if oid:
+            if oid.isV2:
+                agent_id = oid.agentName
+            else:
+                # Not much we can do but assume same agent
+                agent_id = self.get_agent_id()
+
             try:
-                agent = self.model.agents_by_id[value.agentName]
+                agent = self.model.agents_by_id[agent_id]
             except KeyError:
-                raise MappingException("Agent %s is unknown" % value.agentName)
+                raise MappingException("Agent %s is unknown" % agent_id)
 
+            object_id = oid.objectName
+
             try:
-                that = agent.objects_by_id[value.objectName]
-            except KeyError:
-                try:
-                    that = ref.that_cls.get_object_by_qmf_id \
-                        (cursor, agent.id, value.objectName)
-                except RosemaryNotFound:
-                    msg = "Referenced object %s hasn't appeared yet"
-                    raise MappingException(msg % value.objectName)
+                that = agent.get_object(cursor, ref.that_cls, object_id)
+            except RosemaryNotFound:
+                link = DeferredLink(obj, ref)
+                agent.deferred_links_by_id[object_id].append(link)
 
+                msg = "Referenced object %s hasn't appeared yet"
+                raise MappingException(msg % object_id)
+
             value = that._id
 
-        return col, value
+        return ref.sql_column, value
 
     def process_value(self, cls, prop, value):
         try:
@@ -484,6 +495,17 @@
 
         return "%s(%s,%s,%s)" % (name, agent_id, cls, obj_id)
 
+class DeferredLink(object):
+    def __init__(self, this, reference):
+        self.this = this
+        self.reference = reference
+
+    def realize(self, cursor, that):
+        column = self.reference.sql_column
+        values = {column.name: that._id, "_id": self.this._id}
+
+        self.this._class.sql_update_object.execute(cursor, values, (column,))
+
 class AgentUpdate(Update):
     def __init__(self, model, qmf_agent):
         super(AgentUpdate, self).__init__(model)
@@ -491,7 +513,7 @@
         self.qmf_agent = qmf_agent
 
     def get_agent_id(self):
-        return str(self.qmf_agent.getAgentBank())
+        return make_agent_id(self.qmf_agent)
 
     def do_process(self, cursor, stats):
         agent_id = self.get_agent_id()

Modified: mgmt/newdata/mint/python/mint/util.py
===================================================================
--- mgmt/newdata/mint/python/mint/util.py	2010-09-14 20:43:08 UTC (rev 4287)
+++ mgmt/newdata/mint/python/mint/util.py	2010-09-14 20:55:50 UTC (rev 4288)
@@ -55,6 +55,14 @@
 
     return password
 
+def make_agent_id(agent):
+    broker = agent.getBroker()
+
+    if not agent.isV2 or agent is broker.getBrokerAgent():
+        return "!!%s:%i!!%s" % (broker.host, broker.port, agent.getAgentBank())
+
+    return agent.agentBank
+
 class QmfAgentId(object):
     def __init__(self, brokerId, brokerBank, agentBank):
         assert brokerId



More information about the rhmessaging-commits mailing list