rhmessaging commits: r3773 - in mgmt/trunk/cumin/python/cumin: grid and 1 other directory.
by rhmessaging-commits@lists.jboss.org
Author: justi9
Date: 2010-01-08 13:54:29 -0500 (Fri, 08 Jan 2010)
New Revision: 3773
Modified:
mgmt/trunk/cumin/python/cumin/grid/model.py
mgmt/trunk/cumin/python/cumin/messaging/model.py
mgmt/trunk/cumin/python/cumin/model.py
Log:
Adapt cumin method calls to the new pattern
Modified: mgmt/trunk/cumin/python/cumin/grid/model.py
===================================================================
--- mgmt/trunk/cumin/python/cumin/grid/model.py 2010-01-08 18:53:58 UTC (rev 3772)
+++ mgmt/trunk/cumin/python/cumin/grid/model.py 2010-01-08 18:54:29 UTC (rev 3773)
@@ -77,7 +77,7 @@
# "User": {"TYPE": self.STRING_TYPE,
# "VALUE": condor_string("example@example.com")}
- scheduler.Submit(self.app.model.mint.model, completion, ad, None)
+ scheduler.Submit(completion, ad, None)
def condor_string(string):
return "\"%s\"" % string
@@ -105,7 +105,7 @@
master = Master.select("System = '%s'" % system_name)[0]
except IndexError:
raise Exception("Master daemon not running")
- master.Start(self.app.model.mint.model, completion, "NEGOTIATOR")
+ master.Start(completion, "NEGOTIATOR")
class NegotiatorSetStartTask(SetTask):
def __init__(self, app, cls):
@@ -140,7 +140,7 @@
master = Master.select("System = '%s'" % system_name)[0]
except IndexError:
raise Exception("Master daemon not running")
- master.Stop(self.app.model.mint.model, completion, "NEGOTIATOR")
+ master.Stop(completion, "NEGOTIATOR")
class NegotiatorSetStopTask(SetTask):
def __init__(self, app, cls):
@@ -168,12 +168,12 @@
def do_invoke(self, completion, session, negotiator, name, max):
assert isinstance(negotiator, Negotiator)
- negotiator.SetLimit(self.app.model.mint.model, completion, name, max)
+ negotiator.SetLimit(completion, name, max)
def completion():
pass
- negotiator.Reconfig(self.app.model.mint.model, completion)
+ negotiator.Reconfig(completion)
class CollectorStartTask(QmfTask):
def __init__(self, app, cls):
@@ -198,7 +198,7 @@
master = Master.select("System = '%s'" % system_name)[0]
except IndexError:
raise Exception("Master daemon not running")
- master.Start(self.app.model.mint.model, completion, "COLLECTOR")
+ master.Start(completion, "COLLECTOR")
class CollectorSetStartTask(SetTask):
def __init__(self, app, cls):
@@ -233,7 +233,7 @@
master = Master.select("System = '%s'" % system_name)[0]
except IndexError:
raise Exception("Master daemon not running")
- master.Stop(self.app.model.mint.model, completion, "COLLECTOR")
+ master.Stop(completion, "COLLECTOR")
class CollectorSetStopTask(SetTask):
def __init__(self, app, cls):
@@ -268,7 +268,7 @@
master = Master.select("System = '%s'" % system_name)[0]
except IndexError:
raise Exception("Master daemon not running")
- master.Start(self.app.model.mint.model, completion, "SCHEDD")
+ master.Start(completion, "SCHEDD")
class SchedulerSetStartTask(SetTask):
def __init__(self, app, cls):
@@ -303,7 +303,7 @@
master = Master.select("System = '%s'" % system_name)[0]
except IndexError:
raise Exception("Master daemon not running")
- master.Stop(self.app.model.mint.model, completion, "SCHEDD")
+ master.Stop(completion, "SCHEDD")
class SchedulerSetStopTask(SetTask):
def __init__(self, app, cls):
@@ -355,7 +355,7 @@
def do_invoke(self, completion, session, job, reason, scheduler):
assert isinstance(scheduler, Scheduler)
- scheduler.Hold(self.app.model.mint.model, completion, job, reason)
+ scheduler.Hold(completion, job, reason)
class JobSetHoldTask(JobSetBaseTask):
def __init__(self, app, cls):
@@ -371,7 +371,7 @@
def do_invoke(self, completion, session, job, reason, scheduler):
assert isinstance(scheduler, Scheduler)
- scheduler.Release(self.app.model.mint.model, completion, job, reason)
+ scheduler.Release(completion, job, reason)
class JobSetReleaseTask(JobSetBaseTask):
def __init__(self, app, cls):
@@ -387,7 +387,7 @@
def do_invoke(self, completion, session, job, reason, scheduler):
assert isinstance(scheduler, Scheduler)
- scheduler.Remove(self.app.model.mint.model, completion, job, reason)
+ scheduler.Remove(completion, job, reason)
class JobSetRemoveTask(JobSetBaseTask):
def __init__(self, app, cls):
@@ -403,7 +403,7 @@
def do_invoke(self, completion, session, job, name, value, scheduler):
assert isinstance(scheduler, Scheduler)
- scheduler.SetAttribute(self.app.model.mint.model, completion, job, name, str(value))
+ scheduler.SetAttribute(completion, job, name, str(value))
def get_title(self, session):
return "Set Job Attribute"
@@ -424,9 +424,9 @@
assert isinstance(negotiator, Negotiator)
if group == "Reconfig":
- negotiator.Reconfig(self.app.model.mint.model, completion)
+ negotiator.Reconfig(completion)
else:
- negotiator.SetRawConfig(self.app.model.mint.model, completion, group, value)
+ negotiator.SetRawConfig(completion, group, value)
class NegotiatorAddGroupTask(NegotiatorGroupTask):
def __init__(self, app, cls):
Modified: mgmt/trunk/cumin/python/cumin/messaging/model.py
===================================================================
--- mgmt/trunk/cumin/python/cumin/messaging/model.py 2010-01-08 18:53:58 UTC (rev 3772)
+++ mgmt/trunk/cumin/python/cumin/messaging/model.py 2010-01-08 18:54:29 UTC (rev 3773)
@@ -34,7 +34,7 @@
raise Exception \
("Cannot close management connection %s" % conn.address)
- conn.close(self.app.model.mint.model, completion)
+ conn.close(completion)
class ConnectionSetCloseTask(SetTask):
def __init__(self, app, cls):
@@ -54,7 +54,7 @@
return "Detach"
def do_invoke(self, completion, session, sess):
- sess.detach(self.app.model.mint.model, completion)
+ sess.detach(completion)
class SessionSetDetachTask(SetTask):
def __init__(self, app, cls):
@@ -71,7 +71,7 @@
return "Close"
def do_invoke(self, completion, session, sess):
- sess.close(self.app.model.mint.model, completion)
+ sess.close(completion)
class SessionSetCloseTask(SetTask):
def __init__(self, app, cls):
@@ -149,7 +149,7 @@
assert isinstance(queue, Queue)
- queue.purge(self.app.model.mint.model, completion, count)
+ queue.purge(completion, count)
class QueueSetPurgeTask(SetTask):
def __init__(self, app, cls):
@@ -226,10 +226,9 @@
self.form.queue.set(session, queue)
def do_invoke(self, completion, session, queue, dest_queue, count):
- model = self.app.model.mint.model
broker = queue.vhost.broker
broker.queueMoveMessages \
- (model, completion, queue.name, dest_queue.name, count)
+ (completion, queue.name, dest_queue.name, count)
class ExchangeAddTask(Task):
MSG_SEQUENCE = "qpid.msg_sequence"
@@ -322,9 +321,8 @@
else:
mech = "PLAIN"
- broker.connect(self.app.model.mint.model, completion,
- host, port, durable, mech, username, password,
- transport)
+ broker.connect(completion, host, port, durable, mech,
+ username, password, transport)
class LinkRemoveTask(QmfTask):
def __init__(self, app, cls):
@@ -339,7 +337,7 @@
self.form.object.set(session, link)
def do_invoke(self, completion, session, link):
- link.close(self.app.model.mint.model, completion)
+ link.close(completion)
class LinkSetRemoveTask(SetTask):
def __init__(self, app, cls):
@@ -368,8 +366,7 @@
def do_invoke(self, completion, session, link, exchange, key, tag,
dynamic, sync, excludes):
- link.bridge(self.app.model.mint.model, completion,
- link.durable, exchange.name, exchange.name,
+ link.bridge(completion, link.durable, exchange.name, exchange.name,
key, tag, excludes, False, False, dynamic, sync)
class RouteRemoveTask(QmfTask):
@@ -377,7 +374,7 @@
return "Remove"
def do_invoke(self, completion, session, route):
- route.close(self.app.model.mint.model, completion)
+ route.close(completion)
class RouteSetRemoveTask(SetTask):
def __init__(self, app, cls):
Modified: mgmt/trunk/cumin/python/cumin/model.py
===================================================================
--- mgmt/trunk/cumin/python/cumin/model.py 2010-01-08 18:53:58 UTC (rev 3772)
+++ mgmt/trunk/cumin/python/cumin/model.py 2010-01-08 18:54:29 UTC (rev 3773)
@@ -2075,7 +2075,6 @@
self.got_data = False
self.error = False
self.status = None
- self.model = app.model.mint.model
def get_completion(self):
def completion(status, data):
@@ -2122,7 +2121,7 @@
self.data = {'Value': None}
def execute(self, negotiator, config_name, timeout=5):
- negotiator.GetRawConfig(self.model, self.get_completion(), config_name, None)
+ negotiator.GetRawConfig(self.get_completion(), config_name, None)
return self.do_wait(timeout)
class FetchRawConfigSet(QmfCallSet):
@@ -2130,7 +2129,7 @@
default = {'Value': 0}
for group in groups:
call = self.add_call(group, default)
- negotiator.GetRawConfig(call.model, call.get_completion(), prepend+group, None)
+ negotiator.GetRawConfig(call.get_completion(), prepend+group, None)
return self.do_wait(timeout)
@@ -2140,7 +2139,7 @@
self.data = {'JobAd': {"":{"VALUE":"", "TYPE":0}}}
def execute(self, scheduler, jobId, timeout=10):
- scheduler.GetAd(self.model, self.get_completion(), jobId, None)
+ scheduler.GetAd(self.get_completion(), jobId, None)
return self.do_wait(timeout)
class FetchJobOutput(QmfCall):
@@ -2149,7 +2148,7 @@
self.data = {'Data': ""}
def execute(self, scheduler, jobId, file, start, end, timeout=10):
- scheduler.Fetch(self.model, self.get_completion(), jobId, file, start, end, None)
+ scheduler.Fetch(self.get_completion(), jobId, file, start, end, None)
return self.do_wait(timeout)
class CuminScheduler(RemoteClass):
@@ -2448,12 +2447,12 @@
def completion(status, data):
self.data = data["Limits"]
- self.negotiator.GetLimits(self.model.mint.model, completion, None)
+ self.negotiator.GetLimits(completion, None)
def delete(self):
del self.model.limits_by_negotiator[self.negotiator]
- super(SubmissionJobStore, self).delete()
+ super(NegotiatorLimitStore, self).delete()
class SubmissionJobStore(ObjectStore):
def __init__(self, model, submission):
@@ -2467,8 +2466,7 @@
scheduler = self.submission.scheduler
- scheduler.GetJobs \
- (self.model.mint.model, completion, self.submission.Name, None)
+ scheduler.GetJobs(completion, self.submission.Name, None)
def delete(self):
del self.model.jobs_by_submission[self.submission]
14 years, 12 months
rhmessaging commits: r3772 - mgmt/trunk/mint/python/mint.
by rhmessaging-commits@lists.jboss.org
Author: justi9
Date: 2010-01-08 13:53:58 -0500 (Fri, 08 Jan 2010)
New Revision: 3772
Added:
mgmt/trunk/mint/python/mint/schemalocal.py
Log:
Forgot to commit this new file with change 3771
Added: mgmt/trunk/mint/python/mint/schemalocal.py
===================================================================
--- mgmt/trunk/mint/python/mint/schemalocal.py (rev 0)
+++ mgmt/trunk/mint/python/mint/schemalocal.py 2010-01-08 18:53:58 UTC (rev 3772)
@@ -0,0 +1,75 @@
+from sqlobject import *
+
+from mint.util import *
+
+class Subject(SQLObject):
+ class sqlmeta:
+ lazyUpdate = True
+
+ name = StringCol(unique=True, notNone=True)
+ password = StringCol()
+ lastChallenged = TimestampCol(default=None)
+ lastLoggedIn = TimestampCol(default=None)
+ lastLoggedOut = TimestampCol(default=None)
+ roles = SQLRelatedJoin("Role", intermediateTable="subject_role_mapping",
+ createRelatedTable=False)
+
+ def getByName(cls, name):
+ try:
+ return Subject.selectBy(name=name)[0]
+ except IndexError:
+ pass
+
+ getByName = classmethod(getByName)
+
+class Role(SQLObject):
+ class sqlmeta:
+ lazyUpdate = True
+
+ name = StringCol(unique=True, notNone=True)
+ subjects = SQLRelatedJoin("Subject",
+ intermediateTable="subject_role_mapping",
+ createRelatedTable=False)
+
+ def getByName(cls, name):
+ try:
+ return Role.selectBy(name=name)[0]
+ except IndexError:
+ pass
+
+ getByName = classmethod(getByName)
+
+class SubjectRoleMapping(SQLObject):
+ class sqlmeta:
+ lazyUpdate = True
+
+ subject = ForeignKey("Subject", notNull=True, cascade=True)
+ role = ForeignKey("Role", notNull=True, cascade=True)
+ unique = DatabaseIndex(subject, role, unique=True)
+
+class ObjectNotFound(Exception):
+ pass
+
+class MintInfo(SQLObject):
+ class sqlmeta:
+ lazyUpdate = True
+
+ version = StringCol(default="0.1", notNone=True)
+
+class BrokerGroup(SQLObject):
+ class sqlmeta:
+ lazyUpdate = True
+
+ name = StringCol(unique=True, notNone=True)
+ brokers = SQLRelatedJoin("Broker",
+ intermediateTable="broker_group_mapping",
+ createRelatedTable=False)
+
+class BrokerGroupMapping(SQLObject):
+ class sqlmeta:
+ lazyUpdate = True
+
+ broker = ForeignKey("Broker", notNull=True, cascade=True)
+ brokerGroup = ForeignKey("BrokerGroup", notNull=True, cascade=True)
+ unique = DatabaseIndex(broker, brokerGroup, unique=True)
+
14 years, 12 months
rhmessaging commits: r3771 - in mgmt/trunk/mint: sql and 1 other directory.
by rhmessaging-commits@lists.jboss.org
Author: justi9
Date: 2010-01-08 13:48:28 -0500 (Fri, 08 Jan 2010)
New Revision: 3771
Modified:
mgmt/trunk/mint/python/mint/model.py
mgmt/trunk/mint/python/mint/schema.py
mgmt/trunk/mint/python/mint/schemaparser.py
mgmt/trunk/mint/sql/Makefile
mgmt/trunk/mint/sql/schema.sql
Log:
Move callMethod to MintAgent and simplify schema methods
Modified: mgmt/trunk/mint/python/mint/model.py
===================================================================
--- mgmt/trunk/mint/python/mint/model.py 2010-01-08 16:28:35 UTC (rev 3770)
+++ mgmt/trunk/mint/python/mint/model.py 2010-01-08 18:48:28 UTC (rev 3771)
@@ -1,108 +1,25 @@
from parsley.collectionsex import defaultdict
from parsley.threadingex import Lifecycle
-from qpid.datatypes import UUID
-from qpid.util import URL
from sqlobject import *
from mint import update
from mint.cache import MintCache
from mint.schema import *
+from mint.schemalocal import *
from mint.util import *
+import mint.schema
+
from qmf.console import ClassKey, Console, Session
log = logging.getLogger("mint.model")
-thisModule = __import__(__name__)
-
-for item in dir(mint.schema):
- cls = getattr(mint.schema, item)
-
- try:
- if issubclass(cls, SQLObject) and cls is not SQLObject:
- setattr(thisModule, item, cls)
- except TypeError:
- pass
-
-class Subject(SQLObject):
- class sqlmeta:
- lazyUpdate = True
-
- name = StringCol(unique=True, notNone=True)
- password = StringCol()
- lastChallenged = TimestampCol(default=None)
- lastLoggedIn = TimestampCol(default=None)
- lastLoggedOut = TimestampCol(default=None)
- roles = SQLRelatedJoin("Role", intermediateTable="subject_role_mapping",
- createRelatedTable=False)
-
- def getByName(cls, name):
- try:
- return Subject.selectBy(name=name)[0]
- except IndexError:
- pass
-
- getByName = classmethod(getByName)
-
-class Role(SQLObject):
- class sqlmeta:
- lazyUpdate = True
-
- name = StringCol(unique=True, notNone=True)
- subjects = SQLRelatedJoin("Subject",
- intermediateTable="subject_role_mapping",
- createRelatedTable=False)
-
- def getByName(cls, name):
- try:
- return Role.selectBy(name=name)[0]
- except IndexError:
- pass
-
- getByName = classmethod(getByName)
-
-class SubjectRoleMapping(SQLObject):
- class sqlmeta:
- lazyUpdate = True
-
- subject = ForeignKey("Subject", notNull=True, cascade=True)
- role = ForeignKey("Role", notNull=True, cascade=True)
- unique = DatabaseIndex(subject, role, unique=True)
-
-class ObjectNotFound(Exception):
- pass
-
-class MintInfo(SQLObject):
- class sqlmeta:
- lazyUpdate = True
-
- version = StringCol(default="0.1", notNone=True)
-
-class BrokerGroup(SQLObject):
- class sqlmeta:
- lazyUpdate = True
-
- name = StringCol(unique=True, notNone=True)
- brokers = SQLRelatedJoin("Broker",
- intermediateTable="broker_group_mapping",
- createRelatedTable=False)
-
-class BrokerGroupMapping(SQLObject):
- class sqlmeta:
- lazyUpdate = True
-
- broker = ForeignKey("Broker", notNull=True, cascade=True)
- brokerGroup = ForeignKey("BrokerGroup", notNull=True, cascade=True)
- unique = DatabaseIndex(broker, brokerGroup, unique=True)
-
class MintModel(Console, Lifecycle):
- staticInstance = None
-
def __init__(self, app):
self.log = log
- assert MintModel.staticInstance is None
- MintModel.staticInstance = self
+ assert mint.schema.model is None
+ mint.schema.model = self
self.app = app
@@ -288,25 +205,6 @@
""" Invoked when an event is raised. """
pass
- def callMethod(self, mintObject, methodName, callback, args):
- classKey = ClassKey(mintObject.qmfClassKey)
- objectId = QmfObjectId.fromString(mintObject.qmfObjectId).toObjectId()
-
- self.lock.acquire()
- try:
- agent = self.agents[mintObject.qmfAgentId]
- broker = agent.agent.getBroker()
-
- seq = self.qmfSession._sendMethodRequest \
- (broker, classKey, objectId, methodName, args)
-
- if seq is not None:
- self.outstandingMethodCalls[seq] = callback
-
- return seq
- finally:
- self.lock.release()
-
def methodResponse(self, broker, seq, response):
log.debug("Method response for request %i received from %s", seq, broker)
log.debug("Response: %s", response)
@@ -333,5 +231,23 @@
# qmfObjectId => list of ModelUpdate objects
self.deferredUpdates = defaultdict(list)
+ def callMethod(self, mintObject, methodName, callback, args):
+ classKey = ClassKey(mintObject.qmfClassKey)
+ objectId = QmfObjectId.fromString(mintObject.qmfObjectId).toObjectId()
+
+ self.model.lock.acquire()
+ try:
+ broker = self.agent.getBroker()
+
+ seq = self.model.qmfSession._sendMethodRequest \
+ (broker, classKey, objectId, methodName, args)
+
+ if seq is not None:
+ self.model.outstandingMethodCalls[seq] = callback
+
+ return seq
+ finally:
+ self.model.lock.release()
+
def __repr__(self):
return "%s(%s)" % (self.__class__.__name__, self.id)
Modified: mgmt/trunk/mint/python/mint/schema.py
===================================================================
--- mgmt/trunk/mint/python/mint/schema.py 2010-01-08 16:28:35 UTC (rev 3770)
+++ mgmt/trunk/mint/python/mint/schema.py 2010-01-08 18:48:28 UTC (rev 3771)
@@ -1,13 +1,8 @@
-import mint
from sqlobject import *
-from datetime import datetime
-from qmf.console import ObjectId
-class Pool(SQLObject):
- class sqlmeta:
- lazyUpdate = True
- sourceId = StringCol(default=None, unique=True)
+from mint.util import *
+model = None
class Slot(SQLObject):
class sqlmeta:
@@ -157,96 +152,166 @@
DaemonStartTime = TimestampCol(default=None)
- def Submit(self, model, callback, Ad, Id):
- actualArgs = list()
+ def Submit(self, callback, Ad, Id):
+ try:
+ agent = model.agents[self.qmfAgentId]
+ except KeyError:
+ raise Exception("Agent not found")
+
+ args = list()
+
if Ad is not None:
- actualArgs.append(Ad)
+ args.append(Ad)
if Id is not None:
- actualArgs.append(Id)
- model.callMethod(self, "Submit", callback, args=actualArgs)
+ args.append(Id)
- def GetAd(self, model, callback, Id, JobAd):
- actualArgs = list()
+ agent.callMethod(self, "Submit", callback, args)
+
+ def GetAd(self, callback, Id, JobAd):
+ try:
+ agent = model.agents[self.qmfAgentId]
+ except KeyError:
+ raise Exception("Agent not found")
+
+ args = list()
+
if Id is not None:
- actualArgs.append(Id)
+ args.append(Id)
if JobAd is not None:
- actualArgs.append(JobAd)
- model.callMethod(self, "GetAd", callback, args=actualArgs)
+ args.append(JobAd)
- def SetAttribute(self, model, callback, Id, Name, Value):
- actualArgs = list()
+ agent.callMethod(self, "GetAd", callback, args)
+
+ def SetAttribute(self, callback, Id, Name, Value):
+ try:
+ agent = model.agents[self.qmfAgentId]
+ except KeyError:
+ raise Exception("Agent not found")
+
+ args = list()
+
if Id is not None:
- actualArgs.append(Id)
+ args.append(Id)
if Name is not None:
- actualArgs.append(Name)
+ args.append(Name)
if Value is not None:
- actualArgs.append(Value)
- model.callMethod(self, "SetAttribute", callback, args=actualArgs)
+ args.append(Value)
- def Hold(self, model, callback, Id, Reason):
- actualArgs = list()
+ agent.callMethod(self, "SetAttribute", callback, args)
+
+ def Hold(self, callback, Id, Reason):
+ try:
+ agent = model.agents[self.qmfAgentId]
+ except KeyError:
+ raise Exception("Agent not found")
+
+ args = list()
+
if Id is not None:
- actualArgs.append(Id)
+ args.append(Id)
if Reason is not None:
- actualArgs.append(Reason)
- model.callMethod(self, "Hold", callback, args=actualArgs)
+ args.append(Reason)
- def Release(self, model, callback, Id, Reason):
- actualArgs = list()
+ agent.callMethod(self, "Hold", callback, args)
+
+ def Release(self, callback, Id, Reason):
+ try:
+ agent = model.agents[self.qmfAgentId]
+ except KeyError:
+ raise Exception("Agent not found")
+
+ args = list()
+
if Id is not None:
- actualArgs.append(Id)
+ args.append(Id)
if Reason is not None:
- actualArgs.append(Reason)
- model.callMethod(self, "Release", callback, args=actualArgs)
+ args.append(Reason)
- def Remove(self, model, callback, Id, Reason):
- actualArgs = list()
+ agent.callMethod(self, "Release", callback, args)
+
+ def Remove(self, callback, Id, Reason):
+ try:
+ agent = model.agents[self.qmfAgentId]
+ except KeyError:
+ raise Exception("Agent not found")
+
+ args = list()
+
if Id is not None:
- actualArgs.append(Id)
+ args.append(Id)
if Reason is not None:
- actualArgs.append(Reason)
- model.callMethod(self, "Remove", callback, args=actualArgs)
+ args.append(Reason)
- def Fetch(self, model, callback, Id, File, Start, End, Data):
- actualArgs = list()
+ agent.callMethod(self, "Remove", callback, args)
+
+ def Fetch(self, callback, Id, File, Start, End, Data):
+ try:
+ agent = model.agents[self.qmfAgentId]
+ except KeyError:
+ raise Exception("Agent not found")
+
+ args = list()
+
if Id is not None:
- actualArgs.append(Id)
+ args.append(Id)
if File is not None:
- actualArgs.append(File)
+ args.append(File)
if Start is not None:
- actualArgs.append(Start)
+ args.append(Start)
if End is not None:
- actualArgs.append(End)
+ args.append(End)
if Data is not None:
- actualArgs.append(Data)
- model.callMethod(self, "Fetch", callback, args=actualArgs)
+ args.append(Data)
- def GetStates(self, model, callback, Submission, State, Count):
- actualArgs = list()
+ agent.callMethod(self, "Fetch", callback, args)
+
+ def GetStates(self, callback, Submission, State, Count):
+ try:
+ agent = model.agents[self.qmfAgentId]
+ except KeyError:
+ raise Exception("Agent not found")
+
+ args = list()
+
if Submission is not None:
- actualArgs.append(Submission)
+ args.append(Submission)
if State is not None:
- actualArgs.append(State)
+ args.append(State)
if Count is not None:
- actualArgs.append(Count)
- model.callMethod(self, "GetStates", callback, args=actualArgs)
+ args.append(Count)
- def GetJobs(self, model, callback, Submission, Jobs):
- actualArgs = list()
+ agent.callMethod(self, "GetStates", callback, args)
+
+ def GetJobs(self, callback, Submission, Jobs):
+ try:
+ agent = model.agents[self.qmfAgentId]
+ except KeyError:
+ raise Exception("Agent not found")
+
+ args = list()
+
if Submission is not None:
- actualArgs.append(Submission)
+ args.append(Submission)
if Jobs is not None:
- actualArgs.append(Jobs)
- model.callMethod(self, "GetJobs", callback, args=actualArgs)
+ args.append(Jobs)
- def echo(self, model, callback, sequence, body):
- actualArgs = list()
+ agent.callMethod(self, "GetJobs", callback, args)
+
+ def echo(self, callback, sequence, body):
+ try:
+ agent = model.agents[self.qmfAgentId]
+ except KeyError:
+ raise Exception("Agent not found")
+
+ args = list()
+
if sequence is not None:
- actualArgs.append(sequence)
+ args.append(sequence)
if body is not None:
- actualArgs.append(body)
- model.callMethod(self, "echo", callback, args=actualArgs)
+ args.append(body)
+ agent.callMethod(self, "echo", callback, args)
+
class SchedulerStats(SQLObject):
class sqlmeta:
lazyUpdate = True
@@ -329,72 +394,135 @@
DaemonStartTime = TimestampCol(default=None)
- def GetLimits(self, model, callback, Limits):
- actualArgs = list()
+ def GetLimits(self, callback, Limits):
+ try:
+ agent = model.agents[self.qmfAgentId]
+ except KeyError:
+ raise Exception("Agent not found")
+
+ args = list()
+
if Limits is not None:
- actualArgs.append(Limits)
- model.callMethod(self, "GetLimits", callback, args=actualArgs)
+ args.append(Limits)
- def SetLimit(self, model, callback, Name, Max):
- actualArgs = list()
+ agent.callMethod(self, "GetLimits", callback, args)
+
+ def SetLimit(self, callback, Name, Max):
+ try:
+ agent = model.agents[self.qmfAgentId]
+ except KeyError:
+ raise Exception("Agent not found")
+
+ args = list()
+
if Name is not None:
- actualArgs.append(Name)
+ args.append(Name)
if Max is not None:
- actualArgs.append(Max)
- model.callMethod(self, "SetLimit", callback, args=actualArgs)
+ args.append(Max)
- def GetStats(self, model, callback, Name, Ad):
- actualArgs = list()
+ agent.callMethod(self, "SetLimit", callback, args)
+
+ def GetStats(self, callback, Name, Ad):
+ try:
+ agent = model.agents[self.qmfAgentId]
+ except KeyError:
+ raise Exception("Agent not found")
+
+ args = list()
+
if Name is not None:
- actualArgs.append(Name)
+ args.append(Name)
if Ad is not None:
- actualArgs.append(Ad)
- model.callMethod(self, "GetStats", callback, args=actualArgs)
+ args.append(Ad)
- def SetPriority(self, model, callback, Name, Priority):
- actualArgs = list()
+ agent.callMethod(self, "GetStats", callback, args)
+
+ def SetPriority(self, callback, Name, Priority):
+ try:
+ agent = model.agents[self.qmfAgentId]
+ except KeyError:
+ raise Exception("Agent not found")
+
+ args = list()
+
if Name is not None:
- actualArgs.append(Name)
+ args.append(Name)
if Priority is not None:
- actualArgs.append(Priority)
- model.callMethod(self, "SetPriority", callback, args=actualArgs)
+ args.append(Priority)
- def SetPriorityFactor(self, model, callback, Name, PriorityFactor):
- actualArgs = list()
+ agent.callMethod(self, "SetPriority", callback, args)
+
+ def SetPriorityFactor(self, callback, Name, PriorityFactor):
+ try:
+ agent = model.agents[self.qmfAgentId]
+ except KeyError:
+ raise Exception("Agent not found")
+
+ args = list()
+
if Name is not None:
- actualArgs.append(Name)
+ args.append(Name)
if PriorityFactor is not None:
- actualArgs.append(PriorityFactor)
- model.callMethod(self, "SetPriorityFactor", callback, args=actualArgs)
+ args.append(PriorityFactor)
- def SetUsage(self, model, callback, Name, Usage):
- actualArgs = list()
+ agent.callMethod(self, "SetPriorityFactor", callback, args)
+
+ def SetUsage(self, callback, Name, Usage):
+ try:
+ agent = model.agents[self.qmfAgentId]
+ except KeyError:
+ raise Exception("Agent not found")
+
+ args = list()
+
if Name is not None:
- actualArgs.append(Name)
+ args.append(Name)
if Usage is not None:
- actualArgs.append(Usage)
- model.callMethod(self, "SetUsage", callback, args=actualArgs)
+ args.append(Usage)
- def GetRawConfig(self, model, callback, Name, Value):
- actualArgs = list()
+ agent.callMethod(self, "SetUsage", callback, args)
+
+ def GetRawConfig(self, callback, Name, Value):
+ try:
+ agent = model.agents[self.qmfAgentId]
+ except KeyError:
+ raise Exception("Agent not found")
+
+ args = list()
+
if Name is not None:
- actualArgs.append(Name)
+ args.append(Name)
if Value is not None:
- actualArgs.append(Value)
- model.callMethod(self, "GetRawConfig", callback, args=actualArgs)
+ args.append(Value)
- def SetRawConfig(self, model, callback, Name, Value):
- actualArgs = list()
+ agent.callMethod(self, "GetRawConfig", callback, args)
+
+ def SetRawConfig(self, callback, Name, Value):
+ try:
+ agent = model.agents[self.qmfAgentId]
+ except KeyError:
+ raise Exception("Agent not found")
+
+ args = list()
+
if Name is not None:
- actualArgs.append(Name)
+ args.append(Name)
if Value is not None:
- actualArgs.append(Value)
- model.callMethod(self, "SetRawConfig", callback, args=actualArgs)
+ args.append(Value)
- def Reconfig(self, model, callback):
- actualArgs = list()
- model.callMethod(self, "Reconfig", callback, args=actualArgs)
+ agent.callMethod(self, "SetRawConfig", callback, args)
+ def Reconfig(self, callback):
+ try:
+ agent = model.agents[self.qmfAgentId]
+ except KeyError:
+ raise Exception("Agent not found")
+
+ args = list()
+
+
+ agent.callMethod(self, "Reconfig", callback, args)
+
class NegotiatorStats(SQLObject):
class sqlmeta:
lazyUpdate = True
@@ -477,18 +605,32 @@
DaemonStartTime = TimestampCol(default=None)
- def Start(self, model, callback, Subsystem):
- actualArgs = list()
+ def Start(self, callback, Subsystem):
+ try:
+ agent = model.agents[self.qmfAgentId]
+ except KeyError:
+ raise Exception("Agent not found")
+
+ args = list()
+
if Subsystem is not None:
- actualArgs.append(Subsystem)
- model.callMethod(self, "Start", callback, args=actualArgs)
+ args.append(Subsystem)
- def Stop(self, model, callback, Subsystem):
- actualArgs = list()
+ agent.callMethod(self, "Start", callback, args)
+
+ def Stop(self, callback, Subsystem):
+ try:
+ agent = model.agents[self.qmfAgentId]
+ except KeyError:
+ raise Exception("Agent not found")
+
+ args = list()
+
if Subsystem is not None:
- actualArgs.append(Subsystem)
- model.callMethod(self, "Stop", callback, args=actualArgs)
+ args.append(Subsystem)
+ agent.callMethod(self, "Stop", callback, args)
+
class MasterStats(SQLObject):
class sqlmeta:
lazyUpdate = True
@@ -601,11 +743,18 @@
lastAclLoad = TimestampCol(default=None)
- def reloadACLFile(self, model, callback):
+ def reloadACLFile(self, callback):
"""Reload the ACL file"""
- actualArgs = list()
- model.callMethod(self, "reloadACLFile", callback, args=actualArgs)
+ try:
+ agent = model.agents[self.qmfAgentId]
+ except KeyError:
+ raise Exception("Agent not found")
+ args = list()
+
+
+ agent.callMethod(self, "reloadACLFile", callback, args)
+
class AclStats(SQLObject):
class sqlmeta:
lazyUpdate = True
@@ -642,16 +791,30 @@
memberIDs = StringCol(default=None)
- def stopClusterNode(self, model, callback, brokerId):
- actualArgs = list()
+ def stopClusterNode(self, callback, brokerId):
+ try:
+ agent = model.agents[self.qmfAgentId]
+ except KeyError:
+ raise Exception("Agent not found")
+
+ args = list()
+
if brokerId is not None:
- actualArgs.append(brokerId)
- model.callMethod(self, "stopClusterNode", callback, args=actualArgs)
+ args.append(brokerId)
- def stopFullCluster(self, model, callback):
- actualArgs = list()
- model.callMethod(self, "stopFullCluster", callback, args=actualArgs)
+ agent.callMethod(self, "stopClusterNode", callback, args)
+ def stopFullCluster(self, callback):
+ try:
+ agent = model.agents[self.qmfAgentId]
+ except KeyError:
+ raise Exception("Agent not found")
+
+ args = list()
+
+
+ agent.callMethod(self, "stopFullCluster", callback, args)
+
class ClusterStats(SQLObject):
class sqlmeta:
lazyUpdate = True
@@ -737,13 +900,20 @@
dataFileSize = BigIntCol(default=None)
- def expand(self, model, callback, by):
+ def expand(self, callback, by):
"""Increase number of files allocated for this journal"""
- actualArgs = list()
+ try:
+ agent = model.agents[self.qmfAgentId]
+ except KeyError:
+ raise Exception("Agent not found")
+
+ args = list()
+
if by is not None:
- actualArgs.append(by)
- model.callMethod(self, "expand", callback, args=actualArgs)
+ args.append(by)
+ agent.callMethod(self, "expand", callback, args)
+
class JournalStats(SQLObject):
class sqlmeta:
lazyUpdate = True
@@ -840,45 +1010,66 @@
dataDir = StringCol(default=None)
- def echo(self, model, callback, sequence, body):
+ def echo(self, callback, sequence, body):
"""Request a response to test the path to the management broker"""
- actualArgs = list()
+ try:
+ agent = model.agents[self.qmfAgentId]
+ except KeyError:
+ raise Exception("Agent not found")
+
+ args = list()
+
if sequence is not None:
- actualArgs.append(sequence)
+ args.append(sequence)
if body is not None:
- actualArgs.append(body)
- model.callMethod(self, "echo", callback, args=actualArgs)
+ args.append(body)
- def connect(self, model, callback, host, port, durable, authMechanism, username, password, transport):
+ agent.callMethod(self, "echo", callback, args)
+
+ def connect(self, callback, host, port, durable, authMechanism, username, password, transport):
"""Establish a connection to another broker"""
- actualArgs = list()
+ try:
+ agent = model.agents[self.qmfAgentId]
+ except KeyError:
+ raise Exception("Agent not found")
+
+ args = list()
+
if host is not None:
- actualArgs.append(host)
+ args.append(host)
if port is not None:
- actualArgs.append(port)
+ args.append(port)
if durable is not None:
- actualArgs.append(durable)
+ args.append(durable)
if authMechanism is not None:
- actualArgs.append(authMechanism)
+ args.append(authMechanism)
if username is not None:
- actualArgs.append(username)
+ args.append(username)
if password is not None:
- actualArgs.append(password)
+ args.append(password)
if transport is not None:
- actualArgs.append(transport)
- model.callMethod(self, "connect", callback, args=actualArgs)
+ args.append(transport)
- def queueMoveMessages(self, model, callback, srcQueue, destQueue, qty):
+ agent.callMethod(self, "connect", callback, args)
+
+ def queueMoveMessages(self, callback, srcQueue, destQueue, qty):
"""Move messages from one queue to another"""
- actualArgs = list()
+ try:
+ agent = model.agents[self.qmfAgentId]
+ except KeyError:
+ raise Exception("Agent not found")
+
+ args = list()
+
if srcQueue is not None:
- actualArgs.append(srcQueue)
+ args.append(srcQueue)
if destQueue is not None:
- actualArgs.append(destQueue)
+ args.append(destQueue)
if qty is not None:
- actualArgs.append(qty)
- model.callMethod(self, "queueMoveMessages", callback, args=actualArgs)
+ args.append(qty)
+ agent.callMethod(self, "queueMoveMessages", callback, args)
+
class BrokerStats(SQLObject):
class sqlmeta:
lazyUpdate = True
@@ -974,13 +1165,20 @@
exchange = ForeignKey('Exchange', cascade='null', default=None)
- def purge(self, model, callback, request):
+ def purge(self, callback, request):
"""Discard all or some messages on a queue"""
- actualArgs = list()
+ try:
+ agent = model.agents[self.qmfAgentId]
+ except KeyError:
+ raise Exception("Agent not found")
+
+ args = list()
+
if request is not None:
- actualArgs.append(request)
- model.callMethod(self, "purge", callback, args=actualArgs)
+ args.append(request)
+ agent.callMethod(self, "purge", callback, args)
+
class QueueStats(SQLObject):
class sqlmeta:
lazyUpdate = True
@@ -1155,10 +1353,17 @@
remoteParentPid = BigIntCol(default=None)
- def close(self, model, callback):
- actualArgs = list()
- model.callMethod(self, "close", callback, args=actualArgs)
+ def close(self, callback):
+ try:
+ agent = model.agents[self.qmfAgentId]
+ except KeyError:
+ raise Exception("Agent not found")
+ args = list()
+
+
+ agent.callMethod(self, "close", callback, args)
+
class ClientConnectionStats(SQLObject):
class sqlmeta:
lazyUpdate = True
@@ -1195,35 +1400,49 @@
durable = BoolCol(default=None)
- def close(self, model, callback):
- actualArgs = list()
- model.callMethod(self, "close", callback, args=actualArgs)
+ def close(self, callback):
+ try:
+ agent = model.agents[self.qmfAgentId]
+ except KeyError:
+ raise Exception("Agent not found")
- def bridge(self, model, callback, durable, src, dest, key, tag, excludes, srcIsQueue, srcIsLocal, dynamic, sync):
+ args = list()
+
+
+ agent.callMethod(self, "close", callback, args)
+
+ def bridge(self, callback, durable, src, dest, key, tag, excludes, srcIsQueue, srcIsLocal, dynamic, sync):
"""Bridge messages over the link"""
- actualArgs = list()
+ try:
+ agent = model.agents[self.qmfAgentId]
+ except KeyError:
+ raise Exception("Agent not found")
+
+ args = list()
+
if durable is not None:
- actualArgs.append(durable)
+ args.append(durable)
if src is not None:
- actualArgs.append(src)
+ args.append(src)
if dest is not None:
- actualArgs.append(dest)
+ args.append(dest)
if key is not None:
- actualArgs.append(key)
+ args.append(key)
if tag is not None:
- actualArgs.append(tag)
+ args.append(tag)
if excludes is not None:
- actualArgs.append(excludes)
+ args.append(excludes)
if srcIsQueue is not None:
- actualArgs.append(srcIsQueue)
+ args.append(srcIsQueue)
if srcIsLocal is not None:
- actualArgs.append(srcIsLocal)
+ args.append(srcIsLocal)
if dynamic is not None:
- actualArgs.append(dynamic)
+ args.append(dynamic)
if sync is not None:
- actualArgs.append(sync)
- model.callMethod(self, "bridge", callback, args=actualArgs)
+ args.append(sync)
+ agent.callMethod(self, "bridge", callback, args)
+
class LinkStats(SQLObject):
class sqlmeta:
lazyUpdate = True
@@ -1264,10 +1483,17 @@
syncRsv = IntCol(default=None)
- def close(self, model, callback):
- actualArgs = list()
- model.callMethod(self, "close", callback, args=actualArgs)
+ def close(self, callback):
+ try:
+ agent = model.agents[self.qmfAgentId]
+ except KeyError:
+ raise Exception("Agent not found")
+ args = list()
+
+
+ agent.callMethod(self, "close", callback, args)
+
class BridgeStats(SQLObject):
class sqlmeta:
lazyUpdate = True
@@ -1302,22 +1528,50 @@
maxClientRate = BigIntCol(default=None)
- def solicitAck(self, model, callback):
- actualArgs = list()
- model.callMethod(self, "solicitAck", callback, args=actualArgs)
+ def solicitAck(self, callback):
+ try:
+ agent = model.agents[self.qmfAgentId]
+ except KeyError:
+ raise Exception("Agent not found")
- def detach(self, model, callback):
- actualArgs = list()
- model.callMethod(self, "detach", callback, args=actualArgs)
+ args = list()
- def resetLifespan(self, model, callback):
- actualArgs = list()
- model.callMethod(self, "resetLifespan", callback, args=actualArgs)
- def close(self, model, callback):
- actualArgs = list()
- model.callMethod(self, "close", callback, args=actualArgs)
+ agent.callMethod(self, "solicitAck", callback, args)
+ def detach(self, callback):
+ try:
+ agent = model.agents[self.qmfAgentId]
+ except KeyError:
+ raise Exception("Agent not found")
+
+ args = list()
+
+
+ agent.callMethod(self, "detach", callback, args)
+
+ def resetLifespan(self, callback):
+ try:
+ agent = model.agents[self.qmfAgentId]
+ except KeyError:
+ raise Exception("Agent not found")
+
+ args = list()
+
+
+ agent.callMethod(self, "resetLifespan", callback, args)
+
+ def close(self, callback):
+ try:
+ agent = model.agents[self.qmfAgentId]
+ except KeyError:
+ raise Exception("Agent not found")
+
+ args = list()
+
+
+ agent.callMethod(self, "close", callback, args)
+
class SessionStats(SQLObject):
class sqlmeta:
lazyUpdate = True
Modified: mgmt/trunk/mint/python/mint/schemaparser.py
===================================================================
--- mgmt/trunk/mint/python/mint/schemaparser.py 2010-01-08 16:28:35 UTC (rev 3770)
+++ mgmt/trunk/mint/python/mint/schemaparser.py 2010-01-08 18:48:28 UTC (rev 3771)
@@ -177,21 +177,25 @@
else:
comment = ""
formalArgs = ", "
- actualArgs = " actualArgs = list()\n"
+ actualArgs = " args = list()\n\n"
for arg in elem.query["arg"]:
formalArgs += "%s, " % (arg["@name"])
actualArgs += " if %s is not None:\n" % (arg["@name"])
- actualArgs += " actualArgs.append(%s)\n" % (arg["@name"])
+ actualArgs += " args.append(%s)\n" % (arg["@name"])
if (formalArgs != ", "):
formalArgs = formalArgs[:-2]
else:
formalArgs = ""
- self.pythonOutput += "\n def %s(self, model, callback%s):\n" % (elem["@name"], formalArgs)
+ self.pythonOutput += "\n def %s(self, callback%s):\n" % (elem["@name"], formalArgs)
self.pythonOutput += comment
- self.pythonOutput += actualArgs
- self.pythonOutput += " model.callMethod(self, \"%s\", " % elem["@name"]
- self.pythonOutput += "callback, args=actualArgs)\n"
+ self.pythonOutput += " try:\n"
+ self.pythonOutput += " agent = model.agents[self.qmfAgentId]\n"
+ self.pythonOutput += " except KeyError:\n"
+ self.pythonOutput += " raise Exception(\"Agent not found\")\n\n"
+ self.pythonOutput += actualArgs + "\n"
+ self.pythonOutput += " agent.callMethod(self, \"%s\", " % elem["@name"]
+ self.pythonOutput += "callback, args)\n"
def endClass(self):
if (self.additionalPythonOutput != ""):
@@ -202,14 +206,13 @@
self.currentClass = ""
def generateCode(self):
- self.pythonOutput += "import mint\n"
- self.pythonOutput += "from sqlobject import *\n"
- self.pythonOutput += "from datetime import datetime\n"
- self.pythonOutput += "from qmf.console import ObjectId\n\n"
- self.pythonOutput += "class Pool(SQLObject):\n"
- self.pythonOutput += " class sqlmeta:\n"
- self.pythonOutput += " lazyUpdate = True\n"
- self.pythonOutput += " sourceId = StringCol(default=None, unique=True)\n\n"
+# self.pythonOutput += "import mint\n\n"
+# self.pythonOutput += "from qmf.console import ObjectId\n\n"
+ self.pythonOutput += "from sqlobject import *\n\n"
+ self.pythonOutput += "from mint.util import *\n\n"
+
+ self.pythonOutput += "model = None\n"
+
self.finalPythonOutput += "\nclassToSchemaNameMap = dict()\n"
self.finalPythonOutput += "schemaNameToClassMap = dict()\n"
self.finalPythonOutput += 'schemaReservedWordsMap = {"in": "inRsv", "In": "InRsv", \n'
Modified: mgmt/trunk/mint/sql/Makefile
===================================================================
--- mgmt/trunk/mint/sql/Makefile 2010-01-08 16:28:35 UTC (rev 3770)
+++ mgmt/trunk/mint/sql/Makefile 2010-01-08 18:48:28 UTC (rev 3771)
@@ -5,7 +5,7 @@
schema: schema.sql
schema.sql: ../python/mint/*.py
- sqlobject-admin sql -m mint.model -m mint.schema -c ${dsn} | sed -e '1,2d' > schema.sql
+ sqlobject-admin sql -m mint.schema -m mint.schemalocal -c ${dsn} | sed -e '1,2d' > schema.sql
clean:
rm -f schema.sql
Modified: mgmt/trunk/mint/sql/schema.sql
===================================================================
--- mgmt/trunk/mint/sql/schema.sql 2010-01-08 16:28:35 UTC (rev 3770)
+++ mgmt/trunk/mint/sql/schema.sql 2010-01-08 18:48:28 UTC (rev 3771)
@@ -1,41 +1,3 @@
-CREATE TABLE broker_group (
- id SERIAL PRIMARY KEY,
- name TEXT NOT NULL UNIQUE
-);
-
-CREATE TABLE broker_group_mapping (
- id SERIAL PRIMARY KEY,
- broker_id INT NOT NULL,
- broker_group_id INT NOT NULL
-);
-CREATE UNIQUE INDEX broker_group_mapping_unique ON broker_group_mapping (broker_id, broker_group_id);
-
-CREATE TABLE mint_info (
- id SERIAL PRIMARY KEY,
- version TEXT NOT NULL
-);
-
-CREATE TABLE role (
- id SERIAL PRIMARY KEY,
- name TEXT NOT NULL UNIQUE
-);
-
-CREATE TABLE subject (
- id SERIAL PRIMARY KEY,
- name TEXT NOT NULL UNIQUE,
- password TEXT,
- last_challenged TIMESTAMP,
- last_logged_in TIMESTAMP,
- last_logged_out TIMESTAMP
-);
-
-CREATE TABLE subject_role_mapping (
- id SERIAL PRIMARY KEY,
- subject_id INT NOT NULL,
- role_id INT NOT NULL
-);
-CREATE UNIQUE INDEX subject_role_mapping_unique ON subject_role_mapping (subject_id, role_id);
-
CREATE TABLE acl (
id SERIAL PRIMARY KEY,
qmf_agent_id TEXT NOT NULL,
@@ -531,11 +493,6 @@
monitor_self_time TIMESTAMP
);
-CREATE TABLE pool (
- id SERIAL PRIMARY KEY,
- source_id TEXT UNIQUE
-);
-
CREATE TABLE queue (
id SERIAL PRIMARY KEY,
qmf_agent_id TEXT NOT NULL,
@@ -1010,14 +967,44 @@
vhost_id INT
);
-ALTER TABLE broker_group_mapping ADD CONSTRAINT broker_id_exists FOREIGN KEY (broker_id) REFERENCES broker (id) ON DELETE CASCADE;
+CREATE TABLE broker_group (
+ id SERIAL PRIMARY KEY,
+ name TEXT NOT NULL UNIQUE
+);
-ALTER TABLE broker_group_mapping ADD CONSTRAINT broker_group_id_exists FOREIGN KEY (broker_group_id) REFERENCES broker_group (id) ON DELETE CASCADE;
+CREATE TABLE broker_group_mapping (
+ id SERIAL PRIMARY KEY,
+ broker_id INT NOT NULL,
+ broker_group_id INT NOT NULL
+);
+CREATE UNIQUE INDEX broker_group_mapping_unique ON broker_group_mapping (broker_id, broker_group_id);
-ALTER TABLE subject_role_mapping ADD CONSTRAINT subject_id_exists FOREIGN KEY (subject_id) REFERENCES subject (id) ON DELETE CASCADE;
+CREATE TABLE mint_info (
+ id SERIAL PRIMARY KEY,
+ version TEXT NOT NULL
+);
-ALTER TABLE subject_role_mapping ADD CONSTRAINT role_id_exists FOREIGN KEY (role_id) REFERENCES role (id) ON DELETE CASCADE;
+CREATE TABLE role (
+ id SERIAL PRIMARY KEY,
+ name TEXT NOT NULL UNIQUE
+);
+CREATE TABLE subject (
+ id SERIAL PRIMARY KEY,
+ name TEXT NOT NULL UNIQUE,
+ password TEXT,
+ last_challenged TIMESTAMP,
+ last_logged_in TIMESTAMP,
+ last_logged_out TIMESTAMP
+);
+
+CREATE TABLE subject_role_mapping (
+ id SERIAL PRIMARY KEY,
+ subject_id INT NOT NULL,
+ role_id INT NOT NULL
+);
+CREATE UNIQUE INDEX subject_role_mapping_unique ON subject_role_mapping (subject_id, role_id);
+
ALTER TABLE acl ADD CONSTRAINT stats_curr_id_exists FOREIGN KEY (stats_curr_id) REFERENCES acl_stats (id) ON DELETE SET NULL;
ALTER TABLE acl ADD CONSTRAINT stats_prev_id_exists FOREIGN KEY (stats_prev_id) REFERENCES acl_stats (id) ON DELETE SET NULL;
@@ -1214,3 +1201,11 @@
ALTER TABLE vhost_stats ADD CONSTRAINT vhost_id_exists FOREIGN KEY (vhost_id) REFERENCES vhost (id) ON DELETE SET NULL;
+ALTER TABLE broker_group_mapping ADD CONSTRAINT broker_id_exists FOREIGN KEY (broker_id) REFERENCES broker (id) ON DELETE CASCADE;
+
+ALTER TABLE broker_group_mapping ADD CONSTRAINT broker_group_id_exists FOREIGN KEY (broker_group_id) REFERENCES broker_group (id) ON DELETE CASCADE;
+
+ALTER TABLE subject_role_mapping ADD CONSTRAINT subject_id_exists FOREIGN KEY (subject_id) REFERENCES subject (id) ON DELETE CASCADE;
+
+ALTER TABLE subject_role_mapping ADD CONSTRAINT role_id_exists FOREIGN KEY (role_id) REFERENCES role (id) ON DELETE CASCADE;
+
14 years, 12 months
rhmessaging commits: r3770 - mgmt/trunk/mint/python/mint.
by rhmessaging-commits@lists.jboss.org
Author: justi9
Date: 2010-01-08 11:28:35 -0500 (Fri, 08 Jan 2010)
New Revision: 3770
Modified:
mgmt/trunk/mint/python/mint/update.py
Log:
Fix agent ref
Modified: mgmt/trunk/mint/python/mint/update.py
===================================================================
--- mgmt/trunk/mint/python/mint/update.py 2010-01-08 15:41:52 UTC (rev 3769)
+++ mgmt/trunk/mint/python/mint/update.py 2010-01-08 16:28:35 UTC (rev 3770)
@@ -381,7 +381,7 @@
if self.agent:
args["qmf_agent_id"] = self.agent.id
- op = SqlAgentDisconnect(agent)
+ op = SqlAgentDisconnect(self.agent)
op.execute(cursor, args)
class UpdateQueue(ConcurrentQueue):
14 years, 12 months
rhmessaging commits: r3769 - in mgmt/trunk: cumin/python/cumin/messaging and 1 other directories.
by rhmessaging-commits@lists.jboss.org
Author: justi9
Date: 2010-01-08 10:41:52 -0500 (Fri, 08 Jan 2010)
New Revision: 3769
Modified:
mgmt/trunk/cumin/python/cumin/main.py
mgmt/trunk/cumin/python/cumin/messaging/broker.py
mgmt/trunk/cumin/python/cumin/messaging/model.py
mgmt/trunk/cumin/python/cumin/model.py
mgmt/trunk/mint/python/mint/cache.py
mgmt/trunk/mint/python/mint/model.py
mgmt/trunk/mint/python/mint/sql.py
mgmt/trunk/mint/python/mint/update.py
Log:
* Realign mint around agent objects; for now, this is faked somewhat,
but this will be the new model in qmf2
* Add a temporary hack to defer objectProp and objectStat calls; new
1.3 broker behavior will make this unnecessary
Modified: mgmt/trunk/cumin/python/cumin/main.py
===================================================================
--- mgmt/trunk/cumin/python/cumin/main.py 2010-01-08 15:41:13 UTC (rev 3768)
+++ mgmt/trunk/cumin/python/cumin/main.py 2010-01-08 15:41:52 UTC (rev 3769)
@@ -133,7 +133,7 @@
def do_process(self, session):
super(OverviewFrame, self).do_process(session)
- count = len(self.app.model.mint.model.mintBrokersByUrl)
+ count = len(self.app.model.mint.model.qmfBrokers)
if count == 0:
self.mode.set(session, self.notice)
Modified: mgmt/trunk/cumin/python/cumin/messaging/broker.py
===================================================================
--- mgmt/trunk/cumin/python/cumin/messaging/broker.py 2010-01-08 15:41:13 UTC (rev 3768)
+++ mgmt/trunk/cumin/python/cumin/messaging/broker.py 2010-01-08 15:41:52 UTC (rev 3769)
@@ -84,13 +84,13 @@
return "Status"
def render_content(self, session, data):
- agentId = data["qmf_agent_id"]
- dt = self.app.model.mint.model.getLatestHeartbeat(agentId)
+ agent = self.app.model.mint.model.agents.get(data["qmf_agent_id"])
- if dt is None:
- return fmt_none()
- else:
- return fmt_datetime(dt)
+ if agent:
+ if agent.lastHeartbeat is None:
+ return fmt_none()
+ else:
+ return fmt_datetime(agent.lastHeartbeat)
class ClusterColumn(SqlTableColumn):
def render_title(self, session, data):
Modified: mgmt/trunk/cumin/python/cumin/messaging/model.py
===================================================================
--- mgmt/trunk/cumin/python/cumin/messaging/model.py 2010-01-08 15:41:13 UTC (rev 3768)
+++ mgmt/trunk/cumin/python/cumin/messaging/model.py 2010-01-08 15:41:52 UTC (rev 3769)
@@ -26,7 +26,7 @@
session_ids = set()
- for broker in self.app.model.mint.model.mintBrokersByQmfBroker:
+ for broker in self.app.model.mint.model.qmfBrokers:
session_ids.add(broker.getSessionId())
for sess in conn.sessions:
Modified: mgmt/trunk/cumin/python/cumin/model.py
===================================================================
--- mgmt/trunk/cumin/python/cumin/model.py 2010-01-08 15:41:13 UTC (rev 3768)
+++ mgmt/trunk/cumin/python/cumin/model.py 2010-01-08 15:41:52 UTC (rev 3769)
@@ -124,9 +124,9 @@
def get_session_by_object(self, object):
assert object
- agent = self.mint.model.agentsById[object.qmfAgentId]
+ agent = self.mint.model.agents[object.qmfAgentId]
- return agent.getBroker().getAmqpSession()
+ return agent.agent.getBroker().getAmqpSession()
def get_negotiator_limits(self, negotiator):
self.lock.acquire()
@@ -1125,9 +1125,6 @@
prop = CuminProperty(self, "dataDir")
prop.title = "Data Directory"
- stat = self.StatusStat(self, "connection")
- stat.category = "status"
-
def init(self):
super(CuminBroker, self).init()
@@ -1146,25 +1143,6 @@
except:
return broker.name
- class StatusStat(CuminStat):
- def value_text(self, broker):
- connected = False
- if broker:
- try:
- mbroker = self.mint.model.mintBrokersById \
- [broker.qmfBrokerId]
- connected = mbroker.connected
- except KeyError:
- pass
-
- if connected:
- return "Connected"
- else:
- return "Disconnected"
-
- def rate_text(self, record):
- return ""
-
# XXX "do_" on this doesn't make sense
def do_bind(session, queue_name, binding_info):
for exchange in binding_info:
@@ -2437,7 +2415,7 @@
pass
def delete(self):
- pass
+ self.model = None
class UpdateThread(Thread):
def __init__(self, store):
@@ -2475,6 +2453,8 @@
def delete(self):
del self.model.limits_by_negotiator[self.negotiator]
+ super(SubmissionJobStore, self).delete()
+
class SubmissionJobStore(ObjectStore):
def __init__(self, model, submission):
super(SubmissionJobStore, self).__init__(model)
@@ -2492,3 +2472,5 @@
def delete(self):
del self.model.jobs_by_submission[self.submission]
+
+ super(SubmissionJobStore, self).delete()
Modified: mgmt/trunk/mint/python/mint/cache.py
===================================================================
--- mgmt/trunk/mint/python/mint/cache.py 2010-01-08 15:41:13 UTC (rev 3768)
+++ mgmt/trunk/mint/python/mint/cache.py 2010-01-08 15:41:52 UTC (rev 3769)
@@ -1,5 +1,3 @@
-from threading import RLock
-
class MintCache(object):
def __init__(self):
self.__cache = dict()
Modified: mgmt/trunk/mint/python/mint/model.py
===================================================================
--- mgmt/trunk/mint/python/mint/model.py 2010-01-08 15:41:13 UTC (rev 3768)
+++ mgmt/trunk/mint/python/mint/model.py 2010-01-08 15:41:52 UTC (rev 3769)
@@ -1,3 +1,4 @@
+from parsley.collectionsex import defaultdict
from parsley.threadingex import Lifecycle
from qpid.datatypes import UUID
from qpid.util import URL
@@ -94,22 +95,6 @@
brokerGroup = ForeignKey("BrokerGroup", notNull=True, cascade=True)
unique = DatabaseIndex(broker, brokerGroup, unique=True)
-class MintBroker(object):
- def __init__(self, url, qmfBroker):
- self.url = url
- self.qmfBroker = qmfBroker
-
- self.qmfId = None
- self.databaseId = None
-
- self.objectDatabaseIds = MintCache() # database ids by qmf object id
- self.orphans = dict() # updates by qmf object id
-
- self.connected = False
-
- def __repr__(self):
- return "%s(%s)" % (self.__class__.__name__, self.url)
-
class MintModel(Console, Lifecycle):
staticInstance = None
@@ -121,33 +106,20 @@
self.app = app
- # Lookup tables used for recovering MintBroker objects, which have
- # mint-specific accounting and wrap a qmf broker object
+ # qmfAgentId => MintAgent
+ self.agents = dict()
- self.mintBrokersByQmfBroker = dict()
- self.mintBrokersById = dict()
- self.mintBrokersByUrl = dict()
+ # int seq => callable
+ self.outstandingMethodCalls = dict()
- self.agentsById = dict()
-
- # Agent heartbeats
- # agentId => latest heartbeat timestamp
-
- self.heartbeatsByAgentId = dict()
-
- self.__lock = RLock()
-
- self.dbConn = None
self.qmfSession = None
+ self.qmfBrokers = list()
- self.outstandingMethodCalls = dict()
+ self.lock = RLock()
- def lock(self):
- self.__lock.acquire()
+ self.deferredObjectPropCalls = defaultdict(list)
+ self.deferredObjectStatCalls = defaultdict(list)
- def unlock(self):
- self.__lock.release()
-
def check(self):
pass
@@ -157,11 +129,11 @@
self.qmfSession = Session \
(self, manageConnections=True, rcvObjects=self.app.updateEnabled)
- # clean up any transient objects that a previous instance may have
- # left behind in the DB it's basically an unconstrained agent
+ # Clean up any transient objects that a previous instance may have
+ # left behind in the DB; it's basically an unconstrained agent
# disconnect update, for any agent
- up = update.AgentDisconnectUpdate(self, 0)
+ up = update.AgentDisconnectUpdate(self, None)
self.app.updateThread.enqueue(up)
def do_start(self):
@@ -171,159 +143,96 @@
self.addBroker(uri)
def do_stop(self):
- for mbroker in self.mintBrokersByQmfBroker.values():
- self.delBroker(mbroker)
+ for qbroker in self.qmfBrokers:
+ self.qmfSession.delBroker(qbroker)
def addBroker(self, url):
log.info("Adding qmf broker at %s", url)
- self.lock()
+ self.lock.acquire()
try:
qbroker = self.qmfSession.addBroker(url)
- mbroker = MintBroker(url, qbroker)
-
- # Can't add the by-id mapping here, as the id is not set yet;
- # instead we add it when brokerConnected is called
-
- self.mintBrokersByQmfBroker[qbroker] = mbroker
- self.mintBrokersByUrl[url] = mbroker
-
- return mbroker
+ self.qmfBrokers.append(qbroker)
finally:
- self.unlock()
+ self.lock.release()
- def delBroker(self, mbroker):
- assert isinstance(mbroker, MintBroker)
-
- log.info("Removing qmf broker at %s", mbroker.url)
-
- self.lock()
- try:
- self.qmfSession.delBroker(mbroker.qmfBroker)
-
- del self.mintBrokersByQmfBroker[mbroker.qmfBroker]
- del self.mintBrokersByUrl[mbroker.url]
-
- if mbroker.qmfId:
- del self.mintBrokersById[mbroker.qmfId]
- finally:
- self.unlock()
-
def brokerConnected(self, qbroker):
- """ Invoked when a connection is established to a broker """
- self.lock()
- try:
- mbroker = self.mintBrokersByQmfBroker[qbroker]
+ self.log.info("Broker at %s:%i is connected", qbroker.host, qbroker.port)
- assert mbroker.connected is False
-
- mbroker.connected = True
- finally:
- self.unlock()
-
def brokerInfo(self, qbroker):
- self.lock()
- try:
- # We now have enough information to generate a proper agent ID;
- # it would be much better to get the broker ID earlier, or to
- # simply get a reasonable agent ID with less indirection
+ # Now we have a brokerId to use to generate fully qualified agent
+ # IDs
- for agent in qbroker.getAgents():
- agentId = str(QmfAgentId.fromAgent(agent))
+ for qagent in qbroker.getAgents():
+ self.newMintAgent(qagent)
- log.debug("Adding agent %s", agentId)
-
- self.agentsById[agentId] = agent
-
- id = str(qbroker.getBrokerId())
- mbroker = self.mintBrokersByQmfBroker[qbroker]
-
- mbroker.qmfId = id
- self.mintBrokersById[id] = mbroker
- finally:
- self.unlock()
-
def brokerDisconnected(self, qbroker):
- """ Invoked when the connection to a broker is lost """
- self.lock()
- try:
- mbroker = self.mintBrokersByQmfBroker[qbroker]
+ self.log.info \
+ ("Broker at %s:%i is disconnected", qbroker.host, qbroker.port)
- assert mbroker.connected is True
+ def newMintAgent(self, qagent):
+ agent = MintAgent(self, qagent)
- mbroker.connected = False
- finally:
- self.unlock()
-
- def isConnected(self):
- self.lock()
+ self.lock.acquire()
try:
- for broker in self.mintBrokersByUrl.values():
- if broker.connected:
- return True
+ assert agent.id not in self.agents
+ self.agents[agent.id] = agent
finally:
- self.unlock()
+ self.lock.release()
- def getMintBrokerByQmfBroker(self, qbroker):
- self.lock()
- try:
- return self.mintBrokersByQmfBroker[qbroker]
- finally:
- self.unlock()
+ return agent
- def newAgent(self, agent):
- """ Invoked when a QMF agent is discovered. """
+ def getMintAgent(self, qagent):
+ id = str(QmfAgentId.fromAgent(qagent))
+ return self.agents[id]
- log.info("Agent connected: %s", agent)
+ def newAgent(self, qagent):
+ log.info("Creating %s", qagent)
# Some agents come down without a brokerId, meaning we can't
# generate a fully qualified agent ID for them. Those we handle
# in brokerInfo.
- if agent.getBroker().brokerId:
- self.lock()
- try:
- agentId = str(QmfAgentId.fromAgent(agent))
-
- log.debug("Adding agent %s", agentId)
+ if qagent.getBroker().brokerId:
+ agent = self.newMintAgent(qagent)
- self.agentsById[agentId] = agent
- finally:
- self.unlock()
+ # XXX This business is to handle an agent-vs-agent data ordering
+ # problem
- def delAgent(self, agent):
- """ Invoked when a QMF agent disconects. """
+ objectPropCalls = self.deferredObjectPropCalls[agent.id]
- log.info("Agent disconnected: %s", agent)
+ for broker, object in objectPropCalls:
+ self.objectProps(broker, object)
- agentId = str(QmfAgentId.fromAgent(agent))
+ objectStatCalls = self.deferredObjectStatCalls[agent.id]
- self.lock()
+ for broker, object in objectStatCalls:
+ self.objectStats(broker, object)
+
+ def delAgent(self, qagent):
+ log.info("Deleting %s", qagent)
+
+ self.lock.acquire()
try:
- del self.agentsById[agentId]
+ agent = self.getMintAgent(qagent)
+ agent.model = None
+ del self.agents[agent.id]
finally:
- self.unlock()
+ self.lock.release()
- up = update.AgentDisconnectUpdate(self, agentId)
+ up = update.AgentDisconnectUpdate(self, agent)
self.app.updateThread.enqueue(up)
- def heartbeat(self, agent, timestamp):
- agentId = str(QmfAgentId.fromAgent(agent))
+ def heartbeat(self, qagent, timestamp):
timestamp = timestamp / 1000000000
- self.lock()
+ self.lock.acquire()
try:
- self.heartbeatsByAgentId[agentId] = datetime.fromtimestamp(timestamp)
+ agent = self.getMintAgent(qagent)
+ agent.lastHeartbeat = datetime.fromtimestamp(timestamp)
finally:
- self.unlock()
+ self.lock.release()
- def getLatestHeartbeat(self, agentId):
- self.lock()
- try:
- return self.heartbeatsByAgentId.get(agentId)
- finally:
- self.unlock()
-
def newPackage(self, name):
""" Invoked when a QMF package is discovered. """
pass
@@ -333,26 +242,46 @@
used to obtain details about the class."""
pass
- def objectProps(self, broker, record):
+ def objectProps(self, broker, object):
""" Invoked when an object is updated. """
+
if not self.app.updateThread.isAlive():
return
- mbroker = self.getMintBrokerByQmfBroker(broker)
+ self.lock.acquire()
+ try:
+ id = str(QmfAgentId.fromObject(object))
- up = update.PropertyUpdate(self, mbroker, record)
+ try:
+ agent = self.agents[id]
+ except KeyError:
+ self.deferredObjectPropCalls[id].append((broker, object))
+ return
+ finally:
+ self.lock.release()
+ up = update.PropertyUpdate(self, agent, object)
self.app.updateThread.enqueue(up)
- def objectStats(self, broker, record):
+ def objectStats(self, broker, object):
""" Invoked when an object is updated. """
if not self.app.updateThread.isAlive():
return
- mbroker = self.getMintBrokerByQmfBroker(broker)
- up = update.StatisticUpdate(self, mbroker, record)
+ self.lock.acquire()
+ try:
+ id = str(QmfAgentId.fromObject(object))
+ try:
+ agent = self.agents[id]
+ except KeyError:
+ self.deferredObjectStatCalls[id].append((broker, object))
+ return
+ finally:
+ self.lock.release()
+
+ up = update.StatisticUpdate(self, agent, object)
self.app.updateThread.enqueue(up)
def event(self, broker, event):
@@ -363,10 +292,10 @@
classKey = ClassKey(mintObject.qmfClassKey)
objectId = QmfObjectId.fromString(mintObject.qmfObjectId).toObjectId()
- self.lock()
+ self.lock.acquire()
try:
- agent = self.agentsById[mintObject.qmfAgentId]
- broker = agent.getBroker()
+ agent = self.agents[mintObject.qmfAgentId]
+ broker = agent.agent.getBroker()
seq = self.qmfSession._sendMethodRequest \
(broker, classKey, objectId, methodName, args)
@@ -376,15 +305,33 @@
return seq
finally:
- self.unlock()
+ self.lock.release()
def methodResponse(self, broker, seq, response):
log.debug("Method response for request %i received from %s", seq, broker)
log.debug("Response: %s", response)
- self.lock()
+ self.lock.acquire()
try:
methodCallback = self.outstandingMethodCalls.pop(seq)
methodCallback(response.text, response.outArgs)
finally:
- self.unlock()
+ self.lock.release()
+
+class MintAgent(object):
+ def __init__(self, model, agent):
+ self.model = model
+ self.agent = agent
+
+ self.id = str(QmfAgentId.fromAgent(agent))
+
+ self.lastHeartbeat = None
+
+ # qmfObjectId => int database id
+ self.databaseIds = MintCache()
+
+ # qmfObjectId => list of ModelUpdate objects
+ self.deferredUpdates = defaultdict(list)
+
+ def __repr__(self):
+ return "%s(%s)" % (self.__class__.__name__, self.id)
Modified: mgmt/trunk/mint/python/mint/sql.py
===================================================================
--- mgmt/trunk/mint/python/mint/sql.py 2010-01-08 15:41:13 UTC (rev 3768)
+++ mgmt/trunk/mint/python/mint/sql.py 2010-01-08 15:41:52 UTC (rev 3769)
@@ -227,9 +227,9 @@
class SqlAgentDisconnect(SqlOperation):
- def __init__(self, useAgentId = True):
+ def __init__(self, agent):
super(SqlAgentDisconnect, self).__init__("disconnect_agent")
- self.useAgentId = useAgentId
+ self.agent = agent
def generate(self):
sql = ""
@@ -239,7 +239,7 @@
set qmf_delete_time = now()
where qmf_persistent = 'f'
and qmf_delete_time is null""" % (dbStyle.pythonClassToDBTable(cls))
- if self.useAgentId:
+ if self.agent:
sql += """
and qmf_agent_id = %(qmf_agent_id)s;
"""
Modified: mgmt/trunk/mint/python/mint/update.py
===================================================================
--- mgmt/trunk/mint/python/mint/update.py 2010-01-08 15:41:13 UTC (rev 3768)
+++ mgmt/trunk/mint/python/mint/update.py 2010-01-08 15:41:52 UTC (rev 3769)
@@ -50,25 +50,21 @@
def run(self):
while True:
+ if self.stopRequested:
+ break
+
try:
update = self.updates.get(True, 1)
-
- log.debug("Processing %s", update)
-
- self.dequeueCount += 1
-
- if self.stopRequested:
- break
except Empty:
- if self.stopRequested:
- break
- else:
- #log.debug("Queue is empty")
- continue
+ continue
+ self.dequeueCount += 1
+
self.processUpdate(update)
def processUpdate(self, update):
+ log.debug("Processing %s", update)
+
try:
update.process(self)
@@ -77,30 +73,17 @@
# commit only every "commitThreshold" updates, or whenever
# the update queue is empty
- self.commit()
+ update.commit()
+ self.conn.commit()
except:
log.exception("Update failed")
- self.rollback()
+ update.rollback()
+ self.conn.rollback()
def cursor(self):
return self.conn.cursor()
- def commit(self):
- self.conn.commit()
-
- for broker in self.app.model.mintBrokersByQmfBroker.values():
- broker.objectDatabaseIds.commit()
-
- def rollback(self):
- try:
- self.conn.rollback()
- except:
- log.exception("Rollback failed")
-
- for broker in self.app.model.mintBrokersByQmfBroker.values():
- broker.objectDatabaseIds.rollback()
-
class ReferenceException(Exception):
def __init__(self, sought):
self.sought = sought
@@ -109,15 +92,19 @@
return repr(self.sought)
class ModelUpdate(object):
- def __init__(self, model, broker, object):
+ def __init__(self, model, agent, object):
+ if agent:
+ from mint.model import MintAgent
+ assert isinstance(agent, MintAgent)
+
self.model = model
- self.broker = broker
+ self.agent = agent
self.object = object
self.priority = 0
def __repr__(self):
return "%s(%s, %s, %i)" % (self.__class__.__name__,
- str(self.broker),
+ str(self.agent),
str(self.object),
self.priority)
@@ -143,6 +130,7 @@
name = mint.schema.schemaReservedWordsMap.get(name, name)
if key.type == 10:
+ # XXX don't want oid around much
self.processReference(name, value, results)
continue
@@ -180,9 +168,10 @@
foreignKey = name + "_id"
- id = self.broker.objectDatabaseIds.get(oid)
+ id = self.agent.databaseIds.get(str(QmfObjectId(oid.first, oid.second)))
if id is None:
+ # XXX don't want oid around much
raise ReferenceException(oid)
results[foreignKey] = id
@@ -192,6 +181,18 @@
t = datetime.fromtimestamp(tstamp / 1000000000)
results[name] = t
+ def commit(self):
+ # XXX commit db here too
+
+ if self.agent:
+ self.agent.databaseIds.commit()
+
+ def rollback(self):
+ # XXX rollback db here too
+
+ if self.agent:
+ self.agent.databaseIds.rollback()
+
class PropertyUpdate(ModelUpdate):
def process(self, thread):
try:
@@ -202,18 +203,14 @@
thread.dropCount += 1
return
- oid = self.object.getObjectId()
+ qmfObjectId = str(QmfObjectId.fromObject(self.object))
try:
attrs = self.processAttributes(self.object.getProperties(), cls)
except ReferenceException, e:
log.info("Referenced object %r not found", e.sought)
- try:
- orphans = self.broker.orphans[oid]
- orphans.append(self)
- except KeyError:
- self.broker.orphans[oid] = list((self,))
+ self.agent.deferredUpdates[qmfObjectId].append(self)
thread.deferCount += 1
return
@@ -226,12 +223,13 @@
if delete != 0:
self.processTimestamp("qmfDeleteTime", delete, attrs)
- log.debug("%s(%s) marked deleted", cls.__name__, oid)
+ log.debug("%s(%s,%s) marked deleted",
+ cls.__name__, self.agent.id, qmfObjectId)
- attrs["qmfAgentId"] = str(QmfAgentId.fromObject(self.object))
+ attrs["qmfAgentId"] = self.agent.id
attrs["qmfClassKey"] = str(self.object.getClassKey())
- attrs["qmfObjectId"] = str(QmfObjectId.fromObject(self.object))
- attrs["qmfPersistent"] = oid.isDurable()
+ attrs["qmfObjectId"] = str(qmfObjectId)
+ attrs["qmfPersistent"] = self.object.getObjectId().isDurable()
cursor = thread.cursor()
@@ -241,7 +239,7 @@
# 2. Object is in mint's db, but id is not yet cached
# 3. Object is in mint's db, and id is cached
- id = self.broker.objectDatabaseIds.get(oid)
+ id = self.agent.databaseIds.get(qmfObjectId)
if id is None:
# Case 1 or 2
@@ -273,7 +271,7 @@
assert cursor.rowcount == 1
- self.broker.objectDatabaseIds.set(oid, id)
+ self.agent.databaseIds.set(qmfObjectId, id)
else:
# Case 3
@@ -285,14 +283,14 @@
#assert cursor.rowcount == 1
try:
- orphans = self.broker.orphans.pop(oid)
+ updates = self.agent.deferredUpdates.pop(qmfObjectId)
- if orphans:
+ if updates:
log.info("Re-enqueueing %i orphans whose creation had been deferred",
- len(orphans))
+ len(updates))
- for orphan in orphans:
- thread.enqueue(orphan)
+ for update in updates:
+ thread.enqueue(update)
except KeyError:
pass
@@ -308,9 +306,10 @@
statsCls = getattr(mint, "%sStats" % cls.__name__)
- oid = self.object.getObjectId()
- id = self.broker.objectDatabaseIds.get(oid)
+ qmfObjectId = str(QmfObjectId.fromObject(self.object))
+ id = self.agent.databaseIds.get(qmfObjectId)
+
if id is None:
thread.dropCount += 1
return
@@ -332,7 +331,7 @@
thread.dropCount += 1
return
- attrs["qmfUpdateTime"] = t > tnow and tnow or t
+ attrs["qmfUpdateTime"] = t > tnow and tnow or t # XXX do we still want this
attrs["%s_id" % cls.sqlmeta.table] = id
cursor = thread.cursor()
@@ -353,14 +352,14 @@
attrs = thread.app.expireThread.attrs
total = 0
- thread.commit()
+ thread.conn.commit()
for op in thread.app.expireThread.ops:
log.debug("Running expire op %s", op)
count = op.execute(cursor, attrs)
- thread.commit()
+ thread.conn.commit()
log.debug("%i records expired", count)
@@ -370,21 +369,21 @@
thread.expireUpdateCount += 1
-
class AgentDisconnectUpdate(ModelUpdate):
+ # Runs SqlAgentDisconnect for the given agent. When no agent is given,
+ # the qmf_agent_id filter is left out of the SQL and no id is bound.
- def __init__(self, model, agentId):
- super(AgentDisconnectUpdate, self).__init__(model, None, None)
- self.agentId = agentId
+ def __init__(self, model, agent):
+ super(AgentDisconnectUpdate, self).__init__(model, agent, None)
def process(self, thread):
cursor = thread.cursor()
- useAgentId = self.agentId != 0
- op = SqlAgentDisconnect(useAgentId)
- if useAgentId:
- op.execute(cursor, {"qmf_agent_id": self.agentId})
- else:
- op.execute(cursor)
+ # Bind parameters for the statement; stays empty when there is no agent.
+ args = dict()
+
+ if self.agent:
+ args["qmf_agent_id"] = self.agent.id
+
+ # The agent is stored on the instance by ModelUpdate.__init__; a bare
+ # "agent" is not defined in this method and would raise NameError.
+ op = SqlAgentDisconnect(self.agent)
+ op.execute(cursor, args)
+
class UpdateQueue(ConcurrentQueue):
def __init__(self, maxsize=0, slotCount=1):
self.slotCount = slotCount
14 years, 12 months
rhmessaging commits: r3768 - mgmt/trunk/parsley/python/parsley.
by rhmessaging-commits@lists.jboss.org
Author: justi9
Date: 2010-01-08 10:41:13 -0500 (Fri, 08 Jan 2010)
New Revision: 3768
Added:
mgmt/trunk/parsley/python/parsley/collectionsex.py
Log:
A new extension library for supporting defaultdict on older pythons
Added: mgmt/trunk/parsley/python/parsley/collectionsex.py
===================================================================
--- mgmt/trunk/parsley/python/parsley/collectionsex.py (rev 0)
+++ mgmt/trunk/parsley/python/parsley/collectionsex.py 2010-01-08 15:41:13 UTC (rev 3768)
@@ -0,0 +1,33 @@
+try:
+    from collections import defaultdict
+except ImportError:
+    class defaultdict(dict):
+        """Minimal stand-in for collections.defaultdict on older Pythons.
+
+        Looking up a missing key calls default_factory, stores the result
+        under that key, and returns it.  With no default_factory a missing
+        key raises KeyError, exactly like a plain dict.
+        """
+
+        def __init__(self, default_factory=None, *args, **kwargs):
+            # Name this class, not dict, in super(): super(dict, self)
+            # starts the lookup *after* dict and so skips dict's methods.
+            super(defaultdict, self).__init__(*args, **kwargs)
+
+            self.default_factory = default_factory
+
+        def __getitem__(self, key):
+            try:
+                return super(defaultdict, self).__getitem__(key)
+            except KeyError:
+                return self.__missing__(key)
+
+        def __missing__(self, key):
+            if self.default_factory is None:
+                raise KeyError(key)
+
+            # Store the new value so that in-place mutation of the default
+            # (e.g. d[key].append(x)) is not lost.
+            value = self.default_factory()
+            self[key] = value
+            return value
14 years, 12 months
rhmessaging commits: r3767 - mgmt/trunk/mint/bin.
by rhmessaging-commits@lists.jboss.org
Author: justi9
Date: 2010-01-08 10:27:06 -0500 (Fri, 08 Jan 2010)
New Revision: 3767
Modified:
mgmt/trunk/mint/bin/mint-database
Log:
Clarify that configure modifies the config of the server, not a database object
Modified: mgmt/trunk/mint/bin/mint-database
===================================================================
--- mgmt/trunk/mint/bin/mint-database 2010-01-06 20:43:02 UTC (rev 3766)
+++ mgmt/trunk/mint/bin/mint-database 2010-01-08 15:27:06 UTC (rev 3767)
@@ -138,14 +138,14 @@
check-environment
if test -f $pghbaconf && run "grep ${dbname} ${pghbaconf}"; then
- echo "The database appears to have been configured already."
+ echo "The database server appears to have been configured already."
exit 1
fi
local i_stopped_postgres=""
if run "/sbin/service postgresql status"; then
- echo "The database is running. To proceed with configuration, I need to stop it"
+ echo "The database server is running. To proceed with configuration, I need to stop it"
echo "(I'll start it again after I'm done)."
if confirmed; then
@@ -155,7 +155,7 @@
fi
test -d /var/lib/pgsql/data || {
- echo "The database is not initialized. To proceed, I need to initialize it."
+ echo "The database server is not initialized. To proceed, I need to initialize it."
if confirmed; then
initdb
@@ -168,7 +168,7 @@
run "/sbin/service postgresql start"
fi
- echo "The database is configured. You can now run 'mint-database create'."
+ echo "The database server is configured. You can now run 'mint-database create'."
# chkconfig stuff ?
;;
@@ -195,7 +195,7 @@
echo "Usage: mint-database COMMAND"
echo "Commands:"
echo " status Check the database"
- echo " configure Modify the database configuration"
+ echo " configure Modify the database server configuration"
echo " create Create the mint user and database"
echo " destroy Discard the mint user, database, and all data"
exit 1
14 years, 12 months
rhmessaging commits: r3766 - store/trunk/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb.
by rhmessaging-commits@lists.jboss.org
Author: rgemmell
Date: 2010-01-06 15:43:02 -0500 (Wed, 06 Jan 2010)
New Revision: 3766
Modified:
store/trunk/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/BDBStoreTest.java
Log:
call createExchange on the store, now that it has been decoupled from registration
Modified: store/trunk/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/BDBStoreTest.java
===================================================================
--- store/trunk/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/BDBStoreTest.java 2010-01-06 20:39:17 UTC (rev 3765)
+++ store/trunk/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/BDBStoreTest.java 2010-01-06 20:43:02 UTC (rev 3766)
@@ -141,6 +141,7 @@
assertTrue("Exchange is not durable", exchange.isDurable());
_virtualHost.getExchangeRegistry().registerExchange(exchange);
+ _store.createExchange(exchange);
//Ensure it is registered correctly
exchange = _virtualHost.getExchangeRegistry().getExchange(EXCHANGE1);
14 years, 12 months
rhmessaging commits: r3765 - store/trunk/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb.
by rhmessaging-commits@lists.jboss.org
Author: rgemmell
Date: 2010-01-06 15:39:17 -0500 (Wed, 06 Jan 2010)
New Revision: 3765
Modified:
store/trunk/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/BDBMessageStore.java
Log:
If the queue was not in the store, check that there isn't already a queue in the virtualhost from earlier in the startup cycle before creating a new queue to deliver any recovered messages to, preventing creation and registration of a duplicate MBean
Modified: store/trunk/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/BDBMessageStore.java
===================================================================
--- store/trunk/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/BDBMessageStore.java 2010-01-06 20:37:41 UTC (rev 3764)
+++ store/trunk/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/BDBMessageStore.java 2010-01-06 20:39:17 UTC (rev 3765)
@@ -1651,8 +1651,21 @@
AMQShortString queueName = dd.queueName;
AMQQueue queue = queues.get(queueName);
+
+ // If the matching queue was not already found in the store, check in case a queue
+ // with the same name exists in the virtualhost, otherwise we will create a duplicate
+ // queue and generate a JMX InstanceAlreadyExistsException, preventing startup.
if (queue == null)
{
+ queue = _virtualHost.getQueueRegistry().getQueue(queueName);
+ if (queue != null)
+ {
+ queues.put(queueName, queue);
+ }
+ }
+
+ if (queue == null)
+ {
queue = AMQQueueFactory.createAMQQueueImpl(queueName, false, null, false, _virtualHost, null);
_virtualHost.getQueueRegistry().registerQueue(queue);
queues.put(queueName, queue);
14 years, 12 months
rhmessaging commits: r3764 - store/trunk/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb.
by rhmessaging-commits@lists.jboss.org
Author: rgemmell
Date: 2010-01-06 15:37:41 -0500 (Wed, 06 Jan 2010)
New Revision: 3764
Modified:
store/trunk/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/BDBMessageStore.java
Log:
Add the virtualhost name to the default environment to prevent overlap when failing to specify a location. Also make default location adhere to QPID_WORK
Modified: store/trunk/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/BDBMessageStore.java
===================================================================
--- store/trunk/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/BDBMessageStore.java 2010-01-06 20:33:27 UTC (rev 3763)
+++ store/trunk/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/BDBMessageStore.java 2010-01-06 20:37:41 UTC (rev 3764)
@@ -192,7 +192,7 @@
super.configure(virtualHost, base, vHostConfig);
Configuration config = vHostConfig.getStoreConfiguration();
- File environmentPath = new File(config.getString(ENVIRONMENT_PATH_PROPERTY, "bdbEnv"));
+ File environmentPath = new File(config.getString(ENVIRONMENT_PATH_PROPERTY, System.getProperty("QPID_WORK") + "/bdbstore/" + virtualHost.getName()));
if (!environmentPath.exists())
{
if (!environmentPath.mkdirs())
14 years, 12 months