[rhmessaging-commits] rhmessaging commits: r4288 - in mgmt/newdata: mint/python/mint and 1 other directory.
rhmessaging-commits at lists.jboss.org
rhmessaging-commits at lists.jboss.org
Tue Sep 14 16:55:51 EDT 2010
Author: justi9
Date: 2010-09-14 16:55:50 -0400 (Tue, 14 Sep 2010)
New Revision: 4288
Modified:
mgmt/newdata/cumin/python/cumin/session.py
mgmt/newdata/mint/python/mint/model.py
mgmt/newdata/mint/python/mint/update.py
mgmt/newdata/mint/python/mint/util.py
Log:
For Bugzilla bug 632296. Qualify v1 agent IDs using the broker host
and port, to prevent v1 agent ID collisions that cause data corruption.
This change also fixes a data-ordering problem that prevented
brokers and other objects from appearing when they were updated
infrequently (or never).
Modified: mgmt/newdata/cumin/python/cumin/session.py
===================================================================
--- mgmt/newdata/cumin/python/cumin/session.py 2010-09-14 20:43:08 UTC (rev 4287)
+++ mgmt/newdata/cumin/python/cumin/session.py 2010-09-14 20:55:50 UTC (rev 4288)
@@ -1,5 +1,6 @@
from model import *
from util import *
+from mint.util import make_agent_id
from qmf.console import Console, Session, ClassKey, ObjectId
@@ -132,18 +133,22 @@
def newAgent(self, qmf_agent):
log.debug("New agent %s", qmf_agent)
+ agent_id = make_agent_id(qmf_agent)
+
self.session.lock.acquire()
try:
- self.session.qmf_agents[qmf_agent.getAgentBank()] = qmf_agent
+ self.session.qmf_agents[agent_id] = qmf_agent
finally:
self.session.lock.release()
def delAgent(self, qmf_agent):
log.debug("Deleting agent %s", qmf_agent)
+ agent_id = make_agent_id(qmf_agent)
+
self.session.lock.acquire()
try:
- del self.session.qmf_agents[qmf_agent.getAgentBank()]
+ del self.session.qmf_agents[agent_id]
finally:
self.session.lock.release()
Modified: mgmt/newdata/mint/python/mint/model.py
===================================================================
--- mgmt/newdata/mint/python/mint/model.py 2010-09-14 20:43:08 UTC (rev 4287)
+++ mgmt/newdata/mint/python/mint/model.py 2010-09-14 20:55:50 UTC (rev 4288)
@@ -16,7 +16,7 @@
# int seq => callable
self.outstanding_method_calls = dict()
- self.lock = RLock()
+ self.lock = Lock()
def check(self):
log.info("Checking %s", self)
@@ -49,6 +49,7 @@
self.last_heartbeat = None
self.objects_by_id = dict()
+ self.deferred_links_by_id = defaultdict(list)
assert self.id not in self.model.agents_by_id
self.model.agents_by_id[self.id] = self
@@ -60,5 +61,14 @@
self.model = None
+ def get_object(self, cursor, cls, object_id):
+ try:
+ obj = self.objects_by_id[object_id]
+ except KeyError:
+ obj = cls.get_object_by_qmf_id(cursor, self.id, object_id)
+ self.objects_by_id[object_id] = obj
+
+ return obj
+
def __repr__(self):
return "%s(%s)" % (self.__class__.__name__, self.id)
Modified: mgmt/newdata/mint/python/mint/update.py
===================================================================
--- mgmt/newdata/mint/python/mint/update.py 2010-09-14 20:43:08 UTC (rev 4287)
+++ mgmt/newdata/mint/python/mint/update.py 2010-09-14 20:55:50 UTC (rev 4288)
@@ -176,24 +176,19 @@
self.qmf_object = qmf_object
def do_process(self, cursor, stats):
+ cls = self.get_class()
agent_id = self.get_agent_id()
+ object_id = self.get_object_id()
try:
agent = self.model.agents_by_id[agent_id]
except KeyError:
raise UpdateDropped()
- cls = self.get_class()
- obj_id = self.get_object_id()
- obj = None
-
try:
- obj = agent.objects_by_id[obj_id]
- except KeyError:
- try:
- obj = cls.get_object_by_qmf_id(cursor, agent_id, obj_id)
- except RosemaryNotFound:
- pass
+ obj = agent.get_object(cursor, cls, object_id)
+ except RosemaryNotFound:
+ obj = None
update_time, create_time, delete_time = self.qmf_object.getTimestamps()
@@ -236,10 +231,8 @@
assert obj
- agent.objects_by_id[obj_id] = obj
-
def get_agent_id(self):
- return self.qmf_object.getObjectId().agentName
+ return make_agent_id(self.qmf_object.getAgent())
def get_class(self):
class_key = self.qmf_object.getClassKey()
@@ -272,7 +265,7 @@
obj._qmf_object_id = self.get_object_id()
obj._qmf_create_time = create_time
obj._qmf_update_time = update_time
-
+
object_columns = list()
sample_columns = list()
@@ -303,6 +296,8 @@
sql = "; ".join(statements)
self.execute_sql(cursor, sql, obj.__dict__)
+ self.process_deferred_links(cursor, obj)
+
obj._sync_time = datetime.now()
self.model.print_event(3, "Created %s", obj)
@@ -310,6 +305,15 @@
return obj
+ def process_deferred_links(self, cursor, obj):
+ agent = self.model.agents_by_id[obj._qmf_agent_id]
+ links = agent.deferred_links_by_id[obj._qmf_object_id]
+
+ for link in links:
+ link.realize(cursor, obj)
+
+ del agent.deferred_links_by_id[obj._qmf_object_id]
+
def update_object(self, cursor, stats, obj):
update_time, create_time, delete_time = self.qmf_object.getTimestamps()
update_time = datetime.fromtimestamp(update_time / 1000000000)
@@ -364,7 +368,7 @@
try:
if prop.type == 10:
col, nvalue = self.process_reference \
- (cls, prop, value, cursor)
+ (obj, prop, value, cursor)
else:
col, nvalue = self.process_value(cls, prop, value)
except MappingException, e:
@@ -380,36 +384,43 @@
setattr(obj, col.name, nvalue)
columns.append(col)
- def process_reference(self, cls, prop, value, cursor):
+ def process_reference(self, obj, prop, oid, cursor):
try:
- ref = cls._references_by_name[prop.name]
+ ref = obj._class._references_by_name[prop.name]
except KeyError:
raise MappingException("Reference %s is unknown" % prop.name)
if not ref.sql_column:
raise MappingException("Reference %s has no column" % ref.name)
- col = ref.sql_column
+ value = None
- if value:
+ if oid:
+ if oid.isV2:
+ agent_id = oid.agentName
+ else:
+ # Not much we can do but assume same agent
+ agent_id = self.get_agent_id()
+
try:
- agent = self.model.agents_by_id[value.agentName]
+ agent = self.model.agents_by_id[agent_id]
except KeyError:
- raise MappingException("Agent %s is unknown" % value.agentName)
+ raise MappingException("Agent %s is unknown" % agent_id)
+ object_id = oid.objectName
+
try:
- that = agent.objects_by_id[value.objectName]
- except KeyError:
- try:
- that = ref.that_cls.get_object_by_qmf_id \
- (cursor, agent.id, value.objectName)
- except RosemaryNotFound:
- msg = "Referenced object %s hasn't appeared yet"
- raise MappingException(msg % value.objectName)
+ that = agent.get_object(cursor, ref.that_cls, object_id)
+ except RosemaryNotFound:
+ link = DeferredLink(obj, ref)
+ agent.deferred_links_by_id[object_id].append(link)
+ msg = "Referenced object %s hasn't appeared yet"
+ raise MappingException(msg % object_id)
+
value = that._id
- return col, value
+ return ref.sql_column, value
def process_value(self, cls, prop, value):
try:
@@ -484,6 +495,17 @@
return "%s(%s,%s,%s)" % (name, agent_id, cls, obj_id)
+class DeferredLink(object):
+ def __init__(self, this, reference):
+ self.this = this
+ self.reference = reference
+
+ def realize(self, cursor, that):
+ column = self.reference.sql_column
+ values = {column.name: that._id, "_id": self.this._id}
+
+ self.this._class.sql_update_object.execute(cursor, values, (column,))
+
class AgentUpdate(Update):
def __init__(self, model, qmf_agent):
super(AgentUpdate, self).__init__(model)
@@ -491,7 +513,7 @@
self.qmf_agent = qmf_agent
def get_agent_id(self):
- return str(self.qmf_agent.getAgentBank())
+ return make_agent_id(self.qmf_agent)
def do_process(self, cursor, stats):
agent_id = self.get_agent_id()
Modified: mgmt/newdata/mint/python/mint/util.py
===================================================================
--- mgmt/newdata/mint/python/mint/util.py 2010-09-14 20:43:08 UTC (rev 4287)
+++ mgmt/newdata/mint/python/mint/util.py 2010-09-14 20:55:50 UTC (rev 4288)
@@ -55,6 +55,14 @@
return password
+def make_agent_id(agent):
+ broker = agent.getBroker()
+
+ if not agent.isV2 or agent is broker.getBrokerAgent():
+ return "!!%s:%i!!%s" % (broker.host, broker.port, agent.getAgentBank())
+
+ return agent.agentBank
+
class QmfAgentId(object):
def __init__(self, brokerId, brokerBank, agentBank):
assert brokerId
More information about the rhmessaging-commits mailing list