[rhmessaging-commits] rhmessaging commits: r2915 - mgmt/trunk/mint/python/mint.
rhmessaging-commits at lists.jboss.org
rhmessaging-commits at lists.jboss.org
Wed Dec 3 11:29:12 EST 2008
Author: justi9
Date: 2008-12-03 11:29:12 -0500 (Wed, 03 Dec 2008)
New Revision: 2915
Modified:
mgmt/trunk/mint/python/mint/__init__.py
mgmt/trunk/mint/python/mint/tools.py
mgmt/trunk/mint/python/mint/update.py
Log:
Remove getSession from MintModel and instead implement addBroker and
delBroker directly on MintModel.
This was done in order to get some more accounting for QMF broker
connections, indexed by id and by url. These lookups are needed to implement a
registration-based reconnect.
Modified: mgmt/trunk/mint/python/mint/__init__.py
===================================================================
--- mgmt/trunk/mint/python/mint/__init__.py 2008-12-03 16:18:20 UTC (rev 2914)
+++ mgmt/trunk/mint/python/mint/__init__.py 2008-12-03 16:29:12 UTC (rev 2915)
@@ -216,7 +216,7 @@
name = StringCol(length=1000, default=None, unique=True, notNone=True)
url = StringCol(length=1000, default=None)
- qmfBroker = None
+ mintBroker = None
broker = ForeignKey("Broker", cascade="null", default=None)
groups = SQLRelatedJoin("BrokerGroup",
intermediateTable="broker_group_mapping",
@@ -229,7 +229,7 @@
def connect(self, model):
log.info("Connecting to broker '%s' at %s" % (self.name, self.url))
try:
- self.qmfBroker = model.getSession().addBroker(self.url)
+ self.mintBroker = model.addBroker(self.url)
log.info("Connection succeeded")
except Exception, e:
log.info("Connection failed: %s ", e.message)
@@ -238,17 +238,14 @@
def disconnect(self, model):
log.info("Disconnecting from broker '%s' at %s" % (self.name, self.url))
try:
- model.getSession().delBroker(self.qmfBroker)
+ model.delBroker(self.mintBroker)
log.info("Disconnection succeeded")
except Exception, e:
log.info("Disconnection failed: %s ", e.message)
print_exc()
def getBrokerId(self):
- if self.qmfBroker is not None:
- return str(self.qmfBroker.getBrokerId())
- else:
- return None
+ return self.mintBroker.qmfId
def getDefaultVhost(self):
if self.broker:
@@ -297,14 +294,18 @@
properties = SQLMultipleJoin("ConfigProperty", joinColumn="profile_id")
class MintBroker(object):
- def __init__(self, qmfBroker):
+ def __init__(self, url, qmfBroker):
+ self.url = url
self.qmfBroker = qmfBroker
- self.qmfId = str(self.qmfBroker.getBrokerId())
+ self.qmfId = None
self.databaseId = None
+
self.objectDatabaseIds = MintCache() # database ids by qmf object id
self.orphans = dict() # updates by qmf object id
+ self.connected = False
+
def getAmqpSession(self):
return self.qmfBroker.getAmqpSession()
@@ -321,8 +322,13 @@
assert MintModel.staticInstance is None
MintModel.staticInstance = self
- self.mintBrokers = dict() # MintBrokers by qmfId
+ # Lookup tables used for recovering MintBroker objects, which have
+ # mint-specific accounting and wrap a qmf broker object
+ self.mintBrokersByQmfBroker = dict()
+ self.mintBrokersById = dict()
+ self.mintBrokersByUrl = dict()
+
self.__lock = RLock()
self.dbConn = None
@@ -359,13 +365,10 @@
self.updateThread.stop()
self.dbExpireThread.stop()
- def getSession(self):
- return self.mgmtSession
-
def callMethod(self, brokerId, objId, classKey, methodName, callback, args):
self.lock()
try:
- broker = self.mintBrokers[brokerId]
+ broker = self.mintBrokersById[brokerId]
finally:
self.unlock()
@@ -380,23 +383,77 @@
self.unlock()
return seq
- def brokerConnected(self, broker):
+ def addBroker(self, url):
+ self.lock()
+ try:
+ qbroker = self.mgmtSession.addBroker(url)
+ mbroker = MintBroker(url, qbroker)
+
+ # Can't add the by-id mapping here, as the id is not set yet;
+ # instead we add it when brokerConnected is called
+
+ self.mintBrokersByQmfBroker[qbroker] = mbroker
+ self.mintBrokersByUrl[url] = mbroker
+
+ return mbroker
+ finally:
+ self.unlock()
+
+ def delBroker(self, mbroker):
+ assert isinstance(mbroker, MintBroker)
+
+ self.lock()
+ try:
+ self.mgmtSession.delBroker(mbroker.qmfBroker)
+
+ del self.mintBrokersByQmfBroker[mbroker.qmfBroker]
+ del self.mintBrokersById[mbroker.qmfId]
+ del self.mintBrokersByUrl[mbroker.url]
+ finally:
+ self.unlock()
+
+ def brokerConnected(self, qbroker):
""" Invoked when a connection is established to a broker """
self.lock()
try:
- mbroker = MintBroker(broker)
- self.mintBrokers[mbroker.qmfId] = mbroker
+ mbroker = self.mintBrokersByQmfBroker[qbroker]
+
+ assert mbroker.connected is False
+
+ mbroker.connected = True
finally:
self.unlock()
- def brokerDisconnected(self, broker):
+ def brokerInfo(self, qbroker):
+ self.lock()
+ try:
+ id = str(qbroker.getBrokerId())
+ mbroker = self.mintBrokersByQmfBroker[qbroker]
+
+ mbroker.qmfId = id
+ self.mintBrokersById[id] = mbroker
+ finally:
+ self.unlock()
+
+ def brokerDisconnected(self, qbroker):
""" Invoked when the connection to a broker is lost """
self.lock()
try:
- del self.mintBrokers[str(broker.getBrokerId())]
+ mbroker = self.mintBrokersByQmfBroker[qbroker]
+
+ assert mbroker.connected is True
+
+ mbroker.connected = False
finally:
self.unlock()
+ def getMintBrokerByQmfBroker(self, qbroker):
+ self.lock()
+ try:
+ return self.mintBrokersByQmfBroker[qbroker]
+ finally:
+ self.unlock()
+
def newPackage(self, name):
""" Invoked when a QMF package is discovered. """
pass
@@ -417,7 +474,8 @@
def objectProps(self, broker, record):
""" Invoked when an object is updated. """
- mbroker = self.mintBrokers[str(broker.getBrokerId())]
+ mbroker = self.getMintBrokerByQmfBroker(broker)
+
up = update.PropertyUpdate(self, mbroker, record)
if record.getClassKey().getClassName() == "job":
@@ -428,7 +486,7 @@
def objectStats(self, broker, record):
""" Invoked when an object is updated. """
- mbroker = self.mintBrokers[str(broker.getBrokerId())]
+ mbroker = self.getMintBrokerByQmfBroker(broker)
up = update.StatisticUpdate(self, mbroker, record)
if record.getClassKey().getClassName() == "job":
@@ -443,17 +501,8 @@
def heartbeat(self, agent, timestamp):
pass
- def brokerInfo(self, broker):
- # XXX why do we do this?
- self.lock()
- try:
- mbroker = MintBroker(broker)
- self.mintBrokers[mbroker.qmfId] = mbroker
- finally:
- self.unlock()
-
def methodResponse(self, broker, seq, response):
- mbroker = self.mintBrokers[str(broker.getBrokerId())]
+ mbroker = self.getMintBrokerByQmfBroker(broker)
up = update.MethodUpdate(self, mbroker, seq, response)
self.updateThread.enqueue(up)
Modified: mgmt/trunk/mint/python/mint/tools.py
===================================================================
--- mgmt/trunk/mint/python/mint/tools.py 2008-12-03 16:18:20 UTC (rev 2914)
+++ mgmt/trunk/mint/python/mint/tools.py 2008-12-03 16:29:12 UTC (rev 2915)
@@ -89,7 +89,7 @@
try:
for arg in args[1:]:
- model.getSession().addBroker(arg)
+ model.addBroker(arg)
enq_last = 0
deq_last = 0
Modified: mgmt/trunk/mint/python/mint/update.py
===================================================================
--- mgmt/trunk/mint/python/mint/update.py 2008-12-03 16:18:20 UTC (rev 2914)
+++ mgmt/trunk/mint/python/mint/update.py 2008-12-03 16:29:12 UTC (rev 2915)
@@ -69,19 +69,19 @@
conn.commit()
- for broker in self.model.mintBrokers.values():
+ for broker in self.model.mintBrokersByQmfBroker.values():
broker.objectDatabaseIds.commit()
profile.commitTime += clock() - start
else:
conn.commit()
- for broker in self.model.mintBrokers.values():
+ for broker in self.model.mintBrokersByQmfBroker.values():
broker.objectDatabaseIds.commit()
except:
conn.rollback()
- for broker in self.model.mintBrokers.values():
+ for broker in self.model.mintBrokersByQmfBroker.values():
broker.objectDatabaseIds.rollback()
log.exception("Update failed")
@@ -203,7 +203,7 @@
attrs["qmfScopeId"] = oid.first
attrs["qmfObjectId"] = oid.second
attrs["qmfClassKey"] = str(self.object.getClassKey())
- attrs["qmfBrokerId"] = self.broker.qmfId
+ attrs["qmfBrokerId"] = str(self.broker.qmfBroker.getBrokerId())
cursor = conn.cursor()
@@ -272,26 +272,24 @@
pass
def processBroker(self, cursor, id):
- try:
- broker = self.model.mintBrokers[self.broker.qmfId]
- except KeyError:
- # XXX what does this mean?
- return
-
- if broker.databaseId is None:
+ if self.broker.databaseId is None:
op = SqlGetBrokerRegistration()
- op.execute(cursor, {"url": self.broker.getFullUrl()})
+ op.execute(cursor, {"url": self.broker.url})
rec = cursor.fetchone()
+ #print op.text, {"url": self.broker.url}
+
if rec:
rid = rec[0]
op = SqlAttachBroker()
op.execute(cursor, {"id": id, "registrationId": rid})
- broker.databaseId = id
+ #print op.text, {"id": id, "registrationId": rid}
+ self.broker.databaseId = id
+
class StatisticUpdate(ModelUpdate):
def process(self, conn):
try:
More information about the rhmessaging-commits
mailing list