[rhmessaging-commits] rhmessaging commits: r1701 - in mgmt: mint/python/mint and 1 other directory.

rhmessaging-commits at lists.jboss.org rhmessaging-commits at lists.jboss.org
Mon Feb 18 15:37:30 EST 2008


Author: justi9
Date: 2008-02-18 15:37:30 -0500 (Mon, 18 Feb 2008)
New Revision: 1701

Modified:
   mgmt/cumin/python/cumin/__init__.py
   mgmt/mint/python/mint/__init__.py
Log:
Introduces a BrokerConnection object for keeping track of the state of
broker connections.  This replaces MintModel.connectToBroker.  Also adds
a connect method directly on BrokerRegistration, which connects using
BrokerConnection.

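Also includes some minor cleanups: exceptions are now reported with
print_exc, MintModel's data_url attribute is renamed to dataUri, and
stray whitespace and indentation are fixed.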



Modified: mgmt/cumin/python/cumin/__init__.py
===================================================================
--- mgmt/cumin/python/cumin/__init__.py	2008-02-18 20:35:12 UTC (rev 1700)
+++ mgmt/cumin/python/cumin/__init__.py	2008-02-18 20:37:30 UTC (rev 1701)
@@ -9,6 +9,7 @@
 from mint import *
 from time import sleep
 from threading import Thread, Event
+from traceback import print_exc
 
 from model import CuminModel, ModelPage
 from demo import DemoData
@@ -75,9 +76,8 @@
     def run(self):
         try:
             self.do_run()
-        except Exception, e:
-            # XXX add print_exc
-            print e
+        except:
+            print_exc()
 
     def do_run(self):
         while True:
@@ -90,22 +90,10 @@
                     self.attempts[reg] = attempts
                     
                     if attempts < 10:
-                        self.connect(reg)
+                        reg.connect(self.model.data)
                     elif attempts < 100 and attempts % 10 == 0:
-                        self.connect(reg)
+                        reg.connect(self.model.data)
                     elif attempts % 100 == 0:
-                        self.connect(reg)
+                        reg.connect(self.model.data)
 
             self.event.wait(10)
-    
-    def connect(self, reg):
-        print "Trying to connect to broker '%s' at %s:%i" % \
-            (reg.name, reg.host, reg.port or 5672)
-
-        try:
-            self.model.data.connectToBroker \
-                (reg.host, reg.port or 5672)
-
-            print "Connection succeeded"
-        except socket.error:
-            print "Connection failed"

Modified: mgmt/mint/python/mint/__init__.py
===================================================================
--- mgmt/mint/python/mint/__init__.py	2008-02-18 20:35:12 UTC (rev 1700)
+++ mgmt/mint/python/mint/__init__.py	2008-02-18 20:37:30 UTC (rev 1701)
@@ -3,6 +3,7 @@
 from datetime import *
 from sqlobject import *
 from threading import Lock
+from traceback import print_exc
 
 from mint import schema
 
@@ -31,6 +32,19 @@
   cluster = ForeignKey("BrokerCluster", cascade="null", default=None)
   profile = ForeignKey("BrokerProfile", cascade="null", default=None)
 
+  def connect(self, model):
+    print "Connecting to broker '%s' at %s:%i" % \
+        (self.name, self.host, self.port or 5672)
+
+    conn = BrokerConnection(model, self.host, self.port or 5672)
+
+    try:
+      conn.open()
+      print "Connection succeeded"
+    except:
+      print "Connection failed: " + str(conn.exception)
+      print_exc()
+
 class BrokerGroup(SQLObject):
   name = StringCol(length=1000, default=None)
   brokers = SQLRelatedJoin("BrokerRegistration",
@@ -105,8 +119,63 @@
   def getByIndexAttrib(self, objType, indexAttrib, indexValue, create=False, args={}):
     ###FIX
     return None
-  
 
+class BrokerConnection(object):
+  def __init__(self, model, host, port):
+    self.model = model
+    self.key = "%s:%i" % (host, port)
+    self.broker = ManagedBroker(host=host, port=port)
+    self.state = None # in (None, "opening", "opened", "closing", "closed")
+    self.exception = None
+
+    self.broker.configListener(self.key, self.model.configCallback)
+    self.broker.instrumentationListener(self.key, self.model.instCallback)
+    self.broker.methodListener(self.key, self.model.methodCallback)
+
+  def isOpen(self):
+    return self.state == "opened"
+
+  def open(self):
+    self.state = "opening"
+
+    try:
+      self.model.lock.acquire()
+      try:
+        # XXX I want this to happen after broker start, but the
+        # callbacks rely on the broker being in the connectedBrokers
+        # dict
+        self.model.connectedBrokers[self.key] = ConnectedBroker(self.broker)
+
+        self.broker.start()
+
+        #self.model.connections[self.key] = self
+
+        self.state = "opened"
+      except Exception, e:
+        self.exception = e
+        raise e
+    finally:
+      self.model.lock.release()
+
+  def close(self):
+    self.state = "closing"
+
+    try:
+      self.model.lock.acquire()
+      try:
+        if not self.broker.isConnected():
+          raise Exception("Broker not connected")
+
+        self.broker.stop()
+        #del self.model.connections[self.key]
+        del self.model.connectedBrokers[self.key]
+        self.state = "closed"
+      except Exception, e:
+        self.exception = e
+        raise e
+    finally:
+      self.model.lock.release()
+
 class ConnectedBroker:
   def __init__(self, managedBroker):
     self.managedBroker = managedBroker
@@ -118,10 +187,9 @@
   def getByIndexAttrib(self, objType, indexAttrib, indexValue, parent, create=False, args={}):
     return self.objs.getByIndexAttrib(objType, indexAttrib, indexValue, create, args)
 
-
 class MintModel:
-  def __init__(self, data_url, debug=False):
-    self.data_url = data_url
+  def __init__(self, dataUri, debug=False):
+    self.dataUri = dataUri
     self.currentMethodId = 1
     self.outstandingMethodCalls = dict()
     self.connectedBrokers = dict()
@@ -130,7 +198,7 @@
 
   def check(self):
     try:
-      connectionForURI(self.data_url)
+      connectionForURI(self.dataUri)
     except Exception, e:
       if hasattr(e, "message") and e.message.find("does not exist"):
         print "Database not found; run cumin-database-init"
@@ -138,7 +206,7 @@
       raise e
 
   def init(self):
-    conn = connectionForURI(self.data_url)
+    conn = connectionForURI(self.dataUri)
     sqlhub.processConnection = conn
 
   def setDebug(self, debug=True):
@@ -165,7 +233,7 @@
       if (key.endswith("Ref")):
         keys.append(key)
     return keys
-    
+
   def configCallback(self, broker, objectName, list, timestamps):
     self.log("\nCONFIG---------------------------------------------------")
     self.log(objectName)
@@ -234,29 +302,6 @@
     self.log("END METHOD---------------------------------------------------\n")
     return result
   
-  def connectToBroker(self, host, port):
-    self.lock.acquire()
-    try:
-      broker = ManagedBroker(host=host, port=port)
-      label = "%s:%d" % (host, port)
-      self.connectedBrokers[label] = ConnectedBroker(broker)
-      broker.configListener(label, self.configCallback)
-      broker.instrumentationListener(label, self.instCallback)
-      broker.methodListener(label, self.methodCallback)
-      try:
-        broker.start()
-      except socket.error:
-        # XXX this is not ideal.  I'd prefer to avoid the dict
-        # assignment above until we can verify we can connect.  That
-        # way we could avoid the locking as well.
-
-        del self.connectedBrokers[label] 
-        raise
-    finally:
-      self.lock.release()
-
-    return label
-
   def registerCallback(self, callback):
     self.currentMethodId += 1
     methodId = self.currentMethodId
@@ -331,4 +376,4 @@
 
       conn.commit()
     finally:
-        conn.close()
+      conn.close()




More information about the rhmessaging-commits mailing list