Author: justi9
Date: 2009-04-02 15:49:59 -0400 (Thu, 02 Apr 2009)
New Revision: 3253
Added:
mgmt/trunk/mint/python/mint/database.py
mgmt/trunk/mint/python/mint/expire.py
mgmt/trunk/mint/python/mint/model.py
mgmt/trunk/mint/python/mint/poll.py
Removed:
mgmt/trunk/mint/python/mint/dbexpire.py
Modified:
mgmt/trunk/cumin/python/cumin/broker.py
mgmt/trunk/cumin/python/cumin/limits.py
mgmt/trunk/cumin/python/cumin/managementserver.py
mgmt/trunk/cumin/python/cumin/model.py
mgmt/trunk/cumin/python/cumin/tools.py
mgmt/trunk/mint/instance/etc/mint.conf
mgmt/trunk/mint/python/mint/__init__.py
mgmt/trunk/mint/python/mint/main.py
mgmt/trunk/mint/python/mint/tools.py
mgmt/trunk/mint/python/mint/update.py
mgmt/trunk/mint/python/mint/util.py
Log:
Don't enable debug by default for the devel mint instance
Modified: mgmt/trunk/cumin/python/cumin/broker.py
===================================================================
--- mgmt/trunk/cumin/python/cumin/broker.py 2009-04-02 19:42:25 UTC (rev 3252)
+++ mgmt/trunk/cumin/python/cumin/broker.py 2009-04-02 19:49:59 UTC (rev 3253)
@@ -79,7 +79,7 @@
def render_content(self, session, data):
scopeId = data["qmf_scope_id"]
- dt = self.app.model.data.getLatestHeartbeat(scopeId)
+ dt = self.app.model.mint.model.getLatestHeartbeat(scopeId)
if dt is None:
return fmt_none()
@@ -202,7 +202,7 @@
try:
id = broker.qmfBrokerId
- mbroker = self.app.model.data.mintBrokersById[id]
+ mbroker = self.app.model.mint.model.mintBrokersById[id]
connected = mbroker.connected
except KeyError:
pass
Modified: mgmt/trunk/cumin/python/cumin/limits.py
===================================================================
--- mgmt/trunk/cumin/python/cumin/limits.py 2009-04-02 19:42:25 UTC (rev 3252)
+++ mgmt/trunk/cumin/python/cumin/limits.py 2009-04-02 19:49:59 UTC (rev 3253)
@@ -52,7 +52,7 @@
#TODO: this probably shouldn't be called until after the invoke completes
def completion():
pass
- negotiator.Reconfig(self.app.model.data, completion)
+ negotiator.Reconfig(self.app.model.mint.model, completion)
def get_raw_limits(self, session, negotiator):
action = self.app.model.negotiator.GetLimits
Modified: mgmt/trunk/cumin/python/cumin/managementserver.py
===================================================================
--- mgmt/trunk/cumin/python/cumin/managementserver.py 2009-04-02 19:42:25 UTC (rev 3252)
+++ mgmt/trunk/cumin/python/cumin/managementserver.py 2009-04-02 19:49:59 UTC (rev 3253)
@@ -41,7 +41,7 @@
url = data["url"]
try:
- server = self.app.model.data.mintBrokersByUrl[url]
+ server = self.app.model.mint.model.mintBrokersByUrl[url]
if server.connected:
html = "Connected"
@@ -274,7 +274,7 @@
url = data["url"]
try:
- server = self.app.model.data.mintBrokersByUrl[url]
+ server = self.app.model.mint.model.mintBrokersByUrl[url]
if server.connected:
html = "Connected"
Modified: mgmt/trunk/cumin/python/cumin/model.py
===================================================================
--- mgmt/trunk/cumin/python/cumin/model.py 2009-04-02 19:42:25 UTC (rev 3252)
+++ mgmt/trunk/cumin/python/cumin/model.py 2009-04-02 19:49:59 UTC (rev 3253)
@@ -1,31 +1,37 @@
+import logging
from datetime import datetime, timedelta
+from mint import Mint, MintConfig
+from struct import unpack, calcsize
+from types import *
+from wooly import *
+from wooly.parameters import *
+from wooly.widgets import *
+
from formats import *
from job import *
from parameters import *
from pool import PoolSlotSet, PoolJobStats
from slot import SlotStatSet
-from struct import unpack, calcsize
from system import SystemSlotSet
from time import *
-from types import *
from util import *
-from wooly import *
-from wooly.parameters import *
-from wooly.widgets import *
-from mint.schema import *
-import logging
-
log = logging.getLogger("cumin.model")
class CuminModel(object):
def __init__(self, app, data_uri):
self.app = app
- self.data = MintModel(data_uri)
- self.data.updateObjects = False
- self.data.expireObjects = False
+ opts = {"data": data_uri, "qmf": None}
+ config = MintConfig()
+ config.init(opts)
+
+ self.mint = Mint(config)
+ self.mint.updateEnabled = False
+ self.mint.pollEnabled = True
+ self.mint.expireEnabled = False
+
self.classes = list()
self.invocations = set()
@@ -68,10 +74,10 @@
CuminSlot(self)
def check(self):
- self.data.check()
+ self.mint.check()
def init(self):
- self.data.init()
+ self.mint.init()
self.frame = self.app.main_page.main
@@ -79,10 +85,10 @@
cls.init()
def start(self):
- self.data.start()
+ self.mint.start()
def stop(self):
- self.data.stop()
+ self.mint.stop()
def add_class(self, cls):
self.classes.append(cls)
@@ -306,7 +312,7 @@
def get_session_by_object(self, object):
assert object
- broker = self.model.data.mintBrokersById[object.qmfBrokerId]
+ broker = self.mint.model.mintBrokersById[object.qmfBrokerId]
return broker.getAmqpSession()
@@ -737,7 +743,7 @@
master = Master.select("System = '%s'" %
system_name)[0]
except IndexError:
raise Exception("Master daemon not running")
- master.Start(self.model.data, completion, args["subsystem"])
+ master.Start(self.mint.model, completion, args["subsystem"])
class Stop(CuminAction):
def do_invoke(self, object, args, completion):
@@ -746,7 +752,7 @@
master = Master.select("System = '%s'" %
system_name)[0]
except IndexError:
raise Exception("Master daemon not running")
- #master.Stop(self.model.data, completion, args["subsystem"])
+ #master.Stop(self.mint.model, completion, args["subsystem"])
class CuminBroker(RemoteClass):
def __init__(self, model):
@@ -816,7 +822,8 @@
connected = False
if broker:
try:
- mbroker = self.model.data.mintBrokersById[broker.qmfBrokerId]
+ mbroker = self.mint.model.mintBrokersById \
+ [broker.qmfBrokerId]
connected = mbroker.connected
except KeyError:
pass
@@ -897,7 +904,7 @@
else:
authMechanism = "PLAIN"
- broker.connect(self.model.data, completion, host, port, durable,
+ broker.connect(self.mint.model, completion, host, port, durable,
authMechanism, username, password, transport)
class AddQueue(CuminAction):
@@ -1131,7 +1138,7 @@
return "Purge"
def do_invoke(self, queue, args, completion):
- queue.purge(self.model.data, completion, args["request"])
+ queue.purge(self.mint.model, completion, args["request"])
class Remove(CuminAction):
def show(self, session, queue):
@@ -1174,7 +1181,9 @@
def do_invoke(self, queue, args, completion):
broker = queue.vhost.broker
- broker.queueMoveMessages(self.model.data, completion,
args["srcQueue"], args["destQueue"], args["qty"])
+ broker.queueMoveMessages(self.mint.model, completion,
+ args["srcQueue"],
args["destQueue"],
+ args["qty"])
class CuminExchange(RemoteClass):
def __init__(self, model):
@@ -1351,7 +1360,7 @@
return frame.show_remove(session)
def do_invoke(self, bridge, args, completion):
- bridge.close(self.model.data, completion)
+ bridge.close(self.mint.model, completion)
class CuminConnection(RemoteClass):
def __init__(self, model):
@@ -1421,7 +1430,7 @@
def do_invoke(self, conn, args, completion):
session_ids = set()
- for broker in self.model.data.mintBrokersByQmfBroker:
+ for broker in self.mint.model.mintBrokersByQmfBroker:
session_ids.add(broker.getSessionId())
for sess in conn.sessions:
@@ -1430,7 +1439,7 @@
("Cannot close management connection %s" % \
conn.address)
- conn.close(self.model.data, completion)
+ conn.close(self.mint.model, completion)
class CuminSession(RemoteClass):
def __init__(self, model):
@@ -1472,38 +1481,38 @@
return "Close"
def do_invoke(self, sess, args, completion):
- for broker in self.model.data.mintBrokersByQmfBroker:
+ for broker in self.mint.model.mintBrokersByQmfBroker:
if sess.name == broker.getSessionId():
raise Exception \
("Cannot close management session %s" % sess.name)
- sess.close(self.model.data, completion)
+ sess.close(self.mint.model, completion)
class Detach(CuminAction):
def get_title(self, session):
return "Detach"
def do_invoke(self, sess, args, completion):
- for broker in self.model.data.mintBrokersByQmfBroker:
+ for broker in self.mint.model.mintBrokersByQmfBroker:
if sess.name == broker.getSessionId():
raise Exception \
("Cannot detach management session %s" % sess.name)
- sess.detach(self.model.data, completion)
+ sess.detach(self.mint.model, completion)
class ResetLifespan(CuminAction):
def get_title(self, session):
return "Reset Lifespan"
def do_invoke(self, object, args, completion):
- object.resetLifespan(self.model.data, completion)
+ object.resetLifespan(self.mint.model, completion)
class SolicitAck(CuminAction):
def get_title(self, session):
return "Solicit Acknowledgment"
def do_invoke(self, object, args, completion):
- object.solicitAck(self.model.data, completion)
+ object.solicitAck(self.mint.model, completion)
class CuminLink(RemoteClass):
def __init__(self, model):
@@ -1577,7 +1586,7 @@
excludes = args["excludes"]
sync = args["sync"]
- link.bridge(self.model.data, completion,
+ link.bridge(self.mint.model, completion,
durable, src, dest, key,
tag, excludes, False, False, dynamic, sync)
@@ -1590,7 +1599,7 @@
return "Close"
def do_invoke(self, link, args, completion):
- link.close(self.model.data, completion)
+ link.close(self.mint.model, completion)
class CuminBrokerStoreModule(RemoteClass):
def __init__(self, model):
@@ -2110,8 +2119,8 @@
def do_invoke(self, limit, negotiator, completion):
Name = limit.id
Max = limit.max
- negotiator.SetLimit(self.model.data, completion, Name, Max)
- #negotiator.SetLimit(self.model.data, completion, Name, str(Max))
+ negotiator.SetLimit(self.mint.model, completion, Name, Max)
+ #negotiator.SetLimit(self.mint.model, completion, Name, str(Max))
class CuminJobGroup(CuminClass):
def __init__(self, model):
@@ -2418,7 +2427,7 @@
return self.got_data
try:
- job.GetAd(self.model.data, completion, None)
+ job.GetAd(self.mint.model, completion, None)
except Exception, e:
return self.job_ads
@@ -2452,7 +2461,7 @@
try:
data = None
- job.Fetch(self.model.data, completion, file, start, end, data)
+ job.Fetch(self.mint.model, completion, file, start, end, data)
# wait for up to 20 seconds for completion to be called
wait(predicate, timeout=20)
if not self.got_data:
@@ -2475,7 +2484,7 @@
return "Hold"
def do_invoke(self, job, reason, completion):
- job.Hold(self.model.data, completion, reason)
+ job.Hold(self.mint.model, completion, reason)
def get_enabled(self, session, job):
is_held = JobStatusInfo.get_status_int("Held") == job.JobStatus
@@ -2492,7 +2501,7 @@
return "Release"
def do_invoke(self, job, reason, completion):
- job.Release(self.model.data, completion, reason)
+ job.Release(self.mint.model, completion, reason)
def get_enabled(self, session, job):
is_held = JobStatusInfo.get_status_int("Held") == job.JobStatus
@@ -2509,7 +2518,7 @@
return "Remove"
def do_invoke(self, job, reason, completion):
- job.Remove(self.model.data, completion, reason)
+ job.Remove(self.mint.model, completion, reason)
def get_enabled(self, session, job):
is_deleted = job.qmfDeleteTime is not None
@@ -2525,7 +2534,7 @@
def do_invoke(self, job, args, completion):
Name = args[0]
Value = args[1]
- job.SetAttribute(self.model.data, completion, Name, str(Value))
+ job.SetAttribute(self.mint.model, completion, Name, str(Value))
class GetStartedAction(CuminAction):
def get_xml_response(self, session, object, *args):
@@ -2808,7 +2817,7 @@
return self.got_data
try:
- negotiator.GetLimits(self.model.data, completion, None)
+ negotiator.GetLimits(self.mint.model, completion, None)
except Exception, e:
return self.lim
Modified: mgmt/trunk/cumin/python/cumin/tools.py
===================================================================
--- mgmt/trunk/cumin/python/cumin/tools.py 2009-04-02 19:42:25 UTC (rev 3252)
+++ mgmt/trunk/cumin/python/cumin/tools.py 2009-04-02 19:49:59 UTC (rev 3253)
@@ -155,7 +155,15 @@
def init(self):
super(CuminAdminTool, self).init()
- self.database = MintDatabase(self.config.data)
+ config = MintConfig()
+ config.init({"data": self.config.data})
+
+ app = Mint(config)
+ app.updateEnabled = False
+ app.pollEnabled = False
+ app.expireEnabled = False
+
+ self.database = MintDatabase(app)
self.database.check()
self.database.init()
@@ -189,8 +197,6 @@
try:
opts, args = command.parse(remaining)
-
- print opts, args, remaining
except CommandException, e:
print "Error: %s" % e.message
e.command.print_help()
Modified: mgmt/trunk/mint/instance/etc/mint.conf
===================================================================
--- mgmt/trunk/mint/instance/etc/mint.conf 2009-04-02 19:42:25 UTC (rev 3252)
+++ mgmt/trunk/mint/instance/etc/mint.conf 2009-04-02 19:49:59 UTC (rev 3253)
@@ -1,3 +1,3 @@
[main]
data: postgresql://cumin@localhost/cumin
-debug: True
+#debug: True
Modified: mgmt/trunk/mint/python/mint/__init__.py
===================================================================
--- mgmt/trunk/mint/python/mint/__init__.py 2009-04-02 19:42:25 UTC (rev 3252)
+++ mgmt/trunk/mint/python/mint/__init__.py 2009-04-02 19:49:59 UTC (rev 3253)
@@ -1,615 +1,2 @@
-import logging
-import qmf.console
-import pickle
-import struct
-import sys
-import os
-import types
-import socket
-
-from parsley.config import Config, ConfigParameter
-from psycopg2 import OperationalError
-from qmf.console import ClassKey
-from qpid.datatypes import UUID
-from qpid.util import URL
-from sqlobject import *
-from threading import Lock, RLock
-from time import sleep
-from traceback import print_exc
-
-from mint import update
-from mint.cache import MintCache
-from mint.dbexpire import DBExpireThread
-from mint.schema import *
-from util import *
-
-log = logging.getLogger("mint")
-
-thisModule = __import__(__name__)
-
-for item in dir(schema):
- cls = getattr(schema, item)
-
- try:
- if issubclass(cls, SQLObject) and cls is not SQLObject:
- setattr(thisModule, item, cls)
- except TypeError:
- pass
-
-Broker.sqlmeta.addColumn(ForeignKey("BrokerRegistration",
- cascade="null", default=None,
- name="registration"))
-
-class MintDatabase(object):
- def __init__(self, uri):
- self.uri = uri
-
- def getConnection(self):
- return connectionForURI(self.uri).getConnection()
-
- def check(self):
- self.checkConnection()
-
- def init(self):
- sqlhub.processConnection = connectionForURI(self.uri)
-
- def checkConnection(self):
- conn = self.getConnection()
-
- try:
- cursor = conn.cursor()
- cursor.execute("select now()")
- finally:
- conn.close()
-
- def checkSchema(self):
- pass
-
- def dropSchema(self):
- conn = self.getConnection()
- try:
- cursor = conn.cursor()
-
- cursor.execute("drop schema public cascade")
-
- conn.commit()
- finally:
- conn.close()
-
- def __splitSQLStatements(self, text):
- result = list()
- unmatchedQuote = False
- tmpStmt = ""
-
- for stmt in text.split(";"):
- stmt = stmt.rstrip()
- quotePos = stmt.find("'")
- while quotePos > 0:
- quotePos += 1
- if quotePos < len(stmt):
- if stmt[quotePos] != "'":
- unmatchedQuote = not unmatchedQuote
- else:
- # ignore 2 single quotes
- quotePos += 1
- quotePos = stmt.find("'", quotePos)
-
- if len(stmt.lstrip()) > 0:
- tmpStmt += stmt + ";"
- if not unmatchedQuote:
- # single quote has been matched/closed, generate statement
- result.append(tmpStmt.lstrip())
- tmpStmt = ""
-
- if unmatchedQuote:
- result.append(tmpStmt.lstrip())
- return result
-
- def createSchema(self, file_paths):
- conn = self.getConnection()
-
- scripts = list()
-
- for path in file_paths:
- file = open(path, "r")
- try:
- scripts.append((path, file.read()))
- finally:
- file.close()
-
- try:
- cursor = conn.cursor()
-
- try:
- cursor.execute("create schema public");
- except:
- conn.rollback()
- pass
-
- for path, text in scripts:
- stmts = self.__splitSQLStatements(text)
- count = 0
-
- for stmt in stmts:
- stmt = stmt.strip()
-
- if stmt:
- try:
- cursor.execute(stmt)
- except Exception, e:
- print "Failed executing statement:"
- print stmt
-
- raise e
-
- count += 1
-
- print "Executed %i statements from file '%s'" % (count, path)
-
- conn.commit()
-
- info = MintInfo(version="0.1")
- info.sync()
-
- # Standard roles
-
- user = Role(name="user")
- user.sync()
-
- admin = Role(name="admin")
- admin.sync()
- finally:
- conn.close()
-
- def checkSchema(self):
- conn = self.getConnection()
-
- try:
- cursor = conn.cursor()
-
- try:
- cursor.execute("select version from mint_info");
- except Exception, e:
- print "No schema present"
- return
-
- for rec in cursor:
- print "OK (version %s)" % rec[0]
- return;
-
- print "No schema present"
- finally:
- conn.close()
-
-class Subject(SQLObject):
- class sqlmeta:
- lazyUpdate = True
-
- name = StringCol(length=1000, default=None, unique=True, notNone=True)
- password = StringCol(length=1000, default=None)
- lastChallenged = TimestampCol(default=None)
- lastLoggedIn = TimestampCol(default=None)
- lastLoggedOut = TimestampCol(default=None)
- roles = SQLRelatedJoin("Role",
intermediateTable="subject_role_mapping",
- createRelatedTable=False)
-
- def getByName(cls, name):
- try:
- return Subject.selectBy(name=name)[0]
- except IndexError:
- pass
-
- getByName = classmethod(getByName)
-
-class Role(SQLObject):
- class sqlmeta:
- lazyUpdate = True
-
- name = StringCol(length=1000, default=None, unique=True, notNone=True)
- subjects = SQLRelatedJoin("Subject",
- intermediateTable="subject_role_mapping",
- createRelatedTable=False)
-
- def getByName(cls, name):
- try:
- return Role.selectBy(name=name)[0]
- except IndexError:
- pass
-
- getByName = classmethod(getByName)
-
-class SubjectRoleMapping(SQLObject):
- class sqlmeta:
- lazyUpdate = True
-
- subject = ForeignKey("Subject", notNull=True, cascade=True)
- role = ForeignKey("Role", notNull=True, cascade=True)
- unique = DatabaseIndex(subject, role, unique=True)
-
-class ObjectNotFound(Exception):
- pass
-
-class MintInfo(SQLObject):
- class sqlmeta:
- lazyUpdate = True
-
- version = StringCol(length=1000, default="0.1", notNone=True)
-
-class CollectorRegistration(SQLObject):
- class sqlmeta:
- lazyUpdate = True
-
- name = StringCol(length=1000, default=None)
- collectorId = StringCol(length=1000, default=None)
-
-class BrokerRegistration(SQLObject):
- class sqlmeta:
- lazyUpdate = True
-
- name = StringCol(length=1000, default=None, unique=True, notNone=True)
- url = StringCol(length=1000, default=None)
- broker = ForeignKey("Broker", cascade="null", default=None)
- cluster = ForeignKey("BrokerCluster", cascade="null",
default=None)
- profile = ForeignKey("BrokerProfile", cascade="null",
default=None)
-
- url_unique = DatabaseIndex(url, unique=True)
-
- def getBrokerId(self):
- return self.mintBroker.qmfId
-
- def getDefaultVhost(self):
- if self.broker:
- try:
- return Vhost.selectBy(broker=self.broker, name="/")[0]
- except IndexError:
- return None
-
-class BrokerGroup(SQLObject):
- class sqlmeta:
- lazyUpdate = True
-
- name = StringCol(length=1000, default=None, unique=True, notNone=True)
- brokers = SQLRelatedJoin("Broker",
- intermediateTable="broker_group_mapping",
- createRelatedTable=False)
-
-class BrokerGroupMapping(SQLObject):
- class sqlmeta:
- lazyUpdate = True
-
- broker = ForeignKey("Broker", notNull=True, cascade=True)
- brokerGroup = ForeignKey("BrokerGroup", notNull=True, cascade=True)
- unique = DatabaseIndex(broker, brokerGroup, unique=True)
-
-class BrokerCluster(SQLObject):
- class sqlmeta:
- lazyUpdate = True
-
- name = StringCol(length=1000, default=None)
- brokers = SQLMultipleJoin("BrokerRegistration",
joinColumn="cluster_id")
-
-class BrokerProfile(SQLObject):
- class sqlmeta:
- lazyUpdate = True
-
- name = StringCol(length=1000, default=None)
- brokers = SQLMultipleJoin("BrokerRegistration",
joinColumn="profile_id")
- properties = SQLMultipleJoin("ConfigProperty",
joinColumn="profile_id")
-
-class MintBroker(object):
- def __init__(self, url, qmfBroker):
- self.url = url
- self.qmfBroker = qmfBroker
-
- self.qmfId = None
- self.databaseId = None
-
- self.objectDatabaseIds = MintCache() # database ids by qmf object id
- self.orphans = dict() # updates by qmf object id
-
- self.connected = False
-
- def __repr__(self):
- return "%s(%s)" % (self.__class__.__name__, self.url)
-
- def getAmqpSession(self):
- return self.qmfBroker.getAmqpSession()
-
- def getAmqpSessionId(self):
- return self.qmfBroker.getSessionId()
-
- def getFullUrl(self):
- return self.qmfBroker.getFullUrl()
-
-class MintModel(qmf.console.Console):
- staticInstance = None
-
- def __init__(self, dataUri, debug=False):
- self.dataUri = dataUri
- self.debug = debug
-
- self.updateObjects = True
-
- self.pollRegistrations = True
-
- self.expireObjects = True
- self.expireFrequency = 600
- self.expireThreshold = 24 * 3600
-
- assert MintModel.staticInstance is None
- MintModel.staticInstance = self
-
- # Lookup tables used for recovering MintBroker objects, which have
- # mint-specific accounting and wrap a qmf broker object
-
- self.mintBrokersByQmfBroker = dict()
- self.mintBrokersById = dict()
- self.mintBrokersByUrl = dict()
-
- # Agent heartbeats
- # (broker bank, agent bank) => latest heartbeat timestamp
-
- self.heartbeatsByAgentId = dict()
-
- self.__lock = RLock()
-
- self.dbConn = None
- self.qmfSession = None
-
- self.updateThread = update.ModelUpdateThread(self)
- self.expireThread = dbexpire.DBExpireThread(self)
- self.registrationThread = RegistrationThread(self)
-
- self.outstandingMethodCalls = dict()
-
- def lock(self):
- self.__lock.acquire()
-
- def unlock(self):
- self.__lock.release()
-
- def check(self):
- try:
- connectionForURI(self.dataUri)
- except Exception, e:
- if hasattr(e, "message") and e.message.find("does not exist"):
- print "Database not found; run cumin-database-init"
- raise e
-
- def init(self):
- log.info("Object updates are %s",
- self.updateObjects and "enabled" or "disabled")
-
- log.info("QMF server polling is %s",
- self.pollRegistrations and "enabled" or "disabled")
-
- log.info("Object expiration is %s",
- self.expireObjects and "enabled" or "disabled")
-
- sqlhub.processConnection = self.dbConn = connectionForURI(self.dataUri)
-
- assert self.qmfSession is None
-
- self.qmfSession = qmf.console.Session \
- (self, manageConnections=True, rcvObjects=self.updateObjects)
-
- self.updateThread.init()
- self.expireThread.init()
-
- def start(self):
- self.updateThread.start()
-
- if self.expireObjects:
- self.expireThread.start()
-
- if self.pollRegistrations:
- self.registrationThread.start()
-
- def stop(self):
- self.updateThread.stop()
-
- if self.expireObjects:
- self.expireThread.stop()
-
- if self.pollRegistrations:
- self.registrationThread.stop()
-
- def callMethod(self, brokerId, objId, classKey, methodName, callback, args):
- self.lock()
- try:
- broker = self.mintBrokersById[brokerId]
- finally:
- self.unlock()
-
- seq = self.qmfSession._sendMethodRequest \
- (broker.qmfBroker, ClassKey(classKey), objId, methodName, args)
-
- if seq is not None:
- self.lock()
- try:
- self.outstandingMethodCalls[seq] = callback
- finally:
- self.unlock()
- return seq
-
- def addBroker(self, url):
- log.info("Adding qmf broker at %s", url)
-
- self.lock()
- try:
- qbroker = self.qmfSession.addBroker(url)
- mbroker = MintBroker(url, qbroker)
-
- # Can't add the by-id mapping here, as the id is not set yet;
- # instead we add it when brokerConnected is called
-
- self.mintBrokersByQmfBroker[qbroker] = mbroker
- self.mintBrokersByUrl[url] = mbroker
-
- return mbroker
- finally:
- self.unlock()
-
- def delBroker(self, mbroker):
- assert isinstance(mbroker, MintBroker)
-
- log.info("Removing qmf broker at %s", mbroker.url)
-
- self.lock()
- try:
- self.qmfSession.delBroker(mbroker.qmfBroker)
-
- del self.mintBrokersByQmfBroker[mbroker.qmfBroker]
- del self.mintBrokersByUrl[mbroker.url]
-
- if mbroker.qmfId:
- del self.mintBrokersById[mbroker.qmfId]
- finally:
- self.unlock()
-
- def brokerConnected(self, qbroker):
- """ Invoked when a connection is established to a broker
"""
- self.lock()
- try:
- mbroker = self.mintBrokersByQmfBroker[qbroker]
-
- assert mbroker.connected is False
-
- mbroker.connected = True
- finally:
- self.unlock()
-
- def brokerInfo(self, qbroker):
- self.lock()
- try:
- id = str(qbroker.getBrokerId())
- mbroker = self.mintBrokersByQmfBroker[qbroker]
-
- mbroker.qmfId = id
- self.mintBrokersById[id] = mbroker
- finally:
- self.unlock()
-
- def brokerDisconnected(self, qbroker):
- """ Invoked when the connection to a broker is lost
"""
- self.lock()
- try:
- mbroker = self.mintBrokersByQmfBroker[qbroker]
-
- assert mbroker.connected is True
-
- mbroker.connected = False
- finally:
- self.unlock()
-
- def getMintBrokerByQmfBroker(self, qbroker):
- self.lock()
- try:
- return self.mintBrokersByQmfBroker[qbroker]
- finally:
- self.unlock()
-
- def newPackage(self, name):
- """ Invoked when a QMF package is discovered. """
- pass
-
- def newClass(self, kind, classKey):
- """ Invoked when a new class is discovered. Session.getSchema can be
- used to obtain details about the class."""
- pass
-
- def newAgent(self, agent):
- """ Invoked when a QMF agent is discovered. """
- pass
-
- def delAgent(self, agent):
- """ Invoked when a QMF agent disconects. """
- pass
-
- def objectProps(self, broker, record):
- """ Invoked when an object is updated. """
-
- if not self.updateObjects:
- return
-
- mbroker = self.getMintBrokerByQmfBroker(broker)
-
- up = update.PropertyUpdate(self, mbroker, record)
-
- if record.getClassKey().getClassName() == "job":
- up.priority = 1
-
- self.updateThread.enqueue(up)
-
- def objectStats(self, broker, record):
- """ Invoked when an object is updated. """
-
- if not self.updateObjects:
- return
-
- if record.getClassKey().getClassName() == "job":
- return
-
- mbroker = self.getMintBrokerByQmfBroker(broker)
- up = update.StatisticUpdate(self, mbroker, record)
-
- self.updateThread.enqueue(up)
-
- def event(self, broker, event):
- """ Invoked when an event is raised. """
- pass
-
- def heartbeat(self, agent, timestamp):
- key = (agent.getBrokerBank(), agent.getAgentBank())
- timestamp = timestamp / 1000000000
- self.heartbeatsByAgentId[key] = datetime.fromtimestamp(timestamp)
-
- def getLatestHeartbeat(self, scopeId):
- scope = int(scopeId)
- broker = (scope & 0x0000FFFFF0000000) >> 28
- agent = scope & 0x000000000FFFFFFF
- key = (broker, agent)
-
- return self.heartbeatsByAgentId.get(key)
-
- def methodResponse(self, broker, seq, response):
- mbroker = self.getMintBrokerByQmfBroker(broker)
- up = update.MethodUpdate(self, mbroker, seq, response)
-
- self.updateThread.enqueue(up)
-
-class RegistrationThread(MintDaemonThread):
- def run(self):
- log.info("Polling for registration changes every 5 seconds")
-
- while True:
- try:
- if self.stopRequested:
- break
-
- self.do_run()
- except OperationalError, e:
- log.exception(e)
- if str(e).find("server closed the connection unexpectedly") >= 0:
- sys.exit()
- except Exception, e:
- log.exception(e)
-
- def do_run(self):
- regUrls = set()
-
- for reg in BrokerRegistration.select():
- if reg.url not in self.model.mintBrokersByUrl:
- try:
- self.model.addBroker(reg.url)
- except socket.error, e:
- log.info("Can't connect to broker at %s: %s", reg.url, e)
- pass
-
- regUrls.add(reg.url)
-
- for mbroker in self.model.mintBrokersByQmfBroker.values():
- if mbroker.url not in regUrls:
- self.model.delBroker(mbroker)
-
- sleep(5)
+from main import *
+from model import *
Added: mgmt/trunk/mint/python/mint/database.py
===================================================================
--- mgmt/trunk/mint/python/mint/database.py (rev 0)
+++ mgmt/trunk/mint/python/mint/database.py 2009-04-02 19:49:59 UTC (rev 3253)
@@ -0,0 +1,145 @@
+from sqlobject import connectionForURI, sqlhub
+
+from model import MintInfo, Role
+
+class MintDatabase(object):
+ def __init__(self, app):
+ self.app = app
+
+ def getConnection(self):
+ return connectionForURI(self.app.config.data).getConnection()
+
+ def check(self):
+ self.checkConnection()
+
+ def init(self):
+ sqlhub.processConnection = connectionForURI(self.app.config.data)
+
+ def checkConnection(self):
+ conn = self.getConnection()
+
+ try:
+ cursor = conn.cursor()
+ cursor.execute("select now()")
+ finally:
+ conn.close()
+
+ def checkSchema(self):
+ pass
+
+ def dropSchema(self):
+ conn = self.getConnection()
+
+ try:
+ cursor = conn.cursor()
+
+ cursor.execute("drop schema public cascade")
+
+ conn.commit()
+ finally:
+ conn.close()
+
+ def __splitSQLStatements(self, text):
+ result = list()
+ unmatchedQuote = False
+ tmpStmt = ""
+
+ for stmt in text.split(";"):
+ stmt = stmt.rstrip()
+ quotePos = stmt.find("'")
+ while quotePos > 0:
+ quotePos += 1
+ if quotePos < len(stmt):
+ if stmt[quotePos] != "'":
+ unmatchedQuote = not unmatchedQuote
+ else:
+ # ignore 2 single quotes
+ quotePos += 1
+ quotePos = stmt.find("'", quotePos)
+
+ if len(stmt.lstrip()) > 0:
+ tmpStmt += stmt + ";"
+ if not unmatchedQuote:
+ # single quote has been matched/closed, generate statement
+ result.append(tmpStmt.lstrip())
+ tmpStmt = ""
+
+ if unmatchedQuote:
+ result.append(tmpStmt.lstrip())
+ return result
+
+ def createSchema(self, file_paths):
+ conn = self.getConnection()
+
+ scripts = list()
+
+ for path in file_paths:
+ file = open(path, "r")
+ try:
+ scripts.append((path, file.read()))
+ finally:
+ file.close()
+
+ try:
+ cursor = conn.cursor()
+
+ try:
+ cursor.execute("create schema public");
+ except:
+ conn.rollback()
+ pass
+
+ for path, text in scripts:
+ stmts = self.__splitSQLStatements(text)
+ count = 0
+
+ for stmt in stmts:
+ stmt = stmt.strip()
+
+ if stmt:
+ try:
+ cursor.execute(stmt)
+ except Exception, e:
+ print "Failed executing statement:"
+ print stmt
+
+ raise e
+
+ count += 1
+
+ print "Executed %i statements from file '%s'" % (count, path)
+
+ conn.commit()
+
+ info = MintInfo(version="0.1")
+ info.sync()
+
+ # Standard roles
+
+ user = Role(name="user")
+ user.sync()
+
+ admin = Role(name="admin")
+ admin.sync()
+ finally:
+ conn.close()
+
+ def checkSchema(self):
+ conn = self.getConnection()
+
+ try:
+ cursor = conn.cursor()
+
+ try:
+ cursor.execute("select version from mint_info");
+ except Exception, e:
+ print "No schema present"
+ return
+
+ for rec in cursor:
+ print "OK (version %s)" % rec[0]
+ return;
+
+ print "No schema present"
+ finally:
+ conn.close()
Deleted: mgmt/trunk/mint/python/mint/dbexpire.py
===================================================================
--- mgmt/trunk/mint/python/mint/dbexpire.py 2009-04-02 19:42:25 UTC (rev 3252)
+++ mgmt/trunk/mint/python/mint/dbexpire.py 2009-04-02 19:49:59 UTC (rev 3253)
@@ -1,58 +0,0 @@
-import logging
-import mint
-import time
-from threading import Thread
-from mint.schema import *
-from sql import *
-from util import *
-
-log = logging.getLogger("mint.dbexpire")
-
-class DBExpireThread(MintDaemonThread):
- def __init__(self, model):
- super(DBExpireThread, self).__init__(model)
-
- self.keepCurrStats = False
-
- self.ops = []
- self.attrs = dict()
-
- def init(self):
- frequency = self.model.expireFrequency
- threshold = self.model.expireThreshold
-
- for cls in mint.schema.statsClasses:
- self.ops.append(SqlExpire(eval(cls), self.keepCurrStats))
- self.ops.append(SqlExpire(Job, self.keepCurrStats))
-
- self.attrs["threshold"] = threshold
-
- frequency_out, frequency_unit = self.__convertTimeUnits(frequency)
- threshold_out, threshold_unit = self.__convertTimeUnits(threshold)
- log.debug("Expiring database records older than %d %s, every %d %s" % \
- (threshold_out, threshold_unit, frequency_out, frequency_unit))
-
- def run(self):
- frequency = self.model.expireFrequency
-
- while True:
- if self.stopRequested:
- break
- up = mint.update.DBExpireUpdate(self.model)
- self.model.updateThread.enqueue(up)
- time.sleep(frequency)
-
- def __convertTimeUnits(self, t):
- if t / (24*3600) >= 1:
- t_out = t / (24*3600)
- t_unit = "days"
- elif t / 3600 >= 1:
- t_out = t / 3600
- t_unit = "hours"
- elif t / 60 >= 1:
- t_out = t / 60
- t_unit = "minutes"
- else:
- t_out = t
- t_unit = "seconds"
- return (t_out, t_unit)
Copied: mgmt/trunk/mint/python/mint/expire.py (from rev 3248,
mgmt/trunk/mint/python/mint/dbexpire.py)
===================================================================
--- mgmt/trunk/mint/python/mint/expire.py (rev 0)
+++ mgmt/trunk/mint/python/mint/expire.py 2009-04-02 19:49:59 UTC (rev 3253)
@@ -0,0 +1,58 @@
+import logging
+import mint
+import time
+from threading import Thread
+from mint.schema import *
+from sql import *
+from util import *
+
+log = logging.getLogger("mint.expire")
+
+class ExpireThread(MintDaemonThread):
+ def __init__(self, app):
+ super(ExpireThread, self).__init__(app)
+
+ self.keepCurrStats = False
+
+ self.ops = []
+ self.attrs = dict()
+
+ def init(self):
+ frequency = self.app.expireFrequency
+ threshold = self.app.expireThreshold
+
+ for cls in mint.schema.statsClasses:
+ self.ops.append(SqlExpire(eval(cls), self.keepCurrStats))
+ self.ops.append(SqlExpire(Job, self.keepCurrStats))
+
+ self.attrs["threshold"] = threshold
+
+ frequency_out, frequency_unit = self.__convertTimeUnits(frequency)
+ threshold_out, threshold_unit = self.__convertTimeUnits(threshold)
+ log.debug("Expiring database records older than %d %s, every %d %s" % \
+ (threshold_out, threshold_unit, frequency_out, frequency_unit))
+
+ def run(self):
+ frequency = self.app.expireFrequency
+
+ while True:
+ if self.stopRequested:
+ break
+ up = mint.update.ExpireUpdate(self.app.model)
+ self.app.updateThread.enqueue(up)
+ time.sleep(frequency)
+
+ def __convertTimeUnits(self, t):
+ if t / (24*3600) >= 1:
+ t_out = t / (24*3600)
+ t_unit = "days"
+ elif t / 3600 >= 1:
+ t_out = t / 3600
+ t_unit = "hours"
+ elif t / 60 >= 1:
+ t_out = t / 60
+ t_unit = "minutes"
+ else:
+ t_out = t
+ t_unit = "seconds"
+ return (t_out, t_unit)
Property changes on: mgmt/trunk/mint/python/mint/expire.py
___________________________________________________________________
Name: svn:mergeinfo
+
Name: svn:eol-style
+ native
Modified: mgmt/trunk/mint/python/mint/main.py
===================================================================
--- mgmt/trunk/mint/python/mint/main.py 2009-04-02 19:42:25 UTC (rev 3252)
+++ mgmt/trunk/mint/python/mint/main.py 2009-04-02 19:49:59 UTC (rev 3253)
@@ -1,30 +1,77 @@
import sys
import os
+import logging
from parsley.config import Config, ConfigParameter
from parsley.loggingex import enable_logging
-from mint import MintModel
+from database import MintDatabase
+from model import MintModel
+from update import UpdateThread
+from poll import PollThread
+from expire import ExpireThread
+log = logging.getLogger("mint.main")
+
class Mint(object):
def __init__(self, config):
self.config = config
+ self.database = MintDatabase(self)
+ self.model = MintModel(self)
- self.model = MintModel(self.config.data)
- self.model.expireFrequency = self.config.expire_frequency
- self.model.expireThreshold = self.config.expire_threshold
+ self.updateEnabled = True
+ self.updateThread = UpdateThread(self)
+ self.pollEnabled = False
+ self.pollThread = PollThread(self)
+
+ self.expireEnabled = True
+ self.expireFrequency = self.config.expire_frequency
+ self.expireThreshold = self.config.expire_threshold
+ self.expireThread = ExpireThread(self)
+
def check(self):
+ self.database.check()
self.model.check()
def init(self):
+ self.database.init()
self.model.init()
+ def state(cond):
+ return cond and "enabled" or "disabled"
+
+ log.info("Updates are %s", state(self.updateEnabled))
+ log.info("Polling is %s", state(self.pollEnabled))
+ log.info("Expiration is %s", state(self.expireEnabled))
+
+ self.updateThread.init()
+ self.pollThread.init()
+ self.expireThread.init()
+
def start(self):
self.model.start()
+ if self.updateEnabled:
+ self.updateThread.start()
+
+ if self.pollEnabled:
+ self.pollThread.start()
+
+ if self.expireEnabled:
+ self.expireThread.start()
+
def stop(self):
self.model.stop()
+ if self.updateEnabled:
+ self.updateThread.stop()
+
+ if self.pollEnabled:
+ self.pollThread.stop()
+
+ if self.expireEnabled:
+ self.expireThread.stop()
+
class MintConfig(Config):
def __init__(self):
super(MintConfig, self).__init__()
@@ -35,6 +82,9 @@
param = ConfigParameter(self, "data", str)
param.default = "postgresql://mint@localhost/mint"
+ param = ConfigParameter(self, "qmf", str)
+ param.default = "amqp://localhost"
+
param = ConfigParameter(self, "log-file", str)
param.default = os.path.join(self.home, "log", "mint.log")
@@ -50,13 +100,15 @@
param = ConfigParameter(self, "expire-threshold", int)
param.default = 24 * 3600 # 1 day
- def init(self, opts):
+ def init(self, opts=None):
super(MintConfig, self).init()
self.load_file(os.path.join(self.home, "etc", "mint.conf"))
self.load_file(os.path.join(os.path.expanduser("~"),
".mint.conf"))
- self.load_dict(opts)
+ if opts:
+ self.load_dict(opts)
+
if self.debug:
enable_logging("mint", "debug", sys.stderr)
else:
Added: mgmt/trunk/mint/python/mint/model.py
===================================================================
--- mgmt/trunk/mint/python/mint/model.py (rev 0)
+++ mgmt/trunk/mint/python/mint/model.py 2009-04-02 19:49:59 UTC (rev 3253)
@@ -0,0 +1,397 @@
+import logging
+import qmf.console
+import pickle
+import struct
+import sys
+import os
+import types
+import socket
+
+from qmf.console import ClassKey
+from qpid.datatypes import UUID
+from qpid.util import URL
+from sqlobject import *
+from threading import Lock, RLock
+from time import sleep
+from traceback import print_exc
+
+from mint import update
+from mint.cache import MintCache
+from mint.schema import *
+from util import *
+
+log = logging.getLogger("mint.model")
+
+thisModule = __import__(__name__)
+
+for item in dir(mint.schema):
+ cls = getattr(mint.schema, item)
+
+ try:
+ if issubclass(cls, SQLObject) and cls is not SQLObject:
+ setattr(thisModule, item, cls)
+ except TypeError:
+ pass
+
+Broker.sqlmeta.addColumn(ForeignKey("BrokerRegistration",
+ cascade="null", default=None,
+ name="registration"))
+
+class Subject(SQLObject):
+ class sqlmeta:
+ lazyUpdate = True
+
+ name = StringCol(length=1000, default=None, unique=True, notNone=True)
+ password = StringCol(length=1000, default=None)
+ lastChallenged = TimestampCol(default=None)
+ lastLoggedIn = TimestampCol(default=None)
+ lastLoggedOut = TimestampCol(default=None)
+ roles = SQLRelatedJoin("Role",
intermediateTable="subject_role_mapping",
+ createRelatedTable=False)
+
+ def getByName(cls, name):
+ try:
+ return Subject.selectBy(name=name)[0]
+ except IndexError:
+ pass
+
+ getByName = classmethod(getByName)
+
+class Role(SQLObject):
+ class sqlmeta:
+ lazyUpdate = True
+
+ name = StringCol(length=1000, default=None, unique=True, notNone=True)
+ subjects = SQLRelatedJoin("Subject",
+ intermediateTable="subject_role_mapping",
+ createRelatedTable=False)
+
+ def getByName(cls, name):
+ try:
+ return Role.selectBy(name=name)[0]
+ except IndexError:
+ pass
+
+ getByName = classmethod(getByName)
+
+class SubjectRoleMapping(SQLObject):
+ class sqlmeta:
+ lazyUpdate = True
+
+ subject = ForeignKey("Subject", notNull=True, cascade=True)
+ role = ForeignKey("Role", notNull=True, cascade=True)
+ unique = DatabaseIndex(subject, role, unique=True)
+
+class ObjectNotFound(Exception):
+ pass
+
+class MintInfo(SQLObject):
+ class sqlmeta:
+ lazyUpdate = True
+
+ version = StringCol(length=1000, default="0.1", notNone=True)
+
+class CollectorRegistration(SQLObject):
+ class sqlmeta:
+ lazyUpdate = True
+
+ name = StringCol(length=1000, default=None)
+ collectorId = StringCol(length=1000, default=None)
+
+class BrokerRegistration(SQLObject):
+ class sqlmeta:
+ lazyUpdate = True
+
+ name = StringCol(length=1000, default=None, unique=True, notNone=True)
+ url = StringCol(length=1000, default=None)
+ broker = ForeignKey("Broker", cascade="null", default=None)
+ cluster = ForeignKey("BrokerCluster", cascade="null",
default=None)
+ profile = ForeignKey("BrokerProfile", cascade="null",
default=None)
+
+ url_unique = DatabaseIndex(url, unique=True)
+
+ def getBrokerId(self):
+ return self.mintBroker.qmfId
+
+ def getDefaultVhost(self):
+ if self.broker:
+ try:
+ return Vhost.selectBy(broker=self.broker, name="/")[0]
+ except IndexError:
+ return None
+
+class BrokerGroup(SQLObject):
+ class sqlmeta:
+ lazyUpdate = True
+
+ name = StringCol(length=1000, default=None, unique=True, notNone=True)
+ brokers = SQLRelatedJoin("Broker",
+ intermediateTable="broker_group_mapping",
+ createRelatedTable=False)
+
+class BrokerGroupMapping(SQLObject):
+ class sqlmeta:
+ lazyUpdate = True
+
+ broker = ForeignKey("Broker", notNull=True, cascade=True)
+ brokerGroup = ForeignKey("BrokerGroup", notNull=True, cascade=True)
+ unique = DatabaseIndex(broker, brokerGroup, unique=True)
+
+class BrokerCluster(SQLObject):
+ class sqlmeta:
+ lazyUpdate = True
+
+ name = StringCol(length=1000, default=None)
+ brokers = SQLMultipleJoin("BrokerRegistration",
joinColumn="cluster_id")
+
+class BrokerProfile(SQLObject):
+ class sqlmeta:
+ lazyUpdate = True
+
+ name = StringCol(length=1000, default=None)
+ brokers = SQLMultipleJoin("BrokerRegistration",
joinColumn="profile_id")
+ properties = SQLMultipleJoin("ConfigProperty",
joinColumn="profile_id")
+
+class MintBroker(object):
+ def __init__(self, url, qmfBroker):
+ self.url = url
+ self.qmfBroker = qmfBroker
+
+ self.qmfId = None
+ self.databaseId = None
+
+ self.objectDatabaseIds = MintCache() # database ids by qmf object id
+ self.orphans = dict() # updates by qmf object id
+
+ self.connected = False
+
+ def __repr__(self):
+ return "%s(%s)" % (self.__class__.__name__, self.url)
+
+ def getAmqpSession(self):
+ return self.qmfBroker.getAmqpSession()
+
+ def getAmqpSessionId(self):
+ return self.qmfBroker.getSessionId()
+
+ def getFullUrl(self):
+ return self.qmfBroker.getFullUrl()
+
+class MintModel(qmf.console.Console):
+ staticInstance = None
+
+ def __init__(self, app):
+ assert MintModel.staticInstance is None
+ MintModel.staticInstance = self
+
+ self.app = app
+
+ # Lookup tables used for recovering MintBroker objects, which have
+ # mint-specific accounting and wrap a qmf broker object
+
+ self.mintBrokersByQmfBroker = dict()
+ self.mintBrokersById = dict()
+ self.mintBrokersByUrl = dict()
+
+ # Agent heartbeats
+ # (broker bank, agent bank) => latest heartbeat timestamp
+
+ self.heartbeatsByAgentId = dict()
+
+ self.__lock = RLock()
+
+ self.dbConn = None
+ self.qmfSession = None
+
+ self.outstandingMethodCalls = dict()
+
+ def lock(self):
+ self.__lock.acquire()
+
+ def unlock(self):
+ self.__lock.release()
+
+ def check(self):
+ pass
+
+ def init(self):
+ assert self.qmfSession is None
+
+ self.qmfSession = qmf.console.Session \
+ (self, manageConnections=True, rcvObjects=self.app.updateEnabled)
+
+ def start(self):
+ pass
+
+ def stop(self):
+ for mbroker in self.mintBrokersById.values():
+ self.delBroker(mbroker)
+
+ def callMethod(self, brokerId, objId, classKey, methodName, callback, args):
+ self.lock()
+ try:
+ broker = self.mintBrokersById[brokerId]
+ finally:
+ self.unlock()
+
+ seq = self.qmfSession._sendMethodRequest \
+ (broker.qmfBroker, ClassKey(classKey), objId, methodName, args)
+
+ if seq is not None:
+ self.lock()
+ try:
+ self.outstandingMethodCalls[seq] = callback
+ finally:
+ self.unlock()
+ return seq
+
+ def addBroker(self, url):
+ log.info("Adding qmf broker at %s", url)
+
+ self.lock()
+ try:
+ qbroker = self.qmfSession.addBroker(url)
+ mbroker = MintBroker(url, qbroker)
+
+ # Can't add the by-id mapping here, as the id is not set yet;
+ # instead we add it when brokerConnected is called
+
+ self.mintBrokersByQmfBroker[qbroker] = mbroker
+ self.mintBrokersByUrl[url] = mbroker
+
+ return mbroker
+ finally:
+ self.unlock()
+
+ def delBroker(self, mbroker):
+ assert isinstance(mbroker, MintBroker)
+
+ log.info("Removing qmf broker at %s", mbroker.url)
+
+ self.lock()
+ try:
+ self.qmfSession.delBroker(mbroker.qmfBroker)
+
+ del self.mintBrokersByQmfBroker[mbroker.qmfBroker]
+ del self.mintBrokersByUrl[mbroker.url]
+
+ if mbroker.qmfId:
+ del self.mintBrokersById[mbroker.qmfId]
+ finally:
+ self.unlock()
+
+ def brokerConnected(self, qbroker):
+ """ Invoked when a connection is established to a broker
"""
+ self.lock()
+ try:
+ mbroker = self.mintBrokersByQmfBroker[qbroker]
+
+ assert mbroker.connected is False
+
+ mbroker.connected = True
+ finally:
+ self.unlock()
+
+ def brokerInfo(self, qbroker):
+ self.lock()
+ try:
+ id = str(qbroker.getBrokerId())
+ mbroker = self.mintBrokersByQmfBroker[qbroker]
+
+ mbroker.qmfId = id
+ self.mintBrokersById[id] = mbroker
+ finally:
+ self.unlock()
+
+ def brokerDisconnected(self, qbroker):
+ """ Invoked when the connection to a broker is lost
"""
+ self.lock()
+ try:
+ mbroker = self.mintBrokersByQmfBroker[qbroker]
+
+ assert mbroker.connected is True
+
+ mbroker.connected = False
+ finally:
+ self.unlock()
+
+ def getMintBrokerByQmfBroker(self, qbroker):
+ self.lock()
+ try:
+ return self.mintBrokersByQmfBroker[qbroker]
+ finally:
+ self.unlock()
+
+ def newPackage(self, name):
+ """ Invoked when a QMF package is discovered. """
+ pass
+
+ def newClass(self, kind, classKey):
+ """ Invoked when a new class is discovered. Session.getSchema can be
+ used to obtain details about the class."""
+ pass
+
+ def newAgent(self, agent):
+ """ Invoked when a QMF agent is discovered. """
+ pass
+
+ def delAgent(self, agent):
+ """ Invoked when a QMF agent disconects. """
+ pass
+
+ def objectProps(self, broker, record):
+ """ Invoked when an object is updated. """
+
+ if not self.app.updateThread.isAlive():
+ return
+
+ mbroker = self.getMintBrokerByQmfBroker(broker)
+
+ up = update.PropertyUpdate(self, mbroker, record)
+
+ if record.getClassKey().getClassName() == "job":
+ up.priority = 1
+
+ self.app.updateThread.enqueue(up)
+
+ def objectStats(self, broker, record):
+ """ Invoked when an object is updated. """
+
+ if not self.app.updateThread.isAlive():
+ return
+
+ if record.getClassKey().getClassName() == "job":
+ return
+
+ mbroker = self.getMintBrokerByQmfBroker(broker)
+ up = update.StatisticUpdate(self, mbroker, record)
+
+ self.app.updateThread.enqueue(up)
+
+ def event(self, broker, event):
+ """ Invoked when an event is raised. """
+ pass
+
+ def heartbeat(self, agent, timestamp):
+ key = (agent.getBrokerBank(), agent.getAgentBank())
+ timestamp = timestamp / 1000000000
+ self.heartbeatsByAgentId[key] = datetime.fromtimestamp(timestamp)
+
+ def getLatestHeartbeat(self, scopeId):
+ scope = int(scopeId)
+ broker = (scope & 0x0000FFFFF0000000) >> 28
+ agent = scope & 0x000000000FFFFFFF
+ key = (broker, agent)
+
+ return self.heartbeatsByAgentId.get(key)
+
+ def methodResponse(self, broker, seq, response):
+ # XXX don't do this via the update thread?
+
+ if not self.updateThread.isAlive():
+ return
+
+ mbroker = self.getMintBrokerByQmfBroker(broker)
+ up = update.MethodUpdate(self, mbroker, seq, response)
+
+ self.app.updateThread.enqueue(up)
Added: mgmt/trunk/mint/python/mint/poll.py
===================================================================
--- mgmt/trunk/mint/python/mint/poll.py (rev 0)
+++ mgmt/trunk/mint/python/mint/poll.py 2009-04-02 19:49:59 UTC (rev 3253)
@@ -0,0 +1,46 @@
+import logging
+from time import sleep
+from psycopg2 import OperationalError
+from mint.model import BrokerRegistration
+
+from util import MintDaemonThread
+
+log = logging.getLogger("mint.poll")
+
+class PollThread(MintDaemonThread):
+ interval = 5
+
+ def run(self):
+ log.info("Polling for changes every %i seconds", self.interval)
+
+ while True:
+ try:
+ if self.stopRequested:
+ break
+
+ self.do_run()
+ except OperationalError, e:
+ log.exception(e)
+ if str(e).find("server closed the connection unexpectedly") >= 0:
+ sys.exit() # XXX This seems like the wrong thing to do
+ except Exception, e:
+ log.exception(e)
+
+ sleep(self.interval)
+
+ def do_run(self):
+ regUrls = set()
+
+ for reg in BrokerRegistration.select():
+ if reg.url not in self.app.model.mintBrokersByUrl:
+ try:
+ self.app.model.addBroker(reg.url)
+ except socket.error, e:
+ log.info("Can't connect to broker at %s: %s", reg.url, e)
+
+ regUrls.add(reg.url)
+
+ for mbroker in self.app.model.mintBrokersByQmfBroker.values():
+ if mbroker.url not in regUrls:
+ self.app.model.delBroker(mbroker)
+
Modified: mgmt/trunk/mint/python/mint/tools.py
===================================================================
--- mgmt/trunk/mint/python/mint/tools.py 2009-04-02 19:42:25 UTC (rev 3252)
+++ mgmt/trunk/mint/python/mint/tools.py 2009-04-02 19:49:59 UTC (rev 3253)
@@ -89,6 +89,9 @@
def do_run(self, opts, args):
app = Mint(self.config)
+
+ app.pollEnabled = True
+
app.check()
app.init()
app.start()
@@ -109,24 +112,17 @@
def do_run(self, opts, args):
app = Mint(self.config)
- app.model.pollRegistrations = False
-
app.check()
app.init()
app.start()
- added = list()
-
try:
for arg in args[1:]:
- added.append(app.model.addBroker(arg))
+ app.model.addBroker(arg)
while True:
sleep(2)
finally:
- for broker in added:
- app.model.delBroker(broker)
-
app.stop()
class MintBenchTool(BaseMintTool):
@@ -142,14 +138,10 @@
def do_run(self, opts, args):
app = Mint(self.config)
- app.model.pollRegistrations = False
-
app.check()
app.init()
app.start()
- added = list()
-
head = "%6s %6s %6s %6s %6s %6s %6s %6s %6s" % \
("enqs", "deqs", "depth", "drop",
"defer",
"prop", "stat", "meth", "exp")
@@ -158,7 +150,7 @@
try:
for arg in args[1:]:
try:
- added.append(app.model.addBroker(arg))
+ app.model.addBroker(arg)
except socket.error, e:
print "Warning: Failed connecting to broker at
'%s'" % arg
@@ -182,7 +174,7 @@
sleep(1)
- ut = app.model.updateThread
+ ut = app.updateThread
enq = ut.enqueueCount
deq = ut.dequeueCount
@@ -228,9 +220,5 @@
finally:
#from threading import enumerate
#for item in enumerate():
- # print item
- for broker in added:
- app.model.delBroker(broker)
-
app.stop()
Modified: mgmt/trunk/mint/python/mint/update.py
===================================================================
--- mgmt/trunk/mint/python/mint/update.py 2009-04-02 19:42:25 UTC (rev 3252)
+++ mgmt/trunk/mint/python/mint/update.py 2009-04-02 19:49:59 UTC (rev 3253)
@@ -17,10 +17,14 @@
log = logging.getLogger("mint.update")
-class ModelUpdateThread(MintDaemonThread):
- def __init__(self, model):
- super(ModelUpdateThread, self).__init__(model)
+class UpdateThread(MintDaemonThread):
+ """
+ Only the update thread writes to the database
+ """
+ def __init__(self, app):
+ super(UpdateThread, self).__init__(app)
+
self.updates = UpdateQueue(slotCount=2)
self.enqueueCount = 0
@@ -36,7 +40,7 @@
self.conn = None
def init(self):
- self.conn = self.model.dbConn.getConnection()
+ self.conn = self.app.database.getConnection()
def enqueue(self, update):
try:
@@ -71,7 +75,7 @@
continue
self.process_update(update)
-
+
def process_update(self, update):
try:
update.process(self)
@@ -93,13 +97,13 @@
def commit(self):
self.conn.commit()
- for broker in self.model.mintBrokersByQmfBroker.values():
+ for broker in self.app.model.mintBrokersByQmfBroker.values():
broker.objectDatabaseIds.commit()
def rollback(self):
self.conn.rollback()
- for broker in self.model.mintBrokersByQmfBroker.values():
+ for broker in self.app.model.mintBrokersByQmfBroker.values():
broker.objectDatabaseIds.rollback()
class ReferenceException(Exception):
@@ -214,10 +218,15 @@
self.processTimestamp("qmfUpdateTime", timestamps[0], attrs)
- if cls is Job and attrs["qmfUpdateTime"] < datetime.now() -
timedelta(seconds=thread.model.expireThread.threshold):
- # drop updates for Jobs that are older then the expiration threshold,
- # since they would be deleted anyway in the next run of the db expiration thread
+ delta = timedelta(seconds=thread.app.expireThreshold)
+
+ if cls is Job and attrs["qmfUpdateTime"] < datetime.now() - delta:
+ # drop updates for Jobs that are older then the expiration
+ # threshold, since they would be deleted anyway in the next run
+ # of the db expiration thread
+
log.debug("Property update is older than expiration threshold; skipping
it")
+
thread.dropCount += 1
return
@@ -384,18 +393,18 @@
thread.methUpdateCount += 1
-class DBExpireUpdate(ModelUpdate):
+class ExpireUpdate(ModelUpdate):
def __init__(self, model):
- super(DBExpireUpdate, self).__init__(model, None, None)
+ super(ExpireUpdate, self).__init__(model, None, None)
def process(self, thread):
cursor = thread.cursor()
- attrs = self.model.expireThread.attrs
+ attrs = thread.app.expireThread.attrs
total = 0
thread.commit()
- for op in self.model.expireThread.ops:
+ for op in thread.app.expireThread.ops:
log.debug("Running expire op %s", op)
count = op.execute(cursor, attrs)
Modified: mgmt/trunk/mint/python/mint/util.py
===================================================================
--- mgmt/trunk/mint/python/mint/util.py 2009-04-02 19:42:25 UTC (rev 3252)
+++ mgmt/trunk/mint/python/mint/util.py 2009-04-02 19:49:59 UTC (rev 3253)
@@ -5,15 +5,18 @@
log = logging.getLogger("mint")
class MintDaemonThread(Thread):
- def __init__(self, model):
+ def __init__(self, app):
super(MintDaemonThread, self).__init__()
- self.model = model
+ self.app = app
self.stopRequested = False
self.setDaemon(True)
+ def init(self):
+ pass
+
def stop(self):
assert self.stopRequested is False
self.stopRequested = True