[rhmessaging-commits] rhmessaging commits: r2880 - in mgmt/trunk: mint/python/mint and 1 other directory.
rhmessaging-commits at lists.jboss.org
rhmessaging-commits at lists.jboss.org
Tue Nov 25 15:02:24 EST 2008
Author: justi9
Date: 2008-11-25 15:02:24 -0500 (Tue, 25 Nov 2008)
New Revision: 2880
Modified:
mgmt/trunk/cumin/python/cumin/model.py
mgmt/trunk/mint/python/mint/__init__.py
mgmt/trunk/mint/python/mint/update.py
Log:
Use a MintBroker object as a proxy for the underlying QMF broker object.
Each MintBroker holds its own broker's ID mappings (QMF object ID to database ID) and orphan tracking.
Modified: mgmt/trunk/cumin/python/cumin/model.py
===================================================================
--- mgmt/trunk/cumin/python/cumin/model.py 2008-11-25 20:00:55 UTC (rev 2879)
+++ mgmt/trunk/cumin/python/cumin/model.py 2008-11-25 20:02:24 UTC (rev 2880)
@@ -255,9 +255,9 @@
def get_session_by_registration(self, reg):
assert reg.broker
- assert reg.broker.managedBroker in self.model.data.managedBrokers
+ assert reg.broker.managedBroker in self.model.data.mintBrokers
- broker = self.model.data.managedBrokers[reg.broker.managedBroker][0]
+ broker = self.model.data.mintBrokers[reg.broker.managedBroker]
return broker.getAmqpSession()
class CuminActionInvocation(object):
Modified: mgmt/trunk/mint/python/mint/__init__.py
===================================================================
--- mgmt/trunk/mint/python/mint/__init__.py 2008-11-25 20:00:55 UTC (rev 2879)
+++ mgmt/trunk/mint/python/mint/__init__.py 2008-11-25 20:02:24 UTC (rev 2880)
@@ -294,6 +294,18 @@
brokers = SQLMultipleJoin("BrokerRegistration", joinColumn="profile_id")
properties = SQLMultipleJoin("ConfigProperty", joinColumn="profile_id")
+class MintBroker(object):
+ def __init__(self, qmfBroker):
+ self.qmfBroker = qmfBroker
+
+ self.qmfId = str(self.qmfBroker.getBrokerId())
+ self.databaseId = None
+ self.objectDatabaseIds = MintCache() # database ids by qmf object id
+ self.orphans = dict() # updates by qmf object id
+
+ def getAmqpSession(self):
+ return self.qmfBroker.getAmqpSession()
+
class MintModel(qmf.console.Console):
staticInstance = None
@@ -304,28 +316,15 @@
assert MintModel.staticInstance is None
MintModel.staticInstance = self
- self.connCloseListener = None
+ self.mintBrokers = dict() # MintBrokers by qmfId
+
self.__lock = RLock()
- self.dbStyle = MixedCaseUnderscoreStyle()
self.dbConn = None
- # map containing updateObjects that have a missing parent
- # dependency, for deferred insertion (missing_class,
- # missing_id.first, missing_id.second) -> [updateObject, ...,
- # updateObject]
- self.orphanObjectMap = dict()
-
- self.orphans = dict()
-
self.updateThread = update.ModelUpdateThread(self)
- self.mgmtSession = qmf.console.Session(self)
+ self.mgmtSession = qmf.console.Session(self, manageConnections=True)
self.outstandingMethodCalls = dict()
- self.managedBrokers = dict()
- # cache contains mapping between qmf ids and database ids
- # (idFirst, idSecond) -> dbId
- self.cache = MintCache()
-
if self.debug:
log.setLevel(logging.DEBUG)
@@ -352,20 +351,18 @@
def stop(self):
self.updateThread.stop()
- def setCloseListener(self, connCloseListener):
- self.connCloseListener = connCloseListener
-
def getSession(self):
return self.mgmtSession
- def callMethod(self, managedBroker, objId, classKey, methodName, callback, args):
+ def callMethod(self, brokerId, objId, classKey, methodName, callback, args):
self.lock()
try:
- broker, dbObjId = self.managedBrokers[managedBroker]
+ broker = self.mintBrokers[brokerId]
finally:
self.unlock()
- seq = self.mgmtSession._sendMethodRequest(broker, ClassKey(classKey), objId, methodName, args)
+ seq = self.mgmtSession._sendMethodRequest \
+ (broker.qmfBroker, ClassKey(classKey), objId, methodName, args)
if seq is not None:
self.lock()
@@ -379,8 +376,8 @@
""" Invoked when a connection is established to a broker """
self.lock()
try:
- self.managedBrokers[str(broker.getBrokerId())] = (broker, 0)
- broker.idCache = MintCache()
+ mbroker = MintBroker(broker)
+ self.mintBrokers[mbroker.qmfId] = mbroker
finally:
self.unlock()
@@ -388,11 +385,9 @@
""" Invoked when the connection to a broker is lost """
self.lock()
try:
- del self.managedBrokers[str(broker.getBrokerId())]
+ del self.mintBrokers[str(broker.getBrokerId())]
finally:
self.unlock()
- if (self.connCloseListener != None):
- self.connCloseListener(broker)
def newPackage(self, name):
""" Invoked when a QMF package is discovered. """
@@ -414,7 +409,8 @@
def objectProps(self, broker, record):
""" Invoked when an object is updated. """
- up = update.PropertyUpdate(self, broker, record)
+ mbroker = self.mintBrokers[str(broker.getBrokerId())]
+ up = update.PropertyUpdate(self, mbroker, record)
if record.getClassKey().getClassName() == "job":
up.priority = 1
@@ -424,7 +420,8 @@
def objectStats(self, broker, record):
""" Invoked when an object is updated. """
- up = update.StatisticUpdate(self, broker, record)
+ mbroker = self.mintBrokers[str(broker.getBrokerId())]
+ up = update.StatisticUpdate(self, mbroker, record)
if record.getClassKey().getClassName() == "job":
up.priority = 1
@@ -439,11 +436,16 @@
pass
def brokerInfo(self, broker):
+ # XXX why do we do this?
self.lock()
try:
- self.managedBrokers[str(broker.getBrokerId())] = (broker, 0)
+ mbroker = MintBroker(broker)
+ self.mintBrokers[mbroker.qmfId] = mbroker
finally:
self.unlock()
def methodResponse(self, broker, seq, response):
- self.updateThread.enqueue(update.MethodUpdate(broker, seq, response))
+ mbroker = self.mintBrokers[str(broker.getBrokerId())]
+ up = update.MethodUpdate(self, mbroker, seq, response)
+
+ self.updateThread.enqueue(up)
Modified: mgmt/trunk/mint/python/mint/update.py
===================================================================
--- mgmt/trunk/mint/python/mint/update.py 2008-11-25 20:00:55 UTC (rev 2879)
+++ mgmt/trunk/mint/python/mint/update.py 2008-11-25 20:02:24 UTC (rev 2880)
@@ -4,6 +4,7 @@
import types
import pickle
import psycopg2
+import mint
from Queue import Queue as ConcurrentQueue, Full, Empty
from threading import Thread
from traceback import print_exc
@@ -68,20 +69,20 @@
conn.commit()
- for broker, id in self.model.managedBrokers.values():
- broker.idCache.commit()
+ for broker in self.model.mintBrokers.values():
+ broker.objectDatabaseIds.commit()
profile.commitTime += clock() - start
else:
conn.commit()
- for broker, id in self.model.managedBrokers.values():
- broker.idCache.commit()
+ for broker in self.model.mintBrokers.values():
+ broker.objectDatabaseIds.commit()
except:
- conn.rollback()
+ conn.rollback()
- for broker, id in self.model.managedBrokers.values():
- broker.idCache.rollback()
+ for broker in self.model.mintBrokers.values():
+ broker.objectDatabaseIds.rollback()
log.exception("Update failed")
@@ -151,7 +152,7 @@
foreignKey = name + "_id"
- id = self.broker.idCache.get(oid)
+ id = self.broker.objectDatabaseIds.get(oid)
if id is None:
raise ReferenceException(oid)
@@ -182,10 +183,10 @@
log.info("Referenced object %r not found", e.sought)
try:
- orphans = self.model.orphans[oid]
+ orphans = self.broker.orphans[oid]
orphans.append(self)
except KeyError:
- self.model.orphans[oid] = list((self,))
+ self.broker.orphans[oid] = list((self,))
return
@@ -202,7 +203,7 @@
attrs["sourceScopeId"] = oid.first
attrs["sourceObjectId"] = oid.second
attrs["qmfClassKey"] = str(self.object.getClassKey())
- attrs["managedBroker"] = str(self.broker.getBrokerId())
+ attrs["managedBroker"] = self.broker.qmfId
cursor = conn.cursor()
@@ -212,7 +213,7 @@
# 2. Object is in mint's db, but id is not yet cached
# 3. Object is in mint's db, and id is cached
- id = self.broker.idCache.get(oid)
+ id = self.broker.objectDatabaseIds.get(oid)
if id is None:
# Case 1 or 2
@@ -247,7 +248,7 @@
assert cursor.rowcount == 1
- self.broker.idCache.set(oid, id)
+ self.broker.objectDatabaseIds.set(oid, id)
else:
# Case 3
@@ -259,7 +260,7 @@
assert cursor.rowcount == 1
try:
- orphans = self.model.orphans.pop(oid)
+ orphans = self.broker.orphans.pop(oid)
if orphans:
log.info("Re-enqueueing %i orphans whose creation had been deferred",
@@ -271,25 +272,26 @@
pass
def processBroker(self, cursor, id):
- brokerId = str(self.broker.getBrokerId())
+ try:
+ broker = self.model.mintBrokers[self.broker.qmfId]
+ except KeyError:
+ # XXX what does this mean?
+ return
- if brokerId in self.model.managedBrokers:
- broker, dbId = self.model.managedBrokers[brokerId]
+ if broker.databaseId is None:
+ op = SqlGetBrokerRegistration()
+ op.execute(cursor, {"url": self.broker.getFullUrl()})
- if dbId == 0:
- op = SqlGetBrokerRegistration()
- op.execute(cursor, {"url": self.broker.getFullUrl()})
+ rec = cursor.fetchone()
- rec = cursor.fetchone()
+ if rec:
+ rid = rec[0]
- if rec:
- rid = rec[0]
+ op = SqlAttachBroker()
+ op.execute(cursor, {"id": id, "registrationId": rid})
- op = SqlAttachBroker()
- op.execute(cursor, {"id": id, "registrationId": rid})
+ broker.databaseId = id
- self.model.managedBrokers[brokerId] = (broker, id)
-
class StatisticUpdate(ModelUpdate):
def process(self, conn):
try:
@@ -301,7 +303,7 @@
statsCls = getattr(mint, "%sStats" % cls.__name__)
oid = self.object.getObjectId()
- id = self.broker.idCache.get(oid)
+ id = self.broker.objectDatabaseIds.get(oid)
if id is None:
# Just drop it; we'll get more stats later
@@ -332,8 +334,8 @@
op.execute(cursor, {"statsId": statsId, "id": id})
class MethodUpdate(ModelUpdate):
- def __init__(self, broker, seq, response):
- super(MethodUpdate, self).__init__(broker, response)
+ def __init__(self, model, broker, seq, response):
+ super(MethodUpdate, self).__init__(model, broker, response)
self.seq = seq
More information about the rhmessaging-commits
mailing list