[rhmessaging-commits] rhmessaging commits: r1701 - in mgmt: mint/python/mint and 1 other directory.
rhmessaging-commits at lists.jboss.org
rhmessaging-commits at lists.jboss.org
Mon Feb 18 15:37:30 EST 2008
Author: justi9
Date: 2008-02-18 15:37:30 -0500 (Mon, 18 Feb 2008)
New Revision: 1701
Modified:
mgmt/cumin/python/cumin/__init__.py
mgmt/mint/python/mint/__init__.py
Log:
Introduces a BrokerConnection object for keeping track of the state of
connections. This replaces model.connectToBroker. Adds a connect
method directly on BrokerRegistration that connects using
BrokerConnection.
Also some minor cleanups.
Modified: mgmt/cumin/python/cumin/__init__.py
===================================================================
--- mgmt/cumin/python/cumin/__init__.py 2008-02-18 20:35:12 UTC (rev 1700)
+++ mgmt/cumin/python/cumin/__init__.py 2008-02-18 20:37:30 UTC (rev 1701)
@@ -9,6 +9,7 @@
from mint import *
from time import sleep
from threading import Thread, Event
+from traceback import print_exc
from model import CuminModel, ModelPage
from demo import DemoData
@@ -75,9 +76,8 @@
def run(self):
try:
self.do_run()
- except Exception, e:
- # XXX add print_exc
- print e
+ except:
+ print_exc()
def do_run(self):
while True:
@@ -90,22 +90,10 @@
self.attempts[reg] = attempts
if attempts < 10:
- self.connect(reg)
+ reg.connect(self.model.data)
elif attempts < 100 and attempts % 10 == 0:
- self.connect(reg)
+ reg.connect(self.model.data)
elif attempts % 100 == 0:
- self.connect(reg)
+ reg.connect(self.model.data)
self.event.wait(10)
-
- def connect(self, reg):
- print "Trying to connect to broker '%s' at %s:%i" % \
- (reg.name, reg.host, reg.port or 5672)
-
- try:
- self.model.data.connectToBroker \
- (reg.host, reg.port or 5672)
-
- print "Connection succeeded"
- except socket.error:
- print "Connection failed"
Modified: mgmt/mint/python/mint/__init__.py
===================================================================
--- mgmt/mint/python/mint/__init__.py 2008-02-18 20:35:12 UTC (rev 1700)
+++ mgmt/mint/python/mint/__init__.py 2008-02-18 20:37:30 UTC (rev 1701)
@@ -3,6 +3,7 @@
from datetime import *
from sqlobject import *
from threading import Lock
+from traceback import print_exc
from mint import schema
@@ -31,6 +32,19 @@
cluster = ForeignKey("BrokerCluster", cascade="null", default=None)
profile = ForeignKey("BrokerProfile", cascade="null", default=None)
+ def connect(self, model):
+ print "Connecting to broker '%s' at %s:%i" % \
+ (self.name, self.host, self.port or 5672)
+
+ conn = BrokerConnection(model, self.host, self.port or 5672)
+
+ try:
+ conn.open()
+ print "Connection succeeded"
+ except:
+ print "Connection failed: " + str(conn.exception)
+ print_exc()
+
class BrokerGroup(SQLObject):
name = StringCol(length=1000, default=None)
brokers = SQLRelatedJoin("BrokerRegistration",
@@ -105,8 +119,63 @@
def getByIndexAttrib(self, objType, indexAttrib, indexValue, create=False, args={}):
###FIX
return None
-
+class BrokerConnection(object):
+ def __init__(self, model, host, port):
+ self.model = model
+ self.key = "%s:%i" % (host, port)
+ self.broker = ManagedBroker(host=host, port=port)
+ self.state = None # in (None, "opening", "opened", "closing", "closed")
+ self.exception = None
+
+ self.broker.configListener(self.key, self.model.configCallback)
+ self.broker.instrumentationListener(self.key, self.model.instCallback)
+ self.broker.methodListener(self.key, self.model.methodCallback)
+
+ def isOpen(self):
+ return self.state == "opened"
+
+ def open(self):
+ self.state = "opening"
+
+ try:
+ self.model.lock.acquire()
+ try:
+ # XXX I want this to happen after broker start, but the
+ # callbacks rely on the broker being in the connectedBrokers
+ # dict
+ self.model.connectedBrokers[self.key] = ConnectedBroker(self.broker)
+
+ self.broker.start()
+
+ #self.model.connections[self.key] = self
+
+ self.state = "opened"
+ except Exception, e:
+ self.exception = e
+ raise e
+ finally:
+ self.model.lock.release()
+
+ def close(self):
+ self.state = "closing"
+
+ try:
+ self.model.lock.acquire()
+ try:
+ if not self.broker.isConnected():
+ raise Exception("Broker not connected")
+
+ self.broker.stop()
+ #del self.model.connections[self.key]
+ del self.model.connectedBrokers[self.key]
+ self.state = "closed"
+ except Exception, e:
+ self.exception = e
+ raise e
+ finally:
+ self.model.lock.release()
+
class ConnectedBroker:
def __init__(self, managedBroker):
self.managedBroker = managedBroker
@@ -118,10 +187,9 @@
def getByIndexAttrib(self, objType, indexAttrib, indexValue, parent, create=False, args={}):
return self.objs.getByIndexAttrib(objType, indexAttrib, indexValue, create, args)
-
class MintModel:
- def __init__(self, data_url, debug=False):
- self.data_url = data_url
+ def __init__(self, dataUri, debug=False):
+ self.dataUri = dataUri
self.currentMethodId = 1
self.outstandingMethodCalls = dict()
self.connectedBrokers = dict()
@@ -130,7 +198,7 @@
def check(self):
try:
- connectionForURI(self.data_url)
+ connectionForURI(self.dataUri)
except Exception, e:
if hasattr(e, "message") and e.message.find("does not exist"):
print "Database not found; run cumin-database-init"
@@ -138,7 +206,7 @@
raise e
def init(self):
- conn = connectionForURI(self.data_url)
+ conn = connectionForURI(self.dataUri)
sqlhub.processConnection = conn
def setDebug(self, debug=True):
@@ -165,7 +233,7 @@
if (key.endswith("Ref")):
keys.append(key)
return keys
-
+
def configCallback(self, broker, objectName, list, timestamps):
self.log("\nCONFIG---------------------------------------------------")
self.log(objectName)
@@ -234,29 +302,6 @@
self.log("END METHOD---------------------------------------------------\n")
return result
- def connectToBroker(self, host, port):
- self.lock.acquire()
- try:
- broker = ManagedBroker(host=host, port=port)
- label = "%s:%d" % (host, port)
- self.connectedBrokers[label] = ConnectedBroker(broker)
- broker.configListener(label, self.configCallback)
- broker.instrumentationListener(label, self.instCallback)
- broker.methodListener(label, self.methodCallback)
- try:
- broker.start()
- except socket.error:
- # XXX this is not ideal. I'd prefer to avoid the dict
- # assignment above until we can verify we can connect. That
- # way we could avoid the locking as well.
-
- del self.connectedBrokers[label]
- raise
- finally:
- self.lock.release()
-
- return label
-
def registerCallback(self, callback):
self.currentMethodId += 1
methodId = self.currentMethodId
@@ -331,4 +376,4 @@
conn.commit()
finally:
- conn.close()
+ conn.close()
More information about the rhmessaging-commits
mailing list