[rhmessaging-commits] rhmessaging commits: r2880 - in mgmt/trunk: mint/python/mint and 1 other directory.

rhmessaging-commits at lists.jboss.org rhmessaging-commits at lists.jboss.org
Tue Nov 25 15:02:24 EST 2008


Author: justi9
Date: 2008-11-25 15:02:24 -0500 (Tue, 25 Nov 2008)
New Revision: 2880

Modified:
   mgmt/trunk/cumin/python/cumin/model.py
   mgmt/trunk/mint/python/mint/__init__.py
   mgmt/trunk/mint/python/mint/update.py
Log:
Use a MintBroker object as a proxy to the underlying qmf broker object.

Each MintBroker keeps its own per-broker id mappings (database ids by qmf object id) and orphan tracking (deferred updates by qmf object id).



Modified: mgmt/trunk/cumin/python/cumin/model.py
===================================================================
--- mgmt/trunk/cumin/python/cumin/model.py	2008-11-25 20:00:55 UTC (rev 2879)
+++ mgmt/trunk/cumin/python/cumin/model.py	2008-11-25 20:02:24 UTC (rev 2880)
@@ -255,9 +255,9 @@
 
     def get_session_by_registration(self, reg):
         assert reg.broker
-        assert reg.broker.managedBroker in self.model.data.managedBrokers
+        assert reg.broker.managedBroker in self.model.data.mintBrokers
 
-        broker = self.model.data.managedBrokers[reg.broker.managedBroker][0]
+        broker = self.model.data.mintBrokers[reg.broker.managedBroker]
         return broker.getAmqpSession()
 
 class CuminActionInvocation(object):

Modified: mgmt/trunk/mint/python/mint/__init__.py
===================================================================
--- mgmt/trunk/mint/python/mint/__init__.py	2008-11-25 20:00:55 UTC (rev 2879)
+++ mgmt/trunk/mint/python/mint/__init__.py	2008-11-25 20:02:24 UTC (rev 2880)
@@ -294,6 +294,18 @@
   brokers = SQLMultipleJoin("BrokerRegistration", joinColumn="profile_id")
   properties = SQLMultipleJoin("ConfigProperty", joinColumn="profile_id")
 
+class MintBroker(object):
+  def __init__(self, qmfBroker):
+    self.qmfBroker = qmfBroker
+
+    self.qmfId = str(self.qmfBroker.getBrokerId())
+    self.databaseId = None
+    self.objectDatabaseIds = MintCache() # database ids by qmf object id
+    self.orphans = dict() # updates by qmf object id
+
+  def getAmqpSession(self):
+    return self.qmfBroker.getAmqpSession()
+
 class MintModel(qmf.console.Console):
   staticInstance = None
 
@@ -304,28 +316,15 @@
     assert MintModel.staticInstance is None
     MintModel.staticInstance = self
 
-    self.connCloseListener = None
+    self.mintBrokers = dict() # MintBrokers by qmfId
+
     self.__lock = RLock()
-    self.dbStyle = MixedCaseUnderscoreStyle()
     self.dbConn = None
 
-    # map containing updateObjects that have a missing parent
-    # dependency, for deferred insertion (missing_class,
-    # missing_id.first, missing_id.second) -> [updateObject, ...,
-    # updateObject]
-    self.orphanObjectMap = dict()
-
-    self.orphans = dict()
-
     self.updateThread = update.ModelUpdateThread(self)
-    self.mgmtSession = qmf.console.Session(self)
+    self.mgmtSession = qmf.console.Session(self, manageConnections=True)
     self.outstandingMethodCalls = dict()
-    self.managedBrokers = dict()
 
-    # cache contains mapping between qmf ids and database ids
-    # (idFirst, idSecond) -> dbId
-    self.cache = MintCache()
-
     if self.debug:
       log.setLevel(logging.DEBUG)
 
@@ -352,20 +351,18 @@
   def stop(self):
     self.updateThread.stop()
 
-  def setCloseListener(self, connCloseListener):
-    self.connCloseListener = connCloseListener
-
   def getSession(self):
     return self.mgmtSession
 
-  def callMethod(self, managedBroker, objId, classKey, methodName, callback, args):
+  def callMethod(self, brokerId, objId, classKey, methodName, callback, args):
     self.lock()
     try:
-      broker, dbObjId = self.managedBrokers[managedBroker]
+      broker = self.mintBrokers[brokerId]
     finally:
       self.unlock()
 
-    seq = self.mgmtSession._sendMethodRequest(broker, ClassKey(classKey), objId, methodName, args)
+    seq = self.mgmtSession._sendMethodRequest \
+        (broker.qmfBroker, ClassKey(classKey), objId, methodName, args)
 
     if seq is not None:
       self.lock()
@@ -379,8 +376,8 @@
     """ Invoked when a connection is established to a broker """
     self.lock()
     try:
-      self.managedBrokers[str(broker.getBrokerId())] = (broker, 0)
-      broker.idCache = MintCache()
+      mbroker = MintBroker(broker)
+      self.mintBrokers[mbroker.qmfId] = mbroker
     finally:
       self.unlock()
 
@@ -388,11 +385,9 @@
     """ Invoked when the connection to a broker is lost """
     self.lock()
     try:
-      del self.managedBrokers[str(broker.getBrokerId())]
+      del self.mintBrokers[str(broker.getBrokerId())]
     finally:
       self.unlock()
-    if (self.connCloseListener != None):
-      self.connCloseListener(broker)
 
   def newPackage(self, name):
     """ Invoked when a QMF package is discovered. """
@@ -414,7 +409,8 @@
   def objectProps(self, broker, record):
     """ Invoked when an object is updated. """
 
-    up = update.PropertyUpdate(self, broker, record)
+    mbroker = self.mintBrokers[str(broker.getBrokerId())]
+    up = update.PropertyUpdate(self, mbroker, record)
 
     if record.getClassKey().getClassName() == "job":
       up.priority = 1
@@ -424,7 +420,8 @@
   def objectStats(self, broker, record):
     """ Invoked when an object is updated. """
 
-    up = update.StatisticUpdate(self, broker, record)
+    mbroker = self.mintBrokers[str(broker.getBrokerId())]
+    up = update.StatisticUpdate(self, mbroker, record)
 
     if record.getClassKey().getClassName() == "job":
       up.priority = 1
@@ -439,11 +436,16 @@
     pass
 
   def brokerInfo(self, broker):
+    # XXX why do we do this?
     self.lock()
     try:
-      self.managedBrokers[str(broker.getBrokerId())] = (broker, 0)
+      mbroker = MintBroker(broker)
+      self.mintBrokers[mbroker.qmfId] = mbroker
     finally:
       self.unlock()
 
   def methodResponse(self, broker, seq, response):
-    self.updateThread.enqueue(update.MethodUpdate(broker, seq, response))
+    mbroker = self.mintBrokers[str(broker.getBrokerId())]
+    up = update.MethodUpdate(self, mbroker, seq, response)
+
+    self.updateThread.enqueue(up)

Modified: mgmt/trunk/mint/python/mint/update.py
===================================================================
--- mgmt/trunk/mint/python/mint/update.py	2008-11-25 20:00:55 UTC (rev 2879)
+++ mgmt/trunk/mint/python/mint/update.py	2008-11-25 20:02:24 UTC (rev 2880)
@@ -4,6 +4,7 @@
 import types
 import pickle
 import psycopg2
+import mint
 from Queue import Queue as ConcurrentQueue, Full, Empty
 from threading import Thread
 from traceback import print_exc
@@ -68,20 +69,20 @@
 
             conn.commit()
 
-            for broker, id in self.model.managedBrokers.values():
-              broker.idCache.commit()
+            for broker in self.model.mintBrokers.values():
+              broker.objectDatabaseIds.commit()
 
             profile.commitTime += clock() - start
           else:
             conn.commit()
 
-            for broker, id in self.model.managedBrokers.values():
-              broker.idCache.commit()
+            for broker in self.model.mintBrokers.values():
+              broker.objectDatabaseIds.commit()
       except:
-        conn.rollback()  
+        conn.rollback()
 
-        for broker, id in self.model.managedBrokers.values():
-          broker.idCache.rollback()
+        for broker in self.model.mintBrokers.values():
+          broker.objectDatabaseIds.rollback()
 
         log.exception("Update failed")
 
@@ -151,7 +152,7 @@
 
     foreignKey = name + "_id"
 
-    id = self.broker.idCache.get(oid)
+    id = self.broker.objectDatabaseIds.get(oid)
 
     if id is None:
       raise ReferenceException(oid)
@@ -182,10 +183,10 @@
       log.info("Referenced object %r not found", e.sought)
 
       try:
-        orphans = self.model.orphans[oid]
+        orphans = self.broker.orphans[oid]
         orphans.append(self)
       except KeyError:
-        self.model.orphans[oid] = list((self,))
+        self.broker.orphans[oid] = list((self,))
 
       return
 
@@ -202,7 +203,7 @@
     attrs["sourceScopeId"] = oid.first
     attrs["sourceObjectId"] = oid.second
     attrs["qmfClassKey"] = str(self.object.getClassKey())
-    attrs["managedBroker"] = str(self.broker.getBrokerId())
+    attrs["managedBroker"] = self.broker.qmfId
 
     cursor = conn.cursor()
 
@@ -212,7 +213,7 @@
     # 2. Object is in mint's db, but id is not yet cached
     # 3. Object is in mint's db, and id is cached
 
-    id = self.broker.idCache.get(oid)
+    id = self.broker.objectDatabaseIds.get(oid)
 
     if id is None:
       # Case 1 or 2
@@ -247,7 +248,7 @@
 
         assert cursor.rowcount == 1
 
-      self.broker.idCache.set(oid, id)
+      self.broker.objectDatabaseIds.set(oid, id)
     else:
       # Case 3
 
@@ -259,7 +260,7 @@
       assert cursor.rowcount == 1
 
     try:
-      orphans = self.model.orphans.pop(oid)
+      orphans = self.broker.orphans.pop(oid)
 
       if orphans:
         log.info("Re-enqueueing %i orphans whose creation had been deferred",
@@ -271,25 +272,26 @@
       pass
 
   def processBroker(self, cursor, id):
-    brokerId = str(self.broker.getBrokerId())
+    try:
+      broker = self.model.mintBrokers[self.broker.qmfId]
+    except KeyError:
+      # XXX what does this mean?
+      return
 
-    if brokerId in self.model.managedBrokers:
-      broker, dbId = self.model.managedBrokers[brokerId]
+    if broker.databaseId is None:
+      op = SqlGetBrokerRegistration()
+      op.execute(cursor, {"url": self.broker.getFullUrl()})
 
-      if dbId == 0:
-        op = SqlGetBrokerRegistration()
-        op.execute(cursor, {"url": self.broker.getFullUrl()})
+      rec = cursor.fetchone()
 
-        rec = cursor.fetchone()
+      if rec:
+        rid = rec[0]
 
-        if rec:
-          rid = rec[0]
+        op = SqlAttachBroker()
+        op.execute(cursor, {"id": id, "registrationId": rid})
 
-          op = SqlAttachBroker()
-          op.execute(cursor, {"id": id, "registrationId": rid})
+      broker.databaseId = id
 
-        self.model.managedBrokers[brokerId] = (broker, id)
-
 class StatisticUpdate(ModelUpdate):
   def process(self, conn):
     try:
@@ -301,7 +303,7 @@
     statsCls = getattr(mint, "%sStats" % cls.__name__)
 
     oid = self.object.getObjectId()
-    id = self.broker.idCache.get(oid)
+    id = self.broker.objectDatabaseIds.get(oid)
 
     if id is None:
       # Just drop it; we'll get more stats later
@@ -332,8 +334,8 @@
     op.execute(cursor, {"statsId": statsId, "id": id})
 
 class MethodUpdate(ModelUpdate):
-  def __init__(self, broker, seq, response):
-    super(MethodUpdate, self).__init__(broker, response)
+  def __init__(self, model, broker, seq, response):
+    super(MethodUpdate, self).__init__(model, broker, response)
 
     self.seq = seq
 




More information about the rhmessaging-commits mailing list