[rhmessaging-commits] rhmessaging commits: r2915 - mgmt/trunk/mint/python/mint.

rhmessaging-commits at lists.jboss.org rhmessaging-commits at lists.jboss.org
Wed Dec 3 11:29:12 EST 2008


Author: justi9
Date: 2008-12-03 11:29:12 -0500 (Wed, 03 Dec 2008)
New Revision: 2915

Modified:
   mgmt/trunk/mint/python/mint/__init__.py
   mgmt/trunk/mint/python/mint/tools.py
   mgmt/trunk/mint/python/mint/update.py
Log:
Remove getSession on MintModel and instead implement addBroker and
delBroker directly there.

This was done in order to get more accounting for qmf broker
connections, indexed by id and by url.  These lookups are needed to
implement a registration-based reconnect.



Modified: mgmt/trunk/mint/python/mint/__init__.py
===================================================================
--- mgmt/trunk/mint/python/mint/__init__.py	2008-12-03 16:18:20 UTC (rev 2914)
+++ mgmt/trunk/mint/python/mint/__init__.py	2008-12-03 16:29:12 UTC (rev 2915)
@@ -216,7 +216,7 @@
 
   name = StringCol(length=1000, default=None, unique=True, notNone=True)
   url = StringCol(length=1000, default=None)
-  qmfBroker = None
+  mintBroker = None
   broker = ForeignKey("Broker", cascade="null", default=None)
   groups = SQLRelatedJoin("BrokerGroup",
                           intermediateTable="broker_group_mapping",
@@ -229,7 +229,7 @@
   def connect(self, model):
     log.info("Connecting to broker '%s' at %s" % (self.name, self.url))
     try:
-      self.qmfBroker = model.getSession().addBroker(self.url)
+      self.mintBroker = model.addBroker(self.url)
       log.info("Connection succeeded")
     except Exception, e:
       log.info("Connection failed: %s ", e.message)
@@ -238,17 +238,14 @@
   def disconnect(self, model):
     log.info("Disconnecting from broker '%s' at %s" % (self.name, self.url))
     try:
-      model.getSession().delBroker(self.qmfBroker)
+      model.delBroker(self.mintBroker)
       log.info("Disconnection succeeded")
     except Exception, e:
       log.info("Disconnection failed: %s ", e.message)
       print_exc()
 
   def getBrokerId(self):
-    if self.qmfBroker is not None:
-      return str(self.qmfBroker.getBrokerId())
-    else:
-      return None
+    return self.mintBroker.qmfId
 
   def getDefaultVhost(self):
     if self.broker:
@@ -297,14 +294,18 @@
   properties = SQLMultipleJoin("ConfigProperty", joinColumn="profile_id")
 
 class MintBroker(object):
-  def __init__(self, qmfBroker):
+  def __init__(self, url, qmfBroker):
+    self.url = url
     self.qmfBroker = qmfBroker
 
-    self.qmfId = str(self.qmfBroker.getBrokerId())
+    self.qmfId = None
     self.databaseId = None
+
     self.objectDatabaseIds = MintCache() # database ids by qmf object id
     self.orphans = dict() # updates by qmf object id
 
+    self.connected = False
+
   def getAmqpSession(self):
     return self.qmfBroker.getAmqpSession()
 
@@ -321,8 +322,13 @@
     assert MintModel.staticInstance is None
     MintModel.staticInstance = self
 
-    self.mintBrokers = dict() # MintBrokers by qmfId
+    # Lookup tables used for recovering MintBroker objects, which have
+    # mint-specific accounting and wrap a qmf broker object
 
+    self.mintBrokersByQmfBroker = dict()
+    self.mintBrokersById = dict()
+    self.mintBrokersByUrl = dict() 
+
     self.__lock = RLock()
     self.dbConn = None
 
@@ -359,13 +365,10 @@
     self.updateThread.stop()
     self.dbExpireThread.stop()
 
-  def getSession(self):
-    return self.mgmtSession
-
   def callMethod(self, brokerId, objId, classKey, methodName, callback, args):
     self.lock()
     try:
-      broker = self.mintBrokers[brokerId]
+      broker = self.mintBrokersById[brokerId]
     finally:
       self.unlock()
 
@@ -380,23 +383,77 @@
         self.unlock()
     return seq
 
-  def brokerConnected(self, broker):
+  def addBroker(self, url):
+    self.lock()
+    try:
+      qbroker = self.mgmtSession.addBroker(url)
+      mbroker = MintBroker(url, qbroker)
+
+      # Can't add the by-id mapping here, as the id is not set yet;
+      # instead we add it when brokerConnected is called
+
+      self.mintBrokersByQmfBroker[qbroker] = mbroker
+      self.mintBrokersByUrl[url] = mbroker
+
+      return mbroker
+    finally:
+      self.unlock()
+
+  def delBroker(self, mbroker):
+    assert isinstance(mbroker, MintBroker)
+
+    self.lock()
+    try:
+      self.mgmtSession.delBroker(mbroker.qmfBroker)
+
+      del self.mintBrokersByQmfBroker[mbroker.qmfBroker]
+      del self.mintBrokersById[mbroker.qmfId]
+      del self.mintBrokersByUrl[mbroker.url]
+    finally:
+      self.unlock()
+
+  def brokerConnected(self, qbroker):
     """ Invoked when a connection is established to a broker """
     self.lock()
     try:
-      mbroker = MintBroker(broker)
-      self.mintBrokers[mbroker.qmfId] = mbroker
+      mbroker = self.mintBrokersByQmfBroker[qbroker]
+
+      assert mbroker.connected is False
+
+      mbroker.connected = True
     finally:
       self.unlock()
 
-  def brokerDisconnected(self, broker):
+  def brokerInfo(self, qbroker):
+    self.lock()
+    try:
+      id = str(qbroker.getBrokerId())
+      mbroker = self.mintBrokersByQmfBroker[qbroker]
+
+      mbroker.qmfId = id
+      self.mintBrokersById[id] = mbroker
+    finally:
+      self.unlock()
+
+  def brokerDisconnected(self, qbroker):
     """ Invoked when the connection to a broker is lost """
     self.lock()
     try:
-      del self.mintBrokers[str(broker.getBrokerId())]
+      mbroker = self.mintBrokersByQmfBroker[qbroker]
+
+      assert mbroker.connected is True
+
+      mbroker.connected = False
     finally:
       self.unlock()
 
+  def getMintBrokerByQmfBroker(self, qbroker):
+    self.lock()
+    try:
+      return self.mintBrokersByQmfBroker[qbroker]
+    finally:
+      self.unlock()
+
   def newPackage(self, name):
     """ Invoked when a QMF package is discovered. """
     pass
@@ -417,7 +474,8 @@
   def objectProps(self, broker, record):
     """ Invoked when an object is updated. """
 
-    mbroker = self.mintBrokers[str(broker.getBrokerId())]
+    mbroker = self.getMintBrokerByQmfBroker(broker)
+
     up = update.PropertyUpdate(self, mbroker, record)
 
     if record.getClassKey().getClassName() == "job":
@@ -428,7 +486,7 @@
   def objectStats(self, broker, record):
     """ Invoked when an object is updated. """
 
-    mbroker = self.mintBrokers[str(broker.getBrokerId())]
+    mbroker = self.getMintBrokerByQmfBroker(broker)
     up = update.StatisticUpdate(self, mbroker, record)
 
     if record.getClassKey().getClassName() == "job":
@@ -443,17 +501,8 @@
   def heartbeat(self, agent, timestamp):
     pass
 
-  def brokerInfo(self, broker):
-    # XXX why do we do this?
-    self.lock()
-    try:
-      mbroker = MintBroker(broker)
-      self.mintBrokers[mbroker.qmfId] = mbroker
-    finally:
-      self.unlock()
-
   def methodResponse(self, broker, seq, response):
-    mbroker = self.mintBrokers[str(broker.getBrokerId())]
+    mbroker = self.getMintBrokerByQmfBroker(broker)
     up = update.MethodUpdate(self, mbroker, seq, response)
 
     self.updateThread.enqueue(up)

Modified: mgmt/trunk/mint/python/mint/tools.py
===================================================================
--- mgmt/trunk/mint/python/mint/tools.py	2008-12-03 16:18:20 UTC (rev 2914)
+++ mgmt/trunk/mint/python/mint/tools.py	2008-12-03 16:29:12 UTC (rev 2915)
@@ -89,7 +89,7 @@
         
         try:
             for arg in args[1:]:
-                model.getSession().addBroker(arg)
+                model.addBroker(arg)
 
             enq_last = 0
             deq_last = 0

Modified: mgmt/trunk/mint/python/mint/update.py
===================================================================
--- mgmt/trunk/mint/python/mint/update.py	2008-12-03 16:18:20 UTC (rev 2914)
+++ mgmt/trunk/mint/python/mint/update.py	2008-12-03 16:29:12 UTC (rev 2915)
@@ -69,19 +69,19 @@
 
             conn.commit()
 
-            for broker in self.model.mintBrokers.values():
+            for broker in self.model.mintBrokersByQmfBroker.values():
               broker.objectDatabaseIds.commit()
 
             profile.commitTime += clock() - start
           else:
             conn.commit()
 
-            for broker in self.model.mintBrokers.values():
+            for broker in self.model.mintBrokersByQmfBroker.values():
               broker.objectDatabaseIds.commit()
       except:
         conn.rollback()
 
-        for broker in self.model.mintBrokers.values():
+        for broker in self.model.mintBrokersByQmfBroker.values():
           broker.objectDatabaseIds.rollback()
 
         log.exception("Update failed")
@@ -203,7 +203,7 @@
     attrs["qmfScopeId"] = oid.first
     attrs["qmfObjectId"] = oid.second
     attrs["qmfClassKey"] = str(self.object.getClassKey())
-    attrs["qmfBrokerId"] = self.broker.qmfId
+    attrs["qmfBrokerId"] = str(self.broker.qmfBroker.getBrokerId())
 
     cursor = conn.cursor()
 
@@ -272,26 +272,24 @@
       pass
 
   def processBroker(self, cursor, id):
-    try:
-      broker = self.model.mintBrokers[self.broker.qmfId]
-    except KeyError:
-      # XXX what does this mean?
-      return
-
-    if broker.databaseId is None:
+    if self.broker.databaseId is None:
       op = SqlGetBrokerRegistration()
-      op.execute(cursor, {"url": self.broker.getFullUrl()})
+      op.execute(cursor, {"url": self.broker.url})
 
       rec = cursor.fetchone()
 
+      #print op.text, {"url": self.broker.url}
+
       if rec:
         rid = rec[0]
 
         op = SqlAttachBroker()
         op.execute(cursor, {"id": id, "registrationId": rid})
 
-      broker.databaseId = id
+        #print op.text, {"id": id, "registrationId": rid}
 
+      self.broker.databaseId = id
+
 class StatisticUpdate(ModelUpdate):
   def process(self, conn):
     try:




More information about the rhmessaging-commits mailing list