rhmessaging commits: r3253 - in mgmt/trunk: mint/instance/etc and 2 other directories.
by rhmessaging-commits@lists.jboss.org
Author: justi9
Date: 2009-04-02 15:49:59 -0400 (Thu, 02 Apr 2009)
New Revision: 3253
Added:
mgmt/trunk/mint/python/mint/database.py
mgmt/trunk/mint/python/mint/expire.py
mgmt/trunk/mint/python/mint/model.py
mgmt/trunk/mint/python/mint/poll.py
Removed:
mgmt/trunk/mint/python/mint/dbexpire.py
Modified:
mgmt/trunk/cumin/python/cumin/broker.py
mgmt/trunk/cumin/python/cumin/limits.py
mgmt/trunk/cumin/python/cumin/managementserver.py
mgmt/trunk/cumin/python/cumin/model.py
mgmt/trunk/cumin/python/cumin/tools.py
mgmt/trunk/mint/instance/etc/mint.conf
mgmt/trunk/mint/python/mint/__init__.py
mgmt/trunk/mint/python/mint/main.py
mgmt/trunk/mint/python/mint/tools.py
mgmt/trunk/mint/python/mint/update.py
mgmt/trunk/mint/python/mint/util.py
Log:
Don't enable debug by default for the devel mint instance
Modified: mgmt/trunk/cumin/python/cumin/broker.py
===================================================================
--- mgmt/trunk/cumin/python/cumin/broker.py 2009-04-02 19:42:25 UTC (rev 3252)
+++ mgmt/trunk/cumin/python/cumin/broker.py 2009-04-02 19:49:59 UTC (rev 3253)
@@ -79,7 +79,7 @@
def render_content(self, session, data):
scopeId = data["qmf_scope_id"]
- dt = self.app.model.data.getLatestHeartbeat(scopeId)
+ dt = self.app.model.mint.model.getLatestHeartbeat(scopeId)
if dt is None:
return fmt_none()
@@ -202,7 +202,7 @@
try:
id = broker.qmfBrokerId
- mbroker = self.app.model.data.mintBrokersById[id]
+ mbroker = self.app.model.mint.model.mintBrokersById[id]
connected = mbroker.connected
except KeyError:
pass
Modified: mgmt/trunk/cumin/python/cumin/limits.py
===================================================================
--- mgmt/trunk/cumin/python/cumin/limits.py 2009-04-02 19:42:25 UTC (rev 3252)
+++ mgmt/trunk/cumin/python/cumin/limits.py 2009-04-02 19:49:59 UTC (rev 3253)
@@ -52,7 +52,7 @@
#TODO: this probably shouldn't be called until after the invoke completes
def completion():
pass
- negotiator.Reconfig(self.app.model.data, completion)
+ negotiator.Reconfig(self.app.model.mint.model, completion)
def get_raw_limits(self, session, negotiator):
action = self.app.model.negotiator.GetLimits
Modified: mgmt/trunk/cumin/python/cumin/managementserver.py
===================================================================
--- mgmt/trunk/cumin/python/cumin/managementserver.py 2009-04-02 19:42:25 UTC (rev 3252)
+++ mgmt/trunk/cumin/python/cumin/managementserver.py 2009-04-02 19:49:59 UTC (rev 3253)
@@ -41,7 +41,7 @@
url = data["url"]
try:
- server = self.app.model.data.mintBrokersByUrl[url]
+ server = self.app.model.mint.model.mintBrokersByUrl[url]
if server.connected:
html = "Connected"
@@ -274,7 +274,7 @@
url = data["url"]
try:
- server = self.app.model.data.mintBrokersByUrl[url]
+ server = self.app.model.mint.model.mintBrokersByUrl[url]
if server.connected:
html = "Connected"
Modified: mgmt/trunk/cumin/python/cumin/model.py
===================================================================
--- mgmt/trunk/cumin/python/cumin/model.py 2009-04-02 19:42:25 UTC (rev 3252)
+++ mgmt/trunk/cumin/python/cumin/model.py 2009-04-02 19:49:59 UTC (rev 3253)
@@ -1,31 +1,37 @@
+import logging
from datetime import datetime, timedelta
+from mint import Mint, MintConfig
+from struct import unpack, calcsize
+from types import *
+from wooly import *
+from wooly.parameters import *
+from wooly.widgets import *
+
from formats import *
from job import *
from parameters import *
from pool import PoolSlotSet, PoolJobStats
from slot import SlotStatSet
-from struct import unpack, calcsize
from system import SystemSlotSet
from time import *
-from types import *
from util import *
-from wooly import *
-from wooly.parameters import *
-from wooly.widgets import *
-from mint.schema import *
-import logging
-
log = logging.getLogger("cumin.model")
class CuminModel(object):
def __init__(self, app, data_uri):
self.app = app
- self.data = MintModel(data_uri)
- self.data.updateObjects = False
- self.data.expireObjects = False
+ opts = {"data": data_uri, "qmf": None}
+ config = MintConfig()
+ config.init(opts)
+
+ self.mint = Mint(config)
+ self.mint.updateEnabled = False
+ self.mint.pollEnabled = True
+ self.mint.expireEnabled = False
+
self.classes = list()
self.invocations = set()
@@ -68,10 +74,10 @@
CuminSlot(self)
def check(self):
- self.data.check()
+ self.mint.check()
def init(self):
- self.data.init()
+ self.mint.init()
self.frame = self.app.main_page.main
@@ -79,10 +85,10 @@
cls.init()
def start(self):
- self.data.start()
+ self.mint.start()
def stop(self):
- self.data.stop()
+ self.mint.stop()
def add_class(self, cls):
self.classes.append(cls)
@@ -306,7 +312,7 @@
def get_session_by_object(self, object):
assert object
- broker = self.model.data.mintBrokersById[object.qmfBrokerId]
+ broker = self.mint.model.mintBrokersById[object.qmfBrokerId]
return broker.getAmqpSession()
@@ -737,7 +743,7 @@
master = Master.select("System = '%s'" % system_name)[0]
except IndexError:
raise Exception("Master daemon not running")
- master.Start(self.model.data, completion, args["subsystem"])
+ master.Start(self.mint.model, completion, args["subsystem"])
class Stop(CuminAction):
def do_invoke(self, object, args, completion):
@@ -746,7 +752,7 @@
master = Master.select("System = '%s'" % system_name)[0]
except IndexError:
raise Exception("Master daemon not running")
- #master.Stop(self.model.data, completion, args["subsystem"])
+ #master.Stop(self.mint.model, completion, args["subsystem"])
class CuminBroker(RemoteClass):
def __init__(self, model):
@@ -816,7 +822,8 @@
connected = False
if broker:
try:
- mbroker = self.model.data.mintBrokersById[broker.qmfBrokerId]
+ mbroker = self.mint.model.mintBrokersById \
+ [broker.qmfBrokerId]
connected = mbroker.connected
except KeyError:
pass
@@ -897,7 +904,7 @@
else:
authMechanism = "PLAIN"
- broker.connect(self.model.data, completion, host, port, durable,
+ broker.connect(self.mint.model, completion, host, port, durable,
authMechanism, username, password, transport)
class AddQueue(CuminAction):
@@ -1131,7 +1138,7 @@
return "Purge"
def do_invoke(self, queue, args, completion):
- queue.purge(self.model.data, completion, args["request"])
+ queue.purge(self.mint.model, completion, args["request"])
class Remove(CuminAction):
def show(self, session, queue):
@@ -1174,7 +1181,9 @@
def do_invoke(self, queue, args, completion):
broker = queue.vhost.broker
- broker.queueMoveMessages(self.model.data, completion, args["srcQueue"], args["destQueue"], args["qty"])
+ broker.queueMoveMessages(self.mint.model, completion,
+ args["srcQueue"], args["destQueue"],
+ args["qty"])
class CuminExchange(RemoteClass):
def __init__(self, model):
@@ -1351,7 +1360,7 @@
return frame.show_remove(session)
def do_invoke(self, bridge, args, completion):
- bridge.close(self.model.data, completion)
+ bridge.close(self.mint.model, completion)
class CuminConnection(RemoteClass):
def __init__(self, model):
@@ -1421,7 +1430,7 @@
def do_invoke(self, conn, args, completion):
session_ids = set()
- for broker in self.model.data.mintBrokersByQmfBroker:
+ for broker in self.mint.model.mintBrokersByQmfBroker:
session_ids.add(broker.getSessionId())
for sess in conn.sessions:
@@ -1430,7 +1439,7 @@
("Cannot close management connection %s" % \
conn.address)
- conn.close(self.model.data, completion)
+ conn.close(self.mint.model, completion)
class CuminSession(RemoteClass):
def __init__(self, model):
@@ -1472,38 +1481,38 @@
return "Close"
def do_invoke(self, sess, args, completion):
- for broker in self.model.data.mintBrokersByQmfBroker:
+ for broker in self.mint.model.mintBrokersByQmfBroker:
if sess.name == broker.getSessionId():
raise Exception \
("Cannot close management session %s" % sess.name)
- sess.close(self.model.data, completion)
+ sess.close(self.mint.model, completion)
class Detach(CuminAction):
def get_title(self, session):
return "Detach"
def do_invoke(self, sess, args, completion):
- for broker in self.model.data.mintBrokersByQmfBroker:
+ for broker in self.mint.model.mintBrokersByQmfBroker:
if sess.name == broker.getSessionId():
raise Exception \
("Cannot detach management session %s" % sess.name)
- sess.detach(self.model.data, completion)
+ sess.detach(self.mint.model, completion)
class ResetLifespan(CuminAction):
def get_title(self, session):
return "Reset Lifespan"
def do_invoke(self, object, args, completion):
- object.resetLifespan(self.model.data, completion)
+ object.resetLifespan(self.mint.model, completion)
class SolicitAck(CuminAction):
def get_title(self, session):
return "Solicit Acknowledgment"
def do_invoke(self, object, args, completion):
- object.solicitAck(self.model.data, completion)
+ object.solicitAck(self.mint.model, completion)
class CuminLink(RemoteClass):
def __init__(self, model):
@@ -1577,7 +1586,7 @@
excludes = args["excludes"]
sync = args["sync"]
- link.bridge(self.model.data, completion,
+ link.bridge(self.mint.model, completion,
durable, src, dest, key,
tag, excludes, False, False, dynamic, sync)
@@ -1590,7 +1599,7 @@
return "Close"
def do_invoke(self, link, args, completion):
- link.close(self.model.data, completion)
+ link.close(self.mint.model, completion)
class CuminBrokerStoreModule(RemoteClass):
def __init__(self, model):
@@ -2110,8 +2119,8 @@
def do_invoke(self, limit, negotiator, completion):
Name = limit.id
Max = limit.max
- negotiator.SetLimit(self.model.data, completion, Name, Max)
- #negotiator.SetLimit(self.model.data, completion, Name, str(Max))
+ negotiator.SetLimit(self.mint.model, completion, Name, Max)
+ #negotiator.SetLimit(self.mint.model, completion, Name, str(Max))
class CuminJobGroup(CuminClass):
def __init__(self, model):
@@ -2418,7 +2427,7 @@
return self.got_data
try:
- job.GetAd(self.model.data, completion, None)
+ job.GetAd(self.mint.model, completion, None)
except Exception, e:
return self.job_ads
@@ -2452,7 +2461,7 @@
try:
data = None
- job.Fetch(self.model.data, completion, file, start, end, data)
+ job.Fetch(self.mint.model, completion, file, start, end, data)
# wait for up to 20 seconds for completion to be called
wait(predicate, timeout=20)
if not self.got_data:
@@ -2475,7 +2484,7 @@
return "Hold"
def do_invoke(self, job, reason, completion):
- job.Hold(self.model.data, completion, reason)
+ job.Hold(self.mint.model, completion, reason)
def get_enabled(self, session, job):
is_held = JobStatusInfo.get_status_int("Held") == job.JobStatus
@@ -2492,7 +2501,7 @@
return "Release"
def do_invoke(self, job, reason, completion):
- job.Release(self.model.data, completion, reason)
+ job.Release(self.mint.model, completion, reason)
def get_enabled(self, session, job):
is_held = JobStatusInfo.get_status_int("Held") == job.JobStatus
@@ -2509,7 +2518,7 @@
return "Remove"
def do_invoke(self, job, reason, completion):
- job.Remove(self.model.data, completion, reason)
+ job.Remove(self.mint.model, completion, reason)
def get_enabled(self, session, job):
is_deleted = job.qmfDeleteTime is not None
@@ -2525,7 +2534,7 @@
def do_invoke(self, job, args, completion):
Name = args[0]
Value = args[1]
- job.SetAttribute(self.model.data, completion, Name, str(Value))
+ job.SetAttribute(self.mint.model, completion, Name, str(Value))
class GetStartedAction(CuminAction):
def get_xml_response(self, session, object, *args):
@@ -2808,7 +2817,7 @@
return self.got_data
try:
- negotiator.GetLimits(self.model.data, completion, None)
+ negotiator.GetLimits(self.mint.model, completion, None)
except Exception, e:
return self.lim
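
Taken together, the cumin changes above replace the old MintModel(data_uri) handle with an embedded Mint application whose subsystems are toggled per use. The following is a rough sketch of the new wiring, not code from this commit; make_embedded_mint is an illustrative name, and the flags mirror what CuminModel.__init__ now sets:

    from mint import Mint, MintConfig

    def make_embedded_mint(data_uri):
        # Build the config from an explicit options dict; "qmf": None
        # overrides the amqp://localhost default that MintConfig defines.
        config = MintConfig()
        config.init({"data": data_uri, "qmf": None})

        app = Mint(config)

        # Cumin reads the database and polls broker registrations, but
        # leaves object updates and expiration to the mint daemon.
        app.updateEnabled = False
        app.pollEnabled = True
        app.expireEnabled = False

        app.check()
        app.init()
        app.start()

        return app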
Modified: mgmt/trunk/cumin/python/cumin/tools.py
===================================================================
--- mgmt/trunk/cumin/python/cumin/tools.py 2009-04-02 19:42:25 UTC (rev 3252)
+++ mgmt/trunk/cumin/python/cumin/tools.py 2009-04-02 19:49:59 UTC (rev 3253)
@@ -155,7 +155,15 @@
def init(self):
super(CuminAdminTool, self).init()
- self.database = MintDatabase(self.config.data)
+ config = MintConfig()
+ config.init({"data": self.config.data})
+
+ app = Mint(config)
+ app.updateEnabled = False
+ app.pollEnabled = False
+ app.expireEnabled = False
+
+ self.database = MintDatabase(app)
self.database.check()
self.database.init()
@@ -189,8 +197,6 @@
try:
opts, args = command.parse(remaining)
-
- print opts, args, remaining
except CommandException, e:
print "Error: %s" % e.message
e.command.print_help()
Modified: mgmt/trunk/mint/instance/etc/mint.conf
===================================================================
--- mgmt/trunk/mint/instance/etc/mint.conf 2009-04-02 19:42:25 UTC (rev 3252)
+++ mgmt/trunk/mint/instance/etc/mint.conf 2009-04-02 19:49:59 UTC (rev 3253)
@@ -1,3 +1,3 @@
[main]
data: postgresql://cumin@localhost/cumin
-debug: True
+#debug: True
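
Because MintConfig.init still loads $MINT_HOME/etc/mint.conf and then ~/.mint.conf, a developer who wants verbose logging back can uncomment the line in the instance config or set it in the per-user file, for example:

    # ~/.mint.conf (example)
    [main]
    debug: True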
Modified: mgmt/trunk/mint/python/mint/__init__.py
===================================================================
--- mgmt/trunk/mint/python/mint/__init__.py 2009-04-02 19:42:25 UTC (rev 3252)
+++ mgmt/trunk/mint/python/mint/__init__.py 2009-04-02 19:49:59 UTC (rev 3253)
@@ -1,615 +1,2 @@
-import logging
-import qmf.console
-import pickle
-import struct
-import sys
-import os
-import types
-import socket
-
-from parsley.config import Config, ConfigParameter
-from psycopg2 import OperationalError
-from qmf.console import ClassKey
-from qpid.datatypes import UUID
-from qpid.util import URL
-from sqlobject import *
-from threading import Lock, RLock
-from time import sleep
-from traceback import print_exc
-
-from mint import update
-from mint.cache import MintCache
-from mint.dbexpire import DBExpireThread
-from mint.schema import *
-from util import *
-
-log = logging.getLogger("mint")
-
-thisModule = __import__(__name__)
-
-for item in dir(schema):
- cls = getattr(schema, item)
-
- try:
- if issubclass(cls, SQLObject) and cls is not SQLObject:
- setattr(thisModule, item, cls)
- except TypeError:
- pass
-
-Broker.sqlmeta.addColumn(ForeignKey("BrokerRegistration",
- cascade="null", default=None,
- name="registration"))
-
-class MintDatabase(object):
- def __init__(self, uri):
- self.uri = uri
-
- def getConnection(self):
- return connectionForURI(self.uri).getConnection()
-
- def check(self):
- self.checkConnection()
-
- def init(self):
- sqlhub.processConnection = connectionForURI(self.uri)
-
- def checkConnection(self):
- conn = self.getConnection()
-
- try:
- cursor = conn.cursor()
- cursor.execute("select now()")
- finally:
- conn.close()
-
- def checkSchema(self):
- pass
-
- def dropSchema(self):
- conn = self.getConnection()
- try:
- cursor = conn.cursor()
-
- cursor.execute("drop schema public cascade")
-
- conn.commit()
- finally:
- conn.close()
-
- def __splitSQLStatements(self, text):
- result = list()
- unmatchedQuote = False
- tmpStmt = ""
-
- for stmt in text.split(";"):
- stmt = stmt.rstrip()
- quotePos = stmt.find("'")
- while quotePos > 0:
- quotePos += 1
- if quotePos < len(stmt):
- if stmt[quotePos] != "'":
- unmatchedQuote = not unmatchedQuote
- else:
- # ignore 2 single quotes
- quotePos += 1
- quotePos = stmt.find("'", quotePos)
-
- if len(stmt.lstrip()) > 0:
- tmpStmt += stmt + ";"
- if not unmatchedQuote:
- # single quote has been matched/closed, generate statement
- result.append(tmpStmt.lstrip())
- tmpStmt = ""
-
- if unmatchedQuote:
- result.append(tmpStmt.lstrip())
- return result
-
- def createSchema(self, file_paths):
- conn = self.getConnection()
-
- scripts = list()
-
- for path in file_paths:
- file = open(path, "r")
- try:
- scripts.append((path, file.read()))
- finally:
- file.close()
-
- try:
- cursor = conn.cursor()
-
- try:
- cursor.execute("create schema public");
- except:
- conn.rollback()
- pass
-
- for path, text in scripts:
- stmts = self.__splitSQLStatements(text)
- count = 0
-
- for stmt in stmts:
- stmt = stmt.strip()
-
- if stmt:
- try:
- cursor.execute(stmt)
- except Exception, e:
- print "Failed executing statement:"
- print stmt
-
- raise e
-
- count += 1
-
- print "Executed %i statements from file '%s'" % (count, path)
-
- conn.commit()
-
- info = MintInfo(version="0.1")
- info.sync()
-
- # Standard roles
-
- user = Role(name="user")
- user.sync()
-
- admin = Role(name="admin")
- admin.sync()
- finally:
- conn.close()
-
- def checkSchema(self):
- conn = self.getConnection()
-
- try:
- cursor = conn.cursor()
-
- try:
- cursor.execute("select version from mint_info");
- except Exception, e:
- print "No schema present"
- return
-
- for rec in cursor:
- print "OK (version %s)" % rec[0]
- return;
-
- print "No schema present"
- finally:
- conn.close()
-
-class Subject(SQLObject):
- class sqlmeta:
- lazyUpdate = True
-
- name = StringCol(length=1000, default=None, unique=True, notNone=True)
- password = StringCol(length=1000, default=None)
- lastChallenged = TimestampCol(default=None)
- lastLoggedIn = TimestampCol(default=None)
- lastLoggedOut = TimestampCol(default=None)
- roles = SQLRelatedJoin("Role", intermediateTable="subject_role_mapping",
- createRelatedTable=False)
-
- def getByName(cls, name):
- try:
- return Subject.selectBy(name=name)[0]
- except IndexError:
- pass
-
- getByName = classmethod(getByName)
-
-class Role(SQLObject):
- class sqlmeta:
- lazyUpdate = True
-
- name = StringCol(length=1000, default=None, unique=True, notNone=True)
- subjects = SQLRelatedJoin("Subject",
- intermediateTable="subject_role_mapping",
- createRelatedTable=False)
-
- def getByName(cls, name):
- try:
- return Role.selectBy(name=name)[0]
- except IndexError:
- pass
-
- getByName = classmethod(getByName)
-
-class SubjectRoleMapping(SQLObject):
- class sqlmeta:
- lazyUpdate = True
-
- subject = ForeignKey("Subject", notNull=True, cascade=True)
- role = ForeignKey("Role", notNull=True, cascade=True)
- unique = DatabaseIndex(subject, role, unique=True)
-
-class ObjectNotFound(Exception):
- pass
-
-class MintInfo(SQLObject):
- class sqlmeta:
- lazyUpdate = True
-
- version = StringCol(length=1000, default="0.1", notNone=True)
-
-class CollectorRegistration(SQLObject):
- class sqlmeta:
- lazyUpdate = True
-
- name = StringCol(length=1000, default=None)
- collectorId = StringCol(length=1000, default=None)
-
-class BrokerRegistration(SQLObject):
- class sqlmeta:
- lazyUpdate = True
-
- name = StringCol(length=1000, default=None, unique=True, notNone=True)
- url = StringCol(length=1000, default=None)
- broker = ForeignKey("Broker", cascade="null", default=None)
- cluster = ForeignKey("BrokerCluster", cascade="null", default=None)
- profile = ForeignKey("BrokerProfile", cascade="null", default=None)
-
- url_unique = DatabaseIndex(url, unique=True)
-
- def getBrokerId(self):
- return self.mintBroker.qmfId
-
- def getDefaultVhost(self):
- if self.broker:
- try:
- return Vhost.selectBy(broker=self.broker, name="/")[0]
- except IndexError:
- return None
-
-class BrokerGroup(SQLObject):
- class sqlmeta:
- lazyUpdate = True
-
- name = StringCol(length=1000, default=None, unique=True, notNone=True)
- brokers = SQLRelatedJoin("Broker",
- intermediateTable="broker_group_mapping",
- createRelatedTable=False)
-
-class BrokerGroupMapping(SQLObject):
- class sqlmeta:
- lazyUpdate = True
-
- broker = ForeignKey("Broker", notNull=True, cascade=True)
- brokerGroup = ForeignKey("BrokerGroup", notNull=True, cascade=True)
- unique = DatabaseIndex(broker, brokerGroup, unique=True)
-
-class BrokerCluster(SQLObject):
- class sqlmeta:
- lazyUpdate = True
-
- name = StringCol(length=1000, default=None)
- brokers = SQLMultipleJoin("BrokerRegistration", joinColumn="cluster_id")
-
-class BrokerProfile(SQLObject):
- class sqlmeta:
- lazyUpdate = True
-
- name = StringCol(length=1000, default=None)
- brokers = SQLMultipleJoin("BrokerRegistration", joinColumn="profile_id")
- properties = SQLMultipleJoin("ConfigProperty", joinColumn="profile_id")
-
-class MintBroker(object):
- def __init__(self, url, qmfBroker):
- self.url = url
- self.qmfBroker = qmfBroker
-
- self.qmfId = None
- self.databaseId = None
-
- self.objectDatabaseIds = MintCache() # database ids by qmf object id
- self.orphans = dict() # updates by qmf object id
-
- self.connected = False
-
- def __repr__(self):
- return "%s(%s)" % (self.__class__.__name__, self.url)
-
- def getAmqpSession(self):
- return self.qmfBroker.getAmqpSession()
-
- def getAmqpSessionId(self):
- return self.qmfBroker.getSessionId()
-
- def getFullUrl(self):
- return self.qmfBroker.getFullUrl()
-
-class MintModel(qmf.console.Console):
- staticInstance = None
-
- def __init__(self, dataUri, debug=False):
- self.dataUri = dataUri
- self.debug = debug
-
- self.updateObjects = True
-
- self.pollRegistrations = True
-
- self.expireObjects = True
- self.expireFrequency = 600
- self.expireThreshold = 24 * 3600
-
- assert MintModel.staticInstance is None
- MintModel.staticInstance = self
-
- # Lookup tables used for recovering MintBroker objects, which have
- # mint-specific accounting and wrap a qmf broker object
-
- self.mintBrokersByQmfBroker = dict()
- self.mintBrokersById = dict()
- self.mintBrokersByUrl = dict()
-
- # Agent heartbeats
- # (broker bank, agent bank) => latest heartbeat timestamp
-
- self.heartbeatsByAgentId = dict()
-
- self.__lock = RLock()
-
- self.dbConn = None
- self.qmfSession = None
-
- self.updateThread = update.ModelUpdateThread(self)
- self.expireThread = dbexpire.DBExpireThread(self)
- self.registrationThread = RegistrationThread(self)
-
- self.outstandingMethodCalls = dict()
-
- def lock(self):
- self.__lock.acquire()
-
- def unlock(self):
- self.__lock.release()
-
- def check(self):
- try:
- connectionForURI(self.dataUri)
- except Exception, e:
- if hasattr(e, "message") and e.message.find("does not exist"):
- print "Database not found; run cumin-database-init"
- raise e
-
- def init(self):
- log.info("Object updates are %s",
- self.updateObjects and "enabled" or "disabled")
-
- log.info("QMF server polling is %s",
- self.pollRegistrations and "enabled" or "disabled")
-
- log.info("Object expiration is %s",
- self.expireObjects and "enabled" or "disabled")
-
- sqlhub.processConnection = self.dbConn = connectionForURI(self.dataUri)
-
- assert self.qmfSession is None
-
- self.qmfSession = qmf.console.Session \
- (self, manageConnections=True, rcvObjects=self.updateObjects)
-
- self.updateThread.init()
- self.expireThread.init()
-
- def start(self):
- self.updateThread.start()
-
- if self.expireObjects:
- self.expireThread.start()
-
- if self.pollRegistrations:
- self.registrationThread.start()
-
- def stop(self):
- self.updateThread.stop()
-
- if self.expireObjects:
- self.expireThread.stop()
-
- if self.pollRegistrations:
- self.registrationThread.stop()
-
- def callMethod(self, brokerId, objId, classKey, methodName, callback, args):
- self.lock()
- try:
- broker = self.mintBrokersById[brokerId]
- finally:
- self.unlock()
-
- seq = self.qmfSession._sendMethodRequest \
- (broker.qmfBroker, ClassKey(classKey), objId, methodName, args)
-
- if seq is not None:
- self.lock()
- try:
- self.outstandingMethodCalls[seq] = callback
- finally:
- self.unlock()
- return seq
-
- def addBroker(self, url):
- log.info("Adding qmf broker at %s", url)
-
- self.lock()
- try:
- qbroker = self.qmfSession.addBroker(url)
- mbroker = MintBroker(url, qbroker)
-
- # Can't add the by-id mapping here, as the id is not set yet;
- # instead we add it when brokerConnected is called
-
- self.mintBrokersByQmfBroker[qbroker] = mbroker
- self.mintBrokersByUrl[url] = mbroker
-
- return mbroker
- finally:
- self.unlock()
-
- def delBroker(self, mbroker):
- assert isinstance(mbroker, MintBroker)
-
- log.info("Removing qmf broker at %s", mbroker.url)
-
- self.lock()
- try:
- self.qmfSession.delBroker(mbroker.qmfBroker)
-
- del self.mintBrokersByQmfBroker[mbroker.qmfBroker]
- del self.mintBrokersByUrl[mbroker.url]
-
- if mbroker.qmfId:
- del self.mintBrokersById[mbroker.qmfId]
- finally:
- self.unlock()
-
- def brokerConnected(self, qbroker):
- """ Invoked when a connection is established to a broker """
- self.lock()
- try:
- mbroker = self.mintBrokersByQmfBroker[qbroker]
-
- assert mbroker.connected is False
-
- mbroker.connected = True
- finally:
- self.unlock()
-
- def brokerInfo(self, qbroker):
- self.lock()
- try:
- id = str(qbroker.getBrokerId())
- mbroker = self.mintBrokersByQmfBroker[qbroker]
-
- mbroker.qmfId = id
- self.mintBrokersById[id] = mbroker
- finally:
- self.unlock()
-
- def brokerDisconnected(self, qbroker):
- """ Invoked when the connection to a broker is lost """
- self.lock()
- try:
- mbroker = self.mintBrokersByQmfBroker[qbroker]
-
- assert mbroker.connected is True
-
- mbroker.connected = False
- finally:
- self.unlock()
-
- def getMintBrokerByQmfBroker(self, qbroker):
- self.lock()
- try:
- return self.mintBrokersByQmfBroker[qbroker]
- finally:
- self.unlock()
-
- def newPackage(self, name):
- """ Invoked when a QMF package is discovered. """
- pass
-
- def newClass(self, kind, classKey):
- """ Invoked when a new class is discovered. Session.getSchema can be
- used to obtain details about the class."""
- pass
-
- def newAgent(self, agent):
- """ Invoked when a QMF agent is discovered. """
- pass
-
- def delAgent(self, agent):
- """ Invoked when a QMF agent disconects. """
- pass
-
- def objectProps(self, broker, record):
- """ Invoked when an object is updated. """
-
- if not self.updateObjects:
- return
-
- mbroker = self.getMintBrokerByQmfBroker(broker)
-
- up = update.PropertyUpdate(self, mbroker, record)
-
- if record.getClassKey().getClassName() == "job":
- up.priority = 1
-
- self.updateThread.enqueue(up)
-
- def objectStats(self, broker, record):
- """ Invoked when an object is updated. """
-
- if not self.updateObjects:
- return
-
- if record.getClassKey().getClassName() == "job":
- return
-
- mbroker = self.getMintBrokerByQmfBroker(broker)
- up = update.StatisticUpdate(self, mbroker, record)
-
- self.updateThread.enqueue(up)
-
- def event(self, broker, event):
- """ Invoked when an event is raised. """
- pass
-
- def heartbeat(self, agent, timestamp):
- key = (agent.getBrokerBank(), agent.getAgentBank())
- timestamp = timestamp / 1000000000
- self.heartbeatsByAgentId[key] = datetime.fromtimestamp(timestamp)
-
- def getLatestHeartbeat(self, scopeId):
- scope = int(scopeId)
- broker = (scope & 0x0000FFFFF0000000) >> 28
- agent = scope & 0x000000000FFFFFFF
- key = (broker, agent)
-
- return self.heartbeatsByAgentId.get(key)
-
- def methodResponse(self, broker, seq, response):
- mbroker = self.getMintBrokerByQmfBroker(broker)
- up = update.MethodUpdate(self, mbroker, seq, response)
-
- self.updateThread.enqueue(up)
-
-class RegistrationThread(MintDaemonThread):
- def run(self):
- log.info("Polling for registration changes every 5 seconds")
-
- while True:
- try:
- if self.stopRequested:
- break
-
- self.do_run()
- except OperationalError, e:
- log.exception(e)
- if str(e).find("server closed the connection unexpectedly") >= 0:
- sys.exit()
- except Exception, e:
- log.exception(e)
-
- def do_run(self):
- regUrls = set()
-
- for reg in BrokerRegistration.select():
- if reg.url not in self.model.mintBrokersByUrl:
- try:
- self.model.addBroker(reg.url)
- except socket.error, e:
- log.info("Can't connect to broker at %s: %s", reg.url, e)
- pass
-
- regUrls.add(reg.url)
-
- for mbroker in self.model.mintBrokersByQmfBroker.values():
- if mbroker.url not in regUrls:
- self.model.delBroker(mbroker)
-
- sleep(5)
+from main import *
+from model import *
Added: mgmt/trunk/mint/python/mint/database.py
===================================================================
--- mgmt/trunk/mint/python/mint/database.py (rev 0)
+++ mgmt/trunk/mint/python/mint/database.py 2009-04-02 19:49:59 UTC (rev 3253)
@@ -0,0 +1,145 @@
+from sqlobject import connectionForURI, sqlhub
+
+from model import MintInfo, Role
+
+class MintDatabase(object):
+ def __init__(self, app):
+ self.app = app
+
+ def getConnection(self):
+ return connectionForURI(self.app.config.data).getConnection()
+
+ def check(self):
+ self.checkConnection()
+
+ def init(self):
+ sqlhub.processConnection = connectionForURI(self.app.config.data)
+
+ def checkConnection(self):
+ conn = self.getConnection()
+
+ try:
+ cursor = conn.cursor()
+ cursor.execute("select now()")
+ finally:
+ conn.close()
+
+ def checkSchema(self):
+ pass
+
+ def dropSchema(self):
+ conn = self.getConnection()
+
+ try:
+ cursor = conn.cursor()
+
+ cursor.execute("drop schema public cascade")
+
+ conn.commit()
+ finally:
+ conn.close()
+
+ def __splitSQLStatements(self, text):
+ result = list()
+ unmatchedQuote = False
+ tmpStmt = ""
+
+ for stmt in text.split(";"):
+ stmt = stmt.rstrip()
+ quotePos = stmt.find("'")
+ while quotePos > 0:
+ quotePos += 1
+ if quotePos < len(stmt):
+ if stmt[quotePos] != "'":
+ unmatchedQuote = not unmatchedQuote
+ else:
+ # ignore 2 single quotes
+ quotePos += 1
+ quotePos = stmt.find("'", quotePos)
+
+ if len(stmt.lstrip()) > 0:
+ tmpStmt += stmt + ";"
+ if not unmatchedQuote:
+ # single quote has been matched/closed, generate statement
+ result.append(tmpStmt.lstrip())
+ tmpStmt = ""
+
+ if unmatchedQuote:
+ result.append(tmpStmt.lstrip())
+ return result
+
+ def createSchema(self, file_paths):
+ conn = self.getConnection()
+
+ scripts = list()
+
+ for path in file_paths:
+ file = open(path, "r")
+ try:
+ scripts.append((path, file.read()))
+ finally:
+ file.close()
+
+ try:
+ cursor = conn.cursor()
+
+ try:
+ cursor.execute("create schema public");
+ except:
+ conn.rollback()
+ pass
+
+ for path, text in scripts:
+ stmts = self.__splitSQLStatements(text)
+ count = 0
+
+ for stmt in stmts:
+ stmt = stmt.strip()
+
+ if stmt:
+ try:
+ cursor.execute(stmt)
+ except Exception, e:
+ print "Failed executing statement:"
+ print stmt
+
+ raise e
+
+ count += 1
+
+ print "Executed %i statements from file '%s'" % (count, path)
+
+ conn.commit()
+
+ info = MintInfo(version="0.1")
+ info.sync()
+
+ # Standard roles
+
+ user = Role(name="user")
+ user.sync()
+
+ admin = Role(name="admin")
+ admin.sync()
+ finally:
+ conn.close()
+
+ def checkSchema(self):
+ conn = self.getConnection()
+
+ try:
+ cursor = conn.cursor()
+
+ try:
+ cursor.execute("select version from mint_info");
+ except Exception, e:
+ print "No schema present"
+ return
+
+ for rec in cursor:
+ print "OK (version %s)" % rec[0]
+ return;
+
+ print "No schema present"
+ finally:
+ conn.close()
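
MintDatabase now takes the application object instead of a raw URI and reads the connection string from app.config.data. A hypothetical usage sketch follows (the helper name is illustrative and the URI is just the MintConfig default); it mirrors what CuminAdminTool.init does later in this commit:

    from mint import Mint, MintConfig

    def init_schema(schema_paths):
        config = MintConfig()
        config.init({"data": "postgresql://mint@localhost/mint"})

        app = Mint(config)
        app.updateEnabled = False
        app.pollEnabled = False
        app.expireEnabled = False

        db = app.database          # MintDatabase bound to the app
        db.check()                 # verifies the connection with "select now()"
        db.checkSchema()           # prints "OK (version ...)" or "No schema present"

        # createSchema splits each file on ";" (ignoring semicolons inside
        # single-quoted strings), runs the statements, then seeds MintInfo
        # and the standard "user" and "admin" roles.
        db.createSchema(schema_paths)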
Deleted: mgmt/trunk/mint/python/mint/dbexpire.py
===================================================================
--- mgmt/trunk/mint/python/mint/dbexpire.py 2009-04-02 19:42:25 UTC (rev 3252)
+++ mgmt/trunk/mint/python/mint/dbexpire.py 2009-04-02 19:49:59 UTC (rev 3253)
@@ -1,58 +0,0 @@
-import logging
-import mint
-import time
-from threading import Thread
-from mint.schema import *
-from sql import *
-from util import *
-
-log = logging.getLogger("mint.dbexpire")
-
-class DBExpireThread(MintDaemonThread):
- def __init__(self, model):
- super(DBExpireThread, self).__init__(model)
-
- self.keepCurrStats = False
-
- self.ops = []
- self.attrs = dict()
-
- def init(self):
- frequency = self.model.expireFrequency
- threshold = self.model.expireThreshold
-
- for cls in mint.schema.statsClasses:
- self.ops.append(SqlExpire(eval(cls), self.keepCurrStats))
- self.ops.append(SqlExpire(Job, self.keepCurrStats))
-
- self.attrs["threshold"] = threshold
-
- frequency_out, frequency_unit = self.__convertTimeUnits(frequency)
- threshold_out, threshold_unit = self.__convertTimeUnits(threshold)
- log.debug("Expiring database records older than %d %s, every %d %s" % \
- (threshold_out, threshold_unit, frequency_out, frequency_unit))
-
- def run(self):
- frequency = self.model.expireFrequency
-
- while True:
- if self.stopRequested:
- break
- up = mint.update.DBExpireUpdate(self.model)
- self.model.updateThread.enqueue(up)
- time.sleep(frequency)
-
- def __convertTimeUnits(self, t):
- if t / (24*3600) >= 1:
- t_out = t / (24*3600)
- t_unit = "days"
- elif t / 3600 >= 1:
- t_out = t / 3600
- t_unit = "hours"
- elif t / 60 >= 1:
- t_out = t / 60
- t_unit = "minutes"
- else:
- t_out = t
- t_unit = "seconds"
- return (t_out, t_unit)
Copied: mgmt/trunk/mint/python/mint/expire.py (from rev 3248, mgmt/trunk/mint/python/mint/dbexpire.py)
===================================================================
--- mgmt/trunk/mint/python/mint/expire.py (rev 0)
+++ mgmt/trunk/mint/python/mint/expire.py 2009-04-02 19:49:59 UTC (rev 3253)
@@ -0,0 +1,58 @@
+import logging
+import mint
+import time
+from threading import Thread
+from mint.schema import *
+from sql import *
+from util import *
+
+log = logging.getLogger("mint.expire")
+
+class ExpireThread(MintDaemonThread):
+ def __init__(self, app):
+ super(ExpireThread, self).__init__(app)
+
+ self.keepCurrStats = False
+
+ self.ops = []
+ self.attrs = dict()
+
+ def init(self):
+ frequency = self.app.expireFrequency
+ threshold = self.app.expireThreshold
+
+ for cls in mint.schema.statsClasses:
+ self.ops.append(SqlExpire(eval(cls), self.keepCurrStats))
+ self.ops.append(SqlExpire(Job, self.keepCurrStats))
+
+ self.attrs["threshold"] = threshold
+
+ frequency_out, frequency_unit = self.__convertTimeUnits(frequency)
+ threshold_out, threshold_unit = self.__convertTimeUnits(threshold)
+ log.debug("Expiring database records older than %d %s, every %d %s" % \
+ (threshold_out, threshold_unit, frequency_out, frequency_unit))
+
+ def run(self):
+ frequency = self.app.expireFrequency
+
+ while True:
+ if self.stopRequested:
+ break
+ up = mint.update.ExpireUpdate(self.app.model)
+ self.app.updateThread.enqueue(up)
+ time.sleep(frequency)
+
+ def __convertTimeUnits(self, t):
+ if t / (24*3600) >= 1:
+ t_out = t / (24*3600)
+ t_unit = "days"
+ elif t / 3600 >= 1:
+ t_out = t / 3600
+ t_unit = "hours"
+ elif t / 60 >= 1:
+ t_out = t / 60
+ t_unit = "minutes"
+ else:
+ t_out = t
+ t_unit = "seconds"
+ return (t_out, t_unit)
Property changes on: mgmt/trunk/mint/python/mint/expire.py
___________________________________________________________________
Name: svn:mergeinfo
+
Name: svn:eol-style
+ native
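
For reference, the (unchanged) __convertTimeUnits helper only picks the largest whole unit for the startup log line. The same arithmetic as a standalone sketch, evaluated at the MintConfig defaults of a 600-second frequency and a 24-hour threshold:

    def convert_time_units(t):
        # Integer division picks the largest unit that fits at least once,
        # matching ExpireThread.__convertTimeUnits above.
        if t / (24 * 3600) >= 1:
            return t / (24 * 3600), "days"
        elif t / 3600 >= 1:
            return t / 3600, "hours"
        elif t / 60 >= 1:
            return t / 60, "minutes"
        return t, "seconds"

    print convert_time_units(600)        # (10, 'minutes') -> "every 10 minutes"
    print convert_time_units(24 * 3600)  # (1, 'days')     -> "older than 1 days"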
Modified: mgmt/trunk/mint/python/mint/main.py
===================================================================
--- mgmt/trunk/mint/python/mint/main.py 2009-04-02 19:42:25 UTC (rev 3252)
+++ mgmt/trunk/mint/python/mint/main.py 2009-04-02 19:49:59 UTC (rev 3253)
@@ -1,30 +1,77 @@
import sys
import os
+import logging
from parsley.config import Config, ConfigParameter
from parsley.loggingex import enable_logging
-from mint import MintModel
+from database import MintDatabase
+from model import MintModel
+from update import UpdateThread
+from poll import PollThread
+from expire import ExpireThread
+log = logging.getLogger("mint.main")
+
class Mint(object):
def __init__(self, config):
self.config = config
+ self.database = MintDatabase(self)
+ self.model = MintModel(self)
- self.model = MintModel(self.config.data)
- self.model.expireFrequency = self.config.expire_frequency
- self.model.expireThreshold = self.config.expire_threshold
+ self.updateEnabled = True
+ self.updateThread = UpdateThread(self)
+ self.pollEnabled = False
+ self.pollThread = PollThread(self)
+
+ self.expireEnabled = True
+ self.expireFrequency = self.config.expire_frequency
+ self.expireThreshold = self.config.expire_threshold
+ self.expireThread = ExpireThread(self)
+
def check(self):
+ self.database.check()
self.model.check()
def init(self):
+ self.database.init()
self.model.init()
+ def state(cond):
+ return cond and "enabled" or "disabled"
+
+ log.info("Updates are %s", state(self.updateEnabled))
+ log.info("Polling is %s", state(self.pollEnabled))
+ log.info("Expiration is %s", state(self.expireEnabled))
+
+ self.updateThread.init()
+ self.pollThread.init()
+ self.expireThread.init()
+
def start(self):
self.model.start()
+ if self.updateEnabled:
+ self.updateThread.start()
+
+ if self.pollEnabled:
+ self.pollThread.start()
+
+ if self.expireEnabled:
+ self.expireThread.start()
+
def stop(self):
self.model.stop()
+ if self.updateEnabled:
+ self.updateThread.stop()
+
+ if self.pollEnabled:
+ self.pollThread.stop()
+
+ if self.expireEnabled:
+ self.expireThread.stop()
+
class MintConfig(Config):
def __init__(self):
super(MintConfig, self).__init__()
@@ -35,6 +82,9 @@
param = ConfigParameter(self, "data", str)
param.default = "postgresql://mint@localhost/mint"
+ param = ConfigParameter(self, "qmf", str)
+ param.default = "amqp://localhost"
+
param = ConfigParameter(self, "log-file", str)
param.default = os.path.join(self.home, "log", "mint.log")
@@ -50,13 +100,15 @@
param = ConfigParameter(self, "expire-threshold", int)
param.default = 24 * 3600 # 1 day
- def init(self, opts):
+ def init(self, opts=None):
super(MintConfig, self).init()
self.load_file(os.path.join(self.home, "etc", "mint.conf"))
self.load_file(os.path.join(os.path.expanduser("~"), ".mint.conf"))
- self.load_dict(opts)
+ if opts:
+ self.load_dict(opts)
+
if self.debug:
enable_logging("mint", "debug", sys.stderr)
else:
Added: mgmt/trunk/mint/python/mint/model.py
===================================================================
--- mgmt/trunk/mint/python/mint/model.py (rev 0)
+++ mgmt/trunk/mint/python/mint/model.py 2009-04-02 19:49:59 UTC (rev 3253)
@@ -0,0 +1,397 @@
+import logging
+import qmf.console
+import pickle
+import struct
+import sys
+import os
+import types
+import socket
+
+from qmf.console import ClassKey
+from qpid.datatypes import UUID
+from qpid.util import URL
+from sqlobject import *
+from threading import Lock, RLock
+from time import sleep
+from traceback import print_exc
+
+from mint import update
+from mint.cache import MintCache
+from mint.schema import *
+from util import *
+
+log = logging.getLogger("mint.model")
+
+thisModule = __import__(__name__)
+
+for item in dir(mint.schema):
+ cls = getattr(mint.schema, item)
+
+ try:
+ if issubclass(cls, SQLObject) and cls is not SQLObject:
+ setattr(thisModule, item, cls)
+ except TypeError:
+ pass
+
+Broker.sqlmeta.addColumn(ForeignKey("BrokerRegistration",
+ cascade="null", default=None,
+ name="registration"))
+
+class Subject(SQLObject):
+ class sqlmeta:
+ lazyUpdate = True
+
+ name = StringCol(length=1000, default=None, unique=True, notNone=True)
+ password = StringCol(length=1000, default=None)
+ lastChallenged = TimestampCol(default=None)
+ lastLoggedIn = TimestampCol(default=None)
+ lastLoggedOut = TimestampCol(default=None)
+ roles = SQLRelatedJoin("Role", intermediateTable="subject_role_mapping",
+ createRelatedTable=False)
+
+ def getByName(cls, name):
+ try:
+ return Subject.selectBy(name=name)[0]
+ except IndexError:
+ pass
+
+ getByName = classmethod(getByName)
+
+class Role(SQLObject):
+ class sqlmeta:
+ lazyUpdate = True
+
+ name = StringCol(length=1000, default=None, unique=True, notNone=True)
+ subjects = SQLRelatedJoin("Subject",
+ intermediateTable="subject_role_mapping",
+ createRelatedTable=False)
+
+ def getByName(cls, name):
+ try:
+ return Role.selectBy(name=name)[0]
+ except IndexError:
+ pass
+
+ getByName = classmethod(getByName)
+
+class SubjectRoleMapping(SQLObject):
+ class sqlmeta:
+ lazyUpdate = True
+
+ subject = ForeignKey("Subject", notNull=True, cascade=True)
+ role = ForeignKey("Role", notNull=True, cascade=True)
+ unique = DatabaseIndex(subject, role, unique=True)
+
+class ObjectNotFound(Exception):
+ pass
+
+class MintInfo(SQLObject):
+ class sqlmeta:
+ lazyUpdate = True
+
+ version = StringCol(length=1000, default="0.1", notNone=True)
+
+class CollectorRegistration(SQLObject):
+ class sqlmeta:
+ lazyUpdate = True
+
+ name = StringCol(length=1000, default=None)
+ collectorId = StringCol(length=1000, default=None)
+
+class BrokerRegistration(SQLObject):
+ class sqlmeta:
+ lazyUpdate = True
+
+ name = StringCol(length=1000, default=None, unique=True, notNone=True)
+ url = StringCol(length=1000, default=None)
+ broker = ForeignKey("Broker", cascade="null", default=None)
+ cluster = ForeignKey("BrokerCluster", cascade="null", default=None)
+ profile = ForeignKey("BrokerProfile", cascade="null", default=None)
+
+ url_unique = DatabaseIndex(url, unique=True)
+
+ def getBrokerId(self):
+ return self.mintBroker.qmfId
+
+ def getDefaultVhost(self):
+ if self.broker:
+ try:
+ return Vhost.selectBy(broker=self.broker, name="/")[0]
+ except IndexError:
+ return None
+
+class BrokerGroup(SQLObject):
+ class sqlmeta:
+ lazyUpdate = True
+
+ name = StringCol(length=1000, default=None, unique=True, notNone=True)
+ brokers = SQLRelatedJoin("Broker",
+ intermediateTable="broker_group_mapping",
+ createRelatedTable=False)
+
+class BrokerGroupMapping(SQLObject):
+ class sqlmeta:
+ lazyUpdate = True
+
+ broker = ForeignKey("Broker", notNull=True, cascade=True)
+ brokerGroup = ForeignKey("BrokerGroup", notNull=True, cascade=True)
+ unique = DatabaseIndex(broker, brokerGroup, unique=True)
+
+class BrokerCluster(SQLObject):
+ class sqlmeta:
+ lazyUpdate = True
+
+ name = StringCol(length=1000, default=None)
+ brokers = SQLMultipleJoin("BrokerRegistration", joinColumn="cluster_id")
+
+class BrokerProfile(SQLObject):
+ class sqlmeta:
+ lazyUpdate = True
+
+ name = StringCol(length=1000, default=None)
+ brokers = SQLMultipleJoin("BrokerRegistration", joinColumn="profile_id")
+ properties = SQLMultipleJoin("ConfigProperty", joinColumn="profile_id")
+
+class MintBroker(object):
+ def __init__(self, url, qmfBroker):
+ self.url = url
+ self.qmfBroker = qmfBroker
+
+ self.qmfId = None
+ self.databaseId = None
+
+ self.objectDatabaseIds = MintCache() # database ids by qmf object id
+ self.orphans = dict() # updates by qmf object id
+
+ self.connected = False
+
+ def __repr__(self):
+ return "%s(%s)" % (self.__class__.__name__, self.url)
+
+ def getAmqpSession(self):
+ return self.qmfBroker.getAmqpSession()
+
+ def getAmqpSessionId(self):
+ return self.qmfBroker.getSessionId()
+
+ def getFullUrl(self):
+ return self.qmfBroker.getFullUrl()
+
+class MintModel(qmf.console.Console):
+ staticInstance = None
+
+ def __init__(self, app):
+ assert MintModel.staticInstance is None
+ MintModel.staticInstance = self
+
+ self.app = app
+
+ # Lookup tables used for recovering MintBroker objects, which have
+ # mint-specific accounting and wrap a qmf broker object
+
+ self.mintBrokersByQmfBroker = dict()
+ self.mintBrokersById = dict()
+ self.mintBrokersByUrl = dict()
+
+ # Agent heartbeats
+ # (broker bank, agent bank) => latest heartbeat timestamp
+
+ self.heartbeatsByAgentId = dict()
+
+ self.__lock = RLock()
+
+ self.dbConn = None
+ self.qmfSession = None
+
+ self.outstandingMethodCalls = dict()
+
+ def lock(self):
+ self.__lock.acquire()
+
+ def unlock(self):
+ self.__lock.release()
+
+ def check(self):
+ pass
+
+ def init(self):
+ assert self.qmfSession is None
+
+ self.qmfSession = qmf.console.Session \
+ (self, manageConnections=True, rcvObjects=self.app.updateEnabled)
+
+ def start(self):
+ pass
+
+ def stop(self):
+ for mbroker in self.mintBrokersById.values():
+ self.delBroker(mbroker)
+
+ def callMethod(self, brokerId, objId, classKey, methodName, callback, args):
+ self.lock()
+ try:
+ broker = self.mintBrokersById[brokerId]
+ finally:
+ self.unlock()
+
+ seq = self.qmfSession._sendMethodRequest \
+ (broker.qmfBroker, ClassKey(classKey), objId, methodName, args)
+
+ if seq is not None:
+ self.lock()
+ try:
+ self.outstandingMethodCalls[seq] = callback
+ finally:
+ self.unlock()
+ return seq
+
+ def addBroker(self, url):
+ log.info("Adding qmf broker at %s", url)
+
+ self.lock()
+ try:
+ qbroker = self.qmfSession.addBroker(url)
+ mbroker = MintBroker(url, qbroker)
+
+ # Can't add the by-id mapping here, as the id is not set yet;
+ # instead we add it when brokerConnected is called
+
+ self.mintBrokersByQmfBroker[qbroker] = mbroker
+ self.mintBrokersByUrl[url] = mbroker
+
+ return mbroker
+ finally:
+ self.unlock()
+
+ def delBroker(self, mbroker):
+ assert isinstance(mbroker, MintBroker)
+
+ log.info("Removing qmf broker at %s", mbroker.url)
+
+ self.lock()
+ try:
+ self.qmfSession.delBroker(mbroker.qmfBroker)
+
+ del self.mintBrokersByQmfBroker[mbroker.qmfBroker]
+ del self.mintBrokersByUrl[mbroker.url]
+
+ if mbroker.qmfId:
+ del self.mintBrokersById[mbroker.qmfId]
+ finally:
+ self.unlock()
+
+ def brokerConnected(self, qbroker):
+ """ Invoked when a connection is established to a broker """
+ self.lock()
+ try:
+ mbroker = self.mintBrokersByQmfBroker[qbroker]
+
+ assert mbroker.connected is False
+
+ mbroker.connected = True
+ finally:
+ self.unlock()
+
+ def brokerInfo(self, qbroker):
+ self.lock()
+ try:
+ id = str(qbroker.getBrokerId())
+ mbroker = self.mintBrokersByQmfBroker[qbroker]
+
+ mbroker.qmfId = id
+ self.mintBrokersById[id] = mbroker
+ finally:
+ self.unlock()
+
+ def brokerDisconnected(self, qbroker):
+ """ Invoked when the connection to a broker is lost """
+ self.lock()
+ try:
+ mbroker = self.mintBrokersByQmfBroker[qbroker]
+
+ assert mbroker.connected is True
+
+ mbroker.connected = False
+ finally:
+ self.unlock()
+
+ def getMintBrokerByQmfBroker(self, qbroker):
+ self.lock()
+ try:
+ return self.mintBrokersByQmfBroker[qbroker]
+ finally:
+ self.unlock()
+
+ def newPackage(self, name):
+ """ Invoked when a QMF package is discovered. """
+ pass
+
+ def newClass(self, kind, classKey):
+ """ Invoked when a new class is discovered. Session.getSchema can be
+ used to obtain details about the class."""
+ pass
+
+ def newAgent(self, agent):
+ """ Invoked when a QMF agent is discovered. """
+ pass
+
+ def delAgent(self, agent):
+ """ Invoked when a QMF agent disconects. """
+ pass
+
+ def objectProps(self, broker, record):
+ """ Invoked when an object is updated. """
+
+ if not self.app.updateThread.isAlive():
+ return
+
+ mbroker = self.getMintBrokerByQmfBroker(broker)
+
+ up = update.PropertyUpdate(self, mbroker, record)
+
+ if record.getClassKey().getClassName() == "job":
+ up.priority = 1
+
+ self.app.updateThread.enqueue(up)
+
+ def objectStats(self, broker, record):
+ """ Invoked when an object is updated. """
+
+ if not self.app.updateThread.isAlive():
+ return
+
+ if record.getClassKey().getClassName() == "job":
+ return
+
+ mbroker = self.getMintBrokerByQmfBroker(broker)
+ up = update.StatisticUpdate(self, mbroker, record)
+
+ self.app.updateThread.enqueue(up)
+
+ def event(self, broker, event):
+ """ Invoked when an event is raised. """
+ pass
+
+ def heartbeat(self, agent, timestamp):
+ key = (agent.getBrokerBank(), agent.getAgentBank())
+ timestamp = timestamp / 1000000000
+ self.heartbeatsByAgentId[key] = datetime.fromtimestamp(timestamp)
+
+ def getLatestHeartbeat(self, scopeId):
+ scope = int(scopeId)
+ broker = (scope & 0x0000FFFFF0000000) >> 28
+ agent = scope & 0x000000000FFFFFFF
+ key = (broker, agent)
+
+ return self.heartbeatsByAgentId.get(key)
+
+ def methodResponse(self, broker, seq, response):
+ # XXX don't do this via the update thread?
+
+ if not self.updateThread.isAlive():
+ return
+
+ mbroker = self.getMintBrokerByQmfBroker(broker)
+ up = update.MethodUpdate(self, mbroker, seq, response)
+
+ self.app.updateThread.enqueue(up)
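
getLatestHeartbeat treats the qmf scope id as a packed pair: broker bank in bits 28-47, agent bank in the low 28 bits. A small worked example of the same unpacking (the sample value is constructed, not a real scope id):

    def split_scope_id(scope_id):
        # Same masks and shift as MintModel.getLatestHeartbeat above.
        scope = int(scope_id)
        broker = (scope & 0x0000FFFFF0000000) >> 28
        agent = scope & 0x000000000FFFFFFF
        return broker, agent

    print split_scope_id((2 << 28) | 5)   # prints (2, 5)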
Added: mgmt/trunk/mint/python/mint/poll.py
===================================================================
--- mgmt/trunk/mint/python/mint/poll.py (rev 0)
+++ mgmt/trunk/mint/python/mint/poll.py 2009-04-02 19:49:59 UTC (rev 3253)
@@ -0,0 +1,46 @@
+import logging
+from time import sleep
+from psycopg2 import OperationalError
+from mint.model import BrokerRegistration
+
+from util import MintDaemonThread
+
+log = logging.getLogger("mint.poll")
+
+class PollThread(MintDaemonThread):
+ interval = 5
+
+ def run(self):
+ log.info("Polling for changes every %i seconds", self.interval)
+
+ while True:
+ try:
+ if self.stopRequested:
+ break
+
+ self.do_run()
+ except OperationalError, e:
+ log.exception(e)
+ if str(e).find("server closed the connection unexpectedly") >= 0:
+ sys.exit() # XXX This seems like the wrong thing to do
+ except Exception, e:
+ log.exception(e)
+
+ sleep(self.interval)
+
+ def do_run(self):
+ regUrls = set()
+
+ for reg in BrokerRegistration.select():
+ if reg.url not in self.app.model.mintBrokersByUrl:
+ try:
+ self.app.model.addBroker(reg.url)
+ except socket.error, e:
+ log.info("Can't connect to broker at %s: %s", reg.url, e)
+
+ regUrls.add(reg.url)
+
+ for mbroker in self.app.model.mintBrokersByQmfBroker.values():
+ if mbroker.url not in regUrls:
+ self.app.model.delBroker(mbroker)
+
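
PollThread drives broker connections purely from BrokerRegistration rows, so registering a broker becomes a database write rather than a direct addBroker call. A hypothetical example (the name and URL are made up), assuming the sqlobject connection has been set up via MintDatabase.init:

    from mint.model import BrokerRegistration

    # Once this row exists, a running mint app with pollEnabled picks it up
    # within one poll interval (5 seconds) and calls model.addBroker on the
    # URL; deleting the row later triggers delBroker on a following pass.
    BrokerRegistration(name="devel", url="amqp://localhost:5672")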
Modified: mgmt/trunk/mint/python/mint/tools.py
===================================================================
--- mgmt/trunk/mint/python/mint/tools.py 2009-04-02 19:42:25 UTC (rev 3252)
+++ mgmt/trunk/mint/python/mint/tools.py 2009-04-02 19:49:59 UTC (rev 3253)
@@ -89,6 +89,9 @@
def do_run(self, opts, args):
app = Mint(self.config)
+
+ app.pollEnabled = True
+
app.check()
app.init()
app.start()
@@ -109,24 +112,17 @@
def do_run(self, opts, args):
app = Mint(self.config)
- app.model.pollRegistrations = False
-
app.check()
app.init()
app.start()
- added = list()
-
try:
for arg in args[1:]:
- added.append(app.model.addBroker(arg))
+ app.model.addBroker(arg)
while True:
sleep(2)
finally:
- for broker in added:
- app.model.delBroker(broker)
-
app.stop()
class MintBenchTool(BaseMintTool):
@@ -142,14 +138,10 @@
def do_run(self, opts, args):
app = Mint(self.config)
- app.model.pollRegistrations = False
-
app.check()
app.init()
app.start()
- added = list()
-
head = "%6s %6s %6s %6s %6s %6s %6s %6s %6s" % \
("enqs", "deqs", "depth", "drop", "defer",
"prop", "stat", "meth", "exp")
@@ -158,7 +150,7 @@
try:
for arg in args[1:]:
try:
- added.append(app.model.addBroker(arg))
+ app.model.addBroker(arg)
except socket.error, e:
print "Warning: Failed connecting to broker at '%s'" % arg
@@ -182,7 +174,7 @@
sleep(1)
- ut = app.model.updateThread
+ ut = app.updateThread
enq = ut.enqueueCount
deq = ut.dequeueCount
@@ -228,9 +220,5 @@
finally:
#from threading import enumerate
#for item in enumerate():
- # print item
- for broker in added:
- app.model.delBroker(broker)
-
app.stop()
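
With the old registration thread replaced by the opt-in PollThread, the server tool flips pollEnabled on explicitly while the debug and bench tools leave it off and add brokers by hand. A rough standalone equivalent of the server path, assuming the package imports resolve as in this revision and a mint.conf supplies the data URI (run_server is only an illustrative name):

    from time import sleep
    from mint import Mint, MintConfig

    def run_server():
        config = MintConfig()
        config.init()              # opts are optional now; config files still load

        app = Mint(config)
        app.pollEnabled = True     # pick up BrokerRegistration rows

        app.check()
        app.init()                 # logs which subsystems are enabled
        app.start()                # starts only the enabled threads

        try:
            while True:
                sleep(2)
        finally:
            app.stop()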
Modified: mgmt/trunk/mint/python/mint/update.py
===================================================================
--- mgmt/trunk/mint/python/mint/update.py 2009-04-02 19:42:25 UTC (rev 3252)
+++ mgmt/trunk/mint/python/mint/update.py 2009-04-02 19:49:59 UTC (rev 3253)
@@ -17,10 +17,14 @@
log = logging.getLogger("mint.update")
-class ModelUpdateThread(MintDaemonThread):
- def __init__(self, model):
- super(ModelUpdateThread, self).__init__(model)
+class UpdateThread(MintDaemonThread):
+ """
+ Only the update thread writes to the database
+ """
+ def __init__(self, app):
+ super(UpdateThread, self).__init__(app)
+
self.updates = UpdateQueue(slotCount=2)
self.enqueueCount = 0
@@ -36,7 +40,7 @@
self.conn = None
def init(self):
- self.conn = self.model.dbConn.getConnection()
+ self.conn = self.app.database.getConnection()
def enqueue(self, update):
try:
@@ -71,7 +75,7 @@
continue
self.process_update(update)
-
+
def process_update(self, update):
try:
update.process(self)
@@ -93,13 +97,13 @@
def commit(self):
self.conn.commit()
- for broker in self.model.mintBrokersByQmfBroker.values():
+ for broker in self.app.model.mintBrokersByQmfBroker.values():
broker.objectDatabaseIds.commit()
def rollback(self):
self.conn.rollback()
- for broker in self.model.mintBrokersByQmfBroker.values():
+ for broker in self.app.model.mintBrokersByQmfBroker.values():
broker.objectDatabaseIds.rollback()
class ReferenceException(Exception):
@@ -214,10 +218,15 @@
self.processTimestamp("qmfUpdateTime", timestamps[0], attrs)
- if cls is Job and attrs["qmfUpdateTime"] < datetime.now() - timedelta(seconds=thread.model.expireThread.threshold):
- # drop updates for Jobs that are older then the expiration threshold,
- # since they would be deleted anyway in the next run of the db expiration thread
+ delta = timedelta(seconds=thread.app.expireThreshold)
+
+ if cls is Job and attrs["qmfUpdateTime"] < datetime.now() - delta:
+ # drop updates for Jobs that are older then the expiration
+ # threshold, since they would be deleted anyway in the next run
+ # of the db expiration thread
+
log.debug("Property update is older than expiration threshold; skipping it")
+
thread.dropCount += 1
return
@@ -384,18 +393,18 @@
thread.methUpdateCount += 1
-class DBExpireUpdate(ModelUpdate):
+class ExpireUpdate(ModelUpdate):
def __init__(self, model):
- super(DBExpireUpdate, self).__init__(model, None, None)
+ super(ExpireUpdate, self).__init__(model, None, None)
def process(self, thread):
cursor = thread.cursor()
- attrs = self.model.expireThread.attrs
+ attrs = thread.app.expireThread.attrs
total = 0
thread.commit()
- for op in self.model.expireThread.ops:
+ for op in thread.app.expireThread.ops:
log.debug("Running expire op %s", op)
count = op.execute(cursor, attrs)
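
The Job drop rule now reads expireThreshold from the app rather than the expire thread. A standalone illustration of the same comparison, using the default 24-hour threshold (the helper name is illustrative):

    from datetime import datetime, timedelta

    def is_stale_job_update(qmf_update_time, expire_threshold=24 * 3600):
        # Mirrors the check above: job property updates older than the
        # expiration threshold are dropped rather than written, since the
        # next expire pass would delete those rows anyway.
        return qmf_update_time < datetime.now() - timedelta(seconds=expire_threshold)

    print is_stale_job_update(datetime.now() - timedelta(days=2))   # True
    print is_stale_job_update(datetime.now() - timedelta(hours=1))  # False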
Modified: mgmt/trunk/mint/python/mint/util.py
===================================================================
--- mgmt/trunk/mint/python/mint/util.py 2009-04-02 19:42:25 UTC (rev 3252)
+++ mgmt/trunk/mint/python/mint/util.py 2009-04-02 19:49:59 UTC (rev 3253)
@@ -5,15 +5,18 @@
log = logging.getLogger("mint")
class MintDaemonThread(Thread):
- def __init__(self, model):
+ def __init__(self, app):
super(MintDaemonThread, self).__init__()
- self.model = model
+ self.app = app
self.stopRequested = False
self.setDaemon(True)
+ def init(self):
+ pass
+
def stop(self):
assert self.stopRequested is False
self.stopRequested = True
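
MintDaemonThread now carries the application object instead of the model and gains a no-op init() hook, so subclasses such as PollThread and ExpireThread only override what they need. A minimal hypothetical subclass, not part of this commit:

    import logging
    from time import sleep
    from mint.util import MintDaemonThread

    log = logging.getLogger("mint.example")

    class BrokerCountThread(MintDaemonThread):
        # Hypothetical: periodically logs how many brokers are known.
        interval = 30

        def run(self):
            while not self.stopRequested:
                log.info("%i broker(s) known",
                         len(self.app.model.mintBrokersById))
                sleep(self.interval)

Such a thread would be wired up like the others: constructed with the app, init() called during Mint.init, start() gated by an enabled flag, and stop() used to request shutdown.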
rhmessaging commits: r3252 - mgmt/trunk/cumin/python/cumin.
by rhmessaging-commits@lists.jboss.org
Author: justi9
Date: 2009-04-02 15:42:25 -0400 (Thu, 02 Apr 2009)
New Revision: 3252
Modified:
mgmt/trunk/cumin/python/cumin/parameters.py
Log:
Work around a name collision
Modified: mgmt/trunk/cumin/python/cumin/parameters.py
===================================================================
--- mgmt/trunk/cumin/python/cumin/parameters.py 2009-04-02 16:52:13 UTC (rev 3251)
+++ mgmt/trunk/cumin/python/cumin/parameters.py 2009-04-02 19:42:25 UTC (rev 3252)
@@ -1,6 +1,6 @@
-import model
from wooly import *
from mint import *
+import model
class CuminObjectParameter(Parameter):
def __init__(self, app, name, cumin_class):
rhmessaging commits: r3251 - in store/trunk/java/bdbstore: src/test/java/org/apache/qpid/server/store/berkeleydb and 1 other directory.
by rhmessaging-commits@lists.jboss.org
Author: ritchiem
Date: 2009-04-02 12:52:13 -0400 (Thu, 02 Apr 2009)
New Revision: 3251
Removed:
store/trunk/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/InspectableBDBMessageStore.java
Modified:
store/trunk/java/bdbstore/build.xml
store/trunk/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/BDBStoreTest.java
store/trunk/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/BDBUpgradeTest.java
store/trunk/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/MessagePersistenceTest.java
Log:
Update based on QPID-1764 to correct tests and ensure testing can be carried out correctly
Modified: store/trunk/java/bdbstore/build.xml
===================================================================
--- store/trunk/java/bdbstore/build.xml 2009-04-02 15:58:37 UTC (rev 3250)
+++ store/trunk/java/bdbstore/build.xml 2009-04-02 16:52:13 UTC (rev 3251)
@@ -17,6 +17,8 @@
<property name="java.target" value="1.5"/>
<property name="java.source" value="1.5"/>
+ <dirname property="project.root" file="${ant.file.common}"/>
+
<property name="build.classes" location="build/classes"/>
<property name="build.test.classes" location="build/test/classes"/>
<property name="build.tools.classes" location="build/tools/classes"/>
@@ -28,14 +30,13 @@
<property name="release.tar" location="${release.dir}/${project.namever}.tar"/>
<property name="release.tgz" location="${release.dir}/${project.namever}.tgz"/>
<property name="release.bz2" location="${release.dir}/${project.namever}.bz2"/>
+ <property name="qpid.work.dir" location="${project.root}/build/test-work"/>
<property name="java.naming.factory.initial" value="org.apache.qpid.jndi.PropertiesFileInitialContextFactory"/>
<available property="src.test.dir.exists" file="${src.test.dir}"/>
- <dirname property="project.root" file="${ant.file.common}"/>
-
<property file="${project.root}/default.testprofile"/>
<path id="class.path">
@@ -102,7 +103,7 @@
<jar destfile="${bdbtools.jar}" basedir="${build.tools.classes}"/>
</target>
- <target name="test" depends="build-tests" if="src.test.dir.exists"
+ <target name="test" depends="build-tests,prepare-tests" if="src.test.dir.exists"
unless="${dontruntest}" description="execute unit tests">
<delete file="${module.failed}"/>
@@ -110,7 +111,6 @@
<junit fork="${test.fork}" maxmemory="${test.mem}" reloading="no"
haltonfailure="${haltonfailure}" haltonerror="${haltonerror}"
failureproperty="test.failures" printsummary="on" timeout="600000" >
-
<sysproperty key="amqj.logging.level" value="${amqj.logging.level}"/>
<sysproperty key="root.logging.level" value="${root.logging.level}"/>
<sysproperty key="log4j.configuration" value="${log4j.configuration}"/>
@@ -125,8 +125,8 @@
<sysproperty key="max_prefetch" value ="${max_prefetch}"/>
<sysproperty key="example.plugin.target" value="${project.root}/build/lib/plugins"/>
<sysproperty key="QPID_HOME" value="${project.root}"/>
- <sysproperty key="QPID_WORK" value="${project.root}/build/test-work"/>
- <sysproperty key="BDB_WORK" value="${project.root}/build/test-work/bdbstore"/>
+ <sysproperty key="QPID_WORK" value="${qpid.work.dir}"/>
+ <sysproperty key="BDB_WORK" value="${qpid.work.dir}/bdbstore"/>
<sysproperty key="BDB_HOME" value="${project.root}"/>
<sysproperty key="test.excludes" value="false"/>
@@ -160,6 +160,10 @@
<mkdir dir="${release.dir}"/>
</target>
+ <target name="prepare-tests">
+ <mkdir dir="${qpid.work.dir}"/>
+ </target>
+
<target name="zip" depends="build,prepare" description="build release archive">
<zip destfile="${release.zip}">
<zipfileset dir="${bin.dir}" prefix="${project.namever}/bin" filemode="755">
Modified: store/trunk/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/BDBStoreTest.java
===================================================================
--- store/trunk/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/BDBStoreTest.java 2009-04-02 15:58:37 UTC (rev 3250)
+++ store/trunk/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/BDBStoreTest.java 2009-04-02 16:52:13 UTC (rev 3251)
@@ -34,46 +34,39 @@
import org.apache.qpid.framing.abstraction.ContentChunk;
import org.apache.qpid.framing.abstraction.MessagePublishInfo;
import org.apache.qpid.framing.abstraction.MessagePublishInfoImpl;
-import org.apache.qpid.framing.amqp_8_0.BasicConsumeBodyImpl;
-import org.apache.qpid.server.AMQChannel;
import org.apache.qpid.server.RequiredDeliveryException;
+import org.apache.qpid.server.routing.RoutingTable;
+import org.apache.qpid.server.transactionlog.TestableTransactionLog;
+import org.apache.qpid.server.transactionlog.TransactionLog;
import org.apache.qpid.server.configuration.VirtualHostConfiguration;
import org.apache.qpid.server.exchange.DefaultExchangeFactory;
import org.apache.qpid.server.exchange.DirectExchange;
import org.apache.qpid.server.exchange.Exchange;
-import org.apache.qpid.server.flow.LimitlessCreditManager;
-import org.apache.qpid.server.protocol.AMQProtocolSession;
-import org.apache.qpid.server.queue.AMQMessage;
import org.apache.qpid.server.queue.AMQPriorityQueue;
import org.apache.qpid.server.queue.AMQQueue;
import org.apache.qpid.server.queue.AMQQueueFactory;
-import org.apache.qpid.server.queue.IncomingMessage;
import org.apache.qpid.server.queue.MessageMetaData;
import org.apache.qpid.server.queue.MockContentChunk;
-import org.apache.qpid.server.queue.MockPersistentAMQMessage;
-import org.apache.qpid.server.queue.MockProtocolSession;
-import org.apache.qpid.server.queue.MockQueueEntry;
-import org.apache.qpid.server.queue.SimpleAMQQueue;
import org.apache.qpid.server.registry.ApplicationRegistry;
import org.apache.qpid.server.store.StoreContext;
import org.apache.qpid.server.store.berkeleydb.utils.BDBVMTestCase;
-import org.apache.qpid.server.subscription.Subscription;
-import org.apache.qpid.server.subscription.SubscriptionFactoryImpl;
import org.apache.qpid.server.txn.NonTransactionalContext;
import org.apache.qpid.server.txn.TransactionalContext;
import org.apache.qpid.server.util.NullApplicationRegistry;
import org.apache.qpid.server.virtualhost.VirtualHost;
import java.io.File;
-import java.util.ArrayList;
import java.util.LinkedList;
import java.util.List;
+import java.util.ArrayList;
public class BDBStoreTest extends BDBVMTestCase
{
private static final Logger _log = Logger.getLogger(BDBStoreTest.class);
- private BDBMessageStore _transactionLog;
+ private BDBMessageStore _bdbMessageStore;
+ private TransactionLog _transactionLog;
+ private RoutingTable _routingTable;
private String STORE_LOCATION = System.getProperty("BDB_WORK") + "/bdbTestEnv";
private StoreContext _storeContext = new StoreContext();
@@ -104,12 +97,18 @@
deleteDirectory(bdbDir);
BDB_DIR.mkdirs();
- _transactionLog = new InspectableBDBMessageStore();
- _transactionLog.configure(BDB_DIR);
+ _bdbMessageStore= new BDBMessageStore();
- _virtualHost = new VirtualHost(new VirtualHostConfiguration("test", new PropertiesConfiguration()), _transactionLog);
- _transactionLog.setVirtualHost(_virtualHost);
+ _routingTable = _bdbMessageStore;
+ VirtualHostConfiguration vhostConfig = new VirtualHostConfiguration("test", new PropertiesConfiguration());
+
+ _transactionLog = new TestableTransactionLog(_bdbMessageStore.configure(BDB_DIR));
+
+ _virtualHost = new VirtualHost(vhostConfig, _transactionLog);
+
+ _bdbMessageStore.setVirtualHost(_virtualHost);
+
_txnContext = new NonTransactionalContext(_transactionLog, _storeContext, null, new LinkedList<RequiredDeliveryException>());
}
@@ -120,10 +119,12 @@
PropertiesConfiguration env = new PropertiesConfiguration();
env.addProperty("store.environment-path", STORE_LOCATION);
- env.addProperty("store.class", "org.apache.qpid.server.store.berkeleydb.InspectableBDBMessageStore");
+ env.addProperty("store.class", "org.apache.qpid.server.transactionlog.TestableTransactionLog");
+ env.addProperty("store.delegate", "org.apache.qpid.server.store.berkeleydb.BDBMessageStore");
+
_virtualHost = new VirtualHost(new VirtualHostConfiguration("test", env));
- _transactionLog = (BDBMessageStore) _virtualHost.getTransactionLog();
+ _transactionLog = _virtualHost.getTransactionLog();
}
public void tearDown() throws Exception
@@ -170,13 +171,13 @@
AMQQueue queue = AMQQueueFactory.createAMQQueueImpl(QUEUE1, true, ME, false, _virtualHost, queueArguments);
- _transactionLog.createQueue(queue, queueArguments);
+ _routingTable.createQueue(queue, queueArguments);
AMQShortString routingKey = new AMQShortString("Test-Key");
FieldTable bindArguments = new FieldTable();
bindArguments.put(AMQPFilterTypes.JMS_SELECTOR.getValue(), "Test = 'MST'");
- _transactionLog.bindQueue(_virtualHost.getExchangeRegistry().getDefaultExchange(), routingKey, queue, bindArguments);
+ _routingTable.bindQueue(_virtualHost.getExchangeRegistry().getDefaultExchange(), routingKey, queue, bindArguments);
reload();
@@ -252,7 +253,7 @@
_transactionLog.storeMessageMetaData(_storeContext, 14L, new MessageMetaData(pubBody, chb, 1));
_transactionLog.storeContentBodyChunk(_storeContext, 14L, 0, body, true);
- MessageMetaData mmd = _transactionLog.getMessageMetaData(_storeContext, 14L);
+ MessageMetaData mmd = _bdbMessageStore.getMessageMetaData(_storeContext, 14L);
MessagePublishInfo returnedPubBody = mmd.getMessagePublishInfo();
Assert.assertEquals("Message exchange has changed", pubBody.getExchange(), returnedPubBody.getExchange());
Assert.assertEquals("Immediate flag has changed", pubBody.isImmediate(), returnedPubBody.isImmediate());
@@ -267,7 +268,7 @@
Assert.assertEquals("Property ContentType has changed", props.getContentTypeAsString(), returnedProperties.getContentTypeAsString());
Assert.assertEquals("Property MessageID has changed", props.getMessageIdAsString(), returnedProperties.getMessageIdAsString());
Assert.assertEquals("MessageMD ChunkCount has changed", mmd.getContentChunkCount(), 1);
- ContentChunk returnedContentBody = _transactionLog.getContentBodyChunk(_storeContext, 14L, 0);
+ ContentChunk returnedContentBody = _bdbMessageStore.getContentBodyChunk(_storeContext, 14L, 0);
ByteBuffer returnedPayloadAsBytes = returnedContentBody.getData();
byte[] returnedPayload = new byte[returnedPayloadAsBytes.remaining()];
returnedPayloadAsBytes.get(returnedPayload);
@@ -285,13 +286,13 @@
ContentHeaderBody chb = createContentHeaderBody(props, body.getSize());
_transactionLog.storeMessageMetaData(_storeContext, 15L, new MessageMetaData(pubBody, chb, 1));
_transactionLog.storeContentBodyChunk(_storeContext, 15L, 0, body, true);
- _transactionLog.getContentBodyChunk(_storeContext, 15L, 0);
+ _bdbMessageStore.getContentBodyChunk(_storeContext, 15L, 0);
_transactionLog.removeMessage(_storeContext, 15L);
// the next line should throw since the message id should not be found
try
{
- _transactionLog.getMessageMetaData(_storeContext, 15L);
+ _bdbMessageStore.getMessageMetaData(_storeContext, 15L);
Assert.fail("No exception thrown when message id not found getting metadata");
}
catch (AMQException e)
@@ -301,7 +302,7 @@
try
{
- _transactionLog.getContentBodyChunk(_storeContext, 15L, 0);
+ _bdbMessageStore.getContentBodyChunk(_storeContext, 15L, 0);
Assert.fail("No exception thrown when message id not found getting content chunk");
}
catch (AMQException e)
@@ -324,14 +325,16 @@
_transactionLog.storeMessageMetaData(_storeContext, 22L, new MessageMetaData(pubBody, chb, 0));
AMQQueue queue = AMQQueueFactory.createAMQQueueImpl(QUEUE1, true, ME, false, _virtualHost, null);
- _transactionLog.createQueue(queue);
+ _routingTable.createQueue(queue);
_transactionLog.beginTran(_storeContext);
- _transactionLog.enqueueMessage(_storeContext, queue, 20L);
- _transactionLog.enqueueMessage(_storeContext, queue, 21L);
+ ArrayList<AMQQueue> queues = new ArrayList<AMQQueue>();
+ queues.add(queue);
+ _transactionLog.enqueueMessage(_storeContext, queues, 20L);
+ _transactionLog.enqueueMessage(_storeContext, queues, 21L);
_transactionLog.commitTran(_storeContext);
- List<Long> enqueuedIds = _transactionLog.getEnqueuedMessages(QUEUE1);
+ List<Long> enqueuedIds = _bdbMessageStore.getEnqueuedMessages(QUEUE1);
Assert.assertEquals("Enqueued messages have changed", 2, enqueuedIds.size());
Long val = enqueuedIds.get(0);
Assert.assertEquals("First Message is incorrect", 20L, val.longValue());
@@ -342,7 +345,7 @@
public void testTranRollback1() throws Exception
{
- List<Long> enqueuedIds = _transactionLog.getEnqueuedMessages(QUEUE1);
+ List<Long> enqueuedIds = _bdbMessageStore.getEnqueuedMessages(QUEUE1);
assertTrue("Last Test Messages are still present", enqueuedIds.isEmpty());
MessagePublishInfo pubBody = createPublishBody();
@@ -357,21 +360,23 @@
_transactionLog.storeMessageMetaData(_storeContext, 32L, new MessageMetaData(pubBody, chb, 0));
AMQQueue queue = AMQQueueFactory.createAMQQueueImpl(QUEUE1, true, ME, false, _virtualHost, null);
- _transactionLog.createQueue(queue);
+ _routingTable.createQueue(queue);
_transactionLog.beginTran(_storeContext);
- _transactionLog.enqueueMessage(_storeContext, queue, 30L);
- _transactionLog.enqueueMessage(_storeContext, queue, 31L);
+ ArrayList<AMQQueue> queues = new ArrayList<AMQQueue>();
+ queues.add(queue);
+ _transactionLog.enqueueMessage(_storeContext, queues, 30L);
+ _transactionLog.enqueueMessage(_storeContext, queues, 31L);
_transactionLog.commitTran(_storeContext);
_transactionLog.beginTran(_storeContext);
- _transactionLog.enqueueMessage(_storeContext, queue, 32L);
+ _transactionLog.enqueueMessage(_storeContext, queues, 32L);
_transactionLog.abortTran(_storeContext);
_transactionLog.beginTran(_storeContext);
_transactionLog.commitTran(_storeContext);
- enqueuedIds = _transactionLog.getEnqueuedMessages(QUEUE1);
+ enqueuedIds = _bdbMessageStore.getEnqueuedMessages(QUEUE1);
assertTrue("Last Test Message is still present", !enqueuedIds.contains(20L));
assertEquals("Incorrect Enqueued Message Count:", 2, enqueuedIds.size());
Long val = enqueuedIds.get(0);
@@ -383,7 +388,7 @@
public void testTranRollback2() throws Exception
{
- List<Long> enqueuedIds = _transactionLog.getEnqueuedMessages(QUEUE1);
+ List<Long> enqueuedIds = _bdbMessageStore.getEnqueuedMessages(QUEUE1);
assertTrue("Last Test Messages are still present", enqueuedIds.isEmpty());
MessagePublishInfo pubBody = createPublishBody();
@@ -398,18 +403,21 @@
_transactionLog.storeMessageMetaData(_storeContext, 32L, new MessageMetaData(pubBody, chb, 0));
AMQQueue queue = AMQQueueFactory.createAMQQueueImpl(QUEUE1, true, ME, false, _virtualHost, null);
- _transactionLog.createQueue(queue);
+ _routingTable.createQueue(queue);
+ ArrayList<AMQQueue> queues = new ArrayList<AMQQueue>();
+ queues.add(queue);
+
_transactionLog.beginTran(_storeContext);
- _transactionLog.enqueueMessage(_storeContext, queue, 30L);
+ _transactionLog.enqueueMessage(_storeContext, queues, 30L);
_transactionLog.abortTran(_storeContext);
_transactionLog.beginTran(_storeContext);
- _transactionLog.enqueueMessage(_storeContext, queue, 31L);
- _transactionLog.enqueueMessage(_storeContext, queue, 32L);
+ _transactionLog.enqueueMessage(_storeContext, queues, 31L);
+ _transactionLog.enqueueMessage(_storeContext, queues, 32L);
_transactionLog.commitTran(_storeContext);
- enqueuedIds = _transactionLog.getEnqueuedMessages(QUEUE1);
+ enqueuedIds = _bdbMessageStore.getEnqueuedMessages(QUEUE1);
Assert.assertEquals("Incorrect Enqueued Message Count", 2, enqueuedIds.size());
Long val = enqueuedIds.get(0);
Assert.assertEquals("First Message is incorrect", 31L, val.longValue());
@@ -419,7 +427,7 @@
public void testRecovery() throws Exception
{
- List<Long> enqueuedIds = _transactionLog.getEnqueuedMessages(QUEUE1);
+ List<Long> enqueuedIds = _bdbMessageStore.getEnqueuedMessages(QUEUE1);
assertTrue("Last Test Messages are still present", enqueuedIds.isEmpty());
MessagePublishInfo pubBody = createPublishBody();
@@ -438,16 +446,22 @@
AMQQueue queue = AMQQueueFactory.createAMQQueueImpl(QUEUE1, true, ME, false, _virtualHost, null);
AMQQueue queue2 = AMQQueueFactory.createAMQQueueImpl(QUEUE2, true, HIM, false, _virtualHost, null);
- _transactionLog.createQueue(queue);
- _transactionLog.createQueue(queue2);
+ _routingTable.createQueue(queue);
+ _routingTable.createQueue(queue2);
+ ArrayList<AMQQueue> queues = new ArrayList<AMQQueue>();
+ queues.add(queue);
+
_transactionLog.beginTran(_storeContext);
- _transactionLog.enqueueMessage(_storeContext, queue, 40L);
- _transactionLog.enqueueMessage(_storeContext, queue, 41L);
- _transactionLog.enqueueMessage(_storeContext, queue2, 42L);
+ _transactionLog.enqueueMessage(_storeContext, queues, 40L);
+ _transactionLog.enqueueMessage(_storeContext, queues, 41L);
+ ArrayList<AMQQueue> queues2 = new ArrayList<AMQQueue>();
+ queues2.add(queue2);
+
+ _transactionLog.enqueueMessage(_storeContext, queues2, 42L);
_transactionLog.commitTran(_storeContext);
- _transactionLog.enqueueMessage(_storeContext, queue, 42L);
+ _transactionLog.enqueueMessage(_storeContext, queues, 42L);
reload();
@@ -481,20 +495,23 @@
_transactionLog.storeMessageMetaData(_storeContext, 50L, new MessageMetaData(pubBody, chb, 0));
AMQQueue queue = AMQQueueFactory.createAMQQueueImpl(QUEUE1, true, ME, false, _virtualHost, null);
- _transactionLog.createQueue(queue);
+ _routingTable.createQueue(queue);
- _transactionLog.enqueueMessage(_storeContext, queue, 50L);
+ ArrayList<AMQQueue> queues = new ArrayList<AMQQueue>();
+ queues.add(queue);
+
+ _transactionLog.enqueueMessage(_storeContext, queues, 50L);
_transactionLog.dequeueMessage(_storeContext, queue, 50L);
}
public void testQueueRemove() throws AMQException
{
AMQQueue queue = AMQQueueFactory.createAMQQueueImpl(QUEUE1, true, ME, false, _virtualHost, null);
- _transactionLog.createQueue(queue);
- _transactionLog.removeQueue(queue);
+ _routingTable.createQueue(queue);
+ _routingTable.removeQueue(queue);
try
{
- _transactionLog.removeQueue(queue);
+ _routingTable.removeQueue(queue);
Assert.fail("No exception thrown when deleting non-existant queue");
}
catch (AMQException e)
Modified: store/trunk/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/BDBUpgradeTest.java
===================================================================
--- store/trunk/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/BDBUpgradeTest.java 2009-04-02 15:58:37 UTC (rev 3250)
+++ store/trunk/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/BDBUpgradeTest.java 2009-04-02 16:52:13 UTC (rev 3251)
@@ -178,6 +178,7 @@
startBroker(1, VERSION_2);
+ ///* This test is currently broken due to QPID-1275
//Ensure that the selector was preseved on restart and caused all msgs to be removed.
sendAndCheckDurableSubscriber(broker, false, true, 0, null);
stopBroker(1);
Deleted: store/trunk/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/InspectableBDBMessageStore.java
===================================================================
--- store/trunk/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/InspectableBDBMessageStore.java 2009-04-02 15:58:37 UTC (rev 3250)
+++ store/trunk/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/InspectableBDBMessageStore.java 2009-04-02 16:52:13 UTC (rev 3251)
@@ -1,34 +0,0 @@
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-package org.apache.qpid.server.store.berkeleydb;
-
-import org.apache.qpid.server.queue.AMQQueue;
-import org.apache.qpid.server.store.TestTransactionLog;
-
-import java.util.List;
-
-public class InspectableBDBMessageStore extends BDBMessageStore implements TestTransactionLog
-{
- public List<AMQQueue> getMessageReferenceMap(Long messageId)
- {
- return _messageOnQueueMap.get(messageId);
- }
-}
Modified: store/trunk/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/MessagePersistenceTest.java
===================================================================
--- store/trunk/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/MessagePersistenceTest.java 2009-04-02 15:58:37 UTC (rev 3250)
+++ store/trunk/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/MessagePersistenceTest.java 2009-04-02 16:52:13 UTC (rev 3251)
@@ -29,6 +29,8 @@
import org.apache.qpid.framing.abstraction.MessagePublishInfoImpl;
import org.apache.qpid.framing.amqp_8_0.BasicConsumeBodyImpl;
import org.apache.qpid.server.AMQChannel;
+import org.apache.qpid.server.transactionlog.TransactionLog;
+import org.apache.qpid.server.transactionlog.TestableTransactionLog;
import org.apache.qpid.server.configuration.VirtualHostConfiguration;
import org.apache.qpid.server.flow.LimitlessCreditManager;
import org.apache.qpid.server.protocol.AMQProtocolSession;
@@ -39,6 +41,7 @@
import org.apache.qpid.server.queue.SimpleAMQQueue;
import org.apache.qpid.server.registry.ApplicationRegistry;
import org.apache.qpid.server.store.StoreContext;
+import org.apache.qpid.server.store.TestTransactionLog;
import org.apache.qpid.server.store.berkeleydb.utils.BDBVMTestCase;
import org.apache.qpid.server.subscription.Subscription;
import org.apache.qpid.server.subscription.SubscriptionFactoryImpl;
@@ -56,7 +59,8 @@
private static final Logger _log = Logger.getLogger(MessagePersistenceTest.class);
- protected InspectableBDBMessageStore _transactionLog;
+ protected TestTransactionLog _transactionLog;
+ protected BDBMessageStore _messageStore;
private String STORE_LOCATION = System.getProperty("BDB_WORK") + "/bdbTestEnv";
protected VirtualHost _virtualHost;
@@ -74,6 +78,7 @@
protected boolean _transactional;
protected boolean _ack;
+
public void setUp() throws Exception
{
if (BDB_DIR.exists())
@@ -88,11 +93,12 @@
deleteDirectory(bdbDir);
BDB_DIR.mkdirs();
- _transactionLog = new InspectableBDBMessageStore();
- _transactionLog.configure(BDB_DIR);
+ _messageStore = new BDBMessageStore();
+ _transactionLog = new TestableTransactionLog(_messageStore.configure(BDB_DIR));
+
_virtualHost = new VirtualHost(new VirtualHostConfiguration("bdbtest",new PropertiesConfiguration()), _transactionLog);
- _transactionLog.setVirtualHost(_virtualHost);
+ _messageStore.setVirtualHost(_virtualHost);
applicationRegistry.getVirtualHostRegistry().registerVirtualHost(_virtualHost);
@@ -105,6 +111,13 @@
_ack = true;
}
+ @Override
+ public void tearDown() throws Exception
+ {
+ ApplicationRegistry.removeAll();
+ super.tearDown();
+ }
+
protected IncomingMessage createMessage(MessagePublishInfo info) throws AMQException
{
IncomingMessage msg = new IncomingMessage(info, _messageDeliveryContext,
@@ -153,11 +166,11 @@
checkMessageMetaDataExists(messageId);
// Check that it is enqueued
- List<AMQQueue> queueList = _transactionLog.getMessageReferenceMap(messageId);
- assertNotNull(queueList);
- assertEquals("Message should be enqueued on both queues.", 1, queueList.size());
- assertTrue("Queue1 not contained in list.", queueList.contains(_queue1));
+ List<Long> queueList = _messageStore.getEnqueuedMessages(_queue1.getName());
+ assertTrue("Message not enqueued as expected.",queueList.contains(messageId));
+ assertEquals("Queue should only have one message.", 1, queueList.size());
+
// Create consumer to correctly consume message
AMQProtocolSession session1 = new MockProtocolSession(_transactionLog);
AMQChannel channel1 = new AMQChannel(session1, 1, _transactionLog);
@@ -176,7 +189,7 @@
_queue1.registerSubscription(sub1, true);
// Give the delivery thread time to deliver the message
- Thread.sleep(200);
+ Thread.sleep(300);
Thread.yield();
if (_ack)
@@ -189,11 +202,12 @@
channel1.commit();
}
}
- // Check that it is now dequeued
- queueList = _transactionLog.getMessageReferenceMap(messageId);
- assertNull("Queue List was not empty:" + (queueList != null ? queueList.size() : ""), queueList);
checkMessageMetaDataRemoved(messageId);
+
+ // Check that it is now dequeued
+ queueList = _messageStore.getEnqueuedMessages(_queue1.getName());
+ assertTrue("Queue List was not empty:" + queueList.size() , queueList.isEmpty());
}
/**
@@ -292,18 +306,21 @@
channel1.commit();
}
}
+
+
+ checkMessageMetaDataRemoved(messageId);
+
// Check that it is now dequeued
queueList = _transactionLog.getMessageReferenceMap(messageId);
assertNull("Queue List was not empty:" + (queueList != null ? queueList.size() : ""), queueList);
- checkMessageMetaDataRemoved(messageId);
}
protected void checkMessageMetaDataExists(long messageId)
{
try
{
- _transactionLog.getMessageMetaData(_messageDeliveryContext.getStoreContext(), messageId);
+ _messageStore.getMessageMetaData(_messageDeliveryContext.getStoreContext(), messageId);
}
catch (AMQException amqe)
{
@@ -315,7 +332,7 @@
{
try
{
- _transactionLog.getMessageMetaData(_messageDeliveryContext.getStoreContext(), messageId);
+ _messageStore.getMessageMetaData(_messageDeliveryContext.getStoreContext(), messageId);
fail("Message MetaData still exists for message:" + messageId);
}
catch (AMQException amqe)
rhmessaging commits: r3250 - store/trunk/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb.
by rhmessaging-commits@lists.jboss.org
Author: ritchiem
Date: 2009-04-02 11:58:37 -0400 (Thu, 02 Apr 2009)
New Revision: 3250
Modified:
store/trunk/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/BDBMessageStore.java
Log:
Update based on QPID-1764 to bring BDBMS in line with the new TransactionLog interface.
Modified: store/trunk/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/BDBMessageStore.java
===================================================================
--- store/trunk/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/BDBMessageStore.java 2009-04-01 19:40:50 UTC (rev 3249)
+++ store/trunk/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/BDBMessageStore.java 2009-04-02 15:58:37 UTC (rev 3250)
@@ -54,6 +54,7 @@
import org.apache.qpid.server.store.berkeleydb.tuples.BindingTupleBindingFactory;
import org.apache.qpid.server.store.berkeleydb.tuples.QueueTuple;
import org.apache.qpid.server.store.berkeleydb.tuples.QueueTupleBindingFactory;
+import org.apache.qpid.server.transactionlog.BaseTransactionLog;
import org.apache.qpid.server.transactionlog.TransactionLog;
import org.apache.qpid.server.txn.NonTransactionalContext;
import org.apache.qpid.server.txn.TransactionalContext;
@@ -130,10 +131,6 @@
private Map<AMQShortString, Long> _queueNameToIdMap = new ConcurrentHashMap<AMQShortString, Long>();
- protected Map<Long, List<AMQQueue>> _messageOnQueueMap = new ConcurrentHashMap<Long, List<AMQQueue>>();
-
- private final Map<Transaction, Map<Long, List<AMQQueue>>> _dequeueTxMap = new HashMap<Transaction, Map<Long, List<AMQQueue>>>();
-
// Factory Classes to create the TupleBinding objects that relfect the version instance of this BDBStore
private QueueTupleBindingFactory _queueTupleBindingFactory;
private BindingTupleBindingFactory _bindingTupleBindingFactory;
@@ -196,7 +193,7 @@
*
* @throws Exception If any error occurs that means the store is unable to configure itself.
*/
- public void configure(VirtualHost virtualHost, String base, VirtualHostConfiguration vHostConfig) throws Exception
+ public Object configure(VirtualHost virtualHost, String base, VirtualHostConfiguration vHostConfig) throws Exception
{
Configuration config = vHostConfig.getStoreConfiguration();
File environmentPath = new File(config.getString(ENVIRONMENT_PATH_PROPERTY, "bdbEnv"));
@@ -211,15 +208,15 @@
_version = config.getInt(DATABASE_FORMAT_VERSION_PROPERTY, DATABASE_FORMAT_VERSION);
- configure(virtualHost, environmentPath, false);
+ return new BaseTransactionLog(configure(virtualHost, environmentPath, false));
}
- public void configure(File environmentPath) throws AMQException, DatabaseException
+ public BDBMessageStore configure(File environmentPath) throws AMQException, DatabaseException
{
- configure(null, environmentPath, false);
+ return configure(null, environmentPath, false);
}
- public void configure(VirtualHost virtualHost, File environmentPath, boolean readonly) throws AMQException, DatabaseException
+ public BDBMessageStore configure(VirtualHost virtualHost, File environmentPath, boolean readonly) throws AMQException, DatabaseException
{
stateTransition(State.INITIAL, State.CONFIGURING);
@@ -256,7 +253,7 @@
{
stateTransition(State.CONFIGURED, State.STARTED);
}
-
+ return this;
}
/**
@@ -486,7 +483,7 @@
*
* @throws AMQException If the operation fails for any reason.
*/
- void removeMessage(StoreContext context, Long messageId) throws AMQException
+ public void removeMessage(StoreContext context, Long messageId) throws AMQException
{
// _log.debug("public void removeMessage(StoreContext context = " + context + ", Long messageId = " + messageId
// + "): called");
@@ -562,6 +559,8 @@
if (localTx)
{
+ // ? Will this not perform the environment default commit? Should we not be doing this async as
+ // remove will only occur when message has been fully dequeued? 2009-03-31
tx.commit();
context.setPayload(null);
}
@@ -932,13 +931,21 @@
* Places a message onto a specified queue, in a given transactional context.
*
* @param context The transactional context for the operation.
- * @param queue The the queue to place the message on.
+ * @param queues The List of queues to place the message on.
* @param messageId The message to enqueue.
*
* @throws AMQException If the operation fails for any reason.
*/
- public void enqueueMessage(StoreContext context, AMQQueue queue, Long messageId) throws AMQException
+ public void enqueueMessage(StoreContext context, ArrayList<AMQQueue> queues, Long messageId) throws AMQException
{
+ for (AMQQueue q : queues)
+ {
+ enqueueMessage(context, q, messageId);
+ }
+ }
+
+ void enqueueMessage(StoreContext context, AMQQueue queue, Long messageId) throws AMQException
+ {
// _log.debug("public void enqueueMessage(StoreContext context = " + context + ", AMQShortString name = " + name
// + ", Long messageId): called");
@@ -951,8 +958,6 @@
DatabaseEntry value = new DatabaseEntry();
ByteBinding.byteToEntry((byte) 0, value);
- recordEnqueue(messageId, queue);
-
try
{
_deliveryDb.put(tx, key, value);
@@ -977,6 +982,9 @@
{
try
{
+ // ? Why is this providing a Null TransactionConfig should we not be using _transactionConfig
+ // or at least providing some config rather than the environment default.
+ // What is the environment Default? : 2009-03-31
tx = _environment.beginTransaction(null, null);
_log.info("Creating local transaction:" + tx);
context.setPayload(tx);
@@ -1037,27 +1045,6 @@
}
- //Record the delete for processing AFTER the commit has taken place.
- synchronized (_dequeueTxMap)
- {
- Map<Long, List<AMQQueue>> transactionMap = _dequeueTxMap.get(tx);
- if (transactionMap == null)
- {
- transactionMap = new HashMap<Long, List<AMQQueue>>();
- _dequeueTxMap.put(tx, transactionMap);
- }
-
- List<AMQQueue> queueList = transactionMap.get(messageId);
-
- if (queueList == null)
- {
- queueList = new LinkedList<AMQQueue>();
- transactionMap.put(messageId, queueList);
- }
-
- queueList.add(queue);
- }
-
if (isLocal)
{
commit(tx);
@@ -1080,13 +1067,6 @@
{
throw new AMQException("Error rolling back transaction: " + e1, e1);
}
- finally
- {
- synchronized (_dequeueTxMap)
- {
- _dequeueTxMap.remove(tx);
- }
- }
}
throw new AMQException("Error accessing database while dequeuing message: " + e, e);
@@ -1097,7 +1077,6 @@
* Begins a transactional context.
*
* @param context The transactional context to begin.
- *
* @throws AMQException If the operation fails for any reason.
*/
public void beginTran(StoreContext context) throws AMQException
@@ -1154,11 +1133,18 @@
try
{
- commit(tx);
+ if (context.isAsync())
+ {
+ tx.commitNoSync();
+ }
+ else
+ {
+ commit(tx);
+ }
if (_log.isDebugEnabled())
{
- _log.debug("commit tran completed");
+ _log.debug("commit tran Async(" + context.isAsync() + ") completed");
}
}
catch (DatabaseException e)
@@ -1695,7 +1681,6 @@
}
- recordEnqueue(messageId, queue);
queue.enqueue(context, message);
}
@@ -1749,13 +1734,8 @@
tx.commitNoSync();
- Map<Long, List<AMQQueue>> dequeueMap = null;
- synchronized (_dequeueTxMap)
- {
- dequeueMap = _dequeueTxMap.remove(tx);
- }
- Commit commit = new Commit(_commitThread, tx, dequeueMap, this);
+ Commit commit = new Commit(_commitThread, tx, this);
commit.commit();
@@ -1774,19 +1754,14 @@
private final Transaction _tx;
private DatabaseException _databaseException;
private boolean _complete;
- private Map<Long, List<AMQQueue>> _messageDequeueMap;
- private TransactionLog _transactionLog;
- public Commit(CommitThread commitThread, Transaction tx, Map<Long,
- List<AMQQueue>> messageDequeueMap, TransactionLog transactionLog)
+ public Commit(CommitThread commitThread, Transaction tx, TransactionLog transactionLog)
{
// _log.debug("public Commit(CommitThread commitThread = " + commitThread + ", Transaction tx = " + tx
// + "): called");
_commitThread = commitThread;
_tx = tx;
- _messageDequeueMap = messageDequeueMap;
- _transactionLog = transactionLog;
}
public void prepare(boolean synch) throws DatabaseException
@@ -1819,22 +1794,6 @@
_complete = true;
- // If we have dequeuedMessages so update our internal state
- if (_messageDequeueMap != null)
- {
- _log.info("Transaction(" + _tx + ") Complete : Dequeuing messages used.");
- StoreContext dequeueMessageContext = new StoreContext();
-
- for (Map.Entry<Long, List<AMQQueue>> entry : _messageDequeueMap.entrySet())
- {
- Long id = entry.getKey();
- for (AMQQueue queue : entry.getValue())
- {
- ((BDBMessageStore) _transactionLog).recordDequeue(dequeueMessageContext, id, queue);
- }
- }
- }
-
notify();
}
@@ -1855,6 +1814,8 @@
// _log.debug("public void commit(): called");
_commitThread.addJob(this);
+ //? Is it not possible that we could be notified here that the _commitThread committed this job?
+ // Should we also sync around the addJob 2009-04-01 or synchronize this method ?
synchronized (this)
{
while (!_complete)
@@ -1881,72 +1842,6 @@
}
/**
- * Record that the give message is enqueued on the specified queue.
- *
- * @param messageId The message id to enqueue
- * @param queue The queue it is enqueued upon.
- */
- private void recordEnqueue(Long messageId, AMQQueue queue)
- {
- List<AMQQueue> queues = _messageOnQueueMap.get(messageId);
-
- if (queues == null)
- {
- queues = new LinkedList<AMQQueue>();
- }
-
- queues.add(queue);
-
- _messageOnQueueMap.put(messageId, queues);
- }
-
- /**
- * Update our records that the given message is nolonger on the specified queue.
- * If the message no longer has any queue references then we can discard the content.
- *
- * @param context
- * @param messageId
- * @param queue
- */
- private void recordDequeue(StoreContext context, Long messageId, AMQQueue queue)
- {
- _log.info("Dequeue Message(" + messageId + ") from queue(" + queue.getName() + ") context=" + context);
- List<AMQQueue> queues = _messageOnQueueMap.get(messageId);
-
- if (queues == null)
- {
- throw new RuntimeException("Error, Tried to dequeue a message that is not enqueued");
- }
-
- if (queues.remove(queue))
- {
- // if we now have no more references to this message we can dispose of it
- if (queues.size() == 0)
- {
- try
- {
- _messageOnQueueMap.remove(messageId);
- _log.info("Removing Message(" + messageId + ") from Tlog context=" + context);
-
- removeMessage(context, messageId);
- }
- catch (AMQException e)
- {
- //todo As we are jus swallowing exception need to add clean up in load().
- // This should purge any message content that doesn't have any delivery records.
- _log.debug("Error occured removing unreferenced message:" + e.getMessage());
- }
-
- }
- }
- else
- {
- throw new RuntimeException("Error, Tried to dequeue a message from a queue, upon which it is not enqueued");
- }
-
- }
-
- /**
* Implements a thread which batches and commits a queue of {@link Commit} operations. The commit operations
* themselves are responsible for adding themselves to the queue and waiting for the commit to happen before
* continuing, but it is the responsibility of this thread to tell the commit operations when they have been
rhmessaging commits: r3249 - mgmt/trunk/cumin/bin.
by rhmessaging-commits@lists.jboss.org
Author: justi9
Date: 2009-04-01 15:40:50 -0400 (Wed, 01 Apr 2009)
New Revision: 3249
Modified:
mgmt/trunk/cumin/bin/cumin
Log:
A different method of ensuring the child servers die
Modified: mgmt/trunk/cumin/bin/cumin
===================================================================
--- mgmt/trunk/cumin/bin/cumin 2009-04-01 19:21:13 UTC (rev 3248)
+++ mgmt/trunk/cumin/bin/cumin 2009-04-01 19:40:50 UTC (rev 3249)
@@ -1,6 +1,18 @@
#!/bin/bash
+function die {
+ kill "$mpid"
+ kill "$cpid"
+}
+
+trap die EXIT
+
mint-server &
-trap "kill $!" EXIT
+mpid="$!"
-cumin-server
+cumin-server &
+cpid="$!"
+
+while :; do
+ sleep 10
+done
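For reference, the wrapper produced by this change reads roughly as follows; it is reassembled from the diff above rather than copied from the repository, so treat it as a sketch (comments added here for explanation).

    #!/bin/bash

    # Kill both child servers whenever the wrapper exits.
    function die {
        kill "$mpid"
        kill "$cpid"
    }

    trap die EXIT

    mint-server &
    mpid="$!"

    cumin-server &
    cpid="$!"

    # Keep the wrapper alive; the EXIT trap tears the children down.
    while :; do
        sleep 10
    done

Unlike the previous one-liner, trap "kill $!" EXIT, which could only name the most recently backgrounded process, recording each PID separately lets the trap clean up both children.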
rhmessaging commits: r3248 - in mgmt/trunk: cumin and 13 other directories.
by rhmessaging-commits@lists.jboss.org
Author: justi9
Date: 2009-04-01 15:21:13 -0400 (Wed, 01 Apr 2009)
New Revision: 3248
Added:
mgmt/trunk/cumin/bin/cumin-server
mgmt/trunk/cumin/etc/module.profile
mgmt/trunk/cumin/instance/
mgmt/trunk/etc/module.profile
mgmt/trunk/mint/etc/module.profile
mgmt/trunk/mint/instance/
mgmt/trunk/mint/instance/etc/
mgmt/trunk/mint/instance/etc/mint.conf
mgmt/trunk/mint/instance/log/
mgmt/trunk/mint/python/mint/main.py
mgmt/trunk/wooly/etc/
mgmt/trunk/wooly/etc/module.profile
mgmt/trunk/wooly/instance/
Removed:
mgmt/trunk/cumin-test-0/
mgmt/trunk/wooly/wooly-demo-instance/
Modified:
mgmt/trunk/cumin/bin/cumin
mgmt/trunk/cumin/instance/bin
mgmt/trunk/cumin/instance/resources
mgmt/trunk/cumin/instance/resources-wooly
mgmt/trunk/cumin/instance/sql
mgmt/trunk/cumin/python/cumin/__init__.py
mgmt/trunk/cumin/python/cumin/model.py
mgmt/trunk/cumin/python/cumin/tools.py
mgmt/trunk/etc/devel.profile
mgmt/trunk/mint/python/mint/__init__.py
mgmt/trunk/mint/python/mint/dbexpire.py
mgmt/trunk/mint/python/mint/tools.py
mgmt/trunk/mint/python/mint/update.py
Log:
The big goal: Make mint-server and cumin-server independent.
- Make mint-server stand by itself, with its own config instead of
using cumin's. mint-server uses a MINT_HOME and looks for config
in $MINT_HOME/etc/mint.conf.
- Introduce cumin-server, which makes no attempt to start a mint
process.
- Change "cumin" to be a simple shell wrapper that starts both above
servers.
- Introduce Mint (a top level app thing) and MintConfig.
- Refactor expire thread params to support use of MintConfig.
- Reorganize mint-bench a little.
- Institute a new general pattern for devel instances of servers,
including mint-server, wooly-demo, and cumin-server.
Modified: mgmt/trunk/cumin/bin/cumin
===================================================================
--- mgmt/trunk/cumin/bin/cumin 2009-04-01 19:08:41 UTC (rev 3247)
+++ mgmt/trunk/cumin/bin/cumin 2009-04-01 19:21:13 UTC (rev 3248)
@@ -1,35 +1,6 @@
-#!/usr/bin/python
+#!/bin/bash
-import sys, os
+mint-server &
+trap "kill $!" EXIT
-from cumin.tools import CuminServerTool
-
-def main():
- CuminServerTool("cumin").main()
-
-if __name__ == "__main__":
- if "--profile" in sys.argv:
- from profile import Profile
- from pstats import Stats
-
- prof = Profile()
-
- try:
- prof.run("main()")
- except KeyboardInterrupt:
- pass
-
- file = "/tmp/cumin-stats"
- prof.dump_stats(file)
-
- stats = Stats(file)
-
- stats.sort_stats("cumulative").print_stats(15)
- stats.sort_stats("time").print_stats(15)
-
- stats.strip_dirs()
- else:
- try:
- main()
- except KeyboardInterrupt:
- pass
+cumin-server
Added: mgmt/trunk/cumin/bin/cumin-server
===================================================================
--- mgmt/trunk/cumin/bin/cumin-server (rev 0)
+++ mgmt/trunk/cumin/bin/cumin-server 2009-04-01 19:21:13 UTC (rev 3248)
@@ -0,0 +1,9 @@
+#!/usr/bin/python
+
+from cumin.tools import CuminServerTool
+
+if __name__ == "__main__":
+ try:
+ CuminServerTool("cumin-server").main()
+ except KeyboardInterrupt:
+ pass
Property changes on: mgmt/trunk/cumin/bin/cumin-server
___________________________________________________________________
Name: svn:executable
+ *
Added: mgmt/trunk/cumin/etc/module.profile
===================================================================
--- mgmt/trunk/cumin/etc/module.profile (rev 0)
+++ mgmt/trunk/cumin/etc/module.profile 2009-04-01 19:21:13 UTC (rev 3248)
@@ -0,0 +1,3 @@
+export PYTHONPATH="${PWD}/python:${PYTHONPATH}"
+export PATH="${PWD}/bin:${PATH}"
+export CUMIN_HOME="${PWD}/instance"
Copied: mgmt/trunk/cumin/instance (from rev 3246, mgmt/trunk/cumin-test-0)
Property changes on: mgmt/trunk/cumin/instance
___________________________________________________________________
Name: svn:mergeinfo
+
Modified: mgmt/trunk/cumin/instance/bin
===================================================================
--- mgmt/trunk/cumin-test-0/bin 2009-03-31 14:07:45 UTC (rev 3246)
+++ mgmt/trunk/cumin/instance/bin 2009-04-01 19:21:13 UTC (rev 3248)
@@ -1 +1 @@
-link ../cumin/bin
\ No newline at end of file
+link ../bin
\ No newline at end of file
Modified: mgmt/trunk/cumin/instance/resources
===================================================================
--- mgmt/trunk/cumin-test-0/resources 2009-03-31 14:07:45 UTC (rev 3246)
+++ mgmt/trunk/cumin/instance/resources 2009-04-01 19:21:13 UTC (rev 3248)
@@ -1 +1 @@
-link ../cumin/resources
\ No newline at end of file
+link ../resources
\ No newline at end of file
Modified: mgmt/trunk/cumin/instance/resources-wooly
===================================================================
--- mgmt/trunk/cumin-test-0/resources-wooly 2009-03-31 14:07:45 UTC (rev 3246)
+++ mgmt/trunk/cumin/instance/resources-wooly 2009-04-01 19:21:13 UTC (rev 3248)
@@ -1 +1 @@
-link ../wooly/resources
\ No newline at end of file
+link ../../wooly/resources
\ No newline at end of file
Modified: mgmt/trunk/cumin/instance/sql
===================================================================
--- mgmt/trunk/cumin-test-0/sql 2009-03-31 14:07:45 UTC (rev 3246)
+++ mgmt/trunk/cumin/instance/sql 2009-04-01 19:21:13 UTC (rev 3248)
@@ -1 +1 @@
-link ../mint/sql
\ No newline at end of file
+link ../../mint/sql
\ No newline at end of file
Modified: mgmt/trunk/cumin/python/cumin/__init__.py
===================================================================
--- mgmt/trunk/cumin/python/cumin/__init__.py 2009-04-01 19:08:41 UTC (rev 3247)
+++ mgmt/trunk/cumin/python/cumin/__init__.py 2009-04-01 19:21:13 UTC (rev 3248)
@@ -156,29 +156,15 @@
param = ConfigParameter(self, "user", str)
- param = ConfigParameter(self, "expire-frequency", int)
- param.default = 600 # 10 minutes
-
- param = ConfigParameter(self, "expire-threshold", int)
- param.default = 24 * 3600 # 1 day
-
param = ConfigParameter(self, "operator-email", str)
def init(self):
super(CuminConfig, self).init()
- handler = StreamHandler()
- log.addHandler(handler)
self.load_file(os.path.join(self.home, "etc", "cumin.conf"))
self.load_file(os.path.join(os.path.expanduser("~"), ".cumin.conf"))
- log.removeHandler(handler)
- self.init_logging()
-
- def init_logging(self):
- enable_logging("mint", self.log_level, self.log_file)
enable_logging("cumin", self.log_level, self.log_file)
if self.debug:
- enable_logging("mint", "debug", sys.stderr)
enable_logging("cumin", "debug", sys.stderr)
Modified: mgmt/trunk/cumin/python/cumin/model.py
===================================================================
--- mgmt/trunk/cumin/python/cumin/model.py 2009-04-01 19:08:41 UTC (rev 3247)
+++ mgmt/trunk/cumin/python/cumin/model.py 2009-04-01 19:21:13 UTC (rev 3248)
@@ -21,13 +21,10 @@
class CuminModel(object):
def __init__(self, app, data_uri):
self.app = app
- self.data = MintModel(data_uri,
- dbExpireFrequency=app.config.expire_frequency,
- dbExpireThreshold=app.config.expire_threshold,
- debug=False)
+ self.data = MintModel(data_uri)
self.data.updateObjects = False
- self.data.dbExpiration = False
+ self.data.expireObjects = False
self.classes = list()
self.invocations = set()
Modified: mgmt/trunk/cumin/python/cumin/tools.py
===================================================================
--- mgmt/trunk/cumin/python/cumin/tools.py 2009-04-01 19:08:41 UTC (rev 3247)
+++ mgmt/trunk/cumin/python/cumin/tools.py 2009-04-01 19:21:13 UTC (rev 3248)
@@ -4,6 +4,7 @@
from parsley.command import *
from wooly.devel import *
from mint import *
+from mint.tools import MintServerTool
from getpass import getpass
from psycopg2 import IntegrityError
from subprocess import Popen
@@ -162,15 +163,17 @@
if self.config.debug:
self.config.prt()
- args = sys.argv[1:]
-
try:
- opts, remaining = self.parse_options(args)
+ opts, remaining = self.parse_options(sys.argv[1:])
except CommandException, e:
print "Error: %s" % e.message
e.command.print_help()
sys.exit(1)
+ if "help" in opts:
+ self.print_help()
+ return
+
try:
scommand = remaining[0]
except IndexError:
@@ -187,10 +190,17 @@
try:
opts, args = command.parse(remaining)
- if "help" in opts:
- command.print_help()
- return
+ print opts, args, remaining
+ except CommandException, e:
+ print "Error: %s" % e.message
+ e.command.print_help()
+ sys.exit(1)
+ if "help" in opts:
+ command.print_help()
+ return
+
+ try:
command.run(opts, args)
except CommandException, e:
print "Error: %s" % e.message
@@ -224,6 +234,8 @@
name, url = args[1:]
except IndexError:
raise CommandException(self, "NAME and URL are required")
+ except ValueError:
+ raise CommandException(self, "NAME and URL are required")
for reg in BrokerRegistration.selectBy(name=name):
print "Error: a broker called '%s' already exists" % name
@@ -438,8 +450,6 @@
def __init__(self, name):
super(CuminServerTool, self).__init__(name)
- signal.signal(signal.SIGTERM, self.sigTermHandler)
-
self.description = "Cumin web server"
param = ConfigParameter(self.config, "addr", str)
@@ -462,44 +472,6 @@
opt = CommandOption(self, "ssl")
opt.description = "Serve web pages using SSL"
- def sigTermHandler(self, signum, frame):
- sys.exit(1)
-
- class MintProcess(object):
- def __init__(self):
- self.proc = None
-
- def start(self, config, opts):
- args = ["mint-server",
- "--data", config.data,
- "--log-file", config.log_file,
- "--log-level", config.log_level,
- "--expire-frequency", str(config.expire_frequency),
- "--expire-threshold", str(config.expire_threshold)]
-
- if "debug" in opts or config.debug:
- args.append("--debug")
-
- self.proc = Popen(args)
-
- def stop(self):
- if self.proc.poll is not None:
- os.kill(self.proc.pid, signal.SIGTERM)
-
- for i in range(30):
- code = self.proc.poll()
-
- if code is not None:
- log.debug("Mint subprocess %i terminated", self.proc.pid)
- return
-
- sleep(1)
-
- os.kill(self.proc.pid, signal.SIGKILL)
-
- log.warn("Mint subprocess %i wouldn't go gracefully, killed",
- self.proc.pid)
-
def do_run(self, opts, args):
self.config.load_dict(opts)
@@ -536,23 +508,15 @@
log.error("SSL key file '%s' not found" % kpath)
sys.exit(1)
- mint_proc = self.MintProcess()
+ app.start()
try:
try:
- mint_proc.start(self.config, opts)
- except:
- log.exception("Failed starting mint process")
-
- app.start()
-
- try:
- server.start()
+ server.start() # XXX fix the weird api here
finally:
server.stop()
finally:
app.stop()
- mint_proc.stop()
class CuminTestTool(BaseCuminTool):
def __init__(self, name):
Modified: mgmt/trunk/etc/devel.profile
===================================================================
--- mgmt/trunk/etc/devel.profile 2009-04-01 19:08:41 UTC (rev 3247)
+++ mgmt/trunk/etc/devel.profile 2009-04-01 19:21:13 UTC (rev 3248)
@@ -1,34 +1,28 @@
export DEVEL_HOME="$PWD"
export DEVEL_MODULES="mint cumin basil parsley wooly"
-# PYTHONPATH
+if [[ -z "$DEVEL_ORIGINAL_PATH" ]]; then
+ export DEVEL_ORIGINAL_PATH="$PATH"
+fi
-test -z "$DEVEL_ORIGINAL_PYTHONPATH" && {
+export PATH="${DEVEL_HOME}/bin:${DEVEL_ORIGINAL_PATH}"
+
+if [[ -z "$DEVEL_ORIGINAL_PYTHONPATH" ]]; then
export DEVEL_ORIGINAL_PYTHONPATH="$PYTHONPATH"
-}
+fi
-pypath="${DEVEL_HOME}/lib/python:${HOME}/lib/python:${DEVEL_ORIGINAL_PYTHONPATH}"
+export PYTHONPATH="${DEVEL_HOME}/lib/python:${HOME}/lib/python:${DEVEL_ORIGINAL_PYTHONPATH}"
for module in $DEVEL_MODULES; do
- pypath="$DEVEL_HOME"/"$module"/python:"$pypath"
-done
+ echo "Configuring module '${module}'"
-export PYTHONPATH="$pypath"
+ pushd "${DEVEL_HOME}/${module}" > /dev/null
-# PATH
+ if [[ -f "etc/module.profile" ]]; then
+ source "etc/module.profile"
+ else
+ source "../etc/module.profile"
+ fi
-test -z "$DEVEL_ORIGINAL_PATH" && {
- export DEVEL_ORIGINAL_PATH="$PATH"
-}
-
-path="${DEVEL_HOME}/bin:${DEVEL_ORIGINAL_PATH}"
-
-for module in $DEVEL_MODULES; do
- path="$DEVEL_HOME/${module}/bin:${path}"
+ popd > /dev/null
done
-
-export PATH="$path"
-
-# cumin test instance
-export CUMIN_HOME="${DEVEL_HOME}/cumin-test-0"
-export WOOLY_DEMO_HOME="${DEVEL_HOME}/wooly/wooly-demo-instance"
Added: mgmt/trunk/etc/module.profile
===================================================================
--- mgmt/trunk/etc/module.profile (rev 0)
+++ mgmt/trunk/etc/module.profile 2009-04-01 19:21:13 UTC (rev 3248)
@@ -0,0 +1,2 @@
+export PYTHONPATH="${PWD}/python:${PYTHONPATH}"
+export PATH="${PWD}/bin:${PATH}"
Added: mgmt/trunk/mint/etc/module.profile
===================================================================
--- mgmt/trunk/mint/etc/module.profile (rev 0)
+++ mgmt/trunk/mint/etc/module.profile 2009-04-01 19:21:13 UTC (rev 3248)
@@ -0,0 +1,3 @@
+export PYTHONPATH="${PWD}/python:${PYTHONPATH}"
+export PATH="${PWD}/bin:${PATH}"
+export MINT_HOME="${PWD}/instance"
Added: mgmt/trunk/mint/instance/etc/mint.conf
===================================================================
--- mgmt/trunk/mint/instance/etc/mint.conf (rev 0)
+++ mgmt/trunk/mint/instance/etc/mint.conf 2009-04-01 19:21:13 UTC (rev 3248)
@@ -0,0 +1,3 @@
+[main]
+data: postgresql://cumin@localhost/cumin
+debug: True
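The instance file above is the devel configuration; MintConfig (added in main.py further down) also loads a per-user ~/.mint.conf after $MINT_HOME/etc/mint.conf, so a hypothetical override like the one written below would apply for the keys it repeats (assuming, as the load order suggests, that the later file wins):

    # write a hypothetical per-user override; the [main] section and key names follow MintConfig's parameters
    {
      echo '[main]'
      echo 'data: postgresql://mint@localhost/mint'
      echo 'log-level: info'
    } > ~/.mint.conf

Keys that are not overridden keep the defaults declared in MintConfig, for example expire-frequency (600 seconds) and expire-threshold (86400 seconds).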
Modified: mgmt/trunk/mint/python/mint/__init__.py
===================================================================
--- mgmt/trunk/mint/python/mint/__init__.py 2009-04-01 19:08:41 UTC (rev 3247)
+++ mgmt/trunk/mint/python/mint/__init__.py 2009-04-01 19:21:13 UTC (rev 3248)
@@ -3,21 +3,25 @@
import pickle
import struct
import sys
+import os
import types
import socket
+
+from parsley.config import Config, ConfigParameter
+from psycopg2 import OperationalError
+from qmf.console import ClassKey
+from qpid.datatypes import UUID
+from qpid.util import URL
+from sqlobject import *
from threading import Lock, RLock
-from sqlobject import *
+from time import sleep
from traceback import print_exc
-from qpid.util import URL
-from qpid.datatypes import UUID
-from mint.schema import *
+
from mint import update
from mint.cache import MintCache
from mint.dbexpire import DBExpireThread
-from qmf.console import ClassKey
+from mint.schema import *
from util import *
-from time import sleep
-from psycopg2 import OperationalError
log = logging.getLogger("mint")
@@ -320,13 +324,18 @@
class MintModel(qmf.console.Console):
staticInstance = None
- def __init__(self, dataUri, dbExpireFrequency, dbExpireThreshold, debug=False):
+ def __init__(self, dataUri, debug=False):
self.dataUri = dataUri
self.debug = debug
+
self.updateObjects = True
+
self.pollRegistrations = True
- self.dbExpiration = True
+ self.expireObjects = True
+ self.expireFrequency = 600
+ self.expireThreshold = 24 * 3600
+
assert MintModel.staticInstance is None
MintModel.staticInstance = self
@@ -348,8 +357,7 @@
self.qmfSession = None
self.updateThread = update.ModelUpdateThread(self)
- self.dbExpireThread = dbexpire.DBExpireThread \
- (self, frequency=dbExpireFrequency, threshold=dbExpireThreshold)
+ self.expireThread = dbexpire.DBExpireThread(self)
self.registrationThread = RegistrationThread(self)
self.outstandingMethodCalls = dict()
@@ -376,7 +384,7 @@
self.pollRegistrations and "enabled" or "disabled")
log.info("Object expiration is %s",
- self.dbExpiration and "enabled" or "disabled")
+ self.expireObjects and "enabled" or "disabled")
sqlhub.processConnection = self.dbConn = connectionForURI(self.dataUri)
@@ -386,12 +394,13 @@
(self, manageConnections=True, rcvObjects=self.updateObjects)
self.updateThread.init()
+ self.expireThread.init()
def start(self):
self.updateThread.start()
- if self.dbExpiration:
- self.dbExpireThread.start()
+ if self.expireObjects:
+ self.expireThread.start()
if self.pollRegistrations:
self.registrationThread.start()
@@ -399,8 +408,8 @@
def stop(self):
self.updateThread.stop()
- if self.dbExpiration:
- self.dbExpireThread.stop()
+ if self.expireObjects:
+ self.expireThread.stop()
if self.pollRegistrations:
self.registrationThread.stop()
Modified: mgmt/trunk/mint/python/mint/dbexpire.py
===================================================================
--- mgmt/trunk/mint/python/mint/dbexpire.py 2009-04-01 19:08:41 UTC (rev 3247)
+++ mgmt/trunk/mint/python/mint/dbexpire.py 2009-04-01 19:21:13 UTC (rev 3248)
@@ -9,33 +9,38 @@
log = logging.getLogger("mint.dbexpire")
class DBExpireThread(MintDaemonThread):
- def __init__(self, model, frequency, threshold, keepCurrStats=False):
+ def __init__(self, model):
super(DBExpireThread, self).__init__(model)
- self.frequency = frequency
- self.threshold = threshold
- self.keepCurrStats = keepCurrStats
+ self.keepCurrStats = False
self.ops = []
+ self.attrs = dict()
+
+ def init(self):
+ frequency = self.model.expireFrequency
+ threshold = self.model.expireThreshold
+
for cls in mint.schema.statsClasses:
self.ops.append(SqlExpire(eval(cls), self.keepCurrStats))
self.ops.append(SqlExpire(Job, self.keepCurrStats))
- self.attrs = dict()
- self.attrs["threshold"] = self.threshold
-
+ self.attrs["threshold"] = threshold
+
frequency_out, frequency_unit = self.__convertTimeUnits(frequency)
threshold_out, threshold_unit = self.__convertTimeUnits(threshold)
- log.debug("Expiring database records older than %d %s, every %d %s" \
- % (threshold_out, threshold_unit, frequency_out, frequency_unit))
+ log.debug("Expiring database records older than %d %s, every %d %s" % \
+ (threshold_out, threshold_unit, frequency_out, frequency_unit))
def run(self):
+ frequency = self.model.expireFrequency
+
while True:
if self.stopRequested:
break
up = mint.update.DBExpireUpdate(self.model)
self.model.updateThread.enqueue(up)
- time.sleep(self.frequency)
+ time.sleep(frequency)
def __convertTimeUnits(self, t):
if t / (24*3600) >= 1:
Added: mgmt/trunk/mint/python/mint/main.py
===================================================================
--- mgmt/trunk/mint/python/mint/main.py (rev 0)
+++ mgmt/trunk/mint/python/mint/main.py 2009-04-01 19:21:13 UTC (rev 3248)
@@ -0,0 +1,63 @@
+import sys
+import os
+from parsley.config import Config, ConfigParameter
+from parsley.loggingex import enable_logging
+
+from mint import MintModel
+
+class Mint(object):
+ def __init__(self, config):
+ self.config = config
+
+ self.model = MintModel(self.config.data)
+ self.model.expireFrequency = self.config.expire_frequency
+ self.model.expireThreshold = self.config.expire_threshold
+
+ def check(self):
+ self.model.check()
+
+ def init(self):
+ self.model.init()
+
+ def start(self):
+ self.model.start()
+
+ def stop(self):
+ self.model.stop()
+
+class MintConfig(Config):
+ def __init__(self):
+ super(MintConfig, self).__init__()
+
+ hdef = os.path.normpath("/var/lib/mint")
+ self.home = os.environ.get("MINT_HOME", hdef)
+
+ param = ConfigParameter(self, "data", str)
+ param.default = "postgresql://mint@localhost/mint"
+
+ param = ConfigParameter(self, "log-file", str)
+ param.default = os.path.join(self.home, "log", "mint.log")
+
+ param = ConfigParameter(self, "log-level", str)
+ param.default = "warn"
+
+ param = ConfigParameter(self, "debug", bool)
+ param.default = False
+
+ param = ConfigParameter(self, "expire-frequency", int)
+ param.default = 600 # 10 minutes
+
+ param = ConfigParameter(self, "expire-threshold", int)
+ param.default = 24 * 3600 # 1 day
+
+ def init(self, opts):
+ super(MintConfig, self).init()
+
+ self.load_file(os.path.join(self.home, "etc", "mint.conf"))
+ self.load_file(os.path.join(os.path.expanduser("~"), ".mint.conf"))
+ self.load_dict(opts)
+
+ if self.debug:
+ enable_logging("mint", "debug", sys.stderr)
+ else:
+ enable_logging("mint", self.log_level, self.log_file)
Modified: mgmt/trunk/mint/python/mint/tools.py
===================================================================
--- mgmt/trunk/mint/python/mint/tools.py 2009-04-01 19:08:41 UTC (rev 3247)
+++ mgmt/trunk/mint/python/mint/tools.py 2009-04-01 19:21:13 UTC (rev 3248)
@@ -6,11 +6,15 @@
from parsley.loggingex import *
from mint import *
+from mint.main import *
+from util import *
class BaseMintTool(Command):
def __init__(self, name):
super(BaseMintTool, self).__init__(None, name)
+ self.config = MintConfig()
+
opt = CommandOption(self, "data")
opt.argument = "URI"
opt.description = "Connect to database at URI"
@@ -43,8 +47,6 @@
def init(self):
super(BaseMintTool, self).init()
- #self.config.init()
-
try:
import psyco
psyco.full()
@@ -63,25 +65,11 @@
self.print_help()
sys.exit(0)
- level = opts.get("log-level", "warn")
+ self.config.init(opts)
- if "debug" in opts:
- level = "debug"
- file = sys.stderr
- else:
- file = opts.get("log-file", sys.stderr)
+ self.do_run(opts, args)
- enable_logging("mint", level, file)
-
- data = opts.get("data", "postgresql://cumin@localhost/cumin")
- freq = int(opts.get("expire-frequency", 600))
- threshold = int(opts.get("expire-threshold", 24 * 3600))
-
- model = MintModel(data, freq, threshold, debug="debug" in opts)
-
- self.do_run(opts, args, model)
-
- def do_run(self, opts, args, model):
+ def do_run(self, opts, args):
raise Exception("Not implemented")
def main(self):
@@ -99,44 +87,47 @@
# get better thread switching performance
sys.setcheckinterval(200)
- def do_run(self, opts, args, model):
- model.check()
- model.init()
- model.start()
+ def do_run(self, opts, args):
+ app = Mint(self.config)
+ app.check()
+ app.init()
+ app.start()
try:
for arg in args[1:]:
- model.addBroker(arg)
+ app.model.addBroker(arg)
while True:
sleep(2)
finally:
- model.stop()
+ app.stop()
class MintTestTool(BaseMintTool):
def __init__(self, name):
super(MintTestTool, self).__init__(name)
- def do_run(self, opts, args, model):
- model.pollRegistrations = False
+ def do_run(self, opts, args):
+ app = Mint(self.config)
- model.check()
- model.init()
- model.start()
+ app.model.pollRegistrations = False
+ app.check()
+ app.init()
+ app.start()
+
added = list()
try:
for arg in args[1:]:
- added.append(model.addBroker(arg))
+ added.append(app.model.addBroker(arg))
while True:
sleep(2)
finally:
for broker in added:
- model.delBroker(broker)
+ app.model.delBroker(broker)
- model.stop()
+ app.stop()
class MintBenchTool(BaseMintTool):
def __init__(self, name):
@@ -148,93 +139,98 @@
# get better thread switching performance
sys.setcheckinterval(200)
- def do_run(self, opts, args, model):
- model.pollRegistrations = False
+ def do_run(self, opts, args):
+ app = Mint(self.config)
- model.check()
- model.init()
- model.start()
+ app.model.pollRegistrations = False
+ app.check()
+ app.init()
+ app.start()
+
added = list()
+ head = "%6s %6s %6s %6s %6s %6s %6s %6s %6s" % \
+ ("enqs", "deqs", "depth", "drop", "defer",
+ "prop", "stat", "meth", "exp")
+ row = "%6i %6i %6i %6i %6i %6i %6i %6i %6i"
+
try:
for arg in args[1:]:
- added.append(model.addBroker(arg))
+ try:
+ added.append(app.model.addBroker(arg))
+ except socket.error, e:
+ print "Warning: Failed connecting to broker at '%s'" % arg
- enq_last = 0
- deq_last = 0
- drp_last = 0
- dfr_last = 0
- prop_last = 0
- stat_last = 0
- meth_last = 0
- exp_last = 0
+ try:
+ enq_last = 0
+ deq_last = 0
+ drp_last = 0
+ dfr_last = 0
+ prop_last = 0
+ stat_last = 0
+ meth_last = 0
+ exp_last = 0
- head = "%6s %6s %6s %6s %6s %6s %6s %6s %6s" % \
- ("enqs", "deqs", "depth", "drop", "defer",
- "prop", "stat", "meth", "exp")
- row = "%6i %6i %6i %6i %6i %6i %6i %6i %6i"
+ samples = 0
- samples = 0
+ while True:
+ if samples % 24 == 0:
+ print head
- while True:
- if samples % 24 == 0:
- print head
+ samples += 1
- samples += 1
+ sleep(1)
- sleep(1)
+ ut = app.model.updateThread
- ut = model.updateThread
+ enq = ut.enqueueCount
+ deq = ut.dequeueCount
+ drp = ut.dropCount
+ dfr = ut.deferCount
- enq = ut.enqueueCount
- deq = ut.dequeueCount
- drp = ut.dropCount
- dfr = ut.deferCount
+ prop = ut.propUpdateCount
+ stat = ut.statUpdateCount
+ meth = ut.methUpdateCount
+ exp = ut.expireUpdateCount
- prop = ut.propUpdateCount
- stat = ut.statUpdateCount
- meth = ut.methUpdateCount
- exp = ut.expireUpdateCount
+ print row % (enq - enq_last,
+ deq - deq_last,
+ enq - deq,
+ drp - drp_last,
+ dfr - dfr_last,
+ prop - prop_last,
+ stat - stat_last,
+ meth - meth_last,
+ exp - exp_last)
- print row % (enq - enq_last,
- deq - deq_last,
- enq - deq,
- drp - drp_last,
- dfr - dfr_last,
- prop - prop_last,
- stat - stat_last,
- meth - meth_last,
- exp - exp_last)
+ enq_last = enq
+ deq_last = deq
+ drp_last = drp
+ dfr_last = dfr
- enq_last = enq
- deq_last = deq
- drp_last = drp
- dfr_last = dfr
+ prop_last = prop
+ stat_last = stat
+ meth_last = meth
+ exp_last = exp
+ finally:
+ print "Totals:"
- prop_last = prop
- stat_last = stat
- meth_last = meth
- exp_last = exp
+ print row % (enq,
+ deq,
+ enq - deq,
+ drp,
+ dfr,
+ prop,
+ stat,
+ meth,
+ exp)
finally:
-
- print "Totals:"
-
- print row % (enq,
- deq,
- enq - deq,
- drp,
- dfr,
- prop,
- stat,
- meth,
- exp)
-
#from threading import enumerate
#for item in enumerate():
# print item
for broker in added:
- model.delBroker(broker)
+ app.model.delBroker(broker)
- model.stop()
+ app.stop()
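The shape of the tools.py refactor: BaseMintTool now owns a MintConfig, run() initializes it from the parsed options, and each tool's do_run(opts, args) builds a Mint application instead of receiving a ready-made MintModel. A hedged sketch of a minimal subclass under the new contract (MintPingTool is a hypothetical name; Mint, BaseMintTool, addBroker and the import paths follow this commit's tree):

    from time import sleep

    from mint import Mint
    from mint.tools import BaseMintTool

    # Hypothetical subclass showing the new do_run(opts, args) signature;
    # self.config has already been initialized by BaseMintTool.run.
    class MintPingTool(BaseMintTool):
        def do_run(self, opts, args):
            app = Mint(self.config)
            app.check()
            app.init()
            app.start()

            try:
                for arg in args[1:]:        # remaining arguments are broker URLs
                    app.model.addBroker(arg)
                sleep(2)                    # give the update thread one cycle
            finally:
                app.stop()
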
Modified: mgmt/trunk/mint/python/mint/update.py
===================================================================
--- mgmt/trunk/mint/python/mint/update.py 2009-04-01 19:08:41 UTC (rev 3247)
+++ mgmt/trunk/mint/python/mint/update.py 2009-04-01 19:21:13 UTC (rev 3248)
@@ -214,7 +214,7 @@
self.processTimestamp("qmfUpdateTime", timestamps[0], attrs)
- if cls is Job and attrs["qmfUpdateTime"] < datetime.now() - timedelta(seconds=thread.model.dbExpireThread.threshold):
+ if cls is Job and attrs["qmfUpdateTime"] < datetime.now() - timedelta(seconds=thread.model.expireThread.threshold):
# drop updates for Jobs that are older than the expiration threshold,
# since they would be deleted anyway in the next run of the db expiration thread
log.debug("Property update is older than expiration threshold; skipping it")
@@ -390,12 +390,12 @@
def process(self, thread):
cursor = thread.cursor()
- attrs = self.model.dbExpireThread.attrs
+ attrs = self.model.expireThread.attrs
total = 0
thread.commit()
- for op in self.model.dbExpireThread.ops:
+ for op in self.model.expireThread.ops:
log.debug("Running expire op %s", op)
count = op.execute(cursor, attrs)
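The threshold test that gates stale Job property updates is worth stating on its own: an update whose qmfUpdateTime is older than now minus expire-threshold seconds is dropped, because the expire thread would delete the row on its next pass anyway. A small sketch (is_past_expiration is a hypothetical helper name):

    from datetime import datetime, timedelta

    def is_past_expiration(qmf_update_time, threshold_seconds):
        # Mirrors the check above: updates older than the expiration threshold
        # are skipped rather than written to the database.
        return qmf_update_time < datetime.now() - timedelta(seconds=threshold_seconds)

    # For example, with the default expire-threshold of one day:
    #   is_past_expiration(attrs["qmfUpdateTime"], 24 * 3600)
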
Added: mgmt/trunk/wooly/etc/module.profile
===================================================================
--- mgmt/trunk/wooly/etc/module.profile (rev 0)
+++ mgmt/trunk/wooly/etc/module.profile 2009-04-01 19:21:13 UTC (rev 3248)
@@ -0,0 +1,3 @@
+export PYTHONPATH="${PWD}/python:${PYTHONPATH}"
+export PATH="${PWD}/bin:${PATH}"
+export WOOLY_DEMO_HOME="${PWD}/instance"
Copied: mgmt/trunk/wooly/instance (from rev 3246, mgmt/trunk/wooly/wooly-demo-instance)
Property changes on: mgmt/trunk/wooly/instance
___________________________________________________________________
Name: svn:mergeinfo
+
rhmessaging commits: r3247 - mgmt/trunk/cumin/bin.
by rhmessaging-commits@lists.jboss.org
Author: justi9
Date: 2009-04-01 15:08:41 -0400 (Wed, 01 Apr 2009)
New Revision: 3247
Added:
mgmt/trunk/cumin/bin/cumin-admin-test
Log:
Add a test to exercise the cumin-admin command
Added: mgmt/trunk/cumin/bin/cumin-admin-test
===================================================================
--- mgmt/trunk/cumin/bin/cumin-admin-test (rev 0)
+++ mgmt/trunk/cumin/bin/cumin-admin-test 2009-04-01 19:08:41 UTC (rev 3247)
@@ -0,0 +1,35 @@
+#!/bin/bash
+
+id="$RANDOM"
+code=0
+tmpdir=$(mktemp -d)
+trap "rm -rf ${tmpdir}" EXIT
+
+while read command; do
+ echo -n "Testing command '$command'..."
+
+ $command &> "${tmpdir}/output"
+ rc=$?
+ if [[ $rc == 0 ]]; then
+ echo " OK"
+ else
+ echo
+ echo "Command failed with exit code $rc"
+ echo "Output:"
+ cat "${tmpdir}/output"
+ code=1
+ fi
+done <<EOF
+cumin-admin --help
+cumin-admin add-user "$id" changeme
+cumin-admin assign "$id" admin
+cumin-admin unassign "$id" admin
+cumin-admin list-users
+cumin-admin remove-user "$id" --force
+cumin-admin add-qmf-server "$id" "$id"
+cumin-admin list-qmf-servers
+cumin-admin remove-qmf-server "$id"
+cumin-admin list-roles
+EOF
+
+exit "$code"
Property changes on: mgmt/trunk/cumin/bin/cumin-admin-test
___________________________________________________________________
Name: svn:executable
+ *
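For comparison only, the run-and-report loop in cumin-admin-test could also be sketched in Python with subprocess; this is a hypothetical alternative, not part of the commit:

    import subprocess, sys

    # Hypothetical Python rendering of the smoke-test loop above: run each
    # command, print OK on success, otherwise report the exit code and output.
    def run_smoke_tests(commands):
        code = 0
        for command in commands:
            sys.stdout.write("Testing command '%s'..." % command)
            proc = subprocess.Popen(command, shell=True,
                                    stdout=subprocess.PIPE,
                                    stderr=subprocess.STDOUT)
            output = proc.communicate()[0]
            if proc.returncode == 0:
                print " OK"
            else:
                print
                print "Command failed with exit code %i" % proc.returncode
                print "Output:"
                print output
                code = 1
        return code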