Author: justi9
Date: 2008-11-14 15:12:00 -0500 (Fri, 14 Nov 2008)
New Revision: 2805
Modified:
mgmt/trunk/cumin/python/cumin/__init__.py
mgmt/trunk/cumin/python/cumin/broker.py
mgmt/trunk/cumin/python/cumin/model.py
mgmt/trunk/cumin/python/cumin/page.py
mgmt/trunk/cumin/python/cumin/page.strings
mgmt/trunk/cumin/python/cumin/tools.py
mgmt/trunk/cumin/python/wooly/forms.py
mgmt/trunk/mint/python/mint/__init__.py
mgmt/trunk/mint/python/mint/schema.py
mgmt/trunk/mint/python/mint/schemaparser.py
mgmt/trunk/mint/python/mint/update.py
mgmt/trunk/mint/sql/schema.sql
mgmt/trunk/parsley/python/parsley/command.py
Log:
Nuno's adaptation of mint to the qmfconsole api, which replaces the
older "management.py" api.
Modified: mgmt/trunk/cumin/python/cumin/__init__.py
===================================================================
--- mgmt/trunk/cumin/python/cumin/__init__.py 2008-11-14 19:30:06 UTC (rev 2804)
+++ mgmt/trunk/cumin/python/cumin/__init__.py 2008-11-14 20:12:00 UTC (rev 2805)
@@ -18,6 +18,7 @@
from action import ActionPage
from user import LoginPage, UserSession
from datetime import timedelta
+from qpid.util import URL
from wooly import Session
@@ -121,8 +122,7 @@
def do_run(self):
while True:
for reg in BrokerRegistration.select():
- if reg.broker is None or reg.broker.managedBroker not in \
- self.model.data.connections:
+ if reg.broker is None or reg.getBrokerId() not in
self.model.data.managedBrokers:
attempts = self.attempts.get(reg, 0)
attempts += 1
self.attempts[reg] = attempts
@@ -133,7 +133,6 @@
reg.connect(self.model.data)
elif attempts % 100 == 0:
reg.connect(self.model.data)
-
self.event.wait(10)
class CuminServer(WebServer):
Modified: mgmt/trunk/cumin/python/cumin/broker.py
===================================================================
--- mgmt/trunk/cumin/python/cumin/broker.py 2008-11-14 19:30:06 UTC (rev 2804)
+++ mgmt/trunk/cumin/python/cumin/broker.py 2008-11-14 20:12:00 UTC (rev 2805)
@@ -658,14 +658,9 @@
if addr:
name = names[i]
- host, port = parse_broker_addr(addr)
+ url = "amqp://%s:%i" % parse_broker_addr(addr)
- args = {
- "name": name,
- "host": host,
- "port": port
- }
-
+ args = {"name": name, "url": url}
reg = action.invoke(None, args)
if len(groups) > i:
@@ -701,13 +696,13 @@
errs = aerrs.setdefault(i, list())
errs.append("The address field is empty; it is required")
else:
- host, port = parse_broker_addr(addr)
+ #host, port = parse_broker_addr(addr)
count = BrokerRegistration.selectBy \
- (host=host, port=port).count()
+ (url=addr).count()
if count:
errs = aerrs.setdefault(i, list())
- errs.append("The broker at %s:%i " % (host, port) +
+ errs.append("The broker at %s " % (url) +
"is already registered")
return not len(nerrs) and not len(aerrs)
Modified: mgmt/trunk/cumin/python/cumin/model.py
===================================================================
--- mgmt/trunk/cumin/python/cumin/model.py 2008-11-14 19:30:06 UTC (rev 2804)
+++ mgmt/trunk/cumin/python/cumin/model.py 2008-11-14 20:12:00 UTC (rev 2805)
@@ -1668,14 +1668,10 @@
super(CuminBrokerRegistration, self).__init__ \
(model, "broker_registration", BrokerRegistration)
- prop = CuminProperty(self, "host")
- prop.title = "Host"
+ prop = CuminProperty(self, "url")
+ prop.title = "URL"
prop.summary = True
- prop = CuminProperty(self, "port")
- prop.title = "Port"
- prop.summary = True
-
action = self.Add(self, "add")
action.title = "Add"
action.navigable = False
Modified: mgmt/trunk/cumin/python/cumin/page.py
===================================================================
--- mgmt/trunk/cumin/python/cumin/page.py 2008-11-14 19:30:06 UTC (rev 2804)
+++ mgmt/trunk/cumin/python/cumin/page.py 2008-11-14 20:12:00 UTC (rev 2805)
@@ -209,6 +209,9 @@
heading = self.Heading(app, "heading")
self.add_child(heading)
+ self.add_tab(self.OverviewTab(app, "over"))
+ self.add_tab(self.AccountTab(app, "acct"))
+
def render_change_password_href(self, session):
branch = session.branch()
self.frame.change_password.show(branch)
@@ -218,6 +221,22 @@
def render_title(self, session):
return "MRG Management"
+ class OverviewTab(Widget):
+ def render_title(self, session):
+ return "Overview"
+
+ def render_content(self, session):
+ pass
+
+ class AccountTab(ActionSet):
+ def render_title(self, session):
+ return "Your Account"
+
+ def render_change_password_href(self, session):
+ branch = session.branch()
+ self.frame.change_password.show(branch)
+ return branch.marshal()
+
class MessagingView(TabbedModeSet):
def __init__(self, app, name):
super(MessagingView, self).__init__(app, name)
Modified: mgmt/trunk/cumin/python/cumin/page.strings
===================================================================
--- mgmt/trunk/cumin/python/cumin/page.strings 2008-11-14 19:30:06 UTC (rev 2804)
+++ mgmt/trunk/cumin/python/cumin/page.strings 2008-11-14 20:12:00 UTC (rev 2805)
@@ -180,11 +180,15 @@
<div class="oblock">
{heading}
- <ul class="actions">
- <a class="nav" href="{change_password_href}">Change
Password</a>
- </ul>
+ <ul class="TabbedModeSet tabs">{tabs}</ul>
+ <div class="TabbedModeSet mode">{mode}</div>
</div>
+[AccountTab.html]
+<ul class="actions">
+ <a class="nav" href="{change_password_href}">Change
Password</a>
+</ul>
+
[MessagingView.html]
<script type="text/javascript">
<![CDATA[
Modified: mgmt/trunk/cumin/python/cumin/tools.py
===================================================================
--- mgmt/trunk/cumin/python/cumin/tools.py 2008-11-14 19:30:06 UTC (rev 2804)
+++ mgmt/trunk/cumin/python/cumin/tools.py 2008-11-14 20:12:00 UTC (rev 2805)
@@ -31,9 +31,6 @@
self.config = CuminConfig()
- opt = CommandOption(self, "help", "h")
- opt.description = "Print this message"
-
opt = CommandOption(self, "data")
opt.argument = "URI"
opt.description = "Connect to database at URI"
@@ -84,6 +81,20 @@
opt = CommandOption(command, "force")
opt.description = "Don't complain and just do it"
+ command = self.AddBroker(self, "add-broker")
+ command.arguments = ("NAME", "URL")
+ command.description = "Add a new broker called NAME at URL"
+
+ command = self.RemoveBroker(self, "remove-broker")
+ command.arguments = ("URL",)
+ command.description = "Remove broker called NAME; requires --force"
+
+ opt = CommandOption(command, "force")
+ opt.description = "Don't complain and just do it"
+
+ command = self.ListBrokers(self, "list-brokers")
+ command.description = "List existing broker registrations"
+
command = self.AddUser(self, "add-user")
command.arguments = ("NAME",)
command.description = "Add a new user called NAME"
@@ -179,6 +190,46 @@
def run(self, opts, args):
self.parent.database.checkSchema()
+ class AddBroker(Command):
+ def run(self, opts, args):
+ try:
+ name, url = args[1:]
+ except IndexError:
+ raise CommandException(self, "NAME and URL are required")
+
+ for reg in BrokerRegistration.selectBy(name=name):
+ print "Error: a broker called '%s' already exists" %
name
+ sys.exit(1)
+
+ for reg in BrokerRegistration.selectBy(url=url):
+ print "Error: a broker at %s already exists" % url
+ sys.exit(1)
+
+ reg = BrokerRegistration(name=name, url=url)
+ reg.syncUpdate()
+
+ class RemoveBroker(Command):
+ def run(self, opts, args):
+ try:
+ name = args[1]
+ except IndexError:
+ raise CommandException(self, "NAME is required")
+
+ for reg in BrokerRegistration.selectBy(name=name):
+ break
+
+ if reg:
+ reg.destroySelf()
+ reg.syncUpdate()
+ else:
+ raise CommandException \
+ (self, "Broker '%s' is unknown", reg.name)
+
+ class ListBrokers(Command):
+ def run(self, opts, args):
+ for reg in BrokerRegistration.select():
+ print reg.name, reg.url
+
class AddUser(Command):
def run(self, opts, args):
try:
Modified: mgmt/trunk/cumin/python/wooly/forms.py
===================================================================
--- mgmt/trunk/cumin/python/wooly/forms.py 2008-11-14 19:30:06 UTC (rev 2804)
+++ mgmt/trunk/cumin/python/wooly/forms.py 2008-11-14 20:12:00 UTC (rev 2805)
@@ -184,6 +184,9 @@
class PasswordInput(StringInput):
pass
+# XXX Why does this have a boolean param? Shouldn't the name suggest
+# that somehow? Would some folks want a hidden input with a different
+# param? I think this needs to take a param.
class HiddenInput(ScalarInput):
def __init__(self, app, name):
super(HiddenInput, self).__init__(app, name, None)
Modified: mgmt/trunk/mint/python/mint/__init__.py
===================================================================
--- mgmt/trunk/mint/python/mint/__init__.py 2008-11-14 19:30:06 UTC (rev 2804)
+++ mgmt/trunk/mint/python/mint/__init__.py 2008-11-14 20:12:00 UTC (rev 2805)
@@ -1,14 +1,12 @@
-import sys, os, socket, qpid, logging
-from qpid.datatypes import uuid4
-from qpid.connection import Connection as QpidConnection
-from qpid.util import connect
-from qpid.management import managementChannel, managementClient
-from datetime import *
-from sqlobject import *
+import logging
+import qpid.qmfconsole
+import struct
from threading import Lock, RLock
-from traceback import print_exc, print_stack
-
-from mint import schema
+from sqlobject import *
+from traceback import print_exc
+from qpid.util import URL
+from qpid.datatypes import UUID
+from mint.schema import *
from mint import update
log = logging.getLogger("mint")
@@ -28,95 +26,118 @@
cascade="null", default=None,
name="registration"))
-class MintInfo(SQLObject):
- class sqlmeta:
- lazyUpdate = True
+class MintDatabase(object):
+ def __init__(self, uri):
+ self.uri = uri
- version = StringCol(length=1000, default="0.1", notNone=True)
+ def getConnection(self):
+ return connectionForURI(self.uri).getConnection()
-class BrokerRegistration(SQLObject):
- class sqlmeta:
- lazyUpdate = True
+ def check(self):
+ self.checkConnection()
- name = StringCol(length=1000, default=None, unique=True, notNone=True)
- host = StringCol(length=1000, default=None, notNone=True)
- port = IntCol(default=None, notNone=True)
- broker = ForeignKey("Broker", cascade="null", default=None)
- groups = SQLRelatedJoin("BrokerGroup",
- intermediateTable="broker_group_mapping",
- createRelatedTable=False)
- cluster = ForeignKey("BrokerCluster", cascade="null",
default=None)
- profile = ForeignKey("BrokerProfile", cascade="null",
default=None)
+ def init(self):
+ sqlhub.processConnection = connectionForURI(self.uri)
- host_port_unique = index.DatabaseIndex(host, port, unique=True)
+ def checkConnection(self):
+ conn = self.getConnection()
- def connect(self, model):
- log.info("Connecting to broker '%s' at %s:%i" % \
- (self.name, self.host, self.port or 5672))
+ try:
+ cursor = conn.cursor()
+ cursor.execute("select now()")
+ finally:
+ conn.close()
- conn = BrokerConnection(model, self.host, self.port or 5672)
+ def checkSchema(self):
+ pass
+ def dropSchema(self):
+ conn = self.getConnection()
try:
- conn.open()
- log.info("Connection succeeded")
- except:
- log.info("Connection failed: " + str(conn.exception))
+ cursor = conn.cursor()
+
+ cursor.execute("drop schema public cascade")
- def getDefaultVhost(self):
- if self.broker:
+ conn.commit()
+ finally:
+ conn.close()
+
+ def createSchema(self, file_paths):
+ conn = self.getConnection()
+
+ scripts = list()
+
+ for path in file_paths:
+ file = open(path, "r")
try:
- return Vhost.selectBy(broker=self.broker, name="/")[0]
- except IndexError:
+ scripts.append((path, file.read()))
+ finally:
+ file.close()
+
+ try:
+ cursor = conn.cursor()
+
+ try:
+ cursor.execute("create schema public");
+ except:
+ conn.rollback()
pass
-class BrokerGroup(SQLObject):
- class sqlmeta:
- lazyUpdate = True
+ for path, text in scripts:
+ stmts = text.split(";")
+ count = 0
- name = StringCol(length=1000, default=None, unique=True, notNone=True)
- brokers = SQLRelatedJoin("BrokerRegistration",
- intermediateTable="broker_group_mapping",
- createRelatedTable=False)
+ for stmt in stmts:
+ stmt = stmt.strip()
-class BrokerGroupMapping(SQLObject):
- class sqlmeta:
- lazyUpdate = True
+ if stmt:
+ try:
+ cursor.execute(stmt)
+ except Exception, e:
+ print "Failed executing statement:"
+ print stmt
- brokerRegistration = ForeignKey("BrokerRegistration", notNull=True,
- cascade=True)
- brokerGroup = ForeignKey("BrokerGroup", notNull=True, cascade=True)
- unique = index.DatabaseIndex(brokerRegistration, brokerGroup, unique=True)
+ raise e
-class BrokerCluster(SQLObject):
- class sqlmeta:
- lazyUpdate = True
+ count += 1
- name = StringCol(length=1000, default=None)
- brokers = SQLMultipleJoin("BrokerRegistration",
joinColumn="cluster_id")
+ print "Executed %i statements from file '%s'" % (count, path)
-class BrokerProfile(SQLObject):
- class sqlmeta:
- lazyUpdate = True
+ conn.commit()
- name = StringCol(length=1000, default=None)
- brokers = SQLMultipleJoin("BrokerRegistration",
joinColumn="profile_id")
- properties = SQLMultipleJoin("ConfigProperty",
joinColumn="profile_id")
+ info = MintInfo(version="0.1")
+ info.sync()
-class ConfigProperty(SQLObject):
- class sqlmeta:
- lazyUpdate = True
+ # Standard roles
- name = StringCol(length=1000, default=None)
- value = StringCol(length=1000, default=None)
- type = StringCol(length=1, default="s")
+ user = Role(name="user")
+ user.sync()
-class CollectorRegistration(SQLObject):
- class sqlmeta:
- lazyUpdate = True
+ admin = Role(name="admin")
+ admin.sync()
+ finally:
+ conn.close()
- name = StringCol(length=1000, default=None)
- collectorId = StringCol(length=1000, default=None)
+ def checkSchema(self):
+ conn = self.getConnection()
+ try:
+ cursor = conn.cursor()
+
+ try:
+ cursor.execute("select version from mint_info");
+ except Exception, e:
+ print "No schema present"
+ return
+
+ for rec in cursor:
+ print "OK (version %s)" % rec[0]
+ return;
+
+ print "No schema present"
+ finally:
+ conn.close()
+
class Subject(SQLObject):
class sqlmeta:
lazyUpdate = True
@@ -165,139 +186,101 @@
class ObjectNotFound(Exception):
pass
-# Not thread safe
-class BrokerConnection(object):
- def __init__(self, model, host, port):
- self.model = model
- self.host = host
- self.port = port
- self.id = "%s:%i" % (host, port)
- self.objectsById = dict()
+class MintInfo(SQLObject):
+ class sqlmeta:
+ lazyUpdate = True
- # Set upon receiving a broker info control
- self.sessionId = None
- self.brokerId = None
+ version = StringCol(length=1000, default="0.1", notNone=True)
- # state in (None, "opening", "opened", "closing",
"closed")
- self.state = None
- self.exception = None
+class CollectorRegistration(SQLObject):
+ class sqlmeta:
+ lazyUpdate = True
- self.mconn = None
- self.mclient = None
- self.mchan = None
+ name = StringCol(length=1000, default=None)
+ collectorId = StringCol(length=1000, default=None)
- def getObject(self, cls, id):
- compositeId = "%s:%s" % (id.first, id.second)
+class BrokerRegistration(SQLObject):
+ class sqlmeta:
+ lazyUpdate = True
- try:
- obj = self.objectsById[compositeId]
- except KeyError:
- try:
- obj = cls.selectBy(sourceScopeId=id.first, sourceObjectId=id.second,
managedBroker=self.id)[0]
- self.objectsById[compositeId] = obj
- except IndexError:
- raise ObjectNotFound()
+ name = StringCol(length=1000, default=None, unique=True, notNone=True)
+ url = StringCol(length=1000, default=None)
+ qmfBroker = None
+ broker = ForeignKey("Broker", cascade="null", default=None)
+ groups = SQLRelatedJoin("BrokerGroup",
+ intermediateTable="broker_group_mapping",
+ createRelatedTable=False)
+ cluster = ForeignKey("BrokerCluster", cascade="null",
default=None)
+ profile = ForeignKey("BrokerProfile", cascade="null",
default=None)
- return obj
+ url_unique = index.DatabaseIndex(url, unique=True)
- def isOpen(self):
- return self.state == "opened"
-
- def open(self):
- assert self.mconn is None
- assert self.mclient is None
- assert self.mchan is None
-
- self.state = "opening"
-
+ def connect(self, model):
+ log.info("Connecting to broker '%s' at %s" % (self.name,
self.url))
try:
- spec = qpid.spec.load(self.model.specPath)
- sock = connect(self.host, self.port)
+ self.qmfBroker = model.getSession().addBroker(self.url)
+ log.info("Connection succeeded")
except Exception, e:
- self.exception = e
- raise e
+ log.info("Connection failed: %s ", e.message)
+ print_exc()
- self.mconn = QpidConnection(sock, spec)
- self.mclient = managementClient(spec,
- self.model.controlCallback,
- self.model.propsCallback,
- self.model.statsCallback,
- self.model.methodCallback,
- self.model.closeCallback)
- self.mclient.schemaListener(self.model.schemaCallback)
+ def getBrokerId(self):
+ if self.qmfBroker is not None:
+ return str(self.qmfBroker.getBrokerId())
+ else:
+ return None
- try:
- self.mconn.start()
- self.mchan = self.mclient.addChannel \
- (self.mconn.session(str(uuid4())), self)
+ def getDefaultVhost(self):
+ if self.broker:
+ try:
+ return Vhost.selectBy(broker=self.broker, name="/")[0]
+ except IndexError:
+ return None
- self.state = "opened"
- except Exception, e:
- self.exception = e
- raise e
+class BrokerGroup(SQLObject):
+ class sqlmeta:
+ lazyUpdate = True
- self.model.lock()
- try:
- self.model.connections[self.id] = self
- finally:
- self.model.unlock()
+ name = StringCol(length=1000, default=None, unique=True, notNone=True)
+ brokers = SQLRelatedJoin("BrokerRegistration",
+ intermediateTable="broker_group_mapping",
+ createRelatedTable=False)
- def getSessionId(self):
- if not self.isOpen():
- raise Exception("Connection not open")
+class BrokerGroupMapping(SQLObject):
+ class sqlmeta:
+ lazyUpdate = True
- return self.mchan.sessionId
+ brokerRegistration = ForeignKey("BrokerRegistration", notNull=True,
+ cascade=True)
+ brokerGroup = ForeignKey("BrokerGroup", notNull=True, cascade=True)
+ unique = index.DatabaseIndex(brokerRegistration, brokerGroup, unique=True)
- def callMethod(self, objId, className, methodName, callback, args):
- if not self.isOpen():
- raise Exception("Connection not open")
+class BrokerCluster(SQLObject):
+ class sqlmeta:
+ lazyUpdate = True
- self.model.lock()
- try:
- self.model.currentMethodId += 1
- seq = self.model.currentMethodId
- self.model.outstandingMethodCalls[seq] = callback
- finally:
- self.model.unlock()
+ name = StringCol(length=1000, default=None)
+ brokers = SQLMultipleJoin("BrokerRegistration",
joinColumn="cluster_id")
- self.mclient.callMethod(self.mchan, seq, objId,
- className, methodName, args)
+class BrokerProfile(SQLObject):
+ class sqlmeta:
+ lazyUpdate = True
- def close(self):
- self.state = "closing"
+ name = StringCol(length=1000, default=None)
+ brokers = SQLMultipleJoin("BrokerRegistration",
joinColumn="profile_id")
+ properties = SQLMultipleJoin("ConfigProperty",
joinColumn="profile_id")
- self.model.lock()
- try:
- del self.model.connections[self.id]
- finally:
- self.model.unlock()
-
- try:
- self.mclient.removeChannel(self.mchan)
-
- self.state = "closed"
- except Exception, e:
- self.exception = e
- raise e
-
- self.mconn.close()
- # XXX What else do I need to try to shutdown here?
-
- self.mconn = None
- self.mclient = None
- self.mchan = None
-
-class MintModel:
+
+class MintModel(qpid.qmfconsole.Console):
staticInstance = None
- def __init__(self, dataUri, specPath, debug=False):
+ def __init__(self, dataUri, specPath="", debug=False):
self.dataUri = dataUri
- self.specPath = specPath
self.debug = debug
- self.currentMethodId = 1
- self.outstandingMethodCalls = dict()
- self.connections = dict()
+ assert MintModel.staticInstance is None
+ MintModel.staticInstance = self
+
self.connCloseListener = None
self.__lock = RLock()
@@ -306,15 +289,14 @@
self.orphanObjectMap = dict()
self.updateThread = update.ModelUpdateThread(self)
+ self.mgmtSession = qpid.qmfconsole.Session(self)
+ self.outstandingMethodCalls = dict()
+ self.managedBrokers = dict()
- assert MintModel.staticInstance is None
- MintModel.staticInstance = self
-
if self.debug:
log.setLevel(logging.DEBUG)
def lock(self):
- #print_stack()
self.__lock.acquire()
def unlock(self):
@@ -326,12 +308,10 @@
except Exception, e:
if hasattr(e, "message") and e.message.find("does not exist"):
print "Database not found; run cumin-database-init"
-
raise e
def init(self):
- conn = connectionForURI(self.dataUri)
- sqlhub.processConnection = conn
+ sqlhub.processConnection = connectionForURI(self.dataUri)
def start(self):
self.updateThread.start()
@@ -342,156 +322,99 @@
def setCloseListener(self, connCloseListener):
self.connCloseListener = connCloseListener
- def schemaCallback(self, conn, classInfo, props, stats, methods, events):
- up = update.SchemaUpdate(conn, classInfo, props, stats, methods, events)
- self.updateThread.enqueue(up)
+ def getObject(self, cls, id, broker):
+ obj = None
+ if isinstance(id, qpid.qmfconsole.ObjectId):
+ compositeId = "%s:%s" % (id.first, id.second)
+ try:
+ obj = cls.selectBy(sourceScopeId=id.first, sourceObjectId=id.second)[0]
+ except IndexError:
+ raise ObjectNotFound()
+ elif cls.__name__.endswith("Pool"):
+ try:
+ obj = cls.selectBy(sourceId=id)[0]
+ except IndexError:
+ raise ObjectNotFound()
+
+ return obj
- def propsCallback(self, conn, classInfo, props, timestamps):
- up = update.PropertyUpdate(conn, classInfo, props, timestamps)
- self.updateThread.enqueue(up)
+ def getSession(self):
+ return self.mgmtSession
- def statsCallback(self, conn, classInfo, stats, timestamps):
- up = update.StatisticUpdate(conn, classInfo, stats, timestamps)
- self.updateThread.enqueue(up)
-
- def methodCallback(self, conn, methodId, errorId, errorText, args):
- up = update.MethodUpdate(conn, methodId, errorId, errorText, args)
- self.updateThread.enqueue(up)
-
- def closeCallback(self, conn, data):
- up = update.CloseUpdate(conn, data)
- self.updateThread.enqueue(up)
-
- def controlCallback(self, conn, type, data):
- up = update.ControlUpdate(conn, type, data)
- self.updateThread.enqueue(up)
-
- def registerCallback(self, callback):
+ def callMethod(self, managedBroker, objId, classKey, methodName, callback, args):
self.lock()
try:
- self.currentMethodId += 1
- methodId = self.currentMethodId
- self.outstandingMethodCalls[methodId] = callback
- return methodId
+ broker = self.managedBrokers[managedBroker]
finally:
self.unlock()
- def getConnectionByRegistration(self, reg):
+ pname, cname, hash = classKey.split(", ")
+ seq = self.mgmtSession._sendMethodRequest(broker, (pname, cname, hash), objId,
methodName, args)
+
+ if seq is not None:
+ self.lock()
+ try:
+ self.outstandingMethodCalls[seq] = callback
+ finally:
+ self.unlock()
+ return seq
+
+ def brokerConnected(self, broker):
+ """ Invoked when a connection is established to a broker
"""
self.lock()
try:
- return self.connections.get("%s:%i" % (reg.host, reg.port))
+ self.managedBrokers[str(broker.getBrokerId())] = broker
finally:
self.unlock()
-class MintDatabase(object):
- def __init__(self, uri):
- self.uri = uri
-
- def getConnection(self):
- return connectionForURI(self.uri).getConnection()
-
- def check(self):
- self.checkConnection()
-
- def init(self):
- conn = connectionForURI(self.uri)
- sqlhub.processConnection = conn
-
- def checkConnection(self):
- conn = self.getConnection()
-
+ def brokerDisconnected(self, broker):
+ """ Invoked when the connection to a broker is lost
"""
+ self.lock()
try:
- cursor = conn.cursor()
- cursor.execute("select now()")
+ del self.managedBrokers[str(broker.getBrokerId())]
finally:
- conn.close()
+ self.unlock()
+ if (self.connCloseListener != None):
+ self.connCloseListener(broker)
- def checkSchema(self):
+ def newPackage(self, name):
+ """ Invoked when a QMF package is discovered. """
pass
- def dropSchema(self):
- conn = self.getConnection()
- try:
- cursor = conn.cursor()
-
- cursor.execute("drop schema public cascade")
+ def newClass(self, kind, classKey):
+ """ Invoked when a new class is discovered. Session.getSchema can be
+ used to obtain details about the class."""
+ pass
- conn.commit()
- finally:
- conn.close()
+ def newAgent(self, agent):
+ """ Invoked when a QMF agent is discovered. """
+ pass
- def createSchema(self, file_paths):
- conn = self.getConnection()
+ def delAgent(self, agent):
+ """ Invoked when a QMF agent disconnects. """
+ pass
- scripts = list()
+ def objectProps(self, broker, record):
+ """ Invoked when an object is updated. """
+ self.updateThread.enqueue(update.PropertyUpdate(broker, record))
- for path in file_paths:
- file = open(path, "r")
- try:
- scripts.append((path, file.read()))
- finally:
- file.close()
+ def objectStats(self, broker, record):
+ """ Invoked when an object is updated. """
+ self.updateThread.enqueue(update.StatisticUpdate(broker, record))
- try:
- cursor = conn.cursor()
+ def event(self, broker, event):
+ """ Invoked when an event is raised. """
+ pass
- try:
- cursor.execute("create schema public");
- except:
- conn.rollback()
- pass
+ def heartbeat(self, agent, timestamp):
+ pass
- for path, text in scripts:
- stmts = text.split(";")
- count = 0
-
- for stmt in stmts:
- stmt = stmt.strip()
-
- if stmt:
- try:
- cursor.execute(stmt)
- except Exception, e:
- print "Failed executing statement:"
- print stmt
-
- raise e
-
- count += 1
-
- print "Executed %i statements from file '%s'" % (count, path)
-
- conn.commit()
-
- info = MintInfo(version="0.1")
- info.sync()
-
- # Standard roles
-
- user = Role(name="user")
- user.sync()
-
- admin = Role(name="admin")
- admin.sync()
- finally:
- conn.close()
-
- def checkSchema(self):
- conn = self.getConnection()
-
+ def brokerInfo(self, broker):
+ self.lock()
try:
- cursor = conn.cursor()
-
- try:
- cursor.execute("select version from mint_info");
- except Exception, e:
- print "No schema present"
- return
-
- for rec in cursor:
- print "OK (version %s)" % rec[0]
- return;
-
- print "No schema present"
+ self.managedBrokers[str(broker.getBrokerId())] = broker
finally:
- conn.close()
+ self.unlock()
+
+ def methodResponse(self, broker, seq, response):
+ self.updateThread.enqueue(update.MethodUpdate(broker, seq, response))
Modified: mgmt/trunk/mint/python/mint/schema.py
===================================================================
--- mgmt/trunk/mint/python/mint/schema.py 2008-11-14 19:30:06 UTC (rev 2804)
+++ mgmt/trunk/mint/python/mint/schema.py 2008-11-14 20:12:00 UTC (rev 2805)
@@ -1,7 +1,7 @@
import mint
from sqlobject import *
from datetime import datetime
-from qpid.management import objectId
+from qpid.qmfconsole import ObjectId
class Pool(SQLObject):
class sqlmeta:
@@ -16,6 +16,7 @@
recTime = TimestampCol(default=None)
sourceScopeId = BigIntCol(default=None)
sourceObjectId = BigIntCol(default=None)
+ qmfClassKey = BLOBCol(default=None)
creationTime = TimestampCol(default=None)
deletionTime = TimestampCol(default=None)
managedBroker = StringCol(length=1000, default=None)
@@ -135,6 +136,7 @@
recTime = TimestampCol(default=None)
sourceScopeId = BigIntCol(default=None)
sourceObjectId = BigIntCol(default=None)
+ qmfClassKey = BLOBCol(default=None)
creationTime = TimestampCol(default=None)
deletionTime = TimestampCol(default=None)
managedBroker = StringCol(length=1000, default=None)
@@ -172,61 +174,49 @@
def GetAd(self, model, callback, JobAd):
- actualArgs = dict()
- actualArgs["JobAd"] = JobAd
- conn = model.connections[self.managedBroker]
- classInfo = self.classInfos[self.managedBroker]
- originalId = objectId(None, self.sourceScopeId, self.sourceObjectId)
- conn.callMethod(originalId, classInfo, "GetAd",
+ actualArgs = list()
+ actualArgs.append(JobAd)
+ originalId = ObjectId(None, self.sourceScopeId, self.sourceObjectId)
+ model.callMethod(self.managedBroker, originalId, self.qmfClassKey,
"GetAd",
callback, args=actualArgs)
def SetAttribute(self, model, callback, Name, Value):
- actualArgs = dict()
- actualArgs["Name"] = Name
- actualArgs["Value"] = Value
- conn = model.connections[self.managedBroker]
- classInfo = self.classInfos[self.managedBroker]
- originalId = objectId(None, self.sourceScopeId, self.sourceObjectId)
- conn.callMethod(originalId, classInfo, "SetAttribute",
+ actualArgs = list()
+ actualArgs.append(Name)
+ actualArgs.append(Value)
+ originalId = ObjectId(None, self.sourceScopeId, self.sourceObjectId)
+ model.callMethod(self.managedBroker, originalId, self.qmfClassKey,
"SetAttribute",
callback, args=actualArgs)
def Hold(self, model, callback, Reason):
- actualArgs = dict()
- actualArgs["Reason"] = Reason
- conn = model.connections[self.managedBroker]
- classInfo = self.classInfos[self.managedBroker]
- originalId = objectId(None, self.sourceScopeId, self.sourceObjectId)
- conn.callMethod(originalId, classInfo, "Hold",
+ actualArgs = list()
+ actualArgs.append(Reason)
+ originalId = ObjectId(None, self.sourceScopeId, self.sourceObjectId)
+ model.callMethod(self.managedBroker, originalId, self.qmfClassKey, "Hold",
callback, args=actualArgs)
def Release(self, model, callback, Reason):
- actualArgs = dict()
- actualArgs["Reason"] = Reason
- conn = model.connections[self.managedBroker]
- classInfo = self.classInfos[self.managedBroker]
- originalId = objectId(None, self.sourceScopeId, self.sourceObjectId)
- conn.callMethod(originalId, classInfo, "Release",
+ actualArgs = list()
+ actualArgs.append(Reason)
+ originalId = ObjectId(None, self.sourceScopeId, self.sourceObjectId)
+ model.callMethod(self.managedBroker, originalId, self.qmfClassKey,
"Release",
callback, args=actualArgs)
def Remove(self, model, callback, Reason):
- actualArgs = dict()
- actualArgs["Reason"] = Reason
- conn = model.connections[self.managedBroker]
- classInfo = self.classInfos[self.managedBroker]
- originalId = objectId(None, self.sourceScopeId, self.sourceObjectId)
- conn.callMethod(originalId, classInfo, "Remove",
+ actualArgs = list()
+ actualArgs.append(Reason)
+ originalId = ObjectId(None, self.sourceScopeId, self.sourceObjectId)
+ model.callMethod(self.managedBroker, originalId, self.qmfClassKey,
"Remove",
callback, args=actualArgs)
def Fetch(self, model, callback, File, Start, End, Data):
- actualArgs = dict()
- actualArgs["File"] = File
- actualArgs["Start"] = Start
- actualArgs["End"] = End
- actualArgs["Data"] = Data
- conn = model.connections[self.managedBroker]
- classInfo = self.classInfos[self.managedBroker]
- originalId = objectId(None, self.sourceScopeId, self.sourceObjectId)
- conn.callMethod(originalId, classInfo, "Fetch",
+ actualArgs = list()
+ actualArgs.append(File)
+ actualArgs.append(Start)
+ actualArgs.append(End)
+ actualArgs.append(Data)
+ originalId = ObjectId(None, self.sourceScopeId, self.sourceObjectId)
+ model.callMethod(self.managedBroker, originalId, self.qmfClassKey,
"Fetch",
callback, args=actualArgs)
class JobStats(SQLObject):
@@ -247,6 +237,7 @@
recTime = TimestampCol(default=None)
sourceScopeId = BigIntCol(default=None)
sourceObjectId = BigIntCol(default=None)
+ qmfClassKey = BLOBCol(default=None)
creationTime = TimestampCol(default=None)
deletionTime = TimestampCol(default=None)
managedBroker = StringCol(length=1000, default=None)
@@ -298,6 +289,7 @@
recTime = TimestampCol(default=None)
sourceScopeId = BigIntCol(default=None)
sourceObjectId = BigIntCol(default=None)
+ qmfClassKey = BLOBCol(default=None)
creationTime = TimestampCol(default=None)
deletionTime = TimestampCol(default=None)
managedBroker = StringCol(length=1000, default=None)
@@ -332,6 +324,7 @@
recTime = TimestampCol(default=None)
sourceScopeId = BigIntCol(default=None)
sourceObjectId = BigIntCol(default=None)
+ qmfClassKey = BLOBCol(default=None)
creationTime = TimestampCol(default=None)
deletionTime = TimestampCol(default=None)
managedBroker = StringCol(length=1000, default=None)
@@ -350,30 +343,24 @@
def GetLimits(self, model, callback, Limits):
- actualArgs = dict()
- actualArgs["Limits"] = Limits
- conn = model.connections[self.managedBroker]
- classInfo = self.classInfos[self.managedBroker]
- originalId = objectId(None, self.sourceScopeId, self.sourceObjectId)
- conn.callMethod(originalId, classInfo, "GetLimits",
+ actualArgs = list()
+ actualArgs.append(Limits)
+ originalId = ObjectId(None, self.sourceScopeId, self.sourceObjectId)
+ model.callMethod(self.managedBroker, originalId, self.qmfClassKey,
"GetLimits",
callback, args=actualArgs)
def SetLimit(self, model, callback, Name, Max):
- actualArgs = dict()
- actualArgs["Name"] = Name
- actualArgs["Max"] = Max
- conn = model.connections[self.managedBroker]
- classInfo = self.classInfos[self.managedBroker]
- originalId = objectId(None, self.sourceScopeId, self.sourceObjectId)
- conn.callMethod(originalId, classInfo, "SetLimit",
+ actualArgs = list()
+ actualArgs.append(Name)
+ actualArgs.append(Max)
+ originalId = ObjectId(None, self.sourceScopeId, self.sourceObjectId)
+ model.callMethod(self.managedBroker, originalId, self.qmfClassKey,
"SetLimit",
callback, args=actualArgs)
def Reconfig(self, model, callback):
- actualArgs = dict()
- conn = model.connections[self.managedBroker]
- classInfo = self.classInfos[self.managedBroker]
- originalId = objectId(None, self.sourceScopeId, self.sourceObjectId)
- conn.callMethod(originalId, classInfo, "Reconfig",
+ actualArgs = list()
+ originalId = ObjectId(None, self.sourceScopeId, self.sourceObjectId)
+ model.callMethod(self.managedBroker, originalId, self.qmfClassKey,
"Reconfig",
callback, args=actualArgs)
class NegotiatorStats(SQLObject):
@@ -401,6 +388,7 @@
recTime = TimestampCol(default=None)
sourceScopeId = BigIntCol(default=None)
sourceObjectId = BigIntCol(default=None)
+ qmfClassKey = BLOBCol(default=None)
creationTime = TimestampCol(default=None)
deletionTime = TimestampCol(default=None)
managedBroker = StringCol(length=1000, default=None)
@@ -433,6 +421,7 @@
recTime = TimestampCol(default=None)
sourceScopeId = BigIntCol(default=None)
sourceObjectId = BigIntCol(default=None)
+ qmfClassKey = BLOBCol(default=None)
creationTime = TimestampCol(default=None)
deletionTime = TimestampCol(default=None)
managedBroker = StringCol(length=1000, default=None)
@@ -453,21 +442,17 @@
def Start(self, model, callback, Subsystem):
- actualArgs = dict()
- actualArgs["Subsystem"] = Subsystem
- conn = model.connections[self.managedBroker]
- classInfo = self.classInfos[self.managedBroker]
- originalId = objectId(None, self.sourceScopeId, self.sourceObjectId)
- conn.callMethod(originalId, classInfo, "Start",
+ actualArgs = list()
+ actualArgs.append(Subsystem)
+ originalId = ObjectId(None, self.sourceScopeId, self.sourceObjectId)
+ model.callMethod(self.managedBroker, originalId, self.qmfClassKey,
"Start",
callback, args=actualArgs)
def Stop(self, model, callback, Subsystem):
- actualArgs = dict()
- actualArgs["Subsystem"] = Subsystem
- conn = model.connections[self.managedBroker]
- classInfo = self.classInfos[self.managedBroker]
- originalId = objectId(None, self.sourceScopeId, self.sourceObjectId)
- conn.callMethod(originalId, classInfo, "Stop",
+ actualArgs = list()
+ actualArgs.append(Subsystem)
+ originalId = ObjectId(None, self.sourceScopeId, self.sourceObjectId)
+ model.callMethod(self.managedBroker, originalId, self.qmfClassKey, "Stop",
callback, args=actualArgs)
class MasterStats(SQLObject):
@@ -495,6 +480,7 @@
recTime = TimestampCol(default=None)
sourceScopeId = BigIntCol(default=None)
sourceObjectId = BigIntCol(default=None)
+ qmfClassKey = BLOBCol(default=None)
creationTime = TimestampCol(default=None)
deletionTime = TimestampCol(default=None)
managedBroker = StringCol(length=1000, default=None)
@@ -510,11 +496,9 @@
def reloadACLFile(self, model, callback):
"""Reload the ACL file"""
- actualArgs = dict()
- conn = model.connections[self.managedBroker]
- classInfo = self.classInfos[self.managedBroker]
- originalId = objectId(None, self.sourceScopeId, self.sourceObjectId)
- conn.callMethod(originalId, classInfo, "reloadACLFile",
+ actualArgs = list()
+ originalId = ObjectId(None, self.sourceScopeId, self.sourceObjectId)
+ model.callMethod(self.managedBroker, originalId, self.qmfClassKey,
"reloadACLFile",
callback, args=actualArgs)
class AclStats(SQLObject):
@@ -536,6 +520,7 @@
recTime = TimestampCol(default=None)
sourceScopeId = BigIntCol(default=None)
sourceObjectId = BigIntCol(default=None)
+ qmfClassKey = BLOBCol(default=None)
creationTime = TimestampCol(default=None)
deletionTime = TimestampCol(default=None)
managedBroker = StringCol(length=1000, default=None)
@@ -552,19 +537,15 @@
def stopClusterNode(self, model, callback):
- actualArgs = dict()
- conn = model.connections[self.managedBroker]
- classInfo = self.classInfos[self.managedBroker]
- originalId = objectId(None, self.sourceScopeId, self.sourceObjectId)
- conn.callMethod(originalId, classInfo, "stopClusterNode",
+ actualArgs = list()
+ originalId = ObjectId(None, self.sourceScopeId, self.sourceObjectId)
+ model.callMethod(self.managedBroker, originalId, self.qmfClassKey,
"stopClusterNode",
callback, args=actualArgs)
def stopFullCluster(self, model, callback):
- actualArgs = dict()
- conn = model.connections[self.managedBroker]
- classInfo = self.classInfos[self.managedBroker]
- originalId = objectId(None, self.sourceScopeId, self.sourceObjectId)
- conn.callMethod(originalId, classInfo, "stopFullCluster",
+ actualArgs = list()
+ originalId = ObjectId(None, self.sourceScopeId, self.sourceObjectId)
+ model.callMethod(self.managedBroker, originalId, self.qmfClassKey,
"stopFullCluster",
callback, args=actualArgs)
class ClusterStats(SQLObject):
@@ -585,6 +566,7 @@
recTime = TimestampCol(default=None)
sourceScopeId = BigIntCol(default=None)
sourceObjectId = BigIntCol(default=None)
+ qmfClassKey = BLOBCol(default=None)
creationTime = TimestampCol(default=None)
deletionTime = TimestampCol(default=None)
managedBroker = StringCol(length=1000, default=None)
@@ -631,6 +613,7 @@
recTime = TimestampCol(default=None)
sourceScopeId = BigIntCol(default=None)
sourceObjectId = BigIntCol(default=None)
+ qmfClassKey = BLOBCol(default=None)
creationTime = TimestampCol(default=None)
deletionTime = TimestampCol(default=None)
managedBroker = StringCol(length=1000, default=None)
@@ -652,12 +635,10 @@
def expand(self, model, callback, by):
"""Increase number of files allocated for this
journal"""
- actualArgs = dict()
- actualArgs["by"] = by
- conn = model.connections[self.managedBroker]
- classInfo = self.classInfos[self.managedBroker]
- originalId = objectId(None, self.sourceScopeId, self.sourceObjectId)
- conn.callMethod(originalId, classInfo, "expand",
+ actualArgs = list()
+ actualArgs.append(by)
+ originalId = ObjectId(None, self.sourceScopeId, self.sourceObjectId)
+ model.callMethod(self.managedBroker, originalId, self.qmfClassKey,
"expand",
callback, args=actualArgs)
class JournalStats(SQLObject):
@@ -706,6 +687,7 @@
recTime = TimestampCol(default=None)
sourceScopeId = BigIntCol(default=None)
sourceObjectId = BigIntCol(default=None)
+ qmfClassKey = BLOBCol(default=None)
creationTime = TimestampCol(default=None)
deletionTime = TimestampCol(default=None)
managedBroker = StringCol(length=1000, default=None)
@@ -738,6 +720,7 @@
recTime = TimestampCol(default=None)
sourceScopeId = BigIntCol(default=None)
sourceObjectId = BigIntCol(default=None)
+ qmfClassKey = BLOBCol(default=None)
creationTime = TimestampCol(default=None)
deletionTime = TimestampCol(default=None)
managedBroker = StringCol(length=1000, default=None)
@@ -757,41 +740,35 @@
def echo(self, model, callback, sequence, body):
"""Request a response to test the path to the management
broker"""
- actualArgs = dict()
- actualArgs["sequence"] = sequence
- actualArgs["body"] = body
- conn = model.connections[self.managedBroker]
- classInfo = self.classInfos[self.managedBroker]
- originalId = objectId(None, self.sourceScopeId, self.sourceObjectId)
- conn.callMethod(originalId, classInfo, "echo",
+ actualArgs = list()
+ actualArgs.append(sequence)
+ actualArgs.append(body)
+ originalId = ObjectId(None, self.sourceScopeId, self.sourceObjectId)
+ model.callMethod(self.managedBroker, originalId, self.qmfClassKey, "echo",
callback, args=actualArgs)
def connect(self, model, callback, host, port, durable, authMechanism, username,
password, transport):
"""Establish a connection to another broker"""
- actualArgs = dict()
- actualArgs["host"] = host
- actualArgs["port"] = port
- actualArgs["durable"] = durable
- actualArgs["authMechanism"] = authMechanism
- actualArgs["username"] = username
- actualArgs["password"] = password
- actualArgs["transport"] = transport
- conn = model.connections[self.managedBroker]
- classInfo = self.classInfos[self.managedBroker]
- originalId = objectId(None, self.sourceScopeId, self.sourceObjectId)
- conn.callMethod(originalId, classInfo, "connect",
+ actualArgs = list()
+ actualArgs.append(host)
+ actualArgs.append(port)
+ actualArgs.append(durable)
+ actualArgs.append(authMechanism)
+ actualArgs.append(username)
+ actualArgs.append(password)
+ actualArgs.append(transport)
+ originalId = ObjectId(None, self.sourceScopeId, self.sourceObjectId)
+ model.callMethod(self.managedBroker, originalId, self.qmfClassKey,
"connect",
callback, args=actualArgs)
def queueMoveMessages(self, model, callback, srcQueue, destQueue, qty):
"""Move messages from one queue to another"""
- actualArgs = dict()
- actualArgs["srcQueue"] = srcQueue
- actualArgs["destQueue"] = destQueue
- actualArgs["qty"] = qty
- conn = model.connections[self.managedBroker]
- classInfo = self.classInfos[self.managedBroker]
- originalId = objectId(None, self.sourceScopeId, self.sourceObjectId)
- conn.callMethod(originalId, classInfo, "queueMoveMessages",
+ actualArgs = list()
+ actualArgs.append(srcQueue)
+ actualArgs.append(destQueue)
+ actualArgs.append(qty)
+ originalId = ObjectId(None, self.sourceScopeId, self.sourceObjectId)
+ model.callMethod(self.managedBroker, originalId, self.qmfClassKey,
"queueMoveMessages",
callback, args=actualArgs)
class BrokerStats(SQLObject):
@@ -812,6 +789,7 @@
recTime = TimestampCol(default=None)
sourceScopeId = BigIntCol(default=None)
sourceObjectId = BigIntCol(default=None)
+ qmfClassKey = BLOBCol(default=None)
creationTime = TimestampCol(default=None)
deletionTime = TimestampCol(default=None)
managedBroker = StringCol(length=1000, default=None)
@@ -844,6 +822,7 @@
recTime = TimestampCol(default=None)
sourceScopeId = BigIntCol(default=None)
sourceObjectId = BigIntCol(default=None)
+ qmfClassKey = BLOBCol(default=None)
creationTime = TimestampCol(default=None)
deletionTime = TimestampCol(default=None)
managedBroker = StringCol(length=1000, default=None)
@@ -873,6 +852,7 @@
recTime = TimestampCol(default=None)
sourceScopeId = BigIntCol(default=None)
sourceObjectId = BigIntCol(default=None)
+ qmfClassKey = BLOBCol(default=None)
creationTime = TimestampCol(default=None)
deletionTime = TimestampCol(default=None)
managedBroker = StringCol(length=1000, default=None)
@@ -889,12 +869,10 @@
def purge(self, model, callback, request):
"""Discard all or some messages on a queue"""
- actualArgs = dict()
- actualArgs["request"] = request
- conn = model.connections[self.managedBroker]
- classInfo = self.classInfos[self.managedBroker]
- originalId = objectId(None, self.sourceScopeId, self.sourceObjectId)
- conn.callMethod(originalId, classInfo, "purge",
+ actualArgs = list()
+ actualArgs.append(request)
+ originalId = ObjectId(None, self.sourceScopeId, self.sourceObjectId)
+ model.callMethod(self.managedBroker, originalId, self.qmfClassKey,
"purge",
callback, args=actualArgs)
class QueueStats(SQLObject):
@@ -942,6 +920,7 @@
recTime = TimestampCol(default=None)
sourceScopeId = BigIntCol(default=None)
sourceObjectId = BigIntCol(default=None)
+ qmfClassKey = BLOBCol(default=None)
creationTime = TimestampCol(default=None)
deletionTime = TimestampCol(default=None)
managedBroker = StringCol(length=1000, default=None)
@@ -985,6 +964,7 @@
recTime = TimestampCol(default=None)
sourceScopeId = BigIntCol(default=None)
sourceObjectId = BigIntCol(default=None)
+ qmfClassKey = BLOBCol(default=None)
creationTime = TimestampCol(default=None)
deletionTime = TimestampCol(default=None)
managedBroker = StringCol(length=1000, default=None)
@@ -1017,6 +997,7 @@
recTime = TimestampCol(default=None)
sourceScopeId = BigIntCol(default=None)
sourceObjectId = BigIntCol(default=None)
+ qmfClassKey = BLOBCol(default=None)
creationTime = TimestampCol(default=None)
deletionTime = TimestampCol(default=None)
managedBroker = StringCol(length=1000, default=None)
@@ -1032,11 +1013,9 @@
def close(self, model, callback):
- actualArgs = dict()
- conn = model.connections[self.managedBroker]
- classInfo = self.classInfos[self.managedBroker]
- originalId = objectId(None, self.sourceScopeId, self.sourceObjectId)
- conn.callMethod(originalId, classInfo, "close",
+ actualArgs = list()
+ originalId = ObjectId(None, self.sourceScopeId, self.sourceObjectId)
+ model.callMethod(self.managedBroker, originalId, self.qmfClassKey,
"close",
callback, args=actualArgs)
class ClientConnectionStats(SQLObject):
@@ -1062,6 +1041,7 @@
recTime = TimestampCol(default=None)
sourceScopeId = BigIntCol(default=None)
sourceObjectId = BigIntCol(default=None)
+ qmfClassKey = BLOBCol(default=None)
creationTime = TimestampCol(default=None)
deletionTime = TimestampCol(default=None)
managedBroker = StringCol(length=1000, default=None)
@@ -1076,29 +1056,25 @@
def close(self, model, callback):
- actualArgs = dict()
- conn = model.connections[self.managedBroker]
- classInfo = self.classInfos[self.managedBroker]
- originalId = objectId(None, self.sourceScopeId, self.sourceObjectId)
- conn.callMethod(originalId, classInfo, "close",
+ actualArgs = list()
+ originalId = ObjectId(None, self.sourceScopeId, self.sourceObjectId)
+ model.callMethod(self.managedBroker, originalId, self.qmfClassKey,
"close",
callback, args=actualArgs)
def bridge(self, model, callback, durable, src, dest, key, tag, excludes, srcIsQueue,
srcIsLocal, dynamic):
"""Bridge messages over the link"""
- actualArgs = dict()
- actualArgs["durable"] = durable
- actualArgs["src"] = src
- actualArgs["dest"] = dest
- actualArgs["key"] = key
- actualArgs["tag"] = tag
- actualArgs["excludes"] = excludes
- actualArgs["srcIsQueue"] = srcIsQueue
- actualArgs["srcIsLocal"] = srcIsLocal
- actualArgs["dynamic"] = dynamic
- conn = model.connections[self.managedBroker]
- classInfo = self.classInfos[self.managedBroker]
- originalId = objectId(None, self.sourceScopeId, self.sourceObjectId)
- conn.callMethod(originalId, classInfo, "bridge",
+ actualArgs = list()
+ actualArgs.append(durable)
+ actualArgs.append(src)
+ actualArgs.append(dest)
+ actualArgs.append(key)
+ actualArgs.append(tag)
+ actualArgs.append(excludes)
+ actualArgs.append(srcIsQueue)
+ actualArgs.append(srcIsLocal)
+ actualArgs.append(dynamic)
+ originalId = ObjectId(None, self.sourceScopeId, self.sourceObjectId)
+ model.callMethod(self.managedBroker, originalId, self.qmfClassKey,
"bridge",
callback, args=actualArgs)
class LinkStats(SQLObject):
@@ -1121,6 +1097,7 @@
recTime = TimestampCol(default=None)
sourceScopeId = BigIntCol(default=None)
sourceObjectId = BigIntCol(default=None)
+ qmfClassKey = BLOBCol(default=None)
creationTime = TimestampCol(default=None)
deletionTime = TimestampCol(default=None)
managedBroker = StringCol(length=1000, default=None)
@@ -1141,11 +1118,9 @@
def close(self, model, callback):
- actualArgs = dict()
- conn = model.connections[self.managedBroker]
- classInfo = self.classInfos[self.managedBroker]
- originalId = objectId(None, self.sourceScopeId, self.sourceObjectId)
- conn.callMethod(originalId, classInfo, "close",
+ actualArgs = list()
+ originalId = ObjectId(None, self.sourceScopeId, self.sourceObjectId)
+ model.callMethod(self.managedBroker, originalId, self.qmfClassKey,
"close",
callback, args=actualArgs)
class BridgeStats(SQLObject):
@@ -1166,6 +1141,7 @@
recTime = TimestampCol(default=None)
sourceScopeId = BigIntCol(default=None)
sourceObjectId = BigIntCol(default=None)
+ qmfClassKey = BLOBCol(default=None)
creationTime = TimestampCol(default=None)
deletionTime = TimestampCol(default=None)
managedBroker = StringCol(length=1000, default=None)
@@ -1182,35 +1158,27 @@
def solicitAck(self, model, callback):
- actualArgs = dict()
- conn = model.connections[self.managedBroker]
- classInfo = self.classInfos[self.managedBroker]
- originalId = objectId(None, self.sourceScopeId, self.sourceObjectId)
- conn.callMethod(originalId, classInfo, "solicitAck",
+ actualArgs = list()
+ originalId = ObjectId(None, self.sourceScopeId, self.sourceObjectId)
+ model.callMethod(self.managedBroker, originalId, self.qmfClassKey,
"solicitAck",
callback, args=actualArgs)
def detach(self, model, callback):
- actualArgs = dict()
- conn = model.connections[self.managedBroker]
- classInfo = self.classInfos[self.managedBroker]
- originalId = objectId(None, self.sourceScopeId, self.sourceObjectId)
- conn.callMethod(originalId, classInfo, "detach",
+ actualArgs = list()
+ originalId = ObjectId(None, self.sourceScopeId, self.sourceObjectId)
+ model.callMethod(self.managedBroker, originalId, self.qmfClassKey,
"detach",
callback, args=actualArgs)
def resetLifespan(self, model, callback):
- actualArgs = dict()
- conn = model.connections[self.managedBroker]
- classInfo = self.classInfos[self.managedBroker]
- originalId = objectId(None, self.sourceScopeId, self.sourceObjectId)
- conn.callMethod(originalId, classInfo, "resetLifespan",
+ actualArgs = list()
+ originalId = ObjectId(None, self.sourceScopeId, self.sourceObjectId)
+ model.callMethod(self.managedBroker, originalId, self.qmfClassKey,
"resetLifespan",
callback, args=actualArgs)
def close(self, model, callback):
- actualArgs = dict()
- conn = model.connections[self.managedBroker]
- classInfo = self.classInfos[self.managedBroker]
- originalId = objectId(None, self.sourceScopeId, self.sourceObjectId)
- conn.callMethod(originalId, classInfo, "close",
+ actualArgs = list()
+ originalId = ObjectId(None, self.sourceScopeId, self.sourceObjectId)
+ model.callMethod(self.managedBroker, originalId, self.qmfClassKey,
"close",
callback, args=actualArgs)
class SessionStats(SQLObject):
Modified: mgmt/trunk/mint/python/mint/schemaparser.py
===================================================================
--- mgmt/trunk/mint/python/mint/schemaparser.py 2008-11-14 19:30:06 UTC (rev 2804)
+++ mgmt/trunk/mint/python/mint/schemaparser.py 2008-11-14 20:12:00 UTC (rev 2805)
@@ -139,6 +139,7 @@
else:
self.generateAttrib("sourceScopeId", "BigIntCol")
self.generateAttrib("sourceObjectId", "BigIntCol")
+ self.generateAttrib("qmfClassKey", "BLOBCol")
self.generateTimestampAttrib("creation")
self.generateTimestampAttrib("deletion")
self.generateAttrib("managedBroker", "StringCol",
"length=1000")
@@ -154,10 +155,11 @@
else:
comment = ""
formalArgs = ", "
- actualArgs = " actualArgs = dict()\n"
+ actualArgs = " actualArgs = list()\n"
for arg in elem.query["arg"]:
formalArgs += "%s, " % (arg["@name"])
- actualArgs += " actualArgs[\"%s\"] = %s\n" %
(arg["@name"], arg["@name"])
+ actualArgs += " actualArgs.append(%s)\n" % (arg["@name"])
+
if (formalArgs != ", "):
formalArgs = formalArgs[:-2]
else:
@@ -165,10 +167,8 @@
self.pythonOutput += "\n def %s(self, model, callback%s):\n" %
(elem["@name"], formalArgs)
self.pythonOutput += comment
self.pythonOutput += actualArgs
- self.pythonOutput += " conn = model.connections[self.managedBroker]\n"
- self.pythonOutput += " classInfo =
self.classInfos[self.managedBroker]\n"
- self.pythonOutput += " originalId = objectId(None, self.sourceScopeId,
self.sourceObjectId)\n"
- self.pythonOutput += " conn.callMethod(originalId, classInfo,
\"%s\",\n" % elem["@name"]
+ self.pythonOutput += " originalId = ObjectId(None, self.sourceScopeId,
self.sourceObjectId)\n"
+ self.pythonOutput += " model.callMethod(self.managedBroker, originalId,
self.qmfClassKey, \"%s\",\n" % elem["@name"]
self.pythonOutput += " callback, args=actualArgs)\n"
def endClass(self):
@@ -183,7 +183,7 @@
self.pythonOutput += "import mint\n"
self.pythonOutput += "from sqlobject import *\n"
self.pythonOutput += "from datetime import datetime\n"
- self.pythonOutput += "from qpid.management import objectId\n\n"
+ self.pythonOutput += "from qpid.qmfconsole import ObjectId\n\n"
self.pythonOutput += "class Pool(SQLObject):\n"
self.pythonOutput += " class sqlmeta:\n"
self.pythonOutput += " lazyUpdate = True\n"
Modified: mgmt/trunk/mint/python/mint/update.py
===================================================================
--- mgmt/trunk/mint/python/mint/update.py 2008-11-14 19:30:06 UTC (rev 2804)
+++ mgmt/trunk/mint/python/mint/update.py 2008-11-14 20:12:00 UTC (rev 2805)
@@ -1,16 +1,13 @@
import logging
-
+import datetime
+import types
+import struct
from Queue import Queue as ConcurrentQueue, Full, Empty
from threading import Thread
-from datetime import datetime
-from qpid.management import managementClient
-from struct import unpack
-from schema import schemaReservedWordsMap
-from sqlobject import DateTimeCol, TimestampCol, Col
+from traceback import print_exc
+from qpid.datatypes import UUID
+from mint.schema import *
-import types
-import mint
-
log = logging.getLogger("mint.update")
def time_unwarp(t):
@@ -33,14 +30,10 @@
self.updates.put(update)
except Full:
log.exception("Queue is full")
+ pass
def run(self):
while True:
- #size = self.updates.qsize()
- #
- #if size > 1:
- # log.debug("Queue depth is %i", self.updates.qsize())
-
try:
update = self.updates.get(True, 1)
except Empty:
@@ -54,303 +47,202 @@
update.process(self.model)
except:
log.exception("Update failed")
+ pass
def stop(self):
self.stopRequested = True
-class UnknownClassException(Exception):
- pass
+class ModelUpdate(object):
+ def __init__(self, broker, obj):
+ self.broker = broker
+ self.qmfObj = obj
-def unmarshalClassInfo(classInfo):
- package = classInfo[0]
- name = classInfo[1].capitalize()
+ def getStatsClass(self, cls):
+ return getattr(mint, cls.__name__ + "Stats")
- if name == "Connection":
- name = "ClientConnection"
+ def process(self):
+ pass
- cls = getattr(mint, name)
+ def processAttributes(self, attrs, cls, model):
+ # translate keys into their string representation
+ for key in attrs.keys():
+ attrs[key.__repr__()] = attrs.pop(key)
- if cls is None:
- raise UnknownClassException("Class '%s' is unknown" % name)
+ orphan = False
+ for name in attrs.keys():
+ rename = ""
+ orig_name = name
+ if name in mint.schema.schemaReservedWordsMap:
+ rename = mint.schema.schemaReservedWordsMap.get(name)
+ attrs[rename] = attrs.pop(name)
+ name = rename
- return package, cls
+ if len(name) > 3 and name.endswith("Ref"):
+ # Navigate to referenced objects
+ clsname = name[0].upper() + name[1:-3]
+ id = attrs.pop(name)
-def processAttributes(conn, attrs, cls, model, updateObj):
- attrs.pop("id")
+ othercls = getattr(mint, clsname, None)
- if "connectionRef" in attrs:
- attrs["clientConnectionRef"] = attrs.pop("connectionRef")
+ if othercls:
+ attrname = name[0:-3]
- orphan = False
- for name in attrs.keys():
- rename = schemaReservedWordsMap.get(name)
+ try:
+ attrs[attrname] = model.getObject(othercls, id, self.broker)
+ except KeyError:
+ log.info("Referenced object %s '%s' not found by key
'%s'" % (clsname, id, attrname))
+ except mint.ObjectNotFound:
+ if not orphan:
+ log.info("Referenced object %s '%s' not found, deferring
creation of orphan object" % (clsname, id))
+ # store object in orphan map, will be picked up later when parent info is received
+ if (clsname, id.first, id.second) not in model.orphanObjectMap:
+ model.orphanObjectMap[(clsname, id.first, id.second)] = set()
+ model.orphanObjectMap[(clsname, id.first, id.second)].add(self)
+ orphan = True
+ else:
+ log.error("Class '%s' not found" % clsname)
+ elif not hasattr(cls, orig_name):
+ # Remove attrs that we don't have in our schema
+ log.debug("Class '%s' has no field '%s'" %
(cls.__name__, name))
+ del attrs[name]
+ #XXX FIX -- TODO when converting to new API, will lookup attribute type in schema representation
+ elif name in ("DaemonStartTime", "EnteredCurrentActivity",
"EnteredCurrentState", "JobStart",
+ "LastBenchmark", "LastFetchWorkCompleted",
"LastFetchWorkSpawned", "LastPeriodicCheckpoint",
+ "MyCurrentTime", "QDate",
"JobQueueBirthdate", "MonitorSelfTime") \
+ and (type(attrs[name]) is types.LongType or type(attrs[name]) is
types.IntType or attrs[name] == 0):
+ attrs[name] = datetime.fromtimestamp(attrs[name]/1000000000)
+ elif name.endswith("Time") and type(attrs[name]) is types.IntType and
attrs[name] == 0:
+ attrs[name] = datetime.fromtimestamp(attrs[name])
+ #XXX FIX -- TODO when converting to new API, will lookup attribute type in schema representation
+ elif isinstance(attrs[name], UUID):
+ # convert UUIDs into their string representation, to be handled by sqlobject
+ attrs[name] = str(attrs[name])
+ if orphan:
+ return None
+ else:
+ return attrs
- if rename:
- attrs[rename] = attrs.pop(name)
- name = rename
- if len(name) > 3 and name.endswith("Ref"):
- # Navigate to referenced objects
+class PropertyUpdate(ModelUpdate):
+ def __init__(self, broker, obj):
+ ModelUpdate.__init__(self, broker, obj)
- clsname = name[0].upper() + name[1:-3]
- id = attrs.pop(name)
-
- othercls = getattr(mint, clsname, None)
-
- if othercls:
- attrname = name[0:-3]
-
- try:
- attrs[attrname] = conn.getObject(othercls, id)
- except KeyError:
- log.info("Referenced object %s '%s' not found by key
'%s'" % (clsname, id, attrname))
- except mint.ObjectNotFound:
- if not orphan:
- log.info("Referenced object %s '%s' not found, deferring
creation of orphan object" % (clsname, id))
- # store object in orphan map, will be picked up later when parent info is received
- if (clsname, id.first, id.second) not in model.orphanObjectMap:
- model.orphanObjectMap[(clsname, id.first, id.second)] = set()
- model.orphanObjectMap[(clsname, id.first, id.second)].add(updateObj)
- orphan = True
- else:
- log.error("Class '%s' not found" % clsname)
- elif not hasattr(cls, name):
- # Remove attrs that we don't have in our schema
- log.debug("Class '%s' has no field '%s'" % (cls.__name__,
name))
- del attrs[name]
- #XXX FIX -- TODO when converting to new API, will lookup attribute type in schema representation
- elif name in ("DaemonStartTime", "EnteredCurrentActivity",
"EnteredCurrentState", "JobStart",
- "LastBenchmark", "LastFetchWorkCompleted",
"LastFetchWorkSpawned", "LastPeriodicCheckpoint",
- "MyCurrentTime", "QDate",
"JobQueueBirthdate", "MonitorSelfTime") \
- and (type(attrs[name]) is types.LongType or type(attrs[name]) is
types.IntType or attrs[name] == 0):
- attrs[name] = datetime.fromtimestamp(attrs[name]/1000000000)
- elif name.endswith("Time") and type(attrs[name]) is types.IntType and
attrs[name] == 0:
- attrs[name] = datetime.fromtimestamp(attrs[name])
- #XXX FIX -- TODO when converting to new API, will lookup attribute type in schema representation
- if orphan:
- return None
- else:
- return attrs
-
-def getStatsClass(cls):
- return getattr(mint, cls.__name__ + "Stats")
-
-class SchemaUpdate(object):
- def __init__(self, conn, classInfo, props, stats, methods, events):
- self.conn = conn
- self.classInfo = classInfo
- self.props = props
- self.stats = stats
- self.methods = methods
- self.events = events
-
def process(self, model):
- cls = "%s.%s" % (self.classInfo[0], self.classInfo[1])
- log.info("Processing %-8s %-16s %-16s" % ("schema", self.conn.id,
cls))
-
try:
- pkg, cls = unmarshalClassInfo(self.classInfo)
- cls.classInfos[self.conn.id] = self.classInfo
- except UnknownClassException, e:
- log.warn(e)
- return
+ cls = self.qmfObj.getSchema().getKey()[1]
+ if cls in mint.schema.schemaReservedWordsMap:
+ cls = mint.schema.schemaReservedWordsMap.get(cls)
+ cls = eval(cls[0].upper()+cls[1:])
+ attrs = dict(self.qmfObj.getProperties())
+ timestamps = self.qmfObj.getTimestamps()
+ id = self.qmfObj.getObjectId()
- # XXX do more schema checking
+ if self.processAttributes(attrs, cls, model) == None:
+ # object is orphan, a parent dependency was not found;
+ # insertion in db is deferred until parent info is received
+ return
-class PropertyUpdate(object):
- def __init__(self, conn, classInfo, props, timestamps):
- self.conn = conn
- self.classInfo = classInfo
- self.props = props
- self.timestamps = timestamps
+ obj = None
+ try:
+ obj = model.getObject(cls, id, self.broker)
+ except mint.ObjectNotFound:
+ obj = cls()
+ log.debug("%s(%i) created", cls.__name__, obj.id)
+ attrs["sourceScopeId"] = id.first
+ attrs["sourceObjectId"] = id.second
+ pkg, cls, hash = self.qmfObj.getClassKey()
+ attrs["qmfClassKey"] = "%s, %s, %s" % (pkg, cls, hash)
+ attrs["managedBroker"] = str(self.broker.getBrokerId())
+ except Exception, e:
+ print e
- def process(self, model):
- cls = "%s.%s" % (self.classInfo[0], self.classInfo[1])
- args = ("props", self.conn.id, cls, len(self.props))
- log.info("Processing %-8s %-16s %-16s %3i" % args)
+ attrs["recTime"] = datetime.fromtimestamp(timestamps[0]/1000000000)
+ attrs["creationTime"] = datetime.fromtimestamp(timestamps[1]/1000000000)
+ if timestamps[2] != 0:
+ attrs["deletionTime"] =
datetime.fromtimestamp(timestamps[2]/1000000000)
+ log.debug("%s(%i) marked deleted", cls, obj.id)
- try:
- pkg, cls = unmarshalClassInfo(self.classInfo)
- except UnknownClassException, e:
- log.warn(e)
- return
+ obj.set(**attrs)
+ obj.syncUpdate()
- attrs = dict(self.props)
+ if (cls, id.first, id.second) in model.orphanObjectMap:
+ # this object is the parent of orphan objects in the map, re-enqueue for insertion
+ orphanObjects = model.orphanObjectMap.pop((cls, id.first, id.second))
+ for orphanObj in orphanObjects:
+ model.updateThread.enqueue(orphanObj)
+ log.info("Inserted %d orphan objects whose creation had been deferred"
% (len(orphanObjects)))
- id = attrs["id"]
- if processAttributes(self.conn, attrs, cls, model, self) == None:
- # object is orphan, a parent dependency was not found;
- # insertion in db is deferred until parent info is received
- return
+ if cls == "broker":
+ if str(self.broker.getBrokerId()) in model.managedBrokers:
+ model.managedBrokers[str(self.broker.getBrokerId())].broker = self.broker
+ reg = mint.BrokerRegistration.selectBy(url=self.broker.getFullUrl())[0]
+ reg.broker = obj
+ reg.syncUpdate()
- # XXX move these down to the try/except
+ except:
+ print_exc()
- attrs["managedBroker"] = self.conn.id
+ attrs["managedBroker"] = str(self.broker.getBrokerId())
attrs["recTime"] = time_unwarp(datetime.fromtimestamp \
- (self.timestamps[0]/1000000000))
+ (timestamps[0]/1000000000))
attrs["creationTime"] = datetime.fromtimestamp \
- (self.timestamps[1]/1000000000)
+ (timestamps[1]/1000000000)
- if self.timestamps[2] != 0:
- attrs["deletionTime"] = datetime.fromtimestamp \
- (self.timestamps[2]/1000000000)
- try:
- obj = self.conn.getObject(cls, id)
- except mint.ObjectNotFound:
- obj = cls()
+class StatisticUpdate(ModelUpdate):
+ def __init__(self, broker, obj):
+ ModelUpdate.__init__(self, broker, obj)
- log.debug("%s(%i) created", cls.__name__, obj.id)
-
- attrs["sourceScopeId"] = id.first
- attrs["sourceObjectId"] = id.second
-
- #hash = "%08x%08x%08x%08x" % unpack("!LLLL", self.classInfo[2])
- #classInfo = (self.classInfo[0], self.classInfo[1], hash)
- #obj.sourceClassInfo = ",".join(classInfo)
-
- obj.set(**attrs)
- obj.syncUpdate()
-
- if obj.deletionTime:
- log.debug("%s(%i) marked deleted", cls.__name__, obj.id)
-
- if (cls.__name__, id.first, id.second) in model.orphanObjectMap:
- # this object is the parent of orphan objects in the map, re-enqueue for insertion
- orphanObjects = model.orphanObjectMap.pop((cls.__name__, id.first, id.second))
- for orphanObj in orphanObjects:
- model.updateThread.enqueue(orphanObj)
- log.info("Inserted %d orphan objects whose creation had been deferred" %
(len(orphanObjects)))
-
- # XXX refactor this to take advantage of the get/create logic
- # above
- if isinstance(obj, mint.Broker) and obj.managedBroker:
- host, port = obj.managedBroker.split(":")
- port = int(port)
-
- if not obj.registration:
- try:
- reg = mint.BrokerRegistration.selectBy(host=host, port=port)[0]
- except IndexError:
- reg = None
-
- if reg:
- reg.broker = obj
- obj.registration = reg
-
- reg.syncUpdate()
- obj.syncUpdate()
-
-class StatisticUpdate(object):
- def __init__(self, conn, classInfo, stats, timestamps):
- self.conn = conn
- self.classInfo = classInfo
- self.stats = stats
- self.timestamps = timestamps
-
def process(self, model):
- cls = "%s.%s" % (self.classInfo[0], self.classInfo[1])
- args = ("stats", self.conn.id, cls, len(self.stats))
- log.info("Processing %-8s %-16s %-16s %3i" % args)
-
try:
- pkg, cls = unmarshalClassInfo(self.classInfo)
- except UnknownClassException, e:
- log.warn(e)
- return
+ cls = self.qmfObj.getSchema().getKey()[1]
+ if cls in mint.schema.schemaReservedWordsMap:
+ cls = mint.schema.schemaReservedWordsMap.get(cls)
+ cls = eval(cls[0].upper()+cls[1:])
+ attrs = dict(self.qmfObj.getStatistics())
+ timestamps = self.qmfObj.getTimestamps()
+ id = self.qmfObj.getObjectId()
- attrs = dict(self.stats)
+ obj = None
+ try:
+ obj = model.getObject(cls, id, self.broker)
+ except mint.ObjectNotFound:
+ # handle this
+ raise mint.ObjectNotFound
- id = attrs["id"]
- obj = self.conn.getObject(cls, id)
- statscls = getStatsClass(cls)
- if processAttributes(self.conn, attrs, statscls, model, self) == None:
- # object is orphan, a parent dependency was not found;
- # insertion in db is deferred until parent info is received
- return
+ statscls = self.getStatsClass(cls)
+ if self.processAttributes(attrs, statscls, model) == None:
+ # object is orphan, a parent dependency was not found;
+ # insertion in db is deferred until parent info is received
+ return
- attrs["recTime"] = time_unwarp(datetime.fromtimestamp \
- (self.timestamps[0]/1000000000))
- # Set the stat->obj reference
- attrs[cls.__name__[0].lower() + cls.__name__[1:]] = obj
+ attrs["recTime"] = time_unwarp(datetime.fromtimestamp \
+ (timestamps[0]/1000000000))
- statsobj = statscls()
- statsobj.set(**attrs)
- statsobj.syncUpdate()
+ # Set the stat->obj reference
+ attrs[cls.__name__[0].lower() + cls.__name__[1:]] = obj
+ statsobj = statscls()
+ statsobj.set(**attrs)
+ statsobj.syncUpdate()
- # XXX not sure if this should happen here. makes more sense in
- # prop update
- if self.timestamps[2] != 0:
- obj.deletionTime = datetime.fromtimestamp(self.timestamps[2]/1000000000)
+ obj.statsPrev = obj.statsCurr
+ obj.statsCurr = statsobj
+ obj.syncUpdate()
- obj.statsPrev = obj.statsCurr
- obj.statsCurr = statsobj
- obj.syncUpdate()
+ except:
+ print_exc()
-class MethodUpdate(object):
- def __init__(self, conn, methodId, errorCode, errorText, args):
- self.conn = conn
- self.methodId = methodId
- self.errorCode = errorCode
- self.errorText = errorText
- self.args = args
- def process(self, model):
- logArgs = ("method", self.conn.id, self.methodId, self.errorCode,
- self.errorText)
- log.info("Processing %-8s %-16s %-12s %-12s %s" % logArgs)
+class MethodUpdate(ModelUpdate):
+ def __init__(self, broker, seq, response):
+ ModelUpdate.__init__(self, broker, response)
+ self.seq = seq
- model.lock()
- try:
- method = model.outstandingMethodCalls.pop(self.methodId)
- method(self.errorText, self.args)
- finally:
- model.unlock()
-
-class CloseUpdate(object):
- def __init__(self, conn, data):
- self.conn = conn
- self.data = data
-
def process(self, model):
- log.info("Processing %-8s %-16s" % ("close", self.conn.id))
-
model.lock()
try:
- del model.connections[self.conn.id]
-
- if model.connCloseListener:
- model.connCloseListener(self.conn, self.data)
+ methodCallback = model.outstandingMethodCalls.pop(self.seq)
+ methodCallback(self.qmfObj.text, self.qmfObj.outArgs)
finally:
model.unlock()
-
-class ControlUpdate(object):
- __types = {
- managementClient.CTRL_BROKER_INFO: "broker_info",
- managementClient.CTRL_SCHEMA_LOADED: "schema_loaded",
- managementClient.CTRL_USER: "user",
- managementClient.CTRL_HEARTBEAT: "heartbeat"
- }
-
- def __init__(self, conn, typeCode, data):
- self.conn = conn
- self.typeCode = typeCode
- self.data = data
-
- def process(self, model):
- type = self.__types.get(self.typeCode, "[unknown]")
- args = ("control", self.conn.id, type, self.data)
- log.info("Processing %-8s %-16s %-16s %s" % args)
-
- if self.typeCode == managementClient.CTRL_BROKER_INFO:
- uuid = "%08x-%04x-%04x-%04x-%04x%08x" % unpack \
- ("!LHHHHL", self.data.brokerId)
-
- log.info("Broker ID from %s is '%s'" % (self.conn.id, uuid))
- log.info("Session ID from %s is '%s'" % \
- (self.conn.id, self.data.sessionId))
-
- self.conn.brokerId = uuid
- self.conn.sessionId = self.data.sessionId
-
Modified: mgmt/trunk/mint/sql/schema.sql
===================================================================
--- mgmt/trunk/mint/sql/schema.sql 2008-11-14 19:30:06 UTC (rev 2804)
+++ mgmt/trunk/mint/sql/schema.sql 2008-11-14 20:12:00 UTC (rev 2805)
@@ -23,13 +23,12 @@
CREATE TABLE broker_registration (
id SERIAL PRIMARY KEY,
name VARCHAR(1000) NOT NULL UNIQUE,
- host VARCHAR(1000) NOT NULL,
- port INT NOT NULL,
+ url VARCHAR(1000),
broker_id INT,
cluster_id INT,
profile_id INT
);
-CREATE UNIQUE INDEX broker_registration_host_port_unique ON broker_registration (host, port);
+CREATE UNIQUE INDEX broker_registration_url_unique ON broker_registration (url);
CREATE TABLE collector_registration (
id SERIAL PRIMARY KEY,
@@ -37,13 +36,6 @@
collector_id VARCHAR(1000)
);
-CREATE TABLE config_property (
- id SERIAL PRIMARY KEY,
- name VARCHAR(1000),
- value VARCHAR(1000),
- type VARCHAR(1)
-);
-
CREATE TABLE mint_info (
id SERIAL PRIMARY KEY,
version VARCHAR(1000) NOT NULL
@@ -75,6 +67,7 @@
rec_time TIMESTAMP,
source_scope_id BIGINT,
source_object_id BIGINT,
+ qmf_class_key BYTEA,
creation_time TIMESTAMP,
deletion_time TIMESTAMP,
managed_broker VARCHAR(1000),
@@ -99,6 +92,7 @@
rec_time TIMESTAMP,
source_scope_id BIGINT,
source_object_id BIGINT,
+ qmf_class_key BYTEA,
creation_time TIMESTAMP,
deletion_time TIMESTAMP,
managed_broker VARCHAR(1000),
@@ -123,6 +117,7 @@
rec_time TIMESTAMP,
source_scope_id BIGINT,
source_object_id BIGINT,
+ qmf_class_key BYTEA,
creation_time TIMESTAMP,
deletion_time TIMESTAMP,
managed_broker VARCHAR(1000),
@@ -147,6 +142,7 @@
rec_time TIMESTAMP,
source_scope_id BIGINT,
source_object_id BIGINT,
+ qmf_class_key BYTEA,
creation_time TIMESTAMP,
deletion_time TIMESTAMP,
managed_broker VARCHAR(1000),
@@ -176,6 +172,7 @@
rec_time TIMESTAMP,
source_scope_id BIGINT,
source_object_id BIGINT,
+ qmf_class_key BYTEA,
creation_time TIMESTAMP,
deletion_time TIMESTAMP,
managed_broker VARCHAR(1000),
@@ -204,6 +201,7 @@
rec_time TIMESTAMP,
source_scope_id BIGINT,
source_object_id BIGINT,
+ qmf_class_key BYTEA,
creation_time TIMESTAMP,
deletion_time TIMESTAMP,
managed_broker VARCHAR(1000),
@@ -233,6 +231,7 @@
rec_time TIMESTAMP,
source_scope_id BIGINT,
source_object_id BIGINT,
+ qmf_class_key BYTEA,
creation_time TIMESTAMP,
deletion_time TIMESTAMP,
managed_broker VARCHAR(1000),
@@ -258,6 +257,7 @@
rec_time TIMESTAMP,
source_scope_id BIGINT,
source_object_id BIGINT,
+ qmf_class_key BYTEA,
creation_time TIMESTAMP,
deletion_time TIMESTAMP,
managed_broker VARCHAR(1000),
@@ -282,6 +282,7 @@
rec_time TIMESTAMP,
source_scope_id BIGINT,
source_object_id BIGINT,
+ qmf_class_key BYTEA,
creation_time TIMESTAMP,
deletion_time TIMESTAMP,
managed_broker VARCHAR(1000),
@@ -317,6 +318,7 @@
rec_time TIMESTAMP,
source_scope_id BIGINT,
source_object_id BIGINT,
+ qmf_class_key BYTEA,
creation_time TIMESTAMP,
deletion_time TIMESTAMP,
managed_broker VARCHAR(1000),
@@ -363,6 +365,7 @@
rec_time TIMESTAMP,
source_scope_id BIGINT,
source_object_id BIGINT,
+ qmf_class_key BYTEA,
creation_time TIMESTAMP,
deletion_time TIMESTAMP,
managed_broker VARCHAR(1000),
@@ -420,6 +423,7 @@
rec_time TIMESTAMP,
source_scope_id BIGINT,
source_object_id BIGINT,
+ qmf_class_key BYTEA,
creation_time TIMESTAMP,
deletion_time TIMESTAMP,
managed_broker VARCHAR(1000),
@@ -445,6 +449,7 @@
rec_time TIMESTAMP,
source_scope_id BIGINT,
source_object_id BIGINT,
+ qmf_class_key BYTEA,
creation_time TIMESTAMP,
deletion_time TIMESTAMP,
managed_broker VARCHAR(1000),
@@ -479,6 +484,7 @@
rec_time TIMESTAMP,
source_scope_id BIGINT,
source_object_id BIGINT,
+ qmf_class_key BYTEA,
creation_time TIMESTAMP,
deletion_time TIMESTAMP,
managed_broker VARCHAR(1000),
@@ -516,6 +522,7 @@
rec_time TIMESTAMP,
source_scope_id BIGINT,
source_object_id BIGINT,
+ qmf_class_key BYTEA,
creation_time TIMESTAMP,
deletion_time TIMESTAMP,
managed_broker VARCHAR(1000),
@@ -567,6 +574,7 @@
rec_time TIMESTAMP,
source_scope_id BIGINT,
source_object_id BIGINT,
+ qmf_class_key BYTEA,
creation_time TIMESTAMP,
deletion_time TIMESTAMP,
managed_broker VARCHAR(1000),
@@ -608,6 +616,7 @@
rec_time TIMESTAMP,
source_scope_id BIGINT,
source_object_id BIGINT,
+ qmf_class_key BYTEA,
creation_time TIMESTAMP,
deletion_time TIMESTAMP,
managed_broker VARCHAR(1000),
@@ -638,6 +647,7 @@
rec_time TIMESTAMP,
source_scope_id BIGINT,
source_object_id BIGINT,
+ qmf_class_key BYTEA,
creation_time TIMESTAMP,
deletion_time TIMESTAMP,
managed_broker VARCHAR(1000),
@@ -747,6 +757,7 @@
rec_time TIMESTAMP,
source_scope_id BIGINT,
source_object_id BIGINT,
+ qmf_class_key BYTEA,
creation_time TIMESTAMP,
deletion_time TIMESTAMP,
managed_broker VARCHAR(1000),
@@ -785,6 +796,7 @@
rec_time TIMESTAMP,
source_scope_id BIGINT,
source_object_id BIGINT,
+ qmf_class_key BYTEA,
creation_time TIMESTAMP,
deletion_time TIMESTAMP,
managed_broker VARCHAR(1000),
@@ -811,6 +823,7 @@
rec_time TIMESTAMP,
source_scope_id BIGINT,
source_object_id BIGINT,
+ qmf_class_key BYTEA,
creation_time TIMESTAMP,
deletion_time TIMESTAMP,
managed_broker VARCHAR(1000),
@@ -835,6 +848,7 @@
rec_time TIMESTAMP,
source_scope_id BIGINT,
source_object_id BIGINT,
+ qmf_class_key BYTEA,
creation_time TIMESTAMP,
deletion_time TIMESTAMP,
managed_broker VARCHAR(1000),
Modified: mgmt/trunk/parsley/python/parsley/command.py
===================================================================
--- mgmt/trunk/parsley/python/parsley/command.py 2008-11-14 19:30:06 UTC (rev 2804)
+++ mgmt/trunk/parsley/python/parsley/command.py 2008-11-14 20:12:00 UTC (rev 2805)
@@ -12,6 +12,9 @@
self.commands = list()
self.commands_by_name = dict()
+ opt = CommandOption(self, "help", "h")
+ opt.description = "Print this message"
+
if self.parent:
self.parent.commands.append(self)
self.parent.commands_by_name[self.name] = self