[rhmessaging-commits] rhmessaging commits: r1303 - in mgmt/mint: python/mint and 1 other directory.

rhmessaging-commits at lists.jboss.org rhmessaging-commits at lists.jboss.org
Tue Nov 13 14:45:55 EST 2007


Author: justi9
Date: 2007-11-13 14:45:55 -0500 (Tue, 13 Nov 2007)
New Revision: 1303

Added:
   mgmt/mint/python/mint/schema.sql
Removed:
   mgmt/mint/python/mint/updater.py
Modified:
   mgmt/mint/bin/mint-test
   mgmt/mint/python/mint/__init__.py
   mgmt/mint/python/mint/schema.py
Log:
Updates mint-test to use Nuno's new model stuff.  Moves model.py's
Model to MintModel in mint/__init__.py.



Modified: mgmt/mint/bin/mint-test
===================================================================
--- mgmt/mint/bin/mint-test	2007-11-13 18:36:01 UTC (rev 1302)
+++ mgmt/mint/bin/mint-test	2007-11-13 19:45:55 UTC (rev 1303)
@@ -17,18 +17,12 @@
     usage()
 
 from mint import *
-from mint.updater import *
 from qpid.management import ManagedBroker
 
 def do_main(dburi, brokerhost, brokerport):
-    model = MintModel(dburi)
-    model.init()
-    
-    broker = ManagedBroker(host=brokerhost, port=brokerport)
+    model = MintModel()
+    model.addManagedBroker(brokerhost, brokerport)
 
-    updater = MintUpdater(model, broker)
-    updater.start()
-
 def main():
     if len(sys.argv) != 3:
         usage()

Modified: mgmt/mint/python/mint/__init__.py
===================================================================
--- mgmt/mint/python/mint/__init__.py	2007-11-13 18:36:01 UTC (rev 1302)
+++ mgmt/mint/python/mint/__init__.py	2007-11-13 19:45:55 UTC (rev 1303)
@@ -1,3 +1,5 @@
+from qpid.management import ManagedBroker
+from datetime import *
 from sqlobject import *
 
 from schema import *
@@ -2,7 +4,179 @@
 
-class MintModel(object):
-    def __init__(self, dburi):
-        self.dburi = dburi
+class MintModel:
+  currentMethodId = None
+  outstandingMethodCalls = None
+  managedBrokers = None
+  
+  def __init__(self):
+    self.currentMethodId = 1
+    self.outstandingMethodCalls = dict()
+    self.managedBrokers = dict()
+    
+  def getQueueByOriginalId(self, id, create=False):
+    queue = None
+    try:
+      queue = MgmtQueue.selectBy(idOriginal=id)[:1][0]
+    except IndexError:
+      if (create): queue = MgmtQueue(idOriginal=id)
+    return queue
 
-    def init(self):
-        pass
+  def getQueueByName(self, name, vhost, create=False):
+    queue = None
+    try:
+      queue = MgmtQueue.selectBy(name=name, mgmtVhost=vhost)[:1][0]
+    except IndexError:
+      if (create): queue = MgmtQueue(name=name, mgmtVhost=vhost)
+    return queue
+
+  def getVhostByName(self, name, broker, create=False):
+    vhost = None
+    try:
+      vhost = MgmtVhost.selectBy(name=name, mgmtBroker=broker)[:1][0]
+    except IndexError:
+      if (create): vhost = MgmtVhost(name=name, mgmtBroker=broker)
+    return vhost
+
+  def getVhostByOriginalId(self, id, create=False):
+    vhost = None
+    try:
+      vhost = MgmtVhost.selectBy(idOriginal=id)[:1][0]
+    except IndexError:
+      if (create): vhost = MgmtVhost(idOriginal=id)
+    return vhost
+
+  def getBrokerByPort(self, port, system, create=False):
+    broker = None
+    try:
+      broker = MgmtBroker.selectBy(port=port, mgmtSystem=system)[:1][0]
+    except IndexError:
+      if (create): broker = MgmtBroker(port=port, mgmtSystem=system)
+    return broker
+
+  def getBrokerByOriginalId(self, id, create=False):
+    broker = None
+    try:
+      broker = MgmtBroker.selectBy(idOriginal=id)[:1][0]
+    except IndexError:
+      if (create): broker = MgmtBroker(idOriginal=id)
+    return broker
+
+  def getSystemByOriginalId(self, id, create=False):
+    system = None
+    try:
+      system = MgmtSystem.selectBy(idOriginal=id)[:1][0]
+    except IndexError:
+      if (create): system = MgmtSystem(idOriginal=id)
+    return system
+
+  def sanitizeDict(self, d):
+    for k in d.iterkeys():
+      if (k.endswith("Id")):
+        d[self.convertKey(k)] = d.pop(k)
+      elif (k == "id"):
+        d[self.convertKey(k)] = d.pop(k)
+    for k in d.iterkeys():
+      if (k.endswith("Ref")):
+        d[self.convertKey(k)] = d.pop(k)
+    return d
+
+  def convertKey(self, k):
+    if (k == "id"):
+      return k + "Original"
+    if (k.endswith("Id")):
+      return k + "ent"
+    elif (k.endswith("Ref")):
+      oldK = k
+      k = k[0].upper() + k[1:]
+      return "mgmt" + k.replace("Ref", "ID")
+
+  def configCallback(self, broker, objectName, list, timestamps):
+    print "\nCONFIG---------------------------------------------------"
+    print "broker=" + broker
+    print objectName
+    print list
+    result = None
+    d = self.sanitizeDict(dict(list))
+    d["managedBroker"] = self.managedBrokers[broker]
+    print d
+
+    d["recTime"] = datetime.fromtimestamp(timestamps[0]/1000000000)
+    d["creationTime"] = datetime.fromtimestamp(timestamps[1]/1000000000)
+    if (objectName == "queue"):
+      print "* QUEUE"
+      queue = self.getQueueByName(d["name"], self.getVhostByOriginalId(d.pop(self.convertKey("vhostRef"))), True)
+      queue.set(**d)
+      print "queue id = %d" % (queue.id)
+      result = queue
+    elif (objectName == "vhost"):
+      print "* VHOST"
+      vhost = self.getVhostByName(d["name"], self.getBrokerByOriginalId(d.pop(self.convertKey("brokerRef"))), True)
+      vhost.set(**d)
+      print "vhost id = %d" % (vhost.id)
+      result = vhost
+    elif (objectName == "broker"):
+      print "* BROKER"
+      d.pop(self.convertKey("systemRef"))
+      broker = self.getBrokerByPort(d["port"], self.getSystemByOriginalId("0"), True)
+      broker.set(**d)
+      broker.sync()
+      print "broker id = %d" % (broker.id)
+      result = broker
+      print "END CONFIG---------------------------------------------------\n"
+    return result
+
+  def instCallback(self, broker, objectName, list, timestamps):
+    print "\nINST---------------------------------------------------"
+    print "broker=" + broker
+    print objectName
+    print list
+    result = None
+    d = self.sanitizeDict(dict(list))
+    if (objectName == "queue"):
+      print "* QUEUE"
+      queue = self.getQueueByOriginalId(d[self.convertKey("id")])
+      d["mgmtQueue"] = queue.id
+      d["recTime"] = datetime.fromtimestamp(timestamps[0]/1000000000)
+      queueStats = MgmtQueueStats()
+      queueStats.set(**d)
+      d = dict()
+      if (timestamps[2] != 0):
+        d["deletionTime"] = datetime.fromtimestamp(timestamps[2]/1000000000)
+      d["mgmtQueueStats"] = queueStats
+      queue.set(**d)
+      print queue.id
+      result = queueStats
+    elif (objectName == "vhost"):
+      print "* VHOST"
+    elif (objectName == "broker"):
+      print "* BROKER"
+      print "END INST---------------------------------------------------\n"
+    return result
+
+  def methodCallback(self, broker, methodId, errorNo, errorText, args):
+    print "\nMETHOD---------------------------------------------------"
+    print "broker=" + broker
+    print "MethodId=%d" % (methodId)
+    print "Error: %d %s" % (errorNo, errorText)
+    print args
+    method = self.outstandingMethodCalls.pop(methodId)
+    method(errorText, args)
+    print "END METHOD---------------------------------------------------\n"
+
+  def addManagedBroker(self, host, port):
+    broker = ManagedBroker(host=host, port=port)
+    label = "%s:%d" % (host, port)
+    self.managedBrokers[label] = broker
+    broker.configListener(label, self.configCallback)
+    broker.instrumentationListener (label, self.instCallback)
+    broker.methodListener (label, self.methodCallback)
+    broker.start()
+    return label
+
+  def registerCallback(self, callback):
+    self.currentMethodId += 1
+    methodId = self.currentMethodId
+    self.outstandingMethodCalls[methodId] = callback
+    return methodId
+
+  def allSystems(self):
+    return MgmtSystem.select()

Modified: mgmt/mint/python/mint/schema.py
===================================================================
--- mgmt/mint/python/mint/schema.py	2007-11-13 18:36:01 UTC (rev 1302)
+++ mgmt/mint/python/mint/schema.py	2007-11-13 19:45:55 UTC (rev 1303)
@@ -1,127 +1,241 @@
 from sqlobject import *
 
-class MgmtServer(SQLObject):
+class MgmtSystemStats(SQLObject):
   class sqlmeta:
     fromDatabase = True
-  def joinCluster():
-    pass
 
-  def leaveCluster():
-    pass
+class MgmtSystem(SQLObject):
+  schemaId = 1
+  schemaName = "system"
+  managedBroker = None
+  class sqlmeta:
+    fromDatabase = True
+  _SO_class_Mgmt_system_stats = None
 
+class MgmtBrokerStats(SQLObject):
+  class sqlmeta:
+    fromDatabase = True
 
-class MgmtServerStats(SQLObject):
+class MgmtBroker(SQLObject):
+  schemaId = 2
+  schemaName = "broker"
+  managedBroker = None
   class sqlmeta:
     fromDatabase = True
+  def joinCluster(self, model, callbackMethod, clusterName):
+    actualArgs = dict()
+    actualArgs["clusterName"] = clusterName
+    methodId = model.registerCallback(callbackMethod)
+    self.managedBroker.method(methodId, self.idOriginal, self.schemaName, "joinCluster", args=actualArgs, packageName="qpid")
+  def leaveCluster(self, model, callbackMethod):
+    actualArgs = dict()
+    methodId = model.registerCallback(callbackMethod)
+    self.managedBroker.method(methodId, self.idOriginal, self.schemaName, "leaveCluster", args=actualArgs, packageName="qpid")
+  def echo(self, model, callbackMethod, sequence, body):
+    actualArgs = dict()
+    actualArgs["sequence"] = sequence
+    actualArgs["body"] = body
+    methodId = model.registerCallback(callbackMethod)
+    self.managedBroker.method(methodId, self.idOriginal, self.schemaName, "echo", args=actualArgs, packageName="qpid")
+  def crash(self, model, callbackMethod):
+    """Temporary test method to crash the broker"""
+    actualArgs = dict()
+    methodId = model.registerCallback(callbackMethod)
+    self.managedBroker.method(methodId, self.idOriginal, self.schemaName, "crash", args=actualArgs, packageName="qpid")
+  _SO_class_Mgmt_broker_stats = None
+  _SO_class_Mgmt_system = None
 
+class MgmtVhostStats(SQLObject):
+  class sqlmeta:
+    fromDatabase = True
+
+MgmtSystem.sqlmeta.addJoin(MultipleJoin('MgmtBroker', joinMethodName='allBrokers'))
+
 class MgmtVhost(SQLObject):
+  schemaId = 3
+  schemaName = "vhost"
+  managedBroker = None
   class sqlmeta:
     fromDatabase = True
+  _SO_class_Mgmt_vhost_stats = None
+  _SO_class_Mgmt_broker = None
 
-class MgmtVhostStats(SQLObject):
+class MgmtQueueStats(SQLObject):
   class sqlmeta:
     fromDatabase = True
 
+MgmtBroker.sqlmeta.addJoin(MultipleJoin('MgmtVhost', joinMethodName='allVhosts'))
+
 class MgmtQueue(SQLObject):
+  schemaId = 4
+  schemaName = "queue"
+  managedBroker = None
   class sqlmeta:
     fromDatabase = True
-  def purge():
+  def purge(self, model, callbackMethod):
     """Discard all messages on queue"""
-    pass
-
-  def increaseDiskSize():
+    actualArgs = dict()
+    methodId = model.registerCallback(callbackMethod)
+    self.managedBroker.method(methodId, self.idOriginal, self.schemaName, "purge", args=actualArgs, packageName="qpid")
+  def increaseDiskSize(self, model, callbackMethod, pages):
     """Increase number of disk pages allocated for this queue"""
-    pass
+    actualArgs = dict()
+    actualArgs["pages"] = pages
+    methodId = model.registerCallback(callbackMethod)
+    self.managedBroker.method(methodId, self.idOriginal, self.schemaName, "increaseDiskSize", args=actualArgs, packageName="qpid")
+  _SO_class_Mgmt_queue_stats = None
+  _SO_class_Mgmt_vhost = None
 
-
-class MgmtQueueStats(SQLObject):
+class MgmtExchangeStats(SQLObject):
   class sqlmeta:
     fromDatabase = True
 
+MgmtVhost.sqlmeta.addJoin(MultipleJoin('MgmtQueue', joinMethodName='allQueues'))
+
 class MgmtExchange(SQLObject):
+  schemaId = 5
+  schemaName = "exchange"
+  managedBroker = None
   class sqlmeta:
     fromDatabase = True
+  _SO_class_Mgmt_exchange_stats = None
+  _SO_class_Mgmt_vhost = None
 
-class MgmtExchangeStats(SQLObject):
+class MgmtBindingStats(SQLObject):
   class sqlmeta:
     fromDatabase = True
 
+MgmtVhost.sqlmeta.addJoin(MultipleJoin('MgmtExchange', joinMethodName='allExchanges'))
+
 class MgmtBinding(SQLObject):
+  schemaId = 6
+  schemaName = "binding"
+  managedBroker = None
   class sqlmeta:
     fromDatabase = True
+  _SO_class_Mgmt_binding_stats = None
+  _SO_class_Mgmt_queue = None
+  _SO_class_Mgmt_exchange = None
 
-class MgmtBindingStats(SQLObject):
+class MgmtClientStats(SQLObject):
   class sqlmeta:
     fromDatabase = True
 
+MgmtQueue.sqlmeta.addJoin(MultipleJoin('MgmtBinding', joinMethodName='allBindings'))
+MgmtExchange.sqlmeta.addJoin(MultipleJoin('MgmtBinding', joinMethodName='allBindings'))
+
 class MgmtClient(SQLObject):
+  schemaId = 7
+  schemaName = "client"
+  managedBroker = None
   class sqlmeta:
     fromDatabase = True
-  def close():
-    pass
+  def close(self, model, callbackMethod):
+    actualArgs = dict()
+    methodId = model.registerCallback(callbackMethod)
+    self.managedBroker.method(methodId, self.idOriginal, self.schemaName, "close", args=actualArgs, packageName="qpid")
+  def detach(self, model, callbackMethod):
+    actualArgs = dict()
+    methodId = model.registerCallback(callbackMethod)
+    self.managedBroker.method(methodId, self.idOriginal, self.schemaName, "detach", args=actualArgs, packageName="qpid")
+  _SO_class_Mgmt_client_stats = None
+  _SO_class_Mgmt_vhost = None
 
-  def detach():
-    pass
-
-
-class MgmtClientStats(SQLObject):
+class MgmtSessionStats(SQLObject):
   class sqlmeta:
     fromDatabase = True
 
+MgmtVhost.sqlmeta.addJoin(MultipleJoin('MgmtClient', joinMethodName='allClients'))
+
 class MgmtSession(SQLObject):
+  schemaId = 8
+  schemaName = "session"
+  managedBroker = None
   class sqlmeta:
     fromDatabase = True
-  def solicitAck():
-    pass
+  def solicitAck(self, model, callbackMethod):
+    actualArgs = dict()
+    methodId = model.registerCallback(callbackMethod)
+    self.managedBroker.method(methodId, self.idOriginal, self.schemaName, "solicitAck", args=actualArgs, packageName="qpid")
+  def detach(self, model, callbackMethod):
+    actualArgs = dict()
+    methodId = model.registerCallback(callbackMethod)
+    self.managedBroker.method(methodId, self.idOriginal, self.schemaName, "detach", args=actualArgs, packageName="qpid")
+  def resetLifespan(self, model, callbackMethod):
+    actualArgs = dict()
+    methodId = model.registerCallback(callbackMethod)
+    self.managedBroker.method(methodId, self.idOriginal, self.schemaName, "resetLifespan", args=actualArgs, packageName="qpid")
+  def close(self, model, callbackMethod):
+    actualArgs = dict()
+    methodId = model.registerCallback(callbackMethod)
+    self.managedBroker.method(methodId, self.idOriginal, self.schemaName, "close", args=actualArgs, packageName="qpid")
+  _SO_class_Mgmt_session_stats = None
+  _SO_class_Mgmt_vhost = None
+  _SO_class_Mgmt_client = None
 
-  def detach():
-    pass
-
-  def resetLifespan():
-    pass
-
-  def close():
-    pass
-
-
-class MgmtSessionStats(SQLObject):
+class MgmtDestinationStats(SQLObject):
   class sqlmeta:
     fromDatabase = True
 
+MgmtVhost.sqlmeta.addJoin(MultipleJoin('MgmtSession', joinMethodName='allSessions'))
+MgmtClient.sqlmeta.addJoin(MultipleJoin('MgmtSession', joinMethodName='allSessions'))
+
 class MgmtDestination(SQLObject):
+  schemaId = 9
+  schemaName = "destination"
+  managedBroker = None
   class sqlmeta:
     fromDatabase = True
-  def throttle():
+  def throttle(self, model, callbackMethod, strength):
     """Apply extra rate limiting to destination: 0 = Normal, 10 = Maximum"""
-    pass
+    actualArgs = dict()
+    actualArgs["strength"] = strength
+    methodId = model.registerCallback(callbackMethod)
+    self.managedBroker.method(methodId, self.idOriginal, self.schemaName, "throttle", args=actualArgs, packageName="qpid")
+  def stop(self, model, callbackMethod):
+    actualArgs = dict()
+    methodId = model.registerCallback(callbackMethod)
+    self.managedBroker.method(methodId, self.idOriginal, self.schemaName, "stop", args=actualArgs, packageName="qpid")
+  def start(self, model, callbackMethod):
+    actualArgs = dict()
+    methodId = model.registerCallback(callbackMethod)
+    self.managedBroker.method(methodId, self.idOriginal, self.schemaName, "start", args=actualArgs, packageName="qpid")
+  _SO_class_Mgmt_destination_stats = None
+  _SO_class_Mgmt_session = None
 
-  def stop():
-    pass
-
-  def start():
-    pass
-
-
-class MgmtDestinationStats(SQLObject):
+class MgmtProducerStats(SQLObject):
   class sqlmeta:
     fromDatabase = True
 
+MgmtSession.sqlmeta.addJoin(MultipleJoin('MgmtDestination', joinMethodName='allDestinations'))
+
 class MgmtProducer(SQLObject):
+  schemaId = 10
+  schemaName = "producer"
+  managedBroker = None
   class sqlmeta:
     fromDatabase = True
+  _SO_class_Mgmt_producer_stats = None
+  _SO_class_Mgmt_destination = None
+  _SO_class_Mgmt_exchange = None
 
-class MgmtProducerStats(SQLObject):
+class MgmtConsumerStats(SQLObject):
   class sqlmeta:
     fromDatabase = True
 
+MgmtDestination.sqlmeta.addJoin(MultipleJoin('MgmtProducer', joinMethodName='allProducers'))
+MgmtExchange.sqlmeta.addJoin(MultipleJoin('MgmtProducer', joinMethodName='allProducers'))
+
 class MgmtConsumer(SQLObject):
+  schemaId = 11
+  schemaName = "consumer"
+  managedBroker = None
   class sqlmeta:
     fromDatabase = True
-  def close():
-    pass
-
-
-class MgmtConsumerStats(SQLObject):
-  class sqlmeta:
-    fromDatabase = True
-
+  def close(self, model, callbackMethod):
+    actualArgs = dict()
+    methodId = model.registerCallback(callbackMethod)
+    self.managedBroker.method(methodId, self.idOriginal, self.schemaName, "close", args=actualArgs, packageName="qpid")
+  _SO_class_Mgmt_consumer_stats = None
+  _SO_class_Mgmt_destination = None
+  _SO_class_Mgmt_queue = None

Added: mgmt/mint/python/mint/schema.sql
===================================================================
--- mgmt/mint/python/mint/schema.sql	                        (rev 0)
+++ mgmt/mint/python/mint/schema.sql	2007-11-13 19:45:55 UTC (rev 1303)
@@ -0,0 +1,332 @@
+
+CREATE TABLE mgmt_system (
+  id BIGSERIAL PRIMARY KEY,
+  id_original BIGINT,
+  mgmt_system_stats_id BIGINT,
+  rec_time TIMESTAMP,
+  creation_time TIMESTAMP,
+  deletion_time TIMESTAMP,
+  sys_ident VARCHAR(1000) 
+);
+
+CREATE INDEX mgmt_system_sys_ident_index ON mgmt_system(sys_ident);
+
+CREATE TABLE mgmt_system_stats (
+  id BIGSERIAL PRIMARY KEY,
+  id_original BIGINT,
+  mgmt_system_id BIGINT REFERENCES mgmt_system ,
+  rec_time TIMESTAMP
+);
+
+ALTER TABLE mgmt_system ADD FOREIGN KEY (mgmt_system_stats_id) REFERENCES mgmt_system_stats;
+
+CREATE TABLE mgmt_broker (
+  id BIGSERIAL PRIMARY KEY,
+  id_original BIGINT,
+  mgmt_broker_stats_id BIGINT,
+  rec_time TIMESTAMP,
+  creation_time TIMESTAMP,
+  deletion_time TIMESTAMP,
+  mgmt_system_id BIGINT REFERENCES mgmt_system,
+  port INT2 ,
+  worker_threads INT2 ,
+  max_conns INT2 ,
+  conn_backlog INT2 ,
+  staging_threshold INT4 ,
+  store_lib VARCHAR(1000) ,
+  async_store BOOLEAN ,
+  mgmt_pub_interval INT2 ,
+  initial_disk_page_size INT4 ,
+  initial_pages_per_queue INT4 ,
+  cluster_name VARCHAR(1000) ,
+  version VARCHAR(1000) 
+);
+
+CREATE INDEX mgmt_broker_port_index ON mgmt_broker(port);
+
+CREATE TABLE mgmt_broker_stats (
+  id BIGSERIAL PRIMARY KEY,
+  id_original BIGINT,
+  mgmt_broker_id BIGINT REFERENCES mgmt_broker ,
+  rec_time TIMESTAMP
+);
+
+ALTER TABLE mgmt_broker ADD FOREIGN KEY (mgmt_broker_stats_id) REFERENCES mgmt_broker_stats;
+
+CREATE TABLE mgmt_vhost (
+  id BIGSERIAL PRIMARY KEY,
+  id_original BIGINT,
+  mgmt_vhost_stats_id BIGINT,
+  rec_time TIMESTAMP,
+  creation_time TIMESTAMP,
+  deletion_time TIMESTAMP,
+  mgmt_broker_id BIGINT REFERENCES mgmt_broker,
+  name VARCHAR(1000) 
+);
+
+CREATE INDEX mgmt_vhost_name_index ON mgmt_vhost(name);
+
+CREATE TABLE mgmt_vhost_stats (
+  id BIGSERIAL PRIMARY KEY,
+  id_original BIGINT,
+  mgmt_vhost_id BIGINT REFERENCES mgmt_vhost ,
+  rec_time TIMESTAMP
+);
+
+ALTER TABLE mgmt_vhost ADD FOREIGN KEY (mgmt_vhost_stats_id) REFERENCES mgmt_vhost_stats;
+
+CREATE TABLE mgmt_queue (
+  id BIGSERIAL PRIMARY KEY,
+  id_original BIGINT,
+  mgmt_queue_stats_id BIGINT,
+  rec_time TIMESTAMP,
+  creation_time TIMESTAMP,
+  deletion_time TIMESTAMP,
+  mgmt_vhost_id BIGINT REFERENCES mgmt_vhost,
+  name VARCHAR(1000) ,
+  durable BOOLEAN ,
+  auto_delete BOOLEAN ,
+  exclusive BOOLEAN ,
+  page_memory_limit INT4 
+);
+
+CREATE INDEX mgmt_queue_name_index ON mgmt_queue(name);
+
+CREATE TABLE mgmt_queue_stats (
+  id BIGSERIAL PRIMARY KEY,
+  id_original BIGINT,
+  mgmt_queue_id BIGINT REFERENCES mgmt_queue ,
+  rec_time TIMESTAMP,
+  disk_page_size INT4 ,
+  disk_pages INT4 ,
+  disk_available_size INT4 ,
+  msg_total_enqueues INT8 ,
+  msg_total_dequeues INT8 ,
+  msg_txn_enqueues INT8 ,
+  msg_txn_dequeues INT8 ,
+  msg_persist_enqueues INT8 ,
+  msg_persist_dequeues INT8 ,
+  msg_depth INT4 ,
+  msg_depth_high INT4 ,
+  msg_depth_low INT4 ,
+  byte_total_enqueues INT8 ,
+  byte_total_dequeues INT8 ,
+  byte_txn_enqueues INT8 ,
+  byte_txn_dequeues INT8 ,
+  byte_persist_enqueues INT8 ,
+  byte_persist_dequeues INT8 ,
+  byte_depth INT4 ,
+  byte_depth_high INT4 ,
+  byte_depth_low INT4 ,
+  enqueue_txn_starts INT8 ,
+  enqueue_txn_commits INT8 ,
+  enqueue_txn_rejects INT8 ,
+  enqueue_txn_count INT4 ,
+  enqueue_txn_count_high INT4 ,
+  enqueue_txn_count_low INT4 ,
+  dequeue_txn_starts INT8 ,
+  dequeue_txn_commits INT8 ,
+  dequeue_txn_rejects INT8 ,
+  dequeue_txn_count INT4 ,
+  dequeue_txn_count_high INT4 ,
+  dequeue_txn_count_low INT4 ,
+  consumers INT4 ,
+  consumers_high INT4 ,
+  consumers_low INT4 ,
+  bindings INT4 ,
+  bindings_high INT4 ,
+  bindings_low INT4 ,
+  unacked_messages INT4 ,
+  unacked_messages_high INT4 ,
+  unacked_messages_low INT4 
+);
+
+ALTER TABLE mgmt_queue ADD FOREIGN KEY (mgmt_queue_stats_id) REFERENCES mgmt_queue_stats;
+
+CREATE TABLE mgmt_exchange (
+  id BIGSERIAL PRIMARY KEY,
+  id_original BIGINT,
+  mgmt_exchange_stats_id BIGINT,
+  rec_time TIMESTAMP,
+  creation_time TIMESTAMP,
+  deletion_time TIMESTAMP,
+  mgmt_vhost_id BIGINT REFERENCES mgmt_vhost,
+  name VARCHAR(1000) ,
+  type VARCHAR(1000) 
+);
+
+CREATE INDEX mgmt_exchange_name_index ON mgmt_exchange(name);
+
+CREATE TABLE mgmt_exchange_stats (
+  id BIGSERIAL PRIMARY KEY,
+  id_original BIGINT,
+  mgmt_exchange_id BIGINT REFERENCES mgmt_exchange ,
+  rec_time TIMESTAMP,
+  producers INT4 ,
+  producers_high INT4 ,
+  producers_low INT4 ,
+  bindings INT4 ,
+  bindings_high INT4 ,
+  bindings_low INT4 ,
+  msg_receives INT8 ,
+  msg_drops INT8 ,
+  msg_routes INT8 ,
+  byte_receives INT8 ,
+  byte_drops INT8 ,
+  byte_routes INT8 
+);
+
+ALTER TABLE mgmt_exchange ADD FOREIGN KEY (mgmt_exchange_stats_id) REFERENCES mgmt_exchange_stats;
+
+CREATE TABLE mgmt_binding (
+  id BIGSERIAL PRIMARY KEY,
+  id_original BIGINT,
+  mgmt_binding_stats_id BIGINT,
+  rec_time TIMESTAMP,
+  creation_time TIMESTAMP,
+  deletion_time TIMESTAMP,
+  mgmt_queue_id BIGINT REFERENCES mgmt_queue,
+  mgmt_exchange_id BIGINT REFERENCES mgmt_exchange,
+  binding_key VARCHAR(1000) 
+);
+
+CREATE TABLE mgmt_binding_stats (
+  id BIGSERIAL PRIMARY KEY,
+  id_original BIGINT,
+  mgmt_binding_id BIGINT REFERENCES mgmt_binding ,
+  rec_time TIMESTAMP,
+  msg_matched INT8 
+);
+
+ALTER TABLE mgmt_binding ADD FOREIGN KEY (mgmt_binding_stats_id) REFERENCES mgmt_binding_stats;
+
+CREATE TABLE mgmt_client (
+  id BIGSERIAL PRIMARY KEY,
+  id_original BIGINT,
+  mgmt_client_stats_id BIGINT,
+  rec_time TIMESTAMP,
+  creation_time TIMESTAMP,
+  deletion_time TIMESTAMP,
+  mgmt_vhost_id BIGINT REFERENCES mgmt_vhost,
+  ip_addr INT4 ,
+  port INT2 
+);
+
+CREATE INDEX mgmt_client_ip_addr_index ON mgmt_client(ip_addr);
+
+CREATE INDEX mgmt_client_port_index ON mgmt_client(port);
+
+CREATE TABLE mgmt_client_stats (
+  id BIGSERIAL PRIMARY KEY,
+  id_original BIGINT,
+  mgmt_client_id BIGINT REFERENCES mgmt_client ,
+  rec_time TIMESTAMP,
+  auth_identity VARCHAR(1000) ,
+  msgs_produced INT8 ,
+  msgs_consumed INT8 ,
+  bytes_produced INT8 ,
+  bytes_consumed INT8 
+);
+
+ALTER TABLE mgmt_client ADD FOREIGN KEY (mgmt_client_stats_id) REFERENCES mgmt_client_stats;
+
+CREATE TABLE mgmt_session (
+  id BIGSERIAL PRIMARY KEY,
+  id_original BIGINT,
+  mgmt_session_stats_id BIGINT,
+  rec_time TIMESTAMP,
+  creation_time TIMESTAMP,
+  deletion_time TIMESTAMP,
+  mgmt_vhost_id BIGINT REFERENCES mgmt_vhost,
+  name VARCHAR(1000) ,
+  mgmt_client_id BIGINT REFERENCES mgmt_client,
+  detached_lifespan INT4 
+);
+
+CREATE INDEX mgmt_session_name_index ON mgmt_session(name);
+
+CREATE TABLE mgmt_session_stats (
+  id BIGSERIAL PRIMARY KEY,
+  id_original BIGINT,
+  mgmt_session_id BIGINT REFERENCES mgmt_session ,
+  rec_time TIMESTAMP,
+  attached BOOLEAN ,
+  remaining_lifespan INT4 ,
+  frames_outstanding INT4 
+);
+
+ALTER TABLE mgmt_session ADD FOREIGN KEY (mgmt_session_stats_id) REFERENCES mgmt_session_stats;
+
+CREATE TABLE mgmt_destination (
+  id BIGSERIAL PRIMARY KEY,
+  id_original BIGINT,
+  mgmt_destination_stats_id BIGINT,
+  rec_time TIMESTAMP,
+  creation_time TIMESTAMP,
+  deletion_time TIMESTAMP,
+  mgmt_session_id BIGINT REFERENCES mgmt_session,
+  name VARCHAR(1000) 
+);
+
+CREATE INDEX mgmt_destination_name_index ON mgmt_destination(name);
+
+CREATE TABLE mgmt_destination_stats (
+  id BIGSERIAL PRIMARY KEY,
+  id_original BIGINT,
+  mgmt_destination_id BIGINT REFERENCES mgmt_destination ,
+  rec_time TIMESTAMP,
+  flow_mode INT2 ,
+  max_msg_credits INT4 ,
+  max_byte_credits INT4 ,
+  msg_credits INT4 ,
+  byte_credits INT4 
+);
+
+ALTER TABLE mgmt_destination ADD FOREIGN KEY (mgmt_destination_stats_id) REFERENCES mgmt_destination_stats;
+
+CREATE TABLE mgmt_producer (
+  id BIGSERIAL PRIMARY KEY,
+  id_original BIGINT,
+  mgmt_producer_stats_id BIGINT,
+  rec_time TIMESTAMP,
+  creation_time TIMESTAMP,
+  deletion_time TIMESTAMP,
+  mgmt_destination_id BIGINT REFERENCES mgmt_destination,
+  mgmt_exchange_id BIGINT REFERENCES mgmt_exchange
+);
+
+CREATE TABLE mgmt_producer_stats (
+  id BIGSERIAL PRIMARY KEY,
+  id_original BIGINT,
+  mgmt_producer_id BIGINT REFERENCES mgmt_producer ,
+  rec_time TIMESTAMP,
+  msgs_produced INT8 ,
+  bytes_produced INT8 
+);
+
+ALTER TABLE mgmt_producer ADD FOREIGN KEY (mgmt_producer_stats_id) REFERENCES mgmt_producer_stats;
+
+CREATE TABLE mgmt_consumer (
+  id BIGSERIAL PRIMARY KEY,
+  id_original BIGINT,
+  mgmt_consumer_stats_id BIGINT,
+  rec_time TIMESTAMP,
+  creation_time TIMESTAMP,
+  deletion_time TIMESTAMP,
+  mgmt_destination_id BIGINT REFERENCES mgmt_destination,
+  mgmt_queue_id BIGINT REFERENCES mgmt_queue
+);
+
+CREATE TABLE mgmt_consumer_stats (
+  id BIGSERIAL PRIMARY KEY,
+  id_original BIGINT,
+  mgmt_consumer_id BIGINT REFERENCES mgmt_consumer ,
+  rec_time TIMESTAMP,
+  msgs_consumed INT8 ,
+  bytes_consumed INT8 ,
+  unacked_messages INT4 ,
+  unacked_messages_high INT4 ,
+  unacked_messages_low INT4 
+);
+
+ALTER TABLE mgmt_consumer ADD FOREIGN KEY (mgmt_consumer_stats_id) REFERENCES mgmt_consumer_stats;

Deleted: mgmt/mint/python/mint/updater.py
===================================================================
--- mgmt/mint/python/mint/updater.py	2007-11-13 18:36:01 UTC (rev 1302)
+++ mgmt/mint/python/mint/updater.py	2007-11-13 19:45:55 UTC (rev 1303)
@@ -1,57 +0,0 @@
-from time import sleep
-from datetime import datetime
-
-from schema import *
-
-class MintUpdater(object):
-    def __init__(self, model, broker):
-        self.model = model
-        self.broker = broker
-
-        self.broker.configListener("XXXcontext", configCallback)
-        self.broker.instrumentationListener("XXXcontext", instCallback)
-
-    def start(self):
-        self.broker.start()
-
-        while True:
-            sleep(1)
-
-def getQueueByName(name, create=False):
-  try:
-    queues = MgmtQueue.selectBy(name=name)[:1]
-    queue = queues[0]
-  except IndexError:
-    if (create): queue = MgmtQueue()
-  return queue
-
-def configCallback(broker, oid, list, timestamps):
-  print "broker=" + broker
-  if oid == 4:
-    print list
-    d = dict(list)
-    queue = getQueueByName(d["name"], True)
-    queue.set(**d)
-    recOn = datetime.fromtimestamp(timestamps[0]/1000000000)
-    createdOn = datetime.fromtimestamp(timestamps[1]/1000000000)
-    queue.set(recTime=recOn,creationTime=createdOn)
-    print queue.id
-    print " -> " + d["name"]
-    return queue
-
-def instCallback(broker, oid, list, timestamps):
-  print "broker=" + broker
-  if oid == 4:
-    print list
-    d = dict(list)
-    queue = getQueueByName(d.pop("name"))
-    d["mgmtQueue"] = queue.id
-    recOn = datetime.fromtimestamp(timestamps[0]/1000000000)
-    queueStats = MgmtQueueStats()
-    queueStats.set(recTime=recOn)
-    queueStats.set(**d)
-    if (timestamps[2] != 0):
-      deletedOn = datetime.fromtimestamp(timestamps[2]/1000000000)
-      queue.set(deletionTime=deletedOn)
-    print queue.id
-    return queueStats




More information about the rhmessaging-commits mailing list