[rhmessaging-commits] rhmessaging commits: r3783 - mgmt/trunk/mint/python/mint.

rhmessaging-commits at lists.jboss.org rhmessaging-commits at lists.jboss.org
Mon Jan 11 15:07:59 EST 2010


Author: justi9
Date: 2010-01-11 15:07:58 -0500 (Mon, 11 Jan 2010)
New Revision: 3783

Modified:
   mgmt/trunk/mint/python/mint/model.py
Log:
Isolate console callbacks from the model object

Modified: mgmt/trunk/mint/python/mint/model.py
===================================================================
--- mgmt/trunk/mint/python/mint/model.py	2010-01-11 19:13:53 UTC (rev 3782)
+++ mgmt/trunk/mint/python/mint/model.py	2010-01-11 20:07:58 UTC (rev 3783)
@@ -2,10 +2,10 @@
 from parsley.threadingex import Lifecycle
 from sqlobject import *
 
-from mint import update
 from mint.cache import MintCache
 from mint.schema import *
 from mint.schemalocal import *
+from mint.update import *
 from mint.util import *
 
 import mint.schema
@@ -14,7 +14,7 @@
 
 log = logging.getLogger("mint.model")
 
-class MintModel(Console, Lifecycle):
+class MintModel(Lifecycle):
   def __init__(self, app):
     self.log = log
 
@@ -34,24 +34,22 @@
 
     self.lock = RLock()
 
-    self.deferredObjectPropCalls = defaultdict(list)
-    self.deferredObjectStatCalls = defaultdict(list)
-
   def check(self):
     pass
 
   def do_init(self):
     assert self.qmfSession is None
 
-    self.qmfSession = Session \
-        (self, manageConnections=True, rcvObjects=self.app.updateEnabled)
+    self.qmfSession = Session(MintConsole(self),
+                              manageConnections=True,
+                              rcvObjects=self.app.updateEnabled)
 
   def do_start(self):
     # Clean up any transient objects that a previous instance may have
     # left behind in the DB; it's basically an unconstrained agent
     # disconnect update, for any agent
 
-    up = update.AgentDisconnectUpdate(None)
+    up = AgentDisconnectUpdate(None)
     self.app.updateThread.enqueue(up)
 
     uris = [x.strip() for x in self.app.config.qmf.split(",")]
@@ -73,36 +71,82 @@
     finally:
       self.lock.release()
 
-  def brokerConnected(self, qbroker):
-    self.log.info("Broker at %s:%i is connected", qbroker.host, qbroker.port)
+  def getMintAgent(self, qagent):
+    id = str(QmfAgentId.fromAgent(qagent))
+    return self.agents[id]
 
-  def brokerInfo(self, qbroker):
-    # Now we have a brokerId to use to generate fully qualified agent
-    # IDs
+class MintAgent(object):
+  def __init__(self, model, agent):
+    self.model = model
+    self.agent = agent
 
-    for qagent in qbroker.getAgents():
-      self.newMintAgent(qagent)
+    self.id = str(QmfAgentId.fromAgent(agent))
 
-  def brokerDisconnected(self, qbroker):
-    self.log.info \
-        ("Broker at %s:%i is disconnected", qbroker.host, qbroker.port)
+    self.lastHeartbeat = None
 
-  def newMintAgent(self, qagent):
-    agent = MintAgent(self, qagent)
+    # qmfObjectId => int database id
+    self.databaseIds = MintCache()
 
-    self.lock.acquire()
+    # qmfObjectId => list of ModelUpdate objects
+    self.deferredUpdates = defaultdict(list)
+
+    self.model.lock.acquire()
     try:
-      assert agent.id not in self.agents
-      self.agents[agent.id] = agent
+      assert self.id not in self.model.agents
+      self.model.agents[self.id] = self
     finally:
-      self.lock.release()
+      self.model.lock.release()
 
-    return agent
+  def callMethod(self, mintObject, methodName, callback, args):
+    classKey = ClassKey(mintObject.qmfClassKey)
+    objectId = QmfObjectId.fromString(mintObject.qmfObjectId).toObjectId()
 
-  def getMintAgent(self, qagent):
-    id = str(QmfAgentId.fromAgent(qagent))
-    return self.agents[id]
+    self.model.lock.acquire()
+    try:
+      broker = self.agent.getBroker()
 
+      seq = self.model.qmfSession._sendMethodRequest \
+          (broker, classKey, objectId, methodName, args)
+
+      if seq is not None:
+        self.model.outstandingMethodCalls[seq] = callback
+
+      return seq
+    finally:
+      self.model.lock.release()
+
+  def delete(self):
+    self.model.lock.acquire()
+    try:
+      del self.model.agents[agent.id]
+    finally:
+      self.model.lock.release()
+ 
+    self.model = None
+
+  def __repr__(self):
+    return "%s(%s)" % (self.__class__.__name__, self.id)
+
+class MintConsole(Console):
+  def __init__(self, model):
+    self.model = model
+
+    self.deferredObjectPropCalls = defaultdict(list)
+    self.deferredObjectStatCalls = defaultdict(list)
+
+  def brokerConnected(self, qbroker):
+    log.info("Broker at %s:%i is connected", qbroker.host, qbroker.port)
+
+  def brokerInfo(self, qbroker):
+    # Now we have a brokerId to use to generate fully qualified agent
+    # IDs
+
+    for qagent in qbroker.getAgents():
+      MintAgent(self.model, qagent)
+
+  def brokerDisconnected(self, qbroker):
+    log.info("Broker at %s:%i is disconnected", qbroker.host, qbroker.port)
+
   def newAgent(self, qagent):
     log.info("Creating %s", qagent)
 
@@ -111,9 +155,9 @@
     # in brokerInfo.
 
     if qagent.getBroker().brokerId:
-      agent = self.newMintAgent(qagent)
+      agent = MintAgent(self.model, qagent)
 
-      # XXX This business is to handle an agent-vs-agent data ordering
+      # XXX This business is to handle an agent-vs-agent-data ordering
       # problem
 
       objectPropCalls = self.deferredObjectPropCalls[agent.id]
@@ -129,125 +173,75 @@
   def delAgent(self, qagent):
     log.info("Deleting %s", qagent)
 
-    self.lock.acquire()
-    try:
-      agent = self.getMintAgent(qagent)
-      agent.model = None
-      del self.agents[agent.id]
-    finally:
-      self.lock.release()
+    agent = self.model.getMintAgent(qagent)
+    agent.delete()
  
-    up = update.AgentDisconnectUpdate(agent)
+    up = AgentDisconnectUpdate(agent)
     self.app.updateThread.enqueue(up)
 
   def heartbeat(self, qagent, timestamp):
     timestamp = timestamp / 1000000000
 
-    self.lock.acquire()
-    try:
-      agent = self.getMintAgent(qagent)
-      agent.lastHeartbeat = datetime.fromtimestamp(timestamp)
-    finally:
-      self.lock.release()
+    agent = self.model.getMintAgent(qagent)
+    agent.lastHeartbeat = datetime.fromtimestamp(timestamp)
 
   def newPackage(self, name):
-    """ Invoked when a QMF package is discovered. """
-    pass
+    log.info("New package %s", name)
 
   def newClass(self, kind, classKey):
-    """ Invoked when a new class is discovered.  Session.getSchema can be
-    used to obtain details about the class."""
-    pass
+    log.info("New class %s", classKey)
 
   def objectProps(self, broker, object):
-    """ Invoked when an object is updated. """
-
-    if not self.app.updateThread.isAlive():
+    if not self.model.app.updateThread.isAlive():
       return
 
-    self.lock.acquire()
+    self.model.lock.acquire()
     try:
       id = str(QmfAgentId.fromObject(object))
 
       try:
-        agent = self.agents[id]
+        agent = self.model.agents[id]
       except KeyError:
         self.deferredObjectPropCalls[id].append((broker, object))
         return
     finally:
-      self.lock.release()
+      self.model.lock.release()
 
-    up = update.PropertyUpdate(agent, object)
-    self.app.updateThread.enqueue(up)
+    up = PropertyUpdate(agent, object)
+    self.model.app.updateThread.enqueue(up)
 
   def objectStats(self, broker, object):
     """ Invoked when an object is updated. """
 
-    if not self.app.updateThread.isAlive():
+    if not self.model.app.updateThread.isAlive():
       return
 
-    self.lock.acquire()
+    self.model.lock.acquire()
     try:
       id = str(QmfAgentId.fromObject(object))
 
       try:
-        agent = self.agents[id]
+        agent = self.model.agents[id]
       except KeyError:
         self.deferredObjectStatCalls[id].append((broker, object))
         return
     finally:
-      self.lock.release()
+      self.model.lock.release()
 
-    up = update.StatisticUpdate(agent, object)
-    self.app.updateThread.enqueue(up)
+    up = StatisticUpdate(agent, object)
+    self.model.app.updateThread.enqueue(up)
 
   def event(self, broker, event):
     """ Invoked when an event is raised. """
     pass
 
   def methodResponse(self, broker, seq, response):
-    log.debug("Method response for request %i received from %s", seq, broker)
+    log.info("Method response for request %i received from %s", seq, broker)
     log.debug("Response: %s", response)
 
-    self.lock.acquire()
+    self.model.lock.acquire()
     try:
-      methodCallback = self.outstandingMethodCalls.pop(seq)
+      methodCallback = self.model.outstandingMethodCalls.pop(seq)
       methodCallback(response.text, response.outArgs)
     finally:
-      self.lock.release()
-
-class MintAgent(object):
-  def __init__(self, model, agent):
-    self.model = model
-    self.agent = agent
-
-    self.id = str(QmfAgentId.fromAgent(agent))
-
-    self.lastHeartbeat = None
-
-    # qmfObjectId => int database id
-    self.databaseIds = MintCache()
-
-    # qmfObjectId => list of ModelUpdate objects
-    self.deferredUpdates = defaultdict(list)
-
-  def callMethod(self, mintObject, methodName, callback, args):
-    classKey = ClassKey(mintObject.qmfClassKey)
-    objectId = QmfObjectId.fromString(mintObject.qmfObjectId).toObjectId()
-
-    self.model.lock.acquire()
-    try:
-      broker = self.agent.getBroker()
-
-      seq = self.model.qmfSession._sendMethodRequest \
-          (broker, classKey, objectId, methodName, args)
-
-      if seq is not None:
-        self.model.outstandingMethodCalls[seq] = callback
-
-      return seq
-    finally:
       self.model.lock.release()
-
-  def __repr__(self):
-    return "%s(%s)" % (self.__class__.__name__, self.id)



More information about the rhmessaging-commits mailing list