[rhmessaging-commits] rhmessaging commits: r1458 - in mgmt: mint/python/mint and 1 other directories.

rhmessaging-commits at lists.jboss.org rhmessaging-commits at lists.jboss.org
Tue Dec 11 17:21:52 EST 2007


Author: justi9
Date: 2007-12-11 17:21:52 -0500 (Tue, 11 Dec 2007)
New Revision: 1458

Modified:
   mgmt/cumin/python/cumin/__init__.py
   mgmt/mint/python/mint/__init__.py
   mgmt/notes/justin-todo.txt
Log:
Adds a broker connection thread to (a) avoid delaying mgmt daemon
startup and (b) periodically attempt to reconnect to brokers that may
have gone down.



Modified: mgmt/cumin/python/cumin/__init__.py
===================================================================
--- mgmt/cumin/python/cumin/__init__.py	2007-12-11 21:45:52 UTC (rev 1457)
+++ mgmt/cumin/python/cumin/__init__.py	2007-12-11 22:21:52 UTC (rev 1458)
@@ -8,6 +8,8 @@
 from wooly.parameters import IntegerParameter
 from mint import *
 from sqlobject.main import *
+from time import sleep
+from threading import Thread
 
 from model import CuminModel
 from demo import DemoData
@@ -29,18 +31,8 @@
 
         self.model = CuminModel()
 
-        for reg in BrokerRegistration.select():
-            try:
-                self.model.data.connectToBroker(reg.host, reg.port or 5672)
-            except socket.error:
-                print "Failed connecting to broker '%s' at %s:%i" % \
-                    (reg.name, reg.host, reg.port or 5672)
+        BrokerConnectThread(self.model).start()
 
-            if reg.broker is None:
-                pass
-                # wait for configCallback to create Broker in db, then
-                # connect it to the reg
-
         self.cumin_page = CuminPage(self, "cumin.html")
         self.set_default_page(self.cumin_page)
 
@@ -61,3 +53,27 @@
         app = Cumin(model)
 
         super(CuminServer, self).__init__(app, port)
+
+class BrokerConnectThread(Thread):
+    def __init__(self, model):
+        super(BrokerConnectThread, self).__init__()
+
+        self.model = model
+        self.setDaemon(True)
+
+    def run(self):
+        while True:
+            for reg in BrokerRegistration.select():
+                print "reg", reg
+
+                if reg.broker is None or reg.broker.managedBroker not in \
+                        self.model.data.connectedBrokers:
+
+                    try:
+                        self.model.data.connectToBroker \
+                            (reg.host, reg.port or 5672)
+                    except socket.error:
+                        print "Failed connecting to broker '%s' at %s:%i" % \
+                            (reg.name, reg.host, reg.port or 5672)
+
+                sleep(15)

Modified: mgmt/mint/python/mint/__init__.py
===================================================================
--- mgmt/mint/python/mint/__init__.py	2007-12-11 21:45:52 UTC (rev 1457)
+++ mgmt/mint/python/mint/__init__.py	2007-12-11 22:21:52 UTC (rev 1458)
@@ -1,6 +1,8 @@
+import socket
 from qpid.management import ManagedBroker
 from datetime import *
 from sqlobject import *
+from threading import Lock
 
 from mint import schema
 
@@ -116,6 +118,7 @@
     self.outstandingMethodCalls = dict()
     self.connectedBrokers = dict()
     self.debug = debug
+    self.lock = Lock()
 
   def setDebug(self, debug=True):
     self.debug = debug
@@ -193,13 +196,26 @@
     return result
   
   def connectToBroker(self, host, port):
-    broker = ManagedBroker(host=host, port=port)
-    label = "%s:%d" % (host, port)
-    self.connectedBrokers[label] = ConnectedBroker(broker)
-    broker.configListener(label, self.configCallback)
-    broker.instrumentationListener(label, self.instCallback)
-    broker.methodListener(label, self.methodCallback)
-    broker.start()
+    self.lock.acquire()
+    try:
+      broker = ManagedBroker(host=host, port=port)
+      label = "%s:%d" % (host, port)
+      self.connectedBrokers[label] = ConnectedBroker(broker)
+      broker.configListener(label, self.configCallback)
+      broker.instrumentationListener(label, self.instCallback)
+      broker.methodListener(label, self.methodCallback)
+      try:
+        broker.start()
+      except socket.error:
+        # XXX this is not ideal.  I'd prefer to avoid the dict
+        # assignment above until we can verify we can connect.  That
+        # way we could avoid the locking as well.
+
+        del self.connectedBrokers[label] 
+        raise
+    finally:
+      self.lock.release()
+
     return label
 
   def getConnectedBroker(self, label):

Modified: mgmt/notes/justin-todo.txt
===================================================================
--- mgmt/notes/justin-todo.txt	2007-12-11 21:45:52 UTC (rev 1457)
+++ mgmt/notes/justin-todo.txt	2007-12-11 22:21:52 UTC (rev 1458)
@@ -17,17 +17,12 @@
 
  * Add said cache to ChartPages as well, perhaps
 
- * Put broker connects on their own, separate thread so they do not
-   delay startup
-
  * Render stats without values as something other than 0, say a --
 
  * Render the "" exchange as "Default"
 
  * Add javascript for the check-all behavior
 
- * Find some way to reattach brokers that come back after being down
-
  * Email amqp-list, Jonathan, and Lana with doc requirements for mgmt
 
  * Paginate queue bindings in exchange view




More information about the rhmessaging-commits mailing list