Author: justi9
Date: 2007-11-13 14:45:55 -0500 (Tue, 13 Nov 2007)
New Revision: 1303
Added:
mgmt/mint/python/mint/schema.sql
Removed:
mgmt/mint/python/mint/updater.py
Modified:
mgmt/mint/bin/mint-test
mgmt/mint/python/mint/__init__.py
mgmt/mint/python/mint/schema.py
Log:
Updates mint-test to use Nuno's new model code. Moves model.py's
Model class to MintModel in mint/__init__.py.
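
For reference, a minimal sketch of the new call pattern introduced here
(host and port are placeholder values; mint-test reads them from argv):

    from mint import MintModel

    model = MintModel()
    # addManagedBroker() creates a ManagedBroker, registers the model's config,
    # instrumentation and method callbacks, starts it, and returns a "host:port" label.
    label = model.addManagedBroker("localhost", 5672)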
Modified: mgmt/mint/bin/mint-test
===================================================================
--- mgmt/mint/bin/mint-test 2007-11-13 18:36:01 UTC (rev 1302)
+++ mgmt/mint/bin/mint-test 2007-11-13 19:45:55 UTC (rev 1303)
@@ -17,18 +17,12 @@
usage()
from mint import *
-from mint.updater import *
from qpid.management import ManagedBroker
def do_main(dburi, brokerhost, brokerport):
- model = MintModel(dburi)
- model.init()
-
- broker = ManagedBroker(host=brokerhost, port=brokerport)
+ model = MintModel()
+ model.addManagedBroker(brokerhost, brokerport)
- updater = MintUpdater(model, broker)
- updater.start()
-
def main():
if len(sys.argv) != 3:
usage()
Modified: mgmt/mint/python/mint/__init__.py
===================================================================
--- mgmt/mint/python/mint/__init__.py 2007-11-13 18:36:01 UTC (rev 1302)
+++ mgmt/mint/python/mint/__init__.py 2007-11-13 19:45:55 UTC (rev 1303)
@@ -1,3 +1,5 @@
+from qpid.management import ManagedBroker
+from datetime import *
from sqlobject import *
from schema import *
@@ -2,7 +4,179 @@
-class MintModel(object):
- def __init__(self, dburi):
- self.dburi = dburi
+class MintModel:
+ currentMethodId = None
+ outstandingMethodCalls = None
+ managedBrokers = None
+
+ def __init__(self):
+ self.currentMethodId = 1
+ self.outstandingMethodCalls = dict()
+ self.managedBrokers = dict()
+
+ def getQueueByOriginalId(self, id, create=False):
+ queue = None
+ try:
+ queue = MgmtQueue.selectBy(idOriginal=id)[:1][0]
+ except IndexError:
+ if (create): queue = MgmtQueue(idOriginal=id)
+ return queue
- def init(self):
- pass
+ def getQueueByName(self, name, vhost, create=False):
+ queue = None
+ try:
+ queue = MgmtQueue.selectBy(name=name, mgmtVhost=vhost)[:1][0]
+ except IndexError:
+ if (create): queue = MgmtQueue(name=name, mgmtVhost=vhost)
+ return queue
+
+ def getVhostByName(self, name, broker, create=False):
+ vhost = None
+ try:
+ vhost = MgmtVhost.selectBy(name=name, mgmtBroker=broker)[:1][0]
+ except IndexError:
+ if (create): vhost = MgmtVhost(name=name, mgmtBroker=broker)
+ return vhost
+
+ def getVhostByOriginalId(self, id, create=False):
+ vhost = None
+ try:
+ vhost = MgmtVhost.selectBy(idOriginal=id)[:1][0]
+ except IndexError:
+ if (create): vhost = MgmtVhost(idOriginal=id)
+ return vhost
+
+ def getBrokerByPort(self, port, system, create=False):
+ broker = None
+ try:
+ broker = MgmtBroker.selectBy(port=port, mgmtSystem=system)[:1][0]
+ except IndexError:
+ if (create): broker = MgmtBroker(port=port, mgmtSystem=system)
+ return broker
+
+ def getBrokerByOriginalId(self, id, create=False):
+ broker = None
+ try:
+ broker = MgmtBroker.selectBy(idOriginal=id)[:1][0]
+ except IndexError:
+ if (create): broker = MgmtBroker(idOriginal=id)
+ return broker
+
+ def getSystemByOriginalId(self, id, create=False):
+ system = None
+ try:
+ system = MgmtSystem.selectBy(idOriginal=id)[:1][0]
+ except IndexError:
+ if (create): system = MgmtSystem(idOriginal=id)
+ return system
+
+ def sanitizeDict(self, d):
+ for k in d.iterkeys():
+ if (k.endswith("Id")):
+ d[self.convertKey(k)] = d.pop(k)
+ elif (k == "id"):
+ d[self.convertKey(k)] = d.pop(k)
+ for k in d.iterkeys():
+ if (k.endswith("Ref")):
+ d[self.convertKey(k)] = d.pop(k)
+ return d
+
+ def convertKey(self, k):
+ if (k == "id"):
+ return k + "Original"
+ if (k.endswith("Id")):
+ return k + "ent"
+ elif (k.endswith("Ref")):
+ oldK = k
+ k = k[0].upper() + k[1:]
+ return "mgmt" + k.replace("Ref", "ID")
+
+ def configCallback(self, broker, objectName, list, timestamps):
+ print "\nCONFIG---------------------------------------------------"
+ print "broker=" + broker
+ print objectName
+ print list
+ result = None
+ d = self.sanitizeDict(dict(list))
+ d["managedBroker"] = self.managedBrokers[broker]
+ print d
+
+ d["recTime"] = datetime.fromtimestamp(timestamps[0]/1000000000)
+ d["creationTime"] = datetime.fromtimestamp(timestamps[1]/1000000000)
+ if (objectName == "queue"):
+ print "* QUEUE"
+ queue = self.getQueueByName(d["name"], self.getVhostByOriginalId(d.pop(self.convertKey("vhostRef"))), True)
+ queue.set(**d)
+ print "queue id = %d" % (queue.id)
+ result = queue
+ elif (objectName == "vhost"):
+ print "* VHOST"
+ vhost = self.getVhostByName(d["name"], self.getBrokerByOriginalId(d.pop(self.convertKey("brokerRef"))), True)
+ vhost.set(**d)
+ print "vhost id = %d" % (vhost.id)
+ result = vhost
+ elif (objectName == "broker"):
+ print "* BROKER"
+ d.pop(self.convertKey("systemRef"))
+ broker = self.getBrokerByPort(d["port"], self.getSystemByOriginalId("0"), True)
+ broker.set(**d)
+ broker.sync()
+ print "broker id = %d" % (broker.id)
+ result = broker
+ print "END CONFIG---------------------------------------------------\n"
+ return result
+
+ def instCallback(self, broker, objectName, list, timestamps):
+ print "\nINST---------------------------------------------------"
+ print "broker=" + broker
+ print objectName
+ print list
+ result = None
+ d = self.sanitizeDict(dict(list))
+ if (objectName == "queue"):
+ print "* QUEUE"
+ queue = self.getQueueByOriginalId(d[self.convertKey("id")])
+ d["mgmtQueue"] = queue.id
+ d["recTime"] = datetime.fromtimestamp(timestamps[0]/1000000000)
+ queueStats = MgmtQueueStats()
+ queueStats.set(**d)
+ d = dict()
+ if (timestamps[2] != 0):
+ d["deletionTime"] = datetime.fromtimestamp(timestamps[2]/1000000000)
+ d["mgmtQueueStats"] = queueStats
+ queue.set(**d)
+ print queue.id
+ result = queueStats
+ elif (objectName == "vhost"):
+ print "* VHOST"
+ elif (objectName == "broker"):
+ print "* BROKER"
+ print "END INST---------------------------------------------------\n"
+ return result
+
+ def methodCallback(self, broker, methodId, errorNo, errorText, args):
+ print "\nMETHOD---------------------------------------------------"
+ print "broker=" + broker
+ print "MethodId=%d" % (methodId)
+ print "Error: %d %s" % (errorNo, errorText)
+ print args
+ method = self.outstandingMethodCalls.pop(methodId)
+ method(errorText, args)
+ print "END METHOD---------------------------------------------------\n"
+
+ def addManagedBroker(self, host, port):
+ broker = ManagedBroker(host=host, port=port)
+ label = "%s:%d" % (host, port)
+ self.managedBrokers[label] = broker
+ broker.configListener(label, self.configCallback)
+ broker.instrumentationListener (label, self.instCallback)
+ broker.methodListener (label, self.methodCallback)
+ broker.start()
+ return label
+
+ def registerCallback(self, callback):
+ self.currentMethodId += 1
+ methodId = self.currentMethodId
+ self.outstandingMethodCalls[methodId] = callback
+ return methodId
+
+ def allSystems(self):
+ return MgmtSystem.select()
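
For illustration (not part of the commit), the key renaming that sanitizeDict and
convertKey apply to an incoming config record before it is handed to SQLObject,
using a hypothetical queue record:

    # "id"       -> "idOriginal"   (broker-assigned object id, stored as id_original)
    # "vhostRef" -> "mgmtVhostID"  (parent reference; configCallback pops it to look
    #                               up the owning MgmtVhost row)
    # {"id": 104, "vhostRef": 101, "name": "my-queue", "durable": True}
    #   becomes
    # {"idOriginal": 104, "mgmtVhostID": 101, "name": "my-queue", "durable": True}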
Modified: mgmt/mint/python/mint/schema.py
===================================================================
--- mgmt/mint/python/mint/schema.py 2007-11-13 18:36:01 UTC (rev 1302)
+++ mgmt/mint/python/mint/schema.py 2007-11-13 19:45:55 UTC (rev 1303)
@@ -1,127 +1,241 @@
from sqlobject import *
-class MgmtServer(SQLObject):
+class MgmtSystemStats(SQLObject):
class sqlmeta:
fromDatabase = True
- def joinCluster():
- pass
- def leaveCluster():
- pass
+class MgmtSystem(SQLObject):
+ schemaId = 1
+ schemaName = "system"
+ managedBroker = None
+ class sqlmeta:
+ fromDatabase = True
+ _SO_class_Mgmt_system_stats = None
+class MgmtBrokerStats(SQLObject):
+ class sqlmeta:
+ fromDatabase = True
-class MgmtServerStats(SQLObject):
+class MgmtBroker(SQLObject):
+ schemaId = 2
+ schemaName = "broker"
+ managedBroker = None
class sqlmeta:
fromDatabase = True
+ def joinCluster(self, model, callbackMethod, clusterName):
+ actualArgs = dict()
+ actualArgs["clusterName"] = clusterName
+ methodId = model.registerCallback(callbackMethod)
+ self.managedBroker.method(methodId, self.idOriginal, self.schemaName, "joinCluster", args=actualArgs, packageName="qpid")
+ def leaveCluster(self, model, callbackMethod):
+ actualArgs = dict()
+ methodId = model.registerCallback(callbackMethod)
+ self.managedBroker.method(methodId, self.idOriginal, self.schemaName, "leaveCluster", args=actualArgs, packageName="qpid")
+ def echo(self, model, callbackMethod, sequence, body):
+ actualArgs = dict()
+ actualArgs["sequence"] = sequence
+ actualArgs["body"] = body
+ methodId = model.registerCallback(callbackMethod)
+ self.managedBroker.method(methodId, self.idOriginal, self.schemaName, "echo", args=actualArgs, packageName="qpid")
+ def crash(self, model, callbackMethod):
+ """Temporary test method to crash the broker"""
+ actualArgs = dict()
+ methodId = model.registerCallback(callbackMethod)
+ self.managedBroker.method(methodId, self.idOriginal, self.schemaName, "crash", args=actualArgs, packageName="qpid")
+ _SO_class_Mgmt_broker_stats = None
+ _SO_class_Mgmt_system = None
+class MgmtVhostStats(SQLObject):
+ class sqlmeta:
+ fromDatabase = True
+
+MgmtSystem.sqlmeta.addJoin(MultipleJoin('MgmtBroker', joinMethodName='allBrokers'))
+
class MgmtVhost(SQLObject):
+ schemaId = 3
+ schemaName = "vhost"
+ managedBroker = None
class sqlmeta:
fromDatabase = True
+ _SO_class_Mgmt_vhost_stats = None
+ _SO_class_Mgmt_broker = None
-class MgmtVhostStats(SQLObject):
+class MgmtQueueStats(SQLObject):
class sqlmeta:
fromDatabase = True
+MgmtBroker.sqlmeta.addJoin(MultipleJoin('MgmtVhost', joinMethodName='allVhosts'))
+
class MgmtQueue(SQLObject):
+ schemaId = 4
+ schemaName = "queue"
+ managedBroker = None
class sqlmeta:
fromDatabase = True
- def purge():
+ def purge(self, model, callbackMethod):
"""Discard all messages on queue"""
- pass
-
- def increaseDiskSize():
+ actualArgs = dict()
+ methodId = model.registerCallback(callbackMethod)
+ self.managedBroker.method(methodId, self.idOriginal, self.schemaName, "purge", args=actualArgs, packageName="qpid")
+ def increaseDiskSize(self, model, callbackMethod, pages):
"""Increase number of disk pages allocated for this
queue"""
- pass
+ actualArgs = dict()
+ actualArgs["pages"] = pages
+ methodId = model.registerCallback(callbackMethod)
+ self.managedBroker.method(methodId, self.idOriginal, self.schemaName, "increaseDiskSize", args=actualArgs, packageName="qpid")
+ _SO_class_Mgmt_queue_stats = None
+ _SO_class_Mgmt_vhost = None
-
-class MgmtQueueStats(SQLObject):
+class MgmtExchangeStats(SQLObject):
class sqlmeta:
fromDatabase = True
+MgmtVhost.sqlmeta.addJoin(MultipleJoin('MgmtQueue', joinMethodName='allQueues'))
+
class MgmtExchange(SQLObject):
+ schemaId = 5
+ schemaName = "exchange"
+ managedBroker = None
class sqlmeta:
fromDatabase = True
+ _SO_class_Mgmt_exchange_stats = None
+ _SO_class_Mgmt_vhost = None
-class MgmtExchangeStats(SQLObject):
+class MgmtBindingStats(SQLObject):
class sqlmeta:
fromDatabase = True
+MgmtVhost.sqlmeta.addJoin(MultipleJoin('MgmtExchange', joinMethodName='allExchanges'))
+
class MgmtBinding(SQLObject):
+ schemaId = 6
+ schemaName = "binding"
+ managedBroker = None
class sqlmeta:
fromDatabase = True
+ _SO_class_Mgmt_binding_stats = None
+ _SO_class_Mgmt_queue = None
+ _SO_class_Mgmt_exchange = None
-class MgmtBindingStats(SQLObject):
+class MgmtClientStats(SQLObject):
class sqlmeta:
fromDatabase = True
+MgmtQueue.sqlmeta.addJoin(MultipleJoin('MgmtBinding', joinMethodName='allBindings'))
+MgmtExchange.sqlmeta.addJoin(MultipleJoin('MgmtBinding', joinMethodName='allBindings'))
+
class MgmtClient(SQLObject):
+ schemaId = 7
+ schemaName = "client"
+ managedBroker = None
class sqlmeta:
fromDatabase = True
- def close():
- pass
+ def close(self, model, callbackMethod):
+ actualArgs = dict()
+ methodId = model.registerCallback(callbackMethod)
+ self.managedBroker.method(methodId, self.idOriginal, self.schemaName, "close", args=actualArgs, packageName="qpid")
+ def detach(self, model, callbackMethod):
+ actualArgs = dict()
+ methodId = model.registerCallback(callbackMethod)
+ self.managedBroker.method(methodId, self.idOriginal, self.schemaName, "detach", args=actualArgs, packageName="qpid")
+ _SO_class_Mgmt_client_stats = None
+ _SO_class_Mgmt_vhost = None
- def detach():
- pass
-
-
-class MgmtClientStats(SQLObject):
+class MgmtSessionStats(SQLObject):
class sqlmeta:
fromDatabase = True
+MgmtVhost.sqlmeta.addJoin(MultipleJoin('MgmtClient', joinMethodName='allClients'))
+
class MgmtSession(SQLObject):
+ schemaId = 8
+ schemaName = "session"
+ managedBroker = None
class sqlmeta:
fromDatabase = True
- def solicitAck():
- pass
+ def solicitAck(self, model, callbackMethod):
+ actualArgs = dict()
+ methodId = model.registerCallback(callbackMethod)
+ self.managedBroker.method(methodId, self.idOriginal, self.schemaName, "solicitAck", args=actualArgs, packageName="qpid")
+ def detach(self, model, callbackMethod):
+ actualArgs = dict()
+ methodId = model.registerCallback(callbackMethod)
+ self.managedBroker.method(methodId, self.idOriginal, self.schemaName, "detach", args=actualArgs, packageName="qpid")
+ def resetLifespan(self, model, callbackMethod):
+ actualArgs = dict()
+ methodId = model.registerCallback(callbackMethod)
+ self.managedBroker.method(methodId, self.idOriginal, self.schemaName, "resetLifespan", args=actualArgs, packageName="qpid")
+ def close(self, model, callbackMethod):
+ actualArgs = dict()
+ methodId = model.registerCallback(callbackMethod)
+ self.managedBroker.method(methodId, self.idOriginal, self.schemaName, "close", args=actualArgs, packageName="qpid")
+ _SO_class_Mgmt_session_stats = None
+ _SO_class_Mgmt_vhost = None
+ _SO_class_Mgmt_client = None
- def detach():
- pass
-
- def resetLifespan():
- pass
-
- def close():
- pass
-
-
-class MgmtSessionStats(SQLObject):
+class MgmtDestinationStats(SQLObject):
class sqlmeta:
fromDatabase = True
+MgmtVhost.sqlmeta.addJoin(MultipleJoin('MgmtSession', joinMethodName='allSessions'))
+MgmtClient.sqlmeta.addJoin(MultipleJoin('MgmtSession', joinMethodName='allSessions'))
+
class MgmtDestination(SQLObject):
+ schemaId = 9
+ schemaName = "destination"
+ managedBroker = None
class sqlmeta:
fromDatabase = True
- def throttle():
+ def throttle(self, model, callbackMethod, strength):
"""Apply extra rate limiting to destination: 0 = Normal, 10 =
Maximum"""
- pass
+ actualArgs = dict()
+ actualArgs["strength"] = strength
+ methodId = model.registerCallback(callbackMethod)
+ self.managedBroker.method(methodId, self.idOriginal, self.schemaName, "throttle", args=actualArgs, packageName="qpid")
+ def stop(self, model, callbackMethod):
+ actualArgs = dict()
+ methodId = model.registerCallback(callbackMethod)
+ self.managedBroker.method(methodId, self.idOriginal, self.schemaName, "stop", args=actualArgs, packageName="qpid")
+ def start(self, model, callbackMethod):
+ actualArgs = dict()
+ methodId = model.registerCallback(callbackMethod)
+ self.managedBroker.method(methodId, self.idOriginal, self.schemaName, "start", args=actualArgs, packageName="qpid")
+ _SO_class_Mgmt_destination_stats = None
+ _SO_class_Mgmt_session = None
- def stop():
- pass
-
- def start():
- pass
-
-
-class MgmtDestinationStats(SQLObject):
+class MgmtProducerStats(SQLObject):
class sqlmeta:
fromDatabase = True
+MgmtSession.sqlmeta.addJoin(MultipleJoin('MgmtDestination', joinMethodName='allDestinations'))
+
class MgmtProducer(SQLObject):
+ schemaId = 10
+ schemaName = "producer"
+ managedBroker = None
class sqlmeta:
fromDatabase = True
+ _SO_class_Mgmt_producer_stats = None
+ _SO_class_Mgmt_destination = None
+ _SO_class_Mgmt_exchange = None
-class MgmtProducerStats(SQLObject):
+class MgmtConsumerStats(SQLObject):
class sqlmeta:
fromDatabase = True
+MgmtDestination.sqlmeta.addJoin(MultipleJoin('MgmtProducer', joinMethodName='allProducers'))
+MgmtExchange.sqlmeta.addJoin(MultipleJoin('MgmtProducer', joinMethodName='allProducers'))
+
class MgmtConsumer(SQLObject):
+ schemaId = 11
+ schemaName = "consumer"
+ managedBroker = None
class sqlmeta:
fromDatabase = True
- def close():
- pass
-
-
-class MgmtConsumerStats(SQLObject):
- class sqlmeta:
- fromDatabase = True
-
+ def close(self, model, callbackMethod):
+ actualArgs = dict()
+ methodId = model.registerCallback(callbackMethod)
+ self.managedBroker.method(methodId, self.idOriginal, self.schemaName, "close", args=actualArgs, packageName="qpid")
+ _SO_class_Mgmt_consumer_stats = None
+ _SO_class_Mgmt_destination = None
+ _SO_class_Mgmt_queue = None
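
The schema methods now route broker method calls through the model's callback
registry. A rough usage sketch (the callback name is illustrative, and it assumes
the object's managedBroker and idOriginal attributes have been populated by the model):

    def onPurged(errorText, args):
        # invoked from MintModel.methodCallback when the broker replies
        print "purge:", errorText, args

    queue.purge(model, onPurged)   # registers onPurged, then invokes the qpid "purge" method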
Added: mgmt/mint/python/mint/schema.sql
===================================================================
--- mgmt/mint/python/mint/schema.sql (rev 0)
+++ mgmt/mint/python/mint/schema.sql 2007-11-13 19:45:55 UTC (rev 1303)
@@ -0,0 +1,332 @@
+
+CREATE TABLE mgmt_system (
+ id BIGSERIAL PRIMARY KEY,
+ id_original BIGINT,
+ mgmt_system_stats_id BIGINT,
+ rec_time TIMESTAMP,
+ creation_time TIMESTAMP,
+ deletion_time TIMESTAMP,
+ sys_ident VARCHAR(1000)
+);
+
+CREATE INDEX mgmt_system_sys_ident_index ON mgmt_system(sys_ident);
+
+CREATE TABLE mgmt_system_stats (
+ id BIGSERIAL PRIMARY KEY,
+ id_original BIGINT,
+ mgmt_system_id BIGINT REFERENCES mgmt_system ,
+ rec_time TIMESTAMP
+);
+
+ALTER TABLE mgmt_system ADD FOREIGN KEY (mgmt_system_stats_id) REFERENCES mgmt_system_stats;
+
+CREATE TABLE mgmt_broker (
+ id BIGSERIAL PRIMARY KEY,
+ id_original BIGINT,
+ mgmt_broker_stats_id BIGINT,
+ rec_time TIMESTAMP,
+ creation_time TIMESTAMP,
+ deletion_time TIMESTAMP,
+ mgmt_system_id BIGINT REFERENCES mgmt_system,
+ port INT2 ,
+ worker_threads INT2 ,
+ max_conns INT2 ,
+ conn_backlog INT2 ,
+ staging_threshold INT4 ,
+ store_lib VARCHAR(1000) ,
+ async_store BOOLEAN ,
+ mgmt_pub_interval INT2 ,
+ initial_disk_page_size INT4 ,
+ initial_pages_per_queue INT4 ,
+ cluster_name VARCHAR(1000) ,
+ version VARCHAR(1000)
+);
+
+CREATE INDEX mgmt_broker_port_index ON mgmt_broker(port);
+
+CREATE TABLE mgmt_broker_stats (
+ id BIGSERIAL PRIMARY KEY,
+ id_original BIGINT,
+ mgmt_broker_id BIGINT REFERENCES mgmt_broker ,
+ rec_time TIMESTAMP
+);
+
+ALTER TABLE mgmt_broker ADD FOREIGN KEY (mgmt_broker_stats_id) REFERENCES mgmt_broker_stats;
+
+CREATE TABLE mgmt_vhost (
+ id BIGSERIAL PRIMARY KEY,
+ id_original BIGINT,
+ mgmt_vhost_stats_id BIGINT,
+ rec_time TIMESTAMP,
+ creation_time TIMESTAMP,
+ deletion_time TIMESTAMP,
+ mgmt_broker_id BIGINT REFERENCES mgmt_broker,
+ name VARCHAR(1000)
+);
+
+CREATE INDEX mgmt_vhost_name_index ON mgmt_vhost(name);
+
+CREATE TABLE mgmt_vhost_stats (
+ id BIGSERIAL PRIMARY KEY,
+ id_original BIGINT,
+ mgmt_vhost_id BIGINT REFERENCES mgmt_vhost ,
+ rec_time TIMESTAMP
+);
+
+ALTER TABLE mgmt_vhost ADD FOREIGN KEY (mgmt_vhost_stats_id) REFERENCES mgmt_vhost_stats;
+
+CREATE TABLE mgmt_queue (
+ id BIGSERIAL PRIMARY KEY,
+ id_original BIGINT,
+ mgmt_queue_stats_id BIGINT,
+ rec_time TIMESTAMP,
+ creation_time TIMESTAMP,
+ deletion_time TIMESTAMP,
+ mgmt_vhost_id BIGINT REFERENCES mgmt_vhost,
+ name VARCHAR(1000) ,
+ durable BOOLEAN ,
+ auto_delete BOOLEAN ,
+ exclusive BOOLEAN ,
+ page_memory_limit INT4
+);
+
+CREATE INDEX mgmt_queue_name_index ON mgmt_queue(name);
+
+CREATE TABLE mgmt_queue_stats (
+ id BIGSERIAL PRIMARY KEY,
+ id_original BIGINT,
+ mgmt_queue_id BIGINT REFERENCES mgmt_queue ,
+ rec_time TIMESTAMP,
+ disk_page_size INT4 ,
+ disk_pages INT4 ,
+ disk_available_size INT4 ,
+ msg_total_enqueues INT8 ,
+ msg_total_dequeues INT8 ,
+ msg_txn_enqueues INT8 ,
+ msg_txn_dequeues INT8 ,
+ msg_persist_enqueues INT8 ,
+ msg_persist_dequeues INT8 ,
+ msg_depth INT4 ,
+ msg_depth_high INT4 ,
+ msg_depth_low INT4 ,
+ byte_total_enqueues INT8 ,
+ byte_total_dequeues INT8 ,
+ byte_txn_enqueues INT8 ,
+ byte_txn_dequeues INT8 ,
+ byte_persist_enqueues INT8 ,
+ byte_persist_dequeues INT8 ,
+ byte_depth INT4 ,
+ byte_depth_high INT4 ,
+ byte_depth_low INT4 ,
+ enqueue_txn_starts INT8 ,
+ enqueue_txn_commits INT8 ,
+ enqueue_txn_rejects INT8 ,
+ enqueue_txn_count INT4 ,
+ enqueue_txn_count_high INT4 ,
+ enqueue_txn_count_low INT4 ,
+ dequeue_txn_starts INT8 ,
+ dequeue_txn_commits INT8 ,
+ dequeue_txn_rejects INT8 ,
+ dequeue_txn_count INT4 ,
+ dequeue_txn_count_high INT4 ,
+ dequeue_txn_count_low INT4 ,
+ consumers INT4 ,
+ consumers_high INT4 ,
+ consumers_low INT4 ,
+ bindings INT4 ,
+ bindings_high INT4 ,
+ bindings_low INT4 ,
+ unacked_messages INT4 ,
+ unacked_messages_high INT4 ,
+ unacked_messages_low INT4
+);
+
+ALTER TABLE mgmt_queue ADD FOREIGN KEY (mgmt_queue_stats_id) REFERENCES mgmt_queue_stats;
+
+CREATE TABLE mgmt_exchange (
+ id BIGSERIAL PRIMARY KEY,
+ id_original BIGINT,
+ mgmt_exchange_stats_id BIGINT,
+ rec_time TIMESTAMP,
+ creation_time TIMESTAMP,
+ deletion_time TIMESTAMP,
+ mgmt_vhost_id BIGINT REFERENCES mgmt_vhost,
+ name VARCHAR(1000) ,
+ type VARCHAR(1000)
+);
+
+CREATE INDEX mgmt_exchange_name_index ON mgmt_exchange(name);
+
+CREATE TABLE mgmt_exchange_stats (
+ id BIGSERIAL PRIMARY KEY,
+ id_original BIGINT,
+ mgmt_exchange_id BIGINT REFERENCES mgmt_exchange ,
+ rec_time TIMESTAMP,
+ producers INT4 ,
+ producers_high INT4 ,
+ producers_low INT4 ,
+ bindings INT4 ,
+ bindings_high INT4 ,
+ bindings_low INT4 ,
+ msg_receives INT8 ,
+ msg_drops INT8 ,
+ msg_routes INT8 ,
+ byte_receives INT8 ,
+ byte_drops INT8 ,
+ byte_routes INT8
+);
+
+ALTER TABLE mgmt_exchange ADD FOREIGN KEY (mgmt_exchange_stats_id) REFERENCES mgmt_exchange_stats;
+
+CREATE TABLE mgmt_binding (
+ id BIGSERIAL PRIMARY KEY,
+ id_original BIGINT,
+ mgmt_binding_stats_id BIGINT,
+ rec_time TIMESTAMP,
+ creation_time TIMESTAMP,
+ deletion_time TIMESTAMP,
+ mgmt_queue_id BIGINT REFERENCES mgmt_queue,
+ mgmt_exchange_id BIGINT REFERENCES mgmt_exchange,
+ binding_key VARCHAR(1000)
+);
+
+CREATE TABLE mgmt_binding_stats (
+ id BIGSERIAL PRIMARY KEY,
+ id_original BIGINT,
+ mgmt_binding_id BIGINT REFERENCES mgmt_binding ,
+ rec_time TIMESTAMP,
+ msg_matched INT8
+);
+
+ALTER TABLE mgmt_binding ADD FOREIGN KEY (mgmt_binding_stats_id) REFERENCES mgmt_binding_stats;
+
+CREATE TABLE mgmt_client (
+ id BIGSERIAL PRIMARY KEY,
+ id_original BIGINT,
+ mgmt_client_stats_id BIGINT,
+ rec_time TIMESTAMP,
+ creation_time TIMESTAMP,
+ deletion_time TIMESTAMP,
+ mgmt_vhost_id BIGINT REFERENCES mgmt_vhost,
+ ip_addr INT4 ,
+ port INT2
+);
+
+CREATE INDEX mgmt_client_ip_addr_index ON mgmt_client(ip_addr);
+
+CREATE INDEX mgmt_client_port_index ON mgmt_client(port);
+
+CREATE TABLE mgmt_client_stats (
+ id BIGSERIAL PRIMARY KEY,
+ id_original BIGINT,
+ mgmt_client_id BIGINT REFERENCES mgmt_client ,
+ rec_time TIMESTAMP,
+ auth_identity VARCHAR(1000) ,
+ msgs_produced INT8 ,
+ msgs_consumed INT8 ,
+ bytes_produced INT8 ,
+ bytes_consumed INT8
+);
+
+ALTER TABLE mgmt_client ADD FOREIGN KEY (mgmt_client_stats_id) REFERENCES mgmt_client_stats;
+
+CREATE TABLE mgmt_session (
+ id BIGSERIAL PRIMARY KEY,
+ id_original BIGINT,
+ mgmt_session_stats_id BIGINT,
+ rec_time TIMESTAMP,
+ creation_time TIMESTAMP,
+ deletion_time TIMESTAMP,
+ mgmt_vhost_id BIGINT REFERENCES mgmt_vhost,
+ name VARCHAR(1000) ,
+ mgmt_client_id BIGINT REFERENCES mgmt_client,
+ detached_lifespan INT4
+);
+
+CREATE INDEX mgmt_session_name_index ON mgmt_session(name);
+
+CREATE TABLE mgmt_session_stats (
+ id BIGSERIAL PRIMARY KEY,
+ id_original BIGINT,
+ mgmt_session_id BIGINT REFERENCES mgmt_session ,
+ rec_time TIMESTAMP,
+ attached BOOLEAN ,
+ remaining_lifespan INT4 ,
+ frames_outstanding INT4
+);
+
+ALTER TABLE mgmt_session ADD FOREIGN KEY (mgmt_session_stats_id) REFERENCES mgmt_session_stats;
+
+CREATE TABLE mgmt_destination (
+ id BIGSERIAL PRIMARY KEY,
+ id_original BIGINT,
+ mgmt_destination_stats_id BIGINT,
+ rec_time TIMESTAMP,
+ creation_time TIMESTAMP,
+ deletion_time TIMESTAMP,
+ mgmt_session_id BIGINT REFERENCES mgmt_session,
+ name VARCHAR(1000)
+);
+
+CREATE INDEX mgmt_destination_name_index ON mgmt_destination(name);
+
+CREATE TABLE mgmt_destination_stats (
+ id BIGSERIAL PRIMARY KEY,
+ id_original BIGINT,
+ mgmt_destination_id BIGINT REFERENCES mgmt_destination ,
+ rec_time TIMESTAMP,
+ flow_mode INT2 ,
+ max_msg_credits INT4 ,
+ max_byte_credits INT4 ,
+ msg_credits INT4 ,
+ byte_credits INT4
+);
+
+ALTER TABLE mgmt_destination ADD FOREIGN KEY (mgmt_destination_stats_id) REFERENCES mgmt_destination_stats;
+
+CREATE TABLE mgmt_producer (
+ id BIGSERIAL PRIMARY KEY,
+ id_original BIGINT,
+ mgmt_producer_stats_id BIGINT,
+ rec_time TIMESTAMP,
+ creation_time TIMESTAMP,
+ deletion_time TIMESTAMP,
+ mgmt_destination_id BIGINT REFERENCES mgmt_destination,
+ mgmt_exchange_id BIGINT REFERENCES mgmt_exchange
+);
+
+CREATE TABLE mgmt_producer_stats (
+ id BIGSERIAL PRIMARY KEY,
+ id_original BIGINT,
+ mgmt_producer_id BIGINT REFERENCES mgmt_producer ,
+ rec_time TIMESTAMP,
+ msgs_produced INT8 ,
+ bytes_produced INT8
+);
+
+ALTER TABLE mgmt_producer ADD FOREIGN KEY (mgmt_producer_stats_id) REFERENCES mgmt_producer_stats;
+
+CREATE TABLE mgmt_consumer (
+ id BIGSERIAL PRIMARY KEY,
+ id_original BIGINT,
+ mgmt_consumer_stats_id BIGINT,
+ rec_time TIMESTAMP,
+ creation_time TIMESTAMP,
+ deletion_time TIMESTAMP,
+ mgmt_destination_id BIGINT REFERENCES mgmt_destination,
+ mgmt_queue_id BIGINT REFERENCES mgmt_queue
+);
+
+CREATE TABLE mgmt_consumer_stats (
+ id BIGSERIAL PRIMARY KEY,
+ id_original BIGINT,
+ mgmt_consumer_id BIGINT REFERENCES mgmt_consumer ,
+ rec_time TIMESTAMP,
+ msgs_consumed INT8 ,
+ bytes_consumed INT8 ,
+ unacked_messages INT4 ,
+ unacked_messages_high INT4 ,
+ unacked_messages_low INT4
+);
+
+ALTER TABLE mgmt_consumer ADD FOREIGN KEY (mgmt_consumer_stats_id) REFERENCES mgmt_consumer_stats;
Deleted: mgmt/mint/python/mint/updater.py
===================================================================
--- mgmt/mint/python/mint/updater.py 2007-11-13 18:36:01 UTC (rev 1302)
+++ mgmt/mint/python/mint/updater.py 2007-11-13 19:45:55 UTC (rev 1303)
@@ -1,57 +0,0 @@
-from time import sleep
-from datetime import datetime
-
-from schema import *
-
-class MintUpdater(object):
- def __init__(self, model, broker):
- self.model = model
- self.broker = broker
-
- self.broker.configListener("XXXcontext", configCallback)
- self.broker.instrumentationListener("XXXcontext", instCallback)
-
- def start(self):
- self.broker.start()
-
- while True:
- sleep(1)
-
-def getQueueByName(name, create=False):
- try:
- queues = MgmtQueue.selectBy(name=name)[:1]
- queue = queues[0]
- except IndexError:
- if (create): queue = MgmtQueue()
- return queue
-
-def configCallback(broker, oid, list, timestamps):
- print "broker=" + broker
- if oid == 4:
- print list
- d = dict(list)
- queue = getQueueByName(d["name"], True)
- queue.set(**d)
- recOn = datetime.fromtimestamp(timestamps[0]/1000000000)
- createdOn = datetime.fromtimestamp(timestamps[1]/1000000000)
- queue.set(recTime=recOn,creationTime=createdOn)
- print queue.id
- print " -> " + d["name"]
- return queue
-
-def instCallback(broker, oid, list, timestamps):
- print "broker=" + broker
- if oid == 4:
- print list
- d = dict(list)
- queue = getQueueByName(d.pop("name"))
- d["mgmtQueue"] = queue.id
- recOn = datetime.fromtimestamp(timestamps[0]/1000000000)
- queueStats = MgmtQueueStats()
- queueStats.set(recTime=recOn)
- queueStats.set(**d)
- if (timestamps[2] != 0):
- deletedOn = datetime.fromtimestamp(timestamps[2]/1000000000)
- queue.set(deletionTime=deletedOn)
- print queue.id
- return queueStats