Author: justi9
Date: 2008-12-03 16:50:53 -0500 (Wed, 03 Dec 2008)
New Revision: 2918
Added:
mgmt/trunk/mint/python/mint/util.py
Modified:
mgmt/trunk/cumin/python/cumin/__init__.py
mgmt/trunk/cumin/python/cumin/model.py
mgmt/trunk/mint/python/mint/__init__.py
Log:
Use a poller thread to keep registrations and qmf brokers in sync,
connectionwise.
Modified: mgmt/trunk/cumin/python/cumin/__init__.py
===================================================================
--- mgmt/trunk/cumin/python/cumin/__init__.py 2008-12-03 17:24:54 UTC (rev 2917)
+++ mgmt/trunk/cumin/python/cumin/__init__.py 2008-12-03 21:50:53 UTC (rev 2918)
@@ -85,9 +85,6 @@
def start(self):
self.model.start()
- for reg in BrokerRegistration.select():
- reg.connect(self.model.data)
-
def stop(self):
self.model.stop()
Modified: mgmt/trunk/cumin/python/cumin/model.py
===================================================================
--- mgmt/trunk/cumin/python/cumin/model.py 2008-12-03 17:24:54 UTC (rev 2917)
+++ mgmt/trunk/cumin/python/cumin/model.py 2008-12-03 21:50:53 UTC (rev 2918)
@@ -1723,8 +1723,6 @@
try:
object = self.cumin_class.mint_class(**args)
- object.connect(self.model.data)
-
completion("OK")
return object
Modified: mgmt/trunk/mint/python/mint/__init__.py
===================================================================
--- mgmt/trunk/mint/python/mint/__init__.py 2008-12-03 17:24:54 UTC (rev 2917)
+++ mgmt/trunk/mint/python/mint/__init__.py 2008-12-03 21:50:53 UTC (rev 2918)
@@ -14,6 +14,8 @@
from mint.cache import MintCache
from mint.dbexpire import DBExpireThread
from qmf.console import ClassKey
+from util import *
+from time import sleep
log = logging.getLogger("mint")
@@ -216,7 +218,6 @@
name = StringCol(length=1000, default=None, unique=True, notNone=True)
url = StringCol(length=1000, default=None)
- mintBroker = None
broker = ForeignKey("Broker", cascade="null", default=None)
groups = SQLRelatedJoin("BrokerGroup",
intermediateTable="broker_group_mapping",
@@ -226,24 +227,6 @@
url_unique = DatabaseIndex(url, unique=True)
- def connect(self, model):
- log.info("Connecting to broker '%s' at %s" % (self.name,
self.url))
- try:
- self.mintBroker = model.addBroker(self.url)
- log.info("Connection succeeded")
- except Exception, e:
- log.info("Connection failed: %s ", e.message)
- print_exc()
-
- def disconnect(self, model):
- log.info("Disconnecting from broker '%s' at %s" % (self.name,
self.url))
- try:
- model.delBroker(self.mintBroker)
- log.info("Disconnection succeeded")
- except Exception, e:
- log.info("Disconnection failed: %s ", e.message)
- print_exc()
-
def getBrokerId(self):
return self.mintBroker.qmfId
@@ -254,12 +237,6 @@
except IndexError:
return None
- def destroySelf(self):
- if MintModel.staticInstance:
- self.disconnect(MintModel.staticInstance)
-
- super(BrokerRegistration, self).destroySelf()
-
class BrokerGroup(SQLObject):
class sqlmeta:
lazyUpdate = True
@@ -318,6 +295,8 @@
def __init__(self, dataUri, dbExpireFrequency, dbExpireThreshold, debug=False):
self.dataUri = dataUri
self.debug = debug
+ self.updateObjects = True
+ self.pollRegistrations = False # XXX
assert MintModel.staticInstance is None
MintModel.staticInstance = self
@@ -330,11 +309,15 @@
self.mintBrokersByUrl = dict()
self.__lock = RLock()
+
self.dbConn = None
+ self.mgmtSession = qmf.console.Session(self, manageConnections=True)
- self.dbExpireThread = dbexpire.DBExpireThread(self, frequency=dbExpireFrequency,
threshold=dbExpireThreshold)
self.updateThread = update.ModelUpdateThread(self)
- self.mgmtSession = qmf.console.Session(self, manageConnections=True)
+ self.dbExpireThread = dbexpire.DBExpireThread \
+ (self, frequency=dbExpireFrequency, threshold=dbExpireThreshold)
+ self.registrationThread = RegistrationThread(self)
+
self.outstandingMethodCalls = dict()
if self.debug:
@@ -360,10 +343,12 @@
def start(self):
self.updateThread.start()
self.dbExpireThread.start()
+ self.registrationThread.start()
def stop(self):
self.updateThread.stop()
self.dbExpireThread.stop()
+ self.registrationThread.stop()
def callMethod(self, brokerId, objId, classKey, methodName, callback, args):
self.lock()
@@ -384,6 +369,8 @@
return seq
def addBroker(self, url):
+ log.info("Adding qmf broker at %s", url)
+
self.lock()
try:
qbroker = self.mgmtSession.addBroker(url)
@@ -402,6 +389,8 @@
def delBroker(self, mbroker):
assert isinstance(mbroker, MintBroker)
+ log.info("Removing qmf broker at %s", mbroker.url)
+
self.lock()
try:
self.mgmtSession.delBroker(mbroker.qmfBroker)
@@ -506,3 +495,35 @@
up = update.MethodUpdate(self, mbroker, seq, response)
self.updateThread.enqueue(up)
+
+class RegistrationThread(MintDaemonThread):
+ def run(self):
+ log.info("Polling for registration changes every 5 seconds")
+
+ while True:
+ try:
+ if self.stopRequested:
+ break
+
+ self.do_run()
+ except Exception, e:
+ log.exception(e)
+
+ def do_run(self):
+ log.info("Polling")
+
+ regUrls = set()
+
+ for reg in BrokerRegistration.select():
+ regUrls.add(reg.url)
+
+ if reg.url not in self.model.mintBrokersByUrl:
+ self.model.addBroker(broker.url)
+
+ for reg in regs:
+
+ for mbroker in self.model.mintBrokersByQmfObject.values():
+ if mbroker.url not in regUrls:
+ self.model.delBroker(mbroker)
+
+ sleep(5)
Added: mgmt/trunk/mint/python/mint/util.py
===================================================================
--- mgmt/trunk/mint/python/mint/util.py (rev 0)
+++ mgmt/trunk/mint/python/mint/util.py 2008-12-03 21:50:53 UTC (rev 2918)
@@ -0,0 +1,19 @@
+import sys, os, logging
+
+from threading import Thread
+
+log = logging.getLogger("mint")
+
+class MintDaemonThread(Thread):
+ def __init__(self, model):
+ super(MintDaemonThread, self).__init__()
+
+ self.model = model
+
+ self.stopRequested = False
+
+ self.setDaemon(True)
+
+ def stop(self):
+ assert self.stopRequested is False
+ self.stopRequested = True