[rhmessaging-commits] rhmessaging commits: r3253 - in mgmt/trunk: mint/instance/etc and 1 other directories.

rhmessaging-commits at lists.jboss.org rhmessaging-commits at lists.jboss.org
Thu Apr 2 15:49:59 EDT 2009


Author: justi9
Date: 2009-04-02 15:49:59 -0400 (Thu, 02 Apr 2009)
New Revision: 3253

Added:
   mgmt/trunk/mint/python/mint/database.py
   mgmt/trunk/mint/python/mint/expire.py
   mgmt/trunk/mint/python/mint/model.py
   mgmt/trunk/mint/python/mint/poll.py
Removed:
   mgmt/trunk/mint/python/mint/dbexpire.py
Modified:
   mgmt/trunk/cumin/python/cumin/broker.py
   mgmt/trunk/cumin/python/cumin/limits.py
   mgmt/trunk/cumin/python/cumin/managementserver.py
   mgmt/trunk/cumin/python/cumin/model.py
   mgmt/trunk/cumin/python/cumin/tools.py
   mgmt/trunk/mint/instance/etc/mint.conf
   mgmt/trunk/mint/python/mint/__init__.py
   mgmt/trunk/mint/python/mint/main.py
   mgmt/trunk/mint/python/mint/tools.py
   mgmt/trunk/mint/python/mint/update.py
   mgmt/trunk/mint/python/mint/util.py
Log:
Don't enable debug by default for the devel mint instance

Modified: mgmt/trunk/cumin/python/cumin/broker.py
===================================================================
--- mgmt/trunk/cumin/python/cumin/broker.py	2009-04-02 19:42:25 UTC (rev 3252)
+++ mgmt/trunk/cumin/python/cumin/broker.py	2009-04-02 19:49:59 UTC (rev 3253)
@@ -79,7 +79,7 @@
 
         def render_content(self, session, data):
             scopeId = data["qmf_scope_id"]
-            dt = self.app.model.data.getLatestHeartbeat(scopeId)
+            dt = self.app.model.mint.model.getLatestHeartbeat(scopeId)
 
             if dt is None:
                 return fmt_none()
@@ -202,7 +202,7 @@
 
             try:
                 id = broker.qmfBrokerId
-                mbroker = self.app.model.data.mintBrokersById[id]
+                mbroker = self.app.model.mint.model.mintBrokersById[id]
                 connected = mbroker.connected
             except KeyError:
                 pass

Modified: mgmt/trunk/cumin/python/cumin/limits.py
===================================================================
--- mgmt/trunk/cumin/python/cumin/limits.py	2009-04-02 19:42:25 UTC (rev 3252)
+++ mgmt/trunk/cumin/python/cumin/limits.py	2009-04-02 19:49:59 UTC (rev 3253)
@@ -52,7 +52,7 @@
 #TODO: this probably shouldn't be called until after the invoke completes
         def completion():
             pass
-        negotiator.Reconfig(self.app.model.data, completion)
+        negotiator.Reconfig(self.app.model.mint.model, completion)
 
     def get_raw_limits(self, session, negotiator):
         action = self.app.model.negotiator.GetLimits

Modified: mgmt/trunk/cumin/python/cumin/managementserver.py
===================================================================
--- mgmt/trunk/cumin/python/cumin/managementserver.py	2009-04-02 19:42:25 UTC (rev 3252)
+++ mgmt/trunk/cumin/python/cumin/managementserver.py	2009-04-02 19:49:59 UTC (rev 3253)
@@ -41,7 +41,7 @@
             url = data["url"]
 
             try:
-                server = self.app.model.data.mintBrokersByUrl[url]
+                server = self.app.model.mint.model.mintBrokersByUrl[url]
 
                 if server.connected:
                     html = "Connected"
@@ -274,7 +274,7 @@
             url = data["url"]
 
             try:
-                server = self.app.model.data.mintBrokersByUrl[url]
+                server = self.app.model.mint.model.mintBrokersByUrl[url]
 
                 if server.connected:
                     html = "Connected"

Modified: mgmt/trunk/cumin/python/cumin/model.py
===================================================================
--- mgmt/trunk/cumin/python/cumin/model.py	2009-04-02 19:42:25 UTC (rev 3252)
+++ mgmt/trunk/cumin/python/cumin/model.py	2009-04-02 19:49:59 UTC (rev 3253)
@@ -1,31 +1,37 @@
+import logging
 from datetime import datetime, timedelta
+from mint import Mint, MintConfig
+from struct import unpack, calcsize
+from types import *
+from wooly import *
+from wooly.parameters import *
+from wooly.widgets import *
+
 from formats import *
 from job import *
 from parameters import *
 from pool import PoolSlotSet, PoolJobStats
 from slot import SlotStatSet
-from struct import unpack, calcsize
 from system import SystemSlotSet
 from time import *
-from types import *
 from util import *
-from wooly import *
-from wooly.parameters import *
-from wooly.widgets import *
-from mint.schema import *
-import logging
 
-
 log = logging.getLogger("cumin.model")
 
 class CuminModel(object):
     def __init__(self, app, data_uri):
         self.app = app
 
-        self.data = MintModel(data_uri)
-        self.data.updateObjects = False
-        self.data.expireObjects = False
+        opts = {"data": data_uri, "qmf": None}
 
+        config = MintConfig()
+        config.init(opts)
+
+        self.mint = Mint(config)
+        self.mint.updateEnabled = False
+        self.mint.pollEnabled = True
+        self.mint.expireEnabled = False
+
         self.classes = list()
         self.invocations = set()
 
@@ -68,10 +74,10 @@
         CuminSlot(self)
 
     def check(self):
-        self.data.check()
+        self.mint.check()
 
     def init(self):
-        self.data.init()
+        self.mint.init()
 
         self.frame = self.app.main_page.main
 
@@ -79,10 +85,10 @@
             cls.init()
 
     def start(self):
-        self.data.start()
+        self.mint.start()
 
     def stop(self):
-        self.data.stop()
+        self.mint.stop()
 
     def add_class(self, cls):
         self.classes.append(cls)
@@ -306,7 +312,7 @@
     def get_session_by_object(self, object):
         assert object
 
-        broker = self.model.data.mintBrokersById[object.qmfBrokerId]
+        broker = self.mint.model.mintBrokersById[object.qmfBrokerId]
 
         return broker.getAmqpSession()
 
@@ -737,7 +743,7 @@
                 master = Master.select("System = '%s'" % system_name)[0]
             except IndexError:
                 raise Exception("Master daemon not running")
-            master.Start(self.model.data, completion, args["subsystem"])
+            master.Start(self.mint.model, completion, args["subsystem"])
 
     class Stop(CuminAction):
         def do_invoke(self, object, args, completion):
@@ -746,7 +752,7 @@
                 master = Master.select("System = '%s'" % system_name)[0]
             except IndexError:
                 raise Exception("Master daemon not running")
-            #master.Stop(self.model.data, completion, args["subsystem"])
+            #master.Stop(self.mint.model, completion, args["subsystem"])
 
 class CuminBroker(RemoteClass):
     def __init__(self, model):
@@ -816,7 +822,8 @@
             connected = False
             if broker:
                 try:
-                    mbroker = self.model.data.mintBrokersById[broker.qmfBrokerId]
+                    mbroker = self.mint.model.mintBrokersById \
+                        [broker.qmfBrokerId]
                     connected = mbroker.connected
                 except KeyError:
                     pass
@@ -897,7 +904,7 @@
             else:
                 authMechanism = "PLAIN"
 
-            broker.connect(self.model.data, completion, host, port, durable,
+            broker.connect(self.mint.model, completion, host, port, durable,
                            authMechanism, username, password, transport)
 
     class AddQueue(CuminAction):
@@ -1131,7 +1138,7 @@
             return "Purge"
 
         def do_invoke(self, queue, args, completion):
-            queue.purge(self.model.data, completion, args["request"])
+            queue.purge(self.mint.model, completion, args["request"])
 
     class Remove(CuminAction):
         def show(self, session, queue):
@@ -1174,7 +1181,9 @@
 
         def do_invoke(self, queue, args, completion):
             broker = queue.vhost.broker
-            broker.queueMoveMessages(self.model.data, completion, args["srcQueue"], args["destQueue"], args["qty"])
+            broker.queueMoveMessages(self.mint.model, completion,
+                                     args["srcQueue"], args["destQueue"],
+                                     args["qty"])
 
 class CuminExchange(RemoteClass):
     def __init__(self, model):
@@ -1351,7 +1360,7 @@
             return frame.show_remove(session)
 
         def do_invoke(self, bridge, args, completion):
-            bridge.close(self.model.data, completion)
+            bridge.close(self.mint.model, completion)
 
 class CuminConnection(RemoteClass):
     def __init__(self, model):
@@ -1421,7 +1430,7 @@
         def do_invoke(self, conn, args, completion):
             session_ids = set()
 
-            for broker in self.model.data.mintBrokersByQmfBroker:
+            for broker in self.mint.model.mintBrokersByQmfBroker:
                 session_ids.add(broker.getSessionId())
 
             for sess in conn.sessions:
@@ -1430,7 +1439,7 @@
                         ("Cannot close management connection %s" % \
                              conn.address)
 
-            conn.close(self.model.data, completion)
+            conn.close(self.mint.model, completion)
 
 class CuminSession(RemoteClass):
     def __init__(self, model):
@@ -1472,38 +1481,38 @@
             return "Close"
 
         def do_invoke(self, sess, args, completion):
-            for broker in self.model.data.mintBrokersByQmfBroker:
+            for broker in self.mint.model.mintBrokersByQmfBroker:
                 if sess.name == broker.getSessionId():
                     raise Exception \
                         ("Cannot close management session %s" % sess.name)
 
-            sess.close(self.model.data, completion)
+            sess.close(self.mint.model, completion)
 
     class Detach(CuminAction):
         def get_title(self, session):
             return "Detach"
 
         def do_invoke(self, sess, args, completion):
-            for broker in self.model.data.mintBrokersByQmfBroker:
+            for broker in self.mint.model.mintBrokersByQmfBroker:
                 if sess.name == broker.getSessionId():
                     raise Exception \
                         ("Cannot detach management session %s" % sess.name)
 
-            sess.detach(self.model.data, completion)
+            sess.detach(self.mint.model, completion)
 
     class ResetLifespan(CuminAction):
         def get_title(self, session):
             return "Reset Lifespan"
 
         def do_invoke(self, object, args, completion):
-            object.resetLifespan(self.model.data, completion)
+            object.resetLifespan(self.mint.model, completion)
 
     class SolicitAck(CuminAction):
         def get_title(self, session):
             return "Solicit Acknowledgment"
 
         def do_invoke(self, object, args, completion):
-            object.solicitAck(self.model.data, completion)
+            object.solicitAck(self.mint.model, completion)
 
 class CuminLink(RemoteClass):
     def __init__(self, model):
@@ -1577,7 +1586,7 @@
             excludes = args["excludes"]
             sync = args["sync"]
 
-            link.bridge(self.model.data, completion,
+            link.bridge(self.mint.model, completion,
                         durable, src, dest, key,
                         tag, excludes, False, False, dynamic, sync)
 
@@ -1590,7 +1599,7 @@
             return "Close"
 
         def do_invoke(self, link, args, completion):
-            link.close(self.model.data, completion)
+            link.close(self.mint.model, completion)
 
 class CuminBrokerStoreModule(RemoteClass):
     def __init__(self, model):
@@ -2110,8 +2119,8 @@
         def do_invoke(self, limit, negotiator, completion):
             Name = limit.id
             Max = limit.max
-            negotiator.SetLimit(self.model.data, completion, Name, Max)
-            #negotiator.SetLimit(self.model.data, completion, Name, str(Max))
+            negotiator.SetLimit(self.mint.model, completion, Name, Max)
+            #negotiator.SetLimit(self.mint.model, completion, Name, str(Max))
 
 class CuminJobGroup(CuminClass):
     def __init__(self, model):
@@ -2418,7 +2427,7 @@
                 return self.got_data
 
             try:
-                job.GetAd(self.model.data, completion, None)
+                job.GetAd(self.mint.model, completion, None)
             except Exception, e:
                 return self.job_ads
 
@@ -2452,7 +2461,7 @@
 
             try:
                 data = None
-                job.Fetch(self.model.data, completion, file, start, end, data)
+                job.Fetch(self.mint.model, completion, file, start, end, data)
                 # wait for up to 20 seconds for completion to be called
                 wait(predicate, timeout=20)
                 if not self.got_data:
@@ -2475,7 +2484,7 @@
             return "Hold"
 
         def do_invoke(self, job, reason, completion):
-            job.Hold(self.model.data, completion, reason)
+            job.Hold(self.mint.model, completion, reason)
 
         def get_enabled(self, session, job):
             is_held = JobStatusInfo.get_status_int("Held") == job.JobStatus
@@ -2492,7 +2501,7 @@
             return "Release"
 
         def do_invoke(self, job, reason, completion):
-            job.Release(self.model.data, completion, reason)
+            job.Release(self.mint.model, completion, reason)
 
         def get_enabled(self, session, job):
             is_held = JobStatusInfo.get_status_int("Held") == job.JobStatus
@@ -2509,7 +2518,7 @@
             return "Remove"
 
         def do_invoke(self, job, reason, completion):
-            job.Remove(self.model.data, completion, reason)
+            job.Remove(self.mint.model, completion, reason)
 
         def get_enabled(self, session, job):
             is_deleted = job.qmfDeleteTime is not None
@@ -2525,7 +2534,7 @@
         def do_invoke(self, job, args, completion):
             Name = args[0]
             Value = args[1]
-            job.SetAttribute(self.model.data, completion, Name, str(Value))
+            job.SetAttribute(self.mint.model, completion, Name, str(Value))
 
 class GetStartedAction(CuminAction):
     def get_xml_response(self, session, object, *args):
@@ -2808,7 +2817,7 @@
                 return self.got_data
 
             try:
-                negotiator.GetLimits(self.model.data, completion, None)
+                negotiator.GetLimits(self.mint.model, completion, None)
             except Exception, e:
                 return self.lim
 

Modified: mgmt/trunk/cumin/python/cumin/tools.py
===================================================================
--- mgmt/trunk/cumin/python/cumin/tools.py	2009-04-02 19:42:25 UTC (rev 3252)
+++ mgmt/trunk/cumin/python/cumin/tools.py	2009-04-02 19:49:59 UTC (rev 3253)
@@ -155,7 +155,15 @@
     def init(self):
         super(CuminAdminTool, self).init()
 
-        self.database = MintDatabase(self.config.data)
+        config = MintConfig()
+        config.init({"data": self.config.data})
+
+        app = Mint(config)
+        app.updateEnabled = False
+        app.pollEnabled = False
+        app.expireEnabled = False
+
+        self.database = MintDatabase(app)
         self.database.check()
         self.database.init()
 
@@ -189,8 +197,6 @@
 
         try:
             opts, args = command.parse(remaining)
-
-            print opts, args, remaining
         except CommandException, e:
             print "Error: %s" % e.message
             e.command.print_help()

Modified: mgmt/trunk/mint/instance/etc/mint.conf
===================================================================
--- mgmt/trunk/mint/instance/etc/mint.conf	2009-04-02 19:42:25 UTC (rev 3252)
+++ mgmt/trunk/mint/instance/etc/mint.conf	2009-04-02 19:49:59 UTC (rev 3253)
@@ -1,3 +1,3 @@
 [main]
 data: postgresql://cumin@localhost/cumin
-debug: True
+#debug: True

Modified: mgmt/trunk/mint/python/mint/__init__.py
===================================================================
--- mgmt/trunk/mint/python/mint/__init__.py	2009-04-02 19:42:25 UTC (rev 3252)
+++ mgmt/trunk/mint/python/mint/__init__.py	2009-04-02 19:49:59 UTC (rev 3253)
@@ -1,615 +1,2 @@
-import logging
-import qmf.console
-import pickle
-import struct
-import sys
-import os
-import types
-import socket
-
-from parsley.config import Config, ConfigParameter
-from psycopg2 import OperationalError
-from qmf.console import ClassKey
-from qpid.datatypes import UUID
-from qpid.util import URL
-from sqlobject import *
-from threading import Lock, RLock
-from time import sleep
-from traceback import print_exc
-
-from mint import update
-from mint.cache import MintCache
-from mint.dbexpire import DBExpireThread
-from mint.schema import *
-from util import *
-
-log = logging.getLogger("mint")
-
-thisModule = __import__(__name__)
-
-for item in dir(schema):
-  cls = getattr(schema, item)
-
-  try:
-    if issubclass(cls, SQLObject) and cls is not SQLObject:
-      setattr(thisModule, item, cls)
-  except TypeError:
-    pass
-
-Broker.sqlmeta.addColumn(ForeignKey("BrokerRegistration",
-                                    cascade="null", default=None,
-                                    name="registration"))
-
-class MintDatabase(object):
-  def __init__(self, uri):
-    self.uri = uri
-
-  def getConnection(self):
-    return connectionForURI(self.uri).getConnection()
-
-  def check(self):
-    self.checkConnection()
-
-  def init(self):
-    sqlhub.processConnection = connectionForURI(self.uri)
-
-  def checkConnection(self):
-    conn = self.getConnection()
-
-    try:
-      cursor = conn.cursor()
-      cursor.execute("select now()")
-    finally:
-      conn.close()
-
-  def checkSchema(self):
-    pass
-
-  def dropSchema(self):
-    conn = self.getConnection()
-    try:
-      cursor = conn.cursor()
-
-      cursor.execute("drop schema public cascade")
-
-      conn.commit()
-    finally:
-      conn.close()
-
-  def __splitSQLStatements(self, text):
-    result = list()
-    unmatchedQuote = False
-    tmpStmt = ""
-
-    for stmt in text.split(";"):
-      stmt = stmt.rstrip()
-      quotePos = stmt.find("'")
-      while quotePos > 0:
-        quotePos += 1
-        if quotePos < len(stmt):
-          if stmt[quotePos] != "'":
-            unmatchedQuote = not unmatchedQuote
-          else:
-            # ignore 2 single quotes
-            quotePos += 1
-        quotePos = stmt.find("'", quotePos)
-
-      if len(stmt.lstrip()) > 0:
-          tmpStmt += stmt + ";"
-          if not unmatchedQuote:
-            # single quote has been matched/closed, generate statement
-            result.append(tmpStmt.lstrip())
-            tmpStmt = ""
-
-    if unmatchedQuote:
-        result.append(tmpStmt.lstrip())
-    return result
-
-  def createSchema(self, file_paths):
-    conn = self.getConnection()
-
-    scripts = list()
-
-    for path in file_paths:
-      file = open(path, "r")
-      try:
-        scripts.append((path, file.read()))
-      finally:
-        file.close()
-
-    try:
-      cursor = conn.cursor()
-
-      try:
-        cursor.execute("create schema public");
-      except:
-        conn.rollback()
-        pass
-
-      for path, text in scripts:
-        stmts = self.__splitSQLStatements(text)
-        count = 0
-
-        for stmt in stmts:
-          stmt = stmt.strip()
-
-          if stmt:
-            try:
-              cursor.execute(stmt)
-            except Exception, e:
-              print "Failed executing statement:"
-              print stmt
-
-              raise e
-
-            count += 1
-
-        print "Executed %i statements from file '%s'" % (count, path)
-
-      conn.commit()
-
-      info = MintInfo(version="0.1")
-      info.sync()
-
-      # Standard roles
-
-      user = Role(name="user")
-      user.sync()
-
-      admin = Role(name="admin")
-      admin.sync()
-    finally:
-      conn.close()
-
-  def checkSchema(self):
-    conn = self.getConnection()
-
-    try:
-      cursor = conn.cursor()
-
-      try:
-        cursor.execute("select version from mint_info");
-      except Exception, e:
-        print "No schema present"
-        return
-
-      for rec in cursor:
-        print "OK (version %s)" % rec[0]
-        return;
-
-      print "No schema present"
-    finally:
-      conn.close()
-
-class Subject(SQLObject):
-  class sqlmeta:
-    lazyUpdate = True
-
-  name = StringCol(length=1000, default=None, unique=True, notNone=True)
-  password = StringCol(length=1000, default=None)
-  lastChallenged = TimestampCol(default=None)
-  lastLoggedIn = TimestampCol(default=None)
-  lastLoggedOut = TimestampCol(default=None)
-  roles = SQLRelatedJoin("Role", intermediateTable="subject_role_mapping",
-                         createRelatedTable=False)
-
-  def getByName(cls, name):
-    try:
-      return Subject.selectBy(name=name)[0]
-    except IndexError:
-      pass
-
-  getByName = classmethod(getByName)
-
-class Role(SQLObject):
-  class sqlmeta:
-    lazyUpdate = True
-
-  name = StringCol(length=1000, default=None, unique=True, notNone=True)
-  subjects = SQLRelatedJoin("Subject",
-                            intermediateTable="subject_role_mapping",
-                            createRelatedTable=False)
-
-  def getByName(cls, name):
-    try:
-      return Role.selectBy(name=name)[0]
-    except IndexError:
-      pass
-
-  getByName = classmethod(getByName)
-
-class SubjectRoleMapping(SQLObject):
-  class sqlmeta:
-    lazyUpdate = True
-
-  subject = ForeignKey("Subject", notNull=True, cascade=True)
-  role = ForeignKey("Role", notNull=True, cascade=True)
-  unique = DatabaseIndex(subject, role, unique=True)
-
-class ObjectNotFound(Exception):
-  pass
-
-class MintInfo(SQLObject):
-  class sqlmeta:
-    lazyUpdate = True
-
-  version = StringCol(length=1000, default="0.1", notNone=True)
-
-class CollectorRegistration(SQLObject):
-  class sqlmeta:
-    lazyUpdate = True
-
-  name = StringCol(length=1000, default=None)
-  collectorId = StringCol(length=1000, default=None)
-
-class BrokerRegistration(SQLObject):
-  class sqlmeta:
-    lazyUpdate = True
-
-  name = StringCol(length=1000, default=None, unique=True, notNone=True)
-  url = StringCol(length=1000, default=None)
-  broker = ForeignKey("Broker", cascade="null", default=None)
-  cluster = ForeignKey("BrokerCluster", cascade="null", default=None)
-  profile = ForeignKey("BrokerProfile", cascade="null", default=None)
-
-  url_unique = DatabaseIndex(url, unique=True)
-
-  def getBrokerId(self):
-    return self.mintBroker.qmfId
-
-  def getDefaultVhost(self):
-    if self.broker:
-      try:
-        return Vhost.selectBy(broker=self.broker, name="/")[0]
-      except IndexError:
-        return None
-
-class BrokerGroup(SQLObject):
-  class sqlmeta:
-    lazyUpdate = True
-
-  name = StringCol(length=1000, default=None, unique=True, notNone=True)
-  brokers = SQLRelatedJoin("Broker",
-                           intermediateTable="broker_group_mapping",
-                           createRelatedTable=False)
-
-class BrokerGroupMapping(SQLObject):
-  class sqlmeta:
-    lazyUpdate = True
-
-  broker = ForeignKey("Broker", notNull=True, cascade=True)
-  brokerGroup = ForeignKey("BrokerGroup", notNull=True, cascade=True)
-  unique = DatabaseIndex(broker, brokerGroup, unique=True)
-
-class BrokerCluster(SQLObject):
-  class sqlmeta:
-    lazyUpdate = True
-
-  name = StringCol(length=1000, default=None)
-  brokers = SQLMultipleJoin("BrokerRegistration", joinColumn="cluster_id")
-
-class BrokerProfile(SQLObject):
-  class sqlmeta:
-    lazyUpdate = True
-
-  name = StringCol(length=1000, default=None)
-  brokers = SQLMultipleJoin("BrokerRegistration", joinColumn="profile_id")
-  properties = SQLMultipleJoin("ConfigProperty", joinColumn="profile_id")
-
-class MintBroker(object):
-  def __init__(self, url, qmfBroker):
-    self.url = url
-    self.qmfBroker = qmfBroker
-
-    self.qmfId = None
-    self.databaseId = None
-
-    self.objectDatabaseIds = MintCache() # database ids by qmf object id
-    self.orphans = dict() # updates by qmf object id
-
-    self.connected = False
-
-  def __repr__(self):
-    return "%s(%s)" % (self.__class__.__name__, self.url)
-
-  def getAmqpSession(self):
-    return self.qmfBroker.getAmqpSession()
-
-  def getAmqpSessionId(self):
-    return self.qmfBroker.getSessionId()
-
-  def getFullUrl(self):
-    return self.qmfBroker.getFullUrl()
-
-class MintModel(qmf.console.Console):
-  staticInstance = None
-
-  def __init__(self, dataUri, debug=False):
-    self.dataUri = dataUri
-    self.debug = debug
-
-    self.updateObjects = True
-
-    self.pollRegistrations = True
-
-    self.expireObjects = True
-    self.expireFrequency = 600
-    self.expireThreshold = 24 * 3600
-
-    assert MintModel.staticInstance is None
-    MintModel.staticInstance = self
-
-    # Lookup tables used for recovering MintBroker objects, which have
-    # mint-specific accounting and wrap a qmf broker object
-
-    self.mintBrokersByQmfBroker = dict()
-    self.mintBrokersById = dict()
-    self.mintBrokersByUrl = dict()
-
-    # Agent heartbeats
-    # (broker bank, agent bank) => latest heartbeat timestamp
-
-    self.heartbeatsByAgentId = dict()
-
-    self.__lock = RLock()
-
-    self.dbConn = None
-    self.qmfSession = None
-
-    self.updateThread = update.ModelUpdateThread(self)
-    self.expireThread = dbexpire.DBExpireThread(self)
-    self.registrationThread = RegistrationThread(self)
-
-    self.outstandingMethodCalls = dict()
-
-  def lock(self):
-    self.__lock.acquire()
-
-  def unlock(self):
-    self.__lock.release()
-
-  def check(self):
-    try:
-      connectionForURI(self.dataUri)
-    except Exception, e:
-      if hasattr(e, "message") and e.message.find("does not exist"):
-        print "Database not found; run cumin-database-init"
-      raise e
-
-  def init(self):
-    log.info("Object updates are %s",
-             self.updateObjects and "enabled" or "disabled")
-
-    log.info("QMF server polling is %s",
-             self.pollRegistrations and "enabled" or "disabled")
-
-    log.info("Object expiration is %s",
-             self.expireObjects and "enabled" or "disabled")
-
-    sqlhub.processConnection = self.dbConn = connectionForURI(self.dataUri)
-
-    assert self.qmfSession is None
-
-    self.qmfSession = qmf.console.Session \
-        (self, manageConnections=True, rcvObjects=self.updateObjects)
-
-    self.updateThread.init()
-    self.expireThread.init()
-
-  def start(self):
-    self.updateThread.start()
-
-    if self.expireObjects:
-      self.expireThread.start()
-
-    if self.pollRegistrations:
-      self.registrationThread.start()
-
-  def stop(self):
-    self.updateThread.stop()
-
-    if self.expireObjects:
-      self.expireThread.stop()
-
-    if self.pollRegistrations:
-      self.registrationThread.stop()
-
-  def callMethod(self, brokerId, objId, classKey, methodName, callback, args):
-    self.lock()
-    try:
-      broker = self.mintBrokersById[brokerId]
-    finally:
-      self.unlock()
-
-    seq = self.qmfSession._sendMethodRequest \
-        (broker.qmfBroker, ClassKey(classKey), objId, methodName, args)
-
-    if seq is not None:
-      self.lock()
-      try:
-        self.outstandingMethodCalls[seq] = callback
-      finally:
-        self.unlock()
-    return seq
-
-  def addBroker(self, url):
-    log.info("Adding qmf broker at %s", url)
-
-    self.lock()
-    try:
-      qbroker = self.qmfSession.addBroker(url)
-      mbroker = MintBroker(url, qbroker)
-
-      # Can't add the by-id mapping here, as the id is not set yet;
-      # instead we add it when brokerConnected is called
-
-      self.mintBrokersByQmfBroker[qbroker] = mbroker
-      self.mintBrokersByUrl[url] = mbroker
-
-      return mbroker
-    finally:
-      self.unlock()
-
-  def delBroker(self, mbroker):
-    assert isinstance(mbroker, MintBroker)
-
-    log.info("Removing qmf broker at %s", mbroker.url)
-
-    self.lock()
-    try:
-      self.qmfSession.delBroker(mbroker.qmfBroker)
-
-      del self.mintBrokersByQmfBroker[mbroker.qmfBroker]
-      del self.mintBrokersByUrl[mbroker.url]
-
-      if mbroker.qmfId:
-        del self.mintBrokersById[mbroker.qmfId]
-    finally:
-      self.unlock()
-
-  def brokerConnected(self, qbroker):
-    """ Invoked when a connection is established to a broker """
-    self.lock()
-    try:
-      mbroker = self.mintBrokersByQmfBroker[qbroker]
-
-      assert mbroker.connected is False
-
-      mbroker.connected = True
-    finally:
-      self.unlock()
-
-  def brokerInfo(self, qbroker):
-    self.lock()
-    try:
-      id = str(qbroker.getBrokerId())
-      mbroker = self.mintBrokersByQmfBroker[qbroker]
-
-      mbroker.qmfId = id
-      self.mintBrokersById[id] = mbroker
-    finally:
-      self.unlock()
-
-  def brokerDisconnected(self, qbroker):
-    """ Invoked when the connection to a broker is lost """
-    self.lock()
-    try:
-      mbroker = self.mintBrokersByQmfBroker[qbroker]
-
-      assert mbroker.connected is True
-
-      mbroker.connected = False
-    finally:
-      self.unlock()
-
-  def getMintBrokerByQmfBroker(self, qbroker):
-    self.lock()
-    try:
-      return self.mintBrokersByQmfBroker[qbroker]
-    finally:
-      self.unlock()
-
-  def newPackage(self, name):
-    """ Invoked when a QMF package is discovered. """
-    pass
-
-  def newClass(self, kind, classKey):
-    """ Invoked when a new class is discovered.  Session.getSchema can be
-    used to obtain details about the class."""
-    pass
-
-  def newAgent(self, agent):
-    """ Invoked when a QMF agent is discovered. """
-    pass
-
-  def delAgent(self, agent):
-    """ Invoked when a QMF agent disconects. """
-    pass
-
-  def objectProps(self, broker, record):
-    """ Invoked when an object is updated. """
-
-    if not self.updateObjects:
-      return
-
-    mbroker = self.getMintBrokerByQmfBroker(broker)
-
-    up = update.PropertyUpdate(self, mbroker, record)
-
-    if record.getClassKey().getClassName() == "job":
-      up.priority = 1
-
-    self.updateThread.enqueue(up)
-
-  def objectStats(self, broker, record):
-    """ Invoked when an object is updated. """
-
-    if not self.updateObjects:
-      return
-
-    if record.getClassKey().getClassName() == "job":
-      return
-
-    mbroker = self.getMintBrokerByQmfBroker(broker)
-    up = update.StatisticUpdate(self, mbroker, record)
-
-    self.updateThread.enqueue(up)
-
-  def event(self, broker, event):
-    """ Invoked when an event is raised. """
-    pass
-
-  def heartbeat(self, agent, timestamp):
-    key = (agent.getBrokerBank(), agent.getAgentBank())
-    timestamp = timestamp / 1000000000
-    self.heartbeatsByAgentId[key] = datetime.fromtimestamp(timestamp)
-
-  def getLatestHeartbeat(self, scopeId):
-    scope = int(scopeId)
-    broker = (scope & 0x0000FFFFF0000000) >> 28
-    agent = scope & 0x000000000FFFFFFF
-    key = (broker, agent)
-
-    return self.heartbeatsByAgentId.get(key)
-
-  def methodResponse(self, broker, seq, response):
-    mbroker = self.getMintBrokerByQmfBroker(broker)
-    up = update.MethodUpdate(self, mbroker, seq, response)
-
-    self.updateThread.enqueue(up)
-
-class RegistrationThread(MintDaemonThread):
-  def run(self):
-    log.info("Polling for registration changes every 5 seconds")
-
-    while True:
-      try:
-        if self.stopRequested:
-          break
-
-        self.do_run()
-      except OperationalError, e:
-        log.exception(e)
-        if str(e).find("server closed the connection unexpectedly") >= 0:
-          sys.exit()
-      except Exception, e:
-        log.exception(e)
-
-  def do_run(self):
-    regUrls = set()
-
-    for reg in BrokerRegistration.select():
-      if reg.url not in self.model.mintBrokersByUrl:
-        try:
-          self.model.addBroker(reg.url)
-        except socket.error, e:
-          log.info("Can't connect to broker at %s: %s", reg.url, e)
-          pass
-
-      regUrls.add(reg.url)
-
-    for mbroker in self.model.mintBrokersByQmfBroker.values():
-      if mbroker.url not in regUrls:
-        self.model.delBroker(mbroker)
-
-    sleep(5)
+from main import *
+from model import *

Added: mgmt/trunk/mint/python/mint/database.py
===================================================================
--- mgmt/trunk/mint/python/mint/database.py	                        (rev 0)
+++ mgmt/trunk/mint/python/mint/database.py	2009-04-02 19:49:59 UTC (rev 3253)
@@ -0,0 +1,167 @@
+from sqlobject import connectionForURI, sqlhub
+
+from model import MintInfo, Role
+
+class MintDatabase(object):
+  """
+  Database access for the mint application: connection checks and
+  schema creation, removal, and inspection
+  """
+
+  def __init__(self, app):
+    self.app = app
+
+  def getConnection(self):
+    """ Return a connection for the configured database URI """
+    return connectionForURI(self.app.config.data).getConnection()
+
+  def check(self):
+    self.checkConnection()
+
+  def init(self):
+    # Use the configured database as the process-wide sqlobject connection
+    sqlhub.processConnection = connectionForURI(self.app.config.data)
+
+  def checkConnection(self):
+    """ Run a trivial query to verify that the database is reachable """
+    conn = self.getConnection()
+
+    try:
+      cursor = conn.cursor()
+      cursor.execute("select now()")
+    finally:
+      conn.close()
+
+  def dropSchema(self):
+    """ Drop the public schema and everything in it """
+    conn = self.getConnection()
+
+    try:
+      cursor = conn.cursor()
+
+      cursor.execute("drop schema public cascade")
+
+      conn.commit()
+    finally:
+      conn.close()
+
+  def __splitSQLStatements(self, text):
+    """
+    Split a SQL script into statements on semicolons, ignoring
+    semicolons inside single-quoted strings.  Each statement is
+    returned stripped and terminated with a semicolon.
+    """
+
+    result = list()
+    inQuote = False
+    start = 0
+
+    for pos, char in enumerate(text):
+      if char == "'":
+        # An escaped quote ('') toggles twice, leaving the state as is
+        inQuote = not inQuote
+      elif char == ";" and not inQuote:
+        stmt = text[start:pos].strip()
+
+        if stmt:
+          result.append(stmt + ";")
+
+        start = pos + 1
+
+    # Whatever remains has no terminator or sits inside an unclosed quote
+    stmt = text[start:].strip()
+
+    if stmt:
+      if not stmt.endswith(";"):
+        stmt += ";"
+
+      result.append(stmt)
+
+    return result
+
+  def createSchema(self, file_paths):
+    """
+    Create the public schema, execute the SQL scripts at file_paths in
+    order, and insert the MintInfo record and the standard roles
+    """
+
+    scripts = list()
+
+    # Read the scripts before connecting so that an unreadable file
+    # can't leak a connection
+
+    for path in file_paths:
+      file = open(path, "r")
+      try:
+        scripts.append((path, file.read()))
+      finally:
+        file.close()
+
+    conn = self.getConnection()
+
+    try:
+      cursor = conn.cursor()
+
+      try:
+        cursor.execute("create schema public")
+      except Exception:
+        # Presumably the schema already exists; reset the failed
+        # transaction and carry on
+        conn.rollback()
+
+      for path, text in scripts:
+        stmts = self.__splitSQLStatements(text)
+        count = 0
+
+        for stmt in stmts:
+          stmt = stmt.strip()
+
+          if stmt:
+            try:
+              cursor.execute(stmt)
+            except Exception:
+              print "Failed executing statement:"
+              print stmt
+
+              # A bare raise keeps the original traceback
+              raise
+
+            count += 1
+
+        print "Executed %i statements from file '%s'" % (count, path)
+
+      conn.commit()
+
+      info = MintInfo(version="0.1")
+      info.sync()
+
+      # Standard roles
+
+      user = Role(name="user")
+      user.sync()
+
+      admin = Role(name="admin")
+      admin.sync()
+    finally:
+      conn.close()
+
+  def checkSchema(self):
+    """ Print the schema version from mint_info, or note its absence """
+    conn = self.getConnection()
+
+    try:
+      cursor = conn.cursor()
+
+      try:
+        cursor.execute("select version from mint_info")
+      except Exception:
+        print "No schema present"
+        return
+
+      for rec in cursor:
+        print "OK (version %s)" % rec[0]
+        return
+
+      print "No schema present"
+    finally:
+      conn.close()

Deleted: mgmt/trunk/mint/python/mint/dbexpire.py
===================================================================
--- mgmt/trunk/mint/python/mint/dbexpire.py	2009-04-02 19:42:25 UTC (rev 3252)
+++ mgmt/trunk/mint/python/mint/dbexpire.py	2009-04-02 19:49:59 UTC (rev 3253)
@@ -1,58 +0,0 @@
-import logging
-import mint
-import time
-from threading import Thread
-from mint.schema import *
-from sql import *
-from util import *
-
-log = logging.getLogger("mint.dbexpire")
-
-class DBExpireThread(MintDaemonThread):
-  def __init__(self, model):
-    super(DBExpireThread, self).__init__(model)
-
-    self.keepCurrStats = False
-
-    self.ops = []
-    self.attrs = dict()
-
-  def init(self):
-    frequency = self.model.expireFrequency
-    threshold = self.model.expireThreshold
-
-    for cls in mint.schema.statsClasses:
-      self.ops.append(SqlExpire(eval(cls), self.keepCurrStats))
-    self.ops.append(SqlExpire(Job, self.keepCurrStats))
-
-    self.attrs["threshold"] = threshold
-
-    frequency_out, frequency_unit = self.__convertTimeUnits(frequency)
-    threshold_out, threshold_unit = self.__convertTimeUnits(threshold)
-    log.debug("Expiring database records older than %d %s, every %d %s" % \
-                (threshold_out, threshold_unit, frequency_out, frequency_unit))
-
-  def run(self):
-    frequency = self.model.expireFrequency
-
-    while True:
-      if self.stopRequested:
-        break
-      up = mint.update.DBExpireUpdate(self.model)
-      self.model.updateThread.enqueue(up)
-      time.sleep(frequency)
-
-  def __convertTimeUnits(self, t):
-    if t / (24*3600) >= 1:
-      t_out = t / (24*3600)
-      t_unit = "days"
-    elif t / 3600 >= 1:
-      t_out = t / 3600
-      t_unit = "hours"
-    elif t / 60 >= 1:
-      t_out = t / 60
-      t_unit = "minutes"
-    else:
-      t_out = t
-      t_unit = "seconds"
-    return (t_out, t_unit)

Copied: mgmt/trunk/mint/python/mint/expire.py (from rev 3248, mgmt/trunk/mint/python/mint/dbexpire.py)
===================================================================
--- mgmt/trunk/mint/python/mint/expire.py	                        (rev 0)
+++ mgmt/trunk/mint/python/mint/expire.py	2009-04-02 19:49:59 UTC (rev 3253)
@@ -0,0 +1,58 @@
+import logging
+import mint
+import time
+from threading import Thread
+from mint.schema import *
+from sql import *
+from util import *
+
+log = logging.getLogger("mint.expire")
+
+class ExpireThread(MintDaemonThread):
+  def __init__(self, app):
+    super(ExpireThread, self).__init__(app)
+
+    self.keepCurrStats = False
+
+    self.ops = []
+    self.attrs = dict()
+
+  def init(self):
+    frequency = self.app.expireFrequency
+    threshold = self.app.expireThreshold
+
+    for cls in mint.schema.statsClasses:
+      self.ops.append(SqlExpire(eval(cls), self.keepCurrStats))
+    self.ops.append(SqlExpire(Job, self.keepCurrStats))
+
+    self.attrs["threshold"] = threshold
+
+    frequency_out, frequency_unit = self.__convertTimeUnits(frequency)
+    threshold_out, threshold_unit = self.__convertTimeUnits(threshold)
+    log.debug("Expiring database records older than %d %s, every %d %s" % \
+                (threshold_out, threshold_unit, frequency_out, frequency_unit))
+
+  def run(self):
+    frequency = self.app.expireFrequency
+
+    while True:
+      if self.stopRequested:
+        break
+      up = mint.update.ExpireUpdate(self.app.model)
+      self.app.updateThread.enqueue(up)
+      time.sleep(frequency)
+
+  def __convertTimeUnits(self, t):
+    if t / (24*3600) >= 1:
+      t_out = t / (24*3600)
+      t_unit = "days"
+    elif t / 3600 >= 1:
+      t_out = t / 3600
+      t_unit = "hours"
+    elif t / 60 >= 1:
+      t_out = t / 60
+      t_unit = "minutes"
+    else:
+      t_out = t
+      t_unit = "seconds"
+    return (t_out, t_unit)


Property changes on: mgmt/trunk/mint/python/mint/expire.py
___________________________________________________________________
Name: svn:mergeinfo
   + 
Name: svn:eol-style
   + native

Modified: mgmt/trunk/mint/python/mint/main.py
===================================================================
--- mgmt/trunk/mint/python/mint/main.py	2009-04-02 19:42:25 UTC (rev 3252)
+++ mgmt/trunk/mint/python/mint/main.py	2009-04-02 19:49:59 UTC (rev 3253)
@@ -1,30 +1,77 @@
 import sys
 import os
+import logging
 from parsley.config import Config, ConfigParameter
 from parsley.loggingex import enable_logging
 
-from mint import MintModel
+from database import MintDatabase
+from model import MintModel
+from update import UpdateThread
+from poll import PollThread
+from expire import ExpireThread
 
+log = logging.getLogger("mint.main")
+
 class Mint(object):
   def __init__(self, config):
     self.config = config
+    self.database = MintDatabase(self)
+    self.model = MintModel(self)
 
-    self.model = MintModel(self.config.data)
-    self.model.expireFrequency = self.config.expire_frequency
-    self.model.expireThreshold = self.config.expire_threshold
+    self.updateEnabled = True
+    self.updateThread = UpdateThread(self)
 
+    self.pollEnabled = False
+    self.pollThread = PollThread(self)
+
+    self.expireEnabled = True
+    self.expireFrequency = self.config.expire_frequency
+    self.expireThreshold = self.config.expire_threshold
+    self.expireThread = ExpireThread(self)
+
   def check(self):
+    self.database.check()
     self.model.check()
 
   def init(self):
+    self.database.init()
     self.model.init()
 
+    def state(cond):
+      return cond and "enabled" or "disabled"
+
+    log.info("Updates are %s", state(self.updateEnabled))
+    log.info("Polling is %s", state(self.pollEnabled))
+    log.info("Expiration is %s", state(self.expireEnabled))
+
+    self.updateThread.init()
+    self.pollThread.init()
+    self.expireThread.init()
+
   def start(self):
     self.model.start()
 
+    if self.updateEnabled:
+      self.updateThread.start()
+
+    if self.pollEnabled:
+      self.pollThread.start()
+
+    if self.expireEnabled:
+      self.expireThread.start()
+
   def stop(self):
     self.model.stop()
 
+    if self.updateEnabled:
+      self.updateThread.stop()
+
+    if self.pollEnabled:
+      self.pollThread.stop()
+
+    if self.expireEnabled:
+      self.expireThread.stop()
+
 class MintConfig(Config):
   def __init__(self):
     super(MintConfig, self).__init__()
@@ -35,6 +82,9 @@
     param = ConfigParameter(self, "data", str)
     param.default = "postgresql://mint@localhost/mint"
 
+    param = ConfigParameter(self, "qmf", str)
+    param.default = "amqp://localhost"
+
     param = ConfigParameter(self, "log-file", str)
     param.default = os.path.join(self.home, "log", "mint.log")
 
@@ -50,13 +100,15 @@
     param = ConfigParameter(self, "expire-threshold", int)
     param.default = 24 * 3600 # 1 day
 
-  def init(self, opts):
+  def init(self, opts=None):
     super(MintConfig, self).init()
 
     self.load_file(os.path.join(self.home, "etc", "mint.conf"))
     self.load_file(os.path.join(os.path.expanduser("~"), ".mint.conf"))
-    self.load_dict(opts)
 
+    if opts:
+      self.load_dict(opts)
+
     if self.debug:
       enable_logging("mint", "debug", sys.stderr)
     else:

Added: mgmt/trunk/mint/python/mint/model.py
===================================================================
--- mgmt/trunk/mint/python/mint/model.py	                        (rev 0)
+++ mgmt/trunk/mint/python/mint/model.py	2009-04-02 19:49:59 UTC (rev 3253)
@@ -0,0 +1,397 @@
+import logging
+import qmf.console
+import pickle
+import struct
+import sys
+import os
+import types
+import socket
+
+from qmf.console import ClassKey
+from qpid.datatypes import UUID
+from qpid.util import URL
+from sqlobject import *
+from threading import Lock, RLock
+from time import sleep
+from traceback import print_exc
+
+from mint import update
+from mint.cache import MintCache
+from mint.schema import *
+from util import *
+
+log = logging.getLogger("mint.model")
+
+thisModule = __import__(__name__)
+
+for item in dir(mint.schema):
+  cls = getattr(mint.schema, item)
+
+  try:
+    if issubclass(cls, SQLObject) and cls is not SQLObject:
+      setattr(thisModule, item, cls)
+  except TypeError:
+    pass
+
+Broker.sqlmeta.addColumn(ForeignKey("BrokerRegistration",
+                                    cascade="null", default=None,
+                                    name="registration"))
+
+class Subject(SQLObject):
+  class sqlmeta:
+    lazyUpdate = True
+
+  name = StringCol(length=1000, default=None, unique=True, notNone=True)
+  password = StringCol(length=1000, default=None)
+  lastChallenged = TimestampCol(default=None)
+  lastLoggedIn = TimestampCol(default=None)
+  lastLoggedOut = TimestampCol(default=None)
+  roles = SQLRelatedJoin("Role", intermediateTable="subject_role_mapping",
+                         createRelatedTable=False)
+
+  def getByName(cls, name):
+    try:
+      return Subject.selectBy(name=name)[0]
+    except IndexError:
+      pass
+
+  getByName = classmethod(getByName)
+
+class Role(SQLObject):
+  class sqlmeta:
+    lazyUpdate = True
+
+  name = StringCol(length=1000, default=None, unique=True, notNone=True)
+  subjects = SQLRelatedJoin("Subject",
+                            intermediateTable="subject_role_mapping",
+                            createRelatedTable=False)
+
+  def getByName(cls, name):
+    try:
+      return Role.selectBy(name=name)[0]
+    except IndexError:
+      pass
+
+  getByName = classmethod(getByName)
+
+class SubjectRoleMapping(SQLObject):
+  class sqlmeta:
+    lazyUpdate = True
+
+  subject = ForeignKey("Subject", notNull=True, cascade=True)
+  role = ForeignKey("Role", notNull=True, cascade=True)
+  unique = DatabaseIndex(subject, role, unique=True)
+
+class ObjectNotFound(Exception):
+  pass
+
+class MintInfo(SQLObject):
+  class sqlmeta:
+    lazyUpdate = True
+
+  version = StringCol(length=1000, default="0.1", notNone=True)
+
+class CollectorRegistration(SQLObject):
+  class sqlmeta:
+    lazyUpdate = True
+
+  name = StringCol(length=1000, default=None)
+  collectorId = StringCol(length=1000, default=None)
+
+class BrokerRegistration(SQLObject):
+  class sqlmeta:
+    lazyUpdate = True
+
+  name = StringCol(length=1000, default=None, unique=True, notNone=True)
+  url = StringCol(length=1000, default=None)
+  broker = ForeignKey("Broker", cascade="null", default=None)
+  cluster = ForeignKey("BrokerCluster", cascade="null", default=None)
+  profile = ForeignKey("BrokerProfile", cascade="null", default=None)
+
+  url_unique = DatabaseIndex(url, unique=True)
+
+  def getBrokerId(self):
+    return self.mintBroker.qmfId
+
+  def getDefaultVhost(self):
+    if self.broker:
+      try:
+        return Vhost.selectBy(broker=self.broker, name="/")[0]
+      except IndexError:
+        return None
+
+class BrokerGroup(SQLObject):
+  class sqlmeta:
+    lazyUpdate = True
+
+  name = StringCol(length=1000, default=None, unique=True, notNone=True)
+  brokers = SQLRelatedJoin("Broker",
+                           intermediateTable="broker_group_mapping",
+                           createRelatedTable=False)
+
+class BrokerGroupMapping(SQLObject):
+  class sqlmeta:
+    lazyUpdate = True
+
+  broker = ForeignKey("Broker", notNull=True, cascade=True)
+  brokerGroup = ForeignKey("BrokerGroup", notNull=True, cascade=True)
+  unique = DatabaseIndex(broker, brokerGroup, unique=True)
+
+class BrokerCluster(SQLObject):
+  class sqlmeta:
+    lazyUpdate = True
+
+  name = StringCol(length=1000, default=None)
+  brokers = SQLMultipleJoin("BrokerRegistration", joinColumn="cluster_id")
+
+class BrokerProfile(SQLObject):
+  class sqlmeta:
+    lazyUpdate = True
+
+  name = StringCol(length=1000, default=None)
+  brokers = SQLMultipleJoin("BrokerRegistration", joinColumn="profile_id")
+  properties = SQLMultipleJoin("ConfigProperty", joinColumn="profile_id")
+
+class MintBroker(object):
+  def __init__(self, url, qmfBroker):
+    self.url = url
+    self.qmfBroker = qmfBroker
+
+    self.qmfId = None
+    self.databaseId = None
+
+    self.objectDatabaseIds = MintCache() # database ids by qmf object id
+    self.orphans = dict() # updates by qmf object id
+
+    self.connected = False
+
+  def __repr__(self):
+    return "%s(%s)" % (self.__class__.__name__, self.url)
+
+  def getAmqpSession(self):
+    return self.qmfBroker.getAmqpSession()
+
+  def getAmqpSessionId(self):
+    return self.qmfBroker.getSessionId()
+
+  def getFullUrl(self):
+    return self.qmfBroker.getFullUrl()
+
+class MintModel(qmf.console.Console):
+  staticInstance = None
+
+  def __init__(self, app):
+    assert MintModel.staticInstance is None
+    MintModel.staticInstance = self
+
+    self.app = app
+
+    # Lookup tables used for recovering MintBroker objects, which have
+    # mint-specific accounting and wrap a qmf broker object
+
+    self.mintBrokersByQmfBroker = dict()
+    self.mintBrokersById = dict()
+    self.mintBrokersByUrl = dict()
+
+    # Agent heartbeats
+    # (broker bank, agent bank) => latest heartbeat timestamp
+
+    self.heartbeatsByAgentId = dict()
+
+    self.__lock = RLock()
+
+    self.dbConn = None
+    self.qmfSession = None
+
+    self.outstandingMethodCalls = dict()
+
+  def lock(self):
+    self.__lock.acquire()
+
+  def unlock(self):
+    self.__lock.release()
+
+  def check(self):
+    pass
+
+  def init(self):
+    assert self.qmfSession is None
+
+    self.qmfSession = qmf.console.Session \
+        (self, manageConnections=True, rcvObjects=self.app.updateEnabled)
+
+  def start(self):
+    pass
+
+  def stop(self):
+    for mbroker in self.mintBrokersById.values():
+      self.delBroker(mbroker)
+
+  def callMethod(self, brokerId, objId, classKey, methodName, callback, args):
+    self.lock()
+    try:
+      broker = self.mintBrokersById[brokerId]
+    finally:
+      self.unlock()
+
+    seq = self.qmfSession._sendMethodRequest \
+        (broker.qmfBroker, ClassKey(classKey), objId, methodName, args)
+
+    if seq is not None:
+      self.lock()
+      try:
+        self.outstandingMethodCalls[seq] = callback
+      finally:
+        self.unlock()
+    return seq
+
+  def addBroker(self, url):
+    log.info("Adding qmf broker at %s", url)
+
+    self.lock()
+    try:
+      qbroker = self.qmfSession.addBroker(url)
+      mbroker = MintBroker(url, qbroker)
+
+      # Can't add the by-id mapping here, as the id is not set yet;
+      # instead we add it when brokerConnected is called
+
+      self.mintBrokersByQmfBroker[qbroker] = mbroker
+      self.mintBrokersByUrl[url] = mbroker
+
+      return mbroker
+    finally:
+      self.unlock()
+
+  def delBroker(self, mbroker):
+    assert isinstance(mbroker, MintBroker)
+
+    log.info("Removing qmf broker at %s", mbroker.url)
+
+    self.lock()
+    try:
+      self.qmfSession.delBroker(mbroker.qmfBroker)
+
+      del self.mintBrokersByQmfBroker[mbroker.qmfBroker]
+      del self.mintBrokersByUrl[mbroker.url]
+
+      if mbroker.qmfId:
+        del self.mintBrokersById[mbroker.qmfId]
+    finally:
+      self.unlock()
+
+  def brokerConnected(self, qbroker):
+    """ Invoked when a connection is established to a broker """
+    self.lock()
+    try:
+      mbroker = self.mintBrokersByQmfBroker[qbroker]
+
+      assert mbroker.connected is False
+
+      mbroker.connected = True
+    finally:
+      self.unlock()
+
+  def brokerInfo(self, qbroker):
+    self.lock()
+    try:
+      id = str(qbroker.getBrokerId())
+      mbroker = self.mintBrokersByQmfBroker[qbroker]
+
+      mbroker.qmfId = id
+      self.mintBrokersById[id] = mbroker
+    finally:
+      self.unlock()
+
+  def brokerDisconnected(self, qbroker):
+    """ Invoked when the connection to a broker is lost """
+    self.lock()
+    try:
+      mbroker = self.mintBrokersByQmfBroker[qbroker]
+
+      assert mbroker.connected is True
+
+      mbroker.connected = False
+    finally:
+      self.unlock()
+
+  def getMintBrokerByQmfBroker(self, qbroker):
+    self.lock()
+    try:
+      return self.mintBrokersByQmfBroker[qbroker]
+    finally:
+      self.unlock()
+
+  def newPackage(self, name):
+    """ Invoked when a QMF package is discovered. """
+    pass
+
+  def newClass(self, kind, classKey):
+    """ Invoked when a new class is discovered.  Session.getSchema can be
+    used to obtain details about the class."""
+    pass
+
+  def newAgent(self, agent):
+    """ Invoked when a QMF agent is discovered. """
+    pass
+
+  def delAgent(self, agent):
+    """ Invoked when a QMF agent disconnects. """
+    pass
+
+  def objectProps(self, broker, record):
+    """ Invoked when an object is updated. """
+
+    if not self.app.updateThread.isAlive():
+      return
+
+    mbroker = self.getMintBrokerByQmfBroker(broker)
+
+    up = update.PropertyUpdate(self, mbroker, record)
+
+    if record.getClassKey().getClassName() == "job":
+      up.priority = 1
+
+    self.app.updateThread.enqueue(up)
+
+  def objectStats(self, broker, record):
+    """ Invoked when an object is updated. """
+
+    if not self.app.updateThread.isAlive():
+      return
+
+    if record.getClassKey().getClassName() == "job":
+      return
+
+    mbroker = self.getMintBrokerByQmfBroker(broker)
+    up = update.StatisticUpdate(self, mbroker, record)
+
+    self.app.updateThread.enqueue(up)
+
+  def event(self, broker, event):
+    """ Invoked when an event is raised. """
+    pass
+
+  def heartbeat(self, agent, timestamp):
+    key = (agent.getBrokerBank(), agent.getAgentBank())
+    timestamp = timestamp / 1000000000
+    self.heartbeatsByAgentId[key] = datetime.fromtimestamp(timestamp)
+
+  def getLatestHeartbeat(self, scopeId):
+    scope = int(scopeId)
+    broker = (scope & 0x0000FFFFF0000000) >> 28
+    agent = scope & 0x000000000FFFFFFF
+    key = (broker, agent)
+
+    return self.heartbeatsByAgentId.get(key)
+
+  def methodResponse(self, broker, seq, response):
+    """ Queue a MethodUpdate for a qmf method response. """
+    # XXX don't do this via the update thread?
+    if not self.app.updateThread.isAlive():
+      return
+
+    mbroker = self.getMintBrokerByQmfBroker(broker)
+    up = update.MethodUpdate(self, mbroker, seq, response)
+
+    self.app.updateThread.enqueue(up)

Added: mgmt/trunk/mint/python/mint/poll.py
===================================================================
--- mgmt/trunk/mint/python/mint/poll.py	                        (rev 0)
+++ mgmt/trunk/mint/python/mint/poll.py	2009-04-02 19:49:59 UTC (rev 3253)
@@ -0,0 +1,58 @@
+import logging
+import socket
+import sys
+from time import sleep
+from psycopg2 import OperationalError
+from mint.model import BrokerRegistration
+
+from util import MintDaemonThread
+
+log = logging.getLogger("mint.poll")
+
+class PollThread(MintDaemonThread):
+  """
+  Periodically reconcile the model's qmf brokers with the
+  BrokerRegistration records in the database
+  """
+
+  interval = 5 # seconds between polls
+
+  def run(self):
+    log.info("Polling for changes every %i seconds", self.interval)
+
+    while True:
+      try:
+        if self.stopRequested:
+          break
+
+        self.do_run()
+      except OperationalError, e:
+        log.exception(e)
+        if str(e).find("server closed the connection unexpectedly") >= 0:
+          sys.exit() # XXX This seems like the wrong thing to do
+      except Exception, e:
+        log.exception(e)
+
+      sleep(self.interval)
+
+  def do_run(self):
+    """
+    Add a broker for each registered url the model doesn't know, and
+    remove brokers whose url is no longer registered
+    """
+
+    regUrls = set()
+
+    for reg in BrokerRegistration.select():
+      if reg.url not in self.app.model.mintBrokersByUrl:
+        try:
+          self.app.model.addBroker(reg.url)
+        except socket.error, e:
+          log.info("Can't connect to broker at %s: %s", reg.url, e)
+
+      regUrls.add(reg.url)
+
+    for mbroker in self.app.model.mintBrokersByQmfBroker.values():
+      if mbroker.url not in regUrls:
+        self.app.model.delBroker(mbroker)
+

Modified: mgmt/trunk/mint/python/mint/tools.py
===================================================================
--- mgmt/trunk/mint/python/mint/tools.py	2009-04-02 19:42:25 UTC (rev 3252)
+++ mgmt/trunk/mint/python/mint/tools.py	2009-04-02 19:49:59 UTC (rev 3253)
@@ -89,6 +89,9 @@
 
     def do_run(self, opts, args):
         app = Mint(self.config)
+
+        app.pollEnabled = True
+
         app.check()
         app.init()
         app.start()
@@ -109,24 +112,17 @@
     def do_run(self, opts, args):
         app = Mint(self.config)
 
-        app.model.pollRegistrations = False
-
         app.check()
         app.init()
         app.start()
 
-        added = list()
-
         try:
             for arg in args[1:]:
-                added.append(app.model.addBroker(arg))
+                app.model.addBroker(arg)
 
             while True:
                 sleep(2)
         finally:
-            for broker in added:
-                app.model.delBroker(broker)
-
             app.stop()
 
 class MintBenchTool(BaseMintTool):
@@ -142,14 +138,10 @@
     def do_run(self, opts, args):
         app = Mint(self.config)
 
-        app.model.pollRegistrations = False
-
         app.check()
         app.init()
         app.start()
 
-        added = list()
-
         head = "%6s %6s %6s %6s %6s %6s %6s %6s %6s" % \
             ("enqs", "deqs", "depth", "drop", "defer",
              "prop", "stat", "meth", "exp")
@@ -158,7 +150,7 @@
         try:
             for arg in args[1:]:
                 try:
-                    added.append(app.model.addBroker(arg))
+                    app.model.addBroker(arg)
                 except socket.error, e:
                     print "Warning: Failed connecting to broker at '%s'" % arg
 
@@ -182,7 +174,7 @@
 
                     sleep(1)
 
-                    ut = app.model.updateThread
+                    ut = app.updateThread
 
                     enq = ut.enqueueCount
                     deq = ut.dequeueCount
@@ -228,9 +220,5 @@
         finally:
             #from threading import enumerate
             #for item in enumerate():
-            #    print item
 
-            for broker in added:
-                app.model.delBroker(broker)
-
             app.stop()

Modified: mgmt/trunk/mint/python/mint/update.py
===================================================================
--- mgmt/trunk/mint/python/mint/update.py	2009-04-02 19:42:25 UTC (rev 3252)
+++ mgmt/trunk/mint/python/mint/update.py	2009-04-02 19:49:59 UTC (rev 3253)
@@ -17,10 +17,14 @@
 
 log = logging.getLogger("mint.update")
 
-class ModelUpdateThread(MintDaemonThread):
-  def __init__(self, model):
-    super(ModelUpdateThread, self).__init__(model)
+class UpdateThread(MintDaemonThread):
+  """
+  Only the update thread writes to the database
+  """
 
+  def __init__(self, app):
+    super(UpdateThread, self).__init__(app)
+
     self.updates = UpdateQueue(slotCount=2)
 
     self.enqueueCount = 0
@@ -36,7 +40,7 @@
     self.conn = None
 
   def init(self):
-    self.conn = self.model.dbConn.getConnection()
+    self.conn = self.app.database.getConnection()
 
   def enqueue(self, update):
     try:
@@ -71,7 +75,7 @@
           continue
 
       self.process_update(update)
-        
+
   def process_update(self, update):
     try:
       update.process(self)
@@ -93,13 +97,13 @@
   def commit(self):
     self.conn.commit()
 
-    for broker in self.model.mintBrokersByQmfBroker.values():
+    for broker in self.app.model.mintBrokersByQmfBroker.values():
       broker.objectDatabaseIds.commit()
 
   def rollback(self):
     self.conn.rollback()
 
-    for broker in self.model.mintBrokersByQmfBroker.values():
+    for broker in self.app.model.mintBrokersByQmfBroker.values():
       broker.objectDatabaseIds.rollback()
 
 class ReferenceException(Exception):
@@ -214,10 +218,15 @@
 
     self.processTimestamp("qmfUpdateTime", timestamps[0], attrs)
 
-    if cls is Job and attrs["qmfUpdateTime"] < datetime.now() - timedelta(seconds=thread.model.expireThread.threshold):
-      # drop updates for Jobs that are older then the expiration threshold, 
-      # since they would be deleted anyway in the next run of the db expiration thread
+    delta = timedelta(seconds=thread.app.expireThreshold)
+
+    if cls is Job and attrs["qmfUpdateTime"] < datetime.now() - delta:
+      # drop updates for Jobs that are older than the expiration
+      # threshold, since they would be deleted anyway in the next run
+      # of the db expiration thread
+
       log.debug("Property update is older than expiration threshold; skipping it")
+
       thread.dropCount += 1
       return
 
@@ -384,18 +393,18 @@
 
     thread.methUpdateCount += 1
 
-class DBExpireUpdate(ModelUpdate):
+class ExpireUpdate(ModelUpdate):
   def __init__(self, model):
-    super(DBExpireUpdate, self).__init__(model, None, None)
+    super(ExpireUpdate, self).__init__(model, None, None)
 
   def process(self, thread):
     cursor = thread.cursor()
-    attrs = self.model.expireThread.attrs
+    attrs = thread.app.expireThread.attrs
     total = 0
 
     thread.commit()
 
-    for op in self.model.expireThread.ops:
+    for op in thread.app.expireThread.ops:
       log.debug("Running expire op %s", op)
 
       count = op.execute(cursor, attrs)

Modified: mgmt/trunk/mint/python/mint/util.py
===================================================================
--- mgmt/trunk/mint/python/mint/util.py	2009-04-02 19:42:25 UTC (rev 3252)
+++ mgmt/trunk/mint/python/mint/util.py	2009-04-02 19:49:59 UTC (rev 3253)
@@ -5,15 +5,18 @@
 log = logging.getLogger("mint")
 
 class MintDaemonThread(Thread):
-  def __init__(self, model):
+  def __init__(self, app):
     super(MintDaemonThread, self).__init__()
 
-    self.model = model
+    self.app = app
 
     self.stopRequested = False
 
     self.setDaemon(True)
 
+  def init(self):
+    pass
+
   def stop(self):
     assert self.stopRequested is False
     self.stopRequested = True




More information about the rhmessaging-commits mailing list