Author: justi9
Date: 2010-01-11 15:07:58 -0500 (Mon, 11 Jan 2010)
New Revision: 3783
Modified:
mgmt/trunk/mint/python/mint/model.py
Log:
Isolate console callbacks from the model object
Modified: mgmt/trunk/mint/python/mint/model.py
===================================================================
--- mgmt/trunk/mint/python/mint/model.py 2010-01-11 19:13:53 UTC (rev 3782)
+++ mgmt/trunk/mint/python/mint/model.py 2010-01-11 20:07:58 UTC (rev 3783)
@@ -2,10 +2,10 @@
from parsley.threadingex import Lifecycle
from sqlobject import *
-from mint import update
from mint.cache import MintCache
from mint.schema import *
from mint.schemalocal import *
+from mint.update import *
from mint.util import *
import mint.schema
@@ -14,7 +14,7 @@
log = logging.getLogger("mint.model")
-class MintModel(Console, Lifecycle):
+class MintModel(Lifecycle):
def __init__(self, app):
self.log = log
@@ -34,24 +34,22 @@
self.lock = RLock()
- self.deferredObjectPropCalls = defaultdict(list)
- self.deferredObjectStatCalls = defaultdict(list)
-
def check(self):
pass
def do_init(self):
assert self.qmfSession is None
- self.qmfSession = Session \
- (self, manageConnections=True, rcvObjects=self.app.updateEnabled)
+ self.qmfSession = Session(MintConsole(self),
+ manageConnections=True,
+ rcvObjects=self.app.updateEnabled)
def do_start(self):
# Clean up any transient objects that a previous instance may have
# left behind in the DB; it's basically an unconstrained agent
# disconnect update, for any agent
- up = update.AgentDisconnectUpdate(None)
+ up = AgentDisconnectUpdate(None)
self.app.updateThread.enqueue(up)
uris = [x.strip() for x in self.app.config.qmf.split(",")]
@@ -73,36 +71,82 @@
finally:
self.lock.release()
- def brokerConnected(self, qbroker):
- self.log.info("Broker at %s:%i is connected", qbroker.host, qbroker.port)
+ def getMintAgent(self, qagent):
+ id = str(QmfAgentId.fromAgent(qagent))
+ return self.agents[id]
- def brokerInfo(self, qbroker):
- # Now we have a brokerId to use to generate fully qualified agent
- # IDs
+class MintAgent(object):
+ def __init__(self, model, agent):
+ self.model = model
+ self.agent = agent
- for qagent in qbroker.getAgents():
- self.newMintAgent(qagent)
+ self.id = str(QmfAgentId.fromAgent(agent))
- def brokerDisconnected(self, qbroker):
- self.log.info \
- ("Broker at %s:%i is disconnected", qbroker.host, qbroker.port)
+ self.lastHeartbeat = None
- def newMintAgent(self, qagent):
- agent = MintAgent(self, qagent)
+ # qmfObjectId => int database id
+ self.databaseIds = MintCache()
- self.lock.acquire()
+ # qmfObjectId => list of ModelUpdate objects
+ self.deferredUpdates = defaultdict(list)
+
+ self.model.lock.acquire()
try:
- assert agent.id not in self.agents
- self.agents[agent.id] = agent
+ assert self.id not in self.model.agents
+ self.model.agents[self.id] = self
finally:
- self.lock.release()
+ self.model.lock.release()
- return agent
+ def callMethod(self, mintObject, methodName, callback, args):
+ classKey = ClassKey(mintObject.qmfClassKey)
+ objectId = QmfObjectId.fromString(mintObject.qmfObjectId).toObjectId()
- def getMintAgent(self, qagent):
- id = str(QmfAgentId.fromAgent(qagent))
- return self.agents[id]
+ self.model.lock.acquire()
+ try:
+ broker = self.agent.getBroker()
+ seq = self.model.qmfSession._sendMethodRequest \
+ (broker, classKey, objectId, methodName, args)
+
+ if seq is not None:
+ self.model.outstandingMethodCalls[seq] = callback
+
+ return seq
+ finally:
+ self.model.lock.release()
+
+ def delete(self):
+ self.model.lock.acquire()
+ try:
+ del self.model.agents[agent.id]
+ finally:
+ self.model.lock.release()
+
+ self.model = None
+
+ def __repr__(self):
+ return "%s(%s)" % (self.__class__.__name__, self.id)
+
+class MintConsole(Console):
+ def __init__(self, model):
+ self.model = model
+
+ self.deferredObjectPropCalls = defaultdict(list)
+ self.deferredObjectStatCalls = defaultdict(list)
+
+ def brokerConnected(self, qbroker):
+ log.info("Broker at %s:%i is connected", qbroker.host, qbroker.port)
+
+ def brokerInfo(self, qbroker):
+ # Now we have a brokerId to use to generate fully qualified agent
+ # IDs
+
+ for qagent in qbroker.getAgents():
+ MintAgent(self.model, qagent)
+
+ def brokerDisconnected(self, qbroker):
+ log.info("Broker at %s:%i is disconnected", qbroker.host, qbroker.port)
+
def newAgent(self, qagent):
log.info("Creating %s", qagent)
@@ -111,9 +155,9 @@
# in brokerInfo.
if qagent.getBroker().brokerId:
- agent = self.newMintAgent(qagent)
+ agent = MintAgent(self.model, qagent)
- # XXX This business is to handle an agent-vs-agent data ordering
+ # XXX This business is to handle an agent-vs-agent-data ordering
# problem
objectPropCalls = self.deferredObjectPropCalls[agent.id]
@@ -129,125 +173,75 @@
def delAgent(self, qagent):
log.info("Deleting %s", qagent)
- self.lock.acquire()
- try:
- agent = self.getMintAgent(qagent)
- agent.model = None
- del self.agents[agent.id]
- finally:
- self.lock.release()
+ agent = self.model.getMintAgent(qagent)
+ agent.delete()
- up = update.AgentDisconnectUpdate(agent)
+ up = AgentDisconnectUpdate(agent)
self.app.updateThread.enqueue(up)
def heartbeat(self, qagent, timestamp):
timestamp = timestamp / 1000000000
- self.lock.acquire()
- try:
- agent = self.getMintAgent(qagent)
- agent.lastHeartbeat = datetime.fromtimestamp(timestamp)
- finally:
- self.lock.release()
+ agent = self.model.getMintAgent(qagent)
+ agent.lastHeartbeat = datetime.fromtimestamp(timestamp)
def newPackage(self, name):
- """ Invoked when a QMF package is discovered. """
- pass
+ log.info("New package %s", name)
def newClass(self, kind, classKey):
- """ Invoked when a new class is discovered. Session.getSchema can be
- used to obtain details about the class."""
- pass
+ log.info("New class %s", classKey)
def objectProps(self, broker, object):
- """ Invoked when an object is updated. """
-
- if not self.app.updateThread.isAlive():
+ if not self.model.app.updateThread.isAlive():
return
- self.lock.acquire()
+ self.model.lock.acquire()
try:
id = str(QmfAgentId.fromObject(object))
try:
- agent = self.agents[id]
+ agent = self.model.agents[id]
except KeyError:
self.deferredObjectPropCalls[id].append((broker, object))
return
finally:
- self.lock.release()
+ self.model.lock.release()
- up = update.PropertyUpdate(agent, object)
- self.app.updateThread.enqueue(up)
+ up = PropertyUpdate(agent, object)
+ self.model.app.updateThread.enqueue(up)
def objectStats(self, broker, object):
""" Invoked when an object is updated. """
- if not self.app.updateThread.isAlive():
+ if not self.model.app.updateThread.isAlive():
return
- self.lock.acquire()
+ self.model.lock.acquire()
try:
id = str(QmfAgentId.fromObject(object))
try:
- agent = self.agents[id]
+ agent = self.model.agents[id]
except KeyError:
self.deferredObjectStatCalls[id].append((broker, object))
return
finally:
- self.lock.release()
+ self.model.lock.release()
- up = update.StatisticUpdate(agent, object)
- self.app.updateThread.enqueue(up)
+ up = StatisticUpdate(agent, object)
+ self.model.app.updateThread.enqueue(up)
def event(self, broker, event):
""" Invoked when an event is raised. """
pass
def methodResponse(self, broker, seq, response):
- log.debug("Method response for request %i received from %s", seq, broker)
+ log.info("Method response for request %i received from %s", seq, broker)
log.debug("Response: %s", response)
- self.lock.acquire()
+ self.model.lock.acquire()
try:
- methodCallback = self.outstandingMethodCalls.pop(seq)
+ methodCallback = self.model.outstandingMethodCalls.pop(seq)
methodCallback(response.text, response.outArgs)
finally:
- self.lock.release()
-
-class MintAgent(object):
- def __init__(self, model, agent):
- self.model = model
- self.agent = agent
-
- self.id = str(QmfAgentId.fromAgent(agent))
-
- self.lastHeartbeat = None
-
- # qmfObjectId => int database id
- self.databaseIds = MintCache()
-
- # qmfObjectId => list of ModelUpdate objects
- self.deferredUpdates = defaultdict(list)
-
- def callMethod(self, mintObject, methodName, callback, args):
- classKey = ClassKey(mintObject.qmfClassKey)
- objectId = QmfObjectId.fromString(mintObject.qmfObjectId).toObjectId()
-
- self.model.lock.acquire()
- try:
- broker = self.agent.getBroker()
-
- seq = self.model.qmfSession._sendMethodRequest \
- (broker, classKey, objectId, methodName, args)
-
- if seq is not None:
- self.model.outstandingMethodCalls[seq] = callback
-
- return seq
- finally:
self.model.lock.release()
-
- def __repr__(self):
- return "%s(%s)" % (self.__class__.__name__, self.id)